欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 家装 > MQTT应用环路验证

MQTT应用环路验证

2025/7/10 23:29:16 来源:https://blog.csdn.net/m0_60313295/article/details/145937092  浏览:    关键词:MQTT应用环路验证

前言

  如今物联网的概念在如今十分普及,而与之伴随的协议就是MQTT,之前给我的感觉就是熟悉又陌生,所以最近就探究,整个MQTT的应用环路,进一步加深其应用层面上的了解。当然,笔者本着体验阿里云的产品试用的同时,来进行验证,可能有什么不是很符合实际的,大家见谅。还有,可以从下述也能看到环路并没有完整的实现,所以写下该文章也是给大家提供下MQTT方向的思路,和记录下我的尝试流程以及思考。

文中用到的各大官网以及软件:阿里云,EMQX Platform、MQTTX工具。

方案

  打红叉的线路都是在下述提到的云产品(其它中间件或产品可能可以实现)中实现不了的,还是我技术水平有限了,之前猜想还以为互通的线路应该没啥问题,但是没想,多数只能单方面的写、上传,线路还是没法达到发生/接收的效果。

1.客户端+MQTT端+服务端+数据库

在这里插入图片描述

2.客户端+EMQX专业版+数据库

主要就是修改成EMQX专业版,可以优化处理端,减少步骤

在这里插入图片描述

方案1

1.客户端

客户端通常是具备网络连接能力并支持MQTT协议通信的物联网设备,例如ESP32、树莓派或部分Arduino开发板等等。不过平时测试时,通比较喜欢先用MQTTX来,模拟设备端收发数据来调试。

在这里插入图片描述

2.MQTT端

MQTT端本来之前都是用的阿里云的物联网平台的公共实例的,但是之前不知道为什么不见了,不知道是不是被什么操作给卡掉了,还是怎么回事,还有那上面的云产品流转,也就是得到MQTT消息的处理方法,还挺难用的,数据目的大部分也只能是自家的云产品,当然这公共实例是免费体验,也不能要求太多。

在这里插入图片描述

不过也正好体验下EMQX Cloud的Serverless服务,毕竟这个更加专注于MQTT服务,而且Serverless版本每月都免费额度,设定好消费额度后,可以每个月都只用免费额度的量,还是非常适合用来做测试验证之类的。

在这里插入图片描述

在这里插入图片描述

Serverless版本也有着一定的限制,就是较为关键的功能,数据集成方面给出的功能少,按文档来看其它版本才有更加多的集成,如数据库之类的,Serverless版本则没有,所以说该方案才考虑用阿里云的函数计算,搭建服务端,通过HTTP服务把消息发过去处理。

在这里插入图片描述

还有就是,每个连接进MQTT的客户端,都要先在控制台这里进行客户端认证,设置用户名和密码后,客户端才能拿着该信息登录MQTT服务器。这里我定义了,两个用户,一个是用来给设备客户端做数据上报,一个给服务端做数据返回。

在这里插入图片描述

3.服务端

也在有服务端处理需求的时候,找阿里云产品,才开始使用,看了视频介绍,也符好,可以将HTTP服务的请求进行处理,只用关注于服务端的处理程序就行了,至于服务器的购买,底层软件的安装都不用,直接按照,配置好的文件框架来。我配置的函数则是,Web函数下Debian10的python,则是基于Flask框架的。

在这里插入图片描述

在这里插入图片描述

下面也附上,我在函数计算中的调试代码,不过MQTT的服务是有问题的。数据库读写的功能是没有问题,目前使用的是flask_mqtt包,调试的时候一直再说SSL的验证有问题,也提供ca.crt证书了。在之前使用过paho.mqtt包,能连接上,通过.publish也能发送信息到主题,但是代码写的是发一条,结果是一直发,查不出问题就放弃了。

from flask import Flask
from flask import request
import pymysql
from flask_mqtt import Mqtt
from flask import jsonify
import ssl
import osREQUEST_ID_HEADER = 'x-fc-request-id'app = Flask(__name__)# 获取当前工作目录
current_dir = os.path.dirname(os.path.abspath(__file__))
ca_certs_path = os.path.join(current_dir, 'emqxsl-ca.crt')# MQTT配置
app.config['MQTT_BROKER_URL'] = 'MQTT服务器'
app.config['MQTT_BROKER_PORT'] = 8883
app.config['MQTT_USERNAME'] = '用户名' # 当你需要验证用户名和密码时,请设置该项
app.config['MQTT_PASSWORD'] = '密码'      # 当你需要验证用户名和密码时,请设置该项
app.config['MQTT_KEEPALIVE'] = 5                # 设置心跳时间,单位为秒
app.config['MQTT_TLS_ENABLED'] = True           # 如果你的服务器支持 TLS,请设置为 True
app.config['MQTT_TLS_INSECURE'] = True  # 仅用于调试,生产环境中不推荐
app.config['MQTT_TLS_CA_CERTS'] = ca_certs_path
app.config['MQTT_TLS_SSL_VERSION'] = ssl.PROTOCOL_TLSv1_2
app.config['MQTT_TLS_CERT_REQS'] = ssl.CERT_REQUIREDmqtt_client = Mqtt(app)# mqtt连接成功回调
@mqtt_client.on_connect()
def handle_connect(client, userdata, flags, rc):if rc == 0:print('Connected successfully')mqtt_client.subscribe(topic) # 订阅主题else:print('Bad connection. Code:', rc)# 接收消息
@mqtt_client.on_message()
def handle_mqtt_message(client, userdata, message):data = dict(topic=message.topic,payload=message.payload.decode())print('Received message on topic: {topic} with payload: {payload}'.format(**data))# 发布消息
@app.route('/get', methods=['POST'])
def publish_message():request_data = request.get_json()publish_result = mqtt_client.publish(request_data['topic'], request_data['msg'])return jsonify({'code': publish_result[0]})# 修改数据库
def update_database(temp, humi):# 数据库操作# 打开数据库连接my_db = pymysql.connect(host='数据库地址',user='用户',password='密码',database='表名')# 使用cursor()方法获取操作游标 cursor = my_db.cursor()# SQL 更新语句sql = "UPDATE `my_db`.`mqtt` SET `temp`=%s,`humi`=%s WHERE `id`=1"try:# 执行SQL语句cursor.execute(sql, (temp, humi))# 提交到数据库执行my_db.commit()except Exception as e:# 发生错误时回滚my_db.rollback()print(f"Error occurred: {e}")finally:if my_db:# 关闭数据库连接my_db.close()@app.route('/', defaults={'path': ''})
@app.route('/<path:path>', methods=['GET', 'POST', 'PUT', 'DELETE'])
def hello_world(path):rid = request.headers.get(REQUEST_ID_HEADER)print("FC Invoke Start RequestId: " + rid)print("Path: " + path)# MQTT端数据解析json_data = request.get_json(force=True)if json_data:print("Json Data: " + str(json_data))# 操作方法 send or getmethod = str(json_data['method'])print("Data: " + method)# 如果是发送数据,则更新数据库if method == 'send':# 提取 temp 和 humi 的值temp_value = json_data['data'].get('temp', 0)  # 默认值为 0humi_value = json_data['data'].get('humi', 0)  # 默认值为 0update_database(temp_value, humi_value)elif method == 'get':# mqtt_client.publish(topic, f"Hi MQTTX!")passprint("FC Invoke End RequestId: " + rid)return "OK!"if __name__ == '__main__':app.run(host='0.0.0.0',port=9000)

4.数据持久化

数据库选用的阿里云的MySQL,启动实例,创建账号,开通外网访问权限以获取地址,复制到服务端即可。

在这里插入图片描述

方案2

该方案主要步骤只是把EMQX的Serverless版本,改为专业版,因为数据集成自带数据库操作,就可以去掉上个方案中的服务端处理,其它的大体不变。版本介绍 | EMQX Platform
当然这个网址,也写有了各版本之间开放什么功能。

在这里插入图片描述

在这里插入图片描述

问题

问题1:MQTT端的HTTP服务,无法访问服务端

EMQX Cloud的Serverless版本,在数据集成设定调用HTTP服务输出动作到服务端(函数计算),一直都失败,而用Hoppscotch工具,给定相同的地址,和请求头却又可以。然后我通过Beyond Compare 4(文本比较工具),比较了,这成功案例和失败案例的日志,发现原来的HTTP服务的默认端口80,与服务端提供服务的9000端口不一致,造成的,后来在输出动作的url上加:9000就行了。

在这里插入图片描述

在这里插入图片描述

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词