Skip to content
Snippets Groups Projects
haas2warden.py 4.90 KiB
#!/usr/bin/env python3
from gzip import decompress
from json import loads
from datetime import datetime, timedelta
import argparse
import logging
import uuid
import json
import os
import requests


# Logging setup: timestamp, logger name, level and message on every line
LOGFORMAT = "%(asctime)-15s,%(name)s [%(levelname)s] %(message)s"
LOGDATEFORMAT = "%Y-%m-%dT%H:%M:%S"
logging.basicConfig(
    format=LOGFORMAT,
    datefmt=LOGDATEFORMAT,
    level=logging.INFO,
)
logger = logging.getLogger('haas2warden')

# Default value of the --date argument: yesterday, computed from current UTC time
data_date = datetime.utcnow().date() - timedelta(days=1)

def createIDEAFile(idea_id, idea_msg):
    """
    Creates file for IDEA message in .../tmp folder, then move it to .../incoming folder

    The message is first completely written to <args.path>/tmp/<idea_id>.idea
    and only then renamed to <args.path>/incoming/<idea_id>.idea, so a partially
    written file never shows up in the incoming folder.
    Both folders are created when they do not exist yet.

    idea_id  -- ID of the message, used as the file name (without ".idea" suffix)
    idea_msg -- text of the message (serialized JSON)
    """
    file_name = idea_id + ".idea"
    tmp_dir_path = os.path.join(args.path, "tmp")
    incoming_dir_path = os.path.join(args.path, "incoming")
    os.makedirs(tmp_dir_path, exist_ok=True)
    os.makedirs(incoming_dir_path, exist_ok=True)

    tmp_file_path = os.path.join(tmp_dir_path, file_name)
    # "with" closes the file even when write() fails; the encoding is set
    # explicitly because JSON is expected to be UTF-8 regardless of the locale
    with open(tmp_file_path, "w", encoding="utf-8") as idea_file:
        idea_file.write(idea_msg)

    os.rename(tmp_file_path, os.path.join(incoming_dir_path, file_name))


def createIDEA(time, time_closed, ip, login_successful, commands):
    """
    Creates IDEA message and stores it using createIDEAFile()

    The message is assembled as a dict and serialized by json.dumps(), so all
    values (node name, IP address, timestamps, ...) are properly escaped and
    the result is always a valid JSON document.

    time             -- start of the session, used as both DetectTime and EventTime
    time_closed      -- end of the session (CeaseTime); the key is omitted when empty
    ip               -- source address, stored into Source[0].IP4
    login_successful -- when true, the event is Intrusion.UserCompromise and the
                        commands are attached, otherwise it is Attempt.Login
    commands         -- commands already serialized into a JSON string by the
                        caller; stored as a string in Attach[0].Content
    """
    idea_id = str(uuid.uuid4())

    if login_successful:
        category = ["Intrusion.UserCompromise"]
        description = "SSH login on honeypot (HaaS)"
    else:
        category = ["Attempt.Login"]
        description = "Unsuccessful SSH login attempt on honeypot (HaaS)"
    if args.test:
        category.append("Test")

    # dict keeps insertion order, so the keys come out in the usual IDEA order
    idea = {
        "Format": "IDEA0",
        "ID": idea_id,
        "Category": category,
        "Description": description,
        "Note": "Extracted from data of CZ.NIC HaaS project",
        "DetectTime": time,
        "EventTime": time,
    }
    if time_closed:  # sometimes time_closed is empty, in such case we must omit CeaseTime completely from IDEA msg
        idea["CeaseTime"] = time_closed
    idea["CreateTime"] = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
    idea["Source"] = [
        {
            "IP4": [ip],
            "Proto": ["tcp", "ssh"],
        }
    ]
    idea["Node"] = [
        {
            "Name": args.name,
            "SW": ["CZ.NIC HaaS"],
            "Type": ["Connection", "Auth", "Honeypot"],
            "Note": "A script converting daily HaaS data dumps from https://haas.nic.cz/stats/export/",
        }
    ]
    if login_successful:
        idea["Attach"] = [
            {
                "Note": "commands",
                "Type": ["ShellCode"],
                "ContentType": "application/json",
                # "commands" is already a JSON string; json.dumps() below encodes
                # it once more, so Content is a string containing JSON
                "Content": commands,
            }
        ]

    createIDEAFile(idea_id, json.dumps(idea, indent=4) + "\n")

    
def processJSON():
    """
    Downloads data from https://haas.nic.cz/stats/export/ and process json files.

    The day to download is taken from args.date (YYYY-MM-DD). The dump is a
    gzipped JSON array; createIDEA() is called for every record in it.
    When the server does not answer with HTTP 200, an error is logged and
    nothing is converted.
    """
    date = datetime.strptime(args.date, '%Y-%m-%d').date()
    # dumps are stored as <year>/<month>/<YYYY-MM-DD>.json.gz
    url = f"https://haas.nic.cz/stats/export/{date:%Y}/{date:%m}/{date}.json.gz"

    logger.info("Downloading {}".format(url))
    # without a timeout requests waits forever when the server stops responding
    response = requests.get(url, timeout=60)
    if response.status_code != 200:
        logger.error("Download of {} failed, HTTP status code {}".format(url, response.status_code))
        return

    # unzip and read json file
    json_objects = loads(decompress(response.content))
    logger.info("Found {} records, converting to IDEA messages".format(len(json_objects)))
    # go through all json objects
    for json_object in json_objects:
        createIDEA(
            json_object["time"],
            json_object["time_closed"],
            json_object["ip"],
            json_object["login_successful"],
            # commands are passed as a JSON string, createIDEA() stores them as such
            json.dumps(json_object["commands"]),
        )

if __name__ == "__main__":

    # parse arguments
    # NOTE: "args" must stay a module-level name, the functions above read it as a global
    parser = argparse.ArgumentParser(
        prog="haas2warden.py",
        description="A script converting daily HaaS data dumps from https://haas.nic.cz/stats/export/"
    )

    parser.add_argument('-d', '--date', metavar='DATE', default = str(data_date),
                        help='To download data from date YYYY-MM-DD, use date + 1 day (default: utcnow - 1 day)')
    parser.add_argument('-p', '--path', metavar='DIRPATH', default = "/data/haas2warden/warden_filer/",
                        help='Target folder for Idea messages (default: "/data/haas2warden/warden_filer/")')
    parser.add_argument('-n', '--name', metavar='NODENAME', default = "undefined",
                        help='Name of the node (default: undefined)')
    parser.add_argument('-t', '--test', action="store_true",
                        help='Test category')

    args = parser.parse_args()

    # reject a malformed date right away with a usage message instead of
    # a traceback from processJSON(); args.date itself stays a string
    try:
        datetime.strptime(args.date, '%Y-%m-%d')
    except ValueError:
        parser.error("invalid date '{}', expected format YYYY-MM-DD".format(args.date))

    processJSON()
    logger.info("Done")