欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 教育 > 幼教 > RabbitMQ快速入门

RabbitMQ快速入门

2025/9/23 6:53:16 来源:https://blog.csdn.net/qq_45528306/article/details/148571523  浏览:    关键词:RabbitMQ快速入门

RabbitMQ快速入门

1. docker安装RabbitMQ

rabbitMQ docker镜像官网:https://hub.docker.com/_/rabbitmq/

SpringBoot整合RabbitMQ参考网站:https://docs.spring.io/spring-boot/3.3/reference/messaging/amqp.html

docker安装rabbitMQ

docker run -d --name rabbitmq --restart=always -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 5672:5672 -p 15672:15672 rabbitmq:3.13.7-management

RabbitMQ结构和概念
在这里插入图片描述

RabbitMQ中的几个概念:

  • channel:操作MQ的工具
  • exchange:路由消息到队列中
  • queue:缓存消息
  • virtual host:虚拟主机,是对queue、exchange等资源进行逻辑分组

基本消息队列消息发送流程:

  1. 建立connection;
  2. 创建channel;
  3. 利用channel声明队列;
  4. 利用channel向队列发送消息

基本消息队列消息接收流程:

  1. 建立connection;
  2. 创建channel;
  3. 利用channel声明队列;
  4. 定义consumer的消费行为handleDelivery();
  5. 利用channel将消费者与队列绑定

2. 简单队列模型

在这里插入图片描述

简单队列模型角色:

  • publisher:消息发布者,将消息发送到队列queue
  • queue:消息队列,负责接收并缓存消息
  • consumer:订阅队列,处理队列中的消息

SpringBoot整合RabbitMQ(生产者)

  1. 引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 配置application.yml
  rabbitmq:host: 192.168.xx.xxport: 5672username: adminpassword: adminvirtual-host: /
  1. 创建RabbitMQ配置类
@Configuration
public class RabbitMQConfig {@Bean()public Queue simpleQueue(){return new Queue("simple_queue",true);}
}

对于生产者,当队列不存在时或消息发送时,均会报错。未防止误判,需提前创建Queue的Bean,若该队列不存在,则会自动创建,若存在,则不创建。

  1. 编写消息发送类
import jakarta.annotation.Resource;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;@Service
public class SimpleQueueService {@Resourceprivate RabbitTemplate rabbitTemplate;public Boolean send(String msg){rabbitTemplate.convertAndSend("simple_queue",msg);return true;}
}

消费者

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SimpleConsumerService {@RabbitListener(queues = "simple_queue")public void receive(String msg){System.out.println("处理消息:"+msg);}
}

添加@RabbitListener注解的方法即为消费者处理函数,queues参数可以为数组(多个队列时),函数的参数类型与发送者的类型一致

3. 工作队列模型

在这里插入图片描述

consumer1和consumer2为合作关系,一起处理queue中的消息。作用:提高消息的处理速度,避免消息的堆积

生产者

public Boolean send(String msg){for (int i=0;i<50;i++){rabbitTemplate.convertAndSend("simple_queue","message "+i);}return true;
}

消费者

@Component
public class SimpleConsumerService {@RabbitListener(queues = "simple_queue")public void receiver1(String msg) throws InterruptedException {System.out.println("队列1处理消息:"+msg);Thread.sleep(10);}@RabbitListener(queues = "simple_queue")public void receiver2(String msg) throws InterruptedException {System.out.println("队列2处理消息:"+msg);Thread.sleep(20);}
}

默认情况下,队列会以轮询的方式将消息均分给消费者,如果想实现消费者按需处理,可设置prefetch的值

spring:application:name: hello-worldrabbitmq:host: 192.168.5.3port: 5672username: adminpassword: adminvirtual-host: /listener:simple:prefetch: 1 #工作队列的消费者每次可取1条消息,处理完成后再取下一条

4. 发布订阅模型

