Compare revisions: Pavel.Valach/warden
Commits on Source (18)
@@ -3,6 +3,5 @@
     "certfile": "cert.pem",
     "keyfile": "key.pem",
     "filelog": {"level": "debug"},
-    "name": "org.example.warden_client",
-    "secret": "ToP_SeCrEt"
+    "name": "org.example.warden_client"
 }
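The updated sample client configuration is plain JSON; a minimal loading sketch (illustrative only, not warden_client's actual loader) shows that the `secret` key is no longer part of the sample:

```python
import json

# Illustrative only: warden_client's real config handling may differ.
sample = """
{
    "certfile": "cert.pem",
    "keyfile": "key.pem",
    "filelog": {"level": "debug"},
    "name": "org.example.warden_client"
}
"""
config = json.loads(sample)
print(config["name"])        # org.example.warden_client
print("secret" in config)    # False: the secret was dropped from the sample
```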
@@ -42,13 +42,13 @@ B. Dependencies
 2. Python modules
-    python-mysqldb 5.3.3+
+    python-mysqldb 5.3.3+ | python-psycopg2 2.8.6+
     python-m2crypto 0.20+
     jsonschema 2.4+
 3. Database
-    MySQL | MariaDB >= 5.5
+    MySQL | MariaDB >= 5.5 | PostgreSQL >= 13
------------------------------------------------------------------------------
C. Installation
@@ -70,14 +70,28 @@ C. Installation
 # mysql
 > GRANT ALL ON warden3.* TO `warden`@`localhost`;
 > FLUSH PRIVILEGES;
+# psql
+> CREATE DATABASE warden3 ENCODING 'UTF-8';
+> CREATE ROLE "warden" LOGIN PASSWORD 'example';
+> GRANT ALL ON DATABASE "warden3" TO "warden";
 * Create necessary table structure
-    mysql -p -u warden warden3 < warden_3.0.sql
+    mysql -p -u warden warden3 < warden_3.0_mysql.sql
+  or
+    psql -U warden -h localhost warden3 < warden_3.0_postgres.sql
 * Get up to date Idea schema
     wget -O warden_server/idea.schema https://idea.cesnet.cz/_media/en/idea0.schema
+* Load category and tag maps into database (this step is optional for MySQL)
+    ./warden_server.py loadmaps
 * Enable mod_wsgi, mod_ssl, include Warden configuration
     This depends heavily on your distribution and Apache configuration.
@@ -129,7 +143,7 @@ particular implementation object of the aspect, for example type of logger
 Sections and their "type" objects can be:
  Log: FileLogger, SysLogger
- DB: MySQL
+ DB: MySQL, PostgreSQL
  Auth: X509Authenticator, X509NameAuthenticator,
        X509MixMatchAuthenticator, PlainAuthenticator
  Validator: JSONSchemaValidator, NoValidator
@@ -186,22 +200,36 @@ object from particular section list is used ("FileLogger" for example).
     retry_count: number of retries, defaults to 3
     event_size_limit: max size of serialized event, defaults to 5 MB
     catmap_filename: IDEA category mapping to database ids, defaults to
-        "catmap_mysql.json" at installation directory
+        "catmap_db.json" at installation directory
     tagmap_filename: IDEA node type mapping to database ids, defaults to
-        "tagmap_mysql.json" at installation directory
+        "tagmap_db.json" at installation directory
+ PostgreSQL: database storage backend
+    host: database server host, default "localhost"
+    user: database user, default "warden"
+    password: database password
+    dbname: database name, default "warden3"
+    port: database server port, default 5432
+    retry_pause: retry in case of database errors, in seconds, defaults to 5
+    retry_count: number of retries, defaults to 3
+    event_size_limit: max size of serialized event, defaults to 5 MB
+    catmap_filename: IDEA category mapping to database ids, defaults to
+        "catmap_db.json" at installation directory
+    tagmap_filename: IDEA node type mapping to database ids, defaults to
+        "tagmap_db.json" at installation directory
 WardenHandler: Main Warden RPC worker
     send_events_limit: max events sent in one bunch, defaults to 10000
     get_events_limit: max events received in one bunch, defaults to 10000
     description: human readable description, sent in server info
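Putting the documented options together, a complete server configuration for the PostgreSQL backend might look like the sketch below. Section and option names follow this document; the concrete values are illustrative, not a recommendation:

```python
import json

# Illustrative config assembled from the defaults documented above.
config = json.loads("""
{
    "log": {"type": "FileLogger", "level": "debug"},
    "db": {
        "type": "PostgreSQL",
        "host": "localhost",
        "user": "warden",
        "password": "example",
        "dbname": "warden3",
        "port": 5432,
        "retry_pause": 5,
        "retry_count": 3
    },
    "auth": {"type": "PlainAuthenticator"},
    "validator": {"type": "JSONSchemaValidator"},
    "handler": {"description": "Example Warden server"}
}
""")
print(config["db"]["type"], config["db"]["port"])  # PostgreSQL 5432
```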
------------------------------------------------------------------------------
E. Command line

When run from the command line, the server offers a set of commands and options
for runtime and database management. You can also use the --help option for
each command and for the server itself.

    warden_server.py [--help] [-c CONFIG] <command>

optional arguments:
@@ -285,11 +313,9 @@ warden_server.py purge [--help] [-l] [-e] [-d DAYS]
 warden_server.py loadmaps [--help]
-    Load 'categories' and 'tags' table from 'catmap_mysql.json' and
-    'tagmap_mysql.json'. Note that this is NOT needed for server at all, load
-    them into db at will, should you need to run your own specific SQL queries
-    on data directly. Note also that previous content of both tables will be
-    lost.
+    Load 'categories' and 'tags' table from 'catmap_db.json' and
+    'tagmap_db.json'. Note also that previous content of both tables
+    will be lost.
------------------------------------------------------------------------------
Copyright (C) 2011-2015 Cesnet z.s.p.o
@@ -21,7 +21,8 @@ B. Compatibility
 * The test suite, just like the Warden Server, is compatible with both Python2
   (tested on 2.7) and Python3 (tested on 3.6).
-* Just like Warden Server, the test suite requires a local MySQL installation.
+* Just like Warden Server, the test suite requires a local MySQL or PostgreSQL
+  installation.
 * It is safe to run the test suite on a production system. For testing,
   a database distinct from the default production one is used. Also, the user
   account used for accessing the testing database is set for local login only.
@@ -56,7 +57,11 @@ D. Usage
 Before running the tests (for the first time), a DB user with required rights
 must be created. An easy way to do it is:
     ./test_warden_server.py --init
-This will prompt for MySQL root password.
+This will prompt for the database administrator account ('root' for MySQL and
+'postgres' for PostgreSQL) password. Please note that by default, the user
+'postgres' can only be authenticated using the peer authentication method,
+which requires that the script is run by the operating system user 'postgres'.
+When this is the case, the password is not required.

 Standard usage for testing:
     ./test_warden_server.py
@@ -64,16 +69,22 @@ Standard usage for testing:
 Advanced usage:
     ./test_warden_server.py --help
-usage: test_warden_server.py [-h] [-i] [-n]
+usage: test_warden_server.py [-h] [-i] [-n] [-d {MySQL,PostgreSQL}]

 Warden3 Server Test Suite

 optional arguments:
   -h, --help     show this help message and exit
+  -d {MySQL,PostgreSQL}, --dbms {MySQL,PostgreSQL}
+                 Database management system to use for testing
   -i, --init     Set up an user with rights to CREATE/DROP the test database
   -n, --nopurge  Skip the database purge after running the tests

+Option -d (--dbms) sets the database management system to use for testing.
+If this option is not provided, MySQL is used as default.
+
 Option -n (--nopurge) is meant for debugging purposes and test development, it
 keeps the test database around for inspection after running the tests.
...
@@ -6,9 +6,8 @@ import argparse
 import getpass
 import sys
 import warnings
+import json
 from os import path
-from copy import deepcopy
-import MySQLdb as my

 from warden_server import build_server
 import warden_server
@@ -26,48 +25,19 @@ USER = 'warden3test'
 PASSWORD = 'h7w*D>4B)3omcvLM$oJp'
 DB = 'w3test'

 def setUpModule():  # pylint: disable = locally-disabled, invalid-name
     """Initialize the test database"""
     print(__doc__)
-    conn = None
-    try:
-        conn = my.connect(user=USER, passwd=PASSWORD)
-        cur = conn.cursor()
-        with warnings.catch_warnings():  # The database is not supposed to exist
-            warnings.simplefilter("ignore")
-            cur.execute("DROP DATABASE IF EXISTS %s" % (DB,))  # NOT SECURE
-        cur.execute("CREATE DATABASE %s" % (DB,))  # NOT SECURE
-        cur.execute("USE %s" % (DB,))  # NOT SECURE
-        with open(path.join(path.dirname(__file__), 'warden_3.0.sql')) as script:
-            statements = ''.join([line.replace('\n', '') for line in script if line[0:2] != '--']).split(';')[:-1]
-            for statement in statements:
-                cur.execute(statement)
-        cur.execute("INSERT INTO clients VALUES(NULL, NOW(), 'warden-info@cesnet.cz', 'test.server.warden.cesnet.cz', NULL, 1, 'cz.cesnet.warden3test', 'abc', 1, 1, 1, 0)")
-        conn.commit()
-    except my.OperationalError as ex:
-        if conn:
-            conn.rollback()
-            conn.close()
-            conn = None
-        print('Setup failed, have you tried --init ? Original exception: %s' % (str(ex),))
-        exit()
-    finally:
-        if conn:
-            conn.close()
+    DBMS.set_up()

 NO_PURGE = False
+DBMS = None

 def tearDownModule():  # pylint: disable = locally-disabled, invalid-name
     """Clean up by purging the test database"""
     if not NO_PURGE:
-        conn = my.connect(user=USER, passwd=PASSWORD)
-        cur = conn.cursor()
-        cur.execute("DROP DATABASE IF EXISTS %s" % (DB,))  # NOT SECURE
-        conn.commit()
-        conn.close()
+        DBMS.tear_down()
 class ReadableSTR(str):
@@ -123,8 +93,6 @@ class Request(object):
 class Warden3ServerTest(unittest.TestCase):
     """High level Warden3 Server tests"""
-    config = {'log': {'level': 'debug'}, 'validator': {'type': 'NoValidator'}, 'auth': {'type': 'PlainAuthenticator'},
-              'db': {'user': USER, 'password': PASSWORD, 'dbname': DB}, 'handler': {'description': 'Warden Test Server'}}

     getInfo_interface_tests_specific = [
         ("/getInfo", "403 I'm watching. Authenticate."),
@@ -138,22 +106,26 @@ class Warden3ServerTest(unittest.TestCase):
         ("/getEvents?secret=123", "403 I'm watching. Authenticate.", None),
     ]
+    @staticmethod
+    def get_config():
+        return {
+            'log': {'level': 'debug'},
+            'validator': {'type': 'NoValidator'},
+            'auth': {'type': 'PlainAuthenticator'},
+            'db': {'type': DBMS.name, 'user': USER, 'password': PASSWORD, 'dbname': DB},
+            'handler': {'description': 'Warden Test Server'}
+        }

     @classmethod
     def setUpClass(cls):
         """Pre-test cleanup"""
         cls.clean_lastid()
-        cls.app = build_server(cls.config)
+        cls.app = build_server(cls.get_config())

     @classmethod
     def clean_lastid(cls):
         """Cleans the lastid information for all clients"""
-        conn = my.connect(user=USER, passwd=PASSWORD, db=DB)
-        cur = conn.cursor()
-        cur.execute("DELETE FROM events")
-        cur.execute("DELETE FROM last_events")
-        cur.close()
-        conn.commit()
-        conn.close()
+        DBMS.clean_lastid()

     def test_getInfo_interface(self):  # pylint: disable = locally-disabled, invalid-name
         """Tests the getInfo method invocation"""
@@ -245,14 +217,22 @@ class Warden3ServerTest(unittest.TestCase):
 class X509AuthenticatorTest(Warden3ServerTest):
     """Performs the basic test suite using the X509Authenticator"""
-    config = deepcopy(Warden3ServerTest.config)
-    config['auth']['type'] = 'X509Authenticator'
+    @staticmethod
+    def get_config():
+        config = Warden3ServerTest.get_config()
+        config['auth']['type'] = 'X509Authenticator'
+        return config

 class X509NameAuthenticatorTest(Warden3ServerTest):
     """Performs the basic test suite using the X509NameAuthenticator"""
-    config = deepcopy(Warden3ServerTest.config)
-    config['auth']['type'] = 'X509NameAuthenticator'
+    @staticmethod
+    def get_config():
+        config = Warden3ServerTest.get_config()
+        config['auth']['type'] = 'X509NameAuthenticator'
+        return config

     getInfo_interface_tests_specific = [
         ("/getInfo", "200 OK"),
@@ -271,8 +251,13 @@ class WScliTest(unittest.TestCase):
     """Tester of the Warden Server command line interface"""
     @classmethod
     def setUpClass(cls):
-        cls.config = {'log': {'level': 'debug'}, 'validator': {'type': 'NoValidator'}, 'auth': {'type': 'PlainAuthenticator'},
-                      'db': {'user': USER, 'password': PASSWORD, 'dbname': DB}, 'handler': {'description': 'Warden Test Server'}}
+        cls.config = {
+            'log': {'level': 'debug'},
+            'validator': {'type': 'NoValidator'},
+            'auth': {'type': 'PlainAuthenticator'},
+            'db': {'type': DBMS.name, 'user': USER, 'password': PASSWORD, 'dbname': DB},
+            'handler': {'description': 'Warden Test Server'}
+        }
         warden_server.server = build_server(cls.config)

     @staticmethod
@@ -298,17 +283,6 @@ class WScliTest(unittest.TestCase):
         sys.argv = argv_backup
         return ret, out.getvalue(), err.getvalue()
-    @staticmethod
-    def do_sql_select(query, params):
-        """Reads data from database"""
-        conn = my.connect(user=USER, passwd=PASSWORD, db=DB)
-        cur = conn.cursor()
-        cur.execute(query, params)
-        result = cur.fetchall()
-        cur.close()
-        conn.close()
-        return result

     def test_list(self):
         """Tests the list command line option"""
         tests = [
@@ -392,7 +366,6 @@ class WScliTest(unittest.TestCase):
             (['modify', '-i', 'CLIENT_ID', '--note', 'Valid until:', '20.1.2038'], 2,
              (('a@b', 'test3.warden.cesnet.cz', 'cz.cesnet.warden.test3', 'top_secret', 1, 1, 0, 1, 0, 'Valid until: 18.01.2038'),)),
         ]
-        test_sql = "SELECT requestor, hostname, name, secret, valid, clients.read, debug, clients.write, test, note FROM clients WHERE id = %s"
         client_id = None
         for supplied_arguments, expected_return, expected_sql_result in tests:
             with self.subTest(supplied_arguments=supplied_arguments, expected_return=expected_return, expected_sql_result=expected_sql_result):
@@ -403,44 +376,303 @@ class WScliTest(unittest.TestCase):
                 client_id = int(out.split('\n')[-2].split(' ')[0])
             except IndexError:  # No modification was performed, keep the previous client_id
                 pass
-            result = self.do_sql_select(test_sql, (client_id,))
+            result = DBMS.do_sql_select(DBMS.reg_mod_test_query, (client_id,))
             self.assertEqual(result, expected_sql_result)
-def init_user():
-    """DB user rights setup"""
-    conn = None
-    try:
-        conn = my.connect(user='root', passwd=getpass.getpass('Enter MySQL Root password:'))
-        with conn.cursor() as cur:
-            cur.execute("CREATE USER IF NOT EXISTS %s@'localhost' IDENTIFIED BY %s", (USER, PASSWORD))
-            cur.execute("GRANT SELECT, INSERT, UPDATE, CREATE, DELETE, DROP ON *.* TO %s@'localhost'", (USER,))
-        conn.commit()
-        print("DB User set up successfuly")
-    except my.OperationalError as ex:
-        if conn:
-            conn.rollback()
-            conn.close()
-            conn = None
-        print('Connection unsuccessful, bad password? Original exception: %s' % (str(ex)))
-        exit()
-    except KeyboardInterrupt:
-        print("\nCancelled!")
-        exit()
-    finally:
-        if conn:
-            conn.close()

+class MySQL:
+    name = "MySQL"
+    reg_mod_test_query = "SELECT requestor, hostname, name, secret, valid, clients.read, " \
+        "debug, clients.write, test, note FROM clients WHERE id = %s"
+
+    def __init__(self, user=USER, password=PASSWORD, dbname=DB):
+        import MySQLdb as my
+        self.my = my
+        self.user = user
+        self.password = password
+        self.dbname = dbname
+
+    def init_user(self):
+        """DB user rights setup"""
+        conn = None
+        try:
+            conn = self.my.connect(user='root', passwd=getpass.getpass(
+                'Enter MySQL Root password:'))
+            with conn.cursor() as cur:
+                cur.execute(
+                    "CREATE USER IF NOT EXISTS %s@'localhost' IDENTIFIED BY %s",
+                    (self.user, self.password)
+                )
+                cur.execute(
+                    "GRANT SELECT, INSERT, UPDATE, CREATE, DELETE, DROP ON *.* TO %s@'localhost'",
+                    (self.user,)
+                )
+            conn.commit()
+            print("DB User set up successfully")
+        except self.my.OperationalError as ex:
+            if conn:
+                conn.rollback()
+                conn.close()
+                conn = None
+            print('Connection unsuccessful, bad password? Original exception: %s' % (str(ex)))
+            exit()
+        except KeyboardInterrupt:
+            print("\nCancelled!")
+            exit()
+        finally:
+            if conn:
+                conn.close()
+
+    def set_up(self):
+        conn = None
+        try:
+            conn = self.my.connect(user=self.user, passwd=self.password)
+            cur = conn.cursor()
+            with warnings.catch_warnings():  # The database is not supposed to exist
+                warnings.simplefilter("ignore")
+                cur.execute("DROP DATABASE IF EXISTS %s" % (self.dbname,))  # NOT SECURE
+            cur.execute("CREATE DATABASE %s" % (self.dbname,))  # NOT SECURE
+            cur.execute("USE %s" % (self.dbname,))  # NOT SECURE
+            with open(path.join(path.dirname(__file__), 'warden_3.0_mysql.sql')) as script:
+                statements = ''.join(
+                    [line.replace('\n', '') for line in script if line[0:2] != '--']
+                ).split(';')[:-1]
+                for statement in statements:
+                    cur.execute(statement)
+            cur.execute(
+                "INSERT INTO clients VALUES("
+                "NULL, NOW(), 'warden-info@cesnet.cz', 'test.server.warden.cesnet.cz',"
+                "NULL, 1, 'cz.cesnet.warden3test', 'abc', 1, 1, 1, 0"
+                ")"
+            )
+            conn.commit()
+        except self.my.OperationalError as ex:
+            if conn:
+                conn.rollback()
+                conn.close()
+                conn = None
+            print('Setup failed, have you tried --init ? Original exception: %s' % (str(ex),))
+            exit()
+        finally:
+            if conn:
+                conn.close()
+
+    def do_sql_select(self, query, params):
+        """Reads data from database"""
+        conn = self.my.connect(user=self.user, passwd=self.password, db=self.dbname)
+        cur = conn.cursor()
+        cur.execute(query, params)
+        result = cur.fetchall()
+        cur.close()
+        conn.close()
+        return result
+
+    def tear_down(self):
+        """Clean up by purging the test database"""
+        conn = self.my.connect(user=self.user, passwd=self.password)
+        cur = conn.cursor()
+        cur.execute("DROP DATABASE IF EXISTS %s" % (self.dbname,))  # NOT SECURE
+        conn.commit()
+        conn.close()
+
+    def clean_lastid(self):
+        """Cleans the lastid information for all clients"""
+        conn = self.my.connect(user=self.user, passwd=self.password, db=self.dbname)
+        cur = conn.cursor()
+        cur.execute("DELETE FROM last_events")
+        cur.execute("DELETE FROM events")
+        cur.close()
+        conn.commit()
+        conn.close()
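The MySQL `set_up()` above executes the bundled SQL script one statement at a time by stripping `--` comment lines and splitting on semicolons. The same parsing step, isolated into a sketch with made-up sample input:

```python
def split_sql_script(text):
    # Drop '--' comment lines, join the rest, and split on ';'.
    # The element after the final ';' is empty, hence the [:-1].
    lines = [line.replace('\n', '') for line in text.splitlines(True) if line[0:2] != '--']
    return ''.join(lines).split(';')[:-1]

script = (
    "-- Table structure for table clients\n"
    "CREATE TABLE clients (id INT);\n"
    "CREATE INDEX idx ON clients (id);\n"
)
print(split_sql_script(script))
# ['CREATE TABLE clients (id INT)', 'CREATE INDEX idx ON clients (id)']
```

Note this simple splitter would break on semicolons inside string literals, which the bundled schema files avoid.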
name = "PostgreSQL"
reg_mod_test_query = "SELECT requestor, hostname, name, secret, valid, clients.read, " \
"debug, clients.write, test, note FROM clients WHERE id = %s"
def __init__(self, user=USER, password=PASSWORD, dbname=DB):
import psycopg2 as ppg
from psycopg2 import sql as ppgsql
self.ppg = ppg
self.ppgsql = ppgsql
self.user = user
self.password = password
self.dbname = dbname
def init_user(self):
"""DB user rights setup"""
running_as_postgres = getpass.getuser() == "postgres"
conn = None
try:
password = None if running_as_postgres else getpass.getpass("Enter PostgreSQL password for the user 'postgres':")
conn = self.ppg.connect(user="postgres", password=password)
with conn.cursor() as cur:
cur.execute(
self.ppgsql.SQL("CREATE ROLE {} PASSWORD {} CREATEDB LOGIN").format(
self.ppgsql.Identifier(self.user),
self.ppgsql.Placeholder()
),
(self.password,)
)
conn.commit()
            print("DB User set up successfully")
except self.ppg.OperationalError as ex:
if conn:
conn.rollback()
conn.close()
conn = None
if running_as_postgres:
print("Connection unsuccessful. Original exception: %s" % (str(ex)))
else:
print("Connection unsuccessful, bad password or meant to run as the user 'postgres'"
" (su postgres -c '%s --dbms PostgreSQL --init')? Original exception: %s" %
(path.join('.', path.normpath(sys.argv[0])), str(ex)))
exit()
except KeyboardInterrupt:
print("\nCancelled!")
exit()
finally:
if conn:
conn.close()
def _load_tags(self, cur):
with open(path.join(path.dirname(__file__), "tagmap_db.json")) as tagmapf:
tagmap = json.load(tagmapf)
for tag, num in tagmap.items():
cur.execute(
"INSERT INTO tags(id, tag) VALUES (%s, %s)", (num, tag))
def _load_cats(self, cur):
with open(path.join(path.dirname(__file__), "catmap_db.json")) as catmapf:
catmap = json.load(catmapf)
for cat_subcat, num in catmap.items():
catsplit = cat_subcat.split(".", 1)
category = catsplit[0]
subcategory = catsplit[1] if len(catsplit) > 1 else None
cur.execute(
"INSERT INTO categories(id, category, subcategory, cat_subcat) VALUES (%s, %s, %s, %s)",
(num, category, subcategory, cat_subcat)
)
def set_up(self):
conn = None
try:
conn = self.ppg.connect(user=self.user, password=self.password,
host='localhost', dbname='postgres')
conn.autocommit = True
cur = conn.cursor()
cur.execute(
self.ppgsql.SQL("DROP DATABASE IF EXISTS {}").format(
self.ppgsql.Identifier(self.dbname)
)
)
cur.execute(
self.ppgsql.SQL("CREATE DATABASE {}").format(
self.ppgsql.Identifier(self.dbname)
)
)
except self.ppg.OperationalError as ex:
if conn:
conn.rollback()
conn.close()
conn = None
print(
'Setup failed, have you tried --init ? Original exception: %s' % (str(ex),))
exit()
finally:
if conn:
conn.close()
conn = None
try:
conn = self.ppg.connect(user=self.user, password=self.password,
dbname=self.dbname, host='localhost')
cur = conn.cursor()
with open(path.join(path.dirname(__file__), 'warden_3.0_postgres.sql')) as script:
statements = script.read()
cur.execute(statements)
self._load_tags(cur)
self._load_cats(cur)
cur.execute(
"INSERT INTO clients "
"(registered, requestor, hostname, note, valid,"
" name, secret, read, debug, write, test) "
"VALUES(NOW(), 'warden-info@cesnet.cz', 'test.server.warden.cesnet.cz', "
"NULL, true, 'cz.cesnet.warden3test', 'abc', true, true, true, false)"
)
conn.commit()
except self.ppg.OperationalError as ex:
if conn:
conn.rollback()
conn.close()
conn = None
print(
'Something went wrong during database setup. Original exception: %s' % (str(ex),))
exit()
finally:
if conn:
conn.close()
def do_sql_select(self, query, params):
"""Reads data from database"""
conn = self.ppg.connect(user=self.user, password=self.password,
dbname=self.dbname, host='localhost')
cur = conn.cursor()
cur.execute(query, params)
result = cur.fetchall()
cur.close()
conn.close()
return tuple(result)
def tear_down(self):
"""Clean up by purging the test database"""
conn = self.ppg.connect(user=self.user, password=self.password,
dbname='postgres', host='localhost')
conn.autocommit = True
cur = conn.cursor()
cur.execute(
self.ppgsql.SQL("DROP DATABASE IF EXISTS {} WITH(FORCE)").format(
self.ppgsql.Identifier(self.dbname)
)
)
conn.close()
def clean_lastid(self):
"""Cleans the lastid information for all clients"""
conn = self.ppg.connect(
user=self.user, password=self.password, dbname=self.dbname, host='localhost')
cur = conn.cursor()
cur.execute("DELETE FROM last_events")
cur.execute("DELETE FROM events")
cur.close()
conn.commit()
conn.close()
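The `_load_cats()` helper above derives the `category` and optional `subcategory` columns from the dotted IDEA key. The parsing rule in isolation (sample keys are illustrative):

```python
def split_category(cat_subcat):
    # Mirrors the parsing in _load_cats: "Cat.Sub" -> ("Cat", "Sub");
    # a key without a dot has no subcategory.
    catsplit = cat_subcat.split(".", 1)
    return catsplit[0], catsplit[1] if len(catsplit) > 1 else None

print(split_category("Abusive.Spam"))  # ('Abusive', 'Spam')
print(split_category("Test"))          # ('Test', None)
```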
database_types = {
'MySQL': MySQL,
'PostgreSQL': PostgreSQL
}
 def main():
     """Parses arguments and acts accordingly"""
     parser = argparse.ArgumentParser(description=__doc__)
+    parser.add_argument('-d', '--dbms', default='MySQL', choices=database_types, help='Database management system to use for testing')
     parser.add_argument('-i', '--init', action='store_true', help='Set up an user with rights to CREATE/DROP the test database')
     parser.add_argument('-n', '--nopurge', action='store_true', help='Skip the database purge after running the tests')
     args = parser.parse_args()
+    global DBMS  # pylint: disable = locally-disabled, global-statement
+    DBMS = database_types[args.dbms](USER, PASSWORD, DB)
     if args.init:
-        init_user()
+        DBMS.init_user()
     else:
         if args.nopurge:
             global NO_PURGE  # pylint: disable = locally-disabled, global-statement
...
SET TimeZone='+00:00';
-- ---------------------------------------------------------
--
-- Database: "warden3"
--
-- --------------------------------------------------------
--
-- Table structure for table "categories"
--
CREATE TABLE IF NOT EXISTS "categories" (
"id" int NOT NULL UNIQUE,
"category" text NOT NULL,
"subcategory" text DEFAULT NULL,
"cat_subcat" text NOT NULL
);
CREATE INDEX IF NOT EXISTS "cat_sub" ON "categories" ("cat_subcat");
-- --------------------------------------------------------
--
-- Table structure for table "clients"
--
CREATE TABLE IF NOT EXISTS "clients" (
"id" SERIAL PRIMARY KEY,
"registered" timestamp NOT NULL DEFAULT '1970-01-01 00:00:00',
"requestor" text NOT NULL,
"hostname" text NOT NULL,
"note" text NULL,
"valid" boolean NOT NULL DEFAULT true,
"name" text NOT NULL,
"secret" text NULL,
"read" boolean NOT NULL DEFAULT true,
"debug" boolean NOT NULL DEFAULT false,
"write" boolean NOT NULL DEFAULT false,
"test" boolean NOT NULL DEFAULT false
);
CREATE INDEX IF NOT EXISTS "clients_1" ON "clients" ("valid", "secret", "hostname");
CREATE INDEX IF NOT EXISTS "clients_2" ON "clients" ("valid", "name");
-- --------------------------------------------------------
--
-- Table structure for table "events"
--
CREATE TABLE IF NOT EXISTS "events" (
"id" bigint PRIMARY KEY GENERATED ALWAYS AS IDENTITY (MINVALUE 2),
"received" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
"client_id" int NOT NULL REFERENCES "clients" ("id"),
"data" bytea NOT NULL,
"valid" boolean NOT NULL DEFAULT true
);
CREATE INDEX IF NOT EXISTS "id" ON "events" ("id", "client_id");
CREATE INDEX IF NOT EXISTS "received" ON "events" ("received");
-- --------------------------------------------------------
--
-- Table structure for table "event_category_mapping"
--
CREATE TABLE IF NOT EXISTS "event_category_mapping" (
"event_id" bigint NOT NULL REFERENCES "events" ("id") ON DELETE CASCADE,
"category_id" int NOT NULL,
PRIMARY KEY ("event_id", "category_id"),
CONSTRAINT "event_category_mapping_category_id_fk" FOREIGN KEY ("category_id") REFERENCES "categories" ("id")
);
-- --------------------------------------------------------
--
-- Table structure for table "last_events"
--
CREATE TABLE IF NOT EXISTS "last_events" (
"id" SERIAL PRIMARY KEY,
"client_id" int NOT NULL REFERENCES "clients" ("id"),
"event_id" bigint REFERENCES "events" ("id"),
"timestamp" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS "client_id" ON "last_events" ("client_id", "event_id");
-- --------------------------------------------------------
--
-- Table structure for table "tags"
--
CREATE TABLE IF NOT EXISTS "tags" (
"id" int NOT NULL UNIQUE,
"tag" text NOT NULL
);
CREATE INDEX IF NOT EXISTS "id_tag_name" ON "tags" ("id", "tag");
CREATE INDEX IF NOT EXISTS "tag_name" ON "tags" ("tag");
-- --------------------------------------------------------
--
-- Table structure for table "event_tag_mapping"
--
CREATE TABLE IF NOT EXISTS "event_tag_mapping" (
"event_id" bigint NOT NULL REFERENCES "events" ("id") ON DELETE CASCADE,
"tag_id" int NOT NULL,
PRIMARY KEY ("event_id", "tag_id"),
CONSTRAINT "event_tag_mapping_tag_id_fk" FOREIGN KEY ("tag_id") REFERENCES "tags" ("id")
);
@@ -6,6 +6,7 @@
 from __future__ import print_function

+import abc
 import sys
 import os
 import io
@@ -16,11 +17,10 @@ import json
 import re
 from traceback import format_tb
 from collections import namedtuple
+from itertools import repeat
 from time import sleep
 from random import randint

 import M2Crypto.X509
-import MySQLdb as my
-import MySQLdb.cursors as mycursors

 if sys.version_info[0] >= 3:
     import configparser as ConfigParser
@@ -75,6 +75,9 @@ class Error(Exception):
             next_msg = e.get("message", "Unknown error").replace("\n", " ")
             if msg != next_msg:
                 msg = "Multiple errors"
+        msg = "".join((c if '\x20' <= c != '\x7f' else r'\x{:02x}'.format(ord(c))) for c in msg)  # escape control characters
         return err, msg
def __str__(self): def __str__(self):
...@@ -469,12 +472,13 @@ class JSONSchemaValidator(NoValidator): ...@@ -469,12 +472,13 @@ class JSONSchemaValidator(NoValidator):
return res return res
class DataBase(ObjectBase):

    def __init__(
            self, req, log, host, user, password, dbname, port, retry_count,
            retry_pause, event_size_limit, catmap_filename, tagmap_filename):
        ObjectBase.__init__(self, req, log)
        self.host = host
        self.user = user
        self.password = password
@@ -495,12 +499,12 @@ class MySQL(ObjectBase):
            self.tagmap = json.load(tagmap_fd)
            self.tagmap_other = self.tagmap["Other"]  # Catch error soon, avoid lookup later

        self.db = None
        self.con = None

    @abc.abstractmethod
    def connect(self):
        pass

    def close(self):
        try:
@@ -510,7 +514,8 @@ class MySQL(ObjectBase):
            pass
        self.con = None

    def __del__(self):
        self.close()
    def repeat(self):
        """ Allows for graceful repeating of transactions self.retry_count
@@ -521,8 +526,7 @@ class MySQL(ObjectBase):

            for attempt in self.repeat():
                with attempt as db:
                    res = db.query_all(...)

        Note that it's not reentrant (as is not underlying MySQL
        connection), so avoid nesting on the same MySQL object.
@@ -540,8 +544,7 @@ class MySQL(ObjectBase):
        exception. Can be used with self.repeat(), or alone as:

            with self as db:
                res = db.query_all(...)

        Note that it's not reentrant (as is not underlying MySQL
        connection), so avoid nesting on the same MySQL object.
@@ -557,24 +560,25 @@ class MySQL(ObjectBase):
        open transaction is rolled back.
        In case of no exception, transaction gets committed.
        """
        if exc_type is None:
            self.con.commit()
            self.retry_attempt = 0
        else:
            try:
                if self.con is not None:
                    self.con.rollback()
            except self.db.Error:
                pass
            try:
                self.close()
            except self.db.Error:
                pass
            if self.retry_attempt > 0:
                self.log.info("Database error (%d attempts left): %s %s" %
                              (self.retry_attempt, exc_type.__name__, exc_val))
                return True

    def _query(self, *args, **kwargs):
        if not self.con:
            self.connect()
        crs = self.con.cursor()
@@ -582,85 +586,105 @@ class MySQL(ObjectBase):
        crs.execute(*args, **kwargs)
        return crs
    def _query_multiple(self, query, params, ret, fetch):
        res = None
        for n, (q, p) in enumerate(zip(query, params)):
            cur = self._query(q, p)
            if n == ret:
                res = fetch(cur)
        if ret == -1:  # fetch the result of the last query
            res = fetch(cur)
        return res

    def execute(self, query, params, ret=None):
        """Execute the provided queries; discard the result"""
        self._query_multiple(query, params, None, None)

    def query_all(self, query, params, ret=-1):
        """Execute the provided queries; return list of all rows as dicts of the ret-th query (0 based)"""
        return self._query_multiple(query, params, ret, lambda cur: cur.fetchall())

    def query_one(self, query, params, ret=-1):
        """Execute the provided queries; return the first result of the ret-th query (0 based)"""
        return self._query_multiple(query, params, ret, lambda cur: cur.fetchone())

    def query_rowcount(self, query, params, ret=-1):
        """Execute the provided queries; return the number of affected rows or the number of returned rows of the ret-th query (0 based)"""
        return self._query_multiple(query, params, ret, lambda cur: cur.rowcount)
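The `(queries, params, ret)` triple introduced by this refactor can be illustrated with a small stand-alone sketch; the stub cursor and function names below are purely illustrative and not part of the server:

```python
# Minimal sketch of the _query_multiple convention: a list of SQL strings,
# a parallel list of parameter tuples, and `ret` selecting which query's
# result to fetch (0-based index, or -1 for the last query executed).
class StubCursor:
    def __init__(self, rows):
        self.rows = rows

    def fetchall(self):
        return self.rows

def query_multiple(queries, params, ret, fetch):
    res = None
    for n, (q, p) in enumerate(zip(queries, params)):
        cur = StubCursor([{"query": q, "params": p}])  # pretend execution
        if n == ret:
            res = fetch(cur)
    if ret == -1:  # fetch the result of the last query
        res = fetch(cur)
    return res

rows = query_multiple(
    ["INSERT INTO t VALUES (%s)", "SELECT LAST_INSERT_ID() AS id"],
    [(42,), ()],
    1,  # return the rows of the second query (index 1)
    lambda cur: cur.fetchall())
```

This is why the MySQL builders below can pair an INSERT with a follow-up `SELECT LAST_INSERT_ID()` and point `ret` at the second query.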
    def _get_comma_perc(self, l):
        return ",".join(repeat("%s", l if isinstance(l, int) else len(l)))

    def _get_comma_perc_n(self, n, l):
        return ", ".join(repeat("(%s)" % self._get_comma_perc(n), len(l)))

    def _get_not(self, b):
        return "" if b else "NOT"
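Stand-alone versions of the two placeholder helpers above (renamed for illustration) show the strings they expand to, used for `IN (...)` clauses and multi-row `VALUES` inserts:

```python
from itertools import repeat

def comma_perc(l):
    # accepts either a count or a sized collection
    return ",".join(repeat("%s", l if isinstance(l, int) else len(l)))

def comma_perc_n(n, l):
    # one "(%s,...)" group of width n per element of l
    return ", ".join(repeat("(%s)" % comma_perc(n), len(l)))

print(comma_perc(3))                # %s,%s,%s
print(comma_perc_n(2, ["a", "b"]))  # (%s,%s), (%s,%s)
```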
    @abc.abstractmethod
    def _build_get_client_by_name(self, cert_names, name, secret):
        """Build query and params for client lookup"""

    def get_client_by_name(self, cert_names=None, name=None, secret=None):
        query, params, ret = self._build_get_client_by_name(cert_names, name, secret)
        for attempt in self.repeat():
            with attempt as db:
                rows = db.query_all(query, params, ret)
                if len(rows) > 1:
                    self.log.warning(
                        "get_client_by_name: query returned more than one result (cert_names = %s, name = %s, secret = %s): %s" %
                        (cert_names, name, secret, ", ".join([str(Client(**row)) for row in rows]))
                    )
                    return None
                return Client(**rows[0]) if rows else None
    @abc.abstractmethod
    def _build_get_clients(self, id):
        """Build query and params for client lookup by id"""

    def get_clients(self, id=None):
        query, params, ret = self._build_get_clients(id)
        for attempt in self.repeat():
            with attempt as db:
                rows = db.query_all(query, params, ret=ret)
                return [Client(**row) for row in rows]
    @abc.abstractmethod
    def _build_add_modify_client(self, id, **kwargs):
        """Build query and params for adding/modifying client"""

    def add_modify_client(self, id=None, **kwargs):
        if id is not None and all(kwargs.get(attr, None) is None for attr in set(Client._fields) - {"id", "registered"}):
            return id

        query, params, ret = self._build_add_modify_client(id, **kwargs)

        for attempt in self.repeat():
            with attempt as db:
                res_id = db.query_one(query, params, ret=ret)["id"]
                newid = res_id if id is None else id
                return newid
    @abc.abstractmethod
    def _build_get_debug_version(self):
        pass

    @abc.abstractmethod
    def _build_get_debug_tablestat(self):
        pass

    def get_debug(self):
        vquery, vparams, vret = self._build_get_debug_version()
        tquery, tparams, tret = self._build_get_debug_tablestat()
        for attempt in self.repeat():
            with attempt as db:
                return {
                    "db": type(self).__name__,
                    "version": db.query_one(vquery, vparams, vret)["version"],
                    "tables": db.query_all(tquery, tparams, tret)
                }
    def getMaps(self, section, variables):
@@ -671,10 +695,21 @@ class MySQL(ObjectBase):
            except KeyError:
                raise self.req.error(
                    message="Wrong tag or category used in query.",
                    error=422, exc=sys.exc_info(), key=v
                )
            maps.append(mapped)
        return set(maps)  # unique
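The mapping step can be sketched stand-alone; the sample `catmap` ids below are made up for illustration, and a plain `KeyError` stands in for the server's HTTP 422 error:

```python
# Illustrative version of the getMaps lookup: map symbolic category/tag
# names to their numeric database ids, deduplicating via a set.
catmap = {"Recon": 100, "Recon.Scanning": 101, "Abusive": 200}

def get_maps(mapping, variables):
    maps = []
    for v in variables:
        try:
            mapped = mapping[v]
        except KeyError:
            raise KeyError("Wrong tag or category used in query: %s" % v)
        maps.append(mapped)
    return set(maps)  # unique

ids = get_maps(catmap, ["Recon", "Recon.Scanning", "Recon"])
```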
    @abc.abstractmethod
    def _build_fetch_events(
            self, client, id, count,
            cat, nocat, tag, notag, group, nogroup):
        """Build query and params for fetching events based on id, count and category, tag and group filters"""

    @abc.abstractmethod
    def _load_event_json(self, data):
        """Return decoded json from data loaded from database, if unable to decode, return None"""

    def fetch_events(
            self, client, id, count,
            cat=None, nocat=None,
@@ -694,43 +729,16 @@ class MySQL(ObjectBase):
                message="Unrealizable conditions. Choose group or nogroup option.",
                error=422, group=group, nogroup=nogroup)

        query, params, ret = self._build_fetch_events(
            client, id, count,
            cat, nocat,
            tag, notag,
            group, nogroup
        )

        row = None
        for attempt in self.repeat():
            with attempt as db:
                row = db.query_all(query, params, ret=ret)

        if row:
            maxid = max(r['id'] for r in row)
@@ -739,9 +747,8 @@ class MySQL(ObjectBase):
        events = []
        for r in row:
            e = self._load_event_json(r["data"])
            if e is None:  # null cannot be valid event JSON
                # Note that we use Error object just for proper formatting,
                # but do not raise it; from client perspective invalid
                # events get skipped silently.
@@ -749,114 +756,685 @@ class MySQL(ObjectBase):
                    message="Unable to deserialize JSON event from db, id=%s" % r["id"],
                    error=500, exc=sys.exc_info(), id=r["id"])
                err.log(self.log, prio=logging.WARNING)
            else:
                events.append(e)

        return {
            "lastid": maxid,
            "events": events
        }
    @abc.abstractmethod
    def _build_store_events_event(self, client, event, raw_event):
        """Build query and params for event insertion"""

    @abc.abstractmethod
    def _build_store_events_categories(self, event_id, cat_ids):
        """Build query and params for insertion of event-categories mapping"""

    @abc.abstractmethod
    def _build_store_events_tags(self, event_id, tag_ids):
        """Build query and params for insertion of event-tags mapping"""

    def store_events(self, client, events, events_raw):
        try:
            for attempt in self.repeat():
                with attempt as db:
                    for event, raw_event in zip(events, events_raw):
                        equery, eparams, eret = self._build_store_events_event(client, event, raw_event)
                        lastid = db.query_one(equery, eparams, ret=eret)["id"]

                        catlist = event.get('Category', ["Other"])
                        cats = set(catlist) | {cat.split(".", 1)[0] for cat in catlist}
                        cat_ids = [self.catmap.get(cat, self.catmap_other) for cat in cats]
                        cquery, cparams, _ = self._build_store_events_categories(lastid, cat_ids)
                        db.execute(cquery, cparams)

                        nodes = event.get('Node', [])
                        tags = {tag for node in nodes for tag in node.get('Type', [])}
                        if tags:
                            tag_ids = [self.tagmap.get(tag, self.tagmap_other) for tag in tags]
                            tquery, tparams, _ = self._build_store_events_tags(lastid, tag_ids)
                            db.execute(tquery, tparams)

                    return []
        except Exception as e:
            exception = self.req.error(message="DB error", error=500, exc=sys.exc_info(), env=self.req.env)
            exception.log(self.log)
            return [{"error": 500, "message": "DB error %s" % type(e).__name__}]
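The category and tag extraction in `store_events` can be shown on a stand-alone sample event (field values invented for the example): a subcategory such as `Recon.Scanning` also registers its parent category, and node `Type` lists are flattened into one deduplicated tag set:

```python
# Sample IDEA-style event; the concrete values are illustrative only.
event = {
    "Category": ["Recon.Scanning", "Abusive.Spam"],
    "Node": [{"Type": ["Connection", "Honeypot"]}, {"Type": ["Honeypot"]}],
}

catlist = event.get("Category", ["Other"])
# each "Parent.Sub" category contributes both itself and its parent
cats = set(catlist) | {cat.split(".", 1)[0] for cat in catlist}
# node types across all Node entries, deduplicated
tags = {tag for node in event.get("Node", []) for tag in node.get("Type", [])}
```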
    @abc.abstractmethod
    def _build_insert_last_received_id(self, client, id):
        """Build query and params for insertion of the last event id received by client"""

    def insertLastReceivedId(self, client, id):
        self.log.debug("insertLastReceivedId: id %i for client %i(%s)" % (id, client.id, client.hostname))
        query, params, _ = self._build_insert_last_received_id(client, id)
        for attempt in self.repeat():
            with attempt as db:
                db.execute(query, params)

    @abc.abstractmethod
    def _build_get_last_event_id(self):
        """Build query and params for querying the id of the last inserted event"""

    def getLastEventId(self):
        query, params, ret = self._build_get_last_event_id()
        for attempt in self.repeat():
            with attempt as db:
                id_ = db.query_one(query, params, ret=ret)["id"]
                return id_ or 1

    @abc.abstractmethod
    def _build_get_last_received_id(self, client):
        """Build query and params for querying the last event id received by client"""

    def getLastReceivedId(self, client):
        query, params, ret = self._build_get_last_received_id(client)
        for attempt in self.repeat():
            with attempt as db:
                res = db.query_one(query, params, ret=ret)
                if res is None:
                    id = None
                    self.log.debug("getLastReceivedId: probably first access, unable to get id for client %i(%s)" %
                                   (client.id, client.hostname))
                else:
                    id = res["id"] or 1
                    self.log.debug("getLastReceivedId: id %i for client %i(%s)" %
                                   (id, client.id, client.hostname))
                return id
    @abc.abstractmethod
    def _build_load_maps_tags(self):
        """Build query and params for updating the tag map"""

    @abc.abstractmethod
    def _build_load_maps_cats(self):
        """Build query and params for updating the category map"""

    def load_maps(self):
        tquery, tparams, _ = self._build_load_maps_tags()
        cquery, cparams, _ = self._build_load_maps_cats()
        with self as db:
            db.execute(tquery, tparams)
            db.execute(cquery, cparams)

    @abc.abstractmethod
    def _build_purge_lastlog(self, days):
        """Build query and params for purging stored client last event mapping older than days"""

    def purge_lastlog(self, days):
        query, params, ret = self._build_purge_lastlog(days)
        with self as db:
            return db.query_rowcount(query, params, ret=ret)

    @abc.abstractmethod
    def _build_purge_events_get_id(self, days):
        """Build query and params to get largest event id of events older than days"""

    @abc.abstractmethod
    def _build_purge_events_events(self, id_):
        """Build query and params to remove events older than days and their mappings"""

    def purge_events(self, days):
        iquery, iparams, iret = self._build_purge_events_get_id(days)
        with self as db:
            id_ = db.query_one(iquery, iparams, ret=iret)["id"]
            if id_ is None:
                return 0
            equery, eparams, eret = self._build_purge_events_events(id_)
            affected = db.query_rowcount(equery, eparams, ret=eret)
        return affected
DataBase = abc.ABCMeta("DataBase", (DataBase,), {})
class MySQL(DataBase):

    def __init__(
            self, req, log, host, user, password, dbname, port, retry_count,
            retry_pause, event_size_limit, catmap_filename, tagmap_filename):

        super(MySQL, self).__init__(req, log, host, user, password, dbname, port, retry_count,
                                    retry_pause, event_size_limit, catmap_filename, tagmap_filename)

        import MySQLdb as db
        import MySQLdb.cursors as mycursors
        self.db = db
        self.mycursors = mycursors

    def connect(self):
        self.con = self.db.connect(
            host=self.host, user=self.user, passwd=self.password,
            db=self.dbname, port=self.port, cursorclass=self.mycursors.DictCursor)
    def _build_get_client_by_name(self, cert_names=None, name=None, secret=None):
        """Build query and params for client lookup"""
        query = ["SELECT * FROM clients WHERE valid = 1"]
        params = []
        if name:
            query.append(" AND name = %s")
            params.append(name.lower())
        if secret:
            query.append(" AND secret = %s")
            params.append(secret)
        if cert_names:
            query.append(" AND hostname IN (%s)" % self._get_comma_perc(cert_names))
            params.extend(n.lower() for n in cert_names)
        return ["".join(query)], [params], 0
    def _build_get_clients(self, id):
        """Build query and params for client lookup by id"""
        query = ["SELECT * FROM clients"]
        params = []
        if id:
            query.append("WHERE id = %s")
            params.append(id)
        query.append("ORDER BY id")
        return [" ".join(query)], [params], 0
    def _build_add_modify_client(self, id, **kwargs):
        """Build query and params for adding/modifying client"""
        query = []
        params = []
        uquery = []
        if id is None:
            query.append("INSERT INTO clients SET")
            uquery.append("registered = now()")
        else:
            query.append("UPDATE clients SET")
        for attr in set(Client._fields) - set(["id", "registered"]):
            val = kwargs.get(attr, None)
            if val is not None:  # guaranteed at least one is not None
                if attr == "secret" and val == "":  # disable secret
                    val = None
                uquery.append("`%s` = %%s" % attr)
                params.append(val)
        query.append(", ".join(uquery))
        if id is not None:
            query.append("WHERE id = %s")
            params.append(id)
        return (
            [" ".join(query), 'SELECT LAST_INSERT_ID() AS id'],
            [params, []],
            1
        )
    def _build_get_debug_version(self):
        return ["SELECT VERSION() AS version"], [()], 0

    def _build_get_debug_tablestat(self):
        return ["SHOW TABLE STATUS"], [()], 0

    def _load_event_json(self, data):
        """Return decoded json from data loaded from database, if unable to decode, return None"""
        try:
            return json.loads(data)
        except Exception:
            return None
    def _build_fetch_events(
            self, client, id, count,
            cat, nocat, tag, notag, group, nogroup):

        query = ["SELECT e.id, e.data FROM clients c RIGHT JOIN events e ON c.id = e.client_id WHERE e.id > %s"]
        params = [id or 0]

        if cat or nocat:
            cats = self.getMaps(self.catmap, (cat or nocat))
            query.append(
                " AND e.id %s IN (SELECT event_id FROM event_category_mapping WHERE category_id IN (%s))" %
                (self._get_not(cat), self._get_comma_perc(cats))
            )
            params.extend(cats)

        if tag or notag:
            tags = self.getMaps(self.tagmap, (tag or notag))
            query.append(
                " AND e.id %s IN (SELECT event_id FROM event_tag_mapping WHERE tag_id IN (%s))" %
                (self._get_not(tag), self._get_comma_perc(tags))
            )
            params.extend(tags)

        if group or nogroup:
            subquery = []
            for name in (group or nogroup):
                escaped_name = name.replace('&', '&&').replace("_", "&_").replace("%", "&%")  # escape for LIKE
                subquery.append("c.name = %s")  # exact client
                params.append(name)
                subquery.append("c.name LIKE CONCAT(%s, '.%%') ESCAPE '&'")  # whole subtree
                params.append(escaped_name)
            query.append(" AND %s (%s)" %
                         (self._get_not(group), " OR ".join(subquery)))

        query.append(" AND e.valid = 1 LIMIT %s")
        params.append(count)
        return ["".join(query)], [params], 0
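The LIKE-pattern escaping used for the subtree match above can be isolated into a stand-alone helper (illustrative name): `&` is promoted to the `ESCAPE` character, so a literal `&`, `_` or `%` in a client name cannot act as a wildcard:

```python
# Escape a client name for use in a SQL LIKE pattern with ESCAPE '&'.
def escape_like(name):
    return name.replace('&', '&&').replace('_', '&_').replace('%', '&%')

print(escape_like("org.example_net"))  # org.example&_net
```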
    def _build_store_events_event(self, client, event, raw_event):
        """Build query and params for event insertion"""
        return (
            [
                "INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s)",
                "SELECT LAST_INSERT_ID() AS id"
            ],
            [(client.id, raw_event), ()],
            1
        )

    def _build_store_events_categories(self, event_id, cat_ids):
        """Build query and params for insertion of event-categories mapping"""
        return (
            ["INSERT INTO event_category_mapping (event_id,category_id) VALUES " +
             self._get_comma_perc_n(2, cat_ids)],
            [tuple(param for cat_id in cat_ids for param in (event_id, cat_id))],
            None
        )

    def _build_store_events_tags(self, event_id, tag_ids):
        """Build query and params for insertion of event-tags mapping"""
        return (
            ["INSERT INTO event_tag_mapping (event_id,tag_id) VALUES " +
             self._get_comma_perc_n(2, tag_ids)],
            [tuple(param for tag_id in tag_ids for param in (event_id, tag_id))],
            None
        )

    def _build_insert_last_received_id(self, client, id):
        """Build query and params for insertion of the last event id received by client"""
        return (
            ["INSERT INTO last_events(client_id, event_id, timestamp) VALUES(%s, %s, NOW())"],
            [(client.id, id)],
            None
        )

    def _build_get_last_event_id(self):
        """Build query and params for querying the id of the last inserted event"""
        return ["SELECT MAX(id) as id FROM events"], [()], 0

    def _build_get_last_received_id(self, client):
        """Build query and params for querying the last event id received by client"""
        return (
            ["SELECT event_id as id FROM last_events WHERE client_id = %s ORDER BY last_events.id DESC LIMIT 1"],
            [(client.id,)],
            0
        )

    def _build_load_maps_tags(self):
        """Build query and params for updating the tag map"""
        return (
            [
                "DELETE FROM tags",
                "INSERT INTO tags(id, tag) VALUES " +
                self._get_comma_perc_n(2, self.tagmap)
            ],
            [
                (),
                tuple(param for tag, num in self.tagmap.items() for param in (num, tag))
            ],
            None
        )

    def _build_load_maps_cats(self):
        """Build query and params for updating the category map"""
        params = []
        for cat_subcat, num in self.catmap.items():
            catsplit = cat_subcat.split(".", 1)
            category = catsplit[0]
            subcategory = catsplit[1] if len(catsplit) > 1 else None
            params.extend((num, category, subcategory, cat_subcat))
        return (
            [
                "DELETE FROM categories",
                "INSERT INTO categories(id, category, subcategory, cat_subcat) VALUES " +
                self._get_comma_perc_n(4, self.catmap)
            ],
            [
                (),
                tuple(params)
            ],
            None
        )
    def _build_purge_lastlog(self, days):
        """Build query and params for purging stored client last event mapping older than days"""
        return (
            [
                "DELETE FROM last_events "
                " USING last_events LEFT JOIN ("
                " SELECT MAX(id) AS last FROM last_events"
                " GROUP BY client_id"
                " ) AS maxids ON last=id"
                " WHERE timestamp < DATE_SUB(CURDATE(), INTERVAL %s DAY) AND last IS NULL",
            ],
            [(days,)],
            0
        )

    def _build_purge_events_get_id(self, days):
        """Build query and params to get largest event id of events older than days"""
        return (
            [
                "SELECT MAX(id) as id"
                " FROM events"
                " WHERE received < DATE_SUB(CURDATE(), INTERVAL %s DAY)"
            ],
            [(days,)],
            0
        )

    def _build_purge_events_events(self, id_):
        """Build query and params to remove events older than days and their mappings"""
        return (
            [
                "DELETE FROM event_category_mapping WHERE event_id <= %s",
                "DELETE FROM event_tag_mapping WHERE event_id <= %s",
                "DELETE FROM events WHERE id <= %s",
            ],
            [(id_,), (id_,), (id_,)],
            2
        )
class PostgreSQL(DataBase):

    def __init__(
            self, req, log, host, user, password, dbname, port, retry_count,
            retry_pause, event_size_limit, catmap_filename, tagmap_filename):

        super(PostgreSQL, self).__init__(req, log, host, user, password, dbname, port, retry_count,
                                         retry_pause, event_size_limit, catmap_filename, tagmap_filename)

        import psycopg2 as db
        from psycopg2 import sql as ppgsql
        import psycopg2.extras as ppgextra
        self.db = db
        self.ppgsql = ppgsql
        self.ppgextra = ppgextra

    def connect(self):
        self.con = self.db.connect(
            host=self.host, user=self.user, password=self.password,
            dbname=self.dbname, port=self.port, cursor_factory=self.ppgextra.RealDictCursor)
    def _build_get_client_by_name(self, cert_names=None, name=None, secret=None):
        """Build query and params for client lookup"""
        query = ["SELECT * FROM clients WHERE valid"]
        params = []
        if name:
            query.append(" AND name = %s")
            params.append(name.lower())
        if secret:
            query.append(" AND secret = %s")
            params.append(secret)
        if cert_names:
            query.append(" AND hostname IN (%s)" % self._get_comma_perc(cert_names))
            params.extend(n.lower() for n in cert_names)
        return ["".join(query)], [params], 0

    def _build_get_clients(self, id):
        """Build query and params for client lookup by id"""
        query = ["SELECT * FROM clients"]
        params = []
        if id:
            query.append("WHERE id = %s")
            params.append(id)
        query.append("ORDER BY id")
        return [" ".join(query)], [params], 0
    def _build_add_modify_client(self, id, **kwargs):
        """Build query and params for adding/modifying client"""
        fields = set(Client._fields) - {"id", "registered"}
        cols, params = map(
            list,
            zip(
                *(
                    (k, None)  # disable secret
                    if k == "secret" and v == "" else
                    (k, v)
                    for k, v in kwargs.items()
                    if v is not None and k in fields
                )
            )
        )
        if id is None:
            query = self.ppgsql.SQL('INSERT INTO clients ("registered", {}) VALUES (NOW(), {}) RETURNING id').format(
                self.ppgsql.SQL(", ").join(map(self.ppgsql.Identifier, cols)),
                self.ppgsql.SQL(", ").join(self.ppgsql.Placeholder() * len(cols))
            )
        elif not cols:
            return ["SELECT %s AS id"], [(id,)], 0
        else:
            query = self.ppgsql.SQL("UPDATE clients SET {} WHERE id = {} RETURNING id").format(
                self.ppgsql.SQL(", ").join(
                    self.ppgsql.SQL("{} = {}").format(
                        self.ppgsql.Identifier(col),
                        self.ppgsql.Placeholder()
                    ) for col in cols
                ),
                self.ppgsql.Placeholder()
            )
            params.append(id)
        return [query], [params], 0
    def _build_get_debug_version(self):
        return ["SELECT setting AS version FROM pg_settings WHERE name = 'server_version'"], [()], 0

    def _build_get_debug_tablestat(self):
        return [
            "SELECT "
            'tablename AS "Name", '
            'relnatts AS "Columns", '
            'n_live_tup AS "Rows", '
            'pg_catalog.pg_size_pretty(pg_catalog.pg_table_size(oid)) AS "Table_size", '
            'pg_catalog.pg_size_pretty(pg_catalog.pg_indexes_size(oid)) AS "Index_size", '
            'coll.collations AS "Collations" '
            "FROM "
            "pg_catalog.pg_tables tbls "
            "LEFT OUTER JOIN pg_catalog.pg_class cls "
            "ON tbls.tablename=cls.relname "
            "LEFT OUTER JOIN pg_catalog.pg_stat_user_tables sut "
            "ON tbls.tablename=sut.relname "
            "LEFT OUTER JOIN ("
            "SELECT "
            "table_name, "
            "string_agg("
            "DISTINCT COALESCE("
            "collation_name, "
            "("
            "SELECT "
            "datcollate "
            "FROM "
            "pg_catalog.pg_database "
            "WHERE "
            "datname=%s"
            ")"
            "), "
            "','"
            ") AS collations "
            "FROM "
            "information_schema.columns "
            "GROUP BY "
            "table_name"
            ") coll "
            "ON tbls.tablename=coll.table_name "
            "WHERE "
            "tbls.schemaname='public' "
            "AND tbls.tableowner=%s"
        ], [(self.dbname, self.user)], 0
    def _load_event_json(self, data):
        """Return decoded json from data loaded from database, if unable to decode, return None"""
        try:
            return json.loads(data.tobytes())
        except Exception:
            return None

    def _build_fetch_events(
            self, client, id, count,
            cat, nocat, tag, notag, group, nogroup):

        query = ["SELECT e.id, e.data FROM clients c RIGHT JOIN events e ON c.id = e.client_id WHERE e.id > %s"]
        params = [id or 0]

        if cat or nocat:
            cats = self.getMaps(self.catmap, (cat or nocat))
            query.append(
                " AND e.id %s IN (SELECT event_id FROM event_category_mapping WHERE category_id IN (%s))" %
                (self._get_not(cat), self._get_comma_perc(cats))
            )
            params.extend(cats)

        if tag or notag:
            tags = self.getMaps(self.tagmap, (tag or notag))
            query.append(
                " AND e.id %s IN (SELECT event_id FROM event_tag_mapping WHERE tag_id IN (%s))" %
                (self._get_not(tag), self._get_comma_perc(tags))
            )
            params.extend(tags)

        if group or nogroup:
            subquery = []
            for name in group or nogroup:
                name = name.lower()  # assumes only lowercase names
                escaped_name = name.replace('&', '&&').replace("_", "&_").replace("%", "&%")  # escape for LIKE
                subquery.append("c.name = %s")  # exact client
                params.append(name)
                subquery.append("c.name LIKE %s || '.%%' ESCAPE '&'")  # whole subtree
                params.append(escaped_name)
            query.append(" AND %s (%s)" % (self._get_not(group), " OR ".join(subquery)))

        query.append(" AND e.valid LIMIT %s")
        params.append(count)
        return ["".join(query)], [params], 0
def _build_store_events_event(self, client, event, raw_event):
"""Build query and params for event insertion"""
return (
["INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s) RETURNING id"],
[(client.id, self.db.Binary(raw_event.encode('utf8')))],
0
)
def _build_store_events_categories(self, event_id, cat_ids):
"""Build query and params for insertion of event-categories mapping"""
return (
["INSERT INTO event_category_mapping (event_id,category_id) VALUES " +
self._get_comma_perc_n(2, cat_ids)],
[tuple(param for cat_id in cat_ids for param in (event_id, cat_id))],
None
)
def _build_store_events_tags(self, event_id, tag_ids):
"""Build query and params for insertion of event-tags mapping"""
return (
["INSERT INTO event_tag_mapping (event_id,tag_id) VALUES " +
self._get_comma_perc_n(2, tag_ids)],
[tuple(param for tag_id in tag_ids for param in (event_id, tag_id))],
None
)
def _build_insert_last_received_id(self, client, id):
"""Build query and params for insertion of the last event id received by client"""
return (
["INSERT INTO last_events(client_id, event_id, timestamp) VALUES(%s, %s, NOW())"],
[(client.id, None if id == 1 else id)],
None
)
def _build_get_last_event_id(self):
"""Build query and params for querying the id of the last inserted event"""
return ["SELECT MAX(id) as id FROM events"], [()], 0
def _build_get_last_received_id(self, client):
"""Build query and params for querying the last event id received by client"""
return (
["SELECT event_id as id FROM last_events WHERE client_id = %s ORDER BY last_events.id DESC LIMIT 1"],
[(client.id,)],
0
)
def _build_load_maps_tags(self):
"""Build query and params for updating the tag map"""
return (
[
"ALTER TABLE event_tag_mapping DROP CONSTRAINT event_tag_mapping_tag_id_fk",
"DELETE FROM tags",
"INSERT INTO tags(id, tag) VALUES " +
self._get_comma_perc_n(2, self.tagmap),
'ALTER TABLE event_tag_mapping ADD CONSTRAINT "event_tag_mapping_tag_id_fk" FOREIGN KEY ("tag_id") REFERENCES "tags" ("id")'
],
[(), (), tuple(param for tag, num in self.tagmap.items() for param in (num, tag)), ()],
None
)
def _build_load_maps_cats(self):
"""Build query and params for updating the catetgory map"""
params = []
for cat_subcat, num in self.catmap.items():
catsplit = cat_subcat.split(".", 1)
category = catsplit[0]
subcategory = catsplit[1] if len(catsplit) > 1 else None
params.extend((num, category, subcategory, cat_subcat))
return (
[
"ALTER TABLE event_category_mapping DROP CONSTRAINT event_category_mapping_category_id_fk",
"DELETE FROM categories",
"INSERT INTO categories(id, category, subcategory, cat_subcat) VALUES " +
self._get_comma_perc_n(4, self.catmap),
'ALTER TABLE event_category_mapping ADD CONSTRAINT "event_category_mapping_category_id_fk" FOREIGN KEY ("category_id") REFERENCES "categories" ("id")'
],
[(), (), tuple(params), ()],
None
)
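# Example of the split above: "Intrusion.UserCompromise".split(".", 1)
# yields ["Intrusion", "UserCompromise"], while a category with no
# subcategory, e.g. "Test".split(".", 1), yields ["Test"] only, so
# subcategory is stored as None (SQL NULL).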
def _build_purge_lastlog(self, days):
"""Build query and params for purging stored client last event mapping older than days"""
return (
[
"DELETE FROM last_events "
" USING last_events le LEFT JOIN ("
" SELECT MAX(id) AS last FROM last_events"
" GROUP BY client_id"
" ) AS maxids ON maxids.last=le.id"
" WHERE le.timestamp < CURRENT_DATE - INTERVAL %s DAY AND maxids.last IS NULL"
],
[(str(days),)],
0
)
def _build_purge_events_get_id(self, days):
"""Build query and params to get largest event id of events older than days"""
return (
[
"SELECT MAX(id) as id" "SELECT MAX(id) as id"
" FROM events" " FROM events"
" WHERE received < DATE_SUB(CURDATE(), INTERVAL %s DAY)", " WHERE received < CURRENT_DATE - INTERVAL %s DAY"
(days,) ],
).fetchall()[0]["id"] [(str(days),)],
if id_ is None: 0
return 0 )
affected = db.query("DELETE FROM events WHERE id <= %s", (id_,)).rowcount
db.query("DELETE FROM event_category_mapping WHERE event_id <= %s", (id_,)) def _build_purge_events_events(self, id_):
db.query("DELETE FROM event_tag_mapping WHERE event_id <= %s", (id_,)) """Build query and params to remove events older then days and their mappings"""
return affected return ["DELETE FROM events WHERE id <= %s"], [(id_,)], 0
def expose(read=True, write=False, debug=False):
def expose_deco(meth):
meth.exposed = True
...@@ -1019,7 +1597,7 @@ class WardenHandler(ObjectBase):
self.get_events_limit = get_events_limit
self.description = description
@expose(read=True, debug=True)
@json_wrapper
def getDebug(self):
return {
...@@ -1043,7 +1621,7 @@ class WardenHandler(ObjectBase):
}
}
@expose(read=True)
@json_wrapper
def getInfo(self):
info = {
...@@ -1055,7 +1633,7 @@ class WardenHandler(ObjectBase):
info["description"] = self.description
return info
@expose(read=True)
@json_wrapper
def getEvents(
self, id=None, count=None,
...@@ -1129,7 +1707,7 @@ class WardenHandler(ObjectBase):
ev_ids.append(id)
return errlist
@expose(write=True)
@json_wrapper
def sendEvents(self, events=[]):
if not isinstance(events, list):
...@@ -1245,7 +1823,7 @@ section_order = ("log", "db", "auth", "validator", "handler", "server")
# "type" keyword in section may be used to choose other
section_def = {
"log": [FileLogger, SysLogger],
"db": [MySQL, PostgreSQL],
"auth": [X509NameAuthenticator, PlainAuthenticator, X509Authenticator, X509MixMatchAuthenticator],
"validator": [JSONSchemaValidator, NoValidator],
"handler": [WardenHandler],
...@@ -1305,8 +1883,22 @@ param_def = {
"retry_pause": {"type": "natural", "default": 3},
"retry_count": {"type": "natural", "default": 3},
"event_size_limit": {"type": "natural", "default": 5*1024*1024},
"catmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "catmap_db.json")},
"tagmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "tagmap_db.json")}
},
PostgreSQL: {
"req": {"type": "obj", "default": "req"},
"log": {"type": "obj", "default": "log"},
"host": {"type": "str", "default": "localhost"},
"user": {"type": "str", "default": "warden"},
"password": {"type": "str", "default": ""},
"dbname": {"type": "str", "default": "warden3"},
"port": {"type": "natural", "default": 5432},
"retry_pause": {"type": "natural", "default": 3},
"retry_count": {"type": "natural", "default": 3},
"event_size_limit": {"type": "natural", "default": 5*1024*1024},
"catmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "catmap_db.json")},
"tagmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "tagmap_db.json")}
},
WardenHandler: {
"req": {"type": "obj", "default": "req"},
...@@ -1453,11 +2045,11 @@ def list_clients(id=None):
def register_client(**kwargs):
# argparse does _always_ return something, so we cannot rely on missing arguments
if kwargs["valid"] is None: kwargs["valid"] = True
if kwargs["read"] is None: kwargs["read"] = True
if kwargs["write"] is None: kwargs["write"] = False
if kwargs["debug"] is None: kwargs["debug"] = False
if kwargs["test"] is None: kwargs["test"] = True
return modify_client(id=None, **kwargs)
...@@ -1562,33 +2154,33 @@ def add_client_args(subargp, mod=False):
reg_valid = subargp.add_mutually_exclusive_group(required=False)
reg_valid.add_argument(
"--valid", action="store_const", const=True, default=None,
help="valid client (default)")
reg_valid.add_argument("--novalid", action="store_const", const=False, dest="valid", default=None)
reg_read = subargp.add_mutually_exclusive_group(required=False)
reg_read.add_argument(
"--read", action="store_const", const=True, default=None,
help="client is allowed to read (default)")
reg_read.add_argument("--noread", action="store_const", const=False, dest="read", default=None)
reg_write = subargp.add_mutually_exclusive_group(required=False)
reg_write.add_argument(
"--nowrite", action="store_const", const=False, dest="write", default=None,
help="client is allowed to send (default - no)")
reg_write.add_argument("--write", action="store_const", const=True, default=None)
reg_debug = subargp.add_mutually_exclusive_group(required=False)
reg_debug.add_argument(
"--nodebug", action="store_const", const=False, dest="debug", default=None,
help="client is allowed to receive debug output (default - no)")
reg_debug.add_argument("--debug", action="store_const", const=True, default=None)
reg_test = subargp.add_mutually_exclusive_group(required=False)
reg_test.add_argument(
"--test", action="store_const", const=True, default=None,
help="client is still in testing phase (default - yes)")
reg_test.add_argument("--notest", action="store_const", const=False, dest="test", default=None)
def get_args():
...@@ -1663,9 +2255,7 @@ def get_args():
subargp_loadmaps = subargp.add_parser(
"loadmaps", add_help=False,
description=(
"Load 'categories' and 'tags' tables from 'catmap_db.json' and 'tagmap_db.json'."
" Note also that previous content of both tables will be lost."),
help="load catmap and tagmap into db")
subargp_loadmaps.set_defaults(command=load_maps)
...