Skip to content
Snippets Groups Projects
Commit 07e96211 authored by Pavel Eis's avatar Pavel Eis Committed by Pavel Kácha
Browse files

Suricata connector - remake of IDEA categories conversion, added list of...

Suricata connector - remake of IDEA categories conversion, added list of CVE's, which pairs CVE to certain suricata alert. Added logging option and some refractoring.
parent baa347b7
No related branches found
No related tags found
No related merge requests found
Source diff could not be displayed: it is too large. Options to address this: view the blob.
......@@ -10,8 +10,8 @@ import resource
import os.path as pth
import atexit
import time
from collections import OrderedDict
import logging
import logging.handlers
class FileWatcher(object):
......@@ -145,38 +145,48 @@ class Filer(object):
class IdeaGen(object):
idea_categories = {'Intrusion.Botnet': re.compile("A Network Trojan was detected, signature: ET CNC"),
'Malware.Trojan': re.compile("A Network Trojan was detected, signature: ET (?!CNC)"),
'Recon.Scanning': re.compile("signature: (ET|GPL) (SCAN|DNS Query|CURRENT_EVENTS DNS Query)"),
'Availability.DoS': re.compile("signature: ET DOS"),
'Abusive.Spam': re.compile("signature:.*?Spam(?!cop.net)"),
'Attempt.Exploit': re.compile("category: Web Application Attack")}
other_a_re = re.compile("signature: (GPL SNMP|GPL WEB_SERVER|ET DROP Dshield|ET WEB_CLIENT Hex Obfuscation|GPL TELNET)")
other_b_re = re.compile("Potential Corporate Privacy Violation, signature: ET (?!POLICY)")
idea_categories = {'Other': re.compile("(Misc Attack|Executable code|Protocol Command Decode)"),
'Attempt.Exploit': re.compile("(Attempted(?! login)|Unsuccessful User|Web Application Attack| RPC Query)"),
'Information.UnauthorizedAccess': re.compile("(?<!Attempted )Information Leak"),
'Availability.DoS': re.compile("(?<!Attempted )Denial of Service"),
'Intrusion.UserCompromise': re.compile("Successful User"),
'Intrusion.AdminCompromise': re.compile("Successful Admin"),
'Anomaly.System': re.compile("system call"),
'Malware': re.compile("Trojan"),
'Recon.Scanning': re.compile("Network Scan"),
'Anomaly.Traffic': re.compile("(?:Bad Traffic|non-standard|port)"),
'Abusive.Sexual': re.compile("lotion"),
'Attempt.Login': re.compile("login"),
'Anomaly.Application': re.compile("web application(?! Attack)")}
vulnerability_re = re.compile("vulnerability|vulnerable")
confidence_re = re.compile("(?i)(?:suspicious|possible|potential)")
confidence_likely_re = re.compile("(?i)most likely")
cve_list_file = open("CVE_list.txt")
def __init__(self, name, test):
self.name = name
self.test = test
def convert_category(self, category, signature):
if not (category and signature):
if not (category and signature) or "SURICATA" in signature:
return None
suricata_category = "category: " + category + ", signature: " + signature
if IdeaGen.other_a_re.search(suricata_category) or IdeaGen.other_b_re.search(suricata_category):
return "Other"
for category, pattern in IdeaGen.idea_categories.items():
if pattern.search(suricata_category):
return category
if IdeaGen.vulnerability_re.search(signature):
return "Vulnerable"
for idea_category, pattern in IdeaGen.idea_categories.items():
if pattern.search(category):
return idea_category
return None
def gen_event_idea(self, timestamp, category, src_ip, src_port, proto, dest_ip, dest_port, orig_data,
incident_desription):
incident_desription, signature_id, signature):
event = {
'Format': "IDEA0",
'ID': str(uuid4()),
'DetectTime': timestamp,
'Category': [category] + (["Test"] if self.test else []),
'Note': incident_desription
'Note': incident_desription,
}
source = {}
target = {}
......@@ -200,12 +210,26 @@ class IdeaGen(object):
attachment = {'Handle': "att1",
'Note': "original data",
'Content': orig_data}
event['Attach'] = attachment
event['Attach'] = [attachment]
event['Node'] = [{
'Name': self.name,
'Type': ["Connection", "Honeypot", "Recon"],
'SW': ["HP Tipping Point"],
'SW': ["Suricata"],
}]
# if signature_id in alert, check if it has CVE reference
for line in IdeaGen.cve_list_file:
if str(signature_id) in line:
event['Ref'] = "cve: " + line[24:]
break
# fill confidence according to probability of alert
if IdeaGen.confidence_re.search(signature):
if IdeaGen.confidence_likely_re.search(signature):
event['Confidence'] = 0.85
else:
event['Confidence'] = 0.5
return event
......@@ -248,7 +272,7 @@ def daemonize(
os.close(fd)
except Exception:
pass
# Redirect stdin, stdout, stderr to /dev/null
# Redirect stdin, stdout, stderr to /dev/nulla
devnull = os.open(os.devnull, os.O_RDWR)
for fd in range(3):
os.dup2(devnull, fd)
......@@ -330,6 +354,18 @@ def get_args():
dest="origdata",
action="store_true",
help="Store original report to IDEA message")
optp.add_option(
"-v", "--verbose",
dest="verbose",
action="store_true",
help="turn on debug logging")
optp.add_option(
"--log",
default="local7",
dest="log",
type="string",
action="store",
help="syslog facility or log file name (default:%default)")
return optp
......@@ -356,19 +392,13 @@ def reload_me(signum, frame):
def main():
global reload_flag
global running_flag
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG, filename='suricataDaemonLog.log', filemode='w')
optp = get_args()
# win
'''opts, args = optp.parse_args(["--origdata", "-d", "C:\\Users\\Aisik\\PycharmProjects\\SuricataToIdea\\IdeaLogTest.txt", "-n",
"cz.cesnet.server.suricata", "C:\\Users\\Aisik\\PycharmProjects\\SuricataToIdea\\"
"var\\log\\suricata\\suricataJsonLog.txt"])'''
# linux
opts, args = optp.parse_args(["--origdata", "-d", "/root/Dokumenty/PycharmProjects/SuricataToIdea/IdeaLogTest", "-n",
"cz.cesnet.server.suricata", "/root/Dokumenty/PycharmProjects/SuricataToIdea/"
"var/log/suricata/suricataJsonLog.txt"])
opts, args = optp.parse_args()
if not args or opts.name is None or opts.dir is None:
optp.print_help()
sys.exit()
if opts.oneshot:
signal.signal(signal.SIGINT, terminate_me)
signal.signal(signal.SIGTERM, terminate_me)
......@@ -386,29 +416,42 @@ def main():
files = [FileWatcher(arg) for arg in args]
filer = Filer(opts.dir)
idea_gen = IdeaGen(opts.name, opts.test)
log_format = "%(message)s"
logger = logging.getLogger()
if opts.oneshot:
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s - " + log_format))
else:
if "/" in opts.log:
handler = logging.handlers.WatchedFileHandler(opts.log)
handler.setFormatter(logging.Formatter("%(asctime)s - " + log_format))
else:
handler = logging.handlers.SysLogHandler(address="/dev/log", facility=opts.log)
handler.setFormatter(logging.Formatter(log_format))
logger.addHandler(handler)
logger.setLevel(logging.DEBUG if opts.verbose else logging.INFO)
while running_flag:
for log_file in files:
while True:
try:
line = log_file.readline()
if line is None or not line.strip():
logging.info("no line")
break
log = json.loads(line)
logging.info("readline")
category = idea_gen.convert_category(category=log.get('alert').get('category'),
signature=log.get('alert').get('signature'))
if category and log.get('timestamp'):
idea_event = idea_gen.gen_event_idea(timestamp=log['timestamp'], category=category,
src_ip=log.get('src_ip'),
src_port=log.get('src_port'), proto=log.get('proto'),
dest_ip=log.get('dest_ip'),
dest_port=log.get('dest_port'),
orig_data=str(log) if opts.origdata else False,
incident_desription=log['alert']['signature'])
save_events(idea_event, filer)
except Exception as e:
logging.debug(e)
line = log_file.readline()
if line is None or not line.strip():
break
log = json.loads(line)
category = idea_gen.convert_category(category=log.get('alert').get('category'),
signature=log.get('alert').get('signature'))
if category and log.get('timestamp'):
idea_event = idea_gen.gen_event_idea(timestamp=log['timestamp'], category=category,
src_ip=log.get('src_ip'),
src_port=log.get('src_port'), proto=log.get('proto'),
dest_ip=log.get('dest_ip'),
dest_port=log.get('dest_port'),
orig_data=str(log) if opts.origdata else False,
incident_desription=log['alert']['signature'],
signature_id=log.get('alert')['signature_id'],
signature=log.get('alert')['signature'])
save_events(idea_event, filer)
if not running_flag:
break
if reload_flag:
......@@ -423,4 +466,4 @@ def main():
if __name__ == "__main__":
main()
\ No newline at end of file
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment