|
import argparse |
|
import logging |
|
import os |
|
import stat |
|
import threading |
|
import time |
|
from errno import EIO, ENOENT |
|
|
|
from fuse import FUSE, FuseOSError, LoggingMixIn, Operations |
|
|
|
from fsspec import __version__ |
|
from fsspec.core import url_to_fs |
|
|
|
logger = logging.getLogger("fsspec.fuse") |
|
|
|
|
|
class FUSEr(Operations): |
|
def __init__(self, fs, path, ready_file=False): |
|
self.fs = fs |
|
self.cache = {} |
|
self.root = path.rstrip("/") + "/" |
|
self.counter = 0 |
|
logger.info("Starting FUSE at %s", path) |
|
self._ready_file = ready_file |
|
|
|
def getattr(self, path, fh=None): |
|
logger.debug("getattr %s", path) |
|
if self._ready_file and path in ["/.fuse_ready", ".fuse_ready"]: |
|
return {"type": "file", "st_size": 5} |
|
|
|
path = "".join([self.root, path.lstrip("/")]).rstrip("/") |
|
try: |
|
info = self.fs.info(path) |
|
except FileNotFoundError as exc: |
|
raise FuseOSError(ENOENT) from exc |
|
|
|
data = {"st_uid": info.get("uid", 1000), "st_gid": info.get("gid", 1000)} |
|
perm = info.get("mode", 0o777) |
|
|
|
if info["type"] != "file": |
|
data["st_mode"] = stat.S_IFDIR | perm |
|
data["st_size"] = 0 |
|
data["st_blksize"] = 0 |
|
else: |
|
data["st_mode"] = stat.S_IFREG | perm |
|
data["st_size"] = info["size"] |
|
data["st_blksize"] = 5 * 2**20 |
|
data["st_nlink"] = 1 |
|
data["st_atime"] = info["atime"] if "atime" in info else time.time() |
|
data["st_ctime"] = info["ctime"] if "ctime" in info else time.time() |
|
data["st_mtime"] = info["mtime"] if "mtime" in info else time.time() |
|
return data |
|
|
|
def readdir(self, path, fh): |
|
logger.debug("readdir %s", path) |
|
path = "".join([self.root, path.lstrip("/")]) |
|
files = self.fs.ls(path, False) |
|
files = [os.path.basename(f.rstrip("/")) for f in files] |
|
return [".", ".."] + files |
|
|
|
def mkdir(self, path, mode): |
|
path = "".join([self.root, path.lstrip("/")]) |
|
self.fs.mkdir(path) |
|
return 0 |
|
|
|
def rmdir(self, path): |
|
path = "".join([self.root, path.lstrip("/")]) |
|
self.fs.rmdir(path) |
|
return 0 |
|
|
|
def read(self, path, size, offset, fh): |
|
logger.debug("read %s", (path, size, offset)) |
|
if self._ready_file and path in ["/.fuse_ready", ".fuse_ready"]: |
|
|
|
return b"ready" |
|
|
|
f = self.cache[fh] |
|
f.seek(offset) |
|
out = f.read(size) |
|
return out |
|
|
|
def write(self, path, data, offset, fh): |
|
logger.debug("write %s", (path, offset)) |
|
f = self.cache[fh] |
|
f.seek(offset) |
|
f.write(data) |
|
return len(data) |
|
|
|
def create(self, path, flags, fi=None): |
|
logger.debug("create %s", (path, flags)) |
|
fn = "".join([self.root, path.lstrip("/")]) |
|
self.fs.touch(fn) |
|
f = self.fs.open(fn, "wb") |
|
self.cache[self.counter] = f |
|
self.counter += 1 |
|
return self.counter - 1 |
|
|
|
def open(self, path, flags): |
|
logger.debug("open %s", (path, flags)) |
|
fn = "".join([self.root, path.lstrip("/")]) |
|
if flags % 2 == 0: |
|
|
|
mode = "rb" |
|
else: |
|
|
|
mode = "wb" |
|
self.cache[self.counter] = self.fs.open(fn, mode) |
|
self.counter += 1 |
|
return self.counter - 1 |
|
|
|
def truncate(self, path, length, fh=None): |
|
fn = "".join([self.root, path.lstrip("/")]) |
|
if length != 0: |
|
raise NotImplementedError |
|
|
|
self.fs.touch(fn) |
|
|
|
def unlink(self, path): |
|
fn = "".join([self.root, path.lstrip("/")]) |
|
try: |
|
self.fs.rm(fn, False) |
|
except (OSError, FileNotFoundError) as exc: |
|
raise FuseOSError(EIO) from exc |
|
|
|
def release(self, path, fh): |
|
try: |
|
if fh in self.cache: |
|
f = self.cache[fh] |
|
f.close() |
|
self.cache.pop(fh) |
|
except Exception as e: |
|
print(e) |
|
return 0 |
|
|
|
def chmod(self, path, mode): |
|
if hasattr(self.fs, "chmod"): |
|
path = "".join([self.root, path.lstrip("/")]) |
|
return self.fs.chmod(path, mode) |
|
raise NotImplementedError |
|
|
|
|
|
def run( |
|
fs, |
|
path, |
|
mount_point, |
|
foreground=True, |
|
threads=False, |
|
ready_file=False, |
|
ops_class=FUSEr, |
|
): |
|
"""Mount stuff in a local directory |
|
|
|
This uses fusepy to make it appear as if a given path on an fsspec |
|
instance is in fact resident within the local file-system. |
|
|
|
This requires that fusepy by installed, and that FUSE be available on |
|
the system (typically requiring a package to be installed with |
|
apt, yum, brew, etc.). |
|
|
|
Parameters |
|
---------- |
|
fs: file-system instance |
|
From one of the compatible implementations |
|
path: str |
|
Location on that file-system to regard as the root directory to |
|
mount. Note that you typically should include the terminating "/" |
|
character. |
|
mount_point: str |
|
An empty directory on the local file-system where the contents of |
|
the remote path will appear. |
|
foreground: bool |
|
Whether or not calling this function will block. Operation will |
|
typically be more stable if True. |
|
threads: bool |
|
Whether or not to create threads when responding to file operations |
|
within the mounter directory. Operation will typically be more |
|
stable if False. |
|
ready_file: bool |
|
Whether the FUSE process is ready. The ``.fuse_ready`` file will |
|
exist in the ``mount_point`` directory if True. Debugging purpose. |
|
ops_class: FUSEr or Subclass of FUSEr |
|
To override the default behavior of FUSEr. For Example, logging |
|
to file. |
|
|
|
""" |
|
func = lambda: FUSE( |
|
ops_class(fs, path, ready_file=ready_file), |
|
mount_point, |
|
nothreads=not threads, |
|
foreground=foreground, |
|
) |
|
if not foreground: |
|
th = threading.Thread(target=func) |
|
th.daemon = True |
|
th.start() |
|
return th |
|
else: |
|
try: |
|
func() |
|
except KeyboardInterrupt: |
|
pass |
|
|
|
|
|
def main(args): |
|
"""Mount filesystem from chained URL to MOUNT_POINT. |
|
|
|
Examples: |
|
|
|
python3 -m fsspec.fuse memory /usr/share /tmp/mem |
|
|
|
python3 -m fsspec.fuse local /tmp/source /tmp/local \\ |
|
-l /tmp/fsspecfuse.log |
|
|
|
You can also mount chained-URLs and use special settings: |
|
|
|
python3 -m fsspec.fuse 'filecache::zip::file://data.zip' \\ |
|
/ /tmp/zip \\ |
|
-o 'filecache-cache_storage=/tmp/simplecache' |
|
|
|
You can specify the type of the setting by using `[int]` or `[bool]`, |
|
(`true`, `yes`, `1` represents the Boolean value `True`): |
|
|
|
python3 -m fsspec.fuse 'simplecache::ftp://ftp1.at.proftpd.org' \\ |
|
/historic/packages/RPMS /tmp/ftp \\ |
|
-o 'simplecache-cache_storage=/tmp/simplecache' \\ |
|
-o 'simplecache-check_files=false[bool]' \\ |
|
-o 'ftp-listings_expiry_time=60[int]' \\ |
|
-o 'ftp-username=anonymous' \\ |
|
-o 'ftp-password=xieyanbo' |
|
""" |
|
|
|
class RawDescriptionArgumentParser(argparse.ArgumentParser): |
|
def format_help(self): |
|
usage = super().format_help() |
|
parts = usage.split("\n\n") |
|
parts[1] = self.description.rstrip() |
|
return "\n\n".join(parts) |
|
|
|
parser = RawDescriptionArgumentParser(prog="fsspec.fuse", description=main.__doc__) |
|
parser.add_argument("--version", action="version", version=__version__) |
|
parser.add_argument("url", type=str, help="fs url") |
|
parser.add_argument("source_path", type=str, help="source directory in fs") |
|
parser.add_argument("mount_point", type=str, help="local directory") |
|
parser.add_argument( |
|
"-o", |
|
"--option", |
|
action="append", |
|
help="Any options of protocol included in the chained URL", |
|
) |
|
parser.add_argument( |
|
"-l", "--log-file", type=str, help="Logging FUSE debug info (Default: '')" |
|
) |
|
parser.add_argument( |
|
"-f", |
|
"--foreground", |
|
action="store_false", |
|
help="Running in foreground or not (Default: False)", |
|
) |
|
parser.add_argument( |
|
"-t", |
|
"--threads", |
|
action="store_false", |
|
help="Running with threads support (Default: False)", |
|
) |
|
parser.add_argument( |
|
"-r", |
|
"--ready-file", |
|
action="store_false", |
|
help="The `.fuse_ready` file will exist after FUSE is ready. " |
|
"(Debugging purpose, Default: False)", |
|
) |
|
args = parser.parse_args(args) |
|
|
|
kwargs = {} |
|
for item in args.option or []: |
|
key, sep, value = item.partition("=") |
|
if not sep: |
|
parser.error(message=f"Wrong option: {item!r}") |
|
val = value.lower() |
|
if val.endswith("[int]"): |
|
value = int(value[: -len("[int]")]) |
|
elif val.endswith("[bool]"): |
|
value = val[: -len("[bool]")] in ["1", "yes", "true"] |
|
|
|
if "-" in key: |
|
fs_name, setting_name = key.split("-", 1) |
|
if fs_name in kwargs: |
|
kwargs[fs_name][setting_name] = value |
|
else: |
|
kwargs[fs_name] = {setting_name: value} |
|
else: |
|
kwargs[key] = value |
|
|
|
if args.log_file: |
|
logging.basicConfig( |
|
level=logging.DEBUG, |
|
filename=args.log_file, |
|
format="%(asctime)s %(message)s", |
|
) |
|
|
|
class LoggingFUSEr(FUSEr, LoggingMixIn): |
|
pass |
|
|
|
fuser = LoggingFUSEr |
|
else: |
|
fuser = FUSEr |
|
|
|
fs, url_path = url_to_fs(args.url, **kwargs) |
|
logger.debug("Mounting %s to %s", url_path, str(args.mount_point)) |
|
run( |
|
fs, |
|
args.source_path, |
|
args.mount_point, |
|
foreground=args.foreground, |
|
threads=args.threads, |
|
ready_file=args.ready_file, |
|
ops_class=fuser, |
|
) |
|
|
|
|
|
if __name__ == "__main__": |
|
import sys |
|
|
|
main(sys.argv[1:]) |
|
|