提交 ef4f2f62 authored 作者: 陈泽健's avatar 陈泽健

2024-10-22

   - 补充安卓信息模块的Mqtt主题上报以及接收脚本,但目前安卓信息上报后系统界面仍显示为离线。需进一步了解业务流后进行调试。后续完整多个安卓信息上报实现大量设备同时在线的功能验证。
   - 将安卓信息上报的MQTT相关函数封装到base目录下,方便后续调用以及维护管理。
   - 将MQTT上报消息通过csv进行读取,方便后续维护。
   - 更新README文档记录代码更新情况。
上级 405063f1
......@@ -4,7 +4,13 @@
<option name="autoReloadType" value="SELECTIVE" />
</component>
<component name="ChangeListManager">
<list default="true" id="1b298f03-b3a2-4f3f-9fa2-7f833ec35924" name="更改" comment="" />
<list default="true" id="1b298f03-b3a2-4f3f-9fa2-7f833ec35924" name="更改" comment="">
<change afterPath="$PROJECT_DIR$/预定系统/测试数据/MQTT上报数据.csv" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/预定系统/README.md" beforeDir="false" afterPath="$PROJECT_DIR$/预定系统/README.md" afterDir="false" />
<change beforePath="$PROJECT_DIR$/预定系统/base/Mqtt.py" beforeDir="false" afterPath="$PROJECT_DIR$/预定系统/base/Mqtt_Android.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/预定系统/安卓信息模块/安卓信息上报.py" beforeDir="false" afterPath="$PROJECT_DIR$/预定系统/安卓信息模块/安卓信息上报.py" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
<option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
......@@ -28,21 +34,22 @@
<option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" />
</component>
<component name="PropertiesComponent">{
&quot;keyToString&quot;: {
&quot;Python 测试.Python 测试 (LoginBy_ActAndPwd_PyTest.py 内).executor&quot;: &quot;Run&quot;,
&quot;RunOnceActivity.ShowReadmeOnStart&quot;: &quot;true&quot;,
&quot;git-widget-placeholder&quot;: &quot;develop&quot;,
&quot;last_opened_file_path&quot;: &quot;D:/GithubData/ubains-module-test/ubains-module-test&quot;,
&quot;node.js.detected.package.eslint&quot;: &quot;true&quot;,
&quot;node.js.detected.package.tslint&quot;: &quot;true&quot;,
&quot;node.js.selected.package.eslint&quot;: &quot;(autodetect)&quot;,
&quot;node.js.selected.package.tslint&quot;: &quot;(autodetect)&quot;,
&quot;nodejs_package_manager_path&quot;: &quot;npm&quot;,
&quot;settings.editor.selected.configurable&quot;: &quot;com.jetbrains.python.configuration.PyActiveSdkModuleConfigurable&quot;,
&quot;vue.rearranger.settings.migration&quot;: &quot;true&quot;
<component name="PropertiesComponent"><![CDATA[{
"keyToString": {
"Python 测试.Python 测试 (LoginBy_ActAndPwd_PyTest.py 内).executor": "Run",
"Python.安卓信息上报.executor": "Run",
"RunOnceActivity.ShowReadmeOnStart": "true",
"git-widget-placeholder": "develop",
"last_opened_file_path": "D:/GithubData/ubains-module-test/ubains-module-test",
"node.js.detected.package.eslint": "true",
"node.js.detected.package.tslint": "true",
"node.js.selected.package.eslint": "(autodetect)",
"node.js.selected.package.tslint": "(autodetect)",
"nodejs_package_manager_path": "npm",
"settings.editor.selected.configurable": "com.jetbrains.python.configuration.PyActiveSdkModuleConfigurable",
"vue.rearranger.settings.migration": "true"
}
}</component>
}]]></component>
<component name="RecentsManager">
<key name="MoveFile.RECENT_KEYS">
<recent name="D:\GithubData\ubains-module-test\ubains-module-test\预定系统\登录模块" />
......@@ -51,6 +58,36 @@
<recent name="D:\GithubData\ubains-module-test\预定系统" />
</key>
</component>
<component name="RunManager">
<configuration name="安卓信息上报" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<module name="ubains-module-test" />
<option name="ENV_FILES" value="" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/预定系统/安卓信息模块" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/预定系统/安卓信息模块/安卓信息上报.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<recent_temporary>
<list>
<item itemvalue="Python.安卓信息上报" />
</list>
</recent_temporary>
</component>
<component name="SharedIndexes">
<attachedChunks>
<set>
......@@ -70,7 +107,7 @@
<workItem from="1728379883141" duration="132000" />
<workItem from="1728380017193" duration="171000" />
<workItem from="1729481397357" duration="2465000" />
<workItem from="1729519480013" duration="2624000" />
<workItem from="1729519480013" duration="13278000" />
</task>
<servers />
</component>
......@@ -88,6 +125,6 @@
</breakpoint-manager>
</component>
<component name="com.intellij.coverage.CoverageDataManagerImpl">
<SUITE FILE_PATH="coverage/ubains_module_test$.coverage" NAME=" 覆盖结果" MODIFIED="1729517965711" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="false" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/预定系统/登录模块" />
<SUITE FILE_PATH="coverage/ubains_module_test$.coverage" NAME="安卓信息上报 覆盖结果" MODIFIED="1729604974439" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="false" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/预定系统/安卓信息模块" />
</component>
</project>
\ No newline at end of file
......@@ -4,3 +4,7 @@
- 将测试数据csv为文件统一放在测试数据目录下,并标注所属模块功能。
- 将驱动加载方式改为更加灵活的自动下载方式,避免其他人员使用时手动下载驱动。
- 补充安卓信息模块的Mqtt主题上报以及接收脚本,但暂时还未与实际mqtt主题进行调试,需要先整理出所有的mqtt主题,再进行代码调试。
2. 2024-10-22
- 补充安卓信息模块的Mqtt主题上报以及接收脚本,但目前安卓信息上报后系统界面仍显示为离线。需进一步了解业务流后进行调试。后续完整多个安卓信息上报实现大量设备同时在线的功能验证。
- 将安卓信息上报的MQTT相关函数封装到base目录下,方便后续调用以及维护管理。
- 将MQTT上报消息通过csv进行读取,方便后续维护。
\ No newline at end of file
# Mqtt.py
# -*- coding: utf-8 -*-
# cython: language_level=3
import json
import paho.mqtt.client as mqtt
class MQTTClient:
def __init__(self, broker_address, port):
"""
MQTT客户端初始化函数
:param broker_address: MQTT代理服务器地址
:param port: MQTT代理服务器端口
"""
# 保存MQTT代理服务器地址和端口作为实例变量
self.broker_address = broker_address
self.port = port
# 创建MQTT客户端实例,使用MQTTv311协议版本
self.client = mqtt.Client(protocol=mqtt.MQTTv311)
# 初始化接收到的消息变量
self.received_message = None
# 设置回调函数
self.client.on_connect = self.on_connect # 当客户端连接到代理时调用
self.client.on_message = self.on_message # 当客户端接收到消息时调用
def on_connect(self, client, userdata, flags, rc):
"""
当客户端连接到MQTT代理时调用的回调函数。
参数:
- client: 客户端实例,用于操作MQTT代理。
- userdata: 在客户端初始化时定义的任意类型的数据。
- flags: 连接应答的标志位。
- rc: 连接结果的返回码,用于判断连接是否成功。
此函数没有返回值。
功能描述:
- 如果连接成功(rc == 0),则打印成功连接的消息。
- 如果连接失败,则打印失败的消息,并显示返回码。
"""
if rc == 0:
print("Connected to MQTT Broker!")
else:
print(f"Failed to connect, return code {rc}")
def on_message(self, client, userdata, msg):
"""
当接收到消息时调用的回调函数。
参数:
- client: MQTT客户端实例。
- userdata: 用户定义的数据,未使用。
- msg: 消息数据,包含主题(topic)和负载(payload)。
此函数将解码消息负载,并将其存储为实例属性`received_message`,然后打印出接收到的消息和其对应的主题。
"""
# 解码消息负载并存储为实例属性
self.received_message = msg.payload.decode()
# 打印接收到的消息和其对应的主题
print(f"Received message: {self.received_message} on topic {msg.topic}")
def connect(self):
"""
Establishes a connection to the MQTT broker.
This method calls the connect method of the self.client instance to establish a network connection to the MQTT broker,
using the broker address and port number that were set during class initialization. After the connection is established,
it starts the network loop using loop_start to keep the connection alive and handle MQTT protocol communications.
"""
self.client.connect(self.broker_address, self.port)
self.client.loop_start()
def disconnect(self):
"""
从服务器断开连接。
该方法首先停止客户端的循环,然后断开与服务器的连接。
"""
# 停止客户端的循环
self.client.loop_stop()
# 断开与服务器的连接
self.client.disconnect()
def publish(self, topic, message, qos=0):
"""
发布消息到指定主题。
参数:
- topic: 消息发布的目标主题。
- message: 要发布的消息内容,将被转换为JSON格式。
- qos: 消息的服务质量等级,默认为0。
此方法将消息内容序列化为JSON格式,并通过客户端发布到指定的主题上。
"""
# 将消息内容转换为JSON格式并发布到指定主题
self.client.publish(topic, json.dumps(message), qos=qos)
def subscribe(self, topic, qos=0):
"""
订阅指定的主题。
参数:
- topic (str): 要订阅的主题名称。
- qos (int): 消息的质量服务等级,可选参数,默认为0。
此方法调用client实例的subscribe方法来执行MQTT协议的订阅操作。
"""
self.client.subscribe(topic, qos=qos)
def wait_for_message(self, timeout=5):
"""
等待接收消息,直到达到超时时间。
本函数会持续检查是否接收到消息,直到消息到达或超出指定的超时时间。
这种等待机制适用于需要同步处理消息的场景,通过轮询方式检查消息状态。
参数:
timeout (int): 超时时间(秒),默认为5秒。表示在没有接收到消息的情况下,函数将等待的最大时间。
返回:
接收到的消息内容,如果没有消息在指定时间内到达,则返回None。
"""
# 导入time模块用于计算时间差和延时
import time
# 记录开始等待的时间
start_time = time.time()
# 当没有接收到消息且未超出超时时间时,持续等待
while self.received_message is None and time.time() - start_time < timeout:
# 短暂暂停,避免过高CPU占用
time.sleep(0.1)
# 返回接收到的消息,如果没有消息则返回None
return self.received_message
# Mqtt.py
# -*- coding: utf-8 -*-
# cython: language_level=3
import json
import logging
import csv
from time import sleep
from datetime import datetime
import paho.mqtt.client as mqtt
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class Mqtt:
def __init__(self, broker_address, port):
"""
初始化 MQTT 客户端
:param broker_address: MQTT 代理地址
:param port: MQTT 代理端口
"""
self.broker_address = broker_address
self.port = port
self.client = None
def connect(self):
"""
连接到 MQTT 服务器
"""
self.client = mqtt.Client()
self.client.on_connect = self.on_connect # 设置连接回调
self.client.on_message = self.on_message # 设置消息回调
self.client.connect(self.broker_address, self.port) # 连接到代理
self.client.loop_start() # 启动网络循环
logging.info("已连接到MQTT 服务器")
def disconnect(self):
"""
断开与 MQTT 服务器的连接
"""
if self.client:
self.client.loop_stop() # 停止网络循环
self.client.disconnect() # 断开连接
logging.info("已断开与MQTT 服务器的连接")
def on_connect(self, client, userdata, flags, rc):
"""
连接成功或失败的回调函数
:param client: 客户端实例
:param userdata: 用户数据
:param flags: 连接标志
:param rc: 返回码
"""
if rc == 0:
logging.info("连接成功")
else:
logging.error(f"连接失败,返回码: {rc}")
def on_message(self, client, userdata, msg):
"""
接收到消息的回调函数
:param client: 客户端实例
:param userdata: 用户数据
:param msg: 消息对象
"""
logging.info(f"收到消息: {msg.topic} {msg.payload.decode()}")
self.received_message = msg.payload.decode() # 存储接收到的消息
def subscribe(self, topic):
"""
订阅指定的主题
:param topic: 主题名称
"""
if self.client:
self.client.subscribe(topic)
logging.info(f"已订阅主题: {topic}")
def publish(self, topic, message):
"""
发布消息到指定的主题
:param topic: 主题名称
:param message: 消息内容
"""
if self.client:
# 将消息转换为JSON字符串
if isinstance(message, dict):
message = json.dumps(message)
self.client.publish(topic, message)
logging.info(f"已发布消息到主题: {topic}")
def wait_for_message(self, topic, timeout=5):
"""
等待指定主题的消息
:param topic: 主题名称
:param timeout: 超时时间(秒)
:return: 接收到的消息或 None
"""
if self.client:
start_time = datetime.now()
while (datetime.now() - start_time).total_seconds() < timeout:
if hasattr(self, 'received_message'):
return self.received_message
sleep(0.1)
return None
return None
@staticmethod
def read_config_from_csv(file_path):
"""
从 CSV 文件读取配置
:param file_path: CSV 文件路径
:return: 配置列表
"""
with open(file_path, mode='r', encoding='utf-8') as file:
reader = csv.DictReader(file)
return list(reader)
@staticmethod
def build_message(config, current_time):
"""
构建消息内容
:param config: 配置字典
:param current_time: 当前时间
:return: 消息字典
"""
return {
"method": "/system/readSystemInfo",
"clientId": config['clientId'],
"result": json.dumps({
"result": {
"buildInfo": {
"appToken": config['appToken'],
"companyNumber": config['companyNumber'],
"cnum": config['cnum'],
"conferenceName": "测试会议室",
"conferenceId": int(config['conferenceId']),
"defaultQrCodeUrl": "http://192.168.5.218:8888/group1/M00/00/21/wKgFyGNBWZmADtnNAAAwrxR0X8s016.png",
"aliasName": "zt",
"serverBaseUrl": "http://192.168.5.218:8996/",
"localBindTime": current_time,
"generalField": "{\"conferencePhone\":\"\",\"chooseTimeType\":1,\"meetingTopicSwitch\":\"1\",\"meetingContentSwitch\":\"1\",\"meetingReverseTypeSwitch\":\"1\",\"seatArrangeSwitch\":\"1\",\"meetingVoteSwitch\":\"1\",\"floorPlanPath\":\"\",\"jumpToPaperless\":2,\"approvalList\":[],\"isLeaderJoin\":false,\"meetingPublishScreenSwitch\":\"1\"}"
},
"faceVersion": "4.2.12021020201.1",
"wgtVersion": "0.0.81",
"deviceModel": "yx_rk3288",
"abiList": ["armeabi-v7a", "armeabi"],
"androidId": "48134e6047a19aaf",
"appName": "UBAINS",
"appPackageName": "com.ubains.local.gviewer",
"appVersion": 78,
"appVersionName": "1.1.78",
"battery": 0,
"bluetoothEnabled": False,
"camerasCount": 1,
"charging": True,
"density": 1,
"freeAndTotalMemory": "1176M/1997M",
"internalAvailableSize": 4306395136,
"internalTotalSize": 4877451264,
"ipAddress": "192.168.5.129",
"macAddress": config['macAddress'],
"networkType": "NETWORK_ETHERNET",
"processCpuUsage": "0.82%",
"resolution": "1280x800",
"romName": "rockchip",
"rooted": True,
"sdkVersionCode": 25,
"sdkVersionName": "7.1.2",
"sysDate": "Tue Oct 22 18:24:52 GMT+08:00 2024",
"sysDateStr": current_time,
"sysElapsedRealtime": "342:26:11",
"sysLanguage": "zh",
"sysSupportedSensors": ["Accelerometer sensor", "Gyroscope sensor", "Game Rotation Vector Sensor",
"Gravity Sensor"],
"authCode": config['authCode'],
"conferenceName": "测试会议室"
}
})
}
def send_and_receive_messages(self, topic, message, num_times=50, timeout=5, interval=0.2):
"""
发送并接收消息
:param topic: 主题名称
:param message: 消息内容
:param num_times: 发送次数
:param timeout: 超时时间(秒)
:param interval: 每次发送之间的间隔时间(秒)
"""
for i in range(num_times):
try:
self.publish(topic, message)
received_message = self.wait_for_message(topic, timeout=timeout)
logging.info(f"正在等待主题: {topic} 的消息,超时时间: {timeout} 秒")
if received_message:
logging.info("消息接收成功!")
else:
logging.warning("超时时间内未接收到消息。")
sleep(interval)
except Exception as e:
logging.error(f"发送或接收消息时发生错误: {e}")
# main.py
# -*- coding: utf-8 -*-
# cython: language_level=3
from 预定系统.base.Mqtt import MQTTClient
from 预定系统.base.Mqtt_Android import Mqtt
from datetime import datetime
import logging
if __name__ == "__main__":
# 读取配置文件
configs = Mqtt.read_config_from_csv('../测试数据/MQTT上报数据.csv')
broker_address = "192.168.5.218"
port = 1883
topic = "/maintain/room/master/client/"
client_udid = '79ac2430-6a98-5509-9e99-65b974eb70a1'
message = {
"action": "_updatemaster",
"client_udid": client_udid,
"data": [
{
"item": "environmental",
"pm25": 0,
"co2": 0,
"temp": 0,
"tvoc": 0,
"humi": 0,
"hcho": 0
},
{
"item": "conference",
"power": 0,
"exist": 1,
"run": "hello world"
}
]
}
# 创建客户端实例
mqtt_client = MQTTClient(broker_address, port)
# 连接到MQTT服务器
mqtt_client.connect()
topic = "rebootResponseTopic"
# 订阅主题
mqtt_client.subscribe(topic)
# 创建MQTT客户端实例
mqtt_client = Mqtt(broker_address, port)
# 发布消息
mqtt_client.publish(topic, message)
try:
# 连接到MQTT服务器
mqtt_client.connect()
# 等待接收消息
received_message = mqtt_client.wait_for_message(timeout=5)
# 订阅主题
mqtt_client.subscribe(topic)
# 检查接收到的消息
if received_message:
print("Message received successfully!")
else:
print("No message received within the timeout period.")
# 遍历每行配置数据
for config in configs:
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
message = Mqtt.build_message(config, current_time)
mqtt_client.send_and_receive_messages(topic, message)
# 断开连接
mqtt_client.disconnect()
except Exception as e:
logging.error(f"连接或订阅MQTT服务器时发生错误: {e}")
finally:
# 断开连接
mqtt_client.disconnect()
clientId,appToken,companyNumber,cnum,conferenceId,macAddress,authCode
48134e6047a19aaf,AND-8AJ-0025,CN-8AJ-UBAINS,058862a06633b59aa0fe3fe3c27e711b,219,20:59:20:00:28:87,AND-8AJ-0025
48134e6047a19aac,AND-8AJ-0026,CN-8AJ-UBAINS,9arvn9am36cjzlimxm4ojd9xzdzusaup,219,20:59:20:00:28:88,AND-8AJ-0026
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论