import json
import logging
import re

import huggingface_hub
from huggingface_hub import HfApi, ModelCard, hf_hub_download
from huggingface_hub.hf_api import (
    ModelInfo,
    get_safetensors_metadata,
    parse_safetensors_file_metadata,
)
from transformers import AutoConfig, AutoTokenizer

from src.submit import ModelSizeChecker

# ht to @Wauplin, thank you for the snippet!
# See https://huggingface.co/spaces/open-llm-leaderboard/open_llm_leaderboard/discussions/317
def check_model_card(repo_id: str) -> tuple[bool, str, ModelCard | None]:
    # Returns operation status, an error message, and the loaded card (if any)
    try:
        card = ModelCard.load(repo_id)
    except huggingface_hub.utils.EntryNotFoundError:
        return (
            False,
            "Please add a model card to your model to explain how you trained/fine-tuned it.",
            None,
        )

    # Enforce license metadata
    if card.data.license is None and not (
        "license_name" in card.data and "license_link" in card.data
    ):
        return (
            False,
            (
                "License not found. Please add a license to your model card using the `license` metadata or a"
                " `license_name`/`license_link` pair."
            ),
            None,
        )

    # Enforce card content
    if len(card.text) < 200:
        return (
            False,
            "Please add a description to your model card, it is too short.",
            None,
        )

    return True, "", card
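
# Illustrative contract of check_model_card (the repo id below is hypothetical):
#   ok, message, card = check_model_card("my-org/my-model")
#   -> (True, "", <ModelCard>) when the card exists, declares a license, and has at
#      least 200 characters of description; otherwise (False, <error message>, None).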

def is_model_on_hub(
    model_name: str,
    revision: str,
    token: str | None = None,
    trust_remote_code: bool = False,
    test_tokenizer: bool = False,
) -> tuple[bool, str | None, AutoConfig | None]:
    try:
        config = AutoConfig.from_pretrained(
            model_name,
            revision=revision,
            trust_remote_code=trust_remote_code,
            token=token,
            force_download=True,
        )
        if test_tokenizer:
            try:
                _ = AutoTokenizer.from_pretrained(
                    model_name,
                    revision=revision,
                    trust_remote_code=trust_remote_code,
                    token=token,
                )
            except ValueError as e:
                return (
                    False,
                    f"uses a tokenizer which is not in a transformers release: {e}",
                    None,
                )
            except Exception:
                return (
                    False,
                    "'s tokenizer cannot be loaded. Is your tokenizer class in a stable transformers release, and correctly configured?",
                    None,
                )
        return True, None, config

    except ValueError:
        return (
            False,
            "needs to be launched with `trust_remote_code=True`. For safety reasons, we do not allow these models to be automatically submitted to the leaderboard.",
            None,
        )
    except Exception as e:
        if "You are trying to access a gated repo." in str(e):
            return True, "uses a gated model.", None
        return (
            False,
            f"was not found or misconfigured on the hub! Error raised was {e.args[0]}",
            None,
        )

def get_model_size(
    model_info: ModelInfo, precision: str, base_model: str | None
) -> tuple[float | None, str]:
    size_pattern = re.compile(r"(\d+\.)?\d+(b|m)")
    safetensors = None
    adapter_safetensors = None
    # hack way to check that model is adapter
    is_adapter = "adapter_config.json" in (
        s.rfilename for s in model_info.siblings
    )

    try:
        if is_adapter:
            if not base_model:
                return (
                    None,
                    "Adapter model submission detected. Please ensure the base model information is provided.",
                )
            adapter_safetensors = parse_safetensors_file_metadata(
                model_info.id, "adapter_model.safetensors"
            )
            safetensors = get_safetensors_metadata(base_model)
        else:
            safetensors = get_safetensors_metadata(model_info.id)
    except Exception as e:
        logging.warning(
            f"Failed to get safetensors metadata for model {model_info.id}: {e!s}"
        )

    if safetensors is not None:
        model_size = sum(safetensors.parameter_count.values())
        if adapter_safetensors is not None:
            # Add the adapter's own parameters on top of the base model's
            model_size += sum(adapter_safetensors.parameter_count.values())
        model_size = round(model_size / 1e9, 3)
    else:
        # Fall back to parsing the size from the repo id (e.g. a trailing "7b" or "350m")
        try:
            size_match = re.search(size_pattern, model_info.id.lower())
            if size_match:
                model_size = size_match.group(0)
                model_size = round(
                    float(model_size[:-1])
                    if model_size[-1] == "b"
                    else float(model_size[:-1]) / 1e3,
                    3,
                )
            else:
                return None, "Unknown model size"
        except AttributeError:
            logging.warning(
                f"Unable to parse model size from ID: {model_info.id}"
            )
            return None, "Unknown model size"

    size_factor = (
        8 if (precision == "GPTQ" or "gptq" in model_info.id.lower()) else 1
    )
    model_size = size_factor * model_size
    return model_size, ""

def get_model_arch(model_info: ModelInfo):
    return model_info.config.get("architectures", "Unknown")

def check_chat_template(model: str, revision: str) -> tuple[bool, str]:
    try:
        # Attempt to download only the tokenizer_config.json file
        config_file = hf_hub_download(
            repo_id=model,
            filename="tokenizer_config.json",
            revision=revision,
            repo_type="model",
        )

        # Read and parse the tokenizer_config.json file
        with open(config_file, "r") as f:
            tokenizer_config = json.load(f)

        # Check if chat_template exists in the tokenizer configuration
        if "chat_template" not in tokenizer_config:
            return (
                False,
                f"The model {model} doesn't have a chat_template in its tokenizer_config.json. Please add a chat_template before submitting or submit without it.",
            )

        return True, ""
    except Exception as e:
        return (
            False,
            f"Error checking chat_template for model {model}: {str(e)}",
        )

def get_model_tags(model_card, model: str):
    is_merge_from_metadata = False
    is_moe_from_metadata = False

    tags = []
    if model_card is None:
        return tags

    if model_card.data.tags:
        is_merge_from_metadata = any(
            tag in model_card.data.tags
            for tag in ["merge", "moerge", "mergekit", "lazymergekit"]
        )
        is_moe_from_metadata = any(
            tag in model_card.data.tags for tag in ["moe", "moerge"]
        )

    is_merge_from_model_card = any(
        keyword in model_card.text.lower()
        for keyword in ["merged model", "merge model", "moerge"]
    )
    if is_merge_from_model_card or is_merge_from_metadata:
        tags.append("merge")

    is_moe_from_model_card = any(
        keyword in model_card.text.lower() for keyword in ["moe", "mixtral"]
    )
    # Hardcoding because of gating problem
    if "Qwen/Qwen1.5-32B" in model:
        is_moe_from_model_card = False

    is_moe_from_name = "moe" in model.lower().replace("/", "-").replace(
        "_", "-"
    ).split("-")
    if is_moe_from_model_card or is_moe_from_name or is_moe_from_metadata:
        tags.append("moe")

    return tags

def validate_model(
    model, precision, base_model, weight_type, use_chat_template
):
    """
    Validate a model with a few checks to ensure it can be evaluated.
    :param model: hf model name
    :param precision: model parameters data type
    :param base_model: base model (if needed)
    :param weight_type: type of weights submitted (e.g. "Delta" or "Adapter")
    :param use_chat_template: whether the submission should be evaluated with a chat template
    :return: an error message string if validation fails, otherwise None
    """
    API = HfApi()

    try:
        model_info = API.model_info(repo_id=model, revision="main")
    except Exception:
        return (
            "Could not get your model information. Please fill it up properly."
        )

    # Check model size early
    model_size, error_text = get_model_size(
        model_info=model_info, precision=precision, base_model=base_model
    )
    if model_size is None:
        return error_text

    # Absolute size limit for float16 and bfloat16
    if precision in ["float16", "bfloat16"] and model_size > 100:
        error_message = (
            f"Sadly, models larger than 100B parameters cannot be submitted in {precision} precision at this time. "
            f"Your model size: {model_size:.2f}B parameters."
        )
        return error_message

    # Precision-adjusted size limit for 8bit, 4bit, and GPTQ
    if precision in ["8bit", "4bit", "GPTQ"]:
        size_checker = ModelSizeChecker(
            model=model, precision=precision, model_size_in_b=model_size
        )

        if not size_checker.can_evaluate():
            precision_factor = size_checker.get_precision_factor()
            max_size = 140 * precision_factor
            error_message = (
                f"Sadly, models this big ({model_size:.2f}B parameters) cannot be evaluated automatically "
                f"at the moment on our cluster. The maximum size for {precision} precision is {max_size:.2f}B parameters."
            )
            return error_message

    architecture = "?"
    # Is the model on the hub?
    if weight_type in ["Delta", "Adapter"]:
        base_model_on_hub, error, _ = is_model_on_hub(
            model_name=base_model,
            revision="main",
            token=None,
            test_tokenizer=True,
        )
        if not base_model_on_hub:
            return f'Base model "{base_model}" {error}'

    if not weight_type == "Adapter":
        model_on_hub, error, model_config = is_model_on_hub(
            model_name=model, revision=model_info.sha, test_tokenizer=True
        )
        if not model_on_hub or model_config is None:
            return f'Model "{model}" {error}'
        if model_config is not None:
            architectures = getattr(model_config, "architectures", None)
            if architectures:
                architecture = ";".join(architectures)

    # Were the model card and license filled?
    try:
        _ = model_info.cardData["license"]
    except Exception:
        return "Please select a license for your model"

    modelcard_OK, error_msg, model_card = check_model_card(model)
    if not modelcard_OK:
        return error_msg

    # Check the chat template submission
    if use_chat_template:
        chat_template_valid, chat_template_error = check_chat_template(
            model, "main"
        )
        if not chat_template_valid:
            return chat_template_error

    return None
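
if __name__ == "__main__":
    # Minimal smoke-test sketch, not part of the Space's submission flow: the repo id
    # below is hypothetical and network access to the Hugging Face Hub is assumed.
    error = validate_model(
        model="my-org/my-model",  # hypothetical repo id
        precision="bfloat16",
        base_model=None,
        weight_type="Original",  # assumed label for plain full-weight submissions
        use_chat_template=True,
    )
    print(error or "Model passed all submission checks.")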