安全审计技能Skill security-auditing

安全审计技能提供全面的安全审计和合规解决方案,包括防篡改日志记录、SIEM系统集成、自动化漏洞评估和合规报告生成。适用于满足GDPR、HIPAA、PCI-DSS等法规要求,确保系统安全并防止数据泄露。关键词:安全审计、合规、日志完整性、SIEM、漏洞扫描、防篡改日志、安全监控。

安全审计 0 次安装 0 次浏览 更新于 3/15/2026

安全审计技能


name: security-auditing version: 1.0.0 domain: security/compliance risk_level: HIGH languages: [python, go, typescript] frameworks: [structlog, opentelemetry, falco] requires_security_review: true compliance: [GDPR, HIPAA, PCI-DSS, SOC2, ISO27001] last_updated: 2025-01-15

强制阅读协议:在实施审计日志记录之前,阅读 references/advanced-patterns.md 以了解防篡改模式,并阅读 references/threat-model.md 以了解日志完整性攻击。

1. 概述

1.1 目的和范围

此技能提供安全审计和合规能力:

  • 防篡改日志记录:加密签名的审计跟踪
  • SIEM集成:将事件转发到安全监控系统
  • 漏洞评估:自动化安全扫描和报告
  • 合规报告:为法规生成审计报告

1.2 风险评估

风险等级:高

理由

  • 审计日志是事件调查中的证据
  • 日志篡改隐藏攻击者活动
  • 合规违规导致法律处罚
  • 缺失日志 = 安全监控盲点

攻击面

  • 日志注入攻击
  • 日志篡改/删除
  • SIEM配置错误
  • 日志中的敏感数据(PII泄漏)
  • 日志存储耗尽

2. 核心职责

2.1 主要功能

  1. 生成防篡改审计日志 用于安全事件
  2. 将事件转发到SIEM 用于关联和告警
  3. 评估漏洞 通过自动化扫描
  4. 生成合规报告 用于法规要求
  5. 检测异常 在用户行为和系统活动中

2.2 核心原则

  • 测试驱动开发优先:在实施前编写安全检查的测试
  • 性能意识:使用增量扫描和缓存以提高效率
  • 绝不 记录敏感数据(密码、PII、密钥)
  • 绝不 未经完整性验证就信任日志数据
  • 总是 使用结构化日志记录(JSON)
  • 总是 包含用于请求追踪的关联ID
  • 总是 保护日志免受未经授权的修改

3. 技术栈

组件 推荐 目的
结构化日志记录 structlog (Python) JSON日志生成
日志聚合 Elasticsearch, Loki 集中存储
SIEM Splunk, QRadar, Sentinel 安全监控
完整性 签名日志、WORM存储 防篡改证据
合规 OpenSCAP, Prowler, Trivy 评估工具

4. 实现模式

4.1 防篡改审计日志记录(摘要)

import hashlib
import hmac
import json
from datetime import datetime, timezone

