欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 教育 > 幼教 > springboot简单集成t-io框架实现即时通讯

springboot简单集成t-io框架实现即时通讯

2025/9/25 2:04:08 来源:https://blog.csdn.net/qq_38227017/article/details/144734302  浏览:    关键词:springboot简单集成t-io框架实现即时通讯

springboot简单集成t-io框架实现即时通讯

  • 1、pom.xml添加相关依赖
    • 2、配置文件添加配置(application.yml)
    • 3、启动类添加注解
    • 4、添加消息接收处理类,包括连接握手,消息发送都在这里面
    • 5、实现WsServerAioListener类,自定义各类消息的前置或后续处理
    • 6、继承IpStatListener类,实现ip监控,实现相关业务
    • 7、启动项目并进行连接
    • 8、客户端如何发送消息
    • 9、最后啰嗦,问题探讨
      • 1、手动注册一个服务到nacos
      • 2、直接通过网关配置真实地址做负载,不走服务名
  • 结束

话不多说,直接上代码吧

先奉上官网地址文档:https://www.tiocloud.com/doc/tio/125?pageNumber=1
开源源码地址:https://gitee.com/tywo45/t-io

1、pom.xml添加相关依赖

<dependency><groupId>org.t-io</groupId><artifactId>tio-websocket-spring-boot-starter</artifactId><version>3.6.0.v20200315-RELEASE</version>
</dependency>

版本我用的是3.6.0.v20200315-RELEASE这个,目前(2024年12月26日09:40:40)最高的版本(3.8.6.v20240801-RELEASE)好像需要jdk17以上了,不想升级jdk。

2、配置文件添加配置(application.yml)

tio:websocket:server:port: 9876heartbeat-timeout: 60000#是否支持集群,集群开启需要rediscluster:enabled: falseredis:ip: 127.0.0.1port: 6379password: 123456# all: true# group: true# user: true# ip: true# channel: true

t-io是可以支持集群的,但是他很强大,一般中小型的项目单体就够了,喜欢集群就开启集群,配置redis,启动多个实例,做负载均衡就可以了。这里主要介绍单例模式。好了继续。。。

3、启动类添加注解

@EnableTioWebSocketServer

就是我们springboot的启动类,给你们看一个完整的吧

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.tio.websocket.starter.EnableTioWebSocketServer;@SpringBootApplication
@EnableTioWebSocketServer
public class WebsocketApplication {public static void main(String[] args) {SpringApplication.run(WebsocketApplication.class, args);}
}

到这里springboot简单集成t-io就已经完成了,启动项目就可以用了,简单吧。
但是怎么用呢?接下来就看我们的业务如何去使用

4、添加消息接收处理类,包括连接握手,消息发送都在这里面

import cn.hutool.core.util.StrUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tio.common.starter.annotation.TioServerMsgHandler;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.websocket.common.WsRequest;
import org.tio.websocket.common.WsResponse;
import org.tio.websocket.common.WsSessionContext;
import org.tio.websocket.server.handler.IWsMsgHandler;import java.util.Map;
import java.util.Objects;/*** @author tanyaowu* 2017年6月28日 下午5:32:38*/
@TioServerMsgHandler
@Component
public class ShowcaseWsMsgHandler implements IWsMsgHandler {private static Logger log = LoggerFactory.getLogger(ShowcaseWsMsgHandler.class);/*** 握手时走这个方法,业务可以在这里获取cookie,request参数等*/@Overridepublic HttpResponse handshake(HttpRequest request, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {String clientip = request.getClientIp();String groupId = request.getParam("groupId");String userId = request.getParam("userId");Tio.bindUser(channelContext, userId);log.info("收到来自{}的ws握手包\r\n{}", clientip, request.toString());// 这里是用户发起连接的时候首先触发的方法,其实这里也可以去绑定用户,绑定组之类的业务// 但是这里只是初始连接,不一定连接得上,所有我们在下面这个连接之后的方法去做绑定业务,具体// 看自己的需求return httpResponse;}/*** @param httpRequest* @param httpResponse* @param channelContext* @throws Exception* @author tanyaowu*/@Overridepublic void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {Map<String, Object[]> params = httpRequest.getParams();String groupId = httpRequest.getParam("groupId");String userId = httpRequest.getParam("userId");//绑定到群组,后面会有群发Tio.bindGroup(channelContext, groupId);// 这里是连接成功之后触发的方法,在这里可以去绑定用户id,绑定群id等等// 它是支持一人绑定多个群的,这里可以查询个人的所有群进行绑定,后续可以发送消息log.info(msg);}/*** 字节消息(binaryType = arraybuffer)过来后会走这个方法*/@Overridepublic Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {return null;}/*** 当客户端发close flag时,会走这个方法*/@Overridepublic Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {// 这里是将断开的人移除Tio.remove(channelContext, "receive close flag");String userId = channelContext.userid;// 这里是有人断开连接时触发的方法,可以做相关的业务处理return null;}/** 字符消息(binaryType = blob)过来后会走这个方法*/@Overridepublic Object onText(WsRequest wsRequest, String text, ChannelContext channelContext) throws Exception {WsSessionContext wsSessionContext = (WsSessionContext) channelContext.get();HttpRequest httpRequest = wsSessionContext.getHandshakeRequest();//获取websocket握手包if (log.isDebugEnabled()) {log.debug("握手包:{}", httpRequest);}//		log.info("收到ws消息:{}", text);String userId = channelContext.userid;// 这里收到消息后去做业务处理return null;}}

到这里,集成才算是真正的完成,很简单,就是添加依赖,添加配置,添加业务处理类,做自己的业务处理。没了,简单的集成就完成了。
但是有些同学可能需要更深入的业务处理。
比如说:比如说我消息处理完之后我要做一些后续的业务,例如:聊天记录的处理,日志等等。
或者考虑到安全问题,有恶意连接的ip怎么处理?是否可以加入很名单等等。
如果有需要就加下面这个两个处理类。

5、实现WsServerAioListener类,自定义各类消息的前置或后续处理


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.intf.Packet;
import org.tio.websocket.common.WsResponse;
import org.tio.websocket.common.WsSessionContext;
import org.tio.websocket.server.WsServerAioListener;/*** @author tanyaowu* 用户根据情况来完成该类的实现*/
@Component
public class ShowcaseTioServerListener extends WsServerAioListener {private static Logger log = LoggerFactory.getLogger(ShowcaseTioServerListener.class);@Overridepublic void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) throws Exception {super.onAfterConnected(channelContext, isConnected, isReconnect);
//		if (log.isInfoEnabled()) {
//			log.info("onAfterConnected\r\n{}", channelContext);
//		}}@Overridepublic void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) throws Exception {super.onAfterSent(channelContext, packet, isSentSuccess);
//		if (log.isInfoEnabled()) {
//			log.info("onAfterSent\r\n{}\r\n{}", packet.logstr(), channelContext);
//		}}@Overridepublic void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception {super.onBeforeClose(channelContext, throwable, remark, isRemove);if (log.isInfoEnabled()) {log.info("onBeforeClose\r\n{}", channelContext);}WsSessionContext wsSessionContext = (WsSessionContext) channelContext.get();}@Overridepublic void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize) throws Exception {super.onAfterDecoded(channelContext, packet, packetSize);
//		if (log.isInfoEnabled()) {
//			log.info("onAfterDecoded\r\n{}\r\n{}", packet.logstr(), channelContext);
//		}}@Overridepublic void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes) throws Exception {super.onAfterReceivedBytes(channelContext, receivedBytes);
//		if (log.isInfoEnabled()) {
//			log.info("onAfterReceivedBytes\r\n{}", channelContext);
//		}}@Overridepublic void onAfterHandled(ChannelContext channelContext, Packet packet, long cost) throws Exception {super.onAfterHandled(channelContext, packet, cost);
//		if (log.isInfoEnabled()) {
//			log.info("onAfterHandled\r\n{}\r\n{}", packet.logstr(), channelContext);
//		}}}

6、继承IpStatListener类,实现ip监控,实现相关业务


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.tio.core.ChannelContext;
import org.tio.core.TioConfig;
import org.tio.core.intf.Packet;
import org.tio.core.stat.IpStat;
import org.tio.core.stat.IpStatListener;/*** * @author tanyaowu**/
@Component
public class ShowcaseIpStatListener implements IpStatListener {@SuppressWarnings("unused")private static Logger log = LoggerFactory.getLogger(ShowcaseIpStatListener.class);public static final ShowcaseIpStatListener me = new ShowcaseIpStatListener();/*** */private ShowcaseIpStatListener() {}@Overridepublic void onExpired(TioConfig tioConfig, IpStat ipStat) {//在这里把统计数据入库中或日志
//		if (log.isInfoEnabled()) {
//			log.info("可以把统计数据入库\r\n{}", Json.toFormatedJson(ipStat));
//		}}@Overridepublic void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect, IpStat ipStat) throws Exception {
//		if (log.isInfoEnabled()) {
//			log.info("onAfterConnected\r\n{}", Json.toFormatedJson(ipStat));
//		}}@Overridepublic void onDecodeError(ChannelContext channelContext, IpStat ipStat) {
//		if (log.isInfoEnabled()) {
//			log.info("onDecodeError\r\n{}", Json.toFormatedJson(ipStat));
//		}}@Overridepublic void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess, IpStat ipStat) throws Exception {
//		if (log.isInfoEnabled()) {
//			log.info("onAfterSent\r\n{}\r\n{}", packet.logstr(), Json.toFormatedJson(ipStat));
//		}}@Overridepublic void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize, IpStat ipStat) throws Exception {
//		if (log.isInfoEnabled()) {
//			log.info("onAfterDecoded\r\n{}\r\n{}", packet.logstr(), Json.toFormatedJson(ipStat));
//		}}@Overridepublic void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes, IpStat ipStat) throws Exception {
//		if (log.isInfoEnabled()) {
//			log.info("onAfterReceivedBytes\r\n{}", Json.toFormatedJson(ipStat));
//		}}@Overridepublic void onAfterHandled(ChannelContext channelContext, Packet packet, IpStat ipStat, long cost) throws Exception {
//		if (log.isInfoEnabled()) {
//			log.info("onAfterHandled\r\n{}\r\n{}", packet.logstr(), Json.toFormatedJson(ipStat));
//		}}}

好了,到这里算是真真正正的完成了,简单的基础就这样。当然还有很多的东西,可以自己去官网了解。这里就不做更多介绍。
接下来看一下如何使用吧。

7、启动项目并进行连接

看到下面这个,说明启动成功。

在这里插入图片描述

接下来我们找一个在线测试工具试一下:
推荐两个:
http://www.websocket-test.com/
http://wstool.js.org/
这两个各有优劣
第一个不能定时发心跳,第二个可以发心跳
但是第二个好像只能连接本地的127.0.0.1,连192.168.. 就连不了

我们用第二个看一下吧
在这里插入图片描述
这个就是连接成功的
我的连接地址是:
ws://127.0.0.1:9876?userId=1&groupId=178979558244139008
后面的参数可以自己带,就跟get 请求一样,拼接在url后面就可以了。我还定时3秒钟发了一个心跳。

8、客户端如何发送消息

我相信很多同学都有我一样的问题,因为我们集成这个不一定是做聊天工具,一个发,一个收。
很多时候我们都是通过服务端主动给某个人发的,没有所谓的发送,或者有些是通过http接口去发送的消息。那我们怎么发送呢?
接下来我们写一个专门的消息收发的业务处理类

@Component
@Slf4j
public class MsgHandel {// 注入bean 这个非常重要,后面所有的消息发送,包括单发给个人,还是群发都需要这个@Autowiredprivate TioWebSocketServerBootstrap tioWebSocketServerBootstrap;// 发送消息,单发给某个人,这里的userId 就是我们之前在用户连接的时候绑定的用户Id// 陷入回忆:Tio.bindUser(channelContext, userId);public void sendObject(String userId, Object object) {WsResponse wsResponse = WsResponse.fromText(JSONObject.toJSONString(object), CHARSET);//单发Tio.sendToUser(tioWebSocketServerBootstrap.getServerTioConfig(), userId, wsResponse);}//群发,这个的groupId 也就是我们刚刚绑定的群Id// 再次陷入回忆:Tio.bindGroup(channelContext, groupId);// 当然前提是你前端连接的时候带了群Id或者自己定义了群Idpublic void sendOnlineNum(String groupId) {// 包装消息体JSONObject bodyJsonObject = new JSONObject();bodyJsonObject.put("data", "所有人,在吗?在的请举手");// 群发最新在线人WsResponse wsResponse = WsResponse.fromText(bodyJsonObject.toJSONString(), CHARSET);//群发Tio.sendToGroup(tioWebSocketServerBootstrap.getServerTioConfig(), groupId, wsResponse);}}

9、最后啰嗦,问题探讨

到这里就结束了,最后啰嗦一点吧,这个也是我当前没有完善的部分,有好的解决办法的可以评论区支支招。
现在很多项目都是微服务,我们刚刚这种是直接调的本服务的ip+端口的方式。因为t-io 底层是使用netty写的,大家都知道,netty是单独监听一个端口的,我们这个也是一样,那么我们在启动项目的时候,不知道大家有没有发现,我们springboot项目也监听了一个端口,相当于我们的一个项目启动了两个实例,而且微服务注册到nacos注册中心的是springboot监听的端口。
这样我们做服务访问,或者网关路由转发的时候就不能转发到我们的 t-io 监听的端口上面来。
我想了几个方法,但是都不太如意,最后还是使用单体了,体量不大也能够用。

1、手动注册一个服务到nacos

package ren.chemi3.tio;import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.tio.server.ServerTioConfig;
import org.tio.websocket.server.WsServerStarter;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Properties;/*** websocket 配置类*/
@Configuration
public class WebSocketConfig {@Resourceprivate NacosDiscoveryProperties nacosDiscoveryProperties;@Value("${tio.websocket.server.port}")private Integer port;/*** 将服务注册进Nacos**/@PostConstructpublic void registerNamingService() {try {Properties properties = new Properties();properties.setProperty(PropertyKeyConst.SERVER_ADDR, nacosDiscoveryProperties.getServerAddr());properties.setProperty(PropertyKeyConst.NAMESPACE, nacosDiscoveryProperties.getNamespace());NamingService namingService = NamingFactory.createNamingService(properties);InetAddress address = InetAddress.getLocalHost();namingService.registerInstance("chemi3-websocket-im", address.getHostAddress(), port);} catch (Exception e) {throw new RuntimeException(e);}}
}

上面我手动注册了一个名为chemi3-websocket-im 的服务名称到注册中心,嗯,注册成功,没有问题
在这里插入图片描述
我还启动了两个
在这里插入图片描述
看似没有问题。接下来就是没在网关了

routes:- id: chemi3-websocket-im# uri: ws://127.0.0.1:9876 uri: lb:ws://chemi3-websocket-impredicates: - Path=/ws/**filters:- StripPrefix=1

这样应该没问题吧,按道理,我现在访问 ws://ip:网关端口/ws 应该可以连接才对,但是那就是不能连接,我也不知道什么问题。
那就试试第二种方法,第二种方法好像行,但是好像又不太行。

2、直接通过网关配置真实地址做负载,不走服务名

项目启动不变,修改一下网关配置

routes:- id: chemi3-websocket-imuri: ws://127.0.0.1:9876predicates: - Path=/ws/**- Weight=group1,50filters:- StripPrefix=1- id: chemi3-websocket-im1uri: ws://127.0.0.1:9875predicates: - Path=/ws/**- Weight=group1,50filters:- StripPrefix=1

这样就是通过网关来做负载均衡,好像是可以的,但是感觉不太稳妥。而且这样如果再启动更多的服务就只能在这里加配置,ip和端口写死。我没有具体去做线上的实际使用,你们可以试一下。


结束

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com