欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > 深入解析 RocketMQ 中的 BrokerOuterAPI 组件​

深入解析 RocketMQ 中的 BrokerOuterAPI 组件​

2025/5/30 11:21:25 来源:https://blog.csdn.net/u013127325/article/details/147017125  浏览:    关键词:深入解析 RocketMQ 中的 BrokerOuterAPI 组件​

在 RocketMQ 这一高性能分布式消息队列系统中,BrokerOuterAPI 组件犹如一座桥梁,连接着 Broker 与外部世界,在系统的运行、管理以及与其他组件交互过程中发挥着极为关键的作用。本文将深入探讨 BrokerOuterAPI 组件的内部机制、核心功能以及其在实际应用场景中的价值。​

一、BrokerOuterAPI 组件概述​

BrokerOuterAPI 并非一个孤立的模块,而是一组封装了 Broker 对外提供服务接口的集合。它涵盖了与客户端(Producer、Consumer)、其他 Broker 以及 NameServer 等进行通信和交互的关键逻辑。通过这些接口,Broker 能够接收并处理各种请求,实现消息的发送、消费、存储管理以及集群状态同步等核心功能。

二、核心功能剖析​

1.主要属性信息

    /*** netty客户端的组件*/private final RemotingClient remotingClient;/*** 地址*/private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr());/*** nameServer地址*/private String nameSrvAddr = null;/*** 固定大小的线程池 4-10个*/private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
public class TopAddressing {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);/*** nameServer的地址*/private String nsAddr;/*** ws地址*/private String wsAddr;/*** 单元名称*/private String unitName;
}

2.核心方法

2.1fetchNameServerAddr

在 RocketMQ 中,BrokerOuterAPIfetchNameServerAddr方法在整个系统的架构和运行中扮演着不可或缺的角色。

方法功能

fetchNameServerAddr方法主要用于获取 NameServer 的地址信息。NameServer 在 RocketMQ 集群中承担着路由管理的重要职责,它维护着 Broker 的地址、主题与队列的映射关系等关键信息。Broker 需要与 NameServer 保持紧密通信,无论是注册自身信息、获取最新的路由数据,还是进行心跳检测等操作,都依赖于准确的 NameServer 地址。而fetchNameServerAddr方法正是为 Broker 提供了获取这些关键地址信息的途径。通过调用此方法,Broker 能够知晓 NameServer 的网络位置,进而建立起与 NameServer 之间的有效连接,确保后续各种交互操作得以顺利进行。

方法调用时机
  1. Broker 启动阶段:当 Broker 启动时,它首先需要知道 NameServer 的地址,以便能够向其注册自身信息并获取初始的路由数据。此时,Broker 会调用fetchNameServerAddr方法来获取 NameServer 的地址。例如,在一个新搭建的 RocketMQ 集群中,各个 Broker 节点在启动过程中,会通过此方法获取到预先配置或动态发现的 NameServer 地址,然后与 NameServer 建立连接,完成注册流程,使得自身能够被纳入到整个集群的管理体系中。

  2. 地址变更或重连场景:在 RocketMQ 集群运行过程中,可能会出现 NameServer 地址变更的情况,比如由于集群的扩容、网络架构调整等原因,NameServer 的地址发生了改变。或者当 Broker 与 NameServer 之间的连接因为网络故障等原因断开时,Broker 需要重新连接到 NameServer。在这些场景下,Broker 会再次调用fetchNameServerAddr方法,以获取最新有效的 NameServer 地址,从而重新建立连接,恢复与 NameServer 之间的通信,保证系统的正常运行。

代码

//获取NameServer的地址public String fetchNameServerAddr() {try {//获取到nameServer的地址String addrs = this.topAddressing.fetchNSAddr();if (addrs != null && !UtilAll.isBlank(addrs)) {if (!addrs.equals(this.nameSrvAddr)) {log.info("name server address changed, old: {} new: {}", this.nameSrvAddr, addrs);this.updateNameServerAddressList(addrs);this.nameSrvAddr = addrs;return nameSrvAddr;}}} catch (Exception e) {log.error("fetchNameServerAddr Exception", e);}return nameSrvAddr;}//更新nameServer的地址public void updateNameServerAddressList(final String addrs) {List<String> lst = new ArrayList<String>();String[] addrArray = addrs.split(";");for (String addr : addrArray) {lst.add(addr);}//针对remotingClient 更新nameServer的地址this.remotingClient.updateNameServerAddressList(lst);}

2.2 registerBrokerAll

在 RocketMQ 里,BrokerOuterAPI中的registerBrokerAll方法是实现 Broker 在集群中注册与信息同步的关键。下面我将详细为你介绍它的功能、工作流程以及在集群管理中的重要性。

方法功能

registerBrokerAll方法主要用于 Broker 向 NameServer 注册自身信息,并且会同步一些关键的配置与状态数据,以确保 NameServer 掌握整个集群的最新布局与各 Broker 详细信息。这个方法的执行,让 Producer 和 Consumer 能够通过 NameServer 获取到准确的 Broker 地址与相关属性,从而实现消息的发送与消费。

工作流程
  1. 构建注册请求数据:当 Broker 启动或者检测到自身状态有重大变化(如新增或移除消息队列等)时,会调用registerBrokerAll方法。方法内部首先会收集一系列要注册的信息,包括 Broker 的唯一标识符(brokerId)、所属集群名称(clusterName)、Broker 地址(brokerAddr)、Master Broker 地址(如果当前 Broker 是 Slave 角色)、Broker 所支持的消息类型、当前 Broker 存储的消息队列信息(包括每个主题下的队列数量与分布情况)以及 Broker 的配置参数等。

  2. 向 NameServer 发送请求:将上述构建好的注册信息封装成网络请求,通过与 NameServer 建立的网络连接,发送到 NameServer 集群中的各个节点。RocketMQ 中的 NameServer 通常以集群形式部署,以保证高可用性和负载均衡,所以registerBrokerAll方法会确保注册信息同步到所有 NameServer 节点。

  3. NameServer 处理注册请求:NameServer 接收到注册请求后,会进行一系列处理。它会检查注册信息的完整性与合法性,比如验证brokerId是否唯一、clusterName是否存在等。若信息合法,NameServer 会将 Broker 的信息更新到其内部维护的路由表中。这个路由表记录了集群中所有 Broker 的详细信息,包括它们的地址、所属集群、负责的主题与队列等,是 Producer 和 Consumer 进行消息路由的重要依据。

  4. 返回注册结果:NameServer 处理完注册请求后,会向 Broker 返回注册结果。如果注册成功,Broker 会收到确认信息,表明其已经成功在 NameServer 中注册,并且后续可以正常接收来自 Producer 和 Consumer 的请求;若注册失败,NameServer 会返回失败原因,Broker 可能需要根据错误信息进行相应调整后重新尝试注册。

