文章目录
- 项目地址
- 一、RabbitMQ
- 1.1 基础概念
- 1. 基础组成
- 2. 死信队列,延迟队列
- 3. 交换机类型
- 4. 3种队列
- 5. 消息持久化
- 6. 消息优先级
- 1.2 问题
- 1. 为什么使用MQ/有什么缺点
- 2. 保证消息的可靠性(一)
- 4. 如何保证高可用
- 5. 为什么消息会重复?如何解决
- 6. 消息积压如何解决
- 7. 如何实现顺序消费
- 8. 如何解决消息队列延迟以及过期失效?
- 9. 队列满了怎么办
- 10. 线上几百万消息积压了几小时怎么解决
- 11. 顺序消费
- 1.3 exchange的使用
- 1. Fanout
- 2. Direct
- 3. Topic
- 1.4 消息确认机制
- 1. 消息丢失原因
- 2. 事务的方式发送消息(用的比较少)
- 3. confirm模式
- 4. 异步ack机制
- 1.5 高可用集群
- 1.6 消息可靠性
- 1. 消息可以到达broker
- 2. exchange到queue
- 3. MQ 重启
- 4. 消息投递到消费者手里
- 1.7 消息幂等
- 1. 消息重复原因
- 2. 唯一消息ID + 去重表
- 3. 乐观锁
项目地址
- 教程作者:
- 教程地址:
- 代码仓库地址:
- 所用到的框架和插件:
dbt
airflow
一、RabbitMQ
- 课程地址
https://www.bilibili.com/video/BV1JG4y187iv?spm_id_from=333.788.player.switch&vd_source=791e6deaa9c8a56b1f845a0bc1431b71
1.1 基础概念
1. 基础组成
- broker:节点
- queue: Once a message is consumed by one consumer, other consumers will not be able to receive it. 如果需要广播数据,可以一个交换机通过Key绑定多个队列,再由多个消费者订阅这些队列的方式。
- exchange: 消息投递到交换机,交换机通过规则投递到一个或者多个queue中
- routingkey:生产者发送消息时候,会带着routing-key发送给exchange,这个key通过binding key 结合使用,可以控制消息的流向。
- bingding key:路由器和queue关联作用,这样exchange就可以指定queue了
- channel :每个Channel表示一个会话任务,可建立多个channel.
2. 死信队列,延迟队列
死信队列(Dead Letter Queue):
设置:在设置了死信队列的时候,死信消息会被丢在死信队列里,不设置,消息会被丢弃;同一个项目使用一个死信交换机,然后给每个业务单独的分配一个routing key即可
死信消息:①手动被消费者拒绝或者设为死信消息;②超过了设置消息过期时间的;③超过设置的queue的长度和大小的消息;
延迟队列():
- 死信+TTL:设置消息5分钟过期,不消费,会被放入死信队列里,然后在通过
3. 交换机类型
- 直连交换机(Direct Exchange ):将消息路由刀routing key 完全匹配的队列,routing key =1 只会发送到消费者端口routing key =1 的消费者
- Topic:根据routing key的通匹符,模糊匹配
- Fanout:将消息广播到所有交换机绑定的队列,忽略routing key
- Headers: 根据消息头属性进行匹配
- 如果不指定使用的交换机,MQ默认生成7个不同的交换机
4. 3种队列
classic 队列: 先进先出
5. 消息持久化
- 设置交换机持久化为true
- 设置队列持久化为true
- 设置消息发送持久化为true
6. 消息优先级
- 只有在消息堆积的情况下,才会生效
- 设置queue的
x-max-priority
- 发送消息时,带上消息的优先级
- 消费端正常消费
- 设置queue的
1.2 问题
1. 为什么使用MQ/有什么缺点
- 结合项目场景:解耦,异步,消峰
缺点:
1. 复杂度提高,要保证rabbit mq的高可用
2. 一致性,消息幂等以及重复消息问题
2. 保证消息的可靠性(一)
- 消息确认机制(ACK)+ 持久化 + 死信队列 + 重试机制
生产者:
- 创建工厂连接对象,并创建AMQP 通道
2. 声明主队列,并且绑定死信交换机(设置死信交换机,死信路由)
3. 将需要发送的消息转为二进制
4. 设置消息持久化,并且发送
消费者:
- 创建连接
2. 声明主队列,并且绑定死信交换机(设置死信交换机,死信路由)
3. 设置消费者的消息预取数量(公平调度:限制消费者一次只拉取 1 条未确认消息)
4. 注册消息接收处理事件
5. 开始消费消息
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Collections.Concurrent;var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();// 1. 创建主队列 + 死信队列 + 死信交换机
channel.ExchangeDeclare("dlx_exchange", ExchangeType.Direct, durable: true);
channel.QueueDeclare("dlx_queue", durable: true,