提交 ce47ccf9 authored 作者: 陈泽健's avatar 陈泽健

refactor(预定系统): 优化代码结构和功能

- 修改 base.py 中的 ChromeDriver 路径设置
- 重构 StableMQTTClient 类,增加连接重试和消息重发功能
- 新增安卓信息全局定时重启脚本
- 扩展安卓信息批量更新模拟脚本,增加下载进度显示和重试机制
上级 a37364f5
import paho.mqtt.client as mqtt
import logging
import threading
import time
from queue import Queue
from collections import defaultdict
# 配置日志输出到控制台
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)
# 全局配置
THREAD_COUNT = 5 # 线程数量
message_queue = Queue() # 消息队列
message_counter = defaultdict(int) # 消息计数器
message_details = defaultdict(list) # 消息详细记录
class MessageStats:
def __init__(self):
self.total_count = 0
self.last_received = None
self.first_received = None
# 全局消息统计
global_stats = MessageStats()
def print_banner(title):
"""打印分隔横幅"""
banner = f"\n=== {title} ===\n"
print(banner)
def test_progress():
"""测试进度显示"""
print_banner("系统测试")
for i in range(1, 6):
time.sleep(0.5)
logging.info(f"系统测试中... {i}/5")
print_banner("测试完成")
def on_connect(client, userdata, flags, rc):
"""MQTT连接回调"""
if rc == 0:
logging.info("MQTT连接成功")
client.subscribe("/androidPanel/#") # 订阅所有相关主题
else:
logging.error(f"MQTT连接失败,错误码: {rc}")
def on_message(client, userdata, msg):
"""MQTT消息回调"""
try:
payload = msg.payload.decode()
logging.info(f"收到原始消息 - 主题: {msg.topic}")
message_queue.put((msg.topic, payload))
except Exception as e:
logging.error(f"消息处理失败: {e}")
def worker_thread():
"""工作线程函数 - 只记录消息不进行其他操作"""
while True:
topic, payload = message_queue.get()
try:
# 记录消息统计
message_counter[topic] += 1
global_stats.total_count += 1
# 记录详细消息
timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
message_details[topic].append({
'timestamp': timestamp,
'payload': payload
})
# 更新首次/最后接收时间
if global_stats.first_received is None:
global_stats.first_received = timestamp
global_stats.last_received = timestamp
# 打印消息摘要
logging.info(
f"消息记录 - 主题: {topic} | "
f"次数: {message_counter[topic]} | "
f"时间: {timestamp}"
)
# 定期打印统计摘要
if global_stats.total_count % 10 == 0:
print_banner("统计摘要")
logging.info(f"运行时间: {time.strftime('%H:%M:%S', time.gmtime(time.time() - start_time))}")
logging.info(f"总消息数: {global_stats.total_count}")
for topic, count in message_counter.items():
logging.info(f"主题 '{topic}': {count}次")
except Exception as e:
logging.error(f"消息记录失败: {e}")
finally:
message_queue.task_done()
if __name__ == "__main__":
start_time = time.time()
print_banner("MQTT消息监控程序启动")
logging.info(f"启动时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
logging.info(f"工作线程数: {THREAD_COUNT}")
# 测试系统功能
test_progress()
# 启动工作线程池
for i in range(THREAD_COUNT):
t = threading.Thread(
target=worker_thread,
name=f"Worker-{i + 1}",
daemon=True
)
t.start()
logging.info(f"已启动工作线程 {i + 1}/{THREAD_COUNT}")
# 创建MQTT客户端
client = mqtt.Client()
client.username_pw_set("mqtt@cmdb", "mqtt@webpassw0RD")
client.on_connect = on_connect
client.on_message = on_message
try:
# 连接MQTT服务器
client.connect("192.168.5.229", 1883)
logging.info("MQTT客户端已启动,等待消息...")
# 保持运行
client.loop_forever()
except KeyboardInterrupt:
print_banner("程序终止")
logging.info("最终统计:")
logging.info(f"总运行时间: {time.strftime('%H:%M:%S', time.gmtime(time.time() - start_time))}")
logging.info(f"总接收消息数: {global_stats.total_count}")
for topic, count in message_counter.items():
logging.info(f"主题 '{topic}': {count}次")
except Exception as e:
logging.error(f"程序发生错误: {e}")
finally:
client.disconnect()
logging.info("MQTT客户端已断开连接")
\ No newline at end of file
......@@ -37,18 +37,52 @@ csv_file_path = os.path.join(current_dir, '../测试数据/预定系统-门口
class StableMQTTClient:
def __init__(self, broker_address, port, username, password, client_id):
self.broker_address = broker_address
self.port = port
self.username = username
self.password = password
self.client_id = client_id
"""
初始化MQTT客户端实例
参数:
broker_address (str): MQTT代理服务器的地址/IP
port (int): MQTT代理服务器的端口号
username (str): 连接代理服务器的用户名
password (str): 连接代理服务器的密码
client_id (str): 客户端的唯一标识符
功能:
1. 保存连接参数
2. 初始化客户端对象为None
3. 自动尝试连接代理服务器
"""
# 保存MQTT连接参数
self.broker_address = broker_address # 代理服务器地址
self.port = port # 端口号
self.username = username # 用户名
self.password = password # 密码
self.client_id = client_id # 客户端ID
# 初始化MQTT客户端对象(将在connect方法中实例化)
self.client = None
# 自动尝试连接代理服务器
self.connect()
def connect(self):
"""
尝试连接到MQTT broker,最多重试3次。
每次连接失败后会等待递增的时间后重试(5秒、10秒、15秒)。
如果所有尝试都失败,则抛出最后的异常并记录错误日志。
连接成功时会记录成功日志并返回True。
Returns:
bool: 连接成功返回True,否则抛出异常。
Raises:
Exception: 当所有重试尝试都失败时,抛出最后一次连接尝试的异常。
"""
max_retries = 3
for attempt in range(max_retries):
try:
# 创建MQTT客户端并尝试连接
self.client = Mqtt(self.broker_address, self.port,
self.username, self.password, self.client_id)
self.client.set_message_type("json")
......@@ -56,17 +90,34 @@ class StableMQTTClient:
logging.info(f"连接成功,Client ID: {self.client_id}")
return True
except Exception as e:
# 最后一次尝试失败时直接抛出异常
if attempt == max_retries - 1:
logging.error(f"连接失败 (尝试 {attempt + 1}/{max_retries}): {str(e)}")
raise
# 非最后一次失败时等待递增时间后重试
wait_time = (attempt + 1) * 5
logging.warning(f"连接失败,{wait_time}秒后重试... ({attempt + 1}/{max_retries})")
time.sleep(wait_time)
def publish(self, topic, message):
"""
发布消息到指定主题。
如果发布失败,将自动尝试重新连接并重试一次发布操作。
参数:
topic (str): 要发布消息的主题名称
message (str): 要发布的消息内容
异常:
如果重试后仍然失败,将通过logging记录错误但不会抛出异常
"""
try:
# 尝试发布消息
self.client.publish(topic, message)
except Exception as e:
# 发布失败时记录错误并尝试重新连接后重试
logging.error(f"发布消息失败: {str(e)},尝试重新连接...")
self.connect()
self.client.publish(topic, message) # 重试一次
......@@ -74,9 +125,21 @@ class StableMQTTClient:
# 工作线程函数
def worker(mqtt_client, config_queue, interval):
"""MQTT消息发布工作线程
持续从配置队列中获取配置信息,构建MQTT消息并发布到指定主题。
该线程会循环运行直到被外部中断。
Args:
mqtt_client: 已连接的MQTT客户端实例,用于发布消息
config_queue: 包含配置信息的队列,每个配置项应包含topic等必要字段
interval: 每次消息发布后的间隔时间(秒)
"""
while True:
# 从队列获取配置信息
config = config_queue.get()
try:
# 构建并发布MQTT消息
topic = config["topic"]
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
message = Mqtt.build_message(config, current_time, topic)
......@@ -85,9 +148,11 @@ def worker(mqtt_client, config_queue, interval):
time.sleep(interval)
except Exception as e:
# 异常处理:记录错误日志并短暂等待
logging.error(f"线程 {threading.current_thread().name} 发送消息失败: {e}")
time.sleep(5) # 出错后等待5秒
finally:
# 标记队列任务完成
config_queue.task_done()
......
......@@ -72,7 +72,7 @@ def browser_init(login_type):
options.add_argument('--allow-insecure-localhost')
# 使用webdriver_manager自动下载并管理chromedriver
service = ChromeService(ChromeDriverManager().install())
# service = ChromeService(ChromeDriverManager().install())
# 使用备用的ChromeDriver下载源
# service = Service(ChromeDriverManager().install())
# 手动指定ChromeDriver的路径
......@@ -83,7 +83,7 @@ def browser_init(login_type):
# EDY电脑
# service = Service(r'C:\Users\EDY\AppData\Local\Programs\Python\Python310\Scripts\chromedriver.exe')
# 云电脑
# service = Service(r'E:\Python\Scripts\chromedriver.exe')
service = Service(r'E:\Python\Scripts\chromedriver.exe')
# 自动化虚拟机
# service = Service(r'C:\Program Files\Python310\Scripts\chromedriver.exe')
# 尝试创建WebDriver实例并执行初始化操作
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论