欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 维修 > 预订接口优化:使用本地消息表保证订单生成、库存扣减的一致性

预订接口优化:使用本地消息表保证订单生成、库存扣减的一致性

2025/5/3 7:30:48 来源:https://blog.csdn.net/laodanqiu/article/details/147672978  浏览:    关键词:预订接口优化:使用本地消息表保证订单生成、库存扣减的一致性

🎯 本文介绍了一种优化预订接口的方法,通过引入本地消息表解决分布式事务中的最终一致性问题。原先的实现是在一个事务中同时扣减库存和创建订单,容易因网络不稳定导致数据不一致。改进后的方法将业务操作和消息发送封装在本地事务中,并利用MQ进行异步解耦,确保了即使在网络故障时也能保证系统的数据一致性。此外,还设计了定时任务重试机制以及幂等性保障措施来进一步确保消息被成功处理,从而实现了高效可靠的分布式事务处理。

说明

在前面的预订实现中,是先开启一个事务,然后去扣减库存,再通过RPC调用订单服务来创建订单,如果订单创建成功,就提交事务;否则回滚事务。代码实现如下:

/*** 执行下单和数据库库存扣减操作** @param timePeriodDO* @param courtIndex* @param venueId* @return*/
@Override
public OrderDO executePreserveV1(TimePeriodDO timePeriodDO,Long courtIndex, Long venueId,String stockKey, String freeIndexBitMapKey) {// 编程式开启事务,减少事务粒度,避免长事务的发生return transactionTemplate.execute(status -> {try {// 扣减当前时间段的库存,修改空闲场信息baseMapper.updateStockAndBookedSlots(timePeriodDO.getId(), timePeriodDO.getPartitionId(), courtIndex);// 调用远程服务创建订单OrderGenerateReqDTO orderGenerateReqDTO = OrderGenerateReqDTO.builder().timePeriodId(timePeriodDO.getId()).partitionId(timePeriodDO.getPartitionId()).periodDate(timePeriodDO.getPeriodDate()).beginTime(timePeriodDO.getBeginTime()).endTime(timePeriodDO.getEndTime()).courtIndex(courtIndex).userId(UserContext.getUserId()).userName(UserContext.getUsername()).venueId(venueId).payAmount(timePeriodDO.getPrice()).build();Result<OrderDO> result;try {result = orderFeignService.generateOrder(orderGenerateReqDTO);if (result == null || !result.isSuccess()) {// --if-- 订单生成失败,抛出异常,上面的库存扣减也会回退throw new ServiceException(BaseErrorCode.ORDER_GENERATE_ERROR);}} catch (Exception e) {// --if-- 订单生成服务调用失败// 恢复缓存中的信息this.restoreStockAndBookedSlotsCache(timePeriodDO.getId(),UserContext.getUserId(),courtIndex,stockKey,freeIndexBitMapKey);// todo 如果说由于网络原因,实际上订单已经创建成功了,但是因为超时访问失败,这里库存却回滚了,此时需要将订单置为废弃状态(即删除)// 发送一个短暂的延时消息(时间过长,用户可能已经支付),去检查订单是否生成,如果生成,将其删除// 打印错误堆栈信息e.printStackTrace();// 把错误返回到前端throw new ServiceException(e.getMessage());}return result.getData();} catch (Exception ex) {status.setRollbackOnly();throw ex;}});
}

但是网络有时候是不稳定的,假如订单服务创建订单成功,但是由于网络原因,没办法将订单数据返回给库存服务。这时候库存服务就会误认为订单服务出错,进而回滚了事务。这样,就出现了订单创建成功,但是库存却没有扣减,出现了不一致问题,这种不一致会导致超卖。

由于库存扣减、订单生成处于不同的服务中,双方无法使用本地事务来保证两者的一致性,这属于分布式事务。常见的分布式事务解决方案有:

  • 强一致:2PC、3PC、TCC、Saga模式
  • 最终一致:本地消息表、MQ事务消息、最大努力通知
  • 工具:Seata

本文使用比较常用的本地消息表来解决

本地消息表介绍

本地消息表的核心思想:将分布式事务拆分为本地事务+异步消息,通过本地事务保证消息的可靠存储,通过重试机制确保远程业务最终执行成功。

