import csv import glob import re import urllib import requests import json import hmac import hashlib import base64 import psutil import time import subprocess import logging from webdriver_manager.chrome import ChromeDriverManager from selenium.webdriver.chrome.service import Service as ChromeService from hytest import * from selenium import webdriver from selenium.common import TimeoutException, NoSuchElementException, ElementNotInteractableException from selenium.webdriver.common.keys import Keys from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from urllib.parse import urlencode from datetime import datetime from time import sleep from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.common.by import By # 获取当前脚本的绝对路径 current_dir = os.path.dirname(os.path.abspath(__file__)) # 获取当前脚本的父目录 parent_dir = os.path.dirname(current_dir) logging.info(parent_dir) # 添加路径 sys.path.append(current_dir) # 配置日志记录器,仅输出到控制台 logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler() ] ) # 配置日志 # logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') def browser_init(login_url): """ 初始化浏览器设置和实例。 此函数旨在创建并配置一个Chrome浏览器实例,包括设置Chrome选项以排除不必要的日志, 并尝试打开特定的登录页面。任何初始化过程中出现的错误都会被捕获并记录。 """ # 标记初始化过程的开始 INFO("'----------' 正在初始化浏览器 '----------'") # 创建Chrome选项实例,用于配置浏览器行为 options = webdriver.ChromeOptions() # 添加实验性选项,排除某些命令行开关以减少输出日志 options.add_experimental_option('excludeSwitches', ['enable-Logging']) # 忽略证书错误,允许在本地主机上运行时不安全 options.add_argument('--ignore-certificate-errors') options.add_argument('--disable-blink-features=AutomationControlled') options.add_argument('--allow-insecure-localhost') # 使用webdriver_manager自动下载并管理chromedriver service = ChromeService(ChromeDriverManager().install()) # 尝试创建WebDriver实例并执行初始化操作 try: # 创建WebDriver实例 wd = webdriver.Chrome(service=service, options=options) # 设置隐式等待时间为10秒,以允许元素加载 wd.implicitly_wait(60) # 定义目标登录页面的URL # login_test_url = 'https://192.168.5.218/#/login/logindf' # login_ngrok_url = "https://nat.ubainsyun.com:11046" # 展厅巡检环境 # login_exhibit_url = 'http://nat.ubainsyun.com:11060/#/login/logindf' # 请求WebDriver打开登录页面 wd.get(login_url) # 最大化浏览器窗口 wd.maximize_window() # 将WebDriver实例存储在全局存储器中,以便后续使用 GSTORE['wd'] = wd # 标记初始化过程完成 INFO("'----------' 浏览器初始化完成 '----------'") except Exception as e: # 捕获并记录初始化过程中的任何异常 logging.error(f"浏览器初始化失败:{e}") def admin_login(username, password): """ 管理员登录函数。 该函数通过模拟用户输入用户名、密码和验证码,并点击登录按钮,以实现管理员登录。 """ # 获取webdriver实例 wd = GSTORE['wd'] # 打印用户名输入信息 INFO(f"输入用户名:{username}") # 向用户名输入框发送用户名 safe_send_keys((By.XPATH, "//input[@placeholder='请输入账号或手机号或邮箱号']"), f'{username}', wd) # 打印密码输入信息 INFO(f"输入密码:{password}") # 向密码输入框发送密码 safe_send_keys((By.XPATH, "//input[@placeholder='请输入密码']"), f"{password}", wd) # 打印验证码输入信息 INFO("输入验证码:csba") # 向验证码输入框发送验证码 safe_send_keys((By.XPATH, "//input[@placeholder='请输入图形验证码']"), "csba", wd) # 隐式等待5秒,以确保验证码被正确处理 wd.implicitly_wait(5) # 打印登录按钮点击信息 INFO("点击登录按钮") # 点击登录按钮 safe_click((By.XPATH, "//input[@value='登 录']"), wd) def enter_the_backend(): """ 进入后台系统界面。 该函数通过模拟点击操作,找到并点击后台系统入口,以进入后台界面。 """ # 记录进入后台系统的操作信息 INFO("进入后台") # 获取webdriver对象,用于后续的页面元素操作 wd = GSTORE['wd'] # 执行点击操作,通过XPath定位到后台系统入口图标并点击 safe_click((By.XPATH, "//img[@title='后台系统']"), wd) def safe_send_keys(element_locator, value, wd): """ 安全地向网页元素发送键值。 该函数尝试在指定时间内找到指定的网页元素,如果找到并且元素可见,将先清除元素内容,然后发送指定的键值。 如果在指定时间内未能找到元素或元素不可点击,则捕获相应的异常并打印错误信息。 参数: element_locator (tuple): 用于定位网页元素的策略和定位器的元组,例如(By.ID, 'element_id')。 value (str): 要发送到元素的键值字符串。 wd: WebDriver实例,用于与浏览器交互。 异常处理: - TimeoutException: 如果元素在指定时间内未被找到或不可点击。 - NoSuchElementException: 如果元素不存在。 - ElementNotInteractableException: 如果元素存在但不可交互。 """ try: # 等待元素在指定时间内可见 element = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator)) element.clear() # 清除元素的当前值 element.send_keys(value) # 向元素发送指定的键值 except TimeoutException: # 如果元素在指定时间内未被找到或不可点击,打印超时异常信息 INFO(f"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.") except NoSuchElementException: # 如果元素不存在,打印相应异常信息 INFO(f"NoSuchElementException: Element {element_locator} not found.") except ElementNotInteractableException: # 如果元素不可交互,打印相应异常信息 INFO(f"ElementNotInteractableException: Element {element_locator} is not interactable.") def safe_click(element_locator, wd): """ 对其定位器指定的元素执行安全单击。 此函数尝试以处理潜在异常的方式单击元素。 它等待元素可见并可单击,然后再尝试单击它。 如果该元素在20秒内无法点击,或者它不存在, 或者不可交互,它会捕获相应的异常并记录一条信息性消息。 参数: -element_locator:要单击的元素的定位器,指定为元组。 -wd:用于查找元素并与之交互的WebDriver实例。 """ try: # Wait up to 20 seconds for the element to be visible element = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator)) # Attempt to click the element element.click() except TimeoutException: # Log a message if the element is not found or not clickable within 20 seconds INFO(f"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.") except NoSuchElementException: # Log a message if the element is not found INFO(f"NoSuchElementException: Element {element_locator} not found.") except ElementNotInteractableException: # Log a message if the element is not interactable INFO(f"ElementNotInteractableException: Element {element_locator} is not interactable.") def issue_send_and_upload(wd , issue_num, issue_name): """ 输入议题名称以及上传议题文件。 """ issue_file_path = [ r"D:\GithubData\自动化测试\ubains-module-test\预定系统\reports\issue_file\5.164Scan 安全报告.pdf", r"D:\GithubData\自动化测试\ubains-module-test\预定系统\reports\issue_file\展厅巡检排班记录241129.xlsx", r"D:\GithubData\自动化\ubains-module-test\预定系统\reports\issue_file\ideaTop部署配置视频.mp4", r"D:\GithubData\自动化\ubains-module-test\预定系统\reports\issue_file\IdeaTop软件配置&操作说明文档.docx", r"D:\GithubData\自动化测试\ubains-module-test\预定系统\reports\issue_file\议题图片.png" ] INFO(f"输入议题名称:{issue_name}") # 输入议题名称 safe_send_keys((By.XPATH, f"(//input[@placeholder='请输入会议议题'])[1]"), f"{issue_name}", wd) # 选择议题文件进行上传 INFO("点击【上传文件】按钮") safe_click((By.XPATH, f"(//div[@class='topicsHandleButton uploadFile'][contains(text(),'上传文件(0)')])[1]"),wd) sleep(2) for i in range(issue_num): INFO("定位【选择文件】按钮") upload_button = wd.find_element(By.XPATH, '//*[@id="global-uploader-btn"]/input') # INFO(f"元素定位:{upload_button}") # INFO("选择议题文件上传") upload_button.send_keys(issue_file_path[i]) INFO(f"第{i+1}个议题文件上传完成") sleep(15) SELENIUM_LOG_SCREEN(wd, "50%", "Exhibit_Inspect", "Meeting_Message", "添加议题文件") # 点击【确定】按钮 safe_click((By.XPATH, "//div[@aria-label='会议文件上传']//div[@class='el-dialog__footer']//div//span[contains(text(),'确定')]"),wd) sleep(2) def input_clear(element_locator, wd): """ 清空输入框中的文本。 该函数通过元素定位器找到指定的输入框,并尝试清空其内容。它使用显式等待来确保元素在尝试清除之前是可见的。 参数: - element_locator: 用于定位输入框的元素定位器,通常是一个元组,包含定位方法和定位表达式。 - wd: WebDriver实例,用于与浏览器交互。 异常处理: - TimeoutException: 如果在指定时间内元素不可见,则捕获此异常并打印超时异常消息。 - NoSuchElementException: 如果找不到指定的元素,则捕获此异常并打印未找到元素的消息。 - ElementNotInteractableException: 如果元素不可操作(例如,元素不可见或不可点击),则捕获此异常并打印相应消息。 """ try: # 等待元素可见,并在可见后清空输入框。 input_element = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator)) input_element.clear() except TimeoutException: # 如果元素在20秒内不可见,打印超时异常消息。 print(f"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.") except NoSuchElementException: # 如果找不到元素,打印未找到元素的消息。 print(f"NoSuchElementException: Element {element_locator} not found.") except ElementNotInteractableException: # 如果元素不可操作,打印元素不可操作的消息。 print(f"ElementNotInteractableException: Element {element_locator} is not interactable.") def send_keyboard(element_locator, wd): """ 向指定元素发送键盘事件。 该函数尝试找到页面上的一个元素,并向其发送RETURN键点击事件。它使用显式等待来确保元素在尝试交互之前是可见的。 如果在指定时间内元素不可见、不存在或不可交互,则捕获相应的异常并打印错误消息。 参数: - element_locator: 用于定位元素的元组,格式为(by, value)。例如,(By.ID, 'element_id')。 - wd: WebDriver实例,用于与浏览器进行交互。 异常处理: - TimeoutException: 如果元素在20秒内不可见,则捕获此异常并打印超时错误消息。 - NoSuchElementException: 如果找不到指定的元素,则捕获此异常并打印未找到元素的错误消息。 - ElementNotInteractableException: 如果元素不可交互(例如,被遮挡或不可点击),则捕获此异常并打印相应错误消息。 """ try: # 等待元素可见,并在可见后向其发送RETURN键点击事件。 element = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator)) element.send_keys(Keys.RETURN) except TimeoutException: # 如果元素在指定时间内不可见,打印超时错误消息。 print(f"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.") except NoSuchElementException: # 如果找不到指定的元素,打印未找到元素的错误消息。 print(f"NoSuchElementException: Element {element_locator} not found.") except ElementNotInteractableException: # 如果元素不可交互,打印不可交互错误消息。 print(f"ElementNotInteractableException: Element {element_locator} is not interactable.") def get_notify_text(wd,element_locator,module_name,function_name,name): """ 获取通知文本信息。 该函数通过WebDriver等待特定的元素出现,并截取其文本内容作为通知信息。此外,它还负责在获取通知文本后进行屏幕截图, 以便于后续的调试或记录需要。 参数: wd (WebDriver): 由上层传入的WebDriver对象,用于操作浏览器。 返回: str: 提取的通知文本信息。如果未能提取到信息或发生异常,则返回None。 """ try: # 获取提示信息 notify_text = WebDriverWait(wd, 60).until( EC.presence_of_element_located(element_locator) ).text # 屏幕截图 SELENIUM_LOG_SCREEN(wd,"50%",module_name,function_name,name) return notify_text except Exception as e: # 记录异常信息 INFO(f"Exception occurred: {e}") def elment_get_text(element_locator, wd): """ 获取页面元素的文本。 该函数通过显式等待的方式,确保页面元素在20秒内可见并获取其文本。 如果在规定时间内元素不可见、不存在或不可交互,则会捕获相应的异常并打印错误信息。 参数: - element_locator: 用于定位页面元素的元组,通常包含定位方式和定位值。 - wd: WebDriver对象,用于与浏览器进行交互。 返回: - element_text: 页面元素的文本。如果发生异常,则返回None。 """ try: # 使用WebDriverWait等待页面元素在20秒内可见,并获取其文本。 element_text = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator)).text return element_text except TimeoutException: # 如果超过20秒元素仍未可见,则捕获TimeoutException异常并打印错误信息。 print(f"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.") except NoSuchElementException: # 如果页面上不存在该元素,则捕获NoSuchElementException异常并打印错误信息。 print(f"NoSuchElementException: Element {element_locator} not found.") except ElementNotInteractableException: # 如果元素存在但不可交互(例如被遮挡),则捕获ElementNotInteractableException异常并打印错误信息。 print(f"ElementNotInteractableException: Element {element_locator} is not interactable.") def read_csv_data(csv_file_path): """ 读取CSV文件中的数据,并将其转换为一个包含字典的列表,每个字典代表一行测试用例数据。 参数: csv_file_path (str): CSV文件的路径。 返回: list: 包含字典的列表,每个字典包含测试用例的名称和参数。 """ # 打开CSV文件,使用只读模式,确保兼容性并处理编码 with open(csv_file_path, mode='r', newline='', encoding='utf-8') as file: # 创建CSV阅读器 reader = csv.reader(file) # 读取表头,为后续数据解析做准备 headers = next(reader) ddt_cases = [] # 遍历CSV文件中的每一行数据 for row in reader: # 将每一行数据转换为字典,其中包含测试用例的名称和参数 case = { 'name': row[0], 'para': row[1:] } # 将转换后的测试用例数据添加到列表中 ddt_cases.append(case) # 日志记录:CSV文件已读取 INFO("CSV文件已读取") # 返回包含所有测试用例数据的列表 return ddt_cases def get_cpu_usage(interval=1): """ 获取当前进程的 CPU 占用率。 :param interval: 计算 CPU 使用率的时间间隔(秒) :return: 当前进程的 CPU 占用率(百分比) """ try: process = psutil.Process(os.getpid()) cpu_usage = process.cpu_percent(interval=interval) if isinstance(cpu_usage, (int, float)): return cpu_usage else: logging.error("CPU 使用率数据类型不正确") return None except psutil.NoSuchProcess: logging.error("进程不存在") return None except psutil.AccessDenied: logging.error("权限不足") return None except Exception as e: logging.error(f"未知错误: {e}") return None def delete_images_in_directory(directory): # 指定要删除的图片文件扩展名 image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff'] # 遍历目录中的所有文件 for filename in os.listdir(directory): # 获取文件的完整路径 file_path = os.path.join(directory, filename) # 检查文件是否为图片文件 if any(filename.lower().endswith(ext) for ext in image_extensions): try: # 删除文件 os.remove(file_path) print(f"已删除文件: {file_path}") except Exception as e: print(f"删除文件 {file_path} 时出错: {e}") def is_valid_password(password): try: # 基本类型检查 if not isinstance(password, str): raise ValueError("Password must be a string") # 检查长度,密码至少需要11个字符 if len(password) < 11: return False # 使用正则表达式检查密码是否包含大小写字母和数字 if not re.match(r'^(?=.*[a-z])(?=.*[A-Z])(?=.*\d).{11,}$', password): return False # 检查连续3位及以上不重复且不连续组合 for i in range(len(password) - 2): # 检查是否有连续3位相同 if password[i] == password[i + 1] == password[i + 2]: return False # 检查是否有连续3位是连续字符(如123, abc) if abs(ord(password[i + 1]) - ord(password[i])) == 1 and abs(ord(password[i + 2]) - ord(password[i + 1])) == 1: return False return True except Exception as e: logging.error(f"An error occurred: {e}") return False def browser_quit(): """ 退出浏览器并释放资源。 该函数从全局存储中获取浏览器驱动实例,并调用其quit方法来关闭浏览器并释放相关资源。 """ # 清除浏览器 INFO("清除浏览器") # 从全局存储中获取浏览器驱动实例 wd = GSTORE['wd'] # 调用浏览器驱动实例的quit方法,关闭浏览器并释放资源 wd.quit() def get_latest_report_file(report_dir, base_url): """ 获取指定目录下最新的HTML报告文件,并返回带有基础URL的完整路径。 :param report_dir: 报告文件所在的目录 :param base_url: 基础URL :return: 最新的HTML报告文件的完整URL,如果没有找到则返回None """ # 记录调用此函数的日志 logging.info("开始调用get_latest_report_file函数获取报告文件") # 确保报告目录存在 if not os.path.exists(report_dir): logging.error(f"报告目录 {report_dir} 不存在。") return None # 获取指定目录下所有符合模式的HTML报告文件 report_files = glob.glob(os.path.join(report_dir, 'report_*.html')) # 打印找到的文件列表 logging.debug(f"找到的报告文件: {report_files}") # 如果没有找到报告文件,记录警告信息并返回None if not report_files: logging.warning("在指定目录中没有找到报告文件。") return None # 找到最新修改的报告文件 latest_file = max(report_files, key=os.path.getmtime) # 获取最新报告文件的最后修改时间 last_modified_time = datetime.fromtimestamp(os.path.getmtime(latest_file)).strftime('%Y-%m-%d %H:%M:%S') # 记录最新报告文件的信息 logging.info(f"最新报告文件: {latest_file}, 最后修改时间: {last_modified_time}") # 将文件路径转换为相对于基础URL的相对路径 relative_path = os.path.relpath(latest_file, report_dir) # 生成完整的URL full_url = f"{base_url}/{relative_path}".replace("\\", "/") # 返回完整的URL return full_url def dingding_send_message(test_report_url, title, mobile, ding_type): """ 发送钉钉机器人消息 参考接口文档:https://open.dingtalk.com/document/orgapp/custom-robots-send-group-messages#title-7fs-kgs-36x :param test_report_url: 测试报告链接 :param title: 消息标题 :param text: 消息内容 :param mobile: 需要@的手机号列表 """ # 记录调用此函数的日志 logging.info("开始构建并发送钉钉机器人消息") # 钉钉机器人的 Webhook URL 和密钥(正式环境) # webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=b0eea0bbf097ce3badb4c832d2cd0267a50486f395ec8beca6e2042102bb295b' # secret = 'SEC928b11659c5fd6476cfa2042edbf56da876abf759289f7e4d3c671fb9a81bf43' # 钉钉机器人的 Webhook URL 和密钥(测试环境) if ding_type == '标准版巡检': webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=7fbf40798cad98b1b5db55ff844ba376b1816e80c5777e6f47ae1d9165dacbb4' secret = 'SEC610498ed6261ae2df1d071d0880aaa70abf5e67efe47f75a809c1f2314e0dbd6' elif ding_type == '展厅巡检': webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=061b6e9b1ae436f356cfda7fe19b6e58e46b62670046a78bd3a4d869118c612d' secret = 'SEC93212bd880aad638cc0df2b28a72ef4fdf6651cacb8a6a4bc71dcf09705d458d' # 生成时间戳 timestamp = str(round(time.time() * 1000)) # 生成签名 secret_enc = secret.encode('utf-8') string_to_sign = f'{timestamp}\n{secret}' string_to_sign_enc = string_to_sign.encode('utf-8') hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest() sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) # 构建最终的 Webhook URL params = { 'access_token': webhook_url.split('=')[1], 'timestamp': timestamp, 'sign': sign } encoded_params = urllib.parse.urlencode(params) final_webhook_url = f'https://oapi.dingtalk.com/robot/send?{encoded_params}' # 记录最终的 Webhook URL logging.info(f"钉钉机器人Webhook URL: {final_webhook_url}") # 构建消息体 headers = {'Content-Type': 'application/json'} message = { 'msgtype': 'link', 'link': { 'title': title, 'messageUrl': test_report_url, 'text': test_report_url }, "at": { "atMobiles": [mobile], "isAtAll": True } } try: # 发送 POST 请求 response = requests.post(final_webhook_url, data=json.dumps(message), headers=headers) # 检查响应状态码 if response.status_code == 200: logging.info('消息发送成功!') logging.info(f'响应内容: {response.text}') else: logging.error(f'消息发送失败,状态码: {response.status_code}') logging.error(f'响应内容: {response.text}') except requests.exceptions.RequestException as e: logging.error(f'请求异常: {e}') def run_automation_test(report_title, report_url_prefix, test_case , ding_type): """ 运行自动化测试并生成报告。 参数: - report_title: 报告的标题 - report_url_prefix: 报告URL的前缀 - test_case: 测试用例脚本执行的标签名 """ # 记录测试开始的日志 logging.info("开始自动化测试...") # 构建运行测试命令 command = [ 'hytest', '--report_title', report_title, '--report_url_prefix', report_url_prefix, '--tag', test_case ] # 记录将要执行的命令日志 logging.info(f"执行命令: {' '.join(command)}") try: # 执行测试命令并获取结果 result = subprocess.run(command, capture_output=True, text=True, check=True) # 记录命令的标准输出和错误输出 logging.debug(f"命令标准输出: {result.stdout}") logging.debug(f"命令错误输出: {result.stderr}") except subprocess.CalledProcessError as e: # 处理子进程调用失败的异常 logging.error(f"命令执行失败,返回码 {e.returncode}: {e.output}") except OSError as e: # 处理操作系统相关的异常 logging.error(f"发生操作系统错误: {e}") finally: # 无论测试是否成功,都记录测试结束的日志 logging.info("自动化测试完成。") # 调用回调函数处理后续操作 get_reportfile_send_dingding(f"{report_title}", report_url_prefix, ding_type) def get_reportfile_send_dingding(report_title, report_url_prefix, ding_type): try: # 获取报告文件所在的目录 report_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..','reports') # 获取基础URL base_url = report_url_prefix # 获取最新的报告文件 latest_report = get_latest_report_file(report_dir, base_url) # 如果找到了最新的报告文件,则发送报告链接到钉钉 if latest_report: logging.info(f"最新报告文件URL: {latest_report}") try: # 记录调用钉钉消息通知函数的日志 logging.info("开始调用钉钉消息通知函数") # 调用钉钉发送消息接口进行推送测试报告链接 dingding_send_message(latest_report, report_title, "13724387318", ding_type) # 记录钉钉消息通知函数调用成功的日志 logging.info("钉钉消息通知函数调用成功") except Exception as e: # 记录钉钉消息通知函数调用失败的日志 logging.error(f"钉钉消息通知函数调用失败: {e}") else: # 记录没有找到报告文件的日志 logging.warning("没有找到报告文件以发送。") except subprocess.CalledProcessError as e: # 处理子进程调用失败的异常 logging.error(f"命令执行失败,返回码 {e.returncode}: {e.output}") except OSError as e: # 处理操作系统相关的异常 logging.error(f"发生操作系统错误: {e}") finally: # 无论是否成功,都记录测试结束的日志 logging.info("自动化测试完成。") # if __name__ == '__main__': # get_reportfile_send_dingding("测试","http://192.168.1.166") from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException import logging def single_click_and_drag(source_element_locator, target_element_locator, wd): """ 实现元素从source_element单击后拖拽到target_element的功能 :param wd: WebDriver实例 :param source_element_locator: 拖拽起始元素的定位器 :param target_element_locator: 拖拽目标元素的定位器 """ try: # 等待页面完全加载 sleep(3) # 找到源元素和目标元素 source_element = WebDriverWait(wd, 120).until(EC.element_to_be_clickable(source_element_locator)) target_element = WebDriverWait(wd, 120).until(EC.element_to_be_clickable(target_element_locator)) # 使用ActionChains执行单击并拖拽操作 actions = ActionChains(wd) actions.click_and_hold(source_element) # 单击并按住源元素 actions.move_to_element(target_element) # 移动到目标元素 actions.release() # 释放鼠标 actions.perform() logging.info("单击并拖拽操作成功") except TimeoutException as e: logging.error(f"元素查找超时: {e}") except NoSuchElementException as e: logging.error(f"元素未找到: {e}") except Exception as e: logging.error(f"发生未知错误: {e}") import requests import os import chardet from urllib3.exceptions import InsecureRequestWarning # 禁用 InsecureRequestWarning 警告 requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) def fetch_and_parse_check_txt(url, save_path, extract_info): """ 获取check.txt文件并解析指定信息 :param url: check.txt文件的URL :param save_path: 文件保存路径 :param extract_info: 需要提取的信息列表,例如 ['[m]ysql', '[r]edis', '[f]dfs_storaged', '[f]dfs_tracker', '[e]mqx', 'ubains-meeting-api-1.0-SNAPSHOT.jar', 'ubains-meeting-inner-api-1.0-SNAPSHOT.jar', 'uwsgi'] :return: 提取的信息字典 """ try: # 发送HTTPS请求获取文件内容 response = requests.get(url, verify=False) # verify=False 忽略SSL证书验证,生产环境不推荐 response.raise_for_status() # 如果响应状态码不是200,抛出异常 # 检测文件编码 detected_encoding = chardet.detect(response.content)['encoding'] print(f"检测到的编码: {detected_encoding}") # 如果检测到的编码为空或不准确,可以手动指定编码 if not detected_encoding or detected_encoding == 'ascii': detected_encoding = 'utf-8' # 假设文件编码为 utf-8 # 将响应内容解码为字符串 content = response.content.decode(detected_encoding) # 将文件内容保存到指定目录 with open(save_path, 'w', encoding='utf-8') as file: file.write(content) # 解析文件内容 parsed_info = {} for line in content.split('\n'): for info in extract_info: if info in line: service_name = info service_status = line.split(info, 1)[1].strip() parsed_info[service_name] = service_status return parsed_info except requests.exceptions.RequestException as e: print(f"请求错误: {e}") return None