name: python-async-patterns description: “Python asyncio 并发编程模式。触发关键词:asyncio, async, await, coroutine, gather, semaphore, TaskGroup, event loop, aiohttp, concurrent.” compatibility: “推荐 Python 3.10+。部分模式需要 3.11+ (TaskGroup, timeout)。” allowed-tools: “Read Write” depends-on: [python-typing-patterns] related-skills: [python-fastapi-patterns, python-observability-patterns]
Python 异步模式
用于并发 Python 编程的 Asyncio 模式。
核心概念
import asyncio
# 协程 (必须被 await)
async def fetch(url: str) -> str:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
# 入口点
async def main():
result = await fetch("https://example.com")
return result
asyncio.run(main())
模式 1: 使用 gather 并发
async def fetch_all(urls: list[str]) -> list[str]:
"""并发获取多个 URL。"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_one(session, url) for url in urls]
return await asyncio.gather(*tasks, return_exceptions=True)
模式 2: 有界并发
async def fetch_with_limit(urls: list[str], limit: int = 10):
"""限制并发请求数。"""
semaphore = asyncio.Semaphore(limit)
async def bounded_fetch(url):
async with semaphore:
return await fetch_one(url)
return await asyncio.gather(*[bounded_fetch(url) for url in urls])
模式 3: TaskGroup (Python 3.11+)
async def process_items(items):
"""具有自动清理功能的结构化并发。"""
async with asyncio.TaskGroup() as tg:
for item in items:
tg.create_task(process_one(item))
# 所有任务在此处完成,或抛出异常
模式 4: 超时
async def with_timeout():
try:
async with asyncio.timeout(5.0): # Python 3.11+
result = await slow_operation()
except asyncio.TimeoutError:
result = None
return result
关键警告
# 错误 - 阻塞事件循环
async def bad():
time.sleep(5) # 切勿使用 time.sleep!
requests.get(url) # 阻塞 I/O!
# 正确
async def good():
await asyncio.sleep(5)
async with aiohttp.ClientSession() as s:
await s.get(url)
# 错误 - 孤立任务
async def bad():
asyncio.create_task(work()) # 可能被垃圾回收!
# 正确 - 保持引用
async def good():
task = asyncio.create_task(work())
await task
快速参考
| 模式 | 使用场景 |
|---|---|
gather(*tasks) |
多个独立操作 |
Semaphore(n) |
速率限制,资源约束 |
TaskGroup() |
结构化并发 (3.11+) |
Queue() |
生产者-消费者 |
timeout(s) |
超时包装器 (3.11+) |
Lock() |
共享可变状态 |
异步上下文管理器
from contextlib import asynccontextmanager
@asynccontextmanager
async def managed_connection():
conn = await create_connection()
try:
yield conn
finally:
await conn.close()
额外资源
如需详细模式,请加载:
./references/concurrency-patterns.md- 队列、锁、生产者-消费者./references/aiohttp-patterns.md- HTTP 客户端/服务器模式./references/mixing-sync-async.md- run_in_executor,线程池./references/debugging-async.md- 调试模式、性能分析、问题排查./references/production-patterns.md- 优雅关闭、健康检查、信号处理./references/error-handling.md- 带退避的重试、熔断器、部分失败处理./references/performance.md- uvloop、连接池、缓冲区大小调整
脚本
./scripts/find-blocking-calls.sh- 扫描异步函数中的阻塞调用
资源文件
./assets/async-project-template.py- 生产就绪的异步应用骨架
另请参阅
先决条件:
python-typing-patterns- 异步函数的类型提示
相关技能:
python-fastapi-patterns- 异步 Web APIpython-observability-patterns- 异步日志记录和追踪python-database-patterns- 异步数据库访问