Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import gradio as gr | |
import numpy as np | |
import os | |
import time | |
from itertools import chain | |
from typing import List, Dict, Generator, Optional, Tuple, Any | |
from functools import partial | |
from resources.data import fixed_messages, topic_lists, interview_types | |
from utils.ui import add_candidate_message, add_interviewer_message | |
from api.llm import LLMManager | |
from api.audio import TTSManager, STTManager | |
DEMO_MESSAGE: str = """<span style="color: red;"> | |
This service is running in demo mode with limited performance (e.g. slow voice recognition). For a better experience, run the service locally, refer to the Instruction tab for more details. | |
</span>""" | |
def send_request( | |
code: str, | |
previous_code: str, | |
chat_history: List[Dict[str, str]], | |
chat_display: List[List[Optional[str]]], | |
llm: LLMManager, | |
tts: Optional[TTSManager], | |
silent: Optional[bool] = False, | |
) -> Generator[Tuple[List[Dict[str, str]], List[List[Optional[str]]], str, bytes], None, None]: | |
""" | |
Send a request to the LLM and process the response. | |
Args: | |
code (str): Current code. | |
previous_code (str): Previous code. | |
chat_history (List[Dict[str, str]]): Current chat history. | |
chat_display (List[List[Optional[str]]]): Current chat display. | |
llm (LLMManager): LLM manager instance. | |
tts (Optional[TTSManager]): TTS manager instance. | |
silent (Optional[bool]): Whether to silence audio output. Defaults to False. | |
Yields: | |
Tuple[List[Dict[str, str]], List[List[Optional[str]]], str, bytes]: Updated chat history, chat display, code, and audio chunk. | |
""" | |
# TODO: Find the way to simplify it and remove duplication in logic | |
if silent is None: | |
silent = os.getenv("SILENT", False) | |
if chat_display[-1][0] is None and code == previous_code: | |
yield chat_history, chat_display, code, b"" | |
return | |
chat_history = llm.update_chat_history(code, previous_code, chat_history, chat_display) | |
original_len = len(chat_display) | |
chat_display.append([None, ""]) | |
text_chunks = [] | |
reply = llm.get_text(chat_history) | |
chat_history.append({"role": "assistant", "content": ""}) | |
audio_generator = iter(()) | |
has_text_item = True | |
has_audio_item = not silent | |
audio_created = 0 | |
is_notes = False | |
while has_text_item or has_audio_item: | |
try: | |
text_chunk = next(reply) | |
text_chunks.append(text_chunk) | |
has_text_item = True | |
except StopIteration: | |
has_text_item = False | |
chat_history[-1]["content"] = "".join(text_chunks) | |
if silent: | |
audio_chunk = b"" | |
else: | |
try: | |
audio_chunk = next(audio_generator) | |
has_audio_item = True | |
except StopIteration: | |
audio_chunk = b"" | |
has_audio_item = False | |
if has_text_item and not is_notes: | |
last_message = chat_display[-1][1] | |
last_message += text_chunk | |
split_notes = last_message.split("#NOTES#") | |
if len(split_notes) > 1: | |
is_notes = True | |
last_message = split_notes[0] | |
split_messages = last_message.split("\n\n") | |
chat_display[-1][1] = split_messages[0] | |
for m in split_messages[1:]: | |
chat_display.append([None, m]) | |
if not silent: | |
if len(chat_display) - original_len > audio_created + has_text_item: | |
audio_generator = chain(audio_generator, tts.read_text(chat_display[original_len + audio_created][1])) | |
audio_created += 1 | |
has_audio_item = True | |
yield chat_history, chat_display, code, audio_chunk | |
if chat_display and len(chat_display) > 1 and chat_display[-1][1] == "" and chat_display[-2][1]: | |
chat_display.pop() | |
yield chat_history, chat_display, code, b"" | |
def change_code_area(interview_type: str) -> gr.update: | |
""" | |
Update the code area based on the interview type. | |
Args: | |
interview_type (str): Type of interview. | |
Returns: | |
gr.update: Gradio update object for the code area. | |
""" | |
if interview_type == "coding": | |
return gr.update( | |
label="Please write your code here. You can use any language, but only Python syntax highlighting is available.", | |
language="python", | |
) | |
elif interview_type == "sql": | |
return gr.update( | |
label="Please write your query here.", | |
language="sql", | |
) | |
else: | |
return gr.update( | |
label="Please write any notes for your solution here.", | |
language=None, | |
) | |
def get_problem_solving_ui( | |
llm: LLMManager, tts: TTSManager, stt: STTManager, default_audio_params: Dict[str, Any], audio_output: gr.Audio | |
) -> gr.Tab: | |
""" | |
Create the problem-solving UI for the interview application. | |
Args: | |
llm (LLMManager): LLM manager instance. | |
tts (TTSManager): TTS manager instance. | |
stt (STTManager): STT manager instance. | |
default_audio_params (Dict[str, Any]): Default audio parameters. | |
audio_output (gr.Audio): Gradio audio output component. | |
Returns: | |
gr.Tab: Gradio tab containing the problem-solving UI. | |
""" | |
send_request_partial = partial(send_request, llm=llm, tts=tts) | |
with gr.Tab("Interview", render=False, elem_id=f"tab") as problem_tab: | |
if os.getenv("IS_DEMO"): | |
gr.Markdown(DEMO_MESSAGE) | |
chat_history = gr.State([]) | |
previous_code = gr.State("") | |
start_time = gr.State(None) | |
hi_markdown = gr.Markdown( | |
"<h2 style='text-align: center;'> Hi! I'm here to guide you through a practice session for your technical interview. Choose the interview settings to begin.</h2>\n" | |
) | |
# UI components for interview settings | |
with gr.Row() as init_acc: | |
with gr.Column(scale=3): | |
interview_type_select = gr.Dropdown( | |
show_label=False, | |
info="Type of the interview.", | |
choices=interview_types, | |
value="coding", | |
container=True, | |
allow_custom_value=False, | |
elem_id=f"interview_type_select", | |
scale=2, | |
) | |
difficulty_select = gr.Dropdown( | |
show_label=False, | |
info="Difficulty of the problem.", | |
choices=["Easy", "Medium", "Hard"], | |
value="Medium", | |
container=True, | |
allow_custom_value=True, | |
elem_id=f"difficulty_select", | |
scale=2, | |
) | |
topic_select = gr.Dropdown( | |
show_label=False, | |
info="Topic (you can type any value).", | |
choices=topic_lists[interview_type_select.value], | |
value=np.random.choice(topic_lists[interview_type_select.value]), | |
container=True, | |
allow_custom_value=True, | |
elem_id=f"topic_select", | |
scale=2, | |
) | |
with gr.Column(scale=4): | |
requirements = gr.Textbox( | |
label="Requirements", | |
show_label=False, | |
placeholder="Specify additional requirements if any.", | |
container=False, | |
lines=5, | |
elem_id=f"requirements", | |
) | |
with gr.Row(): | |
terms_checkbox = gr.Checkbox( | |
label="", | |
container=False, | |
value=not os.getenv("IS_DEMO", False), | |
interactive=True, | |
elem_id=f"terms_checkbox", | |
min_width=20, | |
) | |
with gr.Column(scale=100): | |
gr.Markdown( | |
"#### I agree to the [terms and conditions](https://github.com/IliaLarchenko/Interviewer?tab=readme-ov-file#important-legal-and-compliance-information)" | |
) | |
start_btn = gr.Button("Generate a problem", elem_id=f"start_btn", interactive=not os.getenv("IS_DEMO", False)) | |
# Problem statement and solution components | |
with gr.Accordion("Problem statement", open=True, visible=False) as problem_acc: | |
description = gr.Markdown(elem_id=f"problem_description", line_breaks=True) | |
with gr.Accordion("Solution", open=True, visible=False) as solution_acc: | |
with gr.Row() as content: | |
with gr.Column(scale=2): | |
code = gr.Code( | |
label="Please write your code here.", | |
language="python", | |
lines=46, | |
elem_id=f"code", | |
) | |
with gr.Column(scale=1): | |
end_btn = gr.Button("Finish the interview", interactive=False, variant="stop", elem_id=f"end_btn") | |
chat = gr.Chatbot(label="Chat", show_label=False, show_share_button=False, elem_id=f"chat") | |
audio_input = gr.Audio(interactive=False, **default_audio_params, elem_id=f"audio_input") | |
audio_buffer = gr.State(np.array([], dtype=np.int16)) | |
audio_to_transcribe = gr.State(np.array([], dtype=np.int16)) | |
with gr.Accordion("Feedback", open=True, visible=False) as feedback_acc: | |
interview_time = gr.Markdown() | |
feedback = gr.Markdown(elem_id=f"feedback", line_breaks=True) | |
# Event handlers | |
def start_timer(): | |
return time.time() | |
def get_duration_string(start_time): | |
if start_time is None: | |
duration_str = "" | |
else: | |
duration = int(time.time() - start_time) | |
minutes, seconds = divmod(duration, 60) | |
duration_str = f"Interview duration: {minutes} minutes, {seconds} seconds" | |
return duration_str | |
start_btn.click(fn=start_timer, outputs=[start_time]).success( | |
fn=add_interviewer_message(fixed_messages["start"]), inputs=[chat], outputs=[chat] | |
).success(fn=tts.read_last_message, inputs=[chat], outputs=[audio_output]).success( | |
fn=lambda: ( | |
gr.update(visible=False), | |
gr.update(interactive=False), | |
gr.update(interactive=False), | |
gr.update(interactive=False), | |
gr.update(visible=False), | |
), | |
outputs=[init_acc, start_btn, terms_checkbox, interview_type_select, hi_markdown], | |
).success( | |
fn=lambda: (gr.update(visible=True)), | |
outputs=[problem_acc], | |
).success( | |
fn=llm.get_problem, | |
inputs=[requirements, difficulty_select, topic_select, interview_type_select], | |
outputs=[description], | |
scroll_to_output=True, | |
).success( | |
fn=llm.init_bot, inputs=[description, interview_type_select], outputs=[chat_history] | |
).success( | |
fn=lambda: (gr.update(visible=True), gr.update(interactive=True), gr.update(interactive=True)), | |
outputs=[solution_acc, end_btn, audio_input], | |
) | |
end_btn.click(fn=lambda x: add_candidate_message("Let's stop here.", x), inputs=[chat], outputs=[chat]).success( | |
fn=add_interviewer_message(fixed_messages["end"]), | |
inputs=[chat], | |
outputs=[chat], | |
).success(fn=tts.read_last_message, inputs=[chat], outputs=[audio_output]).success( | |
fn=lambda: ( | |
gr.update(open=False), | |
gr.update(interactive=False), | |
gr.update(open=False), | |
gr.update(interactive=False), | |
), | |
outputs=[solution_acc, end_btn, problem_acc, audio_input], | |
).success( | |
fn=lambda: (gr.update(visible=True)), | |
outputs=[feedback_acc], | |
).success( | |
fn=llm.end_interview, inputs=[description, chat_history, interview_type_select], outputs=[feedback] | |
).success( | |
fn=get_duration_string, inputs=[start_time], outputs=[interview_time] | |
) | |
hidden_text = gr.State("") | |
is_transcribing = gr.State(False) | |
audio_input.stream( | |
stt.process_audio_chunk, | |
inputs=[audio_input, audio_buffer], | |
outputs=[audio_buffer, audio_to_transcribe], | |
).success(fn=lambda: True, outputs=[is_transcribing]).success( | |
fn=stt.transcribe_audio, inputs=[audio_to_transcribe, hidden_text], outputs=[hidden_text] | |
).success( | |
fn=stt.add_to_chat, inputs=[hidden_text, chat], outputs=[chat] | |
).success( | |
fn=lambda: False, outputs=[is_transcribing] | |
) | |
# We need to wait until the last chunk of audio is transcribed before sending the request | |
# I didn't find a native way of gradio to handle this, and used a workaround | |
WAIT_TIME = 3 | |
TIME_STEP = 0.3 | |
STEPS = int(WAIT_TIME / TIME_STEP) | |
stop_audio_recording = audio_input.stop_recording(fn=lambda: gr.update(visible=False), outputs=[audio_input]) | |
for _ in range(STEPS): | |
stop_audio_recording = stop_audio_recording.success(fn=lambda x: time.sleep(TIME_STEP) if x else None, inputs=[is_transcribing]) | |
stop_audio_recording.success( | |
fn=send_request_partial, | |
inputs=[code, previous_code, chat_history, chat], | |
outputs=[chat_history, chat, previous_code, audio_output], | |
show_progress="full", | |
).then(fn=lambda: (np.array([], dtype=np.int16), "", False), outputs=[audio_buffer, hidden_text, is_transcribing]).then( | |
fn=lambda: gr.update(visible=True), outputs=[audio_input] | |
) | |
interview_type_select.change( | |
fn=lambda x: gr.update(choices=topic_lists[x], value=np.random.choice(topic_lists[x])), | |
inputs=[interview_type_select], | |
outputs=[topic_select], | |
).success(fn=change_code_area, inputs=[interview_type_select], outputs=[code]) | |
terms_checkbox.change(fn=lambda x: gr.update(interactive=x), inputs=[terms_checkbox], outputs=[start_btn]) | |
return problem_tab | |