1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# -*- coding: utf-8 -*-
# cython: language_level=3
import json
import time
import paho.mqtt.client as mqtt
import logging
# 自定义过滤器类
class CustomFilter(logging.Filter):
def filter(self, record):
# 屏蔽特定的调试日志内容
if record.levelno == logging.DEBUG:
message = record.getMessage()
if "发送->主题" in message and "发送->数据" in message:
data = message.split("发送->数据: ")[1]
try:
data_dict = json.loads(data)
if data_dict.get("action") == "test" and data_dict.get("data") == "Hello, MQTT!":
return False
except json.JSONDecodeError:
pass
elif "Published message with message id" in message:
return False
return True
# 配置日志记录器
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()
logger.addFilter(CustomFilter())
class MQTTClient:
def __init__(self, config_file):
self.config = self.load_config(config_file)
self.client = mqtt.Client(client_id=self.config['client_id'], protocol=mqtt.MQTTv311)
self.client.username_pw_set(self.config['username'], self.config['password'])
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message
self.client.on_publish = self.on_publish
self.client.on_subscribe = self.on_subscribe
def load_config(self, file_path):
"""从指定路径加载配置文件"""
with open(file_path, 'r') as file:
return json.load(file)
def on_connect(self, client, userdata, flags, rc):
"""连接回调函数"""
if rc == 0:
logging.info("连接MQTT成功.")
# 订阅某个主题(可选)
client_udid = '3d53149e-408f-56c6-99ef-29fae67f8efc'
topic_subscribe = f"/maintain/room/master/{client_udid}/"
self.client.subscribe(topic_subscribe, qos=1) # 尝试使用 QoS 1
logging.info(f"已订阅主题: {topic_subscribe}")
else:
logging.error(f"连接MQTT失败,返回代码为: {rc}.")
self.reconnect()
def on_disconnect(self, client, userdata, rc):
"""断开连接回调函数"""
if rc != 0:
logging.error(f"异常断开,返回代码为: {rc}.")
self.reconnect()
def on_message(self, client, userdata, msg):
"""接收消息回调函数"""
logging.debug(f"接收<-主题:'{msg.topic}'")
try:
payload = msg.payload.decode()
logging.debug(f"接收<-数据:{payload}")
except UnicodeDecodeError:
logging.error("无法解码消息内容,可能是编码问题")
def on_publish(self, client, userdata, mid):
"""发送消息回调函数"""
logging.debug(f"Published message with message id {mid}")
def on_subscribe(self, client, userdata, mid, granted_qos):
"""订阅成功回调函数"""
logging.info(f"订阅成功,mid: {mid}, granted_qos: {granted_qos}")
self.perform_post_subscription_tasks(client)
def perform_post_subscription_tasks(self, client):
"""订阅成功后执行的任务"""
logging.info("执行订阅成功后的任务...")
# 注释掉或删除以下行以防止发送特定消息
# topic_publish = "/maintain/room/master/client/"
# message = {"action": "test", "data": "Hello, MQTT!"}
# self.publish_message(client, topic_publish, message)
def publish_message(self, client, topic, message, qos=0):
"""发布消息到指定主题"""
logging.debug(f"发送->主题:'{topic}',\n发送->数据: {json.dumps(message)}")
result = client.publish(topic, json.dumps(message), qos)
return result[0] == 0 # 返回是否成功
def open_conference_room(self, topic, client_udid):
"""开启会议室电源"""
open_message = {
"action": "_updatemaster",
"client_udid": client_udid,
"data": [
{
"item": "environmental",
"pm25": 20,
"co2": 2,
"temp": 20,
"tvoc": 20,
"humi": 20,
"hcho": 20
},
{
"item": "conference",
"power": 1,
"exist": 1,
"run": "在线"
}
]
}
return self.publish_message(self.client, topic, open_message, qos=0)
def close_conference_room(self, topic, client_udid):
"""关闭会议室电源"""
close_message = {
"action": "_updatemaster",
"client_udid": client_udid,
"data": [
{
"item": "environmental",
"pm25": 20,
"co2": 2,
"temp": 20,
"tvoc": 20,
"humi": 20,
"hcho": 20
},
{
"item": "conference",
"power": 0,
"exist": 1,
"run": "离线"
}
]
}
return self.publish_message(self.client, topic, close_message, qos=0)
def conference_room_online(self, topic, client_udid):
"""会议室在线"""
online_message = {
"udid": client_udid,
"action": "online",
"value": 1
}
return self.publish_message(self.client, topic, online_message, qos=0)
def conference_room_offline(self, topic, client_udid):
"""会议室离线"""
offline_message = {
"udid": client_udid,
"action": "offline",
"value": 0
}
return self.publish_message(self.client, topic, offline_message, qos=0)
def start(self):
"""连接服务器并启动网络循环"""
try:
self.client.connect(self.config['broker_address'], self.config['port'])
except Exception as e:
logging.error(f"连接MQTT失败,错误为: {e}")
else:
self.client.loop_start()
def stop(self):
"""停止网络循环并断开连接"""
self.client.loop_stop()
self.client.disconnect()
def reconnect(self):
"""重连机制"""
logging.info("尝试重新连接MQTT服务器...")
time.sleep(5) # 等待5秒后重试
self.start()
# # 示例使用
# if __name__ == '__main__':
# mqtt_client = MQTTClient('config.json')
# mqtt_client.start()
#
# try:
# while True:
# logging.debug("正在运行...")
# time.sleep(600) # 保持脚本运行
# except KeyboardInterrupt:
# logging.info("已手动中断MQTT连接进程!")
# finally:
# mqtt_client.stop()