mock-video-feed / app.py
jukrapopk's picture
allow client to select video index
018bd02
from flask import Flask, Response, stream_with_context, request
import cv2
from cv2.typing import MatLike
import threading
import time
import os
import psutil
class Session:
def __init__(self, id: str):
self.id = id
self.latest_frame: MatLike = None
self.published_frame: MatLike = None
self.measured_fps = 0
self.cpu_usage = 0
self.show_stats = True
app = Flask(__name__)
PORT = int(os.environ.get("PORT", 5000))
sessions: dict[str, Session] = {}
def draw_stats(frame: MatLike, session: Session):
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 1
thickness = 2
color = (0, 255, 0)
text = f"""
ID: {session.id}
FPS: {session.measured_fps:.0f}
CPU: {session.cpu_usage:02.0f}%
"""
y0, dy = 30, 36
for i, line in enumerate(text.split("\n")):
(text_width, text_height), _ = cv2.getTextSize(line, font, font_scale, thickness)
x = frame.shape[1] - text_width - 10
y = y0 + (i - 1) * dy
cv2.putText(frame, line, (x, y), font, font_scale, color, thickness, cv2.LINE_AA)
def process_frame(session_id: str, video_path: str):
global sessions
session: Session = sessions.get(session_id, None)
if session is not None:
cap = cv2.VideoCapture(video_path)
framerate = cap.get(cv2.CAP_PROP_FPS)
frame_duration = 1 / framerate
# variables to measure FPS
frame_count = 0
last_time = time.time()
rolling_duration = 0.5
# variables to control framerate
next_frame_time = time.time()
while True:
if session_id not in sessions:
break
# control framerate
if time.time() >= next_frame_time:
ret, frame = cap.read()
# measure FPS
frame_count += 1
now = time.time()
if now - last_time >= rolling_duration:
session.measured_fps = frame_count / (now - last_time)
session.cpu_usage = psutil.cpu_percent(interval=None)
frame_count = 0
last_time = now
if not ret:
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
continue
else:
if session.show_stats:
draw_stats(frame, session)
session.latest_frame = frame
next_frame_time += frame_duration
else:
time.sleep(0.01)
def stream_mjpeg(session_id: str):
global sessions
session: Session = sessions.get(session_id, None)
if session is not None:
while True:
if session.published_frame is session.latest_frame:
time.sleep(0.01)
else:
if session.latest_frame is None:
continue
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 50]
ret, jpeg = cv2.imencode(".jpg", session.latest_frame, encode_param)
frame_bytes = jpeg.tobytes() if ret else None
if frame_bytes is not None:
session.published_frame = session.latest_frame
yield (b"--frame\r\n"b"Content-Type: image/jpeg\r\n\r\n" + frame_bytes + b"\r\n")
@app.get("/")
def get_video():
global sessions
session_id = request.args.get("video_index", "0")
show_stats = request.args.get("show_stats", "1") == "1"
video_index = int(request.args.get("video_index", 0)) % 4
video_files = sorted([f for f in os.listdir("videos") if f.lower().endswith((".mp4", ".avi", ".mov"))])
video_path = os.path.join("videos", video_files[int(video_index)])
if session_id not in sessions:
session = Session(session_id)
sessions[session_id] = session
threading.Thread(target=process_frame, args=[session_id, video_path], daemon=True).start()
else:
session = sessions[session_id]
session.show_stats = show_stats
return Response(stream_with_context(stream_mjpeg(session_id)), mimetype="multipart/x-mixed-replace; boundary=frame")
@app.get("/sessions")
def get_sessions():
global sessions
return {
"sessions": [session.id for session in sessions.values()]
}
@app.get("/threads")
def get_threads():
threads = threading.enumerate()
return {
"threads_count": len(threads)
}
@app.get("/killall")
def kill_all():
global sessions
sessions.clear()
for thread in threading.enumerate():
if thread is not threading.current_thread():
thread.join(0.1)
return {"status": "all sessions cleared and threads joined"}
if __name__ == "__main__":
app.run(host="0.0.0.0", port=PORT, threaded=True)