Spaces:
Running
Running
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license | |
# Copyright (C) 2001-2017 Nominum, Inc. | |
# | |
# Permission to use, copy, modify, and distribute this software and its | |
# documentation for any purpose with or without fee is hereby granted, | |
# provided that the above copyright notice and this permission notice | |
# appear in all copies. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES | |
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | |
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR | |
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES | |
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN | |
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT | |
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | |
"""DNS rdata.""" | |
import base64 | |
import binascii | |
import inspect | |
import io | |
import itertools | |
import random | |
from importlib import import_module | |
from typing import Any, Dict, Optional, Tuple, Union | |
import dns.exception | |
import dns.immutable | |
import dns.ipv4 | |
import dns.ipv6 | |
import dns.name | |
import dns.rdataclass | |
import dns.rdatatype | |
import dns.tokenizer | |
import dns.ttl | |
import dns.wire | |
# Default chunk length used by the text-chunking helpers in this module.
_chunksize = 32

# We currently allow comparisons for rdata with relative names for backwards
# compatibility, but in the future we will not, as these kinds of comparisons
# can lead to subtle bugs if code is not carefully written.
#
# This switch allows the future behavior to be turned on so code can be
# tested with it.
_allow_relative_comparisons = True
class NoRelativeRdataOrdering(dns.exception.DNSException):
    """An attempt was made to do an ordered comparison of one or more
    rdata with relative names.  The only reliable way of sorting rdata
    is to use non-relativized rdata.
    """
def _wordbreak(data, chunksize=_chunksize, separator=b" "):
    """Break a binary string into chunks and return the result as text.

    *data*, a ``bytes``-like value, is cut into pieces of at most
    *chunksize* bytes which are joined with *separator* and decoded to a
    ``str``.  If *chunksize* is zero (or otherwise false), *data* is
    decoded unchanged.
    """
    if not chunksize:
        return data.decode()
    chunks = [data[i : i + chunksize] for i in range(0, len(data), chunksize)]
    return separator.join(chunks).decode()
# pylint: disable=unused-argument


def _hexify(data, chunksize=_chunksize, separator=b" ", **kw):
    """Convert a binary string into its hex encoding, broken up into chunks
    of chunksize characters separated by a separator.

    Extra keyword arguments are accepted and ignored, so callers can pass
    their own ``**kw`` straight through.
    """
    return _wordbreak(binascii.hexlify(data), chunksize, separator)
def _base64ify(data, chunksize=_chunksize, separator=b" ", **kw):
    """Convert a binary string into its base64 encoding, broken up into chunks
    of chunksize characters separated by a separator.

    Extra keyword arguments are accepted and ignored, so callers can pass
    their own ``**kw`` straight through.
    """
    return _wordbreak(base64.b64encode(data), chunksize, separator)


# pylint: enable=unused-argument
# Byte values that must be backslash-escaped inside a quoted string.
__escaped = b'"\\'


def _escapify(qstring):
    """Escape the characters in a quoted string which need it.

    *qstring* may be a ``str`` (encoded to bytes first), ``bytes`` or
    ``bytearray``.  Double quotes and backslashes are prefixed with a
    backslash, printable ASCII is copied as-is, and every other byte is
    written as a backslash followed by its three-digit decimal value.

    Returns a ``str``.
    """
    if isinstance(qstring, str):
        qstring = qstring.encode()
    if not isinstance(qstring, bytearray):
        qstring = bytearray(qstring)
    # Collect the pieces and join once rather than growing a string with +=.
    pieces = []
    for c in qstring:
        if c in __escaped:
            pieces.append("\\" + chr(c))
        elif 0x20 <= c < 0x7F:
            pieces.append(chr(c))
        else:
            pieces.append("\\%03d" % c)
    return "".join(pieces)
def _truncate_bitmap(what):
    """Determine the index of greatest byte that isn't all zeros, and
    return the bitmap that contains all the bytes less than that index.

    If every byte is zero, the first byte alone is returned (or an empty
    slice when *what* is empty).
    """
    # Scan from the end so the first non-zero byte found is the last one.
    for i in reversed(range(len(what))):
        if what[i] != 0:
            return what[0 : i + 1]
    return what[0:1]
