欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > 【RabbitMQ】整合 SpringBoot,实现工作队列、发布/订阅、路由和通配符模式

【RabbitMQ】整合 SpringBoot,实现工作队列、发布/订阅、路由和通配符模式

2025/5/18 8:50:19 来源:https://blog.csdn.net/Yeeear/article/details/148025177  浏览:    关键词:【RabbitMQ】整合 SpringBoot,实现工作队列、发布/订阅、路由和通配符模式

文章目录

  • 工作队列模式
    • 引入依赖
    • 配置
    • 声明
    • 生产者代码
    • 消费者代码
  • 发布/订阅模式
    • 引入依赖
    • 声明
    • 生产者代码
      • 发送消息
    • 消费者代码
    • 运行程序
  • 路由模式
    • 声明
    • 生产者代码
    • 消费者代码
    • 运行程序
  • 通配符模式
    • 声明
    • 生产者代码
    • 消费者代码
    • 运行程序

工作队列模式

引入依赖

我们在创建 SpringBoot 项目的时候,选上这两个依赖即可 |380

或者在依赖中加入

<dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-amqp</artifactId>  
</dependency>

配置

将配置文件后缀改成 yml 之后,进行配置image.png|372

#配置 RabbitMQ 的基本信息
spring:rabbitmq:  host: 127.0.0.1 #RabbitMQ 服务器的地址  port: 15673  #RabbitMQ的TCP协议的端口号,而不是管理平台的端口号。默认为5672  username: guest  password: guest  virtual-host: coding #默认为 /

或者这样写

spring:rabbitmq:addresses: amqp://guest:guest@127.0.0.1:5672/coding
  • 格式为: amqp://username:password@ip:port/virtual-host

声明

注意引入的是这个包
image.png

package org.example.rabbitmq.config;  import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.core.Queue;  
import org.springframework.amqp.core.QueueBuilder;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  @Configuration  
public class RabbitMQConfig {  // 声明一个队列,来自第三方包,就是一个对象  @Bean("workQueue")  public Queue workQueue(){  return QueueBuilder.durable(Constants.WORK_QUEUE).build();  }  
}

生产者代码

package org.example.rabbitmq.controller;  import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.web.bind.annotation.RequestMapping;  
import org.springframework.web.bind.annotation.RestController;  @RestController  
@RequestMapping("/producer")  
public class ProducerController {  @Autowired  private RabbitTemplate rabbitTemplate;  @RequestMapping("/work")  public String work() {  // 使用内置交换机的话,RoutingKey 和队列名称一致  rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, "hello spring amqp: work...");  return "发送成功";  }  
}
  • 在运行程序之后,队列不会被立马创建出来
  • 需要发送消息之后才会被创建image.png|278

消费者代码

消费者是通过实现一个监听类,来监听有没有消息

  • 采用一个注解—— @RabbitListener

@RabbitListenerSpring 框架中用于监听 RabbitMQ 队列的注解,通过使用这个注解,可以定义一个方法,以便从 RabbitMQ 队列中接收消息。

  • 该注解支持多种参数类型,这些参数类型代表了从 RabbitMQ 接收到的消息和相关信息
  • 以下是一些常用的参数类型:
    • String:返回消息的内容
    • Message (org.spring.framework.ampq.core.Message):Spring AMPQMessage 类,返回原始的消息体以及消息的属性,如消息 ID,内容,队列信息等
    • Channel (com.rabbitmq.client.Channel):RabbitMQ 的通道对象,可以用于进行高级的操作,如手动确认消息
package org.example.rabbitmq.listener;  import org.apache.logging.log4j.message.Message;  
import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.stereotype.Component;  @Component  
public class WorkListener {  @RabbitListener(queues = Constants.WORK_QUEUE)  public void queueListener1(Message message) {  System.out.println("listener 1 [" + Constants.WORK_QUEUE + "] 接收到消息:" + message);  }  @RabbitListener(queues = Constants.WORK_QUEUE)  public void queueListener2(String message) {  System.out.println("listener 2 [" + Constants.WORK_QUEUE + "] 接收到消息:" + message);  }  
}

发布/订阅模式

在发布/订阅模式中,多了一个 Exchange 角色。Exchange 常见有三种类型,分别代表不同的路由规则

