Spaces:
Running
Running
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license | |
# Copyright (C) 2003-2017 Nominum, Inc. | |
# | |
# Permission to use, copy, modify, and distribute this software and its | |
# documentation for any purpose with or without fee is hereby granted, | |
# provided that the above copyright notice and this permission notice | |
# appear in all copies. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES | |
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | |
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR | |
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES | |
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN | |
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT | |
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | |
"""Common DNSSEC-related functions and constants.""" | |
import base64 | |
import contextlib | |
import functools | |
import hashlib | |
import struct | |
import time | |
from datetime import datetime | |
from typing import Callable, Dict, List, Optional, Set, Tuple, Union, cast | |
import dns._features | |
import dns.exception | |
import dns.name | |
import dns.node | |
import dns.rdata | |
import dns.rdataclass | |
import dns.rdataset | |
import dns.rdatatype | |
import dns.rrset | |
import dns.transaction | |
import dns.zone | |
from dns.dnssectypes import Algorithm, DSDigest, NSEC3Hash | |
from dns.exception import ( # pylint: disable=W0611 | |
AlgorithmKeyMismatch, | |
DeniedByPolicy, | |
UnsupportedAlgorithm, | |
ValidationFailure, | |
) | |
from dns.rdtypes.ANY.CDNSKEY import CDNSKEY | |
from dns.rdtypes.ANY.CDS import CDS | |
from dns.rdtypes.ANY.DNSKEY import DNSKEY | |
from dns.rdtypes.ANY.DS import DS | |
from dns.rdtypes.ANY.NSEC import NSEC, Bitmap | |
from dns.rdtypes.ANY.NSEC3PARAM import NSEC3PARAM | |
from dns.rdtypes.ANY.RRSIG import RRSIG, sigtime_to_posixtime | |
from dns.rdtypes.dnskeybase import Flag | |
PublicKey = Union[ | |
"GenericPublicKey", | |
"rsa.RSAPublicKey", | |
"ec.EllipticCurvePublicKey", | |
"ed25519.Ed25519PublicKey", | |
"ed448.Ed448PublicKey", | |
] | |
PrivateKey = Union[ | |
"GenericPrivateKey", | |
"rsa.RSAPrivateKey", | |
"ec.EllipticCurvePrivateKey", | |
"ed25519.Ed25519PrivateKey", | |
"ed448.Ed448PrivateKey", | |
] | |
RRsetSigner = Callable[[dns.transaction.Transaction, dns.rrset.RRset], None] | |
def algorithm_from_text(text: str) -> Algorithm: | |
"""Convert text into a DNSSEC algorithm value. | |
*text*, a ``str``, the text to convert to into an algorithm value. | |
Returns an ``int``. | |
""" | |
return Algorithm.from_text(text) | |
def algorithm_to_text(value: Union[Algorithm, int]) -> str: | |
"""Convert a DNSSEC algorithm value to text | |
*value*, a ``dns.dnssec.Algorithm``. | |
Returns a ``str``, the name of a DNSSEC algorithm. | |
""" | |
return Algorithm.to_text(value) | |
def to_timestamp(value: Union[datetime, str, float, int]) -> int: | |
"""Convert various format to a timestamp""" | |
if isinstance(value, datetime): | |
return int(value.timestamp()) | |
elif isinstance(value, str): | |
return sigtime_to_posixtime(value) | |
elif isinstance(value, float): | |
return int(value) | |
elif isinstance(value, int): | |
return value | |
else: | |
raise TypeError("Unsupported timestamp type") | |
def key_id(key: Union[DNSKEY, CDNSKEY]) -> int: | |
"""Return the key id (a 16-bit number) for the specified key. | |
*key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY`` | |
Returns an ``int`` between 0 and 65535 | |
""" | |
rdata = key.to_wire() | |
if key.algorithm == Algorithm.RSAMD5: | |
return (rdata[-3] << 8) + rdata[-2] | |
else: | |
total = 0 | |
for i in range(len(rdata) // 2): | |
total += (rdata[2 * i] << 8) + rdata[2 * i + 1] | |
if len(rdata) % 2 != 0: | |
total += rdata[len(rdata) - 1] << 8 | |
total += (total >> 16) & 0xFFFF | |
return total & 0xFFFF | |
class Policy: | |
def __init__(self): | |
pass | |
def ok_to_sign(self, _: DNSKEY) -> bool: # pragma: no cover | |
return False | |
def ok_to_validate(self, _: DNSKEY) -> bool: # pragma: no cover | |
return False | |
def ok_to_create_ds(self, _: DSDigest) -> bool: # pragma: no cover | |
return False | |
def ok_to_validate_ds(self, _: DSDigest) -> bool: # pragma: no cover | |
return False | |
class SimpleDeny(Policy): | |
def __init__(self, deny_sign, deny_validate, deny_create_ds, deny_validate_ds): | |
super().__init__() | |
self._deny_sign = deny_sign | |
self._deny_validate = deny_validate | |
self._deny_create_ds = deny_create_ds | |
self._deny_validate_ds = deny_validate_ds | |
def ok_to_sign(self, key: DNSKEY) -> bool: | |
return key.algorithm not in self._deny_sign | |
def ok_to_validate(self, key: DNSKEY) -> bool: | |
return key.algorithm not in self._deny_validate | |
def ok_to_create_ds(self, algorithm: DSDigest) -> bool: | |
return algorithm not in self._deny_create_ds | |
def ok_to_validate_ds(self, algorithm: DSDigest) -> bool: | |
return algorithm not in self._deny_validate_ds | |
rfc_8624_policy = SimpleDeny( | |
{Algorithm.RSAMD5, Algorithm.DSA, Algorithm.DSANSEC3SHA1, Algorithm.ECCGOST}, | |
{Algorithm.RSAMD5, Algorithm.DSA, Algorithm.DSANSEC3SHA1}, | |
{DSDigest.NULL, DSDigest.SHA1, DSDigest.GOST}, | |
{DSDigest.NULL}, | |
) | |
allow_all_policy = SimpleDeny(set(), set(), set(), set()) | |
default_policy = rfc_8624_policy | |
def make_ds( | |
name: Union[dns.name.Name, str], | |
key: dns.rdata.Rdata, | |
algorithm: Union[DSDigest, str], | |
origin: Optional[dns.name.Name] = None, | |
policy: Optional[Policy] = None, | |
validating: bool = False, | |
) -> DS: | |
"""Create a DS record for a DNSSEC key. | |
*name*, a ``dns.name.Name`` or ``str``, the owner name of the DS record. | |
*key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY`` or ``dns.rdtypes.ANY.DNSKEY.CDNSKEY``, | |
the key the DS is about. | |
*algorithm*, a ``str`` or ``int`` specifying the hash algorithm. | |
The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case | |
does not matter for these strings. | |
*origin*, a ``dns.name.Name`` or ``None``. If *key* is a relative name, | |
then it will be made absolute using the specified origin. | |
*policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy, | |
``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624. | |
*validating*, a ``bool``. If ``True``, then policy is checked in | |
validating mode, i.e. "Is it ok to validate using this digest algorithm?". | |
Otherwise the policy is checked in creating mode, i.e. "Is it ok to create a DS with | |
this digest algorithm?". | |
Raises ``UnsupportedAlgorithm`` if the algorithm is unknown. | |
Raises ``DeniedByPolicy`` if the algorithm is denied by policy. | |
Returns a ``dns.rdtypes.ANY.DS.DS`` | |
""" | |
if policy is None: | |
policy = default_policy | |
try: | |
if isinstance(algorithm, str): | |
algorithm = DSDigest[algorithm.upper()] | |
except Exception: | |
raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm) | |
if validating: | |
check = policy.ok_to_validate_ds | |
else: | |
check = policy.ok_to_create_ds | |
if not check(algorithm): | |
raise DeniedByPolicy | |
if not isinstance(key, (DNSKEY, CDNSKEY)): | |
raise ValueError("key is not a DNSKEY/CDNSKEY") | |
if algorithm == DSDigest.SHA1: | |
dshash = hashlib.sha1() | |
elif algorithm == DSDigest.SHA256: | |
dshash = hashlib.sha256() | |
elif algorithm == DSDigest.SHA384: | |
dshash = hashlib.sha384() | |
else: | |
raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm) | |
if isinstance(name, str): | |
name = dns.name.from_text(name, origin) | |
wire = name.canonicalize().to_wire() | |
assert wire is not None | |
dshash.update(wire) | |
dshash.update(key.to_wire(origin=origin)) | |
digest = dshash.digest() | |
dsrdata = struct.pack("!HBB", key_id(key), key.algorithm, algorithm) + digest | |
ds = dns.rdata.from_wire( | |
dns.rdataclass.IN, dns.rdatatype.DS, dsrdata, 0, len(dsrdata) | |
) | |
return cast(DS, ds) | |
def make_cds( | |
name: Union[dns.name.Name, str], | |
key: dns.rdata.Rdata, | |
algorithm: Union[DSDigest, str], | |
origin: Optional[dns.name.Name] = None, | |
) -> CDS: | |
"""Create a CDS record for a DNSSEC key. | |
*name*, a ``dns.name.Name`` or ``str``, the owner name of the DS record. | |
*key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY`` or ``dns.rdtypes.ANY.DNSKEY.CDNSKEY``, | |
the key the DS is about. | |
*algorithm*, a ``str`` or ``int`` specifying the hash algorithm. | |
The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case | |
does not matter for these strings. | |
*origin*, a ``dns.name.Name`` or ``None``. If *key* is a relative name, | |
then it will be made absolute using the specified origin. | |
Raises ``UnsupportedAlgorithm`` if the algorithm is unknown. | |
Returns a ``dns.rdtypes.ANY.DS.CDS`` | |
""" | |
ds = make_ds(name, key, algorithm, origin) | |
return CDS( | |
rdclass=ds.rdclass, | |
rdtype=dns.rdatatype.CDS, | |
key_tag=ds.key_tag, | |
algorithm=ds.algorithm, | |
digest_type=ds.digest_type, | |
digest=ds.digest, | |
) | |
def _find_candidate_keys( | |
keys: Dict[dns.name.Name, Union[dns.rdataset.Rdataset, dns.node.Node]], rrsig: RRSIG | |
) -> Optional[List[DNSKEY]]: | |
value = keys.get(rrsig.signer) | |
if isinstance(value, dns.node.Node): | |
rdataset = value.get_rdataset(dns.rdataclass.IN, dns.rdatatype.DNSKEY) | |
else: | |
rdataset = value | |
if rdataset is None: | |
return None | |
return [ | |
cast(DNSKEY, rd) | |
for rd in rdataset | |
if rd.algorithm == rrsig.algorithm | |
and key_id(rd) == rrsig.key_tag | |
and (rd.flags & Flag.ZONE) == Flag.ZONE # RFC 4034 2.1.1 | |
and rd.protocol == 3 # RFC 4034 2.1.2 | |
] | |
def _get_rrname_rdataset( | |
rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], | |
) -> Tuple[dns.name.Name, dns.rdataset.Rdataset]: | |
if isinstance(rrset, tuple): | |
return rrset[0], rrset[1] | |
else: | |
return rrset.name, rrset | |
def _validate_signature(sig: bytes, data: bytes, key: DNSKEY) -> None: | |
public_cls = get_algorithm_cls_from_dnskey(key).public_cls | |
try: | |
public_key = public_cls.from_dnskey(key) | |
except ValueError: | |
raise ValidationFailure("invalid public key") | |
public_key.verify(sig, data) | |
def _validate_rrsig( | |
rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], | |
rrsig: RRSIG, | |
keys: Dict[dns.name.Name, Union[dns.node.Node, dns.rdataset.Rdataset]], | |
origin: Optional[dns.name.Name] = None, | |
now: Optional[float] = None, | |
policy: Optional[Policy] = None, | |
) -> None: | |
"""Validate an RRset against a single signature rdata, throwing an | |
exception if validation is not successful. | |
*rrset*, the RRset to validate. This can be a | |
``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``) | |
tuple. | |
*rrsig*, a ``dns.rdata.Rdata``, the signature to validate. | |
*keys*, the key dictionary, used to find the DNSKEY associated | |
with a given name. The dictionary is keyed by a | |
``dns.name.Name``, and has ``dns.node.Node`` or | |
``dns.rdataset.Rdataset`` values. | |
*origin*, a ``dns.name.Name`` or ``None``, the origin to use for relative | |
names. | |
*now*, a ``float`` or ``None``, the time, in seconds since the epoch, to | |
use as the current time when validating. If ``None``, the actual current | |
time is used. | |
*policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy, | |
``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624. | |
Raises ``ValidationFailure`` if the signature is expired, not yet valid, | |
the public key is invalid, the algorithm is unknown, the verification | |
fails, etc. | |
Raises ``UnsupportedAlgorithm`` if the algorithm is recognized by | |
dnspython but not implemented. | |
""" | |
if policy is None: | |
policy = default_policy | |
candidate_keys = _find_candidate_keys(keys, rrsig) | |
if candidate_keys is None: | |
raise ValidationFailure("unknown key") | |
if now is None: | |
now = time.time() | |
if rrsig.expiration < now: | |
raise ValidationFailure("expired") | |
if rrsig.inception > now: | |
raise ValidationFailure("not yet valid") | |
data = _make_rrsig_signature_data(rrset, rrsig, origin) | |
for candidate_key in candidate_keys: | |
if not policy.ok_to_validate(candidate_key): | |
continue | |
try: | |
_validate_signature(rrsig.signature, data, candidate_key) | |
return | |
except (InvalidSignature, ValidationFailure): | |
# this happens on an individual validation failure | |
continue | |
# nothing verified -- raise failure: | |
raise ValidationFailure("verify failure") | |
def _validate( | |
rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], | |
rrsigset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], | |
keys: Dict[dns.name.Name, Union[dns.node.Node, dns.rdataset.Rdataset]], | |
origin: Optional[dns.name.Name] = None, | |
now: Optional[float] = None, | |
policy: Optional[Policy] = None, | |
) -> None: | |
"""Validate an RRset against a signature RRset, throwing an exception | |
if none of the signatures validate. | |
*rrset*, the RRset to validate. This can be a | |
``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``) | |
tuple. | |
*rrsigset*, the signature RRset. This can be a | |
``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``) | |
tuple. | |
*keys*, the key dictionary, used to find the DNSKEY associated | |
with a given name. The dictionary is keyed by a | |
``dns.name.Name``, and has ``dns.node.Node`` or | |
``dns.rdataset.Rdataset`` values. | |
*origin*, a ``dns.name.Name``, the origin to use for relative names; | |
defaults to None. | |
*now*, an ``int`` or ``None``, the time, in seconds since the epoch, to | |
use as the current time when validating. If ``None``, the actual current | |
time is used. | |
*policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy, | |
``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624. | |
Raises ``ValidationFailure`` if the signature is expired, not yet valid, | |
the public key is invalid, the algorithm is unknown, the verification | |
fails, etc. | |
""" | |
if policy is None: | |
policy = default_policy | |
if isinstance(origin, str): | |
origin = dns.name.from_text(origin, dns.name.root) | |
if isinstance(rrset, tuple): | |
rrname = rrset[0] | |
else: | |
rrname = rrset.name | |
if isinstance(rrsigset, tuple): | |
rrsigname = rrsigset[0] | |
rrsigrdataset = rrsigset[1] | |
else: | |
rrsigname = rrsigset.name | |
rrsigrdataset = rrsigset | |
rrname = rrname.choose_relativity(origin) | |
rrsigname = rrsigname.choose_relativity(origin) | |
if rrname != rrsigname: | |
raise ValidationFailure("owner names do not match") | |
for rrsig in rrsigrdataset: | |
if not isinstance(rrsig, RRSIG): | |
raise ValidationFailure("expected an RRSIG") | |
try: | |
_validate_rrsig(rrset, rrsig, keys, origin, now, policy) | |
return | |
except (ValidationFailure, UnsupportedAlgorithm): | |
pass | |
raise ValidationFailure("no RRSIGs validated") | |
def _sign( | |
rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], | |
private_key: PrivateKey, | |
signer: dns.name.Name, | |
dnskey: DNSKEY, | |
inception: Optional[Union[datetime, str, int, float]] = None, | |
expiration: Optional[Union[datetime, str, int, float]] = None, | |
lifetime: Optional[int] = None, | |
verify: bool = False, | |
policy: Optional[Policy] = None, | |
origin: Optional[dns.name.Name] = None, | |
) -> RRSIG: | |
"""Sign RRset using private key. | |
*rrset*, the RRset to validate. This can be a | |
``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``) | |
tuple. | |
*private_key*, the private key to use for signing, a | |
``cryptography.hazmat.primitives.asymmetric`` private key class applicable | |
for DNSSEC. | |
*signer*, a ``dns.name.Name``, the Signer's name. | |
*dnskey*, a ``DNSKEY`` matching ``private_key``. | |
*inception*, a ``datetime``, ``str``, ``int``, ``float`` or ``None``, the | |
signature inception time. If ``None``, the current time is used. If a ``str``, the | |
format is "YYYYMMDDHHMMSS" or alternatively the number of seconds since the UNIX | |
epoch in text form; this is the same the RRSIG rdata's text form. | |
Values of type `int` or `float` are interpreted as seconds since the UNIX epoch. | |
*expiration*, a ``datetime``, ``str``, ``int``, ``float`` or ``None``, the signature | |
expiration time. If ``None``, the expiration time will be the inception time plus | |
the value of the *lifetime* parameter. See the description of *inception* above | |
for how the various parameter types are interpreted. | |
*lifetime*, an ``int`` or ``None``, the signature lifetime in seconds. This | |
parameter is only meaningful if *expiration* is ``None``. | |
*verify*, a ``bool``. If set to ``True``, the signer will verify signatures | |
after they are created; the default is ``False``. | |
*policy*, a ``dns.dnssec.Policy`` or ``None``. If ``None``, the default policy, | |
``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624. | |
*origin*, a ``dns.name.Name`` or ``None``. If ``None``, the default, then all | |
names in the rrset (including its owner name) must be absolute; otherwise the | |
specified origin will be used to make names absolute when signing. | |
Raises ``DeniedByPolicy`` if the signature is denied by policy. | |
""" | |
if policy is None: | |
policy = default_policy | |
if not policy.ok_to_sign(dnskey): | |
raise DeniedByPolicy | |
if isinstance(rrset, tuple): | |
rdclass = rrset[1].rdclass | |
rdtype = rrset[1].rdtype | |
rrname = rrset[0] | |
original_ttl = rrset[1].ttl | |
else: | |
rdclass = rrset.rdclass | |
rdtype = rrset.rdtype | |
rrname = rrset.name | |
original_ttl = rrset.ttl | |
if inception is not None: | |
rrsig_inception = to_timestamp(inception) | |
else: | |
rrsig_inception = int(time.time()) | |
if expiration is not None: | |
rrsig_expiration = to_timestamp(expiration) | |
elif lifetime is not None: | |
rrsig_expiration = rrsig_inception + lifetime | |
else: | |
raise ValueError("expiration or lifetime must be specified") | |
# Derelativize now because we need a correct labels length for the | |
# rrsig_template. | |
if origin is not None: | |
rrname = rrname.derelativize(origin) | |
labels = len(rrname) - 1 | |
# Adjust labels appropriately for wildcards. | |
if rrname.is_wild(): | |
labels -= 1 | |
rrsig_template = RRSIG( | |
rdclass=rdclass, | |
rdtype=dns.rdatatype.RRSIG, | |
type_covered=rdtype, | |
algorithm=dnskey.algorithm, | |
labels=labels, | |
original_ttl=original_ttl, | |
expiration=rrsig_expiration, | |
inception=rrsig_inception, | |
key_tag=key_id(dnskey), | |
signer=signer, | |
signature=b"", | |
) | |
data = dns.dnssec._make_rrsig_signature_data(rrset, rrsig_template, origin) | |
if isinstance(private_key, GenericPrivateKey): | |
signing_key = private_key | |
else: | |
try: | |
private_cls = get_algorithm_cls_from_dnskey(dnskey) | |
signing_key = private_cls(key=private_key) | |
except UnsupportedAlgorithm: | |
raise TypeError("Unsupported key algorithm") | |
signature = signing_key.sign(data, verify) | |
return cast(RRSIG, rrsig_template.replace(signature=signature)) | |
def _make_rrsig_signature_data( | |
rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], | |
rrsig: RRSIG, | |
origin: Optional[dns.name.Name] = None, | |
) -> bytes: | |
"""Create signature rdata. | |
*rrset*, the RRset to sign/validate. This can be a | |
``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``) | |
tuple. | |
*rrsig*, a ``dns.rdata.Rdata``, the signature to validate, or the | |
signature template used when signing. | |
*origin*, a ``dns.name.Name`` or ``None``, the origin to use for relative | |
names. | |
Raises ``UnsupportedAlgorithm`` if the algorithm is recognized by | |
dnspython but not implemented. | |
""" | |
if isinstance(origin, str): | |
origin = dns.name.from_text(origin, dns.name.root) | |
signer = rrsig.signer | |
if not signer.is_absolute(): | |
if origin is None: | |
raise ValidationFailure("relative RR name without an origin specified") | |
signer = signer.derelativize(origin) | |
# For convenience, allow the rrset to be specified as a (name, | |
# rdataset) tuple as well as a proper rrset | |
rrname, rdataset = _get_rrname_rdataset(rrset) | |
data = b"" | |
data += rrsig.to_wire(origin=signer)[:18] | |
data += rrsig.signer.to_digestable(signer) | |
# Derelativize the name before considering labels. | |
if not rrname.is_absolute(): | |
if origin is None: | |
raise ValidationFailure("relative RR name without an origin specified") | |
rrname = rrname.derelativize(origin) | |
name_len = len(rrname) | |
if rrname.is_wild() and rrsig.labels != name_len - 2: | |
raise ValidationFailure("wild owner name has wrong label length") | |
if name_len - 1 < rrsig.labels: | |
raise ValidationFailure("owner name longer than RRSIG labels") | |
elif rrsig.labels < name_len - 1: | |
suffix = rrname.split(rrsig.labels + 1)[1] | |
rrname = dns.name.from_text("*", suffix) | |
rrnamebuf = rrname.to_digestable() | |
rrfixed = struct.pack("!HHI", rdataset.rdtype, rdataset.rdclass, rrsig.original_ttl) | |
rdatas = [rdata.to_digestable(origin) for rdata in rdataset] | |
for rdata in sorted(rdatas): | |
data += rrnamebuf | |
data += rrfixed | |
rrlen = struct.pack("!H", len(rdata)) | |
data += rrlen | |
data += rdata | |
return data | |
def _make_dnskey( | |
public_key: PublicKey, | |
algorithm: Union[int, str], | |
flags: int = Flag.ZONE, | |
protocol: int = 3, | |
) -> DNSKEY: | |
"""Convert a public key to DNSKEY Rdata | |
*public_key*, a ``PublicKey`` (``GenericPublicKey`` or | |
``cryptography.hazmat.primitives.asymmetric``) to convert. | |
*algorithm*, a ``str`` or ``int`` specifying the DNSKEY algorithm. | |
*flags*: DNSKEY flags field as an integer. | |
*protocol*: DNSKEY protocol field as an integer. | |
Raises ``ValueError`` if the specified key algorithm parameters are not | |
unsupported, ``TypeError`` if the key type is unsupported, | |
`UnsupportedAlgorithm` if the algorithm is unknown and | |
`AlgorithmKeyMismatch` if the algorithm does not match the key type. | |
Return DNSKEY ``Rdata``. | |
""" | |
algorithm = Algorithm.make(algorithm) | |
if isinstance(public_key, GenericPublicKey): | |
return public_key.to_dnskey(flags=flags, protocol=protocol) | |
else: | |
public_cls = get_algorithm_cls(algorithm).public_cls | |
return public_cls(key=public_key).to_dnskey(flags=flags, protocol=protocol) | |
def _make_cdnskey( | |
public_key: PublicKey, | |
algorithm: Union[int, str], | |
flags: int = Flag.ZONE, | |
protocol: int = 3, | |
) -> CDNSKEY: | |
"""Convert a public key to CDNSKEY Rdata | |
*public_key*, the public key to convert, a | |
``cryptography.hazmat.primitives.asymmetric`` public key class applicable | |
for DNSSEC. | |
*algorithm*, a ``str`` or ``int`` specifying the DNSKEY algorithm. | |
*flags*: DNSKEY flags field as an integer. | |
*protocol*: DNSKEY protocol field as an integer. | |
Raises ``ValueError`` if the specified key algorithm parameters are not | |
unsupported, ``TypeError`` if the key type is unsupported, | |
`UnsupportedAlgorithm` if the algorithm is unknown and | |
`AlgorithmKeyMismatch` if the algorithm does not match the key type. | |
Return CDNSKEY ``Rdata``. | |
""" | |
dnskey = _make_dnskey(public_key, algorithm, flags, protocol) | |
return CDNSKEY( | |
rdclass=dnskey.rdclass, | |
rdtype=dns.rdatatype.CDNSKEY, | |
flags=dnskey.flags, | |
protocol=dnskey.protocol, | |
algorithm=dnskey.algorithm, | |
key=dnskey.key, | |
) | |
def nsec3_hash( | |
domain: Union[dns.name.Name, str], | |
salt: Optional[Union[str, bytes]], | |
iterations: int, | |
algorithm: Union[int, str], | |
) -> str: | |
""" | |
Calculate the NSEC3 hash, according to | |
https://tools.ietf.org/html/rfc5155#section-5 | |
*domain*, a ``dns.name.Name`` or ``str``, the name to hash. | |
*salt*, a ``str``, ``bytes``, or ``None``, the hash salt. If a | |
string, it is decoded as a hex string. | |
*iterations*, an ``int``, the number of iterations. | |
*algorithm*, a ``str`` or ``int``, the hash algorithm. | |
The only defined algorithm is SHA1. | |
Returns a ``str``, the encoded NSEC3 hash. | |
""" | |
b32_conversion = str.maketrans( | |
"ABCDEFGHIJKLMNOPQRSTUVWXYZ234567", "0123456789ABCDEFGHIJKLMNOPQRSTUV" | |
) | |
try: | |
if isinstance(algorithm, str): | |
algorithm = NSEC3Hash[algorithm.upper()] | |
except Exception: | |
raise ValueError("Wrong hash algorithm (only SHA1 is supported)") | |
if algorithm != NSEC3Hash.SHA1: | |
raise ValueError("Wrong hash algorithm (only SHA1 is supported)") | |
if salt is None: | |
salt_encoded = b"" | |
elif isinstance(salt, str): | |
if len(salt) % 2 == 0: | |
salt_encoded = bytes.fromhex(salt) | |
else: | |
raise ValueError("Invalid salt length") | |
else: | |
salt_encoded = salt | |
if not isinstance(domain, dns.name.Name): | |
domain = dns.name.from_text(domain) | |
domain_encoded = domain.canonicalize().to_wire() | |
assert domain_encoded is not None | |
digest = hashlib.sha1(domain_encoded + salt_encoded).digest() | |
for _ in range(iterations): | |
digest = hashlib.sha1(digest + salt_encoded).digest() | |
output = base64.b32encode(digest).decode("utf-8") | |
output = output.translate(b32_conversion) | |
return output | |
def make_ds_rdataset( | |
rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]], | |
algorithms: Set[Union[DSDigest, str]], | |
origin: Optional[dns.name.Name] = None, | |
) -> dns.rdataset.Rdataset: | |
"""Create a DS record from DNSKEY/CDNSKEY/CDS. | |
*rrset*, the RRset to create DS Rdataset for. This can be a | |
``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``) | |
tuple. | |
*algorithms*, a set of ``str`` or ``int`` specifying the hash algorithms. | |
The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case | |
does not matter for these strings. If the RRset is a CDS, only digest | |
algorithms matching algorithms are accepted. | |
*origin*, a ``dns.name.Name`` or ``None``. If `key` is a relative name, | |
then it will be made absolute using the specified origin. | |
Raises ``UnsupportedAlgorithm`` if any of the algorithms are unknown and | |
``ValueError`` if the given RRset is not usable. | |
Returns a ``dns.rdataset.Rdataset`` | |
""" | |
rrname, rdataset = _get_rrname_rdataset(rrset) | |
if rdataset.rdtype not in ( | |
dns.rdatatype.DNSKEY, | |
dns.rdatatype.CDNSKEY, | |
dns.rdatatype.CDS, | |
): | |
raise ValueError("rrset not a DNSKEY/CDNSKEY/CDS") | |
_algorithms = set() | |
for algorithm in algorithms: | |
try: | |
if isinstance(algorithm, str): | |
algorithm = DSDigest[algorithm.upper()] | |
except Exception: | |
raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm) | |
_algorithms.add(algorithm) | |
if rdataset.rdtype == dns.rdatatype.CDS: | |
res = [] | |
for rdata in cds_rdataset_to_ds_rdataset(rdataset): | |
if rdata.digest_type in _algorithms: | |
res.append(rdata) | |
if len(res) == 0: | |
raise ValueError("no acceptable CDS rdata found") | |
return dns.rdataset.from_rdata_list(rdataset.ttl, res) | |
res = [] | |
for algorithm in _algorithms: | |
res.extend(dnskey_rdataset_to_cds_rdataset(rrname, rdataset, algorithm, origin)) | |
return dns.rdataset.from_rdata_list(rdataset.ttl, res) | |
def cds_rdataset_to_ds_rdataset( | |
rdataset: dns.rdataset.Rdataset, | |
) -> dns.rdataset.Rdataset: | |
"""Create a CDS record from DS. | |
*rdataset*, a ``dns.rdataset.Rdataset``, to create DS Rdataset for. | |
Raises ``ValueError`` if the rdataset is not CDS. | |
Returns a ``dns.rdataset.Rdataset`` | |
""" | |
if rdataset.rdtype != dns.rdatatype.CDS: | |
raise ValueError("rdataset not a CDS") | |
res = [] | |
for rdata in rdataset: | |
res.append( | |
CDS( | |
rdclass=rdata.rdclass, | |
rdtype=dns.rdatatype.DS, | |
key_tag=rdata.key_tag, | |
algorithm=rdata.algorithm, | |
digest_type=rdata.digest_type, | |
digest=rdata.digest, | |
) | |
) | |
return dns.rdataset.from_rdata_list(rdataset.ttl, res) | |
def dnskey_rdataset_to_cds_rdataset( | |
name: Union[dns.name.Name, str], | |
rdataset: dns.rdataset.Rdataset, | |
algorithm: Union[DSDigest, str], | |
origin: Optional[dns.name.Name] = None, | |
) -> dns.rdataset.Rdataset: | |
"""Create a CDS record from DNSKEY/CDNSKEY. | |
*name*, a ``dns.name.Name`` or ``str``, the owner name of the CDS record. | |
*rdataset*, a ``dns.rdataset.Rdataset``, to create DS Rdataset for. | |
*algorithm*, a ``str`` or ``int`` specifying the hash algorithm. | |
The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case | |
does not matter for these strings. | |
*origin*, a ``dns.name.Name`` or ``None``. If `key` is a relative name, | |
then it will be made absolute using the specified origin. | |
Raises ``UnsupportedAlgorithm`` if the algorithm is unknown or | |
``ValueError`` if the rdataset is not DNSKEY/CDNSKEY. | |
Returns a ``dns.rdataset.Rdataset`` | |
""" | |
if rdataset.rdtype not in (dns.rdatatype.DNSKEY, dns.rdatatype.CDNSKEY): | |
raise ValueError("rdataset not a DNSKEY/CDNSKEY") | |
res = [] | |
for rdata in rdataset: | |
res.append(make_cds(name, rdata, algorithm, origin)) | |
return dns.rdataset.from_rdata_list(rdataset.ttl, res) | |
def dnskey_rdataset_to_cdnskey_rdataset( | |
rdataset: dns.rdataset.Rdataset, | |
) -> dns.rdataset.Rdataset: | |
"""Create a CDNSKEY record from DNSKEY. | |
*rdataset*, a ``dns.rdataset.Rdataset``, to create CDNSKEY Rdataset for. | |
Returns a ``dns.rdataset.Rdataset`` | |
""" | |
if rdataset.rdtype != dns.rdatatype.DNSKEY: | |
raise ValueError("rdataset not a DNSKEY") | |
res = [] | |
for rdata in rdataset: | |
res.append( | |
CDNSKEY( | |
rdclass=rdataset.rdclass, | |
rdtype=rdataset.rdtype, | |
flags=rdata.flags, | |
protocol=rdata.protocol, | |
algorithm=rdata.algorithm, | |
key=rdata.key, | |
) | |
) | |
return dns.rdataset.from_rdata_list(rdataset.ttl, res) | |
def default_rrset_signer( | |
txn: dns.transaction.Transaction, | |
rrset: dns.rrset.RRset, | |
signer: dns.name.Name, | |
ksks: List[Tuple[PrivateKey, DNSKEY]], | |
zsks: List[Tuple[PrivateKey, DNSKEY]], | |
inception: Optional[Union[datetime, str, int, float]] = None, | |
expiration: Optional[Union[datetime, str, int, float]] = None, | |
lifetime: Optional[int] = None, | |
policy: Optional[Policy] = None, | |
origin: Optional[dns.name.Name] = None, | |
) -> None: | |
"""Default RRset signer""" | |
if rrset.rdtype in set( | |
[ | |
dns.rdatatype.RdataType.DNSKEY, | |
dns.rdatatype.RdataType.CDS, | |
dns.rdatatype.RdataType.CDNSKEY, | |
] | |
): | |
keys = ksks | |
else: | |
keys = zsks | |
for private_key, dnskey in keys: | |
rrsig = dns.dnssec.sign( | |
rrset=rrset, | |
private_key=private_key, | |
dnskey=dnskey, | |
inception=inception, | |
expiration=expiration, | |
lifetime=lifetime, | |
signer=signer, | |
policy=policy, | |
origin=origin, | |
) | |
txn.add(rrset.name, rrset.ttl, rrsig) | |
def sign_zone( | |
zone: dns.zone.Zone, | |
txn: Optional[dns.transaction.Transaction] = None, | |
keys: Optional[List[Tuple[PrivateKey, DNSKEY]]] = None, | |
add_dnskey: bool = True, | |
dnskey_ttl: Optional[int] = None, | |
inception: Optional[Union[datetime, str, int, float]] = None, | |
expiration: Optional[Union[datetime, str, int, float]] = None, | |
lifetime: Optional[int] = None, | |
nsec3: Optional[NSEC3PARAM] = None, | |
rrset_signer: Optional[RRsetSigner] = None, | |
policy: Optional[Policy] = None, | |
) -> None: | |
"""Sign zone. | |
*zone*, a ``dns.zone.Zone``, the zone to sign. | |
*txn*, a ``dns.transaction.Transaction``, an optional transaction to use for | |
signing. | |
*keys*, a list of (``PrivateKey``, ``DNSKEY``) tuples, to use for signing. KSK/ZSK | |
roles are assigned automatically if the SEP flag is used, otherwise all RRsets are | |
signed by all keys. | |
*add_dnskey*, a ``bool``. If ``True``, the default, all specified DNSKEYs are | |
automatically added to the zone on signing. | |
*dnskey_ttl*, a``int``, specifies the TTL for DNSKEY RRs. If not specified the TTL | |
of the existing DNSKEY RRset used or the TTL of the SOA RRset. | |
*inception*, a ``datetime``, ``str``, ``int``, ``float`` or ``None``, the signature | |
inception time. If ``None``, the current time is used. If a ``str``, the format is | |
"YYYYMMDDHHMMSS" or alternatively the number of seconds since the UNIX epoch in text | |
form; this is the same the RRSIG rdata's text form. Values of type `int` or `float` | |
are interpreted as seconds since the UNIX epoch. | |
*expiration*, a ``datetime``, ``str``, ``int``, ``float`` or ``None``, the signature | |
expiration time. If ``None``, the expiration time will be the inception time plus | |
the value of the *lifetime* parameter. See the description of *inception* above for | |
how the various parameter types are interpreted. | |
*lifetime*, an ``int`` or ``None``, the signature lifetime in seconds. This | |
parameter is only meaningful if *expiration* is ``None``. | |
*nsec3*, a ``NSEC3PARAM`` Rdata, configures signing using NSEC3. Not yet | |
implemented. | |
*rrset_signer*, a ``Callable``, an optional function for signing RRsets. The | |
function requires two arguments: transaction and RRset. If the not specified, | |
``dns.dnssec.default_rrset_signer`` will be used. | |
Returns ``None``. | |
""" | |
ksks = [] | |
zsks = [] | |
# if we have both KSKs and ZSKs, split by SEP flag. if not, sign all | |
# records with all keys | |
if keys: | |
for key in keys: | |
if key[1].flags & Flag.SEP: | |
ksks.append(key) | |
else: | |
zsks.append(key) | |
if not ksks: | |
ksks = keys | |
if not zsks: | |
zsks = keys | |
else: | |
keys = [] | |
if txn: | |
cm: contextlib.AbstractContextManager = contextlib.nullcontext(txn) | |
else: | |
cm = zone.writer() | |
with cm as _txn: | |
if add_dnskey: | |
if dnskey_ttl is None: | |
dnskey = _txn.get(zone.origin, dns.rdatatype.DNSKEY) | |
if dnskey: | |
dnskey_ttl = dnskey.ttl | |
else: | |
soa = _txn.get(zone.origin, dns.rdatatype.SOA) | |
dnskey_ttl = soa.ttl | |
for _, dnskey in keys: | |
_txn.add(zone.origin, dnskey_ttl, dnskey) | |
if nsec3: | |
raise NotImplementedError("Signing with NSEC3 not yet implemented") | |
else: | |
_rrset_signer = rrset_signer or functools.partial( | |
default_rrset_signer, | |
signer=zone.origin, | |
ksks=ksks, | |
zsks=zsks, | |
inception=inception, | |
expiration=expiration, | |
lifetime=lifetime, | |
policy=policy, | |
origin=zone.origin, | |
) | |
return _sign_zone_nsec(zone, _txn, _rrset_signer) | |
def _sign_zone_nsec( | |
zone: dns.zone.Zone, | |
txn: dns.transaction.Transaction, | |
rrset_signer: Optional[RRsetSigner] = None, | |
) -> None: | |
"""NSEC zone signer""" | |
def _txn_add_nsec( | |
txn: dns.transaction.Transaction, | |
name: dns.name.Name, | |
next_secure: Optional[dns.name.Name], | |
rdclass: dns.rdataclass.RdataClass, | |
ttl: int, | |
rrset_signer: Optional[RRsetSigner] = None, | |
) -> None: | |
"""NSEC zone signer helper""" | |
mandatory_types = set( | |
[dns.rdatatype.RdataType.RRSIG, dns.rdatatype.RdataType.NSEC] | |
) | |
node = txn.get_node(name) | |
if node and next_secure: | |
types = ( | |
set([rdataset.rdtype for rdataset in node.rdatasets]) | mandatory_types | |
) | |
windows = Bitmap.from_rdtypes(list(types)) | |
rrset = dns.rrset.from_rdata( | |
name, | |
ttl, | |
NSEC( | |
rdclass=rdclass, | |
rdtype=dns.rdatatype.RdataType.NSEC, | |
next=next_secure, | |
windows=windows, | |
), | |
) | |
txn.add(rrset) | |
if rrset_signer: | |
rrset_signer(txn, rrset) | |
rrsig_ttl = zone.get_soa().minimum | |
delegation = None | |
last_secure = None | |
for name in sorted(txn.iterate_names()): | |
if delegation and name.is_subdomain(delegation): | |
# names below delegations are not secure | |
continue | |
elif txn.get(name, dns.rdatatype.NS) and name != zone.origin: | |
# inside delegation | |
delegation = name | |
else: | |
# outside delegation | |
delegation = None | |
if rrset_signer: | |
node = txn.get_node(name) | |
if node: | |
for rdataset in node.rdatasets: | |
if rdataset.rdtype == dns.rdatatype.RRSIG: | |
# do not sign RRSIGs | |
continue | |
elif delegation and rdataset.rdtype != dns.rdatatype.DS: | |
# do not sign delegations except DS records | |
continue | |
else: | |
rrset = dns.rrset.from_rdata(name, rdataset.ttl, *rdataset) | |
rrset_signer(txn, rrset) | |
# We need "is not None" as the empty name is False because its length is 0. | |
if last_secure is not None: | |
_txn_add_nsec(txn, last_secure, name, zone.rdclass, rrsig_ttl, rrset_signer) | |
last_secure = name | |
if last_secure: | |
_txn_add_nsec( | |
txn, last_secure, zone.origin, zone.rdclass, rrsig_ttl, rrset_signer | |
) | |
def _need_pyca(*args, **kwargs): | |
raise ImportError( | |
"DNSSEC validation requires python cryptography" | |
) # pragma: no cover | |
if dns._features.have("dnssec"): | |
from cryptography.exceptions import InvalidSignature | |
from cryptography.hazmat.primitives.asymmetric import dsa # pylint: disable=W0611 | |
from cryptography.hazmat.primitives.asymmetric import ec # pylint: disable=W0611 | |
from cryptography.hazmat.primitives.asymmetric import ed448 # pylint: disable=W0611 | |
from cryptography.hazmat.primitives.asymmetric import rsa # pylint: disable=W0611 | |
from cryptography.hazmat.primitives.asymmetric import ( # pylint: disable=W0611 | |
ed25519, | |
) | |
from dns.dnssecalgs import ( # pylint: disable=C0412 | |
get_algorithm_cls, | |
get_algorithm_cls_from_dnskey, | |
) | |
from dns.dnssecalgs.base import GenericPrivateKey, GenericPublicKey | |
validate = _validate # type: ignore | |
validate_rrsig = _validate_rrsig # type: ignore | |
sign = _sign | |
make_dnskey = _make_dnskey | |
make_cdnskey = _make_cdnskey | |
_have_pyca = True | |
else: # pragma: no cover | |
validate = _need_pyca | |
validate_rrsig = _need_pyca | |
sign = _need_pyca | |
make_dnskey = _need_pyca | |
make_cdnskey = _need_pyca | |
_have_pyca = False | |
### BEGIN generated Algorithm constants | |
RSAMD5 = Algorithm.RSAMD5 | |
DH = Algorithm.DH | |
DSA = Algorithm.DSA | |
ECC = Algorithm.ECC | |
RSASHA1 = Algorithm.RSASHA1 | |
DSANSEC3SHA1 = Algorithm.DSANSEC3SHA1 | |
RSASHA1NSEC3SHA1 = Algorithm.RSASHA1NSEC3SHA1 | |
RSASHA256 = Algorithm.RSASHA256 | |
RSASHA512 = Algorithm.RSASHA512 | |
ECCGOST = Algorithm.ECCGOST | |
ECDSAP256SHA256 = Algorithm.ECDSAP256SHA256 | |
ECDSAP384SHA384 = Algorithm.ECDSAP384SHA384 | |
ED25519 = Algorithm.ED25519 | |
ED448 = Algorithm.ED448 | |
INDIRECT = Algorithm.INDIRECT | |
PRIVATEDNS = Algorithm.PRIVATEDNS | |
PRIVATEOID = Algorithm.PRIVATEOID | |
### END generated Algorithm constants | |