提交 968d86a2 authored 作者: 陈泽健's avatar 陈泽健

Base函数库中的函数缩略补充函数使用说明,增加相关注释,删除无用函数。

上级 104c178e
......@@ -3,10 +3,26 @@ from time import sleep
from appium.options.android import UiAutomator2Options
from django.db.models.fields import return_None
from hytest import *
from pywinauto.mouse import click
from selenium import webdriver
# 创建一个函数,用于初始化Appium驱动程序
def app_setup_driver(platformName, platformVersion, deviceName, appPackage, appActivity, udid):
"""
根据提供的参数设置 Appium 驱动程序。
参数:
- platformName: 操作系统名称
- platformVersion: 操作系统版本
- deviceName: 设备名称
- appPackage: 应用程序包名
- appActivity: 应用程序活动名
- udid: 设备唯一标识符
返回:
- driver: Appium 驱动程序对象
"""
# 定义设备和应用的相关参数,以便 Appium 能够识别和控制设备
desired_caps = {
'platformName': platformName, # 被测手机是安卓
'platformVersion': platformVersion, # 手机安卓版本,如果是鸿蒙系统,依次尝试 12、11、10 这些版本号
......@@ -22,21 +38,39 @@ def app_setup_driver(platformName, platformVersion, deviceName, appPackage, appA
'autoGrantPermissions': True,
'udid':udid
}
# 记录 desired_caps 参数信息
logging.info(f"desired_caps参数:{desired_caps}")
try:
# 记录初始化 Appium 驱动程序的过程
logging.info("正在初始化 Appium 驱动程序...")
# 创建 Appium 驱动程序对象
driver = webdriver.Remote('http://localhost:4723/wd/hub',
options=UiAutomator2Options().load_capabilities(desired_caps))
# 记录 Appium 驱动程序初始化成功
logging.info("Appium 驱动程序初始化成功。")
# 返回 Appium 驱动程序对象
return driver
except Exception as e:
# 记录初始化驱动程序失败的错误信息
logging.error(f"初始化驱动程序失败: {e}")
# 重新抛出异常
raise
# 封装滑动操作
def swipe_up(app_driver):
"""
在应用程序中执行上滑操作。
参数:
- app_driver: 应用程序的驱动对象,用于与设备交互。
返回值:
"""
# 获取屏幕尺寸
size = app_driver.get_window_size()
# 计算滑动的起始和结束坐标
start_x = size['width'] // 2
start_y = int(size['height'] * 0.2) # 起始y坐标,屏幕高度的20%
end_x = start_x
......@@ -45,46 +79,49 @@ def swipe_up(app_driver):
# 执行滑动操作
app_driver.swipe(start_x, start_y, end_x, end_y, duration=500)
# 图片亮度对比函数
# 请使用“pip install opencv-python -i https://pypi.tuna.tsinghua.edu.cn/simple”安装PIL库
from PIL import Image
import numpy as np
import os
import logging
# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def compare_brightness(light_down_path, light_on_path, threshold=1):
"""
对比两张图片的亮度,返回亮度是否增加的布尔值。
light_on_path:传入暗色的图片
light_down_path:传入亮色的图片
threshold:亮度变化的阈值,默认为1
"""
try:
# 打开图片并转换为灰度图像,以便后续处理
image1 = Image.open(light_down_path).convert('L') # 转换为灰度图像
image2 = Image.open(light_on_path).convert('L') # 转换为灰度图像
# 将图像转换为numpy数组
# 将图像转换为numpy数组,便于计算
array1 = np.array(image1)
array2 = np.array(image2)
# 计算平均亮度
# 计算两张图片的平均亮度
avg_brightness1 = np.mean(array1)
avg_brightness2 = np.mean(array2)
# 记录日志,输出两张图片的平均亮度
logging.info(f"关闭灯光时的平均亮度: {avg_brightness1}")
logging.info(f"打开灯光时的平均亮度: {avg_brightness2}")
# 判断亮度是否增加,考虑阈值
# 计算亮度变化量
brightness_increase = avg_brightness2 - avg_brightness1
# 记录日志,输出亮度变化量
logging.info(f"亮度变化量: {brightness_increase}")
# 判断亮度变化量是否超过阈值
return brightness_increase > threshold
except Exception as e:
# 异常处理,记录错误日志
logging.error(f"对比亮度时发生错误: {e}", exc_info=True)
return False
# 调用示例
# if __name__ == '__main__':
# logging.info("开始对比亮度")
#
......@@ -108,14 +145,27 @@ def compare_brightness(light_down_path, light_on_path, threshold=1):
# else:
# logging.error("灯光未成功打开")
# 提取特征点并比较图片函数
# 请使用“pip install opencv-python”安装cv2库
import cv2
import logging
from PIL import Image
import numpy as np
import os
def compare_images_feature_matching(image1_path, image2_path):
"""
比较两张图片是否相同
使用特征匹配的方法比较两张图片是否相同。首先验证图片路径和格式,然后根据图片尺寸进行调整,
最后转换为numpy数组进行比较。
参数:
image1_path (str): 第一张图片的路径
image2_path (str): 第二张图片的路径
返回:
dict: 包含比较结果和可能的错误信息
"""
try:
# 验证图片路径是否存在且为有效图片文件
if not os.path.isfile(image1_path) or not os.path.isfile(image2_path):
......@@ -163,7 +213,7 @@ def compare_images_feature_matching(image1_path, image2_path):
logging.error(f"未知错误: {e}")
return {"result": False, "error": f"未知错误: {e}"}
# 示例调用
# if __name__ == '__main__':
# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# logging.info("开始对比图片")
......@@ -184,14 +234,25 @@ def compare_images_feature_matching(image1_path, image2_path):
# else:
# logging.error("图片不同")
# 计算直方图相似度函数
import cv2
import numpy
from PIL import Image
import logging
import os
def calculate(image1, image2):
"""
计算两张图片的直方图重合度
通过将图片转换为BGR格式,并计算每张图片的蓝色通道直方图,然后比较这两个直方图的重合度来评估图片的相似度
def calculate(image1, image2):
参数:
image1: 第一张图片,应为RGB格式
image2: 第二张图片,应为RGB格式
返回值:
返回两张图片直方图的重合度,范围在0到1之间,1表示完全重合,即图片高度相似
"""
image1 = cv2.cvtColor(numpy.asarray(image1), cv2.COLOR_RGB2BGR)
image2 = cv2.cvtColor(numpy.asarray(image2), cv2.COLOR_RGB2BGR)
hist1 = cv2.calcHist([image1], [0], None, [256], [0.0, 255.0])
......@@ -206,75 +267,113 @@ def calculate(image1, image2):
degree = degree / len(hist1)
return degree
# 图片相似性对比函数
def classify_hist_with_split(image1, image2, size=(256, 256)):
"""
根据两张图片的RGB直方图比较它们的相似性。
参数:
image1: 第一张图片的路径。
image2: 第二张图片的路径。
size: 将图片调整到的统一尺寸,默认为(256, 256)。
返回:
两张图片的相似度,值越小表示两张图片越相似。
"""
# 打开图片文件
image1 = Image.open(image1)
image2 = Image.open(image2)
# 将图像resize后,分离为RGB三个通道,再计算每个通道的相似值
# 将PIL图像转换为OpenCV格式(BGR)
image1 = cv2.cvtColor(numpy.asarray(image1), cv2.COLOR_RGB2BGR)
image2 = cv2.cvtColor(numpy.asarray(image2), cv2.COLOR_RGB2BGR)
# 调整图片尺寸,以确保比较是在相同尺寸下进行
image1 = cv2.resize(image1, size)
image2 = cv2.resize(image2, size)
# 分离图片的RGB通道
sub_image1 = cv2.split(image1)
sub_image2 = cv2.split(image2)
sub_data = 0
# 遍历每个通道,计算并累加相似度
for im1, im2 in zip(sub_image1, sub_image2):
sub_data += calculate(im1, im2)
# 计算平均相似度
sub_data = sub_data / 3
# 返回最终的相似度结果
return sub_data
# 示例调用
# if __name__ == '__main__':
# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# logging.info("开始对比图片")
#
# image1_path = r'D:\GithubData\自动化\ubains-module-test\预定系统\reports\imgs\Exhibit_Inspect\No_PaperLess\同屏后-无纸化设备A界面截屏.png'
# image2_path = r'D:\GithubData\自动化\ubains-module-test\预定系统\reports\imgs\Exhibit_Inspect\No_PaperLess\同屏后-无纸化设备B界面截屏.png'
#
# if not os.path.exists(image1_path):
# logging.error(f"图片 {image1_path} 不存在")
# exit(1)
# if not os.path.exists(image2_path):
# logging.error(f"图片 {image2_path} 不存在")
# exit(1)
#
# # 对比两张截图的相似度
# result1 = classify_hist_with_split(image1_path, image2_path)
#
# # 确保 result1 是一个标量值
# if isinstance(result1, numpy.ndarray):
# result1 = result1.item()
#
# print("相似度为:" + "%.2f%%" % (result1 * 100))
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.info("开始对比图片")
image1_path = r'D:\GithubData\自动化\ubains-module-test\预定系统\reports\imgs\Exhibit_Inspect\No_PaperLess\同屏后-无纸化设备A界面截屏.png'
image2_path = r'D:\GithubData\自动化\ubains-module-test\预定系统\reports\imgs\Exhibit_Inspect\No_PaperLess\同屏后-无纸化设备B界面截屏.png'
if not os.path.exists(image1_path):
logging.error(f"图片 {image1_path} 不存在")
exit(1)
if not os.path.exists(image2_path):
logging.error(f"图片 {image2_path} 不存在")
exit(1)
# 对比两张截图的相似度
result1 = classify_hist_with_split(image1_path, image2_path)
# 确保 result1 是一个标量值
if isinstance(result1, numpy.ndarray):
result1 = result1.item()
print("相似度为:" + "%.2f%%" % (result1 * 100))
# 检查输出路径是否有效函数
import cv2
import logging
import os
import shutil # 导入 shutil 模块以检查磁盘空间
def check_output_path(output_path):
"""
检查输出路径是否有效
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
如果输出目录不存在,则尝试创建它,并检查是否有写权限
def check_output_path(output_path):
参数:
output_path (str): 输出文件的路径
返回:
bool: 如果输出路径有效且可写,则返回True,否则返回False
"""
# 获取输出文件的目录部分
output_dir = os.path.dirname(output_path)
# 检查输出目录是否存在
if not os.path.exists(output_dir):
try:
# 尝试创建输出目录
os.makedirs(output_dir)
logging.info(f"创建目录: {output_dir}")
except Exception as e:
# 如果创建目录失败,记录错误信息并返回False
logging.error(f"无法创建目录 {output_dir}: {e}")
return False
# 检查文件权限
if not os.access(output_dir, os.W_OK):
# 如果没有写权限,记录错误信息并返回False
logging.error(f"没有写权限: {output_dir}")
return False
# 如果一切正常,返回True
return True
# 捕获RTSP流并保存为图像文件函数
def capture_frame_from_rtsp(rtsp_url, file_name, output_path=None):
"""
从RTSP流中捕获一帧并保存为图像文件。
参数:
- rtsp_url: RTSP流的URL。
- file_name: 保存图像文件的名称。
- output_path: 保存图像文件的路径,默认为None,如果未提供则使用默认路径。
返回:
- 成功捕获并保存帧时返回True,否则返回False。
"""
try:
# 验证输入参数
......@@ -351,73 +450,90 @@ def capture_frame_from_rtsp(rtsp_url, file_name, output_path=None):
logging.error(f"捕获帧时发生错误: {e}", exc_info=True)
return False
# 中控屏灯光控制函数
def light_control(app_drive):
"""
控制灯光的函数。
该函数通过Appium驱动定位并点击应用中的灯光控制按钮,以开启不同区域的灯光。
参数:
- app_drive: Appium驱动实例,用于与移动应用交互。
"""
# 开启所有区域灯光
# 定位【接待区】灯光
light_reception_button = app_drive.find_element(AppiumBy.XPATH,
light_reception_button = find_element_with_retry(app_drive, AppiumBy.XPATH,
"/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.ImageView[1]")
sleep(2)
logging.info("尝试定位【接待区】按钮元素,并点击按钮")
light_reception_button.click()
click_with_retry(light_reception_button)
sleep(2)
# 定位【指挥中心】灯光
light_command_center_button = app_drive.find_element(AppiumBy.XPATH,
light_command_center_button = find_element_with_retry(app_drive, AppiumBy.XPATH,
"/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[3]")
sleep(2)
logging.info("尝试定位【指挥中心】按钮元素,并点击按钮")
light_command_center_button.click()
click_with_retry(light_command_center_button)
sleep(2)
# 定位【影音室】灯光
light_audio_room_button = app_drive.find_element(AppiumBy.XPATH,
light_audio_room_button = find_element_with_retry(app_drive, AppiumBy.XPATH,
"/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[2]")
sleep(2)
logging.info("尝试定位【影音室】按钮元素,并点击按钮")
light_audio_room_button.click()
click_with_retry(light_audio_room_button)
sleep(2)
# 定位【会议室】灯光
light_meeting_room_button = app_drive.find_element(AppiumBy.XPATH,
light_meeting_room_button = find_element_with_retry(app_drive, AppiumBy.XPATH,
"/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[5]")
sleep(2)
logging.info("尝试定位【会议室】按钮元素,并点击按钮")
light_meeting_room_button.click()
click_with_retry(light_meeting_room_button)
sleep(2)
# 定位【会商区】灯光
light_meeting_area_button = app_drive.find_element(AppiumBy.XPATH,
light_meeting_area_button = find_element_with_retry(app_drive, AppiumBy.XPATH,
"/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[4]")
sleep(2)
logging.info("尝试定位【会商区】按钮元素,并点击按钮")
light_meeting_area_button.click()
click_with_retry(light_meeting_area_button)
sleep(2)
# 定位【培训室】灯光
light_training_room_button = app_drive.find_element(AppiumBy.XPATH,
light_training_room_button = find_element_with_retry(app_drive, AppiumBy.XPATH,
"/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[6]")
sleep(2)
logging.info("尝试定位【培训室】按钮元素,并点击按钮")
light_training_room_button.click()
click_with_retry(light_training_room_button)
sleep(2)
def curtain_control(app_drive,wd):
# 中控屏窗帘控制函数
def curtain_control(app_drive, wd):
"""
控制窗帘的上升和下降,并捕获相应状态的截图。
参数:
app_drive: Appium驱动对象,用于操作App。
wd: WebDriver对象,用于捕获屏幕截图。
此函数无返回值。
"""
# 所有窗帘全部上升
logging.info("尝试定位所有【窗帘上升】按钮元素,并点击按钮")
# 上升按钮的定位
curtain_up_locator = ['3', '4', '5', '1', '13']
for i in curtain_up_locator:
curtain_up_button = app_drive.find_element(AppiumBy.XPATH,
curtain_up_button = find_element_with_retry(app_drive, AppiumBy.XPATH,
f"/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[{i}]")
curtain_up_button.click()
click_with_retry(curtain_up_button)
sleep(2)
INFO("请检查窗帘上升状态是否正常")
SELENIUM_LOG_SCREEN(wd, "50%", "Exhibit_Inspect", "Control_Manage", "curtain_up")
# 截图获取当前中控屏软件窗帘上升的界面
get_screenshot_with_retry(wd, app_drive, "Exhibit_Inspect", "Control_Manage", "curtain_up")
app_drive.get_screenshot_as_file(
r"D:\GithubData\自动化\ubains-module-test\预定系统\reports\imgs\Exhibit_Inspect\Control_Manage\curtain_up.png")
# get_screenshot_with_retry(wd,app_drive, "Control_Manage", "curtain_up")
sleep(30)
# # 测试报告中补充窗帘上升的截图
SELENIUM_LOG_SCREEN(wd, "50%", "Exhibit_Inspect", "Control_Manage", "curtain_rtsp_up")
......@@ -434,16 +550,15 @@ def curtain_control(app_drive,wd):
# 下降按钮的定位
curtain_down_locator = ['10', '11', '12', '6', '15']
for i in curtain_down_locator:
curtain_down_button = app_drive.find_element(AppiumBy.XPATH,
curtain_down_button = find_element_with_retry(app_drive, AppiumBy.XPATH,
f"/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[{i}]")
curtain_down_button.click()
click_with_retry(curtain_down_button)
sleep(2)
sleep(30)
INFO("请检查窗帘下降状态是否正常")
SELENIUM_LOG_SCREEN(wd, "50%", "Exhibit_Inspect", "Control_Manage", "curtain_down")
app_drive.get_screenshot_as_file(r"D:\GithubData\自动化\ubains-module-test\预定系统\reports\imgs\Exhibit_Inspect\Control_Manage\curtain_down.png")
get_screenshot_with_retry(wd, app_drive, "Exhibit_Inspect", "Control_Manage", "curtain_down")
# 截图获取当前中控屏软件窗帘上升的界面
# get_screenshot_with_retry(wd,app_drive, "Control_Manage", "curtain_down")
# 测试报告中补充窗帘下降的截图
SELENIUM_LOG_SCREEN(wd, "50%", "Exhibit_Inspect", "Control_Manage", "curtain_rtsp_down")
logging.info("开始捕获RTSP流中的帧")
......@@ -452,39 +567,59 @@ def curtain_control(app_drive,wd):
else:
logging.error("帧捕获失败")
def air_condition_control(app_drive,wd):
# 中控屏空调控制函数
def air_condition_control(app_drive, wd):
"""
控制空调的打开与关闭,并检查其状态显示。
参数:
- app_drive: Appium驱动对象,用于操作移动端应用。
- wd: WebDriver对象,用于捕获屏幕截图。
此函数不返回任何值。
"""
# 点击【打开空调】按钮
logging.info("尝试定位【打开空调】按钮元素,并点击按钮")
open_air_conditioner_button = app_drive.find_element(AppiumBy.XPATH,
open_air_conditioner_button = find_element_with_retry(app_drive, AppiumBy.XPATH,
"/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[14]")
open_air_conditioner_button.click()
click_with_retry(open_air_conditioner_button)
sleep(20)
# 这是空调开启的状态显示
INFO("请检查空调开启的状态是否正常")
SELENIUM_LOG_SCREEN(wd, "50%", "Exhibit_Inspect", "Control_Manage", "air_condition_on")
# app_drive.get_screenshot_as_file(r"D:\GithubData\自动化\ubains-module-test\预定系统\reports\imgs\Exhibit_Inspect\Control_Manage\air_condition_on.png")
get_screenshot_with_retry(wd,app_drive, "Control_Manage", "air_condition_on")
get_screenshot_with_retry(wd, app_drive, "Exhibit_Inspect", "Control_Manage", "air_condition_on")
sleep(2)
# 点击【关闭空调】按钮
logging.info("尝试定位【关闭空调】按钮元素,并点击按钮")
close_air_conditioner_button = app_drive.find_element(AppiumBy.XPATH,
close_air_conditioner_button = find_element_with_retry(app_drive, AppiumBy.XPATH,
"/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[14]")
close_air_conditioner_button.click()
click_with_retry(close_air_conditioner_button)
sleep(20)
# 这是空调关闭的状态显示
INFO("请检查空调关闭的状态是否正常")
SELENIUM_LOG_SCREEN(wd, "50%", "Exhibit_Inspect", "Control_Manage", "air_condition_off")
# app_drive.get_screenshot_as_file(r"D:\GithubData\自动化\ubains-module-test\预定系统\reports\imgs\Exhibit_Inspect\Control_Manage\air_condition_off.png")
get_screenshot_with_retry(wd,app_drive, "Control_Manage", "air_condition_off")
get_screenshot_with_retry(wd, app_drive, "Exhibit_Inspect", "Control_Manage", "air_condition_off")
sleep(2)
def information_control(app_drive,wd):
# 中控屏信息发布控制函数
def information_control(app_drive, wd):
"""
控制信息展示和捕获RTSP流中的帧。
参数:
- app_drive: Appium驱动实例,用于操作移动应用。
- wd: WebDriver实例,用于操作网页。
此函数依次选择不同的内容进行播放,捕获RTSP流中的帧,并记录屏幕状态。
"""
# 选择生日快乐内容播放
brithday_information_button = app_drive.find_element(AppiumBy.XPATH,
brithday_information_button = find_element_with_retry(app_drive, AppiumBy.XPATH,
"/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[1]")
brithday_information_button.click()
click_with_retry(brithday_information_button)
logging.info("选择生日快乐内容播放")
sleep(5)
# 这是生日快乐主题内容发布
......@@ -499,9 +634,9 @@ def information_control(app_drive,wd):
logging.error("帧捕获失败")
# 选择欢迎领导发布内容播放
meeting_information_button = app_drive.find_element(AppiumBy.XPATH,
meeting_information_button = find_element_with_retry(app_drive, AppiumBy.XPATH,
"/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[2]")
meeting_information_button.click()
click_with_retry(meeting_information_button)
logging.info("选择会议发布内容播放")
sleep(5)
# 这是会议欢迎主题内容发布
......@@ -515,13 +650,13 @@ def information_control(app_drive,wd):
logging.error("帧捕获失败")
# 选择展厅空间结构内容播放
information_space_button = app_drive.find_element(AppiumBy.XPATH,
information_space_button = find_element_with_retry(app_drive, AppiumBy.XPATH,
"/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[3]")
information_space_button.click()
click_with_retry(information_space_button)
logging.info("选择展厅空间结构内容播放")
sleep(5)
# 这是展厅空间结构内容发布
INFO("请检查中控屏软件信息发布界面是否正常")
INFO("请检查中控屏软件信息发布界面是否正常显示")
SELENIUM_LOG_SCREEN(wd, "50%", "Exhibit_Inspect", "Control_Manage", "information_space_on")
if capture_frame_from_rtsp(information_rtsp_url, "information_space_on.png"):
......@@ -531,9 +666,9 @@ def information_control(app_drive,wd):
# 点击信息发布关闭按钮
INFO("点击信息发布关闭按钮")
information_close_button = app_drive.find_element(AppiumBy.XPATH,
information_close_button = find_element_with_retry(app_drive, AppiumBy.XPATH,
"/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[6]")
information_close_button.click()
click_with_retry(information_close_button)
# 这是信发屏已关闭的界面
INFO("请检查中控屏软件信息发布界面是否正常显示为关闭")
SELENIUM_LOG_SCREEN(wd, "50%", "Exhibit_Inspect", "Control_Manage", "information_off")
......@@ -543,40 +678,54 @@ def information_control(app_drive,wd):
else:
logging.error("帧捕获失败")
# 中控屏音乐控制函数
def music_control(app_drive, wd):
"""
控制音乐播放的函数,包括播放和停止音乐。
def music_control(app_drive,wd):
:param app_drive: Appium驱动对象,用于操作App。
:param wd: WebDriver对象,用于浏览器自动化操作。
"""
# 点击【播放音乐】
logging.info("尝试定位【播放音乐】按钮元素,并点击按钮")
play_music_button = app_drive.find_element(AppiumBy.XPATH,
play_music_button = find_element_with_retry(app_drive, AppiumBy.XPATH,
"/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[8]")
play_music_button.click()
click_with_retry(play_music_button)
sleep(2)
# 调用音频采集判断音量函数
INFO("请检查中控屏音乐播放的音频采集是否正常!!!")
volume_acquisition()
sleep(5)
# 这是音乐开启播放后的界面显示
INFO("请检查中控屏软件打开音乐播放后的界面状态显示")
SELENIUM_LOG_SCREEN(wd, "50%", "Exhibit_Inspect", "Control_Manage", "music_on")
# app_drive.get_screenshot_as_file(r"D:\GithubData\自动化\ubains-module-test\预定系统\reports\imgs\Exhibit_Inspect\Control_Manage\music_on.png")
get_screenshot_with_retry(wd,app_drive, "Control_Manage", "music_on")
get_screenshot_with_retry(wd, app_drive, "Exhibit_Inspect", "Control_Manage", "music_on")
# 点击【关闭播放音乐】
logging.info("尝试定位【关闭播放音乐】按钮元素,并点击按钮")
close_play_music_button = app_drive.find_element(AppiumBy.XPATH,
close_play_music_button = find_element_with_retry(app_drive, AppiumBy.XPATH,
"/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button[8]")
close_play_music_button.click()
click_with_retry(close_play_music_button)
sleep(5)
# 这是音乐关闭播放后的界面显示
INFO("请检查中控屏软件关闭音乐播放后的界面状态显示")
SELENIUM_LOG_SCREEN(wd, "50%", "Exhibit_Inspect", "Control_Manage", "music_off")
# app_drive.get_screenshot_as_file(r"D:\GithubData\自动化\ubains-module-test\预定系统\reports\imgs\Exhibit_Inspect\Control_Manage\music_off.png")
get_screenshot_with_retry(wd,app_drive, "Control_Manage", "music_off")
get_screenshot_with_retry(wd, app_drive, "Exhibit_Inspect", "Control_Manage", "music_off")
# 中控屏控制函数
def command_centre_control(rtsp_url, app_drive, wd):
"""
控制指挥中心大屏的开启和关闭,并 capture RTSP 流的一帧作为日志。
def command_centre_control(rtsp_url,app_drive,wd):
open_center_button = app_drive.find_element(AppiumBy.XPATH,
参数:
- rtsp_url: RTSP 流的 URL。
- app_drive: Appium 驱动对象,用于操作移动应用。
- wd: WebDriver 对象,用于执行 Selenium 相关操作。
此函数会尝试打开指挥中心大屏,capture 并记录大屏开启和关闭时的监控视频帧。
"""
# 打开指挥中心大屏
open_center_button = find_element_with_retry(app_drive, AppiumBy.XPATH,
"/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button")
open_center_button.click()
click_with_retry(open_center_button)
sleep(10)
# 这是指挥大屏开启的监控视频显示
INFO("请检查指挥大屏开启的监控视频状态是否正常")
......@@ -593,9 +742,9 @@ def command_centre_control(rtsp_url,app_drive,wd):
SELENIUM_LOG_SCREEN(wd, "50%", "Exhibit_Inspect", "Control_Manage", "command_screen_down")
# 关闭指挥中心大屏幕
logging.info("尝试定位【关闭指挥中心控制】按钮元素,并点击按钮")
close_center_button = app_drive.find_element(AppiumBy.XPATH,
close_center_button = find_element_with_retry(app_drive, AppiumBy.XPATH,
"/hierarchy/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.LinearLayout/android.widget.FrameLayout/android.widget.RelativeLayout/android.widget.RelativeLayout/android.widget.RelativeLayout[2]/android.widget.Button")
close_center_button.click()
click_with_retry(close_center_button)
sleep(10)
# 从rtsp流中截取一帧保存为图片
logging.info("开始捕获RTSP流中的帧")
......@@ -605,67 +754,9 @@ def command_centre_control(rtsp_url,app_drive,wd):
else:
logging.error("帧捕获失败")
import pyaudio
import numpy as np
# 定义参数
FORMAT = pyaudio.paInt16 # 16-bit resolution
CHANNELS = 1 # 1 channel
RATE = 44100 # 44.1kHz sampling rate
CHUNK = 1024 # 2^10 samples for buffer
THRESHOLD = 1000 # 阈值,可以根据实际情况调整
RECORD_SECONDS = 10 # 录制时长
def is_audio_loud(data, threshold=THRESHOLD):
"""
判断音频数据是否超过阈值
:param data: 音频数据
:param threshold: 阈值
:return: 如果声音大小超过阈值,返回True;否则返回False
"""
return np.abs(np.frombuffer(data, dtype=np.int16)).mean() > threshold
def volume_acquisition():
audio = pyaudio.PyAudio() # 初始化pyaudio
# 打开音频流
stream = audio.open(format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK)
logging.info("开始录音...")
frames = []
for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
data = stream.read(CHUNK) # 读取音频数据
frames.append(data)
logging.info("录音结束。")
# 停止和关闭音频流
stream.stop_stream()
stream.close()
audio.terminate()
# 将所有帧合并成一个字节流
audio_data = b''.join(frames)
# 判断音量状态
if is_audio_loud(audio_data):
INFO("整段音频声音过大!")
else:
INFO("整段音频声音正常。")
# 进一步处理音频数据(例如保存为文件或分析)
# ...
# if __name__ == "__main__":
# volume_acquisition()
# app设备初始化adb连接函数
import subprocess
from venv import logger
def app_init(device_ip, port=5555):
"""
初始化浏览器设置和实例。
......@@ -706,6 +797,7 @@ def app_init(device_ip, port=5555):
INFO(f"连接设备失败: {e}")
return False
# app设备退出adb连接函数
def app_quit(device_ip,port=5555):
"""
退出浏览器并释放资源。
......@@ -717,15 +809,16 @@ def app_quit(device_ip,port=5555):
subprocess.run(['adb', 'disconnect', device_address])
INFO(f"ADB 连接已断开: {device_address}")
def get_screenshot_with_retry(wd,app_drive, module_name, function_name, max_retries=3, retry_delay=5):
# app截屏函数
def get_screenshot_with_retry(wd,app_drive, module_name, function_name, step_name, max_retries=3, retry_delay=5):
"""
使用重试机制获取并保存截图。
参数:
app_drive: 实现了get_screenshot_as_file方法的对象,用于获取截图。
module_name: 用于构造保存截图的目录名称。
function_name: 用于构造截图文件的名称。
function_name: 用于构造截图的目录名称。
setp_name:用于构造截图文件的名称
max_retries: 最大重试次数,默认为3次。
retry_delay: 重试间隔时间,默认为5秒。
......@@ -744,7 +837,7 @@ def get_screenshot_with_retry(wd,app_drive, module_name, function_name, max_retr
file_path = os.path.join(target_dir, f"{function_name}.png")
#截屏
SELENIUM_LOG_SCREEN(wd, "75%", "Exhibit_Inspect", module_name, f"{function_name}")
SELENIUM_LOG_SCREEN(wd, "75%", module_name, function_name, f"{step_name}")
# 使用循环实现重试机制
for _ in range(max_retries):
......@@ -761,6 +854,7 @@ def get_screenshot_with_retry(wd,app_drive, module_name, function_name, max_retr
# 如果多次尝试均失败,则抛出异常
raise Exception(f"多次尝试截图失败: {file_path}")
# app查找元素函数
def find_element_with_retry(app_driver, by, value, max_retries=3, retry_delay=5):
"""
使用重试机制查找元素。
......@@ -793,7 +887,7 @@ def find_element_with_retry(app_driver, by, value, max_retries=3, retry_delay=5)
# 如果达到最大重试次数仍未找到元素,则抛出异常
raise Exception(f"多次尝试查找元素失败: {by}={value}")
# app点击事件函数
def click_with_retry(element, max_retries=3, retry_delay=5):
"""
点击元素的函数,带有重试机制。
......
......@@ -35,6 +35,7 @@ logging.basicConfig(
]
)
# 浏览器初始化函数
def browser_init(login_type):
"""
初始化浏览器设置和实例。
......@@ -92,7 +93,7 @@ def browser_init(login_type):
# 捕获并记录初始化过程中的任何异常
logging.error(f"浏览器初始化失败:{e}")
# 从配置项config中获取登录URL
def get_login_url_from_config(login_type):
"""
从配置文件中读取登录URL。
......@@ -146,7 +147,7 @@ def get_login_url_from_config(login_type):
# 返回登录 URL
return login_url
# 管理员登录函数
def admin_login(username, password):
"""
管理员登录函数。
......@@ -177,6 +178,7 @@ def admin_login(username, password):
# 点击登录按钮
safe_click((By.XPATH, "//input[@value='登 录']"), wd)
# 进入预定后台函数
def enter_the_backend():
"""
进入后台系统界面。
......@@ -191,6 +193,7 @@ def enter_the_backend():
# 执行点击操作,通过XPath定位到后台系统入口图标并点击
safe_click((By.XPATH, "//img[@title='后台系统']"), wd)
# 输入框输入值函数
def safe_send_keys(element_locator, value, wd):
"""
安全地向网页元素发送键值。
......@@ -220,6 +223,7 @@ def safe_send_keys(element_locator, value, wd):
# 如果元素不可交互,打印相应异常信息
INFO(f"ElementNotInteractableException: Element {element_locator} is not interactable.")
# 点击按钮函数
def safe_click(element_locator, wd):
"""
对其定位器指定的元素执行安全单击。
......@@ -249,11 +253,18 @@ def safe_click(element_locator, wd):
from time import sleep
from selenium.webdriver.common.by import By
# 议题输入和上传议题文件函数
def issue_send_and_upload(wd, issue_num, issue_name):
"""
输入议题名称以及上传议题文件。
参数:
wd: WebDriver实例,用于操作浏览器。
issue_num: 需要上传的议题文件数量。
issue_name: 会议议题的名称。
"""
# 议题文件的路径列表
issue_file_path = [
r"D:\GithubData\自动化\ubains-module-test\预定系统\reports\issue_file\5.164Scan 安全报告.pdf",
r"D:\GithubData\自动化\ubains-module-test\预定系统\reports\issue_file\IdeaTop软件配置&操作说明文档.docx",
......@@ -262,34 +273,41 @@ def issue_send_and_upload(wd, issue_num, issue_name):
r"D:\GithubData\自动化\ubains-module-test\预定系统\reports\issue_file\议题图片.png"
]
# 打印并输入议题名称
INFO(f"输入议题名称:{issue_name}")
# 输入议题名称
safe_send_keys((By.XPATH, f"(//input[@placeholder='请输入会议议题'])[1]"), f"{issue_name}", wd)
# 选择议题文件进行上传
# 点击【上传文件】按钮以开始上传议题文件
INFO("点击【上传文件】按钮")
safe_click((By.XPATH, f"(//div[@class='topicsHandleButton uploadFile'][contains(text(),'上传文件(0)')])[1]"), wd)
sleep(2)
# 遍历每个议题文件进行上传
for i in range(issue_num):
# 检查文件是否存在
if not os.path.exists(issue_file_path[i]):
INFO(f"文件 {issue_file_path[i]} 不存在,跳出函数")
return
# INFO("定位【选择文件】按钮")
# 定位【选择文件】按钮
upload_button = wd.find_element(By.XPATH, '//*[@id="global-uploader-btn"]/input')
sleep(2)
# INFO(f"元素定位:{upload_button}")
# INFO("选择议题文件上传")
# 选择议题文件上传
upload_button.send_keys(issue_file_path[i])
# INFO(f"第{i+1}个议题文件上传完成")
# 等待文件上传完成
sleep(15)
# 截取上传完成后的屏幕日志
SELENIUM_LOG_SCREEN(wd, "50%", "Exhibit_Inspect", "Meeting_Message", "添加议题文件")
# 点击【确定】按钮
safe_click((By.XPATH, "//div[@aria-label='会议文件上传']//div[@class='el-dialog__footer']//div//span[contains(text(),'确定')]"), wd)
# 点击【确定】按钮完成上传
safe_click((By.XPATH,
"//div[@aria-label='会议文件上传']//div[@class='el-dialog__footer']//div//span[contains(text(),'确定')]"),
wd)
sleep(2)
# 清除输入框函数
def input_clear(element_locator, wd):
"""
清空输入框中的文本。
......@@ -319,6 +337,7 @@ def input_clear(element_locator, wd):
# 如果元素不可操作,打印元素不可操作的消息。
print(f"ElementNotInteractableException: Element {element_locator} is not interactable.")
# 键盘输入函数,例如【回车】键等操作
def send_keyboard(element_locator, wd):
"""
向指定元素发送键盘事件。
......@@ -349,6 +368,7 @@ def send_keyboard(element_locator, wd):
# 如果元素不可交互,打印不可交互错误消息。
print(f"ElementNotInteractableException: Element {element_locator} is not interactable.")
# 获取常规提示文本函数,会同步进行截屏操作
def get_notify_text(wd,element_locator,module_name,function_name,name):
"""
获取通知文本信息。
......@@ -358,22 +378,27 @@ def get_notify_text(wd,element_locator,module_name,function_name,name):
参数:
wd (WebDriver): 由上层传入的WebDriver对象,用于操作浏览器。
element_locator: 用于定位元素的定位器。
module_name: 模块名称,用于日志记录。
function_name: 函数名称,用于日志记录。
name: 屏幕截图的名称,用于日志记录。
返回:
str: 提取的通知文本信息。如果未能提取到信息或发生异常,则返回None。
"""
try:
# 获取提示信息
# 使用WebDriverWait等待元素出现,并获取其文本内容
notify_text = WebDriverWait(wd, 60).until(
EC.presence_of_element_located(element_locator)
).text
# 屏幕截图
# 在获取到通知文本后进行屏幕截图
SELENIUM_LOG_SCREEN(wd,"50%",module_name,function_name,name)
return notify_text
except Exception as e:
# 记录异常信息
# 当发生异常时,记录异常信息
INFO(f"Exception occurred: {e}")
# 获取列表的查询结果文本函数
def elment_get_text(element_locator, wd):
"""
获取页面元素的文本。
......@@ -402,6 +427,7 @@ def elment_get_text(element_locator, wd):
# 如果元素存在但不可交互(例如被遮挡),则捕获ElementNotInteractableException异常并打印错误信息。
print(f"ElementNotInteractableException: Element {element_locator} is not interactable.")
# 读取csv文件进行数据驱动函数
def read_csv_data(csv_file_path):
"""
读取CSV文件中的数据,并将其转换为一个包含字典的列表,每个字典代表一行测试用例数据。
......@@ -433,8 +459,8 @@ def read_csv_data(csv_file_path):
# 返回包含所有测试用例数据的列表
return ddt_cases
# 读取测试用例xlsx文件中的JSON数据进行数据驱动函数
import openpyxl
def read_xlsx_data(xlsx_file_path):
"""
读取XLSX文件中的数据,并将其转换为一个包含字典的列表,每个字典代表一行测试用例数据。
......@@ -488,6 +514,7 @@ def read_xlsx_data(xlsx_file_path):
# 返回包含所有测试用例数据的列表
return ddt_cases
# 获取当前进程的 CPU 占用率函数
def get_cpu_usage(interval=1):
"""
获取当前进程的 CPU 占用率。
......@@ -512,7 +539,19 @@ def get_cpu_usage(interval=1):
logging.error(f"未知错误: {e}")
return None
# 删除目录下的图片文件函数
def delete_images_in_directory(directory):
"""
删除指定目录下的所有图片文件。
该函数会遍历指定的目录,寻找并删除所有扩展名在image_extensions列表中的图片文件。
参数:
directory (str): 图片文件所在的目录路径。
返回:
无返回值。
"""
# 指定要删除的图片文件扩展名
image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff']
......@@ -530,7 +569,24 @@ def delete_images_in_directory(directory):
except Exception as e:
print(f"删除文件 {file_path} 时出错: {e}")
# 判断非法字符函数
def is_valid_password(password):
"""
验证密码的有效性。
有效密码需满足以下条件:
1. 必须是一个字符串。
2. 长度至少为11个字符。
3. 必须包含至少一个小写字母、一个大写字母和一个数字。
4. 不得包含连续3位相同的字符。
5. 不得包含连续3位连续的字符(如123, abc)。
参数:
password (str): 待验证的密码。
返回:
bool: 如果密码有效则返回True,否则返回False。
"""
try:
# 基本类型检查
if not isinstance(password, str):
......@@ -559,6 +615,7 @@ def is_valid_password(password):
logging.error(f"An error occurred: {e}")
return False
# 退出浏览器并释放资源函数
def browser_quit():
"""
退出浏览器并释放资源。
......@@ -572,7 +629,7 @@ def browser_quit():
# 调用浏览器驱动实例的quit方法,关闭浏览器并释放资源
wd.quit()
# 获取最新的HTML报告文件,并拼接网页访问连接函数
def get_latest_report_file(report_dir, base_url):
"""
获取指定目录下最新的HTML报告文件,并返回带有基础URL的完整路径。
......@@ -618,6 +675,7 @@ def get_latest_report_file(report_dir, base_url):
# 返回完整的URL
return full_url
# 钉钉群机器人消息发送函数
def dingding_send_message(latest_report, title, mobile, ding_type):
"""
发送钉钉机器人消息
......@@ -625,8 +683,8 @@ def dingding_send_message(latest_report, title, mobile, ding_type):
:param latest_report: 测试报告链接
:param title: 消息标题
:param text: 消息内容
:param mobile: 需要@的手机号列表
:param ding_type: 钉钉机器人类型,用于选择不同的 Webhook URL 和密钥
"""
# 记录调用此函数的日志
logging.info("开始构建并发送钉钉机器人消息")
......@@ -700,7 +758,7 @@ def dingding_send_message(latest_report, title, mobile, ding_type):
except requests.exceptions.RequestException as e:
logging.error(f'请求异常: {e}')
# 运行自动化测试函数,并调用获取测试报告链接和钉钉机器人消息发送函数
def run_automation_test(report_title, report_url_prefix, test_case , ding_type):
"""
运行自动化测试并生成报告。
......@@ -709,6 +767,7 @@ def run_automation_test(report_title, report_url_prefix, test_case , ding_type):
- report_title: 报告的标题
- report_url_prefix: 报告URL的前缀
- test_case: 测试用例脚本执行的标签名
- ding_type: 钉钉通知的类型,备用参数,当前代码中未使用
"""
# 记录测试开始的日志
logging.info("开始自动化测试...")
......@@ -744,7 +803,19 @@ def run_automation_test(report_title, report_url_prefix, test_case , ding_type):
# 调用回调函数处理后续操作
get_reportfile_send_dingding(f"{report_title}", report_url_prefix, ding_type)
# 定义一个函数,用于获取最新的报告文件,并返回其URL,并调用钉钉消息发送函数
def get_reportfile_send_dingding(report_title, report_url_prefix, ding_type):
"""
获取最新的报告文件并通过钉钉发送报告链接。
参数:
report_title (str): 报告的标题。
report_url_prefix (str): 报告URL的前缀。
ding_type (str): 钉钉消息的类型。
返回:
"""
# print(GSTORE['case_pass'])
try:
# 获取报告文件所在的目录
......@@ -786,15 +857,12 @@ def get_reportfile_send_dingding(report_title, report_url_prefix, ding_type):
# 无论是否成功,都记录测试结束的日志
logging.info("自动化测试完成。")
# if __name__ == '__main__':
# get_reportfile_send_dingding("测试","http://192.168.1.166")
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
import logging
# 点击并拖拽函数
def single_click_and_drag(source_element_locator, target_element_locator, wd):
"""
实现元素从source_element单击后拖拽到target_element的功能
......@@ -826,14 +894,13 @@ def single_click_and_drag(source_element_locator, target_element_locator, wd):
except Exception as e:
logging.error(f"发生未知错误: {e}")
# 获取check.txt文件并解析指定信息函数
import requests
import os
import chardet
from urllib3.exceptions import InsecureRequestWarning
# 禁用 InsecureRequestWarning 警告
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
def fetch_and_parse_check_txt(url, save_path, extract_info):
"""
获取check.txt文件并解析指定信息
......@@ -879,8 +946,8 @@ def fetch_and_parse_check_txt(url, save_path, extract_info):
logging.exception(f"请求错误: {e}")
return None
# 检查服务状态函数
import telnetlib
def check_service_status(host, port):
"""
检查服务状态。
......@@ -909,14 +976,39 @@ def check_service_status(host, port):
except Exception as e:
INFO(f"连接失败: {e}")
# 设置腾讯会议会议号为全局变量函数
def set_tx_meeting_id(element_locator, wd):
"""
设置腾讯会议ID到全局存储。
该函数通过元素定位器获取腾讯会议ID,并将其存储在全局变量GSTORE中,
以便在其他地方访问和使用该ID。
参数:
- element_locator: 用于定位腾讯会议ID元素的定位器。
- wd: WebDriver实例,用于与网页交互。
返回值:
无返回值。
"""
# 获取腾讯会议ID文本
tx_meeting_id = elment_get_text(element_locator, wd)
# 将腾讯会议ID存储在全局存储中
GSTORE['tx_meeting_id'] = tx_meeting_id
# 获取腾讯会议会议号函数
def get_tx_meeting_id():
"""
获取腾讯会议ID。
从全局存储器GSTORE中获取预先存储的腾讯会议ID。这个函数没有输入参数,直接返回存储的会议ID。
Returns:
str: 腾讯会议ID。
"""
return GSTORE.get('tx_meeting_id')
# 云喇叭设备注册函数
def voice_device_register(app_id, app_secret, device_sn):
"""
注册语音设备。
......@@ -959,10 +1051,8 @@ def voice_device_register(app_id, app_secret, device_sn):
# 处理解析响应异常,记录错误日志
logging.error("解析响应失败: %s", e)
# 云喇叭设备设置函数
import requests
def cloud_voice_setting(app_id, app_secret, device_sn):
"""
设置云语音功能。
......@@ -1008,12 +1098,17 @@ def cloud_voice_setting(app_id, app_secret, device_sn):
# device_sn = os.getenv("DEVICE_SN", "W703BB44444")
# cloud_voice_setting(app_id, app_secret, device_sn)
# 云喇叭设备播放函数
def play_cloud_voice(app_id, app_secret, device_sn):
"""
播放云语音功能。
本函数通过发送HTTP POST请求,触发远程语音设备播放指定的语音内容。
参数:
- app_id: 应用ID,用于标识应用。
- app_secret: 应用密钥,用于验证应用的身份。
- device_sn: 设备序列号,用于标识具体的语音设备。
"""
# 注册设备
voice_device_register(app_id, app_secret, device_sn)
......@@ -1070,10 +1165,10 @@ def play_cloud_voice(app_id, app_secret, device_sn):
# device_sn = os.getenv("DEVICE_SN", "W703BB44444")
# play_cloud_voice(app_id, app_secret, device_sn)
# 获取测试报告通过率等参数的函数
import logging
import re
from selenium.webdriver.common.by import By
def get_test_result(latest_report, wd):
"""
获取测试结果页面的通过率、失败率和异常率
......@@ -1135,13 +1230,21 @@ def get_test_result(latest_report, wd):
# test_result = get_test_result("http://nat.ubainsyun.com:31134/report_20250217_094401.html",wd)
# print(test_result)
# 获取本机IP地址函数
import yaml
import logging
import socket
import re
import subprocess
def get_local_ip():
"""
获取本机的局域网IP地址。
此函数通过尝试与外部网络通信来确定本机的局域网IP地址。它利用了UDP协议,
并连接到一个知名的公共IP地址和端口,以此来获取本机的IP地址信息。
# 获取本机IP地址
def get_local_ip():
Returns:
str: 本机的局域网IP地址。
"""
try:
# 创建一个UDP套接字
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
......@@ -1150,64 +1253,85 @@ def get_local_ip():
# 获取本地IP地址
local_ip = sock.getsockname()[0]
finally:
# 确保在所有情况下关闭套接字
sock.close()
return local_ip
# 更新ngrok.cfg文件中的IP地址 函数
def update_ngrok_config(config_path, new_ip):
"""
更新ngrok配置文件中的IP地址
import yaml
import logging
import socket
import subprocess
本函数尝试打开并解析ngrok配置文件,更新其中的IP地址,然后将更改保存回配置文件中
# 获取本机IP地址
def get_local_ip():
try:
# 创建一个UDP套接字
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 连接到一个公共的IP地址和端口
sock.connect(("8.8.8.8", 80))
# 获取本地IP地址
local_ip = sock.getsockname()[0]
finally:
sock.close()
return local_ip
参数:
config_path (str): ngrok配置文件的路径
new_ip (str): 需要更新到配置文件中的新IP地址
# 更新ngrok.cfg文件中的IP地址
def update_ngrok_config(config_path, new_ip):
返回:
"""
try:
# 打开并安全地加载ngrok配置文件
with open(config_path, 'r', encoding='utf-8') as file:
config = yaml.safe_load(file)
# 更新IP地址
config['tunnels']['nat1']['proto']['tcp'] = f"{new_ip}:80"
# 将更新后的配置安全地写回文件
with open(config_path, 'w', encoding='utf-8') as file:
yaml.safe_dump(config, file, default_flow_style=False, allow_unicode=True)
# 记录成功更新的日志信息
logging.info(f"ngrok.cfg 更新成功,新的IP地址为: {new_ip}")
except Exception as e:
# 记录更新过程中出现的错误
logging.error(f"更新ngrok.cfg文件时出错: {e}")
# 启动ngrok
# 启动ngrok函数
def start_ngrok(ngrok_path, config_path):
"""
启动ngrok工具。
在尝试启动ngrok之前,此函数会先终止已运行的ngrok进程(如果有的话)。
然后使用指定的配置文件路径启动ngrok,并记录启动结果。
参数:
ngrok_path (str): ngrok可执行文件的路径。
config_path (str): ngrok配置文件的路径。
返回:
无返回值。
"""
try:
# 终止已运行的ngrok进程
kill_ngrok()
# 构建启动ngrok的命令
command = [ngrok_path, '-config', config_path, 'start', 'nat1']
# 使用构建的命令启动ngrok
subprocess.Popen(command, shell=True)
# 记录ngrok启动成功的信息
logging.info(f"ngrok 启动成功")
except Exception as e:
# 记录启动ngrok时发生的错误
logging.error(f"启动ngrok时出错: {e}")
# 停止ngrok进程函数
def kill_ngrok():
"""
尝试终止所有正在运行的ngrok进程。
"""
try:
# 使用 taskkill 命令终止所有 ngrok 进程
subprocess.run(['taskkill', '/F', '/IM', 'ngrok.exe'], check=True)
logging.info("终止所有 ngrok 进程成功")
except subprocess.CalledProcessError as e:
# 如果没有找到 ngrok 进程,记录相关信息
logging.info("没有找到 ngrok 进程")
except Exception as e:
# 如果终止 ngrok 进程时发生其他错误,记录错误信息
logging.error(f"终止 ngrok 进程时出错: {e}")
# if __name__ == '__main__':
......@@ -1225,56 +1349,7 @@ def kill_ngrok():
# ngrok_path = r'D:\GithubData\自动化\ubains-module-test\预定系统\ngrok\ngrok-调试主机\ngrok.exe'
# start_ngrok(ngrok_path, ngrok_config_path)
# # 定义执行终端命令的函数
# def run_http_server():
# try:
# # 构建命令
# command = [
# 'cd', r'D:\GithubData\自动化\ubains-module-test\预定系统',
# '&&', 'python', '-m', 'http.server', '81', '--directory', 'reports'
# ]
#
# # 运行命令
# process = subprocess.Popen(command, shell=True)
# logging.info("HTTP 服务器启动成功")
# except Exception as e:
# logging.error(f"启动HTTP服务器时出错: {e}")
#
# if __name__ == '__main__':
# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# run_http_server()
def user_login(element_locators, username, password, verifycode):
"""
管理员登录函数。
该函数通过模拟用户输入用户名、密码和验证码,并点击登录按钮,以实现管理员登录。
"""
# 获取webdriver实例
wd = GSTORE['wd']
# 打印用户名输入信息
INFO(f"输入用户名:{username}")
# 向用户名输入框发送用户名
safe_send_keys(element_locators['username_locator'], f'{username}', wd)
sleep(5)
# 打印密码输入信息
INFO(f"输入密码:{password}")
# 向密码输入框发送密码
safe_send_keys(element_locators['password_locator'], f"{password}", wd)
sleep(5)
# 打印验证码输入信息
INFO(f"输入验证码:{verifycode}")
# 向验证码输入验证码
safe_send_keys(element_locators['verifycode_locator'], f"{verifycode}", wd)
sleep(5)
#点击登录按钮
INFO("点击登录按钮")
safe_click(element_locators['submitButton_locator'], wd)
sleep(5)
# 读取XLSX文件JSON数据函数
def read_xlsx_data(xlsx_file_path):
"""
读取XLSX文件中的数据,并将其转换为一个包含字典的列表,每个字典代表一行测试用例数据。
......@@ -1328,6 +1403,7 @@ def read_xlsx_data(xlsx_file_path):
# 返回包含所有测试用例数据的列表
return ddt_cases
# 字符串转换枚举类型函数
def get_by_enum(type_str):
"""
将字符串类型的定位器类型转换为 selenium.webdriver.common.by.By 枚举类型。
......@@ -1338,7 +1414,10 @@ def get_by_enum(type_str):
返回:
selenium.webdriver.common.by.By: 对应的 By 枚举类型。
"""
# 将输入的定位器类型字符串转换为大写,以匹配 By 枚举类型的命名
type_str = type_str.upper()
# 根据输入的字符串类型返回对应的 By 枚举类型
if type_str == 'XPATH':
return By.XPATH
elif type_str == 'ID':
......@@ -1356,4 +1435,5 @@ def get_by_enum(type_str):
elif type_str == 'PARTIAL_LINK_TEXT':
return By.PARTIAL_LINK_TEXT
else:
# 如果输入的定位器类型字符串不匹配任何已知的 By 枚举类型,抛出 ValueError 异常
raise ValueError(f"未知的定位器类型: {type_str}")
......@@ -240,3 +240,4 @@
- 补充展厅无纸化中控2.0的同屏巡检代码。补充展厅无纸化2.0的主流程验证代码。
- 处理展厅统一平台因会议室变动导致的异常,会议室搜索改为模糊查询,会控界面的终端拖拽元素调整更新。
- 处理测试用例JSON读取登录功能测试脚本。
- Base函数库中的函数缩略补充函数使用说明,增加相关注释,删除无用函数。
\ No newline at end of file
......@@ -64,7 +64,7 @@ class Exhibition_hall_Control_000x:
SELENIUM_LOG_SCREEN(wd, "75%", "Exhibit_Inspect", "Control_Manage", "light_all_off")
# app_drive.get_screenshot_as_file(r"D:\GithubData\自动化\ubains-module-test\预定系统\reports\imgs\Exhibit_Inspect\Control_Manage\light_all_off.png")
# 截图获取当前软件的灯光控制界面
get_screenshot_with_retry(wd,app_drive,"Control_Manage", "light_all_off")
get_screenshot_with_retry(wd,app_drive, "Exhibit_Inspect","Control_Manage", "light_all_off")
# 调用灯光控制函数
light_control(app_drive)
......@@ -73,7 +73,7 @@ class Exhibition_hall_Control_000x:
SELENIUM_LOG_SCREEN(wd, "75%", "Exhibit_Inspect", "Control_Manage", "light_all_on")
# app_drive.get_screenshot_as_file(r"D:\GithubData\自动化\ubains-module-test\预定系统\reports\imgs\Exhibit_Inspect\Control_Manage\light_all_on.png")
# 截图获取当前软件的灯光控制界面
get_screenshot_with_retry(wd,app_drive,"Control_Manage", "light_all_on")
get_screenshot_with_retry(wd,app_drive, "Exhibit_Inspect","Control_Manage", "light_all_on")
# 这是灯光开启后的截图
INFO("请检查灯光开启后的监控视频状态是否正常")
......
......@@ -50,7 +50,7 @@ class same_screen_share_000x:
# 先截取另一台设备的界面做参照物
INFO(f"这是设备B的界面")
# 调用截屏函数
get_screenshot_with_retry(wd, app_drive2, "No_PaperLess", "BeforeShareDeviceB")
get_screenshot_with_retry(wd, app_drive2, "Exhibit_Inspect", "No_PaperLess", "BeforeShareDeviceB")
# 进入【同屏和视频服务】界面
logging.info(f"进入【同屏和视频服务】")
......@@ -79,10 +79,10 @@ class same_screen_share_000x:
# 截取设备A和设备B的同屏画面截屏
INFO(f"这是设备A同屏共享的界面")
get_screenshot_with_retry(wd, app_drive1, "No_PaperLess", "AfterShareDeviceA")
get_screenshot_with_retry(wd, app_drive1, "Exhibit_Inspect","No_PaperLess", "AfterShareDeviceA")
INFO(f"这是设备B同屏共享的界面")
get_screenshot_with_retry(wd, app_drive2, "No_PaperLess", "AfterShareDeviceB")
get_screenshot_with_retry(wd, app_drive2, "Exhibit_Inspect","No_PaperLess", "AfterShareDeviceB")
# 调用图片对比函数判断相似度
# image1_path = r"/预定系统/reports/imgs/Exhibit_Inspect/No_PaperLess/同屏后-无纸化设备A界面截屏.png"
......@@ -112,10 +112,10 @@ class same_screen_share_000x:
click_with_retry(share_quit_button)
# 退出同屏后截屏操作
INFO(f"这是设备A关闭同屏共享的界面")
get_screenshot_with_retry(wd, app_drive1, "No_PaperLess", "QuitShareDeviceA")
get_screenshot_with_retry(wd, app_drive1, "Exhibit_Inspect","No_PaperLess", "QuitShareDeviceA")
INFO(f"这是设备B关闭同屏共享的界面")
get_screenshot_with_retry(wd, app_drive2, "No_PaperLess", "QuitShareDeviceB")
get_screenshot_with_retry(wd, app_drive2, "Exhibit_Inspect","No_PaperLess", "QuitShareDeviceB")
sleep(5)
......
......@@ -28,7 +28,7 @@ class Exhibition_hall_NoPaperinspection_000x:
# 使用显式等待来等待元素出现
logging.info("等待登录页加载...")
sleep(10)
get_screenshot_with_retry(wd,app_drive, "No_PaperLess", "无纸化首页截屏")
get_screenshot_with_retry(wd,app_drive, "Exhibit_Inspect","No_PaperLess", "无纸化首页截屏")
# 定位【会议签到】按钮元素,并点击按钮
logging.info("尝试定位【会议签到】按钮元素,并点击按钮")
......@@ -39,7 +39,7 @@ class Exhibition_hall_NoPaperinspection_000x:
sleep(2)
STEP(2, "无纸化会议信息")
get_screenshot_with_retry(wd,app_drive, "No_PaperLess", "无纸化会议信息截图")
get_screenshot_with_retry(wd,app_drive, "Exhibit_Inspect","No_PaperLess", "无纸化会议信息截图")
# 点击【会议议题】按钮
meeting_issue_button = find_element_with_retry(app_drive, AppiumBy.XPATH,
......@@ -65,7 +65,7 @@ class Exhibition_hall_NoPaperinspection_000x:
STEP(3, "无纸化议题信息")
INFO("请查看议题显示")
get_screenshot_with_retry(wd,app_drive, "No_PaperLess", "议题文件截图")
get_screenshot_with_retry(wd,app_drive, "Exhibit_Inspect","No_PaperLess", "议题文件截图")
for i in range(1,4):
logging.info(f"定位议题文件{i}的【查看】按钮元素")
......@@ -75,7 +75,7 @@ class Exhibition_hall_NoPaperinspection_000x:
sleep(5)
INFO(f"请查看议题文件{i}显示")
get_screenshot_with_retry(wd,app_drive, "No_PaperLess", f"议题文件{i}截图")
get_screenshot_with_retry(wd,app_drive, "Exhibit_Inspect","No_PaperLess", f"议题文件{i}截图")
sleep(2)
app_drive.back()
sleep(2)
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论