提交 900ad813 authored 作者: 陈泽健's avatar 陈泽健

处理日志监测错误异常日志的钉钉发送消息流程,增加预定系统对内外的日志监测,增加通过链接形式打开日志存储的文本数据,支持公网访问。

上级 25f9b139
......@@ -9,7 +9,7 @@ import base64
import requests
import hytest
def dingding_send_message(error_message,ding_type):
def dingding_send_message(error_log_url,ding_type):
"""
发送钉钉机器人消息
参考接口文档:https://open.dingtalk.com/document/orgapp/custom-robots-send-group-messages#title-7fs-kgs-36x
......@@ -20,14 +20,17 @@ def dingding_send_message(error_message,ding_type):
"""
# 记录调用此函数的日志
logging.info("开始构建并发送钉钉机器人消息")
# 钉钉机器人的 Webhook URL 和密钥(正式环境)
# webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=b0eea0bbf097ce3badb4c832d2cd0267a50486f395ec8beca6e2042102bb295b'
# secret = 'SEC928b11659c5fd6476cfa2042edbf56da876abf759289f7e4d3c671fb9a81bf43'
# 钉钉机器人的 Webhook URL 和密钥(测试环境)
webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=7fbf40798cad98b1b5db55ff844ba376b1816e80c5777e6f47ae1d9165dacbb4'
secret = 'SEC610498ed6261ae2df1d071d0880aaa70abf5e67efe47f75a809c1f2314e0dbd6'
# 日志类型:1.预定系统对内服务;2.预定系统对外服务
log_type = ''
if ding_type == '标准版预定-对内服务服务监测':
log_type = '预定系统对内服务日志出现异常,请尽快排查处理!!!'
elif ding_type == '标准版预定-对外服务服务监测':
log_type = '预定系统对外服务日志出现异常,请尽快排查处理!!!'
if ding_type == '标准版服务监测':
webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=7fbf40798cad98b1b5db55ff844ba376b1816e80c5777e6f47ae1d9165dacbb4'
secret = 'SEC610498ed6261ae2df1d071d0880aaa70abf5e67efe47f75a809c1f2314e0dbd6'
# 生成时间戳
timestamp = str(round(time.time() * 1000))
......@@ -55,9 +58,10 @@ def dingding_send_message(error_message,ding_type):
headers = {'Content-Type': 'application/json'}
message = {
'msgtype': 'link',
'markdown': {
'title': "监测到预定服务出现异常日志,请进行排查处理!!!",
'text': error_message,
'link': {
'title': log_type,
'messageUrl': error_log_url,
'text': log_type
},
"at": {
"atMobiles": "13724387318",
......
1. 2025-06-05:
- 补充预定系统对内服务日志的日志监测脚本,获取到错误日志信息会进行收集前后文,并调用钉钉消息发送至钉钉群中。
- 调整日志监测模块的文件规范路径,补充相关日志打印。
\ No newline at end of file
- 调整日志监测模块的文件规范路径,补充相关日志打印。
- 处理日志监测错误异常日志的钉钉发送消息流程,增加预定系统对内外的日志监测,增加通过链接形式打开日志存储的文本数据,支持公网访问。
\ No newline at end of file
......@@ -4,6 +4,7 @@ import time
import logging
import sys
import os
import json
# 配置日志输出到控制台
console_handler = logging.StreamHandler()
......@@ -34,14 +35,16 @@ except ImportError as e:
for p in sys.path:
print(" -", p)
class LogMonitor:
def __init__(self, host, username, private_key_path, passphrase, log_path, check_interval=1):
def __init__(self, host, username, private_key_path, passphrase, log_path, check_interval=1, ding_type="标准版服务监测"):
self.host = host
self.username = username
self.private_key_path = private_key_path
self.passphrase = passphrase
self.log_path = log_path
self.check_interval = check_interval # 日志检查间隔(秒)
self.ding_type = ding_type # 钉钉群标识
self.client = None
self.channel = None
self.collecting = False
......@@ -75,7 +78,7 @@ class LogMonitor:
self.collecting = True
self.monitor_thread = threading.Thread(target=self._monitor_loop)
self.monitor_thread.start()
logging.info("开始日志监控...")
logging.info(f"开始监控日志: {self.log_path}")
def stop_monitoring(self):
self.collecting = False
......@@ -83,7 +86,7 @@ class LogMonitor:
self.channel.close()
if self.client:
self.client.close()
logging.info("停止日志监控.")
logging.info(f"停止对日志 {self.log_path} 的监控.")
def _monitor_loop(self):
try:
......@@ -105,6 +108,53 @@ class LogMonitor:
logging.info(f"监控过程中发生异常: {e}")
self.restart_monitoring()
def save_error_contexts_to_json(self):
# 获取当前脚本所在目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构建正确的相对路径(指向 ubains-module-test 根目录)
root_dir = os.path.dirname(current_dir) # 返回到 "日志监测"
root_dir = os.path.dirname(root_dir) # 返回到 "ubains-module-test"
# 拼接目标路径
full_path = os.path.normpath(os.path.join(root_dir, "预定系统", "reports", "error_log"))
# 确保目录存在
os.makedirs(full_path, exist_ok=True)
# 文件名带时间戳
timestamp = time.strftime("%Y-%m-%d-%H:%M")
filename = f"error_log{timestamp}.json"
file_path = os.path.join(full_path, filename)
# 写入 JSON 文件
try:
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(self.error_contexts, f, ensure_ascii=False, indent=4)
logging.info(f"✅ 错误上下文已保存至: {file_path}")
return file_path
except Exception as e:
logging.error(f"❌ 保存 JSON 文件失败: {e}")
return None
def generate_error_log_url(self, file_path):
# 根据本地存储路径,生成公网访问链接。
# 示例:
# file_path = D:\GithubData\自动化\ubains-module-test\预定系统\reports\error_log\error_log2025-06-05-20:15.json
# url = http://nat.ubainsyun.com:31133/error_log/error_log2025-06-05-20:15.json
if not file_path:
return None
# 从文件路径中提取最后一级目录名(error_log)和文件名
dir_name = os.path.basename(os.path.dirname(file_path)) # 获取 error_log
filename = os.path.basename(file_path)
# 拼接 URL
error_log_url = f"http://nat.ubainsyun.com:31133/{dir_name}/{filename}"
logging.info(f"生成公网访问链接: {error_log_url}")
return error_log_url
def _process_line(self, line):
with self.lock:
self.line_buffer.append(line)
......@@ -144,21 +194,13 @@ class LogMonitor:
'context': context
})
# 构造钉钉消息内容
error_time = time.strftime('%Y-%m-%d %H:%M:%S')
error_log_snippet = "\n".join(context)
message_text = (
f"- **错误日志发生时间**: {error_time}\n"
f"- **错误行**: `{line.strip()}`\n"
f"- **上下文日志**:\n"
f"\n{error_log_snippet}\n\n"
f"请及时查看处理!@相关人员"
)
# 保存为 JSON 并生成公网链接
file_path = self.save_error_contexts_to_json()
error_log_url = self.generate_error_log_url(file_path)
# 调用钉钉发送函数
try:
dingding_send_message(message_text, ding_type="标准版服务监测")
dingding_send_message(error_log_url, ding_type=self.ding_type)
except Exception as e:
logging.info(f"发送钉钉消息失败: {e}")
......@@ -183,28 +225,9 @@ class LogMonitor:
filter_word=None):
"""
使用 Paramiko 获取远程服务器的日志文件内容,并通过过滤词过滤日志内容.
Args:
host (str): 服务器 IP 地址或域名.
username (str): 用户名.
private_key_path (str): SSH 私钥文件路径.
passphrase (str): 私钥文件的 passphrase.
log_path (str): 日志文件路径.
num_lines (int): 要获取的日志行数 (默认 100).
timeout (int): SSH 命令执行的超时时间(秒).
filter_word (str): 过滤词,只有包含该词的日志行才会被返回 (默认 None).
Returns:
str: 获取的日志内容,如果出错返回 None.
"""
try:
print(f"Loading private key from {private_key_path}...")
if passphrase:
private_key = paramiko.RSAKey.from_private_key_file(private_key_path, password=passphrase)
else:
private_key = paramiko.RSAKey.from_private_key_file(private_key_path)
private_key = paramiko.RSAKey.from_private_key_file(private_key_path, password=passphrase)
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(host, username=username, pkey=private_key, timeout=timeout)
......@@ -214,30 +237,20 @@ class LogMonitor:
error = stderr.read().decode('utf-8')
if error:
print(f"Error: {error}")
logging.error(f"执行命令失败: {error}")
return None
output = stdout.read().decode('utf-8')
print("Successfully retrieved log content.")
print("Full log content:")
print(output) # 打印完整的日志内容
if filter_word:
filtered_output = "\n".join([line for line in output.split('\n') if filter_word in line])
if not filtered_output:
print(f"No lines found containing the filter word: {filter_word}")
return filtered_output
return output
except paramiko.ssh_exception.PasswordRequiredException:
print("Error: The private key file is encrypted but no passphrase was provided.")
logging.error("私钥加密但未提供密码。")
return None
except paramiko.ssh_exception.SSHException as e:
print(f"SSH Error: {e}")
logging.error(f"SSH 错误: {e}")
return None
except Exception as e:
print(f"An error occurred: {e}")
logging.exception(f"获取日志失败: {e}")
return None
finally:
if 'client' in locals():
......@@ -250,14 +263,42 @@ if __name__ == "__main__":
username = "root"
private_key_path = "C:/Users/29194/.ssh/id_rsa"
passphrase = "Ubains@123"
log_path = "/var/www/java/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log"
monitor = LogMonitor(host, username, private_key_path, passphrase, log_path)
LOG_PATHS = [
{
"log_path": "/var/www/java/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log",
"ding_type": "标准版预定-对内服务服务监测"
},
{
"log_path": "/var/www/java/api-java-meeting2.0/logs/another-log-file.log",
"ding_type": "标准版预定-对外服务服务监测"
}
]
monitors = []
threads = []
for config in LOG_PATHS:
monitor = LogMonitor(
host=host,
username=username,
private_key_path=private_key_path,
passphrase=passphrase,
log_path=config["log_path"],
ding_type=config["ding_type"]
)
monitors.append(monitor)
thread = threading.Thread(target=monitor.start_monitoring)
threads.append(thread)
thread.start()
logging.info(f"已启动对 {config['log_path']} 的监控")
try:
monitor.start_monitoring()
while True:
time.sleep(1)
except KeyboardInterrupt:
monitor.stop_monitoring()
print("用户终止监控。")
logging.info("用户终止监控,正在停止所有监控器...")
for monitor in monitors:
monitor.stop_monitoring()
print("所有日志监控已停止。")
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论