name: 异步-python-模式 description: 掌握Python asyncio、并发编程和async/await模式,用于高性能应用程序。在构建异步API、并发系统或需要非阻塞操作的I/O绑定应用程序时使用。
异步Python模式
使用asyncio、并发编程模式和async/await实现异步Python应用程序的综合指南,用于构建高性能、非阻塞系统。
何时使用此技能
- 构建异步Web API(FastAPI、aiohttp、Sanic)
- 实现并发I/O操作(数据库、文件、网络)
- 创建具有并发请求的网络爬虫
- 开发实时应用程序(WebSocket服务器、聊天系统)
- 同时处理多个独立任务
- 构建具有异步通信的微服务
- 优化I/O绑定工作负载
- 实现异步后台任务和队列
快速开始
import asyncio
async def main():
print("Hello")
await asyncio.sleep(1)
print("World")
# Python 3.7+
asyncio.run(main())
基础Async/Await
async def fetch_data(url: str) -> dict:
"""从URL异步获取数据。"""
await asyncio.sleep(1) # 模拟I/O
return {"url": url, "data": "result"}
async def main():
result = await fetch_data("https://api.example.com")
print(result)
asyncio.run(main())
并发执行
async def fetch_all_users(user_ids: List[int]) -> List[dict]:
"""并发获取多个用户。"""
tasks = [fetch_user(uid) for uid in user_ids]
results = await asyncio.gather(*tasks)
return results
核心概念
1. 事件循环
事件循环是asyncio的核心,管理和调度异步任务。
关键特性:
- 单线程协作多任务
- 调度协程执行
- 处理I/O操作而不阻塞
- 管理回调和future
2. 协程
使用async def定义的函数,可以暂停和恢复。
async def my_coroutine():
result = await some_async_operation()
return result
3. 任务
在事件循环上并发运行的调度协程。
task = asyncio.create_task(background_task())
result = await task
4. 异步上下文管理器
支持async with以进行适当清理的资源。
async with AsyncDatabaseConnection("postgresql://localhost") as conn:
result = await conn.query("SELECT * FROM users")
5. 异步迭代器
支持async for以迭代异步数据源的对象。
async for item in async_range(1, 10):
process(item)
参见详细解释: 核心概念
基本模式
模式1: 任务创建和管理
有效创建和管理并发任务。
task1 = asyncio.create_task(background_task("Task 1", 2))
task2 = asyncio.create_task(background_task("Task 2", 1))
result1 = await task1
result2 = await task2
参见详细模式: 基本模式
模式2: 错误处理
通过适当的异常处理来处理异步代码中的错误。
async def safe_operation(item_id: int):
try:
return await risky_operation(item_id)
except ValueError as e:
print(f"Error: {e}")
return None
results = await asyncio.gather(*tasks, return_exceptions=True)
参见详细模式: 错误处理
模式3: 超时处理
执行具有超时的操作以防止挂起。
try:
result = await asyncio.wait_for(slow_operation(5), timeout=2.0)
except asyncio.TimeoutError:
print("Operation timed out")
参见详细模式: 超时和取消
高级模式
异步上下文管理器
实现异步上下文管理器以进行资源管理。
class AsyncDatabaseConnection:
async def __aenter__(self):
# 设置逻辑
return connection
async def __aexit__(self, exc_type, exc_val, exc_tb):
# 清理逻辑
pass
参见详细实现: 高级模式
生产者-消费者模式
实现异步队列以进行生产者-消费者工作流。
queue = asyncio.Queue(maxsize=10)
async def producer(queue):
for item in items:
await queue.put(item)
async def consumer(queue):
while True:
item = await queue.get()
process(item)
queue.task_done()
参见详细模式: 并发模式
使用信号量进行速率限制
使用信号量控制并发访问。
semaphore = asyncio.Semaphore(5)
async def api_call(url, semaphore):
async with semaphore:
return await fetch(url)
参见详细实现: 同步
实际应用
使用aiohttp进行网络爬取
async def scrape_urls(urls: List[str]):
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
参见详细示例: 实际应用
异步数据库操作
async def get_user_data(db, user_id):
user, orders, profile = await asyncio.gather(
db.fetch_user(user_id),
db.fetch_orders(user_id),
db.fetch_profile(user_id)
)
return {"user": user, "orders": orders, "profile": profile}
参见详细模式: 实际应用
性能最佳实践
- 使用连接池: 高效重用连接
- 批量操作: 批量处理项目
- 避免阻塞: 在executor中运行CPU绑定工作
- 使用信号量: 限制并发操作
- 处理取消: 适当清理资源
参见详细指南: 性能最佳实践
常见陷阱
- 忘记await: 返回协程对象,不执行
- 阻塞事件循环: 使用
time.sleep()而不是asyncio.sleep() - 不处理取消: 任务取消而无清理
- 混合同步和异步: 从同步代码调用异步不正确
参见详细解决方案: 常见陷阱
测试异步代码
import pytest
@pytest.mark.asyncio
async def test_async_function():
result = await fetch_data("https://api.example.com")
assert result is not None
@pytest.mark.asyncio
async def test_with_timeout():
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(slow_operation(5), timeout=1.0)
参见详细测试指南: 测试
最佳实践总结
- 使用
asyncio.run()作为入口点(Python 3.7+) - 始终await协程以执行它们
- 使用
gather()进行并发执行 - 实现适当的错误处理
- 使用超时以防止挂起操作
- 池化连接以获得更好性能
- 避免在异步代码中使用阻塞操作
- 使用信号量进行速率限制
- 正确处理任务取消
- 使用pytest-asyncio测试异步代码
资源
- Python asyncio文档: https://docs.python.org/3/library/asyncio.html
- aiohttp: 异步HTTP客户端/服务器
- FastAPI: 现代异步Web框架
- asyncpg: 异步PostgreSQL驱动
- motor: 异步MongoDB驱动