# So we don't have to edit all the rdata classes...
_constify = dns.immutable.constify
class Rdata:
    """Base class for all DNS rdata types."""

    # NOTE(review): post-construction attribute writes below go through
    # object.__setattr__(), which suggests instances are meant to be
    # immutable (presumably via a class decorator missing from this copy of
    # the file) -- confirm against the upstream source before relying on it.

    __slots__ = ["rdclass", "rdtype", "rdcomment"]

    def __init__(self, rdclass, rdtype):
        """Initialize an rdata.

        *rdclass*, an ``int`` is the rdataclass of the Rdata.

        *rdtype*, an ``int`` is the rdatatype of the Rdata.
        """
        self.rdclass = self._as_rdataclass(rdclass)
        self.rdtype = self._as_rdatatype(rdtype)
        self.rdcomment = None

    def _get_all_slots(self):
        """Return an iterator over the slot names declared by every class in
        this instance's MRO (classes without ``__slots__`` contribute none).
        """
        return itertools.chain.from_iterable(
            getattr(cls, "__slots__", []) for cls in self.__class__.__mro__
        )

    def __getstate__(self):
        # We used to try to do a tuple of all slots here, but it
        # doesn't work as self._all_slots isn't available at
        # __setstate__() time.  Before that we tried to store a tuple
        # of __slots__, but that didn't work as it didn't store the
        # slots defined by ancestors.  This older way didn't fail
        # outright, but ended up with partially broken objects, e.g.
        # if you unpickled an A RR it wouldn't have rdclass and rdtype
        # attributes, and would compare badly.
        return {slot: getattr(self, slot) for slot in self._get_all_slots()}

    def __setstate__(self, state):
        # Bypass any attribute-assignment restrictions by writing the slots
        # through object.__setattr__().
        for slot, val in state.items():
            object.__setattr__(self, slot, val)
        if not hasattr(self, "rdcomment"):
            # Pickled rdata from 2.0.x might not have a rdcomment, so add
            # it if needed.
            object.__setattr__(self, "rdcomment", None)

    def covers(self) -> dns.rdatatype.RdataType:
        """Return the type a Rdata covers.

        DNS SIG/RRSIG rdatas apply to a specific type; this type is
        returned by the covers() function.  If the rdata type is not
        SIG or RRSIG, dns.rdatatype.NONE is returned.  This is useful when
        creating rdatasets, allowing the rdataset to contain only RRSIGs
        of a particular type, e.g. RRSIG(NS).

        Returns a ``dns.rdatatype.RdataType``.
        """
        return dns.rdatatype.NONE

    def extended_rdatatype(self) -> int:
        """Return a 32-bit type value, the least significant 16 bits of
        which are the ordinary DNS type, and the upper 16 bits of which are
        the "covered" type, if any.

        Returns an ``int``.
        """
        return (self.covers() << 16) | self.rdtype

    def to_text(
        self,
        origin: Optional[dns.name.Name] = None,
        relativize: bool = True,
        **kw: Dict[str, Any],
    ) -> str:
        """Convert an rdata to text format.

        This base implementation always raises ``NotImplementedError``.

        Returns a ``str``.
        """
        raise NotImplementedError  # pragma: no cover

    def _to_wire(
        self,
        file: Optional[Any],
        compress: Optional[dns.name.CompressType] = None,
        origin: Optional[dns.name.Name] = None,
        canonicalize: bool = False,
    ) -> bytes:
        """Write the wire form of the rdata to *file*.

        This base implementation always raises ``NotImplementedError``.
        """
        raise NotImplementedError  # pragma: no cover

    def to_wire(
        self,
        file: Optional[Any] = None,
        compress: Optional[dns.name.CompressType] = None,
        origin: Optional[dns.name.Name] = None,
        canonicalize: bool = False,
    ) -> Optional[bytes]:
        """Convert an rdata to wire format.

        If *file* is supplied, the wire form is written to it and the
        result of ``_to_wire()`` is returned.  Otherwise the wire form is
        collected in memory and returned.

        Returns a ``bytes`` or ``None``.
        """
        if file:
            return self._to_wire(file, compress, origin, canonicalize)
        buffer = io.BytesIO()
        self._to_wire(buffer, compress, origin, canonicalize)
        return buffer.getvalue()

    def to_generic(
        self, origin: Optional[dns.name.Name] = None
    ) -> "dns.rdata.GenericRdata":
        """Creates a dns.rdata.GenericRdata equivalent of this rdata.

        Returns a ``dns.rdata.GenericRdata``.
        """
        return dns.rdata.GenericRdata(
            self.rdclass, self.rdtype, self.to_wire(origin=origin)
        )

    def to_digestable(self, origin: Optional[dns.name.Name] = None) -> bytes:
        """Convert rdata to a format suitable for digesting in hashes.  This
        is also the DNSSEC canonical form.

        Returns a ``bytes``.
        """
        return self.to_wire(origin=origin, canonicalize=True)

    def __repr__(self):
        covers = self.covers()
        if covers == dns.rdatatype.NONE:
            ctext = ""
        else:
            ctext = "(" + dns.rdatatype.to_text(covers) + ")"
        return (
            "<DNS "
            + dns.rdataclass.to_text(self.rdclass)
            + " "
            + dns.rdatatype.to_text(self.rdtype)
            + ctext
            + " rdata: "
            + str(self)
            + ">"
        )

    def __str__(self):
        return self.to_text()

    def _cmp(self, other):
        """Compare an rdata with another rdata of the same rdtype and
        rdclass.

        For rdata with only absolute names:
            Return < 0 if self < other in the DNSSEC ordering, 0 if self
            == other, and > 0 if self > other.
        For rdata with at least one relative names:
            The rdata sorts before any rdata with only absolute names.
            When compared with another relative rdata, all names are
            made absolute as if they were relative to the root, as the
            proper origin is not available.  While this creates a stable
            ordering, it is NOT guaranteed to be the DNSSEC ordering.
            In the future, all ordering comparisons for rdata with
            relative names will be disallowed.

        Raises ``NoRelativeRdataOrdering`` if either rdata has relative
        names and relative comparisons are switched off.
        """
        try:
            our = self.to_digestable()
            our_relative = False
        except dns.name.NeedAbsoluteNameOrOrigin:
            if _allow_relative_comparisons:
                our = self.to_digestable(dns.name.root)
            # Always record relativity, even when comparisons are disallowed,
            # so the check below can raise NoRelativeRdataOrdering.
            our_relative = True
        try:
            their = other.to_digestable()
            their_relative = False
        except dns.name.NeedAbsoluteNameOrOrigin:
            if _allow_relative_comparisons:
                their = other.to_digestable(dns.name.root)
            their_relative = True
        if _allow_relative_comparisons:
            if our_relative != their_relative:
                # For the purpose of comparison, all rdata with at least one
                # relative name is less than an rdata with only absolute names.
                if our_relative:
                    return -1
                else:
                    return 1
        elif our_relative or their_relative:
            raise NoRelativeRdataOrdering
        if our == their:
            return 0
        elif our > their:
            return 1
        else:
            return -1

    def __eq__(self, other):
        if not isinstance(other, Rdata):
            return False
        if self.rdclass != other.rdclass or self.rdtype != other.rdtype:
            return False
        our_relative = False
        their_relative = False
        try:
            our = self.to_digestable()
        except dns.name.NeedAbsoluteNameOrOrigin:
            our = self.to_digestable(dns.name.root)
            our_relative = True
        try:
            their = other.to_digestable()
        except dns.name.NeedAbsoluteNameOrOrigin:
            their = other.to_digestable(dns.name.root)
            their_relative = True
        # A relative rdata never equals an absolute one, even if the
        # root-anchored digests happen to match.
        if our_relative != their_relative:
            return False
        return our == their

    def __ne__(self, other):
        if not isinstance(other, Rdata):
            return True
        if self.rdclass != other.rdclass or self.rdtype != other.rdtype:
            return True
        return not self.__eq__(other)

    # The ordering operators only apply between rdata of the same class and
    # type; anything else yields NotImplemented.

    def __lt__(self, other):
        if (
            not isinstance(other, Rdata)
            or self.rdclass != other.rdclass
            or self.rdtype != other.rdtype
        ):
            return NotImplemented
        return self._cmp(other) < 0

    def __le__(self, other):
        if (
            not isinstance(other, Rdata)
            or self.rdclass != other.rdclass
            or self.rdtype != other.rdtype
        ):
            return NotImplemented
        return self._cmp(other) <= 0

    def __ge__(self, other):
        if (
            not isinstance(other, Rdata)
            or self.rdclass != other.rdclass
            or self.rdtype != other.rdtype
        ):
            return NotImplemented
        return self._cmp(other) >= 0

    def __gt__(self, other):
        if (
            not isinstance(other, Rdata)
            or self.rdclass != other.rdclass
            or self.rdtype != other.rdtype
        ):
            return NotImplemented
        return self._cmp(other) > 0

    def __hash__(self):
        # Digest relative to the root so rdata with relative names hash too.
        return hash(self.to_digestable(dns.name.root))

    @classmethod
    def from_text(
        cls,
        rdclass: dns.rdataclass.RdataClass,
        rdtype: dns.rdatatype.RdataType,
        tok: dns.tokenizer.Tokenizer,
        origin: Optional[dns.name.Name] = None,
        relativize: bool = True,
        relativize_to: Optional[dns.name.Name] = None,
    ) -> "Rdata":
        """Build an rdata of this class from tokenized text.

        This base implementation always raises ``NotImplementedError``.
        """
        raise NotImplementedError  # pragma: no cover

    @classmethod
    def from_wire_parser(
        cls,
        rdclass: dns.rdataclass.RdataClass,
        rdtype: dns.rdatatype.RdataType,
        parser: dns.wire.Parser,
        origin: Optional[dns.name.Name] = None,
    ) -> "Rdata":
        """Build an rdata of this class from a wire-format parser.

        This base implementation always raises ``NotImplementedError``.
        """
        raise NotImplementedError  # pragma: no cover

    def replace(self, **kwargs: Any) -> "Rdata":
        """
        Create a new Rdata instance based on the instance replace was
        invoked on. It is possible to pass different parameters to
        override the corresponding properties of the base Rdata.

        Any field specific to the Rdata type can be replaced, but the
        *rdtype* and *rdclass* fields cannot.

        Raises ``AttributeError`` if a keyword is not a constructor
        parameter, or names *rdclass* or *rdtype*.

        Returns an instance of the same Rdata subclass as *self*.
        """
        # Get the constructor parameters.
        parameters = inspect.signature(self.__init__).parameters  # type: ignore

        # Ensure that all of the arguments correspond to valid fields.
        # Don't allow rdclass or rdtype to be changed, though.
        for key in kwargs:
            if key == "rdcomment":
                continue
            if key not in parameters:
                raise AttributeError(
                    "'{}' object has no attribute '{}'".format(
                        self.__class__.__name__, key
                    )
                )
            if key in ("rdclass", "rdtype"):
                raise AttributeError(
                    "Cannot overwrite '{}' attribute '{}'".format(
                        self.__class__.__name__, key
                    )
                )

        # Construct the parameter list.  For each field, use the value in
        # kwargs if present, and the current value otherwise.
        args = (kwargs.get(key, getattr(self, key)) for key in parameters)

        # Create, validate, and return the new object.
        rd = self.__class__(*args)
        # The comment is not set in the constructor, so give it special
        # handling.
        rdcomment = kwargs.get("rdcomment", self.rdcomment)
        if rdcomment is not None:
            object.__setattr__(rd, "rdcomment", rdcomment)
        return rd

    # Type checking and conversion helpers.  These are class methods as
    # they don't touch object state and may be useful to others.

    @classmethod
    def _as_rdataclass(cls, value):
        """Convert *value* with ``dns.rdataclass.RdataClass.make()``."""
        return dns.rdataclass.RdataClass.make(value)

    @classmethod
    def _as_rdatatype(cls, value):
        """Convert *value* with ``dns.rdatatype.RdataType.make()``."""
        return dns.rdatatype.RdataType.make(value)

    @classmethod
    def _as_bytes(
        cls,
        value: Any,
        encode: bool = False,
        max_length: Optional[int] = None,
        empty_ok: bool = True,
    ) -> bytes:
        """Validate *value* as ``bytes``.

        A ``str`` is accepted (and encoded) only when *encode* is true; a
        ``bytearray`` is copied to ``bytes``.  Raises ``ValueError`` for any
        other type, if the result is longer than *max_length*, or if it is
        empty while *empty_ok* is false.
        """
        if encode and isinstance(value, str):
            bvalue = value.encode()
        elif isinstance(value, bytearray):
            bvalue = bytes(value)
        elif isinstance(value, bytes):
            bvalue = value
        else:
            raise ValueError("not bytes")
        if max_length is not None and len(bvalue) > max_length:
            raise ValueError("too long")
        if not empty_ok and len(bvalue) == 0:
            raise ValueError("empty bytes not allowed")
        return bvalue

    @classmethod
    def _as_name(cls, value):
        """Validate *value* as a ``dns.name.Name``, parsing a ``str``."""
        # Note that proper name conversion (e.g. with origin and IDNA
        # awareness) is expected to be done via from_text.  This is just
        # a simple thing for people invoking the constructor directly.
        if isinstance(value, str):
            return dns.name.from_text(value)
        elif not isinstance(value, dns.name.Name):
            raise ValueError("not a name")
        return value

    @classmethod
    def _as_uint8(cls, value):
        """Validate *value* as an ``int`` in the range 0..255."""
        if not isinstance(value, int):
            raise ValueError("not an integer")
        if value < 0 or value > 255:
            raise ValueError("not a uint8")
        return value

    @classmethod
    def _as_uint16(cls, value):
        """Validate *value* as an ``int`` in the range 0..65535."""
        if not isinstance(value, int):
            raise ValueError("not an integer")
        if value < 0 or value > 65535:
            raise ValueError("not a uint16")
        return value

    @classmethod
    def _as_uint32(cls, value):
        """Validate *value* as an ``int`` in the range 0..4294967295."""
        if not isinstance(value, int):
            raise ValueError("not an integer")
        if value < 0 or value > 4294967295:
            raise ValueError("not a uint32")
        return value

    @classmethod
    def _as_uint48(cls, value):
        """Validate *value* as an ``int`` in the range 0..281474976710655."""
        if not isinstance(value, int):
            raise ValueError("not an integer")
        if value < 0 or value > 281474976710655:
            raise ValueError("not a uint48")
        return value

    @classmethod
    def _as_int(cls, value, low=None, high=None):
        """Validate *value* as an ``int`` within the optional inclusive
        bounds *low* and *high*.
        """
        if not isinstance(value, int):
            raise ValueError("not an integer")
        if low is not None and value < low:
            raise ValueError("value too small")
        if high is not None and value > high:
            raise ValueError("value too large")
        return value

    @classmethod
    def _as_ipv4_address(cls, value):
        """Convert a ``str`` or ``bytes`` IPv4 address via ``dns.ipv4``."""
        if isinstance(value, str):
            return dns.ipv4.canonicalize(value)
        elif isinstance(value, bytes):
            return dns.ipv4.inet_ntoa(value)
        else:
            raise ValueError("not an IPv4 address")

    @classmethod
    def _as_ipv6_address(cls, value):
        """Convert a ``str`` or ``bytes`` IPv6 address via ``dns.ipv6``."""
        if isinstance(value, str):
            return dns.ipv6.canonicalize(value)
        elif isinstance(value, bytes):
            return dns.ipv6.inet_ntoa(value)
        else:
            raise ValueError("not an IPv6 address")

    @classmethod
    def _as_bool(cls, value):
        """Validate that *value* is a ``bool``."""
        if isinstance(value, bool):
            return value
        else:
            raise ValueError("not a boolean")

    @classmethod
    def _as_ttl(cls, value):
        """Validate *value* as a TTL: an ``int`` between 0 and
        ``dns.ttl.MAX_TTL``, or a ``str`` parsed by ``dns.ttl.from_text()``.
        """
        if isinstance(value, int):
            return cls._as_int(value, 0, dns.ttl.MAX_TTL)
        elif isinstance(value, str):
            return dns.ttl.from_text(value)
        else:
            raise ValueError("not a TTL")

    @classmethod
    def _as_tuple(cls, value, as_value):
        """Return a tuple of items converted by the callable *as_value*."""
        try:
            # For user convenience, if value is a singleton of the list
            # element type, wrap it in a tuple.
            return (as_value(value),)
        except Exception:
            # Otherwise, check each element of the iterable *value*
            # against *as_value*.
            return tuple(as_value(v) for v in value)

    # Processing order

    @classmethod
    def _processing_order(cls, iterable):
        """Return the items of *iterable* as a randomly shuffled list."""
        items = list(iterable)
        random.shuffle(items)
        return items
class GenericRdata(Rdata):
    """Generic Rdata Class

    This class is used for rdata types for which we have no better
    implementation.  It implements the DNS "unknown RRs" scheme.
    """

    # NOTE(review): upstream presumably marks this class immutable with a
    # decorator that is missing from this copy of the file -- confirm.

    __slots__ = ["data"]

    def __init__(self, rdclass, rdtype, data):
        """Initialize a generic rdata holding the raw wire bytes *data*."""
        super().__init__(rdclass, rdtype)
        self.data = data

    def to_text(
        self,
        origin: Optional[dns.name.Name] = None,
        relativize: bool = True,
        **kw: Dict[str, Any],
    ) -> str:
        """Render the rdata as ``\\# <length> <hex data>``.

        Keyword arguments are passed through to ``_hexify()``.
        """
        return r"\# %d " % len(self.data) + _hexify(self.data, **kw)

    @classmethod
    def from_text(
        cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
    ):
        """Parse the generic ``\\# <length> <hex data>`` syntax from *tok*.

        Raises ``dns.exception.SyntaxError`` if the text does not start with
        ``\\#`` or the decoded data does not match the stated length.
        """
        token = tok.get()
        if not token.is_identifier() or token.value != r"\#":
            raise dns.exception.SyntaxError(r"generic rdata does not start with \#")
        length = tok.get_int()
        hex_text = tok.concatenate_remaining_identifiers(True).encode()
        data = binascii.unhexlify(hex_text)
        if len(data) != length:
            raise dns.exception.SyntaxError("generic rdata hex data has wrong length")
        return cls(rdclass, rdtype, data)

    def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
        """Write the stored bytes to *file* unchanged."""
        file.write(self.data)

    @classmethod
    def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
        """Build a generic rdata from everything remaining in *parser*."""
        return cls(rdclass, rdtype, parser.get_remaining())
