欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 教育 > 幼教 > RabbitMQ高级特性 - 生产者消息确认机制

RabbitMQ高级特性 - 生产者消息确认机制

2025/5/8 6:40:58 来源:https://blog.csdn.net/CYK_byte/article/details/140908709  浏览:    关键词:RabbitMQ高级特性 - 生产者消息确认机制

文章目录

  • 生产者消息确认机制
    • 概述
    • confirm 代码实现
    • return 代码实现

生产者消息确认机制


概述

在这里插入图片描述
为了保证信息 从生产者 发送到 队列,因此引入了生产者的消息确认机制.

RabbitMQ 提供了两种解决方案:

  • 通过事务机制实现.
  • 通过发送确认机制(confirm 和 return)实现.

因为事务机制比较消耗性能,在实际工作中用的也不多,因此这里主要介绍 confirm 和 return 机制来实现发送放的确认.

a)confirm 确认模式
在这里插入图片描述
如上图,confirm 确认模式主要保障于 生产者 到 交换机 的消息可靠性.

具体的,在生产者发送消息之前,给 RabbitTemplate 设置一个 ConfirmCallback 回调监听:

  • 如果 Exchange 成功收到消息,那么 ConfirmCallback 这个回调 ack 参数就为 true
  • 如果 Exchange 没有收到消息,那么 ConfirmCallback 这个回调 ack 参数就为 false

b)return 退回模式
在这里插入图片描述
如上图,confirm 确认模式主要保障于 交换机 到 队列 的消息可靠性.

具体的,在生产者发送消息之前,给 RabbitTemplate 设置一个 ReturnsCallback 回调监听:

  • 如果 Queue 成功收到 Exchange 的消息,那么 ReturnsCallback 回调监听 就不会收到任何消息.
  • 如果 Queue 没有收到 Exchange 的消息,那么 ReturnsCallback 回调监听 就会收到该消息.

confirm 代码实现

a)配置文件

spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111publisher-confirm-type: correlated # 开启发送方确认机制

b)交换机、队列、绑定配置

    @Bean("confirmExchange")fun confirmExchange() = DirectExchange(MQConst.CONFIRM_EXCHANGE)@Bean("confirmQueue")fun confirmQueue() = Queue(MQConst.CONFIRM_QUEUE)@Bean("confirmBinding")fun confirmBinding(@Qualifier("confirmExchange") exchange: DirectExchange,@Qualifier("confirmQueue") queue: Queue,): Binding = BindingBuilder.bind(queue).to(exchange).with(MQConst.CONFIRM_BINDING)

c)confirmRabbitTemplate Bean 配置

@Configuration
class MQTemplateConfig {/*** 这个配置一定要有!!!(或者有大于等于 2 个的 RabbitTemplate Bean)** 这是由于 Autowired 注解自身的原因(以 rabbitmq 为例):* 如果配置文件中配置 rabbitmq 相关连接信息,那么 spring 会自动为其创建 RabbitTemplate Bean 对象* 如果配置文件中配置 rabbitmq 相关连接信息,而且代码中也配置了一个 RabbitTemplate 的 Bean(名称为 confirmRabbitTemplate),那么 Spring 将不会自动配置默认的 RabbitTemplate Bean 对象* 这就导致,我们无论代码写的注入的是 rabbitTemplate 还是 confirmRabbitTemplate,但实际上注入的都是 confirmRabbitTemplate*/@Bean("rabbitTemplate")fun rabbitTemplate(connectionFactory: ConnectionFactory): RabbitTemplate {return RabbitTemplate(connectionFactory)}@Bean("confirmRabbitTemplate")fun confirmRabbitTemplate(connectionFactory: ConnectionFactory): RabbitTemplate {val tpl = RabbitTemplate(connectionFactory)tpl.setConfirmCallback(RabbitTemplate.ConfirmCallback { correlationData, ack, cause ->println("执行了 confirm ...")if (ack) {println("confirm ack: { 消息id: ${correlationData?.id} }")} else {println("confirm nack: { 消息id: ${correlationData?.id}, cause: $cause }")//进行相应的业务处理...}})return tpl}}

d)生产者接口

@RestController
@RequestMapping("/mq")
class MQApi(val confirmRabbitTemplate: RabbitTemplate
) {@RequestMapping("/confirm")fun confirm(): String {val data = CorrelationData("1")confirmRabbitTemplate.convertAndSend(MQConst.CONFIRM_EXCHANGE, MQConst.CONFIRM_BINDING, "confirm msg 1", data)return "ok"}}

此处演示无需消费者…

e)消息正确的路由到交换机,效果如下:
在这里插入图片描述
在这里插入图片描述

f)消息没有找到交换机(发送消息时,写了一个不存在的交换机的名字),效果如下:
在这里插入图片描述
在这里插入图片描述

return 代码实现

a)配置文件

spring:application:name: rabbitmqrabbitmq:host: env-baseport: 5672username: rootpassword: 1111publisher-confirm-type: correlated # 开启发送方确认机制

b)bean 的配置

@Configuration
class MQTemplateConfig {/*** 这个配置一定要有!!!(或者有大于等于 2 个的 RabbitTemplate Bean)** 这是由于 Autowired 注解自身的原因(以 rabbitmq 为例):* 如果配置文件中配置 rabbitmq 相关连接信息,那么 spring 会自动为其创建 RabbitTemplate Bean 对象* 如果配置文件中配置 rabbitmq 相关连接信息,而且代码中也配置了一个 RabbitTemplate 的 Bean(名称为 confirmRabbitTemplate),那么 Spring 将不会自动配置默认的 RabbitTemplate Bean 对象* 这就导致,我们无论代码写的注入的是 rabbitTemplate 还是 confirmRabbitTemplate,但实际上注入的都是 confirmRabbitTemplate*/@Bean("rabbitTemplate")fun rabbitTemplate(connectionFactory: ConnectionFactory): RabbitTemplate {return RabbitTemplate(connectionFactory)}@Bean("confirmRabbitTemplate")fun confirmRabbitTemplate(connectionFactory: ConnectionFactory): RabbitTemplate {val tpl = RabbitTemplate(connectionFactory)tpl.setConfirmCallback(RabbitTemplate.ConfirmCallback { correlationData, ack, cause ->println("执行了 confirm ...")if (ack) {println("confirm ack: { 消息id: ${correlationData?.id} }")} else {println("confirm nack: { 消息id: ${correlationData?.id}, cause: $cause }")//进行相应的业务处理...}})//这里可以和 confirm模式 一起配置//mandatory = true 属性是在告诉 rabbitmq,如果一个消息无法被任何队列消费,那么该消息就会返回给发送者,此时 ReturnCallback 就会被触发//mandatory 相当于是开启 ReturnsCallback 前提tpl.setMandatory(true)tpl.setReturnsCallback(RabbitTemplate.ReturnsCallback { returned ->println("执行了 return ...")println("return: $returned")})return tpl}}

c)生产者接口

@RestController
@RequestMapping("/mq")
class MQApi(val confirmRabbitTemplate: RabbitTemplate
) {@RequestMapping("/confirm")fun confirm(): String {val data = CorrelationData("1")confirmRabbitTemplate.convertAndSend(MQConst.CONFIRM_EXCHANGE, MQConst.CONFIRM_BINDING, "confirm msg 1", data)return "ok"}}

d)正确的路由到队列,效果如下:
可以看到只有 confirm模式 被触发.
在这里插入图片描述
e)没有路由到队列(发送消息时,我改成了一个不存在的 routingKey 名字),效果如下:
在这里插入图片描述

在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词