Spaces:
Running
Running
from uuid import UUID | |
from fastapi import APIRouter, status, Depends, HTTPException | |
from sqlalchemy.orm import Session | |
from app.api.models.user import ( | |
User, | |
UserResponse, | |
PasswordUpdate, | |
UserDeletedResponse, | |
User_GET_TOKEN, | |
Response_Token, | |
) | |
from app.core.database import get_db | |
from app.core.models.User import UserController | |
from app.utils import logger | |
router = APIRouter() | |
async def create_user(user: User, db: Session = Depends(get_db)): | |
try: | |
USER = UserController(db) | |
USER.create( | |
username=user.username, | |
email=user.email, | |
password=user.password, | |
) | |
return UserResponse.model_validate(USER.details()) | |
except HTTPException as exc: | |
logger.error(exc) | |
raise exc | |
except Exception as exc: | |
logger.error(exc) | |
raise HTTPException( | |
status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc) | |
) from exc | |
async def get_user_token(user: User_GET_TOKEN, db: Session = Depends(get_db)): | |
try: | |
USER = UserController(db) | |
token = USER.read_token(user.email, user.password) | |
return {"token": token} | |
except HTTPException as exc: | |
logger.error(exc) | |
raise exc | |
except Exception as exc: | |
logger.error(exc) | |
raise HTTPException( | |
status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc) | |
) from exc | |
async def read_user(user_id: UUID, db: Session = Depends(get_db)): | |
try: | |
USER = UserController(db) | |
USER.read(user_id) | |
return UserResponse.model_validate(USER.details()) | |
except HTTPException as exc: | |
logger.error(exc) | |
raise exc | |
except Exception as exc: | |
logger.error(exc) | |
raise HTTPException( | |
status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc) | |
) from exc | |
async def update_password( | |
user_id: UUID, password: PasswordUpdate, db: Session = Depends(get_db) | |
): | |
try: | |
USER = UserController(db) | |
USER.update_password(user_id, password.current_password, password.new_password) | |
return UserResponse.model_validate(USER.details()) | |
except HTTPException as exc: | |
logger.error(exc) | |
raise exc | |
except Exception as exc: | |
logger.error(exc) | |
raise HTTPException( | |
status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc) | |
) from exc | |
async def delete_user(user_id: UUID, db: Session = Depends(get_db)): | |
try: | |
USER = UserController(db) | |
USER.delete(user_id) | |
return {"detail": "User Deleted"} | |
except HTTPException as exc: | |
logger.error(exc) | |
raise exc | |
except Exception as exc: | |
logger.error(exc) | |
raise HTTPException( | |
status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc) | |
) from exc | |