名称: csharp-async-patterns 用户可调用: false 描述: 当编写异步C#代码时使用,包括C#异步/等待模式、Task、ValueTask、异步流和取消。 允许工具:
- Bash
- Read
- Write
- Edit
C# 异步模式
掌握使用async/await、Task、ValueTask、异步流和取消模式的C#异步编程。这个技能涵盖了从C# 8到12的现代异步模式,用于构建响应式、可扩展的应用程序。
Async/Await 基础
async/await模式提供了一种简单的方式来编写异步代码,看起来和行为像同步代码。
基本异步方法
public async Task<string> FetchDataAsync(string url)
{
using var client = new HttpClient();
string result = await client.GetStringAsync(url);
return result;
}
// 调用异步方法
public async Task ProcessAsync()
{
string data = await FetchDataAsync("https://api.example.com/data");
Console.WriteLine(data);
}
异步方法签名规则
// ✅ 正确 - 返回 Task
public async Task ProcessDataAsync()
{
await Task.Delay(1000);
}
// ✅ 正确 - 返回 Task<T>
public async Task<int> CalculateAsync()
{
await Task.Delay(1000);
return 42;
}
// ⚠️ 仅用于事件处理程序 - 返回 void
public async void Button_Click(object sender, EventArgs e)
{
await ProcessDataAsync();
}
// ❌ 错误 - 非异步但返回 Task
public Task WrongAsync()
{
// 应该是异步或使用 Task.FromResult
return Task.CompletedTask;
}
Task 和 Task<T>
Task 表示一个异步操作。Task<T> 表示一个返回值的操作。
创建任务
// Task.Run 用于 CPU 密集型工作
public async Task<int> CalculateSumAsync(int[] numbers)
{
return await Task.Run(() => numbers.Sum());
}
// Task.FromResult 用于已计算的值
public Task<string> GetCachedValueAsync(string key)
{
if (_cache.TryGetValue(key, out var value))
{
return Task.FromResult(value);
}
return FetchFromDatabaseAsync(key);
}
// Task.CompletedTask 用于 void 异步方法
public Task ProcessIfNeededAsync(bool condition)
{
if (!condition)
{
return Task.CompletedTask;
}
return DoActualWorkAsync();
}
任务组合
public async Task<Result> ProcessOrderAsync(Order order)
{
// 顺序执行
await ValidateOrderAsync(order);
await ChargePaymentAsync(order);
await ShipOrderAsync(order);
return new Result { Success = true };
}
public async Task<Result> ProcessOrderParallelAsync(Order order)
{
// 并行执行
var validationTask = ValidateOrderAsync(order);
var inventoryTask = CheckInventoryAsync(order);
var pricingTask = CalculatePricingAsync(order);
await Task.WhenAll(validationTask, inventoryTask, pricingTask);
return new Result
{
IsValid = await validationTask,
InStock = await inventoryTask,
Price = await pricingTask
};
}
ValueTask 和 ValueTask<T>
ValueTask 是一种性能优化,适用于结果通常同步可用的场景。
何时使用 ValueTask
public class CachedRepository
{
private readonly Dictionary<int, User> _cache = new();
private readonly IDatabase _database;
// ✅ 正确使用 ValueTask - 通常同步从缓存返回
public ValueTask<User> GetUserAsync(int id)
{
if (_cache.TryGetValue(id, out var user))
{
return ValueTask.FromResult(user);
}
return new ValueTask<User>(FetchUserFromDatabaseAsync(id));
}
private async Task<User> FetchUserFromDatabaseAsync(int id)
{
var user = await _database.QueryAsync<User>(id);
_cache[id] = user;
return user;
}
}
ValueTask 最佳实践
public class BufferedReader
{
private readonly byte[] _buffer = new byte[4096];
private int _position;
private int _length;
// ValueTask 用于热路径优化
public async ValueTask<byte> ReadByteAsync()
{
if (_position < _length)
{
// 同步路径 - 无分配
return _buffer[_position++];
}
// 异步路径 - 读取更多数据
await FillBufferAsync();
return _buffer[_position++];
}
private async Task FillBufferAsync()
{
_length = await _stream.ReadAsync(_buffer);
_position = 0;
}
}
// ⚠️ ValueTask 规则
public async Task ConsumeValueTaskAsync()
{
var reader = new BufferedReader();
// ✅ 正确 - 等待一次
byte b = await reader.ReadByteAsync();
// ❌ 错误 - 不要存储 ValueTask
var task = reader.ReadByteAsync();
await task; // 潜在问题
// ❌ 错误 - 不要多次等待
var vt = reader.ReadByteAsync();
await vt;
await vt; // 永远不要这样做
}
异步 Void 与异步 Task
理解何时使用异步 void(很少)与异步 Task(几乎总是)。
异步 Void 问题
// ❌ 不好 - 无法等待,异常未处理
public async void ProcessDataBadAsync()
{
await Task.Delay(1000);
throw new Exception("未处理!"); // 导致应用崩溃
}
// ✅ 好 - 可以等待,异常可处理
public async Task ProcessDataGoodAsync()
{
await Task.Delay(1000);
throw new Exception("已处理!"); // 可以捕获
}
// 使用
public async Task CallerAsync()
{
try
{
// 无法等待异步 void
ProcessDataBadAsync(); // 发射后不管 - 危险
// 可以等待异步 Task
await ProcessDataGoodAsync(); // 异常在此处捕获
}
catch (Exception ex)
{
Console.WriteLine($"捕获: {ex.Message}");
}
}
唯一有效的异步 Void 使用
// ✅ 事件处理程序 - 唯一有效的用例
public partial class MainWindow : Window
{
public async void SaveButton_Click(object sender, RoutedEventArgs e)
{
try
{
await SaveDataAsync();
MessageBox.Show("保存成功!");
}
catch (Exception ex)
{
MessageBox.Show($"错误: {ex.Message}");
}
}
private async Task SaveDataAsync()
{
await _repository.SaveAsync(_data);
}
}
ConfigureAwait(false)
控制同步上下文捕获,用于库代码中的性能。
理解 ConfigureAwait
// 库代码 - 使用 ConfigureAwait(false)
public class DataService
{
public async Task<Data> GetDataAsync(int id)
{
// ConfigureAwait(false) - 不捕获上下文
var json = await _httpClient.GetStringAsync($"/api/data/{id}")
.ConfigureAwait(false);
var data = await DeserializeAsync(json)
.ConfigureAwait(false);
return data;
}
}
// UI 代码 - 不要使用 ConfigureAwait(false)
public class ViewModel
{
public async Task LoadDataAsync()
{
var data = await _dataService.GetDataAsync(42);
// 需要 UI 上下文
this.DataProperty = data; // 更新 UI
}
}
ConfigureAwait 模式
public class AsyncLibrary
{
// ✅ 库方法使用 ConfigureAwait(false)
public async Task<Result> ProcessAsync(string input)
{
var step1 = await Step1Async(input).ConfigureAwait(false);
var step2 = await Step2Async(step1).ConfigureAwait(false);
var step3 = await Step3Async(step2).ConfigureAwait(false);
return step3;
}
// ✅ ASP.NET Core - ConfigureAwait(false) 到处安全
[HttpGet]
public async Task<IActionResult> GetData(int id)
{
// ASP.NET Core 没有同步上下文
var data = await _repository.GetAsync(id).ConfigureAwait(false);
return Ok(data);
}
}
CancellationToken 模式
为长时间运行的操作提供适当的取消支持。
基本取消
public async Task<List<Result>> ProcessItemsAsync(
IEnumerable<Item> items,
CancellationToken cancellationToken = default)
{
var results = new List<Result>();
foreach (var item in items)
{
// 检查取消
cancellationToken.ThrowIfCancellationRequested();
var result = await ProcessItemAsync(item, cancellationToken);
results.Add(result);
}
return results;
}
// 使用超时
public async Task<List<Result>> ProcessWithTimeoutAsync(IEnumerable<Item> items)
{
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
try
{
return await ProcessItemsAsync(items, cts.Token);
}
catch (OperationCanceledException)
{
Console.WriteLine("操作超时");
throw;
}
}
高级取消模式
public class BackgroundProcessor
{
private CancellationTokenSource? _cts;
public async Task StartAsync()
{
_cts = new CancellationTokenSource();
await ProcessLoopAsync(_cts.Token);
}
public void Stop()
{
_cts?.Cancel();
}
private async Task ProcessLoopAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
await ProcessBatchAsync(cancellationToken);
await Task.Delay(1000, cancellationToken);
}
catch (OperationCanceledException)
{
// 取消时预期
break;
}
}
}
// 链接取消令牌
public async Task ProcessWithMultipleTokensAsync(
CancellationToken userToken,
CancellationToken systemToken)
{
using var linkedCts = CancellationTokenSource
.CreateLinkedTokenSource(userToken, systemToken);
await DoWorkAsync(linkedCts.Token);
}
}
异步流 (IAsyncEnumerable)
使用 IAsyncEnumerable<T>(C# 8+)异步流式传输数据。
基本异步流
public async IAsyncEnumerable<LogEntry> ReadLogsAsync(
string filePath,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await using var stream = File.OpenRead(filePath);
using var reader = new StreamReader(stream);
string? line;
while ((line = await reader.ReadLineAsync(cancellationToken)) != null)
{
if (TryParseLog(line, out var entry))
{
yield return entry;
}
}
}
// 消费异步流
public async Task ProcessLogsAsync(string filePath)
{
await foreach (var log in ReadLogsAsync(filePath))
{
Console.WriteLine($"{log.Timestamp}: {log.Message}");
}
}
高级异步流模式
public class DataStreamProcessor
{
// 异步流带过滤
public async IAsyncEnumerable<Event> GetEventsAsync(
DateTime startDate,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
int page = 0;
while (true)
{
var events = await FetchPageAsync(page++, cancellationToken);
if (events.Count == 0)
yield break;
foreach (var evt in events.Where(e => e.Date >= startDate))
{
yield return evt;
}
}
}
// LINQ 风格操作异步流
public async IAsyncEnumerable<TResult> SelectAsync<TSource, TResult>(
IAsyncEnumerable<TSource> source,
Func<TSource, TResult> selector)
{
await foreach (var item in source)
{
yield return selector(item);
}
}
// 缓冲异步流
public async IAsyncEnumerable<List<T>> BufferAsync<T>(
IAsyncEnumerable<T> source,
int bufferSize)
{
var buffer = new List<T>(bufferSize);
await foreach (var item in source)
{
buffer.Add(item);
if (buffer.Count >= bufferSize)
{
yield return buffer;
buffer = new List<T>(bufferSize);
}
}
if (buffer.Count > 0)
{
yield return buffer;
}
}
}
并行异步操作
并发执行多个异步操作。
Task.WhenAll 和 Task.WhenAny
public async Task<Summary> GetDashboardDataAsync()
{
// 并发启动所有操作
var userTask = GetUserDataAsync();
var ordersTask = GetOrdersAsync();
var analyticsTask = GetAnalyticsAsync();
// 等待所有完成
await Task.WhenAll(userTask, ordersTask, analyticsTask);
return new Summary
{
User = await userTask,
Orders = await ordersTask,
Analytics = await analyticsTask
};
}
// 处理部分失败
public async Task<Results> ProcessWithPartialFailuresAsync()
{
var tasks = new[]
{
ProcessTask1Async(),
ProcessTask2Async(),
ProcessTask3Async()
};
await Task.WhenAll(tasks.Select(async t =>
{
try
{
await t;
}
catch (Exception ex)
{
// 记录但不抛出
Console.WriteLine($"任务失败: {ex.Message}");
}
}));
// 收集成功结果
var results = tasks
.Where(t => t.IsCompletedSuccessfully)
.Select(t => t.Result)
.ToList();
return new Results { Successful = results };
}
Task.WhenAny 用于超时和竞争
public async Task<T> WithTimeoutAsync<T>(Task<T> task, TimeSpan timeout)
{
var delayTask = Task.Delay(timeout);
var completedTask = await Task.WhenAny(task, delayTask);
if (completedTask == delayTask)
{
throw new TimeoutException("操作超时");
}
return await task;
}
// 竞争多个源
public async Task<Data> GetFastestDataAsync()
{
var primaryTask = GetFromPrimaryAsync();
var secondaryTask = GetFromSecondaryAsync();
var cacheTask = GetFromCacheAsync();
var completedTask = await Task.WhenAny(primaryTask, secondaryTask, cacheTask);
return await completedTask;
}
// 限流并行处理
public async Task<List<Result>> ProcessWithThrottlingAsync(
IEnumerable<Item> items,
int maxConcurrency)
{
var semaphore = new SemaphoreSlim(maxConcurrency);
var tasks = items.Select(async item =>
{
await semaphore.WaitAsync();
try
{
return await ProcessItemAsync(item);
}
finally
{
semaphore.Release();
}
});
return (await Task.WhenAll(tasks)).ToList();
}
异步代码中的异常处理
异步方法的正确异常处理模式。
基本异常处理
public async Task<Result> ProcessWithErrorHandlingAsync()
{
try
{
var data = await FetchDataAsync();
return await ProcessDataAsync(data);
}
catch (HttpRequestException ex)
{
_logger.LogError(ex, "发生网络错误");
throw;
}
catch (Exception ex)
{
_logger.LogError(ex, "发生意外错误");
return Result.Failed(ex.Message);
}
}
// 异常处理与 Task.WhenAll
public async Task ProcessMultipleAsync()
{
var tasks = new[] { Task1Async(), Task2Async(), Task3Async() };
try
{
await Task.WhenAll(tasks);
}
catch (Exception ex)
{
// 只抛出第一个异常
_logger.LogError(ex, "至少一个任务失败");
// 获取所有异常:
var exceptions = tasks
.Where(t => t.IsFaulted)
.Select(t => t.Exception)
.ToList();
foreach (var exception in exceptions)
{
_logger.LogError(exception, "任务失败");
}
}
}
AggregateException 处理
public async Task HandleAllExceptionsAsync()
{
var tasks = Enumerable.Range(1, 10)
.Select(i => ProcessItemAsync(i))
.ToArray();
try
{
await Task.WhenAll(tasks);
}
catch
{
// 检查所有异常
var aggregateException = new AggregateException(
tasks.Where(t => t.IsFaulted)
.SelectMany(t => t.Exception?.InnerExceptions ?? Array.Empty<Exception>())
);
aggregateException.Handle(ex =>
{
if (ex is HttpRequestException)
{
_logger.LogWarning(ex, "网络错误 - 重试");
return true; // 已处理
}
return false; // 重新抛出
});
}
}
死锁预防
避免异步代码中的常见死锁场景。
常见死锁模式
// ❌ 死锁 - 在异步代码上阻塞
public void DeadlockExample()
{
// 在 UI 或 ASP.NET 上下文中会死锁
var result = GetDataAsync().Result;
// 这也会死锁
GetDataAsync().Wait();
}
// ✅ 正确 - 全程异步
public async Task CorrectExample()
{
var result = await GetDataAsync();
}
// ✅ 正确 - 在库代码中使用 ConfigureAwait(false)
public async Task<Data> LibraryMethodAsync()
{
var data = await FetchAsync().ConfigureAwait(false);
return ProcessData(data);
}
避免死锁
public class DeadlockFreeService
{
// ✅ 全程异步
public async Task<Result> ProcessAsync()
{
var data = await GetDataAsync();
var processed = await ProcessDataAsync(data);
return processed;
}
// ✅ 如果必须阻塞,使用 Task.Run
public Result ProcessSync()
{
return Task.Run(async () => await ProcessAsync()).GetAwaiter().GetResult();
}
// ✅ 使用异步处置
public async Task UseResourceAsync()
{
await using var resource = new AsyncDisposableResource();
await resource.ProcessAsync();
}
}
ASP.NET Core 中的异步
ASP.NET Core 应用程序中异步代码的最佳实践。
控制器异步模式
[ApiController]
[Route("api/[controller]")]
public class ProductsController : ControllerBase
{
private readonly IProductRepository _repository;
// ✅ 异步操作方法
[HttpGet("{id}")]
public async Task<ActionResult<Product>> GetProduct(
int id,
CancellationToken cancellationToken)
{
var product = await _repository.GetByIdAsync(id, cancellationToken);
if (product == null)
return NotFound();
return Ok(product);
}
[HttpPost]
public async Task<ActionResult<Product>> CreateProduct(
[FromBody] CreateProductRequest request,
CancellationToken cancellationToken)
{
var product = await _repository.CreateAsync(request, cancellationToken);
return CreatedAtAction(nameof(GetProduct), new { id = product.Id }, product);
}
// ✅ 使用 IAsyncEnumerable 流式响应
[HttpGet("stream")]
public async IAsyncEnumerable<Product> StreamProducts(
[EnumeratorCancellation] CancellationToken cancellationToken)
{
await foreach (var product in _repository.GetAllStreamAsync(cancellationToken))
{
yield return product;
}
}
}
后台服务
public class DataProcessorService : BackgroundService
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<DataProcessorService> _logger;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("数据处理器服务启动");
while (!stoppingToken.IsCancellationRequested)
{
try
{
await ProcessDataBatchAsync(stoppingToken);
await Task.Delay(TimeSpan.FromMinutes(5), stoppingToken);
}
catch (OperationCanceledException)
{
// 停止时预期
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "处理数据批错误");
await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
}
}
_logger.LogInformation("数据处理器服务停止");
}
private async Task ProcessDataBatchAsync(CancellationToken cancellationToken)
{
using var scope = _serviceProvider.CreateScope();
var repository = scope.ServiceProvider.GetRequiredService<IDataRepository>();
await repository.ProcessBatchAsync(cancellationToken);
}
}
最佳实践
- 全程异步: 永远不要用 .Result 或 .Wait() 阻塞异步代码
- 使用 CancellationToken: 始终为长时间运行的操作接受 CancellationToken
- 库中使用 ConfigureAwait: 在库代码中使用 ConfigureAwait(false)
- 避免异步 Void: 只对事件处理程序使用异步 void
- 直接返回 Task: 可能时,直接返回 Task 而不使用 await
- 热路径使用 ValueTask: 为频繁调用、通常同步的方法考虑 ValueTask
- 处理所有异常: 始终在异步方法中处理异常
- 不要混合阻塞和异步: 每个调用链选择一个范式
- 处置异步资源: 使用 await using 处理 IAsyncDisposable
- 用取消测试: 测试取消是否正确工作
常见陷阱
- 阻塞异步代码: 使用 .Result 或 .Wait() 导致死锁
- 忘记 ConfigureAwait: 可能导致库中的性能问题
- 异步 Void 方法: 无法等待并吞没异常
- 不处理取消: 忽略 CancellationToken 参数
- 过度使用 Task.Run: 不要将已异步的代码包装在 Task.Run 中
- 不必要地捕获上下文: 不需要上下文时浪费资源
- 发射后不管: 启动异步操作而不等待
- 混合同步和异步: 造成混淆和潜在死锁
- 不正确使用 ValueTask: 多次等待 ValueTask
- 忽略 Task.WhenAll 中的异常: 只捕获第一个异常
何时使用
使用此技能当:
- 用 C# 编写异步代码
- 实现 I/O 绑定操作(数据库、网络、文件系统)
- 构建响应式 UI 应用程序
- 创建可扩展的 Web 服务
- 处理数据流
- 实现取消支持
- 用 ValueTask 优化异步性能
- 处理并行异步操作
- 预防异步代码中的死锁
- 使用 ASP.NET Core 异步模式
资源
- Async/Await 最佳实践 <!-- markdownlint-disable-line MD013 -->
- ConfigureAwait 常见问题
- 异步流教程
- ValueTask 概述
- 基于任务的异步模式