核心步骤

  1. 本地事务与消息写入 业务执行时,先在本地数据库完成业务操作,同时将待发送的消息(含业务ID、状态等)插入同一事务的消息表,利用本地事务的ACID特性保证两者原子性。
  2. 异步轮询消息 后台定时任务扫描消息表中状态为"待发送"的消息,调用下游服务的接口。
  3. 下游服务处理 下游服务执行业务逻辑,成功后返回确认;若失败或超时,触发重试(需保证接口幂等性)。
  4. 消息状态更新 下游处理成功后,更新本地消息表中该消息状态为"已完成";若多次重试失败则标记为"失败",人工介入处理。

关键点

  • 可靠性:消息表与业务数据同库,本地事务确保业务执行成功,本地消息就会记录成功
  • 异步解耦:通过异步重试替代同步阻塞,提高系统吞吐量
  • 幂等性:下游服务调用要支持幂等性,不然重复消费可能出问题

本文实践过程

  • 预订接口首先通过缓存验证用户是否预订成功,预订成功就发送一条预订消息到MQ
  • 订单服务去消费预订消息,通过本地事务保证插入订单、插入本地消息的原子性
  • 通过定时任务轮询本地消息表中还没有执行成功的消息,将任务投递到MQ中,后面让库存服务去消费,进行库存扣减(当然这里也可以直接通过RPC调用库存服务扣减,但是为了解耦两个服务,本文使用MQ来实现)
  • 注意:库存服务执行库存扣减的时候,需要保证幂等性。即一个订单扣减过库存之后,不允许再扣减第二次。

数据库设计

首先需要创建一个表,用来记录本地消息

