"""Chainlit app: upload a .txt/.pdf file, index it, and answer questions about it.

The uploaded file is split into chunks, embedded into a vector store (Qdrant
when credentials are configured, otherwise the in-memory store), and each chat
message is answered by an LLM using the most similar chunks as context.
"""

import os
import tempfile
from typing import List

import chainlit as cl
import fitz
from chainlit.types import AskFileResponse

from aimakerspace.openai_utils.chatmodel import ChatOpenAI
from aimakerspace.openai_utils.embedding import EmbeddingModel
from aimakerspace.openai_utils.prompts import (
    AssistantRolePrompt,
    SystemRolePrompt,
    UserRolePrompt,
)
from aimakerspace.text_utils import CharacterTextSplitter, TextFileLoader
from aimakerspace.vectordatabase import QdrantDatabase, VectorDatabase

system_template = """\
Use the following context to answer a users question. If you cannot find the answer in the context, say you don't know the answer."""
system_role_prompt = SystemRolePrompt(system_template)

user_prompt_template = """\
Context:
{context}

Question:
{question}
"""
user_role_prompt = UserRolePrompt(user_prompt_template)

# Qdrant settings. Credentials are read from the environment so that secrets
# never live in source control.
QDRANT_URL_ENV = "QDRANT_URL"
QDRANT_API_KEY_ENV = "QDRANT_API_KEY"
QDRANT_COLLECTION_NAME = "my_collection"
# Must match the output dimension of the embedding model in use.
EMBEDDING_DIMENSION = 1536

# File extensions accepted by process_text_file.
SUPPORTED_EXTENSIONS = (".txt", ".pdf")


class RetrievalAugmentedQAPipeline:
    """Answer a question with an LLM, using retrieved chunks as context."""

    def __init__(self, llm, vector_db_retriever: VectorDatabase) -> None:
        self.llm = llm
        self.vector_db_retriever = vector_db_retriever

    async def arun_pipeline(self, user_query: str) -> dict:
        """Retrieve context for ``user_query`` and start a streamed answer.

        Returns:
            A dict with ``"response"`` (an async generator yielding the
            chunks streamed by the LLM) and ``"context"`` (the raw search
            results returned by the retriever).
        """
        context_list = self.vector_db_retriever.search_by_text(user_query, k=4)

        # The first element of each search result is the chunk text.
        context_prompt = "".join(context[0] + "\n" for context in context_list)

        formatted_system_prompt = system_role_prompt.create_message()
        formatted_user_prompt = user_role_prompt.create_message(
            question=user_query, context=context_prompt
        )

        async def generate_response():
            async for chunk in self.llm.astream(
                [formatted_system_prompt, formatted_user_prompt]
            ):
                yield chunk

        return {"response": generate_response(), "context": context_list}


text_splitter = CharacterTextSplitter()


def process_text_file(file: AskFileResponse) -> List[str]:
    """Split an uploaded .txt or .pdf file into text chunks.

    The upload is written to a temporary file (the loaders work on paths),
    read back, and the temporary file is always removed afterwards.

    Args:
        file: The uploaded file; its ``name`` and ``content`` are used.

    Returns:
        The chunks produced by the module-level ``text_splitter``.

    Raises:
        ValueError: If the file extension is neither ``.txt`` nor ``.pdf``.
    """
    file_extension = os.path.splitext(file.name)[1].lower()
    # Reject unsupported uploads before touching the disk.
    if file_extension not in SUPPORTED_EXTENSIONS:
        raise ValueError("Unsupported file type")

    temp_file = tempfile.NamedTemporaryFile(
        mode="wb", delete=False, suffix=file_extension
    )
    temp_file_path = temp_file.name
    try:
        # Close the handle before reading so the content is fully flushed.
        with temp_file:
            temp_file.write(file.content)

        if file_extension == ".txt":
            documents = TextFileLoader(temp_file_path).load_documents()
        else:
            pdf_document = fitz.open(temp_file_path)
            try:
                documents = [page.get_text() for page in pdf_document]
            finally:
                # Release the file so it can be deleted below.
                pdf_document.close()

        return text_splitter.split_texts(documents)
    finally:
        # delete=False means nothing else will clean this up.
        os.remove(temp_file_path)


async def _build_vector_db(texts: List[str]):
    """Create a vector store populated with ``texts``.

    Uses Qdrant when both ``QDRANT_URL`` and ``QDRANT_API_KEY`` are set in
    the environment; otherwise falls back to the in-memory ``VectorDatabase``.
    """
    qdrant_url = os.environ.get(QDRANT_URL_ENV)
    qdrant_api_key = os.environ.get(QDRANT_API_KEY_ENV)

    if not (qdrant_url and qdrant_api_key):
        print(
            f"{QDRANT_URL_ENV}/{QDRANT_API_KEY_ENV} not set; "
            "using the in-memory vector store"
        )
        vector_db = VectorDatabase()
        return await vector_db.abuild_from_list(texts)

    # Imported lazily: only needed when Qdrant is actually used.
    from qdrant_client import QdrantClient
    from qdrant_client.http.models import Distance, VectorParams

    qdrant_client = QdrantClient(url=qdrant_url, api_key=qdrant_api_key)

    if not qdrant_client.collection_exists(QDRANT_COLLECTION_NAME):
        qdrant_client.create_collection(
            collection_name=QDRANT_COLLECTION_NAME,
            vectors_config={
                "default": VectorParams(
                    size=EMBEDDING_DIMENSION, distance=Distance.COSINE
                )
            },
        )

    vector_db = QdrantDatabase(
        qdrant_client=qdrant_client,
        collection_name=QDRANT_COLLECTION_NAME,
        embedding_model=EmbeddingModel(),
    )
    return await vector_db.abuild_from_list(texts)


@cl.on_chat_start
async def on_chat_start():
    """Ask for a file, index it, and store the QA pipeline in the session."""
    files = None

    # Wait for the user to upload a file (re-ask after each timeout).
    while not files:
        files = await cl.AskFileMessage(
            content="Please upload a .txt or .pdf file to begin processing!",
            accept=["text/plain", "application/pdf"],
            max_size_mb=2,
            timeout=180,
        ).send()

    file = files[0]

    msg = cl.Message(
        content=f"Processing `{file.name}`...", disable_human_feedback=True
    )
    await msg.send()

    # Load the file and split it into chunks.
    texts = process_text_file(file)

    msg = cl.Message(
        content=f"Resulted in {len(texts)} chunks", disable_human_feedback=True
    )
    await msg.send()

    print(f"Processing {len(texts)} text chunks")

    vector_db = await _build_vector_db(texts)

    msg = cl.Message(
        content="The Vector store has been created", disable_human_feedback=True
    )
    await msg.send()

    chat_openai = ChatOpenAI()

    # Create a chain
    retrieval_augmented_qa_pipeline = RetrievalAugmentedQAPipeline(
        vector_db_retriever=vector_db, llm=chat_openai
    )

    # Let the user know that the system is ready. This rewrites the most
    # recently sent message in place.
    msg.content = f"Processing `{file.name}` done. You can now ask questions!"
    await msg.update()

    cl.user_session.set("chain", retrieval_augmented_qa_pipeline)


@cl.on_message
async def main(message):
    """Stream the pipeline's answer to the user's message."""
    chain = cl.user_session.get("chain")
    if chain is None:
        # Start-up did not finish (e.g. indexing failed), so there is
        # nothing to query yet.
        await cl.Message(
            content="No document has been processed yet. "
            "Please start a new chat and upload a file."
        ).send()
        return

    msg = cl.Message(content="")
    result = await chain.arun_pipeline(message.content)

    async for stream_resp in result["response"]:
        await msg.stream_token(stream_resp)

    await msg.send()