# ------------------------------------------
# Diffsound
# code based on https://github.com/cientgu/VQ-Diffusion
# ------------------------------------------
import pickle

import torch
from torch import distributed as dist
from torch.utils import data

LOCAL_PROCESS_GROUP = None

def is_primary():
    """Return True on the global rank-0 process."""
    return get_rank() == 0


def get_rank():
    """Global rank of the current process, or 0 when not distributed."""
    if not dist.is_available():
        return 0
    if not dist.is_initialized():
        return 0
    return dist.get_rank()

def get_local_rank():
    """Rank within LOCAL_PROCESS_GROUP, or 0 when not distributed."""
    if not dist.is_available():
        return 0
    if not dist.is_initialized():
        return 0
    if LOCAL_PROCESS_GROUP is None:
        raise ValueError("tensorfn.distributed.LOCAL_PROCESS_GROUP is None")
    return dist.get_rank(group=LOCAL_PROCESS_GROUP)

def synchronize():
    """Barrier across all processes; a no-op when not distributed."""
    if not dist.is_available():
        return
    if not dist.is_initialized():
        return
    world_size = dist.get_world_size()
    if world_size == 1:
        return
    dist.barrier()

def get_world_size():
    """Number of processes in the default group, or 1 when not distributed."""
    if not dist.is_available():
        return 1
    if not dist.is_initialized():
        return 1
    return dist.get_world_size()

def is_distributed():
    """True when more than one process participates in training."""
    # A leftover `raise RuntimeError('Please debug this function!')` made the
    # return below unreachable; removed so the function is usable.
    return get_world_size() > 1

def all_reduce(tensor, op=dist.ReduceOp.SUM, async_op=False):
    """Reduce `tensor` across all processes in place and return it."""
    world_size = get_world_size()
    if world_size == 1:
        return tensor
    dist.all_reduce(tensor, op=op, async_op=async_op)
    return tensor

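# Illustrative use of all_reduce (hypothetical calling code, not from this
# repo): averaging a scalar loss across ranks inside a training loop, assuming
# an initialized process group, e.g. under torchrun.
#
#   loss = all_reduce(loss.detach().clone()) / get_world_size()
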
def all_gather(data):
    """Gather an arbitrary picklable object from every process into a list.

    Objects are pickled into CUDA byte tensors, padded to a common length
    (dist.all_gather requires equal shapes), exchanged, then unpickled.
    """
    world_size = get_world_size()
    if world_size == 1:
        return [data]

    # Serialize the object into a CUDA byte tensor.
    buffer = pickle.dumps(data)
    storage = torch.ByteStorage.from_buffer(buffer)
    tensor = torch.ByteTensor(storage).to("cuda")

    # Share per-rank byte sizes so every rank can pad to the maximum.
    local_size = torch.IntTensor([tensor.numel()]).to("cuda")
    size_list = [torch.IntTensor([1]).to("cuda") for _ in range(world_size)]
    dist.all_gather(size_list, local_size)
    size_list = [int(size.item()) for size in size_list]
    max_size = max(size_list)

    # Pad the local tensor to max_size before the collective.
    tensor_list = [torch.ByteTensor(size=(max_size,)).to("cuda") for _ in size_list]
    if local_size != max_size:
        padding = torch.ByteTensor(size=(max_size - local_size,)).to("cuda")
        tensor = torch.cat((tensor, padding), 0)
    dist.all_gather(tensor_list, tensor)

    # Strip padding and unpickle each rank's payload.
    data_list = []
    for size, tensor in zip(size_list, tensor_list):
        buffer = tensor.cpu().numpy().tobytes()[:size]
        data_list.append(pickle.loads(buffer))
    return data_list

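# Illustrative use of all_gather (hypothetical calling code; `n_seen` is an
# invented name): collecting per-rank evaluation results, which need not be
# tensors or equal-sized.
#
#   local_stats = {"rank": get_rank(), "num_samples": n_seen}
#   all_stats = all_gather(local_stats)  # list with one entry per rank
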
def reduce_dict(input_dict, average=True):
    """Reduce a dict of scalar tensors onto rank 0.

    Keys are sorted so every rank stacks values in the same order. With
    average=True, rank 0 ends up with the mean over processes; other ranks
    keep their unreduced values.
    """
    world_size = get_world_size()
    if world_size < 2:
        return input_dict

    with torch.no_grad():
        keys = []
        values = []
        for k in sorted(input_dict.keys()):
            keys.append(k)
            values.append(input_dict[k])
        values = torch.stack(values, 0)
        dist.reduce(values, dst=0)
        if dist.get_rank() == 0 and average:
            # Only rank 0 holds the summed values, so only it averages.
            values /= world_size
        reduced_dict = {k: v for k, v in zip(keys, values)}
    return reduced_dict

def data_sampler(dataset, shuffle, distributed):
    """Pick the appropriate sampler for single- or multi-process loading."""
    if distributed:
        return data.distributed.DistributedSampler(dataset, shuffle=shuffle)
    if shuffle:
        return data.RandomSampler(dataset)
    else:
        return data.SequentialSampler(dataset)
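

if __name__ == "__main__":
    # Minimal single-process sanity check (an illustrative sketch, not part of
    # the original training code): with no process group initialized, every
    # helper takes its non-distributed fallback, so this runs on a CPU-only
    # machine.
    print(get_rank(), get_world_size(), is_primary())  # 0 1 True
    t = torch.ones(3)
    print(all_reduce(t))                               # unchanged: world_size == 1
    print(all_gather({"rank": get_rank()}))            # [{'rank': 0}]
    print(reduce_dict({"loss": torch.tensor(1.0)}))    # returned as-is

    dataset = data.TensorDataset(torch.arange(10))
    sampler = data_sampler(dataset, shuffle=True, distributed=False)
    print(type(sampler).__name__)                      # RandomSampler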