import os
import subprocess
import json
from datetime import timedelta
import tempfile
import re
import yt_dlp
import gradio as gr
import groq
from groq import Groq
# setup groq
client = Groq(api_key=os.environ.get("Groq_Api_Key"))
def handle_groq_error(e, model_name):
    error_data = e.args[0]

    if isinstance(error_data, str):
        # Extract the JSON payload embedded in the error string
        json_match = re.search(r'(\{.*\})', error_data)
        if json_match:
            json_str = json_match.group(1)
            json_str = json_str.replace("'", '"')  # normalize quotes so json.loads accepts it
            try:
                error_data = json.loads(json_str)
            except json.JSONDecodeError:
                pass  # fall through and report the raw exception below

    if isinstance(e, groq.AuthenticationError):
        if isinstance(error_data, dict) and 'error' in error_data and 'message' in error_data['error']:
            raise gr.Error(error_data['error']['message'])
    elif isinstance(e, groq.RateLimitError):
        if isinstance(error_data, dict) and 'error' in error_data and 'message' in error_data['error']:
            error_message = error_data['error']['message']
            error_message = re.sub(r'org_[a-zA-Z0-9]+', 'org_(censored)', error_message)  # censor the org id
            raise gr.Error(error_message)

    # Fallback: always raise so callers never continue silently after an API failure
    raise gr.Error(f"Error during Groq API call: {e}")
# language codes for subtitle maker
LANGUAGE_CODES = {
"English": "en",
"Chinese": "zh",
"German": "de",
"Spanish": "es",
"Russian": "ru",
"Korean": "ko",
"French": "fr",
"Japanese": "ja",
"Portuguese": "pt",
"Turkish": "tr",
"Polish": "pl",
"Catalan": "ca",
"Dutch": "nl",
"Arabic": "ar",
"Swedish": "sv",
"Italian": "it",
"Indonesian": "id",
"Hindi": "hi",
"Finnish": "fi",
"Vietnamese": "vi",
"Hebrew": "he",
"Ukrainian": "uk",
"Greek": "el",
"Malay": "ms",
"Czech": "cs",
"Romanian": "ro",
"Danish": "da",
"Hungarian": "hu",
"Tamil": "ta",
"Norwegian": "no",
"Thai": "th",
"Urdu": "ur",
"Croatian": "hr",
"Bulgarian": "bg",
"Lithuanian": "lt",
"Latin": "la",
"Māori": "mi",
"Malayalam": "ml",
"Welsh": "cy",
"Slovak": "sk",
"Telugu": "te",
"Persian": "fa",
"Latvian": "lv",
"Bengali": "bn",
"Serbian": "sr",
"Azerbaijani": "az",
"Slovenian": "sl",
"Kannada": "kn",
"Estonian": "et",
"Macedonian": "mk",
"Breton": "br",
"Basque": "eu",
"Icelandic": "is",
"Armenian": "hy",
"Nepali": "ne",
"Mongolian": "mn",
"Bosnian": "bs",
"Kazakh": "kk",
"Albanian": "sq",
"Swahili": "sw",
"Galician": "gl",
"Marathi": "mr",
"Panjabi": "pa",
"Sinhala": "si",
"Khmer": "km",
"Shona": "sn",
"Yoruba": "yo",
"Somali": "so",
"Afrikaans": "af",
"Occitan": "oc",
"Georgian": "ka",
"Belarusian": "be",
"Tajik": "tg",
"Sindhi": "sd",
"Gujarati": "gu",
"Amharic": "am",
"Yiddish": "yi",
"Lao": "lo",
"Uzbek": "uz",
"Faroese": "fo",
"Haitian": "ht",
"Pashto": "ps",
"Turkmen": "tk",
"Norwegian Nynorsk": "nn",
"Maltese": "mt",
"Sanskrit": "sa",
"Luxembourgish": "lb",
"Burmese": "my",
"Tibetan": "bo",
"Tagalog": "tl",
"Malagasy": "mg",
"Assamese": "as",
"Tatar": "tt",
"Hawaiian": "haw",
"Lingala": "ln",
"Hausa": "ha",
"Bashkir": "ba",
"jw": "jw",
"Sundanese": "su",
}
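# The UI shows the language names; the transcription API expects the short Whisper-style codes,
# e.g. LANGUAGE_CODES["English"] -> "en", LANGUAGE_CODES["Japanese"] -> "ja".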
# download link input
def yt_dlp_download(link):
try:
        ydl_opts = {
            'format': 'bestvideo+bestaudio/best',  # best video+audio pair, or best single stream
            'outtmpl': '%(title)s.%(ext)s',
            'merge_output_format': 'mp4',  # keep merged downloads as .mp4 so the video checks below work
            'nocheckcertificate': True,
            'ignoreerrors': False,
            'no_warnings': True,
            'quiet': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # Download and collect metadata in a single pass so the returned path matches the file on disk
            result = ydl.extract_info(link, download=True)
            download_path = ydl.prepare_filename(result)
            return download_path
except yt_dlp.utils.DownloadError as e:
raise gr.Error(f"Download Error: {e}")
except ValueError as e:
raise gr.Error(f"Invalid Link or Format Error: {e}")
except Exception as e:
raise gr.Error(f"An unexpected error occurred: {e}")
# helper functions
def split_audio(input_file_path, chunk_size_mb):
chunk_size = chunk_size_mb * 1024 * 1024 # Convert MB to bytes
file_number = 1
chunks = []
with open(input_file_path, 'rb') as f:
chunk = f.read(chunk_size)
while chunk:
chunk_name = f"{os.path.splitext(input_file_path)[0]}_part{file_number:03}.mp3" # Pad file number for correct ordering
with open(chunk_name, 'wb') as chunk_file:
chunk_file.write(chunk)
chunks.append(chunk_name)
file_number += 1
chunk = f.read(chunk_size)
return chunks
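# Example (sketch): split_audio("audio_downsampled.mp3", 25) writes raw byte chunks named
# "audio_downsampled_part001.mp3", "audio_downsampled_part002.mp3", ... and returns their paths.
# Note this is a plain byte split, not a re-encode; MP3 decoders resync at the next frame header,
# which is generally good enough for transcription.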
def merge_audio(chunks, output_file_path):
with open("temp_list.txt", "w") as f:
for file in chunks:
f.write(f"file '{file}'\n")
try:
subprocess.run(
[
"ffmpeg",
"-f",
"concat",
"-safe", "0",
"-i",
"temp_list.txt",
"-c",
"copy",
"-y",
output_file_path
],
check=True
)
os.remove("temp_list.txt")
for chunk in chunks:
os.remove(chunk)
except subprocess.CalledProcessError as e:
raise gr.Error(f"Error during audio merging: {e}")
# Checks file extension, size, and downsamples or splits if needed.
ALLOWED_FILE_EXTENSIONS = ["mp3", "mp4", "mpeg", "mpga", "m4a", "wav", "webm"]
MAX_FILE_SIZE_MB = 25
CHUNK_SIZE_MB = 25
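# 25 MB mirrors the Groq audio endpoint's per-request file size limit (free tier) at the time of
# writing; adjust MAX_FILE_SIZE_MB / CHUNK_SIZE_MB here if that limit changes.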
def check_file(input_file_path):
if not input_file_path:
raise gr.Error("Please upload an audio/video file.")
file_size_mb = os.path.getsize(input_file_path) / (1024 * 1024)
file_extension = input_file_path.split(".")[-1].lower()
if file_extension not in ALLOWED_FILE_EXTENSIONS:
raise gr.Error(f"Invalid file type (.{file_extension}). Allowed types: {', '.join(ALLOWED_FILE_EXTENSIONS)}")
if file_size_mb > MAX_FILE_SIZE_MB:
gr.Warning(
f"File size too large ({file_size_mb:.2f} MB). Attempting to downsample to 16kHz MP3 128kbps. Maximum size allowed: {MAX_FILE_SIZE_MB} MB"
)
output_file_path = os.path.splitext(input_file_path)[0] + "_downsampled.mp3"
try:
subprocess.run(
[
"ffmpeg",
"-i",
input_file_path,
"-ar",
"16000",
"-ab",
"128k",
"-ac",
"1",
"-f",
"mp3",
"-y",
output_file_path,
],
check=True
)
# Check size after downsampling
downsampled_size_mb = os.path.getsize(output_file_path) / (1024 * 1024)
if downsampled_size_mb > MAX_FILE_SIZE_MB:
gr.Warning(f"File still too large after downsampling ({downsampled_size_mb:.2f} MB). Splitting into {CHUNK_SIZE_MB} MB chunks.")
return split_audio(output_file_path, CHUNK_SIZE_MB), "split"
return output_file_path, None
except subprocess.CalledProcessError as e:
raise gr.Error(f"Error during downsampling: {e}")
return input_file_path, None
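# Example (sketch): check_file("talk.wav") returns ("talk.wav", None) if the file is small enough,
# ("talk_downsampled.mp3", None) after downsampling, or (["talk_downsampled_part001.mp3", ...], "split")
# if it still exceeds MAX_FILE_SIZE_MB after downsampling.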
# subtitle maker
def format_time(seconds):
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    milliseconds = int((seconds % 1) * 1000)  # must be taken before truncating the seconds
    seconds = int(seconds % 60)
    return f"{hours:02}:{minutes:02}:{seconds:02},{milliseconds:03}"
def json_to_srt(transcription_json):
srt_lines = []
for segment in transcription_json:
start_time = format_time(segment['start'])
end_time = format_time(segment['end'])
text = segment['text']
srt_line = f"{segment['id']+1}\n{start_time} --> {end_time}\n{text}\n"
srt_lines.append(srt_line)
return '\n'.join(srt_lines)
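# Each segment becomes one numbered SRT block, e.g.:
#   1
#   00:00:00,000 --> 00:00:02,500
#   Hello world.
# The text is taken verbatim from the API response, so it may keep a leading space.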
def generate_subtitles(input_mode, input_file, link_input, prompt, language, auto_detect_language, model, include_video, font_selection, font_file, font_color, font_size, outline_thickness, outline_color):
    if input_mode == "Upload Video/Audio File":
        input_file_path = input_file
    elif input_mode == "Link Video/Audio":
        input_file_path = yt_dlp_download(link_input)
    else:
        raise gr.Error("Please select a valid input mode.")
processed_path, split_status = check_file(input_file_path)
full_srt_content = ""
total_duration = 0
segment_id_offset = 0
if split_status == "split":
srt_chunks = []
video_chunks = []
for i, chunk_path in enumerate(processed_path):
try:
                with open(chunk_path, "rb") as file:
                    transcription_json_response = client.audio.transcriptions.create(
                        file=(os.path.basename(chunk_path), file.read()),
                        model=model,
                        prompt=prompt,
                        response_format="verbose_json",
                        language=None if auto_detect_language else language,
                        temperature=0.0,
                    )
                transcription_json = transcription_json_response.segments
                # Shift timestamps and segment IDs so the chunks line up in one continuous SRT
                for segment in transcription_json:
                    segment['start'] += total_duration
                    segment['end'] += total_duration
                    segment['id'] += segment_id_offset
                segment_id_offset += len(transcription_json)
                total_duration = transcription_json[-1]['end']  # adjusted end of the last segment = new offset
                srt_content = json_to_srt(transcription_json)
                full_srt_content += srt_content
                temp_srt_path = f"{os.path.splitext(chunk_path)[0]}.srt"
                with open(temp_srt_path, "w", encoding="utf-8") as temp_srt_file:
                    temp_srt_file.write(srt_content)
                    temp_srt_file.write("\n")  # trailing newline keeps the SRT format valid when chunks are merged
                srt_chunks.append(temp_srt_path)
                if include_video and input_file_path.lower().endswith((".mp4", ".webm")):
                    try:
                        output_file_path = chunk_path.replace(os.path.splitext(chunk_path)[1], "_with_subs" + os.path.splitext(chunk_path)[1])
                        # Handle font selection
                        if font_selection == "Custom Font File" and font_file:
                            font_name = os.path.splitext(os.path.basename(font_file.name))[0]  # font filename without extension
                            font_dir = os.path.dirname(font_file.name)  # directory containing the uploaded font
                        else:
                            font_name = None  # fall back to the default Arial
                            font_dir = None
                            if font_selection == "Custom Font File":
                                gr.Warning("You selected 'Custom Font File' but did not upload one. Using the default Arial font.")
                        # Build the subtitles filter; ASS colours are &HBBGGRR&, so reverse the #RRGGBB pickers
                        primary_colour = f"&H{font_color[5:7]}{font_color[3:5]}{font_color[1:3]}&"
                        outline_colour = f"&H{outline_color[5:7]}{outline_color[3:5]}{outline_color[1:3]}&"
                        style = f"Fontsize={int(font_size)},PrimaryColour={primary_colour},OutlineColour={outline_colour},BorderStyle=1,Outline={int(outline_thickness)}"
                        if font_name:
                            style = f"Fontname={font_name}," + style
                        subtitle_filter = f"subtitles={temp_srt_path}"
                        if font_dir:
                            subtitle_filter += f":fontsdir={font_dir}"
                        subtitle_filter += f":force_style='{style}'"
                        # Burn the subtitles into the video chunk
                        subprocess.run(
                            [
                                "ffmpeg",
                                "-y",
                                "-i",
                                chunk_path,
                                "-vf",
                                subtitle_filter,
                                "-preset", "fast",
                                output_file_path,
                            ],
                            check=True,
                        )
                        video_chunks.append(output_file_path)
                    except subprocess.CalledProcessError as e:
                        raise gr.Error(f"Error during subtitle addition: {e}")
                elif include_video and not input_file_path.lower().endswith((".mp4", ".webm")):
                    gr.Warning(f"You enabled 'Include Video with Subtitles', but the input file {input_file_path} isn't a video (.mp4 or .webm). Returning only the SRT file.", duration=15)
except groq.AuthenticationError as e:
handle_groq_error(e, model)
            except groq.RateLimitError as e:
                # If the rate limit is hit before anything was transcribed, surface the API error;
                # otherwise return whatever chunks were already processed.
                if not srt_chunks:
                    handle_groq_error(e, model)
                gr.Warning(f"API limit reached during chunk {i+1}. Returning processed chunks only.")
                with open('merged_output.srt', 'w', encoding="utf-8") as outfile:
                    for chunk_srt in srt_chunks:
                        with open(chunk_srt, 'r', encoding="utf-8") as infile:
                            outfile.write(infile.read())
                if video_chunks:
                    merge_audio(video_chunks, 'merged_output_video.mp4')
                    return 'merged_output.srt', 'merged_output_video.mp4'
                return 'merged_output.srt', None
# Merge SRT chunks
final_srt_path = os.path.splitext(input_file_path)[0] + "_final.srt"
with open(final_srt_path, 'w', encoding="utf-8") as outfile:
for chunk_srt in srt_chunks:
with open(chunk_srt, 'r', encoding="utf-8") as infile:
outfile.write(infile.read())
# Merge video chunks
if video_chunks:
merge_audio(video_chunks, 'merged_output_video.mp4')
return final_srt_path, 'merged_output_video.mp4'
else:
return final_srt_path, None
else: # Single file processing (no splitting)
try:
            with open(processed_path, "rb") as file:
                transcription_json_response = client.audio.transcriptions.create(
                    file=(os.path.basename(processed_path), file.read()),
                    model=model,
                    prompt=prompt,
                    response_format="verbose_json",
                    language=None if auto_detect_language else language,
                    temperature=0.0,
                )
            transcription_json = transcription_json_response.segments
            srt_content = json_to_srt(transcription_json)
            temp_srt_path = os.path.splitext(input_file_path)[0] + ".srt"
            with open(temp_srt_path, "w", encoding="utf-8") as temp_srt_file:
                temp_srt_file.write(srt_content)
            if include_video and input_file_path.lower().endswith((".mp4", ".webm")):
                try:
                    output_file_path = input_file_path.replace(
                        os.path.splitext(input_file_path)[1], "_with_subs" + os.path.splitext(input_file_path)[1]
                    )
                    # Handle font selection
                    if font_selection == "Custom Font File" and font_file:
                        font_name = os.path.splitext(os.path.basename(font_file.name))[0]  # font filename without extension
                        font_dir = os.path.dirname(font_file.name)  # directory containing the uploaded font
                    else:
                        font_name = None  # fall back to the default Arial
                        font_dir = None
                        if font_selection == "Custom Font File":
                            gr.Warning("You selected 'Custom Font File' but did not upload one. Using the default Arial font.")
                    # Build the subtitles filter; ASS colours are &HBBGGRR&, so reverse the #RRGGBB pickers
                    primary_colour = f"&H{font_color[5:7]}{font_color[3:5]}{font_color[1:3]}&"
                    outline_colour = f"&H{outline_color[5:7]}{outline_color[3:5]}{outline_color[1:3]}&"
                    style = f"Fontsize={int(font_size)},PrimaryColour={primary_colour},OutlineColour={outline_colour},BorderStyle=1,Outline={int(outline_thickness)}"
                    if font_name:
                        style = f"Fontname={font_name}," + style
                    subtitle_filter = f"subtitles={temp_srt_path}"
                    if font_dir:
                        subtitle_filter += f":fontsdir={font_dir}"
                    subtitle_filter += f":force_style='{style}'"
                    # Burn the subtitles into the video
                    subprocess.run(
                        [
                            "ffmpeg",
                            "-y",
                            "-i",
                            input_file_path,
                            "-vf",
                            subtitle_filter,
                            "-preset", "fast",
                            output_file_path,
                        ],
                        check=True,
                    )
                    return temp_srt_path, output_file_path
                except subprocess.CalledProcessError as e:
                    raise gr.Error(f"Error during subtitle addition: {e}")
            elif include_video and not input_file_path.lower().endswith((".mp4", ".webm")):
                gr.Warning(f"You enabled 'Include Video with Subtitles', but the input file {input_file_path} isn't a video (.mp4 or .webm). Returning only the SRT file.", duration=15)
            return temp_srt_path, None
except groq.AuthenticationError as e:
handle_groq_error(e, model)
except groq.RateLimitError as e:
handle_groq_error(e, model)
except ValueError as e:
raise gr.Error(f"Error creating SRT file: {e}")
theme = gr.themes.Soft(
primary_hue="sky",
secondary_hue="blue",
neutral_hue="neutral"
).set(
border_color_primary='*neutral_300',
block_border_width='1px',
block_border_width_dark='1px',
block_title_border_color='*secondary_100',
block_title_border_color_dark='*secondary_200',
input_background_fill_focus='*secondary_300',
input_border_color='*border_color_primary',
input_border_color_focus='*secondary_500',
input_border_width='1px',
input_border_width_dark='1px',
slider_color='*secondary_500',
slider_color_dark='*secondary_600'
)
css = """
.gradio-container{max-width: 1400px !important}
h1{text-align:center}
.extra-option {
display: none;
}
.extra-option.visible {
display: block;
}
"""
with gr.Blocks(theme=theme, css=css) as interface:
gr.Markdown(
"""
# Fast Subtitle Maker
Inference by Groq API
If you are having API Rate Limit issues, you can retry later based on the [rate limits](https://console.groq.com/docs/rate-limits) or with your own API Key