Job-Interview / splitgpt.py
ruslanmv's picture
Upload splitgpt.py
d53ccad verified
import os
import json
from dotenv import load_dotenv
import fitz # PyMuPDF
from langchain_openai import ChatOpenAI # Correct import from langchain-openai
from langchain.schema import HumanMessage, SystemMessage # For creating structured chat messages
QUESTIONS_PATH = "questions.json"
# Load environment variables
load_dotenv()
def split_text_into_chunks(text: str, chunk_size: int) -> list:
"""
Splits the text into chunks of a specified maximum size.
"""
# Trim the text to remove leading/trailing whitespace and reduce multiple spaces to a single space
cleaned_text = " ".join(text.split())
words = cleaned_text.split(" ")
chunks = []
current_chunk = []
current_length = 0
for word in words:
if current_length + len(word) + 1 > chunk_size:
chunks.append(" ".join(current_chunk))
current_chunk = [word]
current_length = len(word)
else:
current_chunk.append(word)
current_length += len(word) + 1
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
def distribute_questions_across_chunks(n_chunks: int, n_questions: int) -> list:
"""
Distributes a specified number of questions across a specified number of chunks.
"""
questions_per_chunk = [1] * min(n_chunks, n_questions)
remaining_questions = n_questions - len(questions_per_chunk)
if remaining_questions > 0:
for i in range(len(questions_per_chunk)):
if remaining_questions == 0:
break
questions_per_chunk[i] += 1
remaining_questions -= 1
while len(questions_per_chunk) < n_chunks:
questions_per_chunk.append(0)
return questions_per_chunk
def extract_text_from_pdf(pdf_path):
text = ""
try:
print(f"[DEBUG] Opening PDF: {pdf_path}")
with fitz.open(pdf_path) as pdf:
print(f"[DEBUG] Extracting text from PDF: {pdf_path}")
for page in pdf:
text += page.get_text()
except Exception as e:
print(f"Error reading PDF: {e}")
raise RuntimeError("Unable to extract text from PDF.")
return text
def generate_questions_from_text(text, n_questions=5):
openai_api_key = os.getenv("OPENAI_API_KEY")
if not openai_api_key:
raise RuntimeError(
"OpenAI API key not found. Please add it to your .env file as OPENAI_API_KEY."
)
chat = ChatOpenAI(
openai_api_key=openai_api_key, model="gpt-4", temperature=0.7, max_tokens=750
)
messages = [
SystemMessage(
content="You are an expert interviewer who generates concise technical interview questions. Do not enumerate the questions. Answer only with questions."
),
HumanMessage(
content=f"Based on the following content, generate {n_questions} technical interview questions:\n{text}"
),
]
try:
print(f"[DEBUG] Sending request to OpenAI with {n_questions} questions.")
response = chat.invoke(messages)
questions = response.content.strip().split("\n\n")
questions = [q.strip() for q in questions if q.strip()]
print(f"[DEBUG] Raw questions from LLM: {questions}")
formatted_questions = []
for i, q in enumerate(questions):
formatted_questions.append(f"Question {i+1}: {q}")
print(f"[DEBUG] Formatted questions: {formatted_questions}")
return formatted_questions
except Exception as e:
print(f"[ERROR] Failed to generate questions: {e}")
return ["An error occurred while generating questions."]
def save_questions(questions):
with open(QUESTIONS_PATH, "w") as f:
json.dump(questions, f, indent=4)
import os
import json
import re
def generate_and_save_questions_from_pdf3(pdf_path, total_questions=5):
print(f"[INFO] Generating questions from PDF: {pdf_path}")
print(f"[DEBUG] Number of total questions to generate: {total_questions}")
if not os.path.exists(pdf_path):
yield "❌ Error: PDF file not found.", []
return
yield "πŸ“„ PDF uploaded successfully. Processing started...", []
try:
# 1. Extract text from the PDF
pdf_text = extract_text_from_pdf(pdf_path)
if not pdf_text.strip():
yield "❌ Error: The PDF content is empty or could not be read.", []
return
# 2. Split the PDF content into chunks
chunk_size = 2000 # Adjust as necessary
chunks = split_text_into_chunks(pdf_text, chunk_size)
n_chunks = len(chunks)
yield f"πŸ”„ Splitting text into {n_chunks} chunks...", []
# 3. Distribute total_questions evenly across the chunks
base = total_questions // n_chunks
remainder = total_questions % n_chunks
questions_per_chunk = [base] * n_chunks
for i in range(remainder):
questions_per_chunk[i] += 1
print(f"[DEBUG] Questions per chunk distribution: {questions_per_chunk}")
combined_questions = []
# Helper function to split any chunk's output into individual questions
def split_into_individual_questions(text_block):
"""
Attempts to split a text block that might contain multiple questions
(like '1. Some question? 2. Another question?') into separate items.
"""
# 1) Remove any "Question X:" prefix (e.g., "Question 1: ")
text_block = re.sub(r'Question\s*\d+\s*:\s*', '', text_block, flags=re.IGNORECASE)
# 2) Split on patterns like "1. Something", "2. Something"
# This looks for one or more digits, then a dot, then whitespace: "(\d+\.\s+)"
splitted = re.split(r'\d+\.\s+', text_block.strip())
# 3) Clean up and filter out empty items
splitted = [s.strip() for s in splitted if s.strip()]
return splitted
# 4. Process each chunk and generate questions
for i, (chunk, n_questions) in enumerate(zip(chunks, questions_per_chunk)):
yield f"πŸ”„ Processing chunk {i+1} of {n_chunks} with {n_questions} questions...", []
if n_questions > 0:
# This function returns either a list of questions or a single string with multiple questions
questions_output = generate_questions_from_text(chunk, n_questions=n_questions)
if isinstance(questions_output, list):
# If it's already a list, we further ensure each item is split if needed
for item in questions_output:
combined_questions.extend(split_into_individual_questions(str(item)))
else:
# If it's a single string, we split it
combined_questions.extend(split_into_individual_questions(str(questions_output)))
# 5. Check if the number of generated questions matches the desired total
if len(combined_questions) != total_questions:
yield f"⚠️ Warning: Expected {total_questions}, but generated {len(combined_questions)}.", []
yield f"βœ… Total {len(combined_questions)} questions generated. Saving questions...", []
# 6. Save the combined questions in `generated_questions_from_pdf.json`
detailed_save_path = "generated_questions_from_pdf.json"
with open(detailed_save_path, "w", encoding="utf-8") as f:
json.dump({"questions": combined_questions}, f, indent=4, ensure_ascii=False)
# 7. Save only the questions (overwrite `questions.json` if it already exists)
#simple_save_path = "questions.json"
#with open(simple_save_path, "w", encoding="utf-8") as f:
# json.dump(combined_questions, f, indent=4, ensure_ascii=False)
save_questions(combined_questions)
print(f"[INFO] Questions saved to {QUESTIONS_PATH}")
yield "βœ… PDF processing complete. Questions saved successfully!", combined_questions
except Exception as e:
error_message = f"❌ Error during question generation: {str(e)}"
print(f"[ERROR] {error_message}")
yield error_message, []
def generate_questions_from_job_description_old(job_description, num_questions):
print(f"[DEBUG] Generating {num_questions} questions from job description.")
if not job_description.strip():
return "❌ Error: Job description is empty.", []
try:
questions = generate_questions_from_text(job_description, num_questions=num_questions)
if not questions:
return "❌ Error: No questions generated.", []
return "βœ… Questions generated successfully!", questions
except Exception as e:
error_message = f"❌ Error during question generation: {str(e)}"
print(f"[ERROR] {error_message}")
return error_message, []
import os
import json
import math
import re
import os
import json
import math
import re
def distribute_questions_evenly(total_questions, n_chunks):
base = total_questions // n_chunks
remainder = total_questions % n_chunks
questions_per_chunk = [base] * n_chunks
# Distribute the remainder by incrementing the first `remainder` chunks
for i in range(remainder):
questions_per_chunk[i] += 1
return questions_per_chunk
def generate_questions_from_job_description(job_description, total_questions=5):
print(f"[DEBUG] Generating {total_questions} questions from job description.")
if not job_description.strip():
return "❌ Error: Job description is empty.", []
try:
# 1. Split the job description into chunks
chunk_size = 2000 # Adjust as necessary
chunks = split_text_into_chunks(job_description, chunk_size)
n_chunks = len(chunks)
print(f"[DEBUG] Splitting text into {n_chunks} chunks...")
# 2. Distribute total_questions evenly across the chunks
questions_per_chunk = distribute_questions_evenly(total_questions, n_chunks)
print(f"[DEBUG] Questions per chunk distribution: {questions_per_chunk}")
combined_questions = []
# Helper function to split any chunk's output into individual questions
def split_into_individual_questions(text_block):
"""
Attempts to split a text block that might contain multiple questions
(like '1. Some question? 2. Another question?') into separate items.
"""
# Remove any "Question X:" prefix (e.g., "Question 1: ")
text_block = re.sub(r'Question\s*\d+\s*:\s*', '', text_block, flags=re.IGNORECASE)
# Split on patterns like "1. Something", "2. Something"
splitted = re.split(r'\d+\.\s+', text_block.strip())
# Clean up and filter out empty items
return [s.strip() for s in splitted if s.strip()]
# 3. Process each chunk and generate questions
for i, (chunk, n_questions) in enumerate(zip(chunks, questions_per_chunk)):
print(f"[DEBUG] Processing chunk {i+1} of {n_chunks} with {n_questions} questions...")
if n_questions > 0:
questions_output = generate_questions_from_text(chunk, n_questions=n_questions)
if isinstance(questions_output, list):
for item in questions_output:
combined_questions.extend(split_into_individual_questions(str(item)))
else:
combined_questions.extend(split_into_individual_questions(str(questions_output)))
if len(combined_questions) != total_questions:
print(f"⚠️ Warning: Expected {total_questions}, but generated {len(combined_questions)}.")
print(f"βœ… Total {len(combined_questions)} questions generated. Saving questions...")
# Save the combined questions in `generated_questions_from_job_description.json`
detailed_save_path = "generated_questions_from_job_description.json"
with open(detailed_save_path, "w", encoding="utf-8") as f:
json.dump({"questions": combined_questions}, f, indent=4, ensure_ascii=False)
# Save only the questions (overwrite `questions.json` if it already exists)
#simple_save_path = "questions.json"
#with open(simple_save_path, "w", encoding="utf-8") as f:
# json.dump(combined_questions, f, indent=4, ensure_ascii=False)
save_questions(combined_questions)
print(f"[INFO] Questions saved to {QUESTIONS_PATH}")
return "βœ… Job description processing complete. Questions saved successfully!", combined_questions
except Exception as e:
error_message = f"❌ Error during question generation: {str(e)}"
print(f"[ERROR] {error_message}")
return error_message, []
if __name__ == "__main__":
pdf_path = "professional_machine_learning_engineer_exam_guide_english.pdf" # Replace with your PDF path
try:
# Using the generator to get the results
for status, questions in generate_and_save_questions_from_pdf3(pdf_path, total_questions=5):
print(status) # Print the status message
if questions:
print(json.dumps(questions, indent=2)) # Print the questions if available
except Exception as e:
print(f"Failed to generate questions: {e}")