Maven依赖
<properties><zeromq.version>0.5.2</zeromq.version>
</properties><!-- zeromq-->
<dependency><groupId>org.zeromq</groupId><artifactId>jeromq</artifactId><version>${zeromq.version}</version>
</dependency>
Yml配置
# zeromq配置
zeromq:# 过滤开关server:enabled: trueclient:enabled: trueserverHost: localhostserverPort: 5555
单客户端(Socket)非并发请求配置
package com.xxx.web.core.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;@Slf4j
@Configuration
public class ZeromqConfig {@Value("${zeromq.client.enabled:true}")private Boolean enabledClient;@Value("${zeromq.server.enabled:true}")private Boolean enabledServer;@Value("${zeromq.serverHost:localhost}")private String serverHost;@Value("${zeromq.serverPort:5555}")private Integer serverPort;@Bean(name = "zmqReqSocket")public ZMQ.Socket zmqReqSocket() {ZMQ.Socket socket = null;if (enabledClient) {ZMQ.Context context = ZMQ.context(1);socket = context.socket(SocketType.REQ);String uri = "tcp://" + serverHost + ":"+ serverPort;socket.connect(uri);log.info("zeromq connect to " + uri + " success");}return socket;}@Bean(name = "zmqRepSocket")public ZMQ.Socket zmqRepSocket() {ZMQ.Socket socket = null;if (enabledServer) {ZMQ.Context context = ZMQ.context(1);socket = context.socket(SocketType.REP);String uri = "tcp://" + serverHost + ":" + serverPort;socket.bind(uri);log.info("zeromq bind to " + uri + " success");}return socket;}
}
客户端
package com.xxx.web.service;import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.zeromq.ZMQ;import javax.annotation.Resource;@Slf4j
@Service
public class ZmqService {@Resourceprivate ZMQ.Socket zmqReqSocket;public String sendRequest(String request){try {// 发送请求zmqReqSocket.send(request.getBytes(), 0);// 接收响应byte[] response = zmqReqSocket.recv(0);return new String(response);}catch (Exception e){log.warn("zmq sendRequest error, e:{}", e);}return StringUtils.EMPTY;}
}
服务端
package com.xxx.runner;import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.zeromq.ZMQ;import javax.annotation.Resource;/*** @author xxx.xxx* @description ZmqRepServerRunner 测试ZMQ使用,后续需要删除或者注释掉Component注解* @date 2024/10/17 15:42*/
@Component
@Order(1)
@Slf4j
public class ZmqRepServerRunner implements ApplicationRunner {@Resourceprivate ZMQ.Socket zmqRepSocket;@Overridepublic void run(ApplicationArguments args) throws Exception {while (true) {// 等待接收消息byte[] request = zmqRepSocket.recv(0);String text = new String(request);log.info("接收到消息: " + text);// 返回消息Thread.sleep(20*1000); //延迟20sbyte[] reply = ("你好,我是服务端。 原输入:"+text).getBytes();zmqRepSocket.send(reply, 0);}}
}
特性 / 问题
此种注入方式,只允许客户端非并发形式请求Zmq服务。
比如:
服务端设置了20s的延迟,模拟服务请求需要20s才能处理完成并回复响应。当发起第一个请求之后的20s内,如果有业务逻辑再调用ZmqService的sendRequest方法请求Zmq服务,则会报错,因为注入的是同一个Socket,而此Socket上一个请求还未完成。
多客户端并发请求配置
package com.xxx.web.core.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;import javax.annotation.PostConstruct;/*** @author xxx.xxx* @description Zeromq多实例配置* @date 2024/10/18 11:02*/
@Slf4j
@Configuration
public class ZeromqMultConfig {@Value("${zeromq.client.enabled:true}")private Boolean enabledClient;@Value("${zeromq.server.enabled:true}")private Boolean enabledServer;@Value("${zeromq.serverHost:localhost}")private String serverHost;@Value("${zeromq.serverPort:5555}")private Integer serverPort;private ZMQ.Context zmqContext;@PostConstructpublic void init() {// 初始化ZeroMQ上下文,这通常在整个应用程序生命周期内是唯一的zmqContext = ZMQ.context(1);}@Bean(name = "zmqReqSocketFactory")public ZMQSocketFactory zmqReqSocketFactory() {return new ZMQSocketFactory(zmqContext, enabledClient, serverHost, serverPort, SocketType.REQ);}@Bean(name = "zmqRepSocketFactory")public ZMQSocketFactory zmqRepSocketFactory() {return new ZMQSocketFactory(zmqContext, enabledServer, serverHost, serverPort, SocketType.REP);}// 工厂类,用于根据需要创建ZMQ.Socket实例public class ZMQSocketFactory {private final ZMQ.Context context;private final boolean enabled;private final String serverHost;private final int serverPort;private final SocketType socketType;public ZMQSocketFactory(ZMQ.Context context, boolean enabled, String serverHost, int serverPort, SocketType socketType) {this.context = context;this.enabled = enabled;this.serverHost = serverHost;this.serverPort = serverPort;this.socketType = socketType;}public ZMQ.Socket createSocket() {if (!enabled) {return null;}ZMQ.Socket socket = context.socket(socketType);String uri = "tcp://" + serverHost + ":" + serverPort;if (socketType == SocketType.REQ) {socket.connect(uri);log.info("ZeroMQ client connected to {}", uri);} else if (socketType == SocketType.REP) {socket.bind(uri);log.info("ZeroMQ server bound to {}", uri);}return socket;}}
}
客户端
package com.xxx.web.service;import com.fdbatt.web.core.config.ZeromqMultConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.zeromq.ZMQ;import javax.annotation.Resource;@Slf4j
@Service
public class ZmqService {@Resourceprivate ZeromqMultConfig.ZMQSocketFactory zmqReqSocketFactory;public String sendRequest(String request){ZMQ.Socket zmqReqSocket = null;try {zmqReqSocket = zmqReqSocketFactory.createSocket();// 发送请求zmqReqSocket.send(request.getBytes(), 0);// 接收响应byte[] response = zmqReqSocket.recv(0);return new String(response);}catch (Exception e){log.warn("zmq sendRequest error, e:{}", e);}finally {if(null != zmqReqSocket){zmqReqSocket.close();}}return StringUtils.EMPTY;}
}
服务端
跟前面第一种情况一样
package com.xxx.runner;import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.zeromq.ZMQ;import javax.annotation.Resource;/*** @author xxx.xxx* @description ZmqRepServerRunner 测试ZMQ使用,后续需要删除或者注释掉Component注解* @date 2024/10/17 15:42*/
@Component
@Order(1)
@Slf4j
public class ZmqRepServerRunner implements ApplicationRunner {@Resourceprivate ZMQ.Socket zmqRepSocket;@Overridepublic void run(ApplicationArguments args) throws Exception {while (true) {// 等待接收消息byte[] request = zmqRepSocket.recv(0);String text = new String(request);log.info("接收到消息: " + text);// 返回消息Thread.sleep(20*1000); //延迟20sbyte[] reply = ("你好,我是服务端。 原输入:"+text).getBytes();zmqRepSocket.send(reply, 0);}}
}
特性 / 问题
此种注入方式,允许多客户端并发形式请求Zmq服务,并不会出现单客户端的报错问题。但由于多个客户端连接的是同一个服务,且服务端类型为REP,即同步阻塞的。因此虽然在前一个请求逻辑未完成时,再请求,不会再出现报错问题;但依然需要等待前一个请求完成后,方会执行当前的请求。
比如:
服务端设置了20s的延迟,模拟服务请求需要20s才能处理完成并回复响应。当发起第一个请求之后的第10s,业务逻辑再调用ZmqService的sendRequest方法请求Zmq服务发起第二个请求。之后再过10s,第一个请求已经经过了20s,就会返回响应结果。而第二个请求,此时才会开始处理,也就是在第一个20s后,需再经过20s。
也就是如下经过:
第一个请求触发 第0s
第二个请求触发 第10s
第一个请求响应 第20s
第二个请求响应 第40s
第二个请求,从触发到响应,经过了30s,而服务端只是20s延迟,是因为有10s是在等待服务器处理上一个请求。