From 1198228bf0c20ac74f6b89ac231b66a04f094ea6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pavel=20K=C3=A1cha?= <ph@cesnet.cz>
Date: Wed, 25 Feb 2015 13:46:32 +0100
Subject: [PATCH] * warden_server: Added "events_id" key into errors along with
 "events" - makes error messages bigger, but allows client operators to
 identify offending messages by stable identifiers based on logs. * Removed
 too much internal info from database errors. * Removed format type checking
 from Draft4Validator, using only explicit schema regexps - Draft4Validator
 raised FormatError istead of ValidationError, rendering iter_errors unusable.
 * warden_client: Implemented retries on server errors. * Client now honours
 send_events_limit from server - too long list of events is split and sent in
 chunks. * getInfo and sendEvents update local send_events_limit according to
 one sent by server in info or error message. * Logging is now explicit, not
 authomatic in Error class, allowing to log only single time at top level
 methods. * More sane default client name. * Errors from server are now
 checked and ensure correct format. * warden_filer: Ditched error and retry
 handling, warden_client now does for us.

---
 warden3/contrib/warden_filer/warden_filer.py | 115 ++++-----
 warden3/warden_client/warden_client.py       | 251 ++++++++++++++-----
 warden3/warden_server/warden_server.py       |  40 +--
 3 files changed, 255 insertions(+), 151 deletions(-)

diff --git a/warden3/contrib/warden_filer/warden_filer.py b/warden3/contrib/warden_filer/warden_filer.py
index fc8cae6..84b1c56 100644
--- a/warden3/contrib/warden_filer/warden_filer.py
+++ b/warden3/contrib/warden_filer/warden_filer.py
@@ -161,18 +161,17 @@ def receiver(config, wclient, sdir, oneshot):
                     nf.moveto(sdir.incoming)
                     count_ok += 1
                 except Exception as e:
-                    Error("Error saving event", wclient.logger, exc=sys.exc_info(),
-                          detail={"file": str(nf), "event_id": event.get("ID"), "sdir": sdir.path})
+                    Error(message="Error saving event", exc=sys.exc_info(), file=str(nf),
+                          event_ids=[event.get("ID")], sdir=sdir.path).log(wclient.logger)
                     count_err += 1
             wclient.logger.info(
                 "warden_filer: received %d, errors %d"
                 % (count_ok, count_err))
-            if oneshot:
-                events = None
-            else:
-                events = wclient.getEvents(**filt)
+            events = wclient.getEvents(**filt)
+            count_ok = count_err = 0
         if oneshot:
-            terminate_me(None, None)
+            if not events:
+                terminate_me(None, None)
         else:
             time.sleep(poll_time)
 
@@ -192,64 +191,49 @@ def sender(config, wclient, sdir, oneshot):
             nflist = sdir.get_incoming()
         # count chunk iterations rounded up
         count = len(nflist)
-        for i in range(0, count, send_events_limit):
-            # process one at most send_events_limit long chunk
-            events = []
-            nf_sent = []
-            for j in range(i, min(i+send_events_limit, count)):
-                nf = nflist[j]
-                # prepare event array from files
-                try:
-                    nf.moveto(sdir.temp)
-                except Exception:
-                    pass    # Silently go to next filename, somebody else might have interfered
-                try:
-                    with nf.open("rb") as fd:
-                        data = fd.read()
-                        event = json.loads(data)
-                        if node:
-                            nodelist = event.setdefault("Node", [])
-                            nodelist.insert(0, node)
-                        events.append(event)
-                        nf_sent.append(nf)
-                except Exception as e:
-                    Error("Error loading event", wclient.logger, exc=sys.exc_info(),
-                          detail={"file": str(nf), "sdir": sdir.path})
-                    nf.moveto(sdir.errors)
-
-            res = wclient.sendEvents(events)
-
-            count_ok = count_err = count_retry = 0
-            if isinstance(res, Error):
-                for e in res.errors:
-                    # list of events with this error (none means all events)
-                    evlist = e.get("events", xrange(len(nf_sent)))
-                    # error number
-                    error = e.get("error", 0)
-                    for idx in evlist:
-                        try:
-                            idx_n = int(idx)
-                            # 4xx errors are permanent, 5xx are server ones, suitable for retry
-                            dest_dir = sdir.errors if 400 <= error < 500 else sdir.incoming
-                        except ValueError:
-                            # Cannot grok event number, skip
-                            continue
-                        if nf_sent[idx_n]:
-                            nf_sent[idx_n].moveto(dest_dir)
-                            nf_sent[idx_n] = None
-                            if dest_dir == sdir.errors:
-                                count_err += 1
-                            else:
-                                count_retry += 1
-
-            # Cleanup rest - succesfully sent events
-            for name in nf_sent:
-                if name:
-                    name.remove()
-                    count_ok += 1
-            wclient.logger.info(
-                "warden_filer: saved %d, errors %d, retreated %d"
-                % (count_ok, count_err, count_retry))
+
+        events = []
+        nf_sent = []
+        for nf in nflist:
+            # prepare event array from files
+            try:
+                nf.moveto(sdir.temp)
+            except Exception:
+                continue    # Silently go to next filename, somebody else might have interfered
+            try:
+                with nf.open("rb") as fd:
+                    data = fd.read()
+                    event = json.loads(data)
+                    if node:
+                        nodelist = event.setdefault("Node", [])
+                        nodelist.insert(0, node)
+                    events.append(event)
+                    nf_sent.append(nf)
+            except Exception as e:
+                Error(message="Error loading event", exc=sys.exc_info(), file=str(nf),
+                      sdir=sdir.path).log(wclient.logger)
+                nf.moveto(sdir.errors)
+
+        res = wclient.sendEvents(events)
+
+        count_ok = count_err = count_retry = 0
+        if isinstance(res, Error):
+            for e in res.errors:
+                errno = e["error"]
+                evlist = e.get("events", xrange(len(nf_sent)))  # None means all
+                for i in evlist:
+                    if nf_sent[i]:
+                        nf_sent[i].moveto(dest_dir)
+                        nf_sent[i] = None
+                        count_err += 1
+
+        # Cleanup rest - succesfully sent events
+        for name in nf_sent:
+            if name:
+                name.remove()
+                count_ok += 1
+        wclient.logger.info(
+            "warden_filer: saved %d, errors %d" % (count_ok, count_err))
 
 
 
@@ -324,7 +308,6 @@ if __name__ == "__main__":
     wconfig, fconfig = get_configs()
 
     oneshot = args.oneshot
-
     safe_dir = SafeDir(fconfig.get("dir", args.func))
 
     wclient = Client(**wconfig)
diff --git a/warden3/warden_client/warden_client.py b/warden3/warden_client/warden_client.py
index a506d4c..c177682 100644
--- a/warden3/warden_client/warden_client.py
+++ b/warden3/warden_client/warden_client.py
@@ -11,8 +11,12 @@ from sys import stderr, exc_info
 from pprint import pformat
 from traceback import format_tb
 from os import path
+from time import sleep
+from operator import itemgetter
 
 
+VERSION = "3.0-not-even-alpha"
+
 class HTTPSConnection(httplib.HTTPSConnection):
     '''
     Overridden to allow peer certificate validation, configuration
@@ -56,16 +60,13 @@ class Error(Exception):
         Also, it can be raised as an exception.
     """
 
-    def __init__(self, logger=None, prio=logging.ERROR, method=None, req_id=None,
-            exc=None, errors=None, **kwargs):
+    def __init__(self, method=None, req_id=None, errors=None, **kwargs):
 
         self.errors = []
         if errors:
             self.extend(method, req_id, errors)
         if kwargs:
             self.append(method, req_id, **kwargs)
-        if logger:
-            log(logger, prio)
 
 
     def append(self, method=None, req_id=None, **kwargs):
@@ -76,12 +77,47 @@ class Error(Exception):
             kwargs["method"] = method
         if req_id and not "req_id" in kwargs:
             kwargs["req_id"] = req_id
+        # Ugly, but be paranoid, don't rely on server reply to be well formed
+        try:
+            kwargs["error"] = int(kwargs["error"])
+        except Exception:
+            kwargs["error"] = 0
+        if "events" in kwargs:
+            evlist = kwargs["events"]
+            try:
+                evlist_new = []
+                for ev in evlist:
+                    try:
+                        evlist_new.append(int(ev))
+                    except Exception:
+                        pass
+                kwargs["events"] = evlist_new
+            except Exception:
+                kwargs["events"] = []
+        if "events_id" in kwargs:
+            try:
+                dummy = iter(kwargs["events_id"])
+            except TypeError:
+                kwargs["events_id"] = [None]*len(kwargs["events"])
+        if "send_events_limit" in kwargs:
+            try:
+                kwargs["send_events_limit"] = int(kwargs["send_events_limit"])
+            except Exception:
+                del kwargs["send_events_limit"]
         self.errors.append(kwargs)
 
 
     def extend(self, method=None, req_id=None, iterable=[]):
+        try:
+            dummy = iter(iterable)
+        except TypeError:
+            iterable = []       # Bad joke from server
         for e in iterable:
-            self.append(method, req_id, **e)
+            try:
+                args = dict(e)
+            except TypeError:
+                args = {}       # Not funny!
+            self.append(method, req_id, **args)
 
 
     def __len__ (self):
@@ -105,10 +141,16 @@ class Error(Exception):
 
 
     def __str__(self):
-        return "\n".join(self.str_err(e) for e in self.errors)
+        out = []
+        for e in self.errors:
+            out.append(self.str_err(e))
+            out.append(self.str_info(e))
+        return "\n".join(out)
 
 
-    def log(self, logger, prio=logging.ERROR):
+    def log(self, logger=None, prio=logging.ERROR):
+        if not logger:
+            logger = logging.getLogger()
         for e in self.errors:
             logger.log(prio, self.str_err(e))
             info = self.str_info(e)
@@ -167,12 +209,15 @@ class Client(object):
             keyfile=None,
             cafile=None,
             timeout=60,
+            retry=3,
+            pause=5,
             recv_events_limit=6000,
-            errlog={"level": "debug"},
+            send_events_limit=500,
+            errlog={},
             syslog=None,
             filelog=None,
             idstore=None,
-            name="warden_client",
+            name="org.example.warden.test",
             secret=None):
 
         self.name = name
@@ -193,9 +238,15 @@ class Client(object):
         self.recv_events_limit = int(recv_events_limit)
         self.idstore = path.join(base, idstore) if idstore is not None else None
 
+        self.send_events_limit = int(send_events_limit)
+        self.retry = int(retry)
+        self.pause = int(pause)
+
         self.ciphers = 'TLS_RSA_WITH_AES_256_CBC_SHA'
         self.sslversion = ssl.PROTOCOL_TLSv1
 
+        self.getInfo()  # Call to align limits with server opinion
+
 
     def init_log(self, errlog, syslog, filelog):
 
@@ -237,7 +288,7 @@ class Client(object):
                 fl.setFormatter(format_time)
                 self.logger.addHandler(fl)
             except Exception as e:
-                Error(self.logger, message="Unable to setup file logging", exc=exc_info())
+                Error(message="Unable to setup file logging", exc=exc_info()).log(self.logger)
 
         if syslog is not None:
             try:
@@ -248,7 +299,7 @@ class Client(object):
                 sl.setFormatter(format_notime)
                 self.logger.addHandler(sl)
             except Exception as e:
-                Error(self.logger, message="Unable to setup syslog logging", exc=exc_info())
+                Error(message="Unable to setup syslog logging", exc=exc_info()).log(self.logger)
 
         if not (errlog or filelog or syslog):
             # User wants explicitly no logging, so let him shoot his socks off.
@@ -257,6 +308,12 @@ class Client(object):
             self.logger.addHandler(logging.NullHandler())
 
 
+    def log_err(self, err, prio=logging.ERROR):
+        if isinstance(err, Error):
+            err.log(self.logger, prio)
+        return err
+
+
     def connect(self):
 
         try:
@@ -276,18 +333,17 @@ class Client(object):
                     strict = False,
                     timeout = self.timeout)
             else:
-                return Error(self.logger, message="Don't know how to connect to \"%s\"" % self.url.scheme,
-                        detail={"url": self.url.geturl()})
+                return Error(message="Don't know how to connect to \"%s\"" % self.url.scheme,
+                        url=self.url.geturl())
         except Exception:
-            return Error(self.logger, message="HTTPS connection failed", exc=exc_info(),
-                detail={
-                    "url": self.url.geturl(),
-                    "timeout": self.timeout,
-                    "key_file": self.keyfile,
-                    "cert_file": self.certfile,
-                    "cafile": self.cafile,
-                    "ciphers": self.ciphers,
-                    "ssl_version": self.sslversion})
+            return Error(message="HTTP(S) connection failed", exc=exc_info(),
+                    url=self.url.geturl(),
+                    timeout=self.timeout,
+                    key_file=self.keyfile,
+                    cert_file=self.certfile,
+                    cafile=self.cafile,
+                    ciphers=self.ciphers,
+                    ssl_version=self.sslversion)
 
         return conn
 
@@ -313,8 +369,8 @@ class Client(object):
             else:
                 data = json.dumps(payload)
         except:
-            return Error(self.logger, message="Serialization to JSON failed",
-                exc=exc_info(), method=func, detail=payload)
+            return Error(message="Serialization to JSON failed",
+                exc=exc_info(), method=func, payload=payload)
 
         self.headers = {
             "Content-Type": "application/json",
@@ -332,29 +388,22 @@ class Client(object):
             conn.request("POST", loc, data, self.headers)
         except:
             conn.close()
-            return Error(self.logger, message="Sending of request to server failed",
-                exc=exc_info(), method=func, detail={
-                    "loc": loc,
-                    "headers": self.headers,
-                    "data": data})
+            return Error(message="Sending of request to server failed",
+                exc=exc_info(), method=func, log=loc, headers=self.headers, data=data)
 
         try:
             res = conn.getresponse()
         except:
             conn.close()
-            return Error(self.logger, method=func, message="HTTP reply failed", exc=exc_info(), detail={
-                "loc": loc,
-                "headers": self.headers,
-                "data": data})
+            return Error(method=func, message="HTTP reply failed",
+                exc=exc_info(), loc=loc, headers=self.headers, data=data)
 
         try:
             response_data = res.read()
         except:
             conn.close()
-            return Error(self.logger, method=func, message="Fetching HTTP data from server failed", exc=exc_info(), detail={
-                "loc": loc,
-                "headers": self.headers,
-                "data": data})
+            return Error(method=func, message="Fetching HTTP data from server failed",
+                exc=exc_info(), loc=loc, headers=self.headers, data=data)
 
         conn.close()
 
@@ -362,20 +411,17 @@ class Client(object):
             try:
                 data = json.loads(response_data)
             except:
-                data = Error(self.logger, message="JSON message parsing failed",
-                    exc=exc_info(), method=func, detail={"response": response_data})
+                data = Error(method=func, message="JSON message parsing failed",
+                    exc=exc_info(), response=response_data)
         else:
             try:
                 data = json.loads(response_data)
                 data["errors"]   # trigger exception if not dict or no error key
             except:
-                data = Error(self.logger, message="Generic server HTTP error",
-                    method=func,
-                    error=res.status,
-                    exc=exc_info(),
-                    detail={"response": response_data})
+                data = Error(method=func, message="Generic server HTTP error",
+                    error=res.status, exc=exc_info(), response=response_data)
             else:
-                data = Error(self.logger,
+                data = Error(
                     method=data.get("method", None),
                     req_id=data.get("req_id", None),
                     errors=data.get("errors", []))
@@ -392,8 +438,8 @@ class Client(object):
                 f.write(str(id))
         except (ValueError, IOError) as e:
             # Use Error instance just for proper logging
-            Error(self.logger, message="Writing id file \"%s\" failed" % idf,
-                prio=logging.INFO, exc=exc_info(), detail={"idstore": idf})
+            Error(message="Writing id file \"%s\" failed" % idf, exc=exc_info(),
+                  idstore=idf).log(self.logger, logging.INFO)
         return id
 
 
@@ -405,25 +451,96 @@ class Client(object):
             with open(idf, "r") as f:
                 id = int(f.read())
         except (ValueError, IOError) as e:
-            Error(self.logger, prio=logging.INFO,
-                message="Reading id file \"%s\" failed, relying on server" % idf,
-                exc=exc_info(), detail={"idstore": idf})
+            Error(message="Reading id file \"%s\" failed, relying on server" % idf,
+                  exc=exc_info(), idstore=idf).log(self.logger, logging.INFO)
             id = None
         return id
 
 
     def getDebug(self):
-        return self.sendRequest("getDebug")
+        return self.log_err(self.sendRequest("getDebug"))
 
 
     def getInfo(self):
-        return self.sendRequest("getInfo")
+        res = self.sendRequest("getInfo")
+        if isinstance(res, Error):
+            res.log(self.logger)
+        else:
+            try:
+                self.send_events_limit = min(res["send_events_limit"], self.send_events_limit)
+                self.recv_events_limit = min(res["recv_events_limit"], self.recv_events_limit)
+            except (AttributeError, TypeError, KeyError):
+                pass
+        return res
 
 
-    def sendEvents(self, events=[]):
-        res = self.sendRequest(
-            "sendEvents", payload=events)
-        return res
+    def send_events_raw(self, events=[]):
+        return self.sendRequest("sendEvents", payload=events)
+
+
+    def send_events_chunked(self, events=[]):
+        """ Split potentially long "events" list to send_events_limit
+            long chunks to avoid slap from server.
+        """
+        count = len(events)
+        err = Error()
+        send_events_limit = self.send_events_limit  # object stored value can change during sending
+        for offset in range(0, count, send_events_limit):
+            res = self.send_events_raw(events[offset:min(offset+send_events_limit, count)])
+
+            if isinstance(res, Error):
+                # Shift all error indices by offset to correspond with 'events' list
+                for e in res.errors:
+                    evlist = e.get("events", [])
+                    # Update sending limit advice, if present in error
+                    srv_limit = e.get("send_events_limit")
+                    if srv_limit:
+                        self.send_events_limit = min(self.send_events_limit, srv_limit)
+                    for i in range(len(evlist)):
+                        evlist[i] += offset
+                err.errors.extend(res.errors)
+
+        return err if err.errors else {}
+
+
+    def sendEvents(self, events=[], retry=None, pause=None):
+        """ Send out "events" list to server, retrying on server errors.
+        """
+        ev = events
+        idx_xlat = range(len(ev))
+        err = Error()
+        retry = retry or self.retry
+        attempt = retry
+        while ev and attempt:
+            if attempt<retry:
+                self.logger.info("%d transient errors, retrying (%d to go)" % (len(ev), attempt))
+                sleep(pause or self.pause)
+            res = self.send_events_chunked(ev)
+            attempt -= 1
+
+            next_ev = []
+            next_idx_xlat = []
+            if isinstance(res, Error):
+                # Sort to process fatal errors first
+                res.errors.sort(key=itemgetter("error"))
+                for e in res.errors:
+                    errno = e["error"]
+                    evlist = e.get("events", xrange(len(ev)))   # none means all
+                    if errno < 500 or not attempt:
+                        # Fatal error or last try, translate indices
+                        # to original and prepare for returning to caller
+                        for i in range(len(evlist)):
+                            evlist[i] = idx_xlat[evlist[i]]
+                        err.errors.append(e)
+                    else:
+                        # Maybe transient error, prepare to try again
+                        for evlist_i in evlist:
+                            next_ev.append(ev[evlist_i])
+                            next_idx_xlat.append(idx_xlat[evlist_i])
+            ev = next_ev
+            idx_xlat = next_idx_xlat
+
+        return self.log_err(err) if err.errors else {}
 
 
     def getEvents(self, id=None, idstore=None, count=None,
@@ -437,19 +554,19 @@ class Client(object):
         res = self.sendRequest(
             "getEvents", id=id, count=count or self.recv_events_limit, cat=cat,
             nocat=nocat, tag=tag, notag=notag, group=group, nogroup=nogroup)
-        if not res:
-            return res  # Should be Error instance
-
-        try:
-            events = res["events"]
-            newid = res["lastid"]
-        except KeyError:
-            return Error(self.logger, message="Server returned bogus reply",
-                method="getEvents", exc=exc_info(), detail={"response": res})
 
-        self._saveID(newid)
+        if res:
+            try:
+                events = res["events"]
+                newid = res["lastid"]
+            except KeyError:
+                events = Error(method="getEvents", message="Server returned bogus reply",
+                    exc=exc_info(), response=res)
+            self._saveID(newid)
+        else:
+            events = res
 
-        return events
+        return self.log_err(events)
 
 
     def close(self):
diff --git a/warden3/warden_server/warden_server.py b/warden3/warden_server/warden_server.py
index 3752e57..141be75 100755
--- a/warden3/warden_server/warden_server.py
+++ b/warden3/warden_server/warden_server.py
@@ -26,7 +26,7 @@ from random import randint
 # for local version of up to date jsonschema
 sys.path.append(path.join(path.dirname(__file__), "..", "lib"))
 
-from jsonschema import Draft4Validator, FormatChecker
+from jsonschema import Draft4Validator
 
 
 VERSION = "3.0-not-even-alpha"
@@ -41,7 +41,7 @@ class Error(Exception):
             self.errors.extend(errors)
 
 
-    def append(self, **kwargs):
+    def append(self, _events=None, **kwargs):
         self.errors.append(kwargs)
 
 
@@ -61,7 +61,6 @@ class Error(Exception):
             next_msg = e.get("message", "Unknown error")
             if msg != next_msg:
                 msg = "Multiple errors"
-        logging.debug("get_http_err_msg: %s, msg: %s, %s" % (err, msg, type(msg)))
         return err, msg
 
 
@@ -268,8 +267,8 @@ class Request(Object):
             self.req_id = 0 if env is None else randint(0x00000000, 0xFFFFFFFF)
 
 
-    def error(self, errors=None, **kwargs):
-        return Error(self.path, self.req_id, errors=errors, **kwargs)
+    def error(self, **kwargs):
+        return Error(self.path, self.req_id, **kwargs)
 
 
 
@@ -397,7 +396,7 @@ class JSONSchemaValidator(NoValidator):
         self.path = filename or path.join(path.dirname(__file__), "idea.schema")
         with open(self.path) as f:
             self.schema = json.load(f)
-        self.validator = Draft4Validator(self.schema, format_checker=FormatChecker())
+        self.validator = Draft4Validator(self.schema)
 
 
     def __str__(self):
@@ -637,7 +636,7 @@ class MySQL(ObjectReq):
             return []
         except Exception as e:
             self.con.rollback()
-            return [{"error": 500, "message": type(e).__name__ + ": " + str(e)}]
+            return [{"error": 500, "message": type(e).__name__}]
 
 
     def insertLastReceivedId(self, client, id):
@@ -779,7 +778,6 @@ class Server(ObjectReq):
         if isinstance(output, unicode):
             output = output.encode("utf-8")
         headers.append(('Content-Length', str(len(output))))
-        logging.debug("wsgi status %s, headers %s" % (status, headers))
         start_response(status, headers)
         self.req.reset()
         return [output]
@@ -899,9 +897,14 @@ class WardenHandler(ObjectReq):
         return []
 
 
-    def add_event_num(self, errlist, i):
+    def add_event_nums(self, ilist, events, errlist):
         for err in errlist:
-            err.setdefault("events", []).append(i)
+            err.setdefault("events", []).extend(ilist)
+            ev_ids = err.setdefault("events_id", [])
+            for i in ilist:
+                event = events[i]
+                id = event.get("ID", None)
+                ev_ids.append(id)
         return errlist
 
 
@@ -912,31 +915,32 @@ class WardenHandler(ObjectReq):
 
         errs = []
         if len(events)>self.send_events_limit:
-            errs.append({"error": 413, "message": "Too much events in one batch.",
-                "events": range(self.send_events_limit, len(events)),
-                "send_events_limit": self.send_events_limit})
+            errs.extend(
+                self.add_event_nums(range(self.send_events_limit, len(events)), events,
+                    [{"error": 507, "message": "Too much events in one batch.",
+                      "send_events_limit": self.send_events_limit}]))
 
         saved = 0
         for i, event in enumerate(events[0:self.send_events_limit]):
             v_errs = self.validator.check(event)
             if v_errs:
-                errs.extend(self.add_event_num(v_errs, i))
+                errs.extend(self.add_event_nums([i], events, v_errs))
                 continue
 
             node_errs = self.check_node(event, self.req.client.identity)
             if node_errs:
-                errs.extend(self.add_event_num(node_errs, i))
+                errs.extend(self.add_event_nums([i], events, node_errs))
                 continue
 
             if self.req.client.test and not 'Test' in event.get('Category', []):
-                errs.append({"error": 422, "events": [i],
+                errs.extend(self.add_event_nums([i], events, [{"error": 422,
                     "message": "You're allowed to send only messages, containing \"Test\" among categories.",
-                    "categories": event.get('Category', [])})
+                    "categories": event.get('Category', [])}]))
                 continue
 
             db_errs = self.db.store_event(self.req.client, event)
             if db_errs:
-                errs.extend(self.add_event_num(db_errs, i))
+                errs.extend(self.add_event_nums([i], events, db_errs))
                 continue
 
             saved += 1
-- 
GitLab