Compare revisions: Pavel.Valach/warden (5 commits on source)
README:

@@ -42,13 +42,13 @@ B. Dependencies
 2. Python modules

    python-mysqldb 5.3.3+ | python-psycopg2 2.8.6+
    python-m2crypto 0.20+
    jsonschema 2.4+

 3. Database

    MySQL | MariaDB >= 5.5 | PostgreSQL >= 13

------------------------------------------------------------------------------
C. Installation
@@ -70,14 +70,28 @@ C. Installation

      > GRANT ALL ON warden3.* TO `warden`@`localhost`;
      > FLUSH PRIVILEGES;

    # psql
      > CREATE DATABASE warden3 ENCODING 'UTF-8';
      > CREATE ROLE "warden" LOGIN PASSWORD 'example';
      > GRANT ALL ON DATABASE "warden3" TO "warden";

  * Create necessary table structure

      mysql -p -u warden warden3 < warden_3.0_mysql.sql
    or
      psql -U warden -h localhost warden3 < warden_3.0_postgres.sql

  * Get up-to-date Idea schema

      wget -O warden_server/idea.schema https://idea.cesnet.cz/_media/en/idea0.schema

  * Load category and tag maps into the database (this step is optional for
    the MySQL backend)

      ./warden_server.py loadmaps

  * Enable mod_wsgi, mod_ssl, include Warden configuration

    This depends heavily on your distribution and Apache configuration.
@@ -129,7 +143,7 @@ particular implementation object of the aspect, for example type of logger

Sections and their "type" objects can be:

    Log: FileLogger, SysLogger
    DB: MySQL, PostgreSQL
    Auth: X509Authenticator, X509NameAuthenticator,
          X509MixMatchAuthenticator, PlainAuthenticator
    Validator: JSONSchemaValidator, NoValidator

@@ -186,22 +200,36 @@ object from particular section list is used ("FileLogger" for example).

    retry_count: number of retries, defaults to 3
    event_size_limit: max size of serialized event, defaults to 5 MB
    catmap_filename: IDEA category mapping to database ids, defaults to
        "catmap_db.json" at installation directory
    tagmap_filename: IDEA node type mapping to database ids, defaults to
        "tagmap_db.json" at installation directory
PostgreSQL: database storage backend
    host: database server host, default "localhost"
    user: database user, default "warden"
    password: database password
    dbname: database name, default "warden3"
    port: database server port, default 5432
    retry_pause: retry in case of database errors, in seconds, defaults to 3
    retry_count: number of retries, defaults to 3
    event_size_limit: max size of serialized event, defaults to 5 MB
    catmap_filename: IDEA category mapping to database ids, defaults to
        "catmap_db.json" at installation directory
    tagmap_filename: IDEA node type mapping to database ids, defaults to
        "tagmap_db.json" at installation directory
WardenHandler: Main Warden RPC worker
    send_events_limit: max events sent in one bunch, defaults to 10000
    get_events_limit: max events received in one bunch, defaults to 10000
    description: human readable description, sent in server info

------------------------------------------------------------------------------
E. Command line

When run from the command line, the server offers a set of commands and
options for runtime and database management. You can also use the --help
option for each command and for the server itself.

    warden_server.py [--help] [-c CONFIG] <command>

optional arguments:
@@ -285,11 +313,9 @@ warden_server.py purge [--help] [-l] [-e] [-d DAYS]

warden_server.py loadmaps [--help]

    Load 'categories' and 'tags' table from 'catmap_db.json' and
    'tagmap_db.json'. Note also that previous content of both tables
    will be lost.
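    For example, reloading the maps after switching to the PostgreSQL
    backend could look like this (invocation sketch; the config path is
    illustrative):

        ./warden_server.py -c /etc/warden/warden_server.cfg loadmaps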
------------------------------------------------------------------------------
Copyright (C) 2011-2015 Cesnet z.s.p.o
test_warden_server.py:

@@ -39,7 +39,7 @@ def setUpModule():  # pylint: disable = locally-disabled, invalid-name
    cur.execute("DROP DATABASE IF EXISTS %s" % (DB,))  # NOT SECURE
    cur.execute("CREATE DATABASE %s" % (DB,))  # NOT SECURE
    cur.execute("USE %s" % (DB,))  # NOT SECURE
    with open(path.join(path.dirname(__file__), 'warden_3.0_mysql.sql')) as script:
        statements = ''.join([line.replace('\n', '') for line in script if line[0:2] != '--']).split(';')[:-1]
    for statement in statements:
        cur.execute(statement)

warden_3.0_postgres.sql (new file):
SET TimeZone='+00:00';

CREATE COLLATION case_insensitive (
    provider = icu,
    locale = 'und-u-ks-level2',
    deterministic = false
);
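-- Note (editorial assumption): this nondeterministic ICU collation compares
-- strings case-insensitively (ICU strength level 2), presumably to mirror
-- the case-insensitive string columns of the MySQL schema.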
-- --------------------------------------------------------
--
-- Database: "warden3"
--

-- --------------------------------------------------------
--
-- Table structure for table "categories"
--

CREATE TABLE "categories" (
    "id" int NOT NULL UNIQUE CHECK ("id" >= 0),
    "category" text NOT NULL COLLATE case_insensitive,
    "subcategory" text DEFAULT NULL COLLATE case_insensitive,
    "cat_subcat" text NOT NULL COLLATE case_insensitive
);

CREATE INDEX "cat_sub" ON "categories" ("cat_subcat");

-- --------------------------------------------------------
--
-- Table structure for table "clients"
--

CREATE TABLE "clients" (
    "id" SERIAL PRIMARY KEY,
    "registered" timestamp NOT NULL DEFAULT '1970-01-01 00:00:00',
    "requestor" text NOT NULL COLLATE case_insensitive,
    "hostname" text NOT NULL COLLATE case_insensitive,
    "note" text NULL COLLATE case_insensitive,
    "valid" smallint NOT NULL DEFAULT '1' CHECK ("valid" >= 0),
    "name" text NOT NULL,
    "secret" text NULL,
    "read" smallint NOT NULL DEFAULT '1' CHECK ("read" >= 0),
    "debug" smallint NOT NULL DEFAULT '0' CHECK ("debug" >= 0),
    "write" smallint NOT NULL DEFAULT '0' CHECK ("write" >= 0),
    "test" smallint NOT NULL DEFAULT '0' CHECK ("test" >= 0)
);

CREATE INDEX "clients_1" ON "clients" ("valid", "secret", "hostname");
CREATE INDEX "clients_2" ON "clients" ("valid", "name");

-- --------------------------------------------------------
--
-- Table structure for table "events"
--

CREATE TABLE "events" (
    "id" BIGSERIAL PRIMARY KEY,
    "received" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
    "client_id" int NOT NULL REFERENCES "clients" ("id"),
    "data" bytea NOT NULL,
    "valid" smallint NOT NULL DEFAULT '1' CHECK ("valid" >= 0)
);

CREATE INDEX "id" ON "events" ("id", "client_id");
CREATE INDEX "received" ON "events" ("received");

SELECT nextval('events_id_seq'); -- AUTO_INCREMENT = 2

-- --------------------------------------------------------
--
-- Table structure for table "event_category_mapping"
--

CREATE TABLE "event_category_mapping" (
    "event_id" bigint NOT NULL REFERENCES "events" ("id") ON DELETE CASCADE,
    "category_id" int NOT NULL,
    PRIMARY KEY ("event_id", "category_id"),
    CONSTRAINT "event_category_mapping_category_id_fk" FOREIGN KEY ("category_id") REFERENCES "categories" ("id")
);

-- --------------------------------------------------------
--
-- Table structure for table "last_events"
--

CREATE TABLE "last_events" (
    "id" SERIAL PRIMARY KEY,
    "client_id" int NOT NULL REFERENCES "clients" ("id"),
    "event_id" bigint REFERENCES "events" ("id"),
    "timestamp" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX "client_id" ON "last_events" ("client_id", "event_id");

-- --------------------------------------------------------
--
-- Table structure for table "tags"
--

CREATE TABLE "tags" (
    "id" int NOT NULL UNIQUE CHECK ("id" >= 0),
    "tag" text NOT NULL COLLATE case_insensitive
);

CREATE INDEX "id_tag_name" ON "tags" ("id", "tag");
CREATE INDEX "tag_name" ON "tags" ("tag");

-- --------------------------------------------------------
--
-- Table structure for table "event_tag_mapping"
--

CREATE TABLE "event_tag_mapping" (
    "event_id" bigint NOT NULL REFERENCES "events" ("id") ON DELETE CASCADE,
    "tag_id" int NOT NULL,
    PRIMARY KEY ("event_id", "tag_id"),
    CONSTRAINT "event_tag_mapping_tag_id_fk" FOREIGN KEY ("tag_id") REFERENCES "tags" ("id")
);
warden_server.py:

@@ -16,11 +16,10 @@ import json
import re
from traceback import format_tb
from collections import namedtuple
from itertools import repeat
from time import sleep
from random import randint

import M2Crypto.X509

if sys.version_info[0] >= 3:
    import configparser as ConfigParser

@@ -134,6 +133,17 @@ class Error(Exception):
        return d


def override_required(method):
    def abstract_method(self, *args, **kwargs):
        method(self, *args, **kwargs)
        raise NotImplementedError(
            "Class %s needs to implement the %s() method" %
            (type(self).__name__, method.__name__)
        )
    abstract_method.__doc__ = method.__doc__
    return abstract_method
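
# Editorial sketch of the decorator's effect (hypothetical subclass of the
# DataBase class defined below):
#
#     class IncompleteBackend(DataBase):
#         pass
#
#     IncompleteBackend(...).connect()  # runs the empty base body, then raises
#     # NotImplementedError: Class IncompleteBackend needs to implement the connect() method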
def get_clean_root_logger(level=logging.INFO):
    """ Attempts to get logging module into clean slate state """

@@ -469,12 +479,13 @@ class JSONSchemaValidator(NoValidator):
        return res


class DataBase(ObjectBase):

    def __init__(
            self, req, log, host, user, password, dbname, port, retry_count,
            retry_pause, event_size_limit, catmap_filename, tagmap_filename):
        ObjectBase.__init__(self, req, log)
        self.host = host
        self.user = user
        self.password = password
@@ -497,20 +508,16 @@ class MySQL(ObjectBase):
        self.con = None

    @override_required
    def connect(self):
        pass

    @override_required
    def close(self):
        pass

    def __del__(self):
        self.close()

    def repeat(self):
        """ Allows for graceful repeating of transactions self.retry_count
@@ -521,8 +528,7 @@ class MySQL(ObjectBase):
            for attempt in self.repeat():
                with attempt as db:
                    res = db.query_all(...)

        Note that it's not reentrant (as is not underlying MySQL
        connection), so avoid nesting on the same MySQL object.

@@ -540,8 +546,7 @@ class MySQL(ObjectBase):
        exception. Can be used with self.repeat(), or alone as:

            with self as db:
                res = db.query_all(...)

        Note that it's not reentrant (as is not underlying MySQL
        connection), so avoid nesting on the same MySQL object.

@@ -550,6 +555,7 @@ class MySQL(ObjectBase):
        self.retry_attempt = 0
        return self
    @override_required
    def __exit__(self, exc_type, exc_val, exc_tb):
        """ Context manager protocol. If db exception is fired and
        self.retry_attempt is not zero, it is only logged and
@@ -557,110 +563,87 @@ class MySQL(ObjectBase):
        open transaction is rolled back.

        In case of no exception, transaction gets committed.
        """

    @override_required
    def execute(self, query, params, ret=None):
        """Execute the provided queries; discard the result"""

    @override_required
    def query_all(self, query, params, ret=-1):
        """Execute the provided queries; return list of all rows as dicts of the ret-th query (0 based)"""

    @override_required
    def query_one(self, query, params, ret=-1):
        """Execute the provided queries; return the first result of the ret-th query (0 based)"""

    @override_required
    def query_rowcount(self, query, params, ret=-1):
        """Execute provided query; return the number of affected rows or the number of returned rows of the ret-th query (0 based)"""

    @override_required
    def _build_get_client_by_name(self, cert_names, name, secret):
        """Build query and params for client lookup"""
    def get_client_by_name(self, cert_names=None, name=None, secret=None):
        query, params, ret = self._build_get_client_by_name(cert_names, name, secret)
        for attempt in self.repeat():
            with attempt as db:
                rows = db.query_all(query, params, ret)
                if len(rows) > 1:
                    self.log.warning(
                        "get_client_by_name: query returned more than one result (cert_names = %s, name = %s, secret = %s): %s" %
                        (cert_names, name, secret, ", ".join([str(Client(**row)) for row in rows]))
                    )
                    return None
                return Client(**rows[0]) if rows else None
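
    # Hypothetical use (instance and client name are illustrative):
    #     client = storage.get_client_by_name(name="cz.example.warden.receiver")
    #     # -> Client(...) namedtuple, or None if unknown or ambiguous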
    @override_required
    def _build_get_clients(self, id):
        """Build query and params for client lookup by id"""

    def get_clients(self, id=None):
        query, params, ret = self._build_get_clients(id)
        for attempt in self.repeat():
            with attempt as db:
                rows = db.query_all(query, params, ret=ret)
                return [Client(**row) for row in rows]
    @override_required
    def _build_add_modify_client(self, id, **kwargs):
        """Build query and params for adding/modifying client"""

    def add_modify_client(self, id=None, **kwargs):
        if id is not None and all(kwargs.get(attr, None) is None for attr in set(Client._fields) - {"id", "registered"}):
            return id

        query, params, ret = self._build_add_modify_client(id, **kwargs)

        for attempt in self.repeat():
            with attempt as db:
                res_id = db.query_one(query, params, ret=ret)["id"]
                newid = res_id if id is None else id
                return newid
    @override_required
    def _build_get_debug_version(self):
        """Build query and params for querying the database version"""

    @override_required
    def _build_get_debug_tablestat(self):
        """Build query and params for querying table statistics"""

    def get_debug(self):
        vquery, vparams, vret = self._build_get_debug_version()
        tquery, tparams, tret = self._build_get_debug_tablestat()
        for attempt in self.repeat():
            with attempt as db:
                return {
                    "db": type(self).__name__,
                    "version": db.query_one(vquery, vparams, vret)["version"],
                    "tables": db.query_all(tquery, tparams, tret)
                }
    def getMaps(self, section, variables):
@@ -671,10 +654,21 @@ class MySQL(ObjectBase):
            except KeyError:
                raise self.req.error(
                    message="Wrong tag or category used in query.",
                    error=422, exc=sys.exc_info(), key=v
                )
            maps.append(mapped)
        return set(maps)  # unique

    @override_required
    def _build_fetch_events(
            self, client, id, count,
            cat, nocat, tag, notag, group, nogroup):
        """Build query and params for fetching events based on id, count and category, tag and group filters"""

    @override_required
    def _load_event_json(self, data):
        """Return decoded json from data loaded from database, if unable to decode, return None"""
    def fetch_events(
            self, client, id, count,
            cat=None, nocat=None,
@@ -694,43 +688,16 @@ class MySQL(ObjectBase):
                message="Unrealizable conditions. Choose group or nogroup option.",
                error=422, group=group, nogroup=nogroup)

        query, params, ret = self._build_fetch_events(
            client, id, count,
            cat, nocat,
            tag, notag,
            group, nogroup
        )

        row = None
        for attempt in self.repeat():
            with attempt as db:
                row = db.query_all(query, params, ret=ret)

        if row:
            maxid = max(r['id'] for r in row)
@@ -739,9 +706,8 @@ class MySQL(ObjectBase):
        events = []
        for r in row:
            e = self._load_event_json(r["data"])
            if e is None:  # null cannot be valid event JSON
                # Note that we use Error object just for proper formatting,
                # but do not raise it; from client perspective invalid
                # events get skipped silently.
@@ -749,111 +715,767 @@ class MySQL(ObjectBase):
                err = self.req.error(
                    message="Unable to deserialize JSON event from db, id=%s" % r["id"],
                    error=500, exc=sys.exc_info(), id=r["id"])
                err.log(self.log, prio=logging.WARNING)
            else:
                events.append(e)

        return {
            "lastid": maxid,
            "events": events
        }
    @override_required
    def _build_store_events_event(self, client, event, raw_event):
        """Build query and params for event insertion"""

    @override_required
    def _build_store_events_categories(self, event_id, cat_ids):
        """Build query and params for insertion of event-categories mapping"""

    @override_required
    def _build_store_events_tags(self, event_id, tag_ids):
        """Build query and params for insertion of event-tags mapping"""
    def store_events(self, client, events, events_raw):
        try:
            for attempt in self.repeat():
                with attempt as db:
                    for event, raw_event in zip(events, events_raw):
                        equery, eparams, eret = self._build_store_events_event(client, event, raw_event)
                        lastid = db.query_one(equery, eparams, ret=eret)["id"]

                        catlist = event.get('Category', ["Other"])
                        cats = set(catlist) | {cat.split(".", 1)[0] for cat in catlist}
                        cat_ids = [self.catmap.get(cat, self.catmap_other) for cat in cats]
                        cquery, cparams, _ = self._build_store_events_categories(lastid, cat_ids)
                        db.execute(cquery, cparams)

                        nodes = event.get('Node', [])
                        tags = {tag for node in nodes for tag in node.get('Type', [])}
                        if tags:
                            tag_ids = [self.tagmap.get(tag, self.tagmap_other) for tag in tags]
                            tquery, tparams, _ = self._build_store_events_tags(lastid, tag_ids)
                            db.execute(tquery, tparams)
                    return []
        except Exception as e:
            exception = self.req.error(message="DB error", error=500, exc=sys.exc_info(), env=self.req.env)
            exception.log(self.log)
            return [{"error": 500, "message": "DB error %s" % type(e).__name__}]
    @override_required
    def _build_insert_last_received_id(self, client, id):
        """Build query and params for insertion of the last event id received by client"""

    def insertLastReceivedId(self, client, id):
        self.log.debug("insertLastReceivedId: id %i for client %i(%s)" % (id, client.id, client.hostname))
        query, params, _ = self._build_insert_last_received_id(client, id)
        for attempt in self.repeat():
            with attempt as db:
                db.execute(query, params)

    @override_required
    def _build_get_last_event_id(self):
        """Build query and params for querying the id of the last inserted event"""

    def getLastEventId(self):
        query, params, ret = self._build_get_last_event_id()
        for attempt in self.repeat():
            with attempt as db:
                id_ = db.query_one(query, params, ret=ret)["id"]
                return id_ or 1

    @override_required
    def _build_get_last_received_id(self, client):
        """Build query and params for querying the last event id received by client"""

    def getLastReceivedId(self, client):
        query, params, ret = self._build_get_last_received_id(client)
        for attempt in self.repeat():
            with attempt as db:
                res = db.query_one(query, params, ret=ret)
                if res is None:
                    id = None
                    self.log.debug("getLastReceivedId: probably first access, unable to get id for client %i(%s)" %
                                   (client.id, client.hostname))
                else:
                    # a stored NULL reads back as None and is treated as 1
                    # (the PostgreSQL backend stores NULL instead of id 1)
                    id = res["id"] or 1
                    self.log.debug("getLastReceivedId: id %i for client %i(%s)" %
                                   (id, client.id, client.hostname))
                return id
    @override_required
    def _build_load_maps_tags(self):
        """Build query and params for updating the tag map"""

    @override_required
    def _build_load_maps_cats(self):
        """Build query and params for updating the category map"""

    def load_maps(self):
        tquery, tparams, _ = self._build_load_maps_tags()
        cquery, cparams, _ = self._build_load_maps_cats()
        with self as db:
            db.execute(tquery, tparams)
            db.execute(cquery, cparams)

    @override_required
    def _build_purge_lastlog(self, days):
        """Build query and params for purging stored client last event mapping older than days"""

    def purge_lastlog(self, days):
        query, params, ret = self._build_purge_lastlog(days)
        with self as db:
            return db.query_rowcount(query, params, ret=ret)

    @override_required
    def _build_purge_events_get_id(self, days):
        """Build query and params to get largest event id of events older than days"""

    @override_required
    def _build_purge_events_events(self, id_):
        """Build query and params to remove events older than days and their mappings"""

    def purge_events(self, days):
        iquery, iparams, iret = self._build_purge_events_get_id(days)
        with self as db:
            id_ = db.query_one(iquery, iparams, ret=iret)["id"]
            if id_ is None:
                return 0
            equery, eparams, eret = self._build_purge_events_events(id_)
            affected = db.query_rowcount(equery, eparams, ret=eret)
        return affected
class DataBaseAPIv2(DataBase):

    def __init__(self, req, log, host, user, password, dbname, port, retry_count,
                 retry_pause, event_size_limit, catmap_filename, tagmap_filename):
        super().__init__(req, log, host, user, password, dbname, port, retry_count,
                         retry_pause, event_size_limit, catmap_filename, tagmap_filename)
        self.db = None
        self.con = None

    def close(self):
        try:
            if self.con:
                self.con.close()
        except Exception:
            pass
        self.con = None

    def __exit__(self, exc_type, exc_val, exc_tb):
        """ Context manager protocol. If db exception is fired and
        self.retry_attempt is not zero, it is only logged and
        does not propagate, otherwise it propagates up. Also
        open transaction is rolled back.

        In case of no exception, transaction gets committed.
        """
        if exc_type is None:
            self.con.commit()
            self.retry_attempt = 0
        else:
            try:
                if self.con is not None:
                    self.con.rollback()
            except self.db.Error:
                pass
            try:
                self.close()
            except self.db.Error:
                pass
            if self.retry_attempt > 0:
                self.log.info("Database error (%d attempts left): %s %s" %
                              (self.retry_attempt, exc_type.__name__, exc_val))
                return True

    def _query(self, *args, **kwargs):
        if not self.con:
            self.connect()
        crs = self.con.cursor()
        self.log.debug("execute: %s %s" % (args, kwargs))
        crs.execute(*args, **kwargs)
        return crs

    def _query_multiple(self, query, params, ret, fetch):
        res = None
        for n, (q, p) in enumerate(zip(query, params)):
            cur = self._query(q, p)
            if n == ret:
                res = fetch(cur)
        if ret == -1:  # fetch the result of the last query
            res = fetch(cur)
        return res

    def execute(self, query, params, ret=None):
        """Execute the provided queries; discard the result"""
        self._query_multiple(query, params, None, None)

    def query_all(self, query, params, ret=-1):
        """Execute the provided queries; return list of all rows as dicts of the ret-th query (0 based)"""
        return self._query_multiple(query, params, ret, lambda cur: cur.fetchall())

    def query_one(self, query, params, ret=-1):
        """Execute the provided queries; return the first result of the ret-th query (0 based)"""
        return self._query_multiple(query, params, ret, lambda cur: cur.fetchone())

    def query_rowcount(self, query, params, ret=-1):
        """Execute provided query; return the number of affected rows or the number of returned rows of the ret-th query (0 based)"""
        return self._query_multiple(query, params, ret, lambda cur: cur.rowcount)

    def _get_comma_perc(self, l):
        return ",".join(repeat("%s", l if isinstance(l, int) else len(l)))

    def _get_comma_perc_n(self, n, l):
        return ", ".join(repeat("(%s)" % self._get_comma_perc(n), len(l)))

    def _get_not(self, b):
        return "" if b else "NOT"
class MySQL(DataBaseAPIv2):

    def __init__(
            self, req, log, host, user, password, dbname, port, retry_count,
            retry_pause, event_size_limit, catmap_filename, tagmap_filename):
        super().__init__(req, log, host, user, password, dbname, port, retry_count,
                         retry_pause, event_size_limit, catmap_filename, tagmap_filename)
        import MySQLdb as db
        import MySQLdb.cursors as mycursors
        self.db = db
        self.mycursors = mycursors

    def connect(self):
        self.con = self.db.connect(
            host=self.host, user=self.user, passwd=self.password,
            db=self.dbname, port=self.port, cursorclass=self.mycursors.DictCursor)

    def _build_get_client_by_name(self, cert_names=None, name=None, secret=None):
        """Build query and params for client lookup"""
        query = ["SELECT * FROM clients WHERE valid = 1"]
        params = []
        if name:
            query.append(" AND name = %s")
            params.append(name.lower())
        if secret:
            query.append(" AND secret = %s")
            params.append(secret)
        if cert_names:
            query.append(" AND hostname IN (%s)" % self._get_comma_perc(cert_names))
            params.extend(n.lower() for n in cert_names)
        return ["".join(query)], [params], 0

    def _build_get_clients(self, id):
        """Build query and params for client lookup by id"""
        query = ["SELECT * FROM clients"]
        params = []
        if id:
            query.append("WHERE id = %s")
            params.append(id)
        query.append("ORDER BY id")
        return [" ".join(query)], [params], 0

    def _build_add_modify_client(self, id, **kwargs):
        """Build query and params for adding/modifying client"""
        query = []
        params = []
        uquery = []
        if id is None:
            query.append("INSERT INTO clients SET")
            uquery.append("registered = now()")
        else:
            query.append("UPDATE clients SET")
        for attr in set(Client._fields) - set(["id", "registered"]):
            val = kwargs.get(attr, None)
            if val is not None:  # guaranteed at least one is not None
                if attr == "secret" and val == "":  # disable secret
                    val = None
                uquery.append("`%s` = %%s" % attr)
                params.append(val)
        query.append(", ".join(uquery))
        if id is not None:
            query.append("WHERE id = %s")
            params.append(id)
        return (
            [" ".join(query), 'SELECT LAST_INSERT_ID() AS id'],
            [params, []],
            1
        )

    def _build_get_debug_version(self):
        return ["SELECT VERSION() AS version"], [()], 0

    def _build_get_debug_tablestat(self):
        return ["SHOW TABLE STATUS"], [()], 0

    def _load_event_json(self, data):
        """Return decoded json from data loaded from database, if unable to decode, return None"""
        try:
            return json.loads(data)
        except Exception:
            return None
    def _build_fetch_events(
            self, client, id, count,
            cat, nocat, tag, notag, group, nogroup):
        query = ["SELECT e.id, e.data FROM clients c RIGHT JOIN events e ON c.id = e.client_id WHERE e.id > %s"]
        params = [id or 0]

        if cat or nocat:
            cats = self.getMaps(self.catmap, (cat or nocat))
            query.append(
                " AND e.id %s IN (SELECT event_id FROM event_category_mapping WHERE category_id IN (%s))" %
                (self._get_not(cat), self._get_comma_perc(cats))
            )
            params.extend(cats)

        if tag or notag:
            tags = self.getMaps(self.tagmap, (tag or notag))
            query.append(
                " AND e.id %s IN (SELECT event_id FROM event_tag_mapping WHERE tag_id IN (%s))" %
                (self._get_not(tag), self._get_comma_perc(tags))
            )
            params.extend(tags)

        if group or nogroup:
            subquery = []
            for name in (group or nogroup):
                escaped_name = name.replace('&', '&&').replace("_", "&_").replace("%", "&%")  # escape for LIKE
                subquery.append("c.name = %s")  # exact client
                params.append(name)
                subquery.append("c.name LIKE CONCAT(%s, '.%%') ESCAPE '&'")  # whole subtree
                params.append(escaped_name)
            query.append(" AND %s (%s)" %
                         (self._get_not(group), " OR ".join(subquery)))

        query.append(" AND e.valid = 1 LIMIT %s")
        params.append(count)

        return ["".join(query)], [params], 0
    def _build_store_events_event(self, client, event, raw_event):
        """Build query and params for event insertion"""
        return (
            [
                "INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s)",
                "SELECT LAST_INSERT_ID() AS id"
            ],
            [(client.id, raw_event), ()],
            1
        )

    def _build_store_events_categories(self, event_id, cat_ids):
        """Build query and params for insertion of event-categories mapping"""
        return (
            ["INSERT INTO event_category_mapping (event_id,category_id) VALUES " +
             self._get_comma_perc_n(2, cat_ids)],
            [tuple(param for cat_id in cat_ids for param in (event_id, cat_id))],
            None
        )
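
    # For example, _build_store_events_categories(7, [1, 5]) yields
    # (values made up for illustration):
    #     queries: ["INSERT INTO event_category_mapping (event_id,category_id) VALUES (%s,%s), (%s,%s)"]
    #     params:  [(7, 1, 7, 5)]
    #     ret:     None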
    def _build_store_events_tags(self, event_id, tag_ids):
        """Build query and params for insertion of event-tags mapping"""
        return (
            ["INSERT INTO event_tag_mapping (event_id,tag_id) VALUES " +
             self._get_comma_perc_n(2, tag_ids)],
            [tuple(param for tag_id in tag_ids for param in (event_id, tag_id))],
            None
        )

    def _build_insert_last_received_id(self, client, id):
        """Build query and params for insertion of the last event id received by client"""
        return (
            ["INSERT INTO last_events(client_id, event_id, timestamp) VALUES(%s, %s, NOW())"],
            [(client.id, id)],
            None
        )

    def _build_get_last_event_id(self):
        """Build query and params for querying the id of the last inserted event"""
        return ["SELECT MAX(id) as id FROM events"], [()], 0

    def _build_get_last_received_id(self, client):
        """Build query and params for querying the last event id received by client"""
        return (
            ["SELECT event_id as id FROM last_events WHERE client_id = %s ORDER BY last_events.id DESC LIMIT 1"],
            [(client.id,)],
            0
        )

    def _build_load_maps_tags(self):
        """Build query and params for updating the tag map"""
        return (
            [
                "DELETE FROM tags",
                "INSERT INTO tags(id, tag) VALUES " +
                self._get_comma_perc_n(2, self.tagmap)
            ],
            [
                (),
                tuple(param for tag, num in self.tagmap.items() for param in (num, tag))
            ],
            None
        )

    def _build_load_maps_cats(self):
        """Build query and params for updating the category map"""
        params = []
        for cat_subcat, num in self.catmap.items():
            catsplit = cat_subcat.split(".", 1)
            category = catsplit[0]
            subcategory = catsplit[1] if len(catsplit) > 1 else None
            params.extend((num, category, subcategory, cat_subcat))
        return (
            [
                "DELETE FROM categories",
                "INSERT INTO categories(id, category, subcategory, cat_subcat) VALUES " +
                self._get_comma_perc_n(4, self.catmap)
            ],
            [
                (),
                tuple(params)
            ],
            None
        )

    def _build_purge_lastlog(self, days):
        """Build query and params for purging stored client last event mapping older than days"""
        return (
            [
                "DELETE FROM last_events "
                " USING last_events LEFT JOIN ("
                " SELECT MAX(id) AS last FROM last_events"
                " GROUP BY client_id"
                " ) AS maxids ON last=id"
                " WHERE timestamp < DATE_SUB(CURDATE(), INTERVAL %s DAY) AND last IS NULL",
            ],
            [(days,)],
            0
        )

    def _build_purge_events_get_id(self, days):
        """Build query and params to get largest event id of events older than days"""
        return (
            [
                "SELECT MAX(id) as id"
                " FROM events"
                " WHERE received < DATE_SUB(CURDATE(), INTERVAL %s DAY)"
            ],
            [(days,)],
            0
        )

    def _build_purge_events_events(self, id_):
        """Build query and params to remove events older than days and their mappings"""
        return (
            [
                "DELETE FROM event_category_mapping WHERE event_id <= %s",
                "DELETE FROM event_tag_mapping WHERE event_id <= %s",
                "DELETE FROM events WHERE id <= %s",
            ],
            [(id_,), (id_,), (id_,)],
            2
        )
class PostgreSQL(DataBaseAPIv2):

    def __init__(
            self, req, log, host, user, password, dbname, port, retry_count,
            retry_pause, event_size_limit, catmap_filename, tagmap_filename):
        super().__init__(req, log, host, user, password, dbname, port, retry_count,
                         retry_pause, event_size_limit, catmap_filename, tagmap_filename)
        import psycopg2 as db
        from psycopg2 import sql as ppgsql
        import psycopg2.extras as ppgextra
        self.db = db
        self.ppgsql = ppgsql
        self.ppgextra = ppgextra

    def connect(self):
        self.con = self.db.connect(
            host=self.host, user=self.user, password=self.password,
            dbname=self.dbname, port=self.port, cursor_factory=self.ppgextra.RealDictCursor)
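
    # RealDictCursor returns each row as a dict keyed by column name, which
    # keeps the dict-style row access used throughout DataBase compatible
    # with MySQLdb's DictCursor above.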
    def _build_get_client_by_name(self, cert_names=None, name=None, secret=None):
        """Build query and params for client lookup"""
        query = ["SELECT * FROM clients WHERE valid = 1"]
        params = []
        if name:
            query.append(" AND name = %s")
            params.append(name.lower())
        if secret:
            query.append(" AND secret = %s")
            params.append(secret)
        if cert_names:
            query.append(" AND hostname IN (%s)" % self._get_comma_perc(cert_names))
            params.extend(n.lower() for n in cert_names)
        return ["".join(query)], [params], 0
    def _build_get_clients(self, id):
        """Build query and params for client lookup by id"""
        query = ["SELECT * FROM clients"]
        params = []
        if id:
            query.append("WHERE id = %s")
            params.append(id)
        query.append("ORDER BY id")
        return [" ".join(query)], [params], 0

    def _build_add_modify_client(self, id, **kwargs):
        """Build query and params for adding/modifying client"""
        fields = set(Client._fields) - {"id", "registered"}
        cols, params = map(
            list,
            zip(
                *(
                    (k, None)  # disable secret
                    if k == "secret" and v == "" else
                    (k, v)
                    for k, v in kwargs.items()
                    if v is not None and k in fields
                )
            )
        )
        if id is None:
            query = self.ppgsql.SQL('INSERT INTO clients ("registered", {}) VALUES (NOW(), {}) RETURNING id').format(
                self.ppgsql.SQL(", ").join(map(self.ppgsql.Identifier, cols)),
                self.ppgsql.SQL(", ").join(self.ppgsql.Placeholder() * len(cols))
            )
        elif not cols:
            return ["SELECT %s AS id"], [(id,)], 0
        else:
            query = self.ppgsql.SQL("UPDATE clients SET {} WHERE id = {} RETURNING id").format(
                self.ppgsql.SQL(", ").join(
                    self.ppgsql.SQL("{} = {}").format(
                        self.ppgsql.Identifier(col),
                        self.ppgsql.Placeholder()
                    ) for col in cols
                ),
                self.ppgsql.Placeholder()
            )
            params.append(id)
        return [query], [params], 0
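
    # Editorial sketch: for id=None and kwargs {"name": "cz.example.warden", "valid": 1}
    # the composed query renders roughly as
    #     INSERT INTO clients ("registered", "name", "valid") VALUES (NOW(), %s, %s) RETURNING id
    # with params ["cz.example.warden", 1] (column order follows kwargs).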
    def _build_get_debug_version(self):
        return ["SELECT setting AS version FROM pg_settings WHERE name = 'server_version'"], [()], 0

    def _build_get_debug_tablestat(self):
        return [
            "SELECT "
            'tablename AS "Name", '
            'relnatts AS "Columns", '
            'n_live_tup AS "Rows", '
            'pg_catalog.pg_size_pretty(pg_catalog.pg_table_size(oid)) AS "Table_size", '
            'pg_catalog.pg_size_pretty(pg_catalog.pg_indexes_size(oid)) AS "Index_size", '
            'coll.collations AS "Collations" '
            "FROM "
            "pg_catalog.pg_tables tbls "
            "LEFT OUTER JOIN pg_catalog.pg_class cls "
            "ON tbls.tablename=cls.relname "
            "LEFT OUTER JOIN pg_catalog.pg_stat_user_tables sut "
            "ON tbls.tablename=sut.relname "
            "LEFT OUTER JOIN ("
            "SELECT "
            "table_name, "
            "string_agg("
            "DISTINCT COALESCE("
            "collation_name, "
            "("
            "SELECT "
            "datcollate "
            "FROM "
            "pg_catalog.pg_database "
            "WHERE "
            "datname=%s"
            ")"
            "), "
            "','"
            ") AS collations "
            "FROM "
            "information_schema.columns "
            "GROUP BY "
            "table_name"
            ") coll "
            "ON tbls.tablename=coll.table_name "
            "WHERE "
            "tbls.schemaname='public' "
            "AND tbls.tableowner=%s"
        ], [(self.dbname, self.user)], 0

    def _load_event_json(self, data):
        """Return decoded json from data loaded from database, if unable to decode, return None"""
        try:
            # psycopg2 returns bytea columns as memoryview
            return json.loads(data.tobytes())
        except Exception:
            return None
    def _build_fetch_events(
            self, client, id, count,
            cat, nocat, tag, notag, group, nogroup):
        query = ["SELECT e.id, e.data FROM clients c RIGHT JOIN events e ON c.id = e.client_id WHERE e.id > %s"]
        params = [id or 0]

        if cat or nocat:
            cats = self.getMaps(self.catmap, (cat or nocat))
            query.append(
                " AND e.id %s IN (SELECT event_id FROM event_category_mapping WHERE category_id IN (%s))" %
                (self._get_not(cat), self._get_comma_perc(cats))
            )
            params.extend(cats)

        if tag or notag:
            tags = self.getMaps(self.tagmap, (tag or notag))
            query.append(
                " AND e.id %s IN (SELECT event_id FROM event_tag_mapping WHERE tag_id IN (%s))" %
                (self._get_not(tag), self._get_comma_perc(tags))
            )
            params.extend(tags)

        if group or nogroup:
            subquery = []
            for name in group or nogroup:
                name = name.lower()  # assumes only lowercase names
                escaped_name = name.replace('&', '&&').replace("_", "&_").replace("%", "&%")  # escape for LIKE
                subquery.append("c.name = %s")  # exact client
                params.append(name)
                subquery.append("c.name LIKE %s || '.%%' ESCAPE '&'")  # whole subtree
                params.append(escaped_name)
            query.append(" AND %s (%s)" % (self._get_not(group), " OR ".join(subquery)))

        query.append(" AND e.valid = 1 LIMIT %s")
        params.append(count)

        return ["".join(query)], [params], 0
    def _build_store_events_event(self, client, event, raw_event):
        """Build query and params for event insertion"""
        return (
            ["INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s) RETURNING id"],
            [(client.id, raw_event)],
            0
        )

    def _build_store_events_categories(self, event_id, cat_ids):
        """Build query and params for insertion of event-categories mapping"""
        return (
            ["INSERT INTO event_category_mapping (event_id,category_id) VALUES " +
             self._get_comma_perc_n(2, cat_ids)],
            [tuple(param for cat_id in cat_ids for param in (event_id, cat_id))],
            None
        )

    def _build_store_events_tags(self, event_id, tag_ids):
        """Build query and params for insertion of event-tags mapping"""
        return (
            ["INSERT INTO event_tag_mapping (event_id,tag_id) VALUES " +
             self._get_comma_perc_n(2, tag_ids)],
            [tuple(param for tag_id in tag_ids for param in (event_id, tag_id))],
            None
        )

    def _build_insert_last_received_id(self, client, id):
        """Build query and params for insertion of the last event id received by client"""
        # id 1 never exists here (the event id sequence starts at 2, see
        # warden_3.0_postgres.sql); storing NULL instead keeps the
        # events(id) foreign key satisfied on first access
        return (
            ["INSERT INTO last_events(client_id, event_id, timestamp) VALUES(%s, %s, NOW())"],
            [(client.id, None if id == 1 else id)],
            None
        )

    def _build_get_last_event_id(self):
        """Build query and params for querying the id of the last inserted event"""
        return ["SELECT MAX(id) as id FROM events"], [()], 0

    def _build_get_last_received_id(self, client):
        """Build query and params for querying the last event id received by client"""
        return (
            ["SELECT event_id as id FROM last_events WHERE client_id = %s ORDER BY last_events.id DESC LIMIT 1"],
            [(client.id,)],
            0
        )

    def _build_load_maps_tags(self):
        """Build query and params for updating the tag map"""
        return (
            [
                "ALTER TABLE event_tag_mapping DROP CONSTRAINT event_tag_mapping_tag_id_fk",
                "DELETE FROM tags",
                "INSERT INTO tags(id, tag) VALUES " +
                self._get_comma_perc_n(2, self.tagmap),
                'ALTER TABLE event_tag_mapping ADD CONSTRAINT "event_tag_mapping_tag_id_fk" FOREIGN KEY ("tag_id") REFERENCES "tags" ("id")'
            ],
            [(), (), tuple(param for tag, num in self.tagmap.items() for param in (num, tag)), ()],
            None
        )

    def _build_load_maps_cats(self):
        """Build query and params for updating the category map"""
        params = []
        for cat_subcat, num in self.catmap.items():
            catsplit = cat_subcat.split(".", 1)
            category = catsplit[0]
            subcategory = catsplit[1] if len(catsplit) > 1 else None
            params.extend((num, category, subcategory, cat_subcat))
        return (
            [
                "ALTER TABLE event_category_mapping DROP CONSTRAINT event_category_mapping_category_id_fk",
                "DELETE FROM categories",
                "INSERT INTO categories(id, category, subcategory, cat_subcat) VALUES " +
                self._get_comma_perc_n(4, self.catmap),
                'ALTER TABLE event_category_mapping ADD CONSTRAINT "event_category_mapping_category_id_fk" FOREIGN KEY ("category_id") REFERENCES "categories" ("id")'
            ],
            [(), (), tuple(params), ()],
            None
        )

    def _build_purge_lastlog(self, days):
        """Build query and params for purging stored client last event mapping older than days"""
        return (
            [
                "DELETE FROM last_events "
                " USING last_events le LEFT JOIN ("
                " SELECT MAX(id) AS last FROM last_events"
                " GROUP BY client_id"
                " ) AS maxids ON maxids.last=le.id"
                " WHERE le.timestamp < CURRENT_DATE - INTERVAL %s DAY AND maxids.last IS NULL"
            ],
            [(str(days),)],
            0
        )
    def _build_purge_events_get_id(self, days):
        """Build query and params to get largest event id of events older than days"""
        return (
            [
                "SELECT MAX(id) as id"
                " FROM events"
                " WHERE received < CURRENT_DATE - INTERVAL %s DAY"
            ],
            [(str(days),)],
            0
        )

    def _build_purge_events_events(self, id_):
        """Build query and params to remove events older than days and their mappings"""
        # a single DELETE suffices here: the mapping tables reference
        # events(id) with ON DELETE CASCADE (see warden_3.0_postgres.sql)
        return ["DELETE FROM events WHERE id <= %s"], [(id_,)], 0
def expose(read=1, write=0, debug=0):

@@ -1245,7 +1867,7 @@ section_order = ("log", "db", "auth", "validator", "handler", "server")
# "type" keyword in section may be used to choose other
section_def = {
    "log": [FileLogger, SysLogger],
    "db": [MySQL, PostgreSQL],
    "auth": [X509NameAuthenticator, PlainAuthenticator, X509Authenticator, X509MixMatchAuthenticator],
    "validator": [JSONSchemaValidator, NoValidator],
    "handler": [WardenHandler],

@@ -1305,8 +1927,22 @@ param_def = {
        "retry_pause": {"type": "natural", "default": 3},
        "retry_count": {"type": "natural", "default": 3},
        "event_size_limit": {"type": "natural", "default": 5*1024*1024},
        "catmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "catmap_db.json")},
        "tagmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "tagmap_db.json")}
    },
    PostgreSQL: {
        "req": {"type": "obj", "default": "req"},
        "log": {"type": "obj", "default": "log"},
        "host": {"type": "str", "default": "localhost"},
        "user": {"type": "str", "default": "warden"},
        "password": {"type": "str", "default": ""},
        "dbname": {"type": "str", "default": "warden3"},
        "port": {"type": "natural", "default": 5432},
        "retry_pause": {"type": "natural", "default": 3},
        "retry_count": {"type": "natural", "default": 3},
        "event_size_limit": {"type": "natural", "default": 5*1024*1024},
        "catmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "catmap_db.json")},
        "tagmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "tagmap_db.json")}
    },
    WardenHandler: {
        "req": {"type": "obj", "default": "req"},
@@ -1663,9 +2299,7 @@ def get_args():
    subargp_loadmaps = subargp.add_parser(
        "loadmaps", add_help=False,
        description=(
            "Load 'categories' and 'tags' table from 'catmap_db.json' and 'tagmap_db.json'."
            " Note also that previous content of both tables will be lost."),
        help="load catmap and tagmap into db")
    subargp_loadmaps.set_defaults(command=load_maps)