""" 服务日志监测系统 功能:通过SSH实时监控远程服务器日志,发现错误时收集上下文并保存到文件,通过钉钉发送通知 特点: 1. 多服务器多日志文件同时监控 2. 错误日志去重和冷却机制 3. 自动保存错误上下文到JSON文件 4. 每日自动清理告警缓存 """ import re import paramiko import threading import time import logging import sys import os import json import socket from datetime import datetime, timedelta # ==================== 初始化配置 ==================== # 配置日志输出到控制台,只显示INFO级别以上的信息 console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') console_handler.setFormatter(formatter) logging.getLogger().addHandler(console_handler) logging.getLogger().setLevel(logging.INFO) # 获取当前脚本所在目录 current_dir = os.path.dirname(os.path.abspath(__file__)) logging.info(f"当前脚本目录: {current_dir}") # 构建Base目录路径并添加到系统路径 base_dir = os.path.normpath(os.path.join(current_dir, "..", "Base")) sys.path.append(base_dir) logging.info(f"Base目录已添加到系统路径: {base_dir}") # ==================== 模块导入 ==================== try: from base import dingding_send_message logging.info("✅ 成功导入 base 模块") except ImportError as e: logging.error(f"❌ 导入失败: {e}") logging.info("sys.path 包含路径:") for p in sys.path: logging.info(f" - {p}") sys.exit(1) class LogMonitor: """ 日志监控类,负责单个日志文件的监控处理 """ def __init__(self, host, username, private_key_path, passphrase, log_path, check_interval=1, ding_type="标准版服务监测", resend_interval=10800): """ 初始化监控实例 :param host: 服务器IP :param username: SSH用户名 :param private_key_path: SSH私钥路径 :param passphrase: SSH密钥密码 :param log_path: 要监控的日志路径 :param check_interval: 检查间隔(秒) :param ding_type: 钉钉消息类型标识 :param resend_interval: 重复告警冷却时间(秒) """ self.host = host self.username = username self.private_key_path = private_key_path self.passphrase = passphrase self.log_path = log_path self.check_interval = check_interval self.ding_type = ding_type # SSH连接相关 self.client = None self.channel = None self.collecting = False # 线程安全锁 self.lock = threading.Lock() # 日志行缓冲区 self.line_buffer = [] self.buffer_size = 500 # 错误上下文收集 self.error_contexts = [] self.sent_errors = {} # 格式: {error_hash: last_send_timestamp} self.resend_interval = resend_interval # 启动每日清理任务 self.schedule_daily_clear() def connect(self): """建立SSH连接并开始tail -f日志""" try: # 加载私钥 private_key = paramiko.RSAKey.from_private_key_file( self.private_key_path, password=self.passphrase ) # 创建SSH客户端 self.client = paramiko.SSHClient() self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 连接参数设置 connect_params = { 'hostname': self.host, 'username': self.username, 'pkey': private_key, 'timeout': 30, 'banner_timeout': 200, 'auth_timeout': 200 } self.client.connect(**connect_params) # 创建交互式shell self.channel = self.client.invoke_shell() self.channel.setblocking(0) # 非阻塞模式 self.channel.transport.set_keepalive(30) # 保持连接 # 发送tail命令 self.channel.send(f"tail -f {self.log_path}\n") logging.info(f"已连接到 {self.host},开始监控 {self.log_path}") return True except Exception as e: logging.error(f"连接失败: {e},目标主机: {self.host}") return False def start_monitoring(self): """开始监控日志""" if self.collecting: logging.warning("监控已在进行中") return if not self.connect(): return self.collecting = True self.monitor_thread = threading.Thread(target=self._monitor_loop) self.monitor_thread.daemon = True # 设置为守护线程 self.monitor_thread.start() logging.info(f"监控线程已启动: {self.log_path}") def stop_monitoring(self): """停止监控""" self.collecting = False if self.channel: self.channel.close() if self.client: self.client.close() logging.info(f"已停止监控: {self.log_path}") def _monitor_loop(self): """ 监控主循环 持续读取日志数据并处理 """ retry_count = 0 MAX_RETRY = 5 while self.collecting: try: if self.channel.recv_ready(): # 读取日志数据 data = self.channel.recv(1024).decode('utf-8', errors='ignore') # 处理每行日志 for line in data.splitlines(): self._process_line(line.strip()) retry_count = 0 # 重置重试计数 else: time.sleep(self.check_interval) except (paramiko.SSHException, socket.error, OSError) as e: logging.warning(f"SSH连接异常: {e},尝试重连...") self.restart_monitoring() retry_count += 1 if retry_count > MAX_RETRY: logging.error("达到最大重试次数,停止监控") self.stop_monitoring() return time.sleep(min(5 * retry_count, 60)) # 退避算法 def save_error_contexts_to_json(self): """ 保存错误上下文到JSON文件 返回: 文件路径或None """ try: # 获取error_log目录路径 current_dir = os.path.dirname(os.path.abspath(__file__)) # 当前是 /系统监测/系统日志监测/ base_dir = os.path.dirname(current_dir) # 上溯一级到 /系统监测/ error_log_dir = os.path.join(base_dir, "error_log") # 最终路径 /系统监测/error_log/ os.makedirs(error_log_dir, exist_ok=True) # 生成带时间戳的文件名 timestamp = time.strftime("%Y-%m-%d-%H%M%S") filename = f"error_log_{timestamp}_{self.host.replace('.', '_')}.json" file_path = os.path.join(error_log_dir, filename) # 写入JSON文件 with open(file_path, 'w', encoding='utf-8') as f: json.dump(self.error_contexts, f, ensure_ascii=False, indent=4) logging.info(f"错误日志已保存: {file_path}") return file_path except Exception as e: logging.error(f"保存错误日志失败: {e}") return None def generate_error_log_url(self, file_path): """生成错误日志的公网访问URL""" if not file_path: return None filename = os.path.basename(file_path) return f"http://nat.ubainsyun.com:32233/{filename}" def _process_line(self, line): """ 处理单行日志 1. 维护行缓冲区 2. 解析日志行 3. 检查是否ERROR日志 4. 特殊过滤(JWT验证类错误直接忽略) 5. 生成去重key并检查冷却期 6. 获取上下文并保存到文件 7. 发送钉钉通知(非冷却期内) """ with self.lock: self.line_buffer.append(line) if len(self.line_buffer) > self.buffer_size: self.line_buffer.pop(0) try: # 解析日志行 parsed = self.parse_log_line(line) if not parsed or parsed['level'] != 'ERROR': return # ----- 新增过滤规则 ----- if parsed['module'] == 'JwtTokenUtiljwt工具类' and parsed['action'] == 'JWT格式验证': logging.info(f"忽略JWT验证类错误: {parsed['module']}/{parsed['action']}") return # ---------------------- # 生成去重key(过滤动态内容) clean_message = re.sub( r'\d{1,3}\\.\d{1,3}\\.\d{1,3}\\.\d{1,3}', '[IP]', parsed['message'] ) clean_message = re.sub(r'\d+', '[NUM]', clean_message) key = f"{parsed['module']}|{parsed['action']}|{clean_message}" error_hash = hash(key) current_time = time.time() # 检查冷却期 if error_hash in self.sent_errors: time_diff = current_time - self.sent_errors[error_hash] if time_diff < self.resend_interval: logging.info(f"冷却期内相同错误 [{parsed['module']}/{parsed['action']}]") return logging.info(f"发现新ERROR日志 [{parsed['module']}/{parsed['action']}]") # 获取日志上下文(只对新错误执行) full_log = self.get_remote_log_with_paramiko( host=self.host, username=self.username, private_key_path=self.private_key_path, passphrase=self.passphrase, log_path=self.log_path, num_lines=500 ) if not full_log: logging.error("获取日志上下文失败") return # 定位错误行并截取上下文 lines = full_log.split('\n') for i, l in enumerate(lines): if line.strip() in l.strip(): context = lines[max(0, i - 100):min(len(lines), i + 101)] with self.lock: self.error_contexts.append({ 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'host': self.host, 'log_path': self.log_path, 'error_line': line, 'context': context, 'fingerprint': key, 'structured': parsed }) # 保存到文件 file_path = self.save_error_contexts_to_json() if not file_path: return # 发送钉钉通知 error_log_url = self.generate_error_log_url(file_path) if error_log_url: try: dingding_send_message(error_log_url, ding_type=self.ding_type) self.sent_errors[error_hash] = current_time logging.info(f"钉钉通知发送成功 [{self.ding_type}]") except Exception as e: logging.error(f"钉钉发送失败: {str(e)}") break except Exception as e: logging.error(f"处理日志行异常: {str(e)}", exc_info=True) def restart_monitoring(self): """重启监控""" logging.info("尝试重启监控...") self.stop_monitoring() time.sleep(5) self.start_monitoring() @staticmethod def get_remote_log_with_paramiko(host, username, private_key_path, passphrase, log_path, num_lines=1000, timeout=30): """ 通过SSH获取远程日志内容 返回: 日志内容或None """ try: private_key = paramiko.RSAKey.from_private_key_file( private_key_path, password=passphrase ) client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(host, username=username, pkey=private_key, timeout=timeout) command = f"tail -n {num_lines} {log_path}" stdin, stdout, stderr = client.exec_command(command, timeout=timeout) if stderr.read().decode('utf-8'): raise Exception("远程命令执行失败") return stdout.read().decode('utf-8') except Exception as e: logging.error(f"获取远程日志失败: {e}") return None finally: if 'client' in locals(): client.close() def schedule_daily_clear(self): """定时每天凌晨清空已发送记录""" now = datetime.now() next_run = (now + timedelta(days=1)).replace( hour=0, minute=0, second=0, microsecond=0 ) delay = (next_run - now).total_seconds() timer = threading.Timer(delay, self.daily_clear_sent_errors) timer.daemon = True timer.start() def daily_clear_sent_errors(self): """清空已发送记录缓存""" with self.lock: self.sent_errors.clear() logging.info("✅ 已发送错误记录已清空") self.schedule_daily_clear() # 设置下一天任务 def parse_log_line(self, line): """ 解析日志行 返回: { 'level': 日志级别, 'module': 模块名, 'action': 操作名, 'message': 日志消息, 'raw': 原始日志 } 或 None """ # 匹配时间戳 timestamp_match = re.match(r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+)', line) # 匹配日志级别 level_match = re.search(r'\s(ERROR|INFO|WARNING)\b', line) # 匹配中括号内容 bracket_content = re.findall(r'\[(.*?)\]|\【(.*?)\】', line) bracket_content = [x[0] or x[1] for x in bracket_content if x[0] or x[1]] if not timestamp_match or not level_match or len(bracket_content) < 3: return None def remove_ip(text): """过滤IP地址""" return re.sub(r'\d{1,3}\\.\d{1,3}\\.\d{1,3}\\.\d{1,3}', '[IP]', text) return { 'level': level_match.group(1).strip(), 'module': remove_ip(bracket_content[0].strip()), 'action': remove_ip(bracket_content[1].strip()), 'message': remove_ip(bracket_content[2].strip()), 'raw': line.strip() } if __name__ == "__main__": """ 主程序入口 配置服务器列表并启动监控 """ # 服务器配置列表 SERVERS = [ { "host": "192.168.5.235", "username": "root", "private_key_path": "C:/Users/Administrator/.ssh/id_rsa", "passphrase": "Ubains@123", "logs": [ { "log_path": "/var/www/java/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log", "ding_type": "235标准预定对内服务" }, { "log_path": "/var/www/java/external-meeting-api/logs/ubains-INFO-AND-ERROR.log", "ding_type": "235标准预定对外服务" } ] }, { "host": "192.168.5.200", "username": "root", "private_key_path": "C:/Users/Administrator/.ssh/id_rsa", "passphrase": "Ubains@123", "logs": [ { "log_path": "/var/www/java/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log", "ding_type": "展厅预定对内服务" }, { "log_path": "/var/www/java/external-meeting-api/logs/ubains-INFO-AND-ERROR.log", "ding_type": "展厅预定对外服务" }, { "log_path": "/var/www/html/log/uinfo.log", "ding_type": "展厅运维服务" }, { "log_path": "/var/www/java/unifiedPlatform/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log", "ding_type": "展厅统一平台对内服务" }, { "log_path": "/var/www/java/unifiedPlatform/external-meeting-api/logs/ubains-INFO-AND-ERROR.log", "ding_type": "展厅统一平台对外服务" }, { "log_path": "/var/www/java/unifiedPlatform/api-dubbo-smc-three/log.out", "ding_type": "展厅统一平台SMC服务" }, { "log_path": "/var/www/java/unifiedPlatform/api-dubbo-tencent-meeting/log.out", "ding_type": "展厅统一平台腾讯服务" }, { "log_path": "/var/www/java/unifiedPlatform/api-dubbo-tencent-meeting/log.out", "ding_type": "展厅统一平台腾讯服务" }, { "log_path": "/var/www/java/unifiedPlatform/auth-sso-gatway/log.out", "ding_type": "展厅统一平台gatway服务" }, { "log_path": "/var/www/java/unifiedPlatform/auth-sso-auth/log.out", "ding_type": "展厅统一平台auth服务" }, { "log_path": "/var/www/java/unifiedPlatform/auth-sso-system/log.out", "ding_type": "展厅统一平台system服务" } ] }, { "host": "139.9.60.86", "username": "root", "private_key_path": "C:/Users/Administrator/.ssh/id_rsa", "passphrase": "Ubains@123", "logs": [ { "log_path": "/var/www/html/log/uinfo.log", "ding_type": "对外云端运维集控服务" }, { "log_path": "/var/www/java/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log", "ding_type": "对外云端预定对内服务" }, { "log_path": "/var/www/java/external-meeting-api/logs/ubains-INFO-AND-ERROR.log", "ding_type": "对外云端预定对外服务" } ] } ] monitors = [] threads = [] logging.info("========== 开始启动监控服务 ==========") # 为每个服务器的每个日志创建监控实例 for server in SERVERS: for log_config in server["logs"]: monitor = LogMonitor( host=server["host"], username=server["username"], private_key_path=server["private_key_path"], passphrase=server["passphrase"], log_path=log_config["log_path"], ding_type=log_config["ding_type"] ) monitors.append(monitor) thread = threading.Thread(target=monitor.start_monitoring) thread.daemon = True threads.append(thread) thread.start() logging.info(f"已启动监控: {server['host']} - {log_config['log_path']}") # 保持主线程运行 try: while True: time.sleep(1) except KeyboardInterrupt: logging.info("接收到中断信号,停止所有监控...") for monitor in monitors: monitor.stop_monitoring() logging.info("所有监控已停止")