代码:

 /***  在这里 broker通过netty客户端组件进行向NameSever组件发起注册请求* @param clusterName 集群名称* @param brokerAddr broker地址* @param brokerName broker名字* @param brokerId brokerid* @param haServerAddr 高可用地址* @param topicConfigWrapper topic的元数据* @param filterServerList 过滤服务器* @param oneway 是否oneway请求* @param timeoutMills 超时时间* @param compressed 是否启用压缩* @return*/public List<RegisterBrokerResult> registerBrokerAll(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final boolean oneway,final int timeoutMills,final boolean compressed) {//初始化了一个List集合 用来存放Broker的注册结果的返回值final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();//获取nameServerAddressList的集合List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();if (nameServerAddressList != null && nameServerAddressList.size() > 0) {//构建注册请求的请求头信息final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();requestHeader.setBrokerAddr(brokerAddr);requestHeader.setBrokerId(brokerId);requestHeader.setBrokerName(brokerName);requestHeader.setClusterName(clusterName);requestHeader.setHaServerAddr(haServerAddr);requestHeader.setCompressed(compressed);//构建注册请求的请求体信息RegisterBrokerBody requestBody = new RegisterBrokerBody();requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);requestBody.setFilterServerList(filterServerList);final byte[] body = requestBody.encode(compressed);final int bodyCrc32 = UtilAll.crc32(body);requestHeader.setBodyCrc32(bodyCrc32);//这块进行搞了一个CountDownLatch 只有向所有的NameServer注册完成之后才能继续执行final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());for (final String namesrvAddr : nameServerAddressList) {brokerOuterExecutor.execute(() -> {try {//真正的执行注册的操作RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);if (result != null) {registerBrokerResultList.add(result);}log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);} catch (Exception e) {log.warn("registerBroker Exception, {}", namesrvAddr, e);} finally {countDownLatch.countDown();}});}try {countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {}}return registerBrokerResultList;}private RegisterBrokerResult registerBroker(final String namesrvAddr,final boolean oneway,final int timeoutMills,final RegisterBrokerRequestHeader requestHeader,final byte[] body) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,InterruptedException {//下面两行代码将请求头和请求体封装到RemotingCommand中RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);request.setBody(body);//这个oneway为true的时候就是单向发送请求,不需要等待响应 属于一种特殊情况if (oneway) {try {this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);} catch (RemotingTooMuchRequestException e) {// Ignore}return null;}// 真正执行发送请求的代码RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {RegisterBrokerResponseHeader responseHeader =(RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);RegisterBrokerResult result = new RegisterBrokerResult();result.setMasterAddr(responseHeader.getMasterAddr());result.setHaServerAddr(responseHeader.getHaServerAddr());if (response.getBody() != null) {result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));}return result;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());}

2.3unregisterBrokerAll

在 RocketMQ 的 BrokerOuterAPI 组件里,unregisterBrokerAll方法在集群管理方面扮演着特殊且重要的角色。它主要用于从 NameServer 中注销特定 Broker 的所有相关信息,通常在 Broker 节点需要从集群中彻底移除或进行重大变更时被调用。

方法功能
  1. 信息移除unregisterBrokerAll方法会向 NameServer 发起请求,将该 Broker 在 NameServer 维护的路由表中所有与之相关的记录删除。这包括 Broker 的地址、所属集群名称、Broker 角色(主节点 Master 或从节点 Slave),以及该 Broker 所负责的主题与队列等信息。NameServer 依靠这些信息来引导生产者(Producer)和消费者(Consumer)与正确的 Broker 节点进行通信,当 Broker 通过此方法注销后,NameServer 会更新内部数据结构,使得其他组件不再能通过 NameServer 找到该 Broker 的相关信息。

  2. 关联资源清理:在 Broker 自身内部,该方法会触发一系列关联资源的清理操作。例如,它会关闭与其他 Broker 节点用于数据同步的网络连接,停止对消息存储相关资源的维护(如关闭一些文件句柄、释放缓存资源等),因为该 Broker 即将不再参与集群的数据处理和存储工作。同时,Broker 也会清理本地维护的与其他组件(如 Producer、Consumer)交互的会话信息等。

方法调用时机
  1. Broker 正常下线:当运维人员计划对某个 Broker 节点进行硬件升级、软件版本更新等操作,需要将该 Broker 从集群中暂时移除时,会调用unregisterBrokerAll方法。在操作完成且确保 Broker 符合上线条件后,再通过registerBroker方法重新注册到集群中。这样可以保证在 Broker 下线期间,不会有新的请求被路由到该节点,避免出现服务中断或数据不一致问题。

  2. Broker 故障处理:如果某个 Broker 节点出现严重故障,无法正常提供服务,并且短时间内难以修复,为了保障整个集群的稳定性和可用性,会立即调用unregisterBrokerAll方法将其从集群中注销。与此同时,集群中的其他 Broker 节点会根据配置和相关机制,接管故障 Broker 原本负责的部分工作,例如从节点(Slave)可能会切换为主节点(Master),继续提供消息存储和服务。

代码:

//broker的下线请求public void unregisterBrokerAll(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId) {List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();if (nameServerAddressList != null) {for (String namesrvAddr : nameServerAddressList) {try {this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);} catch (Exception e) {log.warn("unregisterBroker Exception, {}", namesrvAddr, e);}}}}public void unregisterBroker(final String namesrvAddr,final String clusterName,final String brokerAddr,final String brokerName,final long brokerId) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();requestHeader.setBrokerAddr(brokerAddr);requestHeader.setBrokerId(brokerId);requestHeader.setBrokerName(brokerName);requestHeader.setClusterName(clusterName);RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_BROKER, requestHeader);RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {return;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);}

三、总结与展望​

BrokerOuterAPI 组件作为 RocketMQ 中 Broker 对外交互的窗口,承载了消息发送、消费以及集群管理等核心功能,是 RocketMQ 能够高效、可靠运行的重要基石。深入理解 BrokerOuterAPI 的内部机制和功能,有助于开发者更好地优化 RocketMQ 的应用,提升分布式系统的性能和稳定性。随着分布式技术的不断发展和应用场景的日益复杂,相信 BrokerOuterAPI 组件也将持续演进,为 RocketMQ 在更多领域的广泛应用提供有力支撑。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词