Spaces:
Sleeping
Sleeping
import os | |
import time | |
import httpx | |
from core.conf import settings | |
_token_cache = { | |
"access_token": None, | |
"expires_at": 0, | |
} | |
API_BASE_URL_ACCESS_TOKEN = settings.API_BASE_URL_ACCESS_TOKEN | |
async def get_access_token() -> str: | |
current_time = int(time.time()) | |
if _token_cache["access_token"] is None or current_time >= _token_cache["expires_at"]: | |
await refresh_access_token() | |
return _token_cache["access_token"] | |
async def refresh_access_token(): | |
async with httpx.AsyncClient() as client: | |
response = await client.get(url=API_BASE_URL_ACCESS_TOKEN) | |
response.raise_for_status() | |
data = response.json() | |
access_token = data.get("data") | |
expires_in = 3600 | |
_token_cache["access_token"] = access_token | |
_token_cache["expires_at"] = int(time.time()) + expires_in - 60 | |