sone-latest / app.py
yangtb24's picture
Update app.py
ba8e699 verified
raw
history blame
11 kB
import os
import time
import logging
import requests
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask, request, jsonify, Response, stream_with_context
# 配置日志记录
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
API_ENDPOINT = "https://api.siliconflow.cn/v1/user/info"
TEST_MODEL_ENDPOINT = "https://api.siliconflow.cn/v1/chat/completions"
MODELS_ENDPOINT = "https://api.siliconflow.cn/v1/models"
app = Flask(__name__)
# 全局变量,用于存储模型列表和免费模型列表
all_models = []
free_models = []
def get_credit_summary(api_key):
"""
使用 API 密钥获取额度信息。
"""
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
try:
response = requests.get(API_ENDPOINT, headers=headers)
response.raise_for_status()
data = response.json().get("data", {})
total_balance = data.get("totalBalance", 0)
return {"total_balance": float(total_balance)}
except requests.exceptions.RequestException as e:
logging.error(f"获取额度信息失败,API Key:{api_key},错误信息:{e}")
return None
except (KeyError, TypeError) as e:
logging.error(f"解析额度信息失败,API Key:{api_key},错误信息:{e}")
return None
except ValueError as e:
logging.error(f"total_balance 无法转换为浮点数,API Key:{api_key},错误信息:{e}")
return None
def test_model_availability(api_key, model_name):
"""
测试指定的模型是否可用。
"""
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
try:
response = requests.post(TEST_MODEL_ENDPOINT,
headers=headers,
json={
"model": model_name,
"messages": [{"role": "user", "content": "hi"}],
"max_tokens": 10,
"stream": False
},
timeout=10)
# 检查是否是429错误
if response.status_code == 429:
return True
response.json() # 尝试解析 JSON 响应
return True
except requests.exceptions.RequestException as e:
logging.error(f"测试模型 {model_name} 可用性失败,API Key:{api_key},错误信息:{e}")
return False
except ValueError:
logging.error(f"测试模型 {model_name} 可用性失败,API Key:{api_key},响应不是有效的 JSON 格式")
return False
def load_keys():
"""
从环境变量中加载 keys,并根据额度和模型可用性进行分类,然后记录到日志中。
"""
keys_str = os.environ.get("KEYS")
test_model = os.environ.get("TEST_MODEL", "Pro/google/gemma-2-9b-it")
invalid_keys = []
free_keys = []
unverified_keys = []
valid_keys = []
if keys_str:
keys = [key.strip() for key in keys_str.split(',')]
logging.info(f"加载的 keys:{keys}")
for key in keys:
credit_summary = get_credit_summary(key)
if credit_summary is None:
invalid_keys.append(key)
else:
total_balance = credit_summary.get("total_balance", 0)
if total_balance <= 0:
free_keys.append(key)
else:
if test_model_availability(key, test_model):
valid_keys.append(key)
else:
unverified_keys.append(key)
logging.info(f"无效 KEY:{invalid_keys}")
logging.info(f"免费 KEY:{free_keys}")
logging.info(f"未实名 KEY:{unverified_keys}")
logging.info(f"有效 KEY:{valid_keys}")
# 更新全局的 key 列表
global invalid_keys_global, free_keys_global, unverified_keys_global, valid_keys_global
invalid_keys_global = invalid_keys
free_keys_global = free_keys
unverified_keys_global = unverified_keys
valid_keys_global = valid_keys
else:
logging.warning("环境变量 KEYS 未设置。")
def get_all_models(api_key):
"""
获取所有模型列表。
"""
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
try:
response = requests.get(MODELS_ENDPOINT, headers=headers, params={"sub_type": "chat"})
response.raise_for_status()
data = response.json()
# 确保 data 是字典且包含 'data' 键,'data' 对应的值是一个列表
if isinstance(data, dict) and 'data' in data and isinstance(data['data'], list):
return [model.get("id") for model in data["data"] if isinstance(model, dict) and "id" in model]
else:
logging.error("获取模型列表失败:响应数据格式不正确")
return []
except requests.exceptions.RequestException as e:
logging.error(f"获取模型列表失败,API Key:{api_key},错误信息:{e}")
return []
except (KeyError, TypeError) as e:
logging.error(f"解析模型列表失败,API Key:{api_key},错误信息:{e}")
return []
def refresh_models():
"""
刷新模型列表和免费模型列表。
"""
global all_models, free_models
# 使用 valid_keys_global 中的第一个 key 获取完整模型列表
if valid_keys_global:
all_models = get_all_models(valid_keys_global[0])
else:
logging.warning("没有有效的key,无法获取完整模型列表。")
all_models = []
# 使用 free_keys_global 中的第一个 key 获取免费模型列表
if free_keys_global:
free_models = get_all_models(free_keys_global[0])
else:
logging.warning("没有免费的key,无法获取免费模型列表。")
free_models = []
logging.info(f"所有模型列表:{all_models}")
logging.info(f"免费模型列表:{free_models}")
def determine_request_type(model_name):
"""
根据用户请求的模型判断请求类型。
"""
if model_name in free_models:
return "free"
elif model_name in all_models:
return "paid"
else:
return "unknown"
def select_key(request_type):
"""
根据请求类型选择合适的 KEY。
"""
if request_type == "free":
# 免费请求:使用 2、3、4 类 KEY
available_keys = free_keys_global + unverified_keys_global + valid_keys_global
elif request_type == "paid":
# 付费请求:使用 3、4 类 KEY
available_keys = unverified_keys_global + valid_keys_global
else:
# 未知请求:使用所有 KEY
available_keys = free_keys_global + unverified_keys_global + valid_keys_global
if not available_keys:
return None
# 简单的轮询策略选择 KEY
key = available_keys[int(time.time() * 1000) % len(available_keys)]
return key
# 创建一个后台调度器
scheduler = BackgroundScheduler()
# 添加定时任务,每小时执行一次 load_keys 函数
scheduler.add_job(load_keys, 'interval', hours=1)
# 添加定时任务,每10分钟执行一次 refresh_models 函数
scheduler.add_job(refresh_models, 'interval', minutes=10)
@app.route('/')
def index():
"""
处理根路由的访问请求。
"""
return "<h1>Welcome to SiliconFlow</h1>"
@app.route('/check_tokens', methods=['POST'])
def check_tokens():
"""
处理前端发送的 Token 检测请求。
"""
tokens = request.json.get('tokens', [])
test_model = os.environ.get("TEST_MODEL", "Pro/google/gemma-2-9b-it")
results = []
for token in tokens:
credit_summary = get_credit_summary(token)
if credit_summary is None:
results.append({"token": token, "type": "无效 KEY", "balance": 0, "message": "无法获取额度信息"})
else:
total_balance = credit_summary.get("total_balance", 0)
if total_balance <= 0:
results.append({"token": token, "type": "免费 KEY", "balance": total_balance, "message": "额度不足"})
else:
if test_model_availability(token, test_model):
results.append({"token": token, "type": "有效 KEY", "balance": total_balance, "message": "可以使用指定模型"})
else:
results.append({"token": token, "type": "未实名 KEY", "balance": total_balance, "message": "无法使用指定模型"})
return jsonify(results)
@app.route('/handsome/v1/chat/completions', methods=['POST'])
def handsome_chat_completions():
"""
处理 /handsome/v1/chat/completions 路由的请求。
"""
data = request.get_json()
if not data or 'model' not in data:
return jsonify({"error": "Invalid request data"}), 400
model_name = data['model']
request_type = determine_request_type(model_name)
api_key = select_key(request_type)
if not api_key:
return jsonify({"error": "No available API key for this request type"}), 400
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
# 转发请求到真正的 API
try:
response = requests.post(
TEST_MODEL_ENDPOINT,
headers=headers,
json=data,
stream=data.get("stream", False),
timeout=60
)
# 检查是否是429错误
if response.status_code == 429:
return jsonify(response.json()), 429
if data.get("stream", False):
return Response(stream_with_context(response.iter_content(chunk_size=1024)), content_type=response.headers['Content-Type'])
else:
response.raise_for_status()
return jsonify(response.json())
except requests.exceptions.RequestException as e:
return jsonify({"error": str(e)}), 500
if __name__ == '__main__':
# 打印所有环境变量,方便调试
logging.info(f"环境变量:{os.environ}")
# 初始化全局的 key 列表
invalid_keys_global = []
free_keys_global = []
unverified_keys_global = []
valid_keys_global = []
# 启动调度器
scheduler.start()
# 手动触发一次 load_keys 任务
load_keys()
logging.info("首次加载 keys 已手动触发执行")
# 手动触发一次 refresh_models 任务
refresh_models()
logging.info("首次刷新模型列表已手动触发执行")
# 启动 Flask 应用,监听所有 IP 的 7860 端口(Hugging Face Space 默认端口)
app.run(debug=False, host='0.0.0.0', port=int(os.environ.get('PORT', 7860)))