# Standard library
import base64
import os
import time
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urljoin, urlparse
from uuid import uuid4

# Third-party
import easyocr
import numpy as np
import pymupdf
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from langchain.docstore.document import Document
from langchain.memory import ChatMessageHistory
from langchain.retrievers import ContextualCompressionRetriever, ParentDocumentRetriever
from langchain.retrievers.document_compressors import FlashrankRerank
from langchain_community.document_loaders import YoutubeLoader
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts.chat import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_groq import ChatGroq
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_qdrant import FastEmbedSparse, QdrantVectorStore, RetrievalMode
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pandasai import SmartDataframe
from pdf2image import convert_from_bytes
from qdrant_client import QdrantClient
from supabase import create_client
load_dotenv("secrets.env") | |
client = create_client(os.environ["SUPABASE_URL"], os.environ["SUPABASE_KEY"]) | |
qdrantClient = QdrantClient(url=os.environ["QDRANT_URL"], api_key=os.environ["QDRANT_API_KEY"]) | |
model_kwargs = {"device": "cuda"} | |
encode_kwargs = {"normalize_embeddings": True} | |
vectorEmbeddings = HuggingFaceEmbeddings( | |
model_name="sentence-transformers/all-MiniLM-L6-v2", | |
model_kwargs=model_kwargs, | |
encode_kwargs=encode_kwargs | |
) | |
reader = easyocr.Reader(['en'], gpu=True, model_storage_directory="/app/EasyOCRModels") | |
sparseEmbeddings = FastEmbedSparse(model="Qdrant/BM25", threads=20, parallel=0) | |
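# Quick sanity check for the dense model (illustrative only; all-MiniLM-L6-v2
# produces 384-dimensional normalized vectors):
#   vec = vectorEmbeddings.embed_query("hello world")
#   len(vec)  # -> 384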
prompt = """ | |
INSTRUCTIONS: | |
===================================== | |
### Role | |
**Primary Function**: You are an AI chatbot designed to provide accurate and efficient assistance to users based on provided context data. Your responses must be reliable, friendly, and directly address user inquiries or issues. Always clarify any unclear questions, and conclude responses positively. | |
### Constraints | |
1. **No Data Disclosure**: Never reveal access to training data or any context explicitly. | |
2. **Maintaining Focus**: Politely redirect any off-topic conversations back to relevant issues without breaking character. | |
3. **Exclusive Reliance on Context Data**: Base all answers strictly on the provided context data. If the context doesn’t cover the query, use a fallback response. Always maintain a third-person perspective. | |
4. **Restrictive Role Focus**: Do not engage in tasks or answer questions unrelated to your role or context data. | |
Ensure all instructions are strictly followed. Responses must be meaningful and concise, within 512 words. Include sources to support your answers when possible. | |
CONTEXT: | |
===================================== | |
{context} | |
====================================== | |
QUESTION: | |
===================================== | |
{question} | |
CHAT HISTORY: | |
===================================== | |
{chatHistory} | |
NOTE: Generate responses directly without using phrases like "Response:" or "Answer:". Do not mention the use of extracted context or provide unnecessary details. | |
""" | |
prompt = ChatPromptTemplate.from_template(prompt) | |
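# The template exposes three slots; an illustrative fill (values made up):
#   prompt.format_messages(
#       context="CONTENT: Refunds take 5 days.\nSOURCE: {'source': 'faq.pdf'}",
#       question="How long do refunds take?",
#       chatHistory=""
#   )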
chatHistoryStore = {}  # session_id -> ChatMessageHistory
def createUser(user_id: str, username: str) -> dict:
    # Registers a user in Supabase unless the username is already taken.
    try:
        userData = client.table("ConversAI_UserInfo").select("*").execute().data
        if username not in [row["username"] for row in userData]:
            client.table("ConversAI_UserInfo").insert({"user_id": user_id, "username": username}).execute()
            client.table("ConversAI_UserConfig").insert({"user_id": username}).execute()
            return {
                "output": "SUCCESS"
            }
        else:
            return {
                "output": "USER ALREADY EXISTS"
            }
    except Exception as e:
        return {
            "error": str(e)
        }
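# Hypothetical usage (IDs are illustrative):
#   createUser(user_id="auth0|123", username="alice")  # -> {"output": "SUCCESS"}
#   createUser(user_id="auth0|456", username="alice")  # -> {"output": "USER ALREADY EXISTS"}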
def createTable(tablename: str):
    # Creates an empty hybrid (dense + sparse) Qdrant collection.
    global vectorEmbeddings
    global sparseEmbeddings
    QdrantVectorStore.from_documents(
        documents=[],
        embedding=vectorEmbeddings,
        sparse_embedding=sparseEmbeddings,
        url=os.environ["QDRANT_URL"],
        prefer_grpc=True,
        api_key=os.environ["QDRANT_API_KEY"],
        collection_name=tablename,
        retrieval_mode=RetrievalMode.HYBRID
    )
    return {
        "output": "SUCCESS"
    }
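# Collection names appear to follow "<botname>$<username>" (judging by the
# "$" split in listTables below); an illustrative call:
#   createTable("supportbot$alice")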
def addDocuments(text: str, source: str, vectorstore: str):
    # Splits raw text into overlapping chunks and upserts them, with fresh
    # UUIDs, into an existing Qdrant collection.
    global vectorEmbeddings
    global sparseEmbeddings
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=1500,
        chunk_overlap=250,
        add_start_index=True
    )
    texts = [Document(page_content=text, metadata={"source": source})]
    texts = splitter.split_documents(texts)
    ids = [str(uuid4()) for _ in range(len(texts))]
    store = QdrantVectorStore.from_existing_collection(
        embedding=vectorEmbeddings,
        sparse_embedding=sparseEmbeddings,
        collection_name=vectorstore,
        url=os.environ["QDRANT_URL"],
        api_key=os.environ["QDRANT_API_KEY"],
        retrieval_mode=RetrievalMode.HYBRID
    )
    store.add_documents(documents=texts, ids=ids)
    return {
        "output": "SUCCESS"
    }
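# Illustrative call (the collection must already exist, e.g. via createTable;
# file name is a placeholder):
#   addDocuments(text=open("notes.txt").read(), source="notes.txt",
#                vectorstore="supportbot$alice")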
def format_docs(docs: list) -> str:
    # Flattens retrieved Documents into one context string for the prompt.
    context = ""
    for doc in docs:
        context += f"CONTENT: {doc.page_content}\nSOURCE: {doc.metadata}\n\n\n"
    if context == "":
        context = "No context found"
    return context
def get_session_history(session_id: str) -> BaseChatMessageHistory:
    # Returns (creating if needed) the in-memory history for a session.
    if session_id not in chatHistoryStore:
        chatHistoryStore[session_id] = ChatMessageHistory()
    return chatHistoryStore[session_id]
def trimMessages(chain_input):
    # Keeps only the most recent message in every session history, so the
    # prompt's chat-history slot carries a single prior turn.
    for storeName in chatHistoryStore:
        messages = chatHistoryStore[storeName].messages
        if len(messages) > 1:
            chatHistoryStore[storeName].clear()
            for message in messages[-1:]:
                chatHistoryStore[storeName].add_message(message)
    return True
def answerQuery(query: str, vectorstore: str, llmModel: str = "llama3-70b-8192") -> dict:
    # Hybrid-retrieves context from Qdrant (MMR over dense + sparse), then
    # answers via Groq, with chat history keyed by collection name.
    global prompt
    global client
    global vectorEmbeddings
    global sparseEmbeddings
    vectorStoreName = vectorstore
    vectorstore = QdrantVectorStore.from_existing_collection(
        embedding=vectorEmbeddings,
        sparse_embedding=sparseEmbeddings,
        collection_name=vectorstore,
        url=os.environ["QDRANT_URL"],
        api_key=os.environ["QDRANT_API_KEY"],
        retrieval_mode=RetrievalMode.HYBRID
    )
    retriever = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": 4, "score_threshold": None})
    baseChain = (
        {"context": RunnableLambda(lambda x: x["question"]) | retriever | RunnableLambda(format_docs),
         "question": RunnableLambda(lambda x: x["question"]),
         "chatHistory": RunnableLambda(lambda x: x["chatHistory"])}
        | prompt
        | ChatGroq(model=llmModel, temperature=0.75, max_tokens=512)
        | StrOutputParser()
    )
    messageChain = RunnableWithMessageHistory(
        baseChain,
        get_session_history,
        input_messages_key="question",
        history_messages_key="chatHistory"
    )
    chain = RunnablePassthrough.assign(messages_trimmed=trimMessages) | messageChain
    return {
        "output": chain.invoke(
            {"question": query},
            {"configurable": {"session_id": vectorStoreName}}
        )
    }
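# Illustrative multi-turn use; history is keyed by the collection name, so
# consecutive calls against the same collection share one prior turn:
#   answerQuery("What is the refund policy?", "supportbot$alice")
#   answerQuery("And how long does it take?", "supportbot$alice")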
def deleteTable(tableName: str):
    # Drops the Qdrant collection outright.
    try:
        global qdrantClient
        qdrantClient.delete_collection(collection_name=tableName)
        return {
            "output": "SUCCESS"
        }
    except Exception as e:
        return {
            "error": str(e)
        }
def listTables(username: str):
    # Collection names follow "<botname>$<username>"; filter on the suffix.
    try:
        global qdrantClient
        qdrantCollections = qdrantClient.get_collections()
        names = [collection.name for collection in qdrantCollections.collections]
        return {
            "output": [name for name in names if "$" in name and name.split("$")[1] == username]
        }
    except Exception as e:
        return {
            "error": str(e)
        }
def getLinks(url: str, timeout=30):
    # Crawls one level deep: collects same-domain links from the page, then
    # from each of those pages, until the time budget runs out.
    start = time.time()
    def getLinksFromPage(url: str) -> list:
        response = requests.get(url)
        soup = BeautifulSoup(response.content, "lxml")
        anchors = soup.find_all("a")
        links = []
        for anchor in anchors:
            if "href" in anchor.attrs:
                href = anchor.attrs["href"]
                if urlparse(href).netloc == urlparse(url).netloc:
                    links.append(href)
                elif not href.startswith(("//", "file", "javascript", "tel", "mailto", "http")):
                    links.append(urljoin(url + "/", href))
        # Drop fragment links and duplicates once, after the scan.
        links = [link for link in links if "#" not in link]
        return list(set(links))
    links = getLinksFromPage(url)
    uniqueLinks = set()
    for link in links:
        if time.time() - start > timeout:
            break
        uniqueLinks = uniqueLinks.union(set(getLinksFromPage(link)))
    return list(set([x[:-1] if x.endswith("/") else x for x in uniqueLinks]))
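# Illustrative call; the 30-second budget caps the second-level crawl:
#   getLinks("https://example.com", timeout=30)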
def getTextFromImagePDF(pdfBytes):
    # Rasterizes each PDF page and OCRs it with EasyOCR.
    def getText(image):
        global reader
        return "\n".join([text[1] for text in reader.readtext(np.array(image), paragraph=True)])
    allImages = convert_from_bytes(pdfBytes)
    texts = [getText(image) for image in allImages]
    return "\n\n\n".join(texts)
def getTranscript(urls: str):
    # Accepts a comma-separated list of YouTube URLs; failed fetches yield
    # empty strings rather than aborting the batch.
    urls = urls.split(",")
    texts = []
    for url in urls:
        try:
            loader = YoutubeLoader.from_youtube_url(
                url, add_video_info=False
            )
            texts.append(" ".join([x.page_content for x in loader.load()]))
        except Exception:
            texts.append("")
    return "\n\n".join(texts)
def analyzeData(query, dataframe):
    # PandasAI answers the query over the dataframe; chart answers come back
    # as a file path, which is inlined here as a base64 data URI.
    llm = ChatGroq(model="llama-3.1-8b-instant")
    df = SmartDataframe(dataframe, config={"llm": llm, "verbose": False})
    response = df.chat(query)
    if isinstance(response, str) and os.path.isfile(response):
        with open(response, "rb") as file:
            b64string = base64.b64encode(file.read()).decode("utf-8")
        return f"data:image/png;base64,{b64string}"
    else:
        return response
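# Illustrative call with a hypothetical dataframe:
#   import pandas as pd
#   sales = pd.DataFrame({"month": ["Jan", "Feb"], "revenue": [100, 140]})
#   analyzeData("Which month had higher revenue?", sales)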
def extractTextFromPage(page):
    return page.get_text()
def extractTextFromPdf(pdf_path):
    # Extracts page text in parallel; the page objects stay valid until the
    # document is closed.
    doc = pymupdf.open(pdf_path)
    pages = [doc.load_page(i) for i in range(len(doc))]
    with ThreadPoolExecutor() as executor:
        texts = list(executor.map(extractTextFromPage, pages))
    doc.close()
    return '.'.join(texts)
def extractTextFromUrl(url):
    response = requests.get(url)
    response.raise_for_status()
    html = response.text
    soup = BeautifulSoup(html, 'lxml')
    return soup.get_text(separator=' ', strip=True)
def extractTextFromUrlList(urls):
    # Fetches and strips pages concurrently.
    with ThreadPoolExecutor() as executor:
        texts = list(executor.map(extractTextFromUrl, urls))
    return '.'.join(texts)
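if __name__ == "__main__":
    # Minimal end-to-end smoke test; a sketch only. Assumes secrets.env holds
    # live Supabase/Qdrant/Groq credentials, and every name below
    # ("demo$alice", the IDs, the URL) is illustrative.
    createUser(user_id="auth0|123", username="alice")
    createTable("demo$alice")
    addDocuments(text=extractTextFromUrl("https://example.com"),
                 source="https://example.com", vectorstore="demo$alice")
    print(answerQuery("What is this site about?", "demo$alice"))
    print(listTables("alice"))
    deleteTable("demo$alice")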