Skip to content
Snippets Groups Projects
Commit 51a81c38 authored by Radko Krkoš's avatar Radko Krkoš
Browse files

pdns_importer: Drop DBv2 save

parent f96256b4
No related branches found
No related tags found
No related merge requests found
......@@ -113,7 +113,6 @@ class PacketParser(Process):
if (dns.header.rcode != 0 or dns.header.a == 0) and dns.header.rcode in self.allowed_negative_types: # Error or no response
qname = str(dns.q.qname).rstrip('.')
if dns.q.qtype in self.allowed_rtypes and qname != '' and not self.is_blacklisted_domain(qname):
self.save_negative(qname, dns.q.qtype, detector, dns.header.rcode, t)
self.save_negative3(qname, dns.q.qtype, detector, dns.header.rcode, t)
# Positive responses
for dnsrr in itertools.chain(dns.rr, dns.ar):
......@@ -124,9 +123,8 @@ class PacketParser(Process):
continue
if self.is_blacklisted_domain(rname) or (dnsrr.rtype in [5, 12, 15] and self.is_blacklisted_domain(rdata)):
continue
# Unified DB v2 save
# Unified DB v3 save
ttl = struct.unpack('>l', struct.pack('>L', dnsrr.ttl))[0] if dnsrr.ttl > 2**31-1 else dnsrr.ttl # Workaround for incorrect decoding of TTL in dnslib
self.save2(rname, rdata, dnsrr.rtype, t, ttl, detector, struct.pack('>H', dns.header.id), getattr(dns.header, 'ad', 0), dns.header.aa)
self.save3(rname, rdata, dnsrr.rtype, t, ttl, detector, struct.pack('>H', dns.header.id), getattr(dns.header, 'ad', 0), dns.header.aa)
except (dnslib.dns.DNSError, dnslib.buffer.BufferError): # Truncated DNS
pass
......@@ -147,36 +145,6 @@ class PacketParser(Process):
return True
return False
def save_negative(self, qname, rtype, source, reply, timestamp):
"""Mark down a negative DNS reply such as NODATA or NXDOMAIN or error"""
try:
with self.conn.cursor() as cursor:
qname_g = str(qname).lower()
# Perform IDNA validation and conversion to the U-label format
try:
qname_u = idna.decode(qname_g) if qname_g.find('xn--') != -1 else qname_g
except (idna.core.IDNAError, UnicodeError, IndexError):
return
args = (qname_u, rtype, source, reply, timestamp, timestamp, 1)
smt = psql.SQL('INSERT INTO negatives AS ORIG (qname, rtype, source, reply, time_first, time_last, count) '
'VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT (qname, rtype, source, reply) DO UPDATE '
'SET count = ORIG.count + 1, time_last = GREATEST(ORIG.time_last, EXCLUDED.time_last), '
'time_first = LEAST(ORIG.time_first, EXCLUDED.time_first) RETURNING id')
cursor.execute(smt, args)
nid = cursor.fetchall()[0]
args = (nid, timestamp)
smt = psql.SQL('INSERT INTO negatives_details (id, time) VALUES (%s, %s) ON CONFLICT DO NOTHING RETURNING id')
cursor.execute(smt, args)
if not cursor.fetchall(): # `ON CONFLICT DO NOTHING RETURNING` only returns value in case of no conflict
raise psycopg2.errors.UniqueViolation
self.conn.commit()
except psycopg2.errors.UniqueViolation: # Importing records duplicately (unusual in production)
self.conn.rollback()
except ValueError: # A rare decoding error in dnslib, name contains a 0x00 character
self.conn.rollback()
except (psycopg2.InternalError, psycopg2.ProgrammingError) as exc:
raise RuntimeError("save_negative(): Unrecoverable error.") from exc
def save_negative3(self, qname, rtype, source, reply, timestamp):
"""Mark down a negative DNS reply such as NODATA or NXDOMAIN or error"""
try:
......@@ -207,66 +175,6 @@ class PacketParser(Process):
except (psycopg2.InternalError, psycopg2.ProgrammingError) as exc:
raise RuntimeError("save_negative3(): Unrecoverable error.") from exc
def save2(self, rname, rdata, rtype, timestamp, ttl, source, transaction, authen, author):
"""Performs INSERT/UPDATE of DNS records"""
try:
with self.conn.cursor() as cursor:
rname_generic = rname.lower()
if rtype in [1, 28]: # A / AAAA
tbl = psql.Identifier('ips')
dtbl = psql.Identifier('ips_details')
lcol = psql.Identifier('domain')
rcol = psql.Identifier('ip')
elif rtype in self.allowed_rtypes:
tbl = psql.Identifier('texts')
dtbl = psql.Identifier('texts_details')
lcol = psql.Identifier('lvalue')
rcol = psql.Identifier('rvalue')
rdata = rdata.lower()
if rtype == 15: # MX - split priority
try:
priority, rdata = rdata.split()
except ValueError: # Priority not provided
priority = None
else:
return
# Perform IDNA validation and conversion to the U-label format
try:
rname_u = idna.decode(rname_generic) if rname_generic.find('xn--') != -1 else rname_generic
rdata_u = idna.decode(rdata) if rdata.find('xn--') != -1 else rdata
except (idna.core.IDNAError, UnicodeError, IndexError):
return
# Save the overview information
args = (rname_u, rdata_u, source, rtype, 1, timestamp, timestamp)
smt = psql.SQL('INSERT INTO {tbl} AS ORIG ({lcol}, {rcol}, source, rtype, count, time_first, time_last) '
'VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT ({lcol}, source, rtype, {rcol}) DO UPDATE '
'SET count = ORIG.count + 1, time_last = GREATEST(ORIG.time_last, EXCLUDED.time_last), '
'time_first = LEAST(ORIG.time_first, EXCLUDED.time_first) RETURNING id').format(tbl=tbl, lcol=lcol, rcol=rcol)
cursor.execute(smt, args)
mapid = cursor.fetchall()[0]
# Decode 0x20 transaction identity bits
if rname != rname_generic:
x20ri = int(''.join(['1' if ('a' <= char <= 'z') else '0' if ('A' <= char <= 'Z') else '' for char in rname]), 2)
x20r = x20ri.to_bytes(math.ceil(x20ri.bit_length()/8), sys.byteorder)
else:
x20r = None
# Write details
args = (mapid, timestamp, ttl, transaction, x20r, str(authen) + str(author), json.dumps({'priority': priority}) if rtype == 15 else None)
smt = psql.SQL('INSERT INTO {dtbl} (id, time, ttl, transactionid, x20randomness, authenticated_authoritative, rem) '
'VALUES (%s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING RETURNING id').format(dtbl=dtbl)
cursor.execute(smt, args)
if not cursor.fetchall(): # `ON CONFLICT DO NOTHING RETURNING` only returns value in case of no conflict
raise psycopg2.errors.UniqueViolation
self.conn.commit()
except psycopg2.errors.NumericValueOutOfRange: # Erroneous TTLs
self.conn.rollback()
except psycopg2.errors.UniqueViolation: # Importing records duplicately (unusual in production)
self.conn.rollback()
except ValueError: # A rare decoding error in dnslib, name contains a 0x00 character
self.conn.rollback()
except (psycopg2.InternalError, psycopg2.ProgrammingError) as exc:
raise RuntimeError("save2(): Unrecoverable error.") from exc
def save3(self, rname, rdata, rtype, timestamp, ttl, source, transaction, authen, author):
"""Performs INSERT/UPDATE of DNS records"""
try:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment