提交 bd845030 authored 作者: 陈泽健's avatar 陈泽健

feat(app): 初始化Appium驱动及中控屏设备控制功能

- 添加Appium驱动初始化函数,支持Android设备控制
- 实现滑动操作封装,用于界面导航
- 添加图片亮度对比、特征匹配和直方图相似度计算功能
- 实现RTSP流帧捕获和保存功能
- 封装中控屏灯光、窗帘、空调、信息发布、音乐及大屏控制函数
- 添加ADB连接初始化和断开连接管理函数
- 提供设备控制相关的元素定位和点击重试机制
- 集成截图和日志记录功能用于测试验证
上级 b305fef3
# 默认忽略的文件
/shelf/
/workspace.xml
# 基于编辑器的 HTTP 客户端请求
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
预定系统标准版
\ No newline at end of file
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.10 (预定系统标准版)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (预定系统标准版)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/预定系统.iml" filepath="$PROJECT_DIR$/.idea/预定系统.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$/.." vcs="Git" />
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/.venv" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
import sys
import os
# 获取 Base 目录的绝对路径,并加入 sys.path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))
from 预定系统.Base.base import *
def suite_teardown():
browser_quit()
\ No newline at end of file
import sys
import os
from time import sleep
# 获取 Base 目录的绝对路径,并加入 sys.path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))
from 新统一平台.Base.base import *
def suite_setup():
STEP(1, "初始化浏览器")
browser_init("新统一平台")
def suite_teardown():
browser_quit()
\ No newline at end of file
import sys
import os
# 获取当前脚本的绝对路径
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构建统一平台的绝对路径
platform_path = os.path.abspath(os.path.join(current_dir, '..','..','..'))
# 添加路径到系统路径中
sys.path.append(platform_path)
# 导入模块
from 统一平台.base.bases import *
# 构建XLSX文件的绝对路径
xlsx_file_path = os.path.join(current_dir, '..', '..', '测试数据', '新统一平台测试用例.xlsx')
class NewUnifiedPlatform:
#执行指令:
# cd ./新统一平台/
# hytest --tag 新统一平台
tags = ['新统一平台']
ddt_cases = read_xlsx_data(xlsx_file_path, case_type="标准版")
def teststeps(self):
"""
执行测试步骤函数,主要用于执行读取的测试用例并进行信息统计模块功能测试操作
"""
# 从全局存储中获取webdriver对象
wd = GSTORE['wd']
name = self.name
# 刷新页面
# wd.refresh()
wd.refresh()
sleep(5)
for step in self.para:
# 赋值页面类型page
page_type = step.get('page')
# 赋值元素定位类型,并将字符串转为Enum类型
locator_type = get_by_enum(step.get('locator_type'))
# 赋值元素值
locator_value = step.get('locator_value')
# 赋值元素类型,例如:click点击、input输入框等
element_type = step.get('element_type')
# 赋值元素值,例如输入框的输入值
element_value = step.get('element_value')
# 赋值预期结果
expected_result = step.get('expected_result')
# 赋值等待时间
# sleep_time = step.get('sleep_time')
# 赋值操作步骤
step_name = step.get('step')
INFO(
f"步骤名称: {step_name}\n"
)
if element_type == "click":
safe_click((locator_type, locator_value), wd)
sleep(5)
SELENIUM_LOG_SCREEN(wd, "75")
elif element_type == "input":
safe_send_keys((locator_type, locator_value), element_value, wd)
sleep(2)
SELENIUM_LOG_SCREEN(wd, "75")
elif element_type == "SwitchWindow":
# 将字符转换为int类型
element_value = int(element_value)
wd.switch_to.window(wd.window_handles[element_value])
sleep(2)
SELENIUM_LOG_SCREEN(wd, "75")
elif element_type == "login":
# 退出系统登录
safe_click((By.XPATH, "//div[@class='quit']"), wd)
sleep(5)
INFO(f"开始登录,账号为:{element_value[0]},密码为:{element_value[1]}")
safe_send_keys((By.XPATH, "//input[@placeholder='手机号/用户名/邮箱']"), "admin@XTY", wd)
safe_send_keys((By.XPATH, "//input[@placeholder='密码']"), "Ubains@4321", wd)
safe_send_keys((By.XPATH, "//input[@placeholder='图形验证']"), "csba", wd)
sleep(2)
INFO("对协议进行勾选")
safe_click((By.XPATH, "//div[@aria-label='提示']//span[contains(text(),'确定')]"), wd)
INFO("已经勾选协议了")
sleep(2)
safe_click((By.XPATH, "//div[@id='pane-1']//div//span[contains(text(),'登录')]"), wd)
INFO("已经点击登录了")
sleep(2)
elif element_type == "getTips":
notify_text = get_notify_text(wd, (locator_type, locator_value))
INFO(f"获取到的提示信息为:{notify_text}")
sleep(2)
CHECK_POINT(f"获取到的提示信息为:{notify_text}", expected_result in notify_text)
SELENIUM_LOG_SCREEN(wd, "75")
elif element_type == "getText":
text = elment_get_text((locator_type, locator_value), wd)
INFO(f"获取到的文本信息为:{text}")
CHECK_POINT(f"获取到的文本信息为:{text}", expected_result in text)
SELENIUM_LOG_SCREEN(wd, "75")
sleep(2)
\ No newline at end of file
import sys
import os
from time import sleep
# 获取 Base 目录的绝对路径,并加入 sys.path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))
from 新统一平台.Base.base import *
def suite_setup():
STEP(1, "初始化浏览器")
browser_init("新统一平台")
def suite_teardown():
browser_quit()
\ No newline at end of file
import sys
import os
# 获取当前脚本的绝对路径
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构建统一平台的绝对路径
platform_path = os.path.abspath(os.path.join(current_dir, '..','..','..'))
# 添加路径到系统路径中
sys.path.append(platform_path)
# 导入模块
from 统一平台.base.bases import *
# 构建XLSX文件的绝对路径
xlsx_file_path = os.path.join(current_dir, '..', '..', '测试数据', '新统一平台测试用例.xlsx')
class NewUnifiedPlatform:
#执行指令:
# cd ./统一平台/
# hytest --tag 新统一平台
tags = ['新统一平台']
ddt_cases = read_xlsx_data(xlsx_file_path, case_type="标准版")
def teststeps(self):
"""
执行测试步骤函数,主要用于执行读取的测试用例并进行信息统计模块功能测试操作
"""
# 从全局存储中获取webdriver对象
wd = GSTORE['wd']
name = self.name
# 刷新页面
# wd.refresh()
wd.refresh()
sleep(5)
for step in self.para:
# 赋值页面类型page
page_type = step.get('page')
# 赋值元素定位类型,并将字符串转为Enum类型
locator_type = get_by_enum(step.get('locator_type'))
# 赋值元素值
locator_value = step.get('locator_value')
# 赋值元素类型,例如:click点击、input输入框等
element_type = step.get('element_type')
# 赋值元素值,例如输入框的输入值
element_value = step.get('element_value')
# 赋值预期结果
expected_result = step.get('expected_result')
# 赋值等待时间
# sleep_time = step.get('sleep_time')
# 赋值操作步骤
step_name = step.get('step')
INFO(
f"步骤名称: {step_name}\n"
)
if element_type == "click":
safe_click((locator_type, locator_value), wd)
sleep(5)
SELENIUM_LOG_SCREEN(wd, "75")
elif element_type == "input":
safe_send_keys((locator_type, locator_value), element_value, wd)
sleep(2)
SELENIUM_LOG_SCREEN(wd, "75")
elif element_type == "SwitchWindow":
# 将字符转换为int类型
element_value = int(element_value)
wd.switch_to.window(wd.window_handles[element_value])
sleep(2)
SELENIUM_LOG_SCREEN(wd, "75")
elif element_type == "login":
# 退出系统登录
safe_click((By.XPATH, "//div[@class='quit']"), wd)
sleep(5)
INFO(f"开始登录,账号为:{element_value[0]},密码为:{element_value[1]}")
safe_send_keys((By.XPATH, "//input[@placeholder='手机号/用户名/邮箱']"), "admin@xty", wd)
safe_send_keys((By.XPATH, "//input[@placeholder='密码']"), "Ubains@4321", wd)
safe_send_keys((By.XPATH, "//input[@placeholder='图形验证']"), "csba", wd)
sleep(2)
INFO("对协议进行勾选")
safe_click((By.XPATH, "//div[@aria-label='提示']//span[contains(text(),'确定')]"), wd)
INFO("已经勾选协议了")
sleep(2)
safe_click((By.XPATH, "//div[@id='pane-1']//div//span[contains(text(),'登录')]"), wd)
INFO("已经点击登录了")
sleep(2)
elif element_type == "getTips":
notify_text = get_notify_text(wd, (locator_type, locator_value))
INFO(f"获取到的提示信息为:{notify_text}")
sleep(2)
CHECK_POINT(f"获取到的提示信息为:{notify_text}", expected_result in notify_text)
SELENIUM_LOG_SCREEN(wd, "75")
elif element_type == "getText":
text = elment_get_text((locator_type, locator_value), wd)
INFO(f"获取到的文本信息为:{text}")
CHECK_POINT(f"获取到的文本信息为:{text}", expected_result in text)
SELENIUM_LOG_SCREEN(wd, "75")
sleep(2)
\ No newline at end of file
{
"标准版新统一平台": "https://192.168.5.44"
}
\ No newline at end of file
server_addr: ngrok.ubsyun.com:9083
trust_host_root_certs: false
tunnels:
nat1:
proto:
tcp: 127.0.0.1:80
remote_port: 31135
ngrok -config=ngrok.cfg start nat1
\ No newline at end of file
import schedule
import queue
from Base.base import *
import time
import logging
if getattr(sys, 'frozen', False):
base_dir = os.path.dirname(sys.executable)
else:
base_dir = os.path.dirname(os.path.abspath(__file__))
cases_dir = os.path.join(base_dir, 'cases')
if not os.path.isdir(cases_dir):
print("cases_dir 路径为:", cases_dir)
raise FileNotFoundError("请将 cases 目录放在程序同级目录下!")
"""
调试主机-执行指令:
1.打开一个终端输入:
- cd .\预定系统\
- python -m http.server 81 --directory reports
2.打开新终端输入:
- cd .\预定系统\ngrok\ngrok-调试主机\
- .\start.bat
3.再打开一个终端输入:
- cd .\预定系统\
- python .\定时执行功能测试.py
自动化运行虚拟机-执行指令:
1.打开一个终端输入:
- cd .\预定系统\
- python -m http.server 81 --directory reports
2.打开新终端输入:
- cd .\预定系统\ngrok\ngrok-自动化运行虚拟机
- .\start.bat
3.再打开一个终端输入:
- cd .\预定系统\
- python .\定时执行功能测试.py
"""
import os
import sys
def get_resource_path(relative_path):
"""
获取打包后的资源文件真实路径(适用于 PyInstaller)
:param relative_path: 相对于打包时指定的路径
:return: 真实路径字符串
"""
if getattr(sys, 'frozen', False): # 是否被打包成 .exe
base_path = sys._MEIPASS
else:
base_path = os.path.dirname(os.path.abspath(__file__))
return os.path.join(base_path, relative_path)
# 创建一个任务队列,用于存储待处理的任务
task_queue = queue.Queue()
def run_task(task, *args, **kwargs):
# 将任务及其参数放入任务队列
task_queue.put((task, args, kwargs))
logging.debug(f"任务已加入队列: {task.__name__} with args: {args} and kwargs: {kwargs}")
def worker():
# 工作线程的主循环
while True:
# 从任务队列中获取任务及其参数
task, args, kwargs = task_queue.get()
try:
# 记录任务开始执行的时间
logging.debug(f"开始执行任务: {task.__name__} with args: {args} and kwargs: {kwargs}")
# 执行任务并获取结果
result = task(*args, **kwargs)
# 如果任务有返回结果,记录日志
if result:
logging.info(result)
except Exception as e:
# 捕获任务执行过程中发生的任何异常并记录错误日志
logging.error(f"执行任务时发生错误: {e}", exc_info=True)
finally:
# 无论任务是否成功执行,都标记任务已完成
task_queue.task_done()
# 记录任务完成的时间
logging.debug(f"任务完成: {task.__name__}")
def start_workers(num_workers):
# 启动指定数量的工作线程
for _ in range(num_workers):
# 创建一个新的工作线程,目标函数为 worker,设置为守护线程
threading.Thread(target=worker, daemon=True).start()
# 启动3个工作线程
start_workers(3)
# 定时执行预定系统测试任务
schedule.every().day.at("10:00").do(run_task, run_automation_test, report_title="预定系统测试报告", report_url_prefix="http://nat.ubainsyun.com:31133", test_case="JSON测试", ding_type="标准版巡检")
schedule.every().day.at("07:00").do(run_task, run_automation_test, report_title="兰州中石化项目测试报告", report_url_prefix="http://nat.ubainsyun.com:31133", test_case="兰州中石化项目", ding_type="标准版巡检")
# 定时执行展厅巡检任务
# schedule.every().day.at("07:45").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31136", test_case="展厅巡检", ding_type="展厅巡检")
schedule.every().monday.at("07:42").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31133", test_case="展厅巡检", ding_type="展厅巡检")
schedule.every().thursday.at("07:42").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31133", test_case="展厅巡检", ding_type="展厅巡检")
schedule.every().wednesday.at("07:42").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31133", test_case="展厅巡检", ding_type="展厅巡检")
schedule.every().tuesday.at("07:42").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31133", test_case="展厅巡检", ding_type="展厅巡检")
schedule.every().friday.at("07:42").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31133", test_case="展厅巡检", ding_type="展厅巡检")
# schedule.every().saturday.at("07:42").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31136", test_case="展厅巡检", ding_type="展厅巡检")
# schedule.every().sunday.at("07:42").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31136", test_case="展厅巡检", ding_type="展厅巡检")
try:
# 无限循环,持续检查并执行计划任务
while True:
schedule.run_pending() # 检查并执行所有待处理的任务
time.sleep(1) # 每秒检查一次
except KeyboardInterrupt:
# 捕获用户中断信号 (Ctrl+C)
logging.info("Scheduler interrupted by user.")
except Exception as e:
# 捕获其他未预期的异常
logging.error(f"Unexpected error: {e}", exc_info=True)
\ No newline at end of file
import csv
def create_AndroidMessageUp_csv():
# 定义基础数据
base_data = {
"topic": "rebootResponseTopic",
"appToken": "AND-2IK-0021",
"companyNumber": "CN-2IK-UBAINS",
"cnum": "",
"conferenceId": "100", # 初始值为字符串
"macAddress": "20:59:20:00:28:01",
}
# 输出文件路径
output_file = "MQTT模块/MQTT安卓上报_2000条.csv"
# 生成2000条数据
with open(output_file, mode="w", newline="", encoding="utf-8") as file:
writer = csv.writer(file)
# 写入表头
writer.writerow([
"topic", "clientId", "appToken", "companyNumber", "cnum",
"conferenceId", "macAddress", "authCode", "clientId", "deviceId"
])
for i in range(1, 2001):
# 格式化编号
index_str = f"{i:04d}"
client_id = f"48134e6047a19a{i:04d}"
device_id = f"aa44e258a4e1e{i:04d}"
app_token = f"AND-2IK-{index_str}"
auth_code = app_token
# conferenceId 从100开始递增
current_conference_id = str(int(base_data["conferenceId"]) + i)
# 写入一行数据
writer.writerow([
base_data["topic"], client_id, app_token, base_data["companyNumber"],
base_data["cnum"], current_conference_id, base_data["macAddress"],
auth_code, client_id, device_id
])
print(f"成功生成 {output_file} 文件,包含2000条数据。")
import csv
def create_Androidbroadcast_csv():
# 基础配置
base_topic = "/uams/android/broadcast"
output_file = "MQTT模块/MQTT心跳上报_2000条.csv"
# 生成2000条数据
with open(output_file, mode="w", newline="", encoding="utf-8") as file:
writer = csv.writer(file)
# 写入表头(根据原始CSV结构)
writer.writerow([
"topic", "clientId", "appToken", "companyNumber", "cnum",
"conferenceId", "macAddress", "authCode", "clientId", "deviceId"
])
for i in range(1, 2001):
# 格式化编号为4位数,如0001, 0002...
index_str = f"{i:04d}"
# clientId 和 deviceId 的格式
client_id = f"48134e6047a19a{index_str}"
device_id = f"aa44e258a4e1e{index_str}"
# 按照指定格式写入空字段和动态字段
writer.writerow([
base_topic, "", "", "", "",
"", "", "", client_id, device_id
])
print(f"成功生成 {output_file} 文件,包含2000条数据。")
# if __name__ == "__main__":
# create_AndroidMessageUp_csv()
# create_Androidbroadcast_csv()
\ No newline at end of file
...@@ -2,15 +2,8 @@ import sys ...@@ -2,15 +2,8 @@ import sys
import os import os
# 获取 Base 目录的绝对路径,并加入 sys.path # 获取 Base 目录的绝对路径,并加入 sys.path
current_dir = os.path.dirname(os.path.abspath(__file__)) sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))
base_dir = os.path.abspath(os.path.join(current_dir, '..', 'Base')) from 预定系统.Base.base import *
if base_dir not in sys.path:
sys.path.insert(0, base_dir)
try:
from base import *
except ModuleNotFoundError as e:
print(f"ModuleNotFoundError: {e}")
def suite_teardown(): def suite_teardown():
browser_quit() browser_quit()
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论