欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 家装 > RocketMQ学习记录

RocketMQ学习记录

2025/6/18 22:48:45 来源:https://blog.csdn.net/weixin_42173947/article/details/143747221  浏览:    关键词:RocketMQ学习记录

服务器操作系统版本:Ubuntu 24.04

Java版本:21

Spring Boot版本:3.3.5

如果打算用GUI,虚拟机安装Ubuntu 24.04,见虚拟机安装Ubuntu 24.04及其常用软件(2024.7)_ubuntu24.04-CSDN博客icon-default.png?t=O83Ahttps://blog.csdn.net/weixin_42173947/article/details/140335522如果打算用纯命令行,见

虚拟机安装Ubuntu 24.04服务器版(命令行版)-CSDN博客icon-default.png?t=O83Ahttps://blog.csdn.net/weixin_42173947/article/details/143747375

1 Ubuntu上部署RocketMQ

这里准备两台服务器,做集群使用,一台IP是192.168.100.200,一台是192.168.100.201

1.1 安装JDK8

首先需要部署JDK8+,这里我使用了JDK8

sudo apt-get install -y openjdk-8-jdk;

1.2 下载RocketMQ

这里使用5.2.0版本

mkdir -p /home/user/softwares;
cd /home/user/softwares;
wget https://dist.apache.org/repos/dist/release/rocketmq/5.2.0/rocketmq-all-5.2.0-bin-release.zip;

1.3 解压,改文件夹名

unzip rocketmq-all-5.2.0-bin-release.zip;
mv rocketmq-all-5.2.0-bin-release rocketmq;

1.4 修改参数,减少内存消耗量

cd /home/user/softwares/rocketmq;
vim bin/runserver.sh;

-Xms4g 改为 -Xms256m,-Xmx4g 改为 -Xmx256m,-Xmn2g 改为 -Xmn128m

vim bin/runbroker.sh;

-Xmn4g 改为 -Xmn256m,-Xms8g 改为 -Xms256m, -Xmx8g 改为 -Xmx256m

1.5 启动NameServer

cd /home/user/softwares/rocketmq;
nohup sh bin/mqnamesrv &

验证namesrv是否启动成功

tail -f ~/logs/rocketmqlogs/namesrv.log;
jps -l;

1.6 启动Broker+Proxy

1.6.1 单点版

只有一个节点,最简单,最不稳定,一般用于测试

nohup sh bin/mqbroker -n <服务器IP>:9876 --enable-proxy &

1.6.2 全Master版

nohup sh bin/mqbroker -n <NameServer服务器IP1>:9876;<NameServer服务器IP2>:9876 -c /usr/local/softwares/rocketmq/conf/2m-noslave/broker-a.properties --enable-proxy &
nohup sh bin/mqbroker -n <NameServer服务器IP1>:9876;<NameServer服务器IP2>:9876 -c /usr/local/softwares/rocketmq/conf/2m-noslave/broker-b.properties --enable-proxy &

我这里IP是192.168.100.200,192.168.100.201,注意两台服务器要执行不一样的命令

nohup sh bin/mqbroker -n '192.168.100.200:9876;192.168.100.201:9876' -c /usr/local/softwares/rocketmq/conf/2m-noslave/broker-a.properties --enable-proxy &
nohup sh bin/mqbroker -n '192.168.100.100:9876;192.168.100.201:9876' -c /usr/local/softwares/rocketmq/conf/2m-noslave/broker-b.properties --enable-proxy &

验证broker是否启动成功

tail -f ~/logs/rocketmqlogs/proxy.log;
jps -l;

1.7 停止broker,namesrv

先停broker,后停namesrv

cd /home/user/softwares/rocketmq;
sh bin/mqshutdown broker;
sh bin/mqshutdown namesrv;

1.8 测试生产者,消费者

重新开启namesrv和broker,然后执行下面操作

cd /usr/local/softwares/rocketmq;
export NAMESRV_ADDR=localhost:9876;
# 生产者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer;
# 消费者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer;

1.9 防火墙放开RocketMQ端口

sudo ufw allow 8081/tcp;
sudo ufw allow 9876/tcp;
sudo ufw allow 10911/tcp;
sudo ufw allow 10909/tcp;
sudo ufw reload;

2 关于Topic的操作

2.1 创建Topic

cd /usr/local/softwares/rocketmq;

执行如下命令,创建一个叫TestTopic的Topic

nohup sh bin/mqadmin updatetopic -n <服务器IP>:9876 -t TestTopic -b <服务器IP>:10911 &
nohup sh bin/mqadmin updatetopic -n <服务器IP>:9876 -t TestTopic -b <服务器IP>:10911 &

注意修改IP

nohup sh bin/mqadmin updatetopic -n '192.168.100.200:9876;192.168.100.201:9876' -t TestTopic -b 192.168.100.200:10911 &
nohup sh bin/mqadmin updatetopic -n '192.168.100.200:9876;192.168.100.201:9876' -t TestTopic -b 192.168.100.201:10911 &

2.2 删除Topic

cd /usr/local/softwares/rocketmq;

执行如下命令

nohup sh bin/mqadmin deleteTopic -n <服务器IP>:9876 -c DefaultCluster -t TestTopic

注意修改IP

nohup sh bin/mqadmin deleteTopic -n 192.168.100.200:9876 -c DefaultCluster -t TestTopic
nohup sh bin/mqadmin deleteTopic -n 192.168.100.201:9876 -c DefaultCluster -t TestTopic

2.3 查看Topic

cd /usr/local/softwares/rocketmq;

执行如下命令

sh bin/mqadmin topicList -n <服务器IP>:9876

注意修改IP

sh bin/mqadmin topicList -c -n 192.168.100.200:9876;
sh bin/mqadmin topicList -c -n 192.168.100.201:9876;

3 Java连接RocketMQ环境搭建

3.1 文件树结构

3.2 父节点pom.xml 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.sliverbullet</groupId><artifactId>jdk21-maven-test</artifactId><packaging>pom</packaging><version>1.0</version><properties><maven.compiler.source>21</maven.compiler.source><maven.compiler.target>21</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><java.version>21</java.version><spring-boot.version>3.3.5</spring-boot.version></properties><modules><module>springboot3-test</module><module>rocketmq-test</module></modules><repositories><repository><id>public</id><name>aliyun nexus</name><url>https://maven.aliyun.com/repository/public</url><releases><enabled>true</enabled></releases></repository></repositories><pluginRepositories><pluginRepository><id>public</id><name>aliyun nexus</name><url>https://maven.aliyun.com/repository/public</url><releases><enabled>true</enabled></releases><snapshots><enabled>false</enabled></snapshots></pluginRepository></pluginRepositories>
</project>

3.3 子节点xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.sliverbullet</groupId><artifactId>jdk21-maven-test</artifactId><version>1.0</version></parent><artifactId>rocketmq-test</artifactId><properties><maven.compiler.source>21</maven.compiler.source><maven.compiler.target>21</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><java.version>21</java.version><spring-boot.version>3.3.5</spring-boot.version><fastjson2-version>2.0.53</fastjson2-version><lombok-version>1.18.34</lombok-version><rocketmq-spring-boot-starter-version>2.3.0</rocketmq-spring-boot-starter-version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>${spring-boot.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><version>${spring-boot.version}</version></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>${fastjson2-version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok-version}</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>${rocketmq-spring-boot-starter-version}</version></dependency></dependencies></project>

依赖环境搭建完成

4 RocketMQ基本配置

4.1 文件树结构

4.2 application.yml的配置

server:port: 8002
spring:application:name: rocketmq-testprofiles:active: dev
machine-no: 1

4.3 application-dev.yml的配置

test,prod,自行配置

spring:logging:file:path: D:/log/SpringBoot3-Testname: ${logging.file.path}/test.log
rocketmq:name-server: 192.168.100.200:9876;192.168.100.201:9876producer:group: boot-productsend-message-timeout: 10000

4.4 RocketMQConsumer类

package com.sliverbullet.consumer;import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;@Component
@Slf4j
@RocketMQMessageListener(topic = "TestTopic", consumerGroup = "my-consumer-test-topic", consumeTimeout = 1000L)
public class RocketMQConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");LocalDateTime localDateTime = LocalDateTime.now();String formattedDate = dateTimeFormatter.format(localDateTime);log.info("RocketMQ消费者接收时间:{}" ,formattedDate);log.info("RocketMQ消费者接收内容:{}" ,message);}
}

4.5 IRocketMQService接口

