from __future__ import annotations from argparse import ArgumentParser from dataclasses import dataclass from datetime import datetime from enum import Enum, Flag, auto from io import BytesIO import socket import struct import time from typing import Any def format_hex(data: bytes, online: bool = False) -> str: hex_repr = [] line = [] for i in range(0, len(data), 2): line.append(data[i:i + 2].hex()) if not online and i % 16 == 14: hex_repr.append(' '.join(line)) line = [] if line: hex_repr.append(' '.join(line)) return '\n'.join(hex_repr) # From RFC 1035 class BaseType(Enum): A = 1 # a host address NS = 2 # an authoritative name server MD = 3 # a mail destination (Obsolete - use MX) MF = 4 # a mail forwarder (Obsolete - use MX) CNAME = 5 # the canonical name for an alias SOA = 6 # marks the start of a zone of authority MB = 7 # a mailbox domain name (EXPERIMENTAL) MG = 8 # a mail group member (EXPERIMENTAL) MR = 9 # a mail rename domain name (EXPERIMENTAL) NULL = 10 # a null RR (EXPERIMENTAL) WKS = 11 # a well known service description PTR = 12 # a domain name pointer HINFO = 13 # host information MINFO = 14 # mailbox or mail list information MX = 15 # mail exchange TXT = 16 # text strings class BaseQType(Enum): AXFR = 252 # A request for a transfer of an entire zone MAILB = 253 # A request for mailbox-related records (MB, MG or MR) MAILA = 254 # A request for mail agent RRs (Obsolete - see MX) WILDCARD = 255 # A request for all records # From https://www.iana.org/assignments/dns-parameters/dns-parameters.xhtml#dns-parameters-4 class Type(Enum): RESERVED_1 = 0 # [RFC6895] A = 1 # a host address [RFC1035] NS = 2 # an authoritative name server [RFC1035] MD = 3 # a mail destination (OBSOLETE - use MX) [RFC1035] MF = 4 # a mail forwarder (OBSOLETE - use MX) [RFC1035] CNAME = 5 # the canonical name for an alias [RFC1035] SOA = 6 # marks the start of a zone of authority [RFC1035] MB = 7 # a mailbox domain name (EXPERIMENTAL) [RFC1035] MG = 8 # a mail group member (EXPERIMENTAL) [RFC1035] MR = 9 # a mail rename domain name (EXPERIMENTAL) [RFC1035] NULL = 10 # a null RR (EXPERIMENTAL) [RFC1035] WKS = 11 # a well known service description [RFC1035] PTR = 12 # a domain name pointer [RFC1035] HINFO = 13 # host information [RFC1035] MINFO = 14 # mailbox or mail list information [RFC1035] MX = 15 # mail exchange [RFC1035] TXT = 16 # text strings [RFC1035] RP = 17 # for Responsible Person [RFC1183] AFSDB = 18 # for AFS Data Base location [RFC1183] [RFC5864] X25 = 19 # for X.25 PSDN address [RFC1183] ISDN = 20 # for ISDN address [RFC1183] RT = 21 # for Route Through [RFC1183] NSAP = 22 # for NSAP address, NSAP style A record (DEPRECATED) [RFC1706] NSAP_PTR = 23 # for domain name pointer, NSAP style (DEPRECATED) [RFC1706] SIG = 24 # for security signature [RFC2536][RFC2931][RFC3110] [RFC4034] KEY = 25 # for security key [RFC2536][RFC2539][RFC3110] [RFC4034] PX = 26 # X.400 mail mapping information [RFC2163] GPOS = 27 # Geographical Position [RFC1712] AAAA = 28 # IP6 Address [RFC3596] LOC = 29 # Location Information [RFC1876] NXT = 30 # Next Domain (OBSOLETE) [RFC2535] [RFC3755] EID = 31 # Endpoint Identifier [Michael_Patton] [http://ana-3.lcs.mit.edu/~jnc/nimrod/dns.txt] NIMLOC = 32 # Nimrod Locator [1][Michael_Patton] [http://ana-3.lcs.mit.edu/~jnc/nimrod/dns.txt] SRV = 33 # Server Selection [1] [RFC2782] ATMA = 34 # ATM Address [ ATM Forum Technical Committee, "ATM Name System, V2.0", Doc ID: AF-DANS-0152.000, # July 2000. Available from and held in escrow by IANA.] NAPTR = 35 # Naming Authority Pointer [RFC3403] KX = 36 # Key Exchanger [RFC2230] CERT = 37 # CERT [RFC4398] A6 = 38 # A6 (OBSOLETE - use AAAA) [RFC2874][RFC3226] [RFC6563] DNAME = 39 # DNAME [RFC6672] SINK = 40 # SINK [Donald_E_Eastlake] [draft-eastlake-kitchen-sink-02] OPT = 41 # OPT [RFC3225] [RFC6891] APL = 42 # APL [RFC3123] DS = 43 # Delegation Signer [RFC4034] SSHFP = 44 # SSH Key Fingerprint [RFC4255] IPSECKEY = 45 # IPSECKEY [RFC4025] RRSIG = 46 # RRSIG [RFC4034] NSEC = 47 # NSEC [RFC4034] [RFC9077] DNSKEY = 48 # DNSKEY [RFC4034] DHCID = 49 # DHCID [RFC4701] NSEC3 = 50 # NSEC3 [RFC5155] [RFC9077] NSEC3PARAM = 51 # NSEC3PARAM [RFC5155] TLSA = 52 # TLSA [RFC6698] SMIMEA = 53 # S/MIME cert association [RFC8162] HIP = 55 # Host Identity Protocol [RFC8005] NINFO = 56 # NINFO [Jim_Reid] RKEY = 57 # RKEY [Jim_Reid] TALINK = 58 # Trust Anchor LINK [Wouter_Wijngaards] CDS = 59 # Child DS [RFC7344] CDNSKEY = 60 # DNSKEY(s) the Child wants reflected in DS [RFC7344] OPENPGPKEY = 61 # OpenPGP Key [RFC7929] CSYNC = 62 # Child-To-Parent Synchronization [RFC7477] ZONEMD = 63 # Message Digest Over Zone Data [RFC8976] SVCB = 64 # General-purpose service binding [RFC9460] HTTPS = 65 # SVCB-compatible type for use with HTTP [RFC9460] DSYNC = 66 # Endpoint discovery for delegation synchronization [RFC9859] HHIT = 67 # Hierarchical Host Identity Tag [draft-ietf-drip-registries-28] BRID = 68 # UAS Broadcast Remote Identification [draft-ietf-drip-registries-28] SPF = 99 # [RFC7208] UINFO = 100 # [IANA-Reserved] UID = 101 # [IANA-Reserved] GID = 102 # [IANA-Reserved] UNSPEC = 103 # [IANA-Reserved] NID = 104 # [RFC6742] L32 = 105 # [RFC6742] L64 = 106 # [RFC6742] LP = 107 # [RFC6742] EUI48 = 108 # an EUI-48 address [RFC7043] EUI64 = 109 # an EUI-64 address [RFC7043] NXNAME = 128 # NXDOMAIN indicator for Compact Denial of Existence [RFC9824] TKEY = 249 # Transaction Key [RFC2930] TSIG = 250 # Transaction Signature [RFC8945] IXFR = 251 # incremental transfer [RFC1995] AXFR = 252 # transfer of an entire zone [RFC1035] [RFC5936] MAILB = 253 # mailbox-related RRs (MB, MG or MR) [RFC1035] MAILA = 254 # mail agent RRs (OBSOLETE - see MX) [RFC1035] WILDCARD = 255 # A request for some or all records the server has available [RFC1035][RFC6895] [RFC8482] URI = 256 # URI [RFC7553] CAA = 257 # Certification Authority Restriction [RFC8659] AVC = 258 # Application Visibility and Control [Wolfgang_Riedel] DOA = 259 # Digital Object Architecture [draft-durand-doa-over-dns-02] AMTRELAY = 260 # Automatic Multicast Tunneling Relay [RFC8777] RESINFO = 261 # Resolver Information as Key/Value Pairs [RFC9606] WALLET = 262 # Public wallet address [Paul_Hoffman] CLA = 263 # BP Convergence Layer Adapter [draft-johnson-dns-ipn-cla-07] IPN = 264 # BP Node Number [draft-johnson-dns-ipn-cla-07] TA = 32768 # DNSSEC Trust Authorities [Sam_Weiler] [Deploying DNSSEC Without a Signed Root. # Technical Report 1999-19, Information Networking Institute, Carnegie Mellon University, April 2004.] DLV = 32769 # DNSSEC Lookaside Validation (OBSOLETE) [RFC8749] [RFC4431] RESEVERD_2 = 65535 class Class(Enum): IN = 1 # the Internet CS = 2 # the CSNET class (Obsolete - used only for examples in some obsolete RFCs) CH = 3 # the CHAOS class HS = 4 # Hesiod [Dyer 87] class QueryResponse(Enum): QUERY = 0 RESPONSE = 1 class OpCode(Enum): QUERY = 0 IQUERY = 1 STATUS = 2 class RCode(Enum): NO_ERROR = 0 FORMAT_ERROR = 1 SERVER_FAILURE = 2 NAME_ERROR = 3 NOT_IMPLEMENTED = 4 REFUSED = 5 def read_labels(message: BytesIO) -> list[bytes]: labels: list[bytes] = [] while (length := message.read(1)[0]) != 0: if length & 0xc0 == 0xc0: offset = (length & 0x3f_ff << 8) + message.read(1)[0] current_pos = message.tell() message.seek(offset) while (length := message.read(1)[0]) != 0: labels.append(message.read(length)) message.seek(current_pos) break labels.append(message.read(length)) return labels def read_type(message: BytesIO) -> Type: return Type(struct.unpack('!H', message.read(2))[0]) def read_class(message: BytesIO) -> Class: return Class(struct.unpack('!H', message.read(2))[0]) @dataclass(slots=True) class Header: class Flags(Flag): RA = auto() # recursion available RD = auto() # recursion desired TC = auto() # truncation AA = auto() # authoritative answer id: int qr: QueryResponse # query/answer (0=query, 1=answer) op_code: OpCode flags: Flags z: int # zero r_code: RCode # response code qd_count: int # question count an_count: int # answer count ns_count: int # authority/nameserver count ar_count: int # additionals count @staticmethod def from_bytes(message: BytesIO) -> Header: # 1 1 1 1 1 1 # 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 # +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ # | ID | # +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ # |QR| Opcode |AA|TC|RD|RA| Z | RCODE | # +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ # | QDCOUNT | # +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ # | ANCOUNT | # +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ # | NSCOUNT | # +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ # | ARCOUNT | # +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ msg_id, flags_1, flags_2, qd_count, an_count, ns_count, ar_count = struct.unpack('!H2B4H', message.read(12)) return Header( id=msg_id, qr=QueryResponse(flags_1 >> 7), op_code=OpCode((flags_1 & 0x78) >> 3), flags=Header.Flags(((flags_1 & 0x5) << 1) + ((flags_2 & 0x80) >> 7)), z=(flags_2 & 0x70) >> 4, r_code=RCode(flags_2 & 0xf), qd_count=qd_count, an_count=an_count, ns_count=ns_count, ar_count=ar_count) def to_bytes(self) -> bytes: return struct.pack( '!HBB4H', self.id, (self.qr.value << 7) + (self.op_code.value << 3) + (self.flags.value >> 1), ((self.flags.value & 1) << 7) + (self.z << 4) + self.r_code.value, self.qd_count, self.an_count, self.ns_count, self.ar_count) def dig_repr(self) -> str: flags = (['qr'] if self.qr.value else []) + ( self.flags.name.lower().split('|') if self.flags.name else [])[::-1] return ( f';; ->>HEADER<<- opcode: {self.op_code.name}, status: {self.r_code.name}, id: {self.id}\n' f';; flags: {" ".join(flags)}; QUERY: {self.qd_count}, ANSWER: {self.an_count}' f', AUTHORITY: {self.ns_count}, ADDITIONAL: {self.ar_count}\n') @dataclass(slots=True) class Question: qname: list[bytes] qtype: Type qclass: Class @staticmethod def from_bytes(message: BytesIO) -> Question: # 1 1 1 1 1 1 # 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 # +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ # | | # / QNAME / # / / # +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ # | QTYPE | # +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ # | QCLASS | # +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ qname = read_labels(message) qtype = read_type(message) qclass = read_class(message) return Question(qname=qname, qtype=qtype, qclass=qclass) def to_bytes(self) -> bytes: result: list[bytes] = [] for name in self.qname: result.extend([struct.pack('!B', len(name)), name]) result.extend([b'\0', struct.pack('!H', self.qtype.value), struct.pack('!H', self.qclass.value)]) return b''.join(result) def dig_repr(self) -> str: return f';{b".".join([*self.qname, b""]).decode():<31}{self.qclass.name:<8}{self.qtype.name}\n' @dataclass(slots=True) class RessourceBase: rname: list[bytes] rtype: Type @classmethod def from_bytes(cls, message: BytesIO) -> RessourceBase: rname = read_labels(message) rtype = read_type(message) return RessourceBase(rname=rname, rtype=rtype) def dig_repr(self) -> str: return f'{b".".join([*self.rname, b""]).decode():<40}{self.rtype.name:<8}\n' @dataclass(slots=True) class OPTRessource(RessourceBase): class Flags(Flag): DO = auto() @dataclass(slots=True) class Data: # Data format # +0 (MSB) +1 (LSB) # +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ # 0: | OPTION-CODE | # +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ # 2: | OPTION-LENGTH | # +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ # 4: | | # / OPTION-DATA / # / / # +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ # From https://www.iana.org/assignments/dns-parameters/dns-parameters.xhtml#server-cookie-methods class OptionCode(Enum): RESERVED_1 = 0 # [RFC6891] LLQ = 1 # [RFC8764] UPDATE_LEASE = 2 # [RFC9664] NSID = 3 # [RFC5001] RESERVED_2 = 4 # [draft-cheshire-edns0-owner-option-00] DAU = 5 # [RFC6975] DHU = 6 # [RFC6975] N3U = 7 # [RFC6975] EDNS_CLIENT_SUBNET = 8 # [RFC7871] EDNS_EXPIRE = 9 # [RFC7314] COOKIE = 10 # [RFC7873] EDNS_TCP_KEEPALIVE = 11 # [RFC7828] PADDING = 12 # [RFC7830] CHAIN = 13 # [RFC7901] EDNS_KEY_TAG = 14 # [RFC8145] EXTENDED_DNS_ERROR = 15 # [RFC8914] EDNS_CLIENT_TAG = 16 # [draft-bellis-dnsop-edns-tags-01] EDNS_SERVER_TAG = 17 # [draft-bellis-dnsop-edns-tags-01] REPORT_CHANNEL = 18 # [RFC9567] ZONEVERSION = 19 # [RFC9660] MQTYPE_QUERY = 20 # [draft-ietf-dnssd-multi-qtypes-07] MQTYPE_RESPONSE = 21 # [draft-ietf-dnssd-multi-qtypes-07] # [https://developer.cisco.com/docs/cloud-security/#!integrating-network-devices/rdata-description] # [Cisco_CIE_DNS_team] UMBRELLA_IDENT = 20292 # [https://developer.cisco.com/docs/cloud-security/#!network-devices-getting-started/response-codes] # [Cisco_CIE_DNS_team] DEVICEID = 26946 option_code: OptionCode option_length: int option_data: bytes def dig_repr(self) -> str: return f'; {self.option_code.name}: {self.option_data.hex()}' # From RFC 6891 (6.1.2 - Wire Format) # +------------+--------------+------------------------------+ # | Field Name | Field Type | Description | # +------------+--------------+------------------------------+ # | NAME | domain name | MUST be 0 (root domain) | # | TYPE | u_int16_t | OPT (41) | # | CLASS | u_int16_t | requestor's UDP payload size | # | TTL | u_int32_t | extended RCODE and flags | # | RDLEN | u_int16_t | length of all RDATA | # | RDATA | octet stream | {attribute,value} pairs | # +------------+--------------+------------------------------+ udp_payload: int flags: Flags extended_rcode: int version: int data_length: int data: Data @classmethod def from_bytes(cls, message: BytesIO) -> OPTRessource: base_ressource = super(OPTRessource, cls).from_bytes(message) # Extended RCODE and flags (TTL's 32 bits) # +0 (MSB) +1 (LSB) # +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ # 0: | EXTENDED-RCODE | VERSION | # +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ # 2: | DO| Z | # +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ udp_payload, extended_rcode, version, z, data_length = struct.unpack('!H2B2H', message.read(8)) flags = OPTRessource.Flags(z >> 15) z &= 0x7fff option_code, option_length = struct.unpack('!2H', message.read(4)) option_data = message.read(option_length) return OPTRessource(rname=base_ressource.rname, rtype=base_ressource.rtype, udp_payload=udp_payload, flags=flags, extended_rcode=extended_rcode, version=version, data_length=data_length, data=OPTRessource.Data(option_code=OPTRessource.Data.OptionCode(option_code), option_length=option_length, option_data=option_data)) def dig_repr(self) -> str: return (f'; EDNS: version: {self.version}' f', flags: {self.flags.name.lower().replace("|", " ") if self.flags.name else ""}' f'; udp: {self.udp_payload}\n{self.data.dig_repr()}') @dataclass(slots=True) class Ressource(RessourceBase): rclass: Class ttl: int rd_length: int rdata: Any @classmethod def from_bytes(cls, message: BytesIO) -> Ressource: # 1 1 1 1 1 1 # 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 # +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ # | | # / / # / NAME / # | | # +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ # | TYPE | # +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ # | CLASS | # +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ # | TTL | # | | # +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ # | RDLENGTH | # +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--| # / RDATA / # / / # +--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+ base_ressource = super(Ressource, cls).from_bytes(message) rclass = read_class(message) ttl = struct.unpack('!I', message.read(4))[0] rd_length = struct.unpack('!H', message.read(2))[0] match base_ressource.rtype: case Type.A: assert rd_length == 4, f'Mismatch rd_length: expected 4 from type A, got {rd_length}' rdata = struct.unpack('!4B', message.read(4)) case Type.NS: rdata = read_labels(message) case _: rdata = message.read(rd_length) return Ressource(rname=base_ressource.rname, rtype=base_ressource.rtype, rclass=rclass, ttl=ttl, rd_length=rd_length, rdata=rdata) def dig_repr(self) -> str: match self.rtype: case Type.A: rdata = '.'.join([str(v) for v in self.rdata]) case Type.NS: rdata = b'.'.join(self.rdata).decode() case _: rdata = self.rdata return f'{b".".join(self.rname).decode():<24}{self.ttl:<8}{self.rclass.name:<8}{self.rtype.name:<8}{rdata}\n' def additional_ressource_from_bytes(message: BytesIO) -> Ressource | OPTRessource: current_pos = message.tell() base = RessourceBase.from_bytes(message) message.seek(current_pos) if base.rtype == Type.OPT: return OPTRessource.from_bytes(message) return Ressource.from_bytes(message) @dataclass(slots=True) class Message: header: Header question: list[Question] answer: list[Ressource] authority: list[Ressource] additional: list[Ressource | OPTRessource] @staticmethod def from_bytes(message: BytesIO) -> Message: header = Header.from_bytes(message) questions: list[Question] = [Question.from_bytes(message) for _ in range(header.qd_count)] answers: list[Ressource] = [Ressource.from_bytes(message) for _ in range(header.an_count)] authorities: list[Ressource] = [Ressource.from_bytes(message) for _ in range(header.ns_count)] additionals: list[Ressource | OPTRessource] = [ additional_ressource_from_bytes(message) for _ in range(header.ar_count)] return Message( header=header, question=questions, answer=answers, authority=authorities, additional=additionals) def to_bytes(self) -> bytes: return b''.join([section.to_bytes() for section in [ self.header, *self.question, *self.answer, *self.authority, *self.additional]]) def dig_repr(self) -> str: sections: list[str] = [f'{self.header.dig_repr()}'] opt_section: list[OPTRessource] = [] additional_section: list[Ressource] = [] for res in self.additional: if isinstance(res, OPTRessource): opt_section.append(res) else: additional_section.append(res) if opt_section: sections.append(f';; OPT PSEUDOSECTION:\n{"".join([a.dig_repr() for a in opt_section])}') if self.question: sections.append(f';; QUESTION SECTION:\n{"".join([q.dig_repr() for q in self.question])}') if self.answer: sections.append(f';; ANSWER SECTION:\n{"".join([a.dig_repr() for a in self.answer])}') if self.authority: sections.append(f';; AUTHORITY SECTION:\n{"".join([a.dig_repr() for a in self.authority])}') if self.additional: sections.append(f';; ADDITIONAL SECTION:\n{"".join([a.dig_repr() for a in additional_section])}') return '\n'.join(sections) def main(): parser = ArgumentParser() parser.add_argument('nameserver') parser.add_argument('url') parser.add_argument('--port', type=int, default=53) arguments = parser.parse_args() nameserver: str = arguments.nameserver url: str = arguments.url port: int = arguments.port message = Message( header=Header(id=0x1234, qr=QueryResponse.QUERY, op_code=OpCode.QUERY, flags=Header.Flags.RD | Header.Flags.RA, z=0, r_code=RCode.NO_ERROR, qd_count=1, an_count=0, ns_count=0, ar_count=0), question=[Question(qname=url.encode().split(b'.'), qtype=Type.A, qclass=Class.IN)], answer=[], authority=[], additional=[]) print('\n;; Sending:') print(message.dig_repr()) print(f';; Query Size: {len(message.to_bytes())}\n') with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket: query_time = time.monotonic() udp_socket.sendto(message.to_bytes(), (nameserver, port)) response_message, _ = udp_socket.recvfrom(4096) response_time = time.monotonic() response = Message.from_bytes(BytesIO(response_message)) print(';; Got answer:') print(response.dig_repr()) print(f';; Query time: {int((response_time - query_time) * 1000)} msec\n' f';; SERVER: {nameserver}#{port}({nameserver}) (UDP)\n' f';; WHEN: {datetime.now(datetime.now().astimezone().tzinfo).strftime("%a %b %d %H:%M:%S %Z %Y")}\n' f';; MSG SIZE rcvd: {len(response_message)}\n') if __name__ == '__main__': main()