Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
I
IDEA
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package Registry
Container Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
713
Warden
IDEA
Commits
ddb57dcd
"README.rst" did not exist on "fd8578fad3e8ed0c232063f55864cb31a8b57866"
Commit
ddb57dcd
authored
6 years ago
by
Pavel Eis
Browse files
Options
Downloads
Patches
Plain Diff
Added stix.py, converter from IDEA to STIX 2.0
parent
99566a15
No related branches found
No related tags found
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
idea/stix.py
+512
-0
512 additions, 0 deletions
idea/stix.py
with
512 additions
and
0 deletions
idea/stix.py
0 → 100644
+
512
−
0
View file @
ddb57dcd
# -*- coding: utf-8 -*-
#
# Copyright (c) 2018, CESNET, z. s. p. o.
# Use of this source is governed by an ISC license, see LICENSE file.
from
uuid
import
uuid4
import
re
from
dateutil
import
parser
as
date_parser
class
StixGenerator
(
object
):
sighting_types
=
[
"
Virus
"
,
"
Worm
"
,
"
Trojan
"
,
"
Spyware
"
,
"
Rootkit
"
,
"
Exploit
"
,
"
Botnet
"
,
"
DDoS
"
,
"
Vulnerable
"
,
"
DoS
"
]
def
__init__
(
self
):
# objects in observed data object
self
.
obj_counter
=
0
# source references created form IDEA['Source']
self
.
src_ref
=
{}
# destination references created from IDEA['Target']
self
.
dst_ref
=
{}
@staticmethod
def
convert_timestamp
(
timestamp
):
"""
converts any IDEA timestamp into required RFC 3339-formatted timestamp
:param timestamp:
:return:
"""
new_timestamp
=
date_parser
.
parse
(
timestamp
).
strftime
(
"
%Y-%m-%dT%H:%M:%S.%f
"
)
return
new_timestamp
[
0
:
-
3
]
+
"
Z
"
@staticmethod
def
external_references
(
refs
):
"""
creates references for some STIX object from list of references
:param refs: list of references
:return: new references
"""
ext_references
=
[]
for
record
in
refs
:
if
re
.
search
(
"
^url:
"
,
record
):
ext_references
.
append
({
'
url
'
:
record
[
4
:]})
else
:
ext_references
.
append
({
'
source_name
'
:
record
.
split
(
"
:
"
)[
0
],
'
external_id
'
:
record
.
split
(
"
:
"
)[
1
]})
return
ext_references
def
sighting_object
(
self
,
id_identity
,
id_observed_data
,
id_alert
,
detect_time
,
conn_count
=
None
,
event_time
=
None
,
cease_time
=
None
,
create_time
=
None
):
"""
Creates new sighting domain object
:param id_identity: id of indentity, which created message
:param id_observed_data: id of seen data
:param id_alert: id of alert object
:param detect_time: detect time of IDEA message
:param conn_count: number of connections
:param event_time: start of event
:param cease_time: end of event
:param create_time: create time of event
:return: sighting domain object
"""
sight_object
=
{
'
type
'
:
"
sighting
"
,
'
id
'
:
"
sighting--
"
+
str
(
uuid4
()),
'
created_by_ref
'
:
id_identity
,
'
created
'
:
self
.
convert_timestamp
(
create_time
)
if
create_time
else
self
.
convert_timestamp
(
detect_time
),
'
modified
'
:
self
.
convert_timestamp
(
create_time
)
if
create_time
else
self
.
convert_timestamp
(
detect_time
),
'
count
'
:
conn_count
if
conn_count
else
1
,
'
sighting_of_ref
'
:
id_alert
,
'
observed_data_refs
'
:
[
id_observed_data
],
'
where_sighted_refs
'
:
[
id_identity
]
}
if
event_time
:
sight_object
[
'
first_seen
'
]
=
StixGenerator
.
convert_timestamp
(
event_time
)
if
cease_time
:
sight_object
[
'
last_seen
'
]
=
StixGenerator
.
convert_timestamp
(
cease_time
)
return
sight_object
def
identity_object
(
self
,
node
,
detect_time
,
create_time
=
None
):
"""
creates new identity object
:param node: one index of IDEA[
'
Node
'
]
:param detect_time: detect time of event
:param create_time: create time of IDEA
:return: identity domain object
"""
identity
=
{
'
type
'
:
"
identity
"
,
'
id
'
:
"
identity--
"
+
str
(
uuid4
()),
'
created
'
:
self
.
convert_timestamp
(
create_time
)
if
create_time
else
self
.
convert_timestamp
(
detect_time
),
'
modified
'
:
self
.
convert_timestamp
(
create_time
)
if
create_time
else
self
.
convert_timestamp
(
detect_time
),
'
name
'
:
node
.
get
(
'
Name
'
,
[
"
unknown
"
]),
'
identity_class
'
:
"
organization
"
}
try
:
identity
[
'
description
'
]
=
"
,
"
.
join
(
node
[
'
SW
'
])
except
KeyError
:
pass
try
:
identity
[
'
labels
'
]
=
[
"
,
"
.
join
(
node
[
'
Type
'
])]
except
KeyError
:
pass
return
identity
def
ipvx_addr_object
(
self
,
addr_objects
):
"""
Creates all IP objects for future mapping with network traffic objects
:param addr_objects: Source or Target dictionary from IDEA message
:return: gathered references to IP objects and newly created IP objects
"""
obs_objects
=
{}
network_values
=
{}
ipv4
=
False
# go through all dictionaries {"port": xxx, "ip": [xxx] ... }
for
ip_dict
in
addr_objects
:
ip_references
=
{}
# put all IPs to addr_objects and increase counter for every new object
try
:
obs_objects
[
str
(
self
.
obj_counter
)]
=
{
'
type
'
:
"
ipv4-addr
"
,
'
value
'
:
"
,
"
.
join
(
ip_dict
[
"
IP4
"
])}
self
.
obj_counter
+=
1
ipv4
=
True
except
KeyError
:
pass
try
:
ip_references
[
'
Proto
'
]
=
ip_dict
[
'
Proto
'
].
remove
(
"
IP
"
)
except
ValueError
:
ip_references
[
'
Proto
'
]
=
ip_dict
[
'
Proto
'
]
except
KeyError
:
pass
try
:
ip_references
[
'
Port
'
]
=
ip_dict
[
'
Port
'
]
except
KeyError
:
pass
# if ipv4 addr or atleast port, fill it into network values dict
if
ipv4
:
try
:
ip_references
[
'
Proto
'
]
=
[
"
ipv4
"
]
+
ip_references
.
get
(
'
Proto
'
)
except
TypeError
:
ip_references
[
'
Proto
'
]
=
[
"
ipv4
"
]
network_values
[
self
.
obj_counter
-
1
]
=
ip_references
# check ipv6
try
:
obs_objects
[
str
(
self
.
obj_counter
)]
=
{
'
type
'
:
"
ipv6-addr
"
,
'
value
'
:
"
,
"
.
join
(
ip_dict
[
"
IP6
"
])}
try
:
ip_references
[
'
Proto
'
]
=
[
"
ipv6
"
]
+
ip_references
.
get
(
'
Proto
'
)
except
TypeError
:
ip_references
[
'
Proto
'
]
=
[
"
ipv6
"
]
network_values
[
self
.
obj_counter
]
=
ip_references
self
.
obj_counter
+=
1
except
KeyError
:
# if Source or Target does not contain IP address, Source or Target part cannot be converted. But if
# Source contains IP address and protocols, which are the same with Target's protocols and Target
# contains port without IP address, it can be converted to one network traffic object (and vise versa).
# So keep it for future possible use.
try
:
if
not
ipv4
and
ip_references
.
get
(
'
Proto
'
)
and
len
(
ip_references
.
get
(
'
Port
'
))
==
1
and
\
len
(
addr_objects
)
==
1
:
network_values
[
'
X
'
]
=
ip_references
except
TypeError
:
pass
ipv4
=
False
return
network_values
,
obs_objects
def
one_network_traffic_object
(
self
,
src_ref
=
None
,
dst_ref
=
None
,
proto
=
None
,
src_port
=
None
,
dst_port
=
None
):
"""
creates one new network traffic object based on references
:param src_ref: source object references
:param dst_ref: destination object references
:param proto: protocols
:param src_port: source ports
:param dst_port: destitation ports
:return: created netwrok traffic object
"""
if
not
proto
and
not
(
src_ref
or
dst_ref
):
return
{}
else
:
network_traffic
=
{
'
type
'
:
"
network-traffic
"
}
if
src_ref
is
not
None
or
self
.
src_ref
.
get
(
'
X
'
):
if
src_ref
!=
'
X
'
and
not
self
.
src_ref
.
get
(
'
X
'
):
network_traffic
[
'
src_ref
'
]
=
str
(
src_ref
)
if
src_port
:
network_traffic
[
'
src_port
'
]
=
src_port
[
0
]
if
type
(
src_port
)
is
list
else
src_port
if
proto
:
network_traffic
[
'
protocols
'
]
=
[
protocol
.
lower
()
for
protocol
in
proto
]
if
dst_ref
is
not
None
or
self
.
dst_ref
.
get
(
'
X
'
):
if
dst_ref
!=
'
X
'
and
not
self
.
dst_ref
.
get
(
'
X
'
):
network_traffic
[
'
dst_ref
'
]
=
str
(
dst_ref
)
if
dst_port
:
network_traffic
[
'
dst_port
'
]
=
dst_port
[
0
]
if
type
(
dst_port
)
is
list
else
dst_port
if
proto
:
network_traffic
[
'
protocols
'
]
=
[
protocol
.
lower
()
for
protocol
in
proto
]
return
network_traffic
def
get_network_traffic_obj
(
self
,
objects
,
src_refs
=
None
,
dst_refs
=
None
,
src_port
=
None
,
dst_port
=
None
):
"""
Creates network traffic objects based on port count. For every port in references create new network traffic object,
if not ports, create single network traffic object
:param objects: already created objects, to which will be added new ones
:param src_refs: source references
:param dst_refs: destination references
:return: all network traffic objects created so far
"""
if
src_refs
:
for
obj_ref
in
src_refs
:
try
:
for
port
in
src_refs
[
obj_ref
][
'
Port
'
]:
# source port forwarded in case of 'X' reference
objects
[
str
(
self
.
obj_counter
)]
=
self
.
one_network_traffic_object
(
obj_ref
,
proto
=
src_refs
[
obj_ref
].
get
(
'
Proto
'
),
src_port
=
port
,
dst_port
=
dst_port
)
self
.
obj_counter
+=
1
except
KeyError
:
objects
[
str
(
self
.
obj_counter
)]
=
self
.
one_network_traffic_object
(
obj_ref
,
proto
=
src_refs
[
obj_ref
].
get
(
'
Proto
'
),
dst_port
=
dst_port
)
self
.
obj_counter
+=
1
else
:
for
obj_ref
in
dst_refs
:
try
:
for
port
in
dst_refs
[
obj_ref
][
'
Port
'
]:
# destination port forwarded in case of 'X' reference
objects
[
str
(
self
.
obj_counter
)]
=
self
.
one_network_traffic_object
(
dst_ref
=
obj_ref
,
proto
=
dst_refs
[
obj_ref
].
get
(
'
Proto
'
),
dst_port
=
port
,
src_port
=
src_port
)
self
.
obj_counter
+=
1
except
KeyError
:
objects
[
str
(
self
.
obj_counter
)]
=
self
.
one_network_traffic_object
(
dst_ref
=
obj_ref
,
proto
=
dst_refs
[
obj_ref
].
get
(
'
Proto
'
),
src_port
=
src_port
)
self
.
obj_counter
+=
1
return
objects
@staticmethod
def
contain_same_proto
(
l1
,
l2
):
"""
checks if the lists contain same protocols, can differ on ipv4 or ipv6 protocol
:param l1: list 1
:param l2: list 2
:return: boolean result
"""
diff
=
set
(
l1
).
symmetric_difference
(
l2
)
if
len
(
diff
)
==
0
:
return
True
elif
len
(
diff
)
==
1
and
(
"
ipv4
"
in
diff
or
"
ipv6
"
in
diff
):
return
True
elif
len
(
diff
)
==
2
and
(
"
ipv4
"
in
diff
and
"
ipv6
"
in
diff
):
return
True
return
False
def
all_network_traffic_objects
(
self
):
"""
Create all network traffic objects, decicion on how will be references splitted is based on references count,
port count and protocols
:return: all network traffic objects
"""
objects
=
{}
if
self
.
src_ref
and
self
.
dst_ref
:
if
len
(
self
.
src_ref
)
>
1
or
len
(
self
.
dst_ref
)
>
1
:
if
len
(
self
.
src_ref
)
==
1
and
next
(
iter
(
self
.
src_ref
.
keys
()))
==
'
X
'
:
# Source contained only Port and Protocols without IP
objects
=
self
.
get_network_traffic_obj
(
objects
,
dst_refs
=
self
.
dst_ref
,
src_port
=
self
.
src_ref
[
'
X
'
][
'
Port
'
])
elif
len
(
self
.
dst_ref
)
==
1
and
next
(
iter
(
self
.
dst_ref
.
keys
()))
==
'
X
'
:
# Source contained only Port and Protocols without IP
objects
=
self
.
get_network_traffic_obj
(
objects
,
src_refs
=
self
.
src_ref
,
dst_port
=
self
.
dst_ref
[
'
X
'
][
'
Port
'
])
else
:
# many IPs or Ports in Source and Target
objects
=
self
.
get_network_traffic_obj
(
objects
,
src_refs
=
self
.
src_ref
)
objects
=
self
.
get_network_traffic_obj
(
objects
,
dst_refs
=
self
.
dst_ref
)
else
:
# len(src_network_ref) == 1 and len(dst_network_ref) == 1
src_ref_key
=
next
(
iter
(
self
.
src_ref
.
keys
()))
dst_ref_key
=
next
(
iter
(
self
.
dst_ref
.
keys
()))
try
:
if
len
(
self
.
src_ref
[
src_ref_key
][
'
Port
'
])
==
1
and
len
(
self
.
dst_ref
[
dst_ref_key
][
'
Port
'
])
==
1
:
try
:
# ports len == 1
if
self
.
contain_same_proto
(
self
.
src_ref
[
src_ref_key
][
'
Proto
'
],
self
.
dst_ref
[
dst_ref_key
][
'
Proto
'
]):
# can put references into one object
objects
[
str
(
self
.
obj_counter
)]
=
self
.
one_network_traffic_object
(
src_ref_key
,
dst_ref_key
,
self
.
src_ref
[
src_ref_key
][
'
Proto
'
],
self
.
src_ref
[
src_ref_key
][
'
Port
'
],
self
.
dst_ref
[
dst_ref_key
][
'
Port
'
])
self
.
obj_counter
+=
1
else
:
# ports len == 1 but different protocols --> split network traffic
objects
=
self
.
get_network_traffic_obj
(
objects
,
src_refs
=
self
.
src_ref
)
objects
=
self
.
get_network_traffic_obj
(
objects
,
dst_refs
=
self
.
dst_ref
)
except
KeyError
:
# src or/and dst has not protocol, if not protocol in both fill in one, else split
if
not
self
.
src_ref
[
src_ref_key
].
get
(
'
Proto
'
)
and
not
self
.
dst_ref
[
dst_ref_key
].
get
(
'
Proto
'
):
# neither one has protocol
return
{}
else
:
# one has protocol --> split, because it is not common for src and dst
objects
=
self
.
get_network_traffic_obj
(
objects
,
src_refs
=
self
.
src_ref
)
objects
=
self
.
get_network_traffic_obj
(
objects
,
dst_refs
=
self
.
dst_ref
)
else
:
# ports has len more than one --> split
if
len
(
self
.
src_ref
)
==
1
and
next
(
iter
(
self
.
src_ref
.
keys
()))
==
'
X
'
:
objects
=
self
.
get_network_traffic_obj
(
objects
,
dst_refs
=
self
.
dst_ref
,
src_port
=
self
.
src_ref
[
'
X
'
][
'
Port
'
])
elif
len
(
self
.
dst_ref
)
==
1
and
next
(
iter
(
self
.
dst_ref
.
keys
()))
==
'
X
'
:
objects
=
self
.
get_network_traffic_obj
(
objects
,
src_refs
=
self
.
src_ref
,
dst_port
=
self
.
dst_ref
[
'
X
'
][
'
Port
'
])
else
:
objects
=
self
.
get_network_traffic_obj
(
objects
,
src_refs
=
self
.
src_ref
)
objects
=
self
.
get_network_traffic_obj
(
objects
,
dst_refs
=
self
.
dst_ref
)
except
KeyError
:
# if src or/and dst has no port, check proto
try
:
if
self
.
contain_same_proto
(
self
.
src_ref
[
src_ref_key
][
'
Proto
'
],
self
.
dst_ref
[
dst_ref_key
][
'
Proto
'
]):
# can put references into one object
objects
[
str
(
self
.
obj_counter
)]
=
self
.
one_network_traffic_object
(
src_ref_key
,
dst_ref_key
,
self
.
src_ref
[
src_ref_key
][
'
Proto
'
],
self
.
src_ref
[
src_ref_key
].
get
(
'
Port
'
),
self
.
dst_ref
[
dst_ref_key
].
get
(
'
Port
'
))
self
.
obj_counter
+=
1
else
:
# proto is different --> split
objects
=
self
.
get_network_traffic_obj
(
objects
,
src_refs
=
self
.
src_ref
)
objects
=
self
.
get_network_traffic_obj
(
objects
,
dst_refs
=
self
.
dst_ref
)
except
KeyError
:
# src or/and dst has no protocol
if
not
self
.
src_ref
[
src_ref_key
].
get
(
'
Proto
'
)
and
not
self
.
dst_ref
[
dst_ref_key
].
get
(
'
Proto
'
):
# neither one has protocol
return
{}
else
:
# one has protocol --> split, because it is not common for src and dst
objects
=
self
.
get_network_traffic_obj
(
objects
,
src_refs
=
self
.
src_ref
)
objects
=
self
.
get_network_traffic_obj
(
objects
,
dst_refs
=
self
.
dst_ref
)
elif
self
.
src_ref
:
# only source refs
objects
=
self
.
get_network_traffic_obj
(
objects
,
src_refs
=
self
.
src_ref
)
elif
self
.
dst_ref
:
# only dest refs
objects
=
self
.
get_network_traffic_obj
(
objects
,
dst_refs
=
self
.
dst_ref
)
return
objects
def
observed_data
(
self
,
identity
,
data
,
labels
=
False
,
origdata
=
False
):
"""
create observed data domain object
:param identity: identity representing IDEA[
'
Node
'
]
:param data: whole Idea message
:param labels: IDEA[
'
Category
'
]
:param origdata: should the original IDEA message be inserted into new STIX message?
:return: created observed data or empty dictionary in case of error
"""
create_timestamp
=
self
.
convert_timestamp
(
data
[
'
CreateTime
'
])
if
data
.
get
(
'
CreateTime
'
)
else
\
self
.
convert_timestamp
(
data
[
'
DetectTime
'
])
observed_data
=
{
'
type
'
:
"
observed-data
"
,
'
id
'
:
"
observed-data--
"
+
str
(
uuid4
()),
'
created_by_ref
'
:
identity
,
'
created
'
:
create_timestamp
,
'
modified
'
:
create_timestamp
,
'
first_observed
'
:
self
.
convert_timestamp
(
data
[
'
EventTime
'
])
if
data
.
get
(
'
EventTime
'
)
else
self
.
convert_timestamp
(
data
[
'
DetectTime
'
]),
'
last_observed
'
:
self
.
convert_timestamp
(
data
[
'
CeaseTime
'
])
if
data
.
get
(
'
CeaseTime
'
)
else
self
.
convert_timestamp
(
data
[
'
DetectTime
'
]),
'
number_observed
'
:
data
[
'
ConnCount
'
]
if
data
.
get
(
'
ConnCount
'
)
else
1
,
}
try
:
observed_data
[
'
external_references
'
]
=
self
.
external_references
(
data
[
'
Ref
'
])
except
KeyError
:
pass
if
origdata
:
observed_data
[
'
x_idea_cesnet_cz_original_data
'
]
=
data
try
:
observed_data
[
'
x_idea_cesnet_cz_event_description
'
]
=
data
[
'
Description
'
]
+
"
,
"
+
data
[
'
Note
'
]
\
if
data
.
get
(
'
Note
'
)
else
data
[
'
Description
'
]
except
KeyError
:
pass
if
labels
:
observed_data
[
'
labels
'
]
=
data
[
'
Category
'
]
# process source and target data
if
data
.
get
(
'
Source
'
)
and
data
.
get
(
'
Target
'
):
self
.
src_ref
,
src_objects
=
self
.
ipvx_addr_object
(
data
[
'
Source
'
])
self
.
dst_ref
,
dst_objects
=
self
.
ipvx_addr_object
(
data
[
'
Target
'
])
# check possible 'X' reference, if not compatible with protocols, then delete it
if
len
(
self
.
src_ref
)
==
1
and
next
(
iter
(
self
.
src_ref
.
keys
()))
==
'
X
'
and
len
(
self
.
dst_ref
)
!=
0
:
for
dst
in
self
.
dst_ref
.
values
():
try
:
if
not
self
.
contain_same_proto
(
self
.
src_ref
[
'
X
'
][
'
Proto
'
],
dst
[
'
Proto
'
]):
self
.
src_ref
=
{}
except
KeyError
:
self
.
src_ref
=
{}
if
len
(
self
.
dst_ref
)
==
1
and
next
(
iter
(
self
.
dst_ref
.
keys
()))
==
'
X
'
and
len
(
self
.
src_ref
)
!=
0
:
for
src
in
self
.
src_ref
.
values
():
try
:
if
not
self
.
contain_same_proto
(
self
.
dst_ref
[
'
X
'
][
'
Proto
'
],
src
[
'
Proto
'
]):
self
.
dst_ref
=
{}
except
KeyError
:
self
.
dst_ref
=
{}
# get src and dst together
objects
=
{
**
src_objects
,
**
dst_objects
}
elif
data
.
get
(
'
Target
'
):
self
.
dst_ref
,
objects
=
self
.
ipvx_addr_object
(
data
[
'
Target
'
])
self
.
src_ref
=
{}
elif
data
.
get
(
'
Source
'
):
self
.
src_ref
,
objects
=
self
.
ipvx_addr_object
(
data
[
'
Source
'
])
self
.
dst_ref
=
{}
network_objects
=
self
.
all_network_traffic_objects
()
if
not
network_objects
:
return
{}
# get ip's and network_objects together
observed_data
[
'
objects
'
]
=
{
**
objects
,
**
network_objects
}
return
observed_data
def
alert_object
(
self
,
category
,
ref
,
detect_time
,
create_time
=
None
):
"""
create alert domain object
:param category: Category of event
:param ref: references of event
:param detect_time: detect time of event
:param create_time: create time of IDEA message
:return: alert domain object
"""
if
"
Vulnerable
"
in
category
:
vulnerability
=
{
'
type
'
:
"
vulnerability
"
,
'
id
'
:
"
vulnerability--
"
+
str
(
uuid4
()),
'
created
'
:
self
.
convert_timestamp
(
create_time
)
if
create_time
else
self
.
convert_timestamp
(
detect_time
),
'
modified
'
:
self
.
convert_timestamp
(
create_time
)
if
create_time
else
self
.
convert_timestamp
(
detect_time
),
'
name
'
:
"
unknown
"
}
if
ref
:
vulnerability
[
'
external_references
'
]
=
self
.
external_references
(
ref
)
return
vulnerability
else
:
malware
=
{
'
type
'
:
"
malware
"
,
'
id
'
:
"
malware--
"
+
str
(
uuid4
()),
'
created
'
:
self
.
convert_timestamp
(
create_time
)
if
create_time
else
self
.
convert_timestamp
(
detect_time
),
'
modified
'
:
self
.
convert_timestamp
(
create_time
)
if
create_time
else
self
.
convert_timestamp
(
detect_time
),
'
name
'
:
"
unknown
"
,
'
labels
'
:
[
"
resource-exploitation
"
if
"
Exploit
"
in
category
else
category
.
lower
()]
}
if
ref
:
malware
[
'
external_references
'
]
=
self
.
external_references
(
ref
)
return
malware
@staticmethod
def
generate_bundle
(
objects
,
event_id
=
None
):
"""
creates bundle object, which is collection of arbitrary STIX objects. Wrapped for transportation.
:param objects: all STIX objects
:param event_id: id of event passed, when the id needs to be set
:return: whole STIX message
"""
return
{
'
type
'
:
"
bundle
"
,
'
id
'
:
"
bundle--
"
+
event_id
if
event_id
is
not
None
else
str
(
uuid4
()),
'
spec_version
'
:
"
2.0
"
,
'
objects
'
:
objects
}
def
generate_sighting_message
(
self
,
data
,
category
,
origdata
=
False
,
event_id
=
None
):
"""
generates message which consists of alert object, so create sighting domain object too
:param data: IDEA message
:param category: IDEA[
'
Category
'
]
:param origdata: should be original IDEA message inserted into STIX message?
:param event_id: id of event passed, when the id needs to be set
:return: whole created STIX message if message was generated, otherwise None
"""
identity
=
[]
try
:
for
idea_id
in
data
[
'
Node
'
]:
identity
.
append
(
self
.
identity_object
(
idea_id
,
data
[
'
DetectTime
'
],
data
.
get
(
'
CreateTime
'
)))
except
KeyError
:
raise
Exception
(
"
Cannot generate STIX 2.0 message, because IDEA message does not contain Node(identity)
"
)
observed_data
=
self
.
observed_data
(
identity
[
-
1
][
'
id
'
],
data
,
origdata
=
origdata
)
if
not
observed_data
:
raise
Exception
(
"
Cannot generate STIX 2.0 message, because IDEA message does not contain IP address or
"
"
protocol in Source or Target.
"
)
alert_object
=
self
.
alert_object
(
category
,
data
.
get
(
'
Ref
'
),
data
[
'
DetectTime
'
],
data
.
get
(
'
CreateTime
'
))
sighting_object
=
self
.
sighting_object
(
identity
[
-
1
][
'
id
'
],
observed_data
[
'
id
'
],
alert_object
[
'
id
'
],
data
[
'
DetectTime
'
],
data
.
get
(
'
ConnCount
'
),
data
.
get
(
'
EventTime
'
),
data
.
get
(
'
CeaseTime
'
))
return
self
.
generate_bundle
(
identity
+
[
alert_object
,
sighting_object
,
observed_data
],
event_id
)
def
generate_observable_message
(
self
,
data
,
origdata
=
False
,
event_id
=
None
):
"""
generates ordinary STIX message, which consists only from observed data and identity
:param data: IDEA message
:param origdata: should be original IDEA message inserted into STIX message?
:param event_id: id of event passed, when the id needs to be set
:return: whole created STIX message
"""
identity
=
[]
try
:
for
idea_id
in
data
[
'
Node
'
]:
identity
.
append
(
self
.
identity_object
(
idea_id
,
data
[
'
DetectTime
'
],
data
.
get
(
'
CreateTime
'
)))
except
KeyError
:
raise
Exception
(
"
Cannot generate STIX 2.0 message, because IDEA message does not contain Node(identity)
"
)
observed_data
=
self
.
observed_data
(
identity
[
-
1
][
'
id
'
],
data
,
True
,
origdata
=
origdata
)
if
not
observed_data
:
raise
Exception
(
"
Cannot generate STIX 2.0 message, because IDEA message does not contain IP address or
"
"
protocol in Source or Target
"
)
return
self
.
generate_bundle
(
identity
+
[
observed_data
],
event_id
)
def
to_stix
(
self
,
idea_event
,
origdata
=
None
,
event_id
=
None
):
sighting_message
=
None
# check if IDEA Category is in sighting types
for
stype
in
StixGenerator
.
sighting_types
:
if
stype
in
""
.
join
(
idea_event
[
'
Category
'
]):
sighting_message
=
stype
# generate STIX message based on IDEA category
if
sighting_message
:
return
self
.
generate_sighting_message
(
idea_event
,
sighting_message
,
origdata
,
event_id
)
else
:
return
self
.
generate_observable_message
(
idea_event
,
origdata
,
event_id
)
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment