import asyncio import chainlit as cl #importing chainlit for our app from chainlit.input_widget import Select, Switch, Slider #importing chainlit settings selection tools from chainlit.prompt import Prompt, PromptMessage #importing prompt tools from chainlit.playground.providers import ChatOpenAI #importing ChatOpenAI tools import datetime # import nest_asyncio # nest_asyncio.apply() import openai import os from utils.text_utils import TextFileLoader, CharacterTextSplitter from utils.vectordatabase import VectorDatabase from utils.openai_utils.prompts import ( UserRolePrompt, SystemRolePrompt, AssistantRolePrompt, ) from utils.openai_utils.chatmodel import ChatOpenAI import wandb from wandb.sdk.data_types.trace_tree import Trace RAQA_PROMPT_TEMPLATE = """ Use the provided context to answer the user's query. You may not answer the user's query unless there is specific context in the following text. If you do not know the answer, or cannot answer, please respond with "I don't know". Context: {context} """ raqa_prompt = SystemRolePrompt(RAQA_PROMPT_TEMPLATE) USER_PROMPT_TEMPLATE = """ User Query: {user_query} """ user_prompt = UserRolePrompt(USER_PROMPT_TEMPLATE) # You only need the api key inserted here if it's not in your .env file openai.api_key = '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' wandb_key = '32ba71217e4c267962cf2e5603326490019df26a' os.environ["OPENAI_API_KEY"] = openai.api_key os.environ["WANDB_API_KEY"] = wandb_key # os.environ["WANDB_DIR"] = './wandb/' text_loader = TextFileLoader('docs/') documents = text_loader.load_documents() text_splitter = CharacterTextSplitter() split_documents = text_splitter.split_texts(documents) vector_db = VectorDatabase() vector_db = asyncio.run(vector_db.abuild_from_list(split_documents)) wandb_project = 'raqa_visibility' wandb.init(project=wandb_project) chat_openai = ChatOpenAI() class RetrievalAugmentedQAPipeline: def __init__(self, llm: ChatOpenAI(), vector_db_retriever: VectorDatabase, wandb_project = None) -> None: self.llm = llm self.vector_db_retriever = vector_db_retriever self.wandb_project = wandb_project def run_pipeline(self, user_query: str) -> str: context_list = self.vector_db_retriever.search_by_text(user_query, k=4) context_prompt = "" for context in context_list: context_prompt += context[0] + "\n" formatted_system_prompt = raqa_prompt.create_message(context=context_prompt) formatted_user_prompt = user_prompt.create_message(user_query=user_query) start_time = datetime.datetime.now().timestamp() * 1000 try: openai_response = self.llm.run([formatted_system_prompt, formatted_user_prompt], text_only=False) end_time = datetime.datetime.now().timestamp() * 1000 status = "success" status_message = (None, ) response_text = openai_response.choices[0].message.content token_usage = openai_response["usage"].to_dict() model = openai_response["model"] except Exception as e: end_time = datetime.datetime.now().timestamp() * 1000 status = "error" status_message = str(e) response_text = "" token_usage = {} model = "" if self.wandb_project: root_span = Trace( name="root_span", kind="llm", status_code=status, status_message=status_message, start_time_ms=start_time, end_time_ms=end_time, metadata={ "token_usage" : token_usage, "model_name" : model }, inputs= {"system_prompt" : formatted_system_prompt, "user_prompt" : formatted_user_prompt}, outputs= {"response" : response_text} ) root_span.log(name="openai_trace") return response_text if response_text else "We ran into an error. Please try again later. Full Error Message: " + status_message @cl.on_chat_start # marks a function that will be executed at the start of a user session async def start_chat(): retrieval_augmented_qa_pipeline = RetrievalAugmentedQAPipeline(vector_db_retriever=vector_db, llm=chat_openai, wandb_project=wandb_project) cl.user_session.set("pipeline", retrieval_augmented_qa_pipeline) @cl.on_message # marks a function that should be run each time the chatbot receives a message from a user async def main(message: str): retrieval_augmented_qa_pipeline = cl.user_session.get("pipeline") completion = retrieval_augmented_qa_pipeline.run_pipeline(message) await cl.Message(content=completion).send()