import gradio as gr
from pydub import AudioSegment
import edge_tts
import os
import asyncio
import uuid
import re
from typing import List, Tuple

def get_audio_length(audio_file):
    """Return the duration of an audio file in seconds."""
    audio = AudioSegment.from_file(audio_file)
    return len(audio) / 1000

def format_time_ms(milliseconds):
    """Format a millisecond count as an SRT timestamp (HH:MM:SS,mmm)."""
    seconds, ms = divmod(int(milliseconds), 1000)
    mins, secs = divmod(seconds, 60)
    hrs, mins = divmod(mins, 60)
    return f"{hrs:02}:{mins:02}:{secs:02},{ms:03}"

def smart_text_split(text, words_per_line, lines_per_segment):
    # First split by major punctuation (periods, exclamation marks, question marks)
    sentences = re.split(r'([.!?]+)', text)
    # Recombine sentences with their punctuation
    sentences = [''.join(i) for i in zip(sentences[::2], sentences[1::2] + [''])]

    segments = []
    current_segment = []
    current_line = []

    for sentence in sentences:
        # Split sentence into words
        words = sentence.strip().split()
        for word in words:
            current_line.append(word)
            # Check if current line has reached words_per_line
            if len(current_line) >= words_per_line:
                current_segment.append(' '.join(current_line))
                current_line = []
                # Check if current segment has reached lines_per_segment
                if len(current_segment) >= lines_per_segment:
                    segments.append('\n'.join(current_segment))
                    current_segment = []
        # If there are words in current_line, add them as a line
        if current_line:
            current_segment.append(' '.join(current_line))
            current_line = []
        # Check if we should start a new segment at sentence boundary
        if len(current_segment) >= lines_per_segment:
            segments.append('\n'.join(current_segment))
            current_segment = []

    # Add any remaining lines
    if current_segment:
        segments.append('\n'.join(current_segment))
    return segments

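# Illustrative example (assumed inputs): with words_per_line=4 and
# lines_per_segment=2,
#   smart_text_split("The quick brown fox jumps over the lazy dog. It ran.", 4, 2)
# yields ["The quick brown fox\njumps over the lazy", "dog.\nIt ran."]:
# lines are capped at the word limit, and segments also close at
# sentence boundaries.
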
async def process_segment(segment: str, idx: int, voice: str, rate: str, pitch: str) -> Tuple[int, str, AudioSegment, int]:
    """Synthesize one segment; return (index, text, audio, duration in ms)."""
    # edge-tts emits MP3 data, so name the temp file accordingly.
    audio_file = f"temp_segment_{idx}_{uuid.uuid4()}.mp3"
    try:
        tts = edge_tts.Communicate(segment, voice, rate=rate, pitch=pitch)
        await tts.save(audio_file)
        segment_audio = AudioSegment.from_file(audio_file)
        return idx, segment, segment_audio, len(segment_audio)
    finally:
        if os.path.exists(audio_file):
            os.remove(audio_file)

async def process_chunk_parallel(chunk: List[str], start_idx: int, voice: str, rate: str, pitch: str) -> List[Tuple[int, str, AudioSegment, int]]:
    """Synthesize the segments of one chunk concurrently, preserving order."""
    tasks = [
        process_segment(segment, start_idx + i, voice, rate, pitch)
        for i, segment in enumerate(chunk)
    ]
    # asyncio.gather returns results in task order, so segment order is kept.
    return await asyncio.gather(*tasks)

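# Each entry written to the .srt below follows the standard SRT shape:
#   1
#   00:00:00,000 --> 00:00:02,500
#   segment text (up to lines_per_segment lines)
# with a blank line separating consecutive entries.
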
async def generate_accurate_srt(text, voice, rate, pitch, words_per_line, lines_per_segment):
    segments = smart_text_split(text, words_per_line, lines_per_segment)

    # Split segments into chunks of 10 and synthesize one chunk at a time,
    # with the segments inside each chunk running concurrently.
    chunk_size = 10
    chunks = [segments[i:i + chunk_size] for i in range(0, len(segments), chunk_size)]

    chunk_results = []
    for i, chunk in enumerate(chunks):
        start_idx = i * chunk_size + 1
        chunk_results.append(await process_chunk_parallel(chunk, start_idx, voice, rate, pitch))

    # Stitch everything together with a single running clock so that
    # subtitle timestamps stay monotonic across chunk boundaries.
    final_srt = ""
    final_audio = AudioSegment.empty()
    current_time = 0
    for results in chunk_results:
        for idx, segment_text, segment_audio, duration in results:
            final_srt += f"{idx}\n"
            final_srt += f"{format_time_ms(current_time)} --> {format_time_ms(current_time + duration)}\n"
            final_srt += segment_text + "\n\n"
            final_audio += segment_audio
            current_time += duration

    # Export final files
    unique_id = uuid.uuid4()
    audio_path = f"final_audio_{unique_id}.mp3"
    srt_path = f"final_subtitles_{unique_id}.srt"
    final_audio.export(audio_path, format="mp3", bitrate="320k")
    with open(srt_path, "w", encoding="utf-8") as f:
        f.write(final_srt)
    return srt_path, audio_path

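# Minimal standalone usage sketch (assumed example text; bypasses the UI):
#   srt_path, audio_path = asyncio.run(generate_accurate_srt(
#       "Hello world. This is a test.", "en-US-JennyNeural",
#       "+0%", "+0Hz", words_per_line=8, lines_per_segment=2))
# This writes an MP3 and a matching .srt file to the working directory.
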
async def process_text(text, pitch, rate, voice, words_per_line, lines_per_segment):
    # edge-tts requires signed values such as "+0Hz" / "-5Hz" and "+10%".
    pitch_str = f"{int(pitch):+d}Hz"
    rate_str = f"{int(rate):+d}%"
    srt_path, audio_path = await generate_accurate_srt(
        text,
        voice_options[voice],
        rate_str,
        pitch_str,
        words_per_line,
        lines_per_segment
    )
    # The audio path is returned twice: once for download, once for preview.
    return srt_path, audio_path, audio_path

# Mapping from display names to edge-tts voice identifiers
voice_options = {
    "Andrew Male": "en-US-AndrewNeural",
    "Jenny Female": "en-US-JennyNeural",
    "Guy Male": "en-US-GuyNeural",
    "Ana Female": "en-US-AnaNeural",
    "Aria Female": "en-US-AriaNeural",
    "Brian Male": "en-US-BrianNeural",
    "Christopher Male": "en-US-ChristopherNeural",
    "Eric Male": "en-US-EricNeural",
    "Michelle Female": "en-US-MichelleNeural",
    "Roger Male": "en-US-RogerNeural",
    "Natasha Female": "en-AU-NatashaNeural",
    "William Male": "en-AU-WilliamNeural",
    "Clara Female": "en-CA-ClaraNeural",
    "Liam Male": "en-CA-LiamNeural",
    "Libby Female": "en-GB-LibbyNeural",
    "Maisie": "en-GB-MaisieNeural",
    "Ryan": "en-GB-RyanNeural",
    "Sonia": "en-GB-SoniaNeural",
    "Thomas": "en-GB-ThomasNeural",
    "Sam": "en-HK-SamNeural",
    "Yan": "en-HK-YanNeural",
    "Connor": "en-IE-ConnorNeural",
    "Emily": "en-IE-EmilyNeural",
    "Neerja": "en-IN-NeerjaNeural",
    "Prabhat": "en-IN-PrabhatNeural",
    "Asilia": "en-KE-AsiliaNeural",
    "Chilemba": "en-KE-ChilembaNeural",
    "Abeo": "en-NG-AbeoNeural",
    "Ezinne": "en-NG-EzinneNeural",
    "Mitchell": "en-NZ-MitchellNeural",
    "James": "en-PH-JamesNeural",
    "Rosa": "en-PH-RosaNeural",
    "Luna": "en-SG-LunaNeural",
    "Wayne": "en-SG-WayneNeural",
    "Elimu": "en-TZ-ElimuNeural",
    "Imani": "en-TZ-ImaniNeural",
    "Leah": "en-ZA-LeahNeural",
    "Luke": "en-ZA-LukeNeural"
    # Add other voices here...
}

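# The catalogue can also be enumerated at runtime instead of hard-coded;
# edge-tts ships an async helper for this (optional, shown as a sketch):
#   voices = asyncio.run(edge_tts.list_voices())
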
# Create Gradio interface
app = gr.Interface(
    fn=process_text,
    inputs=[
        gr.Textbox(label="Enter Text", lines=10),
        gr.Slider(label="Pitch Adjustment (Hz)", minimum=-20, maximum=20, value=0, step=1),
        gr.Slider(label="Rate Adjustment (%)", minimum=-50, maximum=50, value=0, step=1),
        gr.Dropdown(label="Select Voice", choices=list(voice_options.keys()), value="Jenny Female"),
        gr.Slider(label="Words per Line", minimum=1, maximum=15, value=8, step=1),
        gr.Slider(label="Lines per Segment", minimum=1, maximum=5, value=2, step=1)
    ],
    outputs=[
        gr.File(label="Download SRT"),
        gr.File(label="Download Audio"),
        gr.Audio(label="Preview Audio")
    ],
    title="Advanced TTS with Configurable SRT Generation",
    description="Generate synchronized audio and subtitles with custom segmentation control."
)

app.launch()