import os
from typing import List, Dict, Union, Tuple

import gradio as gr
import pandas as pd
from langchain.text_splitter import (
    RecursiveCharacterTextSplitter,
    CharacterTextSplitter,
    TokenTextSplitter,
)
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.memory import ConversationBufferMemory
from langchain_community.vectorstores import FAISS, Chroma, Qdrant
from langchain_community.document_loaders import PyPDFLoader
from langchain.chains import ConversationalRetrievalChain
from langchain_huggingface import HuggingFaceEndpoint
from langchain.docstore.document import Document

# Configuration
list_llm = [
    "meta-llama/Meta-Llama-3-8B-Instruct",
    "mistralai/Mistral-7B-Instruct-v0.2",
]
# Short display names (repo id without the organisation prefix) for the UI.
list_llm_simple = [os.path.basename(llm) for llm in list_llm]
api_token = os.getenv("HF_TOKEN")

# Chunk size per (size preset, splitting strategy); looked up in load_doc().
CHUNK_SIZES = {
    "small": {"recursive": 512, "fixed": 512, "token": 256},
    "medium": {"recursive": 1024, "fixed": 1024, "token": 512},
}


def is_valid_file(file_path: str) -> bool:
    """Return True if the file extension is one of .pdf, .xlsx or .xls.

    The comparison is case-insensitive.
    """
    allowed_extensions = {'.pdf', '.xlsx', '.xls'}
    file_extension = os.path.splitext(file_path.lower())[1]
    return file_extension in allowed_extensions


def process_excel_to_documents(excel_file: str) -> List[Document]:
    """Convert every row of every sheet in an Excel file to a Document.

    Args:
        excel_file: Path to the workbook to read.

    Returns:
        One Document per row, with ``page_content`` made of
        ``"column: value"`` lines and metadata holding the source tag,
        the row index and the sheet name. If reading or processing fails,
        the error is printed and the documents collected so far (possibly
        none) are returned.
    """
    documents = []
    try:
        # sheet_name=None loads all sheets into a {sheet_name: DataFrame} dict.
        excel_data = pd.read_excel(excel_file, sheet_name=None)

        for sheet_name, df in excel_data.items():
            # Flatten tuple (multi-level) headers into single strings.
            df.columns = [
                "_".join(map(str, col)) if isinstance(col, tuple) else str(col)
                for col in df.columns
            ]

            # Fill gaps (e.g. left by merged cells) forward, then backward.
            # ffill()/bfill() replace fillna(method=...), whose `method`
            # keyword is deprecated in pandas and would otherwise make this
            # whole function fail silently via the except below.
            df = df.ffill().bfill()

            for idx, row in df.iterrows():
                content = "\n".join(
                    [f"{col}: {val}" for col, val in row.items()]
                )
                metadata = {
                    "source": "excel",
                    "row": idx,
                    "sheet_name": sheet_name,
                }
                documents.append(
                    Document(page_content=content, metadata=metadata)
                )
    except Exception as e:
        # Best effort: report the problem and return whatever was collected.
        print(f"Error processing Excel file: {str(e)}")

    return documents


def get_text_splitter(strategy: str, chunk_size: int = 1024,
                      chunk_overlap: int = 64):
    """Build the text splitter for the given strategy.

    Args:
        strategy: One of "recursive", "fixed" or "token".
        chunk_size: Chunk size passed to the splitter.
        chunk_overlap: Overlap between consecutive chunks.

    Returns:
        The configured splitter, or None if ``strategy`` is not recognised.
    """
    splitter_classes = {
        "recursive": RecursiveCharacterTextSplitter,
        "fixed": CharacterTextSplitter,
        "token": TokenTextSplitter,
    }
    splitter_cls = splitter_classes.get(strategy)
    if splitter_cls is None:
        return None
    # Instantiate only the requested splitter, so a failure constructing an
    # unused one (e.g. a missing optional tokenizer dependency for the token
    # splitter) cannot break the other strategies.
    return splitter_cls(chunk_size=chunk_size, chunk_overlap=chunk_overlap)


def load_doc(list_file_path: List[str], splitting_strategy: str,
             chunk_size: str) -> List[Document]:
    """Load PDF/Excel files and split them into chunks.

    Args:
        list_file_path: Paths of the files to load. Unsupported extensions
            are skipped; files that fail to load are reported and skipped.
        splitting_strategy: Key into the per-preset CHUNK_SIZES mapping and
            strategy name passed to get_text_splitter().
        chunk_size: Size preset, a key of CHUNK_SIZES ("small" or "medium").

    Returns:
        The split documents, or an empty list if nothing could be loaded.

    Raises:
        KeyError: If ``chunk_size`` or ``splitting_strategy`` is not a key
            of CHUNK_SIZES.
    """
    chunk_size_value = CHUNK_SIZES[chunk_size][splitting_strategy]
    all_documents = []

    for file_path in list_file_path:
        if not is_valid_file(file_path):
            continue

        try:
            lowered_path = file_path.lower()
            if lowered_path.endswith('.pdf'):
                all_documents.extend(PyPDFLoader(file_path).load())
            elif lowered_path.endswith(('.xlsx', '.xls')):
                all_documents.extend(process_excel_to_documents(file_path))
        except Exception as e:
            print(f"Error processing file {file_path}: {str(e)}")
            continue

    if not all_documents:
        return []

    text_splitter = get_text_splitter(splitting_strategy, chunk_size_value)
    return text_splitter.split_documents(all_documents)


def create_db(splits, db_choice: str = "faiss"):
    """Create a vector database from document splits.

    Args:
        splits: Documents to embed and index.
        db_choice: One of "faiss", "chroma" or "qdrant".

    Returns:
        The populated vector store.

    Raises:
        KeyError: If ``db_choice`` is not one of the supported names.
    """
    embeddings = HuggingFaceEmbeddings(
        # Explicitly specify the embedding model.
        model_name="sentence-transformers/all-mpnet-base-v2"
    )

    # Lambdas defer construction so only the chosen store is built.
    db_creators = {
        "faiss": lambda: FAISS.from_documents(splits, embeddings),
        "chroma": lambda: Chroma.from_documents(splits, embeddings),
        "qdrant": lambda: Qdrant.from_documents(
            splits,
            embeddings,
            location=":memory:",
            collection_name="docs",
        ),
    }
    return db_creators[db_choice]()


def initialize_database(
    list_file_obj, splitting_strategy, chunk_size, db_choice,
    progress=gr.Progress()
) -> Tuple[Union[FAISS, Chroma, Qdrant, None], str]:
    """Build the vector database from uploaded files, with error handling.

    Args:
        list_file_obj: Uploaded file objects; each non-None entry must have
            a ``name`` attribute that is used as the file path.
        splitting_strategy: Text splitting strategy name.
        chunk_size: Chunk size preset name.
        db_choice: Vector database name.
        progress: Gradio progress tracker (not used directly in the body).

    Returns:
        ``(vector_db, status_message)``; ``vector_db`` is None on any
        failure and the message explains why.
    """
    try:
        if not list_file_obj:
            return None, "No files uploaded. Please upload PDF or Excel documents."

        list_file_path = []
        for file_obj in list_file_obj:
            if file_obj is None:
                continue
            # A single unsupported file aborts the whole request.
            if not is_valid_file(file_obj.name):
                return None, f"Invalid file type for {file_obj.name}. Please upload only PDF or Excel files."
            list_file_path.append(file_obj.name)

        if not list_file_path:
            return None, "No valid files found. Please upload documents."

        doc_splits = load_doc(list_file_path, splitting_strategy, chunk_size)
        if not doc_splits:
            return None, "No content extracted from documents."

        vector_db = create_db(doc_splits, db_choice)
        return vector_db, f"Database created successfully using {splitting_strategy} splitting and {db_choice} vector database!"

    except Exception as e:
        return None, f"Error creating database: {str(e)}"


def initialize_llmchain(llm_choice, temperature, max_tokens, top_k, vector_db,
                        progress=gr.Progress()):
    """Create the conversational retrieval chain, with error handling.

    Args:
        llm_choice: Index into ``list_llm`` of the model to use.
        temperature: Sampling temperature passed to the endpoint.
        max_tokens: Passed to the endpoint as ``max_new_tokens``.
        top_k: Passed to the endpoint as ``top_k``.
        vector_db: Vector store to retrieve from; must not be None.
        progress: Gradio progress tracker (not used directly in the body).

    Returns:
        ``(qa_chain, status_message)``; ``qa_chain`` is None on failure.
    """
    try:
        if vector_db is None:
            return None, "Please create vector database first."

        llm_model = list_llm[llm_choice]

        llm = HuggingFaceEndpoint(
            repo_id=llm_model,
            huggingfacehub_api_token=api_token,
            temperature=temperature,
            max_new_tokens=max_tokens,
            top_k=top_k,
        )

        memory = ConversationBufferMemory(
            memory_key="chat_history",
            output_key='answer',
            return_messages=True,
        )

        retriever = vector_db.as_retriever()
        qa_chain = ConversationalRetrievalChain.from_llm(
            llm,
            retriever=retriever,
            memory=memory,
            return_source_documents=True,
        )
        return qa_chain, "LLM initialized successfully!"

    except Exception as e:
        return None, f"Error initializing LLM: {str(e)}"


def format_excel_source(source_doc):
    """Format a source document for display.

    Documents whose metadata marks them as coming from Excel are prefixed
    with their row index; all other documents are returned as-is.
    """
    if source_doc.metadata.get('source') == 'excel':
        return f"Excel Row {source_doc.metadata['row']}: {source_doc.page_content}"
    return source_doc.page_content


def conversation(qa_chain, message, history):
    """Answer one chat message and report up to three source references.

    Args:
        qa_chain: The initialised retrieval chain, or None.
        message: The user's question.
        history: Chat history as ``(user, bot)`` pairs; None is treated as
            an empty history.

    Returns:
        A 9-tuple matching the Gradio outputs: the chain, an update that
        clears the message box, the new history, then three
        ``(source text, page number)`` pairs flattened. Page is 0 when the
        source has no "page" metadata; missing sources are "" / 0.
    """
    # The chatbot value can be None (the clear handler sets it to None);
    # without this, `history + [...]` below would raise TypeError.
    history = history or []

    if qa_chain is None:
        return (
            None, gr.update(value=""),
            history + [("Error", "Please initialize the LLM first.")],
            "", 0, "", 0, "", 0
        )

    try:
        response = qa_chain.invoke({
            "question": message,
            "chat_history": [(hist[0], hist[1]) for hist in history]
        })

        response_answer = response["answer"]
        # Keep only the text after the prompt's "Helpful Answer:" marker
        # when the model echoes it.
        if "Helpful Answer:" in response_answer:
            response_answer = response_answer.split("Helpful Answer:")[-1]

        sources = response["source_documents"][:3]
        source_contents = []
        source_pages = []

        for source in sources:
            source_contents.append(format_excel_source(source).strip())
            # Stored page numbers are shifted by one for display; 0 means
            # the source carries no page information.
            if "page" in source.metadata:
                source_pages.append(source.metadata["page"] + 1)
            else:
                source_pages.append(0)

        # Pad to exactly three sources so the output arity is fixed.
        while len(source_contents) < 3:
            source_contents.append("")
            source_pages.append(0)

        return (
            qa_chain, gr.update(value=""),
            history + [(message, response_answer)],
            source_contents[0], source_pages[0],
            source_contents[1], source_pages[1],
            source_contents[2], source_pages[2]
        )

    except Exception as e:
        return (
            qa_chain, gr.update(value=""),
            history + [(message, f"Error: {str(e)}")],
            "", 0, "", 0, "", 0
        )


def demo():
    """Create and launch the Gradio interface."""
    theme = gr.themes.Default(
        primary_hue="red", secondary_hue="pink", neutral_hue="sky"
    )
    with gr.Blocks(theme=theme) as app:
        vector_db = gr.State()
        qa_chain = gr.State()

        # NOTE(review): the original markup around this title was lost (the
        # string literal was split over several lines with no tags); the
        # heading tags here are a reconstruction - confirm the intended HTML.
        gr.HTML("<center><h1>RAG PDF & Excel Chatbot</h1></center>")

        # NOTE(review): these columns are not wrapped in a gr.Row, so the
        # `scale` values may have no effect - confirm the intended layout.
        with gr.Column(scale=86):
            gr.Markdown("Step 1 - Configure and Initialize RAG Pipeline")
            with gr.Row():
                document = gr.Files(
                    height=300,
                    file_count="multiple",
                    file_types=[".pdf", ".xlsx", ".xls"],
                    interactive=True,
                    label="Upload PDF or Excel documents"
                )

            with gr.Row():
                splitting_strategy = gr.Radio(
                    ["recursive", "fixed", "token"],
                    label="Text Splitting Strategy",
                    value="recursive"
                )
                db_choice = gr.Radio(
                    ["faiss", "chroma", "qdrant"],
                    label="Vector Database",
                    value="faiss"
                )
                chunk_size = gr.Radio(
                    ["small", "medium"],
                    label="Chunk Size",
                    value="medium"
                )

            with gr.Row():
                db_btn = gr.Button("Create vector database")
                db_progress = gr.Textbox(
                    value="Not initialized",
                    show_label=False
                )

            gr.Markdown("Step 2 - Configure LLM")
            with gr.Row():
                # type="index" makes the handler receive the position in
                # the list, which initialize_llmchain uses to index list_llm.
                llm_choice = gr.Radio(
                    list_llm_simple,
                    label="Available LLMs",
                    value=list_llm_simple[0],
                    type="index"
                )

            with gr.Row():
                with gr.Accordion("LLM Parameters", open=False):
                    temperature = gr.Slider(
                        minimum=0.01,
                        maximum=1.0,
                        value=0.5,
                        step=0.1,
                        label="Temperature"
                    )
                    max_tokens = gr.Slider(
                        minimum=128,
                        maximum=4096,
                        value=2048,
                        step=128,
                        label="Max Tokens"
                    )
                    top_k = gr.Slider(
                        minimum=1,
                        maximum=10,
                        value=3,
                        step=1,
                        label="Top K"
                    )

            with gr.Row():
                init_llm_btn = gr.Button("Initialize LLM")
                llm_progress = gr.Textbox(
                    value="Not initialized",
                    show_label=False
                )

        with gr.Column(scale=200):
            gr.Markdown("Step 3 - Chat with Documents")
            chatbot = gr.Chatbot(height=505)

            with gr.Accordion("Source References", open=False):
                with gr.Row():
                    source1 = gr.Textbox(label="Source 1", lines=2)
                    page1 = gr.Number(label="Page")
                with gr.Row():
                    source2 = gr.Textbox(label="Source 2", lines=2)
                    page2 = gr.Number(label="Page")
                with gr.Row():
                    source3 = gr.Textbox(label="Source 3", lines=2)
                    page3 = gr.Number(label="Page")

            with gr.Row():
                # Disabled until the LLM chain has been initialised.
                msg = gr.Textbox(
                    placeholder="Ask a question",
                    show_label=False,
                    interactive=False
                )

            with gr.Row():
                submit_btn = gr.Button("Submit")
                clear_btn = gr.ClearButton(
                    [msg, chatbot],
                    value="Clear Chat"
                )

        chat_outputs = [
            qa_chain, msg, chatbot,
            source1, page1, source2, page2, source3, page3,
        ]

        # Event handlers
        db_btn.click(
            initialize_database,
            inputs=[document, splitting_strategy, chunk_size, db_choice],
            outputs=[vector_db, db_progress]
        ).then(
            # Enable the LLM button only once a vector database exists.
            lambda x: gr.update(interactive=x is not None),
            inputs=[vector_db],
            outputs=[init_llm_btn]
        )

        init_llm_btn.click(
            initialize_llmchain,
            inputs=[llm_choice, temperature, max_tokens, top_k, vector_db],
            outputs=[qa_chain, llm_progress]
        ).then(
            # Enable the message box only once the chain exists.
            lambda x: gr.update(interactive=x is not None),
            inputs=[qa_chain],
            outputs=[msg]
        )

        msg.submit(
            conversation,
            inputs=[qa_chain, msg, chatbot],
            outputs=chat_outputs
        )

        submit_btn.click(
            conversation,
            inputs=[qa_chain, msg, chatbot],
            outputs=chat_outputs
        )

        # In addition to ClearButton's own reset of msg/chatbot, blank the
        # source reference fields.
        clear_btn.click(
            lambda: [None, "", 0, "", 0, "", 0],
            outputs=[chatbot, source1, page1, source2, page2, source3, page3]
        )

    app.queue().launch(debug=True)


if __name__ == "__main__":
    demo()