提交 17bea318 authored 作者: 彭甘宇's avatar 彭甘宇

1、增加钉钉短信对接;

2、增加用户编辑和删除模块;
3、调整初始化方法;
4、增加测试数据;
上级 cc50c6a5
......@@ -2,13 +2,13 @@ from lib.login import *
def suite_setup():
wd = GSTORE['wd']
open_browser()
user_login("admin@pgy", "ub@123456", "csba")
enter_system()
enter_manage()
enter_user_manage()
# def suite_teardown():
# INFO('进行清除操作')
# wd = GSTORE['wd']
# wd.quit()
# INFO('进行清除操作')
# wd = GSTORE['wd']
# wd.quit()
from lib.login import *
class UserAdd001:
name = 'user_add_001'
def teststeps(self):
wd = GSTORE['wd']
STEP(1, '点击新增按钮')
user_add = WebDriverWait(wd, 10).until(
EC.element_to_be_clickable((By.XPATH, "//div[@class='company-edmit-right']//span[contains(text(),'新增')]"))
)
user_add.click()
sleep(2)
STEP(2, '填写登录名')
account_input = WebDriverWait(wd, 10).until(
EC.presence_of_element_located((By.XPATH, "//input[@placeholder='登录名以字母开头,长度在5-18之间,只能包含字母、数字和下划线、@']"))
)
account_input.clear()
account_input.send_keys("admin@test")
STEP(3, '填写用户名称')
account_name_input = WebDriverWait(wd, 10).until(
EC.presence_of_element_located((By.XPATH, "//div[@class='el-input el-input--suffix']//input[@placeholder='请输入用户名称']"))
)
account_name_input.clear()
account_name_input.send_keys("测试用户")
STEP(4, '填写新密码')
passwd_input = WebDriverWait(wd, 10).until(
EC.presence_of_element_located(
(By.XPATH, "//input[@placeholder='必须包含字母(不区分大小写)、数字和特殊字符,至少8个字符']"))
)
passwd_input.clear()
passwd_input.send_keys("ub@123456")
STEP(5, '填写确认密码')
sepasswd_input = WebDriverWait(wd, 10).until(
EC.presence_of_element_located(
(By.XPATH, "//input[@placeholder='请确认密码']"))
)
sepasswd_input.clear()
sepasswd_input.send_keys("ub@123456")
STEP(6, '点击确认')
commit = WebDriverWait(wd, 10).until(
EC.element_to_be_clickable((By.XPATH, "//div[@aria-label='新增']//span[contains(text(),'确 定')]"))
)
commit.click()
STEP(7, '验证是否新增成功')
get_menu = WebDriverWait(wd, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, '.el-message__content'))
)
get_menu1 = get_menu.text
CHECK_POINT('检查是否出现成功提示弹窗', get_menu1 == "操作成功!")
sleep(2)
class UserAdd002:
name = 'user_add_002'
def teststeps(self):
wd = GSTORE['wd']
STEP(1, '点击新增按钮')
user_add = WebDriverWait(wd, 10).until(
EC.element_to_be_clickable((By.XPATH, "//div[@class='company-edmit-right']//span[contains(text(),'新增')]"))
)
user_add.click()
sleep(2)
STEP(2, '填写登录名')
account_input = WebDriverWait(wd, 10).until(
EC.presence_of_element_located((By.XPATH, "//input[@placeholder='登录名以字母开头,长度在5-18之间,只能包含字母、数字和下划线、@']"))
)
account_input.clear()
account_input.send_keys("admin@test")
STEP(3, '填写用户名称')
account_name_input = WebDriverWait(wd, 10).until(
EC.presence_of_element_located((By.XPATH, "//div[@class='el-input el-input--suffix']//input[@placeholder='请输入用户名称']"))
)
account_name_input.clear()
account_name_input.send_keys("测试用户")
STEP(4, '填写新密码')
passwd_input = WebDriverWait(wd, 10).until(
EC.presence_of_element_located(
(By.XPATH, "//input[@placeholder='必须包含字母(不区分大小写)、数字和特殊字符,至少8个字符']"))
)
passwd_input.clear()
passwd_input.send_keys("ub@123456")
STEP(5, '填写确认密码')
sepasswd_input = WebDriverWait(wd, 10).until(
EC.presence_of_element_located(
(By.XPATH, "//input[@placeholder='请确认密码']"))
)
sepasswd_input.clear()
sepasswd_input.send_keys("ub@123456")
STEP(6, '点击确认')
commit = WebDriverWait(wd, 10).until(
EC.element_to_be_clickable((By.XPATH, "//div[@aria-label='新增']//span[contains(text(),'确 定')]"))
)
commit.click()
STEP(7, '验证是否正常提示')
get_menu = WebDriverWait(wd, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, '.el-message__content'))
)
get_menu1 = get_menu.text
CHECK_POINT('检查是否出现错误提示弹窗', get_menu1 == "登录名已存在,请重新输入")
sleep(2)
from lib.login import *
def suite_setup():
wd = GSTORE['wd']
open_browser()
# def suite_teardown():
# INFO('进行清除操作')
# def suite_setup():
# wd = GSTORE['wd']
# wd.quit()
#
\ No newline at end of file
def suite_teardown():
INFO('进行清除操作')
wd = GSTORE['wd']
wd.quit()
from lib.login import *
# def suite_setup():
# INFO('初始化浏览器')
# open_browser()
# # run_login_tests()
#
#
# def suite_teardown():
# INFO('进行清除操作')
# wd = GSTORE['wd']
# wd.quit()
class user_login_001:
name = 'login_test_01'
def teststeps(self):
STEP(1, '打开网页')
open_browser()
wd = GSTORE['wd']
STEP(2, '用户登录')
user_login("admin@pgy1", "ub@123456", "csba")
STEP(1, '用户登录')
user_login("admin@pgy", "ub@123456", "csba")
# run_login_tests()
STEP(3, '验证是否登录成功')
STEP(2, '验证是否登录成功')
get_menu = WebDriverWait(wd, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, '.el-message__content'))
)
......@@ -35,27 +20,27 @@ class user_login_001:
wd.quit()
class user_login_002:
name = 'login_test_02'
def teststeps(self):
STEP(1, '打开网页')
wd = GSTORE['wd']
STEP(2, '用户登录')
user_login("admin@pgy", "ub@123456", "csba")
# run_login_tests()
STEP(3, '验证是否登录成功')
get_menu = WebDriverWait(wd, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, '.el-message__content'))
)
get_menu1 = get_menu.text
CHECK_POINT('检查是否出现弹窗', get_menu1 == "操作成功!")
STEP(4, '清除数据')
def suite_teardown():
INFO('进行清除操作')
wd.quit()
# class user_login_002:
# name = 'login_test_02'
#
# def teststeps(self):
# STEP(1, '打开网页')
#
# wd = GSTORE['wd']
#
# STEP(2, '用户登录')
# user_login("admin@pgy", "ub@123456", "csba")
# # run_login_tests()
#
# STEP(3, '验证是否登录成功')
# get_menu = WebDriverWait(wd, 10).until(
# EC.presence_of_element_located((By.CSS_SELECTOR, '.el-message__content'))
# )
# get_menu1 = get_menu.text
# CHECK_POINT('检查是否出现弹窗', get_menu1 == "操作成功!")
#
# STEP(4, '清除数据')
#
# def suite_teardown():
# INFO('进行清除操作')
# wd.quit()
import json
import logging
import csv
import os
import re
import threading
import time
from time import sleep
from datetime import datetime
from zipfile import compressor_names
import paho.mqtt.client as mqtt
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class Mqtt:
def __init__(self, broker_address, port):
"""
初始化 MQTT 客户端
:param broker_address: MQTT 代理地址
:param port: MQTT 代理端口
"""
self.lock = None
self.broker_address = broker_address
self.port = port
self.client = None
self._received_message_lock = threading.Lock()
self.received_message = None
self.message_type = None
def connect(self):
"""
连接到 MQTT 服务器
"""
try:
self.client = mqtt.Client()
self.client.on_connect = self.on_connect # 设置连接回调
self.client.on_message = self.on_message # 设置消息回调
self.client.connect(self.broker_address, self.port) # 连接到代理
self.client.loop_start() # 启动网络循环
except Exception as e:
logging.error(f"连接到MQTT服务器时发生错误: {e}")
raise
def disconnect(self):
"""
断开与 MQTT 服务器的连接
"""
if self.client:
try:
self.client.loop_stop() # 停止网络循环
self.client.disconnect() # 断开连接
logging.info("已断开与MQTT 服务器的连接")
except Exception as e:
logging.error(f"断开与MQTT 服务器的连接时发生错误: {e}")
finally:
self.client = None # 确保资源被完全释放
else:
logging.warning("尝试断开连接时,客户端已不存在")
def on_connect(self, client, userdata, flags, rc):
"""
连接成功或失败的回调函数
:param client: 客户端实例
:param userdata: 用户数据
:param flags: 连接标志
:param rc: 返回码
"""
try:
if rc == 0:
logging.info("连接成功")
else:
logging.error(f"连接失败,返回码: {rc}")
# 根据不同的返回码采取不同的措施
if rc == 1:
logging.error("错误:错误的协议版本")
elif rc == 2:
logging.error("错误:无效的客户端标识符")
elif rc == 3:
logging.error("错误:服务器不可用")
elif rc == 4:
logging.error("错误:错误的用户名或密码")
elif rc == 5:
logging.error("错误:未授权")
else:
logging.error("未知错误")
except Exception as e:
logging.exception(f"在处理连接结果时发生异常: {e}")
def on_message(self, client, userdata, msg):
"""
接收到消息的回调函数
:param client: 客户端实例
:param userdata: 用户数据
:param msg: 消息对象
"""
try:
payload = msg.payload.decode('utf-8', errors='replace') # 处理解码错误
logging.info(f"收到消息: {msg.topic} {payload[:50]}...") # 脱敏日志记录
except UnicodeDecodeError as e:
logging.error(f"解码错误: {e}")
payload = "无法解码的消息"
with self._received_message_lock:
self.received_message = payload # 线程安全地存储接收到的消息
def subscribe(self, topic):
"""
订阅指定的主题
:param topic: 主题名称
"""
if self.client is None:
logging.error("客户端未初始化,无法订阅主题")
raise ValueError("客户端未初始化,无法订阅主题")
# 输入验证
if not isinstance(topic, str) or not topic.strip():
logging.error("无效的主题名称")
raise ValueError("无效的主题名称")
try:
self.client.subscribe(topic)
logging.info(f"已订阅主题: {topic}")
except ConnectionError as ce:
logging.error(f"连接错误,无法订阅主题: {topic}, 错误信息: {str(ce)}")
raise
except TimeoutError as te:
logging.error(f"超时错误,无法订阅主题: {topic}, 错误信息: {str(te)}")
raise
except ValueError as ve:
logging.error(f"值错误,无法订阅主题: {topic}, 错误信息: {str(ve)}")
raise
except Exception as e:
logging.error(f"未知错误,无法订阅主题: {topic}, 错误信息: {str(e)}")
raise
def set_message_type(self, message_type):
"""
设置消息类型
此方法用于设置或更改消息类型属性,允许对象根据需要处理不同类型的消息
参数:
message_type: 要设置的消息类型,可以是任何数据类型,但通常应该是字符串、整数或枚举类型
返回:
"""
self.message_type = message_type
def publish(self, topic, message):
"""
发布消息到指定的主题
:param topic: 主题名称
:param message: 消息内容
"""
if self.client:
try:
# 将消息转换为JSON字符串
if self.message_type == dict and isinstance(message, dict):
message = json.dumps(message)
elif message is None:
message = ""
else:
message = str(message)
except (TypeError, ValueError) as e:
logging.error(f"{datetime.now()} - 消息转换失败: {e} - 调用者: {self.__class__.__name__}.publish")
raise
try:
self.client.publish(topic, message)
logging.info(f"{datetime.now()} - 已发布消息到主题: {topic} - 调用者: {self.__class__.__name__}.publish")
except Exception as e:
logging.error(f"{datetime.now()} - 消息发布失败: {e} - 失败的主题: {topic}, 消息: {message} - 调用者: {self.__class__.__name__}.publish")
raise
def wait_for_message(self, topic, timeout=5):
"""
等待指定主题的消息
:param topic: 主题名称
:param timeout: 超时时间(秒)
:return: 接收到的消息或 None
"""
if self.client is None:
logging.warning("Client is not initialized")
return None
if timeout < 0:
logging.warning("Timeout cannot be negative")
return None
start_time = time.monotonic()
while (time.monotonic() - start_time) < timeout:
try:
with self.lock:
if self.received_message is not None:
return self.received_message
except Exception as e:
logging.error(f"Error accessing received_message: {e}")
try:
time.sleep(0.1)
except Exception as e:
logging.error(f"Error in sleep: {e}")
return None
@staticmethod
def read_config_from_csv(file_path):
"""
从 CSV 文件读取配置
:param file_path: CSV 文件路径
:param allowed_directory: 允许访问的目录
:return: 配置列表
"""
try:
# 验证文件路径
if not os.path.isfile(file_path):
raise FileNotFoundError(f"文件 {file_path} 不存在")
with open(file_path, mode='r', encoding='utf-8') as file:
reader = csv.DictReader(file)
return [row for row in reader]
except FileNotFoundError as e:
print(f"错误: {e}")
return []
except PermissionError as e:
print(f"错误: {e}")
return []
except Exception as e:
print(f"未知错误: {e}")
return []
@staticmethod
def wait_for_message(self, topic, timeout=5):
"""
等待指定主题的消息
:param topic: 主题名称
:param timeout: 超时时间(秒)
:return: 接收到的消息或 None
"""
if not isinstance(topic, str) or not re.match(r'^[a-zA-Z0-9_\-]+$', topic):
raise ValueError("Invalid topic format")
if timeout < 0:
return None
try:
if self.client:
start_time = time.time()
while (time.time() - start_time) < timeout:
if self.has_received_message():
return self.received_message
sleep(0.1)
return None
except AttributeError:
return None
return None
def has_received_message(self):
return hasattr(self, 'received_message')
@staticmethod
def read_config_from_csv(file_path):
"""
从 CSV 文件读取配置
:param file_path: CSV 文件路径
:return: 配置列表
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"文件 {file_path} 不存在")
try:
with open(file_path, mode='r', encoding='utf-8') as file:
reader = csv.DictReader(file)
config_list = [row for row in reader]
if not config_list:
raise ValueError("CSV 文件内容为空或格式不正确")
logging.info(f"成功读取文件 {file_path}")
return config_list
except Exception as e:
logging.error(f"读取文件 {file_path} 时发生错误: {e}")
return []
@staticmethod
def build_message(config, current_time,topic):
"""
构建消息内容
:param config: 配置字典
:param current_time: 当前时间
:return: 消息字典
"""
#安卓信息设备上报
if config['action'] == "_updatemaster":
return {
"action": "_updatemaster",
"client_udid": config['client_udid'],
"result": json.dumps({
"data": [{
"item": "environmental",
"pm25": 20,
"co2": 2,
"temp": 20,
"tvoc": 20,
"humi": 20,
"hcho": 20
},
{
"item": "conference",
"power": config['power'],
"exist": config['exist'],
"run": config['run']
}]
})
}
elif config['action'] == "_updatestatus":
return {
"action": "_updatestatus",
"client_udid": config['client_udid'],
"result": json.dumps({
"data": [
{"device_udid": config['device_udid'],"power": config['power'],"online": config['online'],"watt":config['watt'],"run": config['run']
},
]
})
}
elif config['action'] == "_updateaudio":
return {
"action": "_updateaudio",
"client_udid": config['client_udid'],
"result": json.dumps({
"data":[
{
"address": config['address'],
"data_all": {
"volume": 100,"mute": 0
},
"data_channel": [
{
"lineType": config['lineType'],"num": config['num'],
"status": config['status'],"volume": config['volume'],
"mute": config['mute']}
],
"data_meter": [
{
"lineType": 1,"num": 1,"dbvalue": 70}
]
}
]
}
)
}
elif config['action'] == "_updatevideo":
return {
"action": "_updatevideo",
"client_udid": config['client_udid'],
"result": json.dumps({
"data": [{
"address": config['address'],
"data_channel": [{
"lineType": config['lineType'],
"num": config['num'],
"status": config['status']
}
]
}]
})
}
elif config['action'] == "_updatelight":
return {
"action": "_updatelight",
"client_udid": config['client_udid'],
"result": json.dumps({
"data": [{
"address": config['address'],
config['data_type']: [{
"lineType": config['lineType'],
"num": config['num'],
"status": config['status'],
"run": config['run']
}]
}]
})
}
elif config['action'] == "_updatenetwork":
return {
"action": "_updatenetwork",
"client_udid": config['client_udid'],
"result": json.dumps({
"data": [{
"address": config['address'],
"data_channel": [{
config['network_type']: config['network_value'],
"status": config['status'],
"run": config['run']
}
]
}]
})
}
elif config['action'] == "_updatepower":
return {
"action": "_updatepower",
"client_udid": config['client_udid'],
"result": json.dumps({
"data": [{
"address": config['address'],
"data_channel": [{
"innum": config['innum'],
"status": config['status'],
"power": config['power'],
"level": config['level']
}
]
}]
})
}
def send_and_receive_messages(self, topic: str, message: str, num_times: int = 1, timeout: int = 5,
interval: float = 0.2):
"""
发送并接收消息
:param topic: 主题名称
:param message: 消息内容
:param num_times: 发送次数,默认为1
:param timeout: 超时时间(秒),默认为5秒
:param interval: 每次发送之间的间隔时间(秒),默认为0.2秒
"""
if not isinstance(topic, str) or not isinstance(message, str):
raise ValueError("主题和消息必须是字符串类型")
for i in range(num_times):
try:
self.publish(topic, message)
received_message = self.wait_for_message(topic, timeout=timeout)
if received_message:
logging.info("消息接收成功!")
else:
logging.warning("超时时间内未接收到消息。")
sleep(interval)
except (ConnectionError, TimeoutError) as e:
logging.error(f"网络连接或超时错误: {e}")
except ValueError as e:
logging.error(f"值错误: {e}")
except Exception as e:
logging.error(f"未知错误: {e}")
\ No newline at end of file
from selenium import webdriver
from hytest import *
import pandas as pd
import csv
import urllib
import glob
import subprocess
import requests
import json
import hmac
import hashlib
import base64
import time
import logging
from hytest import *
from selenium import webdriver
from datetime import datetime
from urllib.parse import urlencode
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.edge.options import Options
from selenium.common.exceptions import TimeoutException, ElementNotInteractableException
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common import TimeoutException,ElementNotInteractableException
from time import sleep
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
......@@ -56,7 +67,6 @@ def user_login(username, password, captcha):
login_button.click()
sleep(2)
def validate_input(username, password, captcha):
if not isinstance(username, str) or not isinstance(password, str) or not isinstance(captcha, str):
raise ValueError("Invalid input type")
......@@ -79,7 +89,7 @@ def run_login_tests(df):
def main():
# 获取当前脚本的目录
current_dir = os.path.dirname(os.path.abspath(__file__))
csv_file_path = os.path.join(current_dir, '../testdata/登录信息.csv')
csv_file_path = os.path.join(current_dir, '../testdata/01登录模块/登录信息.csv')
try:
if not os.path.exists(csv_file_path):
print(f"File not found: {csv_file_path}")
......@@ -89,15 +99,6 @@ def main():
except Exception as e:
print(f"An error occurred: {e}")
def enter_system():
wd = GSTORE['wd']
INFO('点击系统设置按钮')
enter_sys = WebDriverWait(wd, 3).until(
EC.element_to_be_clickable((By.XPATH, "//li[contains(text(),'系统设置')]"))
)
enter_sys.click()
sleep(2)
def enter_system():
wd = GSTORE['wd']
if wd is None:
......@@ -113,7 +114,6 @@ def enter_system():
except (TimeoutException, ElementNotInteractableException) as error:
logging.error(f"Error clicking system settings button: {error}")
def enter_manage():
wd = GSTORE['wd']
if wd is None:
......@@ -140,10 +140,208 @@ def enter_user_manage():
EC.element_to_be_clickable((By.XPATH, "//li[contains(text(),'用户管理')]"))
)
enter_user_mag.click()
logging.info('用户管理按钮已点击')
logging.info('打开用户管理页面')
except (TimeoutException, ElementNotInteractableException) as error:
logging.error(f"Error clicking manage button: {error}")
def enter_areagroup_manage():
wd = GSTORE['wd']
if wd is None:
raise ValueError("WebDriver is not initialized")
logging.info('点击区域分组按钮')
try:
enter_user_mag = WebDriverWait(wd, 10).until(
EC.element_to_be_clickable((By.XPATH, "//li[contains(text(),'区域分组')]"))
)
enter_user_mag.click()
logging.info('打开区域分组页面')
except (TimeoutException, ElementNotInteractableException) as error:
logging.error(f"Error clicking manage button: {error}")
def dingding_send_message(test_report_url, title, text, mobile):
"""
发送钉钉机器人消息
:param test_report_url: 测试报告链接
:param title: 消息标题
:param text: 消息内容
:param mobile: 需要@的手机号列表
"""
# 记录调用此函数的日志
logging.info("开始构建并发送钉钉机器人消息")
# 钉钉机器人的 Webhook URL 和密钥
webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=7fbf40798cad98b1b5db55ff844ba376b1816e80c5777e6f47ae1d9165dacbb4'
secret = 'SEC610498ed6261ae2df1d071d0880aaa70abf5e67efe47f75a809c1f2314e0dbd6'
# 生成时间戳
timestamp = str(round(time.time() * 1000))
# 生成签名
secret_enc = secret.encode('utf-8')
string_to_sign = f'{timestamp}\n{secret}'
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
# 构建最终的 Webhook URL
params = {
'access_token': webhook_url.split('=')[1],
'timestamp': timestamp,
'sign': sign
}
encoded_params = urllib.parse.urlencode(params)
final_webhook_url = f'https://oapi.dingtalk.com/robot/send?{encoded_params}'
# 记录最终的 Webhook URL
logging.info(f"钉钉机器人Webhook URL: {final_webhook_url}")
# 构建消息体
headers = {'Content-Type': 'application/json'}
message = {
'msgtype': 'link',
'link': {
'title': title,
'messageUrl': test_report_url,
'text': text
},
"at": {
"atMobiles": [mobile],
"isAtAll": False
}
}
try:
# 发送 POST 请求
response = requests.post(final_webhook_url, data=json.dumps(message), headers=headers)
# 检查响应状态码
if response.status_code == 200:
logging.info('消息发送成功!')
logging.info(f'响应内容: {response.text}')
else:
logging.error(f'消息发送失败,状态码: {response.status_code}')
logging.error(f'响应内容: {response.text}')
except requests.exceptions.RequestException as e:
logging.error(f'请求异常: {e}')
def get_latest_report_file(report_dir, base_url):
"""
获取指定目录下最新的HTML报告文件,并返回带有基础URL的完整路径。
:param report_dir: 报告文件所在的目录
:param base_url: 基础URL
:return: 最新的HTML报告文件的完整URL,如果没有找到则返回None
"""
# 记录调用此函数的日志
logging.info("开始调用get_latest_report_file函数获取报告文件")
# 确保报告目录存在
if not os.path.exists(report_dir):
logging.error(f"报告目录 {report_dir} 不存在。")
return None
# 获取指定目录下所有符合模式的HTML报告文件
report_files = glob.glob(os.path.join(report_dir, 'report_*.html'))
# 打印找到的文件列表
logging.debug(f"找到的报告文件: {report_files}")
# 如果没有找到报告文件,记录警告信息并返回None
if not report_files:
logging.warning("在指定目录中没有找到报告文件。")
return None
# 找到最新修改的报告文件
latest_file = max(report_files, key=os.path.getmtime)
# 获取最新报告文件的最后修改时间
last_modified_time = datetime.fromtimestamp(os.path.getmtime(latest_file)).strftime('%Y-%m-%d %H:%M:%S')
# 记录最新报告文件的信息
logging.info(f"最新报告文件: {latest_file}, 最后修改时间: {last_modified_time}")
# 将文件路径转换为相对于基础URL的相对路径
relative_path = os.path.relpath(latest_file, report_dir)
# 生成完整的URL
full_url = f"{base_url}/{relative_path}".replace("\\", "/")
# 返回完整的URL
return full_url
def get_reportfile_send_dingding(report_title, report_url_prefix):
try:
# 获取报告文件所在的目录
report_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..','log')
# 获取基础URL
base_url = report_url_prefix
# 获取最新的报告文件
latest_report = get_latest_report_file(report_dir, base_url)
# 如果找到了最新的报告文件,则发送报告链接到钉钉
if latest_report:
logging.info(f"最新报告文件URL: {latest_report}")
try:
# 记录调用钉钉消息通知函数的日志
logging.info("开始调用钉钉消息通知函数")
# 调用钉钉发送消息接口进行推送测试报告链接
dingding_send_message(latest_report, report_title, f"{report_title}_执行完成", "13169131600")
# 记录钉钉消息通知函数调用成功的日志
logging.info("钉钉消息通知函数调用成功")
except Exception as e:
# 记录钉钉消息通知函数调用失败的日志
logging.error(f"钉钉消息通知函数调用失败: {e}")
else:
# 记录没有找到报告文件的日志
logging.warning("没有找到报告文件以发送。")
except subprocess.CalledProcessError as e:
# 处理子进程调用失败的异常
logging.error(f"命令执行失败,返回码 {e.returncode}: {e.output}")
except OSError as e:
# 处理操作系统相关的异常
logging.error(f"发生操作系统错误: {e}")
finally:
# 无论是否成功,都记录测试结束的日志
logging.info("自动化测试完成。")
def read_csv_data(csv_file_path):
"""
读取CSV文件中的数据,并将其转换为一个包含字典的列表,每个字典代表一行测试用例数据。
参数:
csv_file_path (str): CSV文件的路径。
if __name__ == "__main__":
main()
返回:
list: 包含字典的列表,每个字典包含测试用例的名称和参数。
"""
# 打开CSV文件,使用只读模式,确保兼容性并处理编码
with open(csv_file_path, mode='r', newline='', encoding='utf-8') as file:
# 创建CSV阅读器
reader = csv.reader(file)
# 读取表头,为后续数据解析做准备
headers = next(reader)
ddt_cases = []
# 遍历CSV文件中的每一行数据
for row in reader:
# 将每一行数据转换为字典,其中包含测试用例的名称和参数
case = {
'name': row[0],
'para': row[1:]
}
# 将转换后的测试用例数据添加到列表中
ddt_cases.append(case)
# 日志记录:CSV文件已读取
INFO("CSV文件已读取")
# 返回包含所有测试用例数据的列表
return ddt_cases
\ No newline at end of file
username,password,captcha
ywys@pgy,ub@1234567,csba
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论