Python异步编程模式Skill python-async-patterns

这个技能用于掌握Python异步编程,包括asyncio、async/await、并发执行模式,适用于处理I/O-bound和CPU-bound任务,提升程序并发性能。关键词:Python异步编程, asyncio, async/await, 并发模式, I/O-bound任务, CPU-bound任务, 异步编程模式

后端开发 0 次安装 0 次浏览 更新于 3/25/2026

name: python-async-patterns user-invocable: false description: 掌握Python异步编程,使用asyncio、async/await和concurrent.futures。用于异步代码和并发模式。 allowed-tools:

  • Bash
  • Read

Python 异步编程模式

掌握Python中的异步编程,使用asyncio、async/await语法,以及用于I/O-bound和CPU-bound任务的并发执行模式。

基本 Async/Await

核心异步语法:

import asyncio

# 使用async def定义异步函数
async def fetch_data(url: str) -> str:
    print(f"Fetching {url}...")
    await asyncio.sleep(1)  # 模拟I/O操作
    return f"Data from {url}"

# 调用异步函数
async def main() -> None:
    result = await fetch_data("https://api.example.com")
    print(result)

# 运行异步函数
asyncio.run(main())

多个并发操作:

import asyncio

async def fetch_url(url: str) -> str:
    await asyncio.sleep(1)
    return f"Content from {url}"

async def main() -> None:
    # 使用gather并发运行
    results = await asyncio.gather(
        fetch_url("https://example.com/1"),
        fetch_url("https://example.com/2"),
        fetch_url("https://example.com/3")
    )
    for result in results:
        print(result)

asyncio.run(main())

asyncio.create_task

创建和管理任务:

import asyncio

async def process_item(item: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"Processed {item}"

async def main() -> None:
    # 创建任务以并发执行
    task1 = asyncio.create_task(process_item("A", 2.0))
    task2 = asyncio.create_task(process_item("B", 1.0))
    task3 = asyncio.create_task(process_item("C", 1.5))

    # 在任务运行时做其他工作
    print("Tasks started")

    # 等待任务完成
    result1 = await task1
    result2 = await task2
    result3 = await task3

    print(result1, result2, result3)

asyncio.run(main())

带名称和上下文的任务:

import asyncio

async def background_task(name: str) -> None:
    print(f"Task {name} starting")
    await asyncio.sleep(2)
    print(f"Task {name} completed")

async def main() -> None:
    # 创建命名任务
    task = asyncio.create_task(
        background_task("worker"),
        name="background-worker"
    )

    # 检查任务状态
    print(f"Task name: {task.get_name()}")
    print(f"Task done: {task.done()}")

    await task

asyncio.run(main())

asyncio.gather 与 asyncio.wait

使用gather获取结果:

import asyncio

async def fetch(n: int) -> int:
    await asyncio.sleep(1)
    return n * 2

async def main() -> None:
    # gather按顺序返回结果
    results = await asyncio.gather(
        fetch(1),
        fetch(2),
        fetch(3)
    )
    print(results)  # [2, 4, 6]

    # 返回异常而非抛出
    results = await asyncio.gather(
        fetch(1),
        fetch(2),
        return_exceptions=True
    )

asyncio.run(main())

使用wait进行更细粒度控制:

import asyncio

async def worker(n: int) -> int:
    await asyncio.sleep(n)
    return n

async def main() -> None:
    tasks = [
        asyncio.create_task(worker(1)),
        asyncio.create_task(worker(2)),
        asyncio.create_task(worker(3))
    ]

    # 等待第一个任务完成
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )

    print(f"Done: {len(done)}, Pending: {len(pending)}")

    # 取消待处理任务
    for task in pending:
        task.cancel()

    # 使用超时等待所有
    done, pending = await asyncio.wait(
        tasks,
        timeout=2.0
    )

asyncio.run(main())

异步上下文管理器

创建异步上下文管理器:

import asyncio
from typing import AsyncIterator

class AsyncResource:
    async def __aenter__(self) -> "AsyncResource":
        print("Acquiring resource")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        print("Releasing resource")
        await asyncio.sleep(0.1)

    async def query(self) -> str:
        return "data"

async def main() -> None:
    async with AsyncResource() as resource:
        result = await resource.query()
        print(result)

asyncio.run(main())

使用asynccontextmanager装饰器:

from contextlib import asynccontextmanager
import asyncio

@asynccontextmanager
async def get_connection(url: str) -> AsyncIterator[str]:
    # 设置
    print(f"Connecting to {url}")
    await asyncio.sleep(0.1)
    conn = f"connection-{url}"

    try:
        yield conn
    finally:
        # 清理
        print(f"Closing connection to {url}")
        await asyncio.sleep(0.1)

async def main() -> None:
    async with get_connection("localhost") as conn:
        print(f"Using {conn}")

asyncio.run(main())

异步迭代器

创建异步迭代器:

import asyncio
from typing import AsyncIterator

class AsyncRange:
    def __init__(self, count: int) -> None:
        self.count = count

    def __aiter__(self) -> AsyncIterator[int]:
        return self

    async def __anext__(self) -> int:
        if self.count <= 0:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)
        self.count -= 1
        return self.count

async def main() -> None:
    async for i in AsyncRange(5):
        print(i)

asyncio.run(main())

异步生成器函数:

import asyncio
from typing import AsyncIterator

async def async_range(count: int) -> AsyncIterator[int]:
    for i in range(count):
        await asyncio.sleep(0.1)
        yield i

async def fetch_pages(urls: list[str]) -> AsyncIterator[str]:
    for url in urls:
        await asyncio.sleep(0.5)
        yield f"Page content from {url}"

async def main() -> None:
    async for num in async_range(5):
        print(num)

    urls = ["url1", "url2", "url3"]
    async for page in fetch_pages(urls):
        print(page)

asyncio.run(main())

异步队列

使用Queue的生产者-消费者模式:

import asyncio
from asyncio import Queue

async def producer(queue: Queue[int], n: int) -> None:
    for i in range(n):
        await asyncio.sleep(0.1)
        await queue.put(i)
        print(f"Produced {i}")
    await queue.put(None)  # 哨兵值

async def consumer(queue: Queue[int], name: str) -> None:
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        await asyncio.sleep(0.2)
        print(f"Consumer {name} processed {item}")
        queue.task_done()

async def main() -> None:
    queue: Queue[int] = Queue(maxsize=5)

    # 启动生产者和消费者
    prod = asyncio.create_task(producer(queue, 10))
    cons1 = asyncio.create_task(consumer(queue, "A"))
    cons2 = asyncio.create_task(consumer(queue, "B"))

    await prod
    await queue.join()  # 等待所有任务处理完成

    # 向消费者发送退出信号
    await queue.put(None)
    await queue.put(None)

    await cons1
    await cons2

asyncio.run(main())

信号量和锁

限制并发操作:

import asyncio

async def fetch_with_limit(
    url: str,
    semaphore: asyncio.Semaphore
) -> str:
    async with semaphore:
        print(f"Fetching {url}")
        await asyncio.sleep(1)
        return f"Data from {url}"

async def main() -> None:
    # 限制为3个并发操作
    semaphore = asyncio.Semaphore(3)

    urls = [f"https://example.com/{i}" for i in range(10)]

    tasks = [
        fetch_with_limit(url, semaphore)
        for url in urls
    ]

    results = await asyncio.gather(*tasks)
    print(f"Fetched {len(results)} URLs")

asyncio.run(main())

使用锁进行互斥:

import asyncio

class Counter:
    def __init__(self) -> None:
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment(self) -> None:
        async with self.lock:
            # 临界区
            current = self.value
            await asyncio.sleep(0.01)
            self.value = current + 1

async def main() -> None:
    counter = Counter()

    # 并发运行递增
    await asyncio.gather(*[
        counter.increment()
        for _ in range(100)
    ])

    print(f"Final value: {counter.value}")  # 应为100

asyncio.run(main())

超时和取消

使用超时:

import asyncio

async def slow_operation() -> str:
    await asyncio.sleep(5)
    return "completed"

async def main() -> None:
    # 2秒后超时
    try:
        result = await asyncio.wait_for(
            slow_operation(),
            timeout=2.0
        )
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(main())

处理取消:

import asyncio

async def cancellable_task() -> None:
    try:
        while True:
            print("Working...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Task was cancelled")
        # 清理
        raise

async def main() -> None:
    task = asyncio.create_task(cancellable_task())

    await asyncio.sleep(3)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Task cancellation confirmed")

asyncio.run(main())

事件循环管理

直接事件循环控制:

import asyncio

async def task1() -> None:
    print("Task 1")

async def task2() -> None:
    print("Task 2")

# 创建新事件循环
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

