Python资源管理Skill python-resource-management

Python资源管理技能专注于使用上下文管理器、清理模式和流处理技术来高效管理资源,如数据库连接、文件句柄和网络套接字,确保即使在异常情况下也能可靠释放。适用于后端开发、DevOps、架构设计等场景,提高代码的健壮性和可维护性。关键词:Python, 资源管理, 上下文管理器, 清理模式, 流处理, 异常处理, 数据库连接, 文件处理, 自动清理。

后端开发 0 次安装 0 次浏览 更新于 3/22/2026

name: python-resource-management description: Python资源管理,使用上下文管理器、清理模式和流处理。适用于管理连接、文件句柄、实现清理逻辑或构建带有累积状态的流式响应。

Python资源管理

使用上下文管理器确定性管理资源。像数据库连接、文件句柄和网络套接字这样的资源应该可靠地释放,即使发生异常。

何时使用此技能

  • 管理数据库连接和连接池
  • 处理文件句柄和I/O
  • 实现自定义上下文管理器
  • 构建带有状态的流式响应
  • 处理嵌套资源清理
  • 创建异步上下文管理器

核心概念

1. 上下文管理器

with语句确保资源自动释放,即使在异常情况下。

2. 协议方法

__enter__/__exit__用于同步资源管理,__aenter__/__aexit__用于异步资源管理。

3. 无条件清理

__exit__总是运行,无论是否发生异常。

4. 异常处理

__exit__返回True以抑制异常,返回False以传播异常。

快速开始

from contextlib import contextmanager

@contextmanager
def managed_resource():
    resource = acquire_resource()
    try:
        yield resource
    finally:
        resource.cleanup()

with managed_resource() as r:
    r.do_work()

基本模式

模式1:基于类的上下文管理器

为复杂资源实现上下文管理器协议。

class DatabaseConnection:
    """带有自动清理的数据库连接。"""

    def __init__(self, dsn: str) -> None:
        self._dsn = dsn
        self._conn: Connection | None = None

    def connect(self) -> None:
        """建立数据库连接。"""
        self._conn = psycopg.connect(self._dsn)

    def close(self) -> None:
        """如果连接打开则关闭。"""
        if self._conn is not None:
            self._conn.close()
            self._conn = None

    def __enter__(self) -> "DatabaseConnection":
        """进入上下文:连接并返回自身。"""
        self.connect()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """退出上下文:总是关闭连接。"""
        self.close()

# 使用上下文管理器(首选)
with DatabaseConnection(dsn) as db:
    result = db.execute(query)

# 需要时手动管理
db = DatabaseConnection(dsn)
db.connect()
try:
    result = db.execute(query)
finally:
    db.close()

模式2:异步上下文管理器

为异步资源实现异步协议。

class AsyncDatabasePool:
    """异步数据库连接池。"""

    def __init__(self, dsn: str, min_size: int = 1, max_size: int = 10) -> None:
        self._dsn = dsn
        self._min_size = min_size
        self._max_size = max_size
        self._pool: asyncpg.Pool | None = None

    async def __aenter__(self) -> "AsyncDatabasePool":
        """创建连接池。"""
        self._pool = await asyncpg.create_pool(
            self._dsn,
            min_size=self._min_size,
            max_size=self._max_size,
        )
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """关闭池中所有连接。"""
        if self._pool is not None:
            await self._pool.close()

    async def execute(self, query: str, *args) -> list[dict]:
        """使用池连接执行查询。"""
        async with self._pool.acquire() as conn:
            return await conn.fetch(query, *args)

# 使用
async with AsyncDatabasePool(dsn) as pool:
    users = await pool.execute("SELECT * FROM users WHERE active = $1", True)

模式3:使用@contextmanager装饰器

简化简单情况的上下文管理器。

from contextlib import contextmanager, asynccontextmanager
import time
import structlog

logger = structlog.get_logger()

@contextmanager
def timed_block(name: str):
    """计时代码块。"""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        logger.info(f"{name}完成", duration_seconds=round(elapsed, 3))

# 使用
with timed_block("data_processing"):
    process_large_dataset()

@asynccontextmanager
async def database_transaction(conn: AsyncConnection):
    """管理数据库事务。"""
    await conn.execute("BEGIN")
    try:
        yield conn
        await conn.execute("COMMIT")
    except Exception:
        await conn.execute("ROLLBACK")
        raise

# 使用
async with database_transaction(conn) as tx:
    await tx.execute("INSERT INTO users ...")
    await tx.execute("INSERT INTO audit_log ...")

模式4:无条件资源释放

__exit__中总是清理资源,无论异常。

class FileProcessor:
    """处理文件并保证清理。"""

    def __init__(self, path: str) -> None:
        self._path = path
        self._file: IO | None = None
        self._temp_files: list[Path] = []

    def __enter__(self) -> "FileProcessor":
        self._file = open(self._path, "r")
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """无条件清理所有资源。"""
        # 关闭主文件
        if self._file is not None:
            self._file.close()

        # 清理任何临时文件
        for temp_file in self._temp_files:
            try:
                temp_file.unlink()
            except OSError:
                pass  # 尽力清理

        # 返回None/False以传播任何异常

高级模式

模式5:选择性异常抑制

只抑制特定的、有记录的异常。

class StreamWriter:
    """优雅处理断管错误的写入器。"""

    def __init__(self, stream) -> None:
        self._stream = stream

    def __enter__(self) -> "StreamWriter":
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        """清理,在关闭时抑制BrokenPipeError。"""
        self._stream.close()

        # 抑制BrokenPipeError(客户端断开连接)
        # 这是预期行为,不是错误
        if exc_type is BrokenPipeError:
            return True  # 异常被抑制

        return False  # 传播所有其他异常

模式6:带有累积状态的流处理

在流处理期间维护增量块和累积状态。

from collections.abc import Generator
from dataclasses import dataclass, field

@dataclass
class StreamingResult:
    """累积的流结果。"""

    chunks: list[str] = field(default_factory=list)
    _finalized: bool = False

    @property
    def content(self) -> str:
        """获取累积内容。"""
        return "".join(self.chunks)

    def add_chunk(self, chunk: str) -> None:
        """添加块到累加器。"""
        if self._finalized:
            raise RuntimeError("不能向已最终化的结果添加")
        self.chunks.append(chunk)

    def finalize(self) -> str:
        """标记流完成并返回内容。"""
        self._finalized = True
        return self.content

def stream_with_accumulation(
    response: StreamingResponse,
) -> Generator[tuple[str, str], None, str]:
    """流式响应同时累积内容。

    生成:
        每个块的(累积内容, 新块)元组。

    返回:
        最终累积内容。
    """
    result = StreamingResult()

    for chunk in response.iter_content():
        result.add_chunk(chunk)
        yield result.content, chunk

    return result.finalize()

模式7:高效字符串累积

避免累积时的O(n²)字符串连接。

def accumulate_stream(stream) -> str:
    """高效累积流内容。"""
    # 不好:由于字符串不可变性导致O(n²)
    # content = ""
    # for chunk in stream:
    #     content += chunk  # 每次创建新字符串

    # 好:使用列表和join实现O(n)
    chunks: list[str] = []
    for chunk in stream:
        chunks.append(chunk)
    return "".join(chunks)  # 单次分配

模式8:跟踪流指标

测量首字节时间和总流时间。

import time
from collections.abc import Generator

def stream_with_metrics(
    response: StreamingResponse,
) -> Generator[str, None, dict]:
    """流式响应同时收集指标。

    生成:
        内容块。

    返回:
        指标字典。
    """
    start = time.perf_counter()
    first_chunk_time: float | None = None
    chunk_count = 0
    total_bytes = 0

    for chunk in response.iter_content():
        if first_chunk_time is None:
            first_chunk_time = time.perf_counter() - start

        chunk_count += 1
        total_bytes += len(chunk.encode())
        yield chunk

    total_time = time.perf_counter() - start

    return {
        "time_to_first_byte_ms": round((first_chunk_time or 0) * 1000, 2),
        "total_time_ms": round(total_time * 1000, 2),
        "chunk_count": chunk_count,
        "total_bytes": total_bytes,
    }

模式9:使用ExitStack管理多个资源

干净地处理动态数量的资源。

from contextlib import ExitStack, AsyncExitStack
from pathlib import Path

def process_files(paths: list[Path]) -> list[str]:
    """处理多个文件并自动清理。"""
    results = []

    with ExitStack() as stack:
        # 打开所有文件 - 当块退出时它们都会被关闭
        files = [stack.enter_context(open(p)) for p in paths]

        for f in files:
            results.append(f.read())

    return results

async def process_connections(hosts: list[str]) -> list[dict]:
    """处理多个异步连接。"""
    results = []

    async with AsyncExitStack() as stack:
        connections = [
            await stack.enter_async_context(connect_to_host(host))
            for host in hosts
        ]

        for conn in connections:
            results.append(await conn.fetch_data())

    return results

最佳实践总结

  1. 总是使用上下文管理器 - 对于任何需要清理的资源
  2. 无条件清理 - __exit__即使在异常时也运行
  3. 不要意外抑制 - 除非有意抑制,否则返回False
  4. 使用@contextmanager - 对于简单资源模式
  5. 实现两种协议 - 支持with和手动管理
  6. 使用ExitStack - 对于动态数量的资源
  7. 高效累积 - 使用列表+join,而不是字符串连接
  8. 跟踪指标 - 首字节时间对流处理很重要
  9. 记录行为 - 特别是异常抑制
  10. 测试清理路径 - 验证资源在错误时被释放