import gradio as gr
from pydub import AudioSegment
import edge_tts
import os
import uuid
import re
from typing import List, Tuple


class TimingManager:
    """Hands out sequential start/end timestamps for subtitle segments."""

    def __init__(self):
        self.current_time = 0   # running position, in ms
        self.segment_gap = 100  # ms gap between segments

    def get_timing(self, duration):
        """Return (start, end) in ms for a segment of the given duration (ms)."""
        start_time = self.current_time
        end_time = start_time + duration
        self.current_time = end_time + self.segment_gap
        return start_time, end_time
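
# Example (hypothetical durations): two 2000 ms segments yield
#   get_timing(2000) -> (0, 2000)
#   get_timing(2000) -> (2100, 4100)   # separated by the 100 ms segment_gap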


def get_audio_length(audio_file):
    """Return the duration of an audio file in seconds (helper, currently unused)."""
    audio = AudioSegment.from_file(audio_file)
    return len(audio) / 1000


def format_time_ms(milliseconds):
    """Format a millisecond count as an SRT timestamp (HH:MM:SS,mmm)."""
    seconds, ms = divmod(int(milliseconds), 1000)
    mins, secs = divmod(seconds, 60)
    hrs, mins = divmod(mins, 60)
    return f"{hrs:02}:{mins:02}:{secs:02},{ms:03}"


def smart_text_split(text, words_per_line, lines_per_segment):
    """Split text into subtitle segments, preferring natural punctuation breaks."""
    # Natural break patterns
    end_sentence = r'[.!?]+'
    mid_sentence = r'[,;:]+'

    # Clean the text and ensure proper spacing after punctuation
    text = re.sub(r'([.!?,;:])\s*', r'\1 ', text).strip()

    # Split into sentences on strong punctuation. re.split with a capture
    # group returns alternating text/punctuation pieces, so stitch each pair
    # back together and keep any trailing text that lacks end punctuation.
    temp_sentences = []
    chunks = re.split(f'({end_sentence})', text)
    for i in range(0, len(chunks), 2):
        if i + 1 < len(chunks):
            temp_sentences.append(chunks[i] + chunks[i + 1])
        elif chunks[i].strip():
            temp_sentences.append(chunks[i])

    # Break overly long sentences at mid-sentence punctuation
    sentences = []
    for sentence in temp_sentences:
        if len(sentence.split()) > words_per_line * 2:
            sub_chunks = re.split(f'({mid_sentence})', sentence)
            for i in range(0, len(sub_chunks), 2):
                if i + 1 < len(sub_chunks):
                    sentences.append(sub_chunks[i] + sub_chunks[i + 1])
                elif sub_chunks[i].strip():
                    sentences.append(sub_chunks[i])
        else:
            sentences.append(sentence)

    # Pack sentences into lines and lines into segments
    segments = []
    current_segment = []
    for sentence in sentences:
        words = sentence.strip().split()
        while words:
            break_point = min(words_per_line, len(words))
            # Prefer a natural break: trailing punctuation or an opening bracket
            for i in range(break_point - 1, 0, -1):
                if any(words[i - 1].endswith(p) for p in '.!?,;:') or \
                   any(words[i].startswith(p) for p in '([{'):
                    break_point = i
                    break
            current_line = words[:break_point]
            words = words[break_point:]
            current_segment.append(' '.join(current_line))
            if len(current_segment) >= lines_per_segment:
                segments.append('\n'.join(current_segment))
                current_segment = []

    # Flush any remaining lines
    if current_segment:
        segments.append('\n'.join(current_segment))
    return segments
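
# Example: smart_text_split("Hello world. This is a test.", words_per_line=3,
# lines_per_segment=2) returns ['Hello world.\nThis is a', 'test.'] -- short
# trailing material becomes its own segment.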


async def process_segment(segment: str, idx: int, voice: str, rate: str, pitch: str, timing_mgr: TimingManager) -> Tuple[str, AudioSegment]:
    """Synthesize one segment and build its SRT entry with accurate timing."""
    # edge-tts streams MP3 audio by default, so use a matching extension
    audio_file = f"temp_segment_{idx}_{uuid.uuid4()}.mp3"
    try:
        tts = edge_tts.Communicate(segment, voice, rate=rate, pitch=pitch)
        await tts.save(audio_file)
        segment_audio = AudioSegment.from_file(audio_file)
        segment_duration = len(segment_audio)  # pydub reports length in ms
        # Get timing from the shared manager
        start_time, end_time = timing_mgr.get_timing(segment_duration)
        # Format the SRT entry
        srt_content = (
            f"{idx}\n"
            f"{format_time_ms(start_time)} --> {format_time_ms(end_time)}\n"
            f"{segment}\n\n"
        )
        return srt_content, segment_audio
    finally:
        # Always clean up the temporary synthesis file
        if os.path.exists(audio_file):
            os.remove(audio_file)
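
# Each entry follows the standard SRT block layout, e.g.:
#   1
#   00:00:00,000 --> 00:00:02,000
#   Hello world.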


async def process_chunk_parallel(chunks: List[str], start_idx: int, voice: str, rate: str, pitch: str, timing_mgr: TimingManager) -> Tuple[str, AudioSegment]:
    """Process a chunk's segments in order, concatenating audio and SRT text."""
    combined_audio = AudioSegment.empty()
    srt_content = ""
    # Segments are processed sequentially so the shared TimingManager hands
    # out monotonically increasing timestamps
    for i, segment in enumerate(chunks, start_idx):
        srt_part, audio_part = await process_segment(segment, i, voice, rate, pitch, timing_mgr)
        srt_content += srt_part
        # Append the same gap the TimingManager accounts for, so the audio
        # stays aligned with the subtitle timestamps (a trailing gap at the
        # very end is harmless)
        combined_audio += audio_part + AudioSegment.silent(duration=timing_mgr.segment_gap)
    return srt_content, combined_audio


async def generate_accurate_srt(text, voice, rate, pitch, words_per_line, lines_per_segment):
    """Generate a synchronized MP3/SRT pair for the given text."""
    segments = smart_text_split(text, words_per_line, lines_per_segment)
    timing_mgr = TimingManager()

    # Process in smaller chunks
    chunk_size = 5
    chunks = [segments[i:i + chunk_size] for i in range(0, len(segments), chunk_size)]

    final_srt = ""
    final_audio = AudioSegment.empty()

    # Chunks are awaited one at a time: the shared TimingManager is stateful,
    # so running them concurrently (e.g. with asyncio.gather) would interleave
    # get_timing() calls and scramble the subtitle timestamps
    for i, chunk in enumerate(chunks):
        start_idx = 1 + i * chunk_size
        srt_content, audio_content = await process_chunk_parallel(chunk, start_idx, voice, rate, pitch, timing_mgr)
        final_srt += srt_content
        final_audio += audio_content

    # Export the final files under unique names
    unique_id = uuid.uuid4()
    audio_path = f"final_audio_{unique_id}.mp3"
    srt_path = f"final_subtitles_{unique_id}.srt"
    final_audio.export(audio_path, format="mp3", bitrate="320k")
    with open(srt_path, "w", encoding='utf-8') as f:
        f.write(final_srt)
    return srt_path, audio_path
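
# Direct (non-UI) usage sketch, e.g. from a script (requires `import asyncio`):
#   srt_path, audio_path = asyncio.run(generate_accurate_srt(
#       "Hello world.", "en-US-JennyNeural", "+0%", "+0Hz", 6, 2))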


async def process_text(text, pitch, rate, voice, words_per_line, lines_per_segment):
    """Gradio callback: map UI values to edge-tts parameters and run the pipeline."""
    # Neutral defaults that edge-tts accepts
    pitch_str = "+0Hz"
    rate_str = "+0%"
    # Only override when the user moved a slider; cast to int because
    # Gradio sliders may deliver floats, which would break the :+d format
    if pitch != 0:
        pitch_str = f"{int(pitch):+d}Hz"
    if rate != 0:
        rate_str = f"{int(rate):+d}%"
    srt_path, audio_path = await generate_accurate_srt(
        text,
        voice_options[voice],
        rate_str,
        pitch_str,
        words_per_line,
        lines_per_segment
    )
    # The audio path is returned twice: once for download, once for preview
    return srt_path, audio_path, audio_path


# Voice options: display name -> edge-tts voice ID
voice_options = {
    "Andrew Male": "en-US-AndrewNeural",
    "Jenny Female": "en-US-JennyNeural",
    "Guy Male": "en-US-GuyNeural",
    "Ana Female": "en-US-AnaNeural",
    "Aria Female": "en-US-AriaNeural",
    "Brian Male": "en-US-BrianNeural",
    "Christopher Male": "en-US-ChristopherNeural",
    "Eric Male": "en-US-EricNeural",
    "Michelle Female": "en-US-MichelleNeural",
    "Roger Male": "en-US-RogerNeural",
    "Natasha Female": "en-AU-NatashaNeural",
    "William Male": "en-AU-WilliamNeural",
    "Clara Female": "en-CA-ClaraNeural",
    "Liam Male": "en-CA-LiamNeural",
    "Libby Female": "en-GB-LibbyNeural",
    "Maisie": "en-GB-MaisieNeural",
    "Ryan": "en-GB-RyanNeural",
    "Sonia": "en-GB-SoniaNeural",
    "Thomas": "en-GB-ThomasNeural",
    "Sam": "en-HK-SamNeural",
    "Yan": "en-HK-YanNeural",
    "Connor": "en-IE-ConnorNeural",
    "Emily": "en-IE-EmilyNeural",
    "Neerja": "en-IN-NeerjaNeural",
    "Prabhat": "en-IN-PrabhatNeural",
    "Asilia": "en-KE-AsiliaNeural",
    "Chilemba": "en-KE-ChilembaNeural",
    "Abeo": "en-NG-AbeoNeural",
    "Ezinne": "en-NG-EzinneNeural",
    "Mitchell": "en-NZ-MitchellNeural",
    "James": "en-PH-JamesNeural",
    "Rosa": "en-PH-RosaNeural",
    "Luna": "en-SG-LunaNeural",
    "Wayne": "en-SG-WayneNeural",
    "Elimu": "en-TZ-ElimuNeural",
    "Imani": "en-TZ-ImaniNeural",
    "Leah": "en-ZA-LeahNeural",
    "Luke": "en-ZA-LukeNeural",
}
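

# The table above is hard-coded and can drift as voices are added or retired.
# Optional alternative (a sketch, not wired into the app below): edge-tts can
# list voices at runtime. The display-name formatting here is an illustrative
# assumption, not something the original app defines.
async def fetch_english_voices():
    """Return {display name: ShortName} for English voices (needs network access)."""
    voices = await edge_tts.list_voices()
    return {
        f"{v['ShortName'].split('-')[-1].removesuffix('Neural')} {v['Gender']}": v["ShortName"]
        for v in voices
        if v["Locale"].startswith("en-")
    }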


# Create the Gradio interface
app = gr.Interface(
    fn=process_text,
    inputs=[
        gr.Textbox(label="Enter Text", lines=10),
        gr.Slider(label="Pitch Adjustment (Hz)", minimum=-10, maximum=10, value=0, step=1),
        gr.Slider(label="Rate Adjustment (%)", minimum=-25, maximum=25, value=0, step=1),
        gr.Dropdown(label="Select Voice", choices=list(voice_options.keys()), value="Jenny Female"),
        gr.Slider(label="Words per Line", minimum=3, maximum=12, value=6, step=1),
        gr.Slider(label="Lines per Segment", minimum=1, maximum=4, value=2, step=1)
    ],
    outputs=[
        gr.File(label="Download SRT"),
        gr.File(label="Download Audio"),
        gr.Audio(label="Preview Audio")
    ],
    title="Advanced TTS with Configurable SRT Generation",
    description="Generate synchronized audio and subtitles with natural speech patterns."
)

app.launch()