欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 新车 > 007 订单支付超时自动取消订单(rabbitmq死信队列 mybatis)

007 订单支付超时自动取消订单(rabbitmq死信队列 mybatis)

2025/9/20 3:53:08 来源:https://blog.csdn.net/m0_46695127/article/details/145968588  浏览:    关键词:007 订单支付超时自动取消订单(rabbitmq死信队列 mybatis)

文章目录

  • 死信队列
    • RabbitMQ 配置类 RabbitMQConfig.java
    • 生产者 OrderTimeoutProducer.java
    • 消费者 OrderTimeoutConsumer.java
    • 应用配置 application.yml
    • pom.xml 依赖
    • 实体类 Order.java(不变)
    • Mapper 接口 OrderMapper.java(不变)
    • 服务层 OrderService.java(不变)
    • 缓存配置 CacheConfig.java(不变)
    • 对账服务 ReconciliationTask.java(不变)
    • 控制器 OrderController.java(不变)

死信队列

在代码中,processedMsgCache的类型是Cache<String, Boolean>,键是消息ID,值是Boolean。当处理成功时,会执行processedMsgCache.put(msgId, true),将消息ID存入缓存,并设置有效期1小时,最大容量10000条。这样,在1小时内,如果同样的消息再次被处理,缓存中已经有记录,就会直接ACK并跳过处理。

不过,这里需要注意的是,消息ID是否唯一。在RabbitMQ中,消息ID默认可能不是唯一的,生产者发送消息时可以设置messageId。如果生产者没有明确设置,可能无法保证唯一性,这会导致幂等性检查失效。因此,需要确保生产者发送消息时设置了唯一的messageId,或者在消费者端使用其他唯一标识,比如消息内容中的orderId结合其他属性。

另外,缓存的过期时间设置为1小时,这可能与业务场景有关。如果消息的存活时间超过1小时,可能会有重复处理的风险。需要根据实际消息的存活时间来调整缓存的过期时间,确保覆盖消息可能被重新投递的时间窗口。

还有一个问题是,缓存是本地缓存,如果消费者有多个实例,每个实例的缓存是独立的。这可能导致不同的实例处理同一条消息,因为一个实例处理过,但另一个实例的缓存中没有记录。这种情况下,本地缓存的幂等性检查可能不够,需要考虑分布式缓存,比如Redis,来保证全局唯一性。但根据当前代码,在单实例或允许短暂重复的场景下使用本地缓存。

总结来说,幂等性检查的逻辑是通过缓存已处理消息的ID,在消息处理前检查是否已存在,存在则跳过处理,避免重复执行。这适用于消息队列保证至少一次投递,但业务需要确保幂等的场景。

                      +---------------------+|   RabbitMQ Message  ||  (携带唯一messageId)   |+----------+----------+|v
+----------------+       +-------+-------+       +-----------------+
|  消息到达消费者   | ----> | 检查缓存是否存在 | ----> | 存在:直接ACK丢弃消息 |
+----------------+       +-------+-------+       +-----------------+|| 不存在v+-------+-------+       +-----------------+| 执行业务逻辑处理  | ----> | 成功:存入缓存并ACK |+---------------+       +-----------------+

缓存过期时间(1小时)> 消息最大存活时间(30分钟+重试时间)
计算公式:缓存过期时间 = 消息TTL + 最大重试时间 * 重试次数 + 缓冲时间

缓存击穿空值缓存对不存在的key也进行缓存(需设置较短过期时间)
缓存穿透布隆过滤器在缓存前增加过滤层
消费者重启持久化存储配合数据库记录处理状态
网络分区最终一致性依赖对账服务修正状态
组件类型作用说明
processedMsgCacheCaffeine缓存存储已处理消息的唯一标识
messageId字符串消息唯一标识(需生产者保证唯一性)
deliveryTag长整型RabbitMQ消息投递标识
sequenceDiagramparticipant RabbitMQparticipant Consumerparticipant Cacheparticipant DBRabbitMQ->>Consumer: 投递消息(messageId=123)Consumer->>Cache: 查询messageId=123alt 存在缓存Cache-->>Consumer: 返回trueConsumer->>RabbitMQ: 发送ACKelse 无缓存Consumer->>DB: 执行取消操作alt 操作成功Consumer->>Cache: 写入messageId=123Consumer->>RabbitMQ: 发送ACKelse 操作失败Consumer->>RabbitMQ: 发送NACK(requeue=true)endend

RabbitMQ 配置类 RabbitMQConfig.java

import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 订单超时相关配置public static final String ORDER_DELAY_EXCHANGE = "order.delay.exchange";public static final String ORDER_DELAY_QUEUE = "order.delay.queue";public static final String ORDER_DELAY_ROUTING_KEY = "order.delay";// 死信队列配置public static final String ORDER_DEAD_LETTER_EXCHANGE = "order.dead.letter.exchange";public static final String ORDER_DEAD_LETTER_QUEUE = "order.dead.letter.queue";public static final String ORDER_DEAD_LETTER_ROUTING_KEY = "order.dead.letter";// 声明延迟队列(设置死信参数)@Beanpublic Queue orderDelayQueue() {return QueueBuilder.durable(ORDER_DELAY_QUEUE).withArgument("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE).withArgument("x-dead-letter-routing-key", ORDER_DEAD_LETTER_ROUTING_KEY).build();}// 声明延迟交换机@Beanpublic DirectExchange orderDelayExchange() {return new DirectExchange(ORDER_DELAY_EXCHANGE);}// 绑定延迟队列到交换机@Beanpublic Binding delayBinding() {return BindingBuilder.bind(orderDelayQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY);}// 声明死信队列@Beanpublic Queue deadLetterQueue() {return new Queue(ORDER_DEAD_LETTER_QUEUE, true);}// 声明死信交换机@Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange(ORDER_DEAD_LETTER_EXCHANGE);}// 绑定死信队列到交换机@Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(ORDER_DEAD_LETTER_ROUTING_KEY);}// JSON 消息转换器@Beanpublic MessageConverter jsonMessageConverter() {return new Jackson2JsonMessageConverter();}
}

生产者 OrderTimeoutProducer.java

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
public class OrderTimeoutProducer {private final RabbitTemplate rabbitTemplate;public OrderTimeoutProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}public void sendTimeoutMessage(String orderId) {// 设置消息过期时间为30分钟(单位:毫秒)MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("1800000");return message;}};rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_DELAY_EXCHANGE,RabbitMQConfig.ORDER_DELAY_ROUTING_KEY,orderId,messagePostProcessor);}
}

消费者 OrderTimeoutConsumer.java

import com.github.benmanes.caffeine.cache.Cache;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;@Component
public class OrderTimeoutConsumer {private final OrderService orderService;private final Cache<String, Boolean> processedMsgCache;public OrderTimeoutConsumer(OrderService orderService, Cache<String, Boolean> processedMsgCache) {this.orderService = orderService;this.processedMsgCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.HOURS).maximumSize(10000).build();}@RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)public void processMessage(Message message, Channel channel) throws IOException {String orderId = new String(message.getBody(), StandardCharsets.UTF_8);String messageId = message.getMessageProperties().getMessageId();long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 幂等性检查if (processedMsgCache.getIfPresent(messageId) != null) {channel.basicAck(deliveryTag, false);return;}boolean success = orderService.safeCancel(orderId);if (success) {processedMsgCache.put(messageId, true);System.out.println("订单超时取消成功: " + orderId);}channel.basicAck(deliveryTag, false);} catch (Exception e) {// 记录错误日志,重新放回队列channel.basicNack(deliveryTag, false, true);System.err.println("处理订单超时取消失败: " + orderId);e.printStackTrace();}}
}

应用配置 application.yml

spring:rabbitmq:host: ${RABBITMQ_HOST:localhost}port: 5672username: ${RABBITMQ_USER:guest}password: ${RABBITMQ_PASSWORD:guest}virtual-host: /connection-timeout: 5000template:retry:enabled: truemax-attempts: 3initial-interval: 1000mslistener:simple:acknowledge-mode: manual # 手动确认模式prefetch: 10 # 每次预取数量retry:enabled: truemax-attempts: 3initial-interval: 1000ms

