Event-Driven Architecture Skill: event-driven

The event-driven architecture skill is used to build loosely coupled, scalable distributed systems through event communication patterns such as message queues, publish/subscribe, event sourcing, CQRS, and Sagas. It applies to asynchronous messaging, distributed transactions, event storage, data streaming, and similar scenarios. Keywords: event-driven, message queues, Kafka, RabbitMQ, event sourcing, CQRS, Saga, distributed systems, microservices.

Architecture Design · 0 installs · 0 views · Updated 3/24/2026

name: event-driven description: Event-driven architecture patterns including message queues, publish/subscribe, event sourcing, CQRS, and Sagas. Use when implementing asynchronous messaging, distributed transactions, event stores, command/query separation, domain events, integration events, data streaming, orchestration, or choreography, or when integrating with RabbitMQ, Kafka, Apache Pulsar, AWS SQS, AWS SNS, NATS, an event bus, or a message broker. triggers:

  • events
  • messages
  • messaging
  • publish/subscribe
  • pubsub
  • kafka
  • rabbitmq
  • sqs
  • sns
  • nats
  • pulsar
  • event sourcing
  • CQRS
  • saga
  • choreography
  • orchestration
  • event store
  • domain events
  • integration events
  • message queue
  • message broker
  • event bus
  • data streaming
  • stream processing
  • event-driven

Event-Driven Architecture

Overview

Event-driven architecture (EDA) enables loosely coupled, scalable systems through events rather than direct calls. This skill covers message queues, publish/subscribe patterns, event sourcing, CQRS, distributed transaction management with Sagas, and data streaming with Kafka.
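To make the contrast with direct calls concrete, here is a minimal in-memory sketch of the decoupling idea. The event name and payload shape are illustrative assumptions, not any broker's API: the publisher only knows the event name, never the subscriber.

```typescript
// Minimal in-memory event bus: publishers and subscribers only share
// an event name, never a direct reference to each other.
type Handler<T> = (payload: T) => void;

class InMemoryEventBus {
  private handlers = new Map<string, Handler<unknown>[]>();

  subscribe<T>(event: string, handler: Handler<T>): void {
    const list = this.handlers.get(event) ?? [];
    list.push(handler as Handler<unknown>);
    this.handlers.set(event, list);
  }

  publish<T>(event: string, payload: T): void {
    for (const handler of this.handlers.get(event) ?? []) {
      handler(payload);
    }
  }
}

// The "order service" never imports the "inventory service"; it only emits an event.
const bus = new InMemoryEventBus();
const reserved: string[] = [];
bus.subscribe<{ orderId: string }>("order.created", (e) => reserved.push(e.orderId));
bus.publish("order.created", { orderId: "o-1" });
```

Real brokers add durability, partitioning, and delivery guarantees on top of this same shape.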

Available Agents

  • Senior Software Engineer (Opus) - architecture design, pattern selection, distributed system design
  • Software Engineer (Sonnet) - event handler implementation, consumer/producer code
  • Senior Software Engineer (Opus) - event security, authorization patterns, message encryption
  • Senior Software Engineer (Opus) - message broker setup, Kafka clusters, queue configuration

Key Concepts

Message Queues

RabbitMQ implementation:

import amqp, { Channel, Connection } from "amqplib";

interface QueueConfig {
  name: string;
  durable: boolean;
  deadLetterExchange?: string;
  messageTtl?: number;
  maxRetries?: number;
}

interface PublishOptions {
  messageId?: string;
  headers?: Record<string, unknown>;
}

interface ConsumeOptions {
  prefetch?: number;
  maxRetries?: number;
}

class RabbitMQClient {
  private connection: Connection | null = null;
  private channel: Channel | null = null;

  async connect(url: string): Promise<void> {
    this.connection = await amqp.connect(url);
    this.channel = await this.connection.createChannel();

    // Handle connection errors by reconnecting after a short delay
    this.connection.on("error", (err) => {
      console.error("RabbitMQ connection error:", err);
      this.reconnect(url);
    });
  }

  private async reconnect(url: string, delayMs = 5000): Promise<void> {
    await new Promise((r) => setTimeout(r, delayMs));
    await this.connect(url);
  }

  async setupQueue(config: QueueConfig): Promise<void> {
    if (!this.channel) throw new Error("Not connected");

    const options: amqp.Options.AssertQueue = {
      durable: config.durable,
      arguments: {},
    };

    if (config.deadLetterExchange) {
      options.arguments!["x-dead-letter-exchange"] = config.deadLetterExchange;
    }
    if (config.messageTtl) {
      options.arguments!["x-message-ttl"] = config.messageTtl;
    }

    await this.channel.assertQueue(config.name, options);
  }

  async publish(
    queue: string,
    message: unknown,
    options?: PublishOptions,
  ): Promise<void> {
    if (!this.channel) throw new Error("Not connected");

    const content = Buffer.from(JSON.stringify(message));
    const publishOptions: amqp.Options.Publish = {
      persistent: true,
      messageId: options?.messageId || crypto.randomUUID(),
      timestamp: Date.now(),
      headers: options?.headers,
    };

    this.channel.sendToQueue(queue, content, publishOptions);
  }

  async consume<T>(
    queue: string,
    handler: (
      message: T,
      ack: () => void,
      nack: (requeue?: boolean) => void,
    ) => Promise<void>,
    options?: ConsumeOptions,
  ): Promise<void> {
    if (!this.channel) throw new Error("Not connected");

    await this.channel.prefetch(options?.prefetch || 10);

    await this.channel.consume(queue, async (msg) => {
      if (!msg) return;

      try {
        const content: T = JSON.parse(msg.content.toString());
        const retryCount =
          (msg.properties.headers?.["x-retry-count"] as number) || 0;

        await handler(
          content,
          () => this.channel!.ack(msg),
          (requeue = false) => {
            if (requeue && retryCount < (options?.maxRetries || 3)) {
              // Retry: ack the original and republish with an incremented retry count
              // (nacking here would also route the message to the dead-letter exchange)
              this.channel!.ack(msg);
              this.publish(queue, content, {
                headers: { "x-retry-count": retryCount + 1 },
              });
            } else {
              this.channel!.nack(msg, false, false); // routes to the dead-letter exchange
            }
          },
        );
      } catch (error) {
        console.error("Message handling error:", error);
        this.channel!.nack(msg, false, false);
      }
    });
  }
}

AWS SQS implementation:

import {
  SQSClient,
  SendMessageCommand,
  ReceiveMessageCommand,
  DeleteMessageCommand,
} from "@aws-sdk/client-sqs";

interface SQSMessage<T> {
  id: string;
  body: T;
  receiptHandle: string;
  approximateReceiveCount: number;
}

class SQSQueue<T> {
  private client: SQSClient;
  private queueUrl: string;

  constructor(queueUrl: string, region: string = "us-east-1") {
    this.client = new SQSClient({ region });
    this.queueUrl = queueUrl;
  }

  async send(
    message: T,
    options?: { delaySeconds?: number; deduplicationId?: string },
  ): Promise<string> {
    const command = new SendMessageCommand({
      QueueUrl: this.queueUrl,
      MessageBody: JSON.stringify(message),
      DelaySeconds: options?.delaySeconds,
      MessageDeduplicationId: options?.deduplicationId,
      MessageGroupId: options?.deduplicationId ? "default" : undefined,
    });

    const response = await this.client.send(command);
    return response.MessageId!;
  }

  async receive(
    maxMessages: number = 10,
    waitTimeSeconds: number = 20,
  ): Promise<SQSMessage<T>[]> {
    const command = new ReceiveMessageCommand({
      QueueUrl: this.queueUrl,
      MaxNumberOfMessages: maxMessages,
      WaitTimeSeconds: waitTimeSeconds,
      AttributeNames: ["ApproximateReceiveCount"],
    });

    const response = await this.client.send(command);

    return (response.Messages || []).map((msg) => ({
      id: msg.MessageId!,
      body: JSON.parse(msg.Body!) as T,
      receiptHandle: msg.ReceiptHandle!,
      approximateReceiveCount: parseInt(
        msg.Attributes?.ApproximateReceiveCount || "1",
      ),
    }));
  }

  async delete(receiptHandle: string): Promise<void> {
    const command = new DeleteMessageCommand({
      QueueUrl: this.queueUrl,
      ReceiptHandle: receiptHandle,
    });
    await this.client.send(command);
  }

  async processMessages(
    handler: (message: T) => Promise<void>,
    options?: { maxRetries?: number; pollInterval?: number },
  ): Promise<void> {
    const maxRetries = options?.maxRetries || 3;

    while (true) {
      const messages = await this.receive();

      await Promise.all(
        messages.map(async (msg) => {
          try {
            await handler(msg.body);
            await this.delete(msg.receiptHandle);
          } catch (error) {
            console.error(`Error processing message ${msg.id}:`, error);

            if (msg.approximateReceiveCount >= maxRetries) {
              // The message will move to the DLQ after the visibility timeout
              console.warn(`Message ${msg.id} exceeded the maximum retry count`);
            }
            // Do not delete - the message is redelivered after the visibility timeout
          }
        }),
      );

      if (messages.length === 0 && options?.pollInterval) {
        await new Promise((r) => setTimeout(r, options.pollInterval));
      }
    }
  }
}

Publish/Subscribe Patterns

Kafka implementation:

import { Kafka, Producer, Consumer, EachMessagePayload } from "kafkajs";

interface Event<T = unknown> {
  id: string;
  type: string;
  timestamp: Date;
  source: string;
  data: T;
  metadata?: Record<string, string>;
}

class KafkaEventBus {
  private kafka: Kafka;
  private producer: Producer | null = null;
  private consumers: Map<string, Consumer> = new Map();

  constructor(config: { brokers: string[]; clientId: string }) {
    this.kafka = new Kafka({
      clientId: config.clientId,
      brokers: config.brokers,
    });
  }

  async connect(): Promise<void> {
    this.producer = this.kafka.producer({
      idempotent: true,
      maxInFlightRequests: 5,
    });
    await this.producer.connect();
  }

  async publish<T>(
    topic: string,
    event: Omit<Event<T>, "id" | "timestamp">,
  ): Promise<void> {
    if (!this.producer) throw new Error("Producer not connected");

    const fullEvent: Event<T> = {
      ...event,
      id: crypto.randomUUID(),
      timestamp: new Date(),
    };

    await this.producer.send({
      topic,
      messages: [
        {
          key:
            event.data && typeof event.data === "object" && "id" in event.data
              ? String((event.data as { id: unknown }).id)
              : fullEvent.id,
          value: JSON.stringify(fullEvent),
          headers: {
            "event-type": event.type,
            "event-source": event.source,
          },
        },
      ],
    });
  }

  async subscribe<T>(
    topics: string[],
    groupId: string,
    handler: (event: Event<T>) => Promise<void>,
    options?: { fromBeginning?: boolean },
  ): Promise<void> {
    const consumer = this.kafka.consumer({ groupId });
    await consumer.connect();

    for (const topic of topics) {
      await consumer.subscribe({
        topic,
        fromBeginning: options?.fromBeginning,
      });
    }

    this.consumers.set(groupId, consumer);

    await consumer.run({
      eachMessage: async ({
        topic,
        partition,
        message,
      }: EachMessagePayload) => {
        try {
          const event: Event<T> = JSON.parse(message.value!.toString());
          await handler(event);
        } catch (error) {
          console.error(
            `Error processing message from ${topic}:${partition}:`,
            error,
          );
          throw error; // triggers a retry according to the consumer configuration
        }
      },
    });
  }

  async disconnect(): Promise<void> {
    await this.producer?.disconnect();
    for (const consumer of this.consumers.values()) {
      await consumer.disconnect();
    }
  }
}

// Usage example (OrderCreatedData and reserveInventory are assumed application code)
const eventBus = new KafkaEventBus({
  brokers: ["localhost:9092"],
  clientId: "order-service",
});

await eventBus.connect();

// Publish
await eventBus.publish<OrderCreatedData>("orders", {
  type: "order.created",
  source: "order-service",
  data: { orderId: "123", items: [], total: 99.99 },
});

// Subscribe
await eventBus.subscribe<OrderCreatedData>(
  ["orders"],
  "inventory-service",
  async (event) => {
    if (event.type === "order.created") {
      await reserveInventory(event.data);
    }
  },
);

NATS implementation:

import {
  connect,
  NatsConnection,
  StringCodec,
  JetStreamManager,
  JetStreamClient,
} from "nats";

class NATSEventBus {
  private nc: NatsConnection | null = null;
  private js: JetStreamClient | null = null;
  private sc = StringCodec();

  async connect(servers: string[]): Promise<void> {
    this.nc = await connect({ servers });

    // Set up JetStream for persistence
    const jsm = await this.nc.jetstreamManager();
    this.js = this.nc.jetstream();

    // Create the stream if it does not exist
    try {
      await jsm.streams.add({
        name: "EVENTS",
        subjects: ["events.*"],
        retention: "limits",
        max_msgs: 1000000,
        max_age: 7 * 24 * 60 * 60 * 1_000_000_000, // 7 days in nanoseconds
      });
    } catch (e) {
      // The stream may already exist
    }
  }

  async publish(subject: string, data: unknown): Promise<void> {
    if (!this.js) throw new Error("Not connected");

    await this.js.publish(
      `events.${subject}`,
      this.sc.encode(JSON.stringify(data)),
    );
  }

  async subscribe(
    subject: string,
    durableName: string,
    handler: (data: unknown) => Promise<void>,
  ): Promise<void> {
    if (!this.js) throw new Error("Not connected");

    const consumer = await this.js.consumers
      .get("EVENTS", durableName)
      .catch(async () => {
        // Create the consumer if it does not exist
        const jsm = await this.nc!.jetstreamManager();
        await jsm.consumers.add("EVENTS", {
          durable_name: durableName,
          filter_subject: `events.${subject}`,
          ack_policy: "explicit",
          max_deliver: 3,
        });
        return this.js!.consumers.get("EVENTS", durableName);
      });

    const messages = await consumer.consume();

    for await (const msg of messages) {
      try {
        const data = JSON.parse(this.sc.decode(msg.data));
        await handler(data);
        msg.ack();
      } catch (error) {
        console.error("Message handling error:", error);
        msg.nak();
      }
    }
  }
}

Event Sourcing

interface DomainEvent {
  id: string;
  aggregateId: string;
  aggregateType: string;
  type: string;
  version: number;
  timestamp: Date;
  data: unknown;
  metadata: {
    userId?: string;
    correlationId?: string;
    causationId?: string;
  };
}

interface EventStore {
  append(events: DomainEvent[]): Promise<void>;
  getEvents(aggregateId: string, fromVersion?: number): Promise<DomainEvent[]>;
  getEventsByType(type: string, fromTimestamp?: Date): Promise<DomainEvent[]>;
}

// PostgreSQL event store (Pool comes from "pg"; the event bus is injected
// so persisted events can be published for projections)
import { Pool } from "pg";

class ConcurrencyError extends Error {}

class PostgresEventStore implements EventStore {
  constructor(
    private pool: Pool,
    private eventBus: { publish(event: DomainEvent): Promise<void> },
  ) {}

  async append(events: DomainEvent[]): Promise<void> {
    const client = await this.pool.connect();

    try {
      await client.query("BEGIN");

      for (const event of events) {
        // Optimistic concurrency check
        const { rows } = await client.query(
          "SELECT MAX(version) as max_version FROM events WHERE aggregate_id = $1",
          [event.aggregateId],
        );

        const currentVersion = rows[0]?.max_version || 0;
        if (event.version !== currentVersion + 1) {
          throw new ConcurrencyError(
            `Expected version ${currentVersion + 1}, got ${event.version}`,
          );
        }

        await client.query(
          `INSERT INTO events (id, aggregate_id, aggregate_type, type, version, timestamp, data, metadata)
           VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
          [
            event.id,
            event.aggregateId,
            event.aggregateType,
            event.type,
            event.version,
            event.timestamp,
            JSON.stringify(event.data),
            JSON.stringify(event.metadata),
          ],
        );
      }

      await client.query("COMMIT");

      // Publish to the event bus for projections
      for (const event of events) {
        await this.eventBus.publish(event);
      }
    } catch (error) {
      await client.query("ROLLBACK");
      throw error;
    } finally {
      client.release();
    }
  }

  async getEvents(
    aggregateId: string,
    fromVersion: number = 0,
  ): Promise<DomainEvent[]> {
    const { rows } = await this.pool.query(
      `SELECT * FROM events
       WHERE aggregate_id = $1 AND version > $2
       ORDER BY version ASC`,
      [aggregateId, fromVersion],
    );

    return rows.map(this.rowToEvent);
  }

  async getEventsByType(
    type: string,
    fromTimestamp?: Date,
  ): Promise<DomainEvent[]> {
    const { rows } = await this.pool.query(
      `SELECT * FROM events
       WHERE type = $1 AND ($2::timestamptz IS NULL OR timestamp >= $2)
       ORDER BY timestamp ASC`,
      [type, fromTimestamp ?? null],
    );
    return rows.map(this.rowToEvent);
  }

  private rowToEvent(row: Record<string, unknown>): DomainEvent {
    return {
      id: row.id as string,
      aggregateId: row.aggregate_id as string,
      aggregateType: row.aggregate_type as string,
      type: row.type as string,
      version: row.version as number,
      timestamp: row.timestamp as Date,
      data: row.data,
      metadata: row.metadata as DomainEvent["metadata"],
    };
  }
}
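The append-time version check is the heart of optimistic concurrency in an event store: each new event's version must be exactly current + 1. A database-free sketch of the same rule (the types and aggregate IDs here are illustrative):

```typescript
// In-memory sketch of the append-time version check used by an event store.
interface StoredEvent {
  aggregateId: string;
  version: number;
  type: string;
}

class InMemoryEventLog {
  private events: StoredEvent[] = [];

  append(event: StoredEvent): void {
    // Find the highest version already stored for this aggregate
    const current = this.events
      .filter((e) => e.aggregateId === event.aggregateId)
      .reduce((max, e) => Math.max(max, e.version), 0);
    if (event.version !== current + 1) {
      throw new Error(`Expected version ${current + 1}, got ${event.version}`);
    }
    this.events.push(event);
  }

  load(aggregateId: string): StoredEvent[] {
    return this.events.filter((e) => e.aggregateId === aggregateId);
  }
}

const log = new InMemoryEventLog();
log.append({ aggregateId: "order-1", version: 1, type: "OrderCreated" });
log.append({ aggregateId: "order-1", version: 2, type: "OrderConfirmed" });

// A concurrent writer that also saw version 1 now conflicts:
let conflicted = false;
try {
  log.append({ aggregateId: "order-1", version: 2, type: "OrderCancelled" });
} catch {
  conflicted = true;
}
```

The conflicting write is rejected rather than silently overwriting history, which is what lets two processes load the same aggregate safely.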

// Aggregate base class
abstract class Aggregate {
  private _id: string;
  private _version: number = 0;
  private _uncommittedEvents: DomainEvent[] = [];

  get id(): string {
    return this._id;
  }
  get version(): number {
    return this._version;
  }

  constructor(id: string) {
    this._id = id;
  }

  protected apply(
    event: Omit<
      DomainEvent,
      "id" | "aggregateId" | "aggregateType" | "version" | "timestamp"
    >,
  ): void {
    const domainEvent: DomainEvent = {
      ...event,
      id: crypto.randomUUID(),
      aggregateId: this._id,
      aggregateType: this.constructor.name,
      version: this._version + 1,
      timestamp: new Date(),
    };

    this.when(domainEvent);
    this._version = domainEvent.version;
    this._uncommittedEvents.push(domainEvent);
  }

  protected abstract when(event: DomainEvent): void;

  loadFromHistory(events: DomainEvent[]): void {
    for (const event of events) {
      this.when(event);
      this._version = event.version;
    }
  }

  getUncommittedEvents(): DomainEvent[] {
    return [...this._uncommittedEvents];
  }

  clearUncommittedEvents(): void {
    this._uncommittedEvents = [];
  }
}

// Example: Order aggregate (OrderItem is a minimal assumed shape)
interface OrderItem {
  productId: string;
  quantity: number;
  price: number;
}

class Order extends Aggregate {
  private status:
    | "pending"
    | "confirmed"
    | "shipped"
    | "delivered"
    | "cancelled" = "pending";
  private items: OrderItem[] = [];
  private total: number = 0;

  static create(id: string, customerId: string, items: OrderItem[]): Order {
    const order = new Order(id);
    order.apply({
      type: "OrderCreated",
      data: { customerId, items },
      metadata: {},
    });
    return order;
  }

  confirm(): void {
    if (this.status !== "pending") {
      throw new Error("Only pending orders can be confirmed");
    }
    this.apply({
      type: "OrderConfirmed",
      data: { confirmedAt: new Date() },
      metadata: {},
    });
  }

  cancel(reason: string): void {
    if (["shipped", "delivered", "cancelled"].includes(this.status)) {
      throw new Error("The order cannot be cancelled in its current state");
    }
    this.apply({
      type: "OrderCancelled",
      data: { reason, cancelledAt: new Date() },
      metadata: {},
    });
  }

  protected when(event: DomainEvent): void {
    switch (event.type) {
      case "OrderCreated": {
        const data = event.data as { items: OrderItem[] };
        this.items = data.items;
        this.total = data.items.reduce(
          (sum, item) => sum + item.price * item.quantity,
          0,
        );
        this.status = "pending";
        break;
      }
      case "OrderConfirmed":
        this.status = "confirmed";
        break;
      case "OrderCancelled":
        this.status = "cancelled";
        break;
    }
  }
}
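Replaying history through `when` is effectively a left fold over the event list. A standalone sketch of that reduction, with deliberately simplified state and event shapes (these are assumptions, not the aggregate above):

```typescript
// Replaying events is a left fold: state = events.reduce(apply, initialState).
interface OrderState {
  status: string;
  total: number;
}
interface Evt {
  type: string;
  data?: { total?: number };
}

const apply = (state: OrderState, event: Evt): OrderState => {
  switch (event.type) {
    case "OrderCreated":
      return { status: "pending", total: event.data?.total ?? 0 };
    case "OrderConfirmed":
      return { ...state, status: "confirmed" };
    case "OrderCancelled":
      return { ...state, status: "cancelled" };
    default:
      // Unknown events leave state untouched, which keeps replay forward-compatible
      return state;
  }
};

const history: Evt[] = [
  { type: "OrderCreated", data: { total: 99.99 } },
  { type: "OrderConfirmed" },
];

const state = history.reduce(apply, { status: "none", total: 0 });
```

Because the fold is deterministic, the same event history always rebuilds the same state, no matter when or where it is replayed.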

CQRS (Command Query Responsibility Segregation)

// Commands
interface Command {
  type: string;
  payload: unknown;
  metadata: {
    userId: string;
    correlationId: string;
    timestamp: Date;
  };
}

interface CommandHandler<T extends Command> {
  handle(command: T): Promise<void>;
}

// Command bus
class CommandBus {
  private handlers: Map<string, CommandHandler<Command>> = new Map();

  register<T extends Command>(type: string, handler: CommandHandler<T>): void {
    this.handlers.set(type, handler as CommandHandler<Command>);
  }

  async dispatch(command: Command): Promise<void> {
    const handler = this.handlers.get(command.type);
    if (!handler) {
      throw new Error(`No command handler registered for: ${command.type}`);
    }
    await handler.handle(command);
  }
}

// Queries
interface Query<TResult> {
  type: string;
  params: unknown;
}

interface QueryHandler<TQuery extends Query<TResult>, TResult> {
  handle(query: TQuery): Promise<TResult>;
}

// Query bus
class QueryBus {
  private handlers: Map<string, QueryHandler<Query<unknown>, unknown>> =
    new Map();

  register<TQuery extends Query<TResult>, TResult>(
    type: string,
    handler: QueryHandler<TQuery, TResult>,
  ): void {
    this.handlers.set(type, handler as QueryHandler<Query<unknown>, unknown>);
  }

  async execute<TResult>(query: Query<TResult>): Promise<TResult> {
    const handler = this.handlers.get(query.type);
    if (!handler) {
      throw new Error(`No query handler registered for: ${query.type}`);
    }
    return handler.handle(query) as Promise<TResult>;
  }
}

// Read model (projection)
interface OrderReadModel {
  id: string;
  customerId: string;
  customerName: string;
  status: string;
  items: Array<{
    productId: string;
    productName: string;
    quantity: number;
    price: number;
  }>;
  total: number;
  createdAt: Date;
  updatedAt: Date;
}

class OrderProjection {
  // Database and EventBus are assumed application-level abstractions
  constructor(
    private db: Database,
    private eventBus: EventBus,
  ) {
    this.setupSubscriptions();
  }

  private setupSubscriptions(): void {
    this.eventBus.subscribe("OrderCreated", this.onOrderCreated.bind(this));
    this.eventBus.subscribe("OrderConfirmed", this.onOrderConfirmed.bind(this));
    this.eventBus.subscribe("OrderCancelled", this.onOrderCancelled.bind(this));
  }

  private async onOrderCreated(event: DomainEvent): Promise<void> {
    const data = event.data as OrderCreatedData;

    // Enrich with customer data
    const customer = await this.db.customers.findById(data.customerId);

    // Enrich with product data
    const items = await Promise.all(
      data.items.map(async (item) => {
        const product = await this.db.products.findById(item.productId);
        return {
          ...item,
          productName: product.name,
        };
      }),
    );

    await this.db.orderReadModels.create({
      id: event.aggregateId,
      customerId: data.customerId,
      customerName: customer.name,
      status: "pending",
      items,
      total: items.reduce((sum, i) => sum + i.price * i.quantity, 0),
      createdAt: event.timestamp,
      updatedAt: event.timestamp,
    });
  }

  private async onOrderConfirmed(event: DomainEvent): Promise<void> {
    await this.db.orderReadModels.update(event.aggregateId, {
      status: "confirmed",
      updatedAt: event.timestamp,
    });
  }

  private async onOrderCancelled(event: DomainEvent): Promise<void> {
    await this.db.orderReadModels.update(event.aggregateId, {
      status: "cancelled",
      updatedAt: event.timestamp,
    });
  }
}

Saga Pattern for Distributed Transactions

interface SagaStep<TData> {
  name: string;
  execute: (data: TData) => Promise<void>;
  compensate: (data: TData) => Promise<void>;
}

interface SagaDefinition<TData> {
  name: string;
  steps: SagaStep<TData>[];
}

interface SagaInstance {
  id: string;
  sagaName: string;
  data: unknown;
  currentStep: number;
  status: "running" | "completed" | "compensating" | "failed";
  completedSteps: string[];
  error?: string;
  startedAt: Date;
  updatedAt: Date;
}

// SagaStore is an assumed persistence abstraction for saga instances
interface SagaStore {
  save(instance: SagaInstance): Promise<void>;
}

class SagaOrchestrator {
  private sagas: Map<string, SagaDefinition<unknown>> = new Map();

  constructor(private store: SagaStore) {}

  register<TData>(saga: SagaDefinition<TData>): void {
    this.sagas.set(saga.name, saga as SagaDefinition<unknown>);
  }

  async start<TData>(sagaName: string, data: TData): Promise<string> {
    const saga = this.sagas.get(sagaName);
    if (!saga) throw new Error(`Saga not found: ${sagaName}`);

    const instance: SagaInstance = {
      id: crypto.randomUUID(),
      sagaName,
      data,
      currentStep: 0,
      status: "running",
      completedSteps: [],
      startedAt: new Date(),
      updatedAt: new Date(),
    };

    await this.store.save(instance);
    await this.executeNextStep(instance, saga);

    return instance.id;
  }

  private async executeNextStep(
    instance: SagaInstance,
    saga: SagaDefinition<unknown>,
  ): Promise<void> {
    if (instance.currentStep >= saga.steps.length) {
      instance.status = "completed";
      await this.store.save(instance);
      return;
    }

    const step = saga.steps[instance.currentStep];

    try {
      await step.execute(instance.data);

      instance.completedSteps.push(step.name);
      instance.currentStep++;
      instance.updatedAt = new Date();
      await this.store.save(instance);

      await this.executeNextStep(instance, saga);
    } catch (error) {
      instance.status = "compensating";
      instance.error = error instanceof Error ? error.message : String(error);
      await this.store.save(instance);

      await this.compensate(instance, saga);
    }
  }

  private async compensate(
    instance: SagaInstance,
    saga: SagaDefinition<unknown>,
  ): Promise<void> {
    // Run compensations in reverse order
    for (let i = instance.completedSteps.length - 1; i >= 0; i--) {
      const stepName = instance.completedSteps[i];
      const step = saga.steps.find((s) => s.name === stepName);

      if (step) {
        try {
          await step.compensate(instance.data);
        } catch (error) {
          console.error(`Compensation for step ${stepName} failed:`, error);
          // Continue with the remaining compensations
        }
      }
    }

    instance.status = "failed";
    instance.updatedAt = new Date();
    await this.store.save(instance);
  }
}

// Example: order fulfillment saga
// (inventoryService, paymentService, shippingService, orderService are assumed domain services)
interface OrderFulfillmentData {
  orderId: string;
  customerId: string;
  items: Array<{ productId: string; quantity: number; price: number }>;
  paymentId?: string;
  shipmentId?: string;
}

const orderFulfillmentSaga: SagaDefinition<OrderFulfillmentData> = {
  name: "order-fulfillment",
  steps: [
    {
      name: "reserve-inventory",
      execute: async (data) => {
        await inventoryService.reserve(data.items);
      },
      compensate: async (data) => {
        await inventoryService.release(data.items);
      },
    },
    {
      name: "process-payment",
      execute: async (data) => {
        const total = data.items.reduce(
          (sum, i) => sum + i.price * i.quantity,
          0,
        );
        const payment = await paymentService.charge(data.customerId, total);
        data.paymentId = payment.id;
      },
      compensate: async (data) => {
        if (data.paymentId) {
          await paymentService.refund(data.paymentId);
        }
      },
    },
    {
      name: "create-shipment",
      execute: async (data) => {
        const shipment = await shippingService.createShipment(
          data.orderId,
          data.items,
        );
        data.shipmentId = shipment.id;
      },
      compensate: async (data) => {
        if (data.shipmentId) {
          await shippingService.cancelShipment(data.shipmentId);
        }
      },
    },
    {
      name: "confirm-order",
      execute: async (data) => {
        await orderService.confirm(data.orderId);
      },
      compensate: async (data) => {
        await orderService.cancel(data.orderId, "saga compensation");
      },
    },
  ],
};
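The orchestrator's execute-then-compensate flow can be exercised end-to-end in memory. This sketch is illustrative only; the step names and the injected payment failure are contrived:

```typescript
// Minimal saga run: steps execute in order; on failure, the steps that
// already completed are compensated in reverse order.
interface Step {
  name: string;
  execute: () => Promise<void>;
  compensate: () => Promise<void>;
}

async function runSaga(steps: Step[]): Promise<"completed" | "failed"> {
  const done: Step[] = [];
  for (const step of steps) {
    try {
      await step.execute();
      done.push(step);
    } catch {
      // Undo completed steps in reverse order
      for (const s of done.reverse()) {
        await s.compensate();
      }
      return "failed";
    }
  }
  return "completed";
}

const trace: string[] = [];
const result = await runSaga([
  {
    name: "reserve",
    execute: async () => {
      trace.push("reserve");
    },
    compensate: async () => {
      trace.push("undo-reserve");
    },
  },
  {
    name: "charge",
    execute: async () => {
      throw new Error("payment declined");
    },
    compensate: async () => {
      trace.push("undo-charge");
    },
  },
]);
```

Note that only completed steps are compensated: the failing step's own compensation never runs, which is why each `execute` should be atomic from the saga's point of view.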

Idempotency and Exactly-Once Delivery

interface IdempotencyKey {
  key: string;
  response?: unknown;
  createdAt: Date;
  expiresAt: Date;
}

class IdempotencyService {
  // Redis here is an assumed client type (e.g. ioredis)
  constructor(private redis: Redis) {}

  async process<T>(
    key: string,
    operation: () => Promise<T>,
    ttlSeconds: number = 86400, // 24 hours
  ): Promise<T> {
    const lockKey = `idempotency:lock:${key}`;
    const dataKey = `idempotency:data:${key}`;

    // Try to acquire the lock
    const locked = await this.redis.set(lockKey, "1", "EX", 30, "NX");

    if (!locked) {
      // Another process is handling this request; wait for its result
      return this.waitForResult<T>(dataKey);
    }

    try {
      // Check whether it was already processed
      const existing = await this.redis.get(dataKey);
      if (existing) {
        return JSON.parse(existing) as T;
      }

      // Execute the operation
      const result = await operation();

      // Store the result
      await this.redis.setex(dataKey, ttlSeconds, JSON.stringify(result));

      return result;
    } finally {
      await this.redis.del(lockKey);
    }
  }

  private async waitForResult<T>(
    dataKey: string,
    maxWaitMs: number = 30000,
  ): Promise<T> {
    const startTime = Date.now();

    while (Date.now() - startTime < maxWaitMs) {
      const data = await this.redis.get(dataKey);
      if (data) {
        return JSON.parse(data) as T;
      }
      await new Promise((r) => setTimeout(r, 100));
    }

    throw new Error("Timed out waiting for the idempotent operation result");
  }
}

// Message deduplication on the consumer side
class DeduplicatingConsumer<T> {
  constructor(
    private redis: Redis,
    private windowSeconds: number = 3600, // 1-hour deduplication window
  ) {}

  async process(
    messageId: string,
    handler: () => Promise<T>,
  ): Promise<{ result: T; duplicate: boolean }> {
    const dedupKey = `dedup:${messageId}`;

    // Check whether it was already processed
    const existing = await this.redis.get(dedupKey);
    if (existing) {
      return { result: JSON.parse(existing) as T, duplicate: true };
    }

    // Process the message
    const result = await handler();

    // Mark as processed
    await this.redis.setex(
      dedupKey,
      this.windowSeconds,
      JSON.stringify(result),
    );

    return { result, duplicate: false };
  }
}
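The same at-least-once-to-effectively-once idea works with any key-value store. Here is a Redis-free, in-memory sketch (window eviction is omitted for brevity, so this is illustrative only):

```typescript
// At-least-once delivery means a handler can see the same messageId twice;
// caching results by messageId makes reprocessing a no-op.
class InMemoryDeduper<T> {
  private seen = new Map<string, T>();

  async process(
    messageId: string,
    handler: () => Promise<T>,
  ): Promise<{ result: T; duplicate: boolean }> {
    const cached = this.seen.get(messageId);
    if (cached !== undefined) {
      // Already processed: return the cached result without re-running the handler
      return { result: cached, duplicate: true };
    }
    const result = await handler();
    this.seen.set(messageId, result);
    return { result, duplicate: false };
  }
}

const deduper = new InMemoryDeduper<number>();
let calls = 0;
const first = await deduper.process("msg-1", async () => ++calls);
const second = await deduper.process("msg-1", async () => ++calls);
```

A production version needs a shared store and a TTL (as the Redis class above uses), since an in-process map neither survives restarts nor coordinates across consumer instances.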

Dead Letter Queues

interface DeadLetterMessage {
  id: string;
  originalQueue: string;
  originalMessage: unknown;
  error: string;
  failedAt: Date;
  retryCount: number;
  lastRetryAt?: Date;
}

class DeadLetterQueueManager {
  // DLQStore, MessageQueue, and AlertingService are assumed abstractions
  constructor(
    private dlqStore: DLQStore,
    private originalQueue: MessageQueue,
    private alerting: AlertingService,
  ) {}

  async moveToDeadLetter(
    message: unknown,
    originalQueue: string,
    error: Error,
    retryCount: number,
  ): Promise<void> {
    const dlqMessage: DeadLetterMessage = {
      id: crypto.randomUUID(),
      originalQueue,
      originalMessage: message,
      error: error.message,
      failedAt: new Date(),
      retryCount,
    };

    await this.dlqStore.save(dlqMessage);

    // Alert when the DLQ grows
    const dlqSize = await this.dlqStore.count(originalQueue);
    if (dlqSize > 100) {
      await this.alerting.warn({
        title: "DLQ size warning",
        message: `The dead-letter queue for ${originalQueue} has ${dlqSize} messages`,
      });
    }
  }

  async retry(messageId: string): Promise<void> {
    const dlqMessage = await this.dlqStore.get(messageId);
    if (!dlqMessage) throw new Error("Message not found in DLQ");

    try {
      await this.originalQueue.publish(
        dlqMessage.originalQueue,
        dlqMessage.originalMessage,
      );
      await this.dlqStore.delete(messageId);
    } catch (error) {
      dlqMessage.lastRetryAt = new Date();
      dlqMessage.retryCount++;
      await this.dlqStore.save(dlqMessage);
      throw error;
    }
  }

  async retryAll(queue: string): Promise<{ success: number; failed: number }> {
    const messages = await this.dlqStore.getByQueue(queue);
    let success = 0;
    let failed = 0;

    for (const message of messages) {
      try {
        await this.retry(message.id);
        success++;
      } catch {
        failed++;
      }
    }

    return { success, failed };
  }

  async purge(queue: string, olderThan?: Date): Promise<number> {
    return this.dlqStore.deleteByQueue(queue, olderThan);
  }
}

Data Streaming with Kafka

Stream processing:

import { Kafka, CompressionTypes } from "kafkajs";

interface StreamRecord<T> {
  key: string;
  value: T;
  timestamp: number;
  partition: number;
  offset: string;
}

class KafkaStreamProcessor {
  private kafka: Kafka;

  constructor(brokers: string[]) {
    this.kafka = new Kafka({
      clientId: "stream-processor",
      brokers,
    });
  }

  // Stateful stream aggregation
  async aggregateStream<TInput, TState>(
    inputTopic: string,
    outputTopic: string,
    groupId: string,
    initialState: TState,
    aggregator: (state: TState, record: TInput) => TState,
    windowMs: number = 60000,
  ): Promise<void> {
    const consumer = this.kafka.consumer({ groupId });
    // In kafkajs, compression is a per-send option rather than a producer option
    const producer = this.kafka.producer();

    await consumer.connect();
    await producer.connect();
    await consumer.subscribe({ topic: inputTopic });

    const stateByKey = new Map<string, TState>();
    const windowTimers = new Map<string, NodeJS.Timeout>();

    await consumer.run({
      eachMessage: async ({ message }) => {
        const key = message.key?.toString() || "default";
        const value: TInput = JSON.parse(message.value!.toString());

        // Get or initialize the state for this key
        const currentState = stateByKey.get(key) || initialState;
        const newState = aggregator(currentState, value);
        stateByKey.set(key, newState);

        // Clear any existing window timer
        const existingTimer = windowTimers.get(key);
        if (existingTimer) clearTimeout(existingTimer);

        // Set a new window timer to emit the aggregated state
        const timer = setTimeout(async () => {
          const finalState = stateByKey.get(key);
          await producer.send({
            topic: outputTopic,
            compression: CompressionTypes.GZIP,
            messages: [
              {
                key,
                value: JSON.stringify(finalState),
                timestamp: Date.now().toString(),
              },
            ],
          });
          stateByKey.delete(key);
          windowTimers.delete(key);
        }, windowMs);

        windowTimers.set(key, timer);
      },
    });
  }

  // Stream join
  async joinStreams<TLeft, TRight, TResult>(
    leftTopic: string,
    rightTopic: string,
    outputTopic: string,
    groupId: string,
    joiner: (left: TLeft, right: TRight) => TResult,
    windowMs: number = 30000,
  ): Promise<void> {
    const consumer = this.kafka.consumer({ groupId });
    const producer = this.kafka.producer();

    await consumer.connect();
    await producer.connect();
    await consumer.subscribe({ topics: [leftTopic, rightTopic] });

    const leftCache = new Map<string, { data: TLeft; timestamp: number }>();
    const rightCache = new Map<string, { data: TRight; timestamp: number }>();

    await consumer.run({
      eachMessage: async ({ topic, message }) => {
        const key = message.key?.toString() || "default";
        const timestamp = parseInt(message.timestamp);
        const now = Date.now();

        // Evict entries outside the join window
        this.cleanOldEntries(leftCache, now, windowMs);
        this.cleanOldEntries(rightCache, now, windowMs);

        if (topic === leftTopic) {
          const leftData: TLeft = JSON.parse(message.value!.toString());
          leftCache.set(key, { data: leftData, timestamp });

          // Try to join with the right stream
          const rightEntry = rightCache.get(key);
          if (
            rightEntry &&
            Math.abs(timestamp - rightEntry.timestamp) <= windowMs
          ) {
            const result = joiner(leftData, rightEntry.data);
            await producer.send({
              topic: outputTopic,
              messages: [{ key, value: JSON.stringify(result) }],
            });
          }
        } else {
          const rightData: TRight = JSON.parse(message.value!.toString());
          rightCache.set(key, { data: rightData, timestamp });

          // Try to join with the left stream
          const leftEntry = leftCache.get(key);
          if (
            leftEntry &&
            Math.abs(timestamp - leftEntry.timestamp) <= windowMs
          ) {
            const result = joiner(leftEntry.data, rightData);
            await producer.send({
              topic: outputTopic,
              messages: [{ key, value: JSON.stringify(result) }],
            });
          }
        }
      },
    });
  }

  private cleanOldEntries<T>(
    cache: Map<string, { data: T; timestamp: number }>,
    now: number,
    windowMs: number,
  ): void {
    for (const [key, entry] of cache.entries()) {
      if (now - entry.timestamp > windowMs) {
        cache.delete(key);
      }
    }
  }
}

// Kafka Streams-style operations
class KafkaStream<T> {
  constructor(
    private kafka: Kafka,
    private topic: string,
  ) {}

  // Map transformation
  map<R>(mapper: (value: T) => R): KafkaStream<R> {
    const outputTopic = `${this.topic}-mapped`;
    this.processStream(outputTopic, async (record) => ({
      key: record.key,
      value: mapper(record.value),
    }));
    return new KafkaStream<R>(this.kafka, outputTopic);
  }

  // Filter transformation
  filter(predicate: (value: T) => boolean): KafkaStream<T> {
    const outputTopic = `${this.topic}-filtered`;
    this.processStream(outputTopic, async (record) =>
      predicate(record.value) ? record : null,
    );
    return new KafkaStream<T>(this.kafka, outputTopic);
  }

  // Group by key and aggregate within a time window
  groupBy<K, V>(
    keyExtractor: (value: T) => K,
    aggregator: (key: K, values: T[]) => V,
    windowMs: number = 60000,
  ): KafkaStream<V> {
    const outputTopic = `${this.topic}-grouped`;
    const groups = new Map<string, T[]>();
    const timers = new Map<string, NodeJS.Timeout>();

    this.processStream(outputTopic, async (record, producer) => {
      const key = String(keyExtractor(record.value));
      const values = groups.get(key) || [];
      values.push(record.value);
      groups.set(key, values);

      const existingTimer = timers.get(key);
      if (existingTimer) clearTimeout(existingTimer);

      const timer = setTimeout(async () => {
        const groupValues = groups.get(key) || [];
        const result = aggregator(keyExtractor(record.value), groupValues);
        await producer.send({
          topic: outputTopic,
          messages: [{ key, value: JSON.stringify(result) }],
        });
        groups.delete(key);
        timers.delete(key);
      }, windowMs);

      timers.set(key, timer);

      return null; // Nothing emitted now; the window timer flushes the group
    });

    return new KafkaStream<V>(this.kafka, outputTopic);
  }

  private async processStream(
    outputTopic: string,
    processor: (
      record: StreamRecord<T>,
      producer: any,
    ) => Promise<{ key: string; value: any } | null>,
  ): Promise<void> {
    const consumer = this.kafka.consumer({
      groupId: `${this.topic}-processor`,
    });
    const producer = this.kafka.producer();

    await consumer.connect();
    await producer.connect();
    await consumer.subscribe({ topics: [this.topic] });

    await consumer.run({
      eachMessage: async ({ message, partition }) => {
        const record: StreamRecord<T> = {
          key: message.key?.toString() || "default",
          value: JSON.parse(message.value!.toString()),
          timestamp: parseInt(message.timestamp),
          partition,
          offset: message.offset,
        };

        const result = await processor(record, producer);
        if (result) {
          await producer.send({
            topic: outputTopic,
            messages: [
              {
                key: result.key,
                value: JSON.stringify(result.value),
              },
            ],
          });
        }
      },
    });
  }
}

Event Sourcing Patterns

Snapshots for performance optimization:

interface Snapshot {
  aggregateId: string;
  version: number;
  state: unknown;
  timestamp: Date;
}

class SnapshotStore {
  constructor(private db: Database) {}

  async save(snapshot: Snapshot): Promise<void> {
    await this.db.snapshots.upsert({
      aggregateId: snapshot.aggregateId,
      version: snapshot.version,
      state: JSON.stringify(snapshot.state),
      timestamp: snapshot.timestamp,
    });
  }

  async getLatest(aggregateId: string): Promise<Snapshot | null> {
    const row = await this.db.snapshots.findOne(
      { aggregateId },
      { orderBy: { version: "desc" } },
    );
    return row
      ? {
          aggregateId: row.aggregateId,
          version: row.version,
          state: JSON.parse(row.state),
          timestamp: row.timestamp,
        }
      : null;
  }
}

// Aggregate enhanced with snapshot support
abstract class SnapshotAggregate extends Aggregate {
  private static SNAPSHOT_FREQUENCY = 100; // Snapshot every 100 events

  async load(
    eventStore: EventStore,
    snapshotStore: SnapshotStore,
  ): Promise<void> {
    // Try to load from the latest snapshot first
    const snapshot = await snapshotStore.getLatest(this.id);
    if (snapshot) {
      this.applySnapshot(snapshot.state);
      this._version = snapshot.version;

      // Load only the events recorded after the snapshot
      const events = await eventStore.getEvents(this.id, snapshot.version);
      this.loadFromHistory(events);
    } else {
      // No snapshot yet; replay the full event history
      const events = await eventStore.getEvents(this.id);
      this.loadFromHistory(events);
    }
  }

  async save(
    eventStore: EventStore,
    snapshotStore: SnapshotStore,
  ): Promise<void> {
    const events = this.getUncommittedEvents();
    await eventStore.append(events);
    this.clearUncommittedEvents();

    // Create a snapshot when the frequency threshold is reached
    if (this.version % SnapshotAggregate.SNAPSHOT_FREQUENCY === 0) {
      await snapshotStore.save({
        aggregateId: this.id,
        version: this.version,
        state: this.createSnapshot(),
        timestamp: new Date(),
      });
    }
  }

  protected abstract createSnapshot(): unknown;
  protected abstract applySnapshot(state: unknown): void;
}
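The snapshot mechanics above — replay from the latest snapshot, then apply only newer events — can be illustrated with a minimal in-memory sketch. The counter-style store and all names here are hypothetical, not part of the SnapshotAggregate API:

```typescript
// In-memory sketch of snapshotting: a load replays the latest snapshot
// plus only the events recorded after it. All names are illustrative.
interface CounterEvent {
  version: number;
  amount: number;
}

class InMemoryCounterStore {
  events: CounterEvent[] = [];
  snapshot: { version: number; total: number } | null = null;
  private static SNAPSHOT_FREQUENCY = 3; // Snapshot every 3 events

  append(amount: number): void {
    const version = this.events.length + 1;
    this.events.push({ version, amount });
    if (version % InMemoryCounterStore.SNAPSHOT_FREQUENCY === 0) {
      // load() here still sees the previous snapshot, so it folds in
      // everything up to and including this event
      this.snapshot = { version, total: this.load() };
    }
  }

  // Rebuild state from the snapshot plus subsequent events
  load(): number {
    let total = this.snapshot?.total ?? 0;
    const fromVersion = this.snapshot?.version ?? 0;
    for (const e of this.events) {
      if (e.version > fromVersion) total += e.amount;
    }
    return total;
  }

  eventsReplayedOnLoad(): number {
    const fromVersion = this.snapshot?.version ?? 0;
    return this.events.filter((e) => e.version > fromVersion).length;
  }
}

const store = new InMemoryCounterStore();
[10, 20, 30, 40].forEach((n) => store.append(n));
// A snapshot was taken at version 3, so load() replays only one event
```

With 100 events per snapshot, as in SnapshotAggregate, a load replays at most 99 events instead of the full history.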

Event upcasting (schema migration):

interface EventUpcaster {
  eventType: string;
  fromVersion: number;
  toVersion: number;
  upcast: (event: DomainEvent) => DomainEvent;
}

class EventStoreWithUpcasting implements EventStore {
  private upcasters: Map<string, EventUpcaster[]> = new Map();

  constructor(private rawEventStore: EventStore) {}

  registerUpcaster(upcaster: EventUpcaster): void {
    const existing = this.upcasters.get(upcaster.eventType) || [];
    existing.push(upcaster);
    existing.sort((a, b) => a.fromVersion - b.fromVersion);
    this.upcasters.set(upcaster.eventType, existing);
  }

  async getEvents(
    aggregateId: string,
    fromVersion: number = 0,
  ): Promise<DomainEvent[]> {
    const rawEvents = await this.rawEventStore.getEvents(
      aggregateId,
      fromVersion,
    );

    return rawEvents.map((event) => this.upcastEvent(event));
  }

  private upcastEvent(event: DomainEvent): DomainEvent {
    const upcasters = this.upcasters.get(event.type) || [];
    let currentEvent = event;

    for (const upcaster of upcasters) {
      const eventVersion = (currentEvent.data as any)?.schemaVersion || 1;
      if (eventVersion === upcaster.fromVersion) {
        currentEvent = upcaster.upcast(currentEvent);
      }
    }

    return currentEvent;
  }
}

// Example upcaster
const orderCreatedV1toV2: EventUpcaster = {
  eventType: "OrderCreated",
  fromVersion: 1,
  toVersion: 2,
  upcast: (event) => ({
    ...event,
    data: {
      ...(event.data as any),
      // V2 splits the single address into separate shipping and billing addresses
      shippingAddress: (event.data as any).address,
      billingAddress: (event.data as any).address,
      schemaVersion: 2,
    },
  }),
};
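Running such an upcaster chain over a stored V1 event can be sketched standalone. The simplified event shape and helper below are illustrative, not the EventStoreWithUpcasting API:

```typescript
// Standalone sketch of upcasting: a V1 event with a single `address`
// field is migrated to the V2 shape on read. Types are simplified.
interface SimpleEvent {
  type: string;
  data: Record<string, unknown>;
}

interface Upcaster {
  eventType: string;
  fromVersion: number;
  upcast: (e: SimpleEvent) => SimpleEvent;
}

const v1toV2: Upcaster = {
  eventType: "OrderCreated",
  fromVersion: 1,
  upcast: (e) => ({
    ...e,
    data: {
      ...e.data,
      shippingAddress: e.data.address,
      billingAddress: e.data.address,
      schemaVersion: 2,
    },
  }),
};

function upcastEvent(event: SimpleEvent, upcasters: Upcaster[]): SimpleEvent {
  let current = event;
  // Apply upcasters in version order; each one fires only on its source version
  for (const u of [...upcasters].sort((a, b) => a.fromVersion - b.fromVersion)) {
    const version = (current.data.schemaVersion as number) ?? 1;
    if (current.type === u.eventType && version === u.fromVersion) {
      current = u.upcast(current);
    }
  }
  return current;
}

const stored: SimpleEvent = {
  type: "OrderCreated",
  data: { orderId: "o-1", address: "221B Baker St" }, // schemaVersion 1 implied
};
const upgraded = upcastEvent(stored, [v1toV2]);
```

Because upcasting happens on read, old events never need to be rewritten in the store.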

Saga Pattern

Choreography vs. orchestration:

// Choreography: services react to events independently
class OrderService {
  constructor(private eventBus: EventBus) {}

  async onOrderCreated(event: OrderCreatedEvent): Promise<void> {
    // Publish the event; downstream services react on their own
    await this.eventBus.publish("order.created", {
      orderId: event.orderId,
      customerId: event.customerId,
      items: event.items,
    });
  }
}

class InventoryService {
  constructor(private eventBus: EventBus) {
    // Listen for and react to order events
    this.eventBus.subscribe("order.created", this.reserveInventory.bind(this));
  }

  private async reserveInventory(event: OrderCreatedEvent): Promise<void> {
    try {
      await this.reserve(event.items);
      // Publish a success event
      await this.eventBus.publish("inventory.reserved", {
        orderId: event.orderId,
      });
    } catch (error) {
      // Publish a failure event so interested services can compensate
      await this.eventBus.publish("inventory.reservation-failed", {
        orderId: event.orderId,
        reason: (error as Error).message,
      });
    }
  }
}

class PaymentService {
  constructor(private eventBus: EventBus) {
    // Process payment only after inventory has been reserved
    this.eventBus.subscribe(
      "inventory.reserved",
      this.processPayment.bind(this),
    );
  }

  private async processPayment(event: InventoryReservedEvent): Promise<void> {
    // Process the payment and publish the outcome...
  }
}

// Orchestration: a central coordinator drives the flow
class OrderFulfillmentOrchestrator {
  constructor(
    private inventoryService: { reserve(id: string): Promise<void>; release(id: string): Promise<void> },
    private paymentService: { charge(id: string): Promise<void>; refund(id: string): Promise<void> },
    private shippingService: { createShipment(id: string): Promise<void>; cancelShipment(id: string): Promise<void> },
    private orderService: { confirm(id: string): Promise<void>; cancel(id: string): Promise<void> },
  ) {}

  async fulfillOrder(orderId: string): Promise<void> {
    try {
      // Step 1: reserve inventory
      await this.inventoryService.reserve(orderId);

      // Step 2: process payment
      await this.paymentService.charge(orderId);

      // Step 3: create the shipment
      await this.shippingService.createShipment(orderId);

      // Step 4: confirm the order
      await this.orderService.confirm(orderId);
    } catch (error) {
      // Explicit compensation
      await this.compensate(orderId);
    }
  }

  private async compensate(orderId: string): Promise<void> {
    // Undo in reverse order
    await this.shippingService.cancelShipment(orderId);
    await this.paymentService.refund(orderId);
    await this.inventoryService.release(orderId);
    await this.orderService.cancel(orderId);
  }
}
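Note that compensate() above undoes every step unconditionally. A production orchestrator typically records which steps actually completed and compensates only those, most recent first. A minimal standalone sketch of that refinement (the step list and in-memory log are illustrative):

```typescript
// Track completed steps so compensation only undoes work that actually
// happened, in reverse order. Steps and their undo actions are illustrative.
interface SagaStep {
  name: string;
  run: () => Promise<void>;
  undo: () => Promise<void>;
}

async function runWithCompensation(
  steps: SagaStep[],
  log: string[],
): Promise<boolean> {
  const completed: SagaStep[] = [];
  for (const step of steps) {
    try {
      await step.run();
      completed.push(step);
      log.push(`done:${step.name}`);
    } catch {
      // Undo only what completed, most recent first
      for (const done of completed.reverse()) {
        await done.undo();
        log.push(`undone:${done.name}`);
      }
      return false;
    }
  }
  return true;
}

const log: string[] = [];
const ok = await runWithCompensation(
  [
    { name: "reserve", run: async () => {}, undo: async () => {} },
    { name: "charge", run: async () => {}, undo: async () => {} },
    {
      name: "ship",
      run: async () => {
        throw new Error("carrier unavailable");
      },
      undo: async () => {},
    },
  ],
  log,
);
```

Here the shipment step fails, so only the reserve and charge steps are undone, and the never-executed confirm step is never touched.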

Saga state machine:

type SagaState =
  | "STARTED"
  | "INVENTORY_RESERVED"
  | "PAYMENT_PROCESSED"
  | "SHIPPED"
  | "COMPLETED"
  | "COMPENSATING"
  | "FAILED";

interface SagaStateMachine<TData> {
  state: SagaState;
  data: TData;
  transitions: Map<SagaState, SagaTransition<TData>>;
}

interface SagaTransition<TData> {
  onEnter: (data: TData) => Promise<void>;
  onSuccess: SagaState;
  onFailure: SagaState;
  compensate?: (data: TData) => Promise<void>;
}

interface SagaStore {
  save(record: {
    id: string;
    state: SagaState;
    data: unknown;
    updatedAt: Date;
  }): Promise<void>;
}

class StatefulSagaOrchestrator<TData> {
  constructor(private sagaStore: SagaStore) {}

  async execute(saga: SagaStateMachine<TData>): Promise<void> {
    let currentState = saga.state;

    while (currentState !== "COMPLETED" && currentState !== "FAILED") {
      const transition = saga.transitions.get(currentState);
      if (!transition)
        throw new Error(`No transition defined for state: ${currentState}`);

      try {
        await transition.onEnter(saga.data);
        currentState = transition.onSuccess;
        saga.state = currentState;
        await this.persistSaga(saga); // Persist the new state
      } catch (error) {
        // Run the compensating action before moving on
        if (transition.compensate) {
          await transition.compensate(saga.data);
        }
        currentState = transition.onFailure;
        saga.state = currentState;
        await this.persistSaga(saga);
      }
    }
  }

  private async persistSaga(saga: SagaStateMachine<TData>): Promise<void> {
    // Persist saga state so it can be recovered after a crash
    await this.sagaStore.save({
      id: (saga.data as any).orderId,
      state: saga.state,
      data: saga.data,
      updatedAt: new Date(),
    });
  }
}
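A self-contained run through such a state machine, with stubbed steps and an in-memory stand-in for the saga store, can be sketched as follows (the reduced State union and all step bodies are illustrative):

```typescript
// Minimal, in-memory run of a saga state machine: each state's onEnter
// executes, then the saga advances to onSuccess or onFailure.
type State =
  | "STARTED"
  | "INVENTORY_RESERVED"
  | "PAYMENT_PROCESSED"
  | "COMPLETED"
  | "FAILED";

interface Transition {
  onEnter: () => Promise<void>;
  onSuccess: State;
  onFailure: State;
}

async function runSaga(
  start: State,
  transitions: Map<State, Transition>,
  persisted: State[],
): Promise<State> {
  let state = start;
  while (state !== "COMPLETED" && state !== "FAILED") {
    const t = transitions.get(state);
    if (!t) throw new Error(`No transition defined for state: ${state}`);
    try {
      await t.onEnter();
      state = t.onSuccess;
    } catch {
      state = t.onFailure;
    }
    persisted.push(state); // stand-in for sagaStore.save()
  }
  return state;
}

const persisted: State[] = [];
const final = await runSaga(
  "STARTED",
  new Map<State, Transition>([
    ["STARTED", { onEnter: async () => {}, onSuccess: "INVENTORY_RESERVED", onFailure: "FAILED" }],
    ["INVENTORY_RESERVED", { onEnter: async () => {}, onSuccess: "PAYMENT_PROCESSED", onFailure: "FAILED" }],
    ["PAYMENT_PROCESSED", { onEnter: async () => {}, onSuccess: "COMPLETED", onFailure: "FAILED" }],
  ]),
  persisted,
);
```

Persisting after every transition is what lets a crashed orchestrator resume from the last recorded state instead of restarting the saga.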

Best Practices

  1. Event design

    • Events should be immutable and represent facts that already happened
    • Name them in the past tense (e.g. OrderCreated, not CreateOrder)
    • Include all necessary data; avoid references to mutable state
    • Version events to support schema evolution
  2. Idempotency

    • Always design consumers to be idempotent
    • Deduplicate using unique message IDs
    • Store processing state so retries can be handled safely
  3. Error handling

    • Implement dead-letter queues for failed messages
    • Set sensible retry limits with exponential backoff
    • Monitor DLQ size and alert when it grows
  4. Ordering

    • Use partition keys in Kafka to guarantee per-key ordering
    • Understand at-least-once vs. exactly-once semantics
    • Design for out-of-order messages where they can occur
  5. Monitoring

    • Track message latency, processing time, and error rates
    • Set up consumer-lag alerts
    • Monitor event store growth and query performance
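The idempotency practice above can be sketched as a handler that deduplicates on message ID. The in-memory processed-ID set below stands in for a durable store (e.g. Redis or a database table) that a real consumer would use:

```typescript
// Idempotent consumer sketch: redeliveries are detected by message ID
// and skipped, so the side effect runs exactly once per logical message.
interface InboundMessage {
  id: string;
  body: string;
}

class IdempotentConsumer {
  // In production this set would live in a durable store shared by all
  // consumer instances, checked and updated transactionally
  private processed = new Set<string>();
  public handled: string[] = [];

  handle(message: InboundMessage): boolean {
    if (this.processed.has(message.id)) {
      return false; // duplicate delivery: skip the side effect
    }
    this.processed.add(message.id);
    this.handled.push(message.body); // the side effect
    return true;
  }
}

const consumer = new IdempotentConsumer();
consumer.handle({ id: "m-1", body: "order created" });
consumer.handle({ id: "m-1", body: "order created" }); // redelivery
consumer.handle({ id: "m-2", body: "payment captured" });
```

With at-least-once delivery, brokers may redeliver messages after timeouts or rebalances, so this check belongs in every consumer.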

Examples

Complete Order Processing Flow

// 1. The API receives the order request
app.post("/orders", async (req, res) => {
  const command: CreateOrderCommand = {
    type: "CreateOrder",
    payload: req.body,
    metadata: {
      userId: req.user.id,
      correlationId: req.headers["x-correlation-id"] as string,
      timestamp: new Date(),
    },
  };

  await commandBus.dispatch(command);
  res.status(202).json({ message: "Order creation initiated" });
});

// 2. The command handler creates the aggregate and persists its events
class CreateOrderHandler implements CommandHandler<CreateOrderCommand> {
  constructor(private eventStore: EventStore) {}

  async handle(command: CreateOrderCommand): Promise<void> {
    const order = Order.create(
      crypto.randomUUID(),
      command.payload.customerId,
      command.payload.items,
    );

    await this.eventStore.append(order.getUncommittedEvents());
  }
}

// 3. Events are published to Kafka; projections update the read models
// 4. The saga orchestrator starts the fulfillment process
// 5. Each saga step publishes events that keep the projections up to date
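The projection in step 3 can be sketched as a pure handler that folds domain events into a denormalized read model (the event shapes and status values here are illustrative):

```typescript
// Projection sketch: fold a stream of domain events into a read model
// keyed by order ID, ready for queries on the CQRS read side.
type OrderEvent =
  | { type: "OrderCreated"; orderId: string; total: number }
  | { type: "OrderConfirmed"; orderId: string }
  | { type: "OrderCancelled"; orderId: string };

interface OrderReadModel {
  orderId: string;
  total: number;
  status: "pending" | "confirmed" | "cancelled";
}

function project(events: OrderEvent[]): Map<string, OrderReadModel> {
  const readModel = new Map<string, OrderReadModel>();
  for (const event of events) {
    switch (event.type) {
      case "OrderCreated":
        readModel.set(event.orderId, {
          orderId: event.orderId,
          total: event.total,
          status: "pending",
        });
        break;
      case "OrderConfirmed": {
        const row = readModel.get(event.orderId);
        if (row) row.status = "confirmed";
        break;
      }
      case "OrderCancelled": {
        const row = readModel.get(event.orderId);
        if (row) row.status = "cancelled";
        break;
      }
    }
  }
  return readModel;
}

const model = project([
  { type: "OrderCreated", orderId: "o-1", total: 99 },
  { type: "OrderConfirmed", orderId: "o-1" },
  { type: "OrderCreated", orderId: "o-2", total: 25 },
]);
```

Because the projection is a pure fold over events, it can be rebuilt from scratch at any time by replaying the event log.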