Python 异步测试
问题陈述
异步测试需要特定的模式。pytest-asyncio 有不同的模式会影响行为。数据库测试需要隔离。异步函数的模拟与同步不同。如果处理不当,测试可能会不稳定或无法捕获错误。
模式:pytest-asyncio 配置
问题: Pytest 需要异步测试的配置。
# pyproject.toml
[tool.pytest.ini_options]
asyncio_mode = "strict" # 需要明确的 @pytest.mark.asyncio
# 或
asyncio_mode = "auto" # 所有异步测试自动运行
import pytest
# 使用 asyncio_mode = "strict"(此代码库)
@pytest.mark.asyncio
async def test_something():
result = await some_async_function()
assert result == expected
# 没有标记 = 测试不会作为异步运行!
模式:异步 Fixtures
问题: 提供异步资源的 fixtures 需要特殊处理。
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
# ✅ 正确:会话的异步 fixture
@pytest.fixture
async def session() -> AsyncGenerator[AsyncSession, None]:
async with async_session() as session:
yield session
await session.rollback() # 测试后清理
# ✅ 正确:测试数据的异步 fixture
@pytest.fixture
async def test_user(session: AsyncSession) -> User:
user = User(email="test@example.com", hashed_password="...")
session.add(user)
await session.commit()
await session.refresh(user)
return user
# ✅ 正确:使用异步 fixtures
@pytest.mark.asyncio
async def test_get_user(session: AsyncSession, test_user: User):
result = await session.execute(
select(User).where(User.id == test_user.id)
)
user = result.scalar_one()
assert user.email == "test@example.com"
模式:数据库测试隔离
问题: 测试互相污染数据库状态。
# ✅ 正确:每个测试的事务回滚
@pytest.fixture
async def session() -> AsyncGenerator[AsyncSession, None]:
async with async_session() as session:
# 开始事务
async with session.begin():
yield session
# 退出时自动回滚
# ✅ 正确:复杂测试的嵌套事务
@pytest.fixture
async def session() -> AsyncGenerator[AsyncSession, None]:
async with async_session() as session:
await session.begin()
yield session
await session.rollback()
# 替代方案:使用单独的测试数据库
# conftest.py
@pytest.fixture(scope="session")
def event_loop():
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest.fixture(scope="session")
async def test_engine():
# 测试使用 SQLite,生产使用 PostgreSQL
engine = create_async_engine("sqlite+aiosqlite:///:memory:")
async with engine.begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
yield engine
await engine.dispose()
模式:模拟异步函数
问题: 常规的 Mock 不适用于异步函数。
from unittest.mock import AsyncMock, patch
# ✅ 正确:异步函数的 AsyncMock
@pytest.mark.asyncio
async def test_with_mocked_service():
mock_service = AsyncMock()
mock_service.get_user.return_value = User(id=uuid4(), email="test@example.com")
result = await mock_service.get_user(user_id)
assert result.email == "test@example.com"
mock_service.get_user.assert_called_once_with(user_id)
# ✅ 正确:异步函数的 Patching
@pytest.mark.asyncio
@patch("app.services.user_service.send_email", new_callable=AsyncMock)
async def test_user_creation_sends_email(mock_send_email: AsyncMock, session: AsyncSession):
mock_send_email.return_value = True
user = await create_user(email="new@example.com", session=session)
mock_send_email.assert_called_once_with(user.email, "Welcome!")
# ✅ 正确:AsyncMock 带 side_effect
mock_service = AsyncMock()
mock_service.get_user.side_effect = [
User(id=uuid4(), email="first@example.com"),
User(id=uuid4(), email="second@example.com"),
]
# 第一次调用返回第一个用户,第二次调用返回第二个用户
模式:HTTP 客户端测试
问题: 使用异步客户端测试 FastAPI 端点。
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app
@pytest.fixture
async def client() -> AsyncGenerator[AsyncClient, None]:
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as client:
yield client
@pytest.mark.asyncio
async def test_get_users(client: AsyncClient):
response = await client.get("/api/users")
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
@pytest.mark.asyncio
async def test_create_assessment(client: AsyncClient, auth_headers: dict):
response = await client.post(
"/api/assessments",
json={"title": "Test Assessment", "skill_areas": ["fundamentals"]},
headers=auth_headers,
)
assert response.status_code == 201
data = response.json()
assert data["title"] == "Test Assessment"
# ✅ 正确:认证 fixture
@pytest.fixture
async def auth_headers(test_user: User) -> dict:
token = create_access_token(user_id=test_user.id)
return {"Authorization": f"Bearer {token}"}
模式:测试服务函数
@pytest.mark.asyncio
async def test_calculate_rating(session: AsyncSession, test_user: User):
# 安排:创建测试数据
assessment = Assessment(user_id=test_user.id, title="Test")
session.add(assessment)
await session.commit()
answers = [
UserAnswer(user_id=test_user.id, question_id=q_id, value=4)
for q_id in question_ids
]
session.add_all(answers)
await session.commit()
# 行动:调用服务
result = await calculate_rating(assessment.id, session)
# 断言:检查结果
assert result.rating >= 1.0
assert result.rating <= 5.5
assert result.confidence > 0
@pytest.mark.asyncio
async def test_calculate_rating_no_answers(session: AsyncSession, test_user: User):
assessment = Assessment(user_id=test_user.id, title="Empty")
session.add(assessment)
await session.commit()
# 应该引发或返回特定结果
with pytest.raises(ValueError, match="No answers found"):
await calculate_rating(assessment.id, session)
模式:测试多步骤流程
与前端相同的原则 - 测试整个流程,而不仅仅是单元:
@pytest.mark.asyncio
async def test_complete_assessment_flow(session: AsyncSession, test_user: User):
"""测试完整的评估流程:创建 -> 回答 -> 提交 -> 结果。"""
# 第一步:创建评估
assessment = await create_assessment(
user_id=test_user.id,
data=AssessmentCreate(title="Full Flow Test", skill_areas=["fundamentals"]),
session=session,
)
assert assessment.id is not None
# 第二步:回答问题
questions = await get_assessment_questions(assessment.id, session)
for question in questions:
await submit_answer(
user_id=test_user.id,
question_id=question.id,
value=4,
session=session,
)
# 第三步:提交评估
result = await submit_assessment(assessment.id, session)
assert result.status == "completed"
# 第四步:验证结果
rating = await get_assessment_rating(assessment.id, session)
assert rating is not None
assert rating.skill_area == "fundamentals"
模式:Fixture 范围
问题: 理解何时重新创建 fixtures。
# function(默认) - 每个测试重新创建
@pytest.fixture
async def session():
... # 每个测试一个新会话
# class - 在测试类中共享
@pytest.fixture(scope="class")
async def shared_data():
... # 每个测试类创建一次
# module - 在测试文件中共享
@pytest.fixture(scope="module")
async def module_setup():
... # 每个文件创建一次
# session - 在整个测试运行中共享
@pytest.fixture(scope="session")
async def database():
... # 创建一次,所有测试使用
最佳实践:
- 数据库会话:
function范围(隔离) - 测试数据:
function范围(清洁状态) - 数据库引擎:
session范围(创建成本高)
模式:测试错误案例
@pytest.mark.asyncio
async def test_get_nonexistent_user(session: AsyncSession):
fake_id = uuid4()
with pytest.raises(HTTPException) as exc_info:
await get_user_or_404(fake_id, session)
assert exc_info.value.status_code == 404
assert str(fake_id) in exc_info.value.detail
@pytest.mark.asyncio
async def test_duplicate_email_rejected(session: AsyncSession, test_user: User):
with pytest.raises(IntegrityError):
duplicate = User(email=test_user.email, hashed_password="...")
session.add(duplicate)
await session.commit()
模式:参数化测试
@pytest.mark.asyncio
@pytest.mark.parametrize("skill_area,expected_min,expected_max", [
("fundamentals", 1.0, 5.5),
("advanced", 1.0, 5.5),
("strategy", 1.0, 5.5),
])
async def test_rating_ranges(
skill_area: str,
expected_min: float,
expected_max: float,
session: AsyncSession,
):
rating = await calculate_rating_for_area(skill_area, session)
assert expected_min <= rating <= expected_max
@pytest.mark.asyncio
@pytest.mark.parametrize("invalid_input", [
{"title": ""}, # 空标题
{"title": "x" * 201}, # 太长
{"skill_areas": []}, # 空区域
])
async def test_assessment_validation(invalid_input: dict, client: AsyncClient):
response = await client.post("/api/assessments", json=invalid_input)
assert response.status_code == 422
常见问题
| 问题 | 可能的原因 | 解决方案 |
|---|---|---|
| “协程从未等待” | 测试中缺少 await |
添加 await |
| 测试不运行异步 | 缺少 @pytest.mark.asyncio |
添加标记或使用 asyncio_mode = "auto" |
| 测试互相污染 | 缺少回滚 | 使用带有回滚的事务 fixture |
| “事件循环已关闭” | fixture 范围不匹配 | 检查异步 fixtures 上的 scope |
| 模拟不工作 | 使用 Mock 而不是 AsyncMock |
对于异步使用 AsyncMock |
测试命令
# 运行所有测试
uv run pytest
# 详细输出
uv run pytest -v
# 特定文件
uv run pytest tests/test_assessments.py
# 特定测试
uv run pytest tests/test_assessments.py::test_create_assessment
# 带覆盖率
uv run pytest --cov=app --cov-report=html
# 首次失败即停止
uv run pytest -x
# 显示打印输出
uv run pytest -s