Commit a048a53a authored by 陈泽健

chore(script): remove the automated service monitoring script and the problem-handling document

- Deleted the AutomatedServiceMonitoring.py automated service monitoring script
- Removed the CommonProblemHandling/PRD_问题处理文档.md common-problem handling document
- Cleaned up the related server-monitoring and version-update code
- Removed the MySQL connection-count monitoring and log-burst detection logic
- Deleted the system permission-repair and config-file handling features
- Removed the DingTalk robot message-sending integration code

Parent 69c44e4b
import base64
import hashlib
import hmac
import json
import urllib.parse
import paramiko
import socket
import re
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
import logging
from logging.handlers import RotatingFileHandler
import threading
import requests

@dataclass
class ServerConfig:
    host: str
    username: str
    password: str
    port: int = 22


@dataclass
class PlatformInfo:
    is_new_platform: bool  # True => /data/services exists; False => legacy /var/www
    base_path: str         # '/data/services' or '/var/www'


@dataclass
class SystemInfo:
    has_ujava: bool
    has_upython: bool
    has_upython_voice: bool
    systems: List[str]  # e.g., ['meeting', 'ops', 'transcription']

class SSHClientWrapper:
    def __init__(self, config: ServerConfig, timeout: int = 10):
        self.config = config
        self.timeout = timeout
        self.client: Optional[paramiko.SSHClient] = None

    def connect(self) -> None:
        # Close any previous client before reconnecting
        self.close()
        self.client = paramiko.SSHClient()
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.client.connect(
            hostname=self.config.host,
            port=self.config.port,
            username=self.config.username,
            password=self.config.password,
            timeout=self.timeout,
            allow_agent=False,
            look_for_keys=False,
        )
        # Enable TCP keepalive on the underlying socket and Paramiko keepalive
        transport = self.client.get_transport()
        if transport:
            transport.set_keepalive(30)  # SSH-level keepalive every 30s
            # Try to set OS-level TCP keepalive
            sock = transport.sock
            try:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                # Optional platform-specific tuning (Linux)
                # sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 60)
                # sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30)
                # sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
            except Exception:
                pass

    def exec(self, command: str) -> Tuple[str, str, int]:
        if not self.client:
            raise RuntimeError("SSH client not connected.")
        stdin, stdout, stderr = self.client.exec_command(command, timeout=self.timeout)
        out = stdout.read().decode(errors="ignore")
        err = stderr.read().decode(errors="ignore")
        code = stdout.channel.recv_exit_status()
        return out, err, code

    def exists(self, path: str) -> bool:
        # Use test -d for directories; test -e for general existence if needed
        _, _, code = self.exec(f"test -d '{path}'")
        return code == 0

    def close(self) -> None:
        try:
            if self.client:
                self.client.close()
        finally:
            self.client = None
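
# A minimal usage sketch for SSHClientWrapper (the host and credentials below are
# hypothetical, shown only for illustration):
#
#     cfg = ServerConfig(host="192.0.2.10", username="root", password="***")
#     ssh = SSHClientWrapper(cfg)
#     ssh.connect()
#     out, err, code = ssh.exec("uptime")
#     print(code, out.strip())
#     ssh.close()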

def detect_platform(ssh: SSHClientWrapper) -> PlatformInfo:
    # New platform if /data/services exists; else legacy
    if ssh.exists("/data/services"):
        return PlatformInfo(is_new_platform=True, base_path="/data/services")
    return PlatformInfo(is_new_platform=False, base_path="/var/www")

def detect_systems(ssh: SSHClientWrapper, platform: PlatformInfo) -> SystemInfo:
    # Detect containers/services: ujava, upython, upython_voice
    # We try typical locations under both platforms.
    paths_to_check = [
        "/data/services",      # new
        "/data/services/api",  # new
        "/var/www",            # legacy
        "/var/www/java",       # legacy
        "/var/www/python",     # legacy
    ]

    def any_glob_exists(name: str) -> bool:
        # Check common directories where container markers may appear
        for base in paths_to_check:
            # Use 'ls -d' with a glob and test success; globstar is needed for '**'
            out, _, code = ssh.exec(f"bash -lc \"shopt -s globstar; ls -d {base}/**/{name}* 2>/dev/null | head -n 1\"")
            if code == 0 and out.strip():
                return True
        # Also check docker/container listings for names if applicable
        out, _, _ = ssh.exec("bash -lc \"docker ps --format '{{.Names}}' 2>/dev/null || true\"")
        if name in out:
            return True
        return False

    has_ujava = any_glob_exists("ujava")
    has_upython = any_glob_exists("upython")
    has_upython_voice = any_glob_exists("upython_voice")
    systems = []
    if has_ujava:
        systems.append("meeting")        # meeting-booking system (会议预定系统)
    if has_upython:
        systems.append("ops")            # ops central-control system (运维集控系统)
    if has_upython_voice:
        systems.append("transcription")  # transcription system (转录系统)
    return SystemInfo(
        has_ujava=has_ujava,
        has_upython=has_upython,
        has_upython_voice=has_upython_voice,
        systems=systems,
    )
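
# For example, a host where both a "ujava" and a "upython" marker are found would be
# reported as SystemInfo(systems=['meeting', 'ops']), per the mapping above.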

def setup_logger() -> logging.Logger:
    logger = logging.getLogger("自动化服务监测")
    if logger.handlers:
        return logger
    logger.setLevel(logging.INFO)
    fmt = logging.Formatter("[%(asctime)s] %(levelname)s %(message)s")
    # Console handler (Chinese-language log lines)
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)
    ch.setFormatter(fmt)
    logger.addHandler(ch)
    # Rotating file log: 10 MB per file, keep 5 backups
    fh = RotatingFileHandler("AutomatedServiceMonitoring.log", maxBytes=10 * 1024 * 1024, backupCount=5, encoding="utf-8")
    fh.setLevel(logging.INFO)
    fh.setFormatter(fmt)
    logger.addHandler(fh)
    return logger

# ==== MySQL container detection and in-container query helpers ====
def find_mysql_container(ssh: 'SSHClientWrapper') -> Optional[str]:
    """
    Detect the MySQL/MariaDB container and return the first matching container name.
    Match rule: the container name or image name contains the keyword mysql/mariadb
    (case-insensitive).
    """
    cmd = "bash -lc \"docker ps --format '{{.Names}} {{.Image}}' 2>/dev/null\""
    out, _, code = ssh.exec(cmd)
    if code != 0 or not out.strip():
        return None
    cand = None
    for line in out.splitlines():
        parts = line.strip().split(None, 1)
        if not parts:
            continue
        name = parts[0]
        image = parts[1] if len(parts) > 1 else ""
        low = f"{name} {image}".lower()
        if any(k in low for k in ["mysql", "mariadb"]):
            cand = name
            break
    return cand

def get_mysql_threads_connected_via_container(ssh: 'SSHClientWrapper', container: str) -> Optional[int]:
    """
    Run the query inside the container. Prefer `mysql -ss` (unformatted output is
    simpler to parse); on failure, fall back to `mysqladmin status`.
    Uses the global fixed account/password.
    """
    auth = f" -u{MYSQL_USER} -p'{MYSQL_PASSWORD}'"
    # -ss strips the table formatting; cut extracts the second column (the value).
    # The SQL is wrapped in escaped double quotes so the single-quoted
    # 'Threads_connected' literal survives both shell layers.
    cmd1 = (
        f"bash -lc \"docker exec -i {container} mysql -ss{auth} "
        f"-e \\\"SHOW STATUS LIKE 'Threads_connected'\\\" 2>/dev/null | tail -n1 | cut -f2\""
    )
    out1, _, code1 = ssh.exec(cmd1)
    if code1 == 0 and out1.strip():
        try:
            return int(out1.strip())
        except Exception:
            pass
    cmd2 = f"bash -lc \"docker exec -i {container} mysqladmin{auth} status 2>/dev/null\""
    out2, _, code2 = ssh.exec(cmd2)
    if code2 == 0 and out2.strip():
        m = re.search(r"Threads:\s*(\d+)", out2)
        if m:
            try:
                return int(m.group(1))
            except Exception:
                pass
    return None
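
# For reference, `mysqladmin status` prints a single line along the lines of
# "Uptime: 436  Threads: 5  Questions: 103 ..." (exact fields vary by version);
# the Threads regex above matches that field. The sample values here are made up.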

# monitor_logs_once is invoked from each server's monitoring loop (burst detection
# and ERROR auditing are handled separately).
def monitor_host(cfg: ServerConfig, interval_seconds: int, logger: logging.Logger):
    ssh = SSHClientWrapper(cfg)
    last_result: Optional[dict] = None
    while True:
        try:
            if ssh.client is None:
                logger.info(f"[连接] 正在连接服务器 {cfg.host}")
                ssh.connect()
            try:
                _, _, code = ssh.exec("true")
                if code != 0:
                    raise RuntimeError("SSH存活检查失败")
            except Exception:
                logger.info(f"[重连] 服务器 {cfg.host} 连接异常,尝试重连")
                ssh.connect()
            platform = detect_platform(ssh)
            systems = detect_systems(ssh, platform)
            current = {
                "host": cfg.host,
                "platform": "新统一平台(/data/services)" if platform.is_new_platform else "传统平台(/var/www)",
                "base_path": platform.base_path,
                "systems": systems.systems,
                "has_ujava": systems.has_ujava,
                "has_upython": systems.has_upython,
                "has_upython_voice": systems.has_upython_voice,
            }
            if last_result != current:
                logger.info(f"[平台识别] {cfg.host} => {current['platform']}")
                logger.info(f"[系统识别] {cfg.host} => ujava={systems.has_ujava}, upython={systems.has_upython}, upython_voice={systems.has_upython_voice}, 映射={systems.systems or ['无']}")
                last_result = current
            logger.info(f"[心跳] {cfg.host}: 平台={'新' if platform.is_new_platform else '旧'} 基路径={platform.base_path} 系统={','.join(systems.systems) if systems.systems else '无'}")
            # 4.2 Log auditing (burst + ERROR)
            try:
                monitor_logs_once(ssh, platform, systems, logger, host=cfg.host)
            except Exception as e:
                logger.error(f"[日志审计错误] {cfg.host} => {e}")
            # 4.3 Memory usage
            try:
                monitor_mem_once(ssh, cfg.host, logger)
            except Exception as e:
                logger.error(f"[内存监测错误] {cfg.host} => {e}")
            # 4.4 MySQL connection count
            try:
                monitor_mysql_once(ssh, cfg.host, logger, window_seconds=300, min_burst_conn=200, rate_threshold_per_sec=1.0)
            except Exception as e:
                logger.error(f"[MySQL监测错误] {cfg.host} => {e}")
        except paramiko.AuthenticationException as e:
            logger.error(f"[认证错误] {cfg.host} => {e},稍后重试")
        except Exception as e:
            logger.error(f"[错误] {cfg.host} => {e},将尝试重连")
            ssh.close()
        # Single pause per iteration, on both the normal and the error paths
        time.sleep(interval_seconds)

def resolve_log_targets(platform: PlatformInfo, systems: List[str]) -> List[Tuple[str, str]]:
    """
    Return the list of log files to audit: [(system name, absolute log path), ...].
    Per the PRD, this currently covers only the meeting-booking system's 2.0 / 3.0
    backend logs.
    """
    targets: List[Tuple[str, str]] = []
    if "meeting" in systems:
        if platform.is_new_platform:
            targets.append(("meeting-2.0", f"{platform.base_path}/api/java-meeting/java-meeting2.0/logs/ubains-INFO-AND-ERROR.log"))
            targets.append(("meeting-3.0", f"{platform.base_path}/api/java-meeting/java-meeting3.0/logs/ubains-INFO-AND-ERROR.log"))
        else:
            targets.append(("meeting-2.0", "/var/www/java/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log"))
    return targets

def fetch_error_context(
    ssh: SSHClientWrapper,
    log_path: str,
    max_context: int = 50,
    max_sections: int = 3
) -> Tuple[str, int, Tuple[str, str]]:
    """
    Grab the context around the most recent ERROR entries (max_context lines above
    and below each) and return:
    - the context text (possibly long)
    - the ERROR count within the last 5000 lines
    - the time window (start_ts, end_ts), estimated from the first and last
      timestamps found in the context
    """
    _, _, code = ssh.exec(f"bash -lc \"test -f '{log_path}'\"")
    if code != 0:
        return "", 0, ("", "")
    # Pull the last 5000 lines once and do the rest locally; this avoids the
    # fragile nested shell quoting an awk/sed pipeline would otherwise need.
    tail_out, _, _ = ssh.exec(f"bash -lc \"tail -n 5000 '{log_path}'\"")
    if not tail_out.strip():
        return "", 0, ("", "")
    lines = tail_out.splitlines()
    error_idx = [i for i, line in enumerate(lines) if "ERROR" in line]
    count = len(error_idx)
    if count == 0:
        return "", 0, ("", "")
    # Build context sections for the last max_sections ERROR lines.
    sections = []
    for i in error_idx[-max_sections:]:
        start = max(0, i - max_context)
        end = min(len(lines), i + max_context + 1)
        sections.append("\n".join(lines[start:end]))
        sections.append(f"\n---- 上下文分隔线 (行号: {i + 1}) ----\n")
    ctx = "\n".join(sections).strip()
    # Parse timestamps of the form YYYY-MM-DD HH:MM:SS out of the context locally.
    ts_pattern = re.compile(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})")
    ts_matches = ts_pattern.findall(ctx)
    start_ts = ts_matches[0] if ts_matches else ""
    end_ts = ts_matches[-1] if ts_matches else ""
    return ctx, count, (start_ts, end_ts)

def monitor_log_errors_once(
    ssh: SSHClientWrapper,
    sys_name: str,
    log_path: str,
    logger: logging.Logger,
    max_context: int = 50,
    host: str = ""
) -> None:
    """
    Watch for ERROR entries and record the time window covering the surrounding
    50 lines. Independent of "log burst" detection; this only looks at ERRORs.
    """
    ctx, err_count, (start_ts, end_ts) = fetch_error_context(ssh, log_path, max_context=max_context, max_sections=3)
    if err_count == 0:
        logger.info(f"[ERROR审计] 主机={host} 系统={sys_name} 日志={log_path} 最近段落未发现 ERROR")
        return
    # Record the time window and the context
    if start_ts or end_ts:
        logger.warning(f"[ERROR出现] 主机={host} 系统={sys_name} 日志={log_path} 最近5000行 ERROR数量={err_count} 发生时间段=[{start_ts} ~ {end_ts}]")
    else:
        logger.warning(f"[ERROR出现] 主机={host} 系统={sys_name} 日志={log_path} 最近5000行 ERROR数量={err_count}")
    if ctx:
        logger.info(f"[ERROR上下文] 主机={host} 系统={sys_name} 日志={log_path}\n{ctx}")

# Incremental state for log-burst detection (previous total line count and sample time)
BURST_STATE: Dict[Tuple[str, str], Tuple[int, float]] = {}
# key = (host, log_path) => (last_total_lines, last_timestamp_epoch)

# Burst detection uses a "windowed line-count delta" method and does not depend on
# parsing timestamps out of the log itself.
def monitor_log_burst_once(
    ssh: SSHClientWrapper,
    host: str,
    sys_name: str,
    log_path: str,
    logger: logging.Logger,
    window_seconds: int = 300,
    min_lines_threshold: int = 1000,
    rate_threshold_per_sec: float = 5.0  # lines-per-second threshold, e.g. 5 lines/s
) -> None:
    # Make sure the file exists
    _, _, code = ssh.exec(f"bash -lc \"test -f '{log_path}'\"")
    if code != 0:
        logger.info(f"[日志暴涨审计] 主机={host} 系统={sys_name} 日志文件不存在:{log_path}")
        return
    out, _, _ = ssh.exec(f"bash -lc \"wc -l < '{log_path}' || echo 0\"")
    try:
        total_lines = int(out.strip())
    except Exception:
        logger.info(f"[日志暴涨审计] 主机={host} 系统={sys_name} 无法获取总行数")
        return
    now_out, _, _ = ssh.exec("bash -lc \"date +%s\"")
    try:
        now_ts = float(now_out.strip())
    except Exception:
        now_ts = time.time()
    key = (host, log_path)
    last = BURST_STATE.get(key)
    if not last:
        BURST_STATE[key] = (total_lines, now_ts)
        logger.info(f"[日志暴涨审计] 主机={host} 系统={sys_name} 初始化窗口,总行数={total_lines}")
        return
    last_total, last_ts = last
    elapsed = max(1.0, now_ts - last_ts)
    delta_lines = max(0, total_lines - last_total)
    # Save state for the next round
    BURST_STATE[key] = (total_lines, now_ts)
    if elapsed < window_seconds:
        logger.info(f"[日志暴涨审计] 主机={host} 系统={sys_name} 累积中:窗口{elapsed:.0f}s 未达 {window_seconds}s,新增行数={delta_lines}")
        return
    rate = delta_lines / elapsed
    time_info = f"窗口={elapsed:.0f}s 新增行数={delta_lines} 速率={rate:.2f}行/秒"
    # Record the window's time span (previous sample to now); quote the date format
    # so the embedded space survives the shell
    start_ts_out, _, _ = ssh.exec(f"bash -lc \"date -d @{int(last_ts)} '+%Y-%m-%d %H:%M:%S' 2>/dev/null || date '+%Y-%m-%d %H:%M:%S'\"")
    end_ts_out, _, _ = ssh.exec(f"bash -lc \"date -d @{int(now_ts)} '+%Y-%m-%d %H:%M:%S' 2>/dev/null || date '+%Y-%m-%d %H:%M:%S'\"")
    if delta_lines >= min_lines_threshold or rate >= rate_threshold_per_sec:
        logger.warning(f"[日志打印暴涨] 主机={host} 系统={sys_name} 日志={log_path} {time_info},时间段=[{start_ts_out.strip()} ~ {end_ts_out.strip()}]")
    else:
        logger.info(f"[日志暴涨审计] 主机={host} 系统={sys_name} 日志={log_path} {time_info},未发现暴涨")

# Update: the aggregated check passes host through so BURST_STATE entries stay per-host.
def monitor_logs_once(ssh: SSHClientWrapper, platform: PlatformInfo, systems: SystemInfo, logger: logging.Logger, host: str = "") -> None:
    """
    One aggregated log-monitoring pass: run the "log burst" check and the
    "ERROR context" check separately.
    """
    targets = resolve_log_targets(platform, systems.systems)
    if not targets:
        logger.info("[日志审计] 未匹配到需要审计的日志目标")
        return
    for sys_name, log_path in targets:
        monitor_log_burst_once(
            ssh=ssh, host=host, sys_name=sys_name, log_path=log_path, logger=logger,
            window_seconds=300, min_lines_threshold=1000, rate_threshold_per_sec=5.0,
        )
        monitor_log_errors_once(
            ssh=ssh, sys_name=sys_name, log_path=log_path, logger=logger, max_context=50, host=host,
        )

# ==== 4.3 Memory usage state (per host) ====
MEM_STATE: Dict[str, dict] = {}
# { host: { 'samples': int, 'sum_used_mb': float, 'peak_used_mb': float, 'peak_ts': float } }


def monitor_mem_once(ssh: SSHClientWrapper, host: str, logger: logging.Logger) -> None:
    """
    Sample memory usage from /proc/meminfo (parsed on the Python side to avoid
    shell-pipeline differences):
    - read MemTotal and MemAvailable (in kB) and convert to MB
    - track sample count, average, peak, and peak time
    """
    out, _, code = ssh.exec("bash -lc \"cat /proc/meminfo\"")
    if code != 0 or not out:
        logger.error(f"[内存监测] {host} => 获取 /proc/meminfo 失败")
        return
    total_kb = None
    avail_kb = None
    for line in out.splitlines():
        if line.startswith("MemTotal:"):
            # e.g. "MemTotal: 15199764 kB"
            try:
                total_kb = float(line.split(":")[1].strip().split()[0])
            except Exception:
                pass
        elif line.startswith("MemAvailable:"):
            try:
                avail_kb = float(line.split(":")[1].strip().split()[0])
            except Exception:
                pass
    if total_kb is None or avail_kb is None:
        logger.error(f"[内存监测] {host} => 解析失败: {out.strip().splitlines()[0] if out.strip() else 'N/A'}")
        return
    total_mb = total_kb / 1024.0
    used_mb = max(0.0, total_mb - (avail_kb / 1024.0))
    st = MEM_STATE.setdefault(host, {'samples': 0, 'sum_used_mb': 0.0, 'peak_used_mb': 0.0, 'peak_ts': 0.0})
    st['samples'] += 1
    st['sum_used_mb'] += used_mb
    if used_mb > st['peak_used_mb']:
        st['peak_used_mb'] = used_mb
        st['peak_ts'] = time.time()
    avg_used = st['sum_used_mb'] / max(1, st['samples'])
    peak_ts_human = ""
    if st['peak_ts'] > 0:
        ts_out, _, _ = ssh.exec(f"bash -lc \"date -d @{int(st['peak_ts'])} '+%Y-%m-%d %H:%M:%S' 2>/dev/null || date '+%Y-%m-%d %H:%M:%S'\"")
        peak_ts_human = ts_out.strip()
    logger.info(f"[内存监测] {host} 当前使用={used_mb:.0f}MB 总={total_mb:.0f}MB 平均={avg_used:.0f}MB 峰值={st['peak_used_mb']:.0f}MB@{peak_ts_human or 'N/A'}")

# ==== 4.4 MySQL connection-count state (per host) ====
MYSQL_STATE: Dict[str, dict] = {}
# { host: { 'samples': int, 'sum_conn': int, 'peak_conn': int, 'peak_ts': float, 'last_total': int, 'last_ts': float } }

def monitor_mysql_once(
    ssh: 'SSHClientWrapper',
    host: str,
    logger: logging.Logger,
    window_seconds: int = 300,
    min_burst_conn: int = 200,
    rate_threshold_per_sec: float = 1.0
) -> None:
    """
    Sample the MySQL connection count:
    - Preferred: detect the MySQL container and query Threads_connected inside it
      via mysql/mysqladmin.
    - Fallback: run mysqladmin or mysql directly on the host (same fixed credentials).
    - Track current/average/peak values and peak time, and use the windowed-delta
      method to decide whether connections are "bursting".
    """
    conn: Optional[int] = None
    container = find_mysql_container(ssh)
    if container:
        conn = get_mysql_threads_connected_via_container(ssh, container)
        if conn is None:
            logger.error(f"[MySQL监测] {host} => 在容器 {container} 内获取连接数失败")
    else:
        logger.info(f"[MySQL监测] {host} => 未检测到 MySQL/MariaDB 容器,回退宿主机命令")
    if conn is None:
        auth = f" -u{MYSQL_USER} -p'{MYSQL_PASSWORD}'"
        cmd_status = f"bash -lc \"mysqladmin{auth} status 2>/dev/null\""
        out, _, code = ssh.exec(cmd_status)
        if code == 0 and out.strip():
            m = re.search(r"Threads:\s*(\d+)", out)
            if m:
                try:
                    conn = int(m.group(1))
                except Exception:
                    conn = None
    if conn is None:
        auth = f" -u{MYSQL_USER} -p'{MYSQL_PASSWORD}'"
        # As in the container path, wrap the SQL in escaped double quotes so the
        # 'Threads_connected' literal survives both shell layers
        cmd_threads = (
            f"bash -lc \"mysql -ss{auth} "
            f"-e \\\"SHOW STATUS LIKE 'Threads_connected'\\\" 2>/dev/null | tail -n1 | cut -f2\""
        )
        out2, _, code2 = ssh.exec(cmd_threads)
        if code2 == 0 and out2.strip():
            try:
                conn = int(out2.strip())
            except Exception:
                conn = None
    if conn is None:
        logger.error(f"[MySQL监测] {host} => 获取连接数失败(请确认 Docker/MySQL 客户端与权限)")
        return
    now_out, _, _ = ssh.exec("bash -lc \"date +%s\"")
    try:
        now_ts = float(now_out.strip())
    except Exception:
        now_ts = time.time()
    st = MYSQL_STATE.setdefault(host, {'samples': 0, 'sum_conn': 0, 'peak_conn': 0, 'peak_ts': 0.0, 'last_total': None, 'last_ts': None})
    st['samples'] += 1
    st['sum_conn'] += conn
    if conn > st['peak_conn']:
        st['peak_conn'] = conn
        st['peak_ts'] = now_ts
    avg_conn = int(st['sum_conn'] / max(1, st['samples']))
    peak_ts_human = ""
    if st['peak_ts'] > 0:
        ts_out, _, _ = ssh.exec(f"bash -lc \"date -d @{int(st['peak_ts'])} '+%Y-%m-%d %H:%M:%S' 2>/dev/null || date '+%Y-%m-%d %H:%M:%S'\"")
        peak_ts_human = ts_out.strip()
    logger.info(f"[MySQL监测] {host} 当前连接数={conn} 平均={avg_conn} 峰值={st['peak_conn']}@{peak_ts_human or 'N/A'}")
    # Burst decision (windowed-delta method)
    if st['last_total'] is None or st['last_ts'] is None:
        st['last_total'] = conn
        st['last_ts'] = now_ts
        return
    elapsed = max(1.0, now_ts - st['last_ts'])
    delta_conn = max(0, conn - st['last_total'])
    st['last_total'] = conn
    st['last_ts'] = now_ts
    if elapsed >= window_seconds:
        rate = delta_conn / elapsed
        start_ts_out, _, _ = ssh.exec(f"bash -lc \"date -d @{int(now_ts - elapsed)} '+%Y-%m-%d %H:%M:%S' 2>/dev/null || date '+%Y-%m-%d %H:%M:%S'\"")
        end_ts_out, _, _ = ssh.exec(f"bash -lc \"date -d @{int(now_ts)} '+%Y-%m-%d %H:%M:%S' 2>/dev/null || date '+%Y-%m-%d %H:%M:%S'\"")
        if delta_conn >= min_burst_conn or rate >= rate_threshold_per_sec:
            logger.warning(f"[MySQL连接暴涨] {host} 窗口={elapsed:.0f}s 增量={delta_conn} 速率={rate:.2f}/s 时间段=[{start_ts_out.strip()} ~ {end_ts_out.strip()}]")
        else:
            logger.info(f"[MySQL暴涨审计] {host} 窗口={elapsed:.0f}s 增量={delta_conn} 速率={rate:.2f}/s 未发现暴涨")

def dingding_send_message(title, text):
    """
    Send a DingTalk custom-robot message.
    API reference: https://open.dingtalk.com/document/orgapp/custom-robots-send-group-messages#title-7fs-kgs-36x
    :param title: message title
    :param text: markdown message body
    """
    # Log that this function was invoked
    logging.info("开始构建并发送钉钉机器人消息")
    webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=7fbf40798cad98b1b5db55ff844ba376b1816e80c5777e6f47ae1d9165dacbb4'
    secret = 'SEC610498ed6261ae2df1d071d0880aaa70abf5e67efe47f75a809c1f2314e0dbd6'
    # Millisecond timestamp
    timestamp = str(round(time.time() * 1000))
    # Sign "timestamp\nsecret" with HMAC-SHA256, then base64- and URL-encode it
    secret_enc = secret.encode('utf-8')
    string_to_sign = f'{timestamp}\n{secret}'
    string_to_sign_enc = string_to_sign.encode('utf-8')
    hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
    # Build the final webhook URL
    params = {
        'access_token': webhook_url.split('=')[1],
        'timestamp': timestamp,
        'sign': sign
    }
    encoded_params = urllib.parse.urlencode(params)
    final_webhook_url = f'https://oapi.dingtalk.com/robot/send?{encoded_params}'
    # Log the final webhook URL
    logging.info(f"钉钉机器人Webhook URL: {final_webhook_url}")
    # Build the message body
    headers = {'Content-Type': 'application/json'}
    message = {
        'msgtype': 'markdown',
        'markdown': {
            'title': title,
            'text': text,
        }
    }
    try:
        # Send the POST request
        response = requests.post(final_webhook_url, data=json.dumps(message), headers=headers)
        # Check the HTTP status code
        if response.status_code == 200:
            logging.info('消息发送成功!')
            logging.info(f'响应内容: {response.text}')
        else:
            logging.error(f'消息发送失败,状态码: {response.status_code}')
            logging.error(f'响应内容: {response.text}')
    except requests.exceptions.RequestException as e:
        logging.error(f'请求异常: {e}')
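
# A minimal usage sketch (the alert text is illustrative):
#
#     dingding_send_message("服务监测告警", "**[MySQL连接暴涨]** 主机=192.168.5.48 增量=300")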

# Global MySQL credentials (per the PRD)
MYSQL_USER = "root"
MYSQL_PASSWORD = "dNrprU&2S"


def main():
    logger = setup_logger()
    servers = [
        ServerConfig(host="192.168.5.48", username="root", password="Ubains@1234"),
        ServerConfig(host="192.168.5.44", username="root", password="Ubains@123"),  # corrected to match the PRD
    ]
    interval_seconds = 60
    threads: List[threading.Thread] = []
    for cfg in servers:
        t = threading.Thread(target=monitor_host, args=(cfg, interval_seconds, logger), daemon=True)
        t.start()
        threads.append(t)
        logger.info(f"[启动监测线程] {cfg.host}")
    try:
        while True:
            time.sleep(3600)
    except KeyboardInterrupt:
        logger.info("[退出] 收到中断信号,程序将退出")


if __name__ == "__main__":
    main()
\ No newline at end of file