提交 c15c2ca2 authored 作者: 陈泽健's avatar 陈泽健

feat(deploy): 添加接口安全测试需求文档和远程部署脚本

- 添加接口安全测试需求文档,包含代码路径、系统账号信息、接口清单等
- 更新远程部署配置,修正NAS目录路径和部署脚本执行逻辑
- 添加完整的X86欧拉服务器部署脚本,包括中间件安装和服务部署
- 增强功能测试报告生成工具,支持用户自定义测试单ID上传到ERP
- 优化部署日志监控和进程管理功能
上级 a675bb6b
......@@ -355,12 +355,13 @@ def generate_report_main(
logger = setup_logger()
for report_path in docx_reports:
# 询问用户是否上传
if ask_upload_confirmation_cli(report_path):
print(f"\n正在上传报告到ERP...")
success = upload_report_to_erp(report_path, logger)
# 询问用户是否上传,并获取测试单ID
confirmed, testing_id = ask_upload_confirmation_cli(report_path)
if confirmed:
print(f"\n正在上传报告到ERP(测试单ID: {testing_id})...")
success = upload_report_to_erp(report_path, logger, developtesting_id=testing_id)
if success:
print(f" [OK] 报告已成功上传到ERP")
print(f" [OK] 报告已成功上传到ERP(测试单ID: {testing_id})")
else:
print(f" [X] 报告上传失败,请查看日志")
else:
......
......@@ -13,7 +13,7 @@ import time
import logging
import os
import tempfile
from typing import Optional
from typing import Optional, Tuple
from io import BytesIO
import requests
......@@ -199,24 +199,29 @@ def upload_image_to_erp(img_bytes: bytes, idx: int, logger: logging.Logger) -> O
return None
def create_report_in_erp(content: str, logger: logging.Logger) -> Optional[int]:
def create_report_in_erp(content: str, logger: logging.Logger, developtesting_id: int = None) -> Optional[int]:
"""
在ERP中创建测试报告(含重试机制)
Args:
content: HTML格式的报告内容
logger: 日志记录器
developtesting_id: 测试单ID(可选,默认使用config中的配置)
Returns:
报告ID,失败返回None
"""
# 如果没有指定测试单ID,使用config中的默认值
if developtesting_id is None:
developtesting_id = ERP_DEVELOPTESTING_ID
url = f'{ERP_BASE_URL}{ERP_CREATE_REPORT_URL}'
for attempt in range(ERP_MAX_RETRIES):
try:
# 准备请求参数
data = {
'developtesting_id': ERP_DEVELOPTESTING_ID,
'developtesting_id': developtesting_id,
'type_id': ERP_TYPE_ID,
'content': content,
'descript': '',
......@@ -233,7 +238,7 @@ def create_report_in_erp(content: str, logger: logging.Logger) -> Optional[int]:
logger.info(f"请求URL: {url}")
logger.info(f"请求Headers: X-Api-Key={ERP_API_KEY[:10]}..., Content-Type=application/json")
logger.info(f"请求参数:")
logger.info(f" - developtesting_id: {ERP_DEVELOPTESTING_ID}")
logger.info(f" - developtesting_id: {developtesting_id}")
logger.info(f" - type_id: {ERP_TYPE_ID}")
logger.info(f" - content: {len(content)} 字符 (HTML)")
logger.info(f" - descript: (空)")
......@@ -374,17 +379,22 @@ def word_to_html_with_images(file_path: str, logger: logging.Logger) -> Optional
return None
def upload_report_to_erp(file_path: str, logger: logging.Logger = None) -> bool:
def upload_report_to_erp(file_path: str, logger: logging.Logger = None, developtesting_id: int = None) -> bool:
"""
上传报告到ERP的完整流程
Args:
file_path: Word报告文件路径
logger: 日志记录器(可选,默认创建新的logger)
developtesting_id: 测试单ID(可选,默认使用config中的配置)
Returns:
是否上传成功
"""
# 如果没有指定测试单ID,使用config中的默认值
if developtesting_id is None:
developtesting_id = ERP_DEVELOPTESTING_ID
# 如果没有提供logger,创建新的logger
if logger is None:
logger = setup_logger()
......@@ -394,7 +404,7 @@ def upload_report_to_erp(file_path: str, logger: logging.Logger = None) -> bool:
logger.info(f"报告文件: {file_path}")
logger.info(f"ERP配置:")
logger.info(f" - Base URL: {ERP_BASE_URL}")
logger.info(f" - 测试单ID: {ERP_DEVELOPTESTING_ID}")
logger.info(f" - 测试单ID: {developtesting_id}")
logger.info(f" - 报告类型ID: {ERP_TYPE_ID}")
logger.info("=" * 50)
......@@ -408,7 +418,7 @@ def upload_report_to_erp(file_path: str, logger: logging.Logger = None) -> bool:
# 2. 创建ERP报告
logger.info(f"[步骤2/2] 创建ERP测试报告")
report_id = create_report_in_erp(html_content, logger)
report_id = create_report_in_erp(html_content, logger, developtesting_id=developtesting_id)
if report_id is None:
logger.error("✗ ERP报告创建失败")
return False
......@@ -420,15 +430,17 @@ def upload_report_to_erp(file_path: str, logger: logging.Logger = None) -> bool:
return True
def ask_upload_confirmation_cli(report_path: str) -> bool:
def ask_upload_confirmation_cli(report_path: str) -> Tuple[bool, Optional[int]]:
"""
控制台交互:询问用户是否上传报告到ERP
控制台交互:询问用户是否上传报告到ERP,并输入测试单ID
Args:
report_path: 报告文件路径
Returns:
用户选择(True=上传,False=不上传)
元组 (用户选择, 测试单ID)
- (True, int) 用户确认上传并提供了测试单ID
- (False, None) 用户选择不上传
"""
while True:
print(f"\n{'='*50}")
......@@ -441,9 +453,22 @@ def ask_upload_confirmation_cli(report_path: str) -> bool:
choice = 'N'
if choice == 'Y':
return True
# 用户确认上传,继续输入测试单ID
while True:
id_input = input(f"请输入测试单ID (默认={ERP_DEVELOPTESTING_ID}): ").strip()
if not id_input:
# 按回车使用默认值
return True, ERP_DEVELOPTESTING_ID
try:
testing_id = int(id_input)
if testing_id > 0:
return True, testing_id
else:
print("测试单ID必须为正整数,请重新输入")
except ValueError:
print("输入无效,请输入数字")
elif choice == 'N':
return False
return False, None
else:
print("输入错误,请输入 Y 或 N")
......
......@@ -496,30 +496,35 @@ class ReportGeneratorGUI:
def _ask_upload_to_erp(self, report_path: str):
"""
询问用户是否上传报告到ERP
询问用户是否上传报告到ERP,并获取测试单ID
Args:
report_path: 报告文件路径
"""
# 使用自定义对话框替代messagebox,确保按钮正常显示
result = self._show_upload_dialog(report_path)
confirmed, testing_id = self._show_upload_dialog(report_path)
if result:
# 用户选择上传
self._upload_to_erp(report_path)
if confirmed:
# 用户选择上传,透传测试单ID
self._upload_to_erp(report_path, developtesting_id=testing_id)
else:
self._log("[!] 跳过上传到ERP", "warning")
def _show_upload_dialog(self, report_path: str) -> bool:
def _show_upload_dialog(self, report_path: str) -> tuple:
"""
显示自定义上传确认对话框
显示自定义上传确认对话框(含测试单ID输入框)
Args:
report_path: 报告文件路径
Returns:
用户选择(True=上传,False=不上传)
元组 (用户选择, 测试单ID)
- (True, int) 用户确认上传并提供了测试单ID
- (False, None) 用户选择不上传
"""
# 导入ERP配置
from src.config import ERP_DEVELOPTESTING_ID
# 创建对话框窗口
dialog = tk.Toplevel(self.root)
dialog.title("上传报告到ERP")
......@@ -530,10 +535,10 @@ class ReportGeneratorGUI:
dialog.grab_set()
# 先设置一个临时大小
dialog.geometry("550x220+300+200")
dialog.geometry("550x280+300+200")
# 消息内容
message_frame = tk.Frame(dialog, padx=30, pady=20)
message_frame = tk.Frame(dialog, padx=30, pady=15)
message_frame.pack(side=tk.TOP, fill=tk.BOTH, expand=True)
tk.Label(
......@@ -541,7 +546,7 @@ class ReportGeneratorGUI:
text="报告已生成完成!\n\n是否将报告上传到ERP测试单?",
font=("微软雅黑", 11),
justify=tk.LEFT
).pack(anchor=tk.W, pady=(0, 10))
).pack(anchor=tk.W, pady=(0, 5))
tk.Label(
message_frame,
......@@ -549,20 +554,64 @@ class ReportGeneratorGUI:
font=("微软雅黑", 9),
fg="gray",
justify=tk.LEFT
).pack(anchor=tk.W)
).pack(anchor=tk.W, pady=(0, 10))
# 测试单ID输入区域
id_frame = tk.Frame(message_frame)
id_frame.pack(anchor=tk.W, pady=(5, 0))
tk.Label(
id_frame,
text="测试单ID:",
font=("微软雅黑", 10),
).pack(side=tk.LEFT)
id_entry = tk.Entry(id_frame, width=15, font=("微软雅黑", 10))
id_entry.insert(0, str(ERP_DEVELOPTESTING_ID))
id_entry.pack(side=tk.LEFT, padx=(5, 0))
id_entry.select_range(0, tk.END)
id_entry.focus_set()
tk.Label(
id_frame,
text="(可修改,默认使用配置值)",
font=("微软雅黑", 8),
fg="gray"
).pack(side=tk.LEFT, padx=(10, 0))
# 按钮区域(放在dialog的底部,固定高度)
button_frame = tk.Frame(dialog, bg="#f0f0f0", height=60)
button_frame.pack(side=tk.BOTTOM, fill=tk.X)
result = [False]
result = [False, None]
def on_yes():
# 获取用户输入的测试单ID
id_text = id_entry.get().strip()
if not id_text:
# 空值使用默认值
result[0] = True
result[1] = ERP_DEVELOPTESTING_ID
else:
try:
testing_id = int(id_text)
if testing_id > 0:
result[0] = True
result[1] = testing_id
else:
# 无效输入,提示用户
from tkinter import messagebox
messagebox.showwarning("输入错误", "测试单ID必须为正整数", parent=dialog)
return
except ValueError:
from tkinter import messagebox
messagebox.showwarning("输入错误", "请输入有效的数字", parent=dialog)
return
dialog.destroy()
def on_no():
result[0] = False
result[1] = None
dialog.destroy()
tk.Button(
......@@ -696,17 +745,18 @@ class ReportGeneratorGUI:
# 等待对话框关闭
dialog.wait_window()
def _upload_to_erp(self, report_path: str):
def _upload_to_erp(self, report_path: str, developtesting_id: int = None):
"""
上传报告到ERP
Args:
report_path: 报告文件路径
developtesting_id: 测试单ID(可选,默认使用config中的配置)
"""
try:
from src.erp_uploader import upload_report_to_erp, setup_logger
self._log("\n开始上传报告到ERP...", "info")
self._log(f"\n开始上传报告到ERP(测试单ID: {developtesting_id})...", "info")
# 设置日志,使用GUI输出
logger = setup_logger()
......@@ -715,7 +765,7 @@ class ReportGeneratorGUI:
# 在后台线程中上传
thread = threading.Thread(
target=self._upload_to_erp_thread,
args=(report_path, logger),
args=(report_path, logger, developtesting_id),
daemon=True
)
thread.start()
......@@ -727,16 +777,17 @@ class ReportGeneratorGUI:
self._log(f"[X] ERP上传功能异常: {str(e)}", "error")
self._show_error_dialog("错误", f"ERP上传功能异常:\n{str(e)}")
def _upload_to_erp_thread(self, report_path: str, logger):
def _upload_to_erp_thread(self, report_path: str, logger, developtesting_id: int = None):
"""
在后台线程中上传报告到ERP
Args:
report_path: 报告文件路径
logger: 日志记录器
developtesting_id: 测试单ID(可选,默认使用config中的配置)
"""
try:
success = upload_report_to_erp(report_path, logger)
success = upload_report_to_erp(report_path, logger, developtesting_id=developtesting_id)
if success:
self._log("[OK] 报告已成功上传到ERP", "success")
......
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
X86欧拉部署 - 正确的中间件安装脚本
自带log函数,不source replace_ip_interactive.sh(它有exit会终止shell)
"""
import paramiko
import time
import sys
import re
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
HOST = '192.168.5.52'
USER = 'root'
PASS = 'Ubains@123'
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(HOST, username=USER, password=PASS, timeout=30, look_for_keys=False, allow_agent=False)
def run(cmd, timeout=300):
stdin, stdout, stderr = ssh.exec_command(cmd, timeout=timeout)
out = stdout.read().decode('utf-8', errors='replace')
err = stderr.read().decode('utf-8', errors='replace')
rc = stdout.channel.recv_exit_status()
return rc, out, err
# ============ 创建部署脚本(自带log函数) ============
print('[准备] 创建中间件安装启动脚本(自带log函数)')
launcher_script = r"""#!/bin/bash
# ============ 自定义log函数 ============
LOG_FILE="/var/log/new_auto_script.log"
mkdir -p "$(dirname "$LOG_FILE")" 2>/dev/null
function log() {
local timestamp=$(date "+%Y-%m-%d %H:%M:%S")
if [ $# -lt 2 ]; then
echo "Usage: log <level> <message>"
return 1
fi
local level=$1
local message=$2
echo "[$timestamp] [$level] $message"
echo "[$timestamp] [$level] $message" >> "$LOG_FILE"
}
# ============ 环境变量 ============
export TERM=dumb
export server_ip=192.168.5.52
export middleware_type="mysql redis emqx nacos nginx"
export LOG_FILE="/var/log/new_auto_script.log"
cd /data/offline_auto_unifiedPlatform
# ============ source中间件安装脚本(不source replace_ip_interactive.sh) ============
source auto_middleware_install.sh
# ============ 执行中间件安装 ============
log "INFO" "========== 开始安装中间件 =========="
install_middleware
INSTALL_RC=$?
log "INFO" "========== 中间件安装完成, 返回码: $INSTALL_RC =========="
# ============ 配置防火墙 ============
log "INFO" "========== 配置防火墙 =========="
source auto_firewall_settings.sh
# ============ 配置定时任务 ============
log "INFO" "========== 配置定时任务 =========="
bash auto_crontab_settings.sh
echo "DEPLOY_ALL_DONE"
"""
sftp = ssh.open_sftp()
with sftp.file('/tmp/run_deploy_all.sh', 'w') as f:
f.write(launcher_script)
sftp.close()
run('chmod +x /tmp/run_deploy_all.sh')
# 清理日志
run('> /data/deploy_all.log')
# 后台启动
rc, out, _ = run('nohup /tmp/run_deploy_all.sh </dev/null >/data/deploy_all.log 2>&1 & echo $!')
pid = out.strip()
print(f' 部署进程已启动, PID: {pid}')
# ============ 监控进度 ============
print()
print('[监控] 等待中间件安装完成...')
start = time.time()
max_wait = 3600 # 60分钟
waited = 0
last_report_min = -1
while waited < max_wait:
time.sleep(15)
waited += 15
current_min = int(waited / 60)
# 每3分钟报告进度
if current_min > last_report_min and current_min % 3 == 0:
last_report_min = current_min
# 检查完成标志
rc, done_out, _ = run('grep "DEPLOY_ALL_DONE" /data/deploy_all.log 2>/dev/null && echo FOUND || echo NOT_FOUND')
if 'FOUND' in done_out:
print(f' [{current_min}分钟] 部署全部完成!')
break
# 检查进程
rc, ps_out, _ = run(f'ps -p {pid} > /dev/null 2>&1 && echo RUNNING || echo STOPPED')
# 查看容器
rc, docker_out, _ = run('docker ps --format "{{.Names}}" 2>/dev/null | wc -l')
container_count = docker_out.strip()
# 获取最后几行日志
rc, tail_out, _ = run('tail -8 /data/deploy_all.log 2>/dev/null')
elapsed = int(time.time() - start)
print(f' [{current_min}分钟/{elapsed}秒] 进程:{ps_out.strip()} 容器数:{container_count}')
for line in tail_out.strip().split('\n')[-5:]:
if line.strip():
clean = re.sub(r'\x1b\[[0-9;]*m', '', line.strip())
if clean:
print(f' > {clean}')
if 'STOPPED' in ps_out:
time.sleep(5)
rc, done2, _ = run('grep "DEPLOY_ALL_DONE" /data/deploy_all.log 2>/dev/null && echo FOUND || echo NOT_FOUND')
if 'FOUND' in done2:
print(' 部署全部完成!')
else:
print(' 进程已结束但未检测到完成标志,检查日志...')
rc, last_log, _ = run('tail -15 /data/deploy_all.log 2>/dev/null')
for line in last_log.strip().split('\n')[-10:]:
clean = re.sub(r'\x1b\[[0-9;]*m', '', line)
if clean.strip():
print(f' > {clean.strip()}')
break
if waited >= max_wait:
print(' [ERROR] 部署超时!')
# ============ 最终状态 ============
print()
print('=' * 50)
print('[最终状态]')
rc, out, _ = run('docker ps --format "table {{.Names}}\t{{.Status}}\t{{.Ports}}" 2>/dev/null')
print(f'Docker容器:\n{out}')
total_min = int((time.time() - start) / 60)
print(f'部署用时: {total_min}分钟')
ssh.close()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
X86欧拉部署 - 中间件安装(含log函数)
先source replace_ip_interactive.sh获取log函数,再source auto_middleware_install.sh
"""
import paramiko
import time
import sys
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
HOST = '192.168.5.52'
USER = 'root'
PASS = 'Ubains@123'
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(HOST, username=USER, password=PASS, timeout=30, look_for_keys=False, allow_agent=False)
def run(cmd, timeout=300):
stdin, stdout, stderr = ssh.exec_command(cmd, timeout=timeout)
out = stdout.read().decode('utf-8', errors='replace')
err = stderr.read().decode('utf-8', errors='replace')
rc = stdout.channel.recv_exit_status()
return rc, out, err
# ============ 创建带log函数的部署启动脚本 ============
print('[准备] 创建中间件安装启动脚本(含log函数)')
launcher_script = r"""#!/bin/bash
export TERM=dumb
export server_ip=192.168.5.52
export middleware_type="mysql redis emqx nacos nginx"
export LOG_FILE="/var/log/new_auto_script.log"
mkdir -p "$(dirname "$LOG_FILE")" 2>/dev/null
cd /data/offline_auto_unifiedPlatform
# 先source replace_ip_interactive.sh获取log函数
source replace_ip_interactive.sh
# 再source中间件安装脚本
source auto_middleware_install.sh
# 执行中间件安装
install_middleware
echo "MIDDLEWARE_INSTALL_DONE"
"""
# 上传启动脚本
sftp = ssh.open_sftp()
with sftp.file('/tmp/run_middleware.sh', 'w') as f:
f.write(launcher_script)
sftp.close()
run('chmod +x /tmp/run_middleware.sh')
# 清理旧日志
run('> /data/middleware_deploy.log')
# 后台启动
rc, out, _ = run('nohup /tmp/run_middleware.sh </dev/null >/data/middleware_deploy.log 2>&1 & echo $!')
pid = out.strip()
print(f' 部署进程已启动, PID: {pid}')
print(f' 日志文件: /data/middleware_deploy.log')
# ============ 监控进度 ============
print()
print('[监控] 等待中间件安装完成...')
start = time.time()
max_wait = 3600 # 60分钟超时
waited = 0
last_check = 0
while waited < max_wait:
time.sleep(20)
waited += 20
current_min = int(waited / 60)
# 每2分钟打印进度
if current_min > last_check and current_min % 2 == 0:
last_check = current_min
# 检查完成标志
rc, done_out, _ = run('grep "MIDDLEWARE_INSTALL_DONE" /data/middleware_deploy.log 2>/dev/null && echo FOUND || echo NOT_FOUND')
if 'FOUND' in done_out:
print(f' [{current_min}分钟] 中间件安装完成!')
break
# 检查进程
rc, ps_out, _ = run(f'ps -p {pid} > /dev/null 2>&1 && echo RUNNING || echo STOPPED')
# 获取日志尾部
rc, tail_out, _ = run('tail -5 /data/middleware_deploy.log 2>/dev/null | grep -v "^$" | tail -3')
elapsed = int(time.time() - start)
print(f' [{current_min}分钟/{elapsed}秒] 进程:{ps_out.strip()}')
# 解码tail输出中的ANSI颜色码,提取关键信息
for line in tail_out.strip().split('\n'):
if line.strip():
# 去掉ANSI颜色码
import re
clean = re.sub(r'\x1b\[[0-9;]*m', '', line.strip())
print(f' > {clean}')
if 'STOPPED' in ps_out:
time.sleep(5)
rc, done2, _ = run('grep "MIDDLEWARE_INSTALL_DONE" /data/middleware_deploy.log 2>/dev/null && echo FOUND || echo NOT_FOUND')
if 'FOUND' in done2:
print(' 中间件安装完成!')
else:
print(' 进程已结束,但未检测到完成标志,检查日志...')
rc, last_log, _ = run('tail -20 /data/middleware_deploy.log 2>/dev/null')
import re
for line in last_log.strip().split('\n')[-10:]:
clean = re.sub(r'\x1b\[[0-9;]*m', '', line)
print(f' > {clean}')
break
if waited >= max_wait:
print(' [ERROR] 中间件安装超时!')
# ============ 最终状态检查 ============
print()
print('[检查] 部署最终状态')
rc, out, _ = run('docker ps --format "table {{.Names}}\t{{.Status}}" 2>/dev/null')
print(f'Docker容器:\n{out}')
total_time = int((time.time() - start) / 60)
print(f'\n中间件安装用时: {total_time}分钟')
ssh.close()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
X86欧拉部署 - 加载Java/Python/Voice Docker镜像 + 安装宿主机JDK + 启动ujava2容器
"""
import paramiko
import time
import sys
import re
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
HOST = '192.168.5.52'
USER = 'root'
PASS = 'Ubains@123'
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(HOST, username=USER, password=PASS, timeout=30, look_for_keys=False, allow_agent=False)
def run(cmd, timeout=600):
stdin, stdout, stderr = ssh.exec_command(cmd, timeout=timeout)
out = stdout.read().decode('utf-8', errors='replace')
err = stderr.read().decode('utf-8', errors='replace')
rc = stdout.channel.recv_exit_status()
return rc, out, err
start = time.time()
# ============ 步骤1: 加载Java Docker镜像 ============
print('[步骤1] 加载Java Docker镜像 (java1.8.0_492.tar.gz)...')
rc, out, err = run('docker load -i /data/temp/java1.8.0_492.tar.gz 2>&1')
# 提取镜像名
loaded_images = [l for l in out.split('\n') if 'Loaded' in l]
print(f' 加载结果: {loaded_images}')
# 查看加载的镜像名
rc, out, _ = run('docker images --format "{{.Repository}}:{{.Tag}}" | grep -i java')
java_image = out.strip()
print(f' Java镜像: {java_image}')
# ============ 步骤2: 创建并启动ujava2容器 ============
print()
print('[步骤2] 创建ujava2容器...')
# 先检查容器是否已存在
rc, out, _ = run('docker ps -a --format "{{.Names}}" | grep ujava2')
if out.strip():
print(f' ujava2容器已存在,启动中...')
run('docker start ujava2')
else:
# 创建ujava2容器 - 将服务目录挂载进去
rc, out, err = run(f"""docker run -d \
--name ujava2 \
--hostname ujava2 \
--mac-address="02:42:ac:11:00:06" \
-p 8002:8002 \
-p 18000:18000 \
-v /data/services/api/java-meeting:/var/www/java/api/java-meeting \
-v /data/services/api/auth:/var/www/java/api/auth \
-v /data/services/api/dubbo:/var/www/java/api/dubbo \
-v /data/middleware:/data/middleware \
-v /data/services/scripts:/data/services/scripts \
-v /etc/localtime:/etc/localtime:ro \
--restart=always \
{java_image}""")
if rc == 0:
print(f' ujava2容器创建成功: {out.strip()[:12]}')
else:
print(f' 创建失败: {err.strip()}')
# 等待容器启动
time.sleep(5)
rc, out, _ = run('docker ps --format "{{.Names}}\t{{.Status}}" | grep ujava')
print(f' 容器状态: {out.strip()}')
# ============ 步骤3: 安装宿主机JDK ============
print()
print('[步骤3] 安装宿主机JDK (jdk-8u492)...')
rc, out, _ = run('java -version 2>&1')
if 'version' in out.lower() or 'version' in out:
print(f' JDK已安装: {out.strip()}')
else:
# 解压JDK
rc, out, err = run('tar -xzf /data/temp/jdk-8u492-linux-x64.tar.gz -C /usr/local/')
if rc == 0:
print(' JDK解压完成')
# 配置环境变量
jdk_path = '/usr/local/jdk8u492-b09'
profile_line = f"""
# Java Environment
export JAVA_HOME={jdk_path}
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
"""
rc, out, _ = run(f'grep "JAVA_HOME" /etc/profile')
if not out.strip():
run(f'echo \'{profile_line}\' >> /etc/profile')
print(' 环境变量已写入 /etc/profile')
# 验证
rc, out, _ = run(f'source /etc/profile && {jdk_path}/bin/java -version 2>&1')
print(f' JDK验证: {out.strip()}')
else:
print(f' JDK安装失败: {err.strip()}')
# ============ 步骤4: 加载Python Docker镜像 ============
print()
print('[步骤4] 加载Python Docker镜像 (python_v16.tar.gz)...')
rc, out, err = run('docker load -i /data/temp/python_v16.tar.gz 2>&1')
loaded_python = [l for l in out.split('\n') if 'Loaded' in l]
print(f' 加载结果: {loaded_python}')
rc, out, _ = run('docker images --format "{{.Repository}}:{{.Tag}}" | tail -5')
print(f' 当前镜像列表:\n{out}')
# ============ 步骤5: 加载Voice Docker镜像 ============
print()
print('[步骤5] 加载Voice Docker镜像 (uvoice3.tar.gz)...')
rc, out, err = run('docker load -i /data/temp/uvoice3.tar.gz 2>&1')
loaded_voice = [l for l in out.split('\n') if 'Loaded' in l]
print(f' 加载结果: {loaded_voice}')
# ============ 步骤6: 加载其他镜像 ============
print()
print('[步骤6] 加载其他镜像 (paperless.tar, ufastdfs, uos-cardtable)...')
for img in ['paperless.tar', 'ufastdfs-v2.tar.gz', 'uos-cardtable.tar', 'ungrok2.tar.gz']:
rc, out, err = run(f'docker load -i /data/temp/{img} 2>&1')
loaded = [l for l in out.split('\n') if 'Loaded' in l]
print(f' {img}: {loaded if loaded else out.strip()[:80]}')
# ============ 最终状态 ============
print()
print('=' * 50)
print('[最终Docker镜像列表]')
rc, out, _ = run('docker images --format "table {{.Repository}}\t{{.Tag}}\t{{.Size}}"')
print(out)
print('[Docker容器列表]')
rc, out, _ = run('docker ps -a --format "table {{.Names}}\t{{.Status}}\t{{.Image}}"')
print(out)
total_min = int((time.time() - start) / 60)
print(f'\n服务安装用时: {total_min}分钟')
ssh.close()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
X86欧拉部署 - 步骤1-3: 拷贝数据 + 空间检查 + 启动中间件安装
"""
import paramiko
import time
import sys
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
HOST = '192.168.5.52'
USER = 'root'
PASS = 'Ubains@123'
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(HOST, username=USER, password=PASS, timeout=30, look_for_keys=False, allow_agent=False)
def run(cmd, timeout=300):
stdin, stdout, stderr = ssh.exec_command(cmd, timeout=timeout)
out = stdout.read().decode('utf-8', errors='replace')
err = stderr.read().decode('utf-8', errors='replace')
rc = stdout.channel.recv_exit_status()
return rc, out, err
DEPLOY_START = time.time()
# ============ 步骤1: 拷贝data目录内容到/data/ ============
print('[步骤1] 拷贝部署数据到 /data/')
# 注意:使用cp -rn(不覆盖已存在的文件)
rc, out, err = run('cp -rn /data/offline_auto_unifiedPlatform/data/* /data/ 2>&1')
print(f' 拷贝完成 rc={rc}')
# 验证
rc, out, _ = run('ls /data/temp/*.tar* 2>/dev/null | wc -l')
print(f' /data/temp/ 镜像文件数: {out.strip()}')
rc, out, _ = run('ls -d /data/middleware/*/ 2>/dev/null | wc -l')
print(f' /data/middleware/ 子目录数: {out.strip()}')
rc, out, _ = run('ls -d /data/services/*/ 2>/dev/null | wc -l')
print(f' /data/services/ 子目录数: {out.strip()}')
# ============ 步骤2: 空间检查 ============
print()
print('[步骤2] 磁盘空间检查')
rc, out, err = run('cd /data/offline_auto_unifiedPlatform && system_type=new_type_mount ./auto_check_space.sh', timeout=60)
# 只打印最后500字符
result = out.strip()
if len(result) > 500:
print(f' ...{result[-500:]}')
else:
print(result)
if err.strip():
err_text = err.strip()
if len(err_text) > 200:
print(f' stderr: ...{err_text[-200:]}')
else:
print(f' stderr: {err_text}')
print(f' 返回码: {rc}')
# ============ 步骤3: 创建并启动中间件安装脚本 ============
print()
print('[步骤3] 安装中间件服务(Docker+MySQL+Redis+EMQX+Nacos+Nginx等)')
print(' 这将需要较长时间...')
# 写部署启动脚本到服务器
launcher_script = r"""#!/bin/bash
export TERM=dumb
export server_ip=192.168.5.52
export middleware_type="mysql redis emqx nacos nginx"
cd /data/offline_auto_unifiedPlatform
source auto_middleware_install.sh
install_middleware
echo "MIDDLEWARE_INSTALL_DONE"
"""
# 使用SFTP写文件避免shell转义问题
sftp = ssh.open_sftp()
with sftp.file('/tmp/run_middleware.sh', 'w') as f:
f.write(launcher_script)
sftp.close()
run('chmod +x /tmp/run_middleware.sh')
# 后台运行
rc, out, _ = run('nohup /tmp/run_middleware.sh </dev/null >/data/middleware_deploy.log 2>&1 & echo $!')
pid = out.strip()
print(f' 部署进程已启动, PID: {pid}')
print(f' 日志文件: /data/middleware_deploy.log')
ssh.close()
print()
print(f' 步骤1-3完成,中间件正在后台安装中...')
print(f' 用时: {int(time.time() - DEPLOY_START)}秒')
......@@ -100,7 +100,7 @@ CONFIGS = {
'deploy_dir': '/data/offline_auto_unifiedPlatform',
'deploy_pkg': 'offline_auto_unifiedPlatform.tar.gz',
'deploy_md5': 'offline_auto_unifiedPlatform.tar.gz.md5',
'nas_dir': r'Z:\发布版本\03服务器部署\15新统一平台\X86部署包\全量版\版本更新-待验证',
'nas_dir': r'Z:\发布版本\03服务器部署\15新统一平台\X86部署包\全量版',
'deploy_script': 'new_auto.sh',
'deploy_answers': "y\ny\ny\ny\ny\ny\ny\nn\n",
'deploy_log': '/data/offline_auto_unifiedPlatform/deploy_output.log',
......@@ -460,12 +460,6 @@ echo "DEPLOY_SCRIPT_FINISHED"
ssh_exec_bg(ssh,
f"nohup /tmp/run_deploy.sh </dev/null >{cfg['deploy_log']} 2>&1 & echo $!")
if use_sudo:
ssh_exec_bg(ssh,
f"nohup /tmp/run_deploy.sh </dev/null >{cfg['deploy_log']} 2>&1 & echo $!")
else:
ssh_exec_bg(ssh,
f"nohup /tmp/run_deploy.sh </dev/null >{cfg['deploy_log']} 2>&1 & echo $!")
pid = 'monitoring'
logger.log("部署脚本已启动,监控日志进度...")
......@@ -691,8 +685,6 @@ def phase_security():
logger.log(f"========== {cfg['label']} 安全修复 ==========")
try:
ssh = ssh_connect(cfg['host'])
ssh = ssh_connect(cfg['host'], cfg['username'], cfg['password'])
# 创建带sudo支持的执行器
......
# 接口安全测试_需求文档
## 代码路径
- [AuxiliaryTool/ScriptTool/ApiSecurityTest/]
## 相关资料要求
### 目标服务器
- X86架构-欧拉服务器:192.168.5.44 root Ubains@123
......@@ -10,35 +12,29 @@
### 系统账号信息
- 超管账号密码:superadmin Ubains@1357
- 普通用户账号:需提供(用于测试越权访问)
- 管理员账号:需提供(用于测试水平越权)
- 不同角色的账号至少各一个(如:普通用户、公司管理员、超级管理员)
- 管理员账号:admin@aq
- 普通用户账号:user@aq
### 已知接口清单
- 预定对外接口:`/exapi/message/getMsgPageList`
- 预定系统接口:`/meetingV3/api/systemConfiguration/globalConfig?companyNumber=CN-SZ-00-0201`
- 运维集控接口:`/monitor/api2/api/servermonitor/`
- 讯飞转录接口:`/voice/api/iflytek/roommaster?company_id=1&user_id=8&company_secret=57d00f9f-020f-5f1f-b788-55fae843bceb&getall=1`
- 完整接口清单需补充(建议提供 Swagger 文档地址或后端接口文档)
-
### 相关资料
- 华为红线相关资料:[Z:\deploy\18其它系统\安全测试\01安全测试资料]
- 项目漏洞资料:[Z:\deploy\18其它系统\安全测试\02项目漏洞资料]
### 认证方式信息
- Token认证方式:accessToken(需提供 Token 获取接口及参数)
- 鉴权机制说明:需补充(Cookie / JWT / OAuth2 / 自定义Token)
- Token认证方式:accessToken
- 鉴权机制说明:JWT
- 接口鉴权参数:companyNumber、user_id、company_secret 等参数说明
### 服务部署信息
- 容器化部署:Docker
- 反向代理:Nginx(需提供 Nginx 配置文件路径或内容)
- 后端技术栈:需补充(Java Spring / Python Flask / Node.js 等)
- 数据库类型:需补充(MySQL / PostgreSQL / Redis 等)
### 需额外提供的资料
- [ ] 完整的 API 接口文档或 Swagger 地址
- [ ] 不同角色账号(至少:普通用户、管理员、超管各一个)
- [ ] Nginx 配置文件内容(用于分析路由和安全策略)
- [ ] 后端技术栈信息(语言、框架版本)
- [ ] 数据库类型和版本信息
- [ ] 是否有 WAF(Web应用防火墙)或 API 网关
- 反向代理:Nginx
- 配置文件:[Z:\deploy\18其它系统\安全测试\03Nginx配置文件\meeting443.conf]
- 后端技术栈:jaav、springboot、python
- 前端技术栈:vue2、unitapp
- 数据库类型:mysql、redis
- 中间件:emqx、fdfs、nginx、nacos
## 测试范围
### OWASP API Security Top 10 覆盖
......@@ -61,6 +57,7 @@
#### 预定系统(Meeting)
- 预定对外接口 `/exapi/*` 安全测试
- 预定对内接口 `/meetingV3/api/*` 安全测试
- 预定对内接口 `/meetingV2/api/*` 安全测试
- 会议预约、取消、修改的越权测试
- 用户信息接口的敏感数据泄露测试
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论