Spaces:
Runtime error
Runtime error
File size: 5,654 Bytes
745ebf9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
import json
import os
from difflib import SequenceMatcher
from typing import Any, Dict, Optional, Tuple
from fastapi import FastAPI, Request, Response
from huggingface_hub import (DatasetCard, HfApi, ModelCard, comment_discussion,
create_discussion, get_discussion_details,
get_repo_discussions, login)
from huggingface_hub.utils import EntryNotFoundError
from tabulate import tabulate
from toolz import valmap
KEY = os.environ.get("WEBHOOK_SECRET")
HF_TOKEN = os.environ.get("HF_TOKEN")
api = HfApi(token=HF_TOKEN)
login(HF_TOKEN)
app = FastAPI()
@app.get("/")
def read_root():
return {"Hello": "World!"}
def similar(a, b):
"""Check similarity of two sequences"""
return SequenceMatcher(None, a, b).ratio()
def create_metadata_key_dict(card_data, repo_type: str):
shared_keys = ["tags", "license"]
if repo_type == "model":
model_keys = ["library_name", "datasets", "metrics", "co2", "pipeline_tag"]
shared_keys.extend(model_keys)
keys = shared_keys
return {key: card_data.get(key) for key in keys}
if repo_type == "dataset":
data_keys = [
"pretty_name",
"size_categories",
"task_categories",
"task_ids",
"source_datasets",
]
shared_keys.extend(data_keys)
keys = shared_keys
return {key: card_data.get(key) for key in keys}
def create_metadata_breakdown_table(desired_metadata_dictionary):
data = valmap(lambda x: x or "Field Missing", desired_metadata_dictionary)
metadata_fields_column = list(data.keys())
metadata_values_column = list(data.values())
table_data = list(zip(metadata_fields_column, metadata_values_column))
return tabulate(
table_data, tablefmt="github", headers=("Metadata Field", "Provided Value")
)
def calculate_grade(desired_metadata_dictionary):
metadata_values = list(desired_metadata_dictionary.values())
score = sum(1 if field else 0 for field in metadata_values) / len(metadata_values)
return round(score, 2)
def create_markdown_report(
desired_metadata_dictionary, repo_name, repo_type, score, update: bool = False
):
report = f"""# {repo_type.title()} metadata report card {"(updated)" if update else ""}
\n
This is an automatically produced metadata quality report card for {repo_name}. This report is meant as a POC!
\n
## Breakdown of metadata fields for your{repo_type}
\n
{create_metadata_breakdown_table(desired_metadata_dictionary)}
\n
You scored a metadata coverage grade of: **{score}**% \n {f"We're not angry we're just disappointed! {repo_type.title()} metadata is super important. Please try harder..."
if score <= 0.5 else f"Not too shabby! Make sure you also fill in a {repo_type} card too!"}
"""
return report
def parse_webhook_post(data: Dict[str, Any]) -> Optional[Tuple[str, str]]:
event = data["event"]
if event["scope"] != "repo":
return None
repo = data["repo"]
repo_name = repo["name"]
repo_type = repo["type"]
if repo_type not in {"model", "dataset"}:
raise ValueError("Unknown hub type")
return repo_type, repo_name
def load_repo_card(repo_type, repo_name):
if repo_type == "dataset":
try:
return DatasetCard.load(repo_name).data.to_dict()
except EntryNotFoundError:
return {}
if repo_type == "model":
try:
return ModelCard.load(repo_name).data.to_dict()
except EntryNotFoundError:
return {}
def create_or_update_report(data):
if parsed_post := parse_webhook_post(data):
repo_type, repo_name = parsed_post
else:
return Response("Unable to parse webhook data", status_code=400)
card_data = load_repo_card(repo_type, repo_name)
desired_metadata_dictionary = create_metadata_key_dict(card_data, repo_type)
score = calculate_grade(desired_metadata_dictionary)
report = create_markdown_report(
desired_metadata_dictionary, repo_name, repo_type, score, update=False
)
repo_discussions = get_repo_discussions(
repo_name,
repo_type=repo_type,
)
for discussion in repo_discussions:
if (
discussion.title == "Metadata Report Card" and discussion.status == "open"
): # An existing open report card thread
discussion_details = get_discussion_details(
repo_name, discussion.num, repo_type=repo_type
)
last_comment = discussion_details.events[-1].content
if similar(report, last_comment) <= 0.999:
report = create_markdown_report(
desired_metadata_dictionary,
repo_name,
repo_type,
score,
update=True,
)
comment_discussion(
repo_name,
discussion.num,
comment=report,
repo_type=repo_type,
)
return True
create_discussion(
repo_name,
"Metadata Report Card",
description=report,
repo_type=repo_type,
)
return True
@app.post("/webhook")
async def webhook(request: Request):
if request.method == "POST":
if request.headers.get("X-Webhook-Secret") != KEY:
return Response("Invalid secret", status_code=401)
payload = await request.body()
data = json.loads(payload)
result = create_or_update_report(data)
return "Webhook received!" if result else result
|