MQTT协议原理
2026/1/15大约 4 分钟
MQTT协议原理
深入理解 MQTT 协议的底层原理,包括报文结构、消息流程、状态机等核心内容。
报文结构
MQTT 报文由三部分组成:
+------------------+------------------+------------------+
| Fixed Header | Variable Header | Payload |
| (必须) | (部分报文) | (部分报文) |
+------------------+------------------+------------------+固定头(Fixed Header)
固定头是所有报文都有的部分,最少 2 字节:
Byte 1:
+-------+-------+-------+-------+-------+-------+-------+-------+
| Bit 7 | Bit 6 | Bit 5 | Bit 4 | Bit 3 | Bit 2 | Bit 1 | Bit 0 |
+-------+-------+-------+-------+-------+-------+-------+-------+
| 报文类型 (4 bits) | 标志位 (4 bits) |
+-------+-------+-------+-------+-------+-------+-------+-------+
Byte 2+:
+-------+-------+-------+-------+-------+-------+-------+-------+
| 剩余长度 (1-4 字节) |
+-------+-------+-------+-------+-------+-------+-------+-------+报文类型
| 类型值 | 报文名称 | 方向 | 说明 |
|---|---|---|---|
| 1 | CONNECT | C→S | 客户端连接请求 |
| 2 | CONNACK | S→C | 连接确认 |
| 3 | PUBLISH | 双向 | 发布消息 |
| 4 | PUBACK | 双向 | QoS 1 确认 |
| 5 | PUBREC | 双向 | QoS 2 第一步确认 |
| 6 | PUBREL | 双向 | QoS 2 第二步确认 |
| 7 | PUBCOMP | 双向 | QoS 2 完成确认 |
| 8 | SUBSCRIBE | C→S | 订阅请求 |
| 9 | SUBACK | S→C | 订阅确认 |
| 10 | UNSUBSCRIBE | C→S | 取消订阅 |
| 11 | UNSUBACK | S→C | 取消订阅确认 |
| 12 | PINGREQ | C→S | 心跳请求 |
| 13 | PINGRESP | S→C | 心跳响应 |
| 14 | DISCONNECT | 双向 | 断开连接 |
剩余长度编码
剩余长度使用变长编码,每字节低 7 位表示数值,最高位表示是否有后续字节:
字节数 最大值
1 127
2 16,383
3 2,097,151
4 268,435,455 (约 256MB)# 剩余长度编码
def encode_remaining_length(length):
result = []
while True:
byte = length % 128
length = length // 128
if length > 0:
byte |= 0x80 # 设置继续位
result.append(byte)
if length == 0:
break
return bytes(result)
# 剩余长度解码
def decode_remaining_length(data):
multiplier = 1
value = 0
for byte in data:
value += (byte & 0x7F) * multiplier
if (byte & 0x80) == 0:
break
multiplier *= 128
return valueCONNECT 报文详解
Fixed Header:
+--------+--------+
| 0x10 | Length |
+--------+--------+
Variable Header:
+--------+--------+--------+--------+--------+--------+--------+--------+
| Protocol Name Length (2 bytes) | 'M' | 'Q' | 'T' | 'T' |
+--------+--------+--------+--------+--------+--------+--------+--------+
| Protocol Level | Connect Flags | Keep Alive (2 bytes) |
+--------+--------+--------+--------+--------+--------+--------+--------+
Connect Flags:
+-------+-------+-------+-------+-------+-------+-------+-------+
| User | Pass | Will | Will | Will | Clean | Rsrvd | |
| Name | word | Retain| QoS | Flag |Session| | |
+-------+-------+-------+-------+-------+-------+-------+-------+Connect Flags 详解
| 位 | 名称 | 说明 |
|---|---|---|
| 7 | User Name Flag | 是否包含用户名 |
| 6 | Password Flag | 是否包含密码 |
| 5 | Will Retain | 遗嘱消息是否保留 |
| 4-3 | Will QoS | 遗嘱消息 QoS |
| 2 | Will Flag | 是否有遗嘱消息 |
| 1 | Clean Session | 是否清除会话 |
| 0 | Reserved | 保留位,必须为 0 |
PUBLISH 报文详解
Fixed Header:
+-------+-------+-------+-------+-------+-------+-------+-------+
| 0x3 | DUP | QoS | QoS |RETAIN |
+-------+-------+-------+-------+-------+-------+-------+-------+
| Remaining Length |
+-------+-------+-------+-------+-------+-------+-------+-------+
Variable Header:
+--------+--------+--------+--------+--------+--------+
| Topic Length | Topic Name ... |
+--------+--------+--------+--------+--------+--------+
| Packet ID (QoS > 0 时存在) |
+--------+--------+--------+--------+--------+--------+
Payload:
+--------+--------+--------+--------+--------+--------+
| Message Content ... |
+--------+--------+--------+--------+--------+--------+PUBLISH 标志位
| 位 | 名称 | 说明 |
|---|---|---|
| 3 | DUP | 重复标志(重传时为 1) |
| 2-1 | QoS | 服务质量等级 |
| 0 | RETAIN | 保留消息标志 |
QoS 消息流程
QoS 0 流程
QoS 1 流程
QoS 2 流程
状态机
客户端状态机
QoS 2 发送方状态机
主题匹配算法
精确匹配
def exact_match(topic, filter):
return topic == filter通配符匹配
def wildcard_match(topic, filter):
topic_parts = topic.split('/')
filter_parts = filter.split('/')
i = 0
for j, f in enumerate(filter_parts):
if f == '#':
return True # # 匹配剩余所有层级
if i >= len(topic_parts):
return False
if f != '+' and f != topic_parts[i]:
return False
i += 1
return i == len(topic_parts)
# 测试
print(wildcard_match("home/room1/temp", "home/+/temp")) # True
print(wildcard_match("home/room1/temp", "home/#")) # True
print(wildcard_match("home/room1/temp", "home/+")) # FalseBroker 消息路由
订阅树结构
路由算法
class TopicTree:
def __init__(self):
self.root = {}
self.subscribers = {}
def subscribe(self, client_id, topic_filter, qos):
parts = topic_filter.split('/')
node = self.root
for part in parts:
if part not in node:
node[part] = {}
node = node[part]
if '_subscribers' not in node:
node['_subscribers'] = []
node['_subscribers'].append((client_id, qos))
def match(self, topic):
parts = topic.split('/')
return self._match_recursive(self.root, parts, 0)
def _match_recursive(self, node, parts, index):
subscribers = []
# 检查 # 通配符
if '#' in node and '_subscribers' in node['#']:
subscribers.extend(node['#']['_subscribers'])
if index >= len(parts):
if '_subscribers' in node:
subscribers.extend(node['_subscribers'])
return subscribers
part = parts[index]
# 精确匹配
if part in node:
subscribers.extend(self._match_recursive(node[part], parts, index + 1))
# + 通配符匹配
if '+' in node:
subscribers.extend(self._match_recursive(node['+'], parts, index + 1))
return subscribers消息存储
会话存储
Session Store:
+-------------+------------------+
| Client ID | Session Data |
+-------------+------------------+
| device-001 | subscriptions: |
| | - sensor/# |
| | - device/+/cmd |
| | pending_msgs: |
| | - msg_id: 1 |
| | - msg_id: 2 |
+-------------+------------------+保留消息存储
Retained Store:
+------------------+------------------+
| Topic | Message |
+------------------+------------------+
| device/001/status| {"online": true} |
| config/global | {"interval": 60} |
+------------------+------------------+面试题预览
常见面试题
- MQTT 报文的基本结构是什么?
- 剩余长度是如何编码的?最大能表示多少?
- QoS 2 的四次握手流程是怎样的?
- MQTT Broker 是如何进行主题匹配的?
- PUBLISH 报文的 DUP 标志有什么作用?
小结
理解 MQTT 协议原理对于开发和调试 MQTT 应用非常重要。报文结构决定了协议的效率,QoS 机制保证了消息可靠性,主题匹配算法影响了路由性能。掌握这些原理有助于更好地使用和优化 MQTT 系统。
