提交 08201ee3 authored 作者: 陈泽健's avatar 陈泽健

兰州中石化项目输出会议申报模块的JSON数据,调试自动化运行。

上级 23bd8995
<changelist name="在进行更新之前于_2025_5_27_18_00_取消提交了更改_[更改]" date="1748340012458" recycled="false" toDelete="true">
<option name="PATH" value="$PROJECT_DIR$/.idea/shelf/在进行更新之前于_2025_5_27_18_00_取消提交了更改_[更改]/shelved.patch" />
<option name="DESCRIPTION" value="在进行更新之前于 2025/5/27 18:00 取消提交了更改 [更改]" />
</changelist>
\ No newline at end of file
Index: 预定系统/Base/base.py
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.BaseRevisionTextPatchEP
<+>import csv\r\nimport glob\r\nimport re\r\nimport urllib\r\nfrom selenium.webdriver.chrome.service import Service\r\nimport json\r\nimport hmac\r\nimport hashlib\r\nimport base64\r\nimport psutil\r\nimport time\r\nimport subprocess\r\nimport logging\r\nfrom hytest import *\r\nfrom selenium import webdriver\r\nfrom selenium.common import ElementNotInteractableException\r\nfrom selenium.webdriver.common.keys import Keys\r\nfrom urllib.parse import urlencode\r\nfrom datetime import datetime\r\nfrom selenium.webdriver.chrome.service import Service as ChromeService\r\nfrom webdriver_manager.chrome import ChromeDriverManager\r\n\r\n\r\n# import datetime\r\n\r\n# 获取当前脚本的绝对路径\r\ncurrent_dir = os.path.dirname(os.path.abspath(__file__))\r\n# 获取当前脚本的父目录\r\nparent_dir = os.path.dirname(current_dir)\r\nlogging.info(parent_dir)\r\n# 添加路径\r\nsys.path.append(current_dir)\r\n\r\n# 配置日志记录器,仅输出到控制台\r\nlogging.basicConfig(\r\n level=logging.DEBUG,\r\n format='%(asctime)s - %(levelname)s - %(message)s',\r\n handlers=[\r\n logging.StreamHandler()\r\n ]\r\n)\r\n\r\n# 浏览器初始化函数\r\ndef browser_init(login_type):\r\n \"\"\"\r\n 初始化浏览器设置和实例。\r\n\r\n 此函数旨在创建并配置一个Chrome浏览器实例,包括设置Chrome选项以排除不必要的日志,\r\n 并尝试打开特定的登录页面。任何初始化过程中出现的错误都会被捕获并记录。\r\n\r\n 参数:\r\n login_type (str): 指定登录类型,根据不同的登录类型选择不同的URL。\r\n\r\n 返回:\r\n 无\r\n \"\"\"\r\n # 标记初始化过程的开始\r\n INFO(\"'----------' 正在初始化浏览器 '----------'\")\r\n\r\n # 创建Chrome选项实例,用于配置浏览器行为\r\n options = webdriver.ChromeOptions()\r\n # 添加实验性选项,排除某些命令行开关以减少输出日志\r\n options.add_experimental_option('excludeSwitches', ['enable-Logging'])\r\n # 忽略证书错误,允许在本地主机上运行时不安全\r\n options.add_argument('--ignore-certificate-errors')\r\n # 禁用自动化控制特征检测,避免被网站识别为自动化流量\r\n options.add_argument('--disable-blink-features=AutomationControlled')\r\n # 允许不安全的本地主机运行,通常用于开发和测试环境\r\n options.add_argument('--allow-insecure-localhost')\r\n\r\n # 使用webdriver_manager自动下载并管理chromedriver\r\n # service = ChromeService(ChromeDriverManager().install())\r\n # 使用备用的ChromeDriver下载源\r\n # service = Service(ChromeDriverManager().install())\r\n # 手动指定ChromeDriver的路径\r\n # 自动化运行服务器的chromedriver路径:\r\n # 拯救者电脑\r\n # service = Service(r'C:\\Users\\29194\\AppData\\Local\\Programs\\Python\\Python310\\Scripts\\chromedriver.exe')\r\n # EDY电脑\r\n # service = Service(r'C:\\Program Files\\Python310\\Scripts\\chromedriver.exe')\r\n # 云电脑\r\n service = Service(r'E:\\Python\\Scripts\\chromedriver.exe')\r\n # 尝试创建WebDriver实例并执行初始化操作\r\n try:\r\n # 创建WebDriver实例\r\n wd = webdriver.Chrome(service=service, options=options)\r\n # 设置隐式等待时间为10秒,以允许元素加载\r\n wd.implicitly_wait(60)\r\n\r\n # 获取登录URL\r\n login_url = get_login_url_from_config(login_type)\r\n # 打开对应类型的登录页面\r\n wd.get(login_url)\r\n # 最大化浏览器窗口\r\n wd.maximize_window()\r\n\r\n # 将WebDriver实例存储在全局存储器中,以便后续使用\r\n GSTORE['wd'] = wd\r\n # 标记初始化过程完成\r\n INFO(\"'----------' 浏览器初始化完成 '----------'\")\r\n except Exception as e:\r\n # 捕获并记录初始化过程中的任何异常\r\n logging.error(f\"浏览器初始化失败:{e}\")\r\n\r\n# 从配置项config中获取登录URL\r\ndef get_login_url_from_config(login_type):\r\n \"\"\"\r\n 从配置文件中读取登录URL。\r\n\r\n 参数:\r\n login_type (str): 指定登录类型,根据不同的登录类型选择不同的URL。\r\n\r\n 返回:\r\n str: 对应的登录URL。\r\n \"\"\"\r\n # 检查 login_type 是否为空或 None\r\n if not login_type:\r\n raise ValueError(\"login_type 不能为空\")\r\n\r\n # 获取当前脚本的绝对路径\r\n current_dir = os.path.dirname(os.path.abspath(__file__))\r\n # 构建配置文件的绝对路径,指向 ubains-module-test 目录下的 config.json\r\n config_path = os.path.abspath(os.path.join(current_dir, '..', '..', 'config.json'))\r\n # 规范化路径,防止路径遍历攻击\r\n config_path = os.path.normpath(config_path)\r\n\r\n # 记录配置文件路径以便调试,对路径进行脱敏处理\r\n logging.info(f\"配置文件路径: {os.path.basename(config_path)}\")\r\n\r\n # 检查文件是否存在\r\n if not os.path.exists(config_path):\r\n # 如果配置文件不存在,则抛出异常\r\n raise FileNotFoundError(f\"配置文件 {config_path} 不存在\")\r\n\r\n try:\r\n # 读取配置文件\r\n with open(config_path, 'r', encoding='utf-8') as config_file:\r\n # 将配置文件内容解析为 JSON 格式\r\n config = json.load(config_file)\r\n # 根据 login_type 获取对应的登录 URL\r\n login_url = config.get(login_type)\r\n # 记录正在打开的登录页面类型和 URL\r\n logging.info(f\"正在打开 {login_type} 的登录页面:{login_url}\")\r\n except IOError as e:\r\n # 处理文件读取异常\r\n raise IOError(f\"读取配置文件失败: {e}\")\r\n except json.JSONDecodeError as e:\r\n # 处理 JSON 解析异常\r\n raise json.JSONDecodeError(f\"解析配置文件失败: {e}\")\r\n\r\n # 检查是否成功获取到 URL\r\n if not login_url:\r\n # 如果未找到对应的 URL,则抛出异常\r\n raise ValueError(f\"未找到对应的 URL 配置项: {login_type}\")\r\n\r\n # 返回登录 URL\r\n return login_url\r\n\r\n# 管理员登录函数\r\ndef user_login(username, password):\r\n \"\"\"\r\n 管理员登录函数。\r\n 该函数通过模拟用户输入用户名、密码和验证码,并点击登录按钮,以实现管理员登录。\r\n \"\"\"\r\n # 获取webdriver实例\r\n wd = GSTORE['wd']\r\n\r\n # 打印用户名输入信息\r\n INFO(f\"输入用户名:{username}\")\r\n # 向用户名输入框发送用户名\r\n safe_send_keys((By.XPATH, \"//input[@placeholder='请输入账号或手机号或邮箱号']\"), f'{username}', wd)\r\n\r\n # 打印密码输入信息\r\n INFO(f\"输入密码:{password}\")\r\n # 向密码输入框发送密码\r\n safe_send_keys((By.XPATH, \"//input[@placeholder='请输入密码']\"), f\"{password}\", wd)\r\n\r\n # 打印验证码输入信息\r\n INFO(\"输入验证码:csba\")\r\n # 向验证码输入框发送验证码\r\n safe_send_keys((By.XPATH, \"//input[@placeholder='请输入图形验证码']\"), \"csba\", wd)\r\n # 隐式等待5秒,以确保验证码被正确处理\r\n wd.implicitly_wait(5)\r\n\r\n # 打印登录按钮点击信息\r\n INFO(\"点击登录按钮\")\r\n # 点击登录按钮\r\n safe_click((By.XPATH, \"//input[@value='登 录']\"), wd)\r\n\r\n# 进入预定后台函数\r\ndef enter_the_backend():\r\n \"\"\"\r\n 进入后台系统界面。\r\n 该函数通过模拟点击操作,找到并点击后台系统入口,以进入后台界面。\r\n \"\"\"\r\n # 记录进入后台系统的操作信息\r\n INFO(\"进入后台\")\r\n\r\n # 获取webdriver对象,用于后续的页面元素操作\r\n wd = GSTORE['wd']\r\n\r\n # 执行点击操作,通过XPath定位到后台系统入口图标并点击\r\n safe_click((By.XPATH, \"//img[@title='后台系统']\"), wd)\r\n\r\n# 输入框输入值函数\r\ndef safe_send_keys(element_locator, value, wd):\r\n \"\"\"\r\n 安全地向网页元素发送键值。\r\n 该函数尝试在指定时间内找到指定的网页元素,如果找到并且元素可见,将先清除元素内容,然后发送指定的键值。\r\n 如果在指定时间内未能找到元素或元素不可点击,则捕获相应的异常并打印错误信息。\r\n 参数:\r\n element_locator (tuple): 用于定位网页元素的策略和定位器的元组,例如(By.ID, 'element_id')。\r\n value (str): 要发送到元素的键值字符串。\r\n wd: WebDriver实例,用于与浏览器交互。\r\n 异常处理:\r\n - TimeoutException: 如果元素在指定时间内未被找到或不可点击。\r\n - NoSuchElementException: 如果元素不存在。\r\n - ElementNotInteractableException: 如果元素存在但不可交互。\r\n \"\"\"\r\n try:\r\n # 等待元素在指定时间内可见\r\n element = WebDriverWait(wd, 20).until(EC.visibility_of_element_located(element_locator))\r\n element.clear() # 清除元素的当前值\r\n element.send_keys(value) # 向元素发送指定的键值\r\n except TimeoutException:\r\n # 如果元素在指定时间内未被找到或不可点击,打印超时异常信息\r\n INFO(f\"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.\")\r\n except NoSuchElementException:\r\n # 如果元素不存在,打印相应异常信息\r\n INFO(f\"NoSuchElementException: Element {element_locator} not found.\")\r\n except ElementNotInteractableException:\r\n # 如果元素不可交互,打印相应异常信息\r\n INFO(f\"ElementNotInteractableException: Element {element_locator} is not interactable.\")\r\n\r\n# 点击按钮函数\r\ndef safe_click(element_locator, wd):\r\n \"\"\"\r\n 对其定位器指定的元素执行安全单击。\r\n 此函数尝试以处理潜在异常的方式单击元素。\r\n 它等待元素可见并可单击,然后再尝试单击它。\r\n 如果该元素在20秒内无法点击,或者它不存在,\r\n 或者不可交互,它会捕获相应的异常并记录一条信息性消息。\r\n 参数:\r\n -element_locator:要单击的元素的定位器,指定为元组。\r\n -wd:用于查找元素并与之交互的WebDriver实例。\r\n \"\"\"\r\n try:\r\n # Wait up to 20 seconds for the element to be visible\r\n element = WebDriverWait(wd, 20).until(EC.visibility_of_element_located(element_locator))\r\n # Attempt to click the element\r\n element.click()\r\n except TimeoutException:\r\n # Log a message if the element is not found or not clickable within 20 seconds\r\n INFO(f\"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.\")\r\n except NoSuchElementException:\r\n # Log a message if the element is not found\r\n INFO(f\"NoSuchElementException: Element {element_locator} not found.\")\r\n except ElementNotInteractableException:\r\n # Log a message if the element is not interactable\r\n INFO(f\"ElementNotInteractableException: Element {element_locator} is not interactable.\")\r\n\r\nfrom time import sleep\r\nfrom selenium.webdriver.common.by import By\r\n\r\n# 议题输入和上传议题文件函数\r\ndef issue_send_and_upload(wd, issue_num, issue_name):\r\n \"\"\"\r\n 输入议题名称以及上传议题文件。\r\n\r\n 参数:\r\n wd: WebDriver实例,用于操作浏览器。\r\n issue_num: 需要上传的议题文件数量。\r\n issue_name: 会议议题的名称。\r\n \"\"\"\r\n\r\n # 议题文件的路径列表\r\n issue_file_path = [\r\n r\"D:\\GithubData\\自动化\\ubains-module-test\\预定系统\\reports\\issue_file\\5.164Scan 安全报告.pdf\",\r\n r\"D:\\GithubData\\自动化\\ubains-module-test\\预定系统\\reports\\issue_file\\IdeaTop软件配置&操作说明文档.docx\",\r\n r\"D:\\GithubData\\自动化\\ubains-module-test\\预定系统\\reports\\issue_file\\ideaTop部署配置视频.mp4\",\r\n r\"D:\\GithubData\\自动化\\ubains-module-test\\预定系统\\reports\\issue_file\\IdeaTop软件配置&操作说明文档.docx\",\r\n r\"D:\\GithubData\\自动化\\ubains-module-test\\预定系统\\reports\\issue_file\\议题图片.png\"\r\n ]\r\n\r\n # 打印并输入议题名称\r\n INFO(f\"输入议题名称:{issue_name}\")\r\n safe_send_keys((By.XPATH, f\"(//input[@placeholder='请输入会议议题'])[1]\"), f\"{issue_name}\", wd)\r\n\r\n # 点击【上传文件】按钮以开始上传议题文件\r\n INFO(\"点击【上传文件】按钮\")\r\n safe_click((By.XPATH, f\"(//div[@class='topicsHandleButton uploadFile'][contains(text(),'上传文件(0)')])[1]\"), wd)\r\n sleep(2)\r\n\r\n # 遍历每个议题文件进行上传\r\n for i in range(issue_num):\r\n # 检查文件是否存在\r\n if not os.path.exists(issue_file_path[i]):\r\n INFO(f\"文件 {issue_file_path[i]} 不存在,跳出函数\")\r\n return\r\n\r\n # 定位【选择文件】按钮\r\n upload_button = wd.find_element(By.XPATH, '//*[@id=\"global-uploader-btn\"]/input')\r\n sleep(2)\r\n\r\n # 选择议题文件上传\r\n upload_button.send_keys(issue_file_path[i])\r\n # 等待文件上传完成\r\n sleep(15)\r\n\r\n # 截取上传完成后的屏幕日志\r\n SELENIUM_LOG_SCREEN(wd, \"50%\", \"Exhibit_Inspect\", \"Meeting_Message\", \"添加议题文件\")\r\n sleep(60)\r\n # 点击【确定】按钮完成上传\r\n safe_click((By.XPATH,\"//div[@aria-label='会议文件上传']//div[@class='el-dialog__footer']//div//span[contains(text(),'确定')]\"),wd)\r\n sleep(2)\r\n\r\n\r\n# 清除输入框函数\r\ndef input_clear(element_locator, wd):\r\n \"\"\"\r\n 清空输入框中的文本。\r\n\r\n 该函数通过元素定位器找到指定的输入框,并尝试清空其内容。它使用显式等待来确保元素在尝试清除之前是可见的。\r\n\r\n 参数:\r\n - element_locator: 用于定位输入框的元素定位器,通常是一个元组,包含定位方法和定位表达式。\r\n - wd: WebDriver实例,用于与浏览器交互。\r\n\r\n 异常处理:\r\n - TimeoutException: 如果在指定时间内元素不可见,则捕获此异常并打印超时异常消息。\r\n - NoSuchElementException: 如果找不到指定的元素,则捕获此异常并打印未找到元素的消息。\r\n - ElementNotInteractableException: 如果元素不可操作(例如,元素不可见或不可点击),则捕获此异常并打印相应消息。\r\n \"\"\"\r\n try:\r\n # 等待元素可见,并在可见后清空输入框。\r\n input_element = WebDriverWait(wd, 20).until(EC.visibility_of_element_located(element_locator))\r\n input_element.clear()\r\n except TimeoutException:\r\n # 如果元素在20秒内不可见,打印超时异常消息。\r\n print(f\"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.\")\r\n except NoSuchElementException:\r\n # 如果找不到元素,打印未找到元素的消息。\r\n print(f\"NoSuchElementException: Element {element_locator} not found.\")\r\n except ElementNotInteractableException:\r\n # 如果元素不可操作,打印元素不可操作的消息。\r\n print(f\"ElementNotInteractableException: Element {element_locator} is not interactable.\")\r\n\r\n# 键盘输入函数,例如【回车】键等操作\r\ndef send_keyboard(element_locator, wd):\r\n \"\"\"\r\n 向指定元素发送键盘事件。\r\n\r\n 该函数尝试找到页面上的一个元素,并向其发送RETURN键点击事件。它使用显式等待来确保元素在尝试交互之前是可见的。\r\n 如果在指定时间内元素不可见、不存在或不可交互,则捕获相应的异常并打印错误消息。\r\n\r\n 参数:\r\n - element_locator: 用于定位元素的元组,格式为(by, value)。例如,(By.ID, 'element_id')。\r\n - wd: WebDriver实例,用于与浏览器进行交互。\r\n\r\n 异常处理:\r\n - TimeoutException: 如果元素在20秒内不可见,则捕获此异常并打印超时错误消息。\r\n - NoSuchElementException: 如果找不到指定的元素,则捕获此异常并打印未找到元素的错误消息。\r\n - ElementNotInteractableException: 如果元素不可交互(例如,被遮挡或不可点击),则捕获此异常并打印相应错误消息。\r\n \"\"\"\r\n try:\r\n # 等待元素可见,并在可见后向其发送RETURN键点击事件。\r\n element = WebDriverWait(wd, 20).until(EC.visibility_of_element_located(element_locator))\r\n element.send_keys(Keys.RETURN)\r\n except TimeoutException:\r\n # 如果元素在指定时间内不可见,打印超时错误消息。\r\n print(f\"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.\")\r\n except NoSuchElementException:\r\n # 如果找不到指定的元素,打印未找到元素的错误消息。\r\n print(f\"NoSuchElementException: Element {element_locator} not found.\")\r\n except ElementNotInteractableException:\r\n # 如果元素不可交互,打印不可交互错误消息。\r\n print(f\"ElementNotInteractableException: Element {element_locator} is not interactable.\")\r\n\r\n# 获取常规提示文本函数,会同步进行截屏操作\r\ndef get_notify_text(wd,element_locator,module_name = None,function_name = None,file_name = None):\r\n \"\"\"\r\n 获取通知文本信息。\r\n\r\n 该函数通过WebDriver等待特定的元素出现,并截取其文本内容作为通知信息。此外,它还负责在获取通知文本后进行屏幕截图,\r\n 以便于后续的调试或记录需要。\r\n\r\n 参数:\r\n wd (WebDriver): 由上层传入的WebDriver对象,用于操作浏览器。\r\n element_locator: 用于定位元素的定位器。\r\n module_name: 模块名称,用于日志记录。\r\n function_name: 函数名称,用于日志记录。\r\n file_name: 屏幕截图的名称,用于日志记录。\r\n\r\n 返回:\r\n str: 提取的通知文本信息。如果未能提取到信息或发生异常,则返回None。\r\n \"\"\"\r\n try:\r\n # 使用WebDriverWait等待元素出现,并获取其文本内容\r\n notify_text = WebDriverWait(wd, 20).until(\r\n EC.presence_of_element_located(element_locator)\r\n ).text\r\n if module_name or function_name or file_name:\r\n # 如果 module_name, function_name, 或 file_name 有值,则使用这些参数进行屏幕截图\r\n SELENIUM_LOG_SCREEN(wd, \"50%\", module_name, function_name, file_name)\r\n else:\r\n # 如果 module_name, function_name, 和 file_name 都没有值,则仅使用 wd 和 \"50%\" 进行屏幕截图\r\n SELENIUM_LOG_SCREEN(wd, \"50%\")\r\n return notify_text\r\n except Exception as e:\r\n # 当发生异常时,记录异常信息\r\n INFO(f\"Exception occurred: {e}\")\r\n\r\n# 获取列表的查询结果文本函数\r\ndef elment_get_text(element_locator, wd):\r\n \"\"\"\r\n 获取页面元素的文本。\r\n\r\n 该函数通过显式等待的方式,确保页面元素在20秒内可见并获取其文本。\r\n 如果在规定时间内元素不可见、不存在或不可交互,则会捕获相应的异常并打印错误信息。\r\n\r\n 参数:\r\n - element_locator: 用于定位页面元素的元组,通常包含定位方式和定位值。\r\n - wd: WebDriver对象,用于与浏览器进行交互。\r\n\r\n 返回:\r\n - element_text: 页面元素的文本。如果发生异常,则返回None。\r\n \"\"\"\r\n try:\r\n # 使用WebDriverWait等待页面元素在20秒内可见,并获取其文本。\r\n element_text = WebDriverWait(wd, 20).until(EC.visibility_of_element_located(element_locator)).text\r\n return element_text\r\n except TimeoutException:\r\n # 如果超过20秒元素仍未可见,则捕获TimeoutException异常并打印错误信息。\r\n print(f\"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.\")\r\n except NoSuchElementException:\r\n # 如果页面上不存在该元素,则捕获NoSuchElementException异常并打印错误信息。\r\n print(f\"NoSuchElementException: Element {element_locator} not found.\")\r\n except ElementNotInteractableException:\r\n # 如果元素存在但不可交互(例如被遮挡),则捕获ElementNotInteractableException异常并打印错误信息。\r\n print(f\"ElementNotInteractableException: Element {element_locator} is not interactable.\")\r\n\r\n# 读取csv文件进行数据驱动函数\r\ndef read_csv_data(csv_file_path):\r\n \"\"\"\r\n 读取CSV文件中的数据,并将其转换为一个包含字典的列表,每个字典代表一行测试用例数据。\r\n\r\n 参数:\r\n csv_file_path (str): CSV文件的路径。\r\n\r\n 返回:\r\n list: 包含字典的列表,每个字典包含测试用例的名称和参数。\r\n \"\"\"\r\n # 打开CSV文件,使用只读模式,确保兼容性并处理编码\r\n with open(csv_file_path, mode='r', newline='', encoding='utf-8') as file:\r\n # 创建CSV阅读器\r\n reader = csv.reader(file)\r\n # 读取表头,为后续数据解析做准备\r\n headers = next(reader)\r\n ddt_cases = []\r\n # 遍历CSV文件中的每一行数据\r\n for row in reader:\r\n # 将每一行数据转换为字典,其中包含测试用例的名称和参数\r\n case = {\r\n 'name': row[0],\r\n 'para': row[1:]\r\n }\r\n # 将转换后的测试用例数据添加到列表中\r\n ddt_cases.append(case)\r\n # 日志记录:CSV文件已读取\r\n INFO(\"CSV文件已读取\")\r\n # 返回包含所有测试用例数据的列表\r\n return ddt_cases\r\n\r\n# 读取测试用例xlsx文件中的JSON数据进行数据驱动函数\r\nimport json\r\nfrom hytest import INFO\r\n\r\ndef read_xlsx_data(xlsx_file_path, sheet_name=None, case_type=None):\r\n \"\"\"\r\n 读取XLSX文件中的数据,并将其转换为一个包含字典的列表,每个字典代表一行测试用例数据。\r\n\r\n 参数:\r\n xlsx_file_path (str): XLSX文件的路径。\r\n sheet_name (str, optional): 工作表的名称。如果未指定,则使用活动工作表。\r\n case_type (str, optional): 测试用例类型,例如 '标准版' 或 'XX项目需求'。如果未指定,则读取所有测试用例。\r\n\r\n 返回:\r\n list: 包含字典的列表,每个字典包含测试用例的名称和参数。\r\n \"\"\"\r\n try:\r\n # 调试信息:打印文件路径\r\n INFO(f\"尝试打开文件路径: {xlsx_file_path}\")\r\n if not os.path.exists(xlsx_file_path):\r\n raise FileNotFoundError(f\"文件未找到: {xlsx_file_path}\")\r\n\r\n # 使用 with 语句打开 XLSX 文件,确保文件在读取完成后自动关闭\r\n workbook = openpyxl.load_workbook(xlsx_file_path, read_only=True)\r\n INFO(\"XLSX文件成功打开\")\r\n\r\n # 选择工作表\r\n if sheet_name:\r\n try:\r\n sheet = workbook[sheet_name]\r\n INFO(f\"成功选择工作表: {sheet_name}\")\r\n except KeyError:\r\n raise KeyError(f\"工作表未找到: {sheet_name}\")\r\n else:\r\n sheet = workbook.active\r\n INFO(\"使用活动工作表\")\r\n\r\n # 读取表头,从第三行开始\r\n headers = [cell.value for cell in sheet[3]]\r\n INFO(f\"表头列名: {headers}\")\r\n\r\n # 找到表头中名为 'JSON' 和 '功能类别' 的列索引\r\n try:\r\n json_index = headers.index('JSON')\r\n INFO(f\"找到 'JSON' 列索引: {json_index}\")\r\n except ValueError as e:\r\n raise ValueError(f\"表头中没有找到所需的列: {e}\")\r\n\r\n try:\r\n category_index = headers.index('功能类别')\r\n INFO(f\"找到 '功能类别' 列索引: {category_index}\")\r\n except ValueError as e:\r\n raise ValueError(f\"表头中没有找到所需的列: {e}\")\r\n\r\n ddt_cases = []\r\n # 遍历XLSX文件中的每一行数据,从第四行开始\r\n for row_num, row in enumerate(sheet.iter_rows(min_row=4, values_only=True), start=4):\r\n # 获取 JSON 列的数据\r\n json_data = row[json_index]\r\n INFO(f\"行 {row_num} 的 JSON 数据: {json_data}\")\r\n\r\n # 检查 JSON 数据是否为空\r\n if json_data is None or json_data.strip() == \"\":\r\n INFO(f\"跳过行 {row_num},JSON 数据为空\")\r\n continue\r\n\r\n # 解析 JSON 字符串\r\n try:\r\n parsed_json = json.loads(json_data)\r\n INFO(f\"行 {row_num} 的 JSON 数据解析成功: {parsed_json}\")\r\n except json.JSONDecodeError:\r\n raise ValueError(f\"行 {row_num} 的 JSON 数据无法解析: {json_data}\")\r\n\r\n # 获取功能类别\r\n category = row[category_index]\r\n INFO(f\"行 {row_num} 的功能类别: {category}\")\r\n\r\n # 检查是否需要过滤测试用例类型\r\n if case_type and category != case_type:\r\n INFO(f\"跳过行 {row_num},功能类别不匹配: {category} != {case_type}\")\r\n continue\r\n\r\n # 将解析后的 JSON 数据添加到列表中\r\n ddt_cases.append(parsed_json)\r\n INFO(f\"行 {row_num} 的 JSON 数据已添加到 ddt_cases\")\r\n\r\n # 日志记录:XLSX文件已读取\r\n INFO(\"XLSX文件已读取\")\r\n # 返回包含所有测试用例数据的列表\r\n return ddt_cases\r\n\r\n except FileNotFoundError:\r\n raise FileNotFoundError(f\"文件未找到: {xlsx_file_path}\")\r\n except Exception as e:\r\n raise Exception(f\"无法打开文件: {e}\")\r\n\r\n# if __name__ == \"__main__\":\r\n# xlsx_file_path = r\"D:\\GithubData\\自动化\\ubains-module-test\\预定系统\\测试数据\\会议预定测试用例.xlsx\"\r\n# ddt_cases = read_xlsx_data(xlsx_file_path, sheet_name=\"AI创会\", case_type=\"标准版\")\r\n# print(ddt_cases)\r\n\r\nimport openpyxl\r\ndef clear_columns_in_xlsx(xlsx_file_path, sheet_name=None, columns_to_clear=None):\r\n \"\"\"\r\n 将XLSX文件中指定列的单元格值设置为空。\r\n\r\n 参数:\r\n xlsx_file_path (str): XLSX文件的路径。\r\n sheet_name (str, optional): 工作表的名称。如果未指定,则使用活动工作表。\r\n columns_to_clear (list, optional): 需要清空的列名列表。如果未指定,则不执行任何操作。\r\n\r\n 返回:\r\n 无\r\n \"\"\"\r\n if not columns_to_clear:\r\n logging.warning(\"未指定需要清空的列名列表,函数将不执行任何操作。\")\r\n return\r\n\r\n try:\r\n # 打开XLSX文件\r\n workbook = openpyxl.load_workbook(xlsx_file_path)\r\n except FileNotFoundError:\r\n raise FileNotFoundError(f\"文件未找到: {xlsx_file_path}\")\r\n except Exception as e:\r\n raise Exception(f\"无法打开文件: {e}\")\r\n\r\n # 选择工作表\r\n if sheet_name:\r\n try:\r\n sheet = workbook[sheet_name]\r\n except KeyError:\r\n raise KeyError(f\"工作表未找到: {sheet_name}\")\r\n else:\r\n sheet = workbook.active\r\n\r\n # 读取表头,从第三行开始\r\n headers = [cell.value for cell in sheet[3]]\r\n\r\n # 打印表头列名\r\n logging.info(f\"表头列名: {headers}\")\r\n\r\n # 找到需要清空的列的索引\r\n column_indices_to_clear = [headers.index(column) for column in columns_to_clear if column in headers]\r\n\r\n if not column_indices_to_clear:\r\n logging.warning(\"指定的列名在表头中未找到,函数将不执行任何操作。\")\r\n return\r\n\r\n # 遍历XLSX文件中的每一行数据,从第四行开始\r\n for row in sheet.iter_rows(min_row=4):\r\n for col_index in column_indices_to_clear:\r\n row[col_index].value = None # 将单元格值设置为空\r\n\r\n # 保存修改后的文件\r\n try:\r\n workbook.save(xlsx_file_path)\r\n logging.info(f\"文件 {xlsx_file_path} 已保存,指定列的单元格值已清空。\")\r\n except Exception as e:\r\n logging.error(f\"保存文件时出错: {e}\")\r\n# if __name__ == \"__main__\":\r\n# # 示例调用\r\n# clear_columns_in_xlsx(r'D:\\GithubData\\自动化\\ubains-module-test\\预定系统\\测试数据\\长安大学测试用例.xlsx',\r\n# sheet_name='会议审批', columns_to_clear=['测试结果', '测试频次', '日志截图'])\r\n\r\n# 获取当前进程的 CPU 占用率函数\r\ndef get_cpu_usage(interval=1):\r\n \"\"\"\r\n 获取当前进程的 CPU 占用率。\r\n :param interval: 计算 CPU 使用率的时间间隔(秒)\r\n :return: 当前进程的 CPU 占用率(百分比)\r\n \"\"\"\r\n try:\r\n process = psutil.Process(os.getpid())\r\n cpu_usage = process.cpu_percent(interval=interval)\r\n if isinstance(cpu_usage, (int, float)):\r\n return cpu_usage\r\n else:\r\n logging.error(\"CPU 使用率数据类型不正确\")\r\n return None\r\n except psutil.NoSuchProcess:\r\n logging.error(\"进程不存在\")\r\n return None\r\n except psutil.AccessDenied:\r\n logging.error(\"权限不足\")\r\n return None\r\n except Exception as e:\r\n logging.error(f\"未知错误: {e}\")\r\n return None\r\n\r\n# 删除目录下的图片文件函数\r\ndef delete_images_in_directory(directory):\r\n \"\"\"\r\n 删除指定目录下的所有图片文件。\r\n\r\n 该函数会遍历指定的目录,寻找并删除所有扩展名在image_extensions列表中的图片文件。\r\n\r\n 参数:\r\n directory (str): 图片文件所在的目录路径。\r\n\r\n 返回:\r\n 无返回值。\r\n \"\"\"\r\n # 指定要删除的图片文件扩展名\r\n image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff']\r\n\r\n # 遍历目录中的所有文件\r\n for filename in os.listdir(directory):\r\n # 获取文件的完整路径\r\n file_path = os.path.join(directory, filename)\r\n\r\n # 检查文件是否为图片文件\r\n if any(filename.lower().endswith(ext) for ext in image_extensions):\r\n try:\r\n # 删除文件\r\n os.remove(file_path)\r\n print(f\"已删除文件: {file_path}\")\r\n except Exception as e:\r\n print(f\"删除文件 {file_path} 时出错: {e}\")\r\n\r\n# 判断非法字符函数\r\ndef is_valid_password(password):\r\n \"\"\"\r\n 验证密码的有效性。\r\n\r\n 有效密码需满足以下条件:\r\n 1. 必须是一个字符串。\r\n 2. 长度至少为11个字符。\r\n 3. 必须包含至少一个小写字母、一个大写字母和一个数字。\r\n 4. 不得包含连续3位相同的字符。\r\n 5. 不得包含连续3位连续的字符(如123, abc)。\r\n\r\n 参数:\r\n password (str): 待验证的密码。\r\n\r\n 返回:\r\n bool: 如果密码有效则返回True,否则返回False。\r\n \"\"\"\r\n try:\r\n # 基本类型检查\r\n if not isinstance(password, str):\r\n raise ValueError(\"Password must be a string\")\r\n\r\n # 检查长度,密码至少需要11个字符\r\n if len(password) < 11:\r\n return False\r\n\r\n # 使用正则表达式检查密码是否包含大小写字母和数字\r\n if not re.match(r'^(?=.*[a-z])(?=.*[A-Z])(?=.*\\d).{11,}$', password):\r\n return False\r\n\r\n # 检查连续3位及以上不重复且不连续组合\r\n for i in range(len(password) - 2):\r\n # 检查是否有连续3位相同\r\n if password[i] == password[i + 1] == password[i + 2]:\r\n return False\r\n\r\n # 检查是否有连续3位是连续字符(如123, abc)\r\n if abs(ord(password[i + 1]) - ord(password[i])) == 1 and abs(ord(password[i + 2]) - ord(password[i + 1])) == 1:\r\n return False\r\n\r\n return True\r\n except Exception as e:\r\n logging.error(f\"An error occurred: {e}\")\r\n return False\r\n\r\n# 退出浏览器并释放资源函数\r\ndef browser_quit():\r\n \"\"\"\r\n 退出浏览器并释放资源。\r\n\r\n 该函数从全局存储中获取浏览器驱动实例,并调用其quit方法来关闭浏览器并释放相关资源。\r\n \"\"\"\r\n # 清除浏览器\r\n INFO(\"清除浏览器\")\r\n # 从全局存储中获取浏览器驱动实例\r\n wd = GSTORE['wd']\r\n # 调用浏览器驱动实例的quit方法,关闭浏览器并释放资源\r\n wd.quit()\r\n\r\nimport os\r\nimport glob\r\nimport logging\r\n# from datetime import datetime\r\n\r\n# 获取最新的HTML报告文件,并拼接网页访问连接函数\r\ndef get_latest_report_file(report_dir, base_url):\r\n \"\"\"\r\n 获取指定目录下最新的HTML报告文件,并返回带有基础URL的完整路径。\r\n\r\n :param report_dir: 报告文件所在的目录\r\n :param base_url: 基础URL\r\n :return: 最新的HTML报告文件的完整URL,如果没有找到则返回None\r\n \"\"\"\r\n # 记录调用此函数的日志\r\n logging.info(\"开始调用get_latest_report_file函数获取报告文件\")\r\n\r\n # 确保报告目录存在\r\n if not os.path.exists(report_dir):\r\n logging.error(f\"报告目录 {report_dir} 不存在。\")\r\n return None\r\n\r\n # 获取指定目录下所有符合模式的HTML报告文件\r\n report_files = glob.glob(os.path.join(report_dir, 'report_*.html'))\r\n\r\n # 打印找到的文件列表\r\n logging.debug(f\"找到的报告文件: {report_files}\")\r\n\r\n # 如果没有找到报告文件,记录警告信息并返回None\r\n if not report_files:\r\n logging.warning(\"在指定目录中没有找到报告文件。\")\r\n return None\r\n\r\n # 找到最新修改的报告文件\r\n latest_file = max(report_files, key=os.path.getmtime)\r\n\r\n # 获取最新报告文件的最后修改时间\r\n last_modified_time = datetime.fromtimestamp(os.path.getmtime(latest_file)).strftime('%Y-%m-%d %H:%M:%S')\r\n\r\n # 记录最新报告文件的信息\r\n logging.info(f\"最新报告文件: {latest_file}, 最后修改时间: {last_modified_time}\")\r\n\r\n # 将文件路径转换为相对于基础URL的相对路径\r\n relative_path = os.path.relpath(latest_file, report_dir)\r\n\r\n # 生成完整的URL\r\n full_url = f\"{base_url}/{relative_path}\".replace(\"\\\\\", \"/\")\r\n\r\n # 返回完整的URL\r\n return full_url\r\n\r\n\r\n# 钉钉群机器人消息发送函数\r\ndef dingding_send_message(latest_report, title, mobile, ding_type):\r\n \"\"\"\r\n 发送钉钉机器人消息\r\n 参考接口文档:https://open.dingtalk.com/document/orgapp/custom-robots-send-group-messages#title-7fs-kgs-36x\r\n\r\n :param latest_report: 测试报告链接\r\n :param title: 消息标题\r\n :param mobile: 需要@的手机号列表\r\n :param ding_type: 钉钉机器人类型,用于选择不同的 Webhook URL 和密钥\r\n \"\"\"\r\n # 记录调用此函数的日志\r\n logging.info(\"开始构建并发送钉钉机器人消息\")\r\n # 钉钉机器人的 Webhook URL 和密钥(正式环境)\r\n # webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=b0eea0bbf097ce3badb4c832d2cd0267a50486f395ec8beca6e2042102bb295b'\r\n # secret = 'SEC928b11659c5fd6476cfa2042edbf56da876abf759289f7e4d3c671fb9a81bf43'\r\n # 钉钉机器人的 Webhook URL 和密钥(测试环境)\r\n\r\n if ding_type == '标准版巡检':\r\n webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=7fbf40798cad98b1b5db55ff844ba376b1816e80c5777e6f47ae1d9165dacbb4'\r\n secret = 'SEC610498ed6261ae2df1d071d0880aaa70abf5e67efe47f75a809c1f2314e0dbd6'\r\n elif ding_type == '展厅巡检':\r\n webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=061b6e9b1ae436f356cfda7fe19b6e58e46b62670046a78bd3a4d869118c612d'\r\n secret = 'SEC93212bd880aad638cc0df2b28a72ef4fdf6651cacb8a6a4bc71dcf09705d458d'\r\n\r\n # 生成时间戳\r\n timestamp = str(round(time.time() * 1000))\r\n\r\n # 生成签名\r\n secret_enc = secret.encode('utf-8')\r\n string_to_sign = f'{timestamp}\\n{secret}'\r\n string_to_sign_enc = string_to_sign.encode('utf-8')\r\n hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()\r\n sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))\r\n\r\n # 构建最终的 Webhook URL\r\n params = {\r\n 'access_token': webhook_url.split('=')[1],\r\n 'timestamp': timestamp,\r\n 'sign': sign\r\n }\r\n encoded_params = urllib.parse.urlencode(params)\r\n final_webhook_url = f'https://oapi.dingtalk.com/robot/send?{encoded_params}'\r\n\r\n # 记录最终的 Webhook URL\r\n logging.info(f\"钉钉机器人Webhook URL: {final_webhook_url}\")\r\n\r\n # 调用测试结果获取函数\r\n browser_init(\"标准版预定系统\")\r\n wd = GSTORE['wd']\r\n # print(latest_report)\r\n test_result = get_test_result(latest_report, wd)\r\n browser_quit()\r\n\r\n # 构建消息体\r\n headers = {'Content-Type': 'application/json'}\r\n message = {\r\n 'msgtype': 'link',\r\n 'link': {\r\n 'title': title,\r\n 'messageUrl': latest_report,\r\n 'text': f\"通过:{test_result['pass_percent']}\" + f\"失败:{test_result['fail_percent']}\" + f\"异常:{test_result['exception_percent']}\",\r\n },\r\n \"at\": {\r\n \"atMobiles\": [mobile],\r\n \"isAtAll\": True\r\n }\r\n }\r\n\r\n try:\r\n # 发送 POST 请求\r\n response = requests.post(final_webhook_url, data=json.dumps(message), headers=headers)\r\n\r\n # 检查响应状态码\r\n if response.status_code == 200:\r\n logging.info('消息发送成功!')\r\n logging.info(f'响应内容: {response.text}')\r\n else:\r\n logging.error(f'消息发送失败,状态码: {response.status_code}')\r\n logging.error(f'响应内容: {response.text}')\r\n except requests.exceptions.RequestException as e:\r\n logging.error(f'请求异常: {e}')\r\n\r\n# 运行自动化测试函数,并调用获取测试报告链接和钉钉机器人消息发送函数\r\ndef run_automation_test(report_title, report_url_prefix, test_case , ding_type):\r\n \"\"\"\r\n 运行自动化测试并生成报告。\r\n\r\n 参数:\r\n - report_title: 报告的标题\r\n - report_url_prefix: 报告URL的前缀\r\n - test_case: 测试用例脚本执行的标签名\r\n - ding_type: 钉钉通知的类型,备用参数,当前代码中未使用\r\n \"\"\"\r\n # 记录测试开始的日志\r\n logging.info(\"开始自动化测试...\")\r\n\r\n # 构建运行测试命令\r\n command = [\r\n 'hytest',\r\n '--report_title', report_title,\r\n '--report_url_prefix', report_url_prefix,\r\n '--tag', test_case\r\n ]\r\n\r\n # 记录将要执行的命令日志\r\n logging.info(f\"执行命令: {' '.join(command)}\")\r\n\r\n try:\r\n # 执行测试命令并获取结果\r\n result = subprocess.run(command, capture_output=True, text=True, check=True)\r\n\r\n # 记录命令的标准输出和错误输出\r\n logging.debug(f\"命令标准输出: {result.stdout}\")\r\n logging.debug(f\"命令错误输出: {result.stderr}\")\r\n except subprocess.CalledProcessError as e:\r\n # 处理子进程调用失败的异常\r\n logging.error(f\"命令执行失败,返回码 {e.returncode}: {e.output}\")\r\n except OSError as e:\r\n # 处理操作系统相关的异常\r\n logging.error(f\"发生操作系统错误: {e}\")\r\n finally:\r\n # 无论测试是否成功,都记录测试结束的日志\r\n logging.info(\"自动化测试完成。\")\r\n\r\n # 调用回调函数处理后续操作\r\n get_reportfile_send_dingding(f\"{report_title}\", report_url_prefix, ding_type)\r\n\r\n# 定义一个函数,用于获取最新的报告文件,并返回其URL,并调用钉钉消息发送函数\r\ndef get_reportfile_send_dingding(report_title, report_url_prefix, ding_type):\r\n \"\"\"\r\n 获取最新的报告文件并通过钉钉发送报告链接。\r\n\r\n 参数:\r\n report_title (str): 报告的标题。\r\n report_url_prefix (str): 报告URL的前缀。\r\n ding_type (str): 钉钉消息的类型。\r\n\r\n 返回:\r\n 无\r\n \"\"\"\r\n # print(GSTORE['case_pass'])\r\n try:\r\n # 获取报告文件所在的目录\r\n report_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..','reports')\r\n\r\n # 获取基础URLs\r\n base_url = report_url_prefix\r\n\r\n # 获取最新的报告文件\r\n latest_report = get_latest_report_file(report_dir, base_url)\r\n\r\n # 如果找到了最新的报告文件,则发送报告链接到钉钉\r\n if latest_report:\r\n logging.info(f\"最新报告文件URL: {latest_report}\")\r\n\r\n try:\r\n # 记录调用钉钉消息通知函数的日志\r\n logging.info(\"开始调用钉钉消息通知函数\")\r\n\r\n # 调用钉钉发送消息接口进行推送测试报告链接\r\n dingding_send_message(latest_report, report_title, \"13724387318\", ding_type)\r\n\r\n # 记录钉钉消息通知函数调用成功的日志\r\n logging.info(\"钉钉消息通知函数调用成功\")\r\n except Exception as e:\r\n # 记录钉钉消息通知函数调用失败的日志\r\n logging.error(f\"钉钉消息通知函数调用失败: {e}\")\r\n else:\r\n # 记录没有找到报告文件的日志\r\n logging.warning(\"没有找到报告文件以发送。\")\r\n\r\n except subprocess.CalledProcessError as e:\r\n # 处理子进程调用失败的异常\r\n logging.error(f\"命令执行失败,返回码 {e.returncode}: {e.output}\")\r\n except OSError as e:\r\n # 处理操作系统相关的异常\r\n logging.error(f\"发生操作系统错误: {e}\")\r\n finally:\r\n # 无论是否成功,都记录测试结束的日志\r\n logging.info(\"自动化测试完成。\")\r\n\r\n\r\nfrom selenium.webdriver.common.action_chains import ActionChains\r\nfrom selenium.webdriver.support.ui import WebDriverWait\r\nfrom selenium.webdriver.support import expected_conditions as EC\r\nfrom selenium.common.exceptions import TimeoutException, NoSuchElementException\r\n# 点击并拖拽函数\r\ndef single_click_and_drag(source_element_locator, target_element_locator, wd):\r\n \"\"\"\r\n 实现元素从source_element单击后拖拽到target_element的功能\r\n\r\n :param wd: WebDriver实例\r\n :param source_element_locator: 拖拽起始元素的定位器\r\n :param target_element_locator: 拖拽目标元素的定位器\r\n \"\"\"\r\n try:\r\n # 等待页面完全加载\r\n sleep(3)\r\n\r\n # 找到源元素和目标元素\r\n source_element = WebDriverWait(wd, 120).until(EC.element_to_be_clickable(source_element_locator))\r\n target_element = WebDriverWait(wd, 120).until(EC.element_to_be_clickable(target_element_locator))\r\n\r\n # 使用ActionChains执行单击并拖拽操作\r\n actions = ActionChains(wd)\r\n actions.click_and_hold(source_element) # 单击并按住源元素\r\n actions.move_to_element(target_element) # 移动到目标元素\r\n actions.release() # 释放鼠标\r\n actions.perform()\r\n\r\n logging.info(\"单击并拖拽操作成功\")\r\n except TimeoutException as e:\r\n logging.error(f\"元素查找超时: {e}\")\r\n except NoSuchElementException as e:\r\n logging.error(f\"元素未找到: {e}\")\r\n except Exception as e:\r\n logging.error(f\"发生未知错误: {e}\")\r\n\r\n# 获取check.txt文件并解析指定信息函数\r\nimport requests\r\nimport os\r\nimport chardet\r\nfrom urllib3.exceptions import InsecureRequestWarning\r\n# 禁用 InsecureRequestWarning 警告\r\nrequests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)\r\ndef fetch_and_parse_check_txt(url, save_path, extract_info):\r\n \"\"\"\r\n 获取check.txt文件并解析指定信息\r\n\r\n :param url: check.txt文件的URL\r\n :param save_path: 文件保存路径\r\n :param extract_info: 需要提取的信息列表,例如 ['[m]ysql', '[r]edis', '[f]dfs_storaged', '[f]dfs_tracker', '[e]mqx', 'ubains-meeting-api-1.0-SNAPSHOT.jar', 'ubains-meeting-inner-api-1.0-SNAPSHOT.jar', 'uwsgi']\r\n :return: 提取的信息字典\r\n \"\"\"\r\n try:\r\n # 发送HTTPS请求获取文件内容\r\n response = requests.get(url, verify=False) # verify=False 忽略SSL证书验证,生产环境不推荐\r\n response.raise_for_status() # 如果响应状态码不是200,抛出异常\r\n\r\n # 检测文件编码\r\n detected_encoding = chardet.detect(response.content)['encoding']\r\n logging.info(f\"检测到的编码: {detected_encoding}\")\r\n\r\n # 如果检测到的编码为空或不准确,可以手动指定编码\r\n if not detected_encoding or detected_encoding == 'ascii':\r\n detected_encoding = 'utf-8' # 假设文件编码为 utf-8\r\n\r\n # 将响应内容解码为字符串\r\n content = response.content.decode(detected_encoding)\r\n\r\n # 将文件内容保存到指定目录\r\n with open(save_path, 'w', encoding='utf-8') as file:\r\n file.write(content)\r\n\r\n # 解析文件内容\r\n parsed_info = {}\r\n for line in content.split('\\n'):\r\n for info in extract_info:\r\n if info in line and info not in parsed_info:\r\n service_name = info\r\n service_status = line.split(info, 1)[1].strip()\r\n parsed_info[service_name] = service_status\r\n break # 找到后跳出内层循环,继续处理下一行\r\n\r\n return parsed_info\r\n\r\n except requests.exceptions.RequestException as e:\r\n logging.exception(f\"请求错误: {e}\")\r\n return None\r\n\r\n# 检查服务状态函数\r\nimport telnetlib\r\ndef check_service_status(host, port):\r\n \"\"\"\r\n 检查服务状态。\r\n\r\n 通过尝试连接到指定的主机和端口来检查服务是否可用。\r\n\r\n 参数:\r\n - host: 服务所在的主机地址。\r\n - port: 服务监听的端口。\r\n\r\n 返回值:\r\n 无\r\n \"\"\"\r\n try:\r\n # 创建Telnet对象并连接到服务器\r\n tn = telnetlib.Telnet(host, port)\r\n INFO(f\"成功连接到 {host}:{port}\")\r\n\r\n # 可以在这里发送命令或读取响应\r\n # 例如,读取服务器的欢迎信息\r\n response = tn.read_until(b\"\\n\", timeout=5)\r\n INFO(f\"服务器响应: {response.decode('ascii')}\")\r\n\r\n # 关闭连接\r\n tn.close()\r\n except Exception as e:\r\n INFO(f\"连接失败: {e}\")\r\n\r\n# 设置腾讯会议会议号为全局变量函数\r\ndef set_tx_meeting_id(element_locator, wd):\r\n \"\"\"\r\n 设置腾讯会议ID到全局存储。\r\n\r\n 该函数通过元素定位器获取腾讯会议ID,并将其存储在全局变量GSTORE中,\r\n 以便在其他地方访问和使用该ID。\r\n\r\n 参数:\r\n - element_locator: 用于定位腾讯会议ID元素的定位器。\r\n - wd: WebDriver实例,用于与网页交互。\r\n\r\n 返回值:\r\n 无返回值。\r\n \"\"\"\r\n # 获取腾讯会议ID文本\r\n tx_meeting_id = elment_get_text(element_locator, wd)\r\n # 将腾讯会议ID存储在全局存储中\r\n GSTORE['tx_meeting_id'] = tx_meeting_id\r\n\r\n# 获取腾讯会议会议号函数\r\ndef get_tx_meeting_id():\r\n \"\"\"\r\n 获取腾讯会议ID。\r\n\r\n 从全局存储器GSTORE中获取预先存储的腾讯会议ID。这个函数没有输入参数,直接返回存储的会议ID。\r\n\r\n Returns:\r\n str: 腾讯会议ID。\r\n \"\"\"\r\n return GSTORE.get('tx_meeting_id')\r\n\r\n# 云喇叭设备注册函数\r\ndef voice_device_register(app_id, app_secret, device_sn):\r\n \"\"\"\r\n 注册语音设备。\r\n\r\n 向指定的API发送POST请求,以注册一个语音设备。需要提供应用的ID和密钥,以及设备的序列号。\r\n\r\n 参数:\r\n app_id (str): 应用的唯一标识符。\r\n app_secret (str): 应用的秘密密钥。\r\n device_sn (str): 设备的序列号。\r\n\r\n 返回:\r\n 无直接返回值,但会记录请求结果到日志。\r\n \"\"\"\r\n # 构建请求体,包含注册所需的必要信息\r\n body = {\r\n \"app_id\": app_id,\r\n \"app_secret\": app_secret,\r\n \"device_sn\": device_sn\r\n }\r\n\r\n # 设置请求头,指定内容类型为JSON\r\n headers = {\r\n \"Content-Type\": \"application/json\"\r\n }\r\n\r\n try:\r\n # 发送POST请求到注册URL,请求体序列化为JSON格式\r\n response = requests.post(\"https://wdev.wmj.com.cn/deviceApi/register\", headers=headers, data=json.dumps(body))\r\n # 检查HTTP响应状态码,确保请求成功\r\n response.raise_for_status()\r\n # 解析响应的JSON内容\r\n response_json = response.json()\r\n # 记录成功的请求日志\r\n logging.info(\"请求成功: %s\", response_json)\r\n except requests.exceptions.RequestException as e:\r\n # 处理请求异常,记录错误日志\r\n logging.error(\"请求失败: %s\", e)\r\n except ValueError as e:\r\n # 处理解析响应异常,记录错误日志\r\n logging.error(\"解析响应失败: %s\", e)\r\n\r\n# 云喇叭设备设置函数\r\nimport requests\r\ndef cloud_voice_setting(app_id, app_secret, device_sn):\r\n \"\"\"\r\n 设置云语音功能。\r\n\r\n :param app_id: 应用ID\r\n :param app_secret: 应用密钥\r\n :param device_sn: 设备序列号\r\n :return: 服务器响应结果\r\n \"\"\"\r\n url = \"https://wdev.wmj.com.cn/deviceApi/send\"\r\n\r\n # 写死的data参数\r\n data = {\r\n \"cmd_type\": \"setting\",\r\n \"info\": {\r\n \"volume\": 10, # 0-9,音量由小到大,默认为中间值\r\n \"speed\": 2, # 0-9,语速由慢到快,默认为中间值正常语速\r\n \"tone\": 4, # 0-9,语调由低到高,默认为中间值正常语调\r\n \"speaker\": 0 # 0为女生,支持中英文\r\n }\r\n }\r\n\r\n # 构建请求体\r\n payload = {\r\n \"app_id\": app_id,\r\n \"app_secret\": app_secret,\r\n \"device_sn\": device_sn,\r\n \"data\": data\r\n }\r\n\r\n # 发送POST请求\r\n try:\r\n response = requests.post(url, json=payload)\r\n response.raise_for_status() # 如果响应状态码不是200,抛出异常\r\n logging.info(response.json()) # 打印响应的JSON数据\r\n except requests.exceptions.RequestException as e:\r\n logging.error({\"status\": \"error\", \"message\": str(e)})\r\n\r\n# 示例调用\r\n# if __name__ == \"__main__\":\r\n# app_id = os.getenv(\"APP_ID\", \"a98a124c6c3252f6612fc544a0d0fa79\")\r\n# app_secret = os.getenv(\"APP_SECRET\", \"88bc1ec4eba624f47b2200a4ce8c3852\")\r\n# device_sn = os.getenv(\"DEVICE_SN\", \"W703BB44444\")\r\n# cloud_voice_setting(app_id, app_secret, device_sn)\r\n\r\n# 云喇叭设备播放函数\r\ndef play_cloud_voice(app_id, app_secret, device_sn):\r\n \"\"\"\r\n 播放云语音功能。\r\n\r\n 本函数通过发送HTTP POST请求,触发远程语音设备播放指定的语音内容。\r\n\r\n 参数:\r\n - app_id: 应用ID,用于标识应用。\r\n - app_secret: 应用密钥,用于验证应用的身份。\r\n - device_sn: 设备序列号,用于标识具体的语音设备。\r\n \"\"\"\r\n # 注册设备\r\n voice_device_register(app_id, app_secret, device_sn)\r\n sleep(5) # 可以考虑使用异步编程或非阻塞的方式替代\r\n\r\n # 设置云喇叭的音量以及语速参数\r\n cloud_voice_setting(app_id, app_secret, device_sn)\r\n sleep(10)\r\n\r\n # 定义请求URL\r\n url = os.getenv(\"CLOUD_VOICE_API_URL\", \"https://wdev.wmj.com.cn/deviceApi/send\")\r\n\r\n # 构建请求体,包括应用ID、应用密钥、设备序列号和语音播放指令\r\n body = {\r\n \"app_id\": app_id,\r\n \"app_secret\": app_secret,\r\n \"device_sn\": device_sn,\r\n \"data\": {\r\n \"cmd_type\": \"play\",\r\n \"info\": {\r\n \"tts\": \"一、二、三、四、五、六、七、八、九、十 一、二、三、四、五、六、七、八、九、十 一、二、三、四、五、六、七、八、九、十、一、二、三、四、五、六、七、八、九、十\",\r\n \"inner\": 10, # wifi版特有\r\n \"volume\": 5 # 4G版本1-7,wifi版1-10\r\n }\r\n }\r\n }\r\n\r\n # 设置请求头,指定内容类型为JSON\r\n headers = {\r\n \"Content-Type\": \"application/json\"\r\n }\r\n\r\n try:\r\n # 发送POST请求\r\n response = requests.post(url, headers=headers, data=json.dumps(body))\r\n\r\n # 根据响应状态码判断请求是否成功\r\n if response.status_code == 200:\r\n logging.info(f\"请求成功: {response.json()}\")\r\n else:\r\n logging.error(f\"请求失败: 状态码 {response.status_code}, 响应内容 {response.text}\")\r\n\r\n except requests.exceptions.RequestException as e:\r\n logging.error(f\"请求过程中发生异常: {e}\")\r\n except json.JSONDecodeError as e:\r\n logging.error(f\"JSON解析失败: {e}\")\r\n except Exception as e:\r\n logging.error(f\"发生未知异常: {e}\")\r\n\r\n# # 示例调用\r\n# if __name__ == \"__main__\":\r\n# app_id = os.getenv(\"APP_ID\", \"a98a124c6c3252f6612fc544a0d0fa79\")\r\n# app_secret = os.getenv(\"APP_SECRET\", \"88bc1ec4eba624f47b2200a4ce8c3852\")\r\n# device_sn = os.getenv(\"DEVICE_SN\", \"W703BB44444\")\r\n# play_cloud_voice(app_id, app_secret, device_sn)\r\n\r\n# 获取测试报告通过率等参数的函数\r\nimport logging\r\nimport re\r\nfrom selenium.webdriver.common.by import By\r\ndef get_test_result(latest_report, wd):\r\n \"\"\"\r\n 获取测试结果页面的通过率、失败率和异常率\r\n\r\n :param latest_report: 测试结果页面的URL\r\n :param wd: WebDriver实例,用于访问和操作网页\r\n :return: 包含通过率、失败率和异常率的字典\r\n \"\"\"\r\n # 初始化测试结果字典\r\n test_result = {\r\n \"pass_percent\": \"\",\r\n \"fail_percent\": \"\",\r\n \"exception_percent\": \"\"\r\n }\r\n\r\n # 访问测试结果页面\r\n wd.get(latest_report)\r\n sleep(5)\r\n\r\n # 点击简略显示\r\n safe_click((By.XPATH,\"//div[@id='display_mode']\"), wd)\r\n sleep(5)\r\n\r\n # 定义一个函数来获取和解析百分比\r\n def get_percentage(selector, wd):\r\n text = elment_get_text(selector, wd)\r\n logging.info(f\"获取的文本:{text}\")\r\n match = re.search(r'(\\d+(\\.\\d+)?)%', text)\r\n if match:\r\n return match.group(0)\r\n else:\r\n logging.error(f\"未找到百分比匹配项,文本内容: {text}\")\r\n return \"0\"\r\n\r\n # 获取通过率\r\n pass_percent = get_percentage((By.CSS_SELECTOR, \"div[class='result_barchart'] div:nth-child(1) span:nth-child(1)\"), wd)\r\n test_result[\"pass_percent\"] = pass_percent\r\n\r\n # 获取失败率\r\n fail_percent = get_percentage(\r\n (By.CSS_SELECTOR, \"body > div.main_section > div.result > div > div:nth-child(2) > span\"), wd)\r\n test_result[\"fail_percent\"] = fail_percent\r\n\r\n # 获取异常率\r\n exception_percent = get_percentage(\r\n (By.CSS_SELECTOR, \"body > div.main_section > div.result > div > div:nth-child(3) > span\"), wd)\r\n test_result[\"exception_percent\"] = exception_percent\r\n # 输出test_result\r\n logging.info(test_result)\r\n print(test_result)\r\n sleep(5)\r\n\r\n # 返回测试结果字典\r\n return test_result\r\n\r\n# if __name__ == \"__main__\":\r\n# browser_init(\"展厅预定巡检\")\r\n# wd = GSTORE['wd']\r\n# test_result = get_test_result(\"http://nat.ubainsyun.com:31134/report_20250217_094401.html\",wd)\r\n# print(test_result)\r\n\r\n# 获取本机IP地址函数\r\nimport yaml\r\nimport logging\r\nimport socket\r\nimport subprocess\r\ndef get_local_ip():\r\n \"\"\"\r\n 获取本机的局域网IP地址。\r\n\r\n 此函数通过尝试与外部网络通信来确定本机的局域网IP地址。它利用了UDP协议,\r\n 并连接到一个知名的公共IP地址和端口,以此来获取本机的IP地址信息。\r\n\r\n Returns:\r\n str: 本机的局域网IP地址。\r\n \"\"\"\r\n try:\r\n # 创建一个UDP套接字\r\n sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)\r\n # 连接到一个公共的IP地址和端口\r\n sock.connect((\"8.8.8.8\", 80))\r\n # 获取本地IP地址\r\n local_ip = sock.getsockname()[0]\r\n finally:\r\n # 确保在所有情况下关闭套接字\r\n sock.close()\r\n return local_ip\r\n\r\n# 更新ngrok.cfg文件中的IP地址 函数\r\ndef update_ngrok_config(config_path, new_ip):\r\n \"\"\"\r\n 更新ngrok配置文件中的IP地址\r\n\r\n 本函数尝试打开并解析ngrok配置文件,更新其中的IP地址,然后将更改保存回配置文件中\r\n\r\n 参数:\r\n config_path (str): ngrok配置文件的路径\r\n new_ip (str): 需要更新到配置文件中的新IP地址\r\n\r\n 返回:\r\n 无\r\n \"\"\"\r\n try:\r\n # 打开并安全地加载ngrok配置文件\r\n with open(config_path, 'r', encoding='utf-8') as file:\r\n config = yaml.safe_load(file)\r\n\r\n # 更新IP地址\r\n config['tunnels']['nat1']['proto']['tcp'] = f\"{new_ip}:80\"\r\n\r\n # 将更新后的配置安全地写回文件\r\n with open(config_path, 'w', encoding='utf-8') as file:\r\n yaml.safe_dump(config, file, default_flow_style=False, allow_unicode=True)\r\n\r\n # 记录成功更新的日志信息\r\n logging.info(f\"ngrok.cfg 更新成功,新的IP地址为: {new_ip}\")\r\n except Exception as e:\r\n # 记录更新过程中出现的错误\r\n logging.error(f\"更新ngrok.cfg文件时出错: {e}\")\r\n\r\n# 启动ngrok函数\r\ndef start_ngrok(ngrok_path, config_path):\r\n \"\"\"\r\n 启动ngrok工具。\r\n\r\n 在尝试启动ngrok之前,此函数会先终止已运行的ngrok进程(如果有的话)。\r\n 然后使用指定的配置文件路径启动ngrok,并记录启动结果。\r\n\r\n 参数:\r\n ngrok_path (str): ngrok可执行文件的路径。\r\n config_path (str): ngrok配置文件的路径。\r\n\r\n 返回:\r\n 无返回值。\r\n \"\"\"\r\n try:\r\n # 终止已运行的ngrok进程\r\n kill_ngrok()\r\n\r\n # 构建启动ngrok的命令\r\n command = [ngrok_path, '-config', config_path, 'start', 'nat1']\r\n # 使用构建的命令启动ngrok\r\n subprocess.Popen(command, shell=True)\r\n # 记录ngrok启动成功的信息\r\n logging.info(f\"ngrok 启动成功\")\r\n except Exception as e:\r\n # 记录启动ngrok时发生的错误\r\n logging.error(f\"启动ngrok时出错: {e}\")\r\n\r\n# 停止ngrok进程函数\r\ndef kill_ngrok():\r\n \"\"\"\r\n 尝试终止所有正在运行的ngrok进程。\r\n \"\"\"\r\n try:\r\n # 使用 taskkill 命令终止所有 ngrok 进程\r\n subprocess.run(['taskkill', '/F', '/IM', 'ngrok.exe'], check=True)\r\n logging.info(\"终止所有 ngrok 进程成功\")\r\n except subprocess.CalledProcessError as e:\r\n # 如果没有找到 ngrok 进程,记录相关信息\r\n logging.info(\"没有找到 ngrok 进程\")\r\n except Exception as e:\r\n # 如果终止 ngrok 进程时发生其他错误,记录错误信息\r\n logging.error(f\"终止 ngrok 进程时出错: {e}\")\r\n\r\n# if __name__ == '__main__':\r\n# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')\r\n#\r\n# # 获取本机IP地址\r\n# local_ip = get_local_ip()\r\n# logging.info(f\"本机IP地址: {local_ip}\")\r\n#\r\n# # 更新ngrok.cfg文件\r\n# ngrok_config_path = r'D:\\GithubData\\自动化\\ubains-module-test\\预定系统\\ngrok\\ngrok-调试主机\\ngrok.cfg'\r\n# update_ngrok_config(ngrok_config_path, local_ip)\r\n#\r\n# # 启动ngrok\r\n# ngrok_path = r'D:\\GithubData\\自动化\\ubains-module-test\\预定系统\\ngrok\\ngrok-调试主机\\ngrok.exe'\r\n# start_ngrok(ngrok_path, ngrok_config_path)\r\n\r\n# 字符串转换枚举类型函数\r\ndef get_by_enum(type_str):\r\n \"\"\"\r\n 将字符串类型的定位器类型转换为 selenium.webdriver.common.by.By 枚举类型。\r\n\r\n 参数:\r\n type_str (str): 定位器类型字符串,例如 'XPATH'。\r\n\r\n 返回:\r\n selenium.webdriver.common.by.By: 对应的 By 枚举类型。\r\n \"\"\"\r\n # 将输入的定位器类型字符串转换为大写,以匹配 By 枚举类型的命名\r\n type_str = type_str.upper()\r\n\r\n # 根据输入的字符串类型返回对应的 By 枚举类型\r\n if type_str == 'XPATH':\r\n return By.XPATH\r\n elif type_str == 'ID':\r\n return By.ID\r\n elif type_str == 'NAME':\r\n return By.NAME\r\n elif type_str == 'CLASS_NAME':\r\n return By.CLASS_NAME\r\n elif type_str == 'CSS_SELECTOR':\r\n return By.CSS_SELECTOR\r\n elif type_str == 'TAG_NAME':\r\n return By.TAG_NAME\r\n elif type_str == 'LINK_TEXT':\r\n return By.LINK_TEXT\r\n elif type_str == 'PARTIAL_LINK_TEXT':\r\n return By.PARTIAL_LINK_TEXT\r\n else:\r\n # 如果输入的定位器类型字符串不匹配任何已知的 By 枚举类型,抛出 ValueError 异常\r\n raise ValueError(f\"未知的定位器类型: {type_str}\")\r\n\r\n# 获取当前时间并格式化为 'HH:MM' 格式的函数,用于会议预定使用\r\n# import datetime\r\ndef get_current_time_formatted():\r\n \"\"\"\r\n 获取当前时间并格式化为 'HH:MM' 格式,并选择最近的未来时间点(如 00:00, 00:15, 00:30, 00:45 等)。\r\n\r\n 返回:\r\n str: 最近的未来时间点字符串,例如 '17:00'。\r\n \"\"\"\r\n # 获取当前时间\r\n current_time = datetime.now()\r\n current_time_formatted = current_time.strftime(\"%H:%M\")\r\n\r\n # 定义时间点列表\r\n time_points = [\r\n \"00:00\", \"00:15\", \"00:30\", \"00:45\",\r\n \"01:00\", \"01:15\", \"01:30\", \"01:45\",\r\n \"02:00\", \"02:15\", \"02:30\", \"02:45\",\r\n \"03:00\", \"03:15\", \"03:30\", \"03:45\",\r\n \"04:00\", \"04:15\", \"04:30\", \"04:45\",\r\n \"05:00\", \"05:15\", \"05:30\", \"05:45\",\r\n \"06:00\", \"06:15\", \"06:30\", \"06:45\",\r\n \"07:00\", \"07:15\", \"07:30\", \"07:45\",\r\n \"08:00\", \"08:15\", \"08:30\", \"08:45\",\r\n \"09:00\", \"09:15\", \"09:30\", \"09:45\",\r\n \"10:00\", \"10:15\", \"10:30\", \"10:45\",\r\n \"11:00\", \"11:15\", \"11:30\", \"11:45\",\r\n \"12:00\", \"12:15\", \"12:30\", \"12:45\",\r\n \"13:00\", \"13:15\", \"13:30\", \"13:45\",\r\n \"14:00\", \"14:15\", \"14:30\", \"14:45\",\r\n \"15:00\", \"15:15\", \"15:30\", \"15:45\",\r\n \"16:00\", \"16:15\", \"16:30\", \"16:45\",\r\n \"17:00\", \"17:15\", \"17:30\", \"17:45\",\r\n \"18:00\", \"18:15\", \"18:30\", \"18:45\",\r\n \"19:00\", \"19:15\", \"19:30\", \"19:45\",\r\n \"20:00\", \"20:15\", \"20:30\", \"20:45\",\r\n \"21:00\", \"21:15\", \"21:30\", \"21:45\",\r\n \"22:00\", \"22:15\", \"22:30\", \"22:45\",\r\n \"23:00\", \"23:15\", \"23:30\", \"23:45\"\r\n ]\r\n\r\n # 将当前时间转换为 datetime 对象\r\n current_time_dt = datetime.strptime(current_time_formatted, \"%H:%M\")\r\n\r\n # 初始化最近时间点和最小时间差\r\n closest_time_point = None\r\n min_time_diff = float('inf')\r\n\r\n # 遍历时间点列表,找到最近的未来时间点\r\n for time_point in time_points:\r\n time_point_dt = datetime.strptime(time_point, \"%H:%M\")\r\n\r\n # 如果时间点在当前时间之后\r\n if time_point_dt > current_time_dt:\r\n time_diff = (time_point_dt - current_time_dt).total_seconds()\r\n if time_diff < min_time_diff:\r\n min_time_diff = time_diff\r\n closest_time_point = time_point\r\n\r\n # 如果没有找到未来的时间点(即当前时间已经是最后一个时间点),则选择下一个天的最早时间点\r\n if closest_time_point is None:\r\n closest_time_point = time_points[0]\r\n\r\n return closest_time_point\r\n\r\n# 会议创建函数\r\ndef meeting_message(meeting_room_name, message_type, message_name, project_type, wd):\r\n \"\"\"\r\n 会议室会议预定功能的实现。\r\n\r\n 该函数通过模拟用户交互来预定会议室会议。它首先搜索指定的会议室,然后填写会议信息,\r\n 包括会议名称和类型,最后选择会议时间并完成预定。\r\n\r\n 参数:\r\n - meeting_room_name (str): 会议室名称,用于搜索指定的会议室。\r\n - message_type (str): 会议类型,用于填写会议信息。\r\n - message_name (str): 会议名称,用于填写会议信息。\r\n - project_type (str): 项目类型,用于判断是否需要输入特定信息,如固定电话。\r\n - wd: WebDriver实例,用于操作浏览器。\r\n\r\n 返回:\r\n 无返回值。\r\n \"\"\"\r\n print(\"到这了\")\r\n # 切换至会议室列表界面,开始新一轮的审批会议创建\r\n safe_click((By.XPATH, \"//li[@class='meeting_list_li']//span[contains(text(),'会议室列表')]\"), wd)\r\n # 先搜索会议室\r\n safe_click((By.XPATH, \"//i[@class='el-collapse-item__arrow el-icon-arrow-right']\"), wd)\r\n sleep(1)\r\n\r\n safe_send_keys((By.XPATH, \"//input[@placeholder='请输入会议室名称']\"), meeting_room_name, wd)\r\n INFO(f\"搜索结果为:{meeting_room_name}\")\r\n # 点击【查询】按钮\r\n safe_click((By.XPATH, \"//span[contains(text(),'查询')]\"), wd)\r\n sleep(2)\r\n # 点击【会议预定】按钮\r\n safe_click((By.XPATH, \"//span[@class='MeetingCityList_t_btn']\"), wd)\r\n sleep(2)\r\n # 输入会议名称并勾选MessageType类型\r\n safe_send_keys((By.XPATH, \"//input[@placeholder='请输入会议名称']\"), message_name, wd)\r\n safe_click(\r\n (By.XPATH, f\"//div[@class='reserve_input']//span[@class='el-checkbox__label'][contains(text(),'{message_type}')]\"), wd)\r\n sleep(1)\r\n\r\n # 判断是否为项目定制的预约界面\r\n if project_type == \"长安大学\":\r\n # 输入固定电话\r\n safe_send_keys((By.XPATH,\"//input[@placeholder='固定电话']\"), \"13724387311\", wd)\r\n\r\n # 选择会议时间,点击【快速预定】按钮\r\n current_time = get_current_time_formatted()\r\n print(f\"获取当前的时间{current_time}\")\r\n safe_click((By.XPATH, f\"//div[normalize-space()='{current_time}']\"), wd)\r\n sleep(1)\r\n safe_click((By.XPATH, \"//div[@class='header_Quick']\"), wd)\r\n sleep(2)\r\n # 点击【确定】按钮\r\n safe_click((By.XPATH, \"//button[@type='button']//span[contains(text(),'预定')]\"), wd)\r\n sleep(2)\r\n\r\n# 会议状态设置函数\r\ndef message_satus_control(message_name, message_type, control_type, wd):\r\n \"\"\"\r\n 结束会议流程。\r\n\r\n 参数:\r\n - message_name: 会议名称,用于搜索特定的会议。\r\n - message_type: 会议类型,用于判断是否需要进行额外的确认操作。\r\n - control_type: 控制类型,用于选择相应的会议控制操作。\r\n - wd: WebDriver实例,用于操作浏览器。\r\n\r\n 该函数通过模拟用户交互来结束会议,包括搜索会议、点击相关按钮等操作。\r\n \"\"\"\r\n # 输入会议名称并搜索\r\n safe_send_keys((By.XPATH, \"//input[@placeholder='输入关键字搜索']\"), message_name, wd)\r\n send_keyboard((By.XPATH, \"//input[@placeholder='输入关键字搜索']\"), wd)\r\n sleep(2)\r\n # 点击【更多操作】结束会议数据\r\n # 提前开始会议\r\n safe_click((By.XPATH, \"//span[contains(text(),'更多操作')]\"), wd)\r\n sleep(1)\r\n safe_click((By.XPATH, \"//li[contains(text(),'会议状态')]\"), wd)\r\n sleep(1)\r\n # 选择提前结束\r\n safe_click((By.XPATH, f\"//span[contains(text(),'{control_type}')]\"), wd)\r\n sleep(1)\r\n safe_click((By.XPATH, \"//div[@slot='footer']//span[contains(text(),'确定')]\"), wd)\r\n sleep(2)\r\n # 针对特定会议平台的额外确认操作\r\n if message_type == \"会控\":\r\n safe_click((By.XPATH,\r\n \"//button[@class='el-button el-button--default el-button--small el-button--primary ']//span[contains(text(),'确定')]\"),\r\n wd)\r\n sleep(1)\r\n\r\n# 进入会议预约界面函数\r\ndef enter_meeting_booking_page(meeting_room_name, wd):\r\n \"\"\"\r\n 进入会议室预订页面并填写会议信息。\r\n\r\n :param meeting_room_name: 会议室名称,用于搜索特定的会议室。\r\n :param wd: WebDriver实例,用于操作浏览器。\r\n \"\"\"\r\n # 先搜索会议室\r\n safe_click((By.XPATH, \"//i[@class='el-collapse-item__arrow el-icon-arrow-right']\"), wd)\r\n sleep(1)\r\n\r\n safe_send_keys((By.XPATH, \"//input[@placeholder='请输入会议室名称']\"), meeting_room_name, wd)\r\n INFO(f\"搜索结果为:{meeting_room_name}\")\r\n # 点击【查询】按钮\r\n safe_click((By.XPATH, \"//span[contains(text(),'查询')]\"), wd)\r\n sleep(2)\r\n # 点击【会议预定】按钮\r\n safe_click((By.XPATH, \"//span[@class='MeetingCityList_t_btn']\"), wd)\r\n sleep(2)\r\n # 输入会议名称并勾选MessageType类型\r\n safe_send_keys((By.XPATH, \"//input[@placeholder='请输入会议名称']\"), \"测试\", wd)\r\n sleep(1)\r\n # 选择会议时间\r\n current_time = get_current_time_formatted()\r\n print(f\"获取当前的时间{current_time}\")\r\n safe_click((By.XPATH, f\"//div[normalize-space()='{current_time}']\"), wd)\r\n sleep(1)\r\n\r\n# 删除会议函数\r\ndef del_message(message_name, wd):\r\n \"\"\"\r\n 删除会议消息。\r\n\r\n 根据会议名称搜索并删除会议。\r\n\r\n 参数:\r\n - message_name: 会议名称,用于搜索特定的会议。\r\n - wd: WebDriver实例,用于执行网页自动化操作。\r\n\r\n 此函数无返回值。\r\n \"\"\"\r\n # 输入会议名称并搜索\r\n safe_send_keys((By.XPATH, \"//input[@placeholder='输入关键字搜索']\"), message_name, wd)\r\n send_keyboard((By.XPATH, \"//input[@placeholder='输入关键字搜索']\"), wd)\r\n sleep(2)\r\n # 点击【删除会议】按钮\r\n safe_click((By.XPATH, \"//span[contains(text(),'删除会议')]\"), wd)\r\n sleep(2)\r\n # 点击【确定】按钮\r\n safe_click((By.XPATH, \"//button[contains(@class,'el-button el-button--default el-button--small el-button--primary')]//span[contains(text(),'确定')]\"), wd)\r\n sleep(1)\r\n # 进入【会议室列表】界面\r\n safe_click((By.XPATH, \"//span[contains(text(),'会议室列表')]\"), wd)\r\n sleep(1)\r\n\r\n# 设置会议审批状态函数\r\ndef approval_status_control(message_name, approval_type, wd):\r\n \"\"\"\r\n 设置会议审批状态。\r\n\r\n 参数:\r\n - message_name: 会议名称,用于搜索特定的会议。\r\n - approval_type: 审批类型,用于选择审批操作(如驳回)。\r\n - wd: WebDriver实例,用于与浏览器交互。\r\n \"\"\"\r\n # 进入会议审批模块界面\r\n safe_click((By.XPATH, \"//span[contains(text(),'会议审批')]\"), wd)\r\n sleep(1)\r\n # 进入待我审批界面\r\n safe_click((By.XPATH, \"//div[@id='tab-second']\"), wd)\r\n sleep(1)\r\n # 搜索会议\r\n safe_send_keys((By.XPATH, \"//input[@placeholder='请输入会议名称搜索']\"), message_name, wd)\r\n send_keyboard((By.XPATH, \"//input[@placeholder='请输入会议名称搜索']\"), wd)\r\n sleep(1)\r\n # 点击【审批】按钮\r\n safe_click((By.XPATH, \"//button[@type='button']//span[contains(text(),'审批')]\"), wd)\r\n sleep(1)\r\n # 输入驳回意见\r\n safe_send_keys((By.XPATH, \"//textarea[@placeholder='请输入审批意见']\"), \"审批意见\", wd)\r\n # 点击【驳回】按钮\r\n safe_click((By.XPATH, f\"//span[contains(text(),'{approval_type}')]\"), wd)\r\n sleep(1)\r\n\r\n\r\n# xlsx文件写入函数\r\nimport os\r\nimport openpyxl\r\nfrom openpyxl import load_workbook\r\nfrom openpyxl.drawing.image import Image\r\nfrom openpyxl.utils import get_column_letter\r\nimport logging\r\n\r\ndef write_xlsx_data(xlsx_file_path, sheet_name, function_number, test_result, log_screenshot):\r\n \"\"\"\r\n 在XLSX文件的指定行中填充测试结果和日志截图(图片嵌入单元格)。\r\n\r\n 参数:\r\n xlsx_file_path (str): XLSX文件的路径。\r\n sheet_name (str): 工作表的名称。\r\n function_number (str): 功能编号,用于匹配行。\r\n test_result (str): 测试结果。\r\n log_screenshot (str): 日志截图路径。\r\n \"\"\"\r\n try:\r\n workbook = load_workbook(xlsx_file_path)\r\n except FileNotFoundError:\r\n raise FileNotFoundError(f\"文件未找到: {xlsx_file_path}\")\r\n except Exception as e:\r\n raise Exception(f\"无法打开文件: {e}\")\r\n\r\n # 选择工作表\r\n try:\r\n sheet = workbook[sheet_name] if sheet_name else workbook.active\r\n except KeyError:\r\n raise KeyError(f\"工作表未找到: {sheet_name}\")\r\n\r\n # 读取表头\r\n headers = [cell.value for cell in sheet[3]]\r\n required_columns = [\"功能编号\", \"测试结果\", \"日志截图\"]\r\n indices = {}\r\n\r\n for col in required_columns:\r\n try:\r\n indices[col] = headers.index(col)\r\n except ValueError:\r\n raise ValueError(f\"表头中缺少必要列: {col}\")\r\n\r\n # 查找目标行\r\n target_row = None\r\n for row in sheet.iter_rows(min_row=4):\r\n if row[indices[\"功能编号\"]].value == function_number:\r\n target_row = row\r\n break\r\n else:\r\n raise ValueError(f\"未找到功能编号为 {function_number} 的行\")\r\n\r\n # 填充测试结果\r\n test_result_cell = target_row[indices[\"测试结果\"]]\r\n test_result_cell.value = test_result\r\n\r\n # 处理图片\r\n if log_screenshot:\r\n img = Image(log_screenshot)\r\n img_cell = target_row[indices[\"日志截图\"]]\r\n\r\n # 计算单元格尺寸(单位转换)\r\n col_letter = get_column_letter(img_cell.column)\r\n col_width = sheet.column_dimensions[col_letter].width or 8.43 # 默认列宽\r\n row_height = sheet.row_dimensions[img_cell.row].height or 15.0 # 默认行高\r\n\r\n # 将Excel单位转换为像素(经验公式)\r\n cell_width_px = col_width * 7 # 1字符 ≈ 7像素\r\n cell_height_px = row_height * 1.333 # 1磅 ≈ 1.333像素\r\n\r\n # 按比例缩放图片\r\n img_ratio = img.width / img.height\r\n if cell_width_px / cell_height_px > img_ratio:\r\n new_height = cell_height_px\r\n new_width = cell_height_px * img_ratio\r\n else:\r\n new_width = cell_width_px\r\n new_height = cell_width_px / img_ratio\r\n\r\n img.width = new_width\r\n img.height = new_height\r\n\r\n # 将图片锚定到单元格\r\n sheet.add_image(img, f\"{col_letter}{img_cell.row}\")\r\n\r\n # 保存文件\r\n try:\r\n workbook.save(xlsx_file_path)\r\n logging.info(f\"文件已保存,功能编号 {function_number} 的图片已嵌入\")\r\n except Exception as e:\r\n logging.error(f\"保存失败: {e}\")\r\n raise\r\n\r\n\r\n# import xlwings as xw\r\n# import os\r\n# import logging\r\n# def write_xlsx_data2(xlsx_file_path, sheet_name, function_number, test_result, log_screenshot):\r\n# \"\"\"\r\n# 在XLSX文件的指定行中填充测试结果和嵌入日志截图(通过xlwings实现)。\r\n#\r\n# 参数:\r\n# xlsx_file_path (str): XLSX文件的路径。\r\n# sheet_name (str): 工作表的名称。\r\n# function_number (str): 功能编号,用于匹配行。\r\n# test_result (str): 测试结果。\r\n# log_screenshot (str): 日志截图路径。\r\n# \"\"\"\r\n# if not os.path.exists(xlsx_file_path):\r\n# raise FileNotFoundError(f\"文件未找到: {xlsx_file_path}\")\r\n#\r\n# try:\r\n# # 打开Excel文件\r\n# wb = xw.Book(xlsx_file_path)\r\n# except Exception as e:\r\n# raise Exception(f\"无法打开文件: {e}\")\r\n#\r\n# try:\r\n# # 选择工作表\r\n# if sheet_name:\r\n# sheet = wb.sheets[sheet_name]\r\n# else:\r\n# sheet = wb.sheets.active\r\n# except KeyError:\r\n# wb.close()\r\n# raise KeyError(f\"工作表 '{sheet_name}' 不存在\")\r\n#\r\n# # 读取表头(第三行)\r\n# try:\r\n# headers = sheet.range(\"3:3\").value # 获取第三行所有值\r\n# except Exception as e:\r\n# wb.close()\r\n# raise Exception(f\"读取表头失败: {e}\")\r\n#\r\n# # 确定列索引(转换为1-based列号)\r\n# try:\r\n# function_number_col = headers.index(\"功能编号\") + 1\r\n# test_result_col = headers.index(\"测试结果\") + 1\r\n# log_screenshot_col = headers.index(\"日志截图\") + 1\r\n# except ValueError as e:\r\n# wb.close()\r\n# raise ValueError(f\"表头列缺失: {e}\")\r\n#\r\n# # 遍历数据行(从第四行开始)\r\n# last_row = sheet.used_range.last_cell.row\r\n# found = False\r\n# for row_number in range(4, last_row + 1):\r\n# current_value = sheet.range((row_number, function_number_col)).value\r\n# if str(current_value) == str(function_number):\r\n# # 更新测试结果\r\n# sheet.range((row_number, test_result_col)).value = test_result\r\n#\r\n# # 插入图片(如果存在)\r\n# if log_screenshot and os.path.exists(log_screenshot):\r\n# try:\r\n# # 获取目标单元格\r\n# cell = sheet.range((row_number, log_screenshot_col))\r\n#\r\n# # 插入图片并绑定到单元格\r\n# pic = sheet.pictures.add(log_screenshot,\r\n# left=cell.left,\r\n# top=cell.top,\r\n# width=cell.width,\r\n# height=cell.height)\r\n# pic.api.Placement = 3 # 设置为xlMoveAndSize模式\r\n# except Exception as e:\r\n# logging.error(f\"插入图片失败: {e}\")\r\n# wb.close()\r\n# raise\r\n#\r\n# found = True\r\n# break\r\n#\r\n# if not found:\r\n# wb.close()\r\n# raise ValueError(f\"未找到功能编号为 {function_number} 的行\")\r\n#\r\n# # 保存并关闭\r\n# try:\r\n# wb.save()\r\n# logging.info(f\"文件 {xlsx_file_path} 已保存,数据已写入功能编号为 {function_number} 的行。\")\r\n# except Exception as e:\r\n# logging.error(f\"保存文件时出错: {e}\")\r\n# raise\r\n# finally:\r\n# wb.close()\r\n\r\nimport paramiko\r\nimport time\r\nimport paramiko\r\n\r\nimport paramiko\r\n\r\ndef get_remote_log_with_paramiko(host, username, private_key_path, passphrase, log_path, num_lines=1000, timeout=30, filter_word=None):\r\n \"\"\"\r\n 使用 Paramiko 获取远程服务器的日志文件内容,并通过过滤词过滤日志内容.\r\n\r\n Args:\r\n host (str): 服务器 IP 地址或域名.\r\n username (str): 用户名.\r\n private_key_path (str): SSH 私钥文件路径.\r\n passphrase (str): 私钥文件的 passphrase.\r\n log_path (str): 日志文件路径.\r\n num_lines (int): 要获取的日志行数 (默认 100).\r\n timeout (int): SSH 命令执行的超时时间(秒).\r\n filter_word (str): 过滤词,只有包含该词的日志行才会被返回 (默认 None).\r\n\r\n Returns:\r\n str: 获取的日志内容,如果出错返回 None.\r\n \"\"\"\r\n try:\r\n print(f\"Loading private key from {private_key_path}...\")\r\n\r\n if passphrase:\r\n print(f\"passphrase为:{passphrase}\")\r\n print(f\"private_key_path:{private_key_path}\")\r\n private_key = paramiko.RSAKey.from_private_key_file(private_key_path, password=passphrase)\r\n else:\r\n private_key = paramiko.RSAKey.from_private_key_file(private_key_path)\r\n\r\n print(f\"Private key loaded successfully from {private_key_path}\")\r\n\r\n client = paramiko.SSHClient()\r\n client.set_missing_host_key_policy(paramiko.AutoAddPolicy())\r\n print(f\"Connecting to {host} as {username}...\")\r\n client.connect(host, username=username, pkey=private_key, timeout=timeout)\r\n\r\n command = f\"tail -n {num_lines} {log_path}\"\r\n print(f\"Executing command: {command}\")\r\n stdin, stdout, stderr = client.exec_command(command, timeout=timeout)\r\n\r\n error = stderr.read().decode('utf-8')\r\n if error:\r\n print(f\"Error: {error}\")\r\n return None\r\n\r\n output = stdout.read().decode('utf-8')\r\n print(\"Successfully retrieved log content.\")\r\n print(\"Full log content:\")\r\n print(output) # 打印完整的日志内容\r\n\r\n if filter_word:\r\n filtered_output = \"\\n\".join([line for line in output.split('\\n') if filter_word in line])\r\n if not filtered_output:\r\n print(f\"No lines found containing the filter word: {filter_word}\")\r\n return filtered_output\r\n\r\n return output\r\n\r\n except paramiko.ssh_exception.PasswordRequiredException:\r\n print(\"Error: The private key file is encrypted but no passphrase was provided.\")\r\n return None\r\n except paramiko.ssh_exception.SSHException as e:\r\n print(f\"SSH Error: {e}\")\r\n return None\r\n except Exception as e:\r\n print(f\"An error occurred: {e}\")\r\n return None\r\n finally:\r\n if 'client' in locals():\r\n client.close()\r\n\r\nif __name__ == \"__main__\":\r\n host = \"192.168.5.218\" # 替换为你的服务器 IP 或域名\r\n username = \"root\" # 替换为你的用户名\r\n # private_key_path = \"C:\\\\Users\\\\29194\\\\.ssh\\\\id_rsa\" # 替换为你的私钥文件路径\r\n private_key_path = \"C:/Users/29194/.ssh/id_rsa\"\r\n passphrase = \"Ubains@123\" # 确保这是你的私钥文件的正确 passphrase\r\n log_path = \"/var/www/java/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log\" # 替换为你要读取的日志文件路径\r\n filter_word = \"\" # 替换为你想要过滤的关键字\r\n\r\n log_content = get_remote_log_with_paramiko(host, username, private_key_path, passphrase, log_path)\r\n\r\n if log_content:\r\n print(log_content)\r\n else:\r\n print(\"Failed to retrieve log content.\")\r\n\r\n\r\n\r\n\r\n\r\nimport paramiko\r\nimport threading\r\nimport time\r\n\r\nclass LogCollector:\r\n def __init__(self, host, username, private_key_path, passphrase, log_path, timeout=30, keepalive_interval=60):\r\n self.host = host\r\n self.username = username\r\n self.private_key_path = private_key_path\r\n self.passphrase = passphrase\r\n self.log_path = log_path\r\n self.timeout = timeout\r\n self.keepalive_interval = keepalive_interval # 保持连接活跃的时间间隔(秒)\r\n self.client = None\r\n self.channel = None\r\n self.log_content = []\r\n self.collecting = False\r\n self.lock = threading.Lock()\r\n self.thread = None\r\n\r\n def connect(self):\r\n try:\r\n private_key = paramiko.RSAKey.from_private_key_file(self.private_key_path, password=self.passphrase)\r\n self.client = paramiko.SSHClient()\r\n self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())\r\n self.client.connect(self.host, username=self.username, pkey=private_key, timeout=self.timeout)\r\n print(f\"Connected to {self.host} as {self.username}\")\r\n except Exception as e:\r\n print(f\"Failed to connect: {e}\")\r\n self.client = None\r\n\r\n def start_collection(self):\r\n if self.collecting:\r\n print(\"Already collecting logs.\")\r\n return\r\n\r\n self.connect()\r\n if self.client is None:\r\n print(\"Failed to start collection due to connection failure.\")\r\n return\r\n\r\n self.channel = self.client.invoke_shell()\r\n command = f\"tail -f {self.log_path}\\n\"\r\n self.channel.send(command)\r\n self.collecting = True\r\n self.thread = threading.Thread(target=self.collect_logs)\r\n self.thread.start()\r\n print(\"Log collection started.\")\r\n\r\n def stop_collection(self, output_file):\r\n if not self.collecting:\r\n print(\"Not collecting logs.\")\r\n return\r\n\r\n self.collecting = False\r\n if self.channel:\r\n self.channel.close()\r\n if self.client:\r\n self.client.close()\r\n print(\"Log collection stopped.\")\r\n\r\n with open(output_file, 'w', encoding='utf-8') as f:\r\n f.write('\\n'.join(self.log_content))\r\n print(f\"Log content saved to {output_file}\")\r\n\r\n def collect_logs(self):\r\n try:\r\n while self.collecting:\r\n if self.channel.recv_ready():\r\n output = self.channel.recv(1024).decode('utf-8')\r\n with self.lock:\r\n self.log_content.append(output.strip())\r\n print(output.strip())\r\n else:\r\n time.sleep(0.1) # 避免CPU占用过高\r\n\r\n # 发送keepalive包以保持连接活跃\r\n self.client.get_transport().send_ignore()\r\n time.sleep(self.keepalive_interval)\r\n except Exception as e:\r\n print(f\"Error in collect_logs: {e}\")\r\n self.collecting = False\r\n finally:\r\n if self.channel:\r\n self.channel.close()\r\n if self.client:\r\n self.client.close()\r\n print(\"Log collection thread terminated.\")\r\n\r\n# 使用示例\r\nif __name__ == \"__main__\":\r\n host = \"192.168.5.218\"\r\n username = \"root\"\r\n private_key_path = \"C:\\\\Users\\\\29194\\\\.ssh\\\\id_rsa\" # 替换为你的私钥文件路径\r\n passphrase = \"Ubains@123\" # 替换为你的 passphrase\r\n log_path = \"/var/www/java/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log\"\r\n output_file = \"collected_logs.txt\"\r\n\r\n collector = LogCollector(host, username, private_key_path, passphrase, log_path)\r\n\r\n # 开始收集日志\r\n collector.start_collection()\r\n\r\n # 假设一段时间后停止收集日志\r\n try:\r\n while True:\r\n time.sleep(1) # 保持主程序运行\r\n except KeyboardInterrupt:\r\n print(\"Stopping log collection due to user interrupt.\")\r\n collector.stop_collection(output_file)\r\n print(\"日志收集完成!\")
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/预定系统/Base/base.py b/预定系统/Base/base.py
--- a/预定系统/Base/base.py (revision 2dce467dae0b5bd2c5518da172a6d79724750706)
+++ b/预定系统/Base/base.py (date 1748316209407)
@@ -75,7 +75,7 @@
# 手动指定ChromeDriver的路径
# 自动化运行服务器的chromedriver路径:
# 拯救者电脑
- # service = Service(r'C:\Users\29194\AppData\Local\Programs\Python\Python310\Scripts\chromedriver.exe')
+ service = Service(r'C:\Users\29194\AppData\Local\Programs\Python\Python310\Scripts\chromedriver.exe')
# EDY电脑
# service = Service(r'C:\Program Files\Python310\Scripts\chromedriver.exe')
# 云电脑
......@@ -75,11 +75,11 @@ def browser_init(login_type):
# 手动指定ChromeDriver的路径
# 自动化运行服务器的chromedriver路径:
# 拯救者电脑
# service = Service(r'C:\Users\29194\AppData\Local\Programs\Python\Python310\Scripts\chromedriver.exe')
service = Service(r'C:\Users\29194\AppData\Local\Programs\Python\Python310\Scripts\chromedriver.exe')
# EDY电脑
# service = Service(r'C:\Program Files\Python310\Scripts\chromedriver.exe')
# 云电脑
service = Service(r'E:\Python\Scripts\chromedriver.exe')
# service = Service(r'E:\Python\Scripts\chromedriver.exe')
# 尝试创建WebDriver实例并执行初始化操作
try:
# 创建WebDriver实例
......
......@@ -307,4 +307,6 @@
- 处理MQTT创建客户端实例时补充账号密码的配置。
88. 2025-05-27:
- 安卓信息消息监听脚本也增加MQTT账号密码的配置处理。
- 调试兰州中石化项目会议申报模块的自动化JSON数据。
\ No newline at end of file
- 调试兰州中石化项目会议申报模块的自动化JSON数据。
89. 2025-06-03:
- 兰州中石化项目输出会议申报模块的JSON数据,调试自动化运行。
\ No newline at end of file
......@@ -64,4 +64,9 @@ class ConferenceDeclaration:
text = elment_get_text((locator_type, locator_value), wd)
INFO(f"获取到的文本信息为:{text}")
CHECK_POINT(f"获取到的文本信息为:{text}", expented_result in text)
SELENIUM_LOG_SCREEN(wd, "50")
\ No newline at end of file
SELENIUM_LOG_SCREEN(wd, "50")
# 执行完一个用例就刷新一下页面重置
wd.refresh()
wd.refresh()
sleep(2)
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论