Python Async Patterns
问题陈述
Async Python功能强大但容易出错。在异步代码库中,常见的陷阱包括竞态条件、会话泄漏和连接池问题。
模式:AsyncSession 生命周期
**问题:**会话必须限定在请求范围内。会话泄漏会导致数据过时和连接耗尽。
# ✅ 正确:通过依赖将会话限定在请求范围内
async def get_session() -> AsyncGenerator[AsyncSession, None]:
async with async_session() as session:
yield session
# 请求结束后会话自动关闭
# 在端点中的使用
@router.get("/users/{user_id}")
async def get_user(
user_id: UUID,
session: AsyncSession = Depends(get_session),
) -> UserRead:
result = await session.execute(select(User).where(User.id == user_id))
return result.scalar_one()
# ❌ 错误:全局会话(数据过时,连接泄漏)
_global_session = None # 切勿这样做
async def get_user(user_id: UUID):
result = await _global_session.execute(...) # 过时的,共享状态
**为什么重要:**每个请求都需要隔离的数据库状态。共享会话会看到过时的数据,并且不能安全地提交。
模式:并发与顺序查询
**问题:**独立查询顺序执行浪费时间。但依赖的查询必须是顺序的。
# ✅ 正确:并发独立查询
async def get_dashboard_data(user_id: UUID, session: AsyncSession):
# 这些互不依赖 - 并行运行
user_result, stats_result, recent_result = await asyncio.gather(
session.execute(select(User).where(User.id == user_id)),
session.execute(select(UserStats).where(UserStats.user_id == user_id)),
session.execute(
select(Activity)
.where(Activity.user_id == user_id)
.order_by(Activity.created_at.desc())
.limit(10)
),
)
return {
"user": user_result.scalar_one(),
"stats": stats_result.scalar_one_or_none(),
"recent": recent_result.scalars().all(),
}
# ❌ 错误:当并行安全时顺序执行
async def get_dashboard_data_slow(user_id: UUID, session: AsyncSession):
user = await session.execute(...) # 等待...
stats = await session.execute(...) # 等待更多...
recent = await session.execute(...) # 更多等待
# 总时间 = 所有查询的总和
# ✅ 正确:当查询依赖时顺序执行
async def get_user_with_team(user_id: UUID, session: AsyncSession):
# 必须先获取用户才能知道team_id
user_result = await session.execute(
select(User).where(User.id == user_id)
)
user = user_result.scalar_one()
# 现在我们可以查询团队
team_result = await session.execute(
select(Team).where(Team.id == user.team_id)
)
return user, team_result.scalar_one()
决策框架:
| 查询共享数据? | 使用 |
|---|---|
| 否(独立) | asyncio.gather() |
| 是(依赖) | 顺序 await |
模式:事务边界
**问题:**知道何时提交、回滚和刷新。
# ✅ 正确:多步骤操作的显式事务
async def transfer_player(
player_id: UUID,
from_team_id: UUID,
to_team_id: UUID,
session: AsyncSession,
):
try:
# 所有操作在一个事务中
player = await session.get(Player, player_id)
player.team_id = to_team_id
from_team = await session.get(Team, from_team_id)
from_team.player_count -= 1
to_team = await session.get(Team, to_team_id)
to_team.player_count += 1
await session.commit()
except Exception:
await session.rollback()
raise
# ✅ 正确:使用上下文管理器
async with session.begin():
# 这里的所有操作都在一个事务中
# 成功时自动提交,异常时自动回滚
player.team_id = to_team_id
from_team.player_count -= 1
to_team.player_count += 1
# ✅ 正确:提交后刷新以获取数据库生成的值
await session.commit()
await session.refresh(new_entity) # 获取id, created_at等
return new_entity
何时使用什么:
| 场景 | 模式 |
|---|---|
| 单个创建/更新 | session.add() + 请求结束时 commit() |
| 多步骤操作 | 显式 begin() / commit() / rollback() |
| 需要数据库生成的值 | 提交后 refresh() |
| 只读查询 | 不需要提交 |
模式:连接池管理
**问题:**耗尽连接池会导致请求挂起。
# 此代码库使用NullPool进行异步 - 了解为什么
engine = create_async_engine(
DATABASE_URL,
poolclass=NullPool, # 无连接池
)
# NullPool:每个请求获得新连接,结束后关闭
# 为什么:避免异步pg + 连接重用的问题
# 权衡:稍微增加连接开销
# ✅ 正确:始终关闭会话(由Depends处理)
async with async_session() as session:
# 使用会话
pass # 这里关闭会话
# ❌ 错误:忘记关闭
session = async_session()
result = await session.execute(query)
# 会话从未关闭 - 连接泄漏!
模式:后台任务
**问题:**长时间运行的工作不应该阻塞响应。
from fastapi import BackgroundTasks
# ✅ 正确:FastAPI BackgroundTasks用于请求范围的工作
@router.post("/assessments/{id}/submit")
async def submit_assessment(
id: UUID,
session: AsyncSession = Depends(get_session),
background_tasks: BackgroundTasks,
) -> AssessmentResult:
# 快速工作 - 返回响应
result = await process_submission(id, session)
# 慢速工作 - 在响应后执行
background_tasks.add_task(send_completion_email, result.user_email)
background_tasks.add_task(update_analytics, result)
return result
# ✅ 正确:asyncio.create_task用于火并忘记
async def process_with_side_effect():
result = await main_operation()
# 火并忘记 - 不等待
asyncio.create_task(log_to_external_service(result))
return result
# ❌ 错误:等待非关键的慢速操作
async def slow_endpoint():
result = await main_operation()
await send_email(result) # 用户等待邮件...
await update_analytics(result) # 用户仍在等待...
return result
何时使用什么:
| 场景 | 模式 |
|---|---|
| 响应后清理 | BackgroundTasks |
| 火并忘记日志记录 | asyncio.create_task() |
| 必须在响应前完成 | 直接 await |
模式:避免死锁
**问题:**并发操作以不同顺序获取锁。
# ❌ 错误:潜在死锁
async def transfer_both_ways():
# 任务1:锁定A,然后B
# 任务2:锁定B,然后A
# = 如果交错,则死锁
pass
# ✅ 正确:一致的锁定顺序
async def transfer_credits(
from_id: UUID,
to_id: UUID,
amount: int,
session: AsyncSession,
):
# 始终以一致的顺序锁定(例如,按UUID)
first_id, second_id = sorted([from_id, to_id])
# 以一致的顺序锁定
first = await session.get(Account, first_id, with_for_update=True)
second = await session.get(Account, second_id, with_for_update=True)
# 现在可以安全修改
if from_id == first_id:
first.balance -= amount
second.balance += amount
else:
second.balance -= amount
first.balance += amount
await session.commit()
模式:后置条件验证
与前端相同的原则 - 验证异步操作是否成功:
# ✅ 正确:异步操作后验证
async def create_assessment(data: AssessmentCreate, session: AsyncSession):
assessment = Assessment(**data.model_dump())
session.add(assessment)
await session.commit()
await session.refresh(assessment)
# 验证后置条件
if assessment.id is None:
raise RuntimeError("Assessment creation failed - no ID assigned")
return assessment
# ✅ 正确:验证数据是否实际加载
async def get_user_or_fail(user_id: UUID, session: AsyncSession) -> User:
result = await session.execute(
select(User).where(User.id == user_id)
)
user = result.scalar_one_or_none()
if user is None:
raise HTTPException(404, f"User {user_id} not found")
return user
模式:异步操作日志记录
import structlog
logger = structlog.get_logger()
async def complex_operation(user_id: UUID, session: AsyncSession):
logger.info("complex_operation.start", user_id=str(user_id))
try:
result = await step_one(session)
logger.debug("complex_operation.step_one_complete", result_count=len(result))
await step_two(result, session)
logger.debug("complex_operation.step_two_complete")
await session.commit()
logger.info("complex_operation.success", user_id=str(user_id))
except Exception as e:
logger.error("complex_operation.failed",
user_id=str(user_id),
error=str(e),
step="unknown"
)
raise
常见问题
| 问题 | 可能的原因 | 解决方案 |
|---|---|---|
| “会话已关闭” | 使用会话在请求结束后 | 保持会话在请求范围内 |
| 连接超时 | 池耗尽 | 检查会话泄漏 |
| 数据过时 | 共享会话或缺少刷新 | 将会话限定在请求范围内,提交后刷新 |
| 死锁 | 不一致的锁定顺序 | 始终以相同的顺序获取锁 |
| 慢速端点 | 可以并行的顺序查询 | 使用 asyncio.gather() |
检测命令
# 查找潜在的会话泄漏(全局会话)
grep -rn "async_session()" --include="*.py" | grep -v "async with\|Depends"
# 查找可能并行化的顺序查询
grep -rn "await session.execute" --include="*.py" -A2 | grep -B1 "await session.execute"
# 查找缺少的await
ruff check --select=RUF006 # asyncio悬挂任务