# -*- coding: utf-8 -*-
# cython: language_level=3
"""Thin wrapper around a paho-mqtt client for conference-room messages.

Importing this module configures the root logger (DEBUG level) and attaches
``CustomFilter`` to it, so the module-level ``logging.*`` calls used below
are the ones subject to that filter.
"""
import json
import logging
import time

import paho.mqtt.client as mqtt

# UDID used for the subscription topic when the config file does not provide
# a "client_udid" entry (this is the value that used to be hard-coded).
DEFAULT_CLIENT_UDID = '3d53149e-408f-56c6-99ef-29fae67f8efc'

# Bounds (in seconds) for paho's automatic reconnect back-off.
RECONNECT_MIN_DELAY = 5
RECONNECT_MAX_DELAY = 60


class CustomFilter(logging.Filter):
    """Suppress selected noisy DEBUG records.

    Dropped records are:
      * the outgoing-publish debug line whose JSON payload is the
        ``{"action": "test", "data": "Hello, MQTT!"}`` test message;
      * the "Published message with message id ..." acknowledgement line.

    Every other record passes through unchanged.
    """

    # Text that precedes the JSON payload in the publish debug line.
    _DATA_MARKER = "发送->数据: "

    def filter(self, record):
        """Return False to drop *record*, True to keep it."""
        if record.levelno != logging.DEBUG:
            return True

        message = record.getMessage()
        if "发送->主题" in message and "发送->数据" in message:
            # partition() never raises, unlike split(...)[1], when the marker
            # (including its trailing ": ") is not present verbatim.
            _, marker, data = message.partition(self._DATA_MARKER)
            if not marker:
                return True
            try:
                data_dict = json.loads(data)
            except json.JSONDecodeError:
                # Not JSON: nothing to match against, keep the record.
                return True
            # Valid JSON may still be a list/str/number, which has no .get().
            if not isinstance(data_dict, dict):
                return True
            if (data_dict.get("action") == "test"
                    and data_dict.get("data") == "Hello, MQTT!"):
                return False
        elif "Published message with message id" in message:
            return False
        return True


# Configure the root logger. The filter is attached to the root logger itself,
# so it only applies to records emitted directly on it (logging.debug(...),
# logging.info(...), ...), which is how this module logs.
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()
logger.addFilter(CustomFilter())


class MQTTClient:
    """MQTT client configured from a JSON file.

    The config file must provide ``client_id``, ``username``, ``password``,
    ``broker_address`` and ``port``. An optional ``client_udid`` entry
    selects the UDID used in the subscription topic.
    """

    def __init__(self, config_file):
        """Load *config_file* and create the underlying paho client.

        NOTE(review): the callback signatures below follow the paho-mqtt 1.x
        callback API; paho-mqtt 2.x may additionally require a
        ``callback_api_version`` argument here -- confirm the pinned version.
        """
        self.config = self.load_config(config_file)
        self.client = mqtt.Client(client_id=self.config['client_id'],
                                  protocol=mqtt.MQTTv311)
        self.client.username_pw_set(self.config['username'],
                                    self.config['password'])
        # The network loop started by loop_start() reconnects on its own;
        # this only tunes how long it waits between attempts.
        self.client.reconnect_delay_set(min_delay=RECONNECT_MIN_DELAY,
                                        max_delay=RECONNECT_MAX_DELAY)
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_message = self.on_message
        self.client.on_publish = self.on_publish
        self.client.on_subscribe = self.on_subscribe

    def load_config(self, file_path):
        """Load and return the JSON configuration stored at *file_path*.

        The file is read as UTF-8 so the result does not depend on the
        platform's default encoding.

        Raises:
            OSError: if the file cannot be opened.
            json.JSONDecodeError: if the file is not valid JSON.
        """
        with open(file_path, 'r', encoding='utf-8') as file:
            return json.load(file)

    def on_connect(self, client, userdata, flags, rc):
        """Connection callback: subscribe to the master topic on success."""
        if rc == 0:
            logging.info("连接MQTT成功.")
            client_udid = self.config.get('client_udid', DEFAULT_CLIENT_UDID)
            topic_subscribe = f"/maintain/room/master/{client_udid}/"
            self.client.subscribe(topic_subscribe, qos=1)
            logging.info(f"已订阅主题: {topic_subscribe}")
        else:
            # Do not sleep or reconnect here: this runs on paho's network
            # thread, and that thread already retries the connection.
            logging.error(f"连接MQTT失败,返回代码为: {rc}.")

    def on_disconnect(self, client, userdata, rc):
        """Disconnect callback: log unexpected disconnects (rc != 0)."""
        if rc != 0:
            # Reconnection is left to the network loop; blocking this
            # callback would stall the very thread that performs it.
            logging.error(f"异常断开,返回代码为: {rc}.")

    def on_message(self, client, userdata, msg):
        """Message callback: log the topic and the decoded payload."""
        logging.debug(f"接收<-主题:'{msg.topic}'")
        try:
            payload = msg.payload.decode()
        except UnicodeDecodeError:
            logging.error("无法解码消息内容,可能是编码问题")
        else:
            logging.debug(f"接收<-数据:{payload}")

    def on_publish(self, client, userdata, mid):
        """Publish callback: log the id of the published message."""
        logging.debug(f"Published message with message id {mid}")

    def on_subscribe(self, client, userdata, mid, granted_qos):
        """Subscribe callback: log the grant and run follow-up tasks."""
        logging.info(f"订阅成功,mid: {mid}, granted_qos: {granted_qos}")
        self.perform_post_subscription_tasks(client)

    def perform_post_subscription_tasks(self, client):
        """Hook executed after a subscription has been acknowledged."""
        logging.info("执行订阅成功后的任务...")
        # The test publish below is intentionally disabled so that the
        # "Hello, MQTT!" message is not sent.
        # topic_publish = "/maintain/room/master/client/"
        # message = {"action": "test", "data": "Hello, MQTT!"}
        # self.publish_message(client, topic_publish, message)

    def publish_message(self, client, topic, message, qos=0):
        """Publish *message* (JSON-serialised) to *topic* using *client*.

        Returns:
            True when the publish call reports ``MQTT_ERR_SUCCESS``,
            False otherwise.
        """
        payload = json.dumps(message)  # serialise once, log and send the same text
        logging.debug(f"发送->主题:'{topic}',\n发送->数据: {payload}")
        result = client.publish(topic, payload, qos)
        return result[0] == mqtt.MQTT_ERR_SUCCESS

    @staticmethod
    def _build_master_update(client_udid, power, run):
        """Build the "_updatemaster" payload shared by open/close requests."""
        return {
            "action": "_updatemaster",
            "client_udid": client_udid,
            "data": [
                {
                    "item": "environmental",
                    "pm25": 20,
                    "co2": 2,
                    "temp": 20,
                    "tvoc": 20,
                    "humi": 20,
                    "hcho": 20,
                },
                {
                    "item": "conference",
                    "power": power,
                    "exist": 1,
                    "run": run,
                },
            ],
        }

    def open_conference_room(self, topic, client_udid):
        """Publish the message that switches the conference room power on.

        Returns True when the publish call succeeded.
        """
        open_message = self._build_master_update(client_udid, 1, "在线")
        return self.publish_message(self.client, topic, open_message, qos=0)

    def close_conference_room(self, topic, client_udid):
        """Publish the message that switches the conference room power off.

        Returns True when the publish call succeeded.
        """
        close_message = self._build_master_update(client_udid, 0, "离线")
        return self.publish_message(self.client, topic, close_message, qos=0)

    def conference_room_online(self, topic, client_udid):
        """Publish the "online" status message for *client_udid*.

        Returns True when the publish call succeeded.
        """
        online_message = {
            "udid": client_udid,
            "action": "online",
            "value": 1,
        }
        return self.publish_message(self.client, topic, online_message, qos=0)

    def conference_room_offline(self, topic, client_udid):
        """Publish the "offline" status message for *client_udid*.

        Returns True when the publish call succeeded.
        """
        offline_message = {
            "udid": client_udid,
            "action": "offline",
            "value": 0,
        }
        return self.publish_message(self.client, topic, offline_message, qos=0)

    def start(self):
        """Connect to the broker and start the background network loop.

        Connection errors are logged rather than raised.

        Returns:
            True when the connection attempt succeeded and the loop was
            started, False when connecting raised an exception.
        """
        try:
            self.client.connect(self.config['broker_address'],
                                self.config['port'])
        except Exception as e:
            logging.error(f"连接MQTT失败,错误为: {e}")
            return False
        self.client.loop_start()
        return True

    def stop(self):
        """Stop the network loop and disconnect from the broker."""
        self.client.loop_stop()
        self.client.disconnect()

    def reconnect(self):
        """Wait five seconds, then try to connect and start the loop again.

        Intended for callers outside the MQTT callbacks (for example after
        ``start()`` returned False): it blocks the calling thread while it
        waits, so it must not be invoked from a paho callback.

        Returns:
            The result of ``start()``.
        """
        logging.info("尝试重新连接MQTT服务器...")
        time.sleep(5)  # wait before retrying
        return self.start()


# Example usage:
# if __name__ == '__main__':
#     mqtt_client = MQTTClient('config.json')
#     mqtt_client.start()
#
#     try:
#         while True:
#             logging.debug("正在运行...")
#             time.sleep(600)  # keep the script alive
#     except KeyboardInterrupt:
#         logging.info("已手动中断MQTT连接进程!")
#     finally:
#         mqtt_client.stop()