Skip to content
Snippets Groups Projects
Commit b24a893f authored by Pavel Kácha's avatar Pavel Kácha
Browse files

Merge branch 'malostik-#5066-deduplicate-idea-ids' into devel. (Closes: #5066)

parents c252133a 09c76ce5
No related branches found
No related tags found
No related merge requests found
ALTER TABLE `events`
DROP COLUMN `idea_id`;
\ No newline at end of file
ALTER TABLE `events`
ADD COLUMN `idea_id` varchar(64) UNIQUE AFTER `id`;
\ No newline at end of file
ALTER TABLE "events"
DROP COLUMN "idea_id";
\ No newline at end of file
ALTER TABLE "events"
ADD COLUMN "idea_id" text UNIQUE;
\ No newline at end of file
...@@ -200,12 +200,66 @@ class Warden3ServerTest(unittest.TestCase): ...@@ -200,12 +200,66 @@ class Warden3ServerTest(unittest.TestCase):
("/sendEvents?secret=abc", "", "200 OK", ['{"saved": 0}']), ("/sendEvents?secret=abc", "", "200 OK", ['{"saved": 0}']),
("/sendEvents?secret=abc", "{'test': 'true'}", "400 Deserialization error.", None), ("/sendEvents?secret=abc", "{'test': 'true'}", "400 Deserialization error.", None),
("/sendEvents?secret=abc", '{"test": "true"}', "400 List of events expected.", None), ("/sendEvents?secret=abc", '{"test": "true"}', "400 List of events expected.", None),
("/sendEvents?secret=abc", '[{"test": "true"}]', "422 Event does not bear valid Node attribute", None), ("/sendEvents?secret=abc", '[{"test": "true"}]', "422 Missing IDEA ID", None),
("/sendEvents?secret=abc", '[{"Node": ["test", "test2"]}]', "422 Event does not bear valid Node attribute", None), ("/sendEvents?secret=abc", '[{"test": "true", "ID": "120820201142"}]', "422 Event does not bear valid Node attribute", None),
("/sendEvents?secret=abc", '[{"Node": ["Name", "test"]}]', "422 Event does not bear valid Node attribute", None), ("/sendEvents?secret=abc", '[{"Node": ["test", "test2"], "ID": "120820201142"}]', "422 Event does not bear valid Node attribute", None),
("/sendEvents?secret=abc", '[{"Node": [{"Name"}]}]', "400 Deserialization error.", None), ("/sendEvents?secret=abc", '[{"Node": ["Name", "test"], "ID": "120820201142"}]', "422 Event does not bear valid Node attribute", None),
("/sendEvents?secret=abc", '[{"Node": [{"Name": "test"}]}]', "422 Node does not correspond with saving client", None), ("/sendEvents?secret=abc", '[{"Node": [{"Name"}], "ID": "120820201142"}]', "400 Deserialization error.", None),
("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}]}]', "200 OK", ['{"saved": 1}']), ("/sendEvents?secret=abc", '[{"Node": [{"Name": "test"}], "ID": "120820201142"}]', "422 Node does not correspond with saving client", None),
("/sendEvents?secret=abc", '[{"Node": [{"Name": "test"}], "ID": "verylongideaidverylongideaidverylongideaidverylongideaidverylongideaid"}]', "422 The provided event ID is too long", None),
("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "verylongideaidverylongideaidverylongideaidverylongideaidverylongideaid"}]', "422 The provided event ID is too long", None),
("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "ideaidcontaininga\\u0000byte"}]', "422 IDEA ID cannot contain null bytes", None),
("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "verylongideaidverylongideaid\\u0000verylongideaidverylongideaidverylongideaid"}]', "422 Multiple errors", None),
("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201142"}]', "200 OK", ['{"saved": 1}']),
("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201142"}]', "409 IDEA event with this ID already exists", None),
(
"/sendEvents?secret=abc",
'['
'{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201142"}, '
'{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201143"}'
']',
"409 IDEA event with this ID already exists",
None
),
("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201143"}]', "409 IDEA event with this ID already exists", None),
(
"/sendEvents?secret=abc",
'['
'{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": ""}, '
'{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201144"}'
']',
"422 The provided IDEA ID is invalid",
None
),
("/sendEvents?secret=abc", '[{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201144"}]', "409 IDEA event with this ID already exists", None),
(
"/sendEvents?secret=abc",
'['
'{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201145"}, '
'{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201145"}'
']',
"409 IDEA event with this ID already exists",
None
),
(
"/sendEvents?secret=abc",
'['
'{"Node": [{"Name": "cz.cesnet.warden3test"}]}, '
'{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "verylongideaidverylongideaidverylongideaidverylongideaidverylongideaid"}'
']',
'422 Multiple errors',
None
),
(
"/sendEvents?secret=abc",
'['
'{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201146"}, '
'{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201147"}, '
'{"Node": [{"Name": "cz.cesnet.warden3test"}], "ID": "120820201148"}'
']',
"200 OK",
['{"saved": 3}']
)
] ]
for query, payload, expected_status, expected_response in tests: for query, payload, expected_status, expected_response in tests:
with self.subTest(query=query, payload=payload, expected_status=expected_status, expected_response=expected_response): with self.subTest(query=query, payload=payload, expected_status=expected_status, expected_response=expected_response):
......
...@@ -66,6 +66,7 @@ CREATE TABLE IF NOT EXISTS `clients` ( ...@@ -66,6 +66,7 @@ CREATE TABLE IF NOT EXISTS `clients` (
CREATE TABLE IF NOT EXISTS `events` ( CREATE TABLE IF NOT EXISTS `events` (
`id` bigint UNSIGNED NOT NULL AUTO_INCREMENT, `id` bigint UNSIGNED NOT NULL AUTO_INCREMENT,
`idea_id` varchar(64) UNIQUE,
`received` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, `received` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`client_id` int UNSIGNED NOT NULL, `client_id` int UNSIGNED NOT NULL,
`data` longtext NOT NULL, `data` longtext NOT NULL,
......
...@@ -53,6 +53,7 @@ CREATE INDEX IF NOT EXISTS "clients_2" ON "clients" ("valid", "name"); ...@@ -53,6 +53,7 @@ CREATE INDEX IF NOT EXISTS "clients_2" ON "clients" ("valid", "name");
CREATE TABLE IF NOT EXISTS "events" ( CREATE TABLE IF NOT EXISTS "events" (
"id" bigint PRIMARY KEY GENERATED ALWAYS AS IDENTITY (MINVALUE 2), "id" bigint PRIMARY KEY GENERATED ALWAYS AS IDENTITY (MINVALUE 2),
"idea_id" text UNIQUE,
"received" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, "received" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
"client_id" int NOT NULL REFERENCES "clients" ("id"), "client_id" int NOT NULL REFERENCES "clients" ("id"),
"data" bytea NOT NULL, "data" bytea NOT NULL,
......
...@@ -47,92 +47,138 @@ from jsonschema import Draft4Validator ...@@ -47,92 +47,138 @@ from jsonschema import Draft4Validator
VERSION = "3.0-beta3" VERSION = "3.0-beta3"
class Encoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, Error):
return o.to_dict()
if isinstance(o, ErrorMessage):
out = o.other_args.copy()
out.pop("exc", None)
out["error"] = o.error
out["message"] = o.message
if o.events:
out["events"] = list(o.events)
return out
return str(o)
class ErrorMessage(Exception):
def __init__(self, error, message, events=None, unique_id=None, **kwargs):
super(Exception, self).__setattr__("error", error)
super(Exception, self).__setattr__("message", message)
super(Exception, self).__setattr__("unique_id", unique_id)
self.events = set() if events is None else set(events)
self.other_args = kwargs
def __repr__(self):
return "%s(error=%d, message=%s)" % (
type(self).__name__, self.error, repr(self.message)
)
def __str__(self):
if sys.version_info[0] < 3:
return self.str_err().encode('ascii', 'backslashereplace')
return self.str_err()
def str_err(self):
exc = self.other_args.get("exc", None)
if exc in (None, (None, None, None)):
exc_cause = ""
else:
exc_cause = " (cause was %s: %s)" % (exc[0].__name__, str(exc[1]))
return "Error(%s) %s%s" % (self.error, self.message, exc_cause)
def str_info(self):
arg_copy = self.other_args.copy()
arg_copy.pop("req_id", None)
arg_copy.pop("method", None)
arg_copy.pop("exc", None)
if arg_copy:
return "Detail: %s" % json.dumps(arg_copy, cls=Encoder)
return ""
def str_debug(self):
exc = self.other_args.get("exc", None)
if exc in (None, (None, None, None)):
return ""
exc_tb = exc[2]
if not exc_tb:
return ""
return "Traceback:\n" + "".join(format_tb(exc_tb))
def __getattr__(self, name):
if name in self.other_args:
return self.other_args[name]
raise AttributeError
def __setattr__(self, name, value):
if name in ("events", "exc", "other_args"):
super(Exception, self).__setattr__(name, value)
return
if name in ("error", "message", "unique_id"):
raise AttributeError("Cannot change the attribute %s" % name)
self.other_args[name] = value
class Error(Exception): class Error(Exception):
def __init__(self, method=None, req_id=None, errors=None, **kwargs): def __init__(self, method=None, req_id=None, errors=None, **kwargs):
self.method = method self.method = method
self.req_id = req_id self.req_id = req_id
self.errors = [kwargs] if kwargs else [] if "message" in kwargs:
kwargs.setdefault("error", 500)
self.errors = [ErrorMessage(**kwargs)]
else:
self.errors = []
if errors: if errors:
self.errors.extend(errors) self.errors.extend(errors)
def append(self, _events=None, **kwargs): def append(self, _events=None, **kwargs):
self.errors.append(kwargs) kwargs.setdefault("message", "No further information")
kwargs.setdefault("error", 500)
self.errors.append(ErrorMessage(**kwargs))
def get_http_err_msg(self): def get_http_err_msg(self):
try: try:
err = self.errors[0]["error"] err = self.errors[0].error
msg = self.errors[0]["message"].replace("\n", " ") msg = self.errors[0].message
except (IndexError, KeyError): except (IndexError, AttributeError):
err = 500 err = 500
msg = "There's NO self-destruction button! Ah, you've just found it..." msg = "There's NO self-destruction button! Ah, you've just found it..."
for e in self.errors: return err, msg
next_err = e.get("error", 500)
if err != next_err: if not all(msg == e.message for e in self.errors):
# errors not same, round to basic err code (400, 500) # messages not the same, get Multiple errors
# and use the highest one msg = "Multiple errors"
err = max(err//100, next_err//100)*100 if not all(err == e.error for e in self.errors):
next_msg = e.get("message", "Unknown error").replace("\n", " ") # errors not same, round to basic err code (400, 500)
if msg != next_msg: # and use the highest one
msg = "Multiple errors" err = max(e.error for e in self.errors) // 100 * 100
msg = "".join((c if '\x20' <= c != '\x7f' else r'\x{:02x}'.format(ord(c))) for c in msg) # escape control characters msg = "".join((c if '\x20' <= c != '\x7f' else r'\x{:02x}'.format(ord(c))) for c in msg) # escape control characters
return err, msg return err, msg
def __str__(self): def __str__(self):
return "\n".join(self.str_err(e) for e in self.errors) return "\n".join(str(e) for e in self.errors)
def log(self, logger, prio=logging.ERROR): def log(self, logger, prio=logging.ERROR):
for e in self.errors: for e in self.errors:
logger.log(prio, self.str_err(e)) logger.log(prio, e.str_err())
info = self.str_info(e) info = e.str_info()
if info: if info:
logger.info(info) logger.info(info)
debug = self.str_debug(e) debug = e.str_debug()
if debug: if debug:
logger.debug(debug) logger.debug(debug)
def str_err(self, e):
out = []
out.append("Error(%s) %s " % (e.get("error", 0), e.get("message", "Unknown error")))
if "exc" in e and e["exc"]:
out.append("(cause was %s: %s)" % (e["exc"][0].__name__, str(e["exc"][1])))
return "".join(out)
def str_info(self, e):
ecopy = dict(e) # shallow copy
ecopy.pop("req_id", None)
ecopy.pop("method", None)
ecopy.pop("error", None)
ecopy.pop("message", None)
ecopy.pop("exc", None)
if ecopy:
out = "Detail: %s" % (json.dumps(ecopy, default=lambda v: str(v)))
else:
out = ""
return out
def str_debug(self, e):
out = []
if not e.get("exc"):
return ""
exc_tb = e["exc"][2]
if exc_tb:
out.append("Traceback:\n")
out.extend(format_tb(exc_tb))
return "".join(out)
def to_dict(self): def to_dict(self):
errlist = []
for e in self.errors:
ecopy = dict(e)
ecopy.pop("exc", None)
errlist.append(ecopy)
d = { d = {
"method": self.method, "method": self.method,
"req_id": self.req_id, "req_id": self.req_id,
"errors": errlist "errors": self.errors
} }
return d return d
...@@ -460,20 +506,42 @@ class JSONSchemaValidator(NoValidator): ...@@ -460,20 +506,42 @@ class JSONSchemaValidator(NoValidator):
res = [] res = []
for error in sorted(self.validator.iter_errors(event), key=sortkey): for error in sorted(self.validator.iter_errors(event), key=sortkey):
res.append({ res.append(
"error": 460, ErrorMessage(
"message": "Validation error: key \"%s\", value \"%s\"" % ( 460, "Validation error: key \"%s\", value \"%s\"" % (
"/".join(str(v) for v in error.path), "/".join(map(str, error.path)),
error.instance error.instance
), ),
"expected": error.schema.get('description', 'no additional info') expected=error.schema.get('description', 'no additional info')
}) )
)
return res return res
class UnsafeQueryContext:
""" Context manager to be used within a transaction for partial rollbacks
Meant to be used as:
with self as db:
with self.unsafe_query_context(db):
res = db.query_one(...)
"""
def __init__(self, db, silence_exc=False):
self.db = db
self.silence_exc = silence_exc
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
return self.silence_exc and exc_type is not None \
and issubclass(exc_type, (self.db.db.IntegrityError, self.db.db.DataError))
class DataBase(ObjectBase): class DataBase(ObjectBase):
unsafe_query_context = UnsafeQueryContext
def __init__( def __init__(
self, req, log, host, user, password, dbname, port, retry_count, self, req, log, host, user, password, dbname, port, retry_count,
retry_pause, event_size_limit, catmap_filename, tagmap_filename): retry_pause, event_size_limit, catmap_filename, tagmap_filename):
...@@ -775,13 +843,24 @@ class DataBase(ObjectBase): ...@@ -775,13 +843,24 @@ class DataBase(ObjectBase):
def _build_store_events_tags(self, event_id, tag_ids): def _build_store_events_tags(self, event_id, tag_ids):
"""Build query and params for insertion of event-tags mapping""" """Build query and params for insertion of event-tags mapping"""
def store_events(self, client, events, events_raw): def store_events(self, client, events, events_raw, events_indexes):
try: try:
for attempt in self.repeat(): for attempt in self.repeat():
with attempt as db: with attempt as db:
for event, raw_event in zip(events, events_raw): errors = []
stored = 0
for event, raw_event, event_indx in zip(events, events_raw, events_indexes):
equery, eparams, eret = self._build_store_events_event(client, event, raw_event) equery, eparams, eret = self._build_store_events_event(client, event, raw_event)
lastid = db.query_one(equery, eparams, ret=eret)["id"] try:
with self.unsafe_query_context(db):
lastid = db.query_one(equery, eparams, ret=eret)["id"]
except self.db.IntegrityError:
exception = self.req.error(message="IDEA event with this ID already exists", error=400, exc=sys.exc_info(), env=self.req.env)
exception.log(self.log)
errors.append(ErrorMessage(409, "IDEA event with this ID already exists", events={event_indx}))
continue
stored += 1
catlist = event.get('Category', ["Other"]) catlist = event.get('Category', ["Other"])
cats = set(catlist) | {cat.split(".", 1)[0] for cat in catlist} cats = set(catlist) | {cat.split(".", 1)[0] for cat in catlist}
...@@ -796,11 +875,11 @@ class DataBase(ObjectBase): ...@@ -796,11 +875,11 @@ class DataBase(ObjectBase):
tquery, tparams, _ = self._build_store_events_tags(lastid, tag_ids) tquery, tparams, _ = self._build_store_events_tags(lastid, tag_ids)
db.execute(tquery, tparams) db.execute(tquery, tparams)
return [] return errors, stored
except Exception as e: except Exception as e:
exception = self.req.error(message="DB error", error=500, exc=sys.exc_info(), env=self.req.env) exception = self.req.error(message="DB error", error=500, exc=sys.exc_info(), env=self.req.env)
exception.log(self.log) exception.log(self.log)
return [{"error": 500, "message": "DB error %s" % type(e).__name__}] return [ErrorMessage(500, "DB error %s" % type(e).__name__)], 0
@abc.abstractmethod @abc.abstractmethod
def _build_insert_last_received_id(self, client, id): def _build_insert_last_received_id(self, client, id):
...@@ -1021,10 +1100,10 @@ class MySQL(DataBase): ...@@ -1021,10 +1100,10 @@ class MySQL(DataBase):
"""Build query and params for event insertion""" """Build query and params for event insertion"""
return ( return (
[ [
"INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s)", "INSERT INTO events (idea_id,received,client_id,data) VALUES (%s, NOW(), %s, %s)",
"SELECT LAST_INSERT_ID() AS id" "SELECT LAST_INSERT_ID() AS id"
], ],
[(client.id, raw_event), ()], [(event["ID"], client.id, raw_event), ()],
1 1
) )
...@@ -1143,8 +1222,26 @@ class MySQL(DataBase): ...@@ -1143,8 +1222,26 @@ class MySQL(DataBase):
) )
class PostgresUnsafeQueryContext(UnsafeQueryContext):
SAVEPOINT = 'context_savepoint'
def __enter__(self):
self.db.execute([self.db.ppgsql.SQL('SAVEPOINT "context_savepoint"')], [()])
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
if exc_type is not None:
self.db.execute([self.db.ppgsql.SQL('ROLLBACK TO SAVEPOINT "context_savepoint"')], [()])
return self.silence_exc and exc_type is not None \
and issubclass(exc_type, (self.db.db.IntegrityError, self.db.db.DataError))
class PostgreSQL(DataBase): class PostgreSQL(DataBase):
unsafe_query_context = PostgresUnsafeQueryContext
def __init__( def __init__(
self, req, log, host, user, password, dbname, port, retry_count, self, req, log, host, user, password, dbname, port, retry_count,
retry_pause, event_size_limit, catmap_filename, tagmap_filename): retry_pause, event_size_limit, catmap_filename, tagmap_filename):
...@@ -1324,8 +1421,8 @@ class PostgreSQL(DataBase): ...@@ -1324,8 +1421,8 @@ class PostgreSQL(DataBase):
def _build_store_events_event(self, client, event, raw_event): def _build_store_events_event(self, client, event, raw_event):
"""Build query and params for event insertion""" """Build query and params for event insertion"""
return ( return (
["INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s) RETURNING id"], ["INSERT INTO events (idea_id,received,client_id,data) VALUES (%s, NOW(), %s, %s) RETURNING id"],
[(client.id, self.db.Binary(raw_event.encode('utf8')))], [(event["ID"], client.id, self.db.Binary(raw_event.encode('utf8')))],
0 0
) )
...@@ -1527,7 +1624,7 @@ class Server(ObjectBase): ...@@ -1527,7 +1624,7 @@ class Server(ObjectBase):
if exception: if exception:
status = "%d %s" % exception.get_http_err_msg() status = "%d %s" % exception.get_http_err_msg()
output = json.dumps(exception.to_dict(), default=lambda v: str(v)) output = json.dumps(exception, cls=Encoder)
exception.log(self.log) exception.log(self.log)
# Make sure everything is properly encoded - JSON and various function # Make sure everything is properly encoded - JSON and various function
...@@ -1567,9 +1664,7 @@ def json_wrapper(method): ...@@ -1567,9 +1664,7 @@ def json_wrapper(method):
result = method(self, **args) # call requested method result = method(self, **args) # call requested method
try: try:
# 'default': takes care of non JSON serializable objects, output = json.dumps(result, cls=Encoder)
# which could (although shouldn't) appear in handler code
output = json.dumps(result, default=lambda v: str(v))
except Exception as e: except Exception as e:
raise self.req.error(message="Serialization error", error=500, exc=sys.exc_info(), args=str(result)) raise self.req.error(message="Serialization error", error=500, exc=sys.exc_info(), args=str(result))
...@@ -1684,28 +1779,44 @@ class WardenHandler(ObjectBase): ...@@ -1684,28 +1779,44 @@ class WardenHandler(ObjectBase):
return res return res
def check_node(self, event, name): def check_node(self, event, event_indx, name):
try: try:
ev_id = event['Node'][0]['Name'].lower() ev_id = event['Node'][0]['Name'].lower()
except (KeyError, TypeError, IndexError): except (KeyError, TypeError, IndexError):
# Event does not bear valid Node attribute # Event does not bear valid Node attribute
return [{"error": 422, "message": "Event does not bear valid Node attribute"}] return [
ErrorMessage(422, "Event does not bear valid Node attribute", {event_indx})
]
if ev_id != name: if ev_id != name:
return [{"error": 422, "message": "Node does not correspond with saving client"}] return [
ErrorMessage(422, "Node does not correspond with saving client", {event_indx})
]
return [] return []
def add_event_nums(self, ilist, events, errlist): def check_idea_id(self, event, event_indx):
for err in errlist: id_length_limit = 64
err.setdefault("events", []).extend(ilist) try:
ev_ids = err.setdefault("events_id", []) id_ = event["ID"]
for i in ilist: except (KeyError, TypeError, ValueError):
event = events[i] return [ErrorMessage(422, "Missing IDEA ID", {event_indx})]
try: if not isinstance(id_, unicode) or len(id_) == 0:
id = event["ID"] return [ErrorMessage(422, "The provided IDEA ID is invalid", {event_indx})]
except (KeyError, TypeError, ValueError):
id = None errors = []
ev_ids.append(id) if len(id_) > id_length_limit:
return errlist errors.append(
ErrorMessage(
422, "The provided event ID is too long",
{event_indx}, id_length_limit=id_length_limit
)
)
if '\x00' in id_:
errors.append(ErrorMessage(422, "IDEA ID cannot contain null bytes", {event_indx}))
return errors
def add_errors(self, errs_to_add):
for err in errs_to_add:
self.errs.setdefault((err.error, err.message, err.unique_id), err).events.update(err.events)
@expose(write=True) @expose(write=True)
@json_wrapper @json_wrapper
...@@ -1713,56 +1824,72 @@ class WardenHandler(ObjectBase): ...@@ -1713,56 +1824,72 @@ class WardenHandler(ObjectBase):
if not isinstance(events, list): if not isinstance(events, list):
raise self.req.error(message="List of events expected.", error=400) raise self.req.error(message="List of events expected.", error=400)
errs = [] self.errs = {}
if len(events) > self.send_events_limit: if len(events) > self.send_events_limit:
errs.extend(self.add_event_nums(range(self.send_events_limit, len(events)), events, [ self.add_errors(
{"error": 507, "message": "Too much events in one batch.", "send_events_limit": self.send_events_limit}])) [
ErrorMessage(
507, "Too many events in one batch.",
set(range(self.send_events_limit, len(events))),
send_events_limit=self.send_events_limit
)
]
)
saved = 0
events_tosend = [] events_tosend = []
events_raw = [] events_raw = []
events_nums = [] events_nums = []
for i, event in enumerate(events[0:self.send_events_limit]): for i, event in enumerate(events[0:self.send_events_limit]):
v_errs = self.validator.check(event) v_errs = self.validator.check(event)
if v_errs: if v_errs:
errs.extend(self.add_event_nums([i], events, v_errs)) self.add_errors(v_errs)
continue
idea_id_errs = self.check_idea_id(event, i)
if idea_id_errs:
self.add_errors(idea_id_errs)
continue continue
node_errs = self.check_node(event, self.req.client.name) node_errs = self.check_node(event, i, self.req.client.name)
if node_errs: if node_errs:
errs.extend(self.add_event_nums([i], events, node_errs)) self.add_errors(node_errs)
continue continue
if self.req.client.test and 'Test' not in event.get('Category', []): if self.req.client.test and 'Test' not in event.get('Category', []):
errs.extend( self.add_errors(
self.add_event_nums([i], events, [{ [
"error": 422, ErrorMessage(
"message": "You're allowed to send only messages, containing \"Test\" among categories.", 422, "You're allowed to send only messages containing \"Test\" among categories.", {i},
"categories": event.get('Category', [])}])) # Ensure that 1the error message is contained for every combination of categories
unique_id=tuple(event.get('Category', [])),
categories=event.get('Category', [])
)
]
)
continue continue
raw_event = json.dumps(event) raw_event = json.dumps(event)
if len(raw_event) >= self.db.event_size_limit: if len(raw_event) >= self.db.event_size_limit:
errs.extend( self.add_errors(
self.add_event_nums([i], events, [ [
{"error": 413, "message": "Event too long (>%i B)" % self.db.event_size_limit} ErrorMessage(
])) 413, "Event too long (>%i B)" % self.db.event_size_limit, {i},
event_size_limit = self.db.event_size_limit
)
]
)
continue continue
events_tosend.append(event) events_tosend.append(event)
events_raw.append(raw_event) events_raw.append(raw_event)
events_nums.append(i) events_nums.append(i)
db_errs = self.db.store_events(self.req.client, events_tosend, events_raw) db_errs, saved = self.db.store_events(self.req.client, events_tosend, events_raw, events_nums)
if db_errs: self.add_errors(db_errs)
errs.extend(self.add_event_nums(events_nums, events_tosend, db_errs))
saved = 0
else:
saved = len(events_tosend)
self.log.info("Saved %i events" % saved) self.log.info("Saved %i events" % saved)
if errs: if self.errs:
raise self.req.error(errors=errs) raise self.req.error(errors=self.errs.values())
return {"saved": saved} return {"saved": saved}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment