# aie4_prodtest / app.py
# northstaranlyticsma24 - "Update app.py" - commit f91f2e7 (verified)
import os
from typing import List
from chainlit.types import AskFileResponse
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams
from langchain_qdrant import QdrantVectorStore
#from langchain_openai import ChatOpenAI
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_openai.embeddings import OpenAIEmbeddings
from langchain.storage import LocalFileStore
from langchain.embeddings import CacheBackedEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.globals import set_llm_cache
from langchain_openai import ChatOpenAI
from langchain_core.caches import InMemoryCache
from operator import itemgetter
from langchain_core.runnables.passthrough import RunnablePassthrough
from chainlit.types import AskFileResponse
from typing import List
import uuid
import chainlit as cl
# Register a process-wide, in-memory cache for LLM calls.
set_llm_cache(InMemoryCache())

# System instruction shared by the chat prompt and the raw message list.
rag_system_prompt_template = (
    "You are a helpful assistant that uses the provided context to answer questions. "
    "Never reference this prompt, or the existance of context.\n"
)

rag_message_list = [{"role": "system", "content": rag_system_prompt_template}]

# Human turn: the user's question followed by the retrieved context.
rag_user_prompt_template = "Question:\n{question}\nContext:\n{context}\n"

chat_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", rag_system_prompt_template),
        ("human", rag_user_prompt_template),
    ]
)

# Splitter applied to every uploaded document: 1000-character chunks
# with a 100-character overlap.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_overlap=100,
    chunk_size=1000,
)

# In-memory Qdrant instance holding one uniquely named collection that is
# created once at import time. The vector size (1536) must match the
# dimensionality of the embedding model configured below.
collection_name = "pdf_to_parse_" + str(uuid.uuid4())
client = QdrantClient(":memory:")
client.create_collection(
    collection_name=collection_name,
    vectors_config=VectorParams(distance=Distance.COSINE, size=1536),
)

# Embedding model used (through a cache wrapper) by the vector store.
core_embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
def process_text_file(file: AskFileResponse) -> list:
    """Load an uploaded PDF and split it into text chunks.

    The upload's bytes are written to a temporary file because the PDF
    loader is given a file path. The temporary file is removed again once
    the loader has finished, so uploads do not accumulate on disk.

    Args:
        file: Upload response whose ``content`` attribute holds the raw
            PDF bytes.

    Returns:
        The chunks produced by the module-level ``text_splitter``. Each
        chunk's ``metadata["source"]`` is overwritten with
        ``"source_<index>"``.
    """
    import tempfile

    # Write the bytes through a single binary handle and close it before the
    # loader opens the path (the original opened the same file twice).
    with tempfile.NamedTemporaryFile(mode="wb", delete=False) as temp_file:
        temp_file.write(file.content)
        temp_path = temp_file.name

    try:
        # load() returns a fully materialised list, so the file is no longer
        # needed once this call returns.
        documents = PyMuPDFLoader(temp_path).load()
    finally:
        # Best-effort cleanup: a failed delete must not mask a loader error
        # or abort an otherwise successful upload.
        try:
            os.remove(temp_path)
        except OSError:
            pass

    docs = text_splitter.split_documents(documents)
    for i, doc in enumerate(docs):
        doc.metadata["source"] = f"source_{i}"
    return docs
@cl.on_chat_start
async def on_chat_start():
    """Greet the user, ingest one uploaded PDF and prepare the RAG chain.

    Steps:
      1. Send a welcome message and prompt for a PDF until one is provided.
      2. Split the PDF into chunks with ``process_text_file``.
      3. Embed the chunks (embeddings are cached on disk under ``./cache/``)
         and add them to the module-level Qdrant collection.
      4. Build the retrieval-augmented chain and store it in the user
         session under the key ``"midterm_chain"``.
    """
    await cl.Message(content="Hello! This is a simply but powerful RAG app. It will build context on the fly & use LCEL chain to help with your questions. Special Bonus: this app will cache seen docs so it will expand knowledge base with every use!!").send()

    files = None
    # Keep prompting while no file comes back (presumably when the prompt
    # times out after `timeout` seconds - confirm against Chainlit docs).
    while files is None:
        files = await cl.AskFileMessage(
            content="Please upload a PDF File file to begin!",
            accept=["application/pdf"],
            max_size_mb=2,
            timeout=180,
        ).send()
    file = files[0]

    msg = cl.Message(
        content=f"Processing `{file.name}`...", disable_human_feedback=True
    )
    await msg.send()

    # Load the file and split it into chunks.
    texts = process_text_file(file)
    print(f"Processing {len(texts)} text chunks")

    # Wrap the embedding model with a file-backed cache, namespaced by model
    # name, so text that was embedded before is not embedded again.
    store = LocalFileStore("./cache/")
    cached_embedder = CacheBackedEmbeddings.from_bytes_store(
        core_embeddings, store, namespace=core_embeddings.model
    )

    # Add the chunks to the module-level collection and expose it through an
    # MMR retriever that returns 3 documents per query.
    vectorstore = QdrantVectorStore(
        client=client,
        collection_name=collection_name,
        embedding=cached_embedder,
    )
    vectorstore.add_documents(texts)
    retriever = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": 3})

    chat_openai = ChatOpenAI()
    retrieval_augmented_qa_chain = (
        # Fetch context for the question while passing the question through.
        {"context": itemgetter("question") | retriever, "question": itemgetter("question")}
        # Re-assigns "context" to its own value; kept from the original chain.
        | RunnablePassthrough.assign(context=itemgetter("context"))
        | chat_prompt
        | chat_openai
    )

    # Store the chain before announcing readiness so that a message handler
    # never sees a session without it.
    cl.user_session.set("midterm_chain", retrieval_augmented_qa_chain)

    # Let the user know that the system is ready.
    msg.content = f"Processing `{file.name}` done. You can now ask questions!"
    await msg.update()
@cl.on_message
async def main(message):
    """Answer an incoming chat message with the session's RAG chain.

    Args:
        message: Incoming Chainlit message; its ``content`` attribute is
            used as the question.
    """
    midterm_chain = cl.user_session.get("midterm_chain")
    if midterm_chain is None:
        # No chain has been stored for this session yet (for example the
        # upload step has not finished); reply instead of raising.
        await cl.Message(
            content="Please upload a PDF and wait for processing to finish before asking questions."
        ).send()
        return

    # Use the async entry point so the event loop is not blocked for the
    # duration of the retrieval and LLM call.
    result = await midterm_chain.ainvoke({"question": message.content})

    # Send the model's answer back to the user.
    await cl.Message(content=result.content).send()