|
from constants import EVAL_REQUESTS_PATH |
|
from pathlib import Path |
|
from huggingface_hub import HfApi |
|
from dotenv import load_dotenv |
|
import git |
|
import os |
|
|
|
# Pull settings from a local .env file (if any) before the environment is read.
load_dotenv()

# Hub access token; None when TOKEN_HUB_V2 is not set.
TOKEN_HUB = os.getenv("TOKEN_HUB_V2")

# Id of the dataset repo that holds the evaluation queue.
QUEUE_REPO = os.getenv("QUEUE_REPO")

# Local directory the queue repo is cloned into.
QUEUE_PATH = os.getenv("QUEUE_PATH")

# Shared Hub client used by the helpers in this module.
hf_api = HfApi(endpoint="https://huggingface.co", token=TOKEN_HUB)
|
|
|
|
|
def load_all_info_from_dataset_hub():
    """Clone the evaluation-queue dataset repo and collect what it contains.

    The repo named by ``QUEUE_REPO`` is cloned into ``QUEUE_PATH`` using
    ``TOKEN_HUB`` for authentication.

    Returns:
        A tuple ``(eval_queue_repo, requested_models, csv_results)``:

        * ``eval_queue_repo`` is always ``None`` (the clone handle is not
          kept).
        * ``requested_models`` is the list of stems of the ``*.txt`` request
          files found under ``QUEUE_PATH / EVAL_REQUESTS_PATH``.
        * ``csv_results`` is whatever ``get_csv_with_results(QUEUE_PATH)``
          returns.

        All three are ``None`` when the token, repo id, or local path is not
        configured.
    """
    eval_queue_repo = None
    csv_results = None
    requested_models = None

    if TOKEN_HUB is None:
        print(
            "No HuggingFace token provided. Skipping evaluation requests and results."
        )
        return eval_queue_repo, requested_models, csv_results

    # Without these two settings there is nothing to clone or nowhere to put
    # it; skip the same way as for a missing token instead of failing on
    # ``None.split``.
    if not QUEUE_REPO or not QUEUE_PATH:
        print(
            "QUEUE_REPO or QUEUE_PATH is not set. "
            "Skipping evaluation requests and results."
        )
        return eval_queue_repo, requested_models, csv_results

    print("Pulling evaluation requests and results.")

    # NOTE(review): the token is embedded in the clone URL; consider a
    # credential helper if this URL can end up in logs.
    user_name = QUEUE_REPO.split("/")[0]
    repo_url = f"https://{user_name}:{TOKEN_HUB}@huggingface.co/datasets/{QUEUE_REPO}"
    git.Repo.clone_from(repo_url, QUEUE_PATH)

    # QUEUE_PATH comes from the environment as a plain string, so it must be
    # wrapped in Path before it can be joined with ``/``.
    queue_dir = Path(QUEUE_PATH)
    request_files = get_all_requested_models(queue_dir / EVAL_REQUESTS_PATH)
    requested_models = [p.stem for p in request_files]

    csv_results = get_csv_with_results(queue_dir)

    return eval_queue_repo, requested_models, csv_results
|
|
|
|
|
def upload_file(requested_model_name, path_or_fileobj):
    """Upload an evaluation-request file to the queue dataset repo.

    The file is stored under ``EVAL_REQUESTS_PATH`` in ``QUEUE_REPO`` and
    keeps its own base name.

    Args:
        requested_model_name: Model name used in the commit message.
        path_or_fileobj: The file to upload: a ``str``/``os.PathLike`` path,
            or an object exposing a ``name`` attribute (such as an open
            file).

    Raises:
        AttributeError: If ``path_or_fileobj`` is neither a path nor an
            object with a ``name`` attribute.
    """
    if isinstance(path_or_fileobj, (str, os.PathLike)):
        source_name = path_or_fileobj
    else:
        source_name = path_or_fileobj.name

    # Keep only the base name: an open file's ``name`` may be an absolute
    # path, and joining an absolute path would discard the queue folder.
    file_name = Path(source_name).name

    # Repo paths always use forward slashes, whatever the local OS is.
    dest_repo_file = (Path(EVAL_REQUESTS_PATH) / file_name).as_posix()

    hf_api.upload_file(
        path_or_fileobj=path_or_fileobj,
        path_in_repo=dest_repo_file,
        repo_id=QUEUE_REPO,
        token=TOKEN_HUB,
        repo_type="dataset",
        commit_message=f"Add {requested_model_name} to eval queue",
    )
|
|
|
|
|
def get_all_requested_models(directory):
    """Return the ``*.txt`` files located directly inside ``directory``.

    Args:
        directory: Folder to scan, as a string or path object.

    Returns:
        A list of ``Path`` objects, one per matching file.
    """
    return [*Path(directory).glob("*.txt")]
|
|
|
|
|
def get_csv_with_results(directory):
    """Find the single ``*latest.csv`` file directly inside ``directory``.

    Args:
        directory: Folder to scan, as a string or path object.

    Returns:
        The ``Path`` of the one CSV file whose stem ends with ``"latest"``,
        or ``None`` when there is no such file or more than one.
    """
    candidates = [
        csv_path
        for csv_path in Path(directory).glob("*.csv")
        if csv_path.stem.endswith("latest")
    ]
    return candidates[0] if len(candidates) == 1 else None
|
|
|
|
|
def is_model_on_hub(model_name, revision="main") -> "tuple[bool, str | None]":
    """Check whether ``model_name`` identifies exactly one model on the Hub.

    Spaces are removed from ``model_name`` before it is checked. The name
    must have the form ``author/model_name`` with both parts non-empty.

    Args:
        model_name: Model id to look up, in ``author/model_name`` form.
        revision: Currently unused; kept for interface compatibility.

    Returns:
        ``(True, None)`` when the model is found, otherwise
        ``(False, reason)`` where ``reason`` is a message fragment meant to
        follow the model name.
    """
    invalid_name = (
        False,
        "is not a valid model name. Please use the format `author/model_name`.",
    )

    if not isinstance(model_name, str):
        return invalid_name

    model_name = model_name.replace(" ", "")
    author, _, model_id = model_name.partition("/")
    # A further "/" can never match a Hub id, so reject it here rather than
    # after a pointless network round trip.
    if not author or not model_id or "/" in model_id:
        return invalid_name

    try:
        # The search is a loose match, so count only exact id matches.
        candidates = hf_api.list_models(author=author, search=model_id)
        exact_matches = sum(1 for m in candidates if m.modelId == model_name)
    except Exception as e:
        # Best effort: any lookup failure is reported as "not found".
        print(f"Could not get the model from the hub.: {e}")
        return False, "was not found on hub!"

    if exact_matches != 1:
        return False, "was not found on the hub!"
    return True, None
|
|