import pandas as pd
import streamlit as st
import seaborn as sns
import matplotlib.pyplot as plt
import os
import requests
import numpy as np
from datasets import Dataset
from huggingface_hub import hf_hub_download
import matplotlib.patches as mpatches
import matplotlib as mpl
# Fixed colors for known ASR systems, keyed by system name, so that a system
# is drawn with the same color in every plot that uses this mapping.
asr_systems_colors_mapping = {
    'azure': '#1f77b4',  # Blue
    'google': '#2ca02c',  # Green
    'wav2vec2': '#d62728',  # Red
    'nemo': '#9467bd',  # Purple
    'assemblyai': '#8c564b',  # Brown
    'mms': '#e377c2',  # Pink
    'google_v2': '#7f7f7f',  # Gray
    'whisper_cloud': '#bcbd22',  # Olive
    'whisper_local': '#ff7f0e',  # Orange
    # Add or override other systems and their colors
}
def download_tsv_from_google_sheet(sheet_url, timeout=30):
    """Download a Google Sheet as TSV and load it into a DataFrame.

    Args:
        sheet_url: Sheet URL; the '/edit#gid=' part is rewritten into the
            TSV export endpoint. URLs without that fragment are requested
            unchanged.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout the request could block the app indefinitely.

    Returns:
        The sheet content as a pandas DataFrame, or None when the server
        answers with a status code other than 200.
    """
    from io import StringIO

    # Turn the "edit" URL into the TSV export URL of the same tab (gid).
    tsv_url = sheet_url.replace('/edit#gid=', '/export?format=tsv&gid=')

    response = requests.get(tsv_url, timeout=timeout)
    # Force UTF-8 so that response.text is decoded consistently.
    response.encoding = 'utf-8'

    if response.status_code != 200:
        print("Failed to download the TSV file.")
        return None

    return pd.read_csv(StringIO(response.text), sep='\t', encoding='utf-8')
def generate_path_to_latest_tsv(dataset_name, split, type_of_result):
    """Return the local path of the latest evaluation-results TSV.

    Args:
        dataset_name: Name of the dataset directory under ./data.
        split: Name of the split directory under the dataset directory.
        type_of_result: Result granularity inserted into the file name
            (the file is named eval_results-<type_of_result>-latest.tsv).

    Returns:
        The path as a string.
    """
    file_name = "eval_results-{}-latest.tsv".format(type_of_result)
    # The path used to be computed and then dropped; return it to the caller.
    return os.path.join("./data", dataset_name, split, file_name)
def read_latest_results(dataset_name, split, codename_to_shortname_mapping):
    """Download and load the latest per-sample and per-dataset results.

    The repository id is read from the HF_SECRET_REPO_ID environment variable
    and the access token from HF_TOKEN. Both TSV files are fetched from the
    'leaderboard_input/<dataset_name>/<split>/' folder of that dataset repo.

    Args:
        dataset_name: Dataset folder name inside the repository.
        split: Split folder name inside the dataset folder.
        codename_to_shortname_mapping: Optional mapping applied to the
            'system' column of both frames; skipped when empty or None.

    Returns:
        Tuple (df_per_sample, df_per_dataset).
    """
    repo_id = os.getenv('HF_SECRET_REPO_ID')
    hf_token = os.getenv('HF_TOKEN')

    # Paths inside a Hub repository are '/'-separated; os.path.join would
    # produce backslashes on Windows, so the path is joined explicitly.
    repo_dir = "/".join(("leaderboard_input", dataset_name, split))

    def _download_and_read(file_name):
        # NOTE(review): newer huggingface_hub releases name this argument
        # `token`; kept as in the original call - confirm against the pinned
        # library version.
        local_fp = hf_hub_download(
            repo_id=repo_id,
            repo_type='dataset',
            filename="/".join((repo_dir, file_name)),
            use_auth_token=hf_token,
        )
        return pd.read_csv(local_fp, delimiter='\t')

    df_per_dataset = _download_and_read('eval_results-per_dataset-latest.tsv')
    df_per_sample = _download_and_read('eval_results-per_sample-latest.tsv')

    # Replace system codenames with their short names when a mapping is given.
    if codename_to_shortname_mapping:
        df_per_sample['system'] = df_per_sample['system'].replace(codename_to_shortname_mapping)
        df_per_dataset['system'] = df_per_dataset['system'].replace(codename_to_shortname_mapping)

    return df_per_sample, df_per_dataset
def retrieve_asr_systems_meta_from_the_catalog(asr_systems_list):
    """Return the catalog rows describing the requested ASR systems.

    The catalog is downloaded as a DataFrame and filtered to rows whose
    'Codename' or 'Shortname' appears in asr_systems_list.

    Args:
        asr_systems_list: Codenames and/or short names of the systems to keep.

    Returns:
        The filtered catalog DataFrame.

    Raises:
        RuntimeError: If the catalog could not be downloaded.
    """
    asr_systems_cat_url = ""
    catalog = download_tsv_from_google_sheet(asr_systems_cat_url)

    # The downloader returns None on failure; fail with a clear message
    # instead of an opaque "NoneType is not subscriptable" error below.
    if catalog is None:
        raise RuntimeError("Failed to download the ASR systems catalog.")

    is_requested = (
        catalog["Codename"].isin(asr_systems_list)
        | catalog["Shortname"].isin(asr_systems_list)
    )
    return catalog[is_requested]
def basic_stats_per_dimension(df_input, metric, dimension):
    """Summarise a metric for every value of a dimension.

    Args:
        df_input: DataFrame holding the metric and dimension columns.
        metric: Name of the numeric column to summarise.
        dimension: Column (or columns) to group by.

    Returns:
        DataFrame indexed by the dimension with the columns
        med_<metric>, avg_<metric>, std_<metric>, min_<metric> and
        max_<metric>, rounded to 2 decimals and sorted by the median in
        ascending order. Rows with an equal median keep group-key order.
    """
    # One grouped aggregation instead of five separate groupby passes.
    stats = (
        df_input.groupby(dimension)[metric]
        .agg(["median", "mean", "std", "min", "max"])
        .round(2)
    )
    stats.columns = [
        "{}_{}".format(prefix, metric)
        for prefix in ("med", "avg", "std", "min", "max")
    ]
    # Stable sort keeps the ordering of ties deterministic.
    return stats.sort_values(by="med_{}".format(metric), kind="stable")
def ser_from_per_sample_results(df_per_sample, dimension, metric="WER"):
    """Compute the sentence error rate (SER) per value of a dimension.

    A sample counts as erroneous when its metric value is different from 0.

    Args:
        df_per_sample: Per-sample results containing the metric column.
        dimension: Column to group by, e.g. the dataset or the system.
        metric: Per-sample error column used to decide whether a sample
            contains an error. Defaults to "WER".

    Returns:
        Series indexed by the dimension holding the percentage of samples
        with a non-zero metric, rounded to 2 decimals and sorted ascending.
        The series keeps the name of the metric column.
    """
    def _percent_with_errors(values):
        # Mean of the boolean mask is the fraction of erroneous samples.
        return (values != 0).mean() * 100

    ser = df_per_sample.groupby(dimension)[metric].apply(_percent_with_errors)
    return ser.sort_values().round(2)
def get_total_audio_duration(df_per_sample):
    """Return the total duration, in hours, of the unique audio recordings.

    A recording is identified by its 'id'; when the same id occurs several
    times only its first row contributes. 'audio_duration' is summed and
    divided by 3600.
    """
    first_occurrence = ~df_per_sample['id'].duplicated()
    total_seconds = df_per_sample.loc[first_occurrence, 'audio_duration'].sum()
    return total_seconds / 3600
def extend_meta_per_sample_words_chars(df_per_sample):
    """Add word/character counts and rates for reference and hypothesis.

    The frame is modified in place and also returned. For each of the 'ref'
    and 'hyp' text columns the following columns are added:
      <p>_words - number of whitespace-separated words,
      <p>_wps   - words per second,
      <p>_cps   - characters per second,
      <p>_cpw   - characters per word.

    Missing transcripts (NaN, e.g. an empty hypothesis read back from a TSV
    file) are counted as empty strings instead of raising.

    Args:
        df_per_sample: Frame with 'ref', 'hyp' and 'audio_duration' columns.

    Returns:
        The same DataFrame object with the extra columns.
    """
    # NOTE(review): the duration is rounded before dividing, so the rates
    # themselves are not rounded - kept as in the original; confirm intent.
    duration = df_per_sample['audio_duration'].round(2)

    words = {}
    chars = {}
    for prefix in ('ref', 'hyp'):
        text = df_per_sample[prefix].fillna('').astype(str)
        words[prefix] = text.map(lambda s: len(s.split()))
        chars[prefix] = text.map(len)

    # Columns are added in the same order as before: words, wps, cps, cpw.
    for prefix in ('ref', 'hyp'):
        df_per_sample[prefix + '_words'] = words[prefix]
    for prefix in ('ref', 'hyp'):
        df_per_sample[prefix + '_wps'] = df_per_sample[prefix + '_words'] / duration
    for prefix in ('ref', 'hyp'):
        df_per_sample[prefix + '_cps'] = chars[prefix] / duration
    for prefix in ('ref', 'hyp'):
        df_per_sample[prefix + '_cpw'] = chars[prefix] / df_per_sample[prefix + '_words']

    return df_per_sample
def filter_top_outliers(df_input, metric, max_threshold):
    """Keep only the rows whose metric is strictly below max_threshold."""
    below_threshold = df_input[metric].lt(max_threshold)
    return df_input[below_threshold]
def filter_bottom_outliers(df_input, metric, min_threshold):
    """Keep only the rows whose metric is strictly above min_threshold."""
    above_threshold = df_input[metric].gt(min_threshold)
    return df_input[above_threshold]
def box_plot_per_dimension(df_input, metric, dimension, title, xlabel, ylabel):
    """Draw a box plot of a metric for every value of a dimension.

    Boxes are ordered by the median of the metric (ascending) and outlier
    points are hidden.

    Args:
        df_input: DataFrame holding the metric and dimension columns.
        metric: Column plotted on the y-axis.
        dimension: Column whose values form the x-axis categories.
        title: Plot title.
        xlabel: Label of the x-axis.
        ylabel: Label of the y-axis.

    Returns:
        The matplotlib.pyplot module, whose current figure is the new plot
        (kept for backward compatibility with existing callers).
    """
    fig, ax = plt.subplots(figsize=(20, 10))

    order = df_input.groupby(dimension)[metric].median().sort_values().index
    sns.boxplot(x=dimension, y=metric, data=df_input, order=order, showfliers=False, ax=ax)

    # The labelling arguments were accepted but never applied before.
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)

    return plt
