Skip to content
Snippets Groups Projects
Commit 654edef1 authored by Pavel Kácha's avatar Pavel Kácha
Browse files

sendEvents now returns dict of specific message errors instead of fatal Error

parent c5ad11e0
No related branches found
No related tags found
No related merge requests found
...@@ -423,34 +423,31 @@ class MySQL(Object): ...@@ -423,34 +423,31 @@ class MySQL(Object):
} }
def store_events(self, client, events): def store_event(self, client, event):
errs = [] # See sendEvents and validation, should return something similar try:
# logging.debug("INSERT INTO events (detected,received,service_id,data) VALUES ('%s', NOW(), '%s', '%s')" % (event['DetectTime'], client["service"]["service_id"], self.con.escape_string(str(event))))
for event in events: self.crs.execute("INSERT INTO events (detected,received,service_id,data) VALUES ('%s', NOW(), '%s', '%s')" % (event['DetectTime'], client["service"]["service_id"], self.con.escape_string(str(event))))
try: lastid = self.crs.lastrowid
# logging.debug("INSERT INTO events (detected,received,service_id,data) VALUES ('%s', NOW(), '%s', '%s')" % (event['DetectTime'], client["service"]["service_id"], self.con.escape_string(str(event)))) # logging.debug(str(lastid))
self.crs.execute("INSERT INTO events (detected,received,service_id,data) VALUES ('%s', NOW(), '%s', '%s')" % (event['DetectTime'], client["service"]["service_id"], self.con.escape_string(str(event)))) for cat in event['Category']:
lastid = self.crs.lastrowid # logging.debug({'cat': cat})
# logging.debug(str(lastid)) cat_id = self.map_id('Category', cat) if self.map_id('Category', cat) else self.map_id('Category', 'Other.Other')
for cat in event['Category']: # logging.debug({'cat_id': cat_id})
# logging.debug({'cat': cat}) # logging.debug("INSERT INTO event_category_mapping (event_id,category_id) VALUES ('%s', '%s')" % (str(lastid), str(cat_id)))
cat_id = self.map_id('Category', cat) if self.map_id('Category', cat) else self.map_id('Category', 'Other.Other') self.crs.execute("INSERT INTO event_category_mapping (event_id,category_id) VALUES ('%s', '%s')" % (str(lastid), str(cat_id)))
# logging.debug({'cat_id': cat_id})
# logging.debug("INSERT INTO event_category_mapping (event_id,category_id) VALUES ('%s', '%s')" % (str(lastid), str(cat_id))) for tag in event['Node'][0]['Tags']:
self.crs.execute("INSERT INTO event_category_mapping (event_id,category_id) VALUES ('%s', '%s')" % (str(lastid), str(cat_id))) tag_id = self.map_id('Tag', tag) if self.map_id('Tag', tag) else self.map_id('Tag', 'Other')
# logging.debug({'tag_id': tag_id})
for tag in event['Node'][0]['Tags']: # logging.debug("INSERT INTO event_tag_mapping (event_id,tag_id) VALUES ('%s', '%s')" % (str(lastid), tag_id))
tag_id = self.map_id('Tag', tag) if self.map_id('Tag', tag) else self.map_id('Tag', 'Other') self.crs.execute("INSERT INTO event_tag_mapping (event_id,tag_id) VALUES ('%s', '%s')" % (str(lastid), str(tag_id)))
# logging.debug({'tag_id': tag_id})
# logging.debug("INSERT INTO event_tag_mapping (event_id,tag_id) VALUES ('%s', '%s')" % (str(lastid), tag_id)) self.con.commit()
self.crs.execute("INSERT INTO event_tag_mapping (event_id,tag_id) VALUES ('%s', '%s')" % (str(lastid), str(tag_id))) return []
except Exception as e:
self.con.commit() self.con.rollback()
except Exception as e: return [{"event": event, "error": type(e).__name__ + ": " + str(e)}]
self.con.rollback()
errs.append({"event": event, "error": str(e)})
return errs
def insertLastReceivedId(self, client, id): def insertLastReceivedId(self, client, id):
logging.debug("INSERT INTO last_events(client_id, event_id, timestamp) VALUES(%s, %s, NOW())" % (str(client["id"]), id)) logging.debug("INSERT INTO last_events(client_id, event_id, timestamp) VALUES(%s, %s, NOW())" % (str(client["id"]), id))
...@@ -798,31 +795,30 @@ class WardenHandler(Object): ...@@ -798,31 +795,30 @@ class WardenHandler(Object):
raise Error("Too much events in one batch", 400, method="sendEvents", raise Error("Too much events in one batch", 400, method="sendEvents",
detail={"limit": self.send_events_limit}) detail={"limit": self.send_events_limit})
# FIXME: Maybe just croak on first bad event, save good ones so far saved = 0
# and make client deal with the rest? Would simplify server error errs = {}
# handling greatly. for i, event in enumerate(events):
okevents = [] ev_errs = []
valerrs = []
for event in events:
auth_cl = self.auth.authorize(_env, _client, 'sendEvents', event, None) auth_cl = self.auth.authorize(_env, _client, 'sendEvents', event, None)
if not auth_cl: if not auth_cl:
raise Error("I'm watching YOU. (Authorization)", 403, method='sendEvents', detail={"client": _client}) errs[i] = ["Client %i(%s) does not correspond with event Node info or is not allowed to write" % (_client["service"]["service_id"], _client["service"]["identity"])]
continue
verrs = self.validator.check(event) v_errs = self.validator.check(event)
if verrs: if v_errs:
valerrs.append({"errors": verrs, "event": event}) errs[i] = v_errs
else: continue
okevents.append(event)
dberrs = self.db.store_events(auth_cl, okevents) db_errs = self.db.store_event(auth_cl, event)
if db_errs:
errs[i] = db_errs
if valerrs or dberrs: saved += 1
raise Error("Event storage error", 500, method="sendEvents",
detail=valerrs+dberrs)
logging.info("sendEvents(...): Saved %i events" % len(okevents)) logging.info("sendEvents(...): Saved %i events" % saved)
return {"saved": len(okevents)} return errs
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment