渗透测试
概述 系统安全测试,通过模拟攻击来识别、利用和记录应用程序、网络和基础设施中的漏洞。
何时使用
- 预生产安全验证
- 年度安全评估
- 合规要求(PCI-DSS, ISO 27001)
- 事件后安全审查
- 第三方安全审计
- 红队演习
实施示例
- 自动化渗透测试框架
# pentest_framework.py
import requests
import socket
import subprocess
import json
from typing import List, Dict
from dataclasses import dataclass, asdict
from datetime import datetime
@dataclass
class Finding:
severity: str
category: str
target: str
vulnerability: str
evidence: str
remediation: str
cvss_score: float
class PenetrationTester:
def __init__(self, target: str):
self.target = target
self.findings: List[Finding] = []
def test_sql_injection(self, url: str) -> None:
"""测试SQL注入漏洞"""
print(f"在 {url} 上测试SQL注入")
payloads = [
"' OR '1'='1",
"'; DROP TABLE users--",
"' UNION SELECT NULL, NULL, NULL--",
"1' AND 1=1--",
"admin'--"
]
for payload in payloads:
try:
response = requests.get(
url,
params={'id': payload},
timeout=5
)
# 检查SQL错误
sql_errors = [
'mysql_fetch_array',
'SQLServer JDBC Driver',
'ORA-01756',
'PostgreSQL',
'sqlite3.OperationalError'
]
for error in sql_errors:
if error in response.text:
self.findings.append(Finding(
severity='critical',
category='SQL Injection',
target=url,
vulnerability=f'SQL注入检测到payload: {payload}',
evidence=f'错误信息: {error}',
remediation='使用参数化查询或预编译语句',
cvss_score=9.8
))
break
except Exception as e:
print(f"在测试 {url} 时出错: {e}")
def test_xss(self, url: str) -> None:
"""测试跨站脚本攻击漏洞"""
print(f"在 {url} 上测试XSS")
payloads = [
"<script>alert('XSS')</script>",
"<img src=x onerror=alert('XSS')>",
"javascript:alert('XSS')",
"<svg onload=alert('XSS')>",
"'-alert('XSS')-'"
]
for payload in payloads:
try:
response = requests.get(
url,
params={'q': payload},
timeout=5
)
if payload in response.text:
self.findings.append(Finding(
severity='high',
category='Cross-Site Scripting',
target=url,
vulnerability=f'反射型XSS检测到payload: {payload}',
evidence='响应中未对payload进行清理即反射',
remediation='实施输出编码和内容安全策略',
cvss_score=7.3
))
break
except Exception as e:
print(f"在测试 {url} 时出错: {e}")
def test_authentication(self, login_url: str) -> None:
"""测试认证机制"""
print(f"在 {login_url} 上测试认证")
# 测试默认凭证
default_creds = [
('admin', 'admin'),
('admin', 'password'),
('root', 'root'),
('administrator', 'administrator')
]
for username, password in default_creds:
try:
response = requests.post(
login_url,
data={'username': username, 'password': password},
timeout=5
)
if response.status_code == 200 and 'dashboard' in response.text.lower():
self.findings.append(Finding(
severity='critical',
category='Weak Authentication',
target=login_url,
vulnerability=f'默认凭证被接受: {username}/{password}',
evidence='使用默认凭证成功认证',
remediation='强制执行强密码策略并移除默认账户',
cvss_score=9.1
))
except Exception as e:
print(f"在测试凭证时出错: {e}")
def test_security_headers(self, url: str) -> None:
"""测试缺失的安全头"""
print(f"在 {url} 上测试安全头")
try:
response = requests.get(url, timeout=5)
关键头 = {
'Strict-Transport-Security': '未实施HSTS',
'X-Frame-Options': '缺少点击劫持保护',
'X-Content-Type-Options': '缺少MIME嗅探防护',
'Content-Security-Policy': '未实施CSP',
'X-XSS-Protection': '缺少XSS保护头'
}
for header, description in 关键头.items():
if header not in response.headers:
self.findings.append(Finding(
severity='medium',
category='Missing Security Header',
target=url,
vulnerability=f'缺少头: {header}',
evidence=description,
remediation=f'在所有响应中添加 {header} 头',
cvss_score=5.3
))
except Exception as e:
print(f"在测试头时出错: {e}")
def test_directory_traversal(self, url: str) -> None:
"""测试目录遍历漏洞"""
print(f"在 {url} 上测试目录遍历")
payloads = [
'../../../etc/passwd',
'..\\..\\..\\windows\\system32\\drivers\\etc\\hosts',
'....//....//....//etc/passwd',
'%2e%2e%2f%2e%2e%2f%2e%2e%2fetc%2fpasswd'
]
for payload in payloads:
try:
response = requests.get(
url,
params={'file': payload},
timeout=5
)
# 检查Unix passwd文件
if 'root:' in response.text or 'daemon:' in response.text:
self.findings.append(Finding(
severity='critical',
category='Directory Traversal',
target=url,
vulnerability=f'路径遍历检测到payload: {payload}',
evidence='系统文件内容暴露',
remediation='验证和清理文件路径,使用白名单方法',
cvss_score=8.6
))
break
except Exception as e:
print(f"在测试遍历时出错: {e}")
def test_ssl_tls(self) -> None:
"""测试SSL/TLS配置"""
print(f"在 {self.target} 上测试SSL/TLS")
try:
result = subprocess.run(
['testssl.sh', '--jsonfile', 'ssl-results.json', self.target],
capture_output=True,
text=True,
timeout=60
)
# 解析SSL测试结果
# 这是一个简化的检查
弱协议 = ['SSLv2', 'SSLv3', 'TLSv1.0']
for protocol in 弱协议:
self.findings.append(Finding(
severity='high',
category='Weak SSL/TLS',
target=self.target,
vulnerability=f'启用了弱协议: {protocol}',
evidence='过时的SSL/TLS协议支持',
remediation='禁用弱协议,强制执行TLS 1.2+',
cvss_score=7.5
))
except Exception as e:
print(f"SSL测试错误: {e}")
def run_full_pentest(self, target_urls: List[str]) -> Dict:
"""执行全面的渗透测试"""
for url in target_urls:
self.test_sql_injection(url)
self.test_xss(url)
self.test_security_headers(url)
self.test_directory_traversal(url)
self.test_ssl_tls()
return self.generate_report()
def generate_report(self) -> Dict:
"""生成全面的渗透测试报告"""
summary = {
'critical': 0,
'high': 0,
'medium': 0,
'low': 0
}
for finding in self.findings:
if finding.severity in summary:
summary[finding.severity] += 1
report = {
'timestamp': datetime.now().isoformat(),
'target': self.target,
'total_findings': len(self.findings),
'summary': summary,
'findings': [asdict(f) for f in self.findings],
'risk_score': self._calculate_risk_score(),
'recommendations': self._generate_recommendations()
}
with open('pentest-report.json', 'w') as f:
json.dump(report, f, indent=2)
return report
def _calculate_risk_score(self) -> float:
"""计算总体风险评分"""
if not self.findings:
return 0.0
总cvss = sum(f.cvss_score for f in self.findings)
return round(总cvss / len(self.findings), 2)
def _generate_recommendations(self) -> List[str]:
"""生成优先级建议"""
recommendations = []
categories = {}
for finding in self.findings:
if finding.category not in categories:
categories[finding.category] = []
categories[finding.category].append(finding)
for category, findings in sorted(
categories.items(),
key=lambda x: len(x[1]),
reverse=True
):
recommendations.append(
f"解决 {len(findings)} {category} 漏洞"
)
return recommendations[:5]
# 使用方法
if __name__ == '__main__':
tester = PenetrationTester('https://example.com')
target_urls = [
'https://example.com/api/users',
'https://example.com/search',
'https://example.com/download'
]
report = tester.run_full_pentest(target_urls)
print("
=== 渗透测试报告 ===")
print(f"目标: {report['target']}")
print(f"总发现: {report['total_findings']}")
print(f"风险评分: {report['risk_score']}")
print(f"
按严重程度排列的发现:")
print(f" 严重: {report['summary']['critical']}")
print(f" 高: {report['summary']['high']}")
print(f" 中: {report['summary']['medium']}")
print(f" 低: {report['summary']['low']}")
2. Burp套件自动化脚本
// burp-automation.js - Node.js Burp套件集成
const axios = require('axios');
const fs = require('fs').promises;
class BurpSuiteAutomation {
constructor(burpApiUrl = 'http://127.0.0.1:1337') {
this.apiUrl = burpApiUrl;
this.taskId = null;
}
async startScan(targetUrl) {
console.log(`开始对 ${targetUrl} 进行Burp扫描`);
const scanConfig = {
urls: [targetUrl],
scan_configurations: [
{
name: 'Crawl and Audit - Lightweight',
type: 'NamedConfiguration'
}
]
};
try {
const response = await axios.post(
`${this.apiUrl}/v0.1/scan`,
scanConfig
);
this.taskId = response.data.task_id;
console.log(`扫描开始,任务ID: ${this.taskId}`);
return this.taskId;
} catch (error) {
console.error('扫描启动失败:', error.message);
throw error;
}
}
async getScanStatus() {
if (!this.taskId) {
throw new Error('没有活跃的扫描任务');
}
const response = await axios.get(
`${this.apiUrl}/v0.1/scan/${this.taskId}`
);
return {
taskId: this.taskId,
status: response.data.scan_status,
metrics: response.data.scan_metrics
};
}
async waitForCompletion() {
console.log('等待扫描完成...');
while (true) {
const status = await this.getScanStatus();
console.log(`进度: ${status.metrics.crawl_requests_made} 请求`);
if (status.status === 'succeeded') {
console.log('扫描成功完成');
break;
} else if (status.status === 'failed') {
throw new Error('扫描失败');
}
await new Promise(resolve => setTimeout(resolve, 10000));
}
}
async getIssues() {
if (!this.taskId) {
throw new Error('没有活跃的扫描任务');
}
const response = await axios.get(
`${this.apiUrl}/v0.1/scan/${this.taskId}/issues`
);
return response.data.issues;
}
async generateReport() {
const issues = await this.getIssues();
const report = {
summary: {
high: 0,
medium: 0,
low: 0,
info: 0
},
issues: []
};
for (const issue of issues) {
report.summary[issue.severity.toLowerCase()]++;
report.issues.push({
severity: issue.severity,
confidence: issue.confidence,
name: issue.name,
path: issue.path,
description: issue.description,
remediation: issue.remediation
});
}
await fs.writeFile(
'burp-report.json',
JSON.stringify(report, null, 2)
);
return report;
}
}
// 使用方法
async function runBurpScan() {
const burp = new BurpSuiteAutomation();
await burp.startScan('https://example.com');
await burp.waitForCompletion();
const report = await burp.generateReport();
console.log('
=== Burp套件扫描结果 ===');
console.log(`高: ${report.summary.high}`);
console.log(`中: ${report.summary.medium}`);
console.log(`低: ${report.summary.low}`);
}
runBurpScan().catch(console.error);
最佳实践
✅ 执行
- 获取书面授权
- 定义清晰范围
- 使用受控环境
- 记录所有发现
- 遵循负责任的披露
- 提供补救指导
- 在修补后验证修复
- 维护保管链
❌ 不要
- 未经批准测试生产环境
- 导致服务中断
- 外泄敏感数据
- 公开分享发现
- 超出授权范围
- 使用破坏性负载
渗透测试阶段
- 侦察: 信息收集
- 扫描: 漏洞识别
- 利用: 概念验证
- 后利用: 评估影响
- 报告: 记录发现
- 补救: 协助修复
常用工具
- Burp套件: Web应用测试
- OWASP ZAP: 免费安全扫描器
- Metasploit: 利用框架
- Nmap: 网络扫描
- SQLMap: SQL注入测试
- testssl.sh: SSL/TLS测试
- Nikto: Web服务器扫描器