Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
File size: 5,082 Bytes
2b62c4c a200cc8 2b62c4c 625e239 2b62c4c a200cc8 5373df2 2b62c4c 65504f2 2b62c4c 625e239 2b62c4c 625e239 2b62c4c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
import itertools
import os
import re

import numpy as np
import pandas as pd
from datasets import load_dataset

import style
from style import LANG_SYMBOLS, T_SYMBOLS
# Task groups that get_available_task_groups() leaves out when the few-shot
# view is requested.
ZERO_SHOT_ONLY = ["BELEBELE"]
# Task groups that get_available_task_groups() leaves out when the zero-shot
# view is requested.
FEW_SHOT_ONLY = ["GSM8K", "TruthfulQA"]
def init():
    """Load the leaderboard dataset and fill the module-level state.

    The dataset location is read from the ``OGX_LEADERBOARD_DATASET_NAME``,
    ``OGX_LEADERBOARD_DATASET_CONFIG`` and ``OGX_LEADERBOARD_DATASET_SPLIT``
    environment variables and fetched with ``load_dataset``.

    Besides the lookup lists/dicts derived from the long-format records, the
    global ``hidden_df`` ends up as a pivoted frame with one row per model,
    columns keyed by ``(Task_Group, Few_Shot, Language)`` and an extra
    ``Type`` column holding the symbol from ``style.T_SYMBOLS``.
    """
    global repo_id, config_name, split_name, hidden_df, task_group_names_list, task_group_type_dict, task_groups_shots_dict, languages_list, model_type_dict

    def column_mapping(frame, key_col, value_col):
        # De-duplicate the (key, value) pairs, then expose them as key -> value.
        pairs = frame[[key_col, value_col]].drop_duplicates()
        return pairs.set_index(key_col)[value_col].to_dict()

    def type_symbol(model_name):
        # Model name -> model type -> display symbol.
        return style.T_SYMBOLS[model_type_dict[model_name]]

    repo_id = os.getenv("OGX_LEADERBOARD_DATASET_NAME")
    config_name = os.getenv("OGX_LEADERBOARD_DATASET_CONFIG")
    split_name = os.getenv("OGX_LEADERBOARD_DATASET_SPLIT")

    # Long format: one record per (model, task group, shot setting, language).
    long_df = load_dataset(repo_id, config_name, split=split_name).to_pandas()

    task_group_names_list = long_df["Task_Group"].unique().tolist()
    task_group_type_dict = column_mapping(long_df, "Task_Group", "Task_Type")

    # Shot counts are only taken from the few-shot records.
    few_shot_rows = long_df[long_df["Few_Shot"] == True]  # noqa: E712 (element-wise)
    task_groups_shots_dict = column_mapping(few_shot_rows, "Task_Group", "Number_Shots")

    languages_list = long_df["Language"].drop_duplicates().str.upper().tolist()
    model_type_dict = column_mapping(long_df, "Model_Name", "Model_Type")

    # Wide format: one row per model, one column per task/shot/language combo.
    wide_df = long_df.pivot_table(
        values="Value",
        index=["Model_Name"],
        columns=["Task_Group", "Few_Shot", "Language"],
        dropna=False,
    ).reset_index()
    wide_df["Type"] = wide_df["Model_Name"].apply(type_symbol)

    hidden_df = wide_df
def sort_cols(df: pd.DataFrame, fewshot: bool = False) -> pd.DataFrame:
    """Return *df* with its columns in display order.

    ``Type``, ``Model_Name`` and ``Average`` come first, followed by the
    task columns in sorted order. ``fewshot`` is accepted but not used by
    the body.
    """
    leading = ["Type", "Model_Name", "Average"]
    ordered_tasks = sorted(get_task_columns(df))
    return df.reindex(columns=leading + ordered_tasks)
def get_task_columns(df: pd.DataFrame) -> list:
    """Return the column labels of *df* that are task scores.

    Every column except the metadata columns ``Model_Name``, ``Average`` and
    ``Type`` is treated as a task column. The original column order is kept,
    and a frame that lacks one of the metadata columns is handled without
    raising.
    """
    metadata_cols = {"Model_Name", "Average", "Type"}
    return [col for col in df.columns if col not in metadata_cols]
def get_models(df: pd.DataFrame) -> "np.ndarray | pd.api.extensions.ExtensionArray":
    """Return the distinct values of the ``Model_Name`` column.

    The result is the array produced by ``Series.unique()`` (not a
    DataFrame), with values in order of first appearance.
    """
    return df["Model_Name"].unique()
def filter_type(df: pd.DataFrame, model_types: list[str]) -> pd.DataFrame:
    """Return the rows of *df* whose ``Type`` value appears in *model_types*."""
    wanted = df["Type"].isin(model_types)
    return df[wanted]
def search_model(df: pd.DataFrame, query: str) -> pd.DataFrame:
    """Return the rows of *df* whose ``Model_Name`` matches the search query.

    *query* holds one or more search terms separated by ``;``; a row is kept
    when any term matches, ignoring case. Terms are interpreted as regular
    expressions. Surrounding whitespace is stripped and empty terms (e.g.
    from a trailing ``;``) are ignored, so they no longer match every row.
    If the combined pattern is not a valid regular expression, the terms are
    matched literally instead of raising. A query without any term returns
    all rows. Rows with a missing model name never match.
    """
    terms = [term.strip() for term in query.split(";")]
    terms = [term for term in terms if term]
    if not terms:
        return df.copy()

    pattern = "|".join(terms)
    try:
        re.compile(pattern)
    except re.error:
        # The query comes from free-text user input; fall back to a literal
        # match rather than crashing on characters such as "(" or "*".
        pattern = "|".join(re.escape(term) for term in terms)

    matches = df["Model_Name"].str.contains(pattern, case=False, na=False)
    return df[matches]
def aggregate_langs(df: pd.DataFrame, tasks: list, langs: list) -> pd.DataFrame:
    """Average the per-language scores of each task in *tasks*.

    Column labels of *df* that are tuples are flattened by joining their
    non-empty parts with ``_`` (e.g. ``("ARC", "de")`` -> ``"ARC_de"``);
    labels that are already plain strings are kept as they are. For every
    task a column named after the task holds the mean over the lower-cased
    *langs*; if any of the selected languages is missing for a task, or any
    of its scores is NaN, the aggregate for that task is NaN. ``Average`` is
    the mean over the task aggregates, skipping NaN.

    The input frame is not modified; a new frame with the columns
    ``Type``, ``Model_Name``, ``Average`` followed by *tasks* is returned.
    """
    langs_lower = [lang.lower() for lang in langs]
    task_names = list(tasks)

    # Work on a copy so the caller's frame keeps its own columns.
    result = df.copy()
    result.columns = [
        "_".join(filter(None, col)) if isinstance(col, tuple) else col
        for col in result.columns
    ]
    # Snapshot taken before any aggregate column is added.
    available = set(result.columns)

    for task in task_names:
        lang_cols = [f"{task}_{lang}" for lang in langs_lower]
        if available.issuperset(lang_cols):
            # skipna=False: one missing language score makes the task NaN.
            result[task] = result[lang_cols].mean(axis=1, skipna=False)
        else:
            result[task] = np.nan

    result["Average"] = result[task_names].mean(axis=1)
    return result[["Type", "Model_Name", "Average"] + task_names]
def select_shots(df: pd.DataFrame, fewshot: bool = False):
    """Keep only the columns of *df* that belong to the requested shot setting.

    A column is kept when the second level of its label equals *fewshot*.
    The ``Model_Name`` and ``Type`` columns are appended after the selected
    score columns, and the second column level is then dropped from the
    result.
    """
    selected = []
    for column in df.columns:
        if column[1] == fewshot:
            selected.append(column)

    # Model name and type icon go after the score columns.
    selected.extend([("Model_Name", "", ""), ("Type", "", "")])

    shot_view = df[selected]
    return shot_view.droplevel(level=1, axis="columns")
def update_df(
    tasks: list[str],
    model_query: str,
    langs: list[str],
    model_types: list[str],
    fewshot: bool = False,
    format: bool = True,
) -> "pd.DataFrame | pd.io.formats.style.Styler":
    """Build the leaderboard table for the current selection.

    Starting from the module-level ``hidden_df``, the columns of the
    requested shot setting are selected, the scores are aggregated over
    *langs* for each task in *tasks*, the rows are sorted by ``Average`` in
    descending order and then filtered by *model_query* and *model_types*.

    Returns the resulting DataFrame when *format* is false. When *format* is
    true, returns a pandas ``Styler`` over it that displays numbers with two
    decimal places and missing values as ``N/A``. (The parameter name
    ``format`` shadows the builtin but is kept for existing keyword callers.)
    """
    # keep only selected shots
    df = select_shots(hidden_df, fewshot)

    # aggregate results over languages per task
    df = aggregate_langs(df, tasks, langs)
    df = df.sort_values(by="Average", ascending=False)

    # filter models by search bar and model type
    df = search_model(df, model_query)
    df = filter_type(df, model_types)

    df = sort_cols(df, fewshot)
    if not format:
        return df
    return df.style.format(precision=2, decimal=".", na_rep="N/A")
def get_selected_task_type(task_type_id):
    """Map a task-type id to its name: ``0`` -> "accuracy", ``1`` -> "misc".

    Any other id raises ``KeyError``.
    """
    return {0: "accuracy", 1: "misc"}[task_type_id]
def get_available_task_groups(selected_task_type, fewshot):
    """List the task groups of *selected_task_type* offered for a shot setting.

    Task groups are taken from ``task_group_type_dict``. With *fewshot* set,
    the groups in ``ZERO_SHOT_ONLY`` are left out; otherwise the groups in
    ``FEW_SHOT_ONLY`` are left out.
    """
    excluded = ZERO_SHOT_ONLY if fewshot else FEW_SHOT_ONLY
    return [
        group_name
        for group_name, group_type in task_group_type_dict.items()
        if group_type == selected_task_type and group_name not in excluded
    ]
# Runs at module load so the globals declared in init() are set before the
# functions above are called.
init()
|