import openpyxl import pandas as pd import re import csv import urllib import glob import subprocess import requests import json import hmac import hashlib import base64 import time import win32api import win32con import win32gui import logging from hytest import * import pandas as pd from selenium import webdriver from datetime import datetime from urllib.parse import urlencode from selenium.webdriver.common.by import By from selenium.webdriver.edge.options import Options from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common import TimeoutException, ElementNotInteractableException, NoSuchElementException from selenium.webdriver.common.keys import Keys from time import sleep logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # 打开浏览器,忽略ssh警告 def open_browser(): INFO('打开默认浏览器') # 更改显示屏分辨率为1920x1080 # success = change_resolution(1280, 1024) success = change_resolution(1920, 1080) edge_options = Options() edge_options.add_argument('--ignore-certificate-errors') edge_options.add_argument('--disable-blink-features=AutomationControlled') edge_options.add_argument('--allow-insecure-localhost') wd = webdriver.Edge(options=edge_options) GSTORE['wd'] = wd # wd.get('https://rms.ubainsyun.com/#/login') wd.get('https://192.168.5.218:8443/#/login') wd.maximize_window() wd.implicitly_wait(10) # 定义调整屏幕分辨率,仅虚拟机电脑环境跑定时任务需要使用。 def change_resolution(width, height): # 获取当前显示器的设备上下文(Device Context, DC) device = win32api.EnumDisplayDevices(None, 0) dm = win32api.EnumDisplaySettings(device.DeviceName, win32con.ENUM_CURRENT_SETTINGS) if dm.PelsWidth != width or dm.PelsHeight != height: print(f"Changing resolution to {width}x{height}") dm.PelsWidth = width dm.PelsHeight = height # CDS_TEST 是测试模式,如果设置成功则不实际应用更改 if win32api.ChangeDisplaySettings(dm, win32con.CDS_TEST) != win32con.DISP_CHANGE_SUCCESSFUL : print("The requested resolution change is not supported.") return False # 实际应用新的分辨率设置 if win32api.ChangeDisplaySettings(dm, 0) != win32con.DISP_CHANGE_SUCCESSFUL: print("Failed to change resolution.") return False print("Resolution changed successfully.") return True else: print("The requested resolution is already set.") return True # 用户进行登录 def user_login(username, password, captcha): wd = GSTORE['wd'] INFO(f'输入登录账号: {username}') username_input = WebDriverWait(wd, 10).until( EC.presence_of_element_located((By.XPATH, "//input[@placeholder='请输入登录账号']")) ) username_input.clear() username_input.send_keys(username) INFO(f'输入登录密码: {password}') password_input = WebDriverWait(wd, 10).until( EC.presence_of_element_located((By.XPATH, "//input[@placeholder='请输入登录密码']")) ) password_input.clear() password_input.send_keys(password) INFO(f'输入验证码:{captcha}') captcha_input = WebDriverWait(wd, 10).until( EC.presence_of_element_located((By.XPATH, "//input[@placeholder='请输入验证码(区分大小写)']")) ) captcha_input.clear() captcha_input.send_keys(captcha) INFO('点击登录按钮') login_button = WebDriverWait(wd, 3).until( EC.element_to_be_clickable((By.XPATH, "//div[@class='loginButton']")) ) login_button.click() sleep(1) def validate_input(username, password, captcha): if not isinstance(username, str) or not isinstance(password, str) or not isinstance(captcha, str): raise ValueError("Invalid input type") if len(username) < 1 or len(password) < 1 or len(captcha) < 1: raise ValueError("Input cannot be empty") def run_login_tests(df): # 遍历每一行 for index, row in df.iterrows(): username = row.get('username', None) password = row.get('password', None) captcha = row.get('captcha', None) if username and password and captcha: try: validate_input(username, password, captcha) user_login(username, password, captcha) except ValueError as e: print(f"Invalid input at row {index}: {e}") # 构建当前所在目录 def main(): # 获取当前脚本的目录 current_dir = os.path.dirname(os.path.abspath(__file__)) csv_file_path = os.path.join(current_dir, '../testdata/01登录模块/登录信息.csv') try: if not os.path.exists(csv_file_path): print(f"File not found: {csv_file_path}") return df = pd.read_csv(csv_file_path, encoding='utf-8') run_login_tests(df) except Exception as e: print(f"An error occurred: {e}") def browser_quit(): """ 退出浏览器并释放资源。 该函数从全局存储中获取浏览器驱动实例,并调用其quit方法来关闭浏览器并释放相关资源。 """ # 清除浏览器 INFO("清除浏览器") # 从全局存储中获取浏览器驱动实例 wd = GSTORE['wd'] # 调用浏览器驱动实例的quit方法,关闭浏览器并释放资源 wd.quit() # 进入后台管理系统页面 def enter_system(): wd = GSTORE['wd'] if wd is None: raise ValueError("WebDriver is not initialized") logging.info('点击系统设置按钮') try: enter_sys = WebDriverWait(wd, 10).until( EC.element_to_be_clickable((By.XPATH, "//li[contains(text(),'系统设置')]")) ) enter_sys.click() logging.info('系统设置按钮已点击') except (TimeoutException, ElementNotInteractableException) as error: logging.error(f"Error clicking system settings button: {error}") # 进入管理页面 def enter_manage(): wd = GSTORE['wd'] if wd is None: raise ValueError("WebDriver is not initialized") logging.info('点击管理按钮') try: enter_mag = WebDriverWait(wd, 10).until( EC.element_to_be_clickable((By.XPATH, "//div[@class='el-submenu__title']//span[contains(text(),'管理')]")) ) enter_mag.click() logging.info('管理按钮已点击') except (TimeoutException, ElementNotInteractableException) as error: logging.error(f"Error clicking manage button: {error}") # 进入用户管理页面 def enter_user_manage(): wd = GSTORE['wd'] if wd is None: raise ValueError("WebDriver is not initialized") logging.info('点击用户管理按钮') try: enter_user_mag = WebDriverWait(wd, 10).until( EC.element_to_be_clickable((By.XPATH, "//li[contains(text(),'用户管理')]")) ) enter_user_mag.click() logging.info('打开用户管理页面') except (TimeoutException, ElementNotInteractableException) as error: logging.error(f"Error clicking manage button: {error}") # 进入区域分组管理页面 def enter_areagroup_manage(): wd = GSTORE['wd'] if wd is None: raise ValueError("WebDriver is not initialized") logging.info('点击区域分组按钮') try: enter_areagroup_mag = WebDriverWait(wd, 10).until( EC.element_to_be_clickable((By.XPATH, "//li[contains(text(),'区域分组')]")) ) enter_areagroup_mag.click() logging.info('打开区域分组页面') except (TimeoutException, ElementNotInteractableException) as error: logging.error(f"Error clicking manage button: {error}") # 进入区域类型页面 def enter_areatype_manage(): wd = GSTORE['wd'] if wd is None: raise ValueError("WebDriver is not initialized") logging.info('点击区域类型按钮') try: enter_areatype_mag = WebDriverWait(wd, 10).until( EC.element_to_be_clickable((By.XPATH, "//li[contains(text(),'区域类型')]")) ) enter_areatype_mag.click() logging.info('打开区域类型页面') except (TimeoutException, ElementNotInteractableException) as error: logging.error(f"Error clicking manage button: {error}") # 进入区域功能页面 def enter_areafuntion_manage(): wd = GSTORE['wd'] if wd is None: raise ValueError("WebDriver is not initialized") logging.info('点击区域功能按钮') try: enter_areafuntion_mag = WebDriverWait(wd, 10).until( EC.element_to_be_clickable((By.XPATH, "//li[contains(text(),'区域功能')]")) ) enter_areafuntion_mag.click() logging.info('打开区域功能页面') except (TimeoutException, ElementNotInteractableException) as error: logging.error(f"Error clicking manage button: {error}") # 进入区域管理页面 def enter_area_manage(): wd = GSTORE['wd'] if wd is None: raise ValueError("WebDriver is not initialized") logging.info('点击区域管理按钮') try: enter_area_mag = WebDriverWait(wd, 10).until( EC.element_to_be_clickable((By.XPATH, "//li[contains(text(),'区域管理')]")) ) enter_area_mag.click() logging.info('打开区域管理页面') except (TimeoutException, ElementNotInteractableException) as error: logging.error(f"Error clicking manage button: {error}") # 进入类型标签页面 def enter_typetag(): wd = GSTORE['wd'] if wd is None: raise ValueError("WebDriver is not initialized") logging.info('点击类型标签按钮') try: enter_type_tag = WebDriverWait(wd, 10).until( EC.element_to_be_clickable((By.XPATH, "//li[contains(text(),'类型标签')]")) ) enter_type_tag.click() logging.info('打开类型标签页面') except (TimeoutException, ElementNotInteractableException) as error: logging.error(f"Error clicking manage button: {error}") # 进入设备管理页面 def enter_devices_manage(): wd = GSTORE['wd'] if wd is None: raise ValueError("WebDriver is not initialized") logging.info('点击设备管理按钮') try: enter_devices_mag = WebDriverWait(wd, 10).until( EC.element_to_be_clickable((By.XPATH, "//li[contains(text(),'设备管理')]")) ) enter_devices_mag.click() logging.info('打开设备管理页面') except (TimeoutException, ElementNotInteractableException) as error: logging.error(f"Error clicking manage button: {error}") # 进入协议管理页面 def enter_protocol_manage(): wd = GSTORE['wd'] if wd is None: raise ValueError("WebDriver is not initialized") logging.info('点击协议管理按钮') try: enter_protocol_mag = WebDriverWait(wd, 10).until( EC.element_to_be_clickable((By.XPATH, "//li[contains(text(),'协议管理')]")) ) enter_protocol_mag.click() logging.info('打开协议管理页面') except (TimeoutException, ElementNotInteractableException) as error: logging.error(f"Error clicking manage button: {error}") # 进入数据类型页面 def enter_datatype(): wd = GSTORE['wd'] if wd is None: raise ValueError("WebDriver is not initialized") logging.info('点击数据类型按钮') try: enter_data_type = WebDriverWait(wd, 10).until( EC.element_to_be_clickable((By.XPATH, "//li[contains(text(),'数据类型')]")) ) enter_data_type.click() logging.info('打开数据类型页面') except (TimeoutException, ElementNotInteractableException) as error: logging.error(f"Error clicking manage button: {error}") # 进入联动动作页面 def enter_linkaction(): wd = GSTORE['wd'] if wd is None: raise ValueError("WebDriver is not initialized") logging.info('点击联动动作按钮') try: enter_link_action = WebDriverWait(wd, 10).until( EC.element_to_be_clickable((By.XPATH, "//li[contains(text(),'联动动作')]")) ) enter_link_action.click() logging.info('打开联动动作页面') except (TimeoutException, ElementNotInteractableException) as error: logging.error(f"Error clicking manage button: {error}") # 进入模式管理页面 def enter_model_manage(): wd = GSTORE['wd'] if wd is None: raise ValueError("WebDriver is not initialized") logging.info('点击模式管理按钮') try: enter_model_mag = WebDriverWait(wd, 10).until( EC.element_to_be_clickable((By.XPATH, "//li[contains(text(),'模式管理')]")) ) enter_model_mag.click() logging.info('打开模式管理页面') except (TimeoutException, ElementNotInteractableException) as error: logging.error(f"Error clicking manage button: {error}") # 定义钉钉接口函数 def dingding_send_message(test_report_url, title, mobile, ding_type): """ 发送钉钉机器人消息 参考接口文档:https://open.dingtalk.com/document/orgapp/custom-robots-send-group-messages#title-7fs-kgs-36x :param test_report_url: 测试报告链接 :param title: 消息标题 :param text: 消息内容 :param mobile: 需要@的手机号列表 """ # 记录调用此函数的日志 logging.info("开始构建并发送钉钉机器人消息") # 钉钉机器人的 Webhook URL 和密钥(正式环境) # webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=b0eea0bbf097ce3badb4c832d2cd0267a50486f395ec8beca6e2042102bb295b' # secret = 'SEC928b11659c5fd6476cfa2042edbf56da876abf759289f7e4d3c671fb9a81bf43' # 钉钉机器人的 Webhook URL 和密钥(测试环境) if ding_type == '标准版巡检': webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=7fbf40798cad98b1b5db55ff844ba376b1816e80c5777e6f47ae1d9165dacbb4' secret = 'SEC610498ed6261ae2df1d071d0880aaa70abf5e67efe47f75a809c1f2314e0dbd6' elif ding_type == '展厅巡检': webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=061b6e9b1ae436f356cfda7fe19b6e58e46b62670046a78bd3a4d869118c612d' secret = 'SEC93212bd880aad638cc0df2b28a72ef4fdf6651cacb8a6a4bc71dcf09705d458d' # 生成时间戳 timestamp = str(round(time.time() * 1000)) # 生成签名 secret_enc = secret.encode('utf-8') string_to_sign = f'{timestamp}\n{secret}' string_to_sign_enc = string_to_sign.encode('utf-8') hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest() sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) # 构建最终的 Webhook URL params = { 'access_token': webhook_url.split('=')[1], 'timestamp': timestamp, 'sign': sign } encoded_params = urllib.parse.urlencode(params) final_webhook_url = f'https://oapi.dingtalk.com/robot/send?{encoded_params}' # 记录最终的 Webhook URL logging.info(f"钉钉机器人Webhook URL: {final_webhook_url}") # 调用测试结果获取函数 open_browser() wd = GSTORE['wd'] # print(latest_report) test_result = get_test_result(test_report_url, wd) browser_quit() # 构建消息体 headers = {'Content-Type': 'application/json'} message = { 'msgtype': 'link', 'link': { 'title': title, 'messageUrl': test_report_url, 'text': f"通过:{test_result['pass_percent']}" + f"失败:{test_result['fail_percent']}" + f"异常:{test_result['exception_percent']}", }, "at": { "atMobiles": [mobile], "isAtAll": True } } try: # 发送 POST 请求 response = requests.post(final_webhook_url, data=json.dumps(message), headers=headers) # 检查响应状态码 if response.status_code == 200: logging.info('消息发送成功!') logging.info(f'响应内容: {response.text}') else: logging.error(f'消息发送失败,状态码: {response.status_code}') logging.error(f'响应内容: {response.text}') except requests.exceptions.RequestException as e: logging.error(f'请求异常: {e}') # 调用框架自带生成的测试报告 def get_latest_report_file(report_dir, base_url): """ 获取指定目录下最新的HTML报告文件,并返回带有基础URL的完整路径。 :param report_dir: 报告文件所在的目录 :param base_url: 基础URL :return: 最新的HTML报告文件的完整URL,如果没有找到则返回None """ # 记录调用此函数的日志 logging.info("开始调用get_latest_report_file函数获取报告文件") # 确保报告目录存在 if not os.path.exists(report_dir): logging.error(f"报告目录 {report_dir} 不存在。") return None # 获取指定目录下所有符合模式的HTML报告文件 report_files = glob.glob(os.path.join(report_dir, 'report_*.html')) # 打印找到的文件列表 logging.debug(f"找到的报告文件: {report_files}") # 如果没有找到报告文件,记录警告信息并返回None if not report_files: logging.warning("在指定目录中没有找到报告文件。") return None # 找到最新修改的报告文件 latest_file = max(report_files, key=os.path.getmtime) # 获取最新报告文件的最后修改时间 last_modified_time = datetime.fromtimestamp(os.path.getmtime(latest_file)).strftime('%Y-%m-%d %H:%M:%S') # 记录最新报告文件的信息 logging.info(f"最新报告文件: {latest_file}, 最后修改时间: {last_modified_time}") # 将文件路径转换为相对于基础URL的相对路径 relative_path = os.path.relpath(latest_file, report_dir) # 生成完整的URL full_url = f"{base_url}/{relative_path}".replace("\\", "/") # 返回完整的URL return full_url # 定义发送钉钉报告函数 def get_reportfile_send_dingding(report_title, report_url_prefix, ding_type): try: # 获取报告文件所在的目录 report_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..','log') # 获取基础URL base_url = report_url_prefix # 获取最新的报告文件 latest_report = get_latest_report_file(report_dir, base_url) # 如果找到了最新的报告文件,则发送报告链接到钉钉 if latest_report: logging.info(f"最新报告文件URL: {latest_report}") try: # 记录调用钉钉消息通知函数的日志 logging.info("开始调用钉钉消息通知函数") # 调用钉钉发送消息接口进行推送测试报告链接 dingding_send_message(latest_report, report_title, "13724387318", ding_type) # 记录钉钉消息通知函数调用成功的日志 logging.info("钉钉消息通知函数调用成功") except Exception as e: # 记录钉钉消息通知函数调用失败的日志 logging.error(f"钉钉消息通知函数调用失败: {e}") else: # 记录没有找到报告文件的日志 logging.warning("没有找到报告文件以发送。") except subprocess.CalledProcessError as e: # 处理子进程调用失败的异常 logging.error(f"命令执行失败,返回码 {e.returncode}: {e.output}") except OSError as e: # 处理操作系统相关的异常 logging.error(f"发生操作系统错误: {e}") finally: # 无论是否成功,都记录测试结束的日志 logging.info("自动化测试完成。") # 获取csv文件数据 def read_csv_data(csv_file_path): """ 读取CSV文件中的数据,并将其转换为一个包含字典的列表,每个字典代表一行测试用例数据。 参数: csv_file_path (str): CSV文件的路径。 返回: list: 包含字典的列表,每个字典包含测试用例的名称和参数。 """ # 打开CSV文件,使用只读模式,确保兼容性并处理编码 with open(csv_file_path, mode='r', newline='', encoding='utf-8') as file: # 创建CSV阅读器 reader = csv.reader(file) # 读取表头,为后续数据解析做准备 headers = next(reader) ddt_cases = [] # 遍历CSV文件中的每一行数据 for row in reader: # 将每一行数据转换为字典,其中包含测试用例的名称和参数 case = { 'name': row[0], 'para': row[1:] } # 将转换后的测试用例数据添加到列表中 ddt_cases.append(case) # 日志记录:CSV文件已读取 INFO("CSV文件已读取") # 返回包含所有测试用例数据的列表 return ddt_cases # 定义生成的报告,并且发送钉钉通知 def run_automation_test(report_title, report_url_prefix , ding_type): """ 运行自动化测试并生成报告。 参数: - report_title: 报告的标题 - report_url_prefix: 报告URL的前缀 - test_case: 测试用例脚本执行的标签名 """ # 记录测试开始的日志 logging.info("开始自动化测试...") # 构建运行测试命令 command = [ 'hytest', '--report_title', report_title, '--report_url_prefix', report_url_prefix ] # 记录将要执行的命令日志 logging.info(f"执行命令: {' '.join(command)}") try: # 执行测试命令并获取结果 result = subprocess.run(command, capture_output=True, text=True, check=True) # 记录命令的标准输出和错误输出 logging.debug(f"命令标准输出: {result.stdout}") logging.debug(f"命令错误输出: {result.stderr}") except subprocess.CalledProcessError as e: # 处理子进程调用失败的异常 logging.error(f"命令执行失败,返回码 {e.returncode}: {e.output}") except OSError as e: # 处理操作系统相关的异常 logging.error(f"发生操作系统错误: {e}") finally: # 无论测试是否成功,都记录测试结束的日志 logging.info("自动化测试完成。") # 调用回调函数处理后续操作 get_reportfile_send_dingding(f"{report_title}", report_url_prefix, ding_type) # 获取报告中的通过率、失败率和异常率,用于发送钉钉报告 def get_test_result(latest_report, wd): """ 获取测试结果页面的通过率、失败率和异常率 :param latest_report: 测试结果页面的URL :param wd: WebDriver实例,用于访问和操作网页 :return: 包含通过率、失败率和异常率的字典 """ # 初始化测试结果字典 test_result = { "pass_percent": "", "fail_percent": "", "exception_percent": "" } # 访问测试结果页面 wd.get(latest_report) sleep(5) # 点击简略显示 safe_click((By.XPATH,"//div[@id='display_mode']"), wd) sleep(5) # 定义一个函数来获取和解析百分比 def get_percentage(selector, wd): text = elment_get_text(selector, wd) logging.info(f"获取的文本:{text}") match = re.search(r'(\d+(\.\d+)?)%', text) if match: return match.group(0) else: logging.error(f"未找到百分比匹配项,文本内容: {text}") return "0" # 获取通过率 pass_percent = get_percentage((By.CSS_SELECTOR, "div[class='result_barchart'] div:nth-child(1) span:nth-child(1)"), wd) test_result["pass_percent"] = pass_percent # 获取失败率 fail_percent = get_percentage( (By.CSS_SELECTOR, "body > div.main_section > div.result > div > div:nth-child(2) > span"), wd) test_result["fail_percent"] = fail_percent # 获取异常率 exception_percent = get_percentage( (By.CSS_SELECTOR, "body > div.main_section > div.result > div > div:nth-child(3) > span"), wd) test_result["exception_percent"] = exception_percent # 输出test_result logging.info(test_result) print(test_result) sleep(5) # 返回测试结果字典 return test_result # 通用方法-点击元素 def safe_click(element_locator, wd): """ 对其定位器指定的元素执行安全单击。 此函数尝试以处理潜在异常的方式单击元素。 它等待元素可见并可单击,然后再尝试单击它。 如果该元素在20秒内无法点击,或者它不存在, 或者不可交互,它会捕获相应的异常并记录一条信息性消息。 参数: -element_locator:要单击的元素的定位器,指定为元组。 -wd:用于查找元素并与之交互的WebDriver实例。 """ try: # Wait up to 20 seconds for the element to be visible element = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator)) # Attempt to click the element element.click() except TimeoutException: # Log a message if the element is not found or not clickable within 20 seconds INFO(f"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.") except NoSuchElementException: # Log a message if the element is not found INFO(f"NoSuchElementException: Element {element_locator} not found.") except ElementNotInteractableException: # Log a message if the element is not interactable INFO(f"ElementNotInteractableException: Element {element_locator} is not interactable.") # 通用方法-在输入框输入内容 def safe_send_keys(element_locator, value, wd): """ 安全地向网页元素发送键值。 该函数尝试在指定时间内找到指定的网页元素,如果找到并且元素可见,将先清除元素内容,然后发送指定的键值。 如果在指定时间内未能找到元素或元素不可点击,则捕获相应的异常并打印错误信息。 参数: element_locator (tuple): 用于定位网页元素的策略和定位器的元组,例如(By.ID, 'element_id')。 value (str): 要发送到元素的键值字符串。 wd: WebDriver实例,用于与浏览器交互。 异常处理: - TimeoutException: 如果元素在指定时间内未被找到或不可点击。 - NoSuchElementException: 如果元素不存在。 - ElementNotInteractableException: 如果元素存在但不可交互。 """ try: # 等待元素在指定时间内可见 element = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator)) element.clear() # 清除元素的当前值 element.send_keys(value) # 向元素发送指定的键值 except TimeoutException: # 如果元素在指定时间内未被找到或不可点击,打印超时异常信息 INFO(f"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.") except NoSuchElementException: # 如果元素不存在,打印相应异常信息 INFO(f"NoSuchElementException: Element {element_locator} not found.") except ElementNotInteractableException: # 如果元素不可交互,打印相应异常信息 INFO(f"ElementNotInteractableException: Element {element_locator} is not interactable.") # 通用方法-输入框清除 def input_clear(element_locator, wd): """ 清空输入框中的文本。 该函数通过元素定位器找到指定的输入框,并尝试清空其内容。它使用显式等待来确保元素在尝试清除之前是可见的。 参数: - element_locator: 用于定位输入框的元素定位器,通常是一个元组,包含定位方法和定位表达式。 - wd: WebDriver实例,用于与浏览器交互。 异常处理: - TimeoutException: 如果在指定时间内元素不可见,则捕获此异常并打印超时异常消息。 - NoSuchElementException: 如果找不到指定的元素,则捕获此异常并打印未找到元素的消息。 - ElementNotInteractableException: 如果元素不可操作(例如,元素不可见或不可点击),则捕获此异常并打印相应消息。 """ try: # 等待元素可见,并在可见后清空输入框。 input_element = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator)) input_element.clear() except TimeoutException: # 如果元素在20秒内不可见,打印超时异常消息。 print(f"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.") except NoSuchElementException: # 如果找不到元素,打印未找到元素的消息。 print(f"NoSuchElementException: Element {element_locator} not found.") except ElementNotInteractableException: # 如果元素不可操作,打印元素不可操作的消息。 print(f"ElementNotInteractableException: Element {element_locator} is not interactable.") # 通用方法-回车 def send_keyboard(element_locator, wd): """ 向指定元素发送键盘事件。 该函数尝试找到页面上的一个元素,并向其发送RETURN键点击事件。它使用显式等待来确保元素在尝试交互之前是可见的。 如果在指定时间内元素不可见、不存在或不可交互,则捕获相应的异常并打印错误消息。 参数: - element_locator: 用于定位元素的元组,格式为(by, value)。例如,(By.ID, 'element_id')。 - wd: WebDriver实例,用于与浏览器进行交互。 异常处理: - TimeoutException: 如果元素在20秒内不可见,则捕获此异常并打印超时错误消息。 - NoSuchElementException: 如果找不到指定的元素,则捕获此异常并打印未找到元素的错误消息。 - ElementNotInteractableException: 如果元素不可交互(例如,被遮挡或不可点击),则捕获此异常并打印相应错误消息。 """ try: # 等待元素可见,并在可见后向其发送RETURN键点击事件。 element = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator)) element.send_keys(Keys.RETURN) except TimeoutException: # 如果元素在指定时间内不可见,打印超时错误消息。 print(f"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.") except NoSuchElementException: # 如果找不到指定的元素,打印未找到元素的错误消息。 print(f"NoSuchElementException: Element {element_locator} not found.") except ElementNotInteractableException: # 如果元素不可交互,打印不可交互错误消息。 print(f"ElementNotInteractableException: Element {element_locator} is not interactable.") # 通用方法-获取提示文本 def get_notify_text(wd,element_locator): """ 获取通知文本信息。 该函数通过WebDriver等待特定的元素出现,并截取其文本内容作为通知信息。此外,它还负责在获取通知文本后进行屏幕截图, 以便于后续的调试或记录需要。 参数: wd (WebDriver): 由上层传入的WebDriver对象,用于操作浏览器。 返回: str: 提取的通知文本信息。如果未能提取到信息或发生异常,则返回None。 """ try: # 获取提示信息 notify_text = WebDriverWait(wd, 60).until( EC.presence_of_element_located(element_locator) ).text # 屏幕截图 SELENIUM_LOG_SCREEN(wd,"50%") return notify_text except Exception as e: # 记录异常信息 INFO(f"Exception occurred: {e}") # 通用方法-获取页面中的文本内容 def elment_get_text(element_locator, wd): """ 获取页面元素的文本。 该函数通过显式等待的方式,确保页面元素在20秒内可见并获取其文本。 如果在规定时间内元素不可见、不存在或不可交互,则会捕获相应的异常并打印错误信息。 参数: - element_locator: 用于定位页面元素的元组,通常包含定位方式和定位值。 - wd: WebDriver对象,用于与浏览器进行交互。 返回: - element_text: 页面元素的文本。如果发生异常,则返回None。 """ try: # 使用WebDriverWait等待页面元素在20秒内可见,并获取其文本。 element_text = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator)).text return element_text except TimeoutException: # 如果超过20秒元素仍未可见,则捕获TimeoutException异常并打印错误信息。 print(f"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.") except NoSuchElementException: # 如果页面上不存在该元素,则捕获NoSuchElementException异常并打印错误信息。 print(f"NoSuchElementException: Element {element_locator} not found.") except ElementNotInteractableException: # 如果元素存在但不可交互(例如被遮挡),则捕获ElementNotInteractableException异常并打印错误信息。 print(f"ElementNotInteractableException: Element {element_locator} is not interactable.") # 通用方法-读取XLSX文件中的数据,并将其转换为一个包含字典的列表 def read_xlsx_data(xlsx_file_path): """ 读取XLSX文件中的数据,并将其转换为一个包含字典的列表,每个字典代表一行测试用例数据。 参数: xlsx_file_path (str): XLSX文件的路径。 返回: list: 包含字典的列表,每个字典包含测试用例的名称和参数。 """ # 打开XLSX文件 workbook = openpyxl.load_workbook(xlsx_file_path) # 假设数据在第一个工作表中 sheet = workbook.active # 读取表头,从第三行开始 headers = [cell.value for cell in sheet[3]] # 打印表头列名 # print(f"表头列名: {headers}") # 找到表头中名为 'JSON' 的列索引 try: json_index = headers.index('JSON') except ValueError as e: raise ValueError(f"表头中没有找到所需的列: {e}") ddt_cases = [] # 遍历XLSX文件中的每一行数据,从第四行开始 for row in sheet.iter_rows(min_row=4, values_only=True): # 获取 JSON 列的数据 json_data = row[json_index] # 打印 JSON 数据以进行调试 # print(f"JSON 数据: {json_data}") # 解析 JSON 字符串 try: if json_data: parsed_json = json.loads(json_data) else: raise ValueError("JSON 数据为空") except json.JSONDecodeError: raise ValueError(f"无法解析 JSON 数据: {json_data}") # 将解析后的 JSON 数据添加到列表中 ddt_cases.append(parsed_json) # 日志记录:XLSX文件已读取 INFO("XLSX文件已读取") # 返回包含所有测试用例数据的列表 return ddt_cases # 通用方法-转换字符串类型 def get_by_enum(type_str): """ 将字符串类型的定位器类型转换为 selenium.webdriver.common.by.By 枚举类型。 参数: type_str (str): 定位器类型字符串,例如 'XPATH'。 返回: selenium.webdriver.common.by.By: 对应的 By 枚举类型。 """ type_str = type_str.upper() if type_str == 'XPATH': return By.XPATH elif type_str == 'ID': return By.ID elif type_str == 'NAME': return By.NAME elif type_str == 'CLASS_NAME': return By.CLASS_NAME elif type_str == 'CSS_SELECTOR': return By.CSS_SELECTOR elif type_str == 'TAG_NAME': return By.TAG_NAME elif type_str == 'LINK_TEXT': return By.LINK_TEXT elif type_str == 'PARTIAL_LINK_TEXT': return By.PARTIAL_LINK_TEXT else: raise ValueError(f"未知的定位器类型: {type_str}") # 通用方法-用户登录 def user_login(element_locators, username, password, verifycode): """ 管理员登录函数。 该函数通过模拟用户输入用户名、密码和验证码,并点击登录按钮,以实现管理员登录。 """ # 获取webdriver实例 wd = GSTORE['wd'] # 打印用户名输入信息 INFO(f"输入用户名:{username}") # 向用户名输入框发送用户名 safe_send_keys(element_locators['username_locator'], f'{username}', wd) sleep(5) # 打印密码输入信息 INFO(f"输入密码:{password}") # 向密码输入框发送密码 safe_send_keys(element_locators['password_locator'], f"{password}", wd) sleep(5) # 打印验证码输入信息 INFO(f"输入验证码:{verifycode}") # 向验证码输入验证码 safe_send_keys(element_locators['verifycode_locator'], f"{verifycode}", wd) sleep(5) #点击登录按钮 INFO("点击登录按钮") safe_click(element_locators['submitButton_locator'], wd) sleep(5)