pom.xml 依赖

<!-- 移除 RocketMQ 依赖 -->
<!-- 添加 RabbitMQ 依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

实体类 Order.java(不变)

public class Order {// 保持原有实现
}

Mapper 接口 OrderMapper.java(不变)

@Mapper
public interface OrderMapper {// 保持原有SQL操作
}

服务层 OrderService.java(不变)

@Service
public class OrderService {// 保持原有业务逻辑
}

缓存配置 CacheConfig.java(不变)

@Configuration
public class CacheConfig {// 保持原有缓存配置
}

对账服务 ReconciliationTask.java(不变)

@Component
public class ReconciliationTask {// 保持原有定时任务逻辑
}

控制器 OrderController.java(不变)

@RestController
@RequestMapping("/orders")
public class OrderController {// 保持原有API接口
}

关键差异对比

功能点RocketMQ 实现RabbitMQ 实现
延迟机制内置延迟级别TTL+死信队列
消息存储持久化到CommitLog内存+磁盘持久化
消费确认自动ACK手动ACK+重试机制
消息追踪原生支持消息轨迹需要额外实现
集群方案主从复制镜像队列

部署注意事项
队列初始化:确保首次启动时自动创建所需交换机和队列
消息持久化:所有队列声明时设置durable=true
监控配置:需要监控以下指标:
死信队列消息堆积量
消费者处理耗时
消息TTL过期情况
集群配置:建议至少3节点RabbitMQ集群
故障转移:配置镜像队列实现高可用

性能优化建议
批量处理:

@RabbitListener(queues = QUEUE, containerFactory = "batchFactory")
public void processMessages(List<Message> messages) {// 批量查询缓存Map<String, Boolean> existMap = cache.getAllPresent(messageIds);// 过滤已处理消息// 批量执行数据库操作
}

异步更新:

CompletableFuture.runAsync(() -> {processedMsgCache.put(messageId, true);
});

分级缓存:

Cache<String, Boolean> L1 = Caffeine.newBuilder().maximumSize(1000).build();
Cache<String, Boolean> L2 = RedisClient.getCache();

首先看CacheConfig.java,里面有一个orderStatusCache的Bean,使用Caffeine配置,最大大小10000,过期时间10分钟。然后在OrderService.java中,有一个注入的Cache<String, OrderStatus> orderCache。看起来这里应该用的是这个Bean,因为在OrderService里用了@RequiredArgsConstructor,所以orderCache这个字段会被构造器注入,而对应的Bean应该就是CacheConfig里的orderStatusCache。所以虽然用户可能没有显式地用@Autowired,但是因为用了Lombok的@RequiredArgsConstructor,如果orderCache是final的话,Spring会自动注入同类型的Bean。这时候需要检查OrderService中的orderCache的类型是否匹配。
看OrderService的定义:private final Cache<String, OrderStatus> orderCache; 而CacheConfig中的Bean也是这个类型,所以应该会被正确注入。所以这个Bean是有被使用的,可能没注意到OrderService里的注入,导致认为没有用到。
另外,在OrderTimeoutConsumer.java中,还有一个processedMsgCache,这个是用Caffeine直接new的,并没有使用Spring管理的Bean。所以这个缓存是消费者自己创建的,和CacheConfig无关。而orderStatusCache是被OrderService使用的,所以这个配置是必须的,否则OrderService中的orderCache会找不到对应的Bean,导致启动失败。
所以可能在于误解了哪里用到了这个Bean。可能在代码中没有显式地看到@Autowired或者@Resource,但实际上因为Lombok的@RequiredArgsConstructor和Spring的自动注入机制,这个Bean是被OrderService使用的。如果没有CacheConfig中的Bean,Spring启动时会报错,提示找不到符合条件的Cache类型的Bean来注入到OrderService中。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词