|
import gradio as gr |
|
import pandas as pd |
|
from apscheduler.schedulers.background import BackgroundScheduler |
|
from huggingface_hub import snapshot_download |
|
|
|
from src.display.about import ( |
|
CITATION_BUTTON_LABEL, |
|
CITATION_BUTTON_TEXT, |
|
EVALUATION_QUEUE_TEXT, |
|
INTRODUCTION_TEXT, |
|
LLM_BENCHMARKS_TEXT, |
|
TITLE, |
|
) |
|
from src.display.css_html_js import custom_css |
|
from src.display.utils import ( |
|
BENCHMARK_COLS, |
|
COLS, |
|
EVAL_COLS, |
|
EVAL_TYPES, |
|
NUMERIC_INTERVALS, |
|
TYPES, |
|
AutoEvalColumn, |
|
ModelType, |
|
fields, |
|
WeightType, |
|
Precision |
|
) |
|
from src.envs import API, EVAL_REQUESTS_PATH, EVAL_RESULTS_PATH, TOKEN, QUEUE_REPO, REPO_ID, RESULTS_REPO |
|
from src.populate import get_evaluation_queue_df, get_leaderboard_df |
|
from src.submission.submit import add_new_eval |
|
|
|
|
|
def restart_space() -> None:
    """Ask the Hub API to restart the Space identified by REPO_ID.

    Used both as the recovery action when a startup download fails and as
    the job that the background scheduler runs periodically.
    """
    API.restart_space(token=TOKEN, repo_id=REPO_ID)
|
|
|
def _download_dataset_snapshot(repo_id, local_dir, message):
    """Download one dataset repo into ``local_dir``; restart the Space on failure.

    Args:
        repo_id: Hub dataset repository to fetch.
        local_dir: Local directory the snapshot is written to.
        message: Progress line printed before the download starts.
    """
    try:
        print(local_dir)
        print(message)
        snapshot_download(
            repo_id=repo_id, local_dir=local_dir, repo_type="dataset", tqdm_class=None, etag_timeout=30
        )
    except Exception as err:
        # Best-effort recovery is kept (restart instead of crashing), but the
        # cause is printed first so a restart loop can be diagnosed from logs.
        print(f"Download of {repo_id} failed: {err!r}; restarting space")
        restart_space()


_download_dataset_snapshot(QUEUE_REPO, EVAL_REQUESTS_PATH, "Downloading eval requests")
_download_dataset_snapshot(RESULTS_REPO, EVAL_RESULTS_PATH, "Downloading results into local cache")
|
|
|
|
|
# Build the leaderboard frame from the local results/requests directories.
raw_data, original_df = get_leaderboard_df(EVAL_RESULTS_PATH, EVAL_REQUESTS_PATH, COLS, BENCHMARK_COLS)
print("og df: ", original_df)

# Later filtering works on a copy, leaving original_df as loaded.
leaderboard_df = original_df.copy()

# get_evaluation_queue_df returns three frames, unpacked in this order.
finished_eval_queue_df, running_eval_queue_df, pending_eval_queue_df = get_evaluation_queue_df(
    EVAL_REQUESTS_PATH, EVAL_COLS
)
|
|
|
|
|
|
|
def update_table(
    hidden_df: pd.DataFrame,
    columns: list,
    type_query: list,
    precision_query: list,
    size_query: list,
    show_deleted: bool,
    query: str,
):
    """Filter the hidden leaderboard frame and project it onto the chosen columns.

    Args:
        hidden_df: Full leaderboard frame to filter.
        columns: Column names to keep (in addition to the always-shown ones).
        type_query: Selected model-type labels, forwarded to ``filter_models``.
        precision_query: Selected precision names. This is a list, because
            ``filter_models`` concatenates it with another list.
        size_query: Selected keys of ``NUMERIC_INTERVALS``.
        show_deleted: When False, rows not flagged as still on the hub are dropped.
        query: Search string; several terms may be separated by ``;``.

    Returns:
        The filtered frame restricted by ``select_columns``.
    """
    # Note the argument order expected by filter_models: size before precision.
    by_model = filter_models(hidden_df, type_query, size_query, precision_query, show_deleted)
    by_query = filter_queries(query, by_model)
    return select_columns(by_query, columns)
|
|
|
|
|
def search_table(df: pd.DataFrame, query: str) -> pd.DataFrame:
    """Return the rows of ``df`` whose dummy column contains ``query``.

    Matching is case-insensitive. ``query`` is first tried as a regular
    expression, as before; if it is not a valid pattern it is matched as a
    literal substring instead of raising. Missing values in the searched
    column count as non-matches so the boolean mask never contains NaN.

    Args:
        df: Frame holding the ``AutoEvalColumn.dummy`` column.
        query: Text (or regular expression) to look for.

    Returns:
        The matching subset of ``df``.
    """
    import re  # local import: only needed to recognise invalid patterns

    searched = df[AutoEvalColumn.dummy.name]
    try:
        mask = searched.str.contains(query, case=False, na=False)
    except re.error:
        # Input such as "(" is not a valid regex; fall back to literal matching.
        mask = searched.str.contains(query, case=False, na=False, regex=False)
    return df[mask]
|
|
|
|
|
def select_columns(df: pd.DataFrame, columns: list) -> pd.DataFrame:
    """Project ``df`` onto the pinned columns, the requested ones, and the dummy column.

    Requested columns are emitted in the order they appear in ``COLS`` and
    only when they are present in both ``df`` and ``columns``.
    """
    pinned = [AutoEvalColumn.model_type_symbol.name, AutoEvalColumn.model.name]

    requested = []
    for col in COLS:
        if col in df.columns and col in columns:
            requested.append(col)

    # The dummy column always goes last.
    ordered = [*pinned, *requested, AutoEvalColumn.dummy.name]
    return df[ordered]
|
|
|
|
|
def filter_queries(query: str, filtered_df: pd.DataFrame) -> pd.DataFrame:
    """Keep the rows matching any of the ``;``-separated terms in ``query``.

    Each non-blank term is searched against the incoming frame via
    ``search_table``; the per-term results are concatenated and de-duplicated
    on (model, precision, revision).

    If ``query`` is empty, contains only blank terms, or no term matches any
    row, the incoming frame is returned unchanged.
    """
    if query == "":
        return filtered_df

    matches = []
    for term in (piece.strip() for piece in query.split(";")):
        if term == "":
            continue
        hits = search_table(filtered_df, term)
        if len(hits) > 0:
            matches.append(hits)

    if len(matches) == 0:
        return filtered_df

    combined = pd.concat(matches)
    dedup_keys = [AutoEvalColumn.model.name, AutoEvalColumn.precision.name, AutoEvalColumn.revision.name]
    return combined.drop_duplicates(subset=dedup_keys)
|
|
|
|
|
def filter_models(
    df: pd.DataFrame, type_query: list, size_query: list, precision_query: list, show_deleted: bool
) -> pd.DataFrame:
    """Filter the leaderboard by hub availability, model type, precision and size.

    Args:
        df: Leaderboard frame to filter.
        type_query: Model-type labels; only the first character of each label
            is compared against the model-type symbol column.
        size_query: Keys of ``NUMERIC_INTERVALS`` selecting the accepted
            parameter-count intervals.
        precision_query: Accepted precision names; ``"None"`` is always accepted too.
        show_deleted: When False, only rows whose still-on-hub flag equals
            True are kept.

    Returns:
        The rows of ``df`` satisfying every filter.
    """
    if show_deleted:
        filtered_df = df
    else:
        filtered_df = df[df[AutoEvalColumn.still_on_hub.name] == True]  # noqa: E712 (element-wise compare)

    # Every mask is computed on the frame it is applied to, so indexer and
    # frame always share one index (no reliance on implicit realignment).
    type_emoji = [t[0] for t in type_query]
    type_mask = filtered_df[AutoEvalColumn.model_type_symbol.name].isin(type_emoji)
    precision_mask = filtered_df[AutoEvalColumn.precision.name].isin(precision_query + ["None"])

    numeric_interval = pd.IntervalIndex(sorted(NUMERIC_INTERVALS[s] for s in size_query))
    # Values that cannot be parsed become NaN and fall in no interval.
    params_column = pd.to_numeric(filtered_df[AutoEvalColumn.params.name], errors="coerce")
    # astype(bool) keeps the mask boolean even when the frame is empty.
    size_mask = params_column.apply(lambda x: any(numeric_interval.contains(x))).astype(bool)

    return filtered_df.loc[type_mask & precision_mask & size_mask]
|
|
|
|
|
# Initial view: every model type, size bucket and precision is selected, and
# show_deleted is off.
leaderboard_df = filter_models(
    df=leaderboard_df,
    show_deleted=False,
    precision_query=[i.value.name for i in Precision],
    size_query=list(NUMERIC_INTERVALS.keys()),
    type_query=[t.to_str(" : ") for t in ModelType],
)
|
|
|
import unicodedata |
|
|
|
def is_valid_unicode(char):
    """Return True when ``char`` has a name in the Unicode character database.

    Characters without a name (for example control characters such as
    newline) yield False.
    """
    # With a default supplied, unicodedata.name returns it instead of raising
    # ValueError for unnamed characters.
    return unicodedata.name(char, None) is not None
|
|
|
def remove_invalid_unicode(input_string):
    """Drop every character rejected by ``is_valid_unicode`` from a string.

    Non-string values are returned untouched, so the function can be applied
    to columns holding mixed types.
    """
    if not isinstance(input_string, str):
        return input_string
    return "".join(ch for ch in input_string if is_valid_unicode(ch))
|
|
|
# Invisible textbox component.
dummy1 = gr.Textbox(visible=False)

# Invisible, read-only dataframe component typed after the leaderboard columns.
hidden_leaderboard_table_for_search = gr.components.Dataframe(
    headers=COLS,
    datatype=TYPES,
    visible=False,
    line_breaks=False,
    interactive=False,
)
|
|
|
def display(x, y):
    """Return the leaderboard restricted to ``COLS`` with unnamed characters removed.

    Args:
        x: Unused; present because the interface passes one value per input component.
        y: Unused; see ``x``.

    Returns:
        A new DataFrame holding the ``COLS`` columns of ``leaderboard_df``;
        every object-dtype column is passed through ``remove_invalid_unicode``.
    """
    # Clean a private copy: the module-level frame is shared by every request
    # and is itself a filtered slice, so it is not rewritten in place here.
    subset_df = leaderboard_df[COLS].copy()

    for column in subset_df.columns:
        if subset_df[column].dtype == "object":
            subset_df[column] = subset_df[column].apply(remove_invalid_unicode)

    return subset_df
|
|
|
# Markdown shown in the interface. This assignment replaces the
# INTRODUCTION_TEXT imported from src.display.about.
# The body of the `with` statement in the example is indented so the snippet
# is valid Python when copied.
INTRODUCTION_TEXT = """
This is a copied space from Open Source LLM leaderboard. Instead of displaying
the results as table the space simply provides a gradio API interface to access
the full leaderboard data easily.
Example python on how to access the data:
```python
from gradio_client import Client
import json
client = Client("https://felixz-open-llm-leaderboard.hf.space/")
json_data = client.predict("","", api_name='/predict')
with open(json_data, 'r') as file:
    file_data = file.read()
# Load the JSON data
data = json.loads(file_data)
# Get the headers and the data
headers = data['headers']
data = data['data']
```
"""
|
|
|
# A single endpoint: `display` is called with one value per input component
# and its result fills the hidden dataframe component.
interface = gr.Interface(
    fn=display,
    inputs=[
        gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text"),
        dummy1,
    ],
    outputs=[hidden_leaderboard_table_for_search],
)
|
|
|
# The scheduler must exist before jobs are added to it; creating it after
# add_job()/start() raised NameError and the interface never launched.
scheduler = BackgroundScheduler()
# Restart the Space every 1800 seconds (30 minutes).
scheduler.add_job(restart_space, "interval", seconds=1800)
scheduler.start()

interface.launch()