JetStream基础
2026/1/15大约 4 分钟
JetStream基础
什么是 JetStream
JetStream 是 NATS 的持久化层,提供了消息持久化、流处理、消息重放等高级功能。它解决了 NATS Core "最多一次"传递的局限性,支持"至少一次"和"恰好一次"的消息传递保证。
核心概念
Stream(流)
Stream 是消息的存储容器,定义了消息的存储策略和保留规则。
Consumer(消费者)
Consumer 定义了如何从 Stream 中消费消息,包括消费位置、确认策略等。
Subject(主题)
Stream 可以捕获一个或多个主题的消息。
启用 JetStream
服务器配置
# nats-server.conf
jetstream {
store_dir: "/data/jetstream"
max_mem: 1G
max_file: 10G
}Docker 启动
docker run -d --name nats-js -p 4222:4222 -p 8222:8222 nats -js验证启用
# 使用 CLI 检查
nats account info
# 输出示例
Connection Information:
Client ID: 6
Client IP: 127.0.0.1
...
JetStream Account Information:
Memory: 0 B of 1.0 GB
Storage: 0 B of 10 GB
Streams: 0 of Unlimited
Consumers: 0 of Unlimited创建 Stream
使用 CLI 创建
# 交互式创建
nats stream add
# 命令行创建
nats stream add ORDERS \
--subjects "orders.*" \
--storage file \
--replicas 1 \
--retention limits \
--max-msgs 1000000 \
--max-bytes 1GB \
--max-age 24h \
--max-msg-size 1MB \
--discard old使用 Go 创建
package main
import (
"log"
"github.com/nats-io/nats.go"
)
func main() {
nc, _ := nats.Connect(nats.DefaultURL)
js, _ := nc.JetStream()
// 创建 Stream
_, err := js.AddStream(&nats.StreamConfig{
Name: "ORDERS",
Subjects: []string{"orders.*"},
Storage: nats.FileStorage,
Replicas: 1,
Retention: nats.LimitsPolicy,
MaxMsgs: 1000000,
MaxBytes: 1024 * 1024 * 1024, // 1GB
MaxAge: 24 * time.Hour,
MaxMsgSize: 1024 * 1024, // 1MB
Discard: nats.DiscardOld,
})
if err != nil {
log.Fatal(err)
}
log.Println("Stream created")
}Stream 配置选项
| 选项 | 说明 | 可选值 |
|---|---|---|
| Name | Stream 名称 | 字符串 |
| Subjects | 捕获的主题 | 主题列表 |
| Storage | 存储类型 | File / Memory |
| Replicas | 副本数 | 1-5 |
| Retention | 保留策略 | Limits / Interest / WorkQueue |
| MaxMsgs | 最大消息数 | 数字 |
| MaxBytes | 最大存储大小 | 字节数 |
| MaxAge | 消息最大保留时间 | 时间 |
| Discard | 丢弃策略 | Old / New |
发布消息到 Stream
同步发布
// 发布消息并等待确认
ack, err := js.Publish("orders.new", []byte(`{"id": 1, "item": "book"}`))
if err != nil {
log.Fatal(err)
}
log.Printf("Published to stream: %s, seq: %d", ack.Stream, ack.Sequence)异步发布
// 异步发布
future, err := js.PublishAsync("orders.new", []byte(`{"id": 2, "item": "phone"}`))
if err != nil {
log.Fatal(err)
}
// 等待确认
select {
case ack := <-future.Ok():
log.Printf("Ack: stream=%s seq=%d", ack.Stream, ack.Sequence)
case err := <-future.Err():
log.Printf("Error: %v", err)
}带消息 ID 发布(去重)
// 使用消息 ID 实现幂等发布
ack, err := js.Publish("orders.new", data,
nats.MsgId("order-123"),
)创建 Consumer
Push Consumer
消息主动推送给消费者。
// 创建 Push Consumer
_, err := js.AddConsumer("ORDERS", &nats.ConsumerConfig{
Durable: "order-processor",
DeliverSubject: "order.deliver",
DeliverPolicy: nats.DeliverAllPolicy,
AckPolicy: nats.AckExplicitPolicy,
AckWait: 30 * time.Second,
})
// 订阅接收消息
sub, _ := nc.Subscribe("order.deliver", func(m *nats.Msg) {
log.Printf("Received: %s", string(m.Data))
m.Ack() // 确认消息
})Pull Consumer
消费者主动拉取消息。
// 创建 Pull Consumer
_, err := js.AddConsumer("ORDERS", &nats.ConsumerConfig{
Durable: "order-worker",
AckPolicy: nats.AckExplicitPolicy,
AckWait: 30 * time.Second,
})
// 拉取消息
sub, _ := js.PullSubscribe("orders.*", "order-worker")
msgs, _ := sub.Fetch(10, nats.MaxWait(5*time.Second))
for _, msg := range msgs {
log.Printf("Received: %s", string(msg.Data))
msg.Ack()
}Consumer 配置选项
| 选项 | 说明 | 可选值 |
|---|---|---|
| Durable | 持久化名称 | 字符串 |
| DeliverPolicy | 投递策略 | All / Last / New / ByStartSeq / ByStartTime |
| AckPolicy | 确认策略 | None / All / Explicit |
| AckWait | 确认等待时间 | 时间 |
| MaxDeliver | 最大重试次数 | 数字 |
| FilterSubject | 过滤主题 | 主题 |
| ReplayPolicy | 重放策略 | Instant / Original |
消息确认
JetStream 支持多种确认方式:
// 确认消息
msg.Ack()
// 否定确认(触发重新投递)
msg.Nak()
// 延迟否定确认
msg.NakWithDelay(5 * time.Second)
// 标记处理中(延长 AckWait)
msg.InProgress()
// 终止消息(不再重试)
msg.Term()完整示例
package main
import (
"log"
"time"
"github.com/nats-io/nats.go"
)
func main() {
// 连接 NATS
nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()
// 获取 JetStream 上下文
js, _ := nc.JetStream()
// 创建 Stream
js.AddStream(&nats.StreamConfig{
Name: "EVENTS",
Subjects: []string{"events.*"},
Storage: nats.FileStorage,
})
// 创建 Consumer 并订阅
sub, _ := js.Subscribe("events.*", func(m *nats.Msg) {
log.Printf("Received [%s]: %s", m.Subject, string(m.Data))
m.Ack()
}, nats.Durable("event-processor"))
// 发布消息
for i := 0; i < 5; i++ {
js.Publish("events.user", []byte(`{"action": "login"}`))
time.Sleep(time.Second)
}
time.Sleep(2 * time.Second)
sub.Unsubscribe()
}小结
JetStream 为 NATS 提供了持久化能力,通过 Stream 存储消息,通过 Consumer 消费消息。它支持多种投递策略和确认机制,满足不同场景的需求。
面试题预览
常见面试题
- JetStream 和 NATS Core 有什么区别?
- Push Consumer 和 Pull Consumer 的区别是什么?
- JetStream 如何保证消息不丢失?
