Spaces:
Running
Running
File size: 1,181 Bytes
df37f6e 5564ecb df37f6e 5564ecb d8fb01c df37f6e 5564ecb df37f6e 5564ecb df37f6e 5564ecb d8fb01c 5564ecb df37f6e 5564ecb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
import os
import time
import httpx
from loguru import logger
from core.conf import settings
# In-process cache for the most recently fetched access token.
# "expires_at" is compared against int(time.time()), i.e. Unix seconds.
# The initial None token makes the first lookup trigger a refresh.
_token_cache = {"access_token": None, "expires_at": 0}

# Endpoint queried for a new access token; the value comes from project settings.
API_BASE_URL_ACCESS_TOKEN = settings.API_BASE_URL_ACCESS_TOKEN
# Example value: "https://api.futabus.vn/identity/api/token/anonymous-token"
async def get_access_token() -> str:
    """Return the cached access token, refreshing it first when needed.

    A refresh is performed when no token has been cached yet or when the
    current time has reached the expiry timestamp stored in the cache.

    Returns:
        The value held under ``"access_token"`` in the module-level cache
        after any refresh has completed.
    """
    now = int(time.time())
    cache_is_fresh = (
        _token_cache["access_token"] is not None
        and now < _token_cache["expires_at"]
    )
    if not cache_is_fresh:
        await refresh_access_token()
    return _token_cache["access_token"]
async def refresh_access_token():
    """Fetch a new access token and store it in the module-level cache.

    Issues a GET request to ``API_BASE_URL_ACCESS_TOKEN``, reads the token
    from the ``"data"`` entry of the JSON response, and records it together
    with an expiry timestamp in ``_token_cache``.

    Raises:
        ValueError: If the ``"data"`` entry is missing or empty.

    Any exception raised by the HTTP request, by ``raise_for_status()``, or
    by JSON decoding propagates to the caller unchanged.
    """
    async with httpx.AsyncClient() as http:
        reply = await http.get(url=API_BASE_URL_ACCESS_TOKEN)
        reply.raise_for_status()
        payload = reply.json()

    token = payload.get("data", "")
    if not token:
        message = "Access token is empty or not found in the response"
        logger.error(message)
        raise ValueError(message)

    # The lifetime is a fixed value here; it is not read from the response.
    lifetime_seconds = 3600
    _token_cache["access_token"] = token
    # Subtract 60 seconds so the cached token is treated as expired slightly
    # before the assumed lifetime actually runs out.
    _token_cache["expires_at"] = int(time.time()) + lifetime_seconds - 60
|