name: sqlalchemy-2-0
description: 支持异步的现代类型安全ORM与高效查询
when_to_use: 构建支持异步的数据库后端、API、数据服务
SQLAlchemy 2.0+ 技能
快速开始
基础设置
import asyncio

from sqlalchemy import String, select, update
from sqlalchemy.ext.asyncio import AsyncAttrs, async_sessionmaker, create_async_engine, AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(AsyncAttrs, DeclarativeBase):
    """Declarative base class that the ORM models in this guide inherit from.

    Combines ``AsyncAttrs`` with ``DeclarativeBase``; it adds no attributes
    of its own.
    """
# Async engine; the URL scheme selects PostgreSQL through the asyncpg driver.
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
)

# Session factory bound to the engine, created with expire_on_commit=False.
async_session = async_sessionmaker(
    bind=engine,
    expire_on_commit=False,
)
class User(Base):
    """Example model mapped to the ``users`` table."""

    __tablename__ = "users"

    # Integer primary key.
    id: Mapped[int] = mapped_column(primary_key=True)
    # Display name, String(50).
    name: Mapped[str] = mapped_column(String(50))
    # E-mail address, String(100).
    email: Mapped[str] = mapped_column(String(100))
基础CRUD操作
async def create_user(name: str, email: str) -> User:
    """Insert a new ``User`` row inside a transaction and return the instance.

    Args:
        name: Value for the ``name`` column.
        email: Value for the ``email`` column.

    Returns:
        The ``User`` object that was added to the session.
    """
    new_user = User(name=name, email=email)
    async with async_session() as db, db.begin():
        db.add(new_user)
        # Flush before the transaction ends so the generated ID is available.
        await db.flush()
    return new_user
async def get_user(user_id: int) -> User | None:
    """Fetch the ``User`` whose ``id`` equals *user_id*.

    Returns:
        The matching ``User``, or ``None`` when no row matches.
    """
    query = select(User).where(User.id == user_id)
    async with async_session() as db:
        rows = await db.execute(query)
        return rows.scalar_one_or_none()
async def update_user_email(user_id: int, new_email: str) -> bool:
    """Set the ``email`` of the user identified by *user_id* and commit.

    Returns:
        ``True`` when the UPDATE reported at least one affected row.
    """
    stmt = update(User).where(User.id == user_id).values(email=new_email)
    async with async_session() as db:
        outcome = await db.execute(stmt)
        await db.commit()
        was_changed = outcome.rowcount > 0
    return was_changed
常用模式
模型
带注解的类型安全模型(推荐)
from typing_extensions import Annotated
from typing import List, Optional
# Reusable annotated column types, meant to be used as Mapped[<alias>].
intpk = Annotated[
    int,
    mapped_column(primary_key=True),
]
str50 = Annotated[
    str,
    mapped_column(String(50)),
]
# insert_default is func.now(), so the value is supplied at INSERT time.
created_at = Annotated[
    datetime,
    mapped_column(insert_default=func.now()),
]
class Post(Base):
    """Post model built from the reusable ``Annotated`` column aliases."""

    __tablename__ = "posts"

    id: Mapped[intpk]
    title: Mapped[str50]
    content: Mapped[str] = mapped_column(Text)
    # Foreign key to users.id.
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    created: Mapped[created_at]

    # Relationships
    # Paired with the "posts" attribute on User via back_populates.
    author: Mapped["User"] = relationship(back_populates="posts")
    # Linked to Tag through the "post_tags" secondary table.
    tags: Mapped[List["Tag"]] = relationship(secondary="post_tags")
经典风格模型
class Post(Base):
    """Post model in the classic style: no ``Mapped[...]`` annotations."""

    __tablename__ = "posts"

    # Column types are passed explicitly because there are no annotations.
    id = mapped_column(Integer, primary_key=True)
    title = mapped_column(String(50))
    content = mapped_column(Text)
    author_id = mapped_column(ForeignKey("users.id"))

    # Target class is given by name; paired with User.posts.
    author = relationship("User", back_populates="posts")
关系
一对多
class User(Base):
    """The "one" side of the one-to-many User -> Post relationship."""

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)

    # Collection of posts, paired with Post.author; configured with the
    # "all, delete-orphan" cascade.
    posts: Mapped[List["Post"]] = relationship(
        back_populates="author",
        cascade="all, delete-orphan",
    )
class Post(Base):
    """The "many" side of the one-to-many User -> Post relationship."""

    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    # Foreign key pointing at the owning user.
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))

    # Paired with User.posts.
    author: Mapped["User"] = relationship(back_populates="posts")
多对多
# Association table for the Post <-> Tag many-to-many link; the two foreign
# keys together form the composite primary key.
association_table = Table(
    "post_tags",
    Base.metadata,
    Column(
        "post_id",
        ForeignKey("posts.id"),
        primary_key=True,
    ),
    Column(
        "tag_id",
        ForeignKey("tags.id"),
        primary_key=True,
    ),
)
class Post(Base):
    """Post side of the many-to-many Post <-> Tag relationship."""

    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)

    # Tags reached through association_table; paired with Tag.posts.
    tags: Mapped[List["Tag"]] = relationship(
        secondary=association_table,
        back_populates="posts",
    )
class Tag(Base):
    """Tag side of the many-to-many Post <-> Tag relationship."""

    __tablename__ = "tags"

    id: Mapped[int] = mapped_column(primary_key=True)
    # Tag names carry a unique constraint.
    name: Mapped[str] = mapped_column(String(50), unique=True)

    # Posts reached through association_table; paired with Post.tags.
    posts: Mapped[List["Post"]] = relationship(
        secondary=association_table,
        back_populates="tags",
    )
查询
基础选择
from sqlalchemy import select, and_, or_
async def get_all_users():
    """Return every ``User`` row."""
    query = select(User)
    async with async_session() as db:
        rows = await db.execute(query)
        return rows.scalars().all()
async def get_users_by_name(name: str):
    """Return users whose name contains *name*, matched with ``ilike``."""
    pattern = f"%{name}%"
    async with async_session() as db:
        rows = await db.execute(select(User).where(User.name.ilike(pattern)))
        return rows.scalars().all()
async def search_users(name: str | None = None, email: str | None = None):
    """Search users by optional, AND-combined substring filters.

    Args:
        name: Substring to look for in ``User.name``; ignored when falsy.
        email: Substring to look for in ``User.email``; ignored when falsy.

    Returns:
        The matching ``User`` objects; all users when no filter is given.
    """
    # NOTE(review): the values are interpolated into a LIKE pattern, so "%"
    # and "_" supplied by the caller presumably act as wildcards - confirm
    # whether that is intended.
    filters = []
    if name:
        filters.append(User.name.ilike(f"%{name}%"))
    if email:
        filters.append(User.email.ilike(f"%{email}%"))

    # where() joins its positional criteria with AND and adds no WHERE clause
    # when called without any, so no separate "no filters" branch is needed.
    stmt = select(User).where(*filters)
    async with async_session() as session:
        result = await session.execute(stmt)
        return result.scalars().all()
关系加载
from sqlalchemy.orm import selectinload, joinedload
async def get_posts_with_author():
    """Return all posts, loading ``Post.author`` via the ``selectinload`` option."""
    query = select(Post).options(selectinload(Post.author))
    async with async_session() as db:
        rows = await db.execute(query)
        return rows.scalars().all()
async def get_post_with_tags(post_id: int):
    """Fetch one post by id with its author and tags loaded.

    ``Post.author`` uses the ``joinedload`` option and ``Post.tags`` uses
    ``selectinload``.

    Returns:
        The matching ``Post``, or ``None`` when no row matches.
    """
    loader_options = (
        joinedload(Post.author),
        selectinload(Post.tags),
    )
    query = select(Post).options(*loader_options).where(Post.id == post_id)
    async with async_session() as db:
        rows = await db.execute(query)
        return rows.scalar_one_or_none()
