Event-Driven Architecture with Modern Patterns (event-driven-architecture)

这项技能提供全面的模式,用于构建现代分布式系统,使用事件溯源、CQRS、Saga模式和Dapr集成。它被设计为框架无关,适用于任何需要强大事件驱动能力的领域。

架构设计 0 次安装 0 次浏览 更新于 3/3/2026

事件驱动架构与现代模式

此技能提供了全面的模式,用于实现事件驱动的微服务,使用现代消息系统、分布式运行时和云原生模式。它被设计为框架无关,适用于任何需要事件驱动能力的领域。

何时使用此技能

当您需要:

  • 构建事件驱动的微服务架构
  • 实现 Kafka、RabbitMQ 或云服务的发布/订阅模式
  • 使用 Dapr 实现分布式应用模式
  • 实施实时通知和工作流
  • 构建周期性任务和提醒系统
  • 创建审计跟踪和活动日志
  • 实施分布式状态管理
  • 构建无服务器事件工作流
  • 处理事件溯源和 CQRS 模式

1. 核心事件架构

基础事件模式

# events/core.py
from pydantic import BaseModel, Field, validator
from datetime import datetime
from typing import Optional, Dict, Any, Union, List
from enum import Enum
from abc import ABC, abstractmethod
import uuid
import json

class EventVersion(str, Enum):
    """Supported event schema versions (string-valued so they serialize as-is)."""
    V1_0 = "1.0"
    V1_1 = "1.1"
    V2_0 = "2.0"

class EventPriority(str, Enum):
    """Event processing priority, from LOW (best-effort) to CRITICAL."""
    LOW = "low"
    NORMAL = "normal"
    HIGH = "high"
    CRITICAL = "critical"

