提交 e917ed43 authored 作者: 陈泽健's avatar 陈泽健

feat(script): 添加远程更新配置文件

- 配置测试服务器连接参数(主机、端口、用户名、密码)
- 配置构建服务器部署目录和包名称路径
- 配置NAS存储源目录和目标目录路径
- 定义前端服务列表包括AI包、后台包、主包等13个组件
- 定义后端JAR包服务包括认证、网关、系统等8个服务
- 定义后端文件夹服务包括CMDB和语音服务
上级 2b460259
{
"test_server": {
"host": "192.168.5.44",
"port": 22,
"username": "root",
"password": "Ubains@123"
},
"build_server": {
"host": "192.168.5.68",
"port": 22,
"username": "root",
"password": "Ubains@123",
"deploy_dir": "/data",
"package_name": "offline_auto_unifiedPlatform",
"package_path": "/data/offline_auto_unifiedPlatform"
},
"nas": {
"source_dir": "Z:\\发布版本\\03服务器部署\\15新统一平台\\X86部署包\\全量版",
"target_dir": "Z:\\发布版本\\03服务器部署\\15新统一平台\\X86部署包\\全量版\\版本更新-待验证"
},
"services": {
"frontend": [
{"name": "ai包", "path": "web/pc/pc-vue2-ai"},
{"name": "后台包", "path": "web/pc/pc-vue2-backstage"},
{"name": "main包", "path": "web/pc/pc-vue2-main"},
{"name": "meetngV2包", "path": "web/pc/pc-vue2-meetngV2"},
{"name": "meetngV3包", "path": "web/pc/pc-vue2-meetngV3"},
{"name": "meetingControl包", "path": "web/pc/pc-vue2-meetingControl"},
{"name": "monitor包", "path": "web/pc/pc-vue2-moniter"},
{"name": "platform包", "path": "web/pc/pc-vue2-platform"},
{"name": "voice包", "path": "web/pc/pc-vue2-voice/pc-vue2-voice"},
{"name": "h5-meeting", "path": "web/h5/h5-uniapp-meeting"},
{"name": "h5-moniter", "path": "web/h5/h5-uniapp-moniter"},
{"name": "h5-platform-mobile", "path": "web/h5/h5-uniapp-platform/meeting-mobile"},
{"name": "h5-platform-platform-mobile", "path": "web/h5/h5-uniapp-platform/unified-platform-mobile"}
],
"backend_jar": [
{"name": "auth包", "path": "api/auth/auth-sso-aut", "file": "ubains-auth.jar"},
{"name": "gatway包", "path": "api/auth/auth-sso-gatway", "file": "ubains-gateway.jar"},
{"name": "system包", "path": "api/auth/auth-sso-system", "file": "ubains-modules-system.jar"},
{"name": "java2.0包", "path": "api/java-meeting/java-meeting2.0", "file": "ubains-meeting-inner-api-1.0-SNAPSHOT.jar"},
{"name": "java-extapi包", "path": "api/java-meeting/java-meeting-extapi", "file": "ubains-meeting-api-1.0-SNAPSHOT.jar"},
{"name": "java-scheduling包", "path": "api/java-meeting/java-message-scheduling", "file": "ubains-meeting-message-scheduling-1.0-SNAPSHOT.jar"},
{"name": "java-mqtt包", "path": "api/java-meeting/java-mqtt", "file": "ubains-meeting-mqtt-1.0-SNAPSHOT.jar"},
{"name": "java-quartz包", "path": "api/java-meeting/java-quartz", "file": "ubains-meeting-quartz-1.0-SNAPSHOT.jar"}
],
"backend_folder": [
{"name": "cmdb包", "path": "api/python-cmdb", "config_file": "cmdb/bus/config/settingbus.conf"},
{"name": "voice包", "path": "api/python-voice", "config_file": "uvoice/bus/config/settingbus.conf"}
]
}
}
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
X86自动更新部署包版本脚本
从测试服务器获取最新服务包,更新到打包服务器的部署包中,
保留配置文件,重新打包后上传到网盘。
用法:
python x86_package_update.py # 完整更新流程
python x86_package_update.py --update-only # 仅更新服务(跳过步骤1-2)
python x86_package_update.py --help # 查看帮助
"""
import sys
import os
import json
import time
import hashlib
import shutil
import argparse
import paramiko
from datetime import datetime
# 修复Windows控制台编码问题
if sys.platform == 'win32':
try:
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
sys.stderr.reconfigure(encoding='utf-8', errors='replace')
except Exception:
pass
class X86PackageUpdate:
"""X86部署包版本自动更新工具"""
def __init__(self):
"""初始化配置和状态"""
# 加载配置文件
self.config = self._load_config()
self.test_server = self.config['test_server']
self.build_server = self.config['build_server']
self.nas = self.config['nas']
self.services = self.config['services']
# SSH/SFTP 客户端
self.test_ssh = None
self.test_sftp = None
self.build_ssh = None
self.build_sftp = None
# 日志和报告
self.log_file = None
self.total_start_time = None
self.report_data = {
'start_time': '',
'end_time': '',
'total_duration': '',
'status': '成功',
'steps': [],
'service_details': [],
'errors': []
}
# 本地临时目录
script_dir = os.path.dirname(os.path.abspath(__file__))
self.temp_dir = os.path.join(script_dir, 'temp')
self.reports_dir = os.path.join(script_dir, 'reports')
def _load_config(self):
"""加载配置文件"""
config_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), 'config.json'
)
try:
with open(config_path, 'r', encoding='utf-8') as f:
return json.load(f)
except FileNotFoundError:
print(f"[ERROR] 配置文件不存在: {config_path}")
sys.exit(1)
except json.JSONDecodeError as e:
print(f"[ERROR] 配置文件格式错误: {e}")
sys.exit(1)
# ==================== 日志模块 ====================
def _init_log(self):
"""初始化日志文件和报告目录"""
os.makedirs(self.reports_dir, exist_ok=True)
os.makedirs(self.temp_dir, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
self.log_file = os.path.join(
self.reports_dir, f'x86_update_{timestamp}.log'
)
self.log(f"日志文件: {self.log_file}")
def log(self, message, level="INFO"):
"""输出日志到控制台和文件"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
log_msg = f"[{timestamp}] [{level}] {message}"
try:
print(log_msg)
except UnicodeEncodeError:
safe_msg = log_msg.encode('gbk', errors='replace').decode('gbk')
print(safe_msg)
if self.log_file:
try:
with open(self.log_file, 'a', encoding='utf-8') as f:
f.write(log_msg + '\n')
except Exception:
pass
# ==================== SSH/SFTP 连接管理 ====================
def _create_ssh_client(self, host, port, username, password, name="服务器"):
"""创建SSH连接并返回客户端和SFTP"""
self.log(f"正在连接{name} ({host}:{port})...")
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
retry_count = 0
max_retries = 3
while retry_count < max_retries:
try:
client.connect(
hostname=host,
port=port,
username=username,
password=password,
timeout=30
)
sftp = client.open_sftp()
self.log(f"{name}连接成功")
return client, sftp
except Exception as e:
retry_count += 1
self.log(
f"{name}连接失败 (第{retry_count}次): {e}", "WARN"
)
if retry_count < max_retries:
time.sleep(10)
self.log(f"{name}连接失败,已达最大重试次数", "ERROR")
return None, None
def _connect_servers(self):
"""连接测试服务器和打包服务器"""
# 连接测试服务器
self.test_ssh, self.test_sftp = self._create_ssh_client(
self.test_server['host'],
self.test_server['port'],
self.test_server['username'],
self.test_server['password'],
"测试服务器"
)
if not self.test_ssh:
return False
# 连接打包服务器
self.build_ssh, self.build_sftp = self._create_ssh_client(
self.build_server['host'],
self.build_server['port'],
self.build_server['username'],
self.build_server['password'],
"打包服务器"
)
if not self.build_ssh:
return False
return True
def _disconnect_servers(self):
"""断开所有SSH/SFTP连接"""
for sftp in [self.test_sftp, self.build_sftp]:
if sftp:
try:
sftp.close()
except Exception:
pass
for ssh in [self.test_ssh, self.build_ssh]:
if ssh:
try:
ssh.close()
except Exception:
pass
self.log("已断开所有服务器连接")
def _exec_build_cmd(self, cmd, timeout=600):
"""在打包服务器上执行SSH命令"""
try:
stdin, stdout, stderr = self.build_ssh.exec_command(
cmd, timeout=timeout
)
out = stdout.read().decode('utf-8', errors='replace')
err = stderr.read().decode('utf-8', errors='replace')
exit_code = stdout.channel.recv_exit_status()
return exit_code, out, err
except Exception as e:
self.log(f"命令执行异常: {cmd}, 错误: {e}", "ERROR")
return -1, '', str(e)
def _exec_test_cmd(self, cmd, timeout=600):
"""在测试服务器上执行SSH命令"""
try:
stdin, stdout, stderr = self.test_ssh.exec_command(
cmd, timeout=timeout
)
out = stdout.read().decode('utf-8', errors='replace')
err = stderr.read().decode('utf-8', errors='replace')
exit_code = stdout.channel.recv_exit_status()
return exit_code, out, err
except Exception as e:
self.log(f"命令执行异常: {cmd}, 错误: {e}", "ERROR")
return -1, '', str(e)
# ==================== 工具函数 ====================
def _calc_local_md5(self, filepath):
"""计算本地文件的MD5值"""
md5 = hashlib.md5()
with open(filepath, 'rb') as f:
for chunk in iter(lambda: f.read(8192), b''):
md5.update(chunk)
return md5.hexdigest()
def _get_remote_file_size(self, ssh_client, filepath):
"""获取远程文件大小"""
stdin, stdout, stderr = ssh_client.exec_command(
f'stat -c %s "{filepath}" 2>/dev/null || echo "0"'
)
size = stdout.read().decode().strip()
return int(size) if size.isdigit() else 0
def _format_size(self, size_bytes):
"""格式化文件大小"""
if size_bytes < 1024:
return f"{size_bytes} B"
elif size_bytes < 1024 * 1024:
return f"{size_bytes / 1024:.1f} KB"
elif size_bytes < 1024 * 1024 * 1024:
return f"{size_bytes / (1024 * 1024):.1f} MB"
else:
return f"{size_bytes / (1024 * 1024 * 1024):.2f} GB"
def _check_disk_space(self, ssh_client, path, required_mb):
"""检查远程服务器磁盘剩余空间"""
stdin, stdout, stderr = ssh_client.exec_command(
f'df -m "{path}" | tail -1 | awk \'{{print $4}}\''
)
avail = stdout.read().decode().strip()
if avail.isdigit() and int(avail) < required_mb:
self.log(
f"磁盘空间不足: 需要 {required_mb}MB, 可用 {avail}MB",
"ERROR"
)
return False
return True
def _record_step(self, step_num, step_name, status, detail=""):
"""记录操作步骤到报告数据"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
self.report_data['steps'].append({
'num': step_num,
'name': step_name,
'time': timestamp,
'status': status,
'detail': detail
})
# ==================== 步骤1:从网盘下载部署包到打包服务器 ====================
def step1_download_from_nas(self):
"""步骤1:从网盘读取部署包,校验MD5,上传到打包服务器"""
self.log("=" * 50)
self.log("步骤1:从网盘下载部署包到打包服务器")
self.log("=" * 50)
source_dir = self.nas['source_dir']
package_name = self.build_server['package_name']
tar_file = f"{package_name}.tar.gz"
md5_file = f"{package_name}.tar.gz.md5"
local_tar = os.path.join(source_dir, tar_file)
local_md5 = os.path.join(source_dir, md5_file)
# 检查网盘文件是否存在
if not os.path.exists(local_tar):
msg = f"网盘中未找到部署包: {local_tar}"
self.log(msg, "ERROR")
self._record_step(1, "下载部署包", "失败", msg)
return False
if not os.path.exists(local_md5):
msg = f"网盘中未找到MD5文件: {local_md5}"
self.log(msg, "ERROR")
self._record_step(1, "下载部署包", "失败", msg)
return False
# 读取MD5校验文件
with open(local_md5, 'r', encoding='utf-8') as f:
expected_md5 = f.read().strip().split()[0]
# 计算本地文件MD5
self.log("正在校验MD5...")
actual_md5 = self._calc_local_md5(local_tar)
if actual_md5 != expected_md5:
msg = f"MD5校验失败: 期望={expected_md5}, 实际={actual_md5}"
self.log(msg, "ERROR")
self._record_step(1, "下载部署包", "失败", msg)
return False
self.log(f"MD5校验通过: {actual_md5}")
file_size = os.path.getsize(local_tar)
self.log(f"部署包大小: {self._format_size(file_size)}")
# 检查打包服务器磁盘空间(至少需要文件大小的2倍)
required_mb = int(file_size * 2 / (1024 * 1024)) + 100
if not self._check_disk_space(
self.build_ssh, self.build_server['deploy_dir'], required_mb
):
self._record_step(1, "下载部署包", "失败", "磁盘空间不足")
return False
# SFTP上传到打包服务器
remote_tar = f"{self.build_server['deploy_dir']}/{tar_file}"
remote_md5 = f"{self.build_server['deploy_dir']}/{md5_file}"
self.log(f"正在上传部署包到打包服务器: {remote_tar}")
try:
self.build_sftp.put(local_tar, remote_tar)
self.log(f"部署包上传完成")
except Exception as e:
msg = f"上传部署包失败: {e}"
self.log(msg, "ERROR")
self._record_step(1, "下载部署包", "失败", msg)
return False
self._record_step(
1, "下载部署包", "成功",
f"文件: {tar_file}, 大小: {self._format_size(file_size)}, "
f"MD5: {actual_md5}"
)
return True
# ==================== 步骤2:解压部署包 ====================
def step2_extract_package(self):
"""步骤2:在打包服务器上解压部署包并清理压缩文件"""
self.log("=" * 50)
self.log("步骤2:解压部署包")
self.log("=" * 50)
deploy_dir = self.build_server['deploy_dir']
package_name = self.build_server['package_name']
tar_file = f"{deploy_dir}/{package_name}.tar.gz"
md5_file = f"{deploy_dir}/{package_name}.tar.gz.md5"
# 解压(禁止中断)
self.log("正在解压部署包(请勿中断)...")
exit_code, out, err = self._exec_build_cmd(
f'cd "{deploy_dir}" && tar -xzf "{tar_file}"',
timeout=1800
)
if exit_code != 0:
msg = f"解压失败: {err}"
self.log(msg, "ERROR")
self._record_step(2, "解压部署包", "失败", msg)
return False
self.log("解压完成")
# 验证解压后的目录存在
package_path = self.build_server['package_path']
exit_code, out, err = self._exec_build_cmd(
f'test -d "{package_path}" && echo "EXISTS"'
)
if "EXISTS" not in out:
msg = f"解压后目录不存在: {package_path}"
self.log(msg, "ERROR")
self._record_step(2, "解压部署包", "失败", msg)
return False
# 删除压缩包和md5文件
self.log("清理压缩包和MD5文件...")
self._exec_build_cmd(f'rm -f "{tar_file}" "{md5_file}"')
self.log("清理完成")
self._record_step(2, "解压部署包", "成功")
return True
# ==================== 步骤3:从测试服务器打包所有服务 ====================
def step3_package_test_services(self):
"""步骤3:从测试服务器打包所有服务包"""
self.log("=" * 50)
self.log("步骤3:从测试服务器打包所有服务包")
self.log("=" * 50)
# 构建打包命令,只打包文档列出的目录
tar_dirs = []
for svc in self.services['frontend']:
tar_dirs.append(svc['path'])
for svc in self.services['backend_jar']:
tar_dirs.append(svc['path'])
for svc in self.services['backend_folder']:
tar_dirs.append(svc['path'])
dirs_str = ' '.join(tar_dirs)
remote_tmp_tar = '/tmp/services_update.tar.gz'
# 先清理旧的临时文件
self._exec_test_cmd(f'rm -f "{remote_tmp_tar}"')
# 在测试服务器上打包
self.log("正在测试服务器上打包所有服务(请勿中断)...")
cmd = f'cd /data/services && tar -czf "{remote_tmp_tar}" {dirs_str}'
exit_code, out, err = self._exec_test_cmd(cmd, timeout=1800)
if exit_code != 0:
msg = f"测试服务器打包失败: {err}"
self.log(msg, "ERROR")
self._record_step(3, "测试服务器打包", "失败", msg)
return False
# 获取打包文件大小
tar_size = self._get_remote_file_size(self.test_ssh, remote_tmp_tar)
self.log(f"服务包打包完成,大小: {self._format_size(tar_size)}")
# 下载到本地临时目录
local_tar = os.path.join(self.temp_dir, 'services_update.tar.gz')
self.log("正在下载服务包到本地...")
try:
self.test_sftp.get(remote_tmp_tar, local_tar)
self.log("下载完成")
except Exception as e:
msg = f"下载服务包失败: {e}"
self.log(msg, "ERROR")
self._record_step(3, "测试服务器打包", "失败", msg)
return False
# 上传到打包服务器
build_tmp_tar = '/tmp/services_update.tar.gz'
self.log("正在上传服务包到打包服务器...")
try:
self.build_sftp.put(local_tar, build_tmp_tar)
self.log("上传完成")
except Exception as e:
msg = f"上传服务包到打包服务器失败: {e}"
self.log(msg, "ERROR")
self._record_step(3, "测试服务器打包", "失败", msg)
return False
# 清理测试服务器临时文件
self._exec_test_cmd(f'rm -f "{remote_tmp_tar}"')
# 清理本地临时文件
if os.path.exists(local_tar):
os.remove(local_tar)
self._record_step(
3, "测试服务器打包", "成功",
f"服务包大小: {self._format_size(tar_size)}"
)
return True
# ==================== 步骤4:解压服务包 ====================
def step4_extract_test_services(self):
"""步骤4:在打包服务器上解压测试服务包"""
self.log("=" * 50)
self.log("步骤4:解压测试服务包")
self.log("=" * 50)
build_tmp_tar = '/tmp/services_update.tar.gz'
extract_dir = '/tmp/services_update'
# 创建解压目录
self._exec_build_cmd(f'rm -rf "{extract_dir}"')
self._exec_build_cmd(f'mkdir -p "{extract_dir}"')
# 解压
self.log("正在解压服务包...")
exit_code, out, err = self._exec_build_cmd(
f'tar -xzf "{build_tmp_tar}" -C "{extract_dir}"',
timeout=1800
)
if exit_code != 0:
msg = f"解压服务包失败: {err}"
self.log(msg, "ERROR")
self._record_step(4, "解压服务包", "失败", msg)
return False
self.log("解压完成")
# 清理压缩包
self._exec_build_cmd(f'rm -f "{build_tmp_tar}"')
self._record_step(4, "解压服务包", "成功")
return True
# ==================== 步骤5-6:服务替换与配置保留 ====================
def step5_6_update_services(self):
"""步骤5-6:执行服务包更新替换并恢复配置文件"""
self.log("=" * 50)
self.log("步骤5-6:执行服务包更新替换并恢复配置文件")
self.log("=" * 50)
services_base = '/data/offline_auto_unifiedPlatform/data/services'
update_base = '/tmp/services_update/services'
all_success = True
# ---------- 更新前端服务 ----------
self.log("--- 更新前端服务 ---")
for svc in self.services['frontend']:
svc_name = svc['name']
svc_path = svc['path']
src_dir = f"{update_base}/{svc_path}"
dst_dir = f"{services_base}/{svc_path}"
config_file = f"{dst_dir}/static/config.json"
self.log(f"正在更新前端服务: {svc_name} ({svc_path})")
# 记录原始文件大小
old_size = self._get_remote_file_size(
self.build_ssh, dst_dir
)
# 保存原始 config.json
config_saved = False
config_backup = f"/tmp/config_backup_{svc_name.replace(' ', '_')}.json"
exit_code, _, _ = self._exec_build_cmd(
f'cp "{config_file}" "{config_backup}" 2>/dev/null && echo "SAVED"'
)
if "SAVED" in _ or exit_code == 0:
config_saved = True
self.log(f" 已保存 config.json")
else:
self.log(f" 无需保存 config.json(文件不存在)", "WARN")
# 覆盖服务目录
exit_code, out, err = self._exec_build_cmd(
f'rm -rf "{dst_dir}" && cp -r "{src_dir}" "{dst_dir}"'
)
if exit_code != 0:
msg = f"覆盖前端服务 {svc_name} 失败: {err}"
self.log(msg, "ERROR")
self.report_data['errors'].append({
'service': svc_name,
'error': msg
})
self._record_step(5, f"更新{svc_name}", "失败", msg)
return False
# 恢复 config.json
if config_saved:
exit_code, _, err = self._exec_build_cmd(
f'cp "{config_backup}" "{config_file}"'
)
if exit_code != 0:
msg = f"恢复 {svc_name} 的 config.json 失败: {err}"
self.log(msg, "ERROR")
self.report_data['errors'].append({
'service': svc_name,
'error': msg
})
return False
self.log(f" 已恢复 config.json")
# 清理备份
self._exec_build_cmd(f'rm -f "{config_backup}"')
# 记录新文件大小
new_size = self._get_remote_file_size(
self.build_ssh, dst_dir
)
self.report_data['service_details'].append({
'name': svc_name,
'type': '前端',
'path': svc_path,
'old_size': self._format_size(old_size),
'new_size': self._format_size(new_size),
'status': '成功'
})
self.log(f" {svc_name} 更新成功")
# ---------- 更新 jar 后端服务 ----------
self.log("--- 更新jar后端服务 ---")
for svc in self.services['backend_jar']:
svc_name = svc['name']
svc_path = svc['path']
jar_file = svc['file']
src_file = f"{update_base}/{svc_path}/{jar_file}"
dst_file = f"{services_base}/{svc_path}/{jar_file}"
self.log(f"正在更新后端服务: {svc_name} ({jar_file})")
# 记录原始文件大小
old_size = self._get_remote_file_size(self.build_ssh, dst_file)
# 直接覆盖jar文件
exit_code, out, err = self._exec_build_cmd(
f'cp -f "{src_file}" "{dst_file}"'
)
if exit_code != 0:
msg = f"覆盖jar服务 {svc_name} 失败: {err}"
self.log(msg, "ERROR")
self.report_data['errors'].append({
'service': svc_name,
'error': msg
})
self._record_step(5, f"更新{svc_name}", "失败", msg)
return False
# 记录新文件大小
new_size = self._get_remote_file_size(self.build_ssh, dst_file)
self.report_data['service_details'].append({
'name': svc_name,
'type': '后端jar',
'path': svc_path,
'old_size': self._format_size(old_size),
'new_size': self._format_size(new_size),
'status': '成功'
})
self.log(f" {svc_name} 更新成功")
# ---------- 更新文件夹后端服务 ----------
self.log("--- 更新文件夹后端服务 ---")
for svc in self.services['backend_folder']:
svc_name = svc['name']
svc_path = svc['path']
config_relative = svc['config_file']
src_dir = f"{update_base}/{svc_path}"
dst_dir = f"{services_base}/{svc_path}"
config_file = f"{dst_dir}/{config_relative}"
self.log(f"正在更新后端服务: {svc_name} ({svc_path})")
# 记录原始文件大小
old_size = self._get_remote_file_size(self.build_ssh, dst_dir)
# 保存原始 settingbus.conf
config_saved = False
config_backup = (
f"/tmp/settingbus_backup_{svc_name.replace(' ', '_')}.conf"
)
exit_code, _, _ = self._exec_build_cmd(
f'cp "{config_file}" "{config_backup}" 2>/dev/null '
f'&& echo "SAVED"'
)
if exit_code == 0:
config_saved = True
self.log(f" 已保存 settingbus.conf")
# 覆盖服务目录
exit_code, out, err = self._exec_build_cmd(
f'rm -rf "{dst_dir}" && cp -r "{src_dir}" "{dst_dir}"'
)
if exit_code != 0:
msg = f"覆盖后端服务 {svc_name} 失败: {err}"
self.log(msg, "ERROR")
self.report_data['errors'].append({
'service': svc_name,
'error': msg
})
self._record_step(5, f"更新{svc_name}", "失败", msg)
return False
# 恢复 settingbus.conf
if config_saved:
# 确保目标目录存在
config_dir = os.path.dirname(config_file)
self._exec_build_cmd(f'mkdir -p "{config_dir}"')
exit_code, _, err = self._exec_build_cmd(
f'cp "{config_backup}" "{config_file}"'
)
if exit_code != 0:
msg = (
f"恢复 {svc_name} 的 settingbus.conf 失败: {err}"
)
self.log(msg, "ERROR")
self.report_data['errors'].append({
'service': svc_name,
'error': msg
})
return False
self.log(f" 已恢复 settingbus.conf")
# 清理备份
self._exec_build_cmd(f'rm -f "{config_backup}"')
# 记录新文件大小
new_size = self._get_remote_file_size(self.build_ssh, dst_dir)
self.report_data['service_details'].append({
'name': svc_name,
'type': '后端文件夹',
'path': svc_path,
'old_size': self._format_size(old_size),
'new_size': self._format_size(new_size),
'status': '成功'
})
self.log(f" {svc_name} 更新成功")
# 清理解压的临时目录
self._exec_build_cmd('rm -rf /tmp/services_update')
self.log("已清理临时解压目录")
self._record_step(
5, "服务更新替换", "成功",
f"共更新 {len(self.report_data['service_details'])} 个服务"
)
return True
# ==================== 步骤7:重新打包 ====================
def step7_repackage(self):
"""步骤7:将更新后的部署包重新打包并生成MD5"""
self.log("=" * 50)
self.log("步骤7:重新打包部署包")
self.log("=" * 50)
deploy_dir = self.build_server['deploy_dir']
package_name = self.build_server['package_name']
package_path = self.build_server['package_path']
tar_file = f"{deploy_dir}/{package_name}.tar.gz"
md5_file = f"{deploy_dir}/{package_name}.tar.gz.md5"
# 删除旧的打包文件
self._exec_build_cmd(f'rm -f "{tar_file}" "{md5_file}"')
# 打包(禁止中断)
self.log("正在打包部署包(请勿中断)...")
exit_code, out, err = self._exec_build_cmd(
f'cd "{deploy_dir}" && tar -czf "{tar_file}" "{package_name}"',
timeout=3600
)
if exit_code != 0:
msg = f"打包失败: {err}"
self.log(msg, "ERROR")
self._record_step(7, "重新打包", "失败", msg)
return False
self.log("打包完成")
# 生成MD5
self.log("正在生成MD5校验文件...")
exit_code, out, err = self._exec_build_cmd(
f'cd "{deploy_dir}" && md5sum "{package_name}.tar.gz" '
f'> "{package_name}.tar.gz.md5"'
)
if exit_code != 0:
msg = f"生成MD5失败: {err}"
self.log(msg, "ERROR")
self._record_step(7, "重新打包", "失败", msg)
return False
# 获取打包文件大小
tar_size = self._get_remote_file_size(self.build_ssh, tar_file)
self.log(f"打包文件大小: {self._format_size(tar_size)}")
# 读取MD5值
exit_code, out, err = self._exec_build_cmd(f'cat "{md5_file}"')
md5_value = out.strip().split()[0] if out.strip() else "未知"
self.log(f"MD5: {md5_value}")
self._record_step(
7, "重新打包", "成功",
f"文件: {tar_file}, 大小: {self._format_size(tar_size)}, "
f"MD5: {md5_value}"
)
return True
# ==================== 步骤8:拷贝到网盘 ====================
def step8_copy_to_nas(self):
"""步骤8:从打包服务器下载并拷贝到网盘"""
self.log("=" * 50)
self.log("步骤8:拷贝到网盘")
self.log("=" * 50)
deploy_dir = self.build_server['deploy_dir']
package_name = self.build_server['package_name']
remote_tar = f"{deploy_dir}/{package_name}.tar.gz"
remote_md5 = f"{deploy_dir}/{package_name}.tar.gz.md5"
local_tar = os.path.join(self.temp_dir, f"{package_name}.tar.gz")
local_md5 = os.path.join(self.temp_dir, f"{package_name}.tar.gz.md5")
# 从打包服务器下载到本地
self.log("正在从打包服务器下载到本地...")
try:
self.build_sftp.get(remote_tar, local_tar)
self.build_sftp.get(remote_md5, local_md5)
self.log("下载完成")
except Exception as e:
msg = f"从打包服务器下载失败: {e}"
self.log(msg, "ERROR")
self._record_step(8, "拷贝到网盘", "失败", msg)
return False
# 拷贝到网盘
target_dir = self.nas['target_dir']
self.log(f"正在拷贝到网盘: {target_dir}")
# 确保网盘目录存在
try:
os.makedirs(target_dir, exist_ok=True)
except Exception as e:
msg = f"创建网盘目录失败: {e}"
self.log(msg, "ERROR")
self._record_step(8, "拷贝到网盘", "失败", msg)
return False
try:
nas_tar = os.path.join(target_dir, f"{package_name}.tar.gz")
nas_md5 = os.path.join(target_dir, f"{package_name}.tar.gz.md5")
shutil.copy2(local_tar, nas_tar)
shutil.copy2(local_md5, nas_md5)
self.log("拷贝到网盘完成")
except Exception as e:
msg = f"拷贝到网盘失败: {e}"
self.log(msg, "ERROR")
self._record_step(8, "拷贝到网盘", "失败", msg)
return False
self._record_step(8, "拷贝到网盘", "成功")
return True
# ==================== 步骤9:清理临时文件 ====================
def step9_cleanup(self):
"""步骤9:清理所有临时文件"""
self.log("=" * 50)
self.log("步骤9:清理临时文件")
self.log("=" * 50)
package_name = self.build_server['package_name']
deploy_dir = self.build_server['deploy_dir']
# 清理打包服务器上的 tar.gz 和 md5
self.log("清理打包服务器上的压缩包...")
self._exec_build_cmd(
f'rm -f "{deploy_dir}/{package_name}.tar.gz" '
f'"{deploy_dir}/{package_name}.tar.gz.md5"'
)
self._exec_build_cmd('rm -rf /tmp/services_update')
# 清理打包服务器上的配置备份文件
self._exec_build_cmd('rm -f /tmp/config_backup_*.json')
self._exec_build_cmd('rm -f /tmp/settingbus_backup_*.conf')
# 清理本地临时目录
self.log("清理本地临时文件...")
if os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir, ignore_errors=True)
os.makedirs(self.temp_dir, exist_ok=True)
self.log("清理完成")
self._record_step(9, "清理临时文件", "成功")
return True
# ==================== 步骤10:生成报告 ====================
def step10_generate_report(self):
"""步骤10:生成操作报告"""
self.log("=" * 50)
self.log("步骤10:生成报告")
self.log("=" * 50)
# 计算总耗时
end_time = datetime.now()
self.report_data['end_time'] = end_time.strftime('%Y-%m-%d %H:%M:%S')
if self.total_start_time:
duration = end_time - self.total_start_time
total_seconds = int(duration.total_seconds())
minutes, seconds = divmod(total_seconds, 60)
self.report_data['total_duration'] = f"{minutes}分{seconds}秒"
else:
self.report_data['total_duration'] = "未知"
# 判断整体状态
if self.report_data['errors']:
self.report_data['status'] = '失败'
elif any(
s['status'] == '失败' for s in self.report_data['steps']
):
self.report_data['status'] = '失败'
else:
self.report_data['status'] = '成功'
# 生成报告内容
report_content = self._build_report()
# 保存报告
timestamp = datetime.now().strftime('%Y%m%d')
report_path = os.path.join(
self.reports_dir, f'x86_update_{timestamp}.md'
)
with open(report_path, 'w', encoding='utf-8') as f:
f.write(report_content)
self.log(f"报告已生成: {report_path}")
self._record_step(10, "生成报告", "成功", f"报告路径: {report_path}")
return report_path
def _build_report(self):
"""构建报告内容"""
lines = []
lines.append("# X86自动更新部署包版本报告\n")
lines.append("## 基本信息\n")
lines.append(f"- 执行时间:{self.report_data['start_time']}")
lines.append(f"- 结束时间:{self.report_data['end_time']}")
lines.append(f"- 总耗时:{self.report_data['total_duration']}")
lines.append(f"- 执行状态:**{self.report_data['status']}**\n")
# 操作步骤记录
lines.append("## 操作步骤记录\n")
lines.append("| 序号 | 步骤名称 | 时间 | 状态 | 详情 |")
lines.append("|------|---------|------|------|------|")
for step in self.report_data['steps']:
status_icon = "✅" if step['status'] == "成功" else "❌"
lines.append(
f"| {step['num']} | {step['name']} | {step['time']} "
f"| {status_icon} {step['status']} | {step['detail']} |"
)
lines.append("")
# 服务更新详情
if self.report_data['service_details']:
lines.append("## 服务更新详情\n")
lines.append(
"| 服务名称 | 类型 | 更新前大小 | 更新后大小 | 状态 |"
)
lines.append(
"|---------|------|-----------|-----------|------|"
)
for svc in self.report_data['service_details']:
status_icon = "✅" if svc['status'] == "成功" else "❌"
lines.append(
f"| {svc['name']} | {svc['type']} "
f"| {svc['old_size']} | {svc['new_size']} "
f"| {status_icon} {svc['status']} |"
)
lines.append("")
# 异常记录
if self.report_data['errors']:
lines.append("## 异常记录\n")
for err in self.report_data['errors']:
lines.append(f"- **{err['service']}**: {err['error']}")
lines.append("")
# 最终产物
lines.append("## 最终产物\n")
package_name = self.build_server['package_name']
lines.append(f"- 压缩包:`{package_name}.tar.gz`")
lines.append(f"- MD5文件:`{package_name}.tar.gz.md5`")
lines.append(f"- 网盘路径:`{self.nas['target_dir']}`")
lines.append("")
return '\n'.join(lines)
# ==================== 主执行流程 ====================
def run(self, update_only=False):
"""执行完整的更新流程"""
self.total_start_time = datetime.now()
self.report_data['start_time'] = (
self.total_start_time.strftime('%Y-%m-%d %H:%M:%S')
)
# 初始化日志
self._init_log()
self.log("X86自动更新部署包版本 - 开始执行")
self.log(f"模式: {'仅更新服务' if update_only else '完整更新'}")
try:
# 连接服务器
if not self._connect_servers():
self.log("服务器连接失败,终止执行", "ERROR")
self.step10_generate_report()
return False
if not update_only:
# 步骤1:从网盘下载部署包到打包服务器
if not self.step1_download_from_nas():
self.log("步骤1失败,终止执行", "ERROR")
self.step10_generate_report()
return False
# 步骤2:解压部署包
if not self.step2_extract_package():
self.log("步骤2失败,终止执行", "ERROR")
self.step10_generate_report()
return False
# 步骤3:从测试服务器打包所有服务
if not self.step3_package_test_services():
self.log("步骤3失败,终止执行", "ERROR")
self.step10_generate_report()
return False
# 步骤4:解压服务包
if not self.step4_extract_test_services():
self.log("步骤4失败,终止执行", "ERROR")
self.step10_generate_report()
return False
# 步骤5-6:服务替换与配置保留
if not self.step5_6_update_services():
self.log("步骤5-6失败,终止执行", "ERROR")
self.step10_generate_report()
return False
# 步骤7:重新打包
if not self.step7_repackage():
self.log("步骤7失败,终止执行", "ERROR")
self.step10_generate_report()
return False
# 步骤8:拷贝到网盘
if not self.step8_copy_to_nas():
self.log("步骤8失败,终止执行", "ERROR")
self.step10_generate_report()
return False
# 步骤9:清理临时文件
self.step9_cleanup()
# 步骤10:生成报告
report_path = self.step10_generate_report()
self.log("=" * 50)
self.log(
f"全部完成!总耗时: {self.report_data['total_duration']}"
)
self.log(f"报告路径: {report_path}")
self.log("=" * 50)
return True
except KeyboardInterrupt:
self.log("\n用户中断执行", "WARN")
self.report_data['status'] = '中断'
self.step10_generate_report()
return False
except Exception as e:
self.log(f"执行异常: {e}", "ERROR")
self.report_data['status'] = '异常'
self.report_data['errors'].append({
'service': '系统',
'error': str(e)
})
self.step10_generate_report()
return False
finally:
# 断开连接
self._disconnect_servers()
# 清理本地临时文件
if os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir, ignore_errors=True)
def main():
"""主入口函数"""
parser = argparse.ArgumentParser(
description='X86自动更新部署包版本工具'
)
parser.add_argument(
'--update-only',
action='store_true',
help='仅更新服务(跳过步骤1-2,打包服务器已有解压后的部署包)'
)
args = parser.parse_args()
updater = X86PackageUpdate()
success = updater.run(update_only=args.update_only)
sys.exit(0 if success else 1)
if __name__ == '__main__':
main()
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论