Skip to content
Snippets Groups Projects
Select Git revision
  • master
1 result

Api.inc

Blame
  • ip.py 4.80 KiB
    # -*- coding: utf-8 -*-
    
    """ Filtering and anonymisation cogs for DeadBeat """
    
    from __future__ import absolute_import, division, print_function, unicode_literals
    
    import ipranges
    import shlex
    import logging
    from . import conf
    from operator import itemgetter
    from .movement import Cog, basestring
    
    __all__ = ["cast_ip_range_list", "anonymise_base_config", "IdeaAnonymise", "filter_base_config", "FilterBase", "IPFilter"]
    
    def cast_ip_range_list(l):
        """ Translate comma separated list of IP addresses into list of ipranges objects. """
        if isinstance(l, basestring):
            lex = shlex.shlex(l, posix=True)
            lex.whitespace=','
            lex.whitespace_split=True
            strings = list(lex)
        else:
            strings = l
        return [ipranges.from_str(r.strip()) for r in strings if r]
    
    #: Anonymisation configuration insert
    anonymise_base_config = (
        conf.cfg_item(
            "sources", cast_ip_range_list,
            "List of source ranges, which will be reported instead of IP addresses within them", default=()),
        conf.cfg_item(
            "sources_exclude", cast_ip_range_list,
            "List of source ranges, which will not be anonymised",
            default=()),
        conf.cfg_item(
            "targets", cast_ip_range_list,
            "List of target ranges, which will be reported instead of IP addresses within them", default=()),
        conf.cfg_item(
            "targets_exclude", cast_ip_range_list,
            "List of target ranges, which will not be anonymised",
            default=())
    )
    
    class IdeaAnonymise(Cog):
        """ Cog for anonymisation of Idea events """
    
        def __init__(self, sources=(), targets=(), sources_exclude=(), targets_exclude=()):
            """ Initialize IdeaAnonymise.
    
            :param sources: List of IP ranges to anonymise Source.IP? to.
            :param targets: List of IP ranges to anonymise Target.IP? to.
            :param sources_exclude: List of IP ranges to NOT anonymise in Source.IP?.
            :param targets_exclude: List of IP ranges to NOT anonymise in Target.IP?.
            """
            self.sources = sources
            self.targets = targets
            self.sources_exclude = sources_exclude
            self.targets_exclude = targets_exclude
    
        def __call__(self, idea):
            """ Main pipeline event handler """
            anonymised = False
            for node, ranges, excludes in (
                    ("Source", self.sources, self.sources_exclude),
                    ("Target", self.targets, self.targets_exclude)):
                for net in idea.get(node, []):
                    for key in ("IP4", "IP6"):
                        ips = net.get(key, [])
                        for i in range(len(ips)):
                            ip = ips[i]
                            for te in excludes:
                                if ip in te:
                                    break
                            else:
                                for tr in ranges:
                                    if ip in tr:
                                        ips[i] = tr
                                        anonymised = True
                                        break
                    if anonymised:
                        type_ = net.get("Type", [])
                        if "Anonymised" not in type_:
                            type_.append("Anonymised")
                        net["Type"] = type_
            return (idea,)
    
    
    #: Filtering configuration insert
    filter_base_config = (
        conf.cfg_item(
            "sources", cast_ip_range_list,
            "List of source ranges, which will not be processed", default=()),
        conf.cfg_item(
            "targets", cast_ip_range_list,
            "List of target ranges, which will not be processes", default=())
    )
    
    class FilterBase(Cog):
        """ Base cog for creating hard filters. """
    
        def __init__(self, train, id_get=None):
            """ Initialize FilterBase.
    
                :param train: Train singleton.
                :param id_get: ID getter.
            """
            self.train = train
            self.id_get = id_get or itemgetter("ID")
    
        def __call__(self, data):
            """ Main pipeline event handler. """
            if self.condition(data):
                event_id = self.id_get(data)
                if event_id is not None:
                    self.train.notify("event_done", event_id)
                logging.debug("%s: dropped", event_id)
                return None
            else:
                return (data,)
    
    
    class IPFilter(FilterBase):
        """ Cog for dropping unwanted event data based on IP ranges. """
    
        def __init__(self, train, id_get=None, ranges=None, item_get=None):
            """ Initialize IPFilter.
    
            :param ranges: List of IP ranges.
            :param item_get: Getter for data to consult.
            """
            super(IPFilter, self).__init__(train, id_get)
            self.ranges = ranges or []
            self.item_get = item_get or itemgetter("ip")
    
        def condition(self, data):
            ip = self.item_get(data)
            for r in self.ranges:
                if ip in r:
                    return True
            return False