MQTT与Spring集成
2026/1/15大约 4 分钟
MQTT与Spring集成
本章介绍如何在 Spring Boot 项目中集成 MQTT,实现消息的发布和订阅。
Spring Integration MQTT
添加依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
</dependencies>
配置文件
# application.yml
mqtt:
  broker: tcp://localhost:1883
  client-id: spring-mqtt-client
  username: admin
  password: public
  default-topic: test/topic
  qos: 1
  completion-timeout: 5000
配置类
@Configuration
@EnableIntegration
public class MqttConfig {
@Value("${mqtt.broker}")
private String broker;
@Value("${mqtt.client-id}")
private String clientId;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.default-topic}")
private String defaultTopic;
@Value("${mqtt.qos}")
private int qos;
/**
 * MQTT connection factory shared by the inbound and outbound adapters.
 *
 * <p>The connection options are built from the injected {@code mqtt.*} properties:
 * a single broker URI plus user name and password. Clean session and automatic
 * reconnect are enabled; the connection timeout is 10 and the keep-alive
 * interval is 60 (both in seconds, per the Paho API).
 *
 * @return the configured Paho client factory
 */
@Bean
public MqttPahoClientFactory mqttClientFactory() {
    final String[] serverUris = {broker};
    final char[] secret = password.toCharArray();

    MqttConnectOptions connectOptions = new MqttConnectOptions();
    connectOptions.setServerURIs(serverUris);
    connectOptions.setUserName(username);
    connectOptions.setPassword(secret);
    connectOptions.setCleanSession(true);
    connectOptions.setConnectionTimeout(10);
    connectOptions.setKeepAliveInterval(60);
    connectOptions.setAutomaticReconnect(true);

    DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
    clientFactory.setConnectionOptions(connectOptions);
    return clientFactory;
}
/**
 * Inbound channel: the inbound adapter publishes every received MQTT message here.
 *
 * @return a new {@link DirectChannel}
 */
@Bean
public MessageChannel mqttInputChannel() {
    final DirectChannel inboundChannel = new DirectChannel();
    return inboundChannel;
}
/**
 * Inbound adapter that subscribes to {@code sensor/#} and {@code device/+/status}
 * and forwards every received message to {@link #mqttInputChannel()}.
 *
 * <p>The return type is the concrete adapter class rather than
 * {@code MessageProducer}: Spring matches injection points against the declared
 * return type of a {@code @Bean} method, and {@code DynamicSubscriptionService}
 * injects this bean as {@code MqttPahoMessageDrivenChannelAdapter}. The concrete
 * type is still a {@code MessageProducer}, so existing callers are unaffected.
 *
 * <p>NOTE: the completion timeout is fixed at 5000 here; this class does not read
 * the {@code mqtt.completion-timeout} property.
 *
 * @return the configured message-driven channel adapter
 */
@Bean
public MqttPahoMessageDrivenChannelAdapter inbound() {
    MqttPahoMessageDrivenChannelAdapter adapter =
            new MqttPahoMessageDrivenChannelAdapter(
                    clientId + "-inbound", // distinct client id from the outbound handler
                    mqttClientFactory(),
                    "sensor/#", "device/+/status"
            );
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(qos);
    adapter.setOutputChannel(mqttInputChannel());
    return adapter;
}
/**
 * Outbound channel: messages sent here are consumed by the outbound handler
 * and published to the broker.
 *
 * @return a new {@link DirectChannel}
 */