# Cache of rdata implementation classes, keyed by (rdclass, rdtype).
_rdata_classes: Dict[Tuple[dns.rdataclass.RdataClass, dns.rdatatype.RdataType], Any] = (
    {}
)
# Package under which per-class, per-type implementation modules live.
_module_prefix = "dns.rdtypes"


def get_rdata_class(rdclass, rdtype):
    """Return the class implementing rdata of *rdclass* and *rdtype*.

    The cache is consulted first, for the exact class and then for the ANY
    class.  On a miss, the module ``<prefix>.<rdclass>.<rdtype>`` is
    imported, falling back to ``<prefix>.ANY.<rdtype>``.  If no
    implementation is found, ``GenericRdata`` is returned.  Results of the
    import path and the generic fallback are cached.
    """
    cls = _rdata_classes.get((rdclass, rdtype))
    if not cls:
        # The ANY entries are stored under dns.rdataclass.ANY below, so look
        # them up with the same constant.
        cls = _rdata_classes.get((dns.rdataclass.ANY, rdtype))
        if not cls:
            rdclass_text = dns.rdataclass.to_text(rdclass)
            rdtype_text = dns.rdatatype.to_text(rdtype)
            # Module and class names use "_" where the type text has "-".
            rdtype_text = rdtype_text.replace("-", "_")
            try:
                mod = import_module(
                    ".".join([_module_prefix, rdclass_text, rdtype_text])
                )
                cls = getattr(mod, rdtype_text)
                _rdata_classes[(rdclass, rdtype)] = cls
            except ImportError:
                try:
                    mod = import_module(".".join([_module_prefix, "ANY", rdtype_text]))
                    cls = getattr(mod, rdtype_text)
                    _rdata_classes[(dns.rdataclass.ANY, rdtype)] = cls
                    _rdata_classes[(rdclass, rdtype)] = cls
                except ImportError:
                    pass
    if not cls:
        cls = GenericRdata
        _rdata_classes[(rdclass, rdtype)] = cls
    return cls
