提交 a1ec7d2d authored 作者: 彭甘宇's avatar 彭甘宇

实现模拟多用户同时登录网页功能,解决浏览器多驱动冲突问题。

上级 bb083bab
<changelist name="更改11" date="1746580098209" recycled="false">
<option name="PATH" value="$PROJECT_DIR$/.idea/shelf/更改11/shelved.patch" />
<option name="DESCRIPTION" value="更改" />
<binary>
<option name="BEFORE_PATH" value="统一平台/log/testresult.log.2" />
</binary>
</changelist>
\ No newline at end of file
import sys
import os
from time import sleep
# 获取当前脚本的绝对路径
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构建统一平台的绝对路径
platform_path = os.path.abspath(os.path.join(current_dir, '..','..','..'))
# 添加路径
sys.path.append(platform_path)
# 导入模块
try:
from 统一平台.base.bases import *
except ModuleNotFoundError as e:
print(f"ModuleNotFoundError: {e}")
print("尝试使用绝对路径导入")
from 统一平台.base.bases import *
# def suite_setup():
# STEP(1, "初始化浏览器")
# browser_init("统一平台")
# def suite_teardown():
# browser_quit()
\ No newline at end of file
from concurrent.futures import ThreadPoolExecutor
from 统一平台.base.bases import *
# 获取当前脚本的绝对路径
current_dir = os.path.dirname(os.path.abspath(__file__))
platform_path = os.path.abspath(os.path.join(current_dir, '..','..','..'))
sys.path.append(platform_path)
# 自定义的浏览器实例列表
_browser_instances = []
# ========== 可配置参数 ==========
LOOP_INTERVAL = 30 # 每隔多少秒执行一次
MAX_LOOP_COUNT = 10 # 最大执行次数(设为 None 表示无限循环)
# 封装单个浏览器的登录流程为一个函数
def login_task(task_id):
global _browser_instances
wd = None
try:
wd = browser_init('统一平台')
if wd:
_browser_instances.append(wd)
STEP(1, f'任务 {task_id} - 输入手机号')
safe_send_keys((By.XPATH, "//input[@placeholder='手机号/用户名/邮箱']"), "admin@pgy", wd)
STEP(2, f'任务 {task_id} - 输入密码')
safe_send_keys((By.XPATH, "//input[@placeholder='密码']"), "Ubains@1357", wd)
STEP(3, f'任务 {task_id} - 输入图形验证码')
safe_send_keys((By.XPATH, "//input[@placeholder='图形验证']"), "csba", wd)
STEP(4, f'任务 {task_id} - 点击同意协议')
safe_click((By.XPATH, "//span[@class='el-checkbox__inner']"), wd)
STEP(5, f'任务 {task_id} - 点击登录按钮')
safe_click((By.XPATH, "//div[@id='pane-1']//div//span[contains(text(),'登录')]"), wd)
finally:
# wd.quit() # 如果不关闭浏览器,可注释掉此行
pass
# 关闭打开的全部浏览器窗口
def close_all_browsers():
global _browser_instances
for wd in _browser_instances:
try:
if wd:
wd.quit()
except Exception as e:
print(f"关闭浏览器失败: {e}")
_browser_instances.clear()
print("所有浏览器已关闭")
class Unified_Platform_0001:
tags = ['登录平台']
def teststeps(self):
loop_count = 0
while MAX_LOOP_COUNT is None or loop_count < MAX_LOOP_COUNT:
print(f"\n【第 {loop_count + 1} 次循环开始】")
max_workers = 2
task_count = 2
with ThreadPoolExecutor(max_workers=max_workers) as executor:
executor.map(login_task, range(task_count))
# 手动关闭所有浏览器
# close_all_browsers()
loop_count += 1
if MAX_LOOP_COUNT is not None and loop_count >= MAX_LOOP_COUNT:
break
print(f"等待 {LOOP_INTERVAL} 秒后再次执行...")
time.sleep(LOOP_INTERVAL)
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论