Skip to content
Snippets Groups Projects
Commit ba5cd22f authored by Pavel Kácha's avatar Pavel Kácha
Browse files

Flowmon ADS connector update. Connector now knows the specifics of several...

Flowmon ADS connector update. Connector now knows the specifics of several event types, parses various types of detail field, and completes/modifies resulting Idea accordingly.
parent b649b538
No related branches found
No related tags found
No related merge requests found
......@@ -25,6 +25,7 @@ opt_dict = {
"help": False,
"test": False,
"origdata": False,
"noorigdata": False,
"errlog": "/data/warden/var/flowmon-ads-filer_lastrun.log",
"out": "/data/warden/var/feeds-out",
"target": "NONE"
......@@ -52,82 +53,69 @@ def get_opts():
return opts
# Conversion/validation routines
def isotime(t):
if not t:
return None
# Conversion routines
def iso_time(t):
    """Parse an ADS timestamp string ("YYYY-MM-DD HH:MM:SS") into a struct_time."""
    fmt = "%Y-%m-%d %H:%M:%S"
    return strptime(t, fmt)
def intlist(il):
def int_list(il):
    """Split a comma separated string into a list of ints; empty input yields []."""
    if il:
        return [int(part.strip()) for part in il.split(",")]
    return []
def strlist(sl):
    """Split a comma separated string into a list of strings; empty input yields []."""
    return [str(item) for item in sl.split(",")] if sl else []
def str_list(sl):
    """Split a comma separated string, stripping whitespace from every item."""
    parts = sl.split(",")
    return [part.strip() for part in parts]
def ip(s):
    """Return the whitespace-stripped IP address string, or None for empty input."""
    if s:
        return s.strip()
    return None
def sum_int(s):
    """Sum an iterable of numeric strings and truncate the total to an int."""
    total = 0.0
    for item in s:
        total += float(item)
    return int(total)
def iplist(sl):
    """Split a comma separated string into normalized IP strings; empty input yields []."""
    if not sl:
        return []
    return list(map(ip, sl.split(",")))
ads_fieldnames = {
    # unique id within the ADS db
    "ID": {"type": int, "order": 0},
    # time the event was generated
    "Timestamp": {"type": isotime, "order": 1},
    # time of the first flow on which the event detection was based
    "FirstFlow": {"type": isotime, "order": 2},
    # event type, e.g. SRVNA, SCANS, ..
    "Type": {"type": str, "order": 3},
    # event type description, e.g. "Service not available", "Port scanning", ..
    "TypeDesc": {"type": str, "order": 4},
    # name of the perspective used for reporting; a perspective ranks
    # events with priorities CRITICAL, HIGH, ..
    "Perspective": {"type": str, "order": 5},
    # priority according to the perspective
    "Severity": {"type": str, "order": 6},
    # textual event detail, for example: "Known attackers,
    # attempts: 13, uploaded: 12.54 KiB, downloaded: 25.67 KiB,
    # frequently used port(s): 22, 37257, 37304, 48856, 36616."
    "Detail": {"type": str, "order": 7},
    # list of ports (if any were identified)
    "Ports": {"type": intlist, "order": 8},
    # IP protocol (if identifiable)
    "Protocol": {"type": strlist, "order": 9},
    # IP address causing the event
    "Source": {"type": ip, "order": 10},
    # domain name of Source at the time the event was generated
    # (if the feature is enabled)
    "CapturedSource": {"type": str, "order": 11},
    # list of target IP addresses; e.g. for scanning these are the scanned IPs
    "Targets": {"type": iplist, "order": 12},
    # name of the ADS source on which the event was detected
    "NetFlowSource": {"type": str, "order": 13},
    # identifier of the user logged in on the source IP (if the feature is
    # enabled; usually a user name from e.g. LDAP logs)
    "UserIdentity": {"type": str, "order": 14}
}
def port_split(p):
    """Parse a comma separated port list string into a list of ints."""
    # int() tolerates surrounding whitespace, so no explicit strip is needed
    return [int(chunk) for chunk in p.split(",")]
def one_int_list(p):
    """Wrap a single numeric string into a one-element list of int."""
    value = int(p)
    return [value]
def get_proto(s):
    """Normalize a protocol token from ADS output.

    Numeric tokens become "transportN"; textual tokens are stripped and
    lowercased.  The token "not" (from "not specified") yields None.
    """
    try:
        return "transport%s" % int(s)
    except ValueError:
        pass
    name = s.strip().lower()
    return None if name == "not" else name
def xlat_ads_field(key, val):
    """Convert raw field text through the converter registered for key in ads_fieldnames."""
    convert = ads_fieldnames[key]["type"]
    sval = val.strip() if val is not None else ""
    return convert(sval)
def one_proto_list(s):
    """Wrap a non-empty protocol string into a one-element list; empty input yields None."""
    return [s] if s else None
def proto_list(pl):
    """Parse a comma separated protocol field into a list of normalized names.

    Tokens that get_proto maps to None (e.g. "not specified") are dropped.
    Returns a real list rather than a lazy filter object: callers store the
    result in the event dict and later mutate it with insert()/extend(),
    which would fail on the iterator that filter() returns under Python 3.
    """
    return [proto for proto in (get_proto(val) for val in pl.split(",")) if proto]
# Ordered column layout of the ADS TSV export: (column name, converter).
# The tuple order must match the column order of the incoming rows.
ads_fields = (
    ('ID', int),                # Unique id within ADS db
    ('Timestamp', iso_time),    # Timestamp of event generation
    ('FirstFlow', iso_time),    # Timestamp of the first Flow on which was based the event detection
    ('Type', str),              # Type of event, in fact a reference to the detection method, which recognized the event
    ('TypeDesc', str),          # Event type description
    ('Perspective', str),       # Perspective name
    ('Severity', str),          # Priority based on perspective
    ('Detail', str),            # Detailed information on the event
    ('Ports', int_list),        # List of ports (if identified)
    ('Protocol', proto_list),   # IP protocol (if identified)
    ('Source', str_list),       # Event originator (IP address)
    ('CapturedSource', str),    # DNS name assigned to the IP address at the time of event detection
    ('Targets', str_list),      # Event targets (a list of IP addresses)
    ('NetFlowSource', str),     # Flow data source on which the event has been generated
    ('UserIdentity', str)       # User ID from domain controller
)
ads_types = {
"ANOMALY": ["Anomaly.Behaviour"],
"BLACKLIST": ["Other"], # FIXME - will need to be set based on other data?
"BPATTERNS": ["Attempt.Exploit"], # FIXME - will need to be set based on other data?
"BLACKLIST": ["Other"],
"BPATTERNS": ["Attempt.Exploit"],
"DNSANOMALY": ["Information.UnauthorizedAccess"],
"DNSQUERY": ["Anomaly.Traffic"],
"DOS": ["Availability.DoS"],
......@@ -139,17 +127,15 @@ ads_types = {
"L3ANOMALY": ["Recon.Sniffing"],
"MULTICAST": ["Anomaly.Traffic"],
"RDPDICT": ["Attempt.Login"],
"REFLECTDOS": ["Availability.DoS"], # FIXME - will need to add Source.Type: Backscatter
"REFLECTDOS": ["Availability.DoS"],
"SCANS": ["Recon.Scanning"],
"SIPFLOOD": ["Availability.DoS"],
"SIPPROXY": ["Information.UnauthorizedAccess"],
"SIPSCAN": ["Recon.Scanning"],
"SMTPANOMALY": ["Fraud.UnauthorizedUsage", "Anomaly.Traffic"], # FIXME - will need to be set based on other data?
"SMTPANOMALY": ["Abusive.Spam", "Anomaly.Traffic"],
"SRVNA": ["Availability.Outage"],
"SSHDICT": ["Attempt.Login"],
"TELNET": ["Anomaly.Traffic"],
# FIXME - what to do with the following?
"TELNET": ["Attempt.Login"],
"BITTORRENT": ["Anomaly.Traffic"],
"UPLOAD": ["Anomaly.Traffic"],
"IPV6TUNNEL": ["Anomaly.Traffic"],
......@@ -161,82 +147,224 @@ ads_types = {
"COUNTRY": ["Other"]
}
# Regexp building blocks for mining the free-text "Detail" field
_SPC = r"\s*"                    # optional whitespace
_SPCM = r"\s+"                   # mandatory whitespace
INTEGER = r"(\d+)"               # captured integer
FLOAT = r"(\d+(?:\.\d+))"        # captured decimal number (fractional part required)
WORD = UNITS = r"(\w+)"          # captured word / unit name
PORTLIST = r"(\d+(?:,\s*\d+)*)"  # captured comma separated list of port numbers
# Rules for mining structured data out of the free-text "Detail" field.
# Maps a result key to a tuple of matchers.  Each matcher is one or more
# regexps followed by a shaping callable applied to the captured value(s).
# Matchers with two regexps capture a pair of values which gets summed
# (e.g. upload plus download); two-group captures are (value, units) pairs
# and get converted to bytes via unit_translate (see parse_detail).
detail_regexps = {
    "conn_count": (
        (r"attempts:" + _SPC + INTEGER, int),
        (INTEGER + _SPCM + r"times", int),
        (r"connections:" + _SPC + INTEGER, int),
        (r"Mail count:" + _SPC + INTEGER, int),
        (r"with response:" + _SPC + INTEGER, r"without response" + _SPC + INTEGER, sum_int)
    ),
    "byte_count": (
        (r"uploaded:" + _SPC + FLOAT + _SPC + UNITS, r"downloaded:" + _SPC + FLOAT + _SPC + UNITS, sum_int),
        (r"total upload:" + _SPC + FLOAT + _SPC + UNITS, r"total download:" + _SPC + FLOAT + _SPC + UNITS, sum_int),
        (r"data sent:" + _SPC + FLOAT + _SPC + UNITS, r"data received:" + _SPC + FLOAT + _SPC + UNITS, sum_int),
        (FLOAT + _SPC + UNITS + _SPCM + r"file", int)
    ),
    "target_ports": (
        (r"port\(s\):" + _SPC + PORTLIST, int_list),
        (r"\[(?:TCP|UDP)/(\d+)]", one_int_list)
    ),
    "base_proto": (
        (r"(TCP|UDP)", get_proto),
        (r"protocol:" + _SPC + WORD, get_proto)
    ),
    "proto": (
        (r"service:" + _SPC + WORD, one_proto_list),
    ),
    "target_count": (
        (r"attackers:" + _SPC + INTEGER, int),
        (INTEGER + _SPCM + r"targets", int),
        (r"targets:" + _SPC + INTEGER, int),
        (r"scanned IPs:" + _SPC + INTEGER, int)
    ),
    "botnet": (
        (r"(botnet)", bool),
        # Single ampersand: matches "command & control".  The previous
        # pattern concatenated two "&" fragments (command\s*&\s*&control)
        # and could never match the actual phrase.
        (r"(command" + _SPC + "&" + _SPC + "control)", bool)
    ),
    "omit": (
        # "End of attack" events summarise previous ones; flag them for skipping
        (r"(End of attack)", bool),
    )
}
def xlat_ads_type(s):
    """Map an ADS event type to a fresh copy of its Idea category list.

    Unknown types yield an empty list.  Returns a new list each call so
    callers may append (e.g. the "Test" category) without corrupting the
    shared ads_types table.
    """
    return list(ads_types.get(s, ()))
# Unit name -> multiplier for converting "1.2 KiB" style detail values to
# bytes.  IEC binary prefixes (KiB/MiB/GiB) are powers of 1024, while SI
# prefixes (kB/MB/GB) are powers of 1000; the previous table had the two
# families swapped.
unit_translate = {
    'b': 1,
    'kib': 1024,
    'mib': 1024**2,
    'gib': 1024**3,
    'kb': 1000,
    'mb': 1000**2,
    'gb': 1000**3
}
def xlat_ads_proto(s):
    """Translate an ADS protocol value into an Idea protocol name.

    Numeric protocol values map to "transportN"; names are lowercased.
    """
    try:
        number = int(s)
    except ValueError:
        # FIXME, will probably also need translation table
        return s.lower()
    return "transport%s" % number
def parse_detail(e):
    """Mine structured values out of the free-text "Detail" field of event e.

    For every key in detail_regexps, each matcher is tried against the
    detail text; the first distinct shaped result is stored back into
    e[key].  Warns on stderr when different matchers disagree.  Events
    whose "Detail" field is missing (e.g. its conversion failed upstream)
    are left untouched.
    """
    detail = e.get("Detail") or ""
    for key, matchers in detail_regexps.items():
        results = []
        for matcher in matchers:
            shaper = matcher[-1]  # last field is typecasting callable
            reg_res = []
            for r in matcher[:-1]:
                match = re.search(r, detail, re.IGNORECASE)
                if not match:
                    continue
                groups = match.groups()
                if not groups:
                    continue  # don't care for empty matches
                elif len(groups) == 1:
                    reg_res.append(groups[0])
                else:
                    # two valued regexps are numbers with units, like 1.2 KiB
                    val, units = groups
                    unit_val = unit_translate.get(units.lower(), 1)
                    reg_res.append(float(val) * unit_val)
            if reg_res:
                if len(reg_res) == 1 and len(matcher) == 2:  # Singlevalued definition
                    reg_res = reg_res[0]
                try:
                    res = shaper(reg_res)
                except Exception as exc:
                    # The exception target must not be named "e": that would
                    # shadow the event dict parameter and (in Python 3) leave
                    # it unbound after the except clause, breaking e[key] below.
                    sys.stderr.write('Error parsing "%s" on detail "%s": %s\n' % (reg_res, detail, exc))
                else:
                    results.append(res)
        uniq_results = []  # We cannot use sets for uniq, as result may be unhashable
        for val in results:
            if val and val not in uniq_results:
                uniq_results.append(val)
        if len(uniq_results) > 1:
            sys.stderr.write('Warning: multiple regexp rules matched differently for "%s" on detail "%s"\n' % (key, detail))
        if uniq_results:
            e[key] = uniq_results[0]
def idea_ip_key(ip):
    """Return the Idea address key ("IP4" or "IP6") for an IP address string.

    Any address containing a colon is treated as IPv6; everything else as
    IPv4.  (Uses the idiomatic "not in" test instead of "not ':' in ip".)
    """
    return "IP6" if ':' in ip else "IP4"
def gen_idea_from_ads(ads, orig_data, anonymised_target, add_test):
# Mandatory
ts = ads["Timestamp"] or time.localtime()
lts = time.localtime()
ts = ads.get("Timestamp") or lts
ets = ads.get("FirstFlow", 0)
if ets > ts: # ADS sometimes reports FirstFlow greater than DetectTime
ts = ets
atype = ads.get("Type")
cat = ads_types.get(atype, ("Other",))[:]
ads["Ports"] = list(set(ads.get("Ports", [])) | set(ads.get("target_ports", [])))
ads["Ports"].sort()
ads.setdefault("Protocol", []).extend(ads.get("proto", []))
if ads.get("base_proto", None):
ads["Protocol"].insert(0, ads["base_proto"])
ads["target_hostname"] = None
# Some event types are reported with reversed source/target
# Also add some protocols guessed from port and how ADS works according to docs
if atype in ("DOS", "SIPFLOOD", "HTTPDICT"):
ads["Source"], ads["Targets"] = ads["Targets"], ads["Source"]
ads["CapturedSource"], ads["target_hostname"] = None, ads["CapturedSource"]
if atype == "HTTPDICT":
# A guess, sure
if 80 in ads["Ports"]:
ads["Protocol"].extend(("tcp", "http"))
if 443 in ads["Ports"]:
ads["Protocol"].extend(("tcp", "https"))
elif atype == "RDPDICT":
ads["Protocol"].extend(("tcp", "rdp"))
elif atype == "SIPSCAN":
ads["Protocol"].append("sip") # ADS does not say TCP nor UDP and doc is rather terse
elif atype == "SMTPANOMALY":
if 587 in ads["Ports"]:
ads["Protocol"].extend(("tcp", "submission"))
if 465 in ads["Ports"]:
ads["Protocol"].extend(("tcp", "ssmtp"))
if 25 in ads["Ports"] or ("submission" not in ads["Protocol"] and "ssmtp" not in ads["Protocol"]):
ads["Protocol"].extend(("tcp", "smtp"))
elif atype == "SSHDICT":
ads["Protocol"].extend(("tcp", "ssh"))
elif atype == "TELNET":
ads["Protocol"].extend(("tcp", "telnet"))
# Uniquify, but retain ordering
proto = []
for p in ads["Protocol"]:
if p not in proto:
proto.append(p)
ads["Protocol"] = proto
# More specific category for BLACKLIST
if "botnet" in ads:
cat = ["Intrusion.Botnet"]
if add_test:
cat.append("Test")
event = {
"Format": "IDEA0",
"ID": str(uuid4()),
"Category": xlat_ads_type(ads.get("Type")),
"Category": cat,
"DetectTime": format_time(*ts[0:6]),
"CreateTime": format_time(*time.localtime()[0:6])
"CreateTime": format_time(*lts[0:6])
}
if add_test:
event["Category"].append("Test")
# Optional
if ads["ID"]:
if ads.get("ID"):
event["AltNames"] = ["ADS-%i" % ads["ID"]]
if ads["FirstFlow"]:
event["EventTime"] = format_time(*ads["FirstFlow"][0:6])
if ads["TypeDesc"]:
if ets:
event["EventTime"] = format_time(*ets[0:6])
if ads.get("TypeDesc"):
event["Description"] = ads["TypeDesc"]
if ads["Detail"]:
if ads.get("Detail"):
event["Note"] = ads["Detail"]
# Source related parts
source = {}
if ads["Source"]:
srcip = ads["Source"]
key = "IP6"
if not ':' in srcip:
key = "IP4"
source[key] = [srcip]
if "conn_count" in ads:
event["ConnCount"] = ads["conn_count"]
if "byte_count" in ads:
event["ByteCount"] = ads["byte_count"]
# Source
source = {}
for srcip in ads["Source"]:
source.setdefault(idea_ip_key(srcip), []).append(srcip)
if ads["CapturedSource"]:
source["Hostname"] = [ads["CapturedSource"]]
if source:
source["Proto"] = ads["Protocol"]
if "botnet" in ads:
source["Type"] = ["Botnet"]
elif atype == "REFLECTDOS":
source["Type"] = ["Backscatter"]
# Target related parts
# Target
target = {}
if ads["Ports"]:
target["Port"] = ads["Ports"] # FIXME are the ports related with Target, Source or does it depend on attack type?
if ads["Protocol"]:
target["Proto"] = [xlat_ads_proto(p) for p in ads["Protocol"]]
target["Port"] = ads["Ports"]
if anonymised_target != "NONE":
tgtips = [anonymised_target]
target["Type"] = ["Anonymised"]
else:
tgtips = ads["Targets"]
for tgtip in tgtips:
if not ':' in tgtip:
key = "IP4"
else:
key = "IP6"
target.setdefault(key, []).append(tgtip)
target.setdefault(idea_ip_key(tgtip), []).append(tgtip)
if ads["target_hostname"]:
target["Hostname"] = [ads["target_hostname"]]
if target:
target["Proto"] = ads["Protocol"]
if "botnet" in ads:
target.setdefault("Type", []).append("CC")
if orig_data:
event["Attach"] = [{
......@@ -248,26 +376,12 @@ def gen_idea_from_ads(ads, orig_data, anonymised_target, add_test):
# Insert subnodes into event
if source:
event["Source"] = [source]
if target:
event["Target"] = [target]
# *** Modifications for specific alert types ***
if ads["Type"] == "DOS":
# Extract additional info from Note
match = re.search("service:\s*([^,)]*)", event.get("Note",""))
if match and match.group(1) != 'not specified':
source["Proto"] = match.group(1)
match = re.search("attackers:\s*(\d+)", event.get("Note",""))
if match:
# Note: Count field is not standardized, but it is sometimes used to
# tell the total number of sources when not all of them are listed.
target["Count"] = int(match.group(1))
# Swap Source and Target for DOS events
if source and target:
event["Source"] = [target]
event["Target"] = [source]
if "botnet" in ads:
# Both bot and CC are 'wrongdoers' from IDEA's point of view
event["Source"].append(target)
else:
event["Target"] = [target]
return event
......@@ -281,24 +395,29 @@ def main():
print("Warning: error log %s unavailable (wrong directory or permissions?)" % opts["errlog"])
out = SafeDir(opts["out"])
ads_fields = [it[0] for it in sorted(ads_fieldnames.items(), key=lambda it: it[1]["order"])]
for row in csv.reader(sys.stdin, dialect="excel-tab"):
if not row:
continue
tr_row = {}
for k, val in zip(ads_fields, row):
tr_row[k] = xlat_ads_field(k, val)
if not opts["origdata"]:
for (k, type_), val in zip(ads_fields, row):
try:
tr_row[k] = type_(val.strip())
except Exception as e:
sys.stderr.write('Error parsing "%s" in event "%s": %s\n' % (val, row, e))
if opts["noorigdata"] and not opts["origdata"]:
row = None
parse_detail(tr_row)
if tr_row.get("omit"):
# Ignore "End of attack" events as they summarise previous ones
# and we would get duplicate counts.
continue
event = gen_idea_from_ads(tr_row, row, opts["target"], opts["test"])
nf = out.newfile()
try:
data = json.dumps(event)
nf.f.write(data.encode("utf-8"))
except Exception as e:
sys.stderr.write("Error: %s\n" % str(e))
sys.stderr.write("Error source line: %s\n" % row)
sys.stderr.write("Error event data: %s\n" % str(event))
sys.stderr.write('Error writing file "%s", event "%s": %s\n' % (nf.get_path(), str(event), e))
nf.f.close()
nf.moveto(out.incoming)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment