# The following YAML grammar is LL(1) and is parsed by a recursive descent
# parser.
#
# stream            ::= STREAM-START implicit_document? explicit_document* STREAM-END
# implicit_document ::= block_node DOCUMENT-END*
# explicit_document ::= DIRECTIVE* DOCUMENT-START block_node? DOCUMENT-END*
# block_node_or_indentless_sequence ::=
#                       ALIAS
#                       | properties (block_content | indentless_block_sequence)?
#                       | block_content
#                       | indentless_block_sequence
# block_node        ::= ALIAS
#                       | properties block_content?
#                       | block_content
# flow_node         ::= ALIAS
#                       | properties flow_content?
#                       | flow_content
# properties        ::= TAG ANCHOR? | ANCHOR TAG?
# block_content     ::= block_collection | flow_collection | SCALAR
# flow_content      ::= flow_collection | SCALAR
# block_collection  ::= block_sequence | block_mapping
# flow_collection   ::= flow_sequence | flow_mapping
# block_sequence    ::= BLOCK-SEQUENCE-START (BLOCK-ENTRY block_node?)* BLOCK-END
# indentless_sequence ::= (BLOCK-ENTRY block_node?)+
# block_mapping     ::= BLOCK-MAPPING-START
#                       ((KEY block_node_or_indentless_sequence?)?
#                       (VALUE block_node_or_indentless_sequence?)?)*
#                       BLOCK-END
# flow_sequence     ::= FLOW-SEQUENCE-START
#                       (flow_sequence_entry FLOW-ENTRY)*
#                       flow_sequence_entry?
#                       FLOW-SEQUENCE-END
# flow_sequence_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)?
# flow_mapping      ::= FLOW-MAPPING-START
#                       (flow_mapping_entry FLOW-ENTRY)*
#                       flow_mapping_entry?
#                       FLOW-MAPPING-END
# flow_mapping_entry  ::= flow_node | KEY flow_node? (VALUE flow_node?)?

__all__ = ['Parser', 'ParserError']

from .error import MarkedYAMLError
from .tokens import *
from .events import *
from .scanner import *
|
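# A minimal usage sketch (assuming the standard PyYAML stack, where
# yaml.parse() wires a Reader, a Scanner and this Parser together):
#
#     import yaml
#
#     for event in yaml.parse('- a\n- b\n'):
#         print(event)
#
# This prints StreamStartEvent, DocumentStartEvent, SequenceStartEvent,
# two ScalarEvents, SequenceEndEvent, DocumentEndEvent and StreamEndEvent.
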
class ParserError(MarkedYAMLError):
    pass
|
class Parser:
    # The parser implements the grammar at the top of this file as a set of
    # state methods: self.state holds the method that will produce the next
    # event, and self.states is a stack of states to resume once the current
    # construct is finished.

    DEFAULT_TAGS = {
        '!': '!',
        '!!': 'tag:yaml.org,2002:',
    }
|
    def __init__(self):
        self.current_event = None
        self.yaml_version = None
        self.tag_handles = {}
        self.states = []
        self.marks = []
        self.state = self.parse_stream_start
|
    def dispose(self):
        # Reset the state attributes (to clear self-references).
        self.states = []
        self.state = None
|
    def check_event(self, *choices):
        # Check the type of the next event.
        if self.current_event is None:
            if self.state:
                self.current_event = self.state()
        if self.current_event is not None:
            if not choices:
                return True
            for choice in choices:
                if isinstance(self.current_event, choice):
                    return True
        return False
|
    def peek_event(self):
        # Get the next event without consuming it.
        if self.current_event is None:
            if self.state:
                self.current_event = self.state()
        return self.current_event
|
    def get_event(self):
        # Get the next event and proceed further.
        if self.current_event is None:
            if self.state:
                self.current_event = self.state()
        value = self.current_event
        self.current_event = None
        return value

    # stream            ::= STREAM-START implicit_document? explicit_document* STREAM-END
    # implicit_document ::= block_node DOCUMENT-END*
    # explicit_document ::= DIRECTIVE* DOCUMENT-START block_node? DOCUMENT-END*
|
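    # To make the state flow concrete: the two-document stream
    # '--- one\n--- two\n' produces StreamStart, DocumentStart(explicit),
    # Scalar('one'), DocumentEnd, DocumentStart(explicit), Scalar('two'),
    # DocumentEnd, StreamEnd.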
    def parse_stream_start(self):
        # Parse the stream start.
        token = self.get_token()
        event = StreamStartEvent(token.start_mark, token.end_mark,
                encoding=token.encoding)

        # Prepare the next state.
        self.state = self.parse_implicit_document_start

        return event
|
    def parse_implicit_document_start(self):
        # Parse an implicit document (one that does not start with '---').
        if not self.check_token(DirectiveToken, DocumentStartToken,
                StreamEndToken):
            self.tag_handles = self.DEFAULT_TAGS
            token = self.peek_token()
            start_mark = end_mark = token.start_mark
            event = DocumentStartEvent(start_mark, end_mark,
                    explicit=False)

            # Prepare the next state.
            self.states.append(self.parse_document_end)
            self.state = self.parse_block_node

            return event
        else:
            return self.parse_document_start()
|
    def parse_document_start(self):
        # Parse any extra document end indicators.
        while self.check_token(DocumentEndToken):
            self.get_token()

        # Parse an explicit document.
        if not self.check_token(StreamEndToken):
            token = self.peek_token()
            start_mark = token.start_mark
            version, tags = self.process_directives()
            if not self.check_token(DocumentStartToken):
                raise ParserError(None, None,
                        "expected '<document start>', but found %r"
                        % self.peek_token().id,
                        self.peek_token().start_mark)
            token = self.get_token()
            end_mark = token.end_mark
            event = DocumentStartEvent(start_mark, end_mark,
                    explicit=True, version=version, tags=tags)
            self.states.append(self.parse_document_end)
            self.state = self.parse_document_content
        else:
            # Parse the end of the stream.
            token = self.get_token()
            event = StreamEndEvent(token.start_mark, token.end_mark)
            assert not self.states
            assert not self.marks
            self.state = None
        return event
|
    def parse_document_end(self):
        # Parse the document end.
        token = self.peek_token()
        start_mark = end_mark = token.start_mark
        explicit = False
        if self.check_token(DocumentEndToken):
            token = self.get_token()
            end_mark = token.end_mark
            explicit = True
        event = DocumentEndEvent(start_mark, end_mark,
                explicit=explicit)

        # Prepare the next state.
        self.state = self.parse_document_start

        return event
|
    def parse_document_content(self):
        if self.check_token(DirectiveToken,
                DocumentStartToken, DocumentEndToken, StreamEndToken):
            event = self.process_empty_scalar(self.peek_token().start_mark)
            self.state = self.states.pop()
            return event
        else:
            return self.parse_block_node()
|
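    # For illustration, given the directives
    #
    #     %YAML 1.1
    #     %TAG !e! tag:example.com,2000:app/
    #     ---
    #
    # process_directives() returns ((1, 1), {'!e!': 'tag:example.com,2000:app/'})
    # and leaves self.tag_handles holding the declared handle plus the
    # DEFAULT_TAGS entries for '!' and '!!'.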
    def process_directives(self):
        self.yaml_version = None
        self.tag_handles = {}
        while self.check_token(DirectiveToken):
            token = self.get_token()
            if token.name == 'YAML':
                if self.yaml_version is not None:
                    raise ParserError(None, None,
                            "found duplicate YAML directive", token.start_mark)
                major, minor = token.value
                if major != 1:
                    raise ParserError(None, None,
                            "found incompatible YAML document (version 1.* is required)",
                            token.start_mark)
                self.yaml_version = token.value
            elif token.name == 'TAG':
                handle, prefix = token.value
                if handle in self.tag_handles:
                    raise ParserError(None, None,
                            "duplicate tag handle %r" % handle,
                            token.start_mark)
                self.tag_handles[handle] = prefix
        if self.tag_handles:
            value = self.yaml_version, self.tag_handles.copy()
        else:
            value = self.yaml_version, None
        for key in self.DEFAULT_TAGS:
            if key not in self.tag_handles:
                self.tag_handles[key] = self.DEFAULT_TAGS[key]
        return value

    # block_node_or_indentless_sequence ::= ALIAS
    #               | properties (block_content | indentless_block_sequence)?
    #               | block_content
    #               | indentless_block_sequence
    # block_node    ::= ALIAS
    #                   | properties block_content?
    #                   | block_content
    # flow_node     ::= ALIAS
    #                   | properties flow_content?
    #                   | flow_content
    # properties    ::= TAG ANCHOR? | ANCHOR TAG?
    # block_content     ::= block_collection | flow_collection | SCALAR
    # flow_content      ::= flow_collection | SCALAR
    # block_collection  ::= block_sequence | block_mapping
    # flow_collection   ::= flow_sequence | flow_mapping
|
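    # Tag resolution in parse_node() works on (handle, suffix) pairs from the
    # scanner: with the default handles, '!!str' arrives as ('!!', 'str') and
    # resolves to 'tag:yaml.org,2002:str', while a verbatim tag such as
    # '!<tag:example.com,2000:app/foo>' arrives as (None, suffix) and is used
    # as-is.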
    def parse_block_node(self):
        return self.parse_node(block=True)

    def parse_flow_node(self):
        return self.parse_node()

    def parse_block_node_or_indentless_sequence(self):
        return self.parse_node(block=True, indentless_sequence=True)
|
    def parse_node(self, block=False, indentless_sequence=False):
        if self.check_token(AliasToken):
            token = self.get_token()
            event = AliasEvent(token.value, token.start_mark, token.end_mark)
            self.state = self.states.pop()
        else:
            anchor = None
            tag = None
            start_mark = end_mark = tag_mark = None
            if self.check_token(AnchorToken):
                token = self.get_token()
                start_mark = token.start_mark
                end_mark = token.end_mark
                anchor = token.value
                if self.check_token(TagToken):
                    token = self.get_token()
                    tag_mark = token.start_mark
                    end_mark = token.end_mark
                    tag = token.value
            elif self.check_token(TagToken):
                token = self.get_token()
                start_mark = tag_mark = token.start_mark
                end_mark = token.end_mark
                tag = token.value
                if self.check_token(AnchorToken):
                    token = self.get_token()
                    end_mark = token.end_mark
                    anchor = token.value
            if tag is not None:
                handle, suffix = tag
                if handle is not None:
                    if handle not in self.tag_handles:
                        raise ParserError("while parsing a node", start_mark,
                                "found undefined tag handle %r" % handle,
                                tag_mark)
                    tag = self.tag_handles[handle]+suffix
                else:
                    tag = suffix
            if start_mark is None:
                start_mark = end_mark = self.peek_token().start_mark
            event = None
            implicit = (tag is None or tag == '!')
            if indentless_sequence and self.check_token(BlockEntryToken):
                end_mark = self.peek_token().end_mark
                event = SequenceStartEvent(anchor, tag, implicit,
                        start_mark, end_mark)
                self.state = self.parse_indentless_sequence_entry
            else:
                if self.check_token(ScalarToken):
                    token = self.get_token()
                    end_mark = token.end_mark
                    if (token.plain and tag is None) or tag == '!':
                        implicit = (True, False)
                    elif tag is None:
                        implicit = (False, True)
                    else:
                        implicit = (False, False)
                    event = ScalarEvent(anchor, tag, implicit, token.value,
                            start_mark, end_mark, style=token.style)
                    self.state = self.states.pop()
                elif self.check_token(FlowSequenceStartToken):
                    end_mark = self.peek_token().end_mark
                    event = SequenceStartEvent(anchor, tag, implicit,
                            start_mark, end_mark, flow_style=True)
                    self.state = self.parse_flow_sequence_first_entry
                elif self.check_token(FlowMappingStartToken):
                    end_mark = self.peek_token().end_mark
                    event = MappingStartEvent(anchor, tag, implicit,
                            start_mark, end_mark, flow_style=True)
                    self.state = self.parse_flow_mapping_first_key
                elif block and self.check_token(BlockSequenceStartToken):
                    end_mark = self.peek_token().start_mark
                    event = SequenceStartEvent(anchor, tag, implicit,
                            start_mark, end_mark, flow_style=False)
                    self.state = self.parse_block_sequence_first_entry
                elif block and self.check_token(BlockMappingStartToken):
                    end_mark = self.peek_token().start_mark
                    event = MappingStartEvent(anchor, tag, implicit,
                            start_mark, end_mark, flow_style=False)
                    self.state = self.parse_block_mapping_first_key
                elif anchor is not None or tag is not None:
                    # Empty scalars are allowed even if a tag or an anchor is
                    # specified.
                    event = ScalarEvent(anchor, tag, (implicit, False), '',
                            start_mark, end_mark)
                    self.state = self.states.pop()
                else:
                    if block:
                        node = 'block'
                    else:
                        node = 'flow'
                    token = self.peek_token()
                    raise ParserError("while parsing a %s node" % node, start_mark,
                            "expected the node content, but found %r" % token.id,
                            token.start_mark)
        return event

    # block_sequence ::= BLOCK-SEQUENCE-START (BLOCK-ENTRY block_node?)* BLOCK-END
|
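    # Concretely, the document '- one\n- two\n' is scanned as
    # BLOCK-SEQUENCE-START, BLOCK-ENTRY, SCALAR('one'), BLOCK-ENTRY,
    # SCALAR('two'), BLOCK-END; parse_node() emits the SequenceStartEvent and
    # the two methods below emit the entries and the SequenceEndEvent.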
    def parse_block_sequence_first_entry(self):
        token = self.get_token()
        self.marks.append(token.start_mark)
        return self.parse_block_sequence_entry()

    def parse_block_sequence_entry(self):
        if self.check_token(BlockEntryToken):
            token = self.get_token()
            if not self.check_token(BlockEntryToken, BlockEndToken):
                self.states.append(self.parse_block_sequence_entry)
                return self.parse_block_node()
            else:
                self.state = self.parse_block_sequence_entry
                return self.process_empty_scalar(token.end_mark)
        if not self.check_token(BlockEndToken):
            token = self.peek_token()
            raise ParserError("while parsing a block collection", self.marks[-1],
                    "expected <block end>, but found %r" % token.id, token.start_mark)
        token = self.get_token()
        event = SequenceEndEvent(token.start_mark, token.end_mark)
        self.state = self.states.pop()
        self.marks.pop()
        return event

    # indentless_sequence ::= (BLOCK-ENTRY block_node?)+
|
    def parse_indentless_sequence_entry(self):
        if self.check_token(BlockEntryToken):
            token = self.get_token()
            if not self.check_token(BlockEntryToken,
                    KeyToken, ValueToken, BlockEndToken):
                self.states.append(self.parse_indentless_sequence_entry)
                return self.parse_block_node()
            else:
                self.state = self.parse_indentless_sequence_entry
                return self.process_empty_scalar(token.end_mark)
        token = self.peek_token()
        event = SequenceEndEvent(token.start_mark, token.start_mark)
        self.state = self.states.pop()
        return event

    # block_mapping     ::= BLOCK-MAPPING-START
    #                       ((KEY block_node_or_indentless_sequence?)?
    #                       (VALUE block_node_or_indentless_sequence?)?)*
    #                       BLOCK-END
|
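    # Concretely, 'key: value\n' is scanned as BLOCK-MAPPING-START, KEY,
    # SCALAR('key'), VALUE, SCALAR('value'), BLOCK-END; a missing value, as
    # in 'key:\n', is filled in with an empty scalar by
    # process_empty_scalar().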
    def parse_block_mapping_first_key(self):
        token = self.get_token()
        self.marks.append(token.start_mark)
        return self.parse_block_mapping_key()

    def parse_block_mapping_key(self):
        if self.check_token(KeyToken):
            token = self.get_token()
            if not self.check_token(KeyToken, ValueToken, BlockEndToken):
                self.states.append(self.parse_block_mapping_value)
                return self.parse_block_node_or_indentless_sequence()
            else:
                self.state = self.parse_block_mapping_value
                return self.process_empty_scalar(token.end_mark)
        if not self.check_token(BlockEndToken):
            token = self.peek_token()
            raise ParserError("while parsing a block mapping", self.marks[-1],
                    "expected <block end>, but found %r" % token.id, token.start_mark)
        token = self.get_token()
        event = MappingEndEvent(token.start_mark, token.end_mark)
        self.state = self.states.pop()
        self.marks.pop()
        return event
|
    def parse_block_mapping_value(self):
        if self.check_token(ValueToken):
            token = self.get_token()
            if not self.check_token(KeyToken, ValueToken, BlockEndToken):
                self.states.append(self.parse_block_mapping_key)
                return self.parse_block_node_or_indentless_sequence()
            else:
                self.state = self.parse_block_mapping_key
                return self.process_empty_scalar(token.end_mark)
        else:
            self.state = self.parse_block_mapping_key
            token = self.peek_token()
            return self.process_empty_scalar(token.start_mark)

    # flow_sequence     ::= FLOW-SEQUENCE-START
    #                       (flow_sequence_entry FLOW-ENTRY)*
    #                       flow_sequence_entry?
    #                       FLOW-SEQUENCE-END
    # flow_sequence_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)?
    #
    # Note that while the production rules for flow_sequence_entry and
    # flow_mapping_entry are identical, their interpretations differ: for
    # flow_sequence_entry, the part `KEY flow_node? (VALUE flow_node?)?`
    # generates an inline mapping (set syntax).
|
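    # Concretely, '[a, b, k: v]' yields SequenceStart, Scalar('a'),
    # Scalar('b'), then MappingStart/Scalar('k')/Scalar('v')/MappingEnd for
    # the single-pair inline mapping 'k: v', and finally SequenceEnd; the
    # KeyToken branch below handles that inline-mapping case.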
    def parse_flow_sequence_first_entry(self):
        token = self.get_token()
        self.marks.append(token.start_mark)
        return self.parse_flow_sequence_entry(first=True)

    def parse_flow_sequence_entry(self, first=False):
        if not self.check_token(FlowSequenceEndToken):
            if not first:
                if self.check_token(FlowEntryToken):
                    self.get_token()
                else:
                    token = self.peek_token()
                    raise ParserError("while parsing a flow sequence", self.marks[-1],
                            "expected ',' or ']', but got %r" % token.id, token.start_mark)

            if self.check_token(KeyToken):
                token = self.peek_token()
                event = MappingStartEvent(None, None, True,
                        token.start_mark, token.end_mark,
                        flow_style=True)
                self.state = self.parse_flow_sequence_entry_mapping_key
                return event
            elif not self.check_token(FlowSequenceEndToken):
                self.states.append(self.parse_flow_sequence_entry)
                return self.parse_flow_node()
        token = self.get_token()
        event = SequenceEndEvent(token.start_mark, token.end_mark)
        self.state = self.states.pop()
        self.marks.pop()
        return event
|
    def parse_flow_sequence_entry_mapping_key(self):
        token = self.get_token()
        if not self.check_token(ValueToken,
                FlowEntryToken, FlowSequenceEndToken):
            self.states.append(self.parse_flow_sequence_entry_mapping_value)
            return self.parse_flow_node()
        else:
            self.state = self.parse_flow_sequence_entry_mapping_value
            return self.process_empty_scalar(token.end_mark)
|
    def parse_flow_sequence_entry_mapping_value(self):
        if self.check_token(ValueToken):
            token = self.get_token()
            if not self.check_token(FlowEntryToken, FlowSequenceEndToken):
                self.states.append(self.parse_flow_sequence_entry_mapping_end)
                return self.parse_flow_node()
            else:
                self.state = self.parse_flow_sequence_entry_mapping_end
                return self.process_empty_scalar(token.end_mark)
        else:
            self.state = self.parse_flow_sequence_entry_mapping_end
            token = self.peek_token()
            return self.process_empty_scalar(token.start_mark)
|
    def parse_flow_sequence_entry_mapping_end(self):
        self.state = self.parse_flow_sequence_entry
        token = self.peek_token()
        return MappingEndEvent(token.start_mark, token.start_mark)

    # flow_mapping      ::= FLOW-MAPPING-START
    #                       (flow_mapping_entry FLOW-ENTRY)*
    #                       flow_mapping_entry?
    #                       FLOW-MAPPING-END
    # flow_mapping_entry  ::= flow_node | KEY flow_node? (VALUE flow_node?)?
|
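    # Concretely, '{a: 1, b}' produces the pairs ('a', '1') and ('b', empty):
    # a lone 'b' carries no KEY or VALUE token, so it takes the `elif` branch
    # below and parse_flow_mapping_empty_value() supplies the missing value
    # as an empty scalar.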
    def parse_flow_mapping_first_key(self):
        token = self.get_token()
        self.marks.append(token.start_mark)
        return self.parse_flow_mapping_key(first=True)

    def parse_flow_mapping_key(self, first=False):
        if not self.check_token(FlowMappingEndToken):
            if not first:
                if self.check_token(FlowEntryToken):
                    self.get_token()
                else:
                    token = self.peek_token()
                    raise ParserError("while parsing a flow mapping", self.marks[-1],
                            "expected ',' or '}', but got %r" % token.id, token.start_mark)
            if self.check_token(KeyToken):
                token = self.get_token()
                if not self.check_token(ValueToken,
                        FlowEntryToken, FlowMappingEndToken):
                    self.states.append(self.parse_flow_mapping_value)
                    return self.parse_flow_node()
                else:
                    self.state = self.parse_flow_mapping_value
                    return self.process_empty_scalar(token.end_mark)
            elif not self.check_token(FlowMappingEndToken):
                self.states.append(self.parse_flow_mapping_empty_value)
                return self.parse_flow_node()
        token = self.get_token()
        event = MappingEndEvent(token.start_mark, token.end_mark)
        self.state = self.states.pop()
        self.marks.pop()
        return event
|
    def parse_flow_mapping_value(self):
        if self.check_token(ValueToken):
            token = self.get_token()
            if not self.check_token(FlowEntryToken, FlowMappingEndToken):
                self.states.append(self.parse_flow_mapping_key)
                return self.parse_flow_node()
            else:
                self.state = self.parse_flow_mapping_key
                return self.process_empty_scalar(token.end_mark)
        else:
            self.state = self.parse_flow_mapping_key
            token = self.peek_token()
            return self.process_empty_scalar(token.start_mark)
|
    def parse_flow_mapping_empty_value(self):
        self.state = self.parse_flow_mapping_key
        return self.process_empty_scalar(self.peek_token().start_mark)

    def process_empty_scalar(self, mark):
        return ScalarEvent(None, None, (True, False), '', mark, mark)