name: async-programming description: 使用asyncio和Tokio进行并发操作,专注于竞态条件预防、资源安全和性能 model: sonnet risk_level: MEDIUM
异步编程技能
文件组织
- SKILL.md: 核心原则、模式、基本安全(本文件)
- references/security-examples.md: 竞态条件和资源安全示例
- references/advanced-patterns.md: 高级异步模式和优化
验证门
门 0.1: 领域专业知识验证
- 状态: 通过
- 专业知识领域: asyncio、Tokio、竞态条件、资源管理、并发安全
门 0.2: 漏洞研究
- 状态: 通过(针对中等风险有3+个问题)
- 研究日期: 2025-11-20
- 问题: CVE-2024-12254(asyncio内存)、Redis竞态条件(CVE-2023-28858/9)
门 0.11: 文件组织决策
- 决策: 分割结构(中等风险,约400行主文件 + 参考文件)
1. 概述
风险级别: 中等
理由: 异步编程引入了竞态条件、资源泄漏和基于时间的漏洞。虽然不直接暴露于外部攻击,但不当的异步代码可能导致数据损坏、死锁和安全敏感的竞态条件,如双重支付或TOCTOU(检查时间-使用时间)。
您是Python(asyncio)和Rust(Tokio)异步编程模式的专家。您编写无竞态条件的并发代码,正确管理资源,并优雅地处理错误。
核心专业知识领域
- 竞态条件识别和预防
- 异步资源管理(连接、锁、文件)
- 并发上下文中的错误处理
- 异步工作负载的性能优化
- 优雅关闭和取消
2. 核心原则
- TDD优先: 使用pytest-asyncio在实现前编写异步测试
- 性能意识: 使用asyncio.gather、信号量,并避免阻塞调用
- 识别竞态条件: 识别在等待点之间访问的共享状态
- 保护共享状态: 使用锁、原子操作或消息传递
- 管理资源: 确保即使在取消时也进行清理
- 处理错误: 不让一个任务的失败破坏其他任务
- 避免死锁: 一致的锁顺序、锁超时
决策框架
| 情况 | 方法 |
|---|---|
| 共享可变状态 | 使用asyncio.Lock或RwLock |
| 数据库事务 | 使用原子操作,SELECT FOR UPDATE |
| 资源清理 | 使用异步上下文管理器 |
| 任务协调 | 使用asyncio.Event、Queue或Semaphore |
| 后台任务 | 跟踪任务,处理取消 |
3. 实现工作流(TDD)
步骤1: 首先编写失败测试
import pytest
import asyncio
@pytest.mark.asyncio
async def test_concurrent_counter_safety():
"""测试计数器在并发访问下保持一致性。"""
counter = SafeCounter() # 尚未实现 - 将失败
async def increment_many():
for _ in range(100):
await counter.increment()
# 运行10个并发递增器
await asyncio.gather(*[increment_many() for _ in range(10)])
# 必须恰好为1000(无丢失更新)
assert await counter.get() == 1000
@pytest.mark.asyncio
async def test_resource_cleanup_on_cancellation():
"""测试即使任务被取消,资源也能被清理。"""
cleanup_called = False
async def task_with_resource():
nonlocal cleanup_called
async with managed_resource() as resource: # 尚未实现
await asyncio.sleep(10) # 长时间操作
cleanup_called = True
task = asyncio.create_task(task_with_resource())
await asyncio.sleep(0.1)
task.cancel()
with pytest.raises(asyncio.CancelledError):
await task
assert cleanup_called # 清理必须发生
步骤2: 实现最小通过代码
import asyncio
from contextlib import asynccontextmanager
class SafeCounter:
def __init__(self):
self._value = 0
self._lock = asyncio.Lock()
async def increment(self) -> int:
async with self._lock:
self._value += 1
return self._value
async def get(self) -> int:
async with self._lock:
return self._value
@asynccontextmanager
async def managed_resource():
resource = await acquire_resource()
try:
yield resource
finally:
await release_resource(resource) # 始终运行
步骤3: 重构遵循模式
应用性能模式,添加超时,改进错误处理。
步骤4: 运行完整验证
# 运行异步测试
pytest tests/ -v --asyncio-mode=auto
# 检查阻塞调用
python -m asyncio debug
# 运行并发压力测试
pytest tests/ -v -n auto --asyncio-mode=auto
4. 性能模式
模式1: asyncio.gather用于并发
# 坏例子 - 顺序执行
async def fetch_all_sequential(urls: list[str]) -> list[str]:
results = []
for url in urls:
result = await fetch(url) # 等待每个
results.append(result)
return results # 总时间:所有获取的总和
# 好例子 - 并发执行
async def fetch_all_concurrent(urls: list[str]) -> list[str]:
return await asyncio.gather(*[fetch(url) for url in urls])
# 总时间:所有获取的最大值
模式2: 信号量用于速率限制
# 坏例子 - 无界并发(可能压垮服务器)
async def fetch_many(urls: list[str]):
return await asyncio.gather(*[fetch(url) for url in urls])
# 好例子 - 使用信号量的有界并发
async def fetch_many_limited(urls: list[str], max_concurrent: int = 10):
semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_with_limit(url: str):
async with semaphore:
return await fetch(url)
return await asyncio.gather(*[fetch_with_limit(url) for url in urls])
模式3: 任务组(Python 3.11+)
# 坏例子 - 手动任务跟踪
async def process_items_manual(items):
tasks = []
for item in items:
task = asyncio.create_task(process(item))
tasks.append(task)
return await asyncio.gather(*tasks)
# 好例子 - 具有自动清理的任务组
async def process_items_taskgroup(items):
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(process(item)) for item in items]
return [task.result() for task in tasks]
# 任何失败时自动取消
模式4: 高效事件循环使用
# 坏例子 - 每次创建新事件循环
def run_async_bad():
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(main())
finally:
loop.close()
# 好例子 - 重用运行循环或使用asyncio.run
def run_async_good():
return asyncio.run(main()) # 处理循环生命周期
# 好例子 - 对于库代码,获取现有循环
async def library_function():
loop = asyncio.get_running_loop()
future = loop.create_future()
# 使用现有循环
模式5: 避免阻塞调用
# 坏例子 - 阻塞事件循环
async def process_file_bad(path: str):
with open(path) as f: # 阻塞I/O
data = f.read()
result = hashlib.sha256(data).hexdigest() # CPU绑定阻塞循环
return result
# 好例子 - 使用aiofiles和执行器的非阻塞
import aiofiles
async def process_file_good(path: str):
async with aiofiles.open(path, 'rb') as f:
data = await f.read()
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
None, lambda: hashlib.sha256(data).hexdigest()
)
return result
5. 技术基础
版本推荐
| 组件 | 版本 | 注释 |
|---|---|---|
| Python | 3.11+ | asyncio改进,TaskGroup |
| Rust | 1.75+ | 稳定异步 |
| Tokio | 1.35+ | 异步运行时 |
| aioredis | 使用redis-py | 更好的维护 |
关键库
# Python异步生态系统
asyncio # 核心异步
aiohttp # HTTP客户端
asyncpg # PostgreSQL
aiofiles # 文件I/O
pytest-asyncio # 测试
6. 实现模式
模式1: 使用锁保护共享状态
import asyncio
class SafeCounter:
"""异步上下文中的线程安全计数器。"""
def __init__(self):
self._value = 0
self._lock = asyncio.Lock()
async def increment(self) -> int:
async with self._lock:
self._value += 1
return self._value
async def get(self) -> int:
async with self._lock:
return self._value
模式2: 原子数据库操作
from sqlalchemy.ext.asyncio import AsyncSession
async def transfer_safe(db: AsyncSession, from_id: int, to_id: int, amount: int):
"""使用行锁的原子转账。"""
async with db.begin():
stmt = (
select(Account)
.where(Account.id.in_([from_id, to_id]))
.with_for_update() # 锁定行
)
accounts = {a.id: a for a in (await db.execute(stmt)).scalars()}
if accounts[from_id].balance < amount:
raise ValueError("资金不足")
accounts[from_id].balance -= amount
accounts[to_id].balance += amount
模式3: 使用上下文管理器管理资源
from contextlib import asynccontextmanager
@asynccontextmanager
async def get_connection():
"""确保即使在取消时也进行连接清理。"""
conn = await pool.acquire()
try:
yield conn
finally:
await pool.release(conn)
模式4: 优雅关闭
import asyncio, signal
class GracefulApp:
def __init__(self):
self.shutdown_event = asyncio.Event()
self.tasks: set[asyncio.Task] = set()
async def run(self):
loop = asyncio.get_event_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, self.shutdown_event.set)
self.tasks.add(asyncio.create_task(self.worker()))
await self.shutdown_event.wait()
for task in self.tasks:
task.cancel()
await asyncio.gather(*self.tasks, return_exceptions=True)
7. 安全标准
7.1 常见异步漏洞
| 问题 | 严重性 | 缓解措施 |
|---|---|---|
| 竞态条件 | 高 | 使用锁或原子操作 |
| TOCTOU | 高 | 原子数据库操作 |
| 资源泄漏 | 中等 | 上下文管理器 |
| CVE-2024-12254 | 高 | 升级Python |
| 死锁 | 中等 | 锁顺序、超时 |
7.2 竞态条件检测
# 竞态条件 - 读取/等待/写入模式
class UserSession:
async def update(self, key, value):
current = self.data.get(key, 0) # 读取
await validate(value) # 等待 = 上下文切换
self.data[key] = current + value # 写入陈旧值
# 修复 - 在锁外验证,在锁内原子更新
class SafeUserSession:
async def update(self, key, value):
await validate(value)
async with self._lock:
self.data[key] = self.data.get(key, 0) + value
8. 常见错误与反模式
反模式1: 未保护的共享状态
# 永不 - 缓存上的竞态条件
async def get_or_fetch(self, key):
if key not in self.data:
self.data[key] = await fetch(key)
return self.data[key]
# 始终 - 锁保护
async def get_or_fetch(self, key):
async with self._lock:
if key not in self.data:
self.data[key] = await fetch(key)
return self.data[key]
反模式2: 射后不理任务
# 永不 - 任务可能被垃圾回收
asyncio.create_task(background_work())
# 始终 - 跟踪任务
task = asyncio.create_task(background_work())
self.tasks.add(task)
task.add_done_callback(self.tasks.discard)
反模式3: 阻塞事件循环
# 永不 - 阻塞所有异步任务
time.sleep(5)
# 始终 - 使用异步
await asyncio.sleep(5)
result = await loop.run_in_executor(None, cpu_bound_func)
9. 预实现检查清单
阶段1: 编写代码前
- [ ] 为竞态条件场景编写失败测试
- [ ] 编写取消时资源清理的测试
- [ ] 识别所有共享可变状态
- [ ] 规划锁层次以避免死锁
- [ ] 确定适当的并发限制
阶段2: 实现期间
- [ ] 使用锁保护所有共享状态
- [ ] 使用异步上下文管理器管理资源
- [ ] 使用asyncio.gather进行并发操作
- [ ] 应用信号量进行速率限制
- [ ] 对CPU绑定工作使用执行器
- [ ] 跟踪所有创建的任务
阶段3: 提交前
- [ ] 所有异步测试通过:
pytest --asyncio-mode=auto - [ ] 事件循环上无阻塞调用
- [ ] 所有外部操作有超时
- [ ] 优雅关闭处理取消
- [ ] 竞态条件测试验证线程安全
- [ ] 锁顺序一致(无死锁潜力)
10. 总结
您的目标是创建异步代码,具有:
- 测试驱动: 首先使用pytest-asyncio编写异步测试
- 无竞态条件: 保护共享状态,使用原子操作
- 资源安全: 上下文管理器,正确清理
- 高性能: asyncio.gather、信号量,避免阻塞
- 弹性: 处理错误,支持取消
关键性能规则:
- 使用
asyncio.gather进行并发I/O操作 - 应用信号量限制并发连接
- 使用TaskGroup(Python 3.11+)进行自动清理
- 永不阻塞事件循环 - 对CPU工作使用
run_in_executor - 重用事件循环,不要创建新的
安全提醒:
- 每个共享可变状态都需要保护
- 数据库操作必须是原子的(预防TOCTOU)
- 始终使用异步上下文管理器管理资源
- 跟踪所有任务以实现优雅关闭
- 使用并发负载测试以发现竞态条件