diff --git a/warden3/contrib/warden_filer/warden_filer.py b/warden3/contrib/warden_filer/warden_filer.py index 62b4852592554aee8378603c4a561cbdaa1ee567..fc8cae6e358646ce86eee8852f99cd032e7dbe34 100644 --- a/warden3/contrib/warden_filer/warden_filer.py +++ b/warden3/contrib/warden_filer/warden_filer.py @@ -221,26 +221,27 @@ def sender(config, wclient, sdir, oneshot): count_ok = count_err = count_retry = 0 if isinstance(res, Error): - try: - errs = res.detail["errors"] - except (KeyError, AttributeError, TypeError): - errs = None - if errs: - # Event errors - move bad events into "errors" - for e in errs.iterkeys(): + for e in res.errors: + # list of events with this error (none means all events) + evlist = e.get("events", xrange(len(nf_sent))) + # error number + error = e.get("error", 0) + for idx in evlist: try: - idx = int(e) + idx_n = int(idx) + # 4xx errors are permanent, 5xx are server ones, suitable for retry + dest_dir = sdir.errors if 400 <= error < 500 else sdir.incoming except ValueError: + # Cannot grok event number, skip continue - nf_sent[idx].moveto(sdir.errors) - nf_sent[idx] = None - count_err += 1 - else: - # Global errors - move all events back to "incoming" for attempt in next round - for idx in range(len(nf_sent)): - nf_sent[idx].moveto(sdir.incoming) - nf_sent[idx] = None - count_retry += 1 + if nf_sent[idx_n]: + nf_sent[idx_n].moveto(dest_dir) + nf_sent[idx_n] = None + if dest_dir == sdir.errors: + count_err += 1 + else: + count_retry += 1 + # Cleanup rest - succesfully sent events for name in nf_sent: if name: diff --git a/warden3/warden_client/warden_client.py b/warden3/warden_client/warden_client.py index 9f13ab7c8501f3eedb62f93344e31061bcd0f724..a506d4c5c7f99e28c609d5d05e0c1ddaf9eee234 100644 --- a/warden3/warden_client/warden_client.py +++ b/warden3/warden_client/warden_client.py @@ -56,24 +56,32 @@ class Error(Exception): Also, it can be raised as an exception. """ - def __init__(self, message, logger=None, error=None, prio="error", method=None, - req_id=None, detail=None, exc=None): - - self.message = message - self.error = error - self.method = method - self.req_id = req_id - self.detail = detail - (self.exctype, self.excval, self.exctb) = exc or (None, None, None) - self.cause = self.excval # compatibility with other exceptions + def __init__(self, logger=None, prio=logging.ERROR, method=None, req_id=None, + exc=None, errors=None, **kwargs): + + self.errors = [] + if errors: + self.extend(method, req_id, errors) + if kwargs: + self.append(method, req_id, **kwargs) if logger: - getattr(logger, prio, "error")(str(self)) - info = self.info_str() - if info: - logger.info(info) - debug = self.debug_str() - if debug: - logger.debug(debug) + log(logger, prio) + + + def append(self, method=None, req_id=None, **kwargs): + # We shift method and req_id into each and every error, because + # we want to be able to simply merge more Error arrays (for + # returning errors from more Warden calls at once + if method and not "method" in kwargs: + kwargs["method"] = method + if req_id and not "req_id" in kwargs: + kwargs["req_id"] = req_id + self.errors.append(kwargs) + + + def extend(self, method=None, req_id=None, iterable=[]): + for e in iterable: + self.append(method, req_id, **e) def __len__ (self): @@ -97,29 +105,56 @@ class Error(Exception): def __str__(self): + return "\n".join(self.str_err(e) for e in self.errors) + + + def log(self, logger, prio=logging.ERROR): + for e in self.errors: + logger.log(prio, self.str_err(e)) + info = self.str_info(e) + if info: + logger.info(info) + debug = self.str_debug(e) + if debug: + logger.debug(debug) + + + def str_preamble(self, e): + return "%08x/%s" % (e.get("req_id", 0), e.get("method", "?")) + + + def str_err(self, e): out = [] - out.append("(%s)" % (self.error or "local")) - if self.method is not None: - out.append(" in %s" % self.method) - if self.req_id is not None: - out.append("(%08x)" % self.req_id) - if self.message is not None: - out.append(": %s" % self.message) - if self.excval is not None: - out.append(" - cause was %s: %s" % (type(self.excval).__name__, str(self.excval))) + out.append(self.str_preamble(e)) + out.append(" Error(%s) %s " % (e.get("error", 0), e.get("message", "Unknown error"))) + if "exc" in e and e["exc"]: + out.append("(cause was %s: %s)" % (e["exc"][0].__name__, str(e["exc"][1]))) return "".join(out) - def info_str(self): - return ("Detail: %s" % pformat(self.detail)) or "" + def str_info(self, e): + ecopy = dict(e) # shallow copy + ecopy.pop("req_id", None) + ecopy.pop("method", None) + ecopy.pop("error", None) + ecopy.pop("message", None) + ecopy.pop("exc", None) + if ecopy: + out = "%s Detail: %s" % (self.str_preamble(e), json.dumps(ecopy, default=lambda v: str(v))) + else: + out = "" + return out - def debug_str(self): + def str_debug(self, e): out = [] - if self.excval is not None: - out.append("Exception %s: %s\n" % (type(self.excval).__name__, str(self.excval))) - if self.exctb is not None: - out.append("Traceback:\n%s" % "".join(format_tb(self.exctb))) + out.append(self.str_preamble(e)) + if not "exc" in e or not e["exc"]: + return "" + exc_tb = e["exc"][2] + if exc_tb: + out.append("Traceback:\n") + out.extend(format_tb(exc_tb)) return "".join(out) @@ -178,7 +213,7 @@ class Client(object): self.logger.warning("Unknown syslog facility \"%s\", using \"local7\"" % fac) return logging.handlers.SysLogHandler.LOG_LOCAL7 - form = "%(filename)s[%(process)d]: (%(levelname)s) %(name)s %(message)s" + form = "%(filename)s[%(process)d]: %(name)s (%(levelname)s) %(message)s" format_notime = logging.Formatter(form) format_time = logging.Formatter('%(asctime)s ' + form) @@ -189,7 +224,7 @@ class Client(object): if errlog is not None: el = logging.StreamHandler(stderr) el.setFormatter(format_time) - el.setLevel(loglevel(errlog.get("level", "debug"))) + el.setLevel(loglevel(errlog.get("level", "info"))) self.logger.addHandler(el) if filelog is not None: @@ -198,22 +233,22 @@ class Client(object): filename=path.join( path.dirname(__file__), filelog.get("file", "%s.log" % self.name))) - fl.setLevel(loglevel(filelog.get("level", "warning"))) + fl.setLevel(loglevel(filelog.get("level", "debug"))) fl.setFormatter(format_time) self.logger.addHandler(fl) except Exception as e: - Error("Unable to setup file logging", self.logger, exc=exc_info()) + Error(self.logger, message="Unable to setup file logging", exc=exc_info()) if syslog is not None: try: sl = logging.handlers.SysLogHandler( address=syslog.get("socket", "/dev/log"), facility=facility(syslog.get("facility", "local7"))) - sl.setLevel(loglevel(syslog.get("level", "warning"))) + sl.setLevel(loglevel(syslog.get("level", "debug"))) sl.setFormatter(format_notime) self.logger.addHandler(sl) except Exception as e: - Error("Unable to setup syslog logging", self.logger, exc=exc_info()) + Error(self.logger, message="Unable to setup syslog logging", exc=exc_info()) if not (errlog or filelog or syslog): # User wants explicitly no logging, so let him shoot his socks off. @@ -241,10 +276,10 @@ class Client(object): strict = False, timeout = self.timeout) else: - return Error("Don't know how to connect to \"%s\"" % self.url.scheme, self.logger, + return Error(self.logger, message="Don't know how to connect to \"%s\"" % self.url.scheme, detail={"url": self.url.geturl()}) except Exception: - return Error("HTTPS connection failed", self.logger, exc=exc_info(), + return Error(self.logger, message="HTTPS connection failed", exc=exc_info(), detail={ "url": self.url.geturl(), "timeout": self.timeout, @@ -278,7 +313,7 @@ class Client(object): else: data = json.dumps(payload) except: - return Error("Serialization to JSON failed", self.logger, + return Error(self.logger, message="Serialization to JSON failed", exc=exc_info(), method=func, detail=payload) self.headers = { @@ -297,7 +332,7 @@ class Client(object): conn.request("POST", loc, data, self.headers) except: conn.close() - return Error("Sending of request to server failed", self.logger, + return Error(self.logger, message="Sending of request to server failed", exc=exc_info(), method=func, detail={ "loc": loc, "headers": self.headers, @@ -307,7 +342,7 @@ class Client(object): res = conn.getresponse() except: conn.close() - return Error("HTTP reply failed", self.logger, method=func, exc=exc_info(), detail={ + return Error(self.logger, method=func, message="HTTP reply failed", exc=exc_info(), detail={ "loc": loc, "headers": self.headers, "data": data}) @@ -316,7 +351,7 @@ class Client(object): response_data = res.read() except: conn.close() - return Error("Fetching HTTP data from server failed", self.logger, method=func, exc=exc_info(), detail={ + return Error(self.logger, method=func, message="Fetching HTTP data from server failed", exc=exc_info(), detail={ "loc": loc, "headers": self.headers, "data": data}) @@ -327,24 +362,23 @@ class Client(object): try: data = json.loads(response_data) except: - data = Error("JSON message parsing failed", self.logger, + data = Error(self.logger, message="JSON message parsing failed", exc=exc_info(), method=func, detail={"response": response_data}) else: try: data = json.loads(response_data) - data["error"] # trigger exception if not dict or no error key + data["errors"] # trigger exception if not dict or no error key except: - data = Error("Generic server HTTP error", self.logger, + data = Error(self.logger, message="Generic server HTTP error", method=func, error=res.status, exc=exc_info(), detail={"response": response_data}) else: - data = Error(data.get("message", None), self.logger, + data = Error(self.logger, method=data.get("method", None), - error=res.status, req_id=data.get("req_id", None), - detail=data.get("detail", None)) + errors=data.get("errors", [])) return data @@ -358,8 +392,8 @@ class Client(object): f.write(str(id)) except (ValueError, IOError) as e: # Use Error instance just for proper logging - Error("Writing id file \"%s\" failed" % idf, self.logger, - prio="info", exc=exc_info(), detail={"idstore": idf}) + Error(self.logger, message="Writing id file \"%s\" failed" % idf, + prio=logging.INFO, exc=exc_info(), detail={"idstore": idf}) return id @@ -371,8 +405,9 @@ class Client(object): with open(idf, "r") as f: id = int(f.read()) except (ValueError, IOError) as e: - Error("Reading id file \"%s\" failed, relying on server" % idf, - self.logger, prio="info", exc=exc_info(), detail={"idstore": idf}) + Error(self.logger, prio=logging.INFO, + message="Reading id file \"%s\" failed, relying on server" % idf, + exc=exc_info(), detail={"idstore": idf}) id = None return id @@ -409,7 +444,7 @@ class Client(object): events = res["events"] newid = res["lastid"] except KeyError: - return Error("Server returned bogus reply", self.logger, + return Error(self.logger, message="Server returned bogus reply", method="getEvents", exc=exc_info(), detail={"response": res}) self._saveID(newid) diff --git a/warden3/warden_server/warden_server.py b/warden3/warden_server/warden_server.py index cae6178ae65029a6a2107e85f9ca509b3ff3a3b9..3752e576aaf79c83ba8c95b044150f3e3161957f 100755 --- a/warden3/warden_server/warden_server.py +++ b/warden3/warden_server/warden_server.py @@ -16,7 +16,7 @@ import MySQLdb as my import MySQLdb.cursors as mycursors from collections import namedtuple from uuid import uuid4 -from time import time, gmtime +from time import time, gmtime, sleep from math import trunc from io import BytesIO from urlparse import parse_qs @@ -31,57 +31,99 @@ from jsonschema import Draft4Validator, FormatChecker VERSION = "3.0-not-even-alpha" - class Error(Exception): - def __init__(self, message, error=500, exc=None, - method=None, req_id=None, detail=None): - self.message = message - self.error = int(error) - (self.exctype, self.excval, self.exctb) = exc or (None, None, None) - self.cause = self.excval # compatibility with other exceptions + def __init__(self, method=None, req_id=None, errors=None, **kwargs): self.method = method self.req_id = req_id - self.detail = detail + self.errors = [kwargs] if kwargs else [] + if errors: + self.errors.extend(errors) + + + def append(self, **kwargs): + self.errors.append(kwargs) + + + def get_http_err_msg(self): + try: + err = self.errors[0]["error"] + msg = self.errors[0]["message"] + except (IndexError, KeyError): + err = 500 + msg = "There's NO self-destruction button! Ah, you've just found it..." + for e in self.errors: + next_err = e.get("error", 500) + if err != next_err: + # errors not same, round to basic err code (400, 500) + # and use the highest one + err = max(err//100, next_err//100)*100 + next_msg = e.get("message", "Unknown error") + if msg != next_msg: + msg = "Multiple errors" + logging.debug("get_http_err_msg: %s, msg: %s, %s" % (err, msg, type(msg))) + return err, msg def __str__(self): + return "\n".join(self.str_err(e) for e in self.errors) + + + def log(self, logger, prio=logging.ERROR): + for e in self.errors: + logger.log(prio, self.str_err(e)) + info = self.str_info(e) + if info: + logger.info(info) + debug = self.str_debug(e) + if debug: + logger.debug(debug) + + + def str_err(self, e): out = [] - out.append("Error(%s)" % (self.error)) - if self.method is not None: - out.append(" in \"%s\"" % self.method) - if self.message is not None: - out.append(": %s" % self.message) - if self.excval is not None: - out.append(" - cause was %s: %s" % (type(self.excval).__name__, str(self.excval))) + out.append("Error(%s) %s " % (e.get("error", 0), e.get("message", "Unknown error"))) + if "exc" in e and e["exc"]: + out.append("(cause was %s: %s)" % (e["exc"][0].__name__, str(e["exc"][1]))) return "".join(out) - def info_str(self): - return ("Detail: %s" % self.detail) if self.detail else "" + def str_info(self, e): + ecopy = dict(e) # shallow copy + ecopy.pop("req_id", None) + ecopy.pop("method", None) + ecopy.pop("error", None) + ecopy.pop("message", None) + ecopy.pop("exc", None) + if ecopy: + out = "Detail: %s" % (json.dumps(ecopy, default=lambda v: str(v))) + else: + out = "" + return out - def debug_str(self): + def str_debug(self, e): out = [] - if self.excval is not None: - out.append("Exception %s: %s\n" % (type(self.excval).__name__, str(self.excval))) - if self.exctb is not None: - out.append("Traceback:\n%s" % "".join(format_tb(self.exctb))) + if not "exc" in e or not e["exc"]: + return "" + exc_tb = e["exc"][2] + if exc_tb: + out.append("Traceback:\n") + out.extend(format_tb(exc_tb)) return "".join(out) def to_dict(self): - d = {} - if self.error is not None: - d["error"] = self.error - if self.method is not None: - d["method"] = self.method - if self.message is not None: - d["message"] = self.message - if self.detail is not None: - d["detail"] = self.detail - if self.req_id is not None: - d["req_id"] = self.req_id + errlist = [] + for e in self.errors: + ecopy = dict(e) + ecopy.pop("exc", None) + errlist.append(ecopy) + d = { + "method": self.method, + "req_id": self.req_id, + "errors": errlist + } return d @@ -226,8 +268,8 @@ class Request(Object): self.req_id = 0 if env is None else randint(0x00000000, 0xFFFFFFFF) - def error(self, message, error=500, exc=None, detail=None): - return Error(message, error, exc, self.path, self.req_id, detail=detail) + def error(self, errors=None, **kwargs): + return Error(self.path, self.req_id, errors=errors, **kwargs) @@ -370,11 +412,11 @@ class JSONSchemaValidator(NoValidator): res = [] for error in sorted(self.validator.iter_errors(event), key=sortkey): - res.append( - "Validation error: key \"%s\", value \"%s\", expected - %s" % ( - u"/".join(str(v) for v in error.path), + res.append({"error": 460, + "message": "Validation error: key \"%s\", value \"%s\", expected - %s" % ( + "/".join(str(v) for v in error.path), error.instance, - error.schema.get('description', 'no additional info'))) + error.schema.get('description', 'no additional info'))}) return res @@ -382,13 +424,15 @@ class JSONSchemaValidator(NoValidator): class MySQL(ObjectReq): - def __init__(self, req, host, user, password, dbname, port, catmap_filename, tagmap_filename): + def __init__(self, req, host, user, password, dbname, port, retry_count, retry_pause, catmap_filename, tagmap_filename): ObjectReq.__init__(self, req) self.host = host self.user = user self.password = password self.dbname = dbname self.port = port + self.retry_count = retry_count + self.retry_pause = retry_pause self.catmap_filename = catmap_filename self.tagmap_filename = tagmap_filename @@ -400,14 +444,52 @@ class MySQL(ObjectReq): self.tagmap = json.load(tagmap_fd) self.tagmap_other = self.catmap["Other"] # Catch error soon, avoid lookup later + self.con = self.crs = None + + self.connect() + + + def __str__(self): + return "%s(req=%s, host='%s', user='%s', dbname='%s', port=%d, retry_count=%d, retry_pause=%d, catmap_filename=\"%s\", tagmap_filename=\"%s\")" % ( + type(self).__name__, type(self.req).__name__, self.host, self.user, self.dbname, self.port, self.retry_count, self.retry_pause, self.catmap_filename, self.tagmap_filename) + + + def connect(self): self.con = my.connect(host=self.host, user=self.user, passwd=self.password, db=self.dbname, port=self.port, cursorclass=mycursors.DictCursor) self.crs = self.con.cursor() - def __str__(self): - return "%s(req=%s, host='%s', user='%s', dbname='%s', port=%d, catmap_filename=\"%s\", tagmap_filename=\"%s\")" % ( - type(self).__name__, type(self.req).__name__, self.host, self.user, self.dbname, self.port, self.catmap_filename, self.tagmap_filename) + def close(self): + try: + if self.crs: + self.crs.close() + if self.con: + self.con.close() + except Exception: + pass + + + __del__ = close + + + def execute(self, *args, **kwargs): + """ Execute query on self.con, reconnecting if necessary """ + success = False + countdown = self.retry_count + while not success: + try: + self.crs.execute(*args, **kwargs) + success = True + except my.OperationalError: + if not countdown: + raise + logging.info("execute: Database down, trying to reconnect (%d attempts left)..." % countdown) + if countdown<self.retry_count: + sleep(self.retry_pause) # no need to melt down server on longer outage + self.close() + self.connect() + countdown -= 1 def _get_comma_perc(self, l): @@ -429,7 +511,7 @@ class MySQL(ObjectReq): params.append(secret) query.append(" AND hostname IN (%s)" % self._get_comma_perc(cert_names)) params.extend(cert_names) - self.crs.execute("".join(query), params) + self.execute("".join(query), params) rows = self.crs.fetchall() if len(rows)>1: @@ -441,9 +523,9 @@ class MySQL(ObjectReq): def get_debug(self): - self.crs.execute("SELECT VERSION() AS VER") + self.execute("SELECT VERSION() AS VER") row = self.crs.fetchone() - self.crs.execute("SHOW TABLE STATUS") + self.execute("SHOW TABLE STATUS") tablestat = self.crs.fetchall() return { "db": "MySQL", @@ -458,8 +540,8 @@ class MySQL(ObjectReq): try: mapped = section[v] except KeyError: - raise self.req.error("Wrong tag or category used in query.", 422, - sys.exc_info(), detail={"key": v}) + raise self.req.error(message="Wrong tag or category used in query.", error=422, + exc=sys.exc_info(), key=v) maps.append(mapped) return set(maps) # unique @@ -472,14 +554,14 @@ class MySQL(ObjectReq): logging.debug("fetch_events: id=%i, count=%i, cat=%s, nocat=%s, tag=%s, notag=%s, group=%s, nogroup=%s" % (id, count, str(cat), str(nocat), str(tag), str(notag), str(group), str(nogroup))) if cat and nocat: - raise self.req.error("Unrealizable conditions. Choose cat or nocat option.", 422, - detail={'cat': cat, 'nocat' : nocat}) + raise self.req.error(message="Unrealizable conditions. Choose cat or nocat option.", error=422, + cat=cat, nocat=nocat) if tag and notag: - raise self.req.error("Unrealizable conditions. Choose tag or notag option.", 422, - detail={'tag': cat, 'notag' : nocat}) + raise self.req.error(message="Unrealizable conditions. Choose tag or notag option.", error=422, + tag=tag, notag=notag) if group and nogroup: - raise self.req.error("Unrealizable conditions. Choose group or nogroup option.", 422, - detail={'tag': cat, 'notag' : nocat}) + raise self.req.error(message="Unrealizable conditions. Choose group or nogroup option.", error=422, + group=group, nogroup=nogroup) query = ["SELECT e.id, e.data FROM clients c RIGHT JOIN events e ON c.id = e.client_id WHERE e.id > %s"] params = [id or 0] @@ -515,7 +597,7 @@ class MySQL(ObjectReq): logging.debug("fetch_events: query - %s" % query_string) logging.debug("fetch_events: params - %s", str(params)) - self.crs.execute(query_string, params) + self.execute(query_string, params) row = self.crs.fetchall() if row: @@ -533,14 +615,14 @@ class MySQL(ObjectReq): def store_event(self, client, event): try: - self.crs.execute("INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s)", (client.id, json.dumps(event))) + self.execute("INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s)", (client.id, json.dumps(event))) lastid = self.crs.lastrowid catlist = event.get('Category', ["Other"]) cats = set(catlist) | set(cat.split(".", 1)[0] for cat in catlist) for cat in cats: cat_id = self.catmap.get(cat, self.catmap_other) - self.crs.execute("INSERT INTO event_category_mapping (event_id,category_id) VALUES (%s, %s)", (lastid, cat_id)) + self.execute("INSERT INTO event_category_mapping (event_id,category_id) VALUES (%s, %s)", (lastid, cat_id)) try: tags = event['Node'][0]['Tags'] @@ -549,28 +631,28 @@ class MySQL(ObjectReq): for tag in tags: tag_id = self.tagmap.get(tag, self.tagmap_other) - self.crs.execute("INSERT INTO event_tag_mapping (event_id,tag_id) VALUES (%s, %s)", (lastid, tag_id)) + self.execute("INSERT INTO event_tag_mapping (event_id,tag_id) VALUES (%s, %s)", (lastid, tag_id)) self.con.commit() return [] except Exception as e: self.con.rollback() - return [type(e).__name__ + ": " + str(e)] + return [{"error": 500, "message": type(e).__name__ + ": " + str(e)}] def insertLastReceivedId(self, client, id): logging.debug("insertLastReceivedId: id %i for client %i(%s)" % (id, client.id, client.hostname)) - self.crs.execute("INSERT INTO last_events(client_id, event_id, timestamp) VALUES(%s, %s, NOW())", (client.id, id)) + self.execute("INSERT INTO last_events(client_id, event_id, timestamp) VALUES(%s, %s, NOW())", (client.id, id)) self.con.commit() def getLastEventId(self): - self.crs.execute("SELECT MAX(id) as id FROM events") + self.execute("SELECT MAX(id) as id FROM events") row = self.crs.fetchone() return row['id'] or 0 def getLastReceivedId(self, client): - self.crs.execute("SELECT MAX(event_id) as id FROM last_events WHERE client_id = %s", client.id) + self.execute("SELECT MAX(event_id) as id FROM last_events WHERE client_id = %s", client.id) row = self.crs.fetchone() id = row['id'] if row is not None else 0 @@ -636,31 +718,31 @@ class Server(ObjectReq): try: injson = environ['wsgi.input'].read() except: - raise self.req.error("Data read error.", 408, sys.exc_info()) + raise self.req.error(message="Data read error.", error=408, exc=sys.exc_info()) try: method = getattr(self.handler, path) method.exposed # dummy access to trigger AttributeError except Exception: - raise self.req.error("You've fallen of the cliff.", 404) + raise self.req.error(message="You've fallen of the cliff.", error=404) self.req.args = args = parse_qs(environ.get('QUERY_STRING', "")) self.req.client = client = self.auth.authenticate(environ, args) if not client: - raise self.req.error("I'm watching. Authenticate.", 403) + raise self.req.error(message="I'm watching. Authenticate.", error=403) try: events = json.loads(injson) if injson else None except Exception as e: - raise self.req.error("Deserialization error.", 400, - sys.exc_info(), detail={"args": injson, "parser": str(e)}) + raise self.req.error(message="Deserialization error.", error=400, + exc=sys.exc_info(), args=injson, parser=str(e)) if events: args["events"] = events auth = self.auth.authorize(self.req.env, self.req.client, self.req.path, method) if not auth: - raise self.req.error("I'm watching. Not authorized.", 403, detail={"client": client.identity}) + raise self.req.error(message="I'm watching. Not authorized.", error=403, client=client.identity) # These args are not for handler args.pop("client", None) @@ -674,33 +756,30 @@ class Server(ObjectReq): # which could (although shouldn't) appear in handler code output = json.dumps(result, default=lambda v: str(v)) except Exception as e: - raise self.req.error("Serialization error", 500, - sys.exc_info(), detail={"args": str(result)}) + raise self.req.error(message="Serialization error", error=500, + exc=sys.exc_info(), args=str(result)) except Error as e: exception = e except Exception as e: - exception = self.req.error("Server exception", 500, sys.exc_info()) + exception = self.req.error(message="Server exception", error=500, exc=sys.exc_info()) if exception: - status = "%d %s" % (exception.error, exception.message) - result = exception.to_dict() - try: - output = json.dumps(result, default=lambda v: str(v)) - except Exception as e: - # Here all bets are off, generate at least sane output - output = '{"error": %d, "message": "%s"}' % ( - exception.error, exception.message) - - logging.error(str(exception)) - i = exception.info_str() - if i: - logging.info(i) - d = exception.debug_str() - if d: - logging.debug(d) - + status = "%d %s" % exception.get_http_err_msg() + output = json.dumps(exception.to_dict(), default=lambda v: str(v)) + exception.log(logging.getLogger()) + + # Make sure everything is properly encoded - JSON and various function + # may spit out unicode instead of str and it gets propagated up (str + # + unicode = unicode). However, the right thing would be to be unicode + # correct among whole source and always decode on input (json module + # does that for us) and on output here. + if isinstance(status, unicode): + status = status.encode("utf-8") + if isinstance(output, unicode): + output = output.encode("utf-8") headers.append(('Content-Length', str(len(output)))) + logging.debug("wsgi status %s, headers %s" % (status, headers)) start_response(status, headers) self.req.reset() return [output] @@ -809,56 +888,65 @@ class WardenHandler(ObjectReq): return res - def checkNode(self, event, identity): + def check_node(self, event, identity): try: ev_id = event['Node'][0]['Name'].lower() except (KeyError, TypeError): # Event does not bear valid Node attribute - return ["Event does not bear valid Node attribute"] + return [{"error": 422, message: "Event does not bear valid Node attribute"}] if ev_id != identity: - return ["Node does not correspond with saving client"] + return [{"error": 422, message: "Node does not correspond with saving client"}] return [] + def add_event_num(self, errlist, i): + for err in errlist: + err.setdefault("events", []).append(i) + return errlist + + @expose(write=1) def sendEvents(self, events=[]): if not isinstance(events, list): - raise self.req.error("List of events expected.", 400) + raise self.req.error(message="List of events expected.", error=400) + errs = [] if len(events)>self.send_events_limit: - raise self.req.error("Too much events in one batch.", 413, - detail={"limit": self.send_events_limit}) + errs.append({"error": 413, "message": "Too much events in one batch.", + "events": range(self.send_events_limit, len(events)), + "send_events_limit": self.send_events_limit}) saved = 0 - errs = {} - for i, event in enumerate(events): + for i, event in enumerate(events[0:self.send_events_limit]): v_errs = self.validator.check(event) if v_errs: - errs[i] = v_errs + errs.extend(self.add_event_num(v_errs, i)) continue - node_errs = self.checkNode(event, self.req.client.identity) + node_errs = self.check_node(event, self.req.client.identity) if node_errs: - errs[i] = node_errs + errs.extend(self.add_event_num(node_errs, i)) continue if self.req.client.test and not 'Test' in event.get('Category', []): - errs[i] = ["You're allowed to send only messages, containing \"Test\" among categories."] + errs.append({"error": 422, "events": [i], + "message": "You're allowed to send only messages, containing \"Test\" among categories.", + "categories": event.get('Category', [])}) continue db_errs = self.db.store_event(self.req.client, event) if db_errs: - errs[i] = db_errs + errs.extend(self.add_event_num(db_errs, i)) continue saved += 1 logging.info("Saved %i events" % saved) if errs: - raise self.req.error("Errors saving some messages.", 422, - detail={"errors": errs}) + raise self.req.error(errors=errs) + + return {} - return saved def read_ini(path): @@ -866,7 +954,7 @@ def read_ini(path): res = c.read(path) if not res or not path in res: # We don't have loggin yet, hopefully this will go into webserver log - raise Error("Unable to read config: %s" % path) + raise Error(message="Unable to read config: %s" % path) data = {} for sect in c.sections(): for opts in c.options(sect): @@ -900,7 +988,7 @@ def fallback_wsgi(environ, start_response, exc_info=None): logline = "Error(%d): %s" % (error, message) status = "%d %s" % (error, message) - output = '{"error": %d, "message": "%s"}' % ( + output = '{"errors": [{"error": %d, "message": "%s"}]}' % ( error, message) logging.critical(logline) @@ -980,6 +1068,8 @@ def build_server(conf): "password": {"type": str, "default": ""}, "dbname": {"type": str, "default": "warden3"}, "port": {"type": natural, "default": 3306}, + "retry_pause": {"type": natural, "default": 5}, + "retry_count": {"type": natural, "default": 3}, "catmap_filename": {"type": filepath, "default": path.join(path.dirname(__file__), "catmap_mysql.json")}, "tagmap_filename": {"type": filepath, "default": path.join(path.dirname(__file__), "tagmap_mysql.json")} },