Commit 542e44d1 authored by Pavel Kácha

Reordered source, pep8 cleanup, fixed logging

parent 5529ae73
@@ -6,7 +6,6 @@ import sys
import re
import time
import optparse
import itertools
import signal
import contextlib
import uuid
@@ -14,86 +13,16 @@ import json
import socket
import resource
import atexit
import logging
import logging.handlers
import os.path as pth
from itertools import izip
from collections import namedtuple
try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
class FileWatcher(object):
def __init__(self, filename, tail=True):
self.filename = filename
self.open()
self.line_buffer = ""
if tail and self.f:
self.f.seek(0, os.SEEK_END)
def open(self):
try:
self.f = open(self.filename, "r")
st = os.fstat(self.f.fileno())
self.inode, self.size = st.st_ino, st.st_size
except IOError:
self.f = None
self.inode = -1
self.size = -1
def _check_reopen(self):
try:
st = os.stat(self.filename)
cur_inode, cur_size = st.st_ino, st.st_size
except OSError as e:
cur_inode = -1
cur_size = -1
if cur_inode != self.inode or cur_size < self.size:
self.close()
self.open()
def readline(self):
if not self.f:
self.open()
if not self.f:
return self.line_buffer
res = self.f.readline()
if not res:
self._check_reopen()
if not self.f:
return self.line_buffer
res = self.f.readline()
if not res.endswith("\n"):
self.line_buffer += res
else:
res = self.line_buffer + res
self.line_buffer = ""
return res
def close(self):
try:
if self.f:
self.f.close()
except IOError:
pass
self.inode = -1
self.size = -1
def __repr__(self):
return '%s("%s")' % (type(self).__name__, self.filename)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
return False
def __iter__(self):
return self
def next(self):
return self.readline()
class WindowContextMgr(object):
@@ -144,7 +73,6 @@ class WindowContextMgr(object):
return aggr_events
def close(self):
aggr_events = []
for context in self.contexts.values():
@@ -244,6 +172,80 @@ class InboundContextMgr(ConnectContextMgr):
)
class FileWatcher(object):
def __init__(self, filename, tail=True):
self.filename = filename
self.open()
self.line_buffer = ""
if tail and self.f:
self.f.seek(0, os.SEEK_END)
def open(self):
try:
self.f = open(self.filename, "r")
st = os.fstat(self.f.fileno())
self.inode, self.size = st.st_ino, st.st_size
except IOError:
self.f = None
self.inode = -1
self.size = -1
def _check_reopen(self):
try:
st = os.stat(self.filename)
cur_inode, cur_size = st.st_ino, st.st_size
except OSError as e:
cur_inode = -1
cur_size = -1
if cur_inode != self.inode or cur_size < self.size:
self.close()
self.open()
def readline(self):
if not self.f:
self.open()
if not self.f:
return self.line_buffer
res = self.f.readline()
if not res:
self._check_reopen()
if not self.f:
return self.line_buffer
res = self.f.readline()
if not res.endswith("\n"):
self.line_buffer += res
else:
res = self.line_buffer + res
self.line_buffer = ""
return res
def close(self):
try:
if self.f:
self.f.close()
except IOError:
pass
self.inode = -1
self.size = -1
def __repr__(self):
return '%s("%s")' % (type(self).__name__, self.filename)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
return False
def __iter__(self):
return self
def next(self):
return self.readline()
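For orientation, a minimal usage sketch of the FileWatcher class above, assuming the module's existing imports (sys); the log path is a hypothetical example. Because readline() returns an empty or partial string when no complete line is available, and the file is reopened transparently after rotation or truncation, a consumer only needs a simple loop:

    # Illustrative sketch only; the path below is an assumed example, not from this repository.
    with FileWatcher("/var/log/labrea.log", tail=False) as fw:
        for line in fw:                    # iteration delegates to readline()
            if not line.endswith("\n"):    # "" or a partial line: nothing complete yet
                break                      # a long-running consumer would sleep and retry instead
            sys.stdout.write(line)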
class IdeaGen(object):
def __init__(self, name, test=False):
@@ -319,7 +321,7 @@ class IdeaGen(object):
"Target": itargets,
"Node": [inode]
}
return json.dumps(idea, ensure_ascii=True)
return idea
class Filer(object):
@@ -414,6 +416,7 @@ def daemonize(
os.write(pidd, str(os.getpid())+"\n")
os.close(pidd)
# Define and setup atexit closure
@atexit.register
def unlink_pid():
try:
@@ -422,104 +425,153 @@ def daemonize(
pass
def save_events(aggr, filer):
for event in aggr:
f, name = filer.create_unique_file()
with f:
f.write(json.dumps(event, ensure_ascii=True))
filer.publish_file(name)
logging.info(
"Saved %s %s \"%s\" (%d) as file %s" % (
event["ID"], "+".join(event["Category"]), event["Description"], event["ConnCount"], name))
RE_LIST = (
# 1493035442 Initial Connect - tarpitting: 89.163.242.15 56736 -> 195.113.254.182 9898
# 1493037991 Inbound SYN/ACK: 185.62.190.15 21001 -> 195.113.252.222 15584
(
re.compile(r'([0-9]+) ([^:]*:) ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*'),
namedtuple("connect_tuple", ("timestamp", "message", "src_ip", "src_port", "tgt_ip", "tgt_port"))
),
# 1493035442 Responded to a Ping: 88.86.96.25 -> 195.113.253.87 *
(
re.compile(r'([0-9]+) ([^:]*:) ([^ ]+) -> ([^ ]+).*'),
namedtuple("ping_tuple", ("timestamp", "message", "src_ip", "tgt_ip"))
)
)
def match_event(line):
for labrea_re, event_tuple in RE_LIST:
match = labrea_re.match(line)
if match:
return event_tuple(*match.groups())
logging.info("Unmatched line: \"%s\"" % line.replace("\n", r"\n"))
return None
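To illustrate match_event(), here is a sketch that feeds in the first sample line already quoted in the comments above RE_LIST; the field values in the comments follow directly from the connect regex:

    # Sketch: parse the sample tarpitting line quoted above.
    sample = "1493035442 Initial Connect - tarpitting: 89.163.242.15 56736 -> 195.113.254.182 9898"
    evt = match_event(sample)
    # evt is a connect_tuple of strings:
    #   evt.timestamp == "1493035442"
    #   evt.src_ip == "89.163.242.15", evt.src_port == "56736"
    #   evt.tgt_ip == "195.113.254.182", evt.tgt_port == "9898"
    # An unparseable line is logged ("Unmatched line: ...") and match_event returns None.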
def get_args():
optp = optparse.OptionParser(
usage="usage: %prog [options] logfile ...",
description="Watch LaBrea logfiles and generate Idea events into directory")
optp.add_option("-w", "--window",
optp.add_option(
"-w", "--window",
default=900,
dest="window",
type="int",
action="store",
help="max detection window (default: %default)")
optp.add_option("-t", "--timeout",
optp.add_option(
"-t", "--timeout",
default=300,
dest="timeout",
type="int",
action="store",
help="detection timeout (default: %default)")
optp.add_option("-n", "--name",
optp.add_option(
"-n", "--name",
default=None,
dest="name",
type="string",
action="store",
help="Warden client name")
optp.add_option("--test",
optp.add_option(
"--test",
default=False,
dest="test",
action="store_true",
help="Add Test category")
optp.add_option("-o", "--oneshot",
optp.add_option(
"-o", "--oneshot",
default=False,
dest="oneshot",
action="store_true",
help="process files and quit (do not daemonize)")
optp.add_option("--poll",
optp.add_option(
"--poll",
default=1,
dest="poll",
type="int",
action="store",
help="log file polling interval")
optp.add_option("-d", "--dir",
optp.add_option(
"-d", "--dir",
default=None,
dest="dir",
type="string",
action="store",
help="Target directory (mandatory)"),
optp.add_option("-p", "--pid",
optp.add_option(
"-p", "--pid",
default=pth.join("/var/run", pth.splitext(pth.basename(sys.argv[0]))[0] + ".pid"),
dest="pid",
type="string",
action="store",
help="create PID file with this name (default: %default)")
optp.add_option("-u", "--uid",
optp.add_option(
"-u", "--uid",
default=None,
dest="uid",
type="int",
action="store",
help="user id to run under")
optp.add_option("-g", "--gid",
optp.add_option(
"-g", "--gid",
default=None,
dest="gid",
type="int",
action="store",
help="group id to run under")
optp.add_option("--realtime",
optp.add_option(
"-v", "--verbose",
dest="verbose",
action="store_true",
help="turn on debug logging")
optp.add_option(
"--log",
default="local7",
dest="log",
type="string",
action="store",
help="syslog facility or log file name (default: %default)")
optp.add_option(
"--realtime",
default=True,
dest="realtime",
action="store_true",
help="use system time along with log timestamps (default)")
optp.add_option("--norealtime",
optp.add_option(
"--norealtime",
dest="realtime",
action="store_false",
help="don't system time, use solely log timestamps")
return optp
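The parser returned by get_args() is consumed in main() in the usual optparse fashion; a sketch of the expected call pattern (the actual parse_args() call sits in a part of main() not shown in this hunk):

    # Sketch only: obtain opts/args before the validation shown in main() below.
    optp = get_args()
    opts, args = optp.parse_args()   # args holds the LaBrea logfile paths from the command line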
# 1493035442 Initial Connect - tarpitting: 89.163.242.15 56736 -> 195.113.254.182 9898
# 1493037991 Inbound SYN/ACK: 185.62.190.15 21001 -> 195.113.252.222 15584
# 1492790713 Inbound SYN/ACK: 62.210.130.232 80 -> 78.128.253.197 58376
labrea_re_connect_inbound = re.compile(r'([0-9]+) ([^:]*:) ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*')
connect_tuple = namedtuple("connect_tuple", ("timestamp", "message", "src_ip", "src_port", "tgt_ip", "tgt_port"))
# 1493035442 Responded to a Ping: 88.86.96.25 -> 195.113.253.87 *
labrea_re_ping = re.compile(r'([0-9]+) ([^:]*:) ([^ ]+) -> ([^ ]+).*')
ping_tuple = namedtuple("ping_tuple", ("timestamp", "message", "src_ip", "tgt_ip"))
re_list = [labrea_re_connect_inbound, labrea_re_ping]
event_list = [connect_tuple, ping_tuple]
running_flag = True
reload_flag = False
def terminate_me(signum, frame):
global running_flag
running_flag = False
def reload_me(signum, frame):
global reload_flag
reload_flag = True
def main():
global reload_flag
optp = get_args()
@@ -527,7 +579,26 @@ def main():
if not args or opts.name is None or opts.dir is None:
optp.print_help()
sys.exit()
logformat = "%(filename)s[%(process)d] %(message)s"
logger = logging.getLogger()
if opts.oneshot:
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s " + logformat))
else:
if "/" in opts.log:
handler = logging.handlers.WatchedFileHandler(opts.log)
handler.setFormatter(logging.Formatter("%(asctime)s " + logformat))
else:
handler = logging.handlers.SysLogHandler(address="/dev/log", facility=opts.log)
handler.setFormatter(logging.Formatter(logformat))
logger.addHandler(handler)
logger.setLevel(logging.DEBUG if opts.verbose else logging.INFO)
try:
if opts.oneshot:
signal.signal(signal.SIGINT, terminate_me)
signal.signal(signal.SIGTERM, terminate_me)
files = [open(arg) for arg in args]
else:
daemonize(
@@ -540,37 +611,27 @@ def main():
signal.SIGHUP: reload_me
})
files = [FileWatcher(arg) for arg in args]
with contextlib.nested(*files):
lines = itertools.izip(*files)
ideagen = IdeaGen(name=opts.name, test=opts.test)
contexts = [context(window=opts.window, timeout=opts.timeout, ideagen=ideagen) for context in [PingContextMgr, ConnectContextMgr, InboundContextMgr]]
filer = Filer(opts.dir)
for line_set in lines:
contexts = [
context(window=opts.window, timeout=opts.timeout, ideagen=ideagen)
for context in [PingContextMgr, ConnectContextMgr, InboundContextMgr]
]
with contextlib.nested(*files):
for line_set in izip(*files):
for line in line_set:
if line:
line = line.strip()
event = None
for labrea_re, event_tuple in zip(re_list, event_list):
match = labrea_re.match(line)
if match:
event = event_tuple(*match.groups())
if not line:
break
aggr = []
if event is not None:
timestamp = int(event.timestamp)
else:
if opts.realtime:
timestamp = int(time.time())
else:
timestamp = None
if timestamp is not None:
event = match_event(line)
if event or opts.realtime:
timestamp = int(event.timestamp) if event else int(time.time())
for context in contexts:
aggr.extend(context.process(event, timestamp))
for event in aggr:
f, name = filer.create_unique_file()
with f:
f.write(event)
filer.publish_file(name)
save_events(context.process(event, timestamp), filer)
if not running_flag:
break
if reload_flag:
@@ -580,14 +641,11 @@ def main():
reload_flag = False
if not any(line_set):
time.sleep(opts.poll)
line = ""
aggr = []
for context in contexts:
aggr.extend(context.close())
for event in aggr:
f, name = filer.create_unique_file()
with f:
f.write(event)
filer.publish_file(name)
save_events(context.close(), filer)
except Exception:
logging.exception("Exception")
main()