"""Gradio app: synthesize speech with edge-tts and emit a matching SRT file."""

import asyncio
import math
import os
import re
import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple

import edge_tts
import gradio as gr
from pydub import AudioSegment


def get_audio_length(audio_file):
    """Return the duration of *audio_file* in seconds."""
    audio = AudioSegment.from_file(audio_file)
    # len() of an AudioSegment is its duration in milliseconds.
    return len(audio) / 1000


def format_time_ms(milliseconds):
    """Format a millisecond offset as an SRT timestamp ``HH:MM:SS,mmm``."""
    seconds, ms = divmod(int(milliseconds), 1000)
    mins, secs = divmod(seconds, 60)
    hrs, mins = divmod(mins, 60)
    return f"{hrs:02}:{mins:02}:{secs:02},{ms:03}"


def smart_text_split(text, words_per_line, lines_per_segment):
    """Split *text* into subtitle segments.

    Words are packed into lines of at most ``words_per_line`` words; a line
    is also closed at the end of every sentence (``.``, ``!`` or ``?``).
    Lines are grouped into segments of ``lines_per_segment`` lines.

    Returns:
        A list of segments, each a string whose lines are joined by ``\\n``.
    """
    # The capturing group keeps the punctuation runs in the split result,
    # so the list alternates: text, punctuation, text, punctuation, ..., text.
    pieces = re.split(r'([.!?]+)', text)
    # Re-attach each punctuation run to the sentence that precedes it.
    sentences = [''.join(pair) for pair in zip(pieces[::2], pieces[1::2] + [''])]

    segments = []
    current_segment = []
    current_line = []

    def flush_segment_if_full():
        # Emit the pending segment once it holds enough lines.
        if len(current_segment) >= lines_per_segment:
            segments.append('\n'.join(current_segment))
            current_segment.clear()

    for sentence in sentences:
        for word in sentence.strip().split():
            current_line.append(word)
            if len(current_line) >= words_per_line:
                current_segment.append(' '.join(current_line))
                current_line.clear()
                flush_segment_if_full()

        # A sentence boundary always ends the current (possibly short) line.
        if current_line:
            current_segment.append(' '.join(current_line))
            current_line.clear()
        flush_segment_if_full()

    # Whatever is left forms a final, possibly shorter, segment.
    if current_segment:
        segments.append('\n'.join(current_segment))

    return segments


async def process_segment(segment: str, idx: int, voice: str, rate: str,
                          pitch: str) -> Tuple[str, AudioSegment, int]:
    """Synthesize one subtitle segment.

    Returns:
        ``(srt_header, audio, duration_ms)`` where ``srt_header`` is the cue
        number line (``"<idx>\\n"``), ``audio`` is the decoded speech and
        ``duration_ms`` is its length in milliseconds.
    """
    # edge-tts writes MP3 data, so name the temporary file accordingly;
    # a ".wav" name makes pydub attempt (and fail) a WAV parse first.
    audio_file = f"temp_segment_{idx}_{uuid.uuid4()}.mp3"
    try:
        tts = edge_tts.Communicate(segment, voice, rate=rate, pitch=pitch)
        await tts.save(audio_file)

        segment_audio = AudioSegment.from_file(audio_file)
        segment_duration = len(segment_audio)
        srt_content = f"{idx}\n"
        return srt_content, segment_audio, segment_duration
    finally:
        # Always remove the temporary file, even when synthesis fails.
        if os.path.exists(audio_file):
            os.remove(audio_file)


def _assemble_srt(texts, results, start_time_ms=0):
    """Stitch per-segment results into SRT text and one audio track.

    ``texts`` and ``results`` are parallel sequences; each result is the
    tuple returned by ``process_segment``. Cue times start at
    ``start_time_ms`` and advance by each segment's duration.
    """
    srt_blocks = []
    combined_audio = AudioSegment.empty()
    current_time = start_time_ms

    for text, (srt_header, audio_part, duration) in zip(texts, results):
        end_time = current_time + duration
        srt_blocks.append(
            f"{srt_header}"
            f"{format_time_ms(current_time)} --> {format_time_ms(end_time)}\n"
            f"{text}\n\n"
        )
        combined_audio += audio_part
        current_time = end_time

    return "".join(srt_blocks), combined_audio


async def process_chunk_parallel(chunks: List[str], start_idx: int, voice: str,
                                 rate: str, pitch: str,
                                 start_time_ms: int = 0
                                 ) -> Tuple[str, AudioSegment]:
    """Synthesize a list of segments concurrently.

    Args:
        chunks: Segment texts, in playback order.
        start_idx: SRT cue number assigned to the first segment.
        voice, rate, pitch: Passed through to ``process_segment``.
        start_time_ms: Timestamp of the first cue; defaults to 0.

    Returns:
        ``(srt_text, combined_audio)`` for the given segments.
    """
    # enumerate() starts at 0 so the first cue is numbered exactly start_idx.
    tasks = [
        process_segment(segment, start_idx + offset, voice, rate, pitch)
        for offset, segment in enumerate(chunks)
    ]
    # gather() preserves task order, so results line up with `chunks`.
    results = await asyncio.gather(*tasks)
    return _assemble_srt(chunks, results, start_time_ms)


async def generate_accurate_srt(text, voice, rate, pitch, words_per_line,
                                lines_per_segment):
    """Synthesize *text* and write the audio and SRT files.

    Returns:
        ``(srt_path, audio_path)`` of the files written to the current
        working directory.

    Raises:
        ValueError: If *text* contains no words to synthesize.
    """
    segments = smart_text_split(text, words_per_line, lines_per_segment)
    if not segments:
        raise ValueError("No text to synthesize.")

    # All segments are requested concurrently; the timeline is then built in
    # a single pass so cue times keep increasing across the whole track.
    results = await asyncio.gather(*(
        process_segment(segment, idx, voice, rate, pitch)
        for idx, segment in enumerate(segments, 1)
    ))
    final_srt, final_audio = _assemble_srt(segments, results)

    # Export final files
    unique_id = uuid.uuid4()
    audio_path = f"final_audio_{unique_id}.mp3"
    srt_path = f"final_subtitles_{unique_id}.srt"

    final_audio.export(audio_path, format="mp3", bitrate="320k")
    with open(srt_path, "w", encoding='utf-8') as f:
        f.write(final_srt)

    return srt_path, audio_path


async def process_text(text, pitch, rate, voice, words_per_line,
                       lines_per_segment):
    """Gradio callback: build the audio and subtitles for the form inputs.

    Returns:
        ``(srt_path, audio_path, audio_path)`` -- the audio path is returned
        twice, once for the download widget and once for the preview player.
    """
    if not text or not text.strip():
        raise gr.Error("Please enter some text to convert.")

    # edge-tts requires an explicit sign on both values (e.g. "+0Hz", "-5%").
    pitch_str = f"{int(pitch):+d}Hz"
    rate_str = f"{int(rate):+d}%"

    srt_path, audio_path = await generate_accurate_srt(
        text,
        voice_options[voice],
        rate_str,
        pitch_str,
        int(words_per_line),
        int(lines_per_segment),
    )

    return srt_path, audio_path, audio_path


# Maps the label shown in the voice dropdown to the edge-tts voice name.
# NOTE(review): the gender tags on "Michelle Male" and "Liam Female " look
# wrong, and the latter has a trailing space. The keys are left untouched
# because they are the user-visible dropdown values -- confirm before renaming.
voice_options = {
    "Andrew Male": "en-US-AndrewNeural",
    "Jenny Female": "en-US-JennyNeural",
    "Guy Male": "en-US-GuyNeural",
    "Ana Female": "en-US-AnaNeural",
    "Aria Female": "en-US-AriaNeural",
    "Brian Male": "en-US-BrianNeural",
    "Christopher Male": "en-US-ChristopherNeural",
    "Eric Male": "en-US-EricNeural",
    "Michelle Male": "en-US-MichelleNeural",
    "Roger Male": "en-US-RogerNeural",
    "Natasha Female": "en-AU-NatashaNeural",
    "William Male": "en-AU-WilliamNeural",
    "Clara Female": "en-CA-ClaraNeural",
    "Liam Female ": "en-CA-LiamNeural",
    "Libby Female": "en-GB-LibbyNeural",
    "Maisie": "en-GB-MaisieNeural",
    "Ryan": "en-GB-RyanNeural",
    "Sonia": "en-GB-SoniaNeural",
    "Thomas": "en-GB-ThomasNeural",
    "Sam": "en-HK-SamNeural",
    "Yan": "en-HK-YanNeural",
    "Connor": "en-IE-ConnorNeural",
    "Emily": "en-IE-EmilyNeural",
    "Neerja": "en-IN-NeerjaNeural",
    "Prabhat": "en-IN-PrabhatNeural",
    "Asilia": "en-KE-AsiliaNeural",
    "Chilemba": "en-KE-ChilembaNeural",
    "Abeo": "en-NG-AbeoNeural",
    "Ezinne": "en-NG-EzinneNeural",
    "Mitchell": "en-NZ-MitchellNeural",
    "James": "en-PH-JamesNeural",
    "Rosa": "en-PH-RosaNeural",
    "Luna": "en-SG-LunaNeural",
    "Wayne": "en-SG-WayneNeural",
    "Elimu": "en-TZ-ElimuNeural",
    "Imani": "en-TZ-ImaniNeural",
    "Leah": "en-ZA-LeahNeural",
    "Luke": "en-ZA-LukeNeural"
    # Add other voices here...
}

# Create Gradio interface
app = gr.Interface(
    fn=process_text,
    inputs=[
        gr.Textbox(label="Enter Text", lines=10),
        gr.Slider(label="Pitch Adjustment (Hz)", minimum=-20, maximum=20, value=0, step=1),
        gr.Slider(label="Rate Adjustment (%)", minimum=-50, maximum=50, value=0, step=1),
        gr.Dropdown(label="Select Voice", choices=list(voice_options.keys()), value="Jenny Female"),
        gr.Slider(label="Words per Line", minimum=1, maximum=15, value=8, step=1),
        gr.Slider(label="Lines per Segment", minimum=1, maximum=5, value=2, step=1)
    ],
    outputs=[
        gr.File(label="Download SRT"),
        gr.File(label="Download Audio"),
        gr.Audio(label="Preview Audio")
    ],
    title="Advanced TTS with Configurable SRT Generation",
    description="Generate perfectly synchronized audio and subtitles with custom segmentation control."
)

app.launch()