应用日志记录
概述
实现全面的结构化日志记录,包括适当的级别、上下文和集中聚合,以便有效的调试和监控。
何时使用
- 应用程序调试
- 审计跟踪创建
- 性能分析
- 合规要求
- 集中日志聚合
指南
1. Node.js 结构化日志记录与 Winston
// logger.js
// Structured JSON logger for the service. Every record carries a timestamp,
// serialized Error stacks, and default service metadata.
const winston = require('winston');

const logFormat = winston.format.combine(
  winston.format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }),
  winston.format.errors({ stack: true }), // include stack traces for Error objects
  winston.format.json()
);

const logger = winston.createLogger({
  // Level is configurable per environment; defaults to 'info'.
  level: process.env.LOG_LEVEL || 'info',
  format: logFormat,
  defaultMeta: {
    service: 'api-service',
    environment: process.env.NODE_ENV || 'development',
  },
  transports: [
    // Human-readable, colorized output for local consoles.
    new winston.transports.Console({
      format: winston.format.combine(
        winston.format.colorize(),
        winston.format.simple()
      ),
    }),
    // Rotate files so disk usage stays bounded: cap each file at 10 MB and
    // keep at most 5 files per transport (fix: the originals grew without limit).
    new winston.transports.File({
      filename: 'logs/error.log',
      level: 'error',
      maxsize: 10 * 1024 * 1024,
      maxFiles: 5,
    }),
    new winston.transports.File({
      filename: 'logs/combined.log',
      maxsize: 10 * 1024 * 1024,
      maxFiles: 5,
    }),
  ],
});

module.exports = logger;
2. Express HTTP 请求日志记录
// Express middleware: structured HTTP request logging via express-winston.
const express = require('express');
const winston = require('winston'); // fix: referenced below but was never required
const expressWinston = require('express-winston');
const crypto = require('crypto');
const logger = require('./logger');

const app = express();

// Log one structured JSON line per HTTP request (method, url, status, timing).
app.use(expressWinston.logger({
  transports: [
    new winston.transports.Console(),
    new winston.transports.File({ filename: 'logs/http.log' }),
  ],
  format: winston.format.combine(
    winston.format.timestamp(),
    winston.format.json()
  ),
  meta: true, // include request/response metadata on each record
  msg: 'HTTP {{req.method}} {{req.url}}',
  // NOTE(review): expressFormat:true makes express-winston use its own
  // morgan-style message and ignore `msg` above — keep one or the other.
  expressFormat: true,
}));

app.get('/api/users/:id', (req, res) => {
  // Propagate the caller's correlation id when present; otherwise mint a
  // collision-resistant one (fix: Math.random() is neither unique nor secure).
  const requestId = req.headers['x-request-id'] || crypto.randomUUID();
  logger.info('用户请求开始', { requestId, userId: req.params.id });
  try {
    const user = { id: req.params.id, name: 'John Doe' };
    logger.debug('用户数据检索', { requestId, user });
    res.json(user);
  } catch (error) {
    // Attach the request id, message and stack so the error log is actionable.
    logger.error('用户检索失败', {
      requestId,
      error: error.message,
      stack: error.stack,
    });
    res.status(500).json({ error: '内部服务器错误' });
  }
});
3. Python 结构化日志记录
# logger_config.py
# Root-logger configuration that emits structured JSON records to stdout.
import logging
import json
from pythonjsonlogger import jsonlogger
import sys


class CustomJsonFormatter(jsonlogger.JsonFormatter):
    """JSON formatter stamping every record with timestamp, service and level."""

    def add_fields(self, log_record, record, message_dict):
        super().add_fields(log_record, record, message_dict)
        log_record['timestamp'] = self.formatTime(record)
        log_record['service'] = 'api-service'
        log_record['level'] = record.levelname


def setup_logging():
    """Configure and return the root logger.

    Idempotent: guards against attaching a second handler when this module
    is imported (or this function called) more than once, which would
    otherwise duplicate every log line.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    # Fix: only attach our stdout handler if one is not already present.
    if not any(isinstance(h, logging.StreamHandler) for h in logger.handlers):
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(CustomJsonFormatter())
        logger.addHandler(console_handler)
    return logger


logger = setup_logging()
4. Flask 集成
# Flask application with per-request structured logging and correlation ids.
from flask import Flask, request, g
import uuid
import time

app = Flask(__name__)


@app.before_request
def before_request():
    """Record the start time and a correlation id for the current request."""
    g.start_time = time.time()
    # Honor an upstream X-Request-ID when present; otherwise generate one.
    g.request_id = request.headers.get('X-Request-ID', str(uuid.uuid4()))


@app.after_request
def after_request(response):
    """Emit one structured access-log line per request, including timing."""
    duration = time.time() - g.start_time
    logger.info('HTTP 请求', extra={
        'method': request.method,
        'path': request.path,
        'status_code': response.status_code,
        'duration_ms': duration * 1000,
        'request_id': g.request_id
    })
    return response


@app.route('/api/orders/<order_id>')
def get_order(order_id):
    """Fetch a single order by id, logging request, result and failures."""
    logger.info('订单请求', extra={
        'order_id': order_id,
        'request_id': g.request_id
    })
    try:
        # Fix: use a parameterized query — the original interpolated user
        # input with an f-string, which is a SQL-injection vulnerability.
        # NOTE(review): placeholder style (%s vs ?) depends on the db driver;
        # confirm against the actual `db` API.
        order = db.query('SELECT * FROM orders WHERE id = %s', (order_id,))
        logger.debug('订单检索', extra={'order_id': order_id})
        return {'order': order}
    except Exception as e:
        logger.error('订单检索失败', extra={
            'order_id': order_id,
            'error': str(e),
            'request_id': g.request_id
        }, exc_info=True)
        return {'error': '内部服务器错误'}, 500
5. ELK 堆栈设置
# docker-compose.yml
# Single-node ELK stack for local log aggregation.
# NOTE: not production-hardened — security is disabled and heap is capped.
version: '3.8'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.0.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false        # dev only — never disable auth in production
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"    # bound the JVM heap for local use
    ports:
      - "9200:9200"                         # Elasticsearch REST API
    volumes:
      - elasticsearch_data:/usr/share/elasticsearch/data   # persist indices across restarts
  logstash:
    image: docker.elastic.co/logstash/logstash:8.0.0
    ports:
      - "5000:5000"                         # TCP JSON input (see logstash.conf)
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
    depends_on:
      - elasticsearch
  kibana:
    image: docker.elastic.co/kibana/kibana:8.0.0
    ports:
      - "5601:5601"                         # Kibana web UI
    environment:
      ELASTICSEARCH_HOSTS: http://elasticsearch:9200
    depends_on:
      - elasticsearch
volumes:
  elasticsearch_data:
6. Logstash 配置
# logstash.conf
# Pipeline: JSON-over-TCP in -> parse application timestamp -> daily index out.
input {
  tcp {
    port => 5000
    codec => json   # expect one JSON document per event
  }
}
filter {
  date {
    # Fix: Joda-time patterns use lowercase 'yyyy' for the calendar year.
    # Uppercase 'YYYY' is the week-based year and silently misparses dates
    # around January 1st.
    match => [ "timestamp", "yyyy-MM-dd HH:mm:ss" ]
    target => "@timestamp"
  }
  mutate {
    # Route each event to a daily index, e.g. logs-2024.05.17.
    add_field => { "[@metadata][index_name]" => "logs-%{+YYYY.MM.dd}" }
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[@metadata][index_name]}"
  }
}
最佳实践
✅ 执行
- 使用结构化 JSON 日志记录
- 包括请求 ID 进行追踪
- 在适当的级别记录
- 为错误日志添加上下文
- 实施日志轮转
- 一致地使用时间戳
- 集中聚合日志
- 过滤敏感数据
❌ 不要
- 记录密码或机密
- 记录每个操作的 INFO
- 使用非结构化消息
- 忽略日志存储限制
- 跳过上下文信息
- 在生产中仅依赖未集中聚合的本地日志(注:在容器化环境中,输出到 stdout 并由日志采集器收集是推荐做法)
- 创建无界日志文件
日志级别
- ERROR: 需要立即关注的应用程序错误
- WARN: 需要调查的潜在问题
- INFO: 重要的应用程序事件
- DEBUG: 详细的诊断信息