MQTT常见问题
2026/1/15大约 4 分钟
MQTT常见问题
本章汇总 MQTT 开发和运维中的常见问题及解决方案。
连接问题
1. 连接被拒绝
错误信息:
Connection refused: Not authorized (5)
Connection refused: Bad username or password (4)原因与解决:
| 错误码 | 原因 | 解决方案 |
|---|---|---|
| 4 | 用户名密码错误 | 检查认证信息 |
| 5 | 未授权 | 检查 ACL 配置 |
| 3 | 服务不可用 | 检查 Broker 状态 |
| 2 | Client ID 无效 | 使用合法的 Client ID |
# 检查认证配置
client.username_pw_set("correct_username", "correct_password")
client.connect("localhost", 1883)2. 连接超时
错误信息:
Connection timed out
socket.timeout: timed out排查步骤:
# 1. 检查网络连通性
ping broker.example.com
telnet broker.example.com 1883
# 2. 检查防火墙
sudo iptables -L -n | grep 1883
sudo firewall-cmd --list-ports
# 3. 检查 Broker 是否运行
systemctl status emqx
netstat -tlnp | grep 18833. 频繁断开重连
可能原因:
- Keep Alive 设置过短
- 网络不稳定
- 相同 Client ID 冲突
- 服务器资源不足
解决方案:
# 1. 调整 Keep Alive
client.connect("localhost", 1883, keepalive=60)
# 2. 使用唯一 Client ID
import uuid
client_id = f"device-{uuid.uuid4().hex[:8]}"
client = mqtt.Client(client_id=client_id)
# 3. 启用自动重连
client.reconnect_delay_set(min_delay=1, max_delay=120)4. SSL/TLS 连接失败
错误信息:
ssl.SSLCertVerificationError: certificate verify failed解决方案:
import ssl
# 方案1:使用正确的 CA 证书
client.tls_set(ca_certs="/path/to/ca.crt")
# 方案2:跳过证书验证(仅测试环境)
client.tls_set(cert_reqs=ssl.CERT_NONE)
client.tls_insecure_set(True)消息问题
1. 消息丢失
排查方向:
1. QoS 等级是否合适?
- QoS 0 可能丢失
- 使用 QoS 1 或 2
2. Clean Session 设置?
- Clean Session = true 会丢失离线消息
- 需要离线消息时设为 false
3. 消息队列是否溢出?
- 检查 Broker 队列配置
- 增加队列大小或加快消费# 使用 QoS 1 确保消息送达
result = client.publish("topic", "message", qos=1)
result.wait_for_publish()
# 持久会话接收离线消息
client = mqtt.Client(client_id="device-001", clean_session=False)2. 消息重复
原因:QoS 1 的重传机制可能导致重复
解决方案:
# 消息去重
from collections import OrderedDict
import hashlib
class MessageDeduplicator:
def __init__(self, max_size=10000):
self.seen = OrderedDict()
self.max_size = max_size
def is_duplicate(self, msg_id, payload):
key = f"{msg_id}:{hashlib.md5(payload).hexdigest()}"
if key in self.seen:
return True
self.seen[key] = True
if len(self.seen) > self.max_size:
self.seen.popitem(last=False)
return False
dedup = MessageDeduplicator()
def on_message(client, userdata, msg):
if dedup.is_duplicate(msg.mid, msg.payload):
return # 忽略重复消息
process_message(msg)3. 消息延迟高
排查方向:
1. 网络延迟
- 使用就近的 Broker
- 检查网络质量
2. QoS 等级
- QoS 2 延迟最高
- 非必要不用 QoS 2
3. Broker 负载
- 检查 CPU、内存使用
- 考虑扩容或集群
4. 消息积压
- 检查消费速度
- 增加消费者4. 收不到消息
排查步骤:
# 1. 确认订阅成功
def on_subscribe(client, userdata, mid, granted_qos):
print(f"订阅成功,QoS: {granted_qos}")
client.on_subscribe = on_subscribe
client.subscribe("test/topic", qos=1)
# 2. 检查主题是否匹配
# 发布: sensor/device001/temp
# 订阅: sensor/+/temp ✓
# 订阅: sensor/device001 ✗ (不匹配)
# 3. 检查 ACL 权限
# 确认用户有订阅该主题的权限性能问题
1. 连接数上不去
系统限制检查:
# 查看当前限制
ulimit -n
# 修改限制
# /etc/security/limits.conf
* soft nofile 1048576
* hard nofile 1048576
# /etc/sysctl.conf
fs.file-max = 2097152
fs.nr_open = 20971522. 内存持续增长
可能原因:
- 会话未清理
- 保留消息过多
- 消息队列积压
解决方案:
# EMQX 配置
# 限制会话数量
mqtt.max_sessions = 1000000
# 限制消息队列
mqtt.max_mqueue_len = 1000
# 会话过期(MQTT 5.0)
mqtt.session_expiry_interval = 2h3. CPU 使用率高
排查方向:
1. 主题匹配复杂
- 减少通配符订阅
- 优化主题设计
2. 消息处理慢
- 异步处理消息
- 增加处理线程
3. TLS 加解密
- 使用硬件加速
- 考虑内网不加密集群问题
1. 节点无法加入集群
# 检查节点名称解析
ping emqx@node1.example.com
# 检查 Erlang Cookie
cat /var/lib/emqx/.erlang.cookie
# 检查端口连通性
telnet node1.example.com 43702. 脑裂问题
现象:集群分裂成多个独立部分
解决方案:
# EMQX 配置自动愈合
cluster.autoheal = on
# 配置自动清理
cluster.autoclean = 5m3. 消息路由失败
排查:
# 检查路由表
emqx_ctl routes list
# 检查订阅
emqx_ctl subscriptions list
# 检查集群状态
emqx_ctl cluster status安全问题
1. 未授权访问
加固措施:
# 禁用匿名访问
allow_anonymous = false
# 配置认证
authentication = [
{
mechanism = password_based
backend = built_in_database
}
]
# 配置 ACL
authorization.sources = [
{
type = built_in_database
enable = true
}
]2. 证书过期
监控脚本:
#!/bin/bash
# 检查证书过期时间
CERT_FILE="/etc/emqx/certs/server.crt"
EXPIRE_DATE=$(openssl x509 -enddate -noout -in $CERT_FILE | cut -d= -f2)
EXPIRE_EPOCH=$(date -d "$EXPIRE_DATE" +%s)
NOW_EPOCH=$(date +%s)
DAYS_LEFT=$(( ($EXPIRE_EPOCH - $NOW_EPOCH) / 86400 ))
if [ $DAYS_LEFT -lt 30 ]; then
echo "警告:证书将在 $DAYS_LEFT 天后过期"
fi调试技巧
1. 开启详细日志
# EMQX
log.level = debug
# Mosquitto
log_type all2. 使用 Wireshark 抓包
过滤器: tcp.port == 18833. 使用命令行工具测试
# 订阅
mosquitto_sub -h localhost -t "test/#" -v -d
# 发布
mosquitto_pub -h localhost -t "test/topic" -m "hello" -d问题排查清单
| 问题类型 | 检查项 |
|---|---|
| 连接失败 | 网络、端口、认证、Client ID |
| 消息丢失 | QoS、Clean Session、队列 |
| 性能问题 | 系统限制、Broker 配置、主题设计 |
| 集群问题 | 节点通信、路由表、脑裂 |
| 安全问题 | 认证、ACL、TLS |
小结
MQTT 问题排查需要从连接、消息、性能、集群、安全等多个维度入手。掌握常见问题的排查方法和解决方案,能够快速定位和解决生产环境中的问题。
