File size: 2,530 Bytes
998e17b 7470f85 998e17b 34d2fd7 7ffac05 998e17b 7ffac05 998e17b 5830ea5 7ffac05 998e17b 7ffac05 5830ea5 998e17b 7ffac05 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
from fastapi import HTTPException, Depends
from users.models import User
from core.security import verify_password
from core.security import create_access_token, create_refresh_token, get_token_payload
from core.config import get_settings
from auth.responses import TokenResponse
from datetime import timedelta
from sqlalchemy.orm import Session
from core.database import get_db
settings = get_settings()
async def get_token(data, db:Session):
user = db.query(User).filter(User.email == data.username).first()
if not user:
raise HTTPException(status_code=401,
detail="Invalid Login Credentials",
headers={"WWW-Authenticate": "Bearer"})
if not verify_password(data.password, user.password):
raise HTTPException(status_code=401,
detail="Invalid Login Credentials",
headers={"WWW-Authenticate": "Bearer"})
_verify_user_access(user=user)
return await _get_user_token(user=user)
async def get_refresh_token(token: str, db):
paylod = get_token_payload(token)
user_id = paylod.get("id")
if not user_id:
raise HTTPException(status_code=400,
detail="Invalid Token",
headers={"WWW-Authenticate": "Bearer"}
)
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=400,
detail="Invalid Token",
headers={"WWW-Authenticate": "Bearer"}
)
_verify_user_access(user=user)
return await _get_user_token(user=user)
def _verify_user_access(user: User):
if not user.is_active:
raise HTTPException(status_code=400,
detail="User is inactive",
headers={"WWW-Authenticate": "Bearer"}
)
return True
async def _get_user_token(user: User):
payload = {"id": user.id, "sub": user.email}
access_token_expiry = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = await create_access_token(data=payload, expiry=access_token_expiry)
# Always generate a refresh token
refresh_token_value = await create_refresh_token(data=payload)
return TokenResponse(
access_token=access_token,
refresh_token=refresh_token_value,
expires_in=access_token_expiry.seconds
)
|