penetration-testing penetration-testing

渗透测试是一种系统化的安全测试方法,通过模拟攻击来识别、利用和记录应用程序、网络和基础设施中的漏洞,用于评估应用安全态势和识别可被利用的漏洞。

渗透测试 0 次安装 0 次浏览 更新于 3/4/2026

渗透测试

概述 系统安全测试,通过模拟攻击来识别、利用和记录应用程序、网络和基础设施中的漏洞。

何时使用

  • 预生产安全验证
  • 年度安全评估
  • 合规要求(PCI-DSS, ISO 27001)
  • 事件后安全审查
  • 第三方安全审计
  • 红队演习

实施示例

  1. 自动化渗透测试框架
# pentest_framework.py
import requests
import socket
import subprocess
import json
from typing import List, Dict
from dataclasses import dataclass, asdict
from datetime import datetime

@dataclass
class Finding:
    severity: str
    category: str
    target: str
    vulnerability: str
    evidence: str
    remediation: str
    cvss_score: float

class PenetrationTester:
    def __init__(self, target: str):
        self.target = target
        self.findings: List[Finding] = []

    def test_sql_injection(self, url: str) -> None:
        """测试SQL注入漏洞"""
        print(f"在 {url} 上测试SQL注入")

        payloads = [
            "' OR '1'='1",
            "'; DROP TABLE users--",
            "' UNION SELECT NULL, NULL, NULL--",
            "1' AND 1=1--",
            "admin'--"
        ]

        for payload in payloads:
            try:
                response = requests.get(
                    url,
                    params={'id': payload},
                    timeout=5
                )

                # 检查SQL错误
                sql_errors = [
                    'mysql_fetch_array',
                    'SQLServer JDBC Driver',
                    'ORA-01756',
                    'PostgreSQL',
                    'sqlite3.OperationalError'
                ]

                for error in sql_errors:
                    if error in response.text:
                        self.findings.append(Finding(
                            severity='critical',
                            category='SQL Injection',
                            target=url,
                            vulnerability=f'SQL注入检测到payload: {payload}',
                            evidence=f'错误信息: {error}',
                            remediation='使用参数化查询或预编译语句',
                            cvss_score=9.8
                        ))
                        break

            except Exception as e:
                print(f"在测试 {url} 时出错: {e}")

    def test_xss(self, url: str) -> None:
        """测试跨站脚本攻击漏洞"""
        print(f"在 {url} 上测试XSS")

        payloads = [
            "<script>alert('XSS')</script>",
            "<img src=x onerror=alert('XSS')>",
            "javascript:alert('XSS')",
            "<svg onload=alert('XSS')>",
            "'-alert('XSS')-'"
        ]

        for payload in payloads:
            try:
                response = requests.get(
                    url,
                    params={'q': payload},
                    timeout=5
                )

                if payload in response.text:
                    self.findings.append(Finding(
                        severity='high',
                        category='Cross-Site Scripting',
                        target=url,
                        vulnerability=f'反射型XSS检测到payload: {payload}',
                        evidence='响应中未对payload进行清理即反射',
                        remediation='实施输出编码和内容安全策略',
                        cvss_score=7.3
                    ))
                    break

            except Exception as e:
                print(f"在测试 {url} 时出错: {e}")

    def test_authentication(self, login_url: str) -> None:
        """测试认证机制"""
        print(f"在 {login_url} 上测试认证")

        # 测试默认凭证
        default_creds = [
            ('admin', 'admin'),
            ('admin', 'password'),
            ('root', 'root'),
            ('administrator', 'administrator')
        ]

        for username, password in default_creds:
            try:
                response = requests.post(
                    login_url,
                    data={'username': username, 'password': password},
                    timeout=5
                )

                if response.status_code == 200 and 'dashboard' in response.text.lower():
                    self.findings.append(Finding(
                        severity='critical',
                        category='Weak Authentication',
                        target=login_url,
                        vulnerability=f'默认凭证被接受: {username}/{password}',
                        evidence='使用默认凭证成功认证',
                        remediation='强制执行强密码策略并移除默认账户',
                        cvss_score=9.1
                    ))

            except Exception as e:
                print(f"在测试凭证时出错: {e}")

    def test_security_headers(self, url: str) -> None:
        """测试缺失的安全头"""
        print(f"在 {url} 上测试安全头")

        try:
            response = requests.get(url, timeout=5)

            关键头 = {
                'Strict-Transport-Security': '未实施HSTS',
                'X-Frame-Options': '缺少点击劫持保护',
                'X-Content-Type-Options': '缺少MIME嗅探防护',
                'Content-Security-Policy': '未实施CSP',
                'X-XSS-Protection': '缺少XSS保护头'
            }

            for header, description in 关键头.items():
                if header not in response.headers:
                    self.findings.append(Finding(
                        severity='medium',
                        category='Missing Security Header',
                        target=url,
                        vulnerability=f'缺少头: {header}',
                        evidence=description,
                        remediation=f'在所有响应中添加 {header} 头',
                        cvss_score=5.3
                    ))

        except Exception as e:
            print(f"在测试头时出错: {e}")

    def test_directory_traversal(self, url: str) -> None:
        """测试目录遍历漏洞"""
        print(f"在 {url} 上测试目录遍历")

        payloads = [
            '../../../etc/passwd',
            '..\\..\\..\\windows\\system32\\drivers\\etc\\hosts',
            '....//....//....//etc/passwd',
            '%2e%2e%2f%2e%2e%2f%2e%2e%2fetc%2fpasswd'
        ]

        for payload in payloads:
            try:
                response = requests.get(
                    url,
                    params={'file': payload},
                    timeout=5
                )

                # 检查Unix passwd文件
                if 'root:' in response.text or 'daemon:' in response.text:
                    self.findings.append(Finding(
                        severity='critical',
                        category='Directory Traversal',
                        target=url,
                        vulnerability=f'路径遍历检测到payload: {payload}',
                        evidence='系统文件内容暴露',
                        remediation='验证和清理文件路径,使用白名单方法',
                        cvss_score=8.6
                    ))
                    break

            except Exception as e:
                print(f"在测试遍历时出错: {e}")

    def test_ssl_tls(self) -> None:
        """测试SSL/TLS配置"""
        print(f"在 {self.target} 上测试SSL/TLS")

        try:
            result = subprocess.run(
                ['testssl.sh', '--jsonfile', 'ssl-results.json', self.target],
                capture_output=True,
                text=True,
                timeout=60
            )

            # 解析SSL测试结果
            # 这是一个简化的检查
            弱协议 = ['SSLv2', 'SSLv3', 'TLSv1.0']
            for protocol in 弱协议:
                self.findings.append(Finding(
                    severity='high',
                    category='Weak SSL/TLS',
                    target=self.target,
                    vulnerability=f'启用了弱协议: {protocol}',
                    evidence='过时的SSL/TLS协议支持',
                    remediation='禁用弱协议,强制执行TLS 1.2+',
                    cvss_score=7.5
                ))

        except Exception as e:
            print(f"SSL测试错误: {e}")

    def run_full_pentest(self, target_urls: List[str]) -> Dict:
        """执行全面的渗透测试"""
        for url in target_urls:
            self.test_sql_injection(url)
            self.test_xss(url)
            self.test_security_headers(url)
            self.test_directory_traversal(url)

        self.test_ssl_tls()

        return self.generate_report()

    def generate_report(self) -> Dict:
        """生成全面的渗透测试报告"""
        summary = {
            'critical': 0,
            'high': 0,
            'medium': 0,
            'low': 0
        }

        for finding in self.findings:
            if finding.severity in summary:
                summary[finding.severity] += 1

        report = {
            'timestamp': datetime.now().isoformat(),
            'target': self.target,
            'total_findings': len(self.findings),
            'summary': summary,
            'findings': [asdict(f) for f in self.findings],
            'risk_score': self._calculate_risk_score(),
            'recommendations': self._generate_recommendations()
        }

        with open('pentest-report.json', 'w') as f:
            json.dump(report, f, indent=2)

        return report

    def _calculate_risk_score(self) -> float:
        """计算总体风险评分"""
        if not self.findings:
            return 0.0

        总cvss = sum(f.cvss_score for f in self.findings)
        return round(总cvss / len(self.findings), 2)

    def _generate_recommendations(self) -> List[str]:
        """生成优先级建议"""
        recommendations = []

        categories = {}
        for finding in self.findings:
            if finding.category not in categories:
                categories[finding.category] = []
            categories[finding.category].append(finding)

        for category, findings in sorted(
            categories.items(),
            key=lambda x: len(x[1]),
            reverse=True
        ):
            recommendations.append(
                f"解决 {len(findings)} {category} 漏洞"
            )

        return recommendations[:5]

# 使用方法
if __name__ == '__main__':
    tester = PenetrationTester('https://example.com')

    target_urls = [
        'https://example.com/api/users',
        'https://example.com/search',
        'https://example.com/download'
    ]

    report = tester.run_full_pentest(target_urls)

    print("
=== 渗透测试报告 ===")
    print(f"目标: {report['target']}")
    print(f"总发现: {report['total_findings']}")
    print(f"风险评分: {report['risk_score']}")
    print(f"
按严重程度排列的发现:")
    print(f"  严重: {report['summary']['critical']}")
    print(f"  高: {report['summary']['high']}")
    print(f"  中: {report['summary']['medium']}")
    print(f"  低: {report['summary']['low']}")

2. Burp套件自动化脚本

// burp-automation.js - Node.js Burp套件集成
const axios = require('axios');
const fs = require('fs').promises;

class BurpSuiteAutomation {
  constructor(burpApiUrl = 'http://127.0.0.1:1337') {
    this.apiUrl = burpApiUrl;
    this.taskId = null;
  }

  async startScan(targetUrl) {
    console.log(`开始对 ${targetUrl} 进行Burp扫描`);

    const scanConfig = {
      urls: [targetUrl],
      scan_configurations: [
        {
          name: 'Crawl and Audit - Lightweight',
          type: 'NamedConfiguration'
        }
      ]
    };

    try {
      const response = await axios.post(
        `${this.apiUrl}/v0.1/scan`,
        scanConfig
      );

      this.taskId = response.data.task_id;
      console.log(`扫描开始,任务ID: ${this.taskId}`);

      return this.taskId;
    } catch (error) {
      console.error('扫描启动失败:', error.message);
      throw error;
    }
  }

  async getScanStatus() {
    if (!this.taskId) {
      throw new Error('没有活跃的扫描任务');
    }

    const response = await axios.get(
      `${this.apiUrl}/v0.1/scan/${this.taskId}`
    );

    return {
      taskId: this.taskId,
      status: response.data.scan_status,
      metrics: response.data.scan_metrics
    };
  }

  async waitForCompletion() {
    console.log('等待扫描完成...');

    while (true) {
      const status = await this.getScanStatus();

      console.log(`进度: ${status.metrics.crawl_requests_made} 请求`);

      if (status.status === 'succeeded') {
        console.log('扫描成功完成');
        break;
      } else if (status.status === 'failed') {
        throw new Error('扫描失败');
      }

      await new Promise(resolve => setTimeout(resolve, 10000));
    }
  }

  async getIssues() {
    if (!this.taskId) {
      throw new Error('没有活跃的扫描任务');
    }

    const response = await axios.get(
      `${this.apiUrl}/v0.1/scan/${this.taskId}/issues`
    );

    return response.data.issues;
  }

  async generateReport() {
    const issues = await this.getIssues();

    const report = {
      summary: {
        high: 0,
        medium: 0,
        low: 0,
        info: 0
      },
      issues: []
    };

    for (const issue of issues) {
      report.summary[issue.severity.toLowerCase()]++;

      report.issues.push({
        severity: issue.severity,
        confidence: issue.confidence,
        name: issue.name,
        path: issue.path,
        description: issue.description,
        remediation: issue.remediation
      });
    }

    await fs.writeFile(
      'burp-report.json',
      JSON.stringify(report, null, 2)
    );

    return report;
  }
}

// 使用方法
async function runBurpScan() {
  const burp = new BurpSuiteAutomation();

  await burp.startScan('https://example.com');
  await burp.waitForCompletion();

  const report = await burp.generateReport();

  console.log('
=== Burp套件扫描结果 ===');
  console.log(`高: ${report.summary.high}`);
  console.log(`中: ${report.summary.medium}`);
  console.log(`低: ${report.summary.low}`);
}

runBurpScan().catch(console.error);

最佳实践

✅ 执行

  • 获取书面授权
  • 定义清晰范围
  • 使用受控环境
  • 记录所有发现
  • 遵循负责任的披露
  • 提供补救指导
  • 在修补后验证修复
  • 维护保管链

❌ 不要

  • 未经批准测试生产环境
  • 导致服务中断
  • 外泄敏感数据
  • 公开分享发现
  • 超出授权范围
  • 使用破坏性负载

渗透测试阶段

  1. 侦察: 信息收集
  2. 扫描: 漏洞识别
  3. 利用: 概念验证
  4. 后利用: 评估影响
  5. 报告: 记录发现
  6. 补救: 协助修复

常用工具

  • Burp套件: Web应用测试
  • OWASP ZAP: 免费安全扫描器
  • Metasploit: 利用框架
  • Nmap: 网络扫描
  • SQLMap: SQL注入测试
  • testssl.sh: SSL/TLS测试
  • Nikto: Web服务器扫描器

资源