提交 079adfda authored 作者: 陈泽健's avatar 陈泽健

refactor(base): 优化基础模块并移除冗余日志代码

- 在 browser_init 后增加 sleep(15) 以确保环境稳定
- 注释掉 get_remote_log_with_paramiko 的主函数调用块
- 注释掉 LogCollector 的主函数调用及使用示例
- 移除 create_test_xlsx.py 文件及相关用例生成逻辑
- 删除兰州用例.json配置文件
- 移除兰州登录功能开发相关文档
上级 76a8ce14
# -*- coding: utf-8 -*-
"""
根据模板用例 Excel,在同一个工作簿中新建 Sheet,并从 JSON 配置文件中读取用例数据写入。
后续只需要维护 JSON 文件即可复用。
"""
import os
import json
from copy import copy
from openpyxl import load_workbook
from openpyxl.styles import Alignment
# ===== 1. 配置路径(改为相对当前脚本的路径) =====
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
TEMPLATE_PATH = os.path.join(BASE_DIR, "用例文件", "兰州中石化项目测试用例20251203.xlsx")
NEW_SHEET_NAME = "兰州登录MQTT用例" # 新建的 Sheet 名
CASES_FILE = os.path.join(BASE_DIR, "config", "兰州用例.json") # 用例配置文件(JSON)
# 与你表头对应的顺序(根据截图)
headers_order = [
"序号", "功能模块", "功能类别", "用例编号", "功能描述", "用例等级",
"功能编号", "用例名称", "预置条件", "操作步骤", "JSON", "预期结果",
"测试结果", "测试结论", "日志截屏", "备注",
]
def load_cases():
"""从 JSON 配置文件加载用例列表。"""
if not os.path.exists(CASES_FILE):
raise FileNotFoundError(f"找不到用例配置文件: {CASES_FILE}")
with open(CASES_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, list):
raise ValueError("用例配置文件的根节点必须是列表(List)")
return data
def main():
if not os.path.exists(TEMPLATE_PATH):
print("找不到模板文件:", TEMPLATE_PATH)
return
try:
cases = load_cases()
except Exception as e:
print("加载用例配置失败:", e)
return
wb = load_workbook(TEMPLATE_PATH)
# 以第一个sheet作为模板
template_sheet = wb.worksheets[0]
# 如果新sheet已存在就先删除
if NEW_SHEET_NAME in wb.sheetnames:
ws_new = wb[NEW_SHEET_NAME]
wb.remove(ws_new)
ws_new = wb.create_sheet(NEW_SHEET_NAME)
# ===== 复制表头(仅值 + 简单样式) =====
# 假设模板表头在第3行(根据截图),如有偏差可以改成2或其它
header_row_index = 3
for col_idx, cell in enumerate(template_sheet[header_row_index], start=1):
new_cell = ws_new.cell(row=1, column=col_idx, value=cell.value)
if cell.has_style:
new_cell.font = copy(cell.font)
new_cell.fill = copy(cell.fill)
new_cell.border = copy(cell.border)
new_cell.alignment = copy(cell.alignment)
new_cell.number_format = cell.number_format
ws_new.freeze_panes = "B2"
# ===== 写入用例数据(预置条件/操作步骤自动换行,JSON列统一留空) =====
row = 2
for case in cases:
for col_idx, header in enumerate(headers_order, start=1):
val = case.get(header, "")
if header in ("预置条件", "操作步骤") and isinstance(val, str):
# 把分号+空格变成换行,Excel 中会显示为多行
val = val.replace("; ", "\n")
if header == "JSON":
# JSON 列统一留空
val = ""
ws_new.cell(row=row, column=col_idx, value=val)
row += 1
# ===== 对“预置条件”和“操作步骤”开启自动换行 =====
col_idx_pre = headers_order.index("预置条件") + 1
col_idx_steps = headers_order.index("操作步骤") + 1
for r in range(1, row):
cell_pre = ws_new.cell(row=r, column=col_idx_pre)
cell_steps = ws_new.cell(row=r, column=col_idx_steps)
cell_pre.alignment = Alignment(wrap_text=True, vertical="top")
cell_steps.alignment = Alignment(wrap_text=True, vertical="top")
# 调整列宽
for col in ws_new.columns:
max_len = 0
col_letter = col[0].column_letter
for c in col:
v = c.value
if v is None:
continue
l = len(str(v))
if l > max_len:
max_len = l
ws_new.column_dimensions[col_letter].width = min(max_len + 2, 60)
wb.save(TEMPLATE_PATH)
print("已在原文件中创建新Sheet:", NEW_SHEET_NAME)
if __name__ == "__main__":
main()
\ No newline at end of file
#!/usr/bin/env bash
# 容器镜像打包上传至公司网盘脚本
# 功能:
# 1. 将当前目录压缩成 tar.gz 格式文件
# 2. 上传到公司 NAS 网盘(SMB 协议)
# 3. 显示上传进度
set -euo pipefail
# ================================
# 配置区(密码已加密存储)
# ================================
NAS_SERVER="192.168.9.9"
NAS_SHARE="home"
NAS_USER="陈泽键"
NAS_PASS_ENCRYPTED="Mm01ZUwuTUA="
NAS_MOUNT_POINT="/mnt/nas_upload"
# 上传目录(网盘上的目标目录)
NAS_UPLOAD_DIR="容器镜像包"
log() {
local level="$1"; shift
printf '[%(%F %T)T] [%s] %s\n' -1 "${level}" "$*"
}
# 解密密码
decrypt_password() {
echo "${NAS_PASS_ENCRYPTED}" | base64 -d
}
# 检查依赖
check_dependencies() {
local missing_deps=()
if ! command -v cifs-utils >/dev/null 2>&1 && ! command -v mount.cifs >/dev/null 2>&1; then
# 检查是否可以挂载 cifs
if ! grep -q cifs /proc/filesystems 2>/dev/null; then
missing_deps+=("cifs-utils")
fi
fi
if ! command -v pv >/dev/null 2>&1; then
log "WARN" "未安装 pv 工具,将使用 cp 命令(无进度显示)"
fi
if [[ ${#missing_deps[@]} -gt 0 ]]; then
log "ERROR" "缺少依赖: ${missing_deps[*]}"
log "INFO" "请执行: yum install -y ${missing_deps[*]} 或 apt install -y ${missing_deps[*]}"
return 1
fi
return 0
}
# 挂载 NAS
mount_nas() {
local nas_pass
nas_pass="$(decrypt_password)"
# 创建挂载点
if [[ ! -d "${NAS_MOUNT_POINT}" ]]; then
log "INFO" "创建挂载点: ${NAS_MOUNT_POINT}"
sudo mkdir -p "${NAS_MOUNT_POINT}"
fi
# 检查是否已挂载
if mountpoint -q "${NAS_MOUNT_POINT}" 2>/dev/null; then
log "INFO" "NAS 已挂载,跳过挂载步骤"
return 0
fi
log "INFO" "正在挂载 NAS: //${NAS_SERVER}/${NAS_SHARE}"
# 挂载 SMB/CIFS 共享
if ! sudo mount -t cifs "//${NAS_SERVER}/${NAS_SHARE}" "${NAS_MOUNT_POINT}" \
-o "username=${NAS_USER},password=${nas_pass},iocharset=utf8,vers=2.0"; then
log "ERROR" "NAS 挂载失败"
log "INFO" "请检查: 1) 网络连接 2) 账号密码 3) 共享路径"
return 1
fi
log "INFO" "✅ NAS 挂载成功"
return 0
}
# 卸载 NAS
unmount_nas() {
if mountpoint -q "${NAS_MOUNT_POINT}" 2>/dev/null; then
log "INFO" "正在卸载 NAS..."
sudo umount "${NAS_MOUNT_POINT}" || true
fi
}
# 打包当前目录
pack_current_dir() {
local pack_name="$1"
local source_dir="$2"
log "INFO" "开始打包目录: ${source_dir}"
log "INFO" "目标文件: ${pack_name}"
# 获取目录大小用于进度估算
local dir_size
dir_size=$(du -sb "${source_dir}" 2>/dev/null | awk '{print $1}')
if command -v pv >/dev/null 2>&1; then
# 使用 pv 显示打包进度
tar -cf - -C "$(dirname "${source_dir}")" "$(basename "${source_dir}")" | \
pv -s "${dir_size}" -p -t -e -r | \
gzip > "${pack_name}"
else
# 无 pv 时直接打包
tar -czvf "${pack_name}" -C "$(dirname "${source_dir}")" "$(basename "${source_dir}")"
fi
if [[ -f "${pack_name}" ]]; then
local file_size
file_size=$(du -h "${pack_name}" | awk '{print $1}')
log "INFO" "✅ 打包完成,文件大小: ${file_size}"
return 0
else
log "ERROR" "打包失败"
return 1
fi
}
# 上传文件到 NAS(带进度显示)
upload_to_nas() {
local source_file="$1"
local target_dir="${NAS_MOUNT_POINT}/${NAS_UPLOAD_DIR}"
# 确保目标目录存在
if [[ ! -d "${target_dir}" ]]; then
log "INFO" "创建目标目录: ${target_dir}"
sudo mkdir -p "${target_dir}"
fi
local file_name
file_name=$(basename "${source_file}")
local target_file="${target_dir}/${file_name}"
log "INFO" "开始上传文件到 NAS..."
log "INFO" "源文件: ${source_file}"
log "INFO" "目标: //${NAS_SERVER}/${NAS_SHARE}/${NAS_UPLOAD_DIR}/${file_name}"
# 获取文件大小
local file_size
file_size=$(stat -c%s "${source_file}" 2>/dev/null || stat -f%z "${source_file}" 2>/dev/null)
if command -v pv >/dev/null 2>&1; then
# 使用 pv 显示上传进度
pv -p -t -e -r "${source_file}" | sudo tee "${target_file}" > /dev/null
else
# 无 pv 时使用 cp 并显示简单进度
log "INFO" "正在上传(无进度显示)..."
sudo cp "${source_file}" "${target_file}"
fi
# 验证上传结果
if [[ -f "${target_file}" ]]; then
local uploaded_size
uploaded_size=$(sudo du -h "${target_file}" | awk '{print $1}')
log "INFO" "✅ 上传完成!"
log "INFO" " 文件路径: \\\\${NAS_SERVER}\\${NAS_SHARE}\\${NAS_UPLOAD_DIR}\\${file_name}"
log "INFO" " 文件大小: ${uploaded_size}"
return 0
else
log "ERROR" "上传失败,目标文件不存在"
return 1
fi
}
# 显示使用说明
usage() {
cat <<'EOF'
用法:
upload_to_nas.sh [选项] [目录路径]
选项:
-h, --help 显示帮助信息
-n, --name 指定打包文件名(不含扩展名)
-d, --dir 指定上传到网盘的子目录
参数:
目录路径 要打包上传的目录,默认为当前目录
示例:
# 打包并上传当前目录
./upload_to_nas.sh
# 打包并上传指定目录
./upload_to_nas.sh /data/temp/container-images
# 指定打包文件名
./upload_to_nas.sh -n java_container_v6 /data/temp/java
网盘信息:
服务器: \\192.168.9.9\home
账号: 陈泽键
上传目录: 容器镜像包/
EOF
}
# 清理函数
cleanup() {
unmount_nas
}
# 主函数
main() {
local source_dir="."
local pack_name=""
local custom_upload_dir=""
# 解析参数
while [[ $# -gt 0 ]]; do
case "$1" in
-h|--help)
usage
exit 0
;;
-n|--name)
pack_name="$2"
shift 2
;;
-d|--dir)
custom_upload_dir="$2"
shift 2
;;
-*)
log "ERROR" "未知选项: $1"
usage
exit 1
;;
*)
source_dir="$1"
shift
;;
esac
done
# 设置自定义上传目录
if [[ -n "${custom_upload_dir}" ]]; then
NAS_UPLOAD_DIR="${custom_upload_dir}"
fi
# 转换为绝对路径
source_dir=$(cd "${source_dir}" && pwd)
if [[ ! -d "${source_dir}" ]]; then
log "ERROR" "目录不存在: ${source_dir}"
exit 1
fi
# 生成打包文件名
if [[ -z "${pack_name}" ]]; then
local dir_name
dir_name=$(basename "${source_dir}")
local timestamp
timestamp=$(date +%Y%m%d_%H%M%S)
pack_name="${dir_name}_${timestamp}"
fi
local pack_file="/tmp/${pack_name}.tar.gz"
log "INFO" "=================================================================="
log "INFO" "容器镜像打包上传工具"
log "INFO" "=================================================================="
log "INFO" "源目录: ${source_dir}"
log "INFO" "打包文件: ${pack_file}"
log "INFO" "目标网盘: \\\\${NAS_SERVER}\\${NAS_SHARE}\\${NAS_UPLOAD_DIR}"
log "INFO" "=================================================================="
# 设置清理钩子
trap cleanup EXIT
# 检查依赖
if ! check_dependencies; then
exit 1
fi
# 步骤1: 打包目录
log "INFO" "[步骤 1/3] 打包目录..."
if ! pack_current_dir "${pack_file}" "${source_dir}"; then
exit 1
fi
# 步骤2: 挂载 NAS
log "INFO" "[步骤 2/3] 挂载 NAS..."
if ! mount_nas; then
exit 1
fi
# 步骤3: 上传文件
log "INFO" "[步骤 3/3] 上传文件..."
if ! upload_to_nas "${pack_file}"; then
exit 1
fi
# 清理临时文件
log "INFO" "清理临时文件..."
rm -f "${pack_file}"
log "INFO" "=================================================================="
log "INFO" "🎉 全部完成!"
log "INFO" "=================================================================="
return 0
}
main "$@"
......@@ -942,6 +942,7 @@ def dingding_send_message(latest_report, title, mobile, ding_type):
# 调用测试结果获取函数
browser_init("兰州中石化项目测试环境")
sleep(15)
wd = GSTORE['wd']
# print(latest_report)
test_result = get_test_result(latest_report, wd)
......@@ -2118,21 +2119,21 @@ def get_remote_log_with_paramiko(host, username, private_key_path, passphrase, l
if 'client' in locals():
client.close()
if __name__ == "__main__":
host = "192.168.5.218" # 替换为你的服务器 IP 或域名
username = "root" # 替换为你的用户名
# private_key_path = "C:\\Users\\29194\\.ssh\\id_rsa" # 替换为你的私钥文件路径
private_key_path = "C:/Users/29194/.ssh/id_rsa"
passphrase = "Ubains@123" # 确保这是你的私钥文件的正确 passphrase
log_path = "/var/www/java/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log" # 替换为你要读取的日志文件路径
filter_word = "" # 替换为你想要过滤的关键字
log_content = get_remote_log_with_paramiko(host, username, private_key_path, passphrase, log_path)
if log_content:
print(log_content)
else:
print("Failed to retrieve log content.")
# if __name__ == "__main__":
# host = "192.168.5.218" # 替换为你的服务器 IP 或域名
# username = "root" # 替换为你的用户名
# # private_key_path = "C:\\Users\\29194\\.ssh\\id_rsa" # 替换为你的私钥文件路径
# private_key_path = "C:/Users/29194/.ssh/id_rsa"
# passphrase = "Ubains@123" # 确保这是你的私钥文件的正确 passphrase
# log_path = "/var/www/java/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log" # 替换为你要读取的日志文件路径
# filter_word = "" # 替换为你想要过滤的关键字
#
# log_content = get_remote_log_with_paramiko(host, username, private_key_path, passphrase, log_path)
#
# if log_content:
# print(log_content)
# else:
# print("Failed to retrieve log content.")
......@@ -2228,24 +2229,24 @@ class LogCollector:
print("Log collection thread terminated.")
# 使用示例
if __name__ == "__main__":
host = "192.168.5.218"
username = "root"
private_key_path = "C:\\Users\\29194\\.ssh\\id_rsa" # 替换为你的私钥文件路径
passphrase = "Ubains@123" # 替换为你的 passphrase
log_path = "/var/www/java/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log"
output_file = "collected_logs.txt"
collector = LogCollector(host, username, private_key_path, passphrase, log_path)
# 开始收集日志
collector.start_collection()
# 假设一段时间后停止收集日志
try:
while True:
time.sleep(1) # 保持主程序运行
except KeyboardInterrupt:
print("Stopping log collection due to user interrupt.")
collector.stop_collection(output_file)
print("日志收集完成!")
\ No newline at end of file
# if __name__ == "__main__":
# host = "192.168.5.218"
# username = "root"
# private_key_path = "C:\\Users\\29194\\.ssh\\id_rsa" # 替换为你的私钥文件路径
# passphrase = "Ubains@123" # 替换为你的 passphrase
# log_path = "/var/www/java/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log"
# output_file = "collected_logs.txt"
#
# collector = LogCollector(host, username, private_key_path, passphrase, log_path)
#
# # 开始收集日志
# collector.start_collection()
#
# # 假设一段时间后停止收集日志
# try:
# while True:
# time.sleep(1) # 保持主程序运行
# except KeyboardInterrupt:
# print("Stopping log collection due to user interrupt.")
# collector.stop_collection(output_file)
# print("日志收集完成!")
\ No newline at end of file
......@@ -49,19 +49,19 @@ class MeetingManageQuickTest:
if element_type == "click":
safe_click((locator_type, locator_value), wd)
sleep(2)
SELENIUM_LOG_SCREEN(wd, "75")
# SELENIUM_LOG_SCREEN(wd, "75")
elif element_type == "input":
safe_send_keys((locator_type, locator_value), element_value, wd)
sleep(2)
SELENIUM_LOG_SCREEN(wd, "75")
# SELENIUM_LOG_SCREEN(wd, "75")
elif element_type == "SwitchWindow":
# 将字符转换为int类型
element_value = int(element_value)
wd.switch_to.window(wd.window_handles[element_value])
sleep(2)
SELENIUM_LOG_SCREEN(wd, "75")
# SELENIUM_LOG_SCREEN(wd, "75")
elif element_type == "login":
# 退出系统登录
......
......@@ -86,7 +86,7 @@ start_workers(3)
# 定时执行预定系统测试任务
schedule.every().day.at("10:00").do(run_task, run_automation_test, report_title="预定系统测试报告", report_url_prefix="http://nat.ubainsyun.com:31136", test_case="预定系统测试", ding_type="标准版巡检")
# 定时每一小时执行一次预定快速测试任务
schedule.every(2).hours.do(run_task, run_automation_test, report_title="预定系统快速测试测试报告", report_url_prefix="http://nat.ubainsyun.com:31136", test_case="预定系统快速测试", ding_type="标准版巡检")
schedule.every(0.5).hours.do(run_task, run_automation_test, report_title="预定系统快速测试测试报告", report_url_prefix="http://nat.ubainsyun.com:31136", test_case="预定系统快速测试", ding_type="标准版巡检")
# 定时执行展厅巡检任务
# schedule.every().day.at("07:45").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31136", test_case="展厅巡检", ding_type="展厅巡检")
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论