# ConversAI / functions.py
# Rauhan's picture
# DEBUG: llModel
# 572d835
# raw
# history blame
# 15.3 kB
import pymupdf
import string
from concurrent.futures import ThreadPoolExecutor
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_qdrant import QdrantVectorStore
from langchain_qdrant import RetrievalMode
from langchain_core.prompts.chat import ChatPromptTemplate
from langchain_core.prompts import PromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain.memory import ChatMessageHistory
from pandasai import SmartDataframe
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_community.document_loaders import YoutubeLoader
from langchain.docstore.document import Document
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_qdrant import FastEmbedSparse
from supabase.client import create_client
from qdrant_client import QdrantClient
from langchain_groq import ChatGroq
from pdf2image import convert_from_bytes
import numpy as np
import easyocr
from bs4 import BeautifulSoup
from urllib.parse import urlparse, urljoin
from supabase import create_client
from dotenv import load_dotenv
import os
import base64
import time
import requests
# Module-level setup. Everything below runs at import time and the order
# matters: the environment file must be loaded before any os.environ lookup.
load_dotenv("secrets.env")
# Supabase client used by the user / data-source / activity-log helpers below.
client = create_client(os.environ["SUPABASE_URL"], os.environ["SUPABASE_KEY"])
# Qdrant client used for collection management (deleteTable / listTables).
qdrantClient = QdrantClient(url=os.environ["QDRANT_URL"], api_key=os.environ["QDRANT_API_KEY"])
# Dense embedding model; the device is hard-coded to "cuda" and embeddings
# are requested normalized.
model_kwargs = {"device": "cuda"}
encode_kwargs = {"normalize_embeddings": True}
vectorEmbeddings = HuggingFaceEmbeddings(
model_name="sentence-transformers/all-MiniLM-L6-v2",
model_kwargs=model_kwargs,
encode_kwargs=encode_kwargs
)
# English EasyOCR reader with GPU enabled; model files live under
# /app/EasyOCRModels.
reader = easyocr.Reader(['en'], gpu=True, model_storage_directory="/app/EasyOCRModels")
# Sparse (BM25) embeddings used together with the dense ones for the HYBRID
# retrieval mode. NOTE(review): confirm the meaning of threads=20 / parallel=0
# against the FastEmbedSparse documentation.
sparseEmbeddings = FastEmbedSparse(model="Qdrant/BM25", threads=20, parallel=0)
# Template for the main answering chain. It exposes three placeholders that
# answerQuery fills in: {context}, {question} and {chatHistory}.
# The text below is sent to the model verbatim, so edit it with care.
prompt = """
INSTRUCTIONS:
=====================================
### Role
**Primary Function**: You are an AI chatbot designed to provide accurate and efficient assistance to users based on provided context data. Your responses must be reliable, friendly, and directly address user inquiries or issues. Always clarify any unclear questions, and conclude responses positively.
### Constraints
1. **No Data Disclosure**: Never reveal access to training data or any context explicitly.
2. **Maintaining Focus**: Politely redirect any off-topic conversations back to relevant issues without breaking character.
3. **Exclusive Reliance on Context Data**: Base all answers strictly on the provided context data. If the context doesn’t cover the query, use a fallback response. Always maintain a third-person perspective.
4. **Restrictive Role Focus**: Do not engage in tasks or answer questions unrelated to your role or context data.
Ensure all instructions are strictly followed. Responses must be meaningful and concise, within 512 words. Make sure the user is always happy and satisfied with the outputs you return.
CONTEXT:
=====================================
{context}
======================================
QUESTION:
=====================================
{question}
CHAT HISTORY:
=====================================
{chatHistory}
NOTE: Generate responses directly without using phrases like "Response:" or "Answer:". NEVER mention the user about usage of any context to generate an answer.
"""
# The name `prompt` is rebound here from the raw string to the compiled
# chat prompt template.
prompt = ChatPromptTemplate.from_template(prompt)
# In-memory chat histories keyed by session id; populated lazily by
# get_session_history and trimmed by trimMessages.
chatHistoryStore = dict()
class FollowUps(BaseModel):
    # Output schema for the follow-up question chain: exactly three question
    # strings. Documented with comments rather than a class docstring on
    # purpose -- this model is handed to JsonOutputParser, whose format
    # instructions are derived from the schema, so a docstring could change
    # the text sent to the LLM.
    q1: str = Field(description="First Follow-up Question")
    q2: str = Field(description="Second Follow-up Question")
    q3: str = Field(description="Third Follow-up Question")
# Template for the follow-up question chain. {format_instructions} is filled
# once from the JSON parser below; {context} is supplied per call.
# The text below is sent to the model verbatim, so edit it with care.
followUpPrompt = """
You are an expert chatbot at framing follow up questions using some given text such that their answers can be found in the text itself and have been given the task of doing the same. Make sure that the questions are good quality and not too long in length. Frame appropriate and meaningful questions out of the given text and DO NOT mention the usage of any text in the questions. Also, if no the given text says NO CONTEXT FOUND, please return an empty string for each question asked.
\n{format_instructions}
\n{context}
"""
# Parser that turns the model's JSON reply into the FollowUps structure.
jsonParser = JsonOutputParser(pydantic_object=FollowUps)
# The name `followUpPrompt` is rebound here from the raw string to the
# PromptTemplate, with the parser's format instructions pre-filled.
followUpPrompt = PromptTemplate(
template=followUpPrompt,
input_variables=["context"],
partial_variables={"format_instructions": jsonParser.get_format_instructions()},
)
def createUser(user_id: str, username: str, email: str) -> dict:
    """Register a new user unless the username is already taken.

    All rows of ``ConversAI_UserInfo`` are fetched and their usernames are
    compared with ``username``. When the name is free, one row is inserted
    into ``ConversAI_UserInfo`` and one into ``ConversAI_UserConfig``.

    Returns:
        A dict with ``code`` and ``message``: 200 on success, 409 when the
        username exists, and 409 with an "Email already exists" message when
        either insert raises.
    """
    existingRows = client.table("ConversAI_UserInfo").select("*").execute().data
    takenNames = [row["username"] for row in existingRows]
    if username in takenNames:
        return {
            "code": 409,
            "message": "Username already exists"
        }

    try:
        client.table("ConversAI_UserInfo").insert(
            {"user_id": user_id, "username": username, "email": email}
        ).execute()
        # NOTE(review): the config row stores the *username* under the
        # "user_id" key -- confirm this is what the table expects.
        client.table("ConversAI_UserConfig").insert({"user_id": username}).execute()
    except Exception:
        # Every insert failure is reported as a duplicate e-mail.
        return {
            "code": 409,
            "message": "Email already exists",
        }
    return {
        "code": 200,
        "message": "User Setup Successful"
    }
def createTable(tablename: str):
    """Create an empty hybrid (dense + sparse) Qdrant collection.

    The collection is built through ``QdrantVectorStore.from_documents`` with
    an empty document list and ``force_recreate=True``.

    Returns:
        ``{"output": "SUCCESS"}`` once the call has returned.
    """
    storeOptions = dict(
        documents=[],
        embedding=vectorEmbeddings,
        sparse_embedding=sparseEmbeddings,
        url=os.environ["QDRANT_URL"],
        prefer_grpc=True,
        api_key=os.environ["QDRANT_API_KEY"],
        collection_name=tablename,
        force_recreate=True,
        retrieval_mode=RetrievalMode.HYBRID,
    )
    # The returned store object is not needed; only the side effect matters.
    QdrantVectorStore.from_documents(**storeOptions)
    return {"output": "SUCCESS"}
# Translation table that deletes every ASCII punctuation character except
# the full stop. Built once at import time instead of on every call, since
# cleanText runs once per page / URL / transcript.
_PUNCTUATION_TABLE = str.maketrans("", "", string.punctuation.replace(".", ""))


def cleanText(text: str) -> str:
    """Normalise raw extracted text before it is chunked or returned.

    Newlines become single spaces and all ASCII punctuation except "." is
    removed. Other whitespace and non-ASCII characters are left untouched.

    Args:
        text: Raw text to clean.

    Returns:
        The cleaned text.
    """
    return text.replace("\n", " ").translate(_PUNCTUATION_TABLE)
def addDocuments(texts: list[tuple[str, str]], vectorstore: str):
    """Clean, chunk and index texts into a hybrid Qdrant collection.

    Args:
        texts: Sequence of ``(text, source)`` pairs. Element 0 is the raw
            text and element 1 is stored as the ``source`` metadata of every
            chunk produced from it.
        vectorstore: Name of the Qdrant collection to write to.

    Returns:
        ``{"output": "SUCCESS"}`` once indexing has returned.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=1500,
        chunk_overlap=250,
        add_start_index=True
    )
    # Use the shared cleanText helper instead of repeating its logic inline,
    # so both code paths always normalise text the same way.
    sourceDocuments = [
        Document(page_content=cleanText(text=entry[0]), metadata={"source": entry[1]})
        for entry in texts
    ]
    chunks = splitter.split_documents(sourceDocuments)
    # NOTE(review): force_recreate=True is passed here as well; if that
    # recreates the collection, every call replaces previously indexed
    # documents rather than adding to them. Confirm this is intended.
    QdrantVectorStore.from_documents(
        documents=chunks,
        embedding=vectorEmbeddings,
        sparse_embedding=sparseEmbeddings,
        url=os.environ["QDRANT_URL"],
        prefer_grpc=True,
        api_key=os.environ["QDRANT_API_KEY"],
        collection_name=vectorstore,
        force_recreate=True,
        retrieval_mode=RetrievalMode.HYBRID
    )
    return {
        "output": "SUCCESS"
    }
def format_docs(docs: list) -> str:
    """Join retrieved documents into one context string.

    Besides returning the context, this publishes two module-level globals
    that answerQuery reads after the chain has run:

    * ``sources``: de-duplicated ``metadata["source"]`` values (unordered).
    * ``tempContext``: the same string that is returned.

    Args:
        docs: Retrieved documents; each must expose ``page_content`` and a
            ``metadata`` mapping containing a ``"source"`` key.

    Returns:
        Every ``page_content`` followed by three newlines, concatenated, or
        the literal ``"No context found"`` when that would be empty.
    """
    global sources
    global tempContext
    sources = []
    pieces = []
    for doc in docs:
        pieces.append(f"{doc.page_content}\n\n\n")
        sources.append(doc.metadata["source"])
    # join() avoids the quadratic cost of repeated string concatenation.
    context = "".join(pieces) or "No context found"
    sources = list(set(sources))
    tempContext = context
    return context
def get_session_history(session_id: str) -> BaseChatMessageHistory:
    """Return the chat history for ``session_id``, creating it on first use.

    Histories live in the module-level ``chatHistoryStore`` dict.
    """
    try:
        return chatHistoryStore[session_id]
    except KeyError:
        freshHistory = ChatMessageHistory()
        chatHistoryStore[session_id] = freshHistory
        return freshHistory
def trimMessages(chain_input):
    """Cut every stored chat history down to its most recent message.

    Runs as a chain step, which is why it accepts (and ignores) the chain
    input. It walks *all* sessions in ``chatHistoryStore``, not only the one
    being served.

    Args:
        chain_input: Unused.

    Returns:
        Always ``True``.
    """
    for history in chatHistoryStore.values():
        if len(history.messages) <= 1:
            continue
        # Copy the tail *before* clearing: if clear() empties the list in
        # place rather than rebinding it, reading the tail afterwards would
        # yield nothing and the whole history would be lost.
        lastMessages = list(history.messages[-1:])
        history.clear()
        for message in lastMessages:
            history.add_message(message)
    return True
def answerQuery(query: str, vectorstore: str, llmModel: str = "llama-3.1-70b-versatile") -> dict:
    """Answer a question against a collection and suggest follow-ups.

    Steps:
      1. Open the existing hybrid Qdrant collection and build an MMR
         retriever (k=4).
      2. Run the answering chain (retrieve, format_docs, prompt, Groq chat
         model, string output) wrapped in message history. The collection
         name doubles as the session id.
      3. Run the follow-up chain on the context captured by format_docs.

    This function relies on the module-level globals ``sources`` and
    ``tempContext`` written by format_docs during step 2, so step 3 must run
    after it. NOTE(review): because these are shared globals, concurrent
    calls could read each other's values -- confirm how this is served.

    Args:
        query: The user's question.
        vectorstore: Name of the Qdrant collection to search.
        llmModel: Groq model name used for the answering chain. The
            follow-up chain always uses "llama-3.1-70b-versatile".

    Returns:
        A dict with ``output`` (answer text), ``followUpQuestions`` (parsed
        JSON from the follow-up chain) and ``sources`` (list of source
        identifiers of the retrieved documents).
    """
    vectorStoreName = vectorstore
    store = QdrantVectorStore.from_existing_collection(
        embedding=vectorEmbeddings,
        sparse_embedding=sparseEmbeddings,
        collection_name=vectorStoreName,
        url=os.environ["QDRANT_URL"],
        api_key=os.environ["QDRANT_API_KEY"],
        retrieval_mode=RetrievalMode.HYBRID
    )
    retriever = store.as_retriever(search_type="mmr", search_kwargs={"k": 4, "score_threshold": None})
    baseChain = (
        {
            "context": RunnableLambda(lambda x: x["question"]) | retriever | RunnableLambda(format_docs),
            "question": RunnableLambda(lambda x: x["question"]),
            "chatHistory": RunnableLambda(lambda x: x["chatHistory"]),
        }
        | prompt
        | ChatGroq(model_name=llmModel, temperature=0.75, max_tokens=512)
        | StrOutputParser()
    )
    messageChain = RunnableWithMessageHistory(
        baseChain,
        get_session_history,
        input_messages_key="question",
        history_messages_key="chatHistory"
    )
    # Trim stored histories first, then run the history-aware chain.
    chain = RunnablePassthrough.assign(messages_trimmed=trimMessages) | messageChain
    followUpChain = followUpPrompt | ChatGroq(model_name="llama-3.1-70b-versatile", temperature=0) | jsonParser
    output = chain.invoke(
        {"question": query},
        {"configurable": {"session_id": vectorStoreName}}
    )
    # tempContext / sources were set by format_docs while the chain ran.
    followUpQuestions = followUpChain.invoke({"context": tempContext})
    return {
        "output": output,
        "followUpQuestions": followUpQuestions,
        "sources": sources
    }
def deleteTable(tableName: str):
    """Delete a Qdrant collection by name.

    Returns:
        ``{"output": "SUCCESS"}`` when the delete call returns, otherwise
        ``{"error": <exception instance>}``.
    """
    try:
        qdrantClient.delete_collection(collection_name=tableName)
    except Exception as exc:
        return {"error": exc}
    return {"output": "SUCCESS"}
def listTables(username: str):
    """List the Qdrant collections that belong to ``username``.

    A collection belongs to a user when the second ``$``-separated field of
    its name equals ``username``. Names without a ``$`` are skipped instead
    of aborting the whole listing with an IndexError.

    Returns:
        ``{"output": [collection names]}`` on success, otherwise
        ``{"error": <exception instance>}``.
    """
    try:
        collections = qdrantClient.get_collections().collections
        ownedNames = []
        for collection in collections:
            nameParts = collection.name.split("$")
            if len(nameParts) > 1 and nameParts[1] == username:
                ownedNames.append(collection.name)
        return {
            "output": ownedNames
        }
    except Exception as e:
        return {
            "error": e
        }
def getLinks(url: str, timeout=30):
    """Collect same-site links found on the pages linked from ``url``.

    The start page is fetched first; each link found there is then fetched
    in turn and the links on those pages are accumulated. Crawling stops
    once more than ``timeout`` seconds have elapsed since the call started
    (checked between page fetches only).

    NOTE(review): the result holds links found on the second-level pages;
    links on the start page are included only if a second-level page also
    links to them. Confirm that this is intended.
    NOTE(review): ``requests.get`` is called without its own timeout, so a
    single stalled request can exceed the overall budget.

    Args:
        url: Page to start from.
        timeout: Overall time budget in seconds.

    Returns:
        De-duplicated list of links with one trailing "/" removed.
    """
    start = time.time()

    def getLinksFromPage(url: str) -> list:
        """Return the unique, fragment-free links of a single page."""
        response = requests.get(url)
        soup = BeautifulSoup(response.content, "lxml")
        pageHost = urlparse(url).netloc  # loop-invariant, parsed once
        found = set()
        for anchor in soup.find_all("a"):
            if "href" not in anchor.attrs:
                continue
            href = anchor.attrs["href"]
            if urlparse(href).netloc == pageHost:
                # Absolute link on the same host: keep as written.
                found.add(href)
            elif not href.startswith(("//", "file", "javascript", "tel", "mailto", "http")):
                # Relative link: resolve against the page URL.
                found.add(urljoin(url + "/", href))
        # Filter and de-duplicate once, after the loop, instead of rebuilding
        # the list for every anchor.
        return [link for link in found if "#" not in link]

    uniqueLinks = set()
    for link in getLinksFromPage(url):
        if time.time() - start > timeout:
            break
        uniqueLinks.update(getLinksFromPage(link))
    # endswith() is safe for empty strings, unlike indexing with [-1].
    return list({link[:-1] if link.endswith("/") else link for link in uniqueLinks})
def getTextFromImagePDF(pdfBytes):
    """OCR every page of an image-based PDF.

    The PDF bytes are rendered to images, each image is read with the
    module-level EasyOCR ``reader`` and the recognised text is passed
    through cleanText.

    Returns:
        Dict mapping 1-based page number to the cleaned text of that page.
    """
    def getText(image):
        # readtext(..., paragraph=True) yields entries whose second element
        # is the recognised text.
        detections = reader.readtext(np.array(image), paragraph=True)
        joined = "\n".join([detection[1] for detection in detections])
        return cleanText(text=joined)

    pageTexts = {}
    for pageNumber, image in enumerate(convert_from_bytes(pdfBytes), start=1):
        pageTexts[pageNumber] = getText(image)
    return pageTexts
def getTranscript(urls: list) -> dict:
    """Fetch and clean the transcript of each YouTube URL.

    Each distinct URL is processed once, in first-seen order, and its result
    is stored directly under that URL. The earlier implementation iterated a
    ``set`` (arbitrary order) and then zipped the results with the original
    list, which could attach a transcript to the wrong URL.

    Args:
        urls: Iterable of YouTube video URLs; duplicates are collapsed.

    Returns:
        Dict mapping each URL to its cleaned transcript, or to ``""`` when
        loading that URL failed.
    """
    transcripts = {}
    for url in dict.fromkeys(urls):  # de-duplicate, keep order
        try:
            loader = YoutubeLoader.from_youtube_url(
                url, add_video_info=False
            )
            joined = " ".join([x.page_content for x in loader.load()])
            transcripts[url] = cleanText(text=joined)
        except Exception:
            # Best effort: a failing video yields an empty transcript rather
            # than aborting the batch. Unlike a bare except, this does not
            # swallow KeyboardInterrupt / SystemExit.
            transcripts[url] = ""
    return transcripts
def analyzeData(query, dataframe):
    """Answer a natural-language question about a dataframe via PandasAI.

    Args:
        query: The question; an instruction about rotating x-axis labels is
            appended before it is sent.
        dataframe: Data to analyse, wrapped in a ``SmartDataframe``.

    Returns:
        A ``data:image/png;base64,...`` URI when the response is the path of
        an existing file (assumed to be a generated chart), otherwise the
        response as returned by ``SmartDataframe.chat``.
    """
    query += ". In case, you are to plot a chart, make sure the x-axis labels are 90 degree rotated"
    # `model_name` selects the Groq model (as elsewhere in this file); the
    # previous `name=` keyword did not choose a model.
    llm = ChatGroq(model_name="llama-3.1-8b-instant")
    df = SmartDataframe(dataframe, config={"llm": llm, "verbose": False})
    response = df.chat(query)
    # The response is not always a string (it may be a number or a table);
    # only strings can be file paths, and os.path.isfile misbehaves or
    # raises on other types.
    if isinstance(response, str) and os.path.isfile(response):
        with open(response, "rb") as file:
            b64string = base64.b64encode(file.read()).decode("utf-8", errors="replace")
        return f"data:image/png;base64,{b64string}"
    return response
def extractTextFromPage(page):
    """Return the cleaned text of one PDF page (via ``page.get_text()``)."""
    rawText = page.get_text()
    return cleanText(text=rawText)
def extractTextFromPdf(pdf_path):
    """Extract cleaned text from every page of a PDF.

    Pages are processed sequentially on the calling thread: PyMuPDF
    documents that it does not support multithreaded use, so pages of the
    same document are no longer fanned out to a thread pool. The document is
    closed even when extraction fails.

    Args:
        pdf_path: Path handed to ``pymupdf.open``.

    Returns:
        Dict mapping 1-based page number to the cleaned text of that page.
    """
    doc = pymupdf.open(pdf_path)
    try:
        return {
            pageIndex + 1: extractTextFromPage(doc.load_page(pageIndex))
            for pageIndex in range(len(doc))
        }
    finally:
        doc.close()
def extractTextFromUrl(url):
    """Download a web page and return its visible text, cleaned.

    Raises whatever ``requests`` raises, including the HTTP error from
    ``raise_for_status()`` on a non-success status code.
    """
    response = requests.get(url)
    response.raise_for_status()
    pageText = BeautifulSoup(response.text, 'lxml').get_text(separator=' ', strip=True)
    return cleanText(text=pageText)
def extractTextFromUrlList(urls):
    """Fetch several URLs concurrently and map each URL to its cleaned text.

    Uses a thread pool; results are paired with ``urls`` in order. An
    exception raised for any URL propagates to the caller.
    """
    with ThreadPoolExecutor() as pool:
        fetchedTexts = list(pool.map(extractTextFromUrl, urls))
    return dict(zip(urls, fetchedTexts))
def encodeToBase64(dct: dict) -> dict:
    """Base64-encode every string value of ``dct``, recursing into dicts.

    The dict is modified in place and also returned. Values that are neither
    strings nor dicts are left untouched. ``isinstance`` is used so that
    subclasses of ``str`` / ``dict`` are handled too.

    Args:
        dct: Mapping whose string values should be encoded.

    Returns:
        The same dict object, with string values replaced by the base64
        text of their UTF-8 encoding.
    """
    for key, value in dct.items():
        # Only existing keys are reassigned, so mutating while iterating is
        # safe here.
        if isinstance(value, str):
            encoded = base64.b64encode(value.encode("utf-8", errors="replace"))
            dct[key] = encoded.decode("utf-8", errors="replace")
        elif isinstance(value, dict):
            dct[key] = encodeToBase64(value)
    return dct
def decodeBase64(dct: dict) -> dict:
    """Decode the base64 payload stored under ``dct["output"]``.

    ``dct["output"]`` may be a base64 string or a dict of them. Nested dicts
    are decoded recursively and non-string values are left untouched, which
    mirrors what encodeToBase64 produces. Previously nested dicts and
    non-string values raised. The dict is modified in place and returned.

    Args:
        dct: Mapping with an ``"output"`` key.

    Returns:
        The same dict object with ``"output"`` decoded.

    Raises:
        KeyError: If ``"output"`` is missing.
    """
    def _decode(value):
        # Decode one value: strings directly, dicts entry by entry.
        if isinstance(value, str):
            rawBytes = base64.b64decode(value.encode("utf-8", errors="replace"))
            return rawBytes.decode("utf-8", errors="replace")
        if isinstance(value, dict):
            for key in value:
                value[key] = _decode(value[key])
        return value

    dct["output"] = _decode(dct["output"])
    return dct
def createDataSourceName(sourceName):
    """Return a data-source name not yet present in the data-source table.

    Existing names are read once from ``ConversAI_ChatbotDataSources``. If
    ``sourceName`` is free it is returned unchanged; otherwise the first
    free ``<sourceName>-<n>`` (n = 1, 2, 3, ...) is returned.

    The earlier version never advanced its counter: it appended "-1" again
    on each recursion (``name-1-1-...``) and re-queried the table every
    time.

    Args:
        sourceName: Desired name.

    Returns:
        A name that does not collide with the names currently stored.
    """
    rows = client.table("ConversAI_ChatbotDataSources").select("dataSourceName").execute().data
    takenNames = {row["dataSourceName"] for row in rows}
    if sourceName not in takenNames:
        return sourceName
    suffix = 1
    while f"{sourceName}-{suffix}" in takenNames:
        suffix += 1
    return f"{sourceName}-{suffix}"
def trackUsage(vectorstore: str, endpoint: str):
    """Record one endpoint call in ``ConversAI_ActivityLog``.

    The username and chatbot name are the second and third ``$``-separated
    fields of ``vectorstore``; an IndexError is raised when they are
    missing.
    """
    nameParts = vectorstore.split("$")
    activity = {
        "username": nameParts[1],
        "chatbotName": nameParts[2],
        "endpointUsed": endpoint,
    }
    client.table("ConversAI_ActivityLog").insert(activity).execute()