安全审计技能
name: security-auditing version: 1.0.0 domain: security/compliance risk_level: HIGH languages: [python, go, typescript] frameworks: [structlog, opentelemetry, falco] requires_security_review: true compliance: [GDPR, HIPAA, PCI-DSS, SOC2, ISO27001] last_updated: 2025-01-15
强制阅读协议:在实施审计日志记录之前,阅读
references/advanced-patterns.md以了解防篡改模式,并阅读references/threat-model.md以了解日志完整性攻击。
1. 概述
1.1 目的和范围
此技能提供安全审计和合规能力:
- 防篡改日志记录:加密签名的审计跟踪
- SIEM集成:将事件转发到安全监控系统
- 漏洞评估:自动化安全扫描和报告
- 合规报告:为法规生成审计报告
1.2 风险评估
风险等级:高
理由:
- 审计日志是事件调查中的证据
- 日志篡改隐藏攻击者活动
- 合规违规导致法律处罚
- 缺失日志 = 安全监控盲点
攻击面:
- 日志注入攻击
- 日志篡改/删除
- SIEM配置错误
- 日志中的敏感数据(PII泄漏)
- 日志存储耗尽
2. 核心职责
2.1 主要功能
- 生成防篡改审计日志 用于安全事件
- 将事件转发到SIEM 用于关联和告警
- 评估漏洞 通过自动化扫描
- 生成合规报告 用于法规要求
- 检测异常 在用户行为和系统活动中
2.2 核心原则
- 测试驱动开发优先:在实施前编写安全检查的测试
- 性能意识:使用增量扫描和缓存以提高效率
- 绝不 记录敏感数据(密码、PII、密钥)
- 绝不 未经完整性验证就信任日志数据
- 总是 使用结构化日志记录(JSON)
- 总是 包含用于请求追踪的关联ID
- 总是 保护日志免受未经授权的修改
3. 技术栈
| 组件 | 推荐 | 目的 |
|---|---|---|
| 结构化日志记录 | structlog (Python) |
JSON日志生成 |
| 日志聚合 | Elasticsearch, Loki | 集中存储 |
| SIEM | Splunk, QRadar, Sentinel | 安全监控 |
| 完整性 | 签名日志、WORM存储 | 防篡改证据 |
| 合规 | OpenSCAP, Prowler, Trivy | 评估工具 |
4. 实现模式
4.1 防篡改审计日志记录(摘要)
import hashlib
import hmac
import json
from datetime import datetime, timezone
class TamperEvidentLogger:
"""具有加密完整性保护的审计记录器。"""
def __init__(self, signing_key: bytes, output_path: str):
self._key = signing_key
self._path = output_path
self._sequence = 0
self._previous_hash = b'\x00' * 32
def log(self, event: str, actor: str = None, **context) -> dict:
"""记录一个防篡改审计条目。"""
self._sequence += 1
entry = {
'timestamp': datetime.now(timezone.utc).isoformat(),
'sequence': self._sequence,
'event': event,
'actor': actor,
'context': context,
'previous_hash': self._previous_hash.hex(),
}
# 计算和签名
entry_bytes = json.dumps(entry, sort_keys=True).encode()
entry['hash'] = hashlib.sha256(entry_bytes).hexdigest()
entry['signature'] = hmac.new(
self._key, entry_bytes, hashlib.sha256
).hexdigest()
self._previous_hash = bytes.fromhex(entry['hash'])
with open(self._path, 'a') as f:
f.write(json.dumps(entry) + '
')
return entry
📚 完整实现(验证、链验证):
- 参见
references/advanced-patterns.md
4.2 结构化安全日志记录
import structlog
logger = structlog.get_logger()
class SecurityAuditLogger:
"""专注于安全的审计日志记录。"""
@staticmethod
def log_authentication(user_id: str, success: bool, method: str, ip: str):
"""记录身份验证尝试。"""
logger.info(
"auth.attempt",
user_id=user_id, # 从不记录邮箱以保护隐私
success=success,
method=method,
ip_address=ip
)
@staticmethod
def log_authorization(user_id: str, resource: str, action: str, allowed: bool):
"""记录授权决策。"""
logger.info(
"authz.decision",
user_id=user_id,
resource=resource,
action=action,
allowed=allowed
)
@staticmethod
def log_data_access(user_id: str, resource_type: str, resource_id: str, action: str):
"""为合规记录数据访问。"""
logger.info(
"data.access",
user_id=user_id,
resource_type=resource_type,
resource_id=resource_id,
action=action
)
📚 完整模式(装饰器、上下文管理器、SIEM集成):
- 参见
references/security-examples.md
4.3 SIEM集成(CEF格式)
class SIEMForwarder:
def _to_cef(self, event: dict) -> str:
"""将事件转换为CEF格式以用于SIEM摄取。"""
severity = self._map_severity(event.get('level', 'INFO'))
return (f"CEF:0|JARVIS|SecurityAudit|1.0|{event.get('event', 'unknown')}|"
f"{event.get('event', 'Unknown Event')}|{severity}|"
f"src={event.get('ip_address', '')} suser={event.get('user_id', '')}")
📚 完整SIEM实现:参见 references/security-examples.md#siem-integration
4.4 漏洞评估
from dataclasses import dataclass
from typing import List
@dataclass
class Vulnerability:
id: str
severity: str
package: str
fixed_version: str
class VulnerabilityScanner:
def scan_dependencies(self, path: str) -> List[Vulnerability]:
"""使用pip-audit、trivy扫描容器依赖项。"""
pass
📚 完整扫描器:参见 references/advanced-patterns.md#vulnerability-assessment
5. 实现工作流程(测试驱动开发)
步骤1:首先编写失败测试
import pytest
from security_auditing import TamperEvidentLogger, SecurityAuditLogger
class TestTamperEvidentLogger:
def test_log_entry_contains_required_fields(self, tmp_path):
"""每个日志条目必须包含时间戳、序列号、哈希、签名。"""
logger = TamperEvidentLogger(b'test-key', str(tmp_path / 'audit.log'))
entry = logger.log("user.login", actor="user123")
assert all(k in entry for k in ['timestamp', 'sequence', 'hash', 'signature'])
def test_chain_integrity_detects_tampering(self, tmp_path):
"""篡改的日志必须通过链验证检测到。"""
log_path = tmp_path / 'audit.log'
logger = TamperEvidentLogger(b'test-key', str(log_path))
logger.log("event1", actor="user1")
# 篡改日志文件
tampered = log_path.read_text().replace('"event1"', '"TAMPERED"')
log_path.write_text(tampered)
valid, errors = logger.verify_chain()
assert not valid and len(errors) > 0
def test_no_pii_in_log_output(self, tmp_path):
"""PII模式不得出现在日志中。"""
import re
log_path = tmp_path / 'audit.log'
logger = SecurityAuditLogger(str(log_path))
logger.log_authentication(user_id="user123", success=True, method="password", ip="192.168.1.1")
content = log_path.read_text()
assert not re.search(r'[\w\.-]+@[\w\.-]+', content) # 无邮箱
步骤2:实现最少代码以通过测试
# 仅实现通过测试所需的内容
class TamperEvidentLogger:
def __init__(self, signing_key: bytes, output_path: str):
self._key, self._path = signing_key, output_path
self._sequence, self._previous_hash = 0, b'\x00' * 32
def log(self, event: str, actor: str = None, **context) -> dict:
self._sequence += 1
entry = {'timestamp': datetime.now(timezone.utc).isoformat(),
'sequence': self._sequence, 'event': event, 'actor': actor}
# 添加哈希和签名...
return entry
步骤3:遵循模式进行重构
测试通过后,重构以改进错误处理、性能优化和安全加固。
步骤4:运行完整验证
pytest tests/security_auditing/ -v --tb=short
pytest tests/security_auditing/ --cov=security_auditing --cov-report=term-missing
6. 性能模式
6.1 增量扫描
# 错误:每次全扫描
def scan_all_dependencies():
return subprocess.run(['pip-audit', '--format=json'], capture_output=True)
# 正确:基于更改的增量扫描
class IncrementalScanner:
def scan_if_changed(self, requirements_path: str) -> List[Vulnerability]:
current_hash = self._hash_file(requirements_path)
if current_hash == self._last_hash:
return self._load_cached_results()
results = self._full_scan(requirements_path)
self._save_cache(current_hash, results)
return results
6.2 缓存扫描结果
# 错误:无缓存 - 每次获取
def get_vulnerability_info(cve_id: str) -> dict:
return requests.get(f"https://nvd.nist.gov/vuln/detail/{cve_id}")
# 正确:带TTL的缓存
class VulnerabilityCache:
def get_vulnerability(self, cve_id: str) -> dict:
if cve_id in self._cache:
data, timestamp = self._cache[cve_id]
if datetime.now() - timestamp < self._ttl:
return data
data = self._fetch_from_api(cve_id)
self._cache[cve_id] = (data, datetime.now())
return data
6.3 并行分析
# 错误:顺序扫描
def scan_multiple_projects(paths: List[str]) -> List[Report]:
return [scan_project(path) for path in paths]
# 正确:使用线程池的并行扫描
from concurrent.futures import ThreadPoolExecutor, as_completed
def scan_multiple_projects_parallel(paths: List[str], max_workers: int = 4):
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(scan_project, p): p for p in paths}
return [f.result() for f in as_completed(futures)]
6.4 定向审计
# 错误:总是扫描所有内容
def full_security_audit(project_path: str):
scan_dependencies(project_path)
scan_secrets(project_path)
scan_code_vulnerabilities(project_path)
# 正确:基于更改的定向扫描
def targeted_security_audit(project_path: str, changed_files: List[str]):
scans_needed = set()
for file in changed_files:
if file.endswith(('requirements.txt', 'package.json')):
scans_needed.add('dependencies')
elif file.endswith(('.env', '.yml', '.yaml')):
scans_needed.add('secrets')
elif file.endswith(('.py', '.js', '.ts')):
scans_needed.add('code')
# 仅运行所需扫描
return {scan: globals()[f'scan_{scan}'](project_path) for scan in scans_needed}
6.5 资源限制
# 错误:无界资源使用
def scan_large_codebase(path: str):
for root, dirs, files in os.walk(path):
for file in files:
analyze_file(os.path.join(root, file))
# 正确:资源限制的扫描
class BoundedScanner:
def __init__(self, max_memory_mb: int = 512, max_files: int = 10000):
self._max_memory = max_memory_mb * 1024 * 1024
self._max_files = max_files
def scan_with_limits(self, path: str):
import resource
resource.setrlimit(resource.RLIMIT_AS, (self._max_memory, -1))
files_scanned = 0
for root, _, files in os.walk(path):
for file in files:
if files_scanned >= self._max_files:
return
files_scanned += 1
analyze_file(os.path.join(root, file))
7. 安全标准
7.1 已知漏洞
| CVE | 严重性 | 组件 | 缓解措施 |
|---|---|---|---|
| CVE-2023-50960 | 严重 | QRadar | 命令注入 - 更新QRadar |
| CVE-2023-50961 | 高 | QRadar | 存储型XSS - 更新QRadar |
| CVE-2023-2976 | 中 | Guava | 文件暴露 - 更新至32.0+ |
| CVE-2024-22365 | 中 | PAM | 拒绝服务 - 更新Linux PAM |
| CVE-2023-22875 | 中 | QRadar | 信息泄露 - 更新 |
7.2 OWASP映射
| OWASP 2025 | 风险 | 实现 |
|---|---|---|
| A09: 安全日志记录失败 | 严重 | 防篡改日志、SIEM转发 |
| A05: 安全配置错误 | 高 | 日志保护、保留策略 |
| A01: 破坏的访问控制 | 高 | 日志访问审计 |
📚 详细OWASP指南:
- 参见
references/security-examples.md#owasp-coverage
7.3 合规要求
- GDPR第30条:处理活动记录
- HIPAA 164.312(b):审计控制
- PCI-DSS 10:跟踪所有对网络资源的访问
- SOC2 CC7.2:监控系统组件
8. 测试要求
def test_log_integrity_tamper_detection(audit_logger):
"""篡改的日志必须被检测到。"""
audit_logger.log("test.event", actor="user1")
# 篡改并验证检测
valid, errors = audit_logger.verify_chain()
assert not valid
def test_no_pii_in_logs(audit_logger):
"""PII不得出现在日志中。"""
# 检查日志输出中的邮箱、电话、社保号模式
pass
📚 完整测试套件:
- 参见
references/security-examples.md#testing
9. 常见错误
关键反模式
# ❌ 绝不:记录密码、令牌、PII
logger.info(f"User {email} logged in with password {password}")
# ✅ 总是:仅记录标识符
logger.info("user.login", user_id=user.id, method="password")
# ❌ 绝不:任何人都能修改的纯文本日志
with open('audit.log', 'a') as f:
f.write(json.dumps(event) + '
')
# ✅ 总是:签名、链式日志
entry['signature'] = hmac.new(key, data, hashlib.sha256).hexdigest()
# ❌ 绝不:未追踪的请求
logger.info("Processing request")
# ✅ 总是:包含关联ID
logger.info("request.processing", correlation_id=request.id)
# ❌ chmod 644 /var/log/audit.log # 全球可读
# ✅ chmod 600 /var/log/audit.log # 受限
📚 完整反模式:参见 references/advanced-patterns.md#anti-patterns
10. 实施前检查清单
阶段1:编写代码前
- [ ] 阅读日志完整性攻击的威胁模型
- [ ] 识别合规要求(GDPR、HIPAA、PCI-DSS)
- [ ] 设计防篡改日志格式
- [ ] 规划SIEM集成架构
- [ ] 为安全检查编写失败测试
阶段2:实施期间
- [ ] 实现结构化JSON日志记录
- [ ] 启用防篡改签名
- [ ] 日志中无PII/密钥
- [ ] 所有条目包含关联ID
- [ ] 应用性能模式(缓存、增量扫描)
- [ ] 配置资源限制
阶段3:提交前
- [ ] 所有安全审计测试通过
- [ ] 验证日志保护(600权限)
- [ ] 测试SIEM转发
- [ ] 为合规配置WORM存储
- [ ] 强制执行保留策略
- [ ] 配置告警规则
- [ ] 覆盖率满足最低阈值
11. 总结
关键目标
- 防篡改日志:加密签名和链式记录
- 集中监控:所有事件的SIEM集成
- 合规就绪:满足GDPR、HIPAA、PCI-DSS要求
- 隐私保护:日志中无PII/密钥
参考资料
references/advanced-patterns.md- 完整实现、WORM存储references/security-examples.md- SIEM配置、合规报告references/threat-model.md- 日志完整性攻击场景
如果没有记录,就没有发生。如果日志可以被篡改,你无法证明任何事。