|
import os
import random
import textwrap

import gradio as gr
import pyttsx3
import speech_recognition as sr
from huggingface_hub import InferenceClient
|
|
|
|
|
# Speech-to-text recognizer and text-to-speech engine, created once at import
# time and shared by the helper functions below.
recognizer = sr.Recognizer()
tts_engine = pyttsx3.init()

# Model queried through the Hugging Face inference client.
model = "mistralai/Mixtral-8x7B-Instruct-v0.1"
client = InferenceClient(model)

# Instruction text that format_prompt_mixtral() inserts ahead of the conversation.
system_prompt_text = (
    "You are a smart and helpful co-worker of Thailand based multi-national company PTT, "
    "and PTTEP. You help with any kind of request and provide a detailed answer to the question. "
    "But if you are asked about something unethical or dangerous, you must refuse and provide a safe and respectful way to handle that."
)

# Reference document whose text is placed at the start of every prompt.
with open("info.md", encoding="utf-8") as file:
    info_md_content = file.read()

# Break the document into pieces of at most `chunk_size` characters.
# NOTE: textwrap.wrap() replaces whitespace (including newlines) with single
# spaces by default, so the original line structure of info.md is not kept.
chunk_size = 2500
info_md_chunks = textwrap.wrap(info_md_content, width=chunk_size)
|
|
|
def get_all_chunks(chunks):
    """Return every chunk merged into one string, separated by blank lines.

    Args:
        chunks: Iterable of strings.

    Returns:
        The chunks joined with ``"\\n\\n"``; an empty string for no chunks.
    """
    separator = "\n\n"
    return separator.join(chunks)
|
|
|
def format_prompt_mixtral(message, history, info_md_chunks):
    """Assemble the full text prompt sent to the model for one request.

    The prompt consists of, in order: ``<s>``, the joined reference chunks,
    the system prompt, each past exchange rendered as
    ``[INST] user [/INST] bot</s> ``, and finally the new message wrapped in
    ``[INST] ... [/INST]``.

    Args:
        message: The new user message.
        history: Sequence of ``(user_prompt, bot_response)`` pairs; an empty
            or ``None`` history contributes nothing.
        info_md_chunks: Text chunks placed at the start of the prompt.

    Returns:
        The assembled prompt string.
    """
    pieces = [
        "<s>",
        f"{get_all_chunks(info_md_chunks)}\n\n",
        f"{system_prompt_text}\n\n",
    ]
    # A falsy history (None or empty) is treated as "no earlier turns".
    for user_prompt, bot_response in history or ():
        pieces.append(f"[INST] {user_prompt} [/INST] {bot_response}</s> ")
    pieces.append(f"[INST] {message} [/INST]")
    return "".join(pieces)
|
|
|
def chat_inf(prompt, history, seed, temp, tokens, top_p, rep_p):
    """Stream a model reply to ``prompt`` and yield chat-history snapshots.

    Args:
        prompt: The user's message.
        history: Earlier ``(user, bot)`` exchanges; may be empty or ``None``.
            The list passed in is not modified.
        seed: Sampling seed forwarded to the inference client.
        temp: Sampling temperature.
        tokens: Maximum number of new tokens to generate.
        top_p: Nucleus-sampling probability mass.
        rep_p: Repetition penalty.

    Yields:
        A new list holding the earlier exchanges followed by
        ``(prompt, reply_so_far)``. One snapshot is yielded per streamed
        token, then a final one with the complete reply (also emitted when
        the stream produced no tokens at all).
    """
    # Copy so the caller's list is never mutated, and so None is accepted.
    past = list(history) if history else []

    generate_kwargs = dict(
        temperature=temp,
        max_new_tokens=tokens,
        top_p=top_p,
        repetition_penalty=rep_p,
        do_sample=True,
        seed=seed,
    )

    formatted_prompt = format_prompt_mixtral(prompt, past, info_md_chunks)
    stream = client.text_generation(
        formatted_prompt,
        **generate_kwargs,
        stream=True,
        details=True,
        return_full_text=False,
    )

    output = ""
    for response in stream:
        output += response.token.text
        # Include the earlier turns so a consumer rendering each snapshot
        # keeps showing the whole conversation while the reply streams in.
        yield past + [(prompt, output)]

    yield past + [(prompt, output)]
|
|
|
def clear_fn():
    """Return a ``(None, None)`` pair, one value per component being reset."""
    cleared = (None, None)
    return cleared
|
|
|
# Initial value for the "Seed" slider; drawn once when the module is loaded.
rand_val = random.randint(1, 1_111_111_111_111_111)
|
|
|
def check_rand(inp, val):
    """Build a "Seed" slider carrying either a fresh random seed or ``val``.

    Args:
        inp: When truthy, a new random seed is drawn; otherwise ``val`` is kept.
        val: Current seed value; converted with ``int()`` when it is kept.

    Returns:
        A ``gr.Slider`` labelled "Seed" whose value is the chosen seed.
    """
    upper_bound = 1111111111111111
    chosen_seed = random.randint(1, upper_bound) if inp else int(val)
    return gr.Slider(label="Seed", minimum=1, maximum=upper_bound, value=chosen_seed)
|
|
|
def recognize_speech(audio):
    """Transcribe an audio file to text using Google's speech recognition.

    Args:
        audio: Path of the audio file to transcribe; may be ``None`` or
            empty when nothing was recorded.

    Returns:
        The recognised text on success. On failure, a human-readable
        message that starts with ``"Sorry, "`` (no audio, or speech not
        understood) or ``"Error: "`` (recognition service unreachable).
    """
    # Without a file there is nothing to open; report it instead of letting
    # sr.AudioFile fail on the missing path.
    if not audio:
        return "Sorry, no audio was provided."

    # Keep the file open only while its samples are read; the recognition
    # request below works on the in-memory audio data.
    with sr.AudioFile(audio) as source:
        audio_data = recognizer.record(source)

    try:
        return recognizer.recognize_google(audio_data)
    except sr.UnknownValueError:
        return "Sorry, I could not understand the audio."
    except sr.RequestError:
        return "Error: Could not request results from the speech recognition service."
|
|
|
def speak_text(text):
    """Synthesise ``text`` into the file ``output.mp3`` with the shared TTS engine.

    Args:
        text: The text to convert to speech.
    """
    # NOTE(review): the file is named .mp3, but the audio format actually
    # written is presumably decided by the pyttsx3 driver in use -- confirm.
    target_file = 'output.mp3'
    tts_engine.save_to_file(text, target_file)
    # Process the queued save command; returns once the queue is empty.
    tts_engine.runAndWait()
|
|
|
# User interface: a chat window, an audio input with Chat/Stop/Clear buttons,
# generation settings, and an audio player for the spoken reply.
with gr.Blocks() as app:
    gr.HTML("""<center><h1 style='font-size:xx-large;'>PTT Chatbot</h1><br><h3>running on Huggingface Inference</h3><br><h7>EXPERIMENTAL</center>""")

    with gr.Row():
        chat = gr.Chatbot(height=500)

    with gr.Group():
        with gr.Row():
            # Left column: voice input and action buttons.
            with gr.Column(scale=3):
                inp = gr.Audio(type="filepath")
                with gr.Row():
                    with gr.Column(scale=2):
                        btn = gr.Button("Chat")
                    with gr.Column(scale=1):
                        with gr.Group():
                            stop_btn = gr.Button("Stop")
                            clear_btn = gr.Button("Clear")
            # Right column: sampling / generation settings.
            with gr.Column(scale=1):
                with gr.Group():
                    rand = gr.Checkbox(label="Random Seed", value=True)
                    seed = gr.Slider(label="Seed", minimum=1, maximum=1111111111111111, step=1, value=rand_val)
                    tokens = gr.Slider(label="Max new tokens", value=3840, minimum=0, maximum=8000, step=64, interactive=True, visible=True, info="The maximum number of tokens")
                    temp = gr.Slider(label="Temperature", step=0.01, minimum=0.01, maximum=1.0, value=0.9)
                    top_p = gr.Slider(label="Top-P", step=0.01, minimum=0.01, maximum=1.0, value=0.9)
                    rep_p = gr.Slider(label="Repetition Penalty", step=0.1, minimum=0.1, maximum=2.0, value=1.0)

    hid1 = gr.Number(value=1, visible=False)

    output_audio = gr.Audio(label="Output Audio", type="filepath", interactive=False)

    def handle_chat(audio_input, chat_history, seed, temp, tokens, top_p, rep_p):
        """Run one voice turn: transcribe, query the model, speak the reply.

        Args:
            audio_input: Path of the recorded/uploaded audio, or ``None``.
            chat_history: Current chatbot history as ``(user, bot)`` pairs;
                may be ``None``. It is not modified.
            seed, temp, tokens, top_p, rep_p: Generation settings forwarded
                unchanged to ``chat_inf``.

        Returns:
            A 3-tuple matching the outputs ``[chat, inp, output_audio]``:
            the updated history, ``None`` to reset the audio input, and the
            path of the synthesised reply (``None`` when nothing was spoken).
        """
        history = list(chat_history) if chat_history else []

        if not audio_input:
            gr.Warning("Sorry, no audio was provided.")
            return history, None, None

        user_message = recognize_speech(audio_input)
        # recognize_speech signals failure through messages beginning with
        # "Sorry, " or "Error: ". Surface those as a warning rather than
        # sending them to the model or into the audio input component.
        if user_message.startswith(("Sorry, ", "Error: ")):
            gr.Warning(user_message)
            return history, None, None

        # Drain the whole stream; the last snapshot carries the full reply.
        # chat_inf gets its own copy of the history so that any in-place
        # change it makes cannot duplicate the exchange appended below.
        latest = None
        for latest in chat_inf(user_message, list(history), seed, temp, tokens, top_p, rep_p):
            pass
        response = latest[-1][1] if latest else ""

        updated_history = history + [(user_message, response)]
        if not response.strip():
            # Nothing to read aloud.
            return updated_history, None, None

        speak_text(response)
        return updated_history, None, 'output.mp3'

    # Refresh the seed first (honouring the "Random Seed" checkbox), then run
    # the chat turn with the updated slider value.
    go = btn.click(check_rand, [rand, seed], seed).then(
        handle_chat,
        [inp, chat, seed, temp, tokens, top_p, rep_p],
        [chat, inp, output_audio],
    )

    stop_btn.click(None, None, None, cancels=[go])
    clear_btn.click(clear_fn, None, [inp, chat])
|
|
|
# Login credentials may be supplied through the environment so they do not
# have to live in the source; the previous hard-coded values remain as
# fallbacks so existing deployments keep working.
# NOTE(review): share=True publishes the app through a public link -- set
# CHATBOT_AUTH_USER / CHATBOT_AUTH_PASSWORD for any real deployment.
auth_user = os.environ.get("CHATBOT_AUTH_USER", "admin")
auth_password = os.environ.get("CHATBOT_AUTH_PASSWORD", "0112358")

app.queue(default_concurrency_limit=10).launch(share=True, auth=(auth_user, auth_password))
|
|
|
|