import os import re import json import getpass import logging import openai import asyncio import pandas as pd from typing import Any, List, Tuple, Dict import gradio as gr import llama_index from fpdf import FPDF from llama_index import Document from llama_index.llms import OpenAI from llama_index.embeddings import OpenAIEmbedding, HuggingFaceEmbedding from llama_index.llms import HuggingFaceLLM import requests from RAG_utils import PDFProcessor_Unstructured, PDFQueryEngine, HybridRetriever, MixtralLLM, KeywordSearch, base_utils, ConfigManager # Configure basic logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') # Create a logger object logger = logging.getLogger(__name__) os.environ["TOKENIZERS_PARALLELISM"] = "false" config_manager = ConfigManager() #config_manager.load_config("api", "Config/api_config.json") config_manager.load_config("model", "model_config.json") openai.api_key = os.environ['OPENAI_API_KEY'] #config_manager.get_config_value("api", "OPENAI_API_KEY") hf_token = os.environ['HF_TOKEN']#config_manager.get_config_value("api", "HF_TOKEN") # PDF rendering and chunking parameters pdf_processing_config = config_manager.get_config_value("model", "pdf_processing") ALLOWED_EXTENSIONS = config_manager.get_config_value("model", "allowed_extensions") embed = config_manager.get_config_value("model", "embeddings") embed_model_name = config_manager.get_config_value("model", "embeddings_model") #llm_model = config_manager.get_config_value("model", "llm_model") model_temperature = config_manager.get_config_value("model", "model_temp") output_token_size = config_manager.get_config_value("model", "max_tokens") model_context_window = config_manager.get_config_value("model", "context_window") gpt_prompt_path = config_manager.get_config_value("model","GPT_PROMPT_PATH") mistral_prompt_path = config_manager.get_config_value("model","MISTRAL_PROMPT_PATH") info_prompt_path = config_manager.get_config_value("model", "INFO_PROMPT_PATH") peer_review_journals_path = config_manager.get_config_value("model", "peer_review_journals_path") eq_network_journals_path = config_manager.get_config_value("model", "eq_network_journals_path") queries = config_manager.get_config_value("model", "queries") criteria = config_manager.get_config_value("model", "criteria") num_criteria = len(queries) author_query = config_manager.get_config_value("model", "author_query") journal_query = config_manager.get_config_value("model", "journal_query") # Assuming you have already set up logging as described earlier logger = logging.getLogger(__name__) # Get the current working directory current_working_directory = os.getcwd() # Log the current working directory logger.info(f"The current working directory is: {current_working_directory}") # Helper function to check if the file extension is allowed def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS def generate_score_bar(score, num_criteria): # Convert and round the score from a 9-point scale to a 100-point scale score_out_of_100 = round((score / num_criteria) * 100) # Determine the color and text based on the original score if score == 9: color = "#4CAF50" # green text = "Very good" elif score in [7, 8]: color = "#FFEB3B" # yellow text = "Good" elif score in [5, 6]: color = "#FF9800" # orange text = "Ok" elif score in [3, 4]: color = "#F44336" # red text = "Bad" else: # score < 3 color = "#800000" # maroon text = "Very bad" # Create the HTML for the score bar score_bar_html = f"""
{text}
""" return score_bar_html class PDF(FPDF): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Load the DejaVu font files self.add_font('DejaVu', '', 'DejaVu_Sans/DejaVuSansCondensed.ttf', uni=True) self.add_font('DejaVu', 'B', 'DejaVu_Sans/DejaVuSansCondensed-Bold.ttf', uni=True) self.add_font('DejaVu', 'I', 'DejaVu_Sans/DejaVuSansCondensed-Oblique.ttf', uni=True) def header(self): self.set_font('DejaVu', 'B', 12) self.cell(0, 10, 'Paper Analysis Report', 0, 1, 'C') def footer(self): self.set_y(-15) self.set_font('DejaVu', 'I', 8) self.cell(0, 10, f'Page {self.page_no()}', 0, 0, 'C') import os def create_pdf_report(title, author_info, score, criteria, reasoning_list, output_path): pdf = PDF() pdf.add_page() # Set margins pdf.set_left_margin(10) pdf.set_right_margin(10) # Title pdf.set_font("DejaVu", 'B', 14) pdf.cell(0, 10, "Title:", 0, 1) pdf.set_font("DejaVu", '', 12) pdf.multi_cell(0, 10, title, 0, 1) # Author Information pdf.set_font("DejaVu", 'B', 14) pdf.cell(0, 10, "Author Information:", 0, 1) pdf.set_font("DejaVu", '', 12) pdf.multi_cell(0, 10, author_info, 0, 1) # Score pdf.set_font("DejaVu", 'B', 14) pdf.cell(0, 10, "Score:", 0, 1) pdf.set_font("DejaVu", '', 12) pdf.multi_cell(0, 10, score, 0, 1) # Reasoning - each reasoning with a green heading in bold for heading, reasoning in zip(criteria, reasoning_list): pdf.set_font("DejaVu", 'B', 14) pdf.set_text_color(0, 128, 0) # Green color pdf.multi_cell(0, 10, heading, 0, 1) pdf.set_text_color(0, 0, 0) # Reset to black color pdf.set_font("DejaVu", '', 12) pdf.multi_cell(0, 10, reasoning, 0, 1) # Save the PDF to the specified output path pdf.output(output_path) return output_path # Return the path to the generated report def process_pdf(uploaded_files, llm_model, n_criteria = num_criteria): # Initialize aggregation variables final_score = 0 final_reasoning = [] final_score_bar_html = "" final_author_info_html = "" final_title_info_html = "" output_files = [] for i, uploaded_file in enumerate(uploaded_files): # Process the PDF file file_name_without_extension = os.path.splitext(os.path.basename(uploaded_file))[0] id_number = file_name_without_extension.split('_')[1] pdf_processor = PDFProcessor_Unstructured(pdf_processing_config) merged_chunks, tables, title = pdf_processor.process_pdf_file(uploaded_file) documents = [Document(text=t) for t in merged_chunks] # Prompts and Queries utils = base_utils() info_prompt = utils.read_from_file(info_prompt_path) # LLM Model choice try: if llm_model == "Model 1": llm = OpenAI(model="gpt-4-1106-preview", temperature=model_temperature, max_tokens=output_token_size) general_prompt = utils.read_from_file(gpt_prompt_path) elif llm_model == "Model 2": if any(param is None for param in [model_context_window, output_token_size, model_temperature, hf_token]): raise ValueError("All parameters are required for Mistral LLM.") llm = MixtralLLM(context_window=model_context_window, num_output=output_token_size, temperature=model_temperature, model_name="mistralai/Mixtral-8x7B-Instruct-v0.1", api_key=hf_token) general_prompt = utils.read_from_file(mistral_prompt_path) else: raise ValueError(f"Unsupported language model: {llm_model}") except Exception as e: logger.error(f"Error initializing language model '{llm_model}': {e}", exc_info=True) raise # Or handle the exception as needed # Embedding model choice for RAG try: if embed == "openai": embed_model = OpenAIEmbedding(model="text-embedding-3-large") elif embed == "huggingface": # Use the specified model name embed_model = HuggingFaceEmbedding(embed_model_name) else: raise ValueError(f"Unsupported embedding model: {embed_model}") except Exception as e: logger.error(f"Error initializing embedding model: {e}", exc_info=True) raise peer_review_journals = utils.read_from_file(peer_review_journals_path) eq_network_journals = utils.read_from_file(eq_network_journals_path) peer_review_journals_list = peer_review_journals.split('\n') eq_network_journals_list = eq_network_journals.split('\n') modified_journal_query = "Is the given research paper published in any of the following journals: " + ", ".join(peer_review_journals_list) + "?" info_llm = OpenAI(model="gpt-4-1106-preview", temperature=model_temperature, max_tokens=100) pdf_info_query = PDFQueryEngine(documents, info_llm, embed_model, (info_prompt)) info_query_engine = pdf_info_query.setup_query_engine() journal_result = info_query_engine.query(modified_journal_query).response author_result = info_query_engine.query(author_query).response pdf_criteria_query = PDFQueryEngine(documents, llm, embed_model, (general_prompt)) # Check for prior registration nlp_methods = KeywordSearch(merged_chunks) eq_journal_result = nlp_methods.find_journal_name(journal_result, eq_network_journals_list) peer_journal_result = nlp_methods.find_journal_name(journal_result, peer_review_journals_list) registration_result = nlp_methods.check_registration() # Evaluate with OpenAI model total_score, criteria_met, score_percentage, score_list, reasoning = pdf_criteria_query.evaluate_with_llm(registration_result, peer_journal_result, eq_journal_result, queries) try: # Define the path to your CSV file csv_file_path = '/home/user/app/storing_output.csv' logger.info("CSV file path: %s", csv_file_path) # Create a dictionary for the new row new_row = { 'Id': id_number, 'Title': title, 'Author': author_result } new_row.update({f'score_cr_{i}': score for i, score in enumerate(score_list, 1)}) new_row.update({f'reasoning_cr_{i}': reasoning for i, reasoning in enumerate(reasoning, 1)}) # Convert new_row dictionary to a DataFrame for easy appending new_row_df = pd.DataFrame([new_row]) logger.info("New row DataFrame:\n%s", new_row_df) # Check if the CSV file exists if os.path.exists(csv_file_path): # Load the existing data logger.info("CSV file exists. Loading existing data.") df = pd.read_csv(csv_file_path) else: # Or create a new DataFrame if the file does not exist logger.info("CSV file does not exist. Creating a new DataFrame.") columns = ['Id', 'Title', 'Author'] + [f'score_cr_{i}' for i in range(1, 10)] + [f'reasoning_cr_{i}' for i in range(1, 10)] df = pd.DataFrame(columns=columns) # Append the new data using pd.concat df = pd.concat([df, new_row_df], ignore_index=True) # Save the updated DataFrame back to CSV df.to_csv(csv_file_path, index=False) logger.info(f"Updated data saved to {csv_file_path}.") except Exception as e: logger.info(f"An error occurred: {e}") # Generate the score bar HTML score_bar_html = generate_score_bar(total_score, n_criteria) scaled_total_score = str(round((total_score / n_criteria) * 100)) + "/100" output_dir="/tmp" base_name = os.path.splitext(uploaded_file)[0] output_path = os.path.join(output_dir, f"{base_name}_report.pdf") create_pdf_report(title, author_result, scaled_total_score, criteria, reasoning, output_path) output_files.append(output_path) # Construct the processing message processing_message = f"Processing complete. {len(uploaded_files)} reports generated. Please download your reports below." return processing_message, output_files with gr.Blocks(theme=gr.themes.Glass( text_size="sm", font=[gr.themes.GoogleFont("Inconsolata"), "Arial", "sans-serif"], primary_hue="neutral", secondary_hue="gray")) as demo: gr.Markdown("## Med Library") with gr.Row(): file_upload = gr.File(label="Choose papers", file_types=['.pdf'], file_count="multiple") with gr.Row(): model_choice = gr.Dropdown(["Model 1", "Model 2"], label="Choose a model", value="Model 1") submit_button = gr.Button("Evaluate") processing_message_output = gr.Textbox(label="Processing Status", interactive=False) report_download_links = gr.File(label="Download Reports", type="filepath", file_count="multiple") submit_button.click( fn=process_pdf, inputs=[file_upload, model_choice], outputs=[processing_message_output, report_download_links] ) #Launch the app demo.launch(share=True, server_name="0.0.0.0", server_port=7860) # Main route for file upload and display results # @app.route('/', methods=['GET', 'POST']) # def upload_and_display_results(): # total_score = 0 # score_percentage = 0 # reasoning = [] # criteria_met = 0 # if request.method == 'POST': # # Check if the post request has the file part # if 'file' not in request.files: # flash('No file part') # return redirect(request.url) # file = request.files['file'] # # If user does not select file, browser also submits an empty part without filename # if file.filename == '': # flash('No selected file') # return redirect(request.url) # if file and allowed_file(file.filename): # try: # # Process the PDF file # pdf_processor = PDFProcessor_Unstructured(pdf_processing_config) # merged_chunks, tables = pdf_processor.process_pdf_file(file) # documents = [Document(text=t) for t in merged_chunks] # # LLM Model choice # try: # if llm_model == "gpt-4" or llm_model == "gpt-3.5-turbo": # llm = OpenAI(model=llm_model, temperature=model_temperature, max_tokens=output_token_size) # elif llm_model == "mistralai/Mixtral-8x7B-Instruct-v0.1": # if any(param is None for param in [model_context_window, output_token_size, model_temperature, hf_token]): # raise ValueError("All parameters are required for Mistral LLM.") # llm = MixtralLLM(context_window=model_context_window, num_output=output_token_size, # temperature=model_temperature, model_name=llm_model, api_key=hf_token) # else: # raise ValueError(f"Unsupported language model: {llm_model}") # except Exception as e: # logger.error(f"Error initializing language model '{llm_model}': {e}", exc_info=True) # raise # Or handle the exception as needed # # Embedding model choice for RAG # try: # if embed == "openai": # embed_model = OpenAIEmbedding() # elif embed == "huggingface": # if embed_model_name is None: # # Set to default model if name not provided # embed_model_name = "BAAI/bge-small-en-v1.5" # embed_model = HuggingFaceEmbedding(embed_model_name) # else: # # Use the specified model name # embed_model = HuggingFaceEmbedding(embed_model_name) # else: # raise ValueError(f"Unsupported embedding model: {embed_model}") # except Exception as e: # logger.error(f"Error initializing embedding model: {e}", exc_info=True) # raise # # Prompts and Queries # utils = base_utils() # general_prompt = utils.read_from_file(general_prompt_path) # info_prompt = utils.read_from_file(info_prompt_path) # peer_review_journals = utils.read_from_file(peer_review_journals_path) # eq_network_journals = utils.read_from_file(eq_network_journals_path) # peer_review_journals_list = peer_review_journals.split('\n') # eq_network_journals_list = eq_network_journals.split('\n') # modified_journal_query = "Is the given research paper published in any of the following journals: " + ", ".join(peer_review_journals_list) + "?" # pdf_info_query = PDFQueryEngine(documents, llm, embed_model, (info_prompt)) # info_query_engine = pdf_info_query.setup_query_engine() # journal_result = info_query_engine.query(modified_journal_query).response # pdf_criteria_query = PDFQueryEngine(documents, llm, embed_model, (general_prompt)) # # Check for prior registration # nlp_methods = KeywordSearch(merged_chunks) # eq_journal_result = nlp_methods.find_journal_name(journal_result, eq_network_journals_list) # peer_journal_result = nlp_methods.find_journal_name(journal_result, peer_review_journals_list) # registration_result = nlp_methods.check_registration() # # Evaluate with OpenAI model # total_score, criteria_met, score_percentage, reasoning = pdf_criteria_query.evaluate_with_llm(registration_result, peer_journal_result, eq_journal_result, queries) # except Exception as e: # logging.exception("An error occurred while processing the file.") # # Consider adding a user-friendly message or redirect # flash('An error occurred while processing the file.') # return redirect(request.url) # return render_template('index.html', # total_score = total_score, # score_percentage = score_percentage, # criteria_met = criteria_met, # reasoning = reasoning) # if __name__ == '__main__': # app.run(debug=True)