Tunisian-ASR-v0 / train_with_whisper.py
anonymoussubmitter222
added app file
e63fe3d
raw
history blame
13.2 kB
#!/usr/bin/env python3
"""Recipe for training a whisper-based ctc ASR system with librispeech.
The system employs whisper from OpenAI (https://cdn.openai.com/papers/whisper.pdf).
This recipe takes only the whisper encoder and adds a DNN + CTC on top of it for fine-tuning.
If you want to use the full whisper system, please refer to the recipe
speechbrain/recipes/LibriSpeech/ASR/transformer/train_with_whisper.py
To run this recipe, do the following:
> python train_with_whisper.py hparams/train_hf_whisper_encoder.yaml
Authors
* Titouan Parcollet 2022
* Rudolf A Braun 2022
* Sung-Lin Yeh 2021
* Ju-Chieh Chou 2020
* Mirco Ravanelli 2020
* Abdel Heba 2020
* Peter Plantinga 2020
* Samuele Cornell 2020
"""
import os
import sys
import torch
import logging
import speechbrain as sb
from speechbrain.utils.distributed import run_on_main
from speechbrain.tokenizers.SentencePiece import SentencePiece
from speechbrain.utils.data_utils import undo_padding
from hyperpyyaml import load_hyperpyyaml
from pathlib import Path
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Define training procedure
class ASR(sb.Brain):
    """Brain that fine-tunes a Whisper encoder followed by a DNN with a CTC loss.

    Two separate optimizers are used: ``whisper_optimizer`` for
    ``modules.whisper`` and ``model_optimizer`` for ``hparams.model``.
    The whisper optimizer is only stepped once ``optimizer_step`` exceeds
    ``hparams.warmup_steps``.
    """

    def compute_forward(self, batch, stage):
        """Forward computations from the waveform batches to the output probabilities.

        Arguments
        ---------
        batch : object
            Batch exposing ``sig`` as a ``(wavs, wav_lens)`` pair.
        stage : sb.Stage
            Current stage; augmentation is applied only in TRAIN and greedy
            decoding is run only outside TRAIN.

        Returns
        -------
        tuple
            ``(p_ctc, wav_lens, p_tokens)`` where ``p_tokens`` is ``None``
            during training and the greedy CTC decoding otherwise.
        """
        batch = batch.to(self.device)
        wavs, wav_lens = batch.sig
        wavs, wav_lens = wavs.to(self.device), wav_lens.to(self.device)

        # Add augmentation if specified
        if stage == sb.Stage.TRAIN:
            if hasattr(self.hparams, "augmentation"):
                wavs = self.hparams.augmentation(wavs, wav_lens)

        # Forward pass: encode with Whisper and then the DNN
        feats = self.modules.whisper(wavs)
        x = self.modules.enc(feats)

        # Compute outputs
        p_tokens = None
        logits = self.modules.ctc_lin(x)
        p_ctc = self.hparams.log_softmax(logits)
        if stage != sb.Stage.TRAIN:
            p_tokens = sb.decoders.ctc_greedy_decode(
                p_ctc, wav_lens, blank_id=self.hparams.blank_index
            )
        return p_ctc, wav_lens, p_tokens

    def compute_objectives(self, predictions, batch, stage):
        """Computes the loss (CTC) given predictions and targets.

        Outside of training, the predicted and reference token sequences are
        also decoded with ``self.tokenizer`` and appended to the WER and CER
        metrics.

        Arguments
        ---------
        predictions : tuple
            The ``(p_ctc, wav_lens, predicted_tokens)`` tuple returned by
            ``compute_forward``.
        batch : object
            Batch exposing ``id`` and ``tokens`` (a ``(tokens, tokens_lens)``
            pair).
        stage : sb.Stage
            Current stage.

        Returns
        -------
        The CTC loss returned by ``hparams.ctc_cost``.
        """
        p_ctc, wav_lens, predicted_tokens = predictions
        ids = batch.id
        tokens, tokens_lens = batch.tokens

        loss = self.hparams.ctc_cost(p_ctc, tokens, wav_lens, tokens_lens)

        if stage != sb.Stage.TRAIN:
            # Decode predicted token ids to words
            predicted_words = self.tokenizer(
                predicted_tokens, task="decode_from_list"
            )
            # Strip the padding from the targets, then decode them to words
            target_words = undo_padding(tokens, tokens_lens)
            target_words = self.tokenizer(target_words, task="decode_from_list")

            self.wer_metric.append(ids, predicted_words, target_words)
            self.cer_metric.append(ids, predicted_words, target_words)

        return loss

    def fit_batch(self, batch):
        """Run forward/backward on one batch and step the optimizers when due.

        Gradients are accumulated over ``grad_accumulation_factor`` batches.
        Both the mixed-precision and the full-precision paths zero the
        gradients only after an optimizer step, so accumulated gradients are
        never discarded between steps.

        Arguments
        ---------
        batch : object
            Training batch forwarded to ``compute_forward``.

        Returns
        -------
        The detached loss of this batch, moved to CPU.
        """
        should_step = self.step % self.grad_accumulation_factor == 0

        # Managing automatic mixed precision
        if self.auto_mix_prec:
            with torch.cuda.amp.autocast():
                outputs = self.compute_forward(batch, sb.Stage.TRAIN)
                loss = self.compute_objectives(outputs, batch, sb.Stage.TRAIN)
            self.scaler.scale(loss / self.grad_accumulation_factor).backward()
            if should_step:
                self.scaler.unscale_(self.whisper_optimizer)
                self.scaler.unscale_(self.model_optimizer)
                if self.check_gradients(loss):
                    if self.optimizer_step > self.hparams.warmup_steps:
                        # Here we added a warmup to the CTC encoder to make sure that
                        # it does not screw the whisper with too large gradients.
                        self.scaler.step(self.whisper_optimizer)
                    self.scaler.step(self.model_optimizer)
                self.scaler.update()
                # Zero the gradients only after a step (not before every
                # batch), otherwise gradient accumulation is silently lost.
                self.whisper_optimizer.zero_grad()
                self.model_optimizer.zero_grad()
                self.optimizer_step += 1
        else:
            outputs = self.compute_forward(batch, sb.Stage.TRAIN)
            loss = self.compute_objectives(outputs, batch, sb.Stage.TRAIN)
            (loss / self.grad_accumulation_factor).backward()
            if should_step:
                if self.check_gradients(loss):
                    # Here we added a warmup to the CTC encoder to make sure that
                    # it does not screw the whisper with too large gradients.
                    if self.optimizer_step > self.hparams.warmup_steps:
                        self.whisper_optimizer.step()
                    self.model_optimizer.step()
                self.whisper_optimizer.zero_grad()
                self.model_optimizer.zero_grad()
                self.optimizer_step += 1

        return loss.detach().cpu()

    def on_stage_start(self, stage, epoch):
        """Gets called at the beginning of each epoch.

        Fresh CER and WER metric objects are created for every non-training
        stage.
        """
        if stage != sb.Stage.TRAIN:
            self.cer_metric = self.hparams.cer_computer()
            self.wer_metric = self.hparams.error_rate_computer()

    def on_stage_end(self, stage, stage_loss, epoch):
        """Gets called at the end of an epoch.

        Arguments
        ---------
        stage : sb.Stage
            Stage that just finished.
        stage_loss : float
            Loss reported for the stage.
        epoch : int
            Current epoch, used for logging.
        """
        # Compute/store important stats
        stage_stats = {"loss": stage_loss}
        if stage == sb.Stage.TRAIN:
            self.train_stats = stage_stats
        else:
            stage_stats["CER"] = self.cer_metric.summarize("error_rate")
            stage_stats["WER"] = self.wer_metric.summarize("error_rate")

        # Perform end-of-iteration things, like annealing, logging, etc.
        if stage == sb.Stage.VALID:
            old_lr_model, new_lr_model = self.hparams.lr_annealing_model(
                stage_stats["loss"]
            )
            old_lr_whisper, new_lr_whisper = self.hparams.lr_annealing_whisper(
                stage_stats["loss"]
            )
            sb.nnet.schedulers.update_learning_rate(
                self.model_optimizer, new_lr_model
            )
            sb.nnet.schedulers.update_learning_rate(
                self.whisper_optimizer, new_lr_whisper
            )
            # NOTE(review): the "lr_whisperc" key looks like a typo of
            # "lr_whisper"; it is kept unchanged so existing logs stay comparable.
            self.hparams.train_logger.log_stats(
                stats_meta={
                    "epoch": epoch,
                    "lr_model": old_lr_model,
                    "lr_whisperc": old_lr_whisper,
                },
                train_stats=self.train_stats,
                valid_stats=stage_stats,
            )
            # Keep only the checkpoint with the lowest validation WER
            self.checkpointer.save_and_keep_only(
                meta={"WER": stage_stats["WER"]}, min_keys=["WER"],
            )
        elif stage == sb.Stage.TEST:
            self.hparams.train_logger.log_stats(
                stats_meta={"Epoch loaded": self.hparams.epoch_counter.current},
                test_stats=stage_stats,
            )
            # Explicit UTF-8 so non-ASCII transcripts are written correctly
            # regardless of the platform's default locale encoding.
            with open(self.hparams.wer_file, "w", encoding="utf-8") as w:
                self.wer_metric.write_stats(w)

    def init_optimizers(self):
        """Initializes the whisper optimizer and model optimizer.

        Both optimizers are registered with the checkpointer (when one is
        set) so that their state is saved and recovered with the model.
        """
        self.whisper_optimizer = self.hparams.whisper_opt_class(
            self.modules.whisper.parameters()
        )
        self.model_optimizer = self.hparams.model_opt_class(
            self.hparams.model.parameters()
        )

        if self.checkpointer is not None:
            self.checkpointer.add_recoverable(
                "whisper_opt", self.whisper_optimizer
            )
            self.checkpointer.add_recoverable("modelopt", self.model_optimizer)
def dataio_prepare(hparams, tokenizer):
    """Create the train/valid/test datasets and attach their data pipelines.

    Arguments
    ---------
    hparams : dict
        Hyperparameters; reads ``data_folder``, ``train_csv``, ``valid_csv``,
        ``test_csv`` and ``sorting``, and sets
        ``train_dataloader_opts["shuffle"]`` to False when the training data
        is sorted.
    tokenizer : object
        Tokenizer whose ``sp.encode_as_ids`` turns a transcript into ids.

    Returns
    -------
    tuple
        ``(train_data, valid_data, test_datasets)`` where ``test_datasets``
        maps each test CSV stem to its dataset.

    Raises
    ------
    NotImplementedError
        If ``hparams["sorting"]`` is not random, ascending or descending.
    """
    data_folder = hparams["data_folder"]

    def _load_csv(csv_path):
        # Every CSV is loaded with the same "data_root" placeholder replacement.
        return sb.dataio.dataset.DynamicItemDataset.from_csv(
            csv_path=csv_path, replacements={"data_root": data_folder},
        )

    # 1. Load the datasets.
    train_data = _load_csv(hparams["train_csv"])

    sorting = hparams["sorting"]
    if sorting in ("ascending", "descending"):
        # Sorting by duration speeds up training and gives better results.
        sort_kwargs = {"sort_key": "duration"}
        if sorting == "descending":
            sort_kwargs["reverse"] = True
        train_data = train_data.filtered_sorted(**sort_kwargs)
        # Shuffling in the dataloader would make the sorting pointless.
        hparams["train_dataloader_opts"]["shuffle"] = False
    elif sorting != "random":
        raise NotImplementedError(
            "sorting must be random, ascending or descending"
        )

    valid_data = _load_csv(hparams["valid_csv"]).filtered_sorted(
        sort_key="duration"
    )

    # Test sets are kept separate, keyed by the stem of their CSV file name.
    test_datasets = {
        Path(csv_file).stem: _load_csv(csv_file).filtered_sorted(
            sort_key="duration"
        )
        for csv_file in hparams["test_csv"]
    }

    datasets = [train_data, valid_data, *test_datasets.values()]

    # 2. Define audio pipeline:
    @sb.utils.data_pipeline.takes("wav")
    @sb.utils.data_pipeline.provides("sig")
    def audio_pipeline(wav):
        return sb.dataio.dataio.read_audio(wav)

    sb.dataio.dataset.add_dynamic_item(datasets, audio_pipeline)

    # 3. Define text pipeline:
    @sb.utils.data_pipeline.takes("wrd")
    @sb.utils.data_pipeline.provides(
        "wrd", "char_list", "tokens_list", "tokens"
    )
    def text_pipeline(wrd):
        # Yield order must match the names listed in ``provides`` above.
        yield wrd
        yield list(wrd)
        token_ids = tokenizer.sp.encode_as_ids(wrd)
        yield token_ids
        yield torch.LongTensor(token_ids)

    sb.dataio.dataset.add_dynamic_item(datasets, text_pipeline)

    # 4. Set output:
    output_keys = ["id", "sig", "wrd", "char_list", "tokens"]
    sb.dataio.dataset.set_output_keys(datasets, output_keys)

    return train_data, valid_data, test_datasets
if __name__ == "__main__":
    # Parse the command line into the hparams file, run options and overrides.
    hparams_file, run_opts, overrides = sb.parse_arguments(sys.argv[1:])

    # If distributed_launch=True then
    # create ddp_group with the right communication protocol
    sb.utils.distributed.ddp_init_group(run_opts)

    with open(hparams_file) as fin:
        hparams = load_hyperpyyaml(fin, overrides)

    # Create experiment directory
    sb.create_experiment_directory(
        experiment_directory=hparams["output_folder"],
        hyperparams_to_save=hparams_file,
        overrides=overrides,
    )

    # Dataset preparation script (imported here, right before it is needed)
    from librispeech_prepare import prepare_librispeech  # noqa

    # Run the data preparation through run_on_main (multi-gpu / ddp safe).
    prepare_kwargs = {
        "data_folder": hparams["data_folder"],
        "tr_splits": hparams["train_splits"],
        "dev_splits": hparams["dev_splits"],
        "te_splits": hparams["test_splits"],
        "save_folder": hparams["output_folder"],
        "merge_lst": hparams["train_splits"],
        "merge_name": "train.csv",
        "skip_prep": hparams["skip_prep"],
    }
    run_on_main(prepare_librispeech, kwargs=prepare_kwargs)

    # Define the tokenizer and load it
    tokenizer = SentencePiece(
        model_dir=hparams["save_folder"],
        vocab_size=hparams["output_neurons"],
        annotation_train=hparams["train_csv"],
        annotation_read="wrd",
        model_type=hparams["token_type"],
        character_coverage=hparams["character_coverage"],
    )

    # Create the dataset objects together with tokenization and encoding
    train_data, valid_data, test_datasets = dataio_prepare(hparams, tokenizer)

    # Trainer initialization
    asr_brain = ASR(
        modules=hparams["modules"],
        hparams=hparams,
        run_opts=run_opts,
        checkpointer=hparams["checkpointer"],
    )

    # Load the pretrained whisper model when a pretrainer is configured
    if "pretrainer" in hparams:
        pretrainer = hparams["pretrainer"]
        run_on_main(pretrainer.collect_files)
        pretrainer.load_collected(asr_brain.device)

    # We dynamically add the tokenizer to our brain class.
    # NB: This tokenizer corresponds to the one used for the LM!!
    asr_brain.tokenizer = tokenizer

    # Training
    asr_brain.fit(
        asr_brain.hparams.epoch_counter,
        train_data,
        valid_data,
        train_loader_kwargs=hparams["train_dataloader_opts"],
        valid_loader_kwargs=hparams["valid_dataloader_opts"],
    )

    # Testing: one WER report per test set (keys are the test CSV stems)
    for test_name in test_datasets:
        asr_brain.hparams.wer_file = os.path.join(
            hparams["output_folder"], f"wer_{test_name}.txt"
        )
        asr_brain.evaluate(
            test_datasets[test_name],
            test_loader_kwargs=hparams["test_dataloader_opts"],
        )