import urllib from selenium.webdriver.chrome.service import Service import json import hmac import hashlib import base64 import subprocess import logging import requests from hytest import * from selenium import webdriver from selenium.common import ElementNotInteractableException from urllib.parse import urlencode from datetime import datetime from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException # import datetime # 获取当前脚本的绝对路径 current_dir = os.path.dirname(os.path.abspath(__file__)) # 获取当前脚本的父目录 parent_dir = os.path.dirname(current_dir) logging.info(parent_dir) # 添加路径 sys.path.append(current_dir) # 配置日志记录器,仅输出到控制台 logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler() ] ) # 浏览器初始化函数 def browser_init(login_type): """ 初始化浏览器设置和实例。 此函数旨在创建并配置一个Chrome浏览器实例,包括设置Chrome选项以排除不必要的日志, 并尝试打开特定的登录页面。任何初始化过程中出现的错误都会被捕获并记录。 参数: login_type (str): 指定登录类型,根据不同的登录类型选择不同的URL。 返回: 无 """ # 标记初始化过程的开始 INFO("'----------' 正在初始化浏览器 '----------'") # 创建Chrome选项实例,用于配置浏览器行为 options = webdriver.ChromeOptions() # 添加实验性选项,排除某些命令行开关以减少输出日志 options.add_experimental_option('excludeSwitches', ['enable-Logging']) # 忽略证书错误,允许在本地主机上运行时不安全 options.add_argument('--ignore-certificate-errors') # 禁用自动化控制特征检测,避免被网站识别为自动化流量 options.add_argument('--disable-blink-features=AutomationControlled') # 允许不安全的本地主机运行,通常用于开发和测试环境 options.add_argument('--allow-insecure-localhost') # 使用webdriver_manager自动下载并管理chromedriver # service = ChromeService(ChromeDriverManager().install()) # 使用备用的ChromeDriver下载源 # service = Service(ChromeDriverManager().install()) # 手动指定ChromeDriver的路径 # 自动化运行服务器的chromedriver路径: service = Service(r'C:\Users\29194\AppData\Local\Programs\Python\Python310\Scripts\chromedriver.exe') # service = Service(r'C:\Program Files\Python310\Scripts\chromedriver.exe') # 尝试创建WebDriver实例并执行初始化操作 try: # 创建WebDriver实例 wd = webdriver.Chrome(service=service, options=options) # 设置隐式等待时间为10秒,以允许元素加载 wd.implicitly_wait(60) # 获取登录URL login_url = get_login_url_from_config(login_type) # 打开对应类型的登录页面 wd.get(login_url) # 最大化浏览器窗口 wd.maximize_window() # 将WebDriver实例存储在全局存储器中,以便后续使用 GSTORE['wd'] = wd # 标记初始化过程完成 INFO("'----------' 浏览器初始化完成 '----------'") except Exception as e: # 捕获并记录初始化过程中的任何异常 logging.error(f"浏览器初始化失败:{e}") # 从配置项config中获取登录URL def get_login_url_from_config(login_type): """ 从配置文件中读取登录URL。 参数: login_type (str): 指定登录类型,根据不同的登录类型选择不同的URL。 返回: str: 对应的登录URL。 """ # 检查 login_type 是否为空或 None if not login_type: raise ValueError("login_type 不能为空") # 获取当前脚本的绝对路径 current_dir = os.path.dirname(os.path.abspath(__file__)) # 构建配置文件的绝对路径,指向 ubains-module-test 目录下的 config.json config_path = os.path.abspath(os.path.join(current_dir, '..', '..', 'config.json')) # 规范化路径,防止路径遍历攻击 config_path = os.path.normpath(config_path) # 记录配置文件路径以便调试,对路径进行脱敏处理 logging.info(f"配置文件路径: {os.path.basename(config_path)}") # 检查文件是否存在 if not os.path.exists(config_path): # 如果配置文件不存在,则抛出异常 raise FileNotFoundError(f"配置文件 {config_path} 不存在") try: # 读取配置文件 with open(config_path, 'r', encoding='utf-8') as config_file: # 将配置文件内容解析为 JSON 格式 config = json.load(config_file) # 根据 login_type 获取对应的登录 URL login_url = config.get(login_type) # 记录正在打开的登录页面类型和 URL logging.info(f"正在打开 {login_type} 的登录页面:{login_url}") except IOError as e: # 处理文件读取异常 raise IOError(f"读取配置文件失败: {e}") except json.JSONDecodeError as e: # 处理 JSON 解析异常 raise json.JSONDecodeError(f"解析配置文件失败: {e}") # 检查是否成功获取到 URL if not login_url: # 如果未找到对应的 URL,则抛出异常 raise ValueError(f"未找到对应的 URL 配置项: {login_type}") # 返回登录 URL return login_url # 管理员登录函数 def user_login(username, password): """ 管理员登录函数。 该函数通过模拟用户输入用户名、密码和验证码,并点击登录按钮,以实现管理员登录。 """ # 获取webdriver实例 wd = GSTORE['wd'] # 打印用户名输入信息 INFO(f"输入用户名:{username}") # 向用户名输入框发送用户名 safe_send_keys((By.XPATH, "//input[@placeholder='请输入账号或手机号或邮箱号']"), f'{username}', wd) # 打印密码输入信息 INFO(f"输入密码:{password}") # 向密码输入框发送密码 safe_send_keys((By.XPATH, "//input[@placeholder='请输入密码']"), f"{password}", wd) # 打印验证码输入信息 INFO("输入验证码:csba") # 向验证码输入框发送验证码 safe_send_keys((By.XPATH, "//input[@placeholder='请输入图形验证码']"), "csba", wd) # 隐式等待5秒,以确保验证码被正确处理 wd.implicitly_wait(5) # 打印登录按钮点击信息 INFO("点击登录按钮") # 点击登录按钮 safe_click((By.XPATH, "//input[@value='登 录']"), wd) # 输入框输入值函数 def safe_send_keys(element_locator, value, wd): """ 安全地向网页元素发送键值。 该函数尝试在指定时间内找到指定的网页元素,如果找到并且元素可见,将先清除元素内容,然后发送指定的键值。 如果在指定时间内未能找到元素或元素不可点击,则捕获相应的异常并打印错误信息。 参数: element_locator (tuple): 用于定位网页元素的策略和定位器的元组,例如(By.ID, 'element_id')。 value (str): 要发送到元素的键值字符串。 wd: WebDriver实例,用于与浏览器交互。 异常处理: - TimeoutException: 如果元素在指定时间内未被找到或不可点击。 - NoSuchElementException: 如果元素不存在。 - ElementNotInteractableException: 如果元素存在但不可交互。 """ try: # 等待元素在指定时间内可见 element = WebDriverWait(wd, 20).until(EC.visibility_of_element_located(element_locator)) element.clear() # 清除元素的当前值 element.send_keys(value) # 向元素发送指定的键值 except TimeoutException: # 如果元素在指定时间内未被找到或不可点击,打印超时异常信息 INFO(f"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.") except NoSuchElementException: # 如果元素不存在,打印相应异常信息 INFO(f"NoSuchElementException: Element {element_locator} not found.") except ElementNotInteractableException: # 如果元素不可交互,打印相应异常信息 INFO(f"ElementNotInteractableException: Element {element_locator} is not interactable.") # 点击按钮函数 def safe_click(element_locator, wd): """ 对其定位器指定的元素执行安全单击。 此函数尝试以处理潜在异常的方式单击元素。 它等待元素可见并可单击,然后再尝试单击它。 如果该元素在20秒内无法点击,或者它不存在, 或者不可交互,它会捕获相应的异常并记录一条信息性消息。 参数: -element_locator:要单击的元素的定位器,指定为元组。 -wd:用于查找元素并与之交互的WebDriver实例。 """ try: # Wait up to 20 seconds for the element to be visible element = WebDriverWait(wd, 20).until(EC.visibility_of_element_located(element_locator)) # Attempt to click the element element.click() except TimeoutException: # Log a message if the element is not found or not clickable within 20 seconds INFO(f"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.") except NoSuchElementException: # Log a message if the element is not found INFO(f"NoSuchElementException: Element {element_locator} not found.") except ElementNotInteractableException: # Log a message if the element is not interactable INFO(f"ElementNotInteractableException: Element {element_locator} is not interactable.") from time import sleep from selenium.webdriver.common.by import By # 议题输入和上传议题文件函数 def issue_send_and_upload(wd, issue_num, issue_name): """ 输入议题名称以及上传议题文件。 参数: wd: WebDriver实例,用于操作浏览器。 issue_num: 需要上传的议题文件数量。 issue_name: 会议议题的名称。 """ # 议题文件的路径列表 issue_file_path = [ r"D:\GithubData\自动化\ubains-module-test\预定系统\reports\issue_file\5.164Scan 安全报告.pdf", r"D:\GithubData\自动化\ubains-module-test\预定系统\reports\issue_file\IdeaTop软件配置&操作说明文档.docx", r"D:\GithubData\自动化\ubains-module-test\预定系统\reports\issue_file\ideaTop部署配置视频.mp4", r"D:\GithubData\自动化\ubains-module-test\预定系统\reports\issue_file\IdeaTop软件配置&操作说明文档.docx", r"D:\GithubData\自动化\ubains-module-test\预定系统\reports\issue_file\议题图片.png" ] # 打印并输入议题名称 INFO(f"输入议题名称:{issue_name}") safe_send_keys((By.XPATH, f"(//input[@placeholder='请输入会议议题'])[1]"), f"{issue_name}", wd) # 点击【上传文件】按钮以开始上传议题文件 INFO("点击【上传文件】按钮") safe_click((By.XPATH, f"(//div[@class='topicsHandleButton uploadFile'][contains(text(),'上传文件(0)')])[1]"), wd) sleep(2) # 遍历每个议题文件进行上传 for i in range(issue_num): # 检查文件是否存在 if not os.path.exists(issue_file_path[i]): INFO(f"文件 {issue_file_path[i]} 不存在,跳出函数") return # 定位【选择文件】按钮 upload_button = wd.find_element(By.XPATH, '//*[@id="global-uploader-btn"]/input') sleep(2) # 选择议题文件上传 upload_button.send_keys(issue_file_path[i]) # 等待文件上传完成 sleep(15) confirmbutton = "" confirmbutton = WebDriverWait(wd, 10).until( EC.visibility_of_element_located((By.XPATH, "//button[contains(@class,'el-button el-button--default el-button--small el-button--primary')]//span[contains(text(),'确定')]")) ) if confirmbutton is not None: confirmbutton.click() else: # 截取上传完成后的屏幕日志 SELENIUM_LOG_SCREEN(wd, "50%", "Exhibit_Inspect", "Meeting_Message", "添加议题文件") # 点击【确定】按钮完成上传 safe_click((By.XPATH, "//div[@aria-label='会议文件上传']//div[@class='el-dialog__footer']//div//span[contains(text(),'确定')]"), wd) sleep(2) # 清除输入框函数 def input_clear(element_locator, wd): """ 清空输入框中的文本。 该函数通过元素定位器找到指定的输入框,并尝试清空其内容。它使用显式等待来确保元素在尝试清除之前是可见的。 参数: - element_locator: 用于定位输入框的元素定位器,通常是一个元组,包含定位方法和定位表达式。 - wd: WebDriver实例,用于与浏览器交互。 异常处理: - TimeoutException: 如果在指定时间内元素不可见,则捕获此异常并打印超时异常消息。 - NoSuchElementException: 如果找不到指定的元素,则捕获此异常并打印未找到元素的消息。 - ElementNotInteractableException: 如果元素不可操作(例如,元素不可见或不可点击),则捕获此异常并打印相应消息。 """ try: # 等待元素可见,并在可见后清空输入框。 input_element = WebDriverWait(wd, 20).until(EC.visibility_of_element_located(element_locator)) input_element.clear() except TimeoutException: # 如果元素在20秒内不可见,打印超时异常消息。 print(f"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.") except NoSuchElementException: # 如果找不到元素,打印未找到元素的消息。 print(f"NoSuchElementException: Element {element_locator} not found.") except ElementNotInteractableException: # 如果元素不可操作,打印元素不可操作的消息。 print(f"ElementNotInteractableException: Element {element_locator} is not interactable.") # 获取列表的查询结果文本函数 def elment_get_text(element_locator, wd): """ 获取页面元素的文本。 该函数通过显式等待的方式,确保页面元素在20秒内可见并获取其文本。 如果在规定时间内元素不可见、不存在或不可交互,则会捕获相应的异常并打印错误信息。 参数: - element_locator: 用于定位页面元素的元组,通常包含定位方式和定位值。 - wd: WebDriver对象,用于与浏览器进行交互。 返回: - element_text: 页面元素的文本。如果发生异常,则返回None。 """ try: # 使用WebDriverWait等待页面元素在20秒内可见,并获取其文本。 element_text = WebDriverWait(wd, 20).until(EC.visibility_of_element_located(element_locator)).text return element_text except TimeoutException: # 如果超过20秒元素仍未可见,则捕获TimeoutException异常并打印错误信息。 print(f"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.") except NoSuchElementException: # 如果页面上不存在该元素,则捕获NoSuchElementException异常并打印错误信息。 print(f"NoSuchElementException: Element {element_locator} not found.") except ElementNotInteractableException: # 如果元素存在但不可交互(例如被遮挡),则捕获ElementNotInteractableException异常并打印错误信息。 print(f"ElementNotInteractableException: Element {element_locator} is not interactable.") # 读取测试用例xlsx文件中的JSON数据进行数据驱动函数 import openpyxl def read_xlsx_data(xlsx_file_path, sheet_name=None, case_type=None): """ 读取XLSX文件中的数据,并将其转换为一个包含字典的列表,每个字典代表一行测试用例数据。 参数: xlsx_file_path (str): XLSX文件的路径。 sheet_name (str, optional): 工作表的名称。如果未指定,则使用活动工作表。 case_type (str, optional): 测试用例类型,例如 '标准版' 或 'XX项目需求'。如果未指定,则读取所有测试用例。 返回: list: 包含字典的列表,每个字典包含测试用例的名称和参数。 """ try: # 打开XLSX文件 workbook = openpyxl.load_workbook(xlsx_file_path) except FileNotFoundError: raise FileNotFoundError(f"文件未找到: {xlsx_file_path}") except Exception as e: raise Exception(f"无法打开文件: {e}") # 选择工作表 if sheet_name: try: sheet = workbook[sheet_name] except KeyError: raise KeyError(f"工作表未找到: {sheet_name}") else: sheet = workbook.active # 读取表头,从第三行开始 headers = [cell.value for cell in sheet[3]] # 打印表头列名 # INFO(f"表头列名: {headers}") # 找到表头中名为 'JSON' 和 '功能类别' 的列索引 try: json_index = headers.index('JSON') except ValueError as e: raise ValueError(f"表头中没有找到所需的列: {e}") try: category_index = headers.index('功能类别') except ValueError as e: raise ValueError(f"表头中没有找到所需的列: {e}") ddt_cases = [] # 遍历XLSX文件中的每一行数据,从第四行开始 for row_num, row in enumerate(sheet.iter_rows(min_row=4, values_only=True), start=4): # 获取 JSON 列的数据 json_data = row[json_index] # 打印 JSON 数据以进行调试 INFO(f"行 {row_num} 的 JSON 数据: {json_data}") # 检查 JSON 数据是否为空 if json_data is None or json_data.strip() == "": # INFO(f"跳过行 {row_num},JSON 数据为空") continue # 解析 JSON 字符串 try: parsed_json = json.loads(json_data) except json.JSONDecodeError: raise ValueError(f"行 {row_num} 的 JSON 数据无法解析: {json_data}") # 获取功能类别 category = row[category_index] # 检查是否需要过滤测试用例类型 if case_type and category != case_type: continue # 将解析后的 JSON 数据添加到列表中 ddt_cases.append(parsed_json) # 日志记录:XLSX文件已读取 INFO("XLSX文件已读取") # 返回包含所有测试用例数据的列表 return ddt_cases import openpyxl def clear_columns_in_xlsx(xlsx_file_path, sheet_name=None, columns_to_clear=None): """ 将XLSX文件中指定列的单元格值设置为空。 参数: xlsx_file_path (str): XLSX文件的路径。 sheet_name (str, optional): 工作表的名称。如果未指定,则使用活动工作表。 columns_to_clear (list, optional): 需要清空的列名列表。如果未指定,则不执行任何操作。 返回: 无 """ if not columns_to_clear: logging.warning("未指定需要清空的列名列表,函数将不执行任何操作。") return try: # 打开XLSX文件 workbook = openpyxl.load_workbook(xlsx_file_path) except FileNotFoundError: raise FileNotFoundError(f"文件未找到: {xlsx_file_path}") except Exception as e: raise Exception(f"无法打开文件: {e}") # 选择工作表 if sheet_name: try: sheet = workbook[sheet_name] except KeyError: raise KeyError(f"工作表未找到: {sheet_name}") else: sheet = workbook.active # 读取表头,从第三行开始 headers = [cell.value for cell in sheet[3]] # 打印表头列名 logging.info(f"表头列名: {headers}") # 找到需要清空的列的索引 column_indices_to_clear = [headers.index(column) for column in columns_to_clear if column in headers] if not column_indices_to_clear: logging.warning("指定的列名在表头中未找到,函数将不执行任何操作。") return # 遍历XLSX文件中的每一行数据,从第四行开始 for row in sheet.iter_rows(min_row=4): for col_index in column_indices_to_clear: row[col_index].value = None # 将单元格值设置为空 # 保存修改后的文件 try: workbook.save(xlsx_file_path) logging.info(f"文件 {xlsx_file_path} 已保存,指定列的单元格值已清空。") except Exception as e: logging.error(f"保存文件时出错: {e}") # 退出浏览器并释放资源函数 def browser_quit(): """ 退出浏览器并释放资源。 该函数从全局存储中获取浏览器驱动实例,并调用其quit方法来关闭浏览器并释放相关资源。 """ # 清除浏览器 INFO("清除浏览器") # 从全局存储中获取浏览器驱动实例 wd = GSTORE['wd'] # 调用浏览器驱动实例的quit方法,关闭浏览器并释放资源 wd.quit() import os import glob import logging # from datetime import datetime # 获取最新的HTML报告文件,并拼接网页访问连接函数 def get_latest_report_file(report_dir, base_url): """ 获取指定目录下最新的HTML报告文件,并返回带有基础URL的完整路径。 :param report_dir: 报告文件所在的目录 :param base_url: 基础URL :return: 最新的HTML报告文件的完整URL,如果没有找到则返回None """ # 记录调用此函数的日志 logging.info("开始调用get_latest_report_file函数获取报告文件") # 确保报告目录存在 if not os.path.exists(report_dir): logging.error(f"报告目录 {report_dir} 不存在。") return None # 获取指定目录下所有符合模式的HTML报告文件 report_files = glob.glob(os.path.join(report_dir, 'report_*.html')) # 打印找到的文件列表 logging.debug(f"找到的报告文件: {report_files}") # 如果没有找到报告文件,记录警告信息并返回None if not report_files: logging.warning("在指定目录中没有找到报告文件。") return None # 找到最新修改的报告文件 latest_file = max(report_files, key=os.path.getmtime) # 获取最新报告文件的最后修改时间 last_modified_time = datetime.fromtimestamp(os.path.getmtime(latest_file)).strftime('%Y-%m-%d %H:%M:%S') # 记录最新报告文件的信息 logging.info(f"最新报告文件: {latest_file}, 最后修改时间: {last_modified_time}") # 将文件路径转换为相对于基础URL的相对路径 relative_path = os.path.relpath(latest_file, report_dir) # 生成完整的URL full_url = f"{base_url}/{relative_path}".replace("\\", "/") # 返回完整的URL return full_url # 钉钉群机器人消息发送函数 def dingding_send_message(latest_report, title, mobile, ding_type): """ 发送钉钉机器人消息 参考接口文档:https://open.dingtalk.com/document/orgapp/custom-robots-send-group-messages#title-7fs-kgs-36x :param latest_report: 测试报告链接 :param title: 消息标题 :param mobile: 需要@的手机号列表 :param ding_type: 钉钉机器人类型,用于选择不同的 Webhook URL 和密钥 """ # 记录调用此函数的日志 logging.info("开始构建并发送钉钉机器人消息") # 钉钉机器人的 Webhook URL 和密钥(正式环境) # webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=b0eea0bbf097ce3badb4c832d2cd0267a50486f395ec8beca6e2042102bb295b' # secret = 'SEC928b11659c5fd6476cfa2042edbf56da876abf759289f7e4d3c671fb9a81bf43' # 钉钉机器人的 Webhook URL 和密钥(测试环境) if ding_type == '标准版巡检': webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=7fbf40798cad98b1b5db55ff844ba376b1816e80c5777e6f47ae1d9165dacbb4' secret = 'SEC610498ed6261ae2df1d071d0880aaa70abf5e67efe47f75a809c1f2314e0dbd6' elif ding_type == '展厅巡检': webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=061b6e9b1ae436f356cfda7fe19b6e58e46b62670046a78bd3a4d869118c612d' secret = 'SEC93212bd880aad638cc0df2b28a72ef4fdf6651cacb8a6a4bc71dcf09705d458d' # 生成时间戳 timestamp = str(round(time.time() * 1000)) # 生成签名 secret_enc = secret.encode('utf-8') string_to_sign = f'{timestamp}\n{secret}' string_to_sign_enc = string_to_sign.encode('utf-8') hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest() sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) # 构建最终的 Webhook URL params = { 'access_token': webhook_url.split('=')[1], 'timestamp': timestamp, 'sign': sign } encoded_params = urllib.parse.urlencode(params) final_webhook_url = f'https://oapi.dingtalk.com/robot/send?{encoded_params}' # 记录最终的 Webhook URL logging.info(f"钉钉机器人Webhook URL: {final_webhook_url}") # 调用测试结果获取函数 browser_init("标准版预定系统") wd = GSTORE['wd'] # print(latest_report) test_result = get_test_result(latest_report, wd) browser_quit() # 构建消息体 headers = {'Content-Type': 'application/json'} message = { 'msgtype': 'link', 'link': { 'title': title, 'messageUrl': latest_report, 'text': f"通过:{test_result['pass_percent']}" + f"失败:{test_result['fail_percent']}" + f"异常:{test_result['exception_percent']}", }, "at": { "atMobiles": [mobile], "isAtAll": True } } try: # 发送 POST 请求 response = requests.post(final_webhook_url, data=json.dumps(message), headers=headers) # 检查响应状态码 if response.status_code == 200: logging.info('消息发送成功!') logging.info(f'响应内容: {response.text}') else: logging.error(f'消息发送失败,状态码: {response.status_code}') logging.error(f'响应内容: {response.text}') except requests.exceptions.RequestException as e: logging.error(f'请求异常: {e}') # 运行自动化测试函数,并调用获取测试报告链接和钉钉机器人消息发送函数 def run_automation_test(report_title, report_url_prefix, test_case , ding_type): """ 运行自动化测试并生成报告。 参数: - report_title: 报告的标题 - report_url_prefix: 报告URL的前缀 - test_case: 测试用例脚本执行的标签名 - ding_type: 钉钉通知的类型,备用参数,当前代码中未使用 """ # 记录测试开始的日志 logging.info("开始自动化测试...") # 构建运行测试命令 command = [ 'hytest', '--report_title', report_title, '--report_url_prefix', report_url_prefix, '--tag', test_case ] # 记录将要执行的命令日志 logging.info(f"执行命令: {' '.join(command)}") try: # 执行测试命令并获取结果 result = subprocess.run(command, capture_output=True, text=True, check=True) # 记录命令的标准输出和错误输出 logging.debug(f"命令标准输出: {result.stdout}") logging.debug(f"命令错误输出: {result.stderr}") except subprocess.CalledProcessError as e: # 处理子进程调用失败的异常 logging.error(f"命令执行失败,返回码 {e.returncode}: {e.output}") except OSError as e: # 处理操作系统相关的异常 logging.error(f"发生操作系统错误: {e}") finally: # 无论测试是否成功,都记录测试结束的日志 logging.info("自动化测试完成。") # 调用回调函数处理后续操作 get_reportfile_send_dingding(f"{report_title}", report_url_prefix, ding_type) # 定义一个函数,用于获取最新的报告文件,并返回其URL,并调用钉钉消息发送函数 def get_reportfile_send_dingding(report_title, report_url_prefix, ding_type): """ 获取最新的报告文件并通过钉钉发送报告链接。 参数: report_title (str): 报告的标题。 report_url_prefix (str): 报告URL的前缀。 ding_type (str): 钉钉消息的类型。 返回: 无 """ # print(GSTORE['case_pass']) try: # 获取报告文件所在的目录 report_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..','reports') # 获取基础URLs base_url = report_url_prefix # 获取最新的报告文件 latest_report = get_latest_report_file(report_dir, base_url) # 如果找到了最新的报告文件,则发送报告链接到钉钉 if latest_report: logging.info(f"最新报告文件URL: {latest_report}") try: # 记录调用钉钉消息通知函数的日志 logging.info("开始调用钉钉消息通知函数") # 调用钉钉发送消息接口进行推送测试报告链接 dingding_send_message(latest_report, report_title, "13724387318", ding_type) # 记录钉钉消息通知函数调用成功的日志 logging.info("钉钉消息通知函数调用成功") except Exception as e: # 记录钉钉消息通知函数调用失败的日志 logging.error(f"钉钉消息通知函数调用失败: {e}") else: # 记录没有找到报告文件的日志 logging.warning("没有找到报告文件以发送。") except subprocess.CalledProcessError as e: # 处理子进程调用失败的异常 logging.error(f"命令执行失败,返回码 {e.returncode}: {e.output}") except OSError as e: # 处理操作系统相关的异常 logging.error(f"发生操作系统错误: {e}") finally: # 无论是否成功,都记录测试结束的日志 logging.info("自动化测试完成。") # 获取测试报告通过率等参数的函数 import logging import re from selenium.webdriver.common.by import By def get_test_result(latest_report, wd): """ 获取测试结果页面的通过率、失败率和异常率 :param latest_report: 测试结果页面的URL :param wd: WebDriver实例,用于访问和操作网页 :return: 包含通过率、失败率和异常率的字典 """ # 初始化测试结果字典 test_result = { "pass_percent": "", "fail_percent": "", "exception_percent": "" } # 访问测试结果页面 wd.get(latest_report) sleep(5) # 点击简略显示 safe_click((By.XPATH,"//div[@id='display_mode']"), wd) sleep(5) # 定义一个函数来获取和解析百分比 def get_percentage(selector, wd): text = elment_get_text(selector, wd) logging.info(f"获取的文本:{text}") match = re.search(r'(\d+(\.\d+)?)%', text) if match: return match.group(0) else: logging.error(f"未找到百分比匹配项,文本内容: {text}") return "0" # 获取通过率 pass_percent = get_percentage((By.CSS_SELECTOR, "div[class='result_barchart'] div:nth-child(1) span:nth-child(1)"), wd) test_result["pass_percent"] = pass_percent # 获取失败率 fail_percent = get_percentage( (By.CSS_SELECTOR, "body > div.main_section > div.result > div > div:nth-child(2) > span"), wd) test_result["fail_percent"] = fail_percent # 获取异常率 exception_percent = get_percentage( (By.CSS_SELECTOR, "body > div.main_section > div.result > div > div:nth-child(3) > span"), wd) test_result["exception_percent"] = exception_percent # 输出test_result logging.info(test_result) print(test_result) sleep(5) # 返回测试结果字典 return test_result # if __name__ == "__main__": # browser_init("展厅预定巡检") # wd = GSTORE['wd'] # test_result = get_test_result("http://nat.ubainsyun.com:31134/report_20250217_094401.html",wd) # print(test_result) # 字符串转换枚举类型函数 def get_by_enum(type_str): """ 将字符串类型的定位器类型转换为 selenium.webdriver.common.by.By 枚举类型。 参数: type_str (str): 定位器类型字符串,例如 'XPATH'。 返回: selenium.webdriver.common.by.By: 对应的 By 枚举类型。 """ # 将输入的定位器类型字符串转换为大写,以匹配 By 枚举类型的命名 type_str = type_str.upper() # 根据输入的字符串类型返回对应的 By 枚举类型 if type_str == 'XPATH': return By.XPATH elif type_str == 'ID': return By.ID elif type_str == 'NAME': return By.NAME elif type_str == 'CLASS_NAME': return By.CLASS_NAME elif type_str == 'CSS_SELECTOR': return By.CSS_SELECTOR elif type_str == 'TAG_NAME': return By.TAG_NAME elif type_str == 'LINK_TEXT': return By.LINK_TEXT elif type_str == 'PARTIAL_LINK_TEXT': return By.PARTIAL_LINK_TEXT else: # 如果输入的定位器类型字符串不匹配任何已知的 By 枚举类型,抛出 ValueError 异常 raise ValueError(f"未知的定位器类型: {type_str}") # xlsx文件写入函数 import os import openpyxl import logging from openpyxl.drawing.image import Image from openpyxl.utils import get_column_letter def write_xlsx_data(xlsx_file_path, sheet_name, function_number, test_result, log_screenshot): """ 在XLSX文件的指定行中填充测试结果和日志截图。 参数: xlsx_file_path (str): XLSX文件的路径。 sheet_name (str): 工作表的名称。 function_number (str): 功能编号,用于匹配行。 test_result (str): 测试结果。 log_screenshot (str): 日志截图路径。 """ try: # 打开XLSX文件 workbook = openpyxl.load_workbook(xlsx_file_path) except FileNotFoundError: raise FileNotFoundError(f"文件未找到: {xlsx_file_path}") except Exception as e: raise Exception(f"无法打开文件: {e}") # 选择工作表 if sheet_name: try: sheet = workbook[sheet_name] except KeyError: raise KeyError(f"工作表未找到: {sheet_name}") else: sheet = workbook.active # 读取表头,从第三行开始 headers = [cell.value for cell in sheet[3]] # 打印表头列名 logging.info(f"表头列名: {headers}") # 找到表头中名为 '功能编号'、'测试结果' 和 '日志截图' 的列索引 try: function_number_index = headers.index("功能编号") except ValueError as e: raise ValueError(f"表头中没有找到 '功能编号' 列: {e}") try: test_result_index = headers.index("测试结果") except ValueError as e: raise ValueError(f"表头中没有找到 '测试结果' 列: {e}") try: log_screenshot_index = headers.index("日志截图") except ValueError as e: raise ValueError(f"表头中没有找到 '日志截图' 列: {e}") # 遍历数据行,找到与给定功能编号匹配的行 for row in sheet.iter_rows(min_row=4, values_only=False): if row[function_number_index].value == function_number: # 填充测试结果 row[test_result_index].value = test_result # 插入日志截图 if log_screenshot: img = Image(log_screenshot) # 获取单元格的宽度和高度 cell = row[log_screenshot_index] cell_width = sheet.column_dimensions[get_column_letter(cell.column)].width cell_height = sheet.row_dimensions[cell.row].height # 调整图片大小以适应单元格 img.width = cell_width * 7 # 7 是一个经验值,根据需要调整 img.height = cell_height * 1.5 # 1.5 是一个经验值,根据需要调整 # 设置图片位置 sheet.add_image(img, cell.coordinate) break else: raise ValueError(f"未找到功能编号为 {function_number} 的行") # 保存修改后的文件 try: workbook.save(xlsx_file_path) logging.info(f"文件 {xlsx_file_path} 已保存,数据已写入功能编号为 {function_number} 的行。") except Exception as e: logging.error(f"保存文件时出错: {e}") # 预定服务日志获取函数 import paramiko import time def get_remote_log_with_paramiko(host, username, private_key_path, passphrase, log_path, num_lines=100, timeout=30): """ 使用 Paramiko 获取远程服务器的日志文件内容. Args: host (str): 服务器 IP 地址或域名. username (str): 用户名. private_key_path (str): SSH 私钥文件路径. passphrase (str): 私钥文件的 passphrase. log_path (str): 日志文件路径. num_lines (int): 要获取的日志行数 (默认 100). timeout (int): SSH 命令执行的超时时间(秒). Returns: str: 获取的日志内容,如果出错返回 None. """ try: private_key = paramiko.RSAKey.from_private_key_file(private_key_path, password=passphrase) client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(host, username=username, pkey=private_key, timeout=timeout) command = f"tail -n {num_lines} {log_path}" print(f"Executing command: {command}") stdin, stdout, stderr = client.exec_command(command, timeout=timeout) error = stderr.read().decode('utf-8') if error: print(f"Error: {error}") return None output = stdout.read().decode('utf-8') print("Successfully retrieved log content.") return output except Exception as e: print(f"An error occurred: {e}") return None finally: client.close() # if __name__ == "__main__": # host = "192.168.5.218" # username = "root" # private_key_path = "C:\\Users\\29194\\.ssh\\id_rsa" # 替换为你的私钥文件路径 # passphrase = "Ubains@123" # 替换为你的 passphrase # log_path = "/var/www/java/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log" # log_content = get_remote_log_with_paramiko(host, username, private_key_path, passphrase, log_path) # # if log_content: # print(log_content) # else: # print("Failed to retrieve log content.")