欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > SpringBoot 整合 RabbitMQ 的使用

SpringBoot 整合 RabbitMQ 的使用

2025/5/5 23:46:50 来源:https://blog.csdn.net/cmh1008611/article/details/142795233  浏览:    关键词:SpringBoot 整合 RabbitMQ 的使用

一、RabbitTemplate 的使用

1.【导入依赖】

<!-- rabbitMQ -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.6.1</version>
</dependency>

2.【添加配置】

rabbitmq:host:   #ip地址port: 5672 #端口username: guestpassword: guestvirtual-host: /listener:simple:prefetch: 1 # 默认每次取出一条消息消费, 消费完成取下一条acknowledge-mode: manual # 设置消费端手动ack确认retry:enabled: true # 是否支持重试publisher-confirm-type: correlated  #确认消息已发送到交换机(Exchange)publisher-returns: true  #确认消息已发送到队列(Queue)

3.【点对点通信(队列模式)(Point-to-Point Messaging)】

使用方式:

这种方式也被称为队列(Queue)模型。消息发送者(Producer)发送消息到队列,然后消息接收者(Consumer)从队列中获取消息进行处理。这种模型下,每个消息只有一个消费者可以接收,确保消息的可靠传递和顺序处理。

代码示例:

生产者
/*** 第一种模型: 简单模型* 一个消息生产者  一个队列  一个消费者* @return*/@GetMapping("hello/world")public void helloWorld() {SysUser sysUser = new SysUser();// 发送消息// 第一个参数: String routingKey 路由规则 【交换机 和队列的绑定规则 】  队列名称// 第二个参数: object message 消息的内容
//        rabbitTemplate.convertAndSend("hello_world_queue", "hello world rabbit!");///  MessagePostProcessor 消息包装器  如果需要对消息进行包装rabbitTemplate.convertAndSend("hello_world_queue", "hello world rabbit!", message -> {// 设置唯一的标识message.getMessageProperties().setMessageId(UUID.randomUUID().toString());return message;});
消费者
import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;@Component
@Log4j2
public class HelloWorldConsumer {@Autowiredprivate StringRedisTemplate redisTemplate;/*** 监听 hello_world_queue 队列消费消息* queues 监听队列的名称  要求这个队列必须是已经存在的队列* queuesToDeclare 监听队列 如果这个队列不存在 则 rabbitMQ 中 RabbitAdmin 会帮助去构建这个队列*/@RabbitListener(queuesToDeclare = @Queue("hello_world_queue"))public void helloWorldConsumer(String msg, Message message, Channel channel) {// 获取消息的唯一标识String messageId = message.getMessageProperties().getMessageId();// 将消息添加到 Redis的set集合中  set 不能重复的  方法的返回值 添加成功的数量Long count = redisTemplate.opsForSet().add("hello_world_queue", messageId);if (count != null && count == 1) {// 没有消费过   正常消费log.info("hello_world_queue队列消费者接收到了消息,消息内容:{}", message);}}}

4.【发布/订阅模式(Publish/Subscribe Messaging)】

使用方式:

在发布/订阅模式中,消息发送者将消息发布到交换机(Exchange),而不是直接发送到队列。交换机负责将消息路由到一个或多个绑定的队列中。每个订阅者(Subscriber)可以选择订阅它感兴趣的消息队列,从而接收消息。

代码示例:

生产者

/*** 工作队列* 一个生产者  一个队列  多个消费者*/
@GetMapping("work/queue")
public void workQueue() {for (int i = 1; i <= 10; i++) {rabbitTemplate.convertAndSend("work_queue", i + "hello work queue!");}
}
消费者
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Log4j2
public class WorkQueueConsumer {/**** 消费者1* @param message*/@RabbitListener(queuesToDeclare = @Queue("work_queue"))public void workQueueConsumer(String message) throws InterruptedException {Thread.sleep(200);log.info("work_queue队列消费者1接收到了消息,消息内容:{}", message);}/**** 消费者2* @param message*/@RabbitListener(queuesToDeclare = @Queue("work_queue"))public void workQueueConsumer2(String message) throws InterruptedException {Thread.sleep(400);log.info("work_queue队列消费者2接收到了消息,消息内容:{}", message);}
}

5.【工作队列模式(Work Queues)】

使用方式:

工作队列模式也称为任务队列(Task Queues),它可以用来实现任务的异步处理。多个工作者(Worker)同时监听同一个队列,当有新的任务消息被发送到队列中时,空闲的工作者会获取并处理这些任务,确保任务能够并行处理而不会重复执行。

代码示例:

生产者
/*** 发布订阅* 一个生产者  多个队列   多个消费者   涉及 到交换机  fanout*/
@GetMapping("publish/subscribe")
public void publishSubscribe() {// 第一个参数: 交换机的名称  没有要求// 第二个参数: 交换机和队列的绑定规则    如果是发布订阅模式 那么这个规则默认不写 只需要交换机和队列绑定即可不需要规则// 第三个参数: 消息内容rabbitTemplate.convertAndSend("publish_subscribe_exchange", "","hello publisher subscribe!!");
}
消费者

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class PublisherSubscribeConsumer {private static final Logger log = LoggerFactory.getLogger(PublisherSubscribeConsumer.class);/*** 发布订阅模型消费者** @param message*/@RabbitListener(bindings = @QueueBinding(value = @Queue("pb_sb_queue_01"),exchange = @Exchange(name = "publish_subscribe_exchange",type = ExchangeTypes.FANOUT)))public void publisherSubscribe(String message) {log.info("发布订阅模型消费者1接收到了消息,消息内容:{}", message);}/*** 发布订阅模型消费者** @param message*/@RabbitListener(bindings = @QueueBinding(value = @Queue("pb_sb_queue_02"),exchange = @Exchange(name = "publish_subscribe_exchange", type = ExchangeTypes.FANOUT)))public void publisherSubscribe2(String message) {log.info("发布订阅模型消费者2接收到了消息,消息内容:{}", message);}}

6.【路由模式(Routing)】

使用方式:

路由模式允许发送者根据消息的路由键(Routing Key)将消息路由到特定的队列。发送者将消息发送到交换机,并且通过设置不同的路由键,使消息能够被交换机路由到不同的队列。消费者可以根据需要选择监听哪些队列来接收消息。

代码示例:

生产者

 

/*** 路由模型* 一个生产者  多个队列   多个消费者   涉及 到交换机  direct*/
@GetMapping("routing")
public void routing() {// 第一个参数: 交换机的名称  没有要求// 第二个参数: 交换机和队列的绑定规则    字符串 随意// 第三个参数: 消息内容rabbitTemplate.convertAndSend("routing_exchange", "aaa","hello routing!!");
}
消费者
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Log4j2
public class RoutingConsumer {/*** 路由模型消费者* @param message*/@RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_01"),exchange = @Exchange(name = "routing_exchange", type = ExchangeTypes.DIRECT),key = { "abc", "error", "info" }))public void routingConsumer(String message) {log.info("路由模型消费者1接收到了消息,消息内容:{}", message);}/*** 路由模型消费者* @param message*/@RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_02"),exchange = @Exchange(name = "routing_exchange", type = ExchangeTypes.DIRECT),key = { "aaa", "ccc", "waadaffas" }))public void routingConsumer2(String message) {log.info("路由模型消费者2接收到了消息,消息内容:{}", message);}/*** 路由模型消费者* @param message*/@RabbitListener(bindings = @QueueBinding(value = @Queue("routing_queue_03"),exchange = @Exchange(name = "routing_exchange", type = ExchangeTypes.DIRECT),key = { "bbbb", "asdfasd", "asdfasdf" }))public void routingConsumer3(String message) {log.info("路由模型消费者3接收到了消息,消息内容:{}", message);}}

7.【主题模式(Topics)】

使用方式:

主题模式是路由模式的一种扩展,它允许发送者根据消息的多个属性(如主题)将消息路由到一个或多个队列。主题交换机(Topic Exchange)使用通配符匹配路由键与队列绑定键的模式,从而实现更灵活的消息路由和过滤。

代码示例:

生产者

 


/*** 主题模型* 一个生产者  多个队列   多个消费者   涉及 到交换机  topic*/
@GetMapping("topic")
public void topic() {// 第一个参数: 交换机的名称  没有要求// 第二个参数: 交换机和队列的绑定规则    多个单词  以 “.” 拼起来// 第三个参数: 消息内容rabbitTemplate.convertAndSend("topic_exchange", "bwie.age.name","hello topic!!");
}
消费者
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Log4j2
public class TopicConsumer {/*** *  表示任意一个单词* #  表示任意一个单词 或 多个*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic_queue_01"),exchange = @Exchange(name = "topic_exchange", type = ExchangeTypes.TOPIC),key = { "abc.*", "error.*.info", "#.name" }))public void topicConsumer(String message) {log.info("xxxxxxxxx1");}/*** *  表示任意一个单词* #  表示任意一个单词 或 多个*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic_queue_02"),exchange = @Exchange(name = "topic_exchange", type = ExchangeTypes.TOPIC),key = { "abc.*", "username" }))public void topicConsumer2(String message) {log.info("xxxxxxxxx2");}/*** *  表示任意一个单词* #  表示任意一个单词 或 多个*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic_queue_03"),exchange = @Exchange(name = "topic_exchange", type = ExchangeTypes.TOPIC),key = { "bwie.*", "error.*.info" }))public void topicConsumer3(String message) {log.info("xxxxxxxxx3");}}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词