"""Helpers for loading ASR evaluation results and computing / plotting statistics."""

import os
from io import StringIO

import matplotlib as mpl
import matplotlib.patches as mpatches
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import seaborn as sns
import streamlit as st
from datasets import Dataset
from huggingface_hub import hf_hub_download

# Fixed colors for known ASR systems so that a system keeps its color across plots.
asr_systems_colors_mapping = {
    'azure': '#1f77b4',  # Blue
    'google': '#2ca02c',  # Green
    'wav2vec2': '#d62728',  # Red
    'nemo': '#9467bd',  # Purple
    'assemblyai': '#8c564b',  # Brown
    'mms': '#e377c2',  # Pink
    'google_v2': '#7f7f7f',  # Gray
    'whisper_cloud': '#bcbd22',  # Olive
    'whisper_local': '#ff7f0e',  # Orange
    # Add or override other systems and their colors
}


def download_tsv_from_google_sheet(sheet_url, timeout=30):
    """Download a Google Sheet as TSV and parse it into a DataFrame.

    Args:
        sheet_url: Sheet URL containing ``/edit#gid=``; that fragment is
            rewritten into the TSV export form.
        timeout: Seconds to wait for the HTTP request before giving up
            (without it ``requests`` may block indefinitely).

    Returns:
        The parsed DataFrame, or ``None`` when the response status is not 200.
    """
    tsv_url = sheet_url.replace('/edit#gid=', '/export?format=tsv&gid=')

    response = requests.get(tsv_url, timeout=timeout)
    response.encoding = 'utf-8'

    if response.status_code != 200:
        print("Failed to download the TSV file.")
        return None

    return pd.read_csv(StringIO(response.text), sep='\t')


def generate_path_to_latest_tsv(dataset_name, split, type_of_result):
    """Return the local path of the latest results TSV for a dataset split."""
    file_name = f"eval_results-{type_of_result}-latest.tsv"
    return os.path.join("./data", dataset_name, split, file_name)


@st.cache_data
def read_latest_results(dataset_name, split, codename_to_shortname_mapping):
    """Download and read the latest per-sample and per-dataset results.

    The repository id is read from the ``HF_SECRET_REPO_ID`` environment
    variable and the access token from ``HF_TOKEN``.

    Args:
        dataset_name: Dataset directory name inside ``leaderboard_input``.
        split: Split directory name inside the dataset directory.
        codename_to_shortname_mapping: Optional mapping applied to the
            ``system`` column of both tables; skipped when falsy.

    Returns:
        Tuple ``(df_per_sample, df_per_dataset)``.

    Raises:
        ValueError: If ``HF_SECRET_REPO_ID`` is not set.
    """
    repo_id = os.getenv('HF_SECRET_REPO_ID')
    if not repo_id:
        raise ValueError("HF_SECRET_REPO_ID environment variable is not set.")
    hf_token = os.getenv('HF_TOKEN')

    # File names inside a Hub repository always use forward slashes, so the
    # path is joined with "/" instead of the OS-specific separator.
    dataset_path = "/".join(["leaderboard_input", dataset_name, split])
    print(dataset_path)

    fp_results_per_dataset_repo = dataset_path + "/" + 'eval_results-per_dataset-latest.tsv'
    print(fp_results_per_dataset_repo)
    fp_results_per_sample_repo = dataset_path + "/" + 'eval_results-per_sample-latest.tsv'

    # NOTE(review): recent huggingface_hub releases appear to prefer `token=`
    # over `use_auth_token=` — confirm against the installed version.
    local_fp_per_dataset = hf_hub_download(
        repo_id=repo_id,
        repo_type='dataset',
        filename=fp_results_per_dataset_repo,
        use_auth_token=hf_token,
    )
    local_fp_per_sample = hf_hub_download(
        repo_id=repo_id,
        repo_type='dataset',
        filename=fp_results_per_sample_repo,
        use_auth_token=hf_token,
    )

    df_per_dataset = pd.read_csv(local_fp_per_dataset, delimiter='\t')
    df_per_sample = pd.read_csv(local_fp_per_sample, delimiter='\t')

    print(df_per_dataset)
    print(df_per_sample)

    # Replace system codenames with their short names.
    if codename_to_shortname_mapping:
        df_per_sample['system'] = df_per_sample['system'].replace(codename_to_shortname_mapping)
        df_per_dataset['system'] = df_per_dataset['system'].replace(codename_to_shortname_mapping)

    return df_per_sample, df_per_dataset


@st.cache_data
def retrieve_asr_systems_meta_from_the_catalog(asr_systems_list):
    """Return catalog rows whose ``Codename`` or ``Shortname`` is in the list.

    Raises:
        RuntimeError: If the catalog sheet could not be downloaded.
    """
    asr_systems_cat_url = "https://docs.google.com/spreadsheets/d/1fVsE98Ulmt-EIEe4wx8sUdo7RLigDdAVjQxNpAJIrH8/edit#gid=681521237"
    catalog = download_tsv_from_google_sheet(asr_systems_cat_url)
    if catalog is None:
        # The downloader signals failure with None; fail with a clear message
        # instead of an opaque "NoneType is not subscriptable".
        raise RuntimeError(
            "Could not download the ASR systems catalog from: {}".format(asr_systems_cat_url)
        )

    is_requested = (
        catalog["Codename"].isin(asr_systems_list)
        | catalog["Shortname"].isin(asr_systems_list)
    )
    return catalog[is_requested]


def basic_stats_per_dimension(df_input, metric, dimension):
    """Compute median, mean, std, min and max of ``metric`` per ``dimension``.

    Returns:
        DataFrame indexed by ``dimension`` with columns ``med_<metric>``,
        ``avg_<metric>``, ``std_<metric>``, ``min_<metric>`` and
        ``max_<metric>`` (rounded to 2 decimals), sorted by the median.
    """
    grouped = df_input.groupby(dimension)[metric]
    statistics = [
        ("med", grouped.median()),
        ("avg", grouped.mean()),
        ("std", grouped.std()),
        ("min", grouped.min()),
        ("max", grouped.max()),
    ]

    df_stats = pd.concat(
        [values.sort_values().round(2) for _, values in statistics],
        axis=1,
    )
    df_stats.columns = ["{}_{}".format(prefix, metric) for prefix, _ in statistics]

    return df_stats.sort_values(by="med_{}".format(metric))


def ser_from_per_sample_results(df_per_sample, dimension):
    """Return, per ``dimension``, the percentage of samples with non-zero WER.

    The result is a Series named ``SER``, sorted ascending and rounded to
    2 decimals.
    """
    df_ser = (
        df_per_sample.groupby(dimension)["WER"]
        .apply(lambda wer: (wer != 0).mean() * 100)
        .sort_values()
        .round(2)
    )
    df_ser.name = "SER"
    return df_ser


def get_total_audio_duration(df_per_sample):
    """Return the summed ``audio_duration`` of unique recordings divided by 3600.

    Recordings are de-duplicated on the ``id`` column, so a recording that
    appears once per system is counted only once. The result is in hours,
    assuming ``audio_duration`` is stored in seconds.
    """
    unique_recordings = df_per_sample.drop_duplicates(subset='id')
    return unique_recordings['audio_duration'].sum() / 3600


