diff --git a/warden_server/warden_server.py b/warden_server/warden_server.py index 8c881e88e52ff713311f4018fbc77f2c57b72f34..39ce25df855ffa5eb9b0142750614e5f3afb61cf 100755 --- a/warden_server/warden_server.py +++ b/warden_server/warden_server.py @@ -16,11 +16,10 @@ import json import re from traceback import format_tb from collections import namedtuple +from itertools import repeat from time import sleep from random import randint import M2Crypto.X509 -import MySQLdb as my -import MySQLdb.cursors as mycursors if sys.version_info[0] >= 3: import configparser as ConfigParser @@ -134,6 +133,17 @@ class Error(Exception): return d +def override_required(method): + def abstract_method(self, *args, **kwargs): + method(self, *args, **kwargs) + raise NotImplementedError( + "Class %s needs to implement the %s() method" % + (type(self).__name__, method.__name__) + ) + abstract_method.__doc__ = method.__doc__ + return abstract_method + + def get_clean_root_logger(level=logging.INFO): """ Attempts to get logging module into clean slate state """ @@ -469,12 +479,13 @@ class JSONSchemaValidator(NoValidator): return res -class MySQL(ObjectBase): +class DataBase(ObjectBase): def __init__( self, req, log, host, user, password, dbname, port, retry_count, retry_pause, event_size_limit, catmap_filename, tagmap_filename): ObjectBase.__init__(self, req, log) + self.host = host self.user = user self.password = password @@ -497,20 +508,16 @@ class MySQL(ObjectBase): self.con = None + @override_required def connect(self): - self.con = my.connect( - host=self.host, user=self.user, passwd=self.password, - db=self.dbname, port=self.port, cursorclass=mycursors.DictCursor) + pass + @override_required def close(self): - try: - if self.con: - self.con.close() - except Exception: - pass - self.con = None + pass - __del__ = close + def __del__(self): + self.close() def repeat(self): """ Allows for graceful repeating of transactions self.retry_count @@ -521,8 +528,7 @@ class MySQL(ObjectBase): for attempt in self.repeat(): with attempt as db: - crs = db.query(...) - # do something with crs + res = db.query_all(...) Note that it's not reentrant (as is not underlying MySQL connection), so avoid nesting on the same MySQL object. @@ -540,8 +546,7 @@ class MySQL(ObjectBase): exception. Can be used with self.repeat(), or alone as: with self as db: - crs = db.query(...) - # do something with crs + res = db.query_all(...) Note that it's not reentrant (as is not underlying MySQL connection), so avoid nesting on the same MySQL object. @@ -550,6 +555,7 @@ class MySQL(ObjectBase): self.retry_attempt = 0 return self + @override_required def __exit__(self, exc_type, exc_val, exc_tb): """ Context manager protocol. If db exception is fired and self.retry_attempt is not zero, it is only logged and @@ -557,110 +563,87 @@ class MySQL(ObjectBase): open transaction is rolled back. In case of no exception, transaction gets commited. 
""" - if not exc_type: - self.con.commit() - self.retry_attempt = 0 - else: - try: - if self.con: - self.con.rollback() - except my.Error: - pass - try: - self.close() - except my.Error: - pass - if self.retry_attempt: - self.log.info("Database error (%d attempts left): %s %s" % (self.retry_attempt, exc_type.__name__, exc_val)) - return True - def query(self, *args, **kwargs): - if not self.con: - self.connect() - crs = self.con.cursor() - self.log.debug("execute: %s %s" % (args, kwargs)) - crs.execute(*args, **kwargs) - return crs + @override_required + def execute(self, query, params, ret=None): + """Execute the provided queries; discard the result""" - def _get_comma_perc(self, l): - return ','.join(['%s'] * len(l)) + @override_required + def query_all(self, query, params, ret=-1): + """Execute the provided queries; return list of all rows as dicts of the ret-th query (0 based)""" - def _get_not(self, b): - return "" if b else "NOT" + @override_required + def query_one(self, query, prams, ret=-1): + """Execute the provided queries; return the first result of the ret-th query (0 based)""" + + @override_required + def query_rowcount(self, query, params, ret=-1): + """Execute provided query; return the number of affected rows or the number of returned rows of the ret-th query (0 based)""" + + @override_required + def _build_get_client_by_name(self, cert_names, name, secret): + """Build query and params for client lookup""" def get_client_by_name(self, cert_names=None, name=None, secret=None): - query = ["SELECT * FROM clients WHERE valid = 1"] - params = [] - if name: - query.append(" AND name = %s") - params.append(name.lower()) - if secret: - query.append(" AND secret = %s") - params.append(secret) - if cert_names: - query.append(" AND hostname IN (%s)" % self._get_comma_perc(cert_names)) - params.extend(n.lower() for n in cert_names) + query, params, ret = self._build_get_client_by_name(cert_names, name, secret) for attempt in self.repeat(): with attempt as db: - rows = db.query("".join(query), params).fetchall() + rows = db.query_all(query, params, ret) if len(rows) > 1: self.log.warning( - "get_client_by_name: query returned more than one result (cert_names = %s, name = %s, secret = %s): %s" % ( - cert_names, name, secret, ", ".join([str(Client(**row)) for row in rows]))) + "get_client_by_name: query returned more than one result (cert_names = %s, name = %s, secret = %s): %s" % + (cert_names, name, secret, ", ".join([str(Client(**row)) for row in rows])) + ) return None return Client(**rows[0]) if rows else None + @override_required + def _build_get_clients(self, id): + """Build query and params for client lookup by id""" + def get_clients(self, id=None): - query = ["SELECT * FROM clients"] - params = [] - if id: - query.append("WHERE id = %s") - params.append(id) - query.append("ORDER BY id") + query, params, ret = self._build_get_clients(id) + for attempt in self.repeat(): with attempt as db: - rows = db.query(" ".join(query), params).fetchall() + rows = db.query_all(query, params, ret=ret) return [Client(**row) for row in rows] + @override_required + def _build_add_modify_client(self, id, **kwargs): + """Build query and params for adding/modifying client""" + def add_modify_client(self, id=None, **kwargs): - query = [] - params = [] - uquery = [] - if id is None: - query.append("INSERT INTO clients SET") - uquery.append("registered = now()") - else: - query.append("UPDATE clients SET") - for attr in set(Client._fields) - set(["id", "registered"]): - val = kwargs.get(attr, None) - 
if val is not None: - if attr == "secret" and val == "": # disable secret - val = None - uquery.append("`%s` = %%s" % attr) - params.append(val) - if not uquery: + if id is not None and all(kwargs.get(attr, None) is None for attr in set(Client._fields) - {"id", "registered"}): return id - query.append(", ".join(uquery)) - if id is not None: - query.append("WHERE id = %s") - params.append(id) + + query, params, ret = self._build_add_modify_client(id, **kwargs) + for attempt in self.repeat(): with attempt as db: - crs = db.query(" ".join(query), params) - newid = crs.lastrowid if id is None else id + res_id = db.query_one(query, params, ret=ret)["id"] + newid = res_id if id is None else id return newid + @override_required + def _build_get_debug_version(self): + pass + + @override_required + def _build_get_debug_tablestat(self): + pass + def get_debug(self): + vquery, vparams, vret = self._build_get_debug_version() + tquery, tparams, tret = self._build_get_debug_tablestat() for attempt in self.repeat(): with attempt as db: - rows = db.query("SELECT VERSION() AS VER").fetchall() - tablestat = db.query("SHOW TABLE STATUS").fetchall() return { - "db": "MySQL", - "version": rows[0]["VER"], - "tables": tablestat + "db": type(self).__name__, + "version": db.query_one(vquery, vparams, vret)["version"], + "tables": db.query_all(tquery, tparams, tret) } def getMaps(self, section, variables): @@ -671,10 +654,21 @@ class MySQL(ObjectBase): except KeyError: raise self.req.error( message="Wrong tag or category used in query.", - error=422, exc=sys.exc_info(), key=v) + error=422, exc=sys.exc_info(), key=v + ) maps.append(mapped) return set(maps) # unique + @override_required + def _build_fetch_events( + self, client, id, count, + cat, nocat, tag, notag, group, nogroup): + """Build query and params for fetching events based on id, count and category, tag and group filters""" + + @override_required + def _load_event_json(self, data): + """Return decoded json from data loaded from database, if unable to decode, return None""" + def fetch_events( self, client, id, count, cat=None, nocat=None, @@ -694,43 +688,16 @@ class MySQL(ObjectBase): message="Unrealizable conditions. 
Choose group or nogroup option.", error=422, group=group, nogroup=nogroup) - query = ["SELECT e.id, e.data FROM clients c RIGHT JOIN events e ON c.id = e.client_id WHERE e.id > %s"] - params = [id or 0] - - if cat or nocat: - cats = self.getMaps(self.catmap, (cat or nocat)) - query.append( - " AND e.id %s IN (SELECT event_id FROM event_category_mapping WHERE category_id IN (%s))" % ( - self._get_not(cat), self._get_comma_perc(cats))) - params.extend(cats) - - if tag or notag: - tags = self.getMaps(self.tagmap, (tag or notag)) - query.append( - " AND e.id %s IN (SELECT event_id FROM event_tag_mapping WHERE tag_id IN (%s))" % ( - self._get_not(tag), self._get_comma_perc(tags))) - params.extend(tags) - - if group or nogroup: - subquery = [] - for name in (group or nogroup): - escaped_name = name.replace('&', '&&').replace("_", "&_").replace("%", "&%") # escape for LIKE - subquery.append("c.name = %s") # exact client - params.append(name) - subquery.append("c.name LIKE CONCAT(%s, '.%%') ESCAPE '&'") # whole subtree - params.append(escaped_name) - - query.append(" AND %s (%s)" % (self._get_not(group), " OR ".join(subquery))) - - query.append(" AND e.valid = 1 LIMIT %s") - params.append(count) - - query_string = "".join(query) - + query, params, ret = self._build_fetch_events( + client, id, count, + cat, nocat, + tag, notag, + group, nogroup + ) row = None for attempt in self.repeat(): with attempt as db: - row = db.query(query_string, params).fetchall() + row = db.query_all(query, params, ret=ret) if row: maxid = max(r['id'] for r in row) @@ -739,9 +706,8 @@ class MySQL(ObjectBase): events = [] for r in row: - try: - e = json.loads(r["data"]) - except Exception: + e = self._load_event_json(r["data"]) + if e is None: # null cannot be valid event JSON # Note that we use Error object just for proper formatting, # but do not raise it; from client perspective invalid # events get skipped silently. 
@@ -749,111 +715,476 @@ class MySQL(ObjectBase): message="Unable to deserialize JSON event from db, id=%s" % r["id"], error=500, exc=sys.exc_info(), id=r["id"]) err.log(self.log, prio=logging.WARNING) - events.append(e) - + else: + events.append(e) return { "lastid": maxid, "events": events } + @override_required + def _build_store_events_event(self, client, event, raw_event): + """Build query and params for event insertion""" + + @override_required + def _build_store_events_categories(self, event_id, cat_ids): + """Build query and params for insertion of event-categories mapping""" + + @override_required + def _build_store_events_tags(self, event_id, tag_ids): + """Build query and params for insertion of event-tags mapping""" + def store_events(self, client, events, events_raw): try: for attempt in self.repeat(): with attempt as db: for event, raw_event in zip(events, events_raw): - lastid = db.query( - "INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s)", - (client.id, raw_event)).lastrowid + equery, eparams, eret = self._build_store_events_event(client, event, raw_event) + lastid = db.query_one(equery, eparams, ret=eret)["id"] catlist = event.get('Category', ["Other"]) - cats = set(catlist) | set(cat.split(".", 1)[0] for cat in catlist) - for cat in cats: - cat_id = self.catmap.get(cat, self.catmap_other) - db.query("INSERT INTO event_category_mapping (event_id,category_id) VALUES (%s, %s)", (lastid, cat_id)) + cats = set(catlist) | {cat.split(".", 1)[0] for cat in catlist} + cat_ids = [self.catmap.get(cat, self.catmap_other) for cat in cats] + cquery, cparams, _ = self._build_store_events_categories(lastid, cat_ids) + db.execute(cquery, cparams) nodes = event.get('Node', []) - tags = [] - for node in nodes: - tags.extend(node.get('Type', [])) - for tag in set(tags): - tag_id = self.tagmap.get(tag, self.tagmap_other) - db.query("INSERT INTO event_tag_mapping (event_id,tag_id) VALUES (%s, %s)", (lastid, tag_id)) + tags = {tag for node in nodes for tag in node.get('Type', [])} + if tags: + tag_ids = [self.tagmap.get(tag, self.tagmap_other) for tag in tags] + tquery, tparams, _ = self._build_store_events_tags(lastid, tag_ids) + db.execute(tquery, tparams) + return [] except Exception as e: exception = self.req.error(message="DB error", error=500, exc=sys.exc_info(), env=self.req.env) exception.log(self.log) return [{"error": 500, "message": "DB error %s" % type(e).__name__}] + @override_required + def _build_insert_last_received_id(self, client, id): + """Build query and params for insertion of the last event id received by client""" + def insertLastReceivedId(self, client, id): self.log.debug("insertLastReceivedId: id %i for client %i(%s)" % (id, client.id, client.hostname)) + query, params, _ = self._build_insert_last_received_id(client, id) for attempt in self.repeat(): with attempt as db: - db.query("INSERT INTO last_events(client_id, event_id, timestamp) VALUES(%s, %s, NOW())", (client.id, id)) + db.execute(query, params) + + @override_required + def _build_get_last_event_id(self): + """Build query and params for querying the id of the last inserted event""" def getLastEventId(self): + query, params, ret = self._build_get_last_event_id() for attempt in self.repeat(): with attempt as db: - row = db.query("SELECT MAX(id) as id FROM events").fetchall()[0] - return row['id'] or 1 + id_ = db.query_one(query, params, ret=ret)["id"] + return id_ or 1 + + @override_required + def _build_get_last_received_id(self, client): + """Build query and params for querying the last 
event id received by client""" def getLastReceivedId(self, client): + query, params, ret = self._build_get_last_received_id(client) for attempt in self.repeat(): with attempt as db: - res = db.query( - "SELECT event_id as id FROM last_events WHERE client_id = %s ORDER BY last_events.id DESC LIMIT 1", - (client.id,)).fetchall() - try: - row = res[0] - except IndexError: + res = db.query_one(query, params, ret=ret) + + if res is None: id = None - self.log.debug("getLastReceivedId: probably first access, unable to get id for client %i(%s)" % ( - client.id, client.hostname)) + self.log.debug("getLastReceivedId: probably first access, unable to get id for client %i(%s)" % + (client.id, client.hostname)) else: - id = row["id"] - self.log.debug("getLastReceivedId: id %i for client %i(%s)" % (id, client.id, client.hostname)) + id = res["id"] or 1 + self.log.debug("getLastReceivedId: id %i for client %i(%s)" % + (id, client.id, client.hostname)) return id + @override_required + def _build_load_maps_tags(self): + """Build query and params for updating the tag map""" + + @override_required + def _build_load_maps_cats(self): + """Build query and params for updating the category map""" + def load_maps(self): + tquery, tparams, _ = self._build_load_maps_tags() + cquery, cparams, _ = self._build_load_maps_cats() with self as db: - db.query("DELETE FROM tags") - for tag, num in self.tagmap.items(): - db.query("INSERT INTO tags(id, tag) VALUES (%s, %s)", (num, tag)) - db.query("DELETE FROM categories") - for cat_subcat, num in self.catmap.items(): - catsplit = cat_subcat.split(".", 1) - category = catsplit[0] - subcategory = catsplit[1] if len(catsplit) > 1 else None - db.query( - "INSERT INTO categories(id, category, subcategory, cat_subcat) VALUES (%s, %s, %s, %s)", - (num, category, subcategory, cat_subcat)) + db.execute(tquery, tparams) + db.execute(cquery, cparams) + + @override_required + def _build_purge_lastlog(self, days): + """Build query and params for purging stored client last event mapping older than days""" def purge_lastlog(self, days): + query, params, ret = self._build_purge_lastlog(days) with self as db: - return db.query( + return db.query_rowcount(query, params, ret=ret) + + @override_required + def _build_purge_events_get_id(self, days): + """Build query and params to get largest event id of events older than days""" + + @override_required + def _build_purge_events_events(self, id_): + """Build query and params to remove events older than days and their mappings""" + + def purge_events(self, days): + iquery, iparams, iret = self._build_purge_events_get_id(days) + with self as db: + id_ = db.query_one(iquery, iparams, ret=iret)["id"] + if id_ is None: + return 0 + equery, eparams, eret = self._build_purge_events_events(id_) + affected = db.query_rowcount(equery, eparams, ret=eret) + return affected + + +class DataBaseAPIv2(DataBase): + + def __init__(self, req, log, host, user, password, dbname, port, retry_count, + retry_pause, event_size_limit, catmap_filename, tagmap_filename): + + super().__init__(req, log, host, user, password, dbname, port, retry_count, + retry_pause, event_size_limit, catmap_filename, tagmap_filename) + + self.db = None + self.con = None + + def close(self): + try: + if self.con: + self.con.close() + except Exception: + pass + self.con = None + + def __exit__(self, exc_type, exc_val, exc_tb): + """ Context manager protocol. If db exception is fired and + self.retry_attempt is not zero, it is only logged and + does not propagate, otherwise it propagates up.
Also + open transaction is rolled back. + In case of no exception, transaction gets committed. + """ + if exc_type is None: + self.con.commit() + self.retry_attempt = 0 + else: + try: + if self.con is not None: + self.con.rollback() + except self.db.Error: + pass + try: + self.close() + except self.db.Error: + pass + if self.retry_attempt > 0: + self.log.info("Database error (%d attempts left): %s %s" % + (self.retry_attempt, exc_type.__name__, exc_val)) + return True + + def _query(self, *args, **kwargs): + if not self.con: + self.connect() + crs = self.con.cursor() + self.log.debug("execute: %s %s" % (args, kwargs)) + crs.execute(*args, **kwargs) + return crs + + def _query_multiple(self, query, params, ret, fetch): + res = None + for n, (q, p) in enumerate(zip(query, params)): + cur = self._query(q, p) + if n == ret: + res = fetch(cur) + if ret == -1: # fetch the result of the last query + res = fetch(cur) + return res + + def execute(self, query, params, ret=None): + """Execute the provided queries; discard the result""" + self._query_multiple(query, params, None, None) + + def query_all(self, query, params, ret=-1): + """Execute the provided queries; return list of all rows as dicts of the ret-th query (0 based)""" + return self._query_multiple(query, params, ret, lambda cur: cur.fetchall()) + + def query_one(self, query, params, ret=-1): + """Execute the provided queries; return the first result of the ret-th query (0 based)""" + return self._query_multiple(query, params, ret, lambda cur: cur.fetchone()) + + def query_rowcount(self, query, params, ret=-1): + """Execute the provided queries; return the number of affected rows or the number of returned rows of the ret-th query (0 based)""" + return self._query_multiple(query, params, ret, lambda cur: cur.rowcount) + + def _get_comma_perc(self, l): + return ",".join(repeat("%s", l if isinstance(l, int) else len(l))) + + def _get_comma_perc_n(self, n, l): + return ", ".join(repeat("(%s)" % self._get_comma_perc(n), len(l))) + + def _get_not(self, b): + return "" if b else "NOT" + + +class MySQL(DataBaseAPIv2): + + def __init__( + self, req, log, host, user, password, dbname, port, retry_count, + retry_pause, event_size_limit, catmap_filename, tagmap_filename): + + super().__init__(req, log, host, user, password, dbname, port, retry_count, + retry_pause, event_size_limit, catmap_filename, tagmap_filename) + + import MySQLdb as db + import MySQLdb.cursors as mycursors + self.db = db + self.mycursors = mycursors + + def connect(self): + self.con = self.db.connect( + host=self.host, user=self.user, passwd=self.password, + db=self.dbname, port=self.port, cursorclass=self.mycursors.DictCursor) + + def _build_get_client_by_name(self, cert_names=None, name=None, secret=None): + """Build query and params for client lookup""" + query = ["SELECT * FROM clients WHERE valid = 1"] + params = [] + if name: + query.append(" AND name = %s") + params.append(name.lower()) + if secret: + query.append(" AND secret = %s") + params.append(secret) + if cert_names: + query.append(" AND hostname IN (%s)" % self._get_comma_perc(cert_names)) + params.extend(n.lower() for n in cert_names) + + return ["".join(query)], [params], 0 + + def _build_get_clients(self, id): + """Build query and params for client lookup by id""" + query = ["SELECT * FROM clients"] + params = [] + if id: + query.append("WHERE id = %s") + params.append(id) + query.append("ORDER BY id") + + return [" ".join(query)], [params], 0 + + def _build_add_modify_client(self, id, **kwargs): + """Build query and
params for adding/modifying client""" + query = [] + params = [] + uquery = [] + if id is None: + query.append("INSERT INTO clients SET") + uquery.append("registered = now()") + else: + query.append("UPDATE clients SET") + for attr in set(Client._fields) - set(["id", "registered"]): + val = kwargs.get(attr, None) + if val is not None: # guaranteed at least one is not None + if attr == "secret" and val == "": # disable secret + val = None + uquery.append("`%s` = %%s" % attr) + params.append(val) + + query.append(", ".join(uquery)) + if id is not None: + query.append("WHERE id = %s") + params.append(id) + return ( + [" ".join(query), 'SELECT LAST_INSERT_ID() AS id'], + [params, []], + 1 + ) + + def _build_get_debug_version(self): + return ["SELECT VERSION() AS version"], [()], 0 + + def _build_get_debug_tablestat(self): + return ["SHOW TABLE STATUS"], [()], 0 + + def _load_event_json(self, data): + """Return decoded json from data loaded from database, if unable to decode, return None""" + try: + return json.loads(data) + except Exception: + return None + + def _build_fetch_events( + self, client, id, count, + cat, nocat, tag, notag, group, nogroup): + query = ["SELECT e.id, e.data FROM clients c RIGHT JOIN events e ON c.id = e.client_id WHERE e.id > %s"] + params = [id or 0] + + if cat or nocat: + cats = self.getMaps(self.catmap, (cat or nocat)) + query.append( + " AND e.id %s IN (SELECT event_id FROM event_category_mapping WHERE category_id IN (%s))" % + (self._get_not(cat), self._get_comma_perc(cats)) + ) + params.extend(cats) + + if tag or notag: + tags = self.getMaps(self.tagmap, (tag or notag)) + query.append( + " AND e.id %s IN (SELECT event_id FROM event_tag_mapping WHERE tag_id IN (%s))" % + (self._get_not(tag), self._get_comma_perc(tags)) + ) + params.extend(tags) + + if group or nogroup: + subquery = [] + for name in (group or nogroup): + escaped_name = name.replace('&', '&&').replace("_", "&_").replace("%", "&%") # escape for LIKE + subquery.append("c.name = %s") # exact client + params.append(name) + subquery.append("c.name LIKE CONCAT(%s, '.%%') ESCAPE '&'") # whole subtree + params.append(escaped_name) + + query.append(" AND %s (%s)" % + (self._get_not(group), " OR ".join(subquery))) + + query.append(" AND e.valid = 1 LIMIT %s") + params.append(count) + + return ["".join(query)], [params], 0 + + def _build_store_events_event(self, client, event, raw_event): + """Build query and params for event insertion""" + return ( + [ + "INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s)", + "SELECT LAST_INSERT_ID() AS id" + ], + [(client.id, raw_event), ()], + 1 + ) + + def _build_store_events_categories(self, event_id, cat_ids): + """Build query and params for insertion of event-categories mapping""" + return ( + ["INSERT INTO event_category_mapping (event_id,category_id) VALUES " + + self._get_comma_perc_n(2, cat_ids)], + [tuple(param for cat_id in cat_ids for param in (event_id, cat_id))], + None + ) + + def _build_store_events_tags(self, event_id, tag_ids): + """Build query and params for insertion of event-tags mapping""" + return ( + ["INSERT INTO event_tag_mapping (event_id,tag_id) VALUES " + + self._get_comma_perc_n(2, tag_ids)], + [tuple(param for tag_id in tag_ids for param in (event_id, tag_id))], + None + ) + + def _build_insert_last_received_id(self, client, id): + """Build query and params for insertion of the last event id received by client""" + return ( + ["INSERT INTO last_events(client_id, event_id, timestamp) VALUES(%s, %s, NOW())"], + [(client.id, 
id)], + None + ) + + def _build_get_last_event_id(self): + """Build query and params for querying the id of the last inserted event""" + return ["SELECT MAX(id) as id FROM events"], [()], 0 + + def _build_get_last_received_id(self, client): + """Build query and params for querying the last event id received by client""" + return ( + ["SELECT event_id as id FROM last_events WHERE client_id = %s ORDER BY last_events.id DESC LIMIT 1"], + [(client.id,)], + 0 + ) + + def _build_load_maps_tags(self): + """Build query and params for updating the tag map""" + return ( + [ + "DELETE FROM tags", + "INSERT INTO tags(id, tag) VALUES " + + self._get_comma_perc_n(2, self.tagmap) + ], + [ + (), + tuple(param for tag, num in self.tagmap.items() for param in (num, tag)) + ], + None + ) + + def _build_load_maps_cats(self): + """Build query and params for updating the category map""" + params = [] + for cat_subcat, num in self.catmap.items(): + catsplit = cat_subcat.split(".", 1) + category = catsplit[0] + subcategory = catsplit[1] if len(catsplit) > 1 else None + params.extend((num, category, subcategory, cat_subcat)) + + return ( + [ + "DELETE FROM categories", + "INSERT INTO categories(id, category, subcategory, cat_subcat) VALUES " + + self._get_comma_perc_n(4, self.catmap) + ], + [ + (), + tuple(params) + ], + None + ) + + def _build_purge_lastlog(self, days): + """Build query and params for purging stored client last event mapping older than days""" + return ( + [ "DELETE FROM last_events " " USING last_events LEFT JOIN (" " SELECT MAX(id) AS last FROM last_events" " GROUP BY client_id" " ) AS maxids ON last=id" " WHERE timestamp < DATE_SUB(CURDATE(), INTERVAL %s DAY) AND last IS NULL", - (days,)).rowcount - - def purge_events(self, days): - with self as db: - affected = 0 - id_ = db.query( + ], + [(days,)], + 0 + ) + + def _build_purge_events_get_id(self, days): + """Build query and params to get largest event id of events older than days""" + return ( + [ "SELECT MAX(id) as id" " FROM events" - " WHERE received < DATE_SUB(CURDATE(), INTERVAL %s DAY)", - (days,) - ).fetchall()[0]["id"] - if id_ is None: - return 0 - affected = db.query("DELETE FROM events WHERE id <= %s", (id_,)).rowcount - db.query("DELETE FROM event_category_mapping WHERE event_id <= %s", (id_,)) - db.query("DELETE FROM event_tag_mapping WHERE event_id <= %s", (id_,)) - return affected + " WHERE received < DATE_SUB(CURDATE(), INTERVAL %s DAY)" + ], + [(days,)], + 0 + ) + + def _build_purge_events_events(self, id_): + """Build query and params to remove events older than days and their mappings""" + return ( + [ + "DELETE FROM event_category_mapping WHERE event_id <= %s", + "DELETE FROM event_tag_mapping WHERE event_id <= %s", + "DELETE FROM events WHERE id <= %s", + ], + [(id_,), (id_,), (id_,)], + 2 + ) def expose(read=1, write=0, debug=0):
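Editor's note (not part of the patch): a minimal sketch of how another backend could plug into the DataBase / DataBaseAPIv2 contract introduced above. Each _build_* hook returns a triple (queries, params, ret): a list of SQL strings, a matching list of parameter sequences, and the index of the query whose result the caller wants (-1 means the last query, None means the result is discarded). The choice of PostgreSQL/psycopg2, the class name, and the partial method set below are illustrative assumptions, not something this patch ships.

import psycopg2
import psycopg2.extras


class PostgreSQLExample(DataBaseAPIv2):
    """Hypothetical backend sketch; only a few hooks shown."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.db = psycopg2  # DataBaseAPIv2.__exit__ catches self.db.Error

    def connect(self):
        # Dict-like rows are required because the generic code does Client(**row)
        self.con = psycopg2.connect(
            host=self.host, user=self.user, password=self.password,
            dbname=self.dbname, port=self.port,
            cursor_factory=psycopg2.extras.RealDictCursor)

    def _build_get_clients(self, id):
        """Build query and params for client lookup by id"""
        query = ["SELECT * FROM clients"]
        params = []
        if id:
            query.append("WHERE id = %s")
            params.append(id)
        query.append("ORDER BY id")
        # one query, one parameter sequence, return the rows of query #0
        return [" ".join(query)], [params], 0

    def _build_get_last_event_id(self):
        """Build query and params for querying the id of the last inserted event"""
        return ["SELECT MAX(id) AS id FROM events"], [()], 0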