@Bean
public MessageChannel mqttOutboundChannel() {
    final DirectChannel outboundChannel = new DirectChannel();
    return outboundChannel;
}
/**
 * Outbound handler that publishes messages arriving on {@code mqttOutboundChannel}.
 *
 * <p>It uses its own client id ({@code <client-id>-outbound}), publishes
 * asynchronously, and falls back to the configured default topic and QoS.
 *
 * @return the configured MQTT message handler
 */
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler outbound() {
    final String publisherClientId = clientId + "-outbound";
    MqttPahoMessageHandler publisher =
            new MqttPahoMessageHandler(publisherClientId, mqttClientFactory());
    publisher.setAsync(true);
    publisher.setDefaultTopic(defaultTopic);
    publisher.setDefaultQos(qos);
    return publisher;
}
}
消息处理器
@Component
public class MqttMessageHandler {
private static final Logger log = LoggerFactory.getLogger(MqttMessageHandler.class);
/**
 * Handles every message arriving on {@code mqttInputChannel} and dispatches it by topic.
 *
 * <p>Topics starting with {@code sensor/} go to the sensor handler; other topics
 * containing {@code /status} go to the device-status handler; anything else is
 * only logged.
 *
 * <p>A message without the received-topic header is logged and skipped instead of
 * failing with a {@code NullPointerException}. A {@code byte[]} payload is decoded
 * as UTF-8 rather than printed as an array identity string.
 *
 * @param message the inbound message; the topic is read from the
 *                {@code MqttHeaders.RECEIVED_TOPIC} header
 */
@ServiceActivator(inputChannel = "mqttInputChannel")
public void handleMessage(Message<?> message) {
    String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC, String.class);
    if (topic == null) {
        // Nothing to dispatch on; the message did not come through an MQTT inbound adapter.
        log.warn("消息缺少主题头, 已忽略");
        return;
    }

    Object rawPayload = message.getPayload();
    String payload;
    if (rawPayload instanceof byte[]) {
        payload = new String((byte[]) rawPayload, java.nio.charset.StandardCharsets.UTF_8);
    } else {
        payload = rawPayload.toString();
    }
    log.info("收到消息 - Topic: {}, Payload: {}", topic, payload);

    // Dispatch by topic.
    if (topic.startsWith("sensor/")) {
        handleSensorData(topic, payload);
    } else if (topic.contains("/status")) {
        handleDeviceStatus(topic, payload);
    }
}
/**
 * Processes a sensor reading; currently it only logs the payload.
 *
 * @param topic   topic the reading arrived on (not used yet)
 * @param payload message body as text
 */
private void handleSensorData(final String topic, final String payload) {
    log.info("处理传感器数据: {}", payload);
}
/**
 * Processes a device status update; currently it only logs the payload.
 *
 * @param topic   topic the status arrived on (not used yet)
 * @param payload message body as text
 */
private void handleDeviceStatus(final String topic, final String payload) {
    log.info("处理设备状态: {}", payload);
}
}
消息发送服务
@Service
public class MqttPublisher {
@Autowired
private MessageChannel mqttOutboundChannel;
/**
 * Publishes a text payload to the given topic with QoS 1.
 *
 * @param topic   destination topic, set as the {@code MqttHeaders.TOPIC} header
 * @param payload message body
 */
public void publish(String topic, String payload) {
    MessageBuilder<String> builder = MessageBuilder.withPayload(payload);
    builder.setHeader(MqttHeaders.TOPIC, topic);
    builder.setHeader(MqttHeaders.QOS, 1);
    Message<String> outgoing = builder.build();
    mqttOutboundChannel.send(outgoing);
}
/**
 * Publishes a text payload with an explicit QoS and retained flag.
 *
 * @param topic    destination topic, set as the {@code MqttHeaders.TOPIC} header
 * @param payload  message body
 * @param qos      QoS level, set as the {@code MqttHeaders.QOS} header
 * @param retained retained flag, set as the {@code MqttHeaders.RETAINED} header
 */
public void publish(String topic, String payload, int qos, boolean retained) {
    MessageBuilder<String> builder = MessageBuilder.withPayload(payload);
    builder.setHeader(MqttHeaders.TOPIC, topic);
    builder.setHeader(MqttHeaders.QOS, qos);
    builder.setHeader(MqttHeaders.RETAINED, retained);
    Message<String> outgoing = builder.build();
    mqttOutboundChannel.send(outgoing);
}
}
使用示例
@RestController
@RequestMapping("/api/mqtt")
public class MqttController {
@Autowired
private MqttPublisher mqttPublisher;
/**
 * {@code POST /api/mqtt/publish}: publishes {@code message} to {@code topic}
 * through the publisher's two-argument overload.
 *
 * @param topic   destination topic (request parameter)
 * @param message message body (request parameter)
 * @return 200 OK with a fixed confirmation text
 */
@PostMapping("/publish")
public ResponseEntity<String> publish(
        @RequestParam String topic,
        @RequestParam String message) {
    mqttPublisher.publish(topic, message);
    final String confirmation = "消息已发送";
    return ResponseEntity.ok(confirmation);
}
/**
 * {@code POST /api/mqtt/device/{deviceId}/command}: sends the request body as a
 * command to {@code device/<deviceId>/command} with QoS 1, not retained.
 *
 * @param deviceId target device id (path variable)
 * @param command  raw command payload (request body)
 * @return 200 OK with a fixed confirmation text
 */
@PostMapping("/device/{deviceId}/command")
public ResponseEntity<String> sendCommand(
        @PathVariable String deviceId,
        @RequestBody String command) {
    final String commandTopic = String.join("/", "device", deviceId, "command");
    mqttPublisher.publish(commandTopic, command, 1, false);
    final String confirmation = "指令已发送";
    return ResponseEntity.ok(confirmation);
}
}
Eclipse Paho 直接集成
添加依赖
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
MQTT 客户端配置
@Configuration
public class PahoMqttConfig {
// Logger used by the callback registered in mqttClient(); the class referenced
// `log` without declaring it.
private static final Logger log = LoggerFactory.getLogger(PahoMqttConfig.class);
// Broker URI, injected from the mqtt.broker property.
@Value("${mqtt.broker}")
private String broker;
// MQTT client identifier, injected from the mqtt.client-id property.
@Value("${mqtt.client-id}")
private String clientId;
/**
 * Creates, configures and connects the Paho {@code MqttClient} bean.
 *
 * <p>The client uses in-memory persistence, clean session and automatic
 * reconnect. The subscription to {@code sensor/#} is made in
 * {@code connectComplete}, so it is repeated after every reconnect as well as
 * after the first connect.
 *
 * <p>Incoming payloads are decoded as UTF-8 explicitly, so the logged text does
 * not depend on the JVM's default charset.
 *
 * <p>{@code connect} is called during bean creation, so a failure to connect
 * propagates out of this method.
 *
 * @return the connected client
 * @throws MqttException if the client cannot be created or connected
 */
@Bean
public MqttClient mqttClient() throws MqttException {
    MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());

