RabbitMQ
简介:RabbitMQ 是一种开源的消息队列中间件,你可以把它想象成一个高效的“邮局”。它专门负责在不同应用程序之间传递消息,让系统各部分能松耦合地协作
优势:
-
异步处理:比如用户注册后,主程序将发送验证邮件的任务扔进队列就立刻返回,邮件服务后续慢慢处理,避免用户等待。
-
削峰填谷:突然的流量高峰(如秒杀活动)会被队列缓冲,避免服务器被压垮。
-
智能路由:通过交换机(Exchange)的四种路由策略(直连/主题/广播/头匹配),实现精准投递,比如将VIP用户的订单定向到专属客服队列。
-
故障恢复:支持消息持久化和确认机制,即使服务器宕机,消息也不会丢失。
同步VS异步(以实际开发为例子进行说明):
-
同步业务功能的耦合度高,异步耦合度低,可以达到解耦的效果
-
同步业务流程响应的时间长,异步响应的时间短
-
同步模式会导致并发压力向后进行传递,异步可以削峰限流
-
同步模式下系统结构弹性不足,异步模式下系统弹性强,可扩展性强
注意:在实际开发中并不是说异步模式就完全优与同步模式,在一定的场景下使用异步模式是优化系统的架构,但是在一些其它的业务场景下需要同步来保证流程的完整性。所以说异步还是同步要跟据具体业务进行选择。
底层实现:
-
AMQP(Advanced Message Queuing Protocol):AMQP 是 跨语言的通用消息协议,适合异构系统间的复杂通信。
-
JMS(Java Message Service):JMS是 Java 专属的 API 标准,适合统一 Java 生态的消息处理。
主流的MQ产品对比
对比项 | RabbitMQ | ActiveMQ | RocketMQ | Kafka |
---|---|---|---|---|
开发语言 | Erlang | Java | Java | Scala/Java |
维护方 | Rabbit(公司) | Apache(社区) | 阿里(公司) | Apache(社区) |
核心机制 | 基于 AMQP 协议的生产者-消费者模型 | 基于 JMS 的消息传递模型 | 分布式消息队列(Topic + Tag 分类) | 分布式流处理平台(发布-订阅模型) |
协议支持 | AMQP、STOMP、MQTT、HTTP 插件 | AMQP、STOMP、OpenWire、REST、MQTT | 自定义协议(支持 TCP/HTTP) | 自定义协议(社区封装 HTTP 支持) |
客户端语言 | 官方:Erlang、Java、Ruby;社区:多语言 | Java、C/C++、.NET、Python、PHP | 官方:Java;社区:C++(不成熟) | 官方:Java;社区:Python、Go、Rust 等 |
可用性 | 镜像队列、仲裁队列(Quorum Queue) | 主从复制 | 主从复制 | 分区(Partition) + 副本(Replica) |
单机吞吐量 | 约 10 万/秒 | 约 5 万/秒 | 10 万+/秒(阿里双十一验证) | 百万级/秒 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息确认 | 完整 ACK/NACK 机制 | 支持 JMS ACK 模式 | 基于数据库持久化的消息表 | 基于副本同步和 ISR 机制 |
功能特性 | ✅ 低延迟、高并发、管理界面丰富 | ✅ 老牌稳定、支持 JMS 规范 | ✅ 高吞吐、阿里生态集成、事务消息 | ✅ 高吞吐、流处理、大数据场景专用 |
适用场景 | 复杂路由、实时业务(如支付订单) | 传统企业级系统(Java 生态) | 电商高并发场景(如秒杀、订单) | 日志采集、实时分析、流式计算 |
原生RabbitMQAPI调用:
//=========================================发送消息的代码示例=================================
public class Producer { public static void main(String[] args) throws Exception { // 创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); // 设置主机地址 connectionFactory.setHost("192.168.200.100"); // 设置连接端口号:默认为 5672connectionFactory.setPort(5672);// 虚拟主机名称:默认为 /connectionFactory.setVirtualHost("/");// 设置连接用户名;默认为guest connectionFactory.setUsername("guest");// 设置连接密码;默认为guest connectionFactory.setPassword("123456");// 创建连接 Connection connection = connectionFactory.newConnection(); // 创建频道 Channel channel = connection.createChannel(); // 声明(创建)队列 // queue 参数1:队列名称 // durable 参数2:是否定义持久化队列,当 MQ 重启之后还在 // exclusive 参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列 // autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除 // arguments 参数5:队列其它参数 channel.queueDeclare("simple_queue", true, false, false, null); // 要发送的信息 String message = "你好;小兔子!"; // 参数1:交换机名称,如果没有指定则使用默认Default Exchange // 参数2:路由key,简单模式可以传递队列名称 // 参数3:配置信息 // 参数4:消息内容 channel.basicPublish("", "simple_queue", null, message.getBytes()); System.out.println("已发送消息:" + message); // 关闭资源 channel.close(); connection.close(); } }
//=========================================接收消息的代码示例=================================
public class Consumer { public static void main(String[] args) throws Exception { // 1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2. 设置参数 factory.setHost("192.168.200.100"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest");factory.setPassword("123456"); // 3. 创建连接 Connection Connection connection = factory.newConnection(); // 4. 创建Channel Channel channel = connection.createChannel(); // 5. 创建队列 // 如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建 // 参数1. queue:队列名称 // 参数2. durable:是否持久化。如果持久化,则当MQ重启之后还在 // 参数3. exclusive:是否独占。 // 参数4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉 // 参数5. arguments:其它参数。 channel.queueDeclare("simple_queue",true,false,false,null); // 接收消息 DefaultConsumer consumer = new DefaultConsumer(channel){ // 回调方法,当收到消息后,会自动执行该方法 // 参数1. consumerTag:标识 // 参数2. envelope:获取一些信息,交换机,路由key... // 参数3. properties:配置信息 // 参数4. body:数据 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag:"+consumerTag); System.out.println("Exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties); System.out.println("body:"+new String(body)); } }; // 参数1. queue:队列名称 // 参数2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息 // 参数3. callback:回调对象 // 消费者类似一个监听程序,主要是用来监听消息 channel.basicConsume("simple_queue",true,consumer); } }
封装RabbitMQ工具类:
public class ConnectionUtil { //跟据自己服务的具体需求进行相关ip+端口的配置(动态变化)public static final String HOST_ADDRESS = "192.168.200.100"; public static Connection getConnection() throws Exception { // 定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置服务地址 factory.setHost(HOST_ADDRESS); // 端口 factory.setPort(5672); //设置账号信息,用户名、密码、vhost factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("123456"); // 通过工程获取连接 Connection connection = factory.newConnection(); return connection; } public static void main(String[] args) throws Exception { Connection con = ConnectionUtil.getConnection(); // amqp://guest@192.168.200.100:5672/ System.out.println(con); con.close(); } }
RabbitMQ体系结构:
-
生产者(Producer):发送消息到 RabbitMQ 的应用程序。
-
消费者(Consumer):从队列中接收并处理消息的应用程序。
-
交换机(Exchange):接收生产者消息,根据类型和路由规则将消息分发到队列。
-
四大类型:
-
类型 | 路由规则 | 典型场景 |
---|---|---|
Direct | 精确匹配 Routing Key (如 order.pay ) | 一对一精准投递(如支付成功通知) |
Topic | 通配符匹配(如 order.* 或 *.pay ) | 多服务订阅同一类消息(如日志分类) |
Fanout | 广播到所有绑定队列(无视 Routing Key ) | 群发通知(如系统公告) |
Headers | 通过消息头(Headers)键值对匹配 | 复杂条件路由(需灵活匹配时) |
-
队列(Queue):定义交换机与队列之间的映射关系,指定路由规则
-
信道(Channel):复用 TCP 连接的轻量级虚拟链路,减少资源消耗。
-
虚拟主机(Virtual Host):逻辑隔离的“消息域”,不同 vhost 间资源(交换机、队列)互不干扰。
总结:RabbitMQ 通过 生产者-交换机-队列-消费者 模型实现异步通信,核心在于灵活的路由规则(交换机类型)和可靠性保障(持久化、确认机制)。
基础篇
工作模式
-
简单模式:最简单的消息队列模型,包含一个生产者、一个队列和一个消费者。生产者直接将消息发送到队列,消费者从队列中接收消息。
-
工作队列模式(Work Queues):使用默认的交换机,一个队列对应多个消费者,消息按轮询(Round-Robin)或公平分发(Fair Dispatch)分配给消费者,避免单个消费者过载。
-
发布/订阅模式(Publish/Subscribe):使用 扇形交换机(Fanout Exchange),生产者将消息发送到交换机,交换机将消息广播到所有绑定的队列,每个消费者独立接收一份消息副本。
-
路由模式(Routing):使用 直接交换机(Direct Exchange),生产者指定消息的 路由键(Routing Key),交换机根据路由键将消息精确匹配到绑定的队列。
-
主题模式(Topics):使用 主题交换机(Topic Exchange),路由键支持通配符匹配(
*
匹配一个词,#
匹配多个词)。例如路由键stock.usd.nyse
可被*.nyse
或stock.#
订阅。 -
远程过程调用(RPC):通过消息队列实现远程调用。客户端发送请求消息时附带回调队列和唯一ID,服务端处理完成后将响应发送到回调队列,客户端通过ID匹配响应。
-
发布者确认(Publisher Confirms):生产者发送消息后,RabbitMQ会异步返回确认(ACK)或未确认(NACK),确保消息成功到达交换机或队列。
工作队列模式(Work Queues)
-
并行处理能力
-
多消费者竞争消费:一个队列可绑定多个消费者,消息被并发处理,消息只会被其中的一个消费者拿到。
-
横向扩展:通过增加消费者数量,轻松应对高并发或大流量场景。
-
负载均衡机制
-
轮询分发(Round-Robin):默认策略,均摊消息到所有消费者,简单但可能因消费者性能差异导致负载不均。
-
公平分发(Fair Dispatch):通过
prefetch_count
限制消费者同时处理的消息数,确保“能者多劳”,避免慢消费者堆积任务。
-
消息可靠性保障
-
ACK确认机制:消费者处理完成后需手动发送确认(ACK),若处理失败或消费者宕机,消息自动重新入队,确保任务不丢失。
-
持久化支持:队列和消息均可设置为持久化(
durable=true
),防止RabbitMQ服务重启后数据丢失。
//================================生产端代码循环发送10次消息================================
public class Producer { public static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); for (int i = 1; i <= 10; i++) { String body = i+"hello rabbitmq~~~"; channel.basicPublish("",QUEUE_NAME,null,body.getBytes()); } channel.close(); connection.close(); } }
//================================消费端代码竞争消息================================public class Consumer1/2 { static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("Consumer1 body:"+new String(body)); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
发布/订阅模式(Publish/Subscribe)
-
消息广播机制
-
扇形交换机(Fanout Exchange)驱动:生产者将消息发送到交换机,交换机会将消息无条件广播到所有与其绑定的队列,每个队列的消费者都能收到一份消息副本。
-
一对多分发:一条消息可被多个消费者同时接收,适用于需要广泛触达的场景(如系统通知、日志收集)。
//====================================生产者代码====================================
public class Producer { public static void main(String[] args) throws Exception { // 1、获取连接 Connection connection = ConnectionUtil.getConnection(); // 2、创建频道 Channel channel = connection.createChannel(); // 参数1. exchange:交换机名称 // 参数2. type:交换机类型 // DIRECT("direct"):定向 // FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。 // TOPIC("topic"):通配符的方式 // HEADERS("headers"):参数匹配 // 参数3. durable:是否持久化 // 参数4. autoDelete:自动删除 // 参数5. internal:内部使用。一般false // 参数6. arguments:其它参数 String exchangeName = "test_fanout"; // 3、创建交换机 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null); // 4、创建队列 String queue1Name = "test_fanout_queue1"; String queue2Name = "test_fanout_queue2"; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); // 5、绑定队列和交换机 // 参数1. queue:队列名称 // 参数2. exchange:交换机名称 // 参数3. routingKey:路由键,绑定规则 // 如果交换机的类型为fanout,routingKey设置为"" channel.queueBind(queue1Name,exchangeName,""); channel.queueBind(queue2Name,exchangeName,""); String body = "日志信息:张三调用了findAll方法...日志级别:info..."; // 6、发送消息 channel.basicPublish(exchangeName,"",null,body.getBytes()); // 7、释放资源 channel.close(); connection.close(); } }
//====================================消费者1代码===================================
public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue1Name = "test_fanout_queue1"; channel.queueDeclare(queue1Name,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("队列 1 消费者 1 将日志信息打印到控制台....."); } }; channel.basicConsume(queue1Name,true,consumer); } }
//====================================消费者2代码===================================
public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue2Name = "test_fanout_queue2"; channel.queueDeclare(queue2Name,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("队列 2 消费者 2 将日志信息打印到控制台....."); } }; channel.basicConsume(queue2Name,true,consumer); } }
路由模式(Routing)
-
基于路由键的精确分发
-
直接交换机(Direct Exchange)驱动:生产者发送消息时需指定路由键(Routing Key),交换机会将消息精确匹配到绑定相同路由键的队列。
-
条件性路由:仅当队列绑定的路由键与消息的路由键完全一致时,消息才会被投递,实现按条件分发。
-
灵活的消息过滤
-
多队列绑定不同路由键:可为同一交换机绑定多个队列,每个队列声明不同的路由键(例如
error
、info
、warning
),实现消息分类处理。 -
生产者可控性:生产者通过指定路由键决定消息的目标队列,无需消费者干预。
//================================生产者代码========================================
public class Producer { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_direct"; // 创建交换机 channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null); // 创建队列 String queue1Name = "test_direct_queue1"; String queue2Name = "test_direct_queue2"; // 声明(创建)队列 channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); // 队列绑定交换机 // 队列1绑定error channel.queueBind(queue1Name,exchangeName,"error"); // 队列2绑定info error warning channel.queueBind(queue2Name,exchangeName,"info"); channel.queueBind(queue2Name,exchangeName,"error"); channel.queueBind(queue2Name,exchangeName,"warning"); String message = "日志信息:张三调用了delete方法.错误了,日志级别warning"; // 发送消息 channel.basicPublish(exchangeName,"warning",null,message.getBytes()); System.out.println(message); // 释放资源 channel.close(); connection.close(); } }
//===============================消费者1代码========================================
public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue1Name = "test_direct_queue1"; channel.queueDeclare(queue1Name,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("Consumer1 将日志信息打印到控制台....."); } }; channel.basicConsume(queue1Name,true,consumer); } }
//===============================消费者2代码========================================
public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue2Name = "test_direct_queue2"; channel.queueDeclare(queue2Name,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); System.out.println("Consumer2 将日志信息存储到数据库....."); } }; channel.basicConsume(queue2Name,true,consumer); } }
主题模式(Topics)
-
基于通配符的灵活路由
-
主题交换机(Topic Exchange)驱动:生产者发送消息时指定带层级的路由键(Routing Key,如
order.europe.paid
),消费者通过绑定键(Binding Key)使用通配符(*
匹配一个词,#
匹配零或多个词)订阅消息。-
示例:绑定键
*.europe.*
可匹配order.europe.paid
或shipment.europe.delayed
。 -
绑定键
stock.#
可匹配stock.usd.nyse
或stock.eur.london.close
。
-
-
多维度匹配:支持复杂的分层路由逻辑,适用于需要按多条件分类的场景。
-
高度动态的消息过滤
-
灵活订阅:消费者可动态定义绑定键的通配规则,按需订阅特定模式的消息,无需修改生产者逻辑。
-
精准与模糊匹配结合:既能精确匹配固定路由键,也能通过通配符覆盖一类消息。
//================================生产者代码========================================
public class Producer { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_topic"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null); String queue1Name = "test_topic_queue1"; String queue2Name = "test_topic_queue2"; channel.queueDeclare(queue1Name,true,false,false,null); channel.queueDeclare(queue2Name,true,false,false,null); // 绑定队列和交换机 // 参数1. queue:队列名称 // 参数2. exchange:交换机名称 // 参数3. routingKey:路由键,绑定规则 // 如果交换机的类型为fanout ,routingKey设置为"" // routing key 常用格式:系统的名称.日志的级别。 // 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库 channel.queueBind(queue1Name,exchangeName,"#.error"); channel.queueBind(queue1Name,exchangeName,"order.*"); channel.queueBind(queue2Name,exchangeName,"*.*"); // 分别发送消息到队列:order.info、goods.info、goods.error String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]"; channel.basicPublish(exchangeName,"order.info",null,body.getBytes()); body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]"; channel.basicPublish(exchangeName,"goods.info",null,body.getBytes()); body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]"; channel.basicPublish(exchangeName,"goods.error",null,body.getBytes()); channel.close(); connection.close(); } }
//================================消费者1代码=======================================
public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String QUEUE_NAME = "test_topic_queue1"; channel.queueDeclare(QUEUE_NAME,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
//================================消费者2代码=======================================
public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String QUEUE_NAME = "test_topic_queue2"; channel.queueDeclare(QUEUE_NAME,true,false,false,null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }