欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 新车 > Springboot仿抖音app开发之RabbitMQ 异步解耦(进阶)

Springboot仿抖音app开发之RabbitMQ 异步解耦(进阶)

2025/6/23 15:35:23 来源:https://blog.csdn.net/xuziang13/article/details/148746303  浏览:    关键词:Springboot仿抖音app开发之RabbitMQ 异步解耦(进阶)

Springboot仿抖音app开发之评论业务模块后端复盘及相关业务知识总结

Springboot仿抖音app开发之粉丝业务模块后端复盘及相关业务知识总结

Springboot仿抖音app开发之用短视频务模块后端复盘及相关业务知识总结

Springboot仿抖音app开发之用户业务模块后端复盘及相关业务知识总结

Springboot仿抖音app开发之消息业务模块后端复盘及相关业务知识总结 

为什么需要接口解耦

1. 数据重要性分级处理

在实际业务系统中,数据通常被分为不同重要级别:

重要数据(关键业务数据)

  • 用户账户信息、交易记录、订单数据
  • 需要强一致性和ACID特性
  • 通常存储在关系型数据库(MySQL、PostgreSQL等)

非重要数据(辅助业务数据)

  • 用户行为日志、消息通知、统计数据
  • 可以容忍最终一致性
  • 适合存储在NoSQL数据库(MongoDB、Redis等)

2. 接口解耦的核心优势

故障隔离

  • 重要数据操作失败不影响非重要数据的处理
  • MongoDB服务异常不会阻塞核心业务流程
  • 提高系统整体可用性

为什么MongoDB服务异常会阻塞核心业务流程 

问题场景分析

1. 未解耦的情况(会阻塞核心业务)

问题

  • 如果MongoDB服务异常,整个订单处理流程都会失败
  • 核心的订单数据无法保存,影响业务连续性
  • 一个非关键功能的故障导致关键业务无法进行

2. 故障隔离的好处

业务连续性保障

  • 核心业务(订单创建)不受MongoDB故障影响
  • 用户可以正常下单,不会感知到系统部分组件的异常

系统健壮性提升

  • 不同重要级别的数据采用不同的处理策略
  • 非关键功能的故障不会造成系统雪崩

运维友好

  • 可以独立维护和升级MongoDB服务
  • MongoDB的性能调优不会影响核心业务

3. 实际案例

假设一个电商系统:

核心流程:用户下单 → 扣减库存 → 生成订单 → 扣款 辅助功能:记录用户行为 → 发送消息通知 → 更新推荐算法数据

如果没有解耦,MongoDB异常会导致:

  • 用户无法下单
  • 订单系统完全瘫痪
  • 收入损失

解耦后的效果:

  • 用户正常下单和支付
  • 部分通知功能暂时不可用(用户基本无感知)
  • 系统稳定运行,收入不受影响

工厂与批发商的故事

如果没有中间件(微信群)生产者给消费者发消息需要逐个去发送对应的消息,有了中间件之后只需要 统一发送就行,消费者去找自己对应的消息

 RabbitMQ

 

1. 异步任务处理

  • 场景:耗时操作(如发送邮件、生成报表、图片处理)不适合阻塞主流程。
  • 实现:将任务放入消息队列,由消费者异步处理。
  • 优势
    • 主程序快速响应(如用户注册后立即返回,邮件发送由队列处理)。
    • 避免因任务失败导致主流程崩溃。

2. 系统提速

  • 场景:高延迟操作(如数据库写入、第三方API调用)拖慢整体性能。
  • 实现:主程序发布消息后立即返回,消费者逐步处理。
  • 示例
    • 电商下单后,库存扣减和日志记录通过队列异步执行。
    • 吞吐量提升:队列充当缓冲区,允许系统以最大承受速度处理任务。

3. 接口解耦

  • 场景:系统间直接调用导致强依赖(如支付系统与物流系统)。
  • 实现:通过消息队列间接通信,生产者无需知道消费者细节。
  • 优势
    • 系统可独立扩展或升级(如新增一个消费者不会影响生产者)。
    • 协议灵活性:不同语言/框架的系统可通过标准协议(如AMQP)交互。

4. 流量削峰(Peak Shaving)

  • 场景:突发流量(如秒杀活动)可能压垮后端服务。
  • 实现:将请求放入队列,消费者按固定速率处理。
  • 关键点
    • 队列缓冲超载请求,避免服务崩溃。
    • 配合限流策略(如设置队列最大长度),保证系统稳定。

核心组件及其关系

1. 生产者 (Producer)

  • 作用: 消息的发送方,类似于写信的人
  • 职责: 创建消息并发送到交换机
  • 特点: 生产者不直接将消息发送给队列,而是发送给Exchange

2. 交换机 (Exchange)

  • 作用: 消息的邮局/分拣中心
  • 职责: 接收生产者的消息,根据路由规则决定消息发送到哪个队列
  • 类型:
    • Direct: 精确匹配路由键
    • Topic: 支持通配符匹配 (* 和 #)
    • Fanout: 广播到所有绑定的队列
    • Headers: 根据消息头属性路由

3. 队列 (Queue)

  • 作用: 消息的邮箱
  • 职责: 存储消息,等待消费者来获取
  • 特点: 先进先出(FIFO)的数据结构

4. 消费者 (Consumer)

  • 作用: 消息的接收方,类似于收信的人
  • 职责: 从队列中获取并处理消息

工作流程

生产者 → Exchange → Queue → 消费者↓        ↓        ↓        ↓写信   →  邮局   → 邮箱  →  收信人

绑定关系 (Binding)

绑定是连接Exchange和Queue的路由规则

// 您代码中的绑定示例
.bind(queue)           // 绑定队列
.to(exchange)          // 到交换机  
.with("sys.msg.*")     // 路由键规则

这意味着:

  • 当生产者发送路由键为 sys.msg.login 的消息时 → 会路由到 queue_sys_msg
  • 当发送路由键为 user.info.update 的消息时 → 不会路由到此队列

实际应用场景举例

场景: 系统消息通知

  1. 生产者: 用户服务

    // 发送登录消息
    rabbitTemplate.convertAndSend("exchange_msg", "sys.msg.login", "用户张三登录");
    
  2. Exchange: exchange_msg (Topic类型)

    • 接收到路由键 sys.msg.login 的消息
  3. 路由判断:

    • sys.msg.login 匹配 sys.msg.* 规则 ✅
    • 消息被路由到 queue_sys_msg
  4. 队列: queue_sys_msg

    • 存储消息等待处理
  5. 消费者: 系统通知服务

    @RabbitListener(queues = "queue_sys_msg")
    public void handleSysMessage(String message) {// 处理系统消息logger.info("收到系统消息: " + message);
    }
    

关键理解点

  1. 解耦性: 生产者不需要知道具体哪个消费者会处理消息
  2. 灵活性: 通过不同的Exchange类型和绑定规则,可以实现各种消息路由策略
  3. 可靠性: 消息持久化、队列持久化确保消息不会丢失
  4. 扩展性: 可以轻松添加新的队列和消费者

集成Rabbitmq - 引入配置和依赖

        <!-- 引入 RabbitMQ 依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
  rabbitmq:host: 127.0.0.1port: 5672username: adminpassword: adminvirtual-host: imooc-red-book

集成Rabbitmq - 创建交换机和队列

 我们来看完整代码

package com.imooc;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {/*** 根据模型编写代码:* 1. 定义交换机* 2. 定义队列* 3. 创建交换机* 4. 创建队列* 5. 队列和交换机的绑定*/public static final String EXCHANGE_MSG = "exchange_msg";public static final String QUEUE_SYS_MSG = "queue_sys_msg";@Bean(EXCHANGE_MSG)public Exchange exchange() {return ExchangeBuilder                      // 构建交换机.topicExchange(EXCHANGE_MSG)        // 使用topic类型,参考:https://www.rabbitmq.com/getstarted.html.durable(true)                      // 设置持久化,重启mq后依然存在.build();}@Bean(QUEUE_SYS_MSG)public Queue queue() {return new Queue(QUEUE_SYS_MSG);}@Beanpublic Binding binding(@Qualifier(EXCHANGE_MSG) Exchange exchange,@Qualifier(QUEUE_SYS_MSG) Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("sys.msg.*")          // 定义路由规则(requestMapping).noargs();// FIXME: * 和 # 分别代表什么意思?}}

Spring Boot自动创建机制

当Spring容器启动时,会自动扫描所有标注了@Bean的方法,并将返回值注册到Spring容器中。对于RabbitMQ组件,Spring AMQP会自动检测这些Bean并在RabbitMQ服务器上创建对应的实体。

代码执行流程

1. 创建交换机

@Bean(EXCHANGE_MSG)
public Exchange exchange() {return ExchangeBuilder.topicExchange(EXCHANGE_MSG)    // 创建名为"exchange_msg"的Topic交换机.durable(true)                  // 持久化,服务器重启后不会丢失.build();
}

2. 创建队列

@Bean(QUEUE_SYS_MSG)
public Queue queue() {return new Queue(QUEUE_SYS_MSG);    // 创建名为"queue_sys_msg"的队列
}

3. 建立绑定关系

@Bean
public Binding binding(@Qualifier(EXCHANGE_MSG) Exchange exchange,@Qualifier(QUEUE_SYS_MSG) Queue queue) {return BindingBuilder.bind(queue)                    // 绑定队列.to(exchange)                   // 到交换机.with("sys.msg.*")              // 使用路由键模式.noargs();
}

Topic交换机的路由规则

关于您代码中的FIXME注释,Topic类型的通配符含义:

  • *(星号): 匹配一个单词

    • sys.msg.* 可以匹配:sys.msg.loginsys.msg.logoutsys.msg.error
    • 不能匹配:sys.msg.user.login(因为有两个单词)
  • #(井号): 匹配零个或多个单词

    • sys.msg.# 可以匹配:sys.msgsys.msg.loginsys.msg.user.login.success

集成Rabbitmq - 创建生产者,配置路由规则

1. 添加依赖 (pom.xml)

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2. 配置文件 (application.yml)

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /

3. RabbitMQ配置类 (已有)

@Configuration
public class RabbitMQConfig {public static final String EXCHANGE_MSG = "exchange_msg";public static final String QUEUE_SYS_MSG = "queue_sys_msg";// 创建Topic交换机@Bean(EXCHANGE_MSG)public Exchange exchange() {return ExchangeBuilder.topicExchange(EXCHANGE_MSG).durable(true).build();}// 创建队列@Bean(QUEUE_SYS_MSG)public Queue queue() {return new Queue(QUEUE_SYS_MSG);}// 绑定关系:队列绑定到交换机,使用路由规则@Beanpublic Binding binding(@Qualifier(EXCHANGE_MSG) Exchange exchange,@Qualifier(QUEUE_SYS_MSG) Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("sys.msg.*")  // 关键:路由键规则.noargs();}
}
1. 核心常量定义
public static final String EXCHANGE_MSG = "exchange_msg";
public static final String QUEUE_SYS_MSG = "queue_sys_msg";
  • 作用: 定义交换机和队列的名称常量
  • 好处: 避免硬编码,便于维护和引用
2. Topic交换机配置
@Bean(EXCHANGE_MSG)
public Exchange exchange() {return ExchangeBuilder.topicExchange(EXCHANGE_MSG)    // 创建Topic类型交换机.durable(true)                  // 持久化配置.build();
}

Topic交换机特点

  • 类型: Topic Exchange(主题交换机)
  • 路由规则: 支持通配符匹配
    • * :匹配一个单词
    • # :匹配零个或多个单词
  • 持久化durable(true) 确保服务器重启后交换机不丢失
3. 队列配置
@Bean(QUEUE_SYS_MSG)
public Queue queue() {return new Queue(QUEUE_SYS_MSG);
}
  • 队列名queue_sys_msg
  • 用途: 存储系统消息
  • 默认配置: 非持久化队列(可以改为持久化)
4. 绑定关系配置
@Bean
public Binding binding(@Qualifier(EXCHANGE_MSG) Exchange exchange,@Qualifier(QUEUE_SYS_MSG) Queue queue) {return BindingBuilder.bind(queue)           // 绑定队列.to(exchange)          // 到交换机.with("sys.msg.*")     // 使用路由规则.noargs();
}

 

4. 生产者控制器

@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("produce")public Object produce() throws Exception {// 发送消息到指定交换机,使用特定路由键rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_MSG,    // 交换机名称"sys.msg.send",                 // 路由键"我发了一个消息\~\~"                // 消息内容);return GraceJSONResult.ok();}
}

集成Rabbitmq - 消费者接受消息处理业务 

package com.imooc;import com.imooc.enums.MessageEnum;
import com.imooc.exceptions.GraceException;
import com.imooc.grace.result.ResponseStatusEnum;
import com.imooc.mo.MessageMO;
import com.imooc.service.MsgService;
import com.imooc.utils.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class RabbitMQConsumer {@Autowiredprivate MsgService msgService;@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})public void watchQueue(String payload, Message message) {log.info(payload);String routingKey = message.getMessageProperties().getReceivedRoutingKey();log.info(routingKey);}}

1. 消费者组件定义

@Slf4j
@Component
public class RabbitMQConsumer {
  • @Component: 将此类注册为Spring Bean
  • @Slf4j: 自动生成日志对象,用于记录日志

2. 队列监听

@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})
public void watchQueue(String payload, Message message) {
  • @RabbitListener: 监听指定队列的注解
  • queues = {RabbitMQConfig.QUEUE_SYS_MSG}: 监听名为 queue_sys_msg 的队列
  • String payload: 接收消息的具体内容
  • Message message: 完整的消息对象,包含元数据

3. 消息处理逻辑

log.info(payload);  // 打印消息内容
String routingKey = message.getMessageProperties().getReceivedRoutingKey();
log.info(routingKey);  // 打印路由键

完整工作机制

监听机制

  1. 当应用启动时,Spring会自动扫描带有 @RabbitListener 的方法
  2. 为该方法创建一个消息监听容器
  3. 容器会持续监听 queue_sys_msg 队列

消息处理流程

  1. 接收消息: 当队列中有新消息时,自动触发 watchQueue 方法
  2. 解析内容: 获取消息的文本内容 (payload)
  3. 提取路由键: 从消息属性中获取路由键信息
  4. 记录日志: 将消息内容和路由键打印到控制台

rabbitTemplate.convertAndSend 方法详解 

这行代码是RabbitMQ异步消息发送的核心部分,让我逐个参数详细解析:

rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_MSG,"sys.msg." + MessageEnum.FOLLOW_YOU.enValue,JsonUtils.objectToJson(messageMO)
);

参数解析

参数1: 交换机名称

RabbitMQConfig.EXCHANGE_MSG
  • 作用: 指定消息要发送到哪个交换机
  • 实际值: 通常是 "exchange_msg" (常量值)
  • 工作方式: 交换机接收所有消息,并根据路由规则分发

参数2: 路由键

"sys.msg." + MessageEnum.FOLLOW_YOU.enValue
  • 拼接结果"sys.msg.follow"
  • 作用: 决定消息如何被路由到目标队列
  • 匹配规则: 与交换机绑定时定义的模式进行匹配
    • 如: sys.msg.* 会匹配此路由键

参数3: 消息内容

JsonUtils.objectToJson(messageMO)
  • 输入messageMO (消息对象)
  • 转换过程: 对象 → JSON字符串
  • 输出示例:
{"fromUserId": "用户123","toUserId": "博主456","msgContent": null
}
  • 传输形式: 字节数组形式在网络上传输

 异步解耦 - 系统消息入库保存

阶段一:关注操作(生产者)

@Transactional
@Override
public void doFollow(String myId, String vlogerId) {// 1. 核心业务逻辑(同步)String fid = sid.nextShort();Fans fans = new Fans();// ... 设置粉丝关系fansMapper.insert(fans);  // 插入粉丝关系到数据库// 2. 构建消息对象MessageMO messageMO = new MessageMO();messageMO.setFromUserId(myId);     // 关注者messageMO.setToUserId(vlogerId);   // 被关注者// 3. 异步发送消息(关键点)rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_MSG,"sys.msg." + MessageEnum.FOLLOW_YOU.enValue,  // 路由键:sys.msg.followJsonUtils.objectToJson(messageMO));// 事务提交,关注操作立即完成!
}

阶段二:消息消费(消费者)

@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})
public void watchQueue(String payload, Message message) {// 1. 解析JSON消息MessageMO messageMO = JsonUtils.jsonToPojo(payload, MessageMO.class);String routingKey = message.getMessageProperties().getReceivedRoutingKey();// 2. 根据路由键判断消息类型并处理if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.FOLLOW_YOU.enValue)) {// 异步执行系统消息入库msgService.createMsg(messageMO.getFromUserId(),    // 关注者IDmessageMO.getToUserId(),      // 被关注者ID  MessageEnum.FOLLOW_YOU.type,  // 消息类型:关注(1)null                         // 无额外内容);}// ... 处理其他消息类型
}

异步解耦的核心优势

1. 事务分离

// 主事务:关注业务
@Transactional  
public void doFollow() {fansMapper.insert(fans);        // 核心业务rabbitTemplate.convertAndSend(); // 发送MQ(非阻塞)
}  // 事务立即提交// 独立处理:消息入库
@RabbitListener
public void watchQueue() {msgService.createMsg();  // 在独立的事务中处理
}

2. 时序对比

传统同步方式

用户点击关注↓
[开始事务]├── 插入粉丝关系     (50ms)├── 更新互关状态     (30ms)  └── 创建系统消息     (100ms)  ← 可能慢
[提交事务]             (180ms总耗时)↓
返回成功给用户

