欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 名人名企 > RabbitMQ 知识详解(Java版)

RabbitMQ 知识详解(Java版)

2025/6/15 20:56:18 来源:https://blog.csdn.net/yuren_xia/article/details/148657345  浏览:    关键词:RabbitMQ 知识详解(Java版)

RabbitMQ 知识详解(Java版)

RabbitMQ 是一个开源的消息代理,实现了高级消息队列协议(AMQP)。它用于在分布式系统中实现应用解耦、异步通信和流量削峰。


核心概念
  1. 生产者(Producer):发送消息的应用
  2. 消费者(Consumer):接收消息的应用
  3. 队列(Queue):消息存储的缓冲区
  4. 交换机(Exchange):接收消息并路由到队列
  5. 绑定(Binding):连接交换机和队列的规则
  6. 路由键(Routing Key):消息的路由标识

交换机类型
类型路由规则典型用途
Direct精确匹配Routing Key点对点通信
Topic模式匹配(支持通配符)多条件路由
Fanout广播到所有绑定队列发布/订阅
Headers消息头键值对匹配复杂路由

Java 示例(使用官方客户端)

依赖:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version>
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-nop</artifactId><version>1.7.30</version>
</dependency>

1. 直连交换机(Direct Exchange)

// Producer
public class DirectExchangeProducer {private static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "direct");// 绑定不同路由键String routingKey1 = "red";String message1 = "重要消息";channel.basicPublish(EXCHANGE_NAME, routingKey1, null, message1.getBytes());System.out.println("发送消息: " + message1);String routingKey2 = "blue";String message2 = "普通消息";channel.basicPublish(EXCHANGE_NAME, routingKey2, null, message2.getBytes());System.out.println("发送消息: " + message2);}}
}// Consumer (红色队列)
public class DirectConsumerRed {private static final String EXCHANGE_NAME = "direct_exchange";private static final String QUEUE_NAME = "red_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "direct");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "red");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("红色队列收到消息: " + message);};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

2. 扇出交换机(Fanout Exchange)

// Producer
public class FanoutExchangeProducer {private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String message = "广播消息";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println("广播消息已发送");}}
}// Consumer (邮件队列)
public class FanoutConsumerEmail {private static final String EXCHANGE_NAME = "fanout_exchange";private static final String QUEUE_NAME = "email_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("邮件服务收到: " + message);};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

3. 主题交换机(Topic Exchange)

// Producer
public class TopicExchangeProducer {private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "topic");// 发送不同主题的消息String routingKey = "order.create";String message = "订单创建通知";channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());System.out.println("发送订单创建消息");routingKey = "user.login";message = "用户登录通知";channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());System.out.println("发送用户登录消息");}}
}// Consumer (订单服务)
public class TopicConsumerOrder {private static final String EXCHANGE_NAME = "topic_exchange";private static final String QUEUE_NAME = "order_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "topic");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "order.*");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("订单服务收到: " + message);};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

4. 头交换机(Headers Exchange)

// Producer
public class HeadersExchangeProducer {private static final String EXCHANGE_NAME = "headers_exchange";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "headers");// 设置消息头Map<String, Object> headers = new HashMap<>();headers.put("type", "log");headers.put("level", "error");AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(headers).build();String message = "系统错误日志";channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());System.out.println("发送错误日志消息");}}
}// Consumer (日志服务)
public class HeadersConsumerLog {private static final String EXCHANGE_NAME = "headers_exchange";private static final String QUEUE_NAME = "log_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "headers");channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 设置匹配规则 (必须包含type=log且level=error)Map<String, Object> bindingArgs = new HashMap<>();bindingArgs.put("x-match", "all"); // 全部匹配bindingArgs.put("type", "log");bindingArgs.put("level", "error");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", bindingArgs);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("日志服务收到: " + message);};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

通用配置说明

  1. 交换机类型总结:

    • 直连交换机:路由键精确匹配
    • 扇出交换机:忽略路由键,广播所有绑定队列
    • 主题交换机:使用通配符匹配路由键
    • 头交换机:通过消息头属性匹配(非路由键)
  2. 重要参数:

    • channel.queueDeclare() 参数说明:
      • durable: 是否持久化
      • exclusive: 是否排他
      • autoDelete: 是否自动删除
    • x-match参数在头交换机中有两种模式:
      • “all”: 需匹配所有指定头
      • “any”: 匹配任意指定头

关键特性(Java实现)

1. 消息持久化
// 声明持久化队列
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);// 发送持久化消息
channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
2. 公平分发(Prefetch)
// 每次只分发一条消息
int prefetchCount = 1;
channel.basicQos(prefetchCount);
3. 消息确认(ACK)
// 消费者关闭自动ACK
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {});// 处理完成后手动ACK
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
4. 持久化消费者
// 重启后自动恢复的消费者
Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum");
channel.queueDeclare("persistent_queue", true, false, false, args);

使用场景

  1. 服务解耦:订单系统与库存系统分离
  2. 异步处理:耗时操作(如邮件发送)
  3. 流量削峰:突发请求缓冲(秒杀系统)
  4. 分布式事务:最终一致性实现
  5. 日志收集:多系统日志聚合

最佳实践

  1. 连接管理:使用连接池(如Spring AMQP的CachingConnectionFactory)
  2. 异常处理:实现Consumer和Connection的监听器
  3. 死信队列:处理失败消息
  4. 集群部署:保证高可用性
  5. 监控管理:使用RabbitMQ Management Plugin

提示:生产环境推荐使用Spring AMQP简化开发,它提供了RabbitTemplate和@RabbitListener等便捷工具。

建议运行测试时:

  1. 先启动所有消费者
  2. 再运行生产者发送消息
  3. 观察各消费者接收到的消息是否符合路由规则

以上示例展示了RabbitMQ的核心路由机制,在实际生产环境中需添加异常处理、连接恢复、消息确认等机制。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词