From 2d92abeb5e2015da22eb0659955fea06886009ab Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pavel=20K=C3=A1cha?= <ph@cesnet.cz>
Date: Wed, 3 May 2017 14:25:14 +0200
Subject: [PATCH] Reordered source, pep8 cleanup, fixed logging

---
 .../connectors/hp-labrea/labrea-idea.py | 458 ++++++++++--------
 1 file changed, 258 insertions(+), 200 deletions(-)

diff --git a/warden3/contrib/connectors/hp-labrea/labrea-idea.py b/warden3/contrib/connectors/hp-labrea/labrea-idea.py
index 78d3fcf..46050f1 100755
--- a/warden3/contrib/connectors/hp-labrea/labrea-idea.py
+++ b/warden3/contrib/connectors/hp-labrea/labrea-idea.py
@@ -6,7 +6,6 @@ import sys
 import re
 import time
 import optparse
-import itertools
 import signal
 import contextlib
 import uuid
@@ -14,86 +13,16 @@ import json
 import socket
 import resource
 import atexit
+import logging
+import logging.handlers
 import os.path as pth
+from itertools import izip
 from collections import namedtuple
 try:
     from collections import OrderedDict
 except ImportError:
     from ordereddict import OrderedDict
 
-class FileWatcher(object):
-
-    def __init__(self, filename, tail=True):
-        self.filename = filename
-        self.open()
-        self.line_buffer = ""
-        if tail and self.f:
-            self.f.seek(0, os.SEEK_END)
-
-    def open(self):
-        try:
-            self.f = open(self.filename, "r")
-            st = os.fstat(self.f.fileno())
-            self.inode, self.size = st.st_ino, st.st_size
-        except IOError:
-            self.f = None
-            self.inode = -1
-            self.size = -1
-
-    def _check_reopen(self):
-        try:
-            st = os.stat(self.filename)
-            cur_inode, cur_size = st.st_ino, st.st_size
-        except OSError as e:
-            cur_inode = -1
-            cur_size = -1
-        if cur_inode != self.inode or cur_size < self.size:
-            self.close()
-            self.open()
-
-    def readline(self):
-        if not self.f:
-            self.open()
-            if not self.f:
-                return self.line_buffer
-        res = self.f.readline()
-        if not res:
-            self._check_reopen()
-            if not self.f:
-                return self.line_buffer
-            res = self.f.readline()
-        if not res.endswith("\n"):
-            self.line_buffer += res
-        else:
-            res = self.line_buffer + res
-            self.line_buffer = ""
-        return res
-
-    def close(self):
-        try:
-            if self.f:
-                self.f.close()
-        except IOError:
-            pass
-        self.inode = -1
-        self.size = -1
-
-    def __repr__(self):
-        return '%s("%s")' % (type(self).__name__, self.filename)
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        self.close()
-        return False
-
-    def __iter__(self):
-        return self
-
-    def next(self):
-        return self.readline()
-
 
 class WindowContextMgr(object):
 
@@ -121,7 +50,7 @@ class WindowContextMgr(object):
         del self.first_update_queue[ctx]
         del self.last_update_queue[ctx]
         return aggr_events
-    
+
     def process(self, event=None, timestamp=None):
 
         self.update_timestamp = timestamp
@@ -144,7 +73,6 @@ class WindowContextMgr(object):
 
         return aggr_events
 
-
     def close(self):
         aggr_events = []
         for context in self.contexts.values():
@@ -164,11 +92,11 @@ class PingContextMgr(WindowContextMgr):
 
     def ctx_create(self, event):
         ctx = dict(
-            src_ip = event.src_ip,
-            tgt_ips = set([event.tgt_ip]),
-            count = 1,
-            first_update = self.update_timestamp,
-            last_update = self.update_timestamp
+            src_ip=event.src_ip,
+            tgt_ips=set([event.tgt_ip]),
+            count=1,
+            first_update=self.update_timestamp,
+            last_update=self.update_timestamp
         )
         return ctx
 
@@ -179,14 +107,14 @@ def ctx_close(self, ctx):
         return self.ideagen.gen_idea(
-            src = ctx["src_ip"],
-            src_ports = None,
-            targets = [(ip, []) for ip in ctx["tgt_ips"]],
-            detect_time = self.update_timestamp,
-            event_time = ctx["first_update"],
-            cease_time = ctx["last_update"],
-            count = ctx["count"],
-            template = "ping"
+            src=ctx["src_ip"],
+            src_ports=None,
+            targets=[(ip, []) for ip in ctx["tgt_ips"]],
+            detect_time=self.update_timestamp,
+            event_time=ctx["first_update"],
+            cease_time=ctx["last_update"],
+            count=ctx["count"],
+            template="ping"
         )
 
 
@@ -197,12 +125,12 @@ def ctx_create(self, event):
         ctx = dict(
-            src_ip = event.src_ip,
-            src_ports = set([event.src_port]),
-            tgt_ips_ports = {event.tgt_ip: set([event.tgt_port])},
-            count = 1,
-            first_update = self.update_timestamp,
-            last_update = self.update_timestamp
+            src_ip=event.src_ip,
+            src_ports=set([event.src_port]),
+            tgt_ips_ports={event.tgt_ip: set([event.tgt_port])},
+            count=1,
+            first_update=self.update_timestamp,
+            last_update=self.update_timestamp
         )
         return ctx
 
@@ -215,14 +143,14 @@ def ctx_close(self, ctx):
         return self.ideagen.gen_idea(
-            src = ctx["src_ip"],
-            src_ports = ctx["src_ports"],
-            targets = ctx["tgt_ips_ports"].items(),
-            detect_time = self.update_timestamp,
-            event_time = ctx["first_update"],
-            cease_time = ctx["last_update"],
-            count = ctx["count"],
-            template = "connect"
+            src=ctx["src_ip"],
+            src_ports=ctx["src_ports"],
+            targets=ctx["tgt_ips_ports"].items(),
+            detect_time=self.update_timestamp,
+            event_time=ctx["first_update"],
+            cease_time=ctx["last_update"],
+            count=ctx["count"],
+            template="connect"
         )
 
 
@@ -233,17 +161,91 @@ class InboundContextMgr(ConnectContextMgr):
 
     def ctx_close(self, ctx):
         return self.ideagen.gen_idea(
-            src = ctx["src_ip"],
-            src_ports = ctx["src_ports"],
-            targets = ctx["tgt_ips_ports"].items(),
-            detect_time = self.update_timestamp,
-            event_time = ctx["first_update"],
-            cease_time = ctx["last_update"],
-            count = ctx["count"],
-            template = "synack"
+            src=ctx["src_ip"],
+            src_ports=ctx["src_ports"],
+            targets=ctx["tgt_ips_ports"].items(),
+            detect_time=self.update_timestamp,
+            event_time=ctx["first_update"],
+            cease_time=ctx["last_update"],
+            count=ctx["count"],
+            template="synack"
         )
 
 
+class FileWatcher(object):
+
+    def __init__(self, filename, tail=True):
+        self.filename = filename
+        self.open()
+        self.line_buffer = ""
+        if tail and self.f:
+            self.f.seek(0, os.SEEK_END)
+
+    def open(self):
+        try:
+            self.f = open(self.filename, "r")
+            st = os.fstat(self.f.fileno())
+            self.inode, self.size = st.st_ino, st.st_size
+        except IOError:
+            self.f = None
+            self.inode = -1
+            self.size = -1
+
+    def _check_reopen(self):
+        try:
+            st = os.stat(self.filename)
+            cur_inode, cur_size = st.st_ino, st.st_size
+        except OSError as e:
+            cur_inode = -1
+            cur_size = -1
+        if cur_inode != self.inode or cur_size < self.size:
+            self.close()
+            self.open()
+
+    def readline(self):
+        if not self.f:
+            self.open()
+            if not self.f:
+                return self.line_buffer
+        res = self.f.readline()
+        if not res:
+            self._check_reopen()
+            if not self.f:
+                return self.line_buffer
+            res = self.f.readline()
+        if not res.endswith("\n"):
+            self.line_buffer += res
+        else:
+            res = self.line_buffer + res
+            self.line_buffer = ""
+        return res
+
+    def close(self):
+        try:
+            if self.f:
+                self.f.close()
+        except IOError:
+            pass
+        self.inode = -1
+        self.size = -1
+
+    def __repr__(self):
+        return '%s("%s")' % (type(self).__name__, self.filename)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+        return False
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        return self.readline()
+
+
 class IdeaGen(object):
 
     def __init__(self, name, test=False):
@@ -319,7 +321,7 @@ class IdeaGen(object):
             "Target": itargets,
             "Node": [inode]
         }
-        return json.dumps(idea, ensure_ascii=True)
+        return idea
 
 
 class Filer(object):
@@ -366,9 +368,9 @@ class Filer(object):
 
 
 def daemonize(
-        work_dir = None, chroot_dir = None,
-        umask = None, uid = None, gid = None,
-        pidfile = None, files_preserve = [], signals = {}):
+        work_dir=None, chroot_dir=None,
+        umask=None, uid=None, gid=None,
+        pidfile=None, files_preserve=[], signals={}):
     # Dirs, limits, users
     if chroot_dir is not None:
         os.chdir(chroot_dir)
@@ -382,13 +384,13 @@ def daemonize(
     if uid is not None:
         os.setuid(uid)
     # Doublefork, split session
-    if os.fork()>0:
+    if os.fork() > 0:
         os._exit(0)
     try:
         os.setsid()
     except OSError:
         pass
-    if os.fork()>0:
+    if os.fork() > 0:
         os._exit(0)
     # Setup signal handlers
     for (signum, handler) in signals.items():
@@ -396,7 +398,7 @@ def daemonize(
    # Close descriptors
     descr_preserve = set(f.fileno() for f in files_preserve)
     maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
-    if maxfd==resource.RLIM_INFINITY:
+    if maxfd == resource.RLIM_INFINITY:
         maxfd = 65535
     for fd in range(maxfd, 3, -1):   # 3 means omit stdin, stdout, stderr
         if fd not in descr_preserve:
@@ -410,10 +412,11 @@ def daemonize(
             os.dup2(devnull, fd)
     # PID file
     if pidfile is not None:
-        pidd = os.open(pidfile, os.O_RDWR|os.O_CREAT|os.O_EXCL|os.O_TRUNC)
+        pidd = os.open(pidfile, os.O_RDWR | os.O_CREAT | os.O_EXCL | os.O_TRUNC)
         os.write(pidd, str(os.getpid())+"\n")
         os.close(pidd)
         # Define and setup atexit closure
+
         @atexit.register
         def unlink_pid():
             try:
@@ -422,104 +425,153 @@ def daemonize(
                 pass
 
 
+def save_events(aggr, filer):
+    for event in aggr:
+        f, name = filer.create_unique_file()
+        with f:
+            f.write(json.dumps(event, ensure_ascii=True))
+        filer.publish_file(name)
+        logging.info(
+            "Saved %s %s \"%s\" (%d) as file %s" % (
+                event["ID"], "+".join(event["Category"]), event["Description"], event["ConnCount"], name))
+
+
+RE_LIST = (
+    # 1493035442 Initial Connect - tarpitting: 89.163.242.15 56736 -> 195.113.254.182 9898
+    # 1493037991 Inbound SYN/ACK: 185.62.190.15 21001 -> 195.113.252.222 15584
+    (
+        re.compile(r'([0-9]+) ([^:]*:) ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*'),
+        namedtuple("connect_tuple", ("timestamp", "message", "src_ip", "src_port", "tgt_ip", "tgt_port"))
+    ),
+    # 1493035442 Responded to a Ping: 88.86.96.25 -> 195.113.253.87 *
+    (
+        re.compile(r'([0-9]+) ([^:]*:) ([^ ]+) -> ([^ ]+).*'),
+        namedtuple("ping_tuple", ("timestamp", "message", "src_ip", "tgt_ip"))
+    )
+)
+
+
+def match_event(line):
+    for labrea_re, event_tuple in RE_LIST:
+        match = labrea_re.match(line)
+        if match:
+            return event_tuple(*match.groups())
+    logging.info("Unmatched line: \"%s\"" % line.replace("\n", r"\n"))
+    return None
+
+
 def get_args():
     optp = optparse.OptionParser(
         usage="usage: %prog [options] logfile ...",
         description="Watch LaBrea logfiles and generate Idea events into directory")
-    optp.add_option("-w", "--window",
+    optp.add_option(
+        "-w", "--window",
         default=900,
         dest="window",
         type="int",
         action="store",
         help="max detection window (default: %default)")
-    optp.add_option("-t", "--timeout",
+    optp.add_option(
+        "-t", "--timeout",
         default=300,
         dest="timeout",
         type="int",
         action="store",
         help="detection timeout (default: %default)")
-    optp.add_option("-n", "--name",
+    optp.add_option(
+        "-n", "--name",
         default=None,
         dest="name",
         type="string",
         action="store",
         help="Warden client name")
-    optp.add_option("--test",
+    optp.add_option(
+        "--test",
         default=False,
         dest="test",
         action="store_true",
         help="Add Test category")
-    optp.add_option("-o", "--oneshot",
+    optp.add_option(
+        "-o", "--oneshot",
         default=False,
         dest="oneshot",
         action="store_true",
         help="process files and quit (do not daemonize)")
-    optp.add_option("--poll",
+    optp.add_option(
+        "--poll",
         default=1,
         dest="poll",
         type="int",
         action="store",
         help="log file polling interval")
-    optp.add_option("-d", "--dir",
+    optp.add_option(
+        "-d", "--dir",
         default=None,
         dest="dir",
         type="string",
         action="store",
         help="Target directory (mandatory)"),
-    optp.add_option("-p", "--pid",
+    optp.add_option(
+        "-p", "--pid",
         default=pth.join("/var/run", pth.splitext(pth.basename(sys.argv[0]))[0] + ".pid"),
         dest="pid",
         type="string",
         action="store",
         help="create PID file with this name (default: %default)")
-    optp.add_option("-u", "--uid",
+    optp.add_option(
+        "-u", "--uid",
        default=None,
        dest="uid",
        type="int",
        action="store",
        help="user id to run under")
-    optp.add_option("-g", "--gid",
+    optp.add_option(
+        "-g", "--gid",
        default=None,
        dest="gid",
        type="int",
        action="store",
        help="group id to run under")
-    optp.add_option("--realtime",
+    optp.add_option(
+        "-v", "--verbose",
+        dest="verbose",
+        action="store_true",
+        help="turn on debug logging")
+    optp.add_option(
+        "--log",
+        default="local7",
+        dest="log",
+        type="string",
+        action="store",
+        help="syslog facility or log file name (default: %default)")
+    optp.add_option(
+        "--realtime",
        default=True,
        dest="realtime",
        action="store_true",
        help="use system time along with log timestamps (default)")
-    optp.add_option("--norealtime",
+    optp.add_option(
+        "--norealtime",
        dest="realtime",
        action="store_false",
        help="don't system time, use solely log timestamps")
     return optp
 
 
-# 1493035442 Initial Connect - tarpitting: 89.163.242.15 56736 -> 195.113.254.182 9898
-# 1493037991 Inbound SYN/ACK: 185.62.190.15 21001 -> 195.113.252.222 15584
-# 1492790713 Inbound SYN/ACK: 62.210.130.232 80 -> 78.128.253.197 58376
-labrea_re_connect_inbound = re.compile(r'([0-9]+) ([^:]*:) ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*')
-connect_tuple = namedtuple("connect_tuple", ("timestamp", "message", "src_ip", "src_port", "tgt_ip", "tgt_port"))
-
-# 1493035442 Responded to a Ping: 88.86.96.25 -> 195.113.253.87 *
-labrea_re_ping = re.compile(r'([0-9]+) ([^:]*:) ([^ ]+) -> ([^ ]+).*')
-ping_tuple = namedtuple("ping_tuple", ("timestamp", "message", "src_ip", "tgt_ip"))
-
-re_list = [labrea_re_connect_inbound, labrea_re_ping]
-event_list = [connect_tuple, ping_tuple]
-
 running_flag = True
 reload_flag = False
 
+
 def terminate_me(signum, frame):
     global running_flag
     running_flag = False
 
+
 def reload_me(signum, frame):
     global reload_flag
     reload_flag = True
 
+
 def main():
     global reload_flag
     optp = get_args()
@@ -527,67 +579,73 @@ def main():
     if not args or opts.name is None or opts.dir is None:
         optp.print_help()
         sys.exit()
+
+    logformat = "%(filename)s[%(process)d] %(message)s"
+    logger = logging.getLogger()
     if opts.oneshot:
-        files = [open(arg) for arg in args]
+        handler = logging.StreamHandler()
+        handler.setFormatter(logging.Formatter("%(asctime)s " + logformat))
     else:
-        daemonize(
-            pidfile = opts.pid,
-            uid = opts.uid,
-            gid = opts.gid,
-            signals = {
-                signal.SIGINT: terminate_me,
-                signal.SIGTERM: terminate_me,
-                signal.SIGHUP: reload_me
-            })
-        files = [FileWatcher(arg) for arg in args]
-    with contextlib.nested(*files):
-        lines = itertools.izip(*files)
+        if "/" in opts.log:
+            handler = logging.handlers.WatchedFileHandler(opts.log)
+            handler.setFormatter(logging.Formatter("%(asctime)s " + logformat))
+        else:
+            handler = logging.handlers.SysLogHandler(address="/dev/log", facility=opts.log)
+            handler.setFormatter(logging.Formatter(logformat))
+    logger.addHandler(handler)
+    logger.setLevel(logging.DEBUG if opts.verbose else logging.INFO)
+
+    try:
+        if opts.oneshot:
+            signal.signal(signal.SIGINT, terminate_me)
+            signal.signal(signal.SIGTERM, terminate_me)
+            files = [open(arg) for arg in args]
+        else:
+            daemonize(
+                pidfile=opts.pid,
+                uid=opts.uid,
+                gid=opts.gid,
+                signals={
+                    signal.SIGINT: terminate_me,
+                    signal.SIGTERM: terminate_me,
+                    signal.SIGHUP: reload_me
+                })
+            files = [FileWatcher(arg) for arg in args]
+
         ideagen = IdeaGen(name=opts.name, test=opts.test)
-        contexts = [context(window=opts.window, timeout=opts.timeout, ideagen=ideagen) for context in [PingContextMgr, ConnectContextMgr, InboundContextMgr]]
         filer = Filer(opts.dir)
-        for line_set in lines:
-            for line in line_set:
-                if line:
-                    line = line.strip()
-                    event = None
-                    for labrea_re, event_tuple in zip(re_list, event_list):
-                        match = labrea_re.match(line)
-                        if match:
-                            event = event_tuple(*match.groups())
-                            break
-                    aggr = []
-                    if event is not None:
-                        timestamp = int(event.timestamp)
-                    else:
-                        if opts.realtime:
-                            timestamp = int(time.time())
-                        else:
-                            timestamp = None
-                    if timestamp is not None:
+        contexts = [
+            context(window=opts.window, timeout=opts.timeout, ideagen=ideagen)
+            for context in [PingContextMgr, ConnectContextMgr, InboundContextMgr]
+        ]
+
+        with contextlib.nested(*files):
+            for line_set in izip(*files):
+                for line in line_set:
+                    if not line:
+                        break
+
+                    event = match_event(line)
+
+                    if event or opts.realtime:
+                        timestamp = int(event.timestamp) if event else int(time.time())
                         for context in contexts:
-                            aggr.extend(context.process(event, timestamp))
-                    for event in aggr:
-                        f, name = filer.create_unique_file()
-                        with f:
-                            f.write(event)
-                        filer.publish_file(name)
-            if not running_flag:
-                break
-            if reload_flag:
-                for f in files:
-                    f.close()
-                    f.open()
+                            save_events(context.process(event, timestamp), filer)
+
+                if not running_flag:
+                    break
+                if reload_flag:
+                    for f in files:
+                        f.close()
+                        f.open()
                     reload_flag = False
-            if not any(line_set):
-                time.sleep(opts.poll)
-    line = ""
-    aggr = []
-    for context in contexts:
-        aggr.extend(context.close())
-    for event in aggr:
-        f, name = filer.create_unique_file()
-        with f:
-            f.write(event)
-        filer.publish_file(name)
+                if not any(line_set):
+                    time.sleep(opts.poll)
+
+            for context in contexts:
+                save_events(context.close(), filer)
+
+    except Exception:
+        logging.exception("Exception")
 
 
 main()
-- 
GitLab
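For reference, the two patterns gathered into RE_LIST can be exercised on their own against the sample LaBrea lines quoted in the patch comments. A minimal standalone sketch (plain Python, no Warden/Filer dependencies, logging left out):

    import re
    from collections import namedtuple

    # Patterns as in the patch: the connect/SYN-ACK form requires a
    # "port -> port" pair, so ping lines fall through to the second tuple.
    RE_LIST = (
        (
            re.compile(r'([0-9]+) ([^:]*:) ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*'),
            namedtuple("connect_tuple", ("timestamp", "message", "src_ip", "src_port", "tgt_ip", "tgt_port"))
        ),
        (
            re.compile(r'([0-9]+) ([^:]*:) ([^ ]+) -> ([^ ]+).*'),
            namedtuple("ping_tuple", ("timestamp", "message", "src_ip", "tgt_ip"))
        )
    )

    def match_event(line):
        # First match wins; None signals an unparseable line.
        for labrea_re, event_tuple in RE_LIST:
            match = labrea_re.match(line)
            if match:
                return event_tuple(*match.groups())
        return None

    print(match_event("1493035442 Initial Connect - tarpitting: 89.163.242.15 56736 -> 195.113.254.182 9898"))
    # connect_tuple(timestamp='1493035442', message='Initial Connect - tarpitting:',
    #               src_ip='89.163.242.15', src_port='56736',
    #               tgt_ip='195.113.254.182', tgt_port='9898')
    print(match_event("1493035442 Responded to a Ping: 88.86.96.25 -> 195.113.253.87 *"))
    # ping_tuple(timestamp='1493035442', message='Responded to a Ping:',
    #            src_ip='88.86.96.25', tgt_ip='195.113.253.87')

Note that the captured timestamp is still a string; main() converts it with int(event.timestamp) before handing it to the context managers.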
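One portability caveat: the file is Python 2 code. Both "from itertools import izip" (introduced by this patch) and contextlib.nested exist only in Python 2, and FileWatcher implements iteration via next() rather than __next__(). Should anyone want to run the connector under Python 3, a shim along these lines would be needed -- a hypothetical sketch, not part of the patch:

    # Hypothetical Python 3 compatibility sketch -- not in the patch.
    try:
        from itertools import izip   # Python 2: lazy variant of zip
    except ImportError:
        izip = zip                   # Python 3: built-in zip is already lazy

    # contextlib.nested() was removed in Python 3.2; ExitStack covers the
    # same use case of entering a variable number of context managers.
    from contextlib import ExitStack

    def watch_all(files):
        # Replacement for "with contextlib.nested(*files):" in main().
        with ExitStack() as stack:
            for f in files:
                stack.enter_context(f)
            # ... consume izip(*files) here ...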