name: dask description: “并行/分布式计算。将pandas/NumPy扩展到内存之外,并行DataFrames/Arrays,多文件处理,任务图,用于大于RAM的数据集和并行工作流。”
Dask
概述
Dask是一个用于并行和分布式计算的Python库,具备三个关键能力:
- 超出内存的执行,在单机上处理超过可用RAM的数据
- 并行处理,通过多核提高计算速度
- 分布式计算,支持跨多机处理太字节级数据集
Dask从笔记本电脑(处理约100 GiB)扩展到集群(处理约100 TiB),同时保持熟悉的Python API。
何时使用此技能
此技能应在以下情况使用:
- 处理超过可用RAM的数据集
- 将pandas或NumPy操作扩展到更大的数据集
- 并行化计算以提高性能
- 高效处理多个文件(CSV、Parquet、JSON、文本日志)
- 构建具有任务依赖的自定义并行工作流
- 跨多个核心或机器分配工作负载
核心能力
Dask提供五个主要组件,每个适用于不同的用例:
1. DataFrames - 并行Pandas操作
目的:通过并行处理将pandas操作扩展到更大的数据集。
何时使用:
- 表格数据超过可用RAM
- 需要一起处理多个CSV/Parquet文件
- Pandas操作慢且需要并行化
- 从pandas原型扩展到生产
参考文档:有关Dask DataFrames的全面指南,请参阅references/dataframes.md,包括:
- 读取数据(单个文件、多个文件、通配符模式)
- 常见操作(过滤、分组、连接、聚合)
- 使用
map_partitions的自定义操作 - 性能优化提示
- 常见模式(ETL、时间序列、多文件处理)
快速示例:
import dask.dataframe as dd
# 读取多个文件作为单个DataFrame
ddf = dd.read_csv('data/2024-*.csv')
# 操作是惰性的,直到调用compute()
filtered = ddf[ddf['value'] > 100]
result = filtered.groupby('category').mean().compute()
关键点:
- 操作是惰性的(构建任务图),直到调用
.compute() - 使用
map_partitions进行高效自定义操作 - 当从其他来源处理结构化数据时,尽早转换为DataFrame
2. Arrays - 并行NumPy操作
目的:使用分块算法将NumPy能力扩展到大于内存的数据集。
何时使用:
- 数组超过可用RAM
- NumPy操作需要并行化
- 处理科学数据集(HDF5、Zarr、NetCDF)
- 需要并行线性代数或数组操作
参考文档:有关Dask Arrays的全面指南,请参阅references/arrays.md,包括:
- 创建数组(从NumPy、随机、从磁盘)
- 分块策略和优化
- 常见操作(算术、归约、线性代数)
- 使用
map_blocks的自定义操作 - 与HDF5、Zarr和XArray的集成
快速示例:
import dask.array as da
# 创建带分块的大数组
x = da.random.random((100000, 100000), chunks=(10000, 10000))
# 操作是惰性的
y = x + 100
z = y.mean(axis=0)
# 计算结果
result = z.compute()
关键点:
- 分块大小至关重要(目标约100 MB每块)
- 操作在分块上并行执行
- 需要时重新分块以提高操作效率
- 使用
map_blocks处理Dask中不可用的操作
3. Bags - 非结构化数据的并行处理
目的:通过功能操作处理非结构化或半结构化数据(文本、JSON、日志)。
何时使用:
- 处理文本文件、日志或JSON记录
- 结构化分析前的数据清洗和ETL
- 处理不适合数组/DataFrame格式的Python对象
- 需要内存高效的流式处理
参考文档:有关Dask Bags的全面指南,请参阅references/bags.md,包括:
- 读取文本和JSON文件
- 功能操作(映射、过滤、折叠、分组)
- 转换为DataFrames
- 常见模式(日志分析、JSON处理、文本处理)
- 性能考虑
快速示例:
import dask.bag as db
import json
# 读取和解析JSON文件
bag = db.read_text('logs/*.json').map(json.loads)
# 过滤和转换
valid = bag.filter(lambda x: x['status'] == 'valid')
processed = valid.map(lambda x: {'id': x['id'], 'value': x['value']})
# 转换为DataFrame以进行分析
ddf = processed.to_dataframe()
关键点:
- 用于初始数据清洗,然后转换为DataFrame/Array
- 使用
foldby而非groupby以获得更好性能 - 操作是流式和内存高效的
- 为复杂操作转换为结构化格式(DataFrame)
4. Futures - 基于任务的并行化
目的:通过细粒度控制任务执行和依赖构建自定义并行工作流。
何时使用:
- 构建动态、演进的工作流
- 需要立即任务执行(非惰性)
- 计算依赖运行时条件
- 实现自定义并行算法
- 需要状态计算
参考文档:有关Dask Futures的全面指南,请参阅references/futures.md,包括:
- 设置分布式客户端
- 提交任务和使用futures
- 任务依赖和数据移动
- 高级协调(队列、锁、事件、actors)
- 常见模式(参数扫描、动态任务、迭代算法)
快速示例:
from dask.distributed import Client
client = Client() # 创建本地集群
# 提交任务(立即执行)
def process(x):
return x ** 2
futures = client.map(process, range(100))
# 收集结果
results = client.gather(futures)
client.close()
关键点:
- 需要分布式客户端(即使单机)
- 任务提交后立即执行
- 预分散大数据以避免重复传输
- 每个任务约1毫秒开销(不适合数百万个微小任务)
- 使用actors进行状态工作流
5. Schedulers - 执行后端
目的:控制Dask任务如何以及在哪里执行(线程、进程、分布式)。
何时选择调度器:
- 线程(默认):NumPy/Pandas操作、释放GIL的库、共享内存优势
- 进程:纯Python代码、文本处理、GIL绑定操作
- 同步:使用pdb调试、性能分析、理解错误
- 分布式:需要仪表板、多机集群、高级功能
参考文档:有关Dask Schedulers的全面指南,请参阅references/schedulers.md,包括:
- 详细调度器描述和特性
- 配置方法(全局、上下文管理器、每个计算)
- 性能考虑和开销
- 常见模式和故障排除
- 线程配置以优化性能
快速示例:
import dask
import dask.dataframe as dd
# 对DataFrame使用线程(默认,适用于数值)
ddf = dd.read_csv('data.csv')
result1 = ddf.mean().compute() # 使用线程
# 对Python密集型工作使用进程
import dask.bag as db
bag = db.read_text('logs/*.txt')
result2 = bag.map(python_function).compute(scheduler='processes')
# 对调试使用同步
dask.config.set(scheduler='synchronous')
result3 = problematic_computation.compute() # 可使用pdb
# 对监控和扩展使用分布式
from dask.distributed import Client
client = Client()
result4 = computation.compute() # 使用带仪表板的分布式
关键点:
- 线程:最低开销(约10微秒/任务),最适合数值工作
- 进程:避免GIL(约10毫秒/任务),最适合Python工作
- 分布式:监控仪表板(约1毫秒/任务),可扩展到集群
- 可按计算或全局切换调度器
最佳实践
有关全面的性能优化指南、内存管理策略和常见陷阱避免,请参阅references/best-practices.md。关键原则包括:
从简单解决方案开始
在使用Dask之前,探索:
- 更好的算法
- 高效文件格式(Parquet替代CSV)
- 编译代码(Numba、Cython)
- 数据采样
关键性能规则
1. 不要本地加载数据然后交给Dask
# 错误:先加载所有数据到内存
import pandas as pd
df = pd.read_csv('large.csv')
ddf = dd.from_pandas(df, npartitions=10)
# 正确:让Dask处理加载
import dask.dataframe as dd
ddf = dd.read_csv('large.csv')
2. 避免重复compute()调用
# 错误:每个compute是单独的
for item in items:
result = dask_computation(item).compute()
# 正确:对所有项目单次compute
computations = [dask_computation(item) for item in items]
results = dask.compute(*computations)
3. 不要构建过大的任务图
- 如果任务数百万,增加分块大小
- 使用
map_partitions/map_blocks融合操作 - 检查任务图大小:
len(ddf.__dask_graph__())
4. 选择适当的分块大小
- 目标:约100 MB每块(或工作内存中每核10块)
- 太大:内存溢出
- 太小:调度开销
5. 使用仪表板
from dask.distributed import Client
client = Client()
print(client.dashboard_link) # 监控性能,识别瓶颈
常见工作流模式
ETL管道
import dask.dataframe as dd
# 提取:读取数据
ddf = dd.read_csv('raw_data/*.csv')
# 转换:清洗和处理
ddf = ddf[ddf['status'] == 'valid']
ddf['amount'] = ddf['amount'].astype('float64')
ddf = ddf.dropna(subset=['important_col'])
# 加载:聚合和保存
summary = ddf.groupby('category').agg({'amount': ['sum', 'mean']})
summary.to_parquet('output/summary.parquet')
非结构化到结构化管道
import dask.bag as db
import json
# 从Bag开始处理非结构化数据
bag = db.read_text('logs/*.json').map(json.loads)
bag = bag.filter(lambda x: x['status'] == 'valid')
# 转换为DataFrame进行结构化分析
ddf = bag.to_dataframe()
result = ddf.groupby('category').mean().compute()
大规模数组计算
import dask.array as da
# 加载或创建大数组
x = da.from_zarr('large_dataset.zarr')
# 在分块中处理
normalized = (x - x.mean()) / x.std()
# 保存结果
da.to_zarr(normalized, 'normalized.zarr')
自定义并行工作流
from dask.distributed import Client
client = Client()
# 一次性分散大数据集
data = client.scatter(large_dataset)
# 带依赖并行处理
futures = []
for param in parameters:
future = client.submit(process, data, param)
futures.append(future)
# 收集结果
results = client.gather(futures)
选择正确组件
使用此决策指南选择适当的Dask组件:
数据类型:
- 表格数据 → DataFrames
- 数值数组 → Arrays
- 文本/JSON/日志 → Bags(然后转换为DataFrame)
- 自定义Python对象 → Bags或Futures
操作类型:
- 标准pandas操作 → DataFrames
- 标准NumPy操作 → Arrays
- 自定义并行任务 → Futures
- 文本处理/ETL → Bags
控制级别:
- 高级、自动 → DataFrames/Arrays
- 低级、手动 → Futures
工作流类型:
- 静态计算图 → DataFrames/Arrays/Bags
- 动态、演进 → Futures
集成考虑
文件格式
- 高效:Parquet、HDF5、Zarr(列式、压缩、并行友好)
- 兼容但慢:CSV(仅用于初始摄取)
- 对数组:HDF5、Zarr、NetCDF
集合间转换
# Bag → DataFrame
ddf = bag.to_dataframe()
# DataFrame → Array(用于数值数据)
arr = ddf.to_dask_array(lengths=True)
# Array → DataFrame
ddf = dd.from_dask_array(arr, columns=['col1', 'col2'])
与其他库集成
- XArray:用标签维度包装Dask数组(地理空间、成像)
- Dask-ML:与scikit-learn兼容API的机器学习
- Distributed:高级集群管理和监控
调试和开发
迭代开发工作流
- 使用同步调度器测试小数据:
dask.config.set(scheduler='synchronous')
result = computation.compute() # 可使用pdb,易于调试
- 在线程上验证样本:
sample = ddf.head(1000) # 小样本
# 测试逻辑,然后扩展到全数据集
- 使用分布式进行监控扩展:
from dask.distributed import Client
client = Client()
print(client.dashboard_link) # 监控性能
result = computation.compute()
常见问题
内存错误:
- 减小分块大小
- 策略性使用
persist()并在完成后删除 - 检查自定义函数中的内存泄漏
启动慢:
- 任务图太大(增加分块大小)
- 使用
map_partitions或map_blocks减少任务
并行化差:
- 分块太大(增加分区数)
- 对Python代码使用线程(切换到进程)
- 数据依赖阻碍并行
参考文件
所有参考文档文件可在需要详细信息时阅读:
references/dataframes.md- 完整的Dask DataFrame指南references/arrays.md- 完整的Dask Array指南references/bags.md- 完整的Dask Bag指南references/futures.md- 完整的Dask Futures和分布式计算指南references/schedulers.md- 完整的调度器选择和配置指南references/best-practices.md- 全面的性能优化和故障排除
当用户需要关于特定Dask组件、操作或模式的详细信息超出此处提供的快速指导时,请加载这些文件。