RabbitMQ快速入门
1. docker安装RabbitMQ
rabbitMQ docker镜像官网:https://hub.docker.com/_/rabbitmq/
SpringBoot整合RabbitMQ参考网站:https://docs.spring.io/spring-boot/3.3/reference/messaging/amqp.html
docker安装rabbitMQ
docker run -d --name rabbitmq --restart=always -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 5672:5672 -p 15672:15672 rabbitmq:3.13.7-management
RabbitMQ结构和概念
RabbitMQ中的几个概念:
- channel:操作MQ的工具
- exchange:路由消息到队列中
- queue:缓存消息
- virtual host:虚拟主机,是对queue、exchange等资源进行逻辑分组
基本消息队列消息发送流程:
- 建立connection;
- 创建channel;
- 利用channel声明队列;
- 利用channel向队列发送消息
基本消息队列消息接收流程:
- 建立connection;
- 创建channel;
- 利用channel声明队列;
- 定义consumer的消费行为handleDelivery();
- 利用channel将消费者与队列绑定
2. 简单队列模型
简单队列模型角色:
- publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接收并缓存消息
- consumer:订阅队列,处理队列中的消息
SpringBoot整合RabbitMQ(生产者)
- 引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置application.yml
rabbitmq:host: 192.168.xx.xxport: 5672username: adminpassword: adminvirtual-host: /
- 创建RabbitMQ配置类
@Configuration
public class RabbitMQConfig {@Bean()public Queue simpleQueue(){return new Queue("simple_queue",true);}
}
对于生产者,当队列不存在时或消息发送时,均会报错。未防止误判,需提前创建Queue的Bean,若该队列不存在,则会自动创建,若存在,则不创建。
- 编写消息发送类
import jakarta.annotation.Resource;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;@Service
public class SimpleQueueService {@Resourceprivate RabbitTemplate rabbitTemplate;public Boolean send(String msg){rabbitTemplate.convertAndSend("simple_queue",msg);return true;}
}
消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SimpleConsumerService {@RabbitListener(queues = "simple_queue")public void receive(String msg){System.out.println("处理消息:"+msg);}
}
添加@RabbitListener注解的方法即为消费者处理函数,queues参数可以为数组(多个队列时),函数的参数类型与发送者的类型一致
3. 工作队列模型
consumer1和consumer2为合作关系,一起处理queue中的消息。作用:提高消息的处理速度,避免消息的堆积
生产者
public Boolean send(String msg){for (int i=0;i<50;i++){rabbitTemplate.convertAndSend("simple_queue","message "+i);}return true;
}
消费者
@Component
public class SimpleConsumerService {@RabbitListener(queues = "simple_queue")public void receiver1(String msg) throws InterruptedException {System.out.println("队列1处理消息:"+msg);Thread.sleep(10);}@RabbitListener(queues = "simple_queue")public void receiver2(String msg) throws InterruptedException {System.out.println("队列2处理消息:"+msg);Thread.sleep(20);}
}
默认情况下,队列会以轮询的方式将消息均分给消费者,如果想实现消费者按需处理,可设置prefetch的值
spring:application:name: hello-worldrabbitmq:host: 192.168.5.3port: 5672username: adminpassword: adminvirtual-host: /listener:simple:prefetch: 1 #工作队列的消费者每次可取1条消息,处理完成后再取下一条
4. 发布订阅模型
简单队列模型和工作队列模型只有一个队列,消息只能提交给一个消费者处理。发布订阅模型通过交换机(exchange)可实现将消息推送到多个队列,交给多个消费者处理。
常见的exchange类型包括:
- Fanout:广播
- Direct:路由
- Topic:话题
exchange只负责消息的转发,而不是存储,路由失败则消息丢失
4.1 Fanout Exchange
Fanout Exchange会将接收到的消息路由到每一个与其绑定的queue
案例
- 在Consumer服务中添加配置类,声明FanoutExchange和队列Queue,同时绑定FanoutExchange和Queue
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {
// 声明exchange交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("fanout");}
// 声明队列1@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1",true);}// 声明队列2@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2",true);}
// 绑定队列1和交换机@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}// 绑定队列2和交换机@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
- 编写消费者
public class SimpleConsumerService {@RabbitListener(queues = "fanout.queue1")public void receiver1(String msg) throws InterruptedException {System.out.println("队列1处理消息:"+msg);Thread.sleep(10);}@RabbitListener(queues = "fanout.queue2")public void receiver2(String msg) throws InterruptedException {System.out.println("队列2处理消息:"+msg);Thread.sleep(20);}
}
3.编写生产者
@Service
public class SimpleQueueService {@Resourceprivate RabbitTemplate rabbitTemplate;public Boolean send(String msg){
// rabbitTemplate.convertAndSend("simple_queue",msg);for (int i=0;i<10;i++){rabbitTemplate.convertAndSend("fanout","","message "+i);}return true;}
}
4.2 DirectExchange
DirectExchange会将接收到的消息根据规则路由到指定的Queue,因此称为路由模型。
- 每一个Queue都与Exchange设置一个BindingKey(Exchange和Queue默认BindingKey为交换机或Queue名称)
- 生产者发布消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
案例
- 在consumer服务中声明两个消费者,分别监听queue1和queue2,并利用@RabbitListener声明Exchange,Queue、RoutingKey
@Component
public class SimpleConsumerService {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "direct",type = ExchangeTypes.DIRECT),key = {"red","blue"})) //key为数组,为Queue设置的RoutingKeypublic void receiver1(String msg) throws InterruptedException {System.out.println("队列1处理消息:"+msg);Thread.sleep(10);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"})) //key为数组,为Queue设置的RoutingKeypublic void receiver2(String msg) throws InterruptedException {System.out.println("队列2处理消息:"+msg);Thread.sleep(20);}
}
- 编写生产者
@Service
public class SimpleQueueService {@Resourceprivate RabbitTemplate rabbitTemplate;public Boolean send(String msg){for (int i=0;i<10;i++){rabbitTemplate.convertAndSend("direct","blue","message "+i);}return true;}
}
4.3 TopicExchange
TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以==.==分割
Queue与Exchange指定BingKey时可以使用通配符:
#:代指0个或者多个单词
*:代指一个单词
案例
- 消费者
@Component
public class SimpleConsumerService {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "topic",type = ExchangeTypes.TOPIC),key = "hunan.#"))public void receiver1(String msg) throws InterruptedException {System.out.println("队列1处理消息:"+msg);Thread.sleep(10);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "topic",type = ExchangeTypes.TOPIC),key = "#.news"))public void receiver2(String msg) throws InterruptedException {System.out.println("队列2处理消息:"+msg);Thread.sleep(20);}
}
- 生产者
public Boolean send(String msg){for (int i=0;i<10;i++){rabbitTemplate.convertAndSend("topic","hunan.flowers","message "+i);}return true;
}
5. SpringAMQP消息转化器
Spring的消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,步骤如下:
- 在publisher服务中引入依赖
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.11.3</version>
</dependency>
- 在publisher服务中声明MessageConverter
@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}