# SiliconFlow key-pool proxy service (Flask).
# (Removed scraped "Spaces: Running" page residue from the original capture.)
import os | |
import time | |
import logging | |
import requests | |
import json | |
import random | |
import uuid | |
import concurrent.futures | |
from datetime import datetime, timedelta | |
from apscheduler.schedulers.background import BackgroundScheduler | |
from flask import Flask, request, jsonify, Response, stream_with_context | |
# Root-logger configuration: INFO-level, timestamped, level-tagged messages.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
# SiliconFlow upstream endpoints.
API_ENDPOINT = "https://api.siliconflow.cn/v1/user/info"
TEST_MODEL_ENDPOINT = "https://api.siliconflow.cn/v1/chat/completions"
MODELS_ENDPOINT = "https://api.siliconflow.cn/v1/models"
EMBEDDINGS_ENDPOINT = "https://api.siliconflow.cn/v1/embeddings"

app = Flask(__name__)

# Model caches, repopulated periodically by refresh_models().
all_models = []
free_models = []
embedding_models = []
free_embedding_models = []

# Key pools, repopulated by load_keys(); classified by balance / model access.
invalid_keys_global = []
free_keys_global = []
unverified_keys_global = []
valid_keys_global = []

# NOTE(review): this module-level executor appears unused — every function
# below creates its own local pool (and some shadow this name) — confirm.
executor = concurrent.futures.ThreadPoolExecutor(max_workers=20)

# Per-model round-robin cursor used by select_key().
model_key_indices = {}
def get_credit_summary(api_key):
    """
    Fetch the account balance for ``api_key`` from the SiliconFlow user-info API.

    Returns ``{"total_balance": float}`` on success, or ``None`` on any
    network, parsing, or conversion failure (errors are logged, never raised).
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    try:
        # timeout added: the original call had none and could hang a worker
        # thread forever if the upstream stalled.
        response = requests.get(API_ENDPOINT, headers=headers, timeout=10)
        response.raise_for_status()
        data = response.json().get("data", {})
        total_balance = data.get("totalBalance", 0)
        # float() may raise ValueError for non-numeric balances; handled below.
        return {"total_balance": float(total_balance)}
    except requests.exceptions.RequestException as e:
        logging.error(f"获取额度信息失败,API Key:{api_key},错误信息:{e}")
        return None
    except (KeyError, TypeError) as e:
        logging.error(f"解析额度信息失败,API Key:{api_key},错误信息:{e}")
        return None
    except ValueError as e:
        logging.error(f"total_balance 无法转换为浮点数,API Key:{api_key},错误信息:{e}")
        return None
# SECURITY: a live-looking API key was hard-coded here — rotate it and set
# the FREE_MODEL_TEST_KEY environment variable instead. The literal is kept
# only as a backward-compatible fallback.
FREE_MODEL_TEST_KEY = os.environ.get(
    "FREE_MODEL_TEST_KEY",
    "sk-bmjbjzleaqfgtqfzmcnsbagxrlohriadnxqrzfocbizaxukw")
def test_model_availability(api_key, model_name):
    """Return True if the chat endpoint accepts ``model_name`` under ``api_key``.

    A 429 (rate limit) also counts as available — the key can reach the model,
    it is merely throttled right now.
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": model_name,
        "messages": [{"role": "user", "content": "hi"}],
        "max_tokens": 5,
        "stream": False
    }
    try:
        resp = requests.post(TEST_MODEL_ENDPOINT,
                             headers=headers,
                             json=payload,
                             timeout=10)
    except requests.exceptions.RequestException as e:
        logging.error(f"测试模型 {model_name} 可用性失败,API Key:{api_key},错误信息:{e}")
        return False
    return resp.status_code in (200, 429)
def _probe_free_models(probe, models):
    """Concurrently run ``probe(FREE_MODEL_TEST_KEY, model)`` over ``models``;
    return the models whose probe reported available ("free")."""
    free = []
    # Local pool: the original shadowed the module-level ``executor`` here.
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
        future_to_model = {pool.submit(probe, FREE_MODEL_TEST_KEY, model): model
                           for model in models}
        for future in concurrent.futures.as_completed(future_to_model):
            model = future_to_model[future]
            try:
                if future.result():
                    free.append(model)
            except Exception as exc:
                logging.error(f"模型 {model} 测试生成异常: {exc}")
    return free


