欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 教育 > 幼教 > RabbitMQ:业务幂等、死信交换机

RabbitMQ:业务幂等、死信交换机

2025/9/25 3:21:45 来源:https://blog.csdn.net/2301_80412275/article/details/146293572  浏览:    关键词:RabbitMQ:业务幂等、死信交换机

业务幂等性

幂等是一个数学概念,用函数表达式来描述是这样的: f(x) = f(f(x)) 。在程序开发中,幂等性指的是无论操作执行多少次,结果都是一致的在消息队列的上下文中,幂等性意味着即使消费者多次接收到同一条消息,也不会对业务逻辑产生负面影响。

为什么需要幂等性?
在 RabbitMQ 中,消息可能会被重复消费,原因包括:

  • 网络问题:消费者处理完消息后,发送确认(ACK)时网络中断,RabbitMQ 未收到确认,会重新投递消息。
  • 消费者故障:消费者处理消息时崩溃,未发送确认,RabbitMQ 会重新投递消息。
  • 手动重试:业务逻辑中手动触发了消息的重新投递。

如果消费者没有实现幂等性,重复消费可能会导致数据不一致或业务逻辑错误。例如:

  1. 订单支付消息被重复消费,导致用户被多次扣款。
  2. 库存扣减消息被重复消费,导致库存数量错误。

延迟消息接收

延迟消息在以下场景中非常有用:
1、定时任务:如订单超时未支付自动取消(12306买票)。
2、重试机制:如消息处理失败后延迟重试。
3、通知提醒:如预约提醒、会议通知等。

实现延迟消息的常见方法

死信交换机机制

死信: 死信(Dead Letter)是指那些无法被正常消费的消息。
如果队列通过 dead-letter-exchange 属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称 DLX)
在这里插入图片描述
私信交换机具体实现思路
消费者监听dlx队列

@RabbitListener(bindings = @QueueBinding(  value = @Queue(name = "dlx.queue", durable = "true"),  // 定义死信队列,名称为 dlx.queue,并设置为持久化exchange = @Exchange(name = "dlx.direct", type = ExchangeTypes.DIRECT),  // 定义死信交换机,名称为 dlx.direct,类型为 DIRECTkey = {"hi"}  // 绑定路由键为 "hi"
))  
public void listenDlxQueue(String message) {  log.info("消费者监听到 dlx.queue 的消息: [{}]", message);  // 打印从死信队列中接收到的消息
}  

定义普通交换机

@Configuration  // 声明这是一个 Spring 配置类
public class NormalConfiguration {  // 定义普通交换机@Bean  public DirectExchange normalExchange() {  return new DirectExchange(name = "normal.direct");  // 创建一个名为 normal.direct 的 DIRECT 类型交换机}  // 定义普通队列@Bean  public Queue normalQueue() {  return QueueBuilder  .durable(name = "normal.queue")  // 创建一个名为 normal.queue 的持久化队列.deadLetterExchange(dlx = "dlx.direct")  // 设置死信交换机为 dlx.direct.build();  }  // 绑定普通队列到普通交换机@Bean  public Binding normalExchangeBinding(Queue normalQueue, DirectExchange normalExchange) {  return BindingBuilder.bind(normalQueue).to(normalExchange).with(routingKey = "hi");  // 将 normal.queue 绑定到 normal.direct 交换机,并使用路由键 "hi"}  
}  

发送消息,同时设置延迟时间

@Test
void testSendDelayMessage() {  String messageId = UUID.randomUUID().toString();  // 生成唯一IDrabbitTemplate.convertAndSend(  "normal.direct",  "hi",  "hello",  message -> {  message.getMessageProperties().setExpiration("10000");  message.getMessageProperties().setCorrelationId(messageId);  // 设置消息的唯一IDreturn message;  }  );  // 将 messageId 存储到数据库或缓存中,以便后续使用
}

使用死信交换机

public void onPaymentSuccess(String messageId) {// 支付成功后的逻辑cancelDelayMessage(messageId);  // 取消延迟消息
}

删除队列里等待的消息

@Autowired
private RabbitTemplate rabbitTemplate;public void cancelDelayMessage(String messageId) {// 从队列中删除消息rabbitTemplate.execute(channel -> {GetResponse response = channel.basicGet("normal.queue", false);  // 从队列中获取消息while (response != null) {AMQP.BasicProperties properties = response.getProps();if (messageId.equals(properties.getCorrelationId())) {channel.basicAck(response.getEnvelope().getDeliveryTag(), false);  // 确认消息return null;  // 找到并删除消息}response = channel.basicGet("normal.queue", false);}return null;});
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词