Spaces:
Running
Running
File size: 7,096 Bytes
e9222b3 2d254f3 e9222b3 a056865 e9222b3 eca06e7 2d254f3 2b65c98 e9222b3 163fe52 e9222b3 a056865 e9222b3 a056865 e9222b3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
import json
import time
import requests
import schedule
import gradio as gr
import pandas as pd
from tqdm import tqdm
from functools import partial
from bs4 import BeautifulSoup
from urllib.parse import urlparse
TIMEOUT = 15
DELAY = 1
def get_studios(username: str, max_retries: int = 3):
    """Return the ModelScope studio URLs listed for ``username``.

    Sends a PUT request to the ModelScope studio-list endpoint (first page,
    up to 1000 entries, sorted by ``gmt_modified`` descending) and builds one
    ``https://www.modelscope.cn/studios/<username>/<name>`` URL per entry.

    Args:
        username: ModelScope account name whose studios are listed.
        max_retries: Number of additional attempts made after a request
            timeout before giving up. Negative values are treated as 0.

    Returns:
        A list of studio URLs. The list is empty when the account has no
        studios, the response cannot be interpreted, or the request fails.
    """
    # Request payload
    payload = {
        "PageNumber": 1,
        "PageSize": 1000,
        "Name": "",
        "SortBy": "gmt_modified",
        "Order": "desc",
    }
    attempts = max(max_retries, 0) + 1
    for attempt in range(attempts):
        try:
            # Send the PUT request
            response = requests.put(
                f"https://www.modelscope.cn/api/v1/studios/{username}/list",
                data=json.dumps(payload),
                headers={
                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
                },
                timeout=TIMEOUT,
            )
            # Raise for 4xx/5xx answers
            response.raise_for_status()
            # Parse the JSON response; a null/empty studio list yields []
            spaces = response.json()["Data"]["Studios"]
            return [
                f"https://www.modelscope.cn/studios/{username}/{space['Name']}"
                for space in spaces or []
            ]
        except requests.exceptions.HTTPError as errh:
            print(f"HTTP错误发生: {errh}")
        except requests.exceptions.ConnectionError as errc:
            print(f"连接错误发生: {errc}")
        except requests.exceptions.Timeout as errt:
            # Retry with a bounded loop instead of unbounded recursion.
            if attempt < attempts - 1:
                print(f"请求超时: {errt}, retrying...")
                time.sleep(DELAY)
                continue
            print(f"请求超时: {errt}")
        except requests.exceptions.RequestException as err:
            print(f"请求发生错误: {err}")
        except (KeyError, TypeError, ValueError) as err:
            # The body was not JSON or lacked the expected Data/Studios/Name keys.
            print(f"响应解析失败: {err}")
        # Only the timeout branch retries; every other failure stops here.
        break
    return []
def get_spaces(username: str, max_retries: int = 3):
    """Return the Hugging Face Space URLs authored by ``username``.

    Queries ``https://huggingface.co/spaces-json`` with ``username`` as the
    search term (sorted by "trending") and keeps only the entries whose
    ``author`` field equals ``username``. Only the entries contained in that
    single response are considered.

    Args:
        username: Hugging Face account name to look up.
        max_retries: Number of additional attempts made after a request
            timeout before giving up. Negative values are treated as 0.

    Returns:
        A list of ``https://huggingface.co/spaces/<id>`` URLs. The list is
        empty when nothing matches, the response cannot be interpreted, or
        the request fails.
    """
    attempts = max(max_retries, 0) + 1
    for attempt in range(attempts):
        try:
            # Send the GET request
            response = requests.get(
                "https://huggingface.co/spaces-json",
                params={"sort": "trending", "search": username},
                headers={
                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537"
                },
                timeout=TIMEOUT,
            )
            # Raise for 4xx/5xx answers
            response.raise_for_status()
            # Parse the JSON response and drop results owned by other accounts
            spaces = response.json()["spaces"]
            return [
                f"https://huggingface.co/spaces/{space['id']}"
                for space in spaces or []
                if space.get("author") == username
            ]
        except requests.exceptions.HTTPError as errh:
            print(f"HTTP错误发生: {errh}")
        except requests.exceptions.Timeout as errt:
            # Retry with a bounded loop instead of unbounded recursion.
            if attempt < attempts - 1:
                print(f"请求超时: {errt}, retrying...")
                time.sleep(DELAY)
                continue
            print(f"请求超时: {errt}")
        except requests.exceptions.RequestException as err:
            print(f"请求发生错误: {err}")
        except (AttributeError, KeyError, TypeError, ValueError) as err:
            # The body was not JSON or did not have the expected structure.
            print(f"响应解析失败: {err}")
        # Only the timeout branch retries; every other failure stops here.
        break
    return []
def _activate_once(url: str):
    """Make one attempt to wake the space at ``url`` and return a status string.

    Request errors (including timeouts) propagate to the caller.
    """
    # Imported locally so this block does not depend on a file-level import.
    from urllib.parse import urljoin

    # Fetch the page content
    response = requests.get(url, timeout=TIMEOUT)
    response.raise_for_status()
    # Parse the page with BeautifulSoup
    soup = BeautifulSoup(response.text, "html.parser")
    # Look for a button carrying the "btn-lg" class
    button = soup.find("button", class_="btn-lg")
    if not button:
        # No such button on the page: nothing to submit.
        return "success"
    # The button must live inside a form whose action/method we can replay
    form = button.find_parent("form")
    if not form:
        return "按钮未在表单内找到。"
    # Resolve the action against the page URL; this handles absolute,
    # root-relative, relative and missing actions alike.
    form_action = urljoin(url, form.get("action") or "")
    form_method = form.get("method", "GET").upper()  # defaults to GET
    # Collect the named <input> fields of the form
    form_data = {
        input_tag.get("name"): input_tag.get("value")
        for input_tag in form.find_all("input")
        if input_tag.has_attr("name")
    }
    # Submit the form; the timeout keeps a stalled server from blocking forever
    if form_method == "POST":
        response = requests.post(form_action, data=form_data, timeout=TIMEOUT)
    else:
        response = requests.get(form_action, params=form_data, timeout=TIMEOUT)
    # A 4xx/5xx answer to the submission must not be reported as success
    response.raise_for_status()
    # Print the page returned by the submission
    print("提交表单后的页面内容:")
    print(response.text)
    return "success"


