admin
fix pop monitors
0e56268
raw
history blame
8.08 kB
import os
import re
import json
import time
import random
import string
import requests
import schedule
import gradio as gr
import pandas as pd
from tqdm import tqdm
from functools import partial
from datetime import datetime, timedelta
# Value passed as ``timeout=`` to the HTTP requests made in this file (seconds).
TIMEOUT = 15
# Pause handed to ``time.sleep`` between consecutive requests, before a retry,
# and between scheduler polls (seconds).
DELAY = 1
def start_monitor(url: str):
    """Ask a monitor space to start its monitoring job.

    Posts a queue-join request to ``{url}/queue/join?`` with a freshly
    generated random session hash.
    NOTE(review): the payload presumably mirrors what the space's Gradio
    front end sends when its start button is pressed; ``fn_index`` and
    ``trigger_id`` are hard-coded, so confirm them against the target space.

    Args:
        url: Base URL of the monitor space, without a trailing slash.

    Returns:
        ``"monitoring"`` when the endpoint answers with HTTP 200,
        otherwise ``"running"``.

    Raises:
        requests.RequestException: Network failures and timeouts are not
            handled here and propagate to the caller.
    """
    payload = {
        "data": ["", ""],
        "event_data": None,  # None is serialized as JSON null
        "fn_index": 0,
        "trigger_id": 11,
        "session_hash": "".join(
            random.choice(string.ascii_lowercase) for _ in range(11)
        ),
    }
    # Bound the request like every other call in this file; without a timeout
    # an unresponsive space would block the whole activation run forever.
    response = requests.post(f"{url}/queue/join?", json=payload, timeout=TIMEOUT)
    # Check whether the request succeeded
    if response.status_code == 200:
        return "monitoring"

    return "running"
def add_six_hours(match):
    """Return the matched timestamp moved six hours forward.

    Intended as a replacement callback for ``re.sub``.

    Args:
        match: A regex match whose full text is a timestamp formatted as
            ``YYYY-MM-DD HH:MM:SS``.

    Returns:
        The shifted timestamp, rendered in the same format.

    Raises:
        ValueError: If the matched text is not a valid timestamp in that format.
    """
    fmt = "%Y-%m-%d %H:%M:%S"
    shifted = datetime.strptime(match.group(0), fmt) + timedelta(hours=6)
    return shifted.strftime(fmt)
def fix_datetime(text: str):
    """Shift every ``YYYY-MM-DD HH:MM:SS`` timestamp found in *text* by six hours.

    Args:
        text: Arbitrary text that may contain timestamps.

    Returns:
        A copy of *text* in which each timestamp was replaced by the result
        of ``add_six_hours``; everything else is left as it was.
    """
    return re.sub(
        r"\b\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\b",
        add_six_hours,
        text,
    )
def get_studios(username: str, max_retries: int = 5):
    """Collect the ``start_expired`` API URLs of a user's ModelScope studios.

    Requests the first page (up to 1000 entries, most recently modified
    first) of the user's studio list and builds one URL per studio.

    Args:
        username: ModelScope account name.
        max_retries: Number of extra attempts made after a timed-out request
            before giving up. Replaces the former unbounded recursive retry.

    Returns:
        A list of ``.../api/v1/studio/{username}/{name}/start_expired`` URLs.
        The list is empty when the user has no studios, when the request
        fails, or when every attempt timed out.
    """
    # Request payload
    payload = {
        "PageNumber": 1,
        "PageSize": 1000,
        "Name": "",
        "SortBy": "gmt_modified",
        "Order": "desc",
    }
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
    }
    retries = max(max_retries, 0)
    for attempt in range(retries + 1):
        try:
            # Send the PUT request
            response = requests.put(
                f"https://www.modelscope.cn/api/v1/studios/{username}/list",
                data=json.dumps(payload),
                headers=headers,
                timeout=TIMEOUT,
            )
            # Check whether the request succeeded
            response.raise_for_status()
            # Parse the JSON response; a missing/empty list yields no URLs
            spaces: list = response.json()["Data"]["Studios"]
            return [
                f"https://www.modelscope.cn/api/v1/studio/{username}/{space['Name']}/start_expired"
                for space in spaces or []
            ]

        except requests.exceptions.Timeout as errt:
            if attempt >= retries:
                print(f"请求超时: {errt}, giving up after {retries} retries")
                break
            print(f"请求超时: {errt}, retrying...")
            time.sleep(DELAY)

        except Exception as err:
            # Any other failure is reported and treated as "no studios"
            print(f"请求发生错误: {err}")
            break

    return []
