# app.py — Whisper video transcriber (Gradio app)
# Source: kavehtaheri's Hugging Face Space, commit ee913c5 ("Update app.py"), verified
import gradio as gr
import whisper
import os
import tempfile
from pydub import AudioSegment
import math
import gc # Garbage Collector interface
import requests
import zipfile
import re
from urllib.parse import urlparse
# --- Helper Functions ---
def format_time(seconds):
    """Convert a duration in seconds to the SRT timestamp format HH:MM:SS,ms."""
    whole_seconds = int(seconds)
    milliseconds = int((seconds - whole_seconds) * 1000)
    minutes, secs = divmod(whole_seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{milliseconds:03d}"
def generate_srt_from_result(result, transcription_mode):
    """Build SRT subtitle text from a Whisper result dict.

    ``transcription_mode == "word"`` emits one numbered entry per word;
    any other value emits one entry per segment.  Empty captions are skipped.
    """
    entries = []
    if transcription_mode == "word":
        counter = 1
        for seg in result["segments"]:
            for w in seg.get("words", []):
                caption = w["word"].strip()
                if not caption:
                    continue  # never emit blank subtitle entries
                span = f"{format_time(w['start'])} --> {format_time(w['end'])}"
                entries.append(f"{counter}\n{span}\n{caption}\n")
                counter += 1
    else:
        for idx, seg in enumerate(result["segments"], 1):
            caption = seg["text"].strip()
            span = f"{format_time(seg['start'])} --> {format_time(seg['end'])}"
            if caption:
                entries.append(f"{idx}\n{span}\n{caption}\n")
    return "\n".join(entries)
# --- Google Drive Helper Functions ---
def extract_file_id_from_drive_url(url):
    """Pull the file ID out of any of the common Google Drive link formats.

    Returns the ID string, or None when no pattern matches.
    """
    id_patterns = (
        r'/file/d/([a-zA-Z0-9-_]+)',
        r'id=([a-zA-Z0-9-_]+)',
        r'/d/([a-zA-Z0-9-_]+)',
    )
    # First pattern that matches wins; patterns are ordered most-specific first.
    hits = (re.search(pattern, url) for pattern in id_patterns)
    found = next((m for m in hits if m), None)
    return found.group(1) if found else None
def download_from_google_drive(file_id, destination):
    """
    Download a (possibly large) file from Google Drive to *destination*.

    Uses the legacy ``docs.google.com/uc?export=download`` endpoint.  For
    large files Drive first answers with an HTML page carrying a
    ``download_warning`` cookie; its value is the confirm token that must be
    sent back to receive the real file stream.

    Args:
        file_id: The Drive file ID (as parsed from a share URL).
        destination: Local path the downloaded bytes are written to.

    Raises:
        requests.HTTPError: If Drive responds with an HTTP error status
            (previously an error page would be silently saved as the file).
    """
    CHUNK_SIZE = 32768  # 32 KiB chunks keep memory flat for big files.
    REQUEST_TIMEOUT = 60  # Seconds; avoid hanging forever on a dead connection.

    def get_confirm_token(response):
        # The confirm token is handed back as a 'download_warning*' cookie.
        for key, value in response.cookies.items():
            if key.startswith('download_warning'):
                return value
        return None

    def save_response_content(response, destination):
        with open(destination, "wb") as f:
            for chunk in response.iter_content(CHUNK_SIZE):
                if chunk:  # Skip keep-alive chunks.
                    f.write(chunk)

    URL = "https://docs.google.com/uc?export=download"
    session = requests.Session()
    response = session.get(URL, params={'id': file_id}, stream=True, timeout=REQUEST_TIMEOUT)
    # Fail fast on 4xx/5xx instead of writing an HTML error page to disk.
    response.raise_for_status()
    token = get_confirm_token(response)
    if token:
        params = {'id': file_id, 'confirm': token}
        response = session.get(URL, params=params, stream=True, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
    save_response_content(response, destination)
def extract_zip_and_get_video_files(zip_path, extract_dir):
    """Unpack *zip_path* into *extract_dir* and return paths of all video files found.

    The archive may contain nested directories; the whole tree is walked.
    """
    VIDEO_EXTS = ('.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv', '.webm', '.m4v')
    with zipfile.ZipFile(zip_path, 'r') as archive:
        archive.extractall(extract_dir)
    # Collect anything whose (case-insensitive) extension marks it as video.
    return [
        os.path.join(dirpath, filename)
        for dirpath, _, filenames in os.walk(extract_dir)
        for filename in filenames
        if filename.lower().endswith(VIDEO_EXTS)
    ]
def process_google_drive_zip(drive_url, temp_dir):
    """Fetch a zip archive from a Google Drive share link and return its video files.

    Raises ValueError when the URL has no recognizable file ID or the
    archive contains no video files.
    """
    # A share link we cannot parse is a user error, not a download failure.
    file_id = extract_file_id_from_drive_url(drive_url)
    if not file_id:
        raise ValueError("Invalid Google Drive URL. Please ensure it's a valid shareable link.")
    archive_path = os.path.join(temp_dir, "downloaded.zip")
    download_from_google_drive(file_id, archive_path)
    unpack_dir = os.path.join(temp_dir, "extracted")
    os.makedirs(unpack_dir, exist_ok=True)
    found_videos = extract_zip_and_get_video_files(archive_path, unpack_dir)
    if not found_videos:
        raise ValueError("No video files found in the zip archive.")
    return found_videos
# --- New Function for Advanced Mode ---
def process_advanced_segments(full_result, max_words):
    """
    Post-process segments for Word-level Advanced mode.

    Regroups the word-level timestamps into new segments containing at most
    roughly *max_words* words each, preferring to split at punctuation
    boundaries (looking backward first, then a capped lookahead forward).
    Each new segment's start/end come from its first/last word.

    Args:
        full_result: Whisper-style result dict whose "segments" entries carry
            word timing dicts under the "words" key.
        max_words: Soft cap on words per output segment.

    Returns:
        The same dict with "segments" replaced by the regrouped segments.
    """
    # Punctuation marks that make a natural subtitle-line break.
    punctuation = {'.', '!', '?', ';', ',', '--'}
    # Flatten every word across all segments so grouping can cross the
    # original segment boundaries.
    all_words = []
    for segment in full_result["segments"]:
        all_words.extend(segment.get("words", []))
    if not all_words:
        return full_result  # No word timing available; nothing to regroup.
    new_segments = []
    current_words = []
    i = 0
    while i < len(all_words):
        current_words.append(all_words[i])
        if len(current_words) >= max_words:
            split_index = -1
            # Look backward for the most recent punctuation-terminated word.
            for j in range(len(current_words) - 1, -1, -1):
                word_text = current_words[j]["word"].strip()
                # Guard against empty tokens (whitespace-only "words"),
                # which previously raised IndexError on word_text[-1].
                if word_text and word_text[-1] in punctuation:
                    split_index = j + 1  # Split after this word.
                    break
            # No punctuation behind us: scan a few words ahead (capped
            # lookahead for efficiency) hoping to end the line on punctuation.
            if split_index == -1:
                lookahead_end = min(i + 1 + 10, len(all_words))
                for j in range(i + 1, lookahead_end):
                    word_text = all_words[j]["word"].strip()
                    current_words.append(all_words[j])
                    i += 1  # The looked-ahead word is now consumed.
                    if word_text and word_text[-1] in punctuation:
                        split_index = len(current_words)  # Split after it.
                        break
            # Still nothing: hard split at the max_words cap.
            if split_index == -1:
                split_index = max_words
            group_words = current_words[:split_index]
            if group_words:
                new_segments.append({
                    "start": group_words[0]["start"],
                    "end": group_words[-1]["end"],
                    "text": " ".join(w["word"].strip() for w in group_words),
                    "words": group_words,
                })
            # Words past the split seed the next group.
            current_words = current_words[split_index:]
        i += 1
    # Flush the trailing partial group.
    if current_words:
        new_segments.append({
            "start": current_words[0]["start"],
            "end": current_words[-1]["end"],
            "text": " ".join(w["word"].strip() for w in current_words),
            "words": current_words,
        })
    # NOTE: every segment built above carries a non-empty "words" list, so the
    # old proportional-timestamp fallback could never run and was removed.
    full_result["segments"] = new_segments
    return full_result
# --- Main Transcription Logic ---
def transcribe_video(video_path, drive_url, model_name, transcription_mode, chunk_length_min, max_words):
    """
    Generator that transcribes a video (direct upload or Google Drive zip)
    and yields ``(status_message, srt_file_path_or_None)`` tuples for Gradio.

    BUG FIX: this function is a generator, so a plain ``return value``
    statement never reaches the UI — it merely stops iteration and the value
    is discarded.  All user-facing messages, including every error message,
    must be *yielded* before returning.  The previous version returned error
    tuples, which left the status box blank on failure.

    Args:
        video_path: Path of an uploaded video file, or None.
        drive_url: Public Google Drive zip share link, or None/empty.
        model_name: Whisper model identifier (e.g. "base.en").
        transcription_mode: "Segment-level", "Word-level" or "Word-level Advanced".
        chunk_length_min: Chunk size in minutes for splitting long audio.
        max_words: Max words per line for the Advanced mode.
    """
    # --- Input validation: exactly one source must be provided. ---
    if drive_url and drive_url.strip():
        if video_path is not None:
            yield "Please provide either a video file OR a Google Drive URL, not both.", None
            return
        input_source = "drive"
        yield "Processing Google Drive URL...", None
    elif video_path is not None:
        input_source = "upload"
        yield "Processing uploaded video...", None
    else:
        yield "Please upload a video file or provide a Google Drive zip URL.", None
        return
    yield "Loading model...", None
    # Load the Whisper model
    try:
        model = whisper.load_model(model_name)
    except Exception as e:
        yield f"Error loading model: {e}", None
        return
    yield f"Model '{model_name}' loaded.", None
    # All intermediate artifacts live in one temp dir, removed on exit.
    with tempfile.TemporaryDirectory() as temp_dir:
        try:
            # Get video file(s) based on input source
            if input_source == "drive":
                yield "Downloading and extracting from Google Drive...", None
                video_files = process_google_drive_zip(drive_url.strip(), temp_dir)
                yield f"Found {len(video_files)} video file(s) in zip archive.", None
                # For simplicity, only the first video found is processed.
                current_video_path = video_files[0]
                if len(video_files) > 1:
                    yield f"Multiple videos found. Processing: {os.path.basename(current_video_path)}", None
            else:
                current_video_path = video_path
            yield "Extracting audio...", None
            # Extract audio from the video; 16 kHz mono WAV suits Whisper.
            audio_path = os.path.join(temp_dir, "extracted_audio.wav")
            try:
                video = AudioSegment.from_file(current_video_path)
                video.set_channels(1).set_frame_rate(16000).export(audio_path, format="wav")
                audio = AudioSegment.from_wav(audio_path)
            except Exception as e:
                yield f"Error processing video/audio: {e}", None
                return
            # --- Chunking: bounds memory use on long videos. ---
            chunk_length_ms = chunk_length_min * 60 * 1000
            num_chunks = math.ceil(len(audio) / chunk_length_ms)
            full_result = {"segments": []}
            yield f"Audio extracted. Splitting into {num_chunks} chunk(s) of {chunk_length_min} min...", None
            for i in range(num_chunks):
                start_ms = i * chunk_length_ms
                end_ms = start_ms + chunk_length_ms
                chunk = audio[start_ms:end_ms]
                chunk_path = os.path.join(temp_dir, f"chunk_{i}.wav")
                chunk.export(chunk_path, format="wav")
                yield f"Transcribing chunk {i+1}/{num_chunks}...", None
                # Word timestamps are only needed for the word-level modes.
                should_get_word_timestamps = (transcription_mode in ["Word-level", "Word-level Advanced"])
                try:
                    result = model.transcribe(
                        chunk_path,
                        word_timestamps=should_get_word_timestamps,
                        fp16=False  # Set to False for CPU-only inference
                    )
                except Exception as e:
                    # Release model memory, then surface the error to the UI.
                    del model
                    gc.collect()
                    yield f"Error during transcription of chunk {i+1}: {e}", None
                    return
                # --- Timestamp correction: offset by this chunk's start. ---
                time_offset_s = start_ms / 1000.0
                for segment in result["segments"]:
                    segment["start"] += time_offset_s
                    segment["end"] += time_offset_s
                    if "words" in segment:
                        for word_info in segment["words"]:
                            word_info["start"] += time_offset_s
                            word_info["end"] += time_offset_s
                    full_result["segments"].append(segment)
                # Reclaim disk space as soon as the chunk is transcribed.
                os.remove(chunk_path)
            # Release model memory before post-processing.
            del model
            gc.collect()
            if transcription_mode == "Word-level Advanced":
                yield "Processing advanced word-level grouping...", None
                full_result = process_advanced_segments(full_result, max_words)
            yield "All chunks transcribed. Generating SRT file...", None
            # Advanced mode produces grouped lines, so emit segment-level SRT.
            srt_mode = "segment" if transcription_mode == "Word-level Advanced" else transcription_mode
            if transcription_mode == "Word-level":
                srt_mode = "word"
            srt_output = generate_srt_from_result(full_result, srt_mode)
            # Write the SRT into the temp dir for Gradio to serve.
            # NOTE(review): the temp dir is deleted when this generator
            # finishes; this relies on Gradio copying the file — confirm.
            srt_file_path = os.path.join(temp_dir, "output.srt")
            with open(srt_file_path, "w", encoding="utf-8") as srt_file:
                srt_file.write(srt_output)
            yield "Done!", srt_file_path
        except Exception as e:
            yield f"Error: {e}", None
            return
# --- Gradio UI ---
# Two-column Gradio layout: input source + settings on the left,
# streamed status and the downloadable SRT file on the right.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    # Page header / intro text.
    gr.Markdown(
        """
        # Whisper Video Transcriber πŸŽ₯ -> πŸ“
        Upload a video, provide a Google Drive zip URL, choose your settings, and get a timed SRT subtitle file.
        This app handles large videos by automatically splitting them into manageable chunks.
        """
    )
    with gr.Row():
        with gr.Column():
            gr.Markdown("### Input Source (choose one):")
            # Exactly one of these two inputs should be filled in;
            # transcribe_video rejects the case where both are set.
            video_input = gr.Video(label="Upload Video File")
            gr.Markdown("**OR**")
            drive_url_input = gr.Textbox(
                label="Google Drive Zip URL",
                placeholder="https://drive.google.com/file/d/your-file-id/view?usp=sharing",
                info="Paste a public Google Drive link to a zip file containing video(s)"
            )
            gr.Markdown("### Settings:")
            model_name = gr.Radio(
                ["tiny.en", "base.en"],
                label="Whisper Model",
                value="base.en",
                info="`tiny.en` is faster, `base.en` is more accurate."
            )
            transcription_mode = gr.Radio(
                ["Segment-level", "Word-level", "Word-level Advanced"],  # Added new mode
                label="Transcription Granularity",
                value="Segment-level",
                info="Word-level is more detailed but may be slightly slower. Word-level Advanced groups into lines with max words, splitting at punctuation."
            )
            chunk_length_min = gr.Slider(
                minimum=5,
                maximum=20,
                value=10,
                step=1,
                label="Chunk Length (minutes)",
                info="Shorter chunks use less RAM but may be slightly less accurate at boundaries."
            )
            # Only consulted when transcription_mode == "Word-level Advanced".
            max_words = gr.Slider(  # New input for max_words
                minimum=5,
                maximum=30,
                value=10,
                step=1,
                label="Max Words per Line (Advanced mode only)",
                info="For Word-level Advanced: Limits words per subtitle line, splitting intelligently at punctuation."
            )
            submit_button = gr.Button("Transcribe Video", variant="primary")
        with gr.Column():
            # transcribe_video yields (status, file) tuples, so the status
            # box receives streamed progress updates.
            status_output = gr.Textbox(label="Status", interactive=False, lines=5)
            srt_output_file = gr.File(label="Download SRT File")
    # Wire the button to the transcription generator.
    submit_button.click(
        fn=transcribe_video,
        inputs=[video_input, drive_url_input, model_name, transcription_mode, chunk_length_min, max_words],  # Added drive_url_input
        outputs=[status_output, srt_output_file]
    )
    # Usage instructions rendered below the controls.
    gr.Markdown(
        """
        ### How to Use
        1. **Choose input method:** Either upload a video file OR provide a Google Drive zip URL (not both).
        2. **For Google Drive:** Share your zip file publicly and paste the link. The zip should contain video files.
        3. **Select a Whisper model.** For English, `base.en` provides a great balance of speed and accuracy.
        4. **Choose the granularity.** 'Segment-level' is good for standard subtitles. 'Word-level' is great for karaoke-style highlighting. 'Word-level Advanced' groups into optimized subtitle lines.
        5. **Click 'Transcribe Video'.** The status box will show the progress.
        6. **Download the SRT file** when the process is complete. You can open this file in any text editor or load it into a video player like VLC.
        ### Google Drive Setup
        - Upload your video files in a zip archive to Google Drive
        - Right-click the zip file β†’ Share β†’ Change to "Anyone with the link"
        - Copy and paste the share link into the URL field above
        """
    )
if __name__ == "__main__":
    # debug=True enables verbose error output in the Gradio server.
    demo.launch(debug=True)