"""
    babel.messages.pofile
    ~~~~~~~~~~~~~~~~~~~~~

    Reading and writing of files in the ``gettext`` PO (portable object)
    format.

    :copyright: (c) 2013-2023 by the Babel Team.
    :license: BSD, see LICENSE for more details.
"""
from __future__ import annotations

import os
import re
from collections.abc import Iterable
from typing import TYPE_CHECKING

from babel.core import Locale
from babel.messages.catalog import Catalog, Message
from babel.util import _cmp, wraptext

if TYPE_CHECKING:
    from typing import IO, AnyStr

    from _typeshed import SupportsWrite
    from typing_extensions import Literal
def unescape(string: str) -> str:
    r"""Reverse `escape` the given string.

    The first and last character of `string` (the surrounding double quotes)
    are dropped, then the backslash sequences ``\\``, ``\"``, ``\t``, ``\r``
    and ``\n`` are replaced by the characters they stand for.

    >>> print(unescape('"Say:\\n  \\"hello, world!\\"\\n"'))
    Say:
      "hello, world!"
    <BLANKLINE>

    :param string: the string to unescape
    :return: the unescaped string, without the enclosing quotes
    """
    # Escapes that map to a control character; ``\\`` and ``\"`` simply
    # map to the escaped character itself.
    control_chars = {'n': '\n', 't': '\t', 'r': '\r'}

    def replace_escapes(match):
        escaped = match.group(1)
        return control_chars.get(escaped, escaped)

    # string[1:-1] strips the enclosing quote characters.
    return re.compile(r'\\([\\trn"])').sub(replace_escapes, string[1:-1])
def denormalize(string: str) -> str:
    r"""Reverse the normalization done by the `normalize` function.

    A multi-line value is split into its lines, a leading ``""`` line is
    discarded, and every remaining line is unescaped and concatenated.
    A single-line value is simply unescaped.

    >>> print(denormalize(r'''""
    ... "Say:\n"
    ... "  \"hello, world!\"\n"'''))
    Say:
      "hello, world!"
    <BLANKLINE>

    >>> print(denormalize(r'''""
    ... "Say:\n"
    ... "  \"Lorem ipsum dolor sit "
    ... "amet, consectetur adipisicing"
    ... " elit, \"\n"'''))
    Say:
      "Lorem ipsum dolor sit amet, consectetur adipisicing elit, "
    <BLANKLINE>

    :param string: the string to denormalize
    :return: the denormalized string
    """
    if '\n' not in string:
        return unescape(string)

    escaped_lines = string.splitlines()
    if string.startswith('""'):
        # Drop the empty first line that introduces a multi-line value.
        escaped_lines = escaped_lines[1:]
    return ''.join(map(unescape, escaped_lines))
class PoFileError(Exception):
    """Exception thrown by PoParser when an invalid po file is encountered."""

    def __init__(self, message: str, catalog: Catalog, line: str, lineno: int) -> None:
        """Build the error; its text is ``"<message> on <lineno>"``.

        :param message: description of the problem
        :param catalog: the catalog that was being filled when the error
                        occurred
        :param line: the offending line
        :param lineno: the line number passed by the caller; it is used
                       verbatim in the exception text
        """
        super().__init__(f'{message} on {lineno}')
        self.catalog = catalog
        self.line = line
        self.lineno = lineno
