Python可观测性Skill python-observability

Python可观测性技能用于为Python应用程序实施可观测性模式,包括结构化日志记录、指标收集和分布式跟踪,帮助在生产环境中快速诊断问题、监控性能并确保系统可靠性。关键词:Python、可观测性、日志记录、指标、跟踪、生产调试、监控、Prometheus、OpenTelemetry。

DevOps 0 次安装 0 次浏览 更新于 3/22/2026

name: python-observability description: Python可观测性模式,包括结构化日志记录、指标和分布式跟踪。在添加日志记录、实现指标收集、设置跟踪或调试生产系统时使用。

Python可观测性

为Python应用程序添加结构化日志、指标和跟踪。当生产环境中出现问题时,无需部署新代码即可回答"什么、在哪里、为什么"。

何时使用此技能

  • 为应用程序添加结构化日志记录
  • 使用Prometheus实现指标收集
  • 跨服务设置分布式跟踪
  • 通过请求链传播关联ID
  • 调试生产问题
  • 构建可观测性仪表板

核心概念

1. 结构化日志记录

以JSON格式发出日志,包含生产环境中的一致字段。机器可读的日志支持强大的查询和警报。对于本地开发,考虑使用人类可读的格式。

2. 四个黄金信号

跟踪每个服务边界的延迟、流量、错误和饱和度。

3. 关联ID

为单个请求在所有日志和跨度中传递唯一ID,实现端到端跟踪。

4. 有限基数

保持指标标签值有限。无界标签(如用户ID)会爆炸存储成本。

快速开始

import structlog

structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
)

logger = structlog.get_logger()
logger.info("请求已处理", user_id="123", duration_ms=45)

基础模式

模式1:使用Structlog进行结构化日志记录

配置structlog以JSON输出,具有一致字段。

import logging
import structlog

def configure_logging(log_level: str = "INFO") -> None:
    """为应用程序配置结构化日志记录。"""
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.make_filtering_bound_logger(
            getattr(logging, log_level.upper())
        ),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )

# 在应用程序启动时初始化
configure_logging("INFO")
logger = structlog.get_logger()

模式2:一致日志字段

每个日志条目应包含用于过滤和关联的标准字段。

import structlog
from contextvars import ContextVar

# 在上下文中存储关联ID
correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")

logger = structlog.get_logger()

def process_request(request: Request) -> Response:
    """使用结构化日志记录处理请求。"""
    logger.info(
        "请求已接收",
        correlation_id=correlation_id.get(),
        method=request.method,
        path=request.path,
        user_id=request.user_id,
    )

    try:
        result = handle_request(request)
        logger.info(
            "请求已完成",
            correlation_id=correlation_id.get(),
            status_code=200,
            duration_ms=elapsed,
        )
        return result
    except Exception as e:
        logger.error(
            "请求失败",
            correlation_id=correlation_id.get(),
            error_type=type(e).__name__,
            error_message=str(e),
        )
        raise

模式3:语义日志级别

在整个应用程序中一致使用日志级别。

级别 目的 示例
DEBUG 开发诊断 变量值、内部状态
INFO 请求生命周期、操作 请求开始/结束、作业完成
WARNING 可恢复的异常 重试尝试、使用回退
ERROR 需要关注的失败 异常、服务不可用
# DEBUG: 详细的内部信息
logger.debug("缓存查找", key=cache_key, hit=cache_hit)

# INFO: 正常的操作事件
logger.info("订单已创建", order_id=order.id, total=order.total)

# WARNING: 异常但已处理的情况
logger.warning(
    "速率限制接近",
    current_rate=950,
    limit=1000,
    reset_seconds=30,
)

# ERROR: 需要调查的失败
logger.error(
    "支付处理失败",
    order_id=order.id,
    error=str(e),
    payment_provider="stripe",
)

永远不要在ERROR级别记录预期行为。用户输入错误密码是INFO,而不是ERROR

模式4:关联ID传播

在入口处生成唯一ID,并通过所有操作传递它。

from contextvars import ContextVar
import uuid
import structlog

correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")

def set_correlation_id(cid: str | None = None) -> str:
    """为当前上下文设置关联ID。"""
    cid = cid or str(uuid.uuid4())
    correlation_id.set(cid)
    structlog.contextvars.bind_contextvars(correlation_id=cid)
    return cid

# FastAPI中间件示例
from fastapi import Request

async def correlation_middleware(request: Request, call_next):
    """中间件用于设置和传播关联ID。"""
    # 使用传入的头部或生成新的
    cid = request.headers.get("X-Correlation-ID") or str(uuid.uuid4())
    set_correlation_id(cid)

    response = await call_next(request)
    response.headers["X-Correlation-ID"] = cid
    return response

传播到出站请求:

import httpx

async def call_downstream_service(endpoint: str, data: dict) -> dict:
    """使用关联ID调用下游服务。"""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            endpoint,
            json=data,
            headers={"X-Correlation-ID": correlation_id.get()},
        )
        return response.json()

高级模式

模式5:使用Prometheus的四个黄金信号

跟踪每个服务边界的这些指标:

from prometheus_client import Counter, Histogram, Gauge

# 延迟:请求需要多长时间
REQUEST_LATENCY = Histogram(
    "http_request_duration_seconds",
    "以秒为单位的请求延迟",
    ["method", "endpoint", "status"],
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10],
)

# 流量:请求率
REQUEST_COUNT = Counter(
    "http_requests_total",
    "总HTTP请求数",
    ["method", "endpoint", "status"],
)

# 错误:错误率
ERROR_COUNT = Counter(
    "http_errors_total",
    "总HTTP错误数",
    ["method", "endpoint", "error_type"],
)

# 饱和度:资源利用率
DB_POOL_USAGE = Gauge(
    "db_connection_pool_used",
    "正在使用的数据库连接数",
)

检测您的端点:

import time
from functools import wraps

def track_request(func):
    """装饰器用于跟踪请求指标。"""
    @wraps(func)
    async def wrapper(request: Request, *args, **kwargs):
        method = request.method
        endpoint = request.url.path
        start = time.perf_counter()

        try:
            response = await func(request, *args, **kwargs)
            status = str(response.status_code)
            return response
        except Exception as e:
            status = "500"
            ERROR_COUNT.labels(
                method=method,
                endpoint=endpoint,
                error_type=type(e).__name__,
            ).inc()
            raise
        finally:
            duration = time.perf_counter() - start
            REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc()
            REQUEST_LATENCY.labels(method=method, endpoint=endpoint, status=status).observe(duration)

    return wrapper

模式6:有限基数

避免使用具有无界值的标签,以防止指标爆炸。

# 错误:用户ID可能有数百万个值
REQUEST_COUNT.labels(method="GET", user_id=user.id)  # 不要这样做!

# 正确:仅使用有界值
REQUEST_COUNT.labels(method="GET", endpoint="/users", status="200")

# 如果需要每个用户的指标,使用不同的方法:
# - 记录user_id并查询日志
# - 使用单独的分析系统
# - 按类型/层级对用户进行分桶
REQUEST_COUNT.labels(
    method="GET",
    endpoint="/users",
    user_tier="premium",  # 有界的值集合
)

模式7:使用上下文管理器的定时操作

为操作创建可重用的定时上下文管理器。

from contextlib import contextmanager
import time
import structlog

logger = structlog.get_logger()

@contextmanager
def timed_operation(name: str, **extra_fields):
    """用于定时和记录操作的上下文管理器。"""
    start = time.perf_counter()
    logger.debug("操作已开始", operation=name, **extra_fields)

    try:
        yield
    except Exception as e:
        elapsed_ms = (time.perf_counter() - start) * 1000
        logger.error(
            "操作失败",
            operation=name,
            duration_ms=round(elapsed_ms, 2),
            error=str(e),
            **extra_fields,
        )
        raise
    else:
        elapsed_ms = (time.perf_counter() - start) * 1000
        logger.info(
            "操作已完成",
            operation=name,
            duration_ms=round(elapsed_ms, 2),
            **extra_fields,
        )

# 使用
with timed_operation("获取用户订单", user_id=user.id):
    orders = await order_repository.get_by_user(user.id)

模式8:OpenTelemetry跟踪

使用OpenTelemetry设置分布式跟踪。

注意: OpenTelemetry正在积极发展。查看官方Python文档以获取最新的API模式和最佳实践。

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

def configure_tracing(service_name: str, otlp_endpoint: str) -> None:
    """配置OpenTelemetry跟踪。"""
    provider = TracerProvider()
    processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)

async def process_order(order_id: str) -> Order:
    """使用跟踪处理订单。"""
    with tracer.start_as_current_span("处理订单") as span:
        span.set_attribute("order.id", order_id)

        with tracer.start_as_current_span("验证订单"):
            validate_order(order_id)

        with tracer.start_as_current_span("支付费用"):
            charge_payment(order_id)

        with tracer.start_as_current_span("发送确认"):
            send_confirmation(order_id)

        return order

最佳实践总结

  1. 使用结构化日志记录 - 具有一致字段的JSON日志
  2. 传播关联ID - 通过所有请求和日志传递
  3. 跟踪四个黄金信号 - 延迟、流量、错误、饱和度
  4. 限制标签基数 - 永远不要使用无界值作为指标标签
  5. 在适当级别记录 - 不要用ERROR喊狼来了
  6. 包含上下文 - 用户ID、请求ID、操作名称在日志中
  7. 使用上下文管理器 - 一致的定时和错误处理
  8. 分离关注点 - 可观测性代码不应污染业务逻辑
  9. 测试您的可观测性 - 在集成测试中验证日志和指标
  10. 设置警报 - 没有警报的指标是无用的