-
Václav Bartoš authoredVáclav Bartoš authored
warden3_flowmon_ads_filer.py 9.25 KiB
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2011-2015 Cesnet z.s.p.o
# Use of this source is governed by a 3-clause BSD-style license, see LICENSE file.
import os
import sys
import getopt
sys.path.append('/data/warden/libs')
from warden_client import read_cfg, format_time
from warden_filer import SafeDir
import json
import csv
from time import strptime, mktime
import time
import re
from uuid import uuid4
# Command line options handling
# Had to use getopt for 2.6 compatibility. Meh. :(
# Supported long options and their defaults. A falsy default marks a plain
# flag; a non-empty string default marks an option that takes a value.
opt_dict = dict([
    ("help", False),
    ("test", False),
    ("origdata", False),
    ("errlog", "/data/warden/var/flowmon-ads-filer_lastrun.log"),
    ("out", "/data/warden/var/feeds-out"),
    ("target", "NONE"),
])

# Long-option specification for getopt: value-taking options get a trailing "=".
getopt_format = [k + ("=" if v else "") for (k, v) in opt_dict.items()]
def help(s=None, exitcode=0):
    """Print the usage line (preceded by an error message, if given) and exit.

    s        -- optional error to report before the usage line
    exitcode -- process exit status passed to sys.exit()
    """
    if s is not None:
        print("Error: %s" % s)
    option_list = "] [".join("--" + spec for spec in getopt_format)
    print("Usage: %s [%s]" % (sys.argv[0], option_list))
    sys.exit(exitcode)
def get_opts():
    """Parse the command line into a dict keyed like opt_dict.

    Flag options (falsy default in opt_dict) map to True/False depending on
    whether they were given. Value options (string default) map to the
    supplied value, or to the default when the option is absent.

    Prints usage and exits with status 2 on an unknown/malformed option, and
    with status 0 when --help is given.
    """
    try:
        rawopts, _args = getopt.gnu_getopt(sys.argv[1:], "", getopt_format)
    except getopt.GetoptError as err:
        help(err, exitcode=2)   # does not return (sys.exit)
    rawopts = dict(rawopts)
    if "--help" in rawopts:
        help()
    opts = {}
    for k, default in opt_dict.items():
        flag = "--" + k
        if default:
            # Value option. The former "a and b or c" idiom turned an
            # explicitly empty value (e.g. --errlog "") into True; keep the
            # value exactly as the user supplied it.
            opts[k] = rawopts.get(flag, default)
        else:
            # Plain flag: only its presence matters.
            opts[k] = flag in rawopts
    return opts
# Conversion/validation routines
def isotime(t):
    """Parse a "YYYY-MM-DD HH:MM:SS" string into a time.struct_time.

    Returns None for an empty/None value; strptime raises ValueError on a
    string that does not match the format.
    """
    return strptime(t, "%Y-%m-%d %H:%M:%S") if t else None
def intlist(il):
    """Convert a comma-separated string of integers into a list of int.

    An empty/None value gives an empty list; a non-numeric item raises
    ValueError.
    """
    if il:
        return [int(token.strip()) for token in il.split(",")]
    return []
def strlist(sl):
    """Convert a comma-separated string into a list of strings.

    Items are stripped of surrounding whitespace (consistently with intlist
    and iplist, so "TCP, UDP" no longer yields " UDP") and empty items, e.g.
    from a trailing comma, are dropped. An empty/None value gives [].
    """
    if not sl:
        return []
    items = (str(s).strip() for s in sl.split(","))
    return [item for item in items if item]
def ip(s):
    """Return the address string stripped of surrounding whitespace.

    An empty/None value gives None. No validation of the address is done.
    """
    return s.strip() if s else None
def iplist(sl):
    """Convert a comma-separated string of addresses into a list of strings.

    Each item goes through ip(). Items that come out empty (None or "", e.g.
    from "a,,b" or a trailing comma) are dropped, because the event builder
    tests every address with "':' in addr" and would fail on them.
    An empty/None value gives [].
    """
    if not sl:
        return []
    addresses = (ip(s) for s in sl.split(","))
    return [addr for addr in addresses if addr]
