提交 6c258a0a authored 作者: 陈泽健's avatar 陈泽健

兰州中石化项目输出议题申报流程的JSON数据,调试自动化运行。

上级 7a9d4d6f
/预定系统/log
/预定系统/reports/
/预定系统/reports/imgs/
/预定系统/reports/imgs/
\ No newline at end of file
# 一、环境运行
- python3.10
- 需要手动安装库:pip install hytest
- 若切换电脑环境后需要输入指令: 将本地库导入到外部库中。
- # 找到 site-packages 路径
python -c "import site; print(site.getsitepackages())"
- # 复制你的库到其中
cp -r hytest /path/to/site-packages/
# 二、项目说明
- 模块以文件夹形式
......
from .common import signal,GSTORE,INFO,STEP,CHECK_POINT,LOG_IMG,SELENIUM_LOG_SCREEN
import os, sys
sys.path.append(os.getcwd())
\ No newline at end of file
supportedLang = ['zh','en']
class l:
LANGS = {
'zh' : 0,
'en' : 1,
}
n = None # 当前使用的语言编号
import sys
if '--lang' in sys.argv:
try:
idx = sys.argv.index('--lang')
lang = sys.argv[idx+1]
if lang in supportedLang:
l.n = l.LANGS[lang]
except:...
if l.n is None:
import locale
if 'zh_CN' in locale.getdefaultlocale():
l.n = l.LANGS['zh']
else :
l.n = l.LANGS['en']
LANG_TABLE = {
'测试报告' : ['测试报告','Test Report'],
'指定测试报告标题' : ['指定测试报告标题','set test report title'],
}
# 返回当前使用语言字符串
def ls(lookupStr):
return LANG_TABLE[lookupStr][l.n]
class Settings:
auto_open_report = True
report_title = '' # 命令行参数会设置,并且有缺省值
report_url_prefix = '' # 命令行参数会设置,并且有缺省值
\ No newline at end of file
from .utils.signal import signal
from .utils.runner import Runner
from datetime import datetime
from .cfg import l
class _GlobalStore:
def __getitem__(self, key, default=None):
if hasattr(self, key):
return getattr(self, key)
else:
return default
def __setitem__(self,key,value):
setattr(self, key, value )
get = __getitem__
# used for storing global shared data
GSTORE = _GlobalStore()
def INFO(info):
"""
print information in log and report.
This will not show in terminal window.
Parameters
----------
info : object to print
"""
signal.info(f'{info}')
def STEP(stepNo:int,desc:str):
"""
print information about test steps in log and report .
This will not show in terminal window.
Parameters
----------
stepNo : step number
desc : description about this step
"""
signal.step(stepNo,desc)
def CHECK_POINT(desc:str, condition, failStop=True, failLogScreenWebDriver = None):
"""
check point of testing.
pass or fail of this check point depends on argument condition is true or false.
it will print information about check point in log and report.
Parameters
----------
desc : check point description, like check what.
condition : usually it's a bool expression, like `a==b`,
so actually, after evaluating the expression, it's a result bool object passed in .
failStop : switch for whether continue this test cases when the condition is false
failLogScreenWebDriver : Selenium web driver object,
when you want a screenshot image of browser in test report if current check point fail.
"""
if condition:
signal.checkpoint_pass(desc)
else:
signal.checkpoint_fail(desc)
# 如果需要截屏
if failLogScreenWebDriver is not None:
SELENIUM_LOG_SCREEN(failLogScreenWebDriver)
# 记录下当前执行结果为失败
Runner.curRunningCase.execRet='fail'
Runner.curRunningCase.error=('检查点不通过','checkpoint failed')[l.n]
Runner.curRunningCase.stacktrace="\n"*3+('具体错误看测试步骤检查点','see checkpoint of case for details')[l.n]
# 如果失败停止,中止此测试用例
if failStop:
raise AssertionError()
def LOG_IMG(imgPath: str, width: str = None):
"""
add image in test report
Parameters
----------
imgPath: the path of image
width: display width of image in html, like 50% / 800px / 30em
"""
signal.log_img(imgPath, width)
def SELENIUM_LOG_SCREEN(driver, width: str = None, module_name: str = None, function_name: str = None, stepname: str = None):
"""
add screenshot image of browser into test report when using Selenium
在日志中加入selenium控制的 浏览器截屏图片
Parameters
----------
driver: selenium webdriver
width: display width of image in html, like 50% / 800px / 30em
module_name: 传入模块名称,截图会在log/imgs下创建对应目录进行存放。
function_name: 传入功能名称,截图文件放在指定目录下。
stepname: 传入步骤名称,截图以步骤名称+时间戳进行命名。
"""
# 当未指定模块名称、功能名称和步骤名称时的处理逻辑
if module_name == None and function_name == None and stepname == None:
# 使用当前时间作为文件名,确保唯一性
filename = datetime.now().strftime('%Y%m%d%H%M%S%f')
# 定义文件路径和相对于log的路径
filepath = f'log/imgs/{filename}.png'
filepath_relative_to_log = f'imgs/{filename}.png'
# 获取并保存截图
driver.get_screenshot_as_file(filepath)
# 使用signal记录截图到日志中,并指定宽度
signal.log_img(filepath_relative_to_log, width)
else:
# 当指定了模块名称、功能名称和步骤名称时的处理逻辑
filename = str(stepname)
print(filename)
# 定义文件路径和相对于log的路径,根据模块和功能名称创建目录
filepath = f'reports/imgs/{module_name}/{function_name}/{filename}.png'
filepath_relative_to_log = f'imgs/{module_name}/{function_name}/{filename}.png'
# 获取并保存截图
driver.get_screenshot_as_file(filepath)
# 使用signal记录截图到日志中,并指定宽度
signal.log_img(filepath_relative_to_log, width)
\ No newline at end of file
version= '0.8.12'
\ No newline at end of file
import re, os, traceback
from .cfg import l, Settings
import argparse
from .product import version
def tagExpressionGen(argstr):
tagRules = []
for part in argstr:
# 有单引号,是表达式
if "'" in part:
rule = re.sub(r"'.+?'", lambda m: f'tagmatch({m.group(0)})', part)
tagRules.append(f'({rule})')
# 是简单标签名
else:
rule = f"tagmatch('{part}')"
tagRules.append(f'{rule}')
return ' or '.join(tagRules)
def run():
parser = argparse.ArgumentParser()
parser.add_argument('--version', action='version', version=f'hytest v{version}',
help=("显示版本号", 'display hytest version')[l.n])
parser.add_argument('--lang', choices=['zh', 'en'],
help=("设置工具语言", 'set language')[l.n])
parser.add_argument('--new', metavar='project_dir',
help=("创建新项目目录", "create a project folder")[l.n])
parser.add_argument("case_dir", nargs='?', default='cases',
help=("用例根目录", "")[l.n])
parser.add_argument("--loglevel", metavar='Level_Number', type=int, default=3,
help=
("日志级别 0,1,2,3,4,5(数字越大,日志越详细)", "log level 0,1,2,3,4,5(bigger for more info)")[
l.n])
parser.add_argument('--auto_open_report', choices=['yes', 'no'], default='yes',
help=("测试结束不自动打开报告", "don't open report automatically after testing")[l.n])
parser.add_argument("--report_title", metavar='Report_Title',
default=['测试报告', 'Test Report'][l.n],
help=['指定测试报告标题', 'set test report title'][l.n])
parser.add_argument("--report_url_prefix", metavar='Url_Prefix',
default='',
help=['测试报告URL前缀', 'test report URL prefix'][l.n])
parser.add_argument("--test", metavar='Case_Name', action='append', default=[],
help=("用例名过滤,支持通配符", "filter by case name")[l.n])
parser.add_argument("--suite", metavar='Suite_Name', action='append', default=[],
help=("套件名过滤,支持通配符", "filter by suite name")[l.n])
parser.add_argument("--tag", metavar='Tag_Expression', action='append', default=[],
help=("标签名过滤,支持通配符", "filter by tag name")[l.n])
parser.add_argument("--tagnot", metavar='Tag_Expression', action='append', default=[],
help=("标签名反向过滤,支持通配符", "reverse filter by tag name")[l.n])
parser.add_argument("-A", "--argfile", metavar='Argument_File',
type=argparse.FileType('r', encoding='utf8'),
help=("使用参数文件", "use argument file")[l.n])
args = parser.parse_args()
# 有参数放在文件中,必须首先处理
if args.argfile:
fileArgs = [para for para in args.argfile.read().replace('\n', ' ').split() if para]
print(fileArgs)
args = parser.parse_args(fileArgs, args)
# 看命令行中是否设置了语言
if args.lang:
l.n = l.LANGS[args.lang]
# 报告标题
Settings.report_title = args.report_title
# 测试结束后,是否自动打开测试报告
Settings.auto_open_report = True if args.auto_open_report == 'yes' else False
# 测试结束后,要显示的测试报告的url前缀,比如: run.bat --report_url_prefix http://127.0.0.1
# 可以本机启动http服务,比如:python -m http.server 80 --directory log
# 方便 jenkins上查看
Settings.report_url_prefix = args.report_url_prefix
# 创建项目目录
if args.new:
projDir = args.new
if os.path.exists(projDir):
print(f'{projDir} already exists!')
exit(2)
os.makedirs(f'{projDir}/cases')
with open(f'{projDir}/cases/case1.py', 'w', encoding='utf8') as f:
caseContent = [
'''class c1:
name = '用例名称 - 0001'
# 测试用例步骤
def teststeps(self):
ret = 1
''',
'''class c1:
name = 'test case name - 0001'
# test case steps
def teststeps(self):...''',
][l.n]
f.write(caseContent)
exit()
if not os.path.exists(args.case_dir):
print(
f' {args.case_dir} {("目录不存在,工作目录为:", "folder not exists, workding dir is:")[l.n]} {os.getcwd()}')
exit(2) # '2' stands for no test cases to run
if not os.path.isdir(args.case_dir):
print(f' {args.case_dir} {("不是目录,工作目录为:", "is not a folder, workding dir is:")[l.n]} {os.getcwd()}')
exit(2) # '2' stands for no test cases to run
# 同时执行log里面的初始化日志模块,注册signal的代码
from .utils.log import LogLevel
from .utils.runner import Collector, Runner
LogLevel.level = args.loglevel
# print('loglevel',LogLevel.level)
# --tag "'冒烟测试' and 'UITest' or (not '快速' and 'fast')" --tag 白月 --tag 黑羽
tag_include_expr = tagExpressionGen(args.tag)
tag_exclude_expr = tagExpressionGen(args.tagnot)
# print(tag_include_expr)
# print(tag_exclude_expr)
print(f'''
* * * * * * * * * * * * * * * * * *
* hytest {version} www.byhy.net *
* * * * * * * * * * * * * * * * * *
'''
)
os.makedirs('log/imgs', exist_ok=True)
try:
Collector.run(
casedir=args.case_dir,
suitename_filters=args.suite,
casename_filters=args.test,
tag_include_expr=tag_include_expr,
tag_exclude_expr=tag_exclude_expr,
)
except:
print(traceback.format_exc())
print(('\n\n!! 搜集用例时发现代码错误,异常终止 !!\n\n',
'\n\n!! Collect Test Cases Exception Aborted !!\n\n')[l.n])
exit(3)
# 0 表示执行成功 , 1 表示有错误 , 2 表示没有可以执行的用例
result = Runner.run()
# keep 10 report files at most
ReportFileNumber = 10
import glob
reportFiles = glob.glob('./log/report_*.html')
fileNum = len(reportFiles)
if fileNum >= ReportFileNumber:
reportFiles.sort()
for rf in reportFiles[:fileNum - ReportFileNumber]:
try:
os.remove(rf)
except:
...
return result
if __name__ == '__main__':
exit(run())
\ No newline at end of file
import logging, os, time, traceback, platform
import shutil
from logging.handlers import RotatingFileHandler
from rich.console import Console
from rich.theme import Theme
from hytest.product import version
from datetime import datetime
from hytest.common import GSTORE
from .runner import Collector
from ..cfg import l, Settings
os.makedirs('log', exist_ok=True)
# 日志文件
logger = logging.getLogger('my_logger')
logger.setLevel(logging.DEBUG)
logFile = os.path.join('log', 'testresult.log')
handler = RotatingFileHandler(
logFile,
maxBytes=1024 * 1024 * 30,
backupCount=2,
encoding='utf8')
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(fmt='%(message)s')
handler.setFormatter(formatter)
handler.doRollover() # 每次启动创建一个新log文件,而不是从原来的基础上继续添加
logger.addHandler(handler)
percent = ''
# # 重定向stdout,改变print行为,同时写屏和日志
# import sys
# class MyPrintClass:
# def __init__(self):
# self.console = sys.stdout
# def write(self, message):
# self.console.write(message)
# logger.info(message)
# def flush(self):
# self.console.flush()
# # self.file.flush()
# sys.stdout = MyPrintClass()
console = Console(theme=Theme(inherit=False))
print = console.print
class LogLevel:
"""
here, we use different log level numbers with Python logging lib
CRITICAL - 0
ERROR - 1
WARNING - 2
INFO - 3
DEBUG - 4
ALL - 5
"""
level = 3
class Stats:
def test_start(self, _title='Test Report'):
self.result = {
# 这是准备执行的用例数量
'case_count_to_run': Collector.case_number,
# 这个是实际执行的用例数量,可能有其他的用例因为初始化失败没有执行
'case_count': 0,
'case_pass': 0,
'case_fail': 0,
'case_abort': 0,
'suite_setup_fail': 0,
'case_setup_fail': 0,
'suite_teardown_fail': 0,
'case_teardown_fail': 0,
'case_pass_list': [],
'case_fail_list': [],
'case_abort_list': [],
}
self.start_time = time.time()
def test_end(self, runner):
self.end_time = time.time()
self.test_duration = self.end_time - self.start_time
if self.result['case_fail'] or \
self.result['case_abort'] or \
self.result['suite_setup_fail'] or \
self.result['case_setup_fail'] or \
self.result['suite_teardown_fail'] or \
self.result['case_teardown_fail']:
GSTORE['---ret---'] = 1
else:
GSTORE['---ret---'] = 0
def enter_case(self, caseId, name, case_className):
self.result['case_count'] += 1
def case_result(self, case):
if case.execRet == 'pass':
self.result['case_pass'] += 1
elif case.execRet == 'fail':
self.result['case_fail'] += 1
elif case.execRet == 'abort':
self.result['case_abort'] += 1
# utype 可能是 suite case case_default
def setup_fail(self, name, utype, e, stacktrace):
if utype == 'suite':
self.result['suite_setup_fail'] += 1
else:
self.result['case_setup_fail'] += 1
def teardown_fail(self, name, utype, e, stacktrace):
if utype == 'suite':
self.result['suite_teardown_fail'] += 1
else:
self.result['case_teardown_fail'] += 1
stats = Stats()
class ConsoleLogger:
def test_end(self, runner):
ret = stats.result
print((f'\n\n ========= 测试耗时 : {stats.test_duration:.3f} 秒 =========\n',
f'\n\n ========= Duration Of Testing : {stats.test_duration:.3f} seconds =========\n')[l.n])
print(f"\n {('预备执行用例数量', 'number of cases plan to run')[l.n]} : {ret['case_count_to_run']}")
print(f"\n {('实际执行用例数量', 'number of cases actually run')[l.n]} : {ret['case_count']}")
print(f"\n {('通过', 'passed')[l.n]} : {ret['case_pass']}", style='green')
num = ret['case_fail']
style = 'white' if num == 0 else 'bright_red'
print(f"\n {('失败', 'failed')[l.n]} : {num}", style=style)
num = ret['case_abort']
style = 'white' if num == 0 else 'bright_red'
print(f"\n {('异常', 'exception aborted')[l.n]} : {num}", style=style)
num = ret['suite_setup_fail']
style = 'white' if num == 0 else 'bright_red'
print(f"\n {('套件初始化失败', 'suite setup failed')[l.n]} : {num}", style=style)
num = ret['suite_teardown_fail']
style = 'white' if num == 0 else 'bright_red'
print(f"\n {('套件清除 失败', 'suite teardown failed')[l.n]} : {num}", style=style)
num = ret['case_setup_fail']
style = 'white' if num == 0 else 'bright_red'
print(f"\n {('用例初始化失败', 'cases setup failed')[l.n]} : {num}", style=style)
num = ret['case_teardown_fail']
style = 'white' if num == 0 else 'bright_red'
print(f"\n {('用例清除 失败', 'cases teardown failed')[l.n]} : {num}", style=style)
print("\n\n")
def enter_suite(self, name, suitetype):
if suitetype == 'file':
print(f'\n\n>>> {name}', style='bold bright_white')
def enter_case(self, caseId, name, case_className):
print(f'\n* {name}', style='bright_white')
def case_steps(self, name):
...
# def case_pass(self, case, caseId, name):
# print(' PASS',style='green')
# def case_fail(self, case, caseId, name, e, stacktrace):
# print(f' FAIL\n{e}',style='bright_red')
# def case_abort(self, case, caseId, name, e, stacktrace):
# print(f' ABORT\n{e}',style='magenta')
def case_result(self, case):
if case.execRet == 'pass':
print(' PASS', style='green')
elif case.execRet == 'fail':
print(f' FAIL\n{case.error}', style='bright_red')
elif case.execRet == 'abort':
print(f' ABORT\n{case.error}', style='magenta')
def setup(self, name, utype):
...
def teardown(self, name, utype):
...
# utype 可能是 suite case case_default
def setup_fail(self, name, utype, e, stacktrace):
utype = ('套件', 'suite')[l.n] if utype == 'suite' else ('用例', 'case')[l.n]
print(f"\n{utype} {('初始化失败', 'setup failed')[l.n]} | {name} | {e}", style='bright_red')
# print(f'\n{utype} setup fail | {name} | {e}',style='bright_red')
def teardown_fail(self, name, utype, e, stacktrace):
utype = ('套件', 'suite')[l.n] if utype == 'suite' else ('用例', 'case')[l.n]
print(f"\n{utype} {('清除失败', 'teardown failed')[l.n]} | {name} | {e}", style='bright_red')
# print(f'\n{utype} teardown fail | {name} | {e}',style='bright_red')
def info(self, msg):
if LogLevel.level >= 3:
print(f'{msg}')
def debug(self, msg):
if LogLevel.level >= 4:
print(f'{msg}')
def error(self, msg):
if LogLevel.level >= 1:
print(f'{msg}', style='bright_red')
def critical(self, msg):
if LogLevel.level >= 0:
print(f'{msg}', style='green')
class TextLogger:
def test_start(self, _title=''):
startTime = time.strftime('%Y%m%d_%H%M%S',
time.localtime(stats.start_time))
logger.info(f'\n\n ========= {("测试开始", "Test Start")[l.n]} : {startTime} =========\n')
def test_end(self, runner):
endTime = time.strftime('%Y%m%d_%H%M%S',
time.localtime(stats.end_time))
logger.info(f'\n\n ========= {("测试结束", "Test End")[l.n]} : {endTime} =========\n')
logger.info(f"\n {('耗时', 'Duration Of Testing ')[l.n]} : {(stats.end_time - stats.start_time):.3f} 秒\n")
ret = stats.result
logger.info(f"\n {('预备执行用例数量', 'number of cases plan to run')[l.n]} : {ret['case_count_to_run']}")
logger.info(f"\n {('实际执行用例数量', 'number of cases actually run')[l.n]} : {ret['case_count']}")
logger.info(f"\n {('通过', 'passed')[l.n]} : {ret['case_pass']}")
logger.info(f"\n {('失败', 'failed')[l.n]} : {ret['case_fail']}")
logger.info(f"\n {('异常', 'exception aborted')[l.n]} : {ret['case_abort']}")
logger.info(f"\n {('套件初始化失败', 'suite setup failed')[l.n]} : {ret['suite_setup_fail']}")
logger.info(f"\n {('套件清除 失败', 'suite teardown failed')[l.n]} : {ret['suite_teardown_fail']}")
logger.info(f"\n {('用例初始化失败', 'cases setup failed')[l.n]} : {ret['case_setup_fail']}")
logger.info(f"\n {('用例清除 失败', 'cases teardown failed')[l.n]} : {ret['case_teardown_fail']}")
def enter_suite(self, name, suitetype):
logger.info(f'\n\n>>> {name}')
def enter_case(self, caseId, name, case_className):
curTime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
logger.info(f'\n* {name} - {curTime}')
def case_steps(self, name):
logger.info(f'\n [ case execution steps ]')
# def case_pass(self, case, caseId, name):
# logger.info(' PASS ')
# def case_fail(self, case, caseId, name, e, stacktrace):
# stacktrace = "Traceback:\n" +stacktrace.split("\n",3)[3]
# logger.info(f' FAIL {e} \n{stacktrace}')
# def case_abort(self, case, caseId, name, e, stacktrace):
# stacktrace = "Traceback:\n" +stacktrace.split("\n",3)[3]
# logger.info(f' ABORT {e} \n{stacktrace}')
def case_result(self, case):
if case.execRet == 'pass':
logger.info(' PASS ')
else:
stacktrace = "Traceback:\n" + case.stacktrace.split("\n", 3)[3]
if case.execRet == 'fail':
logger.info(f' FAIL {case.error} \n{stacktrace}')
elif case.execRet == 'abort':
logger.info(f' ABORT {case.error} \n{stacktrace}')
def setup(self, name, utype):
logger.info(f'\n[ {utype} setup ] {name}')
def teardown(self, name, utype):
logger.info(f'\n[ {utype} teardown ] {name}')
def setup_fail(self, name, utype, e, stacktrace):
stacktrace = "Traceback:\n" + stacktrace.split("\n", 3)[3]
logger.info(f'{utype} setup fail | {e} \n{stacktrace}')
def teardown_fail(self, name, utype, e, stacktrace):
stacktrace = "Traceback:\n" + stacktrace.split("\n", 3)[3]
logger.info(f'{utype} teardown fail | {e} \n{stacktrace}')
def info(self, msg):
if LogLevel.level >= 3:
logger.info(f'{msg}')
def debug(self, msg):
if LogLevel.level >= 4:
logger.info(f'{msg}')
def error(self, msg):
if LogLevel.level >= 1:
logger.info(f'{msg}')
def critical(self, msg):
if LogLevel.level >= 0:
logger.info(f'{msg}')
def step(self, stepNo, desc):
logger.info((f'\n-- 第 {stepNo} 步 -- {desc} \n',
f'\n-- Step #{stepNo} -- {desc} \n',
)[l.n])
def checkpoint_pass(self, desc):
logger.info((f'\n** 检查点 ** {desc} ----> 通过\n',
f'\n** checkpoint ** {desc} ----> pass\n'
)[l.n])
def checkpoint_fail(self, desc):
logger.info((f'\n** 检查点 ** {desc} ----> !! 不通过!!\n',
f'\n** checkpoint ** {desc} ----> !! fail!!\n'
)[l.n])
def log_img(self, imgPath: str, width: str = None):
logger.info(f'picture {imgPath}')
from dominate.tags import *
from dominate.util import raw
from dominate import document
class HtmlLogger:
def __init__(self):
self.curEle = None
def test_start(self, _title=''):
# css file
with open(os.path.join(os.path.dirname(__file__), 'report.css'), encoding='utf8') as f:
_css_style = f.read()
# js file
with open(os.path.join(os.path.dirname(__file__), 'report.js'), encoding='utf8') as f:
_js = f.read()
self.doc = document(title=Settings.report_title)
self.doc.head.add(
meta(charset="UTF-8"),
style(raw(_css_style)),
script(raw(_js), type='text/javascript'))
# 添加模态框结构
self.doc.body.add(
div(
div(
img(id="img01", _class="modal-content"),
span("关闭", _class="close")
),
id="imageModal", _class="modal")
)
self.main = self.doc.body.add(div(_class='main_section'))
self.main.add(h1(f'{Settings.report_title}', style='font-family: auto'))
self.main.add(h3(('统计结果', 'Test Statistics')[l.n]))
resultDiv = self.main.add(div(_class='result'))
self.result_table, self.result_barchart = resultDiv.add(
table(_class='result_table'),
div(_class='result_barchart')
)
_, self.logDiv = self.main.add(
div(
# span('切换到精简模式',_class='h3_button', id='display_mode' ,onclick="toggle_folder_all_cases()"),
h3(('执行日志', 'Test Execution Log')[l.n], style='display:inline'),
style='margin-top:2em'
),
div(_class='exec_log')
)
# 查看上一个和下一个错误的
self.ev = div(
div('∧', _class='menu-item', onclick="previous_error()", title='上一个错误'),
div('∨', _class='menu-item', onclick="next_error()", title='下一个错误'),
_class='error_jumper'
)
helpLink = ("http://www.byhy.net/tut/auto/hytest/01", 'https://github.com/jcyrss/hytest/Documentation.md')[l.n]
self.doc.body.add(div(
div(('页首', 'Home')[l.n], _class='menu-item',
onclick='document.querySelector("body").scrollIntoView()'),
div(('帮助', 'Help')[l.n], _class='menu-item',
onclick=f'window.open("{helpLink}", "_blank"); '),
div(('Summary', 'Summary')[l.n], _class='menu-item', id='display_mode',
onclick="toggle_folder_all_cases()"),
self.ev,
id='float_menu')
)
self.curEle = self.main # 记录当前所在的 html element
self.curSuiteEle = None # 记录当前的套件元素
self.curCaseEle = None # 记录当前的用例元素
self.curCaseLableEle = None # 记录当前的用例里面的 种类标题元素
self.curSetupEle = None # 记录当前的初始化元素
self.curTeardownEle = None # 记录当前的清除元素
self.suitepath2element = {}
def test_end(self, runner):
execStartTime = time.strftime('%Y/%m/%d %H:%M:%S',
time.localtime(stats.start_time))
execEndTime = time.strftime('%Y/%m/%d %H:%M:%S',
time.localtime(stats.end_time))
ret = stats.result
errorNum = 0
trs = []
trs.append(tr(td(('hytest 版本', 'hytest version')[l.n]), td(version)))
trs.append(tr(td(('开始时间', 'Test Start Time')[l.n]), td(f'{execStartTime}')))
trs.append(tr(td(('结束时间', 'Test End Time')[l.n]), td(f'{execEndTime}')))
trs.append(
tr(td(('耗时', 'Duration Of Testing')[l.n]), td(f'{stats.test_duration:.3f}' + (' 秒', ' Seconds')[l.n])))
trs.append(tr(td(('预备执行用例数量', 'number of cases plan to run')[l.n]), td(f"{ret['case_count_to_run']}")))
trs.append(tr(td(('实际执用例行数量', 'number of cases actually run')[l.n]), td(f"{ret['case_count']}")))
trs.append(tr(td(('通过', 'passed')[l.n]), td(f"{ret['case_pass']}")))
case_count_to_run = ret['case_count_to_run']
num = ret['case_fail']
style = '' if num == 0 else 'color:red'
trs.append(tr(td(('失败', 'failed')[l.n]), td(f"{num}", style=style)))
errorNum += num
num = ret['case_abort']
style = '' if num == 0 else 'color:red'
trs.append(tr(td(('异常', 'exception aborted')[l.n]), td(f"{num}", style=style)))
errorNum += num
# 计算阻塞用例个数
blocked_num = case_count_to_run - ret['case_pass'] - ret['case_fail'] - ret['case_abort']
style = '' if blocked_num == 0 else 'color:red'
trs.append(tr(td(('阻塞', 'blocked')[l.n]), td(f"{blocked_num}", style=style)))
num = ret['suite_setup_fail']
style = '' if num == 0 else 'color:red'
trs.append(tr(td(('套件初始化失败', 'suite setup failed')[l.n]), td(f"{num}", style=style)))
errorNum += num
num = ret['suite_teardown_fail']
style = '' if num == 0 else 'color:red'
trs.append(tr(td(('套件清除 失败', 'suite teardown failed')[l.n]), td(f"{num}", style=style)))
errorNum += num
num = ret['case_setup_fail']
style = '' if num == 0 else 'color:red'
trs.append(tr(td(('用例初始化失败', 'cases setup failed')[l.n]), td(f"{num}", style=style)))
errorNum += num
num = ret['case_teardown_fail']
style = '' if num == 0 else 'color:red'
trs.append(tr(td(('用例清除 失败', 'cases teardown failed')[l.n]), td(f"{num}", style=style)))
errorNum += num
self.ev['display'] = 'none' if errorNum == 0 else 'block'
# 添加结果统计表
self.result_table.add(tbody(*trs))
# 添加 结果柱状图
def add_barchar_item(statName, percent, color):
if type(percent) == str:
barPercentStr = percent
percentStr = '-'
else:
# 小于 1% 的, 都显示 1% 长度,否则就看不见了
barPercent = 1 if 0 < percent <= 1 else percent
barPercentStr = f'{barPercent}%'
percentStr = f'{percent}%'
self.result_barchart.add(
div(
span(statName),
div(
div(
"", # 柱状里面不填写内容了,如果值为1.86%,背景色部分太短,由于颜色是白色,溢出到右边的空白背景,看不清
style=f'width: {barPercentStr}; background-color: {color};',
_class="barchart_bar",
),
_class="barchart_barbox"
),
_class="barchar_item"
)
)
# add_barchar_item(
# f"用例总数 : {ret['case_count']} 个",
# 100,
# '#2196f3')
def percentCalc(upper, lower):
percent = str(round(upper * 100 / lower, 1))
percent = percent[:-2] if percent.endswith('.0') else percent
return percent
percent = percentCalc(ret['case_pass'], case_count_to_run)
# 将生成的变量存储到全局变量中
GSTORE['case_pass'] = percent
print(f"这是我获取到的值:{GSTORE['case_pass']}")
add_barchar_item(
f"{('用例通过', 'cases passed')[l.n]} {percent}% : {ret['case_pass']} {('个', '')[l.n]}",
float(percent),
'#04AA6D')
percent = percentCalc(ret['case_fail'], case_count_to_run)
add_barchar_item(
f"{('用例失败', 'cases failed')[l.n]} {percent}% : {ret['case_fail']} {('个', '')[l.n]}",
float(percent),
'#bb4069')
percent = percentCalc(ret['case_abort'], case_count_to_run)
add_barchar_item(
f"{('用例异常', 'cases exception aborted')[l.n]} {percent}% : {ret['case_abort']} {('个', '')[l.n]}",
float(percent),
'#9c27b0')
percent = percentCalc(blocked_num, case_count_to_run)
add_barchar_item(
f"{('用例阻塞', 'cases blocked')[l.n]} {percent}% : {blocked_num} {('个', '')[l.n]}",
float(percent),
'#dcbdbd')
# st_fail = ret['suite_setup_fail'] + ret['case_setup_fail'] + ret['suite_teardown_fail'] + ret['case_teardown_fail']
# percent = '100%' if st_fail > 0 else '0%'
# add_barchar_item(
# f"初始化/清除 失败 {st_fail} 次",
# percent,
# '#dcbdbd')
# 产生文件
htmlcontent = self.doc.render()
timestamp = time.strftime('%Y%m%d_%H%M%S', time.localtime(stats.start_time))
fileName = f'report_{timestamp}.html'
reportPath = os.path.join('log', fileName)
with open(reportPath, 'w', encoding='utf8') as f:
f.write(htmlcontent)
if Settings.auto_open_report:
try:
my_os = platform.system().lower()
if my_os == 'windows':
os.startfile(reportPath)
elif my_os == 'darwin': # macOS
os.system(f'open {reportPath}')
except:
print(traceback.format_exc())
# with command line parameter report_url_prefix
# need to copy report from dir 'log' to 'reports'
if Settings.report_url_prefix:
os.makedirs('reports', exist_ok=True)
cpTargetPath = os.path.join('reports', fileName)
shutil.copyfile(reportPath, cpTargetPath)
o1 = ('测试报告', 'test report')[l.n]
print(f"{o1} : {Settings.report_url_prefix}/{fileName} \n")
# def _findParentSuite(self,name):
# if name.endswith(os.path.sep):
# name = name[:-1]
# parentpath = os.path.dirname(name)
# # name 对应的 是用例根目录,
# if parentpath == '':
# self._addSuiteDir(self.body,name)
# return None
# # rug
# if parentpath not in self.suitepath2element:
# dirToCreate = []
# levels = parentpath.split(os.sep)
# cur = ''
# for level in levels:
# cur = os.path.join(cur,level)
print(f"这是我抓的全局变量值:{GSTORE['case_pass']}")
def enter_suite(self, name: str, suitetype):
_class = 'suite_' + suitetype
enterInfo = ('进入目录', 'Enter Folder')[l.n] if suitetype == 'dir' else ('进入文件', 'Enter File')[l.n]
self.curEle = self.logDiv.add(
div(
div(
span(enterInfo, _class='label'),
span(name)
),
_class=_class, id=f'{_class} {name}'
)
)
self.curSuiteEle = self.curEle
self.curSuiteFilePath = name
self.suitepath2element[name] = self.curEle
def enter_case(self, caseId, name, case_className):
# 执行有结果后,要修改这个 self.curCaseLableEle ,比如 加上 PASS
self.curCaseLableEle = span(('用例', 'Case')[l.n], _class='label caselabel')
# folder_body 是折叠区 内容部分,可以隐藏
self.curCaseBodyEle = div(
span(f'{self.curSuiteFilePath}::{case_className}', _class='case_class_path'),
_class='folder_body')
self.curCaseEle = self.curSuiteEle.add(
div(
div(
self.curCaseLableEle,
span(name, _class='casename'),
span(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), _class='executetime'),
_class='folder_header'
),
self.curCaseBodyEle,
_class='case', id=f'case_{caseId:08}'
)
)
self.curEle = self.curCaseBodyEle
def case_steps(self, name):
ele = div(
span(('测试步骤', 'Test Steps')[l.n], _class='label'),
_class='test_steps', id='test_steps ' + name)
self.curEle = self.curCaseBodyEle.add(ele)
# def case_pass(self, case, caseId, name):
# self.curCaseEle['class'] += ' pass'
# self.curCaseLableEle += ' PASS'
# def case_fail(self, case, caseId, name, e, stacktrace):
# self.curCaseEle['class'] += ' fail'
# self.curCaseLableEle += ' FAIL'
# stacktrace = "Traceback:\n" +stacktrace.split("\n",3)[3]
# self.curEle += div(f'{e} \n{stacktrace}', _class='info error-info')
# def case_abort(self, case, caseId, name, e, stacktrace):
# self.curCaseEle['class'] += ' abort'
# self.curCaseLableEle += ' ABORT'
# stacktrace = "Traceback:\n" +stacktrace.split("\n",3)[3]
# self.curEle += div(f'{e} \n{stacktrace}', _class='info error-info')
def case_result(self, case):
if case.execRet == 'pass':
self.curCaseEle['class'] += ' pass'
self.curCaseLableEle += ' PASS'
else:
# Traceback 前3行信息多余, 不要
stacktrace = "Traceback:\n" + case.stacktrace.split("\n", 3)[3]
if case.execRet == 'fail':
# 如果 Traceback 后3行信息固定的是 common.py 里面的 AssertionError ,也多余, 不要
if ', in CHECK_POINT' in stacktrace:
stacktrace = stacktrace.rsplit("\n", 4)[0]
self.curCaseEle['class'] += ' fail'
self.curCaseLableEle += ' FAIL'
self.curEle += div(f'{case.error} \n{stacktrace}', _class='info error-info')
elif case.execRet == 'abort':
self.curCaseEle['class'] += ' abort'
self.curCaseLableEle += ' ABORT'
self.curEle += div(f'{case.error} \n{stacktrace}', _class='info error-info')
# utype 可能是 suite case case_default
def setup(self, name, utype):
_class = f'{utype}_setup setup'
# 套件 setup
if utype == 'suite':
# folder_body 是折叠区 内容部分,可以隐藏
stHeaderEle = div(
span(('套件初始化', 'Suite Setup')[l.n], _class='label'),
span(name),
span(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), _class='executetime'),
_class='folder_header')
stBodyEle = self.curEle = div(_class='folder_body')
self.curSetupEle = div(
stHeaderEle,
stBodyEle,
_class=_class,
id=f'{_class} {name}')
self.curSuiteEle.add(self.curSetupEle)
# 用例 setup
else:
self.curSetupEle = self.curEle = div(
span(('用例初始化', 'Case Setup')[l.n], _class='label'),
_class=_class,
id=f'{_class} {name}')
self.curCaseBodyEle.add(self.curSetupEle)
self.curEle['class'] += ' case_st_lable'
# utype 可能是 suite case case_default
def teardown(self, name, utype):
_class = f'{utype}_teardown teardown'
# 套件 teardown
if utype == 'suite':
# folder_body 是折叠区 内容部分,可以隐藏
stHeaderEle = div(
span(('套件清除', 'Suite Teardown')[l.n], _class='label'),
span(name),
span(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), _class='executetime'),
_class='folder_header')
stBodyEle = self.curEle = div(_class='folder_body')
self.curTeardownEle = div(
stHeaderEle,
stBodyEle,
_class=_class,
id=f'{_class} {name}')
self.curSuiteEle.add(self.curTeardownEle)
# 用例 teardown
else:
self.curTeardownEle = self.curEle = div(
span(('用例清除', 'Case Teardown')[l.n], _class='label'),
_class=_class,
id=f'{_class} {name}')
self.curCaseBodyEle.add(self.curTeardownEle)
self.curEle['class'] += ' case_st_lable'
def setup_fail(self, name, utype, e, stacktrace):
self.curSetupEle['class'] += ' fail'
stacktrace = "Traceback:\n" + stacktrace.split("\n", 3)[3]
self.curEle += div(f'{utype} setup fail | {e} \n{stacktrace}', _class='info error-info')
def teardown_fail(self, name, utype, e, stacktrace):
self.curTeardownEle['class'] += ' fail'
stacktrace = "Traceback:\n" + stacktrace.split("\n", 3)[3]
self.curEle += div(f'{utype} teardown fail | {e} \n{stacktrace}', _class='info error-info')
def info(self, msg):
msg = f'{msg}'
if self.curEle is None:
return
self.curEle += div(msg, _class='info')
def step(self, stepNo, desc):
if self.curEle is None:
return
self.curEle += div(span(f'{("步骤", "Step")[l.n]} #{stepNo}', _class='tag'), span(desc), _class='case_step')
def checkpoint_pass(self, desc):
if self.curEle is None:
return
self.curEle += div(span(f'{("检查点", "CheckPoint")[l.n]} PASS', _class='tag'), span(desc),
_class='checkpoint_pass')
def checkpoint_fail(self, desc):
if self.curEle is None:
return
self.curEle += div(span(f'{("检查点", "CheckPoint")[l.n]} FAIL', _class='tag'), span(desc),
_class='checkpoint_fail')
def log_img(self, imgPath: str, width: str = None):
if self.curEle is None:
return
self.curEle += div(img(src=imgPath, width='aa' if width is None else width, _class='modal-image screenshot'))
from .signal import signal
signal.register([
stats,
ConsoleLogger(),
TextLogger(),
HtmlLogger()])
body {
font-family: consolas, Verdana, sans-serif;
font-size: 1.2em;
color: #696e71;
display: grid;
grid-template-columns: 1fr 5rem;
}
.main_section {
width: 90%;
margin: 0 auto;
}
#float_menu{
position:fixed;
top:0;
right:0;
text-align: center;
}
#float_menu .menu-item {
cursor: pointer;
padding: .5em;
margin: .5em 0;
color: #c08580;
background-color: #f8f0ef;
font-size: 1.2em;
}
.result{
display: flex;
}
.result_table{
border-collapse: collapse;
border: 1px solid #f0e0e5;
width: 30em;
text-align: center;
font-size: 1.0em;
}
.result_table td{
border: 1px solid #f0e0e5;
padding: .3em;
}
.result_barchart{
width: 30em;
margin: 0 5em 0 5em;
}
.barchar_item{
margin: 2.5rem 0;
}
.barchart_barbox {
margin: 0.5em 0;
width: 100%;
background-color: #fff;
border: 1px solid #86c2dd;
border-radius: .2em;
}
.barchart_bar {
text-align: right;
height: 1.2rem;
}
.h3_button {
margin: 1.5em;
cursor: pointer;
color: #03a9f4;
}
.info
{
white-space:pre-wrap;
margin: .8em 1.5em;
}
.error-info
{
color: #a64747
}
.suite_dir {
margin: 1em .2em;
padding: .3em;
/* background-color: #dfeff6; */
border: 1px solid #bcd8e4;
}
.suite_file {
margin: 1em .2em;
padding: .3em;
border: 1px solid #bcd8e4;
}
.case {
margin: 1em .2em;
/* padding: .3em; */
border: 1px solid #e7d4d4;
}
.case_class_path{
margin: 0em 1em;
}
.folder_header {
padding: .2em .7em;
background-color: #fffaf9;
cursor: pointer;
}
.setup{
margin: .2em;
/* padding: .3em; */
/* border: 1px solid #e7d4d4; */
}
.teardown{
margin: .2em;
/* padding: .3em;*/
/* border: 1px solid #e7d4d4; */
}
.test_steps{
margin: .2em;
padding: .3em;
/* border: 1px solid #e7d4d4; */
}
.label {
display: inline-block;
padding: .1em .5em;
font-size: .88em;
letter-spacing: 1px;
white-space: nowrap;
color: #0d6ebc;
border-radius: .2em;
min-width: 5em;
margin-right: 2em;
font-family: consolas;
}
/* .suite_setup .label{
color: #219e26 ;
}
.suite_teardown .label{
color: #219e26;
} */
/* .case.pass .casename{
color: #329132 ;
} */
.case.pass .caselabel{
color: white;
background-color: #3b9e3f;
}
/* .case.fail .casename{
color: #a64747;
} */
.case.fail .caselabel{
color: white;
background-color: #a64747;
}
/* .case.abort .casename{
color: #953ab7;
} */
.case.abort .caselabel{
color: white;
background-color: #9c27b0;
}
.case_step {
margin: .8em;
}
.checkpoint_pass {
margin: .8em;
}
.checkpoint_fail {
margin: .8em;
}
.case_step .tag{
color: #2196f3;;
margin: .3em 1em .3em 0;
padding: .1em .3em;
font-size: .92em;
}
.checkpoint_pass .tag{
color: #009806;
margin:.3em 1em .3em .5em;
padding: .1em .3em;
font-size: .92em;
}
.checkpoint_fail .tag{
color: #9c2020;
margin:.3em 1em .3em .5em;
padding: .1em .3em;
font-size: .92em;
}
.screenshot {
border: 1px solid #86c2dd;
}
.executetime {
float: right;
}
/* 模态框内容 */
.modal-content {
margin: auto;
display: block;
width: 95%;
max-width: 700px;
max-height: 80vh; /* 设置最大高度为视口高度的80% */
object-fit: contain; /* 保持图片的宽高比 */
zoom: 3;
}
/* 模态框 */
.modal {
display: none; /* 隐藏 */
position: fixed; /* 固定位置 */
z-index: 1; /* 坐在顶部 */
padding-top: 40px; /* 在图片上方添加一些内边距 */
left: 0;
top: 0;
width: 100%; /* 宽度 */
height: 100%; /* 高度 */
overflow: auto; /* 启用滚动 */
background-color: rgb(0,0,0); /* 背景颜色 */
background-color: rgba(0,0,0,0.9); /* 黑色背景半透明 */
}
/* 关闭按钮 */
.close {
position: absolute; /* 定义元素的定位方式为绝对定位 */
top: 10px; /* 距离最近的已定位祖先元素顶部15像素 */
right: 30px; /* 距离最近的已定位祖先元素右侧35像素 */
color: #f1f1f1; /* 文本颜色为浅灰色 */
font-size: 15px; /* 字体大小为40像素 */
font-weight: bold; /* 字体加粗 */
transition: 0.3s; /* 过渡效果,0.3秒内完成 */
}
.close:hover,
.close:focus {
color: #bbb;
text-decoration: none;
cursor: pointer;
}
var FOLDER_ALL_CASES = false; // 是否为精简模式的标记
var ERROR_INFOS = []; // 错误信息列表
var current_error_idx = -1;
// 页面加载后执行的函数
window.addEventListener("load", function(){
// 所有 .folder_header 添加点击事件处理
let folderHeaderEles = document.querySelectorAll(".folder_header");
folderHeaderEles.forEach(function(ele) {
ele.addEventListener("click", function(event) {
let fb = event.target.closest('.folder_header').nextElementSibling;
fb.style.display = fb.style.display === 'none' ? 'block' : 'none';
});
});
// 找到所有的错误信息对象
ERROR_INFOS = document.querySelectorAll(".error-info");
// 获取所有图片元素
let images = document.querySelectorAll('.modal-image');
// 获取模态框元素
let modal = document.getElementById("imageModal");
// 获取模态框中的图片元素
let modalImg = document.getElementById("img01");
// 获取关闭按钮元素
let span = document.getElementsByClassName("close")[0];
// 为每个图片添加点击事件监听器
images.forEach(function(img) {
img.addEventListener("click", function() {
modal.style.display = "block"; // 显示模态框
modalImg.src = this.src; // 设置模态框中的图片为点击的图片
});
});
// 当点击关闭按钮时,隐藏模态框
span.onclick = function() {
modal.style.display = "none";
};
// 当点击模态框外区域时,隐藏模态框
window.onclick = function(event) {
if (event.target == modal) {
modal.style.display = "none";
}
};
});
function toggle_folder_all_cases(){
let eles = document.querySelectorAll(".folder_body");
FOLDER_ALL_CASES = !FOLDER_ALL_CASES;
document.getElementById('display_mode').innerHTML = FOLDER_ALL_CASES ? "Detail" : "Summary";
for (const ele of eles){
ele.style.display = FOLDER_ALL_CASES ? "none" : "block";
}
}
function previous_error(){
// 查找错误必须是详细模式
if (FOLDER_ALL_CASES)
toggle_folder_all_cases()
current_error_idx -= 1;
if (current_error_idx < 0)
current_error_idx = 0;
let error = ERROR_INFOS[current_error_idx];
error.scrollIntoView({behavior: "smooth", block: "center", inline: "start"});
}
function next_error(){
// 查找错误必须是详细模式
if (FOLDER_ALL_CASES)
toggle_folder_all_cases()
current_error_idx += 1;
if (current_error_idx > ERROR_INFOS.length - 1)
current_error_idx = ERROR_INFOS.length - 1;
let error = ERROR_INFOS[current_error_idx];
error.scrollIntoView({behavior: "smooth", block: "center", inline: "start"});
}
import os, types, importlib.util, fnmatch, traceback
from .signal import signal
from ..cfg import l
# 用例标签匹配,有一个满足即可
def tagmatch(pattern):
for tag in Collector.current_case_tags:
if fnmatch.fnmatch(tag, pattern):
# print(' --> match')
return True
# print(' --> nomatch')
return False
'''
搜集有效执行对象的 思路 伪代码如下:
循环遍历加载用例目录下面所有的 py 文件,导入为模块:
从该模块找到测试相关的信息 比如:套件标签、套件初始化清除、用例类, 保存到字典meta中
如果是用例模块,根据 选择条件 判定模块里面的用例 是否被选中,去掉没有选中的用例
从执行列表中去掉 没有包含用例的 目录模块
'''
class Collector:
SUITE_TAGS = [
'force_tags',
'default_tags', # 暂时不用
]
SUITE_STS = [
'suite_setup',
'suite_teardown',
'test_setup',
'test_teardown',
]
# 最终要执行的 相关模块文件
exec_list = []
# 最终要执行的 相关模块文件 和 对应的 对象
exec_table = {}
# 记录本次要执行的用例个数
case_number = 0
# 标签表,根据进入的路径,记录和当前模块相关的标签
# 格式如下
# 'force_tags': {
# 'cases\\': ['冒烟测试', '订单功能'],
# 'cases\\customer\\功能21.py': ['冒烟测试', '订单功能'],},
# 'default_tags': {
# 'cases\\customer\\功能31.py': ['优先级7'] }
suite_tag_table = {
'force_tags': {},
'default_tags': {}
}
# 当前用例的所有标签
current_case_tags = []
@classmethod
def run(cls,
casedir='cases',
suitename_filters=[], # 只要有一个匹配就算匹配
casename_filters=[], # 只要有一个匹配就算匹配
tag_include_expr=None,
tag_exclude_expr=None,
):
signal.info(
('\n\n=== [ 收集测试用例 ] === \n',
'\n\n=== [ collect test cases ] === \n')[l.n]
)
for (dirpath, dirnames, filenames) in os.walk(casedir):
# 确保 __st__.py 在最前面
if '__st__.py' in filenames:
filenames.remove('__st__.py')
filenames.insert(0, '__st__.py')
# 处理每个可能的执行模块文件
for fn in filenames:
filepath = os.path.join(dirpath, fn)
if not filepath.endswith('.py'):
continue
signal.info(f'\n== {filepath} \n')
module_name = fn[:-3]
spec = importlib.util.spec_from_file_location(module_name, filepath)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# 处理一个模块文件
cls.handleOneModule(module, filepath,
tag_include_expr,
tag_exclude_expr,
suitename_filters,
casename_filters)
# *** 从执行列表中去掉 没有包含用例的 目录模块 ***
# 先把 套件目录 和 套件文件 分别放到列表 sts, cases 中
sts, cases = [], []
for filepath in cls.exec_list:
if filepath.endswith('py'):
cases.append(filepath)
else:
sts.append(filepath)
# 再找出 套件目录中没有可以执行的测试文件的 哪些, 去掉不要
for stPath in sts:
valid = False
for casePath in cases:
if casePath.startswith(stPath):
valid = True
break
if not valid:
cls.exec_list.remove(stPath)
cls.exec_table.pop(stPath)
# 处理一个模块文件
@classmethod
def handleOneModule(cls, module, filepath: str,
tag_include_expr: str,
tag_exclude_expr: str,
suitename_filters: list,
casename_filters: list):
cur_module_name = os.path.basename(filepath).replace('.py', '')
stType = filepath.endswith('__st__.py')
caseType = not stType
if stType:
filepath = filepath.replace('__st__.py', '')
# ====== 搜寻该模块 hytest关键信息 ,保存在 meta 里面========
meta = {'type': 'casefile' if caseType else 'st'}
if caseType:
meta['cases'] = []
for name, item in module.__dict__.items():
# __ 开头的名字肯定不是hytest关键名,跳过
if name.startswith('__'):
continue
# 对应一个模块文件的,肯定是外部导入的模块,跳过
if hasattr(item, '__file__'):
continue
# 外部模块内部导入的名字,跳过
if hasattr(item, '__module__'):
if item.__module__ != cur_module_name:
continue
# signal.info(f'-- {name}')
# 列表 : 是 标签 吗?
if isinstance(item, list):
# 非标签关键字,跳过
if name not in cls.SUITE_TAGS:
continue
# 如果标签列表为空,跳过
if not item:
continue
meta[name] = item
cls.suite_tag_table[name][filepath] = item
signal.debug(f'-- {name}')
# 函数 : 是 初始化清除 吗?
elif isinstance(item, types.FunctionType):
# 非套件初始化清除关键字,跳过
if name not in cls.SUITE_STS:
continue
meta[name] = item
signal.debug(f'-- {name}')
# 类 : 是 用例 吗?
elif caseType and isinstance(item, type):
# 没有 teststeps , 肯定不是用例类, 跳过
if not hasattr(item, 'teststeps'):
signal.info(f'no teststeps in class "{name}", skip it.')
continue
# 如果 有 name 是 一个用例
if hasattr(item, 'name'):
# 同时有 ddt_cases ,格式不对
if hasattr(item, 'ddt_cases'):
signal.info(f'both "name" and "ddt_cases" in class "{name}", skip it.')
continue
meta['cases'].append(item())
signal.debug(f'-- {name}')
# 如果 有 ddt_cases 是数据驱动用例,对应多个用例
elif hasattr(item, 'ddt_cases'):
for caseData in item.ddt_cases:
# 实例化每个用例,属性name,para设置好
case = item()
case.name, case.para = caseData['name'], caseData['para'],
meta['cases'].append(case)
# 没有 name 也没有 ddt_cases, 类名作为用例名
else:
item.name = name
meta['cases'].append(item())
signal.debug(f'-- {name}')
# suite_tag_table 表中去掉 和 当前模块不相干的记录,
# 这样每次进入新的模块目录,就会自动去掉前面已经处理过的路径 标签记录
new_suite_tag_table = {}
for tname, table in cls.suite_tag_table.items():
new_suite_tag_table[tname] = {p: v for p, v in table.items() if filepath.startswith(p)}
cls.suite_tag_table = new_suite_tag_table
# 用例模块
if caseType:
# 如果 没有用例
if not meta['cases']:
signal.info(f'\n** no cases in this file, skip it.')
return
# 模块里面的用例 根据选择条件过滤 ,如果没有通过,会从 meta['cases'] 里面去掉
cls.caseFilter(filepath, meta, tag_include_expr, tag_exclude_expr, suitename_filters, casename_filters)
# 如果 用例都被过滤掉了
if not meta['cases']:
signal.info(f'\n** no cases in this file , skip it.')
return
# 待执行用例总数更新
cls.case_number += len(meta['cases'])
# __st__ 模块
else:
# 应该包含 初始化 或者 清除 或者 标签 ,否则是无效模块,跳过
if len(meta) == 1:
signal.info(f'\n** no setup/teardown/tags in this file , skip it.')
return
# 该模块文件 先暂时 加入执行列表
cls.exec_list.append(filepath)
cls.exec_table[filepath] = meta
# 经过这个函数的执行, 最后 meta['cases'] 里面依然保存的,才是需要执行的用例
@classmethod
def caseFilter(cls, filepath: str, meta: dict,
tag_include_expr: str,
tag_exclude_expr: str,
suitename_filters: list,
casename_filters: list):
# -------- 模块所有用例进行分析 ---------
# 没有任何过滤条件,就不需要再看每个用例的情况了
if not tag_include_expr and not tag_exclude_expr and not suitename_filters and not casename_filters:
return
# 如果没有排除过滤,
# 并且 有 套件名过滤,并且整个套件被选中,就不需要再看每个用例的情况了
# 一个用例文件 ,路径上的每一级都是一个套件
if not tag_exclude_expr and suitename_filters:
suitenames = filepath.split(os.path.sep)
# 套件文件名的后缀.py 去掉 作为套件名
suitenames = [sn[:-3] if sn.endswith('.py') else sn for sn in suitenames]
if cls._patternMatch(suitenames, suitename_filters):
return
# -------- 对每个用例进行分析 ---------
passedCases = [] # 被选中的用例列表
for caseClass in meta['cases']:
signal.debug(f'\n* {caseClass.name}')
# ----------- 先看标签排除过滤 ------------
# 得到当前模块相关的 套件 标签,就是表中现有的标签合并
suite_tags = [t for tl in cls.suite_tag_table['force_tags'].values() for t in tl]
# 用例本身的标签
case_tags = getattr(caseClass, 'tags', [])
# 用例关联的所有的标签
cls.current_case_tags = set(suite_tags + case_tags)
# print(cls.current_case_tags)
# 如果有标签排除过滤
if tag_exclude_expr:
# 条件满足,被排除
if eval(tag_exclude_expr) == True:
signal.debug(f'excluded for meeting expr : {tag_exclude_expr}')
continue
# 没有被排除
else:
# 并且没有其他的 选择条件(只有标签排除过滤),就是被选中
if not casename_filters and not suitename_filters and not tag_include_expr:
passedCases.append(caseClass)
continue
# --------- 再看 名字匹配过滤 ------------
# 有用例名过滤
if casename_filters:
caseName = getattr(caseClass, 'name')
# 通过用例名过滤
if cls._patternMatch([caseName], casename_filters):
passedCases.append(caseClass)
continue
# ----------- 再看标签匹配过滤 ------------
if tag_include_expr:
if eval(tag_include_expr) == True:
passedCases.append(caseClass)
continue
# 上面一个选择条件也没有满足
signal.debug(f'excluded for not meet any include rules')
# 最终存放 通过过滤的用例
meta['cases'] = passedCases
@classmethod
def _patternMatch(cls, names, patterns):
for name in names:
for pattern in patterns:
if fnmatch.fnmatch(name, pattern):
return True
return False
'''
执行自动化的 思路 伪代码如下:
1. 先保证 exec_list 中 该teardown的地方插入 teardown记录
执行前, exec_list 示例如下
[
'cases\\',
'cases\\.功能3.py',
'cases\\功能1.py',
'cases\\功能2.py',
'cases\\customer\\',
'cases\\customer\\功能21.py',
'cases\\order\\',
'cases\\order\\功能31.py',
]
遍历 exec_table 中的每个对象:
如果 该执行对象 type 是 st, 说明是 套件目录:
如果有 tear_down, 到 exec_list 中 找到合适的位置,插入 tear_down 操作
执行完此步骤后, exec_list 示例如下
[
'cases\\',
'cases\\.功能3.py',
'cases\\功能1.py',
'cases\\功能2.py',
'cases\\customer\\',
'cases\\customer\\功能21.py',
'cases\\customer\\--teardown--',
'cases\\order\\',
'cases\\order\\功能31.py',
'cases\\order\\--teardown--',
'cases\\--teardown--'
]
2. 然后执行测试
suite_setup_failed_list = [] 记录初始化失败的套件
for name in exec_list:
检查 这个name 是否以 suite_setup_failed_list 里面的内容开头
如果是 continue
if name 以 --teardown-- 结尾:
去掉 --teardown-- 部分,找到 exec_table中的对象执行 teardown
else:
以name 为key, 找到 exec_table中的对象:
if 类型是 st :
如果 有 suite_setup:
执行 suite_setup
如果 suite_setup 抛异常:
添加 name 到 suite_setup_failed_list
esif 类型是 case:
执行 case里面的用例:
先执行用例的 setup
如果 setup 异常,后面的 teststeps 和 teardown都不执行
'''
class Runner:
curRunningCase = None
# 记录所有测试用例的执行结果,每个元素都是用户定义的测试用例类实例
# 执行过程中写入了测试几个到每个测试用例类中
case_list = []
@classmethod
def run(cls, ):
signal.info(
('\n\n=== [ 执行测试用例 ] === \n',
'\n\n=== [ execute test cases ] === \n')[l.n]
)
# 如果本次没有可以执行的用例(可能是过滤项原因),直接返回
if not Collector.exec_list:
signal.error(('!! 没有可以执行的测试用例', '!! No cases to run')[l.n])
return 2 # 2 表示没有可以执行的用例
signal.info(f"{('预备执行用例数量', 'Number of cases to run')[l.n]} : {Collector.case_number}\n")
# 执行用例时,为每个用例分配一个id,方便测试报告里面根据id跳转到用例
cls.caseId = 0
# 1. 先保证 exec_list 中 该teardown的地方插入 teardown记录
for name, meta in Collector.exec_table.items():
if meta['type'] == 'st' and 'suite_teardown' in meta:
cls._insertTeardownToExecList(name)
# print(Collector.exec_list)
# 2. 然后执行自动化流程
signal.test_start()
cls.execTest()
signal.test_end(cls)
from hytest.common import GSTORE
# 0 表示执行成功 , 1 表示有错误 , 2 表示没有可以执行的用例, 3 表示未知错误
return GSTORE.get('---ret---', 3)
@classmethod
def execTest(cls):
suite_setup_failed_list = [] # 记录初始化失败的套件
for name in Collector.exec_list:
# 检查 这个name 是否属于套件初始化失败影响的范围
affected = False
for suite_setup_failed in suite_setup_failed_list:
if name.startswith(suite_setup_failed):
affected = True
break
if affected:
continue
# 套件目录清除
if name.endswith('--teardown--'):
# 去掉 --teardown-- 部分
name = name.replace('--teardown--', '')
# 找到 exec_table 中的对象执行 teardown
suite_teardown = Collector.exec_table[name]['suite_teardown']
signal.teardown(name, 'suite')
try:
suite_teardown()
except Exception as e:
# 套件目录 清除失败
signal.teardown_fail(name, 'suite', e, traceback.format_exc())
else:
meta = Collector.exec_table[name]
# 进入套件目录
if meta['type'] == 'st':
signal.enter_suite(name, 'dir')
suite_setup = meta.get('suite_setup')
# 套件目录初始化
if suite_setup:
signal.setup(name, 'suite')
try:
suite_setup()
except Exception as e:
# 套件目录 初始化失败,
signal.setup_fail(name, 'suite', e, traceback.format_exc())
# 记录到 初始化失败目录列表 中, 该套件目录内容都不会再执行
suite_setup_failed_list.append(name)
# 进入套件文件
elif meta['type'] == 'casefile':
signal.enter_suite(name, 'file')
# 套件文件 初始化
suite_setup = meta.get('suite_setup')
if suite_setup:
signal.setup(name, 'suite')
try:
suite_setup()
except Exception as e:
# 套件文件 初始化失败
signal.setup_fail(name, 'suite', e, traceback.format_exc())
# 该套件文件内容都不会再执行
continue
# 执行套件文件里面的用例
cls._exec_cases(meta)
# 套件文件 清除
suite_teardown = meta.get('suite_teardown')
if suite_teardown:
signal.teardown(name, 'suite')
try:
suite_teardown()
except Exception as e:
# 套件文件 清除失败
signal.teardown_fail(name, 'suite', e, traceback.format_exc())
# exec_list 中 找到 stName 对应的 teardown的地方插入 teardown记录
@classmethod
def _insertTeardownToExecList(cls, stName):
findStart = False
insertPos = -1
for pos, name in enumerate(Collector.exec_list):
# 这样肯定会先找到 等于 stName 的位置
if not findStart:
if name != stName:
continue
else:
findStart = True
else:
# print(name,stName)
# 接下来找 不以 stName 开头的那个元素,就在此位置插入
if not name.startswith(stName):
insertPos = pos
break
# 直到最后也没有找到,是用例根目录,添加到最后
tearDownName = stName + '--teardown--'
if insertPos == -1:
Collector.exec_list.append(tearDownName)
else:
Collector.exec_list.insert(insertPos, tearDownName)
# 执行套件文件里面的多个用例
@classmethod
def _exec_cases(cls, meta):
# 缺省 test_setup test_teardown
test_setup = meta.get('test_setup')
test_teardown = meta.get('test_teardown')
# 取出每一个用例
for case in meta['cases']:
# 记录到 cls.case_list 中,方便测试结束后,遍历每个测试用例
cls.case_list.append(case)
case_className = type(case).__name__
# 用例 id 自动递增 分配, 这个id 主要是 作为 产生的HTML日志里面的html元素id
cls.caseId += 1
signal.enter_case(cls.caseId, case.name, case_className)
# 记录当前执行的case
cls.curRunningCase = case
# 如果用例有 setup
caseSetup = getattr(case, 'setup', None)
if caseSetup:
signal.setup(case.name, 'case')
try:
caseSetup()
except Exception as e:
signal.setup_fail(case.name, 'case', e, traceback.format_exc())
continue # 初始化失败,这个用例的后续也不用执行了
# 如果用例没有 setup,但是有缺省 test_setup
elif test_setup:
signal.setup(case.name, 'case_default')
try:
test_setup()
except Exception as e:
signal.setup_fail(case.name, 'case_default', e, traceback.format_exc())
continue # 初始化失败,这个用例的后续也不用执行了
signal.case_steps(case.name)
try:
# 先预设结果为通过,如果有检查点不通过,那里会设置为fail
case.execRet = 'pass'
case.teststeps()
signal.case_result(case)
except AssertionError as e:
case.execRet = 'fail'
case.error = e
case.stacktrace = traceback.format_exc()
signal.case_result(case)
except Exception as e:
case.execRet = 'abort'
case.error = e
case.stacktrace = traceback.format_exc()
signal.case_result(case)
# 用例 teardown
caseTeardown = getattr(case, 'teardown', None)
if caseTeardown:
signal.teardown(case.name, 'case')
try:
caseTeardown()
except Exception as e:
signal.teardown_fail(case.name, 'case', e, traceback.format_exc())
# 如果用例没有 teardown ,但是有缺省 test_teardown
elif test_teardown:
signal.teardown(case.name, 'case_default')
try:
test_teardown()
except Exception as e:
signal.teardown_fail(case.name, 'case_default', e, traceback.format_exc())
if __name__ == '__main__':
Collector.run(
# suitename_filters=['cust*'],
# casename_filters=['cust*','or*'],
# tag_include_expr="(tagmatch('优先级4')) or (tagmatch('UITest')) or (tagmatch('Web*'))"
)
# print(Collector.exec_table)
Runner.run()
class Signal:
_clients = []
_curMethodName = None
def register(self, client):
if isinstance(client,list):
self._clients += client
else:
self._clients.append(client)
def _broadcast(self,*arg,**kargs):
for logger in self._clients:
method = getattr(logger,self._curMethodName,None)
if method:
method(*arg,**kargs)
def __getattr__(self, attr):
self._curMethodName = attr
return self._broadcast
signal = Signal()
......@@ -76,11 +76,11 @@ def browser_init(login_type):
# chromedriver下载地址:https://googlechromelabs.github.io/chrome-for-testing/
# 自动化运行服务器的chromedriver路径:
# 拯救者电脑
service = Service(r'C:\Users\29194\AppData\Local\Programs\Python\Python310\Scripts\chromedriver.exe')
# service = Service(r'C:\Users\29194\AppData\Local\Programs\Python\Python310\Scripts\chromedriver.exe')
# EDY电脑
# service = Service(r'C:\Program Files\Python310\Scripts\chromedriver.exe')
# 云电脑
# service = Service(r'E:\Python\Scripts\chromedriver.exe')
service = Service(r'E:\Python\Scripts\chromedriver.exe')
# 尝试创建WebDriver实例并执行初始化操作
try:
# 创建WebDriver实例
......
......@@ -311,4 +311,6 @@
89. 2025-06-03:
- 兰州中石化项目输出会议申报模块的JSON数据,调试自动化运行,补充会议申报模块的JSON数据。
90. 2025-06-05:
- 兰州中石化项目输出角色权限组的部分JSON数据,调试自动化运行。排查展厅自动化失败问题,处理chrome版本升级后,chromedriver版本没对上问题。
\ No newline at end of file
- 兰州中石化项目输出角色权限组的部分JSON数据,调试自动化运行。排查展厅自动化失败问题,处理chrome版本升级后,chromedriver版本没对上问题。
91. 2025-06-10:
- 兰州中石化项目输出议题申报流程的JSON数据,调试自动化运行。
\ No newline at end of file
......@@ -18,7 +18,7 @@ class ConferenceDeclaration:
"""
执行指令是:
1.cd 预定系统
2.hytest --report_title 兰州中石化项目会议申报测试报告 --report_url_prefix http://nat.ubainsyun.com:31133 --tag 兰州中石化项目会议申报
2.hytest --report_title 兰州中石化项目会议申报测试报告 --report_url_prefix http://nat.ubainsyun.com:31135 --tag 兰州中石化项目会议申报
"""
ddt_cases = read_xlsx_data(xlsx_file_path, sheet_name='会议申报',case_type="兰州中石化项目25-05-24")
# 测试开始前调用clear_columns_in_xlsx函数,将测试用例中的测试结果和日志截图置空
......
......@@ -18,7 +18,7 @@ class RolePermissionManagement:
"""
执行指令是:
1.cd 预定系统
2.hytest --report_title 兰州中石化项目角色权限管理测试报告 --report_url_prefix http://nat.ubainsyun.com:31133 --tag 兰州中石化项目角色权限管理
2.hytest --report_title 兰州中石化项目角色权限管理测试报告 --report_url_prefix http://nat.ubainsyun.com:31135 --tag 兰州中石化项目角色权限管理
"""
ddt_cases = read_xlsx_data(xlsx_file_path, sheet_name='角色权限管理',case_type="兰州中石化项目25-05-24")
# 测试开始前调用clear_columns_in_xlsx函数,将测试用例中的测试结果和日志截图置空
......
import sys
import os
# 获取当前脚本的绝对路径
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构建预定系统的绝对路径
预定系统_path = os.path.abspath(os.path.join(current_dir, '..','..'))
# 添加路径
sys.path.append(预定系统_path)
# 导入模块
from 预定系统.Base.base import *
def suite_setup():
STEP(1, "初始化浏览器")
# 初始化浏览器与系统地址
browser_init("兰州中石化项目测试环境")
user_login("admin", "Ubains@4321")
sleep(2)
wd = GSTORE['wd']
sleep(1)
def suite_teardown():
wd = GSTORE['wd']
wd.quit()
\ No newline at end of file
import sys
import os
# 获取当前脚本的绝对路径
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构建预定系统的绝对路径
预定系统_path = os.path.abspath(os.path.join(current_dir, '..', '..', '..' , '..'))
# 添加路径
sys.path.append(预定系统_path)
# 导入模块
from 预定系统.Base.base import *
# 构建XLSX文件的绝对路径
xlsx_file_path = os.path.join(预定系统_path, '测试数据', '兰州中石化项目测试用例.xlsx')
class TopicDeclaration:
tags = ['兰州中石化项目议题申报']
"""
执行指令是:
1.cd 预定系统
2.hytest --report_title 兰州中石化项目议题申报测试报告 --report_url_prefix http://nat.ubainsyun.com:31135 --tag 兰州中石化项目议题申报
"""
ddt_cases = read_xlsx_data(xlsx_file_path, sheet_name='议题申报',case_type="兰州中石化项目25-05-24")
# 测试开始前调用clear_columns_in_xlsx函数,将测试用例中的测试结果和日志截图置空
# clear_columns_in_xlsx(xlsx_file_path, sheet_name='会议创建', columns_to_clear=['测试结果', '测试频次', '日志截图'])
def teststeps(self):
"""
执行测试步骤函数,主要用于执行读取的测试用例并进行信息统计模块功能测试操作
"""
# 从全局存储中获取webdriver对象
wd = GSTORE['wd']
name = self.name
# 点击【议题申报】按钮进入模块
INFO("点击【议题申报】按钮")
safe_click((By.XPATH, "//div[@id='CreateTopic']"), wd)
sleep(1)
for step in self.para:
# 赋值页面类型page
page_type = step.get('page')
# 赋值元素定位类型,并将字符串转为Enum类型
locator_type = get_by_enum(step.get('locator_type'))
# 赋值元素值
locator_value = step.get('locator_value')
# 赋值元素类型,例如:click点击、input输入框等
element_type = step.get('element_type')
# 赋值元素值,例如输入框的输入值
element_value = step.get('element_value')
# 赋值预期结果
expented_result = step.get('expented_result')
INFO(f"页面: {page_type}、元素定位类型: {locator_type}、元素定位值: {locator_value}、元素类型: {element_type}、元素值: {element_value}、预期结果: {expented_result}")
if element_type == "click":
safe_click((locator_type, locator_value), wd)
sleep(2)
elif element_type == "input":
safe_send_keys((locator_type, locator_value), element_value, wd)
sleep(2)
elif element_type == "getTips":
notify_text = get_notify_text(wd, (locator_type, locator_value))
INFO(f"获取到的提示信息为:{notify_text}")
sleep(2)
CHECK_POINT(f"获取到的提示信息为:{notify_text}", expented_result in notify_text)
SELENIUM_LOG_SCREEN(wd, "75")
elif element_type == "getText":
text = elment_get_text((locator_type, locator_value), wd)
INFO(f"获取到的文本信息为:{text}")
CHECK_POINT(f"获取到的文本信息为:{text}", expented_result in text)
SELENIUM_LOG_SCREEN(wd, "75")
# 执行完一个用例就刷新一下页面重置
wd.refresh()
wd.refresh()
sleep(2)
\ No newline at end of file
server_addr: ngrok.ubsyun.com:9083
trust_host_root_certs: false
tunnels:
nat1:
proto:
tcp: 127.0.0.1:80
remote_port: 31135
ngrok -config=ngrok.cfg start nat1
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论