Spaces:
Sleeping
Sleeping
import gradio as gr | |
from pydub import AudioSegment | |
import edge_tts | |
import os | |
import asyncio | |
import uuid | |
import re | |
from concurrent.futures import ThreadPoolExecutor | |
from typing import List, Tuple, Optional | |
import math | |
from dataclasses import dataclass | |
class TimingManager: | |
def __init__(self): | |
self.current_time = 0 | |
self.segment_gap = 100 # ms gap between segments | |
def get_timing(self, duration): | |
start_time = self.current_time | |
end_time = start_time + duration | |
self.current_time = end_time + self.segment_gap | |
return start_time, end_time | |
def get_audio_length(audio_file): | |
audio = AudioSegment.from_file(audio_file) | |
return len(audio) / 1000 | |
def format_time_ms(milliseconds): | |
seconds, ms = divmod(int(milliseconds), 1000) | |
mins, secs = divmod(seconds, 60) | |
hrs, mins = divmod(mins, 60) | |
return f"{hrs:02}:{mins:02}:{secs:02},{ms:03}" | |
class Segment: | |
id: int | |
text: str | |
start_time: int = 0 | |
end_time: int = 0 | |
duration: int = 0 | |
audio: Optional[AudioSegment] = None | |
lines: List[str] = None # Add lines field for display purposes only | |
class TextProcessor: | |
def __init__(self, words_per_line: int, lines_per_segment: int): | |
self.words_per_line = words_per_line | |
self.lines_per_segment = lines_per_segment | |
self.min_segment_words = 3 | |
self.max_segment_words = words_per_line * lines_per_segment * 1.5 # Allow 50% more for natural breaks | |
self.punctuation_weights = { | |
'.': 1.0, # Strong break | |
'!': 1.0, | |
'?': 1.0, | |
';': 0.8, # Medium-strong break | |
':': 0.7, | |
',': 0.5, # Medium break | |
'-': 0.3, # Weak break | |
'(': 0.2, | |
')': 0.2 | |
} | |
def analyze_sentence_complexity(self, text: str) -> float: | |
"""Analyze sentence complexity to determine optimal segment length""" | |
words = text.split() | |
complexity = 1.0 | |
# Adjust for sentence length | |
if len(words) > self.words_per_line * 2: | |
complexity *= 1.2 | |
# Adjust for punctuation density | |
punct_count = sum(text.count(p) for p in self.punctuation_weights.keys()) | |
complexity *= (1 + (punct_count / len(words)) * 0.5) | |
return complexity | |
def find_natural_breaks(self, text: str) -> List[Tuple[int, float]]: | |
"""Find natural break points with their weights""" | |
breaks = [] | |
words = text.split() | |
for i, word in enumerate(words): | |
weight = 0 | |
# Check for punctuation | |
for punct, punct_weight in self.punctuation_weights.items(): | |
if word.endswith(punct): | |
weight = max(weight, punct_weight) | |
# Check for natural phrase boundaries | |
phrase_starters = {'however', 'therefore', 'moreover', 'furthermore', 'meanwhile', 'although', 'because'} | |
if i < len(words) - 1 and words[i+1].lower() in phrase_starters: | |
weight = max(weight, 0.6) | |
# Check for conjunctions at natural points | |
if i > self.min_segment_words: | |
conjunctions = {'and', 'but', 'or', 'nor', 'for', 'yet', 'so'} | |
if word.lower() in conjunctions: | |
weight = max(weight, 0.4) | |
if weight > 0: | |
breaks.append((i, weight)) | |
return breaks | |
def split_into_segments(self, text: str) -> List[Segment]: | |
# Normalize text and add proper spacing around punctuation | |
text = re.sub(r'\s+', ' ', text.strip()) | |
text = re.sub(r'([.!?,;:])\s*', r'\1 ', text) | |
text = re.sub(r'\s+([.!?,;:])', r'\1', text) | |
# First, split into major segments by strong punctuation | |
segments = [] | |
current_segment = [] | |
current_text = "" | |
words = text.split() | |
i = 0 | |
while i < len(words): | |
complexity = self.analyze_sentence_complexity(' '.join(words[i:i + self.words_per_line * 2])) | |
breaks = self.find_natural_breaks(' '.join(words[i:i + int(self.max_segment_words * complexity)])) | |
# Find best break point | |
best_break = None | |
best_weight = 0 | |
for break_idx, weight in breaks: | |
actual_idx = i + break_idx | |
if (actual_idx - i >= self.min_segment_words and | |
actual_idx - i <= self.max_segment_words): | |
if weight > best_weight: | |
best_break = break_idx | |
best_weight = weight | |
if best_break is None: | |
# If no good break found, use maximum length | |
best_break = min(self.words_per_line * self.lines_per_segment, len(words) - i) | |
# Create segment | |
segment_words = words[i:i + best_break + 1] | |
segment_text = ' '.join(segment_words) | |
# Split segment into lines | |
lines = self.split_into_lines(segment_text) | |
final_segment_text = '\n'.join(lines) | |
segments.append(Segment( | |
id=len(segments) + 1, | |
text=final_segment_text | |
)) | |
i += best_break + 1 | |
return segments | |
def split_into_lines(self, text: str) -> List[str]: | |
"""Split segment text into natural lines""" | |
words = text.split() | |
lines = [] | |
current_line = [] | |
word_count = 0 | |
for word in words: | |
current_line.append(word) | |
word_count += 1 | |
# Check for natural line breaks | |
is_break = ( | |
word_count >= self.words_per_line or | |
any(word.endswith(p) for p in '.!?') or | |
(word_count >= self.words_per_line * 0.7 and | |
any(word.endswith(p) for p in ',;:')) | |
) | |
if is_break: | |
lines.append(' '.join(current_line)) | |
current_line = [] | |
word_count = 0 | |
if current_line: | |
lines.append(' '.join(current_line)) | |
return lines | |
async def process_segment_with_timing(segment: Segment, voice: str, rate: str, pitch: str) -> Segment: | |
"""Process a complete segment as a single TTS unit""" | |
audio_file = f"temp_segment_{segment.id}_{uuid.uuid4()}.wav" | |
try: | |
# Process the entire segment text as one unit, replacing newlines with spaces | |
segment_text = ' '.join(segment.text.split('\n')) | |
tts = edge_tts.Communicate(segment_text, voice, rate=rate, pitch=pitch) | |
await tts.save(audio_file) | |
segment.audio = AudioSegment.from_file(audio_file) | |
# Add small silence at start and end for natural spacing | |
silence = AudioSegment.silent(duration=50) | |
segment.audio = silence + segment.audio + silence | |
segment.duration = len(segment.audio) | |
return segment | |
finally: | |
if os.path.exists(audio_file): | |
os.remove(audio_file) | |
async def generate_accurate_srt(text: str, voice: str, rate: str, pitch: str, words_per_line: int, lines_per_segment: int) -> Tuple[str, str]: | |
processor = TextProcessor(words_per_line, lines_per_segment) | |
segments = processor.split_into_segments(text) | |
# Process segments sequentially for better timing control | |
processed_segments = [] | |
current_time = 0 | |
final_audio = AudioSegment.empty() | |
srt_content = "" | |
for segment in segments: | |
# Process segment | |
processed_segment = await process_segment_with_timing(segment, voice, rate, pitch) | |
# Calculate precise timing | |
processed_segment.start_time = current_time | |
processed_segment.end_time = current_time + processed_segment.duration | |
# Add to SRT with precise timing | |
srt_content += ( | |
f"{processed_segment.id}\n" | |
f"{format_time_ms(processed_segment.start_time)} --> {format_time_ms(processed_segment.end_time)}\n" | |
f"{processed_segment.text}\n\n" | |
) | |
# Add to final audio with precise positioning | |
final_audio = final_audio.append(processed_segment.audio, crossfade=0) | |
# Update timing with precise gap | |
current_time = processed_segment.end_time | |
processed_segments.append(processed_segment) | |
# Export with high precision | |
unique_id = uuid.uuid4() | |
audio_path = f"final_audio_{unique_id}.mp3" | |
srt_path = f"final_subtitles_{unique_id}.srt" | |
# Export with high quality settings for precise timing | |
final_audio.export( | |
audio_path, | |
format="mp3", | |
bitrate="320k", | |
parameters=["-ar", "48000", "-ac", "2"] | |
) | |
with open(srt_path, "w", encoding='utf-8') as f: | |
f.write(srt_content) | |
return srt_path, audio_path | |
async def process_text(text, pitch, rate, voice, words_per_line, lines_per_segment): | |
# Format pitch and rate strings | |
pitch_str = f"{pitch:+d}Hz" if pitch != 0 else "+0Hz" | |
rate_str = f"{rate:+d}%" if rate != 0 else "+0%" | |
srt_path, audio_path = await generate_accurate_srt( | |
text, | |
voice_options[voice], | |
rate_str, | |
pitch_str, | |
words_per_line, | |
lines_per_segment | |
) | |
return srt_path, audio_path, audio_path | |
# Voice options dictionary (same as before) | |
voice_options = { | |
"Andrew Male": "en-US-AndrewNeural", | |
"Jenny Female": "en-US-JennyNeural", | |
"Guy Male": "en-US-GuyNeural", | |
"Ana Female": "en-US-AnaNeural", | |
"Aria Female": "en-US-AriaNeural", | |
"Brian Male": "en-US-BrianNeural", | |
"Christopher Male": "en-US-ChristopherNeural", | |
"Eric Male": "en-US-EricNeural", | |
"Michelle Male": "en-US-MichelleNeural", | |
"Roger Male": "en-US-RogerNeural", | |
"Natasha Female": "en-AU-NatashaNeural", | |
"William Male": "en-AU-WilliamNeural", | |
"Clara Female": "en-CA-ClaraNeural", | |
"Liam Female ": "en-CA-LiamNeural", | |
"Libby Female": "en-GB-LibbyNeural", | |
"Maisie": "en-GB-MaisieNeural", | |
"Ryan": "en-GB-RyanNeural", | |
"Sonia": "en-GB-SoniaNeural", | |
"Thomas": "en-GB-ThomasNeural", | |
"Sam": "en-HK-SamNeural", | |
"Yan": "en-HK-YanNeural", | |
"Connor": "en-IE-ConnorNeural", | |
"Emily": "en-IE-EmilyNeural", | |
"Neerja": "en-IN-NeerjaNeural", | |
"Prabhat": "en-IN-PrabhatNeural", | |
"Asilia": "en-KE-AsiliaNeural", | |
"Chilemba": "en-KE-ChilembaNeural", | |
"Abeo": "en-NG-AbeoNeural", | |
"Ezinne": "en-NG-EzinneNeural", | |
"Mitchell": "en-NZ-MitchellNeural", | |
"James": "en-PH-JamesNeural", | |
"Rosa": "en-PH-RosaNeural", | |
"Luna": "en-SG-LunaNeural", | |
"Wayne": "en-SG-WayneNeural", | |
"Elimu": "en-TZ-ElimuNeural", | |
"Imani": "en-TZ-ImaniNeural", | |
"Leah": "en-ZA-LeahNeural", | |
"Luke": "en-ZA-LukeNeural" | |
# Add other voices here... | |
} | |
# Create Gradio interface | |
app = gr.Interface( | |
fn=process_text, | |
inputs=[ | |
gr.Textbox(label="Enter Text", lines=10), | |
gr.Slider(label="Pitch Adjustment (Hz)", minimum=-10, maximum=10, value=0, step=1), | |
gr.Slider(label="Speed Adjustment (%)", minimum=-25, maximum=25, value=0, step=1), | |
gr.Dropdown(label="Select Voice", choices=list(voice_options.keys()), value="Jenny Female"), | |
gr.Slider(label="Words per Line", minimum=3, maximum=12, value=6, step=1), | |
gr.Slider(label="Lines per Segment", minimum=1, maximum=4, value=2, step=1) | |
], | |
outputs=[ | |
gr.File(label="Download SRT"), | |
gr.File(label="Download Audio"), | |
gr.Audio(label="Preview Audio") | |
], | |
title="Advanced TTS with Configurable SRT Generation", | |
description="Generate perfectly synchronized audio and subtitles with natural speech patterns." | |
) | |
app.launch() |