提交 b5481b46 authored 作者: 陈泽健's avatar 陈泽健

feat(monitoring): 添加钉钉消息发送功能并优化日志审计

- 集成钉钉机器人API,实现自动化告警消息发送
- 重构日志审计功能,添加主机信息标识
- 修复MySQL连接数监测中的逻辑错误
- 更新文档状态标记,完善需求实现进度
- 优化代码格式化和参数传递方式
上级 3940433b
......@@ -79,9 +79,9 @@ def browser_init(login_type):
# chromedriver下载地址:https://googlechromelabs.github.io/chrome-for-testing/
# 自动化运行服务器的chromedriver路径:
# 云电脑
# service = Service(r'E:\Python\Scripts\chromedriver.exe')
service = Service(r'E:\Python\Scripts\chromedriver.exe')
# 自动化虚拟机##
service = Service(r'C:\Program Files\Python310\Scripts\chromedriver.exe')
# service = Service(r'C:\Program Files\Python310\Scripts\chromedriver.exe')
# 尝试创建WebDriver实例并执行初始化操作
try:
# 创建WebDriver实例
......
This source diff could not be displayed because it is too large. You can view the blob instead.
import base64
import hashlib
import hmac
import urllib
import paramiko
import socket
import re
......@@ -7,6 +12,8 @@ from typing import List, Optional, Tuple
import logging
from logging.handlers import RotatingFileHandler
import threading
import requests
from rich import json
@dataclass
......@@ -339,7 +346,8 @@ def monitor_log_errors_once(
sys_name: str,
log_path: str,
logger: logging.Logger,
max_context: int = 50
max_context: int = 50,
host: str = ""
) -> None:
"""
监测 ERROR 并记录上下 50 行的发生时间段:
......@@ -347,17 +355,17 @@ def monitor_log_errors_once(
"""
ctx, err_count, (start_ts, end_ts) = fetch_error_context(ssh, log_path, max_context=max_context, max_sections=3)
if err_count == 0:
logger.info(f"[ERROR审计] 系统={sys_name} 日志={log_path} 最近段落未发现 ERROR")
logger.info(f"[ERROR审计] 主机={host} 系统={sys_name} 日志={log_path} 最近段落未发现 ERROR")
return
# 记录发生时间段和上下文
if start_ts or end_ts:
logger.warning(f"[ERROR出现] 系统={sys_name} 日志={log_path} 最近5000行 ERROR数量={err_count} 发生时间段=[{start_ts} ~ {end_ts}]")
logger.warning(f"[ERROR出现] 主机={host} 系统={sys_name} 日志={log_path} 最近5000行 ERROR数量={err_count} 发生时间段=[{start_ts} ~ {end_ts}]")
else:
logger.warning(f"[ERROR出现] 系统={sys_name} 日志={log_path} 最近5000行 ERROR数量={err_count}")
logger.warning(f"[ERROR出现] 主机={host} 系统={sys_name} 日志={log_path} 最近5000行 ERROR数量={err_count}")
if ctx:
logger.info(f"[ERROR上下文] 系统={sys_name} 日志={log_path}\n{ctx}")
logger.info(f"[ERROR上下文] 主机={host} 系统={sys_name} 日志={log_path}\n{ctx}")
# 用于日志暴涨增量统计(上一轮总行数与采样时间戳)
BURST_STATE: dict[Tuple[str, str], Tuple[int, float]] = {}
......@@ -377,14 +385,14 @@ def monitor_log_burst_once(
# 确认文件存在
_, _, code = ssh.exec(f"bash -lc \"test -f '{log_path}'\"")
if code != 0:
logger.info(f"[日志暴涨审计] 系统={sys_name} 日志不存在:{log_path}")
logger.info(f"[日志暴涨审计] 主机={host} 系统={sys_name} 累积中:窗口{elapsed:.0f}s 未达 {window_seconds}s,新增行数={delta_lines}")
return
out, _, _ = ssh.exec(f"bash -lc \"wc -l < '{log_path}' || echo 0\"")
try:
total_lines = int(out.strip())
except Exception:
logger.info(f"[日志暴涨审计] 系统={sys_name} 无法获取总行数")
logger.info(f"[日志暴涨审计] 主机={host} 系统={sys_name} 无法获取总行数")
return
now_out, _, _ = ssh.exec("bash -lc \"date +%s\"")
......@@ -398,7 +406,7 @@ def monitor_log_burst_once(
if not last:
BURST_STATE[key] = (total_lines, now_ts)
logger.info(f"[日志暴涨审计] 系统={sys_name} 初始化窗口,总行数={total_lines}")
logger.info(f"[日志暴涨审计] 主机={host} 系统={sys_name} 初始化窗口,总行数={total_lines}")
return
last_total, last_ts = last
......@@ -409,7 +417,7 @@ def monitor_log_burst_once(
BURST_STATE[key] = (total_lines, now_ts)
if elapsed < window_seconds:
logger.info(f"[日志暴涨审计] 系统={sys_name} 累积中:窗口{elapsed:.0f}s 未达 {window_seconds}s,新增行数={delta_lines}")
logger.info(f"[日志暴涨审计] 主机={host} 系统={sys_name} 累积中:窗口{elapsed:.0f}s 未达 {window_seconds}s,新增行数={delta_lines}")
return
rate = delta_lines / elapsed
......@@ -420,9 +428,9 @@ def monitor_log_burst_once(
end_ts_out, _, _ = ssh.exec("bash -lc \"date -d @"+str(int(now_ts))+" +%Y-%m-%d\\ %H:%M:%S 2>/dev/null || date +%Y-%m-%d\\ %H:%M:%S\"")
if delta_lines >= min_lines_threshold or rate >= rate_threshold_per_sec:
logger.warning(f"[日志打印暴涨] 系统={sys_name} 日志={log_path} {time_info},时间段=[{start_ts_out.strip()} ~ {end_ts_out.strip()}]")
logger.warning(f"[日志打印暴涨] 主机={host} 系统={sys_name} 日志={log_path} {time_info},时间段=[{start_ts_out.strip()} ~ {end_ts_out.strip()}]")
else:
logger.info(f"[日志暴涨审计] 系统={sys_name} 日志={log_path} {time_info},未发现暴涨")
logger.info(f"[日志暴涨审计] 主机={host} 系统={sys_name} 日志={log_path} {time_info},未发现暴涨")
# 更新:聚合监测时传入 host 用于区分 BURST_STATE
def monitor_logs_once(ssh: SSHClientWrapper, platform: PlatformInfo, systems: SystemInfo, logger: logging.Logger, host: str = "") -> None:
......@@ -436,21 +444,11 @@ def monitor_logs_once(ssh: SSHClientWrapper, platform: PlatformInfo, systems: Sy
for sys_name, log_path in targets:
monitor_log_burst_once(
ssh=ssh,
host=host,
sys_name=sys_name,
log_path=log_path,
logger=logger,
window_seconds=300,
min_lines_threshold=1000,
rate_threshold_per_sec=5.0,
ssh=ssh, host=host, sys_name=sys_name, log_path=log_path, logger=logger,
window_seconds=300, min_lines_threshold=1000, rate_threshold_per_sec=5.0,
)
monitor_log_errors_once(
ssh=ssh,
sys_name=sys_name,
log_path=log_path,
logger=logger,
max_context=50,
ssh=ssh, sys_name=sys_name, log_path=log_path, logger=logger, max_context=50, host=host,
)
# ==== 4.3 内存资源消耗状态(按主机维度) ====
......@@ -602,6 +600,67 @@ def monitor_mysql_once(
else:
logger.info(f"[MySQL暴涨审计] {host} 窗口={elapsed:.0f}s 增量={delta_conn} 速率={rate:.2f}/s 未发现暴涨")
def dingding_send_message(title, text):
"""
发送钉钉机器人消息
参考接口文档:https://open.dingtalk.com/document/orgapp/custom-robots-send-group-messages#title-7fs-kgs-36x
:param latest_report: 测试报告链接
:param title: 消息标题
:param mobile: 需要@的手机号列表
:param ding_type: 钉钉机器人类型,用于选择不同的 Webhook URL 和密钥
"""
# 记录调用此函数的日志
logging.info("开始构建并发送钉钉机器人消息")
webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=7fbf40798cad98b1b5db55ff844ba376b1816e80c5777e6f47ae1d9165dacbb4'
secret = 'SEC610498ed6261ae2df1d071d0880aaa70abf5e67efe47f75a809c1f2314e0dbd6'
# 生成时间戳
timestamp = str(round(time.time() * 1000))
# 生成签名
secret_enc = secret.encode('utf-8')
string_to_sign = f'{timestamp}\n{secret}'
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
# 构建最终的 Webhook URL
params = {
'access_token': webhook_url.split('=')[1],
'timestamp': timestamp,
'sign': sign
}
encoded_params = urllib.parse.urlencode(params)
final_webhook_url = f'https://oapi.dingtalk.com/robot/send?{encoded_params}'
# 记录最终的 Webhook URL
logging.info(f"钉钉机器人Webhook URL: {final_webhook_url}")
# 构建消息体
headers = {'Content-Type': 'application/json'}
message = {
'msgtype': 'markdown',
'markdown': {
'title': title,
'text': text,
}
}
try:
# 发送 POST 请求
response = requests.post(final_webhook_url, data=json.dumps(message), headers=headers)
# 检查响应状态码
if response.status_code == 200:
logging.info('消息发送成功!')
logging.info(f'响应内容: {response.text}')
else:
logging.error(f'消息发送失败,状态码: {response.status_code}')
logging.error(f'响应内容: {response.text}')
except requests.exceptions.RequestException as e:
logging.error(f'请求异常: {e}')
# 全局MySQL账号配置(PRD)
MYSQL_USER = "root"
......
......@@ -21,7 +21,7 @@
##### 1、SSH连接(✅ 已实现):
支持预设服务器列表
1、192.168.5.48 root Ubains@1234
2、192.168.5.44 root Ubains@1234
2、192.168.5.44 root Ubains@123
##### 2、平台识别(✅ 已实现):
自动检测目标服务器平台类型(检测 /data/services 目录,如果没有则是传统平台)
......@@ -29,10 +29,10 @@
##### 3、系统识别(✅ 已实现):
自动检测目标服务器的系统类型(检测容器分为三种:ujava、upython、upython_voice,如果有ujava则有会议预定系统、python对应运维集控系统、upython_voice对应转录系统)
##### 4、服务监测(待实现):
4.1、多连接:(✅ 已实现)
##### 4、服务监测(未完成):
4.1、多连接(✅ 已实现):
可支持连接多台服务器
4.2、日志审计:✅ 已实现)
4.2、日志审计(✅ 已实现):
- 根据平台类型持续审计日志信息在某个时间段是否存在暴涨的情况:
每次采样获取日志总行数 total_lines 和采样时间 now_ts。
与上次采样 last_total、last_ts 比较,得到 elapsed 和 delta_lines。
......@@ -58,13 +58,16 @@
路径:/var/www/java/api-java-meeting2.0
日志打印:tail -f logs/ubains-INFO-AND-ERROR.log
4.3、内存资源消耗:
4.3、内存资源消耗(✅ 已实现)
根据平台类型持续监测服务器内存占用情况,记录峰值与峰值时的时间点,以及记录平均值。
4.4、mysql连接数:
4.4、mysql连接数(✅ 已实现)
根据平台类型持续监测mysql连接数在某个时间段是否存在暴涨,或是异常一直没断开的连接数。
先检查mysql容器名称,然后通过进入mysql容器内部进行查询
4.5、emqx连接数:
根据平台类型持续监测EMQX连接数量峰值、平均值,以及是否存在暴涨情况,或是判断一直没断开的异常连接。
##### 5、监测日志审计:
需要丰富日志体系,日志需要用中文打印
\ No newline at end of file
##### 5、监测日志审计(✅ 已实现):
需要丰富日志体系,日志需要用中文打印
##### 6、对接钉钉消息发送:
钉钉函数参考:
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论