提交 b829312c authored 作者: 陈泽健's avatar 陈泽健

refactor(scheduler): 重构定时任务调度器实现异步执行和时间段控制

- 添加工作线程池异步执行任务队列
- 实现时间段禁用功能避免特定时间执行任务
- 优化日志记录格式和错误处理机制
- 重构定时任务配置结构化管理
- 添加展厅巡检工作日定时执行逻辑
- 统一任务执行入口和参数传递方式
上级 247608b6
import schedule
import datetime
import logging
import queue
from Base.base import *
import threading
import time
import logging
import schedule
from Base.base import run_automation_test
# ==============================
# 日志配置(建议你按需改级别/输出位置)
# ==============================
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
if getattr(sys, 'frozen', False):
base_dir = os.path.dirname(sys.executable)
else:
base_dir = os.path.dirname(os.path.abspath(__file__))
cases_dir = os.path.join(base_dir, 'cases')
if not os.path.isdir(cases_dir):
print("cases_dir 路径为:", cases_dir)
raise FileNotFoundError("请将 cases 目录放在程序同级目录下!")
"""
调试主机-执行指令:
1.打开一个终端输入:
- cd .\预定系统\
- python -m http.server 83 --directory reports
2.打开新终端输入:
- cd .\预定系统\ngrok\
- .\start.bat
3.再打开一个终端输入:
- cd .\预定系统\
- python .\定时执行功能测试.py
"""
import os
import sys
def get_resource_path(relative_path):
"""
获取打包后的资源文件真实路径(适用于 PyInstaller)
:param relative_path: 相对于打包时指定的路径
:return: 真实路径字符串
"""
if getattr(sys, 'frozen', False): # 是否被打包成 .exe
base_path = sys._MEIPASS
else:
base_path = os.path.dirname(os.path.abspath(__file__))
return os.path.join(base_path, relative_path)
# ==============================
# 任务队列 + 工作线程
# ==============================
task_queue: "queue.Queue[tuple]" = queue.Queue()
# 创建一个任务队列,用于存储待处理的任务
task_queue = queue.Queue()
def run_task(task, *args, **kwargs):
# 将任务及其参数放入任务队列
def run_task(task, *args, **kwargs) -> None:
"""把任务放进队列,由 worker 异步执行"""
task_queue.put((task, args, kwargs))
logging.debug(f"任务已加入队列: {task.__name__} with args: {args} and kwargs: {kwargs}")
logging.debug("任务已加入队列: %s args=%s kwargs=%s", getattr(task, "__name__", str(task)), args, kwargs)
def worker():
# 工作线程的主循环
def worker() -> None:
"""工作线程:不断从队列取任务执行"""
while True:
# 从任务队列中获取任务及其参数
task, args, kwargs = task_queue.get()
try:
# 记录任务开始执行的时间
logging.debug(f"开始执行任务: {task.__name__} with args: {args} and kwargs: {kwargs}")
# 执行任务并获取结果
logging.info("开始执行任务: %s", getattr(task, "__name__", str(task)))
result = task(*args, **kwargs)
# 如果任务有返回结果,记录日志
if result:
logging.info(result)
logging.info("任务返回: %s", result)
except Exception as e:
# 捕获任务执行过程中发生的任何异常并记录错误日志
logging.error(f"执行任务时发生错误: {e}", exc_info=True)
logging.error("执行任务时发生错误: %s", e, exc_info=True)
finally:
# 无论任务是否成功执行,都标记任务已完成
task_queue.task_done()
# 记录任务完成的时间
logging.debug(f"任务完成: {task.__name__}")
logging.debug("任务完成: %s", getattr(task, "__name__", str(task)))
def start_workers(num_workers):
# 启动指定数量的工作线程
def start_workers(num_workers: int) -> None:
for _ in range(num_workers):
# 创建一个新的工作线程,目标函数为 worker,设置为守护线程
threading.Thread(target=worker, daemon=True).start()
# 启动3个工作线程
start_workers(3)
# 定时执行预定系统测试任务
schedule.every().day.at("10:00").do(run_task, run_automation_test, report_title="预定系统测试报告", report_url_prefix="http://nat.ubainsyun.com:31136", test_case="预定系统测试", ding_type="标准版巡检")
# 定时每一小时执行一次预定快速测试任务
schedule.every(0.5).hours.do(run_task, run_automation_test, report_title="预定系统快速测试测试报告", report_url_prefix="http://nat.ubainsyun.com:31136", test_case="预定系统快速测试", ding_type="标准版巡检")
# ==============================
# 时间段控制(禁用窗口)
# ==============================
def is_in_time_range(now: datetime.time, start: datetime.time, end: datetime.time) -> bool:
"""判断 now 是否在 [start, end) 区间内(不跨天场景)"""
return start <= now < end
# 定时执行展厅巡检任务
# schedule.every().day.at("07:45").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31136", test_case="展厅巡检", ding_type="展厅巡检")
schedule.every().monday.at("07:42").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31136", test_case="展厅巡检", ding_type="展厅巡检")
schedule.every().thursday.at("07:42").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31136", test_case="展厅巡检", ding_type="展厅巡检")
schedule.every().wednesday.at("07:42").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31136", test_case="展厅巡检", ding_type="展厅巡检")
schedule.every().tuesday.at("07:42").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31136", test_case="展厅巡检", ding_type="展厅巡检")
schedule.every().friday.at("07:42").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31136", test_case="展厅巡检", ding_type="展厅巡检")
# schedule.every().saturday.at("07:42").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31136", test_case="展厅巡检", ding_type="展厅巡检")
# schedule.every().sunday.at("07:42").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31136", test_case="展厅巡检", ding_type="展厅巡检")
def run_task_if_allowed(task, *args, block_ranges=None, **kwargs) -> None:
"""
在指定时间段内跳过任务(不入队)
try:
# 无限循环,持续检查并执行计划任务
block_ranges: [(start_time, end_time), ...]
e.g. [(time(7,0), time(9,0))]
"""
now_time = datetime.datetime.now().time()
if block_ranges:
for start_t, end_t in block_ranges:
if is_in_time_range(now_time, start_t, end_t):
logging.info(
"任务已跳过(禁用时段 %s~%s): %s",
start_t.strftime("%H:%M"),
end_t.strftime("%H:%M"),
getattr(task, "__name__", str(task)),
)
return
run_task(task, *args, **kwargs)
# ==============================
# Schedule 配置
# ==============================
REPORT_URL_PREFIX = "http://nat.ubainsyun.com:31136"
def setup_schedules() -> None:
# 1) 每天 10:00 执行“预定系统测试”
schedule.every().day.at("10:00").do(
run_task,
run_automation_test,
report_title="预定系统测试报告",
report_url_prefix=REPORT_URL_PREFIX,
test_case="预定系统测试",
ding_type="标准版巡检",
)
# 2) 每半小时执行一次“预定快速测试”(07:00~09:00 不执行)
schedule.every(30).minutes.do(
run_task_if_allowed,
run_automation_test,
report_title="预定系统快速测试报告",
report_url_prefix=REPORT_URL_PREFIX,
test_case="预定系统快速测试",
ding_type="标准版巡检",
block_ranges=[(datetime.time(7, 0), datetime.time(9, 0))],
)
# 3) 展厅巡检(工作日 07:42)
for weekday in ("monday", "tuesday", "wednesday", "thursday", "friday"):
getattr(schedule.every(), weekday).at("07:42").do(
run_task,
run_automation_test,
report_title="展厅巡检测试报告",
report_url_prefix=REPORT_URL_PREFIX,
test_case="展厅巡检",
ding_type="展厅巡检",
)
def main() -> None:
# 启动工作线程
start_workers(3)
# 注册定时任务
setup_schedules()
logging.info("Scheduler started. Running pending jobs...")
try:
while True:
schedule.run_pending() # 检查并执行所有待处理的任务
time.sleep(1) # 每秒检查一次
except KeyboardInterrupt:
# 捕获用户中断信号 (Ctrl+C)
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
logging.info("Scheduler interrupted by user.")
except Exception as e:
# 捕获其他未预期的异常
logging.error(f"Unexpected error: {e}", exc_info=True)
\ No newline at end of file
except Exception as e:
logging.error("Unexpected error: %s", e, exc_info=True)
if __name__ == "__main__":
main()
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论