RayData分布式数据处理Skill ray-data

Ray Data 是一个用于机器学习和人工智能工作负载的可扩展分布式数据处理库,支持流式执行、GPU加速,并集成PyTorch、TensorFlow等框架。适用于大规模数据预处理、批量推断、多模态数据加载和分布式ETL管道。关键词:分布式数据处理、机器学习、AI、流式执行、GPU加速、ETL、Ray、数据预处理、批量推断。

ETL开发 0 次安装 2 次浏览 更新于 3/21/2026

名称: ray-data 描述: 用于机器学习工作负载的可扩展数据处理。支持在CPU/GPU上进行流式执行,支持Parquet/CSV/JSON/图像等格式。集成Ray Train、PyTorch、TensorFlow。从单机扩展到数百个节点。用于批量推断、数据预处理、多模态数据加载或分布式ETL管道。 版本: 1.0.0 作者: Orchestra Research 许可证: MIT 标签: [数据处理, Ray Data, 分布式计算, ML管道, 批量推断, ETL, 可扩展, Ray, PyTorch, TensorFlow] 依赖项: [ray[data], pyarrow, pandas]

Ray Data - 可扩展的机器学习数据处理

用于机器学习和人工智能工作负载的分布式数据处理库。

何时使用 Ray Data

在以下情况下使用 Ray Data:

  • 为机器学习训练处理大型数据集(>100GB)
  • 需要在集群上进行分布式数据预处理
  • 构建批量推断管道
  • 加载多模态数据(图像、音频、视频)
  • 从笔记本电脑扩展到集群进行数据处理

关键功能:

  • 流式执行:处理大于内存的数据
  • GPU支持:使用GPU加速变换
  • 框架集成:PyTorch、TensorFlow、HuggingFace
  • 多模态:图像、Parquet、CSV、JSON、音频、视频

使用替代方案:

  • Pandas:单机上的小数据(<1GB)
  • Dask:表格数据、类SQL操作
  • Spark:企业ETL、SQL查询

快速入门

安装

pip install -U 'ray[data]'

加载和转换数据

import ray

# 读取Parquet文件
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")

# 转换数据(惰性执行)
ds = ds.map_batches(lambda batch: {"processed": batch["text"].str.lower()})

# 消费数据
for batch in ds.iter_batches(batch_size=100):
    print(batch)

与Ray Train集成

import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# 创建数据集
train_ds = ray.data.read_parquet("s3://bucket/train/*.parquet")

def train_func(config):
    # 在训练中访问数据集
    train_ds = ray.train.get_dataset_shard("train")

    for epoch in range(10):
        for batch in train_ds.iter_batches(batch_size=32):
            # 在批处理上训练
            pass

# 使用Ray训练
trainer = TorchTrainer(
    train_func,
    datasets={"train": train_ds},
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True)
)
trainer.fit()

读取数据

从云存储

import ray

# Parquet(推荐用于ML)
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")

# CSV
ds = ray.data.read_csv("s3://bucket/data/*.csv")

# JSON
ds = ray.data.read_json("gs://bucket/data/*.json")

# 图像
ds = ray.data.read_images("s3://bucket/images/")

从Python对象

# 从列表
ds = ray.data.from_items([{"id": i, "value": i * 2} for i in range(1000)])

# 从范围
ds = ray.data.range(1000000)  # 合成数据

# 从pandas
import pandas as pd
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
ds = ray.data.from_pandas(df)

转换

映射批处理(向量化)

# 批处理转换(快速)
def process_batch(batch):
    batch["doubled"] = batch["value"] * 2
    return batch

ds = ds.map_batches(process_batch, batch_size=1000)

行转换

# 逐行(较慢)
def process_row(row):
    row["squared"] = row["value"] ** 2
    return row

ds = ds.map(process_row)

过滤

# 过滤行
ds = ds.filter(lambda row: row["value"] > 100)

分组和聚合

# 按列分组
ds = ds.groupby("category").count()

# 自定义聚合
ds = ds.groupby("category").map_groups(lambda group: {"sum": group["value"].sum()})

GPU加速的转换

# 使用GPU进行预处理
def preprocess_images_gpu(batch):
    import torch
    images = torch.tensor(batch["image"]).cuda()
    # GPU预处理
    processed = images * 255
    return {"processed": processed.cpu().numpy()}

ds = ds.map_batches(
    preprocess_images_gpu,
    batch_size=64,
    num_gpus=1  # 请求GPU
)

写入数据

# 写入Parquet
ds.write_parquet("s3://bucket/output/")

# 写入CSV
ds.write_csv("output/")

# 写入JSON
ds.write_json("output/")

性能优化

重新分区

# 控制并行度
ds = ds.repartition(100)  # 100个块用于100核集群

批处理大小调优

# 更大的批处理 = 更快的向量化操作
ds.map_batches(process_fn, batch_size=10000)  # 对比 batch_size=100

流式执行

# 处理大于内存的数据
ds = ray.data.read_parquet("s3://huge-dataset/")
for batch in ds.iter_batches(batch_size=1000):
    process(batch)  # 流式处理,不加载到内存

常见模式

批量推断

import ray

# 加载模型
def load_model():
    # 每个工作器加载一次
    return MyModel()

# 推断函数
class BatchInference:
    def __init__(self):
        self.model = load_model()

    def __call__(self, batch):
        predictions = self.model(batch["input"])
        return {"prediction": predictions}

# 运行分布式推断
ds = ray.data.read_parquet("s3://data/")
predictions = ds.map_batches(BatchInference, batch_size=32, num_gpus=1)
predictions.write_parquet("s3://output/")

数据预处理管道

# 多步骤管道
ds = (
    ray.data.read_parquet("s3://raw/")
    .map_batches(clean_data)
    .map_batches(tokenize)
    .map_batches(augment)
    .write_parquet("s3://processed/")
)

与ML框架集成

PyTorch

# 转换为PyTorch
torch_ds = ds.to_torch(label_column="label", batch_size=32)

for batch in torch_ds:
    # batch是包含张量的字典
    inputs, labels = batch["features"], batch["label"]

TensorFlow

# 转换为TensorFlow
tf_ds = ds.to_tf(feature_columns=["image"], label_column="label", batch_size=32)

for features, labels in tf_ds:
    # 训练模型
    pass

支持的数据格式

格式 读取 写入 用例
Parquet ML数据(推荐)
CSV 表格数据
JSON 半结构化
图像 计算机视觉
NumPy 数组
Pandas 数据框

性能基准

扩展性(处理100GB数据):

  • 1个节点(16核):约30分钟
  • 4个节点(64核):约8分钟
  • 16个节点(256核):约2分钟

GPU加速(图像预处理):

  • 仅CPU:1,000 图像/秒
  • 1个GPU:5,000 图像/秒
  • 4个GPU:18,000 图像/秒

使用案例

生产部署:

  • Pinterest:模型训练的最后一英里数据处理
  • ByteDance:使用多模态LLM扩展离线推断
  • Spotify:批量推断的ML平台

参考资料

资源