1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
import json
import logging
import csv
import os
import re
import threading
import time
from time import sleep
from datetime import datetime
import paho.mqtt.client as mqtt
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class Mqtt:
def __init__(self, broker_address, port):
"""
初始化 MQTT 客户端
:param broker_address: MQTT 代理地址
:param port: MQTT 代理端口
"""
self.lock = None
self.broker_address = broker_address
self.port = port
self.client = None
self._received_message_lock = threading.Lock()
self.received_message = None
self.message_type = None
def connect(self):
"""
连接到 MQTT 服务器
"""
try:
self.client = mqtt.Client()
self.client.on_connect = self.on_connect # 设置连接回调
self.client.on_message = self.on_message # 设置消息回调
self.client.connect(self.broker_address, self.port) # 连接到代理
self.client.loop_start() # 启动网络循环
except Exception as e:
logging.error(f"连接到MQTT服务器时发生错误: {e}")
raise
def disconnect(self):
"""
断开与 MQTT 服务器的连接
"""
if self.client:
try:
self.client.loop_stop() # 停止网络循环
self.client.disconnect() # 断开连接
logging.info("已断开与MQTT 服务器的连接")
except Exception as e:
logging.error(f"断开与MQTT 服务器的连接时发生错误: {e}")
finally:
self.client = None # 确保资源被完全释放
else:
logging.warning("尝试断开连接时,客户端已不存在")
def on_connect(self, client, userdata, flags, rc):
"""
连接成功或失败的回调函数
:param client: 客户端实例
:param userdata: 用户数据
:param flags: 连接标志
:param rc: 返回码
"""
try:
if rc == 0:
logging.info("连接成功")
else:
logging.error(f"连接失败,返回码: {rc}")
# 根据不同的返回码采取不同的措施
if rc == 1:
logging.error("错误:错误的协议版本")
elif rc == 2:
logging.error("错误:无效的客户端标识符")
elif rc == 3:
logging.error("错误:服务器不可用")
elif rc == 4:
logging.error("错误:错误的用户名或密码")
elif rc == 5:
logging.error("错误:未授权")
else:
logging.error("未知错误")
except Exception as e:
logging.exception(f"在处理连接结果时发生异常: {e}")
def on_message(self, client, userdata, msg):
"""
接收到消息的回调函数
:param client: 客户端实例
:param userdata: 用户数据
:param msg: 消息对象
"""
try:
payload = msg.payload.decode('utf-8', errors='replace') # 处理解码错误
logging.info(f"收到消息: {msg.topic} {payload[:50]}...") # 脱敏日志记录
except UnicodeDecodeError as e:
logging.error(f"解码错误: {e}")
payload = "无法解码的消息"
with self._received_message_lock:
self.received_message = payload # 线程安全地存储接收到的消息
def subscribe(self, topic):
"""
订阅指定的主题
:param topic: 主题名称
"""
if self.client is None:
logging.error("客户端未初始化,无法订阅主题")
raise ValueError("客户端未初始化,无法订阅主题")
# 输入验证
if not isinstance(topic, str) or not topic.strip():
logging.error("无效的主题名称")
raise ValueError("无效的主题名称")
try:
self.client.subscribe(topic)
logging.info(f"已订阅主题: {topic}")
except ConnectionError as ce:
logging.error(f"连接错误,无法订阅主题: {topic}, 错误信息: {str(ce)}")
raise
except TimeoutError as te:
logging.error(f"超时错误,无法订阅主题: {topic}, 错误信息: {str(te)}")
raise
except ValueError as ve:
logging.error(f"值错误,无法订阅主题: {topic}, 错误信息: {str(ve)}")
raise
except Exception as e:
logging.error(f"未知错误,无法订阅主题: {topic}, 错误信息: {str(e)}")
raise
def set_message_type(self, message_type):
"""
设置消息类型
此方法用于设置或更改消息类型属性,允许对象根据需要处理不同类型的消息
参数:
message_type: 要设置的消息类型,可以是任何数据类型,但通常应该是字符串、整数或枚举类型
返回:
无
"""
self.message_type = message_type
def publish(self, topic, message):
"""
发布消息到指定的主题
:param topic: 主题名称
:param message: 消息内容
"""
if self.client:
try:
# 将消息转换为JSON字符串
if self.message_type == dict and isinstance(message, dict):
message = json.dumps(message)
elif message is None:
message = ""
else:
message = str(message)
except (TypeError, ValueError) as e:
logging.error(f"{datetime.now()} - 消息转换失败: {e} - 调用者: {self.__class__.__name__}.publish")
raise
try:
self.client.publish(topic, message)
logging.info(f"{datetime.now()} - 已发布消息到主题: {topic} - 调用者: {self.__class__.__name__}.publish")
except Exception as e:
logging.error(f"{datetime.now()} - 消息发布失败: {e} - 失败的主题: {topic}, 消息: {message} - 调用者: {self.__class__.__name__}.publish")
raise
def wait_for_message(self, topic, timeout=5):
"""
等待指定主题的消息
:param topic: 主题名称
:param timeout: 超时时间(秒)
:return: 接收到的消息或 None
"""
if self.client is None:
logging.warning("Client is not initialized")
return None
if timeout < 0:
logging.warning("Timeout cannot be negative")
return None
start_time = time.monotonic()
while (time.monotonic() - start_time) < timeout:
try:
with self.lock:
if self.received_message is not None:
return self.received_message
except Exception as e:
logging.error(f"Error accessing received_message: {e}")
try:
time.sleep(0.1)
except Exception as e:
logging.error(f"Error in sleep: {e}")
return None
@staticmethod
def read_config_from_csv(file_path):
"""
从 CSV 文件读取配置
:param file_path: CSV 文件路径
:param allowed_directory: 允许访问的目录
:return: 配置列表
"""
try:
# 验证文件路径
if not os.path.isfile(file_path):
raise FileNotFoundError(f"文件 {file_path} 不存在")
with open(file_path, mode='r', encoding='utf-8') as file:
reader = csv.DictReader(file)
return [row for row in reader]
except FileNotFoundError as e:
print(f"错误: {e}")
return []
except PermissionError as e:
print(f"错误: {e}")
return []
except Exception as e:
print(f"未知错误: {e}")
return []
@staticmethod
def wait_for_message(self, topic, timeout=5):
"""
等待指定主题的消息
:param topic: 主题名称
:param timeout: 超时时间(秒)
:return: 接收到的消息或 None
"""
if not isinstance(topic, str) or not re.match(r'^[a-zA-Z0-9_\-]+$', topic):
raise ValueError("Invalid topic format")
if timeout < 0:
return None
try:
if self.client:
start_time = time.time()
while (time.time() - start_time) < timeout:
if self.has_received_message():
return self.received_message
sleep(0.1)
return None
except AttributeError:
return None
return None
def has_received_message(self):
return hasattr(self, 'received_message')
@staticmethod
def read_config_from_csv(file_path):
"""
从 CSV 文件读取配置
:param file_path: CSV 文件路径
:return: 配置列表
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"文件 {file_path} 不存在")
try:
with open(file_path, mode='r', encoding='utf-8') as file:
reader = csv.DictReader(file)
config_list = [row for row in reader]
if not config_list:
raise ValueError("CSV 文件内容为空或格式不正确")
logging.info(f"成功读取文件 {file_path}")
return config_list
except Exception as e:
logging.error(f"读取文件 {file_path} 时发生错误: {e}")
return []
@staticmethod
def build_message(config, current_time,topic):
"""
构建消息内容
:param config: 配置字典
:param current_time: 当前时间
:return: 消息字典
"""
#安卓信息设备上报
if topic == "rebootResponseTopic":
return {
"method": "/system/readSystemInfo",
"clientId": config['clientId'],
"result": json.dumps({
"result": {
"buildInfo": {
"appToken": config['appToken'],
"companyNumber": config['companyNumber'],
"cnum": config['cnum'],
"conferenceName": "测试会议室",
"conferenceId": int(config['conferenceId']),
"defaultQrCodeUrl": "http://192.168.5.218:8888/group1/M00/00/21/wKgFyGNBWZmADtnNAAAwrxR0X8s016.png",
"aliasName": "zt",
"serverBaseUrl": "http://192.168.5.218:8996/",
"localBindTime": current_time,
"generalField": "{\"conferencePhone\":\"\",\"chooseTimeType\":1,\"meetingTopicSwitch\":\"1\",\"meetingContentSwitch\":\"1\",\"meetingReverseTypeSwitch\":\"1\",\"seatArrangeSwitch\":\"1\",\"meetingVoteSwitch\":\"1\",\"floorPlanPath\":\"\",\"jumpToPaperless\":2,\"approvalList\":[],\"isLeaderJoin\":false,\"meetingPublishScreenSwitch\":\"1\"}"
},
"faceVersion": "4.2.12021020201.1",
"wgtVersion": "0.0.81",
"deviceModel": "yx_rk3288",
"abiList": ["armeabi-v7a", "armeabi"],
"androidId": "48134e6047a19aaf",
"appName": "UBAINS",
"appPackageName": "com.ubains.local.gviewer",
"appVersion": 78,
"appVersionName": "1.1.78",
"battery": 0,
"bluetoothEnabled": False,
"camerasCount": 1,
"charging": True,
"density": 1,
"freeAndTotalMemory": "1176M/1997M",
"internalAvailableSize": 4306395136,
"internalTotalSize": 4877451264,
"ipAddress": "192.168.5.129",
"macAddress": config['macAddress'],
"networkType": "NETWORK_ETHERNET",
"processCpuUsage": "0.82%",
"resolution": "1280x800",
"romName": "rockchip",
"rooted": True,
"sdkVersionCode": 25,
"sdkVersionName": "7.1.2",
"sysDate": "Tue Oct 22 18:24:52 GMT+08:00 2024",
"sysDateStr": current_time,
"sysElapsedRealtime": "342:26:11",
"sysLanguage": "zh",
"sysSupportedSensors": ["Accelerometer sensor", "Gyroscope sensor",
"Game Rotation Vector Sensor",
"Gravity Sensor"],
"authCode": config['authCode'],
"conferenceName": "测试会议室"
}
})
}
#安卓信息心跳上报
elif topic == "/uams/android/broadcast":
return json.dumps({
"type":"heartbeat",
"clientId" : config['clientId'],
"appId":"com.ubains.uniplatform",
"deviceId": config['deviceId']
})
#毫米波雷达数据上报
elif "/properties/upload" in topic or "/event/upload" in topic:
return json.dumps({
"properties":{
"client_id" : config['client_id'],
"presence_state" : config['presence_state'],
"kaiguan" : config['kaiguan'],
"julishezhi" : config['julishezhi'],
"lingmindushezhi" : config['lingmindushezhi'],
"led":1,
"wifi_mac" : config['wifi_mac'],
"ble_mac" : config['ble_mac'],
"last_connection_time": current_time,
"current_time":current_time,
"device_model" : "c1_100_wifi_u",
"fw_version":"0.0.6",
"sn" : config['sn'],
"ip" : config['ip']
}
})
# 北京富创项目的消息体与主题
elif topic == "/meeting/message/sync":
return json.dumps({
"action": config['action'],
"thirdMessageDTO": [{
"thirdPartyMeetingId": config['thirdPartyMeetingId'],
"messageCompere": "张三",
"thirdPartyUserId": "jiaojiao",
"conferenceName": config['conferenceName'],
"thirdPartyRoomId": config['thirdPartyRoomId'],
"messageName": config['messageName'],
"startTime": config['startTime'],
"endTime": config['endTime'],
"companyNumber": config['companyNumber'],
"participantList": ["JiaoJiao", "JiaYu", "DuiFangZhengZaiZhangTouFa", "DuoTangMaLaBan"]
}]
})
def send_and_receive_messages(self, topic: str, message: str, num_times: int = 1, timeout: int = 5,
interval: float = 0.2):
"""
发送并接收消息
:param topic: 主题名称
:param message: 消息内容
:param num_times: 发送次数,默认为1
:param timeout: 超时时间(秒),默认为5秒
:param interval: 每次发送之间的间隔时间(秒),默认为0.2秒
"""
if not isinstance(topic, str) or not isinstance(message, str):
raise ValueError("主题和消息必须是字符串类型")
for i in range(num_times):
try:
self.publish(topic, message)
received_message = self.wait_for_message(topic, timeout=timeout)
if received_message:
logging.info("消息接收成功!")
else:
logging.warning("超时时间内未接收到消息。")
sleep(interval)
except (ConnectionError, TimeoutError) as e:
logging.error(f"网络连接或超时错误: {e}")
except ValueError as e:
logging.error(f"值错误: {e}")
except Exception as e:
logging.error(f"未知错误: {e}")