def get_spaces(username: str, max_retries: int = 5):
    """Collect the ``*.hf.space`` URLs of a user's Hugging Face spaces.

    Searches ``https://huggingface.co/spaces-json`` for *username* and keeps
    only the results whose ``author`` is exactly that user.

    Args:
        username: Hugging Face account name.
        max_retries: Number of extra attempts made after a timed-out request
            before giving up. Replaces the former unbounded recursive retry.

    Returns:
        A list of ``https://{owner}-{space}.hf.space`` URLs, built from each
        space id with ``/`` replaced by ``-``. The list is empty when nothing
        matches, when the request fails, or when every attempt timed out.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537"
    }
    retries = max(max_retries, 0)
    for attempt in range(retries + 1):
        try:
            # Send the GET request
            response = requests.get(
                "https://huggingface.co/spaces-json",
                params={"sort": "trending", "search": username},
                headers=headers,
                timeout=TIMEOUT,
            )
            # Check whether the request succeeded
            response.raise_for_status()
            # Parse the JSON response
            spaces: list = response.json()["spaces"]
            # The search is fuzzy, so filter on the exact author name
            return [
                f"https://{space['id'].replace('/', '-')}.hf.space"
                for space in spaces
                if space["author"] == username
            ]

        except requests.exceptions.Timeout as errt:
            if attempt >= retries:
                print(f"请求超时: {errt}, giving up after {retries} retries")
                break
            print(f"请求超时: {errt}, retrying...")
            time.sleep(DELAY)

        except Exception as err:
            # Any other failure is reported and treated as "no spaces"
            print(f"请求发生错误: {err}")
            break

    return []
def _studio_not_expired(error) -> bool:
    """Tell whether *error* is the HTTP 500 reply "studio is not expired"."""
    if "500 Server Error:" not in f"{error}":
        return False

    # Only HTTP errors carry a response; connection failures have none.
    response = getattr(error, "response", None)
    if response is None:
        return False

    try:
        return response.json()["Message"] == "studio is not expired"
    except (ValueError, KeyError, TypeError):
        # Body is not JSON or lacks the expected field
        return False


def activate_space(url: str, max_retries: int = 5):
    """Wake up one Hugging Face space or restart one expired ModelScope studio.

    URLs containing ``.hf.space`` are fetched with GET; if the URL belongs to
    a ``-keep-spaces-active`` monitor space, its monitoring job is started
    too. Any other URL is treated as a ModelScope ``start_expired`` endpoint:
    it is triggered with PUT, then the studio status is polled every five
    seconds (requesting the studio page on each round) until it stops
    reporting ``ExpiredCreating``.

    Args:
        url: A ``*.hf.space`` URL or a ModelScope ``start_expired`` API URL.
        max_retries: Number of extra attempts for a ModelScope URL whose
            request timed out. Replaces the former unbounded recursion.

    Returns:
        A status string: ``"running"``, ``"monitoring"`` (monitor job
        started), ``"restarting"`` (a Hugging Face request timed out), or the
        text of the error that occurred.
    """
    status = "running"
    try:
        if ".hf.space" in url:
            response = requests.get(url, timeout=TIMEOUT)
            response.raise_for_status()
            if "-keep-spaces-active.hf.space" in url:
                status = start_monitor(url)

        else:
            response = requests.put(url, timeout=TIMEOUT)
            response.raise_for_status()
            print("Expired studio found, restarting...")
            while (
                requests.get(
                    url.replace("/start_expired", "/status"),
                    timeout=TIMEOUT,
                ).json()["Data"]["Status"]
                == "ExpiredCreating"
            ):
                # Request the studio page on each polling round
                requests.get(
                    url.replace("/api/v1/studio/", "/studios/").replace(
                        "/start_expired", ""
                    ),
                    timeout=TIMEOUT,
                )
                time.sleep(5)

    except requests.exceptions.Timeout as e:
        if ".hf.space" in url:
            status = "restarting"
        elif max_retries > 0:
            print(f"Failed to activate {url} : {e}, retrying...")
            return activate_space(url, max_retries - 1)
        else:
            status = f"{e}"

    except requests.RequestException as e:
        # Inspect the response attached to the exception: the local
        # ``response`` is unbound when the request itself failed, and parsing
        # it unguarded would raise out of this handler.
        status = "running" if _studio_not_expired(e) else f"{e}"

    except Exception as e:
        status = f"{e}"

    return status
def activate(hf_users: str, ms_users: str):
    """Run one activation pass over all spaces and studios of the given users.

    Hugging Face spaces are collected first and reordered so that the
    ``keep-spaces-active`` monitor spaces come before the others; ModelScope
    studios are appended after them. Each target is then activated in that
    order, with a ``DELAY`` pause between requests.

    Args:
        hf_users: Hugging Face usernames joined by ``;``. When empty, the
            ``hf_users`` environment variable is used instead.
        ms_users: ModelScope usernames joined by ``;``. When empty, the
            Hugging Face usernames are reused.

    Returns:
        A ``pandas.DataFrame`` with one row per target, holding its shortened
        address (``space``) and the activation result (``status``). The frame
        is empty when no target was found.
    """
    if not hf_users:
        # Fall back to "" so an unset variable yields an empty result
        # instead of crashing on ``None.split``.
        hf_users = os.getenv("hf_users") or ""

    if not ms_users:
        ms_users = hf_users

    hf_usernames = hf_users.split(";")
    ms_usernames = ms_users.split(";")
    spaces = []
    for user in tqdm(hf_usernames, desc="Collecting spaces..."):
        username = user.strip()
        if username:
            spaces += get_spaces(username)
            time.sleep(DELAY)

    # Put the monitor spaces first. The original assigned the result to
    # ``space`` (typo), which silently dropped this reordering.
    monitors = [space for space in spaces if "keep-spaces-active" in space]
    studios = [space for space in spaces if "keep-spaces-active" not in space]
    spaces = monitors + studios

    for user in tqdm(ms_usernames, desc="Collecting studios..."):
        username = user.strip()
        if username:
            spaces += get_studios(username)
            time.sleep(DELAY)

    output = []
    for space in tqdm(spaces, desc="Activating spaces..."):
        output.append(
            {
                # Drop the scheme and the ModelScope API prefix for display
                "space": space.split("//")[-1].replace(
                    "www.modelscope.cn/api/v1/studio/", ""
                ),
                "status": activate_space(space),
            }
        )
        time.sleep(DELAY)

    print("Activation complete!")
    return pd.DataFrame(output)
def monitor(hf_users: str, ms_users: str, period=4):
    """Register a recurring activation job and run the scheduler loop.

    Returns immediately (with ``None``) when the scheduler already has a job.
    Otherwise it schedules ``activate`` to run every *period* hours and then
    polls the scheduler in an endless loop, so this call never returns.

    Args:
        hf_users: Hugging Face usernames joined by ``;``. When empty, the
            ``hf_users`` environment variable is used instead.
        ms_users: ModelScope usernames joined by ``;``. When empty, the
            Hugging Face usernames are reused.
        period: Interval between two activation runs, in hours.
    """
    # A job is already registered: do not schedule a second one.
    if schedule.get_jobs():
        return

    hf_users = hf_users or os.getenv("hf_users")
    ms_users = ms_users or hf_users

    print(f"监控开启中...每 {period} 小时触发")
    job = partial(activate, hf_users=hf_users, ms_users=ms_users)
    schedule.every(period).hours.do(job)

    # Poll the scheduler forever, pausing DELAY seconds between checks.
    while True:
        schedule.run_pending()
        time.sleep(DELAY)
def listasks():
    """Describe the currently scheduled jobs as text.

    The textual form of the job list is stripped of square brackets, then
    the part before ``functools.`` is joined with the part after the last
    ``) (`` and every timestamp in the result is passed through
    ``fix_datetime``.

    Returns:
        The condensed job description, or ``"None"`` when nothing is scheduled.
    """
    jobs = schedule.get_jobs()
    if not jobs:
        return "None"

    details = str(jobs).replace("[", "").replace("]", "")
    head = details.split("functools.")[0]
    tail = details.split(") (")[-1]
    return fix_datetime(head + "(" + tail)
def _usernames_box(label):
    """Create a textbox for a ';'-joined list of usernames under *label*."""
    return gr.Textbox(label=label, placeholder="Usernames joint by ;")


with gr.Blocks() as demo:
    # Panel 1: start the periodic keep-alive job
    gr.Interface(
        fn=monitor,
        title="Start keeping all spaces active periodically",
        inputs=[_usernames_box("HuggingFace"), _usernames_box("ModelScope")],
        outputs=None,
        allow_flagging="never",
    )
    # Panel 2: show the scheduled job, if any
    gr.Interface(
        fn=listasks,
        title="See current task status",
        inputs=None,
        outputs=gr.Textbox(label="Current task details"),
        allow_flagging="never",
    )
    # Panel 3: run a single activation pass and show its results
    gr.Interface(
        fn=activate,
        title="Test activation for all spaces once",
        inputs=[_usernames_box("HuggingFace"), _usernames_box("ModelScope")],
        outputs=gr.Dataframe(label="Activated spaces"),
        allow_flagging="never",
    )

demo.launch()