package com.sliverbullet.service;import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;import java.util.List;
import java.util.Map;/*** <p>* RocektMQ生产者常用发送消息方法* 最佳实践:https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md* </p>** @author MrWen* @since 2022-01-06 17:10**/
public interface IRocketMQService {/*** 发送同步消息(这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。)* <p>* (send消息方法只要不抛异常,就代表发送成功。但不保证100%可靠投递(所有消息都一样,后面不在叙述)。* 要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。* 解析看:https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md* )** @param destination 主题名:标签 topicName:tags* @param msg         发送对象* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功*/SendResult sendMessage(String destination, Object msg);/*** 发送同步消息(这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。)** @param topicName 主题名 topicName* @param tags      标签 tags* @param msg       发送对象* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功*/SendResult sendMessage(String topicName, String tags, Object msg);/*** 发送同步消息(这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。)** @param topicName 主题名 topicName* @param tags      标签 tags* @param key       唯一标识码要设置到keys字段,方便将来定位消息丢失问题* @param msg       发送对象* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功*/SendResult sendMessage(String topicName, String tags, String key, Object msg);/*** 发送同步消息-SQL92模式* 需要配置RocketMQ服务器  vim conf/broker.conf  ##支持sql语句过滤  enablePropertyFilter=true* 在console控制台查看集群状态  enablePropertyFilter=true 才正常** @param topicName 主题名 topicName* @param map       自定义属性* @param msg       发送对象* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功*/SendResult sendMessageBySql(String topicName, Map<String, Object> map, Object msg);/*** 发送同步消息-SQL92模式* 需要配置RocketMQ服务器  vim conf/broker.conf  ##支持sql语句过滤  enablePropertyFilter=true* 在console控制台查看集群状态  enablePropertyFilter=true 才正常** @param topicName 主题名 topicName* @param map       自定义属性* @param key       唯一标识码要设置到keys字段,方便将来定位消息丢失问题* @param msg       发送对象* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功*/SendResult sendMessageBySql(String topicName, Map<String, Object> map, String key, Object msg);/*** 发生异步消息(异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。)** @param destination  主题名:标签 topicName:tags* @param msg          发送对象* @param sendCallback 异步回调函数*/void sendAsyncMessage(String destination, Object msg, SendCallback sendCallback);/*** 发送单向消息(这种方式主要用在不特别关心发送结果的场景,例如日志发送。)** @param destination 主题名:标签 topicName:tags* @param msg         发送对象*/void sendOneway(String destination, Object msg);/*** 发送批量消息(发送超过1MB,做了自动分割,超时时间设置30s(默认3s)),注:默认最大是4MB,为了避免ListSplitter.calcMessageSize计算不精确及大批量数据发送超时才设置1MB** @param destination 主题名:标签 topicName:tags* @param list        批量消息*/void sendBatchMessage(String destination, List<?> list);/*** 发送批量消息(发送超过1MB,做了自动分割。),注:默认最大是4MB,为了避免ListSplitter.calcMessageSize计算不精确及大批量数据发送超时才设置1MB** @param topicName 主题名 topicName* @param tags      标签 tags* @param timeout   超时时间,空则默认设为30s* @param list      批量消息*/void sendBatchMessage(String topicName, String tags, Long timeout, List<?> list);/*** 发送延时消息(超时时间,设置30s(默认3s))* 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h* 1  2  3   4   5  6  7  8  9  10 11 12 13 14  15  16  17 18** @param destination    主题名:标签 topicName:tags* @param msg            发送对象* @param delayTimeLevel 延时等级(从1开始)* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功*/SendResult sendDelayLevel(String destination, Object msg, int delayTimeLevel);/*** 发送延时消息* 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h* 1  2  3   4   5  6  7  8  9  10 11 12 13 14  15  16  17 18** @param destination    主题名:标签 topicName:tags* @param msg            发送对象* @param timeout        超时时间(单位毫秒)* @param delayTimeLevel 延时等级(从1开始)* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功*/SendResult sendDelayLevel(String destination, Object msg, int timeout, int delayTimeLevel);/*** 发送顺序消息(分区有序,多个queue参与,即相对每个queue,消息都是有序的。)** @param destination 主题名:标签 topicName:tags* @param msg         发送对象* @param hashKey     根据其哈希值取模后确定发送到哪一个queue队列* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功*/SendResult sendInOrder(String destination, Object msg, String hashKey);/*** 发送事务消息* 事务消息使用上的限制* 1:事务消息不支持延时消息和批量消息。* 2:为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。* 3:事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。* 4:事务性消息可能不止一次被检查或消费。* 5:提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。* 6:事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。** @param destination 主题名:标签 topicName:tags* @param msg         发送对象* @param arg         arg* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功*/SendResult sendMessageInTransaction(String destination, Object msg, Object arg);
}

4.6 RocketMQServiceImpl接口实现类