def from_text(
    rdclass: Union[dns.rdataclass.RdataClass, str],
    rdtype: Union[dns.rdatatype.RdataType, str],
    tok: Union[dns.tokenizer.Tokenizer, str],
    origin: Optional[dns.name.Name] = None,
    relativize: bool = True,
    relativize_to: Optional[dns.name.Name] = None,
    idna_codec: Optional[dns.name.IDNACodec] = None,
) -> Rdata:
    """Build an rdata object from text format.

    This function attempts to dynamically load a class which
    implements the specified rdata class and type.  If there is no
    class-and-type-specific implementation, the GenericRdata class
    is used.

    Once a class is chosen, its from_text() class method is called
    with the parameters to this function.

    If *tok* is a ``str``, then a tokenizer is created and the string
    is used as its input.

    *rdclass*, a ``dns.rdataclass.RdataClass`` or ``str``, the rdataclass.

    *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdatatype.

    *tok*, a ``dns.tokenizer.Tokenizer`` or a ``str``.

    *origin*, a ``dns.name.Name`` (or ``None``), the
    origin to use for relative names.

    *relativize*, a ``bool``.  If true, name will be relativized.

    *relativize_to*, a ``dns.name.Name`` (or ``None``), the origin to use
    when relativizing names.  If not set, the *origin* value will be used.

    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
    encoder/decoder to use if a tokenizer needs to be created.  If
    ``None``, the default IDNA 2003 encoder/decoder is used.  If a
    tokenizer is not created, then the codec associated with the tokenizer
    is the one that is used.

    Returns an instance of the chosen Rdata subclass.
    """
    if isinstance(tok, str):
        tok = dns.tokenizer.Tokenizer(tok, idna_codec=idna_codec)
    rdclass = dns.rdataclass.RdataClass.make(rdclass)
    rdtype = dns.rdatatype.RdataType.make(rdtype)
    cls = get_rdata_class(rdclass, rdtype)
    with dns.exception.ExceptionWrapper(dns.exception.SyntaxError):
        rdata = None
        if cls != GenericRdata:
            # peek at first token
            token = tok.get()
            tok.unget(token)
            if token.is_identifier() and token.value == r"\#":
                #
                # Known type using the generic syntax.  Extract the
                # wire form from the generic syntax, and then run
                # from_wire on it.
                #
                grdata = GenericRdata.from_text(
                    rdclass, rdtype, tok, origin, relativize, relativize_to
                )
                rdata = from_wire(
                    rdclass, rdtype, grdata.data, 0, len(grdata.data), origin
                )
                #
                # If this comparison isn't equal, then there must have been
                # compressed names in the wire format, which is an error,
                # there being no reasonable context to decompress with.
                #
                rwire = rdata.to_wire()
                if rwire != grdata.data:
                    raise dns.exception.SyntaxError(
                        "compressed data in "
                        "generic syntax form "
                        "of known rdatatype"
                    )
        if rdata is None:
            rdata = cls.from_text(
                rdclass, rdtype, tok, origin, relativize, relativize_to
            )
        # Consume the end of line and keep any trailing comment on the rdata.
        token = tok.get_eol_as_token()
        if token.comment is not None:
            object.__setattr__(rdata, "rdcomment", token.comment)
        return rdata
