File size: 2,690 Bytes
998e17b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7470f85
998e17b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
34d2fd7
 
 
998e17b
 
 
 
 
 
 
 
 
5830ea5
998e17b
 
 
 
5830ea5
 
 
 
 
 
998e17b
 
 
5830ea5
 
 
 
998e17b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
from fastapi import HTTPException, Depends
from users.models import User
from core.security import verify_password
from core.security import create_access_token, create_refresh_token, get_token_payload
from core.config import get_settings
from auth.responses import TokenResponse
from datetime import timedelta
from sqlalchemy.orm import Session
from core.database import get_db


# Application settings, loaded once at import time; _get_user_token reads
# ACCESS_TOKEN_EXPIRE_MINUTES from this object.
settings = get_settings()

async def get_token(data, db: Session):
    """Authenticate a user by email and password and issue a token response.

    Args:
        data: Credentials object exposing ``username`` (matched against
            ``User.email``) and ``password``.
            NOTE(review): presumably an OAuth2 password form - confirm
            against the caller.
        db: SQLAlchemy session used to look the user up.

    Returns:
        Whatever ``_get_user_token`` returns for the authenticated user. It
        is called without ``refresh_token``, so that helper's default
        applies.

    Raises:
        HTTPException: 401 when no user matches the email or the password
            does not verify. ``_verify_user_access`` may raise its own
            HTTPException for an inactive user.
    """
    account = db.query(User).filter(User.email == data.username).first()

    # Unknown email and wrong password yield the same 401 response. The
    # short-circuit keeps verify_password from running when no user was found.
    if not account or not verify_password(data.password, account.password):
        raise HTTPException(
            status_code=401,
            detail="Invalid Login Credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    _verify_user_access(user=account)

    return await _get_user_token(user=account)


async def get_refresh_token(token: str, db):
    """Exchange a token for a new token response that includes a refresh token.

    Args:
        token: Encoded token string; decoded with ``get_token_payload``.
        db: Database session used to look up the user by the ``id`` claim.

    Returns:
        The result of ``_get_user_token(user, refresh_token=True)``.

    Raises:
        HTTPException: 400 "Invalid Token" when the payload is missing or
            empty, carries no ``id`` claim, or the id matches no user.
            ``_verify_user_access`` may raise its own HTTPException for an
            inactive user.
    """
    # NOTE(review): get_token_payload's failure behaviour is not visible from
    # this module. If it returns None for an undecodable token, calling
    # ``.get`` on it would raise AttributeError and surface as a 500, so a
    # falsy payload is treated as "no claims" and rejected with the 400 below.
    payload = get_token_payload(token) or {}
    user_id = payload.get("id")

    # Only hit the database when the token actually carries an id claim.
    user = db.query(User).filter(User.id == user_id).first() if user_id else None
    if not user:
        raise HTTPException(
            status_code=400,
            detail="Invalid Token",
            headers={"WWW-Authenticate": "Bearer"},
        )

    _verify_user_access(user=user)

    return await _get_user_token(user=user, refresh_token=True)

def _verify_user_access(user: User):
    """Confirm that *user* is allowed to obtain tokens.

    Args:
        user: The user record to check; only ``is_active`` is read.

    Returns:
        ``True`` when the user is active.

    Raises:
        HTTPException: 400 "User is inactive" when ``user.is_active`` is falsy.
    """
    if user.is_active:
        return True

    raise HTTPException(
        status_code=400,
        detail="User is inactive",
        headers={"WWW-Authenticate": "Bearer"},
    )

async def _get_user_token(user: User, refresh_token: bool = False):
    """Build the token response for *user*.

    Args:
        user: The user the tokens are issued for; ``id`` and ``email`` are
            placed in the token payload as ``id`` and ``sub``.
        refresh_token: When true, a refresh token is created as well;
            otherwise the response carries ``refresh_token=None``.

    Returns:
        A ``TokenResponse`` with the access token, the optional refresh
        token, and the access-token lifetime in whole seconds.
    """
    payload = {"id": user.id, "sub": user.email}

    access_token_expiry = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = await create_access_token(data=payload, expiry=access_token_expiry)

    # The response field is either a token string or None, never the bool flag.
    refresh_token_value = (
        await create_refresh_token(data=payload) if refresh_token else None
    )

    return TokenResponse(
        access_token=access_token,
        refresh_token=refresh_token_value,
        # timedelta.seconds is only the seconds *component* (0-86399) and
        # wraps for lifetimes of a day or more; total_seconds() covers the
        # whole duration.
        expires_in=int(access_token_expiry.total_seconds()),
    )