Index: 运维集控/项目测试/运维标准版/cases/08设备管理/01新增设备.py
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/运维集控/项目测试/运维标准版/cases/08设备管理/01新增设备.py b/运维集控/项目测试/运维标准版/cases/08设备管理/01新增设备.py
new file mode 100644
--- /dev/null	(date 1741229657204)
+++ b/运维集控/项目测试/运维标准版/cases/08设备管理/01新增设备.py	(date 1741229657204)
@@ -0,0 +1,108 @@
+import sys
+import os
+
+from hytest.common import SELENIUM_LOG_SCREEN
+
+sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', '..', '..')))
+from 运维集控.项目测试.运维标准版.lib.base import *
+
+# 构建 CSV 文件的绝对路径
+csv_path = os.path.abspath(
+    os.path.join(os.path.dirname(__file__), '..', '..', 'testdata', '08设备管理', '新增设备.csv'))
+
+class AreafuntionAdd:
+    tag = ['新增设备']
+    ddt_cases = read_csv_data(csv_path)
+    def teststeps(self):
+        wd = GSTORE['wd']
+
+        #从self.para中解构出数据
+        name = self.name
+        area_group, area_type, area_name, area_ip, remark, info = self.para
+
+        STEP(1, '点击新增按钮')
+        areafuntion_add = WebDriverWait(wd, 10).until(
+            EC.element_to_be_clickable(
+                (By.XPATH, "//div[@class='company-edmit-right']//span[contains(text(),'新增')]"))
+        )
+        areafuntion_add.click()
+        sleep(1)
+
+        STEP(2, f'查找并选择区域分组:{area_group}')
+        if area_group:
+            area_group_input = WebDriverWait(wd, 10).until(
+                EC.presence_of_element_located(
+                    (By.XPATH, "//div[@class='dialog_input']//input[@placeholder='请选择分组']"))
+            )
+            area_group_input.send_keys(area_group)
+            #默认选择第一个分组
+            areagroup_select = WebDriverWait(wd, 10).until(
+                EC.presence_of_element_located(
+                    (
+                    By.XPATH, "//li[@class='el-cascader__suggestion-item']"))
+            )
+            sleep(1)
+            areagroup_select.click()
+        else:
+            print("group_name 为空,不执行选择区域分组的操作")
+
+        STEP(3, f'选择区域类型:{area_type}')
+        if area_type:
+            area_type_input = WebDriverWait(wd, 10).until(
+                EC.presence_of_element_located(
+                    (By.XPATH, "//div[@class='dialog_input']//input[@placeholder='请选择区域类型']"))
+            )
+            area_type_input.send_keys(area_type)
+            #默认选择第一个区域类型
+            areatype_select = WebDriverWait(wd, 10).until(
+                EC.presence_of_element_located(
+                    (By.XPATH, "//div[@x-placement='bottom-start']//ul[@class='el-scrollbar__view el-select-dropdown__list']"))
+            )
+            sleep(1)
+            areatype_select.click()
+        else:
+            print("area_type 为空,不执行选择区域类型的操作")
+
+        STEP(4, f'输入区域名称:{area_name}')
+        area_name_input = WebDriverWait(wd, 10).until(
+            EC.presence_of_element_located(
+                (By.XPATH, "//div[contains(@class,'dialog_input')]//input[contains(@placeholder,'请输入区域名称')]"))
+        )
+        area_name_input.clear()
+        area_name_input.send_keys(area_name)
+
+        STEP(5, f'填写IP地址:{area_ip}')
+        area_ip_input = WebDriverWait(wd, 10).until(
+            EC.presence_of_element_located(
+                (By.XPATH, "//input[@placeholder='IP地址']"))
+        )
+        area_ip_input.clear()
+        area_ip_input.send_keys(area_ip)
+
+        STEP(6, f'填写备注:{remark}')
+        funtion_remark = WebDriverWait(wd, 10).until(
+            EC.presence_of_element_located(
+                (By.XPATH, "//input[@placeholder='备注']"))
+        )
+        funtion_remark.clear()
+        funtion_remark.send_keys(remark)
+
+        STEP(7, '点击确认')
+        commit = WebDriverWait(wd, 10).until(
+            EC.element_to_be_clickable(
+                (By.XPATH, "//div[@aria-label='新增']//span[contains(text(),'确 定')]"))
+        )
+        commit.click()
+
+        STEP(8, '验证是否新增成功')
+        get_menu = WebDriverWait(wd, 10).until(
+            EC.visibility_of_element_located(
+                (By.CSS_SELECTOR, '.el-message__content'))
+        )
+        get_menu1 = get_menu.text
+        CHECK_POINT('检查是否出现成功提示弹窗', get_menu1 == info)
+
+        # 截图并保存
+        SELENIUM_LOG_SCREEN(wd, "50%")
+        sleep(1)
+        wd.refresh()
\ No newline at end of file
Index: 运维集控/项目测试/运维标准版/testdata/07类型标签/新增标签.csv
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/运维集控/项目测试/运维标准版/testdata/07类型标签/新增标签.csv b/运维集控/项目测试/运维标准版/testdata/07类型标签/新增标签.csv
--- a/运维集控/项目测试/运维标准版/testdata/07类型标签/新增标签.csv	(revision 6cedec8cd6072a6338b10f2314cd65d7c35c57f7)
+++ b/运维集控/项目测试/运维标准版/testdata/07类型标签/新增标签.csv	(date 1741229657220)
@@ -1,0 +1,11 @@
+name,typename,sort
+新增标签-001-标签名称为空,,1
+新增标签-002-排序为空,智慧大屏1,
+新增标签-003-排序为小数,智慧大屏2,1.1
+新增标签-004-排序为负数,智慧大屏3,-1
+新增标签-005-正常新增标签1,智慧屏设备,1
+新增标签-006-正常新增标签2,时序电源,2
+新增标签-007-正常新增标签3,中控主机,3
+新增标签-008-正常新增标签4,音频处理器,4
+新增标签-009-正常新增标签5,视频处理器,5
+新增标签-010-正常新增标签6,网络路由,6
Index: 运维集控/项目测试/运维标准版/lib/base.py
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.BaseRevisionTextPatchEP
<+>import pandas as pd\r\nimport csv\r\nimport urllib\r\nimport glob\r\nimport subprocess\r\nimport requests\r\nimport json\r\nimport hmac\r\nimport hashlib\r\nimport base64\r\nimport time\r\nimport win32api\r\nimport win32con\r\nimport win32gui\r\nimport logging\r\nfrom hytest import *\r\nimport pandas as pd\r\nfrom selenium import webdriver\r\nfrom datetime import datetime\r\nfrom urllib.parse import urlencode\r\nfrom selenium.webdriver.common.by import By\r\nfrom selenium.webdriver.edge.options import Options\r\nfrom selenium.webdriver.support.wait import WebDriverWait\r\nfrom selenium.webdriver.support import expected_conditions as EC\r\nfrom selenium.common import TimeoutException,ElementNotInteractableException\r\nfrom selenium.webdriver.common.keys import Keys\r\nfrom time import sleep\r\n\r\nlogging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')\r\n# 打开浏览器,忽略ssh警告\r\ndef open_browser():\r\n    INFO('打开默认浏览器')\r\n    # 更改显示屏分辨率为1920x1080\r\n    # success = change_resolution(1280, 1024)\r\n    success = change_resolution(1920, 1080)\r\n    edge_options = Options()\r\n    edge_options.add_argument('--ignore-certificate-errors')\r\n    edge_options.add_argument('--disable-blink-features=AutomationControlled')\r\n    edge_options.add_argument('--allow-insecure-localhost')\r\n    wd = webdriver.Edge(options=edge_options)\r\n    GSTORE['wd'] = wd\r\n    # wd.get('https://rms.ubainsyun.com/#/login')\r\n    wd.get('https://192.168.5.218:8443/#/login')\r\n    wd.maximize_window()\r\n    wd.implicitly_wait(10)\r\n\r\n# 定义调整屏幕分辨率,仅内部环境跑定时任务需要使用。\r\ndef change_resolution(width, height):\r\n    # 获取当前显示器的设备上下文(Device Context, DC)\r\n    device = win32api.EnumDisplayDevices(None, 0)\r\n    dm = win32api.EnumDisplaySettings(device.DeviceName, win32con.ENUM_CURRENT_SETTINGS)\r\n\r\n    if dm.PelsWidth != width or dm.PelsHeight != height:\r\n        print(f\"Changing resolution to {width}x{height}\")\r\n        dm.PelsWidth = width\r\n        dm.PelsHeight = height\r\n\r\n        # CDS_TEST 是测试模式,如果设置成功则不实际应用更改\r\n        if win32api.ChangeDisplaySettings(dm, win32con.CDS_TEST) != win32con.DISP_CHANGE_SUCCESSFUL :\r\n            print(\"The requested resolution change is not supported.\")\r\n            return False\r\n\r\n        # 实际应用新的分辨率设置\r\n        if win32api.ChangeDisplaySettings(dm, 0) != win32con.DISP_CHANGE_SUCCESSFUL:\r\n            print(\"Failed to change resolution.\")\r\n            return False\r\n\r\n        print(\"Resolution changed successfully.\")\r\n        return True\r\n    else:\r\n        print(\"The requested resolution is already set.\")\r\n        return True\r\n\r\n# 用户进行登录\r\ndef user_login(username, password, captcha):\r\n    wd = GSTORE['wd']\r\n    INFO(f'输入登录账号: {username}')\r\n    username_input = WebDriverWait(wd, 10).until(\r\n        EC.presence_of_element_located((By.XPATH, \"//input[@placeholder='请输入登录账号']\"))\r\n    )\r\n    username_input.clear()\r\n    username_input.send_keys(username)\r\n\r\n    INFO(f'输入登录密码: {password}')\r\n    password_input = WebDriverWait(wd, 10).until(\r\n        EC.presence_of_element_located((By.XPATH, \"//input[@placeholder='请输入登录密码']\"))\r\n    )\r\n    password_input.clear()\r\n    password_input.send_keys(password)\r\n\r\n    INFO(f'输入验证码:{captcha}')\r\n    captcha_input = WebDriverWait(wd, 10).until(\r\n        EC.presence_of_element_located((By.XPATH, \"//input[@placeholder='请输入验证码(区分大小写)']\"))\r\n    )\r\n    captcha_input.clear()\r\n    captcha_input.send_keys(captcha)\r\n\r\n    INFO('点击登录按钮')\r\n    login_button = WebDriverWait(wd, 3).until(\r\n        EC.element_to_be_clickable((By.XPATH, \"//div[@class='loginButton']\"))\r\n    )\r\n    login_button.click()\r\n    sleep(1)\r\n\r\ndef validate_input(username, password, captcha):\r\n    if not isinstance(username, str) or not isinstance(password, str) or not isinstance(captcha, str):\r\n        raise ValueError(\"Invalid input type\")\r\n    if len(username) < 1 or len(password) < 1 or len(captcha) < 1:\r\n        raise ValueError(\"Input cannot be empty\")\r\n\r\ndef run_login_tests(df):\r\n    # 遍历每一行\r\n    for index, row in df.iterrows():\r\n        username = row.get('username', None)\r\n        password = row.get('password', None)\r\n        captcha = row.get('captcha', None)\r\n        if username and password and captcha:\r\n            try:\r\n                validate_input(username, password, captcha)\r\n                user_login(username, password, captcha)\r\n            except ValueError as e:\r\n                print(f\"Invalid input at row {index}: {e}\")\r\n\r\n# 构建当前所在目录\r\ndef main():\r\n    # 获取当前脚本的目录\r\n    current_dir = os.path.dirname(os.path.abspath(__file__))\r\n    csv_file_path = os.path.join(current_dir, '../testdata/01登录模块/登录信息.csv')\r\n    try:\r\n        if not os.path.exists(csv_file_path):\r\n            print(f\"File not found: {csv_file_path}\")\r\n            return\r\n        df = pd.read_csv(csv_file_path, encoding='utf-8')\r\n        run_login_tests(df)\r\n    except Exception as e:\r\n        print(f\"An error occurred: {e}\")\r\n\r\n# 进入后台管理系统页面\r\ndef enter_system():\r\n    wd = GSTORE['wd']\r\n    if wd is None:\r\n        raise ValueError(\"WebDriver is not initialized\")\r\n\r\n    logging.info('点击系统设置按钮')\r\n    try:\r\n        enter_sys = WebDriverWait(wd, 10).until(\r\n            EC.element_to_be_clickable((By.XPATH, \"//li[contains(text(),'系统设置')]\"))\r\n        )\r\n        enter_sys.click()\r\n        logging.info('系统设置按钮已点击')\r\n    except (TimeoutException, ElementNotInteractableException) as error:\r\n        logging.error(f\"Error clicking system settings button: {error}\")\r\n\r\n# 进入管理页面\r\ndef enter_manage():\r\n    wd = GSTORE['wd']\r\n    if wd is None:\r\n        raise ValueError(\"WebDriver is not initialized\")\r\n\r\n    logging.info('点击管理按钮')\r\n    try:\r\n        enter_mag = WebDriverWait(wd, 10).until(\r\n            EC.element_to_be_clickable((By.XPATH, \"//div[@class='el-submenu__title']//span[contains(text(),'管理')]\"))\r\n        )\r\n        enter_mag.click()\r\n        logging.info('管理按钮已点击')\r\n    except (TimeoutException, ElementNotInteractableException) as error:\r\n        logging.error(f\"Error clicking manage button: {error}\")\r\n\r\n# 进入用户管理页面\r\ndef enter_user_manage():\r\n    wd = GSTORE['wd']\r\n    if wd is None:\r\n        raise ValueError(\"WebDriver is not initialized\")\r\n\r\n    logging.info('点击用户管理按钮')\r\n    try:\r\n        enter_user_mag = WebDriverWait(wd, 10).until(\r\n            EC.element_to_be_clickable((By.XPATH, \"//li[contains(text(),'用户管理')]\"))\r\n        )\r\n        enter_user_mag.click()\r\n        logging.info('打开用户管理页面')\r\n    except (TimeoutException, ElementNotInteractableException) as error:\r\n        logging.error(f\"Error clicking manage button: {error}\")\r\n\r\n# 进入区域分组管理页面\r\ndef enter_areagroup_manage():\r\n    wd = GSTORE['wd']\r\n    if wd is None:\r\n        raise ValueError(\"WebDriver is not initialized\")\r\n\r\n    logging.info('点击区域分组按钮')\r\n    try:\r\n        enter_areagroup_mag = WebDriverWait(wd, 10).until(\r\n            EC.element_to_be_clickable((By.XPATH, \"//li[contains(text(),'区域分组')]\"))\r\n        )\r\n        enter_areagroup_mag.click()\r\n        logging.info('打开区域分组页面')\r\n    except (TimeoutException, ElementNotInteractableException) as error:\r\n        logging.error(f\"Error clicking manage button: {error}\")\r\n\r\n# 进入区域类型页面\r\ndef enter_areatype_manage():\r\n    wd = GSTORE['wd']\r\n    if wd is None:\r\n        raise ValueError(\"WebDriver is not initialized\")\r\n\r\n    logging.info('点击区域类型按钮')\r\n    try:\r\n        enter_areatype_mag = WebDriverWait(wd, 10).until(\r\n            EC.element_to_be_clickable((By.XPATH, \"//li[contains(text(),'区域类型')]\"))\r\n        )\r\n        enter_areatype_mag.click()\r\n        logging.info('打开区域类型页面')\r\n    except (TimeoutException, ElementNotInteractableException) as error:\r\n        logging.error(f\"Error clicking manage button: {error}\")\r\n\r\n# 进入区域功能页面\r\ndef enter_areafuntion_manage():\r\n    wd = GSTORE['wd']\r\n    if wd is None:\r\n        raise ValueError(\"WebDriver is not initialized\")\r\n\r\n    logging.info('点击区域功能按钮')\r\n    try:\r\n        enter_areafuntion_mag = WebDriverWait(wd, 10).until(\r\n            EC.element_to_be_clickable((By.XPATH, \"//li[contains(text(),'区域功能')]\"))\r\n        )\r\n        enter_areafuntion_mag.click()\r\n        logging.info('打开区域功能页面')\r\n    except (TimeoutException, ElementNotInteractableException) as error:\r\n        logging.error(f\"Error clicking manage button: {error}\")\r\n\r\n# 进入区域管理页面\r\ndef enter_area_manage():\r\n    wd = GSTORE['wd']\r\n    if wd is None:\r\n        raise ValueError(\"WebDriver is not initialized\")\r\n\r\n    logging.info('点击区域管理按钮')\r\n    try:\r\n        enter_area_mag = WebDriverWait(wd, 10).until(\r\n            EC.element_to_be_clickable((By.XPATH, \"//li[contains(text(),'区域管理')]\"))\r\n        )\r\n        enter_area_mag.click()\r\n        logging.info('打开区域管理页面')\r\n    except (TimeoutException, ElementNotInteractableException) as error:\r\n        logging.error(f\"Error clicking manage button: {error}\")\r\n\r\n# 进入类型标签页面\r\ndef enter_typetag():\r\n    wd = GSTORE['wd']\r\n    if wd is None:\r\n        raise ValueError(\"WebDriver is not initialized\")\r\n\r\n    logging.info('点击类型标签按钮')\r\n    try:\r\n        enter_type_tag = WebDriverWait(wd, 10).until(\r\n            EC.element_to_be_clickable((By.XPATH, \"//li[contains(text(),'类型标签')]\"))\r\n        )\r\n        enter_type_tag.click()\r\n        logging.info('打开类型标签页面')\r\n    except (TimeoutException, ElementNotInteractableException) as error:\r\n        logging.error(f\"Error clicking manage button: {error}\")\r\n\r\n# 进入设备管理页面\r\ndef enter_devices_manage():\r\n    wd = GSTORE['wd']\r\n    if wd is None:\r\n        raise ValueError(\"WebDriver is not initialized\")\r\n\r\n    logging.info('点击设备管理按钮')\r\n    try:\r\n        enter_devices_mag = WebDriverWait(wd, 10).until(\r\n            EC.element_to_be_clickable((By.XPATH, \"//li[contains(text(),'设备管理')]\"))\r\n        )\r\n        enter_devices_mag.click()\r\n        logging.info('打开设备管理页面')\r\n    except (TimeoutException, ElementNotInteractableException) as error:\r\n        logging.error(f\"Error clicking manage button: {error}\")\r\n\r\n# 进入协议管理页面\r\ndef enter_protocol_manage():\r\n    wd = GSTORE['wd']\r\n    if wd is None:\r\n        raise ValueError(\"WebDriver is not initialized\")\r\n\r\n    logging.info('点击协议管理按钮')\r\n    try:\r\n        enter_protocol_mag = WebDriverWait(wd, 10).until(\r\n            EC.element_to_be_clickable((By.XPATH, \"//li[contains(text(),'协议管理')]\"))\r\n        )\r\n        enter_protocol_mag.click()\r\n        logging.info('打开协议管理页面')\r\n    except (TimeoutException, ElementNotInteractableException) as error:\r\n        logging.error(f\"Error clicking manage button: {error}\")\r\n\r\n# 进入数据类型页面\r\ndef enter_datatype():\r\n    wd = GSTORE['wd']\r\n    if wd is None:\r\n        raise ValueError(\"WebDriver is not initialized\")\r\n\r\n    logging.info('点击数据类型按钮')\r\n    try:\r\n        enter_data_type = WebDriverWait(wd, 10).until(\r\n            EC.element_to_be_clickable((By.XPATH, \"//li[contains(text(),'数据类型')]\"))\r\n        )\r\n        enter_data_type.click()\r\n        logging.info('打开数据类型页面')\r\n    except (TimeoutException, ElementNotInteractableException) as error:\r\n        logging.error(f\"Error clicking manage button: {error}\")\r\n\r\n# 进入联动动作页面\r\ndef enter_linkaction():\r\n    wd = GSTORE['wd']\r\n    if wd is None:\r\n        raise ValueError(\"WebDriver is not initialized\")\r\n\r\n    logging.info('点击联动动作按钮')\r\n    try:\r\n        enter_link_action = WebDriverWait(wd, 10).until(\r\n            EC.element_to_be_clickable((By.XPATH, \"//li[contains(text(),'联动动作')]\"))\r\n        )\r\n        enter_link_action.click()\r\n        logging.info('打开联动动作页面')\r\n    except (TimeoutException, ElementNotInteractableException) as error:\r\n        logging.error(f\"Error clicking manage button: {error}\")\r\n\r\n# 进入模式管理页面\r\ndef enter_model_manage():\r\n    wd = GSTORE['wd']\r\n    if wd is None:\r\n        raise ValueError(\"WebDriver is not initialized\")\r\n\r\n    logging.info('点击模式管理按钮')\r\n    try:\r\n        enter_model_mag = WebDriverWait(wd, 10).until(\r\n            EC.element_to_be_clickable((By.XPATH, \"//li[contains(text(),'模式管理')]\"))\r\n        )\r\n        enter_model_mag.click()\r\n        logging.info('打开模式管理页面')\r\n    except (TimeoutException, ElementNotInteractableException) as error:\r\n        logging.error(f\"Error clicking manage button: {error}\")\r\n\r\n# 定义钉钉接口函数\r\ndef dingding_send_message(test_report_url, title, mobile, ding_type):\r\n    \"\"\"\r\n    发送钉钉机器人消息\r\n    参考接口文档:https://open.dingtalk.com/document/orgapp/custom-robots-send-group-messages#title-7fs-kgs-36x\r\n\r\n    :param test_report_url: 测试报告链接\r\n    :param title: 消息标题\r\n    :param text: 消息内容\r\n    :param mobile: 需要@的手机号列表\r\n    \"\"\"\r\n    # 记录调用此函数的日志\r\n    logging.info(\"开始构建并发送钉钉机器人消息\")\r\n    # 钉钉机器人的 Webhook URL 和密钥(正式环境)\r\n    # webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=b0eea0bbf097ce3badb4c832d2cd0267a50486f395ec8beca6e2042102bb295b'\r\n    # secret = 'SEC928b11659c5fd6476cfa2042edbf56da876abf759289f7e4d3c671fb9a81bf43'\r\n    # 钉钉机器人的 Webhook URL 和密钥(测试环境)\r\n\r\n    if ding_type ==  '标准版巡检':\r\n        webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=7fbf40798cad98b1b5db55ff844ba376b1816e80c5777e6f47ae1d9165dacbb4'\r\n        secret = 'SEC610498ed6261ae2df1d071d0880aaa70abf5e67efe47f75a809c1f2314e0dbd6'\r\n    elif ding_type == '展厅巡检':\r\n        webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=061b6e9b1ae436f356cfda7fe19b6e58e46b62670046a78bd3a4d869118c612d'\r\n        secret = 'SEC93212bd880aad638cc0df2b28a72ef4fdf6651cacb8a6a4bc71dcf09705d458d'\r\n\r\n    # 生成时间戳\r\n    timestamp = str(round(time.time() * 1000))\r\n\r\n    # 生成签名\r\n    secret_enc = secret.encode('utf-8')\r\n    string_to_sign = f'{timestamp}\\n{secret}'\r\n    string_to_sign_enc = string_to_sign.encode('utf-8')\r\n    hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()\r\n    sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))\r\n\r\n    # 构建最终的 Webhook URL\r\n    params = {\r\n        'access_token': webhook_url.split('=')[1],\r\n        'timestamp': timestamp,\r\n        'sign': sign\r\n    }\r\n    encoded_params = urllib.parse.urlencode(params)\r\n    final_webhook_url = f'https://oapi.dingtalk.com/robot/send?{encoded_params}'\r\n\r\n    # 记录最终的 Webhook URL\r\n    logging.info(f\"钉钉机器人Webhook URL: {final_webhook_url}\")\r\n\r\n    # 构建消息体\r\n    headers = {'Content-Type': 'application/json'}\r\n    message = {\r\n        'msgtype': 'link',\r\n        'link': {\r\n            'title': title,\r\n            'messageUrl': test_report_url,\r\n            'text': test_report_url\r\n        },\r\n        \"at\": {\r\n            \"atMobiles\": [mobile],\r\n            \"isAtAll\": True\r\n        }\r\n    }\r\n\r\n    try:\r\n        # 发送 POST 请求\r\n        response = requests.post(final_webhook_url, data=json.dumps(message), headers=headers)\r\n\r\n        # 检查响应状态码\r\n        if response.status_code == 200:\r\n            logging.info('消息发送成功!')\r\n            logging.info(f'响应内容: {response.text}')\r\n        else:\r\n            logging.error(f'消息发送失败,状态码: {response.status_code}')\r\n            logging.error(f'响应内容: {response.text}')\r\n    except requests.exceptions.RequestException as e:\r\n        logging.error(f'请求异常: {e}')\r\n\r\n# 调用框架自带生成的测试报告\r\ndef get_latest_report_file(report_dir, base_url):\r\n    \"\"\"\r\n    获取指定目录下最新的HTML报告文件,并返回带有基础URL的完整路径。\r\n\r\n    :param report_dir: 报告文件所在的目录\r\n    :param base_url: 基础URL\r\n    :return: 最新的HTML报告文件的完整URL,如果没有找到则返回None\r\n    \"\"\"\r\n    # 记录调用此函数的日志\r\n    logging.info(\"开始调用get_latest_report_file函数获取报告文件\")\r\n\r\n    # 确保报告目录存在\r\n    if not os.path.exists(report_dir):\r\n        logging.error(f\"报告目录 {report_dir} 不存在。\")\r\n        return None\r\n\r\n    # 获取指定目录下所有符合模式的HTML报告文件\r\n    report_files = glob.glob(os.path.join(report_dir, 'report_*.html'))\r\n\r\n    # 打印找到的文件列表\r\n    logging.debug(f\"找到的报告文件: {report_files}\")\r\n\r\n    # 如果没有找到报告文件,记录警告信息并返回None\r\n    if not report_files:\r\n        logging.warning(\"在指定目录中没有找到报告文件。\")\r\n        return None\r\n\r\n    # 找到最新修改的报告文件\r\n    latest_file = max(report_files, key=os.path.getmtime)\r\n\r\n    # 获取最新报告文件的最后修改时间\r\n    last_modified_time = datetime.fromtimestamp(os.path.getmtime(latest_file)).strftime('%Y-%m-%d %H:%M:%S')\r\n\r\n    # 记录最新报告文件的信息\r\n    logging.info(f\"最新报告文件: {latest_file}, 最后修改时间: {last_modified_time}\")\r\n\r\n    # 将文件路径转换为相对于基础URL的相对路径\r\n    relative_path = os.path.relpath(latest_file, report_dir)\r\n\r\n    # 生成完整的URL\r\n    full_url = f\"{base_url}/{relative_path}\".replace(\"\\\\\", \"/\")\r\n\r\n    # 返回完整的URL\r\n    return full_url\r\n\r\n# 定义发送钉钉报告函数\r\ndef get_reportfile_send_dingding(report_title, report_url_prefix, ding_type):\r\n    try:\r\n        # 获取报告文件所在的目录\r\n        report_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..','log')\r\n\r\n        # 获取基础URL\r\n        base_url = report_url_prefix\r\n\r\n        # 获取最新的报告文件\r\n        latest_report = get_latest_report_file(report_dir, base_url)\r\n\r\n        # 如果找到了最新的报告文件,则发送报告链接到钉钉\r\n        if latest_report:\r\n            logging.info(f\"最新报告文件URL: {latest_report}\")\r\n\r\n            try:\r\n                # 记录调用钉钉消息通知函数的日志\r\n                logging.info(\"开始调用钉钉消息通知函数\")\r\n\r\n                # 调用钉钉发送消息接口进行推送测试报告链接\r\n                dingding_send_message(latest_report, report_title, \"13724387318\", ding_type)\r\n\r\n                # 记录钉钉消息通知函数调用成功的日志\r\n                logging.info(\"钉钉消息通知函数调用成功\")\r\n            except Exception as e:\r\n                # 记录钉钉消息通知函数调用失败的日志\r\n                logging.error(f\"钉钉消息通知函数调用失败: {e}\")\r\n        else:\r\n            # 记录没有找到报告文件的日志\r\n            logging.warning(\"没有找到报告文件以发送。\")\r\n\r\n    except subprocess.CalledProcessError as e:\r\n        # 处理子进程调用失败的异常\r\n        logging.error(f\"命令执行失败,返回码 {e.returncode}: {e.output}\")\r\n    except OSError as e:\r\n        # 处理操作系统相关的异常\r\n        logging.error(f\"发生操作系统错误: {e}\")\r\n    finally:\r\n        # 无论是否成功,都记录测试结束的日志\r\n        logging.info(\"自动化测试完成。\")\r\n\r\n# 获取csv文件数据\r\ndef read_csv_data(csv_file_path):\r\n    \"\"\"\r\n    读取CSV文件中的数据,并将其转换为一个包含字典的列表,每个字典代表一行测试用例数据。\r\n    参数:\r\n    csv_file_path (str): CSV文件的路径。\r\n    返回:\r\n    list: 包含字典的列表,每个字典包含测试用例的名称和参数。\r\n    \"\"\"\r\n    # 打开CSV文件,使用只读模式,确保兼容性并处理编码\r\n    with open(csv_file_path, mode='r', newline='', encoding='utf-8') as file:\r\n        # 创建CSV阅读器\r\n        reader = csv.reader(file)\r\n        # 读取表头,为后续数据解析做准备\r\n        headers = next(reader)\r\n        ddt_cases = []\r\n        # 遍历CSV文件中的每一行数据\r\n        for row in reader:\r\n            # 将每一行数据转换为字典,其中包含测试用例的名称和参数\r\n            case = {\r\n                'name': row[0],\r\n                'para': row[1:]\r\n            }\r\n            # 将转换后的测试用例数据添加到列表中\r\n            ddt_cases.append(case)\r\n    # 日志记录:CSV文件已读取\r\n    INFO(\"CSV文件已读取\")\r\n    # 返回包含所有测试用例数据的列表\r\n    return ddt_cases\r\n\r\ndef run_automation_test(report_title, report_url_prefix , ding_type):\r\n    \"\"\"\r\n    运行自动化测试并生成报告。\r\n\r\n    参数:\r\n    - report_title: 报告的标题\r\n    - report_url_prefix: 报告URL的前缀\r\n    - test_case: 测试用例脚本执行的标签名\r\n    \"\"\"\r\n    # 记录测试开始的日志\r\n    logging.info(\"开始自动化测试...\")\r\n\r\n    # 构建运行测试命令\r\n    command = [\r\n        'hytest',\r\n        '--report_title', report_title,\r\n        '--report_url_prefix', report_url_prefix\r\n    ]\r\n\r\n    # 记录将要执行的命令日志\r\n    logging.info(f\"执行命令: {' '.join(command)}\")\r\n\r\n    try:\r\n        # 执行测试命令并获取结果\r\n        result = subprocess.run(command, capture_output=True, text=True, check=True)\r\n\r\n        # 记录命令的标准输出和错误输出\r\n        logging.debug(f\"命令标准输出: {result.stdout}\")\r\n        logging.debug(f\"命令错误输出: {result.stderr}\")\r\n    except subprocess.CalledProcessError as e:\r\n        # 处理子进程调用失败的异常\r\n        logging.error(f\"命令执行失败,返回码 {e.returncode}: {e.output}\")\r\n    except OSError as e:\r\n        # 处理操作系统相关的异常\r\n        logging.error(f\"发生操作系统错误: {e}\")\r\n    finally:\r\n        # 无论测试是否成功,都记录测试结束的日志\r\n        logging.info(\"自动化测试完成。\")\r\n\r\n        # 调用回调函数处理后续操作\r\n        get_reportfile_send_dingding(f\"{report_title}\", report_url_prefix, ding_type)\r\n
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/运维集控/项目测试/运维标准版/lib/base.py b/运维集控/项目测试/运维标准版/lib/base.py
--- a/运维集控/项目测试/运维标准版/lib/base.py	(revision 6cedec8cd6072a6338b10f2314cd65d7c35c57f7)
+++ b/运维集控/项目测试/运维标准版/lib/base.py	(date 1741231416971)
@@ -1,4 +1,6 @@
+import openpyxl
 import pandas as pd
