Spaces:
Sleeping
Sleeping
import streamlit as st | |
from transformers import T5ForConditionalGeneration, T5Tokenizer | |
import spacy | |
import nltk | |
from sklearn.feature_extraction.text import TfidfVectorizer | |
from rake_nltk import Rake | |
import pandas as pd | |
from fpdf import FPDF | |
import wikipediaapi | |
from functools import lru_cache | |
nltk.download('punkt') | |
nltk.download('stopwords') | |
nltk.download('brown') | |
from nltk.tokenize import sent_tokenize | |
nltk.download('wordnet') | |
from nltk.corpus import wordnet | |
import random | |
import sense2vec | |
from wordcloud import WordCloud | |
import matplotlib.pyplot as plt | |
import json | |
import os | |
from sentence_transformers import SentenceTransformer, util | |
import textstat | |
from spellchecker import SpellChecker | |
from transformers import pipeline | |
import re | |
import pymupdf | |
import fitz # PyMuPDF | |
import pytesseract | |
from PIL import Image | |
import io | |
import uuid | |
import time | |
import asyncio | |
import aiohttp | |
import easyocr | |
# '-----------------' | |
import smtplib | |
from email.mime.multipart import MIMEMultipart | |
from email.mime.text import MIMEText | |
from email.mime.base import MIMEBase | |
from email import encoders | |
# '------------------' | |
# Console marker printed each time this script starts executing.
print("***************************************************************")

# Page-level Streamlit configuration (browser tab title/icon, sidebar, menu).
st.set_page_config(
    page_title="Question Generator",
    page_icon='cyclone',
    initial_sidebar_state="auto",
    menu_items={"About": "Hi this our project."},
)

# Presumably silences the warning Streamlit shows when st.pyplot() is called
# without an explicit figure.
# NOTE(review): this option may not exist in newer Streamlit releases -- confirm
# against the pinned version.
st.set_option('deprecation.showPyplotGlobalUse', False)
class QuestionGenerationError(Exception):
    """Custom exception for question generation errors.

    Raised by the keyword-extraction, question-generation and
    option-generation steps so callers can tell pipeline failures apart
    from unrelated exceptions.
    """
# Shared English Wikipedia client used by entity_linking(); the client is
# created with an explicit user agent string.
user_agent = 'QGen/1.2'
wiki_wiki = wikipediaapi.Wikipedia(language='en', user_agent=user_agent)
def get_session_id():
    """Return this browser session's id, creating a random UUID on first use.

    The id is stored under ``st.session_state.session_id`` so the same value
    is returned on later calls within the session.
    """
    if 'session_id' in st.session_state:
        return st.session_state.session_id
    st.session_state.session_id = str(uuid.uuid4())
    return st.session_state.session_id
def initialize_state(session_id):
    """Return the state dict for ``session_id``, creating it if needed.

    All per-session dicts live in ``st.session_state.session_states``; a new
    entry starts with an empty ``'generated_questions'`` list.
    """
    if 'session_states' not in st.session_state:
        st.session_state.session_states = {}
    all_states = st.session_state.session_states
    if session_id not in all_states:
        all_states[session_id] = {
            'generated_questions': [],
            # add other state variables as needed
        }
    return all_states[session_id]
def get_state(session_id):
    """Return the stored state dict for ``session_id``.

    Raises ``KeyError`` if the session has not been set up via
    ``initialize_state``.
    """
    all_states = st.session_state.session_states
    return all_states[session_id]
def set_state(session_id, key, value):
    """Store ``value`` under ``key`` in the state dict of ``session_id``."""
    session_state = st.session_state.session_states[session_id]
    session_state[key] = value
# Streamlit re-executes the whole script on every interaction; without caching
# the T5 weights would be reloaded each time. max_entries=1 keeps only the most
# recently selected model in memory.
@st.cache_resource(max_entries=1)
def load_model(modelname):
    """Load the T5 model and tokenizer for ``modelname``.

    Args:
        modelname: Model identifier passed to ``from_pretrained``.

    Returns:
        A ``(model, tokenizer)`` tuple.
    """
    model = T5ForConditionalGeneration.from_pretrained(modelname)
    tokenizer = T5Tokenizer.from_pretrained(modelname)
    return model, tokenizer
# Load Spacy Model
# Cached so the spaCy pipeline and sense2vec vectors are loaded once per server
# process instead of on every Streamlit rerun.
@st.cache_resource
def load_nlp_models():
    """Load the spaCy pipeline and the sense2vec vectors.

    Returns:
        A ``(nlp, s2v)`` tuple: the ``en_core_web_md`` spaCy pipeline and the
        ``Sense2Vec`` object read from the local ``s2v_old`` directory.
    """
    nlp = spacy.load("en_core_web_md")
    s2v = sense2vec.Sense2Vec().from_disk('s2v_old')
    return nlp, s2v
# Load Quality Assurance Models
# Cached so the sentence-embedding model is loaded once per server process
# instead of on every Streamlit rerun.
@st.cache_resource
def load_qa_models():
    """Load the models used for option ranking and question scoring.

    Returns:
        A ``(similarity_model, spell)`` tuple: the ``all-MiniLM-L6-v2``
        sentence-transformer and a ``SpellChecker`` instance.
    """
    # Sentence-embedding model used for similarity scoring
    similarity_model = SentenceTransformer('all-MiniLM-L6-v2')
    spell = SpellChecker()
    return similarity_model, spell
# Let the user pick the question-generation model in the sidebar and map the
# choice to its model identifier.
with st.sidebar:
    select_model = st.selectbox("Select Model", ("T5-large", "T5-small"))
    if select_model == "T5-small":
        modelname = "AneriThakkar/flan-t5-small-finetuned"
    elif select_model == "T5-large":
        modelname = "DevBM/t5-large-squad"

# Module-level model handles used by the functions below.
nlp, s2v = load_nlp_models()
similarity_model, spell = load_qa_models()
# The same sentence-embedding model is reused for ranking answer options.
context_model = similarity_model
model, tokenizer = load_model(modelname)
# Info Section
def display_info():
    """Render the static "Information" panel in the sidebar."""
    # The markdown body is kept flush-left so it is passed to Streamlit
    # without any leading indentation.
    info_markdown = """
### Question Generator System
This system is designed to generate questions based on the provided context. It uses various NLP techniques and models to:
- Extract keywords from the text
- Map keywords to sentences
- Generate questions
- Provide multiple choice options
- Assess the quality of generated questions
#### Key Features:
- **Keyword Extraction:** Combines RAKE, TF-IDF, and spaCy for comprehensive keyword extraction.
- **Question Generation:** Utilizes a pre-trained T5 model for generating questions.
- **Options Generation:** Creates contextually relevant multiple-choice options.
- **Question Assessment:** Scores questions based on relevance, complexity, and spelling correctness.
- **Feedback Collection:** Allows users to rate the generated questions and provides statistics on feedback.
#### Customization Options:
- Number of beams for question generation
- Context window size for mapping keywords to sentences
- Number of questions to generate
- Additional display elements (context, answer, options, entity link, QA scores)
#### Outputs:
- Generated questions with multiple-choice options
- Download options for CSV and PDF formats
- Visualization of overall scores
"""
    st.sidebar.title("Information")
    st.sidebar.markdown(info_markdown)
import fitz # PyMuPDF | |
from PIL import Image | |
import io | |
import easyocr | |
import numpy as np | |
def extract_text_from_pdf(pdf_path):
    """Extract the embedded (non-OCR) text from a PDF.

    Args:
        pdf_path: A filesystem path, or a binary file-like object such as the
            upload object that ``main`` passes in.

    Returns:
        The text of all non-empty pages with newlines replaced by spaces,
        stripped. Returns ``""`` if nothing is found or any error occurs
        (the error is printed, not raised).
    """
    try:
        if hasattr(pdf_path, "read"):
            # File-like input: hand the raw bytes to PyMuPDF instead of
            # letting it treat the object as a path. Rewind first because
            # another extractor may already have consumed the stream.
            if hasattr(pdf_path, "seek"):
                pdf_path.seek(0)
            pdf_file = fitz.open(stream=pdf_path.read(), filetype="pdf")
        else:
            pdf_file = fitz.open(pdf_path)
        try:
            page_texts = [
                pdf_file.load_page(page_index).get_text("text")
                for page_index in range(len(pdf_file))
            ]
        finally:
            # Close the document even if a page fails to load.
            pdf_file.close()
        all_text = "".join(
            text.replace('\n', ' ') + " "
            for text in page_texts
            if text.strip()  # skip pages without text
        )
        if not all_text.strip():
            print("No direct text found in the PDF.")
        return all_text.strip()
    except Exception as e:
        print(f"Error extracting text from PDF: {e}")
        return ""
def extract_images_from_pdf(pdf_path):
    """Extract the embedded images from a PDF as PIL images.

    Args:
        pdf_path: A filesystem path, or a binary file-like object such as the
            upload object that ``main`` passes in.

    Returns:
        A list of ``PIL.Image`` objects in page order. Returns ``[]`` if the
        PDF has no images or any error occurs (the error is printed, not
        raised).
    """
    try:
        if hasattr(pdf_path, "read"):
            # File-like input: hand the raw bytes to PyMuPDF instead of
            # letting it treat the object as a path. Rewind first because
            # another extractor may already have consumed the stream.
            if hasattr(pdf_path, "seek"):
                pdf_path.seek(0)
            pdf_file = fitz.open(stream=pdf_path.read(), filetype="pdf")
        else:
            pdf_file = fitz.open(pdf_path)
        images = []
        try:
            for page_index in range(len(pdf_file)):
                page = pdf_file.load_page(page_index)
                for img in page.get_images(full=True):
                    xref = img[0]  # first field is the image's xref number
                    base_image = pdf_file.extract_image(xref)
                    images.append(Image.open(io.BytesIO(base_image["image"])))
        finally:
            # Close the document even if an image fails to decode.
            pdf_file.close()
        if not images:
            print("No images found in the PDF.")
        return images
    except Exception as e:
        print(f"Error extracting images from PDF: {e}")
        return []
@st.cache_resource(show_spinner=False)
def _get_ocr_reader():
    """Build the English EasyOCR reader once and reuse it across calls."""
    return easyocr.Reader(['en'])


def recognize_text(image):
    """Recognize text from a single image with EasyOCR.

    Args:
        image: A PIL image (anything ``numpy.array`` can convert is also
            accepted).

    Returns:
        The recognized fragments with confidence above 0.2, joined by spaces
        and stripped. Returns ``""`` if nothing is recognized or any error
        occurs (the error is printed, not raised).
    """
    try:
        # Constructing a Reader per image is expensive; reuse a cached one.
        reader = _get_ocr_reader()
        if hasattr(image, "convert"):
            # Normalise palette/CMYK/etc. images to plain RGB so the array
            # handed to EasyOCR has an unambiguous channel layout.
            image = image.convert("RGB")
        image_np = np.array(image)
        result = reader.readtext(image_np)
        recognized_text = "".join(
            f'{text} ' for (_bbox, text, prob) in result if prob > 0.2
        )
        if not recognized_text.strip():
            print("No text recognized from the image.")
        return recognized_text.strip()
    except Exception as e:
        print(f"Error recognizing text from image: {e}")
        return ""
def ocr_text_from_pdf(pdf_path):
    """Run OCR over every image embedded in the PDF and return the joined text.

    Images whose recognized text is blank are skipped. The result is
    stripped; a message is printed when no OCR text is found.
    """
    recognized_parts = []
    for picture in extract_images_from_pdf(pdf_path):
        recognized = recognize_text(picture)
        if not recognized.strip():
            continue
        recognized_parts.append(recognized + " ")
    combined = "".join(recognized_parts)
    if not combined.strip():
        print("No OCR text found in the PDF images.")
    return combined.strip()
def extract_all_text_from_pdf(pdf_path):
    """Return the PDF's embedded text followed by the OCR text of its images.

    The two parts are separated by a single space and the result is stripped;
    a message is printed when both parts are empty.
    """
    embedded = extract_text_from_pdf(pdf_path)
    from_images = ocr_text_from_pdf(pdf_path)
    combined = f"{embedded} {from_images} "
    if not combined.strip():
        print("No text extracted from the PDF.")
    return combined.strip()
def save_feedback(question, answer, rating, options, context):
    """Append one feedback record to ``question_feedback.json``.

    The file (in the current working directory) holds a JSON list of records
    with the keys ``question``, ``answer``, ``context``, ``options`` and
    ``rating``. It is created if missing. If the existing file cannot be
    parsed or does not contain a list, it is replaced by a fresh list so a
    single corrupt write cannot block all later feedback.

    Returns:
        The feedback file name.
    """
    feedback_file = 'question_feedback.json'
    feedback_data = []
    if os.path.exists(feedback_file):
        try:
            with open(feedback_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            print(f"Ignoring unreadable feedback file {feedback_file}: {e}")
        else:
            if isinstance(loaded, list):
                feedback_data = loaded
    feedback_data.append({
        'question': question,
        'answer': answer,
        'context': context,
        'options': options,
        'rating': rating,
    })
    with open(feedback_file, 'w', encoding='utf-8') as f:
        json.dump(feedback_data, f)
    return feedback_file
# ----------------------------------------------------------------------------------------- | |
def send_email_with_attachment(email_subject, email_body, recipient_emails, sender_email, sender_password, attachment_path):
    """Send a plain-text email with one file attached via Gmail SMTP.

    Args:
        email_subject: Subject line.
        email_body: Plain-text body.
        recipient_emails: List of recipient addresses.
        sender_email: Address used for the ``From`` header and SMTP login.
        sender_password: Password used for the SMTP login.
        attachment_path: Path of the file to attach.

    Raises:
        OSError: If the attachment cannot be read.
        smtplib.SMTPException: If connecting, logging in or sending fails.
    """
    msg = MIMEMultipart()
    msg['From'] = sender_email
    msg['To'] = ", ".join(recipient_emails)  # Join the list of recipients with commas
    msg['Subject'] = email_subject
    msg.attach(MIMEText(email_body, 'plain'))

    part = MIMEBase('application', 'octet-stream')
    # Context manager so the file handle is released even if reading fails.
    with open(attachment_path, 'rb') as attachment:
        part.set_payload(attachment.read())
    encoders.encode_base64(part)
    # Passing filename as a parameter lets the email package quote it.
    part.add_header(
        'Content-Disposition',
        'attachment',
        filename=os.path.basename(attachment_path),
    )
    msg.attach(part)

    with smtplib.SMTP('smtp.gmail.com', 587) as server:
        server.starttls()
        # Credentials are deliberately not printed or logged.
        server.login(sender_email, sender_password)
        server.sendmail(sender_email, recipient_emails, msg.as_string())
# ---------------------------------------------------------------------------------- | |
# Function to clean text
def clean_text(text):
    """Replace every non-ASCII character and every newline with a space."""
    ascii_only = re.sub(r"[^\x00-\x7F]", " ", text)
    return ascii_only.replace("\n", " ")
# Function to create text chunks
def segment_text(text, max_segment_length=700, batch_size=7):
    """Split ``text`` into sentence-aligned segments grouped into batches.

    Sentences are appended to the current segment while the segment length
    plus the next sentence stays within ``max_segment_length`` characters.
    A single sentence longer than the limit becomes its own segment.

    Args:
        text: Input text.
        max_segment_length: Maximum segment length in characters.
        batch_size: Number of segments per batch.

    Returns:
        A list of batches, each a list of segment strings.
    """
    segments = []
    current_segment = ""
    for sentence in sent_tokenize(text):
        if len(current_segment) + len(sentence) <= max_segment_length:
            current_segment += sentence + " "
            continue
        # Only flush a non-empty segment: when the very first sentence is
        # already over the limit there is nothing to flush yet.
        if current_segment:
            segments.append(current_segment.strip())
        current_segment = sentence + " "
    if current_segment:
        segments.append(current_segment.strip())
    # Create batches
    return [segments[i:i + batch_size] for i in range(0, len(segments), batch_size)]
# Function to extract keywords using combined techniques
def extract_keywords(text, extract_all):
    """Extract candidate answer keywords from ``text``.

    Args:
        text: Input text.
        extract_all: When ``False`` only spaCy named entities are returned;
            otherwise RAKE phrases, spaCy noun/proper-noun/verb/adjective
            tokens and the TF-IDF vocabulary are added.

    Returns:
        A list of unique keyword strings (order not defined).

    Raises:
        QuestionGenerationError: If any extraction step fails.
    """
    try:
        doc = nlp(text)
        spacy_entities = {ent.text for ent in doc.ents}
        print(f"\n\nSpacy Entities: {spacy_entities} \n\n")
        # Use Only Spacy Entities
        if extract_all is False:
            return list(spacy_entities)

        # Use RAKE
        rake = Rake()
        rake.extract_keywords_from_text(text)
        rake_keywords = set(rake.get_ranked_phrases())
        print(f"\n\nRake Keywords: {rake_keywords} \n\n")

        # Use spaCy for NER and POS tagging
        spacy_keywords = spacy_entities | {
            token.text
            for token in doc
            if token.pos_ in ("NOUN", "PROPN", "VERB", "ADJ")
        }
        print(f"\n\nSpacy Keywords: {spacy_keywords} \n\n")

        # Use TF-IDF. Only the fitted vocabulary is used, not the weights.
        try:
            vectorizer = TfidfVectorizer(stop_words='english')
            vectorizer.fit([text])
            tfidf_keywords = set(vectorizer.get_feature_names_out())
        except ValueError:
            # scikit-learn raises ValueError when the vocabulary is empty
            # (e.g. text made only of stop words); keep the other sources.
            tfidf_keywords = set()
        print(f"\n\nTFIDF Entities: {tfidf_keywords} \n\n")

        # Combine all keywords
        return list(rake_keywords | spacy_keywords | tfidf_keywords)
    except Exception as e:
        raise QuestionGenerationError(f"Error in keyword extraction: {str(e)}") from e
def get_similar_words_sense2vec(word, n=3):
    """Return up to ``n`` sense2vec neighbours of ``word`` without sense tags.

    The lookup first tries the key ``word|NOUN`` and then the bare ``word``;
    the first key present in the vectors is used. Returns ``[]`` when
    neither key exists.
    """
    for candidate_key in (word + "|NOUN", word):
        if candidate_key not in s2v:
            continue
        neighbours = s2v.most_similar(candidate_key, n=n)
        # Neighbour keys look like "text|SENSE"; keep only the text part.
        return [key.split("|")[0] for key, _score in neighbours]
    return []
def get_synonyms(word, n=3):
    """Collect up to ``n`` distinct WordNet lemma names that differ from ``word``.

    Lemma names are gathered in synset order and the search stops as soon as
    ``n`` names have been collected.
    """
    collected = []
    for synset in wordnet.synsets(word):
        for lemma in synset.lemmas():
            name = lemma.name()
            if name == word or name in collected:
                continue
            collected.append(name)
            if len(collected) == n:
                return collected
    return collected
def generate_options(answer, context, n=3):
    """Build a shuffled list of up to ``n + 1`` multiple-choice options.

    The correct ``answer`` is always included. Distractors are taken, in
    order, from: context words ranked by embedding similarity to the answer,
    sense2vec neighbours, WordNet synonyms, other named entities in the
    context, and finally random context words.

    Args:
        answer: The correct answer.
        context: Text the question was generated from.
        n: Number of distractors wanted.

    Returns:
        A list of unique option strings in random order.
    """
    options = [answer]
    answer_lower = answer.lower()
    # Parse the context once and reuse it for words and entities.
    doc = nlp(context)
    context_words = [
        token.text
        for token in doc
        if token.is_alpha and token.text.lower() != answer_lower
    ]

    # Add contextually relevant words using a pre-trained model.
    answer_embedding = context_model.encode(answer)
    # Encode each distinct word only once; repeated words share a score.
    score_by_word = {
        word: util.pytorch_cos_sim(context_model.encode(word), answer_embedding).item()
        for word in dict.fromkeys(context_words)
    }
    # Compute similarity scores and sort context words
    ranked = sorted(((score_by_word[word], word) for word in context_words), reverse=True)
    sorted_context_words = [word for _, word in ranked]
    options.extend(sorted_context_words[:n])

    # Try to get similar words based on sense2vec
    options.extend(get_similar_words_sense2vec(answer, n))

    # If we don't have enough options, try synonyms
    if len(options) < n + 1:
        options.extend(get_synonyms(answer, n - len(options) + 1))

    # If we still don't have enough options, extract other entities from the context
    if len(options) < n + 1:
        entities = [ent.text for ent in doc.ents if ent.text.lower() != answer_lower]
        options.extend(entities[:n - len(options) + 1])

    # If we still need more options, add some random words from the context
    if len(options) < n + 1:
        sample_size = min(n - len(options) + 1, len(context_words))
        options.extend(random.sample(context_words, sample_size))

    print(f"\n\nAll Possible Options: {options}\n\n")
    # Ensure we have the correct number of unique options; the answer is
    # first, so truncation never drops it.
    options = list(dict.fromkeys(options))[:n + 1]
    # Shuffle the options
    random.shuffle(options)
    return options
# Function to map keywords to sentences with customizable context window size
def map_keywords_to_sentences(text, keywords, context_window_size):
    """Map each keyword found in ``text`` to its surrounding sentences.

    For every sentence containing the keyword (substring match), the sentence
    plus ``context_window_size`` sentences before and after form one window.
    All windows of a keyword are joined with single spaces. Keywords that do
    not occur in ``text`` are left out of the result.
    """
    sentences = sent_tokenize(text)
    sentence_count = len(sentences)
    print(f"\n\nSentences: {sentences}\n\n")

    mapping = {}
    for keyword in keywords:
        windows = []
        for idx, sentence in enumerate(sentences):
            if keyword not in sentence:
                continue
            # Combine current sentence with surrounding sentences for context
            first = max(0, idx - context_window_size)
            last = min(sentence_count, idx + context_window_size + 1)
            windows.append(' '.join(sentences[first:last]))
        if windows:
            mapping[keyword] = ' '.join(windows)
    return mapping
# Function to perform entity linking using Wikipedia API
# Cached because the page is re-rendered (and every link re-requested) on each
# Streamlit rerun. Failures raise inside the cached helper, so only successful
# lookups are stored.
@st.cache_data(show_spinner=False)
def _cached_wikipedia_url(keyword):
    """Return the Wikipedia URL for ``keyword`` or ``None`` if no page exists."""
    page = wiki_wiki.page(keyword)
    if page.exists():
        return page.fullurl
    return None


def entity_linking(keyword):
    """Return the Wikipedia URL for ``keyword``, or ``None``.

    ``None`` is returned both when no page exists and when the lookup fails
    (for example on a network error); the link is optional display data and
    must not abort rendering of the questions.
    """
    try:
        return _cached_wikipedia_url(keyword)
    except Exception as e:
        print(f"Entity linking failed for {keyword!r}: {e}")
        return None
async def generate_question_async(context, answer, num_beams):
    """Generate one question for ``answer`` given ``context`` with the T5 model.

    Args:
        context: Text passage the question should be about.
        answer: Expected answer to the generated question.
        num_beams: Beam width passed to ``model.generate``.

    Returns:
        The decoded question string.

    Raises:
        QuestionGenerationError: If tokenization, generation or decoding
            fails; the original exception is attached as the cause.
    """
    try:
        input_text = f"<context> {context} <answer> {answer}"
        print(f"\n{input_text}\n")
        input_ids = tokenizer.encode(input_text, return_tensors='pt')
        # Run the blocking generate() call in a worker thread.
        outputs = await asyncio.to_thread(
            model.generate,
            input_ids,
            num_beams=num_beams,
            early_stopping=True,
            max_length=250,
        )
        question = tokenizer.decode(outputs[0], skip_special_tokens=True)
        print(f"\n{question}\n")
        return question
    except Exception as e:
        raise QuestionGenerationError(f"Error in question generation: {str(e)}") from e
async def generate_options_async(answer, context, n=3):
    """Asynchronous wrapper around ``generate_options``.

    Runs the synchronous implementation in a worker thread so both code
    paths produce options the same way, including the named-entity and
    random-word fallbacks used when too few distractors are found.

    Args:
        answer: The correct answer.
        context: Text the question was generated from.
        n: Number of distractors wanted.

    Returns:
        A shuffled list of up to ``n + 1`` unique options that includes
        ``answer``.

    Raises:
        QuestionGenerationError: If option generation fails; the original
            exception is attached as the cause.
    """
    try:
        return await asyncio.to_thread(generate_options, answer, context, n)
    except Exception as e:
        raise QuestionGenerationError(f"Error in generating options: {str(e)}") from e
# Function to generate questions using beam search
async def generate_questions_async(text, num_questions, context_window_size, num_beams, extract_all_keywords):
    """Generate up to ``num_questions`` questions from ``text``.

    The text is segmented into batches, keywords are extracted once from the
    whole text, and batches are processed in order until enough questions
    have been collected. Progress is shown with a Streamlit progress bar and
    status line, which are removed when the function finishes, whether it
    succeeds or fails.

    Returns:
        A list of question dicts (at most ``num_questions``); ``[]`` if an
        error occurred, in which case the error is shown via ``st.error``.
    """
    progress_bar = None
    status_text = None
    try:
        batches = segment_text(text)
        keywords = extract_keywords(text, extract_all_keywords)
        all_questions = []
        progress_bar = st.progress(0)
        status_text = st.empty()
        total_batches = len(batches)
        for i, batch in enumerate(batches):
            status_text.text(f"Processing batch {i+1} of {total_batches}...")
            batch_questions = await process_batch(batch, keywords, context_window_size, num_beams)
            all_questions.extend(batch_questions)
            progress_bar.progress((i + 1) / total_batches)
            if len(all_questions) >= num_questions:
                break
        return all_questions[:num_questions]
    except QuestionGenerationError as e:
        st.error(f"An error occurred during question generation: {str(e)}")
        return []
    except Exception as e:
        st.error(f"An unexpected error occurred: {str(e)}")
        return []
    finally:
        # Remove the progress widgets on every exit path so a failed run
        # does not leave a stale progress bar on the page.
        if progress_bar is not None:
            progress_bar.empty()
        if status_text is not None:
            status_text.empty()
async def process_batch(batch, keywords, context_window_size, num_beams):
    """Generate and score questions for every text segment in ``batch``.

    For each keyword found in a segment, a question is generated and scored
    with ``assess_question_quality``. Questions with an overall score below
    0.5 are dropped; answer options are generated only for questions that
    are kept.

    Returns:
        A list of dicts with the keys ``question``, ``context``, ``answer``,
        ``options``, ``overall_score``, ``relevance_score``,
        ``complexity_score`` and ``spelling_correctness``.
    """
    questions = []
    for text in batch:
        keyword_sentence_mapping = map_keywords_to_sentences(text, keywords, context_window_size)
        for keyword, context in keyword_sentence_mapping.items():
            question = await generate_question_async(context, keyword, num_beams)
            (
                overall_score,
                relevance_score,
                complexity_score,
                spelling_correctness,
            ) = assess_question_quality(context, question, keyword)
            if overall_score < 0.5:
                # Score first: option generation is expensive and its result
                # would be thrown away for a rejected question.
                continue
            options = await generate_options_async(keyword, context)
            questions.append({
                "question": question,
                "context": context,
                "answer": keyword,
                "options": options,
                "overall_score": overall_score,
                "relevance_score": relevance_score,
                "complexity_score": complexity_score,
                "spelling_correctness": spelling_correctness,
            })
    return questions
# Function to export questions to CSV
def export_to_csv(data):
    """Serialize ``data`` (a list of question dicts) to a CSV string.

    Columns follow the dict keys; the DataFrame index is not written.
    """
    return pd.DataFrame(data).to_csv(index=False)
# Function to export questions to PDF
def export_to_pdf(data):
    """Render the generated questions into a PDF and return it as bytes.

    Each item contributes its context, question, answer, options and overall
    score. Characters that cannot be encoded as Latin-1 are replaced with
    ``?`` so the export does not fail on them.

    Args:
        data: List of question dicts with the keys ``context``, ``question``,
            ``answer``, ``options`` and ``overall_score``.

    Returns:
        The PDF document as ``bytes``.
    """
    def _latin1(value):
        # The document is ultimately serialized as Latin-1 (see the encode
        # below), so anything outside that range is replaced up front.
        return str(value).encode('latin-1', 'replace').decode('latin-1')

    pdf = FPDF()
    pdf.add_page()
    pdf.set_font("Arial", size=12)
    for item in data:
        options_text = ', '.join(str(option) for option in item['options'])
        pdf.multi_cell(0, 10, _latin1(f"Context: {item['context']}"))
        pdf.multi_cell(0, 10, _latin1(f"Question: {item['question']}"))
        pdf.multi_cell(0, 10, _latin1(f"Answer: {item['answer']}"))
        pdf.multi_cell(0, 10, _latin1(f"Options: {options_text}"))
        pdf.multi_cell(0, 10, f"Overall Score: {item['overall_score']:.2f}")
        pdf.ln(10)

    rendered = pdf.output(dest='S')
    # NOTE(review): depending on the installed FPDF package, output() may
    # return either str or a bytes-like object -- handle both.
    if isinstance(rendered, str):
        return rendered.encode('latin-1')
    return bytes(rendered)
def display_word_cloud(generated_questions):
    """Render a word cloud of the words used in ``generated_questions``.

    Args:
        generated_questions: Iterable of question strings. Words are split on
            whitespace and counted case-sensitively.

    Nothing is rendered when there are no words to show.
    """
    from collections import Counter

    word_frequency = Counter(
        word for question in generated_questions for word in question.split()
    )
    if not word_frequency:
        # Nothing to draw; avoid handing an empty mapping to WordCloud.
        return

    wordcloud = WordCloud(width=800, height=400, background_color='white').generate_from_frequencies(word_frequency)
    # Draw on an explicit figure rather than pyplot's implicit global one,
    # and close it afterwards so figures do not accumulate across reruns.
    fig, ax = plt.subplots(figsize=(10, 5))
    ax.imshow(wordcloud, interpolation='bilinear')
    ax.axis('off')
    st.pyplot(fig)
    plt.close(fig)
def assess_question_quality(context, question, answer):
    """Score a generated question on relevance, complexity and spelling.

    Args:
        context: Text the question was generated from.
        question: The generated question.
        answer: The expected answer (currently not used in the score).

    Returns:
        ``(overall_score, relevance_score, complexity_score,
        spelling_correctness)`` where the overall score is
        ``0.4 * relevance + 0.4 * complexity + 0.2 * spelling``.
    """
    # Assess relevance using spaCy document similarity
    context_doc = nlp(context)
    question_doc = nlp(question)
    relevance_score = context_doc.similarity(question_doc)

    # Assess complexity using token length (as a simple metric)
    complexity_score = min(len(question_doc) / 20, 1)  # Normalize to 0-1

    # Assess Spelling correctness
    # NOTE(review): words keep attached punctuation (e.g. a trailing "?"),
    # which the spell checker may count as unknown -- confirm this is intended.
    words = question.split()
    if words:
        misspelled = spell.unknown(words)
        spelling_correctness = 1 - (len(misspelled) / len(words))  # Normalize to 0-1
    else:
        # An empty question has no words to check; score it 0 instead of
        # dividing by zero.
        spelling_correctness = 0.0

    # Calculate overall score (you can adjust weights as needed)
    overall_score = (
        0.4 * relevance_score +
        0.4 * complexity_score +
        0.2 * spelling_correctness
    )
    return overall_score, relevance_score, complexity_score, spelling_correctness
def main():
    """Build the Streamlit page: inputs, generation, results, export, feedback.

    Flow:
      1. Read the display and generation options from the sidebar.
      2. Take text from the text area or from an uploaded PDF and clean it.
      3. On "Generate Questions", run the async pipeline and store the
         result in this session's state.
      4. Show the questions (sorted by overall score) with the optional
         extras, and collect feedback if feedback mode is on.
      5. Offer CSV/PDF downloads, visualizations and feedback statistics.
    """
    # Streamlit interface
    st.title(":blue[Question Generator System]")
    session_id = get_session_id()
    state = initialize_state(session_id)

    # NOTE(review): the extent of this sidebar block is reconstructed from a
    # source whose indentation was lost -- confirm against the original.
    with st.sidebar:
        show_info = st.toggle('Show Info', False)
        if show_info:
            display_info()
        st.subheader("Customization Options")
        # Customization options
        input_type = st.radio("Select Input Preference", ("Text Input", "Upload PDF"))
        with st.expander("Choose the Additional Elements to show"):
            show_context = st.checkbox("Context", True)
            show_answer = st.checkbox("Answer", True)
            show_options = st.checkbox("Options", False)
            show_entity_link = st.checkbox("Entity Link For Wikipedia", True)
            show_qa_scores = st.checkbox("QA Score", False)
        num_beams = st.slider("Select number of beams for question generation", min_value=2, max_value=10, value=2)
        context_window_size = st.slider("Select context window size (number of sentences before and after)", min_value=1, max_value=5, value=1)
        num_questions = st.slider("Select number of questions to generate", min_value=1, max_value=1000, value=5)
        col1, col2 = st.columns(2)
        with col1:
            extract_all_keywords = st.toggle("Extract Max Keywords", value=False)
        with col2:
            enable_feedback_mode = st.toggle("Enable Feedback Mode", False)

    # Collect the input text.
    text = None
    if input_type == "Text Input":
        text = st.text_area("Enter text here:", value="Joe Biden, the current US president is on a weak wicket going in for his reelection later this November against former President Donald Trump.", help="Enter or paste your text here")
    elif input_type == "Upload PDF":
        file = st.file_uploader("Upload PDF Files")
        if file is not None:
            try:
                text = extract_all_text_from_pdf(file)
            except Exception as e:
                st.error(f"Error reading PDF file: {str(e)}")
                text = None
    if text:
        text = clean_text(text)

    generate_questions_button = st.button("Generate Questions")
    st.markdown('<span aria-label="Generate questions button">Above is the generate questions button</span>', unsafe_allow_html=True)

    if generate_questions_button and text:
        start_time = time.time()
        with st.spinner("Generating questions..."):
            try:
                state['generated_questions'] = asyncio.run(generate_questions_async(text, num_questions, context_window_size, num_beams, extract_all_keywords))
                if not state['generated_questions']:
                    st.warning("No questions were generated. The text might be too short or lack suitable content.")
                else:
                    st.success(f"Successfully generated {len(state['generated_questions'])} questions!")
            except QuestionGenerationError as e:
                st.error(f"An error occurred during question generation: {str(e)}")
            except Exception as e:
                st.error(f"An unexpected error occurred: {str(e)}")
        print("\n\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n\n")
        data = get_state(session_id)
        print(data)
        end_time = time.time()
        print(f"Time Taken to generate: {end_time-start_time}")
        set_state(session_id, 'generated_questions', state['generated_questions'])

    # sort question based on their quality score
    state['generated_questions'] = sorted(state['generated_questions'], key=lambda x: x['overall_score'], reverse=True)

    # Display generated questions
    if state['generated_questions']:
        st.header("Generated Questions:", divider='blue')
        for i, q in enumerate(state['generated_questions']):
            st.subheader(body=f":orange[Q{i+1}:] {q['question']}")
            if show_context:
                st.write(f"**Context:** {q['context']}")
            if show_answer:
                st.write(f"**Answer:** {q['answer']}")
            if show_options:
                st.write("**Options:**")
                for j, option in enumerate(q['options']):
                    # chr(65 + j) labels the options A, B, C, ...
                    st.write(f"{chr(65+j)}. {option}")
            if show_entity_link:
                linked_entity = entity_linking(q['answer'])
                if linked_entity:
                    st.write(f"**Entity Link:** {linked_entity}")
            if show_qa_scores:
                m1, m2, m3, m4 = st.columns([1.7, 1, 1, 1])
                m1.metric("Overall Quality Score", value=f"{q['overall_score']:,.2f}")
                m2.metric("Relevance Score", value=f"{q['relevance_score']:,.2f}")
                m3.metric("Complexity Score", value=f"{q['complexity_score']:,.2f}")
                m4.metric("Spelling Correctness", value=f"{q['spelling_correctness']:,.2f}")
            if enable_feedback_mode:
                q['question'] = st.text_input(f"Edit Question {i+1}:", value=q['question'], key=f"question_{i}")
                q['rating'] = st.select_slider("Rate this question (1-5)", options=[1, 2, 3, 4, 5], key=f"rating_{i}")
                if st.button(f"Submit Feedback for Question {i+1}", key=f"submit_{i}"):
                    feedback_file = save_feedback(q['question'], q['answer'], q['rating'], q['options'], q['context'])
                    st.success(f"Feedback submitted for Question {i+1}")
                    # The feedback is already saved locally at this point; a
                    # missing secret or an SMTP failure must not take the
                    # whole page down.
                    try:
                        pswd = st.secrets['EMAIL_PASSWORD']
                        send_email_with_attachment(
                            email_subject='feedback from QGen',
                            email_body='Please find the attached feedback JSON file.',
                            recipient_emails=['[email protected]', '[email protected]'],
                            sender_email='[email protected]',
                            sender_password=pswd,
                            attachment_path=feedback_file)
                    except Exception as e:
                        st.warning(f"Feedback was saved, but emailing it to the admin failed: {e}")
                    else:
                        st.write("Feedback sent to admin")
            st.write("---")

    # Export buttons
    if state['generated_questions']:
        with st.sidebar:
            csv_data = export_to_csv(state['generated_questions'])
            st.download_button(label="Download CSV", data=csv_data, file_name='questions.csv', mime='text/csv')
            pdf_data = export_to_pdf(state['generated_questions'])
            st.download_button(label="Download PDF", data=pdf_data, file_name='questions.pdf', mime='application/pdf')
        with st.expander("View Visualizations"):
            questions = [tpl['question'] for tpl in state['generated_questions']]
            overall_scores = [tpl['overall_score'] for tpl in state['generated_questions']]
            st.subheader('WordCloud of Questions', divider='rainbow')
            display_word_cloud(questions)
            st.subheader('Overall Scores', divider='violet')
            overall_scores = pd.DataFrame(overall_scores, columns=['Overall Scores'])
            st.line_chart(overall_scores)

    # View Feedback Statistics
    with st.expander("View Feedback Statistics"):
        feedback_file = 'question_feedback.json'
        if os.path.exists(feedback_file):
            with open(feedback_file, 'r') as f:
                feedback_data = json.load(f)
            st.subheader("Feedback Statistics")
            # Calculate average rating
            ratings = [feedback['rating'] for feedback in feedback_data]
            avg_rating = sum(ratings) / len(ratings) if ratings else 0
            st.write(f"Average Question Rating: {avg_rating:.2f}")
            # Show distribution of ratings
            rating_counts = {i: ratings.count(i) for i in range(1, 6)}
            st.bar_chart(rating_counts)
            # Show some highly rated questions
            st.subheader("Highly Rated Questions")
            sorted_feedback = sorted(feedback_data, key=lambda x: x['rating'], reverse=True)
            top_questions = sorted_feedback[:5]
            for feedback in top_questions:
                st.write(f"Question: {feedback['question']}")
                st.write(f"Answer: {feedback['answer']}")
                st.write(f"Rating: {feedback['rating']}")
                st.write("---")
        else:
            st.write("No feedback data available yet.")
    print("********************************************************************************")
# Script entry point: run the app and show any uncaught error on the page
# instead of letting it propagate.
if __name__ == '__main__':
    try:
        main()
    except Exception as err:
        message = f"An unexpected error occurred: {str(err)}"
        st.error(message)
        st.error("Please try refreshing the page. If the problem persists, contact support.")