sone-latest / app.py
yangtb24's picture
Update app.py
16b7f0b verified
raw
history blame
2.3 kB
import os
import time
import logging
import requests
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask
# 配置日志记录
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
API_ENDPOINT = "https://api.siliconflow.cn/v1/usage/credit_summary"
app = Flask(__name__)
def get_credit_summary(api_key):
"""
使用 API 密钥获取额度信息。
Args:
api_key: 用于访问 API 的密钥。
Returns:
包含额度信息的字典,如果请求失败则返回 None。
"""
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
try:
response = requests.get(API_ENDPOINT, headers=headers)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logging.error(f"获取额度信息失败,API Key:{api_key},错误信息:{e}")
return None
def load_keys():
"""
从环境变量中加载 keys,并记录到日志中,同时获取并记录每个 key 的额度信息。
"""
keys_str = os.environ.get("KEYS")
if keys_str:
keys = [key.strip() for key in keys_str.split(',')]
logging.info(f"加载的 keys:{keys}")
for key in keys:
credit_summary = get_credit_summary(key)
if credit_summary:
logging.info(f"API Key:{key},额度信息:{credit_summary}")
else:
logging.warning("环境变量 KEYS 未设置。")
# 创建一个后台调度器
scheduler = BackgroundScheduler()
# 添加定时任务,每小时执行一次 load_keys 函数
scheduler.add_job(load_keys, 'interval', hours=1)
@app.route('/')
def index():
"""
处理根路由的访问请求。
"""
return "<h1>Welcome to SiliconFlow</h1>"
if __name__ == '__main__':
logging.info(f"===== Application Startup at {time.strftime('%Y-%m-%d %H:%M:%S')} =====")
# 启动调度器
scheduler.start()
# 手动触发一次 load_keys 任务
load_keys()
logging.info("首次加载 keys 已手动触发执行")
# 启动 Flask 应用,监听所有 IP 的 5000 端口
app.run(debug=False, host='0.0.0.0', port=5000)