Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target: Pavel.Valach/warden
Commits on Source (21)
......@@ -3,6 +3,5 @@
"certfile": "cert.pem",
"keyfile": "key.pem",
"filelog": {"level": "debug"},
"name": "org.example.warden_client",
"secret": "ToP_SeCrEt"
"name": "org.example.warden_client"
}
......@@ -42,13 +42,13 @@ B. Dependencies
2. Python modules
python-mysqldb 5.3.3+
python-mysqldb 5.3.3+ | python-psycopg2 2.8.6+
python-m2crypto 0.20+
jsonschema 2.4+
3. Database
MySQL | MariaDB >= 5.5
MySQL | MariaDB >= 5.5 | PostgreSQL >= 13
------------------------------------------------------------------------------
C. Installation
......@@ -70,14 +70,28 @@ C. Installation
> GRANT ALL ON warden3.* TO `warden`@`localhost`;
> FLUSH PRIVILEGES;
# psql
> CREATE DATABASE warden3 ENCODING 'UTF-8';
> CREATE ROLE "warden" LOGIN PASSWORD 'example';
> GRANT ALL ON DATABASE "warden3" TO "warden";
* Create necessary table structure
mysql -p -u warden warden3 < warden_3.0.sql
mysql -p -u warden warden3 < warden_3.0_mysql.sql
or
psql -U warden -h localhost warden3 < warden_3.0_postgres.sql
* Get up to date Idea schema
wget -O warden_server/idea.schema https://idea.cesnet.cz/_media/en/idea0.schema
* Load category and tag maps into the database (this step is optional for MySQL)
./warden_server.py loadmaps
* Enable mod_wsgi, mod_ssl, include Warden configuration
This depends heavily on your distribution and Apache configuration.
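As a rough sketch only (hostname, certificate paths and the WSGI alias below are assumptions, not distribution defaults — adjust them for your setup), the Apache side may look like:

```apache
# Hypothetical example; verify paths and module availability on your system
<VirtualHost *:443>
    ServerName warden.example.org
    SSLEngine on
    SSLCertificateFile    /etc/ssl/certs/warden.example.org.pem
    SSLCertificateKeyFile /etc/ssl/private/warden.example.org.key
    SSLCACertificateFile  /etc/ssl/certs/ca-chain.pem
    # Client certificates are needed by the X509 authenticators
    SSLVerifyClient optional
    SSLOptions +StdEnvVars +ExportCertData
    WSGIScriptAlias /warden3 /opt/warden_server/warden_server.py
</VirtualHost>
```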
......@@ -129,7 +143,7 @@ particular implementation object of the aspect, for example type of logger
Sections and their "type" objects can be:
Log: FileLogger, SysLogger
DB: MySQL
DB: MySQL, PostgreSQL
Auth: X509Authenticator, X509NameAuthenticator,
X509MixMatchAuthenticator,PlainAuthenticator
Validator: JSONSchemaValidator, NoValidator
......@@ -186,22 +200,36 @@ object from particular section list is used ("FileLogger" for example).
retry_count: number of retries, defaults to 3
event_size_limit: max size of serialized event, defaults to 5 MB
catmap_filename: IDEA category mapping to database ids, defaults to
"catmap_mysql.json" at installation directory
"catmap_db.json" at installation directory
tagmap_filename: IDEA node type mapping to database ids, defaults to
"tagmap_db.json" at installation directory
PostgreSQL: database storage backend
host: database server host, default "localhost"
user: database user, default "warden"
password: database password
dbname: database name, default "warden3"
port: database server port, default 5432
retry_pause: retry in case of database errors, in seconds, defaults to 5
retry_count: number of retries, defaults to 3
event_size_limit: max size of serialized event, defaults to 5 MB
catmap_filename: IDEA category mapping to database ids, defaults to
"catmap_db.json" at installation directory
tagmap_filename: IDEA node type mapping to database ids, defaults to
"tagmap_mysql.json" at installation directory
"tagmap_db.json" at installation directory
WardenHandler: Main Warden RPC worker
send_events_limit: max events sent in one bunch, defaults to 10000
get_events_limit: max events received in one bunch, defaults to 10000
description: human readable description, sent in server info
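For illustration, a minimal server configuration using the PostgreSQL backend might look like the following sketch. The section keys mirror those used by the test suite's configuration; all values are placeholders, not required defaults:

```json
{
    "log": {"type": "FileLogger", "level": "info"},
    "auth": {"type": "X509Authenticator"},
    "validator": {"type": "JSONSchemaValidator"},
    "db": {
        "type": "PostgreSQL",
        "host": "localhost",
        "user": "warden",
        "password": "example",
        "dbname": "warden3",
        "port": 5432
    },
    "handler": {"description": "Example Warden server"}
}
```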
------------------------------------------------------------------------------
E. Command line
When run from the command line, the server offers a set of commands and options
for runtime and database management. You can also use the --help option for
each command and for the server itself.
warden_server.py [--help] [-c CONFIG] <command>
optional arguments:
......@@ -285,11 +313,9 @@ warden_server.py purge [--help] [-l] [-e] [-d DAYS]
warden_server.py loadmaps [--help]
Load 'categories' and 'tags' table from 'catmap_mysql.json' and
'tagmap_mysql.json'. Note that this is NOT needed for server at all, load
them into db at will, should you need to run your own specific SQL queries
on data directly. Note also that previous content of both tables will be
lost.
Load the 'categories' and 'tags' tables from 'catmap_db.json' and
'tagmap_db.json'. Note that the previous content of both tables
will be lost.
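To illustrate the mapping format: entries in catmap_db.json map an IDEA "Category.Subcategory" string to a numeric database id, and loading splits each key into the category table's columns. A minimal sketch (the sample entries here are made up for illustration):

```python
# Hypothetical catmap entries: IDEA category string -> numeric database id
catmap = {"Abusive.Spam": 11, "Malware": 20}

rows = []
for cat_subcat, num in catmap.items():
    # Split "Category.Subcategory" at the first dot; the subcategory may be absent
    category, _, subcategory = cat_subcat.partition(".")
    rows.append((num, category, subcategory or None, cat_subcat))

# rows now holds (id, category, subcategory, cat_subcat) tuples ready to insert
```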
------------------------------------------------------------------------------
Copyright (C) 2011-2015 Cesnet z.s.p.o
......@@ -21,7 +21,8 @@ B. Compatibility
* The test suite, just like the Warden Server, is compatible with both Python2
(tested on 2.7) and Python3 (tested on 3.6).
* Just like Warden Server, the test suite requires a local MySQL installation.
* Just like Warden Server, the test suite requires a local MySQL or PostgreSQL
installation.
* It is safe to run the test suite on a production system. For testing,
a database distinct from the default production one is used. Also, the user
account used for accessing the testing database is set for local login only.
......@@ -56,7 +57,11 @@ D. Usage
Before running the tests (for the first time), a DB user with required rights
must be created. An easy way to do it is:
./test_warden_server.py --init
This will prompt for MySQL root password.
This will prompt for the password of the database administrator account ('root'
for MySQL, 'postgres' for PostgreSQL). Please note that by default, the user
'postgres' can only be authenticated using the peer authentication method,
which requires the script to be run by the operating system user 'postgres'.
In that case, no password is required.
Standard usage for testing:
./test_warden_server.py
......@@ -64,16 +69,22 @@ Standard usage for testing:
Advanced usage:
./test_warden_server.py --help
usage: test_warden_server.py [-h] [-i] [-n]
usage: test_warden_server.py [-h] [-i] [-n] [-d {MySQL,PostgreSQL}]
Warden3 Server Test Suite
optional arguments:
-h, --help show this help message and exit
-i, --init Set up a user with rights to CREATE/DROP the
test database
-n, --nopurge Skip the database purge after running the tests
-h, --help show this help message and exit
-d {MySQL,PostgreSQL}, --dbms {MySQL,PostgreSQL}
Database management system to use for
testing
-i, --init Set up a user with rights to
CREATE/DROP the test database
-n, --nopurge Skip the database purge after running
the tests
Option -d (--dbms) sets the database management system to use for testing.
If this option is not provided, MySQL is used by default.
Option -n (--nopurge) is meant for debugging purposes and test development; it
keeps the test database around for inspection after running the tests.
......
......@@ -6,9 +6,8 @@ import argparse
import getpass
import sys
import warnings
import json
from os import path
from copy import deepcopy
import MySQLdb as my
from warden_server import build_server
import warden_server
......@@ -26,48 +25,19 @@ USER = 'warden3test'
PASSWORD = 'h7w*D>4B)3omcvLM$oJp'
DB = 'w3test'
def setUpModule(): # pylint: disable = locally-disabled, invalid-name
"""Initialize the test database"""
print(__doc__)
conn = None
try:
conn = my.connect(user=USER, passwd=PASSWORD)
cur = conn.cursor()
with warnings.catch_warnings(): # The database is not supposed to exist
warnings.simplefilter("ignore")
cur.execute("DROP DATABASE IF EXISTS %s" % (DB,)) # NOT SECURE
cur.execute("CREATE DATABASE %s" % (DB,)) # NOT SECURE
cur.execute("USE %s" % (DB,)) # NOT SECURE
with open(path.join(path.dirname(__file__), 'warden_3.0.sql')) as script:
statements = ''.join([line.replace('\n', '') for line in script if line[0:2] != '--']).split(';')[:-1]
for statement in statements:
cur.execute(statement)
cur.execute("INSERT INTO clients VALUES(NULL, NOW(), 'warden-info@cesnet.cz', 'test.server.warden.cesnet.cz', NULL, 1, 'cz.cesnet.warden3test', 'abc', 1, 1, 1, 0)")
conn.commit()
except my.OperationalError as ex:
if conn:
conn.rollback()
conn.close()
conn = None
print('Setup failed, have you tried --init ? Original exception: %s' % (str(ex),))
exit()
finally:
if conn:
conn.close()
DBMS.set_up()
NO_PURGE = False
DBMS = None
def tearDownModule(): # pylint: disable = locally-disabled, invalid-name
"""Clean up by purging the test database"""
if not NO_PURGE:
conn = my.connect(user=USER, passwd=PASSWORD)
cur = conn.cursor()
cur.execute("DROP DATABASE IF EXISTS %s" % (DB,)) # NOT SECURE
conn.commit()
conn.close()
DBMS.tear_down()
class ReadableSTR(str):
......@@ -123,8 +93,6 @@ class Request(object):
class Warden3ServerTest(unittest.TestCase):
"""High level Warden3 Server tests"""
config = {'log': {'level': 'debug'}, 'validator': {'type': 'NoValidator'}, 'auth': {'type': 'PlainAuthenticator'},
'db': {'user': USER, 'password': PASSWORD, 'dbname': DB}, 'handler': {'description': 'Warden Test Server'}}
getInfo_interface_tests_specific = [
("/getInfo", "403 I'm watching. Authenticate."),
......@@ -138,22 +106,26 @@ class Warden3ServerTest(unittest.TestCase):
("/getEvents?secret=123", "403 I'm watching. Authenticate.", None),
]
@staticmethod
def get_config():
return {
'log': {'level': 'debug'},
'validator': {'type': 'NoValidator'},
'auth': {'type': 'PlainAuthenticator'},
'db': {'type': DBMS.name, 'user': USER, 'password': PASSWORD, 'dbname': DB},
'handler': {'description': 'Warden Test Server'}
}
@classmethod
def setUpClass(cls):
"""Pre-test cleanup"""
cls.clean_lastid()
cls.app = build_server(cls.config)
cls.app = build_server(cls.get_config())
@classmethod
def clean_lastid(cls):
"""Cleans the lastid information for all clients"""
conn = my.connect(user=USER, passwd=PASSWORD, db=DB)
cur = conn.cursor()
cur.execute("DELETE FROM events")
cur.execute("DELETE FROM last_events")
cur.close()
conn.commit()
conn.close()
DBMS.clean_lastid()
def test_getInfo_interface(self): # pylint: disable = locally-disabled, invalid-name
"""Tests the getInfo method invocation"""
......@@ -228,12 +200,17 @@ class Warden3ServerTest(unittest.TestCase):
("/sendEvents?secret=abc", "", "200 OK", ['{"saved": 0}']),
("/sendEvents?secret=abc", "{'test': 'true'}", "400 Deserialization error.", None),
("/sendEvents?secret=abc", '{"test": "true"}', "400 List of events expected.", None),
("/sendEvents?secret=abc", '[{"test": "true"}]', "422 Event does not bear valid Node attribute", None),
("/sendEvents?secret=abc", '[{"Node": ["test", "test2"]}]', "422 Event does not bear valid Node attribute", None),
("/sendEvents?secret=abc", '[{"Node": ["Name", "test"]}]', "422 Event does not bear valid Node attribute", None),
("/sendEvents?secret=abc", '[{"Node": [{"Name"}]}]', "400 Deserialization error.", None),
("/sendEvents?secret=abc", '[{"Node": [{"Name": "test"}]}]', "422 Node does not correspond with saving client", None),
("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}]}]', "200 OK", ['{"saved": 1}']),
("/sendEvents?secret=abc", '[{"test": "true"}]', "422 Missing IDEA ID", None),
("/sendEvents?secret=abc", '[{"test": "true", "ID": "120820201142"}]', "422 Event does not bear valid Node attribute", None),
("/sendEvents?secret=abc", '[{"Node": ["test", "test2"], "ID": "120820201142"}]', "422 Event does not bear valid Node attribute", None),
("/sendEvents?secret=abc", '[{"Node": ["Name", "test"], "ID": "120820201142"}]', "422 Event does not bear valid Node attribute", None),
("/sendEvents?secret=abc", '[{"Node": [{"Name"}], "ID": "120820201142"}]', "400 Deserialization error.", None),
("/sendEvents?secret=abc", '[{"Node": [{"Name": "test"}], "ID": "120820201142"}]', "422 Node does not correspond with saving client", None),
("/sendEvents?secret=abc", '[{"Node": [{"Name": "test"}], "ID": "verylongideaidverylongideaidverylongideaidverylongideaidverylongideaid"}]', "422 The provided event ID is too long", None),
("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "verylongideaidverylongideaidverylongideaidverylongideaidverylongideaid"}]', "422 The provided event ID is too long", None),
("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "ideaidcontaininga\\u0000byte"}]', "422 IDEA ID cannot contain null bytes", None),
("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "verylongideaidverylongideaid\\u0000verylongideaidverylongideaidverylongideaid"}]', "422 Multiple errors", None),
("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201142"}]', "200 OK", ['{"saved": 1}']),
]
for query, payload, expected_status, expected_response in tests:
with self.subTest(query=query, payload=payload, expected_status=expected_status, expected_response=expected_response):
......@@ -245,14 +222,22 @@ class Warden3ServerTest(unittest.TestCase):
class X509AuthenticatorTest(Warden3ServerTest):
"""Performs the basic test suite using the X509Authenticator"""
config = deepcopy(Warden3ServerTest.config)
config['auth']['type'] = 'X509Authenticator'
@staticmethod
def get_config():
config = Warden3ServerTest.get_config()
config['auth']['type'] = 'X509Authenticator'
return config
class X509NameAuthenticatorTest(Warden3ServerTest):
"""Performs the basic test suite using the X509NameAuthenticator"""
config = deepcopy(Warden3ServerTest.config)
config['auth']['type'] = 'X509NameAuthenticator'
@staticmethod
def get_config():
config = Warden3ServerTest.get_config()
config['auth']['type'] = 'X509NameAuthenticator'
return config
getInfo_interface_tests_specific = [
("/getInfo", "200 OK"),
......@@ -271,8 +256,13 @@ class WScliTest(unittest.TestCase):
"""Tester of the Warden Server command line interface"""
@classmethod
def setUpClass(cls):
cls.config = {'log': {'level': 'debug'}, 'validator': {'type': 'NoValidator'}, 'auth': {'type': 'PlainAuthenticator'},
'db': {'user': USER, 'password': PASSWORD, 'dbname': DB}, 'handler': {'description': 'Warden Test Server'}}
cls.config = {
'log': {'level': 'debug'},
'validator': {'type': 'NoValidator'},
'auth': {'type': 'PlainAuthenticator'},
'db': {'type': DBMS.name, 'user': USER, 'password': PASSWORD, 'dbname': DB},
'handler': {'description': 'Warden Test Server'}
}
warden_server.server = build_server(cls.config)
@staticmethod
......@@ -298,17 +288,6 @@ class WScliTest(unittest.TestCase):
sys.argv = argv_backup
return ret, out.getvalue(), err.getvalue()
@staticmethod
def do_sql_select(query, params):
"""Reads data from database"""
conn = my.connect(user=USER, passwd=PASSWORD, db=DB)
cur = conn.cursor()
cur.execute(query, params)
result = cur.fetchall()
cur.close()
conn.close()
return result
def test_list(self):
"""Tests the list command line option"""
tests = [
......@@ -392,7 +371,6 @@ class WScliTest(unittest.TestCase):
(['modify', '-i', 'CLIENT_ID', '--note', 'Valid until:', '20.1.2038'], 2,
(('a@b', 'test3.warden.cesnet.cz', 'cz.cesnet.warden.test3', 'top_secret', 1, 1, 0, 1, 0, 'Valid until: 18.01.2038'),)),
]
test_sql = "SELECT requestor, hostname, name, secret, valid, clients.read, debug, clients.write, test, note FROM clients WHERE id = %s"
client_id = None
for supplied_arguments, expected_return, expected_sql_result in tests:
with self.subTest(supplied_arguments=supplied_arguments, expected_return=expected_return, expected_sql_result=expected_sql_result):
......@@ -403,44 +381,303 @@ class WScliTest(unittest.TestCase):
client_id = int(out.split('\n')[-2].split(' ')[0])
except IndexError: # No modification was performed, keep the previous client_id
pass
result = self.do_sql_select(test_sql, (client_id,))
result = DBMS.do_sql_select(DBMS.reg_mod_test_query, (client_id,))
self.assertEqual(result, expected_sql_result)
def init_user():
"""DB user rights setup"""
conn = None
try:
conn = my.connect(user='root', passwd=getpass.getpass('Enter MySQL Root password:'))
with conn.cursor() as cur:
cur.execute("CREATE USER IF NOT EXISTS %s@'localhost' IDENTIFIED BY %s", (USER, PASSWORD))
cur.execute("GRANT SELECT, INSERT, UPDATE, CREATE, DELETE, DROP ON *.* TO %s@'localhost'", (USER,))
class MySQL:
name = "MySQL"
reg_mod_test_query = "SELECT requestor, hostname, name, secret, valid, clients.read, " \
"debug, clients.write, test, note FROM clients WHERE id = %s"
def __init__(self, user=USER, password=PASSWORD, dbname=DB):
import MySQLdb as my
self.my = my
self.user = user
self.password = password
self.dbname = dbname
def init_user(self):
"""DB user rights setup"""
conn = None
try:
conn = self.my.connect(user='root', passwd=getpass.getpass(
'Enter MySQL Root password:'))
with conn.cursor() as cur:
cur.execute(
"CREATE USER IF NOT EXISTS %s@'localhost' IDENTIFIED BY %s",
(self.user, self.password)
)
cur.execute(
"GRANT SELECT, INSERT, UPDATE, CREATE, DELETE, DROP ON *.* TO %s@'localhost'",
(self.user,)
)
conn.commit()
print("DB User set up successfully")
except self.my.OperationalError as ex:
if conn:
conn.rollback()
conn.close()
conn = None
print('Connection unsuccessful, bad password? Original exception: %s' % (str(ex)))
exit()
except KeyboardInterrupt:
print("\nCancelled!")
exit()
finally:
if conn:
conn.close()
def set_up(self):
conn = None
try:
conn = self.my.connect(user=self.user, passwd=self.password)
cur = conn.cursor()
with warnings.catch_warnings(): # The database is not supposed to exist
warnings.simplefilter("ignore")
cur.execute("DROP DATABASE IF EXISTS %s" % (self.dbname,)) # NOT SECURE
cur.execute("CREATE DATABASE %s" % (self.dbname,)) # NOT SECURE
cur.execute("USE %s" % (self.dbname,)) # NOT SECURE
with open(path.join(path.dirname(__file__), 'warden_3.0_mysql.sql')) as script:
statements = ''.join(
[line.replace('\n', '') for line in script if line[0:2] != '--']
).split(';')[:-1]
for statement in statements:
cur.execute(statement)
cur.execute(
"INSERT INTO clients VALUES("
"NULL, NOW(), 'warden-info@cesnet.cz', 'test.server.warden.cesnet.cz',"
"NULL, 1, 'cz.cesnet.warden3test', 'abc', 1, 1, 1, 0"
")"
)
conn.commit()
except self.my.OperationalError as ex:
if conn:
conn.rollback()
conn.close()
conn = None
print('Setup failed, have you tried --init ? Original exception: %s' % (str(ex),))
exit()
finally:
if conn:
conn.close()
def do_sql_select(self, query, params):
"""Reads data from database"""
conn = self.my.connect(user=self.user, passwd=self.password, db=self.dbname)
cur = conn.cursor()
cur.execute(query, params)
result = cur.fetchall()
cur.close()
conn.close()
return result
def tear_down(self):
"""Clean up by purging the test database"""
conn = self.my.connect(user=self.user, passwd=self.password)
cur = conn.cursor()
cur.execute("DROP DATABASE IF EXISTS %s" % (self.dbname,)) # NOT SECURE
conn.commit()
print("DB User set up successfully")
except my.OperationalError as ex:
if conn:
conn.rollback()
conn.close()
conn = None
print('Connection unsuccessful, bad password? Original exception: %s' % (str(ex)))
exit()
except KeyboardInterrupt:
print("\nCancelled!")
exit()
finally:
if conn:
conn.close()
conn.close()
def clean_lastid(self):
"""Cleans the lastid information for all clients"""
conn = self.my.connect(user=self.user, passwd=self.password, db=self.dbname)
cur = conn.cursor()
cur.execute("DELETE FROM last_events")
cur.execute("DELETE FROM events")
cur.close()
conn.commit()
conn.close()
class PostgreSQL:
name = "PostgreSQL"
reg_mod_test_query = "SELECT requestor, hostname, name, secret, valid, clients.read, " \
"debug, clients.write, test, note FROM clients WHERE id = %s"
def __init__(self, user=USER, password=PASSWORD, dbname=DB):
import psycopg2 as ppg
from psycopg2 import sql as ppgsql
self.ppg = ppg
self.ppgsql = ppgsql
self.user = user
self.password = password
self.dbname = dbname
def init_user(self):
"""DB user rights setup"""
running_as_postgres = getpass.getuser() == "postgres"
conn = None
try:
password = None if running_as_postgres else getpass.getpass("Enter PostgreSQL password for the user 'postgres':")
conn = self.ppg.connect(user="postgres", password=password)
with conn.cursor() as cur:
cur.execute(
self.ppgsql.SQL("CREATE ROLE {} PASSWORD {} CREATEDB LOGIN").format(
self.ppgsql.Identifier(self.user),
self.ppgsql.Placeholder()
),
(self.password,)
)
conn.commit()
print("DB User set up successfully")
except self.ppg.OperationalError as ex:
if conn:
conn.rollback()
conn.close()
conn = None
if running_as_postgres:
print("Connection unsuccessful. Original exception: %s" % (str(ex)))
else:
print("Connection unsuccessful, bad password or meant to run as the user 'postgres'"
" (su postgres -c '%s --dbms PostgreSQL --init')? Original exception: %s" %
(path.join('.', path.normpath(sys.argv[0])), str(ex)))
exit()
except KeyboardInterrupt:
print("\nCancelled!")
exit()
finally:
if conn:
conn.close()
def _load_tags(self, cur):
with open(path.join(path.dirname(__file__), "tagmap_db.json")) as tagmapf:
tagmap = json.load(tagmapf)
for tag, num in tagmap.items():
cur.execute(
"INSERT INTO tags(id, tag) VALUES (%s, %s)", (num, tag))
def _load_cats(self, cur):
with open(path.join(path.dirname(__file__), "catmap_db.json")) as catmapf:
catmap = json.load(catmapf)
for cat_subcat, num in catmap.items():
catsplit = cat_subcat.split(".", 1)
category = catsplit[0]
subcategory = catsplit[1] if len(catsplit) > 1 else None
cur.execute(
"INSERT INTO categories(id, category, subcategory, cat_subcat) VALUES (%s, %s, %s, %s)",
(num, category, subcategory, cat_subcat)
)
def set_up(self):
conn = None
try:
conn = self.ppg.connect(user=self.user, password=self.password,
host='localhost', dbname='postgres')
conn.autocommit = True
cur = conn.cursor()
cur.execute(
self.ppgsql.SQL("DROP DATABASE IF EXISTS {}").format(
self.ppgsql.Identifier(self.dbname)
)
)
cur.execute(
self.ppgsql.SQL("CREATE DATABASE {}").format(
self.ppgsql.Identifier(self.dbname)
)
)
except self.ppg.OperationalError as ex:
if conn:
conn.rollback()
conn.close()
conn = None
print(
'Setup failed, have you tried --init ? Original exception: %s' % (str(ex),))
exit()
finally:
if conn:
conn.close()
conn = None
try:
conn = self.ppg.connect(user=self.user, password=self.password,
dbname=self.dbname, host='localhost')
cur = conn.cursor()
with open(path.join(path.dirname(__file__), 'warden_3.0_postgres.sql')) as script:
statements = script.read()
cur.execute(statements)
self._load_tags(cur)
self._load_cats(cur)
cur.execute(
"INSERT INTO clients "
"(registered, requestor, hostname, note, valid,"
" name, secret, read, debug, write, test) "
"VALUES(NOW(), 'warden-info@cesnet.cz', 'test.server.warden.cesnet.cz', "
"NULL, true, 'cz.cesnet.warden3test', 'abc', true, true, true, false)"
)
conn.commit()
except self.ppg.OperationalError as ex:
if conn:
conn.rollback()
conn.close()
conn = None
print(
'Something went wrong during database setup. Original exception: %s' % (str(ex),))
exit()
finally:
if conn:
conn.close()
def do_sql_select(self, query, params):
"""Reads data from database"""
conn = self.ppg.connect(user=self.user, password=self.password,
dbname=self.dbname, host='localhost')
cur = conn.cursor()
cur.execute(query, params)
result = cur.fetchall()
cur.close()
conn.close()
return tuple(result)
def tear_down(self):
"""Clean up by purging the test database"""
conn = self.ppg.connect(user=self.user, password=self.password,
dbname='postgres', host='localhost')
conn.autocommit = True
cur = conn.cursor()
cur.execute(
self.ppgsql.SQL("DROP DATABASE IF EXISTS {} WITH(FORCE)").format(
self.ppgsql.Identifier(self.dbname)
)
)
conn.close()
def clean_lastid(self):
"""Cleans the lastid information for all clients"""
conn = self.ppg.connect(
user=self.user, password=self.password, dbname=self.dbname, host='localhost')
cur = conn.cursor()
cur.execute("DELETE FROM last_events")
cur.execute("DELETE FROM events")
cur.close()
conn.commit()
conn.close()
database_types = {
'MySQL': MySQL,
'PostgreSQL': PostgreSQL
}
def main():
"""Parses arguments and acts accordingly"""
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('-d', '--dbms', default='MySQL', choices=database_types, help='Database management system to use for testing')
parser.add_argument('-i', '--init', action='store_true', help='Set up a user with rights to CREATE/DROP the test database')
parser.add_argument('-n', '--nopurge', action='store_true', help='Skip the database purge after running the tests')
args = parser.parse_args()
global DBMS # pylint: disable = locally-disabled, global-statement
DBMS = database_types[args.dbms](USER, PASSWORD, DB)
if args.init:
init_user()
DBMS.init_user()
else:
if args.nopurge:
global NO_PURGE # pylint: disable = locally-disabled, global-statement
......
SET TimeZone='+00:00';
-- ---------------------------------------------------------
--
-- Database: "warden3"
--
-- --------------------------------------------------------
--
-- Table structure for table "categories"
--
CREATE TABLE IF NOT EXISTS "categories" (
"id" int NOT NULL UNIQUE,
"category" text NOT NULL,
"subcategory" text DEFAULT NULL,
"cat_subcat" text NOT NULL
);
CREATE INDEX IF NOT EXISTS "cat_sub" ON "categories" ("cat_subcat");
-- --------------------------------------------------------
--
-- Table structure for table "clients"
--
CREATE TABLE IF NOT EXISTS "clients" (
"id" SERIAL PRIMARY KEY,
"registered" timestamp NOT NULL DEFAULT '1970-01-01 00:00:00',
"requestor" text NOT NULL,
"hostname" text NOT NULL,
"note" text NULL,
"valid" boolean NOT NULL DEFAULT true,
"name" text NOT NULL,
"secret" text NULL,
"read" boolean NOT NULL DEFAULT true,
"debug" boolean NOT NULL DEFAULT false,
"write" boolean NOT NULL DEFAULT false,
"test" boolean NOT NULL DEFAULT false
);
CREATE INDEX IF NOT EXISTS "clients_1" ON "clients" ("valid", "secret", "hostname");
CREATE INDEX IF NOT EXISTS "clients_2" ON "clients" ("valid", "name");
-- --------------------------------------------------------
--
-- Table structure for table "events"
--
CREATE TABLE IF NOT EXISTS "events" (
"id" bigint PRIMARY KEY GENERATED ALWAYS AS IDENTITY (MINVALUE 2),
"received" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
"client_id" int NOT NULL REFERENCES "clients" ("id"),
"data" bytea NOT NULL,
"valid" boolean NOT NULL DEFAULT true
);
CREATE INDEX IF NOT EXISTS "id" ON "events" ("id", "client_id");
CREATE INDEX IF NOT EXISTS "received" ON "events" ("received");
-- --------------------------------------------------------
--
-- Table structure for table "event_category_mapping"
--
CREATE TABLE IF NOT EXISTS "event_category_mapping" (
"event_id" bigint NOT NULL REFERENCES "events" ("id") ON DELETE CASCADE,
"category_id" int NOT NULL,
PRIMARY KEY ("event_id", "category_id"),
CONSTRAINT "event_category_mapping_category_id_fk" FOREIGN KEY ("category_id") REFERENCES "categories" ("id")
);
-- --------------------------------------------------------
--
-- Table structure for table "last_events"
--
CREATE TABLE IF NOT EXISTS "last_events" (
"id" SERIAL PRIMARY KEY,
"client_id" int NOT NULL REFERENCES "clients" ("id"),
"event_id" bigint REFERENCES "events" ("id"),
"timestamp" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS "client_id" ON "last_events" ("client_id", "event_id");
-- --------------------------------------------------------
--
-- Table structure for table "tags"
--
CREATE TABLE IF NOT EXISTS "tags" (
"id" int NOT NULL UNIQUE,
"tag" text NOT NULL
);
CREATE INDEX IF NOT EXISTS "id_tag_name" ON "tags" ("id", "tag");
CREATE INDEX IF NOT EXISTS "tag_name" ON "tags" ("tag");
-- --------------------------------------------------------
--
-- Table structure for table "event_tag_mapping"
--
CREATE TABLE IF NOT EXISTS "event_tag_mapping" (
"event_id" bigint NOT NULL REFERENCES "events" ("id") ON DELETE CASCADE,
"tag_id" int NOT NULL,
PRIMARY KEY ("event_id", "tag_id"),
CONSTRAINT "event_tag_mapping_tag_id_fk" FOREIGN KEY ("tag_id") REFERENCES "tags" ("id")
);
......@@ -6,6 +6,7 @@
from __future__ import print_function
import abc
import sys
import os
import io
......@@ -16,11 +17,10 @@ import json
import re
from traceback import format_tb
from collections import namedtuple
from itertools import repeat
from time import sleep
from random import randint
import M2Crypto.X509
import MySQLdb as my
import MySQLdb.cursors as mycursors
if sys.version_info[0] >= 3:
import configparser as ConfigParser
......@@ -47,89 +47,138 @@ from jsonschema import Draft4Validator
VERSION = "3.0-beta3"
class Encoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, Error):
return o.to_dict()
if isinstance(o, ErrorMessage):
out = o.other_args.copy()
out.pop("exc", None)
out["error"] = o.error
out["message"] = o.message
if o.events:
out["events"] = list(o.events)
return out
return str(o)
class ErrorMessage(Exception):
def __init__(self, error, message, events=None, unique_id=None, **kwargs):
super(Exception, self).__setattr__("error", error)
super(Exception, self).__setattr__("message", message)
super(Exception, self).__setattr__("unique_id", unique_id)
self.events = set() if events is None else set(events)
self.other_args = kwargs
def __repr__(self):
return "%s(error=%d, message=%s)" % (
type(self).__name__, self.error, repr(self.message)
)
def __str__(self):
if sys.version_info[0] < 3:
return self.str_err().encode('ascii', 'backslashreplace')
return self.str_err()
def str_err(self):
exc = self.other_args.get("exc", None)
if exc in (None, (None, None, None)):
exc_cause = ""
else:
exc_cause = " (cause was %s: %s)" % (exc[0].__name__, str(exc[1]))
return "Error(%s) %s%s" % (self.error, self.message, exc_cause)
def str_info(self):
arg_copy = self.other_args.copy()
arg_copy.pop("req_id", None)
arg_copy.pop("method", None)
arg_copy.pop("exc", None)
if arg_copy:
return "Detail: %s" % json.dumps(arg_copy, cls=Encoder)
return ""
def str_debug(self):
exc = self.other_args.get("exc", None)
if exc in (None, (None, None, None)):
return ""
exc_tb = exc[2]
if not exc_tb:
return ""
return "Traceback:\n" + "".join(format_tb(exc_tb))
def __getattr__(self, name):
if name in self.other_args:
return self.other_args[name]
raise AttributeError
def __setattr__(self, name, value):
if name in ("events", "exc", "other_args"):
super(Exception, self).__setattr__(name, value)
return
if name in ("error", "message", "unique_id"):
raise AttributeError("Cannot change the attribute %s" % name)
self.other_args[name] = value
class Error(Exception):
def __init__(self, method=None, req_id=None, errors=None, **kwargs):
self.method = method
self.req_id = req_id
self.errors = [kwargs] if kwargs else []
if "message" in kwargs:
kwargs.setdefault("error", 500)
self.errors = [ErrorMessage(**kwargs)]
else:
self.errors = []
if errors:
self.errors.extend(errors)
def append(self, _events=None, **kwargs):
self.errors.append(kwargs)
kwargs.setdefault("message", "No further information")
kwargs.setdefault("error", 500)
self.errors.append(ErrorMessage(**kwargs))
def get_http_err_msg(self):
try:
err = self.errors[0]["error"]
msg = self.errors[0]["message"].replace("\n", " ")
except (IndexError, KeyError):
err = self.errors[0].error
msg = self.errors[0].message
except (IndexError, AttributeError):
err = 500
msg = "There's NO self-destruction button! Ah, you've just found it..."
for e in self.errors:
next_err = e.get("error", 500)
if err != next_err:
# errors not same, round to basic err code (400, 500)
# and use the highest one
err = max(err//100, next_err//100)*100
next_msg = e.get("message", "Unknown error").replace("\n", " ")
if msg != next_msg:
msg = "Multiple errors"
return err, msg
if not all(msg == e.message for e in self.errors):
# messages not the same, get Multiple errors
msg = "Multiple errors"
if not all(err == e.error for e in self.errors):
# errors not same, round to basic err code (400, 500)
# and use the highest one
err = max(e.error for e in self.errors) // 100 * 100
msg = "".join((c if '\x20' <= c != '\x7f' else r'\x{:02x}'.format(ord(c))) for c in msg) # escape control characters
return err, msg
def __str__(self):
return "\n".join(self.str_err(e) for e in self.errors)
return "\n".join(str(e) for e in self.errors)
def log(self, logger, prio=logging.ERROR):
for e in self.errors:
logger.log(prio, self.str_err(e))
info = self.str_info(e)
logger.log(prio, e.str_err())
info = e.str_info()
if info:
logger.info(info)
debug = self.str_debug(e)
debug = e.str_debug()
if debug:
logger.debug(debug)
def str_err(self, e):
out = []
out.append("Error(%s) %s " % (e.get("error", 0), e.get("message", "Unknown error")))
if "exc" in e and e["exc"]:
out.append("(cause was %s: %s)" % (e["exc"][0].__name__, str(e["exc"][1])))
return "".join(out)
def str_info(self, e):
ecopy = dict(e) # shallow copy
ecopy.pop("req_id", None)
ecopy.pop("method", None)
ecopy.pop("error", None)
ecopy.pop("message", None)
ecopy.pop("exc", None)
if ecopy:
out = "Detail: %s" % (json.dumps(ecopy, default=lambda v: str(v)))
else:
out = ""
return out
def str_debug(self, e):
out = []
if not e.get("exc"):
return ""
exc_tb = e["exc"][2]
if exc_tb:
out.append("Traceback:\n")
out.extend(format_tb(exc_tb))
return "".join(out)
def to_dict(self):
errlist = []
for e in self.errors:
ecopy = dict(e)
ecopy.pop("exc", None)
errlist.append(ecopy)
d = {
"method": self.method,
"req_id": self.req_id,
"errors": errlist
"errors": self.errors
}
return d
@@ -457,24 +506,26 @@ class JSONSchemaValidator(NoValidator):
res = []
for error in sorted(self.validator.iter_errors(event), key=sortkey):
res.append({
"error": 460,
"message": "Validation error: key \"%s\", value \"%s\"" % (
"/".join(str(v) for v in error.path),
error.instance
),
"expected": error.schema.get('description', 'no additional info')
})
res.append(
ErrorMessage(
460, "Validation error: key \"%s\", value \"%s\"" % (
"/".join(map(str, error.path)),
error.instance
),
expected=error.schema.get('description', 'no additional info')
)
)
return res
class MySQL(ObjectBase):
class DataBase(ObjectBase):
def __init__(
self, req, log, host, user, password, dbname, port, retry_count,
retry_pause, event_size_limit, catmap_filename, tagmap_filename):
ObjectBase.__init__(self, req, log)
self.host = host
self.user = user
self.password = password
@@ -495,12 +546,12 @@ class MySQL(ObjectBase):
self.tagmap = json.load(tagmap_fd)
self.tagmap_other = self.tagmap["Other"] # Catch error soon, avoid lookup later
self.db = None
self.con = None
@abc.abstractmethod
def connect(self):
self.con = my.connect(
host=self.host, user=self.user, passwd=self.password,
db=self.dbname, port=self.port, cursorclass=mycursors.DictCursor)
pass
def close(self):
try:
@@ -510,7 +561,8 @@ class MySQL(ObjectBase):
pass
self.con = None
__del__ = close
def __del__(self):
self.close()
def repeat(self):
""" Allows for graceful repeating of transactions self.retry_count
@@ -521,8 +573,7 @@ class MySQL(ObjectBase):
for attempt in self.repeat():
with attempt as db:
crs = db.query(...)
# do something with crs
res = db.query_all(...)
Note that it's not reentrant (nor is the underlying database
connection), so avoid nesting on the same DataBase object.
@@ -540,8 +591,7 @@ class MySQL(ObjectBase):
exception. Can be used with self.repeat(), or alone as:
with self as db:
crs = db.query(...)
# do something with crs
res = db.query_all(...)
Note that it's not reentrant (nor is the underlying database
connection), so avoid nesting on the same DataBase object.
@@ -557,24 +607,25 @@ class MySQL(ObjectBase):
open transaction is rolled back.
In case of no exception, transaction gets commited.
"""
if not exc_type:
if exc_type is None:
self.con.commit()
self.retry_attempt = 0
else:
try:
if self.con:
if self.con is not None:
self.con.rollback()
except my.Error:
except self.db.Error:
pass
try:
self.close()
except my.Error:
except self.db.Error:
pass
if self.retry_attempt:
self.log.info("Database error (%d attempts left): %s %s" % (self.retry_attempt, exc_type.__name__, exc_val))
if self.retry_attempt > 0:
self.log.info("Database error (%d attempts left): %s %s" %
(self.retry_attempt, exc_type.__name__, exc_val))
return True
def query(self, *args, **kwargs):
def _query(self, *args, **kwargs):
if not self.con:
self.connect()
crs = self.con.cursor()
@@ -582,85 +633,105 @@ class MySQL(ObjectBase):
crs.execute(*args, **kwargs)
return crs
def _query_multiple(self, query, params, ret, fetch):
res = None
for n, (q, p) in enumerate(zip(query, params)):
cur = self._query(q, p)
if n == ret:
res = fetch(cur)
if ret == -1: # fetch the result of the last query
res = fetch(cur)
return res
def execute(self, query, params, ret=None):
"""Execute the provided queries; discard the result"""
self._query_multiple(query, params, None, None)
def query_all(self, query, params, ret=-1):
"""Execute the provided queries; return list of all rows as dicts of the ret-th query (0 based)"""
return self._query_multiple(query, params, ret, lambda cur: cur.fetchall())
def query_one(self, query, params, ret=-1):
"""Execute the provided queries; return the first result of the ret-th query (0 based)"""
return self._query_multiple(query, params, ret, lambda cur: cur.fetchone())
def query_rowcount(self, query, params, ret=-1):
"""Execute provided query; return the number of affected rows or the number of returned rows of the ret-th query (0 based)"""
return self._query_multiple(query, params, ret, lambda cur: cur.rowcount)
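The four wrappers above all funnel into `_query_multiple`: execute a list of statements in order and hand back the result of the `ret`-th one, with `ret=-1` meaning the last. A minimal sqlite3 sketch of that pattern (function and names are illustrative, not the Warden API):

```python
import sqlite3

def query_multiple(con, queries, params, ret, fetch):
    # run each statement; keep the result of the ret-th one (0-based)
    res = None
    for n, (q, p) in enumerate(zip(queries, params)):
        cur = con.execute(q, p)
        if n == ret:
            res = fetch(cur)
    if ret == -1:  # -1 selects the result of the last query
        res = fetch(cur)
    return res

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE t (id INTEGER)")
rows = query_multiple(
    con,
    ["INSERT INTO t VALUES (?)", "SELECT id FROM t"],
    [(1,), ()],
    -1,
    lambda cur: cur.fetchall())
print(rows)  # -> [(1,)]
```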
def _get_comma_perc(self, l):
return ','.join(['%s'] * len(l))
return ",".join(repeat("%s", l if isinstance(l, int) else len(l)))
def _get_comma_perc_n(self, n, l):
return ", ".join(repeat("(%s)" % self._get_comma_perc(n), len(l)))
def _get_not(self, b):
return "" if b else "NOT"
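The two placeholder helpers above generate a flat `%s,%s,...` list and a repeated `(%s,%s), (%s,%s), ...` tuple list for bulk INSERTs; module-level sketch of the same logic:

```python
from itertools import repeat

def get_comma_perc(l):
    # accepts either a count or a sequence
    return ",".join(repeat("%s", l if isinstance(l, int) else len(l)))

def get_comma_perc_n(n, l):
    # one n-wide placeholder tuple per element of l
    return ", ".join(repeat("(%s)" % get_comma_perc(n), len(l)))

print(get_comma_perc(3))                # -> %s,%s,%s
print(get_comma_perc_n(2, ["a", "b"]))  # -> (%s,%s), (%s,%s)
```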
@abc.abstractmethod
def _build_get_client_by_name(self, cert_names, name, secret):
"""Build query and params for client lookup"""
def get_client_by_name(self, cert_names=None, name=None, secret=None):
query = ["SELECT * FROM clients WHERE valid = 1"]
params = []
if name:
query.append(" AND name = %s")
params.append(name.lower())
if secret:
query.append(" AND secret = %s")
params.append(secret)
if cert_names:
query.append(" AND hostname IN (%s)" % self._get_comma_perc(cert_names))
params.extend(n.lower() for n in cert_names)
query, params, ret = self._build_get_client_by_name(cert_names, name, secret)
for attempt in self.repeat():
with attempt as db:
rows = db.query("".join(query), params).fetchall()
rows = db.query_all(query, params, ret)
if len(rows) > 1:
self.log.warning(
"get_client_by_name: query returned more than one result (cert_names = %s, name = %s, secret = %s): %s" % (
cert_names, name, secret, ", ".join([str(Client(**row)) for row in rows])))
"get_client_by_name: query returned more than one result (cert_names = %s, name = %s, secret = %s): %s" %
(cert_names, name, secret, ", ".join([str(Client(**row)) for row in rows]))
)
return None
return Client(**rows[0]) if rows else None
@abc.abstractmethod
def _build_get_clients(self, id):
"""Build query and params for client lookup by id"""
def get_clients(self, id=None):
query = ["SELECT * FROM clients"]
params = []
if id:
query.append("WHERE id = %s")
params.append(id)
query.append("ORDER BY id")
query, params, ret = self._build_get_clients(id)
for attempt in self.repeat():
with attempt as db:
rows = db.query(" ".join(query), params).fetchall()
rows = db.query_all(query, params, ret=ret)
return [Client(**row) for row in rows]
@abc.abstractmethod
def _build_add_modify_client(self, id, **kwargs):
"""Build query and params for adding/modifying client"""
def add_modify_client(self, id=None, **kwargs):
query = []
params = []
uquery = []
if id is None:
query.append("INSERT INTO clients SET")
uquery.append("registered = now()")
else:
query.append("UPDATE clients SET")
for attr in set(Client._fields) - set(["id", "registered"]):
val = kwargs.get(attr, None)
if val is not None:
if attr == "secret" and val == "": # disable secret
val = None
uquery.append("`%s` = %%s" % attr)
params.append(val)
if not uquery:
if id is not None and all(kwargs.get(attr, None) is None for attr in set(Client._fields) - {"id", "registered"}):
return id
query.append(", ".join(uquery))
if id is not None:
query.append("WHERE id = %s")
params.append(id)
query, params, ret = self._build_add_modify_client(id, **kwargs)
for attempt in self.repeat():
with attempt as db:
crs = db.query(" ".join(query), params)
newid = crs.lastrowid if id is None else id
res_id = db.query_one(query, params, ret=ret)["id"]
newid = res_id if id is None else id
return newid
@abc.abstractmethod
def _build_get_debug_version(self):
pass
@abc.abstractmethod
def _build_get_debug_tablestat(self):
pass
def get_debug(self):
vquery, vparams, vret = self._build_get_debug_version()
tquery, tparams, tret = self._build_get_debug_tablestat()
for attempt in self.repeat():
with attempt as db:
rows = db.query("SELECT VERSION() AS VER").fetchall()
tablestat = db.query("SHOW TABLE STATUS").fetchall()
return {
"db": "MySQL",
"version": rows[0]["VER"],
"tables": tablestat
"db": type(self).__name__,
"version": db.query_one(vquery, vparams, vret)["version"],
"tables": db.query_all(tquery, tparams, tret)
}
def getMaps(self, section, variables):
@@ -671,10 +742,21 @@ class MySQL(ObjectBase):
except KeyError:
raise self.req.error(
message="Wrong tag or category used in query.",
error=422, exc=sys.exc_info(), key=v)
error=422, exc=sys.exc_info(), key=v
)
maps.append(mapped)
return set(maps) # unique
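Standalone sketch of the `getMaps` lookup: symbolic categories/tags are translated to their numeric ids and anything unknown is rejected (the real code raises a 422 "Wrong tag or category used in query."; map contents here are illustrative):

```python
catmap = {"Malware": 300, "Intrusion.UserCompromise": 210}

def get_maps(mapping, variables):
    maps = []
    for v in variables:
        try:
            maps.append(mapping[v])
        except KeyError:
            # stand-in for the 422 request error raised by the server
            raise ValueError("unknown key: %s" % v)
    return set(maps)  # unique

print(get_maps(catmap, ["Malware", "Malware"]))  # -> {300}
```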
@abc.abstractmethod
def _build_fetch_events(
self, client, id, count,
cat, nocat, tag, notag, group, nogroup):
"""Build query and params for fetching events based on id, count and category, tag and group filters"""
@abc.abstractmethod
def _load_event_json(self, data):
"""Return decoded JSON from data loaded from the database; if unable to decode, return None"""
def fetch_events(
self, client, id, count,
cat=None, nocat=None,
@@ -694,43 +776,16 @@ class MySQL(ObjectBase):
message="Unrealizable conditions. Choose group or nogroup option.",
error=422, group=group, nogroup=nogroup)
query = ["SELECT e.id, e.data FROM clients c RIGHT JOIN events e ON c.id = e.client_id WHERE e.id > %s"]
params = [id or 0]
if cat or nocat:
cats = self.getMaps(self.catmap, (cat or nocat))
query.append(
" AND e.id %s IN (SELECT event_id FROM event_category_mapping WHERE category_id IN (%s))" % (
self._get_not(cat), self._get_comma_perc(cats)))
params.extend(cats)
if tag or notag:
tags = self.getMaps(self.tagmap, (tag or notag))
query.append(
" AND e.id %s IN (SELECT event_id FROM event_tag_mapping WHERE tag_id IN (%s))" % (
self._get_not(tag), self._get_comma_perc(tags)))
params.extend(tags)
if group or nogroup:
subquery = []
for name in (group or nogroup):
escaped_name = name.replace('&', '&&').replace("_", "&_").replace("%", "&%") # escape for LIKE
subquery.append("c.name = %s") # exact client
params.append(name)
subquery.append("c.name LIKE CONCAT(%s, '.%%') ESCAPE '&'") # whole subtree
params.append(escaped_name)
query.append(" AND %s (%s)" % (self._get_not(group), " OR ".join(subquery)))
query.append(" AND e.valid = 1 LIMIT %s")
params.append(count)
query_string = "".join(query)
query, params, ret = self._build_fetch_events(
client, id, count,
cat, nocat,
tag, notag,
group, nogroup
)
row = None
for attempt in self.repeat():
with attempt as db:
row = db.query(query_string, params).fetchall()
row = db.query_all(query, params, ret=ret)
if row:
maxid = max(r['id'] for r in row)
@@ -739,9 +794,8 @@ class MySQL(ObjectBase):
events = []
for r in row:
try:
e = json.loads(r["data"])
except Exception:
e = self._load_event_json(r["data"])
if e is None: # null cannot be valid event JSON
# Note that we use Error object just for proper formatting,
# but do not raise it; from client perspective invalid
# events get skipped silently.
@@ -749,114 +803,685 @@ class MySQL(ObjectBase):
message="Unable to deserialize JSON event from db, id=%s" % r["id"],
error=500, exc=sys.exc_info(), id=r["id"])
err.log(self.log, prio=logging.WARNING)
events.append(e)
else:
events.append(e)
return {
"lastid": maxid,
"events": events
}
@abc.abstractmethod
def _build_store_events_event(self, client, event, raw_event):
"""Build query and params for event insertion"""
@abc.abstractmethod
def _build_store_events_categories(self, event_id, cat_ids):
"""Build query and params for insertion of event-categories mapping"""
@abc.abstractmethod
def _build_store_events_tags(self, event_id, tag_ids):
"""Build query and params for insertion of event-tags mapping"""
def store_events(self, client, events, events_raw):
try:
for attempt in self.repeat():
with attempt as db:
for event, raw_event in zip(events, events_raw):
lastid = db.query(
"INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s)",
(client.id, raw_event)).lastrowid
equery, eparams, eret = self._build_store_events_event(client, event, raw_event)
lastid = db.query_one(equery, eparams, ret=eret)["id"]
catlist = event.get('Category', ["Other"])
cats = set(catlist) | set(cat.split(".", 1)[0] for cat in catlist)
for cat in cats:
cat_id = self.catmap.get(cat, self.catmap_other)
db.query("INSERT INTO event_category_mapping (event_id,category_id) VALUES (%s, %s)", (lastid, cat_id))
cats = set(catlist) | {cat.split(".", 1)[0] for cat in catlist}
cat_ids = [self.catmap.get(cat, self.catmap_other) for cat in cats]
cquery, cparams, _ = self._build_store_events_categories(lastid, cat_ids)
db.execute(cquery, cparams)
nodes = event.get('Node', [])
tags = []
for node in nodes:
tags.extend(node.get('Type', []))
for tag in set(tags):
tag_id = self.tagmap.get(tag, self.tagmap_other)
db.query("INSERT INTO event_tag_mapping (event_id,tag_id) VALUES (%s, %s)", (lastid, tag_id))
tags = {tag for node in nodes for tag in node.get('Type', [])}
if tags:
tag_ids = [self.tagmap.get(tag, self.tagmap_other) for tag in tags]
tquery, tparams, _ = self._build_store_events_tags(lastid, tag_ids)
db.execute(tquery, tparams)
return []
except Exception as e:
exception = self.req.error(message="DB error", error=500, exc=sys.exc_info(), env=self.req.env)
exception.log(self.log)
return [{"error": 500, "message": "DB error %s" % type(e).__name__}]
return [ErrorMessage(500, "DB error %s" % type(e).__name__)]
@abc.abstractmethod
def _build_insert_last_received_id(self, client, id):
"""Build query and params for insertion of the last event id received by client"""
def insertLastReceivedId(self, client, id):
self.log.debug("insertLastReceivedId: id %i for client %i(%s)" % (id, client.id, client.hostname))
query, params, _ = self._build_insert_last_received_id(client, id)
for attempt in self.repeat():
with attempt as db:
db.query("INSERT INTO last_events(client_id, event_id, timestamp) VALUES(%s, %s, NOW())", (client.id, id))
db.execute(query, params)
@abc.abstractmethod
def _build_get_last_event_id(self):
"""Build query and params for querying the id of the last inserted event"""
def getLastEventId(self):
query, params, ret = self._build_get_last_event_id()
for attempt in self.repeat():
with attempt as db:
row = db.query("SELECT MAX(id) as id FROM events").fetchall()[0]
return row['id'] or 1
id_ = db.query_one(query, params, ret=ret)["id"]
return id_ or 1
@abc.abstractmethod
def _build_get_last_received_id(self, client):
"""Build query and params for querying the last event id received by client"""
def getLastReceivedId(self, client):
query, params, ret = self._build_get_last_received_id(client)
for attempt in self.repeat():
with attempt as db:
res = db.query(
"SELECT event_id as id FROM last_events WHERE client_id = %s ORDER BY last_events.id DESC LIMIT 1",
(client.id,)).fetchall()
try:
row = res[0]
except IndexError:
res = db.query_one(query, params, ret=ret)
if res is None:
id = None
self.log.debug("getLastReceivedId: probably first access, unable to get id for client %i(%s)" % (
client.id, client.hostname))
self.log.debug("getLastReceivedId: probably first access, unable to get id for client %i(%s)" %
(client.id, client.hostname))
else:
id = row["id"]
self.log.debug("getLastReceivedId: id %i for client %i(%s)" % (id, client.id, client.hostname))
id = res["id"] or 1
self.log.debug("getLastReceivedId: id %i for client %i(%s)" %
(id, client.id, client.hostname))
return id
@abc.abstractmethod
def _build_load_maps_tags(self):
"""Build query and params for updating the tag map"""
@abc.abstractmethod
def _build_load_maps_cats(self):
"""Build query and params for updating the category map"""
def load_maps(self):
tquery, tparams, _ = self._build_load_maps_tags()
cquery, cparams, _ = self._build_load_maps_cats()
with self as db:
db.query("DELETE FROM tags")
for tag, num in self.tagmap.items():
db.query("INSERT INTO tags(id, tag) VALUES (%s, %s)", (num, tag))
db.query("DELETE FROM categories")
for cat_subcat, num in self.catmap.items():
catsplit = cat_subcat.split(".", 1)
category = catsplit[0]
subcategory = catsplit[1] if len(catsplit) > 1 else None
db.query(
"INSERT INTO categories(id, category, subcategory, cat_subcat) VALUES (%s, %s, %s, %s)",
(num, category, subcategory, cat_subcat))
db.execute(tquery, tparams)
db.execute(cquery, cparams)
@abc.abstractmethod
def _build_purge_lastlog(self, days):
"""Build query and params for purging stored client last-event mappings older than the given number of days"""
def purge_lastlog(self, days):
query, params, ret = self._build_purge_lastlog(days)
with self as db:
return db.query_rowcount(query, params, ret=ret)
@abc.abstractmethod
def _build_purge_events_get_id(self, days):
"""Build query and params to get the largest event id among events older than the given number of days"""
@abc.abstractmethod
def _build_purge_events_events(self, id_):
"""Build query and params to remove events older than the given number of days, and their mappings"""
def purge_events(self, days):
iquery, iparams, iret = self._build_purge_events_get_id(days)
with self as db:
return db.query(
id_ = db.query_one(iquery, iparams, ret=iret)["id"]
if id_ is None:
return 0
equery, eparams, eret = self._build_purge_events_events(id_)
affected = db.query_rowcount(equery, eparams, ret=eret)
return affected
DataBase = abc.ABCMeta("DataBase", (DataBase,), {})
class MySQL(DataBase):
def __init__(
self, req, log, host, user, password, dbname, port, retry_count,
retry_pause, event_size_limit, catmap_filename, tagmap_filename):
super(MySQL, self).__init__(req, log, host, user, password, dbname, port, retry_count,
retry_pause, event_size_limit, catmap_filename, tagmap_filename)
import MySQLdb as db
import MySQLdb.cursors as mycursors
self.db = db
self.mycursors = mycursors
def connect(self):
self.con = self.db.connect(
host=self.host, user=self.user, passwd=self.password,
db=self.dbname, port=self.port, cursorclass=self.mycursors.DictCursor)
def _build_get_client_by_name(self, cert_names=None, name=None, secret=None):
"""Build query and params for client lookup"""
query = ["SELECT * FROM clients WHERE valid = 1"]
params = []
if name:
query.append(" AND name = %s")
params.append(name.lower())
if secret:
query.append(" AND secret = %s")
params.append(secret)
if cert_names:
query.append(" AND hostname IN (%s)" % self._get_comma_perc(cert_names))
params.extend(n.lower() for n in cert_names)
return ["".join(query)], [params], 0
def _build_get_clients(self, id):
"""Build query and params for client lookup by id"""
query = ["SELECT * FROM clients"]
params = []
if id:
query.append("WHERE id = %s")
params.append(id)
query.append("ORDER BY id")
return [" ".join(query)], [params], 0
def _build_add_modify_client(self, id, **kwargs):
"""Build query and params for adding/modifying client"""
query = []
params = []
uquery = []
if id is None:
query.append("INSERT INTO clients SET")
uquery.append("registered = now()")
else:
query.append("UPDATE clients SET")
for attr in set(Client._fields) - set(["id", "registered"]):
val = kwargs.get(attr, None)
if val is not None: # guaranteed at least one is not None
if attr == "secret" and val == "": # disable secret
val = None
uquery.append("`%s` = %%s" % attr)
params.append(val)
query.append(", ".join(uquery))
if id is not None:
query.append("WHERE id = %s")
params.append(id)
return (
[" ".join(query), 'SELECT LAST_INSERT_ID() AS id'],
[params, []],
1
)
def _build_get_debug_version(self):
return ["SELECT VERSION() AS version"], [()], 0
def _build_get_debug_tablestat(self):
return ["SHOW TABLE STATUS"], [()], 0
def _load_event_json(self, data):
"""Return decoded JSON from data loaded from the database; if unable to decode, return None"""
try:
return json.loads(data)
except Exception:
return None
def _build_fetch_events(
self, client, id, count,
cat, nocat, tag, notag, group, nogroup):
query = ["SELECT e.id, e.data FROM clients c RIGHT JOIN events e ON c.id = e.client_id WHERE e.id > %s"]
params = [id or 0]
if cat or nocat:
cats = self.getMaps(self.catmap, (cat or nocat))
query.append(
" AND e.id %s IN (SELECT event_id FROM event_category_mapping WHERE category_id IN (%s))" %
(self._get_not(cat), self._get_comma_perc(cats))
)
params.extend(cats)
if tag or notag:
tags = self.getMaps(self.tagmap, (tag or notag))
query.append(
" AND e.id %s IN (SELECT event_id FROM event_tag_mapping WHERE tag_id IN (%s))" %
(self._get_not(tag), self._get_comma_perc(tags))
)
params.extend(tags)
if group or nogroup:
subquery = []
for name in (group or nogroup):
escaped_name = name.replace('&', '&&').replace("_", "&_").replace("%", "&%") # escape for LIKE
subquery.append("c.name = %s") # exact client
params.append(name)
subquery.append("c.name LIKE CONCAT(%s, '.%%') ESCAPE '&'") # whole subtree
params.append(escaped_name)
query.append(" AND %s (%s)" %
(self._get_not(group), " OR ".join(subquery)))
query.append(" AND e.valid = 1 LIMIT %s")
params.append(count)
return ["".join(query)], [params], 0
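The "whole subtree" client match above relies on LIKE escaping: `&` is declared as the escape character, so a literal `_` or `%` in a client name stops acting as a wildcard, and `&` itself is doubled first. The escaping step in isolation:

```python
def escape_like(name):
    # order matters: double the escape character before introducing it
    return name.replace('&', '&&').replace('_', '&_').replace('%', '&%')

print(escape_like("org.example_a"))  # -> org.example&_a
print(escape_like("a&b%c"))          # -> a&&b&%c
```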
def _build_store_events_event(self, client, event, raw_event):
"""Build query and params for event insertion"""
return (
[
"INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s)",
"SELECT LAST_INSERT_ID() AS id"
],
[(client.id, raw_event), ()],
1
)
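MySQL lacks `INSERT ... RETURNING`, so the builder above pairs the INSERT with a follow-up `SELECT LAST_INSERT_ID() AS id` and selects that second result (`ret=1`), while the PostgreSQL variant returns the id directly. The same two-statement pattern, sketched with sqlite3's equivalent function:

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, data TEXT)")
con.execute("INSERT INTO events (data) VALUES (?)", ("raw event",))
# second statement fetches the id generated by the first one
new_id = con.execute("SELECT last_insert_rowid() AS id").fetchone()[0]
print(new_id)  # -> 1
```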
def _build_store_events_categories(self, event_id, cat_ids):
"""Build query and params for insertion of event-categories mapping"""
return (
["INSERT INTO event_category_mapping (event_id,category_id) VALUES " +
self._get_comma_perc_n(2, cat_ids)],
[tuple(param for cat_id in cat_ids for param in (event_id, cat_id))],
None
)
def _build_store_events_tags(self, event_id, tag_ids):
"""Build query and params for insertion of event-tags mapping"""
return (
["INSERT INTO event_tag_mapping (event_id,tag_id) VALUES " +
self._get_comma_perc_n(2, tag_ids)],
[tuple(param for tag_id in tag_ids for param in (event_id, tag_id))],
None
)
def _build_insert_last_received_id(self, client, id):
"""Build query and params for insertion of the last event id received by client"""
return (
["INSERT INTO last_events(client_id, event_id, timestamp) VALUES(%s, %s, NOW())"],
[(client.id, id)],
None
)
def _build_get_last_event_id(self):
"""Build query and params for querying the id of the last inserted event"""
return ["SELECT MAX(id) as id FROM events"], [()], 0
def _build_get_last_received_id(self, client):
"""Build query and params for querying the last event id received by client"""
return (
["SELECT event_id as id FROM last_events WHERE client_id = %s ORDER BY last_events.id DESC LIMIT 1"],
[(client.id,)],
0
)
def _build_load_maps_tags(self):
"""Build query and params for updating the tag map"""
return (
[
"DELETE FROM tags",
"INSERT INTO tags(id, tag) VALUES " +
self._get_comma_perc_n(2, self.tagmap)
],
[
(),
tuple(param for tag, num in self.tagmap.items() for param in (num, tag))
],
None
)
def _build_load_maps_cats(self):
"""Build query and params for updating the category map"""
params = []
for cat_subcat, num in self.catmap.items():
catsplit = cat_subcat.split(".", 1)
category = catsplit[0]
subcategory = catsplit[1] if len(catsplit) > 1 else None
params.extend((num, category, subcategory, cat_subcat))
return (
[
"DELETE FROM categories",
"INSERT INTO categories(id, category, subcategory, cat_subcat) VALUES " +
self._get_comma_perc_n(4, self.catmap)
],
[
(),
tuple(params)
],
None
)
def _build_purge_lastlog(self, days):
"""Build query and params for purging stored client last-event mappings older than the given number of days"""
return (
[
"DELETE FROM last_events "
" USING last_events LEFT JOIN ("
" SELECT MAX(id) AS last FROM last_events"
" GROUP BY client_id"
" ) AS maxids ON last=id"
" WHERE timestamp < DATE_SUB(CURDATE(), INTERVAL %s DAY) AND last IS NULL",
(days,)).rowcount
],
[(days,)],
0
)
def _build_purge_events_get_id(self, days):
"""Build query and params to get the largest event id among events older than the given number of days"""
return (
[
"SELECT MAX(id) as id"
" FROM events"
" WHERE received < DATE_SUB(CURDATE(), INTERVAL %s DAY)"
],
[(days,)],
0
)
def _build_purge_events_events(self, id_):
"""Build query and params to remove events older than the given number of days, and their mappings"""
return (
[
"DELETE FROM event_category_mapping WHERE event_id <= %s",
"DELETE FROM event_tag_mapping WHERE event_id <= %s",
"DELETE FROM events WHERE id <= %s",
],
[(id_,), (id_,), (id_,)],
2
)
class PostgreSQL(DataBase):
def purge_events(self, days):
with self as db:
affected = 0
id_ = db.query(
def __init__(
self, req, log, host, user, password, dbname, port, retry_count,
retry_pause, event_size_limit, catmap_filename, tagmap_filename):
super(PostgreSQL, self).__init__(req, log, host, user, password, dbname, port, retry_count,
retry_pause, event_size_limit, catmap_filename, tagmap_filename)
import psycopg2 as db
from psycopg2 import sql as ppgsql
import psycopg2.extras as ppgextra
self.db = db
self.ppgsql = ppgsql
self.ppgextra = ppgextra
def connect(self):
self.con = self.db.connect(
host=self.host, user=self.user, password=self.password,
dbname=self.dbname, port=self.port, cursor_factory=self.ppgextra.RealDictCursor)
def _build_get_client_by_name(self, cert_names=None, name=None, secret=None):
"""Build query and params for client lookup"""
query = ["SELECT * FROM clients WHERE valid"]
params = []
if name:
query.append(" AND name = %s")
params.append(name.lower())
if secret:
query.append(" AND secret = %s")
params.append(secret)
if cert_names:
query.append(" AND hostname IN (%s)" % self._get_comma_perc(cert_names))
params.extend(n.lower() for n in cert_names)
return ["".join(query)], [params], 0
def _build_get_clients(self, id):
"""Build query and params for client lookup by id"""
query = ["SELECT * FROM clients"]
params = []
if id:
query.append("WHERE id = %s")
params.append(id)
query.append("ORDER BY id")
return [" ".join(query)], [params], 0
def _build_add_modify_client(self, id, **kwargs):
"""Build query and params for adding/modifying client"""
fields = set(Client._fields) - {"id", "registered"}
cols, params = map(
list,
zip(
*(
(k, None) # disable secret
if k == "secret" and v == "" else
(k, v)
for k, v in kwargs.items()
if v is not None and k in fields
)
)
)
if id is None:
query = self.ppgsql.SQL('INSERT INTO clients ("registered", {}) VALUES (NOW(), {}) RETURNING id').format(
self.ppgsql.SQL(", ").join(map(self.ppgsql.Identifier, cols)),
self.ppgsql.SQL(", ").join(self.ppgsql.Placeholder() * len(cols))
)
elif not cols:
return ["SELECT %s AS id"], [(id,)], 0
else:
query = self.ppgsql.SQL("UPDATE clients SET {} WHERE id = {} RETURNING id").format(
self.ppgsql.SQL(", ").join(
self.ppgsql.SQL("{} = {}").format(
self.ppgsql.Identifier(col),
self.ppgsql.Placeholder()
) for col in cols
),
self.ppgsql.Placeholder()
)
params.append(id)
return [query], [params], 0
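The `psycopg2.sql` composition above quotes each changed column as an identifier and pairs it with a placeholder. Stripped of the safe-quoting machinery, the generated UPDATE statement looks like this (plain-string sketch only; the real code keeps the `ppgsql` composition for identifier safety):

```python
def build_update(cols):
    # one '"col" = %s' pair per changed field, then the id placeholder,
    # RETURNING the id so query_one() can hand it back
    sets = ", ".join('"%s" = %%s' % c for c in cols)
    return "UPDATE clients SET %s WHERE id = %%s RETURNING id" % sets

print(build_update(["name", "note"]))
# -> UPDATE clients SET "name" = %s, "note" = %s WHERE id = %s RETURNING id
```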
def _build_get_debug_version(self):
return ["SELECT setting AS version FROM pg_settings WHERE name = 'server_version'"], [()], 0
def _build_get_debug_tablestat(self):
return [
"SELECT "
'tablename AS "Name", '
'relnatts AS "Columns", '
'n_live_tup AS "Rows", '
'pg_catalog.pg_size_pretty(pg_catalog.pg_table_size(oid)) AS "Table_size", '
'pg_catalog.pg_size_pretty(pg_catalog.pg_indexes_size(oid)) AS "Index_size", '
'coll.collations AS "Collations" '
"FROM "
"pg_catalog.pg_tables tbls "
"LEFT OUTER JOIN pg_catalog.pg_class cls "
"ON tbls.tablename=cls.relname "
"LEFT OUTER JOIN pg_catalog.pg_stat_user_tables sut "
"ON tbls.tablename=sut.relname "
"LEFT OUTER JOIN ("
"SELECT "
"table_name, "
"string_agg("
"DISTINCT COALESCE("
"collation_name, "
"("
"SELECT "
"datcollate "
"FROM "
"pg_catalog.pg_database "
"WHERE "
"datname=%s"
")"
"), "
"','"
") AS collations "
"FROM "
"information_schema.columns "
"GROUP BY "
"table_name"
") coll "
"ON tbls.tablename=coll.table_name "
"WHERE "
"tbls.schemaname='public' "
"AND tbls.tableowner=%s"
], [(self.dbname, self.user)], 0
def _load_event_json(self, data):
"""Return decoded JSON from data loaded from the database; if unable to decode, return None"""
try:
return json.loads(data.tobytes())
except Exception:
return None
def _build_fetch_events(
self, client, id, count,
cat, nocat, tag, notag, group, nogroup):
query = ["SELECT e.id, e.data FROM clients c RIGHT JOIN events e ON c.id = e.client_id WHERE e.id > %s"]
params = [id or 0]
if cat or nocat:
cats = self.getMaps(self.catmap, (cat or nocat))
query.append(
" AND e.id %s IN (SELECT event_id FROM event_category_mapping WHERE category_id IN (%s))" %
(self._get_not(cat), self._get_comma_perc(cats))
)
params.extend(cats)
if tag or notag:
tags = self.getMaps(self.tagmap, (tag or notag))
query.append(
" AND e.id %s IN (SELECT event_id FROM event_tag_mapping WHERE tag_id IN (%s))" %
(self._get_not(tag), self._get_comma_perc(tags))
)
params.extend(tags)
if group or nogroup:
subquery = []
for name in group or nogroup:
name = name.lower() # assumes only lowercase names
escaped_name = name.replace('&', '&&').replace("_", "&_").replace("%", "&%") # escape for LIKE
subquery.append("c.name = %s") # exact client
params.append(name)
subquery.append("c.name LIKE %s || '.%%' ESCAPE '&'") # whole subtree
params.append(escaped_name)
query.append(" AND %s (%s)" % (self._get_not(group), " OR ".join(subquery)))
query.append(" AND e.valid LIMIT %s")
params.append(count)
return ["".join(query)], [params], 0
def _build_store_events_event(self, client, event, raw_event):
"""Build query and params for event insertion"""
return (
["INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s) RETURNING id"],
[(client.id, self.db.Binary(raw_event.encode('utf8')))],
0
)
def _build_store_events_categories(self, event_id, cat_ids):
"""Build query and params for insertion of event-categories mapping"""
return (
["INSERT INTO event_category_mapping (event_id,category_id) VALUES " +
self._get_comma_perc_n(2, cat_ids)],
[tuple(param for cat_id in cat_ids for param in (event_id, cat_id))],
None
)
def _build_store_events_tags(self, event_id, tag_ids):
"""Build query and params for insertion of event-tags mapping"""
return (
["INSERT INTO event_tag_mapping (event_id,tag_id) VALUES " +
self._get_comma_perc_n(2, tag_ids)],
[tuple(param for tag_id in tag_ids for param in (event_id, tag_id))],
None
)
def _build_insert_last_received_id(self, client, id):
"""Build query and params for insertion of the last event id received by client"""
return (
["INSERT INTO last_events(client_id, event_id, timestamp) VALUES(%s, %s, NOW())"],
[(client.id, None if id == 1 else id)],
None
)
def _build_get_last_event_id(self):
"""Build query and params for querying the id of the last inserted event"""
return ["SELECT MAX(id) as id FROM events"], [()], 0
def _build_get_last_received_id(self, client):
"""Build query and params for querying the last event id received by client"""
return (
["SELECT event_id as id FROM last_events WHERE client_id = %s ORDER BY last_events.id DESC LIMIT 1"],
[(client.id,)],
0
)
def _build_load_maps_tags(self):
"""Build query and params for updating the tag map"""
return (
[
"ALTER TABLE event_tag_mapping DROP CONSTRAINT event_tag_mapping_tag_id_fk",
"DELETE FROM tags",
"INSERT INTO tags(id, tag) VALUES " +
self._get_comma_perc_n(2, self.tagmap),
'ALTER TABLE event_tag_mapping ADD CONSTRAINT "event_tag_mapping_tag_id_fk" FOREIGN KEY ("tag_id") REFERENCES "tags" ("id")'
],
[(), (), tuple(param for tag, num in self.tagmap.items() for param in (num, tag)), ()],
None
)
def _build_load_maps_cats(self):
"""Build query and params for updating the category map"""
params = []
for cat_subcat, num in self.catmap.items():
catsplit = cat_subcat.split(".", 1)
category = catsplit[0]
subcategory = catsplit[1] if len(catsplit) > 1 else None
params.extend((num, category, subcategory, cat_subcat))
return (
[
"ALTER TABLE event_category_mapping DROP CONSTRAINT event_category_mapping_category_id_fk",
"DELETE FROM categories",
"INSERT INTO categories(id, category, subcategory, cat_subcat) VALUES " +
self._get_comma_perc_n(4, self.catmap),
'ALTER TABLE event_category_mapping ADD CONSTRAINT "event_category_mapping_category_id_fk" FOREIGN KEY ("category_id") REFERENCES "categories" ("id")'
],
[(), (), tuple(params), ()],
None
)
def _build_purge_lastlog(self, days):
"""Build query and params for purging stored client last event mapping older than days"""
return (
[
"DELETE FROM last_events "
" USING last_events le LEFT JOIN ("
" SELECT MAX(id) AS last FROM last_events"
" GROUP BY client_id"
" ) AS maxids ON maxids.last=le.id"
" WHERE le.timestamp < CURRENT_DATE - INTERVAL %s DAY AND maxids.last IS NULL"
],
[(str(days),)],
0
)
def _build_purge_events_get_id(self, days):
"""Build query and params to get largest event id of events older than days"""
return (
[
"SELECT MAX(id) as id"
" FROM events"
" WHERE received < CURRENT_DATE - INTERVAL %s DAY"
],
[(str(days),)],
0
)
def _build_purge_events_events(self, id_):
"""Build query and params to remove events older than days and their mappings"""
return ["DELETE FROM events WHERE id <= %s"], [(id_,)], 0
def expose(read=True, write=False, debug=False):
def expose_deco(meth):
meth.exposed = True
@@ -949,7 +1574,7 @@ class Server(ObjectBase):
if exception:
status = "%d %s" % exception.get_http_err_msg()
output = json.dumps(exception, cls=Encoder)
exception.log(self.log)
# Make sure everything is properly encoded - JSON and various function
@@ -989,9 +1614,7 @@ def json_wrapper(method):
result = method(self, **args) # call requested method
try:
output = json.dumps(result, cls=Encoder)
except Exception as e:
raise self.req.error(message="Serialization error", error=500, exc=sys.exc_info(), args=str(result))
@@ -1019,7 +1642,7 @@ class WardenHandler(ObjectBase):
self.get_events_limit = get_events_limit
self.description = description
@expose(read=True, debug=True)
@json_wrapper
def getDebug(self):
return {
@@ -1043,7 +1666,7 @@ class WardenHandler(ObjectBase):
}
}
@expose(read=True)
@json_wrapper
def getInfo(self):
info = {
......@@ -1055,7 +1678,7 @@ class WardenHandler(ObjectBase):
info["description"] = self.description
return info
@expose(read=True)
@json_wrapper
def getEvents(
self, id=None, count=None,
@@ -1106,69 +1729,105 @@ class WardenHandler(ObjectBase):
return res
def check_node(self, event, event_indx, name):
try:
ev_id = event['Node'][0]['Name'].lower()
except (KeyError, TypeError, IndexError):
# Event does not bear valid Node attribute
return [
ErrorMessage(422, "Event does not bear valid Node attribute", {event_indx})
]
if ev_id != name:
return [
ErrorMessage(422, "Node does not correspond with saving client", {event_indx})
]
return []
def check_idea_id(self, event, event_indx):
id_length_limit = 64
try:
id_ = event["ID"]
except (KeyError, TypeError, ValueError):
return [ErrorMessage(422, "Missing IDEA ID", {event_indx})]
if not isinstance(id_, unicode) or len(id_) == 0:
return [ErrorMessage(422, "The provided IDEA ID is invalid", {event_indx})]
errors = []
if len(id_) > id_length_limit:
errors.append(
ErrorMessage(
422, "The provided event ID is too long",
{event_indx}, id_length_limit=id_length_limit
)
)
if '\x00' in id_:
errors.append(ErrorMessage(422, "IDEA ID cannot contain null bytes", {event_indx}))
return errors
def add_errors(self, errs_to_add):
for err in errs_to_add:
self.errs.setdefault((err.error, err.message, err.unique_id), err).events.update(err.events)
@expose(write=True)
@json_wrapper
def sendEvents(self, events=[]):
if not isinstance(events, list):
raise self.req.error(message="List of events expected.", error=400)
self.errs = {}
if len(events) > self.send_events_limit:
self.add_errors(
[
ErrorMessage(
507, "Too many events in one batch.",
set(range(self.send_events_limit, len(events))),
send_events_limit=self.send_events_limit
)
]
)
saved = 0
events_tosend = []
events_raw = []
events_nums = []
for i, event in enumerate(events[0:self.send_events_limit]):
v_errs = self.validator.check(event)
if v_errs:
self.add_errors(v_errs)
continue
idea_id_errs = self.check_idea_id(event, i)
if idea_id_errs:
self.add_errors(idea_id_errs)
continue
node_errs = self.check_node(event, i, self.req.client.name)
if node_errs:
self.add_errors(node_errs)
continue
if self.req.client.test and 'Test' not in event.get('Category', []):
self.add_errors(
[
ErrorMessage(
422, "You're allowed to send only messages containing \"Test\" among categories.", {i},
# Ensure a distinct error entry for every combination of categories
unique_id=tuple(event.get('Category', [])),
categories=event.get('Category', [])
)
]
)
continue
raw_event = json.dumps(event)
if len(raw_event) >= self.db.event_size_limit:
self.add_errors(
[
ErrorMessage(
413, "Event too long (>%i B)" % self.db.event_size_limit, {i},
event_size_limit=self.db.event_size_limit
)
]
)
continue
events_tosend.append(event)
@@ -1176,15 +1835,13 @@ class WardenHandler(ObjectBase):
events_nums.append(i)
db_errs = self.db.store_events(self.req.client, events_tosend, events_raw)
self.add_errors(db_errs)
saved = 0 if db_errs else len(events_tosend)
self.log.info("Saved %i events" % saved)
if self.errs:
raise self.req.error(errors=self.errs.values())
return {"saved": saved}
@@ -1245,7 +1902,7 @@ section_order = ("log", "db", "auth", "validator", "handler", "server")
# "type" keyword in section may be used to choose other
section_def = {
"log": [FileLogger, SysLogger],
"db": [MySQL, PostgreSQL],
"auth": [X509NameAuthenticator, PlainAuthenticator, X509Authenticator, X509MixMatchAuthenticator],
"validator": [JSONSchemaValidator, NoValidator],
"handler": [WardenHandler],
@@ -1305,8 +1962,22 @@ param_def = {
"retry_pause": {"type": "natural", "default": 3},
"retry_count": {"type": "natural", "default": 3},
"event_size_limit": {"type": "natural", "default": 5*1024*1024},
"catmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "catmap_db.json")},
"tagmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "tagmap_db.json")}
},
PostgreSQL: {
"req": {"type": "obj", "default": "req"},
"log": {"type": "obj", "default": "log"},
"host": {"type": "str", "default": "localhost"},
"user": {"type": "str", "default": "warden"},
"password": {"type": "str", "default": ""},
"dbname": {"type": "str", "default": "warden3"},
"port": {"type": "natural", "default": 5432},
"retry_pause": {"type": "natural", "default": 3},
"retry_count": {"type": "natural", "default": 3},
"event_size_limit": {"type": "natural", "default": 5*1024*1024},
"catmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "catmap_db.json")},
"tagmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "tagmap_db.json")}
},
WardenHandler: {
"req": {"type": "obj", "default": "req"},
@@ -1453,11 +2124,11 @@ def list_clients(id=None):
def register_client(**kwargs):
# argparse does _always_ return something, so we cannot rely on missing arguments
if kwargs["valid"] is None: kwargs["valid"] = True
if kwargs["read"] is None: kwargs["read"] = True
if kwargs["write"] is None: kwargs["write"] = False
if kwargs["debug"] is None: kwargs["debug"] = False
if kwargs["test"] is None: kwargs["test"] = True
return modify_client(id=None, **kwargs)
@@ -1562,33 +2233,33 @@ def add_client_args(subargp, mod=False):
reg_valid = subargp.add_mutually_exclusive_group(required=False)
reg_valid.add_argument(
"--valid", action="store_const", const=True, default=None,
help="valid client (default)")
reg_valid.add_argument("--novalid", action="store_const", const=False, dest="valid", default=None)
reg_read = subargp.add_mutually_exclusive_group(required=False)
reg_read.add_argument(
"--read", action="store_const", const=True, default=None,
help="client is allowed to read (default)")
reg_read.add_argument("--noread", action="store_const", const=False, dest="read", default=None)
reg_write = subargp.add_mutually_exclusive_group(required=False)
reg_write.add_argument(
"--nowrite", action="store_const", const=False, dest="write", default=None,
help="client is allowed to send (default - no)")
reg_write.add_argument("--write", action="store_const", const=True, default=None)
reg_debug = subargp.add_mutually_exclusive_group(required=False)
reg_debug.add_argument(
"--nodebug", action="store_const", const=False, dest="debug", default=None,
help="client is allowed receive debug output (default - no)")
reg_debug.add_argument("--debug", action="store_const", const=True, default=None)
reg_test = subargp.add_mutually_exclusive_group(required=False)
reg_test.add_argument(
"--test", action="store_const", const=True, default=None,
help="client is yet in testing phase (default - yes)")
reg_test.add_argument("--notest", action="store_const", const=False, dest="test", default=None)
def get_args():
@@ -1663,9 +2334,7 @@ def get_args():
subargp_loadmaps = subargp.add_parser(
"loadmaps", add_help=False,
description=(
"Load 'categories' and 'tags' table from 'catmap_db.json' and 'tagmap_db.json'."
" Note also that previous content of both tables will be lost."),
help="load catmap and tagmap into db")
subargp_loadmaps.set_defaults(command=load_maps)
......