diff --git a/warden3/contrib/warden_filer/warden_filer.py b/warden3/contrib/warden_filer/warden_filer.py index fc8cae6e358646ce86eee8852f99cd032e7dbe34..84b1c56eee7e7ee9cebeaf46970d99e2d3506360 100644 --- a/warden3/contrib/warden_filer/warden_filer.py +++ b/warden3/contrib/warden_filer/warden_filer.py @@ -161,18 +161,17 @@ def receiver(config, wclient, sdir, oneshot): nf.moveto(sdir.incoming) count_ok += 1 except Exception as e: - Error("Error saving event", wclient.logger, exc=sys.exc_info(), - detail={"file": str(nf), "event_id": event.get("ID"), "sdir": sdir.path}) + Error(message="Error saving event", exc=sys.exc_info(), file=str(nf), + event_ids=[event.get("ID")], sdir=sdir.path).log(wclient.logger) count_err += 1 wclient.logger.info( "warden_filer: received %d, errors %d" % (count_ok, count_err)) - if oneshot: - events = None - else: - events = wclient.getEvents(**filt) + events = wclient.getEvents(**filt) + count_ok = count_err = 0 if oneshot: - terminate_me(None, None) + if not events: + terminate_me(None, None) else: time.sleep(poll_time) @@ -192,64 +191,49 @@ def sender(config, wclient, sdir, oneshot): nflist = sdir.get_incoming() # count chunk iterations rounded up count = len(nflist) - for i in range(0, count, send_events_limit): - # process one at most send_events_limit long chunk - events = [] - nf_sent = [] - for j in range(i, min(i+send_events_limit, count)): - nf = nflist[j] - # prepare event array from files - try: - nf.moveto(sdir.temp) - except Exception: - pass # Silently go to next filename, somebody else might have interfered - try: - with nf.open("rb") as fd: - data = fd.read() - event = json.loads(data) - if node: - nodelist = event.setdefault("Node", []) - nodelist.insert(0, node) - events.append(event) - nf_sent.append(nf) - except Exception as e: - Error("Error loading event", wclient.logger, exc=sys.exc_info(), - detail={"file": str(nf), "sdir": sdir.path}) - nf.moveto(sdir.errors) - - res = wclient.sendEvents(events) - - count_ok = count_err = count_retry = 0 - if isinstance(res, Error): - for e in res.errors: - # list of events with this error (none means all events) - evlist = e.get("events", xrange(len(nf_sent))) - # error number - error = e.get("error", 0) - for idx in evlist: - try: - idx_n = int(idx) - # 4xx errors are permanent, 5xx are server ones, suitable for retry - dest_dir = sdir.errors if 400 <= error < 500 else sdir.incoming - except ValueError: - # Cannot grok event number, skip - continue - if nf_sent[idx_n]: - nf_sent[idx_n].moveto(dest_dir) - nf_sent[idx_n] = None - if dest_dir == sdir.errors: - count_err += 1 - else: - count_retry += 1 - - # Cleanup rest - succesfully sent events - for name in nf_sent: - if name: - name.remove() - count_ok += 1 - wclient.logger.info( - "warden_filer: saved %d, errors %d, retreated %d" - % (count_ok, count_err, count_retry)) + + events = [] + nf_sent = [] + for nf in nflist: + # prepare event array from files + try: + nf.moveto(sdir.temp) + except Exception: + continue # Silently go to next filename, somebody else might have interfered + try: + with nf.open("rb") as fd: + data = fd.read() + event = json.loads(data) + if node: + nodelist = event.setdefault("Node", []) + nodelist.insert(0, node) + events.append(event) + nf_sent.append(nf) + except Exception as e: + Error(message="Error loading event", exc=sys.exc_info(), file=str(nf), + sdir=sdir.path).log(wclient.logger) + nf.moveto(sdir.errors) + + res = wclient.sendEvents(events) + + count_ok = count_err = count_retry = 0 + if isinstance(res, Error): + for e in res.errors: + errno = e["error"] + evlist = e.get("events", xrange(len(nf_sent))) # None means all + for i in evlist: + if nf_sent[i]: + nf_sent[i].moveto(dest_dir) + nf_sent[i] = None + count_err += 1 + + # Cleanup rest - succesfully sent events + for name in nf_sent: + if name: + name.remove() + count_ok += 1 + wclient.logger.info( + "warden_filer: saved %d, errors %d" % (count_ok, count_err)) @@ -324,7 +308,6 @@ if __name__ == "__main__": wconfig, fconfig = get_configs() oneshot = args.oneshot - safe_dir = SafeDir(fconfig.get("dir", args.func)) wclient = Client(**wconfig) diff --git a/warden3/warden_client/warden_client.py b/warden3/warden_client/warden_client.py index a506d4c5c7f99e28c609d5d05e0c1ddaf9eee234..c17768213bcfdcd3eb5b185906c758a372776f3e 100644 --- a/warden3/warden_client/warden_client.py +++ b/warden3/warden_client/warden_client.py @@ -11,8 +11,12 @@ from sys import stderr, exc_info from pprint import pformat from traceback import format_tb from os import path +from time import sleep +from operator import itemgetter +VERSION = "3.0-not-even-alpha" + class HTTPSConnection(httplib.HTTPSConnection): ''' Overridden to allow peer certificate validation, configuration @@ -56,16 +60,13 @@ class Error(Exception): Also, it can be raised as an exception. """ - def __init__(self, logger=None, prio=logging.ERROR, method=None, req_id=None, - exc=None, errors=None, **kwargs): + def __init__(self, method=None, req_id=None, errors=None, **kwargs): self.errors = [] if errors: self.extend(method, req_id, errors) if kwargs: self.append(method, req_id, **kwargs) - if logger: - log(logger, prio) def append(self, method=None, req_id=None, **kwargs): @@ -76,12 +77,47 @@ class Error(Exception): kwargs["method"] = method if req_id and not "req_id" in kwargs: kwargs["req_id"] = req_id + # Ugly, but be paranoid, don't rely on server reply to be well formed + try: + kwargs["error"] = int(kwargs["error"]) + except Exception: + kwargs["error"] = 0 + if "events" in kwargs: + evlist = kwargs["events"] + try: + evlist_new = [] + for ev in evlist: + try: + evlist_new.append(int(ev)) + except Exception: + pass + kwargs["events"] = evlist_new + except Exception: + kwargs["events"] = [] + if "events_id" in kwargs: + try: + dummy = iter(kwargs["events_id"]) + except TypeError: + kwargs["events_id"] = [None]*len(kwargs["events"]) + if "send_events_limit" in kwargs: + try: + kwargs["send_events_limit"] = int(kwargs["send_events_limit"]) + except Exception: + del kwargs["send_events_limit"] self.errors.append(kwargs) def extend(self, method=None, req_id=None, iterable=[]): + try: + dummy = iter(iterable) + except TypeError: + iterable = [] # Bad joke from server for e in iterable: - self.append(method, req_id, **e) + try: + args = dict(e) + except TypeError: + args = {} # Not funny! + self.append(method, req_id, **args) def __len__ (self): @@ -105,10 +141,16 @@ class Error(Exception): def __str__(self): - return "\n".join(self.str_err(e) for e in self.errors) + out = [] + for e in self.errors: + out.append(self.str_err(e)) + out.append(self.str_info(e)) + return "\n".join(out) - def log(self, logger, prio=logging.ERROR): + def log(self, logger=None, prio=logging.ERROR): + if not logger: + logger = logging.getLogger() for e in self.errors: logger.log(prio, self.str_err(e)) info = self.str_info(e) @@ -167,12 +209,15 @@ class Client(object): keyfile=None, cafile=None, timeout=60, + retry=3, + pause=5, recv_events_limit=6000, - errlog={"level": "debug"}, + send_events_limit=500, + errlog={}, syslog=None, filelog=None, idstore=None, - name="warden_client", + name="org.example.warden.test", secret=None): self.name = name @@ -193,9 +238,15 @@ class Client(object): self.recv_events_limit = int(recv_events_limit) self.idstore = path.join(base, idstore) if idstore is not None else None + self.send_events_limit = int(send_events_limit) + self.retry = int(retry) + self.pause = int(pause) + self.ciphers = 'TLS_RSA_WITH_AES_256_CBC_SHA' self.sslversion = ssl.PROTOCOL_TLSv1 + self.getInfo() # Call to align limits with server opinion + def init_log(self, errlog, syslog, filelog): @@ -237,7 +288,7 @@ class Client(object): fl.setFormatter(format_time) self.logger.addHandler(fl) except Exception as e: - Error(self.logger, message="Unable to setup file logging", exc=exc_info()) + Error(message="Unable to setup file logging", exc=exc_info()).log(self.logger) if syslog is not None: try: @@ -248,7 +299,7 @@ class Client(object): sl.setFormatter(format_notime) self.logger.addHandler(sl) except Exception as e: - Error(self.logger, message="Unable to setup syslog logging", exc=exc_info()) + Error(message="Unable to setup syslog logging", exc=exc_info()).log(self.logger) if not (errlog or filelog or syslog): # User wants explicitly no logging, so let him shoot his socks off. @@ -257,6 +308,12 @@ class Client(object): self.logger.addHandler(logging.NullHandler()) + def log_err(self, err, prio=logging.ERROR): + if isinstance(err, Error): + err.log(self.logger, prio) + return err + + def connect(self): try: @@ -276,18 +333,17 @@ class Client(object): strict = False, timeout = self.timeout) else: - return Error(self.logger, message="Don't know how to connect to \"%s\"" % self.url.scheme, - detail={"url": self.url.geturl()}) + return Error(message="Don't know how to connect to \"%s\"" % self.url.scheme, + url=self.url.geturl()) except Exception: - return Error(self.logger, message="HTTPS connection failed", exc=exc_info(), - detail={ - "url": self.url.geturl(), - "timeout": self.timeout, - "key_file": self.keyfile, - "cert_file": self.certfile, - "cafile": self.cafile, - "ciphers": self.ciphers, - "ssl_version": self.sslversion}) + return Error(message="HTTP(S) connection failed", exc=exc_info(), + url=self.url.geturl(), + timeout=self.timeout, + key_file=self.keyfile, + cert_file=self.certfile, + cafile=self.cafile, + ciphers=self.ciphers, + ssl_version=self.sslversion) return conn @@ -313,8 +369,8 @@ class Client(object): else: data = json.dumps(payload) except: - return Error(self.logger, message="Serialization to JSON failed", - exc=exc_info(), method=func, detail=payload) + return Error(message="Serialization to JSON failed", + exc=exc_info(), method=func, payload=payload) self.headers = { "Content-Type": "application/json", @@ -332,29 +388,22 @@ class Client(object): conn.request("POST", loc, data, self.headers) except: conn.close() - return Error(self.logger, message="Sending of request to server failed", - exc=exc_info(), method=func, detail={ - "loc": loc, - "headers": self.headers, - "data": data}) + return Error(message="Sending of request to server failed", + exc=exc_info(), method=func, log=loc, headers=self.headers, data=data) try: res = conn.getresponse() except: conn.close() - return Error(self.logger, method=func, message="HTTP reply failed", exc=exc_info(), detail={ - "loc": loc, - "headers": self.headers, - "data": data}) + return Error(method=func, message="HTTP reply failed", + exc=exc_info(), loc=loc, headers=self.headers, data=data) try: response_data = res.read() except: conn.close() - return Error(self.logger, method=func, message="Fetching HTTP data from server failed", exc=exc_info(), detail={ - "loc": loc, - "headers": self.headers, - "data": data}) + return Error(method=func, message="Fetching HTTP data from server failed", + exc=exc_info(), loc=loc, headers=self.headers, data=data) conn.close() @@ -362,20 +411,17 @@ class Client(object): try: data = json.loads(response_data) except: - data = Error(self.logger, message="JSON message parsing failed", - exc=exc_info(), method=func, detail={"response": response_data}) + data = Error(method=func, message="JSON message parsing failed", + exc=exc_info(), response=response_data) else: try: data = json.loads(response_data) data["errors"] # trigger exception if not dict or no error key except: - data = Error(self.logger, message="Generic server HTTP error", - method=func, - error=res.status, - exc=exc_info(), - detail={"response": response_data}) + data = Error(method=func, message="Generic server HTTP error", + error=res.status, exc=exc_info(), response=response_data) else: - data = Error(self.logger, + data = Error( method=data.get("method", None), req_id=data.get("req_id", None), errors=data.get("errors", [])) @@ -392,8 +438,8 @@ class Client(object): f.write(str(id)) except (ValueError, IOError) as e: # Use Error instance just for proper logging - Error(self.logger, message="Writing id file \"%s\" failed" % idf, - prio=logging.INFO, exc=exc_info(), detail={"idstore": idf}) + Error(message="Writing id file \"%s\" failed" % idf, exc=exc_info(), + idstore=idf).log(self.logger, logging.INFO) return id @@ -405,25 +451,96 @@ class Client(object): with open(idf, "r") as f: id = int(f.read()) except (ValueError, IOError) as e: - Error(self.logger, prio=logging.INFO, - message="Reading id file \"%s\" failed, relying on server" % idf, - exc=exc_info(), detail={"idstore": idf}) + Error(message="Reading id file \"%s\" failed, relying on server" % idf, + exc=exc_info(), idstore=idf).log(self.logger, logging.INFO) id = None return id def getDebug(self): - return self.sendRequest("getDebug") + return self.log_err(self.sendRequest("getDebug")) def getInfo(self): - return self.sendRequest("getInfo") + res = self.sendRequest("getInfo") + if isinstance(res, Error): + res.log(self.logger) + else: + try: + self.send_events_limit = min(res["send_events_limit"], self.send_events_limit) + self.recv_events_limit = min(res["recv_events_limit"], self.recv_events_limit) + except (AttributeError, TypeError, KeyError): + pass + return res - def sendEvents(self, events=[]): - res = self.sendRequest( - "sendEvents", payload=events) - return res + def send_events_raw(self, events=[]): + return self.sendRequest("sendEvents", payload=events) + + + def send_events_chunked(self, events=[]): + """ Split potentially long "events" list to send_events_limit + long chunks to avoid slap from server. + """ + count = len(events) + err = Error() + send_events_limit = self.send_events_limit # object stored value can change during sending + for offset in range(0, count, send_events_limit): + res = self.send_events_raw(events[offset:min(offset+send_events_limit, count)]) + + if isinstance(res, Error): + # Shift all error indices by offset to correspond with 'events' list + for e in res.errors: + evlist = e.get("events", []) + # Update sending limit advice, if present in error + srv_limit = e.get("send_events_limit") + if srv_limit: + self.send_events_limit = min(self.send_events_limit, srv_limit) + for i in range(len(evlist)): + evlist[i] += offset + err.errors.extend(res.errors) + + return err if err.errors else {} + + + def sendEvents(self, events=[], retry=None, pause=None): + """ Send out "events" list to server, retrying on server errors. + """ + ev = events + idx_xlat = range(len(ev)) + err = Error() + retry = retry or self.retry + attempt = retry + while ev and attempt: + if attempt<retry: + self.logger.info("%d transient errors, retrying (%d to go)" % (len(ev), attempt)) + sleep(pause or self.pause) + res = self.send_events_chunked(ev) + attempt -= 1 + + next_ev = [] + next_idx_xlat = [] + if isinstance(res, Error): + # Sort to process fatal errors first + res.errors.sort(key=itemgetter("error")) + for e in res.errors: + errno = e["error"] + evlist = e.get("events", xrange(len(ev))) # none means all + if errno < 500 or not attempt: + # Fatal error or last try, translate indices + # to original and prepare for returning to caller + for i in range(len(evlist)): + evlist[i] = idx_xlat[evlist[i]] + err.errors.append(e) + else: + # Maybe transient error, prepare to try again + for evlist_i in evlist: + next_ev.append(ev[evlist_i]) + next_idx_xlat.append(idx_xlat[evlist_i]) + ev = next_ev + idx_xlat = next_idx_xlat + + return self.log_err(err) if err.errors else {} def getEvents(self, id=None, idstore=None, count=None, @@ -437,19 +554,19 @@ class Client(object): res = self.sendRequest( "getEvents", id=id, count=count or self.recv_events_limit, cat=cat, nocat=nocat, tag=tag, notag=notag, group=group, nogroup=nogroup) - if not res: - return res # Should be Error instance - - try: - events = res["events"] - newid = res["lastid"] - except KeyError: - return Error(self.logger, message="Server returned bogus reply", - method="getEvents", exc=exc_info(), detail={"response": res}) - self._saveID(newid) + if res: + try: + events = res["events"] + newid = res["lastid"] + except KeyError: + events = Error(method="getEvents", message="Server returned bogus reply", + exc=exc_info(), response=res) + self._saveID(newid) + else: + events = res - return events + return self.log_err(events) def close(self): diff --git a/warden3/warden_server/warden_server.py b/warden3/warden_server/warden_server.py index 3752e576aaf79c83ba8c95b044150f3e3161957f..141be756b566343926093ec3d7e35e1e97e769c4 100755 --- a/warden3/warden_server/warden_server.py +++ b/warden3/warden_server/warden_server.py @@ -26,7 +26,7 @@ from random import randint # for local version of up to date jsonschema sys.path.append(path.join(path.dirname(__file__), "..", "lib")) -from jsonschema import Draft4Validator, FormatChecker +from jsonschema import Draft4Validator VERSION = "3.0-not-even-alpha" @@ -41,7 +41,7 @@ class Error(Exception): self.errors.extend(errors) - def append(self, **kwargs): + def append(self, _events=None, **kwargs): self.errors.append(kwargs) @@ -61,7 +61,6 @@ class Error(Exception): next_msg = e.get("message", "Unknown error") if msg != next_msg: msg = "Multiple errors" - logging.debug("get_http_err_msg: %s, msg: %s, %s" % (err, msg, type(msg))) return err, msg @@ -268,8 +267,8 @@ class Request(Object): self.req_id = 0 if env is None else randint(0x00000000, 0xFFFFFFFF) - def error(self, errors=None, **kwargs): - return Error(self.path, self.req_id, errors=errors, **kwargs) + def error(self, **kwargs): + return Error(self.path, self.req_id, **kwargs) @@ -397,7 +396,7 @@ class JSONSchemaValidator(NoValidator): self.path = filename or path.join(path.dirname(__file__), "idea.schema") with open(self.path) as f: self.schema = json.load(f) - self.validator = Draft4Validator(self.schema, format_checker=FormatChecker()) + self.validator = Draft4Validator(self.schema) def __str__(self): @@ -637,7 +636,7 @@ class MySQL(ObjectReq): return [] except Exception as e: self.con.rollback() - return [{"error": 500, "message": type(e).__name__ + ": " + str(e)}] + return [{"error": 500, "message": type(e).__name__}] def insertLastReceivedId(self, client, id): @@ -779,7 +778,6 @@ class Server(ObjectReq): if isinstance(output, unicode): output = output.encode("utf-8") headers.append(('Content-Length', str(len(output)))) - logging.debug("wsgi status %s, headers %s" % (status, headers)) start_response(status, headers) self.req.reset() return [output] @@ -899,9 +897,14 @@ class WardenHandler(ObjectReq): return [] - def add_event_num(self, errlist, i): + def add_event_nums(self, ilist, events, errlist): for err in errlist: - err.setdefault("events", []).append(i) + err.setdefault("events", []).extend(ilist) + ev_ids = err.setdefault("events_id", []) + for i in ilist: + event = events[i] + id = event.get("ID", None) + ev_ids.append(id) return errlist @@ -912,31 +915,32 @@ class WardenHandler(ObjectReq): errs = [] if len(events)>self.send_events_limit: - errs.append({"error": 413, "message": "Too much events in one batch.", - "events": range(self.send_events_limit, len(events)), - "send_events_limit": self.send_events_limit}) + errs.extend( + self.add_event_nums(range(self.send_events_limit, len(events)), events, + [{"error": 507, "message": "Too much events in one batch.", + "send_events_limit": self.send_events_limit}])) saved = 0 for i, event in enumerate(events[0:self.send_events_limit]): v_errs = self.validator.check(event) if v_errs: - errs.extend(self.add_event_num(v_errs, i)) + errs.extend(self.add_event_nums([i], events, v_errs)) continue node_errs = self.check_node(event, self.req.client.identity) if node_errs: - errs.extend(self.add_event_num(node_errs, i)) + errs.extend(self.add_event_nums([i], events, node_errs)) continue if self.req.client.test and not 'Test' in event.get('Category', []): - errs.append({"error": 422, "events": [i], + errs.extend(self.add_event_nums([i], events, [{"error": 422, "message": "You're allowed to send only messages, containing \"Test\" among categories.", - "categories": event.get('Category', [])}) + "categories": event.get('Category', [])}])) continue db_errs = self.db.store_event(self.req.client, event) if db_errs: - errs.extend(self.add_event_num(db_errs, i)) + errs.extend(self.add_event_nums([i], events, db_errs)) continue saved += 1