def refresh_models():
    """
    Refresh the cached chat/embedding model lists and re-detect which of them
    are usable by a zero-balance ("free") key. Rebinds the four module-level
    model caches; runs on a scheduler and once at startup.
    """
    global all_models, free_models, embedding_models, free_embedding_models
    all_models = get_all_models(FREE_MODEL_TEST_KEY, "chat")
    embedding_models = get_all_models(FREE_MODEL_TEST_KEY, "embedding")
    # The two probe loops were identical except for the probe function;
    # factored into _probe_free_models.
    free_models = _probe_free_models(test_model_availability, all_models)
    free_embedding_models = _probe_free_models(
        test_embedding_model_availability, embedding_models)
    logging.info(f"所有文本模型列表:{all_models}")
    logging.info(f"免费文本模型列表:{free_models}")
    logging.info(f"所有向量模型列表:{embedding_models}")
    logging.info(f"免费向量模型列表:{free_embedding_models}")
def test_embedding_model_availability(api_key, model_name):
    """Return True if the embeddings endpoint accepts ``model_name``.

    429 counts as available (key is valid, just rate-limited).
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": model_name,
        "input": ["hi"],
    }
    try:
        resp = requests.post(EMBEDDINGS_ENDPOINT,
                             headers=headers,
                             json=payload,
                             timeout=10)
    except requests.exceptions.RequestException as e:
        logging.error(f"测试向量模型 {model_name} 可用性失败,API Key:{api_key},错误信息:{e}")
        return False
    return resp.status_code in (200, 429)
def load_keys():
    """
    Load API keys from the KEYS environment variable (comma-separated),
    classify each one concurrently via process_key(), and publish the
    resulting pools to the module-level *_keys_global lists.
    """
    keys_str = os.environ.get("KEYS")
    test_model = os.environ.get("TEST_MODEL", "Pro/google/gemma-2-9b-it")
    if not keys_str:
        logging.warning("环境变量 KEYS 未设置。")
        return
    keys = [key.strip() for key in keys_str.split(',')]
    logging.info(f"加载的 keys:{keys}")
    # One bucket per classification returned by process_key().
    buckets = {"invalid": [], "free": [], "unverified": [], "valid": []}
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
        future_to_key = {pool.submit(process_key, key, test_model): key
                         for key in keys}
        for future in concurrent.futures.as_completed(future_to_key):
            key = future_to_key[future]
            try:
                bucket = buckets.get(future.result())
                if bucket is not None:
                    bucket.append(key)
            except Exception as exc:
                logging.error(f"处理 KEY {key} 生成异常: {exc}")
    logging.info(f"无效 KEY:{buckets['invalid']}")
    logging.info(f"免费 KEY:{buckets['free']}")
    logging.info(f"未实名 KEY:{buckets['unverified']}")
    logging.info(f"有效 KEY:{buckets['valid']}")
    global invalid_keys_global, free_keys_global, unverified_keys_global, valid_keys_global
    invalid_keys_global = buckets["invalid"]
    free_keys_global = buckets["free"]
    unverified_keys_global = buckets["unverified"]
    valid_keys_global = buckets["valid"]
def process_key(key, test_model):
    """Classify a single key.

    Returns one of:
      "invalid"    - balance lookup failed entirely;
      "free"       - balance is zero or negative;
      "valid"      - has balance and can call ``test_model``;
      "unverified" - has balance but cannot call ``test_model``.
    """
    summary = get_credit_summary(key)
    if summary is None:
        return "invalid"
    if summary.get("total_balance", 0) <= 0:
        return "free"
    return "valid" if test_model_availability(key, test_model) else "unverified"
def get_all_models(api_key, sub_type):
    """
    Fetch the model list of the given ``sub_type`` ("chat" / "embedding").

    Returns a list of model id strings; returns [] on any network or
    parsing failure (errors are logged, never raised).
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    try:
        # timeout added: the original call had none and could block forever.
        response = requests.get(MODELS_ENDPOINT, headers=headers,
                                params={"sub_type": sub_type}, timeout=10)
        response.raise_for_status()
        data = response.json()
        # Defensive shape check: only accept {"data": [ {"id": ...}, ... ]}.
        if isinstance(data, dict) and 'data' in data and isinstance(data['data'], list):
            return [model.get("id") for model in data["data"] if
                    isinstance(model, dict) and "id" in model]
        else:
            logging.error("获取模型列表失败:响应数据格式不正确")
            return []
    except requests.exceptions.RequestException as e:
        logging.error(f"获取模型列表失败,API Key:{api_key},错误信息:{e}")
        return []
    except (KeyError, TypeError) as e:
        logging.error(f"解析模型列表失败,API Key:{api_key},错误信息:{e}")
        return []
def determine_request_type(model_name, model_list, free_model_list):
    """Classify a requested model as "free", "paid", or "unknown".

    Free wins over paid when the model appears in both lists.
    """
    if model_name in free_model_list:
        return "free"
    return "paid" if model_name in model_list else "unknown"
def select_key(request_type, model_name):
    """
    Pick an API key for the request via per-model round-robin.

    Paid requests may only use unverified/valid keys; free and unknown
    requests may use any non-invalid pool. Each candidate is re-validated
    with key_is_valid(); returns None when no usable key remains.
    """
    if request_type == "paid":
        available_keys = unverified_keys_global + valid_keys_global
    else:
        # "free" and unknown request types share the widest pool.
        available_keys = free_keys_global + unverified_keys_global + valid_keys_global
    if not available_keys:
        return None
    start = model_key_indices.get(model_name, 0)
    total = len(available_keys)
    for offset in range(total):
        cursor = start + offset
        key = available_keys[cursor % total]
        if key_is_valid(key, request_type):
            # Persist the advanced cursor so the next call continues rotation.
            model_key_indices[model_name] = cursor + 1
            return key
        logging.warning(f"KEY {key} 无效或达到限制,尝试下一个 KEY")
    # Every key failed validation: reset the cursor for this model.
    model_key_indices[model_name] = 0
    return None
def key_is_valid(key, request_type):
    """
    Re-check a key at selection time.

    Free requests only need the balance lookup to succeed; paid/unverified
    requests additionally require a positive balance.
    """
    if request_type == "invalid":
        return False
    summary = get_credit_summary(key)
    if summary is None:
        return False
    if request_type == "free":
        return True
    if request_type in ("paid", "unverified"):
        return summary.get("total_balance", 0) > 0
    return False
def check_authorization(request):
    """
    Validate the request's Authorization header against the
    AUTHORIZATION_KEY environment variable ("Bearer <key>" form).
    Returns True only on an exact match; all failures are logged.
    """
    expected = os.environ.get("AUTHORIZATION_KEY")
    if not expected:
        logging.warning("环境变量 AUTHORIZATION_KEY 未设置,请设置后重试。")
        return False
    supplied = request.headers.get('Authorization')
    if not supplied:
        logging.warning("请求头中缺少 Authorization 字段。")
        return False
    if supplied != f"Bearer {expected}":
        logging.warning(f"无效的 Authorization 密钥:{supplied}")
        return False
    return True
# Background maintenance: re-classify keys hourly and refresh the model
# caches every 10 minutes. The scheduler is started in the __main__ guard.
scheduler = BackgroundScheduler()
scheduler.add_job(load_keys, 'interval', hours=1)
scheduler.add_job(refresh_models, 'interval', minutes=10)
def index():
    """Plain HTML landing page.

    NOTE(review): no @app.route decorator is visible in this chunk —
    presumably lost in extraction; confirm the route registration.
    """
    welcome_html = "<h1>Welcome to SiliconFlow</h1>"
    return welcome_html
def check_tokens():
    """
    Classify a JSON list of tokens ({"tokens": [...]}) concurrently and
    return per-token type/balance/message records as JSON.

    NOTE(review): no @app.route decorator is visible in this chunk —
    presumably lost in extraction; confirm the route registration.
    """
    tokens = request.json.get('tokens', [])
    test_model = os.environ.get("TEST_MODEL", "Pro/google/gemma-2-9b-it")
    # Display label and message for each process_key() classification.
    labels = {
        "invalid": ("无效 KEY", "无法获取额度信息"),
        "free": ("免费 KEY", "额度不足"),
        "unverified": ("未实名 KEY", "无法使用指定模型"),
        "valid": ("有效 KEY", "可以使用指定模型"),
    }
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
        future_to_token = {pool.submit(process_key, token, test_model): token
                           for token in tokens}
        for future in concurrent.futures.as_completed(future_to_token):
            token = future_to_token[future]
            try:
                key_type = future.result()
                summary = get_credit_summary(token)
                balance = summary.get("total_balance", 0) if summary else 0
                if key_type in labels:
                    type_label, message = labels[key_type]
                    results.append({"token": token, "type": type_label,
                                    "balance": balance, "message": message})
            except Exception as exc:
                logging.error(f"处理 Token {token} 生成异常: {exc}")
    return jsonify(results)
def _extract_user_content(messages):
    """Concatenate all user-role message text (plain strings and "text" parts)."""
    user_content = ""
    for message in messages:
        if message["role"] == "user":
            if isinstance(message["content"], str):
                user_content += message["content"] + " "
            elif isinstance(message["content"], list):
                for item in message["content"]:
                    if isinstance(item, dict) and item.get("type") == "text":
                        user_content += item.get("text", "") + " "
    return user_content.strip()


def _parse_stream_stats(full_response_content):
    """Parse accumulated SSE text; return (prompt_tokens, completion_tokens, content)."""
    prompt_tokens = 0
    completion_tokens = 0
    response_content = ""
    for line in full_response_content.splitlines():
        if line.startswith("data:"):
            line = line[5:].strip()
            if line == "[DONE]":
                continue
            try:
                response_json = json.loads(line)
                if "usage" in response_json and "completion_tokens" in response_json["usage"]:
                    completion_tokens = response_json["usage"]["completion_tokens"]
                if "choices" in response_json and len(response_json["choices"]) > 0 and \
                        "delta" in response_json["choices"][0] and \
                        "content" in response_json["choices"][0]["delta"]:
                    response_content += response_json["choices"][0]["delta"]["content"]
                if "usage" in response_json and "prompt_tokens" in response_json["usage"]:
                    prompt_tokens = response_json["usage"]["prompt_tokens"]
            except (KeyError, ValueError, IndexError) as e:
                logging.error(f"解析流式响应单行 JSON 失败: {e}, 行内容: {line}")
    return prompt_tokens, completion_tokens, response_content


