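# app.py -- uploaded by John6666 (commit 1d13075, verified), 17.4 kB.
# The original first six lines were Hugging Face file-viewer page text
# ("raw", "history blame", ...), not Python source.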
import gradio as gr
import requests
from pathlib import Path
import re
import os
import tempfile
import shutil
import urllib
from huggingface_hub import whoami, HfApi, hf_hub_download, RepoCard
from huggingface_hub.utils import build_hf_headers, hf_raise_for_status
from gradio_huggingfacehub_search import HuggingfaceHubSearch
# Base URL used for the Hub "duplicate" REST call.
ENDPOINT = "https://huggingface.co"
# ENDPOINT = "http://localhost:5564"

# Repository kinds the UI offers and the handlers accept.
REPO_TYPES = ["model", "dataset", "space"]

# Optional defaults read from the environment; an unset or empty variable
# yields an empty string.
HF_REPO = os.environ.get("HF_REPO") or ""  # set your default repo
HF_REPO_PREFIX = os.environ.get("HF_REPO_PREFIX") or ""  # set your default repo prefix
HF_REPO_SUFFIX = os.environ.get("HF_REPO_SUFFIX") or ""  # set your default repo suffix
HF_USER = os.environ.get("HF_USER") or ""  # set your username

# Matches a bare "owner/name" repo id with nothing before or after it.
REGEX_HF_REPO = r'^[\w_\-\.]+/[\w_\-\.]+$'
def remove_repo_tags(repo_id: str, tags: list[str], repo_type: str, hf_token: str):
    """Remove the given tags from a repo card and push the card if it changed.

    This is best-effort: any exception raised while loading, editing or
    pushing the card is printed and swallowed, so the function always
    returns None.

    Args:
        repo_id: Repository whose card is edited.
        tags: Tag names to remove from the card's ``tags`` entry.
        repo_type: Repository type passed through to the card load/push calls.
        hf_token: Token passed through to the card load/push calls.
    """
    try:
        repo_card = RepoCard.load(repo_id, repo_type=repo_type, token=hf_token)
        content_before = repo_card.content
        for unwanted in tags:
            if 'tags' not in repo_card.data:
                continue
            if unwanted not in repo_card.data['tags']:
                continue
            repo_card.data['tags'].remove(unwanted)
        # Only push when the edit actually changed the card's content.
        if repo_card.content != content_before:
            repo_card.push_to_hub(repo_id=repo_id, repo_type=repo_type, token=hf_token)
    except Exception as err:
        print(f"Failed to remove tags from repocard. {err}")
def duplicate(source_repo, dst_repo, repo_type, private, overwrite, auto_dir, remove_tag, oauth_token: gr.OAuthToken | None, progress=gr.Progress(track_tqdm=True)):
    """Duplicate one Hub repository (or part of it) into a destination repo.

    Two strategies are used:

    * If ``overwrite`` is set or a destination subfolder is in effect, every
      matching file of the source repo is downloaded into a temporary
      directory and uploaded to the destination one by one.
    * Otherwise the Hub ``/duplicate`` endpoint is called.

    Args:
        source_repo: ``owner/name`` or a Hub URL/path; anything after the repo
            id (optionally behind ``blob/main/`` or ``resolve/main/``) is used
            as a substring filter on the source file paths.
        dst_repo: ``owner/name``, optionally followed by ``/subfolder``.
        repo_type: One of ``REPO_TYPES``.
        private: Whether a newly created destination repo is private.
        overwrite: Allow writing into an already existing destination repo.
        auto_dir: If true, the subfolder is replaced by the source repo id.
        remove_tag: If true, strip the "not-for-all-audiences" tag afterwards.
        oauth_token: Token object supplied by Gradio; ``None`` when the
            visitor is not logged in.
        progress: Not referenced in the body; kept in the signature
            (presumably so Gradio mirrors tqdm progress - confirm).

    Returns:
        A ``(html_link, "sp.jpg")`` tuple for the Markdown and Image outputs.

    Raises:
        gr.Error: On an invalid repo type, a missing/invalid login, or any
            failure while copying.
    """
    try:
        if repo_type not in REPO_TYPES:
            raise ValueError("need to select valid repo type")
        # Without this guard a logged-out call crashed with AttributeError
        # before the friendly login message could be produced.
        if oauth_token is None:
            raise ValueError("no OAuth token was provided")
        hf_token = oauth_token.token
        _ = whoami(hf_token)
        # ^ this will throw if token is invalid
    except Exception as e:
        raise gr.Error(f"""Oops, you forgot to login. Please use the loggin button on the top left to migrate your repo {e}""")
    api = HfApi(token=hf_token)
    try:
        if re.fullmatch(REGEX_HF_REPO, source_repo):
            target = ""
        else:
            # "datasets/" and "spaces/" include the slash so that
            # ".../datasets/user/repo" resolves to "user/repo" rather than
            # to repo "datasets/user" with target "repo".
            source_match = re.findall(r'^(?:http.+\.co/)?(?:datasets/)?(?:spaces/)?([\w_\-\.]+/[\w_\-\.]+)/?(?:blob/main/)?(?:resolve/main/)?(.+)?$', source_repo)
            if not source_match:
                raise ValueError(f"could not parse source repo: {source_repo}")
            source_repo, target = source_match[0]
            target = urllib.parse.unquote(target.removesuffix("/"))
        if re.fullmatch(REGEX_HF_REPO, dst_repo):
            subfolder = ""
        else:
            dst_match = re.findall(r'^([\w_\-\.]+/[\w_\-\.]+)/?(.+)?$', dst_repo)
            if not dst_match:
                raise ValueError(f"could not parse destination repo: {dst_repo}")
            dst_repo, subfolder = dst_match[0]
            subfolder = subfolder.removesuffix("/")
        if auto_dir:
            subfolder = source_repo
        if not overwrite and api.repo_exists(repo_id=dst_repo, repo_type=repo_type, token=hf_token):
            raise gr.Error(f"Repo already exists {dst_repo}")
        if overwrite or subfolder:
            api.create_repo(repo_id=dst_repo, repo_type=repo_type, private=private, exist_ok=True, token=hf_token)
            # The context manager removes the scratch directory even when a
            # download or upload fails part-way through.
            with tempfile.TemporaryDirectory() as temp_dir:
                for path in api.list_repo_files(repo_id=source_repo, repo_type=repo_type, token=hf_token):
                    if target and target not in path:
                        continue
                    file = hf_hub_download(repo_id=source_repo, filename=path, repo_type=repo_type, local_dir=temp_dir, token=hf_token)
                    local_path = Path(file)
                    if not local_path.exists():
                        continue
                    path_in_repo = f"{subfolder}/{path}" if subfolder else path
                    if local_path.is_dir():  # unused for now
                        api.upload_folder(repo_id=dst_repo, folder_path=file, path_in_repo=path_in_repo, repo_type=repo_type, token=hf_token)
                    elif local_path.is_file():
                        api.upload_file(repo_id=dst_repo, path_or_fileobj=file, path_in_repo=path_in_repo, repo_type=repo_type, token=hf_token)
                    # Delete each file right after upload to keep disk usage
                    # low; directories are left to the temp-dir cleanup.
                    if local_path.is_file():
                        local_path.unlink()
            if repo_type == "dataset":
                repo_url = f"https://huggingface.co/datasets/{dst_repo}"
            elif repo_type == "space":
                repo_url = f"https://huggingface.co/spaces/{dst_repo}"
            else:
                repo_url = f"https://huggingface.co/{dst_repo}"
        else:
            r = requests.post(
                f"{ENDPOINT}/api/{repo_type}s/{source_repo}/duplicate",
                headers=build_hf_headers(token=hf_token),
                json={"repository": dst_repo, "private": private},
            )
            hf_raise_for_status(r)
            repo_url = r.json().get("url")
        if remove_tag:
            remove_repo_tags(dst_repo, ["not-for-all-audiences"], repo_type, hf_token)
        return (
            f'Find your repo <a href=\'{repo_url}\' target="_blank" style="text-decoration:underline">here</a>',
            "sp.jpg",
        )
    except Exception as e:
        print(e)
        raise gr.Error(f"Error occured: {e}") from e
def parse_repos(s):
    """Extract every ``owner/name`` repo id found in free-form text.

    Anything that looks like an http(s) URL is removed first, then all
    remaining ``owner/name`` tokens are returned in order of appearance
    (duplicates are kept). Any error, e.g. a non-string input, yields an
    empty list.
    """
    url_pattern = "https?://[\\w/:%#\\$&\\?\\(\\)~\\.=\\+\\-]+"
    repo_pattern = r'[^\w_\-\.]?([\w_\-\.]+/[\w_\-\.]+)[^\w_\-\.]?'
    try:
        text_without_urls = re.sub(url_pattern, "", s)
        # With a single capture group, findall already returns a list of str.
        return re.findall(repo_pattern, text_without_urls)
    except Exception:
        return []
def duplicate_m2o(source_repos_str, dst_repo, repo_type, private, overwrite, oauth_token: gr.OAuthToken | None, progress=gr.Progress(track_tqdm=True)):
    """Copy several source repositories into subfolders of one destination repo.

    Each source repo's files are downloaded into a temporary directory and
    uploaded under ``[<subfolder prefix>/]<source owner>/<source name>/`` in
    the destination repo.

    Args:
        source_repos_str: Free-form text; repo ids are extracted with
            ``parse_repos``.
        dst_repo: ``owner/name``, optionally followed by ``/subfolder`` which
            becomes the prefix of every per-source folder.
        repo_type: One of ``REPO_TYPES``.
        private: Whether a newly created destination repo is private.
        overwrite: Allow writing into an already existing destination repo.
        oauth_token: Token object supplied by Gradio; ``None`` when the
            visitor is not logged in.
        progress: Not referenced in the body; kept in the signature
            (presumably so Gradio mirrors tqdm progress - confirm).

    Returns:
        A ``(html_link, "sp.jpg")`` tuple for the Markdown and Image outputs.

    Raises:
        gr.Error: On an invalid repo type, a missing/invalid login, no usable
            source repos, or any failure while copying.
    """
    try:
        if repo_type not in REPO_TYPES:
            raise ValueError("need to select valid repo type")
        # Without this guard a logged-out call crashed with AttributeError
        # before the friendly login message could be produced.
        if oauth_token is None:
            raise ValueError("no OAuth token was provided")
        hf_token = oauth_token.token
        _ = whoami(hf_token)
        # ^ this will throw if token is invalid
    except Exception as e:
        raise gr.Error(f"""Oops, you forgot to login. Please use the loggin button on the top left to migrate your repo {e}""")
    api = HfApi(token=hf_token)
    try:
        if re.fullmatch(REGEX_HF_REPO, dst_repo):
            subfolder_prefix = ""
        else:
            dst_match = re.findall(r'^([\w_\-\.]+/[\w_\-\.]+)/?(.+)?$', dst_repo)
            if not dst_match:
                raise ValueError(f"could not parse destination repo: {dst_repo}")
            dst_repo, subfolder_prefix = dst_match[0]
            # Fixed: this used to read the not-yet-assigned local `subfolder`,
            # raising UnboundLocalError for any destination with a subfolder.
            subfolder_prefix = subfolder_prefix.removesuffix("/")
        if not overwrite and api.repo_exists(repo_id=dst_repo, repo_type=repo_type, token=hf_token):
            raise gr.Error(f"Repo already exists {dst_repo}")
        source_repos = parse_repos(source_repos_str)
        # Previously an empty list fell through to an unbound `repo_url`.
        if not source_repos:
            raise ValueError("no valid source repositories were given")
        # Creating the destination once is enough; it does not depend on the source.
        api.create_repo(repo_id=dst_repo, repo_type=repo_type, private=private, exist_ok=True, token=hf_token)
        for source_repo in source_repos:
            if re.fullmatch(REGEX_HF_REPO, source_repo):
                target = ""
            else:
                # "datasets/" and "spaces/" include the slash so that
                # ".../datasets/user/repo" resolves to "user/repo".
                source_match = re.findall(r'^(?:http.+\.co/)?(?:datasets/)?(?:spaces/)?([\w_\-\.]+/[\w_\-\.]+)/?(?:blob/main/)?(?:resolve/main/)?(.+)?$', source_repo)
                if not source_match:
                    raise ValueError(f"could not parse source repo: {source_repo}")
                source_repo, target = source_match[0]
                target = urllib.parse.unquote(target.removesuffix("/"))
            subfolder = subfolder_prefix + "/" + source_repo if subfolder_prefix else source_repo
            # The context manager removes the scratch directory even when a
            # download or upload fails part-way through.
            with tempfile.TemporaryDirectory() as temp_dir:
                for path in api.list_repo_files(repo_id=source_repo, repo_type=repo_type, token=hf_token):
                    if target and target not in path:
                        continue
                    file = hf_hub_download(repo_id=source_repo, filename=path, repo_type=repo_type, local_dir=temp_dir, token=hf_token)
                    local_path = Path(file)
                    if not local_path.exists():
                        continue
                    path_in_repo = f"{subfolder}/{path}" if subfolder else path
                    if local_path.is_dir():  # unused for now
                        api.upload_folder(repo_id=dst_repo, folder_path=file, path_in_repo=path_in_repo, repo_type=repo_type, token=hf_token)
                    elif local_path.is_file():
                        api.upload_file(repo_id=dst_repo, path_or_fileobj=file, path_in_repo=path_in_repo, repo_type=repo_type, token=hf_token)
                    # Delete each file right after upload to keep disk usage
                    # low; directories are left to the temp-dir cleanup.
                    if local_path.is_file():
                        local_path.unlink()
        if repo_type == "dataset":
            repo_url = f"https://huggingface.co/datasets/{dst_repo}"
        elif repo_type == "space":
            repo_url = f"https://huggingface.co/spaces/{dst_repo}"
        else:
            repo_url = f"https://huggingface.co/{dst_repo}"
        return (
            f'Find your repo <a href=\'{repo_url}\' target="_blank" style="text-decoration:underline">here</a>',
            "sp.jpg",
        )
    except Exception as e:
        print(e)
        raise gr.Error(f"Error occured: {e}") from e
def duplicate_m2m(source_repos_str, hf_user, repo_type, private, overwrite, remove_tag, repo_prefix, repo_suffix, oauth_token: gr.OAuthToken | None, progress=gr.Progress(track_tqdm=True)):
    """Duplicate several source repositories, each into its own new repo.

    For every repo id parsed from ``source_repos_str`` the destination id is
    ``<hf_user>/<repo_prefix><source name><repo_suffix>`` and the Hub
    ``/duplicate`` endpoint is called. Sources that do not exist, or whose
    destination id is not a valid ``owner/name``, are skipped silently; an
    existing destination is skipped with a ``gr.Info`` notice unless
    ``overwrite`` is set.

    Args:
        source_repos_str: Free-form text; repo ids are extracted with
            ``parse_repos``.
        hf_user: Owner of the destination repos.
        repo_type: One of ``REPO_TYPES``.
        private: Whether the new repos are private.
        overwrite: If false, existing destination repos are skipped.
        remove_tag: If true, strip the "not-for-all-audiences" tag from each
            new repo.
        repo_prefix: Text prepended to each source repo name.
        repo_suffix: Text appended to each source repo name.
        oauth_token: Token object supplied by Gradio; ``None`` when the
            visitor is not logged in.
        progress: Not referenced in the body; kept in the signature
            (presumably so Gradio mirrors tqdm progress - confirm).

    Returns:
        A ``(html_links, "sp.jpg")`` tuple for the Markdown and Image outputs.

    Raises:
        gr.Error: On an invalid repo type, a missing/invalid login, or any
            failure while duplicating.
    """
    try:
        if repo_type not in REPO_TYPES:
            raise ValueError("need to select valid repo type")
        # Without this guard a logged-out call crashed with AttributeError
        # before the friendly login message could be produced.
        if oauth_token is None:
            raise ValueError("no OAuth token was provided")
        hf_token = oauth_token.token
        _ = whoami(hf_token)
        # ^ this will throw if token is invalid
    except Exception as e:
        raise gr.Error(f"""Oops, you forgot to login. Please use the loggin button on the top left to migrate your repo {e}""")
    api = HfApi(token=hf_token)
    try:
        source_repos = parse_repos(source_repos_str)
        repo_links = []
        for source_repo in source_repos:
            if not re.fullmatch(REGEX_HF_REPO, source_repo) or not api.repo_exists(repo_id=source_repo, repo_type=repo_type, token=hf_token):
                continue
            dst_repo = hf_user + "/" + repo_prefix + source_repo.split("/")[-1] + repo_suffix
            if not re.fullmatch(REGEX_HF_REPO, dst_repo):
                continue
            if not overwrite and api.repo_exists(repo_id=dst_repo, repo_type=repo_type, token=hf_token):
                gr.Info(f"Repo already exists {dst_repo}")
                continue
            r = requests.post(
                f"{ENDPOINT}/api/{repo_type}s/{source_repo}/duplicate",
                headers=build_hf_headers(token=hf_token),
                json={"repository": dst_repo, "private": private},
            )
            hf_raise_for_status(r)
            repo_url = r.json().get("url")
            repo_links.append(f'<a href=\'{repo_url}\' target="_blank" style="text-decoration:underline">{dst_repo}</a><br>\n')
            if remove_tag:
                remove_repo_tags(dst_repo, ["not-for-all-audiences"], repo_type, hf_token)
        return (
            'Find your repo ' + "".join(repo_links),
            "sp.jpg",
        )
    except Exception as e:
        print(e)
        raise gr.Error(f"Error occured: {e}") from e
def add_repo_text(repo_id: str, source_repos: str):
    """Append ``repo_id`` as a new line to the ``source_repos`` text.

    When ``source_repos`` is empty (or otherwise falsy) the result is just
    ``repo_id``.
    """
    if not source_repos:
        return repo_id
    return source_repos + "\n" + repo_id
def swap_visibilty(profile: gr.OAuthProfile | None):
    """Return a component update selecting the logged-in or logged-out CSS class.

    A truthy ``profile`` selects ``main_ui_logged_in``; otherwise
    ``main_ui_logged_out`` is applied.
    """
    css_class = "main_ui_logged_in" if profile else "main_ui_logged_out"
    return gr.update(elem_classes=[css_class])
# Custom stylesheet passed to gr.Blocks below:
#   .main_ui_logged_out - dims the element and disables pointer interaction
#   .title              - centers the element's content
css = '''
.main_ui_logged_out{opacity: 0.3; pointer-events: none}
.title {text-align: center; align-items: center}
'''
# Gradio UI: a login button plus three tabs (One to One, Multi to One,
# Multi to Multi), each with its own inputs, Submit/Clear buttons and
# Markdown/Image outputs.
# NOTE(review): the nesting below was reconstructed from a source whose
# indentation had been flattened - confirm the layout against the original.
with gr.Blocks(css=css) as demo:
    gr.LoginButton()
    # Starts with the "logged out" class; swap_visibilty replaces it on page load.
    with gr.Column(elem_classes="main_ui_logged_out") as main_ui:
        gr.Markdown("# Duplicate your repo!", elem_classes="title")
        gr.Markdown("Duplicate a Hugging Face repository! This Space is a an experimental demo.")
        with gr.Tab("One to One"):
            with gr.Row():
                with gr.Column():
                    # NOTE(review): the keyword is spelled `sumbit_on_select`;
                    # presumably it matches the component's own parameter
                    # name - confirm before "fixing" the spelling.
                    search = HuggingfaceHubSearch(
                        label="source_repo",
                        placeholder="Source repository (e.g. osanseviero/src)",
                        search_type=["model", "dataset", "space"],
                        sumbit_on_select=False,
                    )
                    with gr.Group():
                        dst_repo = gr.Textbox(label="dst_repo", placeholder="Destination repository (e.g. osanseviero/dst)", value=HF_REPO)
                        repo_type = gr.Dropdown(label="repo_type", choices=REPO_TYPES, value="model")
                        with gr.Row():
                            is_private = gr.Checkbox(label="Make new repo private?", value=True)
                            is_overwrite = gr.Checkbox(label="Overwrite existing repo?", value=True)
                            is_subdir = gr.Checkbox(label="Create subdirectories automatically?", value=True)
                            is_remtag = gr.Checkbox(label="Remove NFAA tag?", value=True)
                    with gr.Row():
                        submit_button = gr.Button("Submit", variant="primary")
                        clear_button = gr.Button("Clear", variant="secondary")
                with gr.Column():
                    output_md = gr.Markdown(label="output")
                    output_image = gr.Image(show_label=False)
        with gr.Tab("Multi to One"):
            with gr.Row():
                with gr.Column():
                    # Submitting the search box appends the repo to the text area below.
                    m2o_search = HuggingfaceHubSearch(
                        label="source_repo",
                        placeholder="Source repository (e.g. osanseviero/src)",
                        search_type=["model", "dataset", "space"],
                        sumbit_on_select=True,
                    )
                    m2o_source_repos = gr.Textbox(label="source_repos", placeholder="Source repositories (e.g. osanseviero/src)\n...", value="", lines=10)
                    with gr.Group():
                        m2o_dst_repo = gr.Textbox(label="dst_repo", placeholder="Destination repository (e.g. osanseviero/dst)", value=HF_REPO)
                        m2o_repo_type = gr.Dropdown(label="repo_type", choices=REPO_TYPES, value="model")
                        with gr.Row():
                            m2o_is_private = gr.Checkbox(label="Make new repo private?", value=True)
                            m2o_is_overwrite = gr.Checkbox(label="Overwrite existing repo?", value=True)
                    with gr.Row():
                        m2o_submit_button = gr.Button("Submit", variant="primary")
                        m2o_clear_button = gr.Button("Clear", variant="secondary")
                with gr.Column():
                    m2o_output_md = gr.Markdown(label="output")
                    m2o_output_image = gr.Image(show_label=False)
        with gr.Tab("Multi to Multi"):
            with gr.Row():
                with gr.Column():
                    # Submitting the search box appends the repo to the text area below.
                    m2m_search = HuggingfaceHubSearch(
                        label="source_repo",
                        placeholder="Source repository (e.g. osanseviero/src)",
                        search_type=["model", "dataset", "space"],
                        sumbit_on_select=True,
                    )
                    m2m_source_repos = gr.Textbox(label="source_repos", placeholder="Source repositories (e.g. osanseviero/src)\n...", value="", lines=10)
                    with gr.Group():
                        with gr.Row():
                            m2m_user = gr.Textbox(label="hf_user", placeholder="Your HF username", value=HF_USER)
                            m2m_prefix = gr.Textbox(label="repo_prefix", value=HF_REPO_PREFIX)
                            m2m_suffix = gr.Textbox(label="repo_suffix", value=HF_REPO_SUFFIX)
                        m2m_repo_type = gr.Dropdown(label="repo_type", choices=REPO_TYPES, value="model")
                        with gr.Row():
                            m2m_is_private = gr.Checkbox(label="Make new repo private?", value=True)
                            m2m_is_overwrite = gr.Checkbox(label="Overwrite existing repo?", value=False)
                            m2m_is_remtag = gr.Checkbox(label="Remove NFAA tag?", value=True)
                    with gr.Row():
                        m2m_submit_button = gr.Button("Submit", variant="primary")
                        m2m_clear_button = gr.Button("Clear", variant="secondary")
                with gr.Column():
                    m2m_output_md = gr.Markdown(label="output")
                    m2m_output_image = gr.Image(show_label=False)
    # On page load, pick the logged-in / logged-out CSS class for the main column.
    demo.load(fn=swap_visibilty, outputs=main_ui)
    # One to One: run the copy; Clear restores the tab's initial values.
    submit_button.click(duplicate, [search, dst_repo, repo_type, is_private, is_overwrite, is_subdir, is_remtag], [output_md, output_image])
    clear_button.click(lambda: ("", HF_REPO, "model", True, True, True, True), None, [search, dst_repo, repo_type, is_private, is_overwrite, is_subdir, is_remtag], queue=False)
    # Multi to One: search submissions accumulate in the source list.
    m2o_search.submit(add_repo_text, [m2o_search, m2o_source_repos], [m2o_source_repos], queue=False)
    m2o_submit_button.click(duplicate_m2o, [m2o_source_repos, m2o_dst_repo, m2o_repo_type, m2o_is_private, m2o_is_overwrite], [m2o_output_md, m2o_output_image])
    m2o_clear_button.click(lambda: ("", HF_REPO, "model", True, True, ""), None,
        [m2o_search, m2o_dst_repo, m2o_repo_type, m2o_is_private, m2o_is_overwrite, m2o_source_repos], queue=False)
    # Multi to Multi: search submissions accumulate in the source list.
    m2m_search.submit(add_repo_text, [m2m_search, m2m_source_repos], [m2m_source_repos], queue=False)
    m2m_submit_button.click(duplicate_m2m, [m2m_source_repos, m2m_user, m2m_repo_type, m2m_is_private, m2m_is_overwrite, m2m_is_remtag, m2m_prefix, m2m_suffix],
        [m2m_output_md, m2m_output_image])
    m2m_clear_button.click(lambda: ("", HF_USER, "model", True, False, True, "", HF_REPO_PREFIX, HF_REPO_SUFFIX), None,
        [m2m_search, m2m_user, m2m_repo_type, m2m_is_private, m2m_is_overwrite, m2m_is_remtag, m2m_source_repos, m2m_prefix, m2m_suffix], queue=False)
demo.queue()
demo.launch()