    MqttConnectOptions options = new MqttConnectOptions();
    options.setCleanSession(true);
    options.setAutomaticReconnect(true);
    options.setConnectionTimeout(10);
    options.setKeepAliveInterval(60);

    client.setCallback(new MqttCallbackExtended() {
        @Override
        public void connectComplete(boolean reconnect, String serverURI) {
            log.info("MQTT 连接成功: {}", serverURI);
            try {
                // Subscribe on every (re)connect.
                client.subscribe("sensor/#", 1);
            } catch (MqttException e) {
                log.error("订阅失败", e);
            }
        }

        @Override
        public void connectionLost(Throwable cause) {
            log.warn("MQTT 连接断开", cause);
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) {
            String text = new String(
                    message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
            log.info("收到消息 - Topic: {}, Payload: {}", topic, text);
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            log.debug("消息发送完成");
        }
    });

    client.connect(options);
    return client;
}
}
MQTT 服务类
@Service
public class PahoMqttService {
@Autowired
private MqttClient mqttClient;
/**
 * Publishes a text payload to a topic.
 *
 * <p>The payload is encoded as UTF-8 explicitly; the no-argument
 * {@code String.getBytes()} uses the JVM's default charset.
 *
 * @param topic   destination topic
 * @param payload message body
 * @param qos     QoS level applied to the message
 * @throws RuntimeException wrapping the {@code MqttException} when publishing fails
 */
public void publish(String topic, String payload, int qos) {
    try {
        byte[] body = payload.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        MqttMessage message = new MqttMessage(body);
        message.setQos(qos);
        mqttClient.publish(topic, message);
    } catch (MqttException e) {
        throw new RuntimeException("发布消息失败", e);
    }
}
/**
 * Subscribes to a topic filter and routes its messages to the given listener.
 *
 * @param topic    topic filter to subscribe to
 * @param qos      QoS level requested for the subscription
 * @param listener callback passed to the client for this subscription
 * @throws RuntimeException wrapping the {@code MqttException} when subscribing fails
 */
public void subscribe(String topic, int qos, IMqttMessageListener listener) {
    try {
        mqttClient.subscribe(topic, qos, listener);
    } catch (MqttException subscribeFailure) {
        throw new RuntimeException("订阅失败", subscribeFailure);
    }
}
/**
 * Cancels the subscription for a topic filter.
 *
 * @param topic topic filter to unsubscribe from
 * @throws RuntimeException wrapping the {@code MqttException} when unsubscribing fails
 */
public void unsubscribe(String topic) {
    try {
        mqttClient.unsubscribe(topic);
    } catch (MqttException unsubscribeFailure) {
        throw new RuntimeException("取消订阅失败", unsubscribeFailure);
    }
}
}
动态订阅管理
@Service
public class DynamicSubscriptionService {
// Logger used by addSubscription/removeSubscription; the class referenced `log`
// without declaring it.
private static final Logger log = LoggerFactory.getLogger(DynamicSubscriptionService.class);
// Inbound adapter whose topic list is changed at runtime.
@Autowired
private MqttPahoMessageDrivenChannelAdapter adapter;
// Topics added through this service (concurrent set).
private final Set<String> subscriptions = ConcurrentHashMap.newKeySet();
/**
 * Subscribes the inbound adapter to {@code topic} with QoS 1, unless this
 * service already tracks that topic.
 *
 * <p>If the adapter rejects the topic, the topic is removed from the tracked
 * set again before the exception is rethrown. Otherwise the set would claim a
 * subscription that does not exist and later calls for the same topic would be
 * silent no-ops.
 *
 * @param topic topic filter to add
 */
public void addSubscription(String topic) {
    if (!subscriptions.add(topic)) {
        return; // already tracked by this service
    }
    try {
        adapter.addTopic(topic, 1);
    } catch (RuntimeException e) {
        subscriptions.remove(topic); // roll back so that a retry is possible
        throw e;
    }
    log.info("添加订阅: {}", topic);
}
/**
 * Removes {@code topic} from the inbound adapter if this service tracks it.
 *
 * <p>If the adapter fails to remove the topic, it is put back into the tracked
 * set before the exception is rethrown, so the set keeps matching the adapter
 * and the removal can be retried.
 *
 * @param topic topic filter to remove
 */
public void removeSubscription(String topic) {
    if (!subscriptions.remove(topic)) {
        return; // not tracked by this service
    }
    try {
        adapter.removeTopic(topic);
    } catch (RuntimeException e) {
        subscriptions.add(topic); // roll back so that a retry is possible
        throw e;
    }
    log.info("移除订阅: {}", topic);
}
/**
 * Returns a read-only view of the topics tracked by this service.
 *
 * <p>The result wraps the underlying set; it is not a snapshot copy.
 *
 * @return unmodifiable view of the tracked topics
 */
public Set<String> getSubscriptions() {
    final Set<String> readOnlyView = Collections.unmodifiableSet(subscriptions);
    return readOnlyView;
}
}
消息转换器
@Component
public class JsonMqttMessageConverter implements MqttMessageConverter {
private final ObjectMapper objectMapper = new ObjectMapper();
/**
 * Converts an incoming MQTT message into a Spring message.
 *
 * <p>The payload is parsed as JSON and delivered as a {@code JsonNode}. If
 * parsing fails or yields no node, the payload is delivered as a UTF-8 decoded
 * string instead.
 *
 * <p>Both paths set the same three headers (received topic, QoS, retained), so
 * downstream handlers do not lose the QoS and retained headers for non-JSON
 * messages. The fallback string is decoded as UTF-8 explicitly instead of with
 * the JVM's default charset.
 *
 * @param topic       topic the message was received on
 * @param mqttMessage raw Paho message
 * @return message whose payload is a {@code JsonNode} or a {@code String}
 */
@Override
public Message<?> toMessage(String topic, MqttMessage mqttMessage) {
    byte[] raw = mqttMessage.getPayload();

    Object payload;
    try {
        JsonNode jsonNode = objectMapper.readTree(raw);
        payload = jsonNode;
    } catch (Exception e) {
        // Deliberate best effort: anything that is not JSON is passed on as text.
        payload = null;
    }
    if (payload == null) {
        payload = new String(raw, java.nio.charset.StandardCharsets.UTF_8);
    }

    return MessageBuilder
            .withPayload(payload)
            .setHeader(MqttHeaders.RECEIVED_TOPIC, topic)
            .setHeader(MqttHeaders.RECEIVED_QOS, mqttMessage.getQos())
            .setHeader(MqttHeaders.RECEIVED_RETAINED, mqttMessage.isRetained())
            .build();
}
/**
 * Serializes a message payload to bytes for publishing.
 *
 * <p>The payload is written as JSON. If serialization fails, the payload's
 * {@code toString()} text is encoded as UTF-8, so both paths produce the same
 * encoding regardless of the JVM's default charset.
 *
 * <p>NOTE(review): the outbound {@code MqttPahoMessageHandler} may expect its
 * converter to return an {@code MqttMessage} rather than a {@code byte[]}.
 * Confirm this before wiring the converter into an outbound adapter.
 *
 * @param message     message whose payload is serialized
 * @param targetClass requested target type (not used)
 * @return the payload as a byte array
 */
@Override
public Object fromMessage(Message<?> message, Class<?> targetClass) {
    Object payload = message.getPayload();
    try {
        return objectMapper.writeValueAsBytes(payload);
    } catch (Exception e) {
        // Deliberate best effort: fall back to the plain text form of the payload.
        return payload.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }
}
}
异常处理
/**
 * Error handler that logs MQTT processing failures and, when one is present in
 * the cause chain, the reason code of the {@code MqttException}.
 */
