File size: 2,451 Bytes
998e17b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
from fastapi import HTTPException, Depends
from users.models import User
from core.security import verify_password
from core.security import create_access_token, create_refresh_token, get_token_payload
from core.config import get_settings
from auth.responses import TokenResponse
from datetime import timedelta
from sqlalchemy.orm import Session
from core.database import get_db


settings = get_settings()

async def get_token(data, db:Session):
    user = db.query(User).filter(User.email == data.username).first()
    if not user:
        raise HTTPException(status_code=401,
                             detail="Invalid Login Credentials",
                             headers={"WWW-Authenticate": "Bearer"})
    
    if not verify_password(data.password, user.password):
        raise HTTPException(status_code=401,
                             detail="Invalid Login Credentials",
                             headers={"WWW-Authenticate": "Bearer"})
    
    _verify_user_access(user=user)

    return _get_user_token(user=user)


async def get_refresh_token(token: str, db):
    paylod = get_token_payload(token)
    user_id = paylod.get("id")
    if not user_id:
        raise HTTPException(status_code=400,
                            detail="Invalid Token",
                            headers={"WWW-Authenticate": "Bearer"}
                            )
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=400,
                            detail="Invalid Token",
                            headers={"WWW-Authenticate": "Bearer"}
                            )

def _verify_user_access(user: User):
    if not user.is_active:
        raise HTTPException(status_code=400,
                            detail="User is inactive",
                            headers={"WWW-Authenticate": "Bearer"}
                            )
    return True

async def _get_user_token(user:User, refresh_token: bool = False):
    payload = {"id": user.id, "sub": user.email}

    access_token_expiry = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

    access_token = await create_access_token(data=payload, expiry=access_token_expiry)
    if not refresh_token:
        refresh_token = await create_refresh_token(data=payload)

    return TokenResponse(
        access_token=access_token,
        refresh_token=refresh_token,
        expires_in= access_token_expiry.seconds
        )