import os
import re
import json
import getpass
import logging
import openai
import asyncio
import pandas as pd
from typing import Any, List, Tuple, Dict

import gradio as gr
import requests
from fpdf import FPDF

import llama_index
from llama_index import Document
from llama_index.llms import OpenAI, HuggingFaceLLM
from llama_index.embeddings import OpenAIEmbedding, HuggingFaceEmbedding

from RAG_utils import PDFProcessor_Unstructured, PDFQueryEngine, HybridRetriever, MixtralLLM, KeywordSearch, base_utils, ConfigManager
# Configure basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# Create a logger object
logger = logging.getLogger(__name__)

os.environ["TOKENIZERS_PARALLELISM"] = "false"

config_manager = ConfigManager()
# config_manager.load_config("api", "Config/api_config.json")
config_manager.load_config("model", "model_config.json")

openai.api_key = os.environ['OPENAI_API_KEY']  # config_manager.get_config_value("api", "OPENAI_API_KEY")
hf_token = os.environ['HF_TOKEN']  # config_manager.get_config_value("api", "HF_TOKEN")
# PDF rendering and chunking parameters
pdf_processing_config = config_manager.get_config_value("model", "pdf_processing")
ALLOWED_EXTENSIONS = config_manager.get_config_value("model", "allowed_extensions")

embed = config_manager.get_config_value("model", "embeddings")
embed_model_name = config_manager.get_config_value("model", "embeddings_model")
# llm_model = config_manager.get_config_value("model", "llm_model")
model_temperature = config_manager.get_config_value("model", "model_temp")
output_token_size = config_manager.get_config_value("model", "max_tokens")
model_context_window = config_manager.get_config_value("model", "context_window")

gpt_prompt_path = config_manager.get_config_value("model", "GPT_PROMPT_PATH")
mistral_prompt_path = config_manager.get_config_value("model", "MISTRAL_PROMPT_PATH")
info_prompt_path = config_manager.get_config_value("model", "INFO_PROMPT_PATH")
peer_review_journals_path = config_manager.get_config_value("model", "peer_review_journals_path")
eq_network_journals_path = config_manager.get_config_value("model", "eq_network_journals_path")

queries = config_manager.get_config_value("model", "queries")
criteria = config_manager.get_config_value("model", "criteria")
num_criteria = len(queries)

author_query = config_manager.get_config_value("model", "author_query")
journal_query = config_manager.get_config_value("model", "journal_query")
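# For reference, a minimal sketch of the structure model_config.json is assumed to have.
# The key names are taken from the get_config_value calls above; the values shown are
# illustrative placeholders only, not the actual configuration shipped with this Space:
#
# {
#   "pdf_processing": {"...": "..."},
#   "allowed_extensions": ["pdf"],
#   "embeddings": "openai",
#   "embeddings_model": "BAAI/bge-small-en-v1.5",
#   "model_temp": 0.1,
#   "max_tokens": 512,
#   "context_window": 32768,
#   "GPT_PROMPT_PATH": "Prompts/gpt_prompt.txt",
#   "MISTRAL_PROMPT_PATH": "Prompts/mistral_prompt.txt",
#   "INFO_PROMPT_PATH": "Prompts/info_prompt.txt",
#   "peer_review_journals_path": "Data/peer_review_journals.txt",
#   "eq_network_journals_path": "Data/eq_network_journals.txt",
#   "queries": ["...one query per evaluation criterion..."],
#   "criteria": ["...one heading per evaluation criterion..."],
#   "author_query": "...",
#   "journal_query": "..."
# }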
# Get the current working directory and log it
current_working_directory = os.getcwd()
logger.info(f"The current working directory is: {current_working_directory}")
# Helper function to check if the file extension is allowed
def allowed_file(filename):
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
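# Example (assuming ALLOWED_EXTENSIONS contains "pdf"):
#   allowed_file("paper_001.pdf")  -> True
#   allowed_file("notes.docx")     -> False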
def generate_score_bar(score, num_criteria):
    # Express the raw score as a percentage of the maximum possible score (num_criteria)
    score_out_of_100 = round((score / num_criteria) * 100)

    # Determine the bar colour and label from the raw score (thresholds assume a 9-criteria rubric)
    if score == 9:
        color = "#4CAF50"  # green
        text = "Very good"
    elif score in [7, 8]:
        color = "#FFEB3B"  # yellow
        text = "Good"
    elif score in [5, 6]:
        color = "#FF9800"  # orange
        text = "Ok"
    elif score in [3, 4]:
        color = "#F44336"  # red
        text = "Bad"
    else:  # score < 3
        color = "#800000"  # maroon
        text = "Very bad"

    # Create the HTML for the score bar
    score_bar_html = f"""
    <div style="background-color: #ddd; border-radius: 10px; position: relative; height: 20px; width: 100%;">
        <div style="background-color: {color}; height: 100%; border-radius: 10px; width: {score_out_of_100}%;"></div>
    </div>
    <p style="color: {color};">{text}</p> <!-- Display the text -->
    """
    return score_bar_html
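# Example: generate_score_bar(7, 9) renders a yellow bar filled to 78% with the label "Good".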
class PDF(FPDF):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Load the DejaVu font files
        self.add_font('DejaVu', '', 'DejaVu_Sans/DejaVuSansCondensed.ttf', uni=True)
        self.add_font('DejaVu', 'B', 'DejaVu_Sans/DejaVuSansCondensed-Bold.ttf', uni=True)
        self.add_font('DejaVu', 'I', 'DejaVu_Sans/DejaVuSansCondensed-Oblique.ttf', uni=True)

    def header(self):
        self.set_font('DejaVu', 'B', 12)
        self.cell(0, 10, 'Paper Analysis Report', 0, 1, 'C')

    def footer(self):
        self.set_y(-15)
        self.set_font('DejaVu', 'I', 8)
        self.cell(0, 10, f'Page {self.page_no()}', 0, 0, 'C')
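# Note: the font registration above assumes a DejaVu_Sans/ directory with the three
# DejaVuSansCondensed .ttf files ships alongside this script; constructing PDF() will
# fail if those files cannot be found.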
def create_pdf_report(title, author_info, score, criteria, reasoning_list, output_path):
    pdf = PDF()
    pdf.add_page()

    # Set margins
    pdf.set_left_margin(10)
    pdf.set_right_margin(10)

    # Title
    pdf.set_font("DejaVu", 'B', 14)
    pdf.cell(0, 10, "Title:", 0, 1)
    pdf.set_font("DejaVu", '', 12)
    pdf.multi_cell(0, 10, title)

    # Author Information
    pdf.set_font("DejaVu", 'B', 14)
    pdf.cell(0, 10, "Author Information:", 0, 1)
    pdf.set_font("DejaVu", '', 12)
    pdf.multi_cell(0, 10, author_info)

    # Score
    pdf.set_font("DejaVu", 'B', 14)
    pdf.cell(0, 10, "Score:", 0, 1)
    pdf.set_font("DejaVu", '', 12)
    pdf.multi_cell(0, 10, score)

    # Reasoning - print each criterion heading in bold green, followed by its reasoning
    for heading, reasoning in zip(criteria, reasoning_list):
        pdf.set_font("DejaVu", 'B', 14)
        pdf.set_text_color(0, 128, 0)  # Green color
        pdf.multi_cell(0, 10, heading)
        pdf.set_text_color(0, 0, 0)  # Reset to black color
        pdf.set_font("DejaVu", '', 12)
        pdf.multi_cell(0, 10, reasoning)

    # Save the PDF to the specified output path
    pdf.output(output_path)
    return output_path  # Return the path to the generated report
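# Example (illustrative values only):
#   create_pdf_report("Sample Title", "J. Doe et al.", "78/100",
#                     criteria, ["Reasoning for criterion 1", "..."], "/tmp/sample_report.pdf")
# writes the report and returns "/tmp/sample_report.pdf".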
def process_pdf(uploaded_files, llm_model, n_criteria=num_criteria):
    # Initialize aggregation variables
    final_score = 0
    final_reasoning = []
    final_score_bar_html = ""
    final_author_info_html = ""
    final_title_info_html = ""
    output_files = []

    for i, uploaded_file in enumerate(uploaded_files):
        # Process the PDF file
        file_name_without_extension = os.path.splitext(os.path.basename(uploaded_file))[0]
        # Derive an identifier from the file name (assumes names of the form "<prefix>_<id>.pdf")
        id_number = file_name_without_extension.split('_')[1]

        pdf_processor = PDFProcessor_Unstructured(pdf_processing_config)
        merged_chunks, tables, title = pdf_processor.process_pdf_file(uploaded_file)
        documents = [Document(text=t) for t in merged_chunks]

        # Prompts and Queries
        utils = base_utils()
        info_prompt = utils.read_from_file(info_prompt_path)
        # LLM Model choice
        try:
            if llm_model == "Model 1":
                llm = OpenAI(model="gpt-4-1106-preview", temperature=model_temperature, max_tokens=output_token_size)
                general_prompt = utils.read_from_file(gpt_prompt_path)
            elif llm_model == "Model 2":
                if any(param is None for param in [model_context_window, output_token_size, model_temperature, hf_token]):
                    raise ValueError("All parameters are required for Mistral LLM.")
                llm = MixtralLLM(context_window=model_context_window, num_output=output_token_size,
                                 temperature=model_temperature, model_name="mistralai/Mixtral-8x7B-Instruct-v0.1",
                                 api_key=hf_token)
                general_prompt = utils.read_from_file(mistral_prompt_path)
            else:
                raise ValueError(f"Unsupported language model: {llm_model}")
        except Exception as e:
            logger.error(f"Error initializing language model '{llm_model}': {e}", exc_info=True)
            raise  # Or handle the exception as needed
        # Embedding model choice for RAG
        try:
            if embed == "openai":
                embed_model = OpenAIEmbedding(model="text-embedding-3-large")
            elif embed == "huggingface":
                # Use the specified model name
                embed_model = HuggingFaceEmbedding(embed_model_name)
            else:
                raise ValueError(f"Unsupported embedding model: {embed}")
        except Exception as e:
            logger.error(f"Error initializing embedding model: {e}", exc_info=True)
            raise
        peer_review_journals = utils.read_from_file(peer_review_journals_path)
        eq_network_journals = utils.read_from_file(eq_network_journals_path)

        peer_review_journals_list = peer_review_journals.split('\n')
        eq_network_journals_list = eq_network_journals.split('\n')

        modified_journal_query = "Is the given research paper published in any of the following journals: " + ", ".join(peer_review_journals_list) + "?"

        info_llm = OpenAI(model="gpt-4-1106-preview", temperature=model_temperature, max_tokens=100)
        pdf_info_query = PDFQueryEngine(documents, info_llm, embed_model, info_prompt)
        info_query_engine = pdf_info_query.setup_query_engine()
        journal_result = info_query_engine.query(modified_journal_query).response
        author_result = info_query_engine.query(author_query).response

        pdf_criteria_query = PDFQueryEngine(documents, llm, embed_model, general_prompt)

        # Check for prior registration
        nlp_methods = KeywordSearch(merged_chunks)
        eq_journal_result = nlp_methods.find_journal_name(journal_result, eq_network_journals_list)
        peer_journal_result = nlp_methods.find_journal_name(journal_result, peer_review_journals_list)
        registration_result = nlp_methods.check_registration()

        # Evaluate the paper against the criteria queries with the chosen LLM
        total_score, criteria_met, score_percentage, score_list, reasoning = pdf_criteria_query.evaluate_with_llm(
            registration_result, peer_journal_result, eq_journal_result, queries)
        try:
            # Define the path to the CSV file used to accumulate results
            csv_file_path = '/home/user/app/storing_output.csv'
            logger.info("CSV file path: %s", csv_file_path)

            # Create a dictionary for the new row
            new_row = {
                'Id': id_number,
                'Title': title,
                'Author': author_result
            }
            new_row.update({f'score_cr_{j}': s for j, s in enumerate(score_list, 1)})
            new_row.update({f'reasoning_cr_{j}': r for j, r in enumerate(reasoning, 1)})

            # Convert the new_row dictionary to a DataFrame for easy appending
            new_row_df = pd.DataFrame([new_row])
            logger.info("New row DataFrame:\n%s", new_row_df)

            # Check if the CSV file exists
            if os.path.exists(csv_file_path):
                # Load the existing data
                logger.info("CSV file exists. Loading existing data.")
                df = pd.read_csv(csv_file_path)
            else:
                # Or create a new DataFrame if the file does not exist
                logger.info("CSV file does not exist. Creating a new DataFrame.")
                columns = ['Id', 'Title', 'Author'] + [f'score_cr_{i}' for i in range(1, 10)] + [f'reasoning_cr_{i}' for i in range(1, 10)]
                df = pd.DataFrame(columns=columns)

            # Append the new data using pd.concat
            df = pd.concat([df, new_row_df], ignore_index=True)

            # Save the updated DataFrame back to CSV
            df.to_csv(csv_file_path, index=False)
            logger.info(f"Updated data saved to {csv_file_path}.")
        except Exception as e:
            logger.error(f"An error occurred while saving results to CSV: {e}")
        # Generate the score bar HTML
        score_bar_html = generate_score_bar(total_score, n_criteria)
        scaled_total_score = str(round((total_score / n_criteria) * 100)) + "/100"

        # Write the per-paper PDF report to a temporary directory
        output_dir = "/tmp"
        base_name = os.path.splitext(os.path.basename(uploaded_file))[0]
        output_path = os.path.join(output_dir, f"{base_name}_report.pdf")
        create_pdf_report(title, author_result, scaled_total_score, criteria, reasoning, output_path)
        output_files.append(output_path)

    # Construct the processing message
    processing_message = f"Processing complete. {len(uploaded_files)} reports generated. Please download your reports below."
    return processing_message, output_files
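# Local smoke test (a sketch; assumes a sample file named "paper_001.pdf" in the working
# directory and valid OPENAI_API_KEY / HF_TOKEN environment variables):
#   message, reports = process_pdf(["paper_001.pdf"], "Model 1")
#   print(message, reports)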
with gr.Blocks(theme=gr.themes.Glass(
        text_size="sm",
        font=[gr.themes.GoogleFont("Inconsolata"), "Arial", "sans-serif"],
        primary_hue="neutral",
        secondary_hue="gray")) as demo:

    gr.Markdown("## Med Library")

    with gr.Row():
        file_upload = gr.File(label="Choose papers", file_types=['.pdf'], file_count="multiple")

    with gr.Row():
        model_choice = gr.Dropdown(["Model 1", "Model 2"], label="Choose a model", value="Model 1")
        submit_button = gr.Button("Evaluate")

    processing_message_output = gr.Textbox(label="Processing Status", interactive=False)
    report_download_links = gr.File(label="Download Reports", type="filepath", file_count="multiple")

    submit_button.click(
        fn=process_pdf,
        inputs=[file_upload, model_choice],
        outputs=[processing_message_output, report_download_links]
    )

# Launch the app
demo.launch(share=True, server_name="0.0.0.0", server_port=7860)
# Legacy Flask route for file upload and displaying results (kept for reference)
# @app.route('/', methods=['GET', 'POST'])
# def upload_and_display_results():
#     total_score = 0
#     score_percentage = 0
#     reasoning = []
#     criteria_met = 0
#
#     if request.method == 'POST':
#         # Check if the post request has the file part
#         if 'file' not in request.files:
#             flash('No file part')
#             return redirect(request.url)
#         file = request.files['file']
#
#         # If user does not select a file, the browser also submits an empty part without a filename
#         if file.filename == '':
#             flash('No selected file')
#             return redirect(request.url)
#
#         if file and allowed_file(file.filename):
#             try:
#                 # Process the PDF file
#                 pdf_processor = PDFProcessor_Unstructured(pdf_processing_config)
#                 merged_chunks, tables = pdf_processor.process_pdf_file(file)
#                 documents = [Document(text=t) for t in merged_chunks]
#
#                 # LLM Model choice
#                 try:
#                     if llm_model == "gpt-4" or llm_model == "gpt-3.5-turbo":
#                         llm = OpenAI(model=llm_model, temperature=model_temperature, max_tokens=output_token_size)
#                     elif llm_model == "mistralai/Mixtral-8x7B-Instruct-v0.1":
#                         if any(param is None for param in [model_context_window, output_token_size, model_temperature, hf_token]):
#                             raise ValueError("All parameters are required for Mistral LLM.")
#                         llm = MixtralLLM(context_window=model_context_window, num_output=output_token_size,
#                                          temperature=model_temperature, model_name=llm_model, api_key=hf_token)
#                     else:
#                         raise ValueError(f"Unsupported language model: {llm_model}")
#                 except Exception as e:
#                     logger.error(f"Error initializing language model '{llm_model}': {e}", exc_info=True)
#                     raise  # Or handle the exception as needed
#
#                 # Embedding model choice for RAG
#                 try:
#                     if embed == "openai":
#                         embed_model = OpenAIEmbedding()
#                     elif embed == "huggingface":
#                         if embed_model_name is None:
#                             # Set to default model if name not provided
#                             embed_model_name = "BAAI/bge-small-en-v1.5"
#                             embed_model = HuggingFaceEmbedding(embed_model_name)
#                         else:
#                             # Use the specified model name
#                             embed_model = HuggingFaceEmbedding(embed_model_name)
#                     else:
#                         raise ValueError(f"Unsupported embedding model: {embed_model}")
#                 except Exception as e:
#                     logger.error(f"Error initializing embedding model: {e}", exc_info=True)
#                     raise
#
#                 # Prompts and Queries
#                 utils = base_utils()
#                 general_prompt = utils.read_from_file(general_prompt_path)
#                 info_prompt = utils.read_from_file(info_prompt_path)
#                 peer_review_journals = utils.read_from_file(peer_review_journals_path)
#                 eq_network_journals = utils.read_from_file(eq_network_journals_path)
#
#                 peer_review_journals_list = peer_review_journals.split('\n')
#                 eq_network_journals_list = eq_network_journals.split('\n')
#
#                 modified_journal_query = "Is the given research paper published in any of the following journals: " + ", ".join(peer_review_journals_list) + "?"
#
#                 pdf_info_query = PDFQueryEngine(documents, llm, embed_model, info_prompt)
#                 info_query_engine = pdf_info_query.setup_query_engine()
#                 journal_result = info_query_engine.query(modified_journal_query).response
#
#                 pdf_criteria_query = PDFQueryEngine(documents, llm, embed_model, general_prompt)
#
#                 # Check for prior registration
#                 nlp_methods = KeywordSearch(merged_chunks)
#                 eq_journal_result = nlp_methods.find_journal_name(journal_result, eq_network_journals_list)
#                 peer_journal_result = nlp_methods.find_journal_name(journal_result, peer_review_journals_list)
#                 registration_result = nlp_methods.check_registration()
#
#                 # Evaluate with the chosen LLM
#                 total_score, criteria_met, score_percentage, reasoning = pdf_criteria_query.evaluate_with_llm(registration_result, peer_journal_result, eq_journal_result, queries)
#             except Exception as e:
#                 logging.exception("An error occurred while processing the file.")
#                 # Consider adding a user-friendly message or redirect
#                 flash('An error occurred while processing the file.')
#                 return redirect(request.url)
#
#     return render_template('index.html',
#                            total_score=total_score,
#                            score_percentage=score_percentage,
#                            criteria_met=criteria_met,
#                            reasoning=reasoning)
#
# if __name__ == '__main__':
#     app.run(debug=True)