# Original source: https://github.com/NVIDIA/waveglow/blob/master/distributed.py
#
# Original license text:
# *****************************************************************************
# Copyright (c) 2018, NVIDIA CORPORATION. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#     * Neither the name of the NVIDIA CORPORATION nor the
#       names of its contributors may be used to endorse or promote products
#       derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# *****************************************************************************
import os

import torch
import torch.distributed as dist
from torch.autograd import Variable
def reduce_tensor(tensor, num_gpus, reduce_dst=None):
    """Sum ``tensor`` across ranks and divide by ``num_gpus``.

    If ``reduce_dst`` is given, only that rank receives the averaged value
    (via ``dist.reduce``); otherwise every rank does (via ``dist.all_reduce``).
    With a single GPU the tensor is returned unchanged.
    """
    if num_gpus <= 1:  # pass-through: nothing to reduce
        return tensor
    rt = tensor.clone()
    if reduce_dst is not None:
        dist.reduce(rt, reduce_dst, op=dist.ReduceOp.SUM)
    else:
        dist.all_reduce(rt, op=dist.ReduceOp.SUM)
    rt /= num_gpus
    return rt
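# Illustrative use (an assumption, not part of the original file): average a
# detached loss across ranks so every process logs the same value, e.g.
#   reduced_loss = reduce_tensor(loss.detach(), num_gpus).item()
# where ``loss`` and ``num_gpus`` are placeholders for the caller's values.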
def init_distributed(rank, num_gpus, dist_backend, dist_url):
    assert torch.cuda.is_available(), "Distributed mode requires CUDA."
    print("> initializing distributed for rank {} out of {}".format(rank, num_gpus))
    # Set the CUDA device so everything is done on the right GPU.
    torch.cuda.set_device(rank % torch.cuda.device_count())
    # The rendezvous address comes from MASTER_ADDR/MASTER_PORT (not dist_url),
    # defaulting to localhost:6000.
    master_ip = os.getenv("MASTER_ADDR", "localhost")
    master_port = os.getenv("MASTER_PORT", "6000")
    init_method = "tcp://" + master_ip + ":" + master_port
    dist.init_process_group(
        backend=dist_backend, world_size=num_gpus, rank=rank, init_method=init_method
    )
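# Launch sketch (an assumption, not part of the original file): spawn one
# process per GPU and call init_distributed from each, e.g.
#   import torch.multiprocessing as mp
#   def _worker(rank, num_gpus):
#       init_distributed(rank, num_gpus, "nccl", None)
#   mp.spawn(_worker, args=(num_gpus,), nprocs=num_gpus)
# with MASTER_ADDR/MASTER_PORT exported beforehand if localhost:6000 is not
# appropriate.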
def _flatten_dense_tensors(tensors):
    """Flatten dense tensors into a contiguous 1D buffer. Assume tensors are of
    same dense type.

    Since inputs are dense, the resulting tensor will be a concatenated 1D
    buffer. Element-wise operation on this buffer will be equivalent to
    operating individually.

    Arguments:
        tensors (Iterable[Tensor]): dense tensors to flatten.

    Returns:
        A contiguous 1D buffer containing input tensors.
    """
    if len(tensors) == 1:
        return tensors[0].contiguous().view(-1)
    flat = torch.cat([t.contiguous().view(-1) for t in tensors], dim=0)
    return flat
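# For example, flattening [torch.zeros(2, 2), torch.zeros(3)] gives one
# contiguous buffer of 4 + 3 = 7 elements, laid out in input order.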
def _unflatten_dense_tensors(flat, tensors):
    """View a flat buffer using the sizes of tensors. Assume that tensors are of
    same dense type, and that flat is given by _flatten_dense_tensors.

    Arguments:
        flat (Tensor): flattened dense tensors to unflatten.
        tensors (Iterable[Tensor]): dense tensors whose sizes will be used to
            unflatten flat.

    Returns:
        Unflattened dense tensors with sizes same as tensors and values from
        flat.
    """
    outputs = []
    offset = 0
    for tensor in tensors:
        numel = tensor.numel()
        outputs.append(flat.narrow(0, offset, numel).view_as(tensor))
        offset += numel
    return tuple(outputs)
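# Round-trip sketch (illustrative only): unflattening a flattened buffer
# recovers views with the original shapes and values, e.g.
#   ts = [torch.randn(2, 3), torch.randn(5)]
#   outs = _unflatten_dense_tensors(_flatten_dense_tensors(ts), ts)
#   assert all(o.shape == t.shape for o, t in zip(outs, ts))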
def apply_gradient_allreduce(module):
    """
    Modify an existing model to all-reduce gradients across ranks after each
    backward pass, without wrapping it in a new class, so the caller does not
    have to go through a ``.module`` attribute.
    """
    if not hasattr(dist, "_backend"):
        module.warn_on_half = True
    else:
        module.warn_on_half = dist._backend == dist.dist_backend.GLOO
    # Broadcast parameters and buffers from rank 0 so all replicas start out
    # identical.
    for p in module.state_dict().values():
        if not torch.is_tensor(p):
            continue
        dist.broadcast(p, 0)

    def allreduce_params():
        if module.needs_reduction:
            module.needs_reduction = False
            # Bucket gradients by tensor type so each bucket can be coalesced
            # into a single flat buffer and reduced with one collective call.
            buckets = {}
            for param in module.parameters():
                if param.requires_grad and param.grad is not None:
                    tp = type(param.data)
                    if tp not in buckets:
                        buckets[tp] = []
                    buckets[tp].append(param)
            if module.warn_on_half:
                if torch.cuda.HalfTensor in buckets:
                    print(
                        "WARNING: gloo dist backend for half parameters may be extremely slow."
                        " It is recommended to use the NCCL backend in this case. This currently"
                        " requires PyTorch built from top of tree master."
                    )
                    module.warn_on_half = False
            for tp in buckets:
                bucket = buckets[tp]
                grads = [param.grad.data for param in bucket]
                coalesced = _flatten_dense_tensors(grads)
                dist.all_reduce(coalesced)
                coalesced /= dist.get_world_size()
                for buf, synced in zip(
                    grads, _unflatten_dense_tensors(coalesced, grads)
                ):
                    buf.copy_(synced)

    for param in list(module.parameters()):

        def allreduce_hook(*unused):
            # Defer the all-reduce until the whole backward pass has finished.
            Variable._execution_engine.queue_callback(allreduce_params)

        if param.requires_grad:
            param.register_hook(allreduce_hook)
            dir(param)  # no-op kept from the original implementation

    def set_needs_reduction(self, input, output):
        self.needs_reduction = True

    module.register_forward_hook(set_needs_reduction)
    return module
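# Typical usage sketch (an assumption, not from the original file): after
# init_distributed, wrap the model once and train as usual; gradients are
# averaged across ranks at the end of every backward pass.
#   model = MyModel().cuda()            # MyModel is a placeholder
#   model = apply_gradient_allreduce(model)
#   ...
#   loss.backward()   # allreduce_params runs via the queued backward callback
#   optimizer.step()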