本章深入讲解消息队列的核心原理、RocketMQ 和 Kafka 的架构设计等高频面试考点。
| 作用 | 说明 | 场景 |
|---|
| 解耦 | 生产者消费者独立演进 | 订单创建通知多个系统 |
| 异步 | 非核心流程异步处理 | 发短信、发邮件 |
| 削峰 | 缓冲突发流量 | 秒杀、大促 |
| 环节 | 丢失原因 | 解决方案 |
|---|
| 生产端 | 网络问题、Broker 宕机 | 同步发送 + 重试 + 确认机制 |
| Broker | 宕机、磁盘故障 | 同步刷盘 + 主从同步 |
| 消费端 | 消费失败、未提交 offset | 手动 ACK + 幂等处理 |
// RocketMQ 生产者确认
SendResult result = producer.send(message);
if (result.getSendStatus() == SendStatus.SEND_OK) {
// 发送成功
} else {
// 重试或记录
}
// Kafka 生产者确认
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// 发送失败,重试
}
});
// 方案1:数据库唯一键
@Transactional
public void processOrder(OrderMessage msg) {
try {
orderMapper.insert(order); // 唯一键冲突则失败
} catch (DuplicateKeyException e) {
log.info("订单已处理,忽略重复消息");
}
}
// 方案2:Redis 去重
public void processMessage(Message msg) {
String msgId = msg.getMsgId();
Boolean isNew = redisTemplate.opsForValue()
.setIfAbsent("msg:" + msgId, "1", 24, TimeUnit.HOURS);
if (!isNew) {
log.info("消息已处理,忽略");
return;
}
// 处理消息
}
// 方案3:状态机校验
public void processOrder(OrderMessage msg) {
Order order = orderMapper.selectById(msg.getOrderId());
if (order.getStatus() != OrderStatus.CREATED) {
log.info("订单状态已变更,忽略");
return;
}
// 处理订单
}
// RocketMQ 顺序消息
// 生产者:相同订单发到同一队列
SendResult result = producer.send(message, (mqs, msg, arg) -> {
Long orderId = (Long) arg;
int index = (int) (orderId % mqs.size());
return mqs.get(index);
}, orderId);
// 消费者:顺序消费
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-group",
consumeMode = ConsumeMode.ORDERLY
)
public class OrderConsumer implements RocketMQListener<OrderMessage> {
@Override
public void onMessage(OrderMessage message) {
// 顺序处理
}
}
// 批量消费
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-group",
consumeMessageBatchMaxSize = 100 // 批量消费
)
public class BatchConsumer implements RocketMQListener<List<OrderMessage>> {
@Override
public void onMessage(List<OrderMessage> messages) {
// 批量处理
batchProcess(messages);
}
}
| 组件 | 作用 |
|---|
| NameServer | 路由注册中心,无状态,可集群部署 |
| Broker | 消息存储和转发,分 Master/Slave |
| Producer | 消息生产者 |
| Consumer | 消息消费者 |
| 文件 | 作用 | 特点 |
|---|
| CommitLog | 存储消息体 | 顺序写,性能高 |
| ConsumeQueue | 消费队列索引 | 定长,快速定位 |
| IndexFile | 消息索引 | 支持 key/时间查询 |
| 模式 | 说明 | 数据可靠性 | 性能 |
|---|
| 异步复制 | Master 写入后立即返回 | 可能丢数据 | 高 |
| 同步双写 | Master 和 Slave 都写入后返回 | 不丢数据 | 较低 |
# Broker 配置
brokerRole=SYNC_MASTER # 同步双写
flushDiskType=SYNC_FLUSH # 同步刷盘
// 事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("tx-group");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 执行本地事务
orderService.createOrder(order);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 回查本地事务状态
Order order = orderService.getByOrderNo(msg.getKeys());
if (order != null) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
});
| 概念 | 说明 |
|---|
| Broker | Kafka 服务节点 |
| Topic | 消息主题 |
| Partition | 分区,并行处理单位 |
| Replica | 副本,保证高可用 |
| Consumer Group | 消费者组,组内竞争消费 |
| 技术 | 说明 |
|---|
| 顺序写 | 磁盘顺序写性能接近内存 |
| 零拷贝 | sendfile 系统调用,减少数据拷贝 |
| 分区 | 多分区并行读写 |
| 批量 | 批量发送和消费 |
| 页缓存 | 利用操作系统页缓存 |
// 生产者配置
Properties props = new Properties();
props.put("acks", "all"); // 所有副本确认
props.put("retries", 3); // 重试次数
props.put("enable.idempotence", true); // 幂等
// 消费者配置
props.put("enable.auto.commit", false); // 关闭自动提交
// 手动提交
consumer.poll(Duration.ofMillis(100));
// 处理消息
consumer.commitSync(); // 同步提交
| 配置 | 说明 |
|---|
| acks=all | 所有 ISR 副本确认 |
| retries | 发送失败重试 |
| enable.auto.commit=false | 手动提交 offset |
| min.insync.replicas | 最小同步副本数 |
ISR 机制:
- ISR:与 Leader 保持同步的副本集合
- OSR:落后于 Leader 的副本
- HW:高水位,消费者可见的最大 offset
- LEO:日志末端偏移量
| 特性 | Kafka | RocketMQ |
|---|
| 开发语言 | Scala/Java | Java |
| 单机吞吐 | 百万级 | 十万级 |
| 消息延迟 | 毫秒级 | 毫秒级 |
| 消息可靠性 | 较高 | 高 |
| 事务消息 | 支持 | 支持(更完善) |
| 延迟消息 | 不支持 | 支持 |
| 消息回溯 | 支持 | 支持 |
| 消息轨迹 | 不支持 | 支持 |
| 管理界面 | 第三方 | 官方 Dashboard |
| 适用场景 | 大数据、日志 | 业务消息 |
// RocketMQ 延迟消息(固定级别)
Message message = new Message("topic", "Hello".getBytes());
message.setDelayTimeLevel(3); // 延迟 10 秒
// 级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
// RocketMQ 5.0 任意延迟
message.setDeliveryTimestamp(System.currentTimeMillis() + 60000); // 延迟 60 秒
MQ 深入面试重点:
- 消息不丢失、不重复、顺序性
- 消息积压处理
- RocketMQ 架构和存储结构
- RocketMQ 事务消息
- Kafka 高性能原理
- Kafka ISR 机制
- 延迟消息实现
高频面试题
- 如何保证消息不丢失?
- 如何保证消息不重复消费?
- 如何保证消息顺序性?
- RocketMQ 事务消息原理?
- Kafka 为什么性能这么高?