# Hugging Face Space: langchain-qa-bot (Space status: Runtime error)
# File: docs/langchain/libs/community/langchain_community/document_loaders/concurrent.py
from __future__ import annotations | |
import concurrent.futures | |
from pathlib import Path | |
from typing import Iterator, Literal, Optional, Sequence, Union | |
from langchain_core.documents import Document | |
from langchain_community.document_loaders.base import BaseBlobParser | |
from langchain_community.document_loaders.blob_loaders import ( | |
BlobLoader, | |
FileSystemBlobLoader, | |
) | |
from langchain_community.document_loaders.generic import GenericLoader | |
from langchain_community.document_loaders.parsers.registry import get_parser | |
# Anything accepted as a filesystem path: a plain string or a ``pathlib.Path``.
_PathLike = Union[str, Path]

# Type of the literal string "default", used to request the default parser.
DEFAULT = Literal["default"]
class ConcurrentLoader(GenericLoader):
    """Load and parse Documents concurrently.

    Blobs produced by the blob loader are submitted to a thread pool and each
    worker runs the blob parser on one blob. Documents are yielded in the
    order in which blobs finish parsing, which is not necessarily the order
    in which the blob loader produced them.
    """

    def __init__(
        self,
        blob_loader: BlobLoader,  # type: ignore[valid-type]
        blob_parser: BaseBlobParser,
        num_workers: int = 4,  # type: ignore[valid-type]
    ) -> None:
        """Initialize the loader.

        Args:
            blob_loader: Source of blobs; its ``yield_blobs()`` is consumed
                by ``lazy_load``.
            blob_parser: Parser whose ``lazy_parse`` turns one blob into
                documents.
            num_workers: Max number of worker threads used for parsing.
        """
        super().__init__(blob_loader, blob_parser)
        self.num_workers = num_workers

    def lazy_load(
        self,
    ) -> Iterator[Document]:
        """Load documents with concurrent parsing.

        Every blob from the blob loader is submitted to the thread pool up
        front; documents are then yielded blob by blob as parsing completes.

        Yields:
            The documents parsed from each blob, in blob completion order.

        Raises:
            Any exception raised while parsing a blob is re-raised here when
            that blob's result is collected.
        """

        def _parse(blob) -> list[Document]:
            # ``lazy_parse`` may be a generator function, in which case
            # calling it only creates the generator. Draining it inside the
            # worker is what makes the parsing itself run on the pool rather
            # than in the consuming thread.
            return list(self.blob_parser.lazy_parse(blob))

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.num_workers
        ) as executor:
            futures = [
                executor.submit(_parse, blob)
                for blob in self.blob_loader.yield_blobs()  # type: ignore[attr-defined]
            ]
            for future in concurrent.futures.as_completed(futures):
                yield from future.result()

    @classmethod
    def from_filesystem(
        cls,
        path: _PathLike,
        *,
        glob: str = "**/[!.]*",
        exclude: Sequence[str] = (),
        suffixes: Optional[Sequence[str]] = None,
        show_progress: bool = False,
        parser: Union[DEFAULT, BaseBlobParser] = "default",
        num_workers: int = 4,
        parser_kwargs: Optional[dict] = None,
    ) -> ConcurrentLoader:
        """Create a concurrent generic document loader using a filesystem blob loader.

        Args:
            path: The path to the directory to load documents from.
            glob: The glob pattern to use to find documents.
            suffixes: The suffixes to use to filter documents. If None, all files
                matching the glob will be loaded.
            exclude: A list of patterns to exclude from the loader.
            show_progress: Whether to show a progress bar or not (requires tqdm).
                Proxies to the file system loader.
            parser: A blob parser which knows how to parse blobs into documents,
                or a parser name given as a string.
            num_workers: Max number of concurrent workers to use.
            parser_kwargs: Keyword arguments to pass to the parser. Only used
                when ``parser`` is "default" and the class provides its own
                ``get_parser``; ignored otherwise.

        Returns:
            A new instance of ``cls`` wired to a ``FileSystemBlobLoader``.
        """
        blob_loader = FileSystemBlobLoader(  # type: ignore[attr-defined, misc]
            path,
            glob=glob,
            exclude=exclude,
            suffixes=suffixes,
            show_progress=show_progress,
        )
        if isinstance(parser, str):
            if parser == "default" and cls.get_parser != GenericLoader.get_parser:
                # There is an implementation of get_parser on the class, use it.
                blob_parser = cls.get_parser(**(parser_kwargs or {}))
            else:
                # Otherwise resolve the parser by name through the imported
                # registry helper.
                blob_parser = get_parser(parser)
        else:
            # A ready-made parser instance was supplied; use it as-is.
            blob_parser = parser
        return cls(blob_loader, blob_parser, num_workers=num_workers)