import gradio as gr
import cv2
import torch
from pipelines.pipeline import InferencePipeline
import time
from huggingface_hub import hf_hub_download
import os


class ChaplinGradio:
    def __init__(self):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.vsr_model = None
        self.download_models()
        self.load_models()

        # Video params
        self.fps = 16
        self.frame_interval = 1 / self.fps
        self.frame_compression = 25  # compression level (currently unused)
        self.last_frame_time = time.time()

        # Frame buffer
        self.frame_buffer = []
        self.min_frames = 32  # 2 seconds of video at 16 fps
        self.last_prediction = ""

        print(f"Initialized with device: {self.device}, fps: {self.fps}, min_frames: {self.min_frames}")

    def download_models(self):
        """Download required model files from HuggingFace"""
        # Create directories if they don't exist
        os.makedirs("benchmarks/LRS3/models/LRS3_V_WER19.1", exist_ok=True)
        os.makedirs("benchmarks/LRS3/language_models/lm_en_subword", exist_ok=True)

        # Download VSR model files
        hf_hub_download(repo_id="willwade/LRS3_V_WER19.1", filename="model.pth",
                        local_dir="benchmarks/LRS3/models/LRS3_V_WER19.1")
        hf_hub_download(repo_id="willwade/LRS3_V_WER19.1", filename="model.json",
                        local_dir="benchmarks/LRS3/models/LRS3_V_WER19.1")

        # Download language model files
        hf_hub_download(repo_id="willwade/lm_en_subword", filename="model.pth",
                        local_dir="benchmarks/LRS3/language_models/lm_en_subword")
        hf_hub_download(repo_id="willwade/lm_en_subword", filename="model.json",
                        local_dir="benchmarks/LRS3/language_models/lm_en_subword")

        print("Models downloaded successfully!")

    def load_models(self):
        """Load models using the InferencePipeline with LRS3 config"""
        config_path = "configs/LRS3_V_WER19.1.ini"
        self.vsr_model = InferencePipeline(
            config_path,
            device=self.device,
            detector="mediapipe",
            face_track=True
        )
        print("Model loaded successfully!")

    def process_frame(self, frame):
        """Process frames with buffering"""
        current_time = time.time()
        debug_log = []  # List to collect debug messages

        # Add initial debug info
        debug_log.append(f"Current time: {current_time}")
        debug_log.append(f"Last prediction: {self.last_prediction}")

        if current_time - self.last_frame_time < self.frame_interval:
            debug_log.append("Skipping frame - too soon")
            return self.last_prediction, "\n".join(debug_log)

        self.last_frame_time = current_time

        if frame is None:
            debug_log.append("Received None frame")
            return "No video input detected", "\n".join(debug_log)

        try:
            debug_log.append(f"Received frame with shape: {frame.shape}")

            # Convert frame to grayscale if it's not already
            if len(frame.shape) == 3:
                frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
                debug_log.append("Converted frame to grayscale")

            # Add frame to buffer
            self.frame_buffer.append(frame)
            debug_log.append(f"Buffer size now: {len(self.frame_buffer)}/{self.min_frames}")

            # Process when we have enough frames
            if len(self.frame_buffer) >= self.min_frames:
                debug_log.append("Processing buffer - have enough frames")

                # Create temp directory if it doesn't exist
                os.makedirs("temp", exist_ok=True)

                # Generate temporary video file path
                temp_video = f"temp/frames_{time.time_ns()}.mp4"
                debug_log.append(f"Created temp video path: {temp_video}")

                # Get frame dimensions from first frame
                frame_height, frame_width = self.frame_buffer[0].shape[:2]
                debug_log.append(f"Video dimensions: {frame_width}x{frame_height}")

                # Create video writer
                out = cv2.VideoWriter(
                    temp_video,
                    cv2.VideoWriter_fourcc(*'mp4v'),
                    self.fps,
                    (frame_width, frame_height),
                    False  # isColor
                )

                # Write all frames to video
                for f in self.frame_buffer:
                    out.write(f)
                debug_log.append(f"Wrote {len(self.frame_buffer)} frames to video")
                out.release()

                # Verify video was created
                if not os.path.exists(temp_video):
                    debug_log.append("Error: Video file was not created!")
                else:
                    debug_log.append(f"Video file created successfully, size: {os.path.getsize(temp_video)} bytes")

                # Clear buffer but keep last few frames for continuity
                self.frame_buffer = self.frame_buffer[-8:]  # Keep last 0.5 seconds
                debug_log.append(f"Cleared buffer, kept {len(self.frame_buffer)} frames")

                try:
                    # Process the video file using the pipeline
                    debug_log.append("Starting model inference...")
                    predicted_text = self.vsr_model(temp_video)
                    debug_log.append(f"Raw model prediction: '{predicted_text}'")

                    if predicted_text:
                        self.last_prediction = predicted_text
                        debug_log.append(f"Updated last prediction to: '{self.last_prediction}'")
                    else:
                        debug_log.append("Model returned empty prediction")

                    return (self.last_prediction or "Waiting for speech..."), "\n".join(debug_log)

                except Exception as e:
                    debug_log.append(f"Error during inference: {str(e)}")
                    import traceback
                    debug_log.append(f"Full error: {traceback.format_exc()}")
                    return f"Error processing frames: {str(e)}", "\n".join(debug_log)

                finally:
                    # Clean up temp file. Note: the return value above is built
                    # before this block runs, so these appends never reach the UI log.
                    if os.path.exists(temp_video):
                        os.remove(temp_video)
                        debug_log.append("Cleaned up temp video file")
                    else:
                        debug_log.append("No temp file to clean up")

            return (self.last_prediction or "Waiting for speech..."), "\n".join(debug_log)

        except Exception as e:
            debug_log.append(f"Error processing: {str(e)}")
            import traceback
            debug_log.append(f"Full error: {traceback.format_exc()}")
            return f"Error processing: {str(e)}", "\n".join(debug_log)
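

# Optional offline sanity check. A minimal sketch: it assumes nothing beyond
# the class above, but the 480x640 frame size and the 40-frame count are
# arbitrary choices. Random frames contain no face, so the pipeline is
# expected to return an empty prediction or an error string; the point is
# only to exercise the buffering, encoding, and cleanup paths without a webcam.
def _smoke_test(app, num_frames=40):
    import numpy as np  # available transitively via gradio/opencv
    text, log = "", ""
    for _ in range(num_frames):
        fake = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
        text, log = app.process_frame(fake)
        time.sleep(app.frame_interval)  # respect the fps-based rate limit
    print(text)
    print(log)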


# Create Gradio interface
chaplin = ChaplinGradio()

iface = gr.Interface(
    fn=chaplin.process_frame,
    inputs=gr.Image(sources=["webcam"], streaming=True),
    outputs=[
        gr.Textbox(label="Predicted Text", interactive=False),
        gr.Textbox(label="Debug Log", interactive=False)
    ],
    title="Chaplin - Live Visual Speech Recognition",
    description="Speak clearly into the webcam. The model will process your speech in ~2 second chunks.",
    live=True
)

if __name__ == "__main__":
    iface.launch()
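    # Serves on http://127.0.0.1:7860 by default; pass share=True to launch()
    # for a temporary public URL, or call _smoke_test(chaplin) instead to
    # exercise the pipeline without a browser.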