MQTT性能优化
2026/1/15大约 5 分钟
MQTT性能优化
本章介绍 MQTT 系统的性能优化策略,包括 Broker 调优、客户端优化、网络优化等方面。
性能指标
关键指标
| 指标 | 说明 | 参考值 |
|---|---|---|
| 连接数 | 同时在线客户端数 | 百万级 |
| 消息吞吐 | 每秒消息数 | 百万级 |
| 消息延迟 | 端到端延迟 | 毫秒级 |
| 内存使用 | Broker 内存占用 | 根据连接数 |
| CPU 使用 | Broker CPU 占用 | 根据消息量 |
性能测试工具
# emqtt-bench 压测工具
# 连接测试
emqtt_bench conn -h localhost -p 1883 -c 10000
# 发布测试
emqtt_bench pub -h localhost -p 1883 -c 100 -I 10 -t bench/%i -s 256
# 订阅测试
emqtt_bench sub -h localhost -p 1883 -c 100 -t bench/%i -q 1Broker 优化
EMQX 配置优化
## 系统限制
node.process_limit = 2097152
node.max_ports = 1048576
## 连接限制
listeners.tcp.default.max_connections = 1024000
listeners.tcp.default.acceptors = 64
## 消息队列
mqtt.max_mqueue_len = 1000
mqtt.mqueue_store_qos0 = false
## 会话
mqtt.max_inflight = 32
mqtt.max_awaiting_rel = 100
mqtt.await_rel_timeout = 300s
## 保留消息
retainer.max_retained_messages = 1000000
retainer.max_payload_size = 1MB系统内核优化
# /etc/sysctl.conf
# 文件描述符
fs.file-max = 2097152
fs.nr_open = 2097152
# TCP 优化
net.core.somaxconn = 32768
net.core.netdev_max_backlog = 16384
net.core.rmem_default = 262144
net.core.wmem_default = 262144
net.core.rmem_max = 16777216
net.core.wmem_max = 16777216
net.ipv4.tcp_rmem = 4096 87380 16777216
net.ipv4.tcp_wmem = 4096 65536 16777216
net.ipv4.tcp_max_syn_backlog = 8096
net.ipv4.tcp_syncookies = 1
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_fin_timeout = 15
# 应用配置
sysctl -p# /etc/security/limits.conf
* soft nofile 1048576
* hard nofile 1048576Mosquitto 优化
# mosquitto.conf
# 最大连接数
max_connections -1
# 消息大小限制
message_size_limit 0
# 持久化优化
persistence true
persistence_location /var/lib/mosquitto/
autosave_interval 1800
# 日志优化
log_type error
log_type warning
connection_messages false客户端优化
连接优化
# 连接池复用
class MqttConnectionPool:
def __init__(self, broker, port, pool_size=10):
self.pool = []
for i in range(pool_size):
client = mqtt.Client(client_id=f"pool-{i}")
client.connect(broker, port)
client.loop_start()
self.pool.append(client)
self.index = 0
def get_client(self):
client = self.pool[self.index]
self.index = (self.index + 1) % len(self.pool)
return client
def publish(self, topic, payload, qos=0):
client = self.get_client()
return client.publish(topic, payload, qos)批量发布
import asyncio
from asyncio_mqtt import Client
async def batch_publish(messages):
async with Client("localhost") as client:
tasks = []
for topic, payload in messages:
task = client.publish(topic, payload, qos=1)
tasks.append(task)
await asyncio.gather(*tasks)
# 使用
messages = [
("sensor/1/temp", "25.5"),
("sensor/2/temp", "26.0"),
("sensor/3/temp", "24.8"),
]
asyncio.run(batch_publish(messages))QoS 选择
场景 推荐 QoS 原因
传感器周期上报 0 数据连续,丢失影响小
设备状态变更 1 重要但允许重复
远程控制指令 1 幂等操作
金融交易 2 不允许丢失和重复Keep Alive 优化
# 根据网络环境调整
# 稳定网络:较长间隔减少心跳开销
client.connect("localhost", 1883, keepalive=120)
# 不稳定网络:较短间隔快速检测断开
client.connect("localhost", 1883, keepalive=30)主题设计优化
避免过多通配符订阅
不推荐:
订阅 #(匹配所有主题)
推荐:
订阅具体主题或有限通配符
sensor/+/temperature
device/001/#主题层级优化
不推荐(层级过深):
company/region/city/building/floor/room/device/sensor/type/value
推荐(3-5层):
company/building-floor-room/device/sensor主题别名(MQTT 5.0)
# 使用主题别名减少带宽
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
props = Properties(PacketTypes.PUBLISH)
props.TopicAlias = 1
# 第一次发布建立映射
client.publish("very/long/topic/name", "data", properties=props)
# 后续使用别名
props = Properties(PacketTypes.PUBLISH)
props.TopicAlias = 1
client.publish("", "data", properties=props) # 主题为空消息优化
消息压缩
import gzip
import json
def compress_message(data):
json_str = json.dumps(data)
return gzip.compress(json_str.encode())
def decompress_message(compressed):
json_str = gzip.decompress(compressed).decode()
return json.loads(json_str)
# 发布压缩消息
data = {"temp": 25.5, "humidity": 60, "timestamp": 1699999999}
compressed = compress_message(data)
client.publish("sensor/data", compressed)
# 接收解压
def on_message(client, userdata, msg):
data = decompress_message(msg.payload)
print(data)消息格式优化
JSON (可读性好,体积大):
{"deviceId":"001","temp":25.5,"humidity":60}
约 45 字节
Protobuf (体积小,需要 schema):
约 15 字节
MessagePack (体积小,无需 schema):
约 30 字节import msgpack
# MessagePack 序列化
data = {"deviceId": "001", "temp": 25.5, "humidity": 60}
packed = msgpack.packb(data)
client.publish("sensor/data", packed)
# 反序列化
def on_message(client, userdata, msg):
data = msgpack.unpackb(msg.payload)
print(data)网络优化
TCP 参数调优
import socket
# 设置 TCP_NODELAY 禁用 Nagle 算法
client._sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# 设置发送缓冲区
client._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 65536)连接复用
不推荐:
每次发布都建立新连接
推荐:
保持长连接,复用连接发布多条消息内存优化
会话清理
# 使用 Clean Session 减少服务器内存
client = mqtt.Client(clean_session=True)
# MQTT 5.0 设置会话过期
props = Properties(PacketTypes.CONNECT)
props.SessionExpiryInterval = 3600 # 1小时后过期消息队列限制
# EMQX 配置
mqtt.max_mqueue_len = 1000
mqtt.mqueue_store_qos0 = false # 不存储 QoS 0 消息保留消息清理
# 定期清理过期保留消息
def clear_retained(client, topic):
client.publish(topic, "", retain=True)监控与调优
性能监控
import time
from collections import deque
class PerformanceMonitor:
def __init__(self, window_size=1000):
self.latencies = deque(maxlen=window_size)
self.message_count = 0
self.start_time = time.time()
def record_message(self, latency):
self.latencies.append(latency)
self.message_count += 1
def get_stats(self):
elapsed = time.time() - self.start_time
return {
"throughput": self.message_count / elapsed,
"avg_latency": sum(self.latencies) / len(self.latencies) if self.latencies else 0,
"max_latency": max(self.latencies) if self.latencies else 0,
"min_latency": min(self.latencies) if self.latencies else 0,
}瓶颈分析
症状 可能原因 解决方案
连接数上不去 文件描述符限制 调整 ulimit
消息延迟高 QoS 2 过多 降低 QoS
内存持续增长 会话未清理 启用 Clean Session
CPU 使用率高 主题匹配复杂 优化主题设计性能优化检查清单
| 优化项 | 状态 |
|---|---|
| 系统内核参数调优 | ☐ |
| Broker 配置优化 | ☐ |
| 合理选择 QoS | ☐ |
| 主题设计优化 | ☐ |
| 消息格式优化 | ☐ |
| 连接复用 | ☐ |
| 会话管理 | ☐ |
| 监控告警 | ☐ |
面试题预览
常见面试题
- 如何优化 MQTT 的消息吞吐量?
- QoS 等级对性能有什么影响?
- 如何减少 MQTT 消息的网络开销?
- MQTT Broker 的内存优化有哪些方法?
小结
MQTT 性能优化需要从多个层面入手:系统层面调整内核参数,Broker 层面优化配置,客户端层面选择合适的 QoS 和连接策略,消息层面优化格式和压缩。通过监控发现瓶颈,针对性优化,才能达到最佳性能。
