RayTrain分布式训练Skill ray-train

Ray Train 是一个分布式训练框架,用于在多节点集群上编排机器学习训练任务,支持PyTorch、TensorFlow和HuggingFace,内置超参数调优、容错和弹性扩展功能,适用于大规模模型训练和分布式超参数搜索。关键词:分布式训练,机器学习,PyTorch,TensorFlow,HuggingFace,超参数调优,容错,弹性扩展,多节点,Ray Tune。

机器学习 0 次安装 0 次浏览 更新于 3/21/2026

name: ray-train description: 跨集群的分布式训练编排。将PyTorch/TensorFlow/HuggingFace从笔记本电脑扩展到数千个节点。内置使用Ray Tune的超参数调优、容错性和弹性扩展。适用于在多台机器上训练大规模模型或运行分布式超参数扫描。 version: 1.0.0 author: Orchestra Research license: MIT tags: [Ray Train, 分布式训练, 编排, Ray, 超参数调优, 容错性, 弹性扩展, 多节点, PyTorch, TensorFlow] dependencies: [ray[train], torch, transformers]

Ray Train - 分布式训练编排

快速开始

Ray Train 以最小的代码更改将机器学习训练从单个GPU扩展到多节点集群。

安装:

pip install -U "ray[train]"

基本PyTorch训练(单节点):

import ray
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
import torch
import torch.nn as nn

# 定义训练函数
def train_func(config):
    # 您的正常PyTorch代码
    model = nn.Linear(10, 1)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    # 为分布式准备(Ray处理设备放置)
    model = train.torch.prepare_model(model)

    for epoch in range(10):
        # 您的训练循环
        output = model(torch.randn(32, 10))
        loss = output.sum()
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        # 报告指标(自动记录)
        train.report({"loss": loss.item(), "epoch": epoch})

# 运行分布式训练
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=4,  # 4个GPU/工作器
        use_gpu=True
    )
)

result = trainer.fit()
print(f"最终损失: {result.metrics['loss']}")

就这样! Ray处理:

  • 分布式协调
  • GPU分配
  • 容错性
  • 检查点
  • 指标聚合

常见工作流

工作流1:扩展现有PyTorch代码

原始单GPU代码:

model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters())

for epoch in range(epochs):
    for batch in dataloader:
        loss = model(batch)
        loss.backward()
        optimizer.step()

Ray Train版本(扩展到多GPU/多节点):

from ray.train.torch import TorchTrainer
from ray import train

def train_func(config):
    model = MyModel()
    optimizer = torch.optim.Adam(model.parameters())

    # 为分布式准备(自动设备放置)
    model = train.torch.prepare_model(model)
    dataloader = train.torch.prepare_data_loader(dataloader)

    for epoch in range(epochs):
        for batch in dataloader:
            loss = model(batch)
            loss.backward()
            optimizer.step()

            # 报告指标
            train.report({"loss": loss.item()})

# 扩展到8个GPU
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=8, use_gpu=True)
)
trainer.fit()

好处: 相同代码在1个GPU或1000个GPU上运行

工作流2:HuggingFace Transformers集成

from ray.train.huggingface import TransformersTrainer
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments

def train_func(config):
    # 加载模型和分词器
    model = AutoModelForCausalLM.from_pretrained("gpt2")
    tokenizer = AutoTokenizer.from_pretrained("gpt2")

    # 训练参数(HuggingFace API)
    training_args = TrainingArguments(
        output_dir="./output",
        num_train_epochs=3,
        per_device_train_batch_size=8,
        learning_rate=2e-5,
    )

    # Ray自动处理分布式训练
    from transformers import Trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
    )

    trainer.train()

# 扩展到多节点(2个节点 × 8个GPU = 16个工作器)
trainer = TransformersTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=16,
        use_gpu=True,
        resources_per_worker={"GPU": 1}
    )
)

result = trainer.fit()

工作流3:使用Ray Tune进行超参数调优

from ray import tune
from ray.train.torch import TorchTrainer
from ray.tune.schedulers import ASHAScheduler

def train_func(config):
    # 使用配置中的超参数
    lr = config["lr"]
    batch_size = config["batch_size"]

    model = MyModel()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    model = train.torch.prepare_model(model)

    for epoch in range(10):
        # 训练循环
        loss = train_epoch(model, optimizer, batch_size)
        train.report({"loss": loss, "epoch": epoch})

# 定义搜索空间
param_space = {
    "lr": tune.loguniform(1e-5, 1e-2),
    "batch_size": tune.choice([16, 32, 64, 128])
}

# 运行20个试验,带有早期停止
tuner = tune.Tuner(
    TorchTrainer(
        train_func,
        scaling_config=ScalingConfig(num_workers=4, use_gpu=True)
    ),
    param_space=param_space,
    tune_config=tune.TuneConfig(
        num_samples=20,
        scheduler=ASHAScheduler(metric="loss", mode="min")
    )
)

results = tuner.fit()
best = results.get_best_result(metric="loss", mode="min")
print(f"最佳超参数: {best.config}")

结果: 跨集群的分布式超参数搜索

工作流4:检查点和容错性

from ray import train
from ray.train import Checkpoint

def train_func(config):
    model = MyModel()
    optimizer = torch.optim.Adam(model.parameters())

    # 尝试从检查点恢复
    checkpoint = train.get_checkpoint()
    if checkpoint:
        with checkpoint.as_directory() as checkpoint_dir:
            state = torch.load(f"{checkpoint_dir}/model.pt")
            model.load_state_dict(state["model"])
            optimizer.load_state_dict(state["optimizer"])
            start_epoch = state["epoch"]
    else:
        start_epoch = 0

    model = train.torch.prepare_model(model)

    for epoch in range(start_epoch, 100):
        loss = train_epoch(model, optimizer)

        # 每10个epoch保存检查点
        if epoch % 10 == 0:
            checkpoint = Checkpoint.from_directory(
                train.get_context().get_trial_dir()
            )
            torch.save({
                "model": model.state_dict(),
                "optimizer": optimizer.state_dict(),
                "epoch": epoch
            }, checkpoint.path / "model.pt")

            train.report({"loss": loss}, checkpoint=checkpoint)

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=8, use_gpu=True)
)

# 如果训练失败,自动从检查点恢复
result = trainer.fit()

工作流5:多节点训练

from ray.train import ScalingConfig

# 连接到Ray集群
ray.init(address="auto")  # 或 ray.init("ray://head-node:10001")

# 跨4个节点 × 8个GPU = 32个工作器训练
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=32,
        use_gpu=True,
        resources_per_worker={"GPU": 1, "CPU": 4},
        placement_strategy="SPREAD"  # 跨节点分布
    )
)

result = trainer.fit()

启动Ray集群:

# 在头节点上
ray start --head --port=6379

# 在工作节点上
ray start --address=<head-node-ip>:6379

何时使用与替代方案

使用Ray Train当:

  • 跨多台机器训练(多节点)
  • 需要大规模超参数调优
  • 想要容错性(自动重启失败的工作器)
  • 弹性扩展(训练期间添加/移除节点)
  • 统一框架(PyTorch/TF/HF的相同代码)

关键优势:

  • 多节点编排: 最简单的多节点设置
  • Ray Tune集成: 顶级的超参数调优
  • 容错性: 自动从故障恢复
  • 弹性: 无需重启即可添加/移除节点
  • 框架无关: PyTorch、TensorFlow、HuggingFace、XGBoost

使用替代方案代替:

  • Accelerate: 单节点多GPU,更简单
  • PyTorch Lightning: 高级抽象,回调
  • DeepSpeed: 最大性能,复杂设置
  • 原始DDP: 最大控制,最小开销

常见问题

问题: Ray集群未连接

检查ray状态:

ray status

# 应显示:
# - 节点: 4
# - GPU: 32
# - 工作器: 就绪

如果未连接:

# 重启头节点
ray stop
ray start --head --port=6379 --dashboard-host=0.0.0.0

# 重启工作节点
ray stop
ray start --address=<head-ip>:6379

问题: 内存不足

减少工作器或使用梯度累积:

scaling_config=ScalingConfig(
    num_workers=4,  # 从8减少
    use_gpu=True
)

# 在train_func中,累积梯度
for i, batch in enumerate(dataloader):
    loss = model(batch) / accumulation_steps
    loss.backward()

    if (i + 1) % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()

问题: 训练缓慢

检查数据加载是否为瓶颈:

import time

def train_func(config):
    for epoch in range(epochs):
        start = time.time()
        for batch in dataloader:
            data_time = time.time() - start
            # 训练...
            start = time.time()
            print(f"数据加载: {data_time:.3f}秒")

如果数据加载缓慢,增加工作器:

dataloader = DataLoader(dataset, num_workers=8)

高级主题

多节点设置: 参考references/multi-node.md了解在AWS、GCP、Kubernetes和SLURM上部署Ray集群。

超参数调优: 参考references/hyperparameter-tuning.md了解Ray Tune集成、搜索算法(Optuna、HyperOpt)和基于群体的训练。

自定义训练循环: 参考references/custom-loops.md了解高级Ray Train使用、自定义后端以及与其他框架的集成。

硬件要求

  • 单节点: 1+个GPU(或CPU)
  • 多节点: 2+台具有网络连接的机器
  • : AWS、GCP、Azure(Ray自动扩展)
  • 本地: Kubernetes、SLURM集群

支持的加速器:

  • NVIDIA GPU(CUDA)
  • AMD GPU(ROCm)
  • TPU(Google Cloud)
  • CPU

资源