Spaces:
Runtime error
Runtime error
File size: 4,995 Bytes
5a67683 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
import abc
import logging
import threading
from pathlib import Path
from typing import Any
from llama_index import (
Document,
ServiceContext,
StorageContext,
VectorStoreIndex,
load_index_from_storage,
)
from llama_index.data_structs import IndexDict
from llama_index.indices.base import BaseIndex
from app.components.ingest.helpers import IngestionHelper
from app.paths import local_data_path
logger = logging.getLogger(__name__)
class BaseIngestComponent(abc.ABC):
    """Abstract interface for components that ingest files as documents.

    Stores the storage and service contexts handed to the constructor and
    declares the three operations every concrete component must implement:
    ``ingest``, ``bulk_ingest`` and ``delete``.
    """

    def __init__(
        self,
        storage_context: StorageContext,
        service_context: ServiceContext,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Keep references to the contexts.

        Args:
            storage_context: Stored as ``self.storage_context``.
            service_context: Stored as ``self.service_context``.
            *args: Accepted for subclass compatibility; unused here.
            **kwargs: Accepted for subclass compatibility; unused here.
        """
        # Lazy %-style arguments (as used by the other log calls in this
        # module) so the message is only formatted when DEBUG is enabled.
        logger.debug(
            "Initializing base ingest component type=%s", type(self).__name__
        )
        self.storage_context = storage_context
        self.service_context = service_context

    @abc.abstractmethod
    def ingest(self, file_name: str, file_data: Path) -> list[Document]:
        """Ingest a single file and return the resulting documents.

        Args:
            file_name: Name of the file being ingested.
            file_data: Path object for the file's data.
        """

    @abc.abstractmethod
    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
        """Ingest several files and return the resulting documents.

        Args:
            files: ``(file_name, file_data)`` pairs, one per file.
        """

    @abc.abstractmethod
    def delete(self, doc_id: str) -> None:
        """Delete the document identified by ``doc_id``."""
class BaseIngestComponentWithIndex(BaseIngestComponent, abc.ABC):
    """Ingest component base that owns an index persisted to ``local_data_path``.

    On construction the index is loaded from the storage context, or created
    empty when loading raises ``ValueError``. Mutations of the index made by
    this class are guarded by a ``threading.Lock``.
    """

    def __init__(
        self,
        storage_context: StorageContext,
        service_context: ServiceContext,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Set up progress flag, index lock and the index itself."""
        super().__init__(storage_context, service_context, *args, **kwargs)

        self.show_progress = True
        # Thread lock! Not Multiprocessing lock
        self._index_thread_lock = threading.Lock()
        self._index = self._initialize_index()

    def _initialize_index(self) -> BaseIndex[IndexDict]:
        """Initialize the index from the storage context."""
        # Both the load path and the create path take the same options.
        index_options: dict[str, Any] = {
            "storage_context": self.storage_context,
            "service_context": self.service_context,
            # Force store nodes in index and document stores, so that they
            # can be deleted later on.
            "store_nodes_override": True,
            "show_progress": self.show_progress,
        }

        try:
            return load_index_from_storage(**index_options)
        except ValueError:
            # No index in the storage context yet: build an empty one and
            # persist it right away.
            logger.info("Creating a new vector store index")
            fresh_index = VectorStoreIndex.from_documents([], **index_options)
            fresh_index.storage_context.persist(persist_dir=local_data_path)
            return fresh_index

    def _save_index(self) -> None:
        """Persist the index's storage context to ``local_data_path``."""
        storage = self._index.storage_context
        storage.persist(persist_dir=local_data_path)

    def delete(self, doc_id: str) -> None:
        """Remove ``doc_id`` from the index and doc store, then persist."""
        with self._index_thread_lock:
            # Drop the reference document (and its docstore entry) ...
            self._index.delete_ref_doc(doc_id, delete_from_docstore=True)
            # ... and write the updated index back to disk.
            self._save_index()
class SimpleIngestComponent(BaseIngestComponentWithIndex):
    """Ingest component that inserts documents into the index one at a time."""

    def __init__(
        self,
        storage_context: StorageContext,
        service_context: ServiceContext,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Forward all arguments to the parent constructor."""
        super().__init__(storage_context, service_context, *args, **kwargs)

    def ingest(self, file_name: str, file_data: Path) -> list[Document]:
        """Transform one file into documents and save them in the index.

        Args:
            file_name: Name of the file being ingested.
            file_data: Path object handed to the ingestion helper.

        Returns:
            The documents produced from the file, after they have been saved.
        """
        logger.info("Ingesting file_name=%s", file_name)
        documents = IngestionHelper.transform_file_into_documents(file_name, file_data)
        logger.info(
            "Transformed file=%s into count=%s documents", file_name, len(documents)
        )
        logger.debug("Saving the documents in the index and doc store")
        return self._save_docs(documents)

    def bulk_ingest(self, files: list[tuple[str, Path]]) -> list[Document]:
        """Transform and save each file in turn.

        Args:
            files: ``(file_name, file_data)`` pairs, one per file.

        Returns:
            All saved documents, in the order the files were given.
        """
        saved_documents = []
        for file_name, file_data in files:
            documents = IngestionHelper.transform_file_into_documents(
                file_name, file_data
            )
            # Each file is inserted and persisted before the next is read.
            saved_documents.extend(self._save_docs(documents))
        return saved_documents

    def _save_docs(self, documents: list[Document]) -> list[Document]:
        """Insert ``documents`` into the index and persist it.

        The insert loop and the persist run under the index thread lock.

        Returns:
            The same ``documents`` list that was passed in.
        """
        logger.debug("Transforming count=%s documents into nodes", len(documents))
        with self._index_thread_lock:
            for document in documents:
                # Honour the instance-level flag (True by default) rather
                # than a hard-coded value, matching index load/creation.
                self._index.insert(document, show_progress=self.show_progress)
            logger.debug("Persisting the index and nodes")
            # persist the index and nodes
            self._save_index()
            logger.debug("Persisted the index and nodes")
        return documents
def get_ingestion_component(
    storage_context: StorageContext,
    service_context: ServiceContext,
) -> BaseIngestComponent:
    """Build the ingest component for the given contexts.

    Returns:
        A ``SimpleIngestComponent`` constructed from ``storage_context`` and
        ``service_context``.
    """
    component: BaseIngestComponent = SimpleIngestComponent(
        storage_context, service_context
    )
    return component
|