mqtt.py 7.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199

# -*- coding: utf-8 -*-
# cython: language_level=3
import json
import time
import paho.mqtt.client as mqtt
import logging

# 自定义过滤器类
class CustomFilter(logging.Filter):
    def filter(self, record):
        # 屏蔽特定的调试日志内容
        if record.levelno == logging.DEBUG:
            message = record.getMessage()
            if "发送->主题" in message and "发送->数据" in message:
                data = message.split("发送->数据: ")[1]
                try:
                    data_dict = json.loads(data)
                    if data_dict.get("action") == "test" and data_dict.get("data") == "Hello, MQTT!":
                        return False
                except json.JSONDecodeError:
                    pass
            elif "Published message with message id" in message:
                return False
        return True

# 配置日志记录器
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()
logger.addFilter(CustomFilter())

class MQTTClient:
    def __init__(self, config_file):
        self.config = self.load_config(config_file)
        self.client = mqtt.Client(client_id=self.config['client_id'], protocol=mqtt.MQTTv311)
        self.client.username_pw_set(self.config['username'], self.config['password'])
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_message = self.on_message
        self.client.on_publish = self.on_publish
        self.client.on_subscribe = self.on_subscribe

    def load_config(self, file_path):
        """从指定路径加载配置文件"""
        with open(file_path, 'r') as file:
            return json.load(file)

    def on_connect(self, client, userdata, flags, rc):
        """连接回调函数"""
        if rc == 0:
            logging.info("连接MQTT成功.")
            # 订阅某个主题(可选)
            client_udid = '3d53149e-408f-56c6-99ef-29fae67f8efc'
            topic_subscribe = f"/maintain/room/master/{client_udid}/"
            self.client.subscribe(topic_subscribe, qos=1)  # 尝试使用 QoS 1
            logging.info(f"已订阅主题: {topic_subscribe}")
        else:
            logging.error(f"连接MQTT失败,返回代码为: {rc}.")
            self.reconnect()

    def on_disconnect(self, client, userdata, rc):
        """断开连接回调函数"""
        if rc != 0:
            logging.error(f"异常断开,返回代码为: {rc}.")
            self.reconnect()

    def on_message(self, client, userdata, msg):
        """接收消息回调函数"""
        logging.debug(f"接收<-主题:'{msg.topic}'")
        try:
            payload = msg.payload.decode()
            logging.debug(f"接收<-数据:{payload}")
        except UnicodeDecodeError:
            logging.error("无法解码消息内容,可能是编码问题")

    def on_publish(self, client, userdata, mid):
        """发送消息回调函数"""
        logging.debug(f"Published message with message id {mid}")

    def on_subscribe(self, client, userdata, mid, granted_qos):
        """订阅成功回调函数"""
        logging.info(f"订阅成功,mid: {mid}, granted_qos: {granted_qos}")
        self.perform_post_subscription_tasks(client)

    def perform_post_subscription_tasks(self, client):
        """订阅成功后执行的任务"""
        logging.info("执行订阅成功后的任务...")
        # 注释掉或删除以下行以防止发送特定消息
        # topic_publish = "/maintain/room/master/client/"
        # message = {"action": "test", "data": "Hello, MQTT!"}
        # self.publish_message(client, topic_publish, message)

    def publish_message(self, client, topic, message, qos=0):
        """发布消息到指定主题"""
        logging.debug(f"发送->主题:'{topic}',\n发送->数据: {json.dumps(message)}")
        result = client.publish(topic, json.dumps(message), qos)
        return result[0] == 0  # 返回是否成功

    def open_conference_room(self, topic, client_udid):
        """开启会议室电源"""
        open_message = {
            "action": "_updatemaster",
            "client_udid": client_udid,
            "data": [
                {
                    "item": "environmental",
                    "pm25": 20,
                    "co2": 2,
                    "temp": 20,
                    "tvoc": 20,
                    "humi": 20,
                    "hcho": 20
                },
                {
                    "item": "conference",
                    "power": 1,
                    "exist": 1,
                    "run": "在线"
                }
            ]
        }
        return self.publish_message(self.client, topic, open_message, qos=0)

    def close_conference_room(self, topic, client_udid):
        """关闭会议室电源"""
        close_message = {
            "action": "_updatemaster",
            "client_udid": client_udid,
            "data": [
                {
                    "item": "environmental",
                    "pm25": 20,
                    "co2": 2,
                    "temp": 20,
                    "tvoc": 20,
                    "humi": 20,
                    "hcho": 20
                },
                {
                    "item": "conference",
                    "power": 0,
                    "exist": 1,
                    "run": "离线"
                }
            ]
        }
        return self.publish_message(self.client, topic, close_message, qos=0)

    def conference_room_online(self, topic, client_udid):
        """会议室在线"""
        online_message = {
            "udid": client_udid,
            "action": "online",
            "value": 1
        }
        return self.publish_message(self.client, topic, online_message, qos=0)

    def conference_room_offline(self, topic, client_udid):
        """会议室离线"""
        offline_message = {
            "udid": client_udid,
            "action": "offline",
            "value": 0
        }
        return self.publish_message(self.client, topic, offline_message, qos=0)

    def start(self):
        """连接服务器并启动网络循环"""
        try:
            self.client.connect(self.config['broker_address'], self.config['port'])
        except Exception as e:
            logging.error(f"连接MQTT失败,错误为: {e}")
        else:
            self.client.loop_start()

    def stop(self):
        """停止网络循环并断开连接"""
        self.client.loop_stop()
        self.client.disconnect()

    def reconnect(self):
        """重连机制"""
        logging.info("尝试重新连接MQTT服务器...")
        time.sleep(5)  # 等待5秒后重试
        self.start()

# # 示例使用
# if __name__ == '__main__':
#     mqtt_client = MQTTClient('config.json')
#     mqtt_client.start()
#
#     try:
#         while True:
#             logging.debug("正在运行...")
#             time.sleep(600)  # 保持脚本运行
#     except KeyboardInterrupt:
#         logging.info("已手动中断MQTT连接进程!")
#     finally:
#         mqtt_client.stop()