提交 57f9a879 authored 作者: 陈泽健's avatar 陈泽健

补充预定系统对内服务日志的监测脚本:监测到错误日志时会收集其上下文,并通过钉钉机器人将告警消息发送至钉钉群。

上级 3d1590e0
1. 2025-06-05:
- 补充预定系统对内服务日志的监测脚本:监测到错误日志时会收集其上下文,并通过钉钉机器人将告警消息发送至钉钉群。
\ No newline at end of file
import json
import logging
import time
import urllib
from urllib.parse import urlencode
import hmac
import hashlib
import base64
import requests
def dingding_send_message(error_message, ding_type):
    """Send a Markdown alert to a DingTalk group through a signed custom-robot webhook.

    API reference:
    https://open.dingtalk.com/document/orgapp/custom-robots-send-group-messages#title-7fs-kgs-36x

    The function is best-effort: configuration, transport and API errors are
    logged and never raised to the caller.

    :param error_message: Markdown text used as the body of the alert.
    :param ding_type: Robot profile name; selects the webhook URL and the
        signing secret. An unknown name is logged and nothing is sent.
    :return: None.
    """
    logging.info("开始构建并发送钉钉机器人消息")

    # NOTE(review): tokens and secrets are committed in source. They should be
    # loaded from configuration or the environment and rotated.
    # Production robot (kept disabled, as in the original):
    # webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=b0eea0bbf097ce3badb4c832d2cd0267a50486f395ec8beca6e2042102bb295b'
    # secret = 'SEC928b11659c5fd6476cfa2042edbf56da876abf759289f7e4d3c671fb9a81bf43'
    # Test-environment robots, keyed by profile name -> (webhook URL, secret).
    robots = {
        '标准版服务监测': (
            'https://oapi.dingtalk.com/robot/send?access_token=7fbf40798cad98b1b5db55ff844ba376b1816e80c5777e6f47ae1d9165dacbb4',
            'SEC610498ed6261ae2df1d071d0880aaa70abf5e67efe47f75a809c1f2314e0dbd6',
        ),
    }
    robot = robots.get(ding_type)
    if robot is None:
        # Previously this path crashed with UnboundLocalError.
        logging.error(f"未知的钉钉机器人类型: {ding_type}")
        return
    webhook_url, secret = robot

    # Signature: base64(HMAC-SHA256(key=secret, msg="<timestamp>\n<secret>")).
    timestamp = str(round(time.time() * 1000))
    string_to_sign = f'{timestamp}\n{secret}'
    hmac_code = hmac.new(
        secret.encode('utf-8'),
        string_to_sign.encode('utf-8'),
        digestmod=hashlib.sha256,
    ).digest()
    # Keep the raw base64 text here: urlencode() below percent-encodes it
    # exactly once. Quoting it first would double-encode '+', '/' and '='.
    sign = base64.b64encode(hmac_code).decode('utf-8')

    query = urllib.parse.parse_qs(urllib.parse.urlsplit(webhook_url).query)
    params = {
        'access_token': query['access_token'][0],
        'timestamp': timestamp,
        'sign': sign,
    }
    final_webhook_url = f'https://oapi.dingtalk.com/robot/send?{urllib.parse.urlencode(params)}'
    # Log a redacted URL only; the token and signature grant posting rights.
    logging.info(
        "钉钉机器人Webhook URL: "
        f"https://oapi.dingtalk.com/robot/send?access_token=***&timestamp={timestamp}&sign=***"
    )

    headers = {'Content-Type': 'application/json'}
    message = {
        # msgtype must name the payload key that is actually present.
        'msgtype': 'markdown',
        'markdown': {
            'title': "监测到预定服务出现异常日志,请进行排查处理!!!",
            'text': error_message,
        },
        'at': {
            # The API expects a list of phone numbers, not a bare string.
            'atMobiles': ["13724387318"],
            'isAtAll': True,
        },
    }

    try:
        # A timeout keeps a stalled connection from blocking the caller forever.
        response = requests.post(
            final_webhook_url,
            data=json.dumps(message),
            headers=headers,
            timeout=10,
        )
    except requests.exceptions.RequestException as e:
        logging.error(f'请求异常: {e}')
        return

    if response.status_code != 200:
        logging.error(f'消息发送失败,状态码: {response.status_code}')
        logging.error(f'响应内容: {response.text}')
        return

    # DingTalk reports API-level failures (bad signature, invalid payload, rate
    # limit) with HTTP 200 and a non-zero "errcode" in the JSON body.
    try:
        errcode = response.json().get('errcode')
    except (ValueError, AttributeError):
        errcode = None
    if errcode == 0:
        logging.info('消息发送成功!')
        logging.info(f'响应内容: {response.text}')
    else:
        logging.error(f'消息发送失败,钉钉返回错误: {response.text}')