class TamperEvidentLogger:
    """具有加密完整性保护的审计记录器。"""

    def __init__(self, signing_key: bytes, output_path: str):
        self._key = signing_key
        self._path = output_path
        self._sequence = 0
        self._previous_hash = b'\x00' * 32

    def log(self, event: str, actor: str = None, **context) -> dict:
        """记录一个防篡改审计条目。"""
        self._sequence += 1

        entry = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'sequence': self._sequence,
            'event': event,
            'actor': actor,
            'context': context,
            'previous_hash': self._previous_hash.hex(),
        }

        # 计算和签名
        entry_bytes = json.dumps(entry, sort_keys=True).encode()
        entry['hash'] = hashlib.sha256(entry_bytes).hexdigest()
        entry['signature'] = hmac.new(
            self._key, entry_bytes, hashlib.sha256
        ).hexdigest()

        self._previous_hash = bytes.fromhex(entry['hash'])

        with open(self._path, 'a') as f:
            f.write(json.dumps(entry) + '
')

        return entry

📚 完整实现(验证、链验证):

  • 参见 references/advanced-patterns.md

4.2 结构化安全日志记录

import structlog

logger = structlog.get_logger()

class SecurityAuditLogger:
    """专注于安全的审计日志记录。"""

    @staticmethod
    def log_authentication(user_id: str, success: bool, method: str, ip: str):
        """记录身份验证尝试。"""
        logger.info(
            "auth.attempt",
            user_id=user_id,  # 从不记录邮箱以保护隐私
            success=success,
            method=method,
            ip_address=ip
        )

    @staticmethod
    def log_authorization(user_id: str, resource: str, action: str, allowed: bool):
        """记录授权决策。"""
        logger.info(
            "authz.decision",
            user_id=user_id,
            resource=resource,
            action=action,
            allowed=allowed
        )

    @staticmethod
    def log_data_access(user_id: str, resource_type: str, resource_id: str, action: str):
        """为合规记录数据访问。"""
        logger.info(
            "data.access",
            user_id=user_id,
            resource_type=resource_type,
            resource_id=resource_id,
            action=action
        )

📚 完整模式(装饰器、上下文管理器、SIEM集成):

  • 参见 references/security-examples.md

4.3 SIEM集成(CEF格式)

class SIEMForwarder:
    def _to_cef(self, event: dict) -> str:
        """将事件转换为CEF格式以用于SIEM摄取。"""
        severity = self._map_severity(event.get('level', 'INFO'))
        return (f"CEF:0|JARVIS|SecurityAudit|1.0|{event.get('event', 'unknown')}|"
                f"{event.get('event', 'Unknown Event')}|{severity}|"
                f"src={event.get('ip_address', '')} suser={event.get('user_id', '')}")

📚 完整SIEM实现:参见 references/security-examples.md#siem-integration

4.4 漏洞评估

from dataclasses import dataclass
from typing import List

@dataclass
class Vulnerability:
    id: str
    severity: str
    package: str
    fixed_version: str

class VulnerabilityScanner:
    def scan_dependencies(self, path: str) -> List[Vulnerability]:
        """使用pip-audit、trivy扫描容器依赖项。"""
        pass

📚 完整扫描器:参见 references/advanced-patterns.md#vulnerability-assessment

5. 实现工作流程(测试驱动开发)

步骤1:首先编写失败测试

import pytest
from security_auditing import TamperEvidentLogger, SecurityAuditLogger

class TestTamperEvidentLogger:
    def test_log_entry_contains_required_fields(self, tmp_path):
        """每个日志条目必须包含时间戳、序列号、哈希、签名。"""
        logger = TamperEvidentLogger(b'test-key', str(tmp_path / 'audit.log'))
        entry = logger.log("user.login", actor="user123")
        assert all(k in entry for k in ['timestamp', 'sequence', 'hash', 'signature'])

    def test_chain_integrity_detects_tampering(self, tmp_path):
        """篡改的日志必须通过链验证检测到。"""
        log_path = tmp_path / 'audit.log'
        logger = TamperEvidentLogger(b'test-key', str(log_path))
        logger.log("event1", actor="user1")

        # 篡改日志文件
        tampered = log_path.read_text().replace('"event1"', '"TAMPERED"')
        log_path.write_text(tampered)

        valid, errors = logger.verify_chain()
        assert not valid and len(errors) > 0

    def test_no_pii_in_log_output(self, tmp_path):
        """PII模式不得出现在日志中。"""
        import re
        log_path = tmp_path / 'audit.log'
        logger = SecurityAuditLogger(str(log_path))
        logger.log_authentication(user_id="user123", success=True, method="password", ip="192.168.1.1")
        content = log_path.read_text()
        assert not re.search(r'[\w\.-]+@[\w\.-]+', content)  # 无邮箱

步骤2:实现最少代码以通过测试

# 仅实现通过测试所需的内容
class TamperEvidentLogger:
    def __init__(self, signing_key: bytes, output_path: str):
        self._key, self._path = signing_key, output_path
        self._sequence, self._previous_hash = 0, b'\x00' * 32

    def log(self, event: str, actor: str = None, **context) -> dict:
        self._sequence += 1
        entry = {'timestamp': datetime.now(timezone.utc).isoformat(),
                 'sequence': self._sequence, 'event': event, 'actor': actor}
        # 添加哈希和签名...
        return entry

步骤3:遵循模式进行重构

测试通过后,重构以改进错误处理、性能优化和安全加固。

步骤4:运行完整验证

pytest tests/security_auditing/ -v --tb=short
pytest tests/security_auditing/ --cov=security_auditing --cov-report=term-missing

6. 性能模式

6.1 增量扫描

# 错误:每次全扫描
def scan_all_dependencies():
    return subprocess.run(['pip-audit', '--format=json'], capture_output=True)

# 正确:基于更改的增量扫描
class IncrementalScanner:
    def scan_if_changed(self, requirements_path: str) -> List[Vulnerability]:
        current_hash = self._hash_file(requirements_path)
        if current_hash == self._last_hash:
            return self._load_cached_results()
        results = self._full_scan(requirements_path)
        self._save_cache(current_hash, results)
        return results

6.2 缓存扫描结果

# 错误:无缓存 - 每次获取
def get_vulnerability_info(cve_id: str) -> dict:
    return requests.get(f"https://nvd.nist.gov/vuln/detail/{cve_id}")

# 正确:带TTL的缓存
class VulnerabilityCache:
    def get_vulnerability(self, cve_id: str) -> dict:
        if cve_id in self._cache:
            data, timestamp = self._cache[cve_id]
            if datetime.now() - timestamp < self._ttl:
                return data
        data = self._fetch_from_api(cve_id)
        self._cache[cve_id] = (data, datetime.now())
        return data

6.3 并行分析

# 错误:顺序扫描
def scan_multiple_projects(paths: List[str]) -> List[Report]:
    return [scan_project(path) for path in paths]

# 正确:使用线程池的并行扫描
from concurrent.futures import ThreadPoolExecutor, as_completed

def scan_multiple_projects_parallel(paths: List[str], max_workers: int = 4):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(scan_project, p): p for p in paths}
        return [f.result() for f in as_completed(futures)]

