import asyncio from datetime import datetime import logging import cv2 import numpy as np from pathlib import Path import torch from starlette.middleware import Middleware from starlette.responses import StreamingResponse, Response from starlette.requests import Request from starlette.routing import Mount, Route from starlette.staticfiles import StaticFiles from starlette.templating import Jinja2Templates from sse_starlette import EventSourceResponse from asgi_htmx import HtmxMiddleware from asgi_htmx import HtmxRequest from ultralytics import YOLO from ultralytics_solutions_modified import object_counter, speed_estimation from vidgear.gears import CamGear from vidgear.gears.asyncio import WebGear from vidgear.gears.asyncio.helper import reducer from helper import draw_text, try_site, make_table_from_dict HERE = Path(__file__).parent static = StaticFiles(directory=HERE / ".vidgear/webgear/static") templates = Jinja2Templates(directory=HERE / ".vidgear/webgear/templates") EVT_STREAM_DELAY_SEC = 0.05 # second RETRY_TIMEOUT_MILSEC = 15000 # milisecond # Create and configure logger # logger = logging.getLogger(__name__).addHandler(logging.NullHandler()) logging.basicConfig( format='%(asctime)s %(name)-8s->%(module)-20s->%(funcName)-20s:%(lineno)-4s::%(levelname)-8s %(message)s', level=logging.INFO ) class DemoCase: def __init__( self, FRAME_WIDTH: int = 1280, FRAME_HEIGHT: int = 720, YOLO_VERBOSE: bool = True ): self.FRAME_WIDTH: int = FRAME_WIDTH self.FRAME_HEIGHT: int = FRAME_HEIGHT self.YOLO_VERBOSE: bool = YOLO_VERBOSE # predefined yolov8 model references self.model_dict: dict = { "y8nano": "./data/models/yolov8n.pt", "y8small": "./data/models/yolov8s.pt", "y8medium": "./data/models/yolov8m.pt", "y8large": "./data/models/yolov8l.pt", "y8huge": "./data/models/yolov8x.pt", } self.model_choice_default: str = "y8small" # predefined youtube live stream urls self.url_dict: dict = { "Peace Bridge US": "https://youtu.be/9En2186vo5g", "Peace Bridge CA": "https://youtu.be/WPMgP2C3_co", "San Marcos TX": "https://youtu.be/E8LsKcVpL5A", "4Corners Downtown": "https://youtu.be/ByED80IKdIU", "Gangnam Seoul": "https://youtu.be/JbnJAsk1zII", "Time Square NY": "https://youtu.be/UVftxDFol90", "Port Everglades-1": "https://youtu.be/67-73mgWDf0", "Port Everglades-2": "https://youtu.be/Nhuu1QsW5LI", "Port Everglades-3": "https://youtu.be/Lpm-C_Gz6yM", } self.cam_loc_default: str = "Peace Bridge US" # run time parameters that are from user input self.model_choice: str = self.model_choice_default self.cam_loc: str = self.cam_loc_default self.roi_height: str = int(FRAME_HEIGHT / 2) self.roi_thickness_half: str = 30 self.frame_reduction: int = 50 self.obj_class_id: list[int] = [2, 3, 5, 7] # define some logic flow control booleans self._is_running: bool = False self._is_tracking: bool = False # self._model_changed: bool = True # self._cam_loc_changed: bool = True # self._roi_height_changed: bool = True # self._obj_class_id_changed: bool = False self.stream0: CamGear = None self.stream1: CamGear = None self.counter = None self.speed_obj = None def load_model( self, model_choice: str = "y8small", conf_threshold: float = 0.25, iou_threshold: float = 0.7, use_FP16: bool = False, use_stream_buffer: bool = False ) -> None: """ load the YOLOv8 model of choice """ if model_choice not in self.model_dict: logging.warning( f'\"{model_choice}\" not found in the model_dict, use ' f'\"{self.model_dict[self.model_choice_default]}\" instead!' ) self.model_choice = self.model_choice_default else: self.model_choice = model_choice self.model = YOLO(f"{self.model_dict[self.model_choice]}") # push the model to GPU if available device = "cuda" if torch.cuda.is_available() else "cpu" if device == "cuda": torch.cuda.set_device(0) self.model.to(device) logging.info( f"{self.model_dict[self.model_choice]} loaded using " f"torch w GPU0" ) else: logging.info( f"{self.model_dict[self.model_choice]} loaded using CPU" ) # setup some configs self.conf_threshold: float = conf_threshold if conf_threshold > 0.0 else 0.25 # noqa self.iou_threshold: float = iou_threshold if iou_threshold > 0.0 else 0.7 # noqa self.use_FP16: bool = use_FP16 self.use_stream_buffer: bool = use_stream_buffer logging.info( f"{self.model_choice}: conf={self.conf_threshold:.2f} | " f"iou={self.iou_threshold:.2f} | FP16={self.use_FP16} | " f"stream_buffer={self.use_stream_buffer}" ) def select_cam_loc( self, cam_loc_key: str = "Peace Bridge US", cam_loc_val: str = "https://www.youtube.com/watch?v=9En2186vo5g" ) -> None: """ select camera video feed from url_dict, or set as a new url """ if (bool(cam_loc_key) is False or bool(cam_loc_val) is False): self.cam_loc = self.cam_loc_default logging.warning( f'input cam_loc_key, cam_loc_val pair invalid, use default ' f'{{{self.cam_loc_default}: ' f'{self.url_dict[self.cam_loc_default]}}}' ) elif cam_loc_key not in self.url_dict: if try_site(self.url_dict[self.cam_loc]): self.url_dict.update({cam_loc_key: cam_loc_val}) self.cam_loc = cam_loc_key logging.info( f'input cam_loc key:val pair is new and playable, add ' f'{{{cam_loc_key}:{cam_loc_val}}} into url_dict' ) else: self.cam_loc = self.cam_loc_default logging.warning( f'input cam_loc key:val pair is new but not playable, ' f'roll back to default {{{self.cam_loc_default}: ' f'{self.url_dict[self.cam_loc_default]}}}' ) self.cam_loc = self.cam_loc_default else: self.cam_loc = cam_loc_key logging.info( f'use {{{self.cam_loc}: {self.url_dict[self.cam_loc]}}} as source' ) def set_roi_height(self, roi_height: int = 360): if (roi_height < 0 or roi_height > self.FRAME_HEIGHT): self.roi_height = int(self.FRAME_HEIGHT / 2) logging.warning( f'roi_height invalid, use default {int(self.FRAME_HEIGHT / 2)}' ) else: self.roi_height = roi_height logging.info(f'roi_height is set at {self.roi_height}') def set_frame_reduction(self, frame_reduction: int = 50): if (frame_reduction < 0 or frame_reduction > 100): self.frame_reduction = 50 logging.warning( f'frame_reduction:{frame_reduction} invalid, ' f'use default value 50' ) else: self.frame_reduction = frame_reduction logging.info(f'frame_reduction is set at {self.frame_reduction}') async def frame0_producer(self): """ !!! define your original video source here !!! Yields: _type_: an image frame as a bytestring output from the producer """ while True: if self._is_running: if self.stream0 is None: # Start the stream self.stream0 = CamGear( source=self.url_dict[self.cam_loc], colorspace=None, stream_mode=True, logging=True ).start() try: # loop over frames while (self.stream0 is not None and self._is_running): frame = self.stream0.read() if frame is None: frame = (np.random.standard_normal([ self.FRAME_HEIGHT, self.FRAME_WIDTH, 3 ]) * 255).astype(np.uint8) # do something with your OpenCV frame here draw_text( img=frame, text=datetime.now().strftime("%m/%d/%Y %H:%M:%S"), pos=(int(self.FRAME_WIDTH - 400), 50), font=cv2.FONT_HERSHEY_SIMPLEX, font_scale=1, font_thickness=2, line_type=cv2.LINE_AA, text_color=(0, 255, 255), text_color_bg=(0, 0, 0), ) # reducer frame size for more performance, percentage int frame = await reducer( frame, percentage=self.frame_reduction ) # handle JPEG encoding & yield frame in byte format img_encoded = cv2.imencode(".jpg", frame)[1].tobytes() yield ( b"--frame\r\nContent-Type:video/jpeg2000\r\n\r\n" + img_encoded + b"\r\n" ) await asyncio.sleep(0.00001) if self.stream0 is not None: self.stream0.stop() self.stream0 = None self._is_running = False except asyncio.CancelledError: if self.stream0 is not None: self.stream0.stop() self.stream0 = None self._is_running = False logging.warning( "client disconneted in frame0_producer" ) frame = (np.random.standard_normal([ self.FRAME_HEIGHT, self.FRAME_WIDTH, 3 ]) * 255).astype(np.uint8) frame = await reducer( frame, percentage=self.frame_reduction ) img_encoded = cv2.imencode(".jpg", frame)[1].tobytes() logging.info( f"_is_running is {self._is_running} in frame0_producer" ) yield ( b"--frame\r\nContent-Type:video/jpeg2000\r\n\r\n" + img_encoded + b"\r\n" ) await asyncio.sleep(0.00001) else: if self._is_running is True: pass frame = (np.random.standard_normal([ self.FRAME_HEIGHT, self.FRAME_WIDTH, 3 ]) * 255).astype(np.uint8) frame = await reducer( frame, percentage=self.frame_reduction ) img_encoded = cv2.imencode(".jpg", frame)[1].tobytes() logging.info( f"_is_running is {self._is_running} in frame0_producer" ) yield ( b"--frame\r\nContent-Type:video/jpeg2000\r\n\r\n" + img_encoded + b"\r\n" ) await asyncio.sleep(0.00001) async def frame1_producer(self): """ !!! define your processed video producer here !!! Yields: _type_: an image frame as a bytestring output from the producer """ while True: if self._is_running: if self.stream1 is None: # Start the stream self.stream1 = CamGear( source=self.url_dict[self.cam_loc], colorspace=None, stream_mode=True, logging=True ).start() if self._is_tracking: if self.counter is None: # setup object counter & speed estimator region_points = [ (5, -self.roi_thickness_half + self.roi_height), (5, self.roi_thickness_half + self.roi_height), ( self.FRAME_WIDTH - 5, self.roi_thickness_half + self.roi_height ), ( self.FRAME_WIDTH - 5, -self.roi_thickness_half + self.roi_height ), ] # region_points = [ # (5, -20 + self.roi_height), # (self.FRAME_WIDTH - 5, -20 + self.roi_height), # ] self.counter = object_counter.ObjectCounter() self.counter.set_args( view_img=False, reg_pts=region_points, classes_names=self.model.names, draw_tracks=False, draw_boxes=False, draw_reg_pts=True, ) if self.speed_obj is None: # Init speed estimator line_points = [ (5, self.roi_height), (self.FRAME_WIDTH - 5, self.roi_height) ] self.speed_obj = speed_estimation.SpeedEstimator() self.speed_obj.set_args( reg_pts=line_points, names=self.model.names, view_img=False ) try: while (self.stream1 is not None and self._is_running): # read frame from provided source frame = self.stream1.read() if frame is None: frame = (np.random.standard_normal([ self.FRAME_HEIGHT, self.FRAME_WIDTH, 3 ]) * 255).astype(np.uint8) # break # do something with your OpenCV frame here draw_text( img=frame, text=datetime.now().strftime("%m/%d/%Y %H:%M:%S"), pos=(self.FRAME_WIDTH - 400, 50), font=cv2.FONT_HERSHEY_SIMPLEX, font_scale=1, font_thickness=2, line_type=cv2.LINE_AA, text_color=(0, 255, 255), text_color_bg=(0, 0, 0), ) frame_tagged = frame if self._is_tracking: # YOLOv8 tracking, persisting tracks between frames results = self.model.track( source=frame, classes=self.obj_class_id, conf=self.conf_threshold, iou=self.iou_threshold, half=self.use_FP16, stream_buffer=self.use_stream_buffer, persist=True, show=False, verbose=self.YOLO_VERBOSE ) if results[0].boxes.id is None: pass else: self.speed_obj.estimate_speed( frame_tagged, results ) self.counter.start_counting( frame_tagged, results ) # reducer frames size for performance, int percentage frame_tagged = await reducer( frame_tagged, percentage=self.frame_reduction ) # handle JPEG encoding & yield frame in byte format img_encoded = \ cv2.imencode(".jpg", frame_tagged)[1].tobytes() yield ( b"--frame\r\nContent-Type:video/jpeg2000\r\n\r\n" + img_encoded + b"\r\n" ) await asyncio.sleep(0.00001) if self.stream1 is not None: self.stream1.stop() self.stream1 = None self._is_tracking = False self._is_running = False except asyncio.CancelledError: if self.stream1 is not None: self.stream1.stop() self.stream1 = None self._is_tracking = False self._is_running = False logging.warning( "client disconnected in frame1_producer" ) frame = (np.random.standard_normal([ self.FRAME_HEIGHT, self.FRAME_WIDTH, 3 ]) * 255).astype(np.uint8) frame = await reducer( frame, percentage=self.frame_reduction ) img_encoded = cv2.imencode(".jpg", frame)[1].tobytes() logging.info( f"_is_running is {self._is_running} in frame0_producer" ) yield ( b"--frame\r\nContent-Type:video/jpeg2000\r\n\r\n" + img_encoded + b"\r\n" ) await asyncio.sleep(0.00001) else: if self._is_running is True: pass frame = (np.random.standard_normal([ self.FRAME_HEIGHT, self.FRAME_WIDTH, 3 ]) * 255).astype(np.uint8) # reducer frame size for more performance, percentage int frame = await reducer(frame, percentage=self.frame_reduction) # handle JPEG encoding & yield frame in byte format img_encoded = cv2.imencode(".jpg", frame)[1].tobytes() yield ( b"--frame\r\nContent-Type:video/jpeg2000\r\n\r\n" + img_encoded + b"\r\n" ) await asyncio.sleep(0.00001) async def custom_video_response(self, scope): """ Return a async video streaming response for `frame1_producer` generator Tip1: use BackgroundTask to handle the async cleanup https://github.com/tiangolo/fastapi/discussions/11022 Tip2: use is_disconnected to check client disconnection https://www.starlette.io/requests/#body https://github.com/encode/starlette/pull/320/files/d56c917460a1e6488e1206c428445c39854859c1 """ assert scope["type"] in ["http", "https"] await asyncio.sleep(0.00001) return StreamingResponse( content=self.frame1_producer(), media_type="multipart/x-mixed-replace; boundary=frame" ) async def models(self, request: HtmxRequest) -> Response: # assert (htmx := request.scope["htmx"]) if len(self.model_dict) == 0: template = "partials/ack.html" table_contents = ["model list unavailable!"] context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=200 ) # response.headers['Hx-Retarget'] = '#set-model-ack' else: template = "partials/yolo_models.html" table_contents = make_table_from_dict( self.model_dict, self.model_choice ) context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=200 ) await asyncio.sleep(0.001) return response async def urls(self, request: HtmxRequest) -> Response: # assert (htmx := request.scope["htmx"]) if len(self.url_dict) == 0: template = "partials/ack.html" table_contents = ["streaming url list unavailable!"] context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=200 ) # response.headers['Hx-Retarget'] = '#set-model-ack' else: template = "partials/camera_streams.html" table_contents = make_table_from_dict(self.url_dict, self.cam_loc) context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=200 ) await asyncio.sleep(0.01) return response async def geturl(self, request: HtmxRequest) -> Response: # assert (htmx := request.scope["htmx"]) if len(self.url_dict) == 0: template = "partials/ack.html" table_contents = ["streaming url list unavailable!"] context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=200 ) # response.headers['Hx-Retarget'] = '#set-model-ack' else: template = "partials/ack.html" if self.cam_loc in self.url_dict.keys(): table_contents = [f"{self.cam_loc} selected"] context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=201 ) else: table_contents = [ f"{self.cam_loc} is not in the registered url_list" ] context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=200 ) # response.headers['Hx-Retarget'] = '#set-url-ack' await asyncio.sleep(0.01) return response async def addurl(self, request: HtmxRequest) -> Response: # assert (htmx := request.scope["htmx"]) try: req_json = await request.json() except RuntimeError: template = "partials/ack.html" table_contents = ["receive channel unavailable!"] context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=200 ) response.headers['Hx-Retarget'] = '#add-url-ack' await asyncio.sleep(0.01) return response if ( "payload" in req_json and "CamLoc" in req_json["payload"] and "URL" in req_json["payload"] ): cam_loc = req_json["payload"]["CamLoc"] cam_url = req_json["payload"]["URL"] if cam_loc != "" and cam_url != "": if try_site(cam_url) is False: template = "partials/ack.html" table_contents = ["invalid video URL!"] context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=200 ) response.headers['Hx-Retarget'] = '#add-url-ack' else: self.select_cam_loc( cam_loc_key=cam_loc, cam_loc_val=cam_url ) template = "partials/camera_streams.html" table_contents = make_table_from_dict( self.url_dict, self.cam_loc ) context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=201 ) else: template = "partials/ack.html" table_contents = ["empty or invalid inputs!"] context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=200 ) response.headers['Hx-Retarget'] = '#add-url-ack' else: template = "partials/ack.html" table_contents = ["invalid POST request!"] context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=200 ) response.headers['Hx-Retarget'] = '#add-url-ack' await asyncio.sleep(0.01) return response async def seturl(self, request: HtmxRequest) -> Response: # assert (htmx := request.scope["htmx"]) template = "partials/ack.html" try: req_json = await request.json() except RuntimeError: table_contents = ["receive channel unavailable!"] context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=200 ) # response.headers['Hx-Retarget'] = '#set-url-ack' await asyncio.sleep(0.01) return response if ("payload" in req_json and "cam_url" in req_json["payload"]): logging.info( f"seturl: _is_running = {self._is_running}, " f"_is_tracking = {self._is_tracking}" ) if (self._is_running is True or self._is_tracking is True): table_contents = ["turn off streaming and tracking before \ setting a new camera stream!"] context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=200 ) # response.headers['Hx-Retarget'] = '#set-url-ack' else: cam_url = req_json["payload"]["cam_url"] url_list = list(filter( lambda x: self.url_dict[x] == cam_url, self.url_dict )) if len(url_list) > 0: self.cam_loc = url_list[0] table_contents = [f"{self.cam_loc} selected"] context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=201 ) else: table_contents = [ f"{cam_url} is not in the registered url_list" ] context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=200 ) # response.headers['Hx-Retarget'] = '#set-url-ack' else: table_contents = ["invalid POST request!"] context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=200 ) # response.headers['Hx-Retarget'] = '#set-url-ack' await asyncio.sleep(0.01) return response async def getmodel(self, request: HtmxRequest) -> Response: # assert (htmx := request.scope["htmx"]) if len(self.model_dict) == 0: template = "partials/ack.html" table_contents = ["model list unavailable!"] context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=200 ) # response.headers['Hx-Retarget'] = '#set-model-ack' else: template = "partials/ack.html" if self.model_choice in self.model_dict.keys(): table_contents = [f"{self.model_choice} selected"] context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=201 ) else: table_contents = [ f"{self.model_choice} is not in the registered model_list" ] context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=200 ) # response.headers['Hx-Retarget'] = '#set-url-ack' await asyncio.sleep(0.01) return response async def setmodel(self, request: HtmxRequest) -> Response: # assert (htmx := request.scope["htmx"]) template = "partials/ack.html" try: req_json = await request.json() except RuntimeError: table_contents = ["receive channel unavailable!"] context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=200 ) # response.headers['Hx-Retarget'] = '#set-model-ack' await asyncio.sleep(0.01) return response if ("payload" in req_json and "model_path" in req_json["payload"]): logging.info( f"setmodel: _is_running = {self._is_running}, " f"_is_tracking = {self._is_tracking}" ) if (self._is_tracking is True): table_contents = ["turn off tracking before setting a new \ YOLO model!"] context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=200 ) # response.headers['Hx-Retarget'] = '#set-model-ack' else: model_path = req_json["payload"]["model_path"] model_list = list(filter( lambda x: self.model_dict[x] == model_path, self.model_dict )) if len(model_list) > 0: self.model_choice = model_list[0] self.load_model( model_choice=self.model_choice, conf_threshold=self.conf_threshold, iou_threshold=self.iou_threshold, use_FP16=self.use_FP16, use_stream_buffer=self.use_stream_buffer ) table_contents = [f"{self.model_choice} selected"] context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=201 ) else: table_contents = [ f"{model_path} is not in the registered model_list" ] context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=200 ) # response.headers['Hx-Retarget'] = '#set-model-ack' else: table_contents = ["invalid POST request!"] context = {"request": request, "table": table_contents} response = templates.TemplateResponse( template, context, status_code=200 ) # response.headers['Hx-Retarget'] = '#set-model-ack' await asyncio.sleep(0.01) return response async def streamswitch(self, request: HtmxRequest) -> Response: # assert (htmx := request.scope["htmx"]) template = "partials/ack.html" try: req_json = await request.json() except RuntimeError: context = { "request": request, "table": ["receive channel unavailable!"] } status_code = 200 await asyncio.sleep(0.01) return templates.TemplateResponse( template, context, status_code=status_code ) if "payload" in req_json: logging.info(f"payload = {req_json['payload']}") if ( "stream_switch" in req_json["payload"] and req_json["payload"]["stream_switch"] == "on" ): self._is_running = True self._is_tracking = False table_contents = ["on"] status_code = 201 else: self._is_running = False self._is_tracking = False table_contents = ["off"] status_code = 201 else: table_contents = ["invalid POST request!"] status_code = 200 context = {"request": request, "table": table_contents} await asyncio.sleep(0.1) return templates.TemplateResponse( template, context, status_code=status_code ) async def trackingswitch(self, request: HtmxRequest) -> Response: # assert (htmx := request.scope["htmx"]) template = "partials/ack.html" try: req_json = await request.json() except RuntimeError: context = { "request": request, "table": ["receive channel unavailable!"] } status_code = 200 await asyncio.sleep(0.01) return templates.TemplateResponse( template, context, status_code=status_code ) if "payload" in req_json: logging.info(f"payload = {req_json['payload']}") if ( "tracking_switch" in req_json["payload"] and req_json["payload"]["tracking_switch"] == "on" ): self._is_tracking = True and self._is_running else: self._is_tracking = False if self._is_tracking: table_contents = ["on"] status_code = 201 # setup object counter & speed estimator region_points = [ (5, -20 + self.roi_height), (5, 20 + self.roi_height), (self.FRAME_WIDTH - 5, 20 + self.roi_height), (self.FRAME_WIDTH - 5, -20 + self.roi_height), ] self.counter = object_counter.ObjectCounter() self.counter.set_args( view_img=False, reg_pts=region_points, classes_names=self.model.names, draw_tracks=False, draw_boxes=False, draw_reg_pts=True, ) # Init speed estimator line_points = [ (5, self.roi_height), (self.FRAME_WIDTH - 5, self.roi_height) ] self.speed_obj = speed_estimation.SpeedEstimator() self.speed_obj.set_args( reg_pts=line_points, names=self.model.names, view_img=False ) else: table_contents = ["off"] status_code = 201 else: table_contents = ["invalid POST request!"] status_code = 200 context = {"request": request, "table": table_contents} await asyncio.sleep(0.1) return templates.TemplateResponse( template, context, status_code=status_code ) async def sse_incounts(self, request: Request): async def event_generator(): _stop_sse = False while True: # If client closes connection, stop sending events if await request.is_disconnected(): yield { "event": "evt_in_counts", "id": datetime.now().strftime("%m/%d/%Y %H:%M:%S"), "retry": RETRY_TIMEOUT_MILSEC, "data": "..." } break if self._is_running: if self._is_tracking: if _stop_sse is True: _stop_sse = False incounts_msg = self.counter.incounts_updated() if (self.counter is not None and incounts_msg): yield { "event": "evt_in_counts", "id": datetime.now().strftime("%m/%d/%Y %H:%M:%S"), "retry": RETRY_TIMEOUT_MILSEC, "data": f"{self.counter.in_counts}" } else: if _stop_sse is False: yield { "event": "evt_in_counts", "id": datetime.now().strftime("%m/%d/%Y %H:%M:%S"), "retry": RETRY_TIMEOUT_MILSEC, "data": "---" } _stop_sse = True await asyncio.sleep(EVT_STREAM_DELAY_SEC) return EventSourceResponse(event_generator()) async def sse_outcounts(self, request: Request): async def event_generator(): _stop_sse = False while True: # If client closes connection, stop sending events if await request.is_disconnected(): yield { "event": "evt_out_counts", "id": datetime.now().strftime("%m/%d/%Y %H:%M:%S"), "retry": RETRY_TIMEOUT_MILSEC, "data": "..." } break if self._is_running: if self._is_tracking: if _stop_sse is True: _stop_sse = False outcounts_msg = self.counter.outcounts_updated() if (self.counter is not None and outcounts_msg): yield { "event": "evt_out_counts", "id": datetime.now().strftime("%m/%d/%Y %H:%M:%S"), "retry": RETRY_TIMEOUT_MILSEC, "data": f"{self.counter.out_counts}" } else: if _stop_sse is False: yield { "event": "evt_out_counts", "id": datetime.now().strftime("%m/%d/%Y %H:%M:%S"), "retry": RETRY_TIMEOUT_MILSEC, "data": "---" } _stop_sse = True await asyncio.sleep(EVT_STREAM_DELAY_SEC) return EventSourceResponse(event_generator()) is_huggingface = False # define the host url and port for webgear server HOST_WEBGEAR, PORT_WEBGEAR = "localhost", 8080 # instantiate a demo case demo_case = DemoCase(YOLO_VERBOSE=False) demo_case.set_frame_reduction(frame_reduction=10) demo_case.load_model() # setup object counter & speed estimator region_points = [ (5, -demo_case.roi_thickness_half + demo_case.roi_height), (5, demo_case.roi_thickness_half + demo_case.roi_height), ( demo_case.FRAME_WIDTH - 5, demo_case.roi_thickness_half + demo_case.roi_height ), ( demo_case.FRAME_WIDTH - 5, -demo_case.roi_thickness_half + demo_case.roi_height ), ] demo_case.counter = object_counter.ObjectCounter() demo_case.counter.set_args( view_img=False, reg_pts=region_points, classes_names=demo_case.model.names, draw_tracks=False, draw_boxes=False, draw_reg_pts=True, ) # Init speed estimator line_points = [ (5, demo_case.roi_height), (demo_case.FRAME_WIDTH - 5, demo_case.roi_height) ] demo_case.speed_obj = speed_estimation.SpeedEstimator() demo_case.speed_obj.set_args( reg_pts=line_points, names=demo_case.model.names, view_img=False ) logging.info([f"{x}" for x in list(demo_case.url_dict.keys())]) logging.info([f"{x}" for x in list(demo_case.model_dict.keys())]) # setup webgear server options = { "custom_data_location": "./", "enable_infinite_frames": True, # "jpeg_compression_quality": 90, "jpeg_compression_fastdct": True, "jpeg_compression_fastupsample": True, } # demo_case.stream0 = CamGear( # source=demo_case.url_dict[demo_case.cam_loc], # colorspace=None, # stream_mode=True, # logging=True # ).start() # if demo_case.stream0 is None: # sys.exit("stream unaviable") web = WebGear( logging=True, **options ) # config webgear server web.config["generator"] = demo_case.frame0_producer web.config["middleware"] = [Middleware(HtmxMiddleware)] web.routes.append(Mount("/static", static, name="static")) web.routes.append( Route("/video1", endpoint=demo_case.custom_video_response) ) routes_dict = { "models": (demo_case.models, ["GET"]), "getmodel": (demo_case.getmodel, ["GET"]), "setmodel": (demo_case.setmodel, ["POST"]), "urls": (demo_case.urls, ["GET"]), "addurl": (demo_case.addurl, ["POST"]), "geturl": (demo_case.geturl, ["GET"]), "seturl": (demo_case.seturl, ["POST"]), "streamswitch": (demo_case.streamswitch, ["POST"]), "trackingswitch": (demo_case.trackingswitch, ["POST"]), } for k, v in routes_dict.items(): web.routes.append( Route(path=f"/{k}", endpoint=v[0], name=k, methods=v[1]) ) web.routes.append(Route( path="/sseincounts", endpoint=demo_case.sse_incounts, name="sseincounts" )) web.routes.append(Route( path="/sseoutcounts", endpoint=demo_case.sse_outcounts, name="sseoutcounts" )) # if is_huggingface is False: # # run this app on Uvicorn server at address http://localhost:8080/ # uvicorn.run( # web(), host=HOST_WEBGEAR, port=PORT_WEBGEAR, log_level="info" # ) # # close app safely # web.shutdown()