Python性能优化Skill python-performance-optimization

Python性能优化技能用于分析和优化Python代码的性能,包括CPU分析、内存优化、代码优化最佳实践等。关键词:Python性能优化、CPU分析、内存分析、代码优化、性能瓶颈、应用性能提升。

架构设计 0 次安装 0 次浏览 更新于 3/22/2026

name: Python性能优化 description: 使用cProfile、内存分析器和性能最佳实践来分析和优化Python代码。当调试慢速Python代码、优化瓶颈或提升应用性能时使用。

Python性能优化

全面指南,用于分析、分析和优化Python代码以提高性能,包括CPU分析、内存优化和实施最佳实践。

何时使用此技能

  • 识别Python应用程序中的性能瓶颈
  • 减少应用程序延迟和响应时间
  • 优化CPU密集型操作
  • 减少内存消耗和内存泄漏
  • 改善数据库查询性能
  • 优化I/O操作
  • 加速数据处理管道
  • 实施高性能算法
  • 分析生产应用程序

核心概念

1. 分析类型

  • CPU分析:识别耗时的函数
  • 内存分析:跟踪内存分配和泄漏
  • 行分析:以逐行粒度进行分析
  • 调用图:可视化函数调用关系

2. 性能指标

  • 执行时间:操作所需时间
  • 内存使用:峰值和平均内存消耗
  • CPU利用率:处理器使用模式
  • I/O等待:I/O操作花费的时间

3. 优化策略

  • 算法:更好的算法和数据结构
  • 实施:更高效的代码模式
  • 并行化:多线程/多处理
  • 缓存:避免冗余计算
  • 本地扩展:对关键路径使用C/Rust

快速开始

基本计时

import time

def measure_time():
    """简单计时测量。"""
    start = time.time()

    # 您的代码在这里
    result = sum(range(1000000))

    elapsed = time.time() - start
    print(f"执行时间: {elapsed:.4f} 秒")
    return result

# 更好:使用timeit进行准确测量
import timeit

execution_time = timeit.timeit(
    "sum(range(1000000))",
    number=100
)
print(f"平均时间: {execution_time/100:.6f} 秒")

分析工具

模式1: cProfile - CPU分析

import cProfile
import pstats
from pstats import SortKey

def slow_function():
    """要分析的函数。"""
    total = 0
    for i in range(1000000):
        total += i
    return total

def another_function():
    """另一个函数。"""
    return [i**2 for i in range(100000)]

def main():
    """要分析的主函数。"""
    result1 = slow_function()
    result2 = another_function()
    return result1, result2

# 分析代码
if __name__ == "__main__":
    profiler = cProfile.Profile()
    profiler.enable()

    main()

    profiler.disable()

    # 打印统计信息
    stats = pstats.Stats(profiler)
    stats.sort_stats(SortKey.CUMULATIVE)
    stats.print_stats(10)  # 前10个函数

    # 保存到文件以便后续分析
    stats.dump_stats("profile_output.prof")

命令行分析:

# 分析脚本
python -m cProfile -o output.prof script.py

# 查看结果
python -m pstats output.prof
# 在pstats中:
# sort cumtime
# stats 10

模式2: line_profiler - 逐行分析

# 安装: pip install line-profiler

# 添加@profile装饰器(line_profiler提供)
@profile
def process_data(data):
    """使用行分析处理数据。"""
    result = []
    for item in data:
        processed = item * 2
        result.append(processed)
    return result

# 运行:
# kernprof -l -v script.py

手动行分析:

from line_profiler import LineProfiler

def process_data(data):
    """要分析的函数。"""
    result = []
    for item in data:
        processed = item * 2
        result.append(processed)
    return result

if __name__ == "__main__":
    lp = LineProfiler()
    lp.add_function(process_data)

    data = list(range(100000))

    lp_wrapper = lp(process_data)
    lp_wrapper(data)

    lp.print_stats()

模式3: memory_profiler - 内存使用

# 安装: pip install memory-profiler

from memory_profiler import profile

@profile
def memory_intensive():
    """使用大量内存的函数。"""
    # 创建大列表
    big_list = [i for i in range(1000000)]

    # 创建大字典
    big_dict = {i: i**2 for i in range(100000)}

    # 处理数据
    result = sum(big_list)

    return result

if __name__ == "__main__":
    memory_intensive()

# 运行:
# python -m memory_profiler script.py

模式4: py-spy - 生产分析

# 安装: pip install py-spy

# 分析运行的Python进程
py-spy top --pid 12345

# 生成火焰图
py-spy record -o profile.svg --pid 12345

# 分析脚本
py-spy record -o profile.svg -- python script.py

# 转储当前调用栈
py-spy dump --pid 12345

优化模式

模式5: 列表推导与循环

import timeit

# 慢: 传统循环
def slow_squares(n):
    """使用循环创建平方列表。"""
    result = []
    for i in range(n):
        result.append(i**2)
    return result

# 快: 列表推导
def fast_squares(n):
    """使用推导创建平方列表。"""
    return [i**2 for i in range(n)]

# 基准测试
n = 100000

slow_time = timeit.timeit(lambda: slow_squares(n), number=100)
fast_time = timeit.timeit(lambda: fast_squares(n), number=100)

print(f"循环: {slow_time:.4f}s")
print(f"推导: {fast_time:.4f}s")
print(f"加速: {slow_time/fast_time:.2f}x")

# 对简单操作更快: map
def faster_squares(n):
    """使用map以获得更好性能。"""
    return list(map(lambda x: x**2, range(n)))

模式6: 生成器表达式用于内存

import sys

def list_approach():
    """内存密集型列表。"""
    data = [i**2 for i in range(1000000)]
    return sum(data)

def generator_approach():
    """内存高效生成器。"""
    data = (i**2 for i in range(1000000))
    return sum(data)

# 内存比较
list_data = [i for i in range(1000000)]
gen_data = (i for i in range(1000000))

print(f"列表大小: {sys.getsizeof(list_data)} 字节")
print(f"生成器大小: {sys.getsizeof(gen_data)} 字节")

# 生成器使用恒定内存,无论大小如何

模式7: 字符串连接

import timeit

def slow_concat(items):
    """慢字符串连接。"""
    result = ""
    for item in items:
        result += str(item)
    return result

def fast_concat(items):
    """使用join的快速字符串连接。"""
    return "".join(str(item) for item in items)

def faster_concat(items):
    """使用列表更快。"""
    parts = [str(item) for item in items]
    return "".join(parts)

items = list(range(10000))

# 基准测试
slow = timeit.timeit(lambda: slow_concat(items), number=100)
fast = timeit.timeit(lambda: fast_concat(items), number=100)
faster = timeit.timeit(lambda: faster_concat(items), number=100)

print(f"连接 (+): {slow:.4f}s")
print(f"Join (生成器): {fast:.4f}s")
print(f"Join (列表): {faster:.4f}s")

模式8: 字典查找与列表搜索

import timeit

# 创建测试数据
size = 10000
items = list(range(size))
lookup_dict = {i: i for i in range(size)}

def list_search(items, target):
    """O(n) 列表搜索。"""
    return target in items

def dict_search(lookup_dict, target):
    """O(1) 字典搜索。"""
    return target in lookup_dict

target = size - 1  # 列表最坏情况

# 基准测试
list_time = timeit.timeit(
    lambda: list_search(items, target),
    number=1000
)
dict_time = timeit.timeit(
    lambda: dict_search(lookup_dict, target),
    number=1000
)

print(f"列表搜索: {list_time:.6f}s")
print(f"字典搜索: {dict_time:.6f}s")
print(f"加速: {list_time/dict_time:.0f}x")

模式9: 局部变量访问

import timeit

# 全局变量(慢)
GLOBAL_VALUE = 100

def use_global():
    """访问全局变量。"""
    total = 0
    for i in range(10000):
        total += GLOBAL_VALUE
    return total

def use_local():
    """使用局部变量。"""
    local_value = 100
    total = 0
    for i in range(10000):
        total += local_value
    return total

# 局部更快
global_time = timeit.timeit(use_global, number=1000)
local_time = timeit.timeit(use_local, number=1000)

print(f"全局访问: {global_time:.4f}s")
print(f"局部访问: {local_time:.4f}s")
print(f"加速: {global_time/local_time:.2f}x")

模式10: 函数调用开销

import timeit

def calculate_inline():
    """内联计算。"""
    total = 0
    for i in range(10000):
        total += i * 2 + 1
    return total

def helper_function(x):
    """辅助函数。"""
    return x * 2 + 1

def calculate_with_function():
    """带函数调用的计算。"""
    total = 0
    for i in range(10000):
        total += helper_function(i)
    return total

# 内联更快,由于没有调用开销
inline_time = timeit.timeit(calculate_inline, number=1000)
function_time = timeit.timeit(calculate_with_function, number=1000)

print(f"内联: {inline_time:.4f}s")
print(f"函数调用: {function_time:.4f}s")

高级优化

模式11: NumPy用于数值操作

import timeit
import numpy as np

def python_sum(n):
    """使用纯Python求和。"""
    return sum(range(n))

def numpy_sum(n):
    """使用NumPy求和。"""
    return np.arange(n).sum()

n = 1000000

python_time = timeit.timeit(lambda: python_sum(n), number=100)
numpy_time = timeit.timeit(lambda: numpy_sum(n), number=100)

print(f"Python: {python_time:.4f}s")
print(f"NumPy: {numpy_time:.4f}s")
print(f"加速: {python_time/numpy_time:.2f}x")

# 向量化操作
def python_multiply():
    """Python中的元素级乘法。"""
    a = list(range(100000))
    b = list(range(100000))
    return [x * y for x, y in zip(a, b)]

def numpy_multiply():
    """NumPy中的向量化乘法。"""
    a = np.arange(100000)
    b = np.arange(100000)
    return a * b

py_time = timeit.timeit(python_multiply, number=100)
np_time = timeit.timeit(numpy_multiply, number=100)

print(f"
Python乘法: {py_time:.4f}s")
print(f"NumPy乘法: {np_time:.4f}s")
print(f"加速: {py_time/np_time:.2f}x")

模式12: 使用functools.lru_cache缓存

from functools import lru_cache
import timeit

def fibonacci_slow(n):
    """无缓存的递归斐波那契。"""
    if n < 2:
        return n
    return fibonacci_slow(n-1) + fibonacci_slow(n-2)

@lru_cache(maxsize=None)
def fibonacci_fast(n):
    """带缓存的递归斐波那契。"""
    if n < 2:
        return n
    return fibonacci_fast(n-1) + fibonacci_fast(n-2)

# 递归算法的巨大加速
n = 30

slow_time = timeit.timeit(lambda: fibonacci_slow(n), number=1)
fast_time = timeit.timeit(lambda: fibonacci_fast(n), number=1000)

print(f"无缓存 (1次运行): {slow_time:.4f}s")
print(f"有缓存 (1000次运行): {fast_time:.4f}s")

# 缓存信息
print(f"缓存信息: {fibonacci_fast.cache_info()}")

模式13: 使用slots用于内存

import sys

class RegularClass:
    """带__dict__的常规类。"""
    def __init__(self, x, y, z):
        self.x = x
        self.y = y
        self.z = z

class SlottedClass:
    """带__slots__的类用于内存效率。"""
    __slots__ = ['x', 'y', 'z']

    def __init__(self, x, y, z):
        self.x = x
        self.y = y
        self.z = z

# 内存比较
regular = RegularClass(1, 2, 3)
slotted = SlottedClass(1, 2, 3)

print(f"常规类大小: {sys.getsizeof(regular)} 字节")
print(f"槽类大小: {sys.getsizeof(slotted)} 字节")

# 许多实例的显著节省
regular_objects = [RegularClass(i, i+1, i+2) for i in range(10000)]
slotted_objects = [SlottedClass(i, i+1, i+2) for i in range(10000)]

print(f"
10000个常规对象内存: ~{sys.getsizeof(regular) * 10000} 字节")
print(f"10000个槽对象内存: ~{sys.getsizeof(slotted) * 10000} 字节")

模式14: 多处理用于CPU绑定任务

import multiprocessing as mp
import time

def cpu_intensive_task(n):
    """CPU密集型计算。"""
    return sum(i**2 for i in range(n))

def sequential_processing():
    """顺序处理任务。"""
    start = time.time()
    results = [cpu_intensive_task(1000000) for _ in range(4)]
    elapsed = time.time() - start
    return elapsed, results

def parallel_processing():
    """并行处理任务。"""
    start = time.time()
    with mp.Pool(processes=4) as pool:
        results = pool.map(cpu_intensive_task, [1000000] * 4)
    elapsed = time.time() - start
    return elapsed, results

if __name__ == "__main__":
    seq_time, seq_results = sequential_processing()
    par_time, par_results = parallel_processing()

    print(f"顺序: {seq_time:.2f}s")
    print(f"并行: {par_time:.2f}s")
    print(f"加速: {seq_time/par_time:.2f}x")

模式15: 异步I/O用于I/O绑定任务

import asyncio
import aiohttp
import time
import requests

urls = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/1",
]

def synchronous_requests():
    """同步HTTP请求。"""
    start = time.time()
    results = []
    for url in urls:
        response = requests.get(url)
        results.append(response.status_code)
    elapsed = time.time() - start
    return elapsed, results

async def async_fetch(session, url):
    """异步HTTP请求。"""
    async with session.get(url) as response:
        return response.status

async def asynchronous_requests():
    """异步HTTP请求。"""
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [async_fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    elapsed = time.time() - start
    return elapsed, results

# 对I/O绑定工作异步更快
sync_time, sync_results = synchronous_requests()
async_time, async_results = asyncio.run(asynchronous_requests())

print(f"同步: {sync_time:.2f}s")
print(f"异步: {async_time:.2f}s")
print(f"加速: {sync_time/async_time:.2f}x")

数据库优化

模式16: 批量数据库操作

import sqlite3
import time

def create_db():
    """创建测试数据库。"""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
    return conn

def slow_inserts(conn, count):
    """一次插入一条记录。"""
    start = time.time()
    cursor = conn.cursor()
    for i in range(count):
        cursor.execute("INSERT INTO users (name) VALUES (?)", (f"User {i}",))
        conn.commit()  # 每次插入提交
    elapsed = time.time() - start
    return elapsed

def fast_inserts(conn, count):
    """带单次提交的批量插入。"""
    start = time.time()
    cursor = conn.cursor()
    data = [(f"User {i}",) for i in range(count)]
    cursor.executemany("INSERT INTO users (name) VALUES (?)", data)
    conn.commit()  # 单次提交
    elapsed = time.time() - start
    return elapsed

# 基准测试
conn1 = create_db()
slow_time = slow_inserts(conn1, 1000)

conn2 = create_db()
fast_time = fast_inserts(conn2, 1000)

print(f"单个插入: {slow_time:.4f}s")
print(f"批量插入: {fast_time:.4f}s")
print(f"加速: {slow_time/fast_time:.2f}x")

模式17: 查询优化

# 为频繁查询的列使用索引
"""
-- 慢: 无索引
SELECT * FROM users WHERE email = 'user@example.com';

-- 快: 带索引
CREATE INDEX idx_users_email ON users(email);
SELECT * FROM users WHERE email = 'user@example.com';
"""

# 使用查询规划
import sqlite3

conn = sqlite3.connect("example.db")
cursor = conn.cursor()

# 分析查询性能
cursor.execute("EXPLAIN QUERY PLAN SELECT * FROM users WHERE email = ?", ("test@example.com",))
print(cursor.fetchall())

# 仅选择所需列
# 慢: SELECT *
# 快: SELECT id, name

内存优化

模式18: 检测内存泄漏

import tracemalloc
import gc

def memory_leak_example():
    """泄漏内存的示例。"""
    leaked_objects = []

    for i in range(100000):
        # 添加但从未移除的对象
        leaked_objects.append([i] * 100)

    # 在实际代码中,这可能是意外引用

def track_memory_usage():
    """跟踪内存分配。"""
    tracemalloc.start()

    # 之前快照
    snapshot1 = tracemalloc.take_snapshot()

    # 运行代码
    memory_leak_example()

    # 之后快照
    snapshot2 = tracemalloc.take_snapshot()

    # 比较
    top_stats = snapshot2.compare_to(snapshot1, 'lineno')

    print("前10个内存分配:")
    for stat in top_stats[:10]:
        print(stat)

    tracemalloc.stop()

# 监控内存
track_memory_usage()

# 强制垃圾回收
gc.collect()

模式19: 迭代器与列表

import sys

def process_file_list(filename):
    """将整个文件加载到内存。"""
    with open(filename) as f:
        lines = f.readlines()  # 加载所有行
        return sum(1 for line in lines if line.strip())

def process_file_iterator(filename):
    """逐行处理文件。"""
    with open(filename) as f:
        return sum(1 for line in f if line.strip())

# 迭代器使用恒定内存
# 列表将整个文件加载到内存

模式20: 弱引用用于缓存

import weakref

class CachedResource:
    """可被垃圾回收的资源。"""
    def __init__(self, data):
        self.data = data

# 常规缓存阻止垃圾回收
regular_cache = {}

def get_resource_regular(key):
    """从常规缓存获取资源。"""
    if key not in regular_cache:
        regular_cache[key] = CachedResource(f"Data for {key}")
    return regular_cache[key]

# 弱引用缓存允许垃圾回收
weak_cache = weakref.WeakValueDictionary()

def get_resource_weak(key):
    """从弱缓存获取资源。"""
    resource = weak_cache.get(key)
    if resource is None:
        resource = CachedResource(f"Data for {key}")
        weak_cache[key] = resource
    return resource

# 当没有强引用时,对象可以被GC

基准测试工具

自定义基准装饰器

import time
from functools import wraps

def benchmark(func):
    """用于基准测试函数执行的装饰器。"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{func.__name__} 花费 {elapsed:.6f} 秒")
        return result
    return wrapper

@benchmark
def slow_function():
    """要基准测试的函数。"""
    time.sleep(0.5)
    return sum(range(1000000))

result = slow_function()

使用pytest-benchmark进行性能测试

# 安装: pip install pytest-benchmark

def test_list_comprehension(benchmark):
    """基准测试列表推导。"""
    result = benchmark(lambda: [i**2 for i in range(10000)])
    assert len(result) == 10000

def test_map_function(benchmark):
    """基准测试map函数。"""
    result = benchmark(lambda: list(map(lambda x: x**2, range(10000))))
    assert len(result) == 10000

# 运行: pytest test_performance.py --benchmark-compare

最佳实践

  1. 优化前先分析 - 测量以找到真实瓶颈
  2. 关注热点路径 - 优化最频繁运行的代码
  3. 使用适当的数据结构 - 字典用于查找,集合用于成员资格
  4. 避免过早优化 - 清晰度优先,然后优化
  5. 使用内置函数 - 它们用C实现
  6. 缓存昂贵计算 - 使用lru_cache
  7. 批量I/O操作 - 减少系统调用
  8. 对大数据集使用生成器
  9. 考虑NumPy用于数值操作
  10. 分析生产代码 - 对实时系统使用py-spy

常见陷阱

  • 未分析就优化
  • 不必要地使用全局变量
  • 未使用适当的数据结构
  • 创建不必要的数据副本
  • 未使用数据库连接池
  • 忽略算法复杂性
  • 过度优化罕见代码路径
  • 未考虑内存使用

资源

  • cProfile: 内置CPU分析器
  • memory_profiler: 内存使用分析
  • line_profiler: 逐行分析
  • py-spy: 用于生产的采样分析器
  • NumPy: 高性能数值计算
  • Cython: 将Python编译为C
  • PyPy: 带JIT的替代Python解释器

性能检查清单

  • [ ] 分析代码以识别瓶颈
  • [ ] 使用适当的数据结构
  • [ ] 在有益处时实现缓存
  • [ ] 优化数据库查询
  • [ ] 对大数据集使用生成器
  • [ ] 对CPU绑定任务考虑多处理
  • [ ] 对I/O绑定任务使用异步I/O
  • [ ] 在热点循环中最小化函数调用开销
  • [ ] 检查内存泄漏
  • [ ] 优化前后进行基准测试