import pandas as pd import csv import urllib import glob import subprocess import requests import json import hmac import hashlib import base64 import time import logging from hytest import * import pandas as pd from selenium import webdriver from datetime import datetime from urllib.parse import urlencode from selenium.webdriver.common.by import By from selenium.webdriver.edge.options import Options from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common import TimeoutException,ElementNotInteractableException from time import sleep logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') def open_browser(): INFO('打开浏览器') # wd = webdriver.Chrome() edge_options = Options() edge_options.add_argument('--ignore-certificate-errors') edge_options.add_argument('--disable-blink-features=AutomationControlled') edge_options.add_argument('--allow-insecure-localhost') wd = webdriver.Edge(options=edge_options) GSTORE['wd'] = wd # wd.get('https://rms.ubainsyun.com/#/login') wd.get('https://192.168.5.218:8443/#/login') wd.maximize_window() wd.implicitly_wait(10) def user_login(username, password, captcha): wd = GSTORE['wd'] INFO(f'输入登录账号: {username}') username_input = WebDriverWait(wd, 3).until( EC.presence_of_element_located((By.XPATH, "//input[@placeholder='请输入登录账号']")) ) username_input.clear() username_input.send_keys(username) INFO(f'输入登录密码: {password}') password_input = WebDriverWait(wd, 3).until( EC.presence_of_element_located((By.XPATH, "//input[@placeholder='请输入登录密码']")) ) password_input.clear() password_input.send_keys(password) INFO(f'输入验证码:{captcha}') captcha_input = WebDriverWait(wd, 3).until( EC.presence_of_element_located((By.XPATH, "//input[@placeholder='请输入验证码(区分大小写)']")) ) captcha_input.clear() captcha_input.send_keys(captcha) INFO('点击登录按钮') login_button = WebDriverWait(wd, 3).until( EC.element_to_be_clickable((By.XPATH, "//div[@class='loginButton']")) ) login_button.click() sleep(2) def validate_input(username, password, captcha): if not isinstance(username, str) or not isinstance(password, str) or not isinstance(captcha, str): raise ValueError("Invalid input type") if len(username) < 1 or len(password) < 1 or len(captcha) < 1: raise ValueError("Input cannot be empty") def run_login_tests(df): # 遍历每一行 for index, row in df.iterrows(): username = row.get('username', None) password = row.get('password', None) captcha = row.get('captcha', None) if username and password and captcha: try: validate_input(username, password, captcha) user_login(username, password, captcha) except ValueError as e: print(f"Invalid input at row {index}: {e}") def main(): # 获取当前脚本的目录 current_dir = os.path.dirname(os.path.abspath(__file__)) csv_file_path = os.path.join(current_dir, '../testdata/01登录模块/登录信息.csv') try: if not os.path.exists(csv_file_path): print(f"File not found: {csv_file_path}") return df = pd.read_csv(csv_file_path, encoding='utf-8') run_login_tests(df) except Exception as e: print(f"An error occurred: {e}") def enter_system(): wd = GSTORE['wd'] if wd is None: raise ValueError("WebDriver is not initialized") logging.info('点击系统设置按钮') try: enter_sys = WebDriverWait(wd, 10).until( EC.element_to_be_clickable((By.XPATH, "//li[contains(text(),'系统设置')]")) ) enter_sys.click() logging.info('系统设置按钮已点击') except (TimeoutException, ElementNotInteractableException) as error: logging.error(f"Error clicking system settings button: {error}") def enter_manage(): wd = GSTORE['wd'] if wd is None: raise ValueError("WebDriver is not initialized") logging.info('点击管理按钮') try: enter_mag = WebDriverWait(wd, 10).until( EC.element_to_be_clickable((By.XPATH, "//div[@class='el-submenu__title']//span[contains(text(),'管理')]")) ) enter_mag.click() logging.info('管理按钮已点击') except (TimeoutException, ElementNotInteractableException) as error: logging.error(f"Error clicking manage button: {error}") def enter_user_manage(): wd = GSTORE['wd'] if wd is None: raise ValueError("WebDriver is not initialized") logging.info('点击用户管理按钮') try: enter_user_mag = WebDriverWait(wd, 10).until( EC.element_to_be_clickable((By.XPATH, "//li[contains(text(),'用户管理')]")) ) enter_user_mag.click() logging.info('打开用户管理页面') except (TimeoutException, ElementNotInteractableException) as error: logging.error(f"Error clicking manage button: {error}") def enter_areagroup_manage(): wd = GSTORE['wd'] if wd is None: raise ValueError("WebDriver is not initialized") logging.info('点击区域分组按钮') try: enter_areagroup_mag = WebDriverWait(wd, 10).until( EC.element_to_be_clickable((By.XPATH, "//li[contains(text(),'区域分组')]")) ) enter_areagroup_mag.click() logging.info('打开区域分组页面') except (TimeoutException, ElementNotInteractableException) as error: logging.error(f"Error clicking manage button: {error}") def enter_areatype_manage(): wd = GSTORE['wd'] if wd is None: raise ValueError("WebDriver is not initialized") logging.info('点击区域类型按钮') try: enter_areatype_mag = WebDriverWait(wd, 10).until( EC.element_to_be_clickable((By.XPATH, "//li[contains(text(),'区域类型')]")) ) enter_areatype_mag.click() logging.info('打开区域类型页面') except (TimeoutException, ElementNotInteractableException) as error: logging.error(f"Error clicking manage button: {error}") def enter_areafuntion_manage(): wd = GSTORE['wd'] if wd is None: raise ValueError("WebDriver is not initialized") logging.info('点击区域功能按钮') try: enter_areafuntion_mag = WebDriverWait(wd, 10).until( EC.element_to_be_clickable((By.XPATH, "//li[contains(text(),'区域功能')]")) ) enter_areafuntion_mag.click() logging.info('打开区域功能页面') except (TimeoutException, ElementNotInteractableException) as error: logging.error(f"Error clicking manage button: {error}") def dingding_send_message(test_report_url, title, text, mobile): """ 发送钉钉机器人消息 :param test_report_url: 测试报告链接 :param title: 消息标题 :param text: 消息内容 :param mobile: 需要@的手机号列表 """ # 记录调用此函数的日志 logging.info("开始构建并发送钉钉机器人消息") # 钉钉机器人的 Webhook URL 和密钥 webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=7fbf40798cad98b1b5db55ff844ba376b1816e80c5777e6f47ae1d9165dacbb4' secret = 'SEC610498ed6261ae2df1d071d0880aaa70abf5e67efe47f75a809c1f2314e0dbd6' # 生成时间戳 timestamp = str(round(time.time() * 1000)) # 生成签名 secret_enc = secret.encode('utf-8') string_to_sign = f'{timestamp}\n{secret}' string_to_sign_enc = string_to_sign.encode('utf-8') hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest() sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) # 构建最终的 Webhook URL params = { 'access_token': webhook_url.split('=')[1], 'timestamp': timestamp, 'sign': sign } encoded_params = urllib.parse.urlencode(params) final_webhook_url = f'https://oapi.dingtalk.com/robot/send?{encoded_params}' # 记录最终的 Webhook URL logging.info(f"钉钉机器人Webhook URL: {final_webhook_url}") # 构建消息体 headers = {'Content-Type': 'application/json'} message = { 'msgtype': 'link', 'link': { 'title': title, 'messageUrl': test_report_url, 'text': text }, "at": { "atMobiles": [mobile], "isAtAll": False } } try: # 发送 POST 请求 response = requests.post(final_webhook_url, data=json.dumps(message), headers=headers) # 检查响应状态码 if response.status_code == 200: logging.info('消息发送成功!') logging.info(f'响应内容: {response.text}') else: logging.error(f'消息发送失败,状态码: {response.status_code}') logging.error(f'响应内容: {response.text}') except requests.exceptions.RequestException as e: logging.error(f'请求异常: {e}') def get_latest_report_file(report_dir, base_url): """ 获取指定目录下最新的HTML报告文件,并返回带有基础URL的完整路径。 :param report_dir: 报告文件所在的目录 :param base_url: 基础URL :return: 最新的HTML报告文件的完整URL,如果没有找到则返回None """ # 记录调用此函数的日志 logging.info("开始调用get_latest_report_file函数获取报告文件") # 确保报告目录存在 if not os.path.exists(report_dir): logging.error(f"报告目录 {report_dir} 不存在。") return None # 获取指定目录下所有符合模式的HTML报告文件 report_files = glob.glob(os.path.join(report_dir, 'report_*.html')) # 打印找到的文件列表 logging.debug(f"找到的报告文件: {report_files}") # 如果没有找到报告文件,记录警告信息并返回None if not report_files: logging.warning("在指定目录中没有找到报告文件。") return None # 找到最新修改的报告文件 latest_file = max(report_files, key=os.path.getmtime) # 获取最新报告文件的最后修改时间 last_modified_time = datetime.fromtimestamp(os.path.getmtime(latest_file)).strftime('%Y-%m-%d %H:%M:%S') # 记录最新报告文件的信息 logging.info(f"最新文件: {latest_file}, 最后修改时间: {last_modified_time}") # 将文件路径转换为相对于基础URL的相对路径 relative_path = os.path.relpath(latest_file, report_dir) # 生成完整的URL full_url = f"{base_url}/{relative_path}".replace("\\", "/") # 返回完整的URL return full_url def get_reportfile_send_dingding(report_title, report_url_prefix): try: # 获取文件所在的目录 report_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..','log') # 获取基础URL base_url = report_url_prefix # 获取最新的报告文件 latest_report = get_latest_report_file(report_dir, base_url) # 如果找到了最新的报告文件,则发送报告链接到钉钉 if latest_report: logging.info(f"最新报告文件URL: {latest_report}") try: # 记录调用钉钉消息通知函数的日志 logging.info("开始调用钉钉消息通知函数") # 调用钉钉发送消息接口进行推送测试报告链接 dingding_send_message(latest_report, report_title, f"{report_title}_执行完成", "13169131600") # 记录钉钉消息通知函数调用成功的日志 logging.info("钉钉消息通知函数调用成功") except Exception as e: # 记录钉钉消息通知函数调用失败的日志 logging.error(f"钉钉消息通知函数调用失败: {e}") else: # 记录没有找到报告文件的日志 logging.warning("没有找到报告文件以发送。") except subprocess.CalledProcessError as e: # 处理子进程调用失败的异常 logging.error(f"命令执行失败,返回码 {e.returncode}: {e.output}") except OSError as e: # 处理操作系统相关的异常 logging.error(f"发生操作系统错误: {e}") finally: # 无论是否成功,都记录测试结束的日志 logging.info("自动化测试完成。") def read_csv_data(csv_file_path): """ 读取CSV文件中的数据,并将其转换为一个包含字典的列表,每个字典代表一行测试用例数据。 参数: csv_file_path (str): CSV文件的路径。 返回: list: 包含字典的列表,每个字典包含测试用例的名称和参数。 """ # 打开CSV文件,使用只读模式,确保兼容性并处理编码 with open(csv_file_path, mode='r', newline='', encoding='utf-8') as file: # 创建CSV阅读器 reader = csv.reader(file) # 读取表头,为后续数据解析做准备 headers = next(reader) ddt_cases = [] # 遍历CSV文件中的每一行数据 for row in reader: # 将每一行数据转换为字典,其中包含测试用例的名称和参数 case = { 'name': row[0], 'para': row[1:] } # 将转换后的测试用例数据添加到列表中 ddt_cases.append(case) # 日志记录:CSV文件已读取 INFO("CSV文件已读取") # 返回包含所有测试用例数据的列表 return ddt_cases