MQ异步方式

用户点击关注↓
[开始事务]├── 插入粉丝关系     (50ms)├── 更新互关状态     (30ms)└── 发送MQ消息       (5ms)   ← 超快
[提交事务]             (85ms总耗时)↓
返回成功给用户          ← 用户立即看到结果[后台异步]└── 创建系统消息     (100ms)  ← 后台处理

3. 系统消息入库的异步处理

消息流转过程

// 发送的JSON消息
{"fromUserId": "user123","toUserId": "vlogger456"
}// 路由键
"sys.msg.follow"// 最终入库的系统消息
INSERT INTO sys_msg (from_user_id = 'user123',to_user_id = 'vlogger456', msg_type = 1,              -- FOLLOW_YOU类型msg_content = null,create_time = now()
);

容错和可靠性保障

1. 消息持久化

// RabbitMQ配置
@Bean
public Queue queue() {return QueueBuilder.durable("queue_sys_msg")  // 持久化队列.build();
}

2. 失败重试机制

@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})
public void watchQueue(String payload, Message message) {try {// 处理消息msgService.createMsg(...);} catch (Exception e) {log.error("处理消息失败: {}", e.getMessage());// 消息会自动重新入队重试throw e;  // 抛出异常触发重试}
}

其他相关的操作也同样进行异步解耦即可,我们已经在消费者模型中做了if判断处理

package com.imooc;import com.imooc.base.RabbitMQConfig;
import com.imooc.enums.MessageEnum;
import com.imooc.exceptions.GraceException;
import com.imooc.grace.result.ResponseStatusEnum;
import com.imooc.mo.MessageMO;
import com.imooc.service.MsgService;
import com.imooc.utils.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class RabbitMQConsumer {@Autowiredprivate MsgService msgService;@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})public void watchQueue(String payload, Message message) {log.info(payload);MessageMO messageMO = JsonUtils.jsonToPojo(payload, MessageMO.class);String routingKey = message.getMessageProperties().getReceivedRoutingKey();log.info(routingKey);if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.FOLLOW_YOU.enValue)) {msgService.createMsg(messageMO.getFromUserId(),messageMO.getToUserId(),MessageEnum.FOLLOW_YOU.type,null);} else if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.LIKE_VLOG.enValue)) {msgService.createMsg(messageMO.getFromUserId(),messageMO.getToUserId(),MessageEnum.FOLLOW_YOU.type,messageMO.getMsgContent());} else if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.COMMENT_VLOG.enValue)) {msgService.createMsg(messageMO.getFromUserId(),messageMO.getToUserId(),MessageEnum.COMMENT_VLOG.type,messageMO.getMsgContent());} else if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.REPLY_YOU.enValue)) {msgService.createMsg(messageMO.getFromUserId(),messageMO.getToUserId(),MessageEnum.REPLY_YOU.type,messageMO.getMsgContent());} else if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.LIKE_COMMENT.enValue)) {msgService.createMsg(messageMO.getFromUserId(),messageMO.getToUserId(),MessageEnum.LIKE_COMMENT.type,messageMO.getMsgContent());} else {GraceException.display(ResponseStatusEnum.SYSTEM_OPERATION_ERROR);}}}

