# Chaplin — live visual speech recognition (lip reading) Gradio Space.
import gradio as gr | |
import cv2 | |
import torch | |
from pipelines.pipeline import InferencePipeline | |
import time | |
from huggingface_hub import hf_hub_download | |
import os | |
class ChaplinGradio:
    """Live visual speech recognition (lip reading) served through Gradio.

    Buffers incoming webcam frames, writes each ~2-second chunk to a
    temporary video file, and runs the VSR ``InferencePipeline`` on it.
    ``process_frame`` is the Gradio callback; the other methods handle
    one-time model download/loading.
    """

    def __init__(self):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.vsr_model = None
        self.download_models()
        self.load_models()

        # Video params
        self.fps = 16
        self.frame_interval = 1 / self.fps
        self.frame_compression = 25
        self.last_frame_time = time.time()

        # Frame buffer
        self.frame_buffer = []
        self.min_frames = 32  # 2 seconds of video at 16 fps
        self.last_prediction = ""

        print(f"Initialized with device: {self.device}, fps: {self.fps}, min_frames: {self.min_frames}")

    def download_models(self):
        """Download required model files from HuggingFace."""
        vsr_dir = "benchmarks/LRS3/models/LRS3_V_WER19.1"
        lm_dir = "benchmarks/LRS3/language_models/lm_en_subword"

        # Create directories if they don't exist
        os.makedirs(vsr_dir, exist_ok=True)
        os.makedirs(lm_dir, exist_ok=True)

        # Each model needs its weights (.pth) and config (.json).
        # Data-driven loop replaces four copy-pasted download calls.
        downloads = [
            ("willwade/LRS3_V_WER19.1", "model.pth", vsr_dir),
            ("willwade/LRS3_V_WER19.1", "model.json", vsr_dir),
            ("willwade/lm_en_subword", "model.pth", lm_dir),
            ("willwade/lm_en_subword", "model.json", lm_dir),
        ]
        for repo_id, filename, local_dir in downloads:
            hf_hub_download(repo_id=repo_id,
                            filename=filename,
                            local_dir=local_dir)

        print("Models downloaded successfully!")

    def load_models(self):
        """Load models using the InferencePipeline with LRS3 config."""
        config_path = "configs/LRS3_V_WER19.1.ini"
        self.vsr_model = InferencePipeline(
            config_path,
            device=self.device,
            detector="mediapipe",
            face_track=True
        )
        print("Model loaded successfully!")

    def process_frame(self, frame):
        """Process one webcam frame, buffering until a chunk is ready.

        Returns a ``(prediction_text, debug_log)`` pair for the two
        Gradio output textboxes.
        """
        current_time = time.time()
        debug_log = []  # List to collect debug messages

        # Add initial debug info
        debug_log.append(f"Current time: {current_time}")
        debug_log.append(f"Last prediction: {self.last_prediction}")

        # Throttle to the target fps: drop frames that arrive too quickly.
        if current_time - self.last_frame_time < self.frame_interval:
            debug_log.append("Skipping frame - too soon")
            return self.last_prediction, "\n".join(debug_log)
        self.last_frame_time = current_time

        if frame is None:
            debug_log.append("Received None frame")
            return "No video input detected", "\n".join(debug_log)

        try:
            debug_log.append(f"Received frame with shape: {frame.shape}")

            # Convert frame to grayscale if it's not already
            # (Gradio webcam frames arrive as RGB — TODO confirm channel order.)
            if len(frame.shape) == 3:
                frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
                debug_log.append("Converted frame to grayscale")

            # Add frame to buffer
            self.frame_buffer.append(frame)
            debug_log.append(f"Buffer size now: {len(self.frame_buffer)}/{self.min_frames}")

            # Not enough frames yet — keep buffering (guard clause).
            if len(self.frame_buffer) < self.min_frames:
                return (self.last_prediction or "Waiting for speech..."), "\n".join(debug_log)

            debug_log.append("Processing buffer - have enough frames")

            # Create temp directory if it doesn't exist
            os.makedirs("temp", exist_ok=True)
            temp_video = f"temp/frames_{time.time_ns()}.mp4"
            debug_log.append(f"Created temp video path: {temp_video}")

            # From here on, the temp file may exist on disk: guarantee
            # cleanup for every exit path (fix: file was previously only
            # removed when inference was reached).
            try:
                # Get frame dimensions from first frame
                frame_height, frame_width = self.frame_buffer[0].shape[:2]
                debug_log.append(f"Video dimensions: {frame_width}x{frame_height}")

                out = cv2.VideoWriter(
                    temp_video,
                    cv2.VideoWriter_fourcc(*'mp4v'),
                    self.fps,
                    (frame_width, frame_height),
                    False  # isColor: frames are grayscale
                )
                try:
                    n_written = 0
                    for f in self.frame_buffer:
                        out.write(f)
                        n_written += 1
                finally:
                    # Always release the writer, even if a write raised
                    # (fix: previous code leaked the writer on error).
                    out.release()
                debug_log.append(f"Wrote {n_written} frames to video")

                # Verify video was created
                if not os.path.exists(temp_video):
                    debug_log.append("Error: Video file was not created!")
                else:
                    debug_log.append(f"Video file created successfully, size: {os.path.getsize(temp_video)} bytes")

                # Clear buffer but keep last few frames for continuity
                self.frame_buffer = self.frame_buffer[-8:]  # Keep last 0.5 seconds
                debug_log.append(f"Cleared buffer, kept {len(self.frame_buffer)} frames")

                try:
                    # Process the video file using the pipeline
                    debug_log.append("Starting model inference...")
                    predicted_text = self.vsr_model(temp_video)
                    debug_log.append(f"Raw model prediction: '{predicted_text}'")

                    if predicted_text:
                        self.last_prediction = predicted_text
                        debug_log.append(f"Updated last prediction to: '{self.last_prediction}'")
                    else:
                        debug_log.append("Model returned empty prediction")

                    return (self.last_prediction or "Waiting for speech..."), "\n".join(debug_log)
                except Exception as e:
                    error_msg = f"Error during inference: {str(e)}"
                    debug_log.append(error_msg)
                    import traceback
                    debug_log.append(f"Full error: {traceback.format_exc()}")
                    return f"Error processing frames: {str(e)}", "\n".join(debug_log)
            finally:
                # Clean up temp file
                if os.path.exists(temp_video):
                    os.remove(temp_video)
                    debug_log.append("Cleaned up temp video file")
                else:
                    debug_log.append("No temp file to clean up")
        except Exception as e:
            error_msg = f"Error processing: {str(e)}"
            debug_log.append(error_msg)
            import traceback
            debug_log.append(f"Full error: {traceback.format_exc()}")
            return f"Error processing: {str(e)}", "\n".join(debug_log)
# Build the single shared model/pipeline instance used by every request.
chaplin = ChaplinGradio()

# Assemble the UI components separately, then wire them into the interface.
webcam_input = gr.Image(sources=["webcam"], streaming=True)
output_boxes = [
    gr.Textbox(label="Predicted Text", interactive=False),
    gr.Textbox(label="Debug Log", interactive=False),
]

iface = gr.Interface(
    fn=chaplin.process_frame,
    inputs=webcam_input,
    outputs=output_boxes,
    title="Chaplin - Live Visual Speech Recognition",
    description="Speak clearly into the webcam. The model will process your speech in ~2 second chunks.",
    live=True,
)

if __name__ == "__main__":
    iface.launch()