class _NormalizedString:
    """Accumulates the quoted string fragments that make up one PO value.

    Each fragment is stored stripped of surrounding whitespace, still in its
    escaped, quoted form; `denormalize` produces the actual text.
    """

    def __init__(self, *args: str) -> None:
        """Create the value from zero or more initial fragments."""
        self._strs: list[str] = []
        for arg in args:
            self.append(arg)

    def append(self, s: str) -> None:
        """Add a fragment, stripping leading and trailing whitespace."""
        self._strs.append(s.strip())

    def denormalize(self) -> str:
        """Return the concatenation of all unescaped fragments."""
        return ''.join(map(unescape, self._strs))

    def __bool__(self) -> bool:
        # True as soon as at least one fragment has been appended.
        return bool(self._strs)

    def __repr__(self) -> str:
        # No __str__ is defined, so str(self) yields this value as well.
        return os.linesep.join(self._strs)

    def __cmp__(self, other: object) -> int:
        """Three-way comparison on the string form of both operands.

        Any falsy `other` compares as smaller than `self`.
        """
        if not other:
            return 1

        return _cmp(str(self), str(other))

    def __gt__(self, other: object) -> bool:
        return self.__cmp__(other) > 0

    def __lt__(self, other: object) -> bool:
        return self.__cmp__(other) < 0

    def __ge__(self, other: object) -> bool:
        return self.__cmp__(other) >= 0

    def __le__(self, other: object) -> bool:
        return self.__cmp__(other) <= 0

    def __eq__(self, other: object) -> bool:
        return self.__cmp__(other) == 0

    def __ne__(self, other: object) -> bool:
        return self.__cmp__(other) != 0
class PoFileParser:
    """Support class to read messages from a ``gettext`` PO (portable object) file
    and add them to a `Catalog`

    See `read_po` for simple cases.
    """

    # Keywords that may start a (non-comment, non-continuation) line.
    _keywords = [
        'msgid',
        'msgstr',
        'msgctxt',
        'msgid_plural',
    ]

    def __init__(self, catalog: Catalog, ignore_obsolete: bool = False, abort_invalid: bool = False) -> None:
        """
        :param catalog: the catalog that parsed messages are added to
        :param ignore_obsolete: if true, obsolete (``#~``) messages are not
                                stored in ``catalog.obsolete``
        :param abort_invalid: if true, invalid input raises `PoFileError`
                              instead of printing a warning
        """
        self.catalog = catalog
        self.ignore_obsolete = ignore_obsolete
        self.counter = 0  # number of messages added so far
        self.offset = 0  # line index of the most recent ``msgid`` line
        self.abort_invalid = abort_invalid
        self._reset_message_state()

    def _reset_message_state(self) -> None:
        """Clear everything collected for the message currently being read."""
        self.messages = []
        self.translations = []
        self.locations = []
        self.flags = []
        self.user_comments = []
        self.auto_comments = []
        self.context = None
        self.obsolete = False
        self.in_msgid = False
        self.in_msgstr = False
        self.in_msgctxt = False

    def _add_message(self) -> None:
        """
        Add a message to the catalog based on the current parser state and
        clear the state ready to process the next message.
        """
        self.translations.sort()
        if len(self.messages) > 1:
            # msgid + msgid_plural: the id is a tuple.
            msgid = tuple(m.denormalize() for m in self.messages)
        else:
            msgid = self.messages[0].denormalize()
        if isinstance(msgid, (list, tuple)):
            string = ['' for _ in range(self.catalog.num_plurals)]
            for idx, translation in self.translations:
                if idx >= self.catalog.num_plurals:
                    self._invalid_pofile("", self.offset, "msg has more translations than num_plurals of catalog")
                    continue
                string[idx] = translation.denormalize()
            string = tuple(string)
        else:
            # NOTE(review): raises IndexError when a msgid was read but no
            # msgstr followed it (``self.translations`` is empty).
            string = self.translations[0][1].denormalize()
        msgctxt = self.context.denormalize() if self.context else None
        message = Message(msgid, string, list(self.locations), set(self.flags),
                          self.auto_comments, self.user_comments, lineno=self.offset + 1,
                          context=msgctxt)
        if self.obsolete:
            if not self.ignore_obsolete:
                self.catalog.obsolete[msgid] = message
        else:
            self.catalog[msgid] = message
        self.counter += 1
        self._reset_message_state()

    def _finish_current_message(self) -> None:
        """Flush the pending message, if any msgid has been collected."""
        if self.messages:
            self._add_message()

    def _process_message_line(self, lineno, line, obsolete=False) -> None:
        """Dispatch a non-comment line to the continuation or keyword handler."""
        if line.startswith('"'):
            self._process_string_continuation_line(line, lineno)
        else:
            self._process_keyword_line(lineno, line, obsolete)

    def _process_keyword_line(self, lineno, line, obsolete=False) -> None:
        """Handle a line that should start with one of `_keywords`.

        :param lineno: zero-based index of the line in the input
        :param line: the stripped line (without any ``#~`` prefix)
        :param obsolete: whether the line came from an obsolete (``#~``) entry
        """
        for keyword in self._keywords:
            try:
                if line.startswith(keyword) and line[len(keyword)] in [' ', '[']:
                    arg = line[len(keyword):]
                    break
            except IndexError:
                # Only reachable when the line consists of the keyword alone.
                # Report it once and stop; otherwise the loop would run on
                # and report the same line a second time below.
                self._invalid_pofile(line, lineno, "Keyword must be followed by a string")
                return
        else:
            self._invalid_pofile(line, lineno, "Start of line didn't match any expected keyword.")
            return

        if keyword in ['msgid', 'msgctxt']:
            # A new entry starts: flush the previous one first.
            self._finish_current_message()

        self.obsolete = obsolete

        # The line that has the msgid is stored as the offset of the msg
        # should this be the msgctxt if it has one?
        if keyword == 'msgid':
            self.offset = lineno

        if keyword in ['msgid', 'msgid_plural']:
            self.in_msgctxt = False
            self.in_msgid = True
            self.messages.append(_NormalizedString(arg))

        elif keyword == 'msgstr':
            self.in_msgid = False
            self.in_msgstr = True
            if arg.startswith('['):
                # Plural form: ``msgstr[N] "..."``.
                idx, msg = arg[1:].split(']', 1)
                self.translations.append([int(idx), _NormalizedString(msg)])
            else:
                self.translations.append([0, _NormalizedString(arg)])

        elif keyword == 'msgctxt':
            self.in_msgctxt = True
            self.context = _NormalizedString(arg)

    def _process_string_continuation_line(self, line, lineno) -> None:
        """Append a ``"..."`` continuation line to the value being read."""
        if self.in_msgid:
            s = self.messages[-1]
        elif self.in_msgstr:
            s = self.translations[-1][1]
        elif self.in_msgctxt:
            s = self.context
        else:
            self._invalid_pofile(line, lineno, "Got line starting with \" but not in msgid, msgstr or msgctxt")
            return
        s.append(line)

    def _process_comment(self, line) -> None:
        """Handle a ``#`` line: locations, flags, auto or user comment."""
        self._finish_current_message()

        if line[1:].startswith(':'):
            for location in line[2:].lstrip().split():
                pos = location.rfind(':')
                if pos >= 0:
                    try:
                        lineno = int(location[pos + 1:])
                    except ValueError:
                        # Text after the last colon is not a number: the
                        # location is skipped entirely.
                        continue
                    self.locations.append((location[:pos], lineno))
                else:
                    self.locations.append((location, None))
        elif line[1:].startswith(','):
            for flag in line[2:].lstrip().split(','):
                self.flags.append(flag.strip())
        elif line[1:].startswith('.'):
            # These are called auto-comments
            comment = line[2:].strip()
            if comment:  # Just check that we're not adding empty comments
                self.auto_comments.append(comment)
        else:
            # These are called user comments
            self.user_comments.append(line[1:].strip())

    def parse(self, fileobj: IO[AnyStr]) -> None:
        """
        Reads from the file-like object `fileobj` and adds any po file
        units found in it to the `Catalog` supplied to the constructor.
        """
        for lineno, line in enumerate(fileobj):
            line = line.strip()
            if not isinstance(line, str):
                line = line.decode(self.catalog.charset)
            if not line:
                continue
            if line.startswith('#'):
                if line[1:].startswith('~'):
                    # Obsolete entry: parse what follows the ``#~`` marker.
                    self._process_message_line(lineno, line[2:].lstrip(), obsolete=True)
                else:
                    self._process_comment(line)
            else:
                self._process_message_line(lineno, line)

        self._finish_current_message()

        # No actual messages found, but there was some info in comments, from which
        # we'll construct an empty header message
        if not self.counter and (self.flags or self.user_comments or self.auto_comments):
            self.messages.append(_NormalizedString('""'))
            self.translations.append([0, _NormalizedString('""')])
            self._add_message()

    def _invalid_pofile(self, line, lineno, msg) -> None:
        """Report invalid input: raise if `abort_invalid`, else print a warning.

        :raises PoFileError: when the parser was created with
                             ``abort_invalid=True``
        """
        assert isinstance(line, str)
        if self.abort_invalid:
            raise PoFileError(msg, self.catalog, line, lineno)
        print("WARNING:", msg)
        print(f"WARNING: Problem on line {lineno + 1}: {line!r}")
def read_po(
    fileobj: IO[AnyStr],
    locale: str | Locale | None = None,
    domain: str | None = None,
    ignore_obsolete: bool = False,
    charset: str | None = None,
    abort_invalid: bool = False,
) -> Catalog:
    """Read messages from a ``gettext`` PO (portable object) file from the given
    file-like object and return a `Catalog`.

    >>> from datetime import datetime
    >>> from io import StringIO
    >>> buf = StringIO('''
    ... #: main.py:1
    ... #, fuzzy, python-format
    ... msgid "foo %(name)s"
    ... msgstr "quux %(name)s"
    ...
    ... # A user comment
    ... #. An auto comment
    ... #: main.py:3
    ... msgid "bar"
    ... msgid_plural "baz"
    ... msgstr[0] "bar"
    ... msgstr[1] "baaz"
    ... ''')
    >>> catalog = read_po(buf)
    >>> catalog.revision_date = datetime(2007, 4, 1)

    >>> for message in catalog:
    ...     if message.id:
    ...         print((message.id, message.string))
    ...         print(' ', (message.locations, sorted(list(message.flags))))
    ...         print(' ', (message.user_comments, message.auto_comments))
    ('foo %(name)s', 'quux %(name)s')
      ([('main.py', 1)], ['fuzzy', 'python-format'])
      ([], [])
    (('bar', 'baz'), ('bar', 'baaz'))
      ([('main.py', 3)], [])
      (['A user comment'], ['An auto comment'])

    .. versionadded:: 1.0
       Added support for explicit charset argument.

    :param fileobj: the file-like object to read the PO file from
    :param locale: the locale identifier or `Locale` object, or `None`
                   if the catalog is not bound to a locale (which basically
                   means it's a template)
    :param domain: the message domain
    :param ignore_obsolete: whether to ignore obsolete messages in the input
    :param charset: the character set of the catalog.
    :param abort_invalid: abort read if po file is invalid
    :return: the catalog populated from `fileobj`
    """
    catalog = Catalog(locale=locale, domain=domain, charset=charset)
    parser = PoFileParser(catalog, ignore_obsolete, abort_invalid=abort_invalid)
    parser.parse(fileobj)
    return catalog
# Separator pattern used when wrapping long lines.  The whole pattern is one
# capturing group, so ``WORD_SEP.split`` keeps the separators in its result.
WORD_SEP = re.compile('('
                      r'\s+|'                                 # any whitespace
                      r'[^\s\w]*\w+[a-zA-Z]-(?=\w+[a-zA-Z])|'  # hyphenated words
                      r'(?<=[\w\!\"\'\&\.\,\?])-{2,}(?=\w)'   # em-dash
                      ')')
def escape(string: str) -> str:
    r"""Escape the given string so that it can be included in double-quoted
    strings in ``PO`` files.

    >>> escape('''Say:
    ...   "hello, world!"
    ... ''')
    '"Say:\\n  \\"hello, world!\\"\\n"'

    :param string: the string to escape
    :return: the escaped string, wrapped in double quotes
    """
    # Backslashes must be doubled first so that the backslashes introduced
    # by the later replacements are not escaped a second time.
    escaped = (
        string.replace('\\', '\\\\')
        .replace('\t', '\\t')
        .replace('\r', '\\r')
        .replace('\n', '\\n')
        .replace('\"', '\\"')
    )
    return '"%s"' % escaped
