Spaces:
Runtime error
Runtime error
File size: 7,910 Bytes
5b30a24 2bc52c8 5b30a24 2bc52c8 5b30a24 13f27c7 87a5eeb fea9cb0 5b30a24 2bc52c8 5b30a24 2bc52c8 5b30a24 2bc52c8 5b30a24 13f27c7 5b30a24 428c083 7e47c96 5b30a24 428c083 7e47c96 5b30a24 428c083 5b30a24 88a03bc 428c083 fea9cb0 88a03bc 428c083 88a03bc 13f27c7 428c083 13f27c7 87a5eeb 428c083 13f27c7 428c083 13f27c7 428c083 88a03bc 13f27c7 88a03bc 13f27c7 fea9cb0 13f27c7 428c083 13f27c7 88a03bc 7e47c96 87a5eeb 428c083 13f27c7 428c083 13f27c7 7e47c96 87a5eeb 7e47c96 428c083 13f27c7 428c083 7e47c96 428c083 13f27c7 428c083 7e47c96 13f27c7 428c083 88a03bc 428c083 7e47c96 428c083 5b30a24 f0a5581 fea9cb0 5b30a24 87a5eeb 5b30a24 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
import gradio as gr
import cv2
import torch
from pipelines.pipeline import InferencePipeline
import time
from huggingface_hub import hf_hub_download
import os
class ChaplinGradio:
    """Live visual speech recognition (lip reading) for a Gradio webcam stream.

    Downloads the LRS3 VSR model + subword language model from HuggingFace Hub,
    loads them through ``InferencePipeline``, then buffers incoming webcam
    frames and runs inference on ~2-second chunks written to a temp video file.
    """

    def __init__(self):
        # Prefer GPU when available; InferencePipeline receives this device.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.vsr_model = None
        self.download_models()
        self.load_models()

        # Video params
        self.fps = 16
        self.frame_interval = 1 / self.fps  # minimum seconds between accepted frames
        # NOTE(review): frame_compression is never read anywhere in this file;
        # kept only so any external reader of the attribute keeps working.
        self.frame_compression = 25
        self.last_frame_time = time.time()

        # Frame buffer
        self.frame_buffer = []
        self.min_frames = 32  # 2 seconds of video at 16 fps
        self.last_prediction = ""

        print(f"Initialized with device: {self.device}, fps: {self.fps}, min_frames: {self.min_frames}")

    def download_models(self):
        """Download required model files from HuggingFace"""
        # Create directories if they don't exist
        os.makedirs("benchmarks/LRS3/models/LRS3_V_WER19.1", exist_ok=True)
        os.makedirs("benchmarks/LRS3/language_models/lm_en_subword", exist_ok=True)

        # Download VSR model files (weights + config)
        hf_hub_download(repo_id="willwade/LRS3_V_WER19.1",
                        filename="model.pth",
                        local_dir="benchmarks/LRS3/models/LRS3_V_WER19.1")
        hf_hub_download(repo_id="willwade/LRS3_V_WER19.1",
                        filename="model.json",
                        local_dir="benchmarks/LRS3/models/LRS3_V_WER19.1")

        # Download language model files (weights + config)
        hf_hub_download(repo_id="willwade/lm_en_subword",
                        filename="model.pth",
                        local_dir="benchmarks/LRS3/language_models/lm_en_subword")
        hf_hub_download(repo_id="willwade/lm_en_subword",
                        filename="model.json",
                        local_dir="benchmarks/LRS3/language_models/lm_en_subword")

        print("Models downloaded successfully!")

    def load_models(self):
        """Load models using the InferencePipeline with LRS3 config"""
        config_path = "configs/LRS3_V_WER19.1.ini"
        self.vsr_model = InferencePipeline(
            config_path,
            device=self.device,
            detector="mediapipe",
            face_track=True
        )
        print("Model loaded successfully!")

    def process_frame(self, frame):
        """Process one webcam frame with buffering.

        Frames arriving faster than ``frame_interval`` are dropped. Accepted
        frames are grayscaled and buffered; once ``min_frames`` are collected
        they are written to a temp .mp4 and fed to the VSR pipeline.

        Returns a ``(prediction_text, debug_log)`` tuple of two strings.
        """
        current_time = time.time()
        debug_log = []  # List to collect debug messages

        # Add initial debug info
        debug_log.append(f"Current time: {current_time}")
        debug_log.append(f"Last prediction: {self.last_prediction}")

        # Rate-limit: drop frames arriving faster than the target fps.
        if current_time - self.last_frame_time < self.frame_interval:
            debug_log.append("Skipping frame - too soon")
            return self.last_prediction, "\n".join(debug_log)
        self.last_frame_time = current_time

        if frame is None:
            debug_log.append("Received None frame")
            return "No video input detected", "\n".join(debug_log)

        try:
            debug_log.append(f"Received frame with shape: {frame.shape}")

            # Convert frame to grayscale if it's not already
            # (assumes a 3-channel frame is RGB, as delivered by Gradio)
            if len(frame.shape) == 3:
                frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
                debug_log.append("Converted frame to grayscale")

            # Add frame to buffer
            self.frame_buffer.append(frame)
            debug_log.append(f"Buffer size now: {len(self.frame_buffer)}/{self.min_frames}")

            # Process when we have enough frames
            if len(self.frame_buffer) >= self.min_frames:
                debug_log.append("Processing buffer - have enough frames")

                # Create temp directory if it doesn't exist
                os.makedirs("temp", exist_ok=True)

                # Generate a unique temporary video file path
                temp_video = f"temp/frames_{time.time_ns()}.mp4"
                debug_log.append(f"Created temp video path: {temp_video}")

                # Get frame dimensions from first frame
                frame_height, frame_width = self.frame_buffer[0].shape[:2]
                debug_log.append(f"Video dimensions: {frame_width}x{frame_height}")

                # Create video writer (grayscale, hence isColor=False)
                out = cv2.VideoWriter(
                    temp_video,
                    cv2.VideoWriter_fourcc(*'mp4v'),
                    self.fps,
                    (frame_width, frame_height),
                    False  # isColor
                )

                # Write all frames to video
                for i, f in enumerate(self.frame_buffer):
                    out.write(f)
                debug_log.append(f"Wrote {i+1} frames to video")
                out.release()

                # Verify video was created
                if not os.path.exists(temp_video):
                    debug_log.append("Error: Video file was not created!")
                else:
                    debug_log.append(f"Video file created successfully, size: {os.path.getsize(temp_video)} bytes")

                # Clear buffer but keep last few frames for continuity
                self.frame_buffer = self.frame_buffer[-8:]  # Keep last 0.5 seconds
                debug_log.append(f"Cleared buffer, kept {len(self.frame_buffer)} frames")

                try:
                    # Process the video file using the pipeline
                    debug_log.append("Starting model inference...")
                    predicted_text = self.vsr_model(temp_video)
                    debug_log.append(f"Raw model prediction: '{predicted_text}'")

                    if predicted_text:
                        self.last_prediction = predicted_text
                        debug_log.append(f"Updated last prediction to: '{self.last_prediction}'")
                    else:
                        debug_log.append("Model returned empty prediction")

                    return (self.last_prediction or "Waiting for speech..."), "\n".join(debug_log)
                except Exception as e:
                    error_msg = f"Error during inference: {str(e)}"
                    debug_log.append(error_msg)
                    import traceback
                    debug_log.append(f"Full error: {traceback.format_exc()}")
                    return f"Error processing frames: {str(e)}", "\n".join(debug_log)
                finally:
                    # Clean up temp file.
                    # NOTE(review): the joined debug string in the return value is
                    # built before this finally runs, so these two messages are
                    # never visible to the caller — cleanup itself still happens.
                    if os.path.exists(temp_video):
                        os.remove(temp_video)
                        debug_log.append("Cleaned up temp video file")
                    else:
                        debug_log.append("No temp file to clean up")

            # Buffer not yet full: report the most recent prediction.
            return (self.last_prediction or "Waiting for speech..."), "\n".join(debug_log)
        except Exception as e:
            error_msg = f"Error processing: {str(e)}"
            debug_log.append(error_msg)
            import traceback
            debug_log.append(f"Full error: {traceback.format_exc()}")
            return f"Error processing: {str(e)}", "\n".join(debug_log)
# Create Gradio interface
# NOTE: instantiating here downloads and loads the models at import time,
# which is the expected behavior for a HuggingFace Space entry script.
chaplin = ChaplinGradio()

iface = gr.Interface(
    fn=chaplin.process_frame,
    inputs=gr.Image(sources=["webcam"], streaming=True),
    outputs=[
        gr.Textbox(label="Predicted Text", interactive=False),
        gr.Textbox(label="Debug Log", interactive=False)
    ],
    title="Chaplin - Live Visual Speech Recognition",
    description="Speak clearly into the webcam. The model will process your speech in ~2 second chunks.",
    live=True
)

if __name__ == "__main__":
    iface.launch()