import gradio as gr
from pydub import AudioSegment
import edge_tts
import os
import asyncio
import uuid
import re
from typing import List, Tuple

def get_audio_length(audio_file):
    """Return the duration of an audio file in seconds."""
    audio = AudioSegment.from_file(audio_file)
    return len(audio) / 1000

def format_time_ms(milliseconds):
    """Format a millisecond count as an SRT timestamp (HH:MM:SS,mmm)."""
    seconds, ms = divmod(int(milliseconds), 1000)
    mins, secs = divmod(seconds, 60)
    hrs, mins = divmod(mins, 60)
    return f"{hrs:02}:{mins:02}:{secs:02},{ms:03}"
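
# Quick sanity check (illustrative, not executed):
#   format_time_ms(3725042) -> "01:02:05,042"
# i.e. 3,725,042 ms is 1 h 2 min 5 s and 42 ms.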

def smart_text_split(text, words_per_line, lines_per_segment):
    """Split text into subtitle segments of up to lines_per_segment lines,
    each holding up to words_per_line words, preferring sentence boundaries."""
    # First split by major punctuation (periods, exclamation marks, question marks)
    sentences = re.split(r'([.!?]+)', text)
    # Recombine sentences with their punctuation
    sentences = [''.join(i) for i in zip(sentences[::2], sentences[1::2] + [''])]

    segments = []
    current_segment = []
    current_line = []
    for sentence in sentences:
        # Split sentence into words
        words = sentence.strip().split()
        for word in words:
            current_line.append(word)
            # Check if current line has reached words_per_line
            if len(current_line) >= words_per_line:
                current_segment.append(' '.join(current_line))
                current_line = []
                # Check if current segment has reached lines_per_segment
                if len(current_segment) >= lines_per_segment:
                    segments.append('\n'.join(current_segment))
                    current_segment = []
        # If there are words in current_line, add them as a line
        if current_line:
            current_segment.append(' '.join(current_line))
            current_line = []
        # Check if we should start a new segment at sentence boundary
        if len(current_segment) >= lines_per_segment:
            segments.append('\n'.join(current_segment))
            current_segment = []
    # Add any remaining lines
    if current_segment:
        segments.append('\n'.join(current_segment))
    return segments
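
# Illustrative example of the splitting rules (hypothetical input):
#   smart_text_split("The quick brown fox jumps over the lazy dog.", 3, 2)
#   -> ["The quick brown\nfox jumps over", "the lazy dog."]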

async def process_segment(segment: str, idx: int, voice: str, rate: str, pitch: str) -> Tuple[int, AudioSegment, int]:
    """Synthesize a single segment and return (index, audio, duration in ms)."""
    # edge-tts emits MP3 data, so give the temp file a matching extension;
    # the per-call uuid keeps concurrent segments from clobbering each other
    audio_file = f"temp_segment_{idx}_{uuid.uuid4()}.mp3"
    try:
        tts = edge_tts.Communicate(segment, voice, rate=rate, pitch=pitch)
        await tts.save(audio_file)
        segment_audio = AudioSegment.from_file(audio_file)
        segment_duration = len(segment_audio)
        return idx, segment_audio, segment_duration
    finally:
        if os.path.exists(audio_file):
            os.remove(audio_file)

async def process_chunk_parallel(chunk: List[str], start_idx: int, voice: str, rate: str, pitch: str) -> Tuple[List[Tuple[int, int, str]], AudioSegment]:
    """Synthesize a chunk of segments in parallel.

    Returns (entries, audio), where each entry is (index, duration_ms, text).
    Timestamps are assigned by the caller once every chunk has finished, so
    the subtitle clock runs continuously instead of resetting per chunk.
    """
    tasks = [
        process_segment(segment, start_idx + i, voice, rate, pitch)
        for i, segment in enumerate(chunk)
    ]
    results = await asyncio.gather(*tasks)

    combined_audio = AudioSegment.empty()
    entries = []
    for text, (idx, audio_part, duration) in zip(chunk, results):
        entries.append((idx, duration, text))
        combined_audio += audio_part
    return entries, combined_audio

async def generate_accurate_srt(text, voice, rate, pitch, words_per_line, lines_per_segment):
    segments = smart_text_split(text, words_per_line, lines_per_segment)

    # Split segments into chunks for parallel processing
    chunk_size = 10  # Process 10 segments at a time
    chunks = [segments[i:i + chunk_size] for i in range(0, len(segments), chunk_size)]

    # Process chunks in parallel; start_idx keeps SRT numbering continuous
    chunk_tasks = [
        process_chunk_parallel(chunk, i * chunk_size + 1, voice, rate, pitch)
        for i, chunk in enumerate(chunks)
    ]
    chunk_results = await asyncio.gather(*chunk_tasks)

    # Combine results, timing every entry against one running clock so that
    # timestamps do not reset at chunk boundaries
    final_srt = ""
    final_audio = AudioSegment.empty()
    current_time = 0
    for entries, audio_content in chunk_results:
        for idx, duration, segment_text in entries:
            final_srt += f"{idx}\n"
            final_srt += f"{format_time_ms(current_time)} --> {format_time_ms(current_time + duration)}\n"
            final_srt += segment_text + "\n\n"
            current_time += duration
        final_audio += audio_content

    # Export final files
    unique_id = uuid.uuid4()
    audio_path = f"final_audio_{unique_id}.mp3"
    srt_path = f"final_subtitles_{unique_id}.srt"
    final_audio.export(audio_path, format="mp3", bitrate="320k")
    with open(srt_path, "w", encoding='utf-8') as f:
        f.write(final_srt)
    return srt_path, audio_path
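
# Each exported SRT entry follows the standard format, e.g. (illustrative):
#   1
#   00:00:00,000 --> 00:00:03,250
#   The quick brown
#   fox jumps over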

async def process_text(text, pitch, rate, voice, words_per_line, lines_per_segment):
    # edge-tts requires explicitly signed values, e.g. "+0Hz" and "-10%"
    pitch_str = f"{int(pitch):+d}Hz"
    rate_str = f"{int(rate):+d}%"
    srt_path, audio_path = await generate_accurate_srt(
        text,
        voice_options[voice],
        rate_str,
        pitch_str,
        words_per_line,
        lines_per_segment
    )
    # audio_path is returned twice: once as a download, once for the preview player
    return srt_path, audio_path, audio_path

# Voice options dictionary mapping display names to edge-tts voice IDs
voice_options = {
    "Andrew Male": "en-US-AndrewNeural",
    "Jenny Female": "en-US-JennyNeural",
    "Guy Male": "en-US-GuyNeural",
    "Ana Female": "en-US-AnaNeural",
    "Aria Female": "en-US-AriaNeural",
    "Brian Male": "en-US-BrianNeural",
    "Christopher Male": "en-US-ChristopherNeural",
    "Eric Male": "en-US-EricNeural",
    "Michelle Female": "en-US-MichelleNeural",
    "Roger Male": "en-US-RogerNeural",
    "Natasha Female": "en-AU-NatashaNeural",
    "William Male": "en-AU-WilliamNeural",
    "Clara Female": "en-CA-ClaraNeural",
    "Liam Male": "en-CA-LiamNeural",
    "Libby Female": "en-GB-LibbyNeural",
    "Maisie": "en-GB-MaisieNeural",
    "Ryan": "en-GB-RyanNeural",
    "Sonia": "en-GB-SoniaNeural",
    "Thomas": "en-GB-ThomasNeural",
    "Sam": "en-HK-SamNeural",
    "Yan": "en-HK-YanNeural",
    "Connor": "en-IE-ConnorNeural",
    "Emily": "en-IE-EmilyNeural",
    "Neerja": "en-IN-NeerjaNeural",
    "Prabhat": "en-IN-PrabhatNeural",
    "Asilia": "en-KE-AsiliaNeural",
    "Chilemba": "en-KE-ChilembaNeural",
    "Abeo": "en-NG-AbeoNeural",
    "Ezinne": "en-NG-EzinneNeural",
    "Mitchell": "en-NZ-MitchellNeural",
    "James": "en-PH-JamesNeural",
    "Rosa": "en-PH-RosaNeural",
    "Luna": "en-SG-LunaNeural",
    "Wayne": "en-SG-WayneNeural",
    "Elimu": "en-TZ-ElimuNeural",
    "Imani": "en-TZ-ImaniNeural",
    "Leah": "en-ZA-LeahNeural",
    "Luke": "en-ZA-LukeNeural"
    # Add other voices here...
}
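
# The names above are standard edge-tts voice IDs; the full catalogue can be
# printed with the bundled CLI: `edge-tts --list-voices`.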

# Create Gradio interface
app = gr.Interface(
    fn=process_text,
    inputs=[
        gr.Textbox(label="Enter Text", lines=10),
        gr.Slider(label="Pitch Adjustment (Hz)", minimum=-20, maximum=20, value=0, step=1),
        gr.Slider(label="Rate Adjustment (%)", minimum=-50, maximum=50, value=0, step=1),
        gr.Dropdown(label="Select Voice", choices=list(voice_options.keys()), value="Jenny Female"),
        gr.Slider(label="Words per Line", minimum=1, maximum=15, value=8, step=1),
        gr.Slider(label="Lines per Segment", minimum=1, maximum=5, value=2, step=1)
    ],
    outputs=[
        gr.File(label="Download SRT"),
        gr.File(label="Download Audio"),
        gr.Audio(label="Preview Audio")
    ],
    title="Advanced TTS with Configurable SRT Generation",
    description="Generate synchronized audio and subtitles with custom segmentation control."
)

app.launch()
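
# To run locally (assuming this file is saved as app.py): `python app.py`;
# Gradio serves the UI on http://127.0.0.1:7860 by default.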