name: Python性能优化 description: 使用cProfile、内存分析器和性能最佳实践来分析和优化Python代码。当调试慢速Python代码、优化瓶颈或提升应用性能时使用。
Python性能优化
全面指南,用于分析、分析和优化Python代码以提高性能,包括CPU分析、内存优化和实施最佳实践。
何时使用此技能
- 识别Python应用程序中的性能瓶颈
- 减少应用程序延迟和响应时间
- 优化CPU密集型操作
- 减少内存消耗和内存泄漏
- 改善数据库查询性能
- 优化I/O操作
- 加速数据处理管道
- 实施高性能算法
- 分析生产应用程序
核心概念
1. 分析类型
- CPU分析:识别耗时的函数
- 内存分析:跟踪内存分配和泄漏
- 行分析:以逐行粒度进行分析
- 调用图:可视化函数调用关系
2. 性能指标
- 执行时间:操作所需时间
- 内存使用:峰值和平均内存消耗
- CPU利用率:处理器使用模式
- I/O等待:I/O操作花费的时间
3. 优化策略
- 算法:更好的算法和数据结构
- 实施:更高效的代码模式
- 并行化:多线程/多处理
- 缓存:避免冗余计算
- 本地扩展:对关键路径使用C/Rust
快速开始
基本计时
import time
def measure_time():
"""简单计时测量。"""
start = time.time()
# 您的代码在这里
result = sum(range(1000000))
elapsed = time.time() - start
print(f"执行时间: {elapsed:.4f} 秒")
return result
# 更好:使用timeit进行准确测量
import timeit
execution_time = timeit.timeit(
"sum(range(1000000))",
number=100
)
print(f"平均时间: {execution_time/100:.6f} 秒")
分析工具
模式1: cProfile - CPU分析
import cProfile
import pstats
from pstats import SortKey
def slow_function():
"""要分析的函数。"""
total = 0
for i in range(1000000):
total += i
return total
def another_function():
"""另一个函数。"""
return [i**2 for i in range(100000)]
def main():
"""要分析的主函数。"""
result1 = slow_function()
result2 = another_function()
return result1, result2
# 分析代码
if __name__ == "__main__":
profiler = cProfile.Profile()
profiler.enable()
main()
profiler.disable()
# 打印统计信息
stats = pstats.Stats(profiler)
stats.sort_stats(SortKey.CUMULATIVE)
stats.print_stats(10) # 前10个函数
# 保存到文件以便后续分析
stats.dump_stats("profile_output.prof")
命令行分析:
# 分析脚本
python -m cProfile -o output.prof script.py
# 查看结果
python -m pstats output.prof
# 在pstats中:
# sort cumtime
# stats 10
模式2: line_profiler - 逐行分析
# 安装: pip install line-profiler
# 添加@profile装饰器(line_profiler提供)
@profile
def process_data(data):
"""使用行分析处理数据。"""
result = []
for item in data:
processed = item * 2
result.append(processed)
return result
# 运行:
# kernprof -l -v script.py
手动行分析:
from line_profiler import LineProfiler
def process_data(data):
"""要分析的函数。"""
result = []
for item in data:
processed = item * 2
result.append(processed)
return result
if __name__ == "__main__":
lp = LineProfiler()
lp.add_function(process_data)
data = list(range(100000))
lp_wrapper = lp(process_data)
lp_wrapper(data)
lp.print_stats()
模式3: memory_profiler - 内存使用
# 安装: pip install memory-profiler
from memory_profiler import profile
@profile
def memory_intensive():
"""使用大量内存的函数。"""
# 创建大列表
big_list = [i for i in range(1000000)]
# 创建大字典
big_dict = {i: i**2 for i in range(100000)}
# 处理数据
result = sum(big_list)
return result
if __name__ == "__main__":
memory_intensive()
# 运行:
# python -m memory_profiler script.py
模式4: py-spy - 生产分析
# 安装: pip install py-spy
# 分析运行的Python进程
py-spy top --pid 12345
# 生成火焰图
py-spy record -o profile.svg --pid 12345
# 分析脚本
py-spy record -o profile.svg -- python script.py
# 转储当前调用栈
py-spy dump --pid 12345
优化模式
模式5: 列表推导与循环
import timeit
# 慢: 传统循环
def slow_squares(n):
"""使用循环创建平方列表。"""
result = []
for i in range(n):
result.append(i**2)
return result
# 快: 列表推导
def fast_squares(n):
"""使用推导创建平方列表。"""
return [i**2 for i in range(n)]
# 基准测试
n = 100000
slow_time = timeit.timeit(lambda: slow_squares(n), number=100)
fast_time = timeit.timeit(lambda: fast_squares(n), number=100)
print(f"循环: {slow_time:.4f}s")
print(f"推导: {fast_time:.4f}s")
print(f"加速: {slow_time/fast_time:.2f}x")
# 对简单操作更快: map
def faster_squares(n):
"""使用map以获得更好性能。"""
return list(map(lambda x: x**2, range(n)))
模式6: 生成器表达式用于内存
import sys
def list_approach():
"""内存密集型列表。"""
data = [i**2 for i in range(1000000)]
return sum(data)
def generator_approach():
"""内存高效生成器。"""
data = (i**2 for i in range(1000000))
return sum(data)
# 内存比较
list_data = [i for i in range(1000000)]
gen_data = (i for i in range(1000000))
print(f"列表大小: {sys.getsizeof(list_data)} 字节")
print(f"生成器大小: {sys.getsizeof(gen_data)} 字节")
# 生成器使用恒定内存,无论大小如何
模式7: 字符串连接
import timeit
def slow_concat(items):
"""慢字符串连接。"""
result = ""
for item in items:
result += str(item)
return result
def fast_concat(items):
"""使用join的快速字符串连接。"""
return "".join(str(item) for item in items)
def faster_concat(items):
"""使用列表更快。"""
parts = [str(item) for item in items]
return "".join(parts)
items = list(range(10000))
# 基准测试
slow = timeit.timeit(lambda: slow_concat(items), number=100)
fast = timeit.timeit(lambda: fast_concat(items), number=100)
faster = timeit.timeit(lambda: faster_concat(items), number=100)
print(f"连接 (+): {slow:.4f}s")
print(f"Join (生成器): {fast:.4f}s")
print(f"Join (列表): {faster:.4f}s")
模式8: 字典查找与列表搜索
import timeit
# 创建测试数据
size = 10000
items = list(range(size))
lookup_dict = {i: i for i in range(size)}
def list_search(items, target):
"""O(n) 列表搜索。"""
return target in items
def dict_search(lookup_dict, target):
"""O(1) 字典搜索。"""
return target in lookup_dict
target = size - 1 # 列表最坏情况
# 基准测试
list_time = timeit.timeit(
lambda: list_search(items, target),
number=1000
)
dict_time = timeit.timeit(
lambda: dict_search(lookup_dict, target),
number=1000
)
print(f"列表搜索: {list_time:.6f}s")
print(f"字典搜索: {dict_time:.6f}s")
print(f"加速: {list_time/dict_time:.0f}x")
模式9: 局部变量访问
import timeit
# 全局变量(慢)
GLOBAL_VALUE = 100
def use_global():
"""访问全局变量。"""
total = 0
for i in range(10000):
total += GLOBAL_VALUE
return total
def use_local():
"""使用局部变量。"""
local_value = 100
total = 0
for i in range(10000):
total += local_value
return total
# 局部更快
global_time = timeit.timeit(use_global, number=1000)
local_time = timeit.timeit(use_local, number=1000)
print(f"全局访问: {global_time:.4f}s")
print(f"局部访问: {local_time:.4f}s")
print(f"加速: {global_time/local_time:.2f}x")
模式10: 函数调用开销
import timeit
def calculate_inline():
"""内联计算。"""
total = 0
for i in range(10000):
total += i * 2 + 1
return total
def helper_function(x):
"""辅助函数。"""
return x * 2 + 1
def calculate_with_function():
"""带函数调用的计算。"""
total = 0
for i in range(10000):
total += helper_function(i)
return total
# 内联更快,由于没有调用开销
inline_time = timeit.timeit(calculate_inline, number=1000)
function_time = timeit.timeit(calculate_with_function, number=1000)
print(f"内联: {inline_time:.4f}s")
print(f"函数调用: {function_time:.4f}s")
高级优化
模式11: NumPy用于数值操作
import timeit
import numpy as np
def python_sum(n):
"""使用纯Python求和。"""
return sum(range(n))
def numpy_sum(n):
"""使用NumPy求和。"""
return np.arange(n).sum()
n = 1000000
python_time = timeit.timeit(lambda: python_sum(n), number=100)
numpy_time = timeit.timeit(lambda: numpy_sum(n), number=100)
print(f"Python: {python_time:.4f}s")
print(f"NumPy: {numpy_time:.4f}s")
print(f"加速: {python_time/numpy_time:.2f}x")
# 向量化操作
def python_multiply():
"""Python中的元素级乘法。"""
a = list(range(100000))
b = list(range(100000))
return [x * y for x, y in zip(a, b)]
def numpy_multiply():
"""NumPy中的向量化乘法。"""
a = np.arange(100000)
b = np.arange(100000)
return a * b
py_time = timeit.timeit(python_multiply, number=100)
np_time = timeit.timeit(numpy_multiply, number=100)
print(f"
Python乘法: {py_time:.4f}s")
print(f"NumPy乘法: {np_time:.4f}s")
print(f"加速: {py_time/np_time:.2f}x")
模式12: 使用functools.lru_cache缓存
from functools import lru_cache
import timeit
def fibonacci_slow(n):
"""无缓存的递归斐波那契。"""
if n < 2:
return n
return fibonacci_slow(n-1) + fibonacci_slow(n-2)
@lru_cache(maxsize=None)
def fibonacci_fast(n):
"""带缓存的递归斐波那契。"""
if n < 2:
return n
return fibonacci_fast(n-1) + fibonacci_fast(n-2)
# 递归算法的巨大加速
n = 30
slow_time = timeit.timeit(lambda: fibonacci_slow(n), number=1)
fast_time = timeit.timeit(lambda: fibonacci_fast(n), number=1000)
print(f"无缓存 (1次运行): {slow_time:.4f}s")
print(f"有缓存 (1000次运行): {fast_time:.4f}s")
# 缓存信息
print(f"缓存信息: {fibonacci_fast.cache_info()}")
模式13: 使用slots用于内存
import sys
class RegularClass:
"""带__dict__的常规类。"""
def __init__(self, x, y, z):
self.x = x
self.y = y
self.z = z
class SlottedClass:
"""带__slots__的类用于内存效率。"""
__slots__ = ['x', 'y', 'z']
def __init__(self, x, y, z):
self.x = x
self.y = y
self.z = z
# 内存比较
regular = RegularClass(1, 2, 3)
slotted = SlottedClass(1, 2, 3)
print(f"常规类大小: {sys.getsizeof(regular)} 字节")
print(f"槽类大小: {sys.getsizeof(slotted)} 字节")
# 许多实例的显著节省
regular_objects = [RegularClass(i, i+1, i+2) for i in range(10000)]
slotted_objects = [SlottedClass(i, i+1, i+2) for i in range(10000)]
print(f"
10000个常规对象内存: ~{sys.getsizeof(regular) * 10000} 字节")
print(f"10000个槽对象内存: ~{sys.getsizeof(slotted) * 10000} 字节")
模式14: 多处理用于CPU绑定任务
import multiprocessing as mp
import time
def cpu_intensive_task(n):
"""CPU密集型计算。"""
return sum(i**2 for i in range(n))
def sequential_processing():
"""顺序处理任务。"""
start = time.time()
results = [cpu_intensive_task(1000000) for _ in range(4)]
elapsed = time.time() - start
return elapsed, results
def parallel_processing():
"""并行处理任务。"""
start = time.time()
with mp.Pool(processes=4) as pool:
results = pool.map(cpu_intensive_task, [1000000] * 4)
elapsed = time.time() - start
return elapsed, results
if __name__ == "__main__":
seq_time, seq_results = sequential_processing()
par_time, par_results = parallel_processing()
print(f"顺序: {seq_time:.2f}s")
print(f"并行: {par_time:.2f}s")
print(f"加速: {seq_time/par_time:.2f}x")
模式15: 异步I/O用于I/O绑定任务
import asyncio
import aiohttp
import time
import requests
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/1",
]
def synchronous_requests():
"""同步HTTP请求。"""
start = time.time()
results = []
for url in urls:
response = requests.get(url)
results.append(response.status_code)
elapsed = time.time() - start
return elapsed, results
async def async_fetch(session, url):
"""异步HTTP请求。"""
async with session.get(url) as response:
return response.status
async def asynchronous_requests():
"""异步HTTP请求。"""
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [async_fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
return elapsed, results
# 对I/O绑定工作异步更快
sync_time, sync_results = synchronous_requests()
async_time, async_results = asyncio.run(asynchronous_requests())
print(f"同步: {sync_time:.2f}s")
print(f"异步: {async_time:.2f}s")
print(f"加速: {sync_time/async_time:.2f}x")
数据库优化
模式16: 批量数据库操作
import sqlite3
import time
def create_db():
"""创建测试数据库。"""
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
return conn
def slow_inserts(conn, count):
"""一次插入一条记录。"""
start = time.time()
cursor = conn.cursor()
for i in range(count):
cursor.execute("INSERT INTO users (name) VALUES (?)", (f"User {i}",))
conn.commit() # 每次插入提交
elapsed = time.time() - start
return elapsed
def fast_inserts(conn, count):
"""带单次提交的批量插入。"""
start = time.time()
cursor = conn.cursor()
data = [(f"User {i}",) for i in range(count)]
cursor.executemany("INSERT INTO users (name) VALUES (?)", data)
conn.commit() # 单次提交
elapsed = time.time() - start
return elapsed
# 基准测试
conn1 = create_db()
slow_time = slow_inserts(conn1, 1000)
conn2 = create_db()
fast_time = fast_inserts(conn2, 1000)
print(f"单个插入: {slow_time:.4f}s")
print(f"批量插入: {fast_time:.4f}s")
print(f"加速: {slow_time/fast_time:.2f}x")
模式17: 查询优化
# 为频繁查询的列使用索引
"""
-- 慢: 无索引
SELECT * FROM users WHERE email = 'user@example.com';
-- 快: 带索引
CREATE INDEX idx_users_email ON users(email);
SELECT * FROM users WHERE email = 'user@example.com';
"""
# 使用查询规划
import sqlite3
conn = sqlite3.connect("example.db")
cursor = conn.cursor()
# 分析查询性能
cursor.execute("EXPLAIN QUERY PLAN SELECT * FROM users WHERE email = ?", ("test@example.com",))
print(cursor.fetchall())
# 仅选择所需列
# 慢: SELECT *
# 快: SELECT id, name
内存优化
模式18: 检测内存泄漏
import tracemalloc
import gc
def memory_leak_example():
"""泄漏内存的示例。"""
leaked_objects = []
for i in range(100000):
# 添加但从未移除的对象
leaked_objects.append([i] * 100)
# 在实际代码中,这可能是意外引用
def track_memory_usage():
"""跟踪内存分配。"""
tracemalloc.start()
# 之前快照
snapshot1 = tracemalloc.take_snapshot()
# 运行代码
memory_leak_example()
# 之后快照
snapshot2 = tracemalloc.take_snapshot()
# 比较
top_stats = snapshot2.compare_to(snapshot1, 'lineno')
print("前10个内存分配:")
for stat in top_stats[:10]:
print(stat)
tracemalloc.stop()
# 监控内存
track_memory_usage()
# 强制垃圾回收
gc.collect()
模式19: 迭代器与列表
import sys
def process_file_list(filename):
"""将整个文件加载到内存。"""
with open(filename) as f:
lines = f.readlines() # 加载所有行
return sum(1 for line in lines if line.strip())
def process_file_iterator(filename):
"""逐行处理文件。"""
with open(filename) as f:
return sum(1 for line in f if line.strip())
# 迭代器使用恒定内存
# 列表将整个文件加载到内存
模式20: 弱引用用于缓存
import weakref
class CachedResource:
"""可被垃圾回收的资源。"""
def __init__(self, data):
self.data = data
# 常规缓存阻止垃圾回收
regular_cache = {}
def get_resource_regular(key):
"""从常规缓存获取资源。"""
if key not in regular_cache:
regular_cache[key] = CachedResource(f"Data for {key}")
return regular_cache[key]
# 弱引用缓存允许垃圾回收
weak_cache = weakref.WeakValueDictionary()
def get_resource_weak(key):
"""从弱缓存获取资源。"""
resource = weak_cache.get(key)
if resource is None:
resource = CachedResource(f"Data for {key}")
weak_cache[key] = resource
return resource
# 当没有强引用时,对象可以被GC
基准测试工具
自定义基准装饰器
import time
from functools import wraps
def benchmark(func):
"""用于基准测试函数执行的装饰器。"""
@wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start
print(f"{func.__name__} 花费 {elapsed:.6f} 秒")
return result
return wrapper
@benchmark
def slow_function():
"""要基准测试的函数。"""
time.sleep(0.5)
return sum(range(1000000))
result = slow_function()
使用pytest-benchmark进行性能测试
# 安装: pip install pytest-benchmark
def test_list_comprehension(benchmark):
"""基准测试列表推导。"""
result = benchmark(lambda: [i**2 for i in range(10000)])
assert len(result) == 10000
def test_map_function(benchmark):
"""基准测试map函数。"""
result = benchmark(lambda: list(map(lambda x: x**2, range(10000))))
assert len(result) == 10000
# 运行: pytest test_performance.py --benchmark-compare
最佳实践
- 优化前先分析 - 测量以找到真实瓶颈
- 关注热点路径 - 优化最频繁运行的代码
- 使用适当的数据结构 - 字典用于查找,集合用于成员资格
- 避免过早优化 - 清晰度优先,然后优化
- 使用内置函数 - 它们用C实现
- 缓存昂贵计算 - 使用lru_cache
- 批量I/O操作 - 减少系统调用
- 对大数据集使用生成器
- 考虑NumPy用于数值操作
- 分析生产代码 - 对实时系统使用py-spy
常见陷阱
- 未分析就优化
- 不必要地使用全局变量
- 未使用适当的数据结构
- 创建不必要的数据副本
- 未使用数据库连接池
- 忽略算法复杂性
- 过度优化罕见代码路径
- 未考虑内存使用
资源
- cProfile: 内置CPU分析器
- memory_profiler: 内存使用分析
- line_profiler: 逐行分析
- py-spy: 用于生产的采样分析器
- NumPy: 高性能数值计算
- Cython: 将Python编译为C
- PyPy: 带JIT的替代Python解释器
性能检查清单
- [ ] 分析代码以识别瓶颈
- [ ] 使用适当的数据结构
- [ ] 在有益处时实现缓存
- [ ] 优化数据库查询
- [ ] 对大数据集使用生成器
- [ ] 对CPU绑定任务考虑多处理
- [ ] 对I/O绑定任务使用异步I/O
- [ ] 在热点循环中最小化函数调用开销
- [ ] 检查内存泄漏
- [ ] 优化前后进行基准测试