6.4 定向审计

# 错误:总是扫描所有内容
def full_security_audit(project_path: str):
    scan_dependencies(project_path)
    scan_secrets(project_path)
    scan_code_vulnerabilities(project_path)

# 正确:基于更改的定向扫描
def targeted_security_audit(project_path: str, changed_files: List[str]):
    scans_needed = set()
    for file in changed_files:
        if file.endswith(('requirements.txt', 'package.json')):
            scans_needed.add('dependencies')
        elif file.endswith(('.env', '.yml', '.yaml')):
            scans_needed.add('secrets')
        elif file.endswith(('.py', '.js', '.ts')):
            scans_needed.add('code')
    # 仅运行所需扫描
    return {scan: globals()[f'scan_{scan}'](project_path) for scan in scans_needed}

6.5 资源限制

# 错误:无界资源使用
def scan_large_codebase(path: str):
    for root, dirs, files in os.walk(path):
        for file in files:
            analyze_file(os.path.join(root, file))

# 正确:资源限制的扫描
class BoundedScanner:
    def __init__(self, max_memory_mb: int = 512, max_files: int = 10000):
        self._max_memory = max_memory_mb * 1024 * 1024
        self._max_files = max_files

    def scan_with_limits(self, path: str):
        import resource
        resource.setrlimit(resource.RLIMIT_AS, (self._max_memory, -1))
        files_scanned = 0
        for root, _, files in os.walk(path):
            for file in files:
                if files_scanned >= self._max_files:
                    return
                files_scanned += 1
                analyze_file(os.path.join(root, file))

7. 安全标准

