From 09c76ce5c7cf4c49892a703db4b47c21923e05eb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jakub=20Malo=C5=A1t=C3=ADk?= <malostik@cesnet.cz>
Date: Mon, 13 Mar 2023 17:32:46 +0200
Subject: [PATCH] Enforce uniqueness of IDEA IDs

---
 ...on_0000_downgrade_deduplicate_idea_ids.sql |  2 +
 ...tion_0000_upgrade_deduplicate_idea_ids.sql |  2 +
 ...on_0000_downgrade_deduplicate_idea_ids.sql |  2 +
 ...tion_0000_upgrade_deduplicate_idea_ids.sql |  2 +
 warden_server/test_warden_server.py           | 49 +++++++++++++
 warden_server/warden_3.0_mysql.sql            |  1 +
 warden_server/warden_3.0_postgres.sql         |  1 +
 warden_server/warden_server.py                | 72 +++++++++++++++----
 8 files changed, 119 insertions(+), 12 deletions(-)
 create mode 100644 warden_server/migrations/mysql/mysql_migration_0000_downgrade_deduplicate_idea_ids.sql
 create mode 100644 warden_server/migrations/mysql/mysql_migration_0000_upgrade_deduplicate_idea_ids.sql
 create mode 100644 warden_server/migrations/postgresql/postgresql_migration_0000_downgrade_deduplicate_idea_ids.sql
 create mode 100644 warden_server/migrations/postgresql/postgresql_migration_0000_upgrade_deduplicate_idea_ids.sql

diff --git a/warden_server/migrations/mysql/mysql_migration_0000_downgrade_deduplicate_idea_ids.sql b/warden_server/migrations/mysql/mysql_migration_0000_downgrade_deduplicate_idea_ids.sql
new file mode 100644
index 0000000..c9cc5ef
--- /dev/null
+++ b/warden_server/migrations/mysql/mysql_migration_0000_downgrade_deduplicate_idea_ids.sql
@@ -0,0 +1,2 @@
+ALTER TABLE `events`
+    DROP COLUMN `idea_id`;
\ No newline at end of file
diff --git a/warden_server/migrations/mysql/mysql_migration_0000_upgrade_deduplicate_idea_ids.sql b/warden_server/migrations/mysql/mysql_migration_0000_upgrade_deduplicate_idea_ids.sql
new file mode 100644
index 0000000..0c0034a
--- /dev/null
+++ b/warden_server/migrations/mysql/mysql_migration_0000_upgrade_deduplicate_idea_ids.sql
@@ -0,0 +1,2 @@
+ALTER TABLE `events`
+    ADD COLUMN `idea_id` varchar(64) UNIQUE AFTER `id`;
\ No newline at end of file
diff --git a/warden_server/migrations/postgresql/postgresql_migration_0000_downgrade_deduplicate_idea_ids.sql b/warden_server/migrations/postgresql/postgresql_migration_0000_downgrade_deduplicate_idea_ids.sql
new file mode 100644
index 0000000..f88aaa5
--- /dev/null
+++ b/warden_server/migrations/postgresql/postgresql_migration_0000_downgrade_deduplicate_idea_ids.sql
@@ -0,0 +1,2 @@
+ALTER TABLE "events"
+    DROP COLUMN "idea_id";
\ No newline at end of file
diff --git a/warden_server/migrations/postgresql/postgresql_migration_0000_upgrade_deduplicate_idea_ids.sql b/warden_server/migrations/postgresql/postgresql_migration_0000_upgrade_deduplicate_idea_ids.sql
new file mode 100644
index 0000000..94532ee
--- /dev/null
+++ b/warden_server/migrations/postgresql/postgresql_migration_0000_upgrade_deduplicate_idea_ids.sql
@@ -0,0 +1,2 @@
+ALTER TABLE "events"
+    ADD COLUMN "idea_id" text UNIQUE;
\ No newline at end of file
diff --git a/warden_server/test_warden_server.py b/warden_server/test_warden_server.py
index 4926f6c..f056e51 100755
--- a/warden_server/test_warden_server.py
+++ b/warden_server/test_warden_server.py
@@ -211,6 +211,55 @@ class Warden3ServerTest(unittest.TestCase):
             ("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "ideaidcontaininga\\u0000byte"}]', "422 IDEA ID cannot contain null bytes", None),
             ("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "verylongideaidverylongideaid\\u0000verylongideaidverylongideaidverylongideaid"}]', "422 Multiple errors", None),
             ("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201142"}]', "200 OK", ['{"saved": 1}']),
+            ("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201142"}]', "409 IDEA event with this ID already exists", None),
+            (
+                "/sendEvents?secret=abc",
+                '['
+                    '{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201142"}, '
+                    '{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201143"}'
+                ']',
+                "409 IDEA event with this ID already exists",
+                None
+            ),
+            ("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201143"}]', "409 IDEA event with this ID already exists", None),
+            (
+                "/sendEvents?secret=abc",
+                '['
+                    '{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": ""}, '
+                    '{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201144"}'
+                ']',
+                "422 The provided IDEA ID is invalid",
+                None
+            ),
+            ("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201144"}]', "409 IDEA event with this ID already exists", None),
+            (
+                "/sendEvents?secret=abc",
+                '['
+                    '{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201145"}, '
+                    '{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201145"}'
+                ']',
+                "409 IDEA event with this ID already exists",
+                None
+            ),
+            (
+                "/sendEvents?secret=abc",
+                '['
+                    '{"Node": [{"Name": "cz.cesnet.warden3test"}]}, '
+                    '{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "verylongideaidverylongideaidverylongideaidverylongideaidverylongideaid"}'
+                ']',
+                '422 Multiple errors',
+                None
+            ),
+            (
+                "/sendEvents?secret=abc",
+                '['
+                    '{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201146"}, '
+                    '{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201147"}, '
+                    '{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201148"}'
+                ']',
+                "200 OK",
+                ['{"saved": 3}']
+            )
         ]
         for query, payload, expected_status, expected_response in tests:
             with self.subTest(query=query, payload=payload, expected_status=expected_status, expected_response=expected_response):