class BaseEvent(BaseModel, ABC):
    """Base event schema carrying the fields common to all events.

    Subclasses add domain-specific payloads; ``extra = "allow"`` keeps the
    schema forward-compatible with fields added by newer producers.
    """

    # --- Core identity ---
    event_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    event_type: str = Field(..., description="事件类型")
    event_version: EventVersion = Field(default=EventVersion.V1_0)

    # --- Metadata ---
    # NOTE(review): datetime.utcnow() yields naive timestamps and is deprecated
    # since Python 3.12; switching to datetime.now(timezone.utc) would make
    # stored timestamps timezone-aware — confirm with consumers before changing.
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    # causation_id is deliberately declared BEFORE correlation_id: pydantic
    # validators only see previously-declared fields in `values`, and the
    # correlation_id validator below needs causation_id. (The original declared
    # them the other way round, so the fallback could never fire.)
    causation_id: Optional[str] = None  # the event that caused this one
    correlation_id: Optional[str] = None
    source: str = Field(..., description="源服务/标识")

    # --- Context ---
    user_id: Optional[str] = None
    tenant_id: Optional[str] = None  # multi-tenant support
    session_id: Optional[str] = None

    # --- Processing metadata ---
    priority: EventPriority = Field(default=EventPriority.NORMAL)
    retry_count: int = Field(default=0)
    max_retries: int = Field(default=3)
    delay_until: Optional[datetime] = None  # for delayed processing

    # --- Schema versioning ---
    schema_version: str = Field(default="1.0")

    class Config:
        # Allow extra fields for extensibility
        extra = "allow"
        # Serialize enum members by value
        use_enum_values = True

    @validator('event_type')
    def validate_event_type(cls, v):
        """Require a `domain.event_name` format and normalize to lower case."""
        if not v or '.' not in v:
            raise ValueError('event_type 必须以 format: domain.event_name 的格式存在')
        return v.lower()

    @validator('correlation_id', always=True)
    def validate_correlation_id(cls, v, values):
        """Default correlation_id to causation_id when not supplied.

        FIX: ``always=True`` is required so the validator also runs when
        correlation_id is omitted and the None default is used — without it
        the fallback never executed.
        """
        if not v and values.get('causation_id'):
            return values['causation_id']
        return v

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the event to a plain dict.

        NOTE(review): model_dump() is pydantic v2 API while @validator is
        v1-style (deprecated shim in v2) — the file mixes both; works on v2.
        """
        return self.model_dump()

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'BaseEvent':
        """Reconstruct an event from a dict produced by ``to_dict``."""
        return cls(**data)

    def with_context(self, **context) -> 'BaseEvent':
        """Set any already-existing attributes from keyword context; return self."""
        for key, value in context.items():
            if hasattr(self, key):
                setattr(self, key, value)
        return self

class DomainEvent(BaseEvent):
    """Event scoped to a single domain aggregate.

    Carries the aggregate's identity plus an arbitrary payload dict.
    """

    aggregate_id: str = Field(..., description="聚合根 ID")
    aggregate_type: str = Field(..., description="聚合类型")
    event_data: Dict[str, Any] = Field(default_factory=dict)

    @validator('aggregate_type')
    def validate_aggregate_type(cls, v):
        """Reject empty aggregate types and normalize to lower case."""
        if v:
            return v.lower()
        raise ValueError('aggregate_type 是必需的')

class IntegrationEvent(BaseEvent):
    """Integration event for cross-service communication."""

    target_services: List[str] = Field(default_factory=list)
    routing_key: Optional[str] = None
    message_format: str = Field(default="json")

    @validator('routing_key', always=True)
    def validate_routing_key(cls, v, values):
        """Derive the routing key from event_type when not provided.

        FIX: ``always=True`` added — without it pydantic skips this validator
        whenever routing_key is omitted (the None default is used directly),
        so the derivation below never ran.
        """
        if not v and 'event_type' in values:
            return values['event_type'].replace('.', '/')
        return v

class CommandEvent(BaseEvent):
    """Command event expressing an intent to change state."""

    command_type: str = Field(..., description="命令类型")
    command_data: Dict[str, Any] = Field(default_factory=dict)
    expected_version: Optional[int] = None  # optimistic concurrency check

    @validator('command_type')
    def validate_command_type(cls, v):
        """Ensure the `.command` suffix and normalize to lower case."""
        suffix = '.command'
        normalized = v if v.endswith(suffix) else f"{v}{suffix}"
        return normalized.lower()

class QueryEvent(BaseEvent):
    """Query event requesting data retrieval."""

    query_type: str = Field(..., description="查询类型")
    query_params: Dict[str, Any] = Field(default_factory=dict)
    result_topic: Optional[str] = None  # where results are published

    @validator('query_type')
    def validate_query_type(cls, v):
        """Ensure the `.query` suffix and normalize to lower case."""
        suffix = '.query'
        normalized = v if v.endswith(suffix) else f"{v}{suffix}"
        return normalized.lower()

事件存储模式

# events/store.py
import asyncio
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any, AsyncIterator
from datetime import datetime, timedelta
import json
import uuid

class EventStore(ABC):
    """Abstract event store interface.

    Defines append/read access to event streams plus aggregate snapshots;
    implementations include InMemoryEventStore and PostgreSQLEventStore.
    """

    @abstractmethod
    async def save_event(self, event: BaseEvent, stream_id: str) -> None:
        """Append an event to the given stream."""
        pass

    @abstractmethod
    async def get_events(
        self,
        stream_id: str,
        from_version: Optional[int] = None,
        to_version: Optional[int] = None,
        limit: Optional[int] = None
    ) -> AsyncIterator[BaseEvent]:
        """Yield events from a stream, optionally bounded by version and count."""
        pass

    @abstractmethod
    async def get_event_by_id(self, event_id: str) -> Optional[BaseEvent]:
        """Return a single event by its id, or None if not found."""
        pass

    @abstractmethod
    async def get_events_by_type(
        self,
        event_type: str,
        from_timestamp: Optional[datetime] = None,
        to_timestamp: Optional[datetime] = None
    ) -> AsyncIterator[BaseEvent]:
        """Yield events of a given type, optionally within a time window."""
        pass

    @abstractmethod
    async def get_aggregate_snapshot(
        self,
        aggregate_id: str,
        aggregate_type: str
    ) -> Optional[Dict[str, Any]]:
        """Return the latest snapshot for an aggregate, or None."""
        pass

    @abstractmethod
    async def save_aggregate_snapshot(
        self,
        aggregate_id: str,
        aggregate_type: str,
        data: Dict[str, Any],
        version: int
    ) -> None:
        """Store a snapshot of an aggregate at the given version."""
        pass

class InMemoryEventStore(EventStore):
    """In-memory event store for testing and development.

    Not durable and not thread-safe; intended for single-process use only.
    """

    def __init__(self):
        # stream_id -> events in append (version) order
        self._events: Dict[str, List[BaseEvent]] = {}
        # "<aggregate_type>:<aggregate_id>" -> latest snapshot record
        self._snapshots: Dict[str, Dict[str, Any]] = {}
        # event_type -> event_ids in insertion order
        self._type_index: Dict[str, List[str]] = {}
        # event_id -> event; O(1) lookup instead of scanning every stream
        self._event_index: Dict[str, BaseEvent] = {}

    async def save_event(self, event: BaseEvent, stream_id: str) -> None:
        """Append an event to a stream and update the lookup indexes."""
        self._events.setdefault(stream_id, []).append(event)
        self._type_index.setdefault(event.event_type, []).append(event.event_id)
        self._event_index[event.event_id] = event

    async def get_events(
        self,
        stream_id: str,
        from_version: Optional[int] = None,
        to_version: Optional[int] = None,
        limit: Optional[int] = None
    ) -> AsyncIterator[BaseEvent]:
        """Yield events from a stream, optionally bounded by version and count.

        FIX: the original applied ``[:to_version]`` after ``[from_version:]``,
        which made to_version relative to from_version (yielding
        events[from_version : from_version + to_version]). Both bounds are
        absolute stream positions, so a single slice is used.
        """
        events = self._events.get(stream_id, [])[from_version:to_version]
        if limit is not None:
            events = events[:limit]
        for event in events:
            yield event

    async def get_event_by_id(self, event_id: str) -> Optional[BaseEvent]:
        """Return the event with the given id, or None if unknown."""
        return self._event_index.get(event_id)

    async def get_events_by_type(
        self,
        event_type: str,
        from_timestamp: Optional[datetime] = None,
        to_timestamp: Optional[datetime] = None
    ) -> AsyncIterator[BaseEvent]:
        """Yield events of a given type, optionally within a time window."""
        for event_id in self._type_index.get(event_type, []):
            event = self._event_index.get(event_id)
            if event is None:
                continue
            # Inclusive-bounds time window filter
            if from_timestamp and event.timestamp < from_timestamp:
                continue
            if to_timestamp and event.timestamp > to_timestamp:
                continue
            yield event

    async def get_aggregate_snapshot(
        self,
        aggregate_id: str,
        aggregate_type: str
    ) -> Optional[Dict[str, Any]]:
        """Return the stored snapshot record for an aggregate, if any."""
        return self._snapshots.get(f"{aggregate_type}:{aggregate_id}")

    async def save_aggregate_snapshot(
        self,
        aggregate_id: str,
        aggregate_type: str,
        data: Dict[str, Any],
        version: int
    ) -> None:
        """Store (or overwrite) the snapshot for an aggregate."""
        # NOTE(review): utcnow() is naive and deprecated in 3.12; kept for
        # consistency with BaseEvent.timestamp — confirm before changing.
        self._snapshots[f"{aggregate_type}:{aggregate_id}"] = {
            "data": data,
            "version": version,
            "timestamp": datetime.utcnow()
        }

# PostgreSQL 事件存储实现
class PostgreSQLEventStore(EventStore):
    """基于 PostgreSQL 的事件存储"""

    def __init__(self, db_pool):
        """Store the connection pool used for all queries.

        db_pool: pool exposing ``acquire()`` as an async context manager
        (presumably an asyncpg.Pool — confirm against caller).
        """
        self.db_pool = db_pool

    async def initialize_schema(self):
        """Create the event_store and snapshots tables plus supporting indexes.

        Idempotent via IF NOT EXISTS. FIX: the original embedded
        ``INDEX (...)`` clauses inside CREATE TABLE, which is MySQL syntax
        and a syntax error in PostgreSQL — indexes must be created with
        separate CREATE INDEX statements.
        """
        async with self.db_pool.acquire() as conn:
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS event_store (
                    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                    stream_id VARCHAR(255) NOT NULL,
                    stream_version INTEGER NOT NULL,
                    event_id VARCHAR(255) UNIQUE NOT NULL,
                    event_type VARCHAR(255) NOT NULL,
                    event_data JSONB NOT NULL,
                    metadata JSONB,
                    timestamp TIMESTAMP WITH TIME ZONE NOT NULL
                );

                CREATE INDEX IF NOT EXISTS idx_event_store_stream
                    ON event_store (stream_id, stream_version);
                CREATE INDEX IF NOT EXISTS idx_event_store_type
                    ON event_store (event_type);
                CREATE INDEX IF NOT EXISTS idx_event_store_timestamp
                    ON event_store (timestamp);

                CREATE TABLE IF NOT EXISTS snapshots (
                    aggregate_id VARCHAR(255) NOT NULL,
                    aggregate_type VARCHAR(255) NOT NULL,
                    data JSONB NOT NULL,
                    version INTEGER NOT NULL,
                    timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
                    PRIMARY KEY (aggregate_type, aggregate_id)
                );
            """)

    async def save_event(self, event: BaseEvent, stream_id: str) -> None:
        """将事件保存到 PostgreSQL"""
        async with self.db_pool.acquire() as conn:
            # 获取下一个版本
            result = await conn.fetchval(
                "SELECT COALESCE(MAX(stream_version), 0) + 1 FROM event_store WHERE stream_id = $1",
                stream_id
            )

            await conn.execute(
                """
                INSERT INTO event_store (
                    stream_id, stream_version, event_id, event_type,
                    event_data, metadata, timestamp
                ) VALUES ($1, $2, $3, $4, $5, $6, $7)
                """,