Index: 运维集控/项目测试/运维标准版/cases/08设备管理/01新增设备.py IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/运维集控/项目测试/运维标准版/cases/08设备管理/01新增设备.py b/运维集控/项目测试/运维标准版/cases/08设备管理/01新增设备.py new file mode 100644 --- /dev/null (date 1741229657204) +++ b/运维集控/项目测试/运维标准版/cases/08设备管理/01新增设备.py (date 1741229657204) @@ -0,0 +1,108 @@ +import sys +import os + +from hytest.common import SELENIUM_LOG_SCREEN + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', '..', '..'))) +from 运维集控.项目测试.运维标准版.lib.base import * + +# 构建 CSV 文件的绝对路径 +csv_path = os.path.abspath( + os.path.join(os.path.dirname(__file__), '..', '..', 'testdata', '08设备管理', '新增设备.csv')) + +class AreafuntionAdd: + tag = ['新增设备'] + ddt_cases = read_csv_data(csv_path) + def teststeps(self): + wd = GSTORE['wd'] + + #从self.para中解构出数据 + name = self.name + area_group, area_type, area_name, area_ip, remark, info = self.para + + STEP(1, '点击新增按钮') + areafuntion_add = WebDriverWait(wd, 10).until( + EC.element_to_be_clickable( + (By.XPATH, "//div[@class='company-edmit-right']//span[contains(text(),'新增')]")) + ) + areafuntion_add.click() + sleep(1) + + STEP(2, f'查找并选择区域分组:{area_group}') + if area_group: + area_group_input = WebDriverWait(wd, 10).until( + EC.presence_of_element_located( + (By.XPATH, "//div[@class='dialog_input']//input[@placeholder='请选择分组']")) + ) + area_group_input.send_keys(area_group) + #默认选择第一个分组 + areagroup_select = WebDriverWait(wd, 10).until( + EC.presence_of_element_located( + ( + By.XPATH, "//li[@class='el-cascader__suggestion-item']")) + ) + sleep(1) + areagroup_select.click() + else: + print("group_name 为空,不执行选择区域分组的操作") + + STEP(3, f'选择区域类型:{area_type}') + if area_type: + area_type_input = WebDriverWait(wd, 10).until( + EC.presence_of_element_located( + (By.XPATH, "//div[@class='dialog_input']//input[@placeholder='请选择区域类型']")) + ) + area_type_input.send_keys(area_type) + #默认选择第一个区域类型 + areatype_select = WebDriverWait(wd, 10).until( + EC.presence_of_element_located( + (By.XPATH, "//div[@x-placement='bottom-start']//ul[@class='el-scrollbar__view el-select-dropdown__list']")) + ) + sleep(1) + areatype_select.click() + else: + print("area_type 为空,不执行选择区域类型的操作") + + STEP(4, f'输入区域名称:{area_name}') + area_name_input = WebDriverWait(wd, 10).until( + EC.presence_of_element_located( + (By.XPATH, "//div[contains(@class,'dialog_input')]//input[contains(@placeholder,'请输入区域名称')]")) + ) + area_name_input.clear() + area_name_input.send_keys(area_name) + + STEP(5, f'填写IP地址:{area_ip}') + area_ip_input = WebDriverWait(wd, 10).until( + EC.presence_of_element_located( + (By.XPATH, "//input[@placeholder='IP地址']")) + ) + area_ip_input.clear() + area_ip_input.send_keys(area_ip) + + STEP(6, f'填写备注:{remark}') + funtion_remark = WebDriverWait(wd, 10).until( + EC.presence_of_element_located( + (By.XPATH, "//input[@placeholder='备注']")) + ) + funtion_remark.clear() + funtion_remark.send_keys(remark) + + STEP(7, '点击确认') + commit = WebDriverWait(wd, 10).until( + EC.element_to_be_clickable( + (By.XPATH, "//div[@aria-label='新增']//span[contains(text(),'确 定')]")) + ) + commit.click() + + STEP(8, '验证是否新增成功') + get_menu = WebDriverWait(wd, 10).until( + EC.visibility_of_element_located( + (By.CSS_SELECTOR, '.el-message__content')) + ) + get_menu1 = get_menu.text + CHECK_POINT('检查是否出现成功提示弹窗', get_menu1 == info) + + # 截图并保存 + SELENIUM_LOG_SCREEN(wd, "50%") + sleep(1) + wd.refresh() \ No newline at end of file Index: 运维集控/项目测试/运维标准版/testdata/07类型标签/新增标签.csv IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/运维集控/项目测试/运维标准版/testdata/07类型标签/新增标签.csv b/运维集控/项目测试/运维标准版/testdata/07类型标签/新增标签.csv --- a/运维集控/项目测试/运维标准版/testdata/07类型标签/新增标签.csv (revision 6cedec8cd6072a6338b10f2314cd65d7c35c57f7) +++ b/运维集控/项目测试/运维标准版/testdata/07类型标签/新增标签.csv (date 1741229657220) @@ -1,0 +1,11 @@ +name,typename,sort +新增标签-001-标签名称为空,,1 +新增标签-002-排序为空,智慧大屏1, +新增标签-003-排序为小数,智慧大屏2,1.1 +新增标签-004-排序为负数,智慧大屏3,-1 +新增标签-005-正常新增标签1,智慧屏设备,1 +新增标签-006-正常新增标签2,时序电源,2 +新增标签-007-正常新增标签3,中控主机,3 +新增标签-008-正常新增标签4,音频处理器,4 +新增标签-009-正常新增标签5,视频处理器,5 +新增标签-010-正常新增标签6,网络路由,6 Index: 运维集控/项目测试/运维标准版/lib/base.py IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.BaseRevisionTextPatchEP <+>import pandas as pd\r\nimport csv\r\nimport urllib\r\nimport glob\r\nimport subprocess\r\nimport requests\r\nimport json\r\nimport hmac\r\nimport hashlib\r\nimport base64\r\nimport time\r\nimport win32api\r\nimport win32con\r\nimport win32gui\r\nimport logging\r\nfrom hytest import *\r\nimport pandas as pd\r\nfrom selenium import webdriver\r\nfrom datetime import datetime\r\nfrom urllib.parse import urlencode\r\nfrom selenium.webdriver.common.by import By\r\nfrom selenium.webdriver.edge.options import Options\r\nfrom selenium.webdriver.support.wait import WebDriverWait\r\nfrom selenium.webdriver.support import expected_conditions as EC\r\nfrom selenium.common import TimeoutException,ElementNotInteractableException\r\nfrom selenium.webdriver.common.keys import Keys\r\nfrom time import sleep\r\n\r\nlogging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')\r\n# 打开浏览器,忽略ssh警告\r\ndef open_browser():\r\n INFO('打开默认浏览器')\r\n # 更改显示屏分辨率为1920x1080\r\n # success = change_resolution(1280, 1024)\r\n success = change_resolution(1920, 1080)\r\n edge_options = Options()\r\n edge_options.add_argument('--ignore-certificate-errors')\r\n edge_options.add_argument('--disable-blink-features=AutomationControlled')\r\n edge_options.add_argument('--allow-insecure-localhost')\r\n wd = webdriver.Edge(options=edge_options)\r\n GSTORE['wd'] = wd\r\n # wd.get('https://rms.ubainsyun.com/#/login')\r\n wd.get('https://192.168.5.218:8443/#/login')\r\n wd.maximize_window()\r\n wd.implicitly_wait(10)\r\n\r\n# 定义调整屏幕分辨率,仅内部环境跑定时任务需要使用。\r\ndef change_resolution(width, height):\r\n # 获取当前显示器的设备上下文(Device Context, DC)\r\n device = win32api.EnumDisplayDevices(None, 0)\r\n dm = win32api.EnumDisplaySettings(device.DeviceName, win32con.ENUM_CURRENT_SETTINGS)\r\n\r\n if dm.PelsWidth != width or dm.PelsHeight != height:\r\n print(f\"Changing resolution to {width}x{height}\")\r\n dm.PelsWidth = width\r\n dm.PelsHeight = height\r\n\r\n # CDS_TEST 是测试模式,如果设置成功则不实际应用更改\r\n if win32api.ChangeDisplaySettings(dm, win32con.CDS_TEST) != win32con.DISP_CHANGE_SUCCESSFUL :\r\n print(\"The requested resolution change is not supported.\")\r\n return False\r\n\r\n # 实际应用新的分辨率设置\r\n if win32api.ChangeDisplaySettings(dm, 0) != win32con.DISP_CHANGE_SUCCESSFUL:\r\n print(\"Failed to change resolution.\")\r\n return False\r\n\r\n print(\"Resolution changed successfully.\")\r\n return True\r\n else:\r\n print(\"The requested resolution is already set.\")\r\n return True\r\n\r\n# 用户进行登录\r\ndef user_login(username, password, captcha):\r\n wd = GSTORE['wd']\r\n INFO(f'输入登录账号: {username}')\r\n username_input = WebDriverWait(wd, 10).until(\r\n EC.presence_of_element_located((By.XPATH, \"//input[@placeholder='请输入登录账号']\"))\r\n )\r\n username_input.clear()\r\n username_input.send_keys(username)\r\n\r\n INFO(f'输入登录密码: {password}')\r\n password_input = WebDriverWait(wd, 10).until(\r\n EC.presence_of_element_located((By.XPATH, \"//input[@placeholder='请输入登录密码']\"))\r\n )\r\n password_input.clear()\r\n password_input.send_keys(password)\r\n\r\n INFO(f'输入验证码:{captcha}')\r\n captcha_input = WebDriverWait(wd, 10).until(\r\n EC.presence_of_element_located((By.XPATH, \"//input[@placeholder='请输入验证码(区分大小写)']\"))\r\n )\r\n captcha_input.clear()\r\n captcha_input.send_keys(captcha)\r\n\r\n INFO('点击登录按钮')\r\n login_button = WebDriverWait(wd, 3).until(\r\n EC.element_to_be_clickable((By.XPATH, \"//div[@class='loginButton']\"))\r\n )\r\n login_button.click()\r\n sleep(1)\r\n\r\ndef validate_input(username, password, captcha):\r\n if not isinstance(username, str) or not isinstance(password, str) or not isinstance(captcha, str):\r\n raise ValueError(\"Invalid input type\")\r\n if len(username) < 1 or len(password) < 1 or len(captcha) < 1:\r\n raise ValueError(\"Input cannot be empty\")\r\n\r\ndef run_login_tests(df):\r\n # 遍历每一行\r\n for index, row in df.iterrows():\r\n username = row.get('username', None)\r\n password = row.get('password', None)\r\n captcha = row.get('captcha', None)\r\n if username and password and captcha:\r\n try:\r\n validate_input(username, password, captcha)\r\n user_login(username, password, captcha)\r\n except ValueError as e:\r\n print(f\"Invalid input at row {index}: {e}\")\r\n\r\n# 构建当前所在目录\r\ndef main():\r\n # 获取当前脚本的目录\r\n current_dir = os.path.dirname(os.path.abspath(__file__))\r\n csv_file_path = os.path.join(current_dir, '../testdata/01登录模块/登录信息.csv')\r\n try:\r\n if not os.path.exists(csv_file_path):\r\n print(f\"File not found: {csv_file_path}\")\r\n return\r\n df = pd.read_csv(csv_file_path, encoding='utf-8')\r\n run_login_tests(df)\r\n except Exception as e:\r\n print(f\"An error occurred: {e}\")\r\n\r\n# 进入后台管理系统页面\r\ndef enter_system():\r\n wd = GSTORE['wd']\r\n if wd is None:\r\n raise ValueError(\"WebDriver is not initialized\")\r\n\r\n logging.info('点击系统设置按钮')\r\n try:\r\n enter_sys = WebDriverWait(wd, 10).until(\r\n EC.element_to_be_clickable((By.XPATH, \"//li[contains(text(),'系统设置')]\"))\r\n )\r\n enter_sys.click()\r\n logging.info('系统设置按钮已点击')\r\n except (TimeoutException, ElementNotInteractableException) as error:\r\n logging.error(f\"Error clicking system settings button: {error}\")\r\n\r\n# 进入管理页面\r\ndef enter_manage():\r\n wd = GSTORE['wd']\r\n if wd is None:\r\n raise ValueError(\"WebDriver is not initialized\")\r\n\r\n logging.info('点击管理按钮')\r\n try:\r\n enter_mag = WebDriverWait(wd, 10).until(\r\n EC.element_to_be_clickable((By.XPATH, \"//div[@class='el-submenu__title']//span[contains(text(),'管理')]\"))\r\n )\r\n enter_mag.click()\r\n logging.info('管理按钮已点击')\r\n except (TimeoutException, ElementNotInteractableException) as error:\r\n logging.error(f\"Error clicking manage button: {error}\")\r\n\r\n# 进入用户管理页面\r\ndef enter_user_manage():\r\n wd = GSTORE['wd']\r\n if wd is None:\r\n raise ValueError(\"WebDriver is not initialized\")\r\n\r\n logging.info('点击用户管理按钮')\r\n try:\r\n enter_user_mag = WebDriverWait(wd, 10).until(\r\n EC.element_to_be_clickable((By.XPATH, \"//li[contains(text(),'用户管理')]\"))\r\n )\r\n enter_user_mag.click()\r\n logging.info('打开用户管理页面')\r\n except (TimeoutException, ElementNotInteractableException) as error:\r\n logging.error(f\"Error clicking manage button: {error}\")\r\n\r\n# 进入区域分组管理页面\r\ndef enter_areagroup_manage():\r\n wd = GSTORE['wd']\r\n if wd is None:\r\n raise ValueError(\"WebDriver is not initialized\")\r\n\r\n logging.info('点击区域分组按钮')\r\n try:\r\n enter_areagroup_mag = WebDriverWait(wd, 10).until(\r\n EC.element_to_be_clickable((By.XPATH, \"//li[contains(text(),'区域分组')]\"))\r\n )\r\n enter_areagroup_mag.click()\r\n logging.info('打开区域分组页面')\r\n except (TimeoutException, ElementNotInteractableException) as error:\r\n logging.error(f\"Error clicking manage button: {error}\")\r\n\r\n# 进入区域类型页面\r\ndef enter_areatype_manage():\r\n wd = GSTORE['wd']\r\n if wd is None:\r\n raise ValueError(\"WebDriver is not initialized\")\r\n\r\n logging.info('点击区域类型按钮')\r\n try:\r\n enter_areatype_mag = WebDriverWait(wd, 10).until(\r\n EC.element_to_be_clickable((By.XPATH, \"//li[contains(text(),'区域类型')]\"))\r\n )\r\n enter_areatype_mag.click()\r\n logging.info('打开区域类型页面')\r\n except (TimeoutException, ElementNotInteractableException) as error:\r\n logging.error(f\"Error clicking manage button: {error}\")\r\n\r\n# 进入区域功能页面\r\ndef enter_areafuntion_manage():\r\n wd = GSTORE['wd']\r\n if wd is None:\r\n raise ValueError(\"WebDriver is not initialized\")\r\n\r\n logging.info('点击区域功能按钮')\r\n try:\r\n enter_areafuntion_mag = WebDriverWait(wd, 10).until(\r\n EC.element_to_be_clickable((By.XPATH, \"//li[contains(text(),'区域功能')]\"))\r\n )\r\n enter_areafuntion_mag.click()\r\n logging.info('打开区域功能页面')\r\n except (TimeoutException, ElementNotInteractableException) as error:\r\n logging.error(f\"Error clicking manage button: {error}\")\r\n\r\n# 进入区域管理页面\r\ndef enter_area_manage():\r\n wd = GSTORE['wd']\r\n if wd is None:\r\n raise ValueError(\"WebDriver is not initialized\")\r\n\r\n logging.info('点击区域管理按钮')\r\n try:\r\n enter_area_mag = WebDriverWait(wd, 10).until(\r\n EC.element_to_be_clickable((By.XPATH, \"//li[contains(text(),'区域管理')]\"))\r\n )\r\n enter_area_mag.click()\r\n logging.info('打开区域管理页面')\r\n except (TimeoutException, ElementNotInteractableException) as error:\r\n logging.error(f\"Error clicking manage button: {error}\")\r\n\r\n# 进入类型标签页面\r\ndef enter_typetag():\r\n wd = GSTORE['wd']\r\n if wd is None:\r\n raise ValueError(\"WebDriver is not initialized\")\r\n\r\n logging.info('点击类型标签按钮')\r\n try:\r\n enter_type_tag = WebDriverWait(wd, 10).until(\r\n EC.element_to_be_clickable((By.XPATH, \"//li[contains(text(),'类型标签')]\"))\r\n )\r\n enter_type_tag.click()\r\n logging.info('打开类型标签页面')\r\n except (TimeoutException, ElementNotInteractableException) as error:\r\n logging.error(f\"Error clicking manage button: {error}\")\r\n\r\n# 进入设备管理页面\r\ndef enter_devices_manage():\r\n wd = GSTORE['wd']\r\n if wd is None:\r\n raise ValueError(\"WebDriver is not initialized\")\r\n\r\n logging.info('点击设备管理按钮')\r\n try:\r\n enter_devices_mag = WebDriverWait(wd, 10).until(\r\n EC.element_to_be_clickable((By.XPATH, \"//li[contains(text(),'设备管理')]\"))\r\n )\r\n enter_devices_mag.click()\r\n logging.info('打开设备管理页面')\r\n except (TimeoutException, ElementNotInteractableException) as error:\r\n logging.error(f\"Error clicking manage button: {error}\")\r\n\r\n# 进入协议管理页面\r\ndef enter_protocol_manage():\r\n wd = GSTORE['wd']\r\n if wd is None:\r\n raise ValueError(\"WebDriver is not initialized\")\r\n\r\n logging.info('点击协议管理按钮')\r\n try:\r\n enter_protocol_mag = WebDriverWait(wd, 10).until(\r\n EC.element_to_be_clickable((By.XPATH, \"//li[contains(text(),'协议管理')]\"))\r\n )\r\n enter_protocol_mag.click()\r\n logging.info('打开协议管理页面')\r\n except (TimeoutException, ElementNotInteractableException) as error:\r\n logging.error(f\"Error clicking manage button: {error}\")\r\n\r\n# 进入数据类型页面\r\ndef enter_datatype():\r\n wd = GSTORE['wd']\r\n if wd is None:\r\n raise ValueError(\"WebDriver is not initialized\")\r\n\r\n logging.info('点击数据类型按钮')\r\n try:\r\n enter_data_type = WebDriverWait(wd, 10).until(\r\n EC.element_to_be_clickable((By.XPATH, \"//li[contains(text(),'数据类型')]\"))\r\n )\r\n enter_data_type.click()\r\n logging.info('打开数据类型页面')\r\n except (TimeoutException, ElementNotInteractableException) as error:\r\n logging.error(f\"Error clicking manage button: {error}\")\r\n\r\n# 进入联动动作页面\r\ndef enter_linkaction():\r\n wd = GSTORE['wd']\r\n if wd is None:\r\n raise ValueError(\"WebDriver is not initialized\")\r\n\r\n logging.info('点击联动动作按钮')\r\n try:\r\n enter_link_action = WebDriverWait(wd, 10).until(\r\n EC.element_to_be_clickable((By.XPATH, \"//li[contains(text(),'联动动作')]\"))\r\n )\r\n enter_link_action.click()\r\n logging.info('打开联动动作页面')\r\n except (TimeoutException, ElementNotInteractableException) as error:\r\n logging.error(f\"Error clicking manage button: {error}\")\r\n\r\n# 进入模式管理页面\r\ndef enter_model_manage():\r\n wd = GSTORE['wd']\r\n if wd is None:\r\n raise ValueError(\"WebDriver is not initialized\")\r\n\r\n logging.info('点击模式管理按钮')\r\n try:\r\n enter_model_mag = WebDriverWait(wd, 10).until(\r\n EC.element_to_be_clickable((By.XPATH, \"//li[contains(text(),'模式管理')]\"))\r\n )\r\n enter_model_mag.click()\r\n logging.info('打开模式管理页面')\r\n except (TimeoutException, ElementNotInteractableException) as error:\r\n logging.error(f\"Error clicking manage button: {error}\")\r\n\r\n# 定义钉钉接口函数\r\ndef dingding_send_message(test_report_url, title, mobile, ding_type):\r\n \"\"\"\r\n 发送钉钉机器人消息\r\n 参考接口文档:https://open.dingtalk.com/document/orgapp/custom-robots-send-group-messages#title-7fs-kgs-36x\r\n\r\n :param test_report_url: 测试报告链接\r\n :param title: 消息标题\r\n :param text: 消息内容\r\n :param mobile: 需要@的手机号列表\r\n \"\"\"\r\n # 记录调用此函数的日志\r\n logging.info(\"开始构建并发送钉钉机器人消息\")\r\n # 钉钉机器人的 Webhook URL 和密钥(正式环境)\r\n # webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=b0eea0bbf097ce3badb4c832d2cd0267a50486f395ec8beca6e2042102bb295b'\r\n # secret = 'SEC928b11659c5fd6476cfa2042edbf56da876abf759289f7e4d3c671fb9a81bf43'\r\n # 钉钉机器人的 Webhook URL 和密钥(测试环境)\r\n\r\n if ding_type == '标准版巡检':\r\n webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=7fbf40798cad98b1b5db55ff844ba376b1816e80c5777e6f47ae1d9165dacbb4'\r\n secret = 'SEC610498ed6261ae2df1d071d0880aaa70abf5e67efe47f75a809c1f2314e0dbd6'\r\n elif ding_type == '展厅巡检':\r\n webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=061b6e9b1ae436f356cfda7fe19b6e58e46b62670046a78bd3a4d869118c612d'\r\n secret = 'SEC93212bd880aad638cc0df2b28a72ef4fdf6651cacb8a6a4bc71dcf09705d458d'\r\n\r\n # 生成时间戳\r\n timestamp = str(round(time.time() * 1000))\r\n\r\n # 生成签名\r\n secret_enc = secret.encode('utf-8')\r\n string_to_sign = f'{timestamp}\\n{secret}'\r\n string_to_sign_enc = string_to_sign.encode('utf-8')\r\n hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()\r\n sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))\r\n\r\n # 构建最终的 Webhook URL\r\n params = {\r\n 'access_token': webhook_url.split('=')[1],\r\n 'timestamp': timestamp,\r\n 'sign': sign\r\n }\r\n encoded_params = urllib.parse.urlencode(params)\r\n final_webhook_url = f'https://oapi.dingtalk.com/robot/send?{encoded_params}'\r\n\r\n # 记录最终的 Webhook URL\r\n logging.info(f\"钉钉机器人Webhook URL: {final_webhook_url}\")\r\n\r\n # 构建消息体\r\n headers = {'Content-Type': 'application/json'}\r\n message = {\r\n 'msgtype': 'link',\r\n 'link': {\r\n 'title': title,\r\n 'messageUrl': test_report_url,\r\n 'text': test_report_url\r\n },\r\n \"at\": {\r\n \"atMobiles\": [mobile],\r\n \"isAtAll\": True\r\n }\r\n }\r\n\r\n try:\r\n # 发送 POST 请求\r\n response = requests.post(final_webhook_url, data=json.dumps(message), headers=headers)\r\n\r\n # 检查响应状态码\r\n if response.status_code == 200:\r\n logging.info('消息发送成功!')\r\n logging.info(f'响应内容: {response.text}')\r\n else:\r\n logging.error(f'消息发送失败,状态码: {response.status_code}')\r\n logging.error(f'响应内容: {response.text}')\r\n except requests.exceptions.RequestException as e:\r\n logging.error(f'请求异常: {e}')\r\n\r\n# 调用框架自带生成的测试报告\r\ndef get_latest_report_file(report_dir, base_url):\r\n \"\"\"\r\n 获取指定目录下最新的HTML报告文件,并返回带有基础URL的完整路径。\r\n\r\n :param report_dir: 报告文件所在的目录\r\n :param base_url: 基础URL\r\n :return: 最新的HTML报告文件的完整URL,如果没有找到则返回None\r\n \"\"\"\r\n # 记录调用此函数的日志\r\n logging.info(\"开始调用get_latest_report_file函数获取报告文件\")\r\n\r\n # 确保报告目录存在\r\n if not os.path.exists(report_dir):\r\n logging.error(f\"报告目录 {report_dir} 不存在。\")\r\n return None\r\n\r\n # 获取指定目录下所有符合模式的HTML报告文件\r\n report_files = glob.glob(os.path.join(report_dir, 'report_*.html'))\r\n\r\n # 打印找到的文件列表\r\n logging.debug(f\"找到的报告文件: {report_files}\")\r\n\r\n # 如果没有找到报告文件,记录警告信息并返回None\r\n if not report_files:\r\n logging.warning(\"在指定目录中没有找到报告文件。\")\r\n return None\r\n\r\n # 找到最新修改的报告文件\r\n latest_file = max(report_files, key=os.path.getmtime)\r\n\r\n # 获取最新报告文件的最后修改时间\r\n last_modified_time = datetime.fromtimestamp(os.path.getmtime(latest_file)).strftime('%Y-%m-%d %H:%M:%S')\r\n\r\n # 记录最新报告文件的信息\r\n logging.info(f\"最新报告文件: {latest_file}, 最后修改时间: {last_modified_time}\")\r\n\r\n # 将文件路径转换为相对于基础URL的相对路径\r\n relative_path = os.path.relpath(latest_file, report_dir)\r\n\r\n # 生成完整的URL\r\n full_url = f\"{base_url}/{relative_path}\".replace(\"\\\\\", \"/\")\r\n\r\n # 返回完整的URL\r\n return full_url\r\n\r\n# 定义发送钉钉报告函数\r\ndef get_reportfile_send_dingding(report_title, report_url_prefix, ding_type):\r\n try:\r\n # 获取报告文件所在的目录\r\n report_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..','log')\r\n\r\n # 获取基础URL\r\n base_url = report_url_prefix\r\n\r\n # 获取最新的报告文件\r\n latest_report = get_latest_report_file(report_dir, base_url)\r\n\r\n # 如果找到了最新的报告文件,则发送报告链接到钉钉\r\n if latest_report:\r\n logging.info(f\"最新报告文件URL: {latest_report}\")\r\n\r\n try:\r\n # 记录调用钉钉消息通知函数的日志\r\n logging.info(\"开始调用钉钉消息通知函数\")\r\n\r\n # 调用钉钉发送消息接口进行推送测试报告链接\r\n dingding_send_message(latest_report, report_title, \"13724387318\", ding_type)\r\n\r\n # 记录钉钉消息通知函数调用成功的日志\r\n logging.info(\"钉钉消息通知函数调用成功\")\r\n except Exception as e:\r\n # 记录钉钉消息通知函数调用失败的日志\r\n logging.error(f\"钉钉消息通知函数调用失败: {e}\")\r\n else:\r\n # 记录没有找到报告文件的日志\r\n logging.warning(\"没有找到报告文件以发送。\")\r\n\r\n except subprocess.CalledProcessError as e:\r\n # 处理子进程调用失败的异常\r\n logging.error(f\"命令执行失败,返回码 {e.returncode}: {e.output}\")\r\n except OSError as e:\r\n # 处理操作系统相关的异常\r\n logging.error(f\"发生操作系统错误: {e}\")\r\n finally:\r\n # 无论是否成功,都记录测试结束的日志\r\n logging.info(\"自动化测试完成。\")\r\n\r\n# 获取csv文件数据\r\ndef read_csv_data(csv_file_path):\r\n \"\"\"\r\n 读取CSV文件中的数据,并将其转换为一个包含字典的列表,每个字典代表一行测试用例数据。\r\n 参数:\r\n csv_file_path (str): CSV文件的路径。\r\n 返回:\r\n list: 包含字典的列表,每个字典包含测试用例的名称和参数。\r\n \"\"\"\r\n # 打开CSV文件,使用只读模式,确保兼容性并处理编码\r\n with open(csv_file_path, mode='r', newline='', encoding='utf-8') as file:\r\n # 创建CSV阅读器\r\n reader = csv.reader(file)\r\n # 读取表头,为后续数据解析做准备\r\n headers = next(reader)\r\n ddt_cases = []\r\n # 遍历CSV文件中的每一行数据\r\n for row in reader:\r\n # 将每一行数据转换为字典,其中包含测试用例的名称和参数\r\n case = {\r\n 'name': row[0],\r\n 'para': row[1:]\r\n }\r\n # 将转换后的测试用例数据添加到列表中\r\n ddt_cases.append(case)\r\n # 日志记录:CSV文件已读取\r\n INFO(\"CSV文件已读取\")\r\n # 返回包含所有测试用例数据的列表\r\n return ddt_cases\r\n\r\ndef run_automation_test(report_title, report_url_prefix , ding_type):\r\n \"\"\"\r\n 运行自动化测试并生成报告。\r\n\r\n 参数:\r\n - report_title: 报告的标题\r\n - report_url_prefix: 报告URL的前缀\r\n - test_case: 测试用例脚本执行的标签名\r\n \"\"\"\r\n # 记录测试开始的日志\r\n logging.info(\"开始自动化测试...\")\r\n\r\n # 构建运行测试命令\r\n command = [\r\n 'hytest',\r\n '--report_title', report_title,\r\n '--report_url_prefix', report_url_prefix\r\n ]\r\n\r\n # 记录将要执行的命令日志\r\n logging.info(f\"执行命令: {' '.join(command)}\")\r\n\r\n try:\r\n # 执行测试命令并获取结果\r\n result = subprocess.run(command, capture_output=True, text=True, check=True)\r\n\r\n # 记录命令的标准输出和错误输出\r\n logging.debug(f\"命令标准输出: {result.stdout}\")\r\n logging.debug(f\"命令错误输出: {result.stderr}\")\r\n except subprocess.CalledProcessError as e:\r\n # 处理子进程调用失败的异常\r\n logging.error(f\"命令执行失败,返回码 {e.returncode}: {e.output}\")\r\n except OSError as e:\r\n # 处理操作系统相关的异常\r\n logging.error(f\"发生操作系统错误: {e}\")\r\n finally:\r\n # 无论测试是否成功,都记录测试结束的日志\r\n logging.info(\"自动化测试完成。\")\r\n\r\n # 调用回调函数处理后续操作\r\n get_reportfile_send_dingding(f\"{report_title}\", report_url_prefix, ding_type)\r\n Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/运维集控/项目测试/运维标准版/lib/base.py b/运维集控/项目测试/运维标准版/lib/base.py --- a/运维集控/项目测试/运维标准版/lib/base.py (revision 6cedec8cd6072a6338b10f2314cd65d7c35c57f7) +++ b/运维集控/项目测试/运维标准版/lib/base.py (date 1741231416971) @@ -1,4 +1,6 @@ +import openpyxl import pandas as pd +import re import csv import urllib import glob @@ -22,7 +24,7 @@ from selenium.webdriver.edge.options import Options from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.support import expected_conditions as EC -from selenium.common import TimeoutException,ElementNotInteractableException +from selenium.common import TimeoutException, ElementNotInteractableException, NoSuchElementException from selenium.webdriver.common.keys import Keys from time import sleep @@ -44,7 +46,7 @@ wd.maximize_window() wd.implicitly_wait(10) -# 定义调整屏幕分辨率,仅内部环境跑定时任务需要使用。 +# 定义调整屏幕分辨率,仅虚拟机电脑环境跑定时任务需要使用。 def change_resolution(width, height): # 获取当前显示器的设备上下文(Device Context, DC) device = win32api.EnumDisplayDevices(None, 0) @@ -537,6 +539,7 @@ # 返回包含所有测试用例数据的列表 return ddt_cases +# 定义生成的报告,并且发送钉钉通知 def run_automation_test(report_title, report_url_prefix , ding_type): """ 运行自动化测试并生成报告。 @@ -578,3 +581,351 @@ # 调用回调函数处理后续操作 get_reportfile_send_dingding(f"{report_title}", report_url_prefix, ding_type) + + +# 获取报告中的通过率、失败率和异常率,用于发送钉钉报告 +def get_test_result(latest_report, wd): + """ + 获取测试结果页面的通过率、失败率和异常率 + + :param latest_report: 测试结果页面的URL + :param wd: WebDriver实例,用于访问和操作网页 + :return: 包含通过率、失败率和异常率的字典 + """ + # 初始化测试结果字典 + test_result = { + "pass_percent": "", + "fail_percent": "", + "exception_percent": "" + } + + # 访问测试结果页面 + wd.get(latest_report) + sleep(5) + + # 点击简略显示 + safe_click((By.XPATH,"//div[@id='display_mode']"), wd) + sleep(5) + + # 定义一个函数来获取和解析百分比 + def get_percentage(selector, wd): + text = elment_get_text(selector, wd) + logging.info(f"获取的文本:{text}") + match = re.search(r'(\d+(\.\d+)?)%', text) + if match: + return match.group(0) + else: + logging.error(f"未找到百分比匹配项,文本内容: {text}") + return "0" + + # 获取通过率 + pass_percent = get_percentage((By.CSS_SELECTOR, "div[class='result_barchart'] div:nth-child(1) span:nth-child(1)"), wd) + test_result["pass_percent"] = pass_percent + + # 获取失败率 + fail_percent = get_percentage( + (By.CSS_SELECTOR, "body > div.main_section > div.result > div > div:nth-child(2) > span"), wd) + test_result["fail_percent"] = fail_percent + + # 获取异常率 + exception_percent = get_percentage( + (By.CSS_SELECTOR, "body > div.main_section > div.result > div > div:nth-child(3) > span"), wd) + test_result["exception_percent"] = exception_percent + # 输出test_result + logging.info(test_result) + print(test_result) + sleep(5) + + # 返回测试结果字典 + return test_result + + +# 通用方法-点击元素 +def safe_click(element_locator, wd): + """ + 对其定位器指定的元素执行安全单击。 + 此函数尝试以处理潜在异常的方式单击元素。 + 它等待元素可见并可单击,然后再尝试单击它。 + 如果该元素在20秒内无法点击,或者它不存在, + 或者不可交互,它会捕获相应的异常并记录一条信息性消息。 + 参数: + -element_locator:要单击的元素的定位器,指定为元组。 + -wd:用于查找元素并与之交互的WebDriver实例。 + """ + try: + # Wait up to 20 seconds for the element to be visible + element = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator)) + # Attempt to click the element + element.click() + except TimeoutException: + # Log a message if the element is not found or not clickable within 20 seconds + INFO(f"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.") + except NoSuchElementException: + # Log a message if the element is not found + INFO(f"NoSuchElementException: Element {element_locator} not found.") + except ElementNotInteractableException: + # Log a message if the element is not interactable + INFO(f"ElementNotInteractableException: Element {element_locator} is not interactable.") + +# 通用方法-在输入框输入内容 +def safe_send_keys(element_locator, value, wd): + """ + 安全地向网页元素发送键值。 + 该函数尝试在指定时间内找到指定的网页元素,如果找到并且元素可见,将先清除元素内容,然后发送指定的键值。 + 如果在指定时间内未能找到元素或元素不可点击,则捕获相应的异常并打印错误信息。 + 参数: + element_locator (tuple): 用于定位网页元素的策略和定位器的元组,例如(By.ID, 'element_id')。 + value (str): 要发送到元素的键值字符串。 + wd: WebDriver实例,用于与浏览器交互。 + 异常处理: + - TimeoutException: 如果元素在指定时间内未被找到或不可点击。 + - NoSuchElementException: 如果元素不存在。 + - ElementNotInteractableException: 如果元素存在但不可交互。 + """ + try: + # 等待元素在指定时间内可见 + element = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator)) + element.clear() # 清除元素的当前值 + element.send_keys(value) # 向元素发送指定的键值 + except TimeoutException: + # 如果元素在指定时间内未被找到或不可点击,打印超时异常信息 + INFO(f"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.") + except NoSuchElementException: + # 如果元素不存在,打印相应异常信息 + INFO(f"NoSuchElementException: Element {element_locator} not found.") + except ElementNotInteractableException: + # 如果元素不可交互,打印相应异常信息 + INFO(f"ElementNotInteractableException: Element {element_locator} is not interactable.") + +# 通用方法-输入框清除 +def input_clear(element_locator, wd): + """ + 清空输入框中的文本。 + + 该函数通过元素定位器找到指定的输入框,并尝试清空其内容。它使用显式等待来确保元素在尝试清除之前是可见的。 + + 参数: + - element_locator: 用于定位输入框的元素定位器,通常是一个元组,包含定位方法和定位表达式。 + - wd: WebDriver实例,用于与浏览器交互。 + + 异常处理: + - TimeoutException: 如果在指定时间内元素不可见,则捕获此异常并打印超时异常消息。 + - NoSuchElementException: 如果找不到指定的元素,则捕获此异常并打印未找到元素的消息。 + - ElementNotInteractableException: 如果元素不可操作(例如,元素不可见或不可点击),则捕获此异常并打印相应消息。 + """ + try: + # 等待元素可见,并在可见后清空输入框。 + input_element = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator)) + input_element.clear() + except TimeoutException: + # 如果元素在20秒内不可见,打印超时异常消息。 + print(f"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.") + except NoSuchElementException: + # 如果找不到元素,打印未找到元素的消息。 + print(f"NoSuchElementException: Element {element_locator} not found.") + except ElementNotInteractableException: + # 如果元素不可操作,打印元素不可操作的消息。 + print(f"ElementNotInteractableException: Element {element_locator} is not interactable.") + +# 通用方法-回车 +def send_keyboard(element_locator, wd): + """ + 向指定元素发送键盘事件。 + + 该函数尝试找到页面上的一个元素,并向其发送RETURN键点击事件。它使用显式等待来确保元素在尝试交互之前是可见的。 + 如果在指定时间内元素不可见、不存在或不可交互,则捕获相应的异常并打印错误消息。 + + 参数: + - element_locator: 用于定位元素的元组,格式为(by, value)。例如,(By.ID, 'element_id')。 + - wd: WebDriver实例,用于与浏览器进行交互。 + + 异常处理: + - TimeoutException: 如果元素在20秒内不可见,则捕获此异常并打印超时错误消息。 + - NoSuchElementException: 如果找不到指定的元素,则捕获此异常并打印未找到元素的错误消息。 + - ElementNotInteractableException: 如果元素不可交互(例如,被遮挡或不可点击),则捕获此异常并打印相应错误消息。 + """ + try: + # 等待元素可见,并在可见后向其发送RETURN键点击事件。 + element = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator)) + element.send_keys(Keys.RETURN) + except TimeoutException: + # 如果元素在指定时间内不可见,打印超时错误消息。 + print(f"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.") + except NoSuchElementException: + # 如果找不到指定的元素,打印未找到元素的错误消息。 + print(f"NoSuchElementException: Element {element_locator} not found.") + except ElementNotInteractableException: + # 如果元素不可交互,打印不可交互错误消息。 + print(f"ElementNotInteractableException: Element {element_locator} is not interactable.") + +# 通用方法-获取提示文本 +def get_notify_text(wd,element_locator): + """ + 获取通知文本信息。 + + 该函数通过WebDriver等待特定的元素出现,并截取其文本内容作为通知信息。此外,它还负责在获取通知文本后进行屏幕截图, + 以便于后续的调试或记录需要。 + + 参数: + wd (WebDriver): 由上层传入的WebDriver对象,用于操作浏览器。 + + 返回: + str: 提取的通知文本信息。如果未能提取到信息或发生异常,则返回None。 + """ + try: + # 获取提示信息 + notify_text = WebDriverWait(wd, 60).until( + EC.presence_of_element_located(element_locator) + ).text + # 屏幕截图 + SELENIUM_LOG_SCREEN(wd,"50%") + return notify_text + except Exception as e: + # 记录异常信息 + INFO(f"Exception occurred: {e}") + +# 通用方法-获取页面中的文本内容 +def elment_get_text(element_locator, wd): + """ + 获取页面元素的文本。 + + 该函数通过显式等待的方式,确保页面元素在20秒内可见并获取其文本。 + 如果在规定时间内元素不可见、不存在或不可交互,则会捕获相应的异常并打印错误信息。 + + 参数: + - element_locator: 用于定位页面元素的元组,通常包含定位方式和定位值。 + - wd: WebDriver对象,用于与浏览器进行交互。 + + 返回: + - element_text: 页面元素的文本。如果发生异常,则返回None。 + """ + try: + # 使用WebDriverWait等待页面元素在20秒内可见,并获取其文本。 + element_text = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator)).text + return element_text + except TimeoutException: + # 如果超过20秒元素仍未可见,则捕获TimeoutException异常并打印错误信息。 + print(f"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.") + except NoSuchElementException: + # 如果页面上不存在该元素,则捕获NoSuchElementException异常并打印错误信息。 + print(f"NoSuchElementException: Element {element_locator} not found.") + except ElementNotInteractableException: + # 如果元素存在但不可交互(例如被遮挡),则捕获ElementNotInteractableException异常并打印错误信息。 + print(f"ElementNotInteractableException: Element {element_locator} is not interactable.") + +# 通用方法-读取XLSX文件中的数据,并将其转换为一个包含字典的列表 +def read_xlsx_data(xlsx_file_path): + """ + 读取XLSX文件中的数据,并将其转换为一个包含字典的列表,每个字典代表一行测试用例数据。 + + 参数: + xlsx_file_path (str): XLSX文件的路径。 + + 返回: + list: 包含字典的列表,每个字典包含测试用例的名称和参数。 + """ + # 打开XLSX文件 + workbook = openpyxl.load_workbook(xlsx_file_path) + # 假设数据在第一个工作表中 + sheet = workbook.active + + # 读取表头,从第三行开始 + headers = [cell.value for cell in sheet[3]] + + # 打印表头列名 + # print(f"表头列名: {headers}") + + # 找到表头中名为 'JSON' 的列索引 + try: + json_index = headers.index('JSON') + except ValueError as e: + raise ValueError(f"表头中没有找到所需的列: {e}") + + ddt_cases = [] + # 遍历XLSX文件中的每一行数据,从第四行开始 + for row in sheet.iter_rows(min_row=4, values_only=True): + # 获取 JSON 列的数据 + json_data = row[json_index] + + # 打印 JSON 数据以进行调试 + # print(f"JSON 数据: {json_data}") + + # 解析 JSON 字符串 + try: + if json_data: + parsed_json = json.loads(json_data) + else: + raise ValueError("JSON 数据为空") + except json.JSONDecodeError: + raise ValueError(f"无法解析 JSON 数据: {json_data}") + + # 将解析后的 JSON 数据添加到列表中 + ddt_cases.append(parsed_json) + + # 日志记录:XLSX文件已读取 + INFO("XLSX文件已读取") + # 返回包含所有测试用例数据的列表 + return ddt_cases + +# 通用方法-转换字符串类型 +def get_by_enum(type_str): + """ + 将字符串类型的定位器类型转换为 selenium.webdriver.common.by.By 枚举类型。 + + 参数: + type_str (str): 定位器类型字符串,例如 'XPATH'。 + + 返回: + selenium.webdriver.common.by.By: 对应的 By 枚举类型。 + """ + type_str = type_str.upper() + if type_str == 'XPATH': + return By.XPATH + elif type_str == 'ID': + return By.ID + elif type_str == 'NAME': + return By.NAME + elif type_str == 'CLASS_NAME': + return By.CLASS_NAME + elif type_str == 'CSS_SELECTOR': + return By.CSS_SELECTOR + elif type_str == 'TAG_NAME': + return By.TAG_NAME + elif type_str == 'LINK_TEXT': + return By.LINK_TEXT + elif type_str == 'PARTIAL_LINK_TEXT': + return By.PARTIAL_LINK_TEXT + else: + raise ValueError(f"未知的定位器类型: {type_str}") + +# 通用方法-用户登录 +def user_login(element_locators, username, password, verifycode): + """ + 管理员登录函数。 + 该函数通过模拟用户输入用户名、密码和验证码,并点击登录按钮,以实现管理员登录。 + """ + # 获取webdriver实例 + wd = GSTORE['wd'] + + # 打印用户名输入信息 + INFO(f"输入用户名:{username}") + # 向用户名输入框发送用户名 + safe_send_keys(element_locators['username_locator'], f'{username}', wd) + sleep(5) + + # 打印密码输入信息 + INFO(f"输入密码:{password}") + # 向密码输入框发送密码 + safe_send_keys(element_locators['password_locator'], f"{password}", wd) + sleep(5) + + # 打印验证码输入信息 + INFO(f"输入验证码:{verifycode}") + # 向验证码输入验证码 + safe_send_keys(element_locators['verifycode_locator'], f"{verifycode}", wd) + sleep(5) + + #点击登录按钮 + INFO("点击登录按钮") + safe_click(element_locators['submitButton_locator'], wd) + sleep(5) Index: 运维集控/项目测试/运维标准版/testdata/08设备管理/新增设备.csv IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/运维集控/项目测试/运维标准版/testdata/08设备管理/新增设备.csv b/运维集控/项目测试/运维标准版/testdata/08设备管理/新增设备.csv --- a/运维集控/项目测试/运维标准版/testdata/08设备管理/新增设备.csv (revision 6cedec8cd6072a6338b10f2314cd65d7c35c57f7) +++ b/运维集控/项目测试/运维标准版/testdata/08设备管理/新增设备.csv (date 1741229657231) @@ -1,0 +1,2 @@ +name,area_name,device_name,type,brand,model,sn,ip,tag,supplier,contact,price,expiry_time,sort,useYear,description,relateMaster,outMaster +新增设备-001-正常新增设备,测试区域11,测试设备0,智慧屏设备,华为品牌,Ideahub,123456789,192.168.1.1,资产设备1,华为,13141234567,2000,2025-01-01 12:00:00,1,3,用途说明1,否,否 \ No newline at end of file Index: 运维集控/项目测试/运维标准版/cases/07类型标签/01新增标签.py IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/运维集控/项目测试/运维标准版/cases/07类型标签/01新增标签.py b/运维集控/项目测试/运维标准版/cases/07类型标签/01新增标签.py new file mode 100644 --- /dev/null (date 1741229657191) +++ b/运维集控/项目测试/运维标准版/cases/07类型标签/01新增标签.py (date 1741229657191) @@ -0,0 +1,85 @@ +import sys +import os + +from hytest.common import SELENIUM_LOG_SCREEN + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', '..', '..'))) +from 运维集控.项目测试.运维标准版.lib.base import * + +#构建当前项目路径 +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', '..', '..'))) +# 构建 CSV 文件的绝对路径 +csv_path = os.path.abspath( + os.path.join(os.path.dirname(__file__), '..', '..', 'testdata', '03类型标签', '新增标签.csv')) + +class AreagroupAdd: + tag = ['新增类型标签'] + ddt_cases = read_csv_data(csv_path) + def teststeps(self): + wd = GSTORE['wd'] + # 从self.para中解构出数据 + name = self.name + area_group, group_name, group_address, remark, info = self.para + + STEP(1, '点击新增按钮') + areagroup_add = WebDriverWait(wd, 10).until( + EC.element_to_be_clickable((By.XPATH, "//div[@class='company-edmit-right']//span[contains(text(),'新增')]")) + ) + areagroup_add.click() + + STEP(2, f'查看并选择类型标签:{area_group}') + if area_group: + area_group_input = WebDriverWait(wd, 10).until( + EC.presence_of_element_located((By.XPATH, "// input[ @ placeholder = '请选择分组']")) + ) + area_group_input.clear() + area_group_input.send_keys(area_group) + sleep(1) + #默认选择第一个内容 + area_group_select = WebDriverWait(wd, 10).until( + EC.presence_of_element_located((By.XPATH, "//div[@class='el-cascader__suggestion-panel el-scrollbar']//li[1]//span[1]")) + ) + area_group_select.click() + else: + print("group_name 为空,不执行选择区域分组的操作") + + STEP(3, f'填写标签名称:{group_name}') + group_name_input = WebDriverWait(wd, 10).until( + EC.presence_of_element_located((By.XPATH, "//input[@placeholder='长度1-20个字符']")) + ) + group_name_input.clear() + group_name_input.send_keys(group_name) + + STEP(4, f'填写标签排序:{group_address}') + group_address_input = WebDriverWait(wd, 10).until( + EC.presence_of_element_located((By.XPATH, "//input[@placeholder='输入地址不能大于50个字符']")) + ) + group_address_input.clear() + group_address_input.send_keys(group_address) + + STEP(5, f'填写分组备注:{remark}') + group_remark = WebDriverWait(wd, 10).until( + EC.presence_of_element_located( + (By.XPATH, "//input[@placeholder='请输入备注']")) + ) + group_remark.clear() + group_remark.send_keys(remark) + + STEP(6, '点击确认') + commit = WebDriverWait(wd, 10).until( + EC.element_to_be_clickable((By.XPATH, "//div[@class='dialog-footer']//span[contains(text(),'确 定')]")) + ) + commit.click() + + STEP(7, '验证是否新增成功') + get_menu = WebDriverWait(wd, 10).until( + EC.visibility_of_element_located((By.XPATH, "//p[@class='el-message__content']")) + ) + get_menu1 = get_menu.text + CHECK_POINT('检查是否出现成功提示弹窗', get_menu1 == info ) + + # 截图并保存 + SELENIUM_LOG_SCREEN(wd, "50%") + sleep(1) + wd.refresh() + Index: 预定系统/Base/base.py IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.BaseRevisionTextPatchEP <+>import csv\r\nimport glob\r\nimport re\r\nimport urllib\r\nfrom selenium.webdriver.chrome.service import Service\r\nimport json\r\nimport hmac\r\nimport hashlib\r\nimport base64\r\nimport psutil\r\nimport time\r\nimport subprocess\r\nimport logging\r\nfrom hytest import *\r\nfrom selenium import webdriver\r\nfrom selenium.common import ElementNotInteractableException\r\nfrom selenium.webdriver.common.keys import Keys\r\nfrom urllib.parse import urlencode\r\nfrom datetime import datetime\r\n\r\n# 获取当前脚本的绝对路径\r\ncurrent_dir = os.path.dirname(os.path.abspath(__file__))\r\n# 获取当前脚本的父目录\r\nparent_dir = os.path.dirname(current_dir)\r\nlogging.info(parent_dir)\r\n# 添加路径\r\nsys.path.append(current_dir)\r\n\r\n# 配置日志记录器,仅输出到控制台\r\nlogging.basicConfig(\r\n level=logging.DEBUG,\r\n format='%(asctime)s - %(levelname)s - %(message)s',\r\n handlers=[\r\n logging.StreamHandler()\r\n ]\r\n)\r\n\r\ndef browser_init(login_type):\r\n \"\"\"\r\n 初始化浏览器设置和实例。\r\n\r\n 此函数旨在创建并配置一个Chrome浏览器实例,包括设置Chrome选项以排除不必要的日志,\r\n 并尝试打开特定的登录页面。任何初始化过程中出现的错误都会被捕获并记录。\r\n\r\n 参数:\r\n login_type (str): 指定登录类型,根据不同的登录类型选择不同的URL。\r\n\r\n 返回:\r\n 无\r\n \"\"\"\r\n # 标记初始化过程的开始\r\n INFO(\"'----------' 正在初始化浏览器 '----------'\")\r\n\r\n # 创建Chrome选项实例,用于配置浏览器行为\r\n options = webdriver.ChromeOptions()\r\n # 添加实验性选项,排除某些命令行开关以减少输出日志\r\n options.add_experimental_option('excludeSwitches', ['enable-Logging'])\r\n # 忽略证书错误,允许在本地主机上运行时不安全\r\n options.add_argument('--ignore-certificate-errors')\r\n # 禁用自动化控制特征检测,避免被网站识别为自动化流量\r\n options.add_argument('--disable-blink-features=AutomationControlled')\r\n # 允许不安全的本地主机运行,通常用于开发和测试环境\r\n options.add_argument('--allow-insecure-localhost')\r\n\r\n # 使用webdriver_manager自动下载并管理chromedriver\r\n # service = ChromeService(ChromeDriverManager().install())\r\n # 使用备用的ChromeDriver下载源\r\n # service = Service(ChromeDriverManager().install())\r\n # 手动指定ChromeDriver的路径\r\n # 自动化运行服务器的chromedriver路径:\r\n service = Service(r'C:\\Users\\29194\\AppData\\Local\\Programs\\Python\\Python310\\Scripts\\chromedriver.exe')\r\n # service = Service(r'C:\\Program Files\\Python310\\Scripts\\chromedriver.exe')\r\n # 尝试创建WebDriver实例并执行初始化操作\r\n try:\r\n # 创建WebDriver实例\r\n wd = webdriver.Chrome(service=service, options=options)\r\n # 设置隐式等待时间为10秒,以允许元素加载\r\n wd.implicitly_wait(60)\r\n\r\n # 获取登录URL\r\n login_url = get_login_url_from_config(login_type)\r\n # 打开对应类型的登录页面\r\n wd.get(login_url)\r\n # 最大化浏览器窗口\r\n wd.maximize_window()\r\n\r\n # 将WebDriver实例存储在全局存储器中,以便后续使用\r\n GSTORE['wd'] = wd\r\n # 标记初始化过程完成\r\n INFO(\"'----------' 浏览器初始化完成 '----------'\")\r\n except Exception as e:\r\n # 捕获并记录初始化过程中的任何异常\r\n logging.error(f\"浏览器初始化失败:{e}\")\r\n\r\n\r\ndef get_login_url_from_config(login_type):\r\n \"\"\"\r\n 从配置文件中读取登录URL。\r\n\r\n 参数:\r\n login_type (str): 指定登录类型,根据不同的登录类型选择不同的URL。\r\n\r\n 返回:\r\n str: 对应的登录URL。\r\n \"\"\"\r\n # 检查 login_type 是否为空或 None\r\n if not login_type:\r\n raise ValueError(\"login_type 不能为空\")\r\n\r\n # 获取当前脚本的绝对路径\r\n current_dir = os.path.dirname(os.path.abspath(__file__))\r\n # 构建配置文件的绝对路径,指向 ubains-module-test 目录下的 config.json\r\n config_path = os.path.abspath(os.path.join(current_dir, '..', '..', 'config.json'))\r\n # 规范化路径,防止路径遍历攻击\r\n config_path = os.path.normpath(config_path)\r\n\r\n # 记录配置文件路径以便调试,对路径进行脱敏处理\r\n logging.info(f\"配置文件路径: {os.path.basename(config_path)}\")\r\n\r\n # 检查文件是否存在\r\n if not os.path.exists(config_path):\r\n # 如果配置文件不存在,则抛出异常\r\n raise FileNotFoundError(f\"配置文件 {config_path} 不存在\")\r\n\r\n try:\r\n # 读取配置文件\r\n with open(config_path, 'r', encoding='utf-8') as config_file:\r\n # 将配置文件内容解析为 JSON 格式\r\n config = json.load(config_file)\r\n # 根据 login_type 获取对应的登录 URL\r\n login_url = config.get(login_type)\r\n # 记录正在打开的登录页面类型和 URL\r\n logging.info(f\"正在打开 {login_type} 的登录页面:{login_url}\")\r\n except IOError as e:\r\n # 处理文件读取异常\r\n raise IOError(f\"读取配置文件失败: {e}\")\r\n except json.JSONDecodeError as e:\r\n # 处理 JSON 解析异常\r\n raise json.JSONDecodeError(f\"解析配置文件失败: {e}\")\r\n\r\n # 检查是否成功获取到 URL\r\n if not login_url:\r\n # 如果未找到对应的 URL,则抛出异常\r\n raise ValueError(f\"未找到对应的 URL 配置项: {login_type}\")\r\n\r\n # 返回登录 URL\r\n return login_url\r\n\r\n\r\ndef admin_login(username, password):\r\n \"\"\"\r\n 管理员登录函数。\r\n 该函数通过模拟用户输入用户名、密码和验证码,并点击登录按钮,以实现管理员登录。\r\n \"\"\"\r\n # 获取webdriver实例\r\n wd = GSTORE['wd']\r\n\r\n # 打印用户名输入信息\r\n INFO(f\"输入用户名:{username}\")\r\n # 向用户名输入框发送用户名\r\n safe_send_keys((By.XPATH, \"//input[@placeholder='请输入账号或手机号或邮箱号']\"), f'{username}', wd)\r\n\r\n # 打印密码输入信息\r\n INFO(f\"输入密码:{password}\")\r\n # 向密码输入框发送密码\r\n safe_send_keys((By.XPATH, \"//input[@placeholder='请输入密码']\"), f\"{password}\", wd)\r\n\r\n # 打印验证码输入信息\r\n INFO(\"输入验证码:csba\")\r\n # 向验证码输入框发送验证码\r\n safe_send_keys((By.XPATH, \"//input[@placeholder='请输入图形验证码']\"), \"csba\", wd)\r\n # 隐式等待5秒,以确保验证码被正确处理\r\n wd.implicitly_wait(5)\r\n\r\n # 打印登录按钮点击信息\r\n INFO(\"点击登录按钮\")\r\n # 点击登录按钮\r\n safe_click((By.XPATH, \"//input[@value='登 录']\"), wd)\r\n\r\ndef enter_the_backend():\r\n \"\"\"\r\n 进入后台系统界面。\r\n 该函数通过模拟点击操作,找到并点击后台系统入口,以进入后台界面。\r\n \"\"\"\r\n # 记录进入后台系统的操作信息\r\n INFO(\"进入后台\")\r\n\r\n # 获取webdriver对象,用于后续的页面元素操作\r\n wd = GSTORE['wd']\r\n\r\n # 执行点击操作,通过XPath定位到后台系统入口图标并点击\r\n safe_click((By.XPATH, \"//img[@title='后台系统']\"), wd)\r\n\r\ndef safe_send_keys(element_locator, value, wd):\r\n \"\"\"\r\n 安全地向网页元素发送键值。\r\n 该函数尝试在指定时间内找到指定的网页元素,如果找到并且元素可见,将先清除元素内容,然后发送指定的键值。\r\n 如果在指定时间内未能找到元素或元素不可点击,则捕获相应的异常并打印错误信息。\r\n 参数:\r\n element_locator (tuple): 用于定位网页元素的策略和定位器的元组,例如(By.ID, 'element_id')。\r\n value (str): 要发送到元素的键值字符串。\r\n wd: WebDriver实例,用于与浏览器交互。\r\n 异常处理:\r\n - TimeoutException: 如果元素在指定时间内未被找到或不可点击。\r\n - NoSuchElementException: 如果元素不存在。\r\n - ElementNotInteractableException: 如果元素存在但不可交互。\r\n \"\"\"\r\n try:\r\n # 等待元素在指定时间内可见\r\n element = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator))\r\n element.clear() # 清除元素的当前值\r\n element.send_keys(value) # 向元素发送指定的键值\r\n except TimeoutException:\r\n # 如果元素在指定时间内未被找到或不可点击,打印超时异常信息\r\n INFO(f\"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.\")\r\n except NoSuchElementException:\r\n # 如果元素不存在,打印相应异常信息\r\n INFO(f\"NoSuchElementException: Element {element_locator} not found.\")\r\n except ElementNotInteractableException:\r\n # 如果元素不可交互,打印相应异常信息\r\n INFO(f\"ElementNotInteractableException: Element {element_locator} is not interactable.\")\r\n\r\ndef safe_click(element_locator, wd):\r\n \"\"\"\r\n 对其定位器指定的元素执行安全单击。\r\n 此函数尝试以处理潜在异常的方式单击元素。\r\n 它等待元素可见并可单击,然后再尝试单击它。\r\n 如果该元素在20秒内无法点击,或者它不存在,\r\n 或者不可交互,它会捕获相应的异常并记录一条信息性消息。\r\n 参数:\r\n -element_locator:要单击的元素的定位器,指定为元组。\r\n -wd:用于查找元素并与之交互的WebDriver实例。\r\n \"\"\"\r\n try:\r\n # Wait up to 20 seconds for the element to be visible\r\n element = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator))\r\n # Attempt to click the element\r\n element.click()\r\n except TimeoutException:\r\n # Log a message if the element is not found or not clickable within 20 seconds\r\n INFO(f\"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.\")\r\n except NoSuchElementException:\r\n # Log a message if the element is not found\r\n INFO(f\"NoSuchElementException: Element {element_locator} not found.\")\r\n except ElementNotInteractableException:\r\n # Log a message if the element is not interactable\r\n INFO(f\"ElementNotInteractableException: Element {element_locator} is not interactable.\")\r\n\r\nfrom time import sleep\r\nfrom selenium.webdriver.common.by import By\r\n\r\ndef issue_send_and_upload(wd, issue_num, issue_name):\r\n \"\"\"\r\n 输入议题名称以及上传议题文件。\r\n \"\"\"\r\n\r\n issue_file_path = [\r\n r\"D:\\GithubData\\自动化\\ubains-module-test\\预定系统\\reports\\issue_file\\5.164Scan 安全报告.pdf\",\r\n r\"D:\\GithubData\\自动化\\ubains-module-test\\预定系统\\reports\\issue_file\\IdeaTop软件配置&操作说明文档.docx\",\r\n r\"D:\\GithubData\\自动化\\ubains-module-test\\预定系统\\reports\\issue_file\\ideaTop部署配置视频.mp4\",\r\n r\"D:\\GithubData\\自动化\\ubains-module-test\\预定系统\\reports\\issue_file\\IdeaTop软件配置&操作说明文档.docx\",\r\n r\"D:\\GithubData\\自动化\\ubains-module-test\\预定系统\\reports\\issue_file\\议题图片.png\"\r\n ]\r\n\r\n INFO(f\"输入议题名称:{issue_name}\")\r\n # 输入议题名称\r\n safe_send_keys((By.XPATH, f\"(//input[@placeholder='请输入会议议题'])[1]\"), f\"{issue_name}\", wd)\r\n\r\n # 选择议题文件进行上传\r\n INFO(\"点击【上传文件】按钮\")\r\n safe_click((By.XPATH, f\"(//div[@class='topicsHandleButton uploadFile'][contains(text(),'上传文件(0)')])[1]\"), wd)\r\n sleep(2)\r\n\r\n for i in range(issue_num):\r\n if not os.path.exists(issue_file_path[i]):\r\n INFO(f\"文件 {issue_file_path[i]} 不存在,跳出函数\")\r\n return\r\n\r\n # INFO(\"定位【选择文件】按钮\")\r\n upload_button = wd.find_element(By.XPATH, '//*[@id=\"global-uploader-btn\"]/input')\r\n sleep(2)\r\n # INFO(f\"元素定位:{upload_button}\")\r\n # INFO(\"选择议题文件上传\")\r\n upload_button.send_keys(issue_file_path[i])\r\n # INFO(f\"第{i+1}个议题文件上传完成\")\r\n sleep(15)\r\n\r\n SELENIUM_LOG_SCREEN(wd, \"50%\", \"Exhibit_Inspect\", \"Meeting_Message\", \"添加议题文件\")\r\n # 点击【确定】按钮\r\n safe_click((By.XPATH, \"//div[@aria-label='会议文件上传']//div[@class='el-dialog__footer']//div//span[contains(text(),'确定')]\"), wd)\r\n sleep(2)\r\n\r\ndef input_clear(element_locator, wd):\r\n \"\"\"\r\n 清空输入框中的文本。\r\n\r\n 该函数通过元素定位器找到指定的输入框,并尝试清空其内容。它使用显式等待来确保元素在尝试清除之前是可见的。\r\n\r\n 参数:\r\n - element_locator: 用于定位输入框的元素定位器,通常是一个元组,包含定位方法和定位表达式。\r\n - wd: WebDriver实例,用于与浏览器交互。\r\n\r\n 异常处理:\r\n - TimeoutException: 如果在指定时间内元素不可见,则捕获此异常并打印超时异常消息。\r\n - NoSuchElementException: 如果找不到指定的元素,则捕获此异常并打印未找到元素的消息。\r\n - ElementNotInteractableException: 如果元素不可操作(例如,元素不可见或不可点击),则捕获此异常并打印相应消息。\r\n \"\"\"\r\n try:\r\n # 等待元素可见,并在可见后清空输入框。\r\n input_element = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator))\r\n input_element.clear()\r\n except TimeoutException:\r\n # 如果元素在20秒内不可见,打印超时异常消息。\r\n print(f\"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.\")\r\n except NoSuchElementException:\r\n # 如果找不到元素,打印未找到元素的消息。\r\n print(f\"NoSuchElementException: Element {element_locator} not found.\")\r\n except ElementNotInteractableException:\r\n # 如果元素不可操作,打印元素不可操作的消息。\r\n print(f\"ElementNotInteractableException: Element {element_locator} is not interactable.\")\r\n\r\ndef send_keyboard(element_locator, wd):\r\n \"\"\"\r\n 向指定元素发送键盘事件。\r\n\r\n 该函数尝试找到页面上的一个元素,并向其发送RETURN键点击事件。它使用显式等待来确保元素在尝试交互之前是可见的。\r\n 如果在指定时间内元素不可见、不存在或不可交互,则捕获相应的异常并打印错误消息。\r\n\r\n 参数:\r\n - element_locator: 用于定位元素的元组,格式为(by, value)。例如,(By.ID, 'element_id')。\r\n - wd: WebDriver实例,用于与浏览器进行交互。\r\n\r\n 异常处理:\r\n - TimeoutException: 如果元素在20秒内不可见,则捕获此异常并打印超时错误消息。\r\n - NoSuchElementException: 如果找不到指定的元素,则捕获此异常并打印未找到元素的错误消息。\r\n - ElementNotInteractableException: 如果元素不可交互(例如,被遮挡或不可点击),则捕获此异常并打印相应错误消息。\r\n \"\"\"\r\n try:\r\n # 等待元素可见,并在可见后向其发送RETURN键点击事件。\r\n element = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator))\r\n element.send_keys(Keys.RETURN)\r\n except TimeoutException:\r\n # 如果元素在指定时间内不可见,打印超时错误消息。\r\n print(f\"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.\")\r\n except NoSuchElementException:\r\n # 如果找不到指定的元素,打印未找到元素的错误消息。\r\n print(f\"NoSuchElementException: Element {element_locator} not found.\")\r\n except ElementNotInteractableException:\r\n # 如果元素不可交互,打印不可交互错误消息。\r\n print(f\"ElementNotInteractableException: Element {element_locator} is not interactable.\")\r\n\r\ndef get_notify_text(wd,element_locator,module_name,function_name,name):\r\n \"\"\"\r\n 获取通知文本信息。\r\n\r\n 该函数通过WebDriver等待特定的元素出现,并截取其文本内容作为通知信息。此外,它还负责在获取通知文本后进行屏幕截图,\r\n 以便于后续的调试或记录需要。\r\n\r\n 参数:\r\n wd (WebDriver): 由上层传入的WebDriver对象,用于操作浏览器。\r\n\r\n 返回:\r\n str: 提取的通知文本信息。如果未能提取到信息或发生异常,则返回None。\r\n \"\"\"\r\n try:\r\n # 获取提示信息\r\n notify_text = WebDriverWait(wd, 60).until(\r\n EC.presence_of_element_located(element_locator)\r\n ).text\r\n # 屏幕截图\r\n SELENIUM_LOG_SCREEN(wd,\"50%\",module_name,function_name,name)\r\n return notify_text\r\n except Exception as e:\r\n # 记录异常信息\r\n INFO(f\"Exception occurred: {e}\")\r\n\r\ndef elment_get_text(element_locator, wd):\r\n \"\"\"\r\n 获取页面元素的文本。\r\n\r\n 该函数通过显式等待的方式,确保页面元素在20秒内可见并获取其文本。\r\n 如果在规定时间内元素不可见、不存在或不可交互,则会捕获相应的异常并打印错误信息。\r\n\r\n 参数:\r\n - element_locator: 用于定位页面元素的元组,通常包含定位方式和定位值。\r\n - wd: WebDriver对象,用于与浏览器进行交互。\r\n\r\n 返回:\r\n - element_text: 页面元素的文本。如果发生异常,则返回None。\r\n \"\"\"\r\n try:\r\n # 使用WebDriverWait等待页面元素在20秒内可见,并获取其文本。\r\n element_text = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator)).text\r\n return element_text\r\n except TimeoutException:\r\n # 如果超过20秒元素仍未可见,则捕获TimeoutException异常并打印错误信息。\r\n print(f\"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.\")\r\n except NoSuchElementException:\r\n # 如果页面上不存在该元素,则捕获NoSuchElementException异常并打印错误信息。\r\n print(f\"NoSuchElementException: Element {element_locator} not found.\")\r\n except ElementNotInteractableException:\r\n # 如果元素存在但不可交互(例如被遮挡),则捕获ElementNotInteractableException异常并打印错误信息。\r\n print(f\"ElementNotInteractableException: Element {element_locator} is not interactable.\")\r\n\r\ndef read_csv_data(csv_file_path):\r\n \"\"\"\r\n 读取CSV文件中的数据,并将其转换为一个包含字典的列表,每个字典代表一行测试用例数据。\r\n\r\n 参数:\r\n csv_file_path (str): CSV文件的路径。\r\n\r\n 返回:\r\n list: 包含字典的列表,每个字典包含测试用例的名称和参数。\r\n \"\"\"\r\n # 打开CSV文件,使用只读模式,确保兼容性并处理编码\r\n with open(csv_file_path, mode='r', newline='', encoding='utf-8') as file:\r\n # 创建CSV阅读器\r\n reader = csv.reader(file)\r\n # 读取表头,为后续数据解析做准备\r\n headers = next(reader)\r\n ddt_cases = []\r\n # 遍历CSV文件中的每一行数据\r\n for row in reader:\r\n # 将每一行数据转换为字典,其中包含测试用例的名称和参数\r\n case = {\r\n 'name': row[0],\r\n 'para': row[1:]\r\n }\r\n # 将转换后的测试用例数据添加到列表中\r\n ddt_cases.append(case)\r\n # 日志记录:CSV文件已读取\r\n INFO(\"CSV文件已读取\")\r\n # 返回包含所有测试用例数据的列表\r\n return ddt_cases\r\n\r\nimport openpyxl\r\n\r\ndef read_xlsx_data(xlsx_file_path):\r\n \"\"\"\r\n 读取XLSX文件中的数据,并将其转换为一个包含字典的列表,每个字典代表一行测试用例数据。\r\n\r\n 参数:\r\n xlsx_file_path (str): XLSX文件的路径。\r\n\r\n 返回:\r\n list: 包含字典的列表,每个字典包含测试用例的名称和参数。\r\n \"\"\"\r\n # 打开XLSX文件\r\n workbook = openpyxl.load_workbook(xlsx_file_path)\r\n # 假设数据在第一个工作表中\r\n sheet = workbook.active\r\n\r\n # 读取表头,从第三行开始\r\n headers = [cell.value for cell in sheet[3]]\r\n\r\n # 打印表头列名\r\n # print(f\"表头列名: {headers}\")\r\n\r\n # 找到表头中名为 'JSON' 的列索引\r\n try:\r\n json_index = headers.index('JSON')\r\n except ValueError as e:\r\n raise ValueError(f\"表头中没有找到所需的列: {e}\")\r\n\r\n ddt_cases = []\r\n # 遍历XLSX文件中的每一行数据,从第四行开始\r\n for row in sheet.iter_rows(min_row=4, values_only=True):\r\n # 获取 JSON 列的数据\r\n json_data = row[json_index]\r\n\r\n # 打印 JSON 数据以进行调试\r\n # print(f\"JSON 数据: {json_data}\")\r\n\r\n # 解析 JSON 字符串\r\n try:\r\n if json_data:\r\n parsed_json = json.loads(json_data)\r\n else:\r\n raise ValueError(\"JSON 数据为空\")\r\n except json.JSONDecodeError:\r\n raise ValueError(f\"无法解析 JSON 数据: {json_data}\")\r\n\r\n # 将解析后的 JSON 数据添加到列表中\r\n ddt_cases.append(parsed_json)\r\n\r\n # 日志记录:XLSX文件已读取\r\n INFO(\"XLSX文件已读取\")\r\n # 返回包含所有测试用例数据的列表\r\n return ddt_cases\r\n\r\ndef get_cpu_usage(interval=1):\r\n \"\"\"\r\n 获取当前进程的 CPU 占用率。\r\n :param interval: 计算 CPU 使用率的时间间隔(秒)\r\n :return: 当前进程的 CPU 占用率(百分比)\r\n \"\"\"\r\n try:\r\n process = psutil.Process(os.getpid())\r\n cpu_usage = process.cpu_percent(interval=interval)\r\n if isinstance(cpu_usage, (int, float)):\r\n return cpu_usage\r\n else:\r\n logging.error(\"CPU 使用率数据类型不正确\")\r\n return None\r\n except psutil.NoSuchProcess:\r\n logging.error(\"进程不存在\")\r\n return None\r\n except psutil.AccessDenied:\r\n logging.error(\"权限不足\")\r\n return None\r\n except Exception as e:\r\n logging.error(f\"未知错误: {e}\")\r\n return None\r\n\r\ndef delete_images_in_directory(directory):\r\n # 指定要删除的图片文件扩展名\r\n image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff']\r\n\r\n # 遍历目录中的所有文件\r\n for filename in os.listdir(directory):\r\n # 获取文件的完整路径\r\n file_path = os.path.join(directory, filename)\r\n\r\n # 检查文件是否为图片文件\r\n if any(filename.lower().endswith(ext) for ext in image_extensions):\r\n try:\r\n # 删除文件\r\n os.remove(file_path)\r\n print(f\"已删除文件: {file_path}\")\r\n except Exception as e:\r\n print(f\"删除文件 {file_path} 时出错: {e}\")\r\n\r\ndef is_valid_password(password):\r\n try:\r\n # 基本类型检查\r\n if not isinstance(password, str):\r\n raise ValueError(\"Password must be a string\")\r\n\r\n # 检查长度,密码至少需要11个字符\r\n if len(password) < 11:\r\n return False\r\n\r\n # 使用正则表达式检查密码是否包含大小写字母和数字\r\n if not re.match(r'^(?=.*[a-z])(?=.*[A-Z])(?=.*\\d).{11,}$', password):\r\n return False\r\n\r\n # 检查连续3位及以上不重复且不连续组合\r\n for i in range(len(password) - 2):\r\n # 检查是否有连续3位相同\r\n if password[i] == password[i + 1] == password[i + 2]:\r\n return False\r\n\r\n # 检查是否有连续3位是连续字符(如123, abc)\r\n if abs(ord(password[i + 1]) - ord(password[i])) == 1 and abs(ord(password[i + 2]) - ord(password[i + 1])) == 1:\r\n return False\r\n\r\n return True\r\n except Exception as e:\r\n logging.error(f\"An error occurred: {e}\")\r\n return False\r\n\r\ndef browser_quit():\r\n \"\"\"\r\n 退出浏览器并释放资源。\r\n\r\n 该函数从全局存储中获取浏览器驱动实例,并调用其quit方法来关闭浏览器并释放相关资源。\r\n \"\"\"\r\n # 清除浏览器\r\n INFO(\"清除浏览器\")\r\n # 从全局存储中获取浏览器驱动实例\r\n wd = GSTORE['wd']\r\n # 调用浏览器驱动实例的quit方法,关闭浏览器并释放资源\r\n wd.quit()\r\n\r\n\r\ndef get_latest_report_file(report_dir, base_url):\r\n \"\"\"\r\n 获取指定目录下最新的HTML报告文件,并返回带有基础URL的完整路径。\r\n\r\n :param report_dir: 报告文件所在的目录\r\n :param base_url: 基础URL\r\n :return: 最新的HTML报告文件的完整URL,如果没有找到则返回None\r\n \"\"\"\r\n # 记录调用此函数的日志\r\n logging.info(\"开始调用get_latest_report_file函数获取报告文件\")\r\n\r\n # 确保报告目录存在\r\n if not os.path.exists(report_dir):\r\n logging.error(f\"报告目录 {report_dir} 不存在。\")\r\n return None\r\n\r\n # 获取指定目录下所有符合模式的HTML报告文件\r\n report_files = glob.glob(os.path.join(report_dir, 'report_*.html'))\r\n\r\n # 打印找到的文件列表\r\n logging.debug(f\"找到的报告文件: {report_files}\")\r\n\r\n # 如果没有找到报告文件,记录警告信息并返回None\r\n if not report_files:\r\n logging.warning(\"在指定目录中没有找到报告文件。\")\r\n return None\r\n\r\n # 找到最新修改的报告文件\r\n latest_file = max(report_files, key=os.path.getmtime)\r\n\r\n # 获取最新报告文件的最后修改时间\r\n last_modified_time = datetime.fromtimestamp(os.path.getmtime(latest_file)).strftime('%Y-%m-%d %H:%M:%S')\r\n\r\n # 记录最新报告文件的信息\r\n logging.info(f\"最新报告文件: {latest_file}, 最后修改时间: {last_modified_time}\")\r\n\r\n # 将文件路径转换为相对于基础URL的相对路径\r\n relative_path = os.path.relpath(latest_file, report_dir)\r\n\r\n # 生成完整的URL\r\n full_url = f\"{base_url}/{relative_path}\".replace(\"\\\\\", \"/\")\r\n\r\n # 返回完整的URL\r\n return full_url\r\n\r\ndef dingding_send_message(latest_report, title, mobile, ding_type):\r\n \"\"\"\r\n 发送钉钉机器人消息\r\n 参考接口文档:https://open.dingtalk.com/document/orgapp/custom-robots-send-group-messages#title-7fs-kgs-36x\r\n\r\n :param latest_report: 测试报告链接\r\n :param title: 消息标题\r\n :param text: 消息内容\r\n :param mobile: 需要@的手机号列表\r\n \"\"\"\r\n # 记录调用此函数的日志\r\n logging.info(\"开始构建并发送钉钉机器人消息\")\r\n # 钉钉机器人的 Webhook URL 和密钥(正式环境)\r\n # webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=b0eea0bbf097ce3badb4c832d2cd0267a50486f395ec8beca6e2042102bb295b'\r\n # secret = 'SEC928b11659c5fd6476cfa2042edbf56da876abf759289f7e4d3c671fb9a81bf43'\r\n # 钉钉机器人的 Webhook URL 和密钥(测试环境)\r\n\r\n if ding_type == '标准版巡检':\r\n webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=7fbf40798cad98b1b5db55ff844ba376b1816e80c5777e6f47ae1d9165dacbb4'\r\n secret = 'SEC610498ed6261ae2df1d071d0880aaa70abf5e67efe47f75a809c1f2314e0dbd6'\r\n elif ding_type == '展厅巡检':\r\n webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=061b6e9b1ae436f356cfda7fe19b6e58e46b62670046a78bd3a4d869118c612d'\r\n secret = 'SEC93212bd880aad638cc0df2b28a72ef4fdf6651cacb8a6a4bc71dcf09705d458d'\r\n\r\n # 生成时间戳\r\n timestamp = str(round(time.time() * 1000))\r\n\r\n # 生成签名\r\n secret_enc = secret.encode('utf-8')\r\n string_to_sign = f'{timestamp}\\n{secret}'\r\n string_to_sign_enc = string_to_sign.encode('utf-8')\r\n hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()\r\n sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))\r\n\r\n # 构建最终的 Webhook URL\r\n params = {\r\n 'access_token': webhook_url.split('=')[1],\r\n 'timestamp': timestamp,\r\n 'sign': sign\r\n }\r\n encoded_params = urllib.parse.urlencode(params)\r\n final_webhook_url = f'https://oapi.dingtalk.com/robot/send?{encoded_params}'\r\n\r\n # 记录最终的 Webhook URL\r\n logging.info(f\"钉钉机器人Webhook URL: {final_webhook_url}\")\r\n\r\n # 调用测试结果获取函数\r\n browser_init(\"标准版预定系统\")\r\n wd = GSTORE['wd']\r\n # print(latest_report)\r\n test_result = get_test_result(latest_report, wd)\r\n browser_quit()\r\n\r\n # 构建消息体\r\n headers = {'Content-Type': 'application/json'}\r\n message = {\r\n 'msgtype': 'link',\r\n 'link': {\r\n 'title': title,\r\n 'messageUrl': latest_report,\r\n 'text': f\"通过:{test_result['pass_percent']}\" + f\"失败:{test_result['fail_percent']}\" + f\"异常:{test_result['exception_percent']}\",\r\n },\r\n \"at\": {\r\n \"atMobiles\": [mobile],\r\n \"isAtAll\": True\r\n }\r\n }\r\n\r\n try:\r\n # 发送 POST 请求\r\n response = requests.post(final_webhook_url, data=json.dumps(message), headers=headers)\r\n\r\n # 检查响应状态码\r\n if response.status_code == 200:\r\n logging.info('消息发送成功!')\r\n logging.info(f'响应内容: {response.text}')\r\n else:\r\n logging.error(f'消息发送失败,状态码: {response.status_code}')\r\n logging.error(f'响应内容: {response.text}')\r\n except requests.exceptions.RequestException as e:\r\n logging.error(f'请求异常: {e}')\r\n\r\n\r\ndef run_automation_test(report_title, report_url_prefix, test_case , ding_type):\r\n \"\"\"\r\n 运行自动化测试并生成报告。\r\n\r\n 参数:\r\n - report_title: 报告的标题\r\n - report_url_prefix: 报告URL的前缀\r\n - test_case: 测试用例脚本执行的标签名\r\n \"\"\"\r\n # 记录测试开始的日志\r\n logging.info(\"开始自动化测试...\")\r\n\r\n # 构建运行测试命令\r\n command = [\r\n 'hytest',\r\n '--report_title', report_title,\r\n '--report_url_prefix', report_url_prefix,\r\n '--tag', test_case\r\n ]\r\n\r\n # 记录将要执行的命令日志\r\n logging.info(f\"执行命令: {' '.join(command)}\")\r\n\r\n try:\r\n # 执行测试命令并获取结果\r\n result = subprocess.run(command, capture_output=True, text=True, check=True)\r\n\r\n # 记录命令的标准输出和错误输出\r\n logging.debug(f\"命令标准输出: {result.stdout}\")\r\n logging.debug(f\"命令错误输出: {result.stderr}\")\r\n except subprocess.CalledProcessError as e:\r\n # 处理子进程调用失败的异常\r\n logging.error(f\"命令执行失败,返回码 {e.returncode}: {e.output}\")\r\n except OSError as e:\r\n # 处理操作系统相关的异常\r\n logging.error(f\"发生操作系统错误: {e}\")\r\n finally:\r\n # 无论测试是否成功,都记录测试结束的日志\r\n logging.info(\"自动化测试完成。\")\r\n\r\n # 调用回调函数处理后续操作\r\n get_reportfile_send_dingding(f\"{report_title}\", report_url_prefix, ding_type)\r\n\r\ndef get_reportfile_send_dingding(report_title, report_url_prefix, ding_type):\r\n # print(GSTORE['case_pass'])\r\n try:\r\n # 获取报告文件所在的目录\r\n report_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..','reports')\r\n\r\n # 获取基础URLs\r\n base_url = report_url_prefix\r\n\r\n # 获取最新的报告文件\r\n latest_report = get_latest_report_file(report_dir, base_url)\r\n\r\n # 如果找到了最新的报告文件,则发送报告链接到钉钉\r\n if latest_report:\r\n logging.info(f\"最新报告文件URL: {latest_report}\")\r\n\r\n try:\r\n # 记录调用钉钉消息通知函数的日志\r\n logging.info(\"开始调用钉钉消息通知函数\")\r\n\r\n # 调用钉钉发送消息接口进行推送测试报告链接\r\n dingding_send_message(latest_report, report_title, \"13724387318\", ding_type)\r\n\r\n # 记录钉钉消息通知函数调用成功的日志\r\n logging.info(\"钉钉消息通知函数调用成功\")\r\n except Exception as e:\r\n # 记录钉钉消息通知函数调用失败的日志\r\n logging.error(f\"钉钉消息通知函数调用失败: {e}\")\r\n else:\r\n # 记录没有找到报告文件的日志\r\n logging.warning(\"没有找到报告文件以发送。\")\r\n\r\n except subprocess.CalledProcessError as e:\r\n # 处理子进程调用失败的异常\r\n logging.error(f\"命令执行失败,返回码 {e.returncode}: {e.output}\")\r\n except OSError as e:\r\n # 处理操作系统相关的异常\r\n logging.error(f\"发生操作系统错误: {e}\")\r\n finally:\r\n # 无论是否成功,都记录测试结束的日志\r\n logging.info(\"自动化测试完成。\")\r\n\r\n# if __name__ == '__main__':\r\n# get_reportfile_send_dingding(\"测试\",\"http://192.168.1.166\")\r\n\r\nfrom selenium.webdriver.common.action_chains import ActionChains\r\nfrom selenium.webdriver.support.ui import WebDriverWait\r\nfrom selenium.webdriver.support import expected_conditions as EC\r\nfrom selenium.common.exceptions import TimeoutException, NoSuchElementException\r\nimport logging\r\n\r\ndef single_click_and_drag(source_element_locator, target_element_locator, wd):\r\n \"\"\"\r\n 实现元素从source_element单击后拖拽到target_element的功能\r\n\r\n :param wd: WebDriver实例\r\n :param source_element_locator: 拖拽起始元素的定位器\r\n :param target_element_locator: 拖拽目标元素的定位器\r\n \"\"\"\r\n try:\r\n # 等待页面完全加载\r\n sleep(3)\r\n\r\n # 找到源元素和目标元素\r\n source_element = WebDriverWait(wd, 120).until(EC.element_to_be_clickable(source_element_locator))\r\n target_element = WebDriverWait(wd, 120).until(EC.element_to_be_clickable(target_element_locator))\r\n\r\n # 使用ActionChains执行单击并拖拽操作\r\n actions = ActionChains(wd)\r\n actions.click_and_hold(source_element) # 单击并按住源元素\r\n actions.move_to_element(target_element) # 移动到目标元素\r\n actions.release() # 释放鼠标\r\n actions.perform()\r\n\r\n logging.info(\"单击并拖拽操作成功\")\r\n except TimeoutException as e:\r\n logging.error(f\"元素查找超时: {e}\")\r\n except NoSuchElementException as e:\r\n logging.error(f\"元素未找到: {e}\")\r\n except Exception as e:\r\n logging.error(f\"发生未知错误: {e}\")\r\n\r\nimport requests\r\nimport os\r\nimport chardet\r\nfrom urllib3.exceptions import InsecureRequestWarning\r\n\r\n# 禁用 InsecureRequestWarning 警告\r\nrequests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)\r\n\r\ndef fetch_and_parse_check_txt(url, save_path, extract_info):\r\n \"\"\"\r\n 获取check.txt文件并解析指定信息\r\n\r\n :param url: check.txt文件的URL\r\n :param save_path: 文件保存路径\r\n :param extract_info: 需要提取的信息列表,例如 ['[m]ysql', '[r]edis', '[f]dfs_storaged', '[f]dfs_tracker', '[e]mqx', 'ubains-meeting-api-1.0-SNAPSHOT.jar', 'ubains-meeting-inner-api-1.0-SNAPSHOT.jar', 'uwsgi']\r\n :return: 提取的信息字典\r\n \"\"\"\r\n try:\r\n # 发送HTTPS请求获取文件内容\r\n response = requests.get(url, verify=False) # verify=False 忽略SSL证书验证,生产环境不推荐\r\n response.raise_for_status() # 如果响应状态码不是200,抛出异常\r\n\r\n # 检测文件编码\r\n detected_encoding = chardet.detect(response.content)['encoding']\r\n logging.info(f\"检测到的编码: {detected_encoding}\")\r\n\r\n # 如果检测到的编码为空或不准确,可以手动指定编码\r\n if not detected_encoding or detected_encoding == 'ascii':\r\n detected_encoding = 'utf-8' # 假设文件编码为 utf-8\r\n\r\n # 将响应内容解码为字符串\r\n content = response.content.decode(detected_encoding)\r\n\r\n # 将文件内容保存到指定目录\r\n with open(save_path, 'w', encoding='utf-8') as file:\r\n file.write(content)\r\n\r\n # 解析文件内容\r\n parsed_info = {}\r\n for line in content.split('\\n'):\r\n for info in extract_info:\r\n if info in line and info not in parsed_info:\r\n service_name = info\r\n service_status = line.split(info, 1)[1].strip()\r\n parsed_info[service_name] = service_status\r\n break # 找到后跳出内层循环,继续处理下一行\r\n\r\n return parsed_info\r\n\r\n except requests.exceptions.RequestException as e:\r\n logging.exception(f\"请求错误: {e}\")\r\n return None\r\n\r\nimport telnetlib\r\n\r\ndef check_service_status(host, port):\r\n \"\"\"\r\n 检查服务状态。\r\n\r\n 通过尝试连接到指定的主机和端口来检查服务是否可用。\r\n\r\n 参数:\r\n - host: 服务所在的主机地址。\r\n - port: 服务监听的端口。\r\n\r\n 返回值:\r\n 无\r\n \"\"\"\r\n try:\r\n # 创建Telnet对象并连接到服务器\r\n tn = telnetlib.Telnet(host, port)\r\n INFO(f\"成功连接到 {host}:{port}\")\r\n\r\n # 可以在这里发送命令或读取响应\r\n # 例如,读取服务器的欢迎信息\r\n response = tn.read_until(b\"\\n\", timeout=5)\r\n INFO(f\"服务器响应: {response.decode('ascii')}\")\r\n\r\n # 关闭连接\r\n tn.close()\r\n except Exception as e:\r\n INFO(f\"连接失败: {e}\")\r\n\r\n\r\ndef set_tx_meeting_id(element_locator, wd):\r\n tx_meeting_id = elment_get_text(element_locator, wd)\r\n GSTORE['tx_meeting_id'] = tx_meeting_id\r\n\r\ndef get_tx_meeting_id():\r\n return GSTORE.get('tx_meeting_id')\r\n\r\ndef voice_device_register(app_id, app_secret, device_sn):\r\n \"\"\"\r\n 注册语音设备。\r\n\r\n 向指定的API发送POST请求,以注册一个语音设备。需要提供应用的ID和密钥,以及设备的序列号。\r\n\r\n 参数:\r\n app_id (str): 应用的唯一标识符。\r\n app_secret (str): 应用的秘密密钥。\r\n device_sn (str): 设备的序列号。\r\n\r\n 返回:\r\n 无直接返回值,但会记录请求结果到日志。\r\n \"\"\"\r\n # 构建请求体,包含注册所需的必要信息\r\n body = {\r\n \"app_id\": app_id,\r\n \"app_secret\": app_secret,\r\n \"device_sn\": device_sn\r\n }\r\n\r\n # 设置请求头,指定内容类型为JSON\r\n headers = {\r\n \"Content-Type\": \"application/json\"\r\n }\r\n\r\n try:\r\n # 发送POST请求到注册URL,请求体序列化为JSON格式\r\n response = requests.post(\"https://wdev.wmj.com.cn/deviceApi/register\", headers=headers, data=json.dumps(body))\r\n # 检查HTTP响应状态码,确保请求成功\r\n response.raise_for_status()\r\n # 解析响应的JSON内容\r\n response_json = response.json()\r\n # 记录成功的请求日志\r\n logging.info(\"请求成功: %s\", response_json)\r\n except requests.exceptions.RequestException as e:\r\n # 处理请求异常,记录错误日志\r\n logging.error(\"请求失败: %s\", e)\r\n except ValueError as e:\r\n # 处理解析响应异常,记录错误日志\r\n logging.error(\"解析响应失败: %s\", e)\r\n\r\n\r\nimport requests\r\n\r\n\r\ndef cloud_voice_setting(app_id, app_secret, device_sn):\r\n \"\"\"\r\n 设置云语音功能。\r\n\r\n :param app_id: 应用ID\r\n :param app_secret: 应用密钥\r\n :param device_sn: 设备序列号\r\n :return: 服务器响应结果\r\n \"\"\"\r\n url = \"https://wdev.wmj.com.cn/deviceApi/send\"\r\n\r\n # 写死的data参数\r\n data = {\r\n \"cmd_type\": \"setting\",\r\n \"info\": {\r\n \"volume\": 10, # 0-9,音量由小到大,默认为中间值\r\n \"speed\": 2, # 0-9,语速由慢到快,默认为中间值正常语速\r\n \"tone\": 4, # 0-9,语调由低到高,默认为中间值正常语调\r\n \"speaker\": 0 # 0为女生,支持中英文\r\n }\r\n }\r\n\r\n # 构建请求体\r\n payload = {\r\n \"app_id\": app_id,\r\n \"app_secret\": app_secret,\r\n \"device_sn\": device_sn,\r\n \"data\": data\r\n }\r\n\r\n # 发送POST请求\r\n try:\r\n response = requests.post(url, json=payload)\r\n response.raise_for_status() # 如果响应状态码不是200,抛出异常\r\n logging.info(response.json()) # 打印响应的JSON数据\r\n except requests.exceptions.RequestException as e:\r\n logging.error({\"status\": \"error\", \"message\": str(e)})\r\n\r\n# 示例调用\r\n# if __name__ == \"__main__\":\r\n# app_id = os.getenv(\"APP_ID\", \"a98a124c6c3252f6612fc544a0d0fa79\")\r\n# app_secret = os.getenv(\"APP_SECRET\", \"88bc1ec4eba624f47b2200a4ce8c3852\")\r\n# device_sn = os.getenv(\"DEVICE_SN\", \"W703BB44444\")\r\n# cloud_voice_setting(app_id, app_secret, device_sn)\r\n\r\n\r\ndef play_cloud_voice(app_id, app_secret, device_sn):\r\n \"\"\"\r\n 播放云语音功能。\r\n\r\n 本函数通过发送HTTP POST请求,触发远程语音设备播放指定的语音内容。\r\n \"\"\"\r\n # 注册设备\r\n voice_device_register(app_id, app_secret, device_sn)\r\n sleep(5) # 可以考虑使用异步编程或非阻塞的方式替代\r\n\r\n # 设置云喇叭的音量以及语速参数\r\n cloud_voice_setting(app_id, app_secret, device_sn)\r\n sleep(10)\r\n\r\n # 定义请求URL\r\n url = os.getenv(\"CLOUD_VOICE_API_URL\", \"https://wdev.wmj.com.cn/deviceApi/send\")\r\n\r\n # 构建请求体,包括应用ID、应用密钥、设备序列号和语音播放指令\r\n body = {\r\n \"app_id\": app_id,\r\n \"app_secret\": app_secret,\r\n \"device_sn\": device_sn,\r\n \"data\": {\r\n \"cmd_type\": \"play\",\r\n \"info\": {\r\n \"tts\": \"一、二、三、四、五、六、七、八、九、十 一、二、三、四、五、六、七、八、九、十 一、二、三、四、五、六、七、八、九、十、一、二、三、四、五、六、七、八、九、十\",\r\n \"inner\": 10, # wifi版特有\r\n \"volume\": 5 # 4G版本1-7,wifi版1-10\r\n }\r\n }\r\n }\r\n\r\n # 设置请求头,指定内容类型为JSON\r\n headers = {\r\n \"Content-Type\": \"application/json\"\r\n }\r\n\r\n try:\r\n # 发送POST请求\r\n response = requests.post(url, headers=headers, data=json.dumps(body))\r\n\r\n # 根据响应状态码判断请求是否成功\r\n if response.status_code == 200:\r\n logging.info(f\"请求成功: {response.json()}\")\r\n else:\r\n logging.error(f\"请求失败: 状态码 {response.status_code}, 响应内容 {response.text}\")\r\n\r\n except requests.exceptions.RequestException as e:\r\n logging.error(f\"请求过程中发生异常: {e}\")\r\n except json.JSONDecodeError as e:\r\n logging.error(f\"JSON解析失败: {e}\")\r\n except Exception as e:\r\n logging.error(f\"发生未知异常: {e}\")\r\n\r\n# # 示例调用\r\n# if __name__ == \"__main__\":\r\n# app_id = os.getenv(\"APP_ID\", \"a98a124c6c3252f6612fc544a0d0fa79\")\r\n# app_secret = os.getenv(\"APP_SECRET\", \"88bc1ec4eba624f47b2200a4ce8c3852\")\r\n# device_sn = os.getenv(\"DEVICE_SN\", \"W703BB44444\")\r\n# play_cloud_voice(app_id, app_secret, device_sn)\r\n\r\nimport logging\r\nimport re\r\nfrom selenium.webdriver.common.by import By\r\n\r\ndef get_test_result(latest_report, wd):\r\n \"\"\"\r\n 获取测试结果页面的通过率、失败率和异常率\r\n\r\n :param latest_report: 测试结果页面的URL\r\n :param wd: WebDriver实例,用于访问和操作网页\r\n :return: 包含通过率、失败率和异常率的字典\r\n \"\"\"\r\n # 初始化测试结果字典\r\n test_result = {\r\n \"pass_percent\": \"\",\r\n \"fail_percent\": \"\",\r\n \"exception_percent\": \"\"\r\n }\r\n\r\n # 访问测试结果页面\r\n wd.get(latest_report)\r\n sleep(5)\r\n\r\n # 点击简略显示\r\n safe_click((By.XPATH,\"//div[@id='display_mode']\"), wd)\r\n sleep(5)\r\n\r\n # 定义一个函数来获取和解析百分比\r\n def get_percentage(selector, wd):\r\n text = elment_get_text(selector, wd)\r\n logging.info(f\"获取的文本:{text}\")\r\n match = re.search(r'(\\d+(\\.\\d+)?)%', text)\r\n if match:\r\n return match.group(0)\r\n else:\r\n logging.error(f\"未找到百分比匹配项,文本内容: {text}\")\r\n return \"0\"\r\n\r\n # 获取通过率\r\n pass_percent = get_percentage((By.CSS_SELECTOR, \"div[class='result_barchart'] div:nth-child(1) span:nth-child(1)\"), wd)\r\n test_result[\"pass_percent\"] = pass_percent\r\n\r\n # 获取失败率\r\n fail_percent = get_percentage(\r\n (By.CSS_SELECTOR, \"body > div.main_section > div.result > div > div:nth-child(2) > span\"), wd)\r\n test_result[\"fail_percent\"] = fail_percent\r\n\r\n # 获取异常率\r\n exception_percent = get_percentage(\r\n (By.CSS_SELECTOR, \"body > div.main_section > div.result > div > div:nth-child(3) > span\"), wd)\r\n test_result[\"exception_percent\"] = exception_percent\r\n # 输出test_result\r\n logging.info(test_result)\r\n print(test_result)\r\n sleep(5)\r\n\r\n # 返回测试结果字典\r\n return test_result\r\n\r\n# if __name__ == \"__main__\":\r\n# browser_init(\"展厅预定巡检\")\r\n# wd = GSTORE['wd']\r\n# test_result = get_test_result(\"http://nat.ubainsyun.com:31134/report_20250217_094401.html\",wd)\r\n# print(test_result)\r\n\r\nimport socket\r\nimport re\r\nimport subprocess\r\n\r\n\r\n# 获取本机IP地址\r\ndef get_local_ip():\r\n try:\r\n # 创建一个UDP套接字\r\n sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)\r\n # 连接到一个公共的IP地址和端口\r\n sock.connect((\"8.8.8.8\", 80))\r\n # 获取本地IP地址\r\n local_ip = sock.getsockname()[0]\r\n finally:\r\n sock.close()\r\n return local_ip\r\n\r\n\r\nimport yaml\r\nimport logging\r\nimport socket\r\nimport subprocess\r\n\r\n# 获取本机IP地址\r\ndef get_local_ip():\r\n try:\r\n # 创建一个UDP套接字\r\n sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)\r\n # 连接到一个公共的IP地址和端口\r\n sock.connect((\"8.8.8.8\", 80))\r\n # 获取本地IP地址\r\n local_ip = sock.getsockname()[0]\r\n finally:\r\n sock.close()\r\n return local_ip\r\n\r\n# 更新ngrok.cfg文件中的IP地址\r\ndef update_ngrok_config(config_path, new_ip):\r\n try:\r\n with open(config_path, 'r', encoding='utf-8') as file:\r\n config = yaml.safe_load(file)\r\n\r\n # 更新IP地址\r\n config['tunnels']['nat1']['proto']['tcp'] = f\"{new_ip}:80\"\r\n\r\n with open(config_path, 'w', encoding='utf-8') as file:\r\n yaml.safe_dump(config, file, default_flow_style=False, allow_unicode=True)\r\n\r\n logging.info(f\"ngrok.cfg 更新成功,新的IP地址为: {new_ip}\")\r\n except Exception as e:\r\n logging.error(f\"更新ngrok.cfg文件时出错: {e}\")\r\n\r\n# 启动ngrok\r\ndef start_ngrok(ngrok_path, config_path):\r\n try:\r\n # 终止已运行的ngrok进程\r\n kill_ngrok()\r\n\r\n command = [ngrok_path, '-config', config_path, 'start', 'nat1']\r\n subprocess.Popen(command, shell=True)\r\n logging.info(f\"ngrok 启动成功\")\r\n except Exception as e:\r\n logging.error(f\"启动ngrok时出错: {e}\")\r\n\r\ndef kill_ngrok():\r\n try:\r\n # 使用 taskkill 命令终止所有 ngrok 进程\r\n subprocess.run(['taskkill', '/F', '/IM', 'ngrok.exe'], check=True)\r\n logging.info(\"终止所有 ngrok 进程成功\")\r\n except subprocess.CalledProcessError as e:\r\n logging.info(\"没有找到 ngrok 进程\")\r\n except Exception as e:\r\n logging.error(f\"终止 ngrok 进程时出错: {e}\")\r\n\r\n# if __name__ == '__main__':\r\n# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')\r\n#\r\n# # 获取本机IP地址\r\n# local_ip = get_local_ip()\r\n# logging.info(f\"本机IP地址: {local_ip}\")\r\n#\r\n# # 更新ngrok.cfg文件\r\n# ngrok_config_path = r'D:\\GithubData\\自动化\\ubains-module-test\\预定系统\\ngrok\\ngrok-调试主机\\ngrok.cfg'\r\n# update_ngrok_config(ngrok_config_path, local_ip)\r\n#\r\n# # 启动ngrok\r\n# ngrok_path = r'D:\\GithubData\\自动化\\ubains-module-test\\预定系统\\ngrok\\ngrok-调试主机\\ngrok.exe'\r\n# start_ngrok(ngrok_path, ngrok_config_path)\r\n\r\n# # 定义执行终端命令的函数\r\n# def run_http_server():\r\n# try:\r\n# # 构建命令\r\n# command = [\r\n# 'cd', r'D:\\GithubData\\自动化\\ubains-module-test\\预定系统',\r\n# '&&', 'python', '-m', 'http.server', '81', '--directory', 'reports'\r\n# ]\r\n#\r\n# # 运行命令\r\n# process = subprocess.Popen(command, shell=True)\r\n# logging.info(\"HTTP 服务器启动成功\")\r\n# except Exception as e:\r\n# logging.error(f\"启动HTTP服务器时出错: {e}\")\r\n#\r\n# if __name__ == '__main__':\r\n# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')\r\n# run_http_server()\r\n\r\ndef user_login(element_locators, username, password, verifycode):\r\n \"\"\"\r\n 管理员登录函数。\r\n 该函数通过模拟用户输入用户名、密码和验证码,并点击登录按钮,以实现管理员登录。\r\n \"\"\"\r\n # 获取webdriver实例\r\n wd = GSTORE['wd']\r\n\r\n # 打印用户名输入信息\r\n INFO(f\"输入用户名:{username}\")\r\n # 向用户名输入框发送用户名\r\n safe_send_keys(element_locators['username_locator'], f'{username}', wd)\r\n sleep(5)\r\n\r\n # 打印密码输入信息\r\n INFO(f\"输入密码:{password}\")\r\n # 向密码输入框发送密码\r\n safe_send_keys(element_locators['password_locator'], f\"{password}\", wd)\r\n sleep(5)\r\n\r\n # 打印验证码输入信息\r\n INFO(f\"输入验证码:{verifycode}\")\r\n # 向验证码输入验证码\r\n safe_send_keys(element_locators['verifycode_locator'], f\"{verifycode}\", wd)\r\n sleep(5)\r\n\r\n #点击登录按钮\r\n INFO(\"点击登录按钮\")\r\n safe_click(element_locators['submitButton_locator'], wd)\r\n sleep(5)\r\n\r\ndef read_xlsx_data(xlsx_file_path):\r\n \"\"\"\r\n 读取XLSX文件中的数据,并将其转换为一个包含字典的列表,每个字典代表一行测试用例数据。\r\n\r\n 参数:\r\n xlsx_file_path (str): XLSX文件的路径。\r\n\r\n 返回:\r\n list: 包含字典的列表,每个字典包含测试用例的名称和参数。\r\n \"\"\"\r\n # 打开XLSX文件\r\n workbook = openpyxl.load_workbook(xlsx_file_path)\r\n # 假设数据在第一个工作表中\r\n sheet = workbook.active\r\n\r\n # 读取表头,从第三行开始\r\n headers = [cell.value for cell in sheet[3]]\r\n\r\n # 打印表头列名\r\n # print(f\"表头列名: {headers}\")\r\n\r\n # 找到表头中名为 'JSON' 的列索引\r\n try:\r\n json_index = headers.index('JSON')\r\n except ValueError as e:\r\n raise ValueError(f\"表头中没有找到所需的列: {e}\")\r\n\r\n ddt_cases = []\r\n # 遍历XLSX文件中的每一行数据,从第四行开始\r\n for row in sheet.iter_rows(min_row=4, values_only=True):\r\n # 获取 JSON 列的数据\r\n json_data = row[json_index]\r\n\r\n # 打印 JSON 数据以进行调试\r\n # print(f\"JSON 数据: {json_data}\")\r\n\r\n # 解析 JSON 字符串\r\n try:\r\n if json_data:\r\n parsed_json = json.loads(json_data)\r\n else:\r\n raise ValueError(\"JSON 数据为空\")\r\n except json.JSONDecodeError:\r\n raise ValueError(f\"无法解析 JSON 数据: {json_data}\")\r\n\r\n # 将解析后的 JSON 数据添加到列表中\r\n ddt_cases.append(parsed_json)\r\n\r\n # 日志记录:XLSX文件已读取\r\n INFO(\"XLSX文件已读取\")\r\n # 返回包含所有测试用例数据的列表\r\n return ddt_cases\r\n\r\ndef get_by_enum(type_str):\r\n \"\"\"\r\n 将字符串类型的定位器类型转换为 selenium.webdriver.common.by.By 枚举类型。\r\n\r\n 参数:\r\n type_str (str): 定位器类型字符串,例如 'XPATH'。\r\n\r\n 返回:\r\n selenium.webdriver.common.by.By: 对应的 By 枚举类型。\r\n \"\"\"\r\n type_str = type_str.upper()\r\n if type_str == 'XPATH':\r\n return By.XPATH\r\n elif type_str == 'ID':\r\n return By.ID\r\n elif type_str == 'NAME':\r\n return By.NAME\r\n elif type_str == 'CLASS_NAME':\r\n return By.CLASS_NAME\r\n elif type_str == 'CSS_SELECTOR':\r\n return By.CSS_SELECTOR\r\n elif type_str == 'TAG_NAME':\r\n return By.TAG_NAME\r\n elif type_str == 'LINK_TEXT':\r\n return By.LINK_TEXT\r\n elif type_str == 'PARTIAL_LINK_TEXT':\r\n return By.PARTIAL_LINK_TEXT\r\n else:\r\n raise ValueError(f\"未知的定位器类型: {type_str}\") Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/预定系统/Base/base.py b/预定系统/Base/base.py --- a/预定系统/Base/base.py (revision 6cedec8cd6072a6338b10f2314cd65d7c35c57f7) +++ b/预定系统/Base/base.py (date 1741230068869) @@ -909,7 +909,6 @@ except Exception as e: INFO(f"连接失败: {e}") - def set_tx_meeting_id(element_locator, wd): tx_meeting_id = elment_get_text(element_locator, wd) GSTORE['tx_meeting_id'] = tx_meeting_id diff --git a/运维集控/项目测试/运维标准版/cases/08设备管理/02编辑设备.py b/运维集控/项目测试/运维标准版/cases/08设备管理/02编辑设备.py new file mode 100644 diff --git a/运维集控/项目测试/运维标准版/cases/08设备管理/04删除设备.py b/运维集控/项目测试/运维标准版/cases/08设备管理/04删除设备.py new file mode 100644 diff --git a/运维集控/项目测试/运维标准版/cases/08设备管理/03查询设备.py b/运维集控/项目测试/运维标准版/cases/08设备管理/03查询设备.py new file mode 100644 diff --git a/运维集控/项目测试/运维标准版/cases/07类型标签/03删除标签.py b/运维集控/项目测试/运维标准版/cases/07类型标签/03删除标签.py new file mode 100644 diff --git a/运维集控/项目测试/运维标准版/cases/07类型标签/02编辑标签.py b/运维集控/项目测试/运维标准版/cases/07类型标签/02编辑标签.py new file mode 100644