from services.document_manager.document_loader import DocumentsLoader

# from services.vectordb_manager.vectordb_manager import VectordbManager
import pandas as pd
from dotenv import load_dotenv
import os
import re
from app import client, default_embedding_function
import secrets
import streamlit as st
from spellchecker import SpellChecker

load_dotenv()
openai_key = os.getenv("OPENAI_API_KEY")


def clean_text(text):
    """Replace runs of non-ASCII characters (including U+FFFD) with spaces."""
    # Collapse any run of non-ASCII characters into a single space
    cleaned_text = re.sub(r"[^\x00-\x7F]+", " ", text)
    # Note: U+FFFD (the � replacement character) is itself non-ASCII, so the
    # substitution above already removes it; this pass stays as a safety net
    cleaned_text = re.sub(r"\ufffd+", " ", cleaned_text)
    # Further cleaning steps can be added here as needed
    return cleaned_text
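
# Illustrative example (assumed input, not part of the pipeline): a run of
# non-ASCII characters collapses into a single space:
#   clean_text("na\u00efve text")  # -> "na ve text"
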
def correct_spelling(text):
    """Run each whitespace-separated token through pyspellchecker."""
    spell = SpellChecker()
    # spell.correction() returns None for tokens it cannot match, so fall
    # back to the original word instead of joining None into the output
    corrected_text = " ".join([spell.correction(word) or word for word in text.split()])
    return corrected_text
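
# Illustrative example (assumed tokens): pyspellchecker suggests the closest
# known word, and the `or word` fallback keeps tokens it cannot match:
#   correct_spelling("speling mistke")  # -> e.g. "spelling mistake"
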
def is_mostly_ascii(text):
    """Return True if at most 20% of the characters are non-ASCII."""
    # Empty chunks count as noise (and would divide by zero below)
    if not text:
        return False
    weird_chars = sum(ord(char) >= 128 for char in text)
    return weird_chars / len(text) <= 0.20


def clean_list(contents):
    """Drop chunks that are more than 20% non-ASCII (likely extraction noise)."""
    return [text for text in contents if is_mostly_ascii(text)]
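
# Illustrative example (assumed chunks): a chunk that is more than 20%
# non-ASCII is treated as extraction noise and dropped:
#   clean_list(["plain ascii chunk", "\ufffd\ufffd\ufffd garbled"])
#   # -> ["plain ascii chunk"]
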
def generate_knowledge_box_from_url(
    client,
    kb_name: str,
    urls: list,
    embedding_fct=default_embedding_function,
    chunk_size: int = 2_000,
):
    """Load, chunk, clean, and index documents from URLs into a new Chroma collection."""
    dl = DocumentsLoader()
    docs = dl.load_docs(urls)
    splits = dl.split_docs(docs, chunk_size=chunk_size)
    contents = [split.page_content for split in splits]
    metadatas = [split.metadata for split in splits]
    # Drop mostly-garbled chunks, filtering metadatas in step so documents,
    # ids, and metadatas stay the same length for chroma_collection.add()
    kept = [
        (content, metadata)
        for content, metadata in zip(contents, metadatas)
        if is_mostly_ascii(content)
    ]
    cleaned_contents = [content for content, _ in kept]
    metadatas = [metadata for _, metadata in kept]
    # Normalize whitespace, strip non-ASCII runs, then fix spelling
    cleaned_contents = [re.sub(r"\n+", " ", content) for content in cleaned_contents]
    cleaned_contents = [clean_text(content) for content in cleaned_contents]
    cleaned_contents = [correct_spelling(content) for content in cleaned_contents]
    chroma_collection = client.create_collection(
        kb_name,
        embedding_function=embedding_fct,
        metadata={"hnsw:space": "cosine"},
    )
    ids = [secrets.token_hex(16) for _ in cleaned_contents]
    chroma_collection.add(documents=cleaned_contents, ids=ids, metadatas=metadatas)
    n_splits = chroma_collection.count()
    return {"status": 200, "n_split": n_splits}
def add_links_to_knowledge_base(
    client,
    kb_name: str,
    urls: list,
    chunk_size: int = 2_000,
    pdf_optional_link=None,
    youtube_optional_link=None,
    video_title=None,
    pdf_title=None,
    embedding_fct=default_embedding_function,
):
    """Load, chunk, and append documents from URLs to an existing Chroma collection."""
    dl = DocumentsLoader()
    docs = dl.load_docs(urls)
    splits = dl.split_docs(docs, chunk_size=chunk_size)
    contents = [split.page_content for split in splits]
    metadatas = [split.metadata for split in splits]
    # Optionally override source/title metadata; if both a PDF and a YouTube
    # link are given, the YouTube values win because they are applied last
    if pdf_optional_link and pdf_title:
        for md in metadatas:
            md["source"] = pdf_optional_link
            md["title"] = pdf_title
    if youtube_optional_link and video_title:
        for md in metadatas:
            md["source"] = youtube_optional_link
            md["title"] = video_title
    # Normalize whitespace a bit before embedding
    cleaned_contents = [re.sub(r"\n+", " ", content) for content in contents]
    # Use the embedding function passed in, not the module default
    embeddings = embedding_fct(cleaned_contents)
    chroma_collection = client.get_collection(name=kb_name)
    ids = [secrets.token_hex(16) for _ in cleaned_contents]
    chroma_collection.add(
        documents=cleaned_contents, embeddings=embeddings, ids=ids, metadatas=metadatas
    )
    n_splits = chroma_collection.count()
    return {"status": 200, "n_split": n_splits}
if __name__ == "__main__":
    df = pd.read_csv("test_marcello.csv")
    kb_name = "new_new_test"
    urls = df.values.tolist()
    # res = generate_knowledge_box_from_url(
    #     client=client,
    #     urls=urls,
    #     kb_name=kb_name,
    #     embedding_fct=default_embedding_function,
    #     chunk_size=2_000,
    # )
    df = pd.read_csv("test2.csv")
    urls = df.values.tolist()
    res = add_links_to_knowledge_base(
        client=client,
        kb_name="test",
        urls=urls,
    )