提交 cdf4655f authored 作者: 陈泽健's avatar 陈泽健

完成用户管理模块的主流程代码。优化ddt_cases的读取方式,从csv中读取测试数据。并且将csv写入ddt_cases的代码封装成函数,后续方便管理。调...

完成用户管理模块的主流程代码。优化ddt_cases的读取方式,从csv中读取测试数据。并且将csv写入ddt_cases的代码封装成函数,后续方便管理。调试安卓信息MQTT上报没有成功的问题。问题已解决。
上级 cdecc8bd
import json
import logging
import csv
import os
import re
import threading
import time
from distutils.command.config import config
from time import sleep
from datetime import datetime
import paho.mqtt.client as mqtt
......@@ -25,6 +26,7 @@ class Mqtt:
self.client = None
self._received_message_lock = threading.Lock()
self.received_message = None
self.message_type = None
def connect(self):
"""
......@@ -114,13 +116,38 @@ class Mqtt:
logging.error("客户端未初始化,无法订阅主题")
raise ValueError("客户端未初始化,无法订阅主题")
# 输入验证
if not isinstance(topic, str) or not topic.strip():
logging.error("无效的主题名称")
raise ValueError("无效的主题名称")
try:
self.client.subscribe(topic)
logging.info(f"已订阅主题: {topic}")
except ConnectionError as ce:
logging.error(f"连接错误,无法订阅主题: {topic}, 错误信息: {str(ce)}")
raise
except TimeoutError as te:
logging.error(f"超时错误,无法订阅主题: {topic}, 错误信息: {str(te)}")
raise
except ValueError as ve:
logging.error(f"值错误,无法订阅主题: {topic}, 错误信息: {str(ve)}")
raise
except Exception as e:
logging.error(f"订阅主题失败: {topic}, 错误信息: {str(e)}")
logging.error(f"未知错误,无法订阅主题: {topic}, 错误信息: {str(e)}")
raise
def set_message_type(self, message_type):
"""
设置消息类型
此方法用于设置或更改消息类型属性,允许对象根据需要处理不同类型的消息
参数:
message_type: 要设置的消息类型,可以是任何数据类型,但通常应该是字符串、整数或枚举类型
返回:
"""
self.message_type = message_type
def publish(self, topic, message):
"""
发布消息到指定的主题
......@@ -131,18 +158,22 @@ class Mqtt:
if self.client:
try:
# 将消息转换为JSON字符串
if isinstance(message, dict):
if self.message_type == dict and isinstance(message, dict):
message = json.dumps(message)
except TypeError as e:
logging.error(f"消息转换失败: {e}")
return
elif message is None:
message = ""
else:
message = str(message)
except (TypeError, ValueError) as e:
logging.error(f"{datetime.now()} - 消息转换失败: {e} - 调用者: {self.__class__.__name__}.publish")
raise
try:
self.client.publish(topic, message)
logging.info(f"已发布消息到主题: {topic}")
logging.info(f"{datetime.now()} - 已发布消息到主题: {topic} - 调用者: {self.__class__.__name__}.publish")
except Exception as e:
logging.error(f"消息发布失败: {e}")
logging.error(f"失败的主题: {topic}, 消息: {message}")
logging.error(f"{datetime.now()} - 消息发布失败: {e} - 失败的主题: {topic}, 消息: {message} - 调用者: {self.__class__.__name__}.publish")
raise
def wait_for_message(self, topic, timeout=5):
"""
......@@ -177,14 +208,29 @@ class Mqtt:
@staticmethod
def read_config_from_csv(file_path):
"""
从 CSV 文件读取配置-
从 CSV 文件读取配置
:param file_path: CSV 文件路径
:param allowed_directory: 允许访问的目录
:return: 配置列表
"""
with open(file_path, mode='r', encoding='utf-8') as file:
reader = csv.DictReader(file)
return list(reader)
try:
# 验证文件路径
if not os.path.isfile(file_path):
raise FileNotFoundError(f"文件 {file_path} 不存在")
with open(file_path, mode='r', encoding='utf-8') as file:
reader = csv.DictReader(file)
return [row for row in reader]
except FileNotFoundError as e:
print(f"错误: {e}")
return []
except PermissionError as e:
print(f"错误: {e}")
return []
except Exception as e:
print(f"未知错误: {e}")
return []
@staticmethod
def wait_for_message(self, topic, timeout=5):
......@@ -195,26 +241,49 @@ class Mqtt:
:param timeout: 超时时间(秒)
:return: 接收到的消息或 None
"""
if self.client:
start_time = datetime.now()
while (datetime.now() - start_time).total_seconds() < timeout:
if hasattr(self, 'received_message'):
return self.received_message
sleep(0.1)
if not isinstance(topic, str) or not re.match(r'^[a-zA-Z0-9_\-]+$', topic):
raise ValueError("Invalid topic format")
if timeout < 0:
return None
try:
if self.client:
start_time = time.time()
while (time.time() - start_time) < timeout:
if self.has_received_message():
return self.received_message
sleep(0.1)
return None
except AttributeError:
return None
return None
def has_received_message(self):
return hasattr(self, 'received_message')
@staticmethod
def read_config_from_csv(file_path):
"""
从 CSV 文件读取配置-
从 CSV 文件读取配置
:param file_path: CSV 文件路径
:return: 配置列表
"""
with open(file_path, mode='r', encoding='utf-8') as file:
reader = csv.DictReader(file)
return list(reader)
if not os.path.exists(file_path):
raise FileNotFoundError(f"文件 {file_path} 不存在")
try:
with open(file_path, mode='r', encoding='utf-8') as file:
reader = csv.DictReader(file)
config_list = [row for row in reader]
if not config_list:
raise ValueError("CSV 文件内容为空或格式不正确")
logging.info(f"成功读取文件 {file_path}")
return config_list
except Exception as e:
logging.error(f"读取文件 {file_path} 时发生错误: {e}")
return []
@staticmethod
def build_message(config, current_time,topic):
......@@ -312,21 +381,24 @@ class Mqtt:
}
})
def send_and_receive_messages(self, topic, message, num_times=1, timeout=5, interval=0.2):
def send_and_receive_messages(self, topic: str, message: str, num_times: int = 1, timeout: int = 5,
interval: float = 0.2):
"""
发送并接收消息
:param topic: 主题名称
:param message: 消息内容
:param num_times: 发送次数
:param timeout: 超时时间(秒)
:param interval: 每次发送之间的间隔时间(秒)
:param num_times: 发送次数,默认为1
:param timeout: 超时时间(秒),默认为5秒
:param interval: 每次发送之间的间隔时间(秒),默认为0.2秒
"""
if not isinstance(topic, str) or not isinstance(message, str):
raise ValueError("主题和消息必须是字符串类型")
for i in range(num_times):
try:
self.publish(topic, message)
received_message = self.wait_for_message(topic, timeout=timeout)
logging.info(f"正在等待主题: {topic} 的消息,超时时间: {timeout} 秒")
if received_message:
logging.info("消息接收成功!")
......@@ -334,5 +406,9 @@ class Mqtt:
logging.warning("超时时间内未接收到消息。")
sleep(interval)
except (ConnectionError, TimeoutError) as e:
logging.error(f"网络连接或超时错误: {e}")
except ValueError as e:
logging.error(f"值错误: {e}")
except Exception as e:
logging.error(f"发送或接收消息时发生错误: {e}")
logging.error(f"未知错误: {e}")
\ No newline at end of file
此差异已折叠。
......@@ -29,4 +29,8 @@
- 测试报告中补充用例截图。
- 完成毫米波雷达的模拟数据上报,系统界面根据上报信息来回显,目前只造了100个模拟数据。
8. 2024-11-05
- 完成用户管理模块的新增用户、删除用户、用户查询。补充脚本注释信息。
\ No newline at end of file
- 完成用户管理模块的新增用户、删除用户、用户查询。补充脚本注释信息。
9. 2024-11-06
- 完成用户管理模块的主流程代码。优化ddt_cases的读取方式,从csv中读取测试数据。并且将csv写入ddt_cases的代码封装成函数,后续方便管理。调试安卓信息MQTT上报没有成功的问题。问题已解决。
- 优化ddt_cases的读取方式,从csv中读取测试数据。并且将csv写入ddt_cases的代码封装成函数,后续方便管理。
- 调试安卓信息MQTT上报没有成功的问题。问题已解决。
\ No newline at end of file
from time import sleep
from hytest import *
# 获取当前脚本的绝对路径
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构建预定系统的绝对路径
预定系统_path = os.path.abspath(os.path.join(current_dir, '..','..','..'))
# 添加路径
sys.path.append(预定系统_path)
# 导入模块
from 预定系统.Base.base import *
account = 'admin@Test'
username = 'admin@Test'
password = 'Ubains@54321'
new_password = 'Ubains@4321'
phone = '17319004611'
email = '2919407801@qq.com'
class Main_User_Manage_0001:
def teststeps(self):
# 初始化通知文本为空字符串
notify_text = ""
# 从全局存储中获取webdriver实例
wd = GSTORE['wd']
STEP(1, "点击【新增】按钮")
safe_click((By.XPATH, "//span[contains(text(),'添 加')]"), wd)
sleep(1)
STEP(2, f"输入账号:{account},用户名:{username},密码:{password},确认密码: {password},手机号:{phone},邮箱:{email}")
safe_send_keys((By.XPATH, "//input[@id='accountChange']"), account, wd)
# 输入确认用户名
safe_send_keys((By.XPATH, "//input[@placeholder='用户名']"), username, wd)
# 输入密码
safe_send_keys(
(By.XPATH, "//input[@placeholder='11位及以上的大小写字母和数字且连续3位及以上不重复和不连续组合']"),
password, wd)
# 输入确认密码
safe_send_keys((By.XPATH, "//input[@placeholder='确认密码']"), password, wd)
# 点击部门下拉框
safe_click((By.XPATH, "//div[@class='el-input el-input--suffix']//input[@placeholder='请选择']"), wd)
# 选择部门
safe_click((By.XPATH, "/html[1]/body[1]/div[4]/div[1]/div[1]/div[1]/ul[1]/li[1]/label[1]/span[1]/span[1]"), wd)
# 点击性别单选框
safe_click((By.XPATH, "//span[contains(text(),'男')]"), wd)
# 输入手机号和邮箱
safe_send_keys((By.XPATH, "//input[@placeholder='手机']"), phone, wd)
safe_send_keys((By.XPATH, "//input[@placeholder='邮箱']"), email, wd)
sleep(2)
# 屏幕截图
SELENIUM_LOG_SCREEN(wd)
STEP(3, "点击【确定】按钮")
# 点击确定按钮
safe_click((By.XPATH, "//div[@aria-label='添加用户']//span[contains(text(),'确定')]"), wd)
sleep(2)
notify_text = get_notify_text(wd)
# 检查点:验证提示信息是否与预期相符
CHECK_POINT('弹出提示', notify_text == "添加成功")
STEP(4,"管理员退出登录")
safe_click((By.XPATH, "//img[@title='返回预定首页']"),wd)
safe_click((By.XPATH, "//img[@title='退出登录']"),wd)
STEP(5,f"{account}用户登录系统")
INFO(f"登录账号:{account},登录密码:{password},登录验证码:'csba'")
safe_send_keys((By.XPATH, "//input[@placeholder='请输入账号或手机号或邮箱号']"),account,wd)
safe_send_keys((By.XPATH, "//input[@placeholder='请输入密码']"),password,wd)
safe_send_keys((By.XPATH, "//input[@placeholder='请输入图形验证码']"),"csba",wd)
safe_click((By.XPATH, "//input[@value='登 录']"),wd)
sleep(2)
notify_text = get_notify_text(wd)
# 检查点:验证提示信息是否与预期相符
INFO(f"首次登录提示:{notify_text}")
CHECK_POINT('弹出提示', notify_text == "首次登录,请修改密码")
STEP(6,f"{account}修改用户密码")
INFO(f"修改密码为:{new_password}")
safe_send_keys((By.XPATH, "//body[1]/div[2]/div[1]/div[2]/div[2]/div[2]/div[1]/input[1]"),password,wd)
safe_send_keys((By.XPATH, "//input[@placeholder='11位及以上的大小写字母和数字且连续3位及以上不重复和不连续组合']"),new_password,wd)
safe_send_keys((By.XPATH, "//input[@placeholder='请再次输入密码']"),new_password,wd)
safe_click((By.XPATH, "//span[contains(text(),'确定')]"),wd)
sleep(2)
notify_text = get_notify_text(wd)
# 检查点:验证提示信息是否与预期相符
INFO(f"首次登录修改密码提示:{notify_text}")
CHECK_POINT('弹出提示', notify_text == "修改成功,请重新登录")
STEP(7,f"{account}用户重新登录系统")
INFO(f"登录账号:{account},登录密码:{new_password},登录验证码:'csba'")
safe_send_keys((By.XPATH, "//input[@placeholder='请输入账号或手机号或邮箱号']"),account,wd)
safe_send_keys((By.XPATH, "//input[@placeholder='请输入密码']"),new_password,wd)
safe_send_keys((By.XPATH, "//input[@placeholder='请输入图形验证码']"),'csba',wd)
safe_click((By.XPATH, "//input[@value='登 录']"),wd)
sleep(2)
notify_text = elment_get_text((By.XPATH, "//span[contains(text(),'欢迎 自动化测试公司')]"),wd)
# 检查点:验证提示信息是否与预期相符
INFO(f"登录系统提示:{notify_text}")
CHECK_POINT('登录成功校验', notify_text == "欢迎 自动化测试公司")
SELENIUM_LOG_SCREEN(wd)
\ No newline at end of file
......@@ -10,53 +10,14 @@ sys.path.append(预定系统_path)
# 导入模块
from 预定系统.Base.base import *
# 获取当前脚本所在的目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构建CSV文件的绝对路径
csv_file_path = os.path.join(current_dir, '../../测试数据/用户管理模块/用户删除.csv')
class Delete_User_000x:
ddt_cases = [
{
'name': '用户删除_001',
'para': ['admin@12', '删除成功']
},
{
'name': '用户删除_002',
'para': ['admin@21', '删除成功']
},
{
'name': '用户删除_003',
'para': ['admin@32', '删除成功']
},
{
'name': '用户删除_004',
'para': ['admin@admin', '删除成功']
},
{
'name': '用户删除_005',
'para': ['admin@34', '删除成功']
},
{
'name': '用户删除_006',
'para': ['admin@35', '删除成功']
},
{
'name': '用户删除_007',
'para': ['admin@36', '删除成功']
},
{
'name': '用户删除_008',
'para': ['admin@37', '删除成功']
},
{
'name': '用户删除_009',
'para': ['admin@38', '删除成功']
},
{
'name': '用户删除_010',
'para': ['admin@44', '删除成功']
},
{
'name': '用户删除_011',
'para': ['admin@51', '删除成功']
}
]
ddt_cases = read_csv_data(csv_file_path)
def teststeps(self):
"""
......
import csv
from time import sleep
from hytest import *
......@@ -10,185 +11,14 @@ sys.path.append(预定系统_path)
# 导入模块
from 预定系统.Base.base import *
# 获取当前脚本所在的目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构建CSV文件的绝对路径
csv_file_path = os.path.join(current_dir, '../../测试数据/用户管理模块/用户查询.csv')
class Query_User_000x:
ddt_cases = [
{
'name': '用户查询_001',
'para': ['admin@12', 'account']
},
{
'name': '用户查询_002',
'para': ['admin@21', 'account']
},
{
'name': '用户查询_003',
'para': ['admin@32', 'account']
},
{
'name': '用户查询_004',
'para': ['admin@admin', 'account']
},
{
'name': '用户查询_005',
'para': ['admin@34', 'account']
},
{
'name': '用户查询_006',
'para': ['admin@35', 'account']
},
{
'name': '用户查询_007',
'para': ['admin@36', 'account']
},
{
'name': '用户查询_008',
'para': ['admin@37', 'account']
},
{
'name': '用户查询_009',
'para': ['admin@38', 'account']
},
{
'name': '用户查询_010',
'para': ['admin@44', 'account']
},
{
'name': '用户查询_011',
'para': ['admin@51', 'account']
},
{
'name': '用户查询_012',
'para': ['12', 'vague_account']
},
{
'name': '用户查询_013',
'para': ['21', 'vague_account']
},
{
'name': '用户查询_014',
'para': ['32', 'vague_account']
},
{
'name': '用户查询_015',
'para': ['admin', 'vague_account']
},
{
'name': '用户查询_016',
'para': ['34', 'vague_account']
},
{
'name': '用户查询_017',
'para': ['35', 'vague_account']
},
{
'name': '用户查询_018',
'para': ['36', 'vague_account']
},
{
'name': '用户查询_019',
'para': ['37', 'vague_account']
},
{
'name': '用户查询_020',
'para': ['38', 'vague_account']
},
{
'name': '用户查询_021',
'para': ['44', 'vague_account']
},
{
'name': '用户查询_022',
'para': ['51', 'vague_account']
},
{
'name': '用户查询_023',
'para': ['张三', 'username']
},
{
'name': '用户查询_024',
'para': ['张二', 'username']
},
{
'name': '用户查询_025',
'para': ['张四', 'username']
},
{
'name': '用户查询_026',
'para': ['admin@admin', 'username']
},
{
'name': '用户查询_027',
'para': ['李四', 'username']
},
{
'name': '用户查询_028',
'para': ['王五', 'username']
},
{
'name': '用户查询_029',
'para': ['王六', 'username']
},
{
'name': '用户查询_030',
'para': ['chen', 'username']
},
{
'name': '用户查询_031',
'para': ['chen1', 'username']
},
{
'name': '用户查询_032',
'para': ['chen2', 'username']
},
{
'name': '用户查询_033',
'para': ['chen4', 'username']
},
{
'name': '用户查询_034',
'para': ['三', 'vague_username']
},
{
'name': '用户查询_035',
'para': ['二', 'vague_username']
},
{
'name': '用户查询_036',
'para': ['四', 'vague_username']
},
{
'name': '用户查询_037',
'para': ['admin@admi', 'vague_username']
},
{
'name': '用户查询_038',
'para': ['四', 'vague_username']
},
{
'name': '用户查询_039',
'para': ['五', 'vague_username']
},
{
'name': '用户查询_040',
'para': ['六', 'vague_username']
},
{
'name': '用户查询_041',
'para': ['hen', 'vague_username']
},
{
'name': '用户查询_042',
'para': ['hen1', 'vague_username']
},
{
'name': '用户查询_043',
'para': ['hen2', 'vague_username']
},
{
'name': '用户查询_044',
'para': ['hen4', 'vague_username']
}
]
ddt_cases = read_csv_data(csv_file_path)
def teststeps(self):
"""
......
import time
import sys
import os
from datetime import datetime
from hytest import *
# 获取当前脚本的绝对路径
current_dir = os.path.dirname(os.path.abspath(__file__))
......@@ -26,10 +22,8 @@ csv_file_path = os.path.join(current_dir, '../../测试数据/MQTT安卓上报
if __name__ == "__main__":
# 读取配置文件
configs = Mqtt.read_config_from_csv(csv_file_path)
broker_address = "192.168.5.218"
port = 1883
# topic = "rebootResponseTopic"
num_repeats = 100 # 重复执行的次数
interval_between_repeats = 0.2 # 每次重复之间的间隔时间(秒)
......@@ -68,60 +62,4 @@ if __name__ == "__main__":
finally:
# 断开与 MQTT 服务器的连接
mqtt_client.disconnect()
# class Android_000x:
# with open(csv_file_path, mode='r', encoding='utf-8') as file:
# reader = csv.DictReader(file)
# ddt_cases = []
# for i, row in enumerate(reader):
# # 将CSV行转换为ddt_cases的格式
# case = {
# 'name': row['name'],
# 'para': [
# row['topic'], row['clientId'], row['appToken'], row['companyNumber'],
# row['cnum'], row['conferenceId'], row['macAddress'], row['authCode']
# ]
# }
# ddt_cases.append(case)
#
# # 打印结果
# # print(f"Total cases: {len(ddt_cases)}")
# # print(ddt_cases)
#
# def teststeps(self):
#
# broker_address = "nat.ubainsyun.com"
# port = 1883
# # topic = "rebootResponseTopic"
# num_repeats = 100 # 重复执行的次数
# interval_between_repeats = 0.2 # 每次重复之间的间隔时间(秒)
# topic, clientId, appToken, companyNumber, cnum, conferenceId, macAddress, authCode = self.para
#
# # 创建 MQTT 客户端实例
# mqtt_client = Mqtt(broker_address, port)
#
# try:
# STEP(1,"连接MQTT")
# # 连接到 MQTT 服务器
# mqtt_client.connect()
# STEP(2,"MQTT发送消息")
# for repeat in range(num_repeats):
# logging.info(f"开始第 {repeat + 1} 次上报")
# current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# message = Mqtt.build_message(topic, clientId, appToken, companyNumber, cnum, conferenceId, macAddress, authCode, current_time, "deviceId")
#
# print(message)
#
# #发送消息
# mqtt_client.publish(topic, message)
#
# # 每次发送之间可以设置一个间隔时间
# time.sleep(interval_between_repeats)
#
# except Exception as e:
# logging.error(f"发送消息时发生错误: {e}")
#
# finally:
# # 断开与 MQTT 服务器的连接
# mqtt_client.disconnect()
mqtt_client.disconnect()
\ No newline at end of file
from datetime import datetime
import logging
import time
import sys
import os
# 获取当前脚本的绝对路径
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论