7.1 已知漏洞

CVE 严重性 组件 缓解措施
CVE-2023-50960 严重 QRadar 命令注入 - 更新QRadar
CVE-2023-50961 QRadar 存储型XSS - 更新QRadar
CVE-2023-2976 Guava 文件暴露 - 更新至32.0+
CVE-2024-22365 PAM 拒绝服务 - 更新Linux PAM
CVE-2023-22875 QRadar 信息泄露 - 更新

7.2 OWASP映射

OWASP 2025 风险 实现
A09: 安全日志记录失败 严重 防篡改日志、SIEM转发
A05: 安全配置错误 日志保护、保留策略
A01: 破坏的访问控制 日志访问审计

📚 详细OWASP指南

  • 参见 references/security-examples.md#owasp-coverage

7.3 合规要求

  • GDPR第30条:处理活动记录
  • HIPAA 164.312(b):审计控制
  • PCI-DSS 10:跟踪所有对网络资源的访问
  • SOC2 CC7.2:监控系统组件

8. 测试要求

def test_log_integrity_tamper_detection(audit_logger):
    """篡改的日志必须被检测到。"""
    audit_logger.log("test.event", actor="user1")

    # 篡改并验证检测
    valid, errors = audit_logger.verify_chain()
    assert not valid

def test_no_pii_in_logs(audit_logger):
    """PII不得出现在日志中。"""
    # 检查日志输出中的邮箱、电话、社保号模式
    pass

📚 完整测试套件

  • 参见 references/security-examples.md#testing

9. 常见错误

关键反模式

# ❌ 绝不:记录密码、令牌、PII
logger.info(f"User {email} logged in with password {password}")
# ✅ 总是:仅记录标识符
logger.info("user.login", user_id=user.id, method="password")

# ❌ 绝不:任何人都能修改的纯文本日志
with open('audit.log', 'a') as f:
    f.write(json.dumps(event) + '
')
# ✅ 总是:签名、链式日志
entry['signature'] = hmac.new(key, data, hashlib.sha256).hexdigest()

# ❌ 绝不:未追踪的请求
logger.info("Processing request")
# ✅ 总是:包含关联ID
logger.info("request.processing", correlation_id=request.id)
# ❌ chmod 644 /var/log/audit.log  # 全球可读
# ✅ chmod 600 /var/log/audit.log  # 受限

📚 完整反模式:参见 references/advanced-patterns.md#anti-patterns

10. 实施前检查清单

阶段1:编写代码前

  • [ ] 阅读日志完整性攻击的威胁模型
  • [ ] 识别合规要求(GDPR、HIPAA、PCI-DSS)
  • [ ] 设计防篡改日志格式
  • [ ] 规划SIEM集成架构
  • [ ] 为安全检查编写失败测试

阶段2:实施期间

  • [ ] 实现结构化JSON日志记录
  • [ ] 启用防篡改签名
  • [ ] 日志中无PII/密钥
  • [ ] 所有条目包含关联ID
  • [ ] 应用性能模式(缓存、增量扫描)
  • [ ] 配置资源限制

阶段3:提交前

  • [ ] 所有安全审计测试通过
  • [ ] 验证日志保护(600权限)
  • [ ] 测试SIEM转发
  • [ ] 为合规配置WORM存储
  • [ ] 强制执行保留策略
  • [ ] 配置告警规则
  • [ ] 覆盖率满足最低阈值

11. 总结

关键目标

  1. 防篡改日志:加密签名和链式记录
  2. 集中监控:所有事件的SIEM集成
  3. 合规就绪:满足GDPR、HIPAA、PCI-DSS要求
  4. 隐私保护:日志中无PII/密钥

参考资料

  • references/advanced-patterns.md - 完整实现、WORM存储
  • references/security-examples.md - SIEM配置、合规报告
  • references/threat-model.md - 日志完整性攻击场景

如果没有记录,就没有发生。如果日志可以被篡改,你无法证明任何事。