SnapFeast / core /security.py
Testys's picture
Push new edits
60cc4ec
raw
history blame
2.1 kB
from passlib.context import CryptContext
from fastapi.security import OAuth2PasswordBearer
from fastapi import Depends, HTTPException, status
from datetime import timedelta, datetime
from jose import JWTError, jwt
from core.config import get_settings
from sqlalchemy.orm import Session
from core.database import get_db
# Application settings object, obtained once at import time from get_settings().
settings = get_settings()
# Passlib context shared by the password helpers below; bcrypt is the only
# configured scheme, and deprecated="auto" is passed through to passlib.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 password-bearer scheme used as a FastAPI dependency; its declared
# token URL is "auth/token/".
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token/")
def get_password_hash(password: str) -> str:
    """Hash a plaintext password with the module-level ``pwd_context``.

    Args:
        password: The plaintext password to hash.

    Returns:
        The hash string produced by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored hash.

    Args:
        plain_password: The candidate plaintext password.
        hashed_password: The previously stored hash to compare against.

    Returns:
        The result of ``pwd_context.verify`` for the given pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
async def create_access_token(data: dict, expiry: timedelta) -> str:
    """Encode a signed JWT access token that expires after ``expiry``.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is not mutated.
        expiry: How long from now the token should remain valid.

    Returns:
        The encoded JWT, signed with ``settings.JWT_SECRET_KEY`` using
        ``settings.JWT_ALGORITHM``.
    """
    # Local import: the module header only imports ``datetime`` and
    # ``timedelta`` from the datetime module.
    from datetime import timezone

    payload = data.copy()
    # Use a timezone-aware "now" instead of the deprecated, naive
    # ``datetime.utcnow()``; the resulting "exp" instant is the same.
    payload["exp"] = datetime.now(timezone.utc) + expiry
    return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
async def create_refresh_token(data: dict) -> str:
    """Encode a signed JWT refresh token from the given claims.

    No ``exp`` claim is added here; the token carries only what the caller
    supplies in ``data``.

    Args:
        data: Claims to embed in the token. The dict is copied before
            encoding.

    Returns:
        The encoded JWT, signed with ``settings.JWT_SECRET_KEY`` using
        ``settings.JWT_ALGORITHM``.
    """
    claims = data.copy()
    signing_key = settings.JWT_SECRET_KEY
    token = jwt.encode(claims, signing_key, algorithm=settings.JWT_ALGORITHM)
    return token
def get_token_payload(token: str) -> dict:
    """Decode a JWT and return its claims.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded claims, or ``None`` when ``jwt.decode`` raises
        ``JWTError`` (callers must check for ``None``).
    """
    try:
        claims = jwt.decode(
            token,
            settings.JWT_SECRET_KEY,
            algorithms=[settings.JWT_ALGORITHM],
        )
    except JWTError:
        claims = None
    return claims
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    """Resolve the request's bearer token to a user, or reject with 401.

    Args:
        token: Encoded JWT supplied by the ``oauth2_scheme`` dependency.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        The value returned by ``get_user_by_email`` for the token's ``sub``
        claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            the token cannot be decoded, has no ``sub`` claim, or no user
            is found for that claim.
    """
    from users.services import get_user_by_email  # Local import to avoid circular dependency

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # get_token_payload() reports a decode failure by returning None rather
    # than raising JWTError, so no try/except is needed around it.
    payload = get_token_payload(token)
    if payload is None:
        raise credentials_exception

    email = payload.get("sub")
    if email is None:
        raise credentials_exception

    user = get_user_by_email(email, db=db)
    if user is None:
        raise credentials_exception
    return user