# Page-capture residue ("Spaces: Running") - not part of the program.
import json
import os
import time
from functools import partial
from urllib.parse import urljoin, urlparse

import gradio as gr
import pandas as pd
import requests
import schedule
from bs4 import BeautifulSoup
from tqdm import tqdm
# Seconds to wait for a single HTTP response before the request times out.
TIMEOUT = 15
# Seconds to pause between consecutive requests, retries and scheduler polls.
DELAY = 1
def get_studios(username: str, retries: int = 3):
    """Return the ``start_expired`` URLs of the ModelScope studios of *username*.

    The studio list is fetched with one PUT request to the ModelScope API and
    every studio name is turned into its ``start_expired`` endpoint URL.

    Args:
        username: ModelScope account whose studios are listed.
        retries: Extra attempts made after a timeout. Negative values are
            treated as 0, so at least one request is always sent.

    Returns:
        A list of URL strings. The list is empty when the account has no
        studios or when the request ultimately fails; failures are printed
        and never raised.
    """
    # Request payload: one page, large enough to hold every studio.
    payload = {
        "PageNumber": 1,
        "PageSize": 1000,
        "Name": "",
        "SortBy": "gmt_modified",
        "Order": "desc",
    }
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
    }
    attempts = max(retries, 0) + 1
    # A bounded loop replaces the former unbounded recursive retry.
    for attempt in range(1, attempts + 1):
        try:
            response = requests.put(
                f"https://www.modelscope.cn/api/v1/studios/{username}/list",
                data=json.dumps(payload),
                headers=headers,
                timeout=TIMEOUT,
            )
            # Turn 4xx/5xx answers into HTTPError.
            response.raise_for_status()
            spaces = response.json()["Data"]["Studios"]
            return [
                f"https://www.modelscope.cn/api/v1/studio/{username}/{space['Name']}/start_expired"
                for space in spaces or []
            ]
        except requests.exceptions.Timeout as errt:
            # Checked before ConnectionError: ConnectTimeout derives from both,
            # and a connect timeout should be retried like a read timeout.
            if attempt == attempts:
                print(f"请求超时: {errt}, giving up.")
                break
            print(f"请求超时: {errt}, retrying...")
            time.sleep(DELAY)
        except requests.exceptions.HTTPError as errh:
            print(f"HTTP错误发生: {errh}")
            break
        except requests.exceptions.ConnectionError as errc:
            print(f"连接错误发生: {errc}")
            break
        except requests.exceptions.RequestException as err:
            print(f"请求发生错误: {err}")
            break
        except (KeyError, TypeError, ValueError) as err:
            # The body was not JSON or lacked the expected Data/Studios/Name keys.
            print(f"Unexpected response format: {err!r}")
            break
    return []
