MQTT集群部署
2026/1/15大约 4 分钟
MQTT集群部署
生产环境中,单节点 MQTT Broker 无法满足高可用和高并发需求,需要部署集群。本章以 EMQX 为例介绍集群部署方案。
集群架构
基本架构
集群特性
| 特性 | 说明 |
|---|---|
| 节点发现 | 自动发现集群节点 |
| 会话同步 | 会话状态跨节点同步 |
| 消息路由 | 消息自动路由到订阅者所在节点 |
| 负载均衡 | 连接分散到各节点 |
| 故障转移 | 节点故障自动切换 |
EMQX 集群部署
节点配置
节点1 配置 (/etc/emqx/emqx.conf):
## 节点名称
node.name = emqx@192.168.1.1
## 集群发现方式
cluster.discovery = static
## 静态节点列表
cluster.static.seeds = emqx@192.168.1.1,emqx@192.168.1.2,emqx@192.168.1.3
## 集群通信端口
cluster.proto_dist = inet_tcp
listener.tcp.internal = 0.0.0.0:11883

节点2 配置:
node.name = emqx@192.168.1.2
cluster.discovery = static
cluster.static.seeds = emqx@192.168.1.1,emqx@192.168.1.2,emqx@192.168.1.3

节点3 配置:
node.name = emqx@192.168.1.3
cluster.discovery = static
cluster.static.seeds = emqx@192.168.1.1,emqx@192.168.1.2,emqx@192.168.1.3

启动集群
# 在每个节点上启动
emqx start
# 查看集群状态
emqx_ctl cluster status

输出示例:
Cluster status: #{running_nodes =>
['emqx@192.168.1.1','emqx@192.168.1.2',
'emqx@192.168.1.3'],
stopped_nodes => []}

手动加入集群
# 在节点2上执行
emqx_ctl cluster join emqx@192.168.1.1
# 在节点3上执行
emqx_ctl cluster join emqx@192.168.1.1

Docker Compose 集群
version: '3'
services:
emqx1:
image: emqx/emqx:latest
container_name: emqx1
environment:
- EMQX_NAME=emqx
- EMQX_HOST=node1.emqx.io
- EMQX_CLUSTER__DISCOVERY_STRATEGY=static
- EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io,emqx@node3.emqx.io]
networks:
emqx-bridge:
aliases:
- node1.emqx.io
ports:
- "1883:1883"
- "18083:18083"
emqx2:
image: emqx/emqx:latest
container_name: emqx2
environment:
- EMQX_NAME=emqx
- EMQX_HOST=node2.emqx.io
- EMQX_CLUSTER__DISCOVERY_STRATEGY=static
- EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io,emqx@node3.emqx.io]
networks:
emqx-bridge:
aliases:
- node2.emqx.io
ports:
- "1884:1883"
- "18084:18083"
emqx3:
image: emqx/emqx:latest
container_name: emqx3
environment:
- EMQX_NAME=emqx
- EMQX_HOST=node3.emqx.io
- EMQX_CLUSTER__DISCOVERY_STRATEGY=static
- EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io,emqx@node3.emqx.io]
networks:
emqx-bridge:
aliases:
- node3.emqx.io
ports:
- "1885:1883"
- "18085:18083"
networks:
emqx-bridge:
    driver: bridge

Kubernetes 部署
EMQX Operator
# 安装 EMQX Operator
apiVersion: apps.emqx.io/v2beta1
kind: EMQX
metadata:
name: emqx
spec:
image: emqx/emqx:5.3.0
coreTemplate:
spec:
replicas: 3
resources:
requests:
memory: "1Gi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "1000m"
replicantTemplate:
spec:
      replicas: 2

Helm 部署
# 添加 EMQX Helm 仓库
helm repo add emqx https://repos.emqx.io/charts
helm repo update
# 安装 EMQX 集群
helm install emqx emqx/emqx \
--set replicaCount=3 \
--set service.type=LoadBalancer

