Skip to content
Snippets Groups Projects
Commit 337ae742 authored by Rajmund Hruška's avatar Rajmund Hruška Committed by Rajmund Hruška
Browse files

Server: Uniquify sendEvents errors and append indices of events affected by integrity error

parent badbefaf
No related branches found
No related tags found
No related merge requests found
......@@ -9,6 +9,7 @@ from __future__ import print_function
import sys
import os
import io
import copy
from os import path
import logging
import logging.handlers
......@@ -755,19 +756,22 @@ class MySQL(ObjectBase):
"events": events
}
def store_events(self, client, events, events_raw):
integrity_errors = []
def store_events(self, client, events, events_raw, event_nums):
errors = []
error_indx = []
try:
for attempt in self.repeat():
with attempt as db:
for event, raw_event in zip(events, events_raw):
for event, raw_event, event_id in zip(events, events_raw, event_nums):
try:
try:
ideaid = event['ID']
except (KeyError, TypeError, ValueError):
exception = self.req.error(message="IdeaID is null", error=400, exc=sys.exc_info(), env=self.req.env)
exception.log(self.log)
return ([{"error": 400, "message": "IdeaID is not present"}], [])
errors.extend([{"error": 400, "message": "IdeaID is not present"}])
error_indx.append(event_id)
continue
lastid = db.query(
"INSERT INTO events (received,client_id,data,idea_id) VALUES (NOW(), %s, %s, %s)",
(client.id, raw_event, ideaid)).lastrowid
......@@ -789,9 +793,10 @@ class MySQL(ObjectBase):
except my.IntegrityError as e:
exception = self.req.error(message="Database integrity error", error=400, exc=sys.exc_info(), env=self.req.env)
exception.log(self.log)
integrity_errors.extend([{"error": 400, "message": "IdeaID is not unique", "idea_id" : ideaid}])
errors.extend([{"error": 400, "message": "IdeaID is not unique"}])
error_indx.append(event_id)
pass
return ([], integrity_errors)
return (errors, error_indx)
except Exception as e:
exception = self.req.error(message="DB error", error=500, exc=sys.exc_info(), env=self.req.env)
exception.log(self.log)
......@@ -1129,17 +1134,25 @@ class WardenHandler(ObjectBase):
return []
def add_event_nums(self, ilist, events, errlist):
for err in errlist:
err.setdefault("events", []).extend(ilist)
"""Uniquify the list of errors and append the lists of indices and ids of events affected by the error."""
if not ilist:
return errlist
unique_errors = [dict(s) for s in set(frozenset(d.items()) for d in errlist)]
for err in unique_errors:
original_error = copy.copy(err)
ev_indx = err.setdefault("events", [])
ev_ids = err.setdefault("events_id", [])
for i in ilist:
event = events[i]
for i in range(0, len(ilist)):
if errlist[i] == original_error:
event = events[ilist[i]]
try:
id = event["ID"]
except (KeyError, TypeError, ValueError):
id = None
ev_indx.append(ilist[i])
ev_ids.append(id)
return errlist
return unique_errors
@expose(write=1)
@json_wrapper
......@@ -1187,13 +1200,10 @@ class WardenHandler(ObjectBase):
events_raw.append(raw_event)
events_nums.append(i)
db_errs, integrity_errs = self.db.store_events(self.req.client, events_tosend, events_raw)
if integrity_errs:
errs.extend(integrity_errs)
saved = len(events_tosend) - len(integrity_errs)
elif db_errs:
errs.extend(self.add_event_nums(events_nums, events_tosend, db_errs))
saved = 0
db_errs, err_indx = self.db.store_events(self.req.client, events_tosend, events_raw, events_nums)
if db_errs:
saved = len(events_tosend) - len(errs) if err_indx else 0
errs.extend(self.add_event_nums(err_indx, events_tosend, db_errs))
else:
saved = len(events_tosend)
self.log.info("Saved %i events" % saved)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment