Spaces:
Running
Running
"""Various base classes.""" | |
from types import coroutine | |
from collections.abc import Coroutine | |
from asyncio import get_running_loop | |
class AsyncBase:
    """Pair a wrapped file object with an optional loop and an executor.

    Instances are their own asynchronous iterator. Each iteration step
    awaits ``self.readline()``, which is not defined on this class and is
    expected to be supplied by a subclass.
    """

    def __init__(self, file, loop, executor):
        """Remember the wrapped file, the (possibly ``None``) loop and the executor."""
        self._ref_loop = loop
        self._executor = executor
        self._file = file

    def _loop(self):
        """Return the loop passed to the constructor, else the running loop."""
        # NOTE(review): this is a plain method here, so it has to be called
        # (``self._loop()``); confirm no caller expects attribute access.
        configured = self._ref_loop
        if configured:
            return configured
        return get_running_loop()

    def __aiter__(self):
        """Return ``self``: the object iterates over itself."""
        return self

    def __repr__(self):
        """Return the default repr followed by the repr of the wrapped file."""
        return f"{super().__repr__()} wrapping {self._file!r}"

    async def __anext__(self):
        """Return the next line; stop iterating once ``readline`` is falsy."""
        chunk = await self.readline()
        if not chunk:
            raise StopAsyncIteration
        return chunk
class AsyncIndirectBase(AsyncBase):
    """``AsyncBase`` variant whose wrapped file is looked up on every access.

    Instead of holding a file object, the instance holds ``indirect``, a
    zero-argument callable. Reading ``self._file`` calls it and returns the
    result; assigning to ``self._file`` is silently ignored.
    """

    def __init__(self, name, loop, executor, indirect):
        """Store the name and the ``indirect`` callable, then init the base.

        ``None`` is passed as the file to the base initialiser; that
        assignment is discarded by the ``_file`` setter below.
        """
        self._indirect = indirect
        self._name = name
        super().__init__(None, loop, executor)

    # ``_file`` must be a property. As two plain methods the second
    # definition replaced the first, and the base initialiser's
    # ``self._file = None`` then shadowed both with an instance attribute,
    # so ``_indirect`` was never consulted.
    @property
    def _file(self):
        """Return whatever the ``indirect`` callable currently yields."""
        return self._indirect()

    @_file.setter
    def _file(self, v):
        pass  # discard writes
class _ContextManager(Coroutine): | |
__slots__ = ("_coro", "_obj") | |
def __init__(self, coro): | |
self._coro = coro | |
self._obj = None | |
def send(self, value): | |
return self._coro.send(value) | |
def throw(self, typ, val=None, tb=None): | |
if val is None: | |
return self._coro.throw(typ) | |
elif tb is None: | |
return self._coro.throw(typ, val) | |
else: | |
return self._coro.throw(typ, val, tb) | |
def close(self): | |
return self._coro.close() | |
def gi_frame(self): | |
return self._coro.gi_frame | |
def gi_running(self): | |
return self._coro.gi_running | |
def gi_code(self): | |
return self._coro.gi_code | |
def __next__(self): | |
return self.send(None) | |
def __iter__(self): | |
resp = yield from self._coro | |
return resp | |
def __await__(self): | |
resp = yield from self._coro | |
return resp | |
async def __anext__(self): | |
resp = await self._coro | |
return resp | |
async def __aenter__(self): | |
self._obj = await self._coro | |
return self._obj | |
async def __aexit__(self, exc_type, exc, tb): | |
self._obj.close() | |
self._obj = None | |
class AiofilesContextManager(_ContextManager):
    """An adjusted async context manager for aiofiles.

    On exit, the ``__exit__`` of the held object's ``_file`` is invoked
    through the running loop's ``run_in_executor`` (executor argument
    ``None``) rather than being called directly.
    """

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Run the wrapped file's ``__exit__`` in an executor, then drop ``_obj``."""
        loop = get_running_loop()
        sync_exit = self._obj._file.__exit__
        await loop.run_in_executor(None, sync_exit, exc_type, exc_val, exc_tb)
        self._obj = None