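# Chainlit app: upload a .txt or .pdf file, index it into a vector store, and
# ask questions over it with a simple retrieval-augmented generation pipeline.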
import os
from typing import List
from chainlit.types import AskFileResponse
from aimakerspace.text_utils import CharacterTextSplitter, TextFileLoader
from aimakerspace.openai_utils.prompts import (
    UserRolePrompt,
    SystemRolePrompt,
    AssistantRolePrompt,
)
from aimakerspace.openai_utils.embedding import EmbeddingModel
from aimakerspace.vectordatabase import VectorDatabase
from aimakerspace.openai_utils.chatmodel import ChatOpenAI
import chainlit as cl
import fitz
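
# Prompt templates used to build the system and user messages for the RAG pipeline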
system_template = """\
Use the following context to answer a user's question. If you cannot find the answer in the context, say you don't know the answer."""
system_role_prompt = SystemRolePrompt(system_template)

user_prompt_template = """\
Context:
{context}
Question:
{question}
"""
user_role_prompt = UserRolePrompt(user_prompt_template)
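

# Simple RAG pipeline: retrieve the most relevant chunks from the vector store,
# then stream an answer from the chat model using that retrieved context.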
class RetrievalAugmentedQAPipeline:
    def __init__(self, llm, vector_db_retriever: VectorDatabase) -> None:
        self.llm = llm
        self.vector_db_retriever = vector_db_retriever

    async def arun_pipeline(self, user_query: str):
        context_list = self.vector_db_retriever.search_by_text(user_query, k=4)

        context_prompt = ""
        for context in context_list:
            context_prompt += context[0] + "\n"

        formatted_system_prompt = system_role_prompt.create_message()
        formatted_user_prompt = user_role_prompt.create_message(question=user_query, context=context_prompt)

        async def generate_response():
            async for chunk in self.llm.astream([formatted_system_prompt, formatted_user_prompt]):
                yield chunk

        return {"response": generate_response(), "context": context_list}
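

# Splitter used to chunk documents before they are embedded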
text_splitter = CharacterTextSplitter()


def process_text_file(file: AskFileResponse):
    """Write the uploaded file to a temporary path and return its text split into chunks."""
    import tempfile

    file_extension = os.path.splitext(file.name)[1].lower()

    with tempfile.NamedTemporaryFile(mode="wb", delete=False, suffix=file_extension) as temp_file:
        temp_file_path = temp_file.name
        temp_file.write(file.content)

    if file_extension == ".txt":
        text_loader = TextFileLoader(temp_file_path)
        documents = text_loader.load_documents()
        texts = text_splitter.split_texts(documents)
    elif file_extension == ".pdf":
        # Extract the text of each page with PyMuPDF (fitz)
        pdf_document = fitz.open(temp_file_path)
        documents = []
        for page_num in range(len(pdf_document)):
            page = pdf_document.load_page(page_num)
            documents.append(page.get_text())
        pdf_document.close()
        texts = text_splitter.split_texts(documents)
    else:
        raise ValueError("Unsupported file type")

    return texts
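

# Chainlit runs this when a new chat session starts: ask the user for a file,
# index it, and stash the RAG pipeline in the user session.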
@cl.on_chat_start
async def on_chat_start():
    files = None

    # Wait for the user to upload a file
    while not files:
        files = await cl.AskFileMessage(
            content="Please upload a .txt or .pdf file to begin processing!",
            accept=["text/plain", "application/pdf"],
            max_size_mb=2,
            timeout=180,
        ).send()

    file = files[0]

    msg = cl.Message(
        content=f"Processing `{file.name}`...", disable_human_feedback=True
    )
    await msg.send()

    # Load and chunk the file
    texts = process_text_file(file)

    msg = cl.Message(
        content=f"Resulted in {len(texts)} chunks", disable_human_feedback=True
    )
    await msg.send()
    print(f"Processing {len(texts)} text chunks")
    # Decide whether to use the in-memory dict vector store or the Qdrant vector store
    use_qdrant = False

    if use_qdrant:
        msg = cl.Message(
            content="Sorry, Qdrant is not implemented yet", disable_human_feedback=True
        )
        await msg.send()
        raise NotImplementedError("Qdrant vector store is not implemented yet")
    else:
        # Build the dict vector store from the text chunks
        vector_db = VectorDatabase()
        vector_db = await vector_db.abuild_from_list(texts)

    chat_openai = ChatOpenAI()

    # Create the retrieval-augmented QA chain
    retrieval_augmented_qa_pipeline = RetrievalAugmentedQAPipeline(
        vector_db_retriever=vector_db,
        llm=chat_openai,
    )

    # Let the user know that the system is ready
    msg.content = f"Processing `{file.name}` done. You can now ask questions!"
    await msg.update()

    cl.user_session.set("chain", retrieval_augmented_qa_pipeline)
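

# Chainlit runs this for every user message: run the RAG pipeline and stream
# the response tokens back to the UI.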
@cl.on_message
async def main(message):
    chain = cl.user_session.get("chain")
    msg = cl.Message(content="")

    result = await chain.arun_pipeline(message.content)

    async for stream_resp in result["response"]:
        await msg.stream_token(stream_resp)

    await msg.send()