Spaces:
Running
Running
import os | |
import time | |
import logging | |
import requests | |
from apscheduler.schedulers.background import BackgroundScheduler | |
from flask import Flask | |
# 配置日志记录 | |
logging.basicConfig(level=logging.INFO, | |
format='%(asctime)s - %(levelname)s - %(message)s') | |
API_ENDPOINT = "https://api.siliconflow.cn/v1/usage/credit_summary" | |
app = Flask(__name__) | |
def get_credit_summary(api_key): | |
""" | |
使用 API 密钥获取额度信息。 | |
Args: | |
api_key: 用于访问 API 的密钥。 | |
Returns: | |
包含额度信息的字典,如果请求失败则返回 None。 | |
""" | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
"Content-Type": "application/json" | |
} | |
try: | |
response = requests.get(API_ENDPOINT, headers=headers) | |
response.raise_for_status() | |
return response.json() | |
except requests.exceptions.RequestException as e: | |
logging.error(f"获取额度信息失败,API Key:{api_key},错误信息:{e}") | |
return None | |
def load_keys(): | |
""" | |
从环境变量中加载 keys,并记录到日志中,同时获取并记录每个 key 的额度信息。 | |
""" | |
keys_str = os.environ.get("KEYS") | |
if keys_str: | |
keys = [key.strip() for key in keys_str.split(',')] | |
logging.info(f"加载的 keys:{keys}") | |
for key in keys: | |
credit_summary = get_credit_summary(key) | |
if credit_summary: | |
logging.info(f"API Key:{key},额度信息:{credit_summary}") | |
else: | |
logging.warning("环境变量 KEYS 未设置。") | |
# 创建一个后台调度器 | |
scheduler = BackgroundScheduler() | |
# 添加定时任务,每小时执行一次 load_keys 函数 | |
scheduler.add_job(load_keys, 'interval', hours=1) | |
def index(): | |
""" | |
处理根路由的访问请求。 | |
""" | |
return "<h1>Welcome to SiliconFlow</h1>" | |
if __name__ == '__main__': | |
logging.info(f"===== Application Startup at {time.strftime('%Y-%m-%d %H:%M:%S')} =====") | |
# 启动调度器 | |
scheduler.start() | |
# 手动触发一次 load_keys 任务 | |
load_keys() | |
logging.info("首次加载 keys 已手动触发执行") | |
# 启动 Flask 应用,监听所有 IP 的 5000 端口 | |
app.run(debug=False, host='0.0.0.0', port=5000) | |