From 93b7cd4213df6ba48b6be0c989ea2603e3472b60 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jakub=20Malo=C5=A1t=C3=ADk?= <malostik@cesnet.cz>
Date: Tue, 30 Aug 2022 14:34:05 +0200
Subject: [PATCH] Add PostgreSQL option

---
 warden_server/README           |  40 ++++-
 warden_server/warden_server.py | 309 ++++++++++++++++++++++++++++++++-
 2 files changed, 339 insertions(+), 10 deletions(-)

diff --git a/warden_server/README b/warden_server/README
index ec4adc0..7782626 100644
--- a/warden_server/README
+++ b/warden_server/README
@@ -42,13 +42,13 @@ B. Dependencies
 
  2. Python modules
 
-    python-mysqldb 5.3.3+
+    python-mysqldb 5.3.3+ | python-psycopg2 2.8.6+
     python-m2crypto 0.20+
     jsonschema 2.4+
 
  3. Database
 
-    MySQL | MariaDB >= 5.5
+    MySQL | MariaDB >= 5.5 | PostgreSQL >= 13
 
 ------------------------------------------------------------------------------
 C. Installation
@@ -70,14 +70,28 @@ C. Installation
    > GRANT ALL ON warden3.* TO `warden`@`localhost`;
    > FLUSH PRIVILEGES;
 
+   # psql
+
+   > CREATE DATABASE warden3 ENCODING 'UTF-8';
+   > CREATE ROLE "warden" LOGIN PASSWORD 'example';
+   > GRANT ALL ON DATABASE "warden3" TO "warden";
+
  * Create necessary table structure
 
    mysql -p -u warden warden3 < warden_3.0_mysql.sql
 
+   or
+
+   psql -U warden -h localhost warden3 < warden_3.0_postgres.sql
+
  * Get up to date Idea schema
 
    wget -O warden_server/idea.schema https://idea.cesnet.cz/_media/en/idea0.schema
 
+ * Load category and tag maps into database (this step is optional for the MySQL backend)
+
+   ./warden_server.py loadmaps
+
  * Enable mod_wsgi, mod_ssl, include Warden configuration
 
    This depends heavily on your distribution and Apache configuration.
@@ -129,7 +143,7 @@ particular implementation object of the aspect, for example type of logger
    Sections and their "type" objects can be:
 
 		Log: FileLogger, SysLogger
-		DB: MySQL
+		DB: MySQL, PostgreSQL
 		Auth: X509Authenticator, X509NameAuthenticator,
               X509MixMatchAuthenticator,PlainAuthenticator
 		Validator: JSONSchemaValidator, NoValidator
@@ -190,6 +204,20 @@ object from particular section list is used ("FileLogger" for example).
       tagmap_filename": IDEA node type mapping to database ids, defaults to
                         "tagmap_db.json" at installation directory
 
+   PostgreSQL: database storage backend
+      host: database server host, default "localhost"
+      user: database user, default "warden"
+      password: database password
+      dbname: database name, default "warden3"
+      port: database server port, default 5432
+      retry_pause: retry in case of database errors, in seconds, defaults to 3
+      retry_count: number of retries, defaults to 3
+      event_size_limit: max size of serialized event, defaults to 5 MB
+      catmap_filename: IDEA category mapping to database ids, defaults to
+                       "catmap_db.json" at installation directory
+      tagmap_filename: IDEA node type mapping to database ids, defaults to
+                        "tagmap_db.json" at installation directory
+
    WardenHandler: Main Warden RPC worker
       send_events_limit: max events sent in one bunch, defaults to 10000
       get_events_limit: max events received in one bunch, defaults to 10000
@@ -286,10 +314,8 @@ warden_server.py purge [--help] [-l] [-e] [-d DAYS]
 warden_server.py loadmaps [--help]
 
    Load 'categories' and 'tags' table from 'catmap_db.json' and
-   'tagmap_db.json'. Note that this is NOT needed for server at all, load
-   them into db at will, should you need to run your own specific SQL queries
-   on data directly. Note also that previous content of both tables will be
-   lost.
+   'tagmap_db.json'. Note also that previous content of both tables
+   will be lost.
 
 ------------------------------------------------------------------------------
 Copyright (C) 2011-2015 Cesnet z.s.p.o
diff --git a/warden_server/warden_server.py b/warden_server/warden_server.py
index 39ce25d..3ce6deb 100755
--- a/warden_server/warden_server.py
+++ b/warden_server/warden_server.py
@@ -1187,6 +1187,297 @@ class MySQL(DataBaseAPIv2):
         )
 
 
+class PostgreSQL(DataBaseAPIv2):
+
+    def __init__(
+            self, req, log, host, user, password, dbname, port, retry_count,
+            retry_pause, event_size_limit, catmap_filename, tagmap_filename):
+
+        super().__init__(req, log, host, user, password, dbname, port, retry_count,
+            retry_pause, event_size_limit, catmap_filename, tagmap_filename)
+
+        import psycopg2 as db
+        from psycopg2 import sql as ppgsql
+        import psycopg2.extras as ppgextra
+        self.db = db
+        self.ppgsql = ppgsql
+        self.ppgextra = ppgextra
+
+    def connect(self):
+        self.con = self.db.connect(
+            host=self.host, user=self.user, password=self.password,
+            dbname=self.dbname, port=self.port, cursor_factory=self.ppgextra.RealDictCursor)
+
+    def _build_get_client_by_name(self, cert_names=None, name=None, secret=None):
+        """Build query and params for client lookup"""
+        query = ["SELECT * FROM clients WHERE valid = 1"]
+        params = []
+        if name:
+            query.append(" AND name = %s")
+            params.append(name.lower())
+        if secret:
+            query.append(" AND secret = %s")
+            params.append(secret)
+        if cert_names:
+            query.append(" AND hostname IN (%s)" % self._get_comma_perc(cert_names))
+            params.extend(n.lower() for n in cert_names)
+
+        return ["".join(query)], [params], 0
+
+    def _build_get_clients(self, id):
+        """Build query and params for client lookup by id"""
+        query = ["SELECT * FROM clients"]
+        params = []
+        if id:
+            query.append("WHERE id = %s")
+            params.append(id)
+        query.append("ORDER BY id")
+
+        return [" ".join(query)], [params], 0
+
+    def _build_add_modify_client(self, id, **kwargs):
+        """Build query and params for adding/modifying client"""
+        fields = set(Client._fields) - {"id", "registered"}
+        cols, params = map(
+            list,
+            zip(
+                *(
+                    (k, None)  # disable secret
+                    if k == "secret" and v == "" else
+                    (k, v)
+                    for k, v in kwargs.items()
+                    if v is not None and k in fields
+                )
+            )
+        )
+
+        if id is None:
+            query = self.ppgsql.SQL('INSERT INTO clients ("registered", {}) VALUES (NOW(), {}) RETURNING id').format(
+                self.ppgsql.SQL(", ").join(map(self.ppgsql.Identifier, cols)),
+                self.ppgsql.SQL(", ").join(self.ppgsql.Placeholder() * len(cols))
+            )
+        elif not cols:
+            return ["SELECT %s AS id"], [(id,)], 0
+        else:
+            query = self.ppgsql.SQL("UPDATE clients SET {} WHERE id = {} RETURNING id").format(
+                self.ppgsql.SQL(", ").join(
+                    self.ppgsql.SQL("{} = {}").format(
+                        self.ppgsql.Identifier(col),
+                        self.ppgsql.Placeholder()
+                    ) for col in cols
+                ),
+                self.ppgsql.Placeholder()
+            )
+            params.append(id)
+
+        return [query], [params], 0
+
+    def _build_get_debug_version(self):
+        return ["SELECT setting AS version FROM pg_settings WHERE name = 'server_version'"], [()], 0
+
+    def _build_get_debug_tablestat(self):
+        return [
+            "SELECT "
+                'tablename AS "Name", '
+                'relnatts AS "Columns", '
+                'n_live_tup AS "Rows", '
+                'pg_catalog.pg_size_pretty(pg_catalog.pg_table_size(oid)) AS "Table_size", '
+                'pg_catalog.pg_size_pretty(pg_catalog.pg_indexes_size(oid)) AS "Index_size", '
+                'coll.collations AS "Collations" '
+            "FROM "
+                "pg_catalog.pg_tables tbls "
+                "LEFT OUTER JOIN pg_catalog.pg_class cls "
+                "ON tbls.tablename=cls.relname "
+                "LEFT OUTER JOIN pg_catalog.pg_stat_user_tables sut "
+                "ON tbls.tablename=sut.relname "
+                "LEFT OUTER JOIN ("
+                    "SELECT "
+                        "table_name, "
+                        "string_agg("
+                            "DISTINCT COALESCE("
+                                "collation_name, "
+                                "("
+                                    "SELECT "
+                                        "datcollate "
+                                    "FROM "
+                                        "pg_catalog.pg_database "
+                                    "WHERE "
+                                        "datname=%s"
+                                ")"
+                            "), "
+                            "','"
+                        ") AS collations "
+                    "FROM "
+                        "information_schema.columns "
+                    "GROUP BY "
+                        "table_name"
+                ") coll "
+                "ON tbls.tablename=coll.table_name "
+            "WHERE "
+                "tbls.schemaname='public' "
+                "AND tbls.tableowner=%s"
+        ], [(self.dbname, self.user)], 0
+
+    def _load_event_json(self, data):
+        """Return decoded json from data loaded from database, if unable to decode, return None"""
+        try:
+            return json.loads(data.tobytes())
+        except Exception:
+            return None
+
+    def _build_fetch_events(
+            self, client, id, count,
+            cat, nocat, tag, notag, group, nogroup):
+
+        query = ["SELECT e.id, e.data FROM clients c RIGHT JOIN events e ON c.id = e.client_id WHERE e.id > %s"]
+        params = [id or 0]
+
+        if cat or nocat:
+            cats = self.getMaps(self.catmap, (cat or nocat))
+            query.append(
+                " AND e.id %s IN (SELECT event_id FROM event_category_mapping WHERE category_id IN (%s))" %
+                (self._get_not(cat), self._get_comma_perc(cats))
+            )
+            params.extend(cats)
+
+        if tag or notag:
+            tags = self.getMaps(self.tagmap, (tag or notag))
+            query.append(
+                " AND e.id %s IN (SELECT event_id FROM event_tag_mapping WHERE tag_id IN (%s))" %
+                (self._get_not(tag), self._get_comma_perc(tags))
+            )
+            params.extend(tags)
+
+        if group or nogroup:
+            subquery = []
+            for name in group or nogroup:
+                name = name.lower()  # assumes only lowercase names
+                escaped_name = name.replace('&', '&&').replace("_", "&_").replace("%", "&%")  # escape for LIKE
+                subquery.append("c.name = %s")                          # exact client
+                params.append(name)
+                subquery.append("c.name LIKE %s || '.%%' ESCAPE '&'")   # whole subtree
+                params.append(escaped_name)
+
+            query.append(" AND %s (%s)" % (self._get_not(group), " OR ".join(subquery)))
+
+        query.append(" AND e.valid = 1 LIMIT %s")
+        params.append(count)
+
+        return ["".join(query)], [params], 0
+
+    def _build_store_events_event(self, client, event, raw_event):
+        """Build query and params for event insertion"""
+        return (
+            ["INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s) RETURNING id"],
+            [(client.id, raw_event)],
+            0
+        )
+
+    def _build_store_events_categories(self, event_id, cat_ids):
+        """Build query and params for insertion of event-categories mapping"""
+        return (
+            ["INSERT INTO event_category_mapping (event_id,category_id) VALUES " +
+                self._get_comma_perc_n(2, cat_ids)],
+            [tuple(param for cat_id in cat_ids for param in (event_id, cat_id))],
+            None
+        )
+
+    def _build_store_events_tags(self, event_id, tag_ids):
+        """Build query and params for insertion of event-tags mapping"""
+        return (
+            ["INSERT INTO event_tag_mapping (event_id,tag_id) VALUES " +
+                self._get_comma_perc_n(2, tag_ids)],
+            [tuple(param for tag_id in tag_ids for param in (event_id, tag_id))],
+            None
+        )
+
+    def _build_insert_last_received_id(self, client, id):
+        """Build query and params for insertion of the last event id received by client"""
+        return (
+            ["INSERT INTO last_events(client_id, event_id, timestamp) VALUES(%s, %s, NOW())"],
+            [(client.id, None if id == 1 else id)],
+            None
+        )
+
+    def _build_get_last_event_id(self):
+        """Build query and params for querying the id of the last inserted event"""
+        return ["SELECT MAX(id) as id FROM events"], [()], 0
+
+    def _build_get_last_received_id(self, client):
+        """Build query and params for querying the last event id received by client"""
+        return (
+            ["SELECT event_id as id FROM last_events WHERE client_id = %s ORDER BY last_events.id DESC LIMIT 1"],
+            [(client.id,)],
+            0
+        )
+
+    def _build_load_maps_tags(self):
+        """Build query and params for updating the tag map"""
+        return (
+            [
+                "ALTER TABLE event_tag_mapping DROP CONSTRAINT event_tag_mapping_tag_id_fk",
+                "DELETE FROM tags",
+                "INSERT INTO tags(id, tag) VALUES " +
+                    self._get_comma_perc_n(2, self.tagmap),
+                'ALTER TABLE event_tag_mapping ADD CONSTRAINT "event_tag_mapping_tag_id_fk" FOREIGN KEY ("tag_id") REFERENCES "tags" ("id")'
+            ],
+            [(), (), tuple(param for tag, num in self.tagmap.items() for param in (num, tag)), ()],
+            None
+        )
+
+    def _build_load_maps_cats(self):
+        """Build query and params for updating the category map"""
+        params = []
+        for cat_subcat, num in self.catmap.items():
+            catsplit = cat_subcat.split(".", 1)
+            category = catsplit[0]
+            subcategory = catsplit[1] if len(catsplit) > 1 else None
+            params.extend((num, category, subcategory, cat_subcat))
+
+        return (
+            [
+                "ALTER TABLE event_category_mapping DROP CONSTRAINT event_category_mapping_category_id_fk",
+                "DELETE FROM categories",
+                "INSERT INTO categories(id, category, subcategory, cat_subcat) VALUES " +
+                    self._get_comma_perc_n(4, self.catmap),
+                'ALTER TABLE event_category_mapping ADD CONSTRAINT "event_category_mapping_category_id_fk" FOREIGN KEY ("category_id") REFERENCES "categories" ("id")'
+            ],
+            [(), (), tuple(params), ()],
+            None
+        )
+
+    def _build_purge_lastlog(self, days):
+        """Build query and params for purging stored client last event mapping older than days"""
+        return (
+            [
+                "DELETE FROM last_events "
+                " USING last_events le LEFT JOIN ("
+                "    SELECT MAX(id) AS last FROM last_events"
+                "    GROUP BY client_id"
+                " ) AS maxids ON maxids.last=le.id"
+                " WHERE last_events.id = le.id AND le.timestamp < CURRENT_DATE - INTERVAL %s DAY AND maxids.last IS NULL"
+            ],
+            [(str(days),)],
+            0
+        )
+
+    def _build_purge_events_get_id(self, days):
+        """Build query and params to get largest event id of events older than days"""
+        return (
+            [
+                "SELECT MAX(id) as id"
+                "  FROM events"
+                "  WHERE received < CURRENT_DATE - INTERVAL %s DAY"
+            ],
+            [(str(days),)],
+            0
+        )
+
+    def _build_purge_events_events(self, id_):
+        """Build query and params to remove events older than days and their mappings"""
+        return ["DELETE FROM events WHERE id <= %s"], [(id_,)], 0
+
+
 def expose(read=1, write=0, debug=0):
 
     def expose_deco(meth):
@@ -1576,7 +1867,7 @@ section_order = ("log", "db", "auth", "validator", "handler", "server")
 # "type" keyword in section may be used to choose other
 section_def = {
     "log": [FileLogger, SysLogger],
-    "db": [MySQL],
+    "db": [MySQL, PostgreSQL],
     "auth": [X509NameAuthenticator, PlainAuthenticator, X509Authenticator, X509MixMatchAuthenticator],
     "validator": [JSONSchemaValidator, NoValidator],
     "handler": [WardenHandler],
@@ -1639,6 +1930,20 @@ param_def = {
         "catmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "catmap_db.json")},
         "tagmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "tagmap_db.json")}
     },
+    PostgreSQL: {
+        "req": {"type": "obj", "default": "req"},
+        "log": {"type": "obj", "default": "log"},
+        "host": {"type": "str", "default": "localhost"},
+        "user": {"type": "str", "default": "warden"},
+        "password": {"type": "str", "default": ""},
+        "dbname": {"type": "str", "default": "warden3"},
+        "port": {"type": "natural", "default": 5432},
+        "retry_pause": {"type": "natural", "default": 3},
+        "retry_count": {"type": "natural", "default": 3},
+        "event_size_limit": {"type": "natural", "default": 5*1024*1024},
+        "catmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "catmap_db.json")},
+        "tagmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "tagmap_db.json")}
+    },
     WardenHandler: {
         "req": {"type": "obj", "default": "req"},
         "log": {"type": "obj", "default": "log"},
@@ -1995,8 +2300,6 @@ def get_args():
         "loadmaps", add_help=False,
         description=(
             "Load 'categories' and 'tags' table from 'catmap_db.json' and 'tagmap_db.json'."
-            " Note that this is NOT needed for server at all, load them into db at will,"
-            " should you need to run your own specific SQL queries on data directly."
             " Note also that previous content of both tables will be lost."),
         help="load catmap and tagmap into db")
     subargp_loadmaps.set_defaults(command=load_maps)
-- 
GitLab