"""Flask proxy that rotates SiliconFlow API keys behind an OpenAI-style API.

Keys are read from the ``KEYS`` environment variable, classified by balance
and model access, and then handed out round-robin per model. Incoming
requests must carry ``Authorization: Bearer <AUTHORIZATION_KEY>``.
"""
import concurrent.futures
import hmac
import json
import logging
import os
import time
from datetime import datetime, timedelta, timezone

import requests
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask, request, jsonify, Response, stream_with_context

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

API_ENDPOINT = "https://api.siliconflow.cn/v1/user/info"
TEST_MODEL_ENDPOINT = "https://api.siliconflow.cn/v1/chat/completions"
MODELS_ENDPOINT = "https://api.siliconflow.cn/v1/models"

# Seconds to wait on the short metadata calls (balance, model list, probe).
# Without a timeout a stalled upstream would block worker threads forever.
REQUEST_TIMEOUT = 10

app = Flask(__name__)

# Model catalogue, replaced wholesale by refresh_models().
all_models = []
free_models = []

# Key buckets, replaced wholesale by load_keys().
invalid_keys_global = []
free_keys_global = []
unverified_keys_global = []
valid_keys_global = []

# Kept for backward compatibility; the functions below create their own
# short-lived pools (named ``pool`` so they do not shadow this one).
executor = concurrent.futures.ThreadPoolExecutor(max_workers=20)

# model name -> next round-robin position used by select_key().
model_key_indices = {}


def get_credit_summary(api_key):
    """Fetch the balance of ``api_key`` from the user-info endpoint.

    Returns:
        ``{"total_balance": float}`` on success, or ``None`` when the request
        fails or the response cannot be interpreted.
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    try:
        response = requests.get(API_ENDPOINT, headers=headers, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
        data = response.json().get("data", {})
        total_balance = data.get("totalBalance", 0)
        return {"total_balance": float(total_balance)}
    except requests.exceptions.RequestException as e:
        logging.error(f"获取额度信息失败,API Key:{api_key},错误信息:{e}")
        return None
    except (KeyError, TypeError, AttributeError) as e:
        # AttributeError covers a body that is not an object or "data": null.
        logging.error(f"解析额度信息失败,API Key:{api_key},错误信息:{e}")
        return None
    except ValueError as e:
        logging.error(f"total_balance 无法转换为浮点数,API Key:{api_key},错误信息:{e}")
        return None


# NOTE(review): this credential is committed in source. It can now be
# overridden through the FREE_MODEL_TEST_KEY environment variable; the
# embedded default should be rotated and removed.
FREE_MODEL_TEST_KEY = os.environ.get(
    "FREE_MODEL_TEST_KEY",
    "sk-bmjbjzleaqfgtqfzmcnsbagxrlohriadnxqrzfocbizaxukw",
)


def test_model_availability(api_key, model_name):
    """Probe whether ``api_key`` may call ``model_name``.

    A tiny chat completion is sent; HTTP 200 and HTTP 429 both count as
    available. Any other status or a transport error returns ``False``.
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": model_name,
        "messages": [{"role": "user", "content": "hi"}],
        "max_tokens": 5,
        "stream": False
    }
    try:
        response = requests.post(
            TEST_MODEL_ENDPOINT, headers=headers, json=payload, timeout=REQUEST_TIMEOUT
        )
    except requests.exceptions.RequestException as e:
        logging.error(f"测试模型 {model_name} 可用性失败,API Key:{api_key},错误信息:{e}")
        return False
    return response.status_code in (200, 429)


def refresh_models():
    """Rebuild ``all_models`` and ``free_models``.

    Every listed model is probed with ``FREE_MODEL_TEST_KEY``; models that
    key can reach are recorded as free. The new lists are built locally and
    published at the end so concurrent requests never see a half-filled
    ``free_models``.
    """
    global all_models, free_models

    models = get_all_models(FREE_MODEL_TEST_KEY)
    usable = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
        future_to_model = {
            pool.submit(test_model_availability, FREE_MODEL_TEST_KEY, model): model
            for model in models
        }
        for future in concurrent.futures.as_completed(future_to_model):
            model = future_to_model[future]
            try:
                if future.result():
                    usable.append(model)
            except Exception as exc:
                logging.error(f"模型 {model} 测试生成异常: {exc}")

    all_models = models
    free_models = usable

    logging.info(f"所有模型列表:{all_models}")
    logging.info(f"免费模型列表:{free_models}")


def load_keys():
    """Load keys from ``KEYS`` and sort them into the global buckets.

    ``KEYS`` is a comma-separated list. Each key is classified concurrently
    by process_key() using the model named in ``TEST_MODEL``. Blank entries
    (for example from a trailing comma) are ignored.
    """
    global invalid_keys_global, free_keys_global, unverified_keys_global, valid_keys_global

    keys_str = os.environ.get("KEYS")
    test_model = os.environ.get("TEST_MODEL", "Pro/google/gemma-2-9b-it")

    if not keys_str:
        logging.warning("环境变量 KEYS 未设置。")
        return

    keys = [key.strip() for key in keys_str.split(',') if key.strip()]
    # NOTE(review): this and the bucket logs below write API keys to the log.
    logging.info(f"加载的 keys:{keys}")

    buckets = {"invalid": [], "free": [], "unverified": [], "valid": []}
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
        future_to_key = {pool.submit(process_key, key, test_model): key for key in keys}
        for future in concurrent.futures.as_completed(future_to_key):
            key = future_to_key[future]
            try:
                key_type = future.result()
            except Exception as exc:
                logging.error(f"处理 KEY {key} 生成异常: {exc}")
                continue
            if key_type in buckets:
                buckets[key_type].append(key)

    logging.info(f"无效 KEY:{buckets['invalid']}")
    logging.info(f"免费 KEY:{buckets['free']}")
    logging.info(f"未实名 KEY:{buckets['unverified']}")
    logging.info(f"有效 KEY:{buckets['valid']}")

    invalid_keys_global = buckets["invalid"]
    free_keys_global = buckets["free"]
    unverified_keys_global = buckets["unverified"]
    valid_keys_global = buckets["valid"]


def process_key(key, test_model):
    """Classify one key.

    Returns:
        ``"invalid"`` when the balance cannot be fetched, ``"free"`` when the
        balance is not positive, ``"valid"`` when the key can call
        ``test_model``, otherwise ``"unverified"``.
    """
    credit_summary = get_credit_summary(key)
    if credit_summary is None:
        return "invalid"
    if credit_summary.get("total_balance", 0) <= 0:
        return "free"
    if test_model_availability(key, test_model):
        return "valid"
    return "unverified"


def get_all_models(api_key):
    """Return the ids of all chat models, or ``[]`` on any failure."""
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    try:
        response = requests.get(
            MODELS_ENDPOINT,
            headers=headers,
            params={"sub_type": "chat"},
            timeout=REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        data = response.json()
        if isinstance(data, dict) and isinstance(data.get('data'), list):
            return [
                model.get("id")
                for model in data["data"]
                if isinstance(model, dict) and "id" in model
            ]
        logging.error("获取模型列表失败:响应数据格式不正确")
        return []
    except requests.exceptions.RequestException as e:
        logging.error(f"获取模型列表失败,API Key:{api_key},错误信息:{e}")
        return []
    except (KeyError, TypeError) as e:
        logging.error(f"解析模型列表失败,API Key:{api_key},错误信息:{e}")
        return []


def determine_request_type(model_name):
    """Map a model name to ``"free"``, ``"paid"`` or ``"unknown"``."""
    if model_name in free_models:
        return "free"
    if model_name in all_models:
        return "paid"
    return "unknown"


def select_key(request_type, model_name):
    """Pick the next usable key for ``model_name`` in round-robin order.

    Paid requests draw from the unverified and valid buckets; every other
    request type also includes the free bucket. Each candidate is checked
    with key_is_valid(); the first one that passes is returned and the
    per-model position is advanced past it.

    Returns:
        The selected key, or ``None`` when there is no candidate or none
        passes the check.
    """
    if request_type == "paid":
        available_keys = unverified_keys_global + valid_keys_global
    else:
        available_keys = free_keys_global + unverified_keys_global + valid_keys_global

    if not available_keys:
        return None

    total = len(available_keys)
    start = model_key_indices.get(model_name, 0)
    for offset in range(total):
        key = available_keys[(start + offset) % total]
        if key_is_valid(key, request_type):
            model_key_indices[model_name] = start + offset + 1
            return key
        logging.warning(f"KEY {key} 无效或达到限制,尝试下一个 KEY")

    # A missing entry reads back as 0, so dropping it is the same as
    # resetting it, and caller-supplied model names that never succeed do
    # not accumulate in the dict.
    model_key_indices.pop(model_name, None)
    return None


def key_is_valid(key, request_type):
    """Check whether ``key`` may serve a request of ``request_type``.

    The balance is fetched on every call. Free requests accept any key whose
    balance can be read; ``"paid"`` and ``"unverified"`` need a positive
    balance; every other type is rejected.

    NOTE(review): ``"unknown"`` falls into the final branch, so requests for
    models missing from ``all_models`` never get a key. Confirm this is
    intended.
    """
    if request_type == "invalid":
        return False

    credit_summary = get_credit_summary(key)
    if credit_summary is None:
        return False

    if request_type == "free":
        return True
    if request_type in ("paid", "unverified"):
        return credit_summary.get("total_balance", 0) > 0
    return False


def check_authorization(request):
    """Return ``True`` when the request carries the expected bearer token.

    The expected token comes from ``AUTHORIZATION_KEY``; when that variable
    is unset every request is rejected.
    """
    authorization_key = os.environ.get("AUTHORIZATION_KEY")
    if not authorization_key:
        logging.warning("环境变量 AUTHORIZATION_KEY 未设置,请设置后重试。")
        return False

    auth_header = request.headers.get('Authorization')
    if not auth_header:
        logging.warning("请求头中缺少 Authorization 字段。")
        return False

    expected = f"Bearer {authorization_key}"
    # Constant-time comparison; bytes are used because compare_digest()
    # rejects non-ASCII str arguments.
    if not hmac.compare_digest(auth_header.encode("utf-8"), expected.encode("utf-8")):
        # NOTE(review): the rejected header value is written to the log.
        logging.warning(f"无效的 Authorization 密钥:{auth_header}")
        return False
    return True


# Jobs are registered at import time; the scheduler itself is only started
# in the ``__main__`` block below.
scheduler = BackgroundScheduler()
scheduler.add_job(load_keys, 'interval', hours=1)
scheduler.add_job(refresh_models, 'interval', minutes=10)


@app.route('/')
def index():
    """Landing page."""
    # NOTE(review): in the source this literal spans several lines; the line
    # breaks are reproduced here as escapes. Confirm the intended markup.
    return "\n\nWelcome to SiliconFlow\n\n"


# key type -> (label, message) reported by /check_tokens.
_TOKEN_REPORTS = {
    "invalid": ("无效 KEY", "无法获取额度信息"),
    "free": ("免费 KEY", "额度不足"),
    "unverified": ("未实名 KEY", "无法使用指定模型"),
    "valid": ("有效 KEY", "可以使用指定模型"),
}


def _inspect_token(token, test_model):
    """Return ``(key_type, balance)`` for one token; runs in a worker thread."""
    key_type = process_key(token, test_model)
    credit_summary = get_credit_summary(token)
    balance = credit_summary.get("total_balance", 0) if credit_summary else 0
    return key_type, balance


@app.route('/check_tokens', methods=['POST'])
def check_tokens():
    """Classify the tokens posted as ``{"tokens": [...]}``.

    Responds with a JSON list of ``{token, type, balance, message}`` objects
    in completion order. A token whose check raises is logged and left out.
    A body that is not a JSON object, or whose ``tokens`` is not a list,
    gets a 400.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({"error": "Invalid request data"}), 400
    tokens = payload.get('tokens', [])
    if not isinstance(tokens, list):
        return jsonify({"error": "Invalid request data"}), 400

    test_model = os.environ.get("TEST_MODEL", "Pro/google/gemma-2-9b-it")

    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
        future_to_token = {
            pool.submit(_inspect_token, token, test_model): token for token in tokens
        }
        for future in concurrent.futures.as_completed(future_to_token):
            token = future_to_token[future]
            try:
                key_type, balance = future.result()
            except Exception as exc:
                logging.error(f"处理 Token {token} 生成异常: {exc}")
                continue
            report = _TOKEN_REPORTS.get(key_type)
            if report is not None:
                label, message = report
                results.append(
                    {"token": token, "type": label, "balance": balance, "message": message}
                )

    return jsonify(results)


def _extract_user_text(messages):
    """Join the plain-string contents of all user messages with spaces."""
    if not isinstance(messages, list):
        return ""
    texts = []
    for message in messages:
        if not isinstance(message, dict):
            continue
        content = message.get("content")
        # Non-text content (e.g. multimodal part lists) is skipped.
        if message.get("role") == "user" and isinstance(content, str):
            texts.append(content)
    return " ".join(texts).strip()


def _escape_newlines(text):
    """Replace line breaks with a literal ``\\n`` so a log entry stays on one line."""
    return text.replace('\n', '\\n').replace('\r', '\\n')


def _log_completion(api_key, model_name, prompt_tokens, completion_tokens,
                    first_token, total_time, user_content, response_content):
    """Write the per-request usage line.

    ``first_token`` is already formatted by the caller, because the
    streaming and non-streaming paths print it differently.
    """
    # NOTE(review): the upstream API key and the full prompt/response text
    # are written to the log here.
    logging.info(
        f"使用的key: {api_key}, 提示token: {prompt_tokens}, 输出token: {completion_tokens}, 首字用时: {first_token}, 总共用时: {total_time:.4f}秒, 使用的模型: {model_name}, 用户的内容: {_escape_newlines(user_content)}, 输出的内容: {_escape_newlines(response_content)}"
    )


def _parse_stream_usage(text):
    """Extract token counts and generated text from a server-sent-event body.

    Only ``data:`` lines are inspected and ``[DONE]`` is skipped. Lines that
    are not valid JSON are logged and ignored.

    Returns:
        ``(prompt_tokens, completion_tokens, response_content)``.
    """
    prompt_tokens = 0
    completion_tokens = 0
    pieces = []
    for line in text.splitlines():
        if not line.startswith("data:"):
            continue
        line = line[5:].strip()
        if line == "[DONE]":
            continue
        try:
            event = json.loads(line)
        except ValueError as e:
            logging.error(f"解析流式响应单行 JSON 失败: {e}, 行内容: {line}")
            continue
        if not isinstance(event, dict):
            continue

        # "usage" may be absent or null on intermediate events.
        usage = event.get("usage")
        if isinstance(usage, dict):
            if "completion_tokens" in usage:
                completion_tokens = usage["completion_tokens"]
            if "prompt_tokens" in usage:
                prompt_tokens = usage["prompt_tokens"]

        choices = event.get("choices")
        if isinstance(choices, list) and choices and isinstance(choices[0], dict):
            delta = choices[0].get("delta")
            if isinstance(delta, dict) and isinstance(delta.get("content"), str):
                pieces.append(delta["content"])
    return prompt_tokens, completion_tokens, "".join(pieces)


def _relay_stream(response, data, api_key, model_name, start_time):
    """Yield the upstream body unchanged, then log usage once it is complete.

    Raw bytes are buffered and decoded once at the end: decoding each
    1024-byte chunk separately fails whenever a multi-byte UTF-8 character
    straddles a chunk boundary, which would abort the client's stream.
    """
    first_chunk_time = None
    received = []
    try:
        for chunk in response.iter_content(chunk_size=1024):
            if not chunk:
                continue
            if first_chunk_time is None:
                first_chunk_time = time.time()
            received.append(chunk)
            yield chunk
    finally:
        # Also runs when the client disconnects and the generator is closed.
        response.close()

    end_time = time.time()
    first_token_time = first_chunk_time - start_time if first_chunk_time else 0
    total_time = end_time - start_time

    text = b"".join(received).decode("utf-8", errors="replace")
    prompt_tokens, completion_tokens, response_content = _parse_stream_usage(text)
    _log_completion(
        api_key, model_name, prompt_tokens, completion_tokens,
        f"{first_token_time:.4f}秒", total_time,
        _extract_user_text(data.get("messages", [])), response_content,
    )


@app.route('/handsome/v1/chat/completions', methods=['POST'])
def handsome_chat_completions():
    """Forward a chat-completion request upstream using a rotated key.

    Responses:
        401 when authorization fails; 400 when the body is not a JSON object
        with a string ``model``; 429 when no key is available or the
        upstream answers 429; 500 when the upstream call fails. Streaming
        requests relay the upstream body and status; non-streaming requests
        return the upstream JSON.
    """
    if not check_authorization(request):
        return jsonify({"error": "Unauthorized"}), 401

    data = request.get_json(silent=True)
    # The model name is later used as a dict key, so it must be a string.
    if not isinstance(data, dict) or not isinstance(data.get('model'), str):
        return jsonify({"error": "Invalid request data"}), 400

    model_name = data['model']
    request_type = determine_request_type(model_name)
    api_key = select_key(request_type, model_name)

    if not api_key:
        return jsonify({"error": "No available API key for this request type or all keys have reached their limits"}), 429

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    wants_stream = data.get("stream", False)

    try:
        start_time = time.time()
        response = requests.post(
            TEST_MODEL_ENDPOINT,
            headers=headers,
            json=data,
            stream=wants_stream,
            timeout=60
        )

        if response.status_code == 429:
            return jsonify(response.json()), 429

        if wants_stream:
            # Pass the upstream status through so an upstream error is not
            # presented to the client as a 200.
            return Response(
                stream_with_context(
                    _relay_stream(response, data, api_key, model_name, start_time)
                ),
                status=response.status_code,
                content_type=response.headers.get('Content-Type', 'text/event-stream'),
            )

        response.raise_for_status()
        end_time = time.time()
        response_json = response.json()
        total_time = end_time - start_time

        try:
            prompt_tokens = response_json["usage"]["prompt_tokens"]
            completion_tokens = response_json["usage"]["completion_tokens"]
            response_content = response_json["choices"][0]["message"]["content"]
        except (KeyError, ValueError, IndexError, TypeError) as e:
            logging.error(f"解析非流式响应 JSON 失败: {e}, 完整内容: {response_json}")
            prompt_tokens = 0
            completion_tokens = 0
            response_content = ""
        if not isinstance(response_content, str):
            # "content" can be null; keep logging from failing the request.
            response_content = ""

        _log_completion(
            api_key, model_name, prompt_tokens, completion_tokens,
            "0", total_time,
            _extract_user_text(data.get("messages", [])), response_content,
        )
        return jsonify(response_json)

    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON upstream body on requests versions
        # whose JSON error is not a RequestException.
        return jsonify({"error": str(e)}), 500


@app.route('/handsome/v1/models', methods=['GET'])
def list_models():
    """List all known models plus the subset recorded as free."""
    if not check_authorization(request):
        return jsonify({"error": "Unauthorized"}), 401
    return jsonify({
        "data": [{"id": model, "object": "model"} for model in all_models],
        "free_models": free_models
    })


def get_billing_info():
    """Return the summed balance of all valid and unverified keys.

    Keys whose balance cannot be fetched contribute nothing.
    """
    keys = valid_keys_global + unverified_keys_global
    total_balance = 0

    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
        futures = [pool.submit(get_credit_summary, key) for key in keys]
        for future in concurrent.futures.as_completed(futures):
            try:
                credit_summary = future.result()
            except Exception as exc:
                logging.error(f"获取额度信息生成异常: {exc}")
                continue
            if credit_summary:
                total_balance += credit_summary.get("total_balance", 0)

    return total_balance


@app.route('/handsome/v1/dashboard/billing/usage', methods=['GET'])
def billing_usage():
    """Return a zero-usage report covering the last 30 days (31 entries)."""
    if not check_authorization(request):
        return jsonify({"error": "Unauthorized"}), 401

    end_date = datetime.now()
    start_date = end_date - timedelta(days=30)

    # One entry per day from start_date through end_date inclusive.
    daily_usage = [
        {
            "timestamp": int((start_date + timedelta(days=day)).timestamp()),
            "daily_usage": 0
        }
        for day in range(31)
    ]

    return jsonify({
        "object": "list",
        "data": daily_usage,
        "total_usage": 0
    })


@app.route('/handsome/v1/dashboard/billing/subscription', methods=['GET'])
def billing_subscription():
    """Return a subscription object whose limits equal the pooled balance."""
    if not check_authorization(request):
        return jsonify({"error": "Unauthorized"}), 401

    total_balance = get_billing_info()

    # A timezone-aware value is converted arithmetically; a naive one goes
    # through local-time conversion, which can overflow for year 9999 on
    # some platforms.
    access_until = int(datetime(9999, 12, 31, tzinfo=timezone.utc).timestamp())

    return jsonify({
        "object": "billing_subscription",
        "has_payment_method": False,
        "canceled": False,
        "canceled_at": None,
        "delinquent": None,
        "access_until": access_until,
        "soft_limit": 0,
        "hard_limit": total_balance,
        "system_hard_limit": total_balance,
        "soft_limit_usd": 0,
        "hard_limit_usd": total_balance,
        "system_hard_limit_usd": total_balance,
        "plan": {
            "name": "SiliconFlow API",
            "id": "siliconflow-api"
        },
        "account_name": "SiliconFlow User",
        "po_number": None,
        "billing_email": None,
        "tax_ids": [],
        "billing_address": None,
        "business_address": None
    })


if __name__ == '__main__':
    # NOTE(review): this dumps the whole environment, including KEYS and
    # AUTHORIZATION_KEY, into the log.
    logging.info(f"环境变量:{os.environ}")

    scheduler.start()

    load_keys()
    logging.info("首次加载 keys 已手动触发执行")

    refresh_models()
    logging.info("首次刷新模型列表已手动触发执行")

    app.run(debug=False, host='0.0.0.0', port=int(os.environ.get('PORT', 7860)))