Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import os.path | |
from typing import Dict, List | |
import pandas as pd | |
from src.columns import COL_NAME_IS_ANONYMOUS, COL_NAME_REVISION, COL_NAME_TIMESTAMP | |
from src.envs import BENCHMARK_VERSION_LIST, DEFAULT_METRIC_LONG_DOC, DEFAULT_METRIC_QA | |
from src.models import FullEvalResult, LeaderboardDataStore, TaskType, get_safe_name | |
from src.utils import get_default_cols, get_leaderboard_df | |
# Enable pandas' copy-on-write option globally; this runs once, at import time.
pd.options.mode.copy_on_write = True
def load_raw_eval_results(results_path: str) -> List[FullEvalResult]:
    """
    Load every evaluation result stored as JSON under ``results_path``.

    The directory tree is walked recursively in a fixed (sorted) order. Each
    file whose name starts with ``results`` and ends with ``.json`` is parsed
    with ``FullEvalResult.init_from_json_file``; any other file is reported
    and skipped. Parsed results are keyed by their ``timestamp`` attribute, so
    when several files share a timestamp only the one visited last in that
    traversal is kept.

    Args:
        results_path: Root directory to search. A missing or empty directory
            yields an empty list.

    Returns:
        The loaded results whose ``to_dict()`` call succeeds, in the order
        their timestamps were first encountered.
    """
    model_result_filepaths = []
    for root, dirs, files in os.walk(results_path):
        # os.walk yields entries in arbitrary, filesystem-dependent order.
        # Sorting ``dirs`` in place (honoured by the top-down walk) and
        # iterating sorted file names makes both the returned order and the
        # winner of a timestamp collision reproducible across machines.
        dirs.sort()
        for file in sorted(files):
            if not (file.startswith("results") and file.endswith(".json")):
                print(f"skip {file}")
                continue
            model_result_filepaths.append(os.path.join(root, file))

    eval_results = {}
    for model_result_filepath in model_result_filepaths:
        # Build the evaluation result; files that cannot be decoded are
        # reported and skipped instead of aborting the whole load.
        try:
            eval_result = FullEvalResult.init_from_json_file(model_result_filepath)
        except UnicodeDecodeError:
            print(f"loading file failed. {model_result_filepath}")
            continue
        print(f"file loaded: {model_result_filepath}")
        # Keyed by timestamp: a later file with the same timestamp replaces
        # the earlier one.
        eval_results[eval_result.timestamp] = eval_result

    results = []
    for timestamp, eval_result in eval_results.items():
        try:
            # to_dict() is called only as a probe; its return value is
            # discarded. Results for which it raises KeyError are dropped.
            eval_result.to_dict()
        except KeyError:
            print(f"loading failed: {timestamp}")
            continue
        results.append(eval_result)
    return results
def load_leaderboard_datastore(file_path, version) -> LeaderboardDataStore:
    """
    Assemble the leaderboard data store for a single benchmark version.

    Raw evaluation results are read from ``file_path``. From them the QA and
    long-doc leaderboard tables are built, each kept in a raw form and in a
    formatted form. The formatted form removes rows flagged by
    ``COL_NAME_IS_ANONYMOUS``, keeps only the default columns, and drops the
    revision and timestamp columns. The sorted, de-duplicated
    ``reranking_model`` values of the raw results are stored as well.
    """

    def _displayable(raw_df, keep_cols):
        # Derive the formatted table from a copy so the raw table stays intact.
        frame = raw_df.copy()
        not_anonymous = ~frame[COL_NAME_IS_ANONYMOUS]
        frame = frame[not_anonymous][keep_cols]
        return frame.drop(columns=[COL_NAME_REVISION, COL_NAME_TIMESTAMP])

    # The slug is the last four characters of the sanitised version name.
    store = LeaderboardDataStore(
        version, get_safe_name(version)[-4:], None, None, None, None, None, None, None, None
    )

    store.raw_data = load_raw_eval_results(file_path)
    print(f"raw data: {len(store.raw_data)}")

    # QA leaderboard
    store.qa_raw_df = get_leaderboard_df(store, TaskType.qa, DEFAULT_METRIC_QA)
    print(f"QA data loaded: {store.qa_raw_df.shape}")
    qa_cols, store.qa_types = get_default_cols(TaskType.qa, store.slug, add_fix_cols=True)
    store.qa_fmt_df = _displayable(store.qa_raw_df, qa_cols)

    # Long-doc leaderboard
    store.doc_raw_df = get_leaderboard_df(store, TaskType.long_doc, DEFAULT_METRIC_LONG_DOC)
    print(f"Long-Doc data loaded: {len(store.doc_raw_df)}")
    doc_cols, store.doc_types = get_default_cols(TaskType.long_doc, store.slug, add_fix_cols=True)
    store.doc_fmt_df = _displayable(store.doc_raw_df, doc_cols)

    # Distinct reranking models across all raw results, in sorted order.
    store.reranking_models = sorted({result.reranking_model for result in store.raw_data})
    return store
def load_eval_results(file_path: str) -> Dict[str, LeaderboardDataStore]:
    """
    Load one leaderboard data store per benchmark version.

    For every entry of ``BENCHMARK_VERSION_LIST`` the results are read from
    ``{file_path}/{version}``. The returned dict maps each version to the data
    store built for it.
    """
    return {
        version: load_leaderboard_datastore(f"{file_path}/{version}", version)
        for version in BENCHMARK_VERSION_LIST
    }