Python异步编程模式精通Skill async-python-patterns

此技能专注于Python的异步编程技术,包括asyncio、并发模式和async/await,用于开发高性能、非阻塞的后端应用、Web API和I/O密集型系统。关键词:Python异步编程、asyncio、并发、高性能应用、非阻塞操作、异步API、并发系统、I/O绑定、Web爬虫、实时应用。

后端开发 0 次安装 2 次浏览 更新于 3/16/2026

name: 异步-python-模式 description: 掌握Python asyncio、并发编程和async/await模式,用于高性能应用程序。在构建异步API、并发系统或需要非阻塞操作的I/O绑定应用程序时使用。

异步Python模式

使用asyncio、并发编程模式和async/await实现异步Python应用程序的综合指南,用于构建高性能、非阻塞系统。

何时使用此技能

  • 构建异步Web API(FastAPI、aiohttp、Sanic)
  • 实现并发I/O操作(数据库、文件、网络)
  • 创建具有并发请求的网络爬虫
  • 开发实时应用程序(WebSocket服务器、聊天系统)
  • 同时处理多个独立任务
  • 构建具有异步通信的微服务
  • 优化I/O绑定工作负载
  • 实现异步后台任务和队列

快速开始

import asyncio

async def main():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# Python 3.7+
asyncio.run(main())

基础Async/Await

async def fetch_data(url: str) -> dict:
    """从URL异步获取数据。"""
    await asyncio.sleep(1)  # 模拟I/O
    return {"url": url, "data": "result"}

async def main():
    result = await fetch_data("https://api.example.com")
    print(result)

asyncio.run(main())

并发执行

async def fetch_all_users(user_ids: List[int]) -> List[dict]:
    """并发获取多个用户。"""
    tasks = [fetch_user(uid) for uid in user_ids]
    results = await asyncio.gather(*tasks)
    return results

核心概念

1. 事件循环

事件循环是asyncio的核心,管理和调度异步任务。

关键特性:

  • 单线程协作多任务
  • 调度协程执行
  • 处理I/O操作而不阻塞
  • 管理回调和future

2. 协程

使用async def定义的函数,可以暂停和恢复。

async def my_coroutine():
    result = await some_async_operation()
    return result

3. 任务

在事件循环上并发运行的调度协程。

task = asyncio.create_task(background_task())
result = await task

4. 异步上下文管理器

支持async with以进行适当清理的资源。

async with AsyncDatabaseConnection("postgresql://localhost") as conn:
    result = await conn.query("SELECT * FROM users")

5. 异步迭代器

支持async for以迭代异步数据源的对象。

async for item in async_range(1, 10):
    process(item)

参见详细解释: 核心概念

基本模式

模式1: 任务创建和管理

有效创建和管理并发任务。

task1 = asyncio.create_task(background_task("Task 1", 2))
task2 = asyncio.create_task(background_task("Task 2", 1))

result1 = await task1
result2 = await task2

参见详细模式: 基本模式

模式2: 错误处理

通过适当的异常处理来处理异步代码中的错误。

async def safe_operation(item_id: int):
    try:
        return await risky_operation(item_id)
    except ValueError as e:
        print(f"Error: {e}")
        return None

results = await asyncio.gather(*tasks, return_exceptions=True)

参见详细模式: 错误处理

模式3: 超时处理

执行具有超时的操作以防止挂起。

try:
    result = await asyncio.wait_for(slow_operation(5), timeout=2.0)
except asyncio.TimeoutError:
    print("Operation timed out")

参见详细模式: 超时和取消

高级模式

异步上下文管理器

实现异步上下文管理器以进行资源管理。

class AsyncDatabaseConnection:
    async def __aenter__(self):
        # 设置逻辑
        return connection

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # 清理逻辑
        pass

参见详细实现: 高级模式

生产者-消费者模式

实现异步队列以进行生产者-消费者工作流。

queue = asyncio.Queue(maxsize=10)

async def producer(queue):
    for item in items:
        await queue.put(item)

async def consumer(queue):
    while True:
        item = await queue.get()
        process(item)
        queue.task_done()

参见详细模式: 并发模式

使用信号量进行速率限制

使用信号量控制并发访问。

semaphore = asyncio.Semaphore(5)

async def api_call(url, semaphore):
    async with semaphore:
        return await fetch(url)

参见详细实现: 同步

实际应用

使用aiohttp进行网络爬取

async def scrape_urls(urls: List[str]):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

参见详细示例: 实际应用

异步数据库操作

async def get_user_data(db, user_id):
    user, orders, profile = await asyncio.gather(
        db.fetch_user(user_id),
        db.fetch_orders(user_id),
        db.fetch_profile(user_id)
    )
    return {"user": user, "orders": orders, "profile": profile}

参见详细模式: 实际应用

性能最佳实践

  1. 使用连接池: 高效重用连接
  2. 批量操作: 批量处理项目
  3. 避免阻塞: 在executor中运行CPU绑定工作
  4. 使用信号量: 限制并发操作
  5. 处理取消: 适当清理资源

参见详细指南: 性能最佳实践

常见陷阱

  1. 忘记await: 返回协程对象,不执行
  2. 阻塞事件循环: 使用time.sleep()而不是asyncio.sleep()
  3. 不处理取消: 任务取消而无清理
  4. 混合同步和异步: 从同步代码调用异步不正确

参见详细解决方案: 常见陷阱

测试异步代码

import pytest

@pytest.mark.asyncio
async def test_async_function():
    result = await fetch_data("https://api.example.com")
    assert result is not None

@pytest.mark.asyncio
async def test_with_timeout():
    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(slow_operation(5), timeout=1.0)

参见详细测试指南: 测试

最佳实践总结

  1. 使用asyncio.run()作为入口点(Python 3.7+)
  2. 始终await协程以执行它们
  3. 使用gather()进行并发执行
  4. 实现适当的错误处理
  5. 使用超时以防止挂起操作
  6. 池化连接以获得更好性能
  7. 避免在异步代码中使用阻塞操作
  8. 使用信号量进行速率限制
  9. 正确处理任务取消
  10. 使用pytest-asyncio测试异步代码

资源