CREATE TABLE `local_message` (`id` bigint NOT NULL COMMENT '主键ID',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',`is_deleted` tinyint NOT NULL DEFAULT '0' COMMENT '逻辑删除 0:未删除 1:已删除',`msg_id` varchar(64) NOT NULL COMMENT '唯一消息ID',`topic` varchar(200) NOT NULL COMMENT '消息Topic',`tag` varchar(200) NOT NULL DEFAULT '' COMMENT '消息Tag',`content` text NOT NULL COMMENT '消息内容(JSON格式)',`status` tinyint NOT NULL DEFAULT '0' COMMENT '消息状态 0:待发送 1:消费失败 2:消费成功 3:超过重试次数',`fail_reason` varchar(1000) DEFAULT NULL COMMENT '失败原因',`retry_count` int NOT NULL DEFAULT '0' COMMENT '已重试次数',`next_retry_time` bigint NOT NULL DEFAULT '0' COMMENT '下次重试时间戳(毫秒)',`max_retry_count` int NOT NULL DEFAULT '3' COMMENT '最大重试次数',PRIMARY KEY (`id`),UNIQUE KEY `uk_msg_id` (`msg_id`),KEY `idx_status_retry` (`status`, `next_retry_time`),KEY `idx_topic_tag` (`topic`, `tag`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='本地消息表';

业务实现

枚举

package com.vrs.enums;import lombok.Getter;
import lombok.RequiredArgsConstructor;/*** 场馆类型枚举*/
@RequiredArgsConstructor
public enum LocalMessageStatusEnum {INIT(0, "待发送"),SEND_FAIL(1, "消费失败"),SEND_SUCCESS(2, "消费成功"),ARRIVE_MAX_RETRY_COUNT(3, "超过重试次数"),;@Getterprivate final int status;@Getterprivate final String msg;}

预订

首先验证令牌是否充足,充足就发送一条预订消息到 MQ

/*** 尝试获取令牌,令牌获取成功之后,发送消息,异步执行库存扣减和订单生成* 注意:令牌在极端情况下,如扣减令牌之后,服务宕机了,此时令牌的库存是小于真实库存的* 如果查询令牌发现库存为0,尝试去数据库中加载数据,加载之后库存还是0,说明时间段确实售罄了* 使用消息队列异步 扣减库存,更新缓存,生成订单** @param timePeriodId* @param courtIndex*/
@Override
public String reserve2(Long timePeriodId, Integer courtIndex) { 参数校验:使用责任链模式校验数据是否正确TimePeriodReserveReqDTO timePeriodReserveReqDTO = new TimePeriodReserveReqDTO(timePeriodId, courtIndex);chainContext.handler(ChainConstant.RESERVE_CHAIN_NAME, timePeriodReserveReqDTO);Long venueId = timePeriodReserveReqDTO.getVenueId();VenueDO venueDO = timePeriodReserveReqDTO.getVenueDO();PartitionDO partitionDO = timePeriodReserveReqDTO.getPartitionDO();TimePeriodDO timePeriodDO = timePeriodReserveReqDTO.getTimePeriodDO(); 使用lua脚本获取一个空场地对应的索引,并扣除相应的库存,同时在里面进行用户的查重// 首先检测空闲场号缓存有没有加载好,没有的话进行加载this.checkBitMapCache(String.format(RedisCacheConstant.VENUE_TIME_PERIOD_FREE_INDEX_BIT_MAP_TOKEN_KEY, timePeriodReserveReqDTO.getTimePeriodId()),timePeriodId,partitionDO.getNum());// 其次检测时间段库存有没有加载好,没有的话进行加载this.getStockByTimePeriodId(RedisCacheConstant.VENUE_TIME_PERIOD_STOCK_TOKEN_KEY, timePeriodReserveReqDTO.getTimePeriodId());// todo 判断是否还有令牌,没有的话,重新加载(注意要分布式锁)// 执行lua脚本Long freeCourtIndex = executeStockReduceByLua(timePeriodReserveReqDTO,venueDO,courtIndex, RedisCacheConstant.VENUE_TIME_PERIOD_STOCK_TOKEN_KEY,RedisCacheConstant.VENUE_TIME_PERIOD_FREE_INDEX_BIT_MAP_TOKEN_KEY);if (freeCourtIndex == -2L) {// --if-- 用户已经购买过该时间段throw new ClientException(BaseErrorCode.TIME_PERIOD_HAVE_BOUGHT_ERROR);} else if (freeCourtIndex == -1L) {// --if-- 没有空闲的场号,查询数据库,如果数据库中有库存,删除缓存,下一个用户预定时重新加载令牌this.refreshTokenByCheckDataBase(timePeriodId);throw new ServiceException(BaseErrorCode.TIME_PERIOD_SELL_OUT_ERROR);} 发送消息,异步更新库存并生成订单String orderSn = SnowflakeIdUtil.nextId() + String.valueOf(UserContext.getUserId() % 1000000);SendResult sendResult = executeReserveProducer.sendMessage(ExecuteReserveMqDTO.builder().orderSn(orderSn).timePeriodId(timePeriodId).courtIndex(freeCourtIndex).venueId(venueId).userId(UserContext.getUserId()).userName(UserContext.getUsername()).partitionId(partitionDO.getId()).price(timePeriodDO.getPrice()).periodDate(timePeriodDO.getPeriodDate()).beginTime(timePeriodDO.getBeginTime()).endTime(timePeriodDO.getEndTime()).build());if (!sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {log.error("消息发送失败: " + sendResult.getSendStatus());// 恢复令牌缓存this.restoreStockAndBookedSlotsCache(timePeriodId,UserContext.getUserId(),freeCourtIndex,RedisCacheConstant.VENUE_TIME_PERIOD_STOCK_TOKEN_KEY,RedisCacheConstant.VENUE_TIME_PERIOD_FREE_INDEX_BIT_MAP_TOKEN_KEY);throw new ServiceException(BaseErrorCode.MQ_SEND_ERROR);}return orderSn;
}

订单生成

消息预订消息,执行订单创建,并插入本地事务

/*** 消费预订之后的消息* 生成订单、生成本地消息** @param message*/
@Override
public void generateOrder(ExecuteReserveMqDTO message) {OrderDO orderDO = OrderDO.builder()// 订单号使用雪花算法生成分布式ID,然后再拼接用户ID的后面六位.orderSn(message.getOrderSn()).orderTime(new Date()).venueId(message.getVenueId()).partitionId(message.getPartitionId()).courtIndex(message.getCourtIndex()).timePeriodId(message.getTimePeriodId()).periodDate(message.getPeriodDate()).beginTime(message.getBeginTime()).endTime(message.getEndTime()).userId(message.getUserId()).userName(message.getUserName()).payAmount(message.getPrice()).orderStatus(OrderStatusConstant.UN_PAID).build();TimePeriodStockReduceMqDTO timePeriodStockReduceMqDTO = TimePeriodStockReduceMqDTO.builder().orderSn(message.getOrderSn()).timePeriodId(message.getTimePeriodId()).partitionId(message.getPartitionId()).courtIndex(message.getCourtIndex()).build();LocalMessageDO stockReduceLocalMessageDO = LocalMessageDO.builder().msgId(message.getOrderSn()).topic(RocketMqConstant.VENUE_TOPIC).tag(RocketMqConstant.TIME_PERIOD_STOCK_REDUCE_TAG).content(JSON.toJSONString(timePeriodStockReduceMqDTO)).nextRetryTime(System.currentTimeMillis()).maxRetryCount(5).build();LocalMessageDO delayCloseLocalMessageD0 = LocalMessageDO.builder().msgId(SnowflakeIdUtil.nextIdStr()).topic(RocketMqConstant.ORDER_TOPIC).tag(RocketMqConstant.ORDER_DELAY_CLOSE_TAG).content(JSON.toJSONString(OrderDelayCloseMqDTO.builder().orderSn(orderDO.getOrderSn()).build())).nextRetryTime(System.currentTimeMillis()).maxRetryCount(5).build();// 使用编程式事务,保证订单创建、本地消息插入的一致性boolean success = transactionTemplate.execute(status -> {try {int insertCount = baseMapper.insert(orderDO);localMessageService.save(stockReduceLocalMessageDO);// 也保存一个本地消息,进行兜底。防止事务提交成功之后就宕机,延时消息没有发生成功localMessageService.save(delayCloseLocalMessageD0);return insertCount > 0;} catch (Exception ex) {status.setRollbackOnly();throw ex;}});if (success) {// 发送延时消息来关闭未支付的订单SendResult sendResult = orderDelayCloseProducer.sendMessage(OrderDelayCloseMqDTO.builder().orderSn(orderDO.getOrderSn()).build());if (sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {// 延迟关单已经发生成功,后面扫描的时候,无需再处理LocalMessageDO localMessageDO = new LocalMessageDO();localMessageDO.setId(delayCloseLocalMessageD0.getId());localMessageDO.setStatus(LocalMessageStatusEnum.INIT.getStatus());localMessageService.updateById(localMessageDO);}// todo 如果出现宕机,可能出现宕机,但是 websocket 消息没有消息,所以前端还要实现一个轮询来保底// 通过 websocket 发送消息,通知前端websocketSendMessageProducer.sendMessage(WebsocketMqDTO.builder().toUsername(orderDO.getUserName()).message(JSON.toJSONString(orderDO)).build());}
}

定时任务

  • 定期扫描本地消息表(local_message)中待处理(未处理、上次处理失败、下次重试时间小于等于现在)的消息
  • 根据消息 Topic 和 tag 调用不同的消息处理器,将本地消息投递到消息队列中
  • 消息投递成功后更新消息状态,失败则通过指数退避算法计算下次重试时间,等待下次重试
  • 使用分布式锁保证集群环境下只有一个实例执行任务

【性能优化】

  • 使用流式查询,避免分页查询的无效扫描
  • 通过批量修改优化单条修改的效率

【策略模式】

  • 通过策略模式,根据不同的 tag 获得不同的 MQ 生产者,避免if else代码
package com.vrs.service.scheduled;import com.alibaba.fastjson2.JSON;
import com.vrs.constant.RocketMqConstant;
import com.vrs.design_pattern.strategy.MessageProcessor;
import com.vrs.domain.dto.mq.OrderDelayCloseMqDTO;
import com.vrs.domain.dto.mq.TimePeriodStockReduceMqDTO;
import com.vrs.domain.entity.LocalMessageDO;
import com.vrs.enums.LocalMessageStatusEnum;
import com.vrs.rocketMq.producer.OrderDelayCloseProducer;
import com.vrs.rocketMq.producer.TimePeriodStockReduceProducer;
import com.vrs.service.LocalMessageService;
import jakarta.annotation.PostConstruct;
import lombok.Cleanup;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;/*** @Author dam* @create 2024/11/17 16:44*/
@Component
@RequiredArgsConstructor
@Slf4j
public class LocalMessageScheduledScan {private final DataSource dataSource;private final LocalMessageService localMessageService;private final TimePeriodStockReduceProducer timePeriodStockReduceProducer;private final OrderDelayCloseProducer orderDelayCloseProducer;private final RedissonClient redissonClient;/*** 使用策略模式处理消息*/// todo 可以优化策略模式的写法,方便代码扩展private final Map<String, MessageProcessor> messageProcessors = new HashMap<>();private final int BATCH_SIZE = 1000;/*** 注册 tag 和其对应的消息处理器*/@PostConstructpublic void init() {messageProcessors.put(RocketMqConstant.TIME_PERIOD_STOCK_REDUCE_TAG, mqDTO -> {TimePeriodStockReduceMqDTO dto = JSON.parseObject(mqDTO.getContent(), TimePeriodStockReduceMqDTO.class);return timePeriodStockReduceProducer.sendMessage(dto);});messageProcessors.put(RocketMqConstant.ORDER_DELAY_CLOSE_TAG, mqDTO -> {OrderDelayCloseMqDTO dto = JSON.parseObject(mqDTO.getContent(), OrderDelayCloseMqDTO.class);return orderDelayCloseProducer.sendMessage(dto);});}/*** 定时任务:扫描并处理本地消息* 每分钟执行一次*/@Scheduled(cron = "0 */1 * * * ?")@SneakyThrowspublic void processLocalMessage() {RLock lock = redissonClient.getLock("LocalMessageScan");boolean locked = false;try {locked = lock.tryLock(1, TimeUnit.MINUTES);if (!locked) {log.warn("获取分布式锁失败,跳过本次处理");return;}log.info("开始扫描本地消息表...");long start = System.currentTimeMillis();@Cleanup Connection conn = dataSource.getConnection();@Cleanup Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);stmt.setFetchSize(Integer.MIN_VALUE);// 查询sql,只查询关键的字段String sql = "SELECT id,msg_id,topic,tag,content,retry_count,max_retry_count,next_retry_time FROM local_message where " +"is_deleted = 0 and (status = 0 OR status = 1) and next_retry_time<" + start;@Cleanup ResultSet rs = stmt.executeQuery(sql);List<LocalMessageDO> localMessageBuffer = new ArrayList<>();while (rs.next()) {// 获取数据中的属性LocalMessageDO localMessageDO = new LocalMessageDO();localMessageDO.setId(rs.getLong("id"));localMessageDO.setMsgId(rs.getString("msg_id"));localMessageDO.setTopic(rs.getString("topic"));localMessageDO.setTag(rs.getString("tag"));localMessageDO.setContent(rs.getString("content"));localMessageDO.setRetryCount(rs.getInt("retry_count"));localMessageDO.setMaxRetryCount(rs.getInt("max_retry_count"));localMessageDO.setNextRetryTime(rs.getLong("next_retry_time"));if (localMessageDO.getRetryCount() > localMessageDO.getMaxRetryCount()) continue;localMessageBuffer.add(localMessageDO);if (localMessageBuffer.size() > BATCH_SIZE) {batchProcessMessages(localMessageBuffer);localMessageBuffer.clear();}}if (!localMessageBuffer.isEmpty()) {batchProcessMessages(localMessageBuffer);}log.info("结束扫描本地消息表..." + (System.currentTimeMillis() - start) + "ms");} catch (Exception e) {log.error("处理本地消息表时发生异常", e);throw e; // 或根据业务决定是否抛出} finally {if (locked && lock.isHeldByCurrentThread()) {lock.unlock();}}}/*** 批量处理消息*/private void batchProcessMessages(List<LocalMessageDO> messages) {// 成功和失败的消息分开处理List<Long> successIds = new ArrayList<>();List<Long> retryIds = new ArrayList<>();List<Long> arriveMaxRetryCountIds = new ArrayList<>();Map<Long, String> failureReasons = new HashMap<>();for (LocalMessageDO message : messages) {try {if (message.getRetryCount() > message.getMaxRetryCount()) {// 已经到达最大重试次数arriveMaxRetryCountIds.add(message.getId());continue;}MessageProcessor processor = messageProcessors.get(message.getTag());SendResult sendResult = processor.process(message);if (sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {successIds.add(message.getId());} else {retryIds.add(message.getId());failureReasons.put(message.getId(), "MQ发送状态: " + sendResult.getSendStatus());}} catch (Exception e) {log.error("处理消息 {} 时发生异常", message.getMsgId(), e);retryIds.add(message.getId());failureReasons.put(message.getId(), "处理异常: " + e.getMessage());}}// 批量更新状态if (!successIds.isEmpty()) {batchUpdateMessagesStatus(successIds, LocalMessageStatusEnum.SEND_SUCCESS);}if (!arriveMaxRetryCountIds.isEmpty()) {// todo 通知人工处理batchUpdateMessagesStatus(arriveMaxRetryCountIds, LocalMessageStatusEnum.ARRIVE_MAX_RETRY_COUNT);}if (!retryIds.isEmpty()) {batchUpdateRetryMessages(retryIds, failureReasons);}}/*** 批量更新消息状态*/private void batchUpdateMessagesStatus(List<Long> ids, LocalMessageStatusEnum status) {if (ids.isEmpty()) return;List<LocalMessageDO> updates = ids.stream().map(id -> {LocalMessageDO update = new LocalMessageDO();update.setId(id);update.setStatus(status.getStatus());if (status == LocalMessageStatusEnum.SEND_FAIL) {update.setRetryCount(localMessageService.getById(id).getMaxRetryCount());}return update;}).collect(Collectors.toList());if (updates.size() > 0) {localMessageService.updateBatchById(updates);}}/*** 批量更新重试消息*/private void batchUpdateRetryMessages(List<Long> ids, Map<Long, String> failReasons) {if (ids.isEmpty()) return;List<LocalMessageDO> messages = localMessageService.listByIds(ids);List<LocalMessageDO> updates = messages.stream().map(message -> {LocalMessageDO update = new LocalMessageDO();update.setId(message.getId());update.setStatus(LocalMessageStatusEnum.SEND_FAIL.getStatus());update.setRetryCount(message.getRetryCount() + 1);update.setNextRetryTime(getNextRetryTime(message.getRetryCount() + 1));update.setFailReason(failReasons.get(message.getId()));return update;}).collect(Collectors.toList());if (updates.size() > 0) {localMessageService.updateBatchById(updates);}}/*** 获取下次重试时间** @param retryCount* @return*/private long getNextRetryTime(int retryCount) {long interval = (long) Math.min(Math.pow(2, retryCount) * 1000, 3600 * 1000);return System.currentTimeMillis() + interval;}
}

库存扣减

注意库存扣减需要通过幂等组件来保证消费幂等性,key 是订单号,即保证同一个订单号只能扣减库存一次

package com.vrs.rocketMq.listener;import com.vrs.annotation.Idempotent;
import com.vrs.constant.RocketMqConstant;
import com.vrs.domain.dto.mq.TimePeriodStockReduceMqDTO;
import com.vrs.enums.IdempotentSceneEnum;
import com.vrs.service.TimePeriodService;
import com.vrs.templateMethod.MessageWrapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** @Author dam* @create 2024/9/20 21:30*/
@Slf4j(topic = RocketMqConstant.VENUE_TOPIC)
@Component
@RocketMQMessageListener(topic = RocketMqConstant.VENUE_TOPIC,consumerGroup = RocketMqConstant.VENUE_CONSUMER_GROUP + "-" + RocketMqConstant.TIME_PERIOD_STOCK_REDUCE_TAG,messageModel = MessageModel.CLUSTERING,// 监听tagselectorType = SelectorType.TAG,selectorExpression = RocketMqConstant.TIME_PERIOD_STOCK_REDUCE_TAG
)
@RequiredArgsConstructor
public class TimePeriodStockReduceListener implements RocketMQListener<MessageWrapper<TimePeriodStockReduceMqDTO>> {private final TimePeriodService timePeriodService;/*** 消费消息的方法* 方法报错就会拒收消息** @param messageWrapper 消息内容,类型和上面的泛型一致。如果泛型指定了固定的类型,消息体就是我们的参数*/@Idempotent(uniqueKeyPrefix = "time_period_stock_reduce:",key = "#messageWrapper.getMessage().getOrderSn()+''",scene = IdempotentSceneEnum.MQ,keyTimeout = 3600L)@SneakyThrows@Overridepublic void onMessage(MessageWrapper<TimePeriodStockReduceMqDTO> messageWrapper) {// 开头打印日志,平常可 Debug 看任务参数,线上可报平安(比如消息是否消费,重新投递时获取参数等)log.info("[消费者] 更新时间段的库存和空闲场号,时间段ID:{}", messageWrapper.getMessage().getTimePeriodId());timePeriodService.reduceStockAndBookedSlots(messageWrapper.getMessage());}
}

【service】

/*** 扣减库存** @param timePeriodStockReduceMqDTO*/
@Override
public void reduceStockAndBookedSlots(TimePeriodStockReduceMqDTO timePeriodStockReduceMqDTO) {baseMapper.updateStockAndBookedSlots(timePeriodStockReduceMqDTO.getTimePeriodId(), timePeriodStockReduceMqDTO.getPartitionId(), timePeriodStockReduceMqDTO.getCourtIndex());
}

【mapper】

<update id="updateStockAndBookedSlots"><![CDATA[UPDATE time_periodSET booked_slots = booked_slots | (1 << #{partitionIndex}), stock = stock - 1WHERE id = #{timePeriodId} AND stock > 0 AND partition_id = #{partitionId}]]>
</update>

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com