thinkbuddy2api / core /refresh_token.py
kevin
精简日志输出
0a3d38f
# 用于刷新idToken的定时任务
import asyncio
import os
import time
from typing import Optional
from datetime import datetime
from core.utils import sign_in_with_idp, handle_firebase_response, refresh_token_via_rest
class TokenManager:
def __init__(self):
self.id_token: Optional[str] = None
self.last_refresh_time: Optional[float] = None
self.refresh_interval = 30 * 60 # 30分钟,单位:秒
self.is_running = False
async def get_token(self) -> str:
"""
获取当前的 idToken,如果不存在或已过期则刷新
"""
if not self.id_token or self._should_refresh():
await self.refresh_token()
return self.id_token
def _should_refresh(self) -> bool:
"""
检查是否需要刷新 token
"""
if not self.last_refresh_time:
return True
return time.time() - self.last_refresh_time >= self.refresh_interval
async def refresh_token(self):
"""
刷新 idToken
"""
try:
if os.getenv("REFRESH_TOKEN", "") == "" or os.getenv("REFRESH_TOKEN", "") == "None":
response = await sign_in_with_idp()
result = await handle_firebase_response(response) # idToken 实际就是bearer token
else:
result = await refresh_token_via_rest(os.getenv("REFRESH_TOKEN"))
if result is not None:
self.id_token = result
# 修改配置中的 TOKEN
# print(f"Before Token is {os.getenv('TOKEN', '')}")
os.environ["TOKEN"] = self.id_token
# print(f"Now Token is {os.getenv('TOKEN', '')}")
self.last_refresh_time = time.time()
print(f"Token refreshed at {datetime.now()}")
else:
print(f"Failed to refresh token: {result['error']}")
except Exception as e:
print(f"Error refreshing token: {str(e)}")
async def start_auto_refresh(self):
"""
启动自动刷新任务
"""
if self.is_running:
return
self.is_running = True
while self.is_running:
try:
await self.refresh_token()
# 等待到下次刷新时间
await asyncio.sleep(self.refresh_interval)
except Exception as e:
print(f"Auto refresh error: {str(e)}")
# 发生错误时等待短暂时间后重试
await asyncio.sleep(60)
async def stop_auto_refresh(self):
"""
停止自动刷新任务
"""
self.is_running = False
# 使用示例
# async def main():
# # 创建 TokenManager 实例
# token_manager = TokenManager()
#
# try:
# # 启动自动刷新任务
# refresh_task = asyncio.create_task(token_manager.start_auto_refresh())
#
# # 模拟应用运行
# while True:
# # 获取当前 token
# token = await token_manager.get_token()
# print(f"Current token: {token[:20]}...")
#
# # 等待一段时间再次获取
# await asyncio.sleep(300) # 每5分钟打印一次当前token
#
# except KeyboardInterrupt:
# # 处理 Ctrl+C
# await token_manager.stop_auto_refresh()
# await refresh_task
#
# # 运行示例
# if __name__ == "__main__":
# asyncio.run(main())