SnapFeast / core /security.py
Testys's picture
Update core/security.py
bb31c03 verified
raw
history blame
2.43 kB
from passlib.context import CryptContext
from fastapi.security import OAuth2PasswordBearer
from fastapi import Depends, HTTPException
from datetime import timedelta, datetime
from jose import JWTError, jwt
from core.config import get_settings
from sqlalchemy.orm import Session
from core.database import get_db
settings = get_settings()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2scheme = OAuth2PasswordBearer(tokenUrl="auth/token/")
def get_password_hash(password):
return pwd_context.hash(password)
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
async def create_access_token(data:dict, expiry:timedelta):
payload = data.copy()
expire = datetime.utcnow() + expiry
payload.update({"exp": expire})
token = jwt.encode(payload,
settings.JWT_SECRET_KEY,
algorithm=settings.JWT_ALGORITHM)
return token
async def create_refresh_token(data:dict):
payload = data.copy()
token = jwt.encode(payload,
settings.JWT_SECRET_KEY,
algorithm=settings.JWT_ALGORITHM)
return token
def get_token_payload(token:str):
try:
payload = jwt.decode(token,
settings.JWT_SECRET_KEY,
algorithms=[settings.JWT_ALGORITHM])
return payload
except JWTError:
return None
async def get_current_user(token: str = Depends(oauth2scheme), db: Session = Depends(get_db)):
from users.services import get_user_by_email # Local import
try:
payload = get_token_payload(token)
email = payload.get("sub")
if email is None:
raise HTTPException(status_code=401,
detail="Invalid Token",
headers={"WWW-Authenticate": "Bearer"})
except JWTError:
raise HTTPException(status_code=401,
detail="Invalid Token",
headers={"WWW-Authenticate": "Bearer"}
)
user = get_user_by_email(email, db=db)
if user is None:
raise HTTPException(status_code=401,
detail="User not found",
headers={"WWW-Authenticate": "Bearer"}
)
return user