提交 a3d350f5 authored 作者: 陈泽健's avatar 陈泽健

feat(Mqtt): 增加客户端 ID 并优化连接逻辑 增加批量更新模拟脚本- 在 Mqtt 类的构造函数中添加 client_id 参数

- 在连接 MQTT 服务器时指定客户端 ID
- 设置 will 消息,当客户端断连时自动发送 disconnect 消息
- 优化异常处理和日志记录
上级 3ff00d54
topic,clientId,appToken,companyNumber,cnum,conferenceId,macAddress,authCode,clientId,deviceId
/uams/android/broadcast,,,,,,,,48134e6047a19a0001,aa44e258a4e1e0001
/uams/android/broadcast,,,,,,,,48134e6047a19a0002,aa44e258a4e1e0002
/uams/android/broadcast,,,,,,,,48134e6047a19a0003,aa44e258a4e1e0003
/uams/android/broadcast,,,,,,,,48134e6047a19a0004,aa44e258a4e1e0004
/uams/android/broadcast,,,,,,,,48134e6047a19a0005,aa44e258a4e1e0005
/uams/android/broadcast,,,,,,,,48134e6047a19a0006,aa44e258a4e1e0006
/uams/android/broadcast,,,,,,,,48134e6047a19a0007,aa44e258a4e1e0007
/uams/android/broadcast,,,,,,,,48134e6047a19a0008,aa44e258a4e1e0008
/uams/android/broadcast,,,,,,,,48134e6047a19a0009,aa44e258a4e1e0009
/uams/android/broadcast,,,,,,,,48134e6047a19a0010,aa44e258a4e1e0010
/uams/android/broadcast,,,,,,,,48134e6047a19a0011,aa44e258a4e1e0011
/uams/android/broadcast,,,,,,,,48134e6047a19a0012,aa44e258a4e1e0012
/uams/android/broadcast,,,,,,,,48134e6047a19a0013,aa44e258a4e1e0013
/uams/android/broadcast,,,,,,,,48134e6047a19a0014,aa44e258a4e1e0014
/uams/android/broadcast,,,,,,,,48134e6047a19a0015,aa44e258a4e1e0015
/uams/android/broadcast,,,,,,,,48134e6047a19a0016,aa44e258a4e1e0016
/uams/android/broadcast,,,,,,,,48134e6047a19a0017,aa44e258a4e1e0017
/uams/android/broadcast,,,,,,,,48134e6047a19a0018,aa44e258a4e1e0018
/uams/android/broadcast,,,,,,,,48134e6047a19a0019,aa44e258a4e1e0019
/uams/android/broadcast,,,,,,,,48134e6047a19a0020,aa44e258a4e1e0020
/uams/android/broadcast,,,,,,,,48134e6047a19a0021,aa44e258a4e1e0021
/uams/android/broadcast,,,,,,,,48134e6047a19a0022,aa44e258a4e1e0022
/uams/android/broadcast,,,,,,,,48134e6047a19a0023,aa44e258a4e1e0023
/uams/android/broadcast,,,,,,,,48134e6047a19a0024,aa44e258a4e1e0024
/uams/android/broadcast,,,,,,,,48134e6047a19a0025,aa44e258a4e1e0025
/uams/android/broadcast,,,,,,,,48134e6047a19a0026,aa44e258a4e1e0026
/uams/android/broadcast,,,,,,,,48134e6047a19a0027,aa44e258a4e1e0027
/uams/android/broadcast,,,,,,,,48134e6047a19a0028,aa44e258a4e1e0028
/uams/android/broadcast,,,,,,,,48134e6047a19a0029,aa44e258a4e1e0029
/uams/android/broadcast,,,,,,,,48134e6047a19a0030,aa44e258a4e1e0030
/uams/android/broadcast,,,,,,,,48134e6047a19a0031,aa44e258a4e1e0031
/uams/android/broadcast,,,,,,,,48134e6047a19a0032,aa44e258a4e1e0032
/uams/android/broadcast,,,,,,,,48134e6047a19a0033,aa44e258a4e1e0033
/uams/android/broadcast,,,,,,,,48134e6047a19a0034,aa44e258a4e1e0034
/uams/android/broadcast,,,,,,,,48134e6047a19a0035,aa44e258a4e1e0035
/uams/android/broadcast,,,,,,,,48134e6047a19a0036,aa44e258a4e1e0036
/uams/android/broadcast,,,,,,,,48134e6047a19a0037,aa44e258a4e1e0037
/uams/android/broadcast,,,,,,,,48134e6047a19a0038,aa44e258a4e1e0038
/uams/android/broadcast,,,,,,,,48134e6047a19a0039,aa44e258a4e1e0039
/uams/android/broadcast,,,,,,,,48134e6047a19a0040,aa44e258a4e1e0040
/uams/android/broadcast,,,,,,,,48134e6047a19a0041,aa44e258a4e1e0041
/uams/android/broadcast,,,,,,,,48134e6047a19a0042,aa44e258a4e1e0042
/uams/android/broadcast,,,,,,,,48134e6047a19a0043,aa44e258a4e1e0043
/uams/android/broadcast,,,,,,,,48134e6047a19a0044,aa44e258a4e1e0044
/uams/android/broadcast,,,,,,,,48134e6047a19a0045,aa44e258a4e1e0045
/uams/android/broadcast,,,,,,,,48134e6047a19a0046,aa44e258a4e1e0046
/uams/android/broadcast,,,,,,,,48134e6047a19a0047,aa44e258a4e1e0047
/uams/android/broadcast,,,,,,,,48134e6047a19a0048,aa44e258a4e1e0048
/uams/android/broadcast,,,,,,,,48134e6047a19a0049,aa44e258a4e1e0049
/uams/android/broadcast,,,,,,,,48134e6047a19a0050,aa44e258a4e1e0050
/uams/android/broadcast,,,,,,,,48134e6047a19a0051,aa44e258a4e1e0051
/uams/android/broadcast,,,,,,,,48134e6047a19a0052,aa44e258a4e1e0052
/uams/android/broadcast,,,,,,,,48134e6047a19a0053,aa44e258a4e1e0053
/uams/android/broadcast,,,,,,,,48134e6047a19a0054,aa44e258a4e1e0054
/uams/android/broadcast,,,,,,,,48134e6047a19a0055,aa44e258a4e1e0055
/uams/android/broadcast,,,,,,,,48134e6047a19a0056,aa44e258a4e1e0056
/uams/android/broadcast,,,,,,,,48134e6047a19a0057,aa44e258a4e1e0057
/uams/android/broadcast,,,,,,,,48134e6047a19a0058,aa44e258a4e1e0058
/uams/android/broadcast,,,,,,,,48134e6047a19a0059,aa44e258a4e1e0059
/uams/android/broadcast,,,,,,,,48134e6047a19a0060,aa44e258a4e1e0060
/uams/android/broadcast,,,,,,,,48134e6047a19a0061,aa44e258a4e1e0061
/uams/android/broadcast,,,,,,,,48134e6047a19a0062,aa44e258a4e1e0062
/uams/android/broadcast,,,,,,,,48134e6047a19a0063,aa44e258a4e1e0063
/uams/android/broadcast,,,,,,,,48134e6047a19a0064,aa44e258a4e1e0064
/uams/android/broadcast,,,,,,,,48134e6047a19a0065,aa44e258a4e1e0065
/uams/android/broadcast,,,,,,,,48134e6047a19a0066,aa44e258a4e1e0066
/uams/android/broadcast,,,,,,,,48134e6047a19a0067,aa44e258a4e1e0067
/uams/android/broadcast,,,,,,,,48134e6047a19a0068,aa44e258a4e1e0068
/uams/android/broadcast,,,,,,,,48134e6047a19a0069,aa44e258a4e1e0069
/uams/android/broadcast,,,,,,,,48134e6047a19a0070,aa44e258a4e1e0070
/uams/android/broadcast,,,,,,,,48134e6047a19a0071,aa44e258a4e1e0071
/uams/android/broadcast,,,,,,,,48134e6047a19a0072,aa44e258a4e1e0072
/uams/android/broadcast,,,,,,,,48134e6047a19a0073,aa44e258a4e1e0073
/uams/android/broadcast,,,,,,,,48134e6047a19a0074,aa44e258a4e1e0074
/uams/android/broadcast,,,,,,,,48134e6047a19a0075,aa44e258a4e1e0075
/uams/android/broadcast,,,,,,,,48134e6047a19a0076,aa44e258a4e1e0076
/uams/android/broadcast,,,,,,,,48134e6047a19a0077,aa44e258a4e1e0077
/uams/android/broadcast,,,,,,,,48134e6047a19a0078,aa44e258a4e1e0078
/uams/android/broadcast,,,,,,,,48134e6047a19a0079,aa44e258a4e1e0079
/uams/android/broadcast,,,,,,,,48134e6047a19a0080,aa44e258a4e1e0080
/uams/android/broadcast,,,,,,,,48134e6047a19a0081,aa44e258a4e1e0081
/uams/android/broadcast,,,,,,,,48134e6047a19a0082,aa44e258a4e1e0082
/uams/android/broadcast,,,,,,,,48134e6047a19a0083,aa44e258a4e1e0083
/uams/android/broadcast,,,,,,,,48134e6047a19a0084,aa44e258a4e1e0084
/uams/android/broadcast,,,,,,,,48134e6047a19a0085,aa44e258a4e1e0085
/uams/android/broadcast,,,,,,,,48134e6047a19a0086,aa44e258a4e1e0086
/uams/android/broadcast,,,,,,,,48134e6047a19a0087,aa44e258a4e1e0087
/uams/android/broadcast,,,,,,,,48134e6047a19a0088,aa44e258a4e1e0088
/uams/android/broadcast,,,,,,,,48134e6047a19a0089,aa44e258a4e1e0089
/uams/android/broadcast,,,,,,,,48134e6047a19a0090,aa44e258a4e1e0090
/uams/android/broadcast,,,,,,,,48134e6047a19a0091,aa44e258a4e1e0091
/uams/android/broadcast,,,,,,,,48134e6047a19a0092,aa44e258a4e1e0092
/uams/android/broadcast,,,,,,,,48134e6047a19a0093,aa44e258a4e1e0093
/uams/android/broadcast,,,,,,,,48134e6047a19a0094,aa44e258a4e1e0094
/uams/android/broadcast,,,,,,,,48134e6047a19a0095,aa44e258a4e1e0095
/uams/android/broadcast,,,,,,,,48134e6047a19a0096,aa44e258a4e1e0096
/uams/android/broadcast,,,,,,,,48134e6047a19a0097,aa44e258a4e1e0097
/uams/android/broadcast,,,,,,,,48134e6047a19a0098,aa44e258a4e1e0098
/uams/android/broadcast,,,,,,,,48134e6047a19a0099,aa44e258a4e1e0099
/uams/android/broadcast,,,,,,,,48134e6047a19a0100,aa44e258a4e1e0100
/uams/android/broadcast,,,,,,,,48134e6047a19a0101,aa44e258a4e1e0101
\ No newline at end of file
from hytest import *
import os
import sys
import json
import logging
import threading
from datetime import datetime
from queue import Queue
# 获取当前脚本的绝对路径
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构建预定系统的绝对路径
预定系统_path = os.path.abspath(os.path.join(current_dir, '..','..','..'))
预定系统_path = os.path.abspath(os.path.join(current_dir, '..', '..', '..'))
# 添加路径
sys.path.append(预定系统_path)
# 导入模块
......@@ -13,60 +21,89 @@ except ModuleNotFoundError as e:
print("尝试使用绝对路径导入")
from 预定系统.Base.Mqtt_Send import *
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# 获取当前脚本所在的目录
current_dir = os.path.dirname(os.path.abspath(__file__))
print("当前脚本所在的目录:", current_dir)
# 构建CSV文件的绝对路径
csv_file_path = os.path.join(current_dir, '../../测试数据/MQTT模块/MQTT安卓上报_2000条.csv')
# csv_file_path = os.path.join(current_dir, '../../测试数据/MQTT模块/MQTT心跳上报_2000条.csv')
csv_file_path = os.path.join(current_dir, '../测试数据/预定系统-门口屏/MQTT心跳上报_100条.csv')
# 工作线程函数
def worker(mqtt_client, config_queue, interval):
while True:
config = config_queue.get()
try:
topic = config["topic"]
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
message = Mqtt.build_message(config, current_time, topic)
mqtt_client.publish(topic, message)
time.sleep(interval)
except Exception as e:
logging.error(f"线程 {threading.current_thread().name} 发送消息失败: {e}")
finally:
config_queue.task_done()
if __name__ == "__main__":
# 读取配置文件
configs = Mqtt.read_config_from_csv(csv_file_path)
broker_address = "192.168.5.218"
username = "mqtt@cmdb" # 你的MQTT用户名
password = "mqtt@webpassw0RD" # 你的MQTT密码
broker_address = "192.168.5.229"
username = "mqtt@cmdb"
password = "mqtt@webpassw0RD"
port = 1883
num_repeats = 200 # 重复执行的次数
interval_between_repeats = 1 # 每次重复之间的间隔时间(秒)
num_repeats = 200
interval_between_repeats = 1
num_threads = 100 # 线程数量
# 创建 MQTT 客户端实例
mqtt_client = Mqtt(broker_address, port, username, password)
# 创建配置队列
config_queue = Queue()
# 设置 MQTT 服务器的用户名和密码
mqtt_client.connect()
# 创建MQTT客户端列表 (每个线程一个客户端)
mqtt_clients = []
for i, config in enumerate(configs):
client_id = config.get("clientId", f"python_client_{i}")
mqtt_client = Mqtt(broker_address, port, username, password, client_id)
mqtt_client.set_message_type("json")
mqtt_clients.append(mqtt_client)
try:
# 连接到 MQTT 服务器
mqtt_client.connect()
# 连接所有MQTT客户端
for client in mqtt_clients:
client.connect()
logging.info(f"连接成功,Client ID: {client.client_id}")
print('连接成功')
# 创建工作线程
for i in range(num_threads):
t = threading.Thread(
target=worker,
args=(mqtt_clients[i % len(mqtt_clients)], config_queue, interval_between_repeats),
name=f"Worker-{i + 1}",
daemon=True
)
t.start()
# 主循环
for repeat in range(num_repeats):
logging.info(f"开始第 {repeat + 1} 次上报")
# 遍历配置文件中的每一行数据
# 将配置放入队列
for config in configs:
#打印当前MQTT消息配置信息
Mqtt.print_current_config(config)
# 构建消息内容
topic = config["topic"]
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(config)
message = Mqtt.build_message(config, current_time, topic)
# 发送消息
mqtt_client.publish(topic, message)
config_queue.put(config)
# 每次发送之间可以设置一个间隔时间
time.sleep(interval_between_repeats)
# 每次重复之间设置一个间隔时间
config_queue.join() # 等待所有任务完成
time.sleep(interval_between_repeats)
except Exception as e:
logging.error(f"发送消息时发生错误: {e}")
logging.error(f"发生错误: {e}", exc_info=True)
finally:
# 断开与 MQTT 服务器的连接
mqtt_client.disconnect()
\ No newline at end of file
# 断开所有MQTT连接
for client in mqtt_clients:
client.disconnect()
......@@ -12,44 +12,52 @@ import paho.mqtt.client as mqtt
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class Mqtt:
def __init__(self, broker_address, port, username=None, password=None):
def __init__(self, broker_address, port, username=None, password=None, client_id=None):
"""
初始化 MQTT 客户端
:param broker_address: MQTT 代理地址
:param port: MQTT 代理端口
:param username: MQTT 登录用户名(可选)
:param password: MQTT 登录密码(可选)
:param password: MQTT 登录密码(可选)
:param client_id: 客户端ID(可选)
"""
self.lock = None
self.broker_address = broker_address
self.port = port
self.username = username
self.password = password
self.client_id = client_id or f"python_client_{os.getpid()}_{time.time()}"
self.client = None
self._received_message_lock = threading.Lock()
self.received_message = None
self.message_type = None
self.username = username
self.password = password
self.message_type = None # 初始化message_type属性
def connect(self):
"""
连接到 MQTT 服务器,并支持账号密码登录
"""
"""连接到 MQTT 服务器"""
try:
self.client = mqtt.Client()
# 创建客户端时指定client_id
self.client = mqtt.Client(client_id=self.client_id)
# 如果提供了用户名和密码,则设置认证信息
if self.username and self.password:
self.client.username_pw_set(self.username, self.password)
self.client.on_connect = self.on_connect # 设置连接回调
self.client.on_message = self.on_message # 设置消息回调
self.client.connect(self.broker_address, self.port) # 连接到代理
self.client.loop_start() # 启动网络循环
logging.info("MQTT 客户端连接成功")
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
# 设置will消息
self.client.will_set(topic="/client/disconnect",
payload=json.dumps({"client_id": self.client_id}),
qos=1,
retain=False)
self.client.connect(self.broker_address, self.port, keepalive=60)
self.client.loop_start()
logging.info(f"MQTT连接成功,Client ID: {self.client_id}")
except Exception as e:
logging.error(f"连接到MQTT服务器时发生错误: {e}")
logging.error(f"连接失败: {str(e)}")
raise
def print_current_config(config):
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论