Skip to content
Snippets Groups Projects
Select Git revision
  • abef56479a4f2592b79d1b901b7525f01eb7e598
  • master default protected
  • rednatco-v2
  • rednatco
  • test
  • ntc-tube-uniform-color
  • ntc-tube-missing-atoms
  • restore-vertex-array-per-program
  • watlas2
  • dnatco_new
  • cleanup-old-nodejs
  • webmmb
  • fix_auth_seq_id
  • update_deps
  • ext_dev
  • ntc_balls
  • nci-2
  • plugin
  • bugfix-0.4.5
  • nci
  • servers
  • v0.5.0-dev.1
  • v0.4.5
  • v0.4.4
  • v0.4.3
  • v0.4.2
  • v0.4.1
  • v0.4.0
  • v0.3.12
  • v0.3.11
  • v0.3.10
  • v0.3.9
  • v0.3.8
  • v0.3.7
  • v0.3.6
  • v0.3.5
  • v0.3.4
  • v0.3.3
  • v0.3.2
  • v0.3.1
  • v0.3.0
41 results

webpack.config.js

Blame
  • ip.py 4.80 KiB
    # -*- coding: utf-8 -*-
    
    """ Filtering and anonymisation cogs for DeadBeat """
    
    from __future__ import absolute_import, division, print_function, unicode_literals
    
    import ipranges
    import shlex
    import logging
    from . import conf
    from operator import itemgetter
    from .movement import Cog, basestring
    
    # Public names of this module (what ``from <module> import *`` exposes).
    __all__ = [
        "cast_ip_range_list",
        "anonymise_base_config",
        "IdeaAnonymise",
        "filter_base_config",
        "FilterBase",
        "IPFilter",
    ]
    
    def cast_ip_range_list(l):
        """ Translate comma separated list of IP addresses into list of ipranges objects.

        :param l: Either one string with comma separated entries (shell-like
            quoting is honoured, as the string is split by ``shlex`` in POSIX
            mode), or an iterable of already separated strings.
        :returns: List of objects created by ``ipranges.from_str``, one per
            non-blank entry, in input order.
        """
        if isinstance(l, basestring):
            lexer = shlex.shlex(l, posix=True)
            # Only the comma separates entries; spaces around them are stripped below.
            lexer.whitespace = ','
            lexer.whitespace_split = True
            entries = list(lexer)
        else:
            entries = l
        # Falsy entries (empty string, None) are dropped before stripping, exactly as before.
        stripped = (entry.strip() for entry in entries if entry)
        # Entries which are blank only after stripping (e.g. the " " left by a
        # trailing ", ") are skipped as well, instead of being handed to the parser.
        return [ipranges.from_str(entry) for entry in stripped if entry]
    
    #: Anonymisation configuration insert: four options ("sources",
    #: "sources_exclude", "targets", "targets_exclude"), each parsed by
    #: cast_ip_range_list and defaulting to an empty tuple.
    anonymise_base_config = tuple(
        conf.cfg_item(option, cast_ip_range_list, description, default=())
        for option, description in (
            ("sources",
             "List of source ranges, which will be reported instead of IP addresses within them"),
            ("sources_exclude",
             "List of source ranges, which will not be anonymised"),
            ("targets",
             "List of target ranges, which will be reported instead of IP addresses within them"),
            ("targets_exclude",
             "List of target ranges, which will not be anonymised"),
        )
    )
    
    class IdeaAnonymise(Cog):
        """ Cog for anonymisation of Idea events """

        def __init__(self, sources=(), targets=(), sources_exclude=(), targets_exclude=()):
            """ Initialize IdeaAnonymise.

            :param sources: List of IP ranges to anonymise Source.IP? to.
            :param targets: List of IP ranges to anonymise Target.IP? to.
            :param sources_exclude: List of IP ranges to NOT anonymise in Source.IP?.
            :param targets_exclude: List of IP ranges to NOT anonymise in Target.IP?.
            """
            self.sources = sources
            self.targets = targets
            self.sources_exclude = sources_exclude
            self.targets_exclude = targets_exclude

        def __call__(self, idea):
            """ Main pipeline event handler.

            For every entry of the "Source" and "Target" lists, each address in
            "IP4" and "IP6" which lies within one of the configured ranges is
            replaced (in place) by the first matching range object. Addresses
            lying within any of the exclude ranges are left untouched. An entry
            in which at least one address was replaced gets "Anonymised" added
            to its "Type" list.

            :param idea: Event to process; it is modified in place.
            :returns: One-element tuple containing the same event.
            """
            for node, ranges, excludes in (
                    ("Source", self.sources, self.sources_exclude),
                    ("Target", self.targets, self.targets_exclude)):
                for net in idea.get(node, []):
                    # The flag is tracked per entry, so that "Anonymised" is added
                    # only to entries whose addresses were really replaced, not
                    # to every entry following the first replacement.
                    anonymised = False
                    for key in ("IP4", "IP6"):
                        ips = net.get(key, [])
                        for i, ip in enumerate(ips):
                            # Exclusions take precedence over anonymisation ranges.
                            if any(ip in te for te in excludes):
                                continue
                            for tr in ranges:
                                if ip in tr:
                                    # The address is reported as the whole range.
                                    ips[i] = tr
                                    anonymised = True
                                    break
                    if anonymised:
                        type_ = net.get("Type", [])
                        if "Anonymised" not in type_:
                            type_.append("Anonymised")
                        net["Type"] = type_
            return (idea,)
    
    
    #: Filtering configuration insert: options "sources" and "targets", each
    #: parsed by cast_ip_range_list and defaulting to an empty tuple.
    filter_base_config = (
        conf.cfg_item(
            "sources", cast_ip_range_list,
            "List of source ranges, which will not be processed", default=()),
        conf.cfg_item(
            "targets", cast_ip_range_list,
            "List of target ranges, which will not be processed", default=())
    )
    
    class FilterBase(Cog):
        """ Base cog for creating hard filters.

        The decision whether to drop data is delegated to ``self.condition(data)``,
        which is not defined in this class and has to be supplied by a subclass.
        """

        def __init__(self, train, id_get=None):
            """ Initialize FilterBase.

            :param train: Train singleton.
            :param id_get: ID getter; when not given (or falsy), the "ID" item
                of the data is used.
            """
            self.train = train
            self.id_get = id_get if id_get else itemgetter("ID")

        def __call__(self, data):
            """ Main pipeline event handler.

            :param data: Data to be checked.
            :returns: One-element tuple with the data when the condition does not
                hold, None when the data is dropped.
            """
            if not self.condition(data):
                return (data,)

            # Dropped data: announce the event as done (when it has an ID) and log it.
            dropped_id = self.id_get(data)
            if dropped_id is not None:
                self.train.notify("event_done", dropped_id)
            logging.debug("%s: dropped", dropped_id)
            return None
    
    
    class IPFilter(FilterBase):
        """ Cog for dropping unwanted event data based on IP ranges. """

        def __init__(self, train, id_get=None, ranges=None, item_get=None):
            """ Initialize IPFilter.

            :param train: Train singleton, passed to FilterBase.
            :param id_get: ID getter, passed to FilterBase.
            :param ranges: List of IP ranges; empty list when not given.
            :param item_get: Getter for data to consult; the "ip" item of the
                data is used when not given.
            """
            super(IPFilter, self).__init__(train, id_get)
            self.item_get = item_get if item_get else itemgetter("ip")
            self.ranges = ranges if ranges else []

        def condition(self, data):
            """ Tell whether the consulted item lies within any of the configured ranges.

            :param data: Data to be checked.
            :returns: True when at least one range contains the item, False otherwise.
            """
            candidate = self.item_get(data)
            return any(candidate in ip_range for ip_range in self.ranges)