Spaces:
Running
Running
import codecs | |
import re | |
from typing import (IO, Iterator, Match, NamedTuple, Optional, # noqa:F401 | |
Pattern, Sequence, Tuple) | |
def make_regex(string: str, extra_flags: int = 0) -> Pattern[str]:
    """Compile ``string`` into a regular expression.

    ``re.UNICODE`` is always set; ``extra_flags`` is OR-ed on top of it.
    """
    return re.compile(string, re.UNICODE | extra_flags)


# NOTE: throughout these patterns ``[^\S\r\n]`` means "whitespace other than
# CR or LF", and line terminators are listed as ``\r\n`` first so that a CRLF
# pair is consumed as a single terminator rather than as a lone ``\r``.

# One line terminator (captured).
_newline = make_regex(r"(\r\n|\n|\r)")
# Any run of whitespace, line terminators included.
_multiline_whitespace = make_regex(r"\s*", extra_flags=re.MULTILINE)
# A run of whitespace that stays on the current line.
_whitespace = make_regex(r"[^\S\r\n]*")
# Optional ``export`` prefix followed by same-line whitespace.
_export = make_regex(r"(?:export[^\S\r\n]+)?")
# Key wrapped in single quotes (quotes not captured).
_single_quoted_key = make_regex(r"'([^']+)'")
# Bare key: everything up to ``=``, ``#`` or whitespace.
_unquoted_key = make_regex(r"([^=\#\s]+)")
# ``=`` plus any same-line whitespace after it.
_equal_sign = make_regex(r"(=[^\S\r\n]*)")
# Single-quoted value; ``\'`` is allowed inside.
_single_quoted_value = make_regex(r"'((?:\\'|[^'])*)'")
# Double-quoted value; ``\"`` is allowed inside.
_double_quoted_value = make_regex(r'"((?:\\"|[^"])*)"')
# Bare value: everything up to the line terminator.
_unquoted_value = make_regex(r"([^\r\n]*)")
# Optional trailing ``# comment``.
_comment = make_regex(r"(?:[^\S\r\n]*#[^\r\n]*)?")
# Same-line whitespace followed by a line terminator or end of input.
_end_of_line = make_regex(r"[^\S\r\n]*(?:\r\n|\n|\r|$)")
# Remainder of the current line including its terminator, if any.  ``\r\n``
# must be tried before ``\r``: regex alternation takes the first alternative
# that matches, so the previous ``\r|\n|\r\n`` order left the ``\n`` of a CRLF
# unconsumed and it was later counted as an extra line.
_rest_of_line = make_regex(r"[^\r\n]*(?:\r\n|\n|\r)?")
# Backslash escapes decoded inside double-quoted values.
_double_quote_escapes = make_regex(r"\\[\\'\"abfnrtv]")
# Backslash escapes decoded inside single-quoted values.
_single_quote_escapes = make_regex(r"\\[\\']")
class Original(NamedTuple):
    """A slice of the source text together with the line it starts on."""

    # The exact text that was consumed.
    string: str
    # Line number recorded at the start of that text.
    line: int
class Binding(NamedTuple):
    """Result of parsing one binding from the input."""

    # Parsed key; None when no key was read (blank tail, comment, or error).
    key: Optional[str]
    # Parsed value; None when there was no ``=`` after the key.
    value: Optional[str]
    # The source text this binding was parsed from, with its starting line.
    original: Original
    # True when the text could not be parsed.
    error: bool
class Position:
    """Mutable cursor into a string: a character offset and a line number."""

    def __init__(self, chars: int, line: int) -> None:
        self.chars = chars
        self.line = line

    # Must be a classmethod: it is invoked as ``Position.start()`` with no
    # instance, which raises TypeError if ``start`` is a plain method.
    @classmethod
    def start(cls) -> "Position":
        """Return a fresh position at offset 0 on line 1."""
        return cls(chars=0, line=1)

    def set(self, other: "Position") -> None:
        """Copy the offset and line number of ``other`` into this position."""
        self.chars = other.chars
        self.line = other.line

    def advance(self, string: str) -> None:
        """Move past ``string``.

        The offset grows by ``len(string)`` and the line number by the number
        of line terminators (``_newline`` matches) found in ``string``.
        """
        self.chars += len(string)
        self.line += len(re.findall(_newline, string))
class Error(Exception):
    """Raised by the reader when the requested text cannot be read."""
class Reader:
    """Cursor-based reader over the full text of a stream.

    The whole stream is read up front.  ``position`` is the current cursor and
    ``mark`` remembers where the span returned by ``get_marked`` begins.
    """

    def __init__(self, stream: IO[str]) -> None:
        self.string = stream.read()
        self.position = Position.start()
        self.mark = Position.start()

    def has_next(self) -> bool:
        """Return True while the cursor is before the end of the text."""
        return self.position.chars < len(self.string)

    def set_mark(self) -> None:
        """Move the mark to the current cursor position."""
        self.mark.set(self.position)

    def get_marked(self) -> Original:
        """Return the text between the mark and the cursor, with the mark's line."""
        begin = self.mark.chars
        end = self.position.chars
        return Original(string=self.string[begin:end], line=self.mark.line)

    def peek(self, count: int) -> str:
        """Return up to ``count`` characters at the cursor without advancing."""
        offset = self.position.chars
        return self.string[offset:offset + count]

    def read(self, count: int) -> str:
        """Consume and return exactly ``count`` characters.

        Raises:
            Error: if fewer than ``count`` characters remain.
        """
        offset = self.position.chars
        chunk = self.string[offset:offset + count]
        if len(chunk) < count:
            raise Error("read: End of string")
        self.position.advance(chunk)
        return chunk

    def read_regex(self, regex: Pattern[str]) -> Sequence[str]:
        """Match ``regex`` at the cursor, consume the match, return its groups.

        Raises:
            Error: if ``regex`` does not match at the cursor.
        """
        found = regex.match(self.string, self.position.chars)
        if found is None:
            raise Error("read_regex: Pattern not found")
        self.position.advance(found.group(0))
        return found.groups()
def decode_escapes(regex: Pattern[str], string: str) -> str:
    """Replace every match of ``regex`` in ``string`` by its decoded form.

    Each matched escape sequence is decoded with the ``unicode-escape`` codec;
    text that ``regex`` does not match is left untouched.
    """
    return regex.sub(
        lambda found: codecs.decode(found.group(0), 'unicode-escape'),  # type: ignore
        string,
    )
def parse_key(reader: Reader) -> Optional[str]:
    """Read a key at the cursor.

    Returns None, consuming nothing, when the next character is ``#``.
    Otherwise reads a single-quoted key if the next character is ``'`` and an
    unquoted key in every other case.

    Raises:
        Error: propagated from ``reader.read_regex`` when no key matches.
    """
    first = reader.peek(1)
    if first == "#":
        return None
    pattern = _single_quoted_key if first == "'" else _unquoted_key
    (key,) = reader.read_regex(pattern)
    return key
def parse_unquoted_value(reader: Reader) -> str:
    """Read the rest of the line as a bare value.

    Everything from the first ``<whitespace>#`` onward is dropped, then
    trailing whitespace is stripped.
    """
    (raw,) = reader.read_regex(_unquoted_value)
    without_comment = re.sub(r"\s+#.*", "", raw)
    return without_comment.rstrip()
def parse_value(reader: Reader) -> str:
    """Read the value that follows ``=``.

    Dispatches on the next character: a quote starts a quoted value whose
    escapes are decoded, end of input or a line terminator yields the empty
    string, and anything else is read as an unquoted value.

    Raises:
        Error: propagated from ``reader.read_regex`` when a quoted value does
            not match.
    """
    first = reader.peek(1)
    if first == "'":
        (quoted,) = reader.read_regex(_single_quoted_value)
        return decode_escapes(_single_quote_escapes, quoted)
    if first == '"':
        (quoted,) = reader.read_regex(_double_quoted_value)
        return decode_escapes(_double_quote_escapes, quoted)
    if first in ("", "\n", "\r"):
        return ""
    return parse_unquoted_value(reader)
def parse_binding(reader: Reader) -> Binding:
    """Parse one binding starting at the cursor.

    The mark is set first, so the returned ``original`` covers everything
    consumed by this call.  Three outcomes are possible:

    * only whitespace remained: key and value are None, ``error`` is False;
    * a line was parsed: key/value as read (value is None without ``=``);
    * an ``Error`` was raised while reading: the rest of the line is consumed
      and a binding with ``error=True`` is returned.
    """

    def emit(key: Optional[str], value: Optional[str], error: bool) -> Binding:
        # Snapshot the marked text at the moment the binding is built.
        return Binding(
            key=key,
            value=value,
            original=reader.get_marked(),
            error=error,
        )

    reader.set_mark()
    try:
        # Skip leading blank lines and indentation.
        reader.read_regex(_multiline_whitespace)
        if not reader.has_next():
            return emit(None, None, False)
        reader.read_regex(_export)
        key = parse_key(reader)
        reader.read_regex(_whitespace)
        value: Optional[str] = None
        if reader.peek(1) == "=":
            reader.read_regex(_equal_sign)
            value = parse_value(reader)
        reader.read_regex(_comment)
        # Anything left on the line at this point makes this raise Error.
        reader.read_regex(_end_of_line)
        return emit(key, value, False)
    except Error:
        # Skip what is left of the offending line so parsing can resume.
        reader.read_regex(_rest_of_line)
        return emit(None, None, True)
def parse_stream(stream: IO[str]) -> Iterator[Binding]:
    """Lazily yield the bindings parsed from ``stream``.

    The stream is read in full when iteration starts; one binding is yielded
    per ``parse_binding`` call until the reader has no input left.
    """
    reader = Reader(stream)
    while True:
        if not reader.has_next():
            break
        yield parse_binding(reader)