QoS服务质量
2026/1/15大约 4 分钟
QoS服务质量
QoS(Quality of Service,服务质量)是 MQTT 协议的核心特性之一,定义了消息传递的可靠性级别。
QoS 等级概述
MQTT 定义了三种 QoS 等级:
| QoS | 名称 | 说明 | 消息传递 |
|---|---|---|---|
| 0 | At most once | 至多一次 | 可能丢失 |
| 1 | At least once | 至少一次 | 可能重复 |
| 2 | Exactly once | 恰好一次 | 不丢失不重复 |
QoS 0:至多一次
工作原理
QoS 0 是最简单的传输模式,发送者只发送一次消息,不等待确认:
特点
- 最快:无需等待确认,延迟最低
- 最不可靠:消息可能丢失
- 无重传:发送后不管结果
- 适用场景:
- 传感器周期性上报数据(丢失一次无影响)
- 实时性要求高但允许丢失的场景
- 网络环境稳定的内网通信
代码示例
# Python
client.publish("sensor/temp", "25.5", qos=0)// Java
MqttMessage message = new MqttMessage("25.5".getBytes());
message.setQos(0);
client.publish("sensor/temp", message);QoS 1:至少一次
工作原理
QoS 1 通过 PUBACK 确认机制保证消息至少送达一次:
重传机制
如果发送者未收到 PUBACK,会重新发送消息(DUP=1):
特点
- 可靠:保证消息至少送达一次
- 可能重复:网络问题可能导致重复
- 需要去重:接收方需处理重复消息
- 适用场景:
- 重要的状态变更通知
- 允许重复但不允许丢失的场景
- 大多数物联网应用
代码示例
# Python - 等待发布完成
result = client.publish("device/status", "online", qos=1)
result.wait_for_publish()
print(f"消息ID: {result.mid}")// Java - 同步发布
MqttMessage message = new MqttMessage("online".getBytes());
message.setQos(1);
client.publish("device/status", message);QoS 2:恰好一次
工作原理
QoS 2 通过四次握手保证消息恰好送达一次:
消息流程详解
- PUBLISH:发送者发送消息
- PUBREC:接收者确认收到消息
- PUBREL:发送者释放消息
- PUBCOMP:接收者确认完成
特点
- 最可靠:保证消息不丢失不重复
- 最慢:四次握手,延迟最高
- 开销大:需要存储消息状态
- 适用场景:
- 金融交易、计费系统
- 不允许重复的关键业务
- 对可靠性要求极高的场景
代码示例
# Python
result = client.publish("payment/transaction", payload, qos=2)
result.wait_for_publish()QoS 降级
当发布者和订阅者的 QoS 不同时,Broker 会进行 QoS 降级:
最终 QoS = min(发布 QoS, 订阅 QoS)| 发布 QoS | 订阅 QoS | 实际 QoS |
|---|---|---|
| 0 | 2 | 0 |
| 1 | 0 | 0 |
| 2 | 1 | 1 |
| 2 | 2 | 2 |
QoS 选择指南
决策流程
场景对照表
| 场景 | 推荐 QoS | 原因 |
|---|---|---|
| 传感器周期上报 | 0 | 数据连续,丢失一次影响小 |
| 设备状态变更 | 1 | 状态重要,允许重复 |
| 远程控制指令 | 1 | 指令重要,幂等操作 |
| 金融交易 | 2 | 不允许丢失和重复 |
| 实时位置追踪 | 0/1 | 根据精度要求选择 |
| OTA 升级通知 | 1 | 通知重要,重复无害 |
消息去重
QoS 1 可能产生重复消息,需要在应用层处理:
方案1:消息ID去重
import hashlib
from collections import OrderedDict
class MessageDeduplicator:
def __init__(self, max_size=10000):
self.seen = OrderedDict()
self.max_size = max_size
def is_duplicate(self, message):
# 生成消息指纹
msg_hash = hashlib.md5(message.encode()).hexdigest()
if msg_hash in self.seen:
return True
# 添加到已见集合
self.seen[msg_hash] = True
# 限制大小
if len(self.seen) > self.max_size:
self.seen.popitem(last=False)
return False
dedup = MessageDeduplicator()
def on_message(client, userdata, msg):
if dedup.is_duplicate(msg.payload.decode()):
print("重复消息,忽略")
return
# 处理消息
process_message(msg)方案2:幂等设计
# 使用唯一业务ID
def process_order(order_id, data):
# 检查订单是否已处理
if order_exists(order_id):
return # 幂等,直接返回
# 处理订单
create_order(order_id, data)性能对比
| 指标 | QoS 0 | QoS 1 | QoS 2 |
|---|---|---|---|
| 网络往返 | 1 | 2 | 4 |
| 存储需求 | 无 | 低 | 高 |
| CPU 开销 | 低 | 中 | 高 |
| 延迟 | 最低 | 中等 | 最高 |
面试题预览
常见面试题
- MQTT 三种 QoS 等级的区别是什么?
- QoS 1 为什么会产生重复消息?如何处理?
- QoS 2 的四次握手流程是怎样的?
- 什么场景下应该使用 QoS 2?
- 发布者 QoS 2,订阅者 QoS 1,实际传输是什么 QoS?
小结
QoS 是 MQTT 保证消息可靠性的核心机制。QoS 0 最快但可能丢失,QoS 1 保证送达但可能重复,QoS 2 最可靠但开销最大。选择合适的 QoS 等级需要权衡可靠性和性能,大多数物联网场景使用 QoS 1 即可满足需求。
