NATS客户端使用
2026/1/15大约 3 分钟
NATS客户端使用
Go 客户端
安装
go get github.com/nats-io/nats.go连接服务器
package main
import (
"log"
"github.com/nats-io/nats.go"
)
func main() {
// 简单连接
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// 带选项连接
nc, err = nats.Connect("nats://localhost:4222",
nats.Name("my-client"),
nats.UserInfo("user", "password"),
nats.ReconnectWait(time.Second),
nats.MaxReconnects(10),
)
}发布订阅
// 发布消息
err := nc.Publish("subject", []byte("Hello NATS"))
// 订阅消息
sub, err := nc.Subscribe("subject", func(m *nats.Msg) {
log.Printf("Received: %s", string(m.Data))
})
// 同步订阅
sub, err := nc.SubscribeSync("subject")
msg, err := sub.NextMsg(time.Second)
// 取消订阅
sub.Unsubscribe()请求响应
// 发送请求(等待响应)
msg, err := nc.Request("service", []byte("request"), time.Second)
if err != nil {
log.Fatal(err)
}
log.Printf("Response: %s", string(msg.Data))
// 响应请求
nc.Subscribe("service", func(m *nats.Msg) {
nc.Publish(m.Reply, []byte("response"))
})队列订阅
// 多个消费者加入同一队列组
nc.QueueSubscribe("tasks", "worker-group", func(m *nats.Msg) {
log.Printf("Worker received: %s", string(m.Data))
})完整示例
package main
import (
"log"
"time"
"github.com/nats-io/nats.go"
)
func main() {
// 连接 NATS
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// 订阅消息
nc.Subscribe("updates", func(m *nats.Msg) {
log.Printf("Received on [%s]: %s", m.Subject, string(m.Data))
})
// 发布消息
for i := 0; i < 5; i++ {
nc.Publish("updates", []byte("Hello NATS"))
time.Sleep(time.Second)
}
// 等待消息处理完成
nc.Flush()
}Java 客户端
Maven 依赖
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>2.17.2</version>
</dependency>连接服务器
import io.nats.client.*;
// 简单连接
Connection nc = Nats.connect("nats://localhost:4222");
// 带选项连接
Options options = new Options.Builder()
.server("nats://localhost:4222")
.userInfo("user", "password")
.connectionName("my-java-client")
.reconnectWait(Duration.ofSeconds(1))
.maxReconnects(10)
.build();
Connection nc = Nats.connect(options);发布订阅
// 发布消息
nc.publish("subject", "Hello NATS".getBytes());
// 订阅消息
Dispatcher dispatcher = nc.createDispatcher((msg) -> {
System.out.println("Received: " + new String(msg.getData()));
});
dispatcher.subscribe("subject");
// 同步订阅
Subscription sub = nc.subscribe("subject");
Message msg = sub.nextMessage(Duration.ofSeconds(1));请求响应
// 发送请求
Message response = nc.request("service", "request".getBytes(), Duration.ofSeconds(1));
System.out.println("Response: " + new String(response.getData()));
// 响应请求
Dispatcher dispatcher = nc.createDispatcher((msg) -> {
nc.publish(msg.getReplyTo(), "response".getBytes());
});
dispatcher.subscribe("service");完整示例
package com.example.nats;
import io.nats.client.*;
import java.time.Duration;
public class NatsExample {
public static void main(String[] args) throws Exception {
// 连接 NATS
Connection nc = Nats.connect("nats://localhost:4222");
// 创建订阅
Dispatcher dispatcher = nc.createDispatcher((msg) -> {
System.out.printf("Received on [%s]: %s%n",
msg.getSubject(), new String(msg.getData()));
});
dispatcher.subscribe("updates");
// 发布消息
for (int i = 0; i < 5; i++) {
nc.publish("updates", ("Message " + i).getBytes());
Thread.sleep(1000);
}
// 等待消息处理
nc.flush(Duration.ofSeconds(1));
nc.close();
}
}Python 客户端
安装
pip install nats-py异步客户端
import asyncio
import nats
async def main():
# 连接 NATS
nc = await nats.connect("nats://localhost:4222")
# 订阅消息
async def message_handler(msg):
subject = msg.subject
data = msg.data.decode()
print(f"Received on [{subject}]: {data}")
await nc.subscribe("updates", cb=message_handler)
# 发布消息
for i in range(5):
await nc.publish("updates", f"Message {i}".encode())
await asyncio.sleep(1)
# 请求响应
response = await nc.request("service", b"request", timeout=1)
print(f"Response: {response.data.decode()}")
await nc.close()
if __name__ == "__main__":
asyncio.run(main())队列订阅
import asyncio
import nats
async def worker(nc, worker_id):
async def handler(msg):
print(f"Worker {worker_id} received: {msg.data.decode()}")
await nc.subscribe("tasks", queue="workers", cb=handler)
async def main():
nc = await nats.connect("nats://localhost:4222")
# 启动多个 worker
for i in range(3):
await worker(nc, i)
# 发布任务
for i in range(10):
await nc.publish("tasks", f"Task {i}".encode())
await asyncio.sleep(2)
await nc.close()
asyncio.run(main())JavaScript/Node.js 客户端
安装
npm install nats使用示例
const { connect, StringCodec } = require('nats');
async function main() {
// 连接 NATS
const nc = await connect({ servers: 'nats://localhost:4222' });
const sc = StringCodec();
// 订阅消息
const sub = nc.subscribe('updates');
(async () => {
for await (const msg of sub) {
console.log(`Received on [${msg.subject}]: ${sc.decode(msg.data)}`);
}
})();
// 发布消息
for (let i = 0; i < 5; i++) {
nc.publish('updates', sc.encode(`Message ${i}`));
await new Promise(r => setTimeout(r, 1000));
}
// 请求响应
const response = await nc.request('service', sc.encode('request'), { timeout: 1000 });
console.log(`Response: ${sc.decode(response.data)}`);
await nc.drain();
}
main();连接选项
各语言客户端都支持丰富的连接选项:
| 选项 | 说明 | 默认值 |
|---|---|---|
| servers | 服务器地址列表 | localhost:4222 |
| user/password | 用户名密码 | - |
| token | Token 认证 | - |
| reconnect | 自动重连 | true |
| maxReconnects | 最大重连次数 | 60 |
| reconnectWait | 重连等待时间 | 2s |
| timeout | 连接超时 | 2s |
| pingInterval | 心跳间隔 | 2m |
| maxPingsOut | 最大未响应心跳数 | 2 |
小结
本章介绍了 NATS 在 Go、Java、Python、JavaScript 等主流语言中的客户端使用方法,包括连接、发布订阅、请求响应和队列订阅等核心操作。
面试题预览
常见面试题
- NATS 客户端如何实现自动重连?
- 同步订阅和异步订阅有什么区别?
- 如何在 NATS 中实现负载均衡?