def from_wire_parser(
    rdclass: Union[dns.rdataclass.RdataClass, str],
    rdtype: Union[dns.rdatatype.RdataType, str],
    parser: dns.wire.Parser,
    origin: Optional[dns.name.Name] = None,
) -> Rdata:
    """Build an rdata object from wire format

    This function attempts to dynamically load a class which
    implements the specified rdata class and type.  If there is no
    class-and-type-specific implementation, the GenericRdata class
    is used.

    Once a class is chosen, its from_wire_parser() class method is called
    with the parameters to this function.  The call runs inside
    ``dns.exception.ExceptionWrapper(dns.exception.FormError)``.

    *rdclass*, a ``dns.rdataclass.RdataClass`` or ``str``, the rdataclass.

    *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdatatype.

    *parser*, a ``dns.wire.Parser``, the parser, which should be
    restricted to the rdata length.

    *origin*, a ``dns.name.Name`` (or ``None``).  If not ``None``,
    then names will be relativized to this origin.

    Returns an instance of the chosen Rdata subclass.
    """
    rdclass = dns.rdataclass.RdataClass.make(rdclass)
    rdtype = dns.rdatatype.RdataType.make(rdtype)
    cls = get_rdata_class(rdclass, rdtype)
    with dns.exception.ExceptionWrapper(dns.exception.FormError):
        return cls.from_wire_parser(rdclass, rdtype, parser, origin)
