Spaces:
Build error
Build error
#!/usr/bin/env python3
"""Recipe for training a whisper-based ctc ASR system with librispeech. | |
The system employs whisper from OpenAI (https://cdn.openai.com/papers/whisper.pdf). | |
This recipe take only the whisper encoder and add a DNN + CTC to fine-tune. | |
If you want to use the full whisper system, please refer to the recipe | |
speechbrain/recipes/LibriSpeech/ASR/transformer/train_with_whisper.py | |
To run this recipe, do the following: | |
> python train_with_whisper.py hparams/train_hf_whisper_encoder.yaml | |
Authors | |
* Titouan Parcollet 2022 | |
* Rudolf A Braun 2022 | |
* Sung-Lin Yeh 2021 | |
* Ju-Chieh Chou 2020 | |
* Mirco Ravanelli 2020 | |
* Abdel Heba 2020 | |
* Peter Plantinga 2020 | |
* Samuele Cornell 2020 | |
""" | |
import os | |
import sys | |
import torch | |
import logging | |
import speechbrain as sb | |
from speechbrain.utils.distributed import run_on_main | |
from speechbrain.tokenizers.SentencePiece import SentencePiece | |
from speechbrain.utils.data_utils import undo_padding | |
from hyperpyyaml import load_hyperpyyaml | |
from pathlib import Path | |
logger = logging.getLogger(__name__) | |
# Define training procedure | |
class ASR(sb.Brain):
    """Brain subclass that fine-tunes a Whisper encoder with a DNN + CTC head.

    Two separate optimizers are used (see ``init_optimizers``): one for the
    Whisper parameters and one for ``hparams.model``. The Whisper optimizer
    only starts stepping once ``optimizer_step`` exceeds
    ``hparams.warmup_steps``.
    """

    def compute_forward(self, batch, stage):
        """Forward computations from the waveform batches to the output probabilities.

        Arguments
        ---------
        batch : object
            Batch exposing ``.to(device)`` and a ``sig`` field that unpacks
            into ``(wavs, wav_lens)``.
        stage : sb.Stage
            Current stage; augmentation is applied only for ``TRAIN`` and
            greedy decoding only for the other stages.

        Returns
        -------
        tuple
            ``(p_ctc, wav_lens, p_tokens)`` where ``p_ctc`` is the output of
            ``hparams.log_softmax`` and ``p_tokens`` is the greedy CTC
            decoding (``None`` during training).
        """
        batch = batch.to(self.device)
        wavs, wav_lens = batch.sig
        wavs, wav_lens = wavs.to(self.device), wav_lens.to(self.device)

        # Add augmentation if specified
        if stage == sb.Stage.TRAIN:
            if hasattr(self.hparams, "augmentation"):
                wavs = self.hparams.augmentation(wavs, wav_lens)

        # Forward pass: encode with Whisper and then with the DNN
        feats = self.modules.whisper(wavs)
        x = self.modules.enc(feats)

        # Compute outputs
        p_tokens = None
        logits = self.modules.ctc_lin(x)
        p_ctc = self.hparams.log_softmax(logits)
        if stage != sb.Stage.TRAIN:
            p_tokens = sb.decoders.ctc_greedy_decode(
                p_ctc, wav_lens, blank_id=self.hparams.blank_index
            )

        return p_ctc, wav_lens, p_tokens

    def compute_objectives(self, predictions, batch, stage):
        """Computes the loss (CTC) given predictions and targets.

        Arguments
        ---------
        predictions : tuple
            The ``(p_ctc, wav_lens, predicted_tokens)`` tuple returned by
            ``compute_forward``.
        batch : object
            Batch exposing ``id`` and ``tokens`` (unpacking into
            ``(tokens, tokens_lens)``).
        stage : sb.Stage
            Current stage; outside ``TRAIN`` the decoded hypotheses and
            references are appended to the WER and CER metrics.

        Returns
        -------
        The CTC loss computed by ``hparams.ctc_cost``.
        """
        p_ctc, wav_lens, predicted_tokens = predictions

        ids = batch.id
        tokens, tokens_lens = batch.tokens

        loss = self.hparams.ctc_cost(p_ctc, tokens, wav_lens, tokens_lens)

        if stage != sb.Stage.TRAIN:
            # Decode predicted token ids to words
            predicted_words = self.tokenizer(
                predicted_tokens, task="decode_from_list"
            )

            # Strip the padding from the targets, then decode them to words
            target_words = undo_padding(tokens, tokens_lens)
            target_words = self.tokenizer(target_words, task="decode_from_list")

            self.wer_metric.append(ids, predicted_words, target_words)
            self.cer_metric.append(ids, predicted_words, target_words)

        return loss

    def fit_batch(self, batch):
        """Runs one training step on ``batch`` with both optimizers.

        Gradients are accumulated over ``grad_accumulation_factor`` batches;
        the optimizers step (and are zeroed) only when ``should_step`` is
        true. The Whisper optimizer is skipped until ``optimizer_step``
        exceeds ``hparams.warmup_steps``.

        Arguments
        ---------
        batch : object
            The batch forwarded to ``compute_forward`` and
            ``compute_objectives``.

        Returns
        -------
        The detached loss, moved to CPU.
        """
        should_step = self.step % self.grad_accumulation_factor == 0

        # Managing automatic mixed precision
        if self.auto_mix_prec:
            with torch.cuda.amp.autocast():
                outputs = self.compute_forward(batch, sb.Stage.TRAIN)
            # NOTE(review): the loss is computed outside autocast here;
            # confirm this matches the intended precision handling.
            loss = self.compute_objectives(outputs, batch, sb.Stage.TRAIN)
            self.scaler.scale(loss / self.grad_accumulation_factor).backward()
            if should_step:
                self.scaler.unscale_(self.whisper_optimizer)
                self.scaler.unscale_(self.model_optimizer)
                if self.check_gradients(loss):
                    if self.optimizer_step > self.hparams.warmup_steps:
                        # Here we added a warmup to the CTC encoder to make
                        # sure that it does not screw the whisper with too
                        # large gradients.
                        self.scaler.step(self.whisper_optimizer)
                    self.scaler.step(self.model_optimizer)
                self.scaler.update()
                # Zero the gradients only after a step, mirroring the
                # non-AMP branch. Zeroing on every batch would throw away
                # the gradients accumulated from previous micro-batches.
                self.whisper_optimizer.zero_grad()
                self.model_optimizer.zero_grad()
                self.optimizer_step += 1
        else:
            outputs = self.compute_forward(batch, sb.Stage.TRAIN)

            loss = self.compute_objectives(outputs, batch, sb.Stage.TRAIN)
            (loss / self.grad_accumulation_factor).backward()
            if should_step:
                if self.check_gradients(loss):
                    # Here we added a warmup to the CTC encoder to make sure
                    # that it does not screw the whisper with too large
                    # gradients.
                    if self.optimizer_step > self.hparams.warmup_steps:
                        self.whisper_optimizer.step()
                    self.model_optimizer.step()
                self.whisper_optimizer.zero_grad()
                self.model_optimizer.zero_grad()
                self.optimizer_step += 1

        return loss.detach().cpu()

    def on_stage_start(self, stage, epoch):
        """Gets called at the beginning of each epoch.

        Creates fresh CER and WER metric objects for non-training stages.
        """
        if stage != sb.Stage.TRAIN:
            self.cer_metric = self.hparams.cer_computer()
            self.wer_metric = self.hparams.error_rate_computer()

    def on_stage_end(self, stage, stage_loss, epoch):
        """Gets called at the end of an epoch.

        Arguments
        ---------
        stage : sb.Stage
            The stage that just finished.
        stage_loss : float
            The loss reported for the stage.
        epoch : int
            The current epoch, used for logging.
        """
        # Compute/store important stats
        stage_stats = {"loss": stage_loss}
        if stage == sb.Stage.TRAIN:
            self.train_stats = stage_stats
        else:
            stage_stats["CER"] = self.cer_metric.summarize("error_rate")
            stage_stats["WER"] = self.wer_metric.summarize("error_rate")

        # Perform end-of-iteration things, like annealing, logging, etc.
        if stage == sb.Stage.VALID:
            # Both schedulers are driven by the validation loss.
            old_lr_model, new_lr_model = self.hparams.lr_annealing_model(
                stage_stats["loss"]
            )
            old_lr_whisper, new_lr_whisper = self.hparams.lr_annealing_whisper(
                stage_stats["loss"]
            )
            sb.nnet.schedulers.update_learning_rate(
                self.model_optimizer, new_lr_model
            )
            sb.nnet.schedulers.update_learning_rate(
                self.whisper_optimizer, new_lr_whisper
            )
            self.hparams.train_logger.log_stats(
                stats_meta={
                    "epoch": epoch,
                    "lr_model": old_lr_model,
                    "lr_whisper": old_lr_whisper,
                },
                train_stats=self.train_stats,
                valid_stats=stage_stats,
            )
            # Keep only the checkpoint with the lowest validation WER.
            self.checkpointer.save_and_keep_only(
                meta={"WER": stage_stats["WER"]}, min_keys=["WER"],
            )
        elif stage == sb.Stage.TEST:
            self.hparams.train_logger.log_stats(
                stats_meta={"Epoch loaded": self.hparams.epoch_counter.current},
                test_stats=stage_stats,
            )
            with open(self.hparams.wer_file, "w") as w:
                self.wer_metric.write_stats(w)

    def init_optimizers(self):
        """Initializes the whisper optimizer and model optimizer.

        Both optimizers are registered with the checkpointer (when one is
        set) so that they are saved and recovered with the checkpoints.
        """
        self.whisper_optimizer = self.hparams.whisper_opt_class(
            self.modules.whisper.parameters()
        )
        self.model_optimizer = self.hparams.model_opt_class(
            self.hparams.model.parameters()
        )

        if self.checkpointer is not None:
            self.checkpointer.add_recoverable(
                "whisper_opt", self.whisper_optimizer
            )
            self.checkpointer.add_recoverable("modelopt", self.model_optimizer)
