MQTT客户端使用
2026/1/15 · 大约 5 分钟
MQTT客户端使用
本章介绍各种编程语言的 MQTT 客户端库使用方法。
Python 客户端
安装 paho-mqtt
pip install paho-mqtt
基础使用
import paho.mqtt.client as mqtt
import time
# 连接成功回调
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("连接成功")
# 订阅主题
client.subscribe("sensor/temperature")
else:
print(f"连接失败,错误码: {rc}")
# 断开连接回调
def on_disconnect(client, userdata, rc):
print(f"断开连接,错误码: {rc}")
# 消息接收回调
def on_message(client, userdata, msg):
print(f"收到消息 [{msg.topic}]: {msg.payload.decode()}")
# 发布成功回调
def on_publish(client, userdata, mid):
print(f"消息发布成功,mid: {mid}")
# 创建客户端
client = mqtt.Client(client_id="python-client-001")
# 设置回调函数
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_message = on_message
client.on_publish = on_publish
# 设置用户名密码(如果需要)
# client.username_pw_set("username", "password")
# 连接 Broker
client.connect("localhost", 1883, keepalive=60)
# 启动网络循环(非阻塞)
client.loop_start()
# 发布消息
for i in range(5):
client.publish("sensor/temperature", f"温度: {20 + i}°C", qos=1)
time.sleep(1)
# 保持运行
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
client.loop_stop()
client.disconnect()使用 TLS/SSL
import ssl
# paho-mqtt 2.x expects the callback API version as the first argument.
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)

# Configure TLS: CA certificate plus a client certificate/key pair
client.tls_set(
    ca_certs="/path/to/ca.crt",
    certfile="/path/to/client.crt",
    keyfile="/path/to/client.key",
    # PROTOCOL_TLS_CLIENT negotiates the highest TLS version both peers
    # support; ssl.PROTOCOL_TLSv1_2 is deprecated since Python 3.10 and
    # would pin the connection to TLS 1.2.
    tls_version=ssl.PROTOCOL_TLS_CLIENT,
)

# Connect to the TLS port
client.connect("localhost", 8883, keepalive=60)
遗嘱消息
client = mqtt.Client()
# 设置遗嘱消息
client.will_set(
topic="device/status",
payload="offline",
qos=1,
retain=True
)
client.connect("localhost", 1883)Java 客户端
Maven 依赖
<!-- Eclipse Paho MQTT v3 Java client -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>
基础使用
import java.nio.charset.StandardCharsets;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * Minimal blocking Paho client: connects to the local broker, subscribes to
 * {@code sensor/#}, publishes five temperature readings and disconnects.
 */
public class MqttClientDemo {
    private static final String BROKER = "tcp://localhost:1883";
    private static final String CLIENT_ID = "java-client-001";

    public static void main(String[] args) {
        MqttClient client = null;
        try {
            // Create the client
            client = new MqttClient(BROKER, CLIENT_ID, new MemoryPersistence());

            // Connection options
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setConnectionTimeout(10);
            options.setKeepAliveInterval(60);
            // options.setUserName("username");
            // options.setPassword("password".toCharArray());

            // Last-will message: retained "offline" status, QoS 1
            options.setWill("device/status", "offline".getBytes(StandardCharsets.UTF_8), 1, true);

            // Callbacks
            client.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    System.out.println("连接断开: " + cause.getMessage());
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) {
                    // Decode explicitly as UTF-8 instead of the platform default charset
                    String text = new String(message.getPayload(), StandardCharsets.UTF_8);
                    System.out.println("收到消息 [" + topic + "]: " + text);
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    System.out.println("消息发送完成");
                }
            });

            // Connect
            System.out.println("正在连接...");
            client.connect(options);
            System.out.println("连接成功");

            // Subscribe
            client.subscribe("sensor/#", 1);

            // Publish five readings, one per second
            for (int i = 0; i < 5; i++) {
                String content = "温度: " + (20 + i) + "°C";
                // The payload contains non-ASCII text, so fix the encoding to UTF-8
                MqttMessage message = new MqttMessage(content.getBytes(StandardCharsets.UTF_8));
                message.setQos(1);
                client.publish("sensor/temperature", message);
                Thread.sleep(1000);
            }

            // Stay connected for a while
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever called us
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            shutdown(client);
        }
    }

    /** Disconnects the client if it is connected and then closes it; tolerates {@code null}. */
    private static void shutdown(MqttClient client) {
        if (client == null) {
            return;
        }
        try {
            if (client.isConnected()) {
                client.disconnect();
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
        try {
            client.close();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
异步客户端
import org.eclipse.paho.client.mqttv3.*;
/**
 * Non-blocking Paho client: connects asynchronously and subscribes to
 * {@code test/#} from the connect-success callback.
 */
public class AsyncMqttClientDemo {
    public static void main(String[] args) throws Exception {
        MqttAsyncClient client = new MqttAsyncClient(
            "tcp://localhost:1883",
            "async-client-001"
        );
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);

        try {
            // Asynchronous connect: the listener is invoked with the outcome
            client.connect(options, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    System.out.println("连接成功");
                    try {
                        // Subscribe
                        client.subscribe("test/#", 1);
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    System.out.println("连接失败: " + exception.getMessage());
                }
            });

            Thread.sleep(30000);
        } finally {
            // Only disconnect when a connection exists (disconnect() throws
            // otherwise), and wait for the asynchronous disconnect to finish
            // before closing the client.
            if (client.isConnected()) {
                client.disconnect().waitForCompletion();
            }
            client.close();
        }
    }
}
Go 客户端
安装
go get github.com/eclipse/paho.mqtt.golang
基础使用
package main
import (
"fmt"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// messagePubHandler prints the topic and payload of an incoming message.
var messagePubHandler mqtt.MessageHandler = func(_ mqtt.Client, m mqtt.Message) {
	topic, payload := m.Topic(), m.Payload()
	fmt.Printf("收到消息 [%s]: %s\n", topic, payload)
}
// connectHandler logs a successful connection.
var connectHandler mqtt.OnConnectHandler = func(_ mqtt.Client) {
	fmt.Println("连接成功")
}
// connectLostHandler logs the error that caused the connection to drop.
var connectLostHandler mqtt.ConnectionLostHandler = func(_ mqtt.Client, cause error) {
	fmt.Printf("连接断开: %v\n", cause)
}
func main() {
// 配置选项
opts := mqtt.NewClientOptions()
opts.AddBroker("tcp://localhost:1883")
opts.SetClientID("go-client-001")
opts.SetUsername("username")
opts.SetPassword("password")
opts.SetDefaultPublishHandler(messagePubHandler)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
// 设置遗嘱消息
opts.SetWill("device/status", "offline", 1, true)
// 创建客户端
client := mqtt.NewClient(opts)
// 连接
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
// 订阅
topic := "sensor/#"
if token := client.Subscribe(topic, 1, nil); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
return
}
fmt.Printf("订阅主题: %s\n", topic)
// 发布消息
for i := 0; i < 5; i++ {
text := fmt.Sprintf("温度: %d°C", 20+i)
token := client.Publish("sensor/temperature", 1, false, text)
token.Wait()
time.Sleep(time.Second)
}
// 保持运行
time.Sleep(10 * time.Second)
// 断开连接
client.Disconnect(250)
}JavaScript 客户端
Node.js 环境
npm install mqtt
const mqtt = require('mqtt')
// Connection options
const options = {
  clientId: 'nodejs-client-001',
  clean: true,
  connectTimeout: 4000,
  username: 'username',
  password: 'password',
  reconnectPeriod: 1000,
  // Last-will message
  will: {
    topic: 'device/status',
    payload: 'offline',
    qos: 1,
    retain: true
  }
}

// Connect
const client = mqtt.connect('mqtt://localhost:1883', options)

// Connected: subscribe from here so the subscription is issued on every connect
client.on('connect', () => {
  console.log('连接成功')
  client.subscribe('sensor/#', { qos: 1 }, (err) => {
    if (err) {
      // Surface the failure instead of silently ignoring it
      console.error('订阅失败:', err)
      return
    }
    console.log('订阅成功')
  })
})

// Message received
client.on('message', (topic, message) => {
  console.log(`收到消息 [${topic}]: ${message.toString()}`)
})

// Publish five readings, one per second
let count = 0
const timer = setInterval(() => {
  const temp = 20 + count
  client.publish('sensor/temperature', `温度: ${temp}°C`, { qos: 1 }, (err) => {
    if (err) {
      console.error('发布失败:', err)
    }
  })
  count++
  if (count >= 5) {
    clearInterval(timer)
  }
}, 1000)

// Error handling
client.on('error', (err) => {
  console.error('连接错误:', err)
})

// Connection closed
client.on('close', () => {
  console.log('连接关闭')
})
浏览器环境(WebSocket)
<!DOCTYPE html>
<html>
<head>
  <meta charset="utf-8">
  <title>MQTT WebSocket Demo</title>
  <script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
</head>
<body>
  <div id="messages"></div>
  <script>
    // Connect over WebSocket, using a client id with a random 8-character suffix
    const client = mqtt.connect('ws://localhost:8083/mqtt', {
      clientId: 'browser-client-' + Math.random().toString(16).slice(2, 10)
    })

    client.on('connect', () => {
      console.log('连接成功')
      client.subscribe('sensor/#')
    })

    client.on('message', (topic, message) => {
      // Topic and payload come from the network and are untrusted: insert
      // them as text, never as HTML, so a payload cannot inject markup.
      const line = document.createElement('p')
      line.textContent = `[${topic}]: ${message.toString()}`
      document.getElementById('messages').appendChild(line)
    })

    // Publish a message (called by the button below)
    function publish() {
      client.publish('sensor/test', 'Hello from browser')
    }
  </script>
  <button onclick="publish()">发送消息</button>
</body>
</html>
C/C++ 客户端
使用 Paho C 库
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"
#define ADDRESS "tcp://localhost:1883"
#define CLIENTID "c-client-001"
#define TOPIC "sensor/temperature"
#define QOS 1
#define TIMEOUT 10000L
/*
 * Connects to the broker at ADDRESS, publishes one message to TOPIC with
 * QoS level QOS, waits up to TIMEOUT ms for delivery, then disconnects.
 * Returns 0 on success and -1 if any step fails.
 */
int main(int argc, char* argv[]) {
    MQTTClient client;
    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    MQTTClient_message pubmsg = MQTTClient_message_initializer;
    MQTTClient_deliveryToken token;
    int rc;
    int status = 0;

    (void)argc;
    (void)argv;

    /* Create the client (no persistence) */
    rc = MQTTClient_create(&client, ADDRESS, CLIENTID,
                           MQTTCLIENT_PERSISTENCE_NONE, NULL);
    if (rc != MQTTCLIENT_SUCCESS) {
        printf("创建客户端失败,错误码: %d\n", rc);
        return -1;
    }

    /* Connection options */
    conn_opts.keepAliveInterval = 60;
    conn_opts.cleansession = 1;

    /* Connect */
    if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {
        printf("连接失败,错误码: %d\n", rc);
        /* Release the handle created above before bailing out */
        MQTTClient_destroy(&client);
        return -1;
    }

    /* Publish one message */
    char payload[] = "温度: 25°C";
    pubmsg.payload = payload;
    pubmsg.payloadlen = (int)strlen(payload);
    pubmsg.qos = QOS;
    pubmsg.retained = 0;

    rc = MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
    if (rc != MQTTCLIENT_SUCCESS) {
        printf("消息发布失败,错误码: %d\n", rc);
        status = -1;
    } else {
        printf("等待消息发送完成...\n");
        rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
        if (rc == MQTTCLIENT_SUCCESS) {
            printf("消息发送完成\n");
        } else {
            /* Do not report success when the wait timed out or failed */
            printf("消息发送未完成,错误码: %d\n", rc);
            status = -1;
        }
    }

    /* Disconnect and free the client */
    MQTTClient_disconnect(client, 10000);
    MQTTClient_destroy(&client);
    return status;
}
客户端最佳实践
1. 连接管理
# 自动重连
client = mqtt.Client()
client.reconnect_delay_set(min_delay=1, max_delay=120)
# 连接超时处理
def on_connect(client, userdata, flags, rc):
if rc == 0:
# 重新订阅(CleanSession=True 时需要)
client.subscribe("sensor/#")2. 消息确认
# QoS 1/2 消息需要等待确认
result = client.publish("topic", "message", qos=1)
result.wait_for_publish()3. 异常处理
import socket

try:
    client.connect("localhost", 1883)
except ConnectionRefusedError:
    print("连接被拒绝")
except (TimeoutError, socket.timeout):
    # socket.timeout only became an alias of TimeoutError in Python 3.10;
    # naming both keeps the timeout branch reachable on older interpreters.
    print("连接超时")
except Exception as e:
    print(f"连接错误: {e}")
小结
本章介绍了 Python、Java、Go、JavaScript、C/C++ 等语言的 MQTT 客户端使用方法。各语言的客户端库 API 设计相似,都支持连接、订阅、发布、回调等核心功能。选择合适的客户端库,结合业务需求实现可靠的消息通信。
