Skip to content
Go back

用 NATS JetStream 在 .NET 中快速搭建持久化消息队列

.NET 开发者需要消息队列时,通常直奔 RabbitMQ、Azure Service Bus,或者干脆拿一张 Postgres 表顶上。NATS 几乎从来不在讨论范围内。这挺可惜的——它悄悄成了我在这类场景里最喜欢的工具。

NATS 是一个 Go 写的消息系统,单二进制运行,无外部依赖。JetStream 是它上面的持久化层,把它变成了一个真正支持至少一次投递(at-least-once delivery)的队列。而它的 .NET 客户端 用起来很舒服。

Core NATS vs JetStream

NATS 有两层,区别很重要。

Core NATS 是即发即忘的 pub/sub——你发布到某个 subject,那一刻订阅了的人才能收到。如果没人监听,消息就丢了。适合实时通知,但不适合工作队列。

JetStream 是加在上面的持久化层。它把发布到 subject 的消息捕获到磁盘上的 stream 里,消费者可以之后来读,甚至重启之后也可以。正是这个持久化把一个 subject 变成了持久队列。

简单说:Core NATS 在没人订阅时丢消息,JetStream 把它持久化到文件存储的 stream 里,稍后再投递。

为什么值得一试

从我常用那些 broker 转过来,几个东西让我印象深刻:

搭起来

需要服务器和两个 NuGet 包。

用 JetStream 模式启动服务器,-js 开启 JetStream,-sd 指定数据目录让 stream 在重启后存活:

# docker-compose.yml
nats:
  image: nats:2.14-alpine
  command: ['-js', '-sd', '/data']
  ports: ['4222:4222']
  volumes:
    - nats-data:/data
  restart: unless-stopped

添加客户端和 DI 集成包:

dotnet add package NATS.Net
dotnet add package NATS.Extensions.Microsoft.DependencyInjection

然后在 Program.cs 中接入。AddNatsClient 注册一个多路复用、自动重连的连接,下一行暴露一个可以在任何地方注入的 JetStream 上下文:

// Program.cs
builder.Services.AddNatsClient(nats =>
    nats.ConfigureOptions(opts => opts with { Url = "nats://localhost:4222" }));

builder.Services.AddSingleton(sp =>
    sp.GetRequiredService<INatsConnection>().CreateJetStreamContext());

发布任务

JetStream 上下文注入之后,一个 Minimal API 端点一行调用就能发布。Job 是个普通 record,NATS.Net 自动帮你序列化成 JSON,所以你可以直接用类型化消息,不需要额外设置。EnsureSuccess 在 stream 没存储消息时抛异常:

app.MapPost("/jobs", async (CreateJob request, INatsJSContext js, CancellationToken ct) =>
{
    var job = new Job(Guid.NewGuid(), request.Payload);

    PubAckResponse ack = await js.PublishAsync("jobs.work", job, cancellationToken: ct);
    ack.EnsureSuccess();

    return Results.Accepted($"/jobs/{job.Id}");
});

在 Worker 中处理任务

BackgroundService 是消费者的天然归属。它在启动时创建 stream 和持久消费者,然后在循环中拉取消息。每个运行中的实例共享 workers 消费者,所以它们竞争任务,每个任务只跑一次:

public class JobWorker(INatsJSContext js) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        // 创建 stream —— 只在不存在时创建
        await js.CreateStreamAsync(new StreamConfig("JOBS", ["jobs.work"])
        {
            Retention = StreamConfigRetention.Workqueue, // ack 后移除消息
            Storage   = StreamConfigStorage.File          // 持久化到磁盘
        }, ct);

        // 创建消费者 —— 多实例共享、竞争式消费
        var consumer = await js.CreateOrUpdateConsumerAsync("JOBS", new ConsumerConfig("workers")
        {
            AckPolicy  = ConsumerConfigAckPolicy.Explicit,
            AckWait    = TimeSpan.FromSeconds(30), // 必须超过最坏情况的处理时间
            MaxDeliver = 5                          // 5 次尝试后丢弃毒消息
        }, ct);

        await foreach (var msg in consumer.ConsumeAsync<Job>(cancellationToken: ct))
        {
            await ProcessAsync(msg.Data, ct);  // 副作用
            await msg.AckAsync(cancellationToken: ct); // 再 ack
        }
    }
}

builder.Services.AddHostedService<JobWorker>() 注册。worker 是单例,如果需要 DbContext 等 scoped 依赖,通过 IServiceScopeFactory 解析。

两个 stream 设置决定了队列行为:

工作队列用 Workqueue + File

先处理,再 Ack

仔细看 worker 循环:先处理,再 ack。这个顺序是让 JetStream 可靠的规则,大多数快速入门都略过了。

副作用完成之后再 ack,绝不反过来。

JetStream 给你至少一次投递。如果 worker 跑了任务但在 ack 之前崩溃,JetStream 会重新投递它。但如果在工作完成之前就 ack,崩溃后任务被标记为已完成却什么都没产出。

反过来说,一个任务可能被执行不止一次,所以你的处理器必须是幂等的。通常的做法是跟踪已经处理过的消息并跳过重复,在和副作用同一个事务里做。我在 The Idempotent Consumer Pattern in .NET 里覆盖了完整模式。至少一次投递只有在读 stream 的处理器是幂等的前提下才成立。

总结

NATS JetStream 用单个 18 MB 的二进制给你一个持久化、至少一次投递的工作队列,而且可以干净地嵌入 ASP.NET Core 应用:从 endpoint 发布,在 BackgroundService 里处理,工作完成后再 ack。

我一开始是带着怀疑去试的,半心半意地预期会想念 RabbitMQ。结果它说服了我:易于运维、没有意外、负载更大时可以用基于 Raft 的复制来集群。现在它是我想加个队列又不想在 broker 上费脑筋时的第一选择。

如果你还没试过,把容器跑起来,发布一条消息。上手就这么简单。

参考


Tags


Previous

HttpClient DNS 问题:PooledConnectionLifetime 与 SocketsHttpHandler

Next

多智能体 A2A 系统中的上下文传递:三种方案与选择标准