|
from datetime import datetime |
|
from typing import Annotated |
|
from fastapi import Depends, APIRouter |
|
from sqlmodel import Session |
|
from models import User, UserPublicMe, UserPublicMeWith, UserUpdateMe |
|
from models import Site, SiteCreate, SitePublicMe, SitePublicMeWith, SiteUpdate |
|
from models import Guest, GuestCreateMe, GuestPublicMe, GuestPublicWith, GuestUpdateMe, SitePublicMeWithHosts |
|
from core import crud, utils |
|
|
|
router = APIRouter( |
|
prefix="/dashboard", |
|
tags=["dashboard"], |
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.get("/", response_model=UserPublicMeWith) |
|
def read_dashboard(*, |
|
session: Session = Depends(utils.get_session), |
|
current_user: Annotated[User, Depends(crud.get_current_active_user)] |
|
): |
|
session.add(current_user) |
|
return current_user |
|
|
|
|
|
@router.patch("/", response_model=UserPublicMe) |
|
def update_dashboard(*, |
|
session: Session = Depends(utils.get_session), |
|
current_user: Annotated[User, Depends(crud.get_current_active_user)], |
|
user: UserUpdateMe |
|
): |
|
return crud.edit_user(session, current_user, user) |
|
|
|
|
|
|
|
@router.delete("/") |
|
def delete_dashboard(*, |
|
session: Session = Depends(utils.get_session), |
|
current_user: Annotated[User, Depends(crud.get_current_active_user)], |
|
): |
|
session.delete(current_user) |
|
session.commit() |
|
return {"ok": True} |
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.post("/sites", response_model=SitePublicMe) |
|
def create_site(*, |
|
session: Session = Depends(utils.get_session), |
|
current_user: Annotated[User, Depends(crud.get_current_active_user)], |
|
site: SiteCreate |
|
): |
|
crud.camera_exists(session, site) |
|
extra_data = {"created_at": datetime.now(), |
|
"updated_at": datetime.now()} |
|
db_site = Site.model_validate(site, update=extra_data) |
|
db_site = crud.push_site(session, db_site) |
|
session.add(current_user) |
|
current_user.sites.append(db_site) |
|
session.commit() |
|
return db_site |
|
|
|
|
|
|
|
@router.get("/sites/{site_id}", response_model=SitePublicMeWith) |
|
def read_site(*, |
|
session: Session = Depends(utils.get_session), |
|
current_user: Annotated[User, Depends(crud.get_current_active_user)], |
|
site_id: int |
|
): |
|
current_site = crud.get_current_site(session, current_user, site_id) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return current_site |
|
|
|
|
|
|
|
@router.patch("/sites/{site_id}", response_model=SitePublicMe) |
|
def update_site(*, |
|
session: Session = Depends(utils.get_session), |
|
current_user: Annotated[User, Depends(crud.get_current_active_user)], |
|
site_id: int, |
|
site: SiteUpdate |
|
): |
|
db_site = crud.get_current_site(session, current_user, site_id) |
|
crud.camera_exists(session, db_site) |
|
site_data = site.model_dump(exclude_unset=True) |
|
extra_data = {"updated_at": datetime.now()} |
|
db_site.sqlmodel_update(site_data, update=extra_data) |
|
return crud.push_site(session, db_site) |
|
|
|
|
|
|
|
@router.delete("/sites/{site_id}") |
|
def delete_site(*, |
|
session: Session = Depends(utils.get_session), |
|
current_user: Annotated[User, Depends(crud.get_current_active_user)], |
|
site_id: int): |
|
db_site = crud.get_current_site(session, current_user, site_id) |
|
session.delete(db_site) |
|
session.commit() |
|
return {"ok": True} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.post("/sites/{side_id}/hosts", response_model=GuestPublicMe) |
|
def create_host(*, |
|
session: Session = Depends(utils.get_session), |
|
current_user: Annotated[User, Depends(crud.get_current_active_user)], |
|
site_id: int, |
|
host: GuestCreateMe |
|
): |
|
_ = crud.get_current_site(session, current_user, site_id) |
|
if host.vector is not None: |
|
crud.vector_exists(session, host) |
|
extra_data = {"site_id": site_id} |
|
db_host = Guest.model_validate(host, update=extra_data) |
|
session.add(db_host) |
|
session.commit() |
|
session.refresh(db_host) |
|
return db_host |
|
|
|
|
|
|
|
@router.get("/sites/{site_id}/hosts", response_model=SitePublicMeWithHosts) |
|
def read_hosts(*, |
|
session: Session = Depends(utils.get_session), |
|
current_user: Annotated[User, Depends(crud.get_current_active_user)], |
|
site_id: int, |
|
): |
|
current_site = crud.get_current_site(session, current_user, site_id) |
|
return current_site |
|
|
|
|
|
|
|
@router.get("/sites/{site_id}/hosts/{host_id}", response_model=GuestPublicWith) |
|
def read_host(*, |
|
session: Session = Depends(utils.get_session), |
|
current_user: Annotated[User, Depends(crud.get_current_active_user)], |
|
site_id: int, |
|
host_id: int, |
|
): |
|
current_site = crud.get_current_site(session, current_user, site_id) |
|
return crud.get_host_of_site(session, current_site, host_id) |
|
|
|
|
|
|
|
@router.patch("/sites/{site_id}/hosts/{host_id}", response_model=GuestPublicMe) |
|
def update_host(*, |
|
session: Session = Depends(utils.get_session), |
|
current_user: Annotated[User, Depends(crud.get_current_active_user)], |
|
site_id: int, |
|
host_id: int, |
|
host: GuestUpdateMe |
|
): |
|
current_site = crud.get_current_site(session, current_user, site_id) |
|
db_host = crud.get_host_of_site(session, current_site, host_id) |
|
if host.vector is not None: |
|
crud.vector_exists(session, host) |
|
host_data = host.model_dump(exclude_unset=True) |
|
extra_data = {"site_id": site_id, |
|
"updated_at": datetime.now()} |
|
db_host.sqlmodel_update(host_data, update=extra_data) |
|
session.add(db_host) |
|
session.commit() |
|
session.refresh(db_host) |
|
return db_host |
|
|
|
|
|
@router.delete("/sites/{site_id}/hosts/{host_id}") |
|
def delete_host(*, |
|
session: Session = Depends(utils.get_session), |
|
current_user: Annotated[User, Depends(crud.get_current_active_user)], |
|
site_id: int, |
|
host_id: int |
|
): |
|
current_site = crud.get_current_site(session, current_user, site_id) |
|
db_host = crud.get_host_of_site(session, current_site, host_id) |
|
session.delete(db_host) |
|
session.commit() |
|
return {"ok": True} |