# -*- coding: utf-8 -*-
#
# Copyright (c) 2018, CESNET, z. s. p. o.
# Use of this source is governed by an ISC license, see LICENSE file.

from uuid import uuid4
import re
from dateutil import parser as date_parser


class StixGenerator(object):
    """
    Converts IDEA events (plain dictionaries) into STIX 2.0 bundles.

    The generator keeps per-message state (object counter and the source/target
    references); that state is reset at the start of every ``observed_data``
    call, so one instance can convert any number of messages.
    """

    # IDEA category components, which produce a sighting of malware/vulnerability
    sighting_types = ["Virus", "Worm", "Trojan", "Spyware", "Rootkit", "Exploit", "Botnet", "DDoS", "Vulnerable", "DoS"]

    def __init__(self):
        # number of objects already placed in the observed data object
        self.obj_counter = 0
        # source references created from IDEA['Source']
        self.src_ref = {}
        # destination references created from IDEA['Target']
        self.dst_ref = {}

    @staticmethod
    def convert_timestamp(timestamp):
        """
        converts any IDEA timestamp into required RFC 3339-formatted UTC timestamp
        :param timestamp: timestamp string understood by dateutil parser
        :return: timestamp in form YYYY-MM-DDTHH:MM:SS.mmmZ
        """
        parsed = date_parser.parse(timestamp)
        offset = parsed.utcoffset()
        if offset is not None:
            # the "Z" suffix claims UTC, so shift timestamps carrying an offset to UTC wall-clock;
            # timestamps without any timezone are taken as they are
            parsed = (parsed - offset).replace(tzinfo=None)
        # %f yields microseconds, cut them down to milliseconds
        return parsed.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"

    @staticmethod
    def external_references(refs):
        """
        creates references for some STIX object from list of references
        :param refs: list of references in form "url:<address>" or "<source>:<id>"
        :return: new references
        """
        ext_references = []
        for record in refs:
            if re.match(r"url:", record):
                ext_references.append({'url': record[4:]})
                continue
            # split on the first colon only, identifier itself may contain colons
            source_name, separator, external_id = record.partition(":")
            if not separator:
                # reference without any source prefix, keep it instead of failing
                source_name, external_id = "unknown", record
            ext_references.append({'source_name': source_name, 'external_id': external_id})
        return ext_references

    def _creation_timestamp(self, detect_time, create_time=None):
        """
        picks create time if it is set, detect time otherwise, and converts it
        :param detect_time: detect time of event
        :param create_time: create time of IDEA message
        :return: converted timestamp
        """
        return self.convert_timestamp(create_time if create_time else detect_time)

    def sighting_object(self, id_identity, id_observed_data, id_alert, detect_time, conn_count=None, event_time=None,
                        cease_time=None, create_time=None):
        """
        Creates new sighting domain object
        :param id_identity: id of identity, which created message
        :param id_observed_data: id of seen data
        :param id_alert: id of alert object
        :param detect_time: detect time of IDEA message
        :param conn_count: number of connections
        :param event_time: start of event
        :param cease_time: end of event
        :param create_time: create time of event
        :return: sighting domain object
        """
        created = self._creation_timestamp(detect_time, create_time)
        sight_object = {
            'type': "sighting",
            'id': "sighting--" + str(uuid4()),
            'created_by_ref': id_identity,
            'created': created,
            'modified': created,
            'count': conn_count if conn_count else 1,
            'sighting_of_ref': id_alert,
            'observed_data_refs': [id_observed_data],
            'where_sighted_refs': [id_identity]
        }
        if event_time:
            sight_object['first_seen'] = self.convert_timestamp(event_time)
        if cease_time:
            sight_object['last_seen'] = self.convert_timestamp(cease_time)
        return sight_object

    def identity_object(self, node, detect_time, create_time=None):
        """
        creates new identity object
        :param node: one index of IDEA['Node']
        :param detect_time: detect time of event
        :param create_time: create time of IDEA
        :return: identity domain object
        """
        created = self._creation_timestamp(detect_time, create_time)
        identity = {
            'type': "identity",
            'id': "identity--" + str(uuid4()),
            'created': created,
            'modified': created,
            # name is a plain string, so the fallback has to be a string too
            'name': node.get('Name', "unknown"),
            'identity_class': "organization"
        }
        if 'SW' in node:
            identity['description'] = ", ".join(node['SW'])
        if 'Type' in node:
            identity['labels'] = [", ".join(node['Type'])]
        return identity

    def ipvx_addr_object(self, addr_objects):
        """
        Creates all IP objects for future mapping with network traffic objects
        :param addr_objects: Source or Target list from IDEA message
        :return: gathered references to IP objects and newly created IP objects
        """
        obs_objects = {}
        network_values = {}
        # go through all dictionaries {"Port": [xxx], "IP4": [xxx] ... }
        for ip_dict in addr_objects:
            shared = {}
            if 'Proto' in ip_dict:
                # "IP" is replaced by explicit ipv4/ipv6 below; build new list, the IDEA message stays untouched
                shared['Proto'] = [proto for proto in ip_dict['Proto'] if proto != "IP"]
            if 'Port' in ip_dict:
                shared['Port'] = ip_dict['Port']

            has_address = False
            for idea_key, stix_type, ip_proto in (("IP4", "ipv4-addr", "ipv4"), ("IP6", "ipv6-addr", "ipv6")):
                if idea_key not in ip_dict:
                    continue
                # NOTE(review): all addresses of one record are joined into a single value
                obs_objects[str(self.obj_counter)] = {'type': stix_type, 'value': ", ".join(ip_dict[idea_key])}
                # every address family gets its own copy, so ipv4 and ipv6 protocols do not mix
                reference = dict(shared)
                reference['Proto'] = [ip_proto] + shared.get('Proto', [])
                network_values[self.obj_counter] = reference
                self.obj_counter += 1
                has_address = True

            if has_address:
                continue
            # If Source or Target does not contain IP address, it cannot be converted on its own. But if the
            # other side contains IP address with compatible protocols, single port without address can still
            # be merged into one network traffic object. So keep it under 'X' for future possible use.
            try:
                single_port = len(shared.get('Port')) == 1
            except TypeError:
                single_port = False
            if shared.get('Proto') and single_port and len(addr_objects) == 1:
                network_values['X'] = shared

        return network_values, obs_objects

    def one_network_traffic_object(self, src_ref=None, dst_ref=None, proto=None, src_port=None, dst_port=None):
        """
        creates one new network traffic object based on references
        :param src_ref: source object reference
        :param dst_ref: destination object reference
        :param proto: protocols
        :param src_port: source port (or list, from which the first port is used)
        :param dst_port: destination port (or list, from which the first port is used)
        :return: created network traffic object or empty dictionary if there is nothing to describe
        """
        # compare with None explicitly, reference 0 is valid object key
        if not proto and src_ref is None and dst_ref is None:
            return {}

        network_traffic = {
            'type': "network-traffic"
        }
        protocols = [protocol.lower() for protocol in proto] if proto else None
        src_placeholder = self.src_ref.get('X')
        dst_placeholder = self.dst_ref.get('X')

        if src_ref is not None or src_placeholder:
            # 'X' stands for side without IP address, so there is no object to point to
            if src_ref != 'X' and not src_placeholder:
                network_traffic['src_ref'] = str(src_ref)
            if src_port:
                network_traffic['src_port'] = src_port[0] if isinstance(src_port, list) else src_port
            if protocols:
                network_traffic['protocols'] = protocols

        if dst_ref is not None or dst_placeholder:
            if dst_ref != 'X' and not dst_placeholder:
                network_traffic['dst_ref'] = str(dst_ref)
            if dst_port:
                network_traffic['dst_port'] = dst_port[0] if isinstance(dst_port, list) else dst_port
            if protocols:
                network_traffic['protocols'] = protocols
        return network_traffic

    def get_network_traffic_obj(self, objects, src_refs=None, dst_refs=None, src_port=None, dst_port=None):
        """
        Creates network traffic objects based on port count. For every port in references create new network
        traffic object, if not ports, create single network traffic object
        :param objects: already created objects, to which will be added new ones
        :param src_refs: source references (take precedence over dst_refs)
        :param dst_refs: destination references
        :param src_port: source port forwarded from 'X' reference, used with dst_refs
        :param dst_port: destination port forwarded from 'X' reference, used with src_refs
        :return: all network traffic objects created so far
        """
        if src_refs:
            references, from_source, forwarded_port = src_refs, True, dst_port
        elif dst_refs:
            references, from_source, forwarded_port = dst_refs, False, src_port
        else:
            # nothing to convert
            return objects

        for obj_ref, reference in references.items():
            protocols = reference.get('Proto')
            # reference without ports still produces one object
            ports = reference['Port'] if 'Port' in reference else [None]
            for port in ports:
                if from_source:
                    traffic = self.one_network_traffic_object(obj_ref, proto=protocols, src_port=port,
                                                              dst_port=forwarded_port)
                else:
                    traffic = self.one_network_traffic_object(dst_ref=obj_ref, proto=protocols, dst_port=port,
                                                              src_port=forwarded_port)
                objects[str(self.obj_counter)] = traffic
                self.obj_counter += 1
        return objects

    @staticmethod
    def contain_same_proto(l1, l2):
        """
        checks if the lists contain same protocols, can differ on ipv4 or ipv6 protocol
        :param l1: list 1
        :param l2: list 2
        :return: boolean result
        """
        # whatever is only in one of the lists has to be IP version
        return set(l1).symmetric_difference(l2) <= {"ipv4", "ipv6"}

    @staticmethod
    def _is_placeholder(references):
        """
        :param references: source or destination references
        :return: True if references consist only of 'X' (port and protocols without IP address)
        """
        return len(references) == 1 and 'X' in references

    @staticmethod
    def _has_at_most_one_port(reference):
        """
        :param reference: one source or destination reference
        :return: True if reference has no ports at all or exactly one port
        """
        return 'Port' not in reference or len(reference['Port']) == 1

    def _placeholder_is_compatible(self, placeholder_refs, other_refs):
        """
        :param placeholder_refs: references consisting only of 'X'
        :param other_refs: references of the opposite side
        :return: True if protocols of 'X' match protocols of every reference on the opposite side
        """
        protocols = placeholder_refs['X'].get('Proto')
        if protocols is None:
            return False
        return all('Proto' in reference and self.contain_same_proto(protocols, reference['Proto'])
                   for reference in other_refs.values())

    def _split_network_traffic(self, objects):
        """
        creates separate network traffic objects for source and for destination references
        :param objects: already created objects
        :return: objects extended by new network traffic objects
        """
        objects = self.get_network_traffic_obj(objects, src_refs=self.src_ref)
        return self.get_network_traffic_obj(objects, dst_refs=self.dst_ref)

    def _forward_or_split_network_traffic(self, objects):
        """
        if one side is only 'X', forwards its port into objects of the other side, otherwise splits
        :param objects: already created objects
        :return: objects extended by new network traffic objects
        """
        if self._is_placeholder(self.src_ref):
            # Source contained only Port and Protocols without IP
            return self.get_network_traffic_obj(objects, dst_refs=self.dst_ref, src_port=self.src_ref['X']['Port'])
        if self._is_placeholder(self.dst_ref):
            # Target contained only Port and Protocols without IP
            return self.get_network_traffic_obj(objects, src_refs=self.src_ref, dst_port=self.dst_ref['X']['Port'])
        # many IPs or Ports in Source and Target
        return self._split_network_traffic(objects)

    def all_network_traffic_objects(self):
        """
        Create all network traffic objects, decision on how the references will be split is based on
        references count, port count and protocols
        :return: all network traffic objects, empty dictionary if none can be created
        """
        objects = {}
        if not (self.src_ref and self.dst_ref):
            # at most one side is present
            if self.src_ref:
                return self.get_network_traffic_obj(objects, src_refs=self.src_ref)
            if self.dst_ref:
                return self.get_network_traffic_obj(objects, dst_refs=self.dst_ref)
            return objects

        if len(self.src_ref) > 1 or len(self.dst_ref) > 1:
            return self._forward_or_split_network_traffic(objects)

        # exactly one reference on each side
        src_key = next(iter(self.src_ref))
        dst_key = next(iter(self.dst_ref))
        source = self.src_ref[src_key]
        target = self.dst_ref[dst_key]

        # One object can carry only one source and one destination port. Both sides are checked the same
        # way, so no port gets dropped when just one of the sides lists several of them.
        if not (self._has_at_most_one_port(source) and self._has_at_most_one_port(target)):
            return self._forward_or_split_network_traffic(objects)

        if 'Proto' in source and 'Proto' in target:
            if not self.contain_same_proto(source['Proto'], target['Proto']):
                # different protocols --> split network traffic
                return self._split_network_traffic(objects)
            # can put references into one object
            objects[str(self.obj_counter)] = self.one_network_traffic_object(
                src_key, dst_key, source['Proto'], source.get('Port'), target.get('Port'))
            self.obj_counter += 1
            return objects

        if not source.get('Proto') and not target.get('Proto'):
            # neither one has protocol
            return {}
        # one has protocol --> split, because it is not common for src and dst
        return self._split_network_traffic(objects)

    def observed_data(self, identity, data, labels=False, origdata=False):
        """
        create observed data domain object
        :param identity: id of identity representing IDEA['Node']
        :param data: whole IDEA message
        :param labels: should IDEA['Category'] be inserted as labels?
        :param origdata: should the original IDEA message be inserted into new STIX message?
        :return: created observed data or empty dictionary in case of error
        """
        # object keys are local to one observed data object, so do not carry state between messages
        self.obj_counter = 0
        self.src_ref = {}
        self.dst_ref = {}

        detect_time = data['DetectTime']
        create_timestamp = self._creation_timestamp(detect_time, data.get('CreateTime'))
        observed_data = {
            'type': "observed-data",
            'id': "observed-data--" + str(uuid4()),
            'created_by_ref': identity,
            'created': create_timestamp,
            'modified': create_timestamp,
            'first_observed': self.convert_timestamp(data['EventTime'] if data.get('EventTime') else detect_time),
            'last_observed': self.convert_timestamp(data['CeaseTime'] if data.get('CeaseTime') else detect_time),
            'number_observed': data['ConnCount'] if data.get('ConnCount') else 1,
        }
        if 'Ref' in data:
            observed_data['external_references'] = self.external_references(data['Ref'])
        if origdata:
            observed_data['x_idea_cesnet_cz_original_data'] = data
        if 'Description' in data:
            description = data['Description']
            if data.get('Note'):
                description = description + ", " + data['Note']
            observed_data['x_idea_cesnet_cz_event_description'] = description
        if labels:
            observed_data['labels'] = data['Category']

        # process source and target data
        objects = {}
        if data.get('Source') and data.get('Target'):
            self.src_ref, src_objects = self.ipvx_addr_object(data['Source'])
            self.dst_ref, dst_objects = self.ipvx_addr_object(data['Target'])

            # check possible 'X' reference, if not compatible with protocols, then delete it
            if self._is_placeholder(self.src_ref) and self.dst_ref and \
                    not self._placeholder_is_compatible(self.src_ref, self.dst_ref):
                self.src_ref = {}
            if self._is_placeholder(self.dst_ref) and self.src_ref and \
                    not self._placeholder_is_compatible(self.dst_ref, self.src_ref):
                self.dst_ref = {}

            # get src and dst together
            objects = {**src_objects, **dst_objects}
        elif data.get('Target'):
            self.dst_ref, objects = self.ipvx_addr_object(data['Target'])
        elif data.get('Source'):
            self.src_ref, objects = self.ipvx_addr_object(data['Source'])

        network_objects = self.all_network_traffic_objects()
        if not network_objects:
            return {}
        # get ip's and network_objects together
        observed_data['objects'] = {**objects, **network_objects}
        return observed_data

    def alert_object(self, category, ref, detect_time, create_time=None):
        """
        create alert domain object
        :param category: matched sighting type of event
        :param ref: references of event
        :param detect_time: detect time of event
        :param create_time: create time of IDEA message
        :return: vulnerability object for "Vulnerable" category, malware object otherwise
        """
        created = self._creation_timestamp(detect_time, create_time)
        is_vulnerability = "Vulnerable" in category
        alert_type = "vulnerability" if is_vulnerability else "malware"
        alert = {
            'type': alert_type,
            'id': alert_type + "--" + str(uuid4()),
            'created': created,
            'modified': created,
            'name': "unknown"
        }
        if not is_vulnerability:
            alert['labels'] = ["resource-exploitation" if "Exploit" in category else category.lower()]
        if ref:
            alert['external_references'] = self.external_references(ref)
        return alert

    @staticmethod
    def generate_bundle(objects, event_id=None):
        """
        creates bundle object, which is collection of arbitrary STIX objects. Wrapped for transportation.
        :param objects: all STIX objects
        :param event_id: id of event passed, when the id needs to be set
        :return: whole STIX message
        """
        # prefix belongs to the generated identifier as well as to the passed one
        bundle_id = event_id if event_id is not None else str(uuid4())
        return {'type': "bundle",
                'id': "bundle--" + bundle_id,
                'spec_version': "2.0",
                'objects': objects}

    def _identity_objects(self, data):
        """
        creates identity object for every node of IDEA message
        :param data: IDEA message
        :return: list of identity objects, never empty
        """
        nodes = data.get('Node')
        if not nodes:
            raise Exception("Cannot generate STIX 2.0 message, because IDEA message does not contain Node(identity)")
        return [self.identity_object(node, data['DetectTime'], data.get('CreateTime')) for node in nodes]

    def generate_sighting_message(self, data, category, origdata=False, event_id=None):
        """
        generates message which consists of alert object, so create sighting domain object too
        :param data: IDEA message
        :param category: matched sighting type
        :param origdata: should be original IDEA message inserted into STIX message?
        :param event_id: id of event passed, when the id needs to be set
        :return: whole created STIX message, Exception is raised if it cannot be generated
        """
        identity = self._identity_objects(data)
        # the last node is taken as creator of the other objects
        creator_id = identity[-1]['id']
        observed_data = self.observed_data(creator_id, data, origdata=origdata)
        if not observed_data:
            raise Exception("Cannot generate STIX 2.0 message, because IDEA message does not contain IP address or "
                            "protocol in Source or Target.")
        alert_object = self.alert_object(category, data.get('Ref'), data['DetectTime'], data.get('CreateTime'))
        # create time is passed too, so sighting is stamped consistently with the other objects
        sighting_object = self.sighting_object(creator_id, observed_data['id'], alert_object['id'],
                                               data['DetectTime'], data.get('ConnCount'), data.get('EventTime'),
                                               data.get('CeaseTime'), data.get('CreateTime'))
        return self.generate_bundle(identity + [alert_object, sighting_object, observed_data], event_id)

    def generate_observable_message(self, data, origdata=False, event_id=None):
        """
        generates ordinary STIX message, which consists only from observed data and identity
        :param data: IDEA message
        :param origdata: should be original IDEA message inserted into STIX message?
        :param event_id: id of event passed, when the id needs to be set
        :return: whole created STIX message, Exception is raised if it cannot be generated
        """
        identity = self._identity_objects(data)
        observed_data = self.observed_data(identity[-1]['id'], data, True, origdata=origdata)
        if not observed_data:
            raise Exception("Cannot generate STIX 2.0 message, because IDEA message does not contain IP address or "
                            "protocol in Source or Target")
        return self.generate_bundle(identity + [observed_data], event_id)

    def to_stix(self, idea_event, origdata=None, event_id=None):
        """
        converts IDEA event into STIX 2.0 bundle
        :param idea_event: IDEA message
        :param origdata: should be original IDEA message inserted into STIX message?
        :param event_id: id of event passed, when the id needs to be set
        :return: whole created STIX message
        """
        # compare whole dot separated parts of categories, so "DoS" is not found inside of "DDoS"
        category_parts = set()
        for category in idea_event['Category']:
            category_parts.update(category.split("."))

        sighting_message = None
        # check if IDEA Category is in sighting types, the last matching type is used
        for stype in self.sighting_types:
            if stype in category_parts:
                sighting_message = stype
        # generate STIX message based on IDEA category
        if sighting_message:
            return self.generate_sighting_message(idea_event, sighting_message, origdata, event_id)
        return self.generate_observable_message(idea_event, origdata, event_id)