########################################################################## # app.py - Pennwick PDF Chat # # HuggingFace Spaces application to anlayze uploaded PDF files # with open-source models ( hkunlp/instructor-xl ) # # Mike Pastor February 16, 2024 import streamlit as st from streamlit.components.v1 import html from dotenv import load_dotenv from PyPDF2 import PdfReader from PIL import Image # Local file from htmlTemplates import css, bot_template, user_template # from langchain.embeddings import HuggingFaceInstructEmbeddings from langchain_community.embeddings import HuggingFaceInstructEmbeddings # from langchain.vectorstores import FAISS from langchain_community.vectorstores import FAISS from langchain.text_splitter import CharacterTextSplitter from langchain.memory import ConversationBufferMemory from langchain.chains import ConversationalRetrievalChain # from langchain.llms import HuggingFaceHub from langchain_community.llms import HuggingFaceHub def extract_pdf_text(pdf_docs): text = "" for pdf in pdf_docs: pdf_reader = PdfReader(pdf) for page in pdf_reader.pages: text += page.extract_text() return text # Chunk size and overlap must not exceed the models capacity! # def extract_bitesize_pieces(text): text_splitter = CharacterTextSplitter( separator="\n", chunk_size=800, # 1000 chunk_overlap=200, length_function=len ) chunks = text_splitter.split_text(text) return chunks def prepare_embedding_vectors(text_chunks): st.write('Here in vector store....', unsafe_allow_html=True) # embeddings = OpenAIEmbeddings() # pip install InstructorEmbedding # pip install sentence-transformers==2.2.2 embeddings = HuggingFaceInstructEmbeddings(model_name="hkunlp/instructor-xl") st.write('Here in vector store - got embeddings ', unsafe_allow_html=True) # from InstructorEmbedding import INSTRUCTOR # model = INSTRUCTOR('hkunlp/instructor-xl') # sentence = "3D ActionSLAM: wearable person tracking in multi-floor environments" # instruction = "Represent the Science title:" # embeddings = model.encode([[instruction, sentence]]) # embeddings = model.encode(text_chunks) print('have Embeddings: ') # text_chunks="this is a test" # FAISS, Chroma and other vector databases # vectorstore = FAISS.from_texts(texts=text_chunks, embedding=embeddings) st.write('FAISS succeeds: ') return vectorstore def prepare_conversation(vectorstore): # llm = ChatOpenAI() # llm = HuggingFaceHub(repo_id="google/flan-t5-xxl", model_kwargs={"temperature":0.5, "max_length":512}) # google/bigbird-roberta-base facebook/bart-large llm = HuggingFaceHub(repo_id="google/flan-t5-xxl", model_kwargs={"temperature": 0.5, "max_length": 512}) memory = ConversationBufferMemory( memory_key='chat_history', return_messages=True) conversation_chain = ConversationalRetrievalChain.from_llm( llm=llm, retriever=vectorstore.as_retriever(), memory=memory, ) return conversation_chain def process_user_question(user_question): print('process_user_question called: \n') if user_question == None : print('question is null') return if user_question == '' : print('question is blank') return if st == None : print('session is null') return if st.session_state == None : print('session STATE is null') return print('question is: ', user_question) print('\nsession is: ', st ) response = st.session_state.conversation({'question': user_question}) # response = st.session_state.conversation({'summarization': user_question}) st.session_state.chat_history = response['chat_history'] # st.empty() results_string = "" for i, message in enumerate(st.session_state.chat_history): if i % 2 == 0: st.write(user_template.replace( "{{MSG}}", message.content), unsafe_allow_html=True) results_string += ( "

" + message.content + "

" ) else: st.write(bot_template.replace( "{{MSG}}", message.content), unsafe_allow_html=True) results_string += ( "

" + message.content + "

" ) lorem = ( """

Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed euismod, nisl nec vulputate lacinia, nunc nisl aliquam mauris, eget aliquet nisl nisl et nunc. Sed euismod, nisl nec vulputate lacinia, nunc nisl aliquam mauris, eget aliquet.

""" * 3 ) # html(lorem, height=100, scrolling=True) html(results_string, height=100, scrolling=True) # with st.container() as scrollable_container: # # Your content to be scrolled goes here # for i in range(100): # st.write(f"Line {i}") # st.markdown(f"""""", unsafe_allow_html=True) ################################################################################### def main(): print( 'Pennwick Starting up...\n') # Load the environment variables - if any load_dotenv() ################################################################################## # st.set_page_config(page_title="Pennwick PDF Analyzer", page_icon=":books:") # im = Image.open("robot_icon.ico") # st.set_page_config(page_title="Pennwick PDF Analyzer", page_icon=im ) # st.set_page_config(page_title="Pennwick PDF Analyzer") import base64 from PIL import Image # Open your image image = Image.open("robot_icon.ico") # Convert image to base64 string with open("robot_icon.ico", "rb") as f: encoded_string = base64.b64encode(f.read()).decode() # Set page config with base64 string st.set_page_config(page_title="Pennwick File Analyzer 2", page_icon=f"data:image/ico;base64,{encoded_string}") print( 'prepared page...\n') ################### st.write(css, unsafe_allow_html=True) if "conversation" not in st.session_state: st.session_state.conversation = None if "chat_history" not in st.session_state: st.session_state.chat_history = None # st.header("Pennwick File Analyzer :books:") st.header("Pennwick File Analyzer 2") user_question = None user_question = st.text_input("Ask the Model a question about your uploaded documents:") if user_question != None: print( 'calling process question', user_question) process_user_question(user_question) # st.write( user_template, unsafe_allow_html=True) # st.write(user_template.replace( "{{MSG}}", "Hello robot!"), unsafe_allow_html=True) # st.write(bot_template.replace( "{{MSG}}", "Hello human!"), unsafe_allow_html=True) with st.sidebar: st.subheader("Your documents") pdf_docs = st.file_uploader( "Upload your PDFs here and click on 'Process'", accept_multiple_files=True) # Upon button press if st.button("Process these files"): with st.spinner("Processing..."): ################################################################# # Track the overall time for file processing into Vectors # # from datetime import datetime global_now = datetime.now() global_current_time = global_now.strftime("%H:%M:%S") st.write("Vectorizing Files - Current Time =", global_current_time) # get pdf text raw_text = extract_pdf_text(pdf_docs) # st.write(raw_text) # # get the text chunks text_chunks = extract_bitesize_pieces(raw_text) # st.write(text_chunks) # # create vector store vectorstore = prepare_embedding_vectors(text_chunks) # # create conversation chain st.session_state.conversation = prepare_conversation(vectorstore) # Mission Complete! global_later = datetime.now() st.write("Files Vectorized - Total EXECUTION Time =", (global_later - global_now), global_later) if __name__ == '__main__': main()