Python FastAPI 开发
专家模式用于构建 Python API,使用 FastAPI、uv 包管理器、模块化架构和 SQLAlchemy 数据库集成。
技术栈
- 运行时:Python 3.12+
- 包管理器:uv(快速,基于 Rust)
- 框架:FastAPI
- ORM:SQLAlchemy 2.0(异步)
- 验证:Pydantic v2
- 数据库:PostgreSQL(或 SQLite 用于开发)
- 迁移:Alembic
- 测试:pytest, pytest-asyncio
- 代码检查:ruff
项目结构
基于特性的模块化架构 - 代码按领域组织,而不是按层:
我的项目/
├── pyproject.toml # 项目配置,使用 uv
├── uv.lock # 锁定文件
├── .python-version # Python 版本
├── .env # 环境变量
├── .env.example
├── alembic.ini # Alembic 配置
├── alembic/ # 迁移
│ ├── env.py
│ ├── script.py.mako
│ └── versions/
├── src/
│ └── app/
│ ├── __init__.py
│ ├── main.py # FastAPI 应用入口
│ ├── config.py # 设置
│ ├── database.py # 数据库会话
│ ├── core/
│ │ ├── __init__.py
│ │ ├── dependencies.py # 共享依赖项
│ │ ├── exceptions.py # 自定义异常
│ │ ├── middleware.py # 中间件
│ │ └── security.py # 认证工具
│ ├── models/
│ │ ├── __init__.py
│ │ └── base.py # SQLAlchemy 基础和混合
│ ├── features/
│ │ ├── __init__.py
│ │ ├── auth/
│ │ │ ├── __init__.py
│ │ │ ├── api.py # 认证端点
│ │ │ ├── schemas.py # 认证 Pydantic 模式
│ │ │ ├── services.py # 认证业务逻辑
│ │ │ ├── models.py # 认证 SQLAlchemy 模型
│ │ │ └── utils.py # 认证助手(JWT 等)
│ │ ├── users/
│ │ │ ├── __init__.py
│ │ │ ├── api.py # 用户端点
│ │ │ ├── schemas.py # 用户 Pydantic 模式
│ │ │ ├── services.py # 用户业务逻辑
│ │ │ ├── models.py # 用户 SQLAlchemy 模型
│ │ │ └── repository.py # 用户数据访问
│ │ └── items/
│ │ ├── __init__.py
│ │ ├── api.py
│ │ ├── schemas.py
│ │ ├── services.py
│ │ └── models.py
│ └── api/
│ ├── __init__.py
│ └── router.py # 聚合所有特性路由器
└── tests/
├── __init__.py
├── conftest.py
├── features/
│ ├── auth/
│ │ └── test_auth.py
│ └── users/
│ └── test_users.py
└── integration/
快速设置使用 uv
# 安装 uv
curl -LsSf https://astral.sh/uv/install.sh | sh
# 创建新项目
uv init 我的项目
cd 我的项目
# 设置 Python 版本
uv python pin 3.12
# 添加依赖项
uv add fastapi uvicorn[standard] sqlalchemy[asyncio] asyncpg
uv add pydantic pydantic-settings python-dotenv
uv add alembic
# 添加开发依赖项
uv add --dev pytest pytest-asyncio pytest-cov httpx ruff mypy
# 创建源结构
mkdir -p src/app/{api/v1/endpoints,core,models,schemas,services,repositories}
touch src/app/__init__.py
核心模式
pyproject.toml
[project]
name = "我的项目"
version = "0.1.0"
description = "FastAPI 应用程序"
requires-python = ">=3.12"
dependencies = [
"fastapi>=0.115.0",
"uvicorn[standard]>=0.32.0",
"sqlalchemy[asyncio]>=2.0.0",
"asyncpg>=0.30.0",
"pydantic>=2.10.0",
"pydantic-settings>=2.6.0",
"python-dotenv>=1.0.0",
"alembic>=1.14.0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0.0",
"pytest-asyncio>=0.24.0",
"pytest-cov>=6.0.0",
"httpx>=0.28.0",
"ruff>=0.8.0",
"mypy>=1.13.0",
]
[tool.ruff]
target-version = "py312"
line-length = 88
[tool.ruff.lint]
select = ["E", "F", "I", "N", "W", "UP", "B", "C4", "SIM"]
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
[tool.mypy]
python_version = "3.12"
strict = true
配置(src/app/config.py)
from functools import lru_cache
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=False,
)
# App
app_name: str = "我的 API"
debug: bool = False
api_v1_prefix: str = "/api/v1"
# 数据库
database_url: str = "postgresql+asyncpg://user:pass@localhost:5432/db"
# 安全
secret_key: str = "change-me-in-production"
access_token_expire_minutes: int = 30
@lru_cache
def get_settings() -> Settings:
return Settings()
settings = get_settings()
数据库设置(src/app/database.py)
from collections.abc import AsyncGenerator
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from app.config import settings
engine = create_async_engine(
settings.database_url,
echo=settings.debug,
pool_pre_ping=True,
)
async_session_maker = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with async_session_maker() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
SQLAlchemy 基础模型(src/app/models/base.py)
from datetime import datetime
from sqlalchemy import DateTime, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class TimestampMixin:
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now(),
)
特性模块模式
每个特性都是自包含的,有自己的 api、模式、服务、模型和工具。
特性:users/schemas.py
from pydantic import BaseModel, EmailStr, ConfigDict
class UserBase(BaseModel):
email: EmailStr
full_name: str | None = None
class UserCreate(UserBase):
password: str
class UserUpdate(BaseModel):
email: EmailStr | None = None
full_name: str | None = None
password: str | None = None
class UserResponse(UserBase):
model_config = ConfigDict(from_attributes=True)
id: int
is_active: bool
特性:users/models.py
from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column
from app.models.base import Base, TimestampMixin
class User(Base, TimestampMixin):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
hashed_password: Mapped[str] = mapped_column(String(255))
full_name: Mapped[str | None] = mapped_column(String(255))
is_active: Mapped[bool] = mapped_column(default=True)
特性:users/repository.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.features.users.models import User
class UserRepository:
def __init__(self, db: AsyncSession):
self.db = db
async def get(self, id: int) -> User | None:
result = await self.db.execute(select(User).where(User.id == id))
return result.scalar_one_or_none()
async def get_by_email(self, email: str) -> User | None:
result = await self.db.execute(select(User).where(User.email == email))
return result.scalar_one_or_none()
async def get_all(self, skip: int = 0, limit: int = 100) -> list[User]:
result = await self.db.execute(select(User).offset(skip).limit(limit))
return list(result.scalars().all())
async def create(self, data: dict) -> User:
user = User(**data)
self.db.add(user)
await self.db.flush()
await self.db.refresh(user)
return user
async def update(self, user: User, data: dict):
for field, value in data.items():
if value is not None:
setattr(user, field, value)
await self.db.flush()
await self.db.refresh(user)
return user
特性:users/services.py
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.security import hash_password
from app.features.users.models import User
from app.features.users.repository import UserRepository
from app.features.users.schemas import UserCreate, UserUpdate
class UserService:
def __init__(self, db: AsyncSession):
self.db = db
self.repo = UserRepository(db)
async def get(self, user_id: int) -> User | None:
return await self.repo.get(user_id)
async def get_by_email(self, email: str) -> User | None:
return await self.repo.get_by_email(email)
async def list(self, skip: int = 0, limit: int = 100) -> list[User]:
return await self.repo.get_all(skip=skip, limit=limit)
async def create(self, user_in: UserCreate) -> User:
data = user_in.model_dump()
data["hashed_password"] = hash_password(data.pop("password"))
return await self.repo.create(data)
async def update(self, user: User, user_in: UserUpdate) -> User:
data = user_in.model_dump(exclude_unset=True)
if "password" in data:
data["hashed_password"] = hash_password(data.pop("password"))
return await self.repo.update(user, data)
特性:users/api.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.features.users.schemas import UserCreate, UserResponse, UserUpdate
from app.features.users.services import UserService
router = APIRouter(prefix="/users", tags=["users"])
def get_service(db: AsyncSession = Depends(get_db)) -> UserService:
return UserService(db)
@router.get("", response_model=list[UserResponse])
async def list_users(
skip: int = 0,
limit: int = 100,
service: UserService = Depends(get_service),
):
return await service.list(skip=skip, limit=limit)
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
user_id: int,
service: UserService = Depends(get_service),
):
user = await service.get(user_id)
if not user:
raise HTTPException(status_code=404, detail="用户未找到")
return user
@router.post("", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
user_in: UserCreate,
service: UserService = Depends(get_service),
):
if await service.get_by_email(user_in.email):
raise HTTPException(status_code=400, detail="电子邮件已注册")
return await service.create(user_in)
@router.patch("/{user_id}", response_model=UserResponse)
async def update_user(
user_id: int,
user_in: UserUpdate,
service: UserService = Depends(get_service),
):
user = await service.get(user_id)
if not user:
raise HTTPException(status_code=404, detail="用户未找到")
return await service.update(user, user_in)
特性:users/init.py(导出)
from app.features.users.api import router
from app.features.users.models import User
from app.features.users.schemas import UserCreate, UserResponse, UserUpdate
from app.features.users.services import UserService
__all__ = ["router", "User", "UserCreate", "UserResponse", "UserUpdate", "UserService"]
主路由器(src/app/api/router.py)
from fastapi import APIRouter
from app.features.auth import router as auth_router
from app.features.users import router as users_router
from app.features.items import router as items_router
api_router = APIRouter()
api_router.include_router(auth_router)
api_router.include_router(users_router)
api_router.include_router(items_router)
FastAPI 应用(src/app/main.py)
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.router import api_router
from app.config import settings
from app.database import engine
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
# 启动
yield
# 关闭
await engine.dispose()
app = FastAPI(
title=settings.app_name,
openapi_url=f"{settings.api_v1_prefix}/openapi.json",
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 生产配置
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(api_router, prefix=settings.api_v1_prefix)
@app.get("/health")
async def health_check():
return {"status": "healthy"}
数据库迁移与 Alembic
# 初始化 Alembic
uv run alembic init alembic
# 更新 alembic/env.py 为异步
# 然后创建迁移
uv run alembic revision --autogenerate -m "初始迁移"
# 应用迁移
uv run alembic upgrade head
异步 Alembic env.py
import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
from app.config import settings
from app.models.base import Base
from app.models import user, item # 导入所有模型
config = context.config
config.set_main_option("sqlalchemy.url", settings.database_url)
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def run_migrations_offline() -> None:
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection) -> None:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online() -> None:
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
测试
conftest.py
import pytest
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from app.main import app
from app.database import get_db
from app.models.base import Base
@pytest.fixture
async def db_session():
engine = create_async_engine(
"sqlite+aiosqlite:///:memory:",
echo=True,
)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
session_maker = async_sessionmaker(engine, expire_on_commit=False)
async with session_maker() as session:
yield session
await engine.dispose()
@pytest.fixture
async def client(db_session: AsyncSession):
async def override_get_db():
yield db_session
app.dependency_overrides[get_db] = override_get_db
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test"
) as ac:
yield ac
app.dependency_overrides.clear()
示例测试
import pytest
from httpx import AsyncClient
@pytest.mark.asyncio
async def test_create_user(client: AsyncClient):
response = await client.post(
"/api/v1/users",
json={
"email": "test@example.com",
"password": "testpassword123",
"full_name": "测试用户",
},
)
assert response.status_code == 201
data = response.json()
assert data["email"] == "test@example.com"
assert "id" in data
运行应用程序
# 开发
uv run uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
# 生产
uv run uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4
# 运行测试
uv run pytest -v
# 运行覆盖率
uv run pytest --cov=app --cov-report=html
# 代码检查和格式化
uv run ruff check .
uv run ruff format .
# 类型检查
uv run mypy src/
最佳实践
-
分层架构
- 路由:处理 HTTP、验证、响应格式化
- 服务:业务逻辑、编排
- 仓库:数据访问、查询
-
依赖注入
- 使用 FastAPI 的
Depends()进行清晰的 DI - 注入数据库会话、服务、配置
- 使用 FastAPI 的
-
类型安全
- 使用 Pydantic 用于所有请求/响应模式
- 使用 SQLAlchemy 2.0 映射列与类型
- 启用严格的 mypy
-
异步优先
- 整个过程中使用 async/await
- 使用 asyncpg 用于 PostgreSQL
- 使用 aiosqlite 用于测试
-
配置
- 使用 pydantic-settings 进行类型安全的配置
- 从环境变量加载
- 永远不要提交秘密
-
测试
- 使用内存 SQLite 进行单元测试
- 使用测试容器进行集成测试
- 模拟外部服务