Spaces:
Runtime error
Runtime error
"""Main loading function. | |
Copyright PolyAI Limited. | |
""" | |
import json | |
import os | |
import random | |
from pathlib import Path | |
import numpy as np | |
import soundfile as sf | |
import torch | |
from librosa.util import normalize | |
from pyannote.audio import Inference | |
from torch.utils import data | |
import constants as c | |
def random_crop(x, maxseqlen):
    """Randomly crop ``x`` along its first axis to ``maxseqlen`` items.

    Args:
        x: Object exposing ``shape`` and slicing along its first axis.
        maxseqlen: Length of the window to keep.

    Returns:
        A ``(cropped, offset)`` tuple. When ``x`` is shorter than
        ``maxseqlen`` it is returned unchanged with an offset of 0;
        otherwise ``offset`` is the start index of the sampled window.
    """
    total = x.shape[0]
    if total < maxseqlen:
        # Too short to crop: hand the input back untouched.
        return x, 0
    # Every start position that keeps the window inside ``x`` is equally
    # likely (``randrange`` excludes its upper bound, hence the + 1).
    start = random.randrange(total - maxseqlen + 1)
    return x[start:start + maxseqlen], start
def dynamic_range_compression(x, C=0.3, M=6.5, clip_val=1e-5):
    """Log-compress ``x`` as ``(log(max(x, clip_val)) + M) * C``.

    Args:
        x: Array-like of values to compress.
        C: Multiplicative factor applied after the shift.
        M: Additive shift applied to the logarithm.
        clip_val: Lower bound applied to ``x`` before the logarithm is
            taken, so that small or non-positive values stay finite.

    Returns:
        The compressed values as computed by NumPy.
    """
    floored = np.clip(x, a_min=clip_val, a_max=None)
    log_magnitude = np.log(floored)
    return (log_magnitude + M) * C
def dynamic_range_decompression(x, C=0.3, M=6.5):
    """Undo the log compression: ``exp(x / C - M)``.

    Args:
        x: Array-like of compressed values.
        C: Multiplicative factor that was applied during compression.
        M: Additive shift that was applied during compression.

    Returns:
        The decompressed values as computed by NumPy.
    """
    log_magnitude = x / C - M
    return np.exp(log_magnitude)
class QuantizeDataset(data.Dataset):
    """Dataset yielding speaker embeddings, acoustic and semantic tokens.

    Metadata is a JSON object mapping an audio file name (relative to
    ``datadir``) to a dict that holds at least a ``"duration"`` entry.
    Token arrays are read from ``.npy`` files stored in sibling
    directories of ``datadir`` (see ``load_quantization``).
    """

    # Sub-directory (relative to ``datadir.parent``) of the semantic tokens.
    _SEMANTIC_SUBDIR = 'audios-speech-tokenizer/semantic'

    def __init__(self, hp, metapath, datadir=None, speaker_embedding_dir=None):
        """Load the metadata and, if needed, the speaker embedding model.

        Args:
            hp: Hyper-parameter object. This class reads
                ``n_semantic_codes``, ``max_dataset_samples``,
                ``vocoder_type``, ``sample_rate`` and ``max_length``.
            metapath: Path to the JSON metadata file.
            datadir: Directory that contains the audio files. It is passed
                to ``Path`` directly, so ``None`` is not usable in practice.
            speaker_embedding_dir: When ``None`` a pyannote ``Inference``
                model is created, which requires the
                ``HUGGING_FACE_HUB_TOKEN`` environment variable.
                NOTE(review): when a directory is given no embedding model
                is created here, yet ``__getitem__`` still calls
                ``self.spkr_embedding`` - confirm how pre-computed
                embeddings are meant to be used.

        Raises:
            KeyError: If the model is needed and ``HUGGING_FACE_HUB_TOKEN``
                is not set.
        """
        self.hp = hp
        self.datadir = Path(datadir)
        self.speaker_embedding_dir = speaker_embedding_dir
        self.sem_mask_id = hp.n_semantic_codes

        print(f"Loading metadata in {metapath}...")
        with open(metapath, "r", encoding="utf-8") as f:
            self.text = json.load(f)

        # Optionally keep only the first ``max_dataset_samples`` entries
        # (in file order); a non-positive limit disables truncation.
        limit = self.hp.max_dataset_samples
        if 0 < limit < len(self.text):
            self.text = dict(list(self.text.items())[:limit])

        self.datasetbase = list(self.text)
        self.dataset = [
            os.path.join(self.datadir, x) for x in self.datasetbase]

        if self.speaker_embedding_dir is None:
            self.spkr_embedding = Inference(
                "pyannote/embedding",
                window="whole",
                use_auth_token=os.environ["HUGGING_FACE_HUB_TOKEN"],
            )

        # Print statistics:
        n = len(self.dataset)
        print(f"Total {n} examples")

        self.lengths = [float(v["duration"]) for v in self.text.values()]
        if not self.lengths:
            # Nothing to summarise; avoids dividing by zero and calling
            # max()/min() on an empty list.
            return
        total_duration = sum(self.lengths)
        avglen = total_duration / len(self.lengths)
        maxlen = max(self.lengths)
        minlen = min(self.lengths)
        print(
            f"Average duration of audio: {avglen} sec, "
            f"Maximum duration: {maxlen} sec, Minimum duration: {minlen} sec"
        )

    def __len__(self):
        """Return the number of examples."""
        return len(self.dataset)

    def _token_path(self, subdir, _name):
        """Return ``<datadir.parent>/<subdir>/<_name without extension>.npy``."""
        return self.datadir.parent / subdir / (os.path.splitext(_name)[0] + ".npy")  # noqa

    def load_quantization(self, _name):
        """Load the acoustic codes of ``_name`` for the configured vocoder.

        Args:
            _name: Audio file name; for ``'NATIVE'`` it must be a metadata
                key, otherwise its stem selects the ``.npy`` file.

        Returns:
            The transposed code array.

        Raises:
            ValueError: If ``hp.vocoder_type`` is not supported.
        """
        if self.hp.vocoder_type == 'NATIVE':
            # Codes are stored inline in the metadata.
            metadata = self.text[_name]
            quantization = np.array(metadata["quantization"]).T  # ..., 4
        elif self.hp.vocoder_type == 'DAC':
            codes_path = self._token_path('audios-dac', _name)
            quantization = np.load(codes_path).T  # ..., 12
        elif self.hp.vocoder_type == 'ENCODEC':
            codes_path = self._token_path('audios-encodec', _name)
            # Stored with an extra leading axis of size 1.
            quantization = np.load(codes_path).squeeze(0).T  # ..., 8
        elif self.hp.vocoder_type == 'SPEECHTOKENIZER':
            codes_path = self._token_path(
                'audios-speech-tokenizer/acoustic', _name)
            quantization = np.load(codes_path).T  # ..., 7
        else:
            raise ValueError(f"Unknown vocoder_type {self.hp.vocoder_type}")

        return quantization

    def _load_speaker(self, _name, wav_path, spkr_token):
        """Load tokens and a per-frame speaker embedding for one speaker.

        Both token arrays get ``spkr_token`` prepended along their time
        axis, and the embedding is repeated once per semantic frame.
        """
        acoustic_tokens = self.load_quantization(_name)
        acoustic_tokens = np.pad(
            acoustic_tokens, [[1, 0], [0, 0]], constant_values=spkr_token)

        npy_path = self._token_path(self._SEMANTIC_SUBDIR, _name)
        semantic_tokens = np.load(npy_path)[None]
        semantic_tokens = np.pad(
            semantic_tokens, [[0, 0], [1, 0]], constant_values=spkr_token)

        wav, _ = sf.read(wav_path)
        audio = normalize(wav) * 0.95
        speaker_embedding = self.spkr_embedding(
            {"waveform": torch.FloatTensor(audio).unsqueeze(0),
             "sample_rate": self.hp.sample_rate}
        ).reshape(1, -1)
        speaker_embedding = np.repeat(
            speaker_embedding, semantic_tokens.shape[1], axis=0)

        return speaker_embedding, acoustic_tokens, semantic_tokens

    def __getitem__(self, i):
        """Return the ``i``-th example.

        Returns:
            ``(speaker_embedding, acoustic_tokens, acoustic_tokens,
            dataname, semantic_tokens)``; the acoustic tokens appear twice
            in the tuple. All arrays are truncated to ``hp.max_length``
            along their time axis.
        """
        dataname = self.dataset[i]
        _name = self.datasetbase[i]
        metadata = self.text[_name]

        # Speaker 1
        # NOTE(review): the first-speaker file is selected on "name_2"
        # while the second speaker below is selected on "text_2" - confirm
        # both keys always appear together in the metadata.
        if "name_2" in metadata:
            # Strip only the ".wav" extension (same rule as for speaker 2)
            # so that dots elsewhere in the path do not truncate it.
            wav_path = dataname.split(".wav")[0] + "_1.wav"
        else:
            wav_path = dataname
        speaker_embedding, acoustic_tokens, semantic_tokens = \
            self._load_speaker(_name, wav_path, c.SPKR_1)

        # Speaker 2
        if "text_2" in metadata:
            name_2 = _name.split(".wav")[0] + "_2.wav"
            wav_path_2 = dataname.split(".wav")[0] + "_2.wav"
            speaker_embedding_2, acoustic_tokens_2, semantic_tokens_2 = \
                self._load_speaker(name_2, wav_path_2, c.SPKR_2)

            # Merge both speakers
            acoustic_tokens = np.concatenate(
                (acoustic_tokens, acoustic_tokens_2), axis=0)
            semantic_tokens = np.concatenate(
                (semantic_tokens, semantic_tokens_2), axis=1)
            speaker_embedding = np.concatenate(
                (speaker_embedding, speaker_embedding_2), axis=0)

        speaker_embedding = speaker_embedding[:self.hp.max_length, :]
        acoustic_tokens = acoustic_tokens[:self.hp.max_length, :]
        semantic_tokens = semantic_tokens[:, :self.hp.max_length]

        return speaker_embedding, acoustic_tokens, acoustic_tokens, dataname, semantic_tokens  # noqa