欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 创投人物 > 物联网协议之MQTT(二)服务端

物联网协议之MQTT(二)服务端

2025/6/8 16:12:45 来源:https://blog.csdn.net/tenyears940326/article/details/148498676  浏览:    关键词:物联网协议之MQTT(二)服务端

参考文档:mica-mqtt
示例代码:Gitee仓库

  经过前两章的基础铺垫,基础概念就不再过多赘述,大家还有不清楚的,可以访问上面的参考文档,感谢春哥对MQTT的详细介绍,很有用!
   mica-mqtt目前已经加入dromara组织,不仅支持SpringBoot2SpringBoot3,对SolonJFinal都能支持,作为Java开发者对MQTT的学习有极大的帮助。

本章使用测试工具为 MQTTX ,可回看一章获取:物联网协议之MQTT(一)基础概念和设备

一、依赖引入

<dependency><groupId>org.dromara.mica-mqtt</groupId><artifactId>mica-mqtt-server-spring-boot-starter</artifactId><version>${最新版本}</version>
</dependency>

按照官方的依赖,大家可以引入以上依赖。但是,作者其实还有个加强版的(付费版)的组件。不过源码不开源,但是依赖可以免费用。

<dependency><groupId>net.dreamlu</groupId><artifactId>mica-mqttx-server-spring-boot-starter</artifactId><version>${最新版本}</version>
</dependency>

  使用上面的mqttx的依赖就行,比开源版的性能更好,但是接口跟开源版是一样的,完全可以用开源的文档进行使用。

下面贴一张可实现接口,大家可以通过实现下面的接口来自定义功能,下面我会写部分我常用的接口,以供大家参考。
mica-mqtt可实现接口

二、配置项

mqtt:server:enabled: true               # 是否开启服务端,默认:true
#    ip: 0.0.0.0                # 服务端 ip 默认为空,0.0.0.0,建议不要设置port: 1883                  # 端口,默认:1883name: Mica-Mqtt-Server      # 名称,默认:Mica-Mqtt-Serverheartbeat-timeout: 120000   # 心跳超时,单位毫秒,默认: 1000 * 120read-buffer-size: 8KB       # 接收数据的 buffer size,默认:8kmax-bytes-in-message: 10MB  # 消息解析最大 bytes 长度,默认:10Mauth:enable: false             # 是否开启 mqtt 认证username: mica            # mqtt 认证用户名password: mica            # mqtt 认证密码debug: true                 # 如果开启 prometheus 指标收集建议关闭stat-enable: true           # 开启指标收集,debug 和 prometheus 开启时需要打开,默认开启,关闭节省内存proxy-protocol-enable: false   # 代理协议支持,nginx 可开启 tcp proxy_protocol on; 时转发源 ip 信息。2.4.1 版本开始支持web-port: 8083              # http、websocket 端口,默认:8083websocket-enable: true      # 是否开启 websocket,默认: truehttp-enable: false          # 是否开启 http api,默认: falsehttp-basic-auth:enable: false             # 是否开启 http basic auth,默认: falseusername: mica            # http basic auth 用户名password: mica            # http basic auth 密码ssl:                        # mqtt tcp ssl 认证enabled: false            # 是否开启 ssl 认证,2.1.0 开始支持双向认证keystore-path:            # 必须参数:ssl keystore 目录,支持 classpath:/ 路径。keystore-pass:            # 必选参数:ssl keystore 密码truststore-path:          # 可选参数:ssl 双向认证 truststore 目录,支持 classpath:/ 路径。truststore-pass:          # 可选参数:ssl 双向认证 truststore 密码client-auth: none         # 是否需要客户端认证(双向认证),默认:NONE(不需要)

以上是官网上默认的配置,基本不用改动。


强烈建议大家改一下用户认证,就是把 mqtt.server.auth.enable = true 就行,下面的用户名、密码最好改一下。另外,用户名、密码理论上是没有限制的,但是并不代表采集设备配置服务端没有限制,所以建议大家密码可以有一定的复杂度,但是别离谱,尤其是不常用的符号就免了吧!


物联网平台虽然不想常规的web平台,但是没有认证的设备接入也是有一定风险性的,简单设置一下就能提升安全性,绝对值!

三、认证服务

实现IMqttServerAuthHandler 接口来完成此功能。大家可以把这个理解为微信加好友,大家肯定是不想收到各种各样的垃圾消息,只有通过好友请求(认证)的才允许相互通信。

3.1 接口实现

  个人认为这个功能是一定一定要有的,这样才能保证平台的安全险。
  当然不是一定要实现这个接口,比如上面通过配置把用户名、密码设置好,就是基本的认证功能。当然,实现接口可以实现更复杂的逻辑,比如:①不在系统的客户端不接受连接;②可以设计一套黑名单,拒绝部分IP或客户端连接。

@Configuration(proxyBeanMethods = false)
@Slf4j
@RequiredArgsConstructor
public class MqttAuthListener implements IMqttServerAuthHandler {@Value("${mqtt.server.auth.username}")private String authUserName;@Value("${mqtt.server.auth.password}")private String authPassword;@Overridepublic boolean authenticate(ChannelContext context, String uniqueId, String clientId, String userName, String password) {// 下面我们简单以配置文件的用户名、密码作为检验,你也可以自定义逻辑,甚至每个设备有单独的用户名、密码boolean auth = authUserName.equals(userName) && authPassword.equals(password);if (!auth) {log.error("客户端:{}用户名或密码不正确,不能连接。用户名:{},密码:{}", clientId, userName, password);return false;}if (StrUtil.isBlank(clientId)) {log.error("客户端:{}为空,不能连接", clientId);return false;}// TODO 比如可以从缓存中判断clientId是否存在,不存在的可以不接收// TODO 比如我们可以判断clientId是否被拉黑// TODO 比如我们可以通过ChannelContext判断IP是否被拉黑return true;}
}

如果实现此接口,配置文件里的用户名、密码就不生效了,会以实现接口的逻辑为准。

3.2 测试成功连接

使用MQTTX连接服务端,截图可能有很多次,如果clientId不同,别在意,看功能就行。
连接配置
客户端上线

3.3 测试用户名密码错误连接

修改用户名
服务端拒绝连接

四、客户端上下线监听

实现MqttConnectStatusListener 接口来完成此功能。大家可以理解为以前的QQ,有好友上线会有个通知。

  对于物联网平台,设备上下线是比较重要的,比如设备离线了,那么所有的监测就毫无意义。

4.1 接口实现

@Slf4j
@Component
@RequiredArgsConstructor
public class MqttConnectStatusListener implements IMqttConnectStatusListener {@Overridepublic void online(ChannelContext context, String clientId, String username) {log.info("设备【{}】上线",  clientId);}@Overridepublic void offline(ChannelContext context, String clientId, String username, String reason) {log.warn("设备【{}】离线",  clientId);}
}

官网上使用的@EventListener的方式来实现,我不是很习惯,就用的实现接口,大家喜欢的可以用一下

@Service
public class MqttConnectStatusListener {private static final Logger logger = LoggerFactory.getLogger(MqttConnectStatusListener.class);@EventListenerpublic void online(MqttClientOnlineEvent event) {logger.info("MqttClientOnlineEvent:{}", event);}@EventListenerpublic void offline(MqttClientOfflineEvent event) {logger.info("MqttClientOfflineEvent:{}", event);}}

4.2 上线测试

MQTTX连接
服务端上线

4.3 离线测试

MQTTX断开连接
服务端离线

五、消息接收

实现IMqttMessageListener 接口来完成此功能。大家可以理解给微信好友发消息。

5.1 接口实现

@Slf4j
@Component
@RequiredArgsConstructor
public class MqttServerMessageListener implements IMqttMessageListener {@Overridepublic void onMessage(ChannelContext context, String clientId, String topic, MqttQoS qoS, MqttPublishMessage message) {String payload = ByteBufferUtil.toString(message.getPayload(), StandardCharsets.UTF_8);log.info("Topic:【{}】,收到客户端:【{}】消息:【{}】",  topic, clientId, payload);}
}

5.2 消息测试

消息发送
服务端接收消息

六、消息发送

使用MqttServerTemplate服务,实现服务端向客户端发送消息

6.1 代码实现

topic是必须的,如果你还是不了解topic是什么,建议先学习下。

@RestController
@RequiredArgsConstructor
public class TestPublishController {private final MqttServerTemplate mqttServerTemplate;@GetMapping("/test/publish")public void testPublish() {// 向所有订阅topic的客户端发送消息mqttServerTemplate.publishAll("/mqtt/testpublish", "mqtt server publishAll message".getBytes());// 向指定客户端发送消息mqttServerTemplate.publish("mqttx_f1e5c583", "/mqtt/testpublish", "mqtt server publish message".getBytes());}
}

6.2 客户端订阅topic

订阅topic

6.3 消息发布

浏览器调用6.1的接口

客户端收到消息

至此,MQTT服务端的基本操作已经完成!

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词