Spaces:
Running
Running
File size: 5,323 Bytes
a1e2c7e 144adde 34b46b3 97c794b 34b46b3 db6a69b 6de2043 db6a69b 34b46b3 9553b85 144adde 9553b85 3f10236 14a62cf db6a69b 14a62cf db6a69b 14a62cf 34b46b3 ceeca07 6de2043 34b46b3 6de2043 34b46b3 ceeca07 34b46b3 7167921 34b46b3 9553b85 34b46b3 08ee561 34b46b3 144adde 08ee561 144adde 7167921 144adde 7167921 144adde 08ee561 144adde 08ee561 144adde 08ee561 144adde a1e2c7e 08ee561 34b46b3 08ee561 34b46b3 144adde 08ee561 144adde 08ee561 bc1a27d 144adde bc1a27d 34b46b3 08ee561 97c794b 08ee561 db6a69b 08ee561 97c794b 34b46b3 08ee561 34b46b3 db6a69b 34b46b3 869524e 97c794b 08ee561 |
|
import os
import json
import time
import requests
import schedule
import threading
import gradio as gr
from tqdm import tqdm
from zoneinfo import ZoneInfo
from datetime import datetime
from huggingface_hub import HfApi
from tzlocal import get_localzone
DELAY = 1
TIMEOUT = 15
DOMAIN = "https://www.modelscope.cn"
HEADER = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537"
}
def fix_datetime(naive_time: datetime, target_tz=ZoneInfo("Asia/Shanghai")):
if not naive_time:
return None
local_tz = get_localzone()
aware_local = naive_time.replace(tzinfo=local_tz)
return aware_local.astimezone(target_tz).strftime("%Y-%m-%d %H:%M:%S")
def get_spaces(username: str):
try:
studios = []
spaces = HfApi().list_spaces(author=username)
for space in spaces:
space_id = space.id.replace("/", "-").replace("_", "-").lower()
if space.sdk == "gradio":
studios.append(f"https://{space_id}.hf.space")
else:
studios.append(f"https://{space_id}.static.hf.space")
return studios
except Exception as e:
print(f"An error occurred in the request: {e}")
return []
def activate_space(url: str):
try:
response = requests.get(url, headers=HEADER, timeout=TIMEOUT)
response.raise_for_status()
except Exception as e:
print(e)
def get_studios(username: str):
try:
response = requests.put(
f"{DOMAIN}/api/v1/studios/{username}/list",
data=json.dumps(
{
"PageNumber": 1,
"PageSize": 1000,
"Name": "",
"SortBy": "gmt_modified",
"Order": "desc",
}
),
headers=HEADER,
timeout=TIMEOUT,
)
response.raise_for_status()
spaces: list = response.json()["Data"]["Studios"]
if spaces:
studios = []
for space in spaces:
repo = f"{username}/{space['Name']}"
if (
requests.get(
f"{DOMAIN}/api/v1/studio/{repo}/status",
headers=HEADER,
timeout=TIMEOUT,
).json()["Data"]["Status"]
== "Expired"
):
studios.append(repo)
return studios
except requests.exceptions.Timeout as e:
print(f"Timeout: {e}, retrying...")
time.sleep(DELAY)
return get_studios(username)
except Exception as e:
print(f"Requesting error: {e}")
return []
def activate_studio(repo: str, holding_delay=5):
repo_page = f"{DOMAIN}/studios/{repo}"
status_api = f"{DOMAIN}/api/v1/studio/{repo}/status"
start_expired_api = f"{DOMAIN}/api/v1/studio/{repo}/start_expired"
try:
response = requests.put(start_expired_api, headers=HEADER, timeout=TIMEOUT)
response.raise_for_status()
while (
requests.get(status_api, headers=HEADER, timeout=TIMEOUT).json()["Data"][
"Status"
]
!= "Running"
):
requests.get(repo_page, headers=HEADER, timeout=TIMEOUT)
time.sleep(holding_delay)
except requests.exceptions.Timeout as e:
print(f"Failed to activate {repo}: {e}, retrying...")
activate_studio(repo)
except Exception as e:
print(e)
def activate(users=os.getenv("users")):
spaces = []
usernames = users.split(";")
for user in tqdm(usernames, desc="Collecting spaces"):
username = user.strip()
if username:
spaces += get_spaces(username)
time.sleep(DELAY)
for space in tqdm(spaces, desc="Activating spaces"):
activate_space(space)
time.sleep(DELAY)
studios = []
for user in tqdm(usernames, desc="Collecting studios"):
username = user.strip()
if username:
studios += get_studios(username)
time.sleep(DELAY)
for studio in tqdm(studios, desc="Activating studios"):
threading.Thread(target=activate_studio, args=(studio,), daemon=True).start()
time.sleep(DELAY)
print("\n".join(spaces + studios) + "\nActivation complete!")
def run_schedule():
while True:
schedule.run_pending()
time.sleep(DELAY)
def monitor(period=os.getenv("period")):
activate()
print(f"Monitor is on and triggered every {period}h...")
schedule.every(int(period)).hours.do(activate)
threading.Thread(target=run_schedule, daemon=True).start()
def tasklist():
jobs = schedule.get_jobs()
for job in jobs:
last_run = fix_datetime(job.last_run)
if not last_run:
last_run = "never"
next_run = fix_datetime(job.next_run)
return f"Every {job.interval}h do (last run: {last_run}, next run: {next_run})"
return "None"
if __name__ == "__main__":
monitor()
gr.Interface(
title="See current task status",
fn=tasklist,
inputs=None,
outputs=gr.Textbox(label="Current task details"),
flagging_mode="never",
).launch()
|