Spaces:
Sleeping
Sleeping
from flask import Flask, Response, stream_with_context, request | |
import cv2 | |
from cv2.typing import MatLike | |
import threading | |
import time | |
import os | |
import psutil | |
class Session: | |
def __init__(self, id: str): | |
self.id = id | |
self.latest_frame: MatLike = None | |
self.published_frame: MatLike = None | |
self.measured_fps = 0 | |
self.cpu_usage = 0 | |
self.show_stats = True | |
app = Flask(__name__) | |
PORT = int(os.environ.get("PORT", 5000)) | |
sessions: dict[str, Session] = {} | |
def draw_stats(frame: MatLike, session: Session): | |
font = cv2.FONT_HERSHEY_SIMPLEX | |
font_scale = 1 | |
thickness = 2 | |
color = (0, 255, 0) | |
text = f""" | |
ID: {session.id} | |
FPS: {session.measured_fps:.0f} | |
CPU: {session.cpu_usage:02.0f}% | |
""" | |
y0, dy = 30, 36 | |
for i, line in enumerate(text.split("\n")): | |
(text_width, text_height), _ = cv2.getTextSize(line, font, font_scale, thickness) | |
x = frame.shape[1] - text_width - 10 | |
y = y0 + (i - 1) * dy | |
cv2.putText(frame, line, (x, y), font, font_scale, color, thickness, cv2.LINE_AA) | |
def process_frame(session_id: str, video_path: str): | |
global sessions | |
session: Session = sessions.get(session_id, None) | |
if session is not None: | |
cap = cv2.VideoCapture(video_path) | |
framerate = cap.get(cv2.CAP_PROP_FPS) | |
frame_duration = 1 / framerate | |
# variables to measure FPS | |
frame_count = 0 | |
last_time = time.time() | |
rolling_duration = 0.5 | |
# variables to control framerate | |
next_frame_time = time.time() | |
while True: | |
if session_id not in sessions: | |
break | |
# control framerate | |
if time.time() >= next_frame_time: | |
ret, frame = cap.read() | |
# measure FPS | |
frame_count += 1 | |
now = time.time() | |
if now - last_time >= rolling_duration: | |
session.measured_fps = frame_count / (now - last_time) | |
session.cpu_usage = psutil.cpu_percent(interval=None) | |
frame_count = 0 | |
last_time = now | |
if not ret: | |
cap.set(cv2.CAP_PROP_POS_FRAMES, 0) | |
continue | |
else: | |
if session.show_stats: | |
draw_stats(frame, session) | |
session.latest_frame = frame | |
next_frame_time += frame_duration | |
else: | |
time.sleep(0.01) | |
def stream_mjpeg(session_id: str): | |
global sessions | |
session: Session = sessions.get(session_id, None) | |
if session is not None: | |
while True: | |
if session.published_frame is session.latest_frame: | |
time.sleep(0.01) | |
else: | |
if session.latest_frame is None: | |
continue | |
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 50] | |
ret, jpeg = cv2.imencode(".jpg", session.latest_frame, encode_param) | |
frame_bytes = jpeg.tobytes() if ret else None | |
if frame_bytes is not None: | |
session.published_frame = session.latest_frame | |
yield (b"--frame\r\n"b"Content-Type: image/jpeg\r\n\r\n" + frame_bytes + b"\r\n") | |
def get_video(): | |
global sessions | |
session_id = request.args.get("video_index", "0") | |
show_stats = request.args.get("show_stats", "1") == "1" | |
video_index = int(request.args.get("video_index", 0)) % 4 | |
video_files = sorted([f for f in os.listdir("videos") if f.lower().endswith((".mp4", ".avi", ".mov"))]) | |
video_path = os.path.join("videos", video_files[int(video_index)]) | |
if session_id not in sessions: | |
session = Session(session_id) | |
sessions[session_id] = session | |
threading.Thread(target=process_frame, args=[session_id, video_path], daemon=True).start() | |
else: | |
session = sessions[session_id] | |
session.show_stats = show_stats | |
return Response(stream_with_context(stream_mjpeg(session_id)), mimetype="multipart/x-mixed-replace; boundary=frame") | |
def get_sessions(): | |
global sessions | |
return { | |
"sessions": [session.id for session in sessions.values()] | |
} | |
def get_threads(): | |
threads = threading.enumerate() | |
return { | |
"threads_count": len(threads) | |
} | |
def kill_all(): | |
global sessions | |
sessions.clear() | |
for thread in threading.enumerate(): | |
if thread is not threading.current_thread(): | |
thread.join(0.1) | |
return {"status": "all sessions cleared and threads joined"} | |
if __name__ == "__main__": | |
app.run(host="0.0.0.0", port=PORT, threaded=True) |