提交 35b07be9 authored 作者: PGY's avatar PGY

refactor(统一平台): 重构代码并添加定时执行功能

- 优化了浏览器初始化过程
- 添加了定时执行功能测试的脚本,实现了自动化定时任务
- 调整了截图保存路径,统一到 reports/imgs目录下
- 移除了未使用的导入和冗余代码,精简了代码结构
- 添加了对钉钉机器人发送测试结果的支持
上级 acd876b6
......@@ -106,7 +106,7 @@ def SELENIUM_LOG_SCREEN(driver, width: str = None, module_name: str = None, func
# 使用当前时间作为文件名,确保唯一性
filename = datetime.now().strftime('%Y%m%d%H%M%S%f')
# 定义文件路径和相对于log的路径
filepath = f'log/imgs/{filename}.png'
filepath = f'reports/imgs/{filename}.png'
filepath_relative_to_log = f'imgs/{filename}.png'
# 获取并保存截图
driver.get_screenshot_as_file(filepath)
......
......@@ -13,20 +13,20 @@ import subprocess
import logging
from hytest import *
from selenium import webdriver
from selenium.common import ElementNotInteractableException
from selenium.common.exceptions import ElementNotInteractableException
from selenium.webdriver.common.keys import Keys
from urllib.parse import urlencode
from datetime import datetime
from selenium.webdriver.chrome.service import Service as ChromeService
from webdriver_manager.chrome import ChromeDriverManager
# 获取当前脚本的绝对路径
current_dir = os.path.dirname(os.path.abspath(__file__))
# 获取当前脚本的父目录
parent_dir = os.path.dirname(current_dir)
logging.info(parent_dir)
# 添加路径
sys.path.append(current_dir)
# # 获取当前脚本的绝对路径
# current_dir = os.path.dirname(os.path.abspath(__file__))
# # 获取当前脚本的父目录
# parent_dir = os.path.dirname(current_dir)
# logging.info(parent_dir)
# # 添加路径
# sys.path.append(current_dir)
# 配置日志记录器,仅输出到控制台
logging.basicConfig(
......@@ -65,13 +65,13 @@ def browser_init(login_type):
# 允许不安全的本地主机运行,通常用于开发和测试环境
options.add_argument('--allow-insecure-localhost')
# 使用无痕窗口
options.add_argument('--incognito')
# options.add_argument('--incognito')
# 使用webdriver_manager自动下载并管理chromedriver
driver_path = ChromeDriverManager().install()
service = ChromeService(driver_path)
#driver_path = ChromeDriverManager().install()
#service = ChromeService(driver_path)
# 手动指定ChromeDriver的路径
# service = Service(r'C:\Users\29194\AppData\Local\Programs\Python\Python310\Scripts\chromedriver.exe')
service = Service(r'E:\ubains-module-test\drivers\chromedriver.exe')
try:
# 创建WebDriver实例
wd = webdriver.Chrome(service=service, options=options)
......@@ -83,7 +83,9 @@ def browser_init(login_type):
# 打开对应类型的登录页面
wd.get(login_url)
# 最大化浏览器窗口
wd.maximize_window()
wd.minimize_window() # 强制恢复窗口(即使不是最小化也无副作用)
time.sleep(1) # 等待恢复完成
wd.maximize_window() # 再次尝试最大化
# 将WebDriver实例存储在全局存储器中,以便后续使用
GSTORE['wd'] = wd
......@@ -93,6 +95,7 @@ def browser_init(login_type):
except Exception as e:
# 捕获并记录初始化过程中的任何异常
logging.error(f"浏览器初始化失败:{e}")
raise # 主动抛出异常,防止后续继续执行
# 从配置项config中获取登录URL
def get_login_url_from_config(login_type):
......@@ -240,7 +243,7 @@ def safe_click(element_locator, wd):
"""
try:
# Wait up to 20 seconds for the element to be visible
element = WebDriverWait(wd, 5).until(EC.visibility_of_element_located(element_locator))
element = WebDriverWait(wd, 10).until(EC.visibility_of_element_located(element_locator))
# Attempt to click the element
element.click()
except TimeoutException:
......@@ -469,76 +472,6 @@ def read_csv_data(csv_file_path):
# 读取测试用例xlsx文件中的JSON数据进行数据驱动函数
import openpyxl
# def read_xlsx_data(xlsx_file_path, sheet_name=None):
# """
# 读取XLSX文件中的数据,并将其转换为一个包含字典的列表,每个字典代表一行测试用例数据。
#
# 参数:
# xlsx_file_path (str): XLSX文件的路径。
# sheet_name (str, optional): 工作表的名称。如果未指定,则使用活动工作表。
#
# 返回:
# list: 包含字典的列表,每个字典包含测试用例的名称和参数。
# """
# try:
# # 打开XLSX文件
# workbook = openpyxl.load_workbook(xlsx_file_path)
# except FileNotFoundError:
# raise FileNotFoundError(f"文件未找到: {xlsx_file_path}")
# except Exception as e:
# raise Exception(f"无法打开文件: {e}")
#
# # 选择工作表
# if sheet_name:
# try:
# sheet = workbook[sheet_name]
# except KeyError:
# raise KeyError(f"工作表未找到: {sheet_name}")
# else:
# sheet = workbook.active
#
# # 读取表头,从第三行开始
# headers = [cell.value for cell in sheet[3]]
#
# # 打印表头列名
# # INFO(f"表头列名: {headers}")
#
# # 找到表头中名为 'JSON' 的列索引
# try:
# json_index = headers.index('JSON')
# except ValueError as e:
# raise ValueError(f"表头中没有找到所需的列: {e}")
#
# ddt_cases = []
# # 遍历XLSX文件中的每一行数据,从第四行开始
# for row_num, row in enumerate(sheet.iter_rows(min_row=4, values_only=True), start=4):
# # 获取 JSON 列的数据
# json_data = row[json_index]
#
# # 打印 JSON 数据以进行调试
# # INFO(f"行 {row_num} 的 JSON 数据: {json_data}")
#
# # 检查 JSON 数据是否为空
# if json_data is None or json_data.strip() == "":
# # INFO(f"跳过行 {row_num},JSON 数据为空")
# continue
#
# # 解析 JSON 字符串
# try:
# parsed_json = json.loads(json_data)
# except json.JSONDecodeError:
# raise ValueError(f"行 {row_num} 的 JSON 数据无法解析: {json_data}")
#
# # 将解析后的 JSON 数据添加到列表中
# ddt_cases.append(parsed_json)
#
# # 日志记录:XLSX文件已读取
# # INFO("XLSX文件已读取")
# # 返回包含所有测试用例数据的列表
# return ddt_cases
# 获取当前进程的 CPU 占用率函数
def read_xlsx_data(xlsx_file_path, sheet_name=None, case_type=None):
"""
读取XLSX文件中的数据,并将其转换为一个包含字典的列表,每个字典代表一行测试用例数据。
......@@ -591,12 +524,8 @@ def read_xlsx_data(xlsx_file_path, sheet_name=None, case_type=None):
# 获取 JSON 列的数据
json_data = row[json_index]
# 打印 JSON 数据以进行调试
# INFO(f"行 {row_num} 的 JSON 数据: {json_data}")
# 检查 JSON 数据是否为空
if json_data is None or json_data.strip() == "":
# INFO(f"跳过行 {row_num},JSON 数据为空")
continue
# 解析 JSON 字符串
......@@ -619,7 +548,6 @@ def read_xlsx_data(xlsx_file_path, sheet_name=None, case_type=None):
INFO("XLSX文件已读取")
# 返回包含所有测试用例数据的列表
return ddt_cases
def get_cpu_usage(interval=1):
"""
获取当前进程的 CPU 占用率。
......@@ -797,8 +725,8 @@ def dingding_send_message(latest_report, title, mobile, ding_type):
# webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=b0eea0bbf097ce3badb4c832d2cd0267a50486f395ec8beca6e2042102bb295b'
# secret = 'SEC928b11659c5fd6476cfa2042edbf56da876abf759289f7e4d3c671fb9a81bf43'
# 钉钉机器人的 Webhook URL 和密钥(测试环境)
if ding_type == '标准版巡检':
VALID_TYPES = ['标准版巡检', '项目功能验证']
if ding_type in VALID_TYPES:
webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=7fbf40798cad98b1b5db55ff844ba376b1816e80c5777e6f47ae1d9165dacbb4'
secret = 'SEC610498ed6261ae2df1d071d0880aaa70abf5e67efe47f75a809c1f2314e0dbd6'
elif ding_type == '展厅巡检':
......@@ -828,7 +756,7 @@ def dingding_send_message(latest_report, title, mobile, ding_type):
logging.info(f"钉钉机器人Webhook URL: {final_webhook_url}")
# 调用测试结果获取函数
browser_init("标准版预定系统")
browser_init("初始化网页")
wd = GSTORE['wd']
# print(latest_report)
test_result = get_test_result(latest_report, wd)
......@@ -967,6 +895,7 @@ from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
# 点击并拖拽函数
def single_click_and_drag(source_element_locator, target_element_locator, wd):
"""
......@@ -1301,6 +1230,9 @@ def get_test_result(latest_report, wd):
def get_percentage(selector, wd):
text = elment_get_text(selector, wd)
logging.info(f"获取的文本:{text}")
if text is None:
logging.error("获取的文本为 None,无法进行正则匹配")
return "0"
match = re.search(r'(\d+(\.\d+)?)%', text)
if match:
return match.group(0)
......@@ -1329,17 +1261,13 @@ def get_test_result(latest_report, wd):
# 返回测试结果字典
return test_result
# if __name__ == "__main__":
# browser_init("展厅预定巡检")
# wd = GSTORE['wd']
# test_result = get_test_result("http://nat.ubainsyun.com:31134/report_20250217_094401.html",wd)
# print(test_result)
# 获取本机IP地址函数
import yaml
import logging
import socket
import subprocess
# 获取本机的局域网IP地址
def get_local_ip():
"""
获取本机的局域网IP地址。
......@@ -1439,21 +1367,6 @@ def kill_ngrok():
# 如果终止 ngrok 进程时发生其他错误,记录错误信息
logging.error(f"终止 ngrok 进程时出错: {e}")
# if __name__ == '__main__':
# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
#
# # 获取本机IP地址
# local_ip = get_local_ip()
# logging.info(f"本机IP地址: {local_ip}")
#
# # 更新ngrok.cfg文件
# ngrok_config_path = r'D:\GithubData\自动化\ubains-module-test\预定系统\ngrok\ngrok-调试主机\ngrok.cfg'
# update_ngrok_config(ngrok_config_path, local_ip)
#
# # 启动ngrok
# ngrok_path = r'D:\GithubData\自动化\ubains-module-test\预定系统\ngrok\ngrok-调试主机\ngrok.exe'
# start_ngrok(ngrok_path, ngrok_config_path)
# 字符串转换枚举类型函数
def get_by_enum(type_str):
"""
......@@ -1489,8 +1402,6 @@ def get_by_enum(type_str):
# 如果输入的定位器类型字符串不匹配任何已知的 By 枚举类型,抛出 ValueError 异常
raise ValueError(f"未知的定位器类型: {type_str}")
# 获取当前时间并格式化为 'HH:MM' 格式的函数,用于会议预定使用
import datetime
def get_current_time_formatted():
"""
获取当前时间并格式化为 'HH:MM' 格式,并选择最近的未来时间点(如 00:00, 00:15, 00:30, 00:45 等)。
......@@ -1499,7 +1410,7 @@ def get_current_time_formatted():
str: 最近的未来时间点字符串,例如 '17:00'。
"""
# 获取当前时间
current_time = datetime.datetime.now()
current_time = datetime.now()
current_time_formatted = current_time.strftime("%H:%M")
# 定义时间点列表
......@@ -1531,7 +1442,7 @@ def get_current_time_formatted():
]
# 将当前时间转换为 datetime 对象
current_time_dt = datetime.datetime.strptime(current_time_formatted, "%H:%M")
current_time_dt = datetime.strptime(current_time_formatted, "%H:%M")
# 初始化最近时间点和最小时间差
closest_time_point = None
......@@ -1539,7 +1450,7 @@ def get_current_time_formatted():
# 遍历时间点列表,找到最近的未来时间点
for time_point in time_points:
time_point_dt = datetime.datetime.strptime(time_point, "%H:%M")
time_point_dt = datetime.strptime(time_point, "%H:%M")
# 如果时间点在当前时间之后
if time_point_dt > current_time_dt:
......
# 浏览器初始化函数
def browser_init(login_type):
"""
初始化浏览器设置和实例。
此函数旨在创建并配置一个Chrome浏览器实例,包括设置Chrome选项以排除不必要的日志,
并尝试打开特定的登录页面。任何初始化过程中出现的错误都会被捕获并记录。
参数:
login_type (str): 指定登录类型,根据不同的登录类型选择不同的URL。
返回:
"""
# 标记初始化过程的开始
INFO("'----------' 正在初始化浏览器 '----------'")
# 创建Chrome选项实例,用于配置浏览器行为
options = webdriver.ChromeOptions()
# 添加实验性选项,排除某些命令行开关以减少输出日志
options.add_experimental_option('excludeSwitches', ['enable-Logging'])
# 忽略证书错误,允许在本地主机上运行时不安全
options.add_argument('--ignore-certificate-errors')
# 禁用自动化控制特征检测,避免被网站识别为自动化流量
options.add_argument('--disable-blink-features=AutomationControlled')
# 允许不安全的本地主机运行,通常用于开发和测试环境
options.add_argument('--allow-insecure-localhost')
# 使用webdriver_manager自动下载并管理chromedriver
# service = ChromeService(ChromeDriverManager().install())
# 使用备用的ChromeDriver下载源
# service = Service(ChromeDriverManager().install())
# 手动指定ChromeDriver的路径
# 自动化运行服务器的chromedriver路径:
service = Service(r'C:\Users\29194\AppData\Local\Programs\Python\Python310\Scripts\chromedriver.exe')
# service = Service(r'C:\Program Files\Python310\Scripts\chromedriver.exe')
# 尝试创建WebDriver实例并执行初始化操作
try:
# 创建WebDriver实例
wd = webdriver.Chrome(service=service, options=options)
# 设置隐式等待时间为10秒,以允许元素加载
wd.implicitly_wait(60)
# 获取登录URL
login_url = get_login_url_from_config(login_type)
# 打开对应类型的登录页面
wd.get(login_url)
# 最大化浏览器窗口
wd.maximize_window()
# 将WebDriver实例存储在全局存储器中,以便后续使用
GSTORE['wd'] = wd
# 标记初始化过程完成
INFO("'----------' 浏览器初始化完成 '----------'")
except Exception as e:
# 捕获并记录初始化过程中的任何异常
logging.error(f"浏览器初始化失败:{e}")
import sys
import os
# 获取当前脚本的绝对路径
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构建统一平台的绝对路径
platform_path = os.path.abspath(os.path.join(current_dir, '..','..','..'))
# 添加路径
# 添加路径到系统路径中
sys.path.append(platform_path)
# 导入模块
from 统一平台.base.bases import *
# 获取当前脚本所在的目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构建XLSX文件的绝对路径
xlsx_file_path = os.path.join(current_dir, '..', '..', 'data', '统一平台PC端测试用例.xlsx')
......@@ -20,7 +15,7 @@ class Unified_Platform_0001:
#执行指令:
# cd ./统一平台/
# hytest --tag 统一平台
tags = ['创建会议','重庆长安']
tags = ['创建会议','重庆长安','统一平台']
ddt_cases = read_xlsx_data(xlsx_file_path, sheet_name="新建会议&")
def teststeps(self):
wd = GSTORE['wd']
......@@ -36,7 +31,7 @@ class Unified_Platform_0001:
element_type = step.get('element_type')
element_value = step.get('element_value')
expented_result = step.get('expented_result')
print(f"执行JSON:{step}")
print(f"正在执行JSON:{step}")
if step.get("page") == "CreateMeeting":
if element_type == "input":
safe_send_keys((locator_type, locator_value), element_value, wd)
......
......@@ -11,7 +11,7 @@ sys.path.append(platform_path)
from 统一平台.base.bases import *
class Unified_Platform_0001:
tags = ['统一平台','会议控制','重庆长安']
tags = ['统一平台','会议控制','重庆长安','统一标准版']
def teststeps(self):
wd = GSTORE['wd']
# 进入会控界面
......
......@@ -23,11 +23,15 @@ def suite_setup():
wd = GSTORE['wd']
STEP(2, "登录系统")
safe_send_keys((By.XPATH, "//input[@placeholder='手机号/用户名/邮箱']"),"admin@pgy",wd)
safe_send_keys((By.XPATH, "//input[@placeholder='密码']"),"Ubains@13579",wd)
safe_send_keys((By.XPATH, "//input[@placeholder='密码']"),"Ubains@1357",wd)
safe_send_keys((By.XPATH, "//input[@placeholder='图形验证']"),"csba",wd)
safe_click((By.XPATH, "//span[@class='el-checkbox__inner']"),wd)
sleep(2)
print("对协议进行勾选")
safe_click((By.XPATH, "//div[@aria-label='提示']//span[contains(text(),'确定')]"),wd)
print("已经勾选协议了")
sleep(2)
safe_click((By.XPATH, "//div[@id='pane-1']//div//span[contains(text(),'登录')]"),wd)
print("已经点击登录了")
sleep(2)
def suite_teardown():
......
import sys
import os
from win32trace import flush
#from win32trace import flush
# 获取当前脚本的绝对路径
current_dir = os.path.dirname(os.path.abspath(__file__))
......@@ -50,4 +50,4 @@ class Unified_Platform_0001:
notify_text = elment_get_text((locator_type,locator_value), wd)
INFO(f"获取弹窗提示内容为:{notify_text}")
CHECK_POINT("判断是否跟预期一致", expented_result == notify_text)
SELENIUM_LOG_SCREEN(wd, "50%")
\ No newline at end of file
SELENIUM_LOG_SCREEN(wd, "50%",1,1,1 )
\ No newline at end of file
......@@ -25,9 +25,13 @@ def suite_setup():
safe_send_keys((By.XPATH, "//input[@placeholder='手机号/用户名/邮箱']"),"admin@pgy",wd)
safe_send_keys((By.XPATH, "//input[@placeholder='密码']"),"Ubains@1357",wd)
safe_send_keys((By.XPATH, "//input[@placeholder='图形验证']"),"csba",wd)
safe_click((By.XPATH, "//span[@class='el-checkbox__inner']"),wd)
sleep(2)
print("对协议进行勾选")
safe_click((By.XPATH, "//div[@aria-label='提示']//span[contains(text(),'确定')]"),wd)
print("已经勾选协议了")
sleep(2)
safe_click((By.XPATH, "//div[@id='pane-1']//div//span[contains(text(),'登录')]"),wd)
print("已经点击登录了")
sleep(2)
def suite_teardown():
......
{
"_comment_exhibit_unified_platform": "统一平台系统URL",
"统一平台": "http://192.168.9.78:443/#/login",
"成都太行": "http://192.168.9.78:2443/#/login"
"成都太行": "http://192.168.9.78:2443/#/login",
"初始化网页": "https://www.baidu.com/"
}
\ No newline at end of file
server_addr: "ngrok.ubsyun.com:9083"
trust_host_root_certs: false
tunnels:
nat1:
remote_port: 32136
proto:
tcp: "192.168.1.66:80"
nat2:
remote_port: 32135
proto:
tcp: "192.168.1.66:8081"
\ No newline at end of file
ngrok -config=ngrok.cfg start nat1 nat2
\ No newline at end of file
import schedule
import queue
from base.bases import *
import time
import logging
import threading
"""
执行指令:
1.打开一个终端输入:
- cd .\统一平台\
- python -m http.server 8081 --directory reports
2.打开新终端输入:
- cd .\统一平台\ngrok\ngrok-调试主机\
- .\start.bat
3.再打开一个终端输入:
- cd .\统一平台\
- python .\定时执行功能测试.py
"""
# 创建一个任务队列,用于存储待处理的任务
task_queue = queue.Queue()
def run_task(task, *args, **kwargs):
# 将任务及其参数放入任务队列
task_queue.put((task, args, kwargs))
logging.debug(f"任务已加入队列: {task.__name__} with args: {args} and kwargs: {kwargs}")
def worker():
# 工作线程的主循环
while True:
# 从任务队列中获取任务及其参数
task, args, kwargs = task_queue.get()
try:
# 记录任务开始执行的时间
logging.debug(f"开始执行任务: {task.__name__} with args: {args} and kwargs: {kwargs}")
# 执行任务并获取结果
result = task(*args, **kwargs)
# 如果任务有返回结果,记录日志
if result:
logging.info(result)
except Exception as e:
# 捕获任务执行过程中发生的任何异常并记录错误日志
logging.error(f"执行任务时发生错误: {e}", exc_info=True)
finally:
# 无论任务是否成功执行,都标记任务已完成
task_queue.task_done()
# 记录任务完成的时间
logging.debug(f"任务完成: {task.__name__}")
def start_workers(num_workers):
# 启动指定数量的工作线程
for _ in range(num_workers):
# 创建一个新的工作线程,目标函数为 worker,设置为守护线程
threading.Thread(target=worker, daemon=True).start()
# 启动3个工作线程
start_workers(3)
# 定时执行成都太行项目
schedule.every().day.at("08:00").do(run_task, run_automation_test, report_title="成都太行项目测试报告", report_url_prefix="http://nat.ubainsyun.com:32135", test_case="成都太行", ding_type="项目功能验证")
# 定时执行重庆长安项目
#schedule.every().day.at("08:15").do(run_task, run_automation_test, report_title="重庆长安项目测试报告", report_url_prefix="http://nat.ubainsyun.com:32135", test_case="重庆长安", ding_type="项目功能验证")
# 定时执行统一平台标准版
schedule.every().day.at("08:20").do(run_task, run_automation_test, report_title="统一平台标准版测试报告", report_url_prefix="http://nat.ubainsyun.com:32135", test_case="统一平台", ding_type="标准版巡检")
try:
# 无限循环,持续检查并执行计划任务
while True:
schedule.run_pending() # 检查并执行所有待处理的任务
time.sleep(1) # 每秒检查一次
except KeyboardInterrupt:
# 捕获用户中断信号 (Ctrl+C)
logging.info("Scheduler interrupted by user.")
except Exception as e:
# 捕获其他未预期的异常
logging.error(f"Unexpected error: {e}", exc_info=True)
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论