分页
async def get_posts_paginated(page: int, size: int):
    """Return one page of posts, newest first.

    Args:
        page: 1-based page number.
        size: Maximum number of posts per page.

    Returns:
        Up to *size* ``Post`` objects for the requested page.

    Raises:
        ValueError: If *page* is below 1 or *size* is negative; either would
            otherwise turn into a negative OFFSET/LIMIT in the query.
    """
    if page < 1:
        raise ValueError("page must be >= 1")
    if size < 0:
        raise ValueError("size must be >= 0")

    stmt = (
        select(Post)
        # Post.id breaks ties between equal timestamps; without a total
        # order, rows can repeat or go missing across pages.
        .order_by(Post.created.desc(), Post.id.desc())
        .offset((page - 1) * size)
        .limit(size)
    )
    async with async_session() as session:
        result = await session.execute(stmt)
        return result.scalars().all()
聚合
from sqlalchemy import func
async def get_user_post_count():
    """Return ``(name, post_count)`` rows, highest post count first.

    The query joins ``User`` to ``Post`` and groups by user id and name.
    """
    post_total = func.count(Post.id)
    query = select(User.name, post_total.label("post_count"))
    query = query.join(Post)
    query = query.group_by(User.id, User.name)
    query = query.order_by(post_total.desc())
    async with async_session() as db:
        rows = await db.execute(query)
        return rows.all()
会话管理
上下文管理器模式
async def create_post(title: str, content: str, author_id: int):
    """Create a ``Post`` inside a ``session.begin()`` transaction block.

    Returns:
        The ``Post`` object that was added to the session.
    """
    new_post = Post(title=title, content=content, author_id=author_id)
    async with async_session() as db, db.begin():
        db.add(new_post)
    return new_post
依赖注入(FastAPI)
from fastapi import Depends
async def get_db_session():
    """Yield one session per call, for use as a FastAPI dependency.

    Yields:
        A session produced by the ``async_session`` factory.
    """
    # The async context manager closes the session on exit, including when
    # the consumer raises, so no extra try/finally with close() is needed.
    async with async_session() as session:
        yield session
async def create_user_endpoint(
    user_data: UserCreate,
    session: AsyncSession = Depends(get_db_session),
):
    """Persist a user built from the request payload and return it.

    The session is injected through ``Depends(get_db_session)``.
    """
    # NOTE(review): .dict() looks like the Pydantic v1-style API - confirm
    # which Pydantic version UserCreate is built on.
    payload = user_data.dict()
    new_user = User(**payload)
    session.add(new_user)
    await session.commit()
    # Reload the instance from the database after the commit.
    await session.refresh(new_user)
    return new_user
作用域会话
from sqlalchemy.ext.asyncio import async_scoped_session
import asyncio
# Scoped session registry; asyncio.current_task is the scope function, so the
# current task is the lookup key.
async_session_scope = async_scoped_session(
    async_sessionmaker(
        engine,
        expire_on_commit=False,
    ),
    scopefunc=asyncio.current_task,
)
async def some_function():
    """Use the task-scoped session, then remove it from the registry."""
    session = async_session_scope()
    try:
        # Use the session as usual.
        await session.commit()
    finally:
        # The registry is keyed by asyncio.current_task; unless remove() is
        # awaited, the finished task and its session stay referenced and
        # leak memory.
        await async_session_scope.remove()
高级模式
只写关系(内存高效)
from sqlalchemy.orm import WriteOnlyMapped
class User(Base):
    """User model whose ``posts`` collection is declared write-only."""

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)

    # Annotated as WriteOnlyMapped instead of Mapped[List[...]].
    posts: WriteOnlyMapped["Post"] = relationship()
async def get_user_posts(user_id: int):
    """Return the posts authored by *user_id*.

    Returns:
        The user's ``Post`` objects, or an empty list when the user is not
        found.
    """
    async with async_session() as db:
        owner = await db.get(User, user_id)
        if not owner:
            return []
        # Select the collection explicitly.
        rows = await db.execute(select(Post).where(Post.author_id == user_id))
        return rows.scalars().all()
自定义会话类
class AsyncSessionWithDefaults(AsyncSession):
    """``AsyncSession`` subclass with an extra execute entry point."""

    async def execute_with_defaults(self, statement, **kwargs):
        """Run *statement* through ``execute()``, forwarding *kwargs*.

        This is the hook where default options would be added; the visible
        code currently forwards everything unchanged.
        """
        outcome = await self.execute(statement, **kwargs)
        return outcome
# Session factory that produces AsyncSessionWithDefaults instances.
async_session = async_sessionmaker(
    bind=engine,
    class_=AsyncSessionWithDefaults,
    expire_on_commit=False,
)
连接路由
class RoutingSession(Session):
    """Session that picks an engine per operation."""

    def get_bind(self, mapper=None, clause=None, **kw):
        """Return ``read_engine`` for ``ReadOnlyModel`` mappers, else ``write_engine``.

        NOTE(review): ``read_engine``, ``write_engine`` and ``ReadOnlyModel``
        are defined outside this snippet; presumably the engines must be
        synchronous ``Engine`` objects here - confirm.
        """
        is_read_only = mapper and issubclass(mapper.class_, ReadOnlyModel)
        return read_engine if is_read_only else write_engine
class AsyncRoutingSession(AsyncSession):
    """``AsyncSession`` whose underlying sync session class is ``RoutingSession``."""

    sync_session_class = RoutingSession
原生SQL
from sqlalchemy import text
async def run_raw_sql():
    """Run a textual ``COUNT(*)`` query on ``users`` and return its scalar value."""
    query = text("SELECT COUNT(*) FROM users")
    async with async_session() as db:
        rows = await db.execute(query)
        return rows.scalar()
async def run_parameterized_query(user_id: int):
    """Return all ``posts`` rows for *user_id* using a bound parameter.

    The id is passed as the ``:user_id`` bind value rather than being
    formatted into the SQL string.
    """
    params = {"user_id": user_id}
    async with async_session() as db:
        rows = await db.execute(
            text("SELECT * FROM posts WHERE author_id = :user_id"),
            params,
        )
        return rows.fetchall()
性能提示
- 对集合使用selectinload:比延迟加载更高效
- 批量操作:使用 `add_all()` 进行批量插入
- 连接池:根据负载配置连接池大小
- 索引列:为频繁查询的列添加索引
- 使用流式处理:对于大型结果集,使用 `stream()`
async def process_all_users():
    """Stream every ``User`` and pass each one to ``process_user``."""
    async with async_session() as db:
        user_stream = await db.stream(select(User))
        # Iterate the stream so users are handled one at a time instead of
        # loading them all into memory first.
        async for row in user_stream.scalars():
            await process_user(row)
依赖要求
# Core SQLAlchemy with the asyncio extra. The quotes keep shells such as zsh
# from treating the square brackets as a glob pattern.
uv add "sqlalchemy[asyncio]"
uv add asyncpg # PostgreSQL async driver
# or
uv add aiosqlite # SQLite async driver
# or
uv add aiomysql # MySQL async driver
数据库URL
- PostgreSQL: `postgresql+asyncpg://user:pass@localhost/db`
- SQLite: `sqlite+aiosqlite:///database.db`
- MySQL: `mysql+aiomysql://user:pass@localhost/db`
迁移集成
使用Alembic进行数据库迁移:
# Generate a migration
uv run alembic revision --autogenerate -m "添加用户表"
# Apply migrations
uv run alembic upgrade head