name: python-observability description: Python可观测性模式,包括结构化日志记录、指标和分布式跟踪。在添加日志记录、实现指标收集、设置跟踪或调试生产系统时使用。
Python可观测性
为Python应用程序添加结构化日志、指标和跟踪。当生产环境中出现问题时,无需部署新代码即可回答"什么、在哪里、为什么"。
何时使用此技能
- 为应用程序添加结构化日志记录
- 使用Prometheus实现指标收集
- 跨服务设置分布式跟踪
- 通过请求链传播关联ID
- 调试生产问题
- 构建可观测性仪表板
核心概念
1. 结构化日志记录
以JSON格式发出日志,包含生产环境中的一致字段。机器可读的日志支持强大的查询和警报。对于本地开发,考虑使用人类可读的格式。
2. 四个黄金信号
跟踪每个服务边界的延迟、流量、错误和饱和度。
3. 关联ID
为单个请求在所有日志和跨度中传递唯一ID,实现端到端跟踪。
4. 有限基数
保持指标标签值有限。无界标签(如用户ID)会爆炸存储成本。
快速开始
import structlog
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer(),
],
)
logger = structlog.get_logger()
logger.info("请求已处理", user_id="123", duration_ms=45)
基础模式
模式1:使用Structlog进行结构化日志记录
配置structlog以JSON输出,具有一致字段。
import logging
import structlog
def configure_logging(log_level: str = "INFO") -> None:
"""为应用程序配置结构化日志记录。"""
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer(),
],
wrapper_class=structlog.make_filtering_bound_logger(
getattr(logging, log_level.upper())
),
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=True,
)
# 在应用程序启动时初始化
configure_logging("INFO")
logger = structlog.get_logger()
模式2:一致日志字段
每个日志条目应包含用于过滤和关联的标准字段。
import structlog
from contextvars import ContextVar
# 在上下文中存储关联ID
correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")
logger = structlog.get_logger()
def process_request(request: Request) -> Response:
"""使用结构化日志记录处理请求。"""
logger.info(
"请求已接收",
correlation_id=correlation_id.get(),
method=request.method,
path=request.path,
user_id=request.user_id,
)
try:
result = handle_request(request)
logger.info(
"请求已完成",
correlation_id=correlation_id.get(),
status_code=200,
duration_ms=elapsed,
)
return result
except Exception as e:
logger.error(
"请求失败",
correlation_id=correlation_id.get(),
error_type=type(e).__name__,
error_message=str(e),
)
raise
模式3:语义日志级别
在整个应用程序中一致使用日志级别。
| 级别 | 目的 | 示例 |
|---|---|---|
DEBUG |
开发诊断 | 变量值、内部状态 |
INFO |
请求生命周期、操作 | 请求开始/结束、作业完成 |
WARNING |
可恢复的异常 | 重试尝试、使用回退 |
ERROR |
需要关注的失败 | 异常、服务不可用 |
# DEBUG: 详细的内部信息
logger.debug("缓存查找", key=cache_key, hit=cache_hit)
# INFO: 正常的操作事件
logger.info("订单已创建", order_id=order.id, total=order.total)
# WARNING: 异常但已处理的情况
logger.warning(
"速率限制接近",
current_rate=950,
limit=1000,
reset_seconds=30,
)
# ERROR: 需要调查的失败
logger.error(
"支付处理失败",
order_id=order.id,
error=str(e),
payment_provider="stripe",
)
永远不要在ERROR级别记录预期行为。用户输入错误密码是INFO,而不是ERROR。
模式4:关联ID传播
在入口处生成唯一ID,并通过所有操作传递它。
from contextvars import ContextVar
import uuid
import structlog
correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")
def set_correlation_id(cid: str | None = None) -> str:
"""为当前上下文设置关联ID。"""
cid = cid or str(uuid.uuid4())
correlation_id.set(cid)
structlog.contextvars.bind_contextvars(correlation_id=cid)
return cid
# FastAPI中间件示例
from fastapi import Request
async def correlation_middleware(request: Request, call_next):
"""中间件用于设置和传播关联ID。"""
# 使用传入的头部或生成新的
cid = request.headers.get("X-Correlation-ID") or str(uuid.uuid4())
set_correlation_id(cid)
response = await call_next(request)
response.headers["X-Correlation-ID"] = cid
return response
传播到出站请求:
import httpx
async def call_downstream_service(endpoint: str, data: dict) -> dict:
"""使用关联ID调用下游服务。"""
async with httpx.AsyncClient() as client:
response = await client.post(
endpoint,
json=data,
headers={"X-Correlation-ID": correlation_id.get()},
)
return response.json()
高级模式
模式5:使用Prometheus的四个黄金信号
跟踪每个服务边界的这些指标:
from prometheus_client import Counter, Histogram, Gauge
# 延迟:请求需要多长时间
REQUEST_LATENCY = Histogram(
"http_request_duration_seconds",
"以秒为单位的请求延迟",
["method", "endpoint", "status"],
buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10],
)
# 流量:请求率
REQUEST_COUNT = Counter(
"http_requests_total",
"总HTTP请求数",
["method", "endpoint", "status"],
)
# 错误:错误率
ERROR_COUNT = Counter(
"http_errors_total",
"总HTTP错误数",
["method", "endpoint", "error_type"],
)
# 饱和度:资源利用率
DB_POOL_USAGE = Gauge(
"db_connection_pool_used",
"正在使用的数据库连接数",
)
检测您的端点:
import time
from functools import wraps
def track_request(func):
"""装饰器用于跟踪请求指标。"""
@wraps(func)
async def wrapper(request: Request, *args, **kwargs):
method = request.method
endpoint = request.url.path
start = time.perf_counter()
try:
response = await func(request, *args, **kwargs)
status = str(response.status_code)
return response
except Exception as e:
status = "500"
ERROR_COUNT.labels(
method=method,
endpoint=endpoint,
error_type=type(e).__name__,
).inc()
raise
finally:
duration = time.perf_counter() - start
REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc()
REQUEST_LATENCY.labels(method=method, endpoint=endpoint, status=status).observe(duration)
return wrapper
模式6:有限基数
避免使用具有无界值的标签,以防止指标爆炸。
# 错误:用户ID可能有数百万个值
REQUEST_COUNT.labels(method="GET", user_id=user.id) # 不要这样做!
# 正确:仅使用有界值
REQUEST_COUNT.labels(method="GET", endpoint="/users", status="200")
# 如果需要每个用户的指标,使用不同的方法:
# - 记录user_id并查询日志
# - 使用单独的分析系统
# - 按类型/层级对用户进行分桶
REQUEST_COUNT.labels(
method="GET",
endpoint="/users",
user_tier="premium", # 有界的值集合
)
模式7:使用上下文管理器的定时操作
为操作创建可重用的定时上下文管理器。
from contextlib import contextmanager
import time
import structlog
logger = structlog.get_logger()
@contextmanager
def timed_operation(name: str, **extra_fields):
"""用于定时和记录操作的上下文管理器。"""
start = time.perf_counter()
logger.debug("操作已开始", operation=name, **extra_fields)
try:
yield
except Exception as e:
elapsed_ms = (time.perf_counter() - start) * 1000
logger.error(
"操作失败",
operation=name,
duration_ms=round(elapsed_ms, 2),
error=str(e),
**extra_fields,
)
raise
else:
elapsed_ms = (time.perf_counter() - start) * 1000
logger.info(
"操作已完成",
operation=name,
duration_ms=round(elapsed_ms, 2),
**extra_fields,
)
# 使用
with timed_operation("获取用户订单", user_id=user.id):
orders = await order_repository.get_by_user(user.id)
模式8:OpenTelemetry跟踪
使用OpenTelemetry设置分布式跟踪。
注意: OpenTelemetry正在积极发展。查看官方Python文档以获取最新的API模式和最佳实践。
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
def configure_tracing(service_name: str, otlp_endpoint: str) -> None:
"""配置OpenTelemetry跟踪。"""
provider = TracerProvider()
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)
async def process_order(order_id: str) -> Order:
"""使用跟踪处理订单。"""
with tracer.start_as_current_span("处理订单") as span:
span.set_attribute("order.id", order_id)
with tracer.start_as_current_span("验证订单"):
validate_order(order_id)
with tracer.start_as_current_span("支付费用"):
charge_payment(order_id)
with tracer.start_as_current_span("发送确认"):
send_confirmation(order_id)
return order
最佳实践总结
- 使用结构化日志记录 - 具有一致字段的JSON日志
- 传播关联ID - 通过所有请求和日志传递
- 跟踪四个黄金信号 - 延迟、流量、错误、饱和度
- 限制标签基数 - 永远不要使用无界值作为指标标签
- 在适当级别记录 - 不要用ERROR喊狼来了
- 包含上下文 - 用户ID、请求ID、操作名称在日志中
- 使用上下文管理器 - 一致的定时和错误处理
- 分离关注点 - 可观测性代码不应污染业务逻辑
- 测试您的可观测性 - 在集成测试中验证日志和指标
- 设置警报 - 没有警报的指标是无用的