from services.document_manager.document_loader import DocumentsLoader # from services.vectordb_manager.vectordb_manager import VectordbManager import pandas as pd from dotenv import load_dotenv import os import re from app import client, default_embedding_function import secrets import streamlit as st from spellchecker import SpellChecker load_dotenv() openai_key = os.getenv("OPENAI_API_KEY") openai_key = st.secrets["OPENAI_API_KEY"] def clean_text(text): # Remove non-ASCII characters cleaned_text = re.sub(r"[^\x00-\x7F]+", " ", text) # Replace specific artifacts you identified (example pattern below) cleaned_text = re.sub( r"�+", " ", cleaned_text ) # Replace sequences of � with a single space # Further cleaning steps can be added here as needed return cleaned_text def correct_spelling(text): spell = SpellChecker() corrected_text = " ".join([spell.correction(word) for word in text.split()]) return corrected_text def clean_list(contents): cleaned_contents = [] for text in contents: # Count weird characters (non-ASCII) weird_chars = sum(not (ord(char) < 128) for char in text) # Check if weird characters are more than 20% of the text if weird_chars / len(text) <= 0.20: cleaned_contents.append(text) return cleaned_contents def generate_knowledge_box_from_url( client, kb_name: str, urls: list, embedding_fct=default_embedding_function, chunk_size: int = 2_000, ): dl = DocumentsLoader() docs = dl.load_docs(urls) splits = dl.split_docs(docs, chunk_size=chunk_size) contents = [split.page_content for split in splits] metadatas = [split.metadata for split in splits] cleaned_contents = clean_list(contents) cleaned_contents = [ re.sub(r"\n+", " ", content) for content in cleaned_contents ] # clean text a bit cleaned_contents = [clean_text(content) for content in cleaned_contents] cleaned_contents = [correct_spelling(content) for content in cleaned_contents] chroma_collection = client.create_collection( kb_name, embedding_function=embedding_fct, metadata={"hnsw:space": "cosine"}, ) ids = [secrets.token_hex(16) for _ in cleaned_contents] chroma_collection.add(documents=cleaned_contents, ids=ids, metadatas=metadatas) n_splits = chroma_collection.count() return {"status": 200, "n_split": n_splits} def add_links_to_knowledge_base( client, kb_name: str, urls: list, chunk_size: int = 2_000, pdf_optional_link=None, pdf_title=None, embedding_fct=default_embedding_function, ): dl = DocumentsLoader() docs = dl.load_docs(urls) splits = dl.split_docs(docs, chunk_size=chunk_size) contents = [split.page_content for split in splits] metadatas = [split.metadata for split in splits] if pdf_optional_link and pdf_title: for md in metadatas: md["source"] = pdf_optional_link md["title"] = pdf_title cleaned_contents = [ re.sub(r"\n+", " ", content) for content in contents ] # clean text a bit embeddings = default_embedding_function(cleaned_contents) chroma_collection = client.get_collection(name=kb_name) ids = [secrets.token_hex(16) for _ in cleaned_contents] chroma_collection.add( documents=cleaned_contents, embeddings=embeddings, ids=ids, metadatas=metadatas ) n_splits = chroma_collection.count() return {"status": 200, "n_split": n_splits} if __name__ == "__main__": df = pd.read_csv("test_marcello.csv") kb_name = "new_new_test" urls = df.values.tolist() # res = generate_knowledge_box_from_url( # client=client, # urls=urls, # kb_name=kb_name, # embedding_fct=default_embedding_function, # chunk_size=2_000, # ) df = pd.read_csv("test2.csv") urls = df.values.tolist() res = add_links_to_knowledge_base( client=client, kb_name="test", urls=urls, )