Spaces:
Running
Running
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license | |
# Copyright (C) 2001-2017 Nominum, Inc. | |
# | |
# Permission to use, copy, modify, and distribute this software and its | |
# documentation for any purpose with or without fee is hereby granted, | |
# provided that the above copyright notice and this permission notice | |
# appear in all copies. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES | |
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | |
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR | |
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES | |
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN | |
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT | |
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | |
"""DNS rdatasets (an rdataset is a set of rdatas of a given type and class)""" | |
import io | |
import random | |
import struct | |
from typing import Any, Collection, Dict, List, Optional, Union, cast | |
import dns.exception | |
import dns.immutable | |
import dns.name | |
import dns.rdata | |
import dns.rdataclass | |
import dns.rdatatype | |
import dns.renderer | |
import dns.set | |
import dns.ttl | |
# define SimpleSet here for backwards compatibility | |
SimpleSet = dns.set.Set | |
class DifferingCovers(dns.exception.DNSException): | |
"""An attempt was made to add a DNS SIG/RRSIG whose covered type | |
is not the same as that of the other rdatas in the rdataset.""" | |
class IncompatibleTypes(dns.exception.DNSException): | |
"""An attempt was made to add DNS RR data of an incompatible type.""" | |
class Rdataset(dns.set.Set): | |
"""A DNS rdataset.""" | |
__slots__ = ["rdclass", "rdtype", "covers", "ttl"] | |
def __init__( | |
self, | |
rdclass: dns.rdataclass.RdataClass, | |
rdtype: dns.rdatatype.RdataType, | |
covers: dns.rdatatype.RdataType = dns.rdatatype.NONE, | |
ttl: int = 0, | |
): | |
"""Create a new rdataset of the specified class and type. | |
*rdclass*, a ``dns.rdataclass.RdataClass``, the rdataclass. | |
*rdtype*, an ``dns.rdatatype.RdataType``, the rdatatype. | |
*covers*, an ``dns.rdatatype.RdataType``, the covered rdatatype. | |
*ttl*, an ``int``, the TTL. | |
""" | |
super().__init__() | |
self.rdclass = rdclass | |
self.rdtype: dns.rdatatype.RdataType = rdtype | |
self.covers: dns.rdatatype.RdataType = covers | |
self.ttl = ttl | |
def _clone(self): | |
obj = super()._clone() | |
obj.rdclass = self.rdclass | |
obj.rdtype = self.rdtype | |
obj.covers = self.covers | |
obj.ttl = self.ttl | |
return obj | |
def update_ttl(self, ttl: int) -> None: | |
"""Perform TTL minimization. | |
Set the TTL of the rdataset to be the lesser of the set's current | |
TTL or the specified TTL. If the set contains no rdatas, set the TTL | |
to the specified TTL. | |
*ttl*, an ``int`` or ``str``. | |
""" | |
ttl = dns.ttl.make(ttl) | |
if len(self) == 0: | |
self.ttl = ttl | |
elif ttl < self.ttl: | |
self.ttl = ttl | |
def add( # pylint: disable=arguments-differ,arguments-renamed | |
self, rd: dns.rdata.Rdata, ttl: Optional[int] = None | |
) -> None: | |
"""Add the specified rdata to the rdataset. | |
If the optional *ttl* parameter is supplied, then | |
``self.update_ttl(ttl)`` will be called prior to adding the rdata. | |
*rd*, a ``dns.rdata.Rdata``, the rdata | |
*ttl*, an ``int``, the TTL. | |
Raises ``dns.rdataset.IncompatibleTypes`` if the type and class | |
do not match the type and class of the rdataset. | |
Raises ``dns.rdataset.DifferingCovers`` if the type is a signature | |
type and the covered type does not match that of the rdataset. | |
""" | |
# | |
# If we're adding a signature, do some special handling to | |
# check that the signature covers the same type as the | |
# other rdatas in this rdataset. If this is the first rdata | |
# in the set, initialize the covers field. | |
# | |
if self.rdclass != rd.rdclass or self.rdtype != rd.rdtype: | |
raise IncompatibleTypes | |
if ttl is not None: | |
self.update_ttl(ttl) | |
if self.rdtype == dns.rdatatype.RRSIG or self.rdtype == dns.rdatatype.SIG: | |
covers = rd.covers() | |
if len(self) == 0 and self.covers == dns.rdatatype.NONE: | |
self.covers = covers | |
elif self.covers != covers: | |
raise DifferingCovers | |
if dns.rdatatype.is_singleton(rd.rdtype) and len(self) > 0: | |
self.clear() | |
super().add(rd) | |
def union_update(self, other): | |
self.update_ttl(other.ttl) | |
super().union_update(other) | |
def intersection_update(self, other): | |
self.update_ttl(other.ttl) | |
super().intersection_update(other) | |
def update(self, other): | |
"""Add all rdatas in other to self. | |
*other*, a ``dns.rdataset.Rdataset``, the rdataset from which | |
to update. | |
""" | |
self.update_ttl(other.ttl) | |
super().update(other) | |
def _rdata_repr(self): | |
def maybe_truncate(s): | |
if len(s) > 100: | |
return s[:100] + "..." | |
return s | |
return "[%s]" % ", ".join("<%s>" % maybe_truncate(str(rr)) for rr in self) | |
def __repr__(self): | |
if self.covers == 0: | |
ctext = "" | |
else: | |
ctext = "(" + dns.rdatatype.to_text(self.covers) + ")" | |
return ( | |
"<DNS " | |
+ dns.rdataclass.to_text(self.rdclass) | |
+ " " | |
+ dns.rdatatype.to_text(self.rdtype) | |
+ ctext | |
+ " rdataset: " | |
+ self._rdata_repr() | |
+ ">" | |
) | |
def __str__(self): | |
return self.to_text() | |
def __eq__(self, other): | |
if not isinstance(other, Rdataset): | |
return False | |
if ( | |
self.rdclass != other.rdclass | |
or self.rdtype != other.rdtype | |
or self.covers != other.covers | |
): | |
return False | |
return super().__eq__(other) | |
def __ne__(self, other): | |
return not self.__eq__(other) | |
def to_text( | |
self, | |
name: Optional[dns.name.Name] = None, | |
origin: Optional[dns.name.Name] = None, | |
relativize: bool = True, | |
override_rdclass: Optional[dns.rdataclass.RdataClass] = None, | |
want_comments: bool = False, | |
**kw: Dict[str, Any], | |
) -> str: | |
"""Convert the rdataset into DNS zone file format. | |
See ``dns.name.Name.choose_relativity`` for more information | |
on how *origin* and *relativize* determine the way names | |
are emitted. | |
Any additional keyword arguments are passed on to the rdata | |
``to_text()`` method. | |
*name*, a ``dns.name.Name``. If name is not ``None``, emit RRs with | |
*name* as the owner name. | |
*origin*, a ``dns.name.Name`` or ``None``, the origin for relative | |
names. | |
*relativize*, a ``bool``. If ``True``, names will be relativized | |
to *origin*. | |
*override_rdclass*, a ``dns.rdataclass.RdataClass`` or ``None``. | |
If not ``None``, use this class instead of the Rdataset's class. | |
*want_comments*, a ``bool``. If ``True``, emit comments for rdata | |
which have them. The default is ``False``. | |
""" | |
if name is not None: | |
name = name.choose_relativity(origin, relativize) | |
ntext = str(name) | |
pad = " " | |
else: | |
ntext = "" | |
pad = "" | |
s = io.StringIO() | |
if override_rdclass is not None: | |
rdclass = override_rdclass | |
else: | |
rdclass = self.rdclass | |
if len(self) == 0: | |
# | |
# Empty rdatasets are used for the question section, and in | |
# some dynamic updates, so we don't need to print out the TTL | |
# (which is meaningless anyway). | |
# | |
s.write( | |
"{}{}{} {}\n".format( | |
ntext, | |
pad, | |
dns.rdataclass.to_text(rdclass), | |
dns.rdatatype.to_text(self.rdtype), | |
) | |
) | |
else: | |
for rd in self: | |
extra = "" | |
if want_comments: | |
if rd.rdcomment: | |
extra = f" ;{rd.rdcomment}" | |
s.write( | |
"%s%s%d %s %s %s%s\n" | |
% ( | |
ntext, | |
pad, | |
self.ttl, | |
dns.rdataclass.to_text(rdclass), | |
dns.rdatatype.to_text(self.rdtype), | |
rd.to_text(origin=origin, relativize=relativize, **kw), | |
extra, | |
) | |
) | |
# | |
# We strip off the final \n for the caller's convenience in printing | |
# | |
return s.getvalue()[:-1] | |
def to_wire( | |
self, | |
name: dns.name.Name, | |
file: Any, | |
compress: Optional[dns.name.CompressType] = None, | |
origin: Optional[dns.name.Name] = None, | |
override_rdclass: Optional[dns.rdataclass.RdataClass] = None, | |
want_shuffle: bool = True, | |
) -> int: | |
"""Convert the rdataset to wire format. | |
*name*, a ``dns.name.Name`` is the owner name to use. | |
*file* is the file where the name is emitted (typically a | |
BytesIO file). | |
*compress*, a ``dict``, is the compression table to use. If | |
``None`` (the default), names will not be compressed. | |
*origin* is a ``dns.name.Name`` or ``None``. If the name is | |
relative and origin is not ``None``, then *origin* will be appended | |
to it. | |
*override_rdclass*, an ``int``, is used as the class instead of the | |
class of the rdataset. This is useful when rendering rdatasets | |
associated with dynamic updates. | |
*want_shuffle*, a ``bool``. If ``True``, then the order of the | |
Rdatas within the Rdataset will be shuffled before rendering. | |
Returns an ``int``, the number of records emitted. | |
""" | |
if override_rdclass is not None: | |
rdclass = override_rdclass | |
want_shuffle = False | |
else: | |
rdclass = self.rdclass | |
if len(self) == 0: | |
name.to_wire(file, compress, origin) | |
file.write(struct.pack("!HHIH", self.rdtype, rdclass, 0, 0)) | |
return 1 | |
else: | |
l: Union[Rdataset, List[dns.rdata.Rdata]] | |
if want_shuffle: | |
l = list(self) | |
random.shuffle(l) | |
else: | |
l = self | |
for rd in l: | |
name.to_wire(file, compress, origin) | |
file.write(struct.pack("!HHI", self.rdtype, rdclass, self.ttl)) | |
with dns.renderer.prefixed_length(file, 2): | |
rd.to_wire(file, compress, origin) | |
return len(self) | |
def match( | |
self, | |
rdclass: dns.rdataclass.RdataClass, | |
rdtype: dns.rdatatype.RdataType, | |
covers: dns.rdatatype.RdataType, | |
) -> bool: | |
"""Returns ``True`` if this rdataset matches the specified class, | |
type, and covers. | |
""" | |
if self.rdclass == rdclass and self.rdtype == rdtype and self.covers == covers: | |
return True | |
return False | |
def processing_order(self) -> List[dns.rdata.Rdata]: | |
"""Return rdatas in a valid processing order according to the type's | |
specification. For example, MX records are in preference order from | |
lowest to highest preferences, with items of the same preference | |
shuffled. | |
For types that do not define a processing order, the rdatas are | |
simply shuffled. | |
""" | |
if len(self) == 0: | |
return [] | |
else: | |
return self[0]._processing_order(iter(self)) | |
class ImmutableRdataset(Rdataset): # lgtm[py/missing-equals] | |
"""An immutable DNS rdataset.""" | |
_clone_class = Rdataset | |
def __init__(self, rdataset: Rdataset): | |
"""Create an immutable rdataset from the specified rdataset.""" | |
super().__init__( | |
rdataset.rdclass, rdataset.rdtype, rdataset.covers, rdataset.ttl | |
) | |
self.items = dns.immutable.Dict(rdataset.items) | |
def update_ttl(self, ttl): | |
raise TypeError("immutable") | |
def add(self, rd, ttl=None): | |
raise TypeError("immutable") | |
def union_update(self, other): | |
raise TypeError("immutable") | |
def intersection_update(self, other): | |
raise TypeError("immutable") | |
def update(self, other): | |
raise TypeError("immutable") | |
def __delitem__(self, i): | |
raise TypeError("immutable") | |
# lgtm complains about these not raising ArithmeticError, but there is | |
# precedent for overrides of these methods in other classes to raise | |
# TypeError, and it seems like the better exception. | |
def __ior__(self, other): # lgtm[py/unexpected-raise-in-special-method] | |
raise TypeError("immutable") | |
def __iand__(self, other): # lgtm[py/unexpected-raise-in-special-method] | |
raise TypeError("immutable") | |
def __iadd__(self, other): # lgtm[py/unexpected-raise-in-special-method] | |
raise TypeError("immutable") | |
def __isub__(self, other): # lgtm[py/unexpected-raise-in-special-method] | |
raise TypeError("immutable") | |
def clear(self): | |
raise TypeError("immutable") | |
def __copy__(self): | |
return ImmutableRdataset(super().copy()) | |
def copy(self): | |
return ImmutableRdataset(super().copy()) | |
def union(self, other): | |
return ImmutableRdataset(super().union(other)) | |
def intersection(self, other): | |
return ImmutableRdataset(super().intersection(other)) | |
def difference(self, other): | |
return ImmutableRdataset(super().difference(other)) | |
def symmetric_difference(self, other): | |
return ImmutableRdataset(super().symmetric_difference(other)) | |
def from_text_list( | |
rdclass: Union[dns.rdataclass.RdataClass, str], | |
rdtype: Union[dns.rdatatype.RdataType, str], | |
ttl: int, | |
text_rdatas: Collection[str], | |
idna_codec: Optional[dns.name.IDNACodec] = None, | |
origin: Optional[dns.name.Name] = None, | |
relativize: bool = True, | |
relativize_to: Optional[dns.name.Name] = None, | |
) -> Rdataset: | |
"""Create an rdataset with the specified class, type, and TTL, and with | |
the specified list of rdatas in text format. | |
*idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA | |
encoder/decoder to use; if ``None``, the default IDNA 2003 | |
encoder/decoder is used. | |
*origin*, a ``dns.name.Name`` (or ``None``), the | |
origin to use for relative names. | |
*relativize*, a ``bool``. If true, name will be relativized. | |
*relativize_to*, a ``dns.name.Name`` (or ``None``), the origin to use | |
when relativizing names. If not set, the *origin* value will be used. | |
Returns a ``dns.rdataset.Rdataset`` object. | |
""" | |
rdclass = dns.rdataclass.RdataClass.make(rdclass) | |
rdtype = dns.rdatatype.RdataType.make(rdtype) | |
r = Rdataset(rdclass, rdtype) | |
r.update_ttl(ttl) | |
for t in text_rdatas: | |
rd = dns.rdata.from_text( | |
r.rdclass, r.rdtype, t, origin, relativize, relativize_to, idna_codec | |
) | |
r.add(rd) | |
return r | |
def from_text( | |
rdclass: Union[dns.rdataclass.RdataClass, str], | |
rdtype: Union[dns.rdatatype.RdataType, str], | |
ttl: int, | |
*text_rdatas: Any, | |
) -> Rdataset: | |
"""Create an rdataset with the specified class, type, and TTL, and with | |
the specified rdatas in text format. | |
Returns a ``dns.rdataset.Rdataset`` object. | |
""" | |
return from_text_list(rdclass, rdtype, ttl, cast(Collection[str], text_rdatas)) | |
def from_rdata_list(ttl: int, rdatas: Collection[dns.rdata.Rdata]) -> Rdataset: | |
"""Create an rdataset with the specified TTL, and with | |
the specified list of rdata objects. | |
Returns a ``dns.rdataset.Rdataset`` object. | |
""" | |
if len(rdatas) == 0: | |
raise ValueError("rdata list must not be empty") | |
r = None | |
for rd in rdatas: | |
if r is None: | |
r = Rdataset(rd.rdclass, rd.rdtype) | |
r.update_ttl(ttl) | |
r.add(rd) | |
assert r is not None | |
return r | |
def from_rdata(ttl: int, *rdatas: Any) -> Rdataset: | |
"""Create an rdataset with the specified TTL, and with | |
the specified rdata objects. | |
Returns a ``dns.rdataset.Rdataset`` object. | |
""" | |
return from_rdata_list(ttl, cast(Collection[dns.rdata.Rdata], rdatas)) | |