mqtt.py 7.0 KB
Newer Older


# -*- coding: utf-8 -*-
# cython: language_level=3
import json
import time
import paho.mqtt.client as mqtt
import logging

# 自定义过滤器类
class CustomFilter(logging.Filter):
    """Logging filter that suppresses selected noisy DEBUG records.

    Two kinds of DEBUG records are dropped:

    * publish logs (messages containing both "发送->主题" and "发送->数据")
      whose JSON payload is the test message
      ``{"action": "test", "data": "Hello, MQTT!"}``;
    * records containing "Published message with message id".

    Every other record, and every record above or below DEBUG level, passes.
    """

    # Markers are runtime strings matched against the formatted log message.
    _TOPIC_MARKER = "发送->主题"
    _DATA_MARKER = "发送->数据"
    _DATA_PREFIX = "发送->数据: "
    _PUBLISH_ACK_MARKER = "Published message with message id"

    def filter(self, record):
        """Return False to drop *record*, True to let it through."""
        if record.levelno != logging.DEBUG:
            return True

        message = record.getMessage()
        if self._TOPIC_MARKER in message and self._DATA_MARKER in message:
            return not self._is_test_payload(message)
        return self._PUBLISH_ACK_MARKER not in message

    @classmethod
    def _is_test_payload(cls, message):
        """Tell whether a publish log line carries the built-in test message."""
        parts = message.split(cls._DATA_PREFIX)
        if len(parts) < 2:
            # The marker is present but not followed by ": "; there is no
            # payload to inspect, so the record must not be suppressed (and
            # must not raise IndexError out of the logging call).
            return False
        try:
            payload = json.loads(parts[1])
        except json.JSONDecodeError:
            return False
        # Valid JSON is not necessarily an object (could be a list/number).
        if not isinstance(payload, dict):
            return False
        return payload.get("action") == "test" and payload.get("data") == "Hello, MQTT!"

# Configure the root logger: DEBUG level, timestamped format, and the custom
# filter installed on the root logger itself.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)
logger = logging.getLogger()
logger.addFilter(CustomFilter())

class MQTTClient:
    """Thin wrapper around a paho-mqtt client configured from a JSON file.

    The configuration file must provide the keys ``client_id``, ``username``,
    ``password``, ``broker_address`` and ``port``.

    All log output goes through the module-level ``logging.*`` functions
    (i.e. the root logger).
    """

    # UDID used to build the subscription topic when none is supplied.
    DEFAULT_CLIENT_UDID = '3d53149e-408f-56c6-99ef-29fae67f8efc'
    # Seconds to wait before a manual reconnection attempt.
    RECONNECT_DELAY_SECONDS = 5

    # Class-level fallback so on_connect works even if __init__ was bypassed.
    client_udid = DEFAULT_CLIENT_UDID

    def __init__(self, config_file, client_udid=None):
        """Load the configuration and create the underlying paho client.

        Args:
            config_file: Path to the JSON configuration file.
            client_udid: Optional UDID used in the subscription topic;
                defaults to ``DEFAULT_CLIENT_UDID``.
        """
        self.config = self.load_config(config_file)
        if client_udid is not None:
            self.client_udid = client_udid

        client_kwargs = {
            'client_id': self.config['client_id'],
            'protocol': mqtt.MQTTv311,
        }
        # paho-mqtt 2.x takes a callback_api_version argument; the callbacks
        # of this class use the 1.x signatures, so request VERSION1 whenever
        # the installed paho exposes the enum (1.x does not).
        callback_api = getattr(mqtt, 'CallbackAPIVersion', None)
        if callback_api is not None:
            client_kwargs['callback_api_version'] = callback_api.VERSION1
        self.client = mqtt.Client(**client_kwargs)

        self.client.username_pw_set(self.config['username'], self.config['password'])
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_message = self.on_message
        self.client.on_publish = self.on_publish
        self.client.on_subscribe = self.on_subscribe

    def load_config(self, file_path):
        """Load and return the JSON configuration stored at *file_path*.

        The file is read as UTF-8 so the result does not depend on the
        platform's default encoding.
        """
        with open(file_path, 'r', encoding='utf-8') as file:
            return json.load(file)

    def on_connect(self, client, userdata, flags, rc):
        """Connection callback: subscribe on success, retry on failure."""
        if rc != 0:
            logging.error(f"连接MQTT失败,返回代码为: {rc}.")
            self.reconnect()
            return

        logging.info("连接MQTT成功.")
        topic_subscribe = f"/maintain/room/master/{self.client_udid}/"
        self.client.subscribe(topic_subscribe, qos=1)
        logging.info(f"已订阅主题: {topic_subscribe}")

    def on_disconnect(self, client, userdata, rc):
        """Disconnect callback: a non-zero *rc* triggers a reconnection attempt."""
        if rc != 0:
            logging.error(f"异常断开,返回代码为: {rc}.")
            self.reconnect()

    def on_message(self, client, userdata, msg):
        """Message callback: log the topic and the decoded payload."""
        logging.debug(f"接收<-主题:'{msg.topic}'")
        try:
            payload = msg.payload.decode()
        except UnicodeDecodeError:
            logging.error("无法解码消息内容,可能是编码问题")
        else:
            logging.debug(f"接收<-数据:{payload}")

    def on_publish(self, client, userdata, mid):
        """Publish callback: log the message id."""
        logging.debug(f"Published message with message id {mid}")

    def on_subscribe(self, client, userdata, mid, granted_qos):
        """Subscribe callback: log the grant and run the follow-up tasks."""
        logging.info(f"订阅成功,mid: {mid}, granted_qos: {granted_qos}")
        self.perform_post_subscription_tasks(client)

    def perform_post_subscription_tasks(self, client):
        """Run the tasks that follow a successful subscription (currently only logs)."""
        logging.info("执行订阅成功后的任务...")
        # The test publish below is intentionally disabled so that no test
        # message is sent after subscribing.
        # topic_publish = "/maintain/room/master/client/"
        # message = {"action": "test", "data": "Hello, MQTT!"}
        # self.publish_message(client, topic_publish, message)

    def publish_message(self, client, topic, message, qos=0):
        """Publish *message* (JSON-serialised) to *topic*.

        Args:
            client: Client object whose ``publish`` method is used.
            topic: Destination topic.
            message: JSON-serialisable payload.
            qos: Quality-of-service level passed to ``publish``.

        Returns:
            True when the first element of the publish result is 0.
        """
        # Serialise once and reuse the string for both the log and the send.
        payload = json.dumps(message)
        logging.debug(f"发送->主题:'{topic}',\n发送->数据: {payload}")
        result = client.publish(topic, payload, qos)
        return result[0] == 0

    @staticmethod
    def _build_power_message(client_udid, power, run_state):
        """Build the ``_updatemaster`` payload shared by the open/close commands."""
        return {
            "action": "_updatemaster",
            "client_udid": client_udid,
            "data": [
                {
                    "item": "environmental",
                    "pm25": 20,
                    "co2": 2,
                    "temp": 20,
                    "tvoc": 20,
                    "humi": 20,
                    "hcho": 20,
                },
                {
                    "item": "conference",
                    "power": power,
                    "exist": 1,
                    "run": run_state,
                },
            ],
        }

    def open_conference_room(self, topic, client_udid):
        """Publish the power-on update for the conference room; return success."""
        open_message = self._build_power_message(client_udid, 1, "在线")
        return self.publish_message(self.client, topic, open_message, qos=0)

    def close_conference_room(self, topic, client_udid):
        """Publish the power-off update for the conference room; return success."""
        close_message = self._build_power_message(client_udid, 0, "离线")
        return self.publish_message(self.client, topic, close_message, qos=0)

    def conference_room_online(self, topic, client_udid):
        """Publish the "online" notification for the conference room; return success."""
        online_message = {
            "udid": client_udid,
            "action": "online",
            "value": 1,
        }
        return self.publish_message(self.client, topic, online_message, qos=0)

    def conference_room_offline(self, topic, client_udid):
        """Publish the "offline" notification for the conference room; return success."""
        offline_message = {
            "udid": client_udid,
            "action": "offline",
            "value": 0,
        }
        return self.publish_message(self.client, topic, offline_message, qos=0)

    def start(self):
        """Connect to the broker and, on success, start the network loop.

        A connection error is logged and swallowed; the loop is not started
        in that case.
        """
        try:
            self.client.connect(self.config['broker_address'], self.config['port'])
        except Exception as e:
            logging.error(f"连接MQTT失败,错误为: {e}")
        else:
            self.client.loop_start()

    def stop(self):
        """Stop the network loop, then disconnect from the broker."""
        self.client.loop_stop()
        self.client.disconnect()

    def reconnect(self):
        """Wait ``RECONNECT_DELAY_SECONDS`` and then call ``start()`` again.

        NOTE(review): this is called from the on_connect/on_disconnect
        callbacks, which presumably run on paho's network-loop thread, so the
        sleep would block that thread. paho's threaded loop is also documented
        to reconnect on its own — confirm whether this manual retry is still
        needed.
        """
        logging.info("尝试重新连接MQTT服务器...")
        time.sleep(self.RECONNECT_DELAY_SECONDS)
        self.start()

# # 示例使用
# if __name__ == '__main__':
#     mqtt_client = MQTTClient('config.json')
#     mqtt_client.start()
#
#     try:
#         while True:
#             logging.debug("正在运行...")
#             time.sleep(600)  # 保持脚本运行
#     except KeyboardInterrupt:
#         logging.info("已手动中断MQTT连接进程!")
#     finally:
#         mqtt_client.stop()