Skip to content
Go back

IAsyncEnumerable<T>:流式处理数据,不把所有东西塞进内存

IAsyncEnumerable 流式处理

原文:IAsyncEnumerable<T> in C#: Streaming Data Without Loading Everything into Memory 作者:Adrian Bailador · Apr 05, 2026


我的 API 在生产环境崩了。500,000 条数据库记录,一个 List<T>,加上 8GB 内存的服务器。算术根本行不通。

我盯着日志看了两个小时才弄明白发生了什么。修复只用了十分钟。这就是我学到的经验——希望能帮你省去同样的麻烦。

问题所在

你肯定写过这种代码:

// ❌ 问题:100 万条记录全部加载进内存
public async Task<List<LogEntry>> GetAllLogsAsync()
{
    return await _context.LogEntries.ToListAsync(); // 定时炸弹
}

数据量少时没问题。但当数据集超出 RAM 容量的那一天,这个无辜的 ToListAsync() 就变成了定时炸弹。

IAsyncEnumerable<T> 是什么?

C# 8.0(.NET Core 3.0)引入的 IAsyncEnumerable<T> 让你能异步地流式处理数据——逐条处理到达的数据,而不是在处理前把整个集合装进内存。就像逐页阅读一本书,而不是先把整本书复印一遍。

// ✅ 解决方案:数据到达时流式处理
public IAsyncEnumerable<LogEntry> StreamLogsAsync()
{
    return _context.LogEntries.AsAsyncEnumerable(); // 内存友好
}

四个生产级模式

以下是最常用的四种模式——都是真正上过生产的东西,不是理论。

模式一:Entity Framework Core 流式查询

这是大多数人的起点,也有充分的理由。如果你用 EF Core 且有持续增长的表,迟早需要这个模式。

public class LogRepository
{
    private readonly AppDbContext _context;

    public LogRepository(AppDbContext context)
    {
        _context = context;
    }

    /// <summary>
    /// 按条件流式读取日志,不把所有数据加载进内存。
    /// WARNING: 谨慎配合 OrderBy/Limit 使用。
    /// </summary>
    public async IAsyncEnumerable<LogEntry> StreamLogsAsync(
        DateTime from,
        DateTime to,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        await foreach (var log in _context.LogEntries
            .Where(l => l.Timestamp >= from && l.Timestamp <= to)
            .AsAsyncEnumerable()
            .WithCancellation(ct))
        {
            yield return log;
        }
    }

    /// <summary>
    /// 分批导出以防止连接超时。
    /// </summary>
    public async IAsyncEnumerable<IReadOnlyList<LogEntry>> StreamBatchedLogsAsync(
        int batchSize = 1000,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        var query = _context.LogEntries
            .OrderBy(l => l.Id)
            .AsAsyncEnumerable();

        var batch = new List<LogEntry>(batchSize);

        await foreach (var log in query.WithCancellation(ct))
        {
            batch.Add(log);

            if (batch.Count >= batchSize)
            {
                yield return batch.ToList();
                batch.Clear();
            }
        }

        // 不要漏掉最后一个不足 batchSize 的批次
        if (batch.Count > 0)
            yield return batch;
    }
}

模式二:HttpClient 流式 JSON 解析

返回数千条记录的第三方 API 出乎意料地常见。没有流式处理时,你要缓冲完整响应才能反序列化第一个对象。这里的关键是 HttpCompletionOption.ResponseHeadersRead——别忘了加。

public class StreamingApiClient
{
    private readonly HttpClient _httpClient;
    private readonly JsonSerializerOptions _jsonOptions;

    public StreamingApiClient(HttpClient httpClient)
    {
        _httpClient = httpClient;
        _jsonOptions = new JsonSerializerOptions
        {
            PropertyNameCaseInsensitive = true,
            DefaultBufferSize = 4096 // 根据你的 payload 大小调整
        };
    }

    /// <summary>
    /// 从 NDJSON 端点流式获取数据。
    /// 每行是一个独立的 JSON 对象。
    /// </summary>
    public async IAsyncEnumerable<T> StreamFromNdJsonAsync<T>(
        string url,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        using var response = await _httpClient.GetAsync(
            url,
            HttpCompletionOption.ResponseHeadersRead, // 关键:流式,不缓冲
            ct);

        response.EnsureSuccessStatusCode();

        await using var stream = await response.Content.ReadAsStreamAsync(ct);
        using var reader = new StreamReader(stream, Encoding.UTF8);

        string? line;
        while ((line = await reader.ReadLineAsync(ct)) != null)
        {
            if (string.IsNullOrWhiteSpace(line))
                continue;

            // 逐行独立反序列化
            var item = JsonSerializer.Deserialize<T>(line, _jsonOptions);

            if (item != null)
                yield return item;
        }
    }