负载均衡配置
Nginx 配置
# nginx.conf
stream {
upstream mqtt_servers {
# 轮询负载均衡
server 192.168.1.1:1883;
server 192.168.1.2:1883;
server 192.168.1.3:1883;
}
upstream mqtt_ws_servers {
server 192.168.1.1:8083;
server 192.168.1.2:8083;
server 192.168.1.3:8083;
}
# MQTT TCP
server {
listen 1883;
proxy_pass mqtt_servers;
proxy_connect_timeout 10s;
proxy_timeout 1h;
}
# MQTT WebSocket
server {
listen 8083;
proxy_pass mqtt_ws_servers;
}
}
http {
upstream emqx_dashboard {
server 192.168.1.1:18083;
server 192.168.1.2:18083;
server 192.168.1.3:18083;
}
server {
listen 80;
location / {
proxy_pass http://emqx_dashboard;
}
}
}

HAProxy 配置
# haproxy.cfg
global
daemon
maxconn 50000
defaults
mode tcp
timeout connect 10s
timeout client 1h
timeout server 1h
frontend mqtt_frontend
bind *:1883
default_backend mqtt_backend
backend mqtt_backend
balance roundrobin
server emqx1 192.168.1.1:1883 check
server emqx2 192.168.1.2:1883 check
server emqx3 192.168.1.3:1883 check
frontend mqtt_ws_frontend
bind *:8083
default_backend mqtt_ws_backend
backend mqtt_ws_backend
balance roundrobin
server emqx1 192.168.1.1:8083 check
server emqx2 192.168.1.2:8083 check
server emqx3 192.168.1.3:8083 check

集群消息路由
路由原理
路由表
每个节点维护全局路由表:
+----------+------------------+
| Topic | Nodes |
+----------+------------------+
| sensor/# | [Node1, Node2] |
| device/+ | [Node1, Node3] |
| config | [Node2] |
+----------+------------------+

会话持久化
内置数据库
EMQX 使用 Mnesia 存储会话:
# emqx.conf
## 会话持久化
session.persistent = true
session.storage = disc

外部数据库
使用 Redis 存储会话:
# 配置 Redis 后端
backend.redis.pool1.server = 127.0.0.1:6379
backend.redis.pool1.database = 0
backend.redis.pool1.password =
# 会话存储
backend.redis.session.pool = pool1

监控与运维
集群监控指标
# 查看节点状态
emqx_ctl cluster status
# 查看连接数
emqx_ctl broker stats
# 查看订阅数
emqx_ctl subscriptions list

Prometheus 监控
# prometheus.yml
scrape_configs:
- job_name: 'emqx'
static_configs:
- targets:
- '192.168.1.1:18083'
- '192.168.1.2:18083'
- '192.168.1.3:18083'
    metrics_path: '/api/v5/prometheus/stats'

告警配置
# alertmanager rules
groups:
- name: emqx
rules:
- alert: EMQXNodeDown
expr: up{job="emqx"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "EMQX node is down"
- alert: EMQXHighConnections
expr: emqx_connections_count > 100000
for: 5m
labels:
        severity: warning

故障处理
节点故障
# 移除故障节点
emqx_ctl cluster leave emqx@192.168.1.3
# 强制移除
emqx_ctl cluster force-leave emqx@192.168.1.3

脑裂处理
# 配置自动愈合
cluster.autoheal = on
# 配置自动清理
cluster.autoclean = 5m

面试题预览
常见面试题
- MQTT 集群是如何实现消息路由的?
- 如何保证 MQTT 集群的高可用?
- MQTT 集群的会话是如何同步的?
- 如何处理 MQTT 集群的脑裂问题?
小结
MQTT 集群部署是生产环境的必要配置。通过多节点部署实现高可用,通过负载均衡分散连接压力,通过会话同步保证消息可靠性。合理的监控和告警配置能够及时发现和处理问题。
