MQTT 5.0新特性
2026/1/15大约 4 分钟
MQTT 5.0新特性
MQTT 5.0 于 2019 年发布,是 MQTT 协议的重大升级,引入了许多新特性,提升了协议的灵活性和可扩展性。
版本对比
| 特性 | MQTT 3.1.1 | MQTT 5.0 |
|---|---|---|
| 原因码 | 有限 | 丰富 |
| 属性 | 不支持 | 支持 |
| 共享订阅 | 不支持 | 支持 |
| 消息过期 | 不支持 | 支持 |
| 主题别名 | 不支持 | 支持 |
| 用户属性 | 不支持 | 支持 |
| 请求/响应 | 不支持 | 支持 |
| 流控 | 不支持 | 支持 |
原因码(Reason Code)
MQTT 5.0 为所有响应报文添加了详细的原因码:
CONNACK 原因码
| 原因码 | 名称 | 说明 |
|---|---|---|
| 0x00 | Success | 连接成功 |
| 0x80 | Unspecified error | 未指定错误 |
| 0x81 | Malformed Packet | 报文格式错误 |
| 0x82 | Protocol Error | 协议错误 |
| 0x84 | Unsupported Protocol Version | 不支持的协议版本 |
| 0x85 | Client Identifier not valid | Client ID 无效 |
| 0x86 | Bad User Name or Password | 用户名密码错误 |
| 0x87 | Not authorized | 未授权 |
| 0x88 | Server unavailable | 服务不可用 |
| 0x89 | Server busy | 服务器繁忙 |
| 0x8A | Banned | 被禁止 |
PUBACK/SUBACK 原因码
# Python MQTT 5.0 处理原因码
def on_subscribe(client, userdata, mid, reason_codes, properties):
for i, rc in enumerate(reason_codes):
if rc.is_failure:
print(f"订阅失败: {rc}")
else:
print(f"订阅成功,QoS: {rc.value}")属性(Properties)
MQTT 5.0 引入属性机制,可以在报文中携带额外信息:
常用属性
| 属性 | 适用报文 | 说明 |
|---|---|---|
| Message Expiry Interval | PUBLISH | 消息过期时间 |
| Content Type | PUBLISH | 内容类型 |
| Response Topic | PUBLISH | 响应主题 |
| Correlation Data | PUBLISH | 关联数据 |
| User Property | 所有 | 用户自定义属性 |
| Topic Alias | PUBLISH | 主题别名 |
| Session Expiry Interval | CONNECT | 会话过期时间 |
使用示例
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
# 创建属性
props = Properties(PacketTypes.PUBLISH)
props.MessageExpiryInterval = 60 # 60秒后过期
props.ContentType = "application/json"
props.UserProperty = [("key1", "value1"), ("key2", "value2")]
# 发布带属性的消息
client.publish("topic", payload, qos=1, properties=props)消息过期(Message Expiry)
消息可以设置过期时间,过期后 Broker 不再投递:
# 设置消息60秒后过期
props = Properties(PacketTypes.PUBLISH)
props.MessageExpiryInterval = 60
client.publish("sensor/data", "temperature: 25", properties=props)应用场景:
- 实时数据:过期时间短(几秒)
- 通知消息:过期时间中等(几分钟)
- 配置下发:过期时间长或不过期共享订阅(Shared Subscription)
共享订阅允许多个客户端组成消费组,实现负载均衡:
格式: $share/{group}/{topic}
示例: $share/consumer-group/sensor/+/data工作原理
+-- Consumer 1 (收到消息1, 4, 7...)
|
Broker --消息--> $share/group/topic --+-- Consumer 2 (收到消息2, 5, 8...)
|
+-- Consumer 3 (收到消息3, 6, 9...)使用示例
# 多个消费者订阅同一共享组
# Consumer 1
client1.subscribe("$share/mygroup/sensor/#")
# Consumer 2
client2.subscribe("$share/mygroup/sensor/#")
# Consumer 3
client3.subscribe("$share/mygroup/sensor/#")
# 发布者发送消息,只有一个消费者收到
publisher.publish("sensor/temp", "25.5")负载均衡策略
EMQX 支持多种策略:
# emqx.conf
broker.shared_subscription_strategy = random
# 可选: random, round_robin, sticky, hash主题别名(Topic Alias)
主题别名用整数代替主题字符串,减少网络开销:
第一次发布:
Topic: "very/long/topic/name/sensor/temperature"
Topic Alias: 1
后续发布:
Topic: "" (空)
Topic Alias: 1 (使用别名)# 设置主题别名
props = Properties(PacketTypes.PUBLISH)
props.TopicAlias = 1
# 第一次发布(建立别名映射)
client.publish("sensor/temperature", "25", properties=props)
# 后续发布(使用别名)
props = Properties(PacketTypes.PUBLISH)
props.TopicAlias = 1
client.publish("", "26", properties=props) # 主题为空,使用别名请求/响应模式
MQTT 5.0 原生支持请求/响应模式:
Requester Broker Responder
| | |
|-- PUBLISH ---------->|-- PUBLISH --------->|
| topic: request | topic: request |
| response_topic: | response_topic: |
| response/123 | response/123 |
| correlation: abc | correlation: abc |
| | |
|<-- PUBLISH ----------|<-- PUBLISH ---------|
| topic: response/123| topic: response/123
| correlation: abc | correlation: abc |实现示例
import uuid
# 请求方
def send_request(client, request_topic, payload):
correlation_id = str(uuid.uuid4())
response_topic = f"response/{client._client_id}/{correlation_id}"
# 订阅响应主题
client.subscribe(response_topic)
# 发送请求
props = Properties(PacketTypes.PUBLISH)
props.ResponseTopic = response_topic
props.CorrelationData = correlation_id.encode()
client.publish(request_topic, payload, properties=props)
return correlation_id
# 响应方
def on_message(client, userdata, msg):
# 获取响应主题和关联数据
response_topic = msg.properties.ResponseTopic
correlation_data = msg.properties.CorrelationData
# 处理请求
result = process_request(msg.payload)
# 发送响应
props = Properties(PacketTypes.PUBLISH)
props.CorrelationData = correlation_data
client.publish(response_topic, result, properties=props)用户属性(User Properties)
用户属性允许在消息中携带自定义键值对:
props = Properties(PacketTypes.PUBLISH)
props.UserProperty = [
("trace-id", "abc123"),
("source", "sensor-001"),
("timestamp", "1699999999")
]
client.publish("data/sensor", payload, properties=props)# 接收方读取用户属性
def on_message(client, userdata, msg):
if hasattr(msg.properties, 'UserProperty'):
for key, value in msg.properties.UserProperty:
print(f"{key}: {value}")会话过期(Session Expiry)
MQTT 5.0 可以精确控制会话过期时间:
# 连接时设置会话过期时间
props = Properties(PacketTypes.CONNECT)
props.SessionExpiryInterval = 3600 # 1小时
client.connect("localhost", 1883, properties=props)SessionExpiryInterval:
0: 连接断开时立即删除会话
0xFFFFFFFF: 会话永不过期
其他值: 指定秒数后过期流控(Flow Control)
MQTT 5.0 引入接收最大值(Receive Maximum)控制未确认消息数量:
# 限制同时处理的 QoS 1/2 消息数量
props = Properties(PacketTypes.CONNECT)
props.ReceiveMaximum = 10 # 最多10条未确认消息
client.connect("localhost", 1883, properties=props)遗嘱延迟(Will Delay)
遗嘱消息可以延迟发布,给客户端重连的机会:
# 遗嘱延迟30秒发布
props = Properties(PacketTypes.CONNECT)
props.WillDelayInterval = 30
client.will_set("device/status", "offline", properties=props)服务器断开(Server Disconnect)
MQTT 5.0 允许服务器主动断开连接并说明原因:
Broker --> Client: DISCONNECT
Reason Code: 0x98 (Administrative action)
Reason String: "Server maintenance"面试题预览
常见面试题
- MQTT 5.0 相比 3.1.1 有哪些主要改进?
- 共享订阅有什么作用?如何实现?
- 消息过期机制是如何工作的?
- 如何使用 MQTT 5.0 实现请求/响应模式?
小结
MQTT 5.0 带来了许多重要改进:原因码提供详细错误信息,属性机制增强扩展性,共享订阅实现负载均衡,消息过期控制消息生命周期。升级到 MQTT 5.0 可以构建更强大、更灵活的物联网应用。
