File size: 9,594 Bytes
bea8fb1
 
 
 
f3fbcc1
 
 
 
 
bea8fb1
 
 
 
 
 
 
 
 
 
 
 
 
 
f3fbcc1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bea8fb1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f3fbcc1
 
 
 
 
 
bea8fb1
f3fbcc1
bea8fb1
 
 
 
 
 
d098eb3
 
bea8fb1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f3fbcc1
 
 
bea8fb1
 
2ac88d4
bea8fb1
 
f3fbcc1
bea8fb1
 
 
f3fbcc1
bea8fb1
 
 
 
3cfe0fd
bea8fb1
 
f3fbcc1
 
bea8fb1
 
3cfe0fd
992acbf
 
3cfe0fd
 
 
 
 
 
 
 
 
 
 
 
bea8fb1
 
f3fbcc1
 
bea8fb1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
57047ad
 
 
 
bea8fb1
 
 
 
57047ad
 
e717cd1
bea8fb1
57047ad
 
 
bea8fb1
 
 
 
 
 
 
 
 
 
 
 
 
 
af33f98
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
import gradio as gr
import requests
import uuid
import os
from typing import Optional
import tempfile
from pydub import AudioSegment
import re

# Endpoints of the remote speech-recognition and speech-synthesis services.
ASR_API = "http://astarwiz.com:9998/asr"
TTS_SPEAK_SERVICE = "http://astarwiz.com:9603/speak"
TTS_WAVE_SERVICE = "http://astarwiz.com:9603/wave"

# Two-letter language codes mapped to their English display names.
LANGUAGE_MAP = dict(
    en="English",
    ma="Malay",
    ta="Tamil",
    zh="Chinese",
)

# Developer-mode password, taken from the environment (None when DEV_PWD is unset).
DEVELOPER_PASSWORD = os.environ.get("DEV_PWD")

# RapidAPI key, taken from the environment (None when RAPID_API_KEY is unset).
RAPID_API_KEY = os.environ.get("RAPID_API_KEY")

def fetch_youtube_id(youtube_url: str) -> str:
    """Extract the video ID from a YouTube watch or Shorts URL.

    Only the ID itself is returned: anything following it (further query
    parameters such as ``&t=42s``, a ``?feature=share`` suffix, a trailing
    slash) is dropped.

    Args:
        youtube_url: A URL carrying the ID either in a ``v=`` query parameter
            or directly after a ``/shorts/`` path segment.

    Returns:
        The video ID, made up solely of letters, digits, ``_`` and ``-``.

    Raises:
        ValueError: If the URL matches neither supported form.
    """
    # The ID ends up inside a file name, so only the plain ID alphabet is
    # accepted; this also keeps path separators and ".." out of the result.
    watch_match = re.search(r"[?&]v=([A-Za-z0-9_-]+)", youtube_url)
    if watch_match:
        return watch_match.group(1)

    shorts_match = re.search(r"/shorts/([A-Za-z0-9_-]+)", youtube_url)
    if shorts_match:
        return shorts_match.group(1)

    raise ValueError("Unsupported URL format")

def download_youtube_audio(
    youtube_url: str,
    output_dir: Optional[str] = None,
    request_timeout: float = 60.0,
) -> Optional[str]:
    """Download a YouTube video's audio and store it as a 16 kHz MP3.

    The download links are looked up through the youtube86 RapidAPI endpoint;
    the first entry flagged ``isBundle`` that downloads successfully is
    converted with pydub. The result is cached as ``<video_id>.mp3`` in
    ``output_dir`` and reused on later calls.

    Args:
        youtube_url: URL of the video; must be accepted by ``fetch_youtube_id``.
        output_dir: Directory for the MP3 and intermediate files. Defaults to
            the system temporary directory.
        request_timeout: Timeout in seconds applied to each HTTP request.

    Returns:
        Path of the MP3 file, or ``None`` when the lookup or every download
        attempt fails.

    Raises:
        Exception: Propagated from ``fetch_youtube_id`` for unsupported URLs,
            and from pydub / the file system if conversion fails.
    """
    video_id = fetch_youtube_id(youtube_url)

    if not video_id:
        return None

    if output_dir is None:
        output_dir = tempfile.gettempdir()

    output_filename = os.path.join(output_dir, f"{video_id}.mp3")

    if os.path.exists(output_filename):
        return output_filename  # Reuse the previously converted file

    api_url = "https://youtube86.p.rapidapi.com/api/youtube/links"
    headers = {
        'Content-Type': 'application/json',
        'x-rapidapi-host': 'youtube86.p.rapidapi.com',
        'x-rapidapi-key': RAPID_API_KEY
    }
    data = {
        "url": youtube_url
    }

    try:
        response = requests.post(api_url, headers=headers, json=data, timeout=request_timeout)
    except requests.RequestException as exc:
        print("Error: link lookup request failed:", exc)
        return None

    if response.status_code != 200:
        print("Error:", response.status_code, response.text)
        return None
    print('Fetched audio links')

    try:
        candidates = response.json()[0]['urls']
    except (ValueError, LookupError, TypeError) as exc:
        # Not JSON, an empty list, or a payload without the expected keys.
        print("Error: unexpected link lookup response:", exc)
        return None

    for candidate in candidates:
        if not candidate.get('isBundle'):
            continue

        audio_url = candidate.get('url')
        extension = candidate.get('extension')
        if not audio_url or not extension:
            continue

        try:
            audio_response = requests.get(audio_url, timeout=request_timeout)
        except requests.RequestException as exc:
            print("Error: media download failed:", exc)
            continue
        if audio_response.status_code != 200:
            continue

        # Unique scratch files: the raw download must never share a path with
        # the final MP3 (it would when extension == "mp3"), and a partially
        # written MP3 must never be mistaken for a cached result.
        source_fd, source_path = tempfile.mkstemp(suffix=f".{extension}", dir=output_dir)
        converted_fd, converted_path = tempfile.mkstemp(suffix=".mp3", dir=output_dir)
        os.close(converted_fd)  # pydub opens the path itself
        try:
            with os.fdopen(source_fd, 'wb') as audio_file:
                audio_file.write(audio_response.content)

            # Convert to MP3 and downsample to 16000 Hz
            audio = AudioSegment.from_file(source_path, format=extension)
            audio = audio.set_frame_rate(16000)
            # export() hands back the open output file; close it right away.
            audio.export(converted_path, format="mp3", parameters=["-ar", "16000"]).close()

            os.replace(converted_path, output_filename)
        finally:
            # Remove whatever scratch files are still around, even on failure.
            for leftover in (source_path, converted_path):
                if os.path.exists(leftover):
                    os.remove(leftover)

        return output_filename  # Return the final MP3 filename

    return None  # No bundled entry could be downloaded

def inference_via_llm_api(input_text, min_new_tokens=2, max_new_tokens=64, request_timeout=120):
    """Send a prompt to the vLLM completions endpoint and return its reply.

    The text is wrapped in a chat template with a fixed "translation expert"
    system message before being posted.

    Args:
        input_text: User prompt to place inside the chat template.
        min_new_tokens: Sent as ``min_tokens`` in the request.
        max_new_tokens: Sent as ``max_tokens`` in the request.
        request_timeout: Timeout in seconds for the HTTP request.

    Returns:
        The stripped text of the first completion choice, or a fixed error
        sentence when the request fails or the reply carries no choices.
    """
    failure_message = "The system got some error during vLLM generation. Please try it again."

    print(input_text)
    one_vllm_input = f"<|im_start|>system\nYou are a translation expert.<|im_end|>\n<|im_start|>user\n{input_text}<|im_end|>\n<|im_start|>assistant"
    vllm_api = 'http://astarwiz.com:2333/' + "v1/completions"
    data = {
        "prompt": one_vllm_input,
        'model': "./Edu-4B-NewTok-V2-20240904/",
        'min_tokens': min_new_tokens,
        'max_tokens': max_new_tokens,
        'temperature': 0.1,
        'top_p': 0.75,
        'repetition_penalty': 1.1,
        # NOTE(review): presumably the id of the <|im_end|> token - confirm against the tokenizer.
        "stop_token_ids": [151645, ],
    }

    try:
        response = requests.post(
            vllm_api,
            headers={"Content-Type": "application/json"},
            json=data,
            timeout=request_timeout,
        ).json()
    except (requests.RequestException, ValueError) as exc:
        # Network failure, timeout, or a body that is not valid JSON.
        print(f"vLLM request failed: {exc}")
        return failure_message

    print(response)
    choices = response.get("choices") if isinstance(response, dict) else None
    if choices:
        return choices[0]['text'].strip()
    return failure_message
    
def transcribe_and_speak(audio, source_lang, target_lang, youtube_url=None):
    """Transcribe audio, translate the transcript, and synthesize the translation.

    Args:
        audio: Path of the audio file to transcribe. Ignored when
            ``youtube_url`` is given.
        source_lang: Language code of the audio; must be a key of ``LANGUAGE_MAP``.
        target_lang: Language code to translate into; must be a key of
            ``LANGUAGE_MAP``.
        youtube_url: Optional YouTube URL whose audio replaces ``audio``.

    Returns:
        A ``(transcription, translation, audio_url)`` tuple. When the input is
        missing or ASR fails, the first item is an error message and the other
        two are ``None``. When only speech synthesis fails, ``audio_url`` is
        ``None`` and the two texts are still returned.
    """
    if youtube_url:
        audio = download_youtube_audio(youtube_url)
        if not audio:
            return "Failed to download YouTube audio.", None, None

    if not audio:
        return "Please provide an audio input or a valid YouTube URL.", None, None

    # ASR
    data = {
        # The UI uses 'ma' for Malay; the ASR request sends 'ms' instead.
        'language': 'ms' if source_lang == 'ma' else source_lang,
        'model_name': 'whisper-large-v2-local-cs',
        'with_timestamp': False
    }

    try:
        # Keep the file open only for the duration of the upload.
        with open(audio, 'rb') as audio_stream:
            asr_response = requests.post(
                ASR_API,
                files={'file': audio_stream},
                data=data,
                timeout=(10, 600),
            )
    except (OSError, requests.RequestException) as exc:
        print(f"ASR request failed: {exc}")
        return "ASR failed", None, None

    if asr_response.status_code != 200:
        print("ASR error:", asr_response.status_code, asr_response.text)
        return "ASR failed", None, None

    try:
        asr_result = asr_response.json()
        transcription = asr_result['text']
    except (ValueError, LookupError, TypeError) as exc:
        print(f"Unexpected ASR response: {exc}")
        return "ASR failed", None, None
    print(asr_result)

    translation_prompt = f"Translate the following text from {LANGUAGE_MAP[source_lang]} to {LANGUAGE_MAP[target_lang]}: {transcription}"
    translated_text = inference_via_llm_api(translation_prompt)
    print(f"Translation: {translated_text}")

    # TTS
    speakers = {'en': 'MS', 'ma': 'msFemale', 'ta': 'ta_female1'}
    tts_params = {
        'language': target_lang,
        'speed': 1.1,
        # Any language without its own entry uses the Chinese voice.
        'speaker': speakers.get(target_lang, 'childChinese2'),
        'text': translated_text
    }

    try:
        tts_response = requests.get(TTS_SPEAK_SERVICE, params=tts_params, timeout=(10, 120))
    except requests.RequestException as exc:
        print(f"TTS request failed: {exc}")
        return transcription, translated_text, None

    if tts_response.status_code == 200:
        audio_file = tts_response.text.strip()
        audio_url = f"{TTS_WAVE_SERVICE}?file={audio_file}"
        return transcription, translated_text, audio_url

    # Return no audio rather than an error string: this value is bound to an
    # audio output, where a plain string would be taken for a file path.
    print("TTS failed:", tts_response.status_code)
    return transcription, translated_text, None

def check_password(password):
    """Return True when ``password`` matches the configured developer password.

    Access is always denied when no developer password is configured (unset
    or empty), so a missing ``DEV_PWD`` can never be matched by a missing or
    empty input.

    Args:
        password: The password entered by the user.

    Returns:
        ``True`` on a match, ``False`` otherwise.
    """
    import hmac  # imported here so the check needs no change to the file header

    if not DEVELOPER_PASSWORD or not isinstance(password, str):
        return False

    # compare_digest takes time independent of where the inputs differ;
    # bytes are used because it rejects non-ASCII str arguments.
    return hmac.compare_digest(
        password.encode("utf-8"),
        DEVELOPER_PASSWORD.encode("utf-8"),
    )

def user_interface(audio, source_lang, target_lang, youtube_url):
    """Run the full pipeline for the User Mode tab.

    Forwards all four inputs to ``transcribe_and_speak`` and returns its
    three results as a ``(transcription, translation, audio_url)`` tuple.
    """
    outcome = transcribe_and_speak(audio, source_lang, target_lang, youtube_url)
    text, translation, speech_url = outcome
    return text, translation, speech_url

# Build the Gradio UI: a "User Mode" tab open to every visitor and a
# "Developer Mode" tab whose controls stay hidden until a password is accepted.
with gr.Blocks() as demo:
    gr.Markdown("# Speech Translation")
    
    with gr.Tab("User Mode"):
        gr.Markdown("Speak into the microphone, upload an audio file, or provide a YouTube URL. The app will translate and speak it back to you.")
        
        # Inputs: recorded/uploaded audio (passed on as a file path), an
        # optional YouTube URL, and the source/target language codes.
        with gr.Row():
            user_audio_input = gr.Audio(sources=["microphone", "upload"], type="filepath")
            user_youtube_url = gr.Textbox(label="YouTube URL (optional)")
            user_source_lang = gr.Dropdown(choices=["en", "ma", "ta", "zh"], label="Source Language", value="en")
            user_target_lang = gr.Dropdown(choices=["en", "ma", "ta", "zh"], label="Target Language", value="zh")
        
        # The button starts disabled; update_button_state enables it once
        # there is something to process.
        with gr.Row():
            user_button = gr.Button("Translate and Speak", interactive=False)
        
        with gr.Row():
            user_transcription_output = gr.Textbox(label="Transcription")
            user_translation_output = gr.Textbox(label="Translation")
            user_audio_output = gr.Audio(label="Translated Speech")
        
        def update_button_state(audio, youtube_url):
            """Return a button that is interactive only if audio or a URL is present."""
            print(audio, youtube_url)
            return gr.Button(interactive=bool(audio) or bool(youtube_url))

        # Re-evaluate the button state whenever either input changes.
        user_audio_input.change(
            fn=update_button_state,
            inputs=[user_audio_input, user_youtube_url],
            outputs=user_button
        )
        user_youtube_url.change(
            fn=update_button_state,
            inputs=[user_audio_input, user_youtube_url],
            outputs=user_button
        )
        
        user_button.click(
            fn=user_interface,
            inputs=[user_audio_input, user_source_lang, user_target_lang, user_youtube_url],
            outputs=[user_transcription_output, user_translation_output, user_audio_output]
        )
    
    with gr.Tab("Developer Mode"):
        password_input = gr.Textbox(type="password", label="Enter Developer Password")
        login_button = gr.Button("Login")
        login_error = gr.Markdown(visible=False)
        
        # Container for the developer controls; created hidden and made
        # visible by login() after a correct password.
        dev_interface = gr.Column(visible=False)
        
        with dev_interface:
            gr.Markdown("Developer Mode: Transcription, Translation, and TTS")
            
            with gr.Row():
                dev_audio_input = gr.Audio(sources=["microphone", "upload"], type="filepath")
                dev_source_lang = gr.Dropdown(choices=["en", "ma", "ta", "zh"], label="Source Language", value="en")
                dev_target_lang = gr.Dropdown(choices=["en", "ma", "ta", "zh"], label="Target Language", value="zh")
            
            with gr.Row():
                dev_button = gr.Button("Transcribe, Translate, and Speak")
            
            with gr.Row():
                dev_text_output = gr.Textbox(label="Transcription")
            
            with gr.Row():
                dev_translation_output = gr.Textbox(label="Translation")
            
            with gr.Row():
                dev_audio_output = gr.Audio(label="Translated Speech")
            
            # Only three inputs are wired, so transcribe_and_speak runs with
            # its default youtube_url=None on this tab.
            dev_button.click(
                fn=transcribe_and_speak,
                inputs=[dev_audio_input, dev_source_lang, dev_target_lang],
                outputs=[dev_text_output, dev_translation_output, dev_audio_output]
            )
        
        def login(password):
            """Show the developer controls on a correct password, else show an error."""
            if check_password(password):
                return gr.Column(visible=True), gr.Markdown(visible=False)
            else:
                return gr.Column(visible=False), gr.Markdown("Incorrect password. Please try again.", visible=True)
        
        login_button.click(
            fn=login,
            inputs=[password_input],
            outputs=[dev_interface, login_error]
        )

# Start the app behind a login whose credentials come from the DEV_USER and
# DEV_PWD environment variables (DEV_PWD is also the developer-mode password).
demo.launch(auth=(os.getenv("DEV_USER"), os.getenv("DEV_PWD")))