提交 a68d97a9 authored 作者: 陈泽健's avatar 陈泽健

移除控制台的详细错误日志输出,保留关键状态日志(连接状态、监控启停等),增强异常处理和日志记录,在JSON文件名中加入服务器IP标识,添加了主程序入口的详细日志

上级 f8f05df8
......@@ -18,3 +18,4 @@
- 处理日志文件存放路径问题,文件目录名称被修改引起。处理日志文件存放问题,优化路径。
- 补充监测服务的前置ngrok映射以及端口开放的使用指令注释,处理注释错误。
- 补充_process_line函数对日志去重的IP过滤,parse_log_line函数增加IP过滤,处理相同错误日志因IP不同识别为不同错误问题。
- 移除控制台的详细错误日志输出,保留关键状态日志(连接状态、监控启停等),增强异常处理和日志记录,在JSON文件名中加入服务器IP标识,添加了主程序入口的详细日志
\ No newline at end of file
import re
"""
服务日志监测系统
功能:通过SSH实时监控远程服务器日志,发现错误时收集上下文并保存到文件,通过钉钉发送通知
特点:
1. 多服务器多日志文件同时监控
2. 错误日志去重和冷却机制
3. 自动保存错误上下文到JSON文件
4. 每日自动清理告警缓存
"""
import re
import paramiko
import threading
import time
......@@ -10,7 +19,8 @@ import json
import socket
from datetime import datetime, timedelta
# 配置日志输出到控制台
# ==================== 初始化配置 ====================
# 配置日志输出到控制台,只显示INFO级别以上的信息
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
......@@ -20,29 +30,44 @@ logging.getLogger().setLevel(logging.INFO)
# 获取当前脚本所在目录
current_dir = os.path.dirname(os.path.abspath(__file__))
print("当前脚本目录:", current_dir)
logging.info(f"当前脚本目录: {current_dir}")
# 构建正确的 Base 目录路径
# 构建Base目录路径并添加到系统路径
base_dir = os.path.normpath(os.path.join(current_dir, "..", "Base"))
print("✅ 正确的 Base 目录路径:", base_dir)
# 添加进系统路径
sys.path.append(base_dir)
logging.info(f"Base目录已添加到系统路径: {base_dir}")
# 尝试导入
# ==================== 模块导入 ====================
try:
from base import dingding_send_message
print("✅ 成功导入 base 模块!")
logging.info("✅ 成功导入 base 模块")
except ImportError as e:
print("❌ 导入失败:", e)
print("🔍 sys.path 中包含的路径如下:")
logging.error(f"❌ 导入失败: {e}")
logging.info("sys.path 包含路径:")
for p in sys.path:
print(" -", p)
logging.info(f" - {p}")
sys.exit(1)
class LogMonitor:
"""
日志监控类,负责单个日志文件的监控处理
"""
def __init__(self, host, username, private_key_path, passphrase, log_path,
check_interval=1, ding_type="标准版服务监测", resend_interval=10800):
"""
初始化监控实例
:param host: 服务器IP
:param username: SSH用户名
:param private_key_path: SSH私钥路径
:param passphrase: SSH密钥密码
:param log_path: 要监控的日志路径
:param check_interval: 检查间隔(秒)
:param ding_type: 钉钉消息类型标识
:param resend_interval: 重复告警冷却时间(秒)
"""
self.host = host
self.username = username
self.private_key_path = private_key_path
......@@ -50,45 +75,70 @@ class LogMonitor:
self.log_path = log_path
self.check_interval = check_interval
self.ding_type = ding_type
# SSH连接相关
self.client = None
self.channel = None
self.collecting = False
# 线程安全锁
self.lock = threading.Lock()
# 日志行缓冲区
self.line_buffer = []
self.buffer_size = 500
# 错误上下文收集
self.error_contexts = []
self.sent_errors = {} # 已发送的错误日志 {hash: last_send_time}
self.resend_interval = resend_interval # 钉钉重发冷却时间(秒)
self.schedule_daily_clear() # 启动每日清理任务
self.sent_errors = {} # 格式: {error_hash: last_send_timestamp}
self.resend_interval = resend_interval
# 启动每日清理任务
self.schedule_daily_clear()
def connect(self):
    """Establish the SSH connection and start ``tail -f`` on the log file.

    Returns:
        bool: True when the connection and the tail command were started
        successfully, False otherwise (the failure is logged).
    """
    try:
        # Load the (possibly passphrase-protected) RSA private key.
        private_key = paramiko.RSAKey.from_private_key_file(
            self.private_key_path,
            password=self.passphrase
        )
        # Create the SSH client.
        # NOTE(review): AutoAddPolicy silently trusts unknown host keys —
        # confirm this is acceptable for the monitored hosts.
        self.client = paramiko.SSHClient()
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # Connection parameters; generous banner/auth timeouts for slow hosts.
        connect_params = {
            'hostname': self.host,
            'username': self.username,
            'pkey': private_key,
            'timeout': 30,
            'banner_timeout': 200,
            'auth_timeout': 200
        }
        self.client.connect(**connect_params)
        # Open an interactive shell for the long-running tail command.
        self.channel = self.client.invoke_shell()
        self.channel.setblocking(0)  # non-blocking reads in the monitor loop
        self.channel.transport.set_keepalive(30)  # keep the session alive
        # Start streaming the log file.
        self.channel.send(f"tail -f {self.log_path}\n")
        logging.info(f"已连接到 {self.host},开始监控 {self.log_path}")
        return True
    except Exception as e:
        logging.error(f"连接失败: {e},目标主机: {self.host}")
        return False
def start_monitoring(self):
"""开始监控日志"""
if self.collecting:
logging.info("Already monitoring logs.")
logging.warning("监控已在进行中")
return
if not self.connect():
......@@ -96,34 +146,43 @@ class LogMonitor:
self.collecting = True
self.monitor_thread = threading.Thread(target=self._monitor_loop)
self.monitor_thread.daemon = True # 设置为守护线程
self.monitor_thread.start()
logging.info(f"开始监控日志: {self.log_path}")
logging.info(f"监控线程已启动: {self.log_path}")
def stop_monitoring(self):
    """Stop monitoring: clear the run flag and close SSH channel/client.

    Safe to call repeatedly; ``channel``/``client`` may be None when no
    connection was ever established.
    """
    self.collecting = False  # signals the monitor loop to exit
    if self.channel:
        self.channel.close()
    if self.client:
        self.client.close()
    logging.info(f"已停止监控: {self.log_path}")
def _monitor_loop(self):
"""
监控主循环
持续读取日志数据并处理
"""
retry_count = 0
MAX_RETRY = 5
while self.collecting:
try:
if self.channel.recv_ready():
# 读取日志数据
data = self.channel.recv(1024).decode('utf-8', errors='ignore')
logging.debug("Received raw data: %s", data)
# 处理每行日志
for line in data.splitlines():
self._process_line(line.strip())
retry_count = 0
retry_count = 0 # 重置重试计数
else:
time.sleep(self.check_interval)
retry_count = 0
except (paramiko.SSHException, socket.error, OSError) as e:
logging.warning(f"SSH 断开,准备重连... 错误: {e}")
logging.warning(f"SSH连接异常: {e},尝试重连...")
self.restart_monitoring()
retry_count += 1
......@@ -131,60 +190,68 @@ class LogMonitor:
logging.error("达到最大重试次数,停止监控")
self.stop_monitoring()
return
time.sleep(min(5 * retry_count, 60))
time.sleep(min(5 * retry_count, 60)) # 退避算法
def save_error_contexts_to_json(self):
    """Persist the collected error contexts to a timestamped JSON file.

    The file name embeds the monitored server's IP (dots replaced by
    underscores) so files from different servers do not collide.

    Returns:
        str | None: the written file path, or None when writing failed.
    """
    try:
        # error_log directory lives two levels above this script.
        current_dir = os.path.dirname(os.path.abspath(__file__))
        base_dir = os.path.dirname(os.path.dirname(current_dir))
        error_log_dir = os.path.join(base_dir, "error_log")
    except NameError:
        # Fallback when __file__ is unavailable (some embedded/threaded
        # environments): derive the path via the inspect module instead.
        import inspect
        current_file = inspect.getframeinfo(inspect.currentframe()).filename
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(current_file)))
        error_log_dir = os.path.join(base_dir, "error_log")
    os.makedirs(error_log_dir, exist_ok=True)
    logging.info(f"错误日志将保存到: {error_log_dir}")
    # Timestamped file name with the server IP identifier.
    timestamp = time.strftime("%Y-%m-%d-%H%M%S")
    filename = f"error_log_{timestamp}_{self.host.replace('.', '_')}.json"
    file_path = os.path.join(error_log_dir, filename)
    try:
        # ensure_ascii=False keeps Chinese log text readable in the file.
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(self.error_contexts, f, ensure_ascii=False, indent=4)
        logging.info(f"错误日志已保存: {file_path}")
        return file_path
    except Exception as e:
        logging.error(f"保存错误日志失败: {e}")
        return None
