提交 a74e1bdc authored 作者: 陈泽健's avatar 陈泽健

增加LogCollector类实现从服务器获取日志,在测试步骤开始前进行获取,到步骤测试结束后获取结束,输出为日志文本文件。

上级 024d3ca5
...@@ -1937,6 +1937,10 @@ def write_xlsx_data(xlsx_file_path, sheet_name, function_number, test_result, lo ...@@ -1937,6 +1937,10 @@ def write_xlsx_data(xlsx_file_path, sheet_name, function_number, test_result, lo
import paramiko import paramiko
import time import time
import paramiko
import paramiko
def get_remote_log_with_paramiko(host, username, private_key_path, passphrase, log_path, num_lines=1000, timeout=30, filter_word=None): def get_remote_log_with_paramiko(host, username, private_key_path, passphrase, log_path, num_lines=1000, timeout=30, filter_word=None):
""" """
使用 Paramiko 获取远程服务器的日志文件内容,并通过过滤词过滤日志内容. 使用 Paramiko 获取远程服务器的日志文件内容,并通过过滤词过滤日志内容.
...@@ -1955,9 +1959,20 @@ def get_remote_log_with_paramiko(host, username, private_key_path, passphrase, l ...@@ -1955,9 +1959,20 @@ def get_remote_log_with_paramiko(host, username, private_key_path, passphrase, l
str: 获取的日志内容,如果出错返回 None. str: 获取的日志内容,如果出错返回 None.
""" """
try: try:
private_key = paramiko.RSAKey.from_private_key_file(private_key_path, password=passphrase) print(f"Loading private key from {private_key_path}...")
if passphrase:
print(f"passphrase为:{passphrase}")
print(f"private_key_path:{private_key_path}")
private_key = paramiko.RSAKey.from_private_key_file(private_key_path, password=passphrase)
else:
private_key = paramiko.RSAKey.from_private_key_file(private_key_path)
print(f"Private key loaded successfully from {private_key_path}")
client = paramiko.SSHClient() client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
print(f"Connecting to {host} as {username}...")
client.connect(host, username=username, pkey=private_key, timeout=timeout) client.connect(host, username=username, pkey=private_key, timeout=timeout)
command = f"tail -n {num_lines} {log_path}" command = f"tail -n {num_lines} {log_path}"
...@@ -1982,22 +1997,147 @@ def get_remote_log_with_paramiko(host, username, private_key_path, passphrase, l ...@@ -1982,22 +1997,147 @@ def get_remote_log_with_paramiko(host, username, private_key_path, passphrase, l
return output return output
except paramiko.ssh_exception.PasswordRequiredException:
print("Error: The private key file is encrypted but no passphrase was provided.")
return None
except paramiko.ssh_exception.SSHException as e:
print(f"SSH Error: {e}")
return None
except Exception as e: except Exception as e:
print(f"An error occurred: {e}") print(f"An error occurred: {e}")
return None return None
finally: finally:
client.close() if 'client' in locals():
client.close()
if __name__ == "__main__":
host = "192.168.5.218" # 替换为你的服务器 IP 或域名
username = "root" # 替换为你的用户名
# private_key_path = "C:\\Users\\29194\\.ssh\\id_rsa" # 替换为你的私钥文件路径
private_key_path = "C:/Users/29194/.ssh/id_rsa"
passphrase = "Ubains@123" # 确保这是你的私钥文件的正确 passphrase
log_path = "/var/www/java/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log" # 替换为你要读取的日志文件路径
filter_word = "" # 替换为你想要过滤的关键字
log_content = get_remote_log_with_paramiko(host, username, private_key_path, passphrase, log_path)
if log_content:
print(log_content)
else:
print("Failed to retrieve log content.")
# if __name__ == "__main__":
# host = "192.168.5.218"
# username = "root"
# private_key_path = "C:\\Users\\29194\\.ssh\\id_rsa" # 替换为你的私钥文件路径
# passphrase = "Ubains@123" # 替换为你的 passphrase import paramiko
# log_path = "/var/www/java/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log" import threading
# filter_word = "桌牌" import time
# log_content = get_remote_log_with_paramiko(host, username, private_key_path, passphrase, log_path, filter_word=filter_word)
# class LogCollector:
# if log_content: def __init__(self, host, username, private_key_path, passphrase, log_path, timeout=30, keepalive_interval=60):
# print(log_content) self.host = host
# else: self.username = username
# print("Failed to retrieve log content.") self.private_key_path = private_key_path
\ No newline at end of file self.passphrase = passphrase
self.log_path = log_path
self.timeout = timeout
self.keepalive_interval = keepalive_interval # 保持连接活跃的时间间隔(秒)
self.client = None
self.channel = None
self.log_content = []
self.collecting = False
self.lock = threading.Lock()
self.thread = None
def connect(self):
try:
private_key = paramiko.RSAKey.from_private_key_file(self.private_key_path, password=self.passphrase)
self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.client.connect(self.host, username=self.username, pkey=private_key, timeout=self.timeout)
print(f"Connected to {self.host} as {self.username}")
except Exception as e:
print(f"Failed to connect: {e}")
self.client = None
def start_collection(self):
if self.collecting:
print("Already collecting logs.")
return
self.connect()
if self.client is None:
print("Failed to start collection due to connection failure.")
return
self.channel = self.client.invoke_shell()
command = f"tail -f {self.log_path}\n"
self.channel.send(command)
self.collecting = True
self.thread = threading.Thread(target=self.collect_logs)
self.thread.start()
print("Log collection started.")
def stop_collection(self, output_file):
if not self.collecting:
print("Not collecting logs.")
return
self.collecting = False
if self.channel:
self.channel.close()
if self.client:
self.client.close()
print("Log collection stopped.")
with open(output_file, 'w', encoding='utf-8') as f:
f.write('\n'.join(self.log_content))
print(f"Log content saved to {output_file}")
def collect_logs(self):
try:
while self.collecting:
if self.channel.recv_ready():
output = self.channel.recv(1024).decode('utf-8')
with self.lock:
self.log_content.append(output.strip())
print(output.strip())
else:
time.sleep(0.1) # 避免CPU占用过高
# 发送keepalive包以保持连接活跃
self.client.get_transport().send_ignore()
time.sleep(self.keepalive_interval)
except Exception as e:
print(f"Error in collect_logs: {e}")
self.collecting = False
finally:
if self.channel:
self.channel.close()
if self.client:
self.client.close()
print("Log collection thread terminated.")
# 使用示例
if __name__ == "__main__":
host = "192.168.5.218"
username = "root"
private_key_path = "C:\\Users\\29194\\.ssh\\id_rsa" # 替换为你的私钥文件路径
passphrase = "Ubains@123" # 替换为你的 passphrase
log_path = "/var/www/java/api-java-meeting2.0/logs/ubains-INFO-AND-ERROR.log"
output_file = "collected_logs.txt"
collector = LogCollector(host, username, private_key_path, passphrase, log_path)
# 开始收集日志
collector.start_collection()
# 假设一段时间后停止收集日志
try:
while True:
time.sleep(1) # 保持主程序运行
except KeyboardInterrupt:
print("Stopping log collection due to user interrupt.")
collector.stop_collection(output_file)
print("日志收集完成!")
\ No newline at end of file
...@@ -292,4 +292,6 @@ ...@@ -292,4 +292,6 @@
- 修复因xlsx文件读取失败导致的报错。 - 修复因xlsx文件读取失败导致的报错。
82. 2025-04-18 82. 2025-04-18
- 修复展厅巡检预定系统因页面bug导致的异常定位报错。处理优化。 - 修复展厅巡检预定系统因页面bug导致的异常定位报错。处理优化。
- 优化get_remote_log_with_paramiko函数,增加filter_word过滤词打印日志。 - 优化get_remote_log_with_paramiko函数,增加filter_word过滤词打印日志。
\ No newline at end of file 83. 2025-04-27
- 增加LogCollector类实现从服务器获取日志,在测试步骤开始前进行获取,到步骤测试结束后获取结束,输出为日志文本文件。
\ No newline at end of file
...@@ -90,6 +90,8 @@ schedule.every().thursday.at("07:42").do(run_task, run_automation_test, report_t ...@@ -90,6 +90,8 @@ schedule.every().thursday.at("07:42").do(run_task, run_automation_test, report_t
schedule.every().wednesday.at("07:42").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31133", test_case="展厅巡检", ding_type="展厅巡检") schedule.every().wednesday.at("07:42").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31133", test_case="展厅巡检", ding_type="展厅巡检")
schedule.every().tuesday.at("07:42").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31133", test_case="展厅巡检", ding_type="展厅巡检") schedule.every().tuesday.at("07:42").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31133", test_case="展厅巡检", ding_type="展厅巡检")
schedule.every().friday.at("07:42").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31133", test_case="展厅巡检", ding_type="展厅巡检") schedule.every().friday.at("07:42").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31133", test_case="展厅巡检", ding_type="展厅巡检")
# schedule.every().saturday.at("07:42").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31133", test_case="展厅巡检", ding_type="展厅巡检")
# schedule.every().sunday.at("07:42").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31133", test_case="展厅巡检", ding_type="展厅巡检")
# 调试使用 # 调试使用
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论