raqa-app / app.py
schrilax's picture
initial commit
f7dd277
raw
history blame
4.8 kB
import asyncio
import chainlit as cl #importing chainlit for our app
from chainlit.input_widget import Select, Switch, Slider #importing chainlit settings selection tools
from chainlit.prompt import Prompt, PromptMessage #importing prompt tools
from chainlit.playground.providers import ChatOpenAI #importing ChatOpenAI tools
import datetime
# import nest_asyncio
# nest_asyncio.apply()
import openai
import os
from utils.text_utils import TextFileLoader, CharacterTextSplitter
from utils.vectordatabase import VectorDatabase
from utils.openai_utils.prompts import (
UserRolePrompt,
SystemRolePrompt,
AssistantRolePrompt,
)
from utils.openai_utils.chatmodel import ChatOpenAI
import wandb
from wandb.sdk.data_types.trace_tree import Trace
RAQA_PROMPT_TEMPLATE = """
Use the provided context to answer the user's query.
You may not answer the user's query unless there is specific context in the following text.
If you do not know the answer, or cannot answer, please respond with "I don't know".
Context:
{context}
"""
raqa_prompt = SystemRolePrompt(RAQA_PROMPT_TEMPLATE)
USER_PROMPT_TEMPLATE = """
User Query:
{user_query}
"""
user_prompt = UserRolePrompt(USER_PROMPT_TEMPLATE)
# You only need the api key inserted here if it's not in your .env file
openai.api_key = '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'
wandb_key = '32ba71217e4c267962cf2e5603326490019df26a'
os.environ["OPENAI_API_KEY"] = openai.api_key
os.environ["WANDB_API_KEY"] = wandb_key
# os.environ["WANDB_DIR"] = './wandb/'
text_loader = TextFileLoader('docs/')
documents = text_loader.load_documents()
text_splitter = CharacterTextSplitter()
split_documents = text_splitter.split_texts(documents)
vector_db = VectorDatabase()
vector_db = asyncio.run(vector_db.abuild_from_list(split_documents))
wandb_project = 'raqa_visibility'
wandb.init(project=wandb_project)
chat_openai = ChatOpenAI()
class RetrievalAugmentedQAPipeline:
def __init__(self, llm: ChatOpenAI(), vector_db_retriever: VectorDatabase, wandb_project = None) -> None:
self.llm = llm
self.vector_db_retriever = vector_db_retriever
self.wandb_project = wandb_project
def run_pipeline(self, user_query: str) -> str:
context_list = self.vector_db_retriever.search_by_text(user_query, k=4)
context_prompt = ""
for context in context_list:
context_prompt += context[0] + "\n"
formatted_system_prompt = raqa_prompt.create_message(context=context_prompt)
formatted_user_prompt = user_prompt.create_message(user_query=user_query)
start_time = datetime.datetime.now().timestamp() * 1000
try:
openai_response = self.llm.run([formatted_system_prompt, formatted_user_prompt], text_only=False)
end_time = datetime.datetime.now().timestamp() * 1000
status = "success"
status_message = (None, )
response_text = openai_response.choices[0].message.content
token_usage = openai_response["usage"].to_dict()
model = openai_response["model"]
except Exception as e:
end_time = datetime.datetime.now().timestamp() * 1000
status = "error"
status_message = str(e)
response_text = ""
token_usage = {}
model = ""
if self.wandb_project:
root_span = Trace(
name="root_span",
kind="llm",
status_code=status,
status_message=status_message,
start_time_ms=start_time,
end_time_ms=end_time,
metadata={
"token_usage" : token_usage,
"model_name" : model
},
inputs= {"system_prompt" : formatted_system_prompt, "user_prompt" : formatted_user_prompt},
outputs= {"response" : response_text}
)
root_span.log(name="openai_trace")
return response_text if response_text else "We ran into an error. Please try again later. Full Error Message: " + status_message
@cl.on_chat_start # marks a function that will be executed at the start of a user session
async def start_chat():
retrieval_augmented_qa_pipeline = RetrievalAugmentedQAPipeline(vector_db_retriever=vector_db, llm=chat_openai, wandb_project=wandb_project)
cl.user_session.set("pipeline", retrieval_augmented_qa_pipeline)
@cl.on_message # marks a function that should be run each time the chatbot receives a message from a user
async def main(message: str):
retrieval_augmented_qa_pipeline = cl.user_session.get("pipeline")
completion = retrieval_augmented_qa_pipeline.run_pipeline(message)
await cl.Message(content=completion).send()