SQLAlchemy2.0+异步ORM技能 sqlalchemy-2-0

SQLAlchemy 2.0+ 异步ORM技能详解,涵盖类型安全模型定义、高效异步查询、关系映射、会话管理及高级模式。适用于构建高性能、可扩展的异步数据库后端、API服务与数据应用。关键词:Python异步ORM,SQLAlchemy 2.0,类型安全模型,异步数据库,高效查询,关系映射,会话管理,FastAPI集成,性能优化。

后端开发 0 次安装 0 次浏览 更新于 3/2/2026

name: sqlalchemy-2-0 description: 支持异步的现代类型安全ORM与高效查询 when_to_use: 构建支持异步的数据库后端、API、数据服务

SQLAlchemy 2.0+ 技能

快速开始

基础设置

from sqlalchemy.ext.asyncio import AsyncAttrs, async_sessionmaker, create_async_engine, AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
import asyncio

# 模型基类
class Base(AsyncAttrs, DeclarativeBase):
    pass

# 异步引擎
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")

# 会话工厂
async_session = async_sessionmaker(engine, expire_on_commit=False)

# 示例模型
class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50))
    email: Mapped[str] = mapped_column(String(100))

基础CRUD操作

async def create_user(name: str, email: str) -> User:
    async with async_session() as session:
        async with session.begin():
            user = User(name=name, email=email)
            session.add(user)
            await session.flush()  # 获取ID
            return user

async def get_user(user_id: int) -> User | None:
    async with async_session() as session:
        result = await session.execute(select(User).where(User.id == user_id))
        return result.scalar_one_or_none()

async def update_user_email(user_id: int, new_email: str) -> bool:
    async with async_session() as session:
        result = await session.execute(
            update(User).where(User.id == user_id).values(email=new_email)
        )
        await session.commit()
        return result.rowcount > 0

常用模式

模型

带注解的类型安全模型(推荐)

from typing_extensions import Annotated
from typing import List, Optional

# 可复用的列类型
intpk = Annotated[int, mapped_column(primary_key=True)]
str50 = Annotated[str, mapped_column(String(50))]
created_at = Annotated[datetime, mapped_column(insert_default=func.now())]

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[intpk]
    title: Mapped[str50]
    content: Mapped[str] = mapped_column(Text)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    created: Mapped[created_at]

    # 关系
    author: Mapped["User"] = relationship(back_populates="posts")
    tags: Mapped[List["Tag"]] = relationship(secondary="post_tags")

经典风格模型

class Post(Base):
    __tablename__ = "posts"

    id = mapped_column(Integer, primary_key=True)
    title = mapped_column(String(50))
    content = mapped_column(Text)
    author_id = mapped_column(ForeignKey("users.id"))

    author = relationship("User", back_populates="posts")

关系

一对多

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    posts: Mapped[List["Post"]] = relationship(
        back_populates="author",
        cascade="all, delete-orphan"
    )

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    author: Mapped["User"] = relationship(back_populates="posts")

多对多

association_table = Table(
    "post_tags",
    Base.metadata,
    Column("post_id", ForeignKey("posts.id"), primary_key=True),
    Column("tag_id", ForeignKey("tags.id"), primary_key=True)
)

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    tags: Mapped[List["Tag"]] = relationship(
        secondary=association_table,
        back_populates="posts"
    )

class Tag(Base):
    __tablename__ = "tags"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True)
    posts: Mapped[List["Post"]] = relationship(
        secondary=association_table,
        back_populates="tags"
    )

查询

基础选择

from sqlalchemy import select, and_, or_

# 获取所有用户
async def get_all_users():
    async with async_session() as session:
        result = await session.execute(select(User))
        return result.scalars().all()

# 带条件过滤
async def get_users_by_name(name: str):
    async with async_session() as session:
        stmt = select(User).where(User.name.ilike(f"%{name}%"))
        result = await session.execute(stmt)
        return result.scalars().all()

# 复杂条件
async def search_users(name: str = None, email: str = None):
    async with async_session() as session:
        conditions = []
        if name:
            conditions.append(User.name.ilike(f"%{name}%"))
        if email:
            conditions.append(User.email.ilike(f"%{email}%"))

        if conditions:
            stmt = select(User).where(and_(*conditions))
        else:
            stmt = select(User)

        result = await session.execute(stmt)
        return result.scalars().all()

关系加载

from sqlalchemy.orm import selectinload, joinedload

# 预加载关系
async def get_posts_with_author():
    async with async_session() as session:
        stmt = select(Post).options(selectinload(Post.author))
        result = await session.execute(stmt)
        return result.scalars().all()

# 连接加载单个关系
async def get_post_with_tags(post_id: int):
    async with async_session() as session:
        stmt = select(Post).options(
            joinedload(Post.author),
            selectinload(Post.tags)
        ).where(Post.id == post_id)
        result = await session.execute(stmt)
        return result.scalar_one_or_none()

分页

async def get_posts_paginated(page: int, size: int):
    async with async_session() as session:
        offset = (page - 1) * size
        stmt = select(Post).offset(offset).limit(size).order_by(Post.created.desc())
        result = await session.execute(stmt)
        return result.scalars().all()

聚合

from sqlalchemy import func

async def get_user_post_count():
    async with async_session() as session:
        stmt = (
            select(User.name, func.count(Post.id).label("post_count"))
            .join(Post)
            .group_by(User.id, User.name)
            .order_by(func.count(Post.id).desc())
        )
        result = await session.execute(stmt)
        return result.all()

会话管理

上下文管理器模式

async def create_post(title: str, content: str, author_id: int):
    async with async_session() as session:
        async with session.begin():
            post = Post(title=title, content=content, author_id=author_id)
            session.add(post)
            return post

依赖注入(FastAPI)

from fastapi import Depends

async def get_db_session():
    async with async_session() as session:
        try:
            yield session
        finally:
            await session.close()

async def create_user_endpoint(
    user_data: UserCreate,
    session: AsyncSession = Depends(get_db_session)
):
    user = User(**user_data.dict())
    session.add(user)
    await session.commit()
    await session.refresh(user)
    return user

作用域会话

from sqlalchemy.ext.asyncio import async_scoped_session
import asyncio

# 创建作用域会话
async_session_scope = async_scoped_session(
    async_sessionmaker(engine, expire_on_commit=False),
    scopefunc=asyncio.current_task
)

# 在应用中使用
async def some_function():
    session = async_session_scope()
    # 正常使用会话
    await session.commit()

高级模式

只写关系(内存高效)

from sqlalchemy.orm import WriteOnlyMapped

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    posts: WriteOnlyMapped["Post"] = relationship()

async def get_user_posts(user_id: int):
    async with async_session() as session:
        user = await session.get(User, user_id)
        if user:
            # 显式选择集合
            stmt = select(Post).where(Post.author_id == user_id)
            result = await session.execute(stmt)
            return result.scalars().all()
        return []

自定义会话类

class AsyncSessionWithDefaults(AsyncSession):
    async def execute_with_defaults(self, statement, **kwargs):
        # 添加默认选项
        return await self.execute(statement, **kwargs)

# 使用自定义会话
async_session = async_sessionmaker(
    engine,
    class_=AsyncSessionWithDefaults,
    expire_on_commit=False
)

连接路由

class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, **kw):
        if mapper and issubclass(mapper.class_, ReadOnlyModel):
            return read_engine
        return write_engine

class AsyncRoutingSession(AsyncSession):
    sync_session_class = RoutingSession

原生SQL

from sqlalchemy import text

async def run_raw_sql():
    async with async_session() as session:
        result = await session.execute(text("SELECT COUNT(*) FROM users"))
        count = result.scalar()
        return count

async def run_parameterized_query(user_id: int):
    async with async_session() as session:
        stmt = text("SELECT * FROM posts WHERE author_id = :user_id")
        result = await session.execute(stmt, {"user_id": user_id})
        return result.fetchall()

性能提示

  1. 对集合使用selectinload:比延迟加载更高效
  2. 批量操作:使用 add_all() 进行批量插入
  3. 连接池:根据负载配置连接池大小
  4. 索引列:为频繁查询的列添加索引
  5. 使用流式处理:对于大型结果集,使用 stream()
# 流式处理大型结果
async def process_all_users():
    async with async_session() as session:
        result = await session.stream(select(User))
        async for user in result.scalars():
            # 处理用户,无需全部加载到内存
            await process_user(user)

依赖要求

uv add sqlalchemy[asyncio]  # 核心SQLAlchemy
uv add asyncpg             # PostgreSQL异步驱动
# 或
uv add aiosqlite           # SQLite异步驱动
# 或
uv add aiomysql            # MySQL异步驱动

数据库URL

  • PostgreSQL: postgresql+asyncpg://user:pass@localhost/db
  • SQLite: sqlite+aiosqlite:///database.db
  • MySQL: mysql+aiomysql://user:pass@localhost/db

迁移集成

使用Alembic进行数据库迁移:

# 生成迁移
uv run alembic revision --autogenerate -m "添加用户表"

# 应用迁移
uv run alembic upgrade head