API开发模式和最佳实践 api-dev

这项技能提供全面的模式,用于构建现代、高性能的API,包括异步/等待模式、断路器、速率限制、版本控制、全面测试和生产优化技术,适用于不同的框架和语言。

后端开发 0 次安装 0 次浏览 更新于 3/2/2026

API开发模式和最佳实践

这项技能提供了全面的模式,用于在2025年构建现代API,重点关注异步/等待模式、性能优化、安全性、测试和生产就绪配置,这些模式和配置适用于不同的框架和语言。

何时使用这项技能

当您需要完成以下任务时,请使用这项技能:

  • 设计RESTful或GraphQL API
  • 实现异步/等待模式以获得高性能
  • 添加用于认证、日志记录和验证的中间件
  • 优雅地处理错误,并使用适当的HTTP状态代码
  • 实施速率限制和节流
  • 生成OpenAPI/Swagger文档
  • 设置全面的测试策略
  • 使用缓存和连接池优化API性能
  • 实施API版本控制和向后兼容性
  • 设置监控和可观测性

核心API设计原则

1. 异步/等待模式以提升性能

# patterns/async_patterns.py
import asyncio
import aiohttp
import aioredis
from typing import AsyncGenerator, Optional, List, Dict, Any
from contextlib import asynccontextmanager
from dataclasses import dataclass
from functools import wraps
import time

@dataclass
class RequestMetrics:
    """Measurements describing one request, collected for monitoring."""
    duration: float  # elapsed time of the call, in seconds
    status_code: int  # status recorded for the call
    endpoint: str  # label identifying what was called
    method: str  # HTTP method label
    user_id: Optional[str] = None  # optional user identifier; None when not supplied

def with_metrics(func):
    """Decorate an async callable so each invocation emits a RequestMetrics record.

    The record is sent through ``send_metrics`` whether the call succeeds or
    raises; exceptions from the wrapped callable are always re-raised.

    Args:
        func: The coroutine function to instrument. Its ``__name__`` is used
            as the endpoint label.

    Returns:
        An async wrapper with the same call signature as ``func``.
    """
    endpoint = func.__name__
    # When the wrapped function is itself named after an HTTP verb (``get``,
    # ``post``, ...), use that verb as the default label; previously every
    # call without an explicit ``method`` kwarg was reported as GET.
    verb = func.__name__.upper()
    http_verbs = {'GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'HEAD', 'OPTIONS'}
    default_method = verb if verb in http_verbs else 'GET'

    @wraps(func)
    async def wrapper(*args, **kwargs):
        method = kwargs.get('method', default_method)
        status_code = 200
        # perf_counter is monotonic, so the duration cannot go negative when
        # the wall clock is adjusted mid-request.
        started = time.perf_counter()

        try:
            result = await func(*args, **kwargs)
            if hasattr(result, 'status_code'):
                status_code = result.status_code
            return result
        except Exception as exc:
            # Prefer ``status_code``; fall back to ``status`` (the attribute
            # aiohttp's ClientResponseError carries) before assuming 500.
            status_code = getattr(
                exc, 'status_code', getattr(exc, 'status', 500)
            )
            raise
        finally:
            # Report the outcome to the monitoring system.
            await send_metrics(
                RequestMetrics(
                    duration=time.perf_counter() - started,
                    status_code=status_code,
                    endpoint=endpoint,
                    method=method,
                )
            )

    return wrapper

async def send_metrics(metrics: RequestMetrics):
    """Forward a metrics record to the monitoring system (placeholder)."""
    # Intentionally a no-op: the real implementation depends on the monitoring
    # backend in use, e.g. Prometheus, Datadog, or custom analytics.
    return None

class AsyncAPIClient:
    """Generic asynchronous JSON API client backed by one pooled aiohttp session."""

    def __init__(self, base_url: str, timeout: int = 30):
        """Store the base URL and total timeout; the session is created lazily."""
        self._session_lock = asyncio.Lock()
        self._session: Optional[aiohttp.ClientSession] = None
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.base_url = base_url

    def _needs_new_session(self) -> bool:
        """Return True when no session exists yet or the current one is closed."""
        return self._session is None or self._session.closed

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared session, creating it (with a pooled connector) if needed."""
        if self._needs_new_session():
            async with self._session_lock:
                # Check again after acquiring the lock: another task may have
                # created the session while this one was waiting.
                if self._needs_new_session():
                    pool = aiohttp.TCPConnector(
                        limit=100,  # total connection pool size
                        limit_per_host=30,  # connections per host
                        force_close=False,
                        enable_cleanup_closed=True,
                    )
                    self._session = aiohttp.ClientSession(
                        connector=pool,
                        timeout=self.timeout,
                    )
        return self._session

    @with_metrics
    async def get(
        self,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Send a GET request and return the decoded JSON body.

        Raises whatever ``response.raise_for_status()`` raises on an error
        status. No retry is performed here.
        """
        target = f"{self.base_url}{endpoint}"
        http = await self._get_session()

        async with http.get(target, params=params, headers=headers) as reply:
            reply.raise_for_status()
            body = await reply.json()
            return body

    @with_metrics
    async def post(
        self,
        endpoint: str,
        data: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Send a POST request and return the decoded JSON body.

        Raises whatever ``response.raise_for_status()`` raises on an error
        status. No retry is performed here.
        """
        target = f"{self.base_url}{endpoint}"
        http = await self._get_session()

        async with http.post(
            target, data=data, json=json, headers=headers
        ) as reply:
            reply.raise_for_status()
            body = await reply.json()
            return body

    async def close(self):
        """Close the underlying session if one was created."""
        current = self._session
        if current:
            await current.close()

    async def __aenter__(self):
        """Enter the async context; returns this client."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave the async context, closing the session."""
        await self.close()

2. 断路器模式

# patterns/circuit_breaker.py
import asyncio
import time
from enum import Enum
from typing import Callable, Any, Optional
from functools import wraps

class CircuitState(Enum):
    """States a circuit breaker moves through."""
    CLOSED = "closed"  # calls pass through
    OPEN = "open"  # calls are rejected without invoking the target
    HALF_OPEN = "half_open"  # a trial call is allowed after the timeout


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """Async decorator implementing a circuit breaker for fault tolerance.

    Failures of type ``expected_exception`` are counted. Once
    ``failure_threshold`` consecutive failures are seen the circuit opens and
    calls are rejected with ``CircuitOpenError`` until ``timeout`` seconds
    have passed since the last failure. The next call is then let through as
    a trial: success closes the circuit, failure re-opens it.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: int = 60,
        expected_exception: type[Exception] = Exception
    ):
        """Configure the breaker.

        Args:
            failure_threshold: Consecutive failures that open the circuit.
            timeout: Seconds to stay open before allowing a trial call.
            expected_exception: Exception class (or tuple of classes) that
                counts as a failure; anything else propagates uncounted.
        """
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def __call__(self, func: Callable) -> Callable:
        """Wrap the coroutine function ``func`` with this breaker."""
        @wraps(func)
        async def wrapper(*args, **kwargs):
            if self.state == CircuitState.OPEN:
                if not self._should_attempt_reset():
                    # Subclass of Exception, so existing ``except Exception``
                    # handlers keep working; the message is unchanged.
                    raise CircuitOpenError("Circuit breaker is OPEN")
                self.state = CircuitState.HALF_OPEN

            try:
                result = await func(*args, **kwargs)
            except self.expected_exception:
                self._record_failure()
                raise

            if self.state == CircuitState.HALF_OPEN:
                self._reset()
            elif self.state == CircuitState.CLOSED:
                # A success ends the current failure streak. Without this,
                # unrelated failures spread over a long time would add up and
                # eventually open the circuit.
                self.failure_count = 0
            return result

        return wrapper

    def _should_attempt_reset(self) -> bool:
        """Return True when the open period has elapsed."""
        if self.last_failure_time is None:
            # No failure recorded (e.g. state was set manually): nothing to
            # wait for, so allow the trial call instead of raising TypeError.
            return True
        return time.time() - self.last_failure_time >= self.timeout

    def _record_failure(self):
        """Count a failure and open the circuit once the threshold is reached."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

    def _reset(self):
        """Clear the failure count and close the circuit."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

# Usage example
# The decorator is an instance of the CircuitBreaker class; the lowercase
# name ``circuit_breaker`` used previously is not defined anywhere.
@CircuitBreaker(failure_threshold=3, timeout=30)
async def external_api_call():
    """Call an external API through a circuit breaker and return its JSON body."""
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com") as response:
            return await response.json()

3. 使用Redis的速率限制

# patterns/rate_limiting.py
import asyncio
import aioredis
import time
from typing import Optional
from fastapi import HTTPException, Request
from starlette.middleware.base import BaseHTTPMiddleware

class RateLimiter:
    """Sliding-window rate limiter backed by a Redis sorted set per key."""

    def __init__(
        self,
        redis_url: str,
        requests_per_minute: int = 100,
        window_size: int = 60
    ):
        """Store the configuration; the Redis connection is opened lazily.

        Args:
            redis_url: URL handed to ``aioredis.from_url``.
            requests_per_minute: Default number of requests allowed per window.
            window_size: Default window length in seconds.
        """
        self.redis = None
        self.redis_url = redis_url
        self.requests_per_minute = requests_per_minute
        self.window_size = window_size

    async def initialize(self):
        """Open the Redis connection."""
        self.redis = await aioredis.from_url(self.redis_url)

    async def is_allowed(
        self,
        key: str,
        limit: Optional[int] = None,
        window: Optional[int] = None
    ) -> bool:
        """Record a request under ``key`` and report whether it is allowed.

        Args:
            key: Sorted-set key identifying the caller/resource.
            limit: Maximum requests per window; ``None`` uses the default.
                An explicit ``0`` rejects every request.
            window: Window length in seconds; falsy values use the default.

        Returns:
            True if the request fits in the window (and was recorded),
            False if the limit has already been reached.
        """
        import uuid

        if self.redis is None:
            await self.initialize()

        # ``limit or default`` would silently turn an explicit 0 into the
        # default; only substitute the default when nothing was passed.
        if limit is None:
            limit = self.requests_per_minute
        window = window or self.window_size
        now = time.time()

        # Drop entries that have fallen out of the sliding window.
        await self.redis.zremrangebyscore(key, 0, now - window)

        # NOTE: the count and the insert below are separate commands, so
        # concurrent callers can briefly exceed the limit.
        if await self.redis.zcard(key) >= limit:
            return False

        # The member must be unique per request: using the bare timestamp
        # lets two requests with the same clock reading collapse into one
        # entry and be under-counted.
        member = f"{now}:{uuid.uuid4().hex}"
        await self.redis.zadd(key, {member: now})
        await self.redis.expire(key, window)

        return True

class RateLimitMiddleware(BaseHTTPMiddleware):
    """FastAPI/Starlette middleware that applies a RateLimiter per client and path."""

    def __init__(
        self,
        app,
        redis_url: str,
        requests_per_minute: int = 100,
        identifier_func=None
    ):
        """Create the middleware.

        Args:
            app: The ASGI application being wrapped.
            redis_url: Redis URL passed to the RateLimiter.
            requests_per_minute: Request budget passed to the RateLimiter.
            identifier_func: Optional callable ``(request) -> str`` naming the
                caller; defaults to the client IP address.
        """
        super().__init__(app)
        self.limiter = RateLimiter(redis_url, requests_per_minute)
        self.identifier_func = identifier_func or self._default_identifier

    def _default_identifier(self, request: Request) -> str:
        """Identify the caller by client IP address."""
        client = request.client
        # ``request.client`` can be None (no peer address available), which
        # previously raised AttributeError; fall back to a shared bucket.
        return client.host if client is not None else "unknown"

    async def dispatch(self, request: Request, call_next):
        """Reject the request with 429 when over the limit, else pass it on."""
        from starlette.responses import JSONResponse

        identifier = self.identifier_func(request)
        key = f"rate_limit:{identifier}:{request.url.path}"

        if not await self.limiter.is_allowed(key):
            # Return the response directly: an HTTPException raised inside a
            # BaseHTTPMiddleware is not routed through the application's
            # exception handlers and surfaces as a 500 instead of a 429.
            return JSONResponse(
                status_code=429,
                content={"detail": "Rate limit exceeded"},
                headers={"Retry-After": str(self.limiter.window_size)},
            )

        return await call_next(request)

4. API版本控制策略

# patterns/versioning.py
from enum import Enum
from typing import Optional, Dict, Any, Type
from abc import ABC, abstractmethod

class APIVersion(str, Enum):
    """API versions the service recognises; each value is the token clients send."""
    V1 = "v1"
    V2 = "v2"
    V3 = "v3"


def _parse_version(raw: Optional[str]) -> Optional[APIVersion]:
    """Convert a client-supplied token such as ``"v2"`` into an APIVersion.

    Matching is done against the enum *values* (case-insensitively, ignoring
    surrounding whitespace). Returns None for missing or unknown tokens.
    """
    if not raw:
        return None
    try:
        return APIVersion(raw.strip().lower())
    except ValueError:
        return None


class APIVersionStrategy(ABC):
    """Base class for strategies that read the API version from a request."""

    @abstractmethod
    def get_version_from_request(self, request) -> Optional[APIVersion]:
        """Return the version named by ``request``, or None if it names none."""
        pass


class HeaderVersionStrategy(APIVersionStrategy):
    """Read the version from a request header."""

    def __init__(self, header_name: str = "API-Version"):
        """Remember which header carries the version token."""
        self.header_name = header_name

    def get_version_from_request(self, request) -> Optional[APIVersion]:
        """Return the version in the configured header, or None."""
        # The previous ``version in APIVersion.__members__`` test compared
        # against member names ("V1"), so "v1" was never recognised and "V1"
        # raised ValueError; the token is now parsed against the values.
        return _parse_version(request.headers.get(self.header_name))


class URLPathVersionStrategy(APIVersionStrategy):
    """Read the version from the path segment that follows a prefix."""

    def __init__(self, prefix: str = "/api"):
        """Remember the path prefix that precedes the version segment."""
        self.prefix = prefix

    def get_version_from_request(self, request) -> Optional[APIVersion]:
        """Return the version in ``<prefix>/<version>/...``, or None."""
        path = request.url.path
        prefix = self.prefix.rstrip("/")
        if not path.startswith(prefix):
            return None

        remainder = path[len(prefix):]
        # Require a segment boundary right after the prefix. This rejects
        # look-alikes such as "/apiv1/..." and avoids the IndexError the old
        # ``split("/")[1]`` raised when the path was exactly the prefix.
        if not remainder.startswith("/"):
            return None

        first_segment = remainder[1:].split("/", 1)[0]
        return _parse_version(first_segment)


class QueryParamVersionStrategy(APIVersionStrategy):
    """Read the version from a query-string parameter."""

    def __init__(self, param_name: str = "version"):
        """Remember which query parameter carries the version token."""
        self.param_name = param_name

    def get_version_from_request(self, request) -> Optional[APIVersion]:
        """Return the version in the configured query parameter, or None."""
        return _parse_version(request.query_params.get(self.param_name))

class CompositeVersionStrategy(APIVersionStrategy):
    """Version strategy that consults several strategies in order."""

    def __init__(self, strategies: list[APIVersionStrategy]):
        """Keep the ordered strategies; the fallback version is V1."""
        self.default_version = APIVersion.V1
        self.strategies = strategies

    def get_version_from_request(self, request) -> Optional[APIVersion]:
        """Return the first truthy version any strategy yields, else the default."""
        candidates = (
            strategy.get_version_from_request(request)
            for strategy in self.strategies
        )
        # The generator is lazy, so later strategies are not consulted once
        # an earlier one has produced a version.
        return next(
            (found for found in candidates if found),
            self.default_version,
        )

# Version-specific handlers
class APIHandlerRegistry:
    """Registry mapping (version, endpoint) pairs to handlers."""

    def __init__(self):
        """Start with an empty endpoint table for every known APIVersion."""
        self.handlers: Dict[APIVersion, Dict[str, Any]] = {}
        for known_version in APIVersion:
            self.handlers[known_version] = {}

    def register_handler(
        self,
        version: APIVersion,
        endpoint: str,
        handler: Any
    ):
        """Store ``handler`` for ``endpoint`` under ``version``."""
        # setdefault creates the per-version table on first use.
        per_version = self.handlers.setdefault(version, {})
        per_version[endpoint] = handler

    def get_handler(self, version: APIVersion, endpoint: str) -> Optional[Any]:
        """Return the handler for ``endpoint`` under ``version``, or None."""
        per_version = self.handlers.get(version, {})
        return per_version.get(endpoint)

# Usage example: the composite consults its strategies in list order —
# header first, then URL path, then query parameter.
version_strategy = CompositeVersionStrategy([
    HeaderVersionStrategy(),
    URLPathVersionStrategy(),
    QueryParamVersionStrategy()
])

5. OpenAPI文档增强

# patterns/openapi.py
from typing import Dict, Any, List, Optional
from pydantic import BaseModel, Field
from datetime import datetime
from enum import Enum

class ErrorSchema(BaseModel):
    """标准错误响应模式"""
    # Standard error response body.
    # NOTE(review): the docstring above is left as-is because pydantic may
    # surface a model's docstring in the generated JSON schema — confirm
    # before rewording it.
    # Required: error type.
    error: str = Field(..., description="错误类型")
    # Required: error message.
    message: str = Field(..., description="错误信息")
    # Optional: additional error details; defaults to None.
    details: Optional[Dict[str, Any]] = Field(
        None,
        description="额外的错误详情"
    )
    # Timestamp filled in via datetime.utcnow when not supplied.
    # NOTE(review): utcnow yields a naive datetime and is deprecated in
    # recent Python releases — consider a timezone-aware default.
    timestamp: datetime = Field(
        default_factory=datetime.utcnow,
        description="错误时间戳"
    )

class PaginationLinks(BaseModel):
    """分页链接模式"""
    # Pagination links; every field is optional and defaults to None.
    # NOTE(review): docstring left untouched because pydantic may surface it
    # in the generated schema — confirm before rewording.
    first: Optional[str] = Field(None, description="第一页链接")  # first page
    last: Optional[str] = Field(None, description="最后一页链接")  # last page
    next: Optional[str] = Field(None, description="下一页链接")  # next page
    prev: Optional[str] = Field(None, description="上一页链接")  # previous page

class PaginationMeta(BaseModel):
    """分页元数据模式"""
    # Pagination metadata; every field is required.
    # NOTE(review): docstring left untouched because pydantic may surface it
    # in the generated schema — confirm before rewording.
    total: int = Field(..., description="总项目数")  # total number of items
    page: int = Field(..., description="当前页码")  # current page number
    per_page: int = Field(..., description="每页项目数")  # items per page
    pages: int = Field(..., description="总页数")  # total number of pages

class PaginatedResponse(BaseModel):
    """通用分页响应模式"""
    # Generic paginated response envelope; every field is required.
    # NOTE(review): docstring left untouched because pydantic may surface it
    # in the generated schema — confirm before rewording.
    data: List[Any] = Field(..., description="响应数据")  # response items
    meta: PaginationMeta = Field(..., description="分页元数据")  # pagination metadata
    links: PaginationLinks = Field(..., description="分页链接")  # pagination links

# Enhanced OpenAPI configuration.
# Every reusable error response shares one shape (a description plus an
# ErrorSchema JSON body), so the "responses" table is generated from
# (name, description) pairs instead of being spelled out entry by entry.
OPENAPI_CONFIG = {
    "title": "现代API",
    "description": "具有异步模式的现代REST API",
    "version": "1.0.0",
    "docs_url": "/docs",
    "redoc_url": "/redoc",
    "openapi_url": "/openapi.json",
    "servers": [
        {"url": server_url, "description": server_label}
        for server_url, server_label in (
            ("https://api.example.com/v1", "生产服务器"),
            ("https://staging-api.example.com/v1", "暂存服务器"),
        )
    ],
    "components": {
        "schemas": {
            "Error": ErrorSchema.model_json_schema(),
            "PaginatedResponse": PaginatedResponse.model_json_schema(),
        },
        "responses": {
            response_name: {
                "description": response_label,
                "content": {
                    "application/json": {
                        # Generated per entry so each response owns its dict.
                        "schema": ErrorSchema.model_json_schema()
                    }
                },
            }
            for response_name, response_label in (
                ("ValidationError", "验证错误"),
                ("UnauthorizedError", "未授权错误"),
                ("NotFoundError", "资源未找到"),
                ("RateLimitError", "速率限制超出"),
            )
        },
    },
}

6. API测试模式

# patterns/testing.py
import pytest
import asyncio
from typing import AsyncGenerator
import httpx
from fastapi.testclient import TestClient
from unittest.mock import AsyncMock, patch

class AsyncAPITestCase:
    """Base class for async API tests.

    The ``async_client`` fixture reads ``self.app``, so subclasses are
    expected to provide an ``app`` attribute holding the ASGI application.
    """

    @pytest.fixture(scope="class")
    async def async_client(self) -> AsyncGenerator[httpx.AsyncClient, None]:
        """Yield an httpx.AsyncClient wired in-process to ``self.app``."""
        # The ``AsyncClient(app=...)`` shortcut is deprecated in newer httpx
        # releases and later removed; an explicit ASGI transport works on old
        # and new versions alike.
        transport = httpx.ASGITransport(app=self.app)
        async with httpx.AsyncClient(
            transport=transport,
            base_url="http://test"
        ) as client:
            yield client

    @pytest.fixture
    def mock_external_service(self):
        """Patch the external API client class and yield the mocked instance."""
        with patch("external_api_client.ExternalAPIClient") as mock:
            client = mock.return_value
            # NOTE(review): ``get`` is a plain mock here, so it is not
            # awaitable — switch to AsyncMock if the real client is async.
            client.get.return_value = {"status": "ok"}
            yield client

# Example test cases
@pytest.mark.asyncio
class TestUserAPI(AsyncAPITestCase):
    """Tests for the user API endpoints."""

    async def test_create_user(self, async_client: httpx.AsyncClient):
        """Creating a user returns 201 and echoes the public fields only."""
        payload = {
            "email": "test@example.com",
            "username": "testuser",
            "password": "securepassword123"
        }

        reply = await async_client.post("/api/users", json=payload)

        assert reply.status_code == 201
        body = reply.json()
        for echoed_field in ("email", "username"):
            assert body[echoed_field] == payload[echoed_field]
        # The password must never be returned to the caller.
        assert "password" not in body

    async def test_get_user(self, async_client: httpx.AsyncClient):
        """A created user can be fetched by id."""
        # Create a user first so there is something to fetch.
        new_user = {
            "email": "test2@example.com",
            "username": "testuser2",
            "password": "securepassword123"
        }
        created = await async_client.post("/api/users", json=new_user)
        user_id = created.json()["id"]

        # Fetch the user back.
        fetched = await async_client.get(f"/api/users/{user_id}")
        assert fetched.status_code == 200
        assert fetched.json()["id"] == user_id

    async def test_list_users_with_pagination(
        self,
        async_client: httpx.AsyncClient
    ):
        """The user list exposes data, meta and links for the requested page."""
        listing = await async_client.get("/api/users?page=1&per_page=10")
        assert listing.status_code == 200
        body = listing.json()
        for section in ("data", "meta", "links"):
            assert section in body
        meta = body["meta"]
        assert meta["page"] == 1
        assert meta["per_page"] == 10

    async def test_rate_limiting(
        self,
        async_client: httpx.AsyncClient
    ):
        """Sending more requests than the limit yields at least one 429."""
        # Fire requests back to back; assumes the limit is 100/minute.
        statuses = [
            (await async_client.get("/api/users")).status_code
            for _ in range(150)
        ]

        # At least one response must have been rate limited.
        assert 429 in statuses

# Integration test
@pytest.mark.asyncio
async def test_full_user_flow():
    """Exercise the complete user lifecycle: create, read, update, delete."""
    # NOTE(review): ``app`` is not defined or imported in this snippet — it
    # must be imported from the application module under test.
    # The ``AsyncClient(app=...)`` shortcut is deprecated in newer httpx
    # releases and later removed; an explicit ASGI transport works on old and
    # new versions alike.
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(
        transport=transport,
        base_url="http://test"
    ) as client:

        # Create the user.
        user_data = {
            "email": "flowtest@example.com",
            "username": "flowtest",
            "password": "securepassword123"
        }
        create_resp = await client.post("/api/users", json=user_data)
        assert create_resp.status_code == 201
        user = create_resp.json()
        user_id = user["id"]

        # Read it back.
        get_resp = await client.get(f"/api/users/{user_id}")
        assert get_resp.status_code == 200
        assert get_resp.json()["id"] == user_id

        # Update the username.
        update_data = {"username": "updateduser"}
        update_resp = await client.patch(
            f"/api/users/{user_id}",
            json=update_data
        )
        assert update_resp.status_code == 200
        assert update_resp.json()["username"] == "updateduser"

        # Delete the user.
        delete_resp = await client.delete(f"/api/users/{user_id}")
        assert delete_resp.status_code == 204

        # The user must be gone afterwards.
        get_resp = await client.get(f"/api/users/{user_id}")
        assert get_resp.status_code == 404

7. 性能优化模式

# patterns/performance.py
import asyncio
import asyncio.cache
import aioredis
from typing import Any, Optional, Callable
from functools import wraps
import hashlib
import json
import pickle

class ResponseCache:
    """Response cache that stores pickled values in Redis."""

    def __init__(self, redis_url: str, default_ttl: int = 300):
        """Store the configuration; the Redis connection is opened lazily.

        Args:
            redis_url: URL handed to ``aioredis.from_url``.
            default_ttl: Expiry in seconds used when ``set`` gets no ``ttl``.
        """
        self.redis = None
        self.redis_url = redis_url
        self.default_ttl = default_ttl

    async def initialize(self):
        """Open the Redis connection."""
        self.redis = await aioredis.from_url(self.redis_url)

    async def _ensure_connected(self):
        """Connect on first use so every operation reaches Redis."""
        if self.redis is None:
            await self.initialize()

    def _make_cache_key(self, prefix: str, *args, **kwargs) -> str:
        """Build ``"<prefix>:<sha256>"`` from the call arguments.

        Arguments are serialised as JSON with sorted keys; values JSON cannot
        encode are converted with ``str``.
        """
        key_str = json.dumps(
            {"args": args, "kwargs": kwargs},
            sort_keys=True,
            default=str,
        )
        key_hash = hashlib.sha256(key_str.encode()).hexdigest()
        return f"{prefix}:{key_hash}"

    async def get(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key``, or None when absent."""
        await self._ensure_connected()

        cached = await self.redis.get(key)
        if not cached:
            return None
        # SECURITY: pickle.loads executes arbitrary code from its input. This
        # is only safe while the Redis instance is trusted and not writable
        # by anyone else; prefer a data-only format (e.g. JSON) otherwise.
        return pickle.loads(cached)

    async def set(
        self,
        key: str,
        value: Any,
        ttl: Optional[int] = None
    ):
        """Store ``value`` under ``key`` for ``ttl`` seconds (default TTL if falsy)."""
        await self._ensure_connected()

        ttl = ttl or self.default_ttl
        await self.redis.setex(key, ttl, pickle.dumps(value))

    async def delete(self, key: str):
        """Remove ``key`` from the cache."""
        # Connect first, like get/set do. Previously a delete issued before
        # the first get/set was silently skipped, leaving the stale entry in
        # Redis.
        await self._ensure_connected()
        await self.redis.delete(key)

def cache_response(
    prefix: str,
    ttl: Optional[int] = None,
    cache: Optional[ResponseCache] = None
):
    """Decorator factory that caches an async function's results.

    Args:
        prefix: Prefix for the generated cache keys.
        ttl: Expiry in seconds passed to the cache's ``set``; None lets the
            cache apply its default.
        cache: Cache to use. When omitted, one ``ResponseCache`` pointing at
            ``redis://localhost`` is created on first call and then reused.

    Returns:
        A decorator for coroutine functions. Results that are ``None`` are
        stored but never treated as a hit, so such calls always re-execute.
    """
    def decorator(func: Callable) -> Callable:
        # One cache per decorated function. Building a new ResponseCache on
        # every call (as before) created a fresh Redis client each time.
        shared_cache = cache

        @wraps(func)
        async def wrapper(*args, **kwargs):
            nonlocal shared_cache
            if shared_cache is None:
                shared_cache = ResponseCache("redis://localhost")

            # Build the cache key from the call arguments.
            cache_key = shared_cache._make_cache_key(prefix, *args, **kwargs)

            # Serve from the cache when possible.
            cached_result = await shared_cache.get(cache_key)
            if cached_result is not None:
                return cached_result

            # Miss: run the function and remember its result.
            result = await func(*args, **kwargs)
            await shared_cache.set(cache_key, result, ttl)
            return result

        return wrapper
    return decorator

# Usage example
@cache_response(prefix="user_data", ttl=300)
async def get_user_data(user_id: int) -> Dict[str, Any]:
    """Return a user record, going through the response cache."""
    # Stand-in for an expensive database operation or API call.
    await asyncio.sleep(1)
    return dict(id=user_id, name="John Doe", email="john@example.com")

# Batch processing for performance
class BatchProcessor:
    """Collects queued items and hands them to ``_process_batch_items`` in batches.

    A background task drains the queue. A batch is flushed when it reaches
    ``batch_size`` items or when ``flush_interval`` seconds have passed since
    collection of that batch began, whichever comes first.
    """

    def __init__(self, batch_size: int = 100, flush_interval: int = 5):
        """Configure the processor.

        Args:
            batch_size: Maximum number of items per batch.
            flush_interval: Maximum seconds to wait while filling a batch.
        """
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.queue = asyncio.Queue()
        self.processing = False
        # Strong reference to the worker. The event loop only keeps weak
        # references to tasks, so an unreferenced task may be garbage
        # collected before it finishes.
        self._task = None

    async def add(self, item: Any):
        """Queue ``item`` and make sure the background worker is running."""
        await self.queue.put(item)

        worker = self._task
        if worker is not None and worker.done():
            # The worker ended (cancelled or crashed); allow a restart
            # instead of queueing items nobody will ever consume.
            self.processing = False

        if not self.processing:
            self.processing = True
            self._task = asyncio.create_task(self._process_batch())

    async def _process_batch(self):
        """Worker loop: repeatedly collect one batch and process it."""
        loop = asyncio.get_running_loop()

        while True:
            batch = []

            try:
                deadline = loop.time() + self.flush_interval

                # Fill the batch until it is full or the deadline passes.
                while len(batch) < self.batch_size:
                    remaining = max(0, deadline - loop.time())
                    try:
                        item = await asyncio.wait_for(
                            self.queue.get(),
                            timeout=remaining
                        )
                    except asyncio.TimeoutError:
                        break
                    batch.append(item)

                if batch:
                    await self._process_batch_items(batch)

            except Exception as e:
                # Keep the worker alive: a failing batch is reported and
                # dropped, then collection resumes with the next one.
                print(f"Error processing batch: {e}")

    async def _process_batch_items(self, batch: list[Any]):
        """Handle one batch; override in subclasses.

        Typical overrides perform a bulk database insert or a batched API
        call. The default only reports the batch size.
        """
        print(f"Processing batch of {len(batch)} items")

8. 生产配置清单

# api/production_checklist.yaml
# Production-readiness checklist, grouped into four top-level sections:
# performance, security, monitoring and documentation.

# --- Performance: connection pooling, caching, compression, rate limiting ---
performance:
  async_patterns:
    connection_pooling: true
    max_connections: 100
    connection_timeout: 30
    keep_alive: true

  caching:
    redis_cache: true
    default_ttl: 300
    cache_headers: true

  compression:
    gzip: true
    level: 6
    threshold: 1024

  rate_limiting:
    enabled: true
    default_limit: 100/minute
    burst_limit: 200
    sliding_window: true

# --- Security: authentication, authorization, validation, headers ---
security:
  authentication:
    jwt_validation: true
    token_refresh: true
    revoke_tokens: true

  authorization:
    rbac: true
    rate_limit_by_role: true

  validation:
    input_validation: true
    sql_injection_protection: true
    xss_protection: true

  headers:
    security_headers: true
    cors: true
    csrf_protection: true

# --- Monitoring: metrics, logging, health checks ---
monitoring:
  metrics:
    prometheus: true
    request_duration: true
    error_rate: true
    throughput: true

  logging:
    structured_logging: true
    correlation_ids: true
    error_tracking: true

  health_checks:
    liveness_probe: true
    readiness_probe: true
    dependency_checks: true

# --- Documentation: OpenAPI, versioning, testing ---
# NOTE(review): versioning and testing are nested under documentation here;
# confirm that grouping is intended.
documentation:
  openapi:
    auto_generation: true
    examples: true
    schemas: true

  versioning:
    strategy: "header_path_query"
    deprecation_warnings: true
    backward_compatibility: true

  testing:
    unit_tests: true
    integration_tests: true
    performance_tests: true
    contract_tests: true