欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 建筑 > 【分布式】如何使用RocketMQ实现下单-库存-支付这个场景的分布式事务问题

【分布式】如何使用RocketMQ实现下单-库存-支付这个场景的分布式事务问题

2025/6/2 5:47:07 来源:https://blog.csdn.net/weixin_45325628/article/details/146230835  浏览:    关键词:【分布式】如何使用RocketMQ实现下单-库存-支付这个场景的分布式事务问题

下单-库存-支付 场景中,通过消息队列实现最终一致性,需保证三个微服务的操作最终一致,且在支付失败或库存不足时触发回滚补偿。以下是具体实现方案:


1. 整体流程设计

正常流程(成功场景)
  1. 订单服务 创建订单(状态为待支付),发送 订单创建成功事件
  2. 库存服务 消费事件,扣减库存,发送 库存扣减成功事件
  3. 支付服务 消费事件,执行支付,发送 支付成功事件
  4. 订单服务 消费支付成功事件,更新订单状态为已完成
异常流程(失败场景)

库存不足:库存服务直接发送 库存不足事件,订单服务取消订单。
支付失败:支付服务发送 支付失败事件,触发库存回滚和订单取消。


2. 核心组件与消息设计

消息队列(以RocketMQ为例)
事件类型Topic生产者消费者说明
订单创建成功事件order_created订单服务库存服务触发库存扣减
库存扣减成功事件stock_reduced库存服务支付服务触发支付
支付成功事件payment_done支付服务订单服务完成订单
库存不足事件stock_failed库存服务订单服务取消订单
支付失败事件payment_failed支付服务订单/库存服务触发库存回滚和订单取消
数据表设计(关键字段)

订单表(Order)

CREATE TABLE orders (id VARCHAR(64) PRIMARY KEY,  -- 订单ID(全局唯一)user_id BIGINT,amount DECIMAL(10,2),status ENUM('pending', 'completed', 'canceled')  -- 订单状态
);

库存表(Inventory)

CREATE TABLE inventory (product_id BIGINT PRIMARY KEY,stock INT CHECK(stock >= 0)  -- 库存不可为负
);

3. 实现步骤

