Mqtt_Send.py 16.7 KB

import json
import logging
import csv
import os
import re
import threading
import time
from time import sleep
from datetime import datetime
import paho.mqtt.client as mqtt

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class Mqtt:
    def __init__(self, broker_address, port):
        """
        初始化 MQTT 客户端

        :param broker_address: MQTT 代理地址
        :param port: MQTT 代理端口
        """
        self.lock = None
        self.broker_address = broker_address
        self.port = port
        self.client = None
        self._received_message_lock = threading.Lock()
        self.received_message = None
        self.message_type = None

    def connect(self):
        """
        连接到 MQTT 服务器
        """
        try:
            self.client = mqtt.Client()
            self.client.on_connect = self.on_connect  # 设置连接回调
            self.client.on_message = self.on_message  # 设置消息回调
            self.client.connect(self.broker_address, self.port)  # 连接到代理
            self.client.loop_start()  # 启动网络循环
        except Exception as e:
            logging.error(f"连接到MQTT服务器时发生错误: {e}")
            raise

    def disconnect(self):
        """
        断开与 MQTT 服务器的连接
        """
        if self.client:
            try:
                self.client.loop_stop()  # 停止网络循环
                self.client.disconnect()  # 断开连接
                logging.info("已断开与MQTT 服务器的连接")
            except Exception as e:
                logging.error(f"断开与MQTT 服务器的连接时发生错误: {e}")
            finally:
                self.client = None  # 确保资源被完全释放
        else:
            logging.warning("尝试断开连接时,客户端已不存在")

    def on_connect(self, client, userdata, flags, rc):
        """
        连接成功或失败的回调函数

        :param client: 客户端实例
        :param userdata: 用户数据
        :param flags: 连接标志
        :param rc: 返回码
        """
        try:
            if rc == 0:
                logging.info("连接成功")
            else:
                logging.error(f"连接失败,返回码: {rc}")
                # 根据不同的返回码采取不同的措施
                if rc == 1:
                    logging.error("错误:错误的协议版本")
                elif rc == 2:
                    logging.error("错误:无效的客户端标识符")
                elif rc == 3:
                    logging.error("错误:服务器不可用")
                elif rc == 4:
                    logging.error("错误:错误的用户名或密码")
                elif rc == 5:
                    logging.error("错误:未授权")
                else:
                    logging.error("未知错误")
        except Exception as e:
            logging.exception(f"在处理连接结果时发生异常: {e}")

    def on_message(self, client, userdata, msg):
        """
        接收到消息的回调函数

        :param client: 客户端实例
        :param userdata: 用户数据
        :param msg: 消息对象
        """
        try:
            payload = msg.payload.decode('utf-8', errors='replace')  # 处理解码错误
            logging.info(f"收到消息: {msg.topic} {payload[:50]}...")  # 脱敏日志记录
        except UnicodeDecodeError as e:
            logging.error(f"解码错误: {e}")
            payload = "无法解码的消息"

        with self._received_message_lock:
            self.received_message = payload  # 线程安全地存储接收到的消息

    def subscribe(self, topic):
        """
        订阅指定的主题

        :param topic: 主题名称
        """
        if self.client is None:
            logging.error("客户端未初始化,无法订阅主题")
            raise ValueError("客户端未初始化,无法订阅主题")

        # 输入验证
        if not isinstance(topic, str) or not topic.strip():
            logging.error("无效的主题名称")
            raise ValueError("无效的主题名称")

        try:
            self.client.subscribe(topic)
            logging.info(f"已订阅主题: {topic}")
        except ConnectionError as ce:
            logging.error(f"连接错误,无法订阅主题: {topic}, 错误信息: {str(ce)}")
            raise
        except TimeoutError as te:
            logging.error(f"超时错误,无法订阅主题: {topic}, 错误信息: {str(te)}")
            raise
        except ValueError as ve:
            logging.error(f"值错误,无法订阅主题: {topic}, 错误信息: {str(ve)}")
            raise
        except Exception as e:
            logging.error(f"未知错误,无法订阅主题: {topic}, 错误信息: {str(e)}")
            raise

    def set_message_type(self, message_type):
        """
        设置消息类型
        此方法用于设置或更改消息类型属性,允许对象根据需要处理不同类型的消息
        参数:
        message_type: 要设置的消息类型,可以是任何数据类型,但通常应该是字符串、整数或枚举类型
        返回:

        """
        self.message_type = message_type

    def publish(self, topic, message):
        """
        发布消息到指定的主题

        :param topic: 主题名称
        :param message: 消息内容
        """
        if self.client:
            try:
                # 将消息转换为JSON字符串
                if self.message_type == dict and isinstance(message, dict):
                    message = json.dumps(message)
                elif message is None:
                    message = ""
                else:
                    message = str(message)
            except (TypeError, ValueError) as e:
                logging.error(f"{datetime.now()} - 消息转换失败: {e} - 调用者: {self.__class__.__name__}.publish")
                raise

            try:
                self.client.publish(topic, message)
                logging.info(f"{datetime.now()} - 已发布消息到主题: {topic} - 调用者: {self.__class__.__name__}.publish")
            except Exception as e:
                logging.error(f"{datetime.now()} - 消息发布失败: {e} - 失败的主题: {topic}, 消息: {message} - 调用者: {self.__class__.__name__}.publish")
                raise

    def wait_for_message(self, topic, timeout=5):
        """
        等待指定主题的消息

        :param topic: 主题名称
        :param timeout: 超时时间(秒)
        :return: 接收到的消息或 None
        """
        if self.client is None:
            logging.warning("Client is not initialized")
            return None

        if timeout < 0:
            logging.warning("Timeout cannot be negative")
            return None

        start_time = time.monotonic()
        while (time.monotonic() - start_time) < timeout:
            try:
                with self.lock:
                    if self.received_message is not None:
                        return self.received_message
            except Exception as e:
                logging.error(f"Error accessing received_message: {e}")
            try:
                time.sleep(0.1)
            except Exception as e:
                logging.error(f"Error in sleep: {e}")
        return None

    @staticmethod
    def read_config_from_csv(file_path):
        """
        从 CSV 文件读取配置

        :param file_path: CSV 文件路径
        :param allowed_directory: 允许访问的目录
        :return: 配置列表
        """
        try:
            # 验证文件路径
            if not os.path.isfile(file_path):
                raise FileNotFoundError(f"文件 {file_path} 不存在")

            with open(file_path, mode='r', encoding='utf-8') as file:
                reader = csv.DictReader(file)
                return [row for row in reader]
        except FileNotFoundError as e:
            print(f"错误: {e}")
            return []
        except PermissionError as e:
            print(f"错误: {e}")
            return []
        except Exception as e:
            print(f"未知错误: {e}")
            return []

    @staticmethod
    def wait_for_message(self, topic, timeout=5):
        """
        等待指定主题的消息

        :param topic: 主题名称
        :param timeout: 超时时间(秒)
        :return: 接收到的消息或 None
        """
        if not isinstance(topic, str) or not re.match(r'^[a-zA-Z0-9_\-]+$', topic):
            raise ValueError("Invalid topic format")

        if timeout < 0:
            return None

        try:
            if self.client:
                start_time = time.time()
                while (time.time() - start_time) < timeout:
                    if self.has_received_message():
                        return self.received_message
                    sleep(0.1)
                return None
        except AttributeError:
            return None
        return None

    def has_received_message(self):
        return hasattr(self, 'received_message')

    @staticmethod
    def read_config_from_csv(file_path):
        """
        从 CSV 文件读取配置

        :param file_path: CSV 文件路径
        :return: 配置列表
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"文件 {file_path} 不存在")

        try:
            with open(file_path, mode='r', encoding='utf-8') as file:
                reader = csv.DictReader(file)
                config_list = [row for row in reader]
                if not config_list:
                    raise ValueError("CSV 文件内容为空或格式不正确")
                logging.info(f"成功读取文件 {file_path}")
                return config_list
        except Exception as e:
            logging.error(f"读取文件 {file_path} 时发生错误: {e}")
            return []

    @staticmethod
    def build_message(config, current_time,topic):
        """
        构建消息内容

        :param config: 配置字典
        :param current_time: 当前时间
        :return: 消息字典
        """
        #安卓信息设备上报
        if topic == "rebootResponseTopic":
            return {
                "method": "/system/readSystemInfo",
                "clientId": config['clientId'],
                "result": json.dumps({
                    "result": {
                        "buildInfo": {
                            "appToken": config['appToken'],
                            "companyNumber": config['companyNumber'],
                            "cnum": config['cnum'],
                            "conferenceName": "测试会议室",
                            "conferenceId": int(config['conferenceId']),
                            "defaultQrCodeUrl": "http://192.168.5.218:8888/group1/M00/00/21/wKgFyGNBWZmADtnNAAAwrxR0X8s016.png",
                            "aliasName": "zt",
                            "serverBaseUrl": "http://192.168.5.218:8996/",
                            "localBindTime": current_time,
                            "generalField": "{\"conferencePhone\":\"\",\"chooseTimeType\":1,\"meetingTopicSwitch\":\"1\",\"meetingContentSwitch\":\"1\",\"meetingReverseTypeSwitch\":\"1\",\"seatArrangeSwitch\":\"1\",\"meetingVoteSwitch\":\"1\",\"floorPlanPath\":\"\",\"jumpToPaperless\":2,\"approvalList\":[],\"isLeaderJoin\":false,\"meetingPublishScreenSwitch\":\"1\"}"
                        },
                        "faceVersion": "4.2.12021020201.1",
                        "wgtVersion": "0.0.81",
                        "deviceModel": "yx_rk3288",
                        "abiList": ["armeabi-v7a", "armeabi"],
                        "androidId": "48134e6047a19aaf",
                        "appName": "UBAINS",
                        "appPackageName": "com.ubains.local.gviewer",
                        "appVersion": 78,
                        "appVersionName": "1.1.78",
                        "battery": 0,
                        "bluetoothEnabled": False,
                        "camerasCount": 1,
                        "charging": True,
                        "density": 1,
                        "freeAndTotalMemory": "1176M/1997M",
                        "internalAvailableSize": 4306395136,
                        "internalTotalSize": 4877451264,
                        "ipAddress": "192.168.5.129",
                        "macAddress": config['macAddress'],
                        "networkType": "NETWORK_ETHERNET",
                        "processCpuUsage": "0.82%",
                        "resolution": "1280x800",
                        "romName": "rockchip",
                        "rooted": True,
                        "sdkVersionCode": 25,
                        "sdkVersionName": "7.1.2",
                        "sysDate": "Tue Oct 22 18:24:52 GMT+08:00 2024",
                        "sysDateStr": current_time,
                        "sysElapsedRealtime": "342:26:11",
                        "sysLanguage": "zh",
                        "sysSupportedSensors": ["Accelerometer sensor", "Gyroscope sensor",
                                                "Game Rotation Vector Sensor",
                                                "Gravity Sensor"],
                        "authCode": config['authCode'],
                        "conferenceName": "测试会议室"
                    }
                })
            }
        #安卓信息心跳上报
        elif topic == "/uams/android/broadcast":
            return json.dumps({
                "type":"heartbeat",
                "clientId" : config['clientId'],
                "appId":"com.ubains.uniplatform",
                "deviceId": config['deviceId']
            })

        #毫米波雷达数据上报
        elif "/properties/upload" in topic or "/event/upload" in topic:
            return json.dumps({
                "properties":{
                    "client_id" : config['client_id'],
                    "presence_state" : config['presence_state'],
                    "kaiguan" : config['kaiguan'],
                    "julishezhi" : config['julishezhi'],
                    "lingmindushezhi" : config['lingmindushezhi'],
                    "led":1,
                    "wifi_mac" : config['wifi_mac'],
                    "ble_mac" : config['ble_mac'],
                    "last_connection_time": current_time,
                    "current_time":current_time,
                    "device_model" : "c1_100_wifi_u",
                    "fw_version":"0.0.6",
                    "sn" : config['sn'],
                    "ip" : config['ip']
                }
            })

        # 北京富创项目的消息体与主题
        elif topic ==  "/meeting/message/sync":
            return json.dumps({
                    "action": config['action'],
                    "thirdMessageDTO": [{
                        "thirdPartyMeetingId": config['thirdPartyMeetingId'],
                        "messageCompere": "张三",
                        "thirdPartyUserId": "jiaojiao",
                        "conferenceName": config['conferenceName'],
                        "thirdPartyRoomId": config['thirdPartyRoomId'],
                        "messageName": config['messageName'],
                        "startTime": config['startTime'],
                        "endTime": config['endTime'],
                        "companyNumber": config['companyNumber'],
                        "participantList": ["JiaoJiao", "JiaYu", "DuiFangZhengZaiZhangTouFa", "DuoTangMaLaBan"]
            }]
    })


    def send_and_receive_messages(self, topic: str, message: str, num_times: int = 1, timeout: int = 5,
                                  interval: float = 0.2):
        """
        发送并接收消息

        :param topic: 主题名称
        :param message: 消息内容
        :param num_times: 发送次数,默认为1
        :param timeout: 超时时间(秒),默认为5秒
        :param interval: 每次发送之间的间隔时间(秒),默认为0.2秒
        """
        if not isinstance(topic, str) or not isinstance(message, str):
            raise ValueError("主题和消息必须是字符串类型")

        for i in range(num_times):
            try:
                self.publish(topic, message)
                received_message = self.wait_for_message(topic, timeout=timeout)

                if received_message:
                    logging.info("消息接收成功!")
                else:
                    logging.warning("超时时间内未接收到消息。")

                sleep(interval)
            except (ConnectionError, TimeoutError) as e:
                logging.error(f"网络连接或超时错误: {e}")
            except ValueError as e:
                logging.error(f"值错误: {e}")
            except Exception as e:
                logging.error(f"未知错误: {e}")