|
""" |
|
Implementations for the history of a `Buffer`. |
|
|
|
NOTE: There is no `DynamicHistory`: |
|
This doesn't work well, because the `Buffer` needs to be able to attach |
|
an event handler to the event when a history entry is loaded. This |
|
loading can be done asynchronously and making the history swappable would |
|
probably break this. |
|
""" |
|
from __future__ import annotations |
|
|
|
import datetime |
|
import os |
|
import threading |
|
from abc import ABCMeta, abstractmethod |
|
from asyncio import get_running_loop |
|
from typing import AsyncGenerator, Iterable, Sequence |
|
|
|
__all__ = [ |
|
"History", |
|
"ThreadedHistory", |
|
"DummyHistory", |
|
"FileHistory", |
|
"InMemoryHistory", |
|
] |
|
|
|
|
|
class History(metaclass=ABCMeta): |
|
""" |
|
Base ``History`` class. |
|
|
|
This also includes abstract methods for loading/storing history. |
|
""" |
|
|
|
def __init__(self) -> None: |
|
|
|
self._loaded = False |
|
|
|
|
|
|
|
self._loaded_strings: list[str] = [] |
|
|
|
|
|
|
|
|
|
|
|
async def load(self) -> AsyncGenerator[str, None]: |
|
""" |
|
Load the history and yield all the entries in reverse order (latest, |
|
most recent history entry first). |
|
|
|
This method can be called multiple times from the `Buffer` to |
|
repopulate the history when prompting for a new input. So we are |
|
responsible here for both caching, and making sure that strings that |
|
were were appended to the history will be incorporated next time this |
|
method is called. |
|
""" |
|
if not self._loaded: |
|
self._loaded_strings = list(self.load_history_strings()) |
|
self._loaded = True |
|
|
|
for item in self._loaded_strings: |
|
yield item |
|
|
|
def get_strings(self) -> list[str]: |
|
""" |
|
Get the strings from the history that are loaded so far. |
|
(In order. Oldest item first.) |
|
""" |
|
return self._loaded_strings[::-1] |
|
|
|
def append_string(self, string: str) -> None: |
|
"Add string to the history." |
|
self._loaded_strings.insert(0, string) |
|
self.store_string(string) |
|
|
|
|
|
|
|
|
|
|
|
@abstractmethod |
|
def load_history_strings(self) -> Iterable[str]: |
|
""" |
|
This should be a generator that yields `str` instances. |
|
|
|
It should yield the most recent items first, because they are the most |
|
important. (The history can already be used, even when it's only |
|
partially loaded.) |
|
""" |
|
while False: |
|
yield |
|
|
|
@abstractmethod |
|
def store_string(self, string: str) -> None: |
|
""" |
|
Store the string in persistent storage. |
|
""" |
|
|
|
|
|
class ThreadedHistory(History): |
|
""" |
|
Wrapper around `History` implementations that run the `load()` generator in |
|
a thread. |
|
|
|
Use this to increase the start-up time of prompt_toolkit applications. |
|
History entries are available as soon as they are loaded. We don't have to |
|
wait for everything to be loaded. |
|
""" |
|
|
|
def __init__(self, history: History) -> None: |
|
super().__init__() |
|
|
|
self.history = history |
|
|
|
self._load_thread: threading.Thread | None = None |
|
|
|
|
|
|
|
self._lock = threading.Lock() |
|
|
|
|
|
|
|
self._string_load_events: list[threading.Event] = [] |
|
|
|
async def load(self) -> AsyncGenerator[str, None]: |
|
""" |
|
Like `History.load(), but call `self.load_history_strings()` in a |
|
background thread. |
|
""" |
|
|
|
if not self._load_thread: |
|
self._load_thread = threading.Thread( |
|
target=self._in_load_thread, |
|
daemon=True, |
|
) |
|
self._load_thread.start() |
|
|
|
|
|
loop = get_running_loop() |
|
|
|
|
|
event = threading.Event() |
|
event.set() |
|
self._string_load_events.append(event) |
|
|
|
items_yielded = 0 |
|
|
|
try: |
|
while True: |
|
|
|
|
|
|
|
|
|
|
|
|
|
got_timeout = await loop.run_in_executor( |
|
None, lambda: event.wait(timeout=0.5) |
|
) |
|
if not got_timeout: |
|
continue |
|
|
|
|
|
def in_executor() -> tuple[list[str], bool]: |
|
with self._lock: |
|
new_items = self._loaded_strings[items_yielded:] |
|
done = self._loaded |
|
event.clear() |
|
return new_items, done |
|
|
|
new_items, done = await loop.run_in_executor(None, in_executor) |
|
|
|
items_yielded += len(new_items) |
|
|
|
for item in new_items: |
|
yield item |
|
|
|
if done: |
|
break |
|
finally: |
|
self._string_load_events.remove(event) |
|
|
|
def _in_load_thread(self) -> None: |
|
try: |
|
|
|
|
|
|
|
self._loaded_strings = [] |
|
|
|
for item in self.history.load_history_strings(): |
|
with self._lock: |
|
self._loaded_strings.append(item) |
|
|
|
for event in self._string_load_events: |
|
event.set() |
|
finally: |
|
with self._lock: |
|
self._loaded = True |
|
for event in self._string_load_events: |
|
event.set() |
|
|
|
def append_string(self, string: str) -> None: |
|
with self._lock: |
|
self._loaded_strings.insert(0, string) |
|
self.store_string(string) |
|
|
|
|
|
|
|
def load_history_strings(self) -> Iterable[str]: |
|
return self.history.load_history_strings() |
|
|
|
def store_string(self, string: str) -> None: |
|
self.history.store_string(string) |
|
|
|
def __repr__(self) -> str: |
|
return f"ThreadedHistory({self.history!r})" |
|
|
|
|
|
class InMemoryHistory(History): |
|
""" |
|
:class:`.History` class that keeps a list of all strings in memory. |
|
|
|
In order to prepopulate the history, it's possible to call either |
|
`append_string` for all items or pass a list of strings to `__init__` here. |
|
""" |
|
|
|
def __init__(self, history_strings: Sequence[str] | None = None) -> None: |
|
super().__init__() |
|
|
|
if history_strings is None: |
|
self._storage = [] |
|
else: |
|
self._storage = list(history_strings) |
|
|
|
def load_history_strings(self) -> Iterable[str]: |
|
yield from self._storage[::-1] |
|
|
|
def store_string(self, string: str) -> None: |
|
self._storage.append(string) |
|
|
|
|
|
class DummyHistory(History): |
|
""" |
|
:class:`.History` object that doesn't remember anything. |
|
""" |
|
|
|
def load_history_strings(self) -> Iterable[str]: |
|
return [] |
|
|
|
def store_string(self, string: str) -> None: |
|
pass |
|
|
|
def append_string(self, string: str) -> None: |
|
|
|
pass |
|
|
|
|
|
class FileHistory(History): |
|
""" |
|
:class:`.History` class that stores all strings in a file. |
|
""" |
|
|
|
def __init__(self, filename: str) -> None: |
|
self.filename = filename |
|
super().__init__() |
|
|
|
def load_history_strings(self) -> Iterable[str]: |
|
strings: list[str] = [] |
|
lines: list[str] = [] |
|
|
|
def add() -> None: |
|
if lines: |
|
|
|
string = "".join(lines)[:-1] |
|
|
|
strings.append(string) |
|
|
|
if os.path.exists(self.filename): |
|
with open(self.filename, "rb") as f: |
|
for line_bytes in f: |
|
line = line_bytes.decode("utf-8", errors="replace") |
|
|
|
if line.startswith("+"): |
|
lines.append(line[1:]) |
|
else: |
|
add() |
|
lines = [] |
|
|
|
add() |
|
|
|
|
|
return reversed(strings) |
|
|
|
def store_string(self, string: str) -> None: |
|
|
|
with open(self.filename, "ab") as f: |
|
|
|
def write(t: str) -> None: |
|
f.write(t.encode("utf-8")) |
|
|
|
write("\n# %s\n" % datetime.datetime.now()) |
|
for line in string.split("\n"): |
|
write("+%s\n" % line) |
|
|