名称: ray-data 描述: 用于机器学习工作负载的可扩展数据处理。支持在CPU/GPU上进行流式执行,支持Parquet/CSV/JSON/图像等格式。集成Ray Train、PyTorch、TensorFlow。从单机扩展到数百个节点。用于批量推断、数据预处理、多模态数据加载或分布式ETL管道。 版本: 1.0.0 作者: Orchestra Research 许可证: MIT 标签: [数据处理, Ray Data, 分布式计算, ML管道, 批量推断, ETL, 可扩展, Ray, PyTorch, TensorFlow] 依赖项: [ray[data], pyarrow, pandas]
Ray Data - 可扩展的机器学习数据处理
用于机器学习和人工智能工作负载的分布式数据处理库。
何时使用 Ray Data
在以下情况下使用 Ray Data:
- 为机器学习训练处理大型数据集(>100GB)
- 需要在集群上进行分布式数据预处理
- 构建批量推断管道
- 加载多模态数据(图像、音频、视频)
- 从笔记本电脑扩展到集群进行数据处理
关键功能:
- 流式执行:处理大于内存的数据
- GPU支持:使用GPU加速变换
- 框架集成:PyTorch、TensorFlow、HuggingFace
- 多模态:图像、Parquet、CSV、JSON、音频、视频
使用替代方案:
- Pandas:单机上的小数据(<1GB)
- Dask:表格数据、类SQL操作
- Spark:企业ETL、SQL查询
快速入门
安装
pip install -U 'ray[data]'
加载和转换数据
import ray
# 读取Parquet文件
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")
# 转换数据(惰性执行)
ds = ds.map_batches(lambda batch: {"processed": batch["text"].str.lower()})
# 消费数据
for batch in ds.iter_batches(batch_size=100):
print(batch)
与Ray Train集成
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
# 创建数据集
train_ds = ray.data.read_parquet("s3://bucket/train/*.parquet")
def train_func(config):
# 在训练中访问数据集
train_ds = ray.train.get_dataset_shard("train")
for epoch in range(10):
for batch in train_ds.iter_batches(batch_size=32):
# 在批处理上训练
pass
# 使用Ray训练
trainer = TorchTrainer(
train_func,
datasets={"train": train_ds},
scaling_config=ScalingConfig(num_workers=4, use_gpu=True)
)
trainer.fit()
读取数据
从云存储
import ray
# Parquet(推荐用于ML)
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")
# CSV
ds = ray.data.read_csv("s3://bucket/data/*.csv")
# JSON
ds = ray.data.read_json("gs://bucket/data/*.json")
# 图像
ds = ray.data.read_images("s3://bucket/images/")
从Python对象
# 从列表
ds = ray.data.from_items([{"id": i, "value": i * 2} for i in range(1000)])
# 从范围
ds = ray.data.range(1000000) # 合成数据
# 从pandas
import pandas as pd
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
ds = ray.data.from_pandas(df)
转换
映射批处理(向量化)
# 批处理转换(快速)
def process_batch(batch):
batch["doubled"] = batch["value"] * 2
return batch
ds = ds.map_batches(process_batch, batch_size=1000)
行转换
# 逐行(较慢)
def process_row(row):
row["squared"] = row["value"] ** 2
return row
ds = ds.map(process_row)
过滤
# 过滤行
ds = ds.filter(lambda row: row["value"] > 100)
分组和聚合
# 按列分组
ds = ds.groupby("category").count()
# 自定义聚合
ds = ds.groupby("category").map_groups(lambda group: {"sum": group["value"].sum()})
GPU加速的转换
# 使用GPU进行预处理
def preprocess_images_gpu(batch):
import torch
images = torch.tensor(batch["image"]).cuda()
# GPU预处理
processed = images * 255
return {"processed": processed.cpu().numpy()}
ds = ds.map_batches(
preprocess_images_gpu,
batch_size=64,
num_gpus=1 # 请求GPU
)
写入数据
# 写入Parquet
ds.write_parquet("s3://bucket/output/")
# 写入CSV
ds.write_csv("output/")
# 写入JSON
ds.write_json("output/")
性能优化
重新分区
# 控制并行度
ds = ds.repartition(100) # 100个块用于100核集群
批处理大小调优
# 更大的批处理 = 更快的向量化操作
ds.map_batches(process_fn, batch_size=10000) # 对比 batch_size=100
流式执行
# 处理大于内存的数据
ds = ray.data.read_parquet("s3://huge-dataset/")
for batch in ds.iter_batches(batch_size=1000):
process(batch) # 流式处理,不加载到内存
常见模式
批量推断
import ray
# 加载模型
def load_model():
# 每个工作器加载一次
return MyModel()
# 推断函数
class BatchInference:
def __init__(self):
self.model = load_model()
def __call__(self, batch):
predictions = self.model(batch["input"])
return {"prediction": predictions}
# 运行分布式推断
ds = ray.data.read_parquet("s3://data/")
predictions = ds.map_batches(BatchInference, batch_size=32, num_gpus=1)
predictions.write_parquet("s3://output/")
数据预处理管道
# 多步骤管道
ds = (
ray.data.read_parquet("s3://raw/")
.map_batches(clean_data)
.map_batches(tokenize)
.map_batches(augment)
.write_parquet("s3://processed/")
)
与ML框架集成
PyTorch
# 转换为PyTorch
torch_ds = ds.to_torch(label_column="label", batch_size=32)
for batch in torch_ds:
# batch是包含张量的字典
inputs, labels = batch["features"], batch["label"]
TensorFlow
# 转换为TensorFlow
tf_ds = ds.to_tf(feature_columns=["image"], label_column="label", batch_size=32)
for features, labels in tf_ds:
# 训练模型
pass
支持的数据格式
| 格式 | 读取 | 写入 | 用例 |
|---|---|---|---|
| Parquet | ✅ | ✅ | ML数据(推荐) |
| CSV | ✅ | ✅ | 表格数据 |
| JSON | ✅ | ✅ | 半结构化 |
| 图像 | ✅ | ❌ | 计算机视觉 |
| NumPy | ✅ | ✅ | 数组 |
| Pandas | ✅ | ❌ | 数据框 |
性能基准
扩展性(处理100GB数据):
- 1个节点(16核):约30分钟
- 4个节点(64核):约8分钟
- 16个节点(256核):约2分钟
GPU加速(图像预处理):
- 仅CPU:1,000 图像/秒
- 1个GPU:5,000 图像/秒
- 4个GPU:18,000 图像/秒
使用案例
生产部署:
- Pinterest:模型训练的最后一英里数据处理
- ByteDance:使用多模态LLM扩展离线推断
- Spotify:批量推断的ML平台
参考资料
资源
- 文档:https://docs.ray.io/en/latest/data/data.html
- GitHub:https://github.com/ray-project/ray ⭐ 36,000+
- 版本:Ray 2.40.0+
- 示例:https://docs.ray.io/en/latest/data/examples/overview.html