diff --git a/pdns_importer.py b/pdns_importer.py index da54396e39781836be1f5cae4bbeff5690557570..d51a7b65a7dee9d3675c1d44543a9d32a1fd898e 100755 --- a/pdns_importer.py +++ b/pdns_importer.py @@ -113,7 +113,6 @@ class PacketParser(Process): if (dns.header.rcode != 0 or dns.header.a == 0) and dns.header.rcode in self.allowed_negative_types: # Error or no response qname = str(dns.q.qname).rstrip('.') if dns.q.qtype in self.allowed_rtypes and qname != '' and not self.is_blacklisted_domain(qname): - self.save_negative(qname, dns.q.qtype, detector, dns.header.rcode, t) self.save_negative3(qname, dns.q.qtype, detector, dns.header.rcode, t) # Positive responses for dnsrr in itertools.chain(dns.rr, dns.ar): @@ -124,9 +123,8 @@ class PacketParser(Process): continue if self.is_blacklisted_domain(rname) or (dnsrr.rtype in [5, 12, 15] and self.is_blacklisted_domain(rdata)): continue - # Unified DB v2 save + # Unified DB v3 save ttl = struct.unpack('>l', struct.pack('>L', dnsrr.ttl))[0] if dnsrr.ttl > 2**31-1 else dnsrr.ttl # Workaround for incorrect decoding of TTL in dnslib - self.save2(rname, rdata, dnsrr.rtype, t, ttl, detector, struct.pack('>H', dns.header.id), getattr(dns.header, 'ad', 0), dns.header.aa) self.save3(rname, rdata, dnsrr.rtype, t, ttl, detector, struct.pack('>H', dns.header.id), getattr(dns.header, 'ad', 0), dns.header.aa) except (dnslib.dns.DNSError, dnslib.buffer.BufferError): # Truncated DNS pass @@ -147,36 +145,6 @@ class PacketParser(Process): return True return False - def save_negative(self, qname, rtype, source, reply, timestamp): - """Mark down a negative DNS reply such as NODATA or NXDOMAIN or error""" - try: - with self.conn.cursor() as cursor: - qname_g = str(qname).lower() - # Perform IDNA validation and conversion to the U-label format - try: - qname_u = idna.decode(qname_g) if qname_g.find('xn--') != -1 else qname_g - except (idna.core.IDNAError, UnicodeError, IndexError): - return - args = (qname_u, rtype, source, reply, timestamp, timestamp, 1) - smt = psql.SQL('INSERT INTO negatives AS ORIG (qname, rtype, source, reply, time_first, time_last, count) ' - 'VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT (qname, rtype, source, reply) DO UPDATE ' - 'SET count = ORIG.count + 1, time_last = GREATEST(ORIG.time_last, EXCLUDED.time_last), ' - 'time_first = LEAST(ORIG.time_first, EXCLUDED.time_first) RETURNING id') - cursor.execute(smt, args) - nid = cursor.fetchall()[0] - args = (nid, timestamp) - smt = psql.SQL('INSERT INTO negatives_details (id, time) VALUES (%s, %s) ON CONFLICT DO NOTHING RETURNING id') - cursor.execute(smt, args) - if not cursor.fetchall(): # `ON CONFLICT DO NOTHING RETURNING` only returns value in case of no conflict - raise psycopg2.errors.UniqueViolation - self.conn.commit() - except psycopg2.errors.UniqueViolation: # Importing records duplicately (unusual in production) - self.conn.rollback() - except ValueError: # A rare decoding error in dnslib, name contains a 0x00 character - self.conn.rollback() - except (psycopg2.InternalError, psycopg2.ProgrammingError) as exc: - raise RuntimeError("save_negative(): Unrecoverable error.") from exc - def save_negative3(self, qname, rtype, source, reply, timestamp): """Mark down a negative DNS reply such as NODATA or NXDOMAIN or error""" try: @@ -207,66 +175,6 @@ class PacketParser(Process): except (psycopg2.InternalError, psycopg2.ProgrammingError) as exc: raise RuntimeError("save_negative3(): Unrecoverable error.") from exc - def save2(self, rname, rdata, rtype, timestamp, ttl, source, transaction, authen, author): - """Performs INSERT/UPDATE of DNS records""" - try: - with self.conn.cursor() as cursor: - rname_generic = rname.lower() - if rtype in [1, 28]: # A / AAAA - tbl = psql.Identifier('ips') - dtbl = psql.Identifier('ips_details') - lcol = psql.Identifier('domain') - rcol = psql.Identifier('ip') - elif rtype in self.allowed_rtypes: - tbl = psql.Identifier('texts') - dtbl = psql.Identifier('texts_details') - lcol = psql.Identifier('lvalue') - rcol = psql.Identifier('rvalue') - rdata = rdata.lower() - if rtype == 15: # MX - split priority - try: - priority, rdata = rdata.split() - except ValueError: # Priority not provided - priority = None - else: - return - # Perform IDNA validation and conversion to the U-label format - try: - rname_u = idna.decode(rname_generic) if rname_generic.find('xn--') != -1 else rname_generic - rdata_u = idna.decode(rdata) if rdata.find('xn--') != -1 else rdata - except (idna.core.IDNAError, UnicodeError, IndexError): - return - # Save the overview information - args = (rname_u, rdata_u, source, rtype, 1, timestamp, timestamp) - smt = psql.SQL('INSERT INTO {tbl} AS ORIG ({lcol}, {rcol}, source, rtype, count, time_first, time_last) ' - 'VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT ({lcol}, source, rtype, {rcol}) DO UPDATE ' - 'SET count = ORIG.count + 1, time_last = GREATEST(ORIG.time_last, EXCLUDED.time_last), ' - 'time_first = LEAST(ORIG.time_first, EXCLUDED.time_first) RETURNING id').format(tbl=tbl, lcol=lcol, rcol=rcol) - cursor.execute(smt, args) - mapid = cursor.fetchall()[0] - # Decode 0x20 transaction identity bits - if rname != rname_generic: - x20ri = int(''.join(['1' if ('a' <= char <= 'z') else '0' if ('A' <= char <= 'Z') else '' for char in rname]), 2) - x20r = x20ri.to_bytes(math.ceil(x20ri.bit_length()/8), sys.byteorder) - else: - x20r = None - # Write details - args = (mapid, timestamp, ttl, transaction, x20r, str(authen) + str(author), json.dumps({'priority': priority}) if rtype == 15 else None) - smt = psql.SQL('INSERT INTO {dtbl} (id, time, ttl, transactionid, x20randomness, authenticated_authoritative, rem) ' - 'VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING RETURNING id').format(dtbl=dtbl) - cursor.execute(smt, args) - if not cursor.fetchall(): # `ON CONFLICT DO NOTHING RETURNING` only returns value in case of no conflict - raise psycopg2.errors.UniqueViolation - self.conn.commit() - except psycopg2.errors.NumericValueOutOfRange: # Erroneous TTLs - self.conn.rollback() - except psycopg2.errors.UniqueViolation: # Importing records duplicately (unusual in production) - self.conn.rollback() - except ValueError: # A rare decoding error in dnslib, name contains a 0x00 character - self.conn.rollback() - except (psycopg2.InternalError, psycopg2.ProgrammingError) as exc: - raise RuntimeError("save2(): Unrecoverable error.") from exc - def save3(self, rname, rdata, rtype, timestamp, ttl, source, transaction, authen, author): """Performs INSERT/UPDATE of DNS records""" try: