提交 800b9e00 authored 作者: 陈泽健's avatar 陈泽健

通过组合日志格式中的模块+操作+消息内容作为错误唯一标识转为固定的哈希值,记录每个错误最后发送时间 {hash:timestamp},定时任务每天00:00清空发送记录,避免历史错误永远被屏蔽。

上级 6c258a0a
......@@ -23,6 +23,8 @@ def dingding_send_message(error_log_url,ding_type):
# 钉钉机器人的 Webhook URL 和密钥(测试环境)
webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=7fbf40798cad98b1b5db55ff844ba376b1816e80c5777e6f47ae1d9165dacbb4'
secret = 'SEC610498ed6261ae2df1d071d0880aaa70abf5e67efe47f75a809c1f2314e0dbd6'
# 这是错的
# secret = 'SEC610498ed6261ae2df1d071d0880aaa70abf5e67efe47f75a809c1f2314e0dbd612211221'
# 日志类型:1.预定系统对内服务;2.预定系统对外服务
log_type = ''
......
......@@ -8,4 +8,6 @@
- 处理运行12小时后被远程主机主动断开连接问题,通过配置 SSH Client 的 keepalive 参数,让连接保持活跃,避免超时断开。
- 增加多台服务器的连接监测,补充对展厅的日志监测,调整error_log日志文件存放路径,补充对应的ngrok映射目录。
3. 2025-06-09:
- 优化日志监控的异常处理。处理路径拼接方式,处理服务出现错误日志没有发送的问题,调试重复发送问题。
\ No newline at end of file
- 优化日志监控的异常处理。处理路径拼接方式,处理服务出现错误日志没有发送的问题,调试重复发送问题。
4. 2025-06-10:
- 通过组合日志格式中的模块+操作+消息内容 作为错误唯一标识转为固定的哈希值,记录每个错误最后发送时间 {hash: timestamp},定时任务每天00:00清空发送记录,避免历史错误永远被屏蔽。
\ No newline at end of file
......@@ -3,5 +3,5 @@ trust_host_root_certs: false
tunnels:
nat1:
proto:
tcp: 127.0.0.1:80
tcp: 127.0.0.1:81
remote_port: 32233
import re
import paramiko
import threading
import time
......@@ -5,7 +7,8 @@ import logging
import sys
import os
import json
import socket # 用于捕获 socket.error
import socket
from datetime import datetime, timedelta
# 配置日志输出到控制台
console_handler = logging.StreamHandler()
......@@ -13,7 +16,7 @@ console_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
logging.getLogger().addHandler(console_handler)
logging.getLogger().setLevel(logging.INFO) # 设置全局日志级别
logging.getLogger().setLevel(logging.INFO)
# 获取当前脚本所在目录
current_dir = os.path.dirname(os.path.abspath(__file__))
......@@ -39,7 +42,7 @@ except ImportError as e:
class LogMonitor:
def __init__(self, host, username, private_key_path, passphrase, log_path,
check_interval=1, ding_type="标准版服务监测", resend_interval=300):
check_interval=1, ding_type="标准版服务监测", resend_interval=43200):
self.host = host
self.username = username
self.private_key_path = private_key_path
......@@ -56,6 +59,7 @@ class LogMonitor:
self.error_contexts = []
self.sent_errors = {} # 已发送的错误日志 {hash: last_send_time}
self.resend_interval = resend_interval # 钉钉重发冷却时间(秒)
self.schedule_daily_clear() # 启动每日清理任务
def connect(self):
try:
......@@ -172,57 +176,62 @@ class LogMonitor:
self.line_buffer.pop(0)
try:
level_part = line.split(" : ")[0]
level = level_part.split()[-1]
if any(keyword in line.upper() for keyword in ["ERROR"]):
logging.info(f"发现 {level} 日志!正在通过 SSH 获取上下文日志...")
full_log = self.get_remote_log_with_paramiko(
host=self.host,
username=self.username,
private_key_path=self.private_key_path,
passphrase=self.passphrase,
log_path=self.log_path,
num_lines=500
)
if full_log:
lines = full_log.split('\n')
for i, l in enumerate(lines):
if line.strip() in l.strip():
start = max(0, i - 100)
end = min(len(lines), i + 101)
context = lines[start:end]
with self.lock:
self.error_contexts.append({
'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
'error_line': line,
'context': context
})
file_path = self.save_error_contexts_to_json()
error_log_url = self.generate_error_log_url(file_path)
error_hash = hash(line.strip())
current_time = time.time()
if error_hash in self.sent_errors:
if current_time - self.sent_errors[error_hash] < self.resend_interval:
logging.info(f"该错误已在冷却期内,跳过重复发送:{line[:100]}...")
break
try:
dingding_send_message(error_log_url, ding_type=self.ding_type)
self.sent_errors[error_hash] = current_time
except Exception as e:
logging.info(f"发送钉钉消息失败: {e}")
logging.info("上下文日志如下:\n" + "\n".join(context))
break
else:
logging.error("获取日志失败,无法获取上下文")
parsed = self.parse_log_line(line)
if not parsed or parsed['level'] != 'ERROR':
return
logging.info(f"发现 {parsed['level']} 日志!正在通过 SSH 获取上下文日志...")
full_log = self.get_remote_log_with_paramiko(
host=self.host,
username=self.username,
private_key_path=self.private_key_path,
passphrase=self.passphrase,
log_path=self.log_path,
num_lines=500
)
if full_log:
lines = full_log.split('\n')
for i, l in enumerate(lines):
if line.strip() in l.strip():
start = max(0, i - 100)
end = min(len(lines), i + 101)
context = lines[start:end]
with self.lock:
self.error_contexts.append({
'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
'error_line': line,
'context': context,
'structured': parsed # 同时保存结构化数据
})
file_path = self.save_error_contexts_to_json()
error_log_url = self.generate_error_log_url(file_path)
# 使用结构化字段做 key
key = f"{parsed['module']}_{parsed['action']}_{parsed['message']}"
# 打印构造去重key值
logging.info(f"构造去重 key: {key}")
error_hash = hash(key)
current_time = time.time()
if error_hash in self.sent_errors:
if current_time - self.sent_errors[error_hash] < self.resend_interval:
logging.info(f"该错误已在冷却期内,跳过重复发送:{line[:100]}...")
break
try:
dingding_send_message(error_log_url, ding_type=self.ding_type)
self.sent_errors[error_hash] = current_time
except Exception as e:
logging.info(f"发送钉钉消息失败: {e}")
logging.info("上下文日志如下:\n" + "\n".join(context))
break
else:
logging.error("获取日志失败,无法获取上下文")
logging.debug("Received line: %s", line)
......@@ -270,6 +279,52 @@ class LogMonitor:
if 'client' in locals():
client.close()
def schedule_daily_clear(self):
"""每天凌晨自动清空已发送日志缓存"""
now = datetime.now()
next_run = (now + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
delay = (next_run - now).total_seconds()
threading.Timer(delay, self.daily_clear_sent_errors).start()
def daily_clear_sent_errors(self):
"""每日凌晨执行,清空已发送日志缓存"""
logging.info("🔄 开始每日凌晨清理已发送错误日志缓存...")
with self.lock:
self.sent_errors.clear()
logging.info("✅ 已发送错误日志缓存已清空!")
self.schedule_daily_clear() # 递归调用,设置下一天任务
def parse_log_line(self, line):
logging.info(f"正在处理的日志行: {line}")
logging.info(f"repr格式: {repr(line)}")
timestamp_match = re.match(r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+)', line)
level_match = re.search(r'\s(ERROR|INFO|WARNING)\b', line) # 扩展支持更多日志级别
if not timestamp_match or not level_match:
return None
# 修改后的正则表达式:同时匹配中文【】和英文[]括号
bracket_content = re.findall(r'\[(.*?)\]|\【(.*?)\】', line)
bracket_content = [x[0] or x[1] for x in bracket_content if x[0] or x[1]]
logging.info(f"提取到的内容: {bracket_content}")
if len(bracket_content) < 3:
return None
module = bracket_content[0].strip()
action = bracket_content[1].strip()
message = bracket_content[2].strip()
return {
'level': level_match.group(1).strip(),
'module': module,
'action': action,
'message': message,
'raw': line.strip()
}
if __name__ == "__main__":
SERVERS = [
......@@ -296,12 +351,12 @@ if __name__ == "__main__":
"passphrase": "Ubains@123",
"logs": [
{
"log_path": "/var/www/javaredline/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log",
"log_path": "/var/www/java/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log",
"ding_type": "展厅预定系统-对内服务服务监测"
},
{
"log_path": "/var/www/javaredline/external-meeting-api/logs/ubains-INFO-AND-ERROR.log",
"ding_type": "展厅预定系统-对服务服务监测"
"log_path": "/var/www/java/external-meeting-api/logs/ubains-INFO-AND-ERROR.log",
"ding_type": "展厅预定系统-对服务服务监测"
}
]
}
......@@ -326,3 +381,21 @@ if __name__ == "__main__":
threads.append(thread)
thread.start()
logging.info(f"已启动对 {log_config['log_path']} 的监控")
# if __name__ == "__main__":
# import logging
#
# logging.basicConfig(level=logging.INFO)
#
# monitor = LogMonitor("localhost", "user", "key", "", "/tmp/log")
#
# test_lines = [
# "2025-06-10 17:53:47.000 ERROR [用户登录][验证失败][用户名或密码错误]",
# "2025-06-10 17:53:47.000 INFO [模块A][操作B][消息C]",
# "2025-06-10 17:53:47.000 ERROR 【用户登录】【验证失败】【用户名或密码错误】",
# ]
#
# for line in test_lines:
# parsed = monitor.parse_log_line(line)
# print(f"\n【测试行】{line}")
# print("解析结果:", parsed)
......@@ -32,6 +32,12 @@ class TopicDeclaration:
wd = GSTORE['wd']
name = self.name
# 执行完一个用例就刷新一下页面重置
wd.refresh()
wd.refresh()
wd.refresh()
sleep(10)
# 点击【议题申报】按钮进入模块
INFO("点击【议题申报】按钮")
safe_click((By.XPATH, "//div[@id='CreateTopic']"), wd)
......@@ -54,14 +60,16 @@ class TopicDeclaration:
if element_type == "click":
safe_click((locator_type, locator_value), wd)
sleep(2)
sleep(4)
SELENIUM_LOG_SCREEN(wd, "75")
elif element_type == "input":
safe_send_keys((locator_type, locator_value), element_value, wd)
sleep(2)
sleep(4)
SELENIUM_LOG_SCREEN(wd, "75")
elif element_type == "getTips":
notify_text = get_notify_text(wd, (locator_type, locator_value))
INFO(f"获取到的提示信息为:{notify_text}")
sleep(2)
sleep(4)
CHECK_POINT(f"获取到的提示信息为:{notify_text}", expented_result in notify_text)
SELENIUM_LOG_SCREEN(wd, "75")
......@@ -69,9 +77,4 @@ class TopicDeclaration:
text = elment_get_text((locator_type, locator_value), wd)
INFO(f"获取到的文本信息为:{text}")
CHECK_POINT(f"获取到的文本信息为:{text}", expented_result in text)
SELENIUM_LOG_SCREEN(wd, "75")
# 执行完一个用例就刷新一下页面重置
wd.refresh()
wd.refresh()
sleep(2)
\ No newline at end of file
SELENIUM_LOG_SCREEN(wd, "75")
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论