欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 资讯 > RabbitMQ

RabbitMQ

2026/4/27 0:33:44 来源:https://blog.csdn.net/m0_73569492/article/details/148529209  浏览:    关键词:RabbitMQ

文章目录

    • 什么是RabbitMQ?
      • 协议
      • AMQP协议
      • 主要组件:
      • 队列(Queue)
        • **1. 标准队列(Standard Queue)**
        • **2. 持久化队列(Durable Queue)**
        • **3. 惰性队列(Lazy Queue)**
        • **4. 镜像队列(Mirrored Queue)**
        • **5. 死信队列(Dead Letter Queue, DLQ)**
        • **6. 延迟队列(Delayed Queue)**
        • **7. 临时队列(Temporary Queue)**
        • **8. 优先级队列(Priority Queue)**
      • 交换机(Exchange)
        • **1. 直连交换机(Direct Exchange)**
        • **2. 主题交换机(Topic Exchange)**
        • **3. 扇形交换机(Fanout Exchange)**
        • **4. 头交换机(Headers Exchange)**
    • Spring AMQP
      • 介绍
      • AMQP的关键组件
        • Queue
        • Binding
        • Exchange
        • Message
      • Spring AMQP案例学习
        • 声明队列
        • 声明交换机
        • 声明绑定
          • 直连交换机:
          • 扇形交换机
          • 主题交换机
          • 头交换机
        • 声明消息转换器及确认机制
        • 确认机制容器工厂
          • 如有错误,欢迎指正!

官方文档:https://www.rabbitmq.com/docs

什么是RabbitMQ?

RabbitMQ 是一个消息代理。它接收来自发布者的消息,路由它们,并且(如果有可路由到的队列)存储它们以供使用,或者立即发送给消费者(如果有)。

发布者根据协议的不同将信息发布到不同的目的地。

协议

RabbitMQ 支持的每个协议中,发布消息的过程都非常相似。所有四种协议都允许用户发布包含有效负载(消息体)和一个或多个消息属性(消息头)的消息。

所有四种协议还支持发布者的确认机制,该机制允许发布应用程序跟踪代理已成功或未成功接受的消息,并继续发布下一批消息或重试发布当前消息。

  • AMQP协议(高级消息队列协议):RabbitMQ的核心协议
  • STOMP(简单文本导向的消息协议)
  • MQTT(消息队列遥测传输)
  • HTTP API(REST API)

AMQP协议

是 RabbitMQ 的核心协议,提供可靠、异步的消息传递。

  • 支持事务、确认机制(publisher confirms)和持久化。
  • 提供丰富的消息路由模型(交换机类型)。
  • 广泛支持各种编程语言的客户端库。

主要组件:

Broker(消息代理服务器):RabbitMQ 服务器实例,负责接收客户端连接、管理消息路由和队列

Exchange(交换机):消息路由的核心组件,根据规则将消息分发到对应队列。

  • Direct:按消息路由键(Routing Key)精确匹配队列。
  • Topic:按通配符模式(如*.order.*)匹配队列。
  • Fanout:广播消息到所有绑定的队列。
  • Headers:按消息头(Headers)属性匹配队列。

Queue(队列):消息的存储容器,保存等待消费的消息,支持持久化(保存在磁盘)、消息过期(TTL)、死信队列(DLQ)等机制。

Binding(绑定):建立交换机与队列的关联关系,定义消息路由规则。

  • 对 Direct 交换机:绑定规则为具体的 Routing Key。
  • 对 Topic 交换机:绑定规则为通配符模式。
  • 对 Fanout 交换机:绑定规则无意义(广播所有消息)。

Connection(连接):客户端与 Broker 之间的 TCP 长连接,维护通信通道。

Channel(信道):基于 Connection 的逻辑会话通道,用于执行具体的消息操作(发布、消费、确认等)

Virtual Host(虚拟主机):逻辑隔离的资源分组,类似数据库中的 “Schema”。

Message(消息):由消息体(Payload)和元数据(路由键、消息头、属性等)组成

  • Routing Key:用于 Direct/Topic 交换机的路由。
  • Headers:用于 Headers 交换机的匹配条件。
  • Delivery Mode:标记消息是否持久化(1 = 非持久,2 = 持久)。

队列(Queue)

  • 标准队列(Standard Queue)
  • 持久化队列(Durable Queue)
  • 惰性队列(Lazy Queue)
  • 镜像队列(Mirrored Queue)
  • 死信队列(Dead Letter Queue, DLQ)
  • 延迟队列(Delayed Queue)
  • 临时队列(Temporary Queue)
  • 优先级队列(Priority Queue)
1. 标准队列(Standard Queue)
  • 特点:
    • 最基础的队列类型,遵循 “生产者 - 消费者” 模型,消息被消费后从队列中移除
    • 支持基本的队列属性配置(如持久化、排他性、自动删除等)。
  • 关键属性:
    • durable:是否持久化(消息和队列元数据保存到磁盘,重启后恢复)。
    • exclusive:是否排他(仅创建它的连接可访问,连接关闭后队列自动删除)。
    • auto-delete:是否自动删除(最后一个消费者取消订阅后自动删除)。
  • 适用场景:
    • 常规的异步任务处理、消息通信。
2. 持久化队列(Durable Queue)
  • 特点:
    • 通过设置durable=true实现,队列元数据和消息会持久化到磁盘
    • 即使 RabbitMQ 服务重启,队列和未消费的消息仍可恢复。
  • 技术实现:
    • 队列声明时指定durable参数为true
    • 消息需配合deliveryMode=2(持久化标记)才能同时持久化消息体。
  • 适用场景:
    • 对消息可靠性要求高的场景(如订单处理、金融交易)。
3. 惰性队列(Lazy Queue)
  • 特点:
    • 基于lazy参数(RabbitMQ 3.6.0 + 引入),将消息优先存储在磁盘而非内存
    • 减少内存占用,适合处理海量消息(百万级以上)。
  • 技术实现:
    • 声明队列时设置"x-queue-type": "lazy"(或通过插件配置)。
    • 消费时按需将消息加载到内存,降低峰值内存压力。
  • 适用场景:
    • 日志收集、大数据流处理、消息积压缓冲。
4. 镜像队列(Mirrored Queue)
  • 特点:
    • 通过集群镜像策略,将队列数据同步到多个节点,实现高可用性。
    • 主节点故障时,从节点自动接管,避免单点故障。
  • 配置方式:
    • 通过 RabbitMQ 管理界面或策略(Policy)设置ha-mode参数(如allexactly-n)。
    • 同步方式:异步复制,可能存在短暂数据不一致。
  • 适用场景:
    • 对服务可用性要求极高的核心业务(如实时通信、交易系统)。
5. 死信队列(Dead Letter Queue, DLQ)
  • 特点:
    • 用于存储无法被正常消费的 “死信” 消息,通常因以下原因进入死信队列:
      • 消息被消费者拒绝(basic.reject/basic.nack且未重新入队)。
      • 消息超时未被消费(设置x-message-ttl)。
      • 队列达到最大长度(设置x-max-lengthx-max-length-bytes)。
  • 技术实现:
    • 为队列设置x-dead-letter-exchange参数,指定死信交换机。
    • 死信交换机再将消息路由到死信队列。
  • 适用场景:
    • 消息重试机制、异常消息监控和处理。
6. 延迟队列(Delayed Queue)
  • 特点:
    • 消息在指定延迟时间后才允许被消费,非 RabbitMQ 原生功能,需插件或间接实现。
  • 实现方式:
    • 插件方式:使用rabbitmq-delayed-message-exchange插件,通过交换机实现延迟。
    • TTL + 死信队列方式:给消息设置x-message-ttl,超时后通过死信交换机路由到目标队列。
  • 适用场景:
    • 订单超时取消、任务定时执行、消息重试延迟。
7. 临时队列(Temporary Queue)
  • 特点:
    • 通常设置为exclusive=trueauto-delete=true连接关闭后自动删除
    • 无需显式删除,适用于临时任务或一次性消费场景。
  • 典型用法:
    • 客户端创建临时队列用于响应式通信(如 RPC 模式中的回调队列)。
8. 优先级队列(Priority Queue)
  • 特点:
    • 通过x-max-priority参数设置队列优先级,高优先级消息优先被消费。
  • 技术实现:
    • 声明队列时指定x-max-priority(如 10),消息发送时设置priority属性。
  • 适用场景:
    • 任务调度(如紧急订单优先处理)、资源抢占场景。

交换机(Exchange)

  • 直连交换机(Direct Exchange)
  • 主题交换机(Topic Exchange)
  • 扇形交换机(Fanout Exchange)
  • 头交换机(Headers Exchange)
1. 直连交换机(Direct Exchange)
  • 路由规则:根据消息的路由键(Routing Key)和绑定键(Binding Key)的精确匹配进行路由。
  • 绑定方式:
    • 队列通过指定 Binding Key 绑定到直连交换机。
    • 当消息的 Routing Key 与某个队列的 Binding Key 完全匹配时,消息被路由到该队列。
  • 适用场景:
    • 基于类型的任务分发:例如,将用户注册、订单支付等不同类型的消息路由到不同队列。
    • 优先级路由:将高优先级订单路由到专门的队列处理。
2. 主题交换机(Topic Exchange)
  • 路由规则:

    根据消息的路由键和绑定键的模式匹配进行路由。绑定键支持两种通配符:

    • *(星号):匹配一个单词(例如order.*匹配order.create但不匹配order.create.paid)。
    • #(井号):匹配零个或多个单词(例如order.#匹配orderorder.createorder.create.paid)。
  • 绑定方式:

    • 队列通过带通配符的 Binding Key 绑定到主题交换机。
    • 当消息的 Routing Key 满足某个队列的 Binding Key 模式时,消息被路由到该队列。
  • 适用场景:

    • 系统事件广播:例如,user.createdorder.paid等事件按主题分类。
    • 日志分类:将不同级别的日志(如system.errorapplication.warning)路由到不同队列。
3. 扇形交换机(Fanout Exchange)
  • 路由规则:将接收到的所有消息广播到所有与之绑定的队列,完全忽略消息的路由键。
  • 绑定方式:
    • 队列直接绑定到扇形交换机,无需指定 Binding Key(或 Binding Key 为空)。
    • 交换机接收到的所有消息都会被复制到所有绑定的队列。
  • 适用场景:
    • 配置更新:当系统配置变更时,同时通知所有服务。
    • 事件广播:如系统通知、用户登录事件,需要多个服务同时处理。
4. 头交换机(Headers Exchange)
  • 路由规则:

    根据消息的头部信息(Headers)而非路由键进行匹配。匹配方式有两种:

    • x-match=all:所有指定的头键值对必须匹配。
    • x-match=any:至少一个头键值对匹配。

    绑定方式:

    • 队列通过指定 Headers 键值对和匹配方式(x-match)绑定到头交换机。
    • 当消息的 Headers 满足某个队列的绑定条件时,消息被路由到该队列。
  • 适用场景:

    • 多条件过滤:当路由规则复杂且不适合用路由键表达时。
    • 元数据路由:根据消息的元数据(如 content-type、version)路由。

Spring AMQP

官方文档:https://springdoc.cn/spring-amqp/

介绍

Spring AMQP是一个基于AMQP协议的消息中间件框架,它提供了一个简单的API来发送和接收异步、可靠的消息。

来源:https://springdoc.cn/spring-amqp/

AMQP的关键组件

Queue

在 Spring AMQP 中,Queue对象代表消息队列,是消息的存储和处理单元

参数有:

参数类型描述
nameString队列名称(必须唯一)。
durableboolean是否持久化(默认true)。持久化队列会在 Broker 重启后保留。
exclusiveboolean是否排他(默认false)。排他队列仅对首次声明它的连接可见,并在连接断开时自动删除。
autoDeleteboolean是否自动删除(默认false)。自动删除队列在最后一个消费者断开连接后自动删除。
argumentsMap<String, Object>自定义参数(如死信交换机、消息 TTL、优先级等)。
Binding

将消息从 Exchange 路由到 Queue,通过 绑定键(Binding Key)消息头(Headers)** 实现匹配。

Spring AMQP 提供了Binding类及其构建器BindingBuilder,用于声明不同类型的绑定:

import org.springframework.amqp.core.*;// 声明Binding的示例(以Direct类型为例)
Binding directBinding = BindingBuilder.bind(queue())     // 目标队列.to(directExchange()) // 源交换机.with("order.create"); // 绑定键
Exchange

在 Spring AMQP 中,Exchange对象是消息路由的核心组件,负责接收生产者发送的消息,并根据规则将消息路由到一个或多个队列

**Exchange**的实现类:

DirectExchange实现 Direct 类型交换机,路由键精确匹配。
TopicExchange实现 Topic 类型交换机,支持通配符匹配。
FanoutExchange实现 Fanout 类型交换机,广播消息到所有绑定队列。
HeadersExchange实现 Headers 类型交换机,根据消息头匹配。
CustomExchange自定义交换机类型(如 RabbitMQ 的延迟交换机x-delayed-message)。
Message

