import os
import torch
import gradio as gr
import numpy as np
import soundfile as sf
from transformers import (
    AutoModelForSeq2SeqLM,
    AutoTokenizer,
    VitsModel,
    AutoProcessor,
    AutoModelForCTC,
    WhisperProcessor,
    WhisperForConditionalGeneration
)
from typing import Optional, Tuple, Dict, List


class TalklasTranslator:
    """
    Speech-to-speech translation pipeline for Philippine languages.
    Uses MMS/Whisper for STT, NLLB for MT, and MMS for TTS.
    """

    LANGUAGE_MAPPING = {
        "English": "eng",
        "Tagalog": "tgl",
        "Cebuano": "ceb",
        "Ilocano": "ilo",
        "Waray": "war",
        "Pangasinan": "pag"
    }

    NLLB_LANGUAGE_CODES = {
        "eng": "eng_Latn",
        "tgl": "tgl_Latn",
        "ceb": "ceb_Latn",
        "ilo": "ilo_Latn",
        "war": "war_Latn",
        "pag": "pag_Latn"
    }

    def __init__(
        self,
        source_lang: str = "eng",
        target_lang: str = "tgl",
        device: Optional[str] = None
    ):
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        self.source_lang = source_lang
        self.target_lang = target_lang
        self.sample_rate = 16000

        print(f"Initializing Talklas Translator on {self.device}")

        self._initialize_stt_model()
        self._initialize_mt_model()
        self._initialize_tts_model()

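    # Speech-to-text: prefer facebook/mms-1b-all and load its adapter for the
    # source language when that language is in the MMS vocabulary; fall back to
    # openai/whisper-small if the MMS checkpoint cannot be loaded.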
    def _initialize_stt_model(self):
        """Initialize speech-to-text model with fallback to Whisper"""
        try:
            print("Loading STT model...")
            try:
                self.stt_processor = AutoProcessor.from_pretrained("facebook/mms-1b-all")
                self.stt_model = AutoModelForCTC.from_pretrained("facebook/mms-1b-all")

                if self.source_lang in self.stt_processor.tokenizer.vocab.keys():
                    self.stt_processor.tokenizer.set_target_lang(self.source_lang)
                    self.stt_model.load_adapter(self.source_lang)
                    print(f"Loaded MMS STT model for {self.source_lang}")
                else:
                    print(f"Language {self.source_lang} not in MMS, using default")

            except Exception as mms_error:
                print(f"MMS loading failed: {mms_error}")
                print("Loading Whisper as fallback...")
                self.stt_processor = WhisperProcessor.from_pretrained("openai/whisper-small")
                self.stt_model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-small")
                print("Loaded Whisper STT model")

            self.stt_model.to(self.device)

        except Exception as e:
            print(f"STT model initialization failed: {e}")
            raise RuntimeError("Could not initialize STT model")

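    # Machine translation: a single multilingual NLLB-200 (distilled 600M)
    # checkpoint covers every supported language pair, so it is loaded only once.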
    def _initialize_mt_model(self):
        """Initialize machine translation model"""
        try:
            print("Loading NLLB Translation model...")
            self.mt_model = AutoModelForSeq2SeqLM.from_pretrained("facebook/nllb-200-distilled-600M")
            self.mt_tokenizer = AutoTokenizer.from_pretrained("facebook/nllb-200-distilled-600M")
            self.mt_model.to(self.device)
            print("NLLB Translation model loaded")
        except Exception as e:
            print(f"MT model initialization failed: {e}")
            raise

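    # Text-to-speech: MMS-TTS ships one VITS checkpoint per language
    # (facebook/mms-tts-<code>); if the target language has no checkpoint,
    # the English voice is used as a fallback.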
    def _initialize_tts_model(self):
        """Initialize text-to-speech model"""
        try:
            print("Loading TTS model...")
            try:
                self.tts_model = VitsModel.from_pretrained(f"facebook/mms-tts-{self.target_lang}")
                self.tts_tokenizer = AutoTokenizer.from_pretrained(f"facebook/mms-tts-{self.target_lang}")
                print(f"Loaded TTS model for {self.target_lang}")
            except Exception as tts_error:
                print(f"Target language TTS failed: {tts_error}")
                print("Falling back to English TTS")
                self.tts_model = VitsModel.from_pretrained("facebook/mms-tts-eng")
                self.tts_tokenizer = AutoTokenizer.from_pretrained("facebook/mms-tts-eng")

            self.tts_model.to(self.device)
        except Exception as e:
            print(f"TTS model initialization failed: {e}")
            raise

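    # Switching languages only reloads the STT adapter and the TTS checkpoint;
    # the NLLB translation model already covers every pair and is left untouched.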
    def update_languages(self, source_lang: str, target_lang: str) -> str:
        """Update languages and reinitialize models if needed"""
        if source_lang == self.source_lang and target_lang == self.target_lang:
            return "Languages already set"

        self.source_lang = source_lang
        self.target_lang = target_lang

        self._initialize_stt_model()
        self._initialize_tts_model()

        return f"Languages updated to {source_lang} → {target_lang}"

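    # Recognition: audio is loaded with soundfile, forced to 16 kHz mono, and
    # decoded either with Whisper's generate() or with CTC argmax decoding,
    # depending on which STT model was loaded.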
    def speech_to_text(self, audio_path: str) -> str:
        """Convert speech to text using the loaded STT model"""
        try:
            waveform, sample_rate = sf.read(audio_path)

            # Collapse multi-channel recordings to mono and resample to the
            # 16 kHz rate expected by both MMS and Whisper.
            if waveform.ndim > 1:
                waveform = waveform.mean(axis=1)
            if sample_rate != 16000:
                import librosa
                waveform = librosa.resample(waveform, orig_sr=sample_rate, target_sr=16000)

            inputs = self.stt_processor(
                waveform,
                sampling_rate=16000,
                return_tensors="pt"
            ).to(self.device)

            with torch.no_grad():
                if isinstance(self.stt_model, WhisperForConditionalGeneration):
                    generated_ids = self.stt_model.generate(**inputs)
                    transcription = self.stt_processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
                else:
                    logits = self.stt_model(**inputs).logits
                    predicted_ids = torch.argmax(logits, dim=-1)
                    transcription = self.stt_processor.batch_decode(predicted_ids)[0]

            return transcription

        except Exception as e:
            print(f"Speech recognition failed: {e}")
            raise RuntimeError("Speech recognition failed")

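    # Translation: NLLB takes the source language on the tokenizer (src_lang)
    # and the target language as a forced BOS token at generation time.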
    def translate_text(self, text: str) -> str:
        """Translate text using NLLB model"""
        try:
            source_code = self.NLLB_LANGUAGE_CODES[self.source_lang]
            target_code = self.NLLB_LANGUAGE_CODES[self.target_lang]

            self.mt_tokenizer.src_lang = source_code
            inputs = self.mt_tokenizer(text, return_tensors="pt").to(self.device)

            with torch.no_grad():
                generated_tokens = self.mt_model.generate(
                    **inputs,
                    forced_bos_token_id=self.mt_tokenizer.convert_tokens_to_ids(target_code),
                    max_length=448
                )

            return self.mt_tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)[0]

        except Exception as e:
            print(f"Translation failed: {e}")
            raise RuntimeError("Text translation failed")

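    # Synthesis: the VITS model returns a float waveform (roughly in [-1, 1]),
    # which is scaled to 16-bit PCM so Gradio's numpy audio output can play it.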
    def text_to_speech(self, text: str) -> Tuple[int, np.ndarray]:
        """Convert text to speech"""
        try:
            inputs = self.tts_tokenizer(text, return_tensors="pt").to(self.device)

            with torch.no_grad():
                output = self.tts_model(**inputs)

            speech = output.waveform.cpu().numpy().squeeze()
            speech = (speech * 32767).astype(np.int16)

            return self.tts_model.config.sampling_rate, speech

        except Exception as e:
            print(f"Speech synthesis failed: {e}")
            raise RuntimeError("Speech synthesis failed")

    def translate_speech(self, audio_path: str) -> Dict:
        """Full speech-to-speech translation"""
        try:
            source_text = self.speech_to_text(audio_path)
            translated_text = self.translate_text(source_text)
            sample_rate, audio = self.text_to_speech(translated_text)

            return {
                "source_text": source_text,
                "translated_text": translated_text,
                "output_audio": (sample_rate, audio),
                "performance": "Translation successful"
            }
        except Exception as e:
            return {
                "source_text": "Error",
                "translated_text": "Error",
                "output_audio": (16000, np.zeros(1000, dtype=np.int16)),
                "performance": f"Error: {str(e)}"
            }

    def translate_text_only(self, text: str) -> Dict:
        """Translate text and synthesize the translation as speech"""
        try:
            translated_text = self.translate_text(text)
            sample_rate, audio = self.text_to_speech(translated_text)

            return {
                "source_text": text,
                "translated_text": translated_text,
                "output_audio": (sample_rate, audio),
                "performance": "Translation successful"
            }
        except Exception as e:
            return {
                "source_text": text,
                "translated_text": "Error",
                "output_audio": (16000, np.zeros(1000, dtype=np.int16)),
                "performance": f"Error: {str(e)}"
            }

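# Example usage (sketch), outside the Gradio UI. "sample.wav" is a hypothetical
# input recording:
#
#   translator = TalklasTranslator(source_lang="eng", target_lang="ceb")
#   result = translator.translate_speech("sample.wav")
#   rate, audio = result["output_audio"]
#   sf.write("translated.wav", audio, rate)
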
class TranslatorSingleton:
    _instance = None

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            cls._instance = TalklasTranslator()
        return cls._instance

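# Gradio callbacks: map the UI language names to ISO codes, point the shared
# translator at the requested pair, then run the pipeline.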
def process_audio(audio_path, source_lang, target_lang):
    """Process audio through the full translation pipeline"""
    if not audio_path:
        return None, "No audio provided", "No translation available", "Please provide audio input"

    source_code = TalklasTranslator.LANGUAGE_MAPPING[source_lang]
    target_code = TalklasTranslator.LANGUAGE_MAPPING[target_lang]

    translator = TranslatorSingleton.get_instance()
    status = translator.update_languages(source_code, target_code)

    results = translator.translate_speech(audio_path)

    return results["output_audio"], results["source_text"], results["translated_text"], results["performance"]


def process_text(text, source_lang, target_lang):
    """Process text through the translation pipeline"""
    if not text:
        return None, "No text provided", "No translation available", "Please provide text input"

    source_code = TalklasTranslator.LANGUAGE_MAPPING[source_lang]
    target_code = TalklasTranslator.LANGUAGE_MAPPING[target_lang]

    translator = TranslatorSingleton.get_instance()
    status = translator.update_languages(source_code, target_code)

    results = translator.translate_text_only(text)

    return results["output_audio"], results["source_text"], results["translated_text"], results["performance"]

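# UI: shared source/target dropdowns, one tab for audio input and one for text
# input, plus a small JS snippet that nudges browsers to autoplay the result.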
def create_gradio_interface():
    """Create and return the Gradio interface"""
    languages = list(TalklasTranslator.LANGUAGE_MAPPING.keys())

    demo = gr.Blocks(title="Talklas - Speech & Text Translation")

    with demo:
        gr.Markdown("# Talklas: Speech-to-Speech Translation System")
        gr.Markdown("### Translate between Philippine Languages and English")

        with gr.Row():
            with gr.Column():
                source_lang = gr.Dropdown(
                    choices=languages,
                    value="English",
                    label="Source Language"
                )

                target_lang = gr.Dropdown(
                    choices=languages,
                    value="Tagalog",
                    label="Target Language"
                )

                language_status = gr.Textbox(label="Language Status")
                update_btn = gr.Button("Update Languages")

        with gr.Tabs():
            with gr.TabItem("Audio Input"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Audio Input")
                        audio_input = gr.Audio(
                            type="filepath",
                            label="Upload Audio File"
                        )
                        audio_translate_btn = gr.Button("Translate Audio", variant="primary")

                    with gr.Column():
                        gr.Markdown("### Output")
                        audio_output = gr.Audio(
                            label="Translated Speech",
                            type="numpy",
                            autoplay=True
                        )

            with gr.TabItem("Text Input"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Text Input")
                        text_input = gr.Textbox(
                            label="Enter text to translate",
                            lines=3
                        )
                        text_translate_btn = gr.Button("Translate Text", variant="primary")

                    with gr.Column():
                        gr.Markdown("### Output")
                        text_output = gr.Audio(
                            label="Translated Speech",
                            type="numpy",
                            autoplay=True
                        )

        with gr.Row():
            with gr.Column():
                source_text = gr.Textbox(label="Source Text")
                translated_text = gr.Textbox(label="Translated Text")
                performance_info = gr.Textbox(label="Performance Metrics")

        update_btn.click(
            lambda source_lang, target_lang: TranslatorSingleton.get_instance().update_languages(
                TalklasTranslator.LANGUAGE_MAPPING[source_lang],
                TalklasTranslator.LANGUAGE_MAPPING[target_lang]
            ),
            inputs=[source_lang, target_lang],
            outputs=[language_status]
        )

        audio_translate_btn.click(
            process_audio,
            inputs=[audio_input, source_lang, target_lang],
            outputs=[audio_output, source_text, translated_text, performance_info]
        ).then(
            None,
            None,
            None,
            js="""() => {
                const audioElements = document.querySelectorAll('audio');
                if (audioElements.length > 0) {
                    const lastAudio = audioElements[audioElements.length - 1];
                    lastAudio.play().catch(error => {
                        console.warn('Autoplay failed:', error);
                        alert('Audio may require user interaction to play');
                    });
                }
            }"""
        )

        text_translate_btn.click(
            process_text,
            inputs=[text_input, source_lang, target_lang],
            outputs=[text_output, source_text, translated_text, performance_info]
        ).then(
            None,
            None,
            None,
            js="""() => {
                const audioElements = document.querySelectorAll('audio');
                if (audioElements.length > 0) {
                    const lastAudio = audioElements[audioElements.length - 1];
                    lastAudio.play().catch(error => {
                        console.warn('Autoplay failed:', error);
                        alert('Audio may require user interaction to play');
                    });
                }
            }"""
        )

    return demo

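# share=True asks Gradio for a temporary public link in addition to the local
# server; debug=True surfaces errors in the console.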
if __name__ == "__main__":
    demo = create_gradio_interface()
    demo.launch(share=True, debug=True)