+import re
 import csv
 import urllib
 import glob
@@ -22,7 +24,7 @@
 from selenium.webdriver.edge.options import Options
 from selenium.webdriver.support.wait import WebDriverWait
 from selenium.webdriver.support import expected_conditions as EC
-from selenium.common import TimeoutException,ElementNotInteractableException
+from selenium.common import TimeoutException, ElementNotInteractableException, NoSuchElementException
 from selenium.webdriver.common.keys import Keys
 from time import sleep
 
@@ -44,7 +46,7 @@
     wd.maximize_window()
     wd.implicitly_wait(10)
 
-# 定义调整屏幕分辨率,仅内部环境跑定时任务需要使用。
+# 定义调整屏幕分辨率,仅虚拟机电脑环境跑定时任务需要使用。
 def change_resolution(width, height):
     # 获取当前显示器的设备上下文(Device Context, DC)
     device = win32api.EnumDisplayDevices(None, 0)
@@ -537,6 +539,7 @@
     # 返回包含所有测试用例数据的列表
     return ddt_cases
 
+# 定义生成的报告,并且发送钉钉通知
 def run_automation_test(report_title, report_url_prefix , ding_type):
     """
     运行自动化测试并生成报告。
@@ -578,3 +581,351 @@
 
         # 调用回调函数处理后续操作
         get_reportfile_send_dingding(f"{report_title}", report_url_prefix, ding_type)
+
+
+# 获取报告中的通过率、失败率和异常率,用于发送钉钉报告
+def get_test_result(latest_report, wd):
+    """
+    获取测试结果页面的通过率、失败率和异常率
+
+    :param latest_report: 测试结果页面的URL
+    :param wd: WebDriver实例,用于访问和操作网页
+    :return: 包含通过率、失败率和异常率的字典
+    """
+    # 初始化测试结果字典
+    test_result = {
+        "pass_percent": "",
+        "fail_percent": "",
+        "exception_percent": ""
+    }
+
+    # 访问测试结果页面
+    wd.get(latest_report)
+    sleep(5)
+
+    # 点击简略显示
+    safe_click((By.XPATH,"//div[@id='display_mode']"), wd)
+    sleep(5)
+
+    # 定义一个函数来获取和解析百分比
+    def get_percentage(selector, wd):
+        text = elment_get_text(selector, wd)
+        logging.info(f"获取的文本:{text}")
+        match = re.search(r'(\d+(\.\d+)?)%', text)
+        if match:
+            return match.group(0)
+        else:
+            logging.error(f"未找到百分比匹配项,文本内容: {text}")
+            return "0"
+
+    # 获取通过率
+    pass_percent = get_percentage((By.CSS_SELECTOR, "div[class='result_barchart'] div:nth-child(1) span:nth-child(1)"), wd)
+    test_result["pass_percent"] = pass_percent
+
+    # 获取失败率
+    fail_percent = get_percentage(
+        (By.CSS_SELECTOR, "body > div.main_section > div.result > div > div:nth-child(2) > span"), wd)
+    test_result["fail_percent"] = fail_percent
+
+    # 获取异常率
+    exception_percent = get_percentage(
+        (By.CSS_SELECTOR, "body > div.main_section > div.result > div > div:nth-child(3) > span"), wd)
+    test_result["exception_percent"] = exception_percent
+    # 输出test_result
+    logging.info(test_result)
+    print(test_result)
+    sleep(5)
+
+    # 返回测试结果字典
+    return test_result
+
+
+# 通用方法-点击元素
+def safe_click(element_locator, wd):
+    """
+    对其定位器指定的元素执行安全单击。
+    此函数尝试以处理潜在异常的方式单击元素。
+    它等待元素可见并可单击,然后再尝试单击它。
+    如果该元素在20秒内无法点击,或者它不存在,
+    或者不可交互,它会捕获相应的异常并记录一条信息性消息。
+    参数:
+    -element_locator:要单击的元素的定位器,指定为元组。
+    -wd:用于查找元素并与之交互的WebDriver实例。
+    """
+    try:
+        # Wait up to 20 seconds for the element to be visible
+        element = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator))
+        # Attempt to click the element
+        element.click()
+    except TimeoutException:
+        # Log a message if the element is not found or not clickable within 20 seconds
+        INFO(f"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.")
+    except NoSuchElementException:
+        # Log a message if the element is not found
+        INFO(f"NoSuchElementException: Element {element_locator} not found.")
+    except ElementNotInteractableException:
+        # Log a message if the element is not interactable
+        INFO(f"ElementNotInteractableException: Element {element_locator} is not interactable.")
+
+# 通用方法-在输入框输入内容
+def safe_send_keys(element_locator, value, wd):
+    """
+    安全地向网页元素发送键值。
+    该函数尝试在指定时间内找到指定的网页元素,如果找到并且元素可见,将先清除元素内容,然后发送指定的键值。
+    如果在指定时间内未能找到元素或元素不可点击,则捕获相应的异常并打印错误信息。
+    参数:
+    element_locator (tuple): 用于定位网页元素的策略和定位器的元组,例如(By.ID, 'element_id')。
+    value (str): 要发送到元素的键值字符串。
+    wd: WebDriver实例,用于与浏览器交互。
+    异常处理:
+    - TimeoutException: 如果元素在指定时间内未被找到或不可点击。
+    - NoSuchElementException: 如果元素不存在。
+    - ElementNotInteractableException: 如果元素存在但不可交互。
+    """
+    try:
+        # 等待元素在指定时间内可见
+        element = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator))
+        element.clear()  # 清除元素的当前值
+        element.send_keys(value)  # 向元素发送指定的键值
+    except TimeoutException:
+        # 如果元素在指定时间内未被找到或不可点击,打印超时异常信息
+        INFO(f"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.")
+    except NoSuchElementException:
+        # 如果元素不存在,打印相应异常信息
+        INFO(f"NoSuchElementException: Element {element_locator} not found.")
+    except ElementNotInteractableException:
+        # 如果元素不可交互,打印相应异常信息
+        INFO(f"ElementNotInteractableException: Element {element_locator} is not interactable.")
+
+# 通用方法-输入框清除
+def input_clear(element_locator, wd):
+    """
+    清空输入框中的文本。
+
+    该函数通过元素定位器找到指定的输入框,并尝试清空其内容。它使用显式等待来确保元素在尝试清除之前是可见的。
+
+    参数:
+    - element_locator: 用于定位输入框的元素定位器,通常是一个元组,包含定位方法和定位表达式。
+    - wd: WebDriver实例,用于与浏览器交互。
+
+    异常处理:
+    - TimeoutException: 如果在指定时间内元素不可见,则捕获此异常并打印超时异常消息。
+    - NoSuchElementException: 如果找不到指定的元素,则捕获此异常并打印未找到元素的消息。
+    - ElementNotInteractableException: 如果元素不可操作(例如,元素不可见或不可点击),则捕获此异常并打印相应消息。
+    """
+    try:
+        # 等待元素可见,并在可见后清空输入框。
+        input_element = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator))
+        input_element.clear()
+    except TimeoutException:
+        # 如果元素在20秒内不可见,打印超时异常消息。
+        print(f"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.")
+    except NoSuchElementException:
+        # 如果找不到元素,打印未找到元素的消息。
+        print(f"NoSuchElementException: Element {element_locator} not found.")
+    except ElementNotInteractableException:
+        # 如果元素不可操作,打印元素不可操作的消息。
+        print(f"ElementNotInteractableException: Element {element_locator} is not interactable.")
+
+# 通用方法-回车
+def send_keyboard(element_locator, wd):
+    """
+    向指定元素发送键盘事件。
+
+    该函数尝试找到页面上的一个元素,并向其发送RETURN键点击事件。它使用显式等待来确保元素在尝试交互之前是可见的。
+    如果在指定时间内元素不可见、不存在或不可交互,则捕获相应的异常并打印错误消息。
+
+    参数:
+    - element_locator: 用于定位元素的元组,格式为(by, value)。例如,(By.ID, 'element_id')。
+    - wd: WebDriver实例,用于与浏览器进行交互。
+
+    异常处理:
+    - TimeoutException: 如果元素在20秒内不可见,则捕获此异常并打印超时错误消息。
+    - NoSuchElementException: 如果找不到指定的元素,则捕获此异常并打印未找到元素的错误消息。
+    - ElementNotInteractableException: 如果元素不可交互(例如,被遮挡或不可点击),则捕获此异常并打印相应错误消息。
+    """
+    try:
+        # 等待元素可见,并在可见后向其发送RETURN键点击事件。
+        element = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator))
+        element.send_keys(Keys.RETURN)
+    except TimeoutException:
+        # 如果元素在指定时间内不可见,打印超时错误消息。
+        print(f"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.")
+    except NoSuchElementException:
+        # 如果找不到指定的元素,打印未找到元素的错误消息。
+        print(f"NoSuchElementException: Element {element_locator} not found.")
+    except ElementNotInteractableException:
+        # 如果元素不可交互,打印不可交互错误消息。
+        print(f"ElementNotInteractableException: Element {element_locator} is not interactable.")
+
+# 通用方法-获取提示文本
+def get_notify_text(wd,element_locator):
+    """
+    获取通知文本信息。
+
+    该函数通过WebDriver等待特定的元素出现,并截取其文本内容作为通知信息。此外,它还负责在获取通知文本后进行屏幕截图,
+    以便于后续的调试或记录需要。
+
+    参数:
+    wd (WebDriver): 由上层传入的WebDriver对象,用于操作浏览器。
+
+    返回:
+    str: 提取的通知文本信息。如果未能提取到信息或发生异常,则返回None。
+    """
+    try:
+        # 获取提示信息
+        notify_text = WebDriverWait(wd, 60).until(
+            EC.presence_of_element_located(element_locator)
+        ).text
+        # 屏幕截图
+        SELENIUM_LOG_SCREEN(wd,"50%")
+        return notify_text
+    except Exception as e:
+        # 记录异常信息
+        INFO(f"Exception occurred: {e}")
+
+# 通用方法-获取页面中的文本内容
+def elment_get_text(element_locator, wd):
+    """
+    获取页面元素的文本。
+
+    该函数通过显式等待的方式,确保页面元素在20秒内可见并获取其文本。
+    如果在规定时间内元素不可见、不存在或不可交互,则会捕获相应的异常并打印错误信息。
+
+    参数:
+    - element_locator: 用于定位页面元素的元组,通常包含定位方式和定位值。
+    - wd: WebDriver对象,用于与浏览器进行交互。
+
+    返回:
+    - element_text: 页面元素的文本。如果发生异常,则返回None。
+    """
+    try:
+        # 使用WebDriverWait等待页面元素在20秒内可见,并获取其文本。
+        element_text = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator)).text
+        return element_text
+    except TimeoutException:
+        # 如果超过20秒元素仍未可见,则捕获TimeoutException异常并打印错误信息。
+        print(f"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.")
+    except NoSuchElementException:
+        # 如果页面上不存在该元素,则捕获NoSuchElementException异常并打印错误信息。
+        print(f"NoSuchElementException: Element {element_locator} not found.")
+    except ElementNotInteractableException:
+        # 如果元素存在但不可交互(例如被遮挡),则捕获ElementNotInteractableException异常并打印错误信息。
+        print(f"ElementNotInteractableException: Element {element_locator} is not interactable.")
+
+# 通用方法-读取XLSX文件中的数据,并将其转换为一个包含字典的列表
+def read_xlsx_data(xlsx_file_path):
+    """
+    读取XLSX文件中的数据,并将其转换为一个包含字典的列表,每个字典代表一行测试用例数据。
+
+    参数:
+    xlsx_file_path (str): XLSX文件的路径。
+
+    返回:
+    list: 包含字典的列表,每个字典包含测试用例的名称和参数。
+    """
+    # 打开XLSX文件
+    workbook = openpyxl.load_workbook(xlsx_file_path)
+    # 假设数据在第一个工作表中
+    sheet = workbook.active
+
+    # 读取表头,从第三行开始
+    headers = [cell.value for cell in sheet[3]]
+
+    # 打印表头列名
+    # print(f"表头列名: {headers}")
+
+    # 找到表头中名为 'JSON' 的列索引
+    try:
+        json_index = headers.index('JSON')
+    except ValueError as e:
+        raise ValueError(f"表头中没有找到所需的列: {e}")
+
+    ddt_cases = []
+    # 遍历XLSX文件中的每一行数据,从第四行开始
+    for row in sheet.iter_rows(min_row=4, values_only=True):
+        # 获取 JSON 列的数据
+        json_data = row[json_index]
+
+        # 打印 JSON 数据以进行调试
+        # print(f"JSON 数据: {json_data}")
+
+        # 解析 JSON 字符串
+        try:
+            if json_data:
+                parsed_json = json.loads(json_data)
+            else:
+                raise ValueError("JSON 数据为空")
+        except json.JSONDecodeError:
+            raise ValueError(f"无法解析 JSON 数据: {json_data}")
+
+        # 将解析后的 JSON 数据添加到列表中
+        ddt_cases.append(parsed_json)
+
+    # 日志记录:XLSX文件已读取
+    INFO("XLSX文件已读取")
+    # 返回包含所有测试用例数据的列表
+    return ddt_cases
+
+# 通用方法-转换字符串类型
+def get_by_enum(type_str):
+    """
+    将字符串类型的定位器类型转换为 selenium.webdriver.common.by.By 枚举类型。
+
+    参数:
+    type_str (str): 定位器类型字符串,例如 'XPATH'。
+
+    返回:
+    selenium.webdriver.common.by.By: 对应的 By 枚举类型。
+    """
+    type_str = type_str.upper()
+    if type_str == 'XPATH':
+        return By.XPATH
+    elif type_str == 'ID':
+        return By.ID
+    elif type_str == 'NAME':
+        return By.NAME
+    elif type_str == 'CLASS_NAME':
+        return By.CLASS_NAME
+    elif type_str == 'CSS_SELECTOR':
+        return By.CSS_SELECTOR
+    elif type_str == 'TAG_NAME':
+        return By.TAG_NAME
+    elif type_str == 'LINK_TEXT':
+        return By.LINK_TEXT
+    elif type_str == 'PARTIAL_LINK_TEXT':
+        return By.PARTIAL_LINK_TEXT
+    else:
+        raise ValueError(f"未知的定位器类型: {type_str}")
+
+# 通用方法-用户登录
+def user_login(element_locators, username, password, verifycode):
+    """
+    管理员登录函数。
+    该函数通过模拟用户输入用户名、密码和验证码,并点击登录按钮,以实现管理员登录。
+    """
+    # 获取webdriver实例
+    wd = GSTORE['wd']
+
+    # 打印用户名输入信息
+    INFO(f"输入用户名:{username}")
+    # 向用户名输入框发送用户名
+    safe_send_keys(element_locators['username_locator'], f'{username}', wd)
+    sleep(5)
+
+    # 打印密码输入信息
+    INFO(f"输入密码:{password}")
+    # 向密码输入框发送密码
+    safe_send_keys(element_locators['password_locator'], f"{password}", wd)
+    sleep(5)
+
+    # 打印验证码输入信息
+    INFO(f"输入验证码:{verifycode}")
+    # 向验证码输入验证码
+    safe_send_keys(element_locators['verifycode_locator'], f"{verifycode}", wd)
+    sleep(5)
+
+    #点击登录按钮
+    INFO("点击登录按钮")
+    safe_click(element_locators['submitButton_locator'], wd)
+    sleep(5)
Index: 运维集控/项目测试/运维标准版/testdata/08设备管理/新增设备.csv
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/运维集控/项目测试/运维标准版/testdata/08设备管理/新增设备.csv b/运维集控/项目测试/运维标准版/testdata/08设备管理/新增设备.csv
--- a/运维集控/项目测试/运维标准版/testdata/08设备管理/新增设备.csv	(revision 6cedec8cd6072a6338b10f2314cd65d7c35c57f7)
+++ b/运维集控/项目测试/运维标准版/testdata/08设备管理/新增设备.csv	(date 1741229657231)
@@ -1,0 +1,2 @@
+name,area_name,device_name,type,brand,model,sn,ip,tag,supplier,contact,price,expiry_time,sort,useYear,description,relateMaster,outMaster
+新增设备-001-正常新增设备,测试区域11,测试设备0,智慧屏设备,华为品牌,Ideahub,123456789,192.168.1.1,资产设备1,华为,13141234567,2000,2025-01-01 12:00:00,1,3,用途说明1,否,否
\ No newline at end of file
Index: 运维集控/项目测试/运维标准版/cases/07类型标签/01新增标签.py
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/运维集控/项目测试/运维标准版/cases/07类型标签/01新增标签.py b/运维集控/项目测试/运维标准版/cases/07类型标签/01新增标签.py
new file mode 100644
--- /dev/null	(date 1741229657191)
+++ b/运维集控/项目测试/运维标准版/cases/07类型标签/01新增标签.py	(date 1741229657191)
@@ -0,0 +1,85 @@
+import sys
+import os
+
+from hytest.common import SELENIUM_LOG_SCREEN
+
+sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', '..', '..')))
+from 运维集控.项目测试.运维标准版.lib.base import *
+
+#构建当前项目路径
+sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', '..', '..')))
+# 构建 CSV 文件的绝对路径
+csv_path = os.path.abspath(
+    os.path.join(os.path.dirname(__file__), '..', '..', 'testdata', '03类型标签', '新增标签.csv'))
+
+class AreagroupAdd:
+    tag = ['新增类型标签']
+    ddt_cases = read_csv_data(csv_path)
+    def teststeps(self):
+        wd = GSTORE['wd']
+        # 从self.para中解构出数据
+        name = self.name
+        area_group, group_name, group_address, remark, info = self.para
+
+        STEP(1, '点击新增按钮')
+        areagroup_add = WebDriverWait(wd, 10).until(
+            EC.element_to_be_clickable((By.XPATH, "//div[@class='company-edmit-right']//span[contains(text(),'新增')]"))
+        )
+        areagroup_add.click()
+
+        STEP(2, f'查看并选择类型标签:{area_group}')
+        if area_group:
+            area_group_input = WebDriverWait(wd, 10).until(
+                EC.presence_of_element_located((By.XPATH, "// input[ @ placeholder = '请选择分组']"))
+            )
+            area_group_input.clear()
+            area_group_input.send_keys(area_group)
+            sleep(1)
+            #默认选择第一个内容
+            area_group_select = WebDriverWait(wd, 10).until(
+                EC.presence_of_element_located((By.XPATH, "//div[@class='el-cascader__suggestion-panel el-scrollbar']//li[1]//span[1]"))
+            )
+            area_group_select.click()
+        else:
+            print("group_name 为空,不执行选择区域分组的操作")
+
+        STEP(3, f'填写标签名称:{group_name}')
+        group_name_input = WebDriverWait(wd, 10).until(
+            EC.presence_of_element_located((By.XPATH, "//input[@placeholder='长度1-20个字符']"))
+        )
+        group_name_input.clear()
+        group_name_input.send_keys(group_name)
+
+        STEP(4, f'填写标签排序:{group_address}')
+        group_address_input = WebDriverWait(wd, 10).until(
+            EC.presence_of_element_located((By.XPATH, "//input[@placeholder='输入地址不能大于50个字符']"))
+        )
+        group_address_input.clear()
+        group_address_input.send_keys(group_address)
+
+        STEP(5, f'填写分组备注:{remark}')
+        group_remark = WebDriverWait(wd, 10).until(
+            EC.presence_of_element_located(
+                (By.XPATH, "//input[@placeholder='请输入备注']"))
+        )
+        group_remark.clear()
+        group_remark.send_keys(remark)
+
+        STEP(6, '点击确认')
+        commit = WebDriverWait(wd, 10).until(
+            EC.element_to_be_clickable((By.XPATH, "//div[@class='dialog-footer']//span[contains(text(),'确 定')]"))
+        )
+        commit.click()
+
+        STEP(7, '验证是否新增成功')
+        get_menu = WebDriverWait(wd, 10).until(
+            EC.visibility_of_element_located((By.XPATH, "//p[@class='el-message__content']"))
+        )
+        get_menu1 = get_menu.text
+        CHECK_POINT('检查是否出现成功提示弹窗', get_menu1 == info )
+
+        # 截图并保存
+        SELENIUM_LOG_SCREEN(wd, "50%")
+        sleep(1)
+        wd.refresh()
+
Index: 预定系统/Base/base.py
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.BaseRevisionTextPatchEP
<+>import csv\r\nimport glob\r\nimport re\r\nimport urllib\r\nfrom selenium.webdriver.chrome.service import Service\r\nimport json\r\nimport hmac\r\nimport hashlib\r\nimport base64\r\nimport psutil\r\nimport time\r\nimport subprocess\r\nimport logging\r\nfrom hytest import *\r\nfrom selenium import webdriver\r\nfrom selenium.common import ElementNotInteractableException\r\nfrom selenium.webdriver.common.keys import Keys\r\nfrom urllib.parse import urlencode\r\nfrom datetime import datetime\r\n\r\n# 获取当前脚本的绝对路径\r\ncurrent_dir = os.path.dirname(os.path.abspath(__file__))\r\n# 获取当前脚本的父目录\r\nparent_dir = os.path.dirname(current_dir)\r\nlogging.info(parent_dir)\r\n# 添加路径\r\nsys.path.append(current_dir)\r\n\r\n# 配置日志记录器,仅输出到控制台\r\nlogging.basicConfig(\r\n    level=logging.DEBUG,\r\n    format='%(asctime)s - %(levelname)s - %(message)s',\r\n    handlers=[\r\n        logging.StreamHandler()\r\n    ]\r\n)\r\n\r\ndef browser_init(login_type):\r\n    \"\"\"\r\n    初始化浏览器设置和实例。\r\n\r\n    此函数旨在创建并配置一个Chrome浏览器实例,包括设置Chrome选项以排除不必要的日志,\r\n    并尝试打开特定的登录页面。任何初始化过程中出现的错误都会被捕获并记录。\r\n\r\n    参数:\r\n    login_type (str): 指定登录类型,根据不同的登录类型选择不同的URL。\r\n\r\n    返回:\r\n    无\r\n    \"\"\"\r\n    # 标记初始化过程的开始\r\n    INFO(\"'----------' 正在初始化浏览器 '----------'\")\r\n\r\n    # 创建Chrome选项实例,用于配置浏览器行为\r\n    options = webdriver.ChromeOptions()\r\n    # 添加实验性选项,排除某些命令行开关以减少输出日志\r\n    options.add_experimental_option('excludeSwitches', ['enable-Logging'])\r\n    # 忽略证书错误,允许在本地主机上运行时不安全\r\n    options.add_argument('--ignore-certificate-errors')\r\n    # 禁用自动化控制特征检测,避免被网站识别为自动化流量\r\n    options.add_argument('--disable-blink-features=AutomationControlled')\r\n    # 允许不安全的本地主机运行,通常用于开发和测试环境\r\n    options.add_argument('--allow-insecure-localhost')\r\n\r\n    # 使用webdriver_manager自动下载并管理chromedriver\r\n    # service = ChromeService(ChromeDriverManager().install())\r\n    # 使用备用的ChromeDriver下载源\r\n    # service = Service(ChromeDriverManager().install())\r\n    # 手动指定ChromeDriver的路径\r\n    # 自动化运行服务器的chromedriver路径:\r\n    service = Service(r'C:\\Users\\29194\\AppData\\Local\\Programs\\Python\\Python310\\Scripts\\chromedriver.exe')\r\n    # service = Service(r'C:\\Program Files\\Python310\\Scripts\\chromedriver.exe')\r\n    # 尝试创建WebDriver实例并执行初始化操作\r\n    try:\r\n        # 创建WebDriver实例\r\n        wd = webdriver.Chrome(service=service, options=options)\r\n        # 设置隐式等待时间为10秒,以允许元素加载\r\n        wd.implicitly_wait(60)\r\n\r\n        # 获取登录URL\r\n        login_url = get_login_url_from_config(login_type)\r\n        # 打开对应类型的登录页面\r\n        wd.get(login_url)\r\n        # 最大化浏览器窗口\r\n        wd.maximize_window()\r\n\r\n        # 将WebDriver实例存储在全局存储器中,以便后续使用\r\n        GSTORE['wd'] = wd\r\n        # 标记初始化过程完成\r\n        INFO(\"'----------' 浏览器初始化完成 '----------'\")\r\n    except Exception as e:\r\n        # 捕获并记录初始化过程中的任何异常\r\n        logging.error(f\"浏览器初始化失败:{e}\")\r\n\r\n\r\ndef get_login_url_from_config(login_type):\r\n    \"\"\"\r\n    从配置文件中读取登录URL。\r\n\r\n    参数:\r\n    login_type (str): 指定登录类型,根据不同的登录类型选择不同的URL。\r\n\r\n    返回:\r\n    str: 对应的登录URL。\r\n    \"\"\"\r\n    # 检查 login_type 是否为空或 None\r\n    if not login_type:\r\n        raise ValueError(\"login_type 不能为空\")\r\n\r\n    # 获取当前脚本的绝对路径\r\n    current_dir = os.path.dirname(os.path.abspath(__file__))\r\n    # 构建配置文件的绝对路径,指向 ubains-module-test 目录下的 config.json\r\n    config_path = os.path.abspath(os.path.join(current_dir, '..', '..', 'config.json'))\r\n    # 规范化路径,防止路径遍历攻击\r\n    config_path = os.path.normpath(config_path)\r\n\r\n    # 记录配置文件路径以便调试,对路径进行脱敏处理\r\n    logging.info(f\"配置文件路径: {os.path.basename(config_path)}\")\r\n\r\n    # 检查文件是否存在\r\n    if not os.path.exists(config_path):\r\n        # 如果配置文件不存在,则抛出异常\r\n        raise FileNotFoundError(f\"配置文件 {config_path} 不存在\")\r\n\r\n    try:\r\n        # 读取配置文件\r\n        with open(config_path, 'r', encoding='utf-8') as config_file:\r\n            # 将配置文件内容解析为 JSON 格式\r\n            config = json.load(config_file)\r\n            # 根据 login_type 获取对应的登录 URL\r\n            login_url = config.get(login_type)\r\n            # 记录正在打开的登录页面类型和 URL\r\n            logging.info(f\"正在打开 {login_type} 的登录页面:{login_url}\")\r\n    except IOError as e:\r\n        # 处理文件读取异常\r\n        raise IOError(f\"读取配置文件失败: {e}\")\r\n    except json.JSONDecodeError as e:\r\n        # 处理 JSON 解析异常\r\n        raise json.JSONDecodeError(f\"解析配置文件失败: {e}\")\r\n\r\n    # 检查是否成功获取到 URL\r\n    if not login_url:\r\n        # 如果未找到对应的 URL,则抛出异常\r\n        raise ValueError(f\"未找到对应的 URL 配置项: {login_type}\")\r\n\r\n    # 返回登录 URL\r\n    return login_url\r\n\r\n\r\ndef admin_login(username, password):\r\n    \"\"\"\r\n    管理员登录函数。\r\n    该函数通过模拟用户输入用户名、密码和验证码,并点击登录按钮,以实现管理员登录。\r\n    \"\"\"\r\n    # 获取webdriver实例\r\n    wd = GSTORE['wd']\r\n\r\n    # 打印用户名输入信息\r\n    INFO(f\"输入用户名:{username}\")\r\n    # 向用户名输入框发送用户名\r\n    safe_send_keys((By.XPATH, \"//input[@placeholder='请输入账号或手机号或邮箱号']\"), f'{username}', wd)\r\n\r\n    # 打印密码输入信息\r\n    INFO(f\"输入密码:{password}\")\r\n    # 向密码输入框发送密码\r\n    safe_send_keys((By.XPATH, \"//input[@placeholder='请输入密码']\"), f\"{password}\", wd)\r\n\r\n    # 打印验证码输入信息\r\n    INFO(\"输入验证码:csba\")\r\n    # 向验证码输入框发送验证码\r\n    safe_send_keys((By.XPATH, \"//input[@placeholder='请输入图形验证码']\"), \"csba\", wd)\r\n    # 隐式等待5秒,以确保验证码被正确处理\r\n    wd.implicitly_wait(5)\r\n\r\n    # 打印登录按钮点击信息\r\n    INFO(\"点击登录按钮\")\r\n    # 点击登录按钮\r\n    safe_click((By.XPATH, \"//input[@value='登 录']\"), wd)\r\n\r\ndef enter_the_backend():\r\n    \"\"\"\r\n    进入后台系统界面。\r\n    该函数通过模拟点击操作,找到并点击后台系统入口,以进入后台界面。\r\n    \"\"\"\r\n    # 记录进入后台系统的操作信息\r\n    INFO(\"进入后台\")\r\n\r\n    # 获取webdriver对象,用于后续的页面元素操作\r\n    wd = GSTORE['wd']\r\n\r\n    # 执行点击操作,通过XPath定位到后台系统入口图标并点击\r\n    safe_click((By.XPATH, \"//img[@title='后台系统']\"), wd)\r\n\r\ndef safe_send_keys(element_locator, value, wd):\r\n    \"\"\"\r\n    安全地向网页元素发送键值。\r\n    该函数尝试在指定时间内找到指定的网页元素,如果找到并且元素可见,将先清除元素内容,然后发送指定的键值。\r\n    如果在指定时间内未能找到元素或元素不可点击,则捕获相应的异常并打印错误信息。\r\n    参数:\r\n    element_locator (tuple): 用于定位网页元素的策略和定位器的元组,例如(By.ID, 'element_id')。\r\n    value (str): 要发送到元素的键值字符串。\r\n    wd: WebDriver实例,用于与浏览器交互。\r\n    异常处理:\r\n    - TimeoutException: 如果元素在指定时间内未被找到或不可点击。\r\n    - NoSuchElementException: 如果元素不存在。\r\n    - ElementNotInteractableException: 如果元素存在但不可交互。\r\n    \"\"\"\r\n    try:\r\n        # 等待元素在指定时间内可见\r\n        element = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator))\r\n        element.clear()  # 清除元素的当前值\r\n        element.send_keys(value)  # 向元素发送指定的键值\r\n    except TimeoutException:\r\n        # 如果元素在指定时间内未被找到或不可点击,打印超时异常信息\r\n        INFO(f\"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.\")\r\n    except NoSuchElementException:\r\n        # 如果元素不存在,打印相应异常信息\r\n        INFO(f\"NoSuchElementException: Element {element_locator} not found.\")\r\n    except ElementNotInteractableException:\r\n        # 如果元素不可交互,打印相应异常信息\r\n        INFO(f\"ElementNotInteractableException: Element {element_locator} is not interactable.\")\r\n\r\ndef safe_click(element_locator, wd):\r\n    \"\"\"\r\n    对其定位器指定的元素执行安全单击。\r\n    此函数尝试以处理潜在异常的方式单击元素。\r\n    它等待元素可见并可单击,然后再尝试单击它。\r\n    如果该元素在20秒内无法点击,或者它不存在,\r\n    或者不可交互,它会捕获相应的异常并记录一条信息性消息。\r\n    参数:\r\n    -element_locator:要单击的元素的定位器,指定为元组。\r\n    -wd:用于查找元素并与之交互的WebDriver实例。\r\n    \"\"\"\r\n    try:\r\n        # Wait up to 20 seconds for the element to be visible\r\n        element = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator))\r\n        # Attempt to click the element\r\n        element.click()\r\n    except TimeoutException:\r\n        # Log a message if the element is not found or not clickable within 20 seconds\r\n        INFO(f\"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.\")\r\n    except NoSuchElementException:\r\n        # Log a message if the element is not found\r\n        INFO(f\"NoSuchElementException: Element {element_locator} not found.\")\r\n    except ElementNotInteractableException:\r\n        # Log a message if the element is not interactable\r\n        INFO(f\"ElementNotInteractableException: Element {element_locator} is not interactable.\")\r\n\r\nfrom time import sleep\r\nfrom selenium.webdriver.common.by import By\r\n\r\ndef issue_send_and_upload(wd, issue_num, issue_name):\r\n    \"\"\"\r\n        输入议题名称以及上传议题文件。\r\n    \"\"\"\r\n\r\n    issue_file_path = [\r\n        r\"D:\\GithubData\\自动化\\ubains-module-test\\预定系统\\reports\\issue_file\\5.164Scan 安全报告.pdf\",\r\n        r\"D:\\GithubData\\自动化\\ubains-module-test\\预定系统\\reports\\issue_file\\IdeaTop软件配置&操作说明文档.docx\",\r\n        r\"D:\\GithubData\\自动化\\ubains-module-test\\预定系统\\reports\\issue_file\\ideaTop部署配置视频.mp4\",\r\n        r\"D:\\GithubData\\自动化\\ubains-module-test\\预定系统\\reports\\issue_file\\IdeaTop软件配置&操作说明文档.docx\",\r\n        r\"D:\\GithubData\\自动化\\ubains-module-test\\预定系统\\reports\\issue_file\\议题图片.png\"\r\n    ]\r\n\r\n    INFO(f\"输入议题名称:{issue_name}\")\r\n    # 输入议题名称\r\n    safe_send_keys((By.XPATH, f\"(//input[@placeholder='请输入会议议题'])[1]\"), f\"{issue_name}\", wd)\r\n\r\n    # 选择议题文件进行上传\r\n    INFO(\"点击【上传文件】按钮\")\r\n    safe_click((By.XPATH, f\"(//div[@class='topicsHandleButton uploadFile'][contains(text(),'上传文件(0)')])[1]\"), wd)\r\n    sleep(2)\r\n\r\n    for i in range(issue_num):\r\n        if not os.path.exists(issue_file_path[i]):\r\n            INFO(f\"文件 {issue_file_path[i]} 不存在,跳出函数\")\r\n            return\r\n\r\n        # INFO(\"定位【选择文件】按钮\")\r\n        upload_button = wd.find_element(By.XPATH, '//*[@id=\"global-uploader-btn\"]/input')\r\n        sleep(2)\r\n        # INFO(f\"元素定位:{upload_button}\")\r\n        # INFO(\"选择议题文件上传\")\r\n        upload_button.send_keys(issue_file_path[i])\r\n        # INFO(f\"第{i+1}个议题文件上传完成\")\r\n        sleep(15)\r\n\r\n    SELENIUM_LOG_SCREEN(wd, \"50%\", \"Exhibit_Inspect\", \"Meeting_Message\", \"添加议题文件\")\r\n    # 点击【确定】按钮\r\n    safe_click((By.XPATH, \"//div[@aria-label='会议文件上传']//div[@class='el-dialog__footer']//div//span[contains(text(),'确定')]\"), wd)\r\n    sleep(2)\r\n\r\ndef input_clear(element_locator, wd):\r\n    \"\"\"\r\n    清空输入框中的文本。\r\n\r\n    该函数通过元素定位器找到指定的输入框,并尝试清空其内容。它使用显式等待来确保元素在尝试清除之前是可见的。\r\n\r\n    参数:\r\n    - element_locator: 用于定位输入框的元素定位器,通常是一个元组,包含定位方法和定位表达式。\r\n    - wd: WebDriver实例,用于与浏览器交互。\r\n\r\n    异常处理:\r\n    - TimeoutException: 如果在指定时间内元素不可见,则捕获此异常并打印超时异常消息。\r\n    - NoSuchElementException: 如果找不到指定的元素,则捕获此异常并打印未找到元素的消息。\r\n    - ElementNotInteractableException: 如果元素不可操作(例如,元素不可见或不可点击),则捕获此异常并打印相应消息。\r\n    \"\"\"\r\n    try:\r\n        # 等待元素可见,并在可见后清空输入框。\r\n        input_element = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator))\r\n        input_element.clear()\r\n    except TimeoutException:\r\n        # 如果元素在20秒内不可见,打印超时异常消息。\r\n        print(f\"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.\")\r\n    except NoSuchElementException:\r\n        # 如果找不到元素,打印未找到元素的消息。\r\n        print(f\"NoSuchElementException: Element {element_locator} not found.\")\r\n    except ElementNotInteractableException:\r\n        # 如果元素不可操作,打印元素不可操作的消息。\r\n        print(f\"ElementNotInteractableException: Element {element_locator} is not interactable.\")\r\n\r\ndef send_keyboard(element_locator, wd):\r\n    \"\"\"\r\n    向指定元素发送键盘事件。\r\n\r\n    该函数尝试找到页面上的一个元素,并向其发送RETURN键点击事件。它使用显式等待来确保元素在尝试交互之前是可见的。\r\n    如果在指定时间内元素不可见、不存在或不可交互,则捕获相应的异常并打印错误消息。\r\n\r\n    参数:\r\n    - element_locator: 用于定位元素的元组,格式为(by, value)。例如,(By.ID, 'element_id')。\r\n    - wd: WebDriver实例,用于与浏览器进行交互。\r\n\r\n    异常处理:\r\n    - TimeoutException: 如果元素在20秒内不可见,则捕获此异常并打印超时错误消息。\r\n    - NoSuchElementException: 如果找不到指定的元素,则捕获此异常并打印未找到元素的错误消息。\r\n    - ElementNotInteractableException: 如果元素不可交互(例如,被遮挡或不可点击),则捕获此异常并打印相应错误消息。\r\n    \"\"\"\r\n    try:\r\n        # 等待元素可见,并在可见后向其发送RETURN键点击事件。\r\n        element = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator))\r\n        element.send_keys(Keys.RETURN)\r\n    except TimeoutException:\r\n        # 如果元素在指定时间内不可见,打印超时错误消息。\r\n        print(f\"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.\")\r\n    except NoSuchElementException:\r\n        # 如果找不到指定的元素,打印未找到元素的错误消息。\r\n        print(f\"NoSuchElementException: Element {element_locator} not found.\")\r\n    except ElementNotInteractableException:\r\n        # 如果元素不可交互,打印不可交互错误消息。\r\n        print(f\"ElementNotInteractableException: Element {element_locator} is not interactable.\")\r\n\r\ndef get_notify_text(wd,element_locator,module_name,function_name,name):\r\n    \"\"\"\r\n    获取通知文本信息。\r\n\r\n    该函数通过WebDriver等待特定的元素出现,并截取其文本内容作为通知信息。此外,它还负责在获取通知文本后进行屏幕截图,\r\n    以便于后续的调试或记录需要。\r\n\r\n    参数:\r\n    wd (WebDriver): 由上层传入的WebDriver对象,用于操作浏览器。\r\n\r\n    返回:\r\n    str: 提取的通知文本信息。如果未能提取到信息或发生异常,则返回None。\r\n    \"\"\"\r\n    try:\r\n        # 获取提示信息\r\n        notify_text = WebDriverWait(wd, 60).until(\r\n            EC.presence_of_element_located(element_locator)\r\n        ).text\r\n        # 屏幕截图\r\n        SELENIUM_LOG_SCREEN(wd,\"50%\",module_name,function_name,name)\r\n        return notify_text\r\n    except Exception as e:\r\n        # 记录异常信息\r\n        INFO(f\"Exception occurred: {e}\")\r\n\r\ndef elment_get_text(element_locator, wd):\r\n    \"\"\"\r\n    获取页面元素的文本。\r\n\r\n    该函数通过显式等待的方式,确保页面元素在20秒内可见并获取其文本。\r\n    如果在规定时间内元素不可见、不存在或不可交互,则会捕获相应的异常并打印错误信息。\r\n\r\n    参数:\r\n    - element_locator: 用于定位页面元素的元组,通常包含定位方式和定位值。\r\n    - wd: WebDriver对象,用于与浏览器进行交互。\r\n\r\n    返回:\r\n    - element_text: 页面元素的文本。如果发生异常,则返回None。\r\n    \"\"\"\r\n    try:\r\n        # 使用WebDriverWait等待页面元素在20秒内可见,并获取其文本。\r\n        element_text = WebDriverWait(wd, 60).until(EC.visibility_of_element_located(element_locator)).text\r\n        return element_text\r\n    except TimeoutException:\r\n        # 如果超过20秒元素仍未可见,则捕获TimeoutException异常并打印错误信息。\r\n        print(f\"TimeoutException: Element {element_locator} not found or not clickable within 20 seconds.\")\r\n    except NoSuchElementException:\r\n        # 如果页面上不存在该元素,则捕获NoSuchElementException异常并打印错误信息。\r\n        print(f\"NoSuchElementException: Element {element_locator} not found.\")\r\n    except ElementNotInteractableException:\r\n        # 如果元素存在但不可交互(例如被遮挡),则捕获ElementNotInteractableException异常并打印错误信息。\r\n        print(f\"ElementNotInteractableException: Element {element_locator} is not interactable.\")\r\n\r\ndef read_csv_data(csv_file_path):\r\n    \"\"\"\r\n    读取CSV文件中的数据,并将其转换为一个包含字典的列表,每个字典代表一行测试用例数据。\r\n\r\n    参数:\r\n    csv_file_path (str): CSV文件的路径。\r\n\r\n    返回:\r\n    list: 包含字典的列表,每个字典包含测试用例的名称和参数。\r\n    \"\"\"\r\n    # 打开CSV文件,使用只读模式,确保兼容性并处理编码\r\n    with open(csv_file_path, mode='r', newline='', encoding='utf-8') as file:\r\n        # 创建CSV阅读器\r\n        reader = csv.reader(file)\r\n        # 读取表头,为后续数据解析做准备\r\n        headers = next(reader)\r\n        ddt_cases = []\r\n        # 遍历CSV文件中的每一行数据\r\n        for row in reader:\r\n            # 将每一行数据转换为字典,其中包含测试用例的名称和参数\r\n            case = {\r\n                'name': row[0],\r\n                'para': row[1:]\r\n            }\r\n            # 将转换后的测试用例数据添加到列表中\r\n            ddt_cases.append(case)\r\n    # 日志记录:CSV文件已读取\r\n    INFO(\"CSV文件已读取\")\r\n    # 返回包含所有测试用例数据的列表\r\n    return ddt_cases\r\n\r\nimport openpyxl\r\n\r\ndef read_xlsx_data(xlsx_file_path):\r\n    \"\"\"\r\n    读取XLSX文件中的数据,并将其转换为一个包含字典的列表,每个字典代表一行测试用例数据。\r\n\r\n    参数:\r\n    xlsx_file_path (str): XLSX文件的路径。\r\n\r\n    返回:\r\n    list: 包含字典的列表,每个字典包含测试用例的名称和参数。\r\n    \"\"\"\r\n    # 打开XLSX文件\r\n    workbook = openpyxl.load_workbook(xlsx_file_path)\r\n    # 假设数据在第一个工作表中\r\n    sheet = workbook.active\r\n\r\n    # 读取表头,从第三行开始\r\n    headers = [cell.value for cell in sheet[3]]\r\n\r\n    # 打印表头列名\r\n    # print(f\"表头列名: {headers}\")\r\n\r\n    # 找到表头中名为 'JSON' 的列索引\r\n    try:\r\n        json_index = headers.index('JSON')\r\n    except ValueError as e:\r\n        raise ValueError(f\"表头中没有找到所需的列: {e}\")\r\n\r\n    ddt_cases = []\r\n    # 遍历XLSX文件中的每一行数据,从第四行开始\r\n    for row in sheet.iter_rows(min_row=4, values_only=True):\r\n        # 获取 JSON 列的数据\r\n        json_data = row[json_index]\r\n\r\n        # 打印 JSON 数据以进行调试\r\n        # print(f\"JSON 数据: {json_data}\")\r\n\r\n        # 解析 JSON 字符串\r\n        try:\r\n            if json_data:\r\n                parsed_json = json.loads(json_data)\r\n            else:\r\n                raise ValueError(\"JSON 数据为空\")\r\n        except json.JSONDecodeError:\r\n            raise ValueError(f\"无法解析 JSON 数据: {json_data}\")\r\n\r\n        # 将解析后的 JSON 数据添加到列表中\r\n        ddt_cases.append(parsed_json)\r\n\r\n    # 日志记录:XLSX文件已读取\r\n    INFO(\"XLSX文件已读取\")\r\n    # 返回包含所有测试用例数据的列表\r\n    return ddt_cases\r\n\r\ndef get_cpu_usage(interval=1):\r\n    \"\"\"\r\n    获取当前进程的 CPU 占用率。\r\n    :param interval: 计算 CPU 使用率的时间间隔(秒)\r\n    :return: 当前进程的 CPU 占用率(百分比)\r\n    \"\"\"\r\n    try:\r\n        process = psutil.Process(os.getpid())\r\n        cpu_usage = process.cpu_percent(interval=interval)\r\n        if isinstance(cpu_usage, (int, float)):\r\n            return cpu_usage\r\n        else:\r\n            logging.error(\"CPU 使用率数据类型不正确\")\r\n            return None\r\n    except psutil.NoSuchProcess:\r\n        logging.error(\"进程不存在\")\r\n        return None\r\n    except psutil.AccessDenied:\r\n        logging.error(\"权限不足\")\r\n        return None\r\n    except Exception as e:\r\n        logging.error(f\"未知错误: {e}\")\r\n        return None\r\n\r\ndef delete_images_in_directory(directory):\r\n    # 指定要删除的图片文件扩展名\r\n    image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff']\r\n\r\n    # 遍历目录中的所有文件\r\n    for filename in os.listdir(directory):\r\n        # 获取文件的完整路径\r\n        file_path = os.path.join(directory, filename)\r\n\r\n        # 检查文件是否为图片文件\r\n        if any(filename.lower().endswith(ext) for ext in image_extensions):\r\n            try:\r\n                # 删除文件\r\n                os.remove(file_path)\r\n                print(f\"已删除文件: {file_path}\")\r\n            except Exception as e:\r\n                print(f\"删除文件 {file_path} 时出错: {e}\")\r\n\r\ndef is_valid_password(password):\r\n    try:\r\n        # 基本类型检查\r\n        if not isinstance(password, str):\r\n            raise ValueError(\"Password must be a string\")\r\n\r\n        # 检查长度,密码至少需要11个字符\r\n        if len(password) < 11:\r\n            return False\r\n\r\n        # 使用正则表达式检查密码是否包含大小写字母和数字\r\n        if not re.match(r'^(?=.*[a-z])(?=.*[A-Z])(?=.*\\d).{11,}$', password):\r\n            return False\r\n\r\n        # 检查连续3位及以上不重复且不连续组合\r\n        for i in range(len(password) - 2):\r\n            # 检查是否有连续3位相同\r\n            if password[i] == password[i + 1] == password[i + 2]:\r\n                return False\r\n\r\n            # 检查是否有连续3位是连续字符(如123, abc)\r\n            if abs(ord(password[i + 1]) - ord(password[i])) == 1 and abs(ord(password[i + 2]) - ord(password[i + 1])) == 1:\r\n                return False\r\n\r\n        return True\r\n    except Exception as e:\r\n        logging.error(f\"An error occurred: {e}\")\r\n        return False\r\n\r\ndef browser_quit():\r\n    \"\"\"\r\n    退出浏览器并释放资源。\r\n\r\n    该函数从全局存储中获取浏览器驱动实例,并调用其quit方法来关闭浏览器并释放相关资源。\r\n    \"\"\"\r\n    # 清除浏览器\r\n    INFO(\"清除浏览器\")\r\n    # 从全局存储中获取浏览器驱动实例\r\n    wd = GSTORE['wd']\r\n    # 调用浏览器驱动实例的quit方法,关闭浏览器并释放资源\r\n    wd.quit()\r\n\r\n\r\ndef get_latest_report_file(report_dir, base_url):\r\n    \"\"\"\r\n    获取指定目录下最新的HTML报告文件,并返回带有基础URL的完整路径。\r\n\r\n    :param report_dir: 报告文件所在的目录\r\n    :param base_url: 基础URL\r\n    :return: 最新的HTML报告文件的完整URL,如果没有找到则返回None\r\n    \"\"\"\r\n    # 记录调用此函数的日志\r\n    logging.info(\"开始调用get_latest_report_file函数获取报告文件\")\r\n\r\n    # 确保报告目录存在\r\n    if not os.path.exists(report_dir):\r\n        logging.error(f\"报告目录 {report_dir} 不存在。\")\r\n        return None\r\n\r\n    # 获取指定目录下所有符合模式的HTML报告文件\r\n    report_files = glob.glob(os.path.join(report_dir, 'report_*.html'))\r\n\r\n    # 打印找到的文件列表\r\n    logging.debug(f\"找到的报告文件: {report_files}\")\r\n\r\n    # 如果没有找到报告文件,记录警告信息并返回None\r\n    if not report_files:\r\n        logging.warning(\"在指定目录中没有找到报告文件。\")\r\n        return None\r\n\r\n    # 找到最新修改的报告文件\r\n    latest_file = max(report_files, key=os.path.getmtime)\r\n\r\n    # 获取最新报告文件的最后修改时间\r\n    last_modified_time = datetime.fromtimestamp(os.path.getmtime(latest_file)).strftime('%Y-%m-%d %H:%M:%S')\r\n\r\n    # 记录最新报告文件的信息\r\n    logging.info(f\"最新报告文件: {latest_file}, 最后修改时间: {last_modified_time}\")\r\n\r\n    # 将文件路径转换为相对于基础URL的相对路径\r\n    relative_path = os.path.relpath(latest_file, report_dir)\r\n\r\n    # 生成完整的URL\r\n    full_url = f\"{base_url}/{relative_path}\".replace(\"\\\\\", \"/\")\r\n\r\n    # 返回完整的URL\r\n    return full_url\r\n\r\ndef dingding_send_message(latest_report, title, mobile, ding_type):\r\n    \"\"\"\r\n    发送钉钉机器人消息\r\n    参考接口文档:https://open.dingtalk.com/document/orgapp/custom-robots-send-group-messages#title-7fs-kgs-36x\r\n\r\n    :param latest_report: 测试报告链接\r\n    :param title: 消息标题\r\n    :param text: 消息内容\r\n    :param mobile: 需要@的手机号列表\r\n    \"\"\"\r\n    # 记录调用此函数的日志\r\n    logging.info(\"开始构建并发送钉钉机器人消息\")\r\n    # 钉钉机器人的 Webhook URL 和密钥(正式环境)\r\n    # webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=b0eea0bbf097ce3badb4c832d2cd0267a50486f395ec8beca6e2042102bb295b'\r\n    # secret = 'SEC928b11659c5fd6476cfa2042edbf56da876abf759289f7e4d3c671fb9a81bf43'\r\n    # 钉钉机器人的 Webhook URL 和密钥(测试环境)\r\n\r\n    if ding_type ==  '标准版巡检':\r\n        webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=7fbf40798cad98b1b5db55ff844ba376b1816e80c5777e6f47ae1d9165dacbb4'\r\n        secret = 'SEC610498ed6261ae2df1d071d0880aaa70abf5e67efe47f75a809c1f2314e0dbd6'\r\n    elif ding_type == '展厅巡检':\r\n        webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=061b6e9b1ae436f356cfda7fe19b6e58e46b62670046a78bd3a4d869118c612d'\r\n        secret = 'SEC93212bd880aad638cc0df2b28a72ef4fdf6651cacb8a6a4bc71dcf09705d458d'\r\n\r\n    # 生成时间戳\r\n    timestamp = str(round(time.time() * 1000))\r\n\r\n    # 生成签名\r\n    secret_enc = secret.encode('utf-8')\r\n    string_to_sign = f'{timestamp}\\n{secret}'\r\n    string_to_sign_enc = string_to_sign.encode('utf-8')\r\n    hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()\r\n    sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))\r\n\r\n    # 构建最终的 Webhook URL\r\n    params = {\r\n        'access_token': webhook_url.split('=')[1],\r\n        'timestamp': timestamp,\r\n        'sign': sign\r\n    }\r\n    encoded_params = urllib.parse.urlencode(params)\r\n    final_webhook_url = f'https://oapi.dingtalk.com/robot/send?{encoded_params}'\r\n\r\n    # 记录最终的 Webhook URL\r\n    logging.info(f\"钉钉机器人Webhook URL: {final_webhook_url}\")\r\n\r\n    # 调用测试结果获取函数\r\n    browser_init(\"标准版预定系统\")\r\n    wd = GSTORE['wd']\r\n    # print(latest_report)\r\n    test_result = get_test_result(latest_report, wd)\r\n    browser_quit()\r\n\r\n    # 构建消息体\r\n    headers = {'Content-Type': 'application/json'}\r\n    message = {\r\n        'msgtype': 'link',\r\n        'link': {\r\n            'title': title,\r\n            'messageUrl': latest_report,\r\n            'text': f\"通过:{test_result['pass_percent']}\" + f\"失败:{test_result['fail_percent']}\" + f\"异常:{test_result['exception_percent']}\",\r\n        },\r\n        \"at\": {\r\n            \"atMobiles\": [mobile],\r\n            \"isAtAll\": True\r\n        }\r\n    }\r\n\r\n    try:\r\n        # 发送 POST 请求\r\n        response = requests.post(final_webhook_url, data=json.dumps(message), headers=headers)\r\n\r\n        # 检查响应状态码\r\n        if response.status_code == 200:\r\n            logging.info('消息发送成功!')\r\n            logging.info(f'响应内容: {response.text}')\r\n        else:\r\n            logging.error(f'消息发送失败,状态码: {response.status_code}')\r\n            logging.error(f'响应内容: {response.text}')\r\n    except requests.exceptions.RequestException as e:\r\n        logging.error(f'请求异常: {e}')\r\n\r\n\r\ndef run_automation_test(report_title, report_url_prefix, test_case , ding_type):\r\n    \"\"\"\r\n    运行自动化测试并生成报告。\r\n\r\n    参数:\r\n    - report_title: 报告的标题\r\n    - report_url_prefix: 报告URL的前缀\r\n    - test_case: 测试用例脚本执行的标签名\r\n    \"\"\"\r\n    # 记录测试开始的日志\r\n    logging.info(\"开始自动化测试...\")\r\n\r\n    # 构建运行测试命令\r\n    command = [\r\n        'hytest',\r\n        '--report_title', report_title,\r\n        '--report_url_prefix', report_url_prefix,\r\n        '--tag', test_case\r\n    ]\r\n\r\n    # 记录将要执行的命令日志\r\n    logging.info(f\"执行命令: {' '.join(command)}\")\r\n\r\n    try:\r\n        # 执行测试命令并获取结果\r\n        result = subprocess.run(command, capture_output=True, text=True, check=True)\r\n\r\n        # 记录命令的标准输出和错误输出\r\n        logging.debug(f\"命令标准输出: {result.stdout}\")\r\n        logging.debug(f\"命令错误输出: {result.stderr}\")\r\n    except subprocess.CalledProcessError as e:\r\n        # 处理子进程调用失败的异常\r\n        logging.error(f\"命令执行失败,返回码 {e.returncode}: {e.output}\")\r\n    except OSError as e:\r\n        # 处理操作系统相关的异常\r\n        logging.error(f\"发生操作系统错误: {e}\")\r\n    finally:\r\n        # 无论测试是否成功,都记录测试结束的日志\r\n        logging.info(\"自动化测试完成。\")\r\n\r\n        # 调用回调函数处理后续操作\r\n        get_reportfile_send_dingding(f\"{report_title}\", report_url_prefix, ding_type)\r\n\r\ndef get_reportfile_send_dingding(report_title, report_url_prefix, ding_type):\r\n    # print(GSTORE['case_pass'])\r\n    try:\r\n        # 获取报告文件所在的目录\r\n        report_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..','reports')\r\n\r\n        # 获取基础URLs\r\n        base_url = report_url_prefix\r\n\r\n        # 获取最新的报告文件\r\n        latest_report = get_latest_report_file(report_dir, base_url)\r\n\r\n        # 如果找到了最新的报告文件,则发送报告链接到钉钉\r\n        if latest_report:\r\n            logging.info(f\"最新报告文件URL: {latest_report}\")\r\n\r\n            try:\r\n                # 记录调用钉钉消息通知函数的日志\r\n                logging.info(\"开始调用钉钉消息通知函数\")\r\n\r\n                # 调用钉钉发送消息接口进行推送测试报告链接\r\n                dingding_send_message(latest_report, report_title, \"13724387318\", ding_type)\r\n\r\n                # 记录钉钉消息通知函数调用成功的日志\r\n                logging.info(\"钉钉消息通知函数调用成功\")\r\n            except Exception as e:\r\n                # 记录钉钉消息通知函数调用失败的日志\r\n                logging.error(f\"钉钉消息通知函数调用失败: {e}\")\r\n        else:\r\n            # 记录没有找到报告文件的日志\r\n            logging.warning(\"没有找到报告文件以发送。\")\r\n\r\n    except subprocess.CalledProcessError as e:\r\n        # 处理子进程调用失败的异常\r\n        logging.error(f\"命令执行失败,返回码 {e.returncode}: {e.output}\")\r\n    except OSError as e:\r\n        # 处理操作系统相关的异常\r\n        logging.error(f\"发生操作系统错误: {e}\")\r\n    finally:\r\n        # 无论是否成功,都记录测试结束的日志\r\n        logging.info(\"自动化测试完成。\")\r\n\r\n# if __name__ == '__main__':\r\n#     get_reportfile_send_dingding(\"测试\",\"http://192.168.1.166\")\r\n\r\nfrom selenium.webdriver.common.action_chains import ActionChains\r\nfrom selenium.webdriver.support.ui import WebDriverWait\r\nfrom selenium.webdriver.support import expected_conditions as EC\r\nfrom selenium.common.exceptions import TimeoutException, NoSuchElementException\r\nimport logging\r\n\r\ndef single_click_and_drag(source_element_locator, target_element_locator, wd):\r\n    \"\"\"\r\n    实现元素从source_element单击后拖拽到target_element的功能\r\n\r\n    :param wd: WebDriver实例\r\n    :param source_element_locator: 拖拽起始元素的定位器\r\n    :param target_element_locator: 拖拽目标元素的定位器\r\n    \"\"\"\r\n    try:\r\n        # 等待页面完全加载\r\n        sleep(3)\r\n\r\n        # 找到源元素和目标元素\r\n        source_element = WebDriverWait(wd, 120).until(EC.element_to_be_clickable(source_element_locator))\r\n        target_element = WebDriverWait(wd, 120).until(EC.element_to_be_clickable(target_element_locator))\r\n\r\n        # 使用ActionChains执行单击并拖拽操作\r\n        actions = ActionChains(wd)\r\n        actions.click_and_hold(source_element)  # 单击并按住源元素\r\n        actions.move_to_element(target_element)  # 移动到目标元素\r\n        actions.release()  # 释放鼠标\r\n        actions.perform()\r\n\r\n        logging.info(\"单击并拖拽操作成功\")\r\n    except TimeoutException as e:\r\n        logging.error(f\"元素查找超时: {e}\")\r\n    except NoSuchElementException as e:\r\n        logging.error(f\"元素未找到: {e}\")\r\n    except Exception as e:\r\n        logging.error(f\"发生未知错误: {e}\")\r\n\r\nimport requests\r\nimport os\r\nimport chardet\r\nfrom urllib3.exceptions import InsecureRequestWarning\r\n\r\n# 禁用 InsecureRequestWarning 警告\r\nrequests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)\r\n\r\ndef fetch_and_parse_check_txt(url, save_path, extract_info):\r\n    \"\"\"\r\n    获取check.txt文件并解析指定信息\r\n\r\n    :param url: check.txt文件的URL\r\n    :param save_path: 文件保存路径\r\n    :param extract_info: 需要提取的信息列表,例如 ['[m]ysql', '[r]edis', '[f]dfs_storaged', '[f]dfs_tracker', '[e]mqx', 'ubains-meeting-api-1.0-SNAPSHOT.jar', 'ubains-meeting-inner-api-1.0-SNAPSHOT.jar', 'uwsgi']\r\n    :return: 提取的信息字典\r\n    \"\"\"\r\n    try:\r\n        # 发送HTTPS请求获取文件内容\r\n        response = requests.get(url, verify=False)  # verify=False 忽略SSL证书验证,生产环境不推荐\r\n        response.raise_for_status()  # 如果响应状态码不是200,抛出异常\r\n\r\n        # 检测文件编码\r\n        detected_encoding = chardet.detect(response.content)['encoding']\r\n        logging.info(f\"检测到的编码: {detected_encoding}\")\r\n\r\n        # 如果检测到的编码为空或不准确,可以手动指定编码\r\n        if not detected_encoding or detected_encoding == 'ascii':\r\n            detected_encoding = 'utf-8'  # 假设文件编码为 utf-8\r\n\r\n        # 将响应内容解码为字符串\r\n        content = response.content.decode(detected_encoding)\r\n\r\n        # 将文件内容保存到指定目录\r\n        with open(save_path, 'w', encoding='utf-8') as file:\r\n            file.write(content)\r\n\r\n        # 解析文件内容\r\n        parsed_info = {}\r\n        for line in content.split('\\n'):\r\n            for info in extract_info:\r\n                if info in line and info not in parsed_info:\r\n                    service_name = info\r\n                    service_status = line.split(info, 1)[1].strip()\r\n                    parsed_info[service_name] = service_status\r\n                    break  # 找到后跳出内层循环,继续处理下一行\r\n\r\n        return parsed_info\r\n\r\n    except requests.exceptions.RequestException as e:\r\n        logging.exception(f\"请求错误: {e}\")\r\n        return None\r\n\r\nimport telnetlib\r\n\r\ndef check_service_status(host, port):\r\n    \"\"\"\r\n    检查服务状态。\r\n\r\n    通过尝试连接到指定的主机和端口来检查服务是否可用。\r\n\r\n    参数:\r\n    - host: 服务所在的主机地址。\r\n    - port: 服务监听的端口。\r\n\r\n    返回值:\r\n    无\r\n    \"\"\"\r\n    try:\r\n        # 创建Telnet对象并连接到服务器\r\n        tn = telnetlib.Telnet(host, port)\r\n        INFO(f\"成功连接到 {host}:{port}\")\r\n\r\n        # 可以在这里发送命令或读取响应\r\n        # 例如,读取服务器的欢迎信息\r\n        response = tn.read_until(b\"\\n\", timeout=5)\r\n        INFO(f\"服务器响应: {response.decode('ascii')}\")\r\n\r\n        # 关闭连接\r\n        tn.close()\r\n    except Exception as e:\r\n        INFO(f\"连接失败: {e}\")\r\n\r\n\r\ndef set_tx_meeting_id(element_locator, wd):\r\n    tx_meeting_id = elment_get_text(element_locator, wd)\r\n    GSTORE['tx_meeting_id'] = tx_meeting_id\r\n\r\ndef get_tx_meeting_id():\r\n    return GSTORE.get('tx_meeting_id')\r\n\r\ndef voice_device_register(app_id, app_secret, device_sn):\r\n    \"\"\"\r\n    注册语音设备。\r\n\r\n    向指定的API发送POST请求,以注册一个语音设备。需要提供应用的ID和密钥,以及设备的序列号。\r\n\r\n    参数:\r\n    app_id (str): 应用的唯一标识符。\r\n    app_secret (str): 应用的秘密密钥。\r\n    device_sn (str): 设备的序列号。\r\n\r\n    返回:\r\n    无直接返回值,但会记录请求结果到日志。\r\n    \"\"\"\r\n    # 构建请求体,包含注册所需的必要信息\r\n    body = {\r\n        \"app_id\": app_id,\r\n        \"app_secret\": app_secret,\r\n        \"device_sn\": device_sn\r\n    }\r\n\r\n    # 设置请求头,指定内容类型为JSON\r\n    headers = {\r\n        \"Content-Type\": \"application/json\"\r\n    }\r\n\r\n    try:\r\n        # 发送POST请求到注册URL,请求体序列化为JSON格式\r\n        response = requests.post(\"https://wdev.wmj.com.cn/deviceApi/register\", headers=headers, data=json.dumps(body))\r\n        # 检查HTTP响应状态码,确保请求成功\r\n        response.raise_for_status()\r\n        # 解析响应的JSON内容\r\n        response_json = response.json()\r\n        # 记录成功的请求日志\r\n        logging.info(\"请求成功: %s\", response_json)\r\n    except requests.exceptions.RequestException as e:\r\n        # 处理请求异常,记录错误日志\r\n        logging.error(\"请求失败: %s\", e)\r\n    except ValueError as e:\r\n        # 处理解析响应异常,记录错误日志\r\n        logging.error(\"解析响应失败: %s\", e)\r\n\r\n\r\nimport requests\r\n\r\n\r\ndef cloud_voice_setting(app_id, app_secret, device_sn):\r\n    \"\"\"\r\n    设置云语音功能。\r\n\r\n    :param app_id: 应用ID\r\n    :param app_secret: 应用密钥\r\n    :param device_sn: 设备序列号\r\n    :return: 服务器响应结果\r\n    \"\"\"\r\n    url = \"https://wdev.wmj.com.cn/deviceApi/send\"\r\n\r\n    # 写死的data参数\r\n    data = {\r\n        \"cmd_type\": \"setting\",\r\n        \"info\": {\r\n            \"volume\": 10,  # 0-9,音量由小到大,默认为中间值\r\n            \"speed\": 2,  # 0-9,语速由慢到快,默认为中间值正常语速\r\n            \"tone\": 4,  # 0-9,语调由低到高,默认为中间值正常语调\r\n            \"speaker\": 0 # 0为女生,支持中英文\r\n        }\r\n    }\r\n\r\n    # 构建请求体\r\n    payload = {\r\n        \"app_id\": app_id,\r\n        \"app_secret\": app_secret,\r\n        \"device_sn\": device_sn,\r\n        \"data\": data\r\n    }\r\n\r\n    # 发送POST请求\r\n    try:\r\n        response = requests.post(url, json=payload)\r\n        response.raise_for_status()  # 如果响应状态码不是200,抛出异常\r\n        logging.info(response.json())  # 打印响应的JSON数据\r\n    except requests.exceptions.RequestException as e:\r\n        logging.error({\"status\": \"error\", \"message\": str(e)})\r\n\r\n# 示例调用\r\n# if __name__ == \"__main__\":\r\n#     app_id = os.getenv(\"APP_ID\", \"a98a124c6c3252f6612fc544a0d0fa79\")\r\n#     app_secret = os.getenv(\"APP_SECRET\", \"88bc1ec4eba624f47b2200a4ce8c3852\")\r\n#     device_sn = os.getenv(\"DEVICE_SN\", \"W703BB44444\")\r\n#     cloud_voice_setting(app_id, app_secret, device_sn)\r\n\r\n\r\ndef play_cloud_voice(app_id, app_secret, device_sn):\r\n    \"\"\"\r\n    播放云语音功能。\r\n\r\n    本函数通过发送HTTP POST请求,触发远程语音设备播放指定的语音内容。\r\n    \"\"\"\r\n    # 注册设备\r\n    voice_device_register(app_id, app_secret, device_sn)\r\n    sleep(5)  # 可以考虑使用异步编程或非阻塞的方式替代\r\n\r\n    # 设置云喇叭的音量以及语速参数\r\n    cloud_voice_setting(app_id, app_secret, device_sn)\r\n    sleep(10)\r\n\r\n    # 定义请求URL\r\n    url = os.getenv(\"CLOUD_VOICE_API_URL\", \"https://wdev.wmj.com.cn/deviceApi/send\")\r\n\r\n    # 构建请求体,包括应用ID、应用密钥、设备序列号和语音播放指令\r\n    body = {\r\n        \"app_id\": app_id,\r\n        \"app_secret\": app_secret,\r\n        \"device_sn\": device_sn,\r\n        \"data\": {\r\n            \"cmd_type\": \"play\",\r\n            \"info\": {\r\n                \"tts\": \"一、二、三、四、五、六、七、八、九、十        一、二、三、四、五、六、七、八、九、十        一、二、三、四、五、六、七、八、九、十、一、二、三、四、五、六、七、八、九、十\",\r\n                \"inner\": 10,  # wifi版特有\r\n                \"volume\": 5  # 4G版本1-7,wifi版1-10\r\n            }\r\n        }\r\n    }\r\n\r\n    # 设置请求头,指定内容类型为JSON\r\n    headers = {\r\n        \"Content-Type\": \"application/json\"\r\n    }\r\n\r\n    try:\r\n        # 发送POST请求\r\n        response = requests.post(url, headers=headers, data=json.dumps(body))\r\n\r\n        # 根据响应状态码判断请求是否成功\r\n        if response.status_code == 200:\r\n            logging.info(f\"请求成功: {response.json()}\")\r\n        else:\r\n            logging.error(f\"请求失败: 状态码 {response.status_code}, 响应内容 {response.text}\")\r\n\r\n    except requests.exceptions.RequestException as e:\r\n        logging.error(f\"请求过程中发生异常: {e}\")\r\n    except json.JSONDecodeError as e:\r\n        logging.error(f\"JSON解析失败: {e}\")\r\n    except Exception as e:\r\n        logging.error(f\"发生未知异常: {e}\")\r\n\r\n# # 示例调用\r\n# if __name__ == \"__main__\":\r\n#     app_id = os.getenv(\"APP_ID\", \"a98a124c6c3252f6612fc544a0d0fa79\")\r\n#     app_secret = os.getenv(\"APP_SECRET\", \"88bc1ec4eba624f47b2200a4ce8c3852\")\r\n#     device_sn = os.getenv(\"DEVICE_SN\", \"W703BB44444\")\r\n#     play_cloud_voice(app_id, app_secret, device_sn)\r\n\r\nimport logging\r\nimport re\r\nfrom selenium.webdriver.common.by import By\r\n\r\ndef get_test_result(latest_report, wd):\r\n    \"\"\"\r\n    获取测试结果页面的通过率、失败率和异常率\r\n\r\n    :param latest_report: 测试结果页面的URL\r\n    :param wd: WebDriver实例,用于访问和操作网页\r\n    :return: 包含通过率、失败率和异常率的字典\r\n    \"\"\"\r\n    # 初始化测试结果字典\r\n    test_result = {\r\n        \"pass_percent\": \"\",\r\n        \"fail_percent\": \"\",\r\n        \"exception_percent\": \"\"\r\n    }\r\n\r\n    # 访问测试结果页面\r\n    wd.get(latest_report)\r\n    sleep(5)\r\n\r\n    # 点击简略显示\r\n    safe_click((By.XPATH,\"//div[@id='display_mode']\"), wd)\r\n    sleep(5)\r\n\r\n    # 定义一个函数来获取和解析百分比\r\n    def get_percentage(selector, wd):\r\n        text = elment_get_text(selector, wd)\r\n        logging.info(f\"获取的文本:{text}\")\r\n        match = re.search(r'(\\d+(\\.\\d+)?)%', text)\r\n        if match:\r\n            return match.group(0)\r\n        else:\r\n            logging.error(f\"未找到百分比匹配项,文本内容: {text}\")\r\n            return \"0\"\r\n\r\n    # 获取通过率\r\n    pass_percent = get_percentage((By.CSS_SELECTOR, \"div[class='result_barchart'] div:nth-child(1) span:nth-child(1)\"), wd)\r\n    test_result[\"pass_percent\"] = pass_percent\r\n\r\n    # 获取失败率\r\n    fail_percent = get_percentage(\r\n        (By.CSS_SELECTOR, \"body > div.main_section > div.result > div > div:nth-child(2) > span\"), wd)\r\n    test_result[\"fail_percent\"] = fail_percent\r\n\r\n    # 获取异常率\r\n    exception_percent = get_percentage(\r\n        (By.CSS_SELECTOR, \"body > div.main_section > div.result > div > div:nth-child(3) > span\"), wd)\r\n    test_result[\"exception_percent\"] = exception_percent\r\n    # 输出test_result\r\n    logging.info(test_result)\r\n    print(test_result)\r\n    sleep(5)\r\n\r\n    # 返回测试结果字典\r\n    return test_result\r\n\r\n# if __name__ == \"__main__\":\r\n#     browser_init(\"展厅预定巡检\")\r\n#     wd = GSTORE['wd']\r\n#     test_result = get_test_result(\"http://nat.ubainsyun.com:31134/report_20250217_094401.html\",wd)\r\n#     print(test_result)\r\n\r\nimport socket\r\nimport re\r\nimport subprocess\r\n\r\n\r\n# 获取本机IP地址\r\ndef get_local_ip():\r\n    try:\r\n        # 创建一个UDP套接字\r\n        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)\r\n        # 连接到一个公共的IP地址和端口\r\n        sock.connect((\"8.8.8.8\", 80))\r\n        # 获取本地IP地址\r\n        local_ip = sock.getsockname()[0]\r\n    finally:\r\n        sock.close()\r\n    return local_ip\r\n\r\n\r\nimport yaml\r\nimport logging\r\nimport socket\r\nimport subprocess\r\n\r\n# 获取本机IP地址\r\ndef get_local_ip():\r\n    try:\r\n        # 创建一个UDP套接字\r\n        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)\r\n        # 连接到一个公共的IP地址和端口\r\n        sock.connect((\"8.8.8.8\", 80))\r\n        # 获取本地IP地址\r\n        local_ip = sock.getsockname()[0]\r\n    finally:\r\n        sock.close()\r\n    return local_ip\r\n\r\n# 更新ngrok.cfg文件中的IP地址\r\ndef update_ngrok_config(config_path, new_ip):\r\n    try:\r\n        with open(config_path, 'r', encoding='utf-8') as file:\r\n            config = yaml.safe_load(file)\r\n\r\n        # 更新IP地址\r\n        config['tunnels']['nat1']['proto']['tcp'] = f\"{new_ip}:80\"\r\n\r\n        with open(config_path, 'w', encoding='utf-8') as file:\r\n            yaml.safe_dump(config, file, default_flow_style=False, allow_unicode=True)\r\n\r\n        logging.info(f\"ngrok.cfg 更新成功,新的IP地址为: {new_ip}\")\r\n    except Exception as e:\r\n        logging.error(f\"更新ngrok.cfg文件时出错: {e}\")\r\n\r\n# 启动ngrok\r\ndef start_ngrok(ngrok_path, config_path):\r\n    try:\r\n        # 终止已运行的ngrok进程\r\n        kill_ngrok()\r\n\r\n        command = [ngrok_path, '-config', config_path, 'start', 'nat1']\r\n        subprocess.Popen(command, shell=True)\r\n        logging.info(f\"ngrok 启动成功\")\r\n    except Exception as e:\r\n        logging.error(f\"启动ngrok时出错: {e}\")\r\n\r\ndef kill_ngrok():\r\n    try:\r\n        # 使用 taskkill 命令终止所有 ngrok 进程\r\n        subprocess.run(['taskkill', '/F', '/IM', 'ngrok.exe'], check=True)\r\n        logging.info(\"终止所有 ngrok 进程成功\")\r\n    except subprocess.CalledProcessError as e:\r\n        logging.info(\"没有找到 ngrok 进程\")\r\n    except Exception as e:\r\n        logging.error(f\"终止 ngrok 进程时出错: {e}\")\r\n\r\n# if __name__ == '__main__':\r\n#     logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')\r\n#\r\n#     # 获取本机IP地址\r\n#     local_ip = get_local_ip()\r\n#     logging.info(f\"本机IP地址: {local_ip}\")\r\n#\r\n#     # 更新ngrok.cfg文件\r\n#     ngrok_config_path = r'D:\\GithubData\\自动化\\ubains-module-test\\预定系统\\ngrok\\ngrok-调试主机\\ngrok.cfg'\r\n#     update_ngrok_config(ngrok_config_path, local_ip)\r\n#\r\n#     # 启动ngrok\r\n#     ngrok_path = r'D:\\GithubData\\自动化\\ubains-module-test\\预定系统\\ngrok\\ngrok-调试主机\\ngrok.exe'\r\n#     start_ngrok(ngrok_path, ngrok_config_path)\r\n\r\n# # 定义执行终端命令的函数\r\n# def run_http_server():\r\n#     try:\r\n#         # 构建命令\r\n#         command = [\r\n#             'cd', r'D:\\GithubData\\自动化\\ubains-module-test\\预定系统',\r\n#             '&&', 'python', '-m', 'http.server', '81', '--directory', 'reports'\r\n#         ]\r\n#\r\n#         # 运行命令\r\n#         process = subprocess.Popen(command, shell=True)\r\n#         logging.info(\"HTTP 服务器启动成功\")\r\n#     except Exception as e:\r\n#         logging.error(f\"启动HTTP服务器时出错: {e}\")\r\n#\r\n# if __name__ == '__main__':\r\n#     logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')\r\n#     run_http_server()\r\n\r\ndef user_login(element_locators, username, password, verifycode):\r\n    \"\"\"\r\n    管理员登录函数。\r\n    该函数通过模拟用户输入用户名、密码和验证码,并点击登录按钮,以实现管理员登录。\r\n    \"\"\"\r\n    # 获取webdriver实例\r\n    wd = GSTORE['wd']\r\n\r\n    # 打印用户名输入信息\r\n    INFO(f\"输入用户名:{username}\")\r\n    # 向用户名输入框发送用户名\r\n    safe_send_keys(element_locators['username_locator'], f'{username}', wd)\r\n    sleep(5)\r\n\r\n    # 打印密码输入信息\r\n    INFO(f\"输入密码:{password}\")\r\n    # 向密码输入框发送密码\r\n    safe_send_keys(element_locators['password_locator'], f\"{password}\", wd)\r\n    sleep(5)\r\n\r\n    # 打印验证码输入信息\r\n    INFO(f\"输入验证码:{verifycode}\")\r\n    # 向验证码输入验证码\r\n    safe_send_keys(element_locators['verifycode_locator'], f\"{verifycode}\", wd)\r\n    sleep(5)\r\n\r\n    #点击登录按钮\r\n    INFO(\"点击登录按钮\")\r\n    safe_click(element_locators['submitButton_locator'], wd)\r\n    sleep(5)\r\n\r\ndef read_xlsx_data(xlsx_file_path):\r\n    \"\"\"\r\n    读取XLSX文件中的数据,并将其转换为一个包含字典的列表,每个字典代表一行测试用例数据。\r\n\r\n    参数:\r\n    xlsx_file_path (str): XLSX文件的路径。\r\n\r\n    返回:\r\n    list: 包含字典的列表,每个字典包含测试用例的名称和参数。\r\n    \"\"\"\r\n    # 打开XLSX文件\r\n    workbook = openpyxl.load_workbook(xlsx_file_path)\r\n    # 假设数据在第一个工作表中\r\n    sheet = workbook.active\r\n\r\n    # 读取表头,从第三行开始\r\n    headers = [cell.value for cell in sheet[3]]\r\n\r\n    # 打印表头列名\r\n    # print(f\"表头列名: {headers}\")\r\n\r\n    # 找到表头中名为 'JSON' 的列索引\r\n    try:\r\n        json_index = headers.index('JSON')\r\n    except ValueError as e:\r\n        raise ValueError(f\"表头中没有找到所需的列: {e}\")\r\n\r\n    ddt_cases = []\r\n    # 遍历XLSX文件中的每一行数据,从第四行开始\r\n    for row in sheet.iter_rows(min_row=4, values_only=True):\r\n        # 获取 JSON 列的数据\r\n        json_data = row[json_index]\r\n\r\n        # 打印 JSON 数据以进行调试\r\n        # print(f\"JSON 数据: {json_data}\")\r\n\r\n        # 解析 JSON 字符串\r\n        try:\r\n            if json_data:\r\n                parsed_json = json.loads(json_data)\r\n            else:\r\n                raise ValueError(\"JSON 数据为空\")\r\n        except json.JSONDecodeError:\r\n            raise ValueError(f\"无法解析 JSON 数据: {json_data}\")\r\n\r\n        # 将解析后的 JSON 数据添加到列表中\r\n        ddt_cases.append(parsed_json)\r\n\r\n    # 日志记录:XLSX文件已读取\r\n    INFO(\"XLSX文件已读取\")\r\n    # 返回包含所有测试用例数据的列表\r\n    return ddt_cases\r\n\r\ndef get_by_enum(type_str):\r\n    \"\"\"\r\n    将字符串类型的定位器类型转换为 selenium.webdriver.common.by.By 枚举类型。\r\n\r\n    参数:\r\n    type_str (str): 定位器类型字符串,例如 'XPATH'。\r\n\r\n    返回:\r\n    selenium.webdriver.common.by.By: 对应的 By 枚举类型。\r\n    \"\"\"\r\n    type_str = type_str.upper()\r\n    if type_str == 'XPATH':\r\n        return By.XPATH\r\n    elif type_str == 'ID':\r\n        return By.ID\r\n    elif type_str == 'NAME':\r\n        return By.NAME\r\n    elif type_str == 'CLASS_NAME':\r\n        return By.CLASS_NAME\r\n    elif type_str == 'CSS_SELECTOR':\r\n        return By.CSS_SELECTOR\r\n    elif type_str == 'TAG_NAME':\r\n        return By.TAG_NAME\r\n    elif type_str == 'LINK_TEXT':\r\n        return By.LINK_TEXT\r\n    elif type_str == 'PARTIAL_LINK_TEXT':\r\n        return By.PARTIAL_LINK_TEXT\r\n    else:\r\n        raise ValueError(f\"未知的定位器类型: {type_str}\")
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/预定系统/Base/base.py b/预定系统/Base/base.py
--- a/预定系统/Base/base.py	(revision 6cedec8cd6072a6338b10f2314cd65d7c35c57f7)
+++ b/预定系统/Base/base.py	(date 1741230068869)
@@ -909,7 +909,6 @@
     except Exception as e:
         INFO(f"连接失败: {e}")
 
-
 def set_tx_meeting_id(element_locator, wd):
     tx_meeting_id = elment_get_text(element_locator, wd)
     GSTORE['tx_meeting_id'] = tx_meeting_id
diff --git a/运维集控/项目测试/运维标准版/cases/08设备管理/02编辑设备.py b/运维集控/项目测试/运维标准版/cases/08设备管理/02编辑设备.py
new file mode 100644
diff --git a/运维集控/项目测试/运维标准版/cases/08设备管理/04删除设备.py b/运维集控/项目测试/运维标准版/cases/08设备管理/04删除设备.py
new file mode 100644
diff --git a/运维集控/项目测试/运维标准版/cases/08设备管理/03查询设备.py b/运维集控/项目测试/运维标准版/cases/08设备管理/03查询设备.py
new file mode 100644
diff --git a/运维集控/项目测试/运维标准版/cases/07类型标签/03删除标签.py b/运维集控/项目测试/运维标准版/cases/07类型标签/03删除标签.py
new file mode 100644
diff --git a/运维集控/项目测试/运维标准版/cases/07类型标签/02编辑标签.py b/运维集控/项目测试/运维标准版/cases/07类型标签/02编辑标签.py
new file mode 100644