import os
import re
import json
import getpass
import logging
import openai
import asyncio
from typing import Any, List, Tuple, Dict
import gradio as gr
import llama_index
from fpdf import FPDF
from llama_index import Document
from llama_index.llms import OpenAI
from llama_index.embeddings import OpenAIEmbedding, HuggingFaceEmbedding
from llama_index.llms import HuggingFaceLLM
import requests
from RAG_utils import (
    PDFProcessor_Unstructured,
    PDFQueryEngine,
    HybridRetriever,
    MixtralLLM,
    KeywordSearch,
    base_utils,
    ConfigManager,
)

# Root logging: INFO and above, each record stamped with time, logger name
# and level.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)

os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Only the "model" configuration file is loaded; the separate "api"
# configuration file is not read.
config_manager = ConfigManager()
config_manager.load_config("model", "model_config.json")

# Credentials come straight from the process environment. Indexing
# os.environ raises KeyError at import time when either variable is unset.
openai.api_key = os.environ["OPENAI_API_KEY"]
hf_token = os.environ["HF_TOKEN"]

# PDF rendering and chunking parameters
pdf_processing_config = config_manager.get_config_value("model", "pdf_processing")
ALLOWED_EXTENSIONS = config_manager.get_config_value("model", "allowed_extensions")

# Embedding backend selector and the model name used with it.
embed = config_manager.get_config_value("model", "embeddings")
embed_model_name = config_manager.get_config_value("model", "embeddings_model")

# Generation settings handed to the language models.
model_temperature = config_manager.get_config_value("model", "model_temp")
output_token_size = config_manager.get_config_value("model", "max_tokens")
model_context_window = config_manager.get_config_value("model", "context_window")

# Locations of the prompt files.
gpt_prompt_path = config_manager.get_config_value("model", "GPT_PROMPT_PATH")
mistral_prompt_path = config_manager.get_config_value("model", "MISTRAL_PROMPT_PATH")
info_prompt_path = config_manager.get_config_value("model", "INFO_PROMPT_PATH")
peer_review_journals_path = config_manager.get_config_value("model", "peer_review_journals_path") eq_network_journals_path = config_manager.get_config_value("model", "eq_network_journals_path") queries = config_manager.get_config_value("model", "queries") criteria = config_manager.get_config_value("model", "criteria") num_criteria = len(queries) author_query = config_manager.get_config_value("model", "author_query") journal_query = config_manager.get_config_value("model", "journal_query") # Helper function to check if the file extension is allowed def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS def generate_score_bar(score, num_criteria): # Convert and round the score from a 9-point scale to a 100-point scale score_out_of_100 = round((score / num_criteria) * 100) # Determine the color and text based on the original score if score == 9: color = "#4CAF50" # green text = "Very good" elif score in [7, 8]: color = "#FFEB3B" # yellow text = "Good" elif score in [5, 6]: color = "#FF9800" # orange text = "Ok" elif score in [3, 4]: color = "#F44336" # red text = "Bad" else: # score < 3 color = "#800000" # maroon text = "Very bad" # Create the HTML for the score bar score_bar_html = f"""
{text}
""" return score_bar_html class PDF(FPDF): def header(self): self.set_font('Arial', 'B', 12) self.cell(0, 10, 'Paper Analysis Report', 0, 1, 'C') def footer(self): self.set_y(-15) self.set_font('Arial', 'I', 8) self.cell(0, 10, f'Page {self.page_no()}', 0, 0, 'C') def create_pdf_report(title, author_info, score, reasoning_html, output_path): pdf = PDF() pdf.add_page() pdf.set_font("Arial", size=12) pdf.cell(0, 10, f"Title: {title}", 0, 1) pdf.cell(0, 10, f"Author Information: {author_info}", 0, 1) pdf.cell(0, 10, f"Score: {score}", 0, 1) pdf.multi_cell(0, 10, f"Reasoning: \n{reasoning_html}") pdf.output(output_path) def process_pdf(uploaded_files, llm_model, n_criteria = num_criteria): # Initialize aggregation variables final_score = 0 final_reasoning = [] final_score_bar_html = "" final_author_info_html = "" final_title_info_html = "" output_files = [] for i, uploaded_file in enumerate(uploaded_files): # Process the PDF file pdf_processor = PDFProcessor_Unstructured(pdf_processing_config) merged_chunks, tables, title = pdf_processor.process_pdf_file(uploaded_file) documents = [Document(text=t) for t in merged_chunks] # Prompts and Queries utils = base_utils() info_prompt = utils.read_from_file(info_prompt_path) # LLM Model choice try: if llm_model == "Model 1": llm = OpenAI(model="gpt-4-1106-preview", temperature=model_temperature, max_tokens=output_token_size) general_prompt = utils.read_from_file(gpt_prompt_path) elif llm_model == "Model 2": if any(param is None for param in [model_context_window, output_token_size, model_temperature, hf_token]): raise ValueError("All parameters are required for Mistral LLM.") llm = MixtralLLM(context_window=model_context_window, num_output=output_token_size, temperature=model_temperature, model_name="mistralai/Mixtral-8x7B-Instruct-v0.1", api_key=hf_token) general_prompt = utils.read_from_file(mistral_prompt_path) else: raise ValueError(f"Unsupported language model: {llm_model}") except Exception as e: logger.error(f"Error 
initializing language model '{llm_model}': {e}", exc_info=True) raise # Or handle the exception as needed # Embedding model choice for RAG try: if embed == "openai": embed_model = OpenAIEmbedding(model="text-embedding-3-large") elif embed == "huggingface": # Use the specified model name embed_model = HuggingFaceEmbedding(embed_model_name) else: raise ValueError(f"Unsupported embedding model: {embed_model}") except Exception as e: logger.error(f"Error initializing embedding model: {e}", exc_info=True) raise peer_review_journals = utils.read_from_file(peer_review_journals_path) eq_network_journals = utils.read_from_file(eq_network_journals_path) peer_review_journals_list = peer_review_journals.split('\n') eq_network_journals_list = eq_network_journals.split('\n') modified_journal_query = "Is the given research paper published in any of the following journals: " + ", ".join(peer_review_journals_list) + "?" info_llm = OpenAI(model="gpt-4-1106-preview", temperature=model_temperature, max_tokens=100) pdf_info_query = PDFQueryEngine(documents, info_llm, embed_model, (info_prompt)) info_query_engine = pdf_info_query.setup_query_engine() journal_result = info_query_engine.query(modified_journal_query).response author_result = info_query_engine.query(author_query).response pdf_criteria_query = PDFQueryEngine(documents, llm, embed_model, (general_prompt)) # Check for prior registration nlp_methods = KeywordSearch(merged_chunks) eq_journal_result = nlp_methods.find_journal_name(journal_result, eq_network_journals_list) peer_journal_result = nlp_methods.find_journal_name(journal_result, peer_review_journals_list) registration_result = nlp_methods.check_registration() # Evaluate with OpenAI model total_score, criteria_met, score_percentage, reasoning = pdf_criteria_query.evaluate_with_llm(registration_result, peer_journal_result, eq_journal_result, queries) reasoning_html = "