@Component
public class MqttErrorHandler implements ErrorHandler {
    private static final Logger log = LoggerFactory.getLogger(MqttErrorHandler.class);

    /** Upper bound on cause-chain hops; protects against a cyclic chain. */
    private static final int MAX_CAUSE_DEPTH = 16;

    /**
     * Logs the failure and, if an {@code MqttException} is the throwable itself
     * or anywhere in its cause chain, logs its reason code as well.
     *
     * <p>Searching the whole chain covers an {@code MqttException} that is
     * passed in directly or wrapped more than once, not only a direct cause.
     *
     * @param t the failure to report; a {@code null} value is logged without
     *          further inspection
     */
    @Override
    public void handleError(Throwable t) {
        log.error("MQTT 消息处理异常", t);
        MqttException mqttException = findMqttException(t);
        if (mqttException != null) {
            log.error("MQTT 错误码: {}", mqttException.getReasonCode());
        }
    }

    /** Returns the first {@code MqttException} in the cause chain of {@code t}, or {@code null}. */
    private static MqttException findMqttException(Throwable t) {
        Throwable current = t;
        for (int depth = 0; current != null && depth < MAX_CAUSE_DEPTH; depth++) {
            if (current instanceof MqttException) {
                return (MqttException) current;
            }
            current = current.getCause();
        }
        return null;
    }
}
// Configure the error handling: attach an error channel to the inbound adapter.
// NOTE: "..." is the article's placeholder for the adapter construction; this
// snippet is not compilable as written.
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter = ...;
// Use the channel from errorChannel() as this adapter's error channel.
adapter.setErrorChannel(errorChannel());
return adapter;
}
/**
 * Channel that the inbound adapter is configured to use as its error channel.
 *
 * <p>NOTE(review): Spring Integration appears to provide a default bean with
 * this name; confirm that replacing it with a {@code DirectChannel} is intended.
 *
 * @return a new {@link DirectChannel}
 */
@Bean
public MessageChannel errorChannel() {
    final DirectChannel failureChannel = new DirectChannel();
    return failureChannel;
}
@ServiceActivator(inputChannel = "errorChannel")
public void handleError(ErrorMessage errorMessage) {
log.error("MQTT 错误: {}", errorMessage.getPayload().getMessage());
}
面试题预览
常见面试题
- Spring Integration MQTT 的工作原理是什么?
- 如何在 Spring Boot 中实现 MQTT 的动态订阅?
- MQTT 消息处理失败如何重试?
- 如何保证 Spring MQTT 客户端的高可用?
小结
Spring 提供了多种方式集成 MQTT:Spring Integration MQTT 提供声明式配置,Eclipse Paho 提供更底层的控制。根据项目需求选择合适的方式,配合消息转换器和异常处理,构建健壮的 MQTT 应用。
