Model Training (ModelTraining)

This skill is a comprehensive guide to training machine learning models with PyTorch. It covers the complete workflow, from data preparation, training loop design, and hyperparameter optimization through experiment tracking. Keywords: machine learning, deep learning, PyTorch, model training, data augmentation, hyperparameter tuning, experiment tracking, optimization algorithms. The goal is to help developers build and tune AI models efficiently, improving both model performance and reproducibility.

Deep Learning · Updated 3/5/2026

Name: Model Training Description: A comprehensive guide to machine learning model training workflows with PyTorch, covering data preparation, training loops, hyperparameter tuning, and experiment tracking.

Model Training

Overview

Model training is the process of teaching a machine learning model to make predictions or decisions from data. This skill covers the complete training workflow: pipeline design, data preparation, training loops, hyperparameter tuning, experiment tracking, checkpoint management, early stopping, learning rate scheduling, distributed training, and model evaluation.

Prerequisites

  • Understanding of PyTorch and deep learning fundamentals
  • Knowledge of neural network architectures
  • Familiarity with data preprocessing and augmentation
  • Understanding of loss functions and optimizers
  • Basic knowledge of machine learning metrics

Key Concepts

Training Pipeline Architecture

  • Modular design: separation of data, model, optimizer, and training logic
  • Configuration management: YAML-based configuration for reproducibility
  • Checkpoint management: saving and loading model state
  • Early stopping: halting training early to prevent overfitting
  • Experiment tracking: logging metrics and hyperparameters

Data Preparation

  • Train/validation/test splits: proper data partitioning for model evaluation
  • Custom datasets: implementing the PyTorch Dataset class
  • Data loaders: efficient data loading with batching and shuffling
  • Data augmentation: increasing data diversity to improve generalization

Training Loop Patterns

  • Basic training loop: standard forward/backward passes
  • Mixed precision training: faster training with FP16
  • Gradient accumulation: simulating larger batch sizes
  • Distributed training: multi-GPU and multi-node training

Hyperparameter Tuning

  • Grid search: exhaustive search over the parameter space
  • Random search: random sampling of parameters
  • Bayesian optimization: intelligent parameter exploration with Optuna

Learning Rate Scheduling

  • StepLR: periodic learning rate decay
  • CosineAnnealingLR: cosine annealing schedule
  • ReduceLROnPlateau: metric-driven adaptive learning rate
  • OneCycleLR: one-cycle learning rate policy

Implementation Guide

Training Pipeline Design

Pipeline Architecture

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from pathlib import Path
import yaml
from typing import Dict, Any, Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class TrainingPipeline:
    """Complete training pipeline with modular components."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = None
        self.train_loader = None
        self.val_loader = None
        self.optimizer = None
        self.scheduler = None
        self.criterion = None

    def setup_data(self):
        """Set up data loaders."""
        raise NotImplementedError

    def setup_model(self):
        """Set up the model architecture."""
        raise NotImplementedError

    def setup_optimizer(self):
        """Set up the optimizer and scheduler."""
        raise NotImplementedError

    def train_epoch(self, epoch: int):
        """Train for one epoch."""
        raise NotImplementedError

    def validate(self, epoch: int):
        """Validate the model."""
        raise NotImplementedError

    def train(self):
        """Main training loop."""
        logger.info(f"Starting training on {self.device}")

        best_metric = float('inf') if self.config.get('minimize_metric', True) else 0.0
        patience_counter = 0

        for epoch in range(self.config['epochs']):
            # Train
            train_metrics = self.train_epoch(epoch)

            # Validate
            val_metrics = self.validate(epoch)

            # Log metrics
            self._log_metrics(epoch, train_metrics, val_metrics)

            # Learning rate scheduling
            if self.scheduler:
                if isinstance(self.scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
                    self.scheduler.step(val_metrics[self.config['monitor_metric']])
                else:
                    self.scheduler.step()

            # Save a checkpoint
            self._save_checkpoint(epoch, val_metrics)

            # Early stopping
            current_metric = val_metrics[self.config['monitor_metric']]
            is_better = (current_metric < best_metric) if self.config.get('minimize_metric', True) \
                       else (current_metric > best_metric)

            if is_better:
                best_metric = current_metric
                patience_counter = 0
                self._save_best_model(epoch, val_metrics)
            else:
                patience_counter += 1
                if patience_counter >= self.config.get('early_stopping_patience', 10):
                    logger.info(f"Early stopping at epoch {epoch}")
                    break

    def _log_metrics(self, epoch: int, train_metrics: Dict, val_metrics: Dict):
        """Log training and validation metrics."""
        logger.info(f"Epoch {epoch}: Train {train_metrics} | Val {val_metrics}")

    def _save_checkpoint(self, epoch: int, metrics: Dict):
        """Save a training checkpoint."""
        checkpoint = {
            'epoch': epoch,
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'scheduler_state_dict': self.scheduler.state_dict() if self.scheduler else None,
            'metrics': metrics,
            'config': self.config
        }

        path = Path(self.config['checkpoint_dir']) / f"checkpoint_epoch_{epoch}.pt"
        path.parent.mkdir(parents=True, exist_ok=True)
        torch.save(checkpoint, path)

    def _save_best_model(self, epoch: int, metrics: Dict):
        """Save the best model."""
        path = Path(self.config['checkpoint_dir']) / "best_model.pt"
        path.parent.mkdir(parents=True, exist_ok=True)

        torch.save({
            'model_state_dict': self.model.state_dict(),
            'epoch': epoch,
            'metrics': metrics
        }, path)

    def load_checkpoint(self, checkpoint_path: str):
        """Load a training checkpoint."""
        checkpoint = torch.load(checkpoint_path, map_location=self.device)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        if self.scheduler and checkpoint['scheduler_state_dict']:
            self.scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
        return checkpoint['epoch'], checkpoint['metrics']

Configuration Management

# config.yaml
model:
  name: "resnet50"
  num_classes: 10
  pretrained: true

data:
  data_dir: "./data"
  batch_size: 32
  num_workers: 4
  train_split: 0.8
  val_split: 0.1
  test_split: 0.1

training:
  epochs: 100
  learning_rate: 0.001
  weight_decay: 0.0001
  momentum: 0.9

optimizer:
  type: "Adam"
  betas: [0.9, 0.999]

scheduler:
  type: "CosineAnnealingLR"
  T_max: 100
  eta_min: 0.00001

early_stopping:
  patience: 10
  monitor_metric: "val_loss"
  minimize_metric: true

checkpoint_dir: "./checkpoints"
log_dir: "./logs"

import yaml

def load_config(config_path: str) -> Dict:
    """Load configuration from a YAML file."""
    with open(config_path, 'r') as f:
        return yaml.safe_load(f)

# Usage
config = load_config("config.yaml")
pipeline = TrainingPipeline(config)
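One caveat: TrainingPipeline reads flat keys such as config['epochs'] and config['monitor_metric'], while the YAML above nests them under training: and early_stopping:. A minimal bridging helper (a hypothetical flatten_config, assuming leaf keys are unique across sections) can collapse the nesting:

```python
def flatten_config(config: dict) -> dict:
    """Collapse nested config sections into one flat dict of leaf keys."""
    flat = {}
    for key, value in config.items():
        if isinstance(value, dict):
            flat.update(flatten_config(value))  # recurse into each section
        else:
            flat[key] = value
    return flat

nested = {
    "training": {"epochs": 100, "learning_rate": 0.001},
    "early_stopping": {"patience": 10, "monitor_metric": "val_loss"},
    "checkpoint_dir": "./checkpoints",
}
flat = flatten_config(nested)
# flat["epochs"] -> 100, flat["monitor_metric"] -> "val_loss"
```

With this, `TrainingPipeline(flatten_config(load_config("config.yaml")))` matches the key access pattern used in the class above.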

Data Preparation

Train/Validation/Test Split

from torch.utils.data import random_split, DataLoader
from sklearn.model_selection import train_test_split
import numpy as np

def create_splits(dataset, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1, seed=42):
    """Create train/validation/test splits."""
    assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-6, "Ratios must sum to 1"

    total_size = len(dataset)
    train_size = int(train_ratio * total_size)
    val_size = int(val_ratio * total_size)
    test_size = total_size - train_size - val_size

    train_dataset, val_dataset, test_dataset = random_split(
        dataset,
        [train_size, val_size, test_size],
        generator=torch.Generator().manual_seed(seed)
    )

    return train_dataset, val_dataset, test_dataset
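Because train_size and val_size are truncated with int(), the test split absorbs any rounding remainder, keeping the three sizes summing exactly to len(dataset) — which random_split requires. A quick arithmetic check with an illustrative dataset size:

```python
total = 1005
train_ratio, val_ratio = 0.8, 0.1

train_size = int(train_ratio * total)       # 804 (truncated)
val_size = int(val_ratio * total)           # 100 (truncated)
test_size = total - train_size - val_size   # 101: absorbs the rounding remainder

assert train_size + val_size + test_size == total
```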

def stratified_split(dataset, labels, train_ratio=0.8, val_ratio=0.1, seed=42):
    """Create stratified splits for classification."""
    train_indices, temp_indices = train_test_split(
        np.arange(len(dataset)),
        test_size=(1 - train_ratio),
        stratify=labels,
        random_state=seed
    )

    val_ratio_adjusted = val_ratio / (val_ratio + (1 - train_ratio - val_ratio))
    val_indices, test_indices = train_test_split(
        temp_indices,
        test_size=(1 - val_ratio_adjusted),
        stratify=[labels[i] for i in temp_indices],
        random_state=seed
    )

    return train_indices, val_indices, test_indices
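The val_ratio_adjusted step is the subtle part: the second train_test_split operates only on the pool left over after the train split, so the validation ratio must be rescaled relative to that pool. For an 80/10/10 split:

```python
train_ratio, val_ratio, test_ratio = 0.8, 0.1, 0.1

# After the first split removes the train portion, val and test share what's left.
remaining = 1 - train_ratio           # 0.2 of the data
val_share = val_ratio / remaining     # 0.5 of the remaining pool
# So test_size=(1 - val_share) sends half of the remaining 20% to test.

assert abs(val_share - 0.5) < 1e-9
```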

Custom Datasets

from torch.utils.data import Dataset
from PIL import Image
import json
from pathlib import Path

class CustomImageDataset(Dataset):
    """Custom image dataset."""

    def __init__(self, data_dir, transform=None, split='train'):
        self.data_dir = Path(data_dir)
        self.transform = transform
        self.split = split

        # Load annotations
        annotation_file = self.data_dir / f"{split}_annotations.json"
        with open(annotation_file, 'r') as f:
            self.annotations = json.load(f)

        self.image_paths = list(self.annotations.keys())

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        # Load the image
        image_path = self.data_dir / self.split / "images" / self.image_paths[idx]
        image = Image.open(image_path).convert('RGB')

        # Get the label
        label = self.annotations[self.image_paths[idx]]['label']

        # Apply transforms
        if self.transform:
            image = self.transform(image)

        return image, label

class CustomTextDataset(Dataset):
    """Custom text dataset."""

    def __init__(self, texts, labels, tokenizer, max_length=512):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = str(self.texts[idx])
        label = self.labels[idx]

        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(label, dtype=torch.long)
        }

Data Loaders

from torchvision import transforms

class TransformSubset(Dataset):
    """Wrap a split so each one can apply its own transform.

    Setting `.dataset.transform` on the subsets returned by random_split would
    mutate the shared underlying dataset, so the last assignment would win for
    all three splits.
    """

    def __init__(self, subset, transform=None):
        self.subset = subset
        self.transform = transform

    def __len__(self):
        return len(self.subset)

    def __getitem__(self, idx):
        image, label = self.subset[idx]
        if self.transform:
            image = self.transform(image)
        return image, label

def create_data_loaders(config):
    """Create data loaders with transforms."""

    # Define transforms
    train_transform = transforms.Compose([
        transforms.Resize((config['data']['image_size'], config['data']['image_size'])),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(degrees=15),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    val_transform = transforms.Compose([
        transforms.Resize((config['data']['image_size'], config['data']['image_size'])),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    # Create the dataset without a transform; each split applies its own below
    full_dataset = CustomImageDataset(
        config['data']['data_dir'],
        transform=None
    )

    train_split, val_split, test_split = create_splits(full_dataset)

    # Apply per-split transforms
    train_dataset = TransformSubset(train_split, train_transform)
    val_dataset = TransformSubset(val_split, val_transform)
    test_dataset = TransformSubset(test_split, val_transform)

    # Create data loaders
    train_loader = DataLoader(
        train_dataset,
        batch_size=config['data']['batch_size'],
        shuffle=True,
        num_workers=config['data']['num_workers'],
        pin_memory=True,
        drop_last=True
    )

    val_loader = DataLoader(
        val_dataset,
        batch_size=config['data']['batch_size'],
        shuffle=False,
        num_workers=config['data']['num_workers'],
        pin_memory=True
    )

    test_loader = DataLoader(
        test_dataset,
        batch_size=config['data']['batch_size'],
        shuffle=False,
        num_workers=config['data']['num_workers'],
        pin_memory=True
    )

    return train_loader, val_loader, test_loader

Data Augmentation

from torchvision import transforms
import albumentations as A
from albumentations.pytorch import ToTensorV2

# torchvision transforms
advanced_transforms = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(30),
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.9, 1.1)),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.1),
    transforms.RandomGrayscale(p=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Albumentations transforms
albumentations_transform = A.Compose([
    A.Resize(256, 256),
    A.RandomCrop(224, 224),
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.2),
    A.Rotate(limit=30, p=0.5),
    A.OneOf([
        A.GaussNoise(p=1.0),
        A.ISONoise(p=1.0),
    ], p=0.2),
    A.OneOf([
        A.MotionBlur(p=1.0),
        A.MedianBlur(p=1.0),
        A.GaussianBlur(p=1.0),
    ], p=0.2),
    A.ShiftScaleRotate(shift_limit=0.1, scale_limit=0.1, rotate_limit=15, p=0.5),
    A.RandomBrightnessContrast(p=0.5),
    A.HueSaturationValue(p=0.3),
    A.CoarseDropout(max_holes=8, max_height=16, max_width=16, p=0.3),  # replaces A.Cutout, removed in newer albumentations
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ToTensorV2(),
])

class AlbumentationsDataset(Dataset):
    """Dataset using Albumentations transforms (expects NumPy array images)."""

    def __init__(self, images, labels, transform=None):
        self.images = images
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image=image)['image']

        return image, label

Training Loop Patterns

Basic Training Loop

import torch
import torch.nn as nn
from tqdm import tqdm

def train_epoch(model, train_loader, criterion, optimizer, device):
    """Train for one epoch."""
    model.train()
    total_loss = 0.0
    correct = 0
    total = 0

    pbar = tqdm(train_loader, desc="Training")
    for batch_idx, (inputs, targets) in enumerate(pbar):
        inputs, targets = inputs.to(device), targets.to(device)

        # Forward pass
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        # Backward pass
        loss.backward()
        optimizer.step()

        # Metrics
        total_loss += loss.item()
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

        # Update the progress bar
        pbar.set_postfix({
            'loss': total_loss / (batch_idx + 1),
            'acc': 100. * correct / total
        })

    avg_loss = total_loss / len(train_loader)
    accuracy = 100. * correct / total

    return {'loss': avg_loss, 'accuracy': accuracy}

Validation Loop

def validate(model, val_loader, criterion, device):
    """Validate the model."""
    model.eval()
    total_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, targets in tqdm(val_loader, desc="Validation"):
            inputs, targets = inputs.to(device), targets.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, targets)

            total_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

    avg_loss = total_loss / len(val_loader)
    accuracy = 100. * correct / total

    return {'loss': avg_loss, 'accuracy': accuracy}

Mixed Precision Training

from torch.cuda.amp import autocast, GradScaler

def train_epoch_amp(model, train_loader, criterion, optimizer, device, scaler=None):
    """Train with automatic mixed precision."""
    model.train()
    total_loss = 0.0
    correct = 0
    total = 0

    if scaler is None:
        scaler = GradScaler()

    for inputs, targets in tqdm(train_loader, desc="Training (AMP)"):
        inputs, targets = inputs.to(device), targets.to(device)

        optimizer.zero_grad()

        with autocast():
            outputs = model(inputs)
            loss = criterion(outputs, targets)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        total_loss += loss.item()
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

    avg_loss = total_loss / len(train_loader)
    accuracy = 100. * correct / total

    return {'loss': avg_loss, 'accuracy': accuracy}

Training with Gradient Accumulation

def train_epoch_accumulation(model, train_loader, criterion, optimizer, device, accumulation_steps=4):
    """Train with gradient accumulation for a larger effective batch size."""
    model.train()
    total_loss = 0.0
    correct = 0
    total = 0

    optimizer.zero_grad()

    for batch_idx, (inputs, targets) in enumerate(tqdm(train_loader, desc="Training")):
        inputs, targets = inputs.to(device), targets.to(device)

        # Forward pass (loss is scaled so gradients average over the window)
        outputs = model(inputs)
        loss = criterion(outputs, targets) / accumulation_steps

        # Backward pass
        loss.backward()

        # Step once every accumulation_steps batches
        if (batch_idx + 1) % accumulation_steps == 0:
            optimizer.step()
            optimizer.zero_grad()

        # Metrics
        total_loss += loss.item() * accumulation_steps
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

    # Flush gradients from any leftover partial accumulation
    if (batch_idx + 1) % accumulation_steps != 0:
        optimizer.step()
        optimizer.zero_grad()

    avg_loss = total_loss / len(train_loader)
    accuracy = 100. * correct / total

    return {'loss': avg_loss, 'accuracy': accuracy}
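The loop above calls optimizer.step() once per accumulation_steps batches, plus one final flush when the batch count is not an exact multiple. A small helper (hypothetical, for illustration) makes the resulting step count explicit:

```python
def optimizer_steps(num_batches: int, accumulation_steps: int) -> int:
    """Count the optimizer steps taken by the accumulation loop above."""
    full_groups = num_batches // accumulation_steps
    flush = 1 if num_batches % accumulation_steps != 0 else 0  # trailing partial group
    return full_groups + flush

# 10 batches with accumulation_steps=4: steps after batches 4 and 8, then a flush
assert optimizer_steps(10, 4) == 3
assert optimizer_steps(8, 4) == 2   # exact multiple, no flush needed
```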

Hyperparameter Tuning

Grid Search

import itertools
from copy import deepcopy

def grid_search(model_class, train_loader, val_loader, param_grid, device):
    """Perform grid search over hyperparameters."""
    results = []

    # Generate all combinations
    keys, values = zip(*param_grid.items())
    combinations = [dict(zip(keys, v)) for v in itertools.product(*values)]

    for params in combinations:
        print(f"\nTesting params: {params}")

        # Create a model with the current parameters (learning_rate goes to the optimizer)
        model_params = {k: v for k, v in params.items() if k != 'learning_rate'}
        model = model_class(**model_params)
        model = model.to(device)

        optimizer = torch.optim.Adam(
            model.parameters(),
            lr=params.get('learning_rate', 0.001)
        )

        criterion = nn.CrossEntropyLoss()

        # Train for a few epochs
        val_acc = 0
        for epoch in range(5):  # quick evaluation
            train_metrics = train_epoch(model, train_loader, criterion, optimizer, device)
            val_metrics = validate(model, val_loader, criterion, device)
            val_acc = val_metrics['accuracy']

        results.append({
            'params': params,
            'val_accuracy': val_acc
        })

    # Sort by validation accuracy
    results.sort(key=lambda x: x['val_accuracy'], reverse=True)

    return results

# Usage
param_grid = {
    'learning_rate': [0.001, 0.0001, 0.00001],
    'dropout': [0.1, 0.3, 0.5],
    'hidden_size': [128, 256, 512]
}

results = grid_search(MyModel, train_loader, val_loader, param_grid, device)
print(f"Best params: {results[0]['params']}")
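Grid search cost grows multiplicatively with every added hyperparameter. The same key/value expansion used inside grid_search shows the trial count for the grid above:

```python
import itertools

param_grid = {
    'learning_rate': [0.001, 0.0001, 0.00001],
    'dropout': [0.1, 0.3, 0.5],
    'hidden_size': [128, 256, 512],
}

# Same expansion as grid_search: the Cartesian product of all value lists
keys, values = zip(*param_grid.items())
combinations = [dict(zip(keys, v)) for v in itertools.product(*values)]

assert len(combinations) == 3 * 3 * 3  # 27 training runs for just three knobs
```

Adding a fourth hyperparameter with three values would already push this to 81 runs, which is why random search and Bayesian optimization below scale better.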

Random Search

import random

def random_search(model_class, train_loader, val_loader, param_ranges, n_iterations=20, device='cuda'):
    """Perform random search over hyperparameters."""
    results = []

    for i in range(n_iterations):
        # Sample random parameters
        params = {}
        for key, (min_val, max_val, is_log) in param_ranges.items():
            if is_log:
                params[key] = 10 ** random.uniform(min_val, max_val)
            else:
                params[key] = random.uniform(min_val, max_val)
        params['hidden_size'] = int(params['hidden_size'])  # integer-valued hyperparameter

        print(f"\nIteration {i+1}/{n_iterations}: {params}")

        # Train and evaluate (learning_rate goes to the optimizer, not the model)
        model_params = {k: v for k, v in params.items() if k != 'learning_rate'}
        model = model_class(**model_params)
        model = model.to(device)

        optimizer = torch.optim.Adam(model.parameters(), lr=params['learning_rate'])
        criterion = nn.CrossEntropyLoss()

        val_acc = 0
        for epoch in range(5):
            train_metrics = train_epoch(model, train_loader, criterion, optimizer, device)
            val_metrics = validate(model, val_loader, criterion, device)
            val_acc = val_metrics['accuracy']

        results.append({
            'params': params,
            'val_accuracy': val_acc
        })

    results.sort(key=lambda x: x['val_accuracy'], reverse=True)
    return results

# Usage
param_ranges = {
    'learning_rate': (-4, -2, True),  # log scale: 10^-4 to 10^-2
    'dropout': (0.1, 0.5, False),
    'hidden_size': (128, 512, False)
}

results = random_search(MyModel, train_loader, val_loader, param_ranges, n_iterations=20, device=device)
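Sampling the exponent uniformly (the is_log flag) gives every decade of learning rates equal probability, whereas sampling the value itself uniformly would concentrate almost all draws near the top of the range. A quick sanity check in pure Python, seeded for reproducibility:

```python
import random

random.seed(0)  # reproducible demo
samples = [10 ** random.uniform(-4, -2) for _ in range(1000)]

# Every sample stays within the intended bounds...
assert all(1e-4 <= s <= 1e-2 for s in samples)

# ...and roughly half land below 1e-3, because each decade is equally likely.
# Uniform sampling of the value would put ~90% of draws above 1e-3.
below = sum(s < 1e-3 for s in samples)
assert 400 < below < 600
```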

Bayesian Optimization with Optuna

import optuna

def objective(trial, model_class, train_loader, val_loader, device):
    """Optuna objective function."""

    # Suggest hyperparameters
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True)
    dropout = trial.suggest_float('dropout', 0.1, 0.5)
    hidden_size = trial.suggest_categorical('hidden_size', [128, 256, 512])

    # Create the model
    model = model_class(dropout=dropout, hidden_size=hidden_size)
    model = model.to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss()

    # Train
    best_val_acc = 0
    for epoch in range(10):
        train_metrics = train_epoch(model, train_loader, criterion, optimizer, device)
        val_metrics = validate(model, val_loader, criterion, device)
        val_acc = val_metrics['accuracy']

        if val_acc > best_val_acc:
            best_val_acc = val_acc

        # Prune unpromising trials
        trial.report(val_acc, epoch)
        if trial.should_prune():
            raise optuna.TrialPruned()

    return best_val_acc

def run_optuna_study(model_class, train_loader, val_loader, device, n_trials=50):
    """Run an Optuna study."""
    study = optuna.create_study(direction='maximize')

    study.optimize(
        lambda trial: objective(trial, model_class, train_loader, val_loader, device),
        n_trials=n_trials,
        timeout=None,
        show_progress_bar=True
    )

    print("\nBest trial:")
    trial = study.best_trial
    print(f"  Value: {trial.value}")
    print(f"  Params: {trial.params}")

    return study

# Usage
study = run_optuna_study(MyModel, train_loader, val_loader, device, n_trials=50)

Experiment Tracking

MLflow Integration

import mlflow
import mlflow.pytorch
from mlflow.tracking import MlflowClient

class MLflowTracker:
    """MLflow experiment tracker."""

    def __init__(self, experiment_name, tracking_uri=None):
        if tracking_uri:
            mlflow.set_tracking_uri(tracking_uri)

        self.experiment = mlflow.set_experiment(experiment_name)
        self.run = None

    def start_run(self, run_name=None, params=None):
        """Start an MLflow run."""
        self.run = mlflow.start_run(run_name=run_name)

        if params:
            mlflow.log_params(params)

    def log_metrics(self, metrics, step=None):
        """Log metrics."""
        mlflow.log_metrics(metrics, step=step)

    def log_model(self, model, model_name="model"):
        """Log a PyTorch model."""
        mlflow.pytorch.log_model(model, model_name)

    def log_artifact(self, file_path):
        """Log an artifact."""
        mlflow.log_artifact(file_path)

    def end_run(self, status="FINISHED"):
        """End the MLflow run."""
        mlflow.end_run(status=status)

# Usage
tracker = MLflowTracker("my_experiment")

tracker.start_run(run_name="experiment_1", params={
    'learning_rate': 0.001,
    'batch_size': 32,
    'epochs': 100
})

for epoch in range(100):
    train_metrics = train_epoch(model, train_loader, criterion, optimizer, device)
    val_metrics = validate(model, val_loader, criterion, device)

    tracker.log_metrics({f"train_{k}": v for k, v in train_metrics.items()}, step=epoch)
    tracker.log_metrics({f"val_{k}": v for k, v in val_metrics.items()}, step=epoch)

tracker.log_model(model)
tracker.end_run()

Weights & Biases Integration

import wandb

class WandBTracker:
    """Weights & Biases tracker."""

    def __init__(self, project_name, config=None):
        wandb.init(project=project_name, config=config)
        self.config = wandb.config

    def log_metrics(self, metrics, step=None):
        """Log metrics."""
        wandb.log(metrics, step=step)

    def log_model(self, model, model_name="model"):
        """Log the model."""
        torch.save(model.state_dict(), f"{model_name}.pt")
        wandb.save(f"{model_name}.pt")

    def log_image(self, image, caption):
        """Log an image."""
        wandb.log({caption: wandb.Image(image)})

    def finish(self):
        """Finish the W&B run."""
        wandb.finish()

# Usage
tracker = WandBTracker("my_project", config={
    'learning_rate': 0.001,
    'batch_size': 32,
    'epochs': 100
})

for epoch in range(100):
    train_metrics = train_epoch(model, train_loader, criterion, optimizer, device)
    val_metrics = validate(model, val_loader, criterion, device)

    tracker.log_metrics({**{f"train_{k}": v for k, v in train_metrics.items()},
                        **{f"val_{k}": v for k, v in val_metrics.items()}}, step=epoch)

tracker.log_model(model)
tracker.finish()

TensorBoard Integration

from torch.utils.tensorboard import SummaryWriter
import torchvision

class TensorBoardTracker:
    """TensorBoard tracker."""

    def __init__(self, log_dir):
        self.writer = SummaryWriter(log_dir)

    def log_metrics(self, metrics, step, prefix=""):
        """Log metrics."""
        for key, value in metrics.items():
            self.writer.add_scalar(f"{prefix}/{key}", value, step)

    def log_images(self, images, step, tag="images"):
        """Log images."""
        grid = torchvision.utils.make_grid(images)
        self.writer.add_image(tag, grid, step)

    def log_model_graph(self, model, inputs):
        """Log the model graph."""
        self.writer.add_graph(model, inputs)

    def log_histograms(self, model, step):
        """Log parameter histograms."""
        for name, param in model.named_parameters():
            self.writer.add_histogram(name, param, step)

    def close(self):
        """Close the writer."""
        self.writer.close()

# Usage
tracker = TensorBoardTracker("./logs")

for epoch in range(100):
    train_metrics = train_epoch(model, train_loader, criterion, optimizer, device)
    val_metrics = validate(model, val_loader, criterion, device)

    tracker.log_metrics(train_metrics, epoch, "train")
    tracker.log_metrics(val_metrics, epoch, "val")
    tracker.log_histograms(model, epoch)

tracker.close()

Checkpoint Management

Saving and Loading Checkpoints

import os
from pathlib import Path
import shutil

class CheckpointManager:
    """Manage model checkpoints."""

    def __init__(self, checkpoint_dir, max_to_keep=5):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
        self.max_to_keep = max_to_keep
        self.checkpoints = []

    def save_checkpoint(self, model, optimizer, scheduler, epoch, metrics, filename=None):
        """Save a checkpoint."""
        if filename is None:
            filename = f"checkpoint_epoch_{epoch}.pt"

        checkpoint_path = self.checkpoint_dir / filename

        checkpoint = {
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'scheduler_state_dict': scheduler.state_dict() if scheduler else None,
            'metrics': metrics
        }

        torch.save(checkpoint, checkpoint_path)
        self.checkpoints.append(checkpoint_path)

        # Keep only the newest max_to_keep checkpoints
        if len(self.checkpoints) > self.max_to_keep:
            oldest = self.checkpoints.pop(0)
            if oldest.exists():
                oldest.unlink()

        return checkpoint_path

    def load_checkpoint(self, checkpoint_path, model, optimizer=None, scheduler=None):
        """Load a checkpoint."""
        checkpoint = torch.load(checkpoint_path, map_location='cpu')

        model.load_state_dict(checkpoint['model_state_dict'])

        if optimizer and 'optimizer_state_dict' in checkpoint:
            optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

        if scheduler and 'scheduler_state_dict' in checkpoint and checkpoint['scheduler_state_dict']:
            scheduler.load_state_dict(checkpoint['scheduler_state_dict'])

        return checkpoint['epoch'], checkpoint.get('metrics', {})

    def get_latest_checkpoint(self):
        """Get the path of the most recent checkpoint."""
        if not self.checkpoints:
            return None
        return max(self.checkpoints, key=os.path.getctime)

    def get_best_checkpoint(self, metric_name, minimize=True):
        """Get the checkpoint with the best metric."""
        best_checkpoint = None
        best_value = float('inf') if minimize else float('-inf')

        for checkpoint_path in self.checkpoints:
            checkpoint = torch.load(checkpoint_path, map_location='cpu')
            value = checkpoint['metrics'].get(metric_name)

            if value is not None:
                if (minimize and value < best_value) or (not minimize and value > best_value):
                    best_value = value
                    best_checkpoint = checkpoint_path

        return best_checkpoint
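The rotation policy in save_checkpoint keeps only the newest max_to_keep files on disk. A stub demonstration of that policy, with plain byte files standing in for torch.save so it runs without a model:

```python
import tempfile
from pathlib import Path

max_to_keep = 3
kept = []

with tempfile.TemporaryDirectory() as tmp:
    for epoch in range(5):
        path = Path(tmp) / f"checkpoint_epoch_{epoch}.pt"
        path.write_bytes(b"stub")  # stand-in for torch.save(checkpoint, path)
        kept.append(path)
        if len(kept) > max_to_keep:
            oldest = kept.pop(0)   # drop the oldest file, as save_checkpoint does
            oldest.unlink()
    names = [p.name for p in kept]

# Only the three newest checkpoints survive
assert names == ['checkpoint_epoch_2.pt', 'checkpoint_epoch_3.pt', 'checkpoint_epoch_4.pt']
```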

Early Stopping

class EarlyStopping:
    """Stop training when the validation metric stops improving."""

    def __init__(self, patience=10, min_delta=0, mode='min', verbose=True):
        """
        Args:
            patience: Number of epochs to wait before stopping
            min_delta: Minimum change that counts as an improvement
            mode: 'min' for metrics to minimize, 'max' for metrics to maximize
            verbose: Print progress messages
        """
        self.patience = patience
        self.min_delta = min_delta
        self.mode = mode
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False

    def __call__(self, metric):
        """Check whether training should stop."""
        if self.best_score is None:
            self.best_score = metric
        elif self._is_improvement(metric):
            self.best_score = metric
            self.counter = 0
        else:
            self.counter += 1
            if self.verbose:
                print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True

        return self.early_stop

    def _is_improvement(self, metric):
        """Check whether the metric improved."""
        if self.mode == 'min':
            return metric < self.best_score - self.min_delta
        else:
            return metric > self.best_score + self.min_delta

# Usage
early_stopping = EarlyStopping(patience=10, mode='min')

for epoch in range(100):
    train_metrics = train_epoch(model, train_loader, criterion, optimizer, device)
    val_metrics = validate(model, val_loader, criterion, device)

    if early_stopping(val_metrics['loss']):
        print(f'Early stopping at epoch {epoch}')
        break

Learning Rate Scheduling

Common Schedulers

import torch.optim as optim

# StepLR - decay the LR by gamma every step_size epochs
scheduler_step = optim.lr_scheduler.StepLR(
    optimizer,
    step_size=30,
    gamma=0.1
)

# ExponentialLR - decay the LR by gamma every epoch
scheduler_exp = optim.lr_scheduler.ExponentialLR(
    optimizer,
    gamma=0.95
)

# CosineAnnealingLR - cosine annealing
scheduler_cosine = optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=100,
    eta_min=1e-6
)

# ReduceLROnPlateau - reduce the LR when a metric plateaus
scheduler_plateau = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.1,
    patience=5,
    verbose=True
)

# OneCycleLR - one-cycle policy (call .step() after every batch, not every epoch)
scheduler_onecycle = optim.lr_scheduler.OneCycleLR(
    optimizer,
    max_lr=0.01,
    total_steps=1000,
    pct_start=0.3,
    anneal_strategy='cos'
)

# CosineAnnealingWarmRestarts
scheduler_warm = optim.lr_scheduler.CosineAnnealingWarmRestarts(
    optimizer,
    T_0=10,
    T_mult=2
)
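Most of these schedulers are stepped once per epoch; OneCycleLR is the exception and expects one .step() per batch (total_steps counts batches). For intuition, StepLR has a simple closed form, lr(epoch) = base_lr * gamma ** (epoch // step_size), which is easy to check by hand:

```python
def step_lr(base_lr: float, gamma: float, step_size: int, epoch: int) -> float:
    """Closed form of StepLR: multiply by gamma once every step_size epochs."""
    return base_lr * gamma ** (epoch // step_size)

assert step_lr(0.1, 0.1, 30, 0) == 0.1                  # epochs 0-29: base LR
assert abs(step_lr(0.1, 0.1, 30, 30) - 1e-2) < 1e-12    # first decay at epoch 30
assert abs(step_lr(0.1, 0.1, 30, 75) - 1e-3) < 1e-12    # two decays by epoch 75
```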

Custom Schedulers

from torch.optim.lr_scheduler import _LRScheduler
import numpy as np

class WarmupCosineScheduler(_LRScheduler):
    """Learning rate scheduler with linear warmup and cosine annealing."""

    def __init__(self, optimizer, warmup_epochs, max_epochs, min_lr=0, max_lr=None):
        self.warmup_epochs = warmup_epochs
        self.max_epochs = max_epochs
        self.min_lr = min_lr
        self.max_lr = max_lr if max_lr else optimizer.param_groups[0]['lr']
        super().__init__(optimizer)

    def get_lr(self):
        if self.last_epoch < self.warmup_epochs:
            # Linear warmup
            return [self.max_lr * (self.last_epoch + 1) / self.warmup_epochs
                    for _ in self.base_lrs]
        else:
            # Cosine annealing
            progress = (self.last_epoch - self.warmup_epochs) / (self.max_epochs - self.warmup_epochs)
            cosine_factor = 0.5 * (1 + np.cos(np.pi * progress))
            return [self.min_lr + (self.max_lr - self.min_lr) * cosine_factor
                    for _ in self.base_lrs]

# Usage
scheduler = WarmupCosineScheduler(
    optimizer,
    warmup_epochs=10,
    max_epochs=100,
    min_lr=1e-6
)

Distributed Training

DataParallel (single node, multiple GPUs)

import torch.nn as nn

# Wrap the model with DataParallel
if torch.cuda.device_count() > 1:
    print(f"Using {torch.cuda.device_count()} GPUs")
    model = nn.DataParallel(model)

model = model.to(device)

# The training loop is unchanged; when saving, use model.module.state_dict() to strip the wrapper
for epoch in range(epochs):
    train_metrics = train_epoch(model, train_loader, criterion, optimizer, device)
    val_metrics = validate(model, val_loader, criterion, device)

DistributedDataParallel (multi-node, multi-GPU)

import os
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler

def setup_distributed():
    """设置分布式训练。"""
    dist.init_process_group(backend='nccl')
    local_rank = int(os.environ['LOCAL_RANK'])
    torch.cuda.set_device(local_rank)
    return local_rank

def cleanup_distributed():
    """清理分布式训练。"""
    dist.destroy_process_group()

def create_dataloader_distributed(dataset, batch_size, num_workers, rank, world_size):
    """创建分布式数据加载器。"""
    sampler = DistributedSampler(
        dataset,
        num_replicas=world_size,
        rank=rank,
        shuffle=True
    )

    dataloader = DataLoader(
        dataset,
        batch_size=batch_size,
        sampler=sampler,
        num_workers=num_workers,
        pin_memory=True
    )

    return dataloader, sampler

# Training script
def train_distributed(rank, world_size, config):
    """Distributed training entry point."""
    local_rank = setup_distributed()
    device = torch.device(f"cuda:{local_rank}")

    # Create model, loss, and optimizer
    model = MyModel().to(device)
    model = DDP(model, device_ids=[local_rank])
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=config.get('lr', 1e-3))  # default lr if not configured

    # Create data loaders
    train_dataset = CustomDataset(...)
    train_loader, train_sampler = create_dataloader_distributed(
        train_dataset, config['batch_size'], config['num_workers'], rank, world_size
    )

    # Training loop
    for epoch in range(config['epochs']):
        train_sampler.set_epoch(epoch)

        for batch_idx, (inputs, targets) in enumerate(train_loader):
            inputs, targets = inputs.to(device), targets.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

    cleanup_distributed()

# Launch with torchrun
# torchrun --nproc_per_node=4 train_script.py
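
Under torchrun, each worker sees its global rank in the `RANK` environment variable; logging and checkpointing are normally done only on rank 0 to avoid duplicate writes. A minimal guard (an illustrative helper, not part of the torch API):

```python
import os

def is_main_process():
    # torchrun sets RANK for every worker; default to 0 for plain single-process runs
    return int(os.environ.get("RANK", 0)) == 0

# Typical use inside the training loop:
# if is_main_process():
#     torch.save(model.module.state_dict(), "checkpoint.pt")
```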

Model Evaluation

Classification Metrics

from sklearn.metrics import (
    accuracy_score, precision_score, recall_score,
    f1_score, confusion_matrix, classification_report
)
import numpy as np

def evaluate_classification(model, dataloader, device, num_classes):
    """评估分类模型。"""
    model.eval()

    all_predictions = []
    all_targets = []
    all_probabilities = []

    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device)

            outputs = model(inputs)
            probabilities = torch.softmax(outputs, dim=1)
            _, predictions = outputs.max(1)

            all_predictions.extend(predictions.cpu().numpy())
            all_targets.extend(targets.cpu().numpy())
            all_probabilities.extend(probabilities.cpu().numpy())

    # Convert to numpy arrays
    predictions = np.array(all_predictions)
    targets = np.array(all_targets)
    probabilities = np.array(all_probabilities)

    # Compute metrics
    metrics = {
        'accuracy': accuracy_score(targets, predictions),
        'precision': precision_score(targets, predictions, average='weighted'),
        'recall': recall_score(targets, predictions, average='weighted'),
        'f1': f1_score(targets, predictions, average='weighted'),
        'confusion_matrix': confusion_matrix(targets, predictions).tolist(),
        'classification_report': classification_report(targets, predictions, output_dict=True)
    }

    return metrics, predictions, probabilities
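
The `weighted` averaging above weights each class's score by its support. What the per-class components mean can be seen in a tiny example that computes per-class recall directly, without sklearn (an illustrative sketch):

```python
import numpy as np

def per_class_recall(targets, preds, num_classes):
    # Recall for class c = fraction of true class-c samples predicted as c
    recalls = []
    for c in range(num_classes):
        mask = targets == c
        recalls.append(float((preds[mask] == c).mean()) if mask.any() else 0.0)
    return recalls

targets = np.array([0, 0, 1, 1])
preds = np.array([0, 1, 1, 1])
print(per_class_recall(targets, preds, 2))  # [0.5, 1.0]
```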

Object Detection Metrics

from collections import defaultdict
import numpy as np

def calculate_iou(box1, box2):
    """计算两个框之间的IoU。"""
    x1 = max(box1[0], box2[0])
    y1 = max(box1[1], box2[1])
    x2 = min(box1[2], box2[2])
    y2 = min(box1[3], box2[3])

    intersection = max(0, x2 - x1) * max(0, y2 - y1)

    area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
    area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])

    union = area1 + area2 - intersection

    return intersection / union if union > 0 else 0
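
A quick worked example: boxes (0, 0, 2, 2) and (1, 1, 3, 3) overlap in a 1×1 square, and each has area 4, so IoU = 1 / (4 + 4 − 1) = 1/7 ≈ 0.143. The same computation as `calculate_iou` above, self-contained:

```python
def iou(box1, box2):
    # Boxes in (x1, y1, x2, y2) format, mirroring calculate_iou above
    x1, y1 = max(box1[0], box2[0]), max(box1[1], box2[1])
    x2, y2 = min(box1[2], box2[2]), min(box1[3], box2[3])
    inter = max(0, x2 - x1) * max(0, y2 - y1)
    area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
    area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
    union = area1 + area2 - inter
    return inter / union if union > 0 else 0

print(round(iou((0, 0, 2, 2), (1, 1, 3, 3)), 3))  # 0.143
```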

def calculate_ap(predictions, targets, iou_threshold=0.5, num_classes=80):
    """计算对象检测的平均精度。"""
    ap_per_class = []

    for class_id in range(num_classes):
        # Filter predictions and targets for this class
        class_preds = [p for p in predictions if p['class_id'] == class_id]
        class_targets = [t for t in targets if t['class_id'] == class_id]

        if len(class_targets) == 0:
            continue

        # Sort predictions by confidence
        class_preds.sort(key=lambda x: x['confidence'], reverse=True)

        # Count TPs and FPs
        tp = np.zeros(len(class_preds))
        fp = np.zeros(len(class_preds))
        matched_targets = set()

        for i, pred in enumerate(class_preds):
            best_iou = 0
            best_target_idx = -1

            for j, target in enumerate(class_targets):
                if j in matched_targets:
                    continue

                iou = calculate_iou(pred['bbox'], target['bbox'])
                if iou > best_iou:
                    best_iou = iou
                    best_target_idx = j

            if best_iou >= iou_threshold:
                tp[i] = 1
                matched_targets.add(best_target_idx)
            else:
                fp[i] = 1

        # Compute precision and recall
        tp_cumsum = np.cumsum(tp)
        fp_cumsum = np.cumsum(fp)
        recalls = tp_cumsum / len(class_targets)
        precisions = tp_cumsum / (tp_cumsum + fp_cumsum + 1e-10)

        # Compute AP with 11-point interpolation
        ap = 0
        for t in np.arange(0, 1.1, 0.1):
            mask = recalls >= t
            if np.any(mask):
                p = np.max(precisions[mask])
                ap += p / 11

        ap_per_class.append(ap)

    # Compute mAP
    mAP = np.mean(ap_per_class) if ap_per_class else 0

    return mAP, ap_per_class
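
The 11-point interpolation step can be checked on a tiny example: with precisions [1.0, 0.5, 2/3] at recalls [0.5, 0.5, 1.0], six thresholds (0.0 through 0.5) pick max precision 1.0 and the remaining five pick 2/3, so AP = (6·1.0 + 5·2/3)/11 ≈ 0.848. A NumPy sketch of the same loop used in `calculate_ap`:

```python
import numpy as np

def eleven_point_ap(precisions, recalls):
    # For each recall threshold t in {0.0, 0.1, ..., 1.0}, take the max precision
    # among points with recall >= t, then average over the 11 thresholds
    ap = 0.0
    for t in np.arange(0, 1.1, 0.1):
        mask = recalls >= t
        if np.any(mask):
            ap += np.max(precisions[mask]) / 11
    return ap

precisions = np.array([1.0, 0.5, 2 / 3])
recalls = np.array([0.5, 0.5, 1.0])
print(round(eleven_point_ap(precisions, recalls), 3))  # 0.848
```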

Best Practices

Training Tips

  1. Set random seeds for reproducibility

    def set_seed(seed=42):
        import random
        random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        np.random.seed(seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False
    
  2. Use gradient clipping

    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
    
  3. Use a learning rate finder

    def find_lr(model, train_loader, criterion, device, init_lr=1e-7, final_lr=10, num_iter=100):
        """查找最优学习率。"""
        model.train()
        optimizer = torch.optim.Adam(model.parameters(), lr=init_lr)
        gamma = (final_lr / init_lr) ** (1 / num_iter)
    
        lrs = []
        losses = []
    
        for i, (inputs, targets) in enumerate(train_loader):
            if i >= num_iter:
                break
    
            inputs, targets = inputs.to(device), targets.to(device)
    
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
    
            lrs.append(optimizer.param_groups[0]['lr'])
            losses.append(loss.item())
    
            optimizer.param_groups[0]['lr'] *= gamma
    
        return lrs, losses
    
  4. Checkpoint the model on the best validation metric

    best_val_loss = float('inf')
    for epoch in range(epochs):
        val_loss = validate(model, val_loader, criterion, device)['loss']
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_model.pt')
    
  5. Use gradient accumulation for a larger effective batch size

    accumulation_steps = 4
    for i, (inputs, targets) in enumerate(train_loader):
        loss = criterion(model(inputs), targets) / accumulation_steps
        loss.backward()
    
        if (i + 1) % accumulation_steps == 0:
            optimizer.step()
            optimizer.zero_grad()
    

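The learning rate finder in tip 3 multiplies the learning rate by a fixed factor `gamma` each step, so after `num_iter` steps it lands exactly on `final_lr`; the loss-vs-lr curve is then read on a log axis. The sweep itself can be checked in plain Python, no model required (an illustrative sketch):

```python
def lr_sweep(init_lr=1e-7, final_lr=10.0, num_iter=100):
    # Exponential sweep used by find_lr: lr_i = init_lr * gamma**i
    gamma = (final_lr / init_lr) ** (1 / num_iter)
    return [init_lr * gamma ** i for i in range(num_iter + 1)]

lrs = lr_sweep()
# lrs[0] == 1e-7 and lrs[-1] equals final_lr up to floating-point error
```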
Debugging Tips

  1. Overfit a single batch

    def overfit_single_batch(model, train_loader, criterion, optimizer, device, epochs=100):
        """过拟合单个批次以验证模型。"""
        model.train()
    
        inputs, targets = next(iter(train_loader))
        inputs, targets = inputs.to(device), targets.to(device)
    
        for epoch in range(epochs):
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
    
            if epoch % 10 == 0:
                print(f"Epoch {epoch}, Loss: {loss.item():.4f}")
    
  2. Check for NaNs in gradients

    def check_nan_gradients(model):
        """检查梯度中的NaN。"""
        for name, param in model.named_parameters():
            if param.grad is not None:
                if torch.isnan(param.grad).any():
                    print(f"NaN gradient found in {name}")
                    return True
        return False
    
  3. Monitor the gradient norm

    def get_gradient_norm(model):
        """计算梯度范数。"""
        total_norm = 0
        for p in model.parameters():
            if p.grad is not None:
                param_norm = p.grad.data.norm(2)
                total_norm += param_norm.item() ** 2
        total_norm = total_norm ** 0.5
        return total_norm
    
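`get_gradient_norm` above sums the squared per-parameter L2 norms and takes a square root, which equals the L2 norm of all gradients concatenated into one vector. A standalone NumPy check of that identity:

```python
import numpy as np

def total_grad_norm(grads):
    # sqrt of the sum of squared per-tensor L2 norms == global L2 norm
    return float(np.sqrt(sum(float(np.sum(g ** 2)) for g in grads)))

print(total_grad_norm([np.array([3.0]), np.array([4.0])]))  # 5.0
```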

Common Pitfalls

  1. Forgetting to call model.eval() during validation

    # Wrong:
    for inputs, targets in val_loader:
        outputs = model(inputs)  # the model is still in training mode!
    
    # Correct:
    model.eval()
    with torch.no_grad():
        for inputs, targets in val_loader:
            outputs = model(inputs)
    
  2. Using .to(device) inconsistently

    # Wrong:
    inputs = inputs.to(device)
    outputs = model(inputs)  # the model may still be on the CPU!
    
    # Correct:
    model = model.to(device)
    inputs, targets = inputs.to(device), targets.to(device)
    outputs = model(inputs)
    
  3. Applying Softmax before CrossEntropyLoss

    # Wrong:
    outputs = model(inputs)
    outputs = torch.softmax(outputs, dim=1)
    loss = criterion(outputs, targets)  # CrossEntropyLoss applies log-softmax again!
    
    # Correct:
    outputs = model(inputs)
    loss = criterion(outputs, targets)  # CrossEntropyLoss includes LogSoftmax
    
  4. Not shuffling the training data

    # Wrong:
    train_loader = DataLoader(dataset, batch_size=32, shuffle=False)
    
    # Correct:
    train_loader = DataLoader(dataset, batch_size=32, shuffle=True)
    
  5. Tuning hyperparameters on the test set

    # Wrong:
    # tuning hyperparameters on the test set overfits to it

    # Correct:
    # tune on the validation set and keep the test set held out
    
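Pitfall 3 can be made concrete numerically: CrossEntropyLoss applies log-softmax to its input, so feeding it probabilities instead of logits computes log_softmax(softmax(x)), which flattens the distribution and yields the wrong loss. A NumPy sketch of the effect:

```python
import numpy as np

def log_softmax(x):
    # Numerically stable log-softmax
    x = x - x.max()
    return x - np.log(np.exp(x).sum())

logits = np.array([4.0, 0.0])
correct = -log_softmax(logits)[0]              # NLL of class 0 from raw logits (~0.018)
probs = np.exp(logits) / np.exp(logits).sum()  # softmax applied too early
wrong = -log_softmax(probs)[0]                 # what CrossEntropyLoss would compute (~0.323)
```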

Performance Optimization

  1. Increase num_workers to speed up data loading

    train_loader = DataLoader(
        dataset,
        batch_size=32,
        num_workers=8,  # scale with the number of CPU cores
        pin_memory=True  # speeds up host-to-GPU transfers
    )
    
  2. Use mixed-precision training

    from torch.cuda.amp import GradScaler, autocast

    scaler = GradScaler()
    optimizer.zero_grad()
    with autocast():
        outputs = model(inputs)
        loss = criterion(outputs, targets)
    scaler.scale(loss).backward()
    scaler.step(optimizer)
    scaler.update()
    
  3. Use gradient checkpointing for memory efficiency

    from torch.utils.checkpoint import checkpoint
    
    class CheckpointedModel(nn.Module):
        def forward(self, x):
            # Checkpoint memory-intensive layers; activations are recomputed in backward
            x = checkpoint(self.layer1, x, use_reentrant=False)
            x = checkpoint(self.layer2, x, use_reentrant=False)
            return x
    
  4. Handle out-of-memory errors

    # Option 1: reduce the batch size
    batch_size = 16  # instead of 32
    
    # Option 2: use gradient accumulation
    accumulation_steps = 4
    for i, (inputs, targets) in enumerate(train_loader):
        loss = criterion(model(inputs), targets) / accumulation_steps
        loss.backward()
        if (i + 1) % accumulation_steps == 0:
            optimizer.step()
            optimizer.zero_grad()
    
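When the micro-batches are equal-sized and the loss is a mean, dividing each micro-batch loss by `accumulation_steps` makes the accumulated gradient identical to the full-batch gradient. A NumPy check on a one-parameter linear model (illustrative sketch):

```python
import numpy as np

# One-parameter model: loss(w) = mean((w*x - y)^2), gradient = mean(2*x*(w*x - y))
def grad(xb, yb, w):
    return float(np.mean(2 * xb * (w * xb - yb)))

x = np.array([1.0, 2.0, 3.0, 4.0])
y = np.array([2.0, 4.0, 6.0, 8.0])
w = 0.5

full_batch = grad(x, y, w)
# Two equal micro-batches, each gradient divided by accumulation_steps=2, then summed
accumulated = grad(x[:2], y[:2], w) / 2 + grad(x[2:], y[2:], w) / 2
# full_batch == accumulated
```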

Related Skills