def handsome_chat_completions():
    """
    Proxy a chat-completions request to SiliconFlow with a pooled key.

    Streams the upstream response through unchanged when "stream" is set,
    then logs token usage and timing; otherwise forwards the JSON body.
    NOTE(review): no @app.route decorator is visible in this chunk —
    presumably lost in extraction; confirm the route registration.
    """
    if not check_authorization(request):
        return jsonify({"error": "Unauthorized"}), 401
    data = request.get_json()
    if not data or 'model' not in data:
        return jsonify({"error": "Invalid request data"}), 400
    model_name = data['model']
    request_type = determine_request_type(model_name, all_models, free_models)
    api_key = select_key(request_type, model_name)
    if not api_key:
        return jsonify(
            {"error": "No available API key for this request type or all keys have reached their limits"}), 429
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    try:
        start_time = time.time()
        response = requests.post(
            TEST_MODEL_ENDPOINT,
            headers=headers,
            json=data,
            stream=data.get("stream", False),
            timeout=60
        )
        if response.status_code == 429:
            return jsonify(response.json()), 429
        if data.get("stream", False):
            def generate():
                first_chunk_time = None
                raw_chunks = []
                for chunk in response.iter_content(chunk_size=1024):
                    if chunk:
                        if first_chunk_time is None:
                            first_chunk_time = time.time()
                        raw_chunks.append(chunk)
                        yield chunk
                end_time = time.time()
                # Fix: decode once after the stream ends. The original decoded
                # each chunk separately, which raises UnicodeDecodeError when a
                # multi-byte UTF-8 sequence is split across chunk boundaries.
                full_response_content = b"".join(raw_chunks).decode(
                    "utf-8", errors="replace")
                first_token_time = first_chunk_time - start_time if first_chunk_time else 0
                total_time = end_time - start_time
                prompt_tokens, completion_tokens, response_content = \
                    _parse_stream_stats(full_response_content)
                user_content = _extract_user_content(data.get("messages", []))
                user_content_replaced = user_content.replace('\n', '\\n').replace('\r', '\\n')
                response_content_replaced = response_content.replace('\n', '\\n').replace('\r', '\\n')
                logging.info(
                    f"使用的key: {api_key}, 提示token: {prompt_tokens}, 输出token: {completion_tokens}, 首字用时: {first_token_time:.4f}秒, 总共用时: {total_time:.4f}秒, 使用的模型: {model_name}, 用户的内容: {user_content_replaced}, 输出的内容: {response_content_replaced}"
                )
            return Response(stream_with_context(generate()), content_type=response.headers['Content-Type'])
        else:
            response.raise_for_status()
            end_time = time.time()
            response_json = response.json()
            total_time = end_time - start_time
            try:
                prompt_tokens = response_json["usage"]["prompt_tokens"]
                completion_tokens = response_json["usage"]["completion_tokens"]
                response_content = response_json["choices"][0]["message"]["content"]
            except (KeyError, ValueError, IndexError) as e:
                logging.error(f"解析非流式响应 JSON 失败: {e}, 完整内容: {response_json}")
                prompt_tokens = 0
                completion_tokens = 0
                response_content = ""
            user_content = _extract_user_content(data.get("messages", []))
            user_content_replaced = user_content.replace('\n', '\\n').replace('\r', '\\n')
            response_content_replaced = response_content.replace('\n', '\\n').replace('\r', '\\n')
            logging.info(
                f"使用的key: {api_key}, 提示token: {prompt_tokens}, 输出token: {completion_tokens}, 首字用时: 0, 总共用时: {total_time:.4f}秒, 使用的模型: {model_name}, 用户的内容: {user_content_replaced}, 输出的内容: {response_content_replaced}"
            )
            return jsonify(response_json)
    except requests.exceptions.RequestException as e:
        return jsonify({"error": str(e)}), 500
def list_models():
    """
    Return the cached chat model list in OpenAI /v1/models response shape.

    NOTE(review): no @app.route decorator is visible in this chunk —
    presumably lost in extraction; confirm the route registration.
    """
    if not check_authorization(request):
        return jsonify({"error": "Unauthorized"}), 401
    detailed_models = []
    for model in all_models:
        detailed_models.append({
            "id": model,
            "object": "model",
            "created": 1678888888,  # static placeholder timestamp
            # Fix: random.choice over a single-element list always returned
            # "linux do"; the indirection added nothing.
            "owned_by": "linux do",
            "permission": [
                {
                    "id": f"modelperm-{uuid.uuid4().hex}",
                    "object": "model_permission",
                    "created": 1678888888,
                    "allow_create_engine": False,
                    "allow_sampling": True,
                    "allow_logprobs": True,
                    "allow_search_indices": False,
                    "allow_view": True,
                    "allow_fine_tuning": False,
                    "organization": "*",
                    "group": None,
                    "is_blocking": False
                }
            ],
            "root": model,
            "parent": None
        })
    return jsonify({
        "object": "list",
        "data": detailed_models
    })
def get_billing_info():
    """Sum total_balance across all valid and unverified keys, concurrently.

    Keys whose lookup fails contribute nothing; errors are logged.
    """
    keys = valid_keys_global + unverified_keys_global
    total = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
        futures = [pool.submit(get_credit_summary, key) for key in keys]
        for future in concurrent.futures.as_completed(futures):
            try:
                summary = future.result()
                if summary:
                    total += summary.get("total_balance", 0)
            except Exception as exc:
                logging.error(f"获取额度信息生成异常: {exc}")
    return total
def billing_usage():
    """
    OpenAI-compatible billing/usage stub: one zero-usage entry per day for
    the last 30 days. Usage is not actually tracked.

    NOTE(review): no @app.route decorator is visible in this chunk —
    presumably lost in extraction; confirm the route registration.
    """
    if not check_authorization(request):
        return jsonify({"error": "Unauthorized"}), 401
    end_date = datetime.now()
    day = end_date - timedelta(days=30)
    daily_usage = []
    while day <= end_date:
        daily_usage.append({
            "timestamp": int(day.timestamp()),
            "daily_usage": 0
        })
        day += timedelta(days=1)
    return jsonify({
        "object": "list",
        "data": daily_usage,
        "total_usage": 0
    })
def billing_subscription():
    """
    OpenAI-compatible subscription stub; all limits mirror the pooled
    balance computed by get_billing_info().

    NOTE(review): no @app.route decorator is visible in this chunk —
    presumably lost in extraction; confirm the route registration.
    """
    if not check_authorization(request):
        return jsonify({"error": "Unauthorized"}), 401
    total_balance = get_billing_info()
    payload = {
        "object": "billing_subscription",
        "has_payment_method": False,
        "canceled": False,
        "canceled_at": None,
        "delinquent": None,
        # Effectively "never expires".
        "access_until": int(datetime(9999, 12, 31).timestamp()),
        "soft_limit": 0,
        "hard_limit": total_balance,
        "system_hard_limit": total_balance,
        "soft_limit_usd": 0,
        "hard_limit_usd": total_balance,
        "system_hard_limit_usd": total_balance,
        "plan": {
            "name": "SiliconFlow API",
            "id": "siliconflow-api"
        },
        "account_name": "SiliconFlow User",
        "po_number": None,
        "billing_email": None,
        "tax_ids": [],
        "billing_address": None,
        "business_address": None
    }
    return jsonify(payload)
def handsome_embeddings():
    """
    Proxy an embeddings request to SiliconFlow with a pooled key and
    re-wrap the result in OpenAI response shape.

    NOTE(review): no @app.route decorator is visible in this chunk —
    presumably lost in extraction; confirm the route registration.
    """
    if not check_authorization(request):
        return jsonify({"error": "Unauthorized"}), 401
    data = request.get_json()
    if not data or 'model' not in data:
        return jsonify({"error": "Invalid request data"}), 400
    model_name = data['model']
    request_type = determine_request_type(model_name, embedding_models, free_embedding_models)
    api_key = select_key(request_type, model_name)
    if not api_key:
        return jsonify(
            {"error": "No available API key for this request type or all keys have reached their limits"}), 429
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    try:
        start_time = time.time()
        response = requests.post(
            EMBEDDINGS_ENDPOINT,
            headers=headers,
            json=data,
            timeout=60
        )
        # Forward upstream rate-limit bodies verbatim.
        if response.status_code == 429:
            return jsonify(response.json()), 429
        response.raise_for_status()
        end_time = time.time()
        payload = response.json()
        total_time = end_time - start_time
        try:
            prompt_tokens = payload["usage"]["prompt_tokens"]
            embedding_data = payload["data"]
        except (KeyError, ValueError, IndexError) as e:
            logging.error(f"解析响应 JSON 失败: {e}, 完整内容: {payload}")
            prompt_tokens = 0
            embedding_data = []
        logging.info(
            f"使用的key: {api_key}, 提示token: {prompt_tokens}, 总共用时: {total_time:.4f}秒, 使用的模型: {model_name}"
        )
        return jsonify({
            "object": "list",
            "data": embedding_data,
            "model": model_name,
            "usage": {
                "prompt_tokens": prompt_tokens,
                "total_tokens": prompt_tokens
            }
        })
    except requests.exceptions.RequestException as e:
        return jsonify({"error": str(e)}), 500
if __name__ == '__main__':
    # Fix: removed redundant `import json` (already imported at module top).
    # SECURITY NOTE(review): this dumps the full environment — including the
    # KEYS and AUTHORIZATION_KEY secrets — into the logs; restrict before
    # deploying anywhere logs are shared.
    logging.info(f"环境变量:{os.environ}")
    # Reset key pools, start the background scheduler, then run the
    # initial classification/refresh synchronously before serving.
    invalid_keys_global = []
    free_keys_global = []
    unverified_keys_global = []
    valid_keys_global = []
    scheduler.start()
    load_keys()
    logging.info("首次加载 keys 已手动触发执行")
    refresh_models()
    logging.info("首次刷新模型列表已手动触发执行")
    app.run(debug=False, host='0.0.0.0', port=int(os.environ.get('PORT', 7860)))