import os
import re
import tempfile
import uuid
from typing import Optional

import gradio as gr
import requests
from pydub import AudioSegment

ASR_API = "http://astarwiz.com:9998/asr"
TTS_SPEAK_SERVICE = 'http://astarwiz.com:9603/speak'
TTS_WAVE_SERVICE = 'http://astarwiz.com:9603/wave'

LANGUAGE_MAP = {
    "en": "English",
    "ma": "Malay",
    "ta": "Tamil",
    "zh": "Chinese"
}

# Password for developer mode, read from the DEV_PWD environment variable
# (None when the variable is not set).
DEVELOPER_PASSWORD = os.getenv("DEV_PWD")

# RapidAPI key sent in the 'x-rapidapi-key' header of the link-lookup request.
RAPID_API_KEY = os.getenv("RAPID_API_KEY")

# Speakers offered per target language; the first entry is used as the default.
AVAILABLE_SPEAKERS = {
    "en": ["MS"],
    "ma": ["msFemale"],
    "ta": ["ta_female1"],
    "zh": ["childChinese2"]
}

# (connect, read) timeouts in seconds, so a stalled remote service cannot
# block a request handler forever.
_DOWNLOAD_TIMEOUT = (10, 120)
_LLM_TIMEOUT = (10, 120)

# Characters accepted in a video id. Anything else ('?', '&', '#', '/', '.',
# whitespace, ...) terminates the id, which also keeps the id safe to embed in
# a file name.
_VIDEO_ID_RE = re.compile(r"[A-Za-z0-9_-]+")


def fetch_youtube_id(youtube_url: str) -> str:
    """Extract the video id from a YouTube URL.

    Three URL shapes are recognised, checked in this order: a ``v=`` query
    parameter, a ``youtu.be/<id>`` short link, and a ``shorts/<id>`` link.
    Trailing query strings, fragments and path segments are not part of the
    returned id.

    Args:
        youtube_url: The URL to parse.

    Returns:
        The leading run of id characters found after the recognised marker.

    Raises:
        ValueError: If the URL matches none of the supported shapes or no id
            characters follow the marker. ``ValueError`` is a subclass of
            ``Exception``, so callers catching ``Exception`` keep working.
    """
    for marker in ("v=", "youtu.be/", "shorts/"):
        if marker in youtube_url:
            tail = youtube_url.split(marker, 1)[1]
            break
    else:
        raise ValueError("Unsupported URL format")

    match = _VIDEO_ID_RE.match(tail)
    if match is None:
        raise ValueError("Unsupported URL format")
    return match.group(0)


def _download_and_convert(media_url: str, extension: str, video_id: str,
                          output_dir: str, output_filename: str) -> Optional[str]:
    """Download one media URL and convert it to a 16000 Hz MP3.

    Returns ``output_filename`` on success, or None when the download itself
    fails. Errors raised during conversion propagate to the caller after the
    intermediate files have been cleaned up.
    """
    try:
        audio_response = requests.get(media_url, timeout=_DOWNLOAD_TIMEOUT)
    except requests.RequestException as exc:
        print("Error:", exc)
        return None
    if audio_response.status_code != 200:
        return None

    # The intermediate name must differ from output_filename: when the service
    # reports extension "mp3", "<id>.mp3" would be both the source and the
    # result, and removing the source would delete the result.
    temp_filename = os.path.join(output_dir, f"{video_id}.source.{extension}")
    try:
        with open(temp_filename, 'wb') as audio_file:
            audio_file.write(audio_response.content)

        # Convert to MP3 and downsample to 16000 Hz
        audio = AudioSegment.from_file(temp_filename, format=extension)
        audio = audio.set_frame_rate(16000)
        audio.export(output_filename, format="mp3", parameters=["-ar", "16000"])
    except Exception:
        # Do not leave a partial MP3 behind: the caller treats an existing
        # output file as a valid cached result.
        if os.path.exists(output_filename):
            os.remove(output_filename)
        raise
    finally:
        if os.path.exists(temp_filename):
            os.remove(temp_filename)  # Remove the temporary file

    return output_filename


def download_youtube_audio(youtube_url: str, output_dir: Optional[str] = None) -> Optional[str]:
    """Download the audio of a YouTube video as a 16000 Hz MP3 file.

    The media links are looked up through the youtube86 RapidAPI endpoint.
    The first entry flagged ``isBundle`` that downloads successfully is
    converted with pydub and stored as ``<output_dir>/<video_id>.mp3``. An
    already existing file with that name is returned without any network
    access.

    Args:
        youtube_url: URL of the video.
        output_dir: Directory for the MP3; defaults to the system temp dir.

    Returns:
        Path of the MP3 file, or None when the URL is unsupported, a request
        fails, the service reply has an unexpected shape, or no usable link
        is found.
    """
    try:
        video_id = fetch_youtube_id(youtube_url)
    except ValueError as exc:
        print("Error:", exc)
        return None

    if output_dir is None:
        output_dir = tempfile.gettempdir()

    output_filename = os.path.join(output_dir, f"{video_id}.mp3")
    if os.path.exists(output_filename):
        return output_filename  # Return if the file already exists

    api_url = "https://youtube86.p.rapidapi.com/api/youtube/links"
    headers = {
        'Content-Type': 'application/json',
        'x-rapidapi-host': 'youtube86.p.rapidapi.com',
        'x-rapidapi-key': RAPID_API_KEY
    }
    data = {
        "url": youtube_url
    }

    try:
        response = requests.post(api_url, headers=headers, json=data,
                                 timeout=_DOWNLOAD_TIMEOUT)
    except requests.RequestException as exc:
        print("Error:", exc)
        return None
    print('Fetched audio links')

    if response.status_code != 200:
        print("Error:", response.status_code, response.text)
        return None  # Return None on failure

    try:
        candidates = response.json()[0]['urls']
    except (ValueError, LookupError, TypeError) as exc:
        print("Error: unexpected response from the link service:", exc)
        return None

    for candidate in candidates:
        if not candidate.get('isBundle'):
            continue
        media_url = candidate.get('url')
        extension = candidate.get('extension')
        if not media_url or not extension:
            continue
        saved = _download_and_convert(media_url, extension, video_id,
                                      output_dir, output_filename)
        if saved:
            return saved  # Return the final MP3 filename

    return None  # Return None if no successful download occurs


def inference_via_llm_api(input_text, min_new_tokens=2, max_new_tokens=64):
    """Send ``input_text`` to the vLLM completions endpoint and return the reply.

    The text is wrapped in a chat-style prompt with a fixed system message
    ("You are a translation expert.").

    Args:
        input_text: User message placed into the prompt.
        min_new_tokens: Value sent as ``min_tokens``.
        max_new_tokens: Value sent as ``max_tokens``.

    Returns:
        The stripped text of the first choice, or a fixed error message when
        the request fails, the reply is not JSON, or it contains no choices.
    """
    error_message = "The system got some error during vLLM generation. Please try it again."

    print(input_text)
    one_vllm_input = f"<|im_start|>system\nYou are a translation expert.<|im_end|>\n<|im_start|>user\n{input_text}<|im_end|>\n<|im_start|>assistant"

    vllm_api = 'http://astarwiz.com:2333/' + "v1/completions"
    data = {
        "prompt": one_vllm_input,
        'model': "./Edu-4B-NewTok-V2-20240904/",
        'min_tokens': min_new_tokens,
        'max_tokens': max_new_tokens,
        'temperature': 0.1,
        'top_p': 0.75,
        'repetition_penalty': 1.1,
        "stop_token_ids": [151645, ],
    }

    try:
        response = requests.post(vllm_api,
                                 headers={"Content-Type": "application/json"},
                                 json=data,
                                 timeout=_LLM_TIMEOUT).json()
    except (requests.RequestException, ValueError) as exc:
        print(f"vLLM request failed: {exc}")
        return error_message
    print(response)

    choices = response.get("choices") if isinstance(response, dict) else None
    if not choices:
        return error_message
    try:
        return choices[0]['text'].strip()
    except (KeyError, TypeError, AttributeError):
        return error_message
# (connect, read) timeouts in seconds for the ASR and TTS services.
_SPEECH_SERVICE_TIMEOUT = (10, 300)


def transcribe_and_speak(audio, source_lang, target_lang, youtube_url=None, target_speaker=None):
    """Transcribe audio, translate the transcription and synthesise speech.

    Args:
        audio: Path of the audio file to transcribe. Ignored when
            ``youtube_url`` is given.
        source_lang: Source language key of LANGUAGE_MAP.
        target_lang: Target language key of LANGUAGE_MAP / AVAILABLE_SPEAKERS.
        youtube_url: Optional YouTube URL; when set, its audio is downloaded
            and used instead of ``audio``.
        target_speaker: TTS speaker name; falls back to the first speaker
            listed for ``target_lang``.

    Returns:
        A 3-tuple ``(transcription, translated_text, audio_url)``. On early
        failure the first element carries an error message and the other two
        are None. When only TTS fails, ``audio_url`` is None so the text
        results can still be displayed.
    """
    if youtube_url:
        try:
            audio = download_youtube_audio(youtube_url)
        except Exception as exc:  # UI boundary: report instead of crashing
            print(f"YouTube download failed: {exc}")
            audio = None
        if not audio:
            return "Failed to download YouTube audio.", None, None

    if not audio:
        return "Please provide an audio input or a valid YouTube URL.", None, None

    # ASR
    data = {
        'language': 'ms' if source_lang == 'ma' else source_lang,
        'model_name': 'whisper-large-v2-local-cs',
        'with_timestamp': False
    }
    try:
        # The context manager closes the upload handle even if the request fails.
        with open(audio, 'rb') as audio_file:
            asr_response = requests.post(ASR_API, files={'file': audio_file}, data=data,
                                         timeout=_SPEECH_SERVICE_TIMEOUT)
    except (OSError, requests.RequestException) as exc:
        print(f"ASR request failed: {exc}")
        return "ASR failed", None, None

    # Check the status before decoding: an error reply may not be JSON.
    if asr_response.status_code != 200:
        print("ASR error:", asr_response.status_code, asr_response.text)
        return "ASR failed", None, None
    try:
        asr_result = asr_response.json()
        transcription = asr_result['text']
    except (ValueError, KeyError, TypeError) as exc:
        print(f"ASR returned an unexpected response: {exc}")
        return "ASR failed", None, None
    print(asr_result)

    translation_prompt = f"Translate the following text from {LANGUAGE_MAP[source_lang]} to {LANGUAGE_MAP[target_lang]}: {transcription}"
    translated_text = inference_via_llm_api(translation_prompt)
    print(f"Translation: {translated_text}")

    # TTS
    tts_params = {
        'language': target_lang,
        'speed': 1.1,
        'speaker': target_speaker or AVAILABLE_SPEAKERS[target_lang][0],  # Use the first speaker as default
        'text': translated_text
    }
    try:
        tts_response = requests.get(TTS_SPEAK_SERVICE, params=tts_params,
                                    timeout=_SPEECH_SERVICE_TIMEOUT)
    except requests.RequestException as exc:
        print(f"TTS request failed: {exc}")
        return transcription, translated_text, None

    if tts_response.status_code != 200:
        print("TTS failed:", tts_response.status_code)
        # None (not an error string) because this value is routed to an audio
        # output component, which expects a file/URL or nothing.
        return transcription, translated_text, None

    audio_file_name = tts_response.text.strip()
    audio_url = f"{TTS_WAVE_SERVICE}?file={audio_file_name}"
    return transcription, translated_text, audio_url


def check_password(password):
    """Return True only when ``password`` equals the configured developer password.

    Always returns False when no developer password is configured, so an
    unset DEV_PWD can never be matched by a missing or empty password.
    """
    import hmac

    if not DEVELOPER_PASSWORD or not isinstance(password, str):
        return False
    # Constant-time comparison; bytes are used because compare_digest rejects
    # non-ASCII str arguments.
    return hmac.compare_digest(password.encode("utf-8"),
                               DEVELOPER_PASSWORD.encode("utf-8"))


def run_speech_translation(audio, source_lang, target_lang, youtube_url, target_speaker):
    """Click handler: run the pipeline and return its three outputs."""
    transcription, translated_text, audio_url = transcribe_and_speak(
        audio, source_lang, target_lang, youtube_url, target_speaker)
    return transcription, translated_text, audio_url


# NOTE(review): the original indentation of this UI section was lost; the row
# grouping below is inferred from statement order — confirm against the
# intended layout.
with gr.Blocks() as demo:
    gr.Markdown("# Speech Translation")

    # with gr.Tab("User Mode"):
    gr.Markdown("Speak into the microphone, upload an audio file, or provide a YouTube URL. The app will translate and speak it back to you.")

    default_target_lang = "zh"

    with gr.Row():
        user_audio_input = gr.Audio(sources=["microphone", "upload"], type="filepath")
        user_youtube_url = gr.Textbox(label="YouTube URL (optional)")
        user_source_lang = gr.Dropdown(choices=["en", "ma", "ta", "zh"], label="Source Language", value="en")
        user_target_lang = gr.Dropdown(choices=["en", "ma", "ta", "zh"], label="Target Language", value=default_target_lang)
        # Pre-populate with the speakers of the default target language so the
        # dropdown is not empty before the language is changed for the first time.
        user_target_speaker = gr.Dropdown(
            choices=AVAILABLE_SPEAKERS[default_target_lang],
            value=AVAILABLE_SPEAKERS[default_target_lang][0],
            label="Target Speaker",
        )

    with gr.Row():
        user_button = gr.Button("Translate and Speak", interactive=False)

    with gr.Row():
        user_transcription_output = gr.Textbox(label="Transcription")
        user_translation_output = gr.Textbox(label="Translation")
        user_audio_output = gr.Audio(label="Translated Speech")

    user_video_output = gr.HTML(label="YouTube Video")

    def update_button_state(audio, youtube_url):
        """Enable the button once an audio input or a YouTube URL is present."""
        print(audio, youtube_url)
        return gr.Button(interactive=bool(audio) or bool(youtube_url))

    user_audio_input.change(
        fn=update_button_state,
        inputs=[user_audio_input, user_youtube_url],
        outputs=user_button
    )
    user_youtube_url.change(
        fn=update_button_state,
        inputs=[user_audio_input, user_youtube_url],
        outputs=user_button
    )

    user_button.click(
        fn=run_speech_translation,
        inputs=[user_audio_input, user_source_lang, user_target_lang, user_youtube_url, user_target_speaker],
        outputs=[user_transcription_output, user_translation_output, user_audio_output]
    )

    def update_video_embed(youtube_url):
        """Return embed HTML for the given YouTube URL, or "" when unavailable."""
        from urllib.parse import quote

        if not youtube_url:
            return ""
        try:
            video_id = fetch_youtube_id(youtube_url)
        except Exception as e:
            print(f"Error embedding video: {e}")
            return ""
        # The id comes from user input, so percent-encode it before placing it
        # inside the markup.
        safe_id = quote(video_id, safe="")
        return (
            f'<iframe width="560" height="315" '
            f'src="https://www.youtube.com/embed/{safe_id}" '
            f'frameborder="0" allowfullscreen></iframe>'
        )

    user_youtube_url.change(
        fn=update_video_embed,
        inputs=[user_youtube_url],
        outputs=[user_video_output]
    )

    def update_target_speakers(target_lang):
        """Refresh the speaker dropdown for the selected target language."""
        speakers = AVAILABLE_SPEAKERS[target_lang]
        return gr.Dropdown(choices=speakers, value=speakers[0])

    user_target_lang.change(
        fn=update_target_speakers,
        inputs=[user_target_lang],
        outputs=[user_target_speaker]
    )

dev_user = os.getenv("DEV_USER")
dev_pwd = os.getenv("DEV_PWD")
if not dev_user or not dev_pwd:
    print("Warning: DEV_USER and/or DEV_PWD is not set; the login credentials are incomplete.")
demo.launch(auth=(dev_user, dev_pwd))