提交 4f0658f8 authored 作者: 陈泽健's avatar 陈泽健

refactor(MQTT通用工具): 实现稳定连接的MQTT客户端并优化消息发布流程

- 新增 StableMQTTClient 类,具有重试机制的 MQTT 客户端连接和消息发布功能
- 优化工作线程函数,增加错误处理和重试逻辑
- 调整主循环逻辑,支持持续运行并添加异常处理- 定时任务脚本中更新了任务执行时间并注释掉了一次性执行任务
上级 cbcbd03e
...@@ -4,6 +4,7 @@ import sys ...@@ -4,6 +4,7 @@ import sys
import json import json
import logging import logging
import threading import threading
import time
from datetime import datetime from datetime import datetime
from queue import Queue from queue import Queue
...@@ -34,6 +35,43 @@ print("当前脚本所在的目录:", current_dir) ...@@ -34,6 +35,43 @@ print("当前脚本所在的目录:", current_dir)
csv_file_path = os.path.join(current_dir, '../测试数据/预定系统-门口屏/MQTT心跳上报_100条.csv') csv_file_path = os.path.join(current_dir, '../测试数据/预定系统-门口屏/MQTT心跳上报_100条.csv')
class StableMQTTClient:
def __init__(self, broker_address, port, username, password, client_id):
self.broker_address = broker_address
self.port = port
self.username = username
self.password = password
self.client_id = client_id
self.client = None
self.connect()
def connect(self):
max_retries = 3
for attempt in range(max_retries):
try:
self.client = Mqtt(self.broker_address, self.port,
self.username, self.password, self.client_id)
self.client.set_message_type("json")
self.client.connect()
logging.info(f"连接成功,Client ID: {self.client_id}")
return True
except Exception as e:
if attempt == max_retries - 1:
logging.error(f"连接失败 (尝试 {attempt + 1}/{max_retries}): {str(e)}")
raise
wait_time = (attempt + 1) * 5
logging.warning(f"连接失败,{wait_time}秒后重试... ({attempt + 1}/{max_retries})")
time.sleep(wait_time)
def publish(self, topic, message):
try:
self.client.publish(topic, message)
except Exception as e:
logging.error(f"发布消息失败: {str(e)},尝试重新连接...")
self.connect()
self.client.publish(topic, message) # 重试一次
# 工作线程函数 # 工作线程函数
def worker(mqtt_client, config_queue, interval): def worker(mqtt_client, config_queue, interval):
while True: while True:
...@@ -48,6 +86,7 @@ def worker(mqtt_client, config_queue, interval): ...@@ -48,6 +86,7 @@ def worker(mqtt_client, config_queue, interval):
except Exception as e: except Exception as e:
logging.error(f"线程 {threading.current_thread().name} 发送消息失败: {e}") logging.error(f"线程 {threading.current_thread().name} 发送消息失败: {e}")
time.sleep(5) # 出错后等待5秒
finally: finally:
config_queue.task_done() config_queue.task_done()
...@@ -59,51 +98,60 @@ if __name__ == "__main__": ...@@ -59,51 +98,60 @@ if __name__ == "__main__":
username = "mqtt@cmdb" username = "mqtt@cmdb"
password = "mqtt@webpassw0RD" password = "mqtt@webpassw0RD"
port = 1883 port = 1883
num_repeats = 2000
interval_between_repeats = 1
num_threads = 100 # 线程数量 num_threads = 100 # 线程数量
# 创建配置队列 # 创建配置队列
config_queue = Queue() config_queue = Queue()
# 创建MQTT客户端列表 (每个线程一个客户端) # 创建稳定的MQTT客户端列表
mqtt_clients = [] mqtt_clients = []
for i, config in enumerate(configs): for i, config in enumerate(configs):
client_id = config.get("clientId", f"python_client_{i}") client_id = config.get("clientId", f"python_client_{i}")
mqtt_client = Mqtt(broker_address, port, username, password, client_id) try:
mqtt_client.set_message_type("json") mqtt_client = StableMQTTClient(broker_address, port, username, password, client_id)
mqtt_clients.append(mqtt_client) mqtt_clients.append(mqtt_client)
except Exception as e:
logging.error(f"创建客户端 {client_id} 失败: {str(e)}")
try: try:
# 连接所有MQTT客户端
for client in mqtt_clients:
client.connect()
logging.info(f"连接成功,Client ID: {client.client_id}")
# 创建工作线程 # 创建工作线程
threads = []
for i in range(num_threads): for i in range(num_threads):
t = threading.Thread( t = threading.Thread(
target=worker, target=worker,
args=(mqtt_clients[i % len(mqtt_clients)], config_queue, interval_between_repeats), args=(mqtt_clients[i % len(mqtt_clients)], config_queue, 1),
name=f"Worker-{i + 1}", name=f"Worker-{i + 1}",
daemon=True daemon=True
) )
t.start() t.start()
threads.append(t)
time.sleep(0.1) # 避免同时启动太多线程
# 主循环 # 主循环 - 持续运行
for repeat in range(num_repeats): logging.info("系统已启动,将持续运行...")
logging.info(f"开始第 {repeat + 1} 次上报") while True:
try:
# 将配置放入队列 # 将配置放入队列
for config in configs: for config in configs:
config_queue.put(config) config_queue.put(config)
config_queue.join() # 等待所有任务完成 config_queue.join() # 等待所有任务完成
time.sleep(interval_between_repeats) time.sleep(1)
except KeyboardInterrupt:
logging.info("接收到中断信号,准备退出...")
break
except Exception as e:
logging.error(f"主循环发生错误: {e}", exc_info=True)
time.sleep(5) # 出错后等待5秒
except Exception as e: except Exception as e:
logging.error(f"发生错误: {e}", exc_info=True) logging.error(f"发生错误: {e}", exc_info=True)
finally: finally:
logging.info("正在关闭所有连接...")
# 断开所有MQTT连接 # 断开所有MQTT连接
for client in mqtt_clients: for client in mqtt_clients:
client.disconnect() try:
client.client.disconnect()
except:
pass
...@@ -104,9 +104,9 @@ start_workers(3) ...@@ -104,9 +104,9 @@ start_workers(3)
# run_task, # run_task,
# pull_remote_devlop.Deployer().deploy # pull_remote_devlop.Deployer().deploy
# ) # )
schedule.every().day.at("11:42").do(run_task, run_automation_test, report_title="兰州中石化项目测试报告", report_url_prefix="http://nat.ubainsyun.com:31135", test_case="兰州中石化项目", ding_type="标准版巡检") schedule.every().day.at("07:00").do(run_task, run_automation_test, report_title="兰州中石化项目测试报告", report_url_prefix="http://nat.ubainsyun.com:31135", test_case="兰州中石化项目", ding_type="标准版巡检")
run_automation_test(report_title="兰州中石化项目测试报告", report_url_prefix="http://nat.ubainsyun.com:31135", test_case="新-AI创会测试", ding_type="标准版巡检") # run_automation_test(report_title="兰州中石化项目测试报告", report_url_prefix="http://nat.ubainsyun.com:31135", test_case="新-AI创会测试", ding_type="标准版巡检")
# 定时执行展厅巡检任务 # 定时执行展厅巡检任务
# schedule.every().day.at("07:45").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31136", test_case="展厅巡检", ding_type="展厅巡检") # schedule.every().day.at("07:45").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31136", test_case="展厅巡检", ding_type="展厅巡检")
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论