def extend_meta_per_sample_words_chars(df_per_sample):
    """Add word / character counts and rates for ``ref`` and ``hyp`` in place.

    Adds the columns ``ref_words``, ``hyp_words``, ``ref_wps``, ``hyp_wps``,
    ``ref_cps``, ``hyp_cps``, ``ref_cpw`` and ``hyp_cpw`` to ``df_per_sample``
    and returns the same DataFrame.

    Missing texts (NaN, e.g. an empty hypothesis read back from a TSV file)
    are treated as empty strings instead of raising. Rates are divided by the
    duration rounded to 2 decimals, so a zero duration or zero word count
    yields ``inf``/``NaN`` rather than an exception.
    """
    duration = df_per_sample['audio_duration'].round(2)

    for prefix in ('ref', 'hyp'):
        # Work on a normalized copy so the original text column is untouched.
        text = df_per_sample[prefix].fillna('').astype(str)
        n_chars = text.str.len()
        n_words = text.str.split().str.len()

        df_per_sample[prefix + '_words'] = n_words

    for prefix in ('ref', 'hyp'):
        text = df_per_sample[prefix].fillna('').astype(str)
        df_per_sample[prefix + '_wps'] = df_per_sample[prefix + '_words'] / duration

    for prefix in ('ref', 'hyp'):
        text = df_per_sample[prefix].fillna('').astype(str)
        df_per_sample[prefix + '_cps'] = text.str.len() / duration

    for prefix in ('ref', 'hyp'):
        text = df_per_sample[prefix].fillna('').astype(str)
        df_per_sample[prefix + '_cpw'] = text.str.len() / df_per_sample[prefix + '_words']

    return df_per_sample


def filter_top_outliers(df_input, metric, max_threshold):
    """Keep only rows whose ``metric`` is strictly below ``max_threshold``."""
    below_threshold = df_input[metric] < max_threshold
    return df_input[below_threshold]


def filter_bottom_outliers(df_input, metric, min_threshold):
    """Keep only rows whose ``metric`` is strictly above ``min_threshold``."""
    above_threshold = df_input[metric] > min_threshold
    return df_input[above_threshold]


def box_plot_per_dimension(df_input, metric, dimension, title, xlabel, ylabel):
    """Draw a box plot of ``metric`` per ``dimension``, ordered by median.

    Outliers (fliers) are not drawn.

    Returns:
        The ``matplotlib.pyplot`` module (not the Figure object), with the
        newly created figure as the current figure.
    """
    fig, ax = plt.subplots(figsize=(20, 10))

    median_order = df_input.groupby(dimension)[metric].median().sort_values().index
    sns.boxplot(
        x=dimension,
        y=metric,
        data=df_input,
        order=median_order,
        showfliers=False,
        ax=ax,
    )

    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.xticks(rotation=90)
    return plt


def box_plot_per_dimension_with_colors(df_input, metric, dimension, title, xlabel, ylabel, system_col, type_col):
    """Draw a box plot per ``dimension`` colored by system and hatched by type.

    Boxes are ordered by the median of ``metric``. Each box takes the system
    and type of the first row found for its ``dimension`` value.

    Args:
        df_input: Data to plot.
        metric: Column plotted on the y-axis.
        dimension: Column plotted on the x-axis (one box per value).
        title: Plot title.
        xlabel: X-axis label.
        ylabel: Y-axis label.
        system_col: Column that selects the box color.
        type_col: Column that selects the box hatching pattern.

    Returns:
        The matplotlib Figure.
    """
    fig, ax = plt.subplots(figsize=(12, 8))

    order = df_input.groupby(dimension)[metric].median().sort_values().index.tolist()

    # Work on a copy: updating the module-level mapping would leak the
    # palette colors assigned here into every later call.
    system_color_mapping = dict(asr_systems_colors_mapping)
    unique_systems = df_input[system_col].unique()
    remaining_systems = [s for s in unique_systems if s not in system_color_mapping]
    palette = sns.color_palette("tab10", len(remaining_systems))
    system_color_mapping.update(zip(remaining_systems, palette))

    unique_types = df_input[type_col].unique()
    type_hatch_mapping = {
        'free': '',  # No hatching
        'commercial': '///',  # Diagonal hatching
        # Add more patterns if needed
    }
    # Types without an explicit pattern cycle through the default hatches.
    default_hatches = ['', '///', '\\\\', 'xx', '++', '--', '...']
    for idx, type_name in enumerate(unique_types):
        if type_name not in type_hatch_mapping:
            type_hatch_mapping[type_name] = default_hatches[idx % len(default_hatches)]

    # One representative row per dimension value, in plotting order.
    first_rows = df_input.drop_duplicates(subset=dimension).set_index(dimension)
    colors = first_rows[system_col].reindex(order).map(system_color_mapping).tolist()
    hatches = first_rows[type_col].reindex(order).map(type_hatch_mapping).tolist()

    # Boxes start white and are restyled one by one below.
    sns.boxplot(
        x=dimension,
        y=metric,
        data=df_input,
        order=order,
        ax=ax,
        showfliers=False,
        linewidth=1.5,
        boxprops=dict(facecolor='white'),
    )

    # Look for the box patches in ax.artists first and fall back to ax.patches.
    box_patches = [p for p in ax.artists if isinstance(p, mpatches.PathPatch)]
    if not box_patches:
        box_patches = [p for p in ax.patches if isinstance(p, mpatches.PathPatch)]

    for patch, color, hatch in zip(box_patches, colors, hatches):
        patch.set_facecolor(color)
        patch.set_edgecolor('black')
        patch.set_linewidth(1.5)
        patch.set_hatch(hatch)

    system_handles = [
        mpatches.Patch(facecolor=system_color_mapping[system], edgecolor='black', label=system)
        for system in unique_systems
    ]
    type_handles = [
        mpatches.Patch(facecolor='white', edgecolor='black', hatch=type_hatch_mapping[typ], label=typ)
        for typ in unique_types
    ]

    legend1 = ax.legend(handles=system_handles, title='System', bbox_to_anchor=(0.01, 1), loc='upper left')
    ax.legend(handles=type_handles, title='Type', bbox_to_anchor=(0.01, 0.6), loc='upper left')
    # The second ax.legend() call replaced the first legend; add it back.
    ax.add_artist(legend1)

    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)

    # Smaller, right-aligned x tick labels rotated by 55 degrees.
    ax.tick_params(axis='x', labelsize=8)
    ax.set_xticklabels(ax.get_xticklabels(), ha='right', rotation=55)

    # Finer y-axis: MaxNLocator(20) allows up to 20 intervals.
    ax.yaxis.set_major_locator(plt.MaxNLocator(20))

    plt.tight_layout()
    return fig


def check_impact_of_normalization(data_in, ref_type='orig'):
    """Compare mean metrics of each normalization type against ``'none'``.

    Rows are filtered to ``ref_type``; the columns ``system``, ``subset`` and
    ``ref_type`` are dropped; the remaining numeric columns are averaged per
    ``norm_type``. The ``'none'`` row is subtracted from every other row.

    Returns:
        DataFrame of differences rounded to 2 decimals, with an extra
        ``Average`` column (row-wise mean), sorted by the absolute value of
        ``Average`` in ascending order (smallest absolute change first).
    """
    data_ref_type = data_in[data_in['ref_type'] == ref_type]
    data = data_ref_type.drop(columns=['system', 'subset', 'ref_type'])

    # numeric_only avoids a TypeError on leftover non-numeric columns in
    # pandas versions that no longer drop them silently.
    average_impact = data.groupby('norm_type').mean(numeric_only=True)
    baseline_metrics = average_impact.loc['none']

    difference_metrics = average_impact.subtract(baseline_metrics).drop(index='none')
    difference_metrics_rounded = difference_metrics.round(2)
    difference_metrics_rounded['Average'] = difference_metrics_rounded.mean(axis=1).round(2)

    return difference_metrics_rounded.sort_values(by='Average', key=abs)


def calculate_wer_per_meta_category(df_per_sample, selected_systems, metric, analysis_dimension = 'speaker_gender'):
    """Compare the mean ``metric`` of systems across metadata categories.

    Rows with a null ``analysis_dimension`` are ignored. For a fair
    comparison the same number of rows (the smallest per-category count of
    non-null ``metric`` values over all selected systems) is randomly sampled
    for every (system, category) pair before averaging.

    Gap columns are appended depending on the number of categories:
    exactly two -> ``Difference`` (first minus second category);
    more than two -> ``Std Dev``, ``MAD`` and ``Range``;
    fewer than two -> none.

    Args:
        df_per_sample: Per-sample results with a ``system`` column.
        selected_systems: Systems to include.
        metric: Metric column to average.
        analysis_dimension: Metadata column defining the categories.

    Returns:
        Tuple ``(table, available_samples, analyzed_samples_per_category)``:
        ``table`` has one row per system, sorted by the first gap column,
        followed by ``median``, ``average`` and ``std`` rows;
        ``available_samples`` maps each system to a DataFrame of
        ``available_samples`` per category; the last element is the number
        of samples drawn per (system, category).
    """
    df_per_sample_dimension = df_per_sample[df_per_sample[analysis_dimension].notnull()]

    meta_values = df_per_sample_dimension[analysis_dimension].unique()
    if analysis_dimension == 'speaker_age':
        # Keep age categories in chronological rather than encounter order.
        sorted_values = sort_age_categories(meta_values)
        print("meta values sorted:", sorted_values)
        meta_values = sorted_values

    # Count available samples per category for each system and track the
    # smallest count; 0 means "not set yet", as in the original logic.
    min_samples = 0
    df_available_samples_per_category_per_system = {}
    for system in selected_systems:
        # Build the mask from the filtered frame so mask and frame share an index.
        system_rows = df_per_sample_dimension[df_per_sample_dimension['system'] == system]
        available = (
            system_rows.groupby(analysis_dimension)[metric]
            .count()
            .rename('available_samples')
            .to_frame()
        )
        df_available_samples_per_category_per_system[system] = available

        min_samples_system = available['available_samples'].min()
        if pd.isna(min_samples_system):
            # System has no rows at all; do not let NaN poison the minimum.
            continue
        if min_samples == 0 or min_samples_system < min_samples:
            min_samples = min_samples_system

    df_per_sample_selected_systems = df_per_sample_dimension[
        df_per_sample_dimension['system'].isin(selected_systems)
    ]

    # Draw the same number of rows for every (system, category) pair.
    df_per_sample_selected_systems = (
        df_per_sample_selected_systems
        .groupby(['system', analysis_dimension])
        .sample(n=int(min_samples))
        .reset_index(drop=True)
    )

    df_per_sample_metric_dimension = (
        df_per_sample_selected_systems
        .groupby(['system', analysis_dimension])[metric]
        .mean()
        .round(2)
        .reset_index()
    )
    pivot = df_per_sample_metric_dimension.pivot(
        index=analysis_dimension, columns='system', values=metric
    ).round(2)

    if len(meta_values) == 2:
        gap_metrics = ['Difference']
        pivot.loc[gap_metrics[0]] = pivot.loc[meta_values[0]] - pivot.loc[meta_values[1]]
    elif len(meta_values) > 2:
        gap_metrics = ['Std Dev', 'MAD', 'Range']
        df = pivot
        print(df)
        metrics = pd.DataFrame({
            # spread of the per-category means
            gap_metrics[0]: df.std(),
            # mean absolute deviation from the per-system mean
            gap_metrics[1]: (df - df.mean()).abs().mean(),
            # largest minus smallest per-category mean
            gap_metrics[2]: df.max() - df.min(),
        })
        metrics_t = metrics.round(2).transpose()
        print(metrics_t)
        pivot = pd.concat([pivot, metrics_t], axis=0)
    else:
        # Fewer than two categories: no gap can be computed.
        gap_metrics = []

    print(pivot)

    # Systems as rows, ordered by the first gap metric when there is one.
    pivot = pivot.transpose()
    if gap_metrics:
        pivot = pivot.sort_values(by=gap_metrics[0], axis=0)

    # Summary rows are computed before any of them is appended, so they do
    # not influence each other.
    avg_difference = pivot.mean().round(2)
    median_difference = pivot.median().round(2)
    std_difference = pivot.std().round(2)

    pivot.loc['median'] = median_difference
    pivot.loc['average'] = avg_difference
    pivot.loc['std'] = std_difference

    analyzed_samples_per_category = min_samples

    pivot = pivot.round(2)

    # Keep category columns in meta_values order, followed by the gap columns.
    columns = list(meta_values) + gap_metrics
    print(columns)
    pivot = pivot[columns]

    return pivot, df_available_samples_per_category_per_system, analyzed_samples_per_category


def sort_age_categories(meta_values):
    """Sort age labels from ``teens`` to ``nineties``; unknown labels go last."""
    order = ["teens", "twenties", "thirties", "fourties", "fifties", "sixties", "seventies", "eighties", "nineties"]
    rank = {age: position for position, age in enumerate(order)}
    return sorted(meta_values, key=lambda age: rank.get(age, float('inf')))


def calculate_wer_per_audio_feature(df_per_sample, selected_systems, audio_feature_to_analyze, metric, no_of_buckets):
    """Average ``metric`` per system and per value of an audio feature.

    Side effect: adds the column ``<audio_feature_to_analyze>_bucket`` to
    ``df_per_sample``, holding for each row the nearest of ``no_of_buckets``
    evenly spaced values that start at the feature minimum.

    Args:
        df_per_sample: Per-sample results with a ``system`` column.
        selected_systems: Systems to include; the first one supplies the
            ``number_of_samples`` column of the pivot table.
        audio_feature_to_analyze: Numeric feature column.
        metric: Metric column to average.
        no_of_buckets: Number of bucket values to create (at least 1).

    Returns:
        Tuple ``(pivot, long_table)``: ``pivot`` is indexed by the feature
        value with ``number_of_samples`` first and one column per system;
        ``long_table`` has the columns ``system``, the feature, ``metric``
        (rounded to 2 decimals) and ``number_of_samples``.

    Raises:
        ValueError: If ``no_of_buckets`` is smaller than 1.
    """
    if no_of_buckets < 1:
        raise ValueError("no_of_buckets must be at least 1")

    print(df_per_sample)
    feature_values = df_per_sample[audio_feature_to_analyze]
    df_per_sample_selected_systems = df_per_sample[df_per_sample['system'].isin(selected_systems)]

    min_feature_value = round(feature_values.min(), 1)
    max_feature_value = round(feature_values.max(), 1)
    # The step must span the observed range; dividing the maximum alone
    # pushes buckets past the data whenever the minimum is not zero.
    step = (max_feature_value - min_feature_value) / no_of_buckets
    audio_feature_buckets = np.array(
        [min_feature_value + i * step for i in range(no_of_buckets)], dtype=float
    )

    # Map every value to its nearest bucket (ties go to the lower bucket).
    distances = np.abs(feature_values.to_numpy(dtype=float)[:, None] - audio_feature_buckets[None, :])
    df_per_sample[audio_feature_to_analyze + '_bucket'] = audio_feature_buckets[distances.argmin(axis=1)]

    # NOTE(review): the aggregation below groups by the raw feature values,
    # not by the bucket column computed above — confirm whether grouping by
    # bucket was intended.
    df_per_sample_wer_feature = (
        df_per_sample_selected_systems
        .groupby(['system', audio_feature_to_analyze])[metric]
        .agg(['mean', 'count'])
        .rename(columns={'mean': metric, 'count': 'number_of_samples'})
        .reset_index()
        .sort_values(by=audio_feature_to_analyze)
    )
    df_per_sample_wer_feature[metric] = df_per_sample_wer_feature[metric].round(2)

    # Systems as columns, feature values as index.
    pivot = df_per_sample_wer_feature.pivot(
        index=audio_feature_to_analyze, columns='system', values=metric
    ).round(2)

    # Sample counts of the first selected system, aligned on the feature
    # value; values that system lacks become NaN instead of raising on a
    # length mismatch.
    first_system_rows = df_per_sample_wer_feature[
        df_per_sample_wer_feature['system'] == selected_systems[0]
    ]
    pivot['number_of_samples'] = first_system_rows.set_index(audio_feature_to_analyze)['number_of_samples']

    # number_of_samples goes first, right after the index.
    other_columns = [col for col in pivot.columns if col != 'number_of_samples']
    pivot = pivot[['number_of_samples'] + other_columns]

    return pivot, df_per_sample_wer_feature