def generate_error_log_url(self, file_path):
    """Build the public URL for a saved error-log file.

    :param file_path: local path returned by save_error_contexts_to_json
    :return: public HTTP URL for the file, or None when file_path is falsy
    """
    if not file_path:
        return None
    filename = os.path.basename(file_path)
    # NOTE(review): the original used the computed basename in the URL; it
    # had been redacted to "(unknown)" in the scraped source, which left
    # `filename` unused — restored here. Confirm the host/port mapping.
    error_log_url = f"http://nat.ubainsyun.com:32233/{filename}"
    logging.info(f"生成公网访问链接: {error_log_url}")
    return error_log_url
def _process_line(self, line):
"""
处理单行日志
1. 解析日志
2. 如果是ERROR则收集上下文
3. 触发告警
"""
with self.lock:
# 维护行缓冲区
self.line_buffer.append(line)
if len(self.line_buffer) > self.buffer_size:
self.line_buffer.pop(0)
try:
# 解析日志行
parsed = self.parse_log_line(line)
if not parsed or parsed['level'] != 'ERROR':
return
logging.info(f"发现 {parsed['level']} 日志!正在通过 SSH 获取上下文日志...")
logging.info(f"发现ERROR日志: {parsed['module']}/{parsed['action']}")
# 获取完整日志上下文
full_log = self.get_remote_log_with_paramiko(
host=self.host,
username=self.username,
......@@ -194,7 +261,11 @@ class LogMonitor:
num_lines=500
)
if full_log:
if not full_log:
logging.error("获取日志上下文失败")
return
# 定位错误行并截取上下文
lines = full_log.split('\n')
for i, l in enumerate(lines):
if line.strip() in l.strip():
......@@ -202,52 +273,54 @@ class LogMonitor:
end = min(len(lines), i + 101)
context = lines[start:end]
# 保存错误上下文
with self.lock:
self.error_contexts.append({
'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
'error_line': line,
'context': context,
'structured': parsed # 同时保存结构化数据
'structured': parsed
})
# 保存到文件并生成URL
file_path = self.save_error_contexts_to_json()
error_log_url = self.generate_error_log_url(file_path)
# 使用结构化字段做 key
# 修改后的去重key生成
clean_message = re.sub(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', '[IP]', parsed['message'])
key = f"{parsed['module']}_{parsed['action']}_{clean_message}"
if not error_log_url:
return
# 打印构造去重key值
logging.info(f"构造去重 key: {key}")
# 生成去重key (过滤IP地址)
clean_message = re.sub(
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}',
'[IP]',
parsed['message']
)
key = f"{parsed['module']}_{parsed['action']}_{clean_message}"
error_hash = hash(key)
current_time = time.time()
# 检查冷却期
if error_hash in self.sent_errors:
if current_time - self.sent_errors[error_hash] < self.resend_interval:
logging.info(f"该错误已在冷却期内,跳过重复发送:{line[:100]}...")
break
logging.info(f"相同错误在冷却期内,跳过: {key[:100]}...")
return
# 发送钉钉通知
try:
dingding_send_message(error_log_url, ding_type=self.ding_type)
self.sent_errors[error_hash] = current_time
logging.info(f"已发送钉钉通知: {self.ding_type}")
except Exception as e:
logging.info(f"发送钉钉消息失败: {e}")
logging.error(f"发送钉钉消息失败: {e}")
logging.info("上下文日志如下:\n" + "\n".join(context))
break
else:
logging.error("获取日志失败,无法获取上下文")
logging.debug("Received line: %s", line)
except IndexError:
pass
except Exception as e:
logging.exception(f"获取上下文日志失败: {e}")
logging.error(f"处理日志行异常: {e}")
def restart_monitoring(self):
    """Restart monitoring: stop, wait briefly, then reconnect.

    Called from the monitor loop after an SSH failure.
    NOTE(review): start_monitoring spawns a fresh monitor thread while the
    old one may still be unwinding — confirm this cannot stack threads.
    """
    logging.info("尝试重启监控...")
    self.stop_monitoring()
    time.sleep(5)  # brief back-off before reconnecting
    self.start_monitoring()
......@@ -255,8 +328,16 @@ class LogMonitor:
@staticmethod
def get_remote_log_with_paramiko(host, username, private_key_path, passphrase,
log_path, num_lines=1000, timeout=30):
"""
通过SSH获取远程日志内容
返回: 日志内容或None
"""
try:
private_key = paramiko.RSAKey.from_private_key_file(private_key_path, password=passphrase)
private_key = paramiko.RSAKey.from_private_key_file(
private_key_path,
password=passphrase
)
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(host, username=username, pkey=private_key, timeout=timeout)
......@@ -264,56 +345,62 @@ class LogMonitor:
command = f"tail -n {num_lines} {log_path}"
stdin, stdout, stderr = client.exec_command(command, timeout=timeout)
error = stderr.read().decode('utf-8')
if error:
logging.error(f"执行命令失败: {error}")
return None
if stderr.read().decode('utf-8'):
raise Exception("远程命令执行失败")
output = stdout.read().decode('utf-8')
return output
return stdout.read().decode('utf-8')
except paramiko.ssh_exception.PasswordRequiredException:
logging.error("私钥加密但未提供密码。")
return None
except paramiko.ssh_exception.SSHException as e:
logging.error(f"SSH 错误: {e}")
return None
except Exception as e:
logging.exception(f"获取日志失败: {e}")
logging.error(f"获取远程日志失败: {e}")
return None
finally:
if 'client' in locals():
client.close()
def schedule_daily_clear(self):
    """Schedule a one-shot timer that clears the sent-error cache at the
    next midnight.

    The callback re-arms itself, so this effectively runs daily.
    """
    now = datetime.now()
    # Next midnight (local time).
    next_run = (now + timedelta(days=1)).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    delay = (next_run - now).total_seconds()
    timer = threading.Timer(delay, self.daily_clear_sent_errors)
    timer.daemon = True  # don't keep the process alive for this timer
    timer.start()
def daily_clear_sent_errors(self):
    """Clear the sent-error (dedup/cooldown) cache, then re-arm the timer.

    Runs once per day via the Timer set up in schedule_daily_clear, so
    suppressed alerts can fire again the next day.
    """
    with self.lock:
        self.sent_errors.clear()
        logging.info("✅ 已发送错误记录已清空")
    self.schedule_daily_clear()  # schedule the next day's run
def parse_log_line(self, line):
logging.info(f"正在处理的日志行: {line}")
# 原有匹配逻辑
"""
解析日志行
返回: {
'level': 日志级别,
'module': 模块名,
'action': 操作名,
'message': 日志消息,
'raw': 原始日志
} 或 None
"""
# 匹配时间戳
timestamp_match = re.match(r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+)', line)
# 匹配日志级别
level_match = re.search(r'\s(ERROR|INFO|WARNING)\b', line)
# 匹配中括号内容
bracket_content = re.findall(r'\[(.*?)\]|\【(.*?)\】', line)
bracket_content = [x[0] or x[1] for x in bracket_content if x[0] or x[1]]
if not timestamp_match or not level_match or len(bracket_content) < 3:
return None
# IP过滤函数
def remove_ip(text):
"""过滤IP地址"""
return re.sub(r'\d{1,3}\\.\d{1,3}\\.\d{1,3}\\.\d{1,3}', '[IP]', text)
return {
......@@ -321,22 +408,17 @@ class LogMonitor:
'module': remove_ip(bracket_content[0].strip()),
'action': remove_ip(bracket_content[1].strip()),
'message': remove_ip(bracket_content[2].strip()),
'raw': line.strip() # 保留原始信息
'raw': line.strip()
}
if __name__ == "__main__":
"""
调试主机-执行指令:
1.打开一个终端输入:
- cd .\系统监测\
- python -m http.server 80 --directory error_log
2.打开新终端输入:
- cd .\系统监测\ngrok\ngrok-虚拟机\
- .\start.bat
主程序入口
配置服务器列表并启动监控
"""
# 服务器配置列表
SERVERS = [
{
"host": "192.168.5.235",
......@@ -348,89 +430,18 @@ if __name__ == "__main__":
"log_path": "/var/www/java/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log",
"ding_type": "235标准预定对内服务"
},
{
"log_path": "/var/www/java/external-meeting-api/logs/ubains-INFO-AND-ERROR.log",
"ding_type": "235标准预定对外服务"
}
]
},
{
"host": "192.168.5.200",
"username": "root",
"private_key_path": "C:/Users/Administrator/.ssh/id_rsa",
"passphrase": "Ubains@123",
"logs": [
{
"log_path": "/var/www/java/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log",
"ding_type": "展厅预定对内服务"
},
{
"log_path": "/var/www/java/external-meeting-api/logs/ubains-INFO-AND-ERROR.log",
"ding_type": "展厅预定对外服务"
},
{
"log_path": "/var/www/html/log/uinfo.log",
"ding_type": "展厅运维服务"
},
{
"log_path": "/var/www/java/unifiedPlatform/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log",
"ding_type": "展厅统一平台对内服务"
},
{
"log_path": "/var/www/java/unifiedPlatform/external-meeting-api/logs/ubains-INFO-AND-ERROR.log",
"ding_type": "展厅统一平台对外服务"
},
{
"log_path": "/var/www/java/unifiedPlatform/api-dubbo-smc-three/log.out",
"ding_type": "展厅统一平台SMC服务"
},
{
"log_path": "/var/www/java/unifiedPlatform/api-dubbo-tencent-meeting/log.out",
"ding_type": "展厅统一平台腾讯服务"
},
{
"log_path": "/var/www/java/unifiedPlatform/api-dubbo-tencent-meeting/log.out",
"ding_type": "展厅统一平台腾讯服务"
},
{
"log_path": "/var/www/java/unifiedPlatform/auth-sso-gatway/log.out",
"ding_type": "展厅统一平台gatway服务"
},
{
"log_path": "/var/www/java/unifiedPlatform/auth-sso-auth/log.out",
"ding_type": "展厅统一平台auth服务"
},
{
"log_path": "/var/www/java/unifiedPlatform/auth-sso-system/log.out",
"ding_type": "展厅统一平台system服务"
}
# 其他日志配置...
]
},
{
"host": "139.9.60.86",
"username": "root",
"private_key_path": "C:/Users/Administrator/.ssh/id_rsa",
"passphrase": "Ubains@123",
"logs": [
{
"log_path": "/var/www/html/log/uinfo.log",
"ding_type": "对外云端运维集控服务"
},
{
"log_path": "/var/www/java/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log",
"ding_type": "对外云端预定对内服务"
},
{
"log_path": "/var/www/java/external-meeting-api/logs/ubains-INFO-AND-ERROR.log",
"ding_type": "对外云端预定对外服务"
}
]
}
# 其他服务器配置...
]
monitors = []
threads = []
logging.info("========== 开始启动监控服务 ==========")
# 为每个服务器的每个日志创建监控实例
for server in SERVERS:
for log_config in server["logs"]:
monitor = LogMonitor(
......@@ -444,6 +455,18 @@ if __name__ == "__main__":
monitors.append(monitor)
thread = threading.Thread(target=monitor.start_monitoring)
thread.daemon = True
threads.append(thread)
thread.start()
logging.info(f"已启动对 {log_config['log_path']} 的监控")
logging.info(f"已启动监控: {server['host']} - {log_config['log_path']}")
# 保持主线程运行
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
logging.info("接收到中断信号,停止所有监控...")
for monitor in monitors:
monitor.stop_monitoring()
logging.info("所有监控已停止")
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论