Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import itertools | |
import os | |
import numpy as np | |
import pandas as pd | |
from datasets import load_dataset | |
import style | |
from style import LANG_SYMBOLS, T_SYMBOLS | |
# Task groups that get_available_task_groups() leaves out when few-shot
# results are requested.
ZERO_SHOT_ONLY = ["BELEBELE"]
# Task groups that get_available_task_groups() leaves out when zero-shot
# results are requested.
FEW_SHOT_ONLY = ["GSM8K", "TruthfulQA"]
def init():
    """Load the leaderboard dataset and populate the module-level state.

    The dataset repository, configuration and split are read from the
    ``OGX_LEADERBOARD_DATASET_NAME``, ``OGX_LEADERBOARD_DATASET_CONFIG`` and
    ``OGX_LEADERBOARD_DATASET_SPLIT`` environment variables. From the loaded
    (long-format) table the following globals are derived:

    * ``task_group_names_list``: unique ``Task_Group`` values.
    * ``task_group_type_dict``: ``Task_Group`` -> ``Task_Type``.
    * ``task_groups_shots_dict``: ``Task_Group`` -> ``Number_Shots`` for rows
      whose ``Few_Shot`` flag is true.
    * ``languages_list``: unique ``Language`` values, upper-cased.
    * ``model_type_dict``: ``Model_Name`` -> ``Model_Type``.
    * ``hidden_df``: one row per model, with ``Value`` pivoted into
      ``(Task_Group, Few_Shot, Language)`` columns plus a ``Type`` column
      holding the symbol looked up in ``style.T_SYMBOLS``.
    """
    global repo_id, config_name, split_name, hidden_df, task_group_names_list, task_group_type_dict, task_groups_shots_dict, languages_list, model_type_dict

    repo_id = os.getenv("OGX_LEADERBOARD_DATASET_NAME")
    config_name = os.getenv("OGX_LEADERBOARD_DATASET_CONFIG")
    split_name = os.getenv("OGX_LEADERBOARD_DATASET_SPLIT")

    long_df = load_dataset(repo_id, config_name, split=split_name).to_pandas()

    # Lookup structures derived from the long-format table.
    task_group_names_list = long_df["Task_Group"].unique().tolist()

    group_type_pairs = long_df[["Task_Group", "Task_Type"]].drop_duplicates()
    task_group_type_dict = group_type_pairs.set_index("Task_Group")["Task_Type"].to_dict()

    few_shot_rows = long_df[long_df["Few_Shot"] == True]
    group_shot_pairs = few_shot_rows[["Task_Group", "Number_Shots"]].drop_duplicates()
    task_groups_shots_dict = group_shot_pairs.set_index("Task_Group")["Number_Shots"].to_dict()

    languages_list = long_df["Language"].drop_duplicates().str.upper().tolist()

    name_type_pairs = long_df[["Model_Name", "Model_Type"]].drop_duplicates()
    model_type_dict = name_type_pairs.set_index("Model_Name")["Model_Type"].to_dict()

    # Wide table: one row per model, one column per (task, shots, language).
    wide_df = long_df.pivot_table(
        index=["Model_Name"],
        columns=["Task_Group", "Few_Shot", "Language"],
        values="Value",
        dropna=False,
    )
    hidden_df = wide_df.reset_index(inplace=False)

    def _type_symbol(model_name):
        # Model name -> model type -> display symbol.
        return style.T_SYMBOLS[model_type_dict[model_name]]

    hidden_df["Type"] = hidden_df["Model_Name"].apply(_type_symbol)
def sort_cols(df: pd.DataFrame, fewshot: bool = False) -> pd.DataFrame:
    """Return ``df`` with its columns in display order.

    The fixed columns ``Type``, ``Model_Name`` and ``Average`` come first,
    followed by the task columns in sorted order. ``fewshot`` is accepted
    but not used by this function.
    """
    leading = ["Type", "Model_Name", "Average"]
    ordered = leading + sorted(get_task_columns(df))
    return df.reindex(columns=ordered)
def get_task_columns(df: pd.DataFrame) -> list:
    """Return the column labels of ``df`` that are task columns.

    Task columns are every column except the three fixed ones
    (``Model_Name``, ``Average`` and ``Type``). The labels keep the order
    they have in ``df``; ``df`` itself is not modified.

    Args:
        df: Frame that contains the three fixed columns.

    Returns:
        A new list of the remaining column labels.

    Raises:
        ValueError: If any of the three fixed columns is missing.
    """
    task_columns = list(df.columns)
    for fixed in ("Model_Name", "Average", "Type"):
        task_columns.remove(fixed)
    return task_columns
def get_models(df: pd.DataFrame) -> np.ndarray:
    """Return the distinct values of the ``Model_Name`` column.

    Args:
        df: Frame with a ``Model_Name`` column.

    Returns:
        The result of ``Series.unique()`` on that column: the unique names
        in order of first appearance. This is a NumPy array for
        NumPy-backed columns (pandas may return an extension array for
        extension dtypes), not a DataFrame.
    """
    return df["Model_Name"].unique()
def filter_type(df: pd.DataFrame, model_types: list[str]) -> pd.DataFrame:
    """Return the rows of ``df`` whose ``Type`` value is one of ``model_types``."""
    type_selected = df["Type"].isin(model_types)
    return df[type_selected]
def search_model(df: pd.DataFrame, query: str) -> pd.DataFrame:
    """Keep only rows for which model name matches search query.

    ``query`` may hold several search terms separated by ``;``; a row is kept
    when its ``Model_Name`` matches any of them (case-insensitive). Terms are
    interpreted as regular expressions. Surrounding whitespace is stripped
    and empty terms are ignored, so a trailing ``;`` no longer matches every
    row. If the combined pattern is not a valid regular expression, the
    terms are matched literally instead of raising.

    Args:
        df: Frame with a ``Model_Name`` column.
        query: Search text, optionally several terms joined by ``;``.

    Returns:
        A new frame with the matching rows. When the query contains no
        non-empty term, every row is returned.
    """
    import re

    terms = [term.strip() for term in query.split(";")]
    terms = [term for term in terms if term]
    if not terms:
        # Nothing to search for: keep all rows (copy, like a boolean filter).
        return df.copy()

    pattern = "|".join(terms)
    try:
        re.compile(pattern)
    except re.error:
        # Free-text input such as "llama(" is not a valid regex; search for
        # the terms verbatim rather than failing the whole request.
        pattern = "|".join(re.escape(term) for term in terms)

    # na=False keeps the mask strictly boolean if a name is missing.
    return df[df["Model_Name"].str.contains(pattern, case=False, na=False)]
def aggregate_langs(df: pd.DataFrame, tasks: list, langs: list):
    """Aggregate results over ``langs`` for each task in ``tasks``.

    Column labels of ``df`` are flattened by joining their non-empty parts
    with ``_`` (labels that are already plain strings are kept as they are),
    so a per-language score column is named ``"<task>_<lang>"`` with the
    language in lower case. For every task the mean over the selected
    languages is computed. If a language does not exist for a task, or any
    of the selected per-language scores is NaN, the aggregate for that task
    is NaN. ``Average`` is the row-wise mean over the task aggregates,
    ignoring NaN.

    The input frame is left untouched; all work happens on a copy.

    Args:
        df: Frame with ``Type``, ``Model_Name`` and per-language score columns.
        tasks: Task names to aggregate.
        langs: Language codes (any case) to aggregate over.

    Returns:
        A frame with the columns ``Type``, ``Model_Name``, ``Average`` and
        one column per task, in that order.
    """
    langs_lower = [lang.lower() for lang in langs]
    tasks = list(tasks)

    # Work on a copy: relabelling the caller's frame in place would make a
    # second call on the same frame join the (now flat) string labels
    # character by character.
    flat = df.copy()
    flat.columns = [
        col if isinstance(col, str) else "_".join(filter(None, col))
        for col in df.columns
    ]
    available = set(flat.columns)

    for task in tasks:
        lang_cols = [f"{task}_{lang}" for lang in langs_lower]
        if available.issuperset(lang_cols):
            # skipna=False: one missing language score voids the aggregate.
            flat.loc[:, task] = flat[lang_cols].mean(axis=1, skipna=False)
        else:
            flat.loc[:, task] = np.nan

    flat.loc[:, "Average"] = flat[tasks].mean(axis=1)
    return flat[["Type", "Model_Name", "Average"] + tasks]
def select_shots(df: pd.DataFrame, fewshot: bool = False):
    """Return the columns of ``df`` for one shot setting.

    ``df`` has three-part column labels whose second part is the few-shot
    flag. The columns whose flag equals ``fewshot`` are kept, followed by
    the ``Model_Name`` and ``Type`` columns, and the flag level is then
    dropped from the column labels.
    """
    shot_cols = [label for label in df.columns if label[1] == fewshot]
    # Model name and type icon go after the score columns.
    selected = shot_cols + [("Model_Name", "", ""), ("Type", "", "")]
    return df[selected].droplevel(level=1, axis="columns")
def update_df(
    tasks: list[str],
    model_query: str,
    langs: list[str],
    model_types: list[str],
    fewshot: bool = False,
    format: bool = True,
) -> pd.DataFrame:
    """Build the leaderboard table for the current selection.

    Starting from the module-level ``hidden_df``, the columns of the
    requested shot setting are selected, scores are aggregated over ``langs``
    for each task in ``tasks``, rows are sorted by ``Average`` (descending)
    and then filtered by the model search query and the model types.

    When ``format`` is true the result is returned as a pandas ``Styler``
    that renders numbers with two decimal places and missing values as
    ``N/A``; otherwise the plain DataFrame is returned.
    """
    # Restrict to the selected shot setting, then aggregate over languages.
    scores = aggregate_langs(select_shots(hidden_df, fewshot), tasks, langs)
    ranked = scores.sort_values(by="Average", ascending=False)

    # Apply the search bar query and the model type selection.
    visible = filter_type(search_model(ranked, model_query), model_types)

    table = sort_cols(visible, fewshot)
    if not format:
        return table
    return table.style.format(precision=2, decimal=".", na_rep="N/A")
def get_selected_task_type(task_type_id):
    """Map a task type id to its name: 0 -> "accuracy", 1 -> "misc".

    Any other id raises ``KeyError``.
    """
    return {0: "accuracy", 1: "misc"}[task_type_id]
def get_available_task_groups(selected_task_type, fewshot):
    """List the task groups of one task type available for a shot setting.

    Task groups whose type (per ``task_group_type_dict``) equals
    ``selected_task_type`` are returned in dictionary order. With
    ``fewshot`` truthy the groups in ``ZERO_SHOT_ONLY`` are left out;
    otherwise the groups in ``FEW_SHOT_ONLY`` are left out.
    """
    excluded = ZERO_SHOT_ONLY if fewshot else FEW_SHOT_ONLY
    return [
        group
        for group, group_type in task_group_type_dict.items()
        if group_type == selected_task_type and group not in excluded
    ]
# Populate the module-level globals (hidden_df, lookup dicts, ...) as soon as
# the module is imported; the functions above read them.
init()