QGen / app.py
DevBM's picture
adding the options to choose between input text and upload pdf
e0e7fd6 verified
raw
history blame
18.5 kB
import streamlit as st
from transformers import T5ForConditionalGeneration, T5Tokenizer
import spacy
import nltk
from sklearn.feature_extraction.text import TfidfVectorizer
from rake_nltk import Rake
import pandas as pd
from fpdf import FPDF
import wikipediaapi
from functools import lru_cache
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('brown')
from nltk.tokenize import sent_tokenize
nltk.download('wordnet')
from nltk.corpus import wordnet
import random
from sense2vec import Sense2Vec
import sense2vec
from wordcloud import WordCloud
import matplotlib.pyplot as plt
import json
import os
from sentence_transformers import SentenceTransformer, util
import textstat
from spellchecker import SpellChecker
from transformers import pipeline
import re
import pymupdf
print("***************************************************************")
st.set_page_config(
page_title="Question Generator",
initial_sidebar_state="auto",
menu_items={
"About" : "#Hi this our project."
}
)
# Initialize Wikipedia API with a user agent
user_agent = 'QGen/1.0 ([email protected])'
wiki_wiki = wikipediaapi.Wikipedia(user_agent= user_agent,language='en')
@st.cache_resource
def load_model():
model_name = "DevBM/t5-large-squad"
model = T5ForConditionalGeneration.from_pretrained(model_name)
tokenizer = T5Tokenizer.from_pretrained(model_name)
return model, tokenizer
# Load Spacy Model
@st.cache_resource
def load_nlp_models():
nlp = spacy.load("en_core_web_md")
s2v = sense2vec.Sense2Vec().from_disk('s2v_old')
return nlp, s2v
# Load Quality Assurance Models
@st.cache_resource
def load_qa_models():
# Initialize BERT model for sentence similarity
similarity_model = SentenceTransformer('all-MiniLM-L6-v2')
spell = SpellChecker()
return similarity_model, spell
nlp, s2v = load_nlp_models()
model, tokenizer = load_model()
similarity_model, spell = load_qa_models()
context_model = similarity_model
def get_pdf_text(pdf_file):
doc = pymupdf.open(stream=pdf_file.read(), filetype="pdf")
text = ""
for page_num in range(doc.page_count):
page = doc.load_page(page_num)
text += page.get_text()
return text
def save_feedback(question, answer,rating):
feedback_file = 'question_feedback.json'
if os.path.exists(feedback_file):
with open(feedback_file, 'r') as f:
feedback_data = json.load(f)
else:
feedback_data = []
tpl = {
'question' : question,
'answer' : answer,
'rating' : rating,
}
# feedback_data[question] = rating
feedback_data.append(tpl)
with open(feedback_file, 'w') as f:
json.dump(feedback_data, f)
# Function to clean text
def clean_text(text):
text = re.sub(r"[^\x00-\x7F]", " ", text)
return text
# Function to create text chunks
def segment_text(text, max_segment_length=1000):
"""Segment the text into smaller chunks."""
sentences = sent_tokenize(text)
segments = []
current_segment = ""
for sentence in sentences:
if len(current_segment) + len(sentence) <= max_segment_length:
current_segment += sentence + " "
else:
segments.append(current_segment.strip())
current_segment = sentence + " "
if current_segment:
segments.append(current_segment.strip())
print(f"\n\nSegement Chunks: {segments}\n\n")
return segments
# Function to extract keywords using combined techniques
def extract_keywords(text, extract_all):
doc = nlp(text)
spacy_keywords = set([ent.text for ent in doc.ents])
spacy_entities = spacy_keywords
print(f"\n\nSpacy Entities: {spacy_entities} \n\n")
# Use Only Spacy Entities
if extract_all is False:
return list(spacy_entities)
# Use RAKE
rake = Rake()
rake.extract_keywords_from_text(text)
rake_keywords = set(rake.get_ranked_phrases())
print(f"\n\nRake Keywords: {rake_keywords} \n\n")
# Use spaCy for NER and POS tagging
spacy_keywords.update([token.text for token in doc if token.pos_ in ["NOUN", "PROPN", "VERB", "ADJ"]])
print(f"\n\nSpacy Keywords: {spacy_keywords} \n\n")
# Use TF-IDF
vectorizer = TfidfVectorizer(stop_words='english')
X = vectorizer.fit_transform([text])
tfidf_keywords = set(vectorizer.get_feature_names_out())
print(f"\n\nTFIDF Entities: {tfidf_keywords} \n\n")
# Combine all keywords
combined_keywords = rake_keywords.union(spacy_keywords).union(tfidf_keywords)
return list(combined_keywords)
def get_similar_words_sense2vec(word, n=3):
# Try to find the word with its most likely part-of-speech
word_with_pos = word + "|NOUN"
if word_with_pos in s2v:
similar_words = s2v.most_similar(word_with_pos, n=n)
return [word.split("|")[0] for word, _ in similar_words]
# If not found, try without POS
if word in s2v:
similar_words = s2v.most_similar(word, n=n)
return [word.split("|")[0] for word, _ in similar_words]
return []
def get_synonyms(word, n=3):
synonyms = []
for syn in wordnet.synsets(word):
for lemma in syn.lemmas():
if lemma.name() != word and lemma.name() not in synonyms:
synonyms.append(lemma.name())
if len(synonyms) == n:
return synonyms
return synonyms
def generate_options(answer, context, n=3):
options = [answer]
# Add contextually relevant words using a pre-trained model
context_embedding = context_model.encode(context)
answer_embedding = context_model.encode(answer)
context_words = [token.text for token in nlp(context) if token.is_alpha and token.text.lower() != answer.lower()]
# Compute similarity scores and sort context words
similarity_scores = [util.pytorch_cos_sim(context_model.encode(word), answer_embedding).item() for word in context_words]
sorted_context_words = [word for _, word in sorted(zip(similarity_scores, context_words), reverse=True)]
options.extend(sorted_context_words[:n])
# Try to get similar words based on sense2vec
similar_words = get_similar_words_sense2vec(answer, n)
options.extend(similar_words)
# If we don't have enough options, try synonyms
if len(options) < n + 1:
synonyms = get_synonyms(answer, n - len(options) + 1)
options.extend(synonyms)
# If we still don't have enough options, extract other entities from the context
if len(options) < n + 1:
doc = nlp(context)
entities = [ent.text for ent in doc.ents if ent.text.lower() != answer.lower()]
options.extend(entities[:n - len(options) + 1])
# If we still need more options, add some random words from the context
if len(options) < n + 1:
context_words = [token.text for token in nlp(context) if token.is_alpha and token.text.lower() != answer.lower()]
options.extend(random.sample(context_words, min(n - len(options) + 1, len(context_words))))
print(f"\n\nAll Possible Options: {options}\n\n")
# Ensure we have the correct number of unique options
options = list(dict.fromkeys(options))[:n+1]
# Shuffle the options
random.shuffle(options)
return options
# Function to map keywords to sentences with customizable context window size
def map_keywords_to_sentences(text, keywords, context_window_size):
sentences = sent_tokenize(text)
keyword_sentence_mapping = {}
print(f"\n\nSentences: {sentences}\n\n")
for keyword in keywords:
for i, sentence in enumerate(sentences):
if keyword in sentence:
# Combine current sentence with surrounding sentences for context
start = max(0, i - context_window_size)
end = min(len(sentences), i + context_window_size + 1)
context = ' '.join(sentences[start:end])
if keyword not in keyword_sentence_mapping:
keyword_sentence_mapping[keyword] = context
else:
keyword_sentence_mapping[keyword] += ' ' + context
return keyword_sentence_mapping
# Function to perform entity linking using Wikipedia API
@lru_cache(maxsize=128)
def entity_linking(keyword):
page = wiki_wiki.page(keyword)
if page.exists():
return page.fullurl
return None
# Function to generate questions using beam search
def generate_question(context, answer, num_beams):
input_text = f"<context> {context} <answer> {answer}"
input_ids = tokenizer.encode(input_text, return_tensors='pt')
outputs = model.generate(input_ids, num_beams=num_beams, early_stopping=True)
question = tokenizer.decode(outputs[0], skip_special_tokens=True)
return question
# Function to export questions to CSV
def export_to_csv(data):
# df = pd.DataFrame(data, columns=["Context", "Answer", "Question", "Options"])
df = pd.DataFrame(data)
# csv = df.to_csv(index=False,encoding='utf-8')
csv = df.to_csv(index=False)
return csv
# Function to export questions to PDF
def export_to_pdf(data):
pdf = FPDF()
pdf.add_page()
pdf.set_font("Arial", size=12)
for item in data:
pdf.multi_cell(0, 10, f"Context: {item['context']}")
pdf.multi_cell(0, 10, f"Question: {item['question']}")
pdf.multi_cell(0, 10, f"Answer: {item['answer']}")
pdf.multi_cell(0, 10, f"Options: {', '.join(item['options'])}")
pdf.multi_cell(0, 10, f"Overall Score: {item['overall_score']:.2f}")
pdf.ln(10)
return pdf.output(dest='S').encode('latin-1')
def display_word_cloud(generated_questions):
word_frequency = {}
for question in generated_questions:
words = question.split()
for word in words:
word_frequency[word] = word_frequency.get(word, 0) + 1
wordcloud = WordCloud(width=800, height=400, background_color='white').generate_from_frequencies(word_frequency)
plt.figure(figsize=(10, 5))
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis('off')
st.pyplot()
def assess_question_quality(context, question, answer):
# Assess relevance using cosine similarity
context_doc = nlp(context)
question_doc = nlp(question)
relevance_score = context_doc.similarity(question_doc)
# Assess complexity using token length (as a simple metric)
complexity_score = min(len(question_doc) / 20, 1) # Normalize to 0-1
# Assess Spelling correctness
misspelled = spell.unknown(question.split())
spelling_correctness = 1 - (len(misspelled) / len(question.split())) # Normalize to 0-1
# Calculate overall score (you can adjust weights as needed)
overall_score = (
0.4 * relevance_score +
0.4 * complexity_score +
0.2 * spelling_correctness
)
return overall_score, relevance_score, complexity_score, spelling_correctness
def main():
# Streamlit interface
st.title(":blue[Question Generator System]")
# Initialize session state
if 'generated_questions' not in st.session_state:
st.session_state.generated_questions = []
with st.sidebar:
st.subheader("Customization Options")
# Customization options
input_type = st.radio("Select Input Preference", ("Text Input","Upload PDF"))
num_beams = st.slider("Select number of beams for question generation", min_value=1, max_value=10, value=5)
context_window_size = st.slider("Select context window size (number of sentences before and after)", min_value=1, max_value=5, value=1)
num_questions = st.slider("Select number of questions to generate", min_value=1, max_value=1000, value=5)
with st.expander("Choose the Additional Elements to show"):
show_context = st.checkbox("Context",True)
show_answer = st.checkbox("Answer",True)
show_options = st.checkbox("Options",False)
show_entity_link = st.checkbox("Entity Link For Wikipedia",True)
show_qa_scores = st.checkbox("QA Score",False)
col1, col2 = st.columns(2)
with col1:
extract_all_keywords = st.toggle("Extract Max Keywords",value=False)
with col2:
enable_feedback_mode = st.toggle("Enable Feedback Mode",False)
text = None
if input_type == "Text Input":
text = st.text_area("Enter text here:", value="Joe Biden, the current US president is on a weak wicket going in for his reelection later this November against former President Donald Trump.")
elif input_type == "Upload PDF":
file = st.file_uploader("Upload PDF Files")
if file is not None:
text = get_pdf_text(file)
if text:
text = clean_text(text)
segments = segment_text(text)
generate_questions_button = st.button("Generate Questions")
if generate_questions_button and text:
st.session_state.generated_questions = []
for text in segments:
keywords = extract_keywords(text, extract_all_keywords)
print(f"\n\nFinal Keywords in Main Function: {keywords}\n\n")
keyword_sentence_mapping = map_keywords_to_sentences(text, keywords, context_window_size)
for i, (keyword, context) in enumerate(keyword_sentence_mapping.items()):
if i >= num_questions:
break
question = generate_question(context, keyword, num_beams=num_beams)
options = generate_options(keyword,context)
overall_score, relevance_score, complexity_score, spelling_correctness = assess_question_quality(context,question,keyword)
if overall_score < 0.5:
continue
tpl = {
"question" : question,
"context" : context,
"answer" : keyword,
"options" : options,
"overall_score" : overall_score,
"relevance_score" : relevance_score,
"complexity_score" : complexity_score,
"spelling_correctness" : spelling_correctness,
}
st.session_state.generated_questions.append(tpl)
# sort question based on their quality score
st.session_state.generated_questions = sorted(st.session_state.generated_questions,key = lambda x: x['overall_score'], reverse=True)
# Display generated questions
if st.session_state.generated_questions:
st.header("Generated Questions:",divider='blue')
for i, q in enumerate(st.session_state.generated_questions):
# with st.expander(f"Question {i+1}"):
st.subheader(body=f":orange[Q{i+1}:] {q['question']}")
if show_context is True:
st.write(f"**Context:** {q['context']}")
if show_answer is True:
st.write(f"**Answer:** {q['answer']}")
if show_options is True:
st.write(f"**Options:**")
for j, option in enumerate(q['options']):
st.write(f"{chr(65+j)}. {option}")
if show_entity_link is True:
linked_entity = entity_linking(q['answer'])
if linked_entity:
st.write(f"**Entity Link:** {linked_entity}")
if show_qa_scores is True:
st.write(f"**Overall Quality Score:** {q['overall_score']:.2f}")
st.write(f"**Relevance Score:** {q['relevance_score']:.2f}")
st.write(f"**Complexity Score:** {q['complexity_score']:.2f}")
st.write(f"**Spelling Correctness:** {q['spelling_correctness']:.2f}")
# q['context'] = st.text_area(f"Edit Context {i+1}:", value=q['context'], key=f"context_{i}")
if enable_feedback_mode:
q['question'] = st.text_input(f"Edit Question {i+1}:", value=q['question'], key=f"question_{i}")
q['rating'] = st.selectbox(f"Rate this question (1-5)", options=[1, 2, 3, 4, 5], key=f"rating_{i}")
if st.button(f"Submit Feedback for Question {i+1}", key=f"submit_{i}"):
save_feedback(q['question'], q['answer'], q['rating'])
st.success(f"Feedback submitted for Question {i+1}")
st.write("---")
# Export buttons
if st.session_state.generated_questions:
with st.sidebar:
csv_data = export_to_csv(st.session_state.generated_questions)
st.download_button(label="Download CSV", data=csv_data, file_name='questions.csv', mime='text/csv')
pdf_data = export_to_pdf(st.session_state.generated_questions)
st.download_button(label="Download PDF", data=pdf_data, file_name='questions.pdf', mime='application/pdf')
# View Feedback Statistics
with st.expander("View Feedback Statistics"):
feedback_file = 'question_feedback.json'
if os.path.exists(feedback_file):
with open(feedback_file, 'r') as f:
feedback_data = json.load(f)
st.subheader("Feedback Statistics")
# Calculate average rating
ratings = [feedback['rating'] for feedback in feedback_data]
avg_rating = sum(ratings) / len(ratings) if ratings else 0
st.write(f"Average Question Rating: {avg_rating:.2f}")
# Show distribution of ratings
rating_counts = {i: ratings.count(i) for i in range(1, 6)}
st.bar_chart(rating_counts)
# Show some highly rated questions
st.subheader("Highly Rated Questions")
sorted_feedback = sorted(feedback_data, key=lambda x: x['rating'], reverse=True)
top_questions = sorted_feedback[:5]
for feedback in top_questions:
st.write(f"Question: {feedback['question']}")
st.write(f"Answer: {feedback['answer']}")
st.write(f"Rating: {feedback['rating']}")
st.write("---")
else:
st.write("No feedback data available yet.")
print("********************************************************************************")
if __name__ == '__main__':
main()