def get_spaces(username: str, retries: int = 3):
    """Return the page URLs of the Hugging Face Spaces authored by *username*.

    The Hugging Face ``spaces-json`` endpoint is searched for *username* and
    only results whose ``author`` equals *username* exactly are kept.

    Args:
        username: Hugging Face account whose Spaces are collected.
        retries: Extra attempts made after a timeout. Negative values are
            treated as 0, so at least one request is always sent.

    Returns:
        A list of ``https://huggingface.co/spaces/<id>`` strings. The list is
        empty when nothing matches or when the request ultimately fails;
        failures are printed and never raised.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537"
    }
    attempts = max(retries, 0) + 1
    # A bounded loop replaces the former unbounded recursive retry.
    for attempt in range(1, attempts + 1):
        try:
            response = requests.get(
                "https://huggingface.co/spaces-json",
                params={"sort": "trending", "search": username},
                headers=headers,
                timeout=TIMEOUT,
            )
            # Turn 4xx/5xx answers into HTTPError.
            response.raise_for_status()
            spaces = response.json()["spaces"]
            # The search is fuzzy, so filter down to exact author matches.
            return [
                f"https://huggingface.co/spaces/{space['id']}"
                for space in spaces or []
                if space["author"] == username
            ]
        except requests.exceptions.Timeout as errt:
            if attempt == attempts:
                print(f"请求超时: {errt}, giving up.")
                break
            print(f"请求超时: {errt}, retrying...")
            time.sleep(DELAY)
        except requests.exceptions.HTTPError as errh:
            print(f"HTTP错误发生: {errh}")
            break
        except requests.exceptions.RequestException as err:
            print(f"请求发生错误: {err}")
            break
        except (KeyError, TypeError, ValueError) as err:
            # The body was not JSON or lacked the expected spaces/author/id keys.
            print(f"Unexpected response format: {err!r}")
            break
    return []
def activate_space(url: str, retries: int = 3):
    """Fetch *url* and, if the page contains a ``btn-lg`` button form, submit it.

    Args:
        url: Page or endpoint to request.
        retries: Extra attempts made after a timeout. Negative values are
            treated as 0, so at least one request is always sent.

    Returns:
        ``"success"`` when the page loaded without a ``btn-lg`` button, or
        when the button's form was submitted and answered with a non-error
        status. Otherwise a message describing what went wrong. Nothing is
        raised.
    """
    success = "success"
    attempts = max(retries, 0) + 1
    # A bounded loop replaces the former unbounded recursive retry.
    for attempt in range(1, attempts + 1):
        try:
            response = requests.get(url, timeout=TIMEOUT)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")
            # No such button means there is nothing to submit.
            button = soup.find("button", class_="btn-lg")
            if not button:
                return success
            form = button.find_parent("form")
            if not form:
                return "按钮未在表单内找到。"
            # urljoin resolves relative, absolute and missing actions against
            # the page URL; a missing action submits to the page itself.
            form_action = urljoin(url, form.get("action") or "")
            form_method = (form.get("method") or "GET").upper()
            # Collect every named input of the form.
            form_data = {
                input_tag.get("name"): input_tag.get("value")
                for input_tag in form.find_all("input")
                if input_tag.has_attr("name")
            }
            # The submission gets the same timeout and status check as the
            # initial page load, so it can neither hang nor fail silently.
            if form_method == "POST":
                response = requests.post(form_action, data=form_data, timeout=TIMEOUT)
            else:
                response = requests.get(form_action, params=form_data, timeout=TIMEOUT)
            response.raise_for_status()
            print("提交表单后的页面内容:")
            print(response.text)
            return success
        except requests.exceptions.Timeout as errt:
            if attempt == attempts:
                return f"{errt}"
            print(f"请求超时: {errt}, retrying...")
            time.sleep(DELAY)
        except Exception as e:
            # Status-reporting boundary: any other failure becomes the status text.
            return f"{e}"
    return success
def activate(hf_users: str, ms_users: str):
    """Collect every space of the given users and activate each one once.

    Args:
        hf_users: Hugging Face usernames joined by ``;``. When empty, the
            ``hf_users`` environment variable is used instead.
        ms_users: ModelScope usernames joined by ``;``. When empty, the
            ``ms_users`` environment variable is used instead.

    Returns:
        A ``pandas.DataFrame`` with the columns ``space`` and ``status``,
        one row per activated URL.
    """
    # Fall back to the environment; an unset variable yields "" rather than
    # None so that the split below cannot fail.
    hf_users = hf_users or os.getenv("hf_users") or ""
    ms_users = ms_users or os.getenv("ms_users") or ""

    spaces = []
    for user in tqdm(hf_users.split(";"), desc="Collecting spaces..."):
        username = user.strip()
        if username:
            spaces += get_spaces(username)
            time.sleep(DELAY)

    for user in tqdm(ms_users.split(";"), desc="Collecting studios..."):
        username = user.strip()
        if username:
            spaces += get_studios(username)
            time.sleep(DELAY)

    output = []
    for space in tqdm(spaces, desc="Activating spaces..."):
        output.append({"space": _space_label(space), "status": activate_space(space)})
        time.sleep(DELAY)

    print("Activation complete!")
    # Explicit columns keep the table header even when nothing was found.
    return pd.DataFrame(output, columns=["space", "status"])


def _space_label(url: str) -> str:
    """Return the space name embedded in a URL built by the collectors."""
    parts = url.rstrip("/").split("/")
    # ModelScope URLs end in ".../<name>/start_expired", so the name is the
    # segment before the last one; Hugging Face URLs end in the name itself.
    if len(parts) > 1 and parts[-1] == "start_expired":
        return parts[-2]
    return parts[-1]
def monitor(hf_users: str, ms_users: str, period=3):
    """Schedule ``activate`` every *period* hours and run the scheduler loop.

    If a job is already registered with ``schedule`` the call returns at
    once. Otherwise it registers the job and then loops forever, so it does
    not return.

    Args:
        hf_users: Hugging Face usernames joined by ``;``. When empty, the
            ``hf_users`` environment variable is used instead.
        ms_users: ModelScope usernames joined by ``;``. When empty, the
            ``ms_users`` environment variable is used instead.
        period: Interval between activations, in hours.
    """
    # A registered job means a monitoring loop was already started.
    if schedule.get_jobs():
        return

    if not hf_users:
        hf_users = os.getenv("hf_users")

    if not ms_users:
        ms_users = os.getenv("ms_users")

    print(f"监控开启中...每 {period} 小时触发")

    def guarded_activate(hf_users, ms_users):
        # An exception escaping the job would propagate out of run_pending()
        # and end the loop below while the job stays registered, which would
        # make every later monitor() call return immediately. Log it instead.
        try:
            activate(hf_users, ms_users)
        except Exception as err:
            print(f"Scheduled activation failed: {err!r}")

    schedule.every(period).hours.do(
        guarded_activate, hf_users=hf_users, ms_users=ms_users
    )
    while True:
        schedule.run_pending()
        time.sleep(DELAY)
def list_tasks():
    """Describe the registered monitoring job, if there is one.

    Returns:
        The string form of the first job registered with ``schedule``, or
        the text ``"None"`` when no job is registered.
    """
    registered = schedule.get_jobs()
    return str(registered[0]) if registered else "None"
# Build the web UI (three stacked interfaces) and start serving it.
with gr.Blocks() as iface:
    # Panel 1: start the periodic monitoring loop.
    gr.Interface(
        title="Start keeping all spaces active periodically",
        fn=monitor,
        inputs=[
            gr.Textbox(
                label="HuggingFace",
                placeholder="Usernames joint by ;",
            ),
            gr.Textbox(
                label="ModelScope",
                placeholder="Usernames joint by ;",
            ),
        ],
        outputs=None,
        # Gradio documents the string modes "never" / "auto" / "manual";
        # "never" is the string equivalent of the former boolean False.
        allow_flagging="never",
    )
    # Panel 2: show the currently scheduled job.
    gr.Interface(
        title="See current task status",
        fn=list_tasks,
        inputs=None,
        outputs=gr.Textbox(label="Current task details"),
        allow_flagging="never",
    )
    # Panel 3: run a single activation pass and show the result table.
    gr.Interface(
        title="Test activation for all spaces once",
        fn=activate,
        inputs=[
            gr.Textbox(
                label="HuggingFace",
                placeholder="Usernames joint by ;",
            ),
            gr.Textbox(
                label="ModelScope",
                placeholder="Usernames joint by ;",
            ),
        ],
        outputs=gr.Dataframe(label="Activated spaces"),
        allow_flagging="never",
    )

iface.launch()