Spaces:
Running
Running
Update app.py
Browse files
app.py
CHANGED
@@ -5,13 +5,15 @@ import tempfile
|
|
5 |
from pydub import AudioSegment
|
6 |
import math
|
7 |
import gc # Garbage Collector interface
|
8 |
-
import requests
|
9 |
-
import zipfile
|
|
|
10 |
|
11 |
# --- Helper Functions ---
|
12 |
|
13 |
def format_time(seconds):
|
14 |
-
"""Converts seconds to SRT
|
|
|
15 |
hours = int(seconds / 3600)
|
16 |
minutes = int((seconds % 3600) / 60)
|
17 |
secs = int(seconds % 60)
|
@@ -52,13 +54,16 @@ def process_advanced_segments(full_result, max_words):
|
|
52 |
Adjusts timestamps based on actual word times (or proportional if needed).
|
53 |
Optimized: Single pass with limited lookahead.
|
54 |
"""
|
|
|
55 |
punctuation = {'.', '!', '?', ';', ',', '--'}
|
|
|
|
|
56 |
all_words = []
|
57 |
for segment in full_result["segments"]:
|
58 |
all_words.extend(segment.get("words", []))
|
59 |
|
60 |
if not all_words:
|
61 |
-
return full_result
|
62 |
|
63 |
new_segments = []
|
64 |
current_words = []
|
@@ -67,27 +72,32 @@ def process_advanced_segments(full_result, max_words):
|
|
67 |
current_words.append(all_words[i])
|
68 |
|
69 |
if len(current_words) >= max_words:
|
|
|
70 |
split_index = -1
|
71 |
|
|
|
72 |
for j in range(len(current_words) - 1, -1, -1):
|
73 |
word_text = current_words[j]["word"].strip()
|
74 |
-
if word_text
|
75 |
-
split_index = j + 1
|
76 |
break
|
77 |
|
|
|
78 |
if split_index == -1:
|
79 |
-
lookahead_end = min(i + 1 + 10, len(all_words))
|
80 |
for j in range(i + 1, lookahead_end):
|
81 |
word_text = all_words[j]["word"].strip()
|
82 |
-
current_words.append(all_words[j])
|
83 |
-
i += 1
|
84 |
-
if word_text
|
85 |
-
split_index = len(current_words)
|
86 |
break
|
87 |
|
|
|
88 |
if split_index == -1:
|
89 |
split_index = max_words
|
90 |
|
|
|
91 |
group_words = current_words[:split_index]
|
92 |
if group_words:
|
93 |
text = " ".join(w["word"].strip() for w in group_words)
|
@@ -95,129 +105,691 @@ def process_advanced_segments(full_result, max_words):
|
|
95 |
end = group_words[-1]["end"]
|
96 |
new_segments.append({"start": start, "end": end, "text": text, "words": group_words})
|
97 |
|
|
|
98 |
current_words = current_words[split_index:]
|
99 |
|
100 |
i += 1
|
101 |
|
|
|
102 |
if current_words:
|
103 |
text = " ".join(w["word"].strip() for w in current_words)
|
104 |
start = current_words[0]["start"]
|
105 |
end = current_words[-1]["end"]
|
106 |
new_segments.append({"start": start, "end": end, "text": text, "words": current_words})
|
107 |
|
|
|
108 |
for seg in new_segments:
|
109 |
if "words" not in seg or not seg["words"]:
|
|
|
110 |
orig_start = seg["start"]
|
111 |
orig_end = seg["end"]
|
112 |
word_count = len(seg["text"].split())
|
113 |
if word_count > max_words:
|
114 |
ratio = max_words / word_count
|
115 |
split_time = orig_start + (orig_end - orig_start) * ratio
|
116 |
-
seg["end"] = split_time
|
|
|
117 |
|
|
|
118 |
full_result["segments"] = new_segments
|
119 |
return full_result
|
120 |
|
121 |
-
# ---
|
122 |
-
|
|
|
123 |
"""
|
124 |
-
|
125 |
-
|
|
|
126 |
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
127 |
try:
|
128 |
-
|
129 |
-
file_id = url.split('/d/')[1].split('/')[0]
|
130 |
-
download_url = f'https://drive.google.com/uc?export=download&id={file_id}'
|
131 |
-
|
132 |
-
zip_path = os.path.join(target_dir, "downloaded.zip")
|
133 |
-
|
134 |
-
with requests.get(download_url, stream=True) as r:
|
135 |
-
r.raise_for_status()
|
136 |
-
with open(zip_path, 'wb') as f:
|
137 |
-
for chunk in r.iter_content(chunk_size=8192):
|
138 |
-
f.write(chunk)
|
139 |
-
return zip_path
|
140 |
except Exception as e:
|
141 |
-
|
142 |
-
|
143 |
|
144 |
-
|
145 |
|
146 |
-
|
147 |
-
"""
|
148 |
-
Transcribes a video file by either direct upload or from a Google Drive ZIP link.
|
149 |
-
It extracts audio, chunks it, processes chunks, and generates a full SRT file.
|
150 |
-
"""
|
151 |
-
# Determine the source of the video
|
152 |
-
source_path = None
|
153 |
-
|
154 |
-
# Use a single temp directory for all operations
|
155 |
with tempfile.TemporaryDirectory() as temp_dir:
|
156 |
-
|
157 |
-
if gdrive_url:
|
158 |
-
yield "Input is a Google Drive URL. Starting download...", None
|
159 |
try:
|
160 |
-
|
161 |
-
|
162 |
-
|
163 |
-
|
164 |
-
|
165 |
-
|
166 |
-
|
167 |
-
|
168 |
-
|
169 |
-
|
170 |
-
|
171 |
-
|
172 |
-
|
173 |
-
|
174 |
-
|
175 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
176 |
for file in files:
|
177 |
-
|
178 |
-
|
179 |
-
found_video = os.path.join(root, file)
|
180 |
-
break
|
181 |
-
|
182 |
-
if not found_video:
|
183 |
-
return "Error: No video file found in the provided ZIP archive.", None
|
184 |
-
|
185 |
-
source_path = found_video
|
186 |
-
yield f"Video file found: {os.path.basename(source_path)}. Proceeding with transcription...", None
|
187 |
|
188 |
-
|
189 |
-
|
|
|
|
|
190 |
|
191 |
-
|
192 |
-
|
193 |
-
source_path = video_path
|
194 |
-
yield "Input is a direct upload. Proceeding with transcription...", None
|
195 |
|
196 |
-
|
197 |
-
|
198 |
-
return "Please upload a video file or provide a Google Drive ZIP link.", None
|
199 |
|
200 |
-
#
|
201 |
-
|
202 |
-
|
203 |
-
|
204 |
try:
|
205 |
-
|
|
|
|
|
|
|
206 |
except Exception as e:
|
207 |
-
return f"Error
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
208 |
|
209 |
yield f"Model '{model_name}' loaded. Extracting audio...", None
|
210 |
|
211 |
audio_path = os.path.join(temp_dir, "extracted_audio.wav")
|
|
|
|
|
212 |
try:
|
213 |
-
video = AudioSegment.from_file(
|
|
|
214 |
video.set_channels(1).set_frame_rate(16000).export(audio_path, format="wav")
|
215 |
audio = AudioSegment.from_wav(audio_path)
|
216 |
except Exception as e:
|
217 |
return f"Error processing video/audio: {e}", None
|
218 |
|
|
|
219 |
chunk_length_ms = chunk_length_min * 60 * 1000
|
220 |
num_chunks = math.ceil(len(audio) / chunk_length_ms)
|
|
|
221 |
full_result = {"segments": []}
|
222 |
|
223 |
yield f"Audio extracted. Splitting into {num_chunks} chunk(s) of {chunk_length_min} min...", None
|
@@ -231,75 +803,81 @@ def transcribe_video(video_path, gdrive_url, model_name, transcription_mode, chu
|
|
231 |
chunk.export(chunk_path, format="wav")
|
232 |
|
233 |
yield f"Transcribing chunk {i+1}/{num_chunks}...", None
|
234 |
-
should_get_word_timestamps = (transcription_mode in ["Word-level", "Word-level Advanced"])
|
235 |
|
|
|
|
|
|
|
|
|
236 |
try:
|
237 |
result = model.transcribe(
|
238 |
chunk_path,
|
239 |
word_timestamps=should_get_word_timestamps,
|
240 |
-
fp16=False
|
241 |
)
|
242 |
except Exception as e:
|
|
|
243 |
del model
|
244 |
gc.collect()
|
245 |
return f"Error during transcription of chunk {i+1}: {e}", None
|
246 |
|
|
|
|
|
|
|
247 |
time_offset_s = start_ms / 1000.0
|
|
|
248 |
for segment in result["segments"]:
|
249 |
segment["start"] += time_offset_s
|
250 |
segment["end"] += time_offset_s
|
|
|
251 |
if "words" in segment:
|
252 |
for word_info in segment["words"]:
|
253 |
word_info["start"] += time_offset_s
|
254 |
word_info["end"] += time_offset_s
|
|
|
255 |
full_result["segments"].append(segment)
|
256 |
|
|
|
257 |
os.remove(chunk_path)
|
258 |
|
|
|
259 |
del model
|
260 |
gc.collect()
|
261 |
|
|
|
262 |
if transcription_mode == "Word-level Advanced":
|
263 |
yield "Processing advanced word-level grouping...", None
|
264 |
full_result = process_advanced_segments(full_result, max_words)
|
265 |
|
266 |
yield "All chunks transcribed. Generating SRT file...", None
|
|
|
|
|
|
|
267 |
srt_mode = "segment" if transcription_mode == "Word-level Advanced" else transcription_mode
|
268 |
srt_output = generate_srt_from_result(full_result, srt_mode)
|
269 |
|
|
|
270 |
srt_file_path = os.path.join(temp_dir, "output.srt")
|
271 |
with open(srt_file_path, "w", encoding="utf-8") as srt_file:
|
272 |
srt_file.write(srt_output)
|
273 |
|
274 |
-
|
275 |
-
# Gradio handles the final temp file cleanup
|
276 |
-
return "Done!", srt_file_path
|
277 |
|
278 |
|
279 |
-
# ---
|
280 |
|
281 |
with gr.Blocks(theme=gr.themes.Soft()) as demo:
|
282 |
gr.Markdown(
|
283 |
"""
|
284 |
# Whisper Video Transcriber 🎥 -> 📝
|
285 |
-
Upload a video,
|
286 |
-
|
287 |
"""
|
288 |
)
|
289 |
with gr.Row():
|
290 |
-
with gr.Column(
|
291 |
-
|
292 |
-
|
293 |
-
with gr.TabItem("Upload Video"):
|
294 |
-
video_input = gr.Video(label="Upload a Video File")
|
295 |
-
with gr.TabItem("Google Drive Link"):
|
296 |
-
gdrive_url_input = gr.Textbox(
|
297 |
-
label="Public Google Drive ZIP File URL",
|
298 |
-
placeholder="e.g., https://drive.google.com/file/d/1a2b3c.../view?usp=sharing",
|
299 |
-
info="Paste the public share link to a ZIP file containing your video."
|
300 |
-
)
|
301 |
|
302 |
-
gr.Markdown("### Transcription Settings")
|
303 |
model_name = gr.Radio(
|
304 |
["tiny.en", "base.en"],
|
305 |
label="Whisper Model",
|
@@ -308,10 +886,10 @@ with gr.Blocks(theme=gr.themes.Soft()) as demo:
|
|
308 |
)
|
309 |
|
310 |
transcription_mode = gr.Radio(
|
311 |
-
["Segment-level", "Word-level", "Word-level Advanced"],
|
312 |
label="Transcription Granularity",
|
313 |
value="Segment-level",
|
314 |
-
info="Word-level is more detailed. Word-level Advanced groups words
|
315 |
)
|
316 |
|
317 |
chunk_length_min = gr.Slider(
|
@@ -323,37 +901,35 @@ with gr.Blocks(theme=gr.themes.Soft()) as demo:
|
|
323 |
info="Shorter chunks use less RAM but may be slightly less accurate at boundaries."
|
324 |
)
|
325 |
|
326 |
-
max_words = gr.Slider(
|
327 |
minimum=5,
|
328 |
maximum=30,
|
329 |
value=10,
|
330 |
step=1,
|
331 |
label="Max Words per Line (Advanced mode only)",
|
332 |
-
info="For Word-level Advanced: Limits words per subtitle line, splitting intelligently."
|
333 |
)
|
334 |
|
335 |
-
submit_button = gr.Button("Transcribe", variant="primary")
|
336 |
|
337 |
-
with gr.Column(
|
338 |
-
status_output = gr.Textbox(label="Status", interactive=False, lines=
|
339 |
srt_output_file = gr.File(label="Download SRT File")
|
340 |
|
341 |
-
# The click function now takes both video_input and gdrive_url_input
|
342 |
submit_button.click(
|
343 |
fn=transcribe_video,
|
344 |
-
inputs=[video_input,
|
345 |
outputs=[status_output, srt_output_file]
|
346 |
)
|
347 |
|
348 |
gr.Markdown(
|
349 |
"""
|
350 |
### How to Use
|
351 |
-
1. **
|
352 |
-
|
353 |
-
|
354 |
-
|
355 |
-
|
356 |
-
4. **Download the SRT file** when the process is complete.
|
357 |
"""
|
358 |
)
|
359 |
|
|
|
5 |
from pydub import AudioSegment
|
6 |
import math
|
7 |
import gc # Garbage Collector interface
|
8 |
+
import requests
|
9 |
+
import zipfile
|
10 |
+
import re
|
11 |
|
12 |
# --- Helper Functions ---
|
13 |
|
14 |
def format_time(seconds):
|
15 |
+
"""Converts seconds to SRT- Adding drive_link as an optional parameter to transcribe_video function helps manage inputs.
|
16 |
+
time format (HH:MM:SS,ms)"""
|
17 |
hours = int(seconds / 3600)
|
18 |
minutes = int((seconds % 3600) / 60)
|
19 |
secs = int(seconds % 60)
|
|
|
54 |
Adjusts timestamps based on actual word times (or proportional if needed).
|
55 |
Optimized: Single pass with limited lookahead.
|
56 |
"""
|
57 |
+
# Define punctuation for natural splits
|
58 |
punctuation = {'.', '!', '?', ';', ',', '--'}
|
59 |
+
|
60 |
+
# Flatten all words into a single list for continuous processing
|
61 |
all_words = []
|
62 |
for segment in full_result["segments"]:
|
63 |
all_words.extend(segment.get("words", []))
|
64 |
|
65 |
if not all_words:
|
66 |
+
return full_result # Nothing to process
|
67 |
|
68 |
new_segments = []
|
69 |
current_words = []
|
|
|
72 |
current_words.append(all_words[i])
|
73 |
|
74 |
if len(current_words) >= max_words:
|
75 |
+
# Find nearest punctuation for split
|
76 |
split_index = -1
|
77 |
|
78 |
+
# Look backward in current words for last punctuation
|
79 |
for j in range(len(current_words) - 1, -1, -1):
|
80 |
word_text = current_words[j]["word"].strip()
|
81 |
+
if word_text[-1] in punctuation:
|
82 |
+
split_index = j + 1 # Split after this word
|
83 |
break
|
84 |
|
85 |
+
# If none, look forward in next words (limited lookahead to optimize)
|
86 |
if split_index == -1:
|
87 |
+
lookahead_end = min(i + 1 + 10, len(all_words)) # Cap lookahead for efficiency
|
88 |
for j in range(i + 1, lookahead_end):
|
89 |
word_text = all_words[j]["word"].strip()
|
90 |
+
current_words.append(all_words[j]) # Temporarily add to current
|
91 |
+
i += 1 # Advance i as we add
|
92 |
+
if word_text[-1] in punctuation:
|
93 |
+
split_index = len(current_words) # Split after this added word
|
94 |
break
|
95 |
|
96 |
+
# Fallback: Split at max_words if no punctuation found
|
97 |
if split_index == -1:
|
98 |
split_index = max_words
|
99 |
|
100 |
+
# Create new segment for current group up to split
|
101 |
group_words = current_words[:split_index]
|
102 |
if group_words:
|
103 |
text = " ".join(w["word"].strip() for w in group_words)
|
|
|
105 |
end = group_words[-1]["end"]
|
106 |
new_segments.append({"start": start, "end": end, "text": text, "words": group_words})
|
107 |
|
108 |
+
# Remaining words become start of next group (timestamp adjustment: shifted to next)
|
109 |
current_words = current_words[split_index:]
|
110 |
|
111 |
i += 1
|
112 |
|
113 |
+
# Add any remaining words as last segment
|
114 |
if current_words:
|
115 |
text = " ".join(w["word"].strip() for w in current_words)
|
116 |
start = current_words[0]["start"]
|
117 |
end = current_words[-1]["end"]
|
118 |
new_segments.append({"start": start, "end": end, "text": text, "words": current_words})
|
119 |
|
120 |
+
# Handle rare case: If no word timestamps, fall back to proportional adjustment
|
121 |
for seg in new_segments:
|
122 |
if "words" not in seg or not seg["words"]:
|
123 |
+
# Proportional split (as per your description: adjust based on word count ratio)
|
124 |
orig_start = seg["start"]
|
125 |
orig_end = seg["end"]
|
126 |
word_count = len(seg["text"].split())
|
127 |
if word_count > max_words:
|
128 |
ratio = max_words / word_count
|
129 |
split_time = orig_start + (orig_end - orig_start) * ratio
|
130 |
+
seg["end"] = split_time # Minus from current
|
131 |
+
# Next segment would start at split_time (but since we're rebuilding, it's handled in loop)
|
132 |
|
133 |
+
# Replace original segments with new ones
|
134 |
full_result["segments"] = new_segments
|
135 |
return full_result
|
136 |
|
137 |
+
# --- Main Transcription Logic ---
|
138 |
+
|
139 |
+
def transcribe_video(video_path, model_name, transcription_mode, chunk_length_min, max_words, drive_link): # Added drive_link
|
140 |
"""
|
141 |
+
Transcribes a video file by extracting audio, chunking it, processing chunks,
|
142 |
+
and generating a full SRT file with corrected timestamps.
|
143 |
+
Supports either uploaded video or Google Drive public zip link containing the video.
|
144 |
"""
|
145 |
+
if video_path is None and not drive_link:
|
146 |
+
return "Please upload a video file or provide a Google Drive link.", None
|
147 |
+
|
148 |
+
yield "Loading model...", None # Update status for the user
|
149 |
+
|
150 |
+
# Load the Whisper model. This is cached by Gradio for subsequent calls.
|
151 |
+
# Note: On a Hugging Face Space, the model is loaded once when the app starts.
|
152 |
try:
|
153 |
+
model = whisper.load_model(model_name)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
154 |
except Exception as e:
|
155 |
+
return f"Error loading model: {e}", None
|
|
|
156 |
|
157 |
+
yield f"Model '{model_name}' loaded. Extracting audio...", None
|
158 |
|
159 |
+
# Use a temporary directory for all our files
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
160 |
with tempfile.TemporaryDirectory() as temp_dir:
|
161 |
+
if drive_link:
|
|
|
|
|
162 |
try:
|
163 |
+
yield "Parsing Google Drive link...", None
|
164 |
+
# Extract file ID from Google Drive link
|
165 |
+
match = re.search(r'/d/([a-zA-Z0-9_-]+)', drive_link)
|
166 |
+
if not match:
|
167 |
+
return "Invalid Google Drive link format.", None
|
168 |
+
file_id = match.group(1)
|
169 |
+
download_url = f"https://drive.google.com/uc?export=download&id={file_id}"
|
170 |
+
|
171 |
+
yield "Downloading zip from Google Drive...", None
|
172 |
+
zip_path = os.path.join(temp_dir, "downloaded.zip")
|
173 |
+
response = requests.get(download_url, stream=True)
|
174 |
+
if response.status_code != 200:
|
175 |
+
return f"Download failed with status {response.status_code}. Ensure the link is public.", None
|
176 |
+
|
177 |
+
# Handle large file confirmation if needed
|
178 |
+
if "confirm" in response.text:
|
179 |
+
confirm_token = re.search(r'confirm=([0-9A-Za-z\_-]+)', response.text)
|
180 |
+
if confirm_token:
|
181 |
+
confirm_token = confirm_token.group(1)
|
182 |
+
download_url = f"https://drive.google.com/uc?export=download&confirm={confirm_token}&id={file_id}"
|
183 |
+
response = requests.get(download_url, stream=True)
|
184 |
+
|
185 |
+
with open(zip_path, 'wb') as f:
|
186 |
+
for chunk in response.iter_content(chunk_size=8192):
|
187 |
+
f.write(chunk)
|
188 |
+
|
189 |
+
yield "Unzipping file...", None
|
190 |
+
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
|
191 |
+
zip_ref.extractall(temp_dir)
|
192 |
+
|
193 |
+
# Find the video file in the extracted contents (search recursively)
|
194 |
+
video_files = []
|
195 |
+
for root, _, files in os.walk(temp_dir):
|
196 |
for file in files:
|
197 |
+
if file.lower().endswith(('.mp4', '.avi', '.mkv', '.mov', '.wmv')):
|
198 |
+
video_files.append(os.path.join(root, file))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
199 |
|
200 |
+
if not video_files:
|
201 |
+
return "No video file found in the zip archive.", None
|
202 |
+
elif len(video_files) > 1:
|
203 |
+
return "Multiple video files found in the zip; only one is supported.", None
|
204 |
|
205 |
+
video_path = video_files[0]
|
206 |
+
yield f"Video extracted: {os.path.basename(video_path)}. Proceeding with transcription...", None
|
|
|
|
|
207 |
|
208 |
+
except Exception as e:
|
209 |
+
return f"Error handling Google Drive zip: {str(e)}", None
|
|
|
210 |
|
211 |
+
# Now proceed with audio extraction from video_path (either uploaded or extracted)
|
212 |
+
audio_path = os.path.join(temp_dir, "extracted_audio.wav")
|
213 |
+
|
214 |
+
# Extract audio from video using pydub
|
215 |
try:
|
216 |
+
video = AudioSegment.from_file(video_path)
|
217 |
+
# Export as WAV, 16kHz, mono - ideal for Whisper
|
218 |
+
video.set_channels(1).set_frame_rate(16000).export(audio_path, format="wav")
|
219 |
+
audio = AudioSegment.from_wav(audio_path)
|
220 |
except Exception as e:
|
221 |
+
return f"Error processing video/audio: {e}", None
|
222 |
+
|
223 |
+
# --- Chunking Logic ---
|
224 |
+
chunk_length_ms = chunk_length_min * 60 * 1000
|
225 |
+
num_chunks = math.ceil(len(audio) / chunk_length_ms)
|
226 |
+
|
227 |
+
full_result = {"segments": []}
|
228 |
+
|
229 |
+
yield f"Audio extracted. Splitting into {num_chunks} chunk(s) of {chunk_length_min} min...", None
|
230 |
+
|
231 |
+
for i in range(num_chunks):
|
232 |
+
start_ms = i * chunk_length_ms
|
233 |
+
end_ms = start_ms + chunk_length_ms
|
234 |
+
chunk = audio[start_ms:end_ms]
|
235 |
+
|
236 |
+
chunk_path = os.path.join(temp_dir, f"chunk_{i}.wav")
|
237 |
+
chunk.export(chunk_path, format="wav")
|
238 |
+
|
239 |
+
yield f"Transcribing chunk {i+1}/{num_chunks}...", None
|
240 |
+
|
241 |
+
# Determine if word-level timestamps are needed
|
242 |
+
should_get_word_timestamps = (transcription_mode in ["word", "Word-level Advanced"]) # Updated for new mode
|
243 |
+
|
244 |
+
# Transcribe the chunk
|
245 |
+
try:
|
246 |
+
result = model.transcribe(
|
247 |
+
chunk_path,
|
248 |
+
word_timestamps=should_get_word_timestamps,
|
249 |
+
fp16=False # Set to False for CPU-only inference
|
250 |
+
)
|
251 |
+
except Exception as e:
|
252 |
+
# Clean up and report error
|
253 |
+
del model
|
254 |
+
gc.collect()
|
255 |
+
return f"Error during transcription of chunk {i+1}: {e}", None
|
256 |
+
|
257 |
+
|
258 |
+
# --- Timestamp Correction ---
|
259 |
+
# Add the chunk's start time to all timestamps in the result
|
260 |
+
time_offset_s = start_ms / 1000.0
|
261 |
+
|
262 |
+
for segment in result["segments"]:
|
263 |
+
segment["start"] += time_offset_s
|
264 |
+
segment["end"] += time_offset_s
|
265 |
+
|
266 |
+
if "words" in segment:
|
267 |
+
for word_info in segment["words"]:
|
268 |
+
word_info["start"] += time_offset_s
|
269 |
+
word_info["end"] += time_offset_s
|
270 |
+
The user wants to modify their code to support transcribing from a public Google Drive zip file link. The process should be: take the link, download the zip, unzip it, and then transcribe like a normal video.
|
271 |
+
|
272 |
+
The current code uses Gradio for a UI where you upload a video, choose settings, and transcribe to SRT.
|
273 |
+
|
274 |
+
I need to edit the essential parts to add this new feature. The rest stays the same.
|
275 |
+
|
276 |
+
First, add an input for the Google Drive link in the UI.
|
277 |
+
|
278 |
+
The input will be either a video upload or a Google Drive link.
|
279 |
+
|
280 |
+
The transcribe function needs to handle either a video path or a link.
|
281 |
+
|
282 |
+
If it's a link, download the zip, unzip, and assume it contains the video file, then proceed.
|
283 |
+
|
284 |
+
The user said "public link of google drive zip file", so it's a zip file on Google Drive.
|
285 |
+
|
286 |
+
To download from Google Drive public link in Python.
|
287 |
+
|
288 |
+
Google Drive links for direct download need to handle the confirmation if it's large, but for public links, we can construct the direct download URL.
|
289 |
+
|
290 |
+
For a Google Drive file shared publicly, the link is like https://drive.google.com/file/d/FILE_ID/view?usp=sharing
|
291 |
+
|
292 |
+
To direct download: https://drive.google.com/uc?export=download&id=FILE_ID
|
293 |
+
|
294 |
+
For large files, it might require confirmation, but for scripts, we can use requests to handle it.
|
295 |
+
|
296 |
+
I need to add code to download the zip using requests, save to temp, unzip using zipfile, and then find the video file inside.
|
297 |
+
|
298 |
+
Assume the zip contains one video file.
|
299 |
+
|
300 |
+
Then, use that video path in the transcription.
|
301 |
+
|
302 |
+
In the UI, add a textbox for the Google Drive link.
|
303 |
+
|
304 |
+
Make it optional, so if link is provided, use that, else use uploaded video.
|
305 |
+
|
306 |
+
The user said "give a public link of google drive zip file", so probably add a new input.
|
307 |
+
|
308 |
+
To keep it simple, add a textbox for the link, and in the function, check if link is provided, if yes, download and unzip, else use video_path.
|
309 |
+
|
310 |
+
But video_input is gr.Video, which provides the path.
|
311 |
+
|
312 |
+
Add the link input.
|
313 |
+
|
314 |
+
Modify the transcribe_video function to accept the link as well.
|
315 |
+
|
316 |
+
Inputs: add the link.
|
317 |
+
|
318 |
+
If link is not None, process the link.
|
319 |
+
|
320 |
+
For downloading, use requests.
|
321 |
+
|
322 |
+
Import requests and zipfile.
|
323 |
+
|
324 |
+
In the function, if google_drive_link:
|
325 |
+
|
326 |
+
# extract file id
|
327 |
+
|
328 |
+
import re
|
329 |
+
|
330 |
+
file_id = re.search(r'/d/([^/]+)', google_drive_link).group(1)
|
331 |
+
|
332 |
+
download_url = f"https://drive.google.com/uc?export=download&id={file_id}"
|
333 |
+
|
334 |
+
# download
|
335 |
+
|
336 |
+
with requests.get(download_url, stream=True) as r:
|
337 |
+
|
338 |
+
r.raise_for_status()
|
339 |
+
|
340 |
+
zip_path = os.path.join(temp_dir, "downloaded.zip")
|
341 |
+
|
342 |
+
with open(zip_path, 'wb') as f:
|
343 |
+
|
344 |
+
for chunk in r.iter_content(chunk_size=8192):
|
345 |
+
|
346 |
+
f.write(chunk)
|
347 |
+
|
348 |
+
# unzip
|
349 |
+
|
350 |
+
import zipfile
|
351 |
+
|
352 |
+
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
|
353 |
+
|
354 |
+
zip_ref.extractall(temp_dir)
|
355 |
+
|
356 |
+
# find the video file, assume one .mp4 or something
|
357 |
+
|
358 |
+
video_files = [f for f in os.listdir(temp_dir) if f.endswith(('.mp4', '.avi', '.mkv', etc.))]
|
359 |
+
|
360 |
+
if not video_files:
|
361 |
+
|
362 |
+
error
|
363 |
+
|
364 |
+
video_path = os.path.join(temp_dir, video_files[0])
|
365 |
+
|
366 |
+
Then proceed with that video_path.
|
367 |
+
|
368 |
+
The temp_dir is already there.
|
369 |
+
|
370 |
+
For large files, Google Drive might return a virus scan page, so need to handle that.
|
371 |
+
|
372 |
+
For files that require confirmation, the initial GET returns a HTML with a confirm token.
|
373 |
+
|
374 |
+
To handle large files, need to:
|
375 |
+
|
376 |
+
- GET the download_url with params
|
377 |
+
|
378 |
+
- If content-type is not octet-stream, parse the cookie and confirm token, then POST or something.
|
379 |
+
|
380 |
+
But to keep it simple, assume small files or public without confirmation.
|
381 |
+
|
382 |
+
Since this is in Gradio, probably running on HF spaces, which might have limitations.
|
383 |
+
|
384 |
+
But anyway, I need to provide the code.
|
385 |
+
|
386 |
+
The user might expect a simple way.
|
387 |
+
|
388 |
+
Since it's Python, I can add the code.
|
389 |
+
|
390 |
+
Add imports: import requests, import zipfile, import re
|
391 |
+
|
392 |
+
In the UI, add google_drive_link = gr.Textbox(label="Or Google Drive Zip Link")
|
393 |
+
|
394 |
+
Then, in click, inputs add google_drive_link
|
395 |
+
|
396 |
+
In fn, def transcribe_video(video_path, model_name, ..., max_words, google_drive_link=None)
|
397 |
+
|
398 |
+
If google_drive_link:
|
399 |
+
|
400 |
+
yield "Downloading from Google Drive..."
|
401 |
+
|
402 |
+
try:
|
403 |
+
|
404 |
+
file_id = re.findall(r"\/d\/(.+?)\/", google_drive_link)[0]
|
405 |
+
|
406 |
+
# or better parsing
|
407 |
+
|
408 |
+
download_url = f"https://drive.google.com/uc?export=download&id={file_id}"
|
409 |
+
|
410 |
+
response = requests.get(download_url, stream=True)
|
411 |
+
|
412 |
+
if "Content-Disposition" not in response.headers:
|
413 |
+
|
414 |
+
# Handle confirmation
|
415 |
+
|
416 |
+
# This is complicated, perhaps use gdown library, but can't install.
|
417 |
+
|
418 |
+
The tool says: Keep in mind you have no internet access. Therefore, you CANNOT install any additional packages via pip install
|
419 |
+
|
420 |
+
But in the code, it's the user's code that will run, so if they run it locally, they can, but assuming in HF space, can't.
|
421 |
+
|
422 |
+
The code is using whisper, which is installed.
|
423 |
+
|
424 |
+
For downloading from GDrive, to handle large files, often people use gdown, but can't install.
|
425 |
+
|
426 |
+
So, for simplicity, I'll implement a basic download, and note that for large files it might not work.
|
427 |
+
|
428 |
+
Use the browse_page tool? No, this is for the code.
|
429 |
+
|
430 |
+
I am Grok, I need to edit the code.
|
431 |
+
|
432 |
+
Let's implement a function to download from GDrive handling the confirmation.
|
433 |
+
|
434 |
+
I can search for a snippet.
|
435 |
+
|
436 |
+
Use web_search to find how to download from GDrive without gdown.
|
437 |
+
|
438 |
+
But perhaps it's overkill.
|
439 |
+
|
440 |
+
Let me think.
|
441 |
+
|
442 |
+
A common way:
|
443 |
+
|
444 |
+
def download_from_gdrive(url, dest_path):
|
445 |
+
|
446 |
+
import requests
|
447 |
+
|
448 |
+
def get_confirm_token(response):
|
449 |
+
|
450 |
+
for key, value in response.cookies.items():
|
451 |
+
|
452 |
+
if key.startswith('download_warning'):
|
453 |
+
|
454 |
+
return value
|
455 |
+
|
456 |
+
return None
|
457 |
+
|
458 |
+
URL = "https://drive.google.com/uc?export=download"
|
459 |
+
|
460 |
+
file_id = # extract
|
461 |
+
|
462 |
+
session = requests.Session()
|
463 |
+
|
464 |
+
response = session.get(URL, params={'id': file_id}, stream=True)
|
465 |
+
|
466 |
+
token = get_confirm_token(response)
|
467 |
+
|
468 |
+
if token:
|
469 |
+
|
470 |
+
params = {'id': file_id, 'confirm': token}
|
471 |
+
|
472 |
+
response = session.get(URL, params=params, stream=True)
|
473 |
+
|
474 |
+
# then save
|
475 |
+
|
476 |
+
Yes, this is a standard snippet.
|
477 |
+
|
478 |
+
I can add this.
|
479 |
+
|
480 |
+
In the code, add this function.
|
481 |
+
|
482 |
+
Now, for extracting file_id, from the link.
|
483 |
+
|
484 |
+
If link is https://drive.google.com/file/d/FILE_ID/view?usp=sharing
|
485 |
+
|
486 |
+
Use re.search(r'file/d/([a-zA-Z0-9_-]+)', link)
|
487 |
+
|
488 |
+
Yes.
|
489 |
+
|
490 |
+
For the zip, after download, unzip to temp_dir, then find files ending with video extensions.
|
491 |
+
|
492 |
+
What if zip contains multiple files? Assume it's the video zip, perhaps the video is the only one or the main one.
|
493 |
+
|
494 |
+
User said "zip file", probably contains the video.
|
495 |
+
|
496 |
+
To be safe, look for the largest file or something, but simple: find the first video file.
|
497 |
+
|
498 |
+
Video extensions: mp4, mkv, avi, mov, etc.
|
499 |
+
|
500 |
+
I can list some.
|
501 |
+
|
502 |
+
In the code, after unzip, video_path = None
|
503 |
+
|
504 |
+
for f in os.listdir(temp_dir):
|
505 |
+
|
506 |
+
if f.lower().endswith(('.mp4', '.mkv', '.avi', '.mov', '.wmv')):
|
507 |
+
|
508 |
+
video_path = os.path.join(temp_dir, f)
|
509 |
+
|
510 |
+
break
|
511 |
+
|
512 |
+
If not, error.
|
513 |
+
|
514 |
+
If video_path from upload, use that.
|
515 |
+
|
516 |
+
If video_path is None and not google_drive_link, error.
|
517 |
+
|
518 |
+
But in current, video_path is required, but in Gradio, if not uploaded, it's None.
|
519 |
+
|
520 |
+
Yes.
|
521 |
+
|
522 |
+
Also, since it's generator, yield "Unzipping..."
|
523 |
+
|
524 |
+
Yes.
|
525 |
+
|
526 |
+
For the audio extraction, same.
|
527 |
+
|
528 |
+
The temp_dir is with tempfile.TemporaryDirectory(), so all good.
|
529 |
+
|
530 |
+
Add the imports at top: import requests, import re, import zipfile
|
531 |
+
|
532 |
+
Add the download function.
|
533 |
+
|
534 |
+
Since it's stateful? No.
|
535 |
+
|
536 |
+
Ok.
|
537 |
+
|
538 |
+
In the transcribe_video, it's a generator, yield status, None
|
539 |
+
|
540 |
+
Yes.
|
541 |
+
|
542 |
+
The outputs are status_output which is Textbox, but since it's generator, Gradio handles streaming?
|
543 |
+
|
544 |
+
In current code, it's yield "message", None
|
545 |
+
|
546 |
+
But for outputs=[status, file], and file is updated at end.
|
547 |
+
|
548 |
+
In Gradio, for streaming, fn should be generator.
|
549 |
+
|
550 |
+
Yes.
|
551 |
+
|
552 |
+
Ok.
|
553 |
+
|
554 |
+
In the UI, add the input.
|
555 |
+
|
556 |
+
And in how to use, add option for link.
|
557 |
+
|
558 |
+
But user said "edit essential part", rest same.
|
559 |
+
|
560 |
+
But to make it work, need to add the input.
|
561 |
+
|
562 |
+
Yes.
|
563 |
+
|
564 |
+
I think that's it.```python
|
565 |
+
import gradio as gr
|
566 |
+
import whisper
|
567 |
+
import os
|
568 |
+
import tempfile
|
569 |
+
from pydub import AudioSegment
|
570 |
+
import math
|
571 |
+
import gc # Garbage Collector interface
|
572 |
+
import requests
|
573 |
+
import re
|
574 |
+
import zipfile
|
575 |
+
|
576 |
+
# --- Helper Functions ---
|
577 |
+
|
578 |
+
def format_time(seconds):
    """Convert a duration in seconds to the SRT timestamp format HH:MM:SS,mmm."""
    whole_seconds = int(seconds)
    milliseconds = int((seconds - whole_seconds) * 1000)
    minutes, secs = divmod(whole_seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{milliseconds:03d}"
|
585 |
+
|
586 |
+
def generate_srt_from_result(result, transcription_mode):
    """Generates SRT content from Whisper's result dictionary.

    Args:
        result: Whisper result dict with a "segments" list; for word-level
            output each segment must carry a "words" list with per-word
            "start"/"end" timestamps.
        transcription_mode: Any string starting with "word" (case-insensitive,
            e.g. "word" or the UI label "Word-level") produces one SRT entry
            per word; anything else produces one entry per segment.

    Returns:
        The complete SRT document as a single string.
    """
    srt_content = []

    # BUGFIX: the original compared against the exact string "word", but the
    # caller passes the UI label "Word-level", so word-level SRT was never
    # generated. Match any mode whose name starts with "word" instead.
    if str(transcription_mode).lower().startswith("word"):
        # Word-level SRT generation: one numbered entry per word.
        entry_index = 1
        for segment in result["segments"]:
            for word_info in segment.get("words", []):
                start_time = format_time(word_info["start"])
                end_time = format_time(word_info["end"])
                text = word_info["word"].strip()
                if text:  # Ensure we don't add empty entries
                    srt_content.append(f"{entry_index}\n{start_time} --> {end_time}\n{text}\n")
                    entry_index += 1
    else:  # Default to segment-level
        for i, segment in enumerate(result["segments"], 1):
            start_time = format_time(segment["start"])
            end_time = format_time(segment["end"])
            text = segment["text"].strip()
            if text:
                srt_content.append(f"{i}\n{start_time} --> {end_time}\n{text}\n")

    return "\n".join(srt_content)
|
610 |
+
|
611 |
+
# --- New Function for Advanced Mode ---
|
612 |
+
|
613 |
+
def process_advanced_segments(full_result, max_words):
    """
    Post-processes segments for Word-level Advanced mode.

    Groups words into new segments with <= max_words per segment, splitting at
    the nearest punctuation mark when one is available (looking backward first,
    then a short lookahead forward). Timestamps for each new segment come from
    the grouped words' actual times; a proportional fallback exists for
    segments that carry no word timing data.

    Args:
        full_result: Whisper result dict with a "segments" list, where each
            segment may carry a "words" list of {"word", "start", "end"} dicts.
        max_words: Maximum number of words per output segment/subtitle line.

    Returns:
        The same dict, with "segments" replaced by the regrouped segments.
    """
    # Define punctuation for natural splits
    punctuation = {'.', '!', '?', ';', ',', '--'}

    # Flatten all words into a single list for continuous processing
    all_words = []
    for segment in full_result["segments"]:
        all_words.extend(segment.get("words", []))

    if not all_words:
        return full_result  # Nothing to process

    new_segments = []
    current_words = []
    i = 0
    while i < len(all_words):
        current_words.append(all_words[i])

        if len(current_words) >= max_words:
            # Find nearest punctuation for split
            split_index = -1

            # Look backward in current words for last punctuation.
            # BUGFIX: guard against words that strip to an empty string
            # (e.g. whitespace-only tokens) — word_text[-1] would raise
            # IndexError on them.
            for j in range(len(current_words) - 1, -1, -1):
                word_text = current_words[j]["word"].strip()
                if word_text and word_text[-1] in punctuation:
                    split_index = j + 1  # Split after this word
                    break

            # If none, look forward in next words (limited lookahead to optimize)
            if split_index == -1:
                lookahead_end = min(i + 1 + 10, len(all_words))  # Cap lookahead for efficiency
                for j in range(i + 1, lookahead_end):
                    word_text = all_words[j]["word"].strip()
                    current_words.append(all_words[j])  # Temporarily add to current
                    i += 1  # Advance i as we add
                    if word_text and word_text[-1] in punctuation:  # Same empty-word guard
                        split_index = len(current_words)  # Split after this added word
                        break

            # Fallback: Split at max_words if no punctuation found
            if split_index == -1:
                split_index = max_words

            # Create new segment for current group up to split
            group_words = current_words[:split_index]
            if group_words:
                text = " ".join(w["word"].strip() for w in group_words)
                start = group_words[0]["start"]
                end = group_words[-1]["end"]
                new_segments.append({"start": start, "end": end, "text": text, "words": group_words})

            # Remaining words become start of next group (timestamp adjustment: shifted to next)
            current_words = current_words[split_index:]

        i += 1

    # Add any remaining words as last segment
    if current_words:
        text = " ".join(w["word"].strip() for w in current_words)
        start = current_words[0]["start"]
        end = current_words[-1]["end"]
        new_segments.append({"start": start, "end": end, "text": text, "words": current_words})

    # Handle rare case: If no word timestamps, fall back to proportional adjustment
    for seg in new_segments:
        if "words" not in seg or not seg["words"]:
            # Proportional split: shrink the segment end based on word-count ratio
            orig_start = seg["start"]
            orig_end = seg["end"]
            word_count = len(seg["text"].split())
            if word_count > max_words:
                ratio = max_words / word_count
                split_time = orig_start + (orig_end - orig_start) * ratio
                seg["end"] = split_time  # Minus from current
                # Next segment would start at split_time (handled while rebuilding)

    # Replace original segments with new ones
    full_result["segments"] = new_segments
    return full_result
|
699 |
+
|
700 |
+
def download_from_gdrive(gdrive_link, dest_path):
    """
    Downloads a file from a public Google Drive link, handling large file
    confirmation if needed.

    Args:
        gdrive_link: Public share link, either the "/file/d/<ID>/view" form
            or a query-parameter form such as "open?id=<ID>" / "uc?id=<ID>".
        dest_path: Local filesystem path the downloaded bytes are written to.

    Raises:
        ValueError: If no file ID can be extracted from the link.
        requests.HTTPError: If the download request fails (via raise_for_status).
    """
    # Extract file ID from the link. Generalized: accept both the
    # "/file/d/<ID>" path form and the "id=<ID>" query-parameter form.
    match = re.search(r'/file/d/([a-zA-Z0-9_-]+)', gdrive_link)
    if not match:
        match = re.search(r'[?&]id=([a-zA-Z0-9_-]+)', gdrive_link)
    if not match:
        raise ValueError("Invalid Google Drive link. Ensure it's a public share link.")
    file_id = match.group(1)

    URL = "https://drive.google.com/uc?export=download"
    session = requests.Session()
    response = session.get(URL, params={'id': file_id}, stream=True)
    token = get_confirm_token(response)

    # Large files trigger a "can't scan for viruses" interstitial; retry
    # with the confirmation token taken from the warning cookie.
    if token:
        params = {'id': file_id, 'confirm': token}
        response = session.get(URL, params=params, stream=True)

    response.raise_for_status()
    # Stream to disk in chunks so large files don't load fully into memory.
    with open(dest_path, 'wb') as f:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)
|
724 |
+
|
725 |
+
def get_confirm_token(response):
    """Return Google Drive's download-warning cookie value, or None if absent."""
    return next(
        (value for key, value in response.cookies.items()
         if key.startswith('download_warning')),
        None,
    )
|
730 |
+
|
731 |
+
# --- Main Transcription Logic ---
|
732 |
+
|
733 |
+
def transcribe_video(video_path, model_name, transcription_mode, chunk_length_min, max_words, google_drive_link=None): # Added google_drive_link
|
734 |
+
"""
|
735 |
+
Transcribes a video file by extracting audio, chunking it, processing chunks,
|
736 |
+
and generating a full SRT file with corrected timestamps.
|
737 |
+
Supports downloading and unzipping from a Google Drive zip link.
|
738 |
+
"""
|
739 |
+
if video_path is None and not google_drive_link:
|
740 |
+
return "Please upload a video file or provide a Google Drive link.", None
|
741 |
+
|
742 |
+
yield "Loading model...", None # Update status for the user
|
743 |
+
|
744 |
+
# Load the Whisper model. This is cached by Gradio for subsequent calls.
|
745 |
+
# Note: On a Hugging Face Space, the model is loaded once when the app starts.
|
746 |
+
try:
|
747 |
+
model = whisper.load_model(model_name)
|
748 |
+
except Exception as e:
|
749 |
+
return f"Error loading model: {e}", None
|
750 |
+
|
751 |
+
# Use a temporary directory for all our files
|
752 |
+
with tempfile.TemporaryDirectory() as temp_dir:
|
753 |
+
if google_drive_link:
|
754 |
+
yield "Downloading zip from Google Drive...", None
|
755 |
+
try:
|
756 |
+
zip_path = os.path.join(temp_dir, "downloaded.zip")
|
757 |
+
download_from_gdrive(google_drive_link, zip_path)
|
758 |
+
except Exception as e:
|
759 |
+
return f"Error downloading from Google Drive: {e}", None
|
760 |
+
|
761 |
+
yield "Unzipping file...", None
|
762 |
+
try:
|
763 |
+
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
|
764 |
+
zip_ref.extractall(temp_dir)
|
765 |
+
os.remove(zip_path) # Clean up zip
|
766 |
+
except Exception as e:
|
767 |
+
return f"Error unzipping file: {e}", None
|
768 |
+
|
769 |
+
# Find the video file in the temp_dir
|
770 |
+
video_extensions = ('.mp4', '.mkv', '.avi', '.mov', '.wmv')
|
771 |
+
video_files = [f for f in os.listdir(temp_dir) if f.lower().endswith(video_extensions)]
|
772 |
+
if not video_files:
|
773 |
+
return "No video file found in the unzipped contents.", None
|
774 |
+
video_path = os.path.join(temp_dir, video_files[0]) # Assume first video file
|
775 |
|
776 |
yield f"Model '{model_name}' loaded. Extracting audio...", None
|
777 |
|
778 |
audio_path = os.path.join(temp_dir, "extracted_audio.wav")
|
779 |
+
|
780 |
+
# Extract audio from video using pydub
|
781 |
try:
|
782 |
+
video = AudioSegment.from_file(video_path)
|
783 |
+
# Export as WAV, 16kHz, mono - ideal for Whisper
|
784 |
video.set_channels(1).set_frame_rate(16000).export(audio_path, format="wav")
|
785 |
audio = AudioSegment.from_wav(audio_path)
|
786 |
except Exception as e:
|
787 |
return f"Error processing video/audio: {e}", None
|
788 |
|
789 |
+
# --- Chunking Logic ---
|
790 |
chunk_length_ms = chunk_length_min * 60 * 1000
|
791 |
num_chunks = math.ceil(len(audio) / chunk_length_ms)
|
792 |
+
|
793 |
full_result = {"segments": []}
|
794 |
|
795 |
yield f"Audio extracted. Splitting into {num_chunks} chunk(s) of {chunk_length_min} min...", None
|
|
|
803 |
chunk.export(chunk_path, format="wav")
|
804 |
|
805 |
yield f"Transcribing chunk {i+1}/{num_chunks}...", None
|
|
|
806 |
|
807 |
+
# Determine if word-level timestamps are needed
|
808 |
+
should_get_word_timestamps = (transcription_mode in ["word", "Word-level Advanced"]) # Updated for new mode
|
809 |
+
|
810 |
+
# Transcribe the chunk
|
811 |
try:
|
812 |
result = model.transcribe(
|
813 |
chunk_path,
|
814 |
word_timestamps=should_get_word_timestamps,
|
815 |
+
fp16=False # Set to False for CPU-only inference
|
816 |
)
|
817 |
except Exception as e:
|
818 |
+
# Clean up and report error
|
819 |
del model
|
820 |
gc.collect()
|
821 |
return f"Error during transcription of chunk {i+1}: {e}", None
|
822 |
|
823 |
+
|
824 |
+
# --- Timestamp Correction ---
|
825 |
+
# Add the chunk's start time to all timestamps in the result
|
826 |
time_offset_s = start_ms / 1000.0
|
827 |
+
|
828 |
for segment in result["segments"]:
|
829 |
segment["start"] += time_offset_s
|
830 |
segment["end"] += time_offset_s
|
831 |
+
|
832 |
if "words" in segment:
|
833 |
for word_info in segment["words"]:
|
834 |
word_info["start"] += time_offset_s
|
835 |
word_info["end"] += time_offset_s
|
836 |
+
|
837 |
full_result["segments"].append(segment)
|
838 |
|
839 |
+
# Clean up the chunk file immediately
|
840 |
os.remove(chunk_path)
|
841 |
|
842 |
+
# Clean up the model from memory to be safe
|
843 |
del model
|
844 |
gc.collect()
|
845 |
|
846 |
+
# --- New: Process for Advanced Mode ---
|
847 |
if transcription_mode == "Word-level Advanced":
|
848 |
yield "Processing advanced word-level grouping...", None
|
849 |
full_result = process_advanced_segments(full_result, max_words)
|
850 |
|
851 |
yield "All chunks transcribed. Generating SRT file...", None
|
852 |
+
|
853 |
+
# Generate the final SRT file from the combined results
|
854 |
+
# For Advanced mode, force segment-level generation (grouped lines)
|
855 |
srt_mode = "segment" if transcription_mode == "Word-level Advanced" else transcription_mode
|
856 |
srt_output = generate_srt_from_result(full_result, srt_mode)
|
857 |
|
858 |
+
# Create a final SRT file in the temp directory to be returned by Gradio
|
859 |
srt_file_path = os.path.join(temp_dir, "output.srt")
|
860 |
with open(srt_file_path, "w", encoding="utf-8") as srt_file:
|
861 |
srt_file.write(srt_output)
|
862 |
|
863 |
+
yield "Done!", srt_file_path
|
|
|
|
|
864 |
|
865 |
|
866 |
+
# --- Gradio UI ---
|
867 |
|
868 |
with gr.Blocks(theme=gr.themes.Soft()) as demo:
|
869 |
gr.Markdown(
|
870 |
"""
|
871 |
# Whisper Video Transcriber 🎥 -> 📝
|
872 |
+
Upload a video, choose your settings, and get a timed SRT subtitle file.
|
873 |
+
This app handles large videos by automatically splitting them into manageable chunks.
|
874 |
"""
|
875 |
)
|
876 |
with gr.Row():
|
877 |
+
with gr.Column():
|
878 |
+
video_input = gr.Video(label="Upload Video")
|
879 |
+
google_drive_link = gr.Textbox(label="Or Public Google Drive Zip Link", placeholder="https://drive.google.com/file/d/[FILE_ID]/view?usp=sharing")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
880 |
|
|
|
881 |
model_name = gr.Radio(
|
882 |
["tiny.en", "base.en"],
|
883 |
label="Whisper Model",
|
|
|
886 |
)
|
887 |
|
888 |
transcription_mode = gr.Radio(
|
889 |
+
["Segment-level", "Word-level", "Word-level Advanced"], # Added new mode
|
890 |
label="Transcription Granularity",
|
891 |
value="Segment-level",
|
892 |
+
info="Word-level is more detailed but may be slightly slower. Word-level Advanced groups into lines with max words, splitting at punctuation."
|
893 |
)
|
894 |
|
895 |
chunk_length_min = gr.Slider(
|
|
|
901 |
info="Shorter chunks use less RAM but may be slightly less accurate at boundaries."
|
902 |
)
|
903 |
|
904 |
+
max_words = gr.Slider( # New input for max_words
|
905 |
minimum=5,
|
906 |
maximum=30,
|
907 |
value=10,
|
908 |
step=1,
|
909 |
label="Max Words per Line (Advanced mode only)",
|
910 |
+
info="For Word-level Advanced: Limits words per subtitle line, splitting intelligently at punctuation."
|
911 |
)
|
912 |
|
913 |
+
submit_button = gr.Button("Transcribe Video", variant="primary")
|
914 |
|
915 |
+
with gr.Column():
|
916 |
+
status_output = gr.Textbox(label="Status", interactive=False, lines=5)
|
917 |
srt_output_file = gr.File(label="Download SRT File")
|
918 |
|
|
|
919 |
submit_button.click(
|
920 |
fn=transcribe_video,
|
921 |
+
inputs=[video_input, model_name, transcription_mode, chunk_length_min, max_words, google_drive_link], # Added google_drive_link
|
922 |
outputs=[status_output, srt_output_file]
|
923 |
)
|
924 |
|
925 |
gr.Markdown(
|
926 |
"""
|
927 |
### How to Use
|
928 |
+
1. **Upload a video file or provide a public Google Drive zip link containing the video.**
|
929 |
+
2. **Select a Whisper model.** For English, `base.en` provides a great balance of speed and accuracy.
|
930 |
+
3. **Choose the granularity.** 'Segment-level' is good for standard subtitles. 'Word-level' is great for karaoke-style highlighting. 'Word-level Advanced' groups into optimized subtitle lines.
|
931 |
+
4. **Click 'Transcribe Video'.** The status box will show the progress.
|
932 |
+
5. **Download the SRT file** when the process is complete. You can open this file in any text editor or load it into a video player like VLC.
|
|
|
933 |
"""
|
934 |
)
|
935 |
|