Spaces:
Sleeping
Sleeping
import gradio as gr | |
import whisper | |
import os | |
import tempfile | |
from pydub import AudioSegment | |
import math | |
import gc # Garbage Collector interface | |
import requests | |
import zipfile | |
import re | |
from urllib.parse import urlparse | |
# --- Helper Functions --- | |
def format_time(seconds):
    """Convert an offset in seconds to an SRT timestamp (``HH:MM:SS,mmm``).

    Args:
        seconds: Offset from the start of the media, in seconds (int or float).
            Negative values are clamped to zero.

    Returns:
        The offset formatted as ``HH:MM:SS,mmm``.
    """
    # Round once to whole milliseconds and derive every field from that integer.
    # Truncating the float remainder instead loses a millisecond on values such
    # as 1.001 (stored as 1.000999...), and rounding per-field could not carry
    # 59.9996 s over into the next minute.
    total_ms = max(0, int(round(float(seconds) * 1000)))
    total_secs, milliseconds = divmod(total_ms, 1000)
    total_minutes, secs = divmod(total_secs, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{milliseconds:03d}"
def generate_srt_from_result(result, transcription_mode):
    """Build SRT text from a Whisper-style result dictionary.

    Args:
        result: Dict with a ``"segments"`` list. Each segment provides
            ``"start"``, ``"end"`` and ``"text"``; in word mode each segment may
            also provide a ``"words"`` list of dicts with ``"start"``, ``"end"``
            and ``"word"``.
        transcription_mode: ``"word"`` emits one cue per word; any other value
            emits one cue per segment.

    Returns:
        The SRT document as a single string (empty when nothing has text).
    """
    if transcription_mode == "word":
        timed_items = (
            (word_info["start"], word_info["end"], word_info["word"])
            for segment in result["segments"]
            for word_info in segment.get("words", [])
        )
    else:
        timed_items = (
            (segment["start"], segment["end"], segment["text"])
            for segment in result["segments"]
        )

    srt_content = []
    for start, end, raw_text in timed_items:
        text = raw_text.strip()
        if not text:
            continue  # Never emit an empty cue.
        # Number cues by how many were actually written so that skipped
        # (blank) items do not leave gaps in the SRT sequence numbers.
        entry_index = len(srt_content) + 1
        srt_content.append(
            f"{entry_index}\n{format_time(start)} --> {format_time(end)}\n{text}\n"
        )
    return "\n".join(srt_content)
# --- Google Drive Helper Functions --- | |
def extract_file_id_from_drive_url(url):
    """Extract the file ID from a Google Drive style URL.

    Patterns are tried in order: ``/file/d/<id>``, an ``id=<id>`` query
    parameter, then a generic ``/d/<id>`` path segment.

    Args:
        url: The URL text to inspect. ``None`` or an empty string is accepted.

    Returns:
        The captured ID string, or ``None`` when no pattern matches.
    """
    if not url:
        return None

    patterns = (
        r'/file/d/([a-zA-Z0-9_-]+)',
        # Anchor to the start of the text or a query separator so that other
        # parameters ending in "id" (e.g. "ouid=") are not mistaken for "id=".
        r'(?:^|[?&])id=([a-zA-Z0-9_-]+)',
        r'/d/([a-zA-Z0-9_-]+)',
    )
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    return None
def download_from_google_drive(file_id, destination):
    """Download a Google Drive file by ID and write it to ``destination``.

    A first request is made to the ``uc?export=download`` endpoint. If the
    response sets a cookie whose name starts with ``download_warning``, the
    request is repeated with that cookie's value as the ``confirm`` parameter.
    The final response body is streamed to disk in fixed-size chunks.

    Args:
        file_id: The Drive file ID to request.
        destination: Path of the file to create or overwrite.

    Raises:
        requests.HTTPError: If the final response has an error status code.
            Previously such error pages were silently saved as the download.
    """
    url = "https://docs.google.com/uc?export=download"
    chunk_size = 32768

    def get_confirm_token(response):
        # The confirmation value is carried in a "download_warning*" cookie.
        for key, value in response.cookies.items():
            if key.startswith('download_warning'):
                return value
        return None

    # Context-manage the session and close responses explicitly so streamed
    # connections are released even when the download fails part-way.
    with requests.Session() as session:
        response = session.get(url, params={'id': file_id}, stream=True)
        token = get_confirm_token(response)
        if token:
            response.close()
            params = {'id': file_id, 'confirm': token}
            response = session.get(url, params=params, stream=True)
        try:
            response.raise_for_status()
            with open(destination, "wb") as f:
                for chunk in response.iter_content(chunk_size):
                    if chunk:  # Skip empty chunks yielded by the iterator.
                        f.write(chunk)
        finally:
            response.close()
def extract_zip_and_get_video_files(zip_path, extract_dir):
    """Extract a zip archive and return the video files it contained.

    Args:
        zip_path: Path to the zip archive.
        extract_dir: Directory to extract into.

    Returns:
        Sorted list of paths (under ``extract_dir``) whose names end with a
        known video extension, compared case-insensitively. macOS archive
        metadata (``__MACOSX`` folders and ``._*`` AppleDouble files) is
        excluded because those entries carry a video extension but are not
        playable media.
    """
    video_extensions = ('.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv', '.webm', '.m4v')

    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_dir)

    video_files = []
    for root, dirs, files in os.walk(extract_dir):
        # Prune in place so os.walk never descends into the metadata folder.
        dirs[:] = [d for d in dirs if d != '__MACOSX']
        for file_name in files:
            if file_name.startswith('._'):
                continue
            # str.endswith accepts a tuple, checking every extension in one call.
            if file_name.lower().endswith(video_extensions):
                video_files.append(os.path.join(root, file_name))

    # os.walk order is filesystem-dependent; sort so callers that pick the
    # first entry get a deterministic result.
    video_files.sort()
    return video_files
def process_google_drive_zip(drive_url, temp_dir):
    """Fetch a zip archive from Google Drive and list the videos inside it.

    The archive is saved as ``downloaded.zip`` inside ``temp_dir`` and unpacked
    into an ``extracted`` sub-directory of the same folder.

    Args:
        drive_url: Shareable Google Drive link to the zip archive.
        temp_dir: Working directory for the download and the extraction.

    Returns:
        List of paths to the video files found in the archive.

    Raises:
        ValueError: If no file ID can be parsed from ``drive_url`` or the
            archive contains no video files.
    """
    # Guard: the link must yield a file ID before anything is downloaded.
    drive_file_id = extract_file_id_from_drive_url(drive_url)
    if not drive_file_id:
        raise ValueError("Invalid Google Drive URL. Please ensure it's a valid shareable link.")

    # Fetch the archive into the working directory.
    archive_path = os.path.join(temp_dir, "downloaded.zip")
    download_from_google_drive(drive_file_id, archive_path)

    # Unpack and collect the videos.
    unpack_dir = os.path.join(temp_dir, "extracted")
    os.makedirs(unpack_dir, exist_ok=True)
    found_videos = extract_zip_and_get_video_files(archive_path, unpack_dir)
    if found_videos:
        return found_videos
    raise ValueError("No video files found in the zip archive.")
# --- New Function for Advanced Mode --- | |
def _build_segment(words):
    """Collapse a non-empty run of word dicts into a single segment dict."""
    # Drop blank words from the text so they do not produce double spaces.
    text = " ".join(t for t in (w["word"].strip() for w in words) if t)
    return {
        "start": words[0]["start"],
        "end": words[-1]["end"],
        "text": text,
        "words": words,
    }