(1) 订单服务:创建订单并发送事件
// 订单服务(OrderService.java)
@Transactional
public void createOrder(Order order) {// 1. 检查参数合法性(如用户存在性)// 2. 插入订单记录(状态为pending)orderDao.insert(order);// 3. 发送订单创建成功事件(事务消息)Message msg = new Message("order_created", JSON.toJSONBytes(order));TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("order_group", msg, order.getId()  // 事务ID(用订单ID标识));// 4. 若消息发送失败,抛出异常触发事务回滚if (result.getLocalTransactionState() != LocalTransactionState.COMMIT_MESSAGE) {throw new RuntimeException("订单创建事件发送失败");}
}
(2) 库存服务:扣减库存
// 库存服务(InventoryService.java)
@RocketMQMessageListener(topic = "order_created", consumerGroup = "inventory_group")
public class OrderCreatedListener implements RocketMQListener<MessageExt> {@Override@Transactionalpublic void onMessage(MessageExt message) {Order order = JSON.parseObject(message.getBody(), Order.class);// 1. 检查库存是否充足Inventory inventory = inventoryDao.selectByProductId(order.getProductId());if (inventory.getStock() < order.getQuantity()) {// 发送库存不足事件rocketMQTemplate.sendOneWay("stock_failed", JSON.toJSONBytes(order));return;}// 2. 扣减库存(乐观锁防止超卖)int updated = inventoryDao.reduceStock(order.getProductId(), order.getQuantity(), inventory.getVersion());if (updated == 0) {throw new RetryException("库存扣减冲突,稍后重试"); // 触发消息重试}// 3. 发送库存扣减成功事件rocketMQTemplate.sendOneWay("stock_reduced", JSON.toJSONBytes(order));}
}
(3) 支付服务:执行支付
// 支付服务(PaymentService.java)
@RocketMQMessageListener(topic = "stock_reduced", consumerGroup = "payment_group")
public class StockReducedListener implements RocketMQListener<MessageExt> {@Override@Transactionalpublic void onMessage(MessageExt message) {Order order = JSON.parseObject(message.getBody(), Order.class);// 1. 调用第三方支付接口boolean success = paymentClient.pay(order.getUserId(), order.getAmount());if (!success) {// 发送支付失败事件rocketMQTemplate.sendOneWay("payment_failed", JSON.toJSONBytes(order));return;}// 2. 发送支付成功事件rocketMQTemplate.sendOneWay("payment_done", JSON.toJSONBytes(order));}
}
(4) 订单服务:处理成功/失败事件
// 订单服务(OrderService.java)
@RocketMQMessageListener(topic = "payment_done", consumerGroup = "order_complete_group")
public class PaymentDoneListener implements RocketMQListener<MessageExt> {@Override@Transactionalpublic void onMessage(MessageExt message) {Order order = JSON.parseObject(message.getBody(), Order.class);orderDao.updateStatus(order.getId(), "completed");}
}@RocketMQMessageListener(topic = {"stock_failed", "payment_failed"}, consumerGroup = "order_cancel_group")
public class OrderCancelListener implements RocketMQListener<MessageExt> {@Override@Transactionalpublic void onMessage(MessageExt message) {Order order = JSON.parseObject(message.getBody(), Order.class);// 1. 取消订单(状态置为canceled)orderDao.updateStatus(order.getId(), "canceled");// 2. 若事件来自支付失败,需触发库存回滚(发送库存回滚事件)if (message.getTopic().equals("payment_failed")) {rocketMQTemplate.sendOneWay("stock_rollback", JSON.toJSONBytes(order));}}
}
(5) 库存服务:处理回滚事件
// 库存服务(InventoryService.java)
@RocketMQMessageListener(topic = "stock_rollback", consumerGroup = "inventory_rollback_group")
public class StockRollbackListener implements RocketMQListener<MessageExt> {@Override@Transactionalpublic void onMessage(MessageExt message) {Order order = JSON.parseObject(message.getBody(), Order.class);inventoryDao.addStock(order.getProductId(), order.getQuantity());}
}

4. 容错与补偿机制

(1) 消息可靠性

生产者端
使用 RocketMQ 事务消息,确保本地事务与消息发送的原子性。若消息发送失败,订单创建事务回滚。
消费者端
开启消费者重试(默认重试16次),若最终消费失败,消息进入死信队列(需人工干预)。

(2) 幂等性设计

订单服务
通过订单ID唯一标识,updateStatus 操作天然幂等。
库存服务
使用乐观锁(version字段)避免重复扣减。

(3) 最终一致性保障

支付失败/库存不足
通过监听失败事件触发补偿动作(订单取消 + 库存回滚)。
消息顺序性
同一订单的消息发送到同一队列(MessageQueue),保证顺序消费。


5. 方案优缺点

优点

低侵入性:业务代码仅需处理消息发送/监听,无分布式事务框架依赖。
高可用性:依赖消息队列的可靠性和重试机制,天然支持服务宕机容错。
性能友好:异步解耦,避免同步阻塞。

缺点

最终一致性延迟:依赖消息消费速度,不适用于实时性要求高的场景。
补偿逻辑需完备:需覆盖所有异常分支(如消息丢失、服务宕机)。


总结

通过消息队列实现下单-库存-支付的最终一致性,核心在于:

  1. 事件驱动:每个服务通过发布/订阅事件推进流程。
  2. 可靠消息:使用事务消息保证本地操作与消息发送的原子性。
  3. 补偿机制:监听失败事件,触发反向操作回滚状态。

此方案适合容忍短暂不一致性的场景(如电商交易),若需强一致性,可结合 Seata AT模式TCC模式

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词