package com.sliverbullet.service.impl;import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import com.sliverbullet.service.IRocketMQService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;import java.util.List;
import java.util.Map;@Slf4j
@Service
public class RocketMQServiceImpl implements IRocketMQService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Overridepublic SendResult sendMessage(String topicName, Object msg) {MessageBuilder<?> messageBuilder = MessageBuilder.withPayload(msg);Message<?> message = messageBuilder.build();SendResult sendResult = rocketMQTemplate.syncSend(topicName, message);if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {log.info("【RocketMQ测试】发送同步消息成功, topicName: {}, msg: {}, sendResult: {}", topicName, msg, sendResult);} else {log.warn("【RocketMQ测试】发送同步消息不一定成功, topicName: {}, msg: {}, sendResult: {}", topicName, msg, sendResult);}return sendResult;}@Overridepublic SendResult sendMessage(String topicName, String tags, Object msg) {MessageBuilder<?> messageBuilder = MessageBuilder.withPayload(msg);Message<?> message = messageBuilder.build();SendResult sendResult = rocketMQTemplate.syncSend(topicName + ":" + tags, message);if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {log.info("【RocketMQ测试】发送同步带Tag消息成功, topicName: {}, tag:{}, msg: {}, sendResult: {}", topicName, tags, msg, sendResult);} else {log.warn("【RocketMQ测试】发送同步带Tag消息不一定成功, topicName: {}, tag:{}, msg: {}, sendResult: {}", topicName, tags, msg, sendResult);}return sendResult;}@Overridepublic SendResult sendMessage(String topicName, String tags, String key, Object msg) {MessageBuilder<?> messageBuilder = MessageBuilder.withPayload(msg);if (StringUtils.isNotBlank(key)) {messageBuilder.setHeader(MessageConst.PROPERTY_KEYS, key);}Message<?> message = messageBuilder.build();SendResult sendResult = rocketMQTemplate.syncSend(topicName + ":" + tags, message);if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {log.info("【RocketMQ测试】发送同步带Tag和Key消息成功, topicName: {}, tag:{}, msg: {}, sendResult: {}", topicName, tags, msg, sendResult);} else {log.warn("【RocketMQ测试】发送同步带Tag和Key消息不一定成功, topicName: {}, tag:{}, msg: {}, sendResult: {}", topicName, tags, msg, sendResult);}return sendResult;}@Overridepublic SendResult sendMessageBySql(String topicName, Map<String, Object> map, Object msg) {return null;}@Overridepublic SendResult sendMessageBySql(String topicName, Map<String, Object> map, String key, Object msg) {return null;}@Overridepublic void sendAsyncMessage(String destination, Object msg, SendCallback sendCallback) {}@Overridepublic void sendOneway(String destination, Object msg) {}@Overridepublic void sendBatchMessage(String destination, List<?> list) {}@Overridepublic void sendBatchMessage(String topicName, String tags, Long timeout, List<?> list) {}@Overridepublic SendResult sendDelayLevel(String destination, Object msg, int delayTimeLevel) {Message<?> message = MessageBuilder.withPayload(msg).build();SendResult sendResult = rocketMQTemplate.syncSend(destination, message, 10000L, delayTimeLevel);if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {log.info("【RocketMQ测试】发送延时消息成功, destination: {}, msg: {}, sendResult: {}", destination, message, sendResult);} else {log.warn("【RocketMQ测试】发送延时消息不一定成功, destination: {}, msg: {}, sendResult: {}", destination, message, sendResult);}return sendResult;}@Overridepublic SendResult sendDelayLevel(String destination, Object msg, int timeout, int delayTimeLevel) {return null;}@Overridepublic SendResult sendInOrder(String destination, Object msg, String hashKey) {return null;}@Overridepublic SendResult sendMessageInTransaction(String destination, Object msg, Object arg) {return null;}
}

5 正式测试

5.1 基本同步消息测试

传入一个JSON,同步进入消息队列,消息队列同步消费

5.1.1 Controller层

    @Value("${machine-no}")private String machineNo;@Resourceprivate IRocketMQService rocketMQService;@RequestMapping("/sync")public JSONObject send(@RequestBody JSONObject param) {DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");LocalDateTime localDateTime = LocalDateTime.now();String formattedDate = dateTimeFormatter.format(localDateTime);System.out.println(formattedDate);log.info("【SpringBoot3测试】-【RocketMQ测试】:{}", formattedDate);param.put("time", formattedDate);param.put("name", "Sliver");param.put("machine_no", machineNo);SendResult sendResult = rocketMQService.sendMessage("TestTopic", param);JSONObject returnJSONObject = JSONObject.parseObject(JSONObject.toJSONString(sendResult));return returnJSONObject;}

5.1.2 Service接口

    /*** 发送同步消息(这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。)* <p>* (send消息方法只要不抛异常,就代表发送成功。但不保证100%可靠投递(所有消息都一样,后面不在叙述)。* 要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。* 解析看:https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md* )** @param destination 主题名:标签 topicName:tags* @param msg         发送对象* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功*/SendResult sendMessage(String destination, Object msg);

5.1.3 Service接口实现类

    @Overridepublic SendResult sendMessage(String topicName, Object msg) {MessageBuilder<?> messageBuilder = MessageBuilder.withPayload(msg);Message<?> message = messageBuilder.build();SendResult sendResult = rocketMQTemplate.syncSend(topicName, message);if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {log.info("【RocketMQ测试】发送同步消息成功, topicName: {}, msg: {}, sendResult: {}", topicName, msg, sendResult);} else {log.warn("【RocketMQ测试】发送同步消息不一定成功, topicName: {}, msg: {}, sendResult: {}", topicName, msg, sendResult);}return sendResult;}

5.1.4 访问测试

{"traceOn": true,"regionId": "DefaultRegion","messageQueue": {"queueId": 1,"topic": "TestTopic","brokerName": "broker-a"},"msgId": "0ACC4A893DF836BAF30C6BBBD62D0000","queueOffset": 0,"sendStatus": "SEND_OK","offsetMsgId": "C0A864C800002A9F00000000000764BC","transactionId": "0ACC4A893DF836BAF30C6BBBD62D0000"
}

后台日志

2024-11-21T22:04:32.151+08:00  INFO 15864 --- [rocketmq-test] [nio-8002-exec-4] c.s.controller.RocketMQSyncController    : 【SpringBoot3测试】-【RocketMQ测试】:2024-11-21 22:04:32.151695800
2024-11-21T22:04:32.202+08:00  INFO 15864 --- [rocketmq-test] [nio-8002-exec-4] c.s.service.impl.RocketMQServiceImpl     : 【RocketMQ测试】发送同步消息成功, topicName: TestTopic, msg: {"machine_no":"1","name":"Sliver","time":"2024-11-21 22:04:32.151695800"}, sendResult: SendResult [sendStatus=SEND_OK, msgId=0ACC4A893DF836BAF30C6BBBD62D0000, offsetMsgId=C0A864C800002A9F00000000000764BC, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=1], queueOffset=0]
2024-11-21T22:04:32.205+08:00  INFO 15864 --- [rocketmq-test] [er-test-topic_1] c.s.consumer.RocketMQConsumer            : RocketMQ消费者接收时间:2024-11-21 22:04:32.205206100
2024-11-21T22:04:32.206+08:00  INFO 15864 --- [rocketmq-test] [er-test-topic_1] c.s.consumer.RocketMQConsumer            : RocketMQ消费者接收内容:{"machine_no":"1","name":"Sliver","time":"2024-11-21 22:04:32.151695800"}

5.2 同步带Tag消息测试

传入一个JSON,同步进入消息队列,消息队列同步消费,增加Tag

5.2.1 Controller层

    @Value("${machine-no}")private String machineNo;@Resourceprivate IRocketMQService rocketMQService;@RequestMapping("/syncWithTag")public JSONObject sendWithTag(@RequestBody JSONObject param) {DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");LocalDateTime localDateTime = LocalDateTime.now();String formattedDate = dateTimeFormatter.format(localDateTime);System.out.println(formattedDate);log.info("【SpringBoot3测试】-【RocketMQ测试】:{}", formattedDate);param.put("time", formattedDate);param.put("name", "Sliver");param.put("machine_no", machineNo);SendResult sendResult = rocketMQService.sendMessage("TestTopic", "DateTime", param);JSONObject returnJSONObject = JSONObject.parseObject(JSONObject.toJSONString(sendResult));return returnJSONObject;}

5.2.2 Service接口

    /*** 发送同步消息(这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。)** @param topicName 主题名 topicName* @param tags      标签 tags* @param msg       发送对象* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功*/SendResult sendMessage(String topicName, String tags, Object msg);

5.2.3 Service接口实现类

    @Overridepublic SendResult sendMessage(String topicName, String tags, Object msg) {MessageBuilder<?> messageBuilder = MessageBuilder.withPayload(msg);Message<?> message = messageBuilder.build();SendResult sendResult = rocketMQTemplate.syncSend(topicName + ":" + tags, message);if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {log.info("【RocketMQ测试】发送同步带Tag消息成功, topicName: {}, tag:{}, msg: {}, sendResult: {}", topicName, tags, msg, sendResult);} else {log.warn("【RocketMQ测试】发送同步带Tag消息不一定成功, topicName: {}, tag:{}, msg: {}, sendResult: {}", topicName, tags, msg, sendResult);}return sendResult;}

5.2.4 访问测试

{"traceOn": true,"regionId": "DefaultRegion","messageQueue": {"queueId": 7,"topic": "TestTopic","brokerName": "broker-a"},"msgId": "0ACC4A893D7836BAF30C7F9864680000","queueOffset": 0,"sendStatus": "SEND_OK","offsetMsgId": "C0A864C800002A9F0000000000077510","transactionId": "0ACC4A893D7836BAF30C7F9864680000"
}

后台日志

2024-11-25T18:38:13.585+08:00  INFO 15736 --- [rocketmq-test] [nio-8002-exec-2] c.s.controller.RocketMQSyncController    : 【SpringBoot3测试】-【RocketMQ测试】:2024-11-25 18:38:13.585133600
2024-11-25T18:38:13.617+08:00  INFO 15736 --- [rocketmq-test] [nio-8002-exec-2] c.s.service.impl.RocketMQServiceImpl     : 【RocketMQ测试】发送同步带Tag消息成功, topicName: TestTopic, tag:DateTime, msg: {"machine_no":"1","name":"Sliver","time":"2024-11-25 18:38:13.585133600"}, sendResult: SendResult [sendStatus=SEND_OK, msgId=0ACC4A893D7836BAF30C7F9864680000, offsetMsgId=C0A864C800002A9F0000000000077510, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=7], queueOffset=0]
2024-11-25T18:38:13.621+08:00  INFO 15736 --- [rocketmq-test] [er-test-topic_1] c.s.consumer.RocketMQConsumer            : RocketMQ消费者接收时间:2024-11-25 18:38:13.621071200
2024-11-25T18:38:13.621+08:00  INFO 15736 --- [rocketmq-test] [er-test-topic_1] c.s.consumer.RocketMQConsumer            : RocketMQ消费者接收内容:{"machine_no":"1","name":"Sliver","time":"2024-11-25 18:38:13.585133600"}

5.3 同步带Tag及Key消息测试

传入一个JSON,同步进入消息队列,消息队列同步消费,增加Tag及Key

5.3.1 Controller层

    @Value("${machine-no}")private String machineNo;@Resourceprivate IRocketMQService rocketMQService;@RequestMapping("/syncWithTagAndKey")public JSONObject sendWithTagAndKey(@RequestBody JSONObject param) {DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");LocalDateTime localDateTime = LocalDateTime.now();String formattedDate = dateTimeFormatter.format(localDateTime);System.out.println(formattedDate);log.info("【SpringBoot3测试】-【RocketMQ测试】:{}", formattedDate);param.put("time", formattedDate);param.put("name", "Sliver");param.put("machine_no", machineNo);SendResult sendResult = rocketMQService.sendMessage("TestTopic", "DateTime", formattedDate, param);JSONObject returnJSONObject = JSONObject.parseObject(JSONObject.toJSONString(sendResult));return returnJSONObject;}

5.3.2 Service接口

    /*** 发送同步消息(这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。)** @param topicName 主题名 topicName* @param tags      标签 tags* @param key       唯一标识码要设置到keys字段,方便将来定位消息丢失问题* @param msg       发送对象* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功*/SendResult sendMessage(String topicName, String tags, String key, Object msg);

5.3.3 Service接口实现类

    @Overridepublic SendResult sendMessage(String topicName, String tags, String key, Object msg) {MessageBuilder<?> messageBuilder = MessageBuilder.withPayload(msg);if (StringUtils.isNotBlank(key)) {messageBuilder.setHeader(MessageConst.PROPERTY_KEYS, key);}Message<?> message = messageBuilder.build();SendResult sendResult = rocketMQTemplate.syncSend(topicName + ":" + tags, message);if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {log.info("【RocketMQ测试】发送同步带Tag和Key消息成功, topicName: {}, tag:{}, msg: {}, sendResult: {}", topicName, tags, msg, sendResult);} else {log.warn("【RocketMQ测试】发送同步带Tag和Key消息不一定成功, topicName: {}, tag:{}, msg: {}, sendResult: {}", topicName, tags, msg, sendResult);}return sendResult;}

5.1.4 访问测试

{"traceOn": true,"regionId": "DefaultRegion","messageQueue": {"queueId": 5,"topic": "TestTopic","brokerName": "broker-a"},"msgId": "0ACC4A893D7836BAF30C7FA031620001","queueOffset": 0,"sendStatus": "SEND_OK","offsetMsgId": "C0A864C800002A9F000000000007769A","transactionId": "0ACC4A893D7836BAF30C7FA031620001"
}

后台日志

2024-11-25T18:46:44.833+08:00  INFO 15736 --- [rocketmq-test] [nio-8002-exec-5] c.s.controller.RocketMQSyncController    : 【SpringBoot3测试】-【RocketMQ测试】:2024-11-25 18:46:44.833604500
2024-11-25T18:46:44.837+08:00  INFO 15736 --- [rocketmq-test] [nio-8002-exec-5] c.s.service.impl.RocketMQServiceImpl     : 【RocketMQ测试】发送同步带Tag和Key消息成功, topicName: TestTopic, tag:DateTime, msg: {"machine_no":"1","name":"Sliver","time":"2024-11-25 18:46:44.833604500"}, sendResult: SendResult [sendStatus=SEND_OK, msgId=0ACC4A893D7836BAF30C7FA031620001, offsetMsgId=C0A864C800002A9F000000000007769A, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=5], queueOffset=0]
2024-11-25T18:46:44.838+08:00  INFO 15736 --- [rocketmq-test] [er-test-topic_2] c.s.consumer.RocketMQConsumer            : RocketMQ消费者接收时间:2024-11-25 18:46:44.838433100
2024-11-25T18:46:44.838+08:00  INFO 15736 --- [rocketmq-test] [er-test-topic_2] c.s.consumer.RocketMQConsumer            : RocketMQ消费者接收内容:{"machine_no":"1","name":"Sliver","time":"2024-11-25 18:46:44.833604500"}

5.4 延时消息测试

5.4.1 Controller层

    @Value("${machine-no}")private String machineNo;@Resourceprivate IRocketMQService rocketMQService;@RequestMapping("/delayLevel")public JSONObject sendDelayLevel(@RequestBody JSONObject param) {DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");LocalDateTime localDateTime = LocalDateTime.now();String formattedDate = dateTimeFormatter.format(localDateTime);System.out.println(formattedDate);log.info("【SpringBoot3测试】-【RocketMQ测试】:{}", formattedDate);param.put("time", formattedDate);param.put("name", "Sliver");param.put("machine_no", machineNo);SendResult sendResult = rocketMQService.sendDelayLevel("TestTopic:DateTime", param, 2);JSONObject returnJSONObject = JSONObject.parseObject(JSONObject.toJSONString(sendResult));return returnJSONObject;}

5.4.2 Service接口

    /*** 发送延时消息(超时时间,设置30s(默认3s))* 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h* 1  2  3   4   5  6  7  8  9  10 11 12 13 14  15  16  17 18** @param destination    主题名:标签 topicName:tags* @param msg            发送对象* @param delayTimeLevel 延时等级(从1开始)* @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功*/SendResult sendDelayLevel(String destination, Object msg, int delayTimeLevel);

5.4.3 Service接口实现类

    @Overridepublic SendResult sendDelayLevel(String destination, Object msg, int delayTimeLevel) {Message<?> message = MessageBuilder.withPayload(msg).build();SendResult sendResult = rocketMQTemplate.syncSend(destination, message, 10000L, delayTimeLevel);if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {log.info("【RocketMQ测试】发送延时消息成功, destination: {}, msg: {}, sendResult: {}", destination, message, sendResult);} else {log.warn("【RocketMQ测试】发送延时消息不一定成功, destination: {}, msg: {}, sendResult: {}", destination, message, sendResult);}return sendResult;}

5.4.4 访问测试

{"traceOn": true,"regionId": "DefaultRegion","messageQueue": {"queueId": 3,"topic": "TestTopic","brokerName": "broker-a"},"msgId": "0ACC4A893D7836BAF30C7FA617190002","queueOffset": 0,"sendStatus": "SEND_OK","offsetMsgId": "C0A864C800002A9F0000000000077847","transactionId": "0ACC4A893D7836BAF30C7FA617190002"
}

后台日志

2024-11-25T19:00:08.204+08:00  INFO 15736 --- [rocketmq-test] [nio-8002-exec-1] c.s.controller.RocketMQSyncController    : 【SpringBoot3测试】-【RocketMQ测试】:2024-11-25 19:00:08.204141800
2024-11-25T19:00:08.207+08:00  INFO 15736 --- [rocketmq-test] [nio-8002-exec-1] c.s.service.impl.RocketMQServiceImpl     : 【RocketMQ测试】发送延时消息成功, destination: TestTopic:DateTime, msg: GenericMessage [payload={"machine_no":"1","name":"Sliver","time":"2024-11-25 19:00:08.204141800"}, headers={id=e14fb691-ca84-8c3b-969f-2b1f87e9a136, timestamp=1732532408204}], sendResult: SendResult [sendStatus=SEND_OK, msgId=0ACC4A893D7836BAF30C7FAC738C0003, offsetMsgId=C0A864C800002A9F0000000000077BB5, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=3], queueOffset=1]
2024-11-25T19:00:13.297+08:00  INFO 15736 --- [rocketmq-test] [er-test-topic_4] c.s.consumer.RocketMQConsumer            : RocketMQ消费者接收时间:2024-11-25 19:00:13.297856500
2024-11-25T19:00:13.297+08:00  INFO 15736 --- [rocketmq-test] [er-test-topic_4] c.s.consumer.RocketMQConsumer            : RocketMQ消费者接收内容:{"machine_no":"1","name":"Sliver","time":"2024-11-25 19:00:08.204141800"}

5.5 基本请求响应模式消息测试

这种模式,如果消费者不返回响应信息,会抛出异常

5.5.1 Controller层

    @Value("${machine-no}")private String machineNo;@Resourceprivate IRocketMQService rocketMQService;@RequestMapping("/sendAndReceive")public JSONObject sendAndReceive(@RequestBody JSONObject param) {DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");LocalDateTime localDateTime = LocalDateTime.now();String formattedDate = dateTimeFormatter.format(localDateTime);System.out.println(formattedDate);log.info("【SpringBoot3测试】-【RocketMQ测试】:{}", formattedDate);param.put("time", formattedDate);param.put("name", "Sliver");param.put("machine_no", machineNo);String sendResult = rocketMQService.sendAndReceive("TestTopic:DateTime", param, String.class);JSONObject returnJSONObject = JSONObject.parseObject(sendResult);return returnJSONObject;}

5.5.2 Service接口

    /*** 请求响应模式消息** @param destination description* @param msg description* @param type description* @return java.lang.String*/String sendAndReceive(String destination, Object msg, Type type);

5.5.3 Service接口实现类

    public String sendAndReceive(String destination, Object msg, Type type) {Message<?> message = MessageBuilder.withPayload(msg).build();String sendResult = rocketMQTemplate.sendAndReceive(destination, message, type);return sendResult;}

5.5.4 响应模式的消费者类

package com.sliverbullet.consumer;import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;@Component
@Slf4j
@RocketMQMessageListener(topic = "TestTopic", consumerGroup = "my-consumer-test-topic", consumeTimeout = 1000L)
public class RocketMQReplyConsumer implements RocketMQReplyListener<String,String> {@Overridepublic String onMessage(String message) {DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");LocalDateTime localDateTime = LocalDateTime.now();String formattedDate = dateTimeFormatter.format(localDateTime);log.info("RocketMQ消费者接收时间:{}" ,formattedDate);log.info("RocketMQ消费者接收内容:{}" ,message);JSONObject result = new JSONObject();result.put("message","OK");return result.toJSONString();}
}

5.5.5 访问测试

后台日志

2024-11-26T11:00:19.973+08:00  INFO 4188 --- [rocketmq-test] [nio-8002-exec-2] c.s.controller.RocketMQSyncController    : 【SpringBoot3测试】-【RocketMQ测试】:2024-11-26 11:00:19.973858500
2024-11-26T11:00:20.010+08:00  INFO 4188 --- [rocketmq-test] [er-test-topic_1] c.s.consumer.RocketMQReplyConsumer       : RocketMQ消费者接收时间:2024-11-26 11:00:20.010355100
2024-11-26T11:00:20.010+08:00  INFO 4188 --- [rocketmq-test] [er-test-topic_1] c.s.consumer.RocketMQReplyConsumer       : RocketMQ消费者接收内容:{"machine_no":"1","name":"Sliver","time":"2024-11-26 11:00:19.973858500"}

5.6 请求响应模式设置响应时间消息测试

基本请求响应模式,自定义响应时间

其他和基本请求响应模式一样,增加响应时间,超过响应时间,会抛出异常

    @Overridepublic String sendAndReceive(String destination, Object msg, Type type) {Message<?> message = MessageBuilder.withPayload(msg).build();String sendResult = rocketMQTemplate.sendAndReceive(destination, message, type,3000);return sendResult;}

后台日志

2024-11-26T12:34:49.763+08:00  INFO 28728 --- [rocketmq-test] [nio-8002-exec-2] c.s.controller.RocketMQSyncController    : 【SpringBoot3测试】-【RocketMQ测试】:2024-11-26 12:34:49.763185400
2024-11-26T12:34:49.805+08:00  INFO 28728 --- [rocketmq-test] [er-test-topic_1] c.s.consumer.RocketMQReplyConsumer       : RocketMQ消费者接收时间:2024-11-26 12:34:49.804778100
2024-11-26T12:34:49.805+08:00  INFO 28728 --- [rocketmq-test] [er-test-topic_1] c.s.consumer.RocketMQReplyConsumer       : RocketMQ消费者接收内容:{"machine_no":"1","name":"Sliver","time":"2024-11-26 12:34:49.763185400"}

5.6 请求响应模式设置响应时间及延时消息测试

基本请求响应模式,自定义响应时间,以及延时

其他和基本请求响应模式一样,增加响应时间,以及延时,超过响应时间,会抛出异常

    @Overridepublic String sendAndReceive(String destination, Object msg, Type type) {Message<?> message = MessageBuilder.withPayload(msg).build();String sendResult = rocketMQTemplate.sendAndReceive(destination, message, type,6000,2);return sendResult;}

后台日志

2024-11-26T12:40:30.208+08:00  INFO 27856 --- [rocketmq-test] [nio-8002-exec-5] c.s.controller.RocketMQSyncController    : 【SpringBoot3测试】-【RocketMQ测试】:2024-11-26 12:40:30.208133000
2024-11-26T12:40:35.303+08:00  INFO 27856 --- [rocketmq-test] [er-test-topic_3] c.s.consumer.RocketMQReplyConsumer       : RocketMQ消费者接收时间:2024-11-26 12:40:35.303156700
2024-11-26T12:40:35.303+08:00  INFO 27856 --- [rocketmq-test] [er-test-topic_3] c.s.consumer.RocketMQReplyConsumer       : RocketMQ消费者接收内容:{"machine_no":"1","name":"Sliver","time":"2024-11-26 12:40:30.208133000"}

5.5 基本请求响应模式消息测试

5.5.1 Controller层

5.5.2 Service接口

5.5.3 Service接口实现类

5.5.4 访问测试

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词