MQTT安全机制
2026/1/15大约 4 分钟
MQTT安全机制
MQTT 协议本身不强制安全机制,但在生产环境中,安全性至关重要。本章介绍 MQTT 的各种安全机制。
安全威胁
常见威胁
| 威胁类型 | 描述 | 影响 |
|---|---|---|
| 窃听 | 监听网络流量获取消息内容 | 数据泄露 |
| 中间人攻击 | 拦截并篡改消息 | 数据篡改 |
| 身份伪造 | 冒充合法客户端 | 非法访问 |
| 拒绝服务 | 大量连接耗尽资源 | 服务不可用 |
| 未授权访问 | 访问未授权的主题 | 数据泄露 |
传输层安全(TLS/SSL)
配置 TLS
TLS 加密可以防止窃听和中间人攻击。
EMQX 配置:
# emqx.conf
listeners.ssl.default = 8883
listeners.ssl.default.ssl_options.cacertfile = /etc/emqx/certs/ca.pem
listeners.ssl.default.ssl_options.certfile = /etc/emqx/certs/server.pem
listeners.ssl.default.ssl_options.keyfile = /etc/emqx/certs/server.key
listeners.ssl.default.ssl_options.verify = verify_peerMosquitto 配置:
# mosquitto.conf
listener 8883
cafile /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
require_certificate true客户端 TLS 连接
# Python
import ssl
client = mqtt.Client()
client.tls_set(
ca_certs="/path/to/ca.crt",
certfile="/path/to/client.crt",
keyfile="/path/to/client.key",
tls_version=ssl.PROTOCOL_TLSv1_2
)
client.connect("broker.example.com", 8883)// Java
SSLSocketFactory socketFactory = getSSLSocketFactory(
"/path/to/ca.crt",
"/path/to/client.crt",
"/path/to/client.key"
);
options.setSocketFactory(socketFactory);
client.connect("ssl://broker.example.com:8883", options);生成证书
# 生成 CA 私钥和证书
openssl genrsa -out ca.key 2048
openssl req -new -x509 -days 3650 -key ca.key -out ca.crt \
-subj "/CN=MQTT CA"
# 生成服务器私钥和证书
openssl genrsa -out server.key 2048
openssl req -new -key server.key -out server.csr \
-subj "/CN=broker.example.com"
openssl x509 -req -days 365 -in server.csr -CA ca.crt -CAkey ca.key \
-CAcreateserial -out server.crt
# 生成客户端私钥和证书
openssl genrsa -out client.key 2048
openssl req -new -key client.key -out client.csr \
-subj "/CN=client001"
openssl x509 -req -days 365 -in client.csr -CA ca.crt -CAkey ca.key \
-CAcreateserial -out client.crt用户名密码认证
配置认证
EMQX 内置数据库认证:
# 添加用户
emqx_ctl admins add admin public
# 或通过 Dashboard 管理用户
# http://localhost:18083 -> 访问控制 -> 认证Mosquitto 密码文件:
# 创建密码文件
mosquitto_passwd -c /etc/mosquitto/passwd user1
mosquitto_passwd /etc/mosquitto/passwd user2
# mosquitto.conf
allow_anonymous false
password_file /etc/mosquitto/passwd客户端认证
# Python
client.username_pw_set("username", "password")
client.connect("localhost", 1883)// Java
options.setUserName("username");
options.setPassword("password".toCharArray());访问控制(ACL)
ACL 规则
ACL 控制客户端对主题的访问权限:
规则格式:
{allow|deny} {user|client|all} {username|clientid|all} {subscribe|publish|all} {topic}EMQX ACL 配置:
%% acl.conf
%% 允许 admin 用户访问所有主题
{allow, {user, "admin"}, all, ["#"]}.
%% 允许设备只能发布到自己的主题
{allow, {client, "device-001"}, publish, ["device/device-001/#"]}.
%% 禁止所有用户访问系统主题
{deny, all, subscribe, ["$SYS/#"]}.
%% 默认拒绝
{deny, all, all, ["#"]}.Mosquitto ACL 配置:
# acl_file /etc/mosquitto/acl
# 管理员完全访问
user admin
topic readwrite #
# 设备只能访问自己的主题
pattern readwrite device/%c/#
# 所有用户可以订阅公共主题
topic read public/#动态 ACL
EMQX 支持通过 HTTP API 实现动态 ACL:
{
"type": "http",
"enable": true,
"method": "post",
"url": "http://auth-server:8080/mqtt/acl",
"headers": {
"Content-Type": "application/json"
},
"body": {
"username": "${username}",
"topic": "${topic}",
"action": "${action}"
}
}Token 认证
JWT 认证
使用 JWT Token 进行认证:
import jwt
import time
# 生成 Token
def generate_token(client_id, secret):
payload = {
"client_id": client_id,
"exp": int(time.time()) + 3600 # 1小时过期
}
return jwt.encode(payload, secret, algorithm="HS256")
# 使用 Token 连接
token = generate_token("device-001", "your-secret-key")
client.username_pw_set("device-001", token)
client.connect("localhost", 1883)EMQX JWT 认证配置
{
"mechanism": "jwt",
"from": "password",
"algorithm": "hmac-based",
"secret": "your-secret-key",
"verify_claims": {
"exp": true
}
}客户端证书认证
双向 TLS 认证(mTLS)提供最高级别的安全性:
Client Broker
| |
|--- ClientHello ------->|
|<-- ServerHello --------|
|<-- Server Certificate -|
|<-- Certificate Request-| (要求客户端证书)
|--- Client Certificate->|
|--- Certificate Verify->|
|<-- Finished -----------|
|--- Finished ---------->|# Python mTLS
client.tls_set(
ca_certs="/path/to/ca.crt",
certfile="/path/to/client.crt",
keyfile="/path/to/client.key",
cert_reqs=ssl.CERT_REQUIRED
)消息加密
应用层加密
对于敏感数据,可以在应用层额外加密:
from cryptography.fernet import Fernet
# 生成密钥
key = Fernet.generate_key()
cipher = Fernet(key)
# 加密消息
def encrypt_message(message):
return cipher.encrypt(message.encode())
# 解密消息
def decrypt_message(encrypted):
return cipher.decrypt(encrypted).decode()
# 发布加密消息
encrypted = encrypt_message('{"temp": 25.5}')
client.publish("sensor/data", encrypted)
# 接收并解密
def on_message(client, userdata, msg):
decrypted = decrypt_message(msg.payload)
print(f"解密后: {decrypted}")安全最佳实践
1. 网络层
✓ 使用 TLS 1.2 或更高版本
✓ 禁用不安全的加密套件
✓ 使用防火墙限制访问
✓ 部署在私有网络2. 认证层
✓ 禁用匿名访问
✓ 使用强密码策略
✓ 定期轮换凭证
✓ 使用证书认证(高安全场景)3. 授权层
✓ 实施最小权限原则
✓ 使用 ACL 限制主题访问
✓ 定期审计权限配置4. 监控审计
✓ 记录连接和认证日志
✓ 监控异常连接行为
✓ 设置告警规则安全配置检查清单
| 检查项 | 状态 |
|---|---|
| 启用 TLS 加密 | ☐ |
| 禁用匿名访问 | ☐ |
| 配置用户认证 | ☐ |
| 配置 ACL 规则 | ☐ |
| 限制连接速率 | ☐ |
| 启用日志审计 | ☐ |
| 定期更新证书 | ☐ |
面试题预览
常见面试题
- MQTT 有哪些安全机制?
- TLS 和 mTLS 有什么区别?
- 如何防止 MQTT 客户端访问未授权的主题?
- JWT 认证相比用户名密码有什么优势?
小结
MQTT 安全需要多层防护:TLS 保护传输安全,认证验证身份,ACL 控制访问权限。生产环境必须启用 TLS 和认证,并根据业务需求配置细粒度的 ACL 规则。