def normalize(string: str, prefix: str = '', width: int = 76) -> str:
    r"""Convert a string into a format that is appropriate for .po files.

    >>> print(normalize('''Say:
    ...   "hello, world!"
    ... ''', width=None))
    ""
    "Say:\n"
    "  \"hello, world!\"\n"

    >>> print(normalize('''Say:
    ...   "Lorem ipsum dolor sit amet, consectetur adipisicing elit, "
    ... ''', width=32))
    ""
    "Say:\n"
    "  \"Lorem ipsum dolor sit "
    "amet, consectetur adipisicing"
    " elit, \"\n"

    :param string: the string to normalize
    :param prefix: a string that should be prepended to every line
    :param width: the maximum line width; use `None`, 0, or a negative number
                  to completely disable line wrapping
    :return: the escaped value; a single-line value is returned as one quoted
             string, a multi-line value starts with an empty ``""`` line
    """
    if width and width > 0:
        prefixlen = len(prefix)
        lines = []
        for line in string.splitlines(True):
            if len(escape(line)) + prefixlen > width:
                # Too long: split into chunks and greedily refill lines.
                # The list is reversed so chunks can be popped off the end.
                chunks = WORD_SEP.split(line)
                chunks.reverse()
                while chunks:
                    buf = []
                    size = 2  # accounts for the two enclosing quotes
                    while chunks:
                        length = len(escape(chunks[-1])) - 2 + prefixlen
                        if size + length < width:
                            buf.append(chunks.pop())
                            size += length
                        else:
                            if not buf:
                                # handle long chunks by putting them on a
                                # separate line
                                buf.append(chunks.pop())
                            break
                    lines.append(''.join(buf))
            else:
                lines.append(line)
    else:
        lines = string.splitlines(True)

    if len(lines) <= 1:
        return escape(string)

    # Remove empty trailing line
    if lines and not lines[-1]:
        del lines[-1]
        lines[-1] += '\n'
    return '""\n' + '\n'.join([(prefix + escape(line)) for line in lines])
