|
from fastapi import HTTPException, Depends |
|
from users.models import User |
|
from core.security import verify_password |
|
from core.security import create_access_token, create_refresh_token, get_token_payload |
|
from core.config import get_settings |
|
from auth.responses import TokenResponse |
|
from datetime import timedelta |
|
from sqlalchemy.orm import Session |
|
from core.database import get_db |
|
|
|
|
|
settings = get_settings() |
|
|
|
async def get_token(data, db:Session): |
|
user = db.query(User).filter(User.email == data.username).first() |
|
if not user: |
|
raise HTTPException(status_code=401, |
|
detail="Invalid Login Credentials", |
|
headers={"WWW-Authenticate": "Bearer"}) |
|
|
|
if not verify_password(data.password, user.password): |
|
raise HTTPException(status_code=401, |
|
detail="Invalid Login Credentials", |
|
headers={"WWW-Authenticate": "Bearer"}) |
|
|
|
_verify_user_access(user=user) |
|
|
|
return await _get_user_token(user=user) |
|
|
|
|
|
async def get_refresh_token(token: str, db): |
|
paylod = get_token_payload(token) |
|
user_id = paylod.get("id") |
|
if not user_id: |
|
raise HTTPException(status_code=400, |
|
detail="Invalid Token", |
|
headers={"WWW-Authenticate": "Bearer"} |
|
) |
|
user = db.query(User).filter(User.id == user_id).first() |
|
if not user: |
|
raise HTTPException(status_code=400, |
|
detail="Invalid Token", |
|
headers={"WWW-Authenticate": "Bearer"} |
|
) |
|
_verify_user_access(user=user) |
|
|
|
return await _get_user_token(user=user) |
|
|
|
def _verify_user_access(user: User): |
|
if not user.is_active: |
|
raise HTTPException(status_code=400, |
|
detail="User is inactive", |
|
headers={"WWW-Authenticate": "Bearer"} |
|
) |
|
return True |
|
|
|
async def _get_user_token(user: User): |
|
payload = {"id": user.id, "sub": user.email} |
|
|
|
access_token_expiry = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) |
|
access_token = await create_access_token(data=payload, expiry=access_token_expiry) |
|
|
|
|
|
refresh_token_value = await create_refresh_token(data=payload) |
|
|
|
return TokenResponse( |
|
access_token=access_token, |
|
refresh_token=refresh_token_value, |
|
expires_in=access_token_expiry.seconds |
|
) |
|
|
|
|
|
|
|
|