diff --git a/IDEA_to_STIX/IdeaToStix.py b/IDEA_to_STIX/IdeaToStix.py new file mode 100644 index 0000000000000000000000000000000000000000..58070a1ecfe8c5e3b8ff95c328107feaa6ca3af8 --- /dev/null +++ b/IDEA_to_STIX/IdeaToStix.py @@ -0,0 +1,237 @@ +import json +import argparse +import os +from uuid import uuid4 +import re + +class StixGenerator(object): + def sighting_object(self, id_identity, id_observed_data, id_alert, detect_time, conn_count=None, event_time=None, + cease_time=None): + sight_object = { + 'type': "sighting", + 'id': "sighting--" + str(uuid4()), + 'created_by_ref': id_identity, + 'created': detect_time, + 'count': conn_count if conn_count else 1, + 'sighting_of_ref': id_alert, + 'observed_data_refs': [id_observed_data], + 'where_sighted_refs': [id_identity] + } + if event_time: + sight_object['first_observed'] = event_time + if cease_time: + sight_object['last_observed'] = cease_time + return sight_object + + def identity_object(self, node): + identity = { + 'type': "identity", + 'id': "identity--" + str(uuid4()), + 'name': node[0]['Name'], + 'identity_class': "technology" + } + # IDEA does not have to contain SW, the same with Type below + if node[0].get('SW'): + identity['description'] = ", ".join(node[0].get('SW')) + # if there are two indices, fill it with the second too + if len(node) == 2 and node[1].get('SW'): + identity['description'] = (("[0] " + identity['description'] + " ") if identity.get('description') + else "") + "[1] " + ", ".join(node[1]['SW']) + if node[0].get('Type'): + identity['labels'] = ", ".join(node[0].get('Type')) + if len(node) == 2 and node[1].get('Type'): + identity['labels'] = (("[0] " + identity['labels'] + " ") if identity.get('labels') else "") + \ + "[1] " + ", ".join(node[1].get('Type')) + if len(node) == 2: + identity['name'] = "[0] " + identity['name'] + " [1] " + node[1]['Name'] + return identity + + def ipvx_addr_object(self, addr_objects, object_counter, source=False): + objects = {} + network_values = [] + # go through all dictionaries {"port": xxx, "ip": [xxx] ... } + for ip_dict in addr_objects: + ip_references = {} + # for every ip list generate ipvx_addr object and count created objects for next generating + if addr_objects[0].get('IP4'): + af = "IP4" + elif addr_objects[0].get('IP6'): + af = "IP6" + else: + af = "" + if af: + for ip_addr in ip_dict[af]: + objects[str(object_counter[-1])] = {'type': "ipv4-addr" if af == "IP4" else "ipv6-addr", + 'value': ip_addr} + object_counter.append(object_counter[-1] + 1) + # save references to ip-addr objects + ip_references['Ip_addr_references'] = object_counter[0:-1] + if ip_dict.get('Proto'): + ip_references['Proto'] = ip_dict['Proto'] + if ip_dict.get('Port'): + ip_references['Port'] = ip_dict['Port'] + network_values.append(ip_references) + object_counter = [object_counter[-1]] + return network_values, object_counter, objects + + def one_network_traffic_object(self, src_network_references=None, dst_network_references=None): + network_traffic = { + 'type': "network-traffic" + } + if src_network_references: + if src_network_references.get('Ip_addr_references'): + network_traffic['src_ref'] = [str(ip_key) for ip_key in src_network_references['Ip_addr_references']] + if src_network_references.get('Proto'): + network_traffic['protocols'] = src_network_references['Proto'] + if src_network_references.get('Port'): + network_traffic['src_port'] = src_network_references['Port'][0] + if dst_network_references: + if dst_network_references.get('Ip_addr_references'): + network_traffic['dst_ref'] = [str(ip_key) for ip_key in dst_network_references['Ip_addr_references']] + if dst_network_references.get('Proto'): + network_traffic['protocols'] = dst_network_references['Proto'] + if dst_network_references.get('Port'): + network_traffic['dst_port'] = dst_network_references['Port'][0] + return network_traffic + + def all_network_traffic_objects(self, src_network_references, dst_network_references, object_counter): + objects = {} + if src_network_references: + for network_record in src_network_references: + objects[str(object_counter)] = self.one_network_traffic_object(network_record) + object_counter += 1 + if dst_network_references: + for network_record in dst_network_references: + objects[str(object_counter)] = self.one_network_traffic_object(None, network_record) + object_counter += 1 + return objects, object_counter + + def external_references(self, refs): + ext_references = [] + for record in refs: + if re.search("^url:", record): + ext_references.append({'url': record[4:]}) + else: + ext_references.append({'source_name': record.split(":")[0], + 'external_id': record.split(":")[1]}) + return ext_references + + def observed_data_object(self, identity, data, file, labels=False): + observed_data = { + 'type': "observed-data", + 'id': "observed-data--" + str(uuid4()), + 'created_by_ref': identity, + 'created': data['DetectTime'], + 'first_observed': data['EventTime'] if data.get('EventTime') else data['DetectTime'], + 'last_observed': data['CeaseTime'] if data.get('CeaseTime') else data['DetectTime'], + 'number-observed': data['ConnCount'] if data.get('ConnCount') else 1, + 'x_idea_original_data': data + } + print(file) + if data.get('Ref'): + observed_data['external_references'] = self.external_references(data['Ref']) + if labels: + observed_data['labels'] = data['Category'] + object_counter = [0] + # process source and target data + if data.get('Source') and data.get('Target'): + src_network_references, object_counter, src_objects = self.ipvx_addr_object(data['Source'], object_counter, True) + dst_network_references, object_counter, dst_objects = self.ipvx_addr_object(data['Target'], object_counter) + # get src and dst together + objects = {**src_objects, **dst_objects} + elif data.get('Target'): + dst_network_references, object_counter, objects = self.ipvx_addr_object(data['Target'], object_counter) + src_network_references = {} + elif data.get('Source'): + src_network_references, object_counter, objects = self.ipvx_addr_object(data['Source'], object_counter, True) + dst_network_references = {} + object_counter = object_counter[-1] + network_objects, object_counter = self.all_network_traffic_objects(src_network_references, + dst_network_references, object_counter) + # get ip's and network_objects together + observed_data['objects'] = {**objects, **network_objects} + if data.get('Description'): + observed_data['objects'][object_counter] = {'type': "artifact", + 'mime_type': "text/plain", + 'payload_bin': "Description: " + data['Description']} + return observed_data + + def alert_object(self, category, ref): + if "Vulnerable" in category: + vulnerability = { + 'type': "vulnerability", + 'id': "vulnerability--" + str(uuid4()), + 'name': "unknown" + } + if ref: + vulnerability['external_references'] = self.external_references(ref) + return vulnerability + else: + return { + 'type': "malware", + 'id': "malware--" + str(uuid4()), + 'name': "unknown", + 'labels': ["resource-exploitation" if "Exploit" in category else category] + } + +def get_args(): + parser = argparse.ArgumentParser( + description="Load IDEA messages from directory and converts them into STIX 2.0 messages.") + parser.add_argument( + "--path", + required=True, + dest="path", + action="store", + help="Path to directory of IDEA files you want to convert.") + return parser + +def generate_sighting_message(data, category, file): + stix_gen = StixGenerator() + identity = stix_gen.identity_object(data.get('Node')) + observed_data = stix_gen.observed_data_object(identity['id'], data, file) + alert_object = stix_gen.alert_object(category, data.get('Ref')) + sighting_object = stix_gen.sighting_object(identity['id'], observed_data['id'], alert_object['id'], + data['DetectTime'], data.get('ConnCount'), data.get('EventTime'), + data.get('CeaseTime')) + return [json.dumps(sighting_object), json.dumps(identity), json.dumps(alert_object), json.dumps(observed_data)] + + +def generate_observable_message(data, file): + stix_gen = StixGenerator() + identity = stix_gen.identity_object(data.get('Node')) + observed_data = stix_gen.observed_data_object(identity['id'], data, file, True) + return [json.dumps(identity), json.dumps(observed_data)] + + +def main(): + # get files path + parser = get_args() + args = parser.parse_args() + # list all IDEA files + list_dir = os.listdir(args.path) + for file in list_dir: + data = json.load(open(os.path.join(args.path, file))) + # if Idea message does not contain Source or Target, there is no way how to generate STIX message + if data.get('Source') or data.get('Target'): + # Sighting message can contain only these categories, because sighting_of_ref must contain only + # STIX object (Malware, Vulnerability, the rest is useless in IDEA message meaning) + sighting_types = ["Virus", "Worm", "Trojan", "Spyware", "Rootkit", "Exploit", "Botnet", "DDoS", "Vulnerable", + "DoS"] + sighting_message = None + for type in sighting_types: + if type in data['Category'][0]: + sighting_message = type + if sighting_message: + output = generate_sighting_message(data, sighting_message, file) + else: + output = generate_observable_message(data, file) + output_file = open(os.path.join(os.getcwd(), "STIX_converted_messages", "STIX_converted_"+file), 'w') + for object in output: + json.dump(json.JSONDecoder().decode(object), output_file) + output_file.write("\n") + else: + print("Cannot generate STIX message, because IDEA message does not contain enough information.") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/IDEA_to_STIX/StixObjects.py b/IDEA_to_STIX/StixObjects.py deleted file mode 100644 index 9354bbf1ede37adc5ddfe16cb093434754172cc1..0000000000000000000000000000000000000000 --- a/IDEA_to_STIX/StixObjects.py +++ /dev/null @@ -1,151 +0,0 @@ -from uuid import uuid4 -import re -from time import gmtime, strftime - - -class StixGenerator(object): - def sighting_object(self, identity, conn_count, observed_data, alert_type): - return { - 'type': "sighting", - 'id': "sighting--" + str(uuid4()), - 'created_by_ref': identity, - 'created': strftime("%y-%m-%dT%H:%M:%S", gmtime()), - 'count': conn_count, - 'sighting_of_ref': alert_type, - 'observed_data_refs': [observed_data], - 'where_sighted_refs': [identity] - } - - def identity_object(self, node): - return { - 'type': "identity", - 'id': "identity--" + str(uuid4()), - 'name': node[0]['Name'], - 'labels': node[0]['Type'], - 'description': "".join(node[0]['SW']), - 'identity_class': "technology" - } - - def ipvx_addr_object(self, addr_objects, object_counter, source=False): - objects = {} - ip_references = {} - for ip_dict in addr_objects: - af = "IP4" if addr_objects[0].get('IP4') else "IP6" - for ip_addr in ip_dict[af]: - objects[str(object_counter[-1])] = {'type': "ipv4-addr" if af == "IP4" else "ipv6-addr", - 'value': ip_addr} - object_counter.append(object_counter[-1] + 1) - ip_references[tuple(ip_dict['Port'])] = [object_counter[0:-1], ([ip_dict['Proto']] if source else [])] - object_counter = [object_counter[-1]] - return ip_references, object_counter, objects - - def one_network_traffic_object(self, src_ip_references=None, dst_ip_references=None): - network_traffic = { - 'type': "network-traffic" - } - if src_ip_references: - for port in src_ip_references.keys(): - network_traffic['src_ref'] = [str(ip_key) for ip_key in src_ip_references[port][0]] - network_traffic['protocols'] = src_ip_references[port][1][0] - network_traffic['src_port'] = port[0] if len(port) == 1 else port - if dst_ip_references: - for port in dst_ip_references.keys(): - network_traffic['dst_ref'] = [str(ip_key) for ip_key in dst_ip_references[port][0]] - network_traffic['dst_port'] = port[0] if len(port) == 1 else port - return network_traffic - - def all_network_traffic_objects(self, src_ip_references, dst_ip_references, object_counter): - objects = {} - if len(src_ip_references) > 1 and len(dst_ip_references) > 1: - network_state = "go_through_src_dst" - elif len(dst_ip_references) == 0 or len(src_ip_references) > len(dst_ip_references): - network_state = "go_through_src" - elif len(src_ip_references) == 0 or len(dst_ip_references) > len(src_ip_references): - network_state = "go_through_dst" - else: - network_state = "one_object" - network_opts = { - 'go_through_src_dst': {'dst_params': {'dst_ip_references': None}, - 'src_params': {'src_ip_references': None}}, - 'go_through_src': {'dst_params': {'dst_ip_references': dst_ip_references}}, - 'go_through_dst': {'src_params': {'src_ip_references': src_ip_references}}, - } - if re.search("src", network_state): - for port, list_of_src_ip in src_ip_references.items(): - objects[str(object_counter)] = self.one_network_traffic_object({port: list_of_src_ip}, - network_opts[network_state]['dst_params']['dst_ip_references']) - object_counter += 1 - if re.search("dst", network_state): - for port, list_of_dst_ip in dst_ip_references.items(): - objects[str(object_counter)] = self.one_network_traffic_object(network_opts[network_state] - ['src_params']['src_ip_references'], {port: list_of_dst_ip}) - object_counter += 1 - if network_state == "one_object": - objects[str(object_counter)] = self.one_network_traffic_object(src_ip_references, dst_ip_references) - return objects - - def external_references(self, refs): - ext_references = [] - for record in refs: - if re.search("^url:", record): - ext_references.append({'url': record[4:]}) - else: - ext_references.append({'source_name': record.split(":")[0], - 'external_id': record.split(":")[1]}) - return ext_references - - def observed_data_object(self, identity, data, labels=False): - observed_data = { - 'type': "observed-data", - 'id': "observed-data--" + str(uuid4()), - 'created_by_ref': identity, - 'created': data['DetectTime'], - 'first_observed': data['EventTime'] if data.get('EventTime') else data['DetectTime'], - 'last_observed': data['CeaseTime'] if data.get('CeaseTime') else data['DetectTime'], - 'number-observed': data['ConnCount'] if data.get('ConnCount') else 1 - } - if data['Ref']: - observed_data['external_references'] = self.external_references(data['Ref']) - if labels: - observed_data['labels'] = data['Category'] - object_counter = [0] - # process source and target data - if data.get('Source') and data.get('Target'): - src_ip_references, object_counter, src_objects = self.ipvx_addr_object(data['Source'], - object_counter, True) - dst_ip_references, object_counter, dst_objects = self.ipvx_addr_object(data['Target'], - object_counter) - objects = {**src_objects, **dst_objects} - elif data.get('Target'): - dst_ip_references, object_counter, objects = self.ipvx_addr_object(data['Target'], object_counter) - src_ip_references = {} - elif data.get('Source'): - src_ip_references, object_counter, objects = self.ipvx_addr_object(data['Source'], object_counter, - True) - dst_ip_references = {} - else: - objects = None - if objects: - object_counter = object_counter[-1] - network_objects = self.all_network_traffic_objects(src_ip_references, dst_ip_references, - object_counter) - observed_data['objects'] = {**objects, **network_objects} - return observed_data - - def alert_object(self, category, ref): - if re.search("Vulnerability", category): - vulnerability = { - 'type': "Vulnerability", - 'id': "vulnerability--" + str(uuid4()), - 'name': "unknown" - } - if ref: - vulnerability['external_references'] = self.external_references(ref) - return vulnerability - else: - return { - 'type': "malware", - 'id': "malware--" + str(uuid4()), - 'name': "unknown", - 'labels': [category] - } diff --git a/IDEA_to_STIX/StixToIdea.py b/IDEA_to_STIX/StixToIdea.py deleted file mode 100644 index 3ab65224130ef1c53db74d0e80565b75dc38fcf0..0000000000000000000000000000000000000000 --- a/IDEA_to_STIX/StixToIdea.py +++ /dev/null @@ -1,46 +0,0 @@ -import json -from StixObjects import StixGenerator - -def generate_sighting_message(data, category): - stix_gen = StixGenerator() - print("sighting message") - identity = stix_gen.identity_object(data.get('Node')) - print(identity) - observed_data = stix_gen.observed_data_object(identity['id'], data) - print(observed_data) - alert_object = stix_gen.alert_object(category, data.get('Ref')) - print(alert_object) - sighting_object = stix_gen.sighting_object(identity['id'], data['ConnCount'], observed_data['id'], - alert_object['id']) - print(sighting_object) - - -def generate_observable_message(data): - stix_gen = StixGenerator() - print("observable message") - identity = stix_gen.identity_object(data.get('Node')) - print(identity) - observed_data = stix_gen.observed_data_object(identity['id'], data, True) - print(observed_data) - - -def main(): - with open("IdeaLog.txt") as f: - data = json.load(f) - if data.get('Source') or data.get('Target'): - sighting_types = ["Virus", "Worm", "Trojan", "Spyware", "Rootkit", "Exploit", "Bot", "DDoS", "Vulnerability", - "DoS"] - sighting_message = None - for type in sighting_types: - if type in data['Category'][0]: - sighting_message = type - if sighting_message: - generate_sighting_message(data, sighting_message) - else: - generate_observable_message(data) - else: - print("Cannot generate STIX message, because IDEA message does not contain enough information.") - - -if __name__ == "__main__": - main() \ No newline at end of file