Chat / api /answer /routes.py
VTechAI's picture
init
8a41f4d
import asyncio
import os
from flask import Blueprint, request, Response
import json
import datetime
import logging
import traceback
from pymongo import MongoClient
from bson.objectid import ObjectId
from transformers import GPT2TokenizerFast
from application.core.settings import settings
from application.vectorstore.vector_creator import VectorCreator
from application.llm.llm_creator import LLMCreator
from application.error import bad_request
logger = logging.getLogger(__name__)
mongo = MongoClient(settings.MONGO_URI)
db = mongo["docsgpt"]
conversations_collection = db["conversations"]
vectors_collection = db["vectors"]
prompts_collection = db["prompts"]
answer = Blueprint('answer', __name__)
if settings.LLM_NAME == "gpt4":
gpt_model = 'gpt-4'
elif settings.LLM_NAME == "anthropic":
gpt_model = 'claude-2'
else:
gpt_model = 'gpt-3.5-turbo'
# load the prompts
current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
with open(os.path.join(current_dir, "prompts", "chat_combine_default.txt"), "r") as f:
chat_combine_template = f.read()
with open(os.path.join(current_dir, "prompts", "chat_reduce_prompt.txt"), "r") as f:
chat_reduce_template = f.read()
with open(os.path.join(current_dir, "prompts", "chat_combine_creative.txt"), "r") as f:
chat_combine_creative = f.read()
with open(os.path.join(current_dir, "prompts", "chat_combine_strict.txt"), "r") as f:
chat_combine_strict = f.read()
api_key_set = settings.API_KEY is not None
embeddings_key_set = settings.EMBEDDINGS_KEY is not None
async def async_generate(chain, question, chat_history):
result = await chain.arun({"question": question, "chat_history": chat_history})
return result
def count_tokens(string):
tokenizer = GPT2TokenizerFast.from_pretrained('gpt2')
return len(tokenizer(string)['input_ids'])
def run_async_chain(chain, question, chat_history):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = {}
try:
answer = loop.run_until_complete(async_generate(chain, question, chat_history))
finally:
loop.close()
result["answer"] = answer
return result
def get_vectorstore(data):
if "active_docs" in data:
if data["active_docs"].split("/")[0] == "default":
vectorstore = ""
elif data["active_docs"].split("/")[0] == "local":
vectorstore = "indexes/" + data["active_docs"]
else:
vectorstore = "vectors/" + data["active_docs"]
if data["active_docs"] == "default":
vectorstore = ""
else:
vectorstore = ""
vectorstore = os.path.join("application", vectorstore)
return vectorstore
def is_azure_configured():
return settings.OPENAI_API_BASE and settings.OPENAI_API_VERSION and settings.AZURE_DEPLOYMENT_NAME
def complete_stream(question, docsearch, chat_history, api_key, prompt_id, conversation_id):
llm = LLMCreator.create_llm(settings.LLM_NAME, api_key=api_key)
if prompt_id == 'default':
prompt = chat_combine_template
elif prompt_id == 'creative':
prompt = chat_combine_creative
elif prompt_id == 'strict':
prompt = chat_combine_strict
else:
prompt = prompts_collection.find_one({"_id": ObjectId(prompt_id)})["content"]
docs = docsearch.search(question, k=2)
if settings.LLM_NAME == "llama.cpp":
docs = [docs[0]]
# join all page_content together with a newline
docs_together = "\n".join([doc.page_content for doc in docs])
p_chat_combine = prompt.replace("{summaries}", docs_together)
messages_combine = [{"role": "system", "content": p_chat_combine}]
source_log_docs = []
for doc in docs:
if doc.metadata:
source_log_docs.append({"title": doc.metadata['title'].split('/')[-1], "text": doc.page_content})
else:
source_log_docs.append({"title": doc.page_content, "text": doc.page_content})
if len(chat_history) > 1:
tokens_current_history = 0
# count tokens in history
chat_history.reverse()
for i in chat_history:
if "prompt" in i and "response" in i:
tokens_batch = count_tokens(i["prompt"]) + count_tokens(i["response"])
if tokens_current_history + tokens_batch < settings.TOKENS_MAX_HISTORY:
tokens_current_history += tokens_batch
messages_combine.append({"role": "user", "content": i["prompt"]})
messages_combine.append({"role": "system", "content": i["response"]})
messages_combine.append({"role": "user", "content": question})
response_full = ""
completion = llm.gen_stream(model=gpt_model, engine=settings.AZURE_DEPLOYMENT_NAME,
messages=messages_combine)
for line in completion:
data = json.dumps({"answer": str(line)})
response_full += str(line)
yield f"data: {data}\n\n"
# save conversation to database
if conversation_id is not None:
conversations_collection.update_one(
{"_id": ObjectId(conversation_id)},
{"$push": {"queries": {"prompt": question, "response": response_full, "sources": source_log_docs}}},
)
else:
# create new conversation
# generate summary
messages_summary = [{"role": "assistant", "content": "Summarise following conversation in no more than 3 "
"words, respond ONLY with the summary, use the same "
"language as the system \n\nUser: " + question + "\n\n" +
"AI: " +
response_full},
{"role": "user", "content": "Summarise following conversation in no more than 3 words, "
"respond ONLY with the summary, use the same language as the "
"system"}]
completion = llm.gen(model=gpt_model, engine=settings.AZURE_DEPLOYMENT_NAME,
messages=messages_summary, max_tokens=30)
conversation_id = conversations_collection.insert_one(
{"user": "local",
"date": datetime.datetime.utcnow(),
"name": completion,
"queries": [{"prompt": question, "response": response_full, "sources": source_log_docs}]}
).inserted_id
# send data.type = "end" to indicate that the stream has ended as json
data = json.dumps({"type": "id", "id": str(conversation_id)})
yield f"data: {data}\n\n"
data = json.dumps({"type": "end"})
yield f"data: {data}\n\n"
@answer.route("/stream", methods=["POST"])
def stream():
data = request.get_json()
# get parameter from url question
question = data["question"]
history = data["history"]
# history to json object from string
history = json.loads(history)
conversation_id = data["conversation_id"]
if 'prompt_id' in data:
prompt_id = data["prompt_id"]
else:
prompt_id = 'default'
# check if active_docs is set
if not api_key_set:
api_key = data["api_key"]
else:
api_key = settings.API_KEY
if not embeddings_key_set:
embeddings_key = data["embeddings_key"]
else:
embeddings_key = settings.EMBEDDINGS_KEY
if "active_docs" in data:
vectorstore = get_vectorstore({"active_docs": data["active_docs"]})
else:
vectorstore = ""
docsearch = VectorCreator.create_vectorstore(settings.VECTOR_STORE, vectorstore, embeddings_key)
return Response(
complete_stream(question, docsearch,
chat_history=history, api_key=api_key,
prompt_id=prompt_id,
conversation_id=conversation_id), mimetype="text/event-stream"
)
@answer.route("/api/answer", methods=["POST"])
def api_answer():
data = request.get_json()
question = data["question"]
history = data["history"]
if "conversation_id" not in data:
conversation_id = None
else:
conversation_id = data["conversation_id"]
print("-" * 5)
if not api_key_set:
api_key = data["api_key"]
else:
api_key = settings.API_KEY
if not embeddings_key_set:
embeddings_key = data["embeddings_key"]
else:
embeddings_key = settings.EMBEDDINGS_KEY
if 'prompt_id' in data:
prompt_id = data["prompt_id"]
else:
prompt_id = 'default'
if prompt_id == 'default':
prompt = chat_combine_template
elif prompt_id == 'creative':
prompt = chat_combine_creative
elif prompt_id == 'strict':
prompt = chat_combine_strict
else:
prompt = prompts_collection.find_one({"_id": ObjectId(prompt_id)})["content"]
# use try and except to check for exception
try:
# check if the vectorstore is set
vectorstore = get_vectorstore(data)
# loading the index and the store and the prompt template
# Note if you have used other embeddings than OpenAI, you need to change the embeddings
docsearch = VectorCreator.create_vectorstore(settings.VECTOR_STORE, vectorstore, embeddings_key)
llm = LLMCreator.create_llm(settings.LLM_NAME, api_key=api_key)
docs = docsearch.search(question, k=2)
# join all page_content together with a newline
docs_together = "\n".join([doc.page_content for doc in docs])
p_chat_combine = prompt.replace("{summaries}", docs_together)
messages_combine = [{"role": "system", "content": p_chat_combine}]
source_log_docs = []
for doc in docs:
if doc.metadata:
source_log_docs.append({"title": doc.metadata['title'].split('/')[-1], "text": doc.page_content})
else:
source_log_docs.append({"title": doc.page_content, "text": doc.page_content})
# join all page_content together with a newline
if len(history) > 1:
tokens_current_history = 0
# count tokens in history
history.reverse()
for i in history:
if "prompt" in i and "response" in i:
tokens_batch = count_tokens(i["prompt"]) + count_tokens(i["response"])
if tokens_current_history + tokens_batch < settings.TOKENS_MAX_HISTORY:
tokens_current_history += tokens_batch
messages_combine.append({"role": "user", "content": i["prompt"]})
messages_combine.append({"role": "system", "content": i["response"]})
messages_combine.append({"role": "user", "content": question})
completion = llm.gen(model=gpt_model, engine=settings.AZURE_DEPLOYMENT_NAME,
messages=messages_combine)
result = {"answer": completion, "sources": source_log_docs}
logger.debug(result)
# generate conversationId
if conversation_id is not None:
conversations_collection.update_one(
{"_id": ObjectId(conversation_id)},
{"$push": {"queries": {"prompt": question,
"response": result["answer"], "sources": result['sources']}}},
)
else:
# create new conversation
# generate summary
messages_summary = [
{"role": "assistant", "content": "Summarise following conversation in no more than 3 words, "
"respond ONLY with the summary, use the same language as the system \n\n"
"User: " + question + "\n\n" + "AI: " + result["answer"]},
{"role": "user", "content": "Summarise following conversation in no more than 3 words, "
"respond ONLY with the summary, use the same language as the system"}
]
completion = llm.gen(
model=gpt_model,
engine=settings.AZURE_DEPLOYMENT_NAME,
messages=messages_summary,
max_tokens=30
)
conversation_id = conversations_collection.insert_one(
{"user": "local",
"date": datetime.datetime.utcnow(),
"name": completion,
"queries": [{"prompt": question, "response": result["answer"], "sources": source_log_docs}]}
).inserted_id
result["conversation_id"] = str(conversation_id)
# mock result
# result = {
# "answer": "The answer is 42",
# "sources": ["https://en.wikipedia.org/wiki/42_(number)", "https://en.wikipedia.org/wiki/42_(number)"]
# }
return result
except Exception as e:
# print whole traceback
traceback.print_exc()
print(str(e))
return bad_request(500, str(e))
@answer.route("/api/search", methods=["POST"])
def api_search():
data = request.get_json()
# get parameter from url question
question = data["question"]
if not embeddings_key_set:
embeddings_key = data["embeddings_key"]
else:
embeddings_key = settings.EMBEDDINGS_KEY
if "active_docs" in data:
vectorstore = get_vectorstore({"active_docs": data["active_docs"]})
else:
vectorstore = ""
docsearch = VectorCreator.create_vectorstore(settings.VECTOR_STORE, vectorstore, embeddings_key)
docs = docsearch.search(question, k=2)
source_log_docs = []
for doc in docs:
if doc.metadata:
source_log_docs.append({"title": doc.metadata['title'].split('/')[-1], "text": doc.page_content})
else:
source_log_docs.append({"title": doc.page_content, "text": doc.page_content})
#yield f"data:{data}\n\n"
return source_log_docs