SnapFeast / auth /services.py
Making sure refresh token is returned at all time
7ffac05
from fastapi import HTTPException, Depends
from users.models import User
from core.security import verify_password
from core.security import create_access_token, create_refresh_token, get_token_payload
from core.config import get_settings
from auth.responses import TokenResponse
from datetime import timedelta
from sqlalchemy.orm import Session
from core.database import get_db
# Application settings, fetched once when this module is imported; read below
# for ACCESS_TOKEN_EXPIRE_MINUTES when access tokens are issued.
settings = get_settings()
async def get_token(data, db:Session):
    """Authenticate a login request and return a token pair.

    Looks up the user whose email equals ``data.username``, checks
    ``data.password`` against the stored password via ``verify_password``,
    confirms the account is active, and returns whatever
    ``_get_user_token`` produces for that user.

    Args:
        data: Login form object exposing ``username`` and ``password``.
        db: SQLAlchemy session used for the user lookup.

    Raises:
        HTTPException: 401 "Invalid Login Credentials" when the email is
            unknown or the password does not verify; also whatever
            ``_verify_user_access`` raises for an inactive account.
    """
    account = db.query(User).filter(User.email == data.username).first()

    # Unknown email and wrong password share one 401 response so the body
    # does not reveal which of the two was wrong.
    # NOTE(review): the unknown-email path skips verify_password entirely, so
    # the two cases may still differ in response time -- confirm whether an
    # equal-work check is wanted here.
    if not account or not verify_password(data.password, account.password):
        raise HTTPException(
            status_code=401,
            detail="Invalid Login Credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # The active-account check runs only after the password has verified.
    _verify_user_access(user=account)

    tokens = await _get_user_token(user=account)
    return tokens
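async def get_refresh_token(token: str, db):
    """Exchange a previously issued token for a fresh token pair.

    Decodes ``token`` with ``get_token_payload``, loads the user whose id is
    in the payload's ``"id"`` field, confirms the account is active, and
    returns whatever ``_get_user_token`` produces for that user.

    Args:
        token: Encoded token string presented by the client.
        db: SQLAlchemy session used for the user lookup.

    Raises:
        HTTPException: 400 "Invalid Token" when the payload is missing, has
            no ``"id"``, or the id matches no user; also whatever
            ``_verify_user_access`` raises for an inactive account.
    """
    payload = get_token_payload(token)

    # get_token_payload is defined in core.security and is not visible here.
    # If it returns None (or an empty mapping) for a token it cannot decode,
    # treat that as an invalid token instead of letting ``.get`` raise
    # AttributeError and surface as an unhandled server error.
    user_id = payload.get("id") if payload else None
    if not user_id:
        raise HTTPException(
            status_code=400,
            detail="Invalid Token",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # NOTE(review): nothing in this function checks that ``token`` is a
    # refresh token rather than an access token, and _get_user_token builds
    # both from the same payload. Confirm that core.security enforces a
    # token-type distinction; otherwise either kind is accepted here.
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(
            status_code=400,
            detail="Invalid Token",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Re-check the account state on every refresh so a deactivated user
    # cannot keep obtaining new tokens.
    _verify_user_access(user=user)
    return await _get_user_token(user=user)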
def _verify_user_access(user: User):
    """Return True for an active user; reject an inactive one.

    Raises:
        HTTPException: 400 "User is inactive" when ``user.is_active`` is falsy.
    """
    # Happy path first: an active account passes straight through.
    if user.is_active:
        return True

    raise HTTPException(
        status_code=400,
        detail="User is inactive",
        headers={"WWW-Authenticate": "Bearer"},
    )
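async def _get_user_token(user: User):
    """Build the access/refresh token pair for ``user``.

    Both tokens are created from the same payload: the user's id under
    ``"id"`` and the user's email under ``"sub"``. The access token lifetime
    comes from ``settings.ACCESS_TOKEN_EXPIRE_MINUTES``.

    Returns:
        TokenResponse carrying the access token, the refresh token, and the
        access token lifetime in seconds as ``expires_in``.
    """
    payload = {"id": user.id, "sub": user.email}

    access_token_expiry = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = await create_access_token(data=payload, expiry=access_token_expiry)

    # Always generate a refresh token
    refresh_token_value = await create_refresh_token(data=payload)

    # timedelta.seconds is only the sub-day component (0..86399), so a
    # lifetime of a day or more would be under-reported (1440 minutes -> 0).
    # total_seconds() covers the whole duration; int() keeps the field an
    # integer, and the value is unchanged for lifetimes shorter than a day.
    return TokenResponse(
        access_token=access_token,
        refresh_token=refresh_token_value,
        expires_in=int(access_token_expiry.total_seconds()),
    )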