try:
    # 调度回调
    loop.call_soon(lambda: print("Callback"))

    # 调度延迟回调
    loop.call_later(1.0, lambda: print("Delayed"))

    # 运行协程
    loop.run_until_complete(task1())

    # 运行多个任务
    loop.run_until_complete(
        asyncio.gather(task1(), task2())
    )
finally:
    loop.close()

concurrent.futures

ThreadPoolExecutor用于I/O-bound任务:

import asyncio
from concurrent.futures import ThreadPoolExecutor
import time

def blocking_io_task(n: int) -> int:
    print(f"Task {n} starting")
    time.sleep(2)  # 阻塞I/O
    return n * 2

async def main() -> None:
    loop = asyncio.get_event_loop()

    with ThreadPoolExecutor(max_workers=3) as executor:
        # 在线程池中运行阻塞函数
        tasks = [
            loop.run_in_executor(executor, blocking_io_task, i)
            for i in range(5)
        ]

        results = await asyncio.gather(*tasks)
        print(results)

asyncio.run(main())

ProcessPoolExecutor用于CPU-bound任务:

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive_task(n: int) -> int:
    # CPU密集型计算
    result = sum(i * i for i in range(n))
    return result

async def main() -> None:
    loop = asyncio.get_event_loop()

    with ProcessPoolExecutor(max_workers=4) as executor:
        # 在进程池中运行CPU-bound函数
        tasks = [
            loop.run_in_executor(
                executor,
                cpu_intensive_task,
                10_000_000
            )
            for _ in range(4)
        ]

        results = await asyncio.gather(*tasks)
        print(f"Completed {len(results)} tasks")

asyncio.run(main())

使用aiohttp的异步HTTP

发起异步HTTP请求:

import asyncio
import aiohttp

async def fetch_url(
    session: aiohttp.ClientSession,
    url: str
) -> str:
    async with session.get(url) as response:
        return await response.text()

async def main() -> None:
    async with aiohttp.ClientSession() as session:
        urls = [
            "https://example.com/1",
            "https://example.com/2",
            "https://example.com/3"
        ]

        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

        print(f"Fetched {len(results)} pages")

asyncio.run(main())

错误处理

在异步代码中处理异常:

import asyncio

async def failing_task() -> None:
    await asyncio.sleep(1)
    raise ValueError("Task failed")

async def main() -> None:
    # 处理单个任务中的异常
    try:
        await failing_task()
    except ValueError as e:
        print(f"Caught: {e}")

    # 使用gather处理异常
    results = await asyncio.gather(
        failing_task(),
        failing_task(),
        return_exceptions=True
    )

    for result in results:
        if isinstance(result, Exception):
            print(f"Task failed: {result}")

asyncio.run(main())

何时使用此技能

使用python-async-patterns当您需要:

  • 并发处理多个I/O操作(API调用、数据库查询)
  • 构建异步Web服务器或客户端
  • 异步处理数据流
  • 使用异步队列实现生产者-消费者模式
  • 运行阻塞I/O操作而不阻塞事件循环
  • 创建异步上下文管理器进行资源管理
  • 实现异步迭代器进行流数据
  • 使用信号量和锁控制并发
  • 处理异步操作中的超时和取消
  • 高效混合CPU-bound和I/O-bound操作

最佳实践

  • 使用asyncio.run()作为主入口点
  • 使用asyncio.create_task()创建任务以实现并发执行
  • 需要所有结果时使用gather()
  • 需要细粒度控制时使用wait()
  • 总是在长时间运行的任务中处理CancelledError
  • 使用信号量限制并发操作
  • 优先使用异步上下文管理器进行资源管理
  • 使用asyncio.Queue实现生产者-消费者模式
  • 使用run_in_executor()在线程池中运行阻塞I/O
  • 在进程池中运行CPU-bound任务
  • 为网络操作设置适当超时
  • 使用结构化并发模式(nurseries)

常见陷阱

  • 忘记等待协程(创建协程对象但不运行)
  • 使用CPU密集型工作阻塞事件循环
  • 未正确处理任务取消
  • 使用time.sleep()而不是asyncio.sleep()
  • 创建过多并发任务而无限制
  • 在异步上下文中未正确关闭资源
  • 错误地混合阻塞和异步代码
  • 未处理后台任务中的异常
  • 使用Queue时忘记调用task_done()
  • 使用全局事件循环而不是asyncio.run()

资源