提交 1a1226b7 authored 作者: 陈泽健's avatar 陈泽健

refactor(测试用例):优化测试用例中导入 Base模块的方式

- 修改了获取 Base 目录路径的方法,确保正确添加到 sys.path
- 使用 try-except 结构处理可能的 ModuleNotFoundError 异常
-优化代码结构,提高可读性和健壮性
上级 4f0658f8
......@@ -21,7 +21,7 @@ logging.basicConfig(
# 全局配置
message_queue = Queue()
THREAD_COUNT = 20 # 减少线程数量便于观察进度
THREAD_COUNT = 110 # 减少线程数量便于观察进度
DOWNLOAD_DIR = "downloads"
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
......@@ -139,8 +139,10 @@ def worker_thread():
actual_auth = payload["headers"]["Authorization"]
udid = payload["udid"][0]
# 构建请求参数
url = "http://192.168.5.229:8999/androidPanel/pack/download?id=30"
# 构建apk的请求参数
# url = "http://192.168.5.229:8999/androidPanel/pack/download?id=30"
# 构建pak的请求参数
url = "http://192.168.5.229:8999/androidPanel/pack/download?id=29"
headers = {
"Content-Type": "application/x-www-form-urlencoded",
......
import psutil
import time
import logging
import platform
import paramiko # 用于远程服务器监控
from datetime import datetime
from collections import deque
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
filename='network_monitor.log'
)
# 监控配置
class MonitorConfig:
def __init__(self):
# 本机监控配置
self.local_check_interval = 5 # 秒
self.network_interface = '以太网4' # 修改为"以太网4"
# 远程服务器监控配置
self.remote_host = '192.168.5.229' # 服务器IP
self.remote_user = 'root'
self.remote_password = 'Ubains@123'
self.remote_check_interval = 30 # 秒
# 阈值配置 (百分比或MB)
self.cpu_threshold = 90 # CPU使用率阈值
self.mem_threshold = 90 # 内存使用率阈值
self.net_threshold = 10 # 网络吞吐量阈值(MB/s)
# 异常检测窗口
self.window_size = 5 # 检测最近5次记录
self.anomaly_count = 3 # 窗口内超过阈值3次则报警
# 本机网络监控
class LocalNetworkMonitor:
def __init__(self, config):
self.config = config
self.history = deque(maxlen=config.window_size)
self.actual_interface = self.find_actual_interface()
def find_actual_interface(self):
"""查找实际的网卡名称(Windows中文环境下可能需要匹配)"""
ifaces = psutil.net_io_counters(pernic=True).keys()
# 尝试精确匹配
if self.config.network_interface in ifaces:
return self.config.network_interface
# 尝试模糊匹配(针对Windows中文环境)
for iface in ifaces:
if '以太网' in iface and '4' in iface:
return iface
# 找不到匹配项,使用第一个可用网卡
available_ifaces = list(ifaces)
if available_ifaces:
logging.warning(f"网卡 {self.config.network_interface} 不存在,将使用 {available_ifaces[0]}")
return available_ifaces[0]
raise ValueError("没有可用的网络接口")
def get_network_stats(self):
"""获取当前网络吞吐量(MB/s)"""
stats = psutil.net_io_counters(pernic=True).get(self.actual_interface)
if not stats:
available_ifaces = psutil.net_io_counters(pernic=True).keys()
logging.warning(f"网卡 {self.actual_interface} 不可用,可用网卡: {', '.join(available_ifaces)}")
return 0, 0
sent_mb = stats.bytes_sent / (1024 * 1024)
recv_mb = stats.bytes_recv / (1024 * 1024)
return sent_mb, recv_mb
def check_anomaly(self, current_sent, current_recv):
"""检测网络流量异常"""
# 记录当前数据
self.history.append((current_sent, current_recv))
# 检查是否达到检测窗口
if len(self.history) < self.config.window_size:
return False
# 计算窗口内超过阈值的次数
exceed_count = 0
for sent, recv in self.history:
if sent > self.config.net_threshold or recv > self.config.net_threshold:
exceed_count += 1
return exceed_count >= self.config.anomaly_count
# 远程服务器监控
class RemoteServerMonitor:
def __init__(self, config):
self.config = config
self.ssh = paramiko.SSHClient()
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.cpu_history = deque(maxlen=config.window_size)
self.mem_history = deque(maxlen=config.window_size)
self.net_history = deque(maxlen=config.window_size)
def connect(self):
try:
self.ssh.connect(
self.config.remote_host,
username=self.config.remote_user,
password=self.config.remote_password
)
return True
except Exception as e:
logging.error(f"连接服务器失败: {str(e)}")
return False
def get_server_stats(self):
"""获取服务器资源使用情况"""
try:
# CPU使用率
stdin, stdout, stderr = self.ssh.exec_command("top -bn1 | grep 'Cpu(s)'")
cpu_line = stdout.read().decode().strip()
cpu_usage = float(cpu_line.split('%')[0].split(':')[1].strip())
# 内存使用率
stdin, stdout, stderr = self.ssh.exec_command("free -m | grep Mem")
mem_line = stdout.read().decode().split()
total_mem = float(mem_line[1])
used_mem = float(mem_line[2])
mem_usage = (used_mem / total_mem) * 100
# 网络吞吐量 (MB/s)
stdin, stdout, stderr = self.ssh.exec_command("cat /proc/net/dev | grep eth0")
net_line = stdout.read().decode().split()
bytes_recv = float(net_line[1])
bytes_sent = float(net_line[9])
recv_mb = bytes_recv / (1024 * 1024)
sent_mb = bytes_sent / (1024 * 1024)
return cpu_usage, mem_usage, sent_mb, recv_mb
except Exception as e:
logging.error(f"获取服务器状态失败: {str(e)}")
return None, None, None, None
def check_anomaly(self, cpu, mem, net_sent, net_recv):
"""检测服务器资源异常"""
# 记录数据
self.cpu_history.append(cpu)
self.mem_history.append(mem)
self.net_history.append((net_sent, net_recv))
# 检查是否达到检测窗口
if len(self.cpu_history) < self.config.window_size:
return False, False, False
# 计算各指标超过阈值的次数
cpu_exceed = sum(1 for x in self.cpu_history if x > self.config.cpu_threshold)
mem_exceed = sum(1 for x in self.mem_history if x > self.config.mem_threshold)
net_exceed = sum(1 for sent, recv in self.net_history
if sent > self.config.net_threshold or recv > self.config.net_threshold)
return (
cpu_exceed >= self.config.anomaly_count,
mem_exceed >= self.config.anomaly_count,
net_exceed >= self.config.anomaly_count
)
# 主监控程序
def main():
config = MonitorConfig()
local_monitor = LocalNetworkMonitor(config)
remote_monitor = RemoteServerMonitor(config)
# 初始化上次记录时间
last_local_check = time.time()
last_remote_check = time.time()
# 初始网络基准
last_sent, last_recv = local_monitor.get_network_stats()
logging.info(f"开始监控,使用网卡: {local_monitor.actual_interface}")
logging.info(f"服务器监控间隔: {config.remote_check_interval}秒")
logging.info(f"本机监控间隔: {config.local_check_interval}秒")
while True:
current_time = time.time()
# 本机网络监控
if current_time - last_local_check >= config.local_check_interval:
current_sent, current_recv = local_monitor.get_network_stats()
# 计算间隔期内的吞吐量(MB/s)
time_elapsed = current_time - last_local_check
sent_rate = (current_sent - last_sent) / time_elapsed
recv_rate = (current_recv - last_recv) / time_elapsed
logging.info(
f"本机网络 - 发送: {sent_rate:.2f} MB/s, "
f"接收: {recv_rate:.2f} MB/s"
)
# 检测异常
if local_monitor.check_anomaly(sent_rate, recv_rate):
logging.warning(
f"网络流量异常! 最近{config.window_size}次检测中,"
f"{config.anomaly_count}次超过阈值 {config.net_threshold} MB/s"
)
last_sent, last_recv = current_sent, current_recv
last_local_check = current_time
# 远程服务器监控
if current_time - last_remote_check >= config.remote_check_interval:
if not remote_monitor.ssh.get_transport() or not remote_monitor.ssh.get_transport().is_active():
remote_monitor.connect()
if remote_monitor.ssh.get_transport() and remote_monitor.ssh.get_transport().is_active():
cpu, mem, sent_mb, recv_mb = remote_monitor.get_server_stats()
if cpu is not None:
logging.info(
f"服务器状态 - CPU: {cpu:.1f}%, "
f"内存: {mem:.1f}%, "
f"网络: 发送 {sent_mb:.2f} MB/s, 接收 {recv_mb:.2f} MB/s"
)
# 检测异常
cpu_anomaly, mem_anomaly, net_anomaly = remote_monitor.check_anomaly(
cpu, mem, sent_mb, recv_mb
)
if cpu_anomaly:
logging.warning(
f"服务器CPU使用率异常! 最近{config.window_size}次检测中,"
f"{config.anomaly_count}次超过阈值 {config.cpu_threshold}%"
)
if mem_anomaly:
logging.warning(
f"服务器内存使用率异常! 最近{config.window_size}次检测中,"
f"{config.anomaly_count}次超过阈值 {config.mem_threshold}%"
)
if net_anomaly:
logging.warning(
f"服务器网络流量异常! 最近{config.window_size}次检测中,"
f"{config.anomaly_count}次超过阈值 {config.net_threshold} MB/s"
)
last_remote_check = current_time
time.sleep(1)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
logging.info("监控程序已停止")
except Exception as e:
logging.error(f"监控程序发生错误: {str(e)}")
import sys
import os
from hytest import *
# 获取当前脚本的绝对路径
# 获取 Base 目录的绝对路径,并加入 sys.path
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构建预定系统的绝对路径
预定系统_path = os.path.abspath(os.path.join(current_dir, '..','..'))
# 添加路径
sys.path.append(预定系统_path)
# 导入模块
from 预定系统.Base.base import *
base_dir = os.path.abspath(os.path.join(current_dir, '..', '..', '..', 'Base'))
if base_dir not in sys.path:
print(f"Adding Base directory to sys.path: {base_dir}")
sys.path.insert(0, base_dir)
try:
from base import *
except ModuleNotFoundError as e:
print(f"ModuleNotFoundError: {e}")
def suite_setup():
STEP(1, "初始化浏览器")
......
import sys
import os
from hytest import *
# 获取当前脚本的绝对路径
# 获取 Base 目录的绝对路径,并加入 sys.path
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构建预定系统的绝对路径 (修改路径构建方式)
预定系统_path = os.path.abspath(os.path.join(current_dir, '..', '..', '..'))
# 添加路径
sys.path.append(预定系统_path)
# 导入模块
from 预定系统.Base.base import *
base_dir = os.path.abspath(os.path.join(current_dir, '..', '..', '..', 'Base'))
if base_dir not in sys.path:
print(f"Adding Base directory to sys.path: {base_dir}")
sys.path.insert(0, base_dir)
try:
from base import *
except ModuleNotFoundError as e:
print(f"ModuleNotFoundError: {e}")
# 获取当前脚本所在的目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构建XLSX文件的绝对路径
xlsx_file_path = os.path.join(current_dir, '..', '..', '..', '测试数据', '会议预定测试用例.xlsx')
# 构建XLSX文件的绝对路径 (修正路径)
xlsx_file_path = os.path.join(预定系统_path, '测试数据', '会议预定测试用例.xlsx')
# xlsx_file_path = os.path.join(预定系统_path, '测试数据', '会议预定测试用例.xlsx')
class LicensePlateFilling:
tags = ['工商银行项目测试']
......
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论