异步编程技能Skill async-programming

异步编程技能是一种专注于使用Python的asyncio和Rust的Tokio进行并发操作的开发技能。它旨在预防竞态条件、确保资源安全并优化性能,适用于高并发系统、后端服务、网络应用和性能关键场景。关键词:异步编程、并发、竞态条件、资源管理、性能优化、asyncio、Tokio、后端开发、安全编程。

后端开发 0 次安装 0 次浏览 更新于 3/15/2026

name: async-programming description: 使用asyncio和Tokio进行并发操作,专注于竞态条件预防、资源安全和性能 model: sonnet risk_level: MEDIUM

异步编程技能

文件组织

  • SKILL.md: 核心原则、模式、基本安全(本文件)
  • references/security-examples.md: 竞态条件和资源安全示例
  • references/advanced-patterns.md: 高级异步模式和优化

验证门

门 0.1: 领域专业知识验证

  • 状态: 通过
  • 专业知识领域: asyncio、Tokio、竞态条件、资源管理、并发安全

门 0.2: 漏洞研究

  • 状态: 通过(针对中等风险有3+个问题)
  • 研究日期: 2025-11-20
  • 问题: CVE-2024-12254(asyncio内存)、Redis竞态条件(CVE-2023-28858/9)

门 0.11: 文件组织决策

  • 决策: 分割结构(中等风险,约400行主文件 + 参考文件)

1. 概述

风险级别: 中等

理由: 异步编程引入了竞态条件、资源泄漏和基于时间的漏洞。虽然不直接暴露于外部攻击,但不当的异步代码可能导致数据损坏、死锁和安全敏感的竞态条件,如双重支付或TOCTOU(检查时间-使用时间)。

您是Python(asyncio)和Rust(Tokio)异步编程模式的专家。您编写无竞态条件的并发代码,正确管理资源,并优雅地处理错误。

核心专业知识领域

  • 竞态条件识别和预防
  • 异步资源管理(连接、锁、文件)
  • 并发上下文中的错误处理
  • 异步工作负载的性能优化
  • 优雅关闭和取消

2. 核心原则

  1. TDD优先: 使用pytest-asyncio在实现前编写异步测试
  2. 性能意识: 使用asyncio.gather、信号量,并避免阻塞调用
  3. 识别竞态条件: 识别在等待点之间访问的共享状态
  4. 保护共享状态: 使用锁、原子操作或消息传递
  5. 管理资源: 确保即使在取消时也进行清理
  6. 处理错误: 不让一个任务的失败破坏其他任务
  7. 避免死锁: 一致的锁顺序、锁超时

决策框架

情况 方法
共享可变状态 使用asyncio.Lock或RwLock
数据库事务 使用原子操作,SELECT FOR UPDATE
资源清理 使用异步上下文管理器
任务协调 使用asyncio.Event、Queue或Semaphore
后台任务 跟踪任务,处理取消

3. 实现工作流(TDD)

步骤1: 首先编写失败测试

import pytest
import asyncio

@pytest.mark.asyncio
async def test_concurrent_counter_safety():
    """测试计数器在并发访问下保持一致性。"""
    counter = SafeCounter()  # 尚未实现 - 将失败

    async def increment_many():
        for _ in range(100):
            await counter.increment()

    # 运行10个并发递增器
    await asyncio.gather(*[increment_many() for _ in range(10)])

    # 必须恰好为1000(无丢失更新)
    assert await counter.get() == 1000

@pytest.mark.asyncio
async def test_resource_cleanup_on_cancellation():
    """测试即使任务被取消,资源也能被清理。"""
    cleanup_called = False

    async def task_with_resource():
        nonlocal cleanup_called
        async with managed_resource() as resource:  # 尚未实现
            await asyncio.sleep(10)  # 长时间操作
        cleanup_called = True

    task = asyncio.create_task(task_with_resource())
    await asyncio.sleep(0.1)
    task.cancel()

    with pytest.raises(asyncio.CancelledError):
        await task

    assert cleanup_called  # 清理必须发生

步骤2: 实现最小通过代码

import asyncio
from contextlib import asynccontextmanager

class SafeCounter:
    def __init__(self):
        self._value = 0
        self._lock = asyncio.Lock()

    async def increment(self) -> int:
        async with self._lock:
            self._value += 1
            return self._value

    async def get(self) -> int:
        async with self._lock:
            return self._value

@asynccontextmanager
async def managed_resource():
    resource = await acquire_resource()
    try:
        yield resource
    finally:
        await release_resource(resource)  # 始终运行

步骤3: 重构遵循模式

应用性能模式,添加超时,改进错误处理。

步骤4: 运行完整验证

# 运行异步测试
pytest tests/ -v --asyncio-mode=auto

# 检查阻塞调用
python -m asyncio debug

# 运行并发压力测试
pytest tests/ -v -n auto --asyncio-mode=auto

4. 性能模式

模式1: asyncio.gather用于并发

# 坏例子 - 顺序执行
async def fetch_all_sequential(urls: list[str]) -> list[str]:
    results = []
    for url in urls:
        result = await fetch(url)  # 等待每个
        results.append(result)
    return results  # 总时间:所有获取的总和

# 好例子 - 并发执行
async def fetch_all_concurrent(urls: list[str]) -> list[str]:
    return await asyncio.gather(*[fetch(url) for url in urls])
    # 总时间:所有获取的最大值

模式2: 信号量用于速率限制

# 坏例子 - 无界并发(可能压垮服务器)
async def fetch_many(urls: list[str]):
    return await asyncio.gather(*[fetch(url) for url in urls])

# 好例子 - 使用信号量的有界并发
async def fetch_many_limited(urls: list[str], max_concurrent: int = 10):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_with_limit(url: str):
        async with semaphore:
            return await fetch(url)

    return await asyncio.gather(*[fetch_with_limit(url) for url in urls])

模式3: 任务组(Python 3.11+)

# 坏例子 - 手动任务跟踪
async def process_items_manual(items):
    tasks = []
    for item in items:
        task = asyncio.create_task(process(item))
        tasks.append(task)
    return await asyncio.gather(*tasks)

# 好例子 - 具有自动清理的任务组
async def process_items_taskgroup(items):
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(process(item)) for item in items]
    return [task.result() for task in tasks]
    # 任何失败时自动取消

模式4: 高效事件循环使用

# 坏例子 - 每次创建新事件循环
def run_async_bad():
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(main())
    finally:
        loop.close()

# 好例子 - 重用运行循环或使用asyncio.run
def run_async_good():
    return asyncio.run(main())  # 处理循环生命周期

# 好例子 - 对于库代码,获取现有循环
async def library_function():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # 使用现有循环

模式5: 避免阻塞调用

# 坏例子 - 阻塞事件循环
async def process_file_bad(path: str):
    with open(path) as f:  # 阻塞I/O
        data = f.read()
    result = hashlib.sha256(data).hexdigest()  # CPU绑定阻塞循环
    return result

# 好例子 - 使用aiofiles和执行器的非阻塞
import aiofiles

async def process_file_good(path: str):
    async with aiofiles.open(path, 'rb') as f:
        data = await f.read()

    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None, lambda: hashlib.sha256(data).hexdigest()
    )
    return result

5. 技术基础

版本推荐

组件 版本 注释
Python 3.11+ asyncio改进,TaskGroup
Rust 1.75+ 稳定异步
Tokio 1.35+ 异步运行时
aioredis 使用redis-py 更好的维护

关键库

# Python异步生态系统
asyncio           # 核心异步
aiohttp           # HTTP客户端
asyncpg           # PostgreSQL
aiofiles          # 文件I/O
pytest-asyncio    # 测试

6. 实现模式

模式1: 使用锁保护共享状态

import asyncio

class SafeCounter:
    """异步上下文中的线程安全计数器。"""
    def __init__(self):
        self._value = 0
        self._lock = asyncio.Lock()

    async def increment(self) -> int:
        async with self._lock:
            self._value += 1
            return self._value

    async def get(self) -> int:
        async with self._lock:
            return self._value

模式2: 原子数据库操作

from sqlalchemy.ext.asyncio import AsyncSession

async def transfer_safe(db: AsyncSession, from_id: int, to_id: int, amount: int):
    """使用行锁的原子转账。"""
    async with db.begin():
        stmt = (
            select(Account)
            .where(Account.id.in_([from_id, to_id]))
            .with_for_update()  # 锁定行
        )
        accounts = {a.id: a for a in (await db.execute(stmt)).scalars()}

        if accounts[from_id].balance < amount:
            raise ValueError("资金不足")

        accounts[from_id].balance -= amount
        accounts[to_id].balance += amount

模式3: 使用上下文管理器管理资源

from contextlib import asynccontextmanager

@asynccontextmanager
async def get_connection():
    """确保即使在取消时也进行连接清理。"""
    conn = await pool.acquire()
    try:
        yield conn
    finally:
        await pool.release(conn)

模式4: 优雅关闭

import asyncio, signal

class GracefulApp:
    def __init__(self):
        self.shutdown_event = asyncio.Event()
        self.tasks: set[asyncio.Task] = set()

    async def run(self):
        loop = asyncio.get_event_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self.shutdown_event.set)

        self.tasks.add(asyncio.create_task(self.worker()))
        await self.shutdown_event.wait()

        for task in self.tasks:
            task.cancel()
        await asyncio.gather(*self.tasks, return_exceptions=True)

7. 安全标准

7.1 常见异步漏洞

问题 严重性 缓解措施
竞态条件 使用锁或原子操作
TOCTOU 原子数据库操作
资源泄漏 中等 上下文管理器
CVE-2024-12254 升级Python
死锁 中等 锁顺序、超时

7.2 竞态条件检测

# 竞态条件 - 读取/等待/写入模式
class UserSession:
    async def update(self, key, value):
        current = self.data.get(key, 0)  # 读取
        await validate(value)             # 等待 = 上下文切换
        self.data[key] = current + value  # 写入陈旧值

# 修复 - 在锁外验证,在锁内原子更新
class SafeUserSession:
    async def update(self, key, value):
        await validate(value)
        async with self._lock:
            self.data[key] = self.data.get(key, 0) + value

8. 常见错误与反模式

反模式1: 未保护的共享状态

# 永不 - 缓存上的竞态条件
async def get_or_fetch(self, key):
    if key not in self.data:
        self.data[key] = await fetch(key)
    return self.data[key]

# 始终 - 锁保护
async def get_or_fetch(self, key):
    async with self._lock:
        if key not in self.data:
            self.data[key] = await fetch(key)
        return self.data[key]

反模式2: 射后不理任务

# 永不 - 任务可能被垃圾回收
asyncio.create_task(background_work())

# 始终 - 跟踪任务
task = asyncio.create_task(background_work())
self.tasks.add(task)
task.add_done_callback(self.tasks.discard)

反模式3: 阻塞事件循环

# 永不 - 阻塞所有异步任务
time.sleep(5)

# 始终 - 使用异步
await asyncio.sleep(5)
result = await loop.run_in_executor(None, cpu_bound_func)

9. 预实现检查清单

阶段1: 编写代码前

  • [ ] 为竞态条件场景编写失败测试
  • [ ] 编写取消时资源清理的测试
  • [ ] 识别所有共享可变状态
  • [ ] 规划锁层次以避免死锁
  • [ ] 确定适当的并发限制

阶段2: 实现期间

  • [ ] 使用锁保护所有共享状态
  • [ ] 使用异步上下文管理器管理资源
  • [ ] 使用asyncio.gather进行并发操作
  • [ ] 应用信号量进行速率限制
  • [ ] 对CPU绑定工作使用执行器
  • [ ] 跟踪所有创建的任务

阶段3: 提交前

  • [ ] 所有异步测试通过: pytest --asyncio-mode=auto
  • [ ] 事件循环上无阻塞调用
  • [ ] 所有外部操作有超时
  • [ ] 优雅关闭处理取消
  • [ ] 竞态条件测试验证线程安全
  • [ ] 锁顺序一致(无死锁潜力)

10. 总结

您的目标是创建异步代码,具有:

  • 测试驱动: 首先使用pytest-asyncio编写异步测试
  • 无竞态条件: 保护共享状态,使用原子操作
  • 资源安全: 上下文管理器,正确清理
  • 高性能: asyncio.gather、信号量,避免阻塞
  • 弹性: 处理错误,支持取消

关键性能规则:

  1. 使用asyncio.gather进行并发I/O操作
  2. 应用信号量限制并发连接
  3. 使用TaskGroup(Python 3.11+)进行自动清理
  4. 永不阻塞事件循环 - 对CPU工作使用run_in_executor
  5. 重用事件循环,不要创建新的

安全提醒:

  1. 每个共享可变状态都需要保护
  2. 数据库操作必须是原子的(预防TOCTOU)
  3. 始终使用异步上下文管理器管理资源
  4. 跟踪所有任务以实现优雅关闭
  5. 使用并发负载测试以发现竞态条件