diff --git a/warden_server/migrations/mysql/mysql_migration_0000_downgrade_deduplicate_idea_ids.sql b/warden_server/migrations/mysql/mysql_migration_0000_downgrade_deduplicate_idea_ids.sql new file mode 100644 index 0000000000000000000000000000000000000000..c9cc5eff8cde044795b7aa070cc58b767cd83688 --- /dev/null +++ b/warden_server/migrations/mysql/mysql_migration_0000_downgrade_deduplicate_idea_ids.sql @@ -0,0 +1,2 @@ +ALTER TABLE `events` + DROP COLUMN `idea_id`; \ No newline at end of file diff --git a/warden_server/migrations/mysql/mysql_migration_0000_upgrade_deduplicate_idea_ids.sql b/warden_server/migrations/mysql/mysql_migration_0000_upgrade_deduplicate_idea_ids.sql new file mode 100644 index 0000000000000000000000000000000000000000..0c0034ac42fe55b0e69e9d282adb79491c881929 --- /dev/null +++ b/warden_server/migrations/mysql/mysql_migration_0000_upgrade_deduplicate_idea_ids.sql @@ -0,0 +1,2 @@ +ALTER TABLE `events` + ADD COLUMN `idea_id` varchar(64) UNIQUE AFTER `id`; \ No newline at end of file diff --git a/warden_server/migrations/postgresql/postgresql_migration_0000_downgrade_deduplicate_idea_ids.sql b/warden_server/migrations/postgresql/postgresql_migration_0000_downgrade_deduplicate_idea_ids.sql new file mode 100644 index 0000000000000000000000000000000000000000..f88aaa50a9beb932084d88a5a68d81b9bb3e07ac --- /dev/null +++ b/warden_server/migrations/postgresql/postgresql_migration_0000_downgrade_deduplicate_idea_ids.sql @@ -0,0 +1,2 @@ +ALTER TABLE "events" + DROP COLUMN "idea_id"; \ No newline at end of file diff --git a/warden_server/migrations/postgresql/postgresql_migration_0000_upgrade_deduplicate_idea_ids.sql b/warden_server/migrations/postgresql/postgresql_migration_0000_upgrade_deduplicate_idea_ids.sql new file mode 100644 index 0000000000000000000000000000000000000000..94532eeda97ac90438bda0ea054440c41dee8950 --- /dev/null +++ b/warden_server/migrations/postgresql/postgresql_migration_0000_upgrade_deduplicate_idea_ids.sql @@ -0,0 +1,2 @@ +ALTER TABLE "events" + ADD COLUMN "idea_id" text UNIQUE; \ No newline at end of file diff --git a/warden_server/test_warden_server.py b/warden_server/test_warden_server.py index 4926f6c8c55edf7692ab0f098286ae7ac5776945..f056e5101f989dfcb5dbe5729ebc4f1cb056ada4 100755 --- a/warden_server/test_warden_server.py +++ b/warden_server/test_warden_server.py @@ -211,6 +211,55 @@ class Warden3ServerTest(unittest.TestCase): ("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "ideaidcontaininga\\u0000byte"}]', "422 IDEA ID cannot contain null bytes", None), ("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "verylongideaidverylongideaid\\u0000verylongideaidverylongideaidverylongideaid"}]', "422 Multiple errors", None), ("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201142"}]', "200 OK", ['{"saved": 1}']), + ("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201142"}]', "409 IDEA event with this ID already exists", None), + ( + "/sendEvents?secret=abc", + '[' + '{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201142"}, ' + '{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201143"}' + ']', + "409 IDEA event with this ID already exists", + None + ), + ("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201143"}]', "409 IDEA event with this ID already exists", None), + ( + "/sendEvents?secret=abc", + '[' + '{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": ""}, ' + '{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201144"}' + ']', + "422 The provided IDEA ID is invalid", + None + ), + ("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201144"}]', "409 IDEA event with this ID already exists", None), + ( + "/sendEvents?secret=abc", + '[' + '{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201145"}, ' + '{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201145"}' + ']', + "409 IDEA event with this ID already exists", + None + ), + ( + "/sendEvents?secret=abc", + '[' + '{"Node": [{"Name": "cz.cesnet.warden3test"}]}, ' + '{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "verylongideaidverylongideaidverylongideaidverylongideaidverylongideaid"}' + ']', + '422 Multiple errors', + None + ), + ( + "/sendEvents?secret=abc", + '[' + '{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201146"}, ' + '{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201147"}, ' + '{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201148"}' + ']', + "200 OK", + ['{"saved": 3}'] + ) ] for query, payload, expected_status, expected_response in tests: with self.subTest(query=query, payload=payload, expected_status=expected_status, expected_response=expected_response): diff --git a/warden_server/warden_3.0_mysql.sql b/warden_server/warden_3.0_mysql.sql index 9fb8118d3475f8a88331c45baaf7a37d2cc85f1a..4eaa7c70bc83a2ecc0f34605c6faa6fbdfb79d7d 100644 --- a/warden_server/warden_3.0_mysql.sql +++ b/warden_server/warden_3.0_mysql.sql @@ -66,6 +66,7 @@ CREATE TABLE IF NOT EXISTS `clients` ( CREATE TABLE IF NOT EXISTS `events` ( `id` bigint UNSIGNED NOT NULL AUTO_INCREMENT, + `idea_id` varchar(64) UNIQUE, `received` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, `client_id` int UNSIGNED NOT NULL, `data` longtext NOT NULL, diff --git a/warden_server/warden_3.0_postgres.sql b/warden_server/warden_3.0_postgres.sql index 021c2f8e0253f0c3aa03a072d6d66939d0639017..985596eefe2745b2710097bcd35d5f04e983b33a 100644 --- a/warden_server/warden_3.0_postgres.sql +++ b/warden_server/warden_3.0_postgres.sql @@ -53,6 +53,7 @@ CREATE INDEX IF NOT EXISTS "clients_2" ON "clients" ("valid", "name"); CREATE TABLE IF NOT EXISTS "events" ( "id" bigint PRIMARY KEY GENERATED ALWAYS AS IDENTITY (MINVALUE 2), + "idea_id" text UNIQUE, "received" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, "client_id" int NOT NULL REFERENCES "clients" ("id"), "data" bytea NOT NULL, diff --git a/warden_server/warden_server.py b/warden_server/warden_server.py index 3e84cb829d9c1ce73db4c4407b2ca9d1f8ca8eba..f76a7629aee989044c443028bae66748637de093 100755 --- a/warden_server/warden_server.py +++ b/warden_server/warden_server.py @@ -519,8 +519,29 @@ class JSONSchemaValidator(NoValidator): return res +class UnsafeQueryContext: + """ Context manager to be used within a transaction for partial rollbacks + Meant to be used as: + with self as db: + with self.unsafe_query_context(db): + res = db.query_one(...) + """ + def __init__(self, db, silence_exc=False): + self.db = db + self.silence_exc = silence_exc + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_traceback): + return self.silence_exc and exc_type is not None \ + and issubclass(exc_type, (self.db.db.IntegrityError, self.db.db.DataError)) + + class DataBase(ObjectBase): + unsafe_query_context = UnsafeQueryContext + def __init__( self, req, log, host, user, password, dbname, port, retry_count, retry_pause, event_size_limit, catmap_filename, tagmap_filename): @@ -822,13 +843,24 @@ class DataBase(ObjectBase): def _build_store_events_tags(self, event_id, tag_ids): """Build query and params for insertion of event-tags mapping""" - def store_events(self, client, events, events_raw): + def store_events(self, client, events, events_raw, events_indexes): try: for attempt in self.repeat(): with attempt as db: - for event, raw_event in zip(events, events_raw): + errors = [] + stored = 0 + for event, raw_event, event_indx in zip(events, events_raw, events_indexes): equery, eparams, eret = self._build_store_events_event(client, event, raw_event) - lastid = db.query_one(equery, eparams, ret=eret)["id"] + try: + with self.unsafe_query_context(db): + lastid = db.query_one(equery, eparams, ret=eret)["id"] + except self.db.IntegrityError: + exception = self.req.error(message="IDEA event with this ID already exists", error=400, exc=sys.exc_info(), env=self.req.env) + exception.log(self.log) + errors.append(ErrorMessage(409, "IDEA event with this ID already exists", events={event_indx})) + continue + + stored += 1 catlist = event.get('Category', ["Other"]) cats = set(catlist) | {cat.split(".", 1)[0] for cat in catlist} @@ -843,11 +875,11 @@ class DataBase(ObjectBase): tquery, tparams, _ = self._build_store_events_tags(lastid, tag_ids) db.execute(tquery, tparams) - return [] + return errors, stored except Exception as e: exception = self.req.error(message="DB error", error=500, exc=sys.exc_info(), env=self.req.env) exception.log(self.log) - return [ErrorMessage(500, "DB error %s" % type(e).__name__)] + return [ErrorMessage(500, "DB error %s" % type(e).__name__)], 0 @abc.abstractmethod def _build_insert_last_received_id(self, client, id): @@ -1068,10 +1100,10 @@ class MySQL(DataBase): """Build query and params for event insertion""" return ( [ - "INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s)", + "INSERT INTO events (idea_id,received,client_id,data) VALUES (%s, NOW(), %s, %s)", "SELECT LAST_INSERT_ID() AS id" ], - [(client.id, raw_event), ()], + [(event["ID"], client.id, raw_event), ()], 1 ) @@ -1190,8 +1222,26 @@ class MySQL(DataBase): ) +class PostgresUnsafeQueryContext(UnsafeQueryContext): + + SAVEPOINT = 'context_savepoint' + + def __enter__(self): + self.db.execute([self.db.ppgsql.SQL('SAVEPOINT "context_savepoint"')], [()]) + return self + + def __exit__(self, exc_type, exc_value, exc_traceback): + if exc_type is not None: + self.db.execute([self.db.ppgsql.SQL('ROLLBACK TO SAVEPOINT "context_savepoint"')], [()]) + + return self.silence_exc and exc_type is not None \ + and issubclass(exc_type, (self.db.db.IntegrityError, self.db.db.DataError)) + + class PostgreSQL(DataBase): + unsafe_query_context = PostgresUnsafeQueryContext + def __init__( self, req, log, host, user, password, dbname, port, retry_count, retry_pause, event_size_limit, catmap_filename, tagmap_filename): @@ -1371,8 +1421,8 @@ class PostgreSQL(DataBase): def _build_store_events_event(self, client, event, raw_event): """Build query and params for event insertion""" return ( - ["INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s) RETURNING id"], - [(client.id, self.db.Binary(raw_event.encode('utf8')))], + ["INSERT INTO events (idea_id,received,client_id,data) VALUES (%s, NOW(), %s, %s) RETURNING id"], + [(event["ID"], client.id, self.db.Binary(raw_event.encode('utf8')))], 0 ) @@ -1834,11 +1884,9 @@ class WardenHandler(ObjectBase): events_raw.append(raw_event) events_nums.append(i) - db_errs = self.db.store_events(self.req.client, events_tosend, events_raw) + db_errs, saved = self.db.store_events(self.req.client, events_tosend, events_raw, events_nums) self.add_errors(db_errs) - saved = 0 if db_errs else len(events_tosend) - self.log.info("Saved %i events" % saved) if self.errs: raise self.req.error(errors=self.errs.values())