kpal002's picture
Update app.py
038f9aa verified
raw
history blame
17.1 kB
import os
import re
import json
import getpass
import logging
import openai
import asyncio
from typing import Any, List, Tuple, Dict
import gradio as gr
import llama_index
from fpdf import FPDF
from llama_index import Document
from llama_index.llms import OpenAI
from llama_index.embeddings import OpenAIEmbedding, HuggingFaceEmbedding
from llama_index.llms import HuggingFaceLLM
import requests
from RAG_utils import PDFProcessor_Unstructured, PDFQueryEngine, HybridRetriever, MixtralLLM, KeywordSearch, base_utils, ConfigManager
# Configure basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Create a logger object
logger = logging.getLogger(__name__)
os.environ["TOKENIZERS_PARALLELISM"] = "false"
config_manager = ConfigManager()
#config_manager.load_config("api", "Config/api_config.json")
config_manager.load_config("model", "model_config.json")
openai.api_key = os.environ['OPENAI_API_KEY'] #config_manager.get_config_value("api", "OPENAI_API_KEY")
hf_token = os.environ['HF_TOKEN']#config_manager.get_config_value("api", "HF_TOKEN")
# PDF rendering and chunking parameters
pdf_processing_config = config_manager.get_config_value("model", "pdf_processing")
ALLOWED_EXTENSIONS = config_manager.get_config_value("model", "allowed_extensions")
embed = config_manager.get_config_value("model", "embeddings")
embed_model_name = config_manager.get_config_value("model", "embeddings_model")
#llm_model = config_manager.get_config_value("model", "llm_model")
model_temperature = config_manager.get_config_value("model", "model_temp")
output_token_size = config_manager.get_config_value("model", "max_tokens")
model_context_window = config_manager.get_config_value("model", "context_window")
gpt_prompt_path = config_manager.get_config_value("model","GPT_PROMPT_PATH")
mistral_prompt_path = config_manager.get_config_value("model","MISTRAL_PROMPT_PATH")
info_prompt_path = config_manager.get_config_value("model", "INFO_PROMPT_PATH")
peer_review_journals_path = config_manager.get_config_value("model", "peer_review_journals_path")
eq_network_journals_path = config_manager.get_config_value("model", "eq_network_journals_path")
queries = config_manager.get_config_value("model", "queries")
criteria = config_manager.get_config_value("model", "criteria")
num_criteria = len(queries)
author_query = config_manager.get_config_value("model", "author_query")
journal_query = config_manager.get_config_value("model", "journal_query")
# Helper function to check if the file extension is allowed
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def generate_score_bar(score, num_criteria):
# Convert and round the score from a 9-point scale to a 100-point scale
score_out_of_100 = round((score / num_criteria) * 100)
# Determine the color and text based on the original score
if score == 9:
color = "#4CAF50" # green
text = "Very good"
elif score in [7, 8]:
color = "#FFEB3B" # yellow
text = "Good"
elif score in [5, 6]:
color = "#FF9800" # orange
text = "Ok"
elif score in [3, 4]:
color = "#F44336" # red
text = "Bad"
else: # score < 3
color = "#800000" # maroon
text = "Very bad"
# Create the HTML for the score bar
score_bar_html = f"""
<div style="background-color: #ddd; border-radius: 10px; position: relative; height: 20px; width: 100%;">
<div style="background-color: {color}; height: 100%; border-radius: 10px; width: {score_out_of_100}%;"></div>
</div>
<p style="color: {color};">{text}</p> <!-- Display the text -->
"""
return score_bar_html
class PDF(FPDF):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Load the DejaVu font files
self.add_font('DejaVu', '', 'DejaVu_Sans/DejaVuSansCondensed.ttf', uni=True)
self.add_font('DejaVu', 'B', 'DejaVu_Sans/DejaVuSansCondensed-Bold.ttf', uni=True)
self.add_font('DejaVu', 'I', 'DejaVu_Sans/DejaVuSansCondensed-Oblique.ttf', uni=True)
def header(self):
self.set_font('DejaVu', 'B', 12)
self.cell(0, 10, 'Paper Analysis Report', 0, 1, 'C')
def footer(self):
self.set_y(-15)
self.set_font('DejaVu', 'I', 8)
self.cell(0, 10, f'Page {self.page_no()}', 0, 0, 'C')
def create_pdf_report(title, author_info, score, reasoning_text, output_path):
pdf = PDF()
pdf.add_page()
# Set margins
pdf.set_left_margin(10)
pdf.set_right_margin(10)
# Title
pdf.set_font("DejaVu", 'B', 14) # Bold and larger font for the heading
pdf.cell(0, 10, "Title:", 0, 1)
pdf.set_font("DejaVu", '', 12) # Regular and smaller font for the content
pdf.multi_cell(0, 10, title, 0, 1)
# Author Information
pdf.set_font("DejaVu", 'B', 14) # Bold and larger font for the heading
pdf.cell(0, 10, "Author Information:", 0, 1)
pdf.set_font("DejaVu", '', 12) # Regular and smaller font for the content
pdf.multi_cell(0, 10, author_info, 0, 1)
# Score
pdf.set_font("DejaVu", 'B', 14) # Bold and larger font for the heading
pdf.cell(0, 10, "Score:", 0, 1)
pdf.set_font("DejaVu", '', 12) # Regular and smaller font for the content
pdf.multi_cell(0, 10, score, 0, 1)
# Reasoning
pdf.set_font("DejaVu", 'B', 14) # Bold and larger font for the heading
pdf.cell(0, 10, "Reasoning:", 0, 1)
pdf.set_font("DejaVu", '', 12) # Regular and smaller font for the content
pdf.multi_cell(0, 10, reasoning_text, 0, 1)
# Save the PDF to the specified output path
pdf.output(output_path)
def process_pdf(uploaded_files, llm_model, n_criteria = num_criteria):
# Initialize aggregation variables
final_score = 0
final_reasoning = []
final_score_bar_html = ""
final_author_info_html = ""
final_title_info_html = ""
output_files = []
for i, uploaded_file in enumerate(uploaded_files):
# Process the PDF file
pdf_processor = PDFProcessor_Unstructured(pdf_processing_config)
merged_chunks, tables, title = pdf_processor.process_pdf_file(uploaded_file)
documents = [Document(text=t) for t in merged_chunks]
# Prompts and Queries
utils = base_utils()
info_prompt = utils.read_from_file(info_prompt_path)
# LLM Model choice
try:
if llm_model == "Model 1":
llm = OpenAI(model="gpt-4-1106-preview", temperature=model_temperature, max_tokens=output_token_size)
general_prompt = utils.read_from_file(gpt_prompt_path)
elif llm_model == "Model 2":
if any(param is None for param in [model_context_window, output_token_size, model_temperature, hf_token]):
raise ValueError("All parameters are required for Mistral LLM.")
llm = MixtralLLM(context_window=model_context_window, num_output=output_token_size,
temperature=model_temperature, model_name="mistralai/Mixtral-8x7B-Instruct-v0.1", api_key=hf_token)
general_prompt = utils.read_from_file(mistral_prompt_path)
else:
raise ValueError(f"Unsupported language model: {llm_model}")
except Exception as e:
logger.error(f"Error initializing language model '{llm_model}': {e}", exc_info=True)
raise # Or handle the exception as needed
# Embedding model choice for RAG
try:
if embed == "openai":
embed_model = OpenAIEmbedding(model="text-embedding-3-large")
elif embed == "huggingface":
# Use the specified model name
embed_model = HuggingFaceEmbedding(embed_model_name)
else:
raise ValueError(f"Unsupported embedding model: {embed_model}")
except Exception as e:
logger.error(f"Error initializing embedding model: {e}", exc_info=True)
raise
peer_review_journals = utils.read_from_file(peer_review_journals_path)
eq_network_journals = utils.read_from_file(eq_network_journals_path)
peer_review_journals_list = peer_review_journals.split('\n')
eq_network_journals_list = eq_network_journals.split('\n')
modified_journal_query = "Is the given research paper published in any of the following journals: " + ", ".join(peer_review_journals_list) + "?"
info_llm = OpenAI(model="gpt-4-1106-preview", temperature=model_temperature, max_tokens=100)
pdf_info_query = PDFQueryEngine(documents, info_llm, embed_model, (info_prompt))
info_query_engine = pdf_info_query.setup_query_engine()
journal_result = info_query_engine.query(modified_journal_query).response
author_result = info_query_engine.query(author_query).response
pdf_criteria_query = PDFQueryEngine(documents, llm, embed_model, (general_prompt))
# Check for prior registration
nlp_methods = KeywordSearch(merged_chunks)
eq_journal_result = nlp_methods.find_journal_name(journal_result, eq_network_journals_list)
peer_journal_result = nlp_methods.find_journal_name(journal_result, peer_review_journals_list)
registration_result = nlp_methods.check_registration()
# Evaluate with OpenAI model
total_score, criteria_met, score_percentage, reasoning = pdf_criteria_query.evaluate_with_llm(registration_result, peer_journal_result, eq_journal_result, queries)
# Convert reasoning list to plain text
reasoning_text = "\n".join([f"{idx + 1}. {reason}" for idx, reason in enumerate(reasoning)])
# Generate the score bar HTML
score_bar_html = generate_score_bar(total_score, n_criteria)
scaled_total_score = str(round((total_score / n_criteria) * 100)) + "/100"
output_path = f"/tmp/paper_report_{i+1}.pdf"
create_pdf_report(title, author_result, scaled_total_score, reasoning_text, output_path)
output_files.append(output_path)
# Construct the processing message
processing_message = f"Processing complete. {len(uploaded_files)} reports generated. Please download your reports below."
return processing_message, output_files
# Return the score as a string and the reasoning as HTML
#return str(round((total_score / n_criteria) * 100)) + "/100", score_bar_html, reasoning_html, author_info_html, title_info_html
with gr.Blocks(theme=gr.themes.Glass(
text_size="sm",
font=[gr.themes.GoogleFont("Inconsolata"), "Arial", "sans-serif"],
primary_hue="neutral",
secondary_hue="gray")) as demo:
gr.Markdown("## Med Library")
with gr.Row():
file_upload = gr.File(label="Choose papers", file_types=['.pdf'], file_count="multiple")
with gr.Row():
model_choice = gr.Dropdown(["Model 1", "Model 2"], label="Choose a model", value="Model 1")
submit_button = gr.Button("Evaluate")
processing_message_output = gr.Textbox(label="Processing Status", interactive=False)
report_download_links = gr.File(label="Download Reports", type="filepath", file_count="multiple")
submit_button.click(
fn=process_pdf,
inputs=[file_upload, model_choice],
outputs=[processing_message_output, report_download_links]
)
#Launch the app
demo.launch(share=True, server_name="0.0.0.0", server_port=7860)
# Main route for file upload and display results
# @app.route('/', methods=['GET', 'POST'])
# def upload_and_display_results():
# total_score = 0
# score_percentage = 0
# reasoning = []
# criteria_met = 0
# if request.method == 'POST':
# # Check if the post request has the file part
# if 'file' not in request.files:
# flash('No file part')
# return redirect(request.url)
# file = request.files['file']
# # If user does not select file, browser also submits an empty part without filename
# if file.filename == '':
# flash('No selected file')
# return redirect(request.url)
# if file and allowed_file(file.filename):
# try:
# # Process the PDF file
# pdf_processor = PDFProcessor_Unstructured(pdf_processing_config)
# merged_chunks, tables = pdf_processor.process_pdf_file(file)
# documents = [Document(text=t) for t in merged_chunks]
# # LLM Model choice
# try:
# if llm_model == "gpt-4" or llm_model == "gpt-3.5-turbo":
# llm = OpenAI(model=llm_model, temperature=model_temperature, max_tokens=output_token_size)
# elif llm_model == "mistralai/Mixtral-8x7B-Instruct-v0.1":
# if any(param is None for param in [model_context_window, output_token_size, model_temperature, hf_token]):
# raise ValueError("All parameters are required for Mistral LLM.")
# llm = MixtralLLM(context_window=model_context_window, num_output=output_token_size,
# temperature=model_temperature, model_name=llm_model, api_key=hf_token)
# else:
# raise ValueError(f"Unsupported language model: {llm_model}")
# except Exception as e:
# logger.error(f"Error initializing language model '{llm_model}': {e}", exc_info=True)
# raise # Or handle the exception as needed
# # Embedding model choice for RAG
# try:
# if embed == "openai":
# embed_model = OpenAIEmbedding()
# elif embed == "huggingface":
# if embed_model_name is None:
# # Set to default model if name not provided
# embed_model_name = "BAAI/bge-small-en-v1.5"
# embed_model = HuggingFaceEmbedding(embed_model_name)
# else:
# # Use the specified model name
# embed_model = HuggingFaceEmbedding(embed_model_name)
# else:
# raise ValueError(f"Unsupported embedding model: {embed_model}")
# except Exception as e:
# logger.error(f"Error initializing embedding model: {e}", exc_info=True)
# raise
# # Prompts and Queries
# utils = base_utils()
# general_prompt = utils.read_from_file(general_prompt_path)
# info_prompt = utils.read_from_file(info_prompt_path)
# peer_review_journals = utils.read_from_file(peer_review_journals_path)
# eq_network_journals = utils.read_from_file(eq_network_journals_path)
# peer_review_journals_list = peer_review_journals.split('\n')
# eq_network_journals_list = eq_network_journals.split('\n')
# modified_journal_query = "Is the given research paper published in any of the following journals: " + ", ".join(peer_review_journals_list) + "?"
# pdf_info_query = PDFQueryEngine(documents, llm, embed_model, (info_prompt))
# info_query_engine = pdf_info_query.setup_query_engine()
# journal_result = info_query_engine.query(modified_journal_query).response
# pdf_criteria_query = PDFQueryEngine(documents, llm, embed_model, (general_prompt))
# # Check for prior registration
# nlp_methods = KeywordSearch(merged_chunks)
# eq_journal_result = nlp_methods.find_journal_name(journal_result, eq_network_journals_list)
# peer_journal_result = nlp_methods.find_journal_name(journal_result, peer_review_journals_list)
# registration_result = nlp_methods.check_registration()
# # Evaluate with OpenAI model
# total_score, criteria_met, score_percentage, reasoning = pdf_criteria_query.evaluate_with_llm(registration_result, peer_journal_result, eq_journal_result, queries)
# except Exception as e:
# logging.exception("An error occurred while processing the file.")
# # Consider adding a user-friendly message or redirect
# flash('An error occurred while processing the file.')
# return redirect(request.url)
# return render_template('index.html',
# total_score = total_score,
# score_percentage = score_percentage,
# criteria_met = criteria_met,
# reasoning = reasoning)
# if __name__ == '__main__':
# app.run(debug=True)