实施全面的安全审计日志记录,以满足合规性、取证和SIEM集成的需求。当构建审计线索、合规性日志记录或安全监控系统时使用。
安全审计日志记录
概览
实现全面的审计日志记录,记录安全事件、用户行为和系统变更,包括结构化日志记录、保留策略和SIEM集成。
何时使用
- 合规性要求(SOC 2、HIPAA、PCI-DSS)
- 安全监控
- 取证调查
- 用户活动跟踪
- 系统变更审计
- 违规检测
实施示例
1. Node.js 审计日志记录器
// audit-logger.js
const winston = require('winston');
const { ElasticsearchTransport } = require('winston-elasticsearch');
class AuditLogger {
constructor() {
this.logger = winston.createLogger({
level: 'info',
format: winston.format.combine(
winston.format.timestamp(),
winston.format.json()
),
transports: [
// 文件传输
new winston.transports.File({
filename: 'logs/audit.log',
maxsize: 10485760, // 10MB
maxFiles: 30,
tailable: true
}),
// Elasticsearch传输用于SIEM
new ElasticsearchTransport({
level: 'info',
clientOpts: {
node: process.env.ELASTICSEARCH_URL
},
index: 'security-audit'
})
]
});
}
/**
* 记录认证事件
*/
logAuth(userId, action, success, metadata = {}) {
this.logger.info({
category: 'authentication',
userId,
action, // login, logout, password_change
success,
timestamp: new Date().toISOString(),
ip: metadata.ip,
userAgent: metadata.userAgent,
location: metadata.location,
mfaUsed: metadata.mfaUsed
});
}
/**
* 记录授权事件
*/
logAuthorization(userId, resource, action, granted, metadata = {}) {
this.logger.info({
category: 'authorization',
userId,
resource,
action,
granted,
timestamp: new Date().toISOString(),
ip: metadata.ip,
reason: metadata.reason
});
}
/**
* 记录数据访问
*/
logDataAccess(userId, dataType, recordId, action, metadata = {}) {
this.logger.info({
category: 'data_access',
userId,
dataType, // user, payment, health_record
recordId,
action, // read, create, update, delete
timestamp: new Date().toISOString(),
ip: metadata.ip,
query: metadata.query,
resultCount: metadata.resultCount
});
}
/**
* 记录配置变更
*/
logConfigChange(userId, setting, oldValue, newValue, metadata = {}) {
this.logger.info({
category: 'configuration_change',
userId,
setting,
oldValue,
newValue,
timestamp: new Date().toISOString(),
ip: metadata.ip
});
}
/**
* 记录安全事件
*/
logSecurityEvent(eventType, severity, description, metadata = {}) {
this.logger.warn({
category: 'security_event',
eventType, // brute_force, suspicious_activity, data_breach
severity, // low, medium, high, critical
description,
timestamp: new Date().toISOString(),
...metadata
});
}
/**
* 记录管理员操作
*/
logAdminAction(adminId, action, targetUserId, metadata = {}) {
this.logger.info({
category: 'admin_action',
adminId,
action, // user_delete, role_change, system_config
targetUserId,
timestamp: new Date().toISOString(),
changes: metadata.changes,
reason: metadata.reason
});
}
/**
* 记录API请求
*/
logAPIRequest(userId, method, endpoint, statusCode, duration, metadata = {}) {
this.logger.info({
category: 'api_request',
userId,
method,
endpoint,
statusCode,
duration,
timestamp: new Date().toISOString(),
ip: metadata.ip,
userAgent: metadata.userAgent,
requestId: metadata.requestId
});
}
}
// Express中间件
function auditMiddleware(auditLogger) {
return (req, res, next) => {
const startTime = Date.now();
// 捕获响应
const originalSend = res.send;
res.send = function(data) {
res.send = originalSend;
const duration = Date.now() - startTime;
// 记录API请求
auditLogger.logAPIRequest(
req.user?.id || 'anonymous',
req.method,
req.path,
res.statusCode,
duration,
{
ip: req.ip,
userAgent: req.get('user-agent'),
requestId: req.id
}
);
return res.send(data);
};
next();
};
}
// 使用
const auditLogger = new AuditLogger();
// 登录事件
app.post('/api/login', async (req, res) => {
const { email, password } = req.body;
try {
const user = await authenticateUser(email, password);
auditLogger.logAuth(
user.id,
'login',
true,
{
ip: req.ip,
userAgent: req.get('user-agent'),
mfaUsed: user.mfaEnabled
}
);
res.json({ token: generateToken(user) });
} catch (error) {
auditLogger.logAuth(
email,
'login',
false,
{
ip: req.ip,
userAgent: req.get('user-agent')
}
);
res.status(401).json({ error: 'Invalid credentials' });
}
});
// 数据访问事件
app.get('/api/users/:id', authorize, async (req, res) => {
const userId = req.params.id;
// 检查授权
if (req.user.id !== userId && !req.user.isAdmin) {
auditLogger.logAuthorization(
req.user.id,
`user:${userId}`,
'read',
false,
{
ip: req.ip,
reason: 'Insufficient permissions'
}
);
return res.status(403).json({ error: 'Forbidden' });
}
const user = await getUser(userId);
auditLogger.logDataAccess(
req.user.id,
'user',
userId,
'read',
{
ip: req.ip
}
);
res.json(user);
});
module.exports = { AuditLogger, auditMiddleware };
2. Python 审计日志系统
# audit_logging.py
import logging
import json
from datetime import datetime
from typing import Dict, Any, Optional
import structlog
from elasticsearch import Elasticsearch
class AuditLogger:
def __init__(self):
# 配置结构化日志记录
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer()
]
)
self.logger = structlog.get_logger()
# 文件处理器
file_handler = logging.FileHandler('logs/audit.log')
file_handler.setLevel(logging.INFO)
# Elasticsearch用于SIEM
self.es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
def _log(self, category: str, event_data: Dict[str, Any]):
"""内部日志记录方法"""
log_entry = {
'category': category,
'timestamp': datetime.utcnow().isoformat(),
**event_data
}
# 记录到文件
self.logger.info(json.dumps(log_entry))
# 发送到Elasticsearch
try:
self.es.index(
index='security-audit',
document=log_entry
)
except Exception as e:
print(f"Failed to send to Elasticsearch: {e}")
def log_auth(self, user_id: str, action: str, success: bool,
ip: str = None, user_agent: str = None, **kwargs):
"""记录认证事件"""
self._log('authentication', {
'user_id': user_id,
'action': action,
'success': success,
'ip': ip,
'user_agent': user_agent,
**kwargs
})
def log_authorization(self, user_id: str, resource: str, action: str,
granted: bool, reason: str = None, **kwargs):
"""记录授权决策"""
self._log('authorization', {
'user_id': user_id,
'resource': resource,
'action': action,
'granted': granted,
'reason': reason,
**kwargs
})
def log_data_access(self, user_id: str, data_type: str, record_id: str,
action: str, **kwargs):
"""记录数据访问事件"""
self._log('data_access', {
'user_id': user_id,
'data_type': data_type,
'record_id': record_id,
'action': action,
**kwargs
})
def log_security_event(self, event_type: str, severity: str,
description: str, **kwargs):
"""记录安全事件"""
self._log('security_event', {
'event_type': event_type,
'severity': severity,
'description': description,
**kwargs
})
def log_config_change(self, user_id: str, setting: str,
old_value: Any, new_value: Any, **kwargs):
"""记录配置变更"""
self._log('configuration_change', {
'user_id': user_id,
'setting': setting,
'old_value': str(old_value),
'new_value': str(new_value),
**kwargs
})
# Flask集成
from flask import Flask, request, g
from functools import wraps
app = Flask(__name__)
audit_logger = AuditLogger()
@app.before_request
def before_request():
g.request_start_time = datetime.now()
@app.after_request
def after_request(response):
if hasattr(g, 'request_start_time'):
duration = (datetime.now() - g.request_start_time).total_seconds() * 1000
audit_logger._log('api_request', {
'user_id': getattr(g, 'user_id', 'anonymous'),
'method': request.method,
'endpoint': request.path,
'status_code': response.status_code,
'duration_ms': duration,
'ip': request.remote_addr,
'user_agent': request.user_agent.string
})
return response
def audit_data_access(data_type: str):
"""数据访问日志记录装饰器"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
result = f(*args, **kwargs)
audit_logger.log_data_access(
user_id=g.user_id,
data_type=data_type,
record_id=kwargs.get('id', 'unknown'),
action=request.method.lower(),
ip=request.remote_addr
)
return result
return decorated_function
return decorator
@app.route('/api/users/<user_id>', methods=['GET'])
@audit_data_access('user')
def get_user(user_id):
# 获取用户
return jsonify({'id': user_id})
if __name__ == '__main__':
app.run()
3. Java 审计日志
// AuditLogger.java
package com.example.security;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
public class AuditLogger {
private static final Logger logger = LoggerFactory.getLogger("AUDIT");
private static final ObjectMapper objectMapper = new ObjectMapper();
public enum Category {
AUTHENTICATION,
AUTHORIZATION,
DATA_ACCESS,
CONFIGURATION_CHANGE,
SECURITY_EVENT,
ADMIN_ACTION
}
public enum Severity {
LOW, MEDIUM, HIGH, CRITICAL
}
public void logAuth(String userId, String action, boolean success,
String ip, Map<String, Object> metadata) {
Map<String, Object> logEntry = new HashMap<>();
logEntry.put("category", Category.AUTHENTICATION);
logEntry.put("userId", userId);
logEntry.put("action", action);
logEntry.put("success", success);
logEntry.put("ip", ip);
logEntry.put("timestamp", Instant.now().toString());
logEntry.putAll(metadata);
log(logEntry);
}
public void logDataAccess(String userId, String dataType, String recordId,
String action, String ip) {
Map<String, Object> logEntry = new HashMap<>();
logEntry.put("category", Category.DATA_ACCESS);
logEntry.put("userId", userId);
logEntry.put("dataType", dataType);
logEntry.put("recordId", recordId);
logEntry.put("action", action);
logEntry.put("ip", ip);
logEntry.put("timestamp", Instant.now().toString());
log(logEntry);
}
public void logSecurityEvent(String eventType, Severity severity,
String description, Map<String, Object> metadata) {
Map<String, Object> logEntry = new HashMap<>();
logEntry.put("category", Category.SECURITY_EVENT);
logEntry.put("eventType", eventType);
logEntry.put("severity", severity);
logEntry.put("description", description);
logEntry.put("timestamp", Instant.now().toString());
logEntry.putAll(metadata);
log(logEntry);
}
private void log(Map<String, Object> logEntry) {
try {
String json = objectMapper.writeValueAsString(logEntry);
logger.info(json);
// 发送到SIEM/Elasticsearch
// siemClient.send(logEntry);
} catch (Exception e) {
logger.error("Failed to log audit event", e);
}
}
}
// Spring Boot Filter
@Component
public class AuditFilter extends OncePerRequestFilter {
@Autowired
private AuditLogger auditLogger;
@Override
protected void doFilterInternal(HttpServletRequest request,
HttpServletResponse response,
FilterChain filterChain)
throws ServletException, IOException {
long startTime = System.currentTimeMillis();
filterChain.doFilter(request, response);
long duration = System.currentTimeMillis() - startTime;
String userId = SecurityContextHolder.getContext()
.getAuthentication()
.getName();
Map<String, Object> metadata = new HashMap<>();
metadata.put("method", request.getMethod());
metadata.put("endpoint", request.getRequestURI());
metadata.put("statusCode", response.getStatus());
metadata.put("duration", duration);
metadata.put("userAgent", request.getHeader("User-Agent"));
auditLogger.logAuth(userId, "api_request", true,
request.getRemoteAddr(), metadata);
}
}
最佳实践
✅ 要做
- 记录所有安全事件
- 使用结构化日志记录
- 包含时间戳(UTC)
- 记录用户上下文
- 实施日志保留
- 加密敏感日志
- 监控日志完整性
- 发送到SIEM
- 包含请求ID
❌ 不要做
- 记录密码/密钥
- 无谓地记录敏感PII
- 跳过失败尝试
- 允许日志篡改
- 不安全地存储日志
- 忽略日志分析
要记录的事件
- 认证:登录、登出、密码更改
- 授权:访问授权/拒绝
- 数据访问:CRUD操作
- 配置:系统变更
- 安全事件:可疑活动
- 管理员操作:特权操作
日志保留
- SOC 2:至少1年
- HIPAA:6年
- PCI-DSS:在线1年,存档3年
- GDPR:根据目的需要