# Columns of the ADS tab-separated export, listed in column order. Every
# entry maps the column name to its converter ("type") and its position
# ("order").
ads_fieldnames = dict(
    (name, {"type": conv, "order": pos})
    for pos, (name, conv) in enumerate((
        # unique id in the ADS database
        ("ID", int),
        # time the event was generated
        ("Timestamp", isotime),
        # time of the first flow on which the detection was based
        ("FirstFlow", isotime),
        # event type, e.g. SRVNA, SCANS, ..
        ("Type", str),
        # description of the event type, e.g. "Service not available", "Port scanning", ..
        ("TypeDesc", str),
        # name of the perspective used for reporting. A perspective rates
        # events with a priority such as CRITICAL, HIGH, ..
        ("Perspective", str),
        # priority according to the perspective
        ("Severity", str),
        # textual detail of the event, e.g.: "Known attackers,
        # attempts: 13, uploaded: 12.54 KiB, downloaded: 25.67 KiB,
        # frequently used port(s): 22, 37257, 37304, 48856, 36616."
        ("Detail", str),
        # list of ports (if they were identified)
        ("Ports", intlist),
        # IP protocol (if it can be identified)
        ("Protocol", strlist),
        # IP address causing the event
        ("Source", ip),
        # domain name of Source at the time the event was generated (if the feature is enabled)
        ("CapturedSource", str),
        # list of target IP addresses, e.g. for scans these are the scanned addresses
        ("Targets", iplist),
        # name of the ADS source on which the event was detected
        ("NetFlowSource", str),
        # identifier of the user logged in on the source IP
        # (if the feature is enabled; usually a user name taken from e.g. LDAP logs)
        ("UserIdentity", str),
    ))
)
def xlat_ads_field(key, val):
    """Convert one raw ADS column value using the converter registered for key.

    key -- column name, must be present in ads_fieldnames (KeyError otherwise)
    val -- raw string from the input, or None (treated as an empty string)

    The value is stripped of surrounding whitespace before conversion.
    """
    convert = ads_fieldnames[key]["type"]
    text = "" if val is None else val.strip()
    return convert(text)
# Translation of ADS event types to lists of IDEA categories.
ads_types = {
    "ANOMALY": ["Anomaly.Behaviour"],
    "BLACKLIST": ["Other"],  # FIXME - will need to be set based on other data?
    "BPATTERNS": ["Attempt.Exploit"],  # FIXME - will need to be set based on other data?
    # Was "information.UnauthorizedAccess"; category names are case
    # sensitive and the taxonomy (and SIPPROXY below) spells it "Information".
    "DNSANOMALY": ["Information.UnauthorizedAccess"],
    "DNSQUERY": ["Anomaly.Traffic"],
    "DOS": ["Availability.DoS"],
    "GEODIST": ["Anomaly.Behaviour"],
    "HIGHTRANSF": ["Anomaly.Traffic"],
    "HONEYPOT": ["Intrusion.UserCompromise"],
    "HTTPDICT": ["Attempt.Login"],
    "ICMPANOM": ["Recon.Scanning", "Anomaly.Protocol"],
    "L3ANOMALY": ["Recon.Sniffing"],
    "MULTICAST": ["Anomaly.Traffic"],
    "RDPDICT": ["Attempt.Login"],
    "REFLECTDOS": ["Availability.DoS"],  # FIXME - will need to add Source.Type: Backscatter
    "SCANS": ["Recon.Scanning"],
    "SIPFLOOD": ["Availability.DoS"],
    "SIPPROXY": ["Information.UnauthorizedAccess"],
    "SIPSCAN": ["Recon.Scanning"],
    "SMTPANOMALY": ["Fraud.UnauthorizedUsage", "Anomaly.Traffic"],  # FIXME - will need to be set based on other data?
    "SRVNA": ["Availability.Outage"],
    "SSHDICT": ["Attempt.Login"],
    "TELNET": ["Anomaly.Traffic"],
    # FIXME - what to do with the following?
    "BITTORRENT": ["Anomaly.Traffic"],
    "UPLOAD": ["Anomaly.Traffic"],
    "IPV6TUNNEL": ["Anomaly.Traffic"],
    "TOR": ["Anomaly.Traffic"],
    "INSTMSG": ["Anomaly.Traffic"],
    "WEBSHARE": ["Anomaly.Traffic"],
    "TEAMVIEWER": ["Anomaly.Traffic"],
    "DIVCOM": ["Other"],
    "COUNTRY": ["Other"]
}
def xlat_ads_type(s):
    """Return a fresh list of IDEA categories for the ADS event type s.

    Unknown types give an empty list. The result is a copy, so callers may
    modify it without touching the ads_types table.
    """
    return list(ads_types.get(s, []))
def xlat_ads_proto(s):
    """Translate an ADS protocol value into the protocol name used in events.

    A numeric value N becomes "transportN"; anything else is lowercased.
    """
    try:
        return "transport%s" % int(s)
    except ValueError:
        # FIXME, will probably also need translation table
        return s.lower()
def gen_idea_from_ads(ads, orig_data, anonymised_target, add_test):
    """Build an IDEA event (a JSON-serializable dict) from one ADS record.

    ads               -- dict of converted ADS columns keyed by the names in
                         ads_fieldnames; columns missing from a short input
                         row are treated as empty
    orig_data         -- list of the raw input fields to attach as OrigData,
                         or None/empty for no attachment
    anonymised_target -- address reported instead of the real targets; the
                         string "NONE" means the real targets are used
    add_test          -- when true, the "Test" category is appended

    Returns the event dict.
    """
    # Mandatory
    # NOTE(review): timestamps are passed to format_time() as local-time
    # tuples without an offset - confirm that is what format_time expects.
    ts = ads.get("Timestamp") or time.localtime()
    event = {
        "Format": "IDEA0",
        "ID": str(uuid4()),
        "Category": xlat_ads_type(ads.get("Type")),
        "DetectTime": format_time(*ts[0:6]),
        "CreateTime": format_time(*time.localtime()[0:6])
    }
    if add_test:
        event["Category"].append("Test")

    # Optional
    if ads.get("ID"):
        event["AltNames"] = ["ADS-%i" % ads["ID"]]
    if ads.get("FirstFlow"):
        event["EventTime"] = format_time(*ads["FirstFlow"][0:6])
    if ads.get("TypeDesc"):
        event["Description"] = ads["TypeDesc"]
    if ads.get("Detail"):
        event["Note"] = ads["Detail"]

    # Source related parts
    source = {}
    srcip = ads.get("Source")
    if srcip:
        # Anything containing a colon is taken to be an IPv6 address.
        key = "IP6" if ":" in srcip else "IP4"
        source[key] = [srcip]
    if ads.get("CapturedSource"):
        source["Hostname"] = [ads["CapturedSource"]]

    # Target related parts
    target = {}
    if ads.get("Ports"):
        target["Port"] = ads["Ports"]  # FIXME are the ports related with Target, Source or does it depend on attack type?
    if ads.get("Protocol"):
        target["Proto"] = [xlat_ads_proto(p) for p in ads["Protocol"]]
    if anonymised_target != "NONE":
        tgtips = [anonymised_target]
    else:
        tgtips = ads.get("Targets") or []
    for tgtip in tgtips:
        if not tgtip:
            # Skip empty entries instead of failing on the ':' test below.
            continue
        key = "IP6" if ":" in tgtip else "IP4"
        target.setdefault(key, []).append(tgtip)

    if orig_data:
        event["Attach"] = [{
            "Content": "\t".join(orig_data),
            "Type": ["OrigData"],
            "ContentType": "text/tab-separated-values"
        }]

    # Insert subnodes into event
    if source:
        event["Source"] = [source]
    if target:
        event["Target"] = [target]

    # *** Modifications for specific alert types ***
    if ads.get("Type") == "DOS":
        # Extract additional info from Note
        note = event.get("Note", "")
        match = re.search(r"service:\s*([^,)]*)", note)
        if match:
            service = match.group(1).strip()
            if service and service != 'not specified':
                # Proto is a list, the same as target["Proto"] above.
                source["Proto"] = [service]
        match = re.search(r"attackers:\s*(\d+)", note)
        if match:
            # Note: Count field is not standardized, but it is sometimes used to
            # tell the total number of sources when not all of them are listed.
            target["Count"] = int(match.group(1))
        # Swap Source and Target for DOS events
        if source and target:
            event["Source"] = [target]
            event["Target"] = [source]

    return event
def main():
    """Read ADS tab-separated events from stdin and store each as an IDEA file.

    Every non-empty input row is converted column by column, turned into an
    IDEA event, serialized as JSON and written into a new file of the output
    SafeDir, which is then moved into its incoming directory. Problems with
    a single row are reported on stderr (redirected to the --errlog file when
    it can be opened) and processing continues with the next row.
    """
    opts = get_opts()
    try:
        errlog = open(opts["errlog"], "w")
        sys.stderr = errlog
    except IOError:
        print("Warning: error log %s unavailable (wrong directory or permissions?)" % opts["errlog"])
    out = SafeDir(opts["out"])
    # Column names in input order.
    ads_fields = [it[0] for it in sorted(ads_fieldnames.items(), key=lambda it: it[1]["order"])]
    for row in csv.reader(sys.stdin, dialect="excel-tab"):
        if not row:
            continue
        # Convert and serialize BEFORE creating the output file, so that a
        # malformed row neither aborts the run nor leaves an empty file
        # behind to be delivered.
        event = None
        try:
            tr_row = {}
            for k, val in zip(ads_fields, row):
                tr_row[k] = xlat_ads_field(k, val)
            orig_data = row if opts["origdata"] else None
            event = gen_idea_from_ads(tr_row, orig_data, opts["target"], opts["test"])
            data = json.dumps(event).encode("utf-8")
        except Exception as e:
            sys.stderr.write("Error: %s\n" % str(e))
            # Always log the real input row, also without --origdata.
            sys.stderr.write("Error source line: %s\n" % row)
            if event is not None:
                sys.stderr.write("Error event data: %s\n" % str(event))
            continue
        nf = out.newfile()
        try:
            nf.f.write(data)
        except Exception as e:
            sys.stderr.write("Error: %s\n" % str(e))
            sys.stderr.write("Error source line: %s\n" % row)
            sys.stderr.write("Error event data: %s\n" % str(event))
            written = False
        else:
            written = True
        finally:
            nf.f.close()
        if written:
            nf.moveto(out.incoming)
        # NOTE(review): after a failed write the partial file is left where
        # newfile() created it rather than delivered to incoming; remove it
        # here if the SafeDir/file API provides a way - confirm.


if __name__ == "__main__":
    main()