微服务模式 microservices-patterns

这个技能是关于设计微服务架构的模式,包括服务分解、通信、数据管理和弹性模式,用于构建分布式系统、分解单体应用或实施微服务。关键词:微服务、架构设计、事件驱动、分布式系统、服务边界、弹性模式、服务发现、负载均衡、数据库管理、Saga模式、断路器。

微服务 0 次安装 0 次浏览 更新于 3/7/2026

名称: 微服务模式 描述: 使用服务边界、事件驱动通信和弹性模式设计微服务架构。适用于构建分布式系统、分解单体应用或实施微服务。

微服务模式

掌握微服务架构模式,包括服务边界、服务间通信、数据管理和弹性模式,用于构建分布式系统。

何时使用此技能

  • 将单体应用分解为微服务
  • 设计服务边界和契约
  • 实施服务间通信
  • 管理分布式数据和事务
  • 构建弹性分布式系统
  • 实施服务发现和负载均衡
  • 设计事件驱动架构

核心概念

1. 服务分解策略

按业务能力

  • 围绕业务功能组织服务
  • 每个服务拥有其领域
  • 示例:OrderService, PaymentService, InventoryService

按子域(DDD)

  • 核心域、支持子域
  • 有界上下文映射到服务
  • 清晰的拥有权和责任

Strangler Fig 模式

  • 从单体应用逐步提取
  • 新功能作为微服务
  • 代理路由到旧/新系统

2. 通信模式

同步(请求/响应)

  • REST APIs
  • gRPC
  • GraphQL

异步(事件/消息)

  • 事件流(Kafka)
  • 消息队列(RabbitMQ, SQS)
  • 发布/订阅模式

3. 数据管理

每个服务一个数据库

  • 每个服务拥有其数据
  • 没有共享数据库
  • 松散耦合

Saga 模式

  • 分布式事务
  • 补偿操作
  • 最终一致性

4. 弹性模式

断路器

  • 在重复错误时快速失败
  • 防止级联故障

带退避的重试

  • 瞬时故障处理
  • 指数退避

隔板

  • 隔离资源
  • 限制故障影响

服务分解模式

模式 1: 按业务能力

# 订单服务
class OrderService:
    async def create_order(self, order_data: dict) -> Order:
        order = Order.create(order_data)
        await self.event_bus.publish(
            OrderCreatedEvent(order_id=order.id, customer_id=order.customer_id)
        )
        return order

# 支付服务(单独服务)
class PaymentService:
    async def process_payment(self, payment_request: PaymentRequest) -> PaymentResult:
        result = await self.payment_gateway.charge(
            amount=payment_request.amount,
            customer=payment_request.customer_id
        )
        if result.success:
            await self.event_bus.publish(
                PaymentCompletedEvent(order_id=payment_request.order_id)
            )
        return result

# 库存服务(单独服务)
class InventoryService:
    async def reserve_items(self, order_id: str, items: List[OrderItem]) -> ReservationResult:
        for item in items:
            available = await self.inventory_repo.get_available(item.product_id)
            if available < item.quantity:
                return ReservationResult(success=False, error=f"库存不足")

        reservation = await self.create_reservation(order_id, items)
        await self.event_bus.publish(InventoryReservedEvent(order_id=order_id))
        return ReservationResult(success=True, reservation=reservation)

模式 2: API 网关

from fastapi import FastAPI
import httpx

class APIGateway:
    """所有客户端请求的中央入口点。"""

    def __init__(self):
        self.order_service_url = "http://order-service:8000"
        self.payment_service_url = "http://payment-service:8001"
        self.http_client = httpx.AsyncClient(timeout=5.0)

    @circuit(failure_threshold=5, recovery_timeout=30)
    async def call_order_service(self, path: str, method: str = "GET", **kwargs):
        """使用断路器调用订单服务。"""
        response = await self.http_client.request(
            method, f"{self.order_service_url}{path}", **kwargs
        )
        response.raise_for_status()
        return response.json()

    async def create_order_aggregate(self, order_id: str) -> dict:
        """从多个服务聚合数据。"""
        order, payment, inventory = await asyncio.gather(
            self.call_order_service(f"/orders/{order_id}"),
            self.call_payment_service(f"/payments/order/{order_id}"),
            self.call_inventory_service(f"/reservations/order/{order_id}"),
            return_exceptions=True
        )

        result = {"order": order}
        if not isinstance(payment, Exception):
            result["payment"] = payment
        if not isinstance(inventory, Exception):
            result["inventory"] = inventory
        return result

通信模式

模式 1: 同步 REST 通信

import httpx
from tenacity import retry, stop_after_attempt, wait_exponential

class ServiceClient:
    """带重试和超时的 HTTP 客户端。"""

    def __init__(self, base_url: str):
        self.base_url = base_url
        self.client = httpx.AsyncClient(timeout=httpx.Timeout(5.0, connect=2.0))

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
    async def get(self, path: str, **kwargs):
        """带自动重试的 GET 请求。"""
        response = await self.client.get(f"{self.base_url}{path}", **kwargs)
        response.raise_for_status()
        return response.json()

payment_client = ServiceClient("http://payment-service:8001")
result = await payment_client.get("/payments/123")

模式 2: 异步事件驱动

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json

class EventBus:
    """事件发布和订阅。"""

    async def publish(self, event: DomainEvent):
        """发布事件到 Kafka 主题。"""
        await self.producer.send_and_wait(
            event.event_type,
            value=asdict(event),
            key=event.aggregate_id.encode()
        )

    async def subscribe(self, topic: str, handler: callable):
        """订阅事件。"""
        consumer = AIOKafkaConsumer(topic, bootstrap_servers=self.bootstrap_servers)
        await consumer.start()
        async for message in consumer:
            await handler(message.value)

# 订单服务发布
await event_bus.publish(OrderCreatedEvent(order_id=order.id))

# 库存服务订阅
async def handle_order_created(event_data: dict):
    await reserve_inventory(event_data["order_id"], event_data["items"])

模式 3: Saga 模式(分布式事务)

class OrderFulfillmentSaga:
    """订单履行的编排式 saga。"""

    def __init__(self):
        self.steps = [
            SagaStep("create_order", self.create_order, self.cancel_order),
            SagaStep("reserve_inventory", self.reserve_inventory, self.release_inventory),
            SagaStep("process_payment", self.process_payment, self.refund_payment),
            SagaStep("confirm_order", self.confirm_order, self.cancel_order_confirmation)
        ]

    async def execute(self, order_data: dict) -> SagaResult:
        completed_steps = []
        context = {"order_data": order_data}

        try:
            for step in self.steps:
                result = await step.action(context)
                if not result.success:
                    await self.compensate(completed_steps, context)
                    return SagaResult(status=SagaStatus.FAILED, error=result.error)

                completed_steps.append(step)
                context.update(result.data)

            return SagaResult(status=SagaStatus.COMPLETED, data=context)
        except Exception as e:
            await self.compensate(completed_steps, context)
            return SagaResult(status=SagaStatus.FAILED, error=str(e))

    async def compensate(self, completed_steps: List[SagaStep], context: dict):
        """以相反顺序执行补偿操作。"""
        for step in reversed(completed_steps):
            await step.compensation(context)

弹性模式

断路器模式

from enum import Enum
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = "closed"   # 正常操作
    OPEN = "open"       # 失败,拒绝请求
    HALF_OPEN = "half_open"  # 测试恢复

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 30):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.state = CircuitState.CLOSED
        self.opened_at = None

    async def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpenError("断路器已打开")

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise

    def _on_success(self):
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED

    def _on_failure(self):
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            self.opened_at = datetime.now()

breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)
result = await breaker.call(payment_client.process_payment, payment_data)

最佳实践

  1. 服务边界: 与业务能力对齐
  2. 每个服务一个数据库: 无共享数据库
  3. API 契约: 版本化,向后兼容
  4. 尽可能异步: 事件优于直接调用
  5. 断路器: 服务失败时快速失败
  6. 分布式追踪: 跨服务跟踪请求
  7. 服务注册表: 动态服务发现
  8. 健康检查: 存活性和就绪性探针

常见陷阱

  • 分布式单体: 服务紧密耦合
  • 聊天式服务: 太多服务间调用
  • 共享数据库: 通过数据紧密耦合
  • 无断路器: 级联故障
  • 全部同步: 紧密耦合,弹性差
  • 过早微服务: 从微服务开始
  • 忽略网络故障: 假设可靠网络
  • 无补偿逻辑: 无法撤销失败事务