def from_wire(
    rdclass: Union[dns.rdataclass.RdataClass, str],
    rdtype: Union[dns.rdatatype.RdataType, str],
    wire: bytes,
    current: int,
    rdlen: int,
    origin: Optional[dns.name.Name] = None,
) -> Rdata:
    """Build an rdata object from wire format

    This function wraps *wire* in a ``dns.wire.Parser`` positioned at
    *current*, restricts it to *rdlen* bytes, and hands it to
    ``from_wire_parser()``, which chooses the implementing class (or
    GenericRdata if there is no class-and-type-specific implementation).

    *rdclass*, a ``dns.rdataclass.RdataClass`` or ``str``, the rdataclass.

    *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdatatype.

    *wire*, a ``bytes``, the wire-format message.

    *current*, an ``int``, the offset in wire of the beginning of
    the rdata.

    *rdlen*, an ``int``, the length of the wire-format rdata

    *origin*, a ``dns.name.Name`` (or ``None``).  If not ``None``,
    then names will be relativized to this origin.

    Returns an instance of the chosen Rdata subclass.
    """
    parser = dns.wire.Parser(wire, current)
    with parser.restrict_to(rdlen):
        return from_wire_parser(rdclass, rdtype, parser, origin)
class RdatatypeExists(dns.exception.DNSException):
    """DNS rdatatype already exists."""

    # Keyword arguments used to fill in the message template below.
    supp_kwargs = {"rdclass", "rdtype"}
    fmt = (
        "The rdata type with class {rdclass:d} and rdtype {rdtype:d} "
        + "already exists."
    )
def register_type(
    implementation: Any,
    rdtype: int,
    rdtype_text: str,
    is_singleton: bool = False,
    rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
) -> None:
    """Dynamically register a module to handle an rdatatype.

    *implementation*, a module implementing the type in the usual dnspython
    way.

    *rdtype*, an ``int``, the rdatatype to register.

    *rdtype_text*, a ``str``, the textual form of the rdatatype.

    *is_singleton*, a ``bool``, indicating if the type is a singleton (i.e.
    RRsets of the type can have only one member.)

    *rdclass*, the rdataclass of the type, or ``dns.rdataclass.ANY`` if
    it applies to all classes.

    Raises ``RdatatypeExists`` if the type already has a non-generic
    implementation or is a metatype.
    """
    rdtype = dns.rdatatype.RdataType.make(rdtype)
    existing_cls = get_rdata_class(rdclass, rdtype)
    if existing_cls != GenericRdata or dns.rdatatype.is_metatype(rdtype):
        raise RdatatypeExists(rdclass=rdclass, rdtype=rdtype)
    # The implementing class is the module attribute named after the type
    # text, with "-" replaced by "_".
    _rdata_classes[(rdclass, rdtype)] = getattr(
        implementation, rdtype_text.replace("-", "_")
    )
    dns.rdatatype.register_type(rdtype, rdtype_text, is_singleton)