在 Spring AMQP 框架中,Message对象是消息传递的核心载体,它封装了从 AMQP broker(如 RabbitMQ)接收或发送的消息内容和元数据

主要部分

public class Message {private final byte[] body;              // 消息内容(字节数组)private final MessageProperties messageProperties; // 消息属性
}
  1. 消息体(Body):实际传输的数据(字节数组)
  2. 消息属性(MessageProperties):包含路由信息、优先级、过期时间等元数据

Spring AMQP案例学习

jdk版本:1.8

springboot版本: 2.1.6.RELEASE

rabbitmq版本: rabbitmq:3.12-management(docker部署)

在这里插入图片描述

声明队列
@Bean
public Queue queue() {return new Queue("queue", true);
}
声明交换机
/*** 直连交换机* @return*/
@Bean
public DirectExchange directExchange() {// 持久化,非自动删除return new DirectExchange("directExchange", true, false);
}/*** 扇形交换机(广播)* @return*/
@Bean
public FanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange", true, false);
}/*** 主题交换机* @return*/
@Bean
public TopicExchange topicExchange() {return new TopicExchange("topicExchange", true, false);
}/*** 头交换机* @return*/
@Bean
public HeadersExchange headersExchange() {return new HeadersExchange("headersExchange", true, false);
}
声明绑定
直连交换机:
@Bean
public Binding binding() {return BindingBuilder.bind(queue()).to(directExchange()).with("red");
}
扇形交换机
@Bean
public Binding binding() {return BindingBuilder.bind(queue()).to(fanoutExchange());
}
主题交换机
@Bean
public Binding binding() {return BindingBuilder.bind(queue()).to(topicExchange()).with("topic.#");
}
头交换机
@Bean
public Binding binding() {HashMap<String, Object> headers = new HashMap<>();headers.put("key1", "value1");headers.put("key2", "value2");headers.put("key3", "value3");return BindingBuilder.bind(queue()).to(headersExchange()).whereAny(headers).match();
}
声明消息转换器及确认机制
/*** 消息转换器* 转换成json格式* @return*/
@Bean
public Jackson2JsonMessageConverter messageConverter() {return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setMessageConverter(messageConverter()); // 设置 JSON 转换器// 启用发布确认(确保消息到达Exchange)template.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("消息成功发送到Exchange: " + correlationData);} else {System.err.println("消息发送失败: " + cause);// 可在此处实现重试逻辑}});// 启用失败退回(当Exchange无法路由到队列时触发)template.setMandatory(true);template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {System.err.println("消息被退回: " + new String(message.getBody()) +", replyCode: " + replyCode +", replyText: " + replyText +", exchange: " + exchange +", routingKey: " + routingKey);});return template;
}
确认机制容器工厂
@Autowired
private ConnectionFactory connectionFactory;
// 手动确认模式的容器工厂
@Bean
public SimpleRabbitListenerContainerFactory manualAckContainerFactory() {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);  // 手动确认factory.setPrefetchCount(1);  // 每次只处理一条消息factory.setMessageConverter(jsonMessageConverter()); // 必须设置return factory;
}// 自动确认模式的容器工厂
@Bean
public SimpleRabbitListenerContainerFactory autoAckContainerFactory() {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.AUTO);  // 自动确认factory.setMessageConverter(jsonMessageConverter()); // 必须设置factory.setPrefetchCount(250);  // 默认预取数量return factory;
}

在消费者监听时指定工厂:

@RabbitListener(queues = "queue10", containerFactory = "autoAckContainerFactory") // 自动确认
@RabbitListener(queues = "queue9", containerFactory = "manualAckContainerFactory") // 手动确认

手动确认:

/*** 测试话题交换机* 手动确认消息已消费* @param message 消息内容* @param channel RabbitMQ Channel* @param tag deliveryTag*/
@RabbitListener(queues = "queue7", containerFactory = "manualAckContainerFactory")
public void queue7(String message, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag,@Header(AmqpHeaders.CONSUMER_QUEUE) String queueName,Message amqpMessage) {try {System.out.println("处理消息: " + message + ", 来自队列:" + queueName);// 手动确认channel.basicAck(tag, false); // 正确调用 basicAck} catch (IOException e) {try {// 拒绝消息并重新入队channel.basicNack(tag, false, true);} catch (IOException ex) {ex.printStackTrace();}}
}

测试:
在这里插入图片描述
在这里插入图片描述

如有错误,欢迎指正!

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词