---
name: python-async-patterns
user-invocable: false
description: Master asynchronous programming in Python with asyncio, async/await, and concurrent.futures. Use for async code and concurrency patterns.
allowed-tools:
  - Bash
  - Read
---
# Python Async Patterns

Master asynchronous programming in Python with asyncio, the async/await syntax, and concurrent execution patterns for I/O-bound and CPU-bound work.
## Basic Async/Await

Core async syntax:

```python
import asyncio

# Define an async function with async def
async def fetch_data(url: str) -> str:
    print(f"Fetching {url}...")
    await asyncio.sleep(1)  # Simulate an I/O operation
    return f"Data from {url}"

# Call an async function
async def main() -> None:
    result = await fetch_data("https://api.example.com")
    print(result)

# Run the async entry point
asyncio.run(main())
```
Multiple concurrent operations:

```python
import asyncio

async def fetch_url(url: str) -> str:
    await asyncio.sleep(1)
    return f"Content from {url}"

async def main() -> None:
    # Run concurrently with gather
    results = await asyncio.gather(
        fetch_url("https://example.com/1"),
        fetch_url("https://example.com/2"),
        fetch_url("https://example.com/3"),
    )
    for result in results:
        print(result)

asyncio.run(main())
```
## asyncio.create_task

Creating and managing tasks:

```python
import asyncio

async def process_item(item: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"Processed {item}"

async def main() -> None:
    # Create tasks so they run concurrently
    task1 = asyncio.create_task(process_item("A", 2.0))
    task2 = asyncio.create_task(process_item("B", 1.0))
    task3 = asyncio.create_task(process_item("C", 1.5))
    # Do other work while the tasks run
    print("Tasks started")
    # Wait for the tasks to finish
    result1 = await task1
    result2 = await task2
    result3 = await task3
    print(result1, result2, result3)

asyncio.run(main())
```
Named tasks:

```python
import asyncio

async def background_task(name: str) -> None:
    print(f"Task {name} starting")
    await asyncio.sleep(2)
    print(f"Task {name} completed")

async def main() -> None:
    # Create a named task
    task = asyncio.create_task(
        background_task("worker"),
        name="background-worker",
    )
    # Inspect task state
    print(f"Task name: {task.get_name()}")
    print(f"Task done: {task.done()}")
    await task

asyncio.run(main())
```
## asyncio.gather vs asyncio.wait

Collecting results with gather:

```python
import asyncio

async def fetch(n: int) -> int:
    await asyncio.sleep(1)
    return n * 2

async def main() -> None:
    # gather returns results in submission order
    results = await asyncio.gather(
        fetch(1),
        fetch(2),
        fetch(3),
    )
    print(results)  # [2, 4, 6]
    # Return exceptions instead of raising them
    results = await asyncio.gather(
        fetch(1),
        fetch(2),
        return_exceptions=True,
    )

asyncio.run(main())
```
Finer-grained control with wait:

```python
import asyncio

async def worker(n: int) -> int:
    await asyncio.sleep(n)
    return n

async def main() -> None:
    tasks = [
        asyncio.create_task(worker(1)),
        asyncio.create_task(worker(2)),
        asyncio.create_task(worker(3)),
    ]
    # Wait until the first task completes
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED,
    )
    print(f"Done: {len(done)}, Pending: {len(pending)}")
    # Cancel the pending tasks
    for task in pending:
        task.cancel()
    # Wait for the remainder (cancelled tasks count as done),
    # giving up after a timeout
    done, pending = await asyncio.wait(
        pending,
        timeout=2.0,
    )

asyncio.run(main())
```
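Between gather() and wait() sits asyncio.as_completed(), which yields awaitables in the order they finish rather than the order they were submitted; a minimal sketch (the staggered delays are illustrative):

```python
import asyncio

async def worker(n: int) -> int:
    await asyncio.sleep(n * 0.1)
    return n

async def main() -> None:
    coros = [worker(n) for n in (3, 1, 2)]
    # as_completed yields results as tasks finish,
    # not in submission order
    for future in asyncio.as_completed(coros):
        result = await future
        print(f"Finished: {result}")  # prints 1, then 2, then 3

asyncio.run(main())
```

This is useful when you want to start processing each result as soon as it is available instead of waiting for the slowest task.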
## Async Context Managers

Writing an async context manager class:

```python
import asyncio

class AsyncResource:
    async def __aenter__(self) -> "AsyncResource":
        print("Acquiring resource")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        print("Releasing resource")
        await asyncio.sleep(0.1)

    async def query(self) -> str:
        return "data"

async def main() -> None:
    async with AsyncResource() as resource:
        result = await resource.query()
        print(result)

asyncio.run(main())
```
Using the asynccontextmanager decorator:

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator

@asynccontextmanager
async def get_connection(url: str) -> AsyncIterator[str]:
    # Setup
    print(f"Connecting to {url}")
    await asyncio.sleep(0.1)
    conn = f"connection-{url}"
    try:
        yield conn
    finally:
        # Teardown
        print(f"Closing connection to {url}")
        await asyncio.sleep(0.1)

async def main() -> None:
    async with get_connection("localhost") as conn:
        print(f"Using {conn}")

asyncio.run(main())
```
## Async Iterators

Writing an async iterator class:

```python
import asyncio
from typing import AsyncIterator

class AsyncRange:
    def __init__(self, count: int) -> None:
        self.count = count

    def __aiter__(self) -> AsyncIterator[int]:
        return self

    async def __anext__(self) -> int:
        if self.count <= 0:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)
        self.count -= 1
        return self.count

async def main() -> None:
    async for i in AsyncRange(5):
        print(i)

asyncio.run(main())
```
Async generator functions:

```python
import asyncio
from typing import AsyncIterator

async def async_range(count: int) -> AsyncIterator[int]:
    for i in range(count):
        await asyncio.sleep(0.1)
        yield i

async def fetch_pages(urls: list[str]) -> AsyncIterator[str]:
    for url in urls:
        await asyncio.sleep(0.5)
        yield f"Page content from {url}"

async def main() -> None:
    async for num in async_range(5):
        print(num)
    urls = ["url1", "url2", "url3"]
    async for page in fetch_pages(urls):
        print(page)

asyncio.run(main())
```
## Async Queues

Producer-consumer pattern with asyncio.Queue:

```python
import asyncio
from asyncio import Queue

async def producer(queue: Queue[int | None], n: int) -> None:
    for i in range(n):
        await asyncio.sleep(0.1)
        await queue.put(i)
        print(f"Produced {i}")

async def consumer(queue: Queue[int | None], name: str) -> None:
    while True:
        item = await queue.get()
        if item is None:  # sentinel: stop consuming
            queue.task_done()
            break
        await asyncio.sleep(0.2)
        print(f"Consumer {name} processed {item}")
        queue.task_done()

async def main() -> None:
    queue: Queue[int | None] = Queue(maxsize=5)
    # Start the producer and consumers
    prod = asyncio.create_task(producer(queue, 10))
    cons1 = asyncio.create_task(consumer(queue, "A"))
    cons2 = asyncio.create_task(consumer(queue, "B"))
    await prod
    await queue.join()  # wait until every item has been processed
    # Send one sentinel per consumer to shut them down
    await queue.put(None)
    await queue.put(None)
    await cons1
    await cons2

asyncio.run(main())
```
## Semaphores and Locks

Limiting concurrent operations:

```python
import asyncio

async def fetch_with_limit(
    url: str,
    semaphore: asyncio.Semaphore,
) -> str:
    async with semaphore:
        print(f"Fetching {url}")
        await asyncio.sleep(1)
        return f"Data from {url}"

async def main() -> None:
    # Allow at most 3 concurrent operations
    semaphore = asyncio.Semaphore(3)
    urls = [f"https://example.com/{i}" for i in range(10)]
    tasks = [
        fetch_with_limit(url, semaphore)
        for url in urls
    ]
    results = await asyncio.gather(*tasks)
    print(f"Fetched {len(results)} URLs")

asyncio.run(main())
```
Mutual exclusion with a lock:

```python
import asyncio

class Counter:
    def __init__(self) -> None:
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment(self) -> None:
        async with self.lock:
            # Critical section
            current = self.value
            await asyncio.sleep(0.01)
            self.value = current + 1

async def main() -> None:
    counter = Counter()
    # Run the increments concurrently
    await asyncio.gather(*[
        counter.increment()
        for _ in range(100)
    ])
    print(f"Final value: {counter.value}")  # 100

asyncio.run(main())
```
## Timeouts and Cancellation

Applying timeouts:

```python
import asyncio

async def slow_operation() -> str:
    await asyncio.sleep(5)
    return "completed"

async def main() -> None:
    # Time out after 2 seconds
    try:
        result = await asyncio.wait_for(
            slow_operation(),
            timeout=2.0,
        )
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(main())
```
Handling cancellation:

```python
import asyncio

async def cancellable_task() -> None:
    try:
        while True:
            print("Working...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Task was cancelled")
        # Clean up, then re-raise so the task is marked cancelled
        raise

async def main() -> None:
    task = asyncio.create_task(cancellable_task())
    await asyncio.sleep(3)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Task cancellation confirmed")

asyncio.run(main())
```
## Event Loop Management

Direct event-loop control (rarely needed; prefer asyncio.run()):

```python
import asyncio

async def task1() -> None:
    print("Task 1")

async def task2() -> None:
    print("Task 2")

# Create a new event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    # Schedule a callback for the next loop iteration
    loop.call_soon(lambda: print("Callback"))
    # Schedule a delayed callback
    loop.call_later(1.0, lambda: print("Delayed"))
    # Run a coroutine
    loop.run_until_complete(task1())
    # Run multiple coroutines
    loop.run_until_complete(
        asyncio.gather(task1(), task2())
    )
    # Keep the loop alive long enough for the delayed callback to fire
    loop.run_until_complete(asyncio.sleep(1.1))
finally:
    loop.close()
```
## concurrent.futures

ThreadPoolExecutor for blocking I/O-bound tasks:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io_task(n: int) -> int:
    print(f"Task {n} starting")
    time.sleep(2)  # Blocking I/O
    return n * 2

async def main() -> None:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Run the blocking function in the thread pool
        tasks = [
            loop.run_in_executor(executor, blocking_io_task, i)
            for i in range(5)
        ]
        results = await asyncio.gather(*tasks)
        print(results)

asyncio.run(main())
```
ProcessPoolExecutor for CPU-bound tasks:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive_task(n: int) -> int:
    # CPU-heavy computation
    return sum(i * i for i in range(n))

async def main() -> None:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=4) as executor:
        # Run the CPU-bound function in the process pool
        tasks = [
            loop.run_in_executor(
                executor,
                cpu_intensive_task,
                10_000_000,
            )
            for _ in range(4)
        ]
        results = await asyncio.gather(*tasks)
        print(f"Completed {len(results)} tasks")

# The guard is required when worker processes are spawned
if __name__ == "__main__":
    asyncio.run(main())
```
## Async HTTP with aiohttp

Making async HTTP requests:

```python
import asyncio

import aiohttp

async def fetch_url(
    session: aiohttp.ClientSession,
    url: str,
) -> str:
    async with session.get(url) as response:
        return await response.text()

async def main() -> None:
    async with aiohttp.ClientSession() as session:
        urls = [
            "https://example.com/1",
            "https://example.com/2",
            "https://example.com/3",
        ]
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f"Fetched {len(results)} pages")

asyncio.run(main())
```
## Error Handling

Handling exceptions in async code:

```python
import asyncio

async def failing_task() -> None:
    await asyncio.sleep(1)
    raise ValueError("Task failed")

async def main() -> None:
    # Handle an exception from a single awaited coroutine
    try:
        await failing_task()
    except ValueError as e:
        print(f"Caught: {e}")
    # Collect exceptions with gather instead of raising the first one
    results = await asyncio.gather(
        failing_task(),
        failing_task(),
        return_exceptions=True,
    )
    for result in results:
        if isinstance(result, Exception):
            print(f"Task failed: {result}")

asyncio.run(main())
```
## When to Use This Skill

Use python-async-patterns when you need to:

- Handle many I/O operations concurrently (API calls, database queries)
- Build asynchronous web servers or clients
- Process data streams asynchronously
- Implement producer-consumer patterns with async queues
- Run blocking I/O without stalling the event loop
- Create async context managers for resource management
- Implement async iterators for streaming data
- Control concurrency with semaphores and locks
- Handle timeouts and cancellation of async operations
- Mix CPU-bound and I/O-bound work efficiently
## Best Practices

- Use asyncio.run() as the main entry point
- Create tasks with asyncio.create_task() for concurrent execution
- Use gather() when you need all the results
- Use wait() when you need fine-grained control
- Always handle CancelledError in long-running tasks
- Limit concurrent operations with semaphores
- Prefer async context managers for resource management
- Use asyncio.Queue for producer-consumer patterns
- Run blocking I/O in a thread pool via run_in_executor()
- Run CPU-bound work in a process pool
- Set appropriate timeouts on network operations
- Use structured concurrency patterns (task groups / nurseries)
## Common Pitfalls

- Forgetting to await a coroutine (creating a coroutine object that never runs)
- Blocking the event loop with CPU-intensive work
- Not handling task cancellation correctly
- Using time.sleep() instead of asyncio.sleep()
- Creating unbounded numbers of concurrent tasks
- Not closing resources properly in async contexts
- Mixing blocking and async code incorrectly
- Not handling exceptions in background tasks
- Forgetting to call task_done() when using a Queue
- Relying on the global event loop instead of asyncio.run()
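The first and fourth pitfalls are easy to demonstrate; a minimal sketch:

```python
import asyncio
import time

async def work() -> str:
    await asyncio.sleep(0.1)
    return "done"

async def main() -> None:
    # Pitfall: calling without await only creates a coroutine object
    # that never runs (Python warns "coroutine was never awaited")
    coro = work()
    print(type(coro).__name__)  # coroutine
    result = await coro  # correct: await it
    print(result)

    # Pitfall: time.sleep() would freeze every task on the loop;
    # asyncio.sleep() yields control so other tasks keep running
    start = time.monotonic()
    await asyncio.gather(asyncio.sleep(0.1), asyncio.sleep(0.1))
    print(f"Concurrent sleeps took {time.monotonic() - start:.2f}s")

asyncio.run(main())
```

The two sleeps overlap, so the gather takes roughly 0.1s rather than 0.2s; replacing them with time.sleep(0.1) would serialize everything.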