Job-Interview / backend2.py
ruslanmv's picture
Upload backend2.py
7552eec verified
raw
history blame
18.5 kB
import gradio as gr
import tempfile
import os
import json
from io import BytesIO
from collections import deque
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain.schema import HumanMessage, SystemMessage
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from openai import OpenAI
import time
# Imports - Keep only what's actually used. I've organized them.
from generatorgr import (
generate_and_save_questions as generate_questions_manager,
update_max_questions,
)
from generator import (
PROFESSIONS_FILE,
TYPES_FILE,
OUTPUT_FILE,
load_json_data,
generate_questions, # Keep if needed, but ensure it exists
)
from splitgpt import (
generate_and_save_questions_from_pdf3,
generate_questions_from_job_description,
)
from ai_config import convert_text_to_speech
from knowledge_retrieval import get_next_response, get_initial_question
from prompt_instructions import get_interview_initial_message_hr
from settings import language
from utils import save_interview_history
from tools import store_interview_report, read_questions_from_json
load_dotenv() # Load .env variables
class InterviewState:
"""Manages the state of the interview."""
def __init__(self):
self.reset()
def reset(self, voice="alloy"):
self.question_count = 0
# Corrected history format: List of [user_msg, bot_msg] pairs.
self.interview_history = []
self.selected_interviewer = voice
self.interview_finished = False
self.audio_enabled = True
self.temp_audio_files = []
self.initial_audio_path = None
self.interview_chain = None
self.report_chain = None
self.current_questions = []
self.history_limit = 5 # Limit the history (good for performance)
def get_voice_setting(self):
return self.selected_interviewer
interview_state = InterviewState()
def initialize_chains():
"""Initializes the LangChain LLM chains."""
openai_api_key = os.getenv("OPENAI_API_KEY")
if not openai_api_key:
raise ValueError(
"OpenAI API key not found. Set it in your .env file."
)
llm = ChatOpenAI(
openai_api_key=openai_api_key, model="gpt-4", temperature=0.7, max_tokens=750
)
interview_prompt_template = """
You are Sarah, an empathetic HR interviewer conducting a technical interview in {language}.
Current Question: {current_question}
Previous conversation history:
{history}
User's response to current question: {user_input}
Your response:
"""
interview_prompt = PromptTemplate(
input_variables=["language", "current_question", "history", "user_input"],
template=interview_prompt_template,
)
interview_state.interview_chain = LLMChain(prompt=interview_prompt, llm=llm)
report_prompt_template = """
You are an HR assistant tasked with generating a concise report based on the following interview transcript in {language}:
{interview_transcript}
Summarize the candidate's performance, highlighting strengths and areas for improvement. Keep it to 3-5 sentences.
Report:
"""
report_prompt = PromptTemplate(
input_variables=["language", "interview_transcript"], template=report_prompt_template
)
interview_state.report_chain = LLMChain(prompt=report_prompt, llm=llm)
def generate_report(report_chain, history, language):
"""Generates a concise interview report."""
if report_chain is None:
raise ValueError("Report chain is not initialized.")
# Convert the Gradio-style history to a plain text transcript.
transcript = ""
for user_msg, bot_msg in history:
transcript += f"User: {user_msg}\nAssistant: {bot_msg}\n"
report = report_chain.invoke({"language": language, "interview_transcript": transcript})
return report["text"]
def reset_interview_action(voice):
"""Resets the interview state and prepares the initial message."""
interview_state.reset(voice)
initialize_chains()
print(f"[DEBUG] Interview reset. Voice: {voice}")
initial_message_text = get_interview_initial_message_hr(5) # Get initial message
# Convert to speech and save to a temporary file.
initial_audio_buffer = BytesIO()
convert_text_to_speech(initial_message_text, initial_audio_buffer, voice)
initial_audio_buffer.seek(0)
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_file:
temp_audio_path = temp_file.name
temp_file.write(initial_audio_buffer.getvalue())
interview_state.temp_audio_files.append(temp_audio_path)
print(f"[DEBUG] Audio file saved at {temp_audio_path}")
# Return values in the correct format for Gradio.
return (
[[None, initial_message_text]], # [user_msg, bot_msg]. User starts with None.
gr.Audio(value=temp_audio_path, autoplay=True),
gr.Textbox(interactive=True), # Enable the textbox
)
def start_interview():
"""Starts the interview (used by the Gradio button)."""
return reset_interview_action(interview_state.selected_interviewer)
def construct_history_string(history):
"""Constructs a history string for the LangChain prompt."""
history_str = ""
for user_msg, bot_msg in history:
history_str += f"User: {user_msg}\nAssistant: {bot_msg}\n"
return history_str
def bot_response(chatbot, user_message_text):
"""Handles the bot's response logic."""
voice = interview_state.get_voice_setting()
history_str = construct_history_string(chatbot)
if interview_state.question_count < len(interview_state.current_questions):
current_question = interview_state.current_questions[interview_state.question_count]
response = interview_state.interview_chain.invoke(
{
"language": language,
"current_question": current_question,
"history": history_str,
"user_input": user_message_text,
}
)["text"]
interview_state.question_count += 1
# Text-to-speech
audio_buffer = BytesIO()
convert_text_to_speech(response, audio_buffer, voice)
audio_buffer.seek(0)
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_file:
temp_audio_path = temp_file.name
temp_file.write(audio_buffer.getvalue())
interview_state.temp_audio_files.append(temp_audio_path)
# Update chatbot history in the correct format.
chatbot.append([user_message_text, response]) # Add user and bot messages
return chatbot, gr.Audio(value=temp_audio_path, autoplay=True), gr.File(visible=False)
else: # Interview finished
interview_state.interview_finished = True
conclusion_message = "Thank you for your time. The interview is complete. Please review your report."
# Text-to-speech for conclusion
conclusion_audio_buffer = BytesIO()
convert_text_to_speech(conclusion_message, conclusion_audio_buffer, voice)
conclusion_audio_buffer.seek(0)
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_conclusion_file:
temp_conclusion_audio_path = temp_conclusion_file.name
temp_conclusion_file.write(conclusion_audio_buffer.getvalue())
interview_state.temp_audio_files.append(temp_conclusion_audio_path)
# Update chatbot history.
chatbot.append([user_message_text, conclusion_message])
# Generate and save report.
report_content = generate_report(
interview_state.report_chain, chatbot, language
) # Pass Gradio history
txt_path = save_interview_history(
[f"User: {user}\nAssistant: {bot}" for user, bot in chatbot], language
) # Create plain text history
report_file_path = store_interview_report(report_content)
print(f"[DEBUG] Interview report saved at: {report_file_path}")
return (
chatbot,
gr.Audio(value=temp_conclusion_audio_path, autoplay=True),
gr.File(visible=True, value=txt_path),
)
def convert_text_to_speech_updated(text, voice="alloy"):
"""Converts text to speech and returns the file path."""
try:
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
response = client.audio.speech.create(model="tts-1", voice=voice, input=text)
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
for chunk in response.iter_bytes():
tmp_file.write(chunk)
temp_audio_path = tmp_file.name
return temp_audio_path
except Exception as e:
print(f"Error in text-to-speech: {e}")
return None
def transcribe_audio(audio_file_path):
"""Transcribes audio to text."""
try:
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
with open(audio_file_path, "rb") as audio_file:
transcription = client.audio.transcriptions.create(
model="whisper-1", file=audio_file
)
return transcription.text
except Exception as e:
print(f"Error in transcription: {e}")
return ""
def conduct_interview_updated(questions, language="English", history_limit=5):
"""Conducts the interview (LangChain/OpenAI)."""
openai_api_key = os.getenv("OPENAI_API_KEY")
if not openai_api_key:
raise RuntimeError("OpenAI API key not found.")
chat = ChatOpenAI(
openai_api_key=openai_api_key, model="gpt-4o", temperature=0.7, max_tokens=750
)
conversation_history = deque(maxlen=history_limit) # For LangChain, not Gradio
system_prompt = (
f"You are Sarah, an empathetic HR interviewer conducting a technical interview in {language}. "
"Respond to user follow-up questions politely and concisely. Keep responses brief."
)
interview_data = [] # Store Q&A for potential later use
current_question_index = [0]
is_interview_finished = [False]
initial_message = (
"πŸ‘‹ Hi there, I'm Sarah, your friendly AI HR assistant! "
"I'll guide you through a series of interview questions. "
"Take your time."
)
final_message = "That wraps up our interview. Thank you for your responses!"
def interview_step(user_input, audio_input, history):
nonlocal current_question_index, is_interview_finished
if is_interview_finished[0]:
return history, "", None # No further interaction
if audio_input:
user_input = transcribe_audio(audio_input)
if not user_input:
history.append(["", "I couldn't understand your audio. Could you please repeat or type?"]) #Empty string "" so the user input is not None
audio_path = convert_text_to_speech_updated(history[-1][1]) #Access the content
return history, "", audio_path
if user_input.lower() in ["exit", "quit"]:
history.append(["", "The interview has ended. Thank you."])#Empty string "" so the user input is not None
is_interview_finished[0] = True
return history, "", None
# Crucial: Add USER INPUT to history *before* getting bot response.
history.append([user_input, ""]) # Add user input, bot response pending
question_text = questions[current_question_index[0]]
# Prepare history for LangChain (not Gradio chatbot format)
history_content = "\n".join(
[
f"Q: {entry['question']}\nA: {entry['answer']}"
for entry in conversation_history
]
)
combined_prompt = (
f"{system_prompt}\n\nPrevious conversation history:\n{history_content}\n\n"
f"Current question: {question_text}\nUser's input: {user_input}\n\n"
"Respond warmly."
)
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=combined_prompt),
]
response = chat.invoke(messages)
response_content = response.content.strip()
audio_path = convert_text_to_speech_updated(response_content)
conversation_history.append({"question": question_text, "answer": user_input})
interview_data.append({"question": question_text, "answer": user_input})
# Update Gradio-compatible history. Crucial for display.
history[-1][1] = response_content # Update the last entry with the bot's response
if current_question_index[0] + 1 < len(questions):
current_question_index[0] += 1
next_question = f"Next question: {questions[current_question_index[0]]}"
next_question_audio_path = convert_text_to_speech_updated(next_question)
# No need to add the "Next Question:" prompt to the displayed history.
# The bot will say it. Adding it here would cause a double entry.
return history, "", next_question_audio_path
else:
final_message_audio = convert_text_to_speech_updated(final_message)
history.append([None, final_message]) # Final message, no user input.
is_interview_finished[0] = True
return history, "", final_message_audio
return interview_step, initial_message, final_message
def launch_candidate_app_updated():
"""Launches the Gradio app for candidates."""
QUESTIONS_FILE_PATH = "questions.json"
try:
questions = read_questions_from_json(QUESTIONS_FILE_PATH)
if not questions:
raise ValueError("No questions found.")
except (FileNotFoundError, json.JSONDecodeError, ValueError) as e:
print(f"Error loading questions: {e}")
with gr.Blocks() as error_app:
gr.Markdown(f"# Error: {e}")
return error_app
interview_func, initial_message, _ = conduct_interview_updated(questions)
def start_interview_ui():
"""Starts the interview."""
history = []
initial_combined = (
initial_message + " Let's begin! Here's the first question: " + questions[0]
)
initial_audio_path = convert_text_to_speech_updated(initial_combined)
history.append(["", initial_combined]) # Correct format: [user, bot] Empty string for user.
return history, "", initial_audio_path, gr.Textbox(interactive=True) # Return interactive textbox
def clear_interview_ui():
"""Clears the interview and resets."""
# Recreate the object in order to clear the history of the interview
nonlocal interview_func, initial_message
interview_func, initial_message, _ = conduct_interview_updated(questions)
return [], "", None, gr.Textbox(interactive=True) # Return interactive textbox
def interview_step_wrapper(user_response, audio_response, history):
"""Wrapper for the interview step function."""
history, user_text, audio_path = interview_func(user_response, audio_response, history)
# Always return interactive=True, except when interview is finished
return history, "", audio_path, gr.Textbox(value=user_text if user_text is not None else "", interactive= True)
with gr.Blocks(title="AI HR Interview Assistant") as candidate_app:
gr.Markdown(
"<h1 style='text-align: center;'>πŸ‘‹ Welcome to Your AI HR Interview Assistant</h1>"
)
start_btn = gr.Button("Start Interview", variant="primary")
chatbot = gr.Chatbot(label="Interview Chat", height=650)
audio_input = gr.Audio(
sources=["microphone"], type="filepath", label="Record Your Answer"
)
user_input = gr.Textbox(
label="Your Response",
placeholder="Type your answer here or use the microphone...",
lines=1,
interactive=True, # Make the textbox interactive initially
)
audio_output = gr.Audio(label="Response Audio", autoplay=True)
with gr.Row():
submit_btn = gr.Button("Submit", variant="primary")
clear_btn = gr.Button("Clear Chat")
def on_enter_submit(history, user_response):
"""Handles submission when Enter is pressed."""
if not user_response.strip():
return history, "", None, gr.Textbox(interactive=True) # Prevent empty submissions, keep interactive
history, _, audio_path, new_textbox = interview_step_wrapper(
user_response, None, history
) # No audio on Enter
return history, "", audio_path, new_textbox
start_btn.click(
start_interview_ui, inputs=[], outputs=[chatbot, user_input, audio_output, user_input] # Include user_input as output
)
audio_input.stop_recording(
interview_step_wrapper,
inputs=[user_input, audio_input, chatbot],
outputs=[chatbot, user_input, audio_output, user_input], # Include user_input as output
)
submit_btn.click(
interview_step_wrapper,
inputs=[user_input, audio_input, chatbot],
outputs=[chatbot, user_input, audio_output, user_input], # Include user_input
)
user_input.submit(
on_enter_submit,
inputs=[chatbot, user_input],
outputs=[chatbot, user_input, audio_output, user_input], # Include user_input
)
clear_btn.click(
clear_interview_ui, inputs=[], outputs=[chatbot, user_input, audio_output, user_input] # Include user_input
)
return candidate_app
# --- (End of Candidate Interview Implementation) ---
def cleanup():
"""Cleans up temporary audio files."""
for audio_file in interview_state.temp_audio_files:
try:
if os.path.exists(audio_file):
os.unlink(audio_file)
except Exception as e:
print(f"Error deleting file {audio_file}: {e}")