import json import logging import csv import os import re import threading import time from time import sleep from datetime import datetime import paho.mqtt.client as mqtt # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') class Mqtt: def __init__(self, broker_address, port): """ 初始化 MQTT 客户端 :param broker_address: MQTT 代理地址 :param port: MQTT 代理端口 """ self.lock = None self.broker_address = broker_address self.port = port self.client = None self._received_message_lock = threading.Lock() self.received_message = None self.message_type = None def connect(self): """ 连接到 MQTT 服务器 """ try: self.client = mqtt.Client() self.client.on_connect = self.on_connect # 设置连接回调 self.client.on_message = self.on_message # 设置消息回调 self.client.connect(self.broker_address, self.port) # 连接到代理 self.client.loop_start() # 启动网络循环 except Exception as e: logging.error(f"连接到MQTT服务器时发生错误: {e}") raise def disconnect(self): """ 断开与 MQTT 服务器的连接 """ if self.client: try: self.client.loop_stop() # 停止网络循环 self.client.disconnect() # 断开连接 logging.info("已断开与MQTT 服务器的连接") except Exception as e: logging.error(f"断开与MQTT 服务器的连接时发生错误: {e}") finally: self.client = None # 确保资源被完全释放 else: logging.warning("尝试断开连接时,客户端已不存在") def on_connect(self, client, userdata, flags, rc): """ 连接成功或失败的回调函数 :param client: 客户端实例 :param userdata: 用户数据 :param flags: 连接标志 :param rc: 返回码 """ try: if rc == 0: logging.info("连接成功") else: logging.error(f"连接失败,返回码: {rc}") # 根据不同的返回码采取不同的措施 if rc == 1: logging.error("错误:错误的协议版本") elif rc == 2: logging.error("错误:无效的客户端标识符") elif rc == 3: logging.error("错误:服务器不可用") elif rc == 4: logging.error("错误:错误的用户名或密码") elif rc == 5: logging.error("错误:未授权") else: logging.error("未知错误") except Exception as e: logging.exception(f"在处理连接结果时发生异常: {e}") def on_message(self, client, userdata, msg): """ 接收到消息的回调函数 :param client: 客户端实例 :param userdata: 用户数据 :param msg: 消息对象 """ try: payload = msg.payload.decode('utf-8', errors='replace') # 处理解码错误 logging.info(f"收到消息: {msg.topic} {payload[:50]}...") # 脱敏日志记录 except UnicodeDecodeError as e: logging.error(f"解码错误: {e}") payload = "无法解码的消息" with self._received_message_lock: self.received_message = payload # 线程安全地存储接收到的消息 def subscribe(self, topic): """ 订阅指定的主题 :param topic: 主题名称 """ if self.client is None: logging.error("客户端未初始化,无法订阅主题") raise ValueError("客户端未初始化,无法订阅主题") # 输入验证 if not isinstance(topic, str) or not topic.strip(): logging.error("无效的主题名称") raise ValueError("无效的主题名称") try: self.client.subscribe(topic) logging.info(f"已订阅主题: {topic}") except ConnectionError as ce: logging.error(f"连接错误,无法订阅主题: {topic}, 错误信息: {str(ce)}") raise except TimeoutError as te: logging.error(f"超时错误,无法订阅主题: {topic}, 错误信息: {str(te)}") raise except ValueError as ve: logging.error(f"值错误,无法订阅主题: {topic}, 错误信息: {str(ve)}") raise except Exception as e: logging.error(f"未知错误,无法订阅主题: {topic}, 错误信息: {str(e)}") raise def set_message_type(self, message_type): """ 设置消息类型 此方法用于设置或更改消息类型属性,允许对象根据需要处理不同类型的消息 参数: message_type: 要设置的消息类型,可以是任何数据类型,但通常应该是字符串、整数或枚举类型 返回: 无 """ self.message_type = message_type def publish(self, topic, message): """ 发布消息到指定的主题 :param topic: 主题名称 :param message: 消息内容 """ if self.client: try: # 将消息转换为JSON字符串 if self.message_type == dict and isinstance(message, dict): message = json.dumps(message) elif message is None: message = "" else: message = str(message) except (TypeError, ValueError) as e: logging.error(f"{datetime.now()} - 消息转换失败: {e} - 调用者: {self.__class__.__name__}.publish") raise try: self.client.publish(topic, message) logging.info(f"{datetime.now()} - 已发布消息到主题: {topic} - 调用者: {self.__class__.__name__}.publish") except Exception as e: logging.error(f"{datetime.now()} - 消息发布失败: {e} - 失败的主题: {topic}, 消息: {message} - 调用者: {self.__class__.__name__}.publish") raise def wait_for_message(self, topic, timeout=5): """ 等待指定主题的消息 :param topic: 主题名称 :param timeout: 超时时间(秒) :return: 接收到的消息或 None """ if self.client is None: logging.warning("Client is not initialized") return None if timeout < 0: logging.warning("Timeout cannot be negative") return None start_time = time.monotonic() while (time.monotonic() - start_time) < timeout: try: with self.lock: if self.received_message is not None: return self.received_message except Exception as e: logging.error(f"Error accessing received_message: {e}") try: time.sleep(0.1) except Exception as e: logging.error(f"Error in sleep: {e}") return None @staticmethod def read_config_from_csv(file_path): """ 从 CSV 文件读取配置 :param file_path: CSV 文件路径 :param allowed_directory: 允许访问的目录 :return: 配置列表 """ try: # 验证文件路径 if not os.path.isfile(file_path): raise FileNotFoundError(f"文件 {file_path} 不存在") with open(file_path, mode='r', encoding='utf-8') as file: reader = csv.DictReader(file) return [row for row in reader] except FileNotFoundError as e: print(f"错误: {e}") return [] except PermissionError as e: print(f"错误: {e}") return [] except Exception as e: print(f"未知错误: {e}") return [] @staticmethod def wait_for_message(self, topic, timeout=5): """ 等待指定主题的消息 :param topic: 主题名称 :param timeout: 超时时间(秒) :return: 接收到的消息或 None """ if not isinstance(topic, str) or not re.match(r'^[a-zA-Z0-9_\-]+$', topic): raise ValueError("Invalid topic format") if timeout < 0: return None try: if self.client: start_time = time.time() while (time.time() - start_time) < timeout: if self.has_received_message(): return self.received_message sleep(0.1) return None except AttributeError: return None return None def has_received_message(self): return hasattr(self, 'received_message') @staticmethod def read_config_from_csv(file_path): """ 从 CSV 文件读取配置 :param file_path: CSV 文件路径 :return: 配置列表 """ if not os.path.exists(file_path): raise FileNotFoundError(f"文件 {file_path} 不存在") try: with open(file_path, mode='r', encoding='utf-8') as file: reader = csv.DictReader(file) config_list = [row for row in reader] if not config_list: raise ValueError("CSV 文件内容为空或格式不正确") logging.info(f"成功读取文件 {file_path}") return config_list except Exception as e: logging.error(f"读取文件 {file_path} 时发生错误: {e}") return [] @staticmethod def build_message(config, current_time,topic): """ 构建消息内容 :param config: 配置字典 :param current_time: 当前时间 :return: 消息字典 """ #安卓信息设备上报 if topic == "rebootResponseTopic": return { "method": "/system/readSystemInfo", "clientId": config['clientId'], "result": json.dumps({ "result": { "buildInfo": { "appToken": config['appToken'], "companyNumber": config['companyNumber'], "cnum": config['cnum'], "conferenceName": "测试会议室", "conferenceId": int(config['conferenceId']), "defaultQrCodeUrl": "http://192.168.5.218:8888/group1/M00/00/21/wKgFyGNBWZmADtnNAAAwrxR0X8s016.png", "aliasName": "zt", "serverBaseUrl": "http://192.168.5.218:8996/", "localBindTime": current_time, "generalField": "{\"conferencePhone\":\"\",\"chooseTimeType\":1,\"meetingTopicSwitch\":\"1\",\"meetingContentSwitch\":\"1\",\"meetingReverseTypeSwitch\":\"1\",\"seatArrangeSwitch\":\"1\",\"meetingVoteSwitch\":\"1\",\"floorPlanPath\":\"\",\"jumpToPaperless\":2,\"approvalList\":[],\"isLeaderJoin\":false,\"meetingPublishScreenSwitch\":\"1\"}" }, "faceVersion": "4.2.12021020201.1", "wgtVersion": "0.0.81", "deviceModel": "yx_rk3288", "abiList": ["armeabi-v7a", "armeabi"], "androidId": "48134e6047a19aaf", "appName": "UBAINS", "appPackageName": "com.ubains.local.gviewer", "appVersion": 78, "appVersionName": "1.1.78", "battery": 0, "bluetoothEnabled": False, "camerasCount": 1, "charging": True, "density": 1, "freeAndTotalMemory": "1176M/1997M", "internalAvailableSize": 4306395136, "internalTotalSize": 4877451264, "ipAddress": "192.168.5.129", "macAddress": config['macAddress'], "networkType": "NETWORK_ETHERNET", "processCpuUsage": "0.82%", "resolution": "1280x800", "romName": "rockchip", "rooted": True, "sdkVersionCode": 25, "sdkVersionName": "7.1.2", "sysDate": "Tue Oct 22 18:24:52 GMT+08:00 2024", "sysDateStr": current_time, "sysElapsedRealtime": "342:26:11", "sysLanguage": "zh", "sysSupportedSensors": ["Accelerometer sensor", "Gyroscope sensor", "Game Rotation Vector Sensor", "Gravity Sensor"], "authCode": config['authCode'], "conferenceName": "测试会议室" } }) } #安卓信息心跳上报 elif topic == "/uams/android/broadcast": return json.dumps({ "type":"heartbeat", "clientId" : config['clientId'], "appId":"com.ubains.uniplatform", "deviceId": config['deviceId'] }) #毫米波雷达数据上报 elif "/properties/upload" in topic or "/event/upload" in topic: return json.dumps({ "properties":{ "client_id" : config['client_id'], "presence_state" : config['presence_state'], "kaiguan" : config['kaiguan'], "julishezhi" : config['julishezhi'], "lingmindushezhi" : config['lingmindushezhi'], "led":1, "wifi_mac" : config['wifi_mac'], "ble_mac" : config['ble_mac'], "last_connection_time": current_time, "current_time":current_time, "device_model" : "c1_100_wifi_u", "fw_version":"0.0.6", "sn" : config['sn'], "ip" : config['ip'] } }) # 北京富创项目的消息体与主题 elif topic == "/meeting/message/sync": return json.dumps({ "action": config['action'], "thirdMessageDTO": [{ "thirdPartyMeetingId": config['thirdPartyMeetingId'], "messageCompere": "张三", "thirdPartyUserId": "jiaojiao", "conferenceName": config['conferenceName'], "thirdPartyRoomId": config['thirdPartyRoomId'], "messageName": config['messageName'], "startTime": config['startTime'], "endTime": config['endTime'], "companyNumber": config['companyNumber'], "participantList": ["JiaoJiao", "JiaYu", "DuiFangZhengZaiZhangTouFa", "DuoTangMaLaBan","czj","czj2"] }] }) def send_and_receive_messages(self, topic: str, message: str, num_times: int = 1, timeout: int = 5, interval: float = 0.2): """ 发送并接收消息 :param topic: 主题名称 :param message: 消息内容 :param num_times: 发送次数,默认为1 :param timeout: 超时时间(秒),默认为5秒 :param interval: 每次发送之间的间隔时间(秒),默认为0.2秒 """ if not isinstance(topic, str) or not isinstance(message, str): raise ValueError("主题和消息必须是字符串类型") for i in range(num_times): try: self.publish(topic, message) received_message = self.wait_for_message(topic, timeout=timeout) if received_message: logging.info("消息接收成功!") else: logging.warning("超时时间内未接收到消息。") sleep(interval) except (ConnectionError, TimeoutError) as e: logging.error(f"网络连接或超时错误: {e}") except ValueError as e: logging.error(f"值错误: {e}") except Exception as e: logging.error(f"未知错误: {e}")