diff --git a/warden3/warden_server/warden_server.py b/warden3/warden_server/warden_server.py index d1701e572779f9b2b77503f3758b375d37a3ac97..e215fd98db17b95a797351173667ea47b5c38dbc 100755 --- a/warden3/warden_server/warden_server.py +++ b/warden3/warden_server/warden_server.py @@ -423,34 +423,31 @@ class MySQL(Object): } - def store_events(self, client, events): - errs = [] # See sendEvents and validation, should return something similar - - for event in events: - try: - # logging.debug("INSERT INTO events (detected,received,service_id,data) VALUES ('%s', NOW(), '%s', '%s')" % (event['DetectTime'], client["service"]["service_id"], self.con.escape_string(str(event)))) - self.crs.execute("INSERT INTO events (detected,received,service_id,data) VALUES ('%s', NOW(), '%s', '%s')" % (event['DetectTime'], client["service"]["service_id"], self.con.escape_string(str(event)))) - lastid = self.crs.lastrowid - # logging.debug(str(lastid)) - for cat in event['Category']: - # logging.debug({'cat': cat}) - cat_id = self.map_id('Category', cat) if self.map_id('Category', cat) else self.map_id('Category', 'Other.Other') - # logging.debug({'cat_id': cat_id}) - # logging.debug("INSERT INTO event_category_mapping (event_id,category_id) VALUES ('%s', '%s')" % (str(lastid), str(cat_id))) - self.crs.execute("INSERT INTO event_category_mapping (event_id,category_id) VALUES ('%s', '%s')" % (str(lastid), str(cat_id))) - - for tag in event['Node'][0]['Tags']: - tag_id = self.map_id('Tag', tag) if self.map_id('Tag', tag) else self.map_id('Tag', 'Other') - # logging.debug({'tag_id': tag_id}) - # logging.debug("INSERT INTO event_tag_mapping (event_id,tag_id) VALUES ('%s', '%s')" % (str(lastid), tag_id)) - self.crs.execute("INSERT INTO event_tag_mapping (event_id,tag_id) VALUES ('%s', '%s')" % (str(lastid), str(tag_id))) - - self.con.commit() - except Exception as e: - self.con.rollback() - errs.append({"event": event, "error": str(e)}) - - return errs + def store_event(self, client, event): + try: + # logging.debug("INSERT INTO events (detected,received,service_id,data) VALUES ('%s', NOW(), '%s', '%s')" % (event['DetectTime'], client["service"]["service_id"], self.con.escape_string(str(event)))) + self.crs.execute("INSERT INTO events (detected,received,service_id,data) VALUES ('%s', NOW(), '%s', '%s')" % (event['DetectTime'], client["service"]["service_id"], self.con.escape_string(str(event)))) + lastid = self.crs.lastrowid + # logging.debug(str(lastid)) + for cat in event['Category']: + # logging.debug({'cat': cat}) + cat_id = self.map_id('Category', cat) if self.map_id('Category', cat) else self.map_id('Category', 'Other.Other') + # logging.debug({'cat_id': cat_id}) + # logging.debug("INSERT INTO event_category_mapping (event_id,category_id) VALUES ('%s', '%s')" % (str(lastid), str(cat_id))) + self.crs.execute("INSERT INTO event_category_mapping (event_id,category_id) VALUES ('%s', '%s')" % (str(lastid), str(cat_id))) + + for tag in event['Node'][0]['Tags']: + tag_id = self.map_id('Tag', tag) if self.map_id('Tag', tag) else self.map_id('Tag', 'Other') + # logging.debug({'tag_id': tag_id}) + # logging.debug("INSERT INTO event_tag_mapping (event_id,tag_id) VALUES ('%s', '%s')" % (str(lastid), tag_id)) + self.crs.execute("INSERT INTO event_tag_mapping (event_id,tag_id) VALUES ('%s', '%s')" % (str(lastid), str(tag_id))) + + self.con.commit() + return [] + except Exception as e: + self.con.rollback() + return [{"event": event, "error": type(e).__name__ + ": " + str(e)}] + def insertLastReceivedId(self, client, id): logging.debug("INSERT INTO last_events(client_id, event_id, timestamp) VALUES(%s, %s, NOW())" % (str(client["id"]), id)) @@ -798,31 +795,30 @@ class WardenHandler(Object): raise Error("Too much events in one batch", 400, method="sendEvents", detail={"limit": self.send_events_limit}) - # FIXME: Maybe just croak on first bad event, save good ones so far - # and make client deal with the rest? Would simplify server error - # handling greatly. - okevents = [] - valerrs = [] - for event in events: + saved = 0 + errs = {} + for i, event in enumerate(events): + ev_errs = [] + auth_cl = self.auth.authorize(_env, _client, 'sendEvents', event, None) if not auth_cl: - raise Error("I'm watching YOU. (Authorization)", 403, method='sendEvents', detail={"client": _client}) + errs[i] = ["Client %i(%s) does not correspond with event Node info or is not allowed to write" % (_client["service"]["service_id"], _client["service"]["identity"])] + continue - verrs = self.validator.check(event) - if verrs: - valerrs.append({"errors": verrs, "event": event}) - else: - okevents.append(event) + v_errs = self.validator.check(event) + if v_errs: + errs[i] = v_errs + continue - dberrs = self.db.store_events(auth_cl, okevents) + db_errs = self.db.store_event(auth_cl, event) + if db_errs: + errs[i] = db_errs - if valerrs or dberrs: - raise Error("Event storage error", 500, method="sendEvents", - detail=valerrs+dberrs) + saved += 1 - logging.info("sendEvents(...): Saved %i events" % len(okevents)) + logging.info("sendEvents(...): Saved %i events" % saved) - return {"saved": len(okevents)} + return errs