参考文档:mica-mqtt
示例代码:Gitee仓库
经过前两章的基础铺垫,基础概念就不再过多赘述,大家还有不清楚的,可以访问上面的参考文档
,感谢春哥对MQTT的详细介绍,很有用!
mica-mqtt
目前已经加入dromara
组织,不仅支持SpringBoot2
、SpringBoot3
,对Solon
、JFinal
都能支持,作为Java开发者对MQTT的学习有极大的帮助。
本章使用测试工具为 MQTTX ,可回看一章获取
:物联网协议之MQTT(一)基础概念和设备
一、依赖引入
<dependency><groupId>org.dromara.mica-mqtt</groupId><artifactId>mica-mqtt-server-spring-boot-starter</artifactId><version>${最新版本}</version>
</dependency>
按照官方的依赖,大家可以引入以上依赖。但是,作者其实还有个加强版的(付费版)的组件。不过源码不开源,但是依赖可以免费用。
<dependency><groupId>net.dreamlu</groupId><artifactId>mica-mqttx-server-spring-boot-starter</artifactId><version>${最新版本}</version>
</dependency>
使用上面的mqttx的依赖就行,比开源版的性能更好,但是接口跟开源版是一样的,完全可以用开源的文档进行使用。
下面贴一张可实现接口,大家可以通过实现下面的接口来自定义功能,下面我会写部分我常用的接口,以供大家参考。
二、配置项
mqtt:server:enabled: true # 是否开启服务端,默认:true
# ip: 0.0.0.0 # 服务端 ip 默认为空,0.0.0.0,建议不要设置port: 1883 # 端口,默认:1883name: Mica-Mqtt-Server # 名称,默认:Mica-Mqtt-Serverheartbeat-timeout: 120000 # 心跳超时,单位毫秒,默认: 1000 * 120read-buffer-size: 8KB # 接收数据的 buffer size,默认:8kmax-bytes-in-message: 10MB # 消息解析最大 bytes 长度,默认:10Mauth:enable: false # 是否开启 mqtt 认证username: mica # mqtt 认证用户名password: mica # mqtt 认证密码debug: true # 如果开启 prometheus 指标收集建议关闭stat-enable: true # 开启指标收集,debug 和 prometheus 开启时需要打开,默认开启,关闭节省内存proxy-protocol-enable: false # 代理协议支持,nginx 可开启 tcp proxy_protocol on; 时转发源 ip 信息。2.4.1 版本开始支持web-port: 8083 # http、websocket 端口,默认:8083websocket-enable: true # 是否开启 websocket,默认: truehttp-enable: false # 是否开启 http api,默认: falsehttp-basic-auth:enable: false # 是否开启 http basic auth,默认: falseusername: mica # http basic auth 用户名password: mica # http basic auth 密码ssl: # mqtt tcp ssl 认证enabled: false # 是否开启 ssl 认证,2.1.0 开始支持双向认证keystore-path: # 必须参数:ssl keystore 目录,支持 classpath:/ 路径。keystore-pass: # 必选参数:ssl keystore 密码truststore-path: # 可选参数:ssl 双向认证 truststore 目录,支持 classpath:/ 路径。truststore-pass: # 可选参数:ssl 双向认证 truststore 密码client-auth: none # 是否需要客户端认证(双向认证),默认:NONE(不需要)
以上是官网上默认的配置,基本不用改动。
强烈建议大家改一下用户认证,就是把 mqtt.server.auth.enable = true 就行,下面的用户名、密码最好改一下。另外,用户名、密码理论上是没有限制的,但是并不代表采集设备配置服务端没有限制,所以建议大家密码可以有一定的复杂度,但是别离谱,尤其是不常用的符号就免了吧!
物联网平台虽然不想常规的web平台,但是没有认证的设备接入也是有一定风险性的,简单设置一下就能提升安全性,绝对值!
三、认证服务
实现IMqttServerAuthHandler 接口来完成此功能。大家可以把这个理解为微信加好友,大家肯定是不想收到各种各样的垃圾消息,只有通过好友请求(认证)的才允许相互通信。
3.1 接口实现
个人认为这个功能是一定一定要有的,这样才能保证平台的安全险。
当然不是一定要实现这个接口,比如上面通过配置把用户名、密码设置好,就是基本的认证功能。当然,实现接口可以实现更复杂的逻辑,比如:①不在系统的客户端不接受连接;②可以设计一套黑名单,拒绝部分IP或客户端连接。
@Configuration(proxyBeanMethods = false)
@Slf4j
@RequiredArgsConstructor
public class MqttAuthListener implements IMqttServerAuthHandler {@Value("${mqtt.server.auth.username}")private String authUserName;@Value("${mqtt.server.auth.password}")private String authPassword;@Overridepublic boolean authenticate(ChannelContext context, String uniqueId, String clientId, String userName, String password) {// 下面我们简单以配置文件的用户名、密码作为检验,你也可以自定义逻辑,甚至每个设备有单独的用户名、密码boolean auth = authUserName.equals(userName) && authPassword.equals(password);if (!auth) {log.error("客户端:{}用户名或密码不正确,不能连接。用户名:{},密码:{}", clientId, userName, password);return false;}if (StrUtil.isBlank(clientId)) {log.error("客户端:{}为空,不能连接", clientId);return false;}// TODO 比如可以从缓存中判断clientId是否存在,不存在的可以不接收// TODO 比如我们可以判断clientId是否被拉黑// TODO 比如我们可以通过ChannelContext判断IP是否被拉黑return true;}
}
如果实现此接口,配置文件里的用户名、密码就不生效了,会以实现接口的逻辑为准。
3.2 测试成功连接
使用MQTTX连接服务端,截图可能有很多次,如果clientId不同,别在意,看功能就行。
3.3 测试用户名密码错误连接
四、客户端上下线监听
实现MqttConnectStatusListener 接口来完成此功能。大家可以理解为以前的QQ,有好友上线会有个通知。
对于物联网平台,设备上下线是比较重要的,比如设备离线了,那么所有的监测就毫无意义。
4.1 接口实现
@Slf4j
@Component
@RequiredArgsConstructor
public class MqttConnectStatusListener implements IMqttConnectStatusListener {@Overridepublic void online(ChannelContext context, String clientId, String username) {log.info("设备【{}】上线", clientId);}@Overridepublic void offline(ChannelContext context, String clientId, String username, String reason) {log.warn("设备【{}】离线", clientId);}
}
官网上使用的
@EventListener
的方式来实现,我不是很习惯,就用的实现接口,大家喜欢的可以用一下
@Service
public class MqttConnectStatusListener {private static final Logger logger = LoggerFactory.getLogger(MqttConnectStatusListener.class);@EventListenerpublic void online(MqttClientOnlineEvent event) {logger.info("MqttClientOnlineEvent:{}", event);}@EventListenerpublic void offline(MqttClientOfflineEvent event) {logger.info("MqttClientOfflineEvent:{}", event);}}
4.2 上线测试
4.3 离线测试
五、消息接收
实现IMqttMessageListener 接口来完成此功能。大家可以理解给微信好友发消息。
5.1 接口实现
@Slf4j
@Component
@RequiredArgsConstructor
public class MqttServerMessageListener implements IMqttMessageListener {@Overridepublic void onMessage(ChannelContext context, String clientId, String topic, MqttQoS qoS, MqttPublishMessage message) {String payload = ByteBufferUtil.toString(message.getPayload(), StandardCharsets.UTF_8);log.info("Topic:【{}】,收到客户端:【{}】消息:【{}】", topic, clientId, payload);}
}
5.2 消息测试
六、消息发送
使用MqttServerTemplate服务,实现服务端向客户端发送消息
6.1 代码实现
topic是必须的,如果你还是不了解topic是什么,建议先学习下。
@RestController
@RequiredArgsConstructor
public class TestPublishController {private final MqttServerTemplate mqttServerTemplate;@GetMapping("/test/publish")public void testPublish() {// 向所有订阅topic的客户端发送消息mqttServerTemplate.publishAll("/mqtt/testpublish", "mqtt server publishAll message".getBytes());// 向指定客户端发送消息mqttServerTemplate.publish("mqttx_f1e5c583", "/mqtt/testpublish", "mqtt server publish message".getBytes());}
}
6.2 客户端订阅topic
6.3 消息发布
浏览器调用
6.1
的接口
至此,MQTT服务端的基本操作已经完成!