Skip to content
Snippets Groups Projects
Commit 0b88226d authored by Pavel Kácha's avatar Pavel Kácha
Browse files

LaBrea Idea/Warden connector - watches files, aggregates, but still just outputs to stdout

parent 3df4bffe
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
import sys
import re
import time
import heapq
import optparse
import itertools
import signal
import contextlib
import uuid
import json
import os.path as pth
from collections import namedtuple
class FileWatcher(object):
def __init__(self, filename, tail=True):
self.filename = filename
self.open()
if tail and self.f:
self.f.seek(0, os.SEEK_END)
def open(self):
try:
self.f = open(self.filename, "r")
st = os.fstat(self.f.fileno())
self.inode, self.size = st.st_ino, st.st_size
except IOError:
self.f = None
self.inode = -1
self.size = -1
def _check_reopen(self):
try:
st = os.stat(self.filename)
cur_inode, cur_size = st.st_ino, st.st_size
except OSError as e:
cur_inode = -1
cur_size = -1
if cur_inode != self.inode or cur_size < self.size:
self.close()
self.open()
def readline(self):
if self.f:
self.open()
if self.f:
return ""
res = self.f.readline()
if not res:
self._check_reopen()
if self.f:
return ""
res = self.f.readline()
return res
def close(self):
try:
if self.f:
self.f.close()
except IOError:
pass
self.inode = -1
self.size = -1
def __repr__(self):
return '%s("%s")' % (type(self).__name__, self.filename)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
return False
def __iter__(self):
return self
def next(self):
return self.readline()
class WindowContextMgr(object):
def __init__(self, window=60*10, timeout=60*5, ideagen=None):
self.contexts = {}
self.update_timestamp = 0
self.window = window
self.timeout = timeout
self.ideagen = ideagen
self.first_update_queue = []
self.last_update_queue = []
def expire_queue(self, queue, time_key, window):
aggr_events = []
while queue:
ctx = queue[0][1]
if ctx not in self.contexts:
# event went away while processing another queue, so just clean up
heapq.heappop(queue)
elif self.contexts[ctx][time_key] < self.update_timestamp - window:
heapq.heappop(queue)
closed = self.ctx_close(self.contexts[ctx])
if closed is not None:
aggr_events.append(closed)
del self.contexts[ctx]
else:
break
return aggr_events
def process(self, event=None, timestamp=None):
self.update_timestamp = timestamp
aggr_events = []
aggr_events.extend(self.expire_queue(self.first_update_queue, "first_update", self.window))
aggr_events.extend(self.expire_queue(self.last_update_queue, "last_update", self.timeout))
if event is not None:
ctx = self.match(event)
if ctx is not None:
if ctx not in self.contexts:
self.contexts[ctx] = self.ctx_create(event)
heapq.heappush(self.first_update_queue, (self.update_timestamp, ctx))
else:
self.ctx_append(self.contexts[ctx], event)
heapq.heappush(self.last_update_queue, (self.update_timestamp, ctx))
return aggr_events
def close(self):
aggr_events = []
for context in self.contexts.values():
closed = self.ctx_close(context)
if closed is not None:
aggr_events.append(closed)
self.contexts = {}
self.first_update_queue = []
self.last_update_queue = []
return aggr_events
class PingContextMgr(WindowContextMgr):
def match(self, event):
return event.src_ip if 'Ping' in event.message else None
def ctx_create(self, event):
ctx = dict(
src_ip = event.src_ip,
tgt_ips = set([event.tgt_ip]),
count = 1,
first_update = self.update_timestamp,
last_update = self.update_timestamp
)
return ctx
def ctx_append(self, ctx, event):
ctx["tgt_ips"].add(event.tgt_ip)
ctx["count"] += 1
ctx["last_update"] = self.update_timestamp
def ctx_close(self, ctx):
return self.ideagen.gen_idea(
src = ctx["src_ip"],
src_ports = None,
targets = [(ip, []) for ip in ctx["tgt_ips"]],
detect_time = self.update_timestamp,
event_time = ctx["first_update"],
cease_time = ctx["last_update"],
count = ctx["count"],
template = "ping"
)
class ConnectContextMgr(WindowContextMgr):
def match(self, event):
return event.src_ip if 'Connect' in event.message else None
def ctx_create(self, event):
ctx = dict(
src_ip = event.src_ip,
src_ports = set([event.src_port]),
tgt_ips_ports = {event.tgt_ip: set([event.tgt_port])},
count = 1,
first_update = self.update_timestamp,
last_update = self.update_timestamp
)
return ctx
def ctx_append(self, ctx, event):
tgt_ports = ctx["tgt_ips_ports"].setdefault(event.tgt_ip, set())
tgt_ports.add(event.tgt_port)
ctx["src_ports"].add(event.src_port)
ctx["count"] += 1
ctx["last_update"] = self.update_timestamp
def ctx_close(self, ctx):
return self.ideagen.gen_idea(
src = ctx["src_ip"],
src_ports = ctx["src_ports"],
targets = ctx["tgt_ips_ports"].items(),
detect_time = self.update_timestamp,
event_time = ctx["first_update"],
cease_time = ctx["last_update"],
count = ctx["count"],
template = "connect"
)
class InboundContextMgr(ConnectContextMgr):
def match(self, event):
return event.src_ip if 'Inbound' in event.message else None
def ctx_close(self, ctx):
return self.ideagen.gen_idea(
src = ctx["src_ip"],
src_ports = ctx["src_ports"],
targets = ctx["tgt_ips_ports"].items(),
detect_time = self.update_timestamp,
event_time = ctx["first_update"],
cease_time = ctx["last_update"],
count = ctx["count"],
template = "synack"
)
class IdeaGen(object):
def __init__(self, name, test=False):
self.name = name
self.test = test
self.template = {
"connect": {
"category": ["Recon.Scanning"],
"description": "TCP connection/scan",
"template": "labrea-001",
"note": "Connections from remote host to never assigned IP"
},
"ping": {
"category": ["Recon.Scanning"],
"description": "Ping scan",
"template": "labrea-002",
"note": "Ping requests from remote host to never assigned IP"
},
"synack": {
"category": ["Recon.Scanning"],
"description": "SYN/ACK connections/scan",
"template": "labrea-003",
"note": "Unsolicited SYN/ACK packet received from remote host to never assigned IP"
}
}
def format_timestamp(self, epoch):
return "%04d-%02d-%02dT%02d:%02d:%02dZ" % time.gmtime(epoch)[:6]
def gen_idea(self, src, src_ports, targets, detect_time, event_time, cease_time, count, template):
tmpl = self.template[template]
isource = {
"IP6" if ":" in src else "IP4": [src],
"Proto": ["tcp"]
}
if src_ports:
isource["Port"] = [int(port) for port in src_ports]
# Fold multiple IPs with the same portset
folded_tgt = {}
for tgt, ports in targets:
folded_tgt.setdefault(frozenset(ports), []).append(tgt)
itargets = []
for ports, tgt in folded_tgt.items():
itarget = {"Proto": ["tcp"]}
tgts4 = [ip for ip in tgt if ":" not in ip]
tgts6 = [ip for ip in tgt if ":" in ip]
if tgts4:
itarget["IP4"] = tgts4
if tgts6:
itarget["IP6"] = tgts6
if ports:
itarget["Port"] = [int(port) for port in ports]
itargets.append(itarget)
inode = {
"SW": ["LaBrea"],
"Type": ["Connection", "Tarpit"],
"Name": self.name
}
idea = {
"Format": "IDEA0",
"ID": str(uuid.uuid4()),
"Category": tmpl["category"] + ["Test"] if self.test else [],
"Description": tmpl["description"],
"DetectTime": self.format_timestamp(detect_time),
"EventTime": self.format_timestamp(event_time),
"CeaseTime": self.format_timestamp(cease_time),
"ConnectionCount": count,
"Note": tmpl["note"],
"_CESNET": {
"EventTemplate": tmpl["template"],
"testtime": detect_time
},
"Source": [isource],
"Target": itargets,
"Node": [inode]
}
return json.dumps(idea)
def daemonize(
work_dir = None, chroot_dir = None,
umask = None, uid = None, gid = None,
pidfile = None, files_preserve = [], signals = {}):
# Dirs, limits, users
if chroot_dir is not None:
os.chdir(chroot_dir)
os.chroot(chroot_dir)
if umask is not None:
os.umask(umask)
if work_dir is not None:
os.chdir(work_dir)
if gid is not None:
os.setgid(gid)
if uid is not None:
os.setuid(uid)
# Doublefork, split session
if os.fork()>0:
os._exit(0)
os.setsid()
if os.fork()>0:
os._exit(0)
# Setup signal handlers
for (signum, handler) in signals.items():
signal.signal(signum, handler)
# Close descriptors
descr_preserve = set(f.fileno() for f in files_preserve)
maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
if maxfd==resource.RLIM_INFINITY:
maxfd = 65535
for fd in range(maxfd, 3, -1): # 3 means omit stdin, stdout, stderr
if fd not in descr_preserve:
try:
os.close(fd)
except Exception:
pass
# Redirect stdin, stdout, stderr to /dev/null
devnull = os.open(os.devnull, os.O_RDWR)
for fd in range(3):
os.dup2(devnull, fd)
# PID file
if pidfile is not None:
pidd = os.open(pidfile, os.O_RDWR|os.O_CREAT|os.O_EXCL|os.O_TRUNC)
os.write(pidd, str(os.getpid())+"\n")
os.close(pidd)
# Define and setup atexit closure
@atexit.register
def unlink_pid():
try:
os.unlink(pidfile)
except Exception:
pass
def get_args():
optp = optparse.OptionParser(
usage="usage: %prog [options] logfile ...",
description="Watch LaBrea logfiles and generate Idea events into directory")
optp.add_option("-w", "--window",
default=900,
dest="window",
type="int",
action="store",
help="max detection window (default: %default)")
optp.add_option("-t", "--timeout",
default=300,
dest="timeout",
type="int",
action="store",
help="detection timeout (default: %default)")
optp.add_option("-n", "--name",
default=None,
dest="name",
type="string",
action="store",
help="Warden client name")
optp.add_option("--test",
default=False,
dest="test",
action="store_true",
help="Add Test category")
optp.add_option("-o", "--oneshot",
default=False,
dest="oneshot",
action="store_true",
help="process files and quit (do not daemonize)")
optp.add_option("-p", "--pid",
default=pth.join("/var/run", pth.splitext(pth.basename(sys.argv[0]))[0] + ".pid"),
dest="pid",
type="string",
action="store",
help="create PID file with this name (default: %default)")
optp.add_option("--realtime",
default=True,
dest="realtime",
action="store_true",
help="use system time along with log timestamps (default)")
optp.add_option("--norealtime",
dest="realtime",
action="store_false",
help="don't system time, use solely log timestamps")
return optp
# 1493035442 Initial Connect - tarpitting: 89.163.242.15 56736 -> 195.113.254.182 9898
# 1493037991 Inbound SYN/ACK: 185.62.190.15 21001 -> 195.113.252.222 15584
# 1492790713 Inbound SYN/ACK: 62.210.130.232 80 -> 78.128.253.197 58376
labrea_re_connect_inbound = re.compile(r'([0-9]+) ([^:]*:) ([^ ]+) ([0-9]+) -> ([^ ]+) ([0-9]+).*')
connect_tuple = namedtuple("connect_tuple", ("timestamp", "message", "src_ip", "src_port", "tgt_ip", "tgt_port"))
# 1493035442 Responded to a Ping: 88.86.96.25 -> 195.113.253.87 *
labrea_re_ping = re.compile(r'([0-9]+) ([^:]*:) ([^ ]+) -> ([^ ]+).*')
ping_tuple = namedtuple("ping_tuple", ("timestamp", "message", "src_ip", "tgt_ip"))
re_list = [labrea_re_connect_inbound, labrea_re_ping]
event_list = [connect_tuple, ping_tuple]
running_flag = True
reload_flag = False
def terminate_me(signum, frame):
global running_flag
running_flag = False
def reload_me(signum, frame):
global reload_flag
reload_flag = True
def main():
# safedir output
# daemon
global reload_flag
optp = get_args()
opts, args = optp.parse_args()
if not args or opts.name is None:
optp.print_help()
sys.exit()
signal.signal(signal.SIGINT, terminate_me)
signal.signal(signal.SIGTERM, terminate_me)
signal.signal(signal.SIGHUP, reload_me)
if opts.oneshot:
files = [open(arg) for arg in args]
else:
files = [FileWatcher(arg) for arg in args]
with contextlib.nested(*files):
lines = itertools.izip(*files)
ideagen = IdeaGen(name=opts.name, test=opts.test)
contexts = [context(window=opts.window, timeout=opts.timeout, ideagen=ideagen) for context in [PingContextMgr, ConnectContextMgr, InboundContextMgr]]
for line_set in lines:
for line in line_set:
if line:
line = line.strip()
event = None
for labrea_re, event_tuple in zip(re_list, event_list):
match = labrea_re.match(line)
if match:
event = event_tuple(*match.groups())
break
aggr = []
if event is not None:
timestamp = int(event.timestamp)
else:
if opts.realtime:
timestamp = int(time.time())
else:
timestamp = None
if timestamp is not None:
for context in contexts:
aggr.extend(context.process(event, timestamp))
for a in aggr:
print(a)
if not running_flag:
break
if reload_flag:
for f in files:
f.close()
f.open()
reload_flag = False
if not any(line_set):
time.sleep(1)
line = ""
aggr = []
for context in contexts:
aggr.extend(context.close())
for a in aggr:
print(a)
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment