提交 1662376a authored 作者: 陈泽健's avatar 陈泽健

feat(deploy): 添加架构选择和网盘部署功能

- 支持通过--arch参数指定部署架构(x86, arm, arm_ubuntu)
- 新增从网盘上传部署包到目标服务器的功能
- 添加ARM服务器EMQX镜像拉取和导出脚本
- 添加Docker Registry API直接拉取ARM64镜像功能
- 更新部署报告显示目标架构信息
- 修改路径映射逻辑适配服务更新流程
上级 4e1b2b9d
...@@ -11,6 +11,8 @@ X86 + ARM 双架构统一自动化部署脚本 ...@@ -11,6 +11,8 @@ X86 + ARM 双架构统一自动化部署脚本
python full_deploy.py --verify # 仅验证阶段(API接口+日志检查) python full_deploy.py --verify # 仅验证阶段(API接口+日志检查)
python full_deploy.py --security # 仅安全修复(Nacos+MQTT) python full_deploy.py --security # 仅安全修复(Nacos+MQTT)
python full_deploy.py --package # 仅打包输出(压缩+MD5+下载) python full_deploy.py --package # 仅打包输出(压缩+MD5+下载)
python full_deploy.py --arch arm,arm_ubuntu --deploy # 仅部署指定架构(arm=75,arm_ubuntu=76)
可选架构: x86, arm, arm_ubuntu
""" """
import sys import sys
...@@ -46,6 +48,7 @@ CONFIGS = { ...@@ -46,6 +48,7 @@ CONFIGS = {
'deploy_dir': '/data/offline_auto_unifiedPlatform', 'deploy_dir': '/data/offline_auto_unifiedPlatform',
'deploy_pkg': 'offline_auto_unifiedPlatform.tar.gz', 'deploy_pkg': 'offline_auto_unifiedPlatform.tar.gz',
'deploy_md5': 'offline_auto_unifiedPlatform.tar.gz.md5', 'deploy_md5': 'offline_auto_unifiedPlatform.tar.gz.md5',
'nas_dir': r'Z:\发布版本\03服务器部署\15新统一平台\X86部署包\全量版\版本更新-待验证',
'deploy_script': 'new_auto.sh', 'deploy_script': 'new_auto.sh',
'deploy_answers': "y\ny\ny\ny\ny\ny\ny\nn\n", 'deploy_answers': "y\ny\ny\ny\ny\ny\ny\nn\n",
'deploy_log': '/data/offline_auto_unifiedPlatform/deploy_output.log', 'deploy_log': '/data/offline_auto_unifiedPlatform/deploy_output.log',
...@@ -90,8 +93,10 @@ CONFIGS = { ...@@ -90,8 +93,10 @@ CONFIGS = {
} }
} }
# 所有架构列表 # 所有架构列表(默认值,可通过--arch参数覆盖)
ALL_ARCHS = ['x86', 'arm', 'arm_ubuntu'] DEFAULT_ARCHS = ['x86', 'arm', 'arm_ubuntu']
# 当前执行的架构列表
ACTIVE_ARCHS = DEFAULT_ARCHS
# 超管配置 # 超管配置
ADMIN_USER = 'superadmin' ADMIN_USER = 'superadmin'
...@@ -245,6 +250,45 @@ def phase_deploy_single(arch): ...@@ -245,6 +250,45 @@ def phase_deploy_single(arch):
results['steps']['disk'] = False results['steps']['disk'] = False
results['errors'].append("/data 分区不存在") results['errors'].append("/data 分区不存在")
# 步骤1.5:从网盘上传部署包到目标服务器
nas_dir = cfg.get('nas_dir')
if nas_dir:
logger.log("【步骤1.5】从网盘上传部署包到目标服务器")
import shutil
local_tar = os.path.join(nas_dir, cfg['deploy_pkg'])
local_md5 = os.path.join(nas_dir, cfg['deploy_md5'])
if os.path.exists(local_tar) and os.path.exists(local_md5):
pkg_size = os.path.getsize(local_tar)
logger.log(f"网盘部署包: {cfg['deploy_pkg']} ({pkg_size/1024/1024/1024:.2f} GB)")
# 清理目标服务器上的旧部署包目录
logger.log("清理目标服务器上的旧部署目录...")
run(f"rm -rf {cfg['deploy_dir']}")
run(f"rm -f /data/{cfg['deploy_pkg']} /data/{cfg['deploy_md5']}")
# SFTP上传
try:
sftp = ssh.open_sftp()
logger.log(f"正在上传 {cfg['deploy_pkg']}...")
sftp.put(local_tar, f"/data/{cfg['deploy_pkg']}")
logger.log("tar.gz 上传完成")
logger.log(f"正在上传 {cfg['deploy_md5']}...")
sftp.put(local_md5, f"/data/{cfg['deploy_md5']}")
logger.log("md5 上传完成")
sftp.close()
except Exception as e:
logger.log(f"上传部署包失败: {e}", "ERROR")
results['errors'].append(f"上传部署包失败: {e}")
results['steps']['upload'] = False
return results
results['steps']['upload'] = True
logger.log("[OK] 部署包上传完成")
else:
logger.log(f"网盘中未找到部署包: {nas_dir}", "WARN")
logger.log("将检查目标服务器上是否已有部署包", "WARN")
# 步骤2:部署包校验(兼容_new后缀) # 步骤2:部署包校验(兼容_new后缀)
logger.log("【步骤2】部署包校验") logger.log("【步骤2】部署包校验")
pkg = cfg['deploy_pkg'] pkg = cfg['deploy_pkg']
...@@ -482,7 +526,7 @@ def phase_deploy(): ...@@ -482,7 +526,7 @@ def phase_deploy():
with ThreadPoolExecutor(max_workers=3) as pool: with ThreadPoolExecutor(max_workers=3) as pool:
futures = { futures = {
pool.submit(phase_deploy_single, arch): arch pool.submit(phase_deploy_single, arch): arch
for arch in ALL_ARCHS for arch in ACTIVE_ARCHS
} }
for future in as_completed(futures): for future in as_completed(futures):
arch = futures[future] arch = futures[future]
...@@ -504,7 +548,7 @@ def phase_verify(deploy_results=None): ...@@ -504,7 +548,7 @@ def phase_verify(deploy_results=None):
results = {} results = {}
for arch in ALL_ARCHS: for arch in ACTIVE_ARCHS:
cfg = CONFIGS[arch] cfg = CONFIGS[arch]
logger = Logger(f"{arch}_verify") logger = Logger(f"{arch}_verify")
r = {'api': {}, 'logs': {}, 'errors': []} r = {'api': {}, 'logs': {}, 'errors': []}
...@@ -590,7 +634,7 @@ def phase_security(): ...@@ -590,7 +634,7 @@ def phase_security():
results = {} results = {}
for arch in ALL_ARCHS: for arch in ACTIVE_ARCHS:
cfg = CONFIGS[arch] cfg = CONFIGS[arch]
logger = Logger(f"{arch}_security") logger = Logger(f"{arch}_security")
r = {'nacos': {}, 'emqx': {}, 'errors': []} r = {'nacos': {}, 'emqx': {}, 'errors': []}
...@@ -768,7 +812,7 @@ def phase_package(): ...@@ -768,7 +812,7 @@ def phase_package():
results = {} results = {}
for arch in ALL_ARCHS: for arch in ACTIVE_ARCHS:
cfg = CONFIGS[arch] cfg = CONFIGS[arch]
logger = Logger(f"{arch}_package") logger = Logger(f"{arch}_package")
r = {'files': {}, 'errors': []} r = {'files': {}, 'errors': []}
...@@ -904,7 +948,7 @@ def generate_reports(deploy_results, verify_results, security_results, package_r ...@@ -904,7 +948,7 @@ def generate_reports(deploy_results, verify_results, security_results, package_r
## 一、部署执行 ## 一、部署执行
""" """
for arch in ALL_ARCHS: for arch in ACTIVE_ARCHS:
cfg = CONFIGS[arch] cfg = CONFIGS[arch]
dr = deploy_results.get(arch, {}) dr = deploy_results.get(arch, {})
report += f"### {cfg['label']}\n\n" report += f"### {cfg['label']}\n\n"
...@@ -939,7 +983,7 @@ def generate_reports(deploy_results, verify_results, security_results, package_r ...@@ -939,7 +983,7 @@ def generate_reports(deploy_results, verify_results, security_results, package_r
# 验证结果 # 验证结果
report += "## 二、服务验证\n\n" report += "## 二、服务验证\n\n"
for arch in ALL_ARCHS: for arch in ACTIVE_ARCHS:
cfg = CONFIGS[arch] cfg = CONFIGS[arch]
vr = verify_results.get(arch, {}) vr = verify_results.get(arch, {})
report += f"### {cfg['label']}\n\n" report += f"### {cfg['label']}\n\n"
...@@ -970,7 +1014,7 @@ def generate_reports(deploy_results, verify_results, security_results, package_r ...@@ -970,7 +1014,7 @@ def generate_reports(deploy_results, verify_results, security_results, package_r
report += f"- **部署执行**: {'全部通过' if all_deploy_ok else '存在问题'}\n" report += f"- **部署执行**: {'全部通过' if all_deploy_ok else '存在问题'}\n"
report += f"- **API验证**: {'全部通过' if all_api_ok else '存在问题'}\n" report += f"- **API验证**: {'全部通过' if all_api_ok else '存在问题'}\n"
deploy_time_parts = [] deploy_time_parts = []
for arch in ALL_ARCHS: for arch in ACTIVE_ARCHS:
dt = deploy_results.get(arch, {}).get('deploy_time', 'N/A') dt = deploy_results.get(arch, {}).get('deploy_time', 'N/A')
deploy_time_parts.append(f"{CONFIGS[arch]['label']} {dt}min") deploy_time_parts.append(f"{CONFIGS[arch]['label']} {dt}min")
report += f"- **部署用时**: {', '.join(deploy_time_parts)}\n" report += f"- **部署用时**: {', '.join(deploy_time_parts)}\n"
...@@ -993,7 +1037,7 @@ def generate_reports(deploy_results, verify_results, security_results, package_r ...@@ -993,7 +1037,7 @@ def generate_reports(deploy_results, verify_results, security_results, package_r
## 一、漏洞扫描与修复 ## 一、漏洞扫描与修复
""" """
for arch in ALL_ARCHS: for arch in ACTIVE_ARCHS:
cfg = CONFIGS[arch] cfg = CONFIGS[arch]
sr = security_results.get(arch, {}) sr = security_results.get(arch, {})
sec_report += f"### {cfg['label']}\n\n" sec_report += f"### {cfg['label']}\n\n"
...@@ -1048,19 +1092,42 @@ def generate_reports(deploy_results, verify_results, security_results, package_r ...@@ -1048,19 +1092,42 @@ def generate_reports(deploy_results, verify_results, security_results, package_r
# ==================== 主流程 ==================== # ==================== 主流程 ====================
def main(): def main():
global ACTIVE_ARCHS
mode = 'full' mode = 'full'
if len(sys.argv) > 1: arch_args = None
arg = sys.argv[1].lower()
if arg in ('--deploy', '--verify', '--security', '--package'): # 解析命令行参数
mode = arg[2:] # 去掉 -- args = sys.argv[1:]
i = 0
while i < len(args):
arg = args[i].lower()
if arg == '--arch' and i + 1 < len(args):
arch_args = args[i + 1].split(',')
i += 2
elif arg in ('--deploy', '--verify', '--security', '--package'):
mode = arg[2:]
i += 1
elif arg == '--full': elif arg == '--full':
mode = 'full' mode = 'full'
i += 1
else: else:
print(f"用法: python {sys.argv[0]} [--deploy|--verify|--security|--package|--full]") print(f"用法: python {sys.argv[0]} [--arch arm,arm_ubuntu] [--deploy|--verify|--security|--package|--full]")
print(f"可选架构: x86, arm, arm_ubuntu")
return 1
# 设置目标架构
if arch_args:
invalid = [a for a in arch_args if a not in CONFIGS]
if invalid:
print(f"[ERROR] 无效的架构: {invalid},可选: {list(CONFIGS.keys())}")
return 1 return 1
ACTIVE_ARCHS = arch_args
server_count = len(ACTIVE_ARCHS)
labels = ', '.join(CONFIGS[a]['label'] for a in ACTIVE_ARCHS)
print("=" * 60) print("=" * 60)
print(f"3台服务器统一自动化部署 - 模式: {mode}") print(f"{server_count}台服务器自动化部署 - 模式: {mode}")
print(f"目标: {labels}")
print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 60) print("=" * 60)
...@@ -1077,7 +1144,7 @@ def main(): ...@@ -1077,7 +1144,7 @@ def main():
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("部署完成 - 容器状态摘要") print("部署完成 - 容器状态摘要")
print("=" * 60) print("=" * 60)
for arch in ALL_ARCHS: for arch in ACTIVE_ARCHS:
dr = deploy_results.get(arch, {}) dr = deploy_results.get(arch, {})
count = dr.get('container_count', 0) count = dr.get('container_count', 0)
errors = dr.get('errors', []) errors = dr.get('errors', [])
...@@ -1094,12 +1161,13 @@ def main(): ...@@ -1094,12 +1161,13 @@ def main():
print("部署阶段完成!请执行授权操作:") print("部署阶段完成!请执行授权操作:")
print("使用 Chrome DevTools 进行浏览器自动化授权上传") print("使用 Chrome DevTools 进行浏览器自动化授权上传")
print("=" * 60) print("=" * 60)
print(f"\nX86-欧拉: https://192.168.5.52/#/LoginConfig") for arch in ACTIVE_ARCHS:
print(f"ARM-欧拉: https://192.168.9.75/#/LoginConfig") cfg = CONFIGS[arch]
print(f"ARM-Ubuntu: https://192.168.9.76/#/LoginConfig") print(f"\n{cfg['label']}: https://{cfg['host']}/#/LoginConfig")
print(f"超管: {ADMIN_USER} / {ADMIN_PASS}") print(f"超管: {ADMIN_USER} / {ADMIN_PASS}")
print(f"验证码: {CAPTCHA}") print(f"验证码: {CAPTCHA}")
print(f"\n授权完成后运行: python full_deploy.py --verify") arch_list = ','.join(ACTIVE_ARCHS)
print(f"\n授权完成后运行: python full_deploy.py --arch {arch_list} --verify")
return 0 return 0
if mode in ('full', 'verify'): if mode in ('full', 'verify'):
......
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ARM架构EMQX镜像拉取与导出脚本
在ARM服务器上拉取正确架构的EMQX镜像,保存为tar.gz格式
用法:
python pull_emqx_arm.py # 在ARM-75服务器上拉取并导出
python pull_emqx_arm.py --host 76 # 在ARM-76服务器上拉取并导出
python pull_emqx_arm.py --version 6.0.0 # 指定版本(默认6.0.0)
python pull_emqx_arm.py --download # 拉取后下载到本地
"""
import sys
import os
import time
import paramiko
from datetime import datetime
# 修复Windows控制台编码
if sys.platform == 'win32':
try:
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
sys.stderr.reconfigure(encoding='utf-8', errors='replace')
except Exception:
pass
# ARM服务器配置
ARM_SERVERS = {
'75': {
'host': '192.168.9.75',
'username': 'root',
'password': 'Ubains@123',
'label': 'ARM-欧拉 (192.168.9.75)',
'use_sudo': False,
},
'76': {
'host': '192.168.9.76',
'username': 'admin',
'password': 'Ubains@123',
'label': 'ARM-Ubuntu (192.168.9.76)',
'use_sudo': True,
'sudo_password': 'Ubains@123',
},
}
# 默认使用75服务器(欧拉,直接root)
DEFAULT_SERVER = '75'
# 镜像配置
DEFAULT_EMQX_VERSION = '6.0.0'
IMAGE_NAME = 'emqx/emqx'
OUTPUT_DIR = '/data/temp'
LOCAL_OUTPUT = r'E:\自动化部署\ARM-EMQX'
def log(msg, level="INFO"):
"""输出日志"""
ts = datetime.now().strftime('%H:%M:%S')
try:
print(f"[{ts}] [{level}] {msg}")
except UnicodeEncodeError:
print(f"[{ts}] [{level}] {msg}".encode('gbk', errors='replace').decode('gbk'))
def ssh_connect(host, username, password, timeout=30):
"""建立SSH连接"""
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(host, username=username, password=password, timeout=timeout,
look_for_keys=False, allow_agent=False)
return ssh
def ssh_exec(ssh, cmd, timeout=600, use_sudo=False, sudo_password=None):
"""执行SSH命令"""
if use_sudo and sudo_password:
actual_cmd = f"echo '{sudo_password}' | sudo -S bash -c '{cmd}'"
stdin, stdout, stderr = ssh.exec_command(actual_cmd, timeout=timeout)
else:
stdin, stdout, stderr = ssh.exec_command(cmd, timeout=timeout)
out = stdout.read().decode('utf-8', errors='replace')
err = stderr.read().decode('utf-8', errors='replace')
exit_code = stdout.channel.recv_exit_status()
# 过滤sudo提示
if use_sudo and '[sudo]' in out:
out = out.split('\n', 1)[-1] if '\n' in out else out
return exit_code, out, err
def pull_and_export(ssh, version, use_sudo=False, sudo_password=None):
"""在ARM服务器上拉取EMQX镜像并导出"""
def run(cmd, timeout=600):
return ssh_exec(ssh, cmd, timeout, use_sudo, sudo_password)
image_tag = f"{IMAGE_NAME}:{version}"
tar_name = f"arm_uemqx-{version}.tar.gz"
md5_name = f"arm_uemqx-{version}.tar.gz.md5"
remote_tar = f"{OUTPUT_DIR}/{tar_name}"
remote_md5 = f"{OUTPUT_DIR}/{md5_name}"
# 步骤1:检查服务器架构
log("【步骤1】检查服务器架构")
_, arch_out, _ = run("uname -m")
arch = arch_out.strip()
log(f"服务器架构: {arch}")
if arch not in ('aarch64', 'arm64'):
log(f"[ERROR] 当前服务器不是ARM架构: {arch}", "ERROR")
return False
# 步骤2:检查Docker状态
log("【步骤2】检查Docker状态")
_, docker_ver, _ = run("docker version --format '{{.Server.Version}}'")
if not docker_ver.strip() or 'error' in docker_ver.lower():
log("[ERROR] Docker未正常运行", "ERROR")
return False
log(f"Docker版本: {docker_ver.strip()}")
# 步骤3:检查现有EMQX容器和镜像
log("【步骤3】检查现有EMQX状态")
_, container_out, _ = run("docker ps -a --filter name=uemqx --format '{{.Names}} {{.Status}}'")
if container_out.strip():
log(f"现有EMQX容器: {container_out.strip()}")
# 检查是否有exec format error
_, log_out, _ = run("docker logs uemqx --tail 10 2>&1")
if 'exec format error' in log_out:
log("[WARN] 检测到exec format error,当前镜像架构不匹配", "WARN")
# 停止并删除旧容器
log("停止并删除旧EMQX容器...")
run("docker stop uemqx 2>/dev/null")
run("docker rm uemqx 2>/dev/null")
else:
log("未发现现有EMQX容器")
# 检查现有镜像架构
_, img_out, _ = run(f"docker images --format '{{{{.Repository}}}}:{{{{.Tag}}}} {{{{.ID}}}}' | grep emqx")
if img_out.strip():
log(f"现有EMQX镜像: {img_out.strip()}")
# 检查镜像架构
for line in img_out.strip().split('\n'):
if line.strip():
img_id = line.strip().split()[-1]
_, inspect_out, _ = run(f"docker inspect {img_id} --format '{{{{.Architecture}}}}'")
log(f"镜像架构: {inspect_out.strip()}")
# 步骤4:拉取EMQX ARM64镜像
log(f"【步骤4】拉取 {image_tag} (ARM64)")
log("这可能需要几分钟时间...")
exit_code, pull_out, pull_err = run(f"docker pull {image_tag}", timeout=1800)
if exit_code != 0 and 'not found' in pull_err.lower():
log(f"[WARN] {version}版本拉取失败,尝试latest标签...", "WARN")
exit_code, pull_out, pull_err = run(f"docker pull {IMAGE_NAME}:latest", timeout=1800)
if exit_code == 0:
# 给latest打上版本标签
run(f"docker tag {IMAGE_NAME}:latest {image_tag}")
log("[OK] 已将latest标签为" + version)
if exit_code != 0:
log(f"[ERROR] 镜像拉取失败: {pull_err}", "ERROR")
log("服务器可能无法访问Docker Hub,请检查网络", "ERROR")
return False
log(f"拉取输出: {pull_out.strip()[-200:]}")
# 验证拉取的镜像架构
_, new_arch, _ = run(f"docker inspect {image_tag} --format '{{{{.Architecture}}}}'")
log(f"新镜像架构: {new_arch.strip()}")
if 'arm64' not in new_arch.lower() and 'aarch64' not in new_arch.lower():
log(f"[WARN] 拉取的镜像架构不是ARM64: {new_arch.strip()}", "WARN")
# 步骤5:导出镜像
log(f"【步骤5】导出镜像到 {remote_tar}")
run(f"mkdir -p {OUTPUT_DIR}")
# 使用docker save + gzip管道导出
log("正在压缩导出,可能需要较长时间...")
exit_code, _, save_err = run(
f"docker save {image_tag} | gzip > {remote_tar}",
timeout=3600
)
if exit_code != 0:
# 尝试分步方式
log("管道方式失败,尝试分步导出...")
run(f"docker save {image_tag} -o /tmp/emqx_temp.tar", timeout=3600)
run(f"gzip -c /tmp/emqx_temp.tar > {remote_tar}")
run("rm -f /tmp/emqx_temp.tar")
# 验证文件
_, size_out, _ = run(f"ls -lh {remote_tar}")
log(f"导出文件: {size_out.strip()}")
# 检查文件大小(应大于100MB)
_, size_bytes, _ = run(f"stat -c %s {remote_tar} 2>/dev/null || stat -f %z {remote_tar} 2>/dev/null")
try:
sz = int(size_bytes.strip().split('\n')[0])
if sz < 100 * 1024 * 1024:
log(f"[WARN] 文件过小({sz}字节),可能导出不完整", "WARN")
else:
log(f"[OK] 文件大小正常: {sz // (1024*1024)}MB")
except ValueError:
pass
# 步骤6:生成MD5
log(f"【步骤6】生成MD5校验文件")
_, md5_out, _ = run(f"md5sum {remote_tar}")
md5_value = md5_out.strip().split()[0] if md5_out.strip() else 'unknown'
run(f"echo '{md5_out.strip()}' > {remote_md5}")
log(f"MD5: {md5_value}")
# 步骤7:加载测试
log(f"【步骤7】镜像加载测试")
# 删除现有镜像,重新加载验证
run(f"docker rmi {image_tag} 2>/dev/null")
log("重新加载镜像...")
_, load_out, load_err = run(f"docker load -i {remote_tar}", timeout=1800)
if load_out.strip():
log(f"加载输出: {load_out.strip()[-200:]}")
if 'exec format error' in load_err:
log("[ERROR] 加载后仍有exec format error", "ERROR")
return False
# 验证加载后的镜像
_, loaded_img, _ = run(f"docker images {image_tag} --format '{{{{.Repository}}}}:{{{{.Tag}}}} {{{{.ID}}}}'")
log(f"已加载镜像: {loaded_img.strip()}")
_, loaded_arch, _ = run(f"docker inspect {image_tag} --format '{{{{.Architecture}}}}'")
log(f"加载后架构: {loaded_arch.strip()}")
log(f"[OK] EMQX ARM64镜像导出完成")
log(f" 文件路径: {remote_tar}")
log(f" MD5文件: {remote_md5}")
log(f" MD5值: {md5_value}")
return {
'remote_tar': remote_tar,
'remote_md5': remote_md5,
'md5_value': md5_value,
'version': version,
}
def download_files(ssh, result, server_cfg):
"""通过SFTP下载文件到本地"""
log("【下载】从服务器下载镜像文件到本地")
os.makedirs(LOCAL_OUTPUT, exist_ok=True)
sftp = ssh.open_sftp()
# 下载tar.gz
local_tar = os.path.join(LOCAL_OUTPUT, os.path.basename(result['remote_tar']))
log(f"下载镜像文件到: {local_tar}")
log("文件较大,可能需要较长时间...")
remote_size = sftp.stat(result['remote_tar']).st_size
total_mb = remote_size // (1024 * 1024)
last_report = [0]
def progress(transferred, total):
if transferred - last_report[0] > 50 * 1024 * 1024:
pct = transferred * 100 // total
mb = transferred // (1024 * 1024)
log(f" 下载进度: {pct}% ({mb}MB/{total_mb}MB)")
last_report[0] = transferred
sftp.get(result['remote_tar'], local_tar, callback=progress)
log(f"[OK] 镜像文件已下载: {local_tar} ({total_mb}MB)")
# 下载md5
local_md5 = os.path.join(LOCAL_OUTPUT, os.path.basename(result['remote_md5']))
sftp.get(result['remote_md5'], local_md5)
log(f"[OK] MD5文件已下载: {local_md5}")
sftp.close()
return local_tar, local_md5
def main():
server_key = DEFAULT_SERVER
version = DEFAULT_EMQX_VERSION
do_download = False
# 解析参数
args = sys.argv[1:]
i = 0
while i < len(args):
arg = args[i].lower()
if arg == '--host' and i + 1 < len(args):
server_key = args[i + 1]
i += 2
elif arg == '--version' and i + 1 < len(args):
version = args[i + 1]
i += 2
elif arg == '--download':
do_download = True
i += 1
else:
print(f"用法: python {sys.argv[0]} [--host 75|76] [--version 6.0.0] [--download]")
return 1
if server_key not in ARM_SERVERS:
print(f"[ERROR] 无效服务器: {server_key},可选: {list(ARM_SERVERS.keys())}")
return 1
cfg = ARM_SERVERS[server_key]
use_sudo = cfg.get('use_sudo', False)
sudo_password = cfg.get('sudo_password')
print("=" * 60)
print(f"ARM EMQX镜像拉取与导出")
print(f"目标服务器: {cfg['label']}")
print(f"EMQX版本: {version}")
print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 60)
# SSH连接
log(f"连接 {cfg['label']}...")
try:
ssh = ssh_connect(cfg['host'], cfg['username'], cfg['password'])
log("SSH连接成功")
except Exception as e:
log(f"SSH连接失败: {e}", "ERROR")
return 1
try:
# 拉取并导出
result = pull_and_export(ssh, version, use_sudo, sudo_password)
if result:
if do_download:
download_files(ssh, result, cfg)
else:
log(f"\n镜像已导出到服务器: {result['remote_tar']}")
log(f"如需下载到本地,请使用 --download 参数")
log(f"或手动SFTP下载: {result['remote_tar']}")
else:
log("镜像拉取或导出失败", "ERROR")
return 1
except Exception as e:
log(f"执行异常: {e}", "ERROR")
import traceback
traceback.print_exc()
return 1
finally:
ssh.close()
log("SSH连接已关闭")
print("\n" + "=" * 60)
print("执行完成")
print("=" * 60)
return 0
if __name__ == '__main__':
sys.exit(main())
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
通过Docker Registry HTTP API直接拉取ARM64镜像并导出为docker save格式tar.gz
无需安装Docker
用法:
python registry_pull_arm.py # 拉取emqx/emqx:6.0.0 ARM64
python registry_pull_arm.py --image emqx/emqx:6.0.0 # 指定镜像
python registry_pull_arm.py --output arm_uemqx-6.0.0 # 指定输出文件名(不含扩展名)
"""
import sys
import os
import json
import hashlib
import time
import requests
from datetime import datetime
# 修复Windows控制台编码
if sys.platform == 'win32':
try:
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
sys.stderr.reconfigure(encoding='utf-8', errors='replace')
except Exception:
pass
# Docker Hub配置
DOCKER_HUB_AUTH = 'https://auth.docker.io/token'
DOCKER_HUB_REGISTRY = 'https://registry-1.docker.io'
# 输出目录
OUTPUT_DIR = r'E:\自动化部署\ARM-EMQX'
def log(msg, level="INFO"):
ts = datetime.now().strftime('%H:%M:%S')
try:
print(f"[{ts}] [{level}] {msg}")
except UnicodeEncodeError:
print(f"[{ts}] [{level}] {msg}".encode('gbk', errors='replace').decode('gbk'))
class DockerRegistryClient:
"""Docker Registry HTTP API V2 客户端"""
def __init__(self):
self.session = requests.Session()
self.session.timeout = 60
self.tokens = {}
def get_token(self, repository):
"""获取Docker Hub认证token"""
if repository in self.tokens:
return self.tokens[repository]
log(f"获取认证token: {repository}")
resp = self.session.get(DOCKER_HUB_AUTH, params={
'service': 'registry.docker.io',
'scope': f'repository:{repository}:pull',
})
resp.raise_for_status()
token = resp.json()['token']
self.tokens[repository] = token
return token
def _headers(self, repository):
return {
'Authorization': f'Bearer {self.get_token(repository)}',
'Accept': 'application/vnd.docker.distribution.manifest.v2+json, '
'application/vnd.docker.distribution.manifest.list.v2+json, '
'application/vnd.oci.image.index.v1+json, '
'application/vnd.oci.image.manifest.v1+json',
}
def get_manifest(self, repository, tag):
"""获取镜像manifest"""
url = f"{DOCKER_HUB_REGISTRY}/v2/{repository}/manifests/{tag}"
resp = self.session.get(url, headers=self._headers(repository))
resp.raise_for_status()
return resp.json(), resp.headers.get('Content-Type', '')
def get_manifest_by_digest(self, repository, digest):
"""通过digest获取manifest"""
url = f"{DOCKER_HUB_REGISTRY}/v2/{repository}/manifests/{digest}"
resp = self.session.get(url, headers=self._headers(repository))
resp.raise_for_status()
return resp.json(), resp.headers.get('Content-Type', '')
def download_blob(self, repository, digest, output_path):
"""下载blob(层或配置文件)"""
url = f"{DOCKER_HUB_REGISTRY}/v2/{repository}/blobs/{digest}"
headers = {
'Authorization': f'Bearer {self.get_token(repository)}',
}
# 支持断点续传
existing_size = 0
if os.path.exists(output_path):
existing_size = os.path.getsize(output_path)
# 校验已有文件
sha256 = hashlib.sha256()
with open(output_path, 'rb') as f:
while True:
chunk = f.read(8192)
if not chunk:
break
sha256.update(chunk)
if sha256.hexdigest() == digest.replace('sha256:', ''):
log(f" [跳过] 已存在且校验通过: {os.path.basename(output_path)}")
return True
resp = self.session.get(url, headers=headers, stream=True)
if resp.status_code == 404:
log(f" [WARN] blob不存在: {digest[:20]}...", "WARN")
return False
resp.raise_for_status()
total = int(resp.headers.get('Content-Length', 0))
downloaded = 0
last_report = 0
with open(output_path, 'wb') as f:
for chunk in resp.iter_content(chunk_size=1024 * 1024):
f.write(chunk)
downloaded += len(chunk)
if total and (downloaded - last_report) > 10 * 1024 * 1024:
pct = downloaded * 100 // total
log(f" 下载进度: {pct}% ({downloaded // (1024*1024)}MB/{total // (1024*1024)}MB)")
last_report = downloaded
return True
def find_arm64_manifest(client, repository, tag):
"""在多架构镜像中找到ARM64 manifest"""
log(f"获取镜像manifest: {repository}:{tag}")
manifest, content_type = client.get_manifest(repository, tag)
# 检查是否是多架构manifest list/index
media_type = manifest.get('mediaType', content_type)
if 'manifest.list' in media_type or 'image.index' in media_type or 'manifests' in manifest:
log(f"多架构镜像,共 {len(manifest.get('manifests', []))} 个平台")
for m in manifest.get('manifests', []):
platform = m.get('platform', {})
arch = platform.get('architecture', '')
variant = platform.get('variant', '')
os_name = platform.get('os', '')
log(f" 平台: {os_name}/{arch}/{variant} digest={m['digest'][:30]}...")
if arch in ('arm64', 'aarch64') and os_name == 'linux':
log(f" [选中] ARM64 manifest: {m['digest']}")
return m['digest']
log("[ERROR] 未找到linux/arm64平台", "ERROR")
return None
else:
# 单架构manifest,检查架构
log("单架构镜像,直接使用")
return tag
def pull_image_as_docker_save(repository, tag, output_base, platform='linux/arm64'):
"""拉取指定平台镜像并保存为docker save格式tar.gz"""
client = DockerRegistryClient()
# 步骤1:获取ARM64 manifest
log("【步骤1】获取ARM64 manifest")
digest_or_tag = find_arm64_manifest(client, repository, tag)
if not digest_or_tag:
return False
# 获取具体manifest
log("【步骤2】下载ARM64镜像manifest")
manifest, content_type = client.get_manifest_by_digest(repository, digest_or_tag)
media_type = manifest.get('mediaType', content_type)
# 如果还是manifest list(某些情况),需要再次解析
if 'manifests' in manifest:
for m in manifest['manifests']:
p = m.get('platform', {})
if p.get('architecture') in ('arm64', 'aarch64'):
manifest, _ = client.get_manifest_by_digest(repository, m['digest'])
break
config_digest = manifest.get('config', {}).get('digest', '')
layers = manifest.get('layers', [])
log(f"配置digest: {config_digest}")
log(f"层数: {len(layers)}")
for i, layer in enumerate(layers):
size_mb = layer.get('size', 0) // (1024 * 1024)
log(f" 层{i+1}: {layer['digest'][:30]}... ({size_mb}MB)")
# 创建临时目录
temp_dir = output_base + '_temp'
os.makedirs(temp_dir, exist_ok=True)
# 步骤3:下载config blob
log("【步骤3】下载配置文件")
config_file = os.path.join(temp_dir, config_digest.replace('sha256:', '') + '.json')
if not client.download_blob(repository, config_digest, config_file):
log("[ERROR] 配置文件下载失败", "ERROR")
return False
# 读取config获取历史信息
with open(config_file, 'r') as f:
config_data = json.load(f)
config_arch = config_data.get('architecture', 'unknown')
config_os = config_data.get('os', 'unknown')
log(f"镜像架构: {config_os}/{config_arch}")
if config_arch not in ('arm64', 'aarch64'):
log(f"[WARN] 镜像架构不是ARM64: {config_arch},继续执行...", "WARN")
# 步骤4:下载所有层
log(f"【步骤4】下载 {len(layers)} 个层文件")
layer_files = []
for i, layer in enumerate(layers):
layer_digest = layer['digest']
layer_hash = layer_digest.replace('sha256:', '')
log(f" 下载层{i+1}/{len(layers)}: {layer_hash[:20]}...")
layer_file = os.path.join(temp_dir, layer_hash + '.tar.gz')
if client.download_blob(repository, layer_digest, layer_file):
layer_files.append((layer_hash, layer_file))
else:
log(f" [ERROR] 层{i+1}下载失败", "ERROR")
return False
# 步骤5:组装docker save格式
log("【步骤5】组装docker save格式")
# 读取config文件内容来构建layer目录
config_hash = config_digest.replace('sha256:', '')
# 构建manifest.json
manifest_json = [{
"Config": f"{config_hash}.json",
"RepoTags": [f"{repository}:{tag}"],
"Layers": [],
}]
# 每层需要解压后放入目录
import tarfile
import gzip
layer_tar_files = []
for i, (layer_hash, layer_file) in enumerate(layer_files):
log(f" 处理层{i+1}/{len(layer_files)}...")
# 检查层文件是否是gzip压缩的
with open(layer_file, 'rb') as f:
magic = f.read(2)
layer_tar_name = f"{layer_hash}/layer.tar"
layer_dir = os.path.join(temp_dir, layer_hash)
os.makedirs(layer_dir, exist_ok=True)
if magic == b'\x1f\x8b':
# gzip压缩的,解压到layer.tar
log(f" 解压gzip层...")
with gzip.open(layer_file, 'rb') as gz_in:
with open(os.path.join(layer_dir, 'layer.tar'), 'wb') as tar_out:
while True:
chunk = gz_in.read(8 * 1024 * 1024)
if not chunk:
break
tar_out.write(chunk)
else:
# 已经是tar格式,直接复制
import shutil
shutil.copy2(layer_file, os.path.join(layer_dir, 'layer.tar'))
# 写VERSION文件
with open(os.path.join(layer_dir, 'VERSION'), 'w') as f:
f.write('1.0')
# 写json文件
layer_json = {
"id": layer_hash,
"created": config_data.get('created', ''),
}
# 从config的history中获取cmd
history = config_data.get('history', [])
if i < len(history):
if 'empty_layer' not in history[i]:
layer_json['created'] = history[i].get('created', '')
with open(os.path.join(layer_dir, 'layer.tar.json'), 'w') as f:
json.dump(layer_json, f)
# 尝试用config中的diff_ids
rootfs = config_data.get('rootfs', {})
diff_ids = rootfs.get('diff_ids', [])
if i < len(diff_ids):
layer_json['id'] = diff_ids[i].replace('sha256:', '')
with open(os.path.join(layer_dir, 'layer.tar.json'), 'w') as f:
json.dump(layer_json, f)
manifest_json[0]['Layers'].append(layer_tar_name)
# 清理原始下载文件
os.remove(layer_file)
layer_tar_files.append(layer_hash)
# 写manifest.json
with open(os.path.join(temp_dir, 'manifest.json'), 'w') as f:
json.dump(manifest_json, f)
# 写repositories文件
repos = {
repository: {
tag: layer_tar_files[-1] if layer_tar_files else config_hash
}
}
with open(os.path.join(temp_dir, 'repositories'), 'w') as f:
json.dump(repos, f)
# 步骤6:打包为tar.gz
log("【步骤6】打包为tar.gz")
output_tar = output_base + '.tar.gz'
os.makedirs(os.path.dirname(output_tar), exist_ok=True)
with tarfile.open(output_tar, 'w:gz') as tar:
for item in os.listdir(temp_dir):
item_path = os.path.join(temp_dir, item)
tar.add(item_path, arcname=item)
# 步骤7:生成MD5
log("【步骤7】生成MD5校验")
md5 = hashlib.md5()
with open(output_tar, 'rb') as f:
while True:
chunk = f.read(8 * 1024 * 1024)
if not chunk:
break
md5.update(chunk)
md5_file = output_tar + '.md5'
with open(md5_file, 'w') as f:
f.write(f"{md5.hexdigest()} {os.path.basename(output_tar)}\n")
# 清理临时目录
import shutil
shutil.rmtree(temp_dir, ignore_errors=True)
# 输出结果
file_size = os.path.getsize(output_tar)
log(f"\n[OK] 镜像导出完成")
log(f" 文件: {output_tar}")
log(f" 大小: {file_size // (1024*1024)}MB")
log(f" MD5: {md5.hexdigest()}")
log(f" 架构: {config_os}/{config_arch}")
return True
def main():
image = 'emqx/emqx:6.0.0'
output_name = 'arm_uemqx-6.0.0'
# 解析参数
args = sys.argv[1:]
i = 0
while i < len(args):
arg = args[i]
if arg == '--image' and i + 1 < len(args):
image = args[i + 1]
i += 2
elif arg == '--output' and i + 1 < len(args):
output_name = args[i + 1]
i += 2
else:
print(f"用法: python {sys.argv[0]} [--image emqx/emqx:6.0.0] [--output arm_uemqx-6.0.0]")
return 1
# 解析image
if ':' in image:
repository, tag = image.rsplit(':', 1)
else:
repository = image
tag = 'latest'
output_path = os.path.join(OUTPUT_DIR, output_name)
print("=" * 60)
print(f"Docker Registry API 镜像拉取 (ARM64)")
print(f"镜像: {repository}:{tag}")
print(f"平台: linux/arm64")
print(f"输出: {output_path}.tar.gz")
print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 60)
try:
success = pull_image_as_docker_save(repository, tag, output_path, 'linux/arm64')
return 0 if success else 1
except KeyboardInterrupt:
print("\n[WARN] 用户中断")
return 1
except Exception as e:
log(f"执行异常: {e}", "ERROR")
import traceback
traceback.print_exc()
return 1
if __name__ == '__main__':
sys.exit(main())
# X86自动更新部署包版本报告
## 基本信息
- 执行时间:2026-06-03 22:32:22
- 结束时间:2026-06-03 23:15:56
- 总耗时:43分33秒
- 执行状态:**成功**
## 操作步骤记录
| 序号 | 步骤名称 | 时间 | 状态 | 详情 |
|------|---------|------|------|------|
| 3 | 测试服务器打包 | 2026-06-03 22:55:01 | ✅ 成功 | 服务包大小: 5.69 GB |
| 4 | 解压服务包 | 2026-06-03 22:56:54 | ✅ 成功 | |
| 5 | 服务更新替换 | 2026-06-03 22:57:59 | ✅ 成功 | 共更新 23 个服务 |
| 7 | 重新打包 | 2026-06-03 23:09:59 | ✅ 成功 | 文件: /data/offline_auto_unifiedPlatform.tar.gz, 大小: 8.75 GB, MD5: 6b0b127b3ebcf574d312f77a1ecfc984 |
| 8 | 拷贝到网盘 | 2026-06-03 23:15:54 | ✅ 成功 | |
| 9 | 清理临时文件 | 2026-06-03 23:15:56 | ✅ 成功 | |
## 服务更新详情
| 服务名称 | 类型 | 更新前大小 | 更新后大小 | 状态 |
|---------|------|-----------|-----------|------|
| ai包 | 前端 | 0 B | 4.0 KB | ✅ 成功 |
| 后台包 | 前端 | 4.0 KB | 4.0 KB | ✅ 成功 |
| main包 | 前端 | 4.0 KB | 4.0 KB | ✅ 成功 |
| meetngV2包 | 前端 | 4.0 KB | 4.0 KB | ✅ 成功 |
| meetngV3包 | 前端 | 4.0 KB | 4.0 KB | ✅ 成功 |
| meetingControl包 | 前端 | 4.0 KB | 4.0 KB | ✅ 成功 |
| monitor包 | 前端 | 4.0 KB | 4.0 KB | ✅ 成功 |
| platform包 | 前端 | 4.0 KB | 4.0 KB | ✅ 成功 |
| voice包 | 前端 | 4.0 KB | 4.0 KB | ✅ 成功 |
| h5-meeting | 前端 | 4.0 KB | 4.0 KB | ✅ 成功 |
| h5-moniter | 前端 | 4.0 KB | 4.0 KB | ✅ 成功 |
| h5-platform-mobile | 前端 | 4.0 KB | 4.0 KB | ✅ 成功 |
| h5-platform-platform-mobile | 前端 | 4.0 KB | 4.0 KB | ✅ 成功 |
| auth包 | 后端jar | 134.6 MB | 125.9 MB | ✅ 成功 |
| gatway包 | 后端jar | 225.5 MB | 127.9 MB | ✅ 成功 |
| system包 | 后端jar | 142.9 MB | 142.9 MB | ✅ 成功 |
| java2.0包 | 后端jar | 343.9 MB | 344.5 MB | ✅ 成功 |
| java-extapi包 | 后端jar | 337.8 MB | 337.9 MB | ✅ 成功 |
| java-scheduling包 | 后端jar | 225.0 MB | 225.0 MB | ✅ 成功 |
| java-mqtt包 | 后端jar | 307.7 MB | 307.7 MB | ✅ 成功 |
| java-quartz包 | 后端jar | 316.6 MB | 316.6 MB | ✅ 成功 |
| cmdb包 | 后端文件夹 | 4.0 KB | 4.0 KB | ✅ 成功 |
| voice包 | 后端文件夹 | 4.0 KB | 4.0 KB | ✅ 成功 |
## 最终产物
- 压缩包:`offline_auto_unifiedPlatform.tar.gz`
- MD5文件:`offline_auto_unifiedPlatform.tar.gz.md5`
- 网盘路径:`Z:\发布版本\03服务器部署\15新统一平台\X86部署包\全量版\版本更新-待验证`
...@@ -581,8 +581,8 @@ class X86PackageUpdate: ...@@ -581,8 +581,8 @@ class X86PackageUpdate:
if self.path_mapping: if self.path_mapping:
self.log("执行路径映射重命名...") self.log("执行路径映射重命名...")
for test_path, build_path in self.path_mapping.items(): for test_path, build_path in self.path_mapping.items():
old_dir = f"{extract_dir}/services/{test_path}" old_dir = f"{extract_dir}/{test_path}"
new_dir = f"{extract_dir}/services/{build_path}" new_dir = f"{extract_dir}/{build_path}"
exit_code, out, err = self._exec_build_cmd( exit_code, out, err = self._exec_build_cmd(
f'if [ -d "{old_dir}" ]; then ' f'if [ -d "{old_dir}" ]; then '
f'mv "{old_dir}" "{new_dir}" && echo "RENAMED"; fi' f'mv "{old_dir}" "{new_dir}" && echo "RENAMED"; fi'
...@@ -611,7 +611,7 @@ class X86PackageUpdate: ...@@ -611,7 +611,7 @@ class X86PackageUpdate:
self.log("=" * 50) self.log("=" * 50)
services_base = '/data/offline_auto_unifiedPlatform/data/services' services_base = '/data/offline_auto_unifiedPlatform/data/services'
update_base = '/data/services_update/services' update_base = '/data/services_update'
all_success = True all_success = True
# ---------- 更新前端服务 ---------- # ---------- 更新前端服务 ----------
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论