Distributed Training#

In this chapter, we will introduce how to efficiently use multiple GPUs for distributed training in MegEngine. Distributed training refers to the simultaneous use of GPUs on one or more machines for parallel computing. In the field of deep learning, the most common parallel computing method is performed at the data level, that is, each GPU is responsible for a part of the data and needs to run through the entire training and inference process. This method is called data parallel.

At present, the open interface of MegEngine supports the data parallel mode of single machine multi-card and multi-machine multi-card.

Single multi-card#

Single-machine multi-card is the most commonly used method, such as single-machine four-card, single-machine eight-card, enough to support us to complete most of the model training.

In this section, we will introduce in the following order:

  1. How to start a single multi-card training

  2. How to save and load models in a multi-process environment

How to start a single multi-card training#

We provide a single-machine multi-card launcher. Code example:

import numpy as np
import megengine as mge
import megengine.autodiff as ad
import megengine.distributed as dist
import megengine.optimizer as optim
from megengine.data.dataset.vision import MNIST
from megengine.data.dataloader import DataLoader
from megengine.data.sampler import SequentialSampler
from megengine import functional as F
from megengine import module as M

# pre download MNIST data
MNIST()

@dist.launcher
def main():
    rank = dist.get_rank()

    # 设置超参数
    bs = 100
    lr = 1e-6
    epochs = 5

    num_features = 784   # (28, 28, 1) Flatten -> 784
    num_classes = 10

    # 定义单层线性分类网络
    class Linear(M.Module):
        def __init__(self):
            super().__init__()
            # 初始化参数
            self.w = mge.Parameter(np.zeros((num_features, num_classes)))
            self.b = mge.Parameter(np.zeros((num_classes,)))

        def forward(self, data):
            data = f.flatten(data, 1)
            return F.matmul(data, self.w) + self.b

    # 初始化模型
    linear_cls = Linear()

    # 同步模型参数,默认全局同步,可以给bcast_list_加上group参数在指定group之间同步
    dist.bcast_list_(linear_cls.tensors())

    gm = ad.GradManager()
    gm.attach(linear_cls.parameters(), callbacks=[dist.make_allreduce_cb("sum")])
    opt = optim.SGD(linear_cls.parameters(), lr=lr)

    data = MNIST()
    sampler = SequentialSampler(data, batch_size=bs)
    data_loader = DataLoader(data, sampler=sampler)

    for epoch in range(epochs):
        total_loss = 0
        for data, label in data_loader:
            data = mge.tensor(data)
            label = mge.tensor(label)
            with gm:
                pred = linear_cls(data)
                loss = F.nn.cross_entropy(pred, label)
                gm.backward(loss)
            opt.step().clear_grad()
            loss = dist.functional.all_reduce_sum(loss) / dist.get_world_size()
            total_loss += loss.item()
        if rank == 0:
            print("epoch = {}, loss = {:.3f}".format(epoch, total_loss / len(data_loader)))

main()

# 期望结果
# epoch = 0, loss = 0.618
# epoch = 1, loss = 0.392
# epoch = 2, loss = 0.358
# epoch = 3, loss = 0.341
# epoch = 4, loss = 0.330

Compared with single-card training, the training code for single-machine multi-card is only a few lines of code different

  • @dist.launcher

  • dist.bcast_list_(linear_cls.tensors())

  • gm.attach(linear_cls.parameters(), callbacks=[dist.make_allreduce_cb(“sum”)])

Below I will explain the meaning of these sentences one by one

@dist.launcher

launcher wraps a function into a multi-process running function (by default, multiple processes are started based on the number of devices on the machine), and each process will set the default deivce according to the rank at the beginning, if it is one 8 card machine, then 8 processes will be started, the rank is 0 to 8, and the device is gpu0 to gpu7.

dist.bcast_list_(linear_cls.tensors())

bcast_list_ is used to synchronize the parameters between each process, the default is in the global scope (all computing devices) synchronization, you can set the group parameter to synchronize between specific groups

Warning

Note that the API used here is module.Module.tensors instead of module.Module.parameters, this is because not only parameters need to be synchronized, sometimes there are some statistics in the model, such as BatchNorm2d

gm.attach(linear_cls.parameters(), callbacks=[dist.make_allreduce_cb("sum")])

In the case of data parallelism, since each card is only responsible for part of the data, there will be only part of the derivative after the derivative. Register the callback function for the gradient in GradManager, and make a all_reduce_sum operation performs global summation, so that the derivatives of each computing device are synchronized to ensure the consistency of parameter updates

Note

There is special support for multi-machine training in DataLoader, and each process will be automatically assigned non-overlapping data for training, so no special processing is done in terms of data supply. If DataLoader, you need to manually assign non-overlapping data to devices of different ranks for training, as shown below

mnist_datasets = MNIST() # 下载并读取 MNIST 数据集

size = ceil(len(mnist_datasets) / num_devices) # 将所有数据划分为 num_devices 份
l = size * rank # 得到本进程负责的数据段的起始索引
r = min(size * (rank + 1), len(mnist_datasets)) # 得到本进程负责的数据段的终点索引
data, label = mnist_datasets[l:r] # 得到本进程的数据和标签
data = np.concatenate([*data]).reshape(r-l, 28, 28, 1) # data 的数据类型为 list of nparray,需要拼接起来作为模型的输入

Model saving and loading#

In MegEngine, relying on the state synchronization mechanism mentioned above, we keep the state of each process consistent, so the model can be saved and loaded easily.

For loading, we only need to load the model parameters in the main process (rank 0 process), and then call bcast_list_ to synchronize the parameters of each process to keep the state of each process consistent.

For saving, since we have inserted the callback function in the gradient calculation to accumulate the gradient of each process, the parameters after we update the parameters are still consistent and can be saved directly.

You can refer to the following sample code to achieve:

# 加载模型参数
if rank == 0:
    net.load_state_dict(checkpoint['net'])
dist.bcast_list_(net.tensors())
opt = SGD(net.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)
gm = GradManager().attach(net.parameters(), callbacks=[dist.make_allreduce_cb("sum")])

# 训练
for epoch in range(epochs):
    for data, label in data_loader:
        data = mge.tensor(data)
        label = mge.tensor(label)
        with gm:
            pred = net(data)
            loss = F.nn.cross_entropy(pred, label)
            gm.backward(loss)
        opt.step().clear_grad()

# 保存模型参数
if rank == 0:
    checkpoint = {
        'net': net.state_dict(),
        'acc': best_acc,
    }
    mge.save(checkpoint, path)

Multi-machine multi-card#

In MegEngine, we can easily modify the above single-machine multi-card code to multi-machine multi-card, just modify launcher to perform multi-machine and multi-card training. The part is the same as the stand-alone multi-card.

@dist.launcher(world_size=world_size,
               n_gpus=n_gpus,
               rank_start=rank_start,
               master_ip=master_ip,
               port=port)

Parameter meaning

parameter name

type of data

Actual meaning

world_size

int

Total number of cards used for training

n_gpus

int

The number of cards of this physical machine at runtime

rank_start

int

The starting value of the rank of this machine

master_ip

str

rank 0 the IP address of the machine

port

int

Port number used by distributed training master server

Parallel pipeline#

In MegEngine, the pipeline method is also supported for training.

The simplest pipeline parallelism is to split a model into upper and lower parts, which can be easily implemented in MegEngine.

Here is a simple example to show how to write a line of training:

import megengine as mge
import numpy as np
import megengine.module as M
import megengine.autodiff as ad
import megengine.distributed as dist
import megengine.optimizer as optim

@dist.launcher(n_gpus=2)
def main():

    rank = dist.get_rank()
    # client 用于各个 rank 之间互相通信
    client = dist.get_client()
    if rank == 0:
        layer1 = M.Linear(1, 1) # 模型上半部分

        x = mge.tensor(np.random.randn(1))
        gm = ad.GradManager()
        opt = optim.SGD(layer1.parameters(), lr=1e-3)
        gm.attach(layer1.parameters())

        with gm:
            feat = layer1(x)
            dist.functional.remote_send(feat, dest_rank=1)
            gm.backward([])
            print("layer1 grad:", layer1.weight.grad)
            opt.step().clear_grad()
    else:
        layer2 = M.Linear(1, 1) # 模型下半部分

        gm = ad.GradManager()
        opt = optim.SGD(layer2.parameters(), lr=1e-3)
        gm.attach(layer2.parameters())

        with gm:
            feat = dist.functional.remote_recv(src_rank=0)
            loss = layer2(feat)
            gm.backward(loss)
            print("layer2 grad:", layer2.weight.grad)
            opt.step().clear_grad()

main()

# 期望输出
# layer2 grad: Tensor([[-2.4756]], device=gpu1:0)
# layer1 grad: Tensor([[-0.7784]], device=gpu0:0)

common problem#

Q:why normal multi-machine card before the start of training, after training to enter the multi-card on the error `` cuda init error``?

A:Make sure that the primary process is not carried out cuda related operations before entering the multi-machine card training, cuda were fork action causes the process fork in cuda is not available, refer to here in the initialized state <https://stackoverflow.com/questions/22950047/cuda-initialization-error-after-fork> _ is recommended with numpy Arrays are used as input and output to use launcher-wrapped functions.

Q:do I use multiprocessing to write multi-machine and multi-card training by myself and always get stuck?

A: group_barrier before the end of the function to avoid stuck

  • In MegEngine, in order to ensure performance, the corresponding cuda kernel will be executed asynchronously, so when the python code is executed, the corresponding kernel execution has not yet ended.

  • In order to ensure that the kernel is all executed, MegEngine registered global synchronization in atexit when it was initialized, but the default fork mode of multiprocess will not execute atexit registration when the process exits. The function of the kernel has not finished execution.

  • If there are operators that need to communicate between processes, and several processes exit early, the remaining processes will wait for other processes to cause stuck (if you need to take the value of a parameter for a process such as rank0).