1.消息的可靠投递
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
1.confirm 确认模式
2.return 退回模式
rabbitmq 整个消息投递的路径为:
producer--->rabbitmq broker--->exchange--->queue--->consumer
如果消息从 producer 到 exchange 失败则会调用confirm的一个api返回一个 confirmCallback 。
如果消息从 exchange-->queue 投递失败则会调用return的一个api返回一个 returnCallback 。
我们将利用这两个 callback 控制消息的可靠性投递
生产端:
ConfirmCallback 的用途
- 核心功能:监听消息是否成功到达 RabbitMQ 交换机(Exchange)。
- 如果消息成功到达交换机,RabbitMQ 会返回
ack=true
。 - 如果消息未能到达交换机(如交换机不存在、网络故障等),RabbitMQ 会返回
ack=false
并附带失败原因(cause
)。
- 如果消息成功到达交换机,RabbitMQ 会返回
- 异步通知:回调是异步触发的,不会阻塞主线程。
睡眠操作:测试代码执行完毕后,测试线程立即退出,导致RabbitMQ客户端的异步确认回调(ConfirmCallback)还未被触发,程序就终止了
测试代码与原生:
对比项 | RabbitTemplate + ConfirmCallback (Spring AMQP) | 原生 Channel + ConfirmListener (RabbitMQ Java Client) |
---|---|---|
所属框架 | Spring AMQP(对 RabbitMQ Java Client 的高级封装) | RabbitMQ 官方 Java 客户端(底层 API) |
编程复杂度 | 更简单(自动管理连接、通道、序列化等) | 更复杂(需手动管理连接、通道、序列化等) |
适用场景 | Spring/Spring Boot 项目 | 非 Spring 项目或需要精细控制的原生场景 |
//测试Confirm模式@Testpublic void testConfirm() {//定义回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Override//如果成功则ack为true,否则为falsepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("confirm方法被执行了....");//ack 为 true表示 消息已经到达交换机if (ack) {//接收成功System.out.println("接收成功消息" + cause);} else {//接收失败System.out.println("接收失败消息" + cause);//做一些处理,让消息再次发送。}}});//进行消息发送rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message Confirm...");//测试代码执行完毕后,测试线程立即退出,导致RabbitMQ客户端的异步确认回调(ConfirmCallback)还未被触发,程序就终止了try {Thread.sleep(5000);} catch (Exception e) {e.printStackTrace();}}
ReturnCallback 的用途
核心机制:Return 模式
- 触发条件:
当消息成功到达交换机,但无法路由到任何队列时(比如路由键routingKey
不匹配或队列不存在),RabbitMQ 会触发ReturnCallback
。
总结:
ConfirmCallback:
ConnectionFactory的publisher-confirms="true" 开启 确认模式。
使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。
ReturnCallback :
设置ConnectionFactory的publisher-returns="true" 开启 退回模式。
使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。
2.Consumer Ack
消费端:
Consumer Ack
ack指Acknowledge:确认。 表示消费端收到消息后的确认方式。
代码:
@Component
public class AckListener implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {//1、获取消息的idlong deliveryTag = message.getMessageProperties().getDeliveryTag();try {//2、获取消息System.out.println("message:"+new String(message.getBody()));//3、进行业务处理System.out.println("=====进行业务处理====");//模拟出现异常int i = 5/0;//4、进行消息签收// deliveryTag:消息的唯一id// true:确认所有小于deliveryTag的消息,全部签收;false:只确认deliveryTag对应的消息channel.basicAck(deliveryTag, true);} catch (Exception e) {//拒绝签收//第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端channel.basicNack(deliveryTag, true, true);}}
}
在此之前,你得确定你开启了自动签收,在对应的xml文件中
有三种确认方式:
•自动确认:acknowledge="none"
•手动确认:acknowledge="manual"
•根据异常情况确认:acknowledge="auto"
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息
3.消费端限流
prefetch="1":每次抓取多少条消息
消费端的确认模式一定为手动确认。acknowledge="manual"
4.TTL

设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。
如果两者都进行了设置,以时间短的为准。
5.死信队列
消息成为死信的三种情况:
1. 队列消息长度到达限制;
2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
3. 原队列存在消息过期设置,消息到达超时时间未被消费;
队列绑定死信交换机:
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
1. 死信交换机和死信队列和普通的没有区别
2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
xml配置代码例子:
6.延迟队列
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
需求:
1. 下单后,30分钟未支付,取消订单,回滚库存。
2. 新用户注册成功7天后,发送短信问候。
实现方式:
1. 定时器
2. 延迟队列
很可惜,在RabbitMQ中并未提供延迟队列功能。
但是可以使用:TTL+死信队列 组合实现延迟队列的效果。
7.消息幂等性保障--乐观锁机制
幂等性: 指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。
乐观锁:
生产者发送一条消息到MQ服务器,MQ服务器再发送给消费者,但是因为某些原因(比如网络不好之类的...)导致MQ服务器以为没有签收则会再发一次,实际上消费者已经接收到了 2(甚至n条)条一模一样的消息,如果是购物那可能会导致多次扣款,就可以采用redis的方法
8.消息积压
消费者宕机挂掉了,积压消息
消费者消费能力不足积压,进大于出
发送者发流量太大
解决方案:上线更多的消费者,进行正常消费上线专门的队列消费服务,将消息先批量取出来,记录数据库,再慢慢处理