相关性追踪Skill correlation-tracing

实现分布式追踪和相关性ID,用于跨微服务跟踪请求,理解系统行为,适用于微服务架构、调试分布式系统、性能监控、请求流程可视化、跨服务错误跟踪、依赖分析和延迟优化。

架构设计 0 次安装 0 次浏览 更新于 3/3/2026

实现分布式跟踪和相关性ID,以便跨微服务跟踪请求并理解系统行为。

何时使用

  • 微服务架构
  • 调试分布式系统
  • 性能监控
  • 请求流程可视化
  • 跨服务错误跟踪
  • 依赖分析
  • 延迟优化

实现示例

1. 相关性ID中间件(Express)

import express from 'express';
import { v4 as uuidv4 } from 'uuid';

// 异步本地存储上下文
import { AsyncLocalStorage } from 'async_hooks';

/**
 * Async-local store that carries the trace fields of the request currently
 * being handled. Each store is a Map keyed by field name ('traceId',
 * 'spanId', 'parentSpanId', 'serviceName').
 */
const traceContext = new AsyncLocalStorage<Map<string, any>>();

/** Typed view of the trace fields kept in `traceContext`. */
interface TraceContext {
  /** Identifier shared by every span that belongs to the same request flow. */
  traceId: string;
  /** Identifier of the span covering the current unit of work. */
  spanId: string;
  /** Identifier of the span that caused this one, when there is one. */
  parentSpanId?: string;
  /** Name of the service that produced the span. */
  serviceName: string;
}

/**
 * Builds an Express middleware that establishes the trace context for each
 * incoming request.
 *
 * The trace ID is taken from the `x-trace-id` request header, or generated
 * when that header is missing or empty. The caller's `x-span-id` header is
 * recorded as this request's parent span, and a new span ID is generated for
 * the request itself. Trace and span IDs are echoed back on the response.
 *
 * @param serviceName Name stored in the context for this service.
 * @returns The middleware function.
 */
function correlationMiddleware(serviceName: string) {
  return function attachTraceContext(
    req: express.Request,
    res: express.Response,
    next: express.NextFunction
  ): void {
    // Reuse the inbound trace ID when present; otherwise start a new trace.
    const inboundTraceId = req.headers['x-trace-id'] as string;
    const traceId = inboundTraceId ? inboundTraceId : uuidv4();

    // The caller's span becomes the parent of the span created here.
    const parentSpanId = req.headers['x-span-id'] as string;
    const spanId = uuidv4();

    const store = new Map<string, any>([
      ['traceId', traceId],
      ['spanId', spanId],
      ['parentSpanId', parentSpanId],
      ['serviceName', serviceName],
    ]);

    // Expose the identifiers to the client on the response.
    res.setHeader('X-Trace-Id', traceId);
    res.setHeader('X-Span-Id', spanId);

    // Everything downstream of next() runs with `store` as its async-local context.
    traceContext.run(store, () => next());
  };
}

/**
 * Reads the trace fields for the current async execution context.
 *
 * @returns The current trace fields, or `null` when called outside of any
 *          `traceContext.run(...)` scope.
 */
function getTraceContext(): TraceContext | null {
  const store = traceContext.getStore();

  return store
    ? {
        traceId: store.get('traceId'),
        spanId: store.get('spanId'),
        parentSpanId: store.get('parentSpanId'),
        serviceName: store.get('serviceName'),
      }
    : null;
}

/**
 * Structured JSON logger that stamps every entry with the current trace
 * context (when one is available).
 *
 * Each entry is written to stdout via `console.log` as a single JSON line.
 */
class TracedLogger {
  /**
   * Writes one log entry.
   *
   * Field precedence, lowest to highest: caller `data`, trace context, then
   * the reserved fields `level`, `message` and `timestamp`. Reserved fields
   * always win, so a `data` object that happens to contain `level` or
   * `message` cannot silently rewrite the entry.
   *
   * @param level   Severity label, e.g. 'info'.
   * @param message Human-readable message.
   * @param data    Optional extra fields merged into the entry.
   */
  log(level: string, message: string, data?: any): void {
    const context = getTraceContext();

    // Seeding the target with level/message keeps them first in the output;
    // re-applying them last restores their values if `data` overwrote them.
    const logEntry = Object.assign({ level, message }, data, context, {
      level,
      message,
      timestamp: new Date().toISOString(),
    });

    console.log(JSON.stringify(logEntry));
  }

  /** Logs at 'info' level. */
  info(message: string, data?: any): void {
    this.log('info', message, data);
  }

  /** Logs at 'error' level. */
  error(message: string, data?: any): void {
    this.log('error', message, data);
  }

  /** Logs at 'warn' level. */
  warn(message: string, data?: any): void {
    this.log('warn', message, data);
  }
}

/** Shared logger instance. */
const logger = new TracedLogger();

/**
 * `fetch` wrapper that propagates the current trace context to the callee and
 * logs the outcome and duration of the request.
 *
 * When a trace context exists, the outgoing request carries:
 *   - `X-Trace-Id`: the current trace ID
 *   - `X-Span-Id`: the current span ID
 *   - `X-Parent-Span-Id`: the parent of the current span, if it has one
 *
 * @param url     Request URL.
 * @param options Standard `fetch` options; caller headers are preserved
 *                except for the trace headers listed above.
 * @returns The response from `fetch`.
 * @throws Re-throws whatever `fetch` rejects with, after logging it.
 */
async function tracedFetch(
  url: string,
  options: RequestInit = {}
): Promise<Response> {
  const context = getTraceContext();

  const headers = new Headers(options.headers);

  if (context) {
    headers.set('X-Trace-Id', context.traceId);
    headers.set('X-Span-Id', context.spanId);

    // The parent header must describe the parent of the current span, not
    // repeat the current span ID. Drop any stale caller-supplied value when
    // the current span has no parent.
    if (context.parentSpanId) {
      headers.set('X-Parent-Span-Id', context.parentSpanId);
    } else {
      headers.delete('X-Parent-Span-Id');
    }
  }

  const method = options.method || 'GET';
  const startTime = Date.now();

  try {
    const response = await fetch(url, {
      ...options,
      headers
    });

    logger.info('HTTP request completed', {
      method,
      url,
      statusCode: response.status,
      duration: Date.now() - startTime
    });

    return response;
  } catch (error) {
    // Rejections are not guaranteed to be Error instances.
    const reason = error instanceof Error ? error.message : String(error);

    logger.error('HTTP request failed', {
      method,
      url,
      error: reason,
      duration: Date.now() - startTime
    });

    throw error;
  }
}

// Usage
const app = express();

// Must be registered before the routes so every handler runs inside a trace context.
app.use(correlationMiddleware('api-service'));

app.get('/api/users/:id', async (req, res, next) => {
  try {
    logger.info('Fetching user', { userId: req.params.id });

    // Call another service with trace propagation. The id is untrusted
    // input, so encode it before placing it in the upstream path.
    const response = await tracedFetch(
      `http://user-service/users/${encodeURIComponent(req.params.id)}`
    );

    const data = await response.json();

    logger.info('User fetched successfully');

    res.json(data);
  } catch (error) {
    // Forward failures of the upstream call or of JSON parsing to Express's
    // error-handling middleware instead of leaving the rejection unhandled.
    next(error);
  }
});

2. OpenTelemetry集成

import { NodeSDK } from '@opentelemetry/sdk-node';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { JaegerExporter } from '@opentelemetry/exporter-jaeger';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';

// Configure OpenTelemetry: describe the service, choose where spans are
// exported, and enable auto-instrumentation before starting the SDK.

// Resource attributes attached to the telemetry this service emits.
const serviceResource = new Resource({
  [SemanticResourceAttributes.SERVICE_NAME]: 'my-service',
  [SemanticResourceAttributes.SERVICE_VERSION]: '1.0.0',
});

// Spans are sent to the Jaeger endpoint configured here.
const jaegerTraceExporter = new JaegerExporter({
  endpoint: 'http://localhost:14268/api/traces',
});

// Explicitly enable the HTTP, Express and pg instrumentations.
const autoInstrumentations = getNodeAutoInstrumentations({
  '@opentelemetry/instrumentation-http': { enabled: true },
  '@opentelemetry/instrumentation-express': { enabled: true },
  '@opentelemetry/instrumentation-pg': { enabled: true },
});

const sdk = new NodeSDK({
  resource: serviceResource,
  traceExporter: jaegerTraceExporter,
  instrumentations: [autoInstrumentations],
});

sdk.start();

// 自定义跨度
import { trace, SpanStatusCode } from '@opentelemetry/api';

// Tracer named 'my-service', used to create the custom spans below.
const tracer = trace.getTracer('my-service');

/**
 * Processes an order inside a `process_order` span, running each step
 * (`validate_order`, then `process_payment`) in its own child span.
 *
 * Spans are created with `startActiveSpan`, which makes the new span the
 * active one for the duration of the callback; spans started inside are
 * parented through the active context rather than through a `parent` option.
 * Every span is ended in a `finally` block, so a failing step still gets its
 * span closed and marked as an error.
 *
 * @param orderId Identifier of the order; recorded as the `order.id` attribute.
 * @throws Re-throws the error of the first failing step.
 */
async function processOrder(orderId: string): Promise<void> {
  return tracer.startActiveSpan('process_order', async (span) => {
    span.setAttribute('order.id', orderId);

    try {
      // Validate the order
      await runInChildSpan('validate_order', () => validateOrder(orderId));

      // Process the payment
      await runInChildSpan('process_payment', () => processPayment(orderId));

      span.setStatus({ code: SpanStatusCode.OK });
    } catch (error) {
      const failure = error instanceof Error ? error : new Error(String(error));
      span.setStatus({
        code: SpanStatusCode.ERROR,
        message: failure.message,
      });
      span.recordException(failure);
      throw error;
    } finally {
      span.end();
    }
  });
}

/**
 * Runs `work` inside a new active span named `name`, recording success or
 * failure on that span and always ending it.
 */
async function runInChildSpan<T>(
  name: string,
  work: () => Promise<T>
): Promise<T> {
  return tracer.startActiveSpan(name, async (childSpan) => {
    try {
      const result = await work();
      childSpan.setStatus({ code: SpanStatusCode.OK });
      return result;
    } catch (error) {
      const failure = error instanceof Error ? error : new Error(String(error));
      childSpan.setStatus({
        code: SpanStatusCode.ERROR,
        message: failure.message,
      });
      childSpan.recordException(failure);
      throw error;
    } finally {
      childSpan.end();
    }
  });
}

/**
 * Placeholder for order validation; currently performs no work.
 *
 * @param orderId Identifier of the order to validate (unused for now).
 */
async function validateOrder(orderId: string): Promise<void> {
  // Validation logic goes here.
  return;
}

/**
 * Placeholder for payment processing; currently performs no work.
 *
 * @param orderId Identifier of the order to charge (unused for now).
 */
async function processPayment(orderId: string): Promise<void> {
  // Payment logic goes here.
  return;
}

3. Python分布式跟踪

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.sdk.resources import Resource
from flask import Flask, request
import requests
import uuid

# Tracing setup: register a tracer provider whose resource names this service.
resource = Resource.create({"service.name": "python-service"})
trace.set_tracer_provider(TracerProvider(resource=resource))

# Attach a batch span processor that hands spans to the Jaeger exporter
# (agent at localhost:6831).
jaeger_exporter = JaegerExporter(agent_host_name="localhost", agent_port=6831)
span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# Auto-instrument the Flask app and the `requests` library.
app = Flask(__name__)
FlaskInstrumentor().instrument_app(app)
RequestsInstrumentor().instrument()

# Tracer used for the manually created spans in this module.
tracer = trace.get_tracer(__name__)

@app.route('/api/orders/<order_id>')
def get_order(order_id):
    """Return an order together with the details of the user who placed it.

    The work runs inside a ``fetch_order_details`` span with two nested
    spans: ``database_query`` for the order lookup and ``fetch_user_details``
    for the call to the user service.

    Args:
        order_id: Order identifier taken from the URL path.

    Returns:
        A dict with the keys ``"order"`` and ``"user"``.

    Raises:
        requests.exceptions.RequestException: if the user service does not
            answer within the timeout or replies with an error status.
    """
    # Without a timeout, requests waits indefinitely on a stalled upstream.
    user_service_timeout_seconds = 5

    with tracer.start_as_current_span("fetch_order_details") as span:
        span.set_attribute("order.id", order_id)

        # Load the order from the database.
        with tracer.start_as_current_span("database_query"):
            order = fetch_order_from_db(order_id)

        # Call the user service; `requests` is instrumented at module level.
        with tracer.start_as_current_span("fetch_user_details"):
            response = requests.get(
                f"http://user-service/users/{order['user_id']}",
                timeout=user_service_timeout_seconds,
            )
            # Fail loudly on 4xx/5xx instead of returning an error payload
            # as if it were the user.
            response.raise_for_status()
            user = response.json()

        return {
            "order": order,
            "user": user
        }

def fetch_order_from_db(order_id):
    """Return a stand-in order record for ``order_id``.

    No database is queried: the record always belongs to the fixed user
    ``"user123"``.
    """
    # Database logic would go here.
    order_record = dict(id=order_id, user_id="user123")
    return order_record

# Start the Flask server on port 5000 only when this file is run as a script.
if __name__ == '__main__':
    app.run(port=5000)

4. 手动跟踪传播

/** One timed unit of work recorded by `DistributedTracer`. */
interface Span {
  /** Trace this span belongs to. */
  traceId: string;
  /** Identifier of this span. */
  spanId: string;
  /** Identifier of the parent span, if any. */
  parentSpanId?: string;
  /** Operation name given to `startSpan`. */
  name: string;
  /** Service that produced the span ('unknown' when no trace context exists). */
  serviceName: string;
  /** `Date.now()` value captured when the span was started. */
  startTime: number;
  /** `Date.now()` value captured by `endSpan`; absent while the span is open. */
  endTime?: number;
  /** `endTime - startTime`, set by `endSpan`. */
  duration?: number;
  /** Key/value annotations added through `setTag` and `setError`. */
  tags: Record<string, any>;
  /** Timestamped events added through `logEvent`. */
  logs: Array<{ timestamp: number; message: string; fields?: any }>;
  /** 'ok' until `setError` marks the span as failed. */
  status: 'ok' | 'error';
}

/**
 * Minimal in-memory tracer that creates spans from the current trace
 * context, records tags/events/errors on them, and reports them when ended.
 */
class DistributedTracer {
  private spans: Span[] = [];

  /**
   * @param maxSpans Upper bound on the number of spans kept in memory; the
   *                 oldest spans are dropped first. Defaults to no limit, in
   *                 which case the list grows for as long as spans are
   *                 started.
   */
  constructor(private readonly maxSpans: number = Number.POSITIVE_INFINITY) {}

  /**
   * Starts a span and records it.
   *
   * @param name         Operation name.
   * @param parentSpanId Explicit parent. When omitted, the span becomes a
   *                     child of the current context's span, so work done
   *                     while handling a request nests under that request.
   * @returns The new span, with status 'ok' and no end time.
   */
  startSpan(
    name: string,
    parentSpanId?: string
  ): Span {
    const context = getTraceContext();

    const span: Span = {
      traceId: context?.traceId || uuidv4(),
      spanId: uuidv4(),
      // Default to the current span, not to the current span's own parent:
      // the latter would make this span a sibling of the request span.
      parentSpanId: parentSpanId || context?.spanId,
      name,
      serviceName: context?.serviceName || 'unknown',
      startTime: Date.now(),
      tags: {},
      logs: [],
      status: 'ok'
    };

    this.spans.push(span);
    if (this.spans.length > this.maxSpans) {
      this.spans.splice(0, this.spans.length - this.maxSpans);
    }
    return span;
  }

  /**
   * Stamps the end time and duration on the span and reports it.
   *
   * Reporting is fire-and-forget so callers are not blocked on it; a
   * reporting failure is logged instead of surfacing as an unhandled
   * promise rejection.
   */
  endSpan(span: Span): void {
    span.endTime = Date.now();
    span.duration = span.endTime - span.startTime;

    // Send to the tracing backend
    void this.reportSpan(span).catch((error: unknown) => {
      console.error('Failed to report span:', error);
    });
  }

  /** Sets (or overwrites) a tag on the span. */
  setTag(span: Span, key: string, value: any): void {
    span.tags[key] = value;
  }

  /** Appends a timestamped event to the span. */
  logEvent(span: Span, message: string, fields?: any): void {
    span.logs.push({
      timestamp: Date.now(),
      message,
      fields
    });
  }

  /** Marks the span as failed and records the error's message and stack as tags. */
  setError(span: Span, error: Error): void {
    span.status = 'error';
    span.tags['error'] = true;
    span.tags['error.message'] = error.message;
    span.tags['error.stack'] = error.stack;
  }

  /** Reports a finished span; currently prints it to stdout as JSON. */
  private async reportSpan(span: Span): Promise<void> {
    // Send to Jaeger, Zipkin or another backend
    console.log('Reporting span:', JSON.stringify(span, null, 2));

    // In production:
    // await fetch('http://tracing-collector/api/spans', {
    //   method: 'POST',
    //   headers: { 'Content-Type': 'application/json' },
    //   body: JSON.stringify(span)
    // });
  }

  /** Returns the recorded spans (the internal array, not a copy). */
  getAllSpans(): Span[] {
    return this.spans;
  }

  /** Returns the recorded spans that belong to `traceId`. */
  getTrace(traceId: string): Span[] {
    return this.spans.filter(s => s.traceId === traceId);
  }
}

/** Shared tracer instance. */
const tracer = new DistributedTracer();

/**
 * Usage example: traces a request made of a database query followed by an
 * external API call, each in its own child span of `handle_request`.
 *
 * Every span is ended exactly once, including when a step throws: the
 * failing child span is marked as an error and ended before the error
 * propagates, and the root span is ended in `finally`.
 *
 * @throws Re-throws the error of the first failing step.
 */
async function handleRequest(): Promise<void> {
  const span = tracer.startSpan('handle_request');

  tracer.setTag(span, 'http.method', 'GET');
  tracer.setTag(span, 'http.url', '/api/users/123');

  try {
    // Database operation
    await runChildSpan(
      'database_query',
      span.spanId,
      {
        'db.type': 'postgresql',
        'db.statement': 'SELECT * FROM users WHERE id = $1',
      },
      () => queryDatabase()
    );

    // External API call
    await runChildSpan(
      'external_api_call',
      span.spanId,
      { 'http.url': 'https://api.example.com/data' },
      () => callExternalAPI()
    );

    tracer.logEvent(span, 'Request completed successfully');
  } catch (error) {
    const failure = error instanceof Error ? error : new Error(String(error));
    tracer.setError(span, failure);
    tracer.logEvent(span, 'Request failed', { error: failure.message });
    throw error;
  } finally {
    tracer.endSpan(span);
  }
}

/**
 * Runs `work` inside a child span of `parentSpanId`, tagging the span with
 * `tags`, marking it as failed if `work` throws, and always ending it.
 */
async function runChildSpan(
  name: string,
  parentSpanId: string,
  tags: Record<string, unknown>,
  work: () => Promise<void>
): Promise<void> {
  const childSpan = tracer.startSpan(name, parentSpanId);
  for (const [key, value] of Object.entries(tags)) {
    tracer.setTag(childSpan, key, value);
  }

  try {
    await work();
  } catch (error) {
    tracer.setError(
      childSpan,
      error instanceof Error ? error : new Error(String(error))
    );
    throw error;
  } finally {
    tracer.endSpan(childSpan);
  }
}

/** Simulates a database query by waiting 100 ms. */
async function queryDatabase(): Promise<void> {
  const simulatedLatencyMs = 100;
  await new Promise<void>((done) => {
    setTimeout(done, simulatedLatencyMs);
  });
}

/** Simulates an external API call by waiting 200 ms. */
async function callExternalAPI(): Promise<void> {
  const simulatedLatencyMs = 200;
  await new Promise<void>((done) => {
    setTimeout(done, simulatedLatencyMs);
  });
}

最佳实践

✅ 做

  • 在入口点生成跟踪ID
  • 在服务间传播跟踪上下文
  • 在日志中包含相关性ID
  • 使用结构化日志
  • 设置适当的跨度属性
  • 在高流量系统中采样跟踪
  • 监控跟踪收集开销
  • 实施上下文传播

❌ 不做

  • 跳过跟踪传播
  • 记录不带相关性上下文的日志
  • 创建太多跨度
  • 在跨度中存储敏感数据
  • 因上报跟踪数据而阻塞请求处理
  • 忘记错误跟踪

跟踪头

X-Trace-Id: 跟踪标识
X-Span-Id: 当前跨度
X-Parent-Span-Id: 父跨度
X-Sampled: 采样决策

资源