# Original source: https://github.com/NVIDIA/waveglow/blob/master/distributed.py
#
# Original license text:
# *****************************************************************************
#  Copyright (c) 2018, NVIDIA CORPORATION. All rights reserved.
#
#  Redistribution and use in source and binary forms, with or without
#  modification, are permitted provided that the following conditions are met:
#      * Redistributions of source code must retain the above copyright
#        notice, this list of conditions and the following disclaimer.
#      * Redistributions in binary form must reproduce the above copyright
#        notice, this list of conditions and the following disclaimer in the
#        documentation and/or other materials provided with the distribution.
#      * Neither the name of the NVIDIA CORPORATION nor the
#        names of its contributors may be used to endorse or promote products
#        derived from this software without specific prior written permission.
#
#  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
#  ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
#  WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
#  DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE FOR ANY
#  DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
#  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
#  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
#  ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
#  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
#  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# *****************************************************************************
import os

import torch
import torch.distributed as dist
from torch.autograd import Variable


def reduce_tensor(tensor, num_gpus, reduce_dst=None):
    """Sum `tensor` across ranks and divide by `num_gpus`.

    If `reduce_dst` is given, only that rank receives the reduced value;
    otherwise the averaged tensor is all-reduced to every rank.
    """
    if num_gpus <= 1:
        # Single process: nothing to reduce, pass through.
        return tensor
    rt = tensor.clone()
    if reduce_dst is not None:
        dist.reduce(rt, reduce_dst, op=dist.ReduceOp.SUM)
    else:
        dist.all_reduce(rt, op=dist.ReduceOp.SUM)
    rt /= num_gpus
    return rt


def init_distributed(rank, num_gpus, dist_backend, dist_url):
    """Initialize the default process group for multi-GPU training."""
    assert torch.cuda.is_available(), "Distributed mode requires CUDA."
    print("> initializing distributed for rank {} out of {}".format(rank, num_gpus))

    # Set cuda device so everything is done on the right GPU.
    torch.cuda.set_device(rank % torch.cuda.device_count())

    # Build the TCP init method from MASTER_ADDR/MASTER_PORT, falling back to
    # localhost:6000. Note that the backend is fixed to NCCL here; dist_backend
    # and dist_url are accepted only for interface compatibility.
    master_ip = os.getenv("MASTER_ADDR", "localhost")
    master_port = os.getenv("MASTER_PORT", "6000")
    init_method = "tcp://" + master_ip + ":" + master_port

    dist.init_process_group(
        backend="nccl", world_size=num_gpus, rank=rank, init_method=init_method
    )


def _flatten_dense_tensors(tensors):
    """Flatten dense tensors into a contiguous 1D buffer.

    Assume tensors are of same dense type.

    Since inputs are dense, the resulting tensor will be a concatenated 1D
    buffer. Element-wise operation on this buffer will be equivalent to
    operating individually.

    Arguments:
        tensors (Iterable[Tensor]): dense tensors to flatten.

    Returns:
        A contiguous 1D buffer containing input tensors.
    """
    if len(tensors) == 1:
        return tensors[0].contiguous().view(-1)
    flat = torch.cat([t.contiguous().view(-1) for t in tensors], dim=0)
    return flat


def _unflatten_dense_tensors(flat, tensors):
    """View a flat buffer using the sizes of tensors.

    Assume that tensors are of same dense type, and that flat is given by
    _flatten_dense_tensors.

    Arguments:
        flat (Tensor): flattened dense tensors to unflatten.
        tensors (Iterable[Tensor]): dense tensors whose sizes will be used to
            unflatten flat.

    Returns:
        Unflattened dense tensors with sizes same as tensors and values from
        flat.
    """
    outputs = []
    offset = 0
    for tensor in tensors:
        numel = tensor.numel()
        outputs.append(flat.narrow(0, offset, numel).view_as(tensor))
        offset += numel
    return tuple(outputs)


def apply_gradient_allreduce(module):
    """Modify an existing model to all-reduce its gradients after backward.

    Unlike DistributedDataParallel, the model is not wrapped in a new class,
    so callers do not need to go through a ``.module`` attribute.
    """
    # Only warn about half-precision gradients when the (slower) GLOO backend
    # is in use; if the backend cannot be determined, warn to be safe.
    if dist.is_available() and dist.is_initialized():
        module.warn_on_half = dist.get_backend() == dist.Backend.GLOO
    else:
        module.warn_on_half = True

    # Broadcast rank-0 parameters and buffers so all ranks start from the
    # same weights.
    for p in module.state_dict().values():
        if not torch.is_tensor(p):
            continue
        dist.broadcast(p, 0)

    def allreduce_params():
        """Bucket gradients by tensor type, flatten, all-reduce, and average."""
        if module.needs_reduction:
            module.needs_reduction = False
            buckets = {}
            for param in module.parameters():
                if param.requires_grad and param.grad is not None:
                    tp = type(param.data)
                    if tp not in buckets:
                        buckets[tp] = []
                    buckets[tp].append(param)
            if module.warn_on_half:
                if torch.cuda.HalfTensor in buckets:
                    print(
                        "WARNING: gloo dist backend for half parameters may be extremely slow."
                        " It is recommended to use the NCCL backend in this case. This currently"
                        " requires PyTorch built from top of tree master."
                    )
                    module.warn_on_half = False

            for tp in buckets:
                bucket = buckets[tp]
                grads = [param.grad.data for param in bucket]
                coalesced = _flatten_dense_tensors(grads)
                dist.all_reduce(coalesced)
                coalesced /= dist.get_world_size()
                for buf, synced in zip(
                    grads, _unflatten_dense_tensors(coalesced, grads)
                ):
                    buf.copy_(synced)

    # Each parameter's gradient hook queues allreduce_params as an autograd
    # engine callback, so the reduction runs once at the end of backward.
    for param in list(module.parameters()):

        def allreduce_hook(*unused):
            Variable._execution_engine.queue_callback(allreduce_params)

        if param.requires_grad:
            param.register_hook(allreduce_hook)
            dir(param)  # kept from the original source; has no functional effect

    def set_needs_reduction(self, input, output):
        # Mark after every forward pass that the next backward needs reduction.
        self.needs_reduction = True

    module.register_forward_hook(set_needs_reduction)
    return module
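

# -----------------------------------------------------------------------------
# Usage sketch (added for illustration; not part of the original file). It
# shows the intended call order: init_distributed -> apply_gradient_allreduce
# -> forward/backward -> reduce_tensor for logging. The tiny Linear model, the
# random batch, and the RANK/WORLD_SIZE environment variables are assumptions
# about how the surrounding training script might be launched (e.g. one
# process per GPU, with MASTER_ADDR/MASTER_PORT set by the launcher).
# -----------------------------------------------------------------------------
if __name__ == "__main__":
    rank = int(os.getenv("RANK", "0"))
    num_gpus = int(os.getenv("WORLD_SIZE", "1"))

    if num_gpus > 1:
        # dist_backend/dist_url are accepted for interface compatibility; the
        # backend is fixed to NCCL inside init_distributed (see above).
        init_distributed(rank, num_gpus, dist_backend="nccl", dist_url="env://")

    model = torch.nn.Linear(16, 1).cuda()
    if num_gpus > 1:
        # Broadcast rank-0 weights and register the gradient all-reduce hooks.
        model = apply_gradient_allreduce(model)

    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    x = torch.randn(8, 16, device="cuda")
    y = torch.randn(8, 1, device="cuda")

    loss = torch.nn.functional.mse_loss(model(x), y)  # forward hook sets needs_reduction
    optimizer.zero_grad()
    loss.backward()  # parameter hooks queue allreduce_params on the autograd engine
    optimizer.step()

    # Average the scalar loss across ranks for logging.
    reduced_loss = reduce_tensor(loss.detach(), num_gpus)
    if rank == 0:
        print("reduced loss: {:.4f}".format(reduced_loss.item()))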