Logging and Observability Skill: logging-observability

This skill provides comprehensive logging and observability patterns for production systems, including structured logging, distributed tracing, metrics collection, log aggregation, and alerting. It is intended for development and operations teams to support system health monitoring, troubleshooting, and performance optimization. Keywords: logging, observability, distributed tracing, metrics monitoring, log aggregation, alerting, OpenTelemetry, Prometheus, ELK, DevOps.


name: logging-observability description: Comprehensive logging and observability patterns for production systems, including structured logging, distributed tracing, metrics collection, log aggregation, and alerting. Words that trigger this skill - log, logging, trace, tracing, metrics, observability, OpenTelemetry, OTEL, Jaeger, Zipkin, structured logging, log levels, debug, info, warn, error, fatal, correlation ID, span, ELK, Elasticsearch, Loki, Datadog, Prometheus, Grafana, distributed tracing, log aggregation, alerting, monitoring, JSON logging, telemetry.

Logging and Observability

Overview

Observability means understanding system behavior through logs, metrics, and traces. This skill provides the following patterns:

  • Structured logging: JSON logs with correlation IDs and contextual data
  • Distributed tracing: span-based request tracing across services (OpenTelemetry, Jaeger, Zipkin)
  • Metrics collection: counters, gauges, and histograms for system health (Prometheus patterns)
  • Log aggregation: centralized log management (ELK, Loki, Datadog)
  • Alerting: symptom-based alerts with runbooks

Instructions

1. Structured Logging (JSON Logs)

Python Implementation

import json
import logging
import sys
from datetime import datetime, timezone
from contextvars import ContextVar
from typing import Any

# Context variables for request tracing
correlation_id: ContextVar[str] = ContextVar('correlation_id', default='')
span_id: ContextVar[str] = ContextVar('span_id', default='')

class StructuredFormatter(logging.Formatter):
    """结构化日志记录的JSON格式化器。"""

    def format(self, record: logging.LogRecord) -> str:
        log_data = {
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "correlation_id": correlation_id.get(),
            "span_id": span_id.get(),
        }

        # Include exception info if present
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)

        # Merge extra structured fields
        if hasattr(record, 'structured_data'):
            log_data.update(record.structured_data)

        return json.dumps(log_data)

def setup_logging():
    """配置结构化日志记录。"""
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(StructuredFormatter())

    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    root_logger.addHandler(handler)

# Usage
setup_logging()
logger = logging.getLogger(__name__)
logger.info("User logged in", extra={
    "structured_data": {
        "user_id": "123",
        "ip_address": "192.168.1.1",
        "action": "login"
    }
})

TypeScript Implementation

interface LogContext {
  correlationId?: string;
  spanId?: string;
  [key: string]: unknown;
}

interface LogEntry {
  timestamp: string;
  level: string;
  message: string;
  context: LogContext;
}

class StructuredLogger {
  private context: LogContext = {};

  withContext(context: LogContext): StructuredLogger {
    const child = new StructuredLogger();
    child.context = { ...this.context, ...context };
    return child;
  }

  private log(
    level: string,
    message: string,
    data?: Record<string, unknown>,
  ): void {
    const entry: LogEntry = {
      timestamp: new Date().toISOString(),
      level,
      message,
      context: { ...this.context, ...data },
    };
    console.log(JSON.stringify(entry));
  }

  debug(message: string, data?: Record<string, unknown>): void {
    this.log("DEBUG", message, data);
  }

  info(message: string, data?: Record<string, unknown>): void {
    this.log("INFO", message, data);
  }

  warn(message: string, data?: Record<string, unknown>): void {
    this.log("WARN", message, data);
  }

  error(message: string, data?: Record<string, unknown>): void {
    this.log("ERROR", message, data);
  }
}

2. Log Levels and When to Use Each

Level  Usage                        Examples
TRACE  Fine-grained debugging       Loop iterations, variable values
DEBUG  Diagnostic information       Function entry/exit, intermediate state
INFO   Normal operations            Request started, job completed, user actions
WARN   Potential problems           Deprecated API usage, retry attempts, slow queries
ERROR  Failures needing attention   Caught exceptions, failed operations
FATAL  Critical failures            System cannot continue, data corruption

# Examples of log level usage
logger.debug("Processing item", extra={"structured_data": {"item_id": item.id}})
logger.info("Order processed successfully", extra={"structured_data": {"order_id": order.id, "total": order.total}})
logger.warning("Approaching rate limit", extra={"structured_data": {"current": 95, "limit": 100}})
logger.error("Payment failed", extra={"structured_data": {"order_id": order.id, "error": str(e)}})

3. Distributed Tracing

Correlation IDs and Spans

import uuid
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Optional
import time

@dataclass
class Span:
    name: str
    trace_id: str
    span_id: str = field(default_factory=lambda: uuid.uuid4().hex[:16])
    parent_span_id: Optional[str] = None
    start_time: float = field(default_factory=time.time)
    end_time: Optional[float] = None
    attributes: dict = field(default_factory=dict)

    def end(self):
        self.end_time = time.time()

    @property
    def duration_ms(self) -> float:
        if self.end_time:
            return (self.end_time - self.start_time) * 1000
        return 0

current_span: ContextVar[Optional[Span]] = ContextVar('current_span', default=None)

class Tracer:
    def __init__(self, service_name: str):
        self.service_name = service_name

    def start_span(self, name: str, parent: Optional[Span] = None) -> Span:
        parent = parent or current_span.get()
        trace_id = parent.trace_id if parent else uuid.uuid4().hex
        parent_span_id = parent.span_id if parent else None

        span = Span(
            name=name,
            trace_id=trace_id,
            parent_span_id=parent_span_id,
            attributes={"service": self.service_name}
        )
        current_span.set(span)
        return span

    def end_span(self, span: Span):
        span.end()
        self._export(span)
        # In production, restore the parent span here (use a span stack)

    def _export(self, span: Span):
        """将跨度导出到追踪后端。"""
        logger.info(f"跨度完成: {span.name}", extra={
            "structured_data": {
                "trace_id": span.trace_id,
                "span_id": span.span_id,
                "parent_span_id": span.parent_span_id,
                "duration_ms": span.duration_ms,
                "attributes": span.attributes
            }
        })

# Context manager for spans
from contextlib import contextmanager

@contextmanager
def trace_span(tracer: Tracer, name: str):
    span = tracer.start_span(name)
    try:
        yield span
    except Exception as e:
        span.attributes["error"] = True
        span.attributes["error.message"] = str(e)
        raise
    finally:
        tracer.end_span(span)

# Usage
tracer = Tracer("order-service")

async def process_order(order_id: str):
    with trace_span(tracer, "process_order") as span:
        span.attributes["order_id"] = order_id

        with trace_span(tracer, "validate_order"):
            await validate(order_id)

        with trace_span(tracer, "charge_payment"):
            await charge(order_id)

4. Metrics Collection

from dataclasses import dataclass
from typing import Any, Dict, List
from enum import Enum
import time
import threading

class MetricType(Enum):
    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"

@dataclass
class Counter:
    name: str
    labels: Dict[str, str]
    value: float = 0

    def inc(self, amount: float = 1):
        self.value += amount

@dataclass
class Gauge:
    name: str
    labels: Dict[str, str]
    value: float = 0

    def set(self, value: float):
        self.value = value

    def inc(self, amount: float = 1):
        self.value += amount

    def dec(self, amount: float = 1):
        self.value -= amount

@dataclass
class Histogram:
    name: str
    labels: Dict[str, str]
    buckets: List[float]
    values: List[float] = None

    def __post_init__(self):
        self.values = []
        self._bucket_counts = {b: 0 for b in self.buckets}
        self._bucket_counts[float('inf')] = 0
        self._sum = 0
        self._count = 0

    def observe(self, value: float):
        self.values.append(value)
        self._sum += value
        self._count += 1
        for bucket in sorted(self._bucket_counts.keys()):
            if value <= bucket:
                self._bucket_counts[bucket] += 1

class MetricsRegistry:
    def __init__(self):
        self._metrics: Dict[str, Any] = {}
        self._lock = threading.Lock()

    def counter(self, name: str, labels: Dict[str, str] = None) -> Counter:
        key = f"{name}:{labels}"
        with self._lock:
            if key not in self._metrics:
                self._metrics[key] = Counter(name, labels or {})
            return self._metrics[key]

    def gauge(self, name: str, labels: Dict[str, str] = None) -> Gauge:
        key = f"{name}:{labels}"
        with self._lock:
            if key not in self._metrics:
                self._metrics[key] = Gauge(name, labels or {})
            return self._metrics[key]

    def histogram(self, name: str, buckets: List[float], labels: Dict[str, str] = None) -> Histogram:
        key = f"{name}:{labels}"
        with self._lock:
            if key not in self._metrics:
                self._metrics[key] = Histogram(name, labels or {}, buckets)
            return self._metrics[key]

# Usage
metrics = MetricsRegistry()

# Request counter
request_counter = metrics.counter("http_requests_total", {"method": "GET", "path": "/api/orders"})
request_counter.inc()

# Active connections gauge
active_connections = metrics.gauge("active_connections")
active_connections.inc()
# ... handle connection ...
active_connections.dec()

# Request duration histogram
request_duration = metrics.histogram(
    "http_request_duration_seconds",
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
)

start = time.time()
# ... handle request ...
request_duration.observe(time.time() - start)
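
The registry above collects values but has no exporter. A minimal sketch of rendering it in the Prometheus text exposition format (the render_prometheus helper is illustrative, not part of any client library):

def render_prometheus(registry: MetricsRegistry) -> str:
    """Render registered metrics in the Prometheus text format (sketch)."""
    lines = []
    for metric in registry._metrics.values():  # private access; fine for a sketch
        label_str = ",".join(f'{k}="{v}"' for k, v in metric.labels.items())
        suffix = f"{{{label_str}}}" if label_str else ""
        if isinstance(metric, (Counter, Gauge)):
            lines.append(f"{metric.name}{suffix} {metric.value}")
        elif isinstance(metric, Histogram):
            # Cumulative buckets; labels omitted from bucket lines for brevity
            for bucket in sorted(metric._bucket_counts):
                le = "+Inf" if bucket == float("inf") else bucket
                lines.append(f'{metric.name}_bucket{{le="{le}"}} {metric._bucket_counts[bucket]}')
            lines.append(f"{metric.name}_sum {metric._sum}")
            lines.append(f"{metric.name}_count {metric._count}")
    return "\n".join(lines)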

5. OpenTelemetry Patterns

from opentelemetry import trace, metrics
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

def setup_opentelemetry(service_name: str, otlp_endpoint: str):
    """Initialize OpenTelemetry with OTLP export."""

    # Trace setup
    trace_provider = TracerProvider(
        resource=Resource.create({"service.name": service_name})
    )
    trace_provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
    )
    trace.set_tracer_provider(trace_provider)

    # Metrics setup (wire the OTLP exporter through a periodic reader)
    metric_provider = MeterProvider(
        resource=Resource.create({"service.name": service_name}),
        metric_readers=[
            PeriodicExportingMetricReader(OTLPMetricExporter(endpoint=otlp_endpoint))
        ],
    )
    metrics.set_meter_provider(metric_provider)

    # Auto-instrumentation
    RequestsInstrumentor().instrument()

    return trace.get_tracer(service_name), metrics.get_meter(service_name)

# Use with FastAPI
from fastapi import FastAPI

app = FastAPI()
FastAPIInstrumentor.instrument_app(app)

tracer, meter = setup_opentelemetry("order-service", "http://otel-collector:4317")

# Custom spans
@app.get("/orders/{order_id}")
async def get_order(order_id: str):
    with tracer.start_as_current_span("fetch_order") as span:
        span.set_attribute("order.id", order_id)
        order = await order_repository.get(order_id)
        span.set_attribute("order.status", order.status)
        return order
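
The meter returned by setup_opentelemetry is unused above; a brief sketch of recording custom metrics through the OpenTelemetry metrics API (instrument names are illustrative):

# Create instruments once at module scope, record values per operation
order_counter = meter.create_counter(
    "orders_processed_total",
    description="Total number of processed orders",
)
order_latency = meter.create_histogram(
    "order_processing_duration_seconds",
    description="Order processing latency",
)

order_counter.add(1, attributes={"status": "success"})
order_latency.record(0.42, attributes={"status": "success"})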

6. Log Aggregation Patterns

ELK Stack (Elasticsearch, Logstash, Kibana)

# Logstash pipeline configuration
input {
  file {
    path => "/var/log/app/*.log"
    codec => json
  }
}

filter {
  # Parse structured JSON logs
  json {
    source => "message"
  }

  # Add a date-based Elasticsearch index
  mutate {
    add_field => {
      "[@metadata][index]" => "app-logs-%{+YYYY.MM.dd}"
    }
  }

  # Enrich with GeoIP when an IP address is present
  geoip {
    source => "ip_address"
    target => "geo"
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[@metadata][index]}"
  }
}

Grafana Loki

# Promtail scrape configuration
scrape_configs:
  - job_name: app-logs
    static_configs:
      - targets:
          - localhost
        labels:
          job: app-logs
          __path__: /var/log/app/*.log

    # Extract JSON fields as labels
    pipeline_stages:
      - json:
          expressions:
            level: level
            correlation_id: correlation_id
            service: service
      - labels:
          level:
          correlation_id:
          service:

Datadog Agent Configuration

# datadog.yaml
logs_enabled: true

logs_config:
  processing_rules:
    - type: exclude_at_match
      name: exclude_healthcheck
      pattern: "GET /health"

  # Detect multi-line log entries automatically
  auto_multi_line_detection: true

# Collect logs from files
logs:
  - type: file
    path: "/var/log/app/*.log"
    service: "order-service"
    source: "python"
    tags:
      - "env:production"

7. Alert Design

Prometheus Alerting Rules

# Prometheus alerting rules
groups:
  - name: service-alerts
    rules:
      # High error rate alert
      - alert: HighErrorRate
        expr: |
          sum(rate(http_requests_total{status=~"5.."}[5m]))
          / sum(rate(http_requests_total[5m])) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "检测到高错误率"
          description: "过去5分钟内错误率为 {{ $value | humanizePercentage }}"
          runbook_url: "https://wiki.example.com/runbooks/high-error-rate"

      # High latency alert
      - alert: HighLatency
        expr: |
          histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket[5m])) by (le)) > 1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "检测到高延迟"
          description: "95th百分位数延迟为 {{ $value }}s"

      # Service down alert
      - alert: ServiceDown
        expr: up == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "服务 {{ $labels.instance }} 已下线"
          description: "{{ $labels.job }} 已下线超过1分钟"

Alert Severity Levels

Level     Response Time   Examples
Critical  Immediate       Service down, high error rate, data loss
Warning   Business hours  High latency, approaching limits, retry spikes
Info      Log only        Deployment started, configuration changes

Best Practices

Logging

  1. Log at the appropriate level: DEBUG for development, INFO for normal operations, WARN for potential problems, ERROR for failures, FATAL for critical failures.

  2. Include context: always include correlation IDs, trace IDs, user IDs, and relevant business identifiers in structured fields.

  3. Avoid sensitive data: never log passwords, tokens, credit cards, or PII. Implement automatic redaction where necessary (see the sketch after this list).

  4. Use structured logging: JSON logs are easy to parse and query in log aggregation systems (ELK, Loki, Datadog).

  5. Name fields consistently: standardize field names across services (e.g., always correlation_id, never sometimes request_id).
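
A minimal sketch of the automatic redaction from item 3, implemented as a logging.Filter on top of the section 1 setup (the SENSITIVE_KEYS set and filter name are illustrative):

import logging

# Field names to mask; extend this to match your own schema (illustrative)
SENSITIVE_KEYS = {"password", "token", "credit_card", "ssn"}

class RedactionFilter(logging.Filter):
    """Mask sensitive values in structured_data before formatting."""

    def filter(self, record: logging.LogRecord) -> bool:
        data = getattr(record, "structured_data", None)
        if isinstance(data, dict):
            record.structured_data = {
                k: "[REDACTED]" if k.lower() in SENSITIVE_KEYS else v
                for k, v in data.items()
            }
        return True  # never drop the record, only scrub it

# Attach to the handler configured by setup_logging()
logging.getLogger().handlers[0].addFilter(RedactionFilter())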

Distributed Tracing

  1. Trace at boundaries: create spans at service boundaries, database calls, external API calls, and significant operations.

  2. Propagate context: pass trace IDs and span IDs across service boundaries via HTTP headers, following the OpenTelemetry standard (see the sketch after this list).

  3. Add meaningful attributes: include business context (user_id, order_id) and technical context (db_query, cache_hit) in span attributes.

  4. Sample appropriately: use adaptive sampling - trace 100% of errors and sample successful requests according to traffic volume.
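
A minimal sketch of the header propagation from item 2, using the Span and Tracer from section 3 (in production, OpenTelemetry's propagators handle this; the helper names are illustrative):

from typing import Dict, Optional

def inject_trace_context(span: Span, headers: Dict[str, str]) -> None:
    """Write the current span's context into outgoing HTTP headers."""
    # W3C traceparent format: version-traceid-spanid-flags
    headers["traceparent"] = f"00-{span.trace_id}-{span.span_id}-01"

def extract_trace_context(headers: Dict[str, str]) -> Optional[Span]:
    """Rebuild a parent span stub from an incoming traceparent header."""
    value = headers.get("traceparent")
    if not value:
        return None
    try:
        _version, trace_id, span_id, _flags = value.split("-")
    except ValueError:
        return None  # malformed header: start a fresh trace instead
    return Span(name="remote-parent", trace_id=trace_id, span_id=span_id)

# Server side: continue the caller's trace when a header was sent
incoming = {"traceparent": f"00-{'a' * 32}-{'b' * 16}-01"}  # example value
parent = extract_trace_context(incoming)
span = tracer.start_span("handle_request", parent=parent)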

Metrics

  1. Track the golden signals: monitor the four golden signals - latency, traffic, errors, saturation (see the sketch after this list).

  2. Use the right metric type: counters for totals (requests), gauges for current values (memory), histograms for distributions (latency).

  3. Watch label cardinality: keep label cardinality low - avoid high-cardinality values such as user IDs in metric labels.

  4. Follow naming conventions: use Prometheus naming - http_requests_total (counter), process_memory_bytes (gauge), http_request_duration_seconds (histogram).
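
A brief sketch of golden-signal instruments from item 1, using the prometheus_client library; metric names follow the conventions above, and the in-flight gauge is one possible saturation measure:

from prometheus_client import Counter, Gauge, Histogram

# Traffic and errors: one counter labeled by status class (low cardinality)
REQUESTS = Counter(
    "http_requests_total", "Total HTTP requests", ["method", "status"]
)

# Latency: histogram with buckets spanning expected response times
LATENCY = Histogram(
    "http_request_duration_seconds", "HTTP request latency",
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0],
)

# Saturation: a current value, e.g. requests being handled right now
IN_FLIGHT = Gauge("http_requests_in_flight", "Requests currently in flight")

REQUESTS.labels(method="GET", status="200").inc()
LATENCY.observe(0.12)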

Alerting

  1. Alert on symptoms: alert on user-facing problems (error rate, latency), not on causes (CPU usage). Symptoms show what is broken; causes explain why.

  2. Include runbooks: every alert must link to a runbook with investigation steps, common causes, and remediation procedures.

  3. Use appropriate thresholds: base thresholds on SLOs and historical data, not arbitrary values.

  4. Avoid alert fatigue: make every alert actionable. Non-actionable alerts cause fatigue, and critical issues get ignored.

Integration

  1. Correlate end to end: use correlation IDs to link logs, traces, and metrics, enabling cross-system debugging.

  2. Centralize: use centralized log aggregation (ELK, Loki) and trace collection (Jaeger, Zipkin) for cross-service visibility.

  3. Test your observability: verify logging, tracing, and metrics in development - don't discover gaps in production.

Example

Complete Request Observability Middleware

import time
import uuid
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware

class ObservabilityMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, tracer, metrics):
        super().__init__(app)
        self.tracer = tracer    # an OpenTelemetry tracer (section 5)
        self.metrics = metrics  # the MetricsRegistry from section 4

    async def dispatch(self, request: Request, call_next):
        # Extract or generate a correlation ID (ContextVar from section 1)
        corr_id = request.headers.get("X-Correlation-ID", str(uuid.uuid4()))
        correlation_id.set(corr_id)

        start_time = time.time()

        with self.tracer.start_as_current_span(
            f"{request.method} {request.url.path}"
        ) as span:
            span.set_attribute("http.method", request.method)
            span.set_attribute("http.url", str(request.url))
            span.set_attribute("correlation_id", corr_id)

            try:
                response = await call_next(request)

                span.set_attribute("http.status_code", response.status_code)

                # Record metrics (look up labeled instruments in the registry)
                labels = {
                    "method": request.method,
                    "path": request.url.path,
                    "status": str(response.status_code)
                }
                self.metrics.counter("http_requests_total", labels).inc()
                self.metrics.histogram(
                    "http_request_duration_seconds",
                    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0],
                    labels=labels,
                ).observe(time.time() - start_time)

                # Add the correlation ID to the response
                response.headers["X-Correlation-ID"] = corr_id

                return response

            except Exception as e:
                span.set_attribute("error", True)
                span.record_exception(e)
                raise
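
Wiring the middleware into an application, assuming the pieces built in the earlier sections (a sketch, not mandated boilerplate):

setup_logging()  # JSON logging from section 1

app = FastAPI()
metrics = MetricsRegistry()  # registry from section 4
tracer, meter = setup_opentelemetry("order-service", "http://otel-collector:4317")

# Starlette passes the extra kwargs through to the middleware constructor
app.add_middleware(ObservabilityMiddleware, tracer=tracer, metrics=metrics)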