def dataio_prepare(hparams, tokenizer):
    """This function prepares the datasets to be used in the brain class.
    It also defines the data processing pipeline through user-defined functions.

    Arguments
    ---------
    hparams : dict
        Hyperparameters. The keys read here are ``data_folder``,
        ``train_csv``, ``valid_csv``, ``test_csv``, ``sorting`` and
        ``train_dataloader_opts`` (whose ``shuffle`` entry is set to
        ``False`` when the training data is sorted).
    tokenizer : object
        Tokenizer exposing ``sp.encode_as_ids`` to turn a transcript into
        token ids.

    Returns
    -------
    tuple
        ``(train_data, valid_data, test_datasets)`` where ``test_datasets``
        maps the stem of each test CSV file name to its dataset.

    Raises
    ------
    NotImplementedError
        If ``hparams["sorting"]`` is not ``random``, ``ascending`` or
        ``descending``.
    """
    # 1. Define datasets:
    data_folder = hparams["data_folder"]

    train_data = sb.dataio.dataset.DynamicItemDataset.from_csv(
        csv_path=hparams["train_csv"], replacements={"data_root": data_folder},
    )

    if hparams["sorting"] == "ascending":
        # we sort training data to speed up training and get better results.
        train_data = train_data.filtered_sorted(sort_key="duration")
        # when sorting do not shuffle in dataloader ! otherwise is pointless
        hparams["train_dataloader_opts"]["shuffle"] = False

    elif hparams["sorting"] == "descending":
        train_data = train_data.filtered_sorted(
            sort_key="duration", reverse=True
        )
        # when sorting do not shuffle in dataloader ! otherwise is pointless
        hparams["train_dataloader_opts"]["shuffle"] = False

    elif hparams["sorting"] == "random":
        pass

    else:
        raise NotImplementedError(
            "sorting must be random, ascending or descending"
        )

    valid_data = sb.dataio.dataset.DynamicItemDataset.from_csv(
        csv_path=hparams["valid_csv"], replacements={"data_root": data_folder},
    )
    valid_data = valid_data.filtered_sorted(sort_key="duration")

    # test is separate
    test_datasets = {}
    for csv_file in hparams["test_csv"]:
        name = Path(csv_file).stem
        test_datasets[name] = sb.dataio.dataset.DynamicItemDataset.from_csv(
            csv_path=csv_file, replacements={"data_root": data_folder}
        )
        test_datasets[name] = test_datasets[name].filtered_sorted(
            sort_key="duration"
        )

    datasets = [train_data, valid_data] + list(test_datasets.values())

    # 2. Define audio pipeline:
    # The takes/provides declarations tell the data pipeline which dataset
    # key feeds the function and under which key its output is exposed.
    @sb.utils.data_pipeline.takes("wav")
    @sb.utils.data_pipeline.provides("sig")
    def audio_pipeline(wav):
        sig = sb.dataio.dataio.read_audio(wav)
        return sig

    sb.dataio.dataset.add_dynamic_item(datasets, audio_pipeline)

    # 3. Define text pipeline:
    # One provided key per ``yield``, in the same order.
    @sb.utils.data_pipeline.takes("wrd")
    @sb.utils.data_pipeline.provides(
        "wrd", "char_list", "tokens_list", "tokens"
    )
    def text_pipeline(wrd):
        yield wrd
        char_list = list(wrd)
        yield char_list
        tokens_list = tokenizer.sp.encode_as_ids(wrd)
        yield tokens_list
        tokens = torch.LongTensor(tokens_list)
        yield tokens

    sb.dataio.dataset.add_dynamic_item(datasets, text_pipeline)

    # 4. Set output:
    sb.dataio.dataset.set_output_keys(
        datasets, ["id", "sig", "wrd", "char_list", "tokens"],
    )

    return train_data, valid_data, test_datasets
if __name__ == "__main__":
    # Parse the command line into the hparams file, the runtime options
    # and the hyperparameter overrides.
    hparams_file, run_opts, overrides = sb.parse_arguments(sys.argv[1:])

    # If distributed_launch=True, create the ddp_group with the right
    # communication protocol.
    sb.utils.distributed.ddp_init_group(run_opts)

    with open(hparams_file) as fin:
        hparams = load_hyperpyyaml(fin, overrides)

    # Create the experiment directory
    sb.create_experiment_directory(
        experiment_directory=hparams["output_folder"],
        hyperparams_to_save=hparams_file,
        overrides=overrides,
    )

    # Dataset preparation (parsing LibriSpeech)
    from librispeech_prepare import prepare_librispeech  # noqa

    prep_kwargs = {
        "data_folder": hparams["data_folder"],
        "tr_splits": hparams["train_splits"],
        "dev_splits": hparams["dev_splits"],
        "te_splits": hparams["test_splits"],
        "save_folder": hparams["output_folder"],
        "merge_lst": hparams["train_splits"],
        "merge_name": "train.csv",
        "skip_prep": hparams["skip_prep"],
    }

    # With multi-gpu (ddp), run the data preparation through run_on_main.
    run_on_main(prepare_librispeech, kwargs=prep_kwargs)

    # Define the tokenizer and load it
    tokenizer = SentencePiece(
        model_dir=hparams["save_folder"],
        vocab_size=hparams["output_neurons"],
        annotation_train=hparams["train_csv"],
        annotation_read="wrd",
        model_type=hparams["token_type"],
        character_coverage=hparams["character_coverage"],
    )

    # Create the dataset objects, including tokenization and encoding
    train_data, valid_data, test_datasets = dataio_prepare(hparams, tokenizer)

    # Trainer initialization
    asr_brain = ASR(
        modules=hparams["modules"],
        hparams=hparams,
        run_opts=run_opts,
        checkpointer=hparams["checkpointer"],
    )

    # Load the pretrained whisper model when a pretrainer is configured
    if "pretrainer" in hparams:
        run_on_main(hparams["pretrainer"].collect_files)
        hparams["pretrainer"].load_collected(asr_brain.device)

    # Dynamically attach the tokenizer to the brain; it is used there to
    # decode token ids back to words.
    # NB: per the recipe authors, this tokenizer corresponds to the one
    # used for the LM.
    asr_brain.tokenizer = tokenizer

    # Training
    asr_brain.fit(
        asr_brain.hparams.epoch_counter,
        train_data,
        valid_data,
        train_loader_kwargs=hparams["train_dataloader_opts"],
        valid_loader_kwargs=hparams["valid_dataloader_opts"],
    )

    # Testing: one WER report per test set (keys are test_clean,
    # test_other, etc.)
    for test_name, test_set in test_datasets.items():
        asr_brain.hparams.wer_file = os.path.join(
            hparams["output_folder"], f"wer_{test_name}.txt"
        )
        asr_brain.evaluate(
            test_set, test_loader_kwargs=hparams["test_dataloader_opts"]
        )