Skip to content
Snippets Groups Projects
Commit 8e66bbf7 authored by Pavel Kácha's avatar Pavel Kácha
Browse files

Convert regex and CSV readers/writers to itemgetter/itemsetter concept

parent 58f35ed0
No related branches found
No related tags found
No related merge requests found
......@@ -4,29 +4,30 @@ from __future__ import absolute_import, division, print_function, unicode_litera
import unittest
from copy import deepcopy
from .text import RegexpLexicon, LinearRegexpLexicon, CSVParse, JSONParse
from .movement import itemsetter
class TestRegexp(unittest.TestCase):
redef = [
("assignment",
r"\s*(\w+)\s*=\s*(\S*)\s*",
("name", "value"),
(itemsetter("name"), itemsetter("value")),
"PATH = /bin"),
("cronspec",
r"\s*([0-9*]+)\s+([0-9*]+)\s+([0-9*]+)\s+([0-9*]+)\s+([0-9*]+)\s+(.*)",
("minute", "hour", "day", "month", "dow", "command"),
(itemsetter("minute"), itemsetter("hour"), itemsetter("day"), itemsetter("month"), itemsetter("dow"), itemsetter("command")),
"5 0 * * * true"),
("connect",
r'\s*([0-9]+) Initial Connect - tarpitting: ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*',
("timestamp", "src_ip", "src_port", "tgt_ip", "tgt_port"),
(itemsetter("timestamp"), itemsetter("src_ip"), itemsetter("src_port"), itemsetter("tgt_ip"), itemsetter("tgt_port")),
"1493035442 Initial Connect - tarpitting: 89.163.242.15 56736 -> 195.113.254.182 9898"),
("synack",
r'\s*([0-9]+) Inbound SYN/ACK: ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*',
("timestamp", "src_ip", "src_port", "tgt_ip", "tgt_port"),
(itemsetter("timestamp"), itemsetter("src_ip"), itemsetter("src_port"), itemsetter("tgt_ip"), itemsetter("tgt_port")),
"1493037991 Inbound SYN/ACK: 185.62.190.15 21001 -> 195.113.252.222 15584"),
("ping",
r'\s*([0-9]+) Responded to a Ping: ([^ ]+) -> ([^ ]+).*',
("timestamp", "src_ip", "tgt_ip"),
(itemsetter("timestamp"), itemsetter("src_ip"), itemsetter("tgt_ip")),
"1493035442 Responded to a Ping: 88.86.96.25 -> 195.113.253.87 *")
]
......@@ -105,7 +106,7 @@ class TestCSV(unittest.TestCase):
def setUp(self):
self.maxDiff = 2000
self.matcher = CSVParse(("first", "second", "third", "fourth"))
self.matcher = CSVParse((itemsetter("first"), itemsetter("second"), itemsetter("third"), itemsetter("fourth")))
self.example_lines = [{"input": line.strip()} for line in self.example.split("\n")]
for orig, shouldbe in zip(self.example_lines, self.example_shouldbe):
if shouldbe:
......
......@@ -62,7 +62,7 @@ class LinearRegexpLexicon(Cog):
def __init__(self, rlist, str_get=None, rname_set=None, flags=0):
""" Initialize LinearRegexpLexicon.
:param rlist: Ruleset of names, expressions, field names and examples
:param rlist: Ruleset of names, expressions, field setters and examples
:param str_get: Getter for matched data
:param rname_set: Setter for rule name
:param flags: Regular expression library flags (see :py:mod:`re`)
......@@ -73,7 +73,7 @@ class LinearRegexpLexicon(Cog):
text.rrule(
name="connect",
regex=r'([0-9]+) Initial Connect - tarpitting: ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*',
fields=("DetectTime", "src_ip", "src_port", "tgt_ip", "tgt_port"),
fields=(itemsetter("DetectTime"), itemsetter("src_ip"), itemsetter("src_port"), itemsetter("tgt_ip"), itemsetter("tgt_port")),
example="1493035442 Initial Connect - tarpitting: 89.163.242.15 56736 -> 195.113.254.182 9898"),
...
)
......@@ -90,9 +90,11 @@ class LinearRegexpLexicon(Cog):
for name, regex, fields, dummy in self.regexps:
match = regex.match(line, self.flags)
if match:
res = dict(zip(fields, match.groups()))
res = self.rname_set(res, name)
data.update(res)
#~ res = dict(zip(fields, match.groups()))
for setter, group in zip(fields, match.groups()):
data = setter(data, group)
data = self.rname_set(data, name)
#~ data.update(res)
return (data,)
return None
......@@ -107,7 +109,7 @@ class RegexpLexicon(Cog):
def __init__(self, rlist, str_get=None, rname_set=None, flags=0):
""" Initialize RegexpLexicon.
:param rlist: Ruleset of names, expressions, field names and examples
:param rlist: Ruleset of names, expressions, field setters and examples
:param str_get: Getter for matched data
:param rname_set: Setter for rule name
:param flags: Regular expression library flags (see :py:mod:`re`)
......@@ -118,7 +120,7 @@ class RegexpLexicon(Cog):
text.rrule(
name="connect",
regex=r'([0-9]+) Initial Connect - tarpitting: ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*',
fields=("DetectTime", "src_ip", "src_port", "tgt_ip", "tgt_port"),
fields=(itemsetter("DetectTime"), itemsetter("src_ip"), itemsetter("src_port"), itemsetter("tgt_ip"), itemsetter("tgt_port")),
example="1493035442 Initial Connect - tarpitting: 89.163.242.15 56736 -> 195.113.254.182 9898"),
...
)
......@@ -151,25 +153,28 @@ class RegexpLexicon(Cog):
while mg[i] is None:
i += 1
groups, rname = self.indexgroup[i]
res = dict(zip(groups, mg[i+1:]))
res = self.rname_set(res, rname)
data.update(res)
#~ res = dict(zip(groups, mg[i+1:]))
for setter, group in zip(groups, mg[i+1:]):
data = setter(data, group)
data = self.rname_set(data, rname)
#~ data.update(res)
return (data,)
class CSVParse(Cog):
""" Cog for parsing CSV lines. """
def __init__(self, fieldnames, restkey=None, restval=None, dialect=None, str_get=None, *args, **kwargs):
def __init__(self, fieldsetters, restkey=None, restval=None, dialect=None, str_get=None, *args, **kwargs):
""" Initialize CSVParse.
Expects `data` to be dict-like object and fields will be set from `fieldnames`.
:param str_get: Getter for data to be parsed
:param fieldsetters: Sequence of itemsetters, applied in order to the fields
of the input row. Note that fieldsetters is analogous to the fieldnames of csv.DictReader,
but its elements must be callable itemsetters, not just names.
For other arguments see :py:obj:`csv.DictReader`
"""
self.fieldnames = fieldnames
self.fieldsetters = fieldsetters
self.restkey = restkey
self.restval = restval
self.dialect = dialect
......@@ -194,43 +199,35 @@ class CSVParse(Cog):
if not row:
return None
# Adapted from csv.py, as original DictReader may try reading more than we have and block.
d = dict(zip(self.fieldnames, row))
lf = len(self.fieldnames)
for setter, field in zip(self.fieldsetters, row):
data = setter(data, field)
lf = len(self.fieldsetters)
lr = len(row)
if lf < lr:
d[self.restkey] = row[lf:]
data = self.restkey_setter(data, row[lf:])
elif lf > lr:
for key in self.fieldnames[lr:]:
d[key] = self.restval
data.update(d)
for setter in self.fieldsetters[lr:]:
data = setter(data, self.restval)
return (data,)
class CSVMarshall(Cog):
""" Cog for writing CSV lines.
Expects `data` to be dict-like object and fieldnames will be fetched as keys.
"""
""" Cog for writing CSV lines. """
def __init__(self, fieldnames, restval="", extrasaction="ignore", dialect="excel", data_set=None, *args, **kwargs):
def __init__(self, fieldgetters, dialect="excel", data_set=None, *args, **kwargs):
""" Initialize CSVMarshall.
:param fieldgetters: Sequence of itemgetters that identify the order in which the values are written to CSV line
:param data_set: Setter for the resulting data
For other arguments see :py:obj:`csv.DictWriter`
For other arguments see :py:obj:`csv.writer`
"""
self.fieldnames = fieldnames
self.restval = restval
self.extrasaction = extrasaction
self.fieldgetters = fieldgetters
self.dialect = dialect
self.data_set = data_set or itemsetter("output")
self.args = args
self.kwargs = kwargs
self.writer = csv.DictWriter(
self, self.fieldnames, restval=self.restval,
extrasaction=self.extrasaction, dialect=self.dialect,
lineterminator="",
*args, **kwargs)
self.writer = csv.writer(self, dialect=self.dialect, *args, **kwargs)
def write(self, s):
""" Helper function to make self behave as a fake file-like object for the CSV writer. """
......@@ -238,7 +235,8 @@ class CSVMarshall(Cog):
def __call__(self, data):
""" Main pipeline event handler. """
self.writer.writerow(data)
arr = [getter(data) for getter in self.fieldgetters]
self.writer.writerow(arr)
data = self.data_set(data, self.output)
return (data,)
......
......@@ -4,6 +4,8 @@
from __future__ import absolute_import, division, print_function, unicode_literals
import sys
from deadbeat import movement, log, conf, dns, text, fs
from deadbeat.movement import itemsetter
from operator import itemgetter
name = "IP_Resolve"
description = "Proof-of-concept bulk IP resolver"
......@@ -22,9 +24,9 @@ def main():
log.configure(**cfg.log)
train = movement.Train()
file_watcher = fs.FileWatcherSupply(train.esc, filenames=cfg.input_files, tail=True)
csv_parse = text.CSVParse(fieldnames=("ip",))
csv_parse = text.CSVParse(fieldsetters=(itemsetter("ip"),))
resolve = dns.IPtoPTR(train.esc)
serialise = text.CSVMarshall(fieldnames=("ip", "hostnames"))
serialise = text.CSVMarshall(fieldgetters=(itemgetter("ip"), itemgetter("hostnames")))
output = fs.LineFileDrain(train=train, path=cfg.output_file, timestamp=False, flush=False)
train.update(movement.train_line(file_watcher, csv_parse, resolve, serialise, output))
train()
......
......@@ -8,7 +8,7 @@ import operator
import ipranges
import idea.lite
from deadbeat import movement, log, conf, daemon, ip, text, fs, twist
from deadbeat.movement import catch
from deadbeat.movement import catch, itemsetter
name = "Warden_LaBrea"
description = "Proof-of-concept LaBrea Warden connector"
......@@ -35,17 +35,17 @@ regexp_rules = (
text.re_rule(
name="connect",
regex=r'([0-9]+) Initial Connect - tarpitting: ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*',
fields=("DetectTime", "src_ip", "src_port", "tgt_ip", "tgt_port"),
fields=(itemsetter("DetectTime"), itemsetter("src_ip"), itemsetter("src_port"), itemsetter("tgt_ip"), itemsetter("tgt_port")),
example="1493035442 Initial Connect - tarpitting: 89.163.242.15 56736 -> 195.113.254.182 9898"),
text.re_rule(
name="synack",
regex=r'([0-9]+) Inbound SYN/ACK: ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*',
fields=("DetectTime", "src_ip", "src_port", "tgt_ip", "tgt_port"),
fields=(itemsetter("DetectTime"), itemsetter("src_ip"), itemsetter("src_port"), itemsetter("tgt_ip"), itemsetter("tgt_port")),
example="1493037991 Inbound SYN/ACK: 185.62.190.15 21001 -> 195.113.252.222 15584"),
text.re_rule(
name="ping",
regex=r'([0-9]+) Responded to a Ping: ([^ ]+) -> ([^ ]+).*',
fields=("DetectTime", "src_ip", "tgt_ip"),
fields=(itemsetter("DetectTime"), itemsetter("src_ip"), itemsetter("tgt_ip")),
example="1493035442 Responded to a Ping: 88.86.96.25 -> 195.113.253.87 *")
)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment