chatbot / iterators /iterator_pipe.py
kelvin-t-lu's picture
init
dbd2ac6
import queue
import asyncio
class IteratorPipe:
"""
Iterator Pipe creates an iterator that can be fed in data from another block of code or thread of execution
"""
def __init__(self, sentinel=object()):
self._q = queue.Queue()
self._sentinel = sentinel
self._sentinel_pushed = False
self._closed = False
def __iter__(self):
return self
def __next__(self):
if self._closed:
raise StopIteration
data = self._q.get(block=True)
if data is self._sentinel:
self._closed = True
raise StopIteration
return data
def put(self, data) -> bool:
"""
Pushes next item to Iterator and returns True
If iterator has been closed via close(), doesn't push anything and returns False
"""
if self._sentinel_pushed:
return False
self._q.put(data)
return True
def close(self):
"""
Close is idempotent. Calling close multiple times is safe
Iterator will raise StopIteration only after all elements pushed before close have been iterated
"""
# make close idempotent
if not self._sentinel_pushed:
self._sentinel_pushed = True
self._q.put(self._sentinel)
class AsyncIteratorPipe:
def __init__(self, sentinel=object()):
self._q = asyncio.Queue()
self._sentinel = sentinel
self._sentinel_pushed = False
self._closed = False
def __aiter__(self):
return self
async def __anext__(self):
if self._closed:
raise StopAsyncIteration
data = await self._q.get()
if data is self._sentinel:
self._closed = True
raise StopAsyncIteration
return data
async def put(self, data) -> bool:
"""
Pushes next item to Iterator and returns True
If iterator has been closed via close(), doesn't push anything and returns False
"""
if self._sentinel_pushed:
return False
await self._q.put(data)
return True
async def close(self):
"""
Close is idempotent. Calling close multiple times is safe
Iterator will raise StopIteration only after all elements pushed before close have been iterated
"""
# make close idempotent
if not self._sentinel_pushed:
self._sentinel_pushed = True
await self._q.put(self._sentinel)