    /// <summary>
    /// 使用 System.Text.Json 内置异步反序列化流式处理。
    /// 要求 JSON 数组格式:[{}, {}, {}]
    /// </summary>
    public async IAsyncEnumerable<T> StreamJsonArrayAsync<T>(
        string url,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        using var response = await _httpClient.GetAsync(
            url,
            HttpCompletionOption.ResponseHeadersRead,
            ct);

        response.EnsureSuccessStatusCode();

        await using var stream = await response.Content.ReadAsStreamAsync(ct);

        await foreach (var item in JsonSerializer.DeserializeAsyncEnumerable<T>(
            stream,
            _jsonOptions,
            ct))
        {
            if (item != null)
                yield return item;
        }
    }
}

模式三:文件处理流水线

CSV 导入、日志分析、数据迁移——任何逐行读取文件并处理每行的场景。关键洞察:你可以把多个 IAsyncEnumerable<T> 方法串成流水线,无论文件多大,内存占用都保持不变。

public class LogFileProcessor
{
    /// <summary>
    /// 在流水线中读取、转换、写入日志。
    /// 无论文件多大,内存使用量始终恒定。
    /// </summary>
    public async IAsyncEnumerable<ProcessedLog> ProcessLogFileAsync(
        string filePath,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        await foreach (var rawLog in ReadLinesAsync(filePath, ct))
        {
            // 实时转换——无内存堆积
            if (TryParseLog(rawLog, out var parsed))
            {
                var enriched = await EnrichLogAsync(parsed, ct);
                yield return enriched;
            }
        }
    }

    private static async IAsyncEnumerable<string> ReadLinesAsync(
        string path,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        using var reader = new StreamReader(path, Encoding.UTF8);

        string? line;
        while ((line = await reader.ReadLineAsync(ct)) != null)
        {
            yield return line;
        }
    }

    private static bool TryParseLog(string raw, out ParsedLog parsed)
    {
        // 你的解析逻辑
        parsed = default!;
        return true;
    }

    private static async Task<ProcessedLog> EnrichLogAsync(
        ParsedLog log,
        CancellationToken ct)
    {
        // 异步丰富(例如查询用户信息)
        await Task.Delay(1, ct); // 模拟异步操作
        return new ProcessedLog(log);
    }
}

public record ParsedLog(string Timestamp, string Level, string Message);
public record ProcessedLog(ParsedLog Original);

模式四:Controller Action 流式响应

ASP.NET Core 原生支持 IAsyncEnumerable<T> 返回类型——自动序列化为 NDJSON,并立即开始向客户端发送数据。客户端不需要等待完整数据集;第一条记录就绪后就立刻开始接收。

[ApiController]
[Route("api/[controller]")]
public class ReportsController : ControllerBase
{
    private readonly IReportService _reportService;

    public ReportsController(IReportService reportService)
    {
        _reportService = reportService;
    }

    /// <summary>
    /// 直接向客户端流式传输 CSV 数据。
    /// 客户端立即开始接收,服务器无缓冲。
    /// </summary>
    [HttpGet("export")]
    public IAsyncEnumerable<ReportRow> ExportData(
        [FromQuery] DateTime from,
        [FromQuery] DateTime to,
        CancellationToken ct)
    {
        // ASP.NET Core 自动将 IAsyncEnumerable 序列化为 NDJSON
        return _reportService.StreamReportAsync(from, to, ct);
    }

    /// <summary>
    /// 自定义 Server-Sent Events (SSE) 流式传输。
    /// </summary>
    [HttpGet("live-events")]
    public async IAsyncEnumerable<ServerSentEvent> GetLiveEvents(
        [EnumeratorCancellation] CancellationToken ct)
    {
        await foreach (var evt in _reportService.SubscribeToEventsAsync(ct))
        {
            yield return new ServerSentEvent
            {
                Id = evt.Id,
                Event = evt.Type,
                Data = JsonSerializer.Serialize(evt.Payload)
            };
        }
    }
}

public record ReportRow(string Id, string Name, decimal Value);
public record ServerSentEvent(string Id, string Event, string Data);

性能对比:数字说话

旧方式:List(Before)

// 内存:O(n) — 随数据集大小线性增长
// 首条输出时间:O(n) — 等待全部数据
public async Task<List<LargeRecord>> ProcessAllRecordsAsync()
{
    var allRecords = await _repository.GetAllAsync();

    var processed = new List<LargeRecord>();
    foreach (var record in allRecords)
    {
        processed.Add(await ProcessAsync(record));
    }

    return processed;
}

