Skip to content
Snippets Groups Projects
Commit 30edb1ab authored by Jan Mach's avatar Jan Mach
Browse files

Merge branch 'hruska-feature-#4571-ips-aggregation' into devel.

(Redmine issue: #4571)
parents 53135c9d 3bfc6752
No related branches found
No related tags found
No related merge requests found
......@@ -45,6 +45,8 @@ import typedcols
import idea.base
import idea.lite
import ipranges
import pynspect.jpath
import pynspect.rules
import pynspect.compilers
......@@ -263,6 +265,42 @@ class Idea(idea.lite.Idea): # pylint: disable=locally-disabled,too-many-ancesto
"""
return [name for name in (node.get('Name', None) for node in self.get('Node', [])) if name]
@staticmethod
def get_ranges(addresses, rngcls, ipcls):
"""
Helper function for making ranges of IP addresses.
:param list addrs: List of single addresses, nets or ranges of IP4 or IP6 addresses.
:param class rngcls: The class of returned list (either IP4Range or IP6Range).
:param class ipcls: The class of single IP address (either IP4 or IP6).
:return: The list of ranges of IP addresses.
:rtype: list of ``rngcls`` objects
"""
addrs = sorted(addresses, reverse = True, key = lambda ip: ip.high())
result = []
prev = None
ipmin = None
ipmax = None
for curr in addrs:
if not prev:
ipmin = curr.low()
ipmax = curr.high()
else:
if curr.high() + 1 >= ipmin:
ipmin = min(curr.low(), ipmin)
else:
result.append(rngcls((ipmin, ipmax)) if ipmin != ipmax else ipcls(ipmin))
ipmin = curr.low()
ipmax = curr.high()
prev = curr
if ipmin and ipmax:
result.append(rngcls((ipmin, ipmax)) if ipmin != ipmax else ipcls(ipmin))
return result
def get_addresses(self, node, get_v4 = True, get_v6 = True):
"""
Convenience method for returning list of all addresses (both v4 and v6)
......@@ -275,12 +313,19 @@ class Idea(idea.lite.Idea): # pylint: disable=locally-disabled,too-many-ancesto
:rtype: list of ipranges
"""
result = []
ip4s = []
ip6s = []
if node in self:
for src in self[node]:
if get_v4 and 'IP4' in src:
result.extend(list(src['IP4']))
ip4s.extend(list(src['IP4']))
if get_v6 and 'IP6' in src:
result.extend(list(src['IP6']))
ip6s.extend(list(src['IP6']))
result.extend(self.get_ranges(ip4s, ipranges.IP4Range, ipranges.IP4))
result.extend(self.get_ranges(ip6s, ipranges.IP6Range, ipranges.IP6))
return result
def get_ports(self, node):
......
......@@ -54,7 +54,7 @@ class TestMentatIdeaInternal(unittest.TestCase):
'Source': [
{
'Type': ['Phishing'],
'IP4': ['192.168.0.2-192.168.0.5', '192.168.0.10/25', '192.168.1.1'],
'IP4': ['192.168.0.2-192.168.0.5', '192.168.0.0/25', '192.168.1.1', '192.168.1.2', '192.168.1.4'],
'IP6': ['2001:db8::ff00:42:0/112','2001:db8::ff00:42:50'],
'Hostname': ['example.com'],
'URL': ['http://example.com/cgi-bin/killemall'],
......@@ -237,15 +237,14 @@ class TestMentatIdeaInternal(unittest.TestCase):
self.assertEqual(idea_internal_1.get_description(), 'Synthetic example')
self.assertEqual(idea_internal_1.get_detectors(), ['org.example.kippo_honey'])
self.assertEqual(idea_internal_1.get_addresses('Source'), [
ipranges.IP4Range('192.168.0.2-192.168.0.5'),
ipranges.IP4Net('192.168.0.10/25'),
ipranges.IP4('192.168.1.1'),
ipranges.IP6Net('2001:db8::ff00:42:0/112'),
ipranges.IP6('2001:db8::ff00:42:50')
ipranges.IP4('192.168.1.4'),
ipranges.IP4Range('192.168.1.1-192.168.1.2'),
ipranges.IP4Range('192.168.0.0-192.168.0.127'),
ipranges.IP6Range('2001:db8::ff00:42:0-2001:db8::ff00:42:ffff')
])
self.assertEqual(idea_internal_1.get_addresses('Target'), [
ipranges.IP6Net('2001:ffff::ff00:42:0/112'),
ipranges.IP4Net('10.2.2.0/24')
ipranges.IP4Range('10.2.2.0-10.2.2.255'),
ipranges.IP6Range('2001:ffff::ff00:42:0-2001:ffff::ff00:42:ffff')
])
self.assertEqual(idea_internal_1.get_ports('Source'), [])
self.assertEqual(idea_internal_1.get_ports('Target'), [22, 25, 443])
......@@ -272,6 +271,46 @@ class TestMentatIdeaInternal(unittest.TestCase):
new = json.dumps(idea_internal_2, indent=4, sort_keys=True, default=idea_internal_2.json_default)
self.assertEqual(orig, new, "\n".join([l for l in difflib.context_diff(orig.split("\n"), new.split("\n"))]))
def test_05_get_ranges(self):
"""
Perform tests of get_ranges function.
"""
self.maxDiff = None
self.assertEqual(mentat.idea.internal.Idea.get_ranges([], ipranges.IP4Range, ipranges.IP4), [])
self.assertEqual(mentat.idea.internal.Idea.get_ranges([], ipranges.IP6Range, ipranges.IP4), [])
self.assertEqual(mentat.idea.internal.Idea.get_ranges([
ipranges.IP4('192.168.0.2'),
ipranges.IP4('192.168.0.3'),
ipranges.IP4('192.168.0.4'),
ipranges.IP4('192.168.0.5')],
ipranges.IP4Range, ipranges.IP4),
[ipranges.IP4Range((ipranges.IP4('192.168.0.2'), ipranges.IP4('192.168.0.5')))])
self.assertEqual(mentat.idea.internal.Idea.get_ranges([
ipranges.IP4('192.168.0.3'),
ipranges.IP4('192.168.0.5'),
ipranges.IP4('192.168.1.19'),
ipranges.IP4('192.168.1.20')],
ipranges.IP4Range, ipranges.IP4),[
ipranges.IP4Range((ipranges.IP4('192.168.1.19'), ipranges.IP4('192.168.1.20'))),
ipranges.IP4('192.168.0.5'),
ipranges.IP4('192.168.0.3')])
self.assertEqual(mentat.idea.internal.Idea.get_ranges([
ipranges.IP4('192.168.0.2'),
ipranges.IP4('192.168.0.5'),
ipranges.IP4('192.168.0.7')],
ipranges.IP4Range, ipranges.IP4), [
ipranges.IP4('192.168.0.7'),
ipranges.IP4('192.168.0.5'),
ipranges.IP4('192.168.0.2')])
self.assertEqual(mentat.idea.internal.Idea.get_ranges([
ipranges.IP6('2001:db8::ff00:42:50'),
ipranges.IP6('2001:0db8::ff00:42:51'),
ipranges.IP6('2001:db8::ff00:42:0052'),
ipranges.IP6('2001:0db8::ff00:0042:0053')],
ipranges.IP6Range, ipranges.IP6),
[ipranges.IP6Range((ipranges.IP6('2001:db8::ff00:42:50'), ipranges.IP6('2001:db8::ff00:42:53')))])
class TestIDEAFilterCompiler(unittest.TestCase):
"""
......
......@@ -55,7 +55,7 @@ class TestMentatIdeaJSON(unittest.TestCase):
'Source': [
{
'Type': ['Phishing'],
'IP4': ['192.168.0.2-192.168.0.5', '192.168.0.10/25', '192.168.1.1'],
'IP4': ['192.168.0.2-192.168.0.5', '192.168.0.0/25', '192.168.1.1'],
'IP6': ['2001:db8::ff00:42:0/112','2001:db8::ff00:42:50'],
'Hostname': ['example.com'],
'URL': ['http://example.com/cgi-bin/killemall'],
......@@ -206,15 +206,13 @@ class TestMentatIdeaJSON(unittest.TestCase):
self.assertEqual(idea_sqldb.ident, '4390fc3f-c753-4a3e-bc83-1b44f24baf75')
self.assertEqual(idea_sqldb.detecttime.isoformat(), '2012-11-03T10:00:07')
self.assertEqual(idea_sqldb.source_ip, [
ipranges.IP4Range('192.168.0.2-192.168.0.5'),
ipranges.IP4Net('192.168.0.10/25'),
ipranges.IP4('192.168.1.1'),
ipranges.IP6Net('2001:db8::ff00:42:0/112'),
ipranges.IP6('2001:db8::ff00:42:50')
ipranges.IP4Range('192.168.0.0-192.168.0.127'),
ipranges.IP6Range('2001:db8::ff00:42:0-2001:db8::ff00:42:ffff')
])
self.assertEqual(idea_sqldb.target_ip, [
ipranges.IP6Net('2001:ffff::ff00:42:0/112'),
ipranges.IP4Net('10.2.2.0/24')
ipranges.IP4Range('10.2.2.0-10.2.2.255'),
ipranges.IP6Net('2001:ffff::ff00:42:0/112')
])
self.assertEqual(idea_sqldb.source_ip_aggr_ip4, ipranges.IP4Range('192.168.0.0-192.168.1.1'))
self.assertEqual(idea_sqldb.source_ip_aggr_ip6, ipranges.IP6Range('2001:db8::ff00:42:0-2001:db8::ff00:42:ffff'))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment