Skip to content
项目
群组
代码片段
帮助
正在加载...
帮助
为 GitLab 提交贡献
登录
切换导航
U
ubains-module-test
项目
项目
详情
活动
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
分枝图
比较
统计图
议题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
计划
统计图
Wiki
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
分枝图
统计图
创建新议题
作业
提交
议题看板
打开侧边栏
郑晓兵
ubains-module-test
Commits
7a9d4d6f
提交
7a9d4d6f
authored
6月 09, 2025
作者:
陈泽健
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
优化日志监控的异常处理。处理路径拼接方式,处理服务出现错误日志没有发送的问题,调试重复发送问题。
上级
fe624c3b
隐藏空白字符变更
内嵌
并排
正在显示
5 个修改的文件
包含
51 行增加
和
50 行删除
+51
-50
base.cpython-310.pyc
日志监测/Base/__pycache__/base.cpython-310.pyc
+0
-0
base.py
日志监测/Base/base.py
+1
-1
README.md
日志监测/README.md
+2
-2
error_log2025-06-09-19
日志监测/error_log/error_log2025-06-09-19
+0
-0
预定日志检测.py
日志监测/预定系统日志监测/预定日志检测.py
+48
-47
没有找到文件。
日志监测/Base/__pycache__/base.cpython-310.pyc
浏览文件 @
7a9d4d6f
No preview for this file type
日志监测/Base/base.py
浏览文件 @
7a9d4d6f
...
...
@@ -36,7 +36,7 @@ def dingding_send_message(error_log_url,ding_type):
log_type
=
'展厅预定-对内服务服务监测'
elif
ding_type
==
'展厅预定系统-对外服务服务监测'
:
log_type
=
'展厅预定-对外服务
服务监测
'
log_type
=
'展厅预定-对外服务
日志出现异常
'
logging
.
info
(
f
"预定服务日志类型:{log_type}"
)
...
...
日志监测/README.md
浏览文件 @
7a9d4d6f
...
...
@@ -8,4 +8,4 @@
-
处理运行12小时后被远程主机主动断开连接问题,通过配置 SSH Client 的 keepalive 参数,让连接保持活跃,避免超时断开。
-
增加多台服务器的连接监测,补充对展厅的日志监测,调整error_log日志文件存放路径,补充对应的ngrok映射目录。
3.
2025-06-09:
-
优化日志监控的异常处理。
\ No newline at end of file
-
优化日志监控的异常处理。处理路径拼接方式,处理服务出现错误日志没有发送的问题,调试重复发送问题。
\ No newline at end of file
日志监测/error_log/error_log2025-06-09-19
0 → 100644
浏览文件 @
7a9d4d6f
日志监测/预定系统日志监测/预定日志检测.py
浏览文件 @
7a9d4d6f
...
...
@@ -5,6 +5,7 @@ import logging
import
sys
import
os
import
json
import
socket
# 用于捕获 socket.error
# 配置日志输出到控制台
console_handler
=
logging
.
StreamHandler
()
...
...
@@ -37,21 +38,24 @@ except ImportError as e:
class
LogMonitor
:
def
__init__
(
self
,
host
,
username
,
private_key_path
,
passphrase
,
log_path
,
check_interval
=
1
,
ding_type
=
"标准版服务监测"
):
def
__init__
(
self
,
host
,
username
,
private_key_path
,
passphrase
,
log_path
,
check_interval
=
1
,
ding_type
=
"标准版服务监测"
,
resend_interval
=
300
):
self
.
host
=
host
self
.
username
=
username
self
.
private_key_path
=
private_key_path
self
.
passphrase
=
passphrase
self
.
log_path
=
log_path
self
.
check_interval
=
check_interval
# 日志检查间隔(秒)
self
.
ding_type
=
ding_type
# 钉钉群标识
self
.
check_interval
=
check_interval
self
.
ding_type
=
ding_type
self
.
client
=
None
self
.
channel
=
None
self
.
collecting
=
False
self
.
lock
=
threading
.
Lock
()
self
.
line_buffer
=
[]
# 缓存最近若干行日志,用于上下文提取
self
.
buffer_size
=
200
# 缓存最多保留多少行日志
self
.
error_contexts
=
[]
# 存储所有错误日志的上下文
self
.
line_buffer
=
[]
self
.
buffer_size
=
500
self
.
error_contexts
=
[]
self
.
sent_errors
=
{}
# 已发送的错误日志 {hash: last_send_time}
self
.
resend_interval
=
resend_interval
# 钉钉重发冷却时间(秒)
def
connect
(
self
):
try
:
...
...
@@ -68,8 +72,8 @@ class LogMonitor:
)
self
.
channel
=
self
.
client
.
invoke_shell
()
self
.
channel
.
setblocking
(
0
)
# 设置为非阻塞模式
self
.
channel
.
transport
.
set_keepalive
(
30
)
# 每隔 30 秒发一次 keepalive 包
self
.
channel
.
setblocking
(
0
)
self
.
channel
.
transport
.
set_keepalive
(
30
)
self
.
channel
.
send
(
f
"tail -f {self.log_path}
\n
"
)
logging
.
info
(
f
"Connected to {self.host}, monitoring {self.log_path}"
)
...
...
@@ -105,11 +109,16 @@ class LogMonitor:
while
self
.
collecting
:
try
:
if
self
.
channel
.
recv_ready
():
...
data
=
self
.
channel
.
recv
(
1024
)
.
decode
(
'utf-8'
,
errors
=
'ignore'
)
logging
.
debug
(
"Received raw data:
%
s"
,
data
)
for
line
in
data
.
splitlines
():
self
.
_process_line
(
line
.
strip
())
retry_count
=
0
else
:
time
.
sleep
(
self
.
check_interval
)
retry_count
=
0
# 成功时重置重试次数
except
(
paramiko
.
SSHException
,
paramiko
.
socket
.
error
,
OSError
)
as
e
:
retry_count
=
0
except
(
paramiko
.
SSHException
,
socket
.
error
,
OSError
)
as
e
:
logging
.
warning
(
f
"SSH 断开,准备重连... 错误: {e}"
)
self
.
restart_monitoring
()
...
...
@@ -118,28 +127,26 @@ class LogMonitor:
logging
.
error
(
"达到最大重试次数,停止监控"
)
self
.
stop_monitoring
()
return
time
.
sleep
(
min
(
5
*
retry_count
,
60
))
# 指数退避
time
.
sleep
(
min
(
5
*
retry_count
,
60
))
def
save_error_contexts_to_json
(
self
):
# 获取当前脚本所在目录
current_dir
=
os
.
path
.
dirname
(
os
.
path
.
abspath
(
__file__
))
try
:
current_file
=
__file__
except
NameError
:
import
inspect
current_file
=
inspect
.
getframeinfo
(
inspect
.
currentframe
())
.
filename
# 构建正确的相对路径(指向 ubains-module-test 根目录)
root_dir
=
os
.
path
.
dirname
(
current_dir
)
# 返回到 "日志监测"
root_dir
=
os
.
path
.
dirname
(
root_dir
)
# 返回到 "ubains-module-test"
current_dir
=
os
.
path
.
dirname
(
os
.
path
.
abspath
(
current_file
))
root_dir
=
os
.
path
.
dirname
(
current_dir
)
root_dir
=
os
.
path
.
dirname
(
root_dir
)
# 拼接目标路径
full_path
=
os
.
path
.
normpath
(
os
.
path
.
join
(
root_dir
,
"日志监测"
,
"error_log"
))
# 确保目录存在
os
.
makedirs
(
full_path
,
exist_ok
=
True
)
# 文件名带时间戳
timestamp
=
time
.
strftime
(
"
%
Y-
%
m-
%
d-
%
H:
%
M"
)
filename
=
f
"error_log{timestamp}.json"
file_path
=
os
.
path
.
join
(
full_path
,
filename
)
# 写入 JSON 文件
try
:
with
open
(
file_path
,
'w'
,
encoding
=
'utf-8'
)
as
f
:
json
.
dump
(
self
.
error_contexts
,
f
,
ensure_ascii
=
False
,
indent
=
4
)
...
...
@@ -150,20 +157,11 @@ class LogMonitor:
return
None
def
generate_error_log_url
(
self
,
file_path
):
# 根据本地存储路径,生成公网访问链接。
# 示例:
# file_path = D:\GithubData\自动化\ubains-module-test\预定系统\reports\error_log\error_log2025-06-05-20:15.json
# url = http://nat.ubainsyun.com:31133/error_log/error_log2025-06-05-20:15.json
if
not
file_path
:
return
None
# 从文件路径中提取最后一级目录名(error_log)和文件名
dir_name
=
os
.
path
.
basename
(
os
.
path
.
dirname
(
file_path
))
# 获取 error_log
filename
=
os
.
path
.
basename
(
file_path
)
# 拼接 URL
error_log_url
=
f
"http://nat.ubainsyun.com:31133/{dir_name}/{filename}"
error_log_url
=
f
"http://nat.ubainsyun.com:32233/{filename}"
logging
.
info
(
f
"生成公网访问链接: {error_log_url}"
)
return
error_log_url
...
...
@@ -173,12 +171,11 @@ class LogMonitor:
if
len
(
self
.
line_buffer
)
>
self
.
buffer_size
:
self
.
line_buffer
.
pop
(
0
)
# 提取日志级别字段(如 INFO / ERROR)
try
:
level_part
=
line
.
split
(
" : "
)[
0
]
# 取 "时间戳 LEVEL" 部分
level
=
level_part
.
split
()[
-
1
]
# 取最后一个词作为日志级别
level_part
=
line
.
split
(
" : "
)[
0
]
level
=
level_part
.
split
()[
-
1
]
if
level
in
[
"ERROR"
,
"Exception"
]
:
if
any
(
keyword
in
line
.
upper
()
for
keyword
in
[
"ERROR"
])
:
logging
.
info
(
f
"发现 {level} 日志!正在通过 SSH 获取上下文日志..."
)
full_log
=
self
.
get_remote_log_with_paramiko
(
...
...
@@ -187,7 +184,7 @@ class LogMonitor:
private_key_path
=
self
.
private_key_path
,
passphrase
=
self
.
passphrase
,
log_path
=
self
.
log_path
,
num_lines
=
2
00
num_lines
=
5
00
)
if
full_log
:
...
...
@@ -198,7 +195,6 @@ class LogMonitor:
end
=
min
(
len
(
lines
),
i
+
101
)
context
=
lines
[
start
:
end
]
# 将上下文日志保存到 error_contexts 中
with
self
.
lock
:
self
.
error_contexts
.
append
({
'timestamp'
:
time
.
strftime
(
'
%
Y-
%
m-
%
d
%
H:
%
M:
%
S'
),
...
...
@@ -206,13 +202,20 @@ class LogMonitor:
'context'
:
context
})
# 保存为 JSON 并生成公网链接
file_path
=
self
.
save_error_contexts_to_json
()
error_log_url
=
self
.
generate_error_log_url
(
file_path
)
# 调用钉钉发送函数
error_hash
=
hash
(
line
.
strip
())
current_time
=
time
.
time
()
if
error_hash
in
self
.
sent_errors
:
if
current_time
-
self
.
sent_errors
[
error_hash
]
<
self
.
resend_interval
:
logging
.
info
(
f
"该错误已在冷却期内,跳过重复发送:{line[:100]}..."
)
break
try
:
dingding_send_message
(
error_log_url
,
ding_type
=
self
.
ding_type
)
self
.
sent_errors
[
error_hash
]
=
current_time
except
Exception
as
e
:
logging
.
info
(
f
"发送钉钉消息失败: {e}"
)
...
...
@@ -220,24 +223,23 @@ class LogMonitor:
break
else
:
logging
.
error
(
"获取日志失败,无法获取上下文"
)
logging
.
debug
(
"Received line:
%
s"
,
line
)
except
IndexError
:
pass
except
Exception
as
e
:
logging
.
exception
(
f
"获取上下文日志失败: {e}"
)
def
restart_monitoring
(
self
):
"""自动重启日志监控"""
logging
.
info
(
"尝试重新启动日志监控..."
)
self
.
stop_monitoring
()
time
.
sleep
(
5
)
self
.
start_monitoring
()
@
staticmethod
def
get_remote_log_with_paramiko
(
host
,
username
,
private_key_path
,
passphrase
,
log_path
,
num_lines
=
1000
,
timeout
=
30
,
filter_word
=
None
):
"""
使用 Paramiko 获取远程服务器的日志文件内容,并通过过滤词过滤日志内容.
"""
def
get_remote_log_with_paramiko
(
host
,
username
,
private_key_path
,
passphrase
,
log_path
,
num_lines
=
1000
,
timeout
=
30
):
try
:
private_key
=
paramiko
.
RSAKey
.
from_private_key_file
(
private_key_path
,
password
=
passphrase
)
client
=
paramiko
.
SSHClient
()
...
...
@@ -270,7 +272,6 @@ class LogMonitor:
if
__name__
==
"__main__"
:
# 多个服务器配置
SERVERS
=
[
{
"host"
:
"192.168.5.235"
,
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论