# Provenance (Hugging Face Space page header, kept as a comment so the
# file remains valid Python):
#   uploaded by jerrypan7 — commit "Update app.py", revision a590991
#   (verified) — raw / history / blame — 11 kB
import gradio as gr
import requests
import uuid
import os
from typing import Optional
import tempfile
from pydub import AudioSegment
import re
# Remote service endpoints: speech recognition, and the two-step TTS
# service (speak = request synthesis, wave = fetch the generated audio).
ASR_API = "http://astarwiz.com:9998/asr"
TTS_SPEAK_SERVICE = 'http://astarwiz.com:9603/speak'
TTS_WAVE_SERVICE = 'http://astarwiz.com:9603/wave'
# UI language code -> full language name, used to build the LLM
# translation prompt.
# NOTE(review): "ma" is used for Malay here, but the ASR request maps it
# to ISO "ms" — confirm the codes stay consistent across services.
LANGUAGE_MAP = {
    "en": "English",
    "ma": "Malay",
    "ta": "Tamil",
    "zh": "Chinese"
}
# Password for developer mode; also reused as the Gradio login password
# at launch time.
DEVELOPER_PASSWORD = os.getenv("DEV_PWD")
# RapidAPI key for the YouTube link-resolver service used by
# download_youtube_audio.
RAPID_API_KEY = os.getenv("RAPID_API_KEY")
# Voices offered by the TTS service per language; the first entry of each
# list is used as the default speaker.
AVAILABLE_SPEAKERS = {
    "en": ["MS"],
    "ma": ["msFemale"],
    "ta": ["ta_female1"],
    "zh": ["childChinese2"]
}
def fetch_youtube_id(youtube_url: str) -> str:
    """Extract the bare video ID from a YouTube URL.

    Supports watch URLs (``?v=...``), ``youtu.be`` short links, and
    ``/shorts/`` links.

    Args:
        youtube_url: A full YouTube URL.

    Returns:
        The video ID with any query parameters stripped.

    Raises:
        Exception: If the URL matches none of the supported formats.
    """
    if 'v=' in youtube_url:
        return youtube_url.split("v=")[1].split("&")[0]
    elif 'youtu.be/' in youtube_url:
        # Short links often carry query params (e.g. ?si=..., ?t=30);
        # the original returned them glued to the ID.
        return youtube_url.split("youtu.be/")[1].split("?")[0].split("&")[0]
    elif 'shorts' in youtube_url:
        # Same fix for /shorts/ID?feature=share style links.
        return youtube_url.split("/")[-1].split("?")[0].split("&")[0]
    else:
        raise Exception("Unsupported URL format")
def download_youtube_audio(youtube_url: str, output_dir: Optional[str] = None) -> Optional[str]:
    """Download a YouTube video's audio track and convert it to a 16 kHz MP3.

    Resolves direct media links via the youtube86 RapidAPI endpoint,
    downloads the first bundled stream, re-encodes it to MP3 at 16000 Hz
    (the rate the ASR service expects), and caches the result per video ID.

    Args:
        youtube_url: Any YouTube URL format accepted by fetch_youtube_id.
        output_dir: Directory for the MP3; defaults to the system temp dir.

    Returns:
        Path to the MP3 file, or None on any failure.
    """
    try:
        video_id = fetch_youtube_id(youtube_url)
    except Exception as e:
        # fetch_youtube_id raises on unrecognized URLs, so the original
        # `if not video_id:` guard was dead code and the exception escaped
        # to callers that expect a None return on failure.
        print("Invalid YouTube URL:", e)
        return None
    if output_dir is None:
        output_dir = tempfile.gettempdir()
    output_filename = os.path.join(output_dir, f"{video_id}.mp3")
    if os.path.exists(output_filename):
        return output_filename  # Cached from an earlier request.
    api_url = "https://youtube86.p.rapidapi.com/api/youtube/links"
    headers = {
        'Content-Type': 'application/json',
        'x-rapidapi-host': 'youtube86.p.rapidapi.com',
        'x-rapidapi-key': RAPID_API_KEY
    }
    payload = {
        "url": youtube_url
    }
    try:
        response = requests.post(api_url, headers=headers, json=payload, timeout=60)
    except requests.RequestException as e:
        # Keep the None-on-failure contract instead of letting network
        # errors (or the added timeout) propagate.
        print("Error resolving media links:", e)
        return None
    print('Fetched audio links')
    if response.status_code != 200:
        print("Error:", response.status_code, response.text)
        return None
    result = response.json()
    for link in result[0]['urls']:  # renamed: the original shadowed `url`
        if not link.get('isBundle'):
            continue
        audio_url = link['url']
        extension = link['extension']
        try:
            audio_response = requests.get(audio_url, timeout=120)
        except requests.RequestException as e:
            print("Error downloading audio:", e)
            return None
        if audio_response.status_code != 200:
            continue  # try the next bundled stream, as the original did
        temp_filename = os.path.join(output_dir, f"{video_id}.{extension}")
        with open(temp_filename, 'wb') as audio_file:
            audio_file.write(audio_response.content)
        # Convert to MP3 and downsample to 16000 Hz for the ASR service.
        audio = AudioSegment.from_file(temp_filename, format=extension)
        audio = audio.set_frame_rate(16000)
        audio.export(output_filename, format="mp3", parameters=["-ar", "16000"])
        os.remove(temp_filename)  # Drop the intermediate download.
        return output_filename
    return None  # No usable bundled audio stream found.
# Sentence-delimiter characters used to segment ASR transcripts before
# translation: ASCII . ! ? plus their fullwidth variants and the
# ideographic full stop for Chinese. The capturing group makes
# re.split() keep each delimiter in the result so it can be re-attached
# to the preceding segment.
# (The commented-out draft of split_text_with_punctuation that used to
# sit here duplicated the live implementation below and was removed.)
punctuation_marks = r'([\.!?!?。])'
def split_text_with_punctuation(text):
    """Break *text* into sentence-sized chunks for per-segment translation.

    The text is split on the characters in ``punctuation_marks`` with each
    delimiter kept attached to the segment it terminates; any resulting
    chunk longer than 50 whitespace-separated words is further divided
    into 50-word pieces. Empty chunks are dropped.
    """
    pieces = re.split(punctuation_marks, text)
    # re.split with a capturing group alternates segment / delimiter, so
    # pairing even-indexed with odd-indexed entries re-attaches each mark.
    sentences = [head + mark for head, mark in zip(pieces[::2], pieces[1::2])]
    # A trailing fragment with no closing punctuation is kept as-is.
    if len(pieces) % 2 != 0 and pieces[-1]:
        sentences.append(pieces[-1])
    chunked = []
    for sentence in sentences:
        words = sentence.split()
        if len(words) > 50:
            # Re-chunk overly long sentences into 50-word windows.
            chunked.extend(' '.join(words[k:k + 50]) for k in range(0, len(words), 50))
        else:
            chunked.append(sentence)
    return [chunk for chunk in chunked if chunk]
def inference_via_llm_api(input_text, min_new_tokens=2, max_new_tokens=64):
    """Send a ChatML-formatted completion request to the local vLLM server.

    Args:
        input_text: User message wrapped into the translation prompt.
        min_new_tokens: Lower bound forwarded as vLLM's ``min_tokens``.
        max_new_tokens: Upper bound forwarded as vLLM's ``max_tokens``.

    Returns:
        The generated text, stripped, or a fixed error message when the
        response contains no "choices" field.
    """
    print(input_text)
    prompt = (
        "<|im_start|>system\nYou are a translation expert.<|im_end|>\n"
        f"<|im_start|>user\n{input_text}<|im_end|>\n"
        "<|im_start|>assistant"
    )
    endpoint = "http://astarwiz.com:2333/v1/completions"
    payload = {
        "prompt": prompt,
        'model': "./Edu-4B-NewTok-V2-20240904/",
        'min_tokens': min_new_tokens,
        'max_tokens': max_new_tokens,
        'temperature': 0.1,
        'top_p': 0.75,
        'repetition_penalty': 1.1,
        # 151645 is the model's <|im_end|> token id.
        "stop_token_ids": [151645, ],
    }
    result = requests.post(endpoint, headers={"Content-Type": "application/json"}, json=payload).json()
    print(result)
    if "choices" not in result:
        return "The system got some error during vLLM generation. Please try it again."
    return result["choices"][0]['text'].strip()
def transcribe_and_speak(audio, source_lang, target_lang, youtube_url=None, target_speaker=None):
    """Full pipeline: ASR -> sentence split -> LLM translation -> TTS.

    Args:
        audio: Path to a local audio file (may be None if youtube_url given).
        source_lang: Source language code ("en"/"ma"/"ta"/"zh").
        target_lang: Target language code.
        youtube_url: Optional YouTube URL; when set it overrides `audio`.
        target_speaker: TTS voice; defaults to the language's first speaker.

    Returns:
        (transcription, translated_text, audio_url) — on failure a stage
        substitutes an error string and/or None, as in the original contract.
    """
    if youtube_url:
        audio = download_youtube_audio(youtube_url)
        if not audio:
            return "Failed to download YouTube audio.", None, None
    if not audio:
        return "Please provide an audio input or a valid YouTube URL.", None, None
    # ASR
    data = {
        'language': 'ms' if source_lang == 'ma' else source_lang,  # ASR expects ISO "ms" for Malay
        'model_name': 'whisper-large-v2-local-cs',
        'with_timestamp': False
    }
    # Context manager fixes the original's leaked file handle from open().
    with open(audio, 'rb') as audio_fh:
        asr_response = requests.post(ASR_API, files={'file': audio_fh}, data=data)
    print(asr_response.json())
    if asr_response.status_code == 200:
        transcription = asr_response.json()['text']
    else:
        return "ASR failed", None, None
    # Translate sentence-by-sentence to keep each LLM request short.
    split_result = split_text_with_punctuation(transcription)
    translate_segments = []
    for segment in split_result:
        translation_prompt = f"Translate the following text from {LANGUAGE_MAP[source_lang]} to {LANGUAGE_MAP[target_lang]}: {segment}"
        translated_seg_txt = inference_via_llm_api(translation_prompt)
        translate_segments.append(translated_seg_txt)
        print(f"Translation: {translated_seg_txt}")
    translated_text = " ".join(translate_segments)
    # TTS
    tts_params = {
        'language': target_lang,
        'speed': 1.1,
        'speaker': target_speaker or AVAILABLE_SPEAKERS[target_lang][0],  # first speaker is the default voice
        'text': translated_text
    }
    tts_response = requests.get(TTS_SPEAK_SERVICE, params=tts_params)
    if tts_response.status_code == 200:
        # The speak service returns a server-side filename; the wave
        # service serves the actual audio for that filename.
        audio_file = tts_response.text.strip()
        audio_url = f"{TTS_WAVE_SERVICE}?file={audio_file}"
        return transcription, translated_text, audio_url
    else:
        return transcription, translated_text, "TTS failed"
def check_password(password):
    """Return True when *password* matches the developer password (DEV_PWD env var)."""
    is_match = DEVELOPER_PASSWORD == password
    return is_match
def run_speech_translation(audio, source_lang, target_lang, youtube_url, target_speaker):
    """Gradio callback: thin pass-through wrapper around transcribe_and_speak."""
    return transcribe_and_speak(audio, source_lang, target_lang, youtube_url, target_speaker)
# --- Gradio UI -------------------------------------------------------------
with gr.Blocks() as demo:
    gr.Markdown("# Speech Translation")
    # with gr.Tab("User Mode"):
    gr.Markdown("Speak into the microphone, upload an audio file, or provide a YouTube URL. The app will translate and speak it back to you.")
    # Inputs: recorded/uploaded audio, or an optional YouTube URL.
    with gr.Row():
        user_audio_input = gr.Audio(sources=["microphone", "upload"], type="filepath")
        user_youtube_url = gr.Textbox(label="YouTube URL (optional)")
    with gr.Row():
        user_source_lang = gr.Dropdown(choices=["en", "ma", "ta", "zh"], label="Source Language", value="en")
        user_target_lang = gr.Dropdown(choices=["en", "ma", "ta", "zh"], label="Target Language", value="zh")
        # Default speaker matches the default target language ("zh").
        user_target_speaker = gr.Dropdown(choices=AVAILABLE_SPEAKERS['zh'], label="Target Speaker", value="childChinese2")
    with gr.Row():
        # Starts disabled; enabled by update_button_state once input exists.
        user_button = gr.Button("Translate and Speak", interactive=False)
    with gr.Row():
        user_transcription_output = gr.Textbox(label="Transcription")
        user_translation_output = gr.Textbox(label="Translation")
        user_audio_output = gr.Audio(label="Translated Speech")
        user_video_output = gr.HTML(label="YouTube Video")
    def update_button_state(audio, youtube_url):
        """Enable the translate button only when audio or a URL is provided."""
        print(audio, youtube_url)
        return gr.Button(interactive=bool(audio) or bool(youtube_url))
    user_audio_input.change(
        fn=update_button_state,
        inputs=[user_audio_input, user_youtube_url],
        outputs=user_button
    )
    user_youtube_url.change(
        fn=update_button_state,
        inputs=[user_audio_input, user_youtube_url],
        outputs=user_button
    )
    user_button.click(
        fn=run_speech_translation,
        inputs=[user_audio_input, user_source_lang, user_target_lang, user_youtube_url, user_target_speaker],
        outputs=[user_transcription_output, user_translation_output, user_audio_output]
    )
    def update_video_embed(youtube_url):
        """Return an <iframe> embed for the URL, or "" when absent/invalid."""
        if youtube_url:
            try:
                video_id = fetch_youtube_id(youtube_url)
                return f'<iframe width="560" height="315" src="https://www.youtube.com/embed/{video_id}" frameborder="0" allow="autoplay; encrypted-media" allowfullscreen></iframe>'
            except Exception as e:
                # Fall through to the empty string on unparsable URLs.
                print(f"Error embedding video: {e}")
        return ""
    user_youtube_url.change(
        fn=update_video_embed,
        inputs=[user_youtube_url],
        outputs=[user_video_output]
    )
    def update_target_speakers(target_lang):
        """Repopulate the speaker dropdown when the target language changes."""
        return gr.Dropdown(choices=AVAILABLE_SPEAKERS[target_lang], value=AVAILABLE_SPEAKERS[target_lang][0])
    user_target_lang.change(
        fn=update_target_speakers,
        inputs=[user_target_lang],
        outputs=[user_target_speaker]
    )
# NOTE(review): if DEV_USER / DEV_PWD env vars are unset this passes
# (None, None) as the auth credentials — confirm that is intended.
demo.launch(auth=(os.getenv("DEV_USER"), os.getenv("DEV_PWD")))