NATS性能优化
2026/1/15大约 4 分钟
NATS性能优化
性能基准
NATS 以其卓越的性能著称,单服务器可达到:
| 指标 | 数值 |
|---|---|
| 消息吞吐量 | 1000万+ 消息/秒 |
| 延迟 | 微秒级 |
| 连接数 | 10万+ |
| 内存占用 | 极低 |
服务器优化
1. 系统参数调优
# /etc/sysctl.conf
# 增加文件描述符限制
fs.file-max = 2097152
# TCP 优化
net.core.somaxconn = 65535
net.core.netdev_max_backlog = 65535
net.ipv4.tcp_max_syn_backlog = 65535
# TCP 缓冲区
net.core.rmem_max = 16777216
net.core.wmem_max = 16777216
net.ipv4.tcp_rmem = 4096 87380 16777216
net.ipv4.tcp_wmem = 4096 65536 16777216
# 快速回收 TIME_WAIT
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_fin_timeout = 15# /etc/security/limits.conf
* soft nofile 1048576
* hard nofile 1048576
* soft nproc 65535
* hard nproc 655352. NATS 服务器配置
# 高性能配置
port: 4222
http_port: 8222
# 最大连接数
max_connections: 100000
# 最大消息大小
max_payload: 1048576
# 写入超时
write_deadline: "2s"
# 禁用详细日志
debug: false
trace: false
# 内存优化
max_control_line: 4096
# JetStream 优化
jetstream {
store_dir: "/data/jetstream"
max_mem: 4G
max_file: 100G
# 使用 SSD
# store_dir 应指向 SSD 存储
}3. JetStream 存储优化
jetstream {
store_dir: "/ssd/jetstream" # 使用 SSD
max_mem: 8G # 增加内存缓存
max_file: 500G
# 同步策略
sync_interval: "1m" # 批量同步
}客户端优化
1. 连接池
// 使用连接池
type NATSPool struct {
conns []*nats.Conn
idx int
mu sync.Mutex
}
func NewNATSPool(url string, size int) (*NATSPool, error) {
pool := &NATSPool{
conns: make([]*nats.Conn, size),
}
for i := 0; i < size; i++ {
nc, err := nats.Connect(url,
nats.MaxReconnects(-1),
nats.ReconnectWait(time.Second),
)
if err != nil {
return nil, err
}
pool.conns[i] = nc
}
return pool, nil
}
func (p *NATSPool) Get() *nats.Conn {
p.mu.Lock()
defer p.mu.Unlock()
conn := p.conns[p.idx]
p.idx = (p.idx + 1) % len(p.conns)
return conn
}2. 批量发布
// 批量发布消息
func batchPublish(js nats.JetStreamContext, subject string, messages [][]byte) error {
// 使用异步发布
var futures []nats.PubAckFuture
for _, msg := range messages {
future, err := js.PublishAsync(subject, msg)
if err != nil {
return err
}
futures = append(futures, future)
}
// 等待所有确认
for _, future := range futures {
select {
case <-future.Ok():
// 成功
case err := <-future.Err():
return err
}
}
return nil
}3. 消息压缩
import (
"bytes"
"compress/gzip"
)
// 压缩消息
func compress(data []byte) ([]byte, error) {
var buf bytes.Buffer
w := gzip.NewWriter(&buf)
_, err := w.Write(data)
if err != nil {
return nil, err
}
w.Close()
return buf.Bytes(), nil
}
// 解压消息
func decompress(data []byte) ([]byte, error) {
r, err := gzip.NewReader(bytes.NewReader(data))
if err != nil {
return nil, err
}
defer r.Close()
return io.ReadAll(r)
}
// 发布压缩消息
func publishCompressed(nc *nats.Conn, subject string, data []byte) error {
compressed, err := compress(data)
if err != nil {
return err
}
msg := nats.NewMsg(subject)
msg.Data = compressed
msg.Header.Set("Content-Encoding", "gzip")
return nc.PublishMsg(msg)
}4. 消费者优化
// 批量拉取
sub, _ := js.PullSubscribe("orders.*", "processor")
for {
// 批量拉取消息
msgs, err := sub.Fetch(100, nats.MaxWait(5*time.Second))
if err != nil {
continue
}
// 并行处理
var wg sync.WaitGroup
for _, msg := range msgs {
wg.Add(1)
go func(m *nats.Msg) {
defer wg.Done()
processMessage(m)
m.Ack()
}(msg)
}
wg.Wait()
}Stream 优化
1. 合理设置保留策略
// 根据场景选择保留策略
js.AddStream(&nats.StreamConfig{
Name: "EVENTS",
Subjects: []string{"events.>"},
// 限制策略 - 适合需要保留历史的场景
Retention: nats.LimitsPolicy,
MaxMsgs: 10000000,
MaxBytes: 100 * 1024 * 1024 * 1024, // 100GB
MaxAge: 7 * 24 * time.Hour,
// 或者工作队列策略 - 适合任务队列
// Retention: nats.WorkQueuePolicy,
// 丢弃旧消息
Discard: nats.DiscardOld,
})2. 分区策略
// 按时间分区
func getPartitionedSubject(baseSubject string) string {
now := time.Now()
return fmt.Sprintf("%s.%d.%02d.%02d",
baseSubject, now.Year(), now.Month(), now.Day())
}
// 创建分区 Stream
for i := 0; i < 10; i++ {
js.AddStream(&nats.StreamConfig{
Name: fmt.Sprintf("ORDERS_%d", i),
Subjects: []string{fmt.Sprintf("orders.%d.*", i)},
})
}
// 发布时选择分区
func publish(js nats.JetStreamContext, orderID string, data []byte) {
partition := hash(orderID) % 10
subject := fmt.Sprintf("orders.%d.new", partition)
js.Publish(subject, data)
}3. 副本数优化
// 根据重要性设置副本数
// 关键数据 - 3副本
js.AddStream(&nats.StreamConfig{
Name: "CRITICAL_EVENTS",
Replicas: 3,
})
// 普通数据 - 1副本
js.AddStream(&nats.StreamConfig{
Name: "LOGS",
Replicas: 1,
})监控与调优
1. 性能监控
# 使用 nats-top 实时监控
nats-top
# 输出示例
NATS server version 2.10.7 (uptime: 24h3m2s)
Server:
Load: CPU: 2.3% Memory: 256.3M Slow Consumers: 0
In: Msgs: 1.2M Bytes: 128.5M Msgs/Sec: 50000 Bytes/Sec: 5.3M
Out: Msgs: 2.4M Bytes: 257.0M Msgs/Sec: 100000 Bytes/Sec: 10.6M
Connections: 1000
HOST CID NAME SUBS PENDING MSGS_TO MSGS_FROM BYTES_TO BYTES_FROM
192.168.1.10:52341 1 producer-1 0 0 500000 0 53.5M 0
192.168.1.11:52342 2 consumer-1 10 0 0 250000 0 26.7M2. 性能测试
# 使用 nats-bench 进行基准测试
nats bench test --pub 10 --sub 10 --msgs 1000000 --size 128
# 输出示例
Pub stats: 9,523,809 msgs/sec ~ 1.14 GB/sec
Sub stats: 9,523,809 msgs/sec ~ 1.14 GB/sec3. 关键指标
| 指标 | 说明 | 告警阈值 |
|---|---|---|
| CPU 使用率 | 服务器 CPU | > 80% |
| 内存使用 | 服务器内存 | > 80% |
| 慢消费者 | Slow Consumers | > 0 |
| 消息积压 | Pending Messages | 持续增长 |
| 连接数 | Connections | 接近上限 |
常见性能问题
1. 慢消费者
// 检测慢消费者
nc, _ := nats.Connect(nats.DefaultURL,
nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
if err == nats.ErrSlowConsumer {
log.Printf("Slow consumer detected on %s", sub.Subject)
}
}),
)
// 解决方案:增加消费者或使用 Pull 模式
sub, _ := js.PullSubscribe("orders.*", "processor")2. 消息积压
// 监控消息积压
info, _ := js.StreamInfo("ORDERS")
log.Printf("Messages: %d, Bytes: %d", info.State.Msgs, info.State.Bytes)
// 解决方案:增加消费者并行度
for i := 0; i < 10; i++ {
go func() {
sub, _ := js.PullSubscribe("orders.*", "processor")
for {
msgs, _ := sub.Fetch(100)
for _, msg := range msgs {
processMessage(msg)
msg.Ack()
}
}
}()
}小结
NATS 性能优化涉及服务器配置、客户端优化、Stream 设计等多个方面。关键是根据实际场景选择合适的配置,并通过监控持续调优。
面试题预览
常见面试题
- 如何优化 NATS 的消息吞吐量?
- 什么是慢消费者?如何解决?
- JetStream 的存储优化有哪些策略?