def activate_space(url: str, max_retries: int = 3):
    """Visit a space page and submit its "btn-lg" form if one is present.

    The page at ``url`` is downloaded and searched for a ``<button>`` with
    class ``btn-lg``. When found, the enclosing form is submitted with its
    named ``<input>`` values using the form's own method and action.

    Args:
        url: Address of the space/studio page.
        max_retries: Number of additional attempts made after a request
            timeout before giving up. Negative values are treated as 0.

    Returns:
        ``"success"`` when no button was found or the form submission
        succeeded; otherwise a message describing what went wrong.
    """
    attempts = max(max_retries, 0) + 1
    status = "success"
    for attempt in range(attempts):
        try:
            return _activate_once(url)
        except requests.exceptions.Timeout as errt:
            status = f"{errt}"
            # Retry with a bounded loop instead of unbounded recursion.
            if attempt < attempts - 1:
                print(f"请求超时: {errt}, retrying...")
                time.sleep(DELAY)
        except Exception as e:
            # Any failure is reported as this space's status rather than
            # aborting the caller's loop over all spaces.
            return f"{e}"
    return status
def activate(hf_users: str, ms_users: str):
    """Collect every space of the given users and try to activate each one.

    Args:
        hf_users: Hugging Face usernames separated by ``;``.
        ms_users: ModelScope usernames separated by ``;``.

    Returns:
        A ``pandas.DataFrame`` with one row per collected URL and the columns
        ``space`` (the URL) and ``status`` (the result of ``activate_space``).
    """
    hf_names = hf_users.split(";")
    ms_names = ms_users.split(";")
    links = []

    # Hugging Face spaces first; blank entries are skipped.
    for raw_name in tqdm(hf_names, desc="Collecting spaces..."):
        name = raw_name.strip()
        if not name:
            continue
        links.extend(get_spaces(name))
        time.sleep(DELAY)

    # Then ModelScope studios, appended to the same list.
    for raw_name in tqdm(ms_names, desc="Collecting studios..."):
        name = raw_name.strip()
        if not name:
            continue
        links.extend(get_studios(name))
        time.sleep(DELAY)

    # Activate one URL at a time, pausing DELAY seconds between requests.
    rows = []
    for link in tqdm(links, desc="Activating spaces..."):
        result = activate_space(link)
        rows.append({"space": link, "status": result})
        time.sleep(DELAY)

    print("Activation complete!")
    return pd.DataFrame(rows)
def monitor(hf_users: str, ms_users: str, period=3):
    """Schedule ``activate`` every ``period`` hours and run the scheduler loop.

    If the ``schedule`` module already holds at least one job, nothing is
    registered and the function returns ``None`` immediately. Otherwise the
    job is registered and the function polls ``schedule.run_pending()`` every
    ``DELAY`` seconds in an endless loop, so it does not return.

    Args:
        hf_users: Hugging Face usernames separated by ``;``.
        ms_users: ModelScope usernames separated by ``;``.
        period: Interval between activations, in hours.
    """
    if not schedule.get_jobs():
        print(f"监控开启中...每 {period} 小时触发")
        # Bind the user lists now so the scheduler can call the job with no arguments.
        job = partial(activate, hf_users=hf_users, ms_users=ms_users)
        schedule.every(period).hours.do(job)
        while True:
            schedule.run_pending()
            time.sleep(DELAY)
    return None
def list_tasks():
    """Describe the jobs currently registered with ``schedule``.

    Returns:
        The string form of the job list with every ``[`` and ``]`` character
        removed, or the string ``"None"`` when no job is registered.
    """
    pending = schedule.get_jobs()
    if not pending:
        return "None"
    text = str(pending)
    # Remove all square brackets, not just the outer pair of the list repr.
    for bracket in ("[", "]"):
        text = text.replace(bracket, "")
    return text
def _username_inputs():
    """Build the HuggingFace/ModelScope username textboxes shared by two panels."""
    return [
        gr.Textbox(
            label="HuggingFace",
            value="monet-joe;MuGeminorum;ccmusic-database",
            placeholder="Usernames joint by ;",
        ),
        gr.Textbox(
            label="ModelScope",
            value="monetjoe;MuGeminorum;ccmusic",
            placeholder="Usernames joint by ;",
        ),
    ]


# Three panels in one page: start the periodic monitor, inspect the scheduled
# job, and run a one-off activation. Flagging is disabled with the string
# "never" instead of the deprecated boolean False.
with gr.Blocks() as iface:
    gr.Interface(
        title="Start keeping all spaces active periodically",
        fn=monitor,
        inputs=_username_inputs(),
        outputs=None,
        allow_flagging="never",
    )
    gr.Interface(
        title="See current task status",
        fn=list_tasks,
        inputs=None,
        outputs=gr.Textbox(label="Current task details"),
        allow_flagging="never",
    )
    gr.Interface(
        title="Test activation for all spaces once",
        fn=activate,
        inputs=_username_inputs(),
        outputs=gr.Dataframe(label="Activated spaces"),
        allow_flagging="never",
    )

iface.launch()
|