Skip to content
Snippets Groups Projects
Commit 2d92abeb authored by Pavel Kácha's avatar Pavel Kácha
Browse files

Reordered source, pep8 cleanup, fixed logging

parent 46ca1c21
No related branches found
No related tags found
No related merge requests found
......@@ -6,7 +6,6 @@ import sys
import re
import time
import optparse
import itertools
import signal
import contextlib
import uuid
......@@ -14,86 +13,16 @@ import json
import socket
import resource
import atexit
import logging
import logging.handlers
import os.path as pth
from itertools import izip
from collections import namedtuple
try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
class FileWatcher(object):
    """Follow a (possibly rotated) log file, handing out lines one at a time.

    Rotation/truncation is detected by comparing the path's current inode
    and size against the values recorded at open time; on mismatch the
    file is transparently reopened.
    """

    def __init__(self, filename, tail=True):
        self.filename = filename
        self.open()
        self.line_buffer = ""
        # Skip existing content unless the caller asked for history.
        if tail and self.f:
            self.f.seek(0, os.SEEK_END)

    def open(self):
        """(Re)open the watched file and record its identity (inode, size)."""
        try:
            handle = open(self.filename, "r")
            info = os.fstat(handle.fileno())
        except IOError:
            self.f = None
            self.inode = -1
            self.size = -1
        else:
            self.f = handle
            self.inode, self.size = info.st_ino, info.st_size

    def _check_reopen(self):
        """Reopen when the path points at a new inode or the file shrank."""
        try:
            info = os.stat(self.filename)
            current = (info.st_ino, info.st_size)
        except OSError:
            current = (-1, -1)
        if current[0] != self.inode or current[1] < self.size:
            self.close()
            self.open()

    def readline(self):
        """Return the next line; partial trailing data is also buffered."""
        if not self.f:
            self.open()
            if not self.f:
                return self.line_buffer
        res = self.f.readline()
        if not res:
            self._check_reopen()
            if not self.f:
                return self.line_buffer
            res = self.f.readline()
        if res.endswith("\n"):
            res = self.line_buffer + res
            self.line_buffer = ""
        else:
            self.line_buffer += res
        return res

    def close(self):
        """Close the underlying handle, ignoring I/O errors."""
        try:
            if self.f:
                self.f.close()
        except IOError:
            pass
        self.inode = -1
        self.size = -1

    def __repr__(self):
        return '{0}("{1}")'.format(type(self).__name__, self.filename)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        return False

    def __iter__(self):
        return self

    def next(self):
        return self.readline()
class WindowContextMgr(object):
......@@ -121,7 +50,7 @@ class WindowContextMgr(object):
del self.first_update_queue[ctx]
del self.last_update_queue[ctx]
return aggr_events
def process(self, event=None, timestamp=None):
self.update_timestamp = timestamp
......@@ -144,7 +73,6 @@ class WindowContextMgr(object):
return aggr_events
def close(self):
aggr_events = []
for context in self.contexts.values():
......@@ -164,11 +92,11 @@ class PingContextMgr(WindowContextMgr):
def ctx_create(self, event):
    """Create a fresh ping-sweep context for the event's source IP.

    The scraped diff interleaved the old (``src_ip = ...``) and new
    (``src_ip=...``) keyword lines, producing duplicate keyword arguments;
    this is the reconstructed post-commit version.
    """
    ctx = dict(
        src_ip=event.src_ip,
        tgt_ips=set([event.tgt_ip]),
        count=1,
        # Both stamps start at the manager's current timestamp; last_update
        # advances as further events arrive for this context.
        first_update=self.update_timestamp,
        last_update=self.update_timestamp
    )
    return ctx
......@@ -179,14 +107,14 @@ class PingContextMgr(WindowContextMgr):
def ctx_close(self, ctx):
    """Flush a ping context into an Idea event via the shared IdeaGen.

    Reconstructed post-commit version: the scraped diff interleaved old and
    new keyword-argument lines, yielding duplicate keywords.
    """
    return self.ideagen.gen_idea(
        src=ctx["src_ip"],
        src_ports=None,  # pings carry no port information
        targets=[(ip, []) for ip in ctx["tgt_ips"]],
        detect_time=self.update_timestamp,
        event_time=ctx["first_update"],
        cease_time=ctx["last_update"],
        count=ctx["count"],
        template="ping"
    )
......@@ -197,12 +125,12 @@ class ConnectContextMgr(WindowContextMgr):
def ctx_create(self, event):
    """Create a fresh connect-scan context for the event's source IP.

    Reconstructed post-commit version: the scraped diff interleaved the old
    and new keyword lines, producing duplicate keyword arguments.
    """
    ctx = dict(
        src_ip=event.src_ip,
        src_ports=set([event.src_port]),
        # Map of target IP -> set of contacted ports on that target.
        tgt_ips_ports={event.tgt_ip: set([event.tgt_port])},
        count=1,
        first_update=self.update_timestamp,
        last_update=self.update_timestamp
    )
    return ctx
......@@ -215,14 +143,14 @@ class ConnectContextMgr(WindowContextMgr):
def ctx_close(self, ctx):
    """Flush a connect-scan context into an Idea event via IdeaGen.

    Reconstructed post-commit version: the scraped diff interleaved old and
    new keyword-argument lines, yielding duplicate keywords.
    """
    return self.ideagen.gen_idea(
        src=ctx["src_ip"],
        src_ports=ctx["src_ports"],
        targets=ctx["tgt_ips_ports"].items(),
        detect_time=self.update_timestamp,
        event_time=ctx["first_update"],
        cease_time=ctx["last_update"],
        count=ctx["count"],
        template="connect"
    )
......@@ -233,17 +161,91 @@ class InboundContextMgr(ConnectContextMgr):
def ctx_close(self, ctx):
    """Flush an inbound SYN/ACK context into an Idea event via IdeaGen.

    Identical to the connect-scan flush except for the "synack" template.
    Reconstructed post-commit version: the scraped diff interleaved old and
    new keyword-argument lines, yielding duplicate keywords.
    """
    return self.ideagen.gen_idea(
        src=ctx["src_ip"],
        src_ports=ctx["src_ports"],
        targets=ctx["tgt_ips_ports"].items(),
        detect_time=self.update_timestamp,
        event_time=ctx["first_update"],
        cease_time=ctx["last_update"],
        count=ctx["count"],
        template="synack"
    )
class FileWatcher(object):
    """Follow a log file across rotation/truncation, yielding whole lines.

    Fixes over the original:
    - ``readline()`` now returns only complete (newline-terminated) lines.
      The original both buffered a trailing partial line AND returned it,
      so the fragment was processed twice by callers (once raw, once glued
      to the remainder).
    - ``close()`` now drops the handle reference, so a later ``readline()``
      reopens cleanly instead of reading a closed file object.
    """

    def __init__(self, filename, tail=True):
        self.filename = filename
        self.open()
        self.line_buffer = ""  # holds a partial line until its "\n" arrives
        if tail and self.f:
            # Skip historical content; only report lines appended from now on.
            self.f.seek(0, os.SEEK_END)

    def open(self):
        """(Re)open the file and record its inode/size for rotation checks."""
        try:
            self.f = open(self.filename, "r")
            st = os.fstat(self.f.fileno())
            self.inode, self.size = st.st_ino, st.st_size
        except IOError:
            self.f = None
            self.inode = -1
            self.size = -1

    def _check_reopen(self):
        """Reopen the file if it was rotated (new inode) or truncated."""
        try:
            st = os.stat(self.filename)
            cur_inode, cur_size = st.st_ino, st.st_size
        except OSError:
            cur_inode = -1
            cur_size = -1
        if cur_inode != self.inode or cur_size < self.size:
            self.close()
            self.open()

    def readline(self):
        """Return the next complete line, or "" if none is available yet."""
        if not self.f:
            self.open()
        if not self.f:
            return ""
        res = self.f.readline()
        if not res:
            self._check_reopen()
            if not self.f:
                return ""
            res = self.f.readline()
        if not res.endswith("\n"):
            # Incomplete line: stash it and report nothing yet.  (The original
            # returned the fragment here as well, causing double processing.)
            self.line_buffer += res
            return ""
        res = self.line_buffer + res
        self.line_buffer = ""
        return res

    def close(self):
        """Close and forget the handle; safe to call repeatedly."""
        try:
            if self.f:
                self.f.close()
        except IOError:
            pass
        self.f = None  # was left dangling, making later readline() raise
        self.inode = -1
        self.size = -1

    def __repr__(self):
        return '%s("%s")' % (type(self).__name__, self.filename)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        return False

    def __iter__(self):
        return self

    def next(self):
        return self.readline()

    __next__ = next  # Python 3 iterator protocol compatibility
