Skip to content
Snippets Groups Projects
Commit 1198228b authored by Pavel Kácha's avatar Pavel Kácha
Browse files

* warden_server: Added "events_id" key into errors along with "events" - makes...

* warden_server: Added "events_id" key into errors along with "events" - makes error messages bigger, but allows client operators to identify offending messages by stable identifiers based on logs.
* Removed too much internal info from database errors.
* Removed format type checking from Draft4Validator, using only explicit schema regexps - Draft4Validator raised FormatError instead of ValidationError, rendering iter_errors unusable.
* warden_client: Implemented retries on server errors.
* Client now honours send_events_limit from server - a list of events that is too long is split and sent in chunks.
* getInfo and sendEvents update local send_events_limit according to one sent by server in info or error message.
* Logging is now explicit, not automatic in Error class, allowing each error to be logged only once, in the top-level methods.
* More sane default client name.
* Errors from server are now checked to ensure they have the correct format.
* warden_filer: Ditched error and retry handling, warden_client now does it for us.
parent 21989021
No related branches found
No related tags found
No related merge requests found
......@@ -161,18 +161,17 @@ def receiver(config, wclient, sdir, oneshot):
nf.moveto(sdir.incoming)
count_ok += 1
except Exception as e:
Error("Error saving event", wclient.logger, exc=sys.exc_info(),
detail={"file": str(nf), "event_id": event.get("ID"), "sdir": sdir.path})
Error(message="Error saving event", exc=sys.exc_info(), file=str(nf),
event_ids=[event.get("ID")], sdir=sdir.path).log(wclient.logger)
count_err += 1
wclient.logger.info(
"warden_filer: received %d, errors %d"
% (count_ok, count_err))
if oneshot:
events = None
else:
events = wclient.getEvents(**filt)
events = wclient.getEvents(**filt)
count_ok = count_err = 0
if oneshot:
terminate_me(None, None)
if not events:
terminate_me(None, None)
else:
time.sleep(poll_time)
......@@ -192,64 +191,49 @@ def sender(config, wclient, sdir, oneshot):
nflist = sdir.get_incoming()
# count chunk iterations rounded up
count = len(nflist)
for i in range(0, count, send_events_limit):
# process one at most send_events_limit long chunk
events = []
nf_sent = []
for j in range(i, min(i+send_events_limit, count)):
nf = nflist[j]
# prepare event array from files
try:
nf.moveto(sdir.temp)
except Exception:
pass # Silently go to next filename, somebody else might have interfered
try:
with nf.open("rb") as fd:
data = fd.read()
event = json.loads(data)
if node:
nodelist = event.setdefault("Node", [])
nodelist.insert(0, node)
events.append(event)
nf_sent.append(nf)
except Exception as e:
Error("Error loading event", wclient.logger, exc=sys.exc_info(),
detail={"file": str(nf), "sdir": sdir.path})
nf.moveto(sdir.errors)
res = wclient.sendEvents(events)
count_ok = count_err = count_retry = 0
if isinstance(res, Error):
for e in res.errors:
# list of events with this error (none means all events)
evlist = e.get("events", xrange(len(nf_sent)))
# error number
error = e.get("error", 0)
for idx in evlist:
try:
idx_n = int(idx)
# 4xx errors are permanent, 5xx are server ones, suitable for retry
dest_dir = sdir.errors if 400 <= error < 500 else sdir.incoming
except ValueError:
# Cannot grok event number, skip
continue
if nf_sent[idx_n]:
nf_sent[idx_n].moveto(dest_dir)
nf_sent[idx_n] = None
if dest_dir == sdir.errors:
count_err += 1
else:
count_retry += 1
# Cleanup rest - succesfully sent events
for name in nf_sent:
if name:
name.remove()
count_ok += 1
wclient.logger.info(
"warden_filer: saved %d, errors %d, retreated %d"
% (count_ok, count_err, count_retry))
events = []
nf_sent = []
for nf in nflist:
# prepare event array from files
try:
nf.moveto(sdir.temp)
except Exception:
continue # Silently go to next filename, somebody else might have interfered
try:
with nf.open("rb") as fd:
data = fd.read()
event = json.loads(data)
if node:
nodelist = event.setdefault("Node", [])
nodelist.insert(0, node)
events.append(event)
nf_sent.append(nf)
except Exception as e:
Error(message="Error loading event", exc=sys.exc_info(), file=str(nf),
sdir=sdir.path).log(wclient.logger)
nf.moveto(sdir.errors)
res = wclient.sendEvents(events)
count_ok = count_err = count_retry = 0
if isinstance(res, Error):
for e in res.errors:
errno = e["error"]
evlist = e.get("events", xrange(len(nf_sent))) # None means all
for i in evlist:
if nf_sent[i]:
nf_sent[i].moveto(dest_dir)
nf_sent[i] = None
count_err += 1
# Cleanup rest - succesfully sent events
for name in nf_sent:
if name:
name.remove()
count_ok += 1
wclient.logger.info(
"warden_filer: saved %d, errors %d" % (count_ok, count_err))
......@@ -324,7 +308,6 @@ if __name__ == "__main__":
wconfig, fconfig = get_configs()
oneshot = args.oneshot
safe_dir = SafeDir(fconfig.get("dir", args.func))
wclient = Client(**wconfig)
......
......@@ -11,8 +11,12 @@ from sys import stderr, exc_info
from pprint import pformat
from traceback import format_tb
from os import path
from time import sleep
from operator import itemgetter
VERSION = "3.0-not-even-alpha"
class HTTPSConnection(httplib.HTTPSConnection):
'''
Overridden to allow peer certificate validation, configuration
......@@ -56,16 +60,13 @@ class Error(Exception):
Also, it can be raised as an exception.
"""
def __init__(self, logger=None, prio=logging.ERROR, method=None, req_id=None,
exc=None, errors=None, **kwargs):
def __init__(self, method=None, req_id=None, errors=None, **kwargs):
self.errors = []
if errors:
self.extend(method, req_id, errors)
if kwargs:
self.append(method, req_id, **kwargs)
if logger:
log(logger, prio)
def append(self, method=None, req_id=None, **kwargs):
......@@ -76,12 +77,47 @@ class Error(Exception):
kwargs["method"] = method
if req_id and not "req_id" in kwargs:
kwargs["req_id"] = req_id
# Ugly, but be paranoid, don't rely on server reply to be well formed
try:
kwargs["error"] = int(kwargs["error"])
except Exception:
kwargs["error"] = 0
if "events" in kwargs:
evlist = kwargs["events"]
try:
evlist_new = []
for ev in evlist:
try:
evlist_new.append(int(ev))
except Exception:
pass
kwargs["events"] = evlist_new
except Exception:
kwargs["events"] = []
if "events_id" in kwargs:
try:
dummy = iter(kwargs["events_id"])
except TypeError:
kwargs["events_id"] = [None]*len(kwargs["events"])
if "send_events_limit" in kwargs:
try:
kwargs["send_events_limit"] = int(kwargs["send_events_limit"])
except Exception:
del kwargs["send_events_limit"]
self.errors.append(kwargs)
def extend(self, method=None, req_id=None, iterable=[]):
try:
dummy = iter(iterable)
except TypeError:
iterable = [] # Bad joke from server
for e in iterable:
self.append(method, req_id, **e)
try:
args = dict(e)
except TypeError:
args = {} # Not funny!
self.append(method, req_id, **args)
def __len__ (self):
......@@ -105,10 +141,16 @@ class Error(Exception):
def __str__(self):
return "\n".join(self.str_err(e) for e in self.errors)
out = []
for e in self.errors:
out.append(self.str_err(e))
out.append(self.str_info(e))
return "\n".join(out)
def log(self, logger, prio=logging.ERROR):
def log(self, logger=None, prio=logging.ERROR):
if not logger:
logger = logging.getLogger()
for e in self.errors:
logger.log(prio, self.str_err(e))
info = self.str_info(e)
......@@ -167,12 +209,15 @@ class Client(object):
keyfile=None,
cafile=None,
timeout=60,
retry=3,
pause=5,
recv_events_limit=6000,
errlog={"level": "debug"},
send_events_limit=500,
errlog={},
syslog=None,
filelog=None,
idstore=None,
name="warden_client",
name="org.example.warden.test",
secret=None):
self.name = name
......@@ -193,9 +238,15 @@ class Client(object):
self.recv_events_limit = int(recv_events_limit)
self.idstore = path.join(base, idstore) if idstore is not None else None
self.send_events_limit = int(send_events_limit)
self.retry = int(retry)
self.pause = int(pause)
self.ciphers = 'TLS_RSA_WITH_AES_256_CBC_SHA'
self.sslversion = ssl.PROTOCOL_TLSv1
self.getInfo() # Call to align limits with server opinion
def init_log(self, errlog, syslog, filelog):
......@@ -237,7 +288,7 @@ class Client(object):
fl.setFormatter(format_time)
self.logger.addHandler(fl)
except Exception as e:
Error(self.logger, message="Unable to setup file logging", exc=exc_info())
Error(message="Unable to setup file logging", exc=exc_info()).log(self.logger)
if syslog is not None:
try:
......@@ -248,7 +299,7 @@ class Client(object):
sl.setFormatter(format_notime)
self.logger.addHandler(sl)
except Exception as e:
Error(self.logger, message="Unable to setup syslog logging", exc=exc_info())
Error(message="Unable to setup syslog logging", exc=exc_info()).log(self.logger)
if not (errlog or filelog or syslog):
# User wants explicitly no logging, so let him shoot his socks off.
......@@ -257,6 +308,12 @@ class Client(object):
self.logger.addHandler(logging.NullHandler())
def log_err(self, err, prio=logging.ERROR):
    """ Log "err" through the client's logger when it is an Error instance.

        The argument is handed back unchanged in either case, so the call
        can be wrapped directly around a value that is being returned.
    """
    if not isinstance(err, Error):
        return err
    err.log(self.logger, prio)
    return err
def connect(self):
try:
......@@ -276,18 +333,17 @@ class Client(object):
strict = False,
timeout = self.timeout)
else:
return Error(self.logger, message="Don't know how to connect to \"%s\"" % self.url.scheme,
detail={"url": self.url.geturl()})
return Error(message="Don't know how to connect to \"%s\"" % self.url.scheme,
url=self.url.geturl())
except Exception:
return Error(self.logger, message="HTTPS connection failed", exc=exc_info(),
detail={
"url": self.url.geturl(),
"timeout": self.timeout,
"key_file": self.keyfile,
"cert_file": self.certfile,
"cafile": self.cafile,
"ciphers": self.ciphers,
"ssl_version": self.sslversion})
return Error(message="HTTP(S) connection failed", exc=exc_info(),
url=self.url.geturl(),
timeout=self.timeout,
key_file=self.keyfile,
cert_file=self.certfile,
cafile=self.cafile,
ciphers=self.ciphers,
ssl_version=self.sslversion)
return conn
......@@ -313,8 +369,8 @@ class Client(object):
else:
data = json.dumps(payload)
except:
return Error(self.logger, message="Serialization to JSON failed",
exc=exc_info(), method=func, detail=payload)
return Error(message="Serialization to JSON failed",
exc=exc_info(), method=func, payload=payload)
self.headers = {
"Content-Type": "application/json",
......@@ -332,29 +388,22 @@ class Client(object):
conn.request("POST", loc, data, self.headers)
except:
conn.close()
return Error(self.logger, message="Sending of request to server failed",
exc=exc_info(), method=func, detail={
"loc": loc,
"headers": self.headers,
"data": data})
return Error(message="Sending of request to server failed",
exc=exc_info(), method=func, log=loc, headers=self.headers, data=data)
try:
res = conn.getresponse()
except:
conn.close()
return Error(self.logger, method=func, message="HTTP reply failed", exc=exc_info(), detail={
"loc": loc,
"headers": self.headers,
"data": data})
return Error(method=func, message="HTTP reply failed",
exc=exc_info(), loc=loc, headers=self.headers, data=data)
try:
response_data = res.read()
except:
conn.close()
return Error(self.logger, method=func, message="Fetching HTTP data from server failed", exc=exc_info(), detail={
"loc": loc,
"headers": self.headers,
"data": data})
return Error(method=func, message="Fetching HTTP data from server failed",
exc=exc_info(), loc=loc, headers=self.headers, data=data)
conn.close()
......@@ -362,20 +411,17 @@ class Client(object):
try:
data = json.loads(response_data)
except:
data = Error(self.logger, message="JSON message parsing failed",
exc=exc_info(), method=func, detail={"response": response_data})
data = Error(method=func, message="JSON message parsing failed",
exc=exc_info(), response=response_data)
else:
try:
data = json.loads(response_data)
data["errors"] # trigger exception if not dict or no error key
except:
data = Error(self.logger, message="Generic server HTTP error",
method=func,
error=res.status,
exc=exc_info(),
detail={"response": response_data})
data = Error(method=func, message="Generic server HTTP error",
error=res.status, exc=exc_info(), response=response_data)
else:
data = Error(self.logger,
data = Error(
method=data.get("method", None),
req_id=data.get("req_id", None),
errors=data.get("errors", []))
......@@ -392,8 +438,8 @@ class Client(object):
f.write(str(id))
except (ValueError, IOError) as e:
# Use Error instance just for proper logging
Error(self.logger, message="Writing id file \"%s\" failed" % idf,
prio=logging.INFO, exc=exc_info(), detail={"idstore": idf})
Error(message="Writing id file \"%s\" failed" % idf, exc=exc_info(),
idstore=idf).log(self.logger, logging.INFO)
return id
......@@ -405,25 +451,96 @@ class Client(object):
with open(idf, "r") as f:
id = int(f.read())
except (ValueError, IOError) as e:
Error(self.logger, prio=logging.INFO,
message="Reading id file \"%s\" failed, relying on server" % idf,
exc=exc_info(), detail={"idstore": idf})
Error(message="Reading id file \"%s\" failed, relying on server" % idf,
exc=exc_info(), idstore=idf).log(self.logger, logging.INFO)
id = None
return id
def getDebug(self):
return self.sendRequest("getDebug")
return self.log_err(self.sendRequest("getDebug"))
def getInfo(self):
return self.sendRequest("getInfo")
res = self.sendRequest("getInfo")
if isinstance(res, Error):
res.log(self.logger)
else:
try:
self.send_events_limit = min(res["send_events_limit"], self.send_events_limit)
self.recv_events_limit = min(res["recv_events_limit"], self.recv_events_limit)
except (AttributeError, TypeError, KeyError):
pass
return res
def sendEvents(self, events=[]):
res = self.sendRequest(
"sendEvents", payload=events)
return res
def send_events_raw(self, events=[]):
    """ Hand "events" to the server's "sendEvents" method in one request.

        No chunking or retrying happens here; whatever sendRequest returns
        is passed straight back to the caller.
    """
    reply = self.sendRequest("sendEvents", payload=events)
    return reply
def send_events_chunked(self, events=[]):
    """ Split potentially long "events" list to send_events_limit
        long chunks to avoid slap from server.

        Every chunk is sent through send_events_raw. Errors from all
        chunks are collected into one Error, with their "events" indices
        shifted so that they refer to positions in the whole "events"
        list. An error that names no events is pinned to the indices of
        the chunk which produced it, so that after merging it cannot be
        mistaken for a failure of every event in the list.

        Returns the collected Error, or an empty dict when no chunk
        reported an error.
    """
    count = len(events)
    err = Error()
    # Snapshot the limit, the object stored value can change during sending.
    # Clamp to 1: a zero or negative step would make range() raise ValueError.
    send_events_limit = max(1, self.send_events_limit)
    for offset in range(0, count, send_events_limit):
        chunk = events[offset:offset + send_events_limit]
        res = self.send_events_raw(chunk)
        if not isinstance(res, Error):
            continue
        for e in res.errors:
            # Update sending limit advice, if present in error
            srv_limit = e.get("send_events_limit")
            if srv_limit:
                self.send_events_limit = min(self.send_events_limit, srv_limit)
            if "events" in e:
                # Shift indices by offset to correspond with 'events' list
                e["events"] = [i + offset for i in e["events"]]
            else:
                # No event list means "all events of this request", which
                # is only this chunk - spell it out before merging.
                e["events"] = list(range(offset, offset + len(chunk)))
        err.errors.extend(res.errors)
    return err if err.errors else {}
def sendEvents(self, events=[], retry=None, pause=None):
    """ Send out "events" list to server, retrying on server errors.

        events: list of events to send.
        retry:  number of attempts; self.retry is used when not given.
        pause:  seconds to sleep before each repeated attempt; self.pause
                is used when not given.

        Errors with code below 500 are taken as fatal and reported
        immediately; other errors cause the affected events to be sent
        again until attempts run out. Returns an Error (already logged
        through log_err) whose "events" indices refer to the original
        "events" list, or an empty dict when nothing failed.
    """
    ev = events
    # Maps positions in the currently sent list back to the original one
    idx_xlat = list(range(len(ev)))
    err = Error()
    retry = retry or self.retry
    attempt = retry
    while ev and attempt:
        if attempt < retry:
            self.logger.info("%d transient errors, retrying (%d to go)" % (len(ev), attempt))
            sleep(pause or self.pause)
        res = self.send_events_chunked(ev)
        attempt -= 1

        next_ev = []
        next_idx_xlat = []
        if isinstance(res, Error):
            # Sort to process fatal errors first
            res.errors.sort(key=itemgetter("error"))
            for e in res.errors:
                errno = e["error"]
                evlist = e.get("events")
                if evlist is None:
                    # none means all events sent in this attempt
                    evlist = range(len(ev))
                if errno < 500 or not attempt:
                    # Fatal error or last try, translate indices
                    # to original and prepare for returning to caller.
                    # A new list is built (an xrange default cannot be
                    # assigned into) and stored on the error, so the
                    # caller always gets explicit original indices.
                    e["events"] = [idx_xlat[i] for i in evlist]
                    err.errors.append(e)
                else:
                    # Maybe transient error, prepare to try again
                    for evlist_i in evlist:
                        next_ev.append(ev[evlist_i])
                        next_idx_xlat.append(idx_xlat[evlist_i])
        ev = next_ev
        idx_xlat = next_idx_xlat

    return self.log_err(err) if err.errors else {}
def getEvents(self, id=None, idstore=None, count=None,
......@@ -437,19 +554,19 @@ class Client(object):
res = self.sendRequest(
"getEvents", id=id, count=count or self.recv_events_limit, cat=cat,
nocat=nocat, tag=tag, notag=notag, group=group, nogroup=nogroup)
if not res:
return res # Should be Error instance
try:
events = res["events"]
newid = res["lastid"]
except KeyError:
return Error(self.logger, message="Server returned bogus reply",
method="getEvents", exc=exc_info(), detail={"response": res})
self._saveID(newid)
if res:
try:
events = res["events"]
newid = res["lastid"]
except KeyError:
events = Error(method="getEvents", message="Server returned bogus reply",
exc=exc_info(), response=res)
self._saveID(newid)
else:
events = res
return events
return self.log_err(events)
def close(self):
......
......@@ -26,7 +26,7 @@ from random import randint
# for local version of up to date jsonschema
sys.path.append(path.join(path.dirname(__file__), "..", "lib"))
from jsonschema import Draft4Validator, FormatChecker
from jsonschema import Draft4Validator
VERSION = "3.0-not-even-alpha"
......@@ -41,7 +41,7 @@ class Error(Exception):
self.errors.extend(errors)
def append(self, **kwargs):
def append(self, _events=None, **kwargs):
self.errors.append(kwargs)
......@@ -61,7 +61,6 @@ class Error(Exception):
next_msg = e.get("message", "Unknown error")
if msg != next_msg:
msg = "Multiple errors"
logging.debug("get_http_err_msg: %s, msg: %s, %s" % (err, msg, type(msg)))
return err, msg
......@@ -268,8 +267,8 @@ class Request(Object):
self.req_id = 0 if env is None else randint(0x00000000, 0xFFFFFFFF)
def error(self, errors=None, **kwargs):
return Error(self.path, self.req_id, errors=errors, **kwargs)
def error(self, **kwargs):
return Error(self.path, self.req_id, **kwargs)
......@@ -397,7 +396,7 @@ class JSONSchemaValidator(NoValidator):
self.path = filename or path.join(path.dirname(__file__), "idea.schema")
with open(self.path) as f:
self.schema = json.load(f)
self.validator = Draft4Validator(self.schema, format_checker=FormatChecker())
self.validator = Draft4Validator(self.schema)
def __str__(self):
......@@ -637,7 +636,7 @@ class MySQL(ObjectReq):
return []
except Exception as e:
self.con.rollback()
return [{"error": 500, "message": type(e).__name__ + ": " + str(e)}]
return [{"error": 500, "message": type(e).__name__}]
def insertLastReceivedId(self, client, id):
......@@ -779,7 +778,6 @@ class Server(ObjectReq):
if isinstance(output, unicode):
output = output.encode("utf-8")
headers.append(('Content-Length', str(len(output))))
logging.debug("wsgi status %s, headers %s" % (status, headers))
start_response(status, headers)
self.req.reset()
return [output]
......@@ -899,9 +897,14 @@ class WardenHandler(ObjectReq):
return []
def add_event_num(self, errlist, i):
def add_event_nums(self, ilist, events, errlist):
for err in errlist:
err.setdefault("events", []).append(i)
err.setdefault("events", []).extend(ilist)
ev_ids = err.setdefault("events_id", [])
for i in ilist:
event = events[i]
id = event.get("ID", None)
ev_ids.append(id)
return errlist
......@@ -912,31 +915,32 @@ class WardenHandler(ObjectReq):
errs = []
if len(events)>self.send_events_limit:
errs.append({"error": 413, "message": "Too much events in one batch.",
"events": range(self.send_events_limit, len(events)),
"send_events_limit": self.send_events_limit})
errs.extend(
self.add_event_nums(range(self.send_events_limit, len(events)), events,
[{"error": 507, "message": "Too much events in one batch.",
"send_events_limit": self.send_events_limit}]))
saved = 0
for i, event in enumerate(events[0:self.send_events_limit]):
v_errs = self.validator.check(event)
if v_errs:
errs.extend(self.add_event_num(v_errs, i))
errs.extend(self.add_event_nums([i], events, v_errs))
continue
node_errs = self.check_node(event, self.req.client.identity)
if node_errs:
errs.extend(self.add_event_num(node_errs, i))
errs.extend(self.add_event_nums([i], events, node_errs))
continue
if self.req.client.test and not 'Test' in event.get('Category', []):
errs.append({"error": 422, "events": [i],
errs.extend(self.add_event_nums([i], events, [{"error": 422,
"message": "You're allowed to send only messages, containing \"Test\" among categories.",
"categories": event.get('Category', [])})
"categories": event.get('Category', [])}]))
continue
db_errs = self.db.store_event(self.req.client, event)
if db_errs:
errs.extend(self.add_event_num(db_errs, i))
errs.extend(self.add_event_nums([i], events, db_errs))
continue
saved += 1
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment