提交 c92255f5 authored 作者: 陈泽健's avatar 陈泽健

feat(新统一平台): 新增新统一平台测试用例

- 添加新统一平台测试用例文件和测试数据
- 实现新统一平台测试用例的读取和执行
- 更新定时执行任务,增加新统一平台测试
- 优化测试报告展示效果
上级 3102a32a
...@@ -71,7 +71,9 @@ def browser_init(login_type): ...@@ -71,7 +71,9 @@ def browser_init(login_type):
#driver_path = ChromeDriverManager().install() #driver_path = ChromeDriverManager().install()
#service = ChromeService(driver_path) #service = ChromeService(driver_path)
# 手动指定ChromeDriver的路径 # 手动指定ChromeDriver的路径
service = Service(r'E:\ubains-module-test\drivers\chromedriver.exe') # service = Service(r'E:\ubains-module-test\drivers\chromedriver.exe')
# CZJ云桌面
service = Service(r'E:\Python\Scripts\chromedriver.exe')
try: try:
# 创建WebDriver实例 # 创建WebDriver实例
wd = webdriver.Chrome(service=service, options=options) wd = webdriver.Chrome(service=service, options=options)
...@@ -217,7 +219,7 @@ def safe_send_keys(element_locator, value, wd): ...@@ -217,7 +219,7 @@ def safe_send_keys(element_locator, value, wd):
element = WebDriverWait(wd, 5).until(EC.visibility_of_element_located(element_locator)) element = WebDriverWait(wd, 5).until(EC.visibility_of_element_located(element_locator))
element.clear() # 清除元素的当前值 element.clear() # 清除元素的当前值
element.send_keys(value) # 向元素发送指定的键值 element.send_keys(value) # 向元素发送指定的键值
element.send_keys(Keys.ENTER) # 模拟按下回车键 # element.send_keys(Keys.ENTER) # 模拟按下回车键
except TimeoutException: except TimeoutException:
# 如果元素在指定时间内未被找到或不可点击,打印超时异常信息 # 如果元素在指定时间内未被找到或不可点击,打印超时异常信息
INFO(f"超时异常:元素 {element_locator} 在20秒内未找到或无法点击。") INFO(f"超时异常:元素 {element_locator} 在20秒内未找到或无法点击。")
...@@ -474,80 +476,82 @@ def read_csv_data(csv_file_path): ...@@ -474,80 +476,82 @@ def read_csv_data(csv_file_path):
import openpyxl import openpyxl
def read_xlsx_data(xlsx_file_path, sheet_name=None, case_type=None): def read_xlsx_data(xlsx_file_path, sheet_name=None, case_type=None):
""" """
读取XLSX文件中的数据,并将其转换为一个包含字典的列表,每个字典代表一行测试用例数据 从指定的Excel文件中读取测试用例数据,支持按工作表名称和用例类型筛选
参数: 参数:
xlsx_file_path (str): XLSX文件的路径。 xlsx_file_path (str): Excel文件的路径。
sheet_name (str, optional): 工作表的名称。如果未指定,则使用活动工作表。 sheet_name (str, optional): 指定要读取的工作表名称。若为None,则读取所有工作表。
case_type (str, optional): 测试用例类型,例如 '标准版' 或 'XX项目需求'。如果未指定,则读取所有测试用例 case_type (str, optional): 指定要筛选的用例类型(对应“版本”列)。若为None,则不进行筛选
返回: 返回:
list: 包含字典的列表,每个字典包含测试用例的名称和参数。 list: 解析后的JSON格式测试用例列表。
异常:
FileNotFoundError: 当指定的文件不存在时抛出。
Exception: 其他未预期的错误将被记录并重新抛出。
""" """
try: try:
# 打开XLSX文件 INFO(f"尝试打开文件路径: {xlsx_file_path}")
workbook = openpyxl.load_workbook(xlsx_file_path) if not os.path.exists(xlsx_file_path):
except FileNotFoundError: raise FileNotFoundError(f"文件未找到: {xlsx_file_path}")
raise FileNotFoundError(f"文件未找到: {xlsx_file_path}")
except Exception as e:
raise Exception(f"无法打开文件: {e}")
# 选择工作表
if sheet_name:
try:
sheet = workbook[sheet_name]
except KeyError:
raise KeyError(f"工作表未找到: {sheet_name}")
else:
sheet = workbook.active
# 读取表头,从第三行开始
headers = [cell.value for cell in sheet[3]]
# 打印表头列名 workbook = openpyxl.load_workbook(xlsx_file_path, read_only=True)
# INFO(f"表头列名: {headers}") INFO("XLSX文件成功打开")
# 找到表头中名为 'JSON' 和 '功能类别' 的列索引 # 确定需要处理的工作表列表
try: sheets_to_process = [workbook[sheet_name]] if sheet_name else workbook.worksheets
json_index = headers.index('JSON') ddt_cases = []
except ValueError as e:
raise ValueError(f"表头中没有找到所需的列: {e}")
try:
category_index = headers.index('版本')
except ValueError as e:
raise ValueError(f"表头中没有找到所需的列: {e}")
ddt_cases = []
# 遍历XLSX文件中的每一行数据,从第四行开始
for row_num, row in enumerate(sheet.iter_rows(min_row=4, values_only=True), start=4):
# 获取 JSON 列的数据
json_data = row[json_index]
# 检查 JSON 数据是否为空 for sheet in sheets_to_process:
if json_data is None or json_data.strip() == "": INFO(f"正在处理工作表: {sheet.title}")
continue headers = [cell.value for cell in sheet[3]]
INFO(f"表头列名: {headers}")
# 解析 JSON 字符串 # 查找必需的列索引
try: try:
parsed_json = json.loads(json_data) json_index = headers.index('JSON')
except json.JSONDecodeError: category_index = headers.index('版本')
raise ValueError(f"行 {row_num} 的 JSON 数据无法解析: {json_data}") except ValueError as e:
INFO(f"工作表 {sheet.title} 缺少必要列,跳过: {e}")
continue
# 遍历数据行并解析JSON
for row_num, row in enumerate(sheet.iter_rows(min_row=4, values_only=True), start=4):
json_data = row[json_index]
if not json_data or not json_data.strip():
INFO(f"行 {row_num} 的JSON数据为空,跳过")
continue
try:
# 打印原始JSON数据用于调试
INFO(f"行 {row_num} 原始JSON数据: {json_data}")
parsed_json = json.loads(json_data)
category = row[category_index]
# 根据case_type筛选用例
if case_type and category != case_type:
continue
ddt_cases.append(parsed_json)
INFO(f"行 {row_num} JSON解析成功: {parsed_json}")
except json.JSONDecodeError as e:
logging.error(f"行 {row_num} 的JSON数据解析失败: {e}\n数据内容: {json_data}")
except Exception as e:
logging.error(f"行 {row_num} 处理时发生未知错误: {e}")
INFO(f"XLSX文件处理完成,共找到 {len(ddt_cases)} 条用例")
return ddt_cases
# 获取功能类别 except Exception as e:
category = row[category_index] logging.error(f"处理文件时出错: {e}")
raise
# 检查是否需要过滤测试用例类型 finally:
if case_type and category != case_type: if workbook:
continue workbook.close()
# 将解析后的 JSON 数据添加到列表中
ddt_cases.append(parsed_json)
# 日志记录:XLSX文件已读取
INFO("XLSX文件已读取")
# 返回包含所有测试用例数据的列表
return ddt_cases
def get_cpu_usage(interval=1): def get_cpu_usage(interval=1):
""" """
获取当前进程的 CPU 占用率。 获取当前进程的 CPU 占用率。
......
import sys
import os
from time import sleep
# 获取当前脚本的绝对路径
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构建统一平台的绝对路径
platform_path = os.path.abspath(os.path.join(current_dir, '..','..','..'))
# 添加路径
sys.path.append(platform_path)
# 导入模块
try:
from 统一平台.base.bases import *
except ModuleNotFoundError as e:
INFO(f"ModuleNotFoundError: {e}")
INFO("尝试使用绝对路径导入")
from 统一平台.base.bases import *
def suite_setup():
STEP(1, "初始化浏览器")
browser_init("新统一平台")
def suite_teardown():
browser_quit()
\ No newline at end of file
import sys
import os
# 获取当前脚本的绝对路径
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构建统一平台的绝对路径
platform_path = os.path.abspath(os.path.join(current_dir, '..','..','..'))
# 添加路径到系统路径中
sys.path.append(platform_path)
# 导入模块
from 统一平台.base.bases import *
# 构建XLSX文件的绝对路径
xlsx_file_path = os.path.join(current_dir, '..', '..', 'data', '统一平台PC端测试用例.xlsx')
class NewUnifiedPlatform:
#执行指令:
# cd ./统一平台/
# hytest --tag 新统一平台
tags = ['新统一平台']
ddt_cases = read_xlsx_data(xlsx_file_path, case_type="标准版")
def teststeps(self):
"""
执行测试步骤函数,主要用于执行读取的测试用例并进行信息统计模块功能测试操作
"""
# 从全局存储中获取webdriver对象
wd = GSTORE['wd']
name = self.name
# 刷新页面
# wd.refresh()
wd.refresh()
if "议题申报" in name:
# 点击【议题申报】按钮进入模块
INFO("点击【议题申报】按钮")
safe_click((By.XPATH, "//div[@id='CreateTopic']"), wd)
sleep(1)
for step in self.para:
# 赋值页面类型page
page_type = step.get('page')
# 赋值元素定位类型,并将字符串转为Enum类型
locator_type = get_by_enum(step.get('locator_type'))
# 赋值元素值
locator_value = step.get('locator_value')
# 赋值元素类型,例如:click点击、input输入框等
element_type = step.get('element_type')
# 赋值元素值,例如输入框的输入值
element_value = step.get('element_value')
# 赋值预期结果
expected_result = step.get('expected_result')
# 赋值等待时间
# sleep_time = step.get('sleep_time')
# 赋值操作步骤
step_name = step.get('step')
INFO(
f"页面: {page_type}、元素定位类型: {locator_type}、元素定位值: {locator_value}、元素类型: {element_type}、元素值: {element_value}、预期结果: {expected_result}、步骤名称: {step_name}")
if element_type == "click":
safe_click((locator_type, locator_value), wd)
sleep(2)
SELENIUM_LOG_SCREEN(wd, "75")
elif element_type == "input":
safe_send_keys((locator_type, locator_value), element_value, wd)
sleep(2)
SELENIUM_LOG_SCREEN(wd, "75")
elif element_type == "SwitchWindow":
# 将字符转换为int类型
element_value = int(element_value)
wd.switch_to.window(wd.window_handles[element_value])
sleep(2)
SELENIUM_LOG_SCREEN(wd, "75")
elif element_type == "login":
# 退出系统登录
safe_click((By.XPATH, "//div[@class='quit']"), wd)
sleep(2)
INFO(f"开始登录,账号为:{element_value[0]},密码为:{element_value[1]}")
safe_send_keys((By.XPATH, "//input[@placeholder='手机号/用户名/邮箱']"), "admin@xty", wd)
safe_send_keys((By.XPATH, "//input[@placeholder='密码']"), "Ubains@4321", wd)
safe_send_keys((By.XPATH, "//input[@placeholder='图形验证']"), "csba", wd)
sleep(2)
INFO("对协议进行勾选")
safe_click((By.XPATH, "//div[@aria-label='提示']//span[contains(text(),'确定')]"), wd)
INFO("已经勾选协议了")
sleep(2)
safe_click((By.XPATH, "//div[@id='pane-1']//div//span[contains(text(),'登录')]"), wd)
INFO("已经点击登录了")
sleep(2)
elif element_type == "getTips":
notify_text = get_notify_text(wd, (locator_type, locator_value))
INFO(f"获取到的提示信息为:{notify_text}")
sleep(2)
CHECK_POINT(f"获取到的提示信息为:{notify_text}", expected_result in notify_text)
SELENIUM_LOG_SCREEN(wd, "75")
elif element_type == "getText":
text = elment_get_text((locator_type, locator_value), wd)
INFO(f"获取到的文本信息为:{text}")
CHECK_POINT(f"获取到的文本信息为:{text}", expected_result in text)
SELENIUM_LOG_SCREEN(wd, "75")
\ No newline at end of file
...@@ -2,5 +2,6 @@ ...@@ -2,5 +2,6 @@
"_comment_exhibit_unified_platform": "统一平台系统URL", "_comment_exhibit_unified_platform": "统一平台系统URL",
"统一平台": "http://192.168.9.78:443/#/login", "统一平台": "http://192.168.9.78:443/#/login",
"成都太行": "http://192.168.9.78:2443/#/login", "成都太行": "http://192.168.9.78:2443/#/login",
"初始化网页": "https://www.baidu.com/" "初始化网页": "https://www.baidu.com/",
"新统一平台": "https://192.168.5.234"
} }
\ No newline at end of file
...@@ -4,8 +4,8 @@ tunnels: ...@@ -4,8 +4,8 @@ tunnels:
nat1: nat1:
remote_port: 32136 remote_port: 32136
proto: proto:
tcp: "192.168.1.66:80" tcp: "127.0.0.1:80"
nat2: nat2:
remote_port: 32135 remote_port: 32135
proto: proto:
tcp: "192.168.1.66:8081" tcp: "127.0.0.1:8081"
\ No newline at end of file \ No newline at end of file
此差异已折叠。
...@@ -64,6 +64,9 @@ schedule.every().day.at("08:00").do(run_task, run_automation_test, report_title= ...@@ -64,6 +64,9 @@ schedule.every().day.at("08:00").do(run_task, run_automation_test, report_title=
# 定时执行统一平台标准版 # 定时执行统一平台标准版
schedule.every().day.at("08:20").do(run_task, run_automation_test, report_title="统一平台标准版测试报告", report_url_prefix="http://nat.ubainsyun.com:32135", test_case="统一平台", ding_type="标准版巡检") schedule.every().day.at("08:20").do(run_task, run_automation_test, report_title="统一平台标准版测试报告", report_url_prefix="http://nat.ubainsyun.com:32135", test_case="统一平台", ding_type="标准版巡检")
# 定时执行新统一平台标准版
schedule.every().day.at("11:07").do(run_task, run_automation_test, report_title="新统一平台测试报告", report_url_prefix="http://nat.ubainsyun.com:32135", test_case="新统一平台", ding_type="标准版巡检")
try: try:
# 无限循环,持续检查并执行计划任务 # 无限循环,持续检查并执行计划任务
while True: while True:
......
...@@ -583,6 +583,21 @@ import json ...@@ -583,6 +583,21 @@ import json
from hytest import INFO from hytest import INFO
def read_xlsx_data(xlsx_file_path, sheet_name=None, case_type=None): def read_xlsx_data(xlsx_file_path, sheet_name=None, case_type=None):
"""
从指定的Excel文件中读取测试用例数据,支持按工作表名称和用例类型筛选。
参数:
xlsx_file_path (str): Excel文件的路径。
sheet_name (str, optional): 指定要读取的工作表名称。若为None,则读取所有工作表。
case_type (str, optional): 指定要筛选的用例类型(对应“功能类别”列)。若为None,则不进行筛选。
返回:
list: 解析后的JSON格式测试用例列表。
异常:
FileNotFoundError: 当指定的文件不存在时抛出。
Exception: 其他未预期的错误将被记录并重新抛出。
"""
try: try:
INFO(f"尝试打开文件路径: {xlsx_file_path}") INFO(f"尝试打开文件路径: {xlsx_file_path}")
if not os.path.exists(xlsx_file_path): if not os.path.exists(xlsx_file_path):
...@@ -591,6 +606,7 @@ def read_xlsx_data(xlsx_file_path, sheet_name=None, case_type=None): ...@@ -591,6 +606,7 @@ def read_xlsx_data(xlsx_file_path, sheet_name=None, case_type=None):
workbook = openpyxl.load_workbook(xlsx_file_path, read_only=True) workbook = openpyxl.load_workbook(xlsx_file_path, read_only=True)
INFO("XLSX文件成功打开") INFO("XLSX文件成功打开")
# 确定需要处理的工作表列表
sheets_to_process = [workbook[sheet_name]] if sheet_name else workbook.worksheets sheets_to_process = [workbook[sheet_name]] if sheet_name else workbook.worksheets
ddt_cases = [] ddt_cases = []
...@@ -599,6 +615,7 @@ def read_xlsx_data(xlsx_file_path, sheet_name=None, case_type=None): ...@@ -599,6 +615,7 @@ def read_xlsx_data(xlsx_file_path, sheet_name=None, case_type=None):
headers = [cell.value for cell in sheet[3]] headers = [cell.value for cell in sheet[3]]
INFO(f"表头列名: {headers}") INFO(f"表头列名: {headers}")
# 查找必需的列索引
try: try:
json_index = headers.index('JSON') json_index = headers.index('JSON')
category_index = headers.index('功能类别') category_index = headers.index('功能类别')
...@@ -606,6 +623,7 @@ def read_xlsx_data(xlsx_file_path, sheet_name=None, case_type=None): ...@@ -606,6 +623,7 @@ def read_xlsx_data(xlsx_file_path, sheet_name=None, case_type=None):
INFO(f"工作表 {sheet.title} 缺少必要列,跳过: {e}") INFO(f"工作表 {sheet.title} 缺少必要列,跳过: {e}")
continue continue
# 遍历数据行并解析JSON
for row_num, row in enumerate(sheet.iter_rows(min_row=4, values_only=True), start=4): for row_num, row in enumerate(sheet.iter_rows(min_row=4, values_only=True), start=4):
json_data = row[json_index] json_data = row[json_index]
if not json_data or not json_data.strip(): if not json_data or not json_data.strip():
...@@ -619,6 +637,7 @@ def read_xlsx_data(xlsx_file_path, sheet_name=None, case_type=None): ...@@ -619,6 +637,7 @@ def read_xlsx_data(xlsx_file_path, sheet_name=None, case_type=None):
parsed_json = json.loads(json_data) parsed_json = json.loads(json_data)
category = row[category_index] category = row[category_index]
# 根据case_type筛选用例
if case_type and category != case_type: if case_type and category != case_type:
continue continue
...@@ -641,6 +660,7 @@ def read_xlsx_data(xlsx_file_path, sheet_name=None, case_type=None): ...@@ -641,6 +660,7 @@ def read_xlsx_data(xlsx_file_path, sheet_name=None, case_type=None):
workbook.close() workbook.close()
import openpyxl import openpyxl
def clear_columns_in_xlsx(xlsx_file_path, sheet_name=None, columns_to_clear=None): def clear_columns_in_xlsx(xlsx_file_path, sheet_name=None, columns_to_clear=None):
""" """
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论