from datetime import timedelta

from fastapi import Depends, HTTPException
from sqlalchemy.orm import Session

from auth.responses import TokenResponse
from core.config import get_settings
from core.database import get_db
from core.security import (
    create_access_token,
    create_refresh_token,
    get_token_payload,
    verify_password,
)
from users.models import User

settings = get_settings()


def _auth_error(status_code: int, detail: str) -> HTTPException:
    """Build an HTTPException carrying the ``WWW-Authenticate: Bearer`` header.

    A fresh headers dict is created per call so exceptions never share
    mutable state.
    """
    return HTTPException(
        status_code=status_code,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


async def get_token(data, db: Session) -> TokenResponse:
    """Authenticate by email/password and issue a new token pair.

    Args:
        data: Object exposing ``username`` (matched against ``User.email``)
            and ``password`` attributes.
        db: SQLAlchemy session used to look up the user.

    Returns:
        The ``TokenResponse`` built by ``_get_user_token``.

    Raises:
        HTTPException: 401 when no user has that email or the password does
            not verify; 400 when the user is inactive.
    """
    user = db.query(User).filter(User.email == data.username).first()

    # One guard for both failures: the response is identical on purpose, so a
    # caller cannot tell "unknown email" from "wrong password". `or`
    # short-circuits, so the password is only verified for an existing user.
    if not user or not verify_password(data.password, user.password):
        raise _auth_error(401, "Invalid Login Credentials")

    _verify_user_access(user=user)
    return await _get_user_token(user=user)


async def get_refresh_token(token: str, db) -> TokenResponse:
    """Issue a new token pair for the user identified by ``token``.

    Args:
        token: Encoded token; it is decoded with ``get_token_payload`` and the
            user is looked up by the payload's ``"id"`` entry.
        db: SQLAlchemy session used to look up the user.

    Returns:
        The ``TokenResponse`` built by ``_get_user_token``.

    Raises:
        HTTPException: 400 when the payload is missing/empty, has no ``"id"``,
            the user does not exist, or the user is inactive.
    """
    payload = get_token_payload(token)

    # NOTE(review): guards against a falsy payload in case get_token_payload
    # returns None for an undecodable token -- confirm against core.security.
    # Without the guard such a token would surface as an AttributeError (500)
    # instead of the intended 400.
    user_id = payload.get("id") if payload else None
    if not user_id:
        raise _auth_error(400, "Invalid Token")

    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise _auth_error(400, "Invalid Token")

    _verify_user_access(user=user)
    return await _get_user_token(user=user)


def _verify_user_access(user: User) -> bool:
    """Return True when ``user`` is active; raise a 400 HTTPException otherwise."""
    if not user.is_active:
        raise _auth_error(400, "User is inactive")
    return True


async def _get_user_token(user: User) -> TokenResponse:
    """Create an access token and a refresh token for ``user``.

    Both tokens carry the payload ``{"id": user.id, "sub": user.email}``. The
    access token lifetime comes from ``settings.ACCESS_TOKEN_EXPIRE_MINUTES``.

    Returns:
        A ``TokenResponse`` with both tokens and ``expires_in`` set to the
        access token lifetime in whole seconds.
    """
    payload = {"id": user.id, "sub": user.email}
    access_token_expiry = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

    access_token = await create_access_token(
        data=payload, expiry=access_token_expiry
    )
    # A refresh token is always issued alongside the access token.
    refresh_token_value = await create_refresh_token(data=payload)

    return TokenResponse(
        access_token=access_token,
        refresh_token=refresh_token_value,
        # total_seconds() rather than .seconds: .seconds drops the days
        # component, so a lifetime of 1440 minutes would be reported as 0.
        expires_in=int(access_token_expiry.total_seconds()),
    )