hivecorp committed on
Commit
c2e3966
·
verified ·
1 Parent(s): 3b10a63

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +105 -132
app.py CHANGED
@@ -6,8 +6,9 @@ import asyncio
6
  import uuid
7
  import re
8
  from concurrent.futures import ThreadPoolExecutor
9
- from typing import List, Tuple
10
  import math
 
11
 
12
  class TimingManager:
13
  def __init__(self):
@@ -30,164 +31,136 @@ def format_time_ms(milliseconds):
30
  hrs, mins = divmod(mins, 60)
31
  return f"{hrs:02}:{mins:02}:{secs:02},{ms:03}"
32
 
33
- def smart_text_split(text, words_per_line, lines_per_segment):
34
- # Define natural break patterns
35
- end_sentence = r'[.!?]+'
36
- mid_sentence = r'[,;:]+'
37
-
38
- # First split by major punctuation
39
- sentences = []
40
- current = ""
41
-
42
- # Clean the text and ensure proper spacing after punctuation
43
- text = re.sub(r'([.!?,;:])\s*', r'\1 ', text).strip()
44
-
45
- # Split into initial chunks by strong punctuation
46
- chunks = re.split(f'({end_sentence})', text)
47
- temp_sentences = []
48
-
49
- for i in range(0, len(chunks)-1, 2):
50
- if i+1 < len(chunks):
51
- temp_sentences.append(chunks[i] + chunks[i+1])
52
- else:
53
- temp_sentences.append(chunks[i])
54
-
55
- # Further process each sentence
56
- for sentence in temp_sentences:
57
- # Split by mid-sentence punctuation if sentence is too long
58
- if len(sentence.split()) > words_per_line * 2:
59
- sub_chunks = re.split(f'({mid_sentence})', sentence)
60
- for i in range(0, len(sub_chunks)-1, 2):
61
- if i+1 < len(sub_chunks):
62
- sentences.append(sub_chunks[i] + sub_chunks[i+1])
63
- else:
64
- sentences.append(sub_chunks[i])
65
- else:
66
- sentences.append(sentence)
67
-
68
- # Process sentences into lines and segments
69
- segments = []
70
- current_segment = []
71
- current_line = []
72
-
73
- for sentence in sentences:
74
- words = sentence.strip().split()
75
 
76
- while words:
77
- # Determine natural break point
78
- break_point = min(words_per_line, len(words))
79
-
80
- # Look for natural breaks
81
- for i in range(break_point-1, 0, -1):
82
- if any(words[i-1].endswith(p) for p in '.!?,;:') or \
83
- any(words[i].startswith(p) for p in '([{'):
84
- break_point = i
85
- break
86
-
87
- current_line = words[:break_point]
88
- words = words[break_point:]
89
 
90
- current_segment.append(' '.join(current_line))
 
 
 
 
 
 
91
 
92
- if len(current_segment) >= lines_per_segment:
93
- segments.append('\n'.join(current_segment))
94
- current_segment = []
95
-
96
- # Handle remaining content
97
- if current_segment:
98
- segments.append('\n'.join(current_segment))
99
-
100
- return segments
 
 
 
 
 
 
 
 
 
101
 
102
- async def process_segment(segment: str, idx: int, voice: str, rate: str, pitch: str, timing_mgr: TimingManager) -> Tuple[str, AudioSegment]:
103
- """Process a single segment with accurate timing"""
104
- audio_file = f"temp_segment_{idx}_{uuid.uuid4()}.wav"
105
  try:
106
- tts = edge_tts.Communicate(segment, voice, rate=rate, pitch=pitch)
107
  await tts.save(audio_file)
108
 
109
- segment_audio = AudioSegment.from_file(audio_file)
110
- segment_duration = len(segment_audio)
111
-
112
- # Get timing from manager
113
- start_time, end_time = timing_mgr.get_timing(segment_duration)
114
 
115
- # Format SRT entry
116
- srt_content = (
117
- f"{idx}\n"
118
- f"{format_time_ms(start_time)} --> {format_time_ms(end_time)}\n"
119
- f"{segment}\n\n"
120
- )
121
-
122
- return srt_content, segment_audio
123
  finally:
124
  if os.path.exists(audio_file):
125
  os.remove(audio_file)
126
 
127
- async def process_chunk_parallel(chunks: List[str], start_idx: int, voice: str, rate: str, pitch: str, timing_mgr: TimingManager) -> Tuple[str, AudioSegment]:
128
- """Process chunks with sequential timing"""
129
- combined_audio = AudioSegment.empty()
130
- srt_content = ""
131
-
132
- # Process segments sequentially to maintain timing
133
- for i, segment in enumerate(chunks, start_idx):
134
- srt_part, audio_part = await process_segment(segment, i, voice, rate, pitch, timing_mgr)
135
- srt_content += srt_part
136
- combined_audio += audio_part
137
-
138
- return srt_content, combined_audio
139
-
140
- async def generate_accurate_srt(text, voice, rate, pitch, words_per_line, lines_per_segment):
141
- segments = smart_text_split(text, words_per_line, lines_per_segment)
142
- timing_mgr = TimingManager()
143
-
144
- # Process in smaller chunks
145
- chunk_size = 5
146
- chunks = [segments[i:i + chunk_size] for i in range(0, len(segments), chunk_size)]
147
-
148
- final_srt = ""
149
  final_audio = AudioSegment.empty()
150
- current_index = 1
151
-
152
- # Process chunks in parallel but maintain sequential timing
153
- chunk_tasks = []
154
- for i, chunk in enumerate(chunks):
155
- start_idx = current_index + (i * chunk_size)
156
- task = process_chunk_parallel(chunk, start_idx, voice, rate, pitch, timing_mgr)
157
- chunk_tasks.append(task)
158
-
159
- # Gather results in order
160
- chunk_results = await asyncio.gather(*chunk_tasks)
161
 
162
- # Combine results
163
- for srt_content, audio_content in chunk_results:
164
- final_srt += srt_content
165
- final_audio += audio_content
 
 
 
 
 
 
 
 
 
 
 
 
 
166
 
167
- # Export final files
168
  unique_id = uuid.uuid4()
169
  audio_path = f"final_audio_{unique_id}.mp3"
170
  srt_path = f"final_subtitles_{unique_id}.srt"
171
 
172
  final_audio.export(audio_path, format="mp3", bitrate="320k")
173
  with open(srt_path, "w", encoding='utf-8') as f:
174
- f.write(final_srt)
175
 
176
  return srt_path, audio_path
177
 
178
  async def process_text(text, pitch, rate, voice, words_per_line, lines_per_segment):
179
- # Set default pitch and rate strings that work well
180
- pitch_str = "+0Hz" # neutral pitch
181
- rate_str = "+0%" # neutral rate
182
-
183
- # Only modify if user has changed values
184
- if pitch != 0:
185
- pitch_str = f"{pitch:+d}Hz"
186
- if rate != 0:
187
- rate_str = f"{rate:+d}%"
188
 
189
  srt_path, audio_path = await generate_accurate_srt(
190
- text,
191
  voice_options[voice],
192
  rate_str,
193
  pitch_str,
 
6
  import uuid
7
  import re
8
  from concurrent.futures import ThreadPoolExecutor
9
+ from typing import List, Tuple, Optional
10
  import math
11
+ from dataclasses import dataclass
12
 
13
  class TimingManager:
14
  def __init__(self):
 
31
  hrs, mins = divmod(mins, 60)
32
  return f"{hrs:02}:{mins:02}:{secs:02},{ms:03}"
33
 
34
@dataclass
class Segment:
    """One subtitle unit: its text plus timing (in milliseconds) and the
    synthesized audio clip, filled in as processing progresses."""
    id: int          # 1-based index, written as the SRT cue number
    text: str        # segment text, lines already joined with '\n'
    start_time: int = 0   # cue start, ms from beginning of final audio
    end_time: int = 0     # cue end, ms
    duration: int = 0     # length of the synthesized clip, ms
    audio: Optional[AudioSegment] = None  # set by process_segment_with_timing
42
+
43
class TextProcessor:
    """Split raw text into subtitle segments.

    A segment holds at most ``lines_per_segment`` lines; a line holds
    roughly ``words_per_line`` words, with breaks preferred at
    punctuation so cues end at natural pauses.
    """

    def __init__(self, words_per_line: int, lines_per_segment: int):
        self.words_per_line = words_per_line
        self.lines_per_segment = lines_per_segment
        # NOTE(review): break_patterns is not referenced by
        # split_into_segments below — the break logic is inlined there.
        # Kept as-is for interface compatibility; confirm before removing.
        self.break_patterns = {
            'strong': r'[.!?]+',
            'medium': r'[,;:]',
            'weak': r'[\s]+'
        }

    def split_into_segments(self, text: str) -> List[Segment]:
        """Return the text as an ordered list of Segment objects with
        sequential 1-based ids; timing/audio fields are left at defaults."""
        # Clean and normalize: collapse whitespace runs, then guarantee
        # exactly one space after every punctuation mark.
        text = re.sub(r'\s+', ' ', text.strip())
        text = re.sub(r'([.!?,;:])\s*', r'\1 ', text)

        # Split into natural segments
        segments = []
        current_lines = []   # completed lines of the segment being built
        current_words = []   # words of the line being built
        words = text.split()

        segment_id = 1

        for i, word in enumerate(words):
            current_words.append(word)

            # Check for natural breaks or line length
            is_break = (
                any(word.endswith(p) for p in '.!?') or       # Strong break
                (len(current_words) >= self.words_per_line and  # Line length
                 (any(word.endswith(p) for p in ',;:') or      # Medium break
                  i == len(words) - 1))                        # End of text
            )

            if is_break or len(current_words) >= self.words_per_line:
                # Close the current line...
                current_lines.append(' '.join(current_words))
                current_words = []

                # ...and close the segment if it is full or text ended.
                if len(current_lines) >= self.lines_per_segment or i == len(words) - 1:
                    segment_text = '\n'.join(current_lines)
                    segments.append(Segment(id=segment_id, text=segment_text))
                    segment_id += 1
                    current_lines = []

        # Handle remaining content: a partial line and/or partial segment
        # left over when the text ended without triggering a break.
        if current_words:
            current_lines.append(' '.join(current_words))
        if current_lines:
            segment_text = '\n'.join(current_lines)
            segments.append(Segment(id=segment_id, text=segment_text))

        return segments
95
 
96
async def process_segment_with_timing(segment: Segment, voice: str, rate: str, pitch: str) -> Segment:
    """Synthesize speech for one segment and record its clip and duration.

    Saves TTS output to a uniquely named temporary WAV, loads it back to
    measure its length in milliseconds, stores both on the segment, and
    always deletes the temporary file — even when synthesis fails.
    """
    tmp_path = f"temp_segment_{segment.id}_{uuid.uuid4()}.wav"
    try:
        communicator = edge_tts.Communicate(segment.text, voice, rate=rate, pitch=pitch)
        await communicator.save(tmp_path)

        clip = AudioSegment.from_file(tmp_path)
        segment.audio = clip
        segment.duration = len(clip)
        return segment
    finally:
        # Best-effort cleanup of the temp WAV regardless of outcome.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
110
 
111
async def generate_accurate_srt(text: str, voice: str, rate: str, pitch: str, words_per_line: int, lines_per_segment: int) -> Tuple[str, str]:
    """Generate an SRT subtitle file and a matching MP3 narration.

    Splits *text* into segments, synthesizes every segment concurrently,
    then lays the clips out sequentially with a 100 ms pause between
    segments so the subtitle clock and the exported audio stay in sync.

    Args:
        text: source text to narrate.
        voice / rate / pitch: edge-tts voice parameters.
        words_per_line, lines_per_segment: subtitle layout limits.

    Returns:
        Tuple of (srt_path, audio_path) for the written files.
    """
    GAP_MS = 100  # pause between consecutive segments, in milliseconds

    # Initialize text processor and split text
    processor = TextProcessor(words_per_line, lines_per_segment)
    segments = processor.split_into_segments(text)

    # Process all segments in parallel; asyncio.gather preserves order.
    tasks = [
        process_segment_with_timing(segment, voice, rate, pitch)
        for segment in segments
    ]
    processed_segments = await asyncio.gather(*tasks)

    # Lay out timing and audio sequentially.
    current_time = 0
    final_audio = AudioSegment.empty()
    srt_content = ""

    for i, segment in enumerate(processed_segments):
        # BUGFIX: the subtitle clock previously advanced by a 100 ms gap
        # that was never added to the audio track, so subtitles drifted
        # ahead of the narration by 100 ms per segment. Insert matching
        # silence before every segment except the first.
        if i > 0:
            final_audio += AudioSegment.silent(duration=GAP_MS)
            current_time += GAP_MS

        # Set segment timing
        segment.start_time = current_time
        segment.end_time = current_time + segment.duration

        # Add to SRT content
        srt_content += (
            f"{segment.id}\n"
            f"{format_time_ms(segment.start_time)} --> {format_time_ms(segment.end_time)}\n"
            f"{segment.text}\n\n"
        )

        # Add to final audio
        final_audio += segment.audio

        # Update timing
        current_time = segment.end_time

    # Export files
    unique_id = uuid.uuid4()
    audio_path = f"final_audio_{unique_id}.mp3"
    srt_path = f"final_subtitles_{unique_id}.srt"

    final_audio.export(audio_path, format="mp3", bitrate="320k")
    with open(srt_path, "w", encoding='utf-8') as f:
        f.write(srt_content)

    return srt_path, audio_path
156
 
157
  async def process_text(text, pitch, rate, voice, words_per_line, lines_per_segment):
158
+ # Format pitch and rate strings
159
+ pitch_str = f"{pitch:+d}Hz" if pitch != 0 else "+0Hz"
160
+ rate_str = f"{rate:+d}%" if rate != 0 else "+0%"
 
 
 
 
 
 
161
 
162
  srt_path, audio_path = await generate_accurate_srt(
163
+ text,
164
  voice_options[voice],
165
  rate_str,
166
  pitch_str,