消息流转的完整过程

1. 发送端(生产者)

// 关注操作中发送消息
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_MSG,           // 交换机:exchange_msg"sys.msg." + MessageEnum.FOLLOW_YOU.enValue,  // 路由键:sys.msg.followJsonUtils.objectToJson(messageMO)      // JSON消息
);

2. RabbitMQ路由过程

消息发送到交换机 "exchange_msg"↓
交换机根据路由键 "sys.msg.follow" 进行路由判断↓  
匹配绑定规则:queue_sys_msg 绑定了 "sys.msg.*"↓
"sys.msg.follow" 匹配 "sys.msg.*" ✅↓
消息被路由到队列 "queue_sys_msg"↓
消息在队列中等待被消费

3. 消费端(监听器)

@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})
public void watchQueue(String payload, Message message) {// 自动监听队列,有消息就触发此方法
}

监听机制的工作原理

@RabbitListener 注解的作用

@RabbitListener(queues = {RabbitMQConfig.QUEUE_SYS_MSG})

Spring Boot启动时

  1. 扫描注解: Spring扫描到 @RabbitListener 注解
  2. 创建监听容器: 为该方法创建一个 MessageListenerContainer
  3. 建立连接: 连接到RabbitMQ服务器
  4. 监听队列: 持续监听 queue_sys_msg 队列
  5. 等待消息: 进入阻塞状态,等待队列中有新消息

消息到达时

[队列中有新消息]↓
[监听容器检测到消息]↓
[自动调用 watchQueue 方法]↓
[传入消息内容和元数据]

消息处理的具体流程

参数接收

public void watchQueue(String payload, Message message) {// payload: JSON字符串内容// message: 完整的AMQP消息对象(包含属性、路由键等)
}

消息解析

// 1. 打印消息内容
log.info(payload);  // 输出:{"fromUserId":"user123","toUserId":"vlogger456"}// 2. 解析JSON为对象
MessageMO messageMO = JsonUtils.jsonToPojo(payload, MessageMO.class);// 3. 获取路由键
String routingKey = message.getMessageProperties().getReceivedRoutingKey();
log.info(routingKey);  // 输出:sys.msg.follow

业务逻辑分发

// 根据路由键判断消息类型
if (routingKey.equalsIgnoreCase("sys.msg." + MessageEnum.FOLLOW_YOU.enValue)) {// 处理关注消息:sys.msg.followmsgService.createMsg(messageMO.getFromUserId(),    // 关注者messageMO.getToUserId(),      // 被关注者MessageEnum.FOLLOW_YOU.type,  // 消息类型:1null                         // 无额外内容);
}

热搜词