在分布式消息系统中,幂等性是确保消息被消费多次仍产生相同结果的关键机制。RocketMQ本身不直接提供全局幂等性保证,需通过业务逻辑实现。以下是RocketMQ实现幂等性的核心方案:
编程相关书籍分享:https://blog.csdn.net/weixin_47763579/article/details/145855793
DeepSeek使用技巧pdf资料分享:https://blog.csdn.net/weixin_47763579/article/details/145884039
一、幂等性问题的根源
重复消费可能由以下原因引发:
- 生产者重试:网络抖动时消息可能重复发送
- Broker主从切换:未同步的消息可能被重新投递
- 消费者重试:消费失败后消息重新入队
二、RocketMQ幂等性实现方案
1. 唯一业务标识法
核心逻辑:通过业务唯一键(如订单ID)标识消息唯一性
实现步骤:
- 生产者附加唯一标识:
Message msg = new Message("OrderTopic", "Order_123456".getBytes(), // 业务唯一键作为消息Key"OrderBody".getBytes());
- 消费者检查唯一键:
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {String orderId = msg.getKeys(); // 提取唯一键if (!isProcessed(orderId)) { // 检查是否已处理processOrder(orderId); // 处理业务逻辑markAsProcessed(orderId); // 标记为已处理}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; });
2. 数据库唯一约束
适用场景:强一致性要求的场景(如支付系统)
实现示例:
CREATE TABLE order_records (order_id VARCHAR(64) PRIMARY KEY, -- 唯一键约束status TINYINT,...
);
消费时通过数据库事务实现:
@Transactional
public void processOrder(MessageExt msg) {String orderId = msg.getKeys();if (orderDao.exists(orderId)) return; // 已存在则跳过orderDao.insert(orderId); // 插入唯一记录// 执行业务操作...
}
3. Redis原子操作
适用场景:高频低延迟场景(如秒杀系统)
实现示例:
public boolean checkIdempotent(String messageId) {String redisKey = "msg:" + messageId;// SETNX + EXPIRE 原子操作return redisTemplate.opsForValue().setIfAbsent(redisKey, "1", Duration.ofMinutes(30));
}
4. 消息状态标记法
实现步骤:
- 消息表增加状态字段:
ALTER TABLE messages ADD