Spaces:
Running
Running
import os | |
import time | |
import logging | |
import requests | |
from apscheduler.schedulers.background import BackgroundScheduler | |
from flask import Flask, request, jsonify | |
# 配置日志记录 | |
logging.basicConfig(level=logging.INFO, | |
format='%(asctime)s - %(levelname)s - %(message)s') | |
API_ENDPOINT = "https://api.siliconflow.cn/v1/user/info" | |
TEST_MODEL_ENDPOINT = "https://api.siliconflow.cn/v1/chat/completions" | |
app = Flask(__name__) | |
def get_credit_summary(api_key): | |
""" | |
使用 API 密钥获取额度信息。 | |
""" | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
"Content-Type": "application/json" | |
} | |
try: | |
response = requests.get(API_ENDPOINT, headers=headers) | |
response.raise_for_status() | |
data = response.json().get("data", {}) | |
total_balance = data.get("totalBalance", 0) | |
# 将 total_balance 转换为浮点数 | |
return {"total_balance": float(total_balance)} | |
except requests.exceptions.RequestException as e: | |
logging.error(f"获取额度信息失败,API Key:{api_key},错误信息:{e}") | |
return None | |
except (KeyError, TypeError) as e: | |
logging.error(f"解析额度信息失败,API Key:{api_key},错误信息:{e}") | |
return None | |
except ValueError as e: | |
logging.error(f"total_balance 无法转换为浮点数,API Key:{api_key},错误信息:{e}") | |
return None | |
def test_model_availability(api_key, model_name): | |
""" | |
测试指定的模型是否可用。 | |
""" | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
"Content-Type": "application/json" | |
} | |
try: | |
response = requests.post(TEST_MODEL_ENDPOINT, | |
headers=headers, | |
json={ | |
"model": model_name, | |
"messages": [{"role": "user", "content": "hi"}], | |
"max_tokens": 10, | |
"stream": False | |
}, | |
timeout=10) | |
response.raise_for_status() | |
# 检查是否是429错误 | |
if response.status_code == 429: | |
return True | |
response.json() # 尝试解析 JSON 响应 | |
return True | |
except requests.exceptions.RequestException as e: | |
logging.error(f"测试模型 {model_name} 可用性失败,API Key:{api_key},错误信息:{e}") | |
return False | |
except ValueError: | |
logging.error(f"测试模型 {model_name} 可用性失败,API Key:{api_key},响应不是有效的 JSON 格式") | |
return False | |
def load_keys(): | |
""" | |
从环境变量中加载 keys,并根据额度和模型可用性进行分类,然后记录到日志中。 | |
""" | |
keys_str = os.environ.get("KEYS") | |
test_model = os.environ.get("TEST_MODEL", "Pro/google/gemma-2-9b-it") | |
invalid_keys = [] | |
free_keys = [] | |
unverified_keys = [] | |
valid_keys = [] | |
if keys_str: | |
keys = [key.strip() for key in keys_str.split(',')] | |
logging.info(f"加载的 keys:{keys}") | |
for key in keys: | |
credit_summary = get_credit_summary(key) | |
if credit_summary is None: | |
invalid_keys.append(key) | |
else: | |
total_balance = credit_summary.get("total_balance", 0) | |
if total_balance <= 0: | |
free_keys.append(key) | |
else: | |
if test_model_availability(key, test_model): | |
valid_keys.append(key) | |
else: | |
unverified_keys.append(key) | |
logging.info(f"无效 KEY:{invalid_keys}") | |
logging.info(f"免费 KEY:{free_keys}") | |
logging.info(f"未实名 KEY:{unverified_keys}") | |
logging.info(f"有效 KEY:{valid_keys}") | |
else: | |
logging.warning("环境变量 KEYS 未设置。") | |
# 创建一个后台调度器 | |
scheduler = BackgroundScheduler() | |
# 添加定时任务,每小时执行一次 load_keys 函数 | |
scheduler.add_job(load_keys, 'interval', hours=1) | |
def index(): | |
""" | |
处理根路由的访问请求。 | |
""" | |
return "<h1>Welcome to SiliconFlow</h1>" | |
def check_tokens(): | |
""" | |
处理前端发送的 Token 检测请求。 | |
""" | |
tokens = request.json.get('tokens', []) | |
test_model = os.environ.get("TEST_MODEL", "Pro/google/gemma-2-9b-it") | |
results = [] | |
for token in tokens: | |
credit_summary = get_credit_summary(token) | |
if credit_summary is None: | |
results.append({"token": token, "type": "无效 KEY", "balance": 0, "message": "无法获取额度信息"}) | |
else: | |
total_balance = credit_summary.get("total_balance", 0) | |
if total_balance <= 0: | |
results.append({"token": token, "type": "免费 KEY", "balance": total_balance, "message": "额度不足"}) | |
else: | |
if test_model_availability(token, test_model): | |
results.append({"token": token, "type": "有效 KEY", "balance": total_balance, "message": "可以使用指定模型"}) | |
else: | |
results.append({"token": token, "type": "未实名 KEY", "balance": total_balance, "message": "无法使用指定模型"}) | |
return jsonify(results) | |
if __name__ == '__main__': | |
# 打印所有环境变量,方便调试 | |
logging.info(f"环境变量:{os.environ}") | |
# 启动调度器 | |
scheduler.start() | |
# 手动触发一次 load_keys 任务 | |
load_keys() | |
logging.info("首次加载 keys 已手动触发执行") | |
# 启动 Flask 应用,监听所有 IP 的 7860 端口(Hugging Face Space 默认端口) | |
app.run(debug=False, host='0.0.0.0', port=int(os.environ.get('PORT', 7860))) | |