简单队列模型和工作队列模型只有一个队列,消息只能提交给一个消费者处理。发布订阅模型通过交换机(exchange)可实现将消息推送到多个队列,交给多个消费者处理。
在这里插入图片描述

常见的exchange类型包括:

  • Fanout:广播
  • Direct:路由
  • Topic:话题

exchange只负责消息的转发,而不是存储,路由失败则消息丢失

4.1 Fanout Exchange

Fanout Exchange会将接收到的消息路由到每一个与其绑定的queue

在这里插入图片描述

案例

  1. 在Consumer服务中添加配置类,声明FanoutExchange和队列Queue,同时绑定FanoutExchange和Queue
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {
//    声明exchange交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("fanout");}
//      声明队列1@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1",true);}//      声明队列2@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2",true);}
//    绑定队列1和交换机@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}//    绑定队列2和交换机@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
  1. 编写消费者
public class SimpleConsumerService {@RabbitListener(queues = "fanout.queue1")public void receiver1(String msg) throws InterruptedException {System.out.println("队列1处理消息:"+msg);Thread.sleep(10);}@RabbitListener(queues = "fanout.queue2")public void receiver2(String msg) throws InterruptedException {System.out.println("队列2处理消息:"+msg);Thread.sleep(20);}
}

3.编写生产者

@Service
public class SimpleQueueService {@Resourceprivate RabbitTemplate rabbitTemplate;public Boolean send(String msg){
//        rabbitTemplate.convertAndSend("simple_queue",msg);for (int i=0;i<10;i++){rabbitTemplate.convertAndSend("fanout","","message "+i);}return true;}
}

4.2 DirectExchange

DirectExchange会将接收到的消息根据规则路由到指定的Queue,因此称为路由模型。

在这里插入图片描述

  • 每一个Queue都与Exchange设置一个BindingKey(Exchange和Queue默认BindingKey为交换机或Queue名称)
  • 生产者发布消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

案例

  1. 在consumer服务中声明两个消费者,分别监听queue1和queue2,并利用@RabbitListener声明Exchange,Queue、RoutingKey
@Component
public class SimpleConsumerService {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "direct",type = ExchangeTypes.DIRECT),key = {"red","blue"})) 	//key为数组,为Queue设置的RoutingKeypublic void receiver1(String msg) throws InterruptedException {System.out.println("队列1处理消息:"+msg);Thread.sleep(10);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))	//key为数组,为Queue设置的RoutingKeypublic void receiver2(String msg) throws InterruptedException {System.out.println("队列2处理消息:"+msg);Thread.sleep(20);}
}
  1. 编写生产者
@Service
public class SimpleQueueService {@Resourceprivate RabbitTemplate rabbitTemplate;public Boolean send(String msg){for (int i=0;i<10;i++){rabbitTemplate.convertAndSend("direct","blue","message "+i);}return true;}
}

4.3 TopicExchange

TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以==.==分割

在这里插入图片描述

Queue与Exchange指定BingKey时可以使用通配符:

#:代指0个或者多个单词

*:代指一个单词

案例

  1. 消费者
@Component
public class SimpleConsumerService {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "topic",type = ExchangeTypes.TOPIC),key = "hunan.#"))public void receiver1(String msg) throws InterruptedException {System.out.println("队列1处理消息:"+msg);Thread.sleep(10);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "topic",type = ExchangeTypes.TOPIC),key = "#.news"))public void receiver2(String msg) throws InterruptedException {System.out.println("队列2处理消息:"+msg);Thread.sleep(20);}
}
  1. 生产者
 public Boolean send(String msg){for (int i=0;i<10;i++){rabbitTemplate.convertAndSend("topic","hunan.flowers","message "+i);}return true;
}

5. SpringAMQP消息转化器

Spring的消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。

如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,步骤如下:

  1. 在publisher服务中引入依赖
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.11.3</version>
</dependency>
  1. 在publisher服务中声明MessageConverter
@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词