  • Fanout: 广播,将消息交给所有绑定到交换机的队列 (Publish/Subscribe 模式)
  • Direct: 定向,把消息交给符合指定 Routing Key 的队列(Routing 模式)
  • Topic: 通配符,把消息交给符合 Routing pattern (路由模式) 的队列(Topics 模式)

引入依赖

我们在创建 SpringBoot 项目的时候,选上这两个依赖即可 |380

或者在依赖中加入

<dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-amqp</artifactId>  
</dependency>

声明

package org.example.rabbitmq.config;  import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.core.*;  
import org.springframework.beans.factory.annotation.Qualifier;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  @Configuration  
public class RabbitMQConfig {  /**  * 二、发布/订阅模式  * 声明队列、声明交换机、声明队列和交换机的绑定  * @return  */  @Bean("fanoutQueue1")  // @Bean注解:交给Spring进行管理, 括号里面是指定名称  public Queue fanoutQueue1() {  return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();  }  @Bean("fanoutQueue2")  public Queue fanoutQueue2() {  return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();  }  @Bean("fanoutExchange")  // 声明交换机有很多种类型:FanoutExchange、DirectExchange、TopicExchange  public FanoutExchange fanoutExchange() {  return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build();  }  @Bean("fanoutQueueBinding1")  public Binding fanoutQueueBinding1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue1") Queue queue) {  return BindingBuilder.bind(queue).to(fanoutExchange);  }  @Bean("fanoutQueueBinding2")  public Binding fanoutQueueBinding2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue2") Queue queue) {  return BindingBuilder.bind(queue).to(fanoutExchange);  }  
}

生产者代码

image.png|276

  1. 声明队列
  2. 声明交换机
  3. 声明交换机和队列的绑定
  4. 发送消息

发送消息

package org.example.rabbitmq.controller;  import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.web.bind.annotation.RequestMapping;  
import org.springframework.web.bind.annotation.RestController;  @RestController  
@RequestMapping("/producer")  
public class ProducerController {  @Autowired  private RabbitTemplate rabbitTemplate;  @RequestMapping("/fanout")  public String fanout() {  rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE,"","hello spring amqp:fanout...");  return "发送成功";  }  
}

消费者代码

package org.example.rabbitmq.listener;  import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.stereotype.Component;  @Component  
public class FanoutListener {  @RabbitListener(queues = Constants.FANOUT_QUEUE1)  public void queueListener1(String message) {  System.out.println("队列[" + Constants.FANOUT_QUEUE1 + "] 接收到消息:" + message);  }  @RabbitListener(queues = Constants.FANOUT_QUEUE2)  public void queueListener2(String message) {  System.out.println("队列[" + Constants.FANOUT_QUEUE2 + "] 接收到消息:" + message);  }  
}

运行程序

  1. 运行项目,调用接口发送消息
    • http://127.0.0.1:8080/producer/fanout
    • image.png

image.png

  1. 监听类收到消息,并打印
    image.png

路由模式

