import os | |
import io | |
import subprocess | |
import numpy as np | |
from difflib import SequenceMatcher | |
from datetime import datetime | |
from typing import List, Tuple, Dict | |
import asyncio | |
import base64 | |
import string | |
import re | |
import tempfile | |
# Set cache environment | |
os.environ['HF_HOME'] = '/tmp/hf' | |
os.environ['TORCH_HOME'] = '/tmp/torch' | |
os.environ['TRANSFORMERS_CACHE'] = '/tmp/hf' | |
os.environ['XDG_CACHE_HOME'] = '/tmp/hf' | |
os.environ['MPLCONFIGDIR'] = '/tmp/matplotlib' # Fix matplotlib permission issue | |
os.environ['PULSE_CONFIG_PATH'] = '/tmp/pulse' # Fix PulseAudio errors | |
os.environ['PULSE_RUNTIME_PATH'] = '/tmp/pulse' | |
os.makedirs('/tmp/hf', exist_ok=True) | |
os.makedirs('/tmp/torch', exist_ok=True) | |
os.makedirs('/tmp/matplotlib', exist_ok=True) | |
os.makedirs('/tmp/pulse', exist_ok=True) | |
from fastapi import FastAPI, UploadFile, File, Form | |
from fastapi.middleware.cors import CORSMiddleware | |
import torchaudio | |
import torch | |
from phonemizer import phonemize | |
import whisperx # New: WhisperX for precise alignment | |
from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC | |
import edge_tts | |
def log(msg): | |
print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}") | |
app = FastAPI() | |
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"]) | |
def normalize_phoneme_string(s: str) -> str: | |
"""Normalize phoneme string for comparison - remove spaces and extra chars""" | |
if not s: | |
return s | |
# Convert to lowercase and remove spaces, stress marks, and length markers | |
normalized = s.lower().strip() | |
normalized = normalized.replace(' ', '') # Remove spaces between phonemes | |
normalized = normalized.replace('ː', '') # Remove length markers | |
normalized = normalized.replace('ˈ', '') # Remove primary stress | |
normalized = normalized.replace('ˌ', '') # Remove secondary stress | |
normalized = normalized.replace('.', '') # Remove syllable boundaries | |
# CRITICAL: Explode affricate ligatures into their component parts | |
# These single-character affricates need to be expanded to match decomposed forms | |
affricate_ligatures = { | |
'ʧ': 'tʃ', # Voiceless postalveolar affricate (chip) | |
'ʤ': 'dʒ', # Voiced postalveolar affricate (jump) | |
'ʦ': 'ts', # Voiceless alveolar affricate (German Zeit) | |
'ʣ': 'dz', # Voiced alveolar affricate (Italian mezzo) | |
'ʨ': 'tɕ', # Voiceless alveolo-palatal affricate (Polish ć) | |
'ʥ': 'dʑ', # Voiced alveolo-palatal affricate (Polish dź) | |
'ƛ': 'tɬ', # Voiceless alveolar lateral affricate (Nahuatl tl) | |
'ꜩ': 'tɕ', # Variant for voiceless alveolo-palatal affricate | |
} | |
for ligature, expanded in affricate_ligatures.items(): | |
normalized = normalized.replace(ligature, expanded) | |
# CRITICAL: Normalize ASCII symbols to proper IPA equivalents | |
# Convert all wav2vec2 ASCII characters to standard IPA | |
ascii_to_ipa = { | |
'g': 'ɡ', # ASCII g → IPA script g (voiced velar stop) | |
'b': 'b', # ASCII b → IPA b (already correct, but explicit) | |
'd': 'd', # ASCII d → IPA d (already correct, but explicit) | |
'f': 'f', # ASCII f → IPA f (already correct, but explicit) | |
'h': 'h', # ASCII h → IPA h (already correct, but explicit) | |
'i': 'i', # ASCII i → IPA i (already correct, but explicit) | |
# Note: Most ASCII phonetic chars are already valid IPA, except 'g' | |
} | |
# Normalize variant IPA symbols to consistent forms | |
# Handle different representations of the same sounds | |
ipa_variants = { | |
'ɜ': 'ɝ', # Open-mid central → r-colored (American English "er") | |
'ɚ': 'ɝ', # R-colored schwa → r-colored vowel (both "er" sounds) | |
'ʌ': 'ə', # Open-mid back → schwa (both unstressed "uh" sounds) | |
'ð': 'θ', # Voiced th → voiceless th (accent training - treat as equivalent) | |
'ɹ': 'r', # Retroflex approximant → regular r (espeak vs CMUdict difference) | |
} | |
for ascii_char, ipa_char in ascii_to_ipa.items(): | |
normalized = normalized.replace(ascii_char, ipa_char) | |
for variant_char, standard_char in ipa_variants.items(): | |
normalized = normalized.replace(variant_char, standard_char) | |
return normalized | |
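# Illustrative examples of the normalization above (annotation added for clarity;
# outputs traced by hand, not from a live run):
#   normalize_phoneme_string("ʤˈʌmp")  -> "dʒəmp"  (ligature expanded, stress dropped, ʌ folded into ə)
#   normalize_phoneme_string("wˈɜːld") -> "wɝld"   (length marker dropped, ɜ mapped to ɝ)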
# Load models once at startup | |
phoneme_processor = Wav2Vec2Processor.from_pretrained("vitouphy/wav2vec2-xls-r-300m-timit-phoneme") | |
phoneme_model = Wav2Vec2ForCTC.from_pretrained("vitouphy/wav2vec2-xls-r-300m-timit-phoneme") | |
# Model inspection complete - wav2vec2 uses ASCII 'g' (token 15), not IPA 'ɡ' | |
log("✅ Phoneme models loaded - using ASCII/IPA normalization") | |
# WhisperX models - loaded lazily | |
whisperx_model = None | |
whisperx_align_model = None | |
whisperx_metadata = None | |
# Simple caches | |
phoneme_cache = {} | |
tts_cache = {} | |
# TTS configuration | |
TTS_VOICE = "en-US-AriaNeural" | |
# Phoneme to English letter sounds mapping | |
PHONEME_TO_ENGLISH = { | |
# Vowels (monophthongs) | |
'ɪ': 'IH', # "bit" | |
'ɛ': 'EH', # "bed" | |
'æ': 'AE', # "cat" | |
'ʌ': 'UH', # "but" | |
'ɑ': 'AH', # "father" | |
'ɔ': 'AW', # "law" | |
'ʊ': 'UU', # "book" | |
'u': 'OO', # "boot" | |
'i': 'EE', # "beat" | |
'ə': 'UH', # "about" (schwa) | |
'ɝ': 'ER', # "bird" | |
'ɚ': 'ER', # "letter" | |
# Diphthongs | |
'eɪ': 'AY', # "day" | |
'aɪ': 'EYE', # "my" | |
'ɔɪ': 'OY', # "boy" | |
'aʊ': 'OW', # "now" | |
'oʊ': 'OH', # "go" | |
# R-colored vowels | |
'ɪr': 'EER', # "near" | |
'ɛr': 'AIR', # "care" | |
'ɑr': 'AR', # "car" | |
'ɔr': 'OR', # "for" | |
'ʊr': 'OOR', # "tour" | |
'ər': 'ER', # "letter" | |
# Consonants | |
'p': 'P', # "pat" | |
'b': 'B', # "bat" | |
't': 'T', # "tap" | |
    'd': 'D', # "dog"
'k': 'K', # "cat" | |
'g': 'G', # "gap" (wav2vec2 uses ASCII g) | |
'ɡ': 'G', # "gap" (IPA script g - normalize to same) | |
'f': 'F', # "fat" | |
'v': 'V', # "vat" | |
'θ': 'TH', # "think" | |
'ð': 'TH', # "this" | |
's': 'S', # "sap" | |
'z': 'Z', # "zap" | |
'ʃ': 'SH', # "ship" | |
'ʒ': 'ZH', # "measure" | |
'h': 'H', # "hat" | |
'm': 'M', # "mat" | |
    'n': 'N', # "nap"
'ŋ': 'NG', # "sing" | |
'l': 'L', # "lap" | |
'r': 'R', # "rap" | |
'j': 'Y', # "yes" | |
'w': 'W', # "wet" | |
# Affricates | |
'tʃ': 'CH', # "chip" | |
'dʒ': 'J', # "jump" | |
# Common combinations that might appear | |
' ': '-', # space becomes dash | |
'ː': '', # length marker (remove) | |
'ˈ': '', # primary stress (remove) | |
'ˌ': '', # secondary stress (remove) | |
} | |
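# Note on usage (added annotation): the character-level feedback code below looks up
# PHONEME_TO_ENGLISH one normalized character at a time, so the multi-character keys
# (diphthongs, r-colored vowels, affricates) mainly serve as documentation; per-character
# lookups such as PHONEME_TO_ENGLISH.get('ɪ') or .get('t') are the ones that actually fire.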
# Phoneme example words - showing the sound in context | |
PHONEME_EXAMPLES = { | |
# Vowels (monophthongs) | |
'ɪ': 'bit', # IH sound | |
'ɛ': 'bed', # EH sound | |
'æ': 'cat', # AE sound | |
'ʌ': 'but', # UH sound (stressed) | |
'ɑ': 'father', # AH sound | |
'ɔ': 'law', # AW sound | |
'ʊ': 'book', # UU sound | |
'u': 'boot', # OO sound | |
'i': 'beat', # EE sound | |
'ə': 'about', # schwa (unstressed) | |
'ɝ': 'bird', # ER sound (stressed) | |
'ɚ': 'letter', # ER sound (unstressed) | |
# Diphthongs | |
'eɪ': 'day', # AY sound | |
'aɪ': 'my', # EYE sound | |
'ɔɪ': 'boy', # OY sound | |
'aʊ': 'now', # OW sound | |
'oʊ': 'go', # OH sound | |
# R-colored vowels | |
'ɪr': 'near', # EER sound | |
'ɛr': 'care', # AIR sound | |
'ɑr': 'car', # AR sound | |
'ɔr': 'for', # OR sound | |
'ʊr': 'tour', # OOR sound | |
'ər': 'letter', # ER sound | |
# Consonants | |
'p': 'pat', # P sound | |
'b': 'bat', # B sound | |
't': 'tap', # T sound | |
    'd': 'dog', # D sound
'k': 'cat', # K sound | |
'g': 'gap', # G sound (ASCII) | |
'ɡ': 'gap', # G sound (IPA) | |
'f': 'fat', # F sound | |
'v': 'vat', # V sound | |
'θ': 'think', # TH sound (voiceless) | |
'ð': 'this', # TH sound (voiced) | |
's': 'sap', # S sound | |
'z': 'zap', # Z sound | |
'ʃ': 'ship', # SH sound | |
'ʒ': 'measure', # ZH sound | |
'h': 'hat', # H sound | |
'm': 'mat', # M sound | |
    'n': 'nap', # N sound
'ŋ': 'sing', # NG sound | |
'l': 'lap', # L sound | |
'r': 'rap', # R sound | |
'j': 'yes', # Y sound | |
'w': 'wet', # W sound | |
# Affricates | |
'tʃ': 'chip', # CH sound | |
'dʒ': 'jump', # J sound | |
} | |
def clean_word_for_phonemes(word: str) -> str: | |
""" | |
Clean word by removing punctuation and extra spaces for phoneme processing. | |
Keeps only alphabetical characters. | |
""" | |
# Remove punctuation and extra whitespace | |
cleaned = word.strip().translate(str.maketrans('', '', string.punctuation)) | |
cleaned = ''.join(cleaned.split()) # Remove all whitespace | |
log(f"Word cleaning: '{word}' → '{cleaned}'") | |
return cleaned | |
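# Illustrative examples (traced by hand, not from a live run):
#   clean_word_for_phonemes("Hello,") -> "Hello"
#   clean_word_for_phonemes("it's")   -> "its"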
def convert_digits_to_words(text: str) -> str: | |
"""Convert digits to word form for better phoneme analysis""" | |
# Dictionary for number conversion | |
number_words = { | |
'0': 'zero', '1': 'one', '2': 'two', '3': 'three', '4': 'four', | |
'5': 'five', '6': 'six', '7': 'seven', '8': 'eight', '9': 'nine', | |
'10': 'ten', '11': 'eleven', '12': 'twelve', '13': 'thirteen', '14': 'fourteen', | |
'15': 'fifteen', '16': 'sixteen', '17': 'seventeen', '18': 'eighteen', '19': 'nineteen', | |
'20': 'twenty', '30': 'thirty', '40': 'forty', '50': 'fifty', | |
'60': 'sixty', '70': 'seventy', '80': 'eighty', '90': 'ninety', | |
'100': 'one hundred', '1000': 'one thousand' | |
} | |
def convert_number(match): | |
num_str = match.group() | |
num = int(num_str) | |
# Direct lookup for common numbers | |
if num_str in number_words: | |
return number_words[num_str] | |
# Handle numbers 21-99 | |
if 21 <= num <= 99: | |
tens = (num // 10) * 10 | |
ones = num % 10 | |
if ones == 0: | |
return number_words[str(tens)] | |
else: | |
return number_words[str(tens)] + " " + number_words[str(ones)] | |
# Handle numbers 101-999 (basic implementation) | |
if 101 <= num <= 999: | |
hundreds = num // 100 | |
remainder = num % 100 | |
result = number_words[str(hundreds)] + " hundred" | |
if remainder > 0: | |
if remainder < 21: | |
result += " " + number_words[str(remainder)] | |
else: | |
tens = (remainder // 10) * 10 | |
ones = remainder % 10 | |
result += " " + number_words[str(tens)] | |
if ones > 0: | |
result += " " + number_words[str(ones)] | |
return result | |
# For larger numbers or edge cases, return original | |
return num_str | |
# Replace standalone digits/numbers with word equivalents | |
converted = re.sub(r'\b\d+\b', convert_number, text) | |
log(f"Number conversion: '{text}' → '{converted}'") | |
return converted | |
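# Illustrative examples (traced by hand, not from a live run):
#   convert_digits_to_words("I am 42")      -> "I am forty two"
#   convert_digits_to_words("room 305")     -> "room three hundred five"
#   convert_digits_to_words("call 5551234") -> "call 5551234"  (out of range, left unchanged)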
def load_whisperx_models(): | |
"""Load WhisperX models lazily with English-only configuration""" | |
global whisperx_model, whisperx_align_model, whisperx_metadata | |
if whisperx_model is None: | |
log("Loading WhisperX models for English-only processing...") | |
# First, try to set environment variable to disable executable stack | |
os.environ['LD_BIND_NOW'] = '1' | |
try: | |
# Try loading with base.en first | |
whisperx_model = whisperx.load_model("base.en", device="cpu", compute_type="float32", language="en") | |
log("WhisperX base.en model loaded successfully") | |
# Load alignment model for English | |
whisperx_align_model, whisperx_metadata = whisperx.load_align_model(language_code="en", device="cpu") | |
log("WhisperX English alignment model loaded successfully") | |
except ImportError as ie: | |
log(f"Import error loading WhisperX models: {ie}") | |
# Try to use regular Whisper as fallback | |
try: | |
log("Attempting to use standard Whisper instead of WhisperX...") | |
import whisper | |
# Load standard whisper model | |
whisper_model = whisper.load_model("base.en", device="cpu") | |
# Create a wrapper to make it compatible with WhisperX interface | |
class WhisperWrapper: | |
def __init__(self, model): | |
self.model = model | |
def transcribe(self, audio, batch_size=16, language="en"): | |
result = self.model.transcribe(audio, language=language) | |
# Convert to WhisperX format | |
return { | |
"segments": [{ | |
"text": result["text"], | |
"start": 0.0, | |
"end": len(audio) / 16000.0, # Approximate based on sample rate | |
"words": [] # Will need to handle word-level timing differently | |
}], | |
"language": language | |
} | |
whisperx_model = WhisperWrapper(whisper_model) | |
log("Using standard Whisper as fallback (limited word-level timing)") | |
# For alignment, we'll need to handle this differently | |
whisperx_align_model = None | |
whisperx_metadata = None | |
except Exception as whisper_error: | |
log(f"Standard Whisper fallback failed: {whisper_error}") | |
# Last resort: Create a minimal mock that at least returns something | |
class MinimalWhisperMock: | |
def transcribe(self, audio, batch_size=16, language="en"): | |
# Return a minimal valid structure | |
return { | |
"segments": [{ | |
"text": "[Audio processing unavailable - WhisperX loading failed]", | |
"start": 0.0, | |
"end": 1.0, | |
"words": [] | |
}], | |
"language": language | |
} | |
whisperx_model = MinimalWhisperMock() | |
whisperx_align_model = None | |
whisperx_metadata = None | |
log("WARNING: Using minimal mock - transcription will be limited") | |
except Exception as e: | |
log(f"Error loading WhisperX models: {e}") | |
raise RuntimeError(f"Unable to load speech recognition models: {e}") | |
def convert_webm_to_wav(bts): | |
p = subprocess.run(["ffmpeg", "-i", "pipe:0", "-f", "wav", "-ar", "16000", "-ac", "1", "pipe:1"], | |
input=bts, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
if p.returncode != 0: | |
raise RuntimeError(p.stderr.decode()) | |
return io.BytesIO(p.stdout) | |
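# The pipe-based call above is roughly equivalent to the shell command
#   ffmpeg -i input.webm -f wav -ar 16000 -ac 1 output.wav
# i.e. any ffmpeg-readable container (the browser typically sends WebM/Opus)
# is decoded to 16 kHz mono WAV entirely in memory.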
def calculate_similarity(detected: str, expected: str) -> float: | |
"""Calculate similarity between detected and expected phonemes""" | |
detected_norm = normalize_phoneme_string(detected) | |
expected_norm = normalize_phoneme_string(expected) | |
return SequenceMatcher(None, detected_norm, expected_norm).ratio() | |
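# Illustrative example (traced by hand): with detected "hɛlo" and expected "hɛloʊ",
# SequenceMatcher finds 4 matching characters out of 4 + 5 total, so the score is
# 2 * 4 / 9 ≈ 0.89.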
def detect_word_boundary_overlap(audio_segment: torch.Tensor, sample_rate: int, word: str) -> float: | |
""" | |
Analyze first 1/3 of audio segment for: [noise] → [silence] → [noise] pattern | |
Returns: offset in seconds to skip initial noise, or 0.0 if no pattern found | |
""" | |
if audio_segment.shape[-1] == 0: | |
return 0.0 | |
# Analyze only first 1/3 of segment | |
first_third_samples = audio_segment.shape[-1] // 3 | |
if first_third_samples < sample_rate * 0.1: # Less than 100ms total | |
return 0.0 | |
first_third = audio_segment[:, :first_third_samples] | |
# WORKAROUND: Audio segment appears to be reversed for unknown reason | |
# This flip corrects the chronological order for proper boundary detection | |
first_third = torch.flip(first_third, [-1]) | |
# Calculate energy in small windows (50ms chunks) | |
window_size = int(0.05 * sample_rate) # 50ms windows | |
if window_size <= 0: | |
return 0.0 | |
energy_levels = [] | |
for i in range(0, first_third_samples - window_size, window_size): | |
window = first_third[:, i:i + window_size] | |
energy = torch.mean(window ** 2).item() # RMS energy | |
energy_levels.append(energy) | |
if len(energy_levels) < 3: | |
return 0.0 | |
# Look for pattern: [high energy] → [low energy] → [high energy] | |
silence_threshold = np.percentile(energy_levels, 20) # Bottom 20% | |
noise_threshold = silence_threshold * 3 | |
# Find sustained silence (2+ consecutive low-energy windows) | |
for i in range(len(energy_levels) - 1): | |
if (energy_levels[i] < silence_threshold and | |
energy_levels[i + 1] < silence_threshold): | |
# Check if there was noise before silence | |
noise_before = any(e > noise_threshold for e in energy_levels[:i]) | |
# Check if there's noise after silence | |
noise_after = any(e > noise_threshold for e in energy_levels[i + 2:]) | |
if noise_before and noise_after: | |
# Found the pattern! Return offset to end of silence | |
silence_end_sample = (i + 2) * window_size | |
offset_seconds = silence_end_sample / sample_rate | |
log(f"🔧 Word '{word}': detected boundary overlap, trimming {offset_seconds:.3f}s from start") | |
return offset_seconds | |
return 0.0 # No pattern detected | |
def extract_audio_segment(waveform: torch.Tensor, sample_rate: int, | |
start_time: float, end_time: float, word: str, | |
verbose: bool = True) -> torch.Tensor: | |
"""Extract audio segment for a specific word""" | |
# Convert to samples | |
start_sample = int(start_time * sample_rate) | |
end_sample = int(end_time * sample_rate) | |
end_sample = min(waveform.shape[-1], end_sample) | |
if end_sample <= start_sample: | |
if verbose: | |
log(f"Invalid segment for '{word}': {start_time:.3f}s-{end_time:.3f}s") | |
return torch.zeros((1, 1600)) # Return 100ms of silence | |
segment = waveform[:, start_sample:end_sample] | |
if verbose: | |
log(f"Extracted '{word}': {start_time:.3f}s-{end_time:.3f}s ({segment.shape[-1]} samples)") | |
return segment | |
def detect_phoneme_from_audio(audio_segment: torch.Tensor, sample_rate: int, word: str) -> str: | |
"""Detect phoneme from audio segment using phoneme model""" | |
log(f"🔍 Starting phoneme detection for '{word}'...") | |
if audio_segment.shape[-1] == 0: | |
log(f"⚠️ Empty audio segment for '{word}'") | |
return "" | |
log(f"🔊 Original audio segment: {audio_segment.shape[-1]} samples") | |
# Pad or truncate to standard length for model | |
target_length = 16000 # 1 second | |
if audio_segment.shape[-1] < target_length: | |
log(f"🔧 Padding audio from {audio_segment.shape[-1]} to {target_length} samples") | |
audio_segment = torch.nn.functional.pad(audio_segment, (0, target_length - audio_segment.shape[-1])) | |
elif audio_segment.shape[-1] > target_length: | |
# Don't truncate long segments - keep full audio for complex words | |
log(f"⚠️ Audio longer than target ({audio_segment.shape[-1]} > {target_length}), keeping full length") | |
log(f" This preserves all phonemes for long words like 'sophisticated'") | |
else: | |
log(f"✅ Audio segment already correct length: {target_length} samples") | |
log(f"🎛️ Processing through phoneme processor...") | |
start_time = datetime.now() | |
# Process through phoneme model | |
try: | |
input_values = phoneme_processor(audio_segment.squeeze(), sampling_rate=sample_rate, return_tensors="pt").input_values | |
processor_time = (datetime.now() - start_time).total_seconds() | |
log(f"⏱️ Phoneme processor took: {processor_time:.3f}s") | |
log(f"🧠 Running through phoneme model...") | |
model_start_time = datetime.now() | |
with torch.no_grad(): | |
logits = phoneme_model(input_values).logits | |
predicted_ids = torch.argmax(logits, dim=-1) | |
detected_phoneme = phoneme_processor.decode(predicted_ids[0]) | |
model_time = (datetime.now() - model_start_time).total_seconds() | |
log(f"⏱️ Phoneme model inference took: {model_time:.3f}s") | |
total_time = (datetime.now() - start_time).total_seconds() | |
log(f"⏱️ Total phoneme detection time: {total_time:.3f}s") | |
except Exception as e: | |
log(f"❌ Error in phoneme detection: {e}") | |
return "" | |
log(f"🎯 Phoneme detection for '{word}': '{detected_phoneme}'") | |
return detected_phoneme | |
def sliding_window_phoneme_match(detected_phoneme: str, expected_phoneme: str, word: str) -> Tuple[str, float, int, int]: | |
""" | |
Find the best matching substring in detected phoneme using sliding window. | |
For zero scores, intelligently selects which phoneme substring to return. | |
Returns: (best_match_substring, best_score, start_index, end_index) | |
""" | |
detected_norm = normalize_phoneme_string(detected_phoneme) | |
expected_norm = normalize_phoneme_string(expected_phoneme) | |
log(f"🔍 Sliding window analysis for '{word}':") | |
log(f" Expected (norm): '{expected_norm}' (length: {len(expected_norm)})") | |
log(f" Detected (norm): '{detected_norm}' (length: {len(detected_norm)})") | |
# If detected is shorter than expected, just compare directly | |
if len(detected_norm) < len(expected_norm): | |
score = calculate_similarity(detected_norm, expected_norm) | |
log(f" Direct comparison (detected < expected): score = {score:.3f}") | |
return detected_norm, score, 0, len(detected_norm) | |
# Sliding window: detected is longer than expected | |
expected_len = len(expected_norm) | |
best_score = 0 | |
best_match = "" | |
best_start = 0 | |
best_end = expected_len | |
log(f" Sliding window search (window size: {expected_len}):") | |
# Slide through all possible positions | |
for i in range(len(detected_norm) - expected_len + 1): | |
substring = detected_norm[i:i + expected_len] | |
score = calculate_similarity(substring, expected_norm) | |
log(f" Position {i}: '{substring}' vs '{expected_norm}' = {score:.3f}") | |
if score > best_score: # Changed back from >= to > to prefer earlier matches | |
best_score = score | |
best_match = substring | |
best_start = i | |
best_end = i + expected_len | |
log(f" ✅ New best match!") | |
# Exit early on perfect match | |
if score >= 1.0: | |
log(f" 🎯 Perfect match found, stopping search") | |
break | |
# Handle zero score case - aim for middle substring when possible | |
if best_score == 0: | |
log(f" ⚠️ Zero score detected, selecting middle substring for audio alignment") | |
total_detected_len = len(detected_norm) | |
if total_detected_len == expected_len: | |
# Same length - use the whole string | |
best_start = 0 | |
best_end = expected_len | |
best_match = detected_norm | |
log(f" 🔍 Same length: using full string") | |
else: | |
# Longer detected - aim for middle | |
middle_start = max(0, (total_detected_len - expected_len) // 2) | |
best_start = middle_start | |
best_end = middle_start + expected_len | |
best_match = detected_norm[best_start:best_end] | |
log(f" 🔍 Aiming for middle: position {best_start}-{best_end}") | |
log(f" 🏆 Final selection: '{best_match}' at position {best_start}-{best_end} (score: {best_score:.3f})") | |
return best_match, best_score, best_start, best_end | |
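# Worked example (traced by hand): expected "kæt" (3 chars) inside detected "əkætə"
# (5 chars). Window positions: "əkæ" scores ≈ 0.67, then "kæt" scores 1.0 and triggers
# the early exit, so the function returns ("kæt", 1.0, 1, 4) and the "ə" bleed from
# neighbouring words is ignored.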
def create_word_phoneme_mapping_v2(word: str, expected_phoneme: str) -> Dict[int, str]: | |
""" | |
Create mapping from phoneme positions to original word letters. | |
Simplified version that handles common cases more reliably. | |
Args: | |
word: The original word (already cleaned, no punctuation) | |
expected_phoneme: The expected phoneme string | |
Returns: | |
Dictionary mapping phoneme index to word letter(s) | |
""" | |
word_lower = word.lower() | |
phoneme_norm = normalize_phoneme_string(expected_phoneme) | |
log(f"🗺️ Creating mapping for '{word}' → '{phoneme_norm}'") | |
log(f" Word length: {len(word_lower)}, Phoneme length: {len(phoneme_norm)}") | |
if not phoneme_norm: | |
return {} | |
# Simple cases first | |
if len(word_lower) == len(phoneme_norm): | |
# Direct 1:1 mapping | |
mapping = {i: word[i] for i in range(len(phoneme_norm))} # Preserve original case | |
log(f" Direct mapping (equal lengths): {mapping}") | |
return mapping | |
# For length mismatches, use proportional distribution | |
mapping = {} | |
if len(phoneme_norm) > len(word_lower): | |
# More phonemes than letters (diphthongs, etc.) | |
# Distribute letters across phonemes without duplication | |
phonemes_per_letter = len(phoneme_norm) / len(word_lower) | |
        prev_letter_idx = -1
        for phoneme_idx in range(len(phoneme_norm)):
            # Find which letter this phoneme belongs to
            letter_idx = min(int(phoneme_idx / phonemes_per_letter), len(word_lower) - 1)
            # Assign each letter exactly once, at the first phoneme that maps to it;
            # remaining phonemes of that letter get an empty string, so diphthong cases
            # such as "go" → "ɡoʊ" keep every letter
            if letter_idx != prev_letter_idx:
                mapping[phoneme_idx] = word[letter_idx]  # Preserve case
                prev_letter_idx = letter_idx
            else:
                mapping[phoneme_idx] = ''  # Empty for additional phonemes
else: | |
# More letters than phonemes (silent letters) | |
# Distribute letters across available phonemes | |
letters_per_phoneme = len(word_lower) / len(phoneme_norm) | |
for phoneme_idx in range(len(phoneme_norm)): | |
# Calculate range of letters for this phoneme | |
start_letter = int(phoneme_idx * letters_per_phoneme) | |
end_letter = int((phoneme_idx + 1) * letters_per_phoneme) | |
# Collect all letters for this phoneme | |
letter_group = word[start_letter:end_letter] | |
mapping[phoneme_idx] = letter_group | |
log(f" Final mapping: {mapping}") | |
return mapping | |
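# Illustrative mappings (traced by hand, assuming the normalized phonemes shown):
#   create_word_phoneme_mapping_v2("cat", "kæt") -> {0: 'c', 1: 'a', 2: 't'}  (1:1 case)
#   create_word_phoneme_mapping_v2("knee", "ni") -> {0: 'kn', 1: 'ee'}        (silent-letter case)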
def create_character_level_feedback_v2(word: str, expected_norm: str, | |
detected_norm: str, | |
mapping: Dict[int, str]) -> str: | |
""" | |
Create character-level feedback with simplified logic. | |
Args: | |
word: Original word (for display purposes) | |
expected_norm: Normalized expected phonemes | |
detected_norm: Normalized detected phonemes | |
mapping: Phoneme position to letter mapping | |
Returns: | |
HTML string with properly formatted feedback | |
""" | |
result = [] | |
log(f"📝 Character feedback for '{word}':") | |
log(f" Expected: '{expected_norm}' (len={len(expected_norm)})") | |
log(f" Detected: '{detected_norm}' (len={len(detected_norm)})") | |
# Ensure both strings are same length for comparison | |
max_len = max(len(expected_norm), len(detected_norm)) | |
expected_padded = expected_norm.ljust(max_len) | |
detected_padded = detected_norm.ljust(max_len) | |
# Track which word positions have been used | |
used_positions = set() | |
for i in range(min(len(expected_norm), max_len)): | |
expected_char = expected_padded[i] if i < len(expected_padded) else ' ' | |
detected_char = detected_padded[i] if i < len(detected_padded) else ' ' | |
# Get the word letter(s) for this phoneme position | |
word_letters = mapping.get(i, '') | |
# Skip empty mappings (extra phonemes in diphthongs) | |
if not word_letters: | |
continue | |
# Check if we've already used these letters | |
letter_key = (word_letters, i) | |
if letter_key in used_positions: | |
continue | |
used_positions.add(letter_key) | |
if expected_char == detected_char: | |
# Correct pronunciation - show original letters | |
result.append(word_letters) | |
else: | |
# Incorrect - create error span with tooltip | |
expected_english = PHONEME_TO_ENGLISH.get(expected_char, expected_char) | |
expected_example = PHONEME_EXAMPLES.get(expected_char, '') | |
detected_english = PHONEME_TO_ENGLISH.get(detected_char, 'silence' if detected_char == ' ' else detected_char) | |
detected_example = PHONEME_EXAMPLES.get(detected_char, '') | |
# Build tooltip text | |
if expected_example and detected_example: | |
tooltip = f"Expected '{expected_english}' as in '{expected_example}'<br>You said '{detected_english}' as in '{detected_example}'" | |
elif expected_example: | |
tooltip = f"Expected '{expected_english}' as in '{expected_example}'<br>You said '{detected_english}'" | |
else: | |
tooltip = f"Expected '{expected_english}'<br>You said '{detected_english}'" | |
# Create error span | |
error_html = f'<span class="phoneme-error" data-tooltip-html="{tooltip}"><strong><u>{word_letters}</u></strong></span>' | |
result.append(error_html) | |
feedback = ''.join(result) | |
log(f" Final feedback: {feedback}") | |
return feedback | |
def format_output_word_v2(word_original: str, word_clean: str, | |
similarity_score: float, detected_phoneme: str, | |
expected_phoneme: str, similarity_threshold: float) -> Tuple[str, str]: | |
""" | |
Format word output with cleaner logic. | |
Args: | |
word_original: Original word with punctuation (for display) | |
word_clean: Cleaned word (for phoneme processing) | |
similarity_score: Similarity score between detected and expected | |
detected_phoneme: Detected phoneme string | |
expected_phoneme: Expected phoneme string | |
similarity_threshold: User's threshold for acceptable pronunciation | |
Returns: | |
Tuple of (display_text, colored_html) | |
""" | |
# Determine color based on score | |
if similarity_score < similarity_threshold: | |
color = "red" | |
needs_feedback = True | |
elif similarity_score >= similarity_threshold + (1.0 - similarity_threshold) * 0.3: | |
color = "green" | |
needs_feedback = False | |
else: | |
color = "orange" | |
needs_feedback = False | |
score_percentage = int(similarity_score * 100) | |
if needs_feedback: | |
# Poor pronunciation - show character-level feedback | |
# Create phoneme mapping using cleaned word | |
mapping = create_word_phoneme_mapping_v2(word_clean, expected_phoneme) | |
# Normalize phonemes for comparison | |
expected_norm = normalize_phoneme_string(expected_phoneme) | |
detected_norm = normalize_phoneme_string(detected_phoneme) | |
# Generate character-level feedback | |
feedback_html = create_character_level_feedback_v2( | |
word_clean, expected_norm, detected_norm, mapping | |
) | |
# Preserve original punctuation if present | |
if word_original != word_clean: | |
# Find trailing punctuation | |
punct = '' | |
for i in range(len(word_original) - 1, -1, -1): | |
if word_original[i] in string.punctuation: | |
punct = word_original[i] + punct | |
else: | |
break | |
# Find leading punctuation | |
lead_punct = '' | |
for char in word_original: | |
if char in string.punctuation: | |
lead_punct += char | |
else: | |
break | |
display_text = lead_punct + feedback_html + punct | |
else: | |
display_text = feedback_html | |
# For tooltip, use the cleaned word | |
tooltip_text = word_clean | |
else: | |
# Good pronunciation - show original word | |
display_text = word_original | |
tooltip_text = word_original | |
# Create final colored HTML with embedded data | |
colored_html = f'<span style="color:{color}" data-score="{score_percentage}" data-word="{word_original}" data-tooltip="{tooltip_text}">{display_text}</span>' | |
return display_text, colored_html | |
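# Colour thresholds, worked through for the default similarity_threshold of 0.4
# (added annotation): scores below 0.40 render red with character-level feedback,
# scores of 0.58 (= 0.4 + 0.6 * 0.3) or higher render green, and anything in
# between renders orange without feedback.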
def trim_audio_segment_by_phoneme_position(audio_segment: torch.Tensor, | |
detected_phoneme_full: str, | |
best_start: int, best_end: int, | |
word: str) -> torch.Tensor: | |
""" | |
Trim audio segment based on the position of best matching phoneme substring. | |
Uses 85% of calculated trim percentages to be less aggressive. | |
Ensures final segment is never shorter than 0.1 seconds. | |
    Returns None when no trimming is needed, so the caller falls back to the original WhisperX-timed segment.
""" | |
detected_norm = normalize_phoneme_string(detected_phoneme_full) | |
total_phoneme_len = len(detected_norm) | |
if total_phoneme_len == 0 or (best_start == 0 and best_end == total_phoneme_len): | |
log(f"🎵 No audio trimming needed for '{word}' (using original segment)") | |
return None # Signal to use original WhisperX timing instead of expanded | |
# Calculate initial trim percentages | |
start_trim_pct = best_start / total_phoneme_len | |
end_trim_pct = (total_phoneme_len - best_end) / total_phoneme_len | |
# Apply 85% factor to be less aggressive | |
start_trim_pct_adjusted = start_trim_pct * 0.85 | |
end_trim_pct_adjusted = end_trim_pct * 0.85 | |
# Calculate samples and duration | |
total_samples = audio_segment.shape[-1] | |
sample_rate = 16000 # Known sample rate | |
original_duration = total_samples / sample_rate | |
# Calculate initial trim amounts | |
start_trim_samples = int(total_samples * start_trim_pct_adjusted) | |
end_trim_samples = int(total_samples * end_trim_pct_adjusted) | |
# Calculate resulting duration | |
trimmed_samples = total_samples - start_trim_samples - end_trim_samples | |
trimmed_duration = trimmed_samples / sample_rate | |
log(f"🎵 Audio trimming for '{word}':") | |
log(f" Original duration: {original_duration:.3f}s ({total_samples} samples)") | |
log(f" Phoneme position: {best_start}-{best_end-1} of {total_phoneme_len} chars") | |
log(f" Initial trim: start={start_trim_pct_adjusted:.1%} ({start_trim_samples} samples), end={end_trim_pct_adjusted:.1%} ({end_trim_samples} samples)") | |
log(f" Resulting duration: {trimmed_duration:.3f}s") | |
# MINIMUM DURATION CHECK: Ensure result is at least 0.1 seconds | |
min_duration = 0.1 | |
min_samples = int(min_duration * sample_rate) | |
if trimmed_samples < min_samples: | |
log(f" ⚠️ Trimmed duration ({trimmed_duration:.3f}s) below minimum ({min_duration}s)") | |
# Calculate how much we need to preserve | |
samples_to_preserve = min_samples | |
total_trim_needed = total_samples - samples_to_preserve | |
if total_trim_needed <= 0: | |
log(f" ⚠️ Original segment already at minimum length, no trimming") | |
return None # Use original WhisperX timing | |
# Redistribute the trimming proportionally while respecting minimum duration | |
original_total_trim = start_trim_samples + end_trim_samples | |
if original_total_trim > 0: | |
# Scale down both trims proportionally | |
scale_factor = total_trim_needed / original_total_trim | |
start_trim_samples = int(start_trim_samples * scale_factor) | |
end_trim_samples = int(end_trim_samples * scale_factor) | |
# Ensure we don't exceed total available trim | |
if start_trim_samples + end_trim_samples > total_trim_needed: | |
excess = (start_trim_samples + end_trim_samples) - total_trim_needed | |
# Remove excess from the larger trim | |
if start_trim_samples > end_trim_samples: | |
start_trim_samples -= excess | |
else: | |
end_trim_samples -= excess | |
log(f" 🔧 Adjusted trim: start={start_trim_samples} samples, end={end_trim_samples} samples") | |
log(f" 🔧 Scale factor applied: {scale_factor:.3f}") | |
else: | |
# Shouldn't happen, but safety check | |
start_trim_samples = 0 | |
end_trim_samples = 0 | |
# Apply final trimming | |
trimmed_start = start_trim_samples | |
trimmed_end = total_samples - end_trim_samples | |
if trimmed_end <= trimmed_start: | |
log(f" ⚠️ Invalid trim range after adjustment, using original segment") | |
return None # Signal to use original WhisperX timing | |
trimmed_segment = audio_segment[:, trimmed_start:trimmed_end] | |
final_duration = trimmed_segment.shape[-1] / sample_rate | |
log(f" ✅ Final result: {trimmed_segment.shape[-1]} samples ({final_duration:.3f}s)") | |
log(f" ✅ Trimmed: {start_trim_samples} from start, {end_trim_samples} from end") | |
return trimmed_segment | |
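# Worked example (traced by hand): with 10 detected phoneme characters and the best
# match spanning positions 3-8, start_trim = 3/10 * 0.85 = 25.5% and end_trim =
# 2/10 * 0.85 = 17%. For a 0.5 s (8000-sample) segment that removes 2040 samples from
# the start and 1360 from the end, leaving 4600 samples ≈ 0.29 s, above the 0.1 s floor.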
def get_expected_phonemes(words: List[str]) -> List[str]: | |
"""Get expected phonemes using espeak phonemizer""" | |
cache_key = tuple(words) | |
if cache_key in phoneme_cache: | |
log(f"📚 Using cached phonemes for: {words}") | |
cached_result = phoneme_cache[cache_key] | |
log(f" Cached phonemes: {list(zip(words, cached_result))}") | |
return cached_result | |
log(f"🔤 Getting expected phonemes using phonemizer for: {words}") | |
try: | |
# Use espeak phonemizer to get IPA phonemes | |
phonemes = phonemize(words, language='en-us', backend='espeak', strip=True) | |
# Cache the results | |
phoneme_cache[cache_key] = phonemes | |
# Log the phoneme results | |
log(f"✅ Phonemizer results:") | |
for word, phoneme in zip(words, phonemes): | |
log(f" '{word}' → '{phoneme}'") | |
return phonemes | |
except Exception as e: | |
log(f"❌ Error in phonemizer: {e}") | |
log(f" Returning empty phonemes for all words") | |
# Return empty strings as fallback | |
empty_results = [""] * len(words) | |
phoneme_cache[cache_key] = empty_results | |
return empty_results | |
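# Illustrative output (hedged: the exact symbols depend on the installed espeak/espeak-ng
# version): get_expected_phonemes(["hello", "world"]) returns something like
# ["həlˈoʊ", "wˈɜːld"]; stress and length marks are stripped later by normalize_phoneme_string().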
async def generate_tts_audio(word: str) -> str: | |
"""Generate TTS audio for a word with silence padding""" | |
if word in tts_cache: | |
return tts_cache[word] | |
try: | |
communicate = edge_tts.Communicate(word, TTS_VOICE) | |
audio_data = b"" | |
async for chunk in communicate.stream(): | |
if chunk["type"] == "audio": | |
audio_data += chunk["data"] | |
if audio_data: | |
# Add silence padding to TTS audio as well | |
# First decode the MP3 to get raw audio | |
with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as tmp_mp3: | |
tmp_mp3.write(audio_data) | |
tmp_mp3_path = tmp_mp3.name | |
try: | |
# Load the TTS audio | |
tts_waveform, tts_sample_rate = torchaudio.load(tmp_mp3_path) | |
# Resample if needed to match our standard rate | |
if tts_sample_rate != 16000: | |
tts_waveform = torchaudio.transforms.Resample(tts_sample_rate, 16000)(tts_waveform) | |
tts_sample_rate = 16000 | |
# Add 0.25s silence padding on each end | |
padding_samples = int(0.25 * tts_sample_rate) | |
silence_shape = list(tts_waveform.shape) | |
silence_shape[-1] = padding_samples | |
silence_padding = torch.zeros(silence_shape) | |
# Concatenate: silence + audio + silence | |
padded_waveform = torch.cat([silence_padding, tts_waveform, silence_padding], dim=-1) | |
# Convert back to base64 | |
buffer = io.BytesIO() | |
torchaudio.save(buffer, padded_waveform, tts_sample_rate, format="wav") | |
buffer.seek(0) | |
audio_b64 = base64.b64encode(buffer.read()).decode('utf-8') | |
tts_cache[word] = audio_b64 | |
log(f"🔇 TTS for '{word}': Added 0.25s silence padding on each end") | |
return audio_b64 | |
finally: | |
# Clean up temp file | |
if os.path.exists(tmp_mp3_path): | |
os.remove(tmp_mp3_path) | |
except Exception as e: | |
log(f"TTS failed for '{word}': {e}") | |
return "" | |
def audio_to_base64(audio_segment: torch.Tensor, sample_rate: int, add_padding: bool = True) -> str: | |
""" | |
Convert audio tensor to base64 string. | |
Args: | |
audio_segment: The audio tensor to convert | |
sample_rate: Sample rate of the audio | |
add_padding: If True, adds 0.25s of silence on each end to prevent audio processor lag | |
Returns: | |
Base64 encoded audio string | |
""" | |
try: | |
if add_padding: | |
# Add 0.25 seconds of silence on each end | |
padding_samples = int(0.25 * sample_rate) # 0.25 seconds worth of samples | |
# Create silence padding (zeros with same shape as audio segment) | |
silence_shape = list(audio_segment.shape) | |
silence_shape[-1] = padding_samples | |
silence_padding = torch.zeros(silence_shape) | |
# Concatenate: silence + audio + silence | |
padded_segment = torch.cat([silence_padding, audio_segment, silence_padding], dim=-1) | |
log(f"🔇 Added silence padding: {padding_samples} samples (0.25s) on each end") | |
log(f" Original: {audio_segment.shape[-1]} samples → Padded: {padded_segment.shape[-1]} samples") | |
audio_segment = padded_segment | |
buffer = io.BytesIO() | |
torchaudio.save(buffer, audio_segment, sample_rate, format="wav") | |
buffer.seek(0) | |
return base64.b64encode(buffer.read()).decode('utf-8') | |
except Exception as e: | |
log(f"Audio conversion failed: {e}") | |
return "" | |
# Route decorator: method and path are assumed; adjust to match the frontend's API calls.
@app.post("/transcribe")
async def transcribe(audio: UploadFile = File(...), similarity_threshold: float = Form(0.4)):
log("=== STARTING WHISPERX ENGLISH-ONLY PHONEME ANALYSIS ===") | |
# Use similarity threshold from frontend (default 0.4) | |
similarity = max(0.0, min(1.0, similarity_threshold)) # Clamp between 0 and 1 | |
log(f"Using similarity threshold: {similarity:.2f}") | |
try: | |
# Load WhisperX models if needed | |
load_whisperx_models() | |
# 1. Convert and load audio | |
data = await audio.read() | |
wav_io = convert_webm_to_wav(data) | |
# Save to temporary file for WhisperX | |
temp_audio_path = "/tmp/temp_audio.wav" | |
with open(temp_audio_path, "wb") as f: | |
f.write(wav_io.getvalue()) | |
# Load audio with WhisperX | |
audio_data = whisperx.load_audio(temp_audio_path) | |
log(f"Audio loaded for WhisperX: {len(audio_data)} samples") | |
# 2. Get transcription with WhisperX - EXPLICITLY SET TO ENGLISH | |
result = whisperx_model.transcribe(audio_data, batch_size=16, language="en") | |
# 3. Get precise word alignments with WhisperX (if alignment model available) | |
if whisperx_align_model is not None: | |
aligned_result = whisperx.align(result["segments"], whisperx_align_model, whisperx_metadata, audio_data, device="cpu") | |
else: | |
log("WARNING: Alignment model not available, using basic word splitting") | |
# Fallback: split text into words with approximate timing | |
aligned_result = {"segments": []} | |
for segment in result["segments"]: | |
text = segment.get("text", "").strip() | |
if not text: | |
continue | |
words = text.split() | |
duration = segment["end"] - segment["start"] | |
time_per_word = duration / len(words) if words else 0 | |
word_list = [] | |
for i, word in enumerate(words): | |
word_start = segment["start"] + (i * time_per_word) | |
word_end = segment["start"] + ((i + 1) * time_per_word) | |
word_list.append({ | |
"word": word, | |
"start": word_start, | |
"end": word_end, | |
"score": 0.9 # Default confidence | |
}) | |
aligned_result["segments"].append({ | |
"text": text, | |
"start": segment["start"], | |
"end": segment["end"], | |
"words": word_list | |
}) | |
# Extract word-level data from WhisperX results | |
words = [] | |
word_texts = [] # Original with punctuation for display | |
word_texts_clean = [] # Cleaned for phoneme processing | |
word_timings = [] | |
for segment in aligned_result["segments"]: | |
if "words" in segment: | |
for word_info in segment["words"]: | |
if "start" in word_info and "end" in word_info and word_info["word"]: | |
original_word = word_info["word"].strip() | |
# Convert digits to words for better phoneme analysis | |
word_converted = convert_digits_to_words(original_word) | |
cleaned_word = clean_word_for_phonemes(word_converted) | |
# Only process words that have alphabetical content after cleaning | |
if cleaned_word: | |
words.append(word_info) | |
word_texts.append(word_converted) # Use converted form for display | |
word_texts_clean.append(cleaned_word) # Clean for processing | |
word_timings.append((word_info["start"], word_info["end"])) | |
if not words: | |
return {"resolved": "", "resolved_colored": "", "audio_data": []} | |
log(f"Found {len(words)} words with precise WhisperX timings") | |
# Log WhisperX timings | |
log("=== WHISPERX PRECISE TIMINGS ===") | |
for i, (word_original, word_clean, (start, end)) in enumerate(zip(word_texts, word_texts_clean, word_timings)): | |
gap = "" | |
if i > 0: | |
prev_end = word_timings[i-1][1] | |
gap_duration = start - prev_end | |
gap = f" | gap: {gap_duration:.3f}s" | |
log(f"Word {i}: '{word_original}' (clean: '{word_clean}') at {start:.3f}s-{end:.3f}s{gap}") | |
# 4. Get expected phonemes using CLEANED words | |
expected_phonemes = get_expected_phonemes(word_texts_clean) | |
# 5. Load audio as tensor for phoneme analysis | |
waveform, sample_rate = torchaudio.load(temp_audio_path) | |
if sample_rate != 16000: | |
waveform = torchaudio.transforms.Resample(sample_rate, 16000)(waveform) | |
sample_rate = 16000 | |
# 6. Process each word using expanded timing with sliding window matching | |
results = [] | |
audio_data_list = [] | |
# Generate TTS for all words concurrently (using CLEANED words) | |
log("Generating TTS audio...") | |
tts_tasks = [generate_tts_audio(word_clean) for word_clean in word_texts_clean] | |
tts_results = await asyncio.gather(*tts_tasks) | |
log("\n=== PROCESSING WORDS WITH EXPANDED TIMING + SLIDING WINDOW ===") | |
for i, (word_info, word_original, word_clean, (start_time, end_time)) in enumerate(zip(words, word_texts, word_texts_clean, word_timings)): | |
expected_phoneme = expected_phonemes[i] if i < len(expected_phonemes) else "" | |
log(f"\n--- Processing word {i}: '{word_original}' (clean: '{word_clean}') ---") | |
log(f"🔊 WhisperX timing: {start_time:.3f}s - {end_time:.3f}s (duration: {end_time - start_time:.3f}s)") | |
log(f"🎯 Expected phoneme: '{expected_phoneme}'") | |
# DEBUGGING: Special attention to problematic words | |
if word_clean.lower() in ['go', 'no', 'so', 'to', 'do'] or len(expected_phoneme) > len(word_clean): | |
log(f"⚠️ SPECIAL CASE: Word '{word_clean}' has {len(word_clean)} letters but {len(expected_phoneme)} phonemes") | |
log(f" This may be a diphthong case requiring special handling") | |
# For very short words, expand the WhisperX timing itself before processing | |
original_duration = end_time - start_time | |
if original_duration < 0.1: | |
log(f"🔍 Ultra-short word detected ({original_duration:.3f}s), expanding WhisperX timing") | |
audio_duration = waveform.shape[-1] / sample_rate | |
# Expand WhisperX boundaries by ±0.05s | |
start_time = max(0, start_time - 0.05) | |
end_time = min(audio_duration, end_time + 0.05) | |
log(f" Expanded WhisperX timing: {start_time:.3f}s - {end_time:.3f}s (new duration: {end_time - start_time:.3f}s)") | |
# Show gaps between words | |
if i > 0: | |
prev_end = word_timings[i-1][1] | |
gap = start_time - prev_end | |
if gap > 0: | |
log(f"⏸️ Gap from previous word: {gap:.3f}s") | |
elif gap < 0: | |
log(f"⚠️ OVERLAP with previous word: {gap:.3f}s") | |
else: | |
log(f"🔗 No gap (continuous)") | |
# Calculate expanded timing (±0.125s with boundary protection) | |
expansion_seconds = 0.125 | |
audio_duration = waveform.shape[-1] / sample_rate | |
expanded_start = max(0, start_time - expansion_seconds) | |
expanded_end = min(audio_duration, end_time + expansion_seconds) | |
log(f"🔍 Timing expansion: {start_time:.3f}s-{end_time:.3f}s → {expanded_start:.3f}s-{expanded_end:.3f}s") | |
# Extract expanded audio segment | |
expanded_audio_segment = extract_audio_segment(waveform, sample_rate, expanded_start, expanded_end, word_clean, verbose=True) | |
# Check for word boundary overlap and trim if needed | |
log(f"🔍 Checking word boundary overlap for '{word_clean}'...") | |
boundary_offset = detect_word_boundary_overlap(expanded_audio_segment, sample_rate, word_clean) | |
if boundary_offset > 0: | |
log(f"🔧 Detected word overlap, trimming {boundary_offset:.3f}s from start") | |
trim_samples = int(boundary_offset * sample_rate) | |
expanded_audio_segment = expanded_audio_segment[:, trim_samples:] | |
# Update expanded_start for accurate timing logs | |
expanded_start += boundary_offset | |
log(f" Updated expanded start: {expanded_start:.3f}s") | |
# ALSO apply the boundary offset to WhisperX timing | |
original_start_time = start_time | |
start_time = max(0, start_time + boundary_offset) | |
end_time = max(start_time + 0.01, end_time) # Ensure minimum 10ms duration | |
log(f" Updated WhisperX timing: {original_start_time:.3f}s → {start_time:.3f}s (shifted +{boundary_offset:.3f}s)") | |
# Also extract WhisperX original timing for comparison (now using updated start_time) | |
whisperx_audio_segment = extract_audio_segment(waveform, sample_rate, start_time, end_time, word_clean, verbose=False) | |
# Detect phoneme from expanded audio segment | |
detected_phoneme_raw = detect_phoneme_from_audio(expanded_audio_segment, sample_rate, word_clean) | |
# Get expected phoneme and normalize both | |
detected_phoneme_norm = normalize_phoneme_string(detected_phoneme_raw) | |
expected_phoneme_norm = normalize_phoneme_string(expected_phoneme) | |
log(f"🔊 Raw detected phoneme (expanded): '{detected_phoneme_raw}'") | |
log(f"🧹 Normalized detected: '{detected_phoneme_norm}'") | |
log(f"🧹 Normalized expected: '{expected_phoneme_norm}'") | |
# Find best matching substring using sliding window | |
best_match_phoneme, similarity_score, match_start, match_end = sliding_window_phoneme_match( | |
detected_phoneme_raw, expected_phoneme, word_clean | |
) | |
log(f"🔊 Final similarity score: {similarity_score:.3f}") | |
# Trim audio segment based on best phoneme match position | |
trimmed_audio_segment = trim_audio_segment_by_phoneme_position( | |
expanded_audio_segment, detected_phoneme_raw, match_start, match_end, word_clean | |
) | |
# Use original WhisperX timing if no trimming was needed, otherwise use trimmed | |
if trimmed_audio_segment is None: | |
final_audio_segment = whisperx_audio_segment | |
log(f"🎵 Using original WhisperX timing (no trimming needed)") | |
log(f" Final segment: WhisperX original ({whisperx_audio_segment.shape[-1]} samples, {whisperx_audio_segment.shape[-1]/sample_rate:.3f}s)") | |
log(f" Segment timing: {start_time:.3f}s - {end_time:.3f}s") | |
else: | |
final_audio_segment = trimmed_audio_segment | |
log(f"🎵 Using trimmed segment from expanded timing") | |
log(f" Final segment: Processed ({trimmed_audio_segment.shape[-1]} samples, {trimmed_audio_segment.shape[-1]/sample_rate:.3f}s)") | |
# Calculate the actual timing of the processed segment | |
final_duration = trimmed_audio_segment.shape[-1] / sample_rate | |
expanded_duration = expanded_end - expanded_start | |
# Calculate trim amounts based on phoneme positions | |
detected_phoneme_norm = normalize_phoneme_string(detected_phoneme_raw) | |
total_phoneme_len = len(detected_phoneme_norm) | |
if total_phoneme_len > 0: | |
start_trim_pct = match_start / total_phoneme_len * 0.85 # Apply 85% factor | |
end_trim_pct = (total_phoneme_len - match_end) / total_phoneme_len * 0.85 | |
time_trimmed_from_start = expanded_duration * start_trim_pct | |
time_trimmed_from_end = expanded_duration * end_trim_pct | |
final_start_time = expanded_start + time_trimmed_from_start | |
final_end_time = expanded_end - time_trimmed_from_end | |
log(f" Segment timing: {final_start_time:.3f}s - {final_end_time:.3f}s") | |
else: | |
log(f" Segment timing: {expanded_start:.3f}s - {expanded_end:.3f}s (no phoneme-based calculation)") | |
log(f"🔊 Audio segments returned to user:") | |
log(f" 1️⃣ Expected (TTS): Generated speech") | |
log(f" 2️⃣ User audio: {'WhisperX original' if trimmed_audio_segment is None else 'Processed/trimmed'} ({final_audio_segment.shape[-1]} samples)") | |
log(f" 3️⃣ WhisperX raw: Original timing ({whisperx_audio_segment.shape[-1]} samples)") | |
if trimmed_audio_segment is not None and final_audio_segment.shape[-1] != whisperx_audio_segment.shape[-1]: | |
sample_diff = final_audio_segment.shape[-1] - whisperx_audio_segment.shape[-1] | |
time_diff = sample_diff / sample_rate | |
log(f" 📊 Segment difference: {sample_diff:+d} samples ({time_diff:+.3f}s) processed vs WhisperX") | |
log(f"🔊 Final similarity score: {similarity_score:.3f}") | |
log(f"🎨 Final audio segment samples: {final_audio_segment.shape[-1]} (duration: {final_audio_segment.shape[-1]/sample_rate:.3f}s)") | |
log(f"🎤 WhisperX original segment samples: {whisperx_audio_segment.shape[-1]} (duration: {whisperx_audio_segment.shape[-1]/sample_rate:.3f}s)") | |
log(f"⏰ Timing comparison:") | |
log(f" WhisperX original: {start_time:.3f}s - {end_time:.3f}s (duration: {end_time - start_time:.3f}s)") | |
log(f" Expanded timing: {expanded_start:.3f}s - {expanded_end:.3f}s (duration: {expanded_end - expanded_start:.3f}s)") | |
# Store results - now with both original and clean versions | |
results.append({ | |
'word_original': word_original, # Original with punctuation for display | |
'word_clean': word_clean, # Cleaned version for phoneme processing | |
'detected_phoneme': best_match_phoneme, # Use best matching substring | |
'expected_phoneme': expected_phoneme, | |
'similarity_score': float(similarity_score), | |
'start_time': float(start_time), | |
'end_time': float(end_time), | |
'whisperx_confidence': float(word_info.get('score', 1.0)) | |
}) | |
# Prepare audio data with all three segments (use ORIGINAL word for display) | |
# All three audio segments will have 0.25s silence padding added automatically | |
user_audio_b64 = audio_to_base64(final_audio_segment, sample_rate) # Padded | |
whisperx_audio_b64 = audio_to_base64(whisperx_audio_segment, sample_rate) # Padded | |
expected_audio_b64 = tts_results[i] # Already padded in generate_tts_audio | |
audio_data_list.append({ | |
"word": word_original, # Original with punctuation for display | |
"expected_audio": expected_audio_b64, # TTS with padding | |
"user_audio": user_audio_b64, # User's pronunciation with padding | |
"whisperx_audio": whisperx_audio_b64, # WhisperX original with padding | |
"start_time": float(start_time), | |
"end_time": float(end_time), | |
"similarity_score": float(similarity_score), | |
"detected_phoneme": best_match_phoneme, # Use best matching substring | |
"expected_phoneme": expected_phoneme, | |
"whisperx_confidence": float(word_info.get('score', 1.0)) | |
}) | |
# 7. Format output using the refactored v2 functions | |
resolved_output = [] | |
resolved_colored = [] | |
for result in results: | |
output_text, colored_text = format_output_word_v2( | |
result['word_original'], # Pass both versions | |
result['word_clean'], | |
result['similarity_score'], | |
result['detected_phoneme'], | |
result['expected_phoneme'], | |
similarity | |
) | |
resolved_output.append(output_text) | |
resolved_colored.append(colored_text) | |
# Clean up temporary file | |
os.remove(temp_audio_path) | |
log("=== WHISPERX ENGLISH-ONLY PHONEME ANALYSIS COMPLETE ===") | |
return { | |
"resolved": " ".join(resolved_output), | |
"resolved_colored": " ".join(resolved_colored), | |
"audio_data": audio_data_list, | |
"debug_info": { | |
"total_words": len(words), | |
"similarity_threshold": similarity, | |
"alignment_method": "WhisperX English-only + Sliding Window", | |
"results_summary": [ | |
{ | |
"word": r['word_original'], | |
"score": float(r['similarity_score']), | |
"detected": r['detected_phoneme'], | |
"expected": r['expected_phoneme'], | |
"whisperx_confidence": r['whisperx_confidence'] | |
} | |
for r in results | |
] | |
} | |
} | |
except Exception as e: | |
log(f"ERROR in transcribe: {str(e)}") | |
import traceback | |
log(f"Traceback: {traceback.format_exc()}") | |
return { | |
"resolved": "Error occurred", | |
"resolved_colored": "Error occurred", | |
"audio_data": [], | |
"debug_info": {"error": str(e)} | |
} | |
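# Example request against the endpoint above (path assumed from the decorator;
# adjust host/port to the deployment):
#   curl -X POST http://localhost:8000/transcribe \
#        -F "audio=@recording.webm" \
#        -F "similarity_threshold=0.4"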
# Health-check endpoint (route path assumed).
@app.get("/")
def root():
return "Clean Fonetik with WhisperX English-only + Character-Level Feedback running" | |
# Cache-reset endpoint (method and path assumed).
@app.post("/clear_cache")
def clear_cache():
global phoneme_cache, tts_cache | |
phoneme_cache.clear() | |
tts_cache.clear() | |
return {"message": "Cache cleared"} |
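# Minimal local entry point (a sketch, assuming uvicorn is installed; hosted platforms
# such as Hugging Face Spaces normally launch the app themselves):
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)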