import streamlit as st
from transformers import T5ForConditionalGeneration, T5Tokenizer
import spacy
import nltk
from sklearn.feature_extraction.text import TfidfVectorizer
from rake_nltk import Rake
import pandas as pd
from fpdf import FPDF
import wikipediaapi
from functools import lru_cache
from nltk.tokenize import sent_tokenize
from nltk.corpus import wordnet
import random
from sense2vec import Sense2Vec
from wordcloud import WordCloud
import matplotlib.pyplot as plt
import json
import os
from sentence_transformers import SentenceTransformer
from spellchecker import SpellChecker

# Download the NLTK resources needed for tokenization and WordNet lookups
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')

print("***************************************************************")

st.set_page_config(
    page_title="Question Generator",
    initial_sidebar_state="auto",
)

# Initialize Wikipedia API with a user agent
user_agent = 'QGen/1.0 (channingfisher7@gmail.com)'
wiki_wiki = wikipediaapi.Wikipedia(user_agent=user_agent, language='en')

@st.cache_resource
def load_model():
    model_name = "DevBM/t5-large-squad"
    model = T5ForConditionalGeneration.from_pretrained(model_name)
    tokenizer = T5Tokenizer.from_pretrained(model_name)
    return model, tokenizer

# Load spaCy model and Sense2Vec vectors
@st.cache_resource
def load_nlp_models():
    nlp = spacy.load("en_core_web_md")
    s2v = Sense2Vec().from_disk('s2v_old')
    return nlp, s2v

# Load quality-assurance models
@st.cache_resource
def load_qa_models():
    # Sentence-transformer model for sentence similarity
    similarity_model = SentenceTransformer('all-MiniLM-L6-v2')
    spell = SpellChecker()
    return similarity_model, spell

nlp, s2v = load_nlp_models()
model, tokenizer = load_model()
similarity_model, spell = load_qa_models()

def save_feedback(question, answer, rating):
    """Append a single feedback record to the JSON feedback file."""
    feedback_file = 'question_feedback.json'
    if os.path.exists(feedback_file):
        with open(feedback_file, 'r') as f:
            feedback_data = json.load(f)
    else:
        feedback_data = []
    feedback_data.append({
        'question': question,
        'answer': answer,
        'rating': rating,
    })
    with open(feedback_file, 'w') as f:
        json.dump(feedback_data, f)

# Extract keywords using a combination of spaCy NER, RAKE, and TF-IDF
def extract_keywords(text, extract_all):
    doc = nlp(text)
    spacy_keywords = set(ent.text for ent in doc.ents)
    print(f"\n\nSpacy Entities: {spacy_keywords} \n\n")

    # Use only spaCy entities
    if not extract_all:
        return list(spacy_keywords)

    # Use RAKE
    rake = Rake()
    rake.extract_keywords_from_text(text)
    rake_keywords = set(rake.get_ranked_phrases())
    print(f"\n\nRake Keywords: {rake_keywords} \n\n")

    # Use spaCy POS tagging for content words
    spacy_keywords.update(token.text for token in doc if token.pos_ in ["NOUN", "PROPN", "VERB", "ADJ"])
    print(f"\n\nSpacy Keywords: {spacy_keywords} \n\n")

    # Use TF-IDF
    vectorizer = TfidfVectorizer(stop_words='english')
    vectorizer.fit_transform([text])
    tfidf_keywords = set(vectorizer.get_feature_names_out())
    print(f"\n\nTFIDF Keywords: {tfidf_keywords} \n\n")

    # Combine all keywords
    combined_keywords = rake_keywords.union(spacy_keywords).union(tfidf_keywords)
    return list(combined_keywords)
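# Illustration (hypothetical values, not a recorded run): for an input like
# "Albert Einstein developed the theory of relativity.", extract_all=False
# would return just the spaCy entities (e.g. ['Albert Einstein']), while
# extract_all=True would add RAKE phrases and TF-IDF terms such as
# 'theory of relativity' and 'developed' to the combined set.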
def get_similar_words_sense2vec(word, n=3):
    """Return up to n similar words from Sense2Vec, trying the NOUN sense first."""
    # Try the word with its most likely part-of-speech tag
    word_with_pos = word + "|NOUN"
    if word_with_pos in s2v:
        similar_words = s2v.most_similar(word_with_pos, n=n)
        return [w.split("|")[0] for w, _ in similar_words]
    # If not found, try the bare word
    if word in s2v:
        similar_words = s2v.most_similar(word, n=n)
        return [w.split("|")[0] for w, _ in similar_words]
    return []

def get_synonyms(word, n=3):
    """Return up to n WordNet synonyms for the word, excluding the word itself."""
    synonyms = []
    for syn in wordnet.synsets(word):
        for lemma in syn.lemmas():
            if lemma.name() != word and lemma.name() not in synonyms:
                synonyms.append(lemma.name())
                if len(synonyms) == n:
                    return synonyms
    return synonyms

def generate_options(answer, context, n=3):
    options = [answer]

    # Try similar words from Sense2Vec first
    options.extend(get_similar_words_sense2vec(answer, n))

    # If we don't have enough options, try WordNet synonyms
    if len(options) < n + 1:
        options.extend(get_synonyms(answer, n - len(options) + 1))

    # If we still don't have enough, use other entities from the context
    if len(options) < n + 1:
        doc = nlp(context)
        entities = [ent.text for ent in doc.ents if ent.text.lower() != answer.lower()]
        options.extend(entities[:n - len(options) + 1])

    # As a last resort, sample random words from the context
    if len(options) < n + 1:
        context_words = [token.text for token in nlp(context)
                         if token.is_alpha and token.text.lower() != answer.lower()]
        options.extend(random.sample(context_words, min(n - len(options) + 1, len(context_words))))

    # Deduplicate (preserving order) and trim to the answer plus n distractors
    options = list(dict.fromkeys(options))[:n + 1]
    random.shuffle(options)
    return options

# Map keywords to sentences with a customizable context window size
def map_keywords_to_sentences(text, keywords, context_window_size):
    sentences = sent_tokenize(text)
    keyword_sentence_mapping = {}
    for keyword in keywords:
        for i, sentence in enumerate(sentences):
            if keyword in sentence:
                # Combine the current sentence with its surrounding sentences
                start = max(0, i - context_window_size)
                end = min(len(sentences), i + context_window_size + 1)
                context = ' '.join(sentences[start:end])
                if keyword not in keyword_sentence_mapping:
                    keyword_sentence_mapping[keyword] = context
                else:
                    keyword_sentence_mapping[keyword] += ' ' + context
    return keyword_sentence_mapping

# Entity linking via the Wikipedia API, cached to avoid repeated lookups
@lru_cache(maxsize=128)
def entity_linking(keyword):
    page = wiki_wiki.page(keyword)
    if page.exists():
        return page.fullurl
    return None

# Generate a question with beam search
def generate_question(context, answer, num_beams):
    # Prompt format assumed for SQuAD-style T5 question-generation models
    input_text = f"context: {context} answer: {answer}"
    input_ids = tokenizer.encode(input_text, return_tensors='pt', truncation=True, max_length=512)
    outputs = model.generate(input_ids, num_beams=num_beams, early_stopping=True)
    question = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return question
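# Illustration (hypothetical, not a recorded run): with the prompt format
# assumed above, generate_question("Paris is the capital of France.", "Paris",
# num_beams=5) would be expected to yield something like
# "What is the capital of France?".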
# Export questions to CSV
def export_to_csv(data):
    df = pd.DataFrame(data)
    return df.to_csv(index=False)

# Export questions to PDF (note: FPDF's built-in fonts are latin-1 only)
def export_to_pdf(data):
    pdf = FPDF()
    pdf.add_page()
    pdf.set_font("Arial", size=12)
    for item in data:
        pdf.multi_cell(0, 10, f"Context: {item['context']}")
        pdf.multi_cell(0, 10, f"Question: {item['question']}")
        pdf.multi_cell(0, 10, f"Answer: {item['answer']}")
        pdf.multi_cell(0, 10, f"Options: {', '.join(item['options'])}")
        pdf.multi_cell(0, 10, f"Overall Score: {item['overall_score']:.2f}")
        pdf.ln(10)
    return pdf.output(dest='S').encode('latin-1')

def display_word_cloud(generated_questions):
    word_frequency = {}
    for question in generated_questions:
        for word in question.split():
            word_frequency[word] = word_frequency.get(word, 0) + 1
    wordcloud = WordCloud(width=800, height=400, background_color='white').generate_from_frequencies(word_frequency)
    fig = plt.figure(figsize=(10, 5))
    plt.imshow(wordcloud, interpolation='bilinear')
    plt.axis('off')
    st.pyplot(fig)

def assess_question_quality(context, question, answer):
    # Assess relevance via spaCy vector similarity between context and question
    context_doc = nlp(context)
    question_doc = nlp(question)
    relevance_score = context_doc.similarity(question_doc)

    # Assess complexity using token length as a simple proxy, normalized to 0-1
    complexity_score = min(len(question_doc) / 20, 1)

    # Assess spelling correctness as the fraction of recognized words
    words = question.split()
    misspelled = spell.unknown(words)
    spelling_correctness = 1 - (len(misspelled) / max(len(words), 1))

    # Weighted overall score (adjust weights as needed)
    overall_score = (
        0.4 * relevance_score +
        0.4 * complexity_score +
        0.2 * spelling_correctness
    )
    return overall_score, relevance_score, complexity_score, spelling_correctness
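# Worked example of the weighted score above: with relevance 0.80,
# complexity 0.50, and spelling 1.00, the overall score is
# 0.4 * 0.80 + 0.4 * 0.50 + 0.2 * 1.00 = 0.32 + 0.20 + 0.20 = 0.72.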
def main():
    st.title(":blue[Question Generator System]")

    # Initialize session state
    if 'generated_questions' not in st.session_state:
        st.session_state.generated_questions = []

    text = st.text_area("Enter text here:", value="Joe Biden, the current US president is on a weak wicket going in for his reelection later this November against former President Donald Trump.")

    with st.sidebar:
        st.subheader("Customization Options")
        num_beams = st.slider("Select number of beams for question generation", min_value=1, max_value=10, value=5)
        context_window_size = st.slider("Select context window size (number of sentences before and after)", min_value=1, max_value=5, value=1)
        num_questions = st.slider("Select number of questions to generate", min_value=1, max_value=1000, value=5)
        with st.expander("Choose the Additional Elements to show"):
            show_context = st.checkbox("Context", True)
            show_answer = st.checkbox("Answer", True)
            show_options = st.checkbox("Options", False)
            show_entity_link = st.checkbox("Entity Link For Wikipedia", True)
            show_qa_scores = st.checkbox("QA Score", False)
        col1, col2 = st.columns(2)
        with col1:
            extract_all_keywords = st.toggle("Extract Max Keywords", value=False)
        with col2:
            enable_feedback_mode = st.toggle("Enable Feedback Mode", False)

    generate_questions_button = st.button("Generate Questions")

    if generate_questions_button and text:
        st.session_state.generated_questions = []
        keywords = extract_keywords(text, extract_all_keywords)
        print(f"\n\nFinal Keywords in Main Function: {keywords}\n\n")
        keyword_sentence_mapping = map_keywords_to_sentences(text, keywords, context_window_size)
        for i, (keyword, context) in enumerate(keyword_sentence_mapping.items()):
            if i >= num_questions:
                break
            question = generate_question(context, keyword, num_beams=num_beams)
            options = generate_options(keyword, context)
            overall_score, relevance_score, complexity_score, spelling_correctness = assess_question_quality(context, question, keyword)
            st.session_state.generated_questions.append({
                "question": question,
                "context": context,
                "answer": keyword,
                "options": options,
                "overall_score": overall_score,
                "relevance_score": relevance_score,
                "complexity_score": complexity_score,
                "spelling_correctness": spelling_correctness,
            })

    # Display generated questions
    if st.session_state.generated_questions:
        st.header("Generated Questions:", divider='blue')
        for i, q in enumerate(st.session_state.generated_questions):
            st.subheader(body=f":orange[Q{i+1}:] {q['question']}")
            if show_context:
                st.write(f"**Context:** {q['context']}")
            if show_answer:
                st.write(f"**Answer:** {q['answer']}")
            if show_options:
                st.write("**Options:**")
                for j, option in enumerate(q['options']):
                    st.write(f"{chr(65 + j)}. {option}")
            if show_entity_link:
                linked_entity = entity_linking(q['answer'])
                if linked_entity:
                    st.write(f"**Entity Link:** {linked_entity}")
            if show_qa_scores:
                st.write(f"**Overall Quality Score:** {q['overall_score']:.2f}")
                st.write(f"**Relevance Score:** {q['relevance_score']:.2f}")
                st.write(f"**Complexity Score:** {q['complexity_score']:.2f}")
                st.write(f"**Spelling Correctness:** {q['spelling_correctness']:.2f}")
            if enable_feedback_mode:
                q['question'] = st.text_input(f"Edit Question {i+1}:", value=q['question'], key=f"question_{i}")
                q['rating'] = st.selectbox("Rate this question (1-5)", options=[1, 2, 3, 4, 5], key=f"rating_{i}")
                if st.button(f"Submit Feedback for Question {i+1}", key=f"submit_{i}"):
                    save_feedback(q['question'], q['answer'], q['rating'])
                    st.success(f"Feedback submitted for Question {i+1}")
            st.write("---")

    # Export buttons
    if st.session_state.generated_questions:
        with st.sidebar:
            csv_data = export_to_csv(st.session_state.generated_questions)
            st.download_button(label="Download CSV", data=csv_data, file_name='questions.csv', mime='text/csv')
            pdf_data = export_to_pdf(st.session_state.generated_questions)
            st.download_button(label="Download PDF", data=pdf_data, file_name='questions.pdf', mime='application/pdf')

    # View feedback statistics
    with st.expander("View Feedback Statistics"):
        feedback_file = 'question_feedback.json'
        if os.path.exists(feedback_file):
            with open(feedback_file, 'r') as f:
                feedback_data = json.load(f)
            st.subheader("Feedback Statistics")

            # Calculate average rating
            ratings = [feedback['rating'] for feedback in feedback_data]
            avg_rating = sum(ratings) / len(ratings) if ratings else 0
            st.write(f"Average Question Rating: {avg_rating:.2f}")

            # Show distribution of ratings (a Series converts cleanly for st.bar_chart)
            rating_counts = {i: ratings.count(i) for i in range(1, 6)}
            st.bar_chart(pd.Series(rating_counts))

            # Show some highly rated questions
            st.subheader("Highly Rated Questions")
            sorted_feedback = sorted(feedback_data, key=lambda x: x['rating'], reverse=True)
            for feedback in sorted_feedback[:5]:
                st.write(f"Question: {feedback['question']}")
                st.write(f"Answer: {feedback['answer']}")
                st.write(f"Rating: {feedback['rating']}")
                st.write("---")
        else:
            st.write("No feedback data available yet.")

    print("********************************************************************************")

if __name__ == '__main__':
    main()
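# To run the app (assuming this file is saved as app.py and the 's2v_old'
# Sense2Vec vectors are present in the working directory):
#   streamlit run app.py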