API开发模式和最佳实践
本技能提供一套全面的模式,用于在2025年构建现代API,重点涵盖 async/await(异步/等待)模式、性能优化、安全性、测试以及生产就绪配置。这些模式和配置适用于不同的框架和语言。
何时使用此技能
当您需要:
- 设计RESTful或GraphQL API
- 实现异步/等待模式以获得高性能
- 添加用于认证、日志记录和验证的中间件
- 优雅地处理错误,并使用适当的HTTP状态代码
- 实施速率限制和节流
- 生成OpenAPI/Swagger文档
- 设置全面的测试策略
- 使用缓存和连接池优化API性能
- 实施API版本控制和向后兼容性
- 设置监控和可观测性
核心API设计原则
1. 异步/等待模式以提升性能
# patterns/async_patterns.py
import asyncio
import aiohttp
import aioredis
from typing import AsyncGenerator, Optional, List, Dict, Any
from contextlib import asynccontextmanager
from dataclasses import dataclass
from functools import wraps
import time
@dataclass
class RequestMetrics:
    """Measurements recorded for one instrumented call.

    Instances are created by the ``with_metrics`` decorator and passed to
    ``send_metrics``.
    """

    # Elapsed time of the call, in seconds.
    duration: float
    # Status code taken from the result or the raised exception.
    status_code: int
    # Name of the instrumented function.
    endpoint: str
    # HTTP method label attached to the measurement.
    method: str
    # Optional identifier of the caller; not populated by ``with_metrics``.
    user_id: Optional[str] = None
def with_metrics(func):
    """Wrap an async callable so every call reports a ``RequestMetrics``.

    The wrapper times the call, derives a status code from the result (its
    ``status_code`` attribute, if any) or from the raised exception, and
    forwards the measurement to ``send_metrics``.

    Args:
        func: Coroutine function to instrument. Its ``__name__`` is reported
            as the endpoint.

    Returns:
        A coroutine function that returns whatever ``func`` returns and
        re-raises whatever ``func`` raises.
    """
    # Local import keeps this block self-contained; only used for the
    # best-effort reporting path below.
    import logging

    # When the caller does not pass ``method=...``, infer it from the function
    # name so that e.g. a wrapped ``post`` is not reported as "GET".
    http_verbs = {"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"}
    inferred = func.__name__.upper()
    default_method = inferred if inferred in http_verbs else "GET"

    @wraps(func)
    async def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so durations survive wall-clock jumps.
        started = time.perf_counter()
        status_code = 200
        try:
            result = await func(*args, **kwargs)
            status_code = getattr(result, "status_code", status_code)
            return result
        except Exception as exc:
            # Some client libraries (aiohttp's ClientResponseError, for one)
            # expose the HTTP code as ``status`` rather than ``status_code``.
            status_code = getattr(
                exc, "status_code", getattr(exc, "status", 500)
            )
            raise
        finally:
            metrics = RequestMetrics(
                duration=time.perf_counter() - started,
                status_code=status_code,
                endpoint=func.__name__,
                method=kwargs.get("method", default_method),
            )
            # Reporting is best-effort: a monitoring failure must not replace
            # the wrapped call's result or mask its original exception.
            try:
                await send_metrics(metrics)
            except Exception:
                logging.getLogger(__name__).warning(
                    "Failed to send request metrics", exc_info=True
                )

    return wrapper
async def send_metrics(metrics: RequestMetrics):
    """Export one request measurement to the monitoring backend.

    Placeholder: the body is intentionally empty and must be filled in for
    the monitoring system in use (for example Prometheus, Datadog, or a
    custom analytics pipeline).
    """
    return None
class AsyncAPIClient:
    """Async HTTP client for a single base URL backed by one shared session.

    The aiohttp session (and its connection pool) is created lazily on first
    use and reused afterwards. The client can be used as an async context
    manager, in which case the session is closed on exit.
    """

    def __init__(self, base_url: str, timeout: int = 30):
        self.base_url = base_url
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self._session: Optional[aiohttp.ClientSession] = None
        # Serialises session creation between concurrent first callers.
        self._session_lock = asyncio.Lock()

    def _has_open_session(self) -> bool:
        """Return True when a session exists and has not been closed."""
        return self._session is not None and not self._session.closed

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared session, creating it if missing or closed."""
        if self._has_open_session():
            return self._session

        async with self._session_lock:
            # Re-check under the lock: another coroutine may have created the
            # session while this one was waiting.
            if not self._has_open_session():
                pool = aiohttp.TCPConnector(
                    limit=100,  # total connections in the pool
                    limit_per_host=30,  # connections per host
                    force_close=False,
                    enable_cleanup_closed=True,
                )
                self._session = aiohttp.ClientSession(
                    connector=pool,
                    timeout=self.timeout,
                )
        return self._session

    def _url_for(self, endpoint: str) -> str:
        """Concatenate the base URL and the endpoint without normalisation."""
        return f"{self.base_url}{endpoint}"

    @with_metrics
    async def get(
        self,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Send a GET request and return the decoded JSON body.

        The request is sent once (no retries are performed here);
        ``raise_for_status`` raises when the response carries an error status.
        """
        http = await self._get_session()
        target = self._url_for(endpoint)
        async with http.get(target, params=params, headers=headers) as response:
            response.raise_for_status()
            payload = await response.json()
        return payload

    @with_metrics
    async def post(
        self,
        endpoint: str,
        data: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Send a POST request and return the decoded JSON body.

        The request is sent once (no retries are performed here);
        ``raise_for_status`` raises when the response carries an error status.
        """
        http = await self._get_session()
        target = self._url_for(endpoint)
        async with http.post(
            target, data=data, json=json, headers=headers
        ) as response:
            response.raise_for_status()
            payload = await response.json()
        return payload

    async def close(self):
        """Close the underlying session if one was created."""
        if not self._session:
            return
        await self._session.close()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()
2. 断路器模式
# patterns/circuit_breaker.py
import asyncio
import time
from enum import Enum
from typing import Callable, Any, Optional
from functools import wraps
class CircuitState(Enum):
    """States a ``CircuitBreaker`` moves between."""

    # Calls are passed through to the wrapped function.
    CLOSED = "closed"
    # Calls are rejected until the breaker's timeout has elapsed.
    OPEN = "open"
    # A call is let through after the timeout; success closes the circuit.
    HALF_OPEN = "half_open"
class CircuitBreaker:
    """Decorator that stops calling a failing async function for a while.

    After ``failure_threshold`` consecutive failures the circuit opens and
    calls are rejected with ``Exception("Circuit breaker is OPEN")``. Once
    ``timeout`` seconds have passed since the last failure, the next call is
    let through (half-open); if it succeeds the circuit closes again, if it
    fails the circuit re-opens.

    Args:
        failure_threshold: Number of consecutive failures that opens the
            circuit.
        timeout: Seconds to wait after the last failure before a trial call.
        expected_exception: Exception class (or tuple of classes) counted as
            a failure. Other exceptions propagate without being counted.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: int = 60,
        expected_exception: type[Exception] = Exception
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def __call__(self, func: Callable) -> Callable:
        """Return ``func`` wrapped with this breaker's state machine."""
        @wraps(func)
        async def wrapper(*args, **kwargs):
            if self.state == CircuitState.OPEN:
                if not self._should_attempt_reset():
                    raise Exception("Circuit breaker is OPEN")
                self.state = CircuitState.HALF_OPEN

            try:
                result = await func(*args, **kwargs)
            except self.expected_exception:
                self._record_failure()
                raise

            # Any success clears the failure streak. Without this, isolated
            # failures spread over a long period would add up and eventually
            # open the circuit even though the dependency is healthy.
            self._reset()
            return result

        return wrapper

    def _should_attempt_reset(self) -> bool:
        """Return True once ``timeout`` seconds passed since the last failure."""
        if self.last_failure_time is None:
            # No failure recorded yet, so there is nothing to wait for.
            return True
        return time.time() - self.last_failure_time >= self.timeout

    def _record_failure(self):
        """Count one failure and open the circuit when warranted."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        # A failed trial call in half-open state re-opens immediately.
        if (
            self.state == CircuitState.HALF_OPEN
            or self.failure_count >= self.failure_threshold
        ):
            self.state = CircuitState.OPEN

    def _reset(self):
        """Clear the failure streak and close the circuit."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED
# Usage example
# The decorator is an instance of the CircuitBreaker class; the lowercase name
# ``circuit_breaker`` is not defined anywhere and would raise NameError.
@CircuitBreaker(failure_threshold=3, timeout=30)
async def external_api_call():
    """Fetch JSON from an external API, guarded by a circuit breaker."""
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com") as response:
            return await response.json()
3. 使用Redis的速率限制
# patterns/rate_limiting.py
import asyncio
import aioredis
import time
from typing import Optional
from fastapi import HTTPException, Request
from starlette.middleware.base import BaseHTTPMiddleware
class RateLimiter:
    """Sliding-window rate limiter backed by a Redis sorted set.

    Each allowed request is stored as one sorted-set member scored with its
    timestamp; members older than the window are trimmed before counting.

    Args:
        redis_url: URL handed to ``aioredis.from_url`` on first use.
        requests_per_minute: Default number of requests allowed per window.
        window_size: Default window length in seconds.
    """

    def __init__(
        self,
        redis_url: str,
        requests_per_minute: int = 100,
        window_size: int = 60
    ):
        self.redis = None
        self.redis_url = redis_url
        self.requests_per_minute = requests_per_minute
        self.window_size = window_size

    async def initialize(self):
        """Create the Redis client from ``redis_url``."""
        self.redis = await aioredis.from_url(self.redis_url)

    async def is_allowed(
        self,
        key: str,
        limit: Optional[int] = None,
        window: Optional[int] = None
    ) -> bool:
        """Record a request under ``key`` and report whether it is allowed.

        Args:
            key: Sorted-set key identifying the caller/resource.
            limit: Maximum requests per window; ``None`` uses the default.
                An explicit ``0`` blocks every request.
            window: Window length in seconds; a falsy value uses the default.

        Returns:
            True if the request fits in the window (and was recorded),
            False if the limit is already reached.
        """
        # Local import: uuid is only needed to build unique member names.
        import uuid

        if self.redis is None:
            await self.initialize()

        if limit is None:
            limit = self.requests_per_minute
        window = window or self.window_size
        now = time.time()

        # Drop entries that have fallen out of the sliding window.
        await self.redis.zremrangebyscore(key, 0, now - window)

        # NOTE(review): trim/count/add are separate round trips, so concurrent
        # callers can briefly exceed the limit; use a Lua script or MULTI if
        # strict enforcement is required.
        if await self.redis.zcard(key) >= limit:
            return False

        # Members must be unique: using the bare timestamp would make two
        # requests with the same timestamp collapse into a single entry.
        member = f"{now}:{uuid.uuid4().hex}"
        await self.redis.zadd(key, {member: now})
        await self.redis.expire(key, window)
        return True
class RateLimitMiddleware(BaseHTTPMiddleware):
    """Middleware that rejects requests exceeding a per-caller rate limit.

    Args:
        app: The ASGI application to wrap.
        redis_url: Redis URL passed to the underlying ``RateLimiter``.
        requests_per_minute: Requests allowed per window for each key.
        identifier_func: Optional callable mapping a request to the caller
            identifier; defaults to the client IP address.
    """

    def __init__(
        self,
        app,
        redis_url: str,
        requests_per_minute: int = 100,
        identifier_func=None
    ):
        super().__init__(app)
        self.limiter = RateLimiter(redis_url, requests_per_minute)
        self.identifier_func = identifier_func or self._default_identifier

    def _default_identifier(self, request: Request) -> str:
        """Identify the caller by client IP address."""
        client = request.client
        # ``request.client`` is None when the server supplies no peer address;
        # fall back to a shared bucket instead of raising AttributeError.
        return client.host if client is not None else "unknown"

    async def dispatch(self, request: Request, call_next):
        """Reply 429 when the caller is over the limit, else continue."""
        # Local import keeps this block self-contained.
        from starlette.responses import JSONResponse

        identifier = self.identifier_func(request)
        key = f"rate_limit:{identifier}:{request.url.path}"

        if not await self.limiter.is_allowed(key):
            # Return the response directly: an HTTPException raised inside a
            # BaseHTTPMiddleware is not seen by the application's exception
            # handlers and would surface as a 500 instead of a 429.
            return JSONResponse(
                status_code=429,
                content={"detail": "Rate limit exceeded"},
                headers={"Retry-After": str(self.limiter.window_size)},
            )

        return await call_next(request)
4. API版本控制策略
# patterns/versioning.py
from enum import Enum
from typing import Optional, Dict, Any, Type
from abc import ABC, abstractmethod
class APIVersion(str, Enum):
    """Supported API versions; each member's value is its wire string."""

    V1 = "v1"
    V2 = "v2"
    V3 = "v3"
class APIVersionStrategy(ABC):
    """Interface for objects that read the requested API version off a request."""

    @abstractmethod
    def get_version_from_request(self, request) -> Optional[APIVersion]:
        """Return the version found on ``request``, or None if there is none."""
        ...
class HeaderVersionStrategy(APIVersionStrategy):
    """Read the API version from a request header.

    Args:
        header_name: Name of the header carrying the version string.
    """

    def __init__(self, header_name: str = "API-Version"):
        self.header_name = header_name

    def get_version_from_request(self, request) -> Optional[APIVersion]:
        """Return the version named by the header, or None if absent/unknown."""
        raw = request.headers.get(self.header_name)
        if raw is None:
            return None
        # Look the string up by enum *value* ("v1"). Checking membership in
        # ``APIVersion.__members__`` compares against member *names* ("V1"),
        # which rejects every valid value and raises on "V1".
        try:
            return APIVersion(raw)
        except ValueError:
            return None
class URLPathVersionStrategy(APIVersionStrategy):
    """Read the API version from the path segment following a prefix.

    With the default prefix, ``/api/v2/users`` yields ``APIVersion.V2``.

    Args:
        prefix: Path prefix that precedes the version segment.
    """

    def __init__(self, prefix: str = "/api"):
        self.prefix = prefix

    def get_version_from_request(self, request) -> Optional[APIVersion]:
        """Return the version in the URL path, or None if absent/unknown."""
        path = request.url.path
        # Tolerate a prefix configured with a trailing slash ("/api/").
        prefix = self.prefix.rstrip("/")
        if not path.startswith(prefix):
            return None

        remainder = path[len(prefix):]
        # The prefix must end on a segment boundary; this also covers the
        # path being exactly the prefix, which would otherwise hit an
        # IndexError, and look-alikes such as "/apiary/v1".
        if not remainder.startswith("/"):
            return None

        segment = remainder.split("/")[1]
        # Look up by enum value ("v1"), not by member name ("V1").
        try:
            return APIVersion(segment)
        except ValueError:
            return None
class QueryParamVersionStrategy(APIVersionStrategy):
    """Read the API version from a query-string parameter.

    Args:
        param_name: Name of the query parameter carrying the version string.
    """

    def __init__(self, param_name: str = "version"):
        self.param_name = param_name

    def get_version_from_request(self, request) -> Optional[APIVersion]:
        """Return the version named by the parameter, or None if absent/unknown."""
        raw = request.query_params.get(self.param_name)
        if raw is None:
            return None
        # Look the string up by enum *value* ("v1"); membership in
        # ``APIVersion.__members__`` would compare against names ("V1").
        try:
            return APIVersion(raw)
        except ValueError:
            return None
class CompositeVersionStrategy(APIVersionStrategy):
    """Ask several strategies in order and fall back to a default version.

    Args:
        strategies: Strategies to consult, in priority order.
    """

    def __init__(self, strategies: list[APIVersionStrategy]):
        self.strategies = strategies
        self.default_version = APIVersion.V1

    def get_version_from_request(self, request) -> Optional[APIVersion]:
        """Return the first version any strategy finds, else the default."""
        # The generator is lazy, so strategies after the first hit are not
        # consulted.
        candidates = (
            strategy.get_version_from_request(request)
            for strategy in self.strategies
        )
        return next(
            (found for found in candidates if found),
            self.default_version,
        )
# Version-specific handlers
class APIHandlerRegistry:
    """Lookup table of handlers keyed by API version and endpoint."""

    def __init__(self):
        # Start with an empty endpoint table for every known version.
        self.handlers: Dict[APIVersion, Dict[str, Any]] = {}
        for version in APIVersion:
            self.handlers[version] = {}

    def register_handler(
        self,
        version: APIVersion,
        endpoint: str,
        handler: Any
    ):
        """Store ``handler`` for ``endpoint`` under ``version``."""
        self.handlers.setdefault(version, {})[endpoint] = handler

    def get_handler(self, version: APIVersion, endpoint: str) -> Optional[Any]:
        """Return the handler for ``version``/``endpoint``, or None if unset."""
        endpoints = self.handlers.get(version)
        if endpoints is None:
            return None
        return endpoints.get(endpoint)
# Usage example: the header is consulted first, then the URL path, then the
# query string.
version_strategy = CompositeVersionStrategy(
    [HeaderVersionStrategy(), URLPathVersionStrategy(), QueryParamVersionStrategy()]
)
5. OpenAPI文档增强
# patterns/openapi.py
from typing import Dict, Any, List, Optional
from pydantic import BaseModel, Field
from datetime import datetime
from enum import Enum
def _utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    # Local import: only ``datetime`` is imported at module level.
    from datetime import timezone

    return datetime.now(timezone.utc)


class ErrorSchema(BaseModel):
    """Standard error response body."""

    error: str = Field(..., description="错误类型")
    message: str = Field(..., description="错误信息")
    details: Optional[Dict[str, Any]] = Field(
        None,
        description="额外的错误详情"
    )
    # ``datetime.utcnow`` is deprecated and returns a naive value that
    # consumers cannot distinguish from local time; use an aware UTC
    # timestamp instead.
    timestamp: datetime = Field(
        default_factory=_utc_now,
        description="错误时间戳"
    )
class PaginationLinks(BaseModel):
    """Navigation links of a paginated response; every link is optional."""

    first: Optional[str] = Field(default=None, description="第一页链接")
    last: Optional[str] = Field(default=None, description="最后一页链接")
    next: Optional[str] = Field(default=None, description="下一页链接")
    prev: Optional[str] = Field(default=None, description="上一页链接")
class PaginationMeta(BaseModel):
    """Counters describing a paginated result set; all fields are required."""

    total: int = Field(default=..., description="总项目数")
    page: int = Field(default=..., description="当前页码")
    per_page: int = Field(default=..., description="每页项目数")
    pages: int = Field(default=..., description="总页数")
class PaginatedResponse(BaseModel):
    """Envelope for a page of items plus its metadata and links."""

    data: List[Any] = Field(default=..., description="响应数据")
    meta: PaginationMeta = Field(default=..., description="分页元数据")
    links: PaginationLinks = Field(default=..., description="分页链接")
# Enhanced OpenAPI configuration
# The four reusable error responses share one shape (a description plus the
# ErrorSchema JSON schema), so they are generated from (name, description)
# pairs; the schema is computed separately for each entry.
OPENAPI_CONFIG = {
    "title": "现代API",
    "description": "具有异步模式的现代REST API",
    "version": "1.0.0",
    "docs_url": "/docs",
    "redoc_url": "/redoc",
    "openapi_url": "/openapi.json",
    "servers": [
        {"url": "https://api.example.com/v1", "description": "生产服务器"},
        {"url": "https://staging-api.example.com/v1", "description": "暂存服务器"},
    ],
    "components": {
        "schemas": {
            "Error": ErrorSchema.model_json_schema(),
            "PaginatedResponse": PaginatedResponse.model_json_schema(),
        },
        "responses": {
            response_name: {
                "description": response_description,
                "content": {
                    "application/json": {
                        "schema": ErrorSchema.model_json_schema()
                    }
                },
            }
            for response_name, response_description in (
                ("ValidationError", "验证错误"),
                ("UnauthorizedError", "未授权错误"),
                ("NotFoundError", "资源未找到"),
                ("RateLimitError", "速率限制超出"),
            )
        },
    },
}
6. API测试模式
# patterns/testing.py
import pytest
import asyncio
from typing import AsyncGenerator
import httpx
from fastapi.testclient import TestClient
from unittest.mock import AsyncMock, patch
class AsyncAPITestCase:
    """Base class providing shared fixtures for async API tests.

    The ``async_client`` fixture reads ``self.app``, which this class does
    not define; subclasses are presumably expected to provide it as the ASGI
    application under test. TODO confirm.
    """

    @pytest.fixture(scope="class")
    async def async_client(self) -> AsyncGenerator[httpx.AsyncClient, None]:
        """Yield an HTTP client that calls the app in-process."""
        # Route requests through an explicit ASGI transport: the ``app=``
        # shortcut on ``AsyncClient`` was deprecated and later removed.
        transport = httpx.ASGITransport(app=self.app)
        async with httpx.AsyncClient(
            transport=transport,
            base_url="http://test"
        ) as client:
            yield client

    @pytest.fixture
    def mock_external_service(self):
        """Patch the external API client class and yield its mocked instance."""
        with patch("external_api_client.ExternalAPIClient") as mock:
            client = mock.return_value
            # NOTE(review): ``client.get`` is a plain MagicMock here, so this
            # value is returned synchronously; if the code under test awaits
            # ``get``, an AsyncMock is needed instead. Verify against the
            # real client.
            client.get.return_value = {"status": "ok"}
            yield client
# Example test cases
@pytest.mark.asyncio
class TestUserAPI(AsyncAPITestCase):
    """Endpoint tests for the user API."""

    async def test_create_user(self, async_client: httpx.AsyncClient):
        """Creating a user returns 201 and echoes the public fields only."""
        payload = {
            "email": "test@example.com",
            "username": "testuser",
            "password": "securepassword123"
        }

        response = await async_client.post("/api/users", json=payload)
        assert response.status_code == 201

        body = response.json()
        for field in ("email", "username"):
            assert body[field] == payload[field]
        # The password must never be returned to the client.
        assert "password" not in body

    async def test_get_user(self, async_client: httpx.AsyncClient):
        """A freshly created user can be fetched by id."""
        # Create a user first so there is something to fetch.
        created = await async_client.post(
            "/api/users",
            json={
                "email": "test2@example.com",
                "username": "testuser2",
                "password": "securepassword123"
            }
        )
        user_id = created.json()["id"]

        # Fetch the user back.
        fetched = await async_client.get(f"/api/users/{user_id}")
        assert fetched.status_code == 200
        assert fetched.json()["id"] == user_id

    async def test_list_users_with_pagination(
        self,
        async_client: httpx.AsyncClient
    ):
        """The user list is wrapped in the paginated envelope."""
        response = await async_client.get("/api/users?page=1&per_page=10")
        assert response.status_code == 200

        body = response.json()
        for section in ("data", "meta", "links"):
            assert section in body
        meta = body["meta"]
        assert meta["page"] == 1
        assert meta["per_page"] == 10

    async def test_rate_limiting(
        self,
        async_client: httpx.AsyncClient
    ):
        """Exceeding the request budget yields at least one 429."""
        # Fire 150 requests back to back (assumes a limit of 100/minute).
        responses = [
            await async_client.get("/api/users") for _ in range(150)
        ]

        # At least one of them must have been rejected.
        seen_statuses = {reply.status_code for reply in responses}
        assert 429 in seen_statuses
# Integration test
@pytest.mark.asyncio
async def test_full_user_flow():
    """Walk one user through create, read, update, delete and re-read."""
    # NOTE(review): ``app`` is not defined in this snippet; it is presumably
    # the ASGI application imported from the project under test.
    # The ``app=`` shortcut on ``AsyncClient`` was deprecated and later
    # removed, so the app is attached through an explicit ASGI transport.
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(
        transport=transport,
        base_url="http://test"
    ) as client:
        # Create the user.
        user_data = {
            "email": "flowtest@example.com",
            "username": "flowtest",
            "password": "securepassword123"
        }
        create_resp = await client.post("/api/users", json=user_data)
        assert create_resp.status_code == 201
        user = create_resp.json()
        user_id = user["id"]

        # Read it back.
        get_resp = await client.get(f"/api/users/{user_id}")
        assert get_resp.status_code == 200
        assert get_resp.json()["id"] == user_id

        # Update it.
        update_data = {"username": "updateduser"}
        update_resp = await client.patch(
            f"/api/users/{user_id}",
            json=update_data
        )
        assert update_resp.status_code == 200
        assert update_resp.json()["username"] == "updateduser"

        # Delete it.
        delete_resp = await client.delete(f"/api/users/{user_id}")
        assert delete_resp.status_code == 204

        # The user must be gone afterwards.
        get_resp = await client.get(f"/api/users/{user_id}")
        assert get_resp.status_code == 404
7. 性能优化模式
# patterns/performance.py
import asyncio
import asyncio.cache
import aioredis
from typing import Any, Optional, Callable
from functools import wraps
import hashlib
import json
import pickle
class ResponseCache:
    """Redis-backed cache that stores pickled Python values with a TTL.

    Args:
        redis_url: URL handed to ``aioredis.from_url`` on first use.
        default_ttl: Expiry in seconds applied when ``set`` gets no ``ttl``.
    """

    def __init__(self, redis_url: str, default_ttl: int = 300):
        self.redis = None
        self.redis_url = redis_url
        self.default_ttl = default_ttl

    async def initialize(self):
        """Create the Redis client from ``redis_url``."""
        self.redis = await aioredis.from_url(self.redis_url)

    async def _ensure_connected(self):
        """Connect on first use so every operation can rely on ``self.redis``."""
        if self.redis is None:
            await self.initialize()

    def _make_cache_key(self, prefix: str, *args, **kwargs) -> str:
        """Build ``"<prefix>:<sha256>"`` from the call arguments.

        Arguments are JSON-encoded with sorted keys; values JSON cannot
        encode are converted with ``str``, so objects whose ``str()`` varies
        between runs will not produce stable keys.
        """
        key_data = {
            "args": args,
            "kwargs": kwargs
        }
        key_str = json.dumps(key_data, sort_keys=True, default=str)
        key_hash = hashlib.sha256(key_str.encode()).hexdigest()
        return f"{prefix}:{key_hash}"

    async def get(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key``, or None on a miss."""
        await self._ensure_connected()

        cached = await self.redis.get(key)
        if not cached:
            return None
        # SECURITY NOTE: unpickling executes code embedded in the payload.
        # This is only safe while the Redis instance is trusted and not
        # writable by untrusted parties; consider JSON otherwise.
        return pickle.loads(cached)

    async def set(
        self,
        key: str,
        value: Any,
        ttl: Optional[int] = None
    ):
        """Store ``value`` under ``key``; a falsy ``ttl`` uses the default."""
        await self._ensure_connected()

        ttl = ttl or self.default_ttl
        serialized = pickle.dumps(value)
        await self.redis.setex(key, ttl, serialized)

    async def delete(self, key: str):
        """Remove ``key`` from the cache."""
        # Connect first: an instance that has not been used yet must still
        # be able to invalidate keys written through another instance,
        # instead of silently doing nothing.
        await self._ensure_connected()
        await self.redis.delete(key)
def cache_response(
    prefix: str,
    ttl: Optional[int] = None,
    cache: Optional[ResponseCache] = None
):
    """Decorator factory that caches an async function's return value.

    Args:
        prefix: Namespace prepended to every cache key.
        ttl: Expiry in seconds; ``None`` uses the cache's default.
        cache: Cache to use; when omitted, one ``ResponseCache`` pointing at
            ``redis://localhost`` is created for the decorated function.

    Returns:
        A decorator for coroutine functions.

    Note:
        A result of ``None`` is indistinguishable from a cache miss, so
        functions returning ``None`` are executed on every call.
    """
    # Create the fallback cache once. Building it inside the wrapper would
    # open a new Redis client on every single call. Constructing a
    # ResponseCache performs no I/O; it connects lazily on first use.
    shared_cache = (
        cache if cache is not None else ResponseCache("redis://localhost")
    )

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # The key is derived from the prefix and the call arguments.
            cache_key = shared_cache._make_cache_key(prefix, *args, **kwargs)

            # Serve from the cache when possible.
            cached_result = await shared_cache.get(cache_key)
            if cached_result is not None:
                return cached_result

            # Miss: run the function and remember its result.
            result = await func(*args, **kwargs)
            await shared_cache.set(cache_key, result, ttl)
            return result

        return wrapper

    return decorator
# Usage example
@cache_response(prefix="user_data", ttl=300)
async def get_user_data(user_id: int) -> dict[str, Any]:
    """Return user data, served from the cache when available."""
    # Stands in for an expensive database query or API call.
    await asyncio.sleep(1)  # simulate a slow operation
    return {"id": user_id, "name": "John Doe", "email": "john@example.com"}
# Batch processing for performance
class BatchProcessor:
    """Collect queued items and hand them to a handler in batches.

    A background worker is started on the first ``add``. It flushes a batch
    when ``batch_size`` items are collected or ``flush_interval`` seconds
    have passed, and stops once a whole interval passes with nothing queued;
    the next ``add`` starts it again.

    Args:
        batch_size: Maximum number of items per batch.
        flush_interval: Seconds to wait for a batch to fill before flushing.
    """

    def __init__(self, batch_size: int = 100, flush_interval: int = 5):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.queue = asyncio.Queue()
        self.processing = False
        # Strong reference to the worker: the event loop only keeps weak
        # references to tasks, so an unreferenced task may be collected
        # before it finishes.
        self._worker_task = None

    async def add(self, item: Any):
        """Queue ``item`` and make sure the background worker is running."""
        await self.queue.put(item)

        if not self.processing:
            self.processing = True
            self._worker_task = asyncio.create_task(self._process_batch())

    async def _process_batch(self):
        """Worker loop: gather batches until the queue stays empty."""
        loop = asyncio.get_running_loop()
        while True:
            batch = []

            try:
                # Gather items until the batch is full or the deadline hits.
                deadline = loop.time() + self.flush_interval
                while len(batch) < self.batch_size:
                    timeout = max(0, deadline - loop.time())
                    try:
                        item = await asyncio.wait_for(
                            self.queue.get(),
                            timeout=timeout
                        )
                    except asyncio.TimeoutError:
                        break
                    batch.append(item)

                if batch:
                    await self._process_batch_items(batch)

            except Exception as e:
                print(f"Error processing batch: {e}")
                continue

            # Idle for a full interval: stop instead of polling forever.
            # There is no await between the check and clearing the flag, so
            # an ``add`` cannot slip in between them.
            if not batch and self.queue.empty():
                self.processing = False
                return

    async def _process_batch_items(self, batch: list[Any]):
        """Handle one batch; override in subclasses.

        Typical overrides perform a bulk database insert or a batched API
        call. The default implementation only prints the batch size.
        """
        print(f"Processing batch of {len(batch)} items")
8. 生产配置清单
# api/production_checklist.yaml
# Production readiness checklist, grouped by concern.
# NOTE(review): the nesting below is reconstructed from the key order (the
# original indentation was lost); in particular, confirm whether
# `rate_limiting` belongs at the top level or under `performance`.
performance:
  async_patterns:
    connection_pooling: true
    max_connections: 100
    connection_timeout: 30
    keep_alive: true
  caching:
    redis_cache: true
    default_ttl: 300
    cache_headers: true
  compression:
    gzip: true
    level: 6
    threshold: 1024

rate_limiting:
  enabled: true
  default_limit: 100/minute
  burst_limit: 200
  sliding_window: true

security:
  authentication:
    jwt_validation: true
    token_refresh: true
    revoke_tokens: true
  authorization:
    rbac: true
    rate_limit_by_role: true
  validation:
    input_validation: true
    sql_injection_protection: true
    xss_protection: true
  headers:
    security_headers: true
    cors: true
    csrf_protection: true

monitoring:
  metrics:
    prometheus: true
    request_duration: true
    error_rate: true
    throughput: true
  logging:
    structured_logging: true
    correlation_ids: true
    error_tracking: true
  health_checks:
    liveness_probe: true
    readiness_probe: true
    dependency_checks: true

documentation:
  openapi:
    auto_generation: true
    examples: true
    schemas: true
  versioning:
    strategy: "header_path_query"
    deprecation_warnings: true
    backward_compatibility: true

testing:
  unit_tests: true
  integration_tests: true
  performance_tests: true
  contract_tests: true