Commit 3940433b authored by 陈泽健

feat(scripts): add automated service monitoring script

- Implement SSH connection management with multi-server support and automatic reconnection
- Add platform identification that auto-detects the new unified platform (/data/services) versus the legacy platform (/var/www)
- Implement system identification: detect the ujava, upython, and upython_voice containers and map them to business systems
- Add log-burst detection using an incremental line-count window to track log growth rate
- Implement ERROR log auditing: capture error context and record the time window in which errors occurred
- Add memory monitoring that records peak usage, average usage, and the time of the peak
- Implement MySQL connection monitoring: read the connection count inside the container and flag connection bursts
- Integrate multi-threaded monitoring so multiple servers are watched concurrently
- Configure Chinese log output with rotating file logging
Parent 6cf626a3
import paramiko
import socket
import re
import time
from dataclasses import dataclass
from typing import List, Optional, Tuple
import logging
from logging.handlers import RotatingFileHandler
import threading
@dataclass
class ServerConfig:
host: str
username: str
password: str
port: int = 22
@dataclass
class PlatformInfo:
is_new_platform: bool # True => /data/services exists; False => legacy /var/www
base_path: str # '/data/services' or '/var/www'
@dataclass
class SystemInfo:
has_ujava: bool
has_upython: bool
has_upython_voice: bool
systems: List[str] # e.g., ['meeting', 'ops', 'transcription']
class SSHClientWrapper:
def __init__(self, config: ServerConfig, timeout: int = 10):
self.config = config
self.timeout = timeout
self.client: Optional[paramiko.SSHClient] = None
def connect(self) -> None:
# Close any previous client before reconnecting
self.close()
self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.client.connect(
hostname=self.config.host,
port=self.config.port,
username=self.config.username,
password=self.config.password,
timeout=self.timeout,
allow_agent=False,
look_for_keys=False,
)
# Enable TCP keepalive on the underlying socket and Paramiko keepalive
transport = self.client.get_transport()
if transport:
transport.set_keepalive(30) # SSH-level keepalive every 30s
# Try to set OS-level TCP keepalive
sock = transport.sock
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# Optional platform-specific tuning (Linux)
# sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 60)
# sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30)
# sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
except Exception:
pass
def exec(self, command: str) -> Tuple[str, str, int]:
if not self.client:
raise RuntimeError("SSH client not connected.")
stdin, stdout, stderr = self.client.exec_command(command, timeout=self.timeout)
out = stdout.read().decode(errors="ignore")
err = stderr.read().decode(errors="ignore")
code = stdout.channel.recv_exit_status()
return out, err, code
def exists(self, path: str) -> bool:
# Use test -d for directories; test -e for general existence if needed
_, _, code = self.exec(f"test -d '{path}'")
return code == 0
def close(self) -> None:
try:
if self.client:
self.client.close()
finally:
self.client = None
def detect_platform(ssh: SSHClientWrapper) -> PlatformInfo:
# New platform if /data/services exists; else legacy
if ssh.exists("/data/services"):
return PlatformInfo(is_new_platform=True, base_path="/data/services")
return PlatformInfo(is_new_platform=False, base_path="/var/www")
def detect_systems(ssh: SSHClientWrapper, platform: PlatformInfo) -> SystemInfo:
# Detect containers/services: ujava, upython, upython_voice
# We try typical locations under both platforms.
paths_to_check = [
"/data/services", # new
"/data/services/api", # new
"/var/www", # legacy
"/var/www/java", # legacy
"/var/www/python", # legacy
]
def any_glob_exists(name: str) -> bool:
# Check common directories where container markers may appear
for base in paths_to_check:
# Use find with a bounded depth instead of a bash ** glob, which needs globstar enabled
out, _, code = ssh.exec(f"bash -lc \"find {base} -maxdepth 2 -name '{name}*' 2>/dev/null | head -n 1\"")
if code == 0 and out.strip():
return True
# Also check docker/container listings for names if applicable
out, _, _ = ssh.exec("bash -lc \"docker ps --format '{{.Names}}' 2>/dev/null || true\"")
if name in out:
return True
return False
has_ujava = any_glob_exists("ujava")
has_upython = any_glob_exists("upython")
has_upython_voice = any_glob_exists("upython_voice")
systems = []
if has_ujava:
systems.append("meeting") # meeting reservation system
if has_upython:
systems.append("ops") # ops central-control system
if has_upython_voice:
systems.append("transcription") # transcription system
return SystemInfo(
has_ujava=has_ujava,
has_upython=has_upython,
has_upython_voice=has_upython_voice,
systems=systems,
)
def setup_logger() -> logging.Logger:
logger = logging.getLogger("自动化服务监测")
if logger.handlers:
return logger
logger.setLevel(logging.INFO)
fmt = logging.Formatter("[%(asctime)s] %(levelname)s %(message)s")
# Console handler (Chinese-language log output)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(fmt)
logger.addHandler(ch)
# Rotating file log: 10 MB per file, keep 5 backups
fh = RotatingFileHandler("AutomatedServiceMonitoring.log", maxBytes=10 * 1024 * 1024, backupCount=5, encoding="utf-8")
fh.setLevel(logging.INFO)
fh.setFormatter(fmt)
logger.addHandler(fh)
return logger
# ==== Helpers: MySQL container detection and in-container queries ====
def find_mysql_container(ssh: 'SSHClientWrapper') -> Optional[str]:
"""
检测 MySQL/MariaDB 容器名称,返回首个匹配的容器名。
匹配规则:容器名或镜像名包含 mysql/mariadb 关键字(不区分大小写)。
"""
cmd = "bash -lc \"docker ps --format '{{.Names}} {{.Image}}' 2>/dev/null\""
out, _, code = ssh.exec(cmd)
if code != 0 or not out.strip():
return None
cand = None
for line in out.splitlines():
parts = line.strip().split(None, 1)
if not parts:
continue
name = parts[0]
image = parts[1] if len(parts) > 1 else ""
low = f"{name} {image}".lower()
if any(k in low for k in ["mysql", "mariadb"]):
cand = name
break
return cand
def get_mysql_threads_connected_via_container(ssh: 'SSHClientWrapper', container: str) -> Optional[int]:
"""
在容器内执行查询,优先用 mysql -ss 输出,解析更简单;失败回退 mysqladmin status。
使用全局固定账号密码。
"""
auth = f" -u{MYSQL_USER} -p'{MYSQL_PASSWORD}'"
# 使用 -ss 去掉格式,配合 cut 提取第二列(值)
cmd1 = (
f"bash -lc \"docker exec -i {container} mysql -ss{auth} -e 'SHOW STATUS LIKE \\\'Threads_connected\\\'' 2>/dev/null | tail -n1 | cut -f2\""
)
out1, _, code1 = ssh.exec(cmd1)
if code1 == 0 and out1.strip():
try:
return int(out1.strip())
except Exception:
pass
cmd2 = f"bash -lc \"docker exec -i {container} mysqladmin{auth} status 2>/dev/null\""
out2, _, code2 = ssh.exec(cmd2)
if code2 == 0 and out2.strip():
m = re.search(r"Threads:\s*(\d+)", out2)
if m:
try:
return int(m.group(1))
except Exception:
pass
return None
# monitor_logs_once is called from each host's monitoring loop (burst and ERROR audits are separate)
def monitor_host(cfg: ServerConfig, interval_seconds: int, logger: logging.Logger):
ssh = SSHClientWrapper(cfg)
last_result: Optional[dict] = None
while True:
try:
if ssh.client is None:
logger.info(f"[连接] 正在连接服务器 {cfg.host}")
ssh.connect()
try:
_, _, code = ssh.exec("true")
if code != 0:
raise RuntimeError("SSH存活检查失败")
except Exception:
logger.info(f"[重连] 服务器 {cfg.host} 连接异常,尝试重连")
ssh.connect()
platform = detect_platform(ssh)
systems = detect_systems(ssh, platform)
current = {
"host": cfg.host,
"platform": "新统一平台(/data/services)" if platform.is_new_platform else "传统平台(/var/www)",
"base_path": platform.base_path,
"systems": systems.systems,
"has_ujava": systems.has_ujava,
"has_upython": systems.has_upython,
"has_upython_voice": systems.has_upython_voice,
}
if last_result != current:
logger.info(f"[平台识别] {cfg.host} => {current['platform']}")
logger.info(f"[系统识别] {cfg.host} => ujava={systems.has_ujava}, upython={systems.has_upython}, upython_voice={systems.has_upython_voice}, 映射={systems.systems or ['无']}")
last_result = current
logger.info(f"[心跳] {cfg.host}: 平台={ '新' if platform.is_new_platform else '旧' } 基路径={platform.base_path} 系统={','.join(systems.systems) if systems.systems else '无'}")
# 4.2 Log audit (burst + ERROR)
try:
monitor_logs_once(ssh, platform, systems, logger, host=cfg.host)
except Exception as e:
logger.error(f"[日志审计错误] {cfg.host} => {e}")
# 4.3 Memory consumption
try:
monitor_mem_once(ssh, cfg.host, logger)
except Exception as e:
logger.error(f"[内存监测错误] {cfg.host} => {e}")
# 4.4 MySQL connection count
try:
monitor_mysql_once(ssh, cfg.host, logger, window_seconds=300, min_burst_conn=200, rate_threshold_per_sec=1.0)
except Exception as e:
logger.error(f"[MySQL监测错误] {cfg.host} => {e}")
except paramiko.AuthenticationException as e:
logger.error(f"[认证错误] {cfg.host} => {e},稍后重试")
ssh.close()
except Exception as e:
logger.error(f"[错误] {cfg.host} => {e},将尝试重连")
ssh.close()
# A single sleep per loop iteration, on success or failure
time.sleep(interval_seconds)
def resolve_log_targets(platform: PlatformInfo, systems: List[str]) -> List[Tuple[str, str]]:
"""
返回需要审计的日志文件列表 [(系统名, 日志绝对路径), ...]
当前根据 PRD 仅覆盖会议预定系统的 2.0 / 3.0 后端日志。
"""
targets: List[Tuple[str, str]] = []
if "meeting" in systems:
if platform.is_new_platform:
targets.append(("meeting-2.0", f"{platform.base_path}/api/java-meeting/java-meeting2.0/logs/ubains-INFO-AND-ERROR.log"))
targets.append(("meeting-3.0", f"{platform.base_path}/api/java-meeting/java-meeting3.0/logs/ubains-INFO-AND-ERROR.log"))
else:
targets.append(("meeting-2.0", f"/var/www/java/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log"))
return targets
def fetch_error_context(
ssh: SSHClientWrapper,
log_path: str,
max_context: int = 50,
max_sections: int = 3
) -> Tuple[str, int, Tuple[str, str]]:
"""
抓取日志最近若干个 ERROR 的上下文(每段上下 max_context 行),并返回:
- ctx 文本(可能较多)
- 最近 5000 行中的 ERROR 数量
- 发生时间段 (start_ts, end_ts),依据上下文中第一条与最后一条的时间戳估计
"""
out, _, code = ssh.exec(f"bash -lc \"test -f '{log_path}'\"")
if code != 0:
return "", 0, ("", "")
# Extract the last 5000 lines
tail_out, _, _ = ssh.exec(f"bash -lc \"tail -n 5000 '{log_path}'\"")
if not tail_out.strip():
return "", 0, ("", "")
# Count ERROR occurrences in those lines
cnt_out, _, _ = ssh.exec(f"bash -lc \"tail -n 5000 '{log_path}' | grep -c 'ERROR' || true\"")
try:
count = int(cnt_out.strip())
except Exception:
count = 0
# Number lines with nl, take the last max_sections ERROR line numbers, then print
# max_context lines around each. Every '$' is escaped (\$) so it is expanded by the
# inner `bash -lc` shell rather than by the outer SSH shell.
cmd_ctx = (
f"bash -lc \"tail -n 5000 '{log_path}' | nl -ba | "
f"awk '/ERROR/ {{print \\$1}}' | tail -n {max_sections} | "
"while read ln; do "
f"start=\\$((ln-{max_context})); end=\\$((ln+{max_context})); "
"if [ \\$start -lt 1 ]; then start=1; fi; "
f"sed -n \\\"\\${{start}},\\${{end}}p\\\" <(tail -n 5000 '{log_path}'); "
"echo \\\"---- 上下文分隔线 (行号: \\$ln) ----\\\"; "
"done\""
)
ctx, _, _ = ssh.exec(cmd_ctx)
ctx = ctx.strip()
# Parse timestamps from the context locally with a regex, avoiding remote echo escaping issues.
# Matches the form: YYYY-MM-DD HH:MM:SS
ts_pattern = re.compile(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})")
ts_matches = ts_pattern.findall(ctx)
start_ts = ts_matches[0] if ts_matches else ""
end_ts = ts_matches[-1] if ts_matches else ""
return ctx, count, (start_ts, end_ts)
def monitor_log_errors_once(
ssh: SSHClientWrapper,
sys_name: str,
log_path: str,
logger: logging.Logger,
max_context: int = 50
) -> None:
"""
监测 ERROR 并记录上下 50 行的发生时间段:
- 与“日志暴涨”无关,仅关注 ERROR。
"""
ctx, err_count, (start_ts, end_ts) = fetch_error_context(ssh, log_path, max_context=max_context, max_sections=3)
if err_count == 0:
logger.info(f"[ERROR审计] 系统={sys_name} 日志={log_path} 最近段落未发现 ERROR")
return
# Record the occurrence window and the context
if start_ts or end_ts:
logger.warning(f"[ERROR出现] 系统={sys_name} 日志={log_path} 最近5000行 ERROR数量={err_count} 发生时间段=[{start_ts} ~ {end_ts}]")
else:
logger.warning(f"[ERROR出现] 系统={sys_name} 日志={log_path} 最近5000行 ERROR数量={err_count}")
if ctx:
logger.info(f"[ERROR上下文] 系统={sys_name} 日志={log_path}\n{ctx}")
# State for incremental log-burst statistics (previous total line count and sample timestamp)
BURST_STATE: dict[Tuple[str, str], Tuple[int, float]] = {}
# key = (host, log_path) => (last_total_lines, last_timestamp_epoch)
# Burst detection uses an incremental line-count window and does not depend on parsing log timestamps
def monitor_log_burst_once(
ssh: SSHClientWrapper,
host: str,
sys_name: str,
log_path: str,
logger: logging.Logger,
window_seconds: int = 300,
min_lines_threshold: int = 1000,
rate_threshold_per_sec: float = 5.0 # lines-per-second threshold, e.g. 5 lines/s
) -> None:
# Confirm the log file exists
_, _, code = ssh.exec(f"bash -lc \"test -f '{log_path}'\"")
if code != 0:
logger.info(f"[日志暴涨审计] 系统={sys_name} 日志不存在:{log_path}")
return
out, _, _ = ssh.exec(f"bash -lc \"wc -l < '{log_path}' || echo 0\"")
try:
total_lines = int(out.strip())
except Exception:
logger.info(f"[日志暴涨审计] 系统={sys_name} 无法获取总行数")
return
now_out, _, _ = ssh.exec("bash -lc \"date +%s\"")
try:
now_ts = float(now_out.strip())
except Exception:
now_ts = time.time()
key = (host, log_path)
last = BURST_STATE.get(key)
if not last:
BURST_STATE[key] = (total_lines, now_ts)
logger.info(f"[日志暴涨审计] 系统={sys_name} 初始化窗口,总行数={total_lines}")
return
last_total, last_ts = last
elapsed = max(1.0, now_ts - last_ts)
delta_lines = max(0, total_lines - last_total)
if elapsed < window_seconds:
logger.info(f"[日志暴涨审计] 系统={sys_name} 累积中:窗口{elapsed:.0f}s 未达 {window_seconds}s,新增行数={delta_lines}")
return
# A full window has elapsed: judge this window, then reset the baseline for the next one
BURST_STATE[key] = (total_lines, now_ts)
rate = delta_lines / elapsed
time_info = f"窗口={elapsed:.0f}s 新增行数={delta_lines} 速率={rate:.2f}行/秒"
# Record the occurrence window (previous sample to current)
start_ts_out, _, _ = ssh.exec(f"bash -lc \"date -d @{int(last_ts)} +%Y-%m-%d\\ %H:%M:%S 2>/dev/null || date +%Y-%m-%d\\ %H:%M:%S\"")
end_ts_out, _, _ = ssh.exec(f"bash -lc \"date -d @{int(now_ts)} +%Y-%m-%d\\ %H:%M:%S 2>/dev/null || date +%Y-%m-%d\\ %H:%M:%S\"")
if delta_lines >= min_lines_threshold or rate >= rate_threshold_per_sec:
logger.warning(f"[日志打印暴涨] 系统={sys_name} 日志={log_path} {time_info},时间段=[{start_ts_out.strip()} ~ {end_ts_out.strip()}]")
else:
logger.info(f"[日志暴涨审计] 系统={sys_name} 日志={log_path} {time_info},未发现暴涨")
# host is passed through by the aggregate monitor so BURST_STATE keys are per host
def monitor_logs_once(ssh: SSHClientWrapper, platform: PlatformInfo, systems: SystemInfo, logger: logging.Logger, host: str = "") -> None:
"""
聚合一次日志监测调用:分别进行“日志暴涨”检测与“ERROR上下文”检测。
"""
targets = resolve_log_targets(platform, systems.systems)
if not targets:
logger.info("[日志审计] 未匹配到需要审计的日志目标")
return
for sys_name, log_path in targets:
monitor_log_burst_once(
ssh=ssh,
host=host,
sys_name=sys_name,
log_path=log_path,
logger=logger,
window_seconds=300,
min_lines_threshold=1000,
rate_threshold_per_sec=5.0,
)
monitor_log_errors_once(
ssh=ssh,
sys_name=sys_name,
log_path=log_path,
logger=logger,
max_context=50,
)
# ==== 4.3 Memory consumption state (per host) ====
MEM_STATE: dict[str, dict] = {}
# { host: { 'samples': int, 'sum_used_mb': float, 'peak_used_mb': float, 'peak_ts': float } }
def monitor_mem_once(ssh: SSHClientWrapper, host: str, logger: logging.Logger) -> None:
"""
使用 /proc/meminfo 采集内存使用(Python侧解析,避免 shell 管道差异):
- 读取 MemTotal 与 MemAvailable(单位 kB),换算为 MB
- 记录样本数、平均、峰值与峰值时间
"""
out, _, code = ssh.exec("bash -lc \"cat /proc/meminfo\"")
if code != 0 or not out:
logger.error(f"[内存监测] {host} => 获取 /proc/meminfo 失败")
return
total_kb = None
avail_kb = None
for line in out.splitlines():
if line.startswith("MemTotal:"):
# e.g. "MemTotal: 15199764 kB"
try:
total_kb = float(line.split(":")[1].strip().split()[0])
except Exception:
pass
elif line.startswith("MemAvailable:"):
try:
avail_kb = float(line.split(":")[1].strip().split()[0])
except Exception:
pass
if total_kb is None or avail_kb is None:
logger.error(f"[内存监测] {host} => 解析失败: {out.strip().splitlines()[0] if out.strip() else 'N/A'}")
return
total_mb = total_kb / 1024.0
used_mb = max(0.0, total_mb - (avail_kb / 1024.0))
st = MEM_STATE.setdefault(host, {'samples': 0, 'sum_used_mb': 0.0, 'peak_used_mb': 0.0, 'peak_ts': 0.0})
st['samples'] += 1
st['sum_used_mb'] += used_mb
if used_mb > st['peak_used_mb']:
st['peak_used_mb'] = used_mb
st['peak_ts'] = time.time()
avg_used = st['sum_used_mb'] / max(1, st['samples'])
peak_ts_human = ""
if st['peak_ts'] > 0:
ts_out, _, _ = ssh.exec(f"bash -lc \"date -d @{int(st['peak_ts'])} +%Y-%m-%d\\ %H:%M:%S 2>/dev/null || date +%Y-%m-%d\\ %H:%M:%S\"")
peak_ts_human = ts_out.strip()
logger.info(f"[内存监测] {host} 当前使用={used_mb:.0f}MB 总={total_mb:.0f}MB 平均={avg_used:.0f}MB 峰值={st['peak_used_mb']:.0f}MB@{peak_ts_human or 'N/A'}")
# ==== 4.4 MySQL connection-count state (per host) ====
MYSQL_STATE: dict[str, dict] = {}
# { host: { 'samples': int, 'sum_conn': int, 'peak_conn': int, 'peak_ts': float, 'last_total': int, 'last_ts': float } }
def monitor_mysql_once(
ssh: 'SSHClientWrapper',
host: str,
logger: logging.Logger,
window_seconds: int = 300,
min_burst_conn: int = 200,
rate_threshold_per_sec: float = 1.0
) -> None:
"""
采集 MySQL 连接数:
- 优先:检测 MySQL 容器名,进入容器内用 mysql/mysqladmin 查询 Threads_connected。
- 回退:在宿主机直接执行 mysqladmin 或 mysql(同样使用固定账号密码)。
- 记录当前、平均、峰值与峰值时间,并按窗口增量法判断是否“连接暴涨”。
"""
conn: Optional[int] = None
container = find_mysql_container(ssh)
if container:
conn = get_mysql_threads_connected_via_container(ssh, container)
if conn is None:
logger.error(f"[MySQL监测] {host} => 在容器 {container} 内获取连接数失败")
else:
logger.info(f"[MySQL监测] {host} => 未检测到 MySQL/MariaDB 容器,回退宿主机命令")
if conn is None:
auth = f" -u{MYSQL_USER} -p'{MYSQL_PASSWORD}'"
cmd_status = f"bash -lc \"mysqladmin{auth} status 2>/dev/null\""
out, _, code = ssh.exec(cmd_status)
if code == 0 and out.strip():
m = re.search(r"Threads:\s*(\d+)", out)
if m:
try:
conn = int(m.group(1))
except Exception:
conn = None
if conn is None:
auth = f" -u{MYSQL_USER} -p'{MYSQL_PASSWORD}'"
# Double quotes around the SQL survive the nested shell quoting (see cmd1 above)
cmd_threads = (
f"bash -lc \"mysql -ss{auth} -e 'SHOW STATUS LIKE \\\"Threads_connected\\\"' 2>/dev/null | tail -n1 | cut -f2\""
)
out2, _, code2 = ssh.exec(cmd_threads)
if code2 == 0 and out2.strip():
try:
conn = int(out2.strip())
except Exception:
conn = None
if conn is None:
logger.error(f"[MySQL监测] {host} => 获取连接数失败(请确认 Docker/MySQL 客户端与权限)")
return
now_out, _, _ = ssh.exec("bash -lc \"date +%s\"")
try:
now_ts = float(now_out.strip())
except Exception:
now_ts = time.time()
st = MYSQL_STATE.setdefault(host, {'samples': 0, 'sum_conn': 0, 'peak_conn': 0, 'peak_ts': 0.0, 'last_total': None, 'last_ts': None})
st['samples'] += 1
st['sum_conn'] += conn
if conn > st['peak_conn']:
st['peak_conn'] = conn
st['peak_ts'] = now_ts
avg_conn = int(st['sum_conn'] / max(1, st['samples']))
peak_ts_human = ""
if st['peak_ts'] > 0:
ts_out, _, _ = ssh.exec(f"bash -lc \"date -d @{int(st['peak_ts'])} +%Y-%m-%d\\ %H:%M:%S 2>/dev/null || date +%Y-%m-%d\\ %H:%M:%S\"")
peak_ts_human = ts_out.strip()
logger.info(f"[MySQL监测] {host} 当前连接数={conn} 平均={avg_conn} 峰值={st['peak_conn']}@{peak_ts_human or 'N/A'}")
# Burst judgment (incremental window method)
if st['last_total'] is None or st['last_ts'] is None:
# First sample: establish the baseline
st['last_total'] = conn
st['last_ts'] = now_ts
return
elapsed = max(1.0, now_ts - st['last_ts'])
delta_conn = max(0, conn - st['last_total'])
if elapsed < window_seconds:
return
# A full window has elapsed: judge this window, then reset the baseline for the next one
st['last_total'] = conn
st['last_ts'] = now_ts
rate = delta_conn / elapsed
start_ts_out, _, _ = ssh.exec(f"bash -lc \"date -d @{int(now_ts - elapsed)} +%Y-%m-%d\\ %H:%M:%S 2>/dev/null || date +%Y-%m-%d\\ %H:%M:%S\"")
end_ts_out, _, _ = ssh.exec(f"bash -lc \"date -d @{int(now_ts)} +%Y-%m-%d\\ %H:%M:%S 2>/dev/null || date +%Y-%m-%d\\ %H:%M:%S\"")
if delta_conn >= min_burst_conn or rate >= rate_threshold_per_sec:
logger.warning(f"[MySQL连接暴涨] {host} 窗口={elapsed:.0f}s 增量={delta_conn} 速率={rate:.2f}/s 时间段=[{start_ts_out.strip()} ~ {end_ts_out.strip()}]")
else:
logger.info(f"[MySQL暴涨审计] {host} 窗口={elapsed:.0f}s 增量={delta_conn} 速率={rate:.2f}/s 未发现暴涨")
# Global MySQL credentials (per PRD). Defined at module level; the functions above
# only read them at call time, so the definition order is safe.
MYSQL_USER = "root"
MYSQL_PASSWORD = "dNrprU&2S"
def main():
logger = setup_logger()
servers = [
ServerConfig(host="192.168.5.48", username="root", password="Ubains@1234"),
ServerConfig(host="192.168.5.44", username="root", password="Ubains@123"), # 修正为与PRD一致
]
interval_seconds = 60
threads: List[threading.Thread] = []
for cfg in servers:
t = threading.Thread(target=monitor_host, args=(cfg, interval_seconds, logger), daemon=True)
t.start()
threads.append(t)
logger.info(f"[启动监测线程] {cfg.host}")
try:
while True:
time.sleep(3600)
except KeyboardInterrupt:
logger.info("[退出] 收到中断信号,程序将退出")
if __name__ == "__main__":
main()
# Service Self-Check Requirements Document
## 📋 Overview
This script monitors the target servers while automated tests are running, collecting service information from those servers for the duration of the run.
### Background
Service self-checks must distinguish two platform environments:
- **New unified platform**: uses the `/data/` directory structure
- **Legacy platform**: uses the `/var/www/` directory structure
## 🎯 Feature Overview
### Service monitoring (`AutomatedServiceMonitoring.py`)
Script path: E:\GithubData\自动化\ubains-module-test\辅助工具\脚本工具\自动化服务监测\AutomatedServiceMonitoring.py
#### Monitoring requirements
##### 1. SSH connection (✅ implemented)
Supports a preset server list:
1. 192.168.5.48 root Ubains@1234
2. 192.168.5.44 root Ubains@1234
##### 2. Platform identification (✅ implemented)
Automatically detect the target server's platform type: if the /data/services directory exists it is the new unified platform, otherwise it is the legacy platform.
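In code the check reduces to a single remote test, mirroring `detect_platform` in the script (`ssh` here is assumed to be a connected `SSHClientWrapper`):

```python
# Platform detection in one call: /data/services present => new unified platform.
is_new = ssh.exists("/data/services")   # runs: test -d '/data/services'
base_path = "/data/services" if is_new else "/var/www"
```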
##### 3. System identification (✅ implemented)
Automatically detect which business systems run on the target server. Three container types are checked: ujava, upython, and upython_voice. ujava maps to the meeting reservation system, upython to the ops central-control system, and upython_voice to the transcription system.
##### 4. Service monitoring (partially implemented)
4.1 Multiple connections (✅ implemented)
Supports monitoring multiple servers concurrently.
4.2 Log audit (✅ implemented)
- Continuously audit, per platform type, whether log output bursts within a time window (a worked example follows this list):
  - Each sample reads the log's total line count total_lines and the sample time now_ts.
  - Compare with the previous sample's last_total and last_ts to obtain elapsed and delta_lines.
  - Judge only once elapsed ≥ window_seconds (default 300 s):
    - if delta_lines ≥ min_lines_threshold (default 1000 lines), report a burst;
    - or if delta_lines/elapsed ≥ rate_threshold_per_sec (default 5 lines/s), report a burst.
  - Record the burst window as [previous sample time ~ current sample time], converting each epoch to human-readable time with the server's `date -d @epoch`.
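As a concrete illustration of the two thresholds (the sample numbers below are made up; the thresholds are the defaults above):

```python
# Worked example of the burst rule with the default thresholds.
window_seconds = 300            # judge only after a full 5-minute window
min_lines_threshold = 1000      # absolute growth threshold (lines)
rate_threshold_per_sec = 5.0    # rate threshold (lines/s)

elapsed = 320.0                 # seconds since the previous baseline sample
delta_lines = 1200              # new log lines observed in that window

rate = delta_lines / elapsed    # 3.75 lines/s, below the rate threshold
is_burst = delta_lines >= min_lines_threshold or rate >= rate_threshold_per_sec
print(f"{rate:.2f} lines/s, burst={is_burst}")   # 3.75 lines/s, burst=True
```

Here the absolute threshold (1200 ≥ 1000) triggers the burst report even though the rate threshold does not.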
- Record the occurrence window of every ERROR entry together with the 50 lines of context above and below it (a usage sketch follows this list).
  New platform:
    Meeting reservation system:
      1. 2.0 internal backend log:
         Path: /data/services/api/java-meeting/java-meeting2.0
         Tail: tail -f logs/ubains-INFO-AND-ERROR.log
      2. 3.0 internal backend log:
         Path: /data/services/api/java-meeting/java-meeting3.0
         Tail: tail -f logs/ubains-INFO-AND-ERROR.log
  Legacy platform:
    Meeting reservation system:
      1. 2.0 internal backend log:
         Path: /var/www/java/api-java-meeting2.0
         Tail: tail -f logs/ubains-INFO-AND-ERROR.log
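A minimal one-shot usage sketch of the audit entry points, assuming the script is importable as a module named `AutomatedServiceMonitoring` (the module name is an assumption):

```python
# One-shot log audit against a single host, reusing the script's own helpers.
from AutomatedServiceMonitoring import (
    ServerConfig, SSHClientWrapper, detect_platform, detect_systems,
    monitor_logs_once, setup_logger,
)

logger = setup_logger()
cfg = ServerConfig(host="192.168.5.48", username="root", password="Ubains@1234")
ssh = SSHClientWrapper(cfg)
ssh.connect()
try:
    platform = detect_platform(ssh)           # /data/services vs /var/www
    systems = detect_systems(ssh, platform)   # ujava / upython / upython_voice
    monitor_logs_once(ssh, platform, systems, logger, host=cfg.host)
finally:
    ssh.close()
```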
4.3 Memory consumption (✅ implemented)
Continuously monitor server memory usage per platform type; record the peak value, the time of the peak, and the running average (a sketch of the computation follows).
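The script performs this computation over SSH in `monitor_mem_once`; the same arithmetic, run directly on the target host, looks like this minimal sketch:

```python
# used MB = MemTotal - MemAvailable, both read from /proc/meminfo in kB.
meminfo = {}
with open("/proc/meminfo") as f:
    for line in f:
        key, _, rest = line.partition(":")
        meminfo[key] = float(rest.split()[0])   # value in kB

total_mb = meminfo["MemTotal"] / 1024.0
used_mb = max(0.0, total_mb - meminfo["MemAvailable"] / 1024.0)
print(f"used={used_mb:.0f}MB total={total_mb:.0f}MB")
```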
4.4 MySQL connection count (✅ implemented)
Continuously monitor, per platform type, whether the MySQL connection count bursts within a time window, or whether abnormal connections stay open indefinitely.
Detect the MySQL container name first, then query from inside the container (the probe is shown standalone below).
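The two-step probe, shown standalone. The container name is discovered at runtime by `find_mysql_container`; `ssh` is a connected `SSHClientWrapper` as in the sketch above, and the password is elided here:

```python
# Step 1: list running containers, pick one whose name/image mentions mysql/mariadb.
out, _, _ = ssh.exec("docker ps --format '{{.Names}} {{.Image}}'")
container = next(
    (line.split()[0] for line in out.splitlines()
     if any(k in line.lower() for k in ("mysql", "mariadb"))),
    None,
)

# Step 2: query Threads_connected inside the container.
if container:
    cmd = (f"docker exec -i {container} mysql -ss -uroot -p'***' "
           "-e \"SHOW STATUS LIKE 'Threads_connected'\" | tail -n1 | cut -f2")
    out, _, code = ssh.exec(cmd)
    conn = int(out.strip()) if code == 0 and out.strip() else None
```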
4.5 EMQX connection count (⏳ not yet implemented)
Continuously monitor, per platform type, the EMQX connection count's peak and average, watch for bursts, and flag abnormal connections that never disconnect. A hedged sketch follows.
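Since 4.5 is not implemented yet, here is a hedged sketch following the MySQL pattern above. It assumes an EMQX container whose name or image contains `emqx`, and that `emqx_ctl broker stats` inside the container prints a line such as `connections.count : 42`; verify both assumptions against the deployed EMQX version before relying on this:

```python
import re
from typing import Optional

def get_emqx_connections(ssh: "SSHClientWrapper") -> Optional[int]:
    # Find an EMQX container by name/image keyword (mirrors find_mysql_container).
    out, _, code = ssh.exec("docker ps --format '{{.Names}} {{.Image}}'")
    if code != 0 or not out.strip():
        return None
    container = next(
        (line.split()[0] for line in out.splitlines() if "emqx" in line.lower()),
        None,
    )
    if not container:
        return None
    # Ask the broker for its stats and parse the connection count.
    out, _, code = ssh.exec(f"docker exec -i {container} emqx_ctl broker stats")
    if code != 0:
        return None
    m = re.search(r"connections\.count\s*:\s*(\d+)", out)
    return int(m.group(1)) if m else None
```

Peak, average, and burst tracking could then reuse the same per-host state pattern as `MYSQL_STATE` and `monitor_mysql_once`.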
##### 5. Monitoring log output
The monitoring log system must be comprehensive, and logs must be printed in Chinese.