
PyTorch distributed job fails when the master replica starts later than a worker replica #547

Open
meibenjin opened this issue Apr 27, 2021 · 1 comment

@meibenjin (Contributor)

Example code:

# -*- coding: utf-8 -*-
from __future__ import print_function
import math
import torch
import torchvision.transforms as transforms
from torch.distributed import get_world_size, get_rank
from torch.utils.data.sampler import Sampler
from torchvision import datasets
from tqdm import tqdm
from torch.autograd import Variable
import torch.nn as nn

import os
print('MASTER_ADDR', os.environ['MASTER_ADDR'])
print('MASTER_PORT', os.environ['MASTER_PORT'])
print('RANK', os.environ['RANK'])
print('WORLD_SIZE', os.environ['WORLD_SIZE'])


torch.distributed.init_process_group(backend='nccl')


class DistributedSampler(Sampler):
  """Sampler that restricts data loading to a subset of the dataset.
  It is especially useful in conjunction with
  :class:`torch.nn.parallel.DistributedDataParallel`. In such case, each
  process can pass a DistributedSampler instance as a DataLoader sampler,
  and load a subset of the original dataset that is exclusive to it.
  .. note::
      Dataset is assumed to be of constant size.
  Arguments:
      dataset: Dataset used for sampling.
      num_replicas (optional): Number of processes participating in
          distributed training.
      rank (optional): Rank of the current process within num_replicas.
  """

  def __init__(self, dataset, num_replicas=None, rank=None):
    if num_replicas is None:
      num_replicas = get_world_size()
    if rank is None:
      rank = get_rank()
    self.dataset = dataset
    self.num_replicas = num_replicas
    self.rank = rank
    self.epoch = 0
    self.num_samples = int(math.ceil(len(self.dataset) * 1.0 / self.num_replicas))
    self.total_size = self.num_samples * self.num_replicas

  def __iter__(self):
    # deterministically shuffle based on epoch
    g = torch.Generator()
    g.manual_seed(self.epoch)
    indices = list(torch.randperm(len(self.dataset), generator=g))

    # add extra samples to make it evenly divisible
    indices += indices[:(self.total_size - len(indices))]
    assert len(indices) == self.total_size

    # subsample
    offset = self.num_samples * self.rank
    indices = indices[offset:offset + self.num_samples]
    assert len(indices) == self.num_samples

    return iter(indices)

  def __len__(self):
    return self.num_samples

  def set_epoch(self, epoch):
    self.epoch = epoch


transform = transforms.Compose(
  [transforms.ToTensor(),
   transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

trainset = datasets.CIFAR10(
  root="./data/volume1/", train=True, download=True, transform=transform)
train_sampler = DistributedSampler(trainset)
trainloader = torch.utils.data.DataLoader(
  trainset, batch_size=32, sampler=train_sampler, num_workers=2, drop_last=True)

testset = datasets.CIFAR10(
  root="./data/volume1/", train=False, download=True, transform=transform)
testloader = torch.utils.data.DataLoader(
  testset, batch_size=4, shuffle=False, num_workers=2, drop_last=True)

classes = (
  'plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')


class Net(nn.Module):
  def __init__(self):
    super(Net, self).__init__()
    self.conv1 = nn.Conv2d(3, 6, 5)
    self.bn1 = nn.BatchNorm2d(6)
    self.pool = nn.MaxPool2d(2, 2)
    self.conv2 = nn.Conv2d(6, 16, 5)
    self.fc1 = nn.Linear(16 * 5 * 5, 1200)
    self.fc2 = nn.Linear(1200, 840)
    self.fc3 = nn.Linear(840, 10)
    self.relu = nn.ReLU()

  def forward(self, x):
    x = self.conv1(x)
    x = self.bn1(x)
    x = self.relu(x)
    x = self.pool(x)
    x = self.pool(self.relu(self.conv2(x)))
    x = x.view(-1, 16 * 5 * 5)
    x = self.relu(self.fc1(x))
    x = self.relu(self.fc2(x))
    x = self.fc3(x)
    return x


net = Net().cuda()  # GPU
# net = Net()

net = torch.nn.parallel.DistributedDataParallel(net)

import torch.optim as optim

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.01, momentum=0.9)

for epoch in range(1000):  # loop over the dataset multiple times
  trainloader.sampler.set_epoch(epoch)
  running_loss = 0.0
  for i, data in enumerate(tqdm(trainloader), 0):
    # get the inputs
    inputs, labels = data

    # wrap them in Variable
    inputs, labels = Variable(inputs.cuda()), Variable(labels.cuda())  # GPU
    # inputs, labels = Variable(inputs), Variable(labels)

    # zero the parameter gradients
    optimizer.zero_grad()

    # forward + backward + optimize
    outputs = net(inputs)
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()

    # print statistics
    running_loss += loss.item()
    if i % 200 == 199:  # print every 200 mini-batches
      print('[%d, %5d] loss: %.3f' %
            (epoch + 1, i + 1, running_loss / 200))
      running_loss = 0.0

print('Finished Training')

########################################################################
# The results seem pretty good.
#
# Let us look at how the network performs on the whole dataset.

correct = 0
total = 0
for data in testloader:
  images, labels = data
  images, labels = images.cuda(), labels.cuda()  # GPU
  outputs = net(Variable(images))
  _, predicted = torch.max(outputs.data, 1)
  total += labels.size(0)
  correct += (predicted == labels).sum().item()

print('Accuracy of the network on the 10000 test images: %d %%' % (
    100 * correct / total))

########################################################################
# That looks waaay better than chance, which is 10% accuracy (randomly picking
# a class out of 10 classes).
# Seems like the network learnt something.
#
# Hmmm, what are the classes that performed well, and the classes that did
# not perform well:

class_correct = list(0. for i in range(10))
class_total = list(0. for i in range(10))
for data in testloader:
  images, labels = data
  images, labels = images.cuda(), labels.cuda()  # GPU
  outputs = net(Variable(images))
  _, predicted = torch.max(outputs.data, 1)
  c = (predicted == labels).squeeze()
  for i in range(4):
    label = labels[i]
    class_correct[label] += c[i].item()
    class_total[label] += 1

for i in range(10):
  print('Accuracy of %5s : %2d %%' % (
    classes[i], 100 * class_correct[i] / class_total[i]))

Arena command:

arena submit pytorchjob --name=torch-simple --workers=3 --gpus=4 --image=registry.cn-beijing.aliyuncs.com/pai-dlc/pytorch-training:1.4.0PAI-gpu-py37-cu100-ubuntu16.04 --data=pai-dlc:/pai-dlc "cd /pai-dlc; python train.py"

An error occurred when the master pod started later than a worker pod:
[error screenshot]

After modifying restartPolicy to OnFailure in ~/charts/pytorchjob/templates/pytorchjob.yaml, we can work around this problem.
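
Not part of the original report: a minimal client-side mitigation sketch, assuming the worker fails because torch.distributed.init_process_group() raises while the master pod is still pending and MASTER_ADDR is not yet reachable. The helper name and retry parameters are illustrative; retrying keeps an early worker alive until the rendezvous succeeds, independent of the pod's restartPolicy.

import time
import torch.distributed as dist


def init_process_group_with_retry(backend='nccl', retries=30, wait_seconds=10):
  """Retry the env:// rendezvous until the master replica becomes reachable."""
  for attempt in range(1, retries + 1):
    try:
      # Reads MASTER_ADDR, MASTER_PORT, RANK and WORLD_SIZE from the environment.
      dist.init_process_group(backend=backend)
      return
    except Exception as exc:  # e.g. master hostname not resolvable yet
      print('init attempt %d/%d failed: %s; retrying in %ds'
            % (attempt, retries, exc, wait_seconds))
      time.sleep(wait_seconds)
  raise RuntimeError('timed out waiting for the master replica')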

Suggestions:

  1. The pytorchjob chart template should use OnFailure as the default restartPolicy for the worker replica (see the sketch below);
  2. Allow users to set a restartPolicy flag in the arena command.
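
For illustration of suggestion 1 only, a sketch of what the rendered worker spec could look like with that default, following the kubeflow.org/v1 PyTorchJob schema; the replica count and image are taken from the arena command above, and the exact fields in the chart template may differ:

apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: torch-simple
spec:
  pytorchReplicaSpecs:
    Worker:
      replicas: 3
      restartPolicy: OnFailure  # workers that start before the master get restarted instead of staying failed
      template:
        spec:
          containers:
            - name: pytorch
              image: registry.cn-beijing.aliyuncs.com/pai-dlc/pytorch-training:1.4.0PAI-gpu-py37-cu100-ubuntu16.04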
@2sin18 commented Sep 28, 2021

See kubeflow/pytorch-operator#125 for more details.

https://github.com/kubeflow/arena/blob/master/charts/pytorchjob/templates/pytorchjob.yaml#L313 Maybe changing this line to OnFailure can help resolve this issue?