// 调用侧——内存在这里暴涨
var results = await ProcessAllRecordsAsync();
foreach (var item in results)
{
    await WriteToDiskAsync(item);
}

100 万条记录 × 100KB/条——实际发生的事:

新方式:IAsyncEnumerable(After)

// 内存:O(1) — 无论数据集多大,始终恒定
// 首条输出时间:O(1) — 立即流式传输
public async IAsyncEnumerable<LargeRecord> ProcessAllRecordsAsync(
    [EnumeratorCancellation] CancellationToken ct = default)
{
    await foreach (var record in _repository.StreamAllAsync(ct))
    {
        yield return await ProcessAsync(record);
    }
}

// 调用侧——内存占用恒定
await foreach (var item in ProcessAllRecordsAsync().WithCancellation(ct))
{
    await WriteToDiskAsync(item);
}

同样 100 万条记录——完全不同的结果:


常见错误及修复

每一条我都踩过,你不必重蹈覆辙。

错误一:忘记 CancellationToken

// ❌ 糟糕:流式中途无法取消
public async IAsyncEnumerable<Item> GetItemsAsync()
{
    await foreach (var item in _source.GetAsync())
    {
        yield return item; // 无法取消!
    }
}

// ✅ 正确:始终包含取消支持
public async IAsyncEnumerable<Item> GetItemsAsync(
    [EnumeratorCancellation] CancellationToken ct = default)
{
    await foreach (var item in _source.GetAsync().WithCancellation(ct))
    {
        yield return item;
    }
}

关键点[EnumeratorCancellation] 特性让调用方能通过 .WithCancellation(ct) 传入取消令牌。没有它,你的流式操作就是无法被客户端断开连接所打断的黑洞。

错误二:用 ToListAsync() 把流式缓冲掉了

// ❌ 糟糕:你刚把自己的努力全抵消掉了
public async Task<List<Item>> GetItemsAsync() // 返回 List<T>,不是 IAsyncEnumerable
{
    var items = new List<Item>();
    await foreach (var item in _source.GetAsync())
    {
        items.Add(item);
    }
    return items;
}

// ✅ 正确:直接返回 IAsyncEnumerable<T>
public IAsyncEnumerable<Item> GetItemsAsync(CancellationToken ct)
{
    return _source.GetAsync(); // 直接传递
}

错误三:资源过早释放

// ❌ 糟糕:reader 在第一次 yield 时就被释放
public async IAsyncEnumerable<string> ReadFileAsync(string path)
{
    using var reader = new StreamReader(path); // 第一次 yield 时即释放!

    while (await reader.ReadLineAsync() is { } line)
    {
        yield return line; // reader 已经释放了
    }
}

// ✅ 正确:保持作用域存活
public async IAsyncEnumerable<string> ReadFileAsync(
    string path,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    using var reader = new StreamReader(path);

    try
    {
        while (await reader.ReadLineAsync(ct) is { } line)
        {
            yield return line;
        }
    }
    finally
    {
        // 迭代完成或中断时才释放 reader
    }
}

错误四:未配置 DbContext 生命周期

// ❌ 糟糕:ASP.NET Core 中,Scoped DbContext 在流式中途被释放
[HttpGet("stream")]
public async IAsyncEnumerable<Order> GetOrders()
{
    var db = _serviceProvider.GetRequiredService<OrderDbContext>();
    // Action 方法返回后,db 就会被释放!

    await foreach (var order in db.Orders.AsAsyncEnumerable())
    {
        yield return order; // DbContext 在这里已被释放
    }
}

// ✅ 正确:使用工厂模式用于流式场景
public class StreamingOrderService
{
    private readonly IDbContextFactory<OrderDbContext> _contextFactory;

    public async IAsyncEnumerable<Order> StreamOrdersAsync(
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        await using var db = await _contextFactory.CreateDbContextAsync(ct);

        await foreach (var order in db.Orders.AsAsyncEnumerable().WithCancellation(ct))
        {
            yield return order;
        }
    }
}

何时用,何时不用

适合流式处理的场景:

不适合流式处理的场景:


小结

同样 100 万条记录,选择不同,结果天差地别:

List<T>IAsyncEnumerable<T>
内存复杂度O(n)O(1)
首条输出时间等待全部完成立即
100 万×100KB~95 GB,OOM~150 MB,稳定
可取消不便内置支持

IAsyncEnumerable<T> 不是用来替换所有 List<T> 的银弹——但当你处理大数据集、文件流、外部 API 响应或实时事件时,它就是那个让服务器不崩溃的关键。


参考链接


Tags


Previous

C# 代理模式实战:用组合叠加缓存、限流和日志

Next

在 ASP.NET Core 中获取客户端 IP 地址:直连与代理转发完整处理