Spaces:
Sleeping
Sleeping
# ONE EPOCH = one forward pass and one backward pass of all the training examples. | |
# | |
# BATCH SIZE = the number of training examples in one forward/backward pass. The | |
# higher the batch size, the more memory space you'll need. | |
# | |
# NUMBER OF ITERATIONS = number of passes, each pass using [batch size] number of | |
# examples. To be clear, one pass = one forward pass + one backward pass. | |
# | |
# Example: if you have 1000 training examples, and your batch size is 500, then | |
# it will take 2 iterations to complete 1 epoch. | |
import os | |
import time | |
import math | |
import torch | |
import torch.distributed as dist | |
from torch.utils.data.distributed import DistributedSampler | |
from torch.utils.data import DataLoader | |
from numpy import finfo | |
from Tacotron2 import tacotron_2 | |
from fp16_optimizer import FP16_Optimizer | |
# from distributed import apply_gradient_allreduce | |
from loss_function import Tacotron2Loss | |
from logger import Tacotron2Logger | |
def batchnorm_to_float(module): | |
"""Converts batch norm modules to FP32""" | |
if isinstance(module, torch.nn.modules.batchnorm._BatchNorm): | |
module.float() | |
for child in module.children(): | |
batchnorm_to_float(child) | |
return module | |
def reduce_tensor(tensor, n_gpus): | |
# this function is recorded in the computation graph. Gradients propagating to the cloned tensor will propagate to | |
# the original tensor | |
rt = tensor.clone() | |
# Each rank has a tensor and all_reduce sums up all tensors from different ranks to all ranks. Computes the average | |
# of the tensor results of all ranks (a rank is a gpu as far as I understood): | |
dist.all_reduce(rt, op=dist.reduce_op.SUM) | |
rt /= n_gpus | |
return rt | |
def prepare_directories_and_logger(output_directory, log_directory, rank): | |
if rank == 0: | |
if not os.path.isdir(output_directory): | |
os.makedirs(output_directory) | |
os.chmod(output_directory, 0o775) | |
logger = Tacotron2Logger(os.path.join(output_directory, log_directory)) | |
# logger = None | |
else: | |
logger = None | |
return logger | |
def warm_start_model(checkpoint_path, model): | |
assert os.path.isfile(checkpoint_path) | |
print("Warm starting model from checkpoint '{}'".format(checkpoint_path)) | |
checkpoint_dict = torch.load(checkpoint_path, map_location='cpu') | |
model.load_state_dict(checkpoint_dict['state_dict']) | |
return model | |
def load_checkpoint(checkpoint_path, model, optimizer): | |
assert os.path.isfile(checkpoint_path) | |
print("Loading checkpoint '{}'".format(checkpoint_path)) | |
checkpoint_dict = torch.load(checkpoint_path, map_location='cpu') | |
model.load_state_dict(checkpoint_dict['state_dict']) | |
optimizer.load_state_dict(checkpoint_dict['optimizer']) | |
learning_rate = checkpoint_dict['learning_rate'] | |
iteration = checkpoint_dict['iteration'] | |
print("Loaded checkpoint '{}' from iteration {}".format(checkpoint_path, iteration)) | |
return model, optimizer, learning_rate, iteration | |
def save_checkpoint(model, optimizer, learning_rate, iteration, filepath): | |
print("Saving model and optimizer state at iteration {} to {}".format(iteration, filepath)) | |
torch.save({'iteration': iteration, | |
'state_dict': model.state_dict(), | |
'optimizer': optimizer.state_dict(), | |
'learning_rate': learning_rate}, filepath) | |
def init_distributed(hyper_params, n_gpus, rank, group_name): | |
assert torch.cuda.is_available(), "Distributed mode requires CUDA" | |
print("Initializing distributed") | |
# Set CUDA device so everything is done on the right GPU | |
torch.cuda.set_device(rank % torch.cuda.device_count()) | |
# Initialize distributed communication | |
torch.distributed.init_process_group(backend=hyper_params['dist_backend'], rank=rank, world_size=n_gpus, | |
init_method=hyper_params['dist_url'], group_name=group_name) | |
print("Initializing distributed: Done") | |
def load_model(hyper_params): | |
# according to the documentation, it is recommended to move a model to GPU before constructing the optimizer | |
# model = tacotron_2(hyper_params).cuda() | |
model = tacotron_2(hyper_params) | |
if hyper_params['fp16_run']: # converts everything into half type (16 bits) | |
model = batchnorm_to_float(model.half()) | |
model.decoder.attention_layer.score_mask_value = float(finfo('float16').min) | |
# if hyper_params['distributed_run']: | |
# model = apply_gradient_allreduce(model) | |
return model | |
def validate(model, criterion, valset, iteration, batch_size, n_gpus, collate_fn, logger, distributed_run, rank): | |
"""Handles all the validation scoring and printing""" | |
# We change to eval() because this is an evaluation stage and not a training | |
model.eval() | |
# temporarily set all the requires_grad flag to false | |
with torch.no_grad(): | |
# Sampler that restricts data loading to a subset of the dataset. Distributed sampler for distributed batch. | |
# Which samples take (randomization?) | |
val_sampler = DistributedSampler(valset) if distributed_run else None | |
# data loader wraper to the validation data (same as for the training data) | |
val_loader = DataLoader(valset, sampler=val_sampler, num_workers=1, shuffle=False, batch_size=batch_size, | |
pin_memory=False, collate_fn=collate_fn) | |
val_loss = 0.0 | |
for i, batch in enumerate(val_loader): | |
x, y = model.parse_batch(batch) | |
y_pred = model(x) | |
_, _, _, _, gst_scores = y_pred | |
if i == 0: | |
validation_gst_scores = gst_scores | |
else: | |
validation_gst_scores = torch.cat((validation_gst_scores, gst_scores), 0) | |
loss = criterion(y_pred, y) | |
if distributed_run: | |
reduced_val_loss = reduce_tensor(loss.data, n_gpus).item() # gets the pure float value with item() | |
else: | |
reduced_val_loss = loss.item() | |
val_loss += reduced_val_loss | |
val_loss = val_loss / (i + 1) # Averaged val_loss from all batches | |
model.train() | |
if rank == 0: | |
print("Validation loss {}: {:9f} ".format(iteration, val_loss)) # I changed this | |
# print("GST scores of the validation set: {}".format(validation_gst_scores.shape)) | |
logger.log_validation(reduced_val_loss, model, y, y_pred, validation_gst_scores, iteration) | |
# ------------------------------------------- MAIN TRAINING METHOD -------------------------------------------------- # | |
def train(output_directory, log_directory, checkpoint_path, warm_start, n_gpus, rank, group_name, | |
hyper_params, train_loader, valset, collate_fn): | |
"""Training and validation method with logging results to tensorboard and stdout | |
:param output_directory (string): directory to save checkpoints | |
:param log_directory (string): directory to save tensorboard logs | |
:param checkpoint_path (string): checkpoint path | |
:param n_gpus (int): number of gpus | |
:param rank (int): rank of current gpu | |
:param hyper_params (object dictionary): dictionary with all hyper parameters | |
""" | |
# Check whether is a distributed running | |
if hyper_params['distributed_run']: | |
init_distributed(hyper_params, n_gpus, rank, group_name) | |
# set the same fixed seed to reproduce same results everytime we train | |
torch.manual_seed(hyper_params['seed']) | |
torch.cuda.manual_seed(hyper_params['seed']) | |
model = load_model(hyper_params) | |
learning_rate = hyper_params['learning_rate'] | |
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=hyper_params['weight_decay']) | |
if hyper_params['fp16_run']: | |
optimizer = FP16_Optimizer(optimizer, dynamic_loss_scale=hyper_params['dynamic_loss_scaling']) | |
# Define the criterion of the loss function. The objective. | |
criterion = Tacotron2Loss() | |
logger = prepare_directories_and_logger(output_directory, log_directory, rank) | |
# logger = '' | |
iteration = 0 | |
epoch_offset = 0 | |
if checkpoint_path is not None: | |
if warm_start: | |
# Re-start the model from the last checkpoint if we save the parameters and don't want to start from 0 | |
model = warm_start_model(checkpoint_path, model) | |
else: | |
# CHECK THIS OUT!!! | |
model, optimizer, _learning_rate, iteration = load_checkpoint(checkpoint_path, model, optimizer) | |
if hyper_params['use_saved_learning_rate']: | |
learning_rate = _learning_rate | |
iteration += 1 # next iteration is iteration + 1 | |
epoch_offset = max(0, int(iteration / len(train_loader))) | |
# Set this to make all modules and regularization aware this is the training stage: | |
model.train() | |
# MAIN LOOP | |
for epoch in range(epoch_offset, hyper_params['epochs']): | |
print("Epoch: {}".format(epoch)) | |
for i, batch in enumerate(train_loader): | |
start = time.perf_counter() | |
# CHECK THIS OUT!!! | |
for param_group in optimizer.param_groups: | |
param_group['lr'] = learning_rate | |
model.zero_grad() | |
input_data, output_target = model.parse_batch(batch) | |
output_predicted = model(input_data) | |
loss = criterion(output_predicted, output_target) | |
if hyper_params['distributed_run']: | |
reduced_loss = reduce_tensor(loss.data, n_gpus).item() | |
else: | |
reduced_loss = loss.item() | |
if hyper_params['fp16_run']: | |
optimizer.backward(loss) # transformed optimizer into fp16 type | |
grad_norm = optimizer.clip_fp32_grads(hyper_params['grad_clip_thresh']) | |
else: | |
loss.backward() | |
grad_norm = torch.nn.utils.clip_grad_norm_(model.parameters(), hyper_params['grad_clip_thresh']) | |
# Performs a single optimization step (parameter update) | |
optimizer.step() | |
# This boolean controls overflow when running in fp16 optimizer | |
overflow = optimizer.overflow if hyper_params['fp16_run'] else False | |
# If overflow is True, it will not enter. If isnan is True, it will not enter neither. | |
if not overflow and not math.isnan(reduced_loss) and rank == 0: | |
duration = time.perf_counter() - start | |
print("Train loss {} {:.6f} Grand Norm {:.6f} {:.2f}s/it".format(iteration, reduced_loss, | |
grad_norm, duration)) | |
# logs training information of the current iteration | |
logger.log_training(reduced_loss, grad_norm, learning_rate, duration, iteration) | |
# Every iters_per_checkpoint steps there is a validation of the model and its updated parameters | |
if not overflow and (iteration % hyper_params['iters_per_checkpoint'] == 0): | |
validate(model, criterion, valset, iteration, hyper_params['batch_size'], n_gpus, collate_fn, | |
logger, hyper_params['distributed_run'], rank) | |
if rank == 0: | |
checkpoint_path = os.path.join(output_directory, "checkpoint_{}".format(iteration)) | |
save_checkpoint(model, optimizer, learning_rate, iteration, checkpoint_path) | |
iteration += 1 | |