提交 8865cfb8 作者: 陈泽健

feat(预定系统): 添加全栈更新脚本并集成定时任务- 新增 pull_remote_devlop.py 脚本实现全栈更新功能

- 在定时执行功能测试文件中添加每日 9:05 执行更新任务
- 更新后执行兰州中石化项目测试任务
上级 6b8ca05f
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
预定系统全栈更新脚本 - 终极完整版
包含:
1. 完整的前端备份功能(使用mv命令)
2. 完整的后端更新功能
3. 完整的异常处理流程
4. 完整的日志记录系统
"""
import fnmatch
import paramiko
import os
from datetime import datetime
from tqdm import tqdm
import sys
import re
import tarfile
import tempfile
import shutil
import stat
class Config:
    """Static deployment configuration.

    Holds SSH connection settings, local (UNC share) version-discovery
    paths, remote backup locations, and stderr-noise filter patterns.

    NOTE(review): other code in this file references Config.BACKEND_PATHS
    and Config.FRONTEND_CONFIG, which are never defined here -- confirm
    they are provided elsewhere before this script is run.
    """
    # ===== Connection settings =====
    SSH_PORT = 22
    SSH_USER = "root"
    # Falls back to a hard-coded default password when the env var is unset.
    SSH_PASSWORD = os.getenv('DEPLOY_SSH_PASSWORD') or "hzpassw0RD@KP"
    # ===== Target server =====
    SERVER_HOST = '139.159.163.86'  # default: Lanzhou Sinopec test environment
    # Automatic version discovery: local network-share paths scanned for the
    # most recently modified version folder of each service.
    VERSION_DISCOVERY = {
        'backend': {
            'inner': {
                'search_path': r"\\192.168.9.9\deploy\01会议预定\标准版本-长期运维\01版本管理\01后端运行服务\内部预定",
                'file_pattern': '*.jar'
            },
            'external': {
                'search_path': r"\\192.168.9.9\deploy\01会议预定\标准版本-长期运维\01版本管理\01后端运行服务\对外预定",
                'file_pattern': '*.jar'
            }
        },
        'frontend': {
            'front': {
                'search_path': r"\\192.168.9.9\deploy\00项目管理\2025\L 兰州中石化项目\01版本管理\02前端PC网页",
                'files_to_update': ['*.worker.js', 'index.html', 'static/']
            },
            'admin': {
                'search_path': r"\\192.168.9.9\deploy\01会议预定\标准版本-预定后台\01版本管理",
                'files_to_update': ['index.html', 'static/']
            }
        }
    }
    # ===== Remote deploy/backup directories per service =====
    BACKUP_CONFIG = {
        'front': {
            'remote_dir': "/var/www/java/ubains-web-2.0/",
            'backup_dir': "/var/www/java/ubains-web-2.0/backup/",
            'files_to_backup': ['index.html', 'static', '*.worker.js']
        },
        'admin': {
            'remote_dir': "/var/www/java/ubains-web-admin/",
            'backup_dir': "/var/www/java/ubains-web-admin/backup/",
            'files_to_backup': ['index.html', 'static']
        },
        'inner': {
            'remote_dir': "/var/www/java/api-java-meeting2.0/",
            'backup_dir': "/var/www/java/api-java-meeting2.0/backup/",
            'files_to_backup': ['ubains-meeting-inner-api-1.0-SNAPSHOT.jar']
        },
        'external': {
            'remote_dir': "/var/www/java/external-meeting-api/",
            'backup_dir': "/var/www/java/external-meeting-api/backup/",
            'files_to_backup': ['ubains-meeting-api-1.0-SNAPSHOT.jar']
        }
    }
    # ===== Remote-command stderr lines that are known noise =====
    IGNORABLE_LOG_PATTERNS = [
        re.compile(r'unexpected operator'),
        re.compile(r'Usage: kill'),
        re.compile(r'kill -l'),
        re.compile(r'Java HotSpot\(TM\)'),
        re.compile(r'SLF4J:'),
        re.compile(r'Listening for transport'),
        re.compile(r'Starting Application'),
        re.compile(r'tail:.*file truncated'),
        re.compile(r'log4j:WARN')
    ]
class Deployer:
    """Deployment driver: connects to the server over SSH/SFTP, backs up the
    currently deployed frontend/backend artifacts, then uploads and installs
    the newest locally discovered versions.

    NOTE(review): several methods reference Config.BACKEND_PATHS,
    Config.FRONTEND_CONFIG and self._should_ignore_log, none of which are
    defined in this file -- confirm they exist elsewhere before running.
    """

    def __init__(self):
        self.ssh = paramiko.SSHClient()
        # Auto-accept unknown host keys (convenient, but trusts the network).
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.logger = self._setup_logger()
        self.sftp = None  # SFTP session, opened in connect()
        self.remote_archive_name = "frontend_update.tar.gz"

    def _setup_logger(self):
        """Build and return a closure-based logger.

        The returned callable takes (msg, level="INFO", important=False).
        WARNING/ERROR/important messages are printed in bold; ERROR is also
        mirrored to stderr.
        """
        def log(msg, level="INFO", important=False):
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            log_msg = f"[{timestamp}] [{level}] {msg}"
            if level in ("WARNING", "ERROR") or important:
                print(f"\033[1m{log_msg}\033[0m")
                if level == "ERROR":
                    sys.stderr.write(log_msg + "\n")
            else:
                print(log_msg)
        return log

    def connect(self):
        """Connect with up to 3 attempts; return True on success, else False.

        On success an SFTP session is opened and the frontend deploy/backup
        directories are created if missing.
        """
        max_retries = 3
        for attempt in range(1, max_retries + 1):
            try:
                self.logger(f"尝试连接服务器 ({attempt}/{max_retries})...", important=True)
                self.ssh.connect(
                    hostname=Config.SERVER_HOST,
                    port=Config.SSH_PORT,
                    username=Config.SSH_USER,
                    password=Config.SSH_PASSWORD,
                    timeout=30,
                    banner_timeout=200
                )
                self.sftp = self.ssh.open_sftp()
                # Make sure all required remote directories exist up front.
                # NOTE(review): backup_dir is created before remote_dir; if
                # remote_dir itself is missing, the nested mkdir will fail.
                for service in ['front', 'admin']:
                    config = Config.BACKUP_CONFIG[service]
                    self._ensure_remote_dir_exists(config['backup_dir'])
                    self._ensure_remote_dir_exists(config['remote_dir'])
                self.logger("服务器连接成功", important=True)
                return True
            except Exception as e:
                self.logger(f"连接尝试 {attempt} 失败: {str(e)}", "WARNING")
                if attempt == max_retries:
                    self.logger("连接服务器最终失败", "ERROR")
                    return False

    def _ensure_remote_dir_exists(self, remote_dir):
        """Create remote_dir via SFTP if absent (non-recursive); raise on failure."""
        try:
            self.sftp.stat(remote_dir)
        except FileNotFoundError:
            try:
                self.sftp.mkdir(remote_dir)
                self.logger(f"已创建远程目录: {remote_dir}", "INFO")
            except Exception as e:
                raise Exception(f"创建目录失败: {remote_dir} - {str(e)}")
        except Exception as e:
            raise Exception(f"目录检查失败: {remote_dir} - {str(e)}")

    def _remote_file_exists(self, remote_path):
        """Return True if remote_path stats successfully, else False."""
        try:
            self.sftp.stat(remote_path)
            return True
        except:  # any stat failure (including connection errors) counts as missing
            return False

    def _find_worker_files(self, remote_dir):
        """Return absolute paths of *.worker.js files directly under remote_dir."""
        try:
            stdin, stdout, stderr = self.ssh.exec_command(
                f"find {remote_dir} -maxdepth 1 -name '*.worker.js' -type f"
            )
            files = []
            for line in stdout:
                line = line.strip()
                # Double-check each result really exists before reporting it.
                if line and self._remote_file_exists(line):
                    files.append(line)
            return files
        except Exception as e:
            self.logger(f"查找worker文件失败: {str(e)}", "ERROR")
            return []

    def _discover_latest_version(self, service_type, service_name):
        """Discover the newest local version of a service.

        Backend: returns {'local': jar_path, 'remote': ...}; frontend:
        returns the version folder plus its file checklist. Returns None on
        any failure.

        NOTE(review): the backend branch reads Config.BACKEND_PATHS, which
        is not defined in this file.
        """
        config = Config.VERSION_DISCOVERY[service_type][service_name]
        self.logger(f"\n===== 正在发现 {service_name} 最新版本 =====", important=True)
        try:
            # "Latest" = most recently modified subfolder of the search path.
            latest_folder = self._get_latest_local_folder(config['search_path'])
            if not latest_folder:
                raise Exception("未找到有效版本文件夹")
            if service_type == 'backend':
                # Pick the newest JAR inside that folder.
                jar_pattern = config['file_pattern']
                latest_jar = self._get_latest_matching_file(latest_folder, jar_pattern)
                if not latest_jar:
                    raise Exception(f"未找到匹配 {jar_pattern} 的文件")
                self.logger(f"发现最新版本: {latest_jar}", important=True)
                return {'local': latest_jar, 'remote': Config.BACKEND_PATHS[service_name]['remote']}
            else:
                # Frontend: record which of the expected files are present.
                files_to_update = config['files_to_update']
                found_files = self._get_frontend_files(latest_folder, files_to_update)
                self.logger(f"发现最新版本于: {latest_folder}", important=True)
                return {
                    'local_dir': latest_folder,
                    'files_to_update': files_to_update,
                    'found_files': found_files
                }
        except Exception as e:
            self.logger(f"版本发现失败: {str(e)}", "ERROR")
            return None

    def _get_frontend_files(self, folder_path, files_to_update):
        """Map each expected frontend item to a bool: present in folder_path?

        Items ending in '/' are directories, items starting with '*.' are
        extension wildcards, everything else is a plain filename.
        """
        found = {}
        for item in files_to_update:
            if item.endswith('/'):
                dir_name = item[:-1]
                dir_path = os.path.join(folder_path, dir_name)
                found[dir_name] = os.path.exists(dir_path)
            elif item.startswith('*.'):
                ext = item[2:]
                found[ext] = any(f.endswith(ext) for f in os.listdir(folder_path))
            else:
                found[item] = os.path.exists(os.path.join(folder_path, item))
        return found

    def _get_latest_matching_file(self, folder, pattern):
        """Return the full path of the newest (by mtime) file matching pattern."""
        try:
            files = [f for f in os.listdir(folder) if fnmatch.fnmatch(f, pattern)]
            if not files:
                return None
            # Collect (path, mtime) pairs, logging each candidate.
            files_with_mtime = []
            for f in files:
                full_path = os.path.join(folder, f)
                mtime = os.path.getmtime(full_path)
                files_with_mtime.append((full_path, mtime))
                self.logger(f"发现文件: {f} (修改时间: {datetime.fromtimestamp(mtime)})")
            latest = max(files_with_mtime, key=lambda x: x[1])[0]
            return latest
        except Exception as e:
            self.logger(f"获取匹配文件失败: {str(e)}", "ERROR")
            return None

    def _get_latest_local_folder(self, path):
        """Return the newest (by mtime) subfolder of path, or None."""
        try:
            if not os.path.exists(path):
                raise FileNotFoundError(f"路径不存在: {path}")
            folders = [f for f in os.listdir(path) if os.path.isdir(os.path.join(path, f))]
            if not folders:
                return None
            # Collect (path, mtime) pairs, logging each candidate.
            folders_with_mtime = []
            for f in folders:
                full_path = os.path.join(path, f)
                mtime = os.path.getmtime(full_path)
                folders_with_mtime.append((full_path, mtime))
                self.logger(f"发现版本文件夹: {f} (修改时间: {datetime.fromtimestamp(mtime)})")
            latest = max(folders_with_mtime, key=lambda x: x[1])[0]
            return latest
        except Exception as e:
            self.logger(f"获取最新文件夹失败: {str(e)}", "ERROR")
            return None

    def _execute_backup_move(self, src, dest, is_directory=False):
        """Move src to dest on the remote host via `mv`; raise on failure.

        NOTE(review): is_directory is accepted but never used.
        """
        try:
            if not self._remote_file_exists(src):
                raise FileNotFoundError(f"源文件不存在: {src}")
            cmd = f"mv {src} {dest}"
            if not self.run_command(cmd):
                raise Exception(f"移动操作失败: {cmd}")
            return True
        except Exception as e:
            self.logger(f"备份移动失败: {src} -> {dest} - {str(e)}", "ERROR")
            raise

    def _create_backup_folder(self, backup_dir):
        """Create a timestamped temp backup folder; return (temp_dir, final_dir).

        NOTE(review): appears unused -- run_frontend_backup/deploy_frontend
        create their own folders inline.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        temp_dir = os.path.join(backup_dir, f"{timestamp}_temp")
        final_dir = os.path.join(backup_dir, timestamp)
        try:
            self.sftp.mkdir(temp_dir)
            return temp_dir, final_dir
        except Exception as e:
            raise Exception(f"创建备份文件夹失败: {str(e)}")

    def run_frontend_backup(self, frontend_type):
        """Back up a frontend ('front'/'admin') by moving its deployed files
        into a timestamped folder under the configured backup dir.

        Returns True on success, False on any failure (logged).
        """
        config = Config.BACKUP_CONFIG[frontend_type]
        self.logger(f"\n===== 开始备份 {frontend_type} 前端 =====", important=True)
        try:
            # 1. Timestamped backup folder name.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_folder_name = f"backup_{timestamp}"
            temp_backup_path = os.path.join(config['backup_dir'], backup_folder_name)
            # 2. Ensure the backup root exists.
            self._ensure_remote_dir_exists(config['backup_dir'])
            # 3. Create the backup folder itself.
            self.sftp.mkdir(temp_backup_path)
            self.logger(f"已创建备份文件夹: {temp_backup_path}", "INFO")
            # 4. Back up index.html (skip with a warning if absent).
            index_src = os.path.join(config['remote_dir'], 'index.html')
            index_dest = os.path.join(temp_backup_path, 'index.html')
            if self._remote_file_exists(index_src):
                self._execute_backup_move(index_src, index_dest)
            else:
                self.logger("未找到index.html文件,跳过备份", "WARNING")
            # 5. Back up *.worker.js (front-end portal only).
            if frontend_type == 'front':
                worker_files = self._find_worker_files(config['remote_dir'])
                if not worker_files:
                    self.logger("未找到worker.js文件,跳过备份", "WARNING")
                for src in worker_files:
                    dest = os.path.join(temp_backup_path, os.path.basename(src))
                    self._execute_backup_move(src, dest)
            # 6. Back up the static directory.
            static_src = os.path.join(config['remote_dir'], 'static')
            static_dest = os.path.join(temp_backup_path, 'static')
            if self._remote_file_exists(static_src):
                self._execute_backup_move(static_src, static_dest, is_directory=True)
            else:
                self.logger("未找到static目录,跳过备份", "WARNING")
            self.logger(f"备份成功: {temp_backup_path}", important=True)
            return True
        except Exception as e:
            self.logger(f"备份过程失败: {str(e)}", "ERROR")
            return False

    def run_command(self, command):
        """Run a remote command; return True on exit status 0, else False.

        Non-ignorable stderr lines are logged as errors.
        NOTE(review): self._should_ignore_log is not defined in this file --
        any failing command with stderr output would raise AttributeError.
        """
        try:
            stdin, stdout, stderr = self.ssh.exec_command(command)
            exit_status = stdout.channel.recv_exit_status()
            error_output = stderr.read().decode().strip()
            if error_output and exit_status != 0:
                for line in error_output.split('\n'):
                    if line and not self._should_ignore_log(line):
                        self.logger(f"命令错误: {line}", "ERROR")
                return False
            output = stdout.read().decode().strip()
            if output:
                self.logger(f"命令输出: {output}", "INFO")
            return True
        except Exception as e:
            self.logger(f"命令执行异常: {str(e)}", "ERROR")
            return False

    def _restore_config_js(self, remote_dir):
        """Restore a previously saved config.json into static/, if present.

        Returns True when restored, False otherwise.
        NOTE(review): appears unused in this file.
        """
        try:
            # The backup copy lives next to the deploy root.
            backup_config = os.path.join(remote_dir, 'backup_config.json')
            target_config = os.path.join(remote_dir, 'static', 'config.json')
            if self._remote_file_exists(backup_config):
                self.logger("正在恢复原config.json文件...", "INFO")
                if not self.run_command(f"mv {backup_config} {target_config}"):
                    raise Exception("恢复config.json失败")
                return True
            return False
        except Exception as e:
            self.logger(f"恢复config.json失败: {str(e)}", "ERROR")
            return False

    def upload_file(self, local_path, remote_path):
        """Upload a local file with a progress bar and verify its size.

        On failure, tries to delete the partially uploaded remote file.
        """
        try:
            # Validate the local file first.
            if not os.path.exists(local_path):
                raise FileNotFoundError(f"本地文件不存在: {local_path}")
            file_size = os.path.getsize(local_path)
            self.logger(f"准备上传: {os.path.basename(local_path)} ({file_size / 1024 / 1024:.2f}MB)", important=True)
            # Upload with a tqdm progress bar driven by paramiko's callback.
            with tqdm(total=file_size, unit='B', unit_scale=True, desc="上传进度") as pbar:
                def callback(transferred, total):
                    pbar.update(transferred - pbar.n)
                self.sftp.put(local_path, remote_path, callback=callback)
            # Verify the upload by comparing sizes.
            remote_size = self.sftp.stat(remote_path).st_size
            if remote_size != file_size:
                raise Exception(f"文件大小不匹配 (本地: {file_size}, 远程: {remote_size})")
            self.logger("文件上传验证成功", important=True)
            return True
        except Exception as e:
            self.logger(f"文件上传失败: {str(e)}", "ERROR")
            # Best effort: remove a possibly partial remote file.
            try:
                self.sftp.remove(remote_path)
            except:
                pass
            return False

    def prepare_frontend_archive(self, local_dir, files_to_update):
        """Build a gzipped tar of the frontend files to deploy.

        :param local_dir: local version folder
        :param files_to_update: items to pack ('dir/', '*.worker.js', or file)
        :return: (archive_path, archive_size); raises on failure.
        """
        tmp_path = os.path.join(tempfile.gettempdir(), self.remote_archive_name)
        try:
            # Remove any stale archive from a previous run.
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)
            # Create the new archive.
            with tarfile.open(tmp_path, "w:gz") as tar:
                for item in files_to_update:
                    src_path = os.path.join(local_dir, item)
                    if item.endswith('/'):
                        dir_name = item[:-1]
                        if os.path.exists(src_path):
                            tar.add(src_path, arcname=dir_name)
                        else:
                            raise FileNotFoundError(f"目录不存在: {src_path}")
                    elif item == '*.worker.js':
                        worker_files = [f for f in os.listdir(local_dir) if f.endswith('.worker.js')]
                        if not worker_files:
                            self.logger("未找到任何.worker.js文件", "WARNING")
                        for f in worker_files:
                            tar.add(os.path.join(local_dir, f), arcname=f)
                    else:
                        if os.path.exists(src_path):
                            tar.add(src_path, arcname=item)
                        else:
                            raise FileNotFoundError(f"文件不存在: {src_path}")
            self.logger(f"成功创建压缩包: {tmp_path}", "INFO")
            return tmp_path, os.path.getsize(tmp_path)
        except Exception as e:
            self.logger(f"创建压缩包失败: {str(e)}", "ERROR")
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)
            raise

    def deploy_frontend(self, frontend_type):
        """Back up the deployed frontend wholesale, upload the new archive,
        extract it, and restore the site-specific config.json.

        NOTE(review): reads Config.FRONTEND_CONFIG, which is not defined in
        this file -- deploy() assigns only its 'local_dir' key.
        """
        config = Config.FRONTEND_CONFIG[frontend_type]
        backup_config = Config.BACKUP_CONFIG[frontend_type]
        self.logger(f"\n===== 开始更新 {frontend_type} 前端 =====", important=True)
        tmp_path = None
        try:
            # 1. Create one timestamped backup folder for everything.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_dir = os.path.join(backup_config['backup_dir'], f"bak_{timestamp}")
            self._ensure_remote_dir_exists(backup_config['backup_dir'])
            self.sftp.mkdir(backup_dir)
            # 2. Copy config.json aside so it survives the update.
            remote_config = os.path.join(config['remote_dir'], 'static/config.json')
            if self._remote_file_exists(remote_config):
                self.run_command(f"cp {remote_config} {backup_dir}/config.json")
                self.logger(f"config.json已备份到: {backup_dir}/config.json", "INFO")
            else:
                self.logger("未找到config.json,可能为新安装", "WARNING")
            # 3. Move the deployed files into the backup folder in one shell
            #    command; worker.js only applies to the 'front' portal.
            backup_cmd = f"""
            cd {config['remote_dir']} && \
            mv index.html {backup_dir}/ && \
            mv static {backup_dir}/ && \
            {"mv *.worker.js " + backup_dir + "/ 2>/dev/null || echo" if frontend_type == 'front' else ":"}
            """
            self.run_command(backup_cmd)
            self.logger(f"整体备份完成: {backup_dir}", important=True)
            # 4. Upload the new version archive and unpack it in place.
            tmp_path = self.prepare_frontend_archive(config['local_dir'], config['files_to_update'])[0]
            self.upload_file(tmp_path, os.path.join(config['remote_dir'], self.remote_archive_name))
            self.run_command(f"""
            cd {config['remote_dir']} && \
            tar -xzf {self.remote_archive_name} && \
            rm {self.remote_archive_name}
            """)
            # 5. Force-restore the saved config.json over the new one.
            if self._remote_file_exists(f"{backup_dir}/config.json"):
                self.run_command(f"cp -f {backup_dir}/config.json {config['remote_dir']}/static/")
                self.logger("config.json已强制恢复", important=True)
            self.logger(f"{frontend_type}前端更新成功", important=True)
            return True
        except Exception as e:
            self.logger(f"更新失败: {str(e)}", "ERROR")
            return False
        finally:
            # Remove the local temp archive if it was created.
            if tmp_path: os.unlink(tmp_path)

    def deploy_backend(self, service_type):
        """Update a backend service: back up the old JAR, upload the new one,
        then restart via the configured command.

        NOTE(review): reads Config.BACKEND_PATHS ('remote', 'command'), which
        is not defined in this file -- deploy() assigns only 'local'.
        """
        config = Config.BACKEND_PATHS[service_type]
        self.logger(f"\n===== 开始更新 {service_type} 后端服务 =====", important=True)
        try:
            # 1. Back up the currently deployed JAR first.
            if not self._backup_jar_file(service_type):
                raise Exception("备份失败,中止更新")
            # 2. Upload the new JAR.
            remote_jar_path = os.path.join(config['remote'], os.path.basename(config['local']))
            if not self.upload_file(config['local'], remote_jar_path):
                raise Exception("文件上传失败")
            # 3. Restart the service.
            if not self.run_command(config['command']):
                raise Exception("服务重启失败")
            self.logger(f"{service_type}后端更新成功", important=True)
            return True
        except Exception as e:
            self.logger(f"{service_type}后端更新失败: {str(e)}", "ERROR")
            return False

    def _cleanup(self):
        """Close the SFTP session and SSH connection.

        NOTE(review): deploy() never calls this, so connections are only
        released on interpreter exit.
        """
        if hasattr(self, 'sftp') and self.sftp:
            self.sftp.close()
        if hasattr(self, 'ssh') and self.ssh:
            self.ssh.close()
        self.logger("连接资源已释放", "INFO")

    def deploy(self):
        """Top-level flow: connect, then discover + update both backends and
        both frontends. Returns True only if every step succeeds.
        """
        try:
            if not self.connect():
                return False
            # Discover and update backends first.
            for service in ['inner', 'external']:
                version_info = self._discover_latest_version('backend', service)
                if not version_info:
                    return False
                # Inject the discovered JAR path into the deploy config.
                Config.BACKEND_PATHS[service]['local'] = version_info['local']
                if not self.deploy_backend(service):
                    return False
            # Then discover and update frontends.
            for frontend in ['front', 'admin']:
                version_info = self._discover_latest_version('frontend', frontend)
                if not version_info:
                    return False
                # Inject the discovered version folder into the deploy config.
                Config.FRONTEND_CONFIG[frontend]['local_dir'] = version_info['local_dir']
                if not self.deploy_frontend(frontend):
                    return False
            self.logger("\n===== 自动发现版本并更新完成 =====", important=True)
            return True
        except Exception as e:
            self.logger(f"自动更新失败: {str(e)}", "ERROR")
            return False

    def _backup_jar_file(self, service_type):
        """Copy the currently deployed JAR into the backup dir with a
        timestamped name. Missing JAR is treated as success (nothing to back up).
        """
        # Backup locations come from BACKUP_CONFIG.
        config = Config.BACKUP_CONFIG[service_type]
        self.logger(f"\n===== 开始备份 {service_type} 后端服务 =====", important=True)
        try:
            # 1. Ensure the backup directory exists.
            self._ensure_remote_dir_exists(config['backup_dir'])
            # 2. Locate the current JAR (name comes from the backup config).
            jar_name = config['files_to_backup'][0]
            remote_jar = os.path.join(config['remote_dir'], jar_name)
            if not self._remote_file_exists(remote_jar):
                self.logger(f"未找到需要备份的JAR文件: {remote_jar}", "WARNING")
                return True
            # 3. Build the timestamped backup filename.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_name = f"{os.path.splitext(jar_name)[0]}_{timestamp}.jar"
            backup_path = os.path.join(config['backup_dir'], backup_name)
            # 4. Copy (not move) so the running service keeps its JAR.
            if not self.run_command(f"cp {remote_jar} {backup_path}"):
                raise Exception(f"备份命令执行失败: {remote_jar} -> {backup_path}")
            self.logger(f"备份成功: {backup_path}", important=True)
            return True
        except Exception as e:
            self.logger(f"{service_type}后端备份失败: {str(e)}", "ERROR")
            return False
