# pulse-be/routers/profile.py
# Last commit 3530bc5 (khawir): re-enabled camera exist and vector exist checks
from datetime import datetime
from typing import Annotated
from fastapi import Depends, APIRouter
from sqlmodel import Session
from models import User, UserPublicMe, UserPublicMeWith, UserUpdateMe
from models import Site, SiteCreate, SitePublicMe, SitePublicMeWith, SiteUpdate
from models import Guest, GuestCreateMe, GuestPublicMe, GuestPublicWith, GuestUpdateMe, SitePublicMeWithHosts
from core import crud, utils
# Router for the authenticated user's own resources; every route declared on
# it is served under the "/dashboard" prefix and tagged "dashboard".
router = APIRouter(prefix="/dashboard", tags=["dashboard"])
# @router.get("/", response_model=UserPublicMeWith)
# def read_me(current_user: Annotated[User, Depends(crud.get_current_active_user)]):
# return current_user
@router.get("/", response_model=UserPublicMeWith)
def read_dashboard(
    *,
    session: Session = Depends(utils.get_session),
    current_user: Annotated[User, Depends(crud.get_current_active_user)],
):
    """Return the authenticated user's own record."""
    me = current_user
    # Attach the user to this request's session before it is serialized --
    # presumably so the relationships exposed by UserPublicMeWith can be
    # loaded (TODO confirm against the model definitions).
    session.add(me)
    return me
@router.patch("/", response_model=UserPublicMe)
def update_dashboard(
    *,
    session: Session = Depends(utils.get_session),
    current_user: Annotated[User, Depends(crud.get_current_active_user)],
    user: UserUpdateMe,
):
    """Apply the submitted changes to the authenticated user's own record."""
    # The update itself is delegated to crud.edit_user; whatever it returns
    # is handed back unchanged and serialized as UserPublicMe.
    updated_user = crud.edit_user(session, current_user, user)
    return updated_user
@router.delete("/")
def delete_dashboard(
    *,
    session: Session = Depends(utils.get_session),
    current_user: Annotated[User, Depends(crud.get_current_active_user)],
):
    """Delete the authenticated user's own record and acknowledge it."""
    account = current_user
    session.delete(account)
    session.commit()
    # Fixed acknowledgement payload, sent once the delete has been committed.
    acknowledgement = {"ok": True}
    return acknowledgement
# --------------------
# ------ Sites -------
# --------------------
@router.post("/sites", response_model=SitePublicMe)
def create_site(
    *,
    session: Session = Depends(utils.get_session),
    current_user: Annotated[User, Depends(crud.get_current_active_user)],
    site: SiteCreate,
):
    """Create a site from the request body and link it to the current user.

    The payload is first passed to ``crud.camera_exists`` (defined elsewhere;
    presumably it rejects the request when the camera check fails -- confirm).
    The new row is stored through ``crud.push_site`` and then appended to the
    user's ``sites`` before committing.
    """
    crud.camera_exists(session, site)

    # Read the clock once so that created_at and updated_at are exactly equal
    # on a freshly created site (two separate now() calls can differ by
    # microseconds).
    now = datetime.now()
    timestamps = {"created_at": now, "updated_at": now}

    db_site = Site.model_validate(site, update=timestamps)
    db_site = crud.push_site(session, db_site)

    # Make sure the user is attached to this session before touching the
    # relationship, then persist the user <-> site link.
    session.add(current_user)
    current_user.sites.append(db_site)
    session.commit()
    return db_site
@router.get("/sites/{site_id}", response_model=SitePublicMeWith)
def read_site(
    *,
    session: Session = Depends(utils.get_session),
    current_user: Annotated[User, Depends(crud.get_current_active_user)],
    site_id: int,
):
    """Return one of the current user's sites, looked up by ``site_id``."""
    # NOTE: an earlier version refreshed the site's in_url / out_url here
    # (utils.get_key + utils.get_feed_url for in_camera and out_camera, then
    # add + commit). That step is currently disabled, so the site is returned
    # exactly as crud.get_current_site provides it.
    requested_site = crud.get_current_site(session, current_user, site_id)
    return requested_site
@router.patch("/sites/{site_id}", response_model=SitePublicMe)
def update_site(
    *,
    session: Session = Depends(utils.get_session),
    current_user: Annotated[User, Depends(crud.get_current_active_user)],
    site_id: int,
    site: SiteUpdate,
):
    """Partially update one of the current user's sites.

    Only the fields the client actually sent are applied; ``updated_at`` is
    always set to the current time. The result of ``crud.push_site`` is
    returned.
    """
    stored_site = crud.get_current_site(session, current_user, site_id)

    # NOTE(review): the camera check receives the stored site as it was
    # *before* the patch is applied, not the incoming payload (create_site
    # checks the payload) -- confirm this is intended.
    crud.camera_exists(session, stored_site)

    # exclude_unset keeps omitted fields out of the patch so they are not
    # overwritten with defaults.
    changes = site.model_dump(exclude_unset=True)
    stored_site.sqlmodel_update(changes, update={"updated_at": datetime.now()})

    saved_site = crud.push_site(session, stored_site)
    return saved_site
@router.delete("/sites/{site_id}")
def delete_site(
    *,
    session: Session = Depends(utils.get_session),
    current_user: Annotated[User, Depends(crud.get_current_active_user)],
    site_id: int,
):
    """Delete one of the current user's sites and acknowledge it."""
    # The lookup goes through crud.get_current_site, which receives the
    # current user; presumably it refuses sites the user does not own
    # (defined elsewhere -- confirm).
    doomed_site = crud.get_current_site(session, current_user, site_id)
    session.delete(doomed_site)
    session.commit()
    acknowledgement = {"ok": True}
    return acknowledgement
# --------------------
# ------ Hosts -------
# --------------------
@router.post("/sites/{site_id}/hosts", response_model=GuestPublicMe)
def create_host(
    *,
    session: Session = Depends(utils.get_session),
    current_user: Annotated[User, Depends(crud.get_current_active_user)],
    site_id: int,
    host: GuestCreateMe,
):
    """Create a host (stored as a ``Guest`` row) under one of the user's sites.

    The path placeholder must be spelled ``site_id`` so that it binds to the
    ``site_id`` parameter. With the previous ``{side_id}`` spelling the path
    segment was ignored and ``site_id`` had to be supplied as a query
    parameter instead.
    """
    # Called only for its side effect: the site is looked up for the current
    # user before anything is written (presumably this raises when the site
    # is missing or not the user's -- confirm in crud.get_current_site).
    crud.get_current_site(session, current_user, site_id)

    # The vector check only runs when the client supplied a vector.
    if host.vector is not None:
        crud.vector_exists(session, host)

    db_host = Guest.model_validate(host, update={"site_id": site_id})
    session.add(db_host)
    session.commit()
    # Reload so database-generated fields are populated in the response.
    session.refresh(db_host)
    return db_host
@router.get("/sites/{site_id}/hosts", response_model=SitePublicMeWithHosts)
def read_hosts(
    *,
    session: Session = Depends(utils.get_session),
    current_user: Annotated[User, Depends(crud.get_current_active_user)],
    site_id: int,
):
    """Return one of the current user's sites, serialized with its hosts."""
    # The site object itself is returned; the SitePublicMeWithHosts response
    # model decides which of its fields end up in the payload.
    site_with_hosts = crud.get_current_site(session, current_user, site_id)
    return site_with_hosts
@router.get("/sites/{site_id}/hosts/{host_id}", response_model=GuestPublicWith)
def read_host(
    *,
    session: Session = Depends(utils.get_session),
    current_user: Annotated[User, Depends(crud.get_current_active_user)],
    site_id: int,
    host_id: int,
):
    """Return a single host of one of the current user's sites."""
    # Two-step lookup: resolve the site for the current user first, then
    # resolve the host within that site.
    parent_site = crud.get_current_site(session, current_user, site_id)
    requested_host = crud.get_host_of_site(session, parent_site, host_id)
    return requested_host
@router.patch("/sites/{site_id}/hosts/{host_id}", response_model=GuestPublicMe)
def update_host(
    *,
    session: Session = Depends(utils.get_session),
    current_user: Annotated[User, Depends(crud.get_current_active_user)],
    site_id: int,
    host_id: int,
    host: GuestUpdateMe,
):
    """Partially update a host of one of the current user's sites.

    Only the fields the client sent are applied. ``site_id`` is re-set from
    the path and ``updated_at`` is set to the current time on every call.
    """
    parent_site = crud.get_current_site(session, current_user, site_id)
    stored_host = crud.get_host_of_site(session, parent_site, host_id)

    # The vector check only runs when the patch carries a vector.
    if host.vector is not None:
        crud.vector_exists(session, host)

    changes = host.model_dump(exclude_unset=True)
    forced_fields = {
        "site_id": site_id,
        "updated_at": datetime.now(),
    }
    stored_host.sqlmodel_update(changes, update=forced_fields)

    session.add(stored_host)
    session.commit()
    # Reload the row so the response reflects what was actually stored.
    session.refresh(stored_host)
    return stored_host
@router.delete("/sites/{site_id}/hosts/{host_id}")
def delete_host(
    *,
    session: Session = Depends(utils.get_session),
    current_user: Annotated[User, Depends(crud.get_current_active_user)],
    site_id: int,
    host_id: int,
):
    """Delete a host of one of the current user's sites and acknowledge it."""
    # Resolve the site for the current user first, then the host within it,
    # so the delete only ever targets a host reached through that site.
    parent_site = crud.get_current_site(session, current_user, site_id)
    doomed_host = crud.get_host_of_site(session, parent_site, host_id)
    session.delete(doomed_host)
    session.commit()
    acknowledgement = {"ok": True}
    return acknowledgement