| from __future__ import annotations |
|
|
| import inspect |
| import logging |
| import os |
| import tempfile |
| import time |
| import weakref |
| from shutil import rmtree |
| from typing import TYPE_CHECKING, Any, Callable, ClassVar |
|
|
| from fsspec import AbstractFileSystem, filesystem |
| from fsspec.callbacks import _DEFAULT_CALLBACK |
| from fsspec.compression import compr |
| from fsspec.core import BaseCache, MMapCache |
| from fsspec.exceptions import BlocksizeMismatchError |
| from fsspec.implementations.cache_mapper import create_cache_mapper |
| from fsspec.implementations.cache_metadata import CacheMetadata |
| from fsspec.spec import AbstractBufferedFile |
| from fsspec.transaction import Transaction |
| from fsspec.utils import infer_compression |
|
|
| if TYPE_CHECKING: |
| from fsspec.implementations.cache_mapper import AbstractCacheMapper |
|
|
| logger = logging.getLogger("fsspec.cached") |
|
|
|
|
| class WriteCachedTransaction(Transaction): |
| def complete(self, commit=True): |
| rpaths = [f.path for f in self.files] |
| lpaths = [f.fn for f in self.files] |
| if commit: |
| self.fs.put(lpaths, rpaths) |
| |
| self.fs._intrans = False |
|
|
|
|
| class CachingFileSystem(AbstractFileSystem): |
| """Locally caching filesystem, layer over any other FS |
| |
| This class implements chunk-wise local storage of remote files, for quick |
| access after the initial download. The files are stored in a given |
| directory with hashes of URLs for the filenames. If no directory is given, |
| a temporary one is used, which should be cleaned up by the OS after the |
| process ends. The files themselves are sparse (as implemented in |
| :class:`~fsspec.caching.MMapCache`), so only the data which is accessed |
| takes up space. |
| |
| Restrictions: |
| |
| - the block-size must be the same for each access of a given file, unless |
| all blocks of the file have already been read |
| - caching can only be applied to file-systems which produce files |
derived from fsspec.spec.AbstractBufferedFile; LocalFileSystem is also
| allowed, for testing |
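
A minimal usage sketch; the URL and cache directory below are illustrative,
and the "https" backend assumes an HTTP implementation (e.g. aiohttp) is
installed::

    import fsspec

    fs = fsspec.filesystem(
        "blockcache",
        target_protocol="https",
        cache_storage="/tmp/fsspec-block-cache",
    )
    with fs.open("https://example.com/large-file.bin", "rb") as f:
        header = f.read(1024)  # only the accessed blocks are downloaded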
| """ |
|
|
| protocol: ClassVar[str | tuple[str, ...]] = ("blockcache", "cached") |
|
|
| def __init__( |
| self, |
| target_protocol=None, |
| cache_storage="TMP", |
| cache_check=10, |
| check_files=False, |
| expiry_time=604800, |
| target_options=None, |
| fs=None, |
| same_names: bool | None = None, |
| compression=None, |
| cache_mapper: AbstractCacheMapper | None = None, |
| **kwargs, |
| ): |
| """ |
| |
| Parameters |
| ---------- |
| target_protocol: str (optional) |
| Target filesystem protocol. Provide either this or ``fs``. |
| cache_storage: str or list(str) |
| Location to store files. If "TMP", this is a temporary directory, |
| and will be cleaned up by the OS when this process ends (or later). |
| If a list, each location will be tried in the order given, but |
| only the last will be considered writable. |
| cache_check: int |
| Number of seconds between reload of cache metadata |
| check_files: bool |
| Whether to explicitly see if the UID of the remote file matches |
| the stored one before using. Warning: some file systems such as |
| HTTP cannot reliably give a unique hash of the contents of some |
| path, so be sure to set this option to False. |
| expiry_time: int |
| The time in seconds after which a local copy is considered useless. |
| Set to falsy to prevent expiry. The default is equivalent to one |
| week. |
| target_options: dict or None |
| Passed to the instantiation of the FS, if fs is None. |
| fs: filesystem instance |
The target filesystem to run against. Provide this or ``target_protocol``.
| same_names: bool (optional) |
| By default, target URLs are hashed using a ``HashCacheMapper`` so |
| that files from different backends with the same basename do not |
conflict. If this argument is ``True``, a ``BasenameCacheMapper``
| is used instead. Other cache mapper options are available by using |
| the ``cache_mapper`` keyword argument. Only one of this and |
| ``cache_mapper`` should be specified. |
| compression: str (optional) |
| To decompress on download. Can be 'infer' (guess from the URL name), |
| one of the entries in ``fsspec.compression.compr``, or None for no |
| decompression. |
| cache_mapper: AbstractCacheMapper (optional) |
The object used to map from original filenames to cached filenames.
| Only one of this and ``same_names`` should be specified. |
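
As an illustration (not a prescription), passing ``same_names=True`` is
roughly the same as supplying a basename-based mapper explicitly::

    from fsspec.implementations.cache_mapper import BasenameCacheMapper

    fs_a = CachingFileSystem(target_protocol="file", same_names=True)
    fs_b = CachingFileSystem(
        target_protocol="file",
        cache_mapper=BasenameCacheMapper(),
    )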
| """ |
| super().__init__(**kwargs) |
| if fs is None and target_protocol is None: |
| raise ValueError( |
| "Please provide filesystem instance(fs) or target_protocol" |
| ) |
| if not (fs is None) ^ (target_protocol is None): |
| raise ValueError( |
| "Both filesystems (fs) and target_protocol may not be both given." |
| ) |
| if cache_storage == "TMP": |
| tempdir = tempfile.mkdtemp() |
| storage = [tempdir] |
| weakref.finalize(self, self._remove_tempdir, tempdir) |
| else: |
| if isinstance(cache_storage, str): |
| storage = [cache_storage] |
| else: |
| storage = cache_storage |
| os.makedirs(storage[-1], exist_ok=True) |
| self.storage = storage |
| self.kwargs = target_options or {} |
| self.cache_check = cache_check |
| self.check_files = check_files |
| self.expiry = expiry_time |
| self.compression = compression |
|
|
| |
| |
| |
| self._cache_size = None |
|
|
| if same_names is not None and cache_mapper is not None: |
| raise ValueError( |
| "Cannot specify both same_names and cache_mapper in " |
| "CachingFileSystem.__init__" |
| ) |
| if cache_mapper is not None: |
| self._mapper = cache_mapper |
| else: |
| self._mapper = create_cache_mapper( |
| same_names if same_names is not None else False |
| ) |
|
|
| self.target_protocol = ( |
| target_protocol |
| if isinstance(target_protocol, str) |
| else (fs.protocol if isinstance(fs.protocol, str) else fs.protocol[0]) |
| ) |
| self._metadata = CacheMetadata(self.storage) |
| self.load_cache() |
| self.fs = fs if fs is not None else filesystem(target_protocol, **self.kwargs) |
|
|
| def _strip_protocol(path): |
# behaves like a method: strip this layer's protocol, then the target's
| return self.fs._strip_protocol(type(self)._strip_protocol(path)) |
|
|
| self._strip_protocol: Callable = _strip_protocol |
|
|
| @staticmethod |
| def _remove_tempdir(tempdir): |
| try: |
| rmtree(tempdir) |
| except Exception: |
| pass |
|
|
| def _mkcache(self): |
| os.makedirs(self.storage[-1], exist_ok=True) |
|
|
| def cache_size(self): |
| """Return size of cache in bytes. |
| |
| If more than one cache directory is in use, only the size of the last |
| one (the writable cache directory) is returned. |
| """ |
| if self._cache_size is None: |
| cache_dir = self.storage[-1] |
| self._cache_size = filesystem("file").du(cache_dir, withdirs=True) |
| return self._cache_size |
|
|
| def load_cache(self): |
| """Read set of stored blocks from file""" |
| self._metadata.load() |
| self._mkcache() |
| self.last_cache = time.time() |
|
|
| def save_cache(self): |
| """Save set of stored blocks from file""" |
| self._mkcache() |
| self._metadata.save() |
| self.last_cache = time.time() |
| self._cache_size = None |
|
|
| def _check_cache(self): |
| """Reload caches if time elapsed or any disappeared""" |
| self._mkcache() |
| if not self.cache_check: |
# cache checking disabled; nothing to reload
| return |
| timecond = time.time() - self.last_cache > self.cache_check |
| existcond = all(os.path.exists(storage) for storage in self.storage) |
| if timecond or not existcond: |
| self.load_cache() |
|
|
| def _check_file(self, path): |
| """Is path in cache and still valid""" |
| path = self._strip_protocol(path) |
| self._check_cache() |
| return self._metadata.check_file(path, self) |
|
|
| def clear_cache(self): |
| """Remove all files and metadata from the cache |
| |
| In the case of multiple cache locations, this clears only the last one, |
| which is assumed to be the read/write one. |
| """ |
| rmtree(self.storage[-1]) |
| self.load_cache() |
| self._cache_size = None |
|
|
| def clear_expired_cache(self, expiry_time=None): |
| """Remove all expired files and metadata from the cache |
| |
| In the case of multiple cache locations, this clears only the last one, |
| which is assumed to be the read/write one. |
| |
| Parameters |
| ---------- |
| expiry_time: int |
| The time in seconds after which a local copy is considered useless. |
If not given, defaults to the ``expiry_time`` value set when this
caching filesystem was instantiated.
| """ |
|
|
| if not expiry_time: |
| expiry_time = self.expiry |
|
|
| self._check_cache() |
|
|
| expired_files, writable_cache_empty = self._metadata.clear_expired(expiry_time) |
| for fn in expired_files: |
| if os.path.exists(fn): |
| os.remove(fn) |
|
|
| if writable_cache_empty: |
| rmtree(self.storage[-1]) |
| self.load_cache() |
|
|
| self._cache_size = None |
|
|
| def pop_from_cache(self, path): |
| """Remove cached version of given file |
| |
Deletes the local copy of the given (remote) path. If the copy is found
in a cache location other than the last one, that location is assumed to
be read-only and a PermissionError is raised.
| """ |
| path = self._strip_protocol(path) |
| fn = self._metadata.pop_file(path) |
| if fn is not None: |
| os.remove(fn) |
| self._cache_size = None |
|
|
| def _open( |
| self, |
| path, |
| mode="rb", |
| block_size=None, |
| autocommit=True, |
| cache_options=None, |
| **kwargs, |
| ): |
| """Wrap the target _open |
| |
| If the whole file exists in the cache, just open it locally and |
| return that. |
| |
| Otherwise, open the file on the target FS, and make it have a mmap |
| cache pointing to the location which we determine, in our cache. |
| The ``blocks`` instance is shared, so as the mmap cache instance |
| updates, so does the entry in our ``cached_files`` attribute. |
| We monkey-patch this file, so that when it closes, we call |
| ``close_and_update`` to save the state of the blocks. |
| """ |
| path = self._strip_protocol(path) |
|
|
| path = self.fs._strip_protocol(path) |
| if "r" not in mode: |
| return self.fs._open( |
| path, |
| mode=mode, |
| block_size=block_size, |
| autocommit=autocommit, |
| cache_options=cache_options, |
| **kwargs, |
| ) |
| detail = self._check_file(path) |
| if detail: |
# the file already has an entry in the cache metadata
| detail, fn = detail |
| hash, blocks = detail["fn"], detail["blocks"] |
| if blocks is True: |
# all blocks are present: open the complete local copy directly
| logger.debug("Opening local copy of %s", path) |
| return open(fn, mode) |
| |
| logger.debug("Opening partially cached copy of %s", path) |
| else: |
| hash = self._mapper(path) |
| fn = os.path.join(self.storage[-1], hash) |
| blocks = set() |
| detail = { |
| "original": path, |
| "fn": hash, |
| "blocks": blocks, |
| "time": time.time(), |
| "uid": self.fs.ukey(path), |
| } |
| self._metadata.update_file(path, detail) |
| logger.debug("Creating local sparse file for %s", path) |
|
|
| |
| self._mkcache() |
| f = self.fs._open( |
| path, |
| mode=mode, |
| block_size=block_size, |
| autocommit=autocommit, |
| cache_options=cache_options, |
| cache_type="none", |
| **kwargs, |
| ) |
| if self.compression: |
| comp = ( |
| infer_compression(path) |
| if self.compression == "infer" |
| else self.compression |
| ) |
| f = compr[comp](f, mode="rb") |
| if "blocksize" in detail: |
| if detail["blocksize"] != f.blocksize: |
| raise BlocksizeMismatchError( |
| f"Cached file must be reopened with same block" |
| f" size as original (old: {detail['blocksize']}," |
| f" new {f.blocksize})" |
| ) |
| else: |
| detail["blocksize"] = f.blocksize |
| f.cache = MMapCache(f.blocksize, f._fetch_range, f.size, fn, blocks) |
| close = f.close |
| f.close = lambda: self.close_and_update(f, close) |
| self.save_cache() |
| return f |
|
|
| def _parent(self, path): |
| return self.fs._parent(path) |
|
|
| def hash_name(self, path: str, *args: Any) -> str: |
# kept for backward compatibility; extra positional arguments are ignored
| return self._mapper(path) |
|
|
| def close_and_update(self, f, close): |
| """Called when a file is closing, so store the set of blocks""" |
| if f.closed: |
| return |
| path = self._strip_protocol(f.path) |
| self._metadata.on_close_cached_file(f, path) |
| try: |
| logger.debug("going to save") |
| self.save_cache() |
| logger.debug("saved") |
| except OSError: |
| logger.debug("Cache saving failed while closing file") |
| except NameError: |
| logger.debug("Cache save failed due to interpreter shutdown") |
| close() |
| f.closed = True |
|
|
| def __getattribute__(self, item): |
| if item in [ |
| "load_cache", |
| "_open", |
| "save_cache", |
| "close_and_update", |
| "__init__", |
| "__getattribute__", |
| "__reduce__", |
| "_make_local_details", |
| "open", |
| "cat", |
| "cat_file", |
| "cat_ranges", |
| "get", |
| "read_block", |
| "tail", |
| "head", |
| "_check_file", |
| "_check_cache", |
| "_mkcache", |
| "clear_cache", |
| "clear_expired_cache", |
| "pop_from_cache", |
| "_mkcache", |
| "local_file", |
| "_paths_from_path", |
| "get_mapper", |
| "open_many", |
| "commit_many", |
| "hash_name", |
| "__hash__", |
| "__eq__", |
| "to_json", |
| "cache_size", |
| "pipe_file", |
| "pipe", |
| "start_transaction", |
| "end_transaction", |
| ]: |
# these methods must run on this caching instance rather than being
# delegated to the wrapped filesystem (note: ``open`` calls ``_open``)
| return lambda *args, **kw: getattr(type(self), item).__get__(self)( |
| *args, **kw |
| ) |
| if item in ["__reduce_ex__"]: |
| raise AttributeError |
| if item in ["transaction"]: |
| |
| return type(self).transaction.__get__(self) |
| if item in ["_cache", "transaction_type"]: |
# class attributes defined on this caching class
| return getattr(type(self), item) |
| if item == "__class__": |
| return type(self) |
| d = object.__getattribute__(self, "__dict__") |
| fs = d.get("fs", None) |
| if item in d: |
| return d[item] |
| elif fs is not None: |
| if item in fs.__dict__: |
# instance attribute of the wrapped filesystem
| return fs.__dict__[item] |
# otherwise, look the attribute up on the wrapped filesystem's class
| cls = type(fs) |
| m = getattr(cls, item) |
| if (inspect.isfunction(m) or inspect.isdatadescriptor(m)) and ( |
| not hasattr(m, "__self__") or m.__self__ is None |
| ): |
# bind plain functions and data descriptors to the wrapped fs instance
| return m.__get__(fs, cls) |
| return m |
| else: |
# fall back to normal attribute lookup on this instance
| return super().__getattribute__(item) |
|
|
| def __eq__(self, other): |
| """Test for equality.""" |
| if self is other: |
| return True |
| if not isinstance(other, type(self)): |
| return False |
| return ( |
| self.storage == other.storage |
| and self.kwargs == other.kwargs |
| and self.cache_check == other.cache_check |
| and self.check_files == other.check_files |
| and self.expiry == other.expiry |
| and self.compression == other.compression |
| and self._mapper == other._mapper |
| and self.target_protocol == other.target_protocol |
| ) |
|
|
| def __hash__(self): |
| """Calculate hash.""" |
| return ( |
| hash(tuple(self.storage)) |
| ^ hash(str(self.kwargs)) |
| ^ hash(self.cache_check) |
| ^ hash(self.check_files) |
| ^ hash(self.expiry) |
| ^ hash(self.compression) |
| ^ hash(self._mapper) |
| ^ hash(self.target_protocol) |
| ) |
|
|
| def to_json(self): |
| """Calculate JSON representation. |
| |
| Not implemented yet for CachingFileSystem. |
| """ |
| raise NotImplementedError( |
| "CachingFileSystem JSON representation not implemented" |
| ) |
|
|
|
|
| class WholeFileCacheFileSystem(CachingFileSystem): |
| """Caches whole remote files on first access |
| |
| This class is intended as a layer over any other file system, and |
| will make a local copy of each file accessed, so that all subsequent |
| reads are local. This is similar to ``CachingFileSystem``, but without |
| the block-wise functionality and so can work even when sparse files |
| are not allowed. See its docstring for definition of the init |
| arguments. |
| |
| The class still needs access to the remote store for listing files, |
| and may refresh cached files. |
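
A short, illustrative sketch; the bucket, key and local cache path are
placeholders, and the "s3" backend assumes s3fs is installed::

    import fsspec

    fs = fsspec.filesystem(
        "filecache",
        target_protocol="s3",
        target_options={"anon": True},
        cache_storage="/tmp/fsspec-file-cache",
    )
    with fs.open("s3://bucket/data.csv", "rb") as f:
        data = f.read()  # first access downloads the whole file locally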
| """ |
|
|
| protocol = "filecache" |
| local_file = True |
|
|
| def open_many(self, open_files): |
| paths = [of.path for of in open_files] |
| if "r" in open_files.mode: |
| self._mkcache() |
| else: |
| return [ |
| LocalTempFile( |
| self.fs, |
| path, |
| mode=open_files.mode, |
| fn=os.path.join(self.storage[-1], self._mapper(path)), |
| ) |
| for path in paths |
| ] |
|
|
| if self.compression: |
| raise NotImplementedError |
| details = [self._check_file(sp) for sp in paths] |
| downpath = [p for p, d in zip(paths, details) if not d] |
| downfn0 = [ |
| os.path.join(self.storage[-1], self._mapper(p)) |
| for p, d in zip(paths, details) |
| ] |
| downfn = [fn for fn, d in zip(downfn0, details) if not d] |
| if downpath: |
# fetch, in one bulk call, every file that is not already cached
| self.fs.get(downpath, downfn) |
|
|
# record metadata only for files that were actually downloaded
| newdetail = [ |
| { |
| "original": path, |
| "fn": self._mapper(path), |
| "blocks": True, |
| "time": time.time(), |
| "uid": self.fs.ukey(path), |
| } |
| for path in downpath |
| ] |
| for path, detail in zip(downpath, newdetail): |
| self._metadata.update_file(path, detail) |
| self.save_cache() |
|
|
| def firstpart(fn): |
# cached details are (detail, fn) tuples; extract the local filename
| return fn[1] if isinstance(fn, tuple) else fn |
|
|
| return [ |
| open(firstpart(fn0) if fn0 else fn1, mode=open_files.mode) |
| for fn0, fn1 in zip(details, downfn0) |
| ] |
|
|
| def commit_many(self, open_files): |
| self.fs.put([f.fn for f in open_files], [f.path for f in open_files]) |
| [f.close() for f in open_files] |
| for f in open_files: |
# remove the local temporary copy, if it is still present
| try: |
| os.remove(f.name) |
| except FileNotFoundError: |
| pass |
| self._cache_size = None |
|
|
| def _make_local_details(self, path): |
| hash = self._mapper(path) |
| fn = os.path.join(self.storage[-1], hash) |
| detail = { |
| "original": path, |
| "fn": hash, |
| "blocks": True, |
| "time": time.time(), |
| "uid": self.fs.ukey(path), |
| } |
| self._metadata.update_file(path, detail) |
| logger.debug("Copying %s to local cache", path) |
| return fn |
|
|
| def cat( |
| self, |
| path, |
| recursive=False, |
| on_error="raise", |
| callback=_DEFAULT_CALLBACK, |
| **kwargs, |
| ): |
| paths = self.expand_path( |
| path, recursive=recursive, maxdepth=kwargs.get("maxdepth", None) |
| ) |
| getpaths = [] |
| storepaths = [] |
| fns = [] |
| out = {} |
| for p in paths.copy(): |
| try: |
| detail = self._check_file(p) |
| if not detail: |
| fn = self._make_local_details(p) |
| getpaths.append(p) |
| storepaths.append(fn) |
| else: |
| detail, fn = detail if isinstance(detail, tuple) else (None, detail) |
| fns.append(fn) |
| except Exception as e: |
| if on_error == "raise": |
| raise |
| if on_error == "return": |
| out[p] = e |
| paths.remove(p) |
|
|
| if getpaths: |
| self.fs.get(getpaths, storepaths) |
| self.save_cache() |
|
|
| callback.set_size(len(paths)) |
| for p, fn in zip(paths, fns): |
| with open(fn, "rb") as f: |
| out[p] = f.read() |
| callback.relative_update(1) |
| if isinstance(path, str) and len(paths) == 1 and recursive is False: |
| out = out[paths[0]] |
| return out |
|
|
| def _open(self, path, mode="rb", **kwargs): |
| path = self._strip_protocol(path) |
| if "r" not in mode: |
| fn = self._make_local_details(path) |
| return LocalTempFile(self, path, mode=mode, fn=fn) |
| detail = self._check_file(path) |
| if detail: |
| detail, fn = detail |
| _, blocks = detail["fn"], detail["blocks"] |
| if blocks is True: |
| logger.debug("Opening local copy of %s", path) |

# open the local copy and attach the original remote path, so that
# downstream layers can still infer details (e.g. compression) from it
| f = open(fn, mode) |
| f.original = detail.get("original") |
| return f |
| else: |
| raise ValueError( |
| f"Attempt to open partially cached file {path}" |
| f" as a wholly cached file" |
| ) |
| else: |
| fn = self._make_local_details(path) |
| kwargs["mode"] = mode |
|
|
| |
| self._mkcache() |
| if self.compression: |
| with self.fs._open(path, **kwargs) as f, open(fn, "wb") as f2: |
| if isinstance(f, AbstractBufferedFile): |
# disable the remote file's read-ahead cache while copying it whole
| f.cache = BaseCache(0, f.cache.fetcher, f.size) |
| comp = ( |
| infer_compression(path) |
| if self.compression == "infer" |
| else self.compression |
| ) |
| f = compr[comp](f, mode="rb") |
| data = True |
| while data: |
| block = getattr(f, "blocksize", 5 * 2**20) |
| data = f.read(block) |
| f2.write(data) |
| else: |
| self.fs.get_file(path, fn) |
| self.save_cache() |
| return self._open(path, mode) |
|
|
|
|
| class SimpleCacheFileSystem(WholeFileCacheFileSystem): |
| """Caches whole remote files on first access |
| |
| This class is intended as a layer over any other file system, and |
| will make a local copy of each file accessed, so that all subsequent |
| reads are local. This implementation only copies whole files, and |
| does not keep any metadata about the download time or file details. |
| It is therefore safer to use in multi-threaded/concurrent situations. |
| |
This is the only one of the caching filesystems that supports writing: you
are given a real local open file, and upon close and commit it is uploaded
to the target filesystem; the writability of the target URL is not checked
until that time.
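
A sketch of write-through use; the target URLs are illustrative and the
"s3" backend assumes s3fs is installed::

    import fsspec

    fs = fsspec.filesystem("simplecache", target_protocol="s3")

    # writes go to a local temporary file, uploaded when it is closed
    with fs.open("s3://bucket/output.bin", "wb") as f:
        f.write(b"some bytes")

    # inside a transaction, uploads are deferred until the transaction ends
    with fs.transaction:
        with fs.open("s3://bucket/deferred.bin", "wb") as f:
            f.write(b"more bytes")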
| |
| """ |
|
|
| protocol = "simplecache" |
| local_file = True |
| transaction_type = WriteCachedTransaction |
|
|
| def __init__(self, **kwargs): |
| kw = kwargs.copy() |
| for key in ["cache_check", "expiry_time", "check_files"]: |
| kw[key] = False |
| super().__init__(**kw) |
| for storage in self.storage: |
| if not os.path.exists(storage): |
| os.makedirs(storage, exist_ok=True) |
|
|
| def _check_file(self, path): |
| self._check_cache() |
| sha = self._mapper(path) |
| for storage in self.storage: |
| fn = os.path.join(storage, sha) |
| if os.path.exists(fn): |
| return fn |
|
|
| def save_cache(self): |
| pass |
|
|
| def load_cache(self): |
| pass |
|
|
| def pipe_file(self, path, value=None, **kwargs): |
| if self._intrans: |
| with self.open(path, "wb") as f: |
| f.write(value) |
| else: |
| super().pipe_file(path, value) |
|
|
| def pipe(self, path, value=None, **kwargs): |
| if isinstance(path, str): |
| self.pipe_file(self._strip_protocol(path), value, **kwargs) |
| elif isinstance(path, dict): |
| for k, v in path.items(): |
| self.pipe_file(self._strip_protocol(k), v, **kwargs) |
| else: |
| raise ValueError("path must be str or dict") |
|
|
| def cat_ranges( |
| self, paths, starts, ends, max_gap=None, on_error="return", **kwargs |
| ): |
| lpaths = [self._check_file(p) for p in paths] |
| rpaths = [p for l, p in zip(lpaths, paths) if l is False] |
| lpaths = [l for l, p in zip(lpaths, paths) if l is False] |
| self.fs.get(rpaths, lpaths) |
| return super().cat_ranges( |
| paths, starts, ends, max_gap=max_gap, on_error=on_error, **kwargs |
| ) |
|
|
| def _open(self, path, mode="rb", **kwargs): |
| path = self._strip_protocol(path) |
| sha = self._mapper(path) |
|
|
| if "r" not in mode: |
| fn = os.path.join(self.storage[-1], sha) |
| return LocalTempFile( |
| self, path, mode=mode, autocommit=not self._intrans, fn=fn |
| ) |
| fn = self._check_file(path) |
| if fn: |
| return open(fn, mode) |
|
|
| fn = os.path.join(self.storage[-1], sha) |
| logger.debug("Copying %s to local cache", path) |
| kwargs["mode"] = mode |
|
|
| self._mkcache() |
| self._cache_size = None |
| if self.compression: |
| with self.fs._open(path, **kwargs) as f, open(fn, "wb") as f2: |
| if isinstance(f, AbstractBufferedFile): |
# disable the remote file's read-ahead cache while copying it whole
| f.cache = BaseCache(0, f.cache.fetcher, f.size) |
| comp = ( |
| infer_compression(path) |
| if self.compression == "infer" |
| else self.compression |
| ) |
| f = compr[comp](f, mode="rb") |
| data = True |
| while data: |
| block = getattr(f, "blocksize", 5 * 2**20) |
| data = f.read(block) |
| f2.write(data) |
| else: |
| self.fs.get_file(path, fn) |
| return self._open(path, mode) |
|
|
|
|
| class LocalTempFile: |
| """A temporary local file, which will be uploaded on commit""" |
|
|
| def __init__(self, fs, path, fn, mode="wb", autocommit=True, seek=0): |
| self.fn = fn |
| self.fh = open(fn, mode) |
| self.mode = mode |
| if seek: |
| self.fh.seek(seek) |
| self.path = path |
| self.fs = fs |
| self.closed = False |
| self.autocommit = autocommit |
|
|
| def __reduce__(self): |
# reopen in r+b and seek, so writing can continue where it left off
| return ( |
| LocalTempFile, |
| (self.fs, self.path, self.fn, "r+b", self.autocommit, self.tell()), |
| ) |
|
|
| def __enter__(self): |
| return self.fh |
|
|
| def __exit__(self, exc_type, exc_val, exc_tb): |
| self.close() |
|
|
| def close(self): |
| if self.closed: |
| return |
| self.fh.close() |
| self.closed = True |
| if self.autocommit: |
| self.commit() |
|
|
| def discard(self): |
| self.fh.close() |
| os.remove(self.fn) |
|
|
| def commit(self): |
| self.fs.put(self.fn, self.path) |
| try: |
| os.remove(self.fn) |
| except (PermissionError, FileNotFoundError): |
# the local copy may already be gone, or may still be held open
| pass |
|
|
| @property |
| def name(self): |
| return self.fn |
|
|
| def __getattr__(self, item): |
| return getattr(self.fh, item) |
|
|