# --- Reconstructed from a whitespace-mangled unified diff (new-file side). ---
# NOTE(review): hunk boundaries hid a few unchanged context lines; every such
# gap is marked below and must be restored from the original file.

opt_dict = {
    # NOTE(review): any keys above "help" were outside the visible diff context.
    "help": False,
    "test": False,
    "origdata": False,
    "noorigdata": False,
    "errlog": "/data/warden/var/flowmon-ads-filer_lastrun.log",
    "out": "/data/warden/var/feeds-out",
    "target": "NONE"
}

# NOTE(review): the body of get_opts() was unchanged context outside the diff
# hunks; only its trailing "return opts" was visible and it is not
# reconstructed here.


# Conversion routines

def iso_time(t):
    """Parse an ADS "YYYY-MM-DD HH:MM:SS" timestamp into a time.struct_time.

    No empty-string guard on purpose: main() wraps each field conversion in
    try/except and logs the failure instead.
    """
    return strptime(t, "%Y-%m-%d %H:%M:%S")


def int_list(il):
    """Parse a comma separated list of integers; empty input yields []."""
    if not il:
        return []
    return [int(i.strip()) for i in il.split(",")]


def str_list(sl):
    """Parse a comma separated list of strings; empty input yields [].

    FIX: the empty-input guard was missing, so "" yielded [""] -- a bogus
    one-element list (e.g. an empty "IP4" address in the resulting event).
    """
    if not sl:
        return []
    return [s.strip() for s in sl.split(",")]


def sum_int(vals):
    """Sum a sequence of numbers (or numeric strings), truncated to int."""
    return int(sum(float(val) for val in vals))


def port_split(p):
    """Parse a comma separated port list into a list of ints."""
    return [int(val.strip()) for val in p.split(",")]


def one_int_list(p):
    """Wrap a single numeric string into a one-element int list."""
    return [int(p)]


def get_proto(s):
    """Normalise one protocol token.

    Numeric tokens become "transportN", names are lowercased; the "not"
    fragment (from "not specified") maps to None so it can be filtered out.
    """
    try:
        pnum = int(s)
        proto = "transport%s" % pnum
    except ValueError:
        proto = s.strip().lower()
    if proto == "not":
        return None
    return proto


def one_proto_list(s):
    """Wrap a single protocol name into a one-element list (None if empty)."""
    if s:
        return [s]
    return None


def proto_list(pl):
    """Parse a comma separated protocol list, dropping "not specified" tokens.

    FIX: previously returned filter(...) directly, which on Python 3 is a
    lazy one-shot iterator -- gen_idea_from_ads() later calls .insert() /
    .extend() and does membership tests on the value, which would fail.
    Materialise a real list.
    """
    return [proto for proto in (get_proto(val) for val in pl.split(",")) if proto]


# (field name, conversion callable) pairs, in ADS CSV column order.
ads_fields = (
    ('ID', int),               # Unique id within ADS db
    ('Timestamp', iso_time),   # Timestamp of event generation
    ('FirstFlow', iso_time),   # Timestamp of the first Flow on which was based the event detection
    ('Type', str),             # Type of event, in fact a reference to the detection method, which recognized the event
    ('TypeDesc', str),         # Event type description
    ('Perspective', str),      # Perspective name
    ('Severity', str),         # Priority based on perspective
    ('Detail', str),           # Detailed information on the event
    ('Ports', int_list),       # List of ports (if identified)
    ('Protocol', proto_list),  # IP protocol (if identified)
    ('Source', str_list),      # Event originator (IP address)
    ('CapturedSource', str),   # DNS name assigned to the IP address at the time of event detection
    ('Targets', str_list),     # Event targets (a list of IP addresses)
    ('NetFlowSource', str),    # Flow data source on which the event has been generated
    ('UserIdentity', str)      # User ID from domain controller
)

