欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 产业 > Rockermq的部署与使用(0-1)

Rockermq的部署与使用(0-1)

2025/5/5 18:06:49 来源:https://blog.csdn.net/2302_79840586/article/details/147703242  浏览:    关键词:Rockermq的部署与使用(0-1)

RocketMQ​ 是阿里巴巴开源的一款 ​分布式消息中间件,具有高吞吐、低延迟、高可用等特点,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。

一.RockerMQ的概念: 

RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。Rockermq官网链接地址

1. 架构分析:

  • 生产者(Producer):负责产生消息,生产者向消息服务器发送由业务应用程序系统生成的消息。
  • 消费者(Consumer):负责消费消息,消费者从消息服务器拉取信息并将其输入用户应用程序。
  • 消息服务器(Broker):是消息存储中心,主要作用是接收来自 Producer 的消息并存储, Consumer 从这里取得消息。
  • 名称服务器(NameServer):用来保存 Broker 相关 Topic 等元信息并给 Producer ,提供 Consumer 查找 Broker 信息。

2.整体流程:

1、首先启动 Namesrv(名称服务器,Namesrv 起来后监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。

2、Broker 启动,跟所有的 Namesrv 保持长连接,定时发送心跳包

心跳包中,包含当前 Broker 信息(IP+端口等)以及存储所有 Topic 信息。 注册成功后,Namesrv 集群中就有 Topic 跟 Broker 的映射关系。

3、收发消息前,先创建 Topic 。创建 Topic 时,需要指定该 Topic 要存储在哪些 Broker上。也可以在发送消息时自动创建Topic。

4、Producer 发送消息。

启动时,先跟 Namesrv 集群中的其中一台建立长连接,并从Namesrv 中获取当前发送的 Topic 存在哪些 Broker 上,然后跟对应的 Broker 建立长连接,直接向 Broker 发消息。

5、Consumer 消费消息。

Consumer 跟 Producer 类似。跟其中一台 Namesrv 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。

二.Rockermq单机部署:

1.安装解压包:

rocketmq-all-4.6.0-source-release.zip

点击横杠即可下载安装包。

2.Windows部署 / Linux部署:

(1)Windows部署: 

解压后使用 Maven 编译 RocketMQ 源码。命令行操作如下: 

# 进入 RocketMQ 源码目录
$ cd rocketmq-all-4.6.0-source-release# Maven 编译 RocketMQ ,并跳过测试。耐心等待...
$ mvn -Prelease-all -DskipTests clean install -U

随后进入.\bin\mqnamesrv.cmd来启动Namesrv,如果报错 Java not found,请检查 JAVA_HOME 环境变量。

.\mqnamesrv.cmd

正常会出现下图: 

随后新建一个CMD窗口(不要关闭 NameServer),进入 bin 目录,启动Broker:

mqbroker.cmd -c ..\conf\broker.conf -n localhost:9876

(2)Linux部署:

使用wget命令下载,随后使用unzip命令解压: 

# 下载
$ wget wget http://mirror.bit.edu.cn/apache/rocketmq/4.6.0/rocketmq-all-4.6.0-source-release.zip# 解压
$ unzip rocketmq-all-4.6.0-source-release.zip

随后同样使用 Maven 编译 RocketMQ 源码。命令行操作如下:

# 进入 RocketMQ 源码目录
$ cd rocketmq-all-4.6.0-source-release# Maven 编译 RocketMQ ,并跳过测试。耐心等待...
$ mvn -Prelease-all -DskipTests clean install -U

编译完成,在我们进入 distribution 目录下,就可以看到 RocketMQ 的发布包了。命令行操作如下:

# 进入 distribution 目录下
$ cd distribution/target/rocketmq-4.6.0/rocketmq-4.6.0# 打印目录
$ ls
40 -rwxr-xr-x   1 yunai  staff  17336 Nov 19 20:59 LICENSE8 -rwxr-xr-x   1 yunai  staff   1338 Nov 19 20:59 NOTICE
16 -rwxr-xr-x   1 yunai  staff   4225 Nov 19 20:59 README.md0 drwxr-xr-x   6 yunai  staff    192 Dec  3 12:48 benchmark # 性能基准测试0 drwxr-xr-x  30 yunai  staff    960 Nov 19 20:59 bin # 执行脚本0 drwxr-xr-x  12 yunai  staff    384 Nov 19 20:59 conf # 配置文件0 drwxr-xr-x  36 yunai  staff   1152 Dec  3 12:48 lib # RocketMQ jar 包

随后启动Namesrv:

nohup sh bin/mqnamesrv &

启动完成后,查看日志:

# 查看 Namesrv 日志。
$ tail -f ~/logs/rocketmqlogs/namesrv.log2019-12-03 12:58:04 INFO main - The Name Server boot success. serializeType=JSON

默认情况下,Namesrv 日志文件所在地址为 ~/logs/rocketmqlogs/namesrv.log 。如果想要自定义,可以通过 conf/logback_namesrv.xml 配置文件来进行修改。

 随后启动broker:

在 conf 目录下,RocketMQ 提供了多种 Broker 的配置文件:

  • broker.conf :单主,异步刷盘。
  • 2m/ :双主,异步刷盘。
  • 2m-2s-async/ :两主两从,异步复制,异步刷盘。
  • 2m-2s-sync/ :两主两从,同步复制,异步刷盘。
  • dledger/ :Dledger 集群,至少三节点。

这里,我们只启动一个 RocketMQ Broker 服务,所以使用 broker.conf 配置文件。命令行操作如下:

nohup sh bin/mqbroker -c conf/broker.conf  -n 127.0.0.1:9876 &
  • 通过 -c 参数,配置读取的主 Broker 配置。

  • 通过 -n 参数,设置 RocketMQ Namesrv 地址。

 3.测试消息:

(1)测试发送消息: 

通过使用 bin/tools.sh 工具类,实现测试发送消息。命令行操作如下:

# 设置 Namesrv 服务器的地址
export NAMESRV_ADDR=127.0.0.1:9876# 执行生产者 Producer 发送测试消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

如果发送成功,我们会看到大量成功的发送日志。

SendResult [sendStatus=SEND_OK, msgId=FE800000000000004F2B5386138462F500000D7163610D67E7F100F4, offsetMsgId=C0A8032C00002A9F000000000000D7EE, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=61]
SendResult [sendStatus=SEND_OK, msgId=FE800000000000004F2B5386138462F500000D7163610D67E7F200F5, offsetMsgId=C0A8032C00002A9F000000000000D8D1, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=61]

通过发送结果为 sendStatus=SEND_OK 状态,说明消息都发送成功了。

(2)测试消费消息:

通过使用 bin/tools.sh 工具类,实现测试消费消息。命令行操作如下:

# 设置 Namesrv 服务器的地址
export NAMESRV_ADDR=127.0.0.1:9876# 执行消费者 Consumer 消费测试消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

如果消费成功,我们会看到大量成功的消费日志。

ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=2, storeSize=227, queueOffset=131, sysFlag=0, bornTimestamp=1575354513732, bornHost=/192.168.3.44:55510, storeTimestamp=1575354513733, storeHost=/192.168.3.44:10911, msgId=C0A8032C00002A9F000000000001D1FC, commitLogOffset=119292, bodyCRC=1549304357, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=145, CONSUME_START_TIME=1575354867104, UNIQ_KEY=FE800000000000004F2B5386138462F500000D7163610D67E944020E, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53, 50, 54], transactionId='null'}]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=2, storeSize=227, queueOffset=130, sysFlag=0, bornTimestamp=1575354513729, bornHost=/192.168.3.44:55510, storeTimestamp=1575354513729, storeHost=/192.168.3.44:10911, msgId=C0A8032C00002A9F000000000001CE70, commitLogOffset=118384, bodyCRC=1530218044, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=145, CONSUME_START_TIME=1575354867103, UNIQ_KEY=FE800000000000004F2B5386138462F500000D7163610D67E941020A, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53, 50, 50], transactionId='null'}]]

通过 ConsumeMessageThread_4 和 ConsumeMessageThread_3 线程名,我们可以看出,目前是进行并发消费消息。

三.Rockermq的简单使用实例:

(1)项目结构:

rocketmq-demo/
├── src/
│   ├── main/
│   │   ├── java/
│   │   │   └── com/example/
│   │   │       ├── producer/OrderProducer.java   # 生产者
│   │   │       ├── consumer/OrderConsumer.java  # 消费者
│   │   │       └── RocketmqDemoApplication.java # 启动类
│   │   └── resources/
│   │       └── application.yml                  # 配置文件
│   └── test/
└── pom.xml                                      # Maven依赖

(2)添加依赖:

<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- RocketMQ Starter --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version></dependency>
</dependencies>

(3)编写配置文件:

rocketmq:name-server: localhost:9876  # NameServer地址producer:group: order-producer-group  # 生产者组consumer:group: order-consumer-group  # 消费者组topic: order-topic           # 订阅的Topic

(4)编写生产者代码:

package com.example.producer;import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class OrderProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendOrderMessage(String orderId) {// 发送消息(Topic: order-topic, Tag: pay)rocketMQTemplate.convertAndSend("order-topic:pay", "订单支付成功,订单ID: " + orderId);System.out.println("生产者发送消息: " + orderId);}
}

(5)编写消费者代码:

package com.example.consumer;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Component
@RocketMQMessageListener(topic = "order-topic",          // 监听的TopicselectorExpression = "pay",     // 过滤Tag=pay的消息consumerGroup = "order-consumer-group"
)
public class OrderConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("消费者收到消息: " + message);}
}

(6)启动类:

package com.example;import com.example.producer.OrderProducer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;@SpringBootApplication
public class RocketmqDemoApplication {public static void main(String[] args) {ConfigurableApplicationContext context = SpringApplication.run(RocketmqDemoApplication.class, args);// 测试发送消息OrderProducer producer = context.getBean(OrderProducer.class);producer.sendOrderMessage("1001");  // 发送订单ID=1001}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词