Spaces:
Running
Running
# Scheduled task for refreshing the idToken
import asyncio | |
import os | |
import time | |
from typing import Optional | |
from datetime import datetime | |
from core.utils import sign_in_with_idp, handle_firebase_response, refresh_token_via_rest | |
class TokenManager:
    """Cache an idToken and refresh it on demand or on a timer.

    A successful refresh stores the token on the instance, mirrors it into
    the ``TOKEN`` environment variable and records the refresh time.
    """

    # Seconds to wait before retrying after a failed background refresh.
    RETRY_INTERVAL = 60

    def __init__(self):
        self.id_token: Optional[str] = None
        self.last_refresh_time: Optional[float] = None
        self.refresh_interval = 30 * 60  # 30 minutes, in seconds
        self.is_running = False

    async def get_token(self) -> Optional[str]:
        """Return the current idToken, refreshing it first if missing or stale.

        Returns:
            The cached token. This is ``None`` when no token has ever been
            obtained and the refresh attempt failed; a previously obtained
            token is returned unchanged if the refresh fails.
        """
        if not self.id_token or self._should_refresh():
            await self.refresh_token()
        return self.id_token

    def _should_refresh(self) -> bool:
        """Return True when no refresh has happened yet or the interval elapsed."""
        if not self.last_refresh_time:
            return True
        return time.time() - self.last_refresh_time >= self.refresh_interval

    async def refresh_token(self) -> bool:
        """Fetch a new idToken and store it.

        When the ``REFRESH_TOKEN`` environment variable is unset, empty or the
        literal string ``"None"``, the token is obtained through
        ``sign_in_with_idp()`` + ``handle_firebase_response()``; otherwise
        ``refresh_token_via_rest()`` is called with that value.

        Returns:
            True if a new token was stored, False otherwise. Failures are
            reported on stdout and never propagate as exceptions, so callers
            keep working with whatever token they already had.
        """
        try:
            # Read the environment once so the branch decision and the value
            # passed to the REST helper cannot disagree.
            stored_refresh_token = os.getenv("REFRESH_TOKEN", "")
            if stored_refresh_token in ("", "None"):
                response = await sign_in_with_idp()
                # The idToken is what gets used as the bearer token.
                result = await handle_firebase_response(response)
            else:
                result = await refresh_token_via_rest(stored_refresh_token)

            if result is None:
                # The helpers signal failure with None; there is no error
                # payload to index into here.
                print("Failed to refresh token: no token was returned")
                return False

            self.id_token = result
            # Mirror the token into the process environment for code that
            # reads TOKEN from there.
            os.environ["TOKEN"] = self.id_token
            self.last_refresh_time = time.time()
            print(f"Token refreshed at {datetime.now()}")
            return True
        except Exception as e:
            # Deliberate best-effort boundary: report and carry on.
            print(f"Error refreshing token: {str(e)}")
            return False

    async def start_auto_refresh(self):
        """Run the background refresh loop until ``stop_auto_refresh`` is called.

        Does nothing if a loop is already running. After a successful refresh
        the loop sleeps for ``refresh_interval`` seconds; after a failed one
        it retries after ``RETRY_INTERVAL`` seconds.
        """
        if self.is_running:
            return
        self.is_running = True
        try:
            while self.is_running:
                try:
                    refreshed = await self.refresh_token()
                except Exception as e:
                    print(f"Auto refresh error: {str(e)}")
                    refreshed = False
                # Only an explicit False counts as failure, so an override
                # that returns None keeps the regular schedule.
                if refreshed is False:
                    delay = self.RETRY_INTERVAL
                else:
                    delay = self.refresh_interval
                await asyncio.sleep(delay)
        finally:
            # Also reached when the task is cancelled, so the loop can be
            # started again later.
            self.is_running = False

    async def stop_auto_refresh(self):
        """Ask the background refresh loop to stop.

        The loop notices the flag the next time it wakes from its sleep.
        """
        self.is_running = False
# Usage example
# async def main():
#     # Create a TokenManager instance
#     token_manager = TokenManager()
#
#     try:
#         # Start the auto-refresh task
#         refresh_task = asyncio.create_task(token_manager.start_auto_refresh())
#
#         # Simulate the application running
#         while True:
#             # Fetch the current token
#             token = await token_manager.get_token()
#             print(f"Current token: {token[:20]}...")
#
#             # Wait a while before fetching again
#             await asyncio.sleep(300)  # print the current token every 5 minutes
#
#     except KeyboardInterrupt:
#         # Handle Ctrl+C
#         await token_manager.stop_auto_refresh()
#         await refresh_task
#
# # Run the example
# if __name__ == "__main__":
#     asyncio.run(main())