\ No newline at end of file
import paramiko
import threading
import time
import logging
import sys
import os
# Configure root logging: INFO level, "<time> - <level> - <message>" format.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Short alias used throughout this script for info-level log output.
INFO = logging.info
# Make base.py importable by adding the parent of the current working
# directory to sys.path (adjust to the actual project layout).
# NOTE(review): abspath("..") is resolved against the process working
# directory, not this file's location, so the import below presumably only
# works when the script is launched from its own directory - confirm.
sys.path.append(os.path.abspath(".."))
# Import the DingTalk sender from base.py.
from base import dingding_send_message
# Keywords to monitor for.
# NOTE(review): no code visible in this file reads this constant - confirm
# whether it is still needed.
ERROR_KEYWORDS = ["ERROR", "Exception"]
class LogMonitor:
    """Follow a remote log file over SSH and raise a DingTalk alert for error lines.

    A background thread reads the output of ``tail -f`` from an interactive
    SSH shell. Each complete line is kept in a bounded local buffer; when the
    log level parsed from a line is one of ``ALERT_LEVELS``, the surrounding
    lines are collected and sent through ``dingding_send_message``.
    """

    # Log levels (the word before " : ") that trigger an alert.
    ALERT_LEVELS = ("ERROR", "Exception")
    # Number of lines kept on each side of the error line in an alert.
    CONTEXT_RADIUS = 100
    # Upper bound on stored alert records, so a long-running monitor does not
    # grow without limit.
    MAX_ERROR_CONTEXTS = 100

    def __init__(self, host, username, private_key_path, passphrase, log_path, check_interval=1):
        """Store connection settings; no network activity happens here.

        Args:
            host: Address of the server that holds the log file.
            username: SSH user name.
            private_key_path: Path of the RSA private key file.
            passphrase: Passphrase of the private key.
            log_path: Path of the log file on the remote server.
            check_interval: Seconds to sleep when no new output is available.
        """
        self.host = host
        self.username = username
        self.private_key_path = private_key_path
        self.passphrase = passphrase
        self.log_path = log_path
        self.check_interval = check_interval
        self.client = None
        self.channel = None
        self.collecting = False
        self.lock = threading.Lock()
        self.line_buffer = []  # most recent log lines, oldest first
        self.buffer_size = 200  # maximum number of lines kept in line_buffer
        self.error_contexts = []  # alert records: timestamp, error_line, context
        self._pending = b""  # bytes received after the last newline

    def connect(self):
        """Open the SSH session and start ``tail -f`` on the log file.

        Returns:
            bool: True when the shell was opened and the command sent,
            False on any failure (the failure is logged).
        """
        client = None
        try:
            private_key = paramiko.RSAKey.from_private_key_file(
                self.private_key_path, password=self.passphrase
            )
            client = paramiko.SSHClient()
            # NOTE(review): AutoAddPolicy accepts unknown host keys without
            # verification; acceptable on a trusted LAN only.
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            client.connect(self.host, username=self.username, pkey=private_key)
            channel = client.invoke_shell()
            channel.send(f"tail -f {self.log_path}\n")
        except Exception as e:
            INFO(f"连接失败: {e}")
            # Do not leak a half-open client when a later step failed.
            if client is not None:
                client.close()
            return False
        self.client = client
        self.channel = channel
        self._pending = b""  # never join bytes from a previous session
        INFO(f"Connected to {self.host}, monitoring {self.log_path}")
        return True

    def start_monitoring(self):
        """Connect and start the background reader thread (no-op if already running)."""
        if self.collecting:
            INFO("Already monitoring logs.")
            return
        if not self.connect():
            return
        self.collecting = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.start()
        INFO("开始日志监控...")

    def stop_monitoring(self):
        """Ask the reader thread to finish and close the SSH channel and client."""
        # Clear the flag first so the reader thread can tell a deliberate
        # stop from a lost connection.
        self.collecting = False
        if self.channel:
            self.channel.close()
        if self.client:
            self.client.close()
        INFO("停止日志监控.")

    def _monitor_loop(self):
        """Reader thread body: pull output from the channel until stopped."""
        try:
            while self.collecting:
                channel = self.channel
                if channel.recv_ready():
                    self._feed(channel.recv(1024))
                elif channel.closed or channel.exit_status_ready():
                    # recv_ready() just stays False on a dead channel, so the
                    # disconnect has to be detected explicitly.
                    raise paramiko.SSHException("remote channel closed")
                else:
                    time.sleep(self.check_interval)
        except (paramiko.SSHException, OSError) as e:
            # socket.error is an alias of OSError, so OSError covers it.
            if not self.collecting:
                return  # channel was closed by stop_monitoring(); do not restart
            INFO(f"SSH 连接中断: {e}")
            self.restart_monitoring()
        except Exception as e:
            if not self.collecting:
                return
            INFO(f"监控过程中发生异常: {e}")
            self.restart_monitoring()

    def _feed(self, data):
        """Add raw bytes from the channel and process every complete line.

        Chunks can end in the middle of a line or of a multi-byte UTF-8
        character, so the bytes after the last newline are held back until
        the rest arrives. The newline byte never occurs inside a multi-byte
        UTF-8 sequence, which makes splitting before decoding safe.
        """
        *complete, self._pending = (self._pending + data).split(b"\n")
        for raw in complete:
            # The interactive shell ends lines with "\r\n".
            self._process_line(raw.decode("utf-8", errors="ignore").rstrip("\r"))

    @staticmethod
    def _extract_level(line):
        """Return the last word before the first " : " of *line*, or None if there is none."""
        words = line.split(" : ")[0].split()
        return words[-1] if words else None

    @staticmethod
    def _extract_context(lines, line, radius):
        """Return up to *radius* lines on each side of the first entry containing *line*, or None."""
        needle = line.strip()
        for index, candidate in enumerate(lines):
            if needle in candidate.strip():
                return lines[max(0, index - radius):index + radius + 1]
        return None

    def _process_line(self, line):
        """Buffer one log line and raise an alert when its level is in ALERT_LEVELS."""
        with self.lock:
            self.line_buffer.append(line)
            overflow = len(self.line_buffer) - self.buffer_size
            if overflow > 0:
                del self.line_buffer[:overflow]
        # The lock is released before the slow part; it is taken again below
        # and threading.Lock is not re-entrant.
        level = self._extract_level(line)
        if level not in self.ALERT_LEVELS:
            return
        try:
            self._handle_error_line(line, level)
        except Exception as e:
            INFO(f"获取上下文日志失败: {e}")

    def _handle_error_line(self, line, level):
        """Collect the context of an error line, record it and send the alert."""
        INFO(f"发现 {level} 日志!正在通过 SSH 获取上下文日志...")
        full_log = self.get_remote_log_with_paramiko(
            host=self.host,
            username=self.username,
            private_key_path=self.private_key_path,
            passphrase=self.passphrase,
            log_path=self.log_path,
            num_lines=200
        )
        context = None
        if full_log:
            context = self._extract_context(full_log.split('\n'), line, self.CONTEXT_RADIUS)
        else:
            INFO("获取日志失败,无法获取上下文")
        if context is None:
            # Never drop the alert: use the locally buffered lines, which end
            # with the error line itself.
            INFO("Error line not found in remote log; using locally buffered lines as context.")
            with self.lock:
                context = list(self.line_buffer)

        error_time = time.strftime('%Y-%m-%d %H:%M:%S')
        with self.lock:
            self.error_contexts.append({
                'timestamp': error_time,
                'error_line': line,
                'context': context
            })
            excess = len(self.error_contexts) - self.MAX_ERROR_CONTEXTS
            if excess > 0:
                del self.error_contexts[:excess]

        error_log_snippet = "\n".join(context)
        message_text = (
            f"- **错误日志发生时间**: {error_time}\n"
            f"- **错误行**: `{line.strip()}`\n"
            f"- **上下文日志**:\n"
            f"\n{error_log_snippet}\n\n"
            f"请及时查看处理!@相关人员"
        )
        try:
            dingding_send_message(message_text, ding_type="标准版服务监测")
        except Exception as e:
            INFO(f"发送钉钉消息失败: {e}")
        INFO("上下文日志如下:\n" + "\n".join(context))

    def restart_monitoring(self):
        """Stop, wait five seconds, then try to start monitoring again."""
        INFO("尝试重新启动日志监控...")
        self.stop_monitoring()
        time.sleep(5)
        self.start_monitoring()

    @staticmethod
    def get_remote_log_with_paramiko(host, username, private_key_path, passphrase, log_path, num_lines=1000, timeout=30,
                                     filter_word=None):
        """Fetch the last lines of a remote log file over a one-off SSH connection.

        Args:
            host (str): Server IP address or host name.
            username (str): SSH user name.
            private_key_path (str): Path of the RSA private key file.
            passphrase (str): Passphrase of the private key; falsy if none.
            log_path (str): Path of the log file on the remote server.
            num_lines (int): Number of trailing lines to fetch (default 1000).
            timeout (int): Timeout in seconds for connecting and for the command.
            filter_word (str): When given, only lines containing it are returned.

        Returns:
            str: The (optionally filtered) log text, or None on any error,
            including any output on the remote command's stderr.
        """
        client = None
        try:
            print(f"Loading private key from {private_key_path}...")
            if passphrase:
                private_key = paramiko.RSAKey.from_private_key_file(private_key_path, password=passphrase)
            else:
                private_key = paramiko.RSAKey.from_private_key_file(private_key_path)
            client = paramiko.SSHClient()
            # NOTE(review): AutoAddPolicy accepts unknown host keys without
            # verification; acceptable on a trusted LAN only.
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            client.connect(host, username=username, pkey=private_key, timeout=timeout)
            command = f"tail -n {num_lines} {log_path}"
            stdin, stdout, stderr = client.exec_command(command, timeout=timeout)
            # Drain stdout first: it carries the bulk of the data, and waiting
            # on stderr while stdout is full can stall the command.
            output = stdout.read().decode('utf-8')
            error = stderr.read().decode('utf-8')
            if error:
                print(f"Error: {error}")
                return None
            print("Successfully retrieved log content.")
            print("Full log content:")
            print(output)
            if filter_word:
                filtered_output = "\n".join([line for line in output.split('\n') if filter_word in line])
                if not filtered_output:
                    print(f"No lines found containing the filter word: {filter_word}")
                return filtered_output
            return output
        except paramiko.ssh_exception.PasswordRequiredException:
            print("Error: The private key file is encrypted but no passphrase was provided.")
            return None
        except paramiko.ssh_exception.SSHException as e:
            print(f"SSH Error: {e}")
            return None
        except Exception as e:
            print(f"An error occurred: {e}")
            return None
        finally:
            if client is not None:
                client.close()
if __name__ == "__main__":
    # Script entry point: monitor one hard-wired server and log file until
    # the user presses Ctrl+C.
    # NOTE(review): the host, key path and key passphrase are committed in
    # source; consider moving them to a config file or environment variables.
    monitor = LogMonitor(
        "192.168.5.218",
        "root",
        "C:/Users/29194/.ssh/id_rsa",
        "Ubains@123",
        "/var/www/java/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log",
    )
    try:
        monitor.start_monitoring()
        # Keep the main thread alive so Ctrl+C can be caught here.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        monitor.stop_monitoring()
        print("用户终止监控。")
......@@ -6,24 +6,32 @@ Upgrade command line: yum upgrade
Activate the web console with: systemctl enable --now cockpit.socket
Last login: Wed May 21 10:25:53 2025 from 192.168.9.51
Last login: Thu Jun 5 16:14:44 2025 from 192.168.9.51
tail -f /var/www/java/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log
Welcome to 4.19.90-2403.3.0.0270.87.uel20.x86_64
System information as of time: 2025年 05月 21日 星期三 10:27:55 CST
System information as of time: 2025年 06月 05日 星期四 16:47:27 CST
System load: 0.32
Processes: 449
Memory used: 79.6%
Swap used: 44.9%
Usage On: 51%
System load: 0.75
Processes: 407
Memory used: 68.0%
Swap used: 23.2%
Usage On: 60%
IP address: 192.168.5.218
IP address: 172.17.0.1
Users online: 1
[?2004h[root@localhost ~]# tail -f /var/www/java/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log
[?2004l 2025-05-21 10:27:00.012 INFO- [CorpWechatCommonQuartz企业微信公共定时任务类][定时同步会议][IP:][CN-99N-UBAINS][QUARTZ][企微功能开关,决定是否同步]:false
2025-05-21 10:27:00.015 INFO- [CorpWechatCommonQuartz企业微信公共定时任务类][定时同步会议][
\ No newline at end of file
[?2004l 2025-06-05 16:47:02.432 INFO : [ThirdSyncMeetCallable-第三方同步会议线程类][第三方会议同步][IP:][n03q9qwtsddfiwqqgfx7cipzt0lo2mhp][SYNC][会议室编号对比-相同才进行后面的判断-原编号n03q9qwtsddfiwqqgfx7cipzt0lo2mhp-为true]:true
2025-06-05 16:47:02.432 INFO : [ThirdSyncMeetCallable-第三方同步会议线程类][第三方会议同步][IP:][Thu Jun 05 16:46:02 CST 2025][SYNC][修改触发-更新会议的更新时间不同]:"2025-06-05 16:46:02"
2025-06-05 16:47:02.432 INFO
: [ThirdSyncMeetCallable-第三方同步会议线程类][第三方会议同步][IP:][2025-1748915224214][SYNC][被修改的会议名称为]:"结束正在召开的周期会议test1"
2025-06-05 16:47:02.432 INFO : [ThirdSyncServiceImpl第三方页面同步服务实现类][第三方会议同步][IP:][][SYNC][数据库操作-开始]:""
2025-06-05 16:47:02.433 INFO : [ThirdSyncServiceImpl第三方页面同步服务实现类][第三方会议同步][IP:][][SYNC][新增会议-失败]:""
2025-06-05 16:47:03.222 INFO : [ThirdSyncServiceImpl第三方页面同步服务实现类][第三方会议同步][IP:][][SYNC][修改会议-成功]:"会议数量1002"
2025-06-05 16:47:03.222 INFO : 插入会议操作完毕-------------------------------->耗时:790
2025-06-05 16:47:03.286 INFO : [ThirdSyncServiceImpl第三方页面同步服务实现类][第三方会议同步][IP:][][SYNC][结束]:"原有第三方会议数量为+1002新增的会议数量为-0修改的会议数量为-1002"
2025-06-05 16:47:03.287 INFO : [ZsjServi
\ No newline at end of file
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论