Skip to content
Snippets Groups Projects
Commit ca1b97f6 authored by Jan Mach's avatar Jan Mach
Browse files

Implemented unit tests for 'hawat.blueprints.filters' module.

(Redmine issue: #4410,#1017,#3443)
parent 994d5c67
No related branches found
No related tags found
No related merge requests found
......@@ -5,10 +5,10 @@
<div class="row">
<div class="col-lg-12">
<ol class="breadcrumb">
<li><a href="{{ url_for('home.index') }}">{{ gettext('Home') }}</a></li>
<li><a href="{{ url_for('filters.list') }}">{{ gettext('Filter management') }}</a></li>
<li><a href="{{ url_for('home.index') }}">{{ _('Home') }}</a></li>
<li><a href="{{ url_for('filters.list') }}">{{ _('Filter management') }}</a></li>
{%- if item_action == 'update' %}
<li><a href="{{ url_for('filters.show', item_id = item.id) }}">{{ gettext('Filter detail') }}</a></li>
<li><a href="{{ url_for('filters.show', item_id = item.id) }}">{{ _('Filter detail') }}</a></li>
{%- endif %}
<li class="active">{{ action_name }}</li>
</ol>
......@@ -21,7 +21,7 @@
{%- if item_action == 'createfor' %}
{{ macros_form.render_form_item_static(gettext('Group:'), parent.name) }}
{{ macros_form.render_form_item_static(_('Group:'), parent.name) }}
{%- elif item_action == 'create' or current_user.has_role('admin') %}
......@@ -29,7 +29,7 @@
{%- elif item_action == 'update' %}
{{ macros_form.render_form_item_static(gettext('Group:'), item.group.name) }}
{{ macros_form.render_form_item_static(_('Group:'), item.group.name) }}
{%- endif %}
{{ macros_form.render_form_item_default(form.name) }}
......@@ -68,10 +68,12 @@
{{ macros_form.render_form_errors(form.next.errors) }}
{{ form.next }}
{%- if 'csrf_token' in form %}
{{ macros_form.render_form_errors(form.csrf_token.errors) }}
{{ form.csrf_token }}
{%- endif %}
<div class="btn-toolbar" role="toolbar" aria-label="{{ gettext('Form submission buttons') }}">
<div class="btn-toolbar" role="toolbar" aria-label="{{ _('Form submission buttons') }}">
<div class="btn-group" role="group">
{{ form.cancel(class_='btn btn-default') }}
{{ form.preview(class_='btn btn-default') }}
......@@ -93,7 +95,7 @@
<div class="row">
<div class="col-md-6 col-md-offset-3">
<h4>{{ gettext('Filter structure preview:') }}</h4>
<h4>{{ _('Filter structure preview:') }}</h4>
</div> <!-- /.col-md-6 -->
</div> <!-- /.row -->
......
......@@ -6,25 +6,25 @@
<thead>
<tr>
<th>
{{ gettext('Group') }}
{{ _('Group') }}
</th>
<th>
{{ gettext('Name') }}
{{ _('Name') }}
</th>
<th>
{{ gettext('Type') }}
{{ _('Type') }}
</th>
<th>
{{ gettext('Description') }}
{{ _('Description') }}
</th>
<th>
{{ gettext('Hits') }}
{{ _('Hits') }}
</th>
<th>
{{ gettext('State') }}
{{ _('State') }}
</th>
<th data-toggle="tooltip" title="{{ gettext('Contextual item actions') }}">
{{ get_icon('actions') }} {{ gettext('Actions') }}
<th data-toggle="tooltip" title="{{ _('Contextual item actions') }}">
{{ get_icon('actions') }} {{ _('Actions') }}
</th>
</tr>
</thead>
......@@ -32,18 +32,18 @@
{%- for item in items %}
<tr>
<td>
<a data-toggle="tooltip" href="{{ url_for('groups.show', item_id = item.group.id ) }}" title="{{ gettext('View details of group &quot;%(item)s&quot;', item = item.group.name) }}">
<a data-toggle="tooltip" href="{{ url_for('groups.show', item_id = item.group.id ) }}" title="{{ _('View details of group &quot;%(item)s&quot;', item = item.group.name) }}">
{{ item.group.name }}
</a>
</td>
<td>
{{ item.name | default(gettext('<< unknown >>'), True) }}
{{ item.name | default(_('<< unknown >>'), True) }}
</td>
<td>
{{ item.type | default(gettext('<< unknown >>'), True) }}
{{ item.type | default(_('<< unknown >>'), True) }}
</td>
<td>
{{ item.description | default(gettext('<< unknown >>'), True) | truncate(50) }}
{{ item.description | default(_('<< unknown >>'), True) | truncate(50) }}
</td>
<td>
{{ item.hits }}
......
......@@ -15,7 +15,7 @@
</div>
<p>
<small>
<strong>{{ gettext('Filter created') }}:</strong> {{ babel_format_datetime(item.createtime) }} ({{ gettext('%(delta)s ago', delta = babel_format_timedelta(current_datetime_utc - item.createtime)) }})
<strong>{{ _('Filter created') }}:</strong> {{ babel_format_datetime(item.createtime) }} ({{ _('%(delta)s ago', delta = babel_format_timedelta(current_datetime_utc - item.createtime)) }})
</small>
</p>
<br>
......@@ -24,20 +24,20 @@
<ul class="nav nav-tabs" role="tablist">
<li role="presentation" class="active">
<a href="#tab-general" aria-controls="tab-general" role="tab" data-toggle="tab">
<strong>{{ get_icon('alert-info') }} {{ gettext('General information') }}</strong>
<strong>{{ get_icon('alert-info') }} {{ _('General information') }}</strong>
</a>
</li>
{%- if filter_preview %}
<li role="presentation">
<a href="#tab-tree" aria-controls="tab-tree" role="tab" data-toggle="tab">
<strong>{{ get_icon('structure') }} {{ gettext('Filter tree structure') }}</strong>
<strong>{{ get_icon('structure') }} {{ _('Filter tree structure') }}</strong>
</a>
</li>
{%- endif %}
{%- if can_access_endpoint('filters.update', item) %}
<li role="presentation">
<a href="#tab-changelog" aria-controls="tab-changelog" role="tab" data-toggle="tab">
<strong>{{ get_icon('module-changelogs') }} {{ gettext('Changelogs') }}</strong> <span class="badge">{{ item_changelog | length }}</span>
<strong>{{ get_icon('module-changelogs') }} {{ _('Changelogs') }}</strong> <span class="badge">{{ item_changelog | length }}</span>
</a>
</li>
{%- endif %}
......@@ -52,49 +52,49 @@
<tbody>
<tr>
<th>
{{ gettext('Group') }}:
{{ _('Group') }}:
</th>
<td>
<a data-toggle="tooltip" href="{{ url_for('groups.show', item_id = item.group.id ) }}" title="{{ gettext('View details of group &quot;%(item)s&quot;', item = item.group.name) }}">
<a data-toggle="tooltip" href="{{ url_for('groups.show', item_id = item.group.id ) }}" title="{{ _('View details of group &quot;%(item)s&quot;', item = item.group.name) }}">
{{ item.group.name }}
</a>
</td>
</tr>
<tr>
<th>
{{ gettext('Name') }}:
{{ _('Name') }}:
</th>
<td>
{{ item.name | default(gettext('<< unknown >>'), True) }}
{{ item.name | default(_('<< unknown >>'), True) }}
</td>
</tr>
<tr>
<th>
{{ gettext('Type') }}:
{{ _('Type') }}:
</th>
<td>
{{ item.type | default(gettext('<< unknown >>'), True) }}
{{ item.type | default(_('<< unknown >>'), True) }}
</td>
</tr>
<tr>
<th>
{{ gettext('Description') }}:
{{ _('Description') }}:
</th>
<td>
{{ item.description | default(gettext('<< unknown >>'), True) }}
{{ item.description | default(_('<< unknown >>'), True) }}
</td>
</tr>
<tr>
<th>
{{ gettext('Filter') }}:
{{ _('Filter') }}:
</th>
<td>
<code>{{ item.filter | default(gettext('<< unknown >>'), True) }}</code>
<code>{{ item.filter | default(_('<< unknown >>'), True) }}</code>
</td>
</tr>
<tr>
<th>
{{ gettext('Hits') }}:
{{ _('Hits') }}:
</th>
<td>
<span class="badge">{{ item.hits }}</span>
......@@ -102,7 +102,7 @@
</tr>
<tr>
<th>
{{ gettext('State') }}:
{{ _('State') }}:
</th>
<td>
{{ macros_site.render_label_item_state(item.enabled, True) }}
......@@ -133,12 +133,12 @@
{{ macros_page.render_changelog_records(item_changelog, context_action_menu_changelogs) }}
<p>
<small>
{{ gettext('Displaying only latest %(count)s changelogs', count = 100) }}
{{ _('Displaying only latest %(count)s changelogs', count = 100) }}
</small>
</p>
{%- else %}
{%- call macros_site.render_alert('info', False) %}
{{ gettext('This object does not have any changelog records at the moment.') }}
{{ _('This object does not have any changelog records at the moment.') }}
{%- endcall %}
{%- endif %}
</div>
......
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#-------------------------------------------------------------------------------
# This file is part of Mentat system (https://mentat.cesnet.cz/).
#
# Copyright (C) since 2011 CESNET, z.s.p.o (http://www.ces.net/)
# Use of this source is governed by the MIT license, see LICENSE file.
#-------------------------------------------------------------------------------
"""
Unit tests for :py:mod:`hawat.blueprints.filters`.
"""
import unittest
from mentat.datatype.sqldb import FilterModel
import vial.const
import vial.test
import vial.db
from hawat.test import BaseAppTestCase, ItemCreateTestCase
class FilterTestMixin:
"""
Mixin class for filter specific tests.
"""
@staticmethod
def _fname(gname):
return 'FLT_{}'.format(gname)
def filter_get(self, filter_name, with_app_context = False):
"""
Get given filter.
"""
if not with_app_context:
return vial.db.db_session().query(FilterModel).filter(FilterModel.name == filter_name).one_or_none()
with self.app.app_context():
return vial.db.db_session().query(FilterModel).filter(FilterModel.name == filter_name).one_or_none()
def filter_save(self, filter_object, with_app_context = False):
"""
Update given filter.
"""
if not with_app_context:
vial.db.db_session().add(filter_object)
vial.db.db_session().commit()
with self.app.app_context():
vial.db.db_session().add(filter_object)
vial.db.db_session().commit()
def filter_id(self, filter_type, with_app_context = False):
"""
Get ID of given filter.
"""
if not with_app_context:
fobj = self.filter_get(filter_type)
return fobj.id
with self.app.app_context():
fobj = self.filter_get(filter_type)
return fobj.id
class FiltersListTestCase(BaseAppTestCase):
"""Class for testing ``filters.list`` endpoint."""
def _attempt_fail(self):
self.assertGetURL(
'/filters/list',
403
)
def _attempt_succeed(self):
self.assertGetURL(
'/filters/list',
200,
[
b'View details of reporting filter &quot;FLT_DEMO_GROUP_A&quot;',
b'View details of reporting filter &quot;FLT_DEMO_GROUP_B&quot;',
]
)
@vial.test.do_as_user_decorator(vial.const.ROLE_USER)
def test_01_as_user(self):
"""Test access as user ``user``."""
self._attempt_fail()
@vial.test.do_as_user_decorator(vial.const.ROLE_DEVELOPER)
def test_02_as_developer(self):
"""Test access as user ``developer``."""
self._attempt_fail()
@vial.test.do_as_user_decorator(vial.const.ROLE_MAINTAINER)
def test_03_as_maintainer(self):
"""Test access as user ``maintainer``."""
self._attempt_succeed()
@vial.test.do_as_user_decorator(vial.const.ROLE_ADMIN)
def test_04_as_admin(self):
"""Test access as user ``admin``."""
self._attempt_succeed()
class FiltersShowTestCase(FilterTestMixin, BaseAppTestCase):
"""Base class for testing ``filters.show`` endpoint."""
def _attempt_fail(self, fname):
fid = self.filter_id(fname, True)
self.assertGetURL(
'/filters/{}/show'.format(fid),
403
)
def _attempt_succeed(self, fname):
fid = self.filter_id(fname, True)
self.assertGetURL(
'/filters/{}/show'.format(fid),
200,
[
'<h3>{}</h3>'.format(fname).encode('utf8'),
b'<strong>Filter created:</strong>'
]
)
@vial.test.do_as_user_decorator(vial.const.ROLE_USER)
def test_01_as_user(self):
"""
Test access as user 'user'.
Only power user is able to view all available filters.
"""
self._attempt_succeed(self._fname(vial.test.DEMO_GROUP_A))
self._attempt_fail(self._fname(vial.test.DEMO_GROUP_B))
@vial.test.do_as_user_decorator(vial.const.ROLE_DEVELOPER)
def test_02_as_developer(self):
"""
Test access as user 'developer'.
Only power user is able to view all available filters.
"""
self._attempt_succeed(self._fname(vial.test.DEMO_GROUP_A))
self._attempt_fail(self._fname(vial.test.DEMO_GROUP_B))
@vial.test.do_as_user_decorator(vial.const.ROLE_MAINTAINER)
def test_03_as_maintainer(self):
"""
Test access as user 'maintainer'.
Only power user is able to view all available filters.
"""
self._attempt_succeed(self._fname(vial.test.DEMO_GROUP_A))
self._attempt_succeed(self._fname(vial.test.DEMO_GROUP_B))
@vial.test.do_as_user_decorator(vial.const.ROLE_ADMIN)
def test_04_as_admin(self):
"""
Test access as user 'admin'.
Only power user is able to view all available filters.
"""
self._attempt_succeed(self._fname(vial.test.DEMO_GROUP_A))
self._attempt_succeed(self._fname(vial.test.DEMO_GROUP_B))
class FiltersCreateTestCase(FilterTestMixin, ItemCreateTestCase):
"""Class for testing ``filters.create`` endpoint."""
filter_data_fixture = [
('submit', 'Create'),
('name', 'TEST_FILTER'),
('type', 'basic'),
('group', 1),
('description', 'Test filter for unit testing purposes.'),
('filter', 'Category IN ["Recon.Scanning"] AND Target.IP4 IN ["191.168.1.1", "10.0.0.1"]'),
('enabled', True)
]
def _attempt_fail(self):
self.assertGetURL(
'/filters/create',
403
)
def _attempt_succeed(self):
self.assertCreate(
'/filters/create',
self.filter_data_fixture,
[
b'Reporting filter <strong>TEST_FILTER</strong> for group ',
b' was successfully created.'
]
)
@vial.test.do_as_user_decorator(vial.const.ROLE_USER)
def test_01_as_user(self):
"""Test access as user 'user'."""
self._attempt_fail()
@vial.test.do_as_user_decorator(vial.const.ROLE_DEVELOPER)
def test_02_as_developer(self):
"""Test access as user 'developer'."""
self._attempt_fail()
@vial.test.do_as_user_decorator(vial.const.ROLE_MAINTAINER)
def test_03_as_maintainer(self):
"""Test access as user 'maintainer'."""
self._attempt_succeed()
@vial.test.do_as_user_decorator(vial.const.ROLE_ADMIN)
def test_04_as_admin(self):
"""Test access as user 'admin'."""
self._attempt_succeed()
class FiltersCreateForTestCase(FilterTestMixin, ItemCreateTestCase):
"""Class for testing ``filters.createfor`` endpoint."""
filter_data_fixture = [
('submit', 'Create'),
('name', 'TEST_FILTER'),
('type', 'basic'),
('description', 'Test filter for unit testing purposes.'),
('filter', 'Category IN ["Recon.Scanning"] AND Target.IP4 IN ["191.168.1.1", "10.0.0.1"]'),
('enabled', True)
]
def _attempt_fail(self, gname):
gid = self.group_id(gname, True)
self.assertGetURL(
'/filters/createfor/{}'.format(gid),
403
)
def _attempt_succeed(self, gname):
gid = self.group_id(gname, True)
self.assertCreate(
'/filters/createfor/{}'.format(gid),
self.filter_data_fixture,
[
b'Reporting filter <strong>TEST_FILTER</strong> for group ',
b' was successfully created.'
]
)
@vial.test.do_as_user_decorator(vial.const.ROLE_USER)
def test_01_as_user(self):
"""Test access as user 'user'."""
self._attempt_fail(vial.test.DEMO_GROUP_A)
self._attempt_fail(vial.test.DEMO_GROUP_B)
@vial.test.do_as_user_decorator(vial.const.ROLE_DEVELOPER)
def test_02_as_developer(self):
"""Test access as user 'developer'."""
self._attempt_succeed(vial.test.DEMO_GROUP_A)
self._attempt_fail(vial.test.DEMO_GROUP_B)
@vial.test.do_as_user_decorator(vial.const.ROLE_MAINTAINER)
def test_03_as_maintainer(self):
"""Test access as user 'maintainer'."""
self._attempt_succeed(vial.test.DEMO_GROUP_A)
self._attempt_succeed(vial.test.DEMO_GROUP_B)
@vial.test.do_as_user_decorator(vial.const.ROLE_ADMIN)
def test_04_as_admin(self):
"""Test access as user 'admin'."""
self._attempt_succeed(vial.test.DEMO_GROUP_A)
self._attempt_succeed(vial.test.DEMO_GROUP_B)
class FiltersUpdateTestCase(FilterTestMixin, BaseAppTestCase):
"""Class for testing ``filters.update`` endpoint."""
def _attempt_fail(self, fname):
fid = self.filter_id(fname, True)
self.assertGetURL(
'/filters/{}/update'.format(fid),
403
)
def _attempt_succeed(self, fname):
fid = self.filter_id(fname, True)
self.assertGetURL(
'/filters/{}/update'.format(fid),
200,
[
b'Update reporting filter details'
]
)
@vial.test.do_as_user_decorator(vial.const.ROLE_USER)
def test_01_as_user(self):
"""Test access as user 'user'."""
self._attempt_fail(self._fname(vial.test.DEMO_GROUP_A))
self._attempt_fail(self._fname(vial.test.DEMO_GROUP_B))
@vial.test.do_as_user_decorator(vial.const.ROLE_DEVELOPER)
def test_04_as_developer(self):
"""Test access as user 'developer'."""
self._attempt_succeed(self._fname(vial.test.DEMO_GROUP_A))
self._attempt_fail(self._fname(vial.test.DEMO_GROUP_B))
@vial.test.do_as_user_decorator(vial.const.ROLE_MAINTAINER)
def test_05_as_maintainer(self):
"""Test access as user 'maintainer'."""
self._attempt_succeed(self._fname(vial.test.DEMO_GROUP_A))
self._attempt_succeed(self._fname(vial.test.DEMO_GROUP_B))
@vial.test.do_as_user_decorator(vial.const.ROLE_ADMIN)
def test_06_as_admin(self):
"""Test access as user 'admin'."""
self._attempt_succeed(self._fname(vial.test.DEMO_GROUP_A))
self._attempt_succeed(self._fname(vial.test.DEMO_GROUP_B))
class FiltersEnableDisableTestCase(FilterTestMixin, BaseAppTestCase):
"""Class for testing ``filters.enable`` and ``filters.disable`` endpoint."""
def _attempt_fail(self, fname):
fid = self.filter_id(fname, True)
self.assertGetURL(
'/filters/{}/disable'.format(fid),
403
)
self.assertGetURL(
'/filters/{}/enable'.format(fid),
403
)
def _attempt_succeed(self, fname):
fid = self.filter_id(fname, True)
self.assertGetURL(
'/filters/{}/disable'.format(fid),
200,
[
b'Are you really sure you want to disable following item:'
]
)
self.assertPostURL(
'/filters/{}/disable'.format(fid),
{
'submit': 'Confirm'
},
200,
[
b'was successfully disabled.'
]
)
self.assertGetURL(
'/filters/{}/enable'.format(fid),
200,
[
b'Are you really sure you want to enable following item:'
]
)
self.assertPostURL(
'/filters/{}/enable'.format(fid),
{
'submit': 'Confirm'
},
200,
[
b'was successfully enabled.'
]
)
@vial.test.do_as_user_decorator(vial.const.ROLE_USER)
def test_01_as_user(self):
"""Test access as user 'user'."""
self._attempt_fail(self._fname(vial.test.DEMO_GROUP_A))
self._attempt_fail(self._fname(vial.test.DEMO_GROUP_B))
@vial.test.do_as_user_decorator(vial.const.ROLE_DEVELOPER)
def test_02_as_developer(self):
"""Test access as user 'developer'."""
self._attempt_succeed(self._fname(vial.test.DEMO_GROUP_A))
self._attempt_fail(self._fname(vial.test.DEMO_GROUP_B))
@vial.test.do_as_user_decorator(vial.const.ROLE_MAINTAINER)
def test_03_as_maintainer(self):
"""Test access as user 'maintainer'."""
self._attempt_succeed(self._fname(vial.test.DEMO_GROUP_A))
self._attempt_succeed(self._fname(vial.test.DEMO_GROUP_B))
@vial.test.do_as_user_decorator(vial.const.ROLE_ADMIN)
def test_04_as_admin(self):
"""Test access as user 'admin'."""
self._attempt_succeed(self._fname(vial.test.DEMO_GROUP_A))
self._attempt_succeed(self._fname(vial.test.DEMO_GROUP_B))
class FiltersDeleteTestCase(FilterTestMixin, BaseAppTestCase):
"""Class for testing ``filters.delete`` endpoint."""
def _attempt_fail(self, fname):
fid = self.filter_id(fname, True)
self.assertGetURL(
'/filters/{}/delete'.format(fid),
403
)
def _attempt_succeed(self, fname):
fid = self.filter_id(fname, True)
self.assertGetURL(
'/filters/{}/delete'.format(fid),
200,
[
b'Are you really sure you want to permanently remove following item:'
]
)
self.assertPostURL(
'/filters/{}/delete'.format(fid),
{
'submit': 'Confirm'
},
200,
[
b'was successfully and permanently deleted.'
]
)
@vial.test.do_as_user_decorator(vial.const.ROLE_USER)
def test_01_as_user(self):
"""Test access as user 'user'."""
self._attempt_fail(self._fname(vial.test.DEMO_GROUP_A))
self._attempt_fail(self._fname(vial.test.DEMO_GROUP_B))
@vial.test.do_as_user_decorator(vial.const.ROLE_DEVELOPER)
def test_02_as_developer(self):
"""Test access as user 'developer'."""
self._attempt_succeed(self._fname(vial.test.DEMO_GROUP_A))
self._attempt_fail(self._fname(vial.test.DEMO_GROUP_B))
@vial.test.do_as_user_decorator(vial.const.ROLE_MAINTAINER)
def test_03_as_maintainer(self):
"""Test access as user 'maintainer'."""
self._attempt_succeed(self._fname(vial.test.DEMO_GROUP_A))
self._attempt_succeed(self._fname(vial.test.DEMO_GROUP_B))
@vial.test.do_as_user_decorator(vial.const.ROLE_ADMIN)
def test_04_as_admin(self):
"""Test access as user 'admin'."""
self._attempt_succeed(self._fname(vial.test.DEMO_GROUP_A))
self._attempt_succeed(self._fname(vial.test.DEMO_GROUP_B))
#-------------------------------------------------------------------------------
if __name__ == "__main__":
unittest.main()
......@@ -35,72 +35,76 @@ import mentat.services.eventstorage
_DB = None
def _get_cached_values(cache_file):
cache_dir = flask.current_app.config.get(
hawat.const.CFGKEY_MENTAT_CACHE_DIR
)
return pyzenkit.jsonconf.json_load(
os.path.join(
cache_dir,
cache_file
)
)
def _get_values(cache_file, column):
try:
return _get_cached_values(cache_file)
except FileNotFoundError:
return db_get().distinct_values(column)
def get_event_source_types():
"""
Return list of all available event source types.
"""
cache_dir = flask.current_app.config.get(hawat.const.CFGKEY_MENTAT_CACHE_DIR)
return pyzenkit.jsonconf.json_load(os.path.join(cache_dir, 'itemset-stat-sourcetypes.json'))
return _get_values('itemset-stat-sourcetypes.json', 'source_type')
def get_event_target_types():
"""
Return list of all available event target types.
"""
cache_dir = flask.current_app.config.get(hawat.const.CFGKEY_MENTAT_CACHE_DIR)
return pyzenkit.jsonconf.json_load(os.path.join(cache_dir, 'itemset-stat-targettypes.json'))
return _get_values('itemset-stat-targettypes.json', 'target_type')
def get_event_detector_types():
"""
Return list of all available event detector types.
"""
cache_dir = flask.current_app.config.get(hawat.const.CFGKEY_MENTAT_CACHE_DIR)
return pyzenkit.jsonconf.json_load(os.path.join(cache_dir, 'itemset-stat-detectortypes.json'))
return _get_values('itemset-stat-detectortypes.json', 'node_type')
def get_event_detectors():
"""
Return list of all available event detectors.
"""
cache_dir = flask.current_app.config.get(hawat.const.CFGKEY_MENTAT_CACHE_DIR)
return pyzenkit.jsonconf.json_load(os.path.join(cache_dir, 'itemset-stat-detectors.json'))
return _get_values('itemset-stat-detectors.json', 'node_name')
def get_event_categories():
"""
Return list of all available event categories.
"""
cache_dir = flask.current_app.config.get(hawat.const.CFGKEY_MENTAT_CACHE_DIR)
return pyzenkit.jsonconf.json_load(os.path.join(cache_dir, 'itemset-stat-categories.json'))
return _get_values('itemset-stat-categories.json', 'category')
def get_event_severities():
"""
Return list of all available event severities.
"""
cache_dir = flask.current_app.config.get(hawat.const.CFGKEY_MENTAT_CACHE_DIR)
return pyzenkit.jsonconf.json_load(os.path.join(cache_dir, 'itemset-stat-severities.json'))
return _get_values('itemset-stat-severities.json', 'cesnet_eventseverity')
def get_event_classes():
"""
Return list of all available event classes.
"""
cache_dir = flask.current_app.config.get(hawat.const.CFGKEY_MENTAT_CACHE_DIR)
return pyzenkit.jsonconf.json_load(os.path.join(cache_dir, 'itemset-stat-classes.json'))
return _get_values('itemset-stat-classes.json', 'cesnet_eventclass')
def get_event_protocols():
"""
Return list of all available event protocols.
"""
cache_dir = flask.current_app.config.get(hawat.const.CFGKEY_MENTAT_CACHE_DIR)
return pyzenkit.jsonconf.json_load(os.path.join(cache_dir, 'itemset-stat-protocols.json'))
return _get_values('itemset-stat-protocols.json', 'protocol')
def get_event_inspection_errs():
"""
Return list of all available event inspection errors.
"""
cache_dir = flask.current_app.config.get(hawat.const.CFGKEY_MENTAT_CACHE_DIR)
return pyzenkit.jsonconf.json_load(os.path.join(cache_dir, 'itemset-stat-inspectionerrors.json'))
return _get_values('itemset-stat-inspectionerrors.json', 'cesnet_inspectionerrors')
def db_settings(app):
"""
......
......@@ -16,7 +16,7 @@ import vial.const
import vial.db
from vial.test import _config_testapp_vial, VialTestCase, ItemCreateVialTestCase, RegistrationVialTestCase
from mentat.datatype.sqldb import SettingsReportingModel
from mentat.datatype.sqldb import SettingsReportingModel, FilterModel
from mentat.const import CKEY_CORE_DATABASE, CKEY_CORE_DATABASE_EVENTSTORAGE
......@@ -57,6 +57,14 @@ class BaseAppTestCase(VialTestCase):
if isinstance(fixture, self.app.get_model(vial.const.MODEL_GROUP)):
SettingsReportingModel(group = fixture)
vial.db.db_session().add(fixture)
FilterModel(
group = fixture,
name = 'FLT_{}'.format(fixture.name),
type = 'basic',
filter = 'Source.IP4 == 127.0.0.1',
enabled = True,
description = 'Filter for {}'.format(fixture.name)
)
vial.db.db_session().commit()
class ItemCreateTestCase(ItemCreateVialTestCase, BaseAppTestCase):
......
......@@ -76,13 +76,16 @@ Custom config file options
Example configuration::
"itemsets": [
["itemset-stat-categories", "category"],
["itemset-stat-sourcetypes", "source_type"],
["itemset-stat-targettypes", "target_type"],
["itemset-stat-detectors", "node_name"],
["itemset-stat-detectortypes", "node_type"],
["itemset-stat-protocols", "protocol"],
["itemset-stat-groups", "cesnet_resolvedabuses"]
["itemset-stat-categories", "category"],
["itemset-stat-sourcetypes", "source_type"],
["itemset-stat-targettypes", "target_type"],
["itemset-stat-detectors", "node_name"],
["itemset-stat-detectortypes", "node_type"],
["itemset-stat-protocols", "protocol"],
["itemset-stat-groups", "cesnet_resolvedabuses"],
["itemset-stat-classes", "cesnet_eventclass"],
["itemset-stat-severities", "cesnet_eventseverity"],
["itemset-stat-inspectionerrors", "cesnet_inspectionerrors"]
],
*Type:* ``list of list of strings``, *default:* ``[]``
......
......@@ -428,8 +428,8 @@ class ItemCreateVialTestCase(VialTestCase):
"""
maxDiff = None
def assertCreate(self, url, data, content_checks = None): # pylint: disable=locally-disabled,invalid-name
response = response = self.client.get(
def assertCreate(self, url, data, content_checks = None, print_response = False): # pylint: disable=locally-disabled,invalid-name
response = self.client.get(
url,
follow_redirects = True
)
......@@ -439,7 +439,7 @@ class ItemCreateVialTestCase(VialTestCase):
for idx, param in enumerate(data):
if idx == len(data) - 1:
break
response = response = self.client.post(
response = self.client.post(
url,
follow_redirects = True,
data = {
......@@ -457,6 +457,12 @@ class ItemCreateVialTestCase(VialTestCase):
i[0]: i[1] for i in data
}
)
if print_response:
print("--------------------------------------------------------------------------------")
print("Response for {}, {}: {} ({})".format(url, pprint.pformat(data), response.status_code, response.status))
pprint.pprint(response.headers)
pprint.pprint(response.data)
print("--------------------------------------------------------------------------------")
self.assertEqual(response.status_code, 200)
self.assertTrue(b'<div class="alert alert-success alert-dismissible">' in response.data)
if content_checks:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment