概述
基于pytorch自身的分布式训练,通过调用nccl等backend实现多机多卡并行训练。
面向 DDP手动启动
的实现
启动方式
python main.py
面向 torchrun
的实现
启动方式
torchrun main.py
重要环境变量参数说明
变量名 | 定义及说明 |
---|---|
LOCAL_RANK | 本地排名 |
RANK | 全局排名 |
GROUP_RANK | worker 组的排名。一个介于 0 和 max_nnodes 之间的数字。当每个节点运行一个 worker 组时,这是节点的排名 |
ROLE_RANK | 所有具有相同角色的 worker 中的 worker 的排名。worker 的角色在 WorkerSpec 中指定 |
LOCAL_WORLD_SIZE | 本地世界大小(例如本地运行的 worker 数);等于在 torchrun 上指定的 --nproc-per-node |
WORLD_SIZE | 世界大小(作业中 worker 的总数) |
ROLE_WORLD_SIZE | 使用在 WorkerSpec 中指定的相同角色启动的 worker 的总数 |
MASTER_ADDR | 运行排名为 0 的 worker 的主机的 FQDN;用于初始化 Torch 分布式后端 |
MASTER_PORT | MASTER_ADDR 上的端口,可用于托管 C10d TCP 存储 |
TORCHELASTIC_RESTART_COUNT | 到目前为止的 worker 组重启次数 |
TORCHELASTIC_MAX_RESTARTS | 配置的最大重启次数 |
TORCHELASTIC_RUN_ID | 等于 rendezvous run_id(例如唯一的作业 ID) |
PYTHON_EXEC | 系统可执行文件覆盖。如果提供,python 用户脚本将使用 PYTHON_EXEC 的值作为可执行文件。默认情况下使用 sys.executable |
rank问题
端口问题
个人模版
参考:
Pytorch - DDP教程 Pytorch - torchrun 弹性启动
import argparse
import logging
import os
import sys
from importlib import reload
import datasets
import torch
import torch.distributed as dist
import torch.nn as nn
from einops import rearrange
from torch.distributed.elastic.multiprocessing.errors import record
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.optim.adamw import AdamW
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.distributed import DistributedSampler
from torchvision import transforms
class CustomModel(nn.Module):
def __init__(self, dim, heads=4, dim_head=32):
super().__init__()
self.heads = heads
hidden_dim = dim_head * heads
self.to_qkv = nn.Conv2d(dim, hidden_dim * 3, 1, bias=False)
self.to_out = nn.Conv2d(hidden_dim, dim, 1)
def forward(self, x):
b, c, h, w = x.shape
qkv = self.to_qkv(x)
q, k, v = rearrange(
qkv, "b (qkv heads c) h w -> qkv b heads c (h w)", heads=self.heads, qkv=3
)
k = k.softmax(dim=-1)
context = torch.einsum("bhdn,bhen->bhde", k, v)
out = torch.einsum("bhde,bhdn->bhen", context, q)
out = rearrange(
out, "b heads c (h w) -> b (heads c) h w", heads=self.heads, h=h, w=w
)
return self.to_out(out)
class ToyModel(nn.Module):
def __init__(self):
super(ToyModel, self).__init__()
self.net1 = nn.Linear(10, 10)
self.relu = nn.ReLU()
self.net2 = nn.Linear(10, 5)
def forward(self, x):
return self.net2(self.relu(self.net1(x)))
@record
def main(custom_args):
dist.init_process_group(backend="nccl")
rank = dist.get_rank()
print(f"Start running basic DDP example on rank {rank}.")
# create model and move it to GPU with id rank
gpu = rank % torch.cuda.device_count()
world_size = dist.get_world_size()
torch.cuda.set_device(gpu)
torch.backends.cudnn.benchmark = True
# ...
dataset = Dataset()
sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank) if (world_size > 1) else None
dataloader = DataLoader(
dataset,
sampler=sampler
)
rank_iter = iter(dataloader)
model=ToyModel().cuda()
ddp_model = DDP(model, device_ids=[gpu])
loss_fn = nn.MSELoss()
optimizer = AdamW(ddp_model.parameters())
optimizer.zero_grad()
# train
# batch = next(rank_iter)
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(gpu)
loss = loss_fn(outputs, labels)
loss.backward()
optimizer.step()
# A step finished
# loss = dist.all_reduce(loss, op=torch.distributed.SUM)
# print(loss / world_size)
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description="Argparser for configuring [code base name to think of] codebase"
)
parser.add_argument("--cfg", type=str, default="config.yaml")
args = parser.parse_args()
main(args)
单机多卡模版
多机多卡模版
从 torch.distributed.launch
到 torchrun
主要优势
- worker 故障由重新启动所有 worker 优雅地处理。
- worker 的 RANK 和 WORLD_SIZE 被自动分配。
- 允许在最小和最大大小之间更改节点数量(弹性)。
修改说明
如果您的训练脚本适用于 torch.distributed.launch,则它将继续适用于 torchrun,但存在以下差异
- 无需手动传递 RANK、WORLD_SIZE、MASTER_ADDR 和 MASTER_PORT。
- 可以提供 rdzv_backend 和 rdzv_endpoint。对于大多数用户,这将设置为 c10d(参阅 rendezvous)。默认的 rdzv_backend 会创建一个非弹性 rendezvous,其中 rdzv_endpoint 保存主地址。
- 确保您的脚本中包含 load_checkpoint(path) 和 save_checkpoint(path) 逻辑。当任意数量的工作进程失败时,我们会使用相同的程序参数重启所有工作进程,因此您将丢失到最近检查点的进度(参阅弹性启动)。
- use_env 标志已移除。如果您之前通过解析 --local-rank 选项来解析本地排名,则需要从环境变量 LOCAL_RANK 中获取本地排名(例如,int(os.environ["LOCAL_RANK"]))。
以下是一个训练脚本的说明性示例,该脚本在每个 epoch 存储 checkpoint,因此在失败时丢失进度的最坏情况是一个完整 epoch 的训练量。
def main():
args = parse_args(sys.argv[1:])
state = load_checkpoint(args.checkpoint_path)
initialize(state)
# torch.distributed.run ensures that this will work
# by exporting all the env vars needed to initialize the process group
torch.distributed.init_process_group(backend=args.backend)
for i in range(state.epoch, state.total_num_epochs)
for batch in iter(state.dataset)
train(batch, state.model)
state.epoch += 1
save_checkpoint(state)
debug推荐使用
使用record修饰器,
from torch.distributed.elastic.multiprocessing.errors import record
@record
def main():
# do train
pass
if __name__ == "__main__":
main()
多卡损失(仅值,不涉及反向传播)收集方法与对backward某些常见误解的解释
根据:https://github.com/huggingface/diffusers/pull/9482#issuecomment-2400424101
Comments | NOTHING