name: logging-observability
description: 生产系统中全面的日志记录和可观测性模式,包括结构化日志记录、分布式追踪、指标收集、日志聚合和告警。触发此技能的词汇 - 日志、记录、跟踪、追踪、指标、可观测性、OpenTelemetry、OTEL、Jaeger、Zipkin、结构化日志、日志级别、调试、信息、警告、错误、致命、关联ID、跨度、ELK、Elasticsearch、Loki、Datadog、Prometheus、Grafana、分布式追踪、日志聚合、告警、监控、JSON日志、遥测。
日志记录与可观测性
概述
可观测性通过日志、指标和追踪来理解系统行为。此技能提供以下模式:
- 结构化日志记录:包含关联ID和上下文数据的JSON日志
- 分布式追踪:跨服务的基于跨度的请求追踪(OpenTelemetry、Jaeger、Zipkin)
- 指标收集:计数器、仪表、直方图用于系统健康(Prometheus模式)
- 日志聚合:集中式日志管理(ELK、Loki、Datadog)
- 告警:基于症状的告警与操作手册
指令
1. 结构化日志记录(JSON日志)
Python实现
import json
import logging
import sys
from contextvars import ContextVar
from datetime import datetime, timezone
from typing import Any
# Context variables that carry request-tracing IDs into every log record
correlation_id: ContextVar[str] = ContextVar('correlation_id', default='')
span_id: ContextVar[str] = ContextVar('span_id', default='')


class StructuredFormatter(logging.Formatter):
    """Render each log record as a single-line JSON object.

    Emitted keys: ``timestamp`` (UTC, ISO-8601 with a ``Z`` suffix), ``level``,
    ``logger``, ``message``, ``correlation_id`` and ``span_id`` (read from the
    context variables above), ``exception`` when the record carries exc_info,
    plus every key of the optional ``structured_data`` dict passed through
    ``extra=`` (those keys are applied last and may override the ones above).
    """

    def format(self, record: logging.LogRecord) -> str:
        log_data = {
            # record.created is when the event was logged. Formatting may happen
            # later (e.g. behind a QueueHandler), so "now" would be wrong.
            # timespec pins a fixed width; plain isoformat() drops the
            # fractional part whenever microseconds happen to be 0.
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc)
            .isoformat(timespec="microseconds")
            .replace("+00:00", "Z"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "correlation_id": correlation_id.get(),
            "span_id": span_id.get(),
        }
        # Attach the formatted traceback when exception info is present
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)
        # Merge caller-supplied fields; a missing or None payload is ignored
        structured = getattr(record, "structured_data", None)
        if structured:
            log_data.update(structured)
        # ensure_ascii=False keeps non-ASCII (e.g. Chinese) messages readable.
        # default=str stringifies values json cannot encode (datetime, UUID,
        # Decimal, ...) instead of raising inside the logging call.
        return json.dumps(log_data, ensure_ascii=False, default=str)
def setup_logging(level: int = logging.INFO) -> None:
    """Send root-logger output to stdout as one JSON object per line.

    Safe to call more than once. If a stdout handler using StructuredFormatter
    is already attached, it is reused rather than adding a second one, which
    would emit every record twice.

    Args:
        level: Threshold for the root logger; defaults to ``logging.INFO``.
    """
    root_logger = logging.getLogger()
    root_logger.setLevel(level)
    for existing in root_logger.handlers:
        if (
            isinstance(existing.formatter, StructuredFormatter)
            and getattr(existing, "stream", None) is sys.stdout
        ):
            return
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(StructuredFormatter())
    root_logger.addHandler(handler)
# Usage: configure logging once at application startup, then log from any
# module. Without this call the root logger keeps its default WARNING level,
# so the INFO record below would be discarded.
setup_logging()
logger = logging.getLogger(__name__)
logger.info("用户登录", extra={
    "structured_data": {
        "user_id": "123",
        "ip_address": "192.168.1.1",
        "action": "login",
    }
})
TypeScript实现
/** Key/value context attached to every entry a logger emits. */
interface LogContext {
  correlationId?: string;
  spanId?: string;
  [key: string]: unknown;
}

/** Shape of one emitted JSON log line. */
interface LogEntry {
  timestamp: string;
  level: string;
  message: string;
  context: LogContext;
}

/**
 * Structured logger that emits one JSON object per call.
 *
 * - Child loggers from withContext() merge their context over the parent's
 *   and write to the same sink.
 * - Error values are serialized as {name, message, stack}; a plain
 *   JSON.stringify would emit "{}" for them.
 * - BigInt values are serialized as strings.
 * - A payload that still cannot be serialized (e.g. a circular reference)
 *   degrades to a minimal entry instead of throwing from the log call.
 */
class StructuredLogger {
  private context: LogContext = {};
  private readonly sink: (line: string) => void;

  /** @param sink Receives each serialized line; defaults to console.log. */
  constructor(sink: (line: string) => void = (line) => console.log(line)) {
    this.sink = sink;
  }

  withContext(context: LogContext): StructuredLogger {
    const child = new StructuredLogger(this.sink);
    child.context = { ...this.context, ...context };
    return child;
  }

  private log(
    level: string,
    message: string,
    data?: Record<string, unknown>,
  ): void {
    const entry: LogEntry = {
      timestamp: new Date().toISOString(),
      level,
      message,
      context: { ...this.context, ...data },
    };
    this.sink(StructuredLogger.serialize(entry));
  }

  /** JSON.stringify replacer for values JSON cannot represent natively. */
  private static replacer(_key: string, value: unknown): unknown {
    if (value instanceof Error) {
      return { name: value.name, message: value.message, stack: value.stack };
    }
    if (typeof value === "bigint") {
      return value.toString();
    }
    return value;
  }

  private static serialize(entry: LogEntry): string {
    try {
      return JSON.stringify(entry, StructuredLogger.replacer);
    } catch (err) {
      // A logging call must never take the caller down; keep the essentials.
      return JSON.stringify({
        timestamp: entry.timestamp,
        level: entry.level,
        message: entry.message,
        serializationError: String(err),
      });
    }
  }

  debug(message: string, data?: Record<string, unknown>): void {
    this.log("DEBUG", message, data);
  }

  info(message: string, data?: Record<string, unknown>): void {
    this.log("INFO", message, data);
  }

  warn(message: string, data?: Record<string, unknown>): void {
    this.log("WARN", message, data);
  }

  error(message: string, data?: Record<string, unknown>): void {
    this.log("ERROR", message, data);
  }
}
2. 日志级别及何时使用每个级别
| 级别 | 用法 | 示例 |
|---|---|---|
| TRACE | 细粒度调试 | 循环迭代、变量值 |
| DEBUG | 诊断信息 | 函数入口/出口、中间状态 |
| INFO | 正常操作 | 请求开始、作业完成、用户操作 |
| WARN | 潜在问题 | 已弃用API使用、重试尝试、慢查询 |
| ERROR | 需要关注的失败 | 异常捕获、操作失败 |
| FATAL | 关键失败 | 系统无法继续、数据损坏 |
# Log-level usage examples (item, order and e stand for values in the caller's scope)
logger.debug("处理项", extra={"structured_data": {"item_id": item.id}})
logger.info("订单处理成功", extra={"structured_data": {"order_id": order.id, "total": order.total}})
logger.warning("接近速率限制", extra={"structured_data": {"current": 95, "limit": 100}})
# Inside an `except ... as e:` block. exc_info=True attaches the traceback,
# which StructuredFormatter writes under "exception"; str(e) alone loses it.
logger.error(
    "支付失败",
    exc_info=True,
    extra={"structured_data": {"order_id": order.id, "error": str(e)}},
)
3. 分布式追踪
关联ID和跨度
import uuid
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Optional
import time
@dataclass
class Span:
    """One timed operation within a trace.

    ``span_id`` is generated as 16 lowercase hex characters, the span-id shape
    used by W3C Trace Context. ``start_time`` and ``end_time`` are
    ``time.time()`` epoch seconds.
    """

    name: str
    trace_id: str
    # uuid4().hex has no hyphens; str(uuid4())[:16] would contain one.
    span_id: str = field(default_factory=lambda: uuid.uuid4().hex[:16])
    parent_span_id: Optional[str] = None
    start_time: float = field(default_factory=time.time)
    end_time: Optional[float] = None
    attributes: dict = field(default_factory=dict)

    def end(self):
        """Record the finish time as now."""
        self.end_time = time.time()

    @property
    def duration_ms(self) -> float:
        """Elapsed milliseconds, or 0.0 while the span has not ended."""
        # Compare against None: an end_time of 0.0 is a real (if odd) value.
        if self.end_time is None:
            return 0.0
        return (self.end_time - self.start_time) * 1000


# Span active in the current execution context (None outside any span)
current_span: ContextVar[Optional[Span]] = ContextVar('current_span', default=None)
class Tracer:
    """Minimal in-process tracer that exports finished spans as log records.

    A span started without an explicit parent becomes a child of the span in
    ``current_span``. Ending a span restores whatever span was current when it
    started, so consecutive sibling spans share the same parent.
    """

    def __init__(self, service_name: str):
        self.service_name = service_name
        # span_id -> span that was current when that span started. Lets
        # end_span restore it; without this, a later sibling would be
        # parented to the previous, already-finished span.
        self._previous: dict = {}

    def start_span(self, name: str, parent: Optional[Span] = None) -> Span:
        """Create a span, make it the current span, and return it.

        Args:
            name: Operation name.
            parent: Explicit parent; defaults to the context's current span.
        """
        previous = current_span.get()
        if parent is None:
            parent = previous
        if parent is not None:
            trace_id, parent_span_id = parent.trace_id, parent.span_id
        else:
            # New root span: 32 lowercase hex chars (no hyphens)
            trace_id, parent_span_id = uuid.uuid4().hex, None
        span = Span(
            name=name,
            trace_id=trace_id,
            parent_span_id=parent_span_id,
            attributes={"service": self.service_name},
        )
        self._previous[span.span_id] = previous
        current_span.set(span)
        return span

    def end_span(self, span: Span):
        """Finish *span*, restore the previously current span, and export it."""
        span.end()
        previous = self._previous.pop(span.span_id, None)
        # Only restore when this span is still current, so ending an outer
        # span out of order does not clobber a still-open inner span.
        if current_span.get() is span:
            current_span.set(previous)
        self._export(span)

    def _export(self, span: Span):
        """Export a finished span; this implementation logs it as a structured record."""
        logger.info("跨度完成: %s", span.name, extra={
            "structured_data": {
                "trace_id": span.trace_id,
                "span_id": span.span_id,
                "parent_span_id": span.parent_span_id,
                "duration_ms": span.duration_ms,
                "attributes": span.attributes,
            }
        })
# 跨度的上下文管理器
from contextlib import contextmanager
@contextmanager
def trace_span(tracer: Tracer, name: str):
    """Run the body of a ``with`` statement inside a span started on *tracer*.

    Yields the span so the body can attach attributes. If the body raises, the
    span is tagged with ``error`` and ``error.message`` and the exception
    propagates. The span is ended (and exported) in every case.
    """
    active = tracer.start_span(name)
    try:
        yield active
    except Exception as exc:
        active.attributes.update({"error": True, "error.message": str(exc)})
        raise
    finally:
        tracer.end_span(active)
# Usage. The service name matches the Datadog `service: "order-service"` tag so
# logs, traces and metrics for this service can be correlated.
tracer = Tracer("order-service")


async def process_order(order_id: str):
    """Trace an order as a root span with validation and payment as child spans."""
    with trace_span(tracer, "process_order") as root:
        root.attributes["order_id"] = order_id
        with trace_span(tracer, "validate_order"):
            await validate(order_id)
        with trace_span(tracer, "charge_payment"):
            await charge(order_id)
4. 指标收集
from dataclasses import dataclass
from typing import Dict, List
from enum import Enum
import time
import threading
class MetricType(Enum):
    """Kinds of metric; each value is the lowercase Prometheus type name."""

    COUNTER = "counter"      # only ever increases
    GAUGE = "gauge"          # set, raised or lowered freely
    HISTOGRAM = "histogram"  # bucketed distribution of observations
@dataclass
class Counter:
    """Monotonically increasing metric (Prometheus counter semantics)."""

    name: str
    labels: Dict[str, str]
    value: float = 0

    def inc(self, amount: float = 1):
        """Increase the counter by *amount*.

        Raises:
            ValueError: if *amount* is negative. A counter may only go up;
                use a Gauge for values that can decrease.
        """
        if amount < 0:
            raise ValueError(f"Counter {self.name!r} can only increase; got {amount}")
        self.value += amount
@dataclass
class Gauge:
    """Point-in-time metric that can be set directly or moved up and down."""

    name: str
    labels: Dict[str, str]
    value: float = 0

    def set(self, value: float):
        """Replace the current reading."""
        self.value = value

    def inc(self, amount: float = 1):
        """Raise the reading by *amount*."""
        self.set(self.value + amount)

    def dec(self, amount: float = 1):
        """Lower the reading by *amount*."""
        self.inc(-amount)
@dataclass
class Histogram:
    """Cumulative-bucket histogram (Prometheus ``le`` semantics).

    Every bucket counts observations ``<= bound``; an implicit ``+Inf`` bucket
    counts all of them. ``_sum`` and ``_count`` hold the running total and the
    number of observations.
    """

    name: str
    labels: Dict[str, str]
    buckets: List[float]
    # Raw observations (None -> empty list). NOTE: this grows without bound;
    # a production histogram should keep only the aggregates.
    values: List[float] = None

    def __post_init__(self):
        # Values passed to the constructor are replayed through observe() so
        # the bucket counts, sum and count agree with them (previously they
        # were silently discarded).
        initial = list(self.values or [])
        self.values = []
        self._bucket_counts = {b: 0 for b in self.buckets}
        self._bucket_counts[float('inf')] = 0
        # Sort the upper bounds once here instead of on every observation
        self._bounds = sorted(self._bucket_counts)
        self._sum = 0
        self._count = 0
        for value in initial:
            self.observe(value)

    def observe(self, value: float):
        """Record one observation in every bucket whose bound is >= *value*."""
        self.values.append(value)
        self._sum += value
        self._count += 1
        for bound in self._bounds:
            if value <= bound:
                self._bucket_counts[bound] += 1
class MetricsRegistry:
    """Thread-safe get-or-create registry for Counter, Gauge and Histogram series.

    A series is identified by its name plus its label set. Label order does not
    matter, and ``None`` is the same as ``{}``.
    """

    def __init__(self):
        self._metrics: Dict[tuple, object] = {}
        self._lock = threading.Lock()

    @staticmethod
    def _key(name: str, labels: Dict[str, str] = None) -> tuple:
        """Order-independent identity for one (name, label set) series."""
        # f"{name}:{labels}" depended on dict insertion order and told None apart from {}
        return (name, tuple(sorted((labels or {}).items())))

    def _get_or_create(self, kind: type, name: str, labels: Dict[str, str], factory):
        """Return the existing series for (name, labels), creating it with *factory* if absent.

        Raises:
            TypeError: if the series already exists as a different metric kind.
        """
        key = self._key(name, labels)
        with self._lock:
            metric = self._metrics.get(key)
            if metric is None:
                metric = self._metrics[key] = factory()
            elif not isinstance(metric, kind):
                raise TypeError(
                    f"metric {name!r} with labels {labels or {}} is already registered "
                    f"as {type(metric).__name__}, not {kind.__name__}"
                )
            return metric

    def counter(self, name: str, labels: Dict[str, str] = None) -> Counter:
        # Copy the labels so later mutation of the caller's dict cannot alter the series
        return self._get_or_create(
            Counter, name, labels, lambda: Counter(name, dict(labels or {}))
        )

    def gauge(self, name: str, labels: Dict[str, str] = None) -> Gauge:
        return self._get_or_create(
            Gauge, name, labels, lambda: Gauge(name, dict(labels or {}))
        )

    def histogram(self, name: str, buckets: List[float], labels: Dict[str, str] = None) -> Histogram:
        # buckets only matter on first creation; later calls reuse the existing series
        return self._get_or_create(
            Histogram, name, labels, lambda: Histogram(name, dict(labels or {}), buckets)
        )
# Usage
metrics = MetricsRegistry()

# Request counter
request_counter = metrics.counter("http_requests_total", {"method": "GET", "path": "/api/orders"})
request_counter.inc()

# Active-connections gauge
active_connections = metrics.gauge("active_connections")
active_connections.inc()
try:
    ...  # handle the connection
finally:
    # Always release; otherwise the gauge drifts upward whenever handling fails
    active_connections.dec()

# Request-duration histogram
request_duration = metrics.histogram(
    "http_request_duration_seconds",
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0],
)
# perf_counter is monotonic. Wall-clock time.time() can be adjusted (NTP,
# manual changes) mid-request, giving negative or inflated durations.
start = time.perf_counter()
...  # handle the request
request_duration.observe(time.perf_counter() - start)
5. OpenTelemetry模式
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
def setup_opentelemetry(service_name: str, otlp_endpoint: str):
    """Initialise OpenTelemetry tracing and metrics with OTLP/gRPC export.

    Args:
        service_name: Value of the ``service.name`` resource attribute, also
            used as the tracer/meter name.
        otlp_endpoint: Collector address, e.g. ``http://otel-collector:4317``.

    Returns:
        A ``(tracer, meter)`` pair for *service_name*.
    """
    # Imported here so the function is self-contained
    from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
    from opentelemetry.sdk.resources import Resource

    # One resource shared by both providers so traces and metrics carry
    # identical service attributes
    resource = Resource.create({"service.name": service_name})

    # Tracing
    trace_provider = TracerProvider(resource=resource)
    trace_provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
    )
    trace.set_tracer_provider(trace_provider)

    # Metrics. A MeterProvider exports nothing unless a reader is attached;
    # the periodic reader pushes to the OTLP metric exporter.
    metric_reader = PeriodicExportingMetricReader(
        OTLPMetricExporter(endpoint=otlp_endpoint)
    )
    metric_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
    metrics.set_meter_provider(metric_provider)

    # Auto-instrument outgoing `requests` calls
    RequestsInstrumentor().instrument()

    return trace.get_tracer(service_name), metrics.get_meter(service_name)
# 与FastAPI一起使用
from fastapi import FastAPI
app = FastAPI()
# Configure the global providers first, then instrument, so the FastAPI
# instrumentation is set up against the configured providers. The service name
# matches the Datadog `service` tag used elsewhere for cross-tool correlation.
tracer, meter = setup_opentelemetry("order-service", "http://otel-collector:4317")
FastAPIInstrumentor.instrument_app(app)


# Custom span
@app.get("/orders/{order_id}")
async def get_order(order_id: str):
    with tracer.start_as_current_span("fetch_order") as span:
        span.set_attribute("order.id", order_id)
        order = await order_repository.get(order_id)
        span.set_attribute("order.status", order.status)
        return order
6. 日志聚合模式
ELK堆栈(Elasticsearch、Logstash、Kibana)
# Logstash pipeline configuration
input {
  file {
    path => "/var/log/app/*.log"
    # Default plain codec: each line stays a raw JSON string in [message] for
    # the json filter below. Adding codec => json as well would decode the line
    # here, leaving the json filter to fail on the already-decoded message.
  }
}

filter {
  # Parse the structured JSON log line into top-level event fields
  json {
    source => "message"
  }

  # Use the application's own timestamp as @timestamp so the daily index below
  # reflects when the event happened, not when Logstash ingested it
  date {
    match => ["timestamp", "ISO8601"]
  }

  # Elasticsearch index name derived from the event date
  mutate {
    add_field => {
      "[@metadata][index]" => "app-logs-%{+YYYY.MM.dd}"
    }
  }

  # Enrich with geolocation only when an IP address is present
  if [ip_address] {
    geoip {
      source => "ip_address"
      target => "geo"
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[@metadata][index]}"
  }
}
Grafana Loki
# Promtail scrape configuration
scrape_configs:
  - job_name: app-logs
    static_configs:
      - targets:
          - localhost
        labels:
          job: app-logs
          __path__: /var/log/app/*.log
    # Extract JSON fields from each log line
    pipeline_stages:
      - json:
          expressions:
            level: level
            correlation_id: correlation_id
            service: service
      # Promote only low-cardinality fields to Loki labels. correlation_id is
      # unique per request, so as a label it would create one stream per
      # request. Filter on it at query time instead, e.g.
      #   {job="app-logs"} | json | correlation_id="<id>"
      - labels:
          level:
          service:
Datadog代理配置
# datadog.yaml — Agent-wide settings
logs_enabled: true
logs_config:
  processing_rules:
    - type: exclude_at_match
      name: exclude_healthcheck
      pattern: "GET /health"
  # Aggregates multi-line entries such as stack traces. It does not parse JSON;
  # Datadog parses JSON-formatted log lines automatically.
  auto_multi_line_detection: true

# conf.d/python.d/conf.yaml — collect logs from files
# (log sources are declared in a conf.d/<source>.d/conf.yaml file, not in datadog.yaml)
logs:
  - type: file
    path: "/var/log/app/*.log"
    service: "order-service"
    source: "python"
    tags:
      - "env:production"
7. 告警设计
Prometheus告警规则
# Prometheus alerting rules
groups:
  - name: service-alerts
    rules:
      # High error rate, evaluated per service (job) so one failing service is
      # not diluted by healthy traffic from the others
      - alert: HighErrorRate
        expr: |
          sum by (job) (rate(http_requests_total{status=~"5.."}[5m]))
            / sum by (job) (rate(http_requests_total[5m])) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "检测到高错误率"
          description: "{{ $labels.job }} 过去5分钟内错误率为 {{ $value | humanizePercentage }}"
          runbook_url: "https://wiki.example.com/runbooks/high-error-rate"
      # High p95 latency, per service
      - alert: HighLatency
        expr: |
          histogram_quantile(0.95, sum by (job, le) (rate(http_request_duration_seconds_bucket[5m]))) > 1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "检测到高延迟"
          description: "{{ $labels.job }} 95th百分位数延迟为 {{ $value }}s"
          runbook_url: "https://wiki.example.com/runbooks/high-latency"
      # Service down
      - alert: ServiceDown
        expr: up == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "服务 {{ $labels.instance }} 已下线"
          description: "{{ $labels.job }} 已下线超过1分钟"
          runbook_url: "https://wiki.example.com/runbooks/service-down"
告警严重级别
| 级别 | 响应时间 | 示例 |
|---|---|---|
| Critical | 立即 | 服务下线、高错误率、数据丢失 |
| Warning | 工作时间 | 高延迟、接近限制、重试峰值 |
| Info | 仅记录 | 部署开始、配置更改 |
最佳实践
日志记录
- 适当级别的日志记录:开发用DEBUG,正常操作用INFO,潜在问题用WARN,失败用ERROR,关键失败用FATAL。
- 包含上下文:始终在结构化字段中包含关联ID、追踪ID、用户ID和相关业务标识符。
- 避免敏感数据:永远不要记录密码、令牌、信用卡或PII。必要时实施自动脱敏。
- 使用结构化日志记录:JSON日志便于在日志聚合系统(ELK、Loki、Datadog)中解析和查询。
- 一致的字段命名:跨服务标准化字段名称(例如,始终使用 `correlation_id`,不要有时用 `request_id`)。
分布式追踪
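- 追踪边界:在服务边界、数据库调用、外部API调用和重要操作处创建跨度。
- 传播上下文:通过HTTP头部跨服务边界传递追踪ID和跨度ID(遵循W3C Trace Context的 `traceparent` 头部,这是OpenTelemetry的默认传播格式)。
- 添加有意义的属性:在跨度属性中包含业务上下文(user_id、order_id)和技术上下文(db_query、cache_hit)。
- 适当采样:使用自适应采样 - 追踪100%的错误,根据流量量采样成功请求。保留全部错误追踪需要尾部采样(tail-based sampling,例如在OpenTelemetry Collector中进行),因为头部采样在请求开始时还无法知道该请求是否会失败。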
指标
- 跟踪黄金信号:监控四个黄金信号 - 延迟、流量、错误、饱和度。
- 使用正确的指标类型:计数器用于总数(请求),仪表用于当前值(内存),直方图用于分布(延迟)。
- 标签基数:保持标签基数低 - 避免在指标标签中使用高基数值,如用户ID或原始URL路径(应使用路由模板,如 `/orders/{order_id}`)。
- 命名约定:遵循Prometheus命名 - `http_requests_total`(计数器)、`process_memory_bytes`(仪表)、`http_request_duration_seconds`(直方图)。
告警
- 基于症状告警:对影响用户的问题告警(错误率、延迟),而不是对原因告警(CPU使用率)。症状指示什么坏了,原因解释为什么坏了。
- 包含操作手册:每个告警必须链接到操作手册,包含调查步骤、常见原因和修复程序。
- 使用适当的阈值:基于SLO和历史数据设置阈值,而非任意值。
- 告警疲劳:确保告警可操作。不可操作的告警会导致告警疲劳,进而导致关键问题被忽略。
集成
- 端到端关联:使用关联ID链接日志、追踪和指标,以启用跨系统调试。
- 集中化:使用集中式日志聚合(ELK、Loki)和追踪收集(Jaeger、Zipkin)以获得跨服务可见性。
- 测试可观测性:在开发中验证日志记录、追踪和指标 - 不要等到生产中才发现缺口。
示例
完整的请求日志记录中间件
import time
import uuid
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
class ObservabilityMiddleware(BaseHTTPMiddleware):
    """Per-request correlation ID, tracing span and request metrics.

    Constructor args:
        app: The ASGI app to wrap.
        tracer: An OpenTelemetry-style tracer exposing ``start_as_current_span``.
        metrics: A registry exposing ``counter(name, labels)`` and
            ``histogram(name, buckets=..., labels=...)``, i.e. the
            MetricsRegistry interface (TODO confirm if another backend is used).

    Every request, including one whose handler raises (recorded as status
    "500"), increments ``http_requests_total`` and observes
    ``http_request_duration_seconds`` under method/path/status labels.
    """

    DURATION_BUCKETS = (0.01, 0.05, 0.1, 0.5, 1.0, 5.0)
    MAX_CORRELATION_ID_LENGTH = 128

    def __init__(self, app, tracer, metrics):
        super().__init__(app)
        self.tracer = tracer
        # Metrics are resolved per label set at record time. The registry's
        # Counter/Histogram objects have no prometheus_client-style .labels().
        self.metrics = metrics

    @classmethod
    def _correlation_id_from(cls, request: Request) -> str:
        """Reuse the caller's X-Correlation-ID when it looks safe, else mint a new one."""
        # The header is untrusted input that is echoed into logs and the
        # response, so accept only short ASCII IDs made of [A-Za-z0-9._-].
        incoming = request.headers.get("X-Correlation-ID", "")
        if (
            0 < len(incoming) <= cls.MAX_CORRELATION_ID_LENGTH
            and incoming.isascii()
            and all(ch.isalnum() or ch in "-_." for ch in incoming)
        ):
            return incoming
        return str(uuid.uuid4())

    @staticmethod
    def _path_label(request: Request) -> str:
        """Route template (e.g. /orders/{order_id}) if known, else the raw path."""
        # Assumes the router stored the matched route in scope["route"] (FastAPI
        # does this). TODO confirm for your Starlette/FastAPI version. Raw paths
        # with IDs in them make the label high-cardinality.
        route = request.scope.get("route")
        return getattr(route, "path", None) or request.url.path

    def _record(self, request: Request, status: str, elapsed: float) -> None:
        """Count the request and observe its duration under one label set."""
        labels = {
            "method": request.method,
            "path": self._path_label(request),
            "status": status,
        }
        self.metrics.counter("http_requests_total", labels).inc()
        self.metrics.histogram(
            "http_request_duration_seconds",
            buckets=list(self.DURATION_BUCKETS),
            labels=labels,
        ).observe(elapsed)

    async def dispatch(self, request: Request, call_next):
        corr_id = self._correlation_id_from(request)
        token = correlation_id.set(corr_id)
        # Monotonic clock for durations
        start_time = time.perf_counter()
        # Reported if call_next raises, so unhandled errors count as 5xx
        status = "500"
        try:
            with self.tracer.start_as_current_span(
                f"{request.method} {request.url.path}"
            ) as span:
                span.set_attribute("http.method", request.method)
                span.set_attribute("http.url", str(request.url))
                span.set_attribute("correlation_id", corr_id)
                try:
                    response = await call_next(request)
                except Exception:
                    # start_as_current_span records the exception event and
                    # sets ERROR status itself when the exception propagates,
                    # so only the custom flag is added here.
                    span.set_attribute("error", True)
                    raise
                status = str(response.status_code)
                span.set_attribute("http.status_code", response.status_code)
                # Echo the correlation ID back to the caller
                response.headers["X-Correlation-ID"] = corr_id
                return response
        finally:
            self._record(request, status, time.perf_counter() - start_time)
            correlation_id.reset(token)