Skip to content
Snippets Groups Projects
Commit 284eafc0 authored by Pavel Kácha's avatar Pavel Kácha
Browse files

Add notification on event filtered

parent 899fcb2d
No related branches found
No related tags found
No related merge requests found
......@@ -6,6 +6,7 @@ from __future__ import absolute_import, division, print_function, unicode_litera
import ipranges
import shlex
import logging
from . import conf
from operator import itemgetter
from .movement import Cog, basestring
......@@ -96,22 +97,40 @@ filter_base_config = (
)
class FilterBase(Cog):
""" Base cog for creating hard filters. """
def __init__(self, train, id_get=None):
""" Initialize FilterBase.
:param train: Train singleton.
:param id_get: ID getter.
"""
self.train = train
self.id_get = id_get or itemgetter("ID")
def __call__(self, data):
""" Main pipeline event handler. """
return (data,) if not self.condition(data) else None
if self.condition(data):
event_id = self.id_get(data)
if event_id is not None:
self.train.notify("event_done", event_id)
logging.debug("%s: dropped", event_id)
return None
else:
return (data,)
class IPFilter(FilterBase):
""" Cog for dropping unwanted event data based on IP ranges. """
def __init__(self, ranges, item_get):
def __init__(self, train, id_get=None, ranges=None, item_get=None):
""" Initialize IPFilter.
:param ranges: List of IP ranges.
:param item_get: Getter for data to consult.
"""
self.ranges = ranges
super(IPFilter, self).__init__(train, id_get)
self.ranges = ranges or []
self.item_get = item_get or itemgetter("ip")
def condition(self, data):
......
......@@ -159,8 +159,8 @@ def main():
#~ watcher = fs.FilerSupply(train.esc, directory=cfg.input_dir)
reg_list = text.LinearRegexpLexicon(rlist = regexp_rules, rname_set = movement.itemsetter("attack_type"))
normalize = twist.Normalize(typedef = labrea_normalize, tname = "LabreaData")
filter_source = ip.IPFilter(ranges = cfg.filter.sources, item_get = operator.itemgetter("src_ip"))
filter_target = ip.IPFilter(ranges = cfg.filter.targets, item_get = operator.itemgetter("tgt_ip"))
filter_source = ip.IPFilter(train, ranges = cfg.filter.sources, item_get = operator.itemgetter("src_ip"))
filter_target = ip.IPFilter(train, ranges = cfg.filter.targets, item_get = operator.itemgetter("tgt_ip"))
aggregate = WindowContextMgr(train, window=10, timeout=15)
const_enrich = text.StringLexicon(metadata_lexicon)
convert = Idealise(node_name = cfg.client_name, test = cfg.test)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment