Skip to content
Snippets Groups Projects
Commit 21989021 authored by Pavel Kácha's avatar Pavel Kácha
Browse files

Reworked error handling in server, client and filer (again :( ). We definitely...

Reworked error handling in server, client and filer (again :( ). We definitely have to support multiple errors so let's do it right.  Error now contains list of errors.  Got rid of "detail" section everything now goes directly within particular error.  Error can bear "events" attribute for list of sent events concerned (or none if all sent events erred). Streamlined error logging.
Server now retries if MySQL went away.
Response result now gets properly encoded if Unicode appears somehow (however that's just fighting symptoms, should get more thorough fix).
parent 34cb2199
No related branches found
No related tags found
No related merge requests found
......@@ -221,26 +221,27 @@ def sender(config, wclient, sdir, oneshot):
count_ok = count_err = count_retry = 0
if isinstance(res, Error):
for e in res.errors:
# list of events with this error (none means all events)
evlist = e.get("events", xrange(len(nf_sent)))
# error number
error = e.get("error", 0)
for idx in evlist:
try:
errs = res.detail["errors"]
except (KeyError, AttributeError, TypeError):
errs = None
if errs:
# Event errors - move bad events into "errors"
for e in errs.iterkeys():
try:
idx = int(e)
idx_n = int(idx)
# 4xx errors are permanent, 5xx are server ones, suitable for retry
dest_dir = sdir.errors if 400 <= error < 500 else sdir.incoming
except ValueError:
# Cannot grok event number, skip
continue
nf_sent[idx].moveto(sdir.errors)
nf_sent[idx] = None
if nf_sent[idx_n]:
nf_sent[idx_n].moveto(dest_dir)
nf_sent[idx_n] = None
if dest_dir == sdir.errors:
count_err += 1
else:
# Global errors - move all events back to "incoming" for attempt in next round
for idx in range(len(nf_sent)):
nf_sent[idx].moveto(sdir.incoming)
nf_sent[idx] = None
count_retry += 1
# Cleanup rest - succesfully sent events
for name in nf_sent:
if name:
......
......@@ -56,24 +56,32 @@ class Error(Exception):
Also, it can be raised as an exception.
"""
def __init__(self, message, logger=None, error=None, prio="error", method=None,
req_id=None, detail=None, exc=None):
self.message = message
self.error = error
self.method = method
self.req_id = req_id
self.detail = detail
(self.exctype, self.excval, self.exctb) = exc or (None, None, None)
self.cause = self.excval # compatibility with other exceptions
def __init__(self, logger=None, prio=logging.ERROR, method=None, req_id=None,
exc=None, errors=None, **kwargs):
self.errors = []
if errors:
self.extend(method, req_id, errors)
if kwargs:
self.append(method, req_id, **kwargs)
if logger:
getattr(logger, prio, "error")(str(self))
info = self.info_str()
if info:
logger.info(info)
debug = self.debug_str()
if debug:
logger.debug(debug)
log(logger, prio)
def append(self, method=None, req_id=None, **kwargs):
# We shift method and req_id into each and every error, because
# we want to be able to simply merge more Error arrays (for
# returning errors from more Warden calls at once
if method and not "method" in kwargs:
kwargs["method"] = method
if req_id and not "req_id" in kwargs:
kwargs["req_id"] = req_id
self.errors.append(kwargs)
def extend(self, method=None, req_id=None, iterable=[]):
for e in iterable:
self.append(method, req_id, **e)
def __len__ (self):
......@@ -97,29 +105,56 @@ class Error(Exception):
def __str__(self):
return "\n".join(self.str_err(e) for e in self.errors)
def log(self, logger, prio=logging.ERROR):
for e in self.errors:
logger.log(prio, self.str_err(e))
info = self.str_info(e)
if info:
logger.info(info)
debug = self.str_debug(e)
if debug:
logger.debug(debug)
def str_preamble(self, e):
return "%08x/%s" % (e.get("req_id", 0), e.get("method", "?"))
def str_err(self, e):
out = []
out.append("(%s)" % (self.error or "local"))
if self.method is not None:
out.append(" in %s" % self.method)
if self.req_id is not None:
out.append("(%08x)" % self.req_id)
if self.message is not None:
out.append(": %s" % self.message)
if self.excval is not None:
out.append(" - cause was %s: %s" % (type(self.excval).__name__, str(self.excval)))
out.append(self.str_preamble(e))
out.append(" Error(%s) %s " % (e.get("error", 0), e.get("message", "Unknown error")))
if "exc" in e and e["exc"]:
out.append("(cause was %s: %s)" % (e["exc"][0].__name__, str(e["exc"][1])))
return "".join(out)
def info_str(self):
return ("Detail: %s" % pformat(self.detail)) or ""
def str_info(self, e):
ecopy = dict(e) # shallow copy
ecopy.pop("req_id", None)
ecopy.pop("method", None)
ecopy.pop("error", None)
ecopy.pop("message", None)
ecopy.pop("exc", None)
if ecopy:
out = "%s Detail: %s" % (self.str_preamble(e), json.dumps(ecopy, default=lambda v: str(v)))
else:
out = ""
return out
def debug_str(self):
def str_debug(self, e):
out = []
if self.excval is not None:
out.append("Exception %s: %s\n" % (type(self.excval).__name__, str(self.excval)))
if self.exctb is not None:
out.append("Traceback:\n%s" % "".join(format_tb(self.exctb)))
out.append(self.str_preamble(e))
if not "exc" in e or not e["exc"]:
return ""
exc_tb = e["exc"][2]
if exc_tb:
out.append("Traceback:\n")
out.extend(format_tb(exc_tb))
return "".join(out)
......@@ -178,7 +213,7 @@ class Client(object):
self.logger.warning("Unknown syslog facility \"%s\", using \"local7\"" % fac)
return logging.handlers.SysLogHandler.LOG_LOCAL7
form = "%(filename)s[%(process)d]: (%(levelname)s) %(name)s %(message)s"
form = "%(filename)s[%(process)d]: %(name)s (%(levelname)s) %(message)s"
format_notime = logging.Formatter(form)
format_time = logging.Formatter('%(asctime)s ' + form)
......@@ -189,7 +224,7 @@ class Client(object):
if errlog is not None:
el = logging.StreamHandler(stderr)
el.setFormatter(format_time)
el.setLevel(loglevel(errlog.get("level", "debug")))
el.setLevel(loglevel(errlog.get("level", "info")))
self.logger.addHandler(el)
if filelog is not None:
......@@ -198,22 +233,22 @@ class Client(object):
filename=path.join(
path.dirname(__file__),
filelog.get("file", "%s.log" % self.name)))
fl.setLevel(loglevel(filelog.get("level", "warning")))
fl.setLevel(loglevel(filelog.get("level", "debug")))
fl.setFormatter(format_time)
self.logger.addHandler(fl)
except Exception as e:
Error("Unable to setup file logging", self.logger, exc=exc_info())
Error(self.logger, message="Unable to setup file logging", exc=exc_info())
if syslog is not None:
try:
sl = logging.handlers.SysLogHandler(
address=syslog.get("socket", "/dev/log"),
facility=facility(syslog.get("facility", "local7")))
sl.setLevel(loglevel(syslog.get("level", "warning")))
sl.setLevel(loglevel(syslog.get("level", "debug")))
sl.setFormatter(format_notime)
self.logger.addHandler(sl)
except Exception as e:
Error("Unable to setup syslog logging", self.logger, exc=exc_info())
Error(self.logger, message="Unable to setup syslog logging", exc=exc_info())
if not (errlog or filelog or syslog):
# User wants explicitly no logging, so let him shoot his socks off.
......@@ -241,10 +276,10 @@ class Client(object):
strict = False,
timeout = self.timeout)
else:
return Error("Don't know how to connect to \"%s\"" % self.url.scheme, self.logger,
return Error(self.logger, message="Don't know how to connect to \"%s\"" % self.url.scheme,
detail={"url": self.url.geturl()})
except Exception:
return Error("HTTPS connection failed", self.logger, exc=exc_info(),
return Error(self.logger, message="HTTPS connection failed", exc=exc_info(),
detail={
"url": self.url.geturl(),
"timeout": self.timeout,
......@@ -278,7 +313,7 @@ class Client(object):
else:
data = json.dumps(payload)
except:
return Error("Serialization to JSON failed", self.logger,
return Error(self.logger, message="Serialization to JSON failed",
exc=exc_info(), method=func, detail=payload)
self.headers = {
......@@ -297,7 +332,7 @@ class Client(object):
conn.request("POST", loc, data, self.headers)
except:
conn.close()
return Error("Sending of request to server failed", self.logger,
return Error(self.logger, message="Sending of request to server failed",
exc=exc_info(), method=func, detail={
"loc": loc,
"headers": self.headers,
......@@ -307,7 +342,7 @@ class Client(object):
res = conn.getresponse()
except:
conn.close()
return Error("HTTP reply failed", self.logger, method=func, exc=exc_info(), detail={
return Error(self.logger, method=func, message="HTTP reply failed", exc=exc_info(), detail={
"loc": loc,
"headers": self.headers,
"data": data})
......@@ -316,7 +351,7 @@ class Client(object):
response_data = res.read()
except:
conn.close()
return Error("Fetching HTTP data from server failed", self.logger, method=func, exc=exc_info(), detail={
return Error(self.logger, method=func, message="Fetching HTTP data from server failed", exc=exc_info(), detail={
"loc": loc,
"headers": self.headers,
"data": data})
......@@ -327,24 +362,23 @@ class Client(object):
try:
data = json.loads(response_data)
except:
data = Error("JSON message parsing failed", self.logger,
data = Error(self.logger, message="JSON message parsing failed",
exc=exc_info(), method=func, detail={"response": response_data})
else:
try:
data = json.loads(response_data)
data["error"] # trigger exception if not dict or no error key
data["errors"] # trigger exception if not dict or no error key
except:
data = Error("Generic server HTTP error", self.logger,
data = Error(self.logger, message="Generic server HTTP error",
method=func,
error=res.status,
exc=exc_info(),
detail={"response": response_data})
else:
data = Error(data.get("message", None), self.logger,
data = Error(self.logger,
method=data.get("method", None),
error=res.status,
req_id=data.get("req_id", None),
detail=data.get("detail", None))
errors=data.get("errors", []))
return data
......@@ -358,8 +392,8 @@ class Client(object):
f.write(str(id))
except (ValueError, IOError) as e:
# Use Error instance just for proper logging
Error("Writing id file \"%s\" failed" % idf, self.logger,
prio="info", exc=exc_info(), detail={"idstore": idf})
Error(self.logger, message="Writing id file \"%s\" failed" % idf,
prio=logging.INFO, exc=exc_info(), detail={"idstore": idf})
return id
......@@ -371,8 +405,9 @@ class Client(object):
with open(idf, "r") as f:
id = int(f.read())
except (ValueError, IOError) as e:
Error("Reading id file \"%s\" failed, relying on server" % idf,
self.logger, prio="info", exc=exc_info(), detail={"idstore": idf})
Error(self.logger, prio=logging.INFO,
message="Reading id file \"%s\" failed, relying on server" % idf,
exc=exc_info(), detail={"idstore": idf})
id = None
return id
......@@ -409,7 +444,7 @@ class Client(object):
events = res["events"]
newid = res["lastid"]
except KeyError:
return Error("Server returned bogus reply", self.logger,
return Error(self.logger, message="Server returned bogus reply",
method="getEvents", exc=exc_info(), detail={"response": res})
self._saveID(newid)
......
......@@ -16,7 +16,7 @@ import MySQLdb as my
import MySQLdb.cursors as mycursors
from collections import namedtuple
from uuid import uuid4
from time import time, gmtime
from time import time, gmtime, sleep
from math import trunc
from io import BytesIO
from urlparse import parse_qs
......@@ -31,57 +31,99 @@ from jsonschema import Draft4Validator, FormatChecker
VERSION = "3.0-not-even-alpha"
class Error(Exception):
def __init__(self, message, error=500, exc=None,
method=None, req_id=None, detail=None):
self.message = message
self.error = int(error)
(self.exctype, self.excval, self.exctb) = exc or (None, None, None)
self.cause = self.excval # compatibility with other exceptions
def __init__(self, method=None, req_id=None, errors=None, **kwargs):
self.method = method
self.req_id = req_id
self.detail = detail
self.errors = [kwargs] if kwargs else []
if errors:
self.errors.extend(errors)
def append(self, **kwargs):
self.errors.append(kwargs)
def get_http_err_msg(self):
try:
err = self.errors[0]["error"]
msg = self.errors[0]["message"]
except (IndexError, KeyError):
err = 500
msg = "There's NO self-destruction button! Ah, you've just found it..."
for e in self.errors:
next_err = e.get("error", 500)
if err != next_err:
# errors not same, round to basic err code (400, 500)
# and use the highest one
err = max(err//100, next_err//100)*100
next_msg = e.get("message", "Unknown error")
if msg != next_msg:
msg = "Multiple errors"
logging.debug("get_http_err_msg: %s, msg: %s, %s" % (err, msg, type(msg)))
return err, msg
def __str__(self):
return "\n".join(self.str_err(e) for e in self.errors)
def log(self, logger, prio=logging.ERROR):
for e in self.errors:
logger.log(prio, self.str_err(e))
info = self.str_info(e)
if info:
logger.info(info)
debug = self.str_debug(e)
if debug:
logger.debug(debug)
def str_err(self, e):
out = []
out.append("Error(%s)" % (self.error))
if self.method is not None:
out.append(" in \"%s\"" % self.method)
if self.message is not None:
out.append(": %s" % self.message)
if self.excval is not None:
out.append(" - cause was %s: %s" % (type(self.excval).__name__, str(self.excval)))
out.append("Error(%s) %s " % (e.get("error", 0), e.get("message", "Unknown error")))
if "exc" in e and e["exc"]:
out.append("(cause was %s: %s)" % (e["exc"][0].__name__, str(e["exc"][1])))
return "".join(out)
def info_str(self):
return ("Detail: %s" % self.detail) if self.detail else ""
def str_info(self, e):
ecopy = dict(e) # shallow copy
ecopy.pop("req_id", None)
ecopy.pop("method", None)
ecopy.pop("error", None)
ecopy.pop("message", None)
ecopy.pop("exc", None)
if ecopy:
out = "Detail: %s" % (json.dumps(ecopy, default=lambda v: str(v)))
else:
out = ""
return out
def debug_str(self):
def str_debug(self, e):
out = []
if self.excval is not None:
out.append("Exception %s: %s\n" % (type(self.excval).__name__, str(self.excval)))
if self.exctb is not None:
out.append("Traceback:\n%s" % "".join(format_tb(self.exctb)))
if not "exc" in e or not e["exc"]:
return ""
exc_tb = e["exc"][2]
if exc_tb:
out.append("Traceback:\n")
out.extend(format_tb(exc_tb))
return "".join(out)
def to_dict(self):
d = {}
if self.error is not None:
d["error"] = self.error
if self.method is not None:
d["method"] = self.method
if self.message is not None:
d["message"] = self.message
if self.detail is not None:
d["detail"] = self.detail
if self.req_id is not None:
d["req_id"] = self.req_id
errlist = []
for e in self.errors:
ecopy = dict(e)
ecopy.pop("exc", None)
errlist.append(ecopy)
d = {
"method": self.method,
"req_id": self.req_id,
"errors": errlist
}
return d
......@@ -226,8 +268,8 @@ class Request(Object):
self.req_id = 0 if env is None else randint(0x00000000, 0xFFFFFFFF)
def error(self, message, error=500, exc=None, detail=None):
return Error(message, error, exc, self.path, self.req_id, detail=detail)
def error(self, errors=None, **kwargs):
return Error(self.path, self.req_id, errors=errors, **kwargs)
......@@ -370,11 +412,11 @@ class JSONSchemaValidator(NoValidator):
res = []
for error in sorted(self.validator.iter_errors(event), key=sortkey):
res.append(
"Validation error: key \"%s\", value \"%s\", expected - %s" % (
u"/".join(str(v) for v in error.path),
res.append({"error": 460,
"message": "Validation error: key \"%s\", value \"%s\", expected - %s" % (
"/".join(str(v) for v in error.path),
error.instance,
error.schema.get('description', 'no additional info')))
error.schema.get('description', 'no additional info'))})
return res
......@@ -382,13 +424,15 @@ class JSONSchemaValidator(NoValidator):
class MySQL(ObjectReq):
def __init__(self, req, host, user, password, dbname, port, catmap_filename, tagmap_filename):
def __init__(self, req, host, user, password, dbname, port, retry_count, retry_pause, catmap_filename, tagmap_filename):
ObjectReq.__init__(self, req)
self.host = host
self.user = user
self.password = password
self.dbname = dbname
self.port = port
self.retry_count = retry_count
self.retry_pause = retry_pause
self.catmap_filename = catmap_filename
self.tagmap_filename = tagmap_filename
......@@ -400,14 +444,52 @@ class MySQL(ObjectReq):
self.tagmap = json.load(tagmap_fd)
self.tagmap_other = self.catmap["Other"] # Catch error soon, avoid lookup later
self.con = self.crs = None
self.connect()
def __str__(self):
return "%s(req=%s, host='%s', user='%s', dbname='%s', port=%d, retry_count=%d, retry_pause=%d, catmap_filename=\"%s\", tagmap_filename=\"%s\")" % (
type(self).__name__, type(self.req).__name__, self.host, self.user, self.dbname, self.port, self.retry_count, self.retry_pause, self.catmap_filename, self.tagmap_filename)
def connect(self):
self.con = my.connect(host=self.host, user=self.user, passwd=self.password,
db=self.dbname, port=self.port, cursorclass=mycursors.DictCursor)
self.crs = self.con.cursor()
def __str__(self):
return "%s(req=%s, host='%s', user='%s', dbname='%s', port=%d, catmap_filename=\"%s\", tagmap_filename=\"%s\")" % (
type(self).__name__, type(self.req).__name__, self.host, self.user, self.dbname, self.port, self.catmap_filename, self.tagmap_filename)
def close(self):
try:
if self.crs:
self.crs.close()
if self.con:
self.con.close()
except Exception:
pass
__del__ = close
def execute(self, *args, **kwargs):
""" Execute query on self.con, reconnecting if necessary """
success = False
countdown = self.retry_count
while not success:
try:
self.crs.execute(*args, **kwargs)
success = True
except my.OperationalError:
if not countdown:
raise
logging.info("execute: Database down, trying to reconnect (%d attempts left)..." % countdown)
if countdown<self.retry_count:
sleep(self.retry_pause) # no need to melt down server on longer outage
self.close()
self.connect()
countdown -= 1
def _get_comma_perc(self, l):
......@@ -429,7 +511,7 @@ class MySQL(ObjectReq):
params.append(secret)
query.append(" AND hostname IN (%s)" % self._get_comma_perc(cert_names))
params.extend(cert_names)
self.crs.execute("".join(query), params)
self.execute("".join(query), params)
rows = self.crs.fetchall()
if len(rows)>1:
......@@ -441,9 +523,9 @@ class MySQL(ObjectReq):
def get_debug(self):
self.crs.execute("SELECT VERSION() AS VER")
self.execute("SELECT VERSION() AS VER")
row = self.crs.fetchone()
self.crs.execute("SHOW TABLE STATUS")
self.execute("SHOW TABLE STATUS")
tablestat = self.crs.fetchall()
return {
"db": "MySQL",
......@@ -458,8 +540,8 @@ class MySQL(ObjectReq):
try:
mapped = section[v]
except KeyError:
raise self.req.error("Wrong tag or category used in query.", 422,
sys.exc_info(), detail={"key": v})
raise self.req.error(message="Wrong tag or category used in query.", error=422,
exc=sys.exc_info(), key=v)
maps.append(mapped)
return set(maps) # unique
......@@ -472,14 +554,14 @@ class MySQL(ObjectReq):
logging.debug("fetch_events: id=%i, count=%i, cat=%s, nocat=%s, tag=%s, notag=%s, group=%s, nogroup=%s" % (id, count, str(cat), str(nocat), str(tag), str(notag), str(group), str(nogroup)))
if cat and nocat:
raise self.req.error("Unrealizable conditions. Choose cat or nocat option.", 422,
detail={'cat': cat, 'nocat' : nocat})
raise self.req.error(message="Unrealizable conditions. Choose cat or nocat option.", error=422,
cat=cat, nocat=nocat)
if tag and notag:
raise self.req.error("Unrealizable conditions. Choose tag or notag option.", 422,
detail={'tag': cat, 'notag' : nocat})
raise self.req.error(message="Unrealizable conditions. Choose tag or notag option.", error=422,
tag=tag, notag=notag)
if group and nogroup:
raise self.req.error("Unrealizable conditions. Choose group or nogroup option.", 422,
detail={'tag': cat, 'notag' : nocat})
raise self.req.error(message="Unrealizable conditions. Choose group or nogroup option.", error=422,
group=group, nogroup=nogroup)
query = ["SELECT e.id, e.data FROM clients c RIGHT JOIN events e ON c.id = e.client_id WHERE e.id > %s"]
params = [id or 0]
......@@ -515,7 +597,7 @@ class MySQL(ObjectReq):
logging.debug("fetch_events: query - %s" % query_string)
logging.debug("fetch_events: params - %s", str(params))
self.crs.execute(query_string, params)
self.execute(query_string, params)
row = self.crs.fetchall()
if row:
......@@ -533,14 +615,14 @@ class MySQL(ObjectReq):
def store_event(self, client, event):
try:
self.crs.execute("INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s)", (client.id, json.dumps(event)))
self.execute("INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s)", (client.id, json.dumps(event)))
lastid = self.crs.lastrowid
catlist = event.get('Category', ["Other"])
cats = set(catlist) | set(cat.split(".", 1)[0] for cat in catlist)
for cat in cats:
cat_id = self.catmap.get(cat, self.catmap_other)
self.crs.execute("INSERT INTO event_category_mapping (event_id,category_id) VALUES (%s, %s)", (lastid, cat_id))
self.execute("INSERT INTO event_category_mapping (event_id,category_id) VALUES (%s, %s)", (lastid, cat_id))
try:
tags = event['Node'][0]['Tags']
......@@ -549,28 +631,28 @@ class MySQL(ObjectReq):
for tag in tags:
tag_id = self.tagmap.get(tag, self.tagmap_other)
self.crs.execute("INSERT INTO event_tag_mapping (event_id,tag_id) VALUES (%s, %s)", (lastid, tag_id))
self.execute("INSERT INTO event_tag_mapping (event_id,tag_id) VALUES (%s, %s)", (lastid, tag_id))
self.con.commit()
return []
except Exception as e:
self.con.rollback()
return [type(e).__name__ + ": " + str(e)]
return [{"error": 500, "message": type(e).__name__ + ": " + str(e)}]
def insertLastReceivedId(self, client, id):
logging.debug("insertLastReceivedId: id %i for client %i(%s)" % (id, client.id, client.hostname))
self.crs.execute("INSERT INTO last_events(client_id, event_id, timestamp) VALUES(%s, %s, NOW())", (client.id, id))
self.execute("INSERT INTO last_events(client_id, event_id, timestamp) VALUES(%s, %s, NOW())", (client.id, id))
self.con.commit()
def getLastEventId(self):
self.crs.execute("SELECT MAX(id) as id FROM events")
self.execute("SELECT MAX(id) as id FROM events")
row = self.crs.fetchone()
return row['id'] or 0
def getLastReceivedId(self, client):
self.crs.execute("SELECT MAX(event_id) as id FROM last_events WHERE client_id = %s", client.id)
self.execute("SELECT MAX(event_id) as id FROM last_events WHERE client_id = %s", client.id)
row = self.crs.fetchone()
id = row['id'] if row is not None else 0
......@@ -636,31 +718,31 @@ class Server(ObjectReq):
try:
injson = environ['wsgi.input'].read()
except:
raise self.req.error("Data read error.", 408, sys.exc_info())
raise self.req.error(message="Data read error.", error=408, exc=sys.exc_info())
try:
method = getattr(self.handler, path)
method.exposed # dummy access to trigger AttributeError
except Exception:
raise self.req.error("You've fallen of the cliff.", 404)
raise self.req.error(message="You've fallen of the cliff.", error=404)
self.req.args = args = parse_qs(environ.get('QUERY_STRING', ""))
self.req.client = client = self.auth.authenticate(environ, args)
if not client:
raise self.req.error("I'm watching. Authenticate.", 403)
raise self.req.error(message="I'm watching. Authenticate.", error=403)
try:
events = json.loads(injson) if injson else None
except Exception as e:
raise self.req.error("Deserialization error.", 400,
sys.exc_info(), detail={"args": injson, "parser": str(e)})
raise self.req.error(message="Deserialization error.", error=400,
exc=sys.exc_info(), args=injson, parser=str(e))
if events:
args["events"] = events
auth = self.auth.authorize(self.req.env, self.req.client, self.req.path, method)
if not auth:
raise self.req.error("I'm watching. Not authorized.", 403, detail={"client": client.identity})
raise self.req.error(message="I'm watching. Not authorized.", error=403, client=client.identity)
# These args are not for handler
args.pop("client", None)
......@@ -674,33 +756,30 @@ class Server(ObjectReq):
# which could (although shouldn't) appear in handler code
output = json.dumps(result, default=lambda v: str(v))
except Exception as e:
raise self.req.error("Serialization error", 500,
sys.exc_info(), detail={"args": str(result)})
raise self.req.error(message="Serialization error", error=500,
exc=sys.exc_info(), args=str(result))
except Error as e:
exception = e
except Exception as e:
exception = self.req.error("Server exception", 500, sys.exc_info())
exception = self.req.error(message="Server exception", error=500, exc=sys.exc_info())
if exception:
status = "%d %s" % (exception.error, exception.message)
result = exception.to_dict()
try:
output = json.dumps(result, default=lambda v: str(v))
except Exception as e:
# Here all bets are off, generate at least sane output
output = '{"error": %d, "message": "%s"}' % (
exception.error, exception.message)
logging.error(str(exception))
i = exception.info_str()
if i:
logging.info(i)
d = exception.debug_str()
if d:
logging.debug(d)
status = "%d %s" % exception.get_http_err_msg()
output = json.dumps(exception.to_dict(), default=lambda v: str(v))
exception.log(logging.getLogger())
# Make sure everything is properly encoded - JSON and various function
# may spit out unicode instead of str and it gets propagated up (str
# + unicode = unicode). However, the right thing would be to be unicode
# correct among whole source and always decode on input (json module
# does that for us) and on output here.
if isinstance(status, unicode):
status = status.encode("utf-8")
if isinstance(output, unicode):
output = output.encode("utf-8")
headers.append(('Content-Length', str(len(output))))
logging.debug("wsgi status %s, headers %s" % (status, headers))
start_response(status, headers)
self.req.reset()
return [output]
......@@ -809,56 +888,65 @@ class WardenHandler(ObjectReq):
return res
def checkNode(self, event, identity):
def check_node(self, event, identity):
try:
ev_id = event['Node'][0]['Name'].lower()
except (KeyError, TypeError):
# Event does not bear valid Node attribute
return ["Event does not bear valid Node attribute"]
return [{"error": 422, message: "Event does not bear valid Node attribute"}]
if ev_id != identity:
return ["Node does not correspond with saving client"]
return [{"error": 422, message: "Node does not correspond with saving client"}]
return []
def add_event_num(self, errlist, i):
for err in errlist:
err.setdefault("events", []).append(i)
return errlist
@expose(write=1)
def sendEvents(self, events=[]):
if not isinstance(events, list):
raise self.req.error("List of events expected.", 400)
raise self.req.error(message="List of events expected.", error=400)
errs = []
if len(events)>self.send_events_limit:
raise self.req.error("Too much events in one batch.", 413,
detail={"limit": self.send_events_limit})
errs.append({"error": 413, "message": "Too much events in one batch.",
"events": range(self.send_events_limit, len(events)),
"send_events_limit": self.send_events_limit})
saved = 0
errs = {}
for i, event in enumerate(events):
for i, event in enumerate(events[0:self.send_events_limit]):
v_errs = self.validator.check(event)
if v_errs:
errs[i] = v_errs
errs.extend(self.add_event_num(v_errs, i))
continue
node_errs = self.checkNode(event, self.req.client.identity)
node_errs = self.check_node(event, self.req.client.identity)
if node_errs:
errs[i] = node_errs
errs.extend(self.add_event_num(node_errs, i))
continue
if self.req.client.test and not 'Test' in event.get('Category', []):
errs[i] = ["You're allowed to send only messages, containing \"Test\" among categories."]
errs.append({"error": 422, "events": [i],
"message": "You're allowed to send only messages, containing \"Test\" among categories.",
"categories": event.get('Category', [])})
continue
db_errs = self.db.store_event(self.req.client, event)
if db_errs:
errs[i] = db_errs
errs.extend(self.add_event_num(db_errs, i))
continue
saved += 1
logging.info("Saved %i events" % saved)
if errs:
raise self.req.error("Errors saving some messages.", 422,
detail={"errors": errs})
raise self.req.error(errors=errs)
return {}
return saved
def read_ini(path):
......@@ -866,7 +954,7 @@ def read_ini(path):
res = c.read(path)
if not res or not path in res:
# We don't have loggin yet, hopefully this will go into webserver log
raise Error("Unable to read config: %s" % path)
raise Error(message="Unable to read config: %s" % path)
data = {}
for sect in c.sections():
for opts in c.options(sect):
......@@ -900,7 +988,7 @@ def fallback_wsgi(environ, start_response, exc_info=None):
logline = "Error(%d): %s" % (error, message)
status = "%d %s" % (error, message)
output = '{"error": %d, "message": "%s"}' % (
output = '{"errors": [{"error": %d, "message": "%s"}]}' % (
error, message)
logging.critical(logline)
......@@ -980,6 +1068,8 @@ def build_server(conf):
"password": {"type": str, "default": ""},
"dbname": {"type": str, "default": "warden3"},
"port": {"type": natural, "default": 3306},
"retry_pause": {"type": natural, "default": 5},
"retry_count": {"type": natural, "default": 3},
"catmap_filename": {"type": filepath, "default": path.join(path.dirname(__file__), "catmap_mysql.json")},
"tagmap_filename": {"type": filepath, "default": path.join(path.dirname(__file__), "tagmap_mysql.json")}
},
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment