|
from __future__ import annotations |
|
|
|
import io |
|
import logging |
|
import os |
|
import re |
|
from glob import has_magic |
|
from pathlib import Path |
|
|
|
|
|
from fsspec.caching import ( |
|
BaseCache, |
|
BlockCache, |
|
BytesCache, |
|
MMapCache, |
|
ReadAheadCache, |
|
caches, |
|
) |
|
from fsspec.compression import compr |
|
from fsspec.config import conf |
|
from fsspec.registry import filesystem, get_filesystem_class |
|
from fsspec.utils import ( |
|
_unstrip_protocol, |
|
build_name_function, |
|
infer_compression, |
|
stringify_path, |
|
) |
|
|
|
# Module-wide logger shared by all of fsspec (named "fsspec", not __name__).
logger = logging.getLogger("fsspec")
|
|
|
|
|
class OpenFile:
    """
    Serializable, context-managed reference to a file on any filesystem.

    Can layer (buffered) text-mode and compression over any file-system,
    which are typically binary-only.

    These instances are safe to serialize, as the low-level file object
    is not created until invoked using ``with``.

    Parameters
    ----------
    fs: FileSystem
        The file system to use for opening the file. Should be a subclass or
        duck-type with ``fsspec.spec.AbstractFileSystem``
    path: str
        Location to open
    mode: str like 'rb', optional
        Mode of the opened file
    compression: str or None, optional
        Compression to apply
    encoding: str or None, optional
        The encoding to use if opened in text mode.
    errors: str or None, optional
        How to handle encoding errors if opened in text mode.
    newline: None or str
        Passed to TextIOWrapper in text mode, how to handle line endings.
    """

    def __init__(
        self,
        fs,
        path,
        mode="rb",
        compression=None,
        encoding=None,
        errors=None,
        newline=None,
    ):
        self.fs = fs
        self.path = path
        self.mode = mode
        # Resolve "infer" (and validate) up front, so the instance always
        # carries a concrete compression name or None.
        self.compression = get_compression(path, compression)
        self.encoding = encoding
        self.errors = errors
        self.newline = newline
        # Stack of layered file objects, innermost (raw) first.
        self.fobjects = []

    def __reduce__(self):
        # Pickle by constructor arguments; no live file handles are carried.
        state = (
            self.fs,
            self.path,
            self.mode,
            self.compression,
            self.encoding,
            self.errors,
            self.newline,
        )
        return OpenFile, state

    def __repr__(self):
        return f"<OpenFile '{self.path}'>"

    def __enter__(self):
        # Underlying filesystems are binary-only: strip any "t"/"b" markers
        # from the requested mode and re-append "b".
        binmode = self.mode.replace("t", "").replace("b", "") + "b"

        try:
            stream = self.fs.open(self.path, mode=binmode)
        except FileNotFoundError as e:
            if not has_magic(self.path):
                raise
            # A glob-looking path that was never expanded is the most common
            # cause of this failure; point the user at the expand options.
            raise FileNotFoundError(
                "%s not found. The URL contains glob characters: you maybe needed\n"
                "to pass expand=True in fsspec.open() or the storage_options of \n"
                "your library. You can also set the config value 'open_expand'\n"
                "before import, or fsspec.core.DEFAULT_EXPAND at runtime, to True.",
                self.path,
            ) from e

        self.fobjects = [stream]

        if self.compression is not None:
            # Layer the compression codec over the raw binary stream.
            stream = compr[self.compression](stream, mode=binmode[0])
            self.fobjects.append(stream)

        if "b" not in self.mode:
            # Text mode requested: wrap in a picklable TextIOWrapper.
            stream = PickleableTextIOWrapper(
                stream,
                encoding=self.encoding,
                errors=self.errors,
                newline=self.newline,
            )
            self.fobjects.append(stream)

        # Hand back the outermost layer.
        return self.fobjects[-1]

    def __exit__(self, *args):
        self.close()

    @property
    def full_name(self):
        # Path with its protocol prefix restored.
        return _unstrip_protocol(self.path, self.fs)

    def open(self):
        """Materialise this as a real open file without context

        The OpenFile object should be explicitly closed to avoid enclosed file
        instances persisting. You must, therefore, keep a reference to the OpenFile
        during the life of the file-like it generates.
        """
        return self.__enter__()

    def close(self):
        """Close all encapsulated file objects"""
        writing = "r" not in self.mode
        # Close outermost-first so wrappers flush into their buffers.
        for fobj in reversed(self.fobjects):
            if writing and not fobj.closed:
                fobj.flush()
            fobj.close()
        self.fobjects.clear()
|
|
|
|
|
class OpenFiles(list):
    """A list of ``OpenFile`` instances usable as a single context.

    Entering the context opens all of the contained files; leaving it
    closes them all. Normal list access to get the elements works as
    normal.

    A special case is made for caching filesystems - the files will
    be down/uploaded together at the start or end of the context, and
    this may happen concurrently, if the target filesystem supports it.
    """

    def __init__(self, *args, mode="rb", fs=None):
        self.mode = mode
        self.fs = fs
        self.files = []
        super().__init__(*args)

    def __enter__(self):
        if self.fs is None:
            raise ValueError("Context has already been used")

        fs = self.fs
        # Descend through any wrapper chain looking for a filesystem that
        # can open every member in one batch (e.g. caching filesystems).
        while not hasattr(fs, "open_many"):
            if getattr(fs, "fs", None) is None:
                # No batching layer found: open each member individually.
                return [member.__enter__() for member in self]
            fs = fs.fs
        self.files = fs.open_many(self)
        return self.files

    def __exit__(self, *args):
        fs = self.fs
        for member in self:
            member.__exit__(*args)
        if "r" in self.mode:
            return
        # Write mode: if a batching layer handled the opens, let the same
        # layer commit all uploads together.
        while fs is not None:
            if hasattr(fs, "open_many"):
                # Means the filesystem batches; commit the writes in one go.
                fs.commit_many(self.files)
                return
            fs = getattr(fs, "fs", None)

    def __getitem__(self, item):
        picked = super().__getitem__(item)
        if not isinstance(item, slice):
            return picked
        # Slices remain OpenFiles so they can still be used as a context.
        return OpenFiles(picked, mode=self.mode, fs=self.fs)

    def __repr__(self):
        return f"<List of {len(self)} OpenFile instances>"
|
|
|
|
|
def open_files(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    name_function=None,
    num=1,
    protocol=None,
    newline=None,
    auto_mkdir=True,
    expand=True,
    **kwargs,
):
    """Given a path or paths, return a list of ``OpenFile`` objects.

    For writing, a str path must contain the "*" character, which will be filled
    in by increasing numbers, e.g., "part*" -> "part1", "part2" if num=2.

    For either reading or writing, can instead provide explicit list of paths.

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
        to read from alternative filesystems. To read from multiple files you
        can pass a globstring or a list of paths, with the caveat that they
        must all have the same protocol.
    mode: 'rb', 'wt', etc.
    compression: string or None
        If given, open file using compression codec. Can either be a compression
        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
        compression from the filename suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    name_function: function or None
        if opening a set of files for writing, those files do not yet exist,
        so we need to generate their names by formatting the urlpath for
        each sequence number
    num: int [1]
        if writing mode, number of files we expect to create (passed to
        name_function)
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: bytes or None
        Used for line terminator in text mode. If None, uses system default;
        if blank, uses no translation.
    auto_mkdir: bool (True)
        If in write mode, this will ensure the target directory exists before
        writing, by calling ``fs.mkdirs(exist_ok=True)``.
    expand: bool
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Examples
    --------
    >>> files = open_files('2015-*-*.csv')  # doctest: +SKIP
    >>> files = open_files(
    ...     's3://bucket/2015-*-*.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP

    Returns
    -------
    An ``OpenFiles`` instance, which is a list of ``OpenFile`` objects that can
    be used as a single context

    Notes
    -----
    For a full list of the available protocols and the implementations that
    they map across to see the latest online documentation:

    - For implementations built into ``fsspec`` see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
    - For implementations in separate packages see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
    """
    fs, _, paths = get_fs_token_paths(
        urlpath,
        mode,
        num=num,
        name_function=name_function,
        storage_options=kwargs,
        protocol=protocol,
        expand=expand,
    )
    if fs.protocol == "file":
        # The local filesystem creates directories itself when asked to.
        fs.auto_mkdir = auto_mkdir
    elif auto_mkdir and "r" not in mode:
        # Writing to a remote store: make parent directories best-effort.
        for parent in {fs._parent(p) for p in paths}:
            try:
                fs.makedirs(parent, exist_ok=True)
            except PermissionError:
                # Some stores forbid mkdir but still allow writes.
                pass
    members = [
        OpenFile(
            fs,
            p,
            mode=mode,
            compression=compression,
            encoding=encoding,
            errors=errors,
            newline=newline,
        )
        for p in paths
    ]
    return OpenFiles(members, mode=mode, fs=fs)
|
|
|
|
|
def _un_chain(path, kwargs):
    """Split a chained URL ("a://x::b://y") into per-protocol components.

    Returns a list of ``(path, protocol, kwargs)`` triples, in the same
    order as the links appear in the URL (outermost first).
    """
    # Local import to avoid a circular dependency at module import time.
    from fsspec.implementations.cached import CachingFileSystem

    if "::" in path:
        # Matches any segment containing a character outside [a-z], i.e.
        # something that already looks like a path rather than a bare
        # protocol name.
        x = re.compile(".*[^a-z]+.*")
        bits = []
        for p in path.split("::"):
            if "://" in p or x.match(p):
                bits.append(p)
            else:
                # Bare protocol name: turn it into an empty-path URL.
                bits.append(p + "://")
    else:
        bits = [path]

    out = []
    previous_bit = None
    kwargs = kwargs.copy()
    # Walk from the innermost (rightmost) link outwards.
    for bit in reversed(bits):
        protocol = kwargs.pop("protocol", None) or split_protocol(bit)[0] or "file"
        cls = get_filesystem_class(protocol)
        # Options that the filesystem class can parse out of the URL itself.
        extra_kwargs = cls._get_kwargs_from_urls(bit)
        # Options namespaced under this protocol's name in the kwargs dict.
        kws = kwargs.pop(protocol, {})
        if bit is bits[0]:
            # Identity check is intentional: only the outermost link absorbs
            # any remaining un-namespaced kwargs.
            kws.update(kwargs)
        # URL-derived options lose to explicitly passed ones on conflict.
        kw = dict(
            **{k: v for k, v in extra_kwargs.items() if k not in kws or v != kws[k]},
            **kws,
        )
        bit = cls._strip_protocol(bit)
        if "target_protocol" not in kw and issubclass(cls, CachingFileSystem):
            # Caching layers wrap the previously processed (inner) path.
            bit = previous_bit
        out.append((bit, protocol, kw))
        previous_bit = bit
    out.reverse()
    return out
|
|
|
|
|
def url_to_fs(url, **kwargs):
    """
    Turn fully-qualified and potentially chained URL into filesystem instance

    Parameters
    ----------
    url : str
        The fsspec-compatible URL
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Returns
    -------
    filesystem : FileSystem
        The new filesystem discovered from ``url`` and created with
        ``**kwargs``.
    urlpath : str
        The file-systems-specific URL for ``url``.
    """
    url = stringify_path(url)

    # These keywords belong to open()/open_files(), not to filesystem
    # constructors; drop them before instantiating anything.
    non_fs_kwargs = {
        "compression",
        "encoding",
        "errors",
        "expand",
        "mode",
        "name_function",
        "newline",
        "num",
    }
    kwargs = {key: val for key, val in kwargs.items() if key not in non_fs_kwargs}
    chain = _un_chain(url, kwargs)
    inkwargs = {}

    # Fold the chain inside-out: each inner link's settings become the
    # target_options/target_protocol/fo of the link wrapping it.
    for idx, (urls, protocol, kw) in enumerate(reversed(chain)):
        if idx == len(chain) - 1:
            # Outermost link: its kwargs feed the top-level constructor.
            inkwargs = dict(**kw, **inkwargs)
            continue
        inkwargs["target_options"] = dict(**kw, **inkwargs)
        inkwargs["target_protocol"] = protocol
        inkwargs["fo"] = urls
    urlpath, protocol, _ = chain[0]
    fs = filesystem(protocol, **inkwargs)
    return fs, urlpath
|
|
|
|
|
# Default for ``open(..., expand=None)``: seeded from the "open_expand"
# config value at import time; may be reassigned at runtime.
DEFAULT_EXPAND = conf.get("open_expand", False)
|
|
|
|
|
def open(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    protocol=None,
    newline=None,
    expand=None,
    **kwargs,
):
    """Given a path or paths, return one ``OpenFile`` object.

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath. Prefix with a protocol like ``s3://``
        to read from alternative filesystems. Should not include glob
        character(s).
    mode: 'rb', 'wt', etc.
    compression: string or None
        If given, open file using compression codec. Can either be a compression
        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
        compression from the filename suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: bytes or None
        Used for line terminator in text mode. If None, uses system default;
        if blank, uses no translation.
    expand: bool or None
        Whether to regard file paths containing special glob characters as needing
        expansion (finding the first match) or absolute. Setting False allows using
        paths which do embed such characters. If None (default), this argument
        takes its value from the DEFAULT_EXPAND module variable, which takes
        its initial value from the "open_expand" config value at startup, which will
        be False if not set.
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Examples
    --------
    >>> openfile = open('2015-01-01.csv')  # doctest: +SKIP
    >>> openfile = open(
    ...     's3://bucket/2015-01-01.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP
    >>> with openfile as f:
    ...     df = pd.read_csv(f)  # doctest: +SKIP
    ...

    Returns
    -------
    ``OpenFile`` object.

    Notes
    -----
    For a full list of the available protocols and the implementations that
    they map across to see the latest online documentation:

    - For implementations built into ``fsspec`` see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
    - For implementations in separate packages see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
    """
    # Resolve the module-level default here (not in the signature) so that
    # runtime changes to DEFAULT_EXPAND take effect.
    expand = DEFAULT_EXPAND if expand is None else expand
    out = open_files(
        urlpath=[urlpath],
        mode=mode,
        compression=compression,
        encoding=encoding,
        errors=errors,
        protocol=protocol,
        newline=newline,
        expand=expand,
        **kwargs,
    )
    if not out:
        raise FileNotFoundError(urlpath)
    return out[0]
|
|
|
|
|
def open_local( |
|
url: str | list[str] | Path | list[Path], |
|
mode: str = "rb", |
|
**storage_options: dict, |
|
) -> str | list[str]: |
|
"""Open file(s) which can be resolved to local |
|
|
|
For files which either are local, or get downloaded upon open |
|
(e.g., by file caching) |
|
|
|
Parameters |
|
---------- |
|
url: str or list(str) |
|
mode: str |
|
Must be read mode |
|
storage_options: |
|
passed on to FS for or used by open_files (e.g., compression) |
|
""" |
|
if "r" not in mode: |
|
raise ValueError("Can only ensure local files when reading") |
|
of = open_files(url, mode=mode, **storage_options) |
|
if not getattr(of[0].fs, "local_file", False): |
|
raise ValueError( |
|
"open_local can only be used on a filesystem which" |
|
" has attribute local_file=True" |
|
) |
|
with of as files: |
|
paths = [f.name for f in files] |
|
if (isinstance(url, str) and not has_magic(url)) or isinstance(url, Path): |
|
return paths[0] |
|
return paths |
|
|
|
|
|
def get_compression(urlpath, compression):
    """Resolve a compression argument against the codec registry.

    "infer" is replaced by a guess based on the filename suffix; any other
    non-None value must be a key of ``fsspec.compression.compr``.
    """
    if compression == "infer":
        compression = infer_compression(urlpath)
    if compression is None or compression in compr:
        return compression
    raise ValueError(f"Compression type {compression} not supported")
|
|
|
|
|
def split_protocol(urlpath):
    """Return protocol, path pair"""
    urlpath = stringify_path(urlpath)
    if "://" in urlpath:
        proto, _, remainder = urlpath.partition("://")
        if len(proto) > 1:
            # Single-character "protocols" like "c://" are Windows drive
            # paths, not URL schemes.
            return proto, remainder
    if urlpath.startswith("data:"):
        return urlpath.split(":", 1)
    return None, urlpath
|
|
|
|
|
def strip_protocol(urlpath):
    """Return only path part of full URL, according to appropriate backend"""
    protocol = split_protocol(urlpath)[0]
    # Delegate to the backend, since each class strips its own prefixes.
    return get_filesystem_class(protocol)._strip_protocol(urlpath)
|
|
|
|
|
def expand_paths_if_needed(paths, mode, num, fs, name_function):
    """Expand paths if they have a ``*`` in them (write mode) or any of
    ``*?[]`` in them (read mode).

    Parameters
    ----------
    paths: list of paths
    mode: str
        Mode in which to open files.
    num: int
        If opening in writing mode, number of files we expect to create.
    fs: filesystem object
    name_function: callable
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.

    Returns
    -------
    list of paths
    """
    paths = list(paths)
    out = []

    if "w" in mode:
        if sum(1 for p in paths if "*" in p) > 1:
            raise ValueError(
                "When writing data, only one filename mask can be specified."
            )
        num = max(num, len(paths))

        for p in paths:
            if "*" in p:
                # A mask expands into `num` generated names.
                out.extend(_expand_paths(p, name_function, num))
            else:
                out.append(p)

        # Clip in case the mask expansion overshot the requested count.
        out = out[:num]

    else:
        for p in paths:
            if has_magic(p):
                # Read mode: glob characters expand against the filesystem.
                out.extend(fs.glob(p))
            else:
                out.append(p)

    return out
|
|
|
|
|
def get_fs_token_paths(
    urlpath,
    mode="rb",
    num=1,
    name_function=None,
    storage_options=None,
    protocol=None,
    expand=True,
):
    """Filesystem, deterministic token, and paths from a urlpath and options.

    Parameters
    ----------
    urlpath: string or iterable
        Absolute or relative filepath, URL (may include protocols like
        ``s3://``), or globstring pointing to data.
    mode: str, optional
        Mode in which to open files.
    num: int, optional
        If opening in writing mode, number of files we expect to create.
    name_function: callable, optional
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.
    storage_options: dict, optional
        Additional keywords to pass to the filesystem class.
    protocol: str or None
        To override the protocol specifier in the URL
    expand: bool
        Expand string paths for writing, assuming the path is a directory
    """
    if isinstance(urlpath, (list, tuple, set)):
        if not urlpath:
            raise ValueError("empty urlpath sequence")
        # Use the first element as representative for protocol/chain parsing.
        urlpath0 = stringify_path(next(iter(urlpath)))
    else:
        urlpath0 = stringify_path(urlpath)
    storage_options = storage_options or {}
    if protocol:
        storage_options["protocol"] = protocol
    # Decompose a possibly chained URL ("a://...::b://...") into links.
    chain = _un_chain(urlpath0, storage_options or {})
    inkwargs = {}

    # Fold the chain inside-out: each inner link's settings become the
    # target_options/target_protocol/fo of the link that wraps it.
    for i, ch in enumerate(reversed(chain)):
        urls, nested_protocol, kw = ch
        if i == len(chain) - 1:
            # Outermost link: its kwargs feed the top-level constructor.
            inkwargs = dict(**kw, **inkwargs)
            continue
        inkwargs["target_options"] = dict(**kw, **inkwargs)
        inkwargs["target_protocol"] = nested_protocol
        inkwargs["fo"] = urls
    paths, protocol, _ = chain[0]
    fs = filesystem(protocol, **inkwargs)
    if isinstance(urlpath, (list, tuple, set)):
        # Re-run the chain parse per element to get each member's path part.
        pchains = [
            _un_chain(stringify_path(u), storage_options or {})[0] for u in urlpath
        ]
        if len({pc[1] for pc in pchains}) > 1:
            # NOTE(review): message is not %-interpolated; urlpath is passed
            # as a second exception argument rather than formatted in.
            raise ValueError("Protocol mismatch getting fs from %s", urlpath)
        paths = [pc[0] for pc in pchains]
    else:
        paths = fs._strip_protocol(paths)
    if isinstance(paths, (list, tuple, set)):
        if expand:
            paths = expand_paths_if_needed(paths, mode, num, fs, name_function)
        elif not isinstance(paths, list):
            # Normalise tuples/sets to a list even when not expanding.
            paths = list(paths)
    else:
        if ("w" in mode or "x" in mode) and expand:
            # Writing: generate `num` concrete names from the mask/directory.
            paths = _expand_paths(paths, name_function, num)
        elif "*" in paths:
            # Reading a glob: expand and drop directory matches.
            paths = [f for f in sorted(fs.glob(paths)) if not fs.isdir(f)]
        else:
            paths = [paths]

    return fs, fs._fs_token, paths
|
|
|
|
|
def _expand_paths(path, name_function, num): |
|
if isinstance(path, str): |
|
if path.count("*") > 1: |
|
raise ValueError("Output path spec must contain exactly one '*'.") |
|
elif "*" not in path: |
|
path = os.path.join(path, "*.part") |
|
|
|
if name_function is None: |
|
name_function = build_name_function(num - 1) |
|
|
|
paths = [path.replace("*", name_function(i)) for i in range(num)] |
|
if paths != sorted(paths): |
|
logger.warning( |
|
"In order to preserve order between partitions" |
|
" paths created with ``name_function`` should " |
|
"sort to partition order" |
|
) |
|
elif isinstance(path, (tuple, list)): |
|
assert len(path) == num |
|
paths = list(path) |
|
else: |
|
raise ValueError( |
|
"Path should be either\n" |
|
"1. A list of paths: ['foo.json', 'bar.json', ...]\n" |
|
"2. A directory: 'foo/\n" |
|
"3. A path with a '*' in it: 'foo.*.json'" |
|
) |
|
return paths |
|
|
|
|
|
class PickleableTextIOWrapper(io.TextIOWrapper):
    """A pickle-friendly ``io.TextIOWrapper``.

    Plain ``TextIOWrapper`` instances cannot be pickled; this subclass
    remembers its constructor arguments and replays them on unpickle.
    Requires that ``buffer`` be pickleable, which all instances of
    AbstractBufferedFile are.
    """

    def __init__(
        self,
        buffer,
        encoding=None,
        errors=None,
        newline=None,
        line_buffering=False,
        write_through=False,
    ):
        # Record the exact arguments so __reduce__ can rebuild the wrapper.
        self.args = (
            buffer,
            encoding,
            errors,
            newline,
            line_buffering,
            write_through,
        )
        super().__init__(*self.args)

    def __reduce__(self):
        return PickleableTextIOWrapper, self.args
|
|