# ADS event type -> IDEA category list.
ads_types = {
    "ANOMALY": ["Anomaly.Behaviour"],
    "BLACKLIST": ["Other"],
    "BPATTERNS": ["Attempt.Exploit"],
    "DNSANOMALY": ["Information.UnauthorizedAccess"],
    "DNSQUERY": ["Anomaly.Traffic"],
    "DOS": ["Availability.DoS"],
    # NOTE(review): ~5 entries here were unchanged context hidden between
    # diff hunks (HTTPDICT among them -- gen_idea_from_ads() refers to it);
    # restore them from the original file.
    "L3ANOMALY": ["Recon.Sniffing"],
    "MULTICAST": ["Anomaly.Traffic"],
    "RDPDICT": ["Attempt.Login"],
    "REFLECTDOS": ["Availability.DoS"],
    "SCANS": ["Recon.Scanning"],
    "SIPFLOOD": ["Availability.DoS"],
    "SIPPROXY": ["Information.UnauthorizedAccess"],
    "SIPSCAN": ["Recon.Scanning"],
    "SMTPANOMALY": ["Abusive.Spam", "Anomaly.Traffic"],
    "SRVNA": ["Availability.Outage"],
    "SSHDICT": ["Attempt.Login"],
    "TELNET": ["Attempt.Login"],
    "BITTORRENT": ["Anomaly.Traffic"],
    "UPLOAD": ["Anomaly.Traffic"],
    "IPV6TUNNEL": ["Anomaly.Traffic"],
    # NOTE(review): ~5 more entries were hidden between diff hunks here;
    # restore from the original file.
    "COUNTRY": ["Other"]
}

# Regexp building blocks for free-text Detail parsing.
_SPC = r"\s*"    # optional whitespace
_SPCM = r"\s+"   # mandatory whitespace
INTEGER = r"(\d+)"
# FIX: the fractional part was mandatory (r"(\d+(?:\.\d+))"), so integral
# quantities such as "uploaded: 12 KiB" never matched; make it optional.
FLOAT = r"(\d+(?:\.\d+)?)"
WORD = UNITS = r"(\w+)"
PORTLIST = r"(\d+(?:,\s*\d+)*)"

# For each derived key: a tuple of matchers, each matcher being one or more
# regexps followed by a callable shaping the collected match groups.
detail_regexps = {
    "conn_count": (
        (r"attempts:" + _SPC + INTEGER, int),
        (INTEGER + _SPCM + r"times", int),
        (r"connections:" + _SPC + INTEGER, int),
        (r"Mail count:" + _SPC + INTEGER, int),
        (r"with response:" + _SPC + INTEGER, r"without response" + _SPC + INTEGER, sum_int)
    ),
    "byte_count": (
        (r"uploaded:" + _SPC + FLOAT + _SPC + UNITS, r"downloaded:" + _SPC + FLOAT + _SPC + UNITS, sum_int),
        (r"total upload:" + _SPC + FLOAT + _SPC + UNITS, r"total download:" + _SPC + FLOAT + _SPC + UNITS, sum_int),
        (r"data sent:" + _SPC + FLOAT + _SPC + UNITS, r"data received:" + _SPC + FLOAT + _SPC + UNITS, sum_int),
        (FLOAT + _SPC + UNITS + _SPCM + r"file", int)
    ),
    "target_ports": (
        (r"port\(s\):" + _SPC + PORTLIST, int_list),
        (r"\[(?:TCP|UDP)/(\d+)]", one_int_list)
    ),
    "base_proto": (
        (r"(TCP|UDP)", get_proto),
        (r"protocol:" + _SPC + WORD, get_proto)
    ),
    "proto": (
        (r"service:" + _SPC + WORD, one_proto_list),
    ),
    "target_count": (
        (r"attackers:" + _SPC + INTEGER, int),
        (INTEGER + _SPCM + r"targets", int),
        (r"targets:" + _SPC + INTEGER, int),
        (r"scanned IPs:" + _SPC + INTEGER, int)
    ),
    "botnet": (
        (r"(botnet)", bool),
        # FIX: was ... + "&" + _SPC + "&" + "control)", i.e. a doubled "&"
        # producing "command\s*&\s*&control", which can never match
        # "command & control".
        (r"(command" + _SPC + "&" + _SPC + r"control)", bool)
    ),
    "omit": (
        (r"(End of attack)", bool),
    )
}

# Unit multipliers for byte counts.
# FIX: binary (KiB = 1024-based) and decimal (kB = 1000-based) multipliers
# were swapped.
unit_translate = {
    'b': 1,
    'kib': 1024,
    'mib': 1024**2,
    'gib': 1024**3,
    'kb': 1000,
    'mb': 1000**2,
    'gb': 1000**3
}
def parse_detail(e):
    """Mine structured values out of the free-form "Detail" text of event *e*.

    For every key in detail_regexps, tries each matcher against the text and
    stores the first distinct shaped result back into *e* under that key.
    Parse failures are logged to stderr and skipped.
    """
    detail = e.get("Detail") or ""  # FIX: tolerate a missing/failed Detail field
    for key, matchers in detail_regexps.items():
        results = []
        for matcher in matchers:
            shaper = matcher[-1]  # last element is the typecasting callable
            reg_res = []
            for regexp in matcher[:-1]:
                match = re.search(regexp, detail, re.IGNORECASE)
                if not match:
                    continue
                groups = match.groups()
                if not groups:
                    continue  # don't care for empty matches
                elif len(groups) == 1:
                    reg_res.append(groups[0])
                else:
                    # two valued regexps are numbers with units, like 1.2 KiB
                    val, units = groups
                    unit_val = unit_translate.get(units.lower(), 1)
                    reg_res.append(float(val) * unit_val)
            if reg_res:
                if len(reg_res) == 1 and len(matcher) == 2:  # single-valued definition
                    reg_res = reg_res[0]
                try:
                    res = shaper(reg_res)
                except Exception as err:
                    # FIX: the exception target must not be named "e" -- Python 3
                    # unbinds the name after the handler, which clobbered the
                    # event dict and made the later e[key] assignment raise
                    # NameError whenever a shaper failed.
                    sys.stderr.write('Error parsing "%s" on detail "%s": %s\n' % (reg_res, detail, err))
                else:
                    results.append(res)
        uniq_results = []  # We cannot use sets for uniq, as result may be unhashable
        for val in results:
            if val and val not in uniq_results:
                uniq_results.append(val)
        if len(uniq_results) > 1:
            sys.stderr.write('Warning: multiple regexp rules matched differently for "%s" on detail "%s"\n' % (key, detail))
        if uniq_results:
            e[key] = uniq_results[0]


def idea_ip_key(ip):
    """Return the IDEA address key for an IP literal: "IP6" iff it has a colon."""
    return "IP6" if ":" in ip else "IP4"


def gen_idea_from_ads(ads, orig_data, anonymised_target, add_test):
    """Build an IDEA event dict from one parsed ADS row.

    ads -- dict of converted CSV fields, enriched by parse_detail()
    orig_data -- raw CSV row to attach, or None
    anonymised_target -- replacement target address, or "NONE" to keep real ones
    add_test -- append the "Test" category when true
    """
    lts = time.localtime()
    ts = ads.get("Timestamp") or lts
    ets = ads.get("FirstFlow")
    # ADS sometimes reports FirstFlow greater than DetectTime.
    # FIX: ets used to default to 0 and "0 > struct_time" raises TypeError on
    # Python 3 when FirstFlow is missing; guard for a missing value explicitly.
    if ets and ets > ts:
        ts = ets
    atype = ads.get("Type")
    # FIX: the fallback was the tuple ("Other",) and [:] keeps it a tuple, so
    # cat.append("Test") below crashed for unknown event types; use a list.
    cat = list(ads_types.get(atype, ["Other"]))

    # Normalise fields that are indexed directly below -- a conversion error
    # in main() may have left any of them unset.
    ads["Source"] = ads.get("Source") or []
    ads["Targets"] = ads.get("Targets") or []
    ads["CapturedSource"] = ads.get("CapturedSource")

    ads["Ports"] = sorted(set(ads.get("Ports") or []) | set(ads.get("target_ports") or []))

    # FIX: materialise Protocol as a real list -- the field converter used to
    # return a lazy filter object, breaking .insert() and membership tests.
    protocol = list(ads.get("Protocol") or []) + list(ads.get("proto") or [])
    if ads.get("base_proto"):
        protocol.insert(0, ads["base_proto"])
    ads["Protocol"] = protocol

    ads["target_hostname"] = None

    # Some event types are reported with reversed source/target
    # Also add some protocols guessed from port and how ADS works according to docs
    if atype in ("DOS", "SIPFLOOD", "HTTPDICT"):
        ads["Source"], ads["Targets"] = ads["Targets"], ads["Source"]
        ads["CapturedSource"], ads["target_hostname"] = None, ads["CapturedSource"]
    if atype == "HTTPDICT":
        # A guess, sure
        if 80 in ads["Ports"]:
            ads["Protocol"].extend(("tcp", "http"))
        if 443 in ads["Ports"]:
            ads["Protocol"].extend(("tcp", "https"))
    elif atype == "RDPDICT":
        ads["Protocol"].extend(("tcp", "rdp"))
    elif atype == "SIPSCAN":
        ads["Protocol"].append("sip")  # ADS does not say TCP nor UDP and doc is rather terse
    elif atype == "SMTPANOMALY":
        if 587 in ads["Ports"]:
            ads["Protocol"].extend(("tcp", "submission"))
        if 465 in ads["Ports"]:
            ads["Protocol"].extend(("tcp", "ssmtp"))
        if 25 in ads["Ports"] or ("submission" not in ads["Protocol"] and "ssmtp" not in ads["Protocol"]):
            ads["Protocol"].extend(("tcp", "smtp"))
    elif atype == "SSHDICT":
        ads["Protocol"].extend(("tcp", "ssh"))
    elif atype == "TELNET":
        ads["Protocol"].extend(("tcp", "telnet"))

    # Uniquify, but retain ordering
    proto = []
    for p in ads["Protocol"]:
        if p not in proto:
            proto.append(p)
    ads["Protocol"] = proto

    # More specific category for BLACKLIST
    if "botnet" in ads:
        cat = ["Intrusion.Botnet"]

    if add_test:
        cat.append("Test")

    event = {
        "Format": "IDEA0",
        "ID": str(uuid4()),
        "Category": cat,
        "DetectTime": format_time(*ts[0:6]),
        "CreateTime": format_time(*lts[0:6])
    }

    if ads.get("ID"):
        event["AltNames"] = ["ADS-%i" % ads["ID"]]
    if ets:
        event["EventTime"] = format_time(*ets[0:6])
    if ads.get("TypeDesc"):
        event["Description"] = ads["TypeDesc"]
    if ads.get("Detail"):
        event["Note"] = ads["Detail"]

    if "conn_count" in ads:
        event["ConnCount"] = ads["conn_count"]
    if "byte_count" in ads:
        event["ByteCount"] = ads["byte_count"]

    # Source
    source = {}
    for srcip in ads["Source"]:
        source.setdefault(idea_ip_key(srcip), []).append(srcip)
    if ads["CapturedSource"]:
        source["Hostname"] = [ads["CapturedSource"]]
    if source:
        source["Proto"] = ads["Protocol"]
        if "botnet" in ads:
            source["Type"] = ["Botnet"]
        elif atype == "REFLECTDOS":
            source["Type"] = ["Backscatter"]

    # Target
    target = {}
    if ads["Ports"]:
        target["Port"] = ads["Ports"]

    if anonymised_target != "NONE":
        tgtips = [anonymised_target]
        target["Type"] = ["Anonymised"]
    else:
        tgtips = ads["Targets"]
    for tgtip in tgtips:
        target.setdefault(idea_ip_key(tgtip), []).append(tgtip)

    if ads["target_hostname"]:
        target["Hostname"] = [ads["target_hostname"]]

    if target:
        target["Proto"] = ads["Protocol"]
        if "botnet" in ads:
            target.setdefault("Type", []).append("CC")

    if orig_data:
        event["Attach"] = [{
            # NOTE(review): the attachment fields were unchanged context hidden
            # between diff hunks and could not be reconstructed; restore the
            # hidden lines from the original file.
        }]

    # Insert subnodes into event
    if source:
        event["Source"] = [source]
    if "botnet" in ads:
        # Both bot and CC are 'wrongdoers' from IDEA's point of view.
        # FIX: setdefault guards the case where no Source address was parsed;
        # a plain event["Source"].append() raised KeyError there.
        event.setdefault("Source", []).append(target)
    else:
        event["Target"] = [target]

    return event


def main():
    """Read ADS CSV events from stdin and file them as IDEA events."""
    # NOTE(review): the opening lines of main() (building `opts` via
    # get_opts() and redirecting stderr to opts["errlog"]) were unchanged
    # context hidden outside the diff hunks; the lines down to "out = ..."
    # are a hedged guess -- confirm against the original file.
    opts = get_opts()
    try:
        sys.stderr = open(opts["errlog"], "a")
    except EnvironmentError:
        print("Warning: error log %s unavailable (wrong directory or permissions?)" % opts["errlog"])

    out = SafeDir(opts["out"])
    for row in csv.reader(sys.stdin, dialect="excel-tab"):
        if not row:
            continue
        tr_row = {}
        for (k, type_), val in zip(ads_fields, row):
            try:
                tr_row[k] = type_(val.strip())
            except Exception as err:
                sys.stderr.write('Error parsing "%s" in event "%s": %s\n' % (val, row, err))
        if opts["noorigdata"] and not opts["origdata"]:
            row = None
        parse_detail(tr_row)
        if tr_row.get("omit"):
            # Ignore "End of attack" events as they summarise previous ones
            # and we would get duplicate counts.
            continue
        event = gen_idea_from_ads(tr_row, row, opts["target"], opts["test"])
        nf = out.newfile()
        try:
            data = json.dumps(event)
            nf.f.write(data.encode("utf-8"))
        except Exception as err:
            sys.stderr.write('Error writing file "%s", event "%s": %s\n' % (nf.get_path(), str(event), err))
        nf.f.close()
        nf.moveto(out.incoming)