Skip to content
Snippets Groups Projects
Commit df6d2a1e authored by Pavel Eis's avatar Pavel Eis Committed by Pavel Kácha
Browse files

IDEA to STIX connector refractored -- merged into one file IdeaToStix.py and...

IDEA to STIX connector refractored -- merged into one file IdeaToStix.py and simplified some constructions, fixed some mistakes, added Desription to objects of observed data object and from IDEA Node is now filled both indices to identity object.
parent 31705e58
No related branches found
No related tags found
No related merge requests found
import json
import argparse
import os
from uuid import uuid4
import re
class StixGenerator(object):
def sighting_object(self, id_identity, id_observed_data, id_alert, detect_time, conn_count=None, event_time=None,
cease_time=None):
sight_object = {
'type': "sighting",
'id': "sighting--" + str(uuid4()),
'created_by_ref': id_identity,
'created': detect_time,
'count': conn_count if conn_count else 1,
'sighting_of_ref': id_alert,
'observed_data_refs': [id_observed_data],
'where_sighted_refs': [id_identity]
}
if event_time:
sight_object['first_observed'] = event_time
if cease_time:
sight_object['last_observed'] = cease_time
return sight_object
def identity_object(self, node):
identity = {
'type': "identity",
'id': "identity--" + str(uuid4()),
'name': node[0]['Name'],
'identity_class': "technology"
}
# IDEA does not have to contain SW, the same with Type below
if node[0].get('SW'):
identity['description'] = ", ".join(node[0].get('SW'))
# if there are two indices, fill it with the second too
if len(node) == 2 and node[1].get('SW'):
identity['description'] = (("[0] " + identity['description'] + " ") if identity.get('description')
else "") + "[1] " + ", ".join(node[1]['SW'])
if node[0].get('Type'):
identity['labels'] = ", ".join(node[0].get('Type'))
if len(node) == 2 and node[1].get('Type'):
identity['labels'] = (("[0] " + identity['labels'] + " ") if identity.get('labels') else "") + \
"[1] " + ", ".join(node[1].get('Type'))
if len(node) == 2:
identity['name'] = "[0] " + identity['name'] + " [1] " + node[1]['Name']
return identity
def ipvx_addr_object(self, addr_objects, object_counter, source=False):
objects = {}
network_values = []
# go through all dictionaries {"port": xxx, "ip": [xxx] ... }
for ip_dict in addr_objects:
ip_references = {}
# for every ip list generate ipvx_addr object and count created objects for next generating
if addr_objects[0].get('IP4'):
af = "IP4"
elif addr_objects[0].get('IP6'):
af = "IP6"
else:
af = ""
if af:
for ip_addr in ip_dict[af]:
objects[str(object_counter[-1])] = {'type': "ipv4-addr" if af == "IP4" else "ipv6-addr",
'value': ip_addr}
object_counter.append(object_counter[-1] + 1)
# save references to ip-addr objects
ip_references['Ip_addr_references'] = object_counter[0:-1]
if ip_dict.get('Proto'):
ip_references['Proto'] = ip_dict['Proto']
if ip_dict.get('Port'):
ip_references['Port'] = ip_dict['Port']
network_values.append(ip_references)
object_counter = [object_counter[-1]]
return network_values, object_counter, objects
def one_network_traffic_object(self, src_network_references=None, dst_network_references=None):
network_traffic = {
'type': "network-traffic"
}
if src_network_references:
if src_network_references.get('Ip_addr_references'):
network_traffic['src_ref'] = [str(ip_key) for ip_key in src_network_references['Ip_addr_references']]
if src_network_references.get('Proto'):
network_traffic['protocols'] = src_network_references['Proto']
if src_network_references.get('Port'):
network_traffic['src_port'] = src_network_references['Port'][0]
if dst_network_references:
if dst_network_references.get('Ip_addr_references'):
network_traffic['dst_ref'] = [str(ip_key) for ip_key in dst_network_references['Ip_addr_references']]
if dst_network_references.get('Proto'):
network_traffic['protocols'] = dst_network_references['Proto']
if dst_network_references.get('Port'):
network_traffic['dst_port'] = dst_network_references['Port'][0]
return network_traffic
def all_network_traffic_objects(self, src_network_references, dst_network_references, object_counter):
objects = {}
if src_network_references:
for network_record in src_network_references:
objects[str(object_counter)] = self.one_network_traffic_object(network_record)
object_counter += 1
if dst_network_references:
for network_record in dst_network_references:
objects[str(object_counter)] = self.one_network_traffic_object(None, network_record)
object_counter += 1
return objects, object_counter
def external_references(self, refs):
ext_references = []
for record in refs:
if re.search("^url:", record):
ext_references.append({'url': record[4:]})
else:
ext_references.append({'source_name': record.split(":")[0],
'external_id': record.split(":")[1]})
return ext_references
def observed_data_object(self, identity, data, file, labels=False):
observed_data = {
'type': "observed-data",
'id': "observed-data--" + str(uuid4()),
'created_by_ref': identity,
'created': data['DetectTime'],
'first_observed': data['EventTime'] if data.get('EventTime') else data['DetectTime'],
'last_observed': data['CeaseTime'] if data.get('CeaseTime') else data['DetectTime'],
'number-observed': data['ConnCount'] if data.get('ConnCount') else 1,
'x_idea_original_data': data
}
print(file)
if data.get('Ref'):
observed_data['external_references'] = self.external_references(data['Ref'])
if labels:
observed_data['labels'] = data['Category']
object_counter = [0]
# process source and target data
if data.get('Source') and data.get('Target'):
src_network_references, object_counter, src_objects = self.ipvx_addr_object(data['Source'], object_counter, True)
dst_network_references, object_counter, dst_objects = self.ipvx_addr_object(data['Target'], object_counter)
# get src and dst together
objects = {**src_objects, **dst_objects}
elif data.get('Target'):
dst_network_references, object_counter, objects = self.ipvx_addr_object(data['Target'], object_counter)
src_network_references = {}
elif data.get('Source'):
src_network_references, object_counter, objects = self.ipvx_addr_object(data['Source'], object_counter, True)
dst_network_references = {}
object_counter = object_counter[-1]
network_objects, object_counter = self.all_network_traffic_objects(src_network_references,
dst_network_references, object_counter)
# get ip's and network_objects together
observed_data['objects'] = {**objects, **network_objects}
if data.get('Description'):
observed_data['objects'][object_counter] = {'type': "artifact",
'mime_type': "text/plain",
'payload_bin': "Description: " + data['Description']}
return observed_data
def alert_object(self, category, ref):
if "Vulnerable" in category:
vulnerability = {
'type': "vulnerability",
'id': "vulnerability--" + str(uuid4()),
'name': "unknown"
}
if ref:
vulnerability['external_references'] = self.external_references(ref)
return vulnerability
else:
return {
'type': "malware",
'id': "malware--" + str(uuid4()),
'name': "unknown",
'labels': ["resource-exploitation" if "Exploit" in category else category]
}
def get_args():
parser = argparse.ArgumentParser(
description="Load IDEA messages from directory and converts them into STIX 2.0 messages.")
parser.add_argument(
"--path",
required=True,
dest="path",
action="store",
help="Path to directory of IDEA files you want to convert.")
return parser
def generate_sighting_message(data, category, file):
stix_gen = StixGenerator()
identity = stix_gen.identity_object(data.get('Node'))
observed_data = stix_gen.observed_data_object(identity['id'], data, file)
alert_object = stix_gen.alert_object(category, data.get('Ref'))
sighting_object = stix_gen.sighting_object(identity['id'], observed_data['id'], alert_object['id'],
data['DetectTime'], data.get('ConnCount'), data.get('EventTime'),
data.get('CeaseTime'))
return [json.dumps(sighting_object), json.dumps(identity), json.dumps(alert_object), json.dumps(observed_data)]
def generate_observable_message(data, file):
stix_gen = StixGenerator()
identity = stix_gen.identity_object(data.get('Node'))
observed_data = stix_gen.observed_data_object(identity['id'], data, file, True)
return [json.dumps(identity), json.dumps(observed_data)]
def main():
# get files path
parser = get_args()
args = parser.parse_args()
# list all IDEA files
list_dir = os.listdir(args.path)
for file in list_dir:
data = json.load(open(os.path.join(args.path, file)))
# if Idea message does not contain Source or Target, there is no way how to generate STIX message
if data.get('Source') or data.get('Target'):
# Sighting message can contain only these categories, because sighting_of_ref must contain only
# STIX object (Malware, Vulnerability, the rest is useless in IDEA message meaning)
sighting_types = ["Virus", "Worm", "Trojan", "Spyware", "Rootkit", "Exploit", "Botnet", "DDoS", "Vulnerable",
"DoS"]
sighting_message = None
for type in sighting_types:
if type in data['Category'][0]:
sighting_message = type
if sighting_message:
output = generate_sighting_message(data, sighting_message, file)
else:
output = generate_observable_message(data, file)
output_file = open(os.path.join(os.getcwd(), "STIX_converted_messages", "STIX_converted_"+file), 'w')
for object in output:
json.dump(json.JSONDecoder().decode(object), output_file)
output_file.write("\n")
else:
print("Cannot generate STIX message, because IDEA message does not contain enough information.")
if __name__ == "__main__":
main()
\ No newline at end of file
from uuid import uuid4
import re
from time import gmtime, strftime
class StixGenerator(object):
def sighting_object(self, identity, conn_count, observed_data, alert_type):
return {
'type': "sighting",
'id': "sighting--" + str(uuid4()),
'created_by_ref': identity,
'created': strftime("%y-%m-%dT%H:%M:%S", gmtime()),
'count': conn_count,
'sighting_of_ref': alert_type,
'observed_data_refs': [observed_data],
'where_sighted_refs': [identity]
}
def identity_object(self, node):
return {
'type': "identity",
'id': "identity--" + str(uuid4()),
'name': node[0]['Name'],
'labels': node[0]['Type'],
'description': "".join(node[0]['SW']),
'identity_class': "technology"
}
def ipvx_addr_object(self, addr_objects, object_counter, source=False):
objects = {}
ip_references = {}
for ip_dict in addr_objects:
af = "IP4" if addr_objects[0].get('IP4') else "IP6"
for ip_addr in ip_dict[af]:
objects[str(object_counter[-1])] = {'type': "ipv4-addr" if af == "IP4" else "ipv6-addr",
'value': ip_addr}
object_counter.append(object_counter[-1] + 1)
ip_references[tuple(ip_dict['Port'])] = [object_counter[0:-1], ([ip_dict['Proto']] if source else [])]
object_counter = [object_counter[-1]]
return ip_references, object_counter, objects
def one_network_traffic_object(self, src_ip_references=None, dst_ip_references=None):
network_traffic = {
'type': "network-traffic"
}
if src_ip_references:
for port in src_ip_references.keys():
network_traffic['src_ref'] = [str(ip_key) for ip_key in src_ip_references[port][0]]
network_traffic['protocols'] = src_ip_references[port][1][0]
network_traffic['src_port'] = port[0] if len(port) == 1 else port
if dst_ip_references:
for port in dst_ip_references.keys():
network_traffic['dst_ref'] = [str(ip_key) for ip_key in dst_ip_references[port][0]]
network_traffic['dst_port'] = port[0] if len(port) == 1 else port
return network_traffic
def all_network_traffic_objects(self, src_ip_references, dst_ip_references, object_counter):
objects = {}
if len(src_ip_references) > 1 and len(dst_ip_references) > 1:
network_state = "go_through_src_dst"
elif len(dst_ip_references) == 0 or len(src_ip_references) > len(dst_ip_references):
network_state = "go_through_src"
elif len(src_ip_references) == 0 or len(dst_ip_references) > len(src_ip_references):
network_state = "go_through_dst"
else:
network_state = "one_object"
network_opts = {
'go_through_src_dst': {'dst_params': {'dst_ip_references': None},
'src_params': {'src_ip_references': None}},
'go_through_src': {'dst_params': {'dst_ip_references': dst_ip_references}},
'go_through_dst': {'src_params': {'src_ip_references': src_ip_references}},
}
if re.search("src", network_state):
for port, list_of_src_ip in src_ip_references.items():
objects[str(object_counter)] = self.one_network_traffic_object({port: list_of_src_ip},
network_opts[network_state]['dst_params']['dst_ip_references'])
object_counter += 1
if re.search("dst", network_state):
for port, list_of_dst_ip in dst_ip_references.items():
objects[str(object_counter)] = self.one_network_traffic_object(network_opts[network_state]
['src_params']['src_ip_references'], {port: list_of_dst_ip})
object_counter += 1
if network_state == "one_object":
objects[str(object_counter)] = self.one_network_traffic_object(src_ip_references, dst_ip_references)
return objects
def external_references(self, refs):
ext_references = []
for record in refs:
if re.search("^url:", record):
ext_references.append({'url': record[4:]})
else:
ext_references.append({'source_name': record.split(":")[0],
'external_id': record.split(":")[1]})
return ext_references
def observed_data_object(self, identity, data, labels=False):
observed_data = {
'type': "observed-data",
'id': "observed-data--" + str(uuid4()),
'created_by_ref': identity,
'created': data['DetectTime'],
'first_observed': data['EventTime'] if data.get('EventTime') else data['DetectTime'],
'last_observed': data['CeaseTime'] if data.get('CeaseTime') else data['DetectTime'],
'number-observed': data['ConnCount'] if data.get('ConnCount') else 1
}
if data['Ref']:
observed_data['external_references'] = self.external_references(data['Ref'])
if labels:
observed_data['labels'] = data['Category']
object_counter = [0]
# process source and target data
if data.get('Source') and data.get('Target'):
src_ip_references, object_counter, src_objects = self.ipvx_addr_object(data['Source'],
object_counter, True)
dst_ip_references, object_counter, dst_objects = self.ipvx_addr_object(data['Target'],
object_counter)
objects = {**src_objects, **dst_objects}
elif data.get('Target'):
dst_ip_references, object_counter, objects = self.ipvx_addr_object(data['Target'], object_counter)
src_ip_references = {}
elif data.get('Source'):
src_ip_references, object_counter, objects = self.ipvx_addr_object(data['Source'], object_counter,
True)
dst_ip_references = {}
else:
objects = None
if objects:
object_counter = object_counter[-1]
network_objects = self.all_network_traffic_objects(src_ip_references, dst_ip_references,
object_counter)
observed_data['objects'] = {**objects, **network_objects}
return observed_data
def alert_object(self, category, ref):
if re.search("Vulnerability", category):
vulnerability = {
'type': "Vulnerability",
'id': "vulnerability--" + str(uuid4()),
'name': "unknown"
}
if ref:
vulnerability['external_references'] = self.external_references(ref)
return vulnerability
else:
return {
'type': "malware",
'id': "malware--" + str(uuid4()),
'name': "unknown",
'labels': [category]
}
import json
from StixObjects import StixGenerator
def generate_sighting_message(data, category):
stix_gen = StixGenerator()
print("sighting message")
identity = stix_gen.identity_object(data.get('Node'))
print(identity)
observed_data = stix_gen.observed_data_object(identity['id'], data)
print(observed_data)
alert_object = stix_gen.alert_object(category, data.get('Ref'))
print(alert_object)
sighting_object = stix_gen.sighting_object(identity['id'], data['ConnCount'], observed_data['id'],
alert_object['id'])
print(sighting_object)
def generate_observable_message(data):
stix_gen = StixGenerator()
print("observable message")
identity = stix_gen.identity_object(data.get('Node'))
print(identity)
observed_data = stix_gen.observed_data_object(identity['id'], data, True)
print(observed_data)
def main():
with open("IdeaLog.txt") as f:
data = json.load(f)
if data.get('Source') or data.get('Target'):
sighting_types = ["Virus", "Worm", "Trojan", "Spyware", "Rootkit", "Exploit", "Bot", "DDoS", "Vulnerability",
"DoS"]
sighting_message = None
for type in sighting_types:
if type in data['Category'][0]:
sighting_message = type
if sighting_message:
generate_sighting_message(data, sighting_message)
else:
generate_observable_message(data)
else:
print("Cannot generate STIX message, because IDEA message does not contain enough information.")
if __name__ == "__main__":
main()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment