PythonAsyncPatternsSkill py-async-patterns

Python异步编程模式,用于FastAPI和SQLAlchemy。适用于处理异步代码、数据库会话、并发操作或调试Python中的异步问题。

后端开发 0 次安装 0 次浏览 更新于 3/3/2026

Python Async Patterns

问题陈述

Async Python功能强大但容易出错。在异步代码库中,常见的陷阱包括竞态条件、会话泄漏和连接池问题。


模式:AsyncSession 生命周期

**问题:**会话必须限定在请求范围内。会话泄漏会导致数据过时和连接耗尽。

# ✅ 正确:通过依赖将会话限定在请求范围内
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    async with async_session() as session:
        yield session
        # 请求结束后会话自动关闭

# 在端点中的使用
@router.get("/users/{user_id}")
async def get_user(
    user_id: UUID,
    session: AsyncSession = Depends(get_session),
) -> UserRead:
    result = await session.execute(select(User).where(User.id == user_id))
    return result.scalar_one()

# ❌ 错误:全局会话(数据过时,连接泄漏)
_global_session = None  # 切勿这样做

async def get_user(user_id: UUID):
    result = await _global_session.execute(...)  # 过时的,共享状态

**为什么重要:**每个请求都需要隔离的数据库状态。共享会话会看到过时的数据,并且不能安全地提交。


模式:并发与顺序查询

**问题:**独立查询顺序执行浪费时间。但依赖的查询必须是顺序的。

# ✅ 正确:并发独立查询
async def get_dashboard_data(user_id: UUID, session: AsyncSession):
    # 这些互不依赖 - 并行运行
    user_result, stats_result, recent_result = await asyncio.gather(
        session.execute(select(User).where(User.id == user_id)),
        session.execute(select(UserStats).where(UserStats.user_id == user_id)),
        session.execute(
            select(Activity)
            .where(Activity.user_id == user_id)
            .order_by(Activity.created_at.desc())
            .limit(10)
        ),
    )
    
    return {
        "user": user_result.scalar_one(),
        "stats": stats_result.scalar_one_or_none(),
        "recent": recent_result.scalars().all(),
    }

# ❌ 错误:当并行安全时顺序执行
async def get_dashboard_data_slow(user_id: UUID, session: AsyncSession):
    user = await session.execute(...)      # 等待...
    stats = await session.execute(...)     # 等待更多...
    recent = await session.execute(...)    # 更多等待
    # 总时间 = 所有查询的总和

# ✅ 正确:当查询依赖时顺序执行
async def get_user_with_team(user_id: UUID, session: AsyncSession):
    # 必须先获取用户才能知道team_id
    user_result = await session.execute(
        select(User).where(User.id == user_id)
    )
    user = user_result.scalar_one()
    
    # 现在我们可以查询团队
    team_result = await session.execute(
        select(Team).where(Team.id == user.team_id)
    )
    return user, team_result.scalar_one()

决策框架:

查询共享数据? 使用
否(独立) asyncio.gather()
是(依赖) 顺序 await

模式:事务边界

**问题:**知道何时提交、回滚和刷新。

# ✅ 正确:多步骤操作的显式事务
async def transfer_player(
    player_id: UUID,
    from_team_id: UUID,
    to_team_id: UUID,
    session: AsyncSession,
):
    try:
        # 所有操作在一个事务中
        player = await session.get(Player, player_id)
        player.team_id = to_team_id
        
        from_team = await session.get(Team, from_team_id)
        from_team.player_count -= 1
        
        to_team = await session.get(Team, to_team_id)
        to_team.player_count += 1
        
        await session.commit()
    except Exception:
        await session.rollback()
        raise

# ✅ 正确:使用上下文管理器
async with session.begin():
    # 这里的所有操作都在一个事务中
    # 成功时自动提交,异常时自动回滚
    player.team_id = to_team_id
    from_team.player_count -= 1
    to_team.player_count += 1

# ✅ 正确:提交后刷新以获取数据库生成的值
await session.commit()
await session.refresh(new_entity)  # 获取id, created_at等
return new_entity

何时使用什么:

场景 模式
单个创建/更新 session.add() + 请求结束时 commit()
多步骤操作 显式 begin() / commit() / rollback()
需要数据库生成的值 提交后 refresh()
只读查询 不需要提交

模式:连接池管理

**问题:**耗尽连接池会导致请求挂起。

# 此代码库使用NullPool进行异步 - 了解为什么
engine = create_async_engine(
    DATABASE_URL,
    poolclass=NullPool,  # 无连接池
)

# NullPool:每个请求获得新连接,结束后关闭
# 为什么:避免异步pg + 连接重用的问题
# 权衡:稍微增加连接开销

# ✅ 正确:始终关闭会话(由Depends处理)
async with async_session() as session:
    # 使用会话
    pass  # 这里关闭会话

# ❌ 错误:忘记关闭
session = async_session()
result = await session.execute(query)
# 会话从未关闭 - 连接泄漏!

模式:后台任务

**问题:**长时间运行的工作不应该阻塞响应。

from fastapi import BackgroundTasks

# ✅ 正确:FastAPI BackgroundTasks用于请求范围的工作
@router.post("/assessments/{id}/submit")
async def submit_assessment(
    id: UUID,
    session: AsyncSession = Depends(get_session),
    background_tasks: BackgroundTasks,
) -> AssessmentResult:
    # 快速工作 - 返回响应
    result = await process_submission(id, session)
    
    # 慢速工作 - 在响应后执行
    background_tasks.add_task(send_completion_email, result.user_email)
    background_tasks.add_task(update_analytics, result)
    
    return result

# ✅ 正确:asyncio.create_task用于火并忘记
async def process_with_side_effect():
    result = await main_operation()
    
    # 火并忘记 - 不等待
    asyncio.create_task(log_to_external_service(result))
    
    return result

# ❌ 错误:等待非关键的慢速操作
async def slow_endpoint():
    result = await main_operation()
    await send_email(result)           # 用户等待邮件...
    await update_analytics(result)     # 用户仍在等待...
    return result

何时使用什么:

场景 模式
响应后清理 BackgroundTasks
火并忘记日志记录 asyncio.create_task()
必须在响应前完成 直接 await

模式:避免死锁

**问题:**并发操作以不同顺序获取锁。

# ❌ 错误:潜在死锁
async def transfer_both_ways():
    # 任务1:锁定A,然后B
    # 任务2:锁定B,然后A
    # = 如果交错,则死锁
    pass

# ✅ 正确:一致的锁定顺序
async def transfer_credits(
    from_id: UUID,
    to_id: UUID,
    amount: int,
    session: AsyncSession,
):
    # 始终以一致的顺序锁定(例如,按UUID)
    first_id, second_id = sorted([from_id, to_id])
    
    # 以一致的顺序锁定
    first = await session.get(Account, first_id, with_for_update=True)
    second = await session.get(Account, second_id, with_for_update=True)
    
    # 现在可以安全修改
    if from_id == first_id:
        first.balance -= amount
        second.balance += amount
    else:
        second.balance -= amount
        first.balance += amount
    
    await session.commit()

模式:后置条件验证

与前端相同的原则 - 验证异步操作是否成功:

# ✅ 正确:异步操作后验证
async def create_assessment(data: AssessmentCreate, session: AsyncSession):
    assessment = Assessment(**data.model_dump())
    session.add(assessment)
    await session.commit()
    await session.refresh(assessment)
    
    # 验证后置条件
    if assessment.id is None:
        raise RuntimeError("Assessment creation failed - no ID assigned")
    
    return assessment

# ✅ 正确:验证数据是否实际加载
async def get_user_or_fail(user_id: UUID, session: AsyncSession) -> User:
    result = await session.execute(
        select(User).where(User.id == user_id)
    )
    user = result.scalar_one_or_none()
    
    if user is None:
        raise HTTPException(404, f"User {user_id} not found")
    
    return user

模式:异步操作日志记录

import structlog

logger = structlog.get_logger()

async def complex_operation(user_id: UUID, session: AsyncSession):
    logger.info("complex_operation.start", user_id=str(user_id))
    
    try:
        result = await step_one(session)
        logger.debug("complex_operation.step_one_complete", result_count=len(result))
        
        await step_two(result, session)
        logger.debug("complex_operation.step_two_complete")
        
        await session.commit()
        logger.info("complex_operation.success", user_id=str(user_id))
        
    except Exception as e:
        logger.error("complex_operation.failed", 
            user_id=str(user_id), 
            error=str(e),
            step="unknown"
        )
        raise

常见问题

问题 可能的原因 解决方案
“会话已关闭” 使用会话在请求结束后 保持会话在请求范围内
连接超时 池耗尽 检查会话泄漏
数据过时 共享会话或缺少刷新 将会话限定在请求范围内,提交后刷新
死锁 不一致的锁定顺序 始终以相同的顺序获取锁
慢速端点 可以并行的顺序查询 使用 asyncio.gather()

检测命令

# 查找潜在的会话泄漏(全局会话)
grep -rn "async_session()" --include="*.py" | grep -v "async with\|Depends"

# 查找可能并行化的顺序查询
grep -rn "await session.execute" --include="*.py" -A2 | grep -B1 "await session.execute"

# 查找缺少的await
ruff check --select=RUF006  # asyncio悬挂任务