from ragatouille import RAGPretrainedModel
import subprocess
import json
import spaces
import firebase_admin
from firebase_admin import credentials, firestore
import logging
from pathlib import Path
from time import perf_counter
from datetime import datetime
import gradio as gr
from jinja2 import Environment, FileSystemLoader
import numpy as np
from sentence_transformers import CrossEncoder
from backend.query_llm import generate_hf, generate_openai
from backend.semantic_search import table, retriever
VECTOR_COLUMN_NAME = "vector"
TEXT_COLUMN_NAME = "text"
proj_dir = Path(__file__).parent
# Setting up the logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Set up the template environment with the templates directory
env = Environment(loader=FileSystemLoader(proj_dir / 'templates'))
# Load the templates directly from the environment
template = env.get_template('template.j2')
template_html = env.get_template('template_html.j2')
#___________________
# service_account_key='firebase.json'
# # Create a Certificate object from the service account info
# cred = credentials.Certificate(service_account_key)
# # Initialize the Firebase Admin
# firebase_admin.initialize_app(cred)
# # # Create a reference to the Firestore database
# db = firestore.client()
# #db usage
# collection_name = 'Nirvachana' # Replace with your collection name
# field_name = 'message_count' # Replace with your field name for count
# Examples
examples = ['Tabulate the difference between veins and arteries','What are defects in Human eye?',
'Frame 5 short questions and 5 MCQ on Chapter 2 ','Suggest creative and engaging ideas to teach students on Chapter on Metals and Non Metals '
]
# def get_and_increment_value_count(db , collection_name, field_name):
# """
# Retrieves a value count from the specified Firestore collection and field,
# increments it by 1, and updates the field with the new value."""
# collection_ref = db.collection(collection_name)
# doc_ref = collection_ref.document('count_doc') # Assuming a dedicated document for count
# # Use a transaction to ensure consistency across reads and writes
# try:
# with db.transaction() as transaction:
# # Get the current value count (or initialize to 0 if it doesn't exist)
# current_count_doc = doc_ref.get()
# current_count_data = current_count_doc.to_dict()
# if current_count_data:
# current_count = current_count_data.get(field_name, 0)
# else:
# current_count = 0
# # Increment the count
# new_count = current_count + 1
# # Update the document with the new count
# transaction.set(doc_ref, {field_name: new_count})
# return new_count
# except Exception as e:
# print(f"Error retrieving and updating value count: {e}")
# return None # Indicate error
# def update_count_html():
# usage_count = get_and_increment_value_count(db ,collection_name, field_name)
# ccount_html = gr.HTML(value=f"""
#
# No of Usages:
# {usage_count}
#
# """)
# return count_html
# def store_message(db,query,answer,cross_encoder):
# timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# # Create a new document reference with a dynamic document name based on timestamp
# new_completion= db.collection('Nirvachana').document(f"chatlogs_{timestamp}")
# new_completion.set({
# 'query': query,
# 'answer':answer,
# 'created_time': firestore.SERVER_TIMESTAMP,
# 'embedding': cross_encoder,
# 'title': 'Expenditure observer bot'
# })
def add_text(history, text):
history = [] if history is None else history
history = history + [(text, None)]
return history, gr.Textbox(value="", interactive=False)
def bot(history, cross_encoder):
top_rerank = 15
top_k_rank = 10
query = history[-1][0]
if not query:
gr.Warning("Please submit a non-empty string as a prompt")
raise ValueError("Empty string was submitted")
logger.warning('Retrieving documents...')
# if COLBERT RAGATATOUILLE PROCEDURE :
if cross_encoder=='(HIGH ACCURATE) ColBERT':
gr.Warning('Retrieving using ColBERT.. First time query will take a minute for model to load..pls wait')
RAG= RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0")
RAG_db=RAG.from_index('.ragatouille/colbert/indexes/cbseclass10index')
documents_full=RAG_db.search(query,k=top_k_rank)
documents=[item['content'] for item in documents_full]
# Create Prompt
prompt = template.render(documents=documents, query=query)
prompt_html = template_html.render(documents=documents, query=query)
generate_fn = generate_hf
history[-1][1] = ""
for character in generate_fn(prompt, history[:-1]):
history[-1][1] = character
print('Final history is ',history)
yield history, prompt_html
#store_message(db,history[-1][0],history[-1][1],cross_encoder)
else:
# Retrieve documents relevant to query
document_start = perf_counter()
query_vec = retriever.encode(query)
logger.warning(f'Finished query vec')
doc1 = table.search(query_vec, vector_column_name=VECTOR_COLUMN_NAME).limit(top_k_rank)
logger.warning(f'Finished search')
documents = table.search(query_vec, vector_column_name=VECTOR_COLUMN_NAME).limit(top_rerank).to_list()
documents = [doc[TEXT_COLUMN_NAME] for doc in documents]
logger.warning(f'start cross encoder {len(documents)}')
# Retrieve documents relevant to query
query_doc_pair = [[query, doc] for doc in documents]
if cross_encoder=='(FAST) MiniLM-L6v2' :
cross_encoder1 = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
elif cross_encoder=='(ACCURATE) BGE reranker':
cross_encoder1 = CrossEncoder('BAAI/bge-reranker-base')
cross_scores = cross_encoder1.predict(query_doc_pair)
sim_scores_argsort = list(reversed(np.argsort(cross_scores)))
logger.warning(f'Finished cross encoder {len(documents)}')
documents = [documents[idx] for idx in sim_scores_argsort[:top_k_rank]]
logger.warning(f'num documents {len(documents)}')
document_time = perf_counter() - document_start
logger.warning(f'Finished Retrieving documents in {round(document_time, 2)} seconds...')
# Create Prompt
prompt = template.render(documents=documents, query=query)
prompt_html = template_html.render(documents=documents, query=query)
generate_fn = generate_hf
history[-1][1] = ""
for character in generate_fn(prompt, history[:-1]):
history[-1][1] = character
print('Final history is ',history)
yield history, prompt_html
#store_message(db,history[-1][0],history[-1][1],cross_encoder)
def system_instructions(question_difficulty, topic,document_str):
return f""" [INST] Your are a great teacher and your task is to create 10 questions with 4 choices with a {question_difficulty} difficulty about topic request " {topic} " only from the below given documents, {document_str} then create an answers. Index in JSON format, the questions as "Q#":"" to "Q#":"", the four choices as "Q#:C1":"" to "Q#:C4":"", and the answers as "A#":"Q#:C#" to "A#":"Q#:C#". [/INST]"""
#with gr.Blocks(theme='Insuz/SimpleIndigo') as demo:
with gr.Blocks(theme='NoCrypt/miku') as CHATBOT:
with gr.Row():
with gr.Column(scale=10):
# gr.Markdown(
# """
# # Theme preview: `paris`
# To use this theme, set `theme='earneleh/paris'` in `gr.Blocks()` or `gr.Interface()`.
# You can append an `@` and a semantic version expression, e.g. @>=1.0.0,<2.0.0 to pin to a given version
# of this theme.
# """
# )
gr.HTML(value="""CHEERFULL CBSE-
AI Assisted Fun Learning
""", elem_id='heading')
gr.HTML(value=f"""
A free Artificial Intelligence Chatbot assistant trained on CBSE Class 10 Science Notes to engage and help students and teachers of Puducherry.
""", elem_id='Sub-heading')
#usage_count = get_and_increment_value_count(db,collection_name, field_name)
gr.HTML(value=f"""Developed by K M Ramyasri , PGT . Suggestions may be sent to mramesh.irs@gov.in.
""", elem_id='Sub-heading1 ')
with gr.Column(scale=3):
gr.Image(value='logo.png',height=200,width=200)
# gr.HTML(value="""CHEERFULL CBSE-
AI Assisted Fun Learning
#
#
""", elem_id='heading')
# gr.HTML(value=f"""
#
# A free Artificial Intelligence Chatbot assistant trained on CBSE Class 10 Science Notes to engage and help students and teachers of Puducherry.
#
# """, elem_id='Sub-heading')
# #usage_count = get_and_increment_value_count(db,collection_name, field_name)
# gr.HTML(value=f"""Developed by K M Ramyasri , PGT . Suggestions may be sent to mramesh.irs@gov.in.
""", elem_id='Sub-heading1 ')
# # count_html = gr.HTML(value=f"""
# #
# # No of Usages:
# # {usage_count}
# #
# # """)
chatbot = gr.Chatbot(
[],
elem_id="chatbot",
avatar_images=('https://aui.atlassian.com/aui/8.8/docs/images/avatar-person.svg',
'https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.svg'),
bubble_full_width=False,
show_copy_button=True,
show_share_button=True,
)
with gr.Row():
txt = gr.Textbox(
scale=3,
show_label=False,
placeholder="Enter text and press enter",
container=False,
)
txt_btn = gr.Button(value="Submit text", scale=1)
cross_encoder = gr.Radio(choices=['(FAST) MiniLM-L6v2','(ACCURATE) BGE reranker','(HIGH ACCURATE) ColBERT'], value='(ACCURATE) BGE reranker',label="Embeddings", info="Only First query to Colbert may take litte time)")
prompt_html = gr.HTML()
# Turn off interactivity while generating if you click
txt_msg = txt_btn.click(add_text, [chatbot, txt], [chatbot, txt], queue=False).then(
bot, [chatbot, cross_encoder], [chatbot, prompt_html])#.then(update_count_html,[],[count_html])
# Turn it back on
txt_msg.then(lambda: gr.Textbox(interactive=True), None, [txt], queue=False)
# Turn off interactivity while generating if you hit enter
txt_msg = txt.submit(add_text, [chatbot, txt], [chatbot, txt], queue=False).then(
bot, [chatbot, cross_encoder], [chatbot, prompt_html])#.then(update_count_html,[],[count_html])
# Turn it back on
txt_msg.then(lambda: gr.Textbox(interactive=True), None, [txt], queue=False)
# Examples
gr.Examples(examples, txt)
RAG_db=gr.State()
with gr.Blocks(title="Quiz Maker", theme=gr.themes.Default(primary_hue="green", secondary_hue="green"), css="style.css") as QUIZBOT:
def load_model():
RAG= RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0")
RAG_db.value=RAG.from_index('.ragatouille/colbert/indexes/cbseclass10index')
return 'Ready to Go!!'
with gr.Column(scale=4):
gr.HTML("""
AI NANBAN - CBSE Class Quiz Maker
AI-powered Learning Game
⚠️ Students create quiz from any topic /CBSE Chapter ! ⚠️
""")
#gr.Warning('Retrieving using ColBERT.. First time query will take a minute for model to load..pls wait')
with gr.Column(scale=2):
load_btn = gr.Button("Click to Load!🚀")
load_text=gr.Textbox()
load_btn.click(load_model,[],load_text)
topic = gr.Textbox(label="Enter the Topic for Quiz", placeholder="Write any topic from CBSE notes")
with gr.Row():
radio = gr.Radio(
["easy", "average", "hard"], label="How difficult should the quiz be?"
)
generate_quiz_btn = gr.Button("Generate Quiz!🚀")
quiz_msg=gr.Textbox()
question_radios = [gr.Radio(visible=False), gr.Radio(visible=False), gr.Radio(
visible=False), gr.Radio(visible=False), gr.Radio(visible=False), gr.Radio(visible=False), gr.Radio(visible=False), gr.Radio(
visible=False), gr.Radio(visible=False), gr.Radio(visible=False)]
print(question_radios)
@spaces.GPU
@generate_quiz_btn.click(inputs=[radio, topic], outputs=[quiz_msg,question_radios], api_name="generate_quiz")
def generate_quiz(question_difficulty, topic, user_prompt):
top_k_rank=10
RAG_db_=RAG_db.value
documents_full=RAG_db_.search(topic,k=top_k_rank)
documents=[item['content'] for item in documents_full]
document_summaries = [f"[DOCUMENT {i+1}]: {summary}" for i, summary in enumerate(documents)]
documents_str='\n'.join(document_summaries)
formatted_prompt = system_instructions(
question_difficulty, topic,document_str)
print(formatted_prompt)
pre_prompt = [
{"role": "system", "content": formatted_prompt}
]
generate_kwargs = dict(
temperature=0.2,
max_new_tokens=4000,
top_p=0.95,
repetition_penalty=1.0,
do_sample=True,
seed=42,
)
while True:
try:
response = client.text_generation(
formatted_prompt, **generate_kwargs, stream=False, details=False, return_full_text=False,
)
output_json = json.loads(f"{response}")
break
except Exception as e:
print(f"Exception occurred: {e}")
print("Trying again...please wait")
continue
response = client.text_generation(
formatted_prompt, **generate_kwargs, stream=False, details=False, return_full_text=False,
)
print(response)
print('output json', output_json)
global quiz_data
quiz_data = output_json
question_radio_list = []
for question_num in range(1, 11):
question_key = f"Q{question_num}"
answer_key = f"A{question_num}"
question = quiz_data.get(question_key)
answer = quiz_data.get(quiz_data.get(answer_key))
if not question or not answer:
continue
choice_keys = [f"{question_key}:C{i}" for i in range(1, 5)]
choice_list = []
for choice_key in choice_keys:
choice = quiz_data.get(choice_key, "Choice not found")
choice_list.append(f"{choice}")
radio = gr.Radio(choices=choice_list, label=question,
visible=True, interactive=True)
question_radio_list.append(radio)
print('Question radio list ' , question_radio_list)
return 'Quiz Generated!', question_radio_list
check_button = gr.Button("Check Score")
score_textbox = gr.Markdown()
@check_button.click(inputs=question_radios, outputs=score_textbox)
def compare_answers(*user_answers):
user_anwser_list = []
user_anwser_list = user_answers
answers_list = []
for question_num in range(1, 20):
answer_key = f"A{question_num}"
answer = quiz_data.get(quiz_data.get(answer_key))
if not answer:
break
answers_list.append(answer)
score = 0
for item in user_anwser_list:
if item in answers_list:
score += 1
if score>5:
message = f"### Good ! You got {score} over 10!"
elif score>7:
message = f"### Excellent ! You got {score} over 10!"
else:
message = f"### You got {score} over 10! Dont worry . You can prepare well and try better next time !"
return message
demo = gr.TabbedInterface([CHATBOT,QUIZBOT], ["AI ChatBot", "AI Nanban-Quizbot"])
demo.queue()
demo.launch(debug=True)