import os
import time
import logging
import requests
import json
import concurrent.futures
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask, request, jsonify, Response, stream_with_context
# Configure root logging once at import time: INFO level, timestamped records.
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
# SiliconFlow REST endpoints used by this module:
#   API_ENDPOINT        - account info, read for the key's balance
#   TEST_MODEL_ENDPOINT - chat completions; used both for availability probes
#                         and for the proxied chat requests
#   MODELS_ENDPOINT     - model listing
API_ENDPOINT = "https://api.siliconflow.cn/v1/user/info"
TEST_MODEL_ENDPOINT = "https://api.siliconflow.cn/v1/chat/completions"
MODELS_ENDPOINT = "https://api.siliconflow.cn/v1/models"
app = Flask(__name__)
# Model caches; reassigned by refresh_models().
all_models = []
free_models = []
# Key pools, one list per category; reassigned by load_keys().
invalid_keys_global = []
free_keys_global = []
unverified_keys_global = []
valid_keys_global = []
# Module-level thread pool. The functions visible in this file create their
# own local pools rather than using this one.
executor = concurrent.futures.ThreadPoolExecutor(max_workers=20)
# Round-robin cursor per model name, read and written by select_key().
model_key_indices = {}
def get_credit_summary(api_key):
    """
    Fetch the account balance for an API key.

    Sends a GET to ``API_ENDPOINT`` with the key as a bearer token and reads
    ``data.totalBalance`` from the JSON body.

    Args:
        api_key: API key to query.

    Returns:
        ``{"total_balance": <float>}`` on success, or ``None`` when the
        request fails or the payload cannot be interpreted.
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    try:
        # Without a timeout a stalled upstream would block the calling
        # thread (a pool worker or a request handler) indefinitely.
        response = requests.get(API_ENDPOINT, headers=headers, timeout=10)
        response.raise_for_status()
        data = response.json().get("data", {})
        total_balance = data.get("totalBalance", 0)
        return {"total_balance": float(total_balance)}
    except requests.exceptions.RequestException as e:
        logging.error(f"获取额度信息失败,API Key:{api_key},错误信息:{e}")
        return None
    except (AttributeError, KeyError, TypeError) as e:
        # AttributeError covers a body that is not an object, or a null
        # "data" field, where .get() does not exist.
        logging.error(f"解析额度信息失败,API Key:{api_key},错误信息:{e}")
        return None
    except ValueError as e:
        logging.error(f"total_balance 无法转换为浮点数,API Key:{api_key},错误信息:{e}")
        return None
# SECURITY(review): a credential is committed in source. It is kept as the
# fallback so existing deployments behave the same, but it can now be
# overridden through the FREE_MODEL_TEST_KEY environment variable. The
# embedded key should be rotated and removed from the repository.
FREE_MODEL_TEST_KEY = (
    os.environ.get("FREE_MODEL_TEST_KEY")
    or "sk-bmjbjzleaqfgtqfzmcnsbagxrlohriadnxqrzfocbizaxukw"
)
def test_model_availability(api_key, model_name):
    """
    Probe whether a model responds for the given API key.

    Posts a minimal, non-streaming chat request ("hi", 5 tokens max) to
    ``TEST_MODEL_ENDPOINT`` with a 10 second timeout.

    Args:
        api_key: API key sent as the bearer token.
        model_name: Model identifier placed in the request body.

    Returns:
        True when the reply status is 200 or 429; False for any other
        status or when the request itself raises.
    """
    auth_headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    probe_payload = {
        "model": model_name,
        "messages": [{"role": "user", "content": "hi"}],
        "max_tokens": 5,
        "stream": False
    }
    try:
        reply = requests.post(
            TEST_MODEL_ENDPOINT,
            headers=auth_headers,
            json=probe_payload,
            timeout=10,
        )
    except requests.exceptions.RequestException as e:
        logging.error(f"测试模型 {model_name} 可用性失败,API Key:{api_key},错误信息:{e}")
        return False
    # 429 is counted as available as well as 200.
    return reply.status_code in (200, 429)
def refresh_models():
    """
    Refresh the cached model list and the list of free models.

    Fetches every model id with ``FREE_MODEL_TEST_KEY`` and probes each one
    concurrently with ``test_model_availability`` using that same key; the
    models whose probe succeeds form the free list.

    The module-level ``all_models`` and ``free_models`` are replaced only
    after all probes have finished, so readers never observe a half-filled
    free list. When the fetch yields nothing (``get_all_models`` returns
    ``[]`` on any failure) the previous caches are left untouched.
    """
    global all_models, free_models
    fetched_models = get_all_models(FREE_MODEL_TEST_KEY)
    if not fetched_models:
        logging.warning("获取到的模型列表为空,保留现有模型缓存")
        return
    probed_free = set()
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
        future_to_model = {
            pool.submit(test_model_availability, FREE_MODEL_TEST_KEY, model): model
            for model in fetched_models
        }
        for future in concurrent.futures.as_completed(future_to_model):
            model = future_to_model[future]
            try:
                if future.result():
                    probed_free.add(model)
            except Exception as exc:
                logging.error(f"模型 {model} 测试生成异常: {exc}")
    # Publish both lists together; the free list follows the fetched order
    # instead of the nondeterministic completion order.
    all_models = fetched_models
    free_models = [model for model in fetched_models if model in probed_free]
    logging.info(f"所有模型列表:{all_models}")
    logging.info(f"免费模型列表:{free_models}")
def load_keys():
    """
    Load API keys from the environment and sort them into categories.

    Reads the comma-separated ``KEYS`` variable, classifies every key
    concurrently with ``process_key`` (probing the model named by
    ``TEST_MODEL``), logs the outcome and replaces the four module-level
    key lists. When ``KEYS`` is unset or empty only a warning is logged and
    the existing lists are left as they are.
    """
    global invalid_keys_global, free_keys_global, unverified_keys_global, valid_keys_global
    keys_str = os.environ.get("KEYS")
    test_model = os.environ.get("TEST_MODEL", "Pro/google/gemma-2-9b-it")
    if not keys_str:
        logging.warning("环境变量 KEYS 未设置。")
        return
    # Skip blanks produced by trailing or doubled commas; an empty string
    # would otherwise be probed as if it were a key.
    keys = [key.strip() for key in keys_str.split(',') if key.strip()]
    logging.info(f"加载的 keys:{keys}")
    invalid_keys = []
    free_keys = []
    unverified_keys = []
    valid_keys = []
    buckets = {
        "invalid": invalid_keys,
        "free": free_keys,
        "unverified": unverified_keys,
        "valid": valid_keys,
    }
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
        future_to_key = {pool.submit(process_key, key, test_model): key for key in keys}
        for future in concurrent.futures.as_completed(future_to_key):
            key = future_to_key[future]
            try:
                key_type = future.result()
            except Exception as exc:
                # A key whose classification raised ends up in no bucket.
                logging.error(f"处理 KEY {key} 生成异常: {exc}")
                continue
            if key_type in buckets:
                buckets[key_type].append(key)
    logging.info(f"无效 KEY:{invalid_keys}")
    logging.info(f"免费 KEY:{free_keys}")
    logging.info(f"未实名 KEY:{unverified_keys}")
    logging.info(f"有效 KEY:{valid_keys}")
    invalid_keys_global = invalid_keys
    free_keys_global = free_keys
    unverified_keys_global = unverified_keys
    valid_keys_global = valid_keys
def process_key(key, test_model):
    """
    Classify a single API key.

    Args:
        key: API key to classify.
        test_model: Model used for the availability probe.

    Returns:
        "invalid" when the balance cannot be fetched, "free" when the
        balance is zero or negative, "valid" when the balance is positive
        and the probe of ``test_model`` succeeds, "unverified" otherwise.
    """
    summary = get_credit_summary(key)
    if summary is None:
        return "invalid"
    if summary.get("total_balance", 0) <= 0:
        return "free"
    # Only keys with a positive balance are probed against the test model.
    return "valid" if test_model_availability(key, test_model) else "unverified"
def get_all_models(api_key):
    """
    Fetch the ids of all chat models.

    Sends a GET to ``MODELS_ENDPOINT`` with ``sub_type=chat`` and collects
    the ``id`` of every dict entry in the response's ``data`` list.

    Args:
        api_key: API key sent as the bearer token.

    Returns:
        A list of model ids, or an empty list when the request fails or the
        response does not have the expected shape.
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    try:
        # Without a timeout a stalled upstream would block the caller
        # indefinitely.
        response = requests.get(
            MODELS_ENDPOINT,
            headers=headers,
            params={"sub_type": "chat"},
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
        if isinstance(data, dict) and 'data' in data and isinstance(data['data'], list):
            return [model.get("id") for model in data["data"] if isinstance(model, dict) and "id" in model]
        else:
            logging.error("获取模型列表失败:响应数据格式不正确")
            return []
    except requests.exceptions.RequestException as e:
        logging.error(f"获取模型列表失败,API Key:{api_key},错误信息:{e}")
        return []
    except (KeyError, TypeError, ValueError) as e:
        # ValueError covers a body that is not valid JSON on requests
        # versions whose decode error is not a RequestException.
        logging.error(f"解析模型列表失败,API Key:{api_key},错误信息:{e}")
        return []
def determine_request_type(model_name):
    """
    Classify a request by the model it asks for.

    Args:
        model_name: Model identifier from the client request.

    Returns:
        "free" when the model is in ``free_models``, "paid" when it is only
        in ``all_models``, "unknown" when it is in neither.
    """
    # The free list is checked first, so a model in both lists is "free".
    if model_name in free_models:
        return "free"
    return "paid" if model_name in all_models else "unknown"
def select_key(request_type, model_name):
    """
    Pick an API key for a request, rotating through the candidates.

    Candidates for "paid" requests are the unverified and valid keys; every
    other request type also includes the free keys. Starting from the cursor
    stored for ``model_name`` in ``model_key_indices``, each candidate is
    checked once with ``key_is_valid``; the first one accepted is returned
    and the cursor is advanced past it.

    Args:
        request_type: Result of ``determine_request_type``.
        model_name: Model identifier, used as the cursor key.

    Returns:
        The selected key, or ``None`` when there are no candidates or none
        is accepted (the cursor is reset to 0 in the latter case).
    """
    # NOTE(review): key_is_valid() returns False for any request type other
    # than "free", "paid" or "unverified", so an "unknown" request cannot
    # currently obtain a key here - confirm this is intended.
    if request_type == "paid":
        candidates = unverified_keys_global + valid_keys_global
    else:
        candidates = free_keys_global + unverified_keys_global + valid_keys_global
    if not candidates:
        return None
    pool_size = len(candidates)
    cursor = model_key_indices.get(model_name, 0)
    for _ in range(pool_size):
        key = candidates[cursor % pool_size]
        cursor += 1
        if key_is_valid(key, request_type):
            model_key_indices[model_name] = cursor
            return key
        logging.warning(f"KEY {key} 无效或达到限制,尝试下一个 KEY")
    model_key_indices[model_name] = 0
    return None
def key_is_valid(key, request_type):
    """
    Check whether a key may serve a request of the given type.

    Args:
        key: API key to check.
        request_type: "free", "paid" or "unverified"; any other value is
            rejected.

    Returns:
        False for unsupported request types or when the balance cannot be
        fetched; True for "free" requests once the balance lookup succeeds;
        for "paid" and "unverified" requests, whether the balance is
        positive.
    """
    # Reject unsupported types before the balance lookup: the answer is
    # False either way, so the network round-trip would be wasted.
    if request_type not in ("free", "paid", "unverified"):
        return False
    credit_summary = get_credit_summary(key)
    if credit_summary is None:
        return False
    if request_type == "free":
        return True
    return credit_summary.get("total_balance", 0) > 0
def check_authorization(request):
    """
    Check the request's Authorization header against AUTHORIZATION_KEY.

    Args:
        request: Object exposing a ``headers`` mapping with a ``get`` method
            (the Flask request in this file).

    Returns:
        True only when the header equals ``"Bearer <AUTHORIZATION_KEY>"``.
        False when the environment variable is unset, the header is missing
        or the value does not match.
    """
    import hmac

    authorization_key = os.environ.get("AUTHORIZATION_KEY")
    if not authorization_key:
        logging.warning("环境变量 AUTHORIZATION_KEY 未设置,请设置后重试。")
        return False
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        logging.warning("请求头中缺少 Authorization 字段。")
        return False
    expected_header = f"Bearer {authorization_key}"
    # Constant-time comparison avoids leaking how much of the secret
    # matched. Both sides are encoded because compare_digest rejects
    # non-ASCII str arguments.
    if not hmac.compare_digest(auth_header.encode("utf-8"), expected_header.encode("utf-8")):
        # The supplied value is deliberately not logged: a near-miss of the
        # real secret would otherwise end up in the log.
        logging.warning("无效的 Authorization 密钥。")
        return False
    return True
# Background jobs: re-classify the keys every hour and refresh the model
# lists every ten minutes. The scheduler is only created here; it is started
# from the __main__ block.
scheduler = BackgroundScheduler()
scheduler.add_job(load_keys, trigger='interval', hours=1)
scheduler.add_job(refresh_models, trigger='interval', minutes=10)
@app.route('/')
def index():
    """Serve the landing text for the root URL."""
    # NOTE(review): the original literal was split across three lines, which
    # is not a valid single-quoted string. The visible characters are kept
    # exactly; any markup that may once have wrapped the text cannot be
    # recovered from this file - confirm against the upstream source.
    return "\nWelcome to SiliconFlow\n"
@app.route('/check_tokens', methods=['POST'])
def check_tokens():
    """
    Classify a batch of tokens posted as JSON and report their balances.

    Expects a JSON object with a ``tokens`` list. Each token is classified
    with ``process_key`` (probing the model named by ``TEST_MODEL``) and its
    balance is read with ``get_credit_summary``; both steps run in a worker
    thread per token.

    Returns:
        A JSON list with one ``{"token", "type", "balance", "message"}``
        entry per token that could be processed, in completion order, or a
        400 response when the body is not a JSON object or ``tokens`` is
        not a list.
    """
    payload = request.get_json(silent=True)
    # A body that is not a JSON object has no .get(); answer 400 instead of
    # failing with an unhandled AttributeError.
    if not isinstance(payload, dict):
        return jsonify({"error": "Invalid request data"}), 400
    tokens = payload.get('tokens', [])
    if not isinstance(tokens, list):
        return jsonify({"error": "Invalid request data"}), 400
    test_model = os.environ.get("TEST_MODEL", "Pro/google/gemma-2-9b-it")

    def inspect_token(token):
        # Runs in the pool so the balance lookup is parallel too, instead of
        # happening one token at a time on the request thread.
        key_type = process_key(token, test_model)
        credit_summary = get_credit_summary(token)
        balance = credit_summary.get("total_balance", 0) if credit_summary else 0
        return key_type, balance

    # key type -> (label, message) placed in the response entry
    descriptions = {
        "invalid": ("无效 KEY", "无法获取额度信息"),
        "free": ("免费 KEY", "额度不足"),
        "unverified": ("未实名 KEY", "无法使用指定模型"),
        "valid": ("有效 KEY", "可以使用指定模型"),
    }
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
        future_to_token = {pool.submit(inspect_token, token): token for token in tokens}
        for future in concurrent.futures.as_completed(future_to_token):
            token = future_to_token[future]
            try:
                key_type, balance = future.result()
            except Exception as exc:
                logging.error(f"处理 Token {token} 生成异常: {exc}")
                continue
            if key_type in descriptions:
                type_label, message = descriptions[key_type]
                results.append({"token": token, "type": type_label, "balance": balance, "message": message})
    return jsonify(results)
def _collect_user_text(data):
    """Join the plain-text content of every user message in the request body."""
    messages = data.get("messages", [])
    if not isinstance(messages, list):
        return ""
    # Malformed entries and non-string content are skipped.
    parts = [
        message["content"]
        for message in messages
        if isinstance(message, dict)
        and message.get("role") == "user"
        and isinstance(message.get("content"), str)
    ]
    return " ".join(parts).strip()


def _escape_newlines(text):
    """Replace line breaks with a literal backslash-n so a log record stays on one line."""
    return text.replace('\n', '\\n').replace('\r', '\\n')


def _parse_stream_usage(stream_text):
    """Extract (prompt_tokens, completion_tokens, content) from streamed "data:" lines."""
    prompt_tokens = 0
    completion_tokens = 0
    content_parts = []
    for line in stream_text.splitlines():
        # Only "data:" lines carry JSON; blank separator lines are skipped.
        if not line.startswith("data:"):
            continue
        line = line[5:].strip()
        if line == "[DONE]":
            continue
        try:
            response_json = json.loads(line)
        except ValueError as e:
            logging.error(f"解析流式响应单行 JSON 失败: {e}, 行内容: {line}")
            continue
        if not isinstance(response_json, dict):
            continue
        # "usage" and "content" may be null; the type checks keep a null from
        # raising while the log record is being assembled.
        usage = response_json.get("usage")
        if isinstance(usage, dict):
            if "completion_tokens" in usage:
                completion_tokens = usage["completion_tokens"]
            if "prompt_tokens" in usage:
                prompt_tokens = usage["prompt_tokens"]
        choices = response_json.get("choices")
        if isinstance(choices, list) and choices and isinstance(choices[0], dict):
            delta = choices[0].get("delta")
            if isinstance(delta, dict) and isinstance(delta.get("content"), str):
                content_parts.append(delta["content"])
    return prompt_tokens, completion_tokens, "".join(content_parts)


@app.route('/handsome/v1/chat/completions', methods=['POST'])
def handsome_chat_completions():
    """
    Proxy a chat completion request to the upstream endpoint.

    After the authorization check, a key is chosen for the requested model
    with ``select_key`` and the JSON body is forwarded unchanged to
    ``TEST_MODEL_ENDPOINT``. Streaming requests are relayed chunk by chunk;
    other requests return the upstream JSON. In both modes a usage record
    (key, token counts, timings, model, user text, reply text) is logged
    once the reply is complete.

    Returns:
        401 when unauthorized, 400 for a body without ``model``, 429 when no
        key is available or the upstream answers 429, 500 when the upstream
        request raises, otherwise the relayed upstream response.
    """
    if not check_authorization(request):
        return jsonify({"error": "Unauthorized"}), 401
    data = request.get_json()
    # A JSON array or scalar has no .get(); treat it as invalid input.
    if not isinstance(data, dict) or 'model' not in data:
        return jsonify({"error": "Invalid request data"}), 400
    model_name = data['model']
    request_type = determine_request_type(model_name)
    api_key = select_key(request_type, model_name)
    if not api_key:
        return jsonify({"error": "No available API key for this request type or all keys have reached their limits"}), 429
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    wants_stream = bool(data.get("stream", False))
    try:
        start_time = time.time()
        response = requests.post(
            TEST_MODEL_ENDPOINT,
            headers=headers,
            json=data,
            stream=wants_stream,
            timeout=60
        )
        if response.status_code == 429:
            return jsonify(response.json()), 429
        if wants_stream:
            def generate():
                first_chunk_time = None
                raw_chunks = []
                try:
                    for chunk in response.iter_content(chunk_size=1024):
                        if chunk:
                            if first_chunk_time is None:
                                first_chunk_time = time.time()
                            raw_chunks.append(chunk)
                            yield chunk
                finally:
                    # Release the upstream connection even when the client
                    # disconnects in the middle of the stream.
                    response.close()
                end_time = time.time()
                first_token_time = first_chunk_time - start_time if first_chunk_time else 0
                total_time = end_time - start_time
                # Decode once over the whole body: a 1024-byte chunk may end
                # in the middle of a multi-byte UTF-8 character, so decoding
                # chunk by chunk can raise and abort the stream.
                full_response_content = b"".join(raw_chunks).decode("utf-8", errors="replace")
                prompt_tokens, completion_tokens, response_content = _parse_stream_usage(full_response_content)
                user_content_replaced = _escape_newlines(_collect_user_text(data))
                response_content_replaced = _escape_newlines(response_content)
                logging.info(
                    f"使用的key: {api_key}, 提示token: {prompt_tokens}, 输出token: {completion_tokens}, 首字用时: {first_token_time:.4f}秒, 总共用时: {total_time:.4f}秒, 使用的模型: {model_name}, 用户的内容: {user_content_replaced}, 输出的内容: {response_content_replaced}"
                )
            # Pass the upstream status through so an upstream error is not
            # reported to the client as 200.
            return Response(
                stream_with_context(generate()),
                status=response.status_code,
                content_type=response.headers.get('Content-Type', 'text/event-stream'),
            )
        else:
            response.raise_for_status()
            end_time = time.time()
            response_json = response.json()
            total_time = end_time - start_time
            try:
                prompt_tokens = response_json["usage"]["prompt_tokens"]
                completion_tokens = response_json["usage"]["completion_tokens"]
                response_content = response_json["choices"][0]["message"]["content"]
            except (KeyError, ValueError, IndexError, TypeError) as e:
                # TypeError covers null or non-object fields along the path.
                logging.error(f"解析非流式响应 JSON 失败: {e}, 完整内容: {response_json}")
                prompt_tokens = 0
                completion_tokens = 0
                response_content = ""
            if not isinstance(response_content, str):
                # A null content must not break the log formatting below.
                response_content = ""
            user_content_replaced = _escape_newlines(_collect_user_text(data))
            response_content_replaced = _escape_newlines(response_content)
            logging.info(
                f"使用的key: {api_key}, 提示token: {prompt_tokens}, 输出token: {completion_tokens}, 首字用时: 0, 总共用时: {total_time:.4f}秒, 使用的模型: {model_name}, 用户的内容: {user_content_replaced}, 输出的内容: {response_content_replaced}"
            )
            return jsonify(response_json)
    except requests.exceptions.RequestException as e:
        return jsonify({"error": str(e)}), 500
@app.route('/handsome/v1/models', methods=['GET'])
def list_models():
    """
    List the cached models.

    Returns:
        401 when unauthorized; otherwise a JSON object whose ``data`` holds
        one ``{"id", "object": "model"}`` entry per item of ``all_models``
        and whose ``free_models`` holds the cached free list.
    """
    authorized = check_authorization(request)
    if not authorized:
        return jsonify({"error": "Unauthorized"}), 401
    model_entries = []
    for model in all_models:
        model_entries.append({"id": model, "object": "model"})
    return jsonify({
        "data": model_entries,
        "free_models": free_models
    })
def get_billing_info():
    """
    Sum the balances of all valid and unverified keys.

    Balances are fetched concurrently with ``get_credit_summary``; keys
    whose lookup returns nothing or raises contribute nothing.

    Returns:
        The accumulated balance (0 when there are no such keys).
    """
    billable_keys = valid_keys_global + unverified_keys_global
    total_balance = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
        pending = [pool.submit(get_credit_summary, key) for key in billable_keys]
        for finished in concurrent.futures.as_completed(pending):
            try:
                summary = finished.result()
                if summary:
                    total_balance += summary.get("total_balance", 0)
            except Exception as exc:
                logging.error(f"获取额度信息生成异常: {exc}")
    return total_balance
@app.route('/handsome/v1/dashboard/billing/usage', methods=['GET'])
def billing_usage():
    """
    Return a zero-usage report for the last 30 days.

    Returns:
        401 when unauthorized; otherwise a JSON object with one entry per
        day from 30 days ago up to now (31 entries), each with usage 0, and
        a total usage of 0.
    """
    if not check_authorization(request):
        return jsonify({"error": "Unauthorized"}), 401
    end_date = datetime.now()
    start_date = end_date - timedelta(days=30)
    # Offsets 0..30 cover both the start day and the end day.
    daily_usage = [
        {
            "timestamp": int((start_date + timedelta(days=day_offset)).timestamp()),
            "daily_usage": 0
        }
        for day_offset in range(31)
    ]
    report = {
        "object": "list",
        "data": daily_usage,
        "total_usage": 0
    }
    return jsonify(report)
@app.route('/handsome/v1/dashboard/billing/subscription', methods=['GET'])
def billing_subscription():
    """
    Return a subscription summary built from the pooled balance.

    Returns:
        401 when unauthorized; otherwise a JSON object whose hard limits
        are the total returned by ``get_billing_info`` and whose remaining
        fields are fixed placeholder values.
    """
    if not check_authorization(request):
        return jsonify({"error": "Unauthorized"}), 401
    total_balance = get_billing_info()
    # Access is reported as lasting until the end of year 9999.
    access_until = int(datetime(9999, 12, 31).timestamp())
    plan = {
        "name": "SiliconFlow API",
        "id": "siliconflow-api"
    }
    subscription = {
        "object": "billing_subscription",
        "has_payment_method": False,
        "canceled": False,
        "canceled_at": None,
        "delinquent": None,
        "access_until": access_until,
        "soft_limit": 0,
        "hard_limit": total_balance,
        "system_hard_limit": total_balance,
        "soft_limit_usd": 0,
        "hard_limit_usd": total_balance,
        "system_hard_limit_usd": total_balance,
        "plan": plan,
        "account_name": "SiliconFlow User",
        "po_number": None,
        "billing_email": None,
        "tax_ids": [],
        "billing_address": None,
        "business_address": None
    }
    return jsonify(subscription)
if __name__ == '__main__':
    # Script entry point: start the background jobs, run both refreshes once
    # so the caches are filled before serving, then start the Flask server on
    # PORT (default 7860).
    # Only the variable names are logged. The values include KEYS and
    # AUTHORIZATION_KEY, which must not be written to the log.
    logging.info(f"环境变量:{sorted(os.environ)}")
    scheduler.start()
    load_keys()
    logging.info("首次加载 keys 已手动触发执行")
    refresh_models()
    logging.info("首次刷新模型列表已手动触发执行")
    app.run(debug=False, host='0.0.0.0', port=int(os.environ.get('PORT', 7860)))