---
name: ray-train
description: Distributed training orchestration across clusters. Scales PyTorch/TensorFlow/HuggingFace from a laptop to thousands of nodes. Built-in hyperparameter tuning via Ray Tune, fault tolerance, and elastic scaling. Use it to train large models across multiple machines or to run distributed hyperparameter sweeps.
version: 1.0.0
author: Orchestra Research
license: MIT
tags: [Ray Train, Distributed Training, Orchestration, Ray, Hyperparameter Tuning, Fault Tolerance, Elastic Scaling, Multi-Node, PyTorch, TensorFlow]
dependencies: [ray[train], torch, transformers]
---
Ray Train - Distributed Training Orchestration
Quick Start
Ray Train scales machine learning training from a single GPU to multi-node clusters with minimal code changes.
Installation:
pip install -U "ray[train]"
Basic PyTorch training (single node):
import ray
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
import torch
import torch.nn as nn

# Define the training function
def train_func(config):
    # Your normal PyTorch code
    model = nn.Linear(10, 1)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    # Prepare for distributed training (Ray handles device placement)
    model = train.torch.prepare_model(model)

    for epoch in range(10):
        # Your training loop
        output = model(torch.randn(32, 10))
        loss = output.sum()
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        # Report metrics (logged automatically)
        train.report({"loss": loss.item(), "epoch": epoch})

# Run distributed training
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=4,  # 4 GPU workers
        use_gpu=True,
    ),
)
result = trainer.fit()
print(f"Final loss: {result.metrics['loss']}")
That's it! Ray handles:
- Distributed coordination
- GPU allocation
- Fault tolerance
- Checkpointing
- Metrics aggregation
Common Workflows
Workflow 1: Scale Existing PyTorch Code
Original single-GPU code:
model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters())

for epoch in range(epochs):
    for batch in dataloader:
        loss = model(batch)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
Ray Train version (scales to multi-GPU / multi-node):
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_func(config):
    model = MyModel()
    optimizer = torch.optim.Adam(model.parameters())
    dataloader = build_dataloader()  # placeholder: construct your DataLoader inside train_func

    # Prepare for distributed training (automatic device placement)
    model = train.torch.prepare_model(model)
    dataloader = train.torch.prepare_data_loader(dataloader)

    for epoch in range(epochs):
        for batch in dataloader:
            loss = model(batch)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

        # Report metrics
        train.report({"loss": loss.item()})

# Scale out to 8 GPUs
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=8, use_gpu=True),
)
trainer.fit()
Benefit: the same code runs on 1 GPU or 1,000 GPUs.
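One practical note when scaling out: each worker processes its own per-worker batch, so the effective global batch size grows with num_workers, and a common heuristic (not Ray-specific) is to scale the learning rate linearly with it. The helper below is an illustrative sketch, not part of the Ray API:

```python
# Illustrative helper (not Ray API): derive the effective global batch size
# and a linearly scaled learning rate when going from 1 to N workers.
def scaled_hyperparams(per_worker_batch_size, base_lr, num_workers):
    global_batch = per_worker_batch_size * num_workers
    scaled_lr = base_lr * num_workers  # linear scaling rule (a heuristic)
    return global_batch, scaled_lr

global_batch, lr = scaled_hyperparams(per_worker_batch_size=32, base_lr=0.01, num_workers=8)
print(global_batch)  # -> 256
```

Linear LR scaling is a starting point, not a guarantee; validate it on your workload.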
Workflow 2: HuggingFace Transformers Integration
import ray.train.huggingface.transformers
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
from transformers import AutoModelForCausalLM, AutoTokenizer, Trainer, TrainingArguments

def train_func(config):
    # Load the model and tokenizer
    model = AutoModelForCausalLM.from_pretrained("gpt2")
    tokenizer = AutoTokenizer.from_pretrained("gpt2")

    # Training arguments (standard HuggingFace API)
    training_args = TrainingArguments(
        output_dir="./output",
        num_train_epochs=3,
        per_device_train_batch_size=8,
        learning_rate=2e-5,
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,  # your tokenized dataset
    )

    # Report metrics/checkpoints to Ray and set up distributed training
    trainer.add_callback(ray.train.huggingface.transformers.RayTrainReportCallback())
    trainer = ray.train.huggingface.transformers.prepare_trainer(trainer)
    trainer.train()

# Scale out to multiple nodes (2 nodes × 8 GPUs = 16 workers)
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=16,
        use_gpu=True,
        resources_per_worker={"GPU": 1},
    ),
)
result = trainer.fit()
Workflow 3: Hyperparameter Tuning with Ray Tune
from ray import train, tune
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
from ray.tune.schedulers import ASHAScheduler

def train_func(config):
    # Hyperparameters come from the trial config
    lr = config["lr"]
    batch_size = config["batch_size"]

    model = MyModel()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    model = train.torch.prepare_model(model)

    for epoch in range(10):
        # Training loop
        loss = train_epoch(model, optimizer, batch_size)
        train.report({"loss": loss, "epoch": epoch})

# Define the search space (nested under train_loop_config when tuning a Trainer)
param_space = {
    "train_loop_config": {
        "lr": tune.loguniform(1e-5, 1e-2),
        "batch_size": tune.choice([16, 32, 64, 128]),
    }
}

# Run 20 trials with early stopping
tuner = tune.Tuner(
    TorchTrainer(
        train_func,
        scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
    ),
    param_space=param_space,
    tune_config=tune.TuneConfig(
        num_samples=20,
        scheduler=ASHAScheduler(metric="loss", mode="min"),
    ),
)
results = tuner.fit()

best = results.get_best_result(metric="loss", mode="min")
print(f"Best hyperparameters: {best.config}")
Result: a distributed hyperparameter search across the cluster.
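For intuition: ASHA is built on successive halving, i.e. periodically rank trials by the reported metric, keep the best fraction, and stop the rest early. A toy pure-Python sketch of that idea (illustrative only, not Ray Tune internals):

```python
# Toy successive-halving sketch: trial_losses maps trial id -> latest loss.
# Returns the surviving trial ids at each "rung"; losers are stopped early.
def successive_halving(trial_losses, reduction_factor=2):
    survivors = sorted(trial_losses, key=trial_losses.get)  # best loss first
    rungs = [list(survivors)]
    while len(survivors) > 1:
        survivors = survivors[: max(1, len(survivors) // reduction_factor)]
        rungs.append(list(survivors))
    return rungs

rungs = successive_halving({"t1": 0.9, "t2": 0.3, "t3": 0.5, "t4": 0.7})
print(rungs[-1])  # -> ['t2'] (the trial with the lowest loss survives)
```

ASHA additionally runs these promotions asynchronously so slow trials never block the scheduler.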
Workflow 4: Checkpointing and Fault Tolerance
import os
import tempfile

from ray import train
from ray.train import Checkpoint, ScalingConfig
from ray.train.torch import TorchTrainer

def train_func(config):
    model = MyModel()
    optimizer = torch.optim.Adam(model.parameters())

    # Try to resume from a checkpoint
    checkpoint = train.get_checkpoint()
    if checkpoint:
        with checkpoint.as_directory() as checkpoint_dir:
            state = torch.load(os.path.join(checkpoint_dir, "model.pt"))
            model.load_state_dict(state["model"])
            optimizer.load_state_dict(state["optimizer"])
            start_epoch = state["epoch"] + 1
    else:
        start_epoch = 0

    model = train.torch.prepare_model(model)

    for epoch in range(start_epoch, 100):
        loss = train_epoch(model, optimizer)

        # Save a checkpoint every 10 epochs
        if epoch % 10 == 0:
            with tempfile.TemporaryDirectory() as tmpdir:
                torch.save({
                    "model": model.state_dict(),
                    "optimizer": optimizer.state_dict(),
                    "epoch": epoch,
                }, os.path.join(tmpdir, "model.pt"))
                train.report({"loss": loss},
                             checkpoint=Checkpoint.from_directory(tmpdir))
        else:
            train.report({"loss": loss})

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=8, use_gpu=True),
    # On failure, restart up to 3 times and resume from the latest checkpoint
    run_config=train.RunConfig(failure_config=train.FailureConfig(max_failures=3)),
)
result = trainer.fit()
Workflow 5: Multi-Node Training
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# Connect to the Ray cluster
ray.init(address="auto")  # or ray.init("ray://head-node:10001")

# Train across 4 nodes × 8 GPUs = 32 workers
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=32,
        use_gpu=True,
        resources_per_worker={"GPU": 1, "CPU": 4},
        placement_strategy="SPREAD",  # spread workers across nodes
    ),
)
result = trainer.fit()
Starting a Ray cluster:

# On the head node
ray start --head --port=6379

# On each worker node
ray start --address=<head-node-ip>:6379
When to Use vs. Alternatives
Use Ray Train when you:
- Train across multiple machines (multi-node)
- Need large-scale hyperparameter tuning
- Want fault tolerance (failed workers restart automatically)
- Want elastic scaling (add/remove nodes during training)
- Want a unified framework (the same code for PyTorch/TF/HF)
Key advantages:
- Multi-node orchestration: the simplest multi-node setup
- Ray Tune integration: first-class hyperparameter tuning
- Fault tolerance: automatic recovery from failures
- Elasticity: add/remove nodes without restarting
- Framework-agnostic: PyTorch, TensorFlow, HuggingFace, XGBoost
Use an alternative instead:
- Accelerate: single-node multi-GPU, simpler
- PyTorch Lightning: higher-level abstractions, callbacks
- DeepSpeed: maximum performance, complex setup
- Raw DDP: maximum control, minimal overhead
Common Issues
Issue: Ray cluster won't connect
Check the cluster status:

ray status
# Should show:
# - Nodes: 4
# - GPUs: 32
# - Workers: ready
If it is not connected:

# Restart the head node
ray stop
ray start --head --port=6379 --dashboard-host=0.0.0.0

# Restart each worker node
ray stop
ray start --address=<head-ip>:6379
Issue: out-of-memory errors
Reduce the number of workers or use gradient accumulation:

scaling_config=ScalingConfig(
    num_workers=4,  # reduced from 8
    use_gpu=True
)

# Inside train_func, accumulate gradients
for i, batch in enumerate(dataloader):
    loss = model(batch) / accumulation_steps
    loss.backward()
    if (i + 1) % accumulation_steps == 0:
        optimizer.step()
        optimizer.zero_grad()
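To pick accumulation_steps, keep the effective batch size (micro-batch × accumulation steps) equal to the batch size you trained with before reducing memory. A small illustrative helper (the names are hypothetical, not Ray API):

```python
# Hypothetical helper: choose accumulation steps so that
# micro_batch * accumulation_steps equals the original batch size.
def accumulation_steps_for(target_batch, micro_batch):
    if target_batch % micro_batch != 0:
        raise ValueError("micro_batch must evenly divide target_batch")
    return target_batch // micro_batch

print(accumulation_steps_for(target_batch=256, micro_batch=64))  # -> 4
```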
Issue: training is slow
Check whether data loading is the bottleneck:

import time

def train_func(config):
    for epoch in range(epochs):
        start = time.time()
        for batch in dataloader:
            data_time = time.time() - start
            # ... training step ...
            print(f"Data loading: {data_time:.3f}s")
            start = time.time()
If data loading is slow, increase the number of DataLoader workers:

dataloader = DataLoader(dataset, num_workers=8)
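You can also time batch fetches in isolation, without a full training run; a framework-agnostic sketch (the fake generator stands in for your real DataLoader):

```python
import time

# Measure the average seconds spent fetching each batch from an iterator.
def avg_fetch_seconds(batch_iter, n_batches=5):
    start = time.perf_counter()
    fetched = 0
    for _ in batch_iter:
        fetched += 1
        if fetched >= n_batches:
            break
    return (time.perf_counter() - start) / max(fetched, 1)

# Stand-in for a real DataLoader: yields 100 "batches" of 32 items.
fake_loader = (list(range(32)) for _ in range(100))
print(f"{avg_fetch_seconds(fake_loader):.6f}s/batch")
```

If this number approaches your per-step compute time, the GPU is starving and more DataLoader workers (or prefetching) will help.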
Advanced Topics
Multi-node setup: see references/multi-node.md for deploying Ray clusters on AWS, GCP, Kubernetes, and SLURM.
Hyperparameter tuning: see references/hyperparameter-tuning.md for Ray Tune integration, search algorithms (Optuna, HyperOpt), and population-based training.
Custom training loops: see references/custom-loops.md for advanced Ray Train usage, custom backends, and integration with other frameworks.
Hardware Requirements
- Single node: 1+ GPUs (or CPUs)
- Multi-node: 2+ machines with network connectivity
- Cloud: AWS, GCP, Azure (Ray autoscaling)
- On-prem: Kubernetes, SLURM clusters
Supported accelerators:
- NVIDIA GPUs (CUDA)
- AMD GPUs (ROCm)
- TPUs (Google Cloud)
- CPUs
Resources
- Documentation: https://docs.ray.io/en/latest/train/train.html
- GitHub: https://github.com/ray-project/ray ⭐ 36,000+
- Version: 2.40.0+
- Examples: https://docs.ray.io/en/latest/train/examples.html
- Slack: https://forms.gle/9TSdDYUgxYs8SA9e8
- Used by: OpenAI, Uber, Spotify, Shopify, Instacart