欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 八卦 > RabbitMQ高级特性

RabbitMQ高级特性

2025/5/9 10:42:32 来源:https://blog.csdn.net/2303_80933038/article/details/147720025  浏览:    关键词:RabbitMQ高级特性

 1.消息的可靠投递

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

1.confirm 确认模式

2.return  退回模式

rabbitmq 整个消息投递的路径为:

producer--->rabbitmq broker--->exchange--->queue--->consumer

如果消息从 producer 到 exchange 失败则会调用confirm的一个api返回一个 confirmCallback 。

如果消息从 exchange-->queue 投递失败则会调用return的一个api返回一个 returnCallback 。

我们将利用这两个 callback 控制消息的可靠性投递

生产端:

​​ConfirmCallback 的用途​
  • ​核心功能​​:监听消息是否成功到达 RabbitMQ 交换机(Exchange)。
    • 如果消息成功到达交换机,RabbitMQ 会返回 ack=true
    • 如果消息未能到达交换机(如交换机不存在、网络故障等),RabbitMQ 会返回 ack=false 并附带失败原因(cause)。
  • ​异步通知​​:回调是异步触发的,不会阻塞主线程。
睡眠操作:测试代码执行完毕后,测试线程立即退出,导致RabbitMQ客户端的异步确认回调(ConfirmCallback)还未被触发,程序就终止了

测试代码与原生:

​对比项​RabbitTemplate + ConfirmCallback (Spring AMQP)​​原生 Channel + ConfirmListener (RabbitMQ Java Client)​
​所属框架​Spring AMQP(对 RabbitMQ Java Client 的高级封装)RabbitMQ 官方 Java 客户端(底层 API)
​编程复杂度​更简单(自动管理连接、通道、序列化等)更复杂(需手动管理连接、通道、序列化等)
​适用场景​Spring/Spring Boot 项目非 Spring 项目或需要精细控制的原生场景
    //测试Confirm模式@Testpublic void testConfirm() {//定义回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Override//如果成功则ack为true,否则为falsepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("confirm方法被执行了....");//ack 为  true表示 消息已经到达交换机if (ack) {//接收成功System.out.println("接收成功消息" + cause);} else {//接收失败System.out.println("接收失败消息" + cause);//做一些处理,让消息再次发送。}}});//进行消息发送rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message Confirm...");//测试代码执行完毕后,测试线程立即退出,导致RabbitMQ客户端的异步确认回调(ConfirmCallback)还未被触发,程序就终止了try {Thread.sleep(5000);} catch (Exception e) {e.printStackTrace();}}

​​ReturnCallback 的用途

核心机制:Return 模式​

  • ​触发条件​​:
    当消息成功到达交换机,但无法路由到任何队列时(比如路由键 routingKey 不匹配或队列不存在),RabbitMQ 会触发 ReturnCallback

总结:

​​ConfirmCallback:

ConnectionFactory的publisher-confirms="true" 开启 确认模式。

使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。

​​ReturnCallback

设置ConnectionFactory的publisher-returns="true" 开启 退回模式。

使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。

2.Consumer Ack

消费端:

Consumer Ack

ack指Acknowledge:确认。 表示消费端收到消息后的确认方式。

代码:

@Component
public class AckListener implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {//1、获取消息的idlong deliveryTag = message.getMessageProperties().getDeliveryTag();try {//2、获取消息System.out.println("message:"+new String(message.getBody()));//3、进行业务处理System.out.println("=====进行业务处理====");//模拟出现异常int  i = 5/0;//4、进行消息签收// deliveryTag:消息的唯一id// true:确认所有小于deliveryTag的消息,全部签收;false:只确认deliveryTag对应的消息channel.basicAck(deliveryTag, true);} catch (Exception e) {//拒绝签收//第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端channel.basicNack(deliveryTag, true, true);}}
}

在此之前,你得确定你开启了自动签收,在对应的xml文件中

有三种确认方式:

自动确认:acknowledge="none"

手动确认:acknowledge="manual"

根据异常情况确认:acknowledge="auto"

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息 

 3.消费端限流

prefetch="1":每次抓取多少条消息

消费端的确认模式一定为手动确认。acknowledge="manual"

 4.TTL

TTL 全称 Time To Live (存活 时间 / 过期时间 )。
当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ 可以对消息设置 过期时间 ,也可以对整个队列( Queue )设置 过期时间

设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。

设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。

如果两者都进行了设置,以时间短的为准。

5.死信队列

消息成为死信的三种情况:

1. 队列消息长度到达限制

2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false

3. 原队列存在消息过期设置,消息到达超时时间未被消费;

 队列绑定死信交换机:

给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

1. 死信交换机和死信队列和普通的没有区别

2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

xml配置代码例子:

 

6.延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

需求:

1. 下单后,30分钟未支付,取消订单,回滚库存。

2. 新用户注册成功7天后,发送短信问候。

实现方式:

1. 定时器

2. 延迟队列

 

很可惜,在RabbitMQ中并未提供延迟队列功能。

但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

7.消息幂等性保障--乐观锁机制

幂等性: 指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。

乐观锁:

生产者发送一条消息到MQ服务器,MQ服务器再发送给消费者,但是因为某些原因(比如网络不好之类的...)导致MQ服务器以为没有签收则会再发一次,实际上消费者已经接收到了 2(甚至n条)条一模一样的消息,如果是购物那可能会导致多次扣款,就可以采用redis的方法

8.消息积压

消费者宕机挂掉了,积压消息

消费者消费能力不足积压,进大于出

发送者发流量太大

解决方案:上线更多的消费者,进行正常消费上线专门的队列消费服务,将消息先批量取出来,记录数据库,再慢慢处理

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词