交换机类型为 Direct 时,会把消息交给符合指定 Routing Key 的队列

  • 队列和交换机的绑定,不是任意的绑定了,而是要制定一个 RoutingKey(路由 key
  • 消息的发送方在向 Exchange 发送消息时,也需要指定消息的 RoutingKey
  • Exchange 也不再把消息交给每一个绑定的 key,而是根据消息的 RoutingKey 进行判断,只有队列的 RoutingKey 和消息的 RoutingKey 完全一致,才会接收消息

image.png|315

声明

按照这个图片,进行绑定image.png|385

/**  * 三、 路由模式  * 声明队列、声明交换机、声明队列和交换机的绑定  * @return  */  
@Bean("directQueue1")  
public Queue directQueue1(){  return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();  
}  @Bean("directQueue2")  
public Queue directQueue2(){  return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();  
}  @Bean("directExchange")  
// 声明交换机有很多种类型:FanoutExchange、DirectExchange、TopicExchange  
public DirectExchange directExchange() {  return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();  
}  @Bean("directQueueBinding1")  
public Binding directQueueBinding1(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue1") Queue queue) {  return BindingBuilder.bind(queue).to(directExchange).with("a");  
}  @Bean("directQueueBinding2")  
public Binding directQueueBinding2(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) {  return BindingBuilder.bind(queue).to(directExchange).with("a");  
}  @Bean("directQueueBinding3")  
public Binding directQueueBinding3(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) {  return BindingBuilder.bind(queue).to(directExchange).with("b");  
}  @Bean("directQueueBinding4")  
public Binding directQueueBinding4(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) {  return BindingBuilder.bind(queue).to(directExchange).with("c");  
}

生产者代码

package org.example.rabbitmq.controller;  import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.web.bind.annotation.PathVariable;  
import org.springframework.web.bind.annotation.RequestMapping;  
import org.springframework.web.bind.annotation.RestController;  @RestController  
@RequestMapping("/producer")  
public class ProducerController {  @Autowired  private RabbitTemplate rabbitTemplate;  /**  * 三、路由模式  * @param routingKey  * @return  */  @RequestMapping("/direct/{routingKey}")  //从路径中拿到这个routingKey  public String direct(@PathVariable("routingKey") String routingKey) {  rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, routingKey,"hello spring amqp:direct, my routing key is" + routingKey);  return "发送成功";  }  
}

消费者代码

package org.example.rabbitmq.listener;  import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.stereotype.Component;  @Component  
public class DirectListener {  @RabbitListener(queues = Constants.DIRECT_QUEUE1)  public void queueListener1(String message) {  System.out.println("队列[" + Constants.DIRECT_QUEUE1 + "] 接收到消息:" + message);  }  @RabbitListener(queues = Constants.DIRECT_QUEUE2)  public void queueListener2(String message) {  System.out.println("队列[" + Constants.DIRECT_QUEUE2 + "] 接收到消息:" + message);  }  
}

运行程序

  1. 运行项目

  2. 调用接口发送 routingKeya 的消息

    • http://127.0.0.1:8080/producer/direct/a
    • 观察后端日志,队列 1 和 2 都收到消息 image.png
  3. 调用接口发送 routingKeyb 的消息

    • http://127.0.0.1:8080/producer/direct/b
    • 观察后端日志,队列 2 收到消息image.png|347
  4. 调用接口发送 routingKeyc 的消息

    • http://127.0.0.1:8080/producer/direct/c
    • 观察后端日志,队列 2 收到消息|372

通配符模式

TopicsRouting 模式的区别是:

  1. topics 模式使用的交换机类型为 topicRouting 模式使用的是 direct
  2. topic 类型的交换机在匹配规则上进行了扩展,Binding Key 支持通配符匹配

image.png|419

  • * 表示一个单词
  • # 表示多个单词

声明

/**  * 四、通配符模式  * 声明队列、声明交换机、声明队列和交换机的绑定  * @return  */  
@Bean("topicQueue1")  
public Queue topicQueue1(){  return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();  
}  @Bean("topicQueue2")  
public Queue topicQueue2(){  return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();  
}  @Bean("topicExchange")  
public TopicExchange topicExchange() {  return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();  
}  @Bean("topicQueueBinding1")  
public Binding topicQueueBinding1(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue1") Queue queue) {  return BindingBuilder.bind(queue).to(topicExchange()).with("*.a.*");  
}  @Bean("topicQueueBinding2")  
public Binding topicQueueBinding2(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue2") Queue queue) {  return BindingBuilder.bind(queue).to(topicExchange()).with("*.*.b");  
}  @Bean("topicQueueBinding3")  
public Binding topicQueueBinding3(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue2") Queue queue) {  return BindingBuilder.bind(queue).to(topicExchange()).with("c.#");  
}

生产者代码

package org.example.rabbitmq.controller;  import org.example.rabbitmq.constant.Constants;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.web.bind.annotation.PathVariable;  
import org.springframework.web.bind.annotation.RequestMapping;  
import org.springframework.web.bind.annotation.RestController;  @RestController  
@RequestMapping("/producer")  
public class ProducerController {  @Autowired  private RabbitTemplate rabbitTemplate;  /**  * 四、通配符模式  * @param routingKey  * @return  */  @RequestMapping("/topic/{routingKey}")  public String topic(@PathVariable("routingKey") String routingKey) {  rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE,routingKey, "hello spring amqp:topic, my routing key is " + routingKey);  return "发送成功";  }  
}

消费者代码

运行程序

  1. 运行程序

  2. 调用接口发送 routingKeyqqq.a.b 的消息

    • http://127.0.0.1:8080/producer/topic/qqq.a.b
    • 观察后端日志,队列 1 和队列 2 均收到消息image.png|435
  3. 调用接口发送 routingKeyc.abc.fff 的消息

    • http://127.0.0.1:8080/producer/topic/c.abc.fff
    • 观察后端日志,队列 2 收到信息image.png
  4. 调用接口发送 routingKeyg.h.j 的消息

    • http://127.0.0.1:8080/producer/topic/g.h.j
    • 观察后端日志,没有队列收到消息

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词