yangtb24 commited on
Commit
bae67bf
·
verified ·
1 Parent(s): ddaefda

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +54 -20
app.py CHANGED
@@ -1,35 +1,69 @@
1
  import os
2
  import time
3
  import logging
4
- from threading import Thread
 
5
 
6
- # 配置日志记录器
7
  logging.basicConfig(level=logging.INFO,
8
  format='%(asctime)s - %(levelname)s - %(message)s')
9
 
10
- def load_keys():
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
11
  """
12
- 从环境变量中加载 API 密钥,并在日志中输出。
13
  """
14
  keys_str = os.environ.get("KEYS")
15
  if keys_str:
16
  keys = [key.strip() for key in keys_str.split(',')]
17
- logging.info("已加载的 API 密钥:")
18
  for key in keys:
19
- logging.info(key)
 
 
 
 
20
  else:
21
- logging.warning("未找到名为 KEYS 的环境变量")
22
 
23
- def scheduled_task():
24
- """
25
- 每小时执行一次的定时任务。
26
- """
27
- while True:
28
- load_keys()
29
- # 休眠一小时
30
- time.sleep(3600)
31
-
32
- # 启动定时任务
33
- thread = Thread(target=scheduled_task)
34
- thread.daemon = True
35
- thread.start()
 
 
 
 
1
  import os
2
  import time
3
  import logging
4
+ import requests
5
+ from apscheduler.schedulers.background import BackgroundScheduler
6
 
7
+ # 配置日志记录
8
  logging.basicConfig(level=logging.INFO,
9
  format='%(asctime)s - %(levelname)s - %(message)s')
10
 
11
+ API_ENDPOINT = "https://api.siliconflow.cn/v1/usage/status"
12
+
13
+ def get_credit_usage(key):
14
+ """
15
+ 获取单个 key 的额度使用情况。
16
+
17
+ Args:
18
+ key: 要查询的 API key。
19
+
20
+ Returns:
21
+ 包含额度信息的字典,如果出错则返回 None。
22
+ """
23
+ headers = {
24
+ "Authorization": f"Bearer {key}"
25
+ }
26
+ try:
27
+ response = requests.get(API_ENDPOINT, headers=headers)
28
+ response.raise_for_status() # 检查请求是否成功
29
+ data = response.json()
30
+ return data
31
+ except requests.exceptions.RequestException as e:
32
+ logging.error(f"请求 API 时出错,key:{key}, 错误信息:{e}")
33
+ except ValueError as e:
34
+ logging.error(f"解析 JSON 响应时出错,key:{key}, 错误信息:{e}")
35
+ return None
36
+
37
+ def load_keys_and_get_usage():
38
  """
39
+ 从环境变量中加载 keys,并获取每个 key 的额度使用情况,然后记录到日志中。
40
  """
41
  keys_str = os.environ.get("KEYS")
42
  if keys_str:
43
  keys = [key.strip() for key in keys_str.split(',')]
44
+ logging.info(f"加载的 keys:{keys}")
45
  for key in keys:
46
+ usage_data = get_credit_usage(key)
47
+ if usage_data:
48
+ logging.info(f"Key: {key}, 额度信息: {usage_data}")
49
+ else:
50
+ logging.warning(f"获取 key:{key} 的额度信息失败。")
51
  else:
52
+ logging.warning("环境变量 KEYS 未设置。")
53
 
54
+ # 创建一个后台调度器
55
+ scheduler = BackgroundScheduler()
56
+
57
+ # 添加定时任务,每小时执行一次 load_keys_and_get_usage 函数
58
+ scheduler.add_job(load_keys_and_get_usage, 'interval', hours=1)
59
+
60
+ # 启动调度器
61
+ scheduler.start()
62
+
63
+ # 为了保持程序运行,添加一个循环
64
+ if __name__ == '__main__':
65
+ try:
66
+ while True:
67
+ time.sleep(120)
68
+ except (KeyboardInterrupt, SystemExit):
69
+ scheduler.shutdown()