import schedule import threading import queue from Base.base import * import time import logging """ 调试主机-执行指令: 1.打开一个终端输入: - cd .\预定系统\ - python -m http.server 80 --directory reports 2.打开新终端输入: - cd .\预定系统\ngrok\ngrok-调试主机\ - .\start.bat 3.再打开一个终端输入: - cd .\预定系统\ - python .\定时执行功能测试.py 自动化运行虚拟机-执行指令: 1.打开一个终端输入: - cd .\预定系统\ - python -m http.server 81 --directory reports 2.打开新终端输入: - cd .\预定系统\ngrok\ngrok-自动化运行虚拟机 - .\start.bat 3.再打开一个终端输入: - cd .\预定系统\ - python .\定时执行功能测试.py """ # 创建一个任务队列,用于存储待处理的任务 task_queue = queue.Queue() def run_task(task, *args, **kwargs): # 将任务及其参数放入任务队列 task_queue.put((task, args, kwargs)) logging.debug(f"任务已加入队列: {task.__name__} with args: {args} and kwargs: {kwargs}") def worker(): # 工作线程的主循环 while True: # 从任务队列中获取任务及其参数 task, args, kwargs = task_queue.get() try: # 记录任务开始执行的时间 logging.debug(f"开始执行任务: {task.__name__} with args: {args} and kwargs: {kwargs}") # 执行任务并获取结果 result = task(*args, **kwargs) # 如果任务有返回结果,记录日志 if result: logging.info(result) except Exception as e: # 捕获任务执行过程中发生的任何异常并记录错误日志 logging.error(f"执行任务时发生错误: {e}", exc_info=True) finally: # 无论任务是否成功执行,都标记任务已完成 task_queue.task_done() # 记录任务完成的时间 logging.debug(f"任务完成: {task.__name__}") def start_workers(num_workers): # 启动指定数量的工作线程 for _ in range(num_workers): # 创建一个新的工作线程,目标函数为 worker,设置为守护线程 threading.Thread(target=worker, daemon=True).start() # 启动3个工作线程 start_workers(3) # 定时执行预定系统测试任务1 # schedule.every().day.at("15:33").do(run_task, run_automation_test, report_title="预定系统测试报告", report_url_prefix="http://nat.ubainsyun.com:31133", test_case="预定系统功能", ding_type="标准版巡检") # 获取本机IP地址 # local_ip = get_local_ip() # logging.info(f"本机IP地址: {local_ip}") # # 更新ngrok.cfg文件 # ngrok_config_path = r'D:\GithubData\自动化\ubains-module-test\预定系统\ngrok\ngrok-调试主机\ngrok.cfg' # update_ngrok_config(ngrok_config_path, local_ip) # # # 启动ngrok # ngrok_path = r'D:\GithubData\自动化\ubains-module-test\预定系统\ngrok\ngrok-调试主机\ngrok.exe' # start_ngrok(ngrok_path, ngrok_config_path) # 定时执行展厅巡检任务 # schedule.every().day.at("07:45").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31133", test_case="展厅巡检", ding_type="展厅巡检") schedule.every().monday.at("07:45").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31133", test_case="展厅巡检", ding_type="展厅巡检") schedule.every().thursday.at("07:45").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31133", test_case="展厅巡检", ding_type="展厅巡检") schedule.every().wednesday.at("07:45").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31133", test_case="展厅巡检", ding_type="展厅巡检") schedule.every().tuesday.at("07:45").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31133", test_case="展厅巡检", ding_type="展厅巡检") schedule.every().friday.at("07:45").do(run_task, run_automation_test, report_title="展厅巡检测试报告", report_url_prefix="http://nat.ubainsyun.com:31133", test_case="展厅巡检", ding_type="展厅巡检") # 调试使用 # schedule.every().day.at("08:44").do(run_task, run_automation_test, report_title="展厅调试", report_url_prefix="http://nat.ubainsyun.com:31133", test_case="腾讯会议", ding_type="展厅巡检") try: # 无限循环,持续检查并执行计划任务 while True: schedule.run_pending() # 检查并执行所有待处理的任务 time.sleep(1) # 每秒检查一次 except KeyboardInterrupt: # 捕获用户中断信号 (Ctrl+C) logging.info("Scheduler interrupted by user.") except Exception as e: # 捕获其他未预期的异常 logging.error(f"Unexpected error: {e}", exc_info=True)