從 PyTorch DDP 到 Accelerate 到 Trainer,輕鬆掌握分散式訓練

語言: CN / TW / HK

概述

本教程假定你已經對於 PyToch 訓練一個簡單模型有一定的基礎理解。本教程將展示使用 3 種封裝層級不同的方法呼叫 DDP (DistributedDataParallel) 程序,在多個 GPU 上訓練同一個模型:

  • 使用 pytorch.distributed 模組的原生 PyTorch DDP 模組
  • 使用 🤗 Accelerate 對 pytorch.distributed 的輕量封裝,確保程式可以在不修改程式碼或者少量修改程式碼的情況下在單個 GPU 或 TPU 下正常執行
  • 使用 🤗 Transformer 的高階 Trainer API ,該 API 抽象封裝了所有程式碼模板並且支援不同裝置和分散式場景。

什麼是分散式訓練,為什麼它很重要?

下面是一些非常基礎的 PyTorch 訓練程式碼,它基於 Pytorch 官方在 MNIST 上建立和訓練模型的示例

```python import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchvision import datasets, transforms

class BasicNet(nn.Module): def init(self): super().init() self.conv1 = nn.Conv2d(1, 32, 3, 1) self.conv2 = nn.Conv2d(32, 64, 3, 1) self.dropout1 = nn.Dropout(0.25) self.dropout2 = nn.Dropout(0.5) self.fc1 = nn.Linear(9216, 128) self.fc2 = nn.Linear(128, 10) self.act = F.relu

def forward(self, x):
    x = self.act(self.conv1(x))
    x = self.act(self.conv2(x))
    x = F.max_pool2d(x, 2)
    x = self.dropout1(x)
    x = torch.flatten(x, 1)
    x = self.act(self.fc1(x))
    x = self.dropout2(x)
    x = self.fc2(x)
    output = F.log_softmax(x, dim=1)
    return output

```

我們定義訓練裝置 (cuda):

python device = "cuda"

構建一些基本的 PyTorch DataLoaders:

```python transform = transforms.Compose([ transforms.ToTensor(), transforms.Normalize((0.1307), (0.3081)) ])

train_dset = datasets.MNIST('data', train=True, download=True, transform=transform) test_dset = datasets.MNIST('data', train=False, transform=transform)

train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64) test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64) ```

把模型放入 CUDA 裝置:

python model = BasicNet().to(device)

構建 PyTorch optimizer (優化器)

python optimizer = optim.AdamW(model.parameters(), lr=1e-3)

最終建立一個簡單的訓練和評估迴圈,訓練迴圈會使用全部訓練資料集進行訓練,評估迴圈會計算訓練後模型在測試資料集上的準確度:

```python model.train() for batch_idx, (data, target) in enumerate(train_loader): data, target = data.to(device), target.to(device) output = model(data) loss = F.nll_loss(output, target) loss.backward() optimizer.step() optimizer.zero_grad()

model.eval() correct = 0 with torch.no_grad(): for data, target in test_loader: output = model(data) pred = output.argmax(dim=1, keepdim=True) correct += pred.eq(target.view_as(pred)).sum().item() print(f'Accuracy: {100. * correct / len(test_loader.dataset)}') ```

通常從這裡開始,就可以將所有的程式碼放入 Python 指令碼或在 Jupyter Notebook 上執行它。

然而,只執行 python myscript.py 只會使用單個 GPU 執行指令碼。如果有多個 GPU 資源可用,您將如何讓這個指令碼在兩個 GPU 或多臺機器上執行,通過分散式訓練提高訓練速度?這是 torch.distributed 發揮作用的地方。

PyTorch 分散式資料並行

顧名思義,torch.distributed 旨在配置分散式訓練。你可以使用它配置多個節點進行訓練,例如:多機器下的單個 GPU,或者單臺機器下的多個 GPU,或者兩者的任意組合。

為了將上述程式碼轉換為分散式訓練,必須首先定義一些設定配置,具體細節請參閱 DDP 使用教程

首先必須宣告 setupcleanup 函式。這將建立一個程序組,並且所有計算程序都可以通過這個程序組通訊。

注意:在本教程的這一部分中,假定這些程式碼是在 Python 指令碼檔案中啟動。稍後將討論使用 🤗 Accelerate 的啟動器,就不必宣告 setupcleanup 函數了

```python import os import torch.distributed as dist

def setup(rank, world_size): "Sets up the process group and configuration for PyTorch Distributed Data Parallelism" os.environ["MASTER_ADDR"] = 'localhost' os.environ["MASTER_PORT"] = "12355"

# Initialize the process group
dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup(): "Cleans up the distributed environment" dist.destroy_process_group() ```

最後一個疑問是,我怎樣把我的資料和模型傳送到另一個 GPU 上?

這正是 DistributedDataParallel 模組發揮作用的地方, 它將您的模型複製到每個 GPU 上 ,並且當 loss.backward() 被呼叫進行反向傳播的時候,所有這些模型副本的梯度將被同步地平均/下降 (reduce)。這確保每個裝置在執行優化器步驟後具有相同的權重。

下面是我們的訓練設定示例,我們使用了 DistributedDataParallel 重構了訓練函式:

注意:此處的 rank 是當前 GPU 與所有其他可用 GPU 相比的總體 rank,這意味著它們的 rank 為 0 -> n-1

```python from torch.nn.parallel import DistributedDataParallel as DDP

def train(model, rank, world_size): setup(rank, world_size) model = model.to(rank) ddp_model = DDP(model, device_ids=[rank]) optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3) # Train for one epoch model.train() for batch_idx, (data, target) in enumerate(train_loader): data, target = data.to(device), target.to(device) output = model(data) loss = F.nll_loss(output, target) loss.backward() optimizer.step() optimizer.zero_grad() cleanup() ```

在上述的程式碼中需要為每個副本裝置上的模型 (因此在這裡是ddp_model的引數而不是 model 的引數) 宣告優化器,以便正確計算每個副本裝置上的梯度。

最後,要執行指令碼,PyTorch 有一個方便的 torchrun 命令列模組可以提供幫助。只需傳入它應該使用的節點數以及要執行的指令碼即可:

bash torchrun --nproc_per_nodes=2 --nnodes=1 example_script.py

上面的程式碼可以在在一臺機器上的兩個 GPU 上執行訓練指令碼,這是使用 PyTorch 只進行分散式訓練的情況 (不可以在單機單卡上執行)。

現在讓我們談談 🤗 Accelerate,一個旨在使並行化更加無縫並有助於一些最佳實踐的庫。

🤗 Accelerate

🤗 Accelerate 是一個庫,旨在無需大幅修改程式碼的情況下完成並行化。除此之外,🤗 Accelerate 附帶的資料 pipeline 還可以提高程式碼的效能。

首先,讓我們將剛剛執行的所有上述程式碼封裝到一個函式中,以幫助我們直觀地看到差異:

```python def train_ddp(rank, world_size): setup(rank, world_size) # Build DataLoaders transform = transforms.Compose([ transforms.ToTensor(), transforms.Normalize((0.1307), (0.3081)) ])

train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)

train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

# Build model
model = model.to(rank)
ddp_model = DDP(model, device_ids=[rank])

# Build optimizer
optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)

# Train for a single epoch
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
    data, target = data.to(device), target.to(device)
    output = model(data)
    loss = F.nll_loss(output, target)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()

# Evaluate
model.eval()
correct = 0
with torch.no_grad():
    for data, target in test_loader:
        data, target = data.to(device), target.to(device)
        output = model(data)
        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()
print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

```

接下來讓我們談談 🤗 Accelerate 如何便利地實現並行化的。上面的程式碼有幾個問題:

  1. 該程式碼有點低效,因為每個裝置都會建立一個 dataloader
  2. 這些程式碼只能執行在多 GPU 下,當想讓這個程式碼執行在單個 GPU 或 TPU 時,還需要額外進行一些修改。

🤗 Accelerate 通過 Accelerator 類解決上述問題。通過它,不論是單節點還是多節點,除了三行程式碼外,其餘程式碼幾乎保持不變,如下所示:

```python def train_ddp_accelerate(): accelerator = Accelerator() # Build DataLoaders transform = transforms.Compose([ transforms.ToTensor(), transforms.Normalize((0.1307), (0.3081)) ])

train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)

train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

# Build model
model = BasicModel()

# Build optimizer
optimizer = optim.AdamW(model.parameters(), lr=1e-3)

# Send everything through `accelerator.prepare`
train_loader, test_loader, model, optimizer = accelerator.prepare(
    train_loader, test_loader, model, optimizer
)

# Train for a single epoch
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
    output = model(data)
    loss = F.nll_loss(output, target)
    accelerator.backward(loss)
    optimizer.step()
    optimizer.zero_grad()

# Evaluate
model.eval()
correct = 0
with torch.no_grad():
    for data, target in test_loader:
        data, target = data.to(device), target.to(device)
        output = model(data)
        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()
print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

```

藉助 Accelerator 物件,您的 PyTorch 訓練迴圈現在已配置為可以在任何分散式情況執行。使用 Accelerator 改造後的程式碼仍然可以通過 torchrun CLI 或通過 🤗 Accelerate 自己的 CLI 介面啟動(啟動你的🤗 Accelerate 指令碼)。

因此,現在可以儘可能保持 PyTorch 原生程式碼不變的前提下,使用 🤗 Accelerate 執行分散式訓練。

早些時候有人提到 🤗 Accelerate 還可以使 DataLoaders 更高效。這是通過自定義取樣器實現的,它可以在訓練期間自動將部分批次傳送到不同的裝置,從而允許每個裝置只需要儲存資料的一部分,而不是一次將資料複製四份存入記憶體,具體取決於配置。因此,記憶體總量中只有原始資料集的一個完整副本。該資料集會拆分後分配到各個訓練節點上,從而允許在單個例項上訓練更大的資料集,而不會使記憶體爆炸

使用 notebook_launcher

之前提到您可以直接從 Jupyter Notebook 執行分散式程式碼。這來自 🤗 Accelerate 的 notebook_launcher 模組,它可以在 Jupyter Notebook 內部的程式碼啟動多 GPU 訓練。

使用它就像匯入 launcher 一樣簡單:

python from accelerate import notebook_launcher

接著傳遞我們之前宣告的訓練函式、要傳遞的任何引數以及要使用的程序數(例如 TPU 上的 8 個,或兩個 GPU 上的 2 個)。下面兩個訓練函式都可以執行,但請注意,啟動單次啟動後,例項需要重新啟動才能產生另一個:

python notebook_launcher(train_ddp, args=(), num_processes=2)

或者:

python notebook_launcher(train_accelerate_ddp, args=(), num_processes=2)

使用 🤗 Trainer

終於我們來到了最高階的 API——Hugging Face Trainer.

它涵蓋了儘可能多的訓練型別,同時仍然能夠在分散式系統上進行訓練,使用者根本不需要做任何事情。

首先我們需要匯入 🤗 Trainer:

python from transformers import Trainer

然後我們定義一些 TrainingArguments 來控制所有常用的超引數。🤗 Trainer 需要的訓練資料是字典型別的,因此需要製作自定義整理功能。

最後,我們將訓練器子類化並編寫我們自己的 compute_loss.

之後,這段程式碼也可以分散式執行,而無需修改任何訓練程式碼!

```python from transformers import Trainer, TrainingArguments

model = BasicNet()

training_args = TrainingArguments( "basic-trainer", per_device_train_batch_size=64, per_device_eval_batch_size=64, num_train_epochs=1, evaluation_strategy="epoch", remove_unused_columns=False )

def collate_fn(examples): pixel_values = torch.stack([example[0] for example in examples]) labels = torch.tensor([example[1] for example in examples]) return {"x":pixel_values, "labels":labels}

class MyTrainer(Trainer): def compute_loss(self, model, inputs, return_outputs=False): outputs = model(inputs["x"]) target = inputs["labels"] loss = F.nll_loss(outputs, target) return (loss, outputs) if return_outputs else loss

trainer = MyTrainer( model, training_args, train_dataset=train_dset, eval_dataset=test_dset, data_collator=collate_fn, ) ```

python trainer.train()

bash ***** Running training ***** Num examples = 60000 Num Epochs = 1 Instantaneous batch size per device = 64 Total train batch size (w. parallel, distributed & accumulation) = 64 Gradient Accumulation steps = 1 Total optimization steps = 938

| Epoch | 訓練損失 | 驗證損失 |--|--|--| |1|0.875700|0.282633|

與上面的 notebook_launcher 示例類似,也可以將這個過程封裝成一個訓練函式:

```python def train_trainer_ddp(): model = BasicNet()

training_args = TrainingArguments(
    "basic-trainer",
    per_device_train_batch_size=64,
    per_device_eval_batch_size=64,
    num_train_epochs=1,
    evaluation_strategy="epoch",
    remove_unused_columns=False
)

def collate_fn(examples):
    pixel_values = torch.stack([example[0] for example in examples])
    labels = torch.tensor([example[1] for example in examples])
    return {"x":pixel_values, "labels":labels}

class MyTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False):
        outputs = model(inputs["x"])
        target = inputs["labels"]
        loss = F.nll_loss(outputs, target)
        return (loss, outputs) if return_outputs else loss

trainer = MyTrainer(
    model,
    training_args,
    train_dataset=train_dset,
    eval_dataset=test_dset,
    data_collator=collate_fn,
)

trainer.train()

notebook_launcher(train_trainer_ddp, args=(), num_processes=2) ```

相關資源

  • 要了解有關 PyTorch 分散式資料並行性的更多資訊,請檢視: https://pytorch.org/docs/stable/distributed.html
  • 要了解有關 🤗 Accelerate 的更多資訊,請檢視: https://hf.co/docs/accelerate
  • 要了解有關 🤗 Transformer 的更多資訊,請檢視: https://hf.co/docs/transformers

原文作者:Zachary Mueller

譯者:innovation64 (李洋)

審校:yaoqi (胡耀淇)

排版:zhongdongy (阿東)