NATS实战案例
2026/1/15大约 5 分钟
NATS实战案例
案例一:微服务事件总线
使用 NATS 构建微服务间的事件驱动通信。
架构设计
事件定义
// events/events.go
package events
import "time"
type UserCreatedEvent struct {
UserID string `json:"user_id"`
Email string `json:"email"`
CreatedAt time.Time `json:"created_at"`
}
type OrderCreatedEvent struct {
OrderID string `json:"order_id"`
UserID string `json:"user_id"`
Items []Item `json:"items"`
Total float64 `json:"total"`
CreatedAt time.Time `json:"created_at"`
}
type PaymentCompletedEvent struct {
PaymentID string `json:"payment_id"`
OrderID string `json:"order_id"`
Amount float64 `json:"amount"`
PaidAt time.Time `json:"paid_at"`
}事件发布者
// publisher/publisher.go
package publisher
import (
"encoding/json"
"github.com/nats-io/nats.go"
)
type EventPublisher struct {
js nats.JetStreamContext
}
func NewEventPublisher(nc *nats.Conn) (*EventPublisher, error) {
js, err := nc.JetStream()
if err != nil {
return nil, err
}
// 创建事件 Stream
js.AddStream(&nats.StreamConfig{
Name: "EVENTS",
Subjects: []string{"events.>"},
Storage: nats.FileStorage,
Replicas: 3,
})
return &EventPublisher{js: js}, nil
}
func (p *EventPublisher) Publish(subject string, event interface{}) error {
data, err := json.Marshal(event)
if err != nil {
return err
}
_, err = p.js.Publish(subject, data)
return err
}事件消费者
// consumer/consumer.go
package consumer
import (
"encoding/json"
"log"
"github.com/nats-io/nats.go"
)
type EventConsumer struct {
js nats.JetStreamContext
}
func NewEventConsumer(nc *nats.Conn) (*EventConsumer, error) {
js, err := nc.JetStream()
if err != nil {
return nil, err
}
return &EventConsumer{js: js}, nil
}
func (c *EventConsumer) Subscribe(subject, durable string, handler func([]byte)) error {
_, err := c.js.Subscribe(subject, func(m *nats.Msg) {
handler(m.Data)
m.Ack()
}, nats.Durable(durable))
return err
}用户服务
// services/user/main.go
package main
import (
"log"
"time"
"github.com/nats-io/nats.go"
)
func main() {
nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()
publisher, _ := NewEventPublisher(nc)
// 创建用户后发布事件
event := UserCreatedEvent{
UserID: "user-123",
Email: "user@example.com",
CreatedAt: time.Now(),
}
publisher.Publish("events.user.created", event)
log.Println("User created event published")
}通知服务
// services/notification/main.go
package main
import (
"encoding/json"
"log"
"github.com/nats-io/nats.go"
)
func main() {
nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()
consumer, _ := NewEventConsumer(nc)
// 订阅用户创建事件
consumer.Subscribe("events.user.created", "notification-service", func(data []byte) {
var event UserCreatedEvent
json.Unmarshal(data, &event)
// 发送欢迎邮件
sendWelcomeEmail(event.Email)
log.Printf("Welcome email sent to %s", event.Email)
})
select {} // 保持运行
}案例二:IoT 数据采集
使用 NATS 收集和处理 IoT 设备数据。
架构设计
设备数据模型
type SensorData struct {
DeviceID string `json:"device_id"`
SensorType string `json:"sensor_type"`
Value float64 `json:"value"`
Unit string `json:"unit"`
Timestamp time.Time `json:"timestamp"`
Location Location `json:"location"`
}
type Location struct {
Latitude float64 `json:"latitude"`
Longitude float64 `json:"longitude"`
}设备端(模拟)
// device/main.go
package main
import (
"encoding/json"
"log"
"math/rand"
"time"
"github.com/nats-io/nats.go"
)
func main() {
nc, _ := nats.Connect("nats://leaf-node:4222")
defer nc.Close()
deviceID := "sensor-001"
for {
data := SensorData{
DeviceID: deviceID,
SensorType: "temperature",
Value: 20 + rand.Float64()*10,
Unit: "celsius",
Timestamp: time.Now(),
Location: Location{
Latitude: 39.9042,
Longitude: 116.4074,
},
}
payload, _ := json.Marshal(data)
nc.Publish("iot.sensor.temperature."+deviceID, payload)
log.Printf("Published: %.2f %s", data.Value, data.Unit)
time.Sleep(5 * time.Second)
}
}数据处理服务
// processor/main.go
package main
import (
"encoding/json"
"log"
"github.com/nats-io/nats.go"
)
func main() {
nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()
js, _ := nc.JetStream()
// 创建 IoT 数据 Stream
js.AddStream(&nats.StreamConfig{
Name: "IOT_DATA",
Subjects: []string{"iot.>"},
Storage: nats.FileStorage,
MaxAge: 7 * 24 * time.Hour, // 保留7天
})
// 处理温度数据
js.Subscribe("iot.sensor.temperature.>", func(m *nats.Msg) {
var data SensorData
json.Unmarshal(m.Data, &data)
// 异常检测
if data.Value > 30 {
log.Printf("ALERT: High temperature %.2f from %s", data.Value, data.DeviceID)
// 发送告警
nc.Publish("alerts.temperature", m.Data)
}
// 存储数据
storeData(data)
m.Ack()
}, nats.Durable("iot-processor"))
select {}
}案例三:分布式任务队列
使用 NATS 实现分布式任务处理。
架构设计
任务定义
type Task struct {
ID string `json:"id"`
Type string `json:"type"`
Payload map[string]interface{} `json:"payload"`
Priority int `json:"priority"`
CreatedAt time.Time `json:"created_at"`
Retries int `json:"retries"`
}
type TaskResult struct {
TaskID string `json:"task_id"`
Status string `json:"status"`
Result interface{} `json:"result"`
Error string `json:"error,omitempty"`
Duration int64 `json:"duration_ms"`
}任务队列
// queue/queue.go
package queue
import (
"encoding/json"
"time"
"github.com/nats-io/nats.go"
)
type TaskQueue struct {
js nats.JetStreamContext
}
func NewTaskQueue(nc *nats.Conn) (*TaskQueue, error) {
js, _ := nc.JetStream()
// 创建任务 Stream(工作队列模式)
js.AddStream(&nats.StreamConfig{
Name: "TASKS",
Subjects: []string{"tasks.*"},
Storage: nats.FileStorage,
Retention: nats.WorkQueuePolicy, // 消费后删除
Replicas: 3,
})
return &TaskQueue{js: js}, nil
}
func (q *TaskQueue) Enqueue(taskType string, task Task) error {
data, _ := json.Marshal(task)
_, err := q.js.Publish("tasks."+taskType, data)
return err
}
func (q *TaskQueue) Subscribe(taskType string, handler func(Task) error) error {
_, err := q.js.QueueSubscribe("tasks."+taskType, "workers", func(m *nats.Msg) {
var task Task
json.Unmarshal(m.Data, &task)
if err := handler(task); err != nil {
// 重试逻辑
if task.Retries < 3 {
task.Retries++
q.Enqueue(taskType, task)
}
m.Nak()
} else {
m.Ack()
}
}, nats.Durable("task-workers"), nats.AckWait(30*time.Second))
return err
}任务生产者
// producer/main.go
package main
import (
"log"
"time"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
)
func main() {
nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()
queue, _ := NewTaskQueue(nc)
// 提交任务
for i := 0; i < 100; i++ {
task := Task{
ID: uuid.New().String(),
Type: "email",
Payload: map[string]interface{}{"to": "user@example.com", "subject": "Hello"},
Priority: 1,
CreatedAt: time.Now(),
}
queue.Enqueue("email", task)
log.Printf("Task %s enqueued", task.ID)
}
}任务消费者
// worker/main.go
package main
import (
"log"
"time"
"github.com/nats-io/nats.go"
)
func main() {
nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()
queue, _ := NewTaskQueue(nc)
// 处理邮件任务
queue.Subscribe("email", func(task Task) error {
log.Printf("Processing task %s", task.ID)
// 模拟处理
time.Sleep(time.Second)
// 发送邮件
to := task.Payload["to"].(string)
subject := task.Payload["subject"].(string)
sendEmail(to, subject)
log.Printf("Task %s completed", task.ID)
return nil
})
select {}
}案例四:实时聊天系统
使用 NATS 构建实时聊天功能。
消息模型
type ChatMessage struct {
ID string `json:"id"`
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
Username string `json:"username"`
Content string `json:"content"`
Type string `json:"type"` // text, image, file
Timestamp time.Time `json:"timestamp"`
}聊天服务
// chat/service.go
package chat
import (
"encoding/json"
"github.com/nats-io/nats.go"
)
type ChatService struct {
nc *nats.Conn
js nats.JetStreamContext
}
func NewChatService(nc *nats.Conn) (*ChatService, error) {
js, _ := nc.JetStream()
// 创建聊天消息 Stream
js.AddStream(&nats.StreamConfig{
Name: "CHAT",
Subjects: []string{"chat.>"},
Storage: nats.FileStorage,
MaxAge: 30 * 24 * time.Hour, // 保留30天
})
return &ChatService{nc: nc, js: js}, nil
}
// 发送消息
func (s *ChatService) SendMessage(msg ChatMessage) error {
data, _ := json.Marshal(msg)
_, err := s.js.Publish("chat.room."+msg.RoomID, data)
return err
}
// 加入房间
func (s *ChatService) JoinRoom(roomID, userID string, handler func(ChatMessage)) error {
_, err := s.js.Subscribe("chat.room."+roomID, func(m *nats.Msg) {
var msg ChatMessage
json.Unmarshal(m.Data, &msg)
handler(msg)
m.Ack()
}, nats.Durable(userID+"-"+roomID), nats.DeliverNew())
return err
}
// 获取历史消息
func (s *ChatService) GetHistory(roomID string, limit int) ([]ChatMessage, error) {
sub, _ := s.js.PullSubscribe("chat.room."+roomID, "history-reader",
nats.DeliverAll())
msgs, _ := sub.Fetch(limit)
var messages []ChatMessage
for _, m := range msgs {
var msg ChatMessage
json.Unmarshal(m.Data, &msg)
messages = append(messages, msg)
}
return messages, nil
}小结
本章通过四个实战案例展示了 NATS 在不同场景下的应用:微服务事件总线、IoT 数据采集、分布式任务队列和实时聊天系统。这些案例涵盖了 NATS 的核心功能和最佳实践。
面试题预览
常见面试题
- 如何使用 NATS 实现微服务间的事件驱动通信?
- NATS 在 IoT 场景下有哪些优势?
- 如何使用 JetStream 实现可靠的任务队列?
