Spaces:
Runtime error
Runtime error
"""Async executor versions of file functions from the os module.""" | |
import asyncio | |
from functools import partial, wraps | |
import os | |
def wrap(func): | |
async def run(*args, loop=None, executor=None, **kwargs): | |
if loop is None: | |
loop = asyncio.get_running_loop() | |
pfunc = partial(func, *args, **kwargs) | |
return await loop.run_in_executor(executor, pfunc) | |
return run | |
from . import ospath as path | |
stat = wrap(os.stat) | |
rename = wrap(os.rename) | |
renames = wrap(os.renames) | |
replace = wrap(os.replace) | |
remove = wrap(os.remove) | |
unlink = wrap(os.unlink) | |
mkdir = wrap(os.mkdir) | |
makedirs = wrap(os.makedirs) | |
rmdir = wrap(os.rmdir) | |
removedirs = wrap(os.removedirs) | |
link = wrap(os.link) | |
symlink = wrap(os.symlink) | |
readlink = wrap(os.readlink) | |
listdir = wrap(os.listdir) | |
scandir = wrap(os.scandir) | |
access = wrap(os.access) | |
if hasattr(os, "sendfile"): | |
sendfile = wrap(os.sendfile) | |