|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""
|
|
Feature extractor class for MERaLiON-SpeechEncoder, modified from original WhisperFeatureExtractor
|
|
"""
|
|
|
|
from typing import List, Optional, Union
|
|
|
|
import numpy as np
|
|
|
|
from transformers import is_torch_available
|
|
from transformers.audio_utils import mel_filter_bank, spectrogram, window_function
|
|
from transformers.feature_extraction_sequence_utils import SequenceFeatureExtractor
|
|
from transformers.feature_extraction_utils import BatchFeature
|
|
from transformers.utils import TensorType, logging
|
|
|
|
|
|
if is_torch_available():
|
|
import torch
|
|
|
|
logger = logging.get_logger(__name__)
|
|
|
|
|
|
class ModifiedWhisperFeatureExtractor(SequenceFeatureExtractor):
|
|
r"""
|
|
Constructs a modified Whisper feature extractor.
|
|
|
|
This feature extractor inherits from [`~feature_extraction_sequence_utils.SequenceFeatureExtractor`] which contains
|
|
most of the main methods. Users should refer to this superclass for more information regarding those methods.
|
|
|
|
This class extracts mel-filter bank features from raw speech using a custom numpy implementation of the `Short Time
|
|
Fourier Transform` which should match pytorch's `torch.stft` equivalent.
|
|
|
|
Differences from WhisperFeatureExtractor:
|
|
- mel_filter_bank
|
|
- norm: "slaney" -> None
|
|
- mel_scale: "slaney" -> "htk"
|
|
- still uses log scaling and clamp but removes additional min-max/mean normalization
|
|
|
|
Args:
|
|
feature_size (`int`, *optional*, defaults to 80):
|
|
The feature dimension of the extracted features.
|
|
sampling_rate (`int`, *optional*, defaults to 16000):
|
|
The sampling rate at which the audio files should be digitalized expressed in hertz (Hz).
|
|
hop_length (`int`, *optional*, defaults to 160):
|
|
Length of the overlapping windows for the STFT used to obtain the Mel Frequency coefficients.
|
|
chunk_length (`int`, *optional*, defaults to 30):
|
|
The maximum number of chunks of `sampling_rate` samples used to trim and pad longer or shorter audio
|
|
sequences.
|
|
n_fft (`int`, *optional*, defaults to 400):
|
|
Size of the Fourier transform.
|
|
padding_value (`float`, *optional*, defaults to 0.0):
|
|
Padding value used to pad the audio. Should correspond to silences.
|
|
"""
|
|
|
|
model_input_names = ["input_values"]
|
|
|
|
def __init__(
|
|
self,
|
|
feature_size=80,
|
|
sampling_rate=16000,
|
|
hop_length=160,
|
|
chunk_length=30,
|
|
n_fft=400,
|
|
padding_value=0.0,
|
|
return_attention_mask=False,
|
|
**kwargs,
|
|
):
|
|
super().__init__(
|
|
feature_size=feature_size,
|
|
sampling_rate=sampling_rate,
|
|
padding_value=padding_value,
|
|
return_attention_mask=return_attention_mask,
|
|
**kwargs,
|
|
)
|
|
self.n_fft = n_fft
|
|
self.hop_length = hop_length
|
|
self.chunk_length = chunk_length
|
|
self.n_samples = chunk_length * sampling_rate
|
|
self.nb_max_frames = self.n_samples // hop_length
|
|
self.sampling_rate = sampling_rate
|
|
self.mel_filters = mel_filter_bank(
|
|
num_frequency_bins=1 + n_fft // 2,
|
|
num_mel_filters=feature_size,
|
|
min_frequency=0.0,
|
|
max_frequency=8000.0,
|
|
sampling_rate=sampling_rate,
|
|
norm=None,
|
|
mel_scale="htk",
|
|
)
|
|
|
|
def _np_extract_fbank_features(self, waveform_batch: np.array, device: str) -> np.ndarray:
|
|
"""
|
|
Compute the log-mel spectrogram of the provided audio, gives similar results to Whisper's original torch
|
|
implementation with 1e-5 tolerance.
|
|
"""
|
|
if device != "cpu":
|
|
raise ValueError(
|
|
f"Got device `{device}` for feature extraction, but feature extraction on CUDA accelerator "
|
|
"devices requires torch, which is not installed. Either set `device='cpu'`, or "
|
|
"install torch according to the official instructions: https://pytorch.org/get-started/locally/"
|
|
)
|
|
log_spec_batch = []
|
|
for waveform in waveform_batch:
|
|
log_spec = spectrogram(
|
|
waveform,
|
|
window_function(self.n_fft, "hann"),
|
|
frame_length=self.n_fft,
|
|
hop_length=self.hop_length,
|
|
power=2.0,
|
|
mel_filters=self.mel_filters,
|
|
log_mel="log10",
|
|
)
|
|
log_spec = log_spec[:, :-1]
|
|
|
|
log_spec_batch.append(log_spec)
|
|
log_spec_batch = np.array(log_spec_batch)
|
|
return log_spec_batch
|
|
|
|
def _torch_extract_fbank_features(self, waveform: np.array, device: str = "cpu") -> np.ndarray:
|
|
"""
|
|
Compute the log-mel spectrogram of the audio using PyTorch's GPU-accelerated STFT implementation with batching,
|
|
yielding results similar to cpu computing with 1e-5 tolerance.
|
|
"""
|
|
waveform = torch.from_numpy(waveform).type(torch.float32)
|
|
|
|
window = torch.hann_window(self.n_fft)
|
|
if device != "cpu":
|
|
waveform = waveform.to(device)
|
|
window = window.to(device)
|
|
stft = torch.stft(waveform, self.n_fft, self.hop_length, window=window, return_complex=True)
|
|
magnitudes = stft[..., :-1].abs() ** 2
|
|
|
|
mel_filters = torch.from_numpy(self.mel_filters).type(torch.float32)
|
|
if device != "cpu":
|
|
mel_filters = mel_filters.to(device)
|
|
mel_spec = mel_filters.T @ magnitudes
|
|
|
|
log_spec = torch.clamp(mel_spec, min=1e-10).log10()
|
|
|
|
if device != "cpu":
|
|
log_spec = log_spec.detach().cpu()
|
|
return log_spec.numpy()
|
|
|
|
@staticmethod
|
|
|
|
def zero_mean_unit_var_norm(
|
|
input_values: List[np.ndarray], attention_mask: List[np.ndarray], padding_value: float = 0.0
|
|
) -> List[np.ndarray]:
|
|
"""
|
|
Every array in the list is normalized to have zero mean and unit variance
|
|
"""
|
|
if attention_mask is not None:
|
|
attention_mask = np.array(attention_mask, np.int32)
|
|
normed_input_values = []
|
|
|
|
for vector, length in zip(input_values, attention_mask.sum(-1)):
|
|
normed_slice = (vector - vector[:length].mean()) / np.sqrt(vector[:length].var() + 1e-7)
|
|
if length < normed_slice.shape[0]:
|
|
normed_slice[length:] = padding_value
|
|
|
|
normed_input_values.append(normed_slice)
|
|
else:
|
|
normed_input_values = [(x - x.mean()) / np.sqrt(x.var() + 1e-7) for x in input_values]
|
|
|
|
return normed_input_values
|
|
|
|
def __call__(
|
|
self,
|
|
raw_speech: Union[np.ndarray, List[float], List[np.ndarray], List[List[float]]],
|
|
truncation: bool = True,
|
|
pad_to_multiple_of: Optional[int] = None,
|
|
return_tensors: Optional[Union[str, TensorType]] = None,
|
|
return_attention_mask: Optional[bool] = None,
|
|
padding: Optional[str] = "max_length",
|
|
max_length: Optional[int] = None,
|
|
sampling_rate: Optional[int] = None,
|
|
do_normalize: Optional[bool] = None,
|
|
device: Optional[str] = "cpu",
|
|
return_token_timestamps: Optional[bool] = None,
|
|
**kwargs,
|
|
) -> BatchFeature:
|
|
"""
|
|
Main method to featurize and prepare for the model one or several sequence(s). Implementation uses PyTorch for
|
|
the STFT computation if available, otherwise a slower NumPy based one.
|
|
|
|
Args:
|
|
raw_speech (`np.ndarray`, `List[float]`, `List[np.ndarray]`, `List[List[float]]`):
|
|
The sequence or batch of sequences to be padded. Each sequence can be a numpy array, a list of float
|
|
values, a list of numpy arrays or a list of list of float values. Must be mono channel audio, not
|
|
stereo, i.e. single float per timestep.
|
|
truncation (`bool`, *optional*, default to `True`):
|
|
Activates truncation to cut input sequences longer than *max_length* to *max_length*.
|
|
pad_to_multiple_of (`int`, *optional*, defaults to None):
|
|
If set will pad the sequence to a multiple of the provided value.
|
|
|
|
This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability
|
|
`>= 7.5` (Volta), or on TPUs which benefit from having sequence lengths be a multiple of 128.
|
|
return_attention_mask (`bool`, *optional*):
|
|
Whether to return the attention mask. If left to the default, will return the attention mask according
|
|
to the specific feature_extractor's default.
|
|
|
|
[What are attention masks?](../glossary#attention-mask)
|
|
|
|
<Tip>
|
|
|
|
For Whisper models, `attention_mask` should always be passed for batched inference, to avoid subtle
|
|
bugs.
|
|
|
|
</Tip>
|
|
|
|
return_tensors (`str` or [`~utils.TensorType`], *optional*):
|
|
If set, will return tensors instead of list of python integers. Acceptable values are:
|
|
|
|
- `'tf'`: Return TensorFlow `tf.constant` objects.
|
|
- `'pt'`: Return PyTorch `torch.Tensor` objects.
|
|
- `'np'`: Return Numpy `np.ndarray` objects.
|
|
sampling_rate (`int`, *optional*):
|
|
The sampling rate at which the `raw_speech` input was sampled. It is strongly recommended to pass
|
|
`sampling_rate` at the forward call to prevent silent errors and allow automatic speech recognition
|
|
pipeline.
|
|
padding_value (`float`, *optional*, defaults to 0.0):
|
|
The value that is used to fill the padding values / vectors.
|
|
do_normalize (`bool`, *optional*, defaults to `False`):
|
|
Whether or not to zero-mean unit-variance normalize the input. Normalizing can help to significantly
|
|
improve the performance of the model.
|
|
device (`str`, *optional*, defaults to `'cpu'`):
|
|
Specifies the device for computation of the log-mel spectrogram of audio signals in the
|
|
`_torch_extract_fbank_features` method. (e.g., "cpu", "cuda")
|
|
return_token_timestamps (`bool`, *optional*, defaults to `None`):
|
|
Whether or not to return the number of frames of the input raw_speech.
|
|
These num_frames can be used by the model to compute word level timestamps.
|
|
"""
|
|
|
|
if sampling_rate is not None:
|
|
if sampling_rate != self.sampling_rate:
|
|
raise ValueError(
|
|
f"The model corresponding to this feature extractor: {self.__class__.__name__} was trained using a"
|
|
f" sampling rate of {self.sampling_rate}. Please make sure that the provided `raw_speech` input"
|
|
f" was sampled with {self.sampling_rate} and not {sampling_rate}."
|
|
)
|
|
else:
|
|
logger.warning(
|
|
"It is strongly recommended to pass the `sampling_rate` argument to this function. "
|
|
"Failing to do so can result in silent errors that might be hard to debug."
|
|
)
|
|
|
|
is_batched_numpy = isinstance(raw_speech, np.ndarray) and len(raw_speech.shape) > 1
|
|
if is_batched_numpy and len(raw_speech.shape) > 2:
|
|
raise ValueError(f"Only mono-channel audio is supported for input to {self}")
|
|
is_batched = is_batched_numpy or (
|
|
isinstance(raw_speech, (list, tuple)) and (isinstance(raw_speech[0], (np.ndarray, tuple, list)))
|
|
)
|
|
|
|
if is_batched:
|
|
raw_speech = [np.asarray([speech], dtype=np.float32).T for speech in raw_speech]
|
|
elif not is_batched and not isinstance(raw_speech, np.ndarray):
|
|
raw_speech = np.asarray(raw_speech, dtype=np.float32)
|
|
elif isinstance(raw_speech, np.ndarray) and raw_speech.dtype is np.dtype(np.float64):
|
|
raw_speech = raw_speech.astype(np.float32)
|
|
|
|
|
|
if not is_batched:
|
|
raw_speech = [np.asarray([raw_speech]).T]
|
|
|
|
batched_speech = BatchFeature({"input_values": raw_speech})
|
|
|
|
|
|
|
|
padded_inputs = self.pad(
|
|
batched_speech,
|
|
padding=padding,
|
|
max_length=max_length if max_length else self.n_samples,
|
|
truncation=truncation,
|
|
pad_to_multiple_of=pad_to_multiple_of,
|
|
return_attention_mask=return_attention_mask or do_normalize,
|
|
)
|
|
|
|
|
|
if do_normalize:
|
|
padded_inputs["input_values"] = self.zero_mean_unit_var_norm(
|
|
padded_inputs["input_values"],
|
|
attention_mask=padded_inputs["attention_mask"],
|
|
padding_value=self.padding_value,
|
|
)
|
|
padded_inputs["input_values"] = np.stack(padded_inputs["input_values"], axis=0)
|
|
|
|
|
|
input_values = padded_inputs.get("input_values").transpose(2, 0, 1)
|
|
|
|
extract_fbank_features = (
|
|
self._torch_extract_fbank_features if is_torch_available() else self._np_extract_fbank_features
|
|
)
|
|
input_values = extract_fbank_features(input_values[0], device)
|
|
|
|
if isinstance(input_values[0], List):
|
|
padded_inputs["input_values"] = [np.asarray(feature, dtype=np.float32) for feature in input_values]
|
|
|
|
else:
|
|
padded_inputs["input_values"] = input_values
|
|
|
|
if return_attention_mask:
|
|
|
|
padded_inputs["attention_mask"] = padded_inputs["attention_mask"][:, :: self.hop_length]
|
|
|
|
if return_token_timestamps is not None:
|
|
padded_inputs["num_frames"] = [len(raw_speech_i) // self.hop_length for raw_speech_i in raw_speech]
|
|
|
|
if return_tensors is not None:
|
|
padded_inputs = padded_inputs.convert_to_tensors(return_tensors)
|
|
|
|
return padded_inputs
|
|
|