from appium.webdriver.common.appiumby import AppiumBy from time import sleep from appium.options.android import UiAutomator2Options from django.db.models.fields import return_None from hytest import * from pywinauto.mouse import click from selenium import webdriver # 创建一个函数,用于初始化Appium驱动程序 def app_setup_driver(platformName, platformVersion, deviceName, appPackage, appActivity, udid): """ 根据提供的参数设置 Appium 驱动程序。 参数: - platformName: 操作系统名称 - platformVersion: 操作系统版本 - deviceName: 设备名称 - appPackage: 应用程序包名 - appActivity: 应用程序活动名 - udid: 设备唯一标识符 返回: - driver: Appium 驱动程序对象 """ # 定义设备和应用的相关参数,以便 Appium 能够识别和控制设备 desired_caps = { 'platformName': platformName, # 被测手机是安卓 'platformVersion': platformVersion, # 手机安卓版本,如果是鸿蒙系统,依次尝试 12、11、10 这些版本号 'deviceName': deviceName, # 设备名,安卓手机可以随意填写 'appPackage': appPackage, # 启动APP Package名称 'appActivity': appActivity, # 启动Activity名称 'unicodeKeyboard': True, # 自动化需要输入中文时填True 'resetKeyboard': True, # 执行完程序恢复原来输入法 'noReset': True, # 不要重置App 'newCommandTimeout': 6000, 'automationName': 'UiAutomator2', 'skipUnlock': True, 'autoGrantPermissions': True, 'udid':udid } # 记录 desired_caps 参数信息 logging.info(f"desired_caps参数:{desired_caps}") try: # 记录初始化 Appium 驱动程序的过程 logging.info("正在初始化 Appium 驱动程序...") # 创建 Appium 驱动程序对象 driver = webdriver.Remote('http://localhost:4723/wd/hub', options=UiAutomator2Options().load_capabilities(desired_caps)) # 记录 Appium 驱动程序初始化成功 logging.info("Appium 驱动程序初始化成功。") # 返回 Appium 驱动程序对象 return driver except Exception as e: # 记录初始化驱动程序失败的错误信息 logging.error(f"初始化驱动程序失败: {e}") # 重新抛出异常 raise # 封装滑动操作 def swipe_up(app_driver): """ 在应用程序中执行上滑操作。 参数: - app_driver: 应用程序的驱动对象,用于与设备交互。 返回值: 无 """ # 获取屏幕尺寸 size = app_driver.get_window_size() # 计算滑动的起始和结束坐标 start_x = size['width'] // 2 start_y = int(size['height'] * 0.2) # 起始y坐标,屏幕高度的20% end_x = start_x end_y = int(size['height'] * 0.8) # 结束y坐标,屏幕高度的80% # 执行滑动操作 app_driver.swipe(start_x, start_y, end_x, end_y, duration=500) # 图片亮度对比函数 # 请使用“pip install opencv-python -i https://pypi.tuna.tsinghua.edu.cn/simple”安装PIL库 from PIL import Image import numpy as np import os import logging def compare_brightness(light_down_path, light_on_path, threshold=1): """ 对比两张图片的亮度,返回亮度是否增加的布尔值。 light_on_path:传入暗色的图片 light_down_path:传入亮色的图片 threshold:亮度变化的阈值,默认为1 """ try: # 打开图片并转换为灰度图像,以便后续处理 image1 = Image.open(light_down_path).convert('L') # 转换为灰度图像 image2 = Image.open(light_on_path).convert('L') # 转换为灰度图像 # 将图像转换为numpy数组,便于计算 array1 = np.array(image1) array2 = np.array(image2) # 计算两张图片的平均亮度 avg_brightness1 = np.mean(array1) avg_brightness2 = np.mean(array2) # 记录日志,输出两张图片的平均亮度 logging.info(f"关闭灯光时的平均亮度: {avg_brightness1}") logging.info(f"打开灯光时的平均亮度: {avg_brightness2}") # 计算亮度变化量 brightness_increase = avg_brightness2 - avg_brightness1 # 记录日志,输出亮度变化量 logging.info(f"亮度变化量: {brightness_increase}") # 判断亮度变化量是否超过阈值 return brightness_increase > threshold except Exception as e: # 异常处理,记录错误日志 logging.error(f"对比亮度时发生错误: {e}", exc_info=True) return False # 调用示例 # if __name__ == '__main__': # logging.info("开始对比亮度") # # image1_path = r'D:\GithubData\自动化\ubains-module-test\预定系统\Base\captured_frame2.jpg' # image2_path = r'D:\GithubData\自动化\ubains-module-test\预定系统\Base\captured_frame.jpg' # # # 检查图片路径是否存在 # if not os.path.exists(image1_path): # logging.error(f"图片 {image1_path} 不存在") # exit(1) # if not os.path.exists(image2_path): # logging.error(f"图片 {image2_path} 不存在") # exit(1) # # # 对比两张截图的亮度 # result = compare_brightness(image1_path, image2_path) # logging.info(f"亮度比较结果: {result}") # # if result: # logging.info("灯光已成功打开") # else: # logging.error("灯光未成功打开") # 提取特征点并比较图片函数 # 请使用“pip install opencv-python”安装cv2库 import cv2 import logging from PIL import Image import numpy as np import os def compare_images_feature_matching(image1_path, image2_path): """ 比较两张图片是否相同 使用特征匹配的方法比较两张图片是否相同。首先验证图片路径和格式,然后根据图片尺寸进行调整, 最后转换为numpy数组进行比较。 参数: image1_path (str): 第一张图片的路径 image2_path (str): 第二张图片的路径 返回: dict: 包含比较结果和可能的错误信息 """ try: # 验证图片路径是否存在且为有效图片文件 if not os.path.isfile(image1_path) or not os.path.isfile(image2_path): logging.error("图片路径无效") return {"result": False, "error": "图片路径无效"} # 打开两张图片 img1 = Image.open(image1_path) img2 = Image.open(image2_path) # 验证图片是否为有效格式 if img1.format not in ['JPEG', 'PNG'] or img2.format not in ['JPEG', 'PNG']: logging.error("图片格式无效") return {"result": False, "error": "图片格式无效"} # 如果尺寸不同,先调整大小 if img1.size != img2.size: logging.info("图片尺寸不同,调整为相同尺寸进行比较") img2 = img2.resize(img1.size, Image.ANTIALIAS) # 保持纵横比 # 将图片转换为相同的模式 img1 = img1.convert("RGB") img2 = img2.convert("RGB") # 转换为 numpy 数组进行比较 img1_array = np.array(img1) img2_array = np.array(img2) # 输出numpy数组信息 logging.info(f"图片1数组: {img1_array},图片2数组: {img2_array}") # 比较两个数组是否相同 return {"result": np.array_equal(img1_array, img2_array), "error": None} except FileNotFoundError as e: logging.error(f"文件未找到: {e}") return {"result": False, "error": f"文件未找到: {e}"} except OSError as e: logging.error(f"图片处理错误: {e}") return {"result": False, "error": f"图片处理错误: {e}"} except MemoryError as e: logging.error(f"内存不足: {e}") return {"result": False, "error": f"内存不足: {e}"} except Exception as e: logging.error(f"未知错误: {e}") return {"result": False, "error": f"未知错误: {e}"} # 示例调用 # if __name__ == '__main__': # logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # logging.info("开始对比图片") # # image1_path = r'D:\GithubData\自动化\ubains-module-test\预定系统\reports\imgs\Exhibit_Inspect\No_PaperLess\DeviceA-ShareScreen.png' # image2_path = r'D:\GithubData\自动化\ubains-module-test\预定系统\reports\imgs\Exhibit_Inspect\No_PaperLess\同屏前-无纸化设备B界面截屏.png' # # if not os.path.exists(image1_path): # logging.error(f"图片 {image1_path} 不存在") # exit(1) # if not os.path.exists(image2_path): # logging.error(f"图片 {image2_path} 不存在") # exit(1) # # # 对比两张截图的相似度 # if compare_images_feature_matching(image1_path, image2_path): # logging.info("图片相同") # else: # logging.error("图片不同") # 计算直方图相似度函数 import cv2 import numpy from PIL import Image import logging import os def calculate(image1, image2): """ 计算两张图片的直方图重合度 通过将图片转换为BGR格式,并计算每张图片的蓝色通道直方图,然后比较这两个直方图的重合度来评估图片的相似度 参数: image1: 第一张图片,应为RGB格式 image2: 第二张图片,应为RGB格式 返回值: 返回两张图片直方图的重合度,范围在0到1之间,1表示完全重合,即图片高度相似 """ image1 = cv2.cvtColor(numpy.asarray(image1), cv2.COLOR_RGB2BGR) image2 = cv2.cvtColor(numpy.asarray(image2), cv2.COLOR_RGB2BGR) hist1 = cv2.calcHist([image1], [0], None, [256], [0.0, 255.0]) hist2 = cv2.calcHist([image2], [0], None, [256], [0.0, 255.0]) # 计算直方图的重合度 degree = 0 for i in range(len(hist1)): if hist1[i] != hist2[i]: degree = degree + (1 - abs(hist1[i] - hist2[i]) / max(hist1[i], hist2[i])) else: degree = degree + 1 degree = degree / len(hist1) return degree # 图片相似性对比函数 def classify_hist_with_split(image1, image2, size=(256, 256)): """ 根据两张图片的RGB直方图比较它们的相似性。 参数: image1: 第一张图片的路径。 image2: 第二张图片的路径。 size: 将图片调整到的统一尺寸,默认为(256, 256)。 返回: 两张图片的相似度,值越小表示两张图片越相似。 """ # 打开图片文件 image1 = Image.open(image1) image2 = Image.open(image2) # 将PIL图像转换为OpenCV格式(BGR) image1 = cv2.cvtColor(numpy.asarray(image1), cv2.COLOR_RGB2BGR) image2 = cv2.cvtColor(numpy.asarray(image2), cv2.COLOR_RGB2BGR) # 调整图片尺寸,以确保比较是在相同尺寸下进行 image1 = cv2.resize(image1, size) image2 = cv2.resize(image2, size) # 分离图片的RGB通道 sub_image1 = cv2.split(image1) sub_image2 = cv2.split(image2) sub_data = 0 # 遍历每个通道,计算并累加相似度 for im1, im2 in zip(sub_image1, sub_image2): sub_data += calculate(im1, im2) # 计算平均相似度 sub_data = sub_data / 3 # 返回最终的相似度结果 return sub_data # 示例调用 # if __name__ == '__main__': # logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # logging.info("开始对比图片") # # image1_path = r'D:\GithubData\自动化\ubains-module-test\预定系统\reports\imgs\Exhibit_Inspect\No_PaperLess\同屏后-无纸化设备A界面截屏.png' # image2_path = r'D:\GithubData\自动化\ubains-module-test\预定系统\reports\imgs\Exhibit_Inspect\No_PaperLess\同屏后-无纸化设备B界面截屏.png' # # if not os.path.exists(image1_path): # logging.error(f"图片 {image1_path} 不存在") # exit(1) # if not os.path.exists(image2_path): # logging.error(f"图片 {image2_path} 不存在") # exit(1) # # # 对比两张截图的相似度 # result1 = classify_hist_with_split(image1_path, image2_path) # # # 确保 result1 是一个标量值 # if isinstance(result1, numpy.ndarray): # result1 = result1.item() # # print("相似度为:" + "%.2f%%" % (result1 * 100)) # 检查输出路径是否有效函数 import cv2 import logging import os import shutil # 导入 shutil 模块以检查磁盘空间 def check_output_path(output_path): """ 检查输出路径是否有效 如果输出目录不存在,则尝试创建它,并检查是否有写权限 参数: output_path (str): 输出文件的路径 返回: bool: 如果输出路径有效且可写,则返回True,否则返回False """ # 获取输出文件的目录部分 output_dir = os.path.dirname(output_path) # 检查输出目录是否存在 if not os.path.exists(output_dir): try: # 尝试创建输出目录 os.makedirs(output_dir) logging.info(f"创建目录: {output_dir}") except Exception as e: # 如果创建目录失败,记录错误信息并返回False logging.error(f"无法创建目录 {output_dir}: {e}") return False # 检查文件权限 if not os.access(output_dir, os.W_OK): # 如果没有写权限,记录错误信息并返回False logging.error(f"没有写权限: {output_dir}") return False # 如果一切正常,返回True return True # 捕获RTSP流并保存为图像文件函数 def capture_frame_from_rtsp(rtsp_url, file_name, output_path=None): """ 从RTSP流中捕获一帧并保存为图像文件。 参数: - rtsp_url: RTSP流的URL。 - file_name: 保存图像文件的名称。 - output_path: 保存图像文件的路径,默认为None,如果未提供则使用默认路径。 返回: - 成功捕获并保存帧时返回True,否则返回False。 """ try: # 验证输入参数 if not rtsp_url: logging.error("RTSP URL 为空") return False # 获取当前脚本所在的根目录 script_dir = os.path.dirname(os.path.abspath(__file__)) root_dir = os.path.dirname(script_dir) # 构建默认输出路径 if output_path is None: output_path = os.path.join(root_dir, "reports", "imgs", "Exhibit_Inspect", "Control_Manage", file_name) # 检查并创建输出目录 if not check_output_path(output_path): return False # 打开RTSP流 cap = cv2.VideoCapture(rtsp_url) if not cap.isOpened(): logging.error("无法打开RTSP流") return False # 尝试多次读取帧以确保获取有效帧 for _ in range(5): # 尝试读取5次 ret, frame = cap.read() if ret and frame is not None: break else: logging.error("无法从RTSP流中读取有效帧") cap.release() return False # 确认帧不为空 if frame is None or frame.size == 0: logging.error("捕获到的帧为空") cap.release() return False # 检查帧的形状和类型 logging.info(f"捕获到的帧尺寸: {frame.shape}, 数据类型: {frame.dtype}") # 尝试保存帧为图像文件 success = False try: # 使用 cv2.imencode 保存图像到内存中,再写入文件 _, img_encoded = cv2.imencode('.png', frame) with open(output_path, 'wb') as f: f.write(img_encoded.tobytes()) success = True except Exception as e: logging.error(f"无法保存帧到 {output_path}: {e}") logging.error(f"检查路径是否存在: {os.path.exists(os.path.dirname(output_path))}") logging.error(f"检查路径是否可写: {os.access(os.path.dirname(output_path), os.W_OK)}") # 使用 shutil.disk_usage 检查磁盘空间 try: total, used, free = shutil.disk_usage(os.path.dirname(output_path)) logging.error(f"检查磁盘空间: {free // (2 ** 20)} MB available") except Exception as e: logging.error(f"无法检查磁盘空间: {e}") if success: logging.info(f"帧已保存到 {output_path}") else: logging.error(f"帧保存失败") # 释放资源 cap.release() return success except Exception as e: logging.error(f"捕获帧时发生错误: {e}", exc_info=True) return False # 中控屏灯光控制函数 def light_control(app_drive): """ 控制灯光的函数。 该函数通过Appium驱动定位并点击应用中的灯光控制按钮,以开启不同区域的灯光。 参数: - app_drive: Appium驱动实例,用于与移动应用交互。 """ # 开启所有区域灯光 # 定位【接待区】灯光 light_reception_button = find_element_with_retry(app_drive, AppiumBy.XPATH, "/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.ImageView[1]") sleep(2) logging.info("尝试定位【接待区】按钮元素,并点击按钮") click_with_retry(light_reception_button) sleep(2) # 定位【指挥中心】灯光 light_command_center_button = find_element_with_retry(app_drive, AppiumBy.XPATH, "/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[3]") sleep(2) logging.info("尝试定位【指挥中心】按钮元素,并点击按钮") click_with_retry(light_command_center_button) sleep(2) # 定位【影音室】灯光 light_audio_room_button = find_element_with_retry(app_drive, AppiumBy.XPATH, "/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[2]") sleep(2) logging.info("尝试定位【影音室】按钮元素,并点击按钮") click_with_retry(light_audio_room_button) sleep(2) # 定位【会议室】灯光 light_meeting_room_button = find_element_with_retry(app_drive, AppiumBy.XPATH, "/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[5]") sleep(2) logging.info("尝试定位【会议室】按钮元素,并点击按钮") click_with_retry(light_meeting_room_button) sleep(2) # 定位【会商区】灯光 light_meeting_area_button = find_element_with_retry(app_drive, AppiumBy.XPATH, "/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[4]") sleep(2) logging.info("尝试定位【会商区】按钮元素,并点击按钮") click_with_retry(light_meeting_area_button) sleep(2) # 定位【培训室】灯光 light_training_room_button = find_element_with_retry(app_drive, AppiumBy.XPATH, "/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[6]") sleep(2) logging.info("尝试定位【培训室】按钮元素,并点击按钮") click_with_retry(light_training_room_button) sleep(2) # 中控屏窗帘控制函数 def curtain_control(app_drive, wd): """ 控制窗帘的上升和下降,并捕获相应状态的截图。 参数: app_drive: Appium驱动对象,用于操作App。 wd: WebDriver对象,用于捕获屏幕截图。 此函数无返回值。 """ # 所有窗帘全部上升 logging.info("尝试定位所有【窗帘上升】按钮元素,并点击按钮") # 上升按钮的定位 curtain_up_locator = ['3', '4', '5', '1', '13'] for i in curtain_up_locator: curtain_up_button = find_element_with_retry(app_drive, AppiumBy.XPATH, f"/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[{i}]") click_with_retry(curtain_up_button) sleep(2) INFO("请检查窗帘上升状态是否正常") # 截图获取当前中控屏软件窗帘上升的界面 get_screenshot_with_retry(wd, app_drive, "Exhibit_Inspect", "Control_Manage", "curtain_up") sleep(30) # # 测试报告中补充窗帘上升的截图 SELENIUM_LOG_SCREEN(wd, "50%", "Exhibit_Inspect", "Control_Manage", "curtain_rtsp_up") # 通过rtsp流获取当前窗帘的上升效果图 curtain_rtsp_url = "rtsp://admin:huawei@123@192.168.4.18/LiveMedia/ch1/Media2" logging.info("开始捕获RTSP流中的帧") if capture_frame_from_rtsp(curtain_rtsp_url, "curtain_rtsp_up.png"): logging.info("帧捕获成功") else: logging.error("帧捕获失败") # 所有窗帘全部下降 logging.info("尝试定位所有【窗帘下降】按钮元素,并点击按钮") # 下降按钮的定位 curtain_down_locator = ['10', '11', '12', '6', '15'] for i in curtain_down_locator: curtain_down_button = find_element_with_retry(app_drive, AppiumBy.XPATH, f"/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[{i}]") click_with_retry(curtain_down_button) sleep(2) sleep(30) INFO("请检查窗帘下降状态是否正常") get_screenshot_with_retry(wd, app_drive, "Exhibit_Inspect", "Control_Manage", "curtain_down") # 截图获取当前中控屏软件窗帘上升的界面 # 测试报告中补充窗帘下降的截图 SELENIUM_LOG_SCREEN(wd, "50%", "Exhibit_Inspect", "Control_Manage", "curtain_rtsp_down") logging.info("开始捕获RTSP流中的帧") if capture_frame_from_rtsp(curtain_rtsp_url, "curtain_rtsp_down.png"): logging.info("帧捕获成功") else: logging.error("帧捕获失败") # 中控屏空调控制函数 def air_condition_control(app_drive, wd): """ 控制空调的打开与关闭,并检查其状态显示。 参数: - app_drive: Appium驱动对象,用于操作移动端应用。 - wd: WebDriver对象,用于捕获屏幕截图。 此函数不返回任何值。 """ # 点击【打开空调】按钮 logging.info("尝试定位【打开空调】按钮元素,并点击按钮") open_air_conditioner_button = find_element_with_retry(app_drive, AppiumBy.XPATH, "/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[14]") click_with_retry(open_air_conditioner_button) sleep(20) # 这是空调开启的状态显示 INFO("请检查空调开启的状态是否正常") get_screenshot_with_retry(wd, app_drive, "Exhibit_Inspect", "Control_Manage", "air_condition_on") sleep(2) # 点击【关闭空调】按钮 logging.info("尝试定位【关闭空调】按钮元素,并点击按钮") close_air_conditioner_button = find_element_with_retry(app_drive, AppiumBy.XPATH, "/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[14]") click_with_retry(close_air_conditioner_button) sleep(20) # 这是空调关闭的状态显示 INFO("请检查空调关闭的状态是否正常") get_screenshot_with_retry(wd, app_drive, "Exhibit_Inspect", "Control_Manage", "air_condition_off") sleep(2) # 中控屏信息发布控制函数 def information_control(app_drive, wd): """ 控制信息展示和捕获RTSP流中的帧。 参数: - app_drive: Appium驱动实例,用于操作移动应用。 - wd: WebDriver实例,用于操作网页。 此函数依次选择不同的内容进行播放,捕获RTSP流中的帧,并记录屏幕状态。 """ # 选择生日快乐内容播放 brithday_information_button = find_element_with_retry(app_drive, AppiumBy.XPATH, "/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[1]") click_with_retry(brithday_information_button) logging.info("选择生日快乐内容播放") sleep(5) # 这是生日快乐主题内容发布 INFO("请检查中控屏软件信息发布界面是否正常") SELENIUM_LOG_SCREEN(wd, "50%", "Exhibit_Inspect", "Control_Manage", "information_brithday_on") information_rtsp_url = "rtsp://admin:huawei@123@192.168.4.19/LiveMedia/ch1/Media2" # 替换为你的RTSP流地址 logging.info("开始捕获RTSP流中的帧") if capture_frame_from_rtsp(information_rtsp_url, "information_brithday_on.png"): logging.info("帧捕获成功") else: logging.error("帧捕获失败") # 选择欢迎领导发布内容播放 meeting_information_button = find_element_with_retry(app_drive, AppiumBy.XPATH, "/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[2]") click_with_retry(meeting_information_button) logging.info("选择会议发布内容播放") sleep(5) # 这是会议欢迎主题内容发布 INFO("请检查中控屏软件信息发布界面是否正常") SELENIUM_LOG_SCREEN(wd, "50%", "Exhibit_Inspect", "Control_Manage", "information_meeting_on") logging.info("开始捕获RTSP流中的帧") if capture_frame_from_rtsp(information_rtsp_url, "information_meeting_on.png"): logging.info("帧捕获成功") else: logging.error("帧捕获失败") # 选择展厅空间结构内容播放 information_space_button = find_element_with_retry(app_drive, AppiumBy.XPATH, "/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[3]") click_with_retry(information_space_button) logging.info("选择展厅空间结构内容播放") sleep(5) # 这是展厅空间结构内容发布 INFO("请检查中控屏软件信息发布界面是否正常显示") SELENIUM_LOG_SCREEN(wd, "50%", "Exhibit_Inspect", "Control_Manage", "information_space_on") if capture_frame_from_rtsp(information_rtsp_url, "information_space_on.png"): logging.info("帧捕获成功") else: logging.error("帧捕获失败") # 点击信息发布关闭按钮 INFO("点击信息发布关闭按钮") information_close_button = find_element_with_retry(app_drive, AppiumBy.XPATH, "/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[6]") click_with_retry(information_close_button) # 这是信发屏已关闭的界面 INFO("请检查中控屏软件信息发布界面是否正常显示为关闭") SELENIUM_LOG_SCREEN(wd, "50%", "Exhibit_Inspect", "Control_Manage", "information_off") if capture_frame_from_rtsp(information_rtsp_url, "information_off.png"): logging.info("帧捕获成功") else: logging.error("帧捕获失败") # 中控屏音乐控制函数 def music_control(app_drive, wd): """ 控制音乐播放的函数,包括播放和停止音乐。 :param app_drive: Appium驱动对象,用于操作App。 :param wd: WebDriver对象,用于浏览器自动化操作。 """ # 点击【播放音乐】 logging.info("尝试定位【播放音乐】按钮元素,并点击按钮") play_music_button = find_element_with_retry(app_drive, AppiumBy.XPATH, "/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[8]") click_with_retry(play_music_button) sleep(2) sleep(5) # 这是音乐开启播放后的界面显示 INFO("请检查中控屏软件打开音乐播放后的界面状态显示") get_screenshot_with_retry(wd, app_drive, "Exhibit_Inspect", "Control_Manage", "music_on") # 点击【关闭播放音乐】 logging.info("尝试定位【关闭播放音乐】按钮元素,并点击按钮") close_play_music_button = find_element_with_retry(app_drive, AppiumBy.XPATH, "/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[8]") click_with_retry(close_play_music_button) sleep(5) # 这是音乐关闭播放后的界面显示 INFO("请检查中控屏软件关闭音乐播放后的界面状态显示") get_screenshot_with_retry(wd, app_drive, "Exhibit_Inspect", "Control_Manage", "music_off") # 中控屏控制函数 def command_centre_control(rtsp_url, app_drive, wd): """ 控制指挥中心大屏的开启和关闭,并 capture RTSP 流的一帧作为日志。 参数: - rtsp_url: RTSP 流的 URL。 - app_drive: Appium 驱动对象,用于操作移动应用。 - wd: WebDriver 对象,用于执行 Selenium 相关操作。 此函数会尝试打开指挥中心大屏,capture 并记录大屏开启和关闭时的监控视频帧。 """ # 打开指挥中心大屏 open_center_button = find_element_with_retry(app_drive, AppiumBy.XPATH, "/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button") click_with_retry(open_center_button) sleep(10) # 这是指挥大屏开启的监控视频显示 INFO("请检查指挥大屏开启的监控视频状态是否正常") SELENIUM_LOG_SCREEN(wd, "50%", "Exhibit_Inspect", "Control_Manage", "command_screen_on") # 从rtsp流中截取一帧保存为图片 logging.info("开始捕获RTSP流中的帧") if capture_frame_from_rtsp(rtsp_url, "command_screen_on.png"): logging.info("帧捕获成功") else: logging.error("帧捕获失败") # 这是指挥大屏关闭的监控视频显示 INFO("请检查指挥大屏关闭的监控视频状态是否正常") SELENIUM_LOG_SCREEN(wd, "50%", "Exhibit_Inspect", "Control_Manage", "command_screen_down") # 关闭指挥中心大屏幕 logging.info("尝试定位【关闭指挥中心控制】按钮元素,并点击按钮") close_center_button = find_element_with_retry(app_drive, AppiumBy.XPATH, "/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button") click_with_retry(close_center_button) sleep(10) # 从rtsp流中截取一帧保存为图片 logging.info("开始捕获RTSP流中的帧") if capture_frame_from_rtsp(rtsp_url, "command_screen_down.png"): logging.info("帧捕获成功") else: logging.error("帧捕获失败") # app设备初始化adb连接函数 import subprocess from venv import logger def app_init(device_ip, port=5555): """ 初始化浏览器设置和实例。 此函数旨在初始化程序与app设备之间的adb连接,判断adb连接状态是否可控 """ # 标记初始化过程的开始 INFO("'----------' 正在初始化ADB连接 '----------'") """ 通过 ADB 连接设备并检查设备状态 :param device_ip: 设备的 IP 地址 :param port: 端口号,默认为 5555 """ try: # 构建设备地址 device_address = f"{device_ip}:{port}" # 连接设备 subprocess.run(['adb', 'connect', device_address], check=True) INFO(f"尝试连接到设备: {device_address}") # 检查设备状态 result = subprocess.run(['adb', 'devices'], capture_output=True, text=True, check=True) devices = result.stdout.strip().split('\n')[1:] # 去掉标题行 for device in devices: ip, status = device.split() if ip == device_address and status == 'device': INFO(f"设备 {device_address} 已连接并可用") return True elif ip == device_address and status == 'offline': INFO(f"设备 {device_address} 处于 offline 状态,当前设备不可控制,请检查设备网络状态") return False elif ip == device_address and status == 'unauthorized': logger.error(f"设备 {device_address} 未授权调试") return False INFO(f"设备 {device_address} 未找到,请检查设备IP是否正确!") return False except subprocess.CalledProcessError as e: INFO(f"连接设备失败: {e}") return False # app设备退出adb连接函数 def app_quit(device_ip,port=5555): """ 退出浏览器并释放资源。 该函数从全局存储中获取设备adb连接状态 """ # 断开特定 IP 和端口的 ADB 连接 device_address = f"{device_ip}:{port}" subprocess.run(['adb', 'disconnect', device_address]) INFO(f"ADB 连接已断开: {device_address}") # app截屏函数 def get_screenshot_with_retry(wd,app_drive, module_name, function_name, step_name, max_retries=3, retry_delay=5): """ 使用重试机制获取并保存截图。 参数: app_drive: 实现了get_screenshot_as_file方法的对象,用于获取截图。 module_name: 用于构造保存截图的目录名称。 function_name: 用于构造截图的目录名称。 setp_name:用于构造截图文件的名称 max_retries: 最大重试次数,默认为3次。 retry_delay: 重试间隔时间,默认为5秒。 返回值: 无。如果多次尝试截图失败,则抛出异常。 """ # 获取当前文件的绝对路径 current_file_path = os.path.abspath(__file__) # 获取当前文件的父级目录 parent_dir = os.path.dirname(current_file_path) # 构造目标目录路径 target_dir = os.path.join(parent_dir, '..', 'reports', 'imgs', module_name, function_name) # 确保目标目录存在,如果不存在则创建 os.makedirs(target_dir, exist_ok=True) # 构造文件路径 file_path = os.path.join(target_dir, f"{step_name}.png") #截屏 SELENIUM_LOG_SCREEN(wd, "75%", module_name, function_name, f"{step_name}") # 使用循环实现重试机制 for _ in range(max_retries): try: # 尝试保存截图 app_drive.get_screenshot_as_file(file_path) # 如果成功,记录日志并退出函数 logging.info(f"截图保存成功: {file_path}") return except Exception as e: # 如果失败,记录日志并等待重试 logging.warning(f"截图失败,重试中... ({e})") sleep(retry_delay) # 如果多次尝试均失败,则抛出异常 raise Exception(f"多次尝试截图失败: {file_path}") # app查找元素函数 def find_element_with_retry(app_driver, by, value, max_retries=3, retry_delay=5): """ 使用重试机制查找元素。 在WebDriver(driver)中通过给定的查找方式(by)和值(value)来查找页面元素。 如果在指定的最大重试次数(max_retries)内仍然找不到元素,则抛出异常。 每次重试之间会有指定的延迟时间(retry_delay)。 参数: - driver: WebDriver实例,用于执行查找操作。 - by: 查找元素的方式,如XPath、ID等。 - value: 元素的值,根据'by'参数指定的查找方式对应的具体值。 - max_retries: 最大重试次数,默认为3次。 - retry_delay: 每次重试之间的延迟时间,默认为5秒。 返回: - 返回找到的元素。 异常: - 如果超过最大重试次数仍未找到元素,则抛出异常。 """ for _ in range(max_retries): try: # 尝试查找元素,如果成功则立即返回元素 return app_driver.find_element(by, value) except Exception as e: # 如果查找元素失败,记录日志并等待一段时间后重试 logging.warning(f"查找元素失败,重试中... ({e})") sleep(retry_delay) # 如果达到最大重试次数仍未找到元素,则抛出异常 raise Exception(f"多次尝试查找元素失败: {by}={value}") # app点击事件函数 def click_with_retry(element, max_retries=3, retry_delay=5): """ 点击元素的函数,带有重试机制。 参数: element (obj): 要点击的元素对象。 max_retries (int): 最大重试次数,默认为3次。 retry_delay (int): 每次重试之间的延迟时间,默认为5秒。 异常: 如果超过最大重试次数仍未成功点击元素,则抛出异常。 """ # 尝试点击元素,直到达到最大重试次数 for _ in range(max_retries): try: # 尝试点击元素 element.click() # 如果点击成功,记录日志并退出函数 logging.info(f"点击元素成功: {element}") return except Exception as e: # 如果点击失败,记录日志并等待下一次重试 logging.warning(f"点击元素失败,重试中... ({e})") sleep(retry_delay) # 如果所有重试都失败,抛出异常 raise Exception(f"多次尝试点击元素失败: {element}")