-
Václav Bartoš authoredVáclav Bartoš authored
haas2warden.py 4.90 KiB
#!/usr/bin/env python3
from gzip import decompress
from json import loads
from datetime import datetime, timedelta
import argparse
import logging
import uuid
import json
import os
import requests
data_date = datetime.date(datetime.utcnow()) - timedelta(days=1)
LOGFORMAT = "%(asctime)-15s,%(name)s [%(levelname)s] %(message)s"
LOGDATEFORMAT = "%Y-%m-%dT%H:%M:%S"
logging.basicConfig(level=logging.INFO, format=LOGFORMAT, datefmt=LOGDATEFORMAT)
logger = logging.getLogger('haas2warden')
def createIDEAFile(idea_id, idea_msg):
"""
Creates file for IDEA message in .../tmp folder, then move it to .../incoming folder
"""
tmp_dir_path = os.path.join(args.path, "tmp")
idea_file_path = os.path.join(tmp_dir_path, idea_id+".idea")
os.makedirs(tmp_dir_path, exist_ok=True)
idea_file = open(idea_file_path, "w")
idea_file.write(idea_msg)
idea_file.close()
incoming_dir_path = os.path.join(args.path, "incoming")
incoming_file_path = os.path.join(incoming_dir_path,idea_id+".idea")
os.makedirs(incoming_dir_path, exist_ok=True)
os.rename(idea_file_path,incoming_file_path)
def createIDEA(time, time_closed, ip, login_successful, commands):
"""
Creates IDEA message
"""
idea_id = str(uuid.uuid4())
if login_successful:
category = "[\"Intrusion.UserCompromise\"]"
description = "SSH login on honeypot (HaaS)"
if args.test:
category = "[\"Intrusion.UserCompromise\", \"Test\"]"
attach = f''',
"Attach": [
{{
"Note": "commands",
"Type": ["ShellCode"],
"ContentType": "application/json",
"Content": {json.dumps(commands)}
}}
]''' # ^-- "commands" is already serialiezed into a json string, we want to include it into a bigger JSON so we must encode it again (to escape quotes and any other special charaters)
else:
category = "[\"Attempt.Login\"]"
description = "Unsuccessful SSH login attempt on honeypot (HaaS)"
if args.test:
category = "[\"Attempt.Login\", \"Test\"]"
attach = ""
if time_closed: # sometimes time_closed is empty, in such case we must omit CeaseTime completely from IDEA msg
cease_time = f'"CeaseTime": "{time_closed}",'
else:
cease_time = ""
idea_msg = f"""\
{{
"Format": "IDEA0",
"ID": "{idea_id}",
"Category": {category},
"Description": "{description}",
"Note": "Extracted from data of CZ.NIC HaaS project",
"DetectTime": "{time}",
"EventTime": "{time}",
{cease_time}
"CreateTime": "{datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')}",
"Source": [
{{
"IP4": ["{ip}"],
"Proto": ["tcp", "ssh"]
}}
],
"Node": [
{{
"Name": "{args.name}",
"SW": ["CZ.NIC HaaS"],
"Type": ["Connection", "Auth", "Honeypot"],
"Note": "A script converting daily HaaS data dumps from https://haas.nic.cz/stats/export/"
}}
]{attach}
}}
"""
createIDEAFile(idea_id, idea_msg)
def processJSON():
"""
Downloads data from https://haas.nic.cz/stats/export/ and process json files.
"""
date = datetime.strptime(args.date, '%Y-%m-%d').date()
# get url
url = "https://haas.nic.cz/stats/export/{}/{}/{}.json.gz".format(str(date).split('-')[0],str(date).split('-')[1], str(date))
# get data
logger.info("Downloading {}".format(url))
response = requests.get(url)
if response.status_code == 200:
# unzip and read json file
json_objects = loads(decompress(response.content))
logger.info("Found {} records, converting to IDEA messages".format(len(json_objects)))
# go through all json objects
for json_object in json_objects:
createIDEA(json_object["time"], json_object["time_closed"], json_object["ip"], json_object["login_successful"], json.dumps(json_object["commands"]))
if __name__ == "__main__":
# parse arguments
parser = argparse.ArgumentParser(
prog="haas_receiver.py",
description="A script converting daily HaaS data dumps from https://haas.nic.cz/stats/export/"
)
parser.add_argument('-d', '--date', metavar='DATE', default = str(data_date),
help='To download data from date YYYY-MM-DD, use date + 1 day (default: utcnow - 1 day)')
parser.add_argument('-p', '--path', metavar='DIRPATH', default = "/data/haas2warden/warden_filer/",
help='Target folder for Idea messages (default: "/data/haas2warden/warden_filer/")')
parser.add_argument('-n', '--name', metavar='NODENAME', default = "undefined",
help='Name of the node (default: undefined)')
parser.add_argument('-t', '--test', action="store_true",
help='Test category')
args = parser.parse_args()
processJSON()
logger.info("Done")