---
library_name: transformers
tags:
- prosody
- segmentation
- audio
- speech
language:
- sl
base_model:
- facebook/w2v-bert-2.0
---

# Wav2Vec2Bert Audio frame classifier for prosodic unit detection

This model predicts prosodic units in speech. For each 20 ms frame, it predicts 1 or 0, indicating whether the frame falls within a prosodic unit or not.

This frame-level output can be grouped into events (time intervals) with the `frames_to_intervals` function provided in the code snippets below.
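
To illustrate the expected mapping, here is a minimal, self-contained sketch of the idea; the `toy_frames_to_intervals` helper is hypothetical, and its boundary handling differs slightly from the `frames_to_intervals` function used in the snippets below.

```python
# Hypothetical sketch: collapse runs of frames labelled 1 into (start_s, end_s)
# intervals, with each frame spanning 20 ms. Not the exact implementation below.
from itertools import groupby


def toy_frames_to_intervals(frames, frame_s=0.02):
    intervals = []
    position = 0
    for label, run in groupby(frames):
        run_length = len(list(run))
        if label == 1:
            intervals.append(
                (round(position * frame_s, 3), round((position + run_length) * frame_s, 3))
            )
        position += run_length
    return intervals


print(toy_frames_to_intervals([0, 0, 1, 1, 1, 0, 0, 1, 1]))
# [(0.04, 0.1), (0.14, 0.18)]
```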

It is known that the model is unreliable if the audio starts or ends within a prosodic unit. This can be somewhat circumvented by 1) using the largest chunks that will fit on your machine and 2) using overlapping chunks and combining the results sensibly, as illustrated in the "Inference on longer files" section below.

## Model Details

### Model Description

- **Developed by:** Peter Rupnik, Nikola Ljubešić, Darinka Verdonik, Simona Majhenič
- **Funded by:** MEZZANINE project
- **Model type:** Wav2Vec2Bert for Audio Frame Classification
- **Language(s) (NLP):** Trained and tested on Slovenian; it is currently unclear whether the model is usable cross-lingually
- **Finetuned from model:** facebook/w2v-bert-2.0

The model was trained on the train split of the [ROG-Art dataset](http://hdl.handle.net/11356/1992) only.

### Model performance

We evaluate the model indirectly and consider only the positive class:

1. First, prosodic units (intervals with start and end times, e.g. `[0.123, 5.546]`) are extracted from the reference data and from the model outputs.
2. If a predicted prosodic unit has an overlapping counterpart among the true prosodic units, we count it as a True Positive. If it has no overlapping true counterpart, we count it as a False Positive, and a true prosodic unit without a counterpart in the predictions counts as a False Negative.
3. From the TP, FP, and FN counts, precision, recall, and F1 score are calculated (a minimal sketch of this procedure is shown after the metrics below).

In this fashion we obtain the following metrics:

* Precision: 0.9423
* Recall: 0.7802
* F_1 score: 0.8538

![A gif illustrating the correspondence between true and predicted prosodic units](output.gif)
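
For concreteness, the overlap-based matching and scoring described above can be sketched as follows; the interval lists here are hypothetical, and this is not the actual evaluation script.

```python
# Sketch of the overlap-based evaluation with hypothetical interval lists.
def overlaps(a, b):
    # Two (start, end) intervals overlap if each starts before the other ends.
    return a[0] < b[1] and b[0] < a[1]


def evaluate_intervals(true_units, pred_units):
    tp = sum(any(overlaps(p, t) for t in true_units) for p in pred_units)
    fp = len(pred_units) - tp
    fn = sum(not any(overlaps(t, p) for p in pred_units) for t in true_units)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1


true_units = [(0.12, 5.55), (6.10, 9.80)]    # hypothetical gold intervals
pred_units = [(0.20, 5.40), (10.00, 11.00)]  # hypothetical predictions
print(evaluate_intervals(true_units, pred_units))
# (0.5, 0.5, 0.5)
```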

## Uses

### Simple use (short files)

For shorter audio files that fit on your GPU, the classifier can be used directly:
```python
import numpy as np
import torch
from datasets import Audio, Dataset
from transformers import AutoFeatureExtractor, Wav2Vec2BertForAudioFrameClassification

if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

model_name = "5roop/Wav2Vec2BertProsodicUnitsFrameClassifier"
feature_extractor = AutoFeatureExtractor.from_pretrained(model_name)
model = Wav2Vec2BertForAudioFrameClassification.from_pretrained(model_name).to(device)
f = "data/Rog-Art-N-G6007-P600702_181.070_211.070.wav"


def frames_to_intervals(frames: list) -> list[tuple]:
    """Group frame-level predictions (one label per 20 ms frame) into (start_s, end_s) intervals."""
    from itertools import pairwise

    import pandas as pd

    results = []
    ndf = pd.DataFrame(
        data={
            "time_s": [0.020 * i for i in range(len(frames))],
            "frames": frames,
        }
    )
    ndf = ndf.dropna()
    indices_of_change = ndf.frames.diff()[ndf.frames.diff() != 0].index.values
    for si, ei in pairwise(indices_of_change):
        if ndf.loc[si : ei - 1, "frames"].mode()[0] == 0:
            pass
        else:
            results.append(
                (round(ndf.loc[si, "time_s"], 3), round(ndf.loc[ei - 1, "time_s"], 3))
            )
    return results


def evaluator(chunks):
    sampling_rate = chunks["audio"][0]["sampling_rate"]
    with torch.no_grad():
        inputs = feature_extractor(
            [i["array"] for i in chunks["audio"]],
            return_tensors="pt",
            sampling_rate=sampling_rate,
        ).to(device)
        logits = model(**inputs).logits
    y_pred_raw = np.array(logits.cpu())
    y_pred = y_pred_raw.argmax(axis=-1)
    prosodic_units = [frames_to_intervals(i) for i in y_pred]
    return {
        "y_pred": y_pred,
        "y_pred_logits": y_pred_raw,
        "prosodic_units": prosodic_units,
    }


# Create a dataset with a single instance and map our evaluator function on it:
ds = Dataset.from_dict({"audio": [f]}).cast_column("audio", Audio(16000, mono=True))
ds = ds.map(evaluator, batched=True, batch_size=1)  # Adjust batch size according to your hardware specs
print(ds["y_pred"][0])
# Outputs: [0, 0, 1, 1, 1, 1, 1, ...]
print(ds["y_pred_logits"][0])
# Outputs:
# [[ 0.89419061, -0.77746612],
#  [ 0.44213724, -0.34862748],
#  [-0.08605709,  0.13012762],
# ....
print(ds["prosodic_units"][0])
# Outputs: [[0.04, 2.4], [3.52, 6.6], ....
```

### Inference on longer files

If the file is too big for straightforward inference, it has to be chunked before processing. We know that the probability of false negatives increases at the starts and ends of chunks, so it is best to process the file with some overlap between chunks or to split it on silences. We illustrate the former approach here:
```python
import numpy as np
import torch
from datasets import Audio, Dataset
from transformers import AutoFeatureExtractor, Wav2Vec2BertForAudioFrameClassification

if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

model_name = "5roop/Wav2Vec2BertProsodicUnitsFrameClassifier"
feature_extractor = AutoFeatureExtractor.from_pretrained(model_name)
model = Wav2Vec2BertForAudioFrameClassification.from_pretrained(model_name).to(device)
f = "ROG/ROG-Art/WAV/Rog-Art-N-G5025-P600022.wav"

OVERLAP_S = 10
CHUNK_LENGTH_S = 30
SAMPLING_RATE = 16_000
OVERLAP_SAMPLES = OVERLAP_S * SAMPLING_RATE
CHUNK_LENGTH_SAMPLES = CHUNK_LENGTH_S * SAMPLING_RATE


def frames_to_intervals(frames: list) -> list[tuple]:
    """Group frame-level predictions (one label per 20 ms frame) into (start_s, end_s) intervals."""
    from itertools import pairwise

    import pandas as pd

    results = []
    ndf = pd.DataFrame(
        data={
            "time_s": [0.020 * i for i in range(len(frames))],
            "frames": frames,
        }
    )
    ndf = ndf.dropna()
    indices_of_change = ndf.frames.diff()[ndf.frames.diff() != 0].index.values
    for si, ei in pairwise(indices_of_change):
        if ndf.loc[si : ei - 1, "frames"].mode()[0] == 0:
            pass
        else:
            results.append(
                (round(ndf.loc[si, "time_s"], 3), round(ndf.loc[ei - 1, "time_s"], 3))
            )
    return results


def merge_events(events: list[list[float]], centroids):
    """Merge per-chunk events; where two overlap, keep the one closer to the centre of its chunk."""
    flattened_events = []
    flattened_centroids = []
    for batch_idx, batch in enumerate(events):
        for event in batch:
            flattened_events.append(event)
            flattened_centroids.append(centroids[batch_idx])
    # Sort events and their chunk centroids together, by event start time
    paired = sorted(zip(flattened_events, flattened_centroids), key=lambda x: x[0][0])

    # Merged list to store final intervals
    merged = []

    for event, centroid in paired:
        if not merged:
            # If merged is empty, simply add the first event
            merged.append((event, centroid))
        else:
            last_event, last_centroid = merged[-1]
            # Check for overlap
            if (last_event[0] < event[1]) and (last_event[1] > event[0]):
                # Calculate the midpoints of the intervals
                last_event_midpoint = (last_event[0] + last_event[1]) / 2
                current_event_midpoint = (event[0] + event[1]) / 2

                # Keep the event whose chunk centroid is closer to its midpoint
                if abs(last_centroid - last_event_midpoint) <= abs(
                    centroid - current_event_midpoint
                ):
                    continue
                else:
                    merged[-1] = (event, centroid)
            else:
                merged.append((event, centroid))

    final_intervals = [event for event, _ in merged]
    return final_intervals


def evaluator(chunks):
    with torch.no_grad():
        samples = []
        for array, start, end in zip(chunks["audio"], chunks["start"], chunks["end"]):
            samples.append(array["array"][start:end])
        inputs = feature_extractor(
            samples,
            return_tensors="pt",
            sampling_rate=SAMPLING_RATE,
        ).to(device)
        logits = model(**inputs).logits
    y_pred_raw = np.array(logits.cpu())
    y_pred = y_pred_raw.argmax(axis=-1)
    # Shift each chunk's intervals by the chunk's start time so they become absolute
    prosodic_units = [
        np.array(frames_to_intervals(i)) + start / SAMPLING_RATE
        for i, start in zip(y_pred, chunks["start"])
    ]
    return {
        "y_pred": y_pred,
        "y_pred_logits": y_pred_raw,
        "prosodic_units": prosodic_units,
    }


audio_duration_samples = (
    Audio(SAMPLING_RATE, mono=True)
    .decode_example({"path": f, "bytes": None})["array"]
    .shape[0]
)
chunk_starts = np.arange(
    0, audio_duration_samples, CHUNK_LENGTH_SAMPLES - OVERLAP_SAMPLES
)
chunk_ends = chunk_starts + CHUNK_LENGTH_SAMPLES

ds = Dataset.from_dict(
    {
        "audio": [f for i in chunk_starts],
        "start": chunk_starts,
        "end": chunk_ends,
        "chunk_centroid_s": (chunk_starts + chunk_ends) / 2 / SAMPLING_RATE,
    }
).cast_column("audio", Audio(SAMPLING_RATE, mono=True))

ds = ds.map(evaluator, batched=True, batch_size=10)


final_intervals = merge_events(ds["prosodic_units"], ds["chunk_centroid_s"])
print(final_intervals)
# Outputs: [[3.14, 4.96], [5.6, 8.4], [8.62, 9.32], [10.12, 10.7], [11.72, 13.1],....
```

## Training Details

| hyperparameter              | value |
| --------------------------- | ----- |
| learning rate               | 3e-5  |
| batch size                  | 1     |
| gradient accumulation steps | 16    |
| num train epochs            | 20    |
| weight decay                | 0.01  |
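
As a rough sketch, these hyperparameters correspond to a transformers `TrainingArguments` configuration along the following lines; the actual training script is not included here, so everything beyond the table values (e.g. the output directory) is an assumption.

```python
from transformers import TrainingArguments

# Sketch only: values not listed in the table above are assumptions,
# not the authors' actual setup.
training_args = TrainingArguments(
    output_dir="wav2vec2bert-prosodic-units",  # hypothetical
    learning_rate=3e-5,
    per_device_train_batch_size=1,
    gradient_accumulation_steps=16,
    num_train_epochs=20,
    weight_decay=0.01,
)
```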

The software environment is captured in a conda/mamba [environment export yml file](transformers_env.yml). To recreate the environment, run `mamba env create -f transformers_env.yml` (replace `mamba` with `conda` if you don't use mamba).