JFIF$        dd7 

Viewing File: /usr/lib/python3.9/site-packages/dns/_ddr.py

# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
#
# Support for Discovery of Designated Resolvers

import ipaddress
import socket
import time
from urllib.parse import urlparse

import dns.asyncbackend
import dns.inet
import dns.name
import dns.nameserver
import dns.query
import dns.rdtypes.svcbbase

# The special name of the local resolver when using DDR.  The text form is
# converted with dns.name.from_text() once, at import time.
# NOTE(review): not referenced by any function visible in this section;
# presumably consumed by the resolver code that issues the DDR query -- confirm.
_local_resolver_name = dns.name.from_text("_dns.resolver.arpa")


#
# Processing is split up into I/O independent and I/O dependent parts to
# make supporting sync and async versions easy.
#


class _SVCBInfo:
    def __init__(self, bootstrap_address, port, hostname, nameservers):
        self.bootstrap_address = bootstrap_address
        self.port = port
        self.hostname = hostname
        self.nameservers = nameservers

    def ddr_check_certificate(self, cert):
        """Verify that the _SVCBInfo's address is in the cert's subjectAltName (SAN)"""
        for name, value in cert["subjectAltName"]:
            if name == "IP Address" and value == self.bootstrap_address:
                return True
        return False

    def make_tls_context(self):
        ssl = dns.query.ssl
        ctx = ssl.create_default_context()
        ctx.minimum_version = ssl.TLSVersion.TLSv1_2
        return ctx

    def ddr_tls_check_sync(self, lifetime):
        ctx = self.make_tls_context()
        expiration = time.time() + lifetime
        with socket.create_connection(
            (self.bootstrap_address, self.port), lifetime
        ) as s:
            with ctx.wrap_socket(s, server_hostname=self.hostname) as ts:
                ts.settimeout(dns.query._remaining(expiration))
                ts.do_handshake()
                cert = ts.getpeercert()
                return self.ddr_check_certificate(cert)

    async def ddr_tls_check_async(self, lifetime, backend=None):
        if backend is None:
            backend = dns.asyncbackend.get_default_backend()
        ctx = self.make_tls_context()
        expiration = time.time() + lifetime
        async with await backend.make_socket(
            dns.inet.af_for_address(self.bootstrap_address),
            socket.SOCK_STREAM,
            0,
            None,
            (self.bootstrap_address, self.port),
            lifetime,
            ctx,
            self.hostname,
        ) as ts:
            cert = await ts.getpeercert(dns.query._remaining(expiration))
            return self.ddr_check_certificate(cert)


def _extract_nameservers_from_svcb(answer):
    """Build one _SVCBInfo per usable SVCB record in *answer*.

    *answer* must provide ``nameserver`` (the address the answer came from,
    used as the bootstrap address) and ``rrset`` (the SVCB records).

    Records are visited in ``processing_order()``.  A record is skipped if it
    has no ALPN parameter, or if it advertises ``h2`` without a DOHPATH
    parameter ending in ``{?dns}``.  For every other record a nameserver is
    created for each of the ``h2`` (DoH), ``dot`` (DoT) and ``doq`` (DoQ)
    ALPN ids present.

    If the record has no PORT parameter, each transport gets its own default
    (443 for DoH, 853 for DoT and DoQ), so a record advertising several
    transports does not hand DoH's default port to DoT/DoQ.

    Returns a list of _SVCBInfo objects; it is empty if ``answer.nameserver``
    is not an IP address.
    """
    bootstrap_address = answer.nameserver
    if not dns.inet.is_address(bootstrap_address):
        return []
    infos = []
    for rr in answer.rrset.processing_order():
        nameservers = []
        param = rr.params.get(dns.rdtypes.svcbbase.ParamKey.ALPN)
        if param is None:
            continue
        alpns = set(param.ids)
        host = rr.target.to_text(omit_final_dot=True)
        explicit_port = None
        param = rr.params.get(dns.rdtypes.svcbbase.ParamKey.PORT)
        if param is not None:
            explicit_port = param.port
        # Port handed to _SVCBInfo for the TLS check: the port of the first
        # nameserver accepted from this record.
        check_port = None
        # For now we ignore address hints and address resolution and always use the
        # bootstrap address
        if b"h2" in alpns:
            param = rr.params.get(dns.rdtypes.svcbbase.ParamKey.DOHPATH)
            if param is None or not param.value.endswith(b"{?dns}"):
                continue
            doh_port = 443 if explicit_port is None else explicit_port
            try:
                # Strip the trailing "{?dns}" template variable (6 bytes).
                # Decoding happens inside the try so that an undecodable path
                # only disqualifies DoH for this record.
                path = param.value[:-6].decode()
                if not path.startswith("/"):
                    path = "/" + path
                url = f"https://{host}:{doh_port}{path}"
                # check the URL
                urlparse(url)
                nameservers.append(dns.nameserver.DoHNameserver(url, bootstrap_address))
                check_port = doh_port
            except Exception:
                # continue processing other ALPN types
                pass
        if b"dot" in alpns:
            dot_port = 853 if explicit_port is None else explicit_port
            nameservers.append(
                dns.nameserver.DoTNameserver(bootstrap_address, dot_port, host)
            )
            if check_port is None:
                check_port = dot_port
        if b"doq" in alpns:
            doq_port = 853 if explicit_port is None else explicit_port
            nameservers.append(
                dns.nameserver.DoQNameserver(bootstrap_address, doq_port, True, host)
            )
            if check_port is None:
                check_port = doq_port
        if nameservers:
            infos.append(_SVCBInfo(bootstrap_address, check_port, host, nameservers))
    return infos


def _get_nameservers_sync(answer, lifetime):
    """Collect the nameservers from an SVCB answer whose TLS check succeeds.

    Each _SVCBInfo extracted from *answer* is checked with
    ``ddr_tls_check_sync(lifetime)``.  Its nameservers are added to the result
    only if the check returns a true value; a candidate whose check raises is
    silently skipped (best effort).
    """
    validated = []
    for candidate in _extract_nameservers_from_svcb(answer):
        try:
            if not candidate.ddr_tls_check_sync(lifetime):
                continue
            validated.extend(candidate.nameservers)
        except Exception:
            # Best effort: a failing candidate must not stop the others.
            continue
    return validated


async def _get_nameservers_async(answer, lifetime):
    """Collect the nameservers from an SVCB answer whose TLS check succeeds.

    Each _SVCBInfo extracted from *answer* is checked by awaiting
    ``ddr_tls_check_async(lifetime)``.  Its nameservers are added to the
    result only if the check returns a true value; a candidate whose check
    raises is silently skipped (best effort).
    """
    validated = []
    for candidate in _extract_nameservers_from_svcb(answer):
        try:
            if not await candidate.ddr_tls_check_async(lifetime):
                continue
            validated.extend(candidate.nameservers)
        except Exception:
            # Best effort: a failing candidate must not stop the others.
            continue
    return validated
Back to Directory  nL+D550H?Mx ,D"v]qv;6*Zqn)ZP0!1 A "#a$2Qr D8 a Ri[f\mIykIw0cuFcRı?lO7к_f˓[C$殷WF<_W ԣsKcëIzyQy/_LKℂ;C",pFA:/]=H  ~,ls/9ć:[=/#f;)x{ٛEQ )~ =𘙲r*2~ a _V=' kumFD}KYYC)({ *g&f`툪ry`=^cJ.I](*`wq1dđ#̩͑0;H]u搂@:~וKL Nsh}OIR*8:2 !lDJVo(3=M(zȰ+i*NAr6KnSl)!JJӁ* %݉?|D}d5:eP0R;{$X'xF@.ÊB {,WJuQɲRI;9QE琯62fT.DUJ;*cP A\ILNj!J۱+O\͔]ޒS߼Jȧc%ANolՎprULZԛerE2=XDXgVQeӓk yP7U*omQIs,K`)6\G3t?pgjrmۛجwluGtfh9uyP0D;Uڽ"OXlif$)&|ML0Zrm1[HXPlPR0'G=i2N+0e2]]9VTPO׮7h(F*癈'=QVZDF,d߬~TX G[`le69CR(!S2!P <0x<!1AQ "Raq02Br#SCTb ?Ζ"]mH5WR7k.ۛ!}Q~+yԏz|@T20S~Kek *zFf^2X*(@8r?CIuI|֓>^ExLgNUY+{.RѪ τV׸YTD I62'8Y27'\TP.6d&˦@Vqi|8-OΕ]ʔ U=TL8=;6c| !qfF3aů&~$l}'NWUs$Uk^SV:U# 6w++s&r+nڐ{@29 gL u"TÙM=6(^"7r}=6YݾlCuhquympǦ GjhsǜNlɻ}o7#S6aw4!OSrD57%|?x>L |/nD6?/8w#[)L7+6〼T ATg!%5MmZ/c-{1_Je"|^$'O&ޱմTrb$w)R$& N1EtdU3Uȉ1pM"N*(DNyd96.(jQ)X 5cQɎMyW?Q*!R>6=7)Xj5`J]e8%t!+'!1Q5 !1 AQaqё#2"0BRb?Gt^## .llQT $v,,m㵜5ubV =sY+@d{N! dnO<.-B;_wJt6;QJd.Qc%p{ 1,sNDdFHI0ГoXшe黅XۢF:)[FGXƹ/w_cMeD,ʡcc.WDtA$j@:) -# u c1<@ۗ9F)KJ-hpP]_x[qBlbpʖw q"LFGdƶ*s+ډ_Zc"?%t[IP 6J]#=ɺVvvCGsGh1 >)6|ey?Lӣm,4GWUi`]uJVoVDG< SB6ϏQ@ TiUlyOU0kfV~~}SZ@*WUUi##; s/[=!7}"WN]'(L! ~y5g9T̅JkbM' +s:S +B)v@Mj e Cf jE 0Y\QnzG1д~Wo{T9?`Rmyhsy3!HAD]mc1~2LSu7xT;j$`}4->L#vzŏILS ֭T{rjGKC;bpU=-`BsK.SFw4Mq]ZdHS0)tLg