Spaces:
Build error
Build error
# %% | |
import nltk | |
from langchain.indexes import VectorstoreIndexCreator | |
from langchain.text_splitter import CharacterTextSplitter, NLTKTextSplitter | |
from langchain.document_loaders import OnlinePDFLoader, UnstructuredPDFLoader | |
from langchain.vectorstores import Chroma | |
from langchain.embeddings import LlamaCppEmbeddings, HuggingFaceInstructEmbeddings | |
from chromadb.config import Settings | |
import chromadb | |
from chromadb.utils import embedding_functions | |
from hashlib import sha256 | |
import cloudpickle | |
import logging | |
import os | |
from load_model import load_embedding, load_vectorstore | |
import torch | |
import re | |
import pathlib | |
import tempfile | |
# Absolute directory that contains this file; base for the on-disk data paths.
current_path = str(pathlib.Path(__file__).parent.resolve())
persist_directory = f"{current_path}/VectorStore"

# NOTE(review): presumably set so that a second OpenMP runtime being loaded
# does not abort the process -- confirm this workaround is still required.
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

# Fetch the NLTK 'punkt' resource at import time (presumably required by
# NLTKTextSplitter below -- confirm).
nltk.download('punkt')

# Root logger shared by the functions in this module.
logger = logging.getLogger()
# %% | |
def create_collection(collection_name, model_name, client):
    """Fetch or create a collection that uses an Instructor embedding function.

    Currently not used by the rest of this module.

    Args:
        collection_name: Name of the collection to fetch or create.
        model_name: Model identifier handed to ``InstructorEmbeddingFunction``.
        client: Object exposing ``get_or_create_collection`` (presumably a
            chromadb client -- confirm against callers).

    Returns:
        Always ``True``.
    """
    # Run on the GPU when torch reports one, otherwise fall back to the CPU.
    target_device = "cuda" if torch.cuda.is_available() else "cpu"
    instructor_ef = embedding_functions.InstructorEmbeddingFunction(
        model_name=model_name,
        device=target_device,
    )
    client.get_or_create_collection(
        collection_name,
        embedding_function=instructor_ef,
    )
    return True
def create_and_add(collection_name, sub_docs, model_name, metadata):
    """Add documents to a named vector store, persist it, and return it.

    Args:
        collection_name: Name of the collection the documents are added to.
        sub_docs: Documents (typically already split into chunks) to add.
        model_name: Embedding model name, forwarded to ``load_embedding`` and
            ``load_vectorstore``.
        metadata: Forwarded to ``load_vectorstore`` as ``metadata``.

    Returns:
        The vector store the documents were added to.
    """
    logging.info(f"Adding documents to {collection_name}")
    embedder = load_embedding(model_name)
    store = load_vectorstore(model_name, collection_name, metadata=metadata)
    store.add_documents(documents=sub_docs, embedding=embedder)
    store.persist()

    # Smoke test: open the store a second time and print the result of a
    # fixed sample query.
    reopened = load_vectorstore(model_name, collection_name, metadata=metadata)
    sample_hits = reopened.similarity_search_with_score(
        query="What are AXAs green Goals?",
        k=4,
    )
    print(sample_hits)
    return store
def load_from_file(files):
    """Parse uploaded PDF files into documents.

    Each upload is copied into a temporary directory and parsed with
    ``UnstructuredPDFLoader``. The temporary directory is removed before the
    function returns.

    Args:
        files: Iterable of file-like objects exposing ``name`` and ``read()``.
            ``read()`` must return bytes because the copy is opened in binary
            mode.

    Returns:
        list: Documents produced by the loaders, in upload order. An empty
        ``files`` yields an empty list.
    """
    docs = []
    with tempfile.TemporaryDirectory() as tmpdirname:
        saved_files = []
        for file in files:
            # Keep only the final path component: os.path.join drops the
            # temp directory when its second argument is absolute, which
            # would make the copy land outside the temporary directory.
            file_name = os.path.join(tmpdirname, os.path.basename(file.name))
            with open(file_name, mode='wb') as w:
                w.write(file.read())
            saved_files.append(file_name)
        print(saved_files)

        loaders = [UnstructuredPDFLoader(pdf) for pdf in saved_files]
        print(loaders)
        # Loading has to happen while the temporary directory still exists.
        for loader in loaders:
            docs.extend(loader.load())
    return docs
def _url_metadata(url):
    """Build the metadata dict attached to every document loaded from *url*."""
    return {'source': url, 'url': url, 'owner': 'Heiko Wagner'}


def load_from_web(urls, cache=True):
    """Load PDFs from URLs into documents, backed by an on-disk pickle cache.

    The cache file lives in ``<current_path>/.cache`` and is named after the
    SHA-256 of ``str(urls)``, so a different list (or order) of URLs maps to a
    different cache file.

    Args:
        urls: Sequence of PDF URLs.
        cache: When true and a cache file for ``urls`` exists, return the
            cached documents instead of downloading. When false the PDFs are
            downloaded again and the cache file is rewritten.

    Returns:
        list: Loaded documents. Each document's ``metadata`` is replaced by a
        dict with ``source``, ``url`` and ``owner`` keys.
    """
    filename = f"{current_path}/.cache/{sha256(str(urls).encode('utf-8')).hexdigest()}.pkl"

    if cache and os.path.isfile(filename):
        logger.info("Using Cache")
        # NOTE(review): unpickling can execute arbitrary code; this is only
        # acceptable while the .cache directory is written exclusively by
        # this function.
        with open(filename, "rb") as cached:
            docs = cloudpickle.load(cached)
        # Cache files may predate per-URL stamping at load time. Re-stamp by
        # position only when there is exactly one document per URL; otherwise
        # a positional mapping would attribute documents to the wrong URL.
        if len(docs) == len(urls):
            for doc, url in zip(docs, urls):
                doc.metadata = _url_metadata(url)
        return docs

    # Build every loader before downloading anything, as the loader
    # constructor may itself reject a URL.
    loaders = [(url, OnlinePDFLoader(url)) for url in urls]
    docs = []
    for url, loader in loaders:
        loaded = loader.load()
        # Stamp at load time so the URL stays correct even if one PDF yields
        # several documents.
        for doc in loaded:
            doc.metadata = _url_metadata(url)
        docs.extend(loaded)

    # The cache directory is not guaranteed to exist on first use.
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    with open(filename, 'wb') as output:
        cloudpickle.dump(docs, output)
    return docs
def load_and_split(docs, chunk_size=700):
    """Split documents into chunks with ``NLTKTextSplitter`` and no overlap.

    Args:
        docs: Documents to split.
        chunk_size: Forwarded as the splitter's ``chunk_size`` (presumably a
            character count -- confirm in the LangChain documentation).

    Returns:
        The chunks returned by ``split_documents``.
    """
    return NLTKTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=0,
    ).split_documents(docs)