from flask import Flask, Response, stream_with_context, request import cv2 from cv2.typing import MatLike import threading import time import os import psutil class Session: def __init__(self, id: str): self.id = id self.latest_frame: MatLike = None self.published_frame: MatLike = None self.measured_fps = 0 self.cpu_usage = 0 self.show_stats = True app = Flask(__name__) PORT = int(os.environ.get("PORT", 5000)) sessions: dict[str, Session] = {} def draw_stats(frame: MatLike, session: Session): font = cv2.FONT_HERSHEY_SIMPLEX font_scale = 1 thickness = 2 color = (0, 255, 0) text = f""" ID: {session.id} FPS: {session.measured_fps:.0f} CPU: {session.cpu_usage:02.0f}% """ y0, dy = 30, 36 for i, line in enumerate(text.split("\n")): (text_width, text_height), _ = cv2.getTextSize(line, font, font_scale, thickness) x = frame.shape[1] - text_width - 10 y = y0 + (i - 1) * dy cv2.putText(frame, line, (x, y), font, font_scale, color, thickness, cv2.LINE_AA) def process_frame(session_id: str, video_path: str): global sessions session: Session = sessions.get(session_id, None) if session is not None: cap = cv2.VideoCapture(video_path) framerate = cap.get(cv2.CAP_PROP_FPS) frame_duration = 1 / framerate # variables to measure FPS frame_count = 0 last_time = time.time() rolling_duration = 0.5 # variables to control framerate next_frame_time = time.time() while True: if session_id not in sessions: break # control framerate if time.time() >= next_frame_time: ret, frame = cap.read() # measure FPS frame_count += 1 now = time.time() if now - last_time >= rolling_duration: session.measured_fps = frame_count / (now - last_time) session.cpu_usage = psutil.cpu_percent(interval=None) frame_count = 0 last_time = now if not ret: cap.set(cv2.CAP_PROP_POS_FRAMES, 0) continue else: if session.show_stats: draw_stats(frame, session) session.latest_frame = frame next_frame_time += frame_duration else: time.sleep(0.01) def stream_mjpeg(session_id: str): global sessions session: Session = sessions.get(session_id, None) if session is not None: while True: if session.published_frame is session.latest_frame: time.sleep(0.01) else: if session.latest_frame is None: continue encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 50] ret, jpeg = cv2.imencode(".jpg", session.latest_frame, encode_param) frame_bytes = jpeg.tobytes() if ret else None if frame_bytes is not None: session.published_frame = session.latest_frame yield (b"--frame\r\n"b"Content-Type: image/jpeg\r\n\r\n" + frame_bytes + b"\r\n") @app.get("/") def get_video(): global sessions session_id = request.args.get("video_index", "0") show_stats = request.args.get("show_stats", "1") == "1" video_index = int(request.args.get("video_index", 0)) % 4 video_files = sorted([f for f in os.listdir("videos") if f.lower().endswith((".mp4", ".avi", ".mov"))]) video_path = os.path.join("videos", video_files[int(video_index)]) if session_id not in sessions: session = Session(session_id) sessions[session_id] = session threading.Thread(target=process_frame, args=[session_id, video_path], daemon=True).start() else: session = sessions[session_id] session.show_stats = show_stats return Response(stream_with_context(stream_mjpeg(session_id)), mimetype="multipart/x-mixed-replace; boundary=frame") @app.get("/sessions") def get_sessions(): global sessions return { "sessions": [session.id for session in sessions.values()] } @app.get("/threads") def get_threads(): threads = threading.enumerate() return { "threads_count": len(threads) } @app.get("/killall") def kill_all(): global sessions sessions.clear() for thread in threading.enumerate(): if thread is not threading.current_thread(): thread.join(0.1) return {"status": "all sessions cleared and threads joined"} if __name__ == "__main__": app.run(host="0.0.0.0", port=PORT, threaded=True)