Log Aggregation
Overview
Build a comprehensive log aggregation system that collects, parses, and analyzes logs from multiple sources, enabling centralized monitoring, debugging, and compliance auditing.
When to Use
- Centralized log collection
- Debugging distributed systems
- Compliance and audit logging
- Security event monitoring
- Application performance analysis
- Error tracking and alerting
- Historical log retention
- Real-time log search
Implementation Examples
1. ELK Stack Configuration
# docker-compose.yml - ELK stack setup
version: '3.8'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.5.0
environment:
- discovery.type=single-node
- xpack.security.enabled=false
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ports:
- "9200:9200"
volumes:
- elasticsearch_data:/usr/share/elasticsearch/data
healthcheck:
test: curl -s http://localhost:9200 >/dev/null || exit 1
interval: 10s
timeout: 5s
retries: 5
logstash:
image: docker.elastic.co/logstash/logstash:8.5.0
volumes:
- ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
ports:
- "5000:5000"
- "9600:9600"
depends_on:
- elasticsearch
environment:
- "LS_JAVA_OPTS=-Xmx256m -Xms256m"
kibana:
image: docker.elastic.co/kibana/kibana:8.5.0
ports:
- "5601:5601"
environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
depends_on:
- elasticsearch
filebeat:
image: docker.elastic.co/beats/filebeat:8.5.0
volumes:
- ./filebeat.yml:/usr/share/filebeat/filebeat.yml
- /var/lib/docker/containers:/var/lib/docker/containers:ro
- /var/run/docker.sock:/var/run/docker.sock:ro
command: filebeat -e -strict.perms=false
depends_on:
- elasticsearch
volumes:
elasticsearch_data:
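Assuming Docker Compose v2 is installed, the stack can be brought up and smoke-tested like this (the health endpoint matches the compose file above):
# Start the stack and verify Elasticsearch responds
docker compose up -d
curl -s http://localhost:9200/_cluster/health?pretty
# Kibana UI becomes available at http://localhost:5601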
2. Logstash Pipeline Configuration
# logstash.conf
input {
  # Receive JSON logs over TCP
tcp {
port => 5000
codec => json
}
  # Read from log files
file {
path => "/var/log/app/*.log"
start_position => "beginning"
codec => multiline {
pattern => "^%{TIMESTAMP_ISO8601}"
negate => true
what => "previous"
}
}
  # Receive logs shipped by Filebeat; Logstash has no built-in
  # "kubernetes" input plugin, so Kubernetes logs should be collected
  # by Filebeat (see section 3) and forwarded here
  beats {
    port => 5044
  }
}
filter {
  # Parse JSON logs
json {
source => "message"
target => "parsed"
}
  # Extract fields with grok; overwrite "message" with the payload
  # portion rather than appending a second value to it
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] %{GREEDYDATA:message}"
    }
    overwrite => ["message"]
  }
  # Set @timestamp from the parsed timestamp
date {
match => ["timestamp", "ISO8601"]
target => "@timestamp"
}
  # Add metadata
mutate {
add_field => {
"environment" => "production"
"datacenter" => "us-east-1"
}
remove_field => ["host"]
}
  # Drop debug logs in production
if [level] == "DEBUG" {
drop { }
}
  # Tag errors
if [level] =~ /ERROR|FATAL/ {
mutate {
add_tag => ["error"]
}
}
}
output {
  # Send to Elasticsearch; document_type is omitted because mapping
  # types were removed in Elasticsearch 7+
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logs-%{+YYYY.MM.dd}"
  }
  # Also print errors to the console
if "error" in [tags] {
stdout {
codec => rubydebug
}
}
}
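With the pipeline running, a quick end-to-end test is to push a JSON line into the TCP input and query Elasticsearch for it; the "demo" service name is just an example:
# Send a test event and look it up in today's index
# (some netcat variants need "nc -q 1" to close the connection)
echo '{"level":"ERROR","message":"test error","service":"demo"}' | nc localhost 5000
curl -s 'http://localhost:9200/logs-*/_search?q=service:demo&pretty'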
3. Filebeat Configuration
# filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/app/*.log
fields:
app: myapp
environment: production
multiline.pattern: '^\['
multiline.negate: true
multiline.match: after
# Collect container logs; the older "docker" input type is deprecated,
# and hints-based autodiscover belongs under filebeat.autodiscover
# rather than filebeat.inputs
- type: container
  enabled: true
  paths:
    - /var/lib/docker/containers/*/*.log
- type: log
enabled: true
paths:
- /var/log/syslog
- /var/log/auth.log
fields:
service: system
environment: production
processors:
- add_docker_metadata:
host: "unix:///var/run/docker.sock"
- add_kubernetes_metadata:
    # in-cluster configuration is auto-detected when running on Kubernetes
- add_host_metadata:
- add_fields:
target: ''
fields:
environment: production
output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  index: "filebeat-%{+yyyy.MM.dd}"
# A custom index name requires an explicit template name/pattern,
# and ILM must be disabled or it will override the index setting
setup.template.name: "filebeat"
setup.template.pattern: "filebeat-*"
setup.ilm.enabled: false
logging.level: info
logging.to_files: true
logging.files:
path: /var/log/filebeat
name: filebeat
keepfiles: 7
permissions: 0640
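Filebeat can validate this file before shipping anything; both subcommands below are built in:
# Check the config syntax and confirm Elasticsearch is reachable
filebeat test config -c filebeat.yml
filebeat test output -c filebeat.yml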
4. Kibana Dashboards and Alerts
The JSON below sketches what to visualize and alert on; it is illustrative rather than the literal Kibana saved-object schema, which is considerably more verbose.
{
"dashboard": {
"title": "应用日志概览",
"panels": [
{
"title": "服务错误率",
"query": "level: ERROR",
"visualization": "bar_chart",
"groupBy": ["service"],
"timeRange": "1h"
},
{
"title": "十大错误信息",
"query": "level: ERROR",
"visualization": "table",
"fields": ["message", "count"],
"sort": [{"count": "desc"}],
"size": 10
},
{
"title": "请求延迟分布",
"query": "duration: *",
"visualization": "histogram"
},
{
"title": "错误随时间变化",
"query": "level: ERROR",
"visualization": "line_chart",
"dateHistogram": "1m"
}
]
},
"alerts": [
{
"name": "高错误率",
"query": "level: ERROR",
"threshold": 100,
"window": "5m",
"action": "slack"
},
{
"name": "关键异常",
"query": "level: FATAL",
"threshold": 1,
"window": "1m",
"action": "email"
}
]
}
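The error-rate panel above boils down to a terms aggregation. The equivalent raw Elasticsearch query can be run with curl; the "service" field (and its service.keyword sub-field) is an assumption about what your applications log, not something the pipeline above guarantees:
# Count ERROR events per service over the last hour
# (assumes each event carries a "service" field)
curl -s 'http://localhost:9200/logs-*/_search' \
  -H 'Content-Type: application/json' \
  -d '{
    "size": 0,
    "query": {
      "bool": {
        "must": [{ "match": { "level": "ERROR" } }],
        "filter": [{ "range": { "@timestamp": { "gte": "now-1h" } } }]
      }
    },
    "aggs": {
      "errors_per_service": {
        "terms": { "field": "service.keyword", "size": 10 }
      }
    }
  }'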
5. Loki Configuration (Kubernetes)
# loki-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: loki-config
namespace: logging
data:
loki-config.yaml: |
auth_enabled: false
ingester:
chunk_idle_period: 3m
chunk_retain_period: 1m
max_chunk_age: 1h
chunk_encoding: snappy
chunk_target_size: 1048576
limits_config:
enforce_metric_name: false
reject_old_samples: true
reject_old_samples_max_age: 168h
schema_config:
configs:
- from: 2020-05-15
store: boltdb-shipper
object_store: filesystem
schema: v11
index:
prefix: index_
period: 24h
server:
http_listen_port: 3100
storage_config:
boltdb_shipper:
active_index_directory: /loki/index
cache_location: /loki/cache
shared_store: filesystem
filesystem:
directory: /loki/chunks
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: loki
namespace: logging
spec:
replicas: 1
selector:
matchLabels:
app: loki
template:
metadata:
labels:
app: loki
spec:
containers:
- name: loki
image: grafana/loki:2.8.0
ports:
- containerPort: 3100
volumeMounts:
- name: loki-config
mountPath: /etc/loki
- name: loki-storage
mountPath: /loki
args:
- -config.file=/etc/loki/loki-config.yaml
volumes:
- name: loki-config
configMap:
name: loki-config
- name: loki-storage
emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
name: loki
namespace: logging
spec:
selector:
app: loki
ports:
- port: 3100
targetPort: 3100
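Loki only stores what an agent ships to it. A minimal Promtail scrape configuration might look like the sketch below; it assumes Promtail runs as a DaemonSet with the node's /var/log/pods mounted read-only, and the push URL matches the Service defined above:
# promtail-config.yaml - minimal sketch (assumes a Promtail DaemonSet
# with /var/log/pods mounted from the host)
server:
  http_listen_port: 9080
positions:
  filename: /run/promtail/positions.yaml
clients:
  - url: http://loki.logging.svc.cluster.local:3100/loki/api/v1/push
scrape_configs:
  - job_name: kubernetes-pods
    kubernetes_sd_configs:
      - role: pod
    pipeline_stages:
      # Parse the CRI log line format used by containerd/CRI-O
      - cri: {}
    relabel_configs:
      - source_labels: [__meta_kubernetes_namespace]
        target_label: namespace
      - source_labels: [__meta_kubernetes_pod_name]
        target_label: pod
      - source_labels: [__meta_kubernetes_pod_container_name]
        target_label: container
      # Point Promtail at each container's log files on the node
      - source_labels: [__meta_kubernetes_pod_uid, __meta_kubernetes_pod_container_name]
        separator: /
        target_label: __path__
        replacement: /var/log/pods/*$1/*.log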
6. Log Aggregation Deployment Script
#!/bin/bash
# deploy-logging.sh - deploy the logging infrastructure
set -euo pipefail
NAMESPACE="logging"
ENV="${1:-production}"
echo "Deploying logging stack to $ENV..."
# Create the namespace (idempotent)
kubectl create namespace "$NAMESPACE" --dry-run=client -o yaml | kubectl apply -f -
# Deploy Elasticsearch
echo "Deploying Elasticsearch..."
kubectl apply -f elasticsearch-deployment.yaml -n "$NAMESPACE"
kubectl rollout status deployment/elasticsearch -n "$NAMESPACE" --timeout=5m
# Deploy Logstash
echo "Deploying Logstash..."
kubectl apply -f logstash-deployment.yaml -n "$NAMESPACE"
kubectl rollout status deployment/logstash -n "$NAMESPACE" --timeout=5m
# Deploy Kibana
echo "Deploying Kibana..."
kubectl apply -f kibana-deployment.yaml -n "$NAMESPACE"
kubectl rollout status deployment/kibana -n "$NAMESPACE" --timeout=5m
# Deploy Filebeat as a DaemonSet
echo "Deploying Filebeat..."
kubectl apply -f filebeat-daemonset.yaml -n "$NAMESPACE"
# Wait for all pods
echo "Waiting for all logging services..."
kubectl wait --for=condition=ready pod -l app=elasticsearch -n "$NAMESPACE" --timeout=300s
# Create the default index pattern (no -it flag: the script runs without a TTY)
echo "Setting up Kibana index pattern..."
kubectl exec -n "$NAMESPACE" svc/kibana -- curl -s -X POST \
http://localhost:5601/api/saved_objects/index-pattern/logs \
-H 'kbn-xsrf: true' \
-H 'Content-Type: application/json' \
-d '{"attributes":{"title":"logs-*","timeFieldName":"@timestamp"}}'
echo "Logging stack deployed successfully!"
echo "Kibana: http://localhost:5601"
Best Practices
✅ Do
- Parse and structure log data (see the example event after this list)
- Use appropriate log levels
- Add contextual information
- Implement log retention policies
- Set up log-based alerting
- Index important fields
- Use a consistent timestamp format
- Implement access controls
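For reference, a well-structured log event might look like the JSON below; every field name here is illustrative rather than a required schema:
{
  "@timestamp": "2024-01-15T12:00:00.000Z",
  "level": "ERROR",
  "service": "checkout",
  "environment": "production",
  "trace_id": "9f86d081884c7d65",
  "message": "payment provider timed out",
  "duration_ms": 5003
}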
❌ Don't
- Store sensitive data in logs
- Log at DEBUG level in production
- Ship raw, unstructured logs
- Ignore storage costs
- Skip log parsing
- Leave the logging system itself unmonitored
- Retain logs forever
- Log PII without encryption