def write_po(
    fileobj: SupportsWrite[bytes],
    catalog: Catalog,
    width: int = 76,
    no_location: bool = False,
    omit_header: bool = False,
    sort_output: bool = False,
    sort_by_file: bool = False,
    ignore_obsolete: bool = False,
    include_previous: bool = False,
    include_lineno: bool = True,
) -> None:
    r"""Write a ``gettext`` PO (portable object) template file for a given
    message catalog to the provided file-like object.

    >>> catalog = Catalog()
    >>> catalog.add('foo %(name)s', locations=[('main.py', 1)],
    ...             flags=('fuzzy',))
    <Message...>
    >>> catalog.add(('bar', 'baz'), locations=[('main.py', 3)])
    <Message...>
    >>> from io import BytesIO
    >>> buf = BytesIO()
    >>> write_po(buf, catalog, omit_header=True)
    >>> print(buf.getvalue().decode("utf8"))
    #: main.py:1
    #, fuzzy, python-format
    msgid "foo %(name)s"
    msgstr ""
    <BLANKLINE>
    #: main.py:3
    msgid "bar"
    msgid_plural "baz"
    msgstr[0] ""
    msgstr[1] ""
    <BLANKLINE>
    <BLANKLINE>

    :param fileobj: the file-like object to write to
    :param catalog: the `Catalog` instance
    :param width: the maximum line width for the generated output; use `None`,
                  0, or a negative number to completely disable line wrapping
    :param no_location: do not emit a location comment for every message
    :param omit_header: do not include the ``msgid ""`` entry at the top of the
                        output
    :param sort_output: whether to sort the messages in the output by msgid
    :param sort_by_file: whether to sort the messages in the output by their
                         locations
    :param ignore_obsolete: whether to ignore obsolete messages and not include
                            them in the output; by default they are included as
                            comments
    :param include_previous: include the old msgid as a comment when
                             updating the catalog
    :param include_lineno: include line number in the location comment
    """
    def _normalize(key, prefix=''):
        return normalize(key, prefix=prefix, width=width)

    def _write(text):
        # Text is encoded with the catalog charset; characters that cannot be
        # encoded are written as backslash escapes.
        if isinstance(text, str):
            text = text.encode(catalog.charset, 'backslashreplace')
        fileobj.write(text)

    def _write_comment(comment, prefix=''):
        # xgettext always wraps comments even if --no-wrap is passed;
        # provide the same behaviour
        _width = width if width and width > 0 else 76
        for line in wraptext(comment, _width):
            _write(f"#{prefix} {line.strip()}\n")

    def _write_message(message, prefix=''):
        if isinstance(message.id, (list, tuple)):
            # Pluralizable message: msgid, msgid_plural and one msgstr[N]
            # per plural form of the catalog.
            if message.context:
                _write(f"{prefix}msgctxt {_normalize(message.context, prefix)}\n")
            _write(f"{prefix}msgid {_normalize(message.id[0], prefix)}\n")
            _write(f"{prefix}msgid_plural {_normalize(message.id[1], prefix)}\n")

            for idx in range(catalog.num_plurals):
                try:
                    string = message.string[idx]
                except IndexError:
                    string = ''
                _write(f"{prefix}msgstr[{idx:d}] {_normalize(string, prefix)}\n")
        else:
            if message.context:
                _write(f"{prefix}msgctxt {_normalize(message.context, prefix)}\n")
            _write(f"{prefix}msgid {_normalize(message.id, prefix)}\n")
            _write(f"{prefix}msgstr {_normalize(message.string or '', prefix)}\n")

    # ``sort_output`` takes precedence over ``sort_by_file``.
    sort_by = None
    if sort_output:
        sort_by = "message"
    elif sort_by_file:
        sort_by = "location"

    for message in _sort_messages(catalog, sort_by=sort_by):
        if not message.id:  # This is the header "message"
            if omit_header:
                continue
            comment_header = catalog.header_comment
            if width and width > 0:
                lines = []
                for line in comment_header.splitlines():
                    lines += wraptext(line, width=width,
                                      subsequent_indent='# ')
                comment_header = '\n'.join(lines)
            _write(f"{comment_header}\n")

        for comment in message.user_comments:
            _write_comment(comment)
        for comment in message.auto_comments:
            _write_comment(comment, prefix='.')

        if not no_location:
            locs = []

            # sort locations by filename and lineno.
            # if there's no <int> as lineno, use `-1`.
            # if no sorting possible, leave unsorted.
            # (see issue #606)
            try:
                locations = sorted(
                    message.locations,
                    key=lambda x: (x[0], x[1] if isinstance(x[1], int) and x[1] else -1),
                )
            except TypeError:  # e.g. "TypeError: unorderable types: NoneType() < int()"
                locations = message.locations

            for filename, lineno in locations:
                location = filename.replace(os.sep, '/')
                if lineno and include_lineno:
                    location = f"{location}:{lineno:d}"
                if location not in locs:
                    locs.append(location)
            _write_comment(' '.join(locs), prefix=':')
        if message.flags:
            _write(f"#{', '.join(['', *sorted(message.flags)])}\n")

        if message.previous_id and include_previous:
            _write_comment(
                f'msgid {_normalize(message.previous_id[0])}',
                prefix='|',
            )
            if len(message.previous_id) > 1:
                _write_comment('msgid_plural %s' % _normalize(
                    message.previous_id[1]
                ), prefix='|')

        _write_message(message)
        _write('\n')

    if not ignore_obsolete:
        for message in _sort_messages(
            catalog.obsolete.values(),
            sort_by=sort_by
        ):
            for comment in message.user_comments:
                _write_comment(comment)
            # Obsolete entries are written as ``#~`` comments.
            _write_message(message, prefix='#~ ')
            _write('\n')
def _sort_messages(
    messages: Iterable[Message],
    sort_by: Literal["message", "location"] | None,
) -> list[Message]:
    """
    Sort the given message iterable by the given criteria.

    Always returns a list.  The input iterable itself is not modified.

    :param messages: An iterable of Messages.
    :param sort_by: Sort by which criteria? Options are `message` and `location`.
                    Any other value (including `None`) keeps the input order.
    :return: list[Message]
    """
    messages = list(messages)
    if sort_by == "message":
        messages.sort()
    elif sort_by == "location":
        messages.sort(key=lambda m: m.locations)
    return messages