File size: 2,101 Bytes
571122c
 
60cc4ec
571122c
 
 
 
 
 
 
 
 
60cc4ec
571122c
60cc4ec
571122c
 
60cc4ec
571122c
 
60cc4ec
571122c
 
 
60cc4ec
571122c
60cc4ec
571122c
60cc4ec
571122c
60cc4ec
571122c
60cc4ec
571122c
 
60cc4ec
 
 
 
 
 
 
 
 
bb31c03
571122c
 
60cc4ec
 
 
571122c
60cc4ec
571122c
60cc4ec
 
571122c
 
60cc4ec
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from passlib.context import CryptContext
from fastapi.security import OAuth2PasswordBearer
from fastapi import Depends, HTTPException, status
from datetime import timedelta, datetime
from jose import JWTError, jwt
from core.config import get_settings
from sqlalchemy.orm import Session
from core.database import get_db

settings = get_settings()

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token/")

def get_password_hash(password: str) -> str:
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

async def create_access_token(data: dict, expiry: timedelta) -> str:
    payload = data.copy()
    expire = datetime.utcnow() + expiry
    payload.update({"exp": expire})
    return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)

async def create_refresh_token(data: dict) -> str:
    payload = data.copy()
    return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)

def get_token_payload(token: str) -> dict:
    try:
        return jwt.decode(token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM])
    except JWTError:
        return None

async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    from users.services import get_user_by_email  # Local import to avoid circular dependency

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        payload = get_token_payload(token)
        if payload is None:
            raise credentials_exception
        email: str = payload.get("sub")
        if email is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception

    user = get_user_by_email(email, db=db)
    if user is None:
        raise credentials_exception
    return user