MQTT实战案例
2026/1/15大约 5 分钟
MQTT实战案例
本章通过几个实际案例,展示 MQTT 在不同场景下的应用。
案例一:智能家居系统
系统架构
主题设计
# 设备状态上报
home/{room}/{device}/status
home/living-room/light/status
home/bedroom/ac/status
# 设备控制
home/{room}/{device}/set
home/living-room/light/set
home/bedroom/ac/set
# 传感器数据
home/{room}/sensor/{type}
home/living-room/sensor/temperature
home/living-room/sensor/humidity设备端代码(Python)
import json
import time
import paho.mqtt.client as mqtt
class SmartDevice:
def __init__(self, device_id, room, device_type):
self.device_id = device_id
self.room = room
self.device_type = device_type
self.state = {"power": False, "brightness": 0}
self.client = mqtt.Client(client_id=device_id)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
# 设置遗嘱消息
status_topic = f"home/{room}/{device_type}/status"
self.client.will_set(status_topic, json.dumps({"online": False}), qos=1, retain=True)
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
print(f"设备 {self.device_id} 连接成功")
# 订阅控制主题
control_topic = f"home/{self.room}/{self.device_type}/set"
client.subscribe(control_topic, qos=1)
# 发布上线状态
self.publish_status(online=True)
def on_message(self, client, userdata, msg):
try:
command = json.loads(msg.payload.decode())
print(f"收到指令: {command}")
self.execute_command(command)
except Exception as e:
print(f"处理指令失败: {e}")
def execute_command(self, command):
if "power" in command:
self.state["power"] = command["power"]
if "brightness" in command:
self.state["brightness"] = command["brightness"]
self.publish_status()
def publish_status(self, online=True):
status_topic = f"home/{self.room}/{self.device_type}/status"
status = {**self.state, "online": online, "timestamp": int(time.time())}
self.client.publish(status_topic, json.dumps(status), qos=1, retain=True)
def connect(self, broker, port=1883):
self.client.connect(broker, port)
self.client.loop_start()
# 使用示例
light = SmartDevice("light-001", "living-room", "light")
light.connect("localhost")控制端代码
class SmartHomeController:
def __init__(self):
self.client = mqtt.Client(client_id="controller-001")
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.device_states = {}
def on_connect(self, client, userdata, flags, rc):
# 订阅所有设备状态
client.subscribe("home/+/+/status", qos=1)
def on_message(self, client, userdata, msg):
# 解析主题获取设备信息
parts = msg.topic.split("/")
room, device = parts[1], parts[2]
key = f"{room}/{device}"
status = json.loads(msg.payload.decode())
self.device_states[key] = status
print(f"设备状态更新 [{key}]: {status}")
def control_device(self, room, device, command):
topic = f"home/{room}/{device}/set"
self.client.publish(topic, json.dumps(command), qos=1)
def turn_on_light(self, room):
self.control_device(room, "light", {"power": True, "brightness": 100})
def turn_off_all_lights(self):
for key in self.device_states:
if "light" in key:
room = key.split("/")[0]
self.control_device(room, "light", {"power": False})
# 使用
controller = SmartHomeController()
controller.client.connect("localhost")
controller.client.loop_start()
# 控制灯光
controller.turn_on_light("living-room")案例二:物联网数据采集平台
架构设计
数据上报格式
{
"deviceId": "sensor-001",
"timestamp": 1699999999,
"data": {
"temperature": 25.5,
"humidity": 60,
"pressure": 1013.25
},
"metadata": {
"location": "factory-1",
"type": "environmental"
}
}数据采集服务
import json
import time
from datetime import datetime
from influxdb_client import InfluxDBClient, Point
class DataCollector:
def __init__(self, mqtt_broker, influx_url, influx_token):
# MQTT 客户端
self.mqtt_client = mqtt.Client(client_id="data-collector")
self.mqtt_client.on_message = self.on_message
# InfluxDB 客户端
self.influx_client = InfluxDBClient(url=influx_url, token=influx_token)
self.write_api = self.influx_client.write_api()
# 告警规则
self.alert_rules = {
"temperature": {"min": 0, "max": 50},
"humidity": {"min": 20, "max": 80}
}
def on_message(self, client, userdata, msg):
try:
data = json.loads(msg.payload.decode())
self.process_data(data)
except Exception as e:
print(f"处理数据失败: {e}")
def process_data(self, data):
device_id = data["deviceId"]
timestamp = data["timestamp"]
# 写入时序数据库
for key, value in data["data"].items():
point = Point("sensor_data") \
.tag("device_id", device_id) \
.tag("location", data["metadata"].get("location", "unknown")) \
.field(key, value) \
.time(datetime.fromtimestamp(timestamp))
self.write_api.write(bucket="iot", record=point)
# 检查告警
self.check_alerts(device_id, data["data"])
def check_alerts(self, device_id, data):
for key, value in data.items():
if key in self.alert_rules:
rule = self.alert_rules[key]
if value < rule["min"] or value > rule["max"]:
self.send_alert(device_id, key, value, rule)
def send_alert(self, device_id, metric, value, rule):
alert = {
"device_id": device_id,
"metric": metric,
"value": value,
"rule": rule,
"timestamp": int(time.time())
}
self.mqtt_client.publish("alerts/sensor", json.dumps(alert), qos=1)
print(f"告警: {alert}")
def start(self, broker):
self.mqtt_client.connect(broker)
self.mqtt_client.subscribe("iot/+/telemetry", qos=1)
self.mqtt_client.loop_forever()案例三:即时通讯系统
主题设计
# 私聊消息
chat/user/{user_id}/inbox
# 群聊消息
chat/group/{group_id}/messages
# 用户状态
chat/user/{user_id}/presence
# 消息回执
chat/user/{user_id}/receipt消息格式
{
"msgId": "uuid-xxx",
"from": "user-001",
"to": "user-002",
"type": "text",
"content": "Hello!",
"timestamp": 1699999999
}聊天客户端
import uuid
import json
import time
class ChatClient:
def __init__(self, user_id):
self.user_id = user_id
self.client = mqtt.Client(client_id=f"chat-{user_id}")
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.message_handlers = []
def on_connect(self, client, userdata, flags, rc):
# 订阅私聊消息
client.subscribe(f"chat/user/{self.user_id}/inbox", qos=1)
# 发布在线状态
self.publish_presence("online")
def on_message(self, client, userdata, msg):
message = json.loads(msg.payload.decode())
# 发送已读回执
self.send_receipt(message["msgId"], message["from"])
# 通知消息处理器
for handler in self.message_handlers:
handler(message)
def send_message(self, to_user, content, msg_type="text"):
msg_id = str(uuid.uuid4())
message = {
"msgId": msg_id,
"from": self.user_id,
"to": to_user,
"type": msg_type,
"content": content,
"timestamp": int(time.time())
}
topic = f"chat/user/{to_user}/inbox"
self.client.publish(topic, json.dumps(message), qos=1)
return msg_id
def send_group_message(self, group_id, content):
msg_id = str(uuid.uuid4())
message = {
"msgId": msg_id,
"from": self.user_id,
"group": group_id,
"type": "text",
"content": content,
"timestamp": int(time.time())
}
topic = f"chat/group/{group_id}/messages"
self.client.publish(topic, json.dumps(message), qos=1)
return msg_id
def join_group(self, group_id):
self.client.subscribe(f"chat/group/{group_id}/messages", qos=1)
def publish_presence(self, status):
presence = {
"userId": self.user_id,
"status": status,
"timestamp": int(time.time())
}
topic = f"chat/user/{self.user_id}/presence"
self.client.publish(topic, json.dumps(presence), qos=1, retain=True)
def send_receipt(self, msg_id, to_user):
receipt = {
"msgId": msg_id,
"status": "read",
"timestamp": int(time.time())
}
topic = f"chat/user/{to_user}/receipt"
self.client.publish(topic, json.dumps(receipt), qos=1)
def on_message_received(self, handler):
self.message_handlers.append(handler)
def connect(self, broker):
# 设置遗嘱消息(离线状态)
presence_topic = f"chat/user/{self.user_id}/presence"
offline_presence = json.dumps({
"userId": self.user_id,
"status": "offline",
"timestamp": int(time.time())
})
self.client.will_set(presence_topic, offline_presence, qos=1, retain=True)
self.client.connect(broker)
self.client.loop_start()
# 使用示例
def message_handler(msg):
print(f"收到消息 from {msg['from']}: {msg['content']}")
client = ChatClient("user-001")
client.on_message_received(message_handler)
client.connect("localhost")
# 发送消息
client.send_message("user-002", "你好!")
client.join_group("group-001")
client.send_group_message("group-001", "大家好!")案例四:车联网系统
主题设计
# 车辆遥测数据
vehicle/{vin}/telemetry
# 车辆位置
vehicle/{vin}/location
# 远程控制
vehicle/{vin}/command
# 控制响应
vehicle/{vin}/response
# OTA 升级
vehicle/{vin}/ota车载终端代码
class VehicleTerminal:
def __init__(self, vin):
self.vin = vin
self.client = mqtt.Client(client_id=f"vehicle-{vin}")
self.client.on_message = self.on_message
# 车辆状态
self.state = {
"engine": False,
"doors_locked": True,
"ac": False,
"speed": 0,
"fuel": 80
}
def on_message(self, client, userdata, msg):
if "/command" in msg.topic:
self.handle_command(json.loads(msg.payload.decode()))
elif "/ota" in msg.topic:
self.handle_ota(json.loads(msg.payload.decode()))
def handle_command(self, command):
cmd_type = command.get("type")
result = {"success": False, "message": ""}
if cmd_type == "lock":
self.state["doors_locked"] = True
result = {"success": True, "message": "车门已锁定"}
elif cmd_type == "unlock":
self.state["doors_locked"] = False
result = {"success": True, "message": "车门已解锁"}
elif cmd_type == "start_ac":
self.state["ac"] = True
result = {"success": True, "message": "空调已开启"}
# 发送响应
response_topic = f"vehicle/{self.vin}/response"
self.client.publish(response_topic, json.dumps({
"commandId": command.get("commandId"),
**result
}), qos=1)
def report_telemetry(self):
telemetry = {
"vin": self.vin,
"timestamp": int(time.time()),
**self.state
}
topic = f"vehicle/{self.vin}/telemetry"
self.client.publish(topic, json.dumps(telemetry), qos=1)
def report_location(self, lat, lng, speed):
location = {
"vin": self.vin,
"latitude": lat,
"longitude": lng,
"speed": speed,
"timestamp": int(time.time())
}
topic = f"vehicle/{self.vin}/location"
self.client.publish(topic, json.dumps(location), qos=0)面试题预览
常见面试题
- 如何设计一个物联网平台的 MQTT 主题结构?
- MQTT 在即时通讯场景中如何保证消息可靠性?
- 车联网场景中如何处理大量车辆的实时数据?
- 如何实现 MQTT 消息的离线存储和推送?
小结
本章通过智能家居、物联网数据采集、即时通讯、车联网四个案例,展示了 MQTT 在不同场景下的应用。合理的主题设计、消息格式定义、QoS 选择是构建可靠 MQTT 应用的关键。