def box_plot_per_dimension_with_colors(df_input, metric, dimension, title, xlabel, ylabel, system_col, type_col):
    """Box plot per dimension, colored by system and hatched by type.

    Boxes are ordered by the median of the metric (ascending). Each box is
    filled with the color of the system and the hatch pattern of the type
    found in the first row of df_input for that dimension value. Two legends
    are drawn: one for systems (colors) and one for types (hatches).

    Args:
        df_input: DataFrame with the metric, dimension, system and type columns.
        metric: Column plotted on the y-axis.
        dimension: Column whose values form the x-axis categories.
        title: Plot title.
        xlabel: Label of the x-axis.
        ylabel: Label of the y-axis.
        system_col: Column naming the system of each row.
        type_col: Column naming the type of each row (e.g. 'free', 'commercial').

    Returns:
        The matplotlib Figure.
    """
    fig, ax = plt.subplots(figsize=(12, 8))

    # Order of the categories: ascending median of the metric.
    order = df_input.groupby(dimension)[metric].median().sort_values().index.tolist()

    # Colors: start from the shared mapping and give every system missing
    # from it a palette color. Work on a copy so that colors assigned for
    # this plot do not leak into the module-level mapping.
    unique_systems = df_input[system_col].unique()
    system_color_mapping = dict(asr_systems_colors_mapping)
    remaining_systems = [s for s in unique_systems if s not in system_color_mapping]
    palette = sns.color_palette("tab10", len(remaining_systems))
    system_color_mapping.update(zip(remaining_systems, palette))

    # Hatches: fixed patterns for the known types, defaults for the rest.
    unique_types = df_input[type_col].unique()
    type_hatch_mapping = {
        'free': '',  # No hatching
        'commercial': '///',  # Diagonal hatching
    }
    default_hatches = ['', '///', '\\\\', 'xx', '++', '--', '...']
    for idx, t in enumerate(unique_types):
        if t not in type_hatch_mapping:
            type_hatch_mapping[t] = default_hatches[idx % len(default_hatches)]

    # One color and one hatch per box, in plotting order.
    first_row_per_dimension = df_input.drop_duplicates(subset=dimension).set_index(dimension)
    dimension_system_mapping = first_row_per_dimension[system_col].reindex(order)
    colors = [system_color_mapping[system] for system in dimension_system_mapping]
    dimension_type_mapping = first_row_per_dimension[type_col].reindex(order)
    hatches = [type_hatch_mapping[typ] for typ in dimension_type_mapping]

    # Draw plain white boxes without hue; they are restyled one by one below.
    # NOTE(review): outliers are hidden to match box_plot_per_dimension - confirm.
    boxprops = dict(facecolor='white')
    sns.boxplot(x=dimension, y=metric, data=df_input, order=order,
                showfliers=False, boxprops=boxprops, ax=ax)

    # Depending on the library version the boxes are stored in ax.artists
    # or in ax.patches.
    box_patches = [patch for patch in ax.artists if isinstance(patch, mpatches.PathPatch)]
    if not box_patches:
        box_patches = [patch for patch in ax.patches if isinstance(patch, mpatches.PathPatch)]

    for patch, color, hatch in zip(box_patches, colors, hatches):
        patch.set_facecolor(color)
        patch.set_hatch(hatch)

    # Legend entries for systems (colors) and types (hatching patterns).
    system_handles = [
        mpatches.Patch(facecolor=system_color_mapping[system], edgecolor='black', label=system)
        for system in unique_systems
    ]
    type_handles = [
        mpatches.Patch(facecolor='white', edgecolor='black', hatch=type_hatch_mapping[typ], label=typ)
        for typ in unique_types
    ]

    # The second ax.legend() call replaces the first legend, so the first
    # one is added back as a separate artist.
    legend1 = ax.legend(handles=system_handles, title='System', bbox_to_anchor=(0.01, 1), loc='upper left')
    legend2 = ax.legend(handles=type_handles, title='Type', bbox_to_anchor=(0.01, 0.6), loc='upper left')
    ax.add_artist(legend1)

    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)

    # Readability of the x-axis labels: smaller font, right-aligned so the
    # label ends under its box, rotated by 55 degrees.
    ax.tick_params(axis='x', labelsize=8)
    plt.setp(ax.get_xticklabels(), ha='right', rotation=55)

    # Finer y-axis: allow up to 20 intervals between major ticks.
    ax.locator_params(axis='y', nbins=20)

    return fig
def check_impact_of_normalization(data_in, ref_type='orig'):
    """Measure how each normalization type changes the average metrics.

    Rows are restricted to the given reference type, the numeric metric
    columns are averaged per 'norm_type' and the averages of the 'none'
    baseline are subtracted from every other normalization type.

    Args:
        data_in: DataFrame with 'system', 'subset', 'ref_type', 'norm_type'
            and the metric columns.
        ref_type: Value of 'ref_type' to analyse.

    Returns:
        DataFrame indexed by normalization type (without 'none') holding the
        difference to the baseline for every metric, rounded to 2 decimals,
        plus an 'Average' column. Rows are sorted by the absolute value of
        'Average' in ascending order.

    Raises:
        KeyError: If there is no 'none' baseline for the reference type.
    """
    data_ref_type = data_in[data_in['ref_type'] == ref_type]
    data = data_ref_type.drop(columns=['system', 'subset', 'ref_type'])

    # numeric_only keeps the aggregation working when other non-numeric
    # columns are present (recent pandas raises on them instead of skipping).
    average_impact = data.groupby('norm_type').mean(numeric_only=True)
    baseline_metrics = average_impact.loc['none']

    # Difference to the baseline; the baseline row itself is dropped.
    difference_metrics = average_impact.subtract(baseline_metrics).drop(index='none')
    difference_metrics_rounded = difference_metrics.round(2)

    # Average change across all metric columns.
    difference_metrics_rounded['Average'] = difference_metrics_rounded.mean(axis=1).round(2)

    # The result used to be computed and then dropped; return it.
    return difference_metrics_rounded.sort_values(by='Average', key=abs)
def calculate_wer_per_meta_category(df_per_sample, selected_systems, metric, analysis_dimension='speaker_gender', random_state=None):
    """Compare a metric across the categories of a metadata dimension.

    For a fair comparison the same number of samples is drawn at random for
    every (system, category) pair: the smallest number of samples with a
    metric value available for any category of any selected system.

    Args:
        df_per_sample: Per-sample results with a 'system' column, the metric
            column and the analysis_dimension column.
        selected_systems: Systems to include.
        metric: Name of the metric column to average.
        analysis_dimension: Metadata column defining the categories.
        random_state: Optional seed (or random generator) passed to the
            sampling step to make the result reproducible.

    Returns:
        Tuple of
          - DataFrame with one row per system followed by 'median',
            'average' and 'std' rows; columns are the categories followed by
            the gap metrics ('Difference' for two categories; 'Std Dev',
            'MAD' and 'Range' for more than two; none for fewer),
          - dict mapping each system to a DataFrame with the number of
            'available_samples' per category,
          - the number of samples analysed per (system, category) pair.
    """
    # Rows without a value for the dimension cannot be put into a category.
    df_dimension = df_per_sample[df_per_sample[analysis_dimension].notnull()]

    meta_values = df_dimension[analysis_dimension].unique()
    if analysis_dimension == 'speaker_age':
        # Keep age groups in chronological rather than first-seen order.
        meta_values = sort_age_categories(meta_values)
        print("meta values sorted:", meta_values)

    # Number of usable samples per category for every system, and the
    # smallest such number over all systems.
    available_samples_per_system = {}
    min_samples = None
    for system in selected_systems:
        # The mask is built from the filtered frame so that it is aligned
        # with the rows it selects.
        system_rows = df_dimension[df_dimension['system'] == system]
        counts = system_rows.groupby(analysis_dimension)[metric].count().to_frame('available_samples')
        available_samples_per_system[system] = counts
        if counts.empty:
            # A system without any usable rows does not limit the sample size.
            continue
        system_min = int(counts['available_samples'].min())
        if min_samples is None or system_min < min_samples:
            min_samples = system_min
    if min_samples is None:
        min_samples = 0

    # Draw the same number of samples for every (system, category) pair.
    # GroupBy.sample keeps all columns, including the grouping columns.
    df_selected = df_dimension[df_dimension['system'].isin(selected_systems)]
    df_selected = (
        df_selected.groupby(['system', analysis_dimension])
        .sample(n=min_samples, random_state=random_state)
        .reset_index(drop=True)
    )

    # Mean metric per (system, category): categories as rows, systems as columns.
    mean_per_group = df_selected.groupby(['system', analysis_dimension])[metric].mean().round(2).reset_index()
    pivot = mean_per_group.pivot(index=analysis_dimension, columns='system', values=metric).round(2)

    if len(meta_values) == 2:
        # Two categories: signed difference between the first and the second.
        gap_metrics = ['Difference']
        pivot.loc[gap_metrics[0]] = pivot.loc[meta_values[0]] - pivot.loc[meta_values[1]]
    elif len(meta_values) > 2:
        # More categories: spread of the per-category means for each system.
        gap_metrics = ['Std Dev', 'MAD', 'Range']
        spread = pd.DataFrame({
            gap_metrics[0]: pivot.std(),
            gap_metrics[1]: (pivot - pivot.mean()).abs().mean(),
            gap_metrics[2]: pivot.max() - pivot.min(),
        })
        pivot = pd.concat([pivot, spread.round(2).transpose()], axis=0)
    else:
        # Fewer than two categories: there is no gap to measure.
        gap_metrics = []

    # Systems as rows, ordered by the first gap metric (smallest gap first).
    pivot = pivot.transpose()
    if gap_metrics:
        pivot = pivot.sort_values(by=gap_metrics[0], axis=0)

    # Summary rows are computed over the systems before any of them is added.
    avg_over_systems = pivot.mean().round(2)
    median_over_systems = pivot.median().round(2)
    std_over_systems = pivot.std().round(2)
    pivot.loc['median'] = median_over_systems
    pivot.loc['average'] = avg_over_systems
    pivot.loc['std'] = std_over_systems

    pivot = pivot.round(2)

    # Categories in their original order, followed by the gap metrics.
    pivot = pivot[list(meta_values) + gap_metrics]

    return pivot, available_samples_per_system, min_samples
def sort_age_categories(meta_values):
    """Return the age categories ordered from the youngest to the oldest.

    Labels that are not one of the known decade names are placed after the
    known ones, keeping their relative input order.
    """
    known_labels = ("teens", "twenties", "thirties", "fourties", "fifties",
                    "sixties", "seventies", "eighties", "nineties")
    rank_of = dict(zip(known_labels, range(len(known_labels))))

    def _rank(label):
        # Unknown labels get an infinite rank so they sort last.
        return rank_of.get(label, float('inf'))

    return sorted(meta_values, key=_rank)
def calculate_wer_per_audio_feature(df_per_sample, selected_systems, audio_feature_to_analyze, metric, no_of_buckets):
    """Average a metric per system for every value of an audio feature.

    As a side effect a '<audio_feature_to_analyze>_bucket' column is added to
    df_per_sample, holding for every row the bucket value closest to its
    feature value.

    NOTE(review): the averages are grouped by the raw feature column, not by
    the bucket column, so no_of_buckets only affects the added column - kept
    as in the original; confirm whether grouping by bucket was intended.

    Args:
        df_per_sample: Per-sample results with a 'system' column, the metric
            column and the feature column.
        selected_systems: Systems to include in the averages.
        audio_feature_to_analyze: Name of the numeric feature column.
        metric: Name of the metric column to average.
        no_of_buckets: Number of bucket values to create.

    Returns:
        Tuple of
          - pivot DataFrame indexed by the feature value with a
            'number_of_samples' column (taken from the first selected
            system) followed by one column per system holding the mean
            metric rounded to 2 decimals,
          - long-format DataFrame with the columns 'system', the feature,
            the metric mean and 'number_of_samples'.
    """
    feature = audio_feature_to_analyze
    df_selected_systems = df_per_sample[df_per_sample['system'].isin(selected_systems)]

    # Bucket values start at the smallest feature value and grow by a fixed
    # step. Series.min/max skip missing values.
    min_feature_value = round(df_per_sample[feature].min(), 1)
    max_feature_value = round(df_per_sample[feature].max(), 1)
    step = max_feature_value / no_of_buckets
    audio_feature_buckets = [min_feature_value + i * step for i in range(no_of_buckets)]

    # Map every feature value to the closest bucket value.
    df_per_sample[feature + '_bucket'] = df_per_sample[feature].apply(
        lambda x: min(audio_feature_buckets, key=lambda y: abs(x - y)))

    # Mean metric and number of samples per (system, feature value). Both
    # come from the same grouping, so their rows are in the same order.
    grouped = df_selected_systems.groupby(['system', feature])[metric]
    df_wer_feature = grouped.mean().reset_index()
    df_wer_feature['number_of_samples'] = grouped.count().values
    df_wer_feature = df_wer_feature.sort_values(by=feature)

    # Systems as columns, feature values as index.
    df_pivot = df_wer_feature.pivot(index=feature, columns='system', values=metric).round(2)

    # Sample counts of the first selected system, aligned on the feature value.
    first_system_rows = df_wer_feature[df_wer_feature['system'] == selected_systems[0]]
    df_pivot['number_of_samples'] = first_system_rows.groupby(feature)['number_of_samples'].sum()

    # Put number_of_samples first, directly after the index.
    system_columns = [col for col in df_pivot.columns if col != 'number_of_samples']
    df_pivot = df_pivot[['number_of_samples'] + system_columns]

    return df_pivot, df_wer_feature