|
import os |
|
from glob import glob |
|
import re |
|
import sys |
|
from pathlib import Path |
|
|
|
from tqdm import tqdm |
|
|
|
from TTS.tts.utils.generic_utils import split_dataset |
|
|
|
|
|
|
|
|
|
|
|
def load_meta_data(datasets, eval_split=True): |
|
meta_data_train_all = [] |
|
meta_data_eval_all = [] if eval_split else None |
|
for dataset in datasets: |
|
name = dataset['name'] |
|
root_path = dataset['path'] |
|
meta_file_train = dataset['meta_file_train'] |
|
meta_file_val = dataset['meta_file_val'] |
|
|
|
preprocessor = get_preprocessor_by_name(name) |
|
|
|
meta_data_train = preprocessor(root_path, meta_file_train) |
|
print(f" | > Found {len(meta_data_train)} files in {Path(root_path).resolve()}") |
|
|
|
if eval_split: |
|
if meta_file_val is None: |
|
meta_data_eval, meta_data_train = split_dataset(meta_data_train) |
|
else: |
|
meta_data_eval = preprocessor(root_path, meta_file_val) |
|
meta_data_eval_all += meta_data_eval |
|
meta_data_train_all += meta_data_train |
|
|
|
if 'meta_file_attn_mask' in dataset: |
|
meta_data = dict(load_attention_mask_meta_data(dataset['meta_file_attn_mask'])) |
|
for idx, ins in enumerate(meta_data_train_all): |
|
attn_file = meta_data[ins[1]].strip() |
|
meta_data_train_all[idx].append(attn_file) |
|
if meta_data_eval_all is not None: |
|
for idx, ins in enumerate(meta_data_eval_all): |
|
attn_file = meta_data[ins[1]].strip() |
|
meta_data_eval_all[idx].append(attn_file) |
|
return meta_data_train_all, meta_data_eval_all |
|
|
|
|
|
def load_attention_mask_meta_data(metafile_path): |
|
"""Load meta data file created by compute_attention_masks.py""" |
|
with open(metafile_path, 'r') as f: |
|
lines = f.readlines() |
|
|
|
meta_data = [] |
|
for line in lines: |
|
wav_file, attn_file = line.split('|') |
|
meta_data.append([wav_file, attn_file]) |
|
return meta_data |
|
|
|
|
|
def get_preprocessor_by_name(name): |
|
"""Returns the respective preprocessing function.""" |
|
thismodule = sys.modules[__name__] |
|
return getattr(thismodule, name.lower()) |
|
|
|
|
|
|
|
|
|
|
|
|
|
def tweb(root_path, meta_file): |
|
"""Normalize TWEB dataset. |
|
https://www.kaggle.com/bryanpark/the-world-english-bible-speech-dataset |
|
""" |
|
txt_file = os.path.join(root_path, meta_file) |
|
items = [] |
|
speaker_name = "tweb" |
|
with open(txt_file, 'r') as ttf: |
|
for line in ttf: |
|
cols = line.split('\t') |
|
wav_file = os.path.join(root_path, cols[0] + '.wav') |
|
text = cols[1] |
|
items.append([text, wav_file, speaker_name]) |
|
return items |
|
|
|
|
|
def mozilla(root_path, meta_file): |
|
"""Normalizes Mozilla meta data files to TTS format""" |
|
txt_file = os.path.join(root_path, meta_file) |
|
items = [] |
|
speaker_name = "mozilla" |
|
with open(txt_file, 'r') as ttf: |
|
for line in ttf: |
|
cols = line.split('|') |
|
wav_file = cols[1].strip() |
|
text = cols[0].strip() |
|
wav_file = os.path.join(root_path, "wavs", wav_file) |
|
items.append([text, wav_file, speaker_name]) |
|
return items |
|
|
|
|
|
def mozilla_de(root_path, meta_file): |
|
"""Normalizes Mozilla meta data files to TTS format""" |
|
txt_file = os.path.join(root_path, meta_file) |
|
items = [] |
|
speaker_name = "mozilla" |
|
with open(txt_file, 'r', encoding="ISO 8859-1") as ttf: |
|
for line in ttf: |
|
cols = line.strip().split('|') |
|
wav_file = cols[0].strip() |
|
text = cols[1].strip() |
|
folder_name = f"BATCH_{wav_file.split('_')[0]}_FINAL" |
|
wav_file = os.path.join(root_path, folder_name, wav_file) |
|
items.append([text, wav_file, speaker_name]) |
|
return items |
|
|
|
|
|
def mailabs(root_path, meta_files=None): |
|
"""Normalizes M-AI-Labs meta data files to TTS format""" |
|
speaker_regex = re.compile( |
|
"by_book/(male|female)/(?P<speaker_name>[^/]+)/") |
|
if meta_files is None: |
|
csv_files = glob(root_path + "/**/metadata.csv", recursive=True) |
|
else: |
|
csv_files = meta_files |
|
|
|
items = [] |
|
for csv_file in csv_files: |
|
txt_file = os.path.join(root_path, csv_file) |
|
folder = os.path.dirname(txt_file) |
|
|
|
speaker_name_match = speaker_regex.search(txt_file) |
|
if speaker_name_match is None: |
|
continue |
|
speaker_name = speaker_name_match.group("speaker_name") |
|
print(" | > {}".format(csv_file)) |
|
with open(txt_file, 'r') as ttf: |
|
for line in ttf: |
|
cols = line.split('|') |
|
if meta_files is None: |
|
wav_file = os.path.join(folder, 'wavs', cols[0] + '.wav') |
|
else: |
|
wav_file = os.path.join(root_path, |
|
folder.replace("metadata.csv", ""), |
|
'wavs', cols[0] + '.wav') |
|
if os.path.isfile(wav_file): |
|
text = cols[1].strip() |
|
items.append([text, wav_file, speaker_name]) |
|
else: |
|
raise RuntimeError("> File %s does not exist!" % |
|
(wav_file)) |
|
return items |
|
|
|
|
|
def ljspeech(root_path, meta_file): |
|
"""Normalizes the Nancy meta data file to TTS format""" |
|
txt_file = os.path.join(root_path, meta_file) |
|
items = [] |
|
speaker_name = "ljspeech" |
|
with open(txt_file, 'r') as ttf: |
|
for line in ttf: |
|
cols = line.split('|') |
|
wav_file = os.path.join(root_path, 'wavs', cols[0] + '.wav') |
|
text = cols[1] |
|
items.append([text, wav_file, speaker_name]) |
|
return items |
|
|
|
|
|
def nancy(root_path, meta_file): |
|
"""Normalizes the Nancy meta data file to TTS format""" |
|
txt_file = os.path.join(root_path, meta_file) |
|
items = [] |
|
speaker_name = "nancy" |
|
with open(txt_file, 'r') as ttf: |
|
for line in ttf: |
|
utt_id = line.split()[1] |
|
text = line[line.find('"') + 1:line.rfind('"') - 1] |
|
wav_file = os.path.join(root_path, "wavn", utt_id + ".wav") |
|
items.append([text, wav_file, speaker_name]) |
|
return items |
|
|
|
|
|
def common_voice(root_path, meta_file): |
|
"""Normalize the common voice meta data file to TTS format.""" |
|
txt_file = os.path.join(root_path, meta_file) |
|
items = [] |
|
with open(txt_file, 'r') as ttf: |
|
for line in ttf: |
|
if line.startswith("client_id"): |
|
continue |
|
cols = line.split("\t") |
|
text = cols[2] |
|
speaker_name = cols[0] |
|
wav_file = os.path.join(root_path, "clips", cols[1].replace(".mp3", ".wav")) |
|
items.append([text, wav_file, 'MCV_' + speaker_name]) |
|
return items |
|
|
|
|
|
def libri_tts(root_path, meta_files=None): |
|
"""https://ai.google/tools/datasets/libri-tts/""" |
|
items = [] |
|
if meta_files is None: |
|
meta_files = glob(f"{root_path}/**/*trans.tsv", recursive=True) |
|
for meta_file in meta_files: |
|
_meta_file = os.path.basename(meta_file).split('.')[0] |
|
speaker_name = _meta_file.split('_')[0] |
|
chapter_id = _meta_file.split('_')[1] |
|
_root_path = os.path.join(root_path, f"{speaker_name}/{chapter_id}") |
|
with open(meta_file, 'r') as ttf: |
|
for line in ttf: |
|
cols = line.split('\t') |
|
wav_file = os.path.join(_root_path, cols[0] + '.wav') |
|
text = cols[1] |
|
items.append([text, wav_file, 'LTTS_' + speaker_name]) |
|
for item in items: |
|
assert os.path.exists( |
|
item[1]), f" [!] wav files don't exist - {item[1]}" |
|
return items |
|
|
|
|
|
def custom_turkish(root_path, meta_file): |
|
txt_file = os.path.join(root_path, meta_file) |
|
items = [] |
|
speaker_name = "turkish-female" |
|
skipped_files = [] |
|
with open(txt_file, 'r', encoding='utf-8') as ttf: |
|
for line in ttf: |
|
cols = line.split('|') |
|
wav_file = os.path.join(root_path, 'wavs', |
|
cols[0].strip() + '.wav') |
|
if not os.path.exists(wav_file): |
|
skipped_files.append(wav_file) |
|
continue |
|
text = cols[1].strip() |
|
items.append([text, wav_file, speaker_name]) |
|
print(f" [!] {len(skipped_files)} files skipped. They don't exist...") |
|
return items |
|
|
|
|
|
|
|
def brspeech(root_path, meta_file): |
|
'''BRSpeech 3.0 beta''' |
|
txt_file = os.path.join(root_path, meta_file) |
|
items = [] |
|
with open(txt_file, 'r') as ttf: |
|
for line in ttf: |
|
if line.startswith("wav_filename"): |
|
continue |
|
cols = line.split('|') |
|
wav_file = os.path.join(root_path, cols[0]) |
|
text = cols[2] |
|
speaker_name = cols[3] |
|
items.append([text, wav_file, speaker_name]) |
|
return items |
|
|
|
|
|
def vctk(root_path, meta_files=None, wavs_path='wav48'): |
|
"""homepages.inf.ed.ac.uk/jyamagis/release/VCTK-Corpus.tar.gz""" |
|
test_speakers = meta_files |
|
items = [] |
|
meta_files = glob(f"{os.path.join(root_path,'txt')}/**/*.txt", recursive=True) |
|
for meta_file in meta_files: |
|
_, speaker_id, txt_file = os.path.relpath(meta_file, |
|
root_path).split(os.sep) |
|
file_id = txt_file.split('.')[0] |
|
if isinstance(test_speakers, |
|
list): |
|
if speaker_id in test_speakers: |
|
continue |
|
with open(meta_file) as file_text: |
|
text = file_text.readlines()[0] |
|
wav_file = os.path.join(root_path, wavs_path, speaker_id, |
|
file_id + '.wav') |
|
items.append([text, wav_file, 'VCTK_' + speaker_id]) |
|
|
|
return items |
|
|
|
|
|
def vctk_slim(root_path, meta_files=None, wavs_path='wav48'): |
|
"""homepages.inf.ed.ac.uk/jyamagis/release/VCTK-Corpus.tar.gz""" |
|
items = [] |
|
txt_files = glob(f"{os.path.join(root_path,'txt')}/**/*.txt", recursive=True) |
|
for text_file in txt_files: |
|
_, speaker_id, txt_file = os.path.relpath(text_file, |
|
root_path).split(os.sep) |
|
file_id = txt_file.split('.')[0] |
|
if isinstance(meta_files, list): |
|
if speaker_id in meta_files: |
|
continue |
|
wav_file = os.path.join(root_path, wavs_path, speaker_id, |
|
file_id + '.wav') |
|
items.append([None, wav_file, 'VCTK_' + speaker_id]) |
|
|
|
return items |
|
|
|
|
|
def voxceleb2(root_path, meta_file=None): |
|
""" |
|
:param meta_file Used only for consistency with load_meta_data api |
|
""" |
|
return _voxcel_x(root_path, meta_file, voxcel_idx="2") |
|
|
|
|
|
def voxceleb1(root_path, meta_file=None): |
|
""" |
|
:param meta_file Used only for consistency with load_meta_data api |
|
""" |
|
return _voxcel_x(root_path, meta_file, voxcel_idx="1") |
|
|
|
|
|
def _voxcel_x(root_path, meta_file, voxcel_idx): |
|
assert voxcel_idx in ["1", "2"] |
|
expected_count = 148_000 if voxcel_idx == "1" else 1_000_000 |
|
voxceleb_path = Path(root_path) |
|
cache_to = voxceleb_path / f"metafile_voxceleb{voxcel_idx}.csv" |
|
cache_to.parent.mkdir(exist_ok=True) |
|
|
|
|
|
if meta_file is not None: |
|
with open(str(meta_file), 'r') as f: |
|
return [x.strip().split('|') for x in f.readlines()] |
|
|
|
elif not cache_to.exists(): |
|
cnt = 0 |
|
meta_data = [] |
|
wav_files = voxceleb_path.rglob("**/*.wav") |
|
for path in tqdm(wav_files, desc=f"Building VoxCeleb {voxcel_idx} Meta file ... this needs to be done only once.", |
|
total=expected_count): |
|
speaker_id = str(Path(path).parent.parent.stem) |
|
assert speaker_id.startswith('id') |
|
text = None |
|
meta_data.append(f"{text}|{path}|voxcel{voxcel_idx}_{speaker_id}\n") |
|
cnt += 1 |
|
with open(str(cache_to), 'w') as f: |
|
f.write("".join(meta_data)) |
|
if cnt < expected_count: |
|
raise ValueError(f"Found too few instances for Voxceleb. Should be around {expected_count}, is: {cnt}") |
|
|
|
with open(str(cache_to), 'r') as f: |
|
return [x.strip().split('|') for x in f.readlines()] |
|
|