name: python-database-patterns
description: "SQLAlchemy and database patterns for Python. Trigger keywords: sqlalchemy, database, orm, migration, alembic, async database, connection pool, repository pattern, unit of work."
compatibility: "SQLAlchemy 2.0+, Python 3.10+. Async features require asyncpg (PostgreSQL) or aiosqlite."
allowed-tools: "Read Write Bash"
depends-on: [python-typing-patterns, python-async-patterns]
related-skills: [python-fastapi-patterns]
Python Database Patterns
SQLAlchemy 2.0 and database best practices.
SQLAlchemy 2.0 Basics
from sqlalchemy import String, create_engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    email: Mapped[str] = mapped_column(String(255), unique=True)
    is_active: Mapped[bool] = mapped_column(default=True)

# Create the engine and the tables
engine = create_engine("postgresql://user:pass@localhost/db")
Base.metadata.create_all(engine)

# Query in 2.0 style
with Session(engine) as session:
    stmt = select(User).where(User.is_active.is_(True))
    users = session.execute(stmt).scalars().all()
Async SQLAlchemy
from sqlalchemy import select
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

# Async engine
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    echo=False,
    pool_size=5,
    max_overflow=10,
)

# Session factory; expire_on_commit=False keeps loaded attributes usable after commit
async_session = async_sessionmaker(engine, expire_on_commit=False)

# Usage example (async with must run inside a coroutine)
async def load_user(user_id: int) -> User | None:
    async with async_session() as session:
        result = await session.execute(select(User).where(User.id == user_id))
        return result.scalar_one_or_none()
Model Relationships
from sqlalchemy import ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]
    # One-to-many
    posts: Mapped[list["Post"]] = relationship(back_populates="author")

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str]
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    # Many-to-one
    author: Mapped["User"] = relationship(back_populates="posts")
Common Query Patterns
from sqlalchemy import and_, func, or_, select
from sqlalchemy.orm import selectinload

# Note: these examples assume User also defines age, role, and created_at
# columns, which the models above do not declare.

# Basic select
stmt = select(User).where(User.is_active.is_(True))

# Multiple conditions (AND)
stmt = select(User).where(
    and_(
        User.is_active.is_(True),
        User.age >= 18,
    )
)

# OR conditions
stmt = select(User).where(
    or_(User.role == "admin", User.role == "moderator")
)

# Ordering and limiting
stmt = select(User).order_by(User.created_at.desc()).limit(10)

# Aggregate functions
stmt = select(func.count(User.id)).where(User.is_active.is_(True))

# Joins
stmt = select(User, Post).join(Post, User.id == Post.author_id)

# Eager loading
stmt = select(User).options(selectinload(User.posts))
FastAPI Integration
from collections.abc import AsyncGenerator
from typing import Annotated

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

app = FastAPI()

# async_session is the async_sessionmaker from the async section
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        yield session

DB = Annotated[AsyncSession, Depends(get_db)]

@app.get("/users/{user_id}")
async def get_user(user_id: int, db: DB):
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if user is None:
        raise HTTPException(status_code=404)
    return user
Quick Reference
| Operation | SQLAlchemy 2.0 style |
|---|---|
| Select all | select(User) |
| Filter | .where(User.id == 1) |
| Get one row or None | .scalar_one_or_none() |
| Get all | .scalars().all() |
| Count | select(func.count(User.id)) |
| Join | .join(Post) |
| Eager load | .options(selectinload(User.posts)) |
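Additional Resources
- ./references/sqlalchemy-async.md - Async patterns, session management
- ./references/connection-pooling.md - Connection pool configuration, health checks
- ./references/transactions.md - Transaction patterns, isolation levels
- ./references/migrations.md - Alembic setup, migration strategies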
Asset Files
- ./assets/alembic.ini.template - Alembic configuration template
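See Also
Prerequisites:
- python-typing-patterns - Mapped types and annotations
- python-async-patterns - Async database sessions
Related skills:
- python-fastapi-patterns - Dependency injection for database sessions
- python-pytest-patterns - Database fixtures and testing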