diff --git a/warden_server/warden_3.0_mysql.sql b/warden_server/warden_3.0_mysql.sql
index 9fb8118..4eaa7c7 100644
--- a/warden_server/warden_3.0_mysql.sql
+++ b/warden_server/warden_3.0_mysql.sql
@@ -66,6 +66,7 @@ CREATE TABLE IF NOT EXISTS `clients` (
 
 CREATE TABLE IF NOT EXISTS `events` (
   `id` bigint UNSIGNED NOT NULL AUTO_INCREMENT,
+  `idea_id` varchar(64) UNIQUE,
   `received` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
   `client_id` int UNSIGNED NOT NULL,
   `data` longtext NOT NULL,
diff --git a/warden_server/warden_3.0_postgres.sql b/warden_server/warden_3.0_postgres.sql
index 021c2f8..985596e 100644
--- a/warden_server/warden_3.0_postgres.sql
+++ b/warden_server/warden_3.0_postgres.sql
@@ -53,6 +53,7 @@ CREATE INDEX IF NOT EXISTS "clients_2" ON "clients" ("valid", "name");
 
 CREATE TABLE IF NOT EXISTS "events" (
   "id" bigint PRIMARY KEY GENERATED ALWAYS AS IDENTITY (MINVALUE 2),
+  "idea_id" text UNIQUE,
   "received" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
   "client_id" int NOT NULL REFERENCES "clients" ("id"),
   "data" bytea NOT NULL,
diff --git a/warden_server/warden_server.py b/warden_server/warden_server.py
index 3e84cb8..f76a762 100755
--- a/warden_server/warden_server.py
+++ b/warden_server/warden_server.py
@@ -519,8 +519,29 @@ class JSONSchemaValidator(NoValidator):
         return res
 
 
+class UnsafeQueryContext:
+    """ Context manager to be used within a transaction for partial rollbacks
+        Meant to be used as:
+        with self as db:
+            with self.unsafe_query_context(db):
+                res = db.query_one(...)
+    """
+    def __init__(self, db, silence_exc=False):
+        self.db = db
+        self.silence_exc = silence_exc
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, exc_traceback):
+        return self.silence_exc and exc_type is not None \
+            and issubclass(exc_type, (self.db.db.IntegrityError, self.db.db.DataError))
+
+
 class DataBase(ObjectBase):
 
+    unsafe_query_context = UnsafeQueryContext
+
     def __init__(
             self, req, log, host, user, password, dbname, port, retry_count,
             retry_pause, event_size_limit, catmap_filename, tagmap_filename):
@@ -822,13 +843,24 @@ class DataBase(ObjectBase):
     def _build_store_events_tags(self, event_id, tag_ids):
         """Build query and params for insertion of event-tags mapping"""
 
-    def store_events(self, client, events, events_raw):
+    def store_events(self, client, events, events_raw, events_indexes):
         try:
             for attempt in self.repeat():
                 with attempt as db:
-                    for event, raw_event in zip(events, events_raw):
+                    errors = []
+                    stored = 0
+                    for event, raw_event, event_indx in zip(events, events_raw, events_indexes):
                         equery, eparams, eret = self._build_store_events_event(client, event, raw_event)
-                        lastid = db.query_one(equery, eparams, ret=eret)["id"]
+                        try:
+                            with self.unsafe_query_context(db):
+                                lastid = db.query_one(equery, eparams, ret=eret)["id"]
+                        except self.db.IntegrityError:
+                            exception = self.req.error(message="IDEA event with this ID already exists", error=400, exc=sys.exc_info(), env=self.req.env)
+                            exception.log(self.log)
+                            errors.append(ErrorMessage(409, "IDEA event with this ID already exists", events={event_indx}))
+                            continue
+
+                        stored += 1
 
                         catlist = event.get('Category', ["Other"])
                         cats = set(catlist) | {cat.split(".", 1)[0] for cat in catlist}
@@ -843,11 +875,11 @@ class DataBase(ObjectBase):
                             tquery, tparams, _ = self._build_store_events_tags(lastid, tag_ids)
                             db.execute(tquery, tparams)
 
-                    return []
+                    return errors, stored
         except Exception as e:
             exception = self.req.error(message="DB error", error=500, exc=sys.exc_info(), env=self.req.env)
             exception.log(self.log)
-            return [ErrorMessage(500, "DB error %s" % type(e).__name__)]
+            return [ErrorMessage(500, "DB error %s" % type(e).__name__)], 0
 
     @abc.abstractmethod
     def _build_insert_last_received_id(self, client, id):
@@ -1068,10 +1100,10 @@ class MySQL(DataBase):
         """Build query and params for event insertion"""
         return (
             [
-                "INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s)",
+                "INSERT INTO events (idea_id,received,client_id,data) VALUES (%s, NOW(), %s, %s)",
                 "SELECT LAST_INSERT_ID() AS id"
             ],
-            [(client.id, raw_event), ()],
+            [(event["ID"], client.id, raw_event), ()],
             1
         )
 
@@ -1190,8 +1222,26 @@ class MySQL(DataBase):
         )
 
 
+class PostgresUnsafeQueryContext(UnsafeQueryContext):
+
+    SAVEPOINT = 'context_savepoint'
+
+    def __enter__(self):
+        self.db.execute([self.db.ppgsql.SQL('SAVEPOINT "context_savepoint"')], [()])
+        return self
+
+    def __exit__(self, exc_type, exc_value, exc_traceback):
+        if exc_type is not None:
+            self.db.execute([self.db.ppgsql.SQL('ROLLBACK TO SAVEPOINT "context_savepoint"')], [()])
+
+        return self.silence_exc and exc_type is not None \
+            and issubclass(exc_type, (self.db.db.IntegrityError, self.db.db.DataError))
+
+
 class PostgreSQL(DataBase):
 
+    unsafe_query_context = PostgresUnsafeQueryContext
+
     def __init__(
             self, req, log, host, user, password, dbname, port, retry_count,
             retry_pause, event_size_limit, catmap_filename, tagmap_filename):
@@ -1371,8 +1421,8 @@ class PostgreSQL(DataBase):
     def _build_store_events_event(self, client, event, raw_event):
         """Build query and params for event insertion"""
         return (
-            ["INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s) RETURNING id"],
-            [(client.id, self.db.Binary(raw_event.encode('utf8')))],
+            ["INSERT INTO events (idea_id,received,client_id,data) VALUES (%s, NOW(), %s, %s) RETURNING id"],
+            [(event["ID"], client.id, self.db.Binary(raw_event.encode('utf8')))],
             0
         )
 
@@ -1834,11 +1884,9 @@ class WardenHandler(ObjectBase):
             events_raw.append(raw_event)
             events_nums.append(i)
 
-        db_errs = self.db.store_events(self.req.client, events_tosend, events_raw)
+        db_errs, saved = self.db.store_events(self.req.client, events_tosend, events_raw, events_nums)
         self.add_errors(db_errs)
 
-        saved = 0 if db_errs else len(events_tosend)
-
         self.log.info("Saved %i events" % saved)
         if self.errs:
             raise self.req.error(errors=self.errs.values())
-- 
GitLab