SecurityAuditLogging security-audit-logging

安全审计日志记录技能,用于构建合规性、取证和安全监控系统,涵盖结构化日志、日志保留和SIEM集成。

安全审计 0 次安装 0 次浏览 更新于 3/4/2026

实施全面的安全审计日志记录,以满足合规性、取证和SIEM集成的需求。当构建审计线索、合规性日志记录或安全监控系统时使用。

安全审计日志记录

概览

实现全面的审计日志记录,记录安全事件、用户行为和系统变更,包括结构化日志记录、保留策略和SIEM集成。

何时使用

  • 合规性要求(SOC 2、HIPAA、PCI-DSS)
  • 安全监控
  • 取证调查
  • 用户活动跟踪
  • 系统变更审计
  • 违规检测

实施示例

1. Node.js 审计日志记录器

// audit-logger.js
const winston = require('winston');
const { ElasticsearchTransport } = require('winston-elasticsearch');

/**
 * Structured security audit logger built on winston.
 *
 * Records are formatted as timestamped JSON and sent to two transports:
 * the file `logs/audit.log` and the `security-audit` Elasticsearch index
 * (node taken from the ELASTICSEARCH_URL environment variable).
 */
class AuditLogger {
  constructor() {
    this.logger = winston.createLogger({
      level: 'info',
      format: winston.format.combine(
        winston.format.timestamp(),
        winston.format.json()
      ),
      transports: [
        // File transport
        new winston.transports.File({
          filename: 'logs/audit.log',
          maxsize: 10485760, // 10MB
          maxFiles: 30,
          tailable: true
        }),

        // Elasticsearch transport for SIEM
        new ElasticsearchTransport({
          level: 'info',
          clientOpts: {
            node: process.env.ELASTICSEARCH_URL
          },
          index: 'security-audit'
        })
      ]
    });
  }

  /**
   * Record an authentication event.
   *
   * @param {string} userId - Identifier of the account involved.
   * @param {string} action - e.g. login, logout, password_change.
   * @param {boolean} success - Whether the attempt succeeded.
   * @param {object} [metadata] - Optional ip, userAgent, location, mfaUsed.
   */
  logAuth(userId, action, success, metadata = {}) {
    this.logger.info({
      category: 'authentication',
      userId,
      action, // login, logout, password_change
      success,
      timestamp: new Date().toISOString(),
      ip: metadata.ip,
      userAgent: metadata.userAgent,
      location: metadata.location,
      mfaUsed: metadata.mfaUsed
    });
  }

  /**
   * Record an authorization decision.
   *
   * @param {string} userId - Identifier of the requesting user.
   * @param {string} resource - Resource the decision applies to.
   * @param {string} action - Attempted action on the resource.
   * @param {boolean} granted - Whether access was granted.
   * @param {object} [metadata] - Optional ip and reason.
   */
  logAuthorization(userId, resource, action, granted, metadata = {}) {
    this.logger.info({
      category: 'authorization',
      userId,
      resource,
      action,
      granted,
      timestamp: new Date().toISOString(),
      ip: metadata.ip,
      reason: metadata.reason
    });
  }

  /**
   * Record a data access.
   *
   * @param {string} userId - Identifier of the accessing user.
   * @param {string} dataType - e.g. user, payment, health_record.
   * @param {string} recordId - Identifier of the accessed record.
   * @param {string} action - read, create, update or delete.
   * @param {object} [metadata] - Optional ip, query, resultCount.
   */
  logDataAccess(userId, dataType, recordId, action, metadata = {}) {
    this.logger.info({
      category: 'data_access',
      userId,
      dataType, // user, payment, health_record
      recordId,
      action, // read, create, update, delete
      timestamp: new Date().toISOString(),
      ip: metadata.ip,
      query: metadata.query,
      resultCount: metadata.resultCount
    });
  }

  /**
   * Record a configuration change.
   *
   * @param {string} userId - Identifier of the user who made the change.
   * @param {string} setting - Name of the changed setting.
   * @param {*} oldValue - Value before the change.
   * @param {*} newValue - Value after the change.
   * @param {object} [metadata] - Optional ip.
   */
  logConfigChange(userId, setting, oldValue, newValue, metadata = {}) {
    this.logger.info({
      category: 'configuration_change',
      userId,
      setting,
      oldValue,
      newValue,
      timestamp: new Date().toISOString(),
      ip: metadata.ip
    });
  }

  /**
   * Record a security event at `warn` level.
   *
   * @param {string} eventType - e.g. brute_force, suspicious_activity, data_breach.
   * @param {string} severity - low, medium, high or critical.
   * @param {string} description - Human-readable description of the event.
   * @param {object} [metadata] - Extra fields copied into the record.
   */
  logSecurityEvent(eventType, severity, description, metadata = {}) {
    this.logger.warn({
      // Metadata is spread first so caller-supplied keys can never overwrite
      // the core audit fields (category, severity, timestamp, ...) below.
      ...metadata,
      category: 'security_event',
      eventType, // brute_force, suspicious_activity, data_breach
      severity, // low, medium, high, critical
      description,
      timestamp: new Date().toISOString()
    });
  }

  /**
   * Record an administrator action.
   *
   * @param {string} adminId - Identifier of the acting administrator.
   * @param {string} action - e.g. user_delete, role_change, system_config.
   * @param {string} targetUserId - Identifier of the affected user.
   * @param {object} [metadata] - Optional changes and reason.
   */
  logAdminAction(adminId, action, targetUserId, metadata = {}) {
    this.logger.info({
      category: 'admin_action',
      adminId,
      action, // user_delete, role_change, system_config
      targetUserId,
      timestamp: new Date().toISOString(),
      changes: metadata.changes,
      reason: metadata.reason
    });
  }

  /**
   * Record an API request.
   *
   * @param {string} userId - Identifier of the caller.
   * @param {string} method - HTTP method.
   * @param {string} endpoint - Request path.
   * @param {number} statusCode - HTTP response status.
   * @param {number} duration - Handling time as measured by the caller.
   * @param {object} [metadata] - Optional ip, userAgent, requestId.
   */
  logAPIRequest(userId, method, endpoint, statusCode, duration, metadata = {}) {
    this.logger.info({
      category: 'api_request',
      userId,
      method,
      endpoint,
      statusCode,
      duration,
      timestamp: new Date().toISOString(),
      ip: metadata.ip,
      userAgent: metadata.userAgent,
      requestId: metadata.requestId
    });
  }
}

/**
 * Express middleware that writes one `api_request` audit record per response.
 *
 * The record is emitted from the response's 'finish' event instead of a
 * patched `res.send`, so responses that complete without calling `res.send`
 * (for example a direct `res.end()` or a streamed body) are audited too, and
 * `res.statusCode` is read after the handler has finished setting it.
 *
 * @param {object} auditLogger - Object exposing `logAPIRequest(...)`.
 * @returns {Function} Express middleware `(req, res, next)`.
 */
function auditMiddleware(auditLogger) {
  return (req, res, next) => {
    const startTime = Date.now();

    res.once('finish', () => {
      auditLogger.logAPIRequest(
        // `??` rather than `||` so a legitimate falsy id such as 0 is kept.
        req.user?.id ?? 'anonymous',
        req.method,
        req.path,
        res.statusCode,
        Date.now() - startTime,
        {
          ip: req.ip,
          userAgent: req.get('user-agent'),
          requestId: req.id
        }
      );
    });

    next();
  };
}

// Usage
const auditLogger = new AuditLogger();

// Login endpoint: both a successful and a failed attempt produce an
// authentication audit record before the HTTP response is sent.
app.post('/api/login', async (req, res) => {
  const email = req.body.email;
  const password = req.body.password;

  // Evaluated at logging time so the request is read exactly when the
  // audit record is built.
  const clientContext = () => ({
    ip: req.ip,
    userAgent: req.get('user-agent')
  });

  try {
    const account = await authenticateUser(email, password);

    auditLogger.logAuth(account.id, 'login', true, {
      ...clientContext(),
      mfaUsed: account.mfaEnabled
    });

    res.json({ token: generateToken(account) });
  } catch {
    // No user record is available on failure, so the submitted email is
    // logged as the identifier.
    auditLogger.logAuth(email, 'login', false, clientContext());

    res.status(401).json({ error: 'Invalid credentials' });
  }
});

// Data access endpoint: denied reads are logged as authorization events,
// successful reads as data-access events.
app.get('/api/users/:id', authorize, async (req, res, next) => {
  const userId = req.params.id;

  try {
    // Authorization check: users may read their own record, admins any.
    // NOTE(review): req.params.id is a string; if req.user.id is numeric this
    // strict comparison never matches — confirm the id types.
    if (req.user.id !== userId && !req.user.isAdmin) {
      auditLogger.logAuthorization(
        req.user.id,
        `user:${userId}`,
        'read',
        false,
        {
          ip: req.ip,
          reason: 'Insufficient permissions'
        }
      );

      return res.status(403).json({ error: 'Forbidden' });
    }

    const user = await getUser(userId);

    auditLogger.logDataAccess(
      req.user.id,
      'user',
      userId,
      'read',
      {
        ip: req.ip
      }
    );

    res.json(user);
  } catch (error) {
    // Forward failures (e.g. a rejected getUser) to Express error handling;
    // without this an async handler's rejection is never answered.
    next(error);
  }
});

module.exports = { AuditLogger, auditMiddleware };

2. Python 审计日志系统

# audit_logging.py
import json
import logging
import os
import time
from datetime import datetime, timezone
from typing import Dict, Any, Optional

import structlog
from elasticsearch import Elasticsearch

class AuditLogger:
    """Structured security audit logger.

    Each event is serialized to JSON, written to ``logs/audit.log`` through
    the stdlib ``audit`` logger, and indexed into the ``security-audit``
    Elasticsearch index for SIEM use.
    """

    def __init__(self):
        # Configure structured logging. Using the stdlib logger factory is
        # what routes structlog output to the file handler attached below.
        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="iso"),
                structlog.processors.JSONRenderer()
            ],
            logger_factory=structlog.stdlib.LoggerFactory()
        )

        self.logger = structlog.get_logger('audit')

        # File handler. FileHandler does not create missing directories.
        os.makedirs('logs', exist_ok=True)
        stdlib_logger = logging.getLogger('audit')
        stdlib_logger.setLevel(logging.INFO)
        if not stdlib_logger.handlers:
            # Guarded so repeated instantiation does not duplicate log lines.
            file_handler = logging.FileHandler('logs/audit.log')
            file_handler.setLevel(logging.INFO)
            stdlib_logger.addHandler(file_handler)

        # Elasticsearch for SIEM. A full URL (with scheme) is passed; the
        # default keeps the previous localhost:9200 target.
        self.es = Elasticsearch(
            [os.environ.get('ELASTICSEARCH_URL', 'http://localhost:9200')]
        )

    def _log(self, category: str, event_data: Dict[str, Any]) -> None:
        """Build one audit record and send it to the file log and Elasticsearch.

        Args:
            category: Audit category stored under ``category``.
            event_data: Event-specific fields merged into the record. Keys
                named ``category`` or ``timestamp`` are ignored so callers
                cannot overwrite the record's core fields.
        """
        log_entry: Dict[str, Any] = {
            'category': category,
            # Timezone-aware UTC, so the rendered value carries "+00:00".
            'timestamp': datetime.now(timezone.utc).isoformat(),
        }
        for key, value in event_data.items():
            log_entry.setdefault(key, value)

        # Write to the file log. default=str keeps a non-JSON value (e.g. a
        # datetime in caller metadata) from raising inside an audit call.
        self.logger.info(json.dumps(log_entry, default=str))

        # Send to Elasticsearch (best effort: a SIEM outage must not break
        # the request being audited).
        try:
            self.es.index(
                index='security-audit',
                document=log_entry
            )
        except Exception:
            logging.getLogger(__name__).exception(
                "Failed to send to Elasticsearch"
            )

    def log_auth(self, user_id: str, action: str, success: bool,
                  ip: Optional[str] = None, user_agent: Optional[str] = None,
                  **kwargs: Any) -> None:
        """Record an authentication event (extra kwargs are stored as fields)."""
        self._log('authentication', {
            'user_id': user_id,
            'action': action,
            'success': success,
            'ip': ip,
            'user_agent': user_agent,
            **kwargs
        })

    def log_authorization(self, user_id: str, resource: str, action: str,
                         granted: bool, reason: Optional[str] = None,
                         **kwargs: Any) -> None:
        """Record an authorization decision (extra kwargs are stored as fields)."""
        self._log('authorization', {
            'user_id': user_id,
            'resource': resource,
            'action': action,
            'granted': granted,
            'reason': reason,
            **kwargs
        })

    def log_data_access(self, user_id: str, data_type: str, record_id: str,
                       action: str, **kwargs: Any) -> None:
        """Record a data access event (extra kwargs are stored as fields)."""
        self._log('data_access', {
            'user_id': user_id,
            'data_type': data_type,
            'record_id': record_id,
            'action': action,
            **kwargs
        })

    def log_security_event(self, event_type: str, severity: str,
                          description: str, **kwargs: Any) -> None:
        """Record a security event (extra kwargs are stored as fields)."""
        self._log('security_event', {
            'event_type': event_type,
            'severity': severity,
            'description': description,
            **kwargs
        })

    def log_config_change(self, user_id: str, setting: str,
                         old_value: Any, new_value: Any, **kwargs: Any) -> None:
        """Record a configuration change; old and new values are stored as strings."""
        self._log('configuration_change', {
            'user_id': user_id,
            'setting': setting,
            'old_value': str(old_value),
            'new_value': str(new_value),
            **kwargs
        })

# Flask integration
from flask import Flask, request, g
from functools import wraps

# Module-level Flask application and the single AuditLogger instance shared
# by the request hooks and the decorator defined in this module.
app = Flask(__name__)
audit_logger = AuditLogger()

@app.before_request
def before_request():
    """Store a monotonic start time on ``g`` for request-duration measurement."""
    # perf_counter is monotonic, so a wall-clock adjustment during the request
    # cannot produce a negative or inflated duration.
    g.request_start_time = time.perf_counter()

@app.after_request
def after_request(response):
    """Write an ``api_request`` audit record for the request, then pass the response on.

    Nothing is logged when ``before_request`` did not run (no start time on ``g``).
    """
    if hasattr(g, 'request_start_time'):
        duration = (time.perf_counter() - g.request_start_time) * 1000

        audit_logger._log('api_request', {
            'user_id': getattr(g, 'user_id', 'anonymous'),
            'method': request.method,
            'endpoint': request.path,
            'status_code': response.status_code,
            'duration_ms': duration,
            'ip': request.remote_addr,
            'user_agent': request.user_agent.string
        })

    return response

def audit_data_access(data_type: str):
    """Decorator factory that audits data access performed by a Flask view.

    After the wrapped view returns, one ``data_access`` record is written with
    the current user, ``data_type``, the record id taken from the view's
    keyword arguments, the lower-cased HTTP method and the client address.

    Args:
        data_type: Kind of data the view exposes (e.g. ``'user'``).

    Returns:
        A decorator that wraps a view function and returns its result unchanged.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            result = f(*args, **kwargs)

            # Accept both an ``id`` URL parameter and ``<data_type>_id``
            # (e.g. ``/api/users/<user_id>``) before falling back to 'unknown'.
            record_id = kwargs.get('id', kwargs.get(f'{data_type}_id', 'unknown'))

            audit_logger.log_data_access(
                # Same 'anonymous' default as the api_request hook, instead of
                # raising AttributeError when no user was attached to ``g``.
                user_id=getattr(g, 'user_id', 'anonymous'),
                data_type=data_type,
                record_id=record_id,
                action=request.method.lower(),
                ip=request.remote_addr
            )

            return result

        return decorated_function
    return decorator

@app.route('/api/users/<user_id>', methods=['GET'])
@audit_data_access('user')
def get_user(user_id):
    """Return the requested user as JSON (placeholder: echoes the id only)."""
    # jsonify is not among the names this module imports from flask, so it is
    # imported here to keep the view from failing with NameError.
    from flask import jsonify

    # Fetch the user
    return jsonify({'id': user_id})

# Run the Flask app only when this file is executed directly as a script.
if __name__ == '__main__':
    app.run()

3. Java 审计日志

// AuditLogger.java
package com.example.security;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/**
 * Writes security audit records as JSON strings to the SLF4J logger named
 * "AUDIT".
 *
 * <p>Each public method builds a map of fields for one event category and
 * serializes it with Jackson. Caller-supplied metadata is merged in before
 * the core fields are set, so it can add fields but never overwrite
 * {@code category}, {@code timestamp} or the other core fields.
 */
public class AuditLogger {
    private static final Logger logger = LoggerFactory.getLogger("AUDIT");
    private static final ObjectMapper objectMapper = new ObjectMapper();

    /** Audit record categories. */
    public enum Category {
        AUTHENTICATION,
        AUTHORIZATION,
        DATA_ACCESS,
        CONFIGURATION_CHANGE,
        SECURITY_EVENT,
        ADMIN_ACTION
    }

    /** Severity levels for security events. */
    public enum Severity {
        LOW, MEDIUM, HIGH, CRITICAL
    }

    /**
     * Records an authentication event.
     *
     * @param userId   identifier of the account involved
     * @param action   action performed
     * @param success  whether the attempt succeeded
     * @param ip       client address
     * @param metadata extra fields to include; may be {@code null}
     */
    public void logAuth(String userId, String action, boolean success,
                       String ip, Map<String, Object> metadata) {
        Map<String, Object> logEntry = entryFrom(metadata);
        logEntry.put("category", Category.AUTHENTICATION);
        logEntry.put("userId", userId);
        logEntry.put("action", action);
        logEntry.put("success", success);
        logEntry.put("ip", ip);
        logEntry.put("timestamp", Instant.now().toString());

        log(logEntry);
    }

    /**
     * Records a data access event.
     *
     * @param userId   identifier of the accessing user
     * @param dataType kind of data accessed
     * @param recordId identifier of the accessed record
     * @param action   action performed on the record
     * @param ip       client address
     */
    public void logDataAccess(String userId, String dataType, String recordId,
                             String action, String ip) {
        Map<String, Object> logEntry = new HashMap<>();
        logEntry.put("category", Category.DATA_ACCESS);
        logEntry.put("userId", userId);
        logEntry.put("dataType", dataType);
        logEntry.put("recordId", recordId);
        logEntry.put("action", action);
        logEntry.put("ip", ip);
        logEntry.put("timestamp", Instant.now().toString());

        log(logEntry);
    }

    /**
     * Records a security event.
     *
     * @param eventType   type of event
     * @param severity    severity of the event
     * @param description human-readable description
     * @param metadata    extra fields to include; may be {@code null}
     */
    public void logSecurityEvent(String eventType, Severity severity,
                                String description, Map<String, Object> metadata) {
        Map<String, Object> logEntry = entryFrom(metadata);
        logEntry.put("category", Category.SECURITY_EVENT);
        logEntry.put("eventType", eventType);
        logEntry.put("severity", severity);
        logEntry.put("description", description);
        logEntry.put("timestamp", Instant.now().toString());

        log(logEntry);
    }

    /**
     * Starts a new record pre-filled with the caller's metadata. Core fields
     * are put afterwards by the caller and therefore take precedence.
     */
    private static Map<String, Object> entryFrom(Map<String, Object> metadata) {
        Map<String, Object> logEntry = new HashMap<>();
        if (metadata != null) {
            logEntry.putAll(metadata);
        }
        return logEntry;
    }

    /**
     * Serializes the record to JSON and writes it at INFO level; a
     * serialization or logging failure is reported at ERROR level instead of
     * being propagated to the caller.
     */
    private void log(Map<String, Object> logEntry) {
        try {
            String json = objectMapper.writeValueAsString(logEntry);
            logger.info(json);

            // Send to SIEM/Elasticsearch
            // siemClient.send(logEntry);
        } catch (Exception e) {
            logger.error("Failed to log audit event", e);
        }
    }
}

// Spring Boot Filter
/**
 * Servlet filter that writes one audit record per HTTP request with the
 * method, URI, response status, duration and User-Agent.
 */
@Component
public class AuditFilter extends OncePerRequestFilter {

    @Autowired
    private AuditLogger auditLogger;

    /**
     * Runs the rest of the filter chain, then logs the request. The record is
     * written in a {@code finally} block so requests whose handling throws
     * are audited as well.
     */
    @Override
    protected void doFilterInternal(HttpServletRequest request,
                                   HttpServletResponse response,
                                   FilterChain filterChain)
            throws ServletException, IOException {

        long startTime = System.currentTimeMillis();

        try {
            filterChain.doFilter(request, response);
        } finally {
            long duration = System.currentTimeMillis() - startTime;

            // The security context holds no Authentication for requests that
            // were never authenticated; fall back instead of throwing an NPE.
            String userId =
                SecurityContextHolder.getContext().getAuthentication() != null
                    ? SecurityContextHolder.getContext().getAuthentication().getName()
                    : "anonymous";

            Map<String, Object> metadata = new HashMap<>();
            metadata.put("method", request.getMethod());
            metadata.put("endpoint", request.getRequestURI());
            // NOTE(review): when the chain threw, the status read here may not
            // be the one finally sent to the client — confirm for your container.
            metadata.put("statusCode", response.getStatus());
            metadata.put("duration", duration);
            metadata.put("userAgent", request.getHeader("User-Agent"));

            // NOTE(review): API requests are recorded through logAuth, i.e. as
            // AUTHENTICATION events with success=true regardless of status.
            auditLogger.logAuth(userId, "api_request", true,
                               request.getRemoteAddr(), metadata);
        }
    }
}

最佳实践

✅ 要做

  • 记录所有安全事件
  • 使用结构化日志记录
  • 包含时间戳(UTC)
  • 记录用户上下文
  • 实施日志保留
  • 加密敏感日志
  • 监控日志完整性
  • 发送到SIEM
  • 包含请求ID

❌ 不要做

  • 记录密码/密钥
  • 无谓地记录敏感PII
  • 跳过对失败尝试的记录
  • 允许日志篡改
  • 不安全地存储日志
  • 忽略日志分析

要记录的事件

  • 认证:登录、登出、密码更改
  • 授权:访问授权/拒绝
  • 数据访问:CRUD操作
  • 配置:系统变更
  • 安全事件:可疑活动
  • 管理员操作:特权操作

日志保留

  • SOC 2:标准未规定固定期限,通常建议至少保留1年
  • HIPAA:6年
  • PCI-DSS:至少保留1年,其中最近3个月须可立即调阅分析
  • GDPR:保留时间不得超过实现处理目的所必需的期限

资源