class IdeaGen(object):
def __init__(self, name, test=False):
......@@ -319,7 +321,7 @@ class IdeaGen(object):
"Target": itargets,
"Node": [inode]
}
return json.dumps(idea, ensure_ascii=True)
return idea
class Filer(object):
......@@ -366,9 +368,9 @@ class Filer(object):
def daemonize(
work_dir = None, chroot_dir = None,
umask = None, uid = None, gid = None,
pidfile = None, files_preserve = [], signals = {}):
work_dir=None, chroot_dir=None,
umask=None, uid=None, gid=None,
pidfile=None, files_preserve=[], signals={}):
# Dirs, limits, users
if chroot_dir is not None:
os.chdir(chroot_dir)
......@@ -382,13 +384,13 @@ def daemonize(
if uid is not None:
os.setuid(uid)
# Doublefork, split session
if os.fork()>0:
if os.fork() > 0:
os._exit(0)
try:
os.setsid()
except OSError:
pass
if os.fork()>0:
if os.fork() > 0:
os._exit(0)
# Setup signal handlers
for (signum, handler) in signals.items():
......@@ -396,7 +398,7 @@ def daemonize(
# Close descriptors
descr_preserve = set(f.fileno() for f in files_preserve)
maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
if maxfd==resource.RLIM_INFINITY:
if maxfd == resource.RLIM_INFINITY:
maxfd = 65535
for fd in range(maxfd, 3, -1): # 3 means omit stdin, stdout, stderr
if fd not in descr_preserve:
......@@ -410,10 +412,11 @@ def daemonize(
os.dup2(devnull, fd)
# PID file
if pidfile is not None:
pidd = os.open(pidfile, os.O_RDWR|os.O_CREAT|os.O_EXCL|os.O_TRUNC)
pidd = os.open(pidfile, os.O_RDWR | os.O_CREAT | os.O_EXCL | os.O_TRUNC)
os.write(pidd, str(os.getpid())+"\n")
os.close(pidd)
# Define and setup atexit closure
@atexit.register
def unlink_pid():
try:
......@@ -422,104 +425,153 @@ def daemonize(
pass
def save_events(aggr, filer):
    """Serialize each aggregated Idea event to its own file via *filer*.

    For every event dict: obtain a unique file from the filer, write the
    JSON-encoded event, publish the file, and log a summary line.

    Uses lazy %-style logging arguments instead of eager string formatting,
    so the message is only built when INFO logging is enabled.
    """
    for event in aggr:
        f, name = filer.create_unique_file()
        with f:
            f.write(json.dumps(event, ensure_ascii=True))
        filer.publish_file(name)
        logging.info(
            'Saved %s %s "%s" (%d) as file %s',
            event["ID"], "+".join(event["Category"]),
            event["Description"], event["ConnCount"], name)
# (regex, namedtuple-type) pairs tried in order by match_event(); the first
# pattern matching a LaBrea log line yields the corresponding tuple type,
# whose fields name the regex capture groups in order.
RE_LIST = (
# 1493035442 Initial Connect - tarpitting: 89.163.242.15 56736 -> 195.113.254.182 9898
# 1493037991 Inbound SYN/ACK: 185.62.190.15 21001 -> 195.113.252.222 15584
(
re.compile(r'([0-9]+) ([^:]*:) ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*'),
namedtuple("connect_tuple", ("timestamp", "message", "src_ip", "src_port", "tgt_ip", "tgt_port"))
),
# 1493035442 Responded to a Ping: 88.86.96.25 -> 195.113.253.87 *
(
re.compile(r'([0-9]+) ([^:]*:) ([^ ]+) -> ([^ ]+).*'),
namedtuple("ping_tuple", ("timestamp", "message", "src_ip", "tgt_ip"))
)
)
def match_event(line):
    """Parse a LaBrea log line into an event tuple, or return None.

    Tries each (regex, tuple-type) pair from the module-level RE_LIST in
    order and returns the first match as its namedtuple; unmatched lines
    are logged at INFO level (with newlines escaped for one-line output).

    Uses lazy %-style logging arguments instead of eager string formatting,
    so the message is only built when INFO logging is enabled.
    """
    for labrea_re, event_tuple in RE_LIST:
        match = labrea_re.match(line)
        if match:
            return event_tuple(*match.groups())
    logging.info('Unmatched line: "%s"', line.replace("\n", r"\n"))
    return None
def get_args():
    """Build and return the optparse parser for the LaBrea log watcher.

    Reconstructed post-commit version: the scraped diff interleaved the old
    single-line ``add_option("-w", ...`` openers with the new multi-line
    calls, nesting the calls into invalid code.  Also removes a stray
    trailing comma after the "-d" option (which turned that statement into
    a tuple) and fixes the "--norealtime" help-text typo
    ("don't system time" -> "don't use system time").
    """
    optp = optparse.OptionParser(
        usage="usage: %prog [options] logfile ...",
        description="Watch LaBrea logfiles and generate Idea events into directory")
    optp.add_option(
        "-w", "--window",
        default=900,
        dest="window",
        type="int",
        action="store",
        help="max detection window (default: %default)")
    optp.add_option(
        "-t", "--timeout",
        default=300,
        dest="timeout",
        type="int",
        action="store",
        help="detection timeout (default: %default)")
    optp.add_option(
        "-n", "--name",
        default=None,
        dest="name",
        type="string",
        action="store",
        help="Warden client name")
    optp.add_option(
        "--test",
        default=False,
        dest="test",
        action="store_true",
        help="Add Test category")
    optp.add_option(
        "-o", "--oneshot",
        default=False,
        dest="oneshot",
        action="store_true",
        help="process files and quit (do not daemonize)")
    optp.add_option(
        "--poll",
        default=1,
        dest="poll",
        type="int",
        action="store",
        help="log file polling interval")
    optp.add_option(
        "-d", "--dir",
        default=None,
        dest="dir",
        type="string",
        action="store",
        help="Target directory (mandatory)")
    optp.add_option(
        "-p", "--pid",
        default=pth.join("/var/run", pth.splitext(pth.basename(sys.argv[0]))[0] + ".pid"),
        dest="pid",
        type="string",
        action="store",
        help="create PID file with this name (default: %default)")
    optp.add_option(
        "-u", "--uid",
        default=None,
        dest="uid",
        type="int",
        action="store",
        help="user id to run under")
    optp.add_option(
        "-g", "--gid",
        default=None,
        dest="gid",
        type="int",
        action="store",
        help="group id to run under")
    optp.add_option(
        "-v", "--verbose",
        dest="verbose",
        action="store_true",
        help="turn on debug logging")
    optp.add_option(
        "--log",
        default="local7",
        dest="log",
        type="string",
        action="store",
        help="syslog facility or log file name (default: %default)")
    optp.add_option(
        "--realtime",
        default=True,
        dest="realtime",
        action="store_true",
        help="use system time along with log timestamps (default)")
    optp.add_option(
        "--norealtime",
        dest="realtime",
        action="store_false",
        help="don't use system time, use solely log timestamps")
    return optp
# Pre-refactor pattern definitions (same regexes as RE_LIST above); kept as
# parallel lists that were consumed via zip(re_list, event_list).
# 1493035442 Initial Connect - tarpitting: 89.163.242.15 56736 -> 195.113.254.182 9898
# 1493037991 Inbound SYN/ACK: 185.62.190.15 21001 -> 195.113.252.222 15584
# 1492790713 Inbound SYN/ACK: 62.210.130.232 80 -> 78.128.253.197 58376
labrea_re_connect_inbound = re.compile(r'([0-9]+) ([^:]*:) ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*')
# Fields extracted from a tarpit / SYN-ACK line, in capture-group order.
connect_tuple = namedtuple("connect_tuple", ("timestamp", "message", "src_ip", "src_port", "tgt_ip", "tgt_port"))
# 1493035442 Responded to a Ping: 88.86.96.25 -> 195.113.253.87 *
labrea_re_ping = re.compile(r'([0-9]+) ([^:]*:) ([^ ]+) -> ([^ ]+).*')
# Fields extracted from a ping-response line, in capture-group order.
ping_tuple = namedtuple("ping_tuple", ("timestamp", "message", "src_ip", "tgt_ip"))
# Parallel lists: pattern at index i produces the tuple type at index i.
re_list = [labrea_re_connect_inbound, labrea_re_ping]
event_list = [connect_tuple, ping_tuple]
# Global run-state flags, toggled from signal handlers and read by main().
running_flag = True   # cleared by SIGINT/SIGTERM to stop the main loop
reload_flag = False   # set by SIGHUP to request reopening of logfiles
def terminate_me(signum, frame):
    """Signal handler: ask the main loop to shut down."""
    global running_flag
    running_flag = False
def reload_me(signum, frame):
    """Signal handler: ask the main loop to reopen its logfiles."""
    global reload_flag
    reload_flag = True
def main():
global reload_flag
optp = get_args()
......@@ -527,67 +579,73 @@ def main():
if not args or opts.name is None or opts.dir is None:
optp.print_help()
sys.exit()
logformat = "%(filename)s[%(process)d] %(message)s"
logger = logging.getLogger()
if opts.oneshot:
files = [open(arg) for arg in args]
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s " + logformat))
else:
daemonize(
pidfile = opts.pid,
uid = opts.uid,
gid = opts.gid,
signals = {
signal.SIGINT: terminate_me,
signal.SIGTERM: terminate_me,
signal.SIGHUP: reload_me
})
files = [FileWatcher(arg) for arg in args]
with contextlib.nested(*files):
lines = itertools.izip(*files)
if "/" in opts.log:
handler = logging.handlers.WatchedFileHandler(opts.log)
handler.setFormatter(logging.Formatter("%(asctime)s " + logformat))
else:
handler = logging.handlers.SysLogHandler(address="/dev/log", facility=opts.log)
handler.setFormatter(logging.Formatter(logformat))
logger.addHandler(handler)
logger.setLevel(logging.DEBUG if opts.verbose else logging.INFO)
try:
if opts.oneshot:
signal.signal(signal.SIGINT, terminate_me)
signal.signal(signal.SIGTERM, terminate_me)
files = [open(arg) for arg in args]
else:
daemonize(
pidfile=opts.pid,
uid=opts.uid,
gid=opts.gid,
signals={
signal.SIGINT: terminate_me,
signal.SIGTERM: terminate_me,
signal.SIGHUP: reload_me
})
files = [FileWatcher(arg) for arg in args]
ideagen = IdeaGen(name=opts.name, test=opts.test)
contexts = [context(window=opts.window, timeout=opts.timeout, ideagen=ideagen) for context in [PingContextMgr, ConnectContextMgr, InboundContextMgr]]
filer = Filer(opts.dir)
for line_set in lines:
for line in line_set:
if line:
line = line.strip()
event = None
for labrea_re, event_tuple in zip(re_list, event_list):
match = labrea_re.match(line)
if match:
event = event_tuple(*match.groups())
break
aggr = []
if event is not None:
timestamp = int(event.timestamp)
else:
if opts.realtime:
timestamp = int(time.time())
else:
timestamp = None
if timestamp is not None:
contexts = [
context(window=opts.window, timeout=opts.timeout, ideagen=ideagen)
for context in [PingContextMgr, ConnectContextMgr, InboundContextMgr]
]
with contextlib.nested(*files):
for line_set in izip(*files):
for line in line_set:
if not line:
break
event = match_event(line)
if event or opts.realtime:
timestamp = int(event.timestamp) if event else int(time.time())
for context in contexts:
aggr.extend(context.process(event, timestamp))
for event in aggr:
f, name = filer.create_unique_file()
with f:
f.write(event)
filer.publish_file(name)
if not running_flag:
break
if reload_flag:
for f in files:
f.close()
f.open()
save_events(context.process(event, timestamp), filer)
if not running_flag:
break
if reload_flag:
for f in files:
f.close()
f.open()
reload_flag = False
if not any(line_set):
time.sleep(opts.poll)
line = ""
aggr = []
for context in contexts:
aggr.extend(context.close())
for event in aggr:
f, name = filer.create_unique_file()
with f:
f.write(event)
filer.publish_file(name)
if not any(line_set):
time.sleep(opts.poll)
for context in contexts:
save_events(context.close(), filer)
except Exception:
logging.exception("Exception")
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment