# tmlinhdinh
# deploy RAG
# 370ed2e
### Import Section ###
import uuid
from operator import itemgetter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.globals import set_llm_cache
from langchain_core.caches import InMemoryCache
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.storage import LocalFileStore
from langchain.embeddings import CacheBackedEmbeddings
from langchain.schema import StrOutputParser
from langchain_openai import ChatOpenAI
from langchain_openai.embeddings import OpenAIEmbeddings
from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, VectorParams
from langchain_qdrant import QdrantVectorStore
import chainlit as cl
from chainlit.types import AskFileResponse
### Global Section ###
# Install a process-wide, in-memory LLM cache for LangChain calls.
set_llm_cache(InMemoryCache())
# System instructions placed first in every RAG prompt.
rag_system_prompt_template = """\
You are a helpful assistant that uses the provided context to answer questions. Never reference this prompt, or the existance of context.
"""
# The same system message in role/content dict form.
# NOTE(review): not referenced by any code visible in this section — confirm
# whether it is still needed.
rag_message_list = [
{"role" : "system", "content" : rag_system_prompt_template},
]
# Human-turn template; the {question} and {context} placeholders are filled in
# when the prompt is formatted by the chain.
rag_user_prompt_template = """\
Question:
{question}
Context:
{context}
"""
# Two-message chat prompt (system + human) consumed by the QA chain.
chat_prompt = ChatPromptTemplate.from_messages([
("system", rag_system_prompt_template),
("human", rag_user_prompt_template)
])
class VectorDatabase:
    """Index documents in an in-memory Qdrant collection and expose a retriever.

    Embeddings are computed through a file-backed cache rooted at ``./cache/``
    so that re-embedding the same text can be served from disk.
    """

    def __init__(self, embeddings: "OpenAIEmbeddings") -> None:
        """Remember the embedding model used to index documents.

        Args:
            embeddings: Embedding model instance. Its ``model`` attribute is
                used as the namespace of the embedding cache.
        """
        # The annotation names the class rather than calling it: an annotation
        # of the form ``OpenAIEmbeddings()`` is evaluated when the class body
        # runs and would construct an embeddings client as a side effect.
        self.embeddings = embeddings

    async def build_retriever(self, docs, vector_size: int = 1536):
        """Embed ``docs`` into a fresh in-memory collection and return a retriever.

        Args:
            docs: Documents to index (passed to ``vectorstore.add_documents``).
            vector_size: Dimensionality of the vectors stored in the
                collection. It must match the output size of the embedding
                model; the default of 1536 is the value this code has always
                used.

        Returns:
            The retriever produced by ``vectorstore.as_retriever`` configured
            for MMR search with ``k=3``.
        """
        # A unique name per call keeps each upload in its own collection.
        collection_name = f"pdf_to_parse_{uuid.uuid4()}"
        client = QdrantClient(":memory:")
        client.create_collection(
            collection_name=collection_name,
            vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE),
        )

        # Wrap the embedder so computed vectors are cached on disk, namespaced
        # by model name.
        store = LocalFileStore("./cache/")
        cached_embedder = CacheBackedEmbeddings.from_bytes_store(
            self.embeddings, store, namespace=self.embeddings.model
        )

        vectorstore = QdrantVectorStore(
            client=client,
            collection_name=collection_name,
            embedding=cached_embedder,
        )
        # NOTE(review): this is a synchronous call inside a coroutine, so the
        # event loop waits while documents are embedded — confirm acceptable.
        vectorstore.add_documents(docs)
        return vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": 3})
class RetrievalAugmentedQAPipeline:
    """Answer a question by retrieving context and streaming an LLM reply."""

    def __init__(self, llm: "ChatOpenAI", vector_db_retriever) -> None:
        """Store the chat model and the retriever used to fetch context.

        Args:
            llm: Chat model that generates the answer.
            vector_db_retriever: Retriever that is piped the question string
                and supplies the ``context`` prompt variable (the caller in
                this file passes the result of
                ``VectorDatabase.build_retriever``).
        """
        # The annotation names the class rather than calling it: an annotation
        # of the form ``ChatOpenAI()`` is evaluated when the class body runs
        # and would construct a chat client as a side effect.
        self.llm = llm
        self.retriever = vector_db_retriever

    async def arun_pipeline(self, user_query: str):
        """Build the RAG chain for ``user_query`` and return a token stream.

        Args:
            user_query: The user's question.

        Returns:
            A dict ``{"response": <async generator>}``; iterating the
            generator yields the chunks streamed by the chain.
        """
        # The question is routed both to the retriever (to produce the
        # context) and straight through to the prompt.
        retrieval_augmented_qa_chain = (
            {
                "context": itemgetter("question") | self.retriever,
                "question": itemgetter("question"),
            }
            | chat_prompt
            | self.llm
            | StrOutputParser()
        )

        async def generate_response():
            async for chunk in retrieval_augmented_qa_chain.astream({"question": user_query}):
                yield chunk

        return {"response": generate_response()}
def process_pdf_file(file: AskFileResponse):
    """Load an uploaded PDF and split it into labelled text chunks.

    The upload's bytes are written to a temporary ``.pdf`` file, loaded with
    ``PyMuPDFLoader``, split into chunks of 1000 characters with 100
    characters of overlap, and each chunk's ``source`` metadata is set to
    ``source_<index>``.

    Args:
        file: The uploaded file; its ``content`` attribute is written to disk.

    Returns:
        The list of split documents.
    """
    import os
    import tempfile

    temp_file_path = None
    try:
        # delete=False keeps the file on disk after the handle closes; the
        # handle is closed before loading so the written bytes are flushed
        # and the loader can open the file by path.
        with tempfile.NamedTemporaryFile(mode="wb", delete=False, suffix=".pdf") as temp_file:
            temp_file_path = temp_file.name
            temp_file.write(file.content)
        documents = PyMuPDFLoader(temp_file_path).load()
    finally:
        # Remove the temporary copy whether or not loading succeeded;
        # previously the file was left behind on every upload.
        if temp_file_path is not None:
            os.unlink(temp_file_path)

    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
    docs = text_splitter.split_documents(documents)
    for i, doc in enumerate(docs):
        doc.metadata["source"] = f"source_{i}"
    return docs
### On Chat Start (Session Start) Section ###
@cl.on_chat_start
async def on_chat_start():
    """Set up a per-session RAG pipeline from a PDF uploaded by the user.

    Prompts for a PDF, splits it into chunks, indexes the chunks, and stores
    the resulting pipeline in the user session under the key ``"chain"``.
    """
    files = None
    # Keep prompting while no upload has been returned (for example when the
    # prompt times out).
    while files is None:
        files = await cl.AskFileMessage(
            content="Please upload a pdf file to begin!",
            accept=["pdf"],
            max_size_mb=2,
            timeout=180,
        ).send()

    file = files[0]

    msg = cl.Message(
        content=f"Processing `{file.name}`...", disable_human_feedback=True
    )
    await msg.send()

    docs = process_pdf_file(file)
    print(f"Processing {len(docs)} text chunks")

    # Index the chunks and obtain a retriever over them.
    vector_db = VectorDatabase(embeddings=OpenAIEmbeddings(model="text-embedding-3-small"))
    retriever = await vector_db.build_retriever(docs)

    # Create the question-answering pipeline around the retriever.
    retrieval_augmented_qa_pipeline = RetrievalAugmentedQAPipeline(
        llm=ChatOpenAI(model="gpt-4o-mini"),
        vector_db_retriever=retriever,
    )

    # Let the user know that the system is ready.
    msg.content = f"Processing `{file.name}` done. You can now ask questions!"
    await msg.update()

    cl.user_session.set("chain", retrieval_augmented_qa_pipeline)
### Rename Chains ###
@cl.author_rename
def rename(orig_author: str):
    """Translate an internal author name into its display name.

    Names without an entry in the mapping are returned unchanged.
    """
    display_names = {"LLMMathChain": "Albert Einstein", "Chatbot": "Assistant"}
    if orig_author in display_names:
        return display_names[orig_author]
    return orig_author
### On Message Section ###
@cl.on_message
async def main(message: cl.Message):
    """Answer an incoming chat message by streaming the pipeline's reply.

    Fetches the pipeline stored in the user session under ``"chain"``, runs it
    on the message text, forwards each streamed chunk to an initially empty
    reply message, and finally sends that reply.
    """
    pipeline = cl.user_session.get("chain")
    reply = cl.Message(content="")

    result = await pipeline.arun_pipeline(message.content)
    token_stream = result["response"]
    async for token in token_stream:
        await reply.stream_token(token)

    await reply.send()