def process_advanced_segments(full_result, max_words):
    """Regroup word-level results into subtitle lines for the advanced mode.

    All words from every segment are flattened into one stream. Once the
    current line holds ``max_words`` words it is split:

    1. after the last word in the line that ends with punctuation, if any;
    2. otherwise after the first punctuated word among the next 10 words
       (those words are pulled into the line, so it may exceed ``max_words``);
    3. otherwise after exactly ``max_words`` words.

    Words left over after a split start the next line. Each new segment takes
    its start time from its first word and its end time from its last word.

    Args:
        full_result: Dict with a ``"segments"`` list whose items may carry a
            ``"words"`` list of dicts with ``"word"``, ``"start"``, ``"end"``.
        max_words: Target maximum number of words per line. Coerced to an
            integer and clamped to at least 1.

    Returns:
        ``full_result`` itself, with ``"segments"`` replaced by the regrouped
        segments. It is returned untouched when no word data is present.
    """
    # A tuple lets str.endswith test every marker, including the two-character
    # "--", which a single-character comparison could never match.
    punctuation = ('.', '!', '?', ';', ',', '--')
    lookahead_limit = 10  # Cap on how far ahead to search for punctuation.
    max_words = max(1, int(max_words))

    def ends_clause(word_info):
        # endswith is False for an empty string, so blank words are safe here.
        return word_info["word"].strip().endswith(punctuation)

    all_words = []
    for segment in full_result["segments"]:
        all_words.extend(segment.get("words", []))
    if not all_words:
        return full_result  # Nothing to regroup.

    total = len(all_words)
    new_segments = []
    current_words = []
    i = 0
    while i < total:
        current_words.append(all_words[i])
        if len(current_words) >= max_words:
            split_index = -1

            # 1) Prefer the last punctuation already inside the current line.
            for j in range(len(current_words) - 1, -1, -1):
                if ends_clause(current_words[j]):
                    split_index = j + 1  # Split after this word.
                    break

            # 2) Otherwise extend the line forward looking for punctuation.
            #    Words consumed here stay in the line, so ``i`` advances too.
            if split_index == -1:
                lookahead_end = min(i + 1 + lookahead_limit, total)
                for j in range(i + 1, lookahead_end):
                    current_words.append(all_words[j])
                    i += 1
                    if ends_clause(all_words[j]):
                        split_index = len(current_words)
                        break

            # 3) No punctuation nearby: hard split at the word limit.
            if split_index == -1:
                split_index = max_words

            new_segments.append(_build_segment(current_words[:split_index]))
            # Leftover words become the start of the next line.
            current_words = current_words[split_index:]
        i += 1

    # Flush whatever is left as the final line.
    if current_words:
        new_segments.append(_build_segment(current_words))

    full_result["segments"] = new_segments
    return full_result
# --- Main Transcription Logic --- | |
def transcribe_video(video_path, drive_url, model_name, transcription_mode, chunk_length_min, max_words):
    """Transcribe a video to an SRT file, reporting progress as it goes.

    This is a generator: every step yields a ``(status_message, srt_path)``
    tuple, where ``srt_path`` is ``None`` until the final ``"Done!"`` update.
    Errors are yielded the same way and then the generator stops. (A value
    passed to ``return`` inside a generator is not produced as an item, so
    messages returned that way would never reach whoever iterates it.)

    Args:
        video_path: Path of an uploaded video, or ``None``.
        drive_url: Google Drive link to a zip of videos, or empty/``None``.
            Exactly one of ``video_path`` and ``drive_url`` must be provided.
        model_name: Name passed to ``whisper.load_model``.
        transcription_mode: ``"Segment-level"``, ``"Word-level"`` or
            ``"Word-level Advanced"``.
        chunk_length_min: Length of each audio chunk, in minutes.
        max_words: Maximum words per line for ``"Word-level Advanced"``.

    Yields:
        ``(status_message, srt_path_or_None)`` tuples.
    """
    drive_url = (drive_url or "").strip()
    use_drive = bool(drive_url)

    # --- Input validation: exactly one source must be supplied ---
    if use_drive and video_path is not None:
        yield "Please provide either a video file OR a Google Drive URL, not both.", None
        return
    if not use_drive and video_path is None:
        yield "Please upload a video file or provide a Google Drive zip URL.", None
        return

    if use_drive:
        yield "Processing Google Drive URL...", None
    else:
        yield "Processing uploaded video...", None

    yield "Loading model...", None
    try:
        model = whisper.load_model(model_name)
    except Exception as e:
        yield f"Error loading model: {e}", None
        return
    yield f"Model '{model_name}' loaded.", None

    # Loop-invariant: only the two word-level modes need per-word timestamps.
    should_get_word_timestamps = transcription_mode in ("Word-level", "Word-level Advanced")

    try:
        # All intermediate files live in one temporary directory.
        with tempfile.TemporaryDirectory() as temp_dir:
            # --- Resolve the video to process ---
            if use_drive:
                yield "Downloading and extracting from Google Drive...", None
                video_files = process_google_drive_zip(drive_url, temp_dir)
                yield f"Found {len(video_files)} video file(s) in zip archive.", None
                # Only the first video found in the archive is processed.
                current_video_path = video_files[0]
                if len(video_files) > 1:
                    yield f"Multiple videos found. Processing: {os.path.basename(current_video_path)}", None
            else:
                current_video_path = video_path

            # --- Audio extraction ---
            yield "Extracting audio...", None
            audio_path = os.path.join(temp_dir, "extracted_audio.wav")
            try:
                video = AudioSegment.from_file(current_video_path)
                # Export as mono 16 kHz WAV, then reload it for slicing.
                video.set_channels(1).set_frame_rate(16000).export(audio_path, format="wav")
                audio = AudioSegment.from_wav(audio_path)
            except Exception as e:
                yield f"Error processing video/audio: {e}", None
                return

            # --- Chunking ---
            # Integer milliseconds keep the slice bounds integral even if the
            # chunk length arrives as a float.
            chunk_length_ms = int(chunk_length_min * 60 * 1000)
            if chunk_length_ms <= 0:
                raise ValueError("Chunk length must be greater than zero.")
            num_chunks = math.ceil(len(audio) / chunk_length_ms)
            full_result = {"segments": []}
            yield f"Audio extracted. Splitting into {num_chunks} chunk(s) of {chunk_length_min} min...", None

            for i in range(num_chunks):
                start_ms = i * chunk_length_ms
                end_ms = start_ms + chunk_length_ms
                chunk_path = os.path.join(temp_dir, f"chunk_{i}.wav")
                audio[start_ms:end_ms].export(chunk_path, format="wav")
                yield f"Transcribing chunk {i+1}/{num_chunks}...", None

                try:
                    result = model.transcribe(
                        chunk_path,
                        word_timestamps=should_get_word_timestamps,
                        fp16=False,  # Set to False for CPU-only inference
                    )
                except Exception as e:
                    yield f"Error during transcription of chunk {i+1}: {e}", None
                    return

                # Timestamps are relative to the chunk; shift them by the
                # chunk's offset so they are relative to the whole recording.
                time_offset_s = start_ms / 1000.0
                for segment in result["segments"]:
                    segment["start"] += time_offset_s
                    segment["end"] += time_offset_s
                    for word_info in segment.get("words", []):
                        word_info["start"] += time_offset_s
                        word_info["end"] += time_offset_s
                    full_result["segments"].append(segment)

                # Remove the chunk file as soon as it has been transcribed.
                os.remove(chunk_path)

            # Drop the model before post-processing to free memory early.
            model = None
            gc.collect()

            if transcription_mode == "Word-level Advanced":
                yield "Processing advanced word-level grouping...", None
                full_result = process_advanced_segments(full_result, max_words)

            yield "All chunks transcribed. Generating SRT file...", None
            # Only plain "Word-level" emits one cue per word; the advanced mode
            # has already regrouped words into segments.
            srt_mode = "word" if transcription_mode == "Word-level" else "segment"
            srt_output = generate_srt_from_result(full_result, srt_mode)

            srt_file_path = os.path.join(temp_dir, "output.srt")
            with open(srt_file_path, "w", encoding="utf-8") as srt_file:
                srt_file.write(srt_output)
            # NOTE(review): this file sits inside temp_dir, which is deleted
            # once the generator finishes. Presumably the consumer copies it
            # while handling this yield - confirm.
            yield "Done!", srt_file_path
    except Exception as e:
        yield f"Error: {e}", None
    finally:
        # Release the model on every exit path (success, error, early close).
        model = None
        gc.collect()
# --- Gradio UI --- | |
# Gradio interface: inputs and settings in the left column, progress and the
# resulting SRT download in the right column.
# NOTE(review): the title emoji and the arrows in the "Google Drive Setup"
# text were mis-encoded in the source. The arrows and the first emoji are
# reconstructed from the surviving bytes; the second title emoji is a best
# guess - confirm against the intended text.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        """
# Whisper Video Transcriber 🎥 -> 📝
Upload a video, provide a Google Drive zip URL, choose your settings, and get a timed SRT subtitle file.
This app handles large videos by automatically splitting them into manageable chunks.
"""
    )
    with gr.Row():
        with gr.Column():
            gr.Markdown("### Input Source (choose one):")
            video_input = gr.Video(label="Upload Video File")
            gr.Markdown("**OR**")
            drive_url_input = gr.Textbox(
                label="Google Drive Zip URL",
                placeholder="https://drive.google.com/file/d/your-file-id/view?usp=sharing",
                info="Paste a public Google Drive link to a zip file containing video(s)"
            )
            gr.Markdown("### Settings:")
            model_name = gr.Radio(
                ["tiny.en", "base.en"],
                label="Whisper Model",
                value="base.en",
                info="`tiny.en` is faster, `base.en` is more accurate."
            )
            transcription_mode = gr.Radio(
                ["Segment-level", "Word-level", "Word-level Advanced"],
                label="Transcription Granularity",
                value="Segment-level",
                info="Word-level is more detailed but may be slightly slower. Word-level Advanced groups into lines with max words, splitting at punctuation."
            )
            chunk_length_min = gr.Slider(
                minimum=5,
                maximum=20,
                value=10,
                step=1,
                label="Chunk Length (minutes)",
                info="Shorter chunks use less RAM but may be slightly less accurate at boundaries."
            )
            # Only consulted when the "Word-level Advanced" mode is selected.
            max_words = gr.Slider(
                minimum=5,
                maximum=30,
                value=10,
                step=1,
                label="Max Words per Line (Advanced mode only)",
                info="For Word-level Advanced: Limits words per subtitle line, splitting intelligently at punctuation."
            )
            submit_button = gr.Button("Transcribe Video", variant="primary")
        with gr.Column():
            status_output = gr.Textbox(label="Status", interactive=False, lines=5)
            srt_output_file = gr.File(label="Download SRT File")
    # The input order must match the parameter order of transcribe_video.
    submit_button.click(
        fn=transcribe_video,
        inputs=[video_input, drive_url_input, model_name, transcription_mode, chunk_length_min, max_words],
        outputs=[status_output, srt_output_file]
    )
    gr.Markdown(
        """
### How to Use
1. **Choose input method:** Either upload a video file OR provide a Google Drive zip URL (not both).
2. **For Google Drive:** Share your zip file publicly and paste the link. The zip should contain video files.
3. **Select a Whisper model.** For English, `base.en` provides a great balance of speed and accuracy.
4. **Choose the granularity.** 'Segment-level' is good for standard subtitles. 'Word-level' is great for karaoke-style highlighting. 'Word-level Advanced' groups into optimized subtitle lines.
5. **Click 'Transcribe Video'.** The status box will show the progress.
6. **Download the SRT file** when the process is complete. You can open this file in any text editor or load it into a video player like VLC.
### Google Drive Setup
- Upload your video files in a zip archive to Google Drive
- Right-click the zip file → Share → Change to "Anyone with the link"
- Copy and paste the share link into the URL field above
"""
    )

if __name__ == "__main__":
    demo.launch(debug=True)