if __name__ == "__main__":
    # Entry point: warn when the default password is in use, run the full
    # deployment, and map the boolean result onto the process exit code.
    print("\n=== 预定系统服务更新工具 ===")
    using_default_password = Config.SSH_PASSWORD == "hzpassw0RD@KP"
    if using_default_password:
        print("\033[1;31m! 安全警告: 您正在使用默认密码!\033[0m")
    succeeded = Deployer().deploy()
    if succeeded:
        print("\n\033[1;32m全栈更新成功!\033[0m")
    else:
        print("\n\033[1;31m更新失败!\033[0m")
    sys.exit(0 if succeeded else 1)
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
预定系统全栈更新脚本 - 最终稳定版
功能:
1. 从本地共享路径自动发现最新版本
2. 先备份再更新服务器上的前后端服务
3. 支持后端服务在指定容器中执行启动脚本
4. 异常安全控制 + 日志记录
"""
import os
import sys
import re
import fnmatch
from datetime import datetime
import tarfile
import tempfile
import shutil
import paramiko
from tqdm import tqdm
class Config:
    """System configuration for the stable deployer.

    Holds SSH connection settings, local version-discovery paths, remote
    deploy/backup targets (including docker restart commands for backends),
    and stderr-noise filter patterns.
    """
    # ===== Connection settings =====
    SSH_PORT = 22
    SSH_USER = "root"
    # Falls back to a hard-coded default password when the env var is unset.
    SSH_PASSWORD = os.getenv('DEPLOY_SSH_PASSWORD') or "hzpassw0RD@KP"
    # ===== Target server =====
    SERVER_HOST = '139.159.163.86'  # default: Lanzhou Sinopec test environment
    # Version discovery: local network-share paths scanned for the newest
    # version folder of each service.
    VERSION_DISCOVERY = {
        'backend': {
            'inner': {
                'search_path': r"\\192.168.9.9\deploy\01会议预定\标准版本-长期运维\01版本管理\01后端运行服务\内部预定",
                'file_pattern': '*.jar'
            },
            'external': {
                'search_path': r"\\192.168.9.9\deploy\01会议预定\标准版本-长期运维\01版本管理\01后端运行服务\对外预定",
                'file_pattern': '*.jar'
            }
        },
        'frontend': {
            'front': {
                'search_path': r"\\192.168.9.9\deploy\00项目管理\2025\L 兰州中石化项目\01版本管理\02前端PC网页",
                'files_to_update': ['*.worker.js', 'index.html', 'static/']
            },
            'admin': {
                'search_path': r"\\192.168.9.9\deploy\01会议预定\标准版本-预定后台\01版本管理",
                'files_to_update': ['index.html', 'static/']
            }
        }
    }
    # ===== Remote deploy targets & backup locations =====
    # Backend entries additionally carry the JAR name, docker container,
    # and the in-container restart script (absolute path).
    DEPLOY_CONFIG = {
        'front': {
            'remote_dir': "/var/www/java/ubains-web-2.0/",
            'backup_dir': "/var/www/java/ubains-web-2.0/backup/",
            'type': 'frontend'
        },
        'admin': {
            'remote_dir': "/var/www/java/ubains-web-admin/",
            'backup_dir': "/var/www/java/ubains-web-admin/backup/",
            'type': 'frontend'
        },
        'inner': {
            'remote_dir': "/var/www/java/api-java-meeting2.0/",
            'backup_dir': "/var/www/java/api-java-meeting2.0/backup/",
            'jar_name': "ubains-meeting-inner-api-1.0-SNAPSHOT.jar",
            'container': "ujava2",
            'exec_cmd': "/var/www/java/api-java-meeting2.0/run.sh",  # absolute path
            'type': 'backend'
        },
        'external': {
            'remote_dir': "/var/www/java/external-meeting-api/",
            'backup_dir': "/var/www/java/external-meeting-api/backup/",
            'jar_name': "ubains-meeting-api-1.0-SNAPSHOT.jar",
            'container': "ujava2",
            'exec_cmd': "/var/www/java/external-meeting-api/run.sh",
            'type': 'backend'
        }
    }
    # Remote-command stderr lines that are known noise.
    IGNORABLE_LOG_PATTERNS = [
        re.compile(r'unexpected operator'),
        re.compile(r'Usage: kill'),
        re.compile(r'kill -l'),
        re.compile(r'Java HotSpot\(TM\)'),
        re.compile(r'SLF4J:'),
        re.compile(r'Listening for transport'),
        re.compile(r'Starting Application'),
        re.compile(r'tail:.*file truncated'),
        re.compile(r'log4j:WARN')
    ]
class Deployer:
    """Stable-version deployer: discovers the newest local versions, backs up
    the remote deployment, uploads the new artifacts, and restarts backend
    services inside their docker containers.
    """

    def __init__(self):
        self.ssh = paramiko.SSHClient()
        # Auto-accept unknown host keys (convenient, but trusts the network).
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.sftp = None  # SFTP session, opened in connect()
        self.logger = self._setup_logger()

    def _setup_logger(self):
        """Build and return a closure-based logger.

        The returned callable takes (msg, level="INFO", important=False).
        WARNING/ERROR/important messages are printed in bold; ERROR is also
        mirrored to stderr.
        """
        def log(msg, level="INFO", important=False):
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            log_msg = f"[{timestamp}] [{level}] {msg}"
            if level in ("WARNING", "ERROR") or important:
                print(f"\033[1m{log_msg}\033[0m")
                if level == "ERROR":
                    sys.stderr.write(log_msg + "\n")
            else:
                print(log_msg)
        return log

    def connect(self):
        """Connect with up to 3 attempts; return True on success, else False."""
        max_retries = 3
        for attempt in range(1, max_retries + 1):
            try:
                self.logger(f"尝试连接服务器 ({attempt}/{max_retries})...", important=True)
                self.ssh.connect(
                    hostname=Config.SERVER_HOST,
                    port=Config.SSH_PORT,
                    username=Config.SSH_USER,
                    password=Config.SSH_PASSWORD,
                    timeout=30,
                    banner_timeout=200
                )
                self.sftp = self.ssh.open_sftp()
                self.logger("服务器连接成功", important=True)
                return True
            except Exception as e:
                self.logger(f"连接尝试 {attempt} 失败: {str(e)}", "WARNING")
                if attempt == max_retries:
                    self.logger("连接服务器最终失败", "ERROR")
                    return False

    def _ensure_remote_dir_exists(self, remote_dir):
        """Create remote_dir via SFTP if absent (non-recursive)."""
        try:
            self.sftp.stat(remote_dir)
        except FileNotFoundError:
            self.sftp.mkdir(remote_dir)
            self.logger(f"创建远程目录: {remote_dir}", "INFO")

    def _get_latest_local_folder(self, path):
        """Return the newest (by mtime) subfolder of path, or None on error."""
        try:
            folders = [f for f in os.listdir(path) if os.path.isdir(os.path.join(path, f))]
            if not folders:
                raise Exception("未找到有效版本文件夹")
            latest = max(folders, key=lambda f: os.path.getmtime(os.path.join(path, f)))
            return os.path.join(path, latest)
        except Exception as e:
            self.logger(f"获取最新文件夹失败: {e}", "ERROR")
            return None

    def _get_latest_matching_file(self, folder, pattern):
        """Return the newest (by mtime) file in folder matching pattern, or None."""
        files = [os.path.join(folder, f) for f in os.listdir(folder) if fnmatch.fnmatch(f, pattern)]
        if not files:
            return None
        latest = max(files, key=os.path.getmtime)
        return latest

    def _read_remote_file(self, remote_path):
        """Read a remote file as UTF-8 text; return "" on any failure."""
        try:
            with self.sftp.open(remote_path, 'r') as fp:
                return fp.read().decode('utf-8')
        except Exception as e:
            self.logger(f"读取远程文件失败: {e}", "ERROR")
            return ""

    def _discover_backend_version(self, service_type):
        """Find the newest local backend JAR and compare versions.

        Returns {'local_jar': ..., 'local_version': ...} or None (missing
        config, no newer version, or no matching JAR).

        NOTE(review): remote_jar_path below joins the LOCAL UNC search path
        with the pattern suffix and then stats it over SFTP on the remote
        host -- this looks wrong and will normally just report "missing";
        confirm the intended remote JAR location.
        """
        backend_config = Config.VERSION_DISCOVERY.get('backend')
        if not backend_config:
            self.logger("版本发现配置中缺少 'backend'", "ERROR")
            return None
        config = backend_config.get(service_type)
        if not config:
            self.logger(f"找不到名为 {service_type} 的后端服务配置", "ERROR")
            return None
        search_path = config['search_path']
        # Newest local version folder.
        latest_folder = self._get_latest_local_folder(search_path)
        if not latest_folder:
            return None
        # Local version tuple parsed from the folder name (e.g. 'V1.2.3.4').
        local_version = self.extract_backend_version(os.path.basename(latest_folder))
        # Remote version check (assumes the JAR filename embeds the version).
        remote_jar_path = os.path.join(config['search_path'], config['file_pattern'].replace("*.", ""))
        if self._remote_file_exists(remote_jar_path):
            remote_jar_name = os.path.basename(remote_jar_path)
            remote_version = self.extract_backend_version(remote_jar_name)
            # Skip when the local version is not strictly newer.
            if local_version <= remote_version:
                self.logger(f"本地版本 {local_version} 不高于远程版本 {remote_version},跳过更新", "WARNING")
                return None
        # Newest JAR inside the version folder.
        jar_file = self._get_latest_matching_file(latest_folder, config['file_pattern'])
        if not jar_file:
            self.logger(f"未找到匹配 {config['file_pattern']} 的 JAR 文件", "ERROR")
            return None
        return {'local_jar': jar_file, 'local_version': local_version}

    def _discover_frontend_version(self, frontend_type):
        """Find the newest local frontend folder and compare with the remote
        version.txt (if present).

        Returns {'local_dir', 'files', 'local_version'} or None.
        """
        frontend_config = Config.VERSION_DISCOVERY.get('frontend')
        if not frontend_config:
            self.logger("版本发现配置中缺少 'frontend'", "ERROR")
            return None
        config = frontend_config.get(frontend_type)
        if not config:
            self.logger(f"找不到名为 {frontend_type} 的前端服务配置", "ERROR")
            return None
        search_path = config['search_path']
        # Newest local version folder.
        latest_folder = self._get_latest_local_folder(search_path)
        if not latest_folder:
            return None
        # Local version tuple parsed from the folder name.
        local_version = self.extract_frontend_version(os.path.basename(latest_folder))
        # Remote version comes from a version.txt written by _update_frontend.
        remote_version_path = os.path.join(Config.DEPLOY_CONFIG[frontend_type]['remote_dir'], 'version.txt')
        if self._remote_file_exists(remote_version_path):
            remote_version_str = self._read_remote_file(remote_version_path)
            # NOTE(review): a malformed/empty version.txt makes this raise
            # ValueError -- confirm the file is always 'a.b.c.d'.
            remote_version = tuple(map(int, remote_version_str.strip().split('.')))
            # Skip when the local version is not strictly newer.
            if local_version <= remote_version:
                self.logger(f"本地版本 {local_version} 不高于远程版本 {remote_version},跳过更新", "WARNING")
                return None
        return {'local_dir': latest_folder, 'files': config['files_to_update'], 'local_version': local_version}

    def _execute_command(self, command):
        """Run a remote command; return True on exit status 0, else False.

        stderr lines matching IGNORABLE_LOG_PATTERNS are suppressed.
        """
        try:
            stdin, stdout, stderr = self.ssh.exec_command(command)
            exit_code = stdout.channel.recv_exit_status()
            output = stdout.read().decode().strip()
            error = stderr.read().decode().strip()
            if output:
                self.logger(f"命令输出: {output}", "INFO")
            if error and exit_code != 0:
                for line in error.splitlines():
                    if any(p.search(line) for p in Config.IGNORABLE_LOG_PATTERNS):
                        continue
                    self.logger(f"命令错误: {line}", "ERROR")
                return False
            return True
        except Exception as e:
            self.logger(f"命令执行失败: {e}", "ERROR")
            return False

    def extract_backend_version(self, folder_name):
        """Parse 'V1.2.3.4' from a backend folder/file name into an int tuple;
        default (0, 0, 0, 0) when absent."""
        match = re.search(r'V(\d+\.\d+\.\d+\.\d+)', folder_name)
        if match:
            return tuple(map(int, match.group(1).split('.')))
        return (0, 0, 0, 0)

    def extract_frontend_version(self, folder_name):
        """Parse a bare '1.2.3.4' from a frontend folder name into an int
        tuple; default (0, 0, 0, 0) when absent."""
        match = re.search(r'(\d+\.\d+\.\d+\.\d+)', folder_name)
        if match:
            return tuple(map(int, match.group(1).split('.')))
        return (0, 0, 0, 0)

    def _upload_jar(self, local_jar, remote_dir, jar_name):
        """Upload a JAR via SFTP with a progress bar; return True/False.

        NOTE(review): no post-upload size verification here, unlike the
        frontend archive upload.
        """
        try:
            remote_path = os.path.join(remote_dir, jar_name)
            file_size = os.path.getsize(local_jar)
            self.logger(f"上传新版本至: {remote_path}")
            progress_bar = tqdm(total=file_size, unit='B', unit_scale=True, desc="JAR上传进度")
            def upload_progress_callback(sent, total):
                progress_bar.update(sent - progress_bar.n)
            self.sftp.put(local_jar, remote_path, callback=upload_progress_callback)
            progress_bar.close()
            return True
        except Exception as e:
            self.logger(f"上传失败: {e}", "ERROR")
            return False

    def _restart_backend_service(self, container, exec_cmd):
        """Run the service's start script inside its docker container."""
        try:
            cmd = f"docker exec -i {container} {exec_cmd}"
            self.logger(f"正在重启服务: {cmd}")
            if not self._execute_command(cmd):
                raise Exception("服务重启失败")
            return True
        except Exception as e:
            self.logger(f"重启服务失败: {e}", "ERROR")
            return False

    def _backup_and_update_backend(self, service_type, version_info):
        """Back up the current JAR (if any), upload the new one, restart.

        Returns True on full success, False otherwise.
        """
        config = Config.DEPLOY_CONFIG.get(service_type)
        if not config:
            self.logger(f"找不到名为 {service_type} 的部署配置", "ERROR")
            return False
        remote_dir = config['remote_dir']
        backup_dir = config['backup_dir']
        jar_name = config['jar_name']
        local_version = version_info.get('local_version', (0, 0, 0, 0))
        try:
            self.logger(f"\n===== 开始更新 {service_type} 后端服务 (版本: {local_version}) =====", important=True)
            # Ensure the backup directory exists.
            self._ensure_remote_dir_exists(backup_dir)
            # Path of the currently deployed JAR.
            current_jar = os.path.join(remote_dir, jar_name)
            # Step 1: back up the existing JAR (copy, so the service keeps it).
            # NOTE(review): the backup name ends up '<name>.jar_<ts>.jar'
            # (double extension) -- confirm this is intended.
            if self._remote_file_exists(current_jar):
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                backup_jar = os.path.join(backup_dir, f"{jar_name}_{timestamp}.jar")
                self.logger(f"正在备份旧版本到: {backup_jar}")
                if not self._execute_command(f"cp {current_jar} {backup_jar}"):
                    raise Exception("备份失败")
            # Step 2: upload the new JAR.
            if not self._upload_jar(version_info['local_jar'], remote_dir, jar_name):
                raise Exception("上传失败")
            # Step 3: restart the service inside its container.
            if not self._restart_backend_service(config['container'], config['exec_cmd']):
                raise Exception("服务重启失败")
            self.logger(f"{service_type} 后端更新成功 (版本: {local_version})", important=True)
            return True
        except Exception as e:
            self.logger(f"{service_type} 更新失败: {e}", "ERROR")
            return False

    def _remote_file_exists(self, remote_path):
        """Return True if remote_path stats successfully, else False."""
        try:
            self.sftp.stat(remote_path)
            return True
        except:  # any stat failure is treated as "missing"
            return False

    def _list_remote_files(self, remote_dir, pattern="*"):
        """List filenames in remote_dir matching a glob pattern; [] on error."""
        try:
            files = self.sftp.listdir(remote_dir)
            matched_files = [f for f in files if fnmatch.fnmatch(f, pattern)]
            return matched_files
        except Exception as e:
            self.logger(f"列出远程目录文件失败: {e}", "ERROR")
            return []

    def _create_frontend_backup(self, remote_dir, backup_dir):
        """Move the deployed frontend files (index.html, static/, *.worker.js)
        into a timestamped folder under backup_dir. Returns True/False."""
        try:
            self._ensure_remote_dir_exists(backup_dir)
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_path = os.path.join(backup_dir, f"backup_{timestamp}")
            self.sftp.mkdir(backup_path)
            self.logger(f"创建备份目录: {backup_path}")
            # 1. index.html (skip with a warning if absent).
            index_html_path = os.path.join(remote_dir, 'index.html')
            if self._remote_file_exists(index_html_path):
                self._execute_command(f"mv {index_html_path} {backup_path}/")
                self.logger(f"已备份 index.html 到 {backup_path}", "INFO")
            else:
                self.logger("未找到 index.html,跳过备份", "WARNING")
            # 2. static directory.
            static_path = os.path.join(remote_dir, 'static')
            if self._remote_file_exists(static_path):
                self._execute_command(f"mv {static_path} {backup_path}/")
                self.logger(f"已备份 static 目录到 {backup_path}", "INFO")
            else:
                self.logger("未找到 static 目录,跳过备份", "WARNING")
            # 3. *.worker.js files (glob over the directory listing).
            worker_files = self._list_remote_files(remote_dir, pattern="*.worker.js")
            for worker_file in worker_files:
                worker_path = os.path.join(remote_dir, worker_file)
                self._execute_command(f"mv {worker_path} {backup_path}/")
                self.logger(f"已备份 worker 文件: {worker_file}", "INFO")
            return True
        except Exception as e:
            self.logger(f"前端备份失败: {e}", "ERROR")
            return False

    def _write_remote_file(self, remote_path, content):
        """Write text content to a remote file via SFTP; return True/False."""
        try:
            with self.sftp.open(remote_path, 'w') as fp:
                fp.write(content)
            return True
        except Exception as e:
            self.logger(f"写入远程文件失败: {e}", "ERROR")
            return False

    def _upload_frontend(self, local_dir, remote_dir):
        """Tar the local version folder, upload it to /tmp, extract it into
        remote_dir, and remove the remote archive. Returns True/False.

        NOTE(review): tar.add() is recursive, so calling it for every
        os.walk() root adds the tree multiple times; extraction still yields
        the same files, but the archive is larger than necessary.
        """
        tmp_archive = os.path.join(tempfile.gettempdir(), "frontend.tar.gz")
        try:
            # Pack the whole local version folder.
            with tarfile.open(tmp_archive, "w:gz") as tar:
                for root, dirs, files in os.walk(local_dir):
                    arcname = os.path.relpath(root, local_dir)
                    tar.add(root, arcname=arcname)
            # Local archive size drives the progress bar.
            file_size = os.path.getsize(tmp_archive)
            remote_archive = "/tmp/" + os.path.basename(tmp_archive)  # force Linux-style path
            self.logger(f"正在上传文件到远程服务器: {remote_archive}")
            # Progress-bar callback for paramiko's put().
            progress_bar = tqdm(total=file_size, unit='B', unit_scale=True, desc="上传进度")
            def upload_progress_callback(sent, total):
                progress_bar.update(sent - progress_bar.n)
            # Upload with progress reporting.
            self.sftp.put(tmp_archive, remote_archive, callback=upload_progress_callback)
            progress_bar.close()
            # Extract remotely, then clean up both archives.
            self._execute_command(f"tar -xzf {remote_archive} -C {remote_dir} && rm -f {remote_archive}")
            os.unlink(tmp_archive)
            return True
        except Exception as e:
            self.logger(f"前端上传失败: {e}", "ERROR")
            return False

    def _update_frontend(self, frontend_type):
        """Full frontend update: discover version, back up, upload + extract,
        then record the new version in version.txt. Returns True/False."""
        config = Config.DEPLOY_CONFIG.get(frontend_type)
        if not config:
            self.logger(f"找不到名为 {frontend_type} 的前端部署配置", "ERROR")
            return False
        remote_dir = config['remote_dir']
        backup_dir = config['backup_dir']
        try:
            self.logger(f"\n===== 开始更新 {frontend_type} 前端 =====", important=True)
            # 1. Discover the newest local version (also version-gates).
            version_info = self._discover_frontend_version(frontend_type)
            if not version_info:
                raise Exception("未找到可用版本")
            local_dir = version_info['local_dir']
            local_version = version_info.get('local_version', (0, 0, 0, 0))
            self.logger(f"使用版本路径: {local_dir}")
            # 2. Back up the currently deployed files.
            if not self._create_frontend_backup(remote_dir, backup_dir):
                raise Exception("备份失败")
            # 3. Upload and extract the new version.
            if not self._upload_frontend(local_dir, remote_dir):
                raise Exception("上传失败")
            # 4. Record the deployed version for future comparisons.
            version_file_path = os.path.join(remote_dir, 'version.txt')
            self._write_remote_file(version_file_path, '.'.join(map(str, local_version)))
            self.logger(f"{frontend_type} 前端更新成功 (版本: {local_version})", important=True)
            return True
        except Exception as e:
            self.logger(f"{frontend_type} 前端更新失败: {e}", "ERROR")
            return False

    def deploy(self):
        """Top-level flow: connect, update both backends (skipping ones with
        no newer version), update both frontends, always clean up."""
        try:
            if not self.connect():
                return False
            # Backends: a missing/older version is a skip, not a failure.
            for service in ['inner', 'external']:
                info = self._discover_backend_version(service)
                if not info:
                    self.logger(f"{service} 服务更新跳过", "WARNING")
                    continue
                if not self._backup_and_update_backend(service, info):
                    return False
            # Frontends: any failure aborts the run.
            for frontend in ['front', 'admin']:
                if not self._update_frontend(frontend):
                    return False
            self.logger("\n✅ 全栈更新完成!", important=True)
            return True
        except Exception as e:
            self.logger(f"主流程出错: {e}", "ERROR")
            return False
        finally:
            # Always release SSH/SFTP resources, even on failure.
            self._cleanup()

    def _cleanup(self):
        """Close the SFTP session and SSH connection."""
        if self.sftp:
            self.sftp.close()
        if self.ssh:
            self.ssh.close()
        self.logger("资源清理完成")
if __name__ == "__main__":
    # Entry point: warn when the default password is in use, run the full
    # deployment, and translate the result into a process exit code.
    print("\n=== 预定系统服务更新工具 ===")
    if Config.SSH_PASSWORD == "hzpassw0RD@KP":
        print("\033[1;31m! 安全警告: 您正在使用默认密码!\033[0m")
    exit_code = 0 if Deployer().deploy() else 1
    if exit_code == 0:
        print("\n\033[1;32m✅ 全栈更新成功!\033[0m")
    else:
        print("\n\033[1;31m❌ 更新失败,请检查日志。\033[0m")
    sys.exit(exit_code)
\ No newline at end of file
......@@ -3,6 +3,7 @@ import queue
from Base.base import *
import time
import logging
import pull_remote_devlop
"""
调试主机-执行指令:
......@@ -70,8 +71,13 @@ start_workers(3)
# 定时执行预定系统测试任务
# schedule.every().day.at("10:00").do(run_task, run_automation_test, report_title="预定系统测试报告", report_url_prefix="http://nat.ubainsyun.com:31133", test_case="JSON测试", ding_type="标准版巡检")
# 定时执行兰州中石化项目测试任务
schedule.every().day.at("06:00").do(run_task, run_automation_test, report_title="兰州中石化项目测试报告", report_url_prefix="http://nat.ubainsyun.com:31135", test_case="兰州中石化项目", ding_type="标准版巡检")
# 更新兰州项目环境,并定时执行兰州中石化项目测试任务
# 添加定时任务
schedule.every().day.at("09:05").do(
run_task,
pull_remote_devlop.Deployer().deploy
)
schedule.every().day.at("09:10").do(run_task, run_automation_test, report_title="兰州中石化项目测试报告", report_url_prefix="http://nat.ubainsyun.com:31135", test_case="兰州中石化项目", ding_type="标准版巡检")
# 定时执行展厅巡检任务
# schedule.every().day.at("07:45").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31136", test_case="展厅巡检", ding_type="展厅巡检")
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论