JetStream高级特性
2026/1/15大约 4 分钟
JetStream高级特性
消息重放
JetStream 支持从任意位置重放消息,这是其核心优势之一。
重放策略
// 从头开始消费所有消息
js.Subscribe("orders.*", handler,
nats.DeliverAll())
// 只消费新消息
js.Subscribe("orders.*", handler,
nats.DeliverNew())
// 从最后一条消息开始
js.Subscribe("orders.*", handler,
nats.DeliverLast())
// 从指定序号开始
js.Subscribe("orders.*", handler,
nats.StartSequence(100))
// 从指定时间开始
js.Subscribe("orders.*", handler,
nats.StartTime(time.Now().Add(-1*time.Hour)))重放速度
// 即时重放(尽快投递)
nats.ReplayInstant()
// 原始速度重放(按原始时间间隔)
nats.ReplayOriginal()消息去重
JetStream 支持基于消息 ID 的去重,防止重复消息。
// 发布时指定消息 ID
js.Publish("orders.new", data, nats.MsgId("order-12345"))
// 重复发布相同 ID 的消息会被忽略
js.Publish("orders.new", data, nats.MsgId("order-12345")) // 被去重配置去重窗口:
js.AddStream(&nats.StreamConfig{
Name: "ORDERS",
Subjects: []string{"orders.*"},
Duplicates: 2 * time.Minute, // 2分钟内的重复消息会被去重
})恰好一次语义
通过消息 ID 和确认机制实现恰好一次(Exactly-Once)语义。
// 发布端:使用消息 ID
ack, err := js.Publish("orders.new", data,
nats.MsgId("unique-id-123"),
nats.ExpectLastSequence(99), // 期望的上一条消息序号
)
// 消费端:处理后确认
sub, _ := js.Subscribe("orders.*", func(m *nats.Msg) {
// 幂等处理
if !isProcessed(m.Header.Get("Nats-Msg-Id")) {
processOrder(m.Data)
markProcessed(m.Header.Get("Nats-Msg-Id"))
}
m.Ack()
}, nats.Durable("processor"))Key-Value 存储
JetStream 提供了内置的 Key-Value 存储功能。
创建 KV Bucket
js, _ := nc.JetStream()
// 创建 KV Bucket
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "CONFIG",
Description: "Application configuration",
MaxBytes: 1024 * 1024 * 100, // 100MB
History: 5, // 保留5个历史版本
TTL: 24 * time.Hour, // 24小时过期
})KV 操作
// 写入
revision, _ := kv.Put("app.name", []byte("MyApp"))
// 读取
entry, _ := kv.Get("app.name")
fmt.Printf("Key: %s, Value: %s, Revision: %d\n",
entry.Key(), string(entry.Value()), entry.Revision())
// 更新(带版本检查)
kv.Update("app.name", []byte("NewApp"), revision)
// 删除
kv.Delete("app.name")
// 清除(保留墓碑)
kv.Purge("app.name")
// 获取历史
history, _ := kv.History("app.name")
for _, entry := range history {
fmt.Printf("Revision %d: %s\n", entry.Revision(), string(entry.Value()))
}监听变更
// 监听所有 key 的变更
watcher, _ := kv.WatchAll()
for entry := range watcher.Updates() {
if entry != nil {
fmt.Printf("Changed: %s = %s\n", entry.Key(), string(entry.Value()))
}
}
// 监听特定 key
watcher, _ := kv.Watch("app.>")Object Store
JetStream 还提供了对象存储功能,适合存储大文件。
创建 Object Store
js, _ := nc.JetStream()
// 创建 Object Store
obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{
Bucket: "FILES",
Description: "File storage",
MaxBytes: 1024 * 1024 * 1024, // 1GB
})对象操作
// 上传文件
file, _ := os.Open("document.pdf")
info, _ := obs.Put(&nats.ObjectMeta{Name: "document.pdf"}, file)
fmt.Printf("Uploaded: %s, Size: %d\n", info.Name, info.Size)
// 下载文件
result, _ := obs.Get("document.pdf")
data, _ := io.ReadAll(result)
os.WriteFile("downloaded.pdf", data, 0644)
// 获取元信息
info, _ := obs.GetInfo("document.pdf")
fmt.Printf("Name: %s, Size: %d, Modified: %v\n",
info.Name, info.Size, info.ModTime)
// 删除对象
obs.Delete("document.pdf")
// 列出所有对象
objects, _ := obs.List()
for _, obj := range objects {
fmt.Printf("%s (%d bytes)\n", obj.Name, obj.Size)
}消息头
JetStream 支持消息头,用于传递元数据。
// 发布带头的消息
msg := nats.NewMsg("orders.new")
msg.Data = []byte(`{"id": 1}`)
msg.Header.Set("Content-Type", "application/json")
msg.Header.Set("X-Priority", "high")
msg.Header.Set("X-Trace-Id", "abc123")
js.PublishMsg(msg)
// 消费时读取头
sub, _ := js.Subscribe("orders.*", func(m *nats.Msg) {
contentType := m.Header.Get("Content-Type")
traceId := m.Header.Get("X-Trace-Id")
log.Printf("Content-Type: %s, Trace-Id: %s", contentType, traceId)
m.Ack()
})流镜像与源
镜像(Mirror)
创建一个 Stream 的只读副本。
// 创建镜像 Stream
js.AddStream(&nats.StreamConfig{
Name: "ORDERS_MIRROR",
Mirror: &nats.StreamSource{
Name: "ORDERS",
},
})源(Sources)
从多个 Stream 聚合消息。
// 创建聚合 Stream
js.AddStream(&nats.StreamConfig{
Name: "ALL_EVENTS",
Sources: []*nats.StreamSource{
{Name: "USER_EVENTS"},
{Name: "ORDER_EVENTS"},
{Name: "PAYMENT_EVENTS"},
},
})流量控制
消费者流量控制
// 限制未确认消息数
js.Subscribe("orders.*", handler,
nats.MaxAckPending(100), // 最多100条未确认消息
)
// 限制拉取数量
sub, _ := js.PullSubscribe("orders.*", "worker")
msgs, _ := sub.Fetch(10) // 每次最多拉取10条发布流量控制
// 异步发布时限制并发
js, _ := nc.JetStream(
nats.PublishAsyncMaxPending(256), // 最多256条待确认
)
// 等待所有异步发布完成
select {
case <-js.PublishAsyncComplete():
log.Println("All published")
}小结
JetStream 的高级特性包括消息重放、去重、恰好一次语义、Key-Value 存储、Object Store 等。这些功能使 NATS 能够满足更复杂的业务场景需求。
面试题预览
常见面试题
- JetStream 如何实现消息去重?
- Key-Value 存储和普通 Stream 有什么区别?
- 如何使用 JetStream 实现恰好一次语义?
