From 6a603da9e959eb5bf290a9fd2bca9c175421fe27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pavel=20K=C3=A1cha?= <ph@cesnet.cz> Date: Sat, 17 Jan 2015 16:36:30 +0100 Subject: [PATCH] Added Request class for request data shared among instances. Will simplify error handling and logging --- warden3/warden_server/warden_server.py | 172 ++++++++++++++++++------- 1 file changed, 123 insertions(+), 49 deletions(-) diff --git a/warden3/warden_server/warden_server.py b/warden3/warden_server/warden_server.py index ea505dd..e876648 100755 --- a/warden3/warden_server/warden_server.py +++ b/warden3/warden_server/warden_server.py @@ -116,25 +116,25 @@ def StreamLogger(stream=sys.stderr, level=logging.INFO): -def FileLogger(filename, level=logging.INFO): +def FileLogger(req, filename, level=logging.INFO): fhand = logging.FileHandler(filename) fform = logging.Formatter('%(asctime)s %(filename)s[%(process)d]: (%(levelname)s) %(message)s') fhand.setFormatter(fform) logger = get_clean_root_logger(level) logger.addHandler(fhand) - logging.info("Initialized FileLogger(filename=\"%s\", \"%s\")" % (filename, level)) + logging.info("Initialized FileLogger(req=%s, filename=\"%s\", \"%s\")" % (type(req).__name__, filename, level)) -def SysLogger(socket="/dev/log", facility=logging.handlers.SysLogHandler.LOG_DAEMON, level=logging.INFO): +def SysLogger(req, socket="/dev/log", facility=logging.handlers.SysLogHandler.LOG_DAEMON, level=logging.INFO): fhand = logging.handlers.SysLogHandler(address=socket, facility=facility) fform = logging.Formatter('%(filename)s[%(process)d]: (%(levelname)s) %(message)s') fhand.setFormatter(fform) logger = get_clean_root_logger(level) logger.addHandler(fhand) - logging.info("Initialized SysLogger(socket=\"%s\", facility=\"%s\", level=\"%s\")" % (socket, facility, level)) + logging.info("Initialized SysLogger(req=%s, socket=\"%s\", facility=\"%s\", level=\"%s\")" % (type(req).__name__, socket, facility, level)) @@ -145,29 +145,74 @@ class Object(object): -class NoAuthenticator(Object): +class Request(Object): + """ Simple container for info about ongoing request. + One instance gets created before server startup, and all other + configured objects get it as parameter during instantiation. + + Server then takes care of populating this instance on the start + of wsgi request (and resetting at the end). All other objects + then can find this actual request info in their own self.req. + + However, only Server.wsgi_app, handler (WardenHandler) exposed + methods and logging related objects should use self.req directly. + All other objects should use self.req only as source of data for + error/exception handling/logging, and should take/return + necessary data as arguments/return values for clarity on + which data their main codepaths work with. + """ def __init__(self): Object.__init__(self) + self.reset() + + + def __str__(self): + return "%s()" % (type(self).__name__, str(self.env), str(self.client)) + + + def reset(self, env=None, client=None, path=None): + self.env = {} if env is None else env + self.client = {} if client is None else client + self.path = "" if path is None else path + + + +class ObjectReq(Object): + + def __init__(self, req): + Object.__init__(self) + self.req = req + + + def __str__(self): + return "%s(req=%s)" % (type(self).__name__, type(self.req).__name__) + + +class NoAuthenticator(ObjectReq): + + def __init__(self, req): + ObjectReq.__init__(self, req) + def authenticate (self, env): return "anybody" # or None - def authorize(self, env, client, method, event, args): + def authorize(self, env, client, path, event): return (client is not None) class X509Authenticator(NoAuthenticator): - def __init__(self, db): + def __init__(self, req, db): + NoAuthenticator.__init__(self, req) self.db = db - NoAuthenticator.__init__(self) def __str__(self): - return "%s(db=%s)" % (type(self).__name__, type(self.db).__name__) + return "%s(req=%s, db=%s)" % (type(self).__name__, type(self.req).__name__, type(self.db).__name__) def get_cert_dns_names(self, pem): @@ -191,15 +236,15 @@ class X509Authenticator(NoAuthenticator): return self.db.get_client_by_name(names) - def authorize(self, env, client, method, event, args): + def authorize(self, env, client, path, event): # Authorize for debug - if (method == 'getDebug'): + if (path == 'getDebug'): if not client["debug"]: logging.info("Auth failed: client does not have debug enabled") return None return client - if method in ['getInfo', 'getEvents']: + if path in ['getInfo', 'getEvents']: return client try: @@ -219,7 +264,7 @@ class X509Authenticator(NoAuthenticator): client["service"] = service # Authorize for sending events - if (method == "sendEvents"): + if (path == "sendEvents"): if not (service["write"] or service["test"]): logging.info("Auth failed: service %i (%s) is not allowed to write or test" % (service["service_id"], identity)) return None @@ -232,7 +277,15 @@ class X509Authenticator(NoAuthenticator): return client -class NoValidator(Object): +class NoValidator(ObjectReq): + + def __init__(self, req): + ObjectReq.__init__(self, req) + + + def __str__(self): + return "%s(req=%s)" % (type(self).__name__, type(self.req).__name__) + def check(self, event): return [] @@ -240,7 +293,8 @@ class NoValidator(Object): class JSONSchemaValidator(NoValidator): - def __init__(self, filename=None): + def __init__(self, req, filename=None): + NoValidator.__init__(self, req) self.path = filename or path.join(path.dirname(__file__), "idea.schema") with open(self.path) as f: self.schema = json.load(f) @@ -248,7 +302,7 @@ class JSONSchemaValidator(NoValidator): def __str__(self): - return "%s(filename=\"%s\")" % (type(self).__name__, self.path) + return "%s(req=%s, filename=\"%s\")" % (type(self).__name__, type(self.req).__name__, self.path) def check(self, event): @@ -270,9 +324,10 @@ class JSONSchemaValidator(NoValidator): -class MySQL(Object): +class MySQL(ObjectReq): - def __init__(self, host, user, password, dbname, port, catmap_filename, tagmap_filename): + def __init__(self, req, host, user, password, dbname, port, catmap_filename, tagmap_filename): + ObjectReq.__init__(self, req) self.host = host self.user = user self.password = password @@ -295,8 +350,8 @@ class MySQL(Object): def __str__(self): - return "%s(host='%s', user='%s', dbname='%s', port=%d, catmap_filename=\"%s\", tagmap_filename=\"%s\")" % ( - type(self).__name__, self.host, self.user, self.dbname, self.port, self.catmap_filename, self.tagmap_filename) + return "%s(req=%s, host='%s', user='%s', dbname='%s', port=%d, catmap_filename=\"%s\", tagmap_filename=\"%s\")" % ( + type(self).__name__, type(self.req).__name__, self.host, self.user, self.dbname, self.port, self.catmap_filename, self.tagmap_filename) def get_client_by_name(self, name): @@ -480,18 +535,19 @@ def expose(meth): return meth -class Server(Object): +class Server(ObjectReq): - def __init__(self, auth, handler): + def __init__(self, req, auth, handler): + ObjectReq.__init__(self, req) self.auth = auth self.handler = handler def __str__(self): - return "%s(auth=%s, handler=%s)" % (type(self).__name__, type(self.auth).__name__, type(self.handler).__name__) + return "%s(req=%s, auth=%s, handler=%s)" % (type(self).__name__, type(self.req).__name__, type(self.auth).__name__, type(self.handler).__name__) - def sanitize_args(self, path, func, args, exclude=["self", "_env", "_client"]): + def sanitize_args(self, path, func, args, exclude=["self"]): # silently remove internal args, these should never be used # but if somebody does, we do not expose them by error message intargs = set(args).intersection(exclude) @@ -513,6 +569,7 @@ class Server(Object): def wsgi_app(self, environ, start_response, exc_info=None): path = environ.get("PATH_INFO", "").lstrip("/") + self.req.reset(env=environ, path=path) output = "" status = "200 OK" headers = [('Content-type', 'application/json')] @@ -530,7 +587,7 @@ class Server(Object): except Exception: raise Error("You've fallen of the cliff.", 404, method=path) - client = self.auth.authenticate(environ) + self.req.client = client = self.auth.authenticate(environ) if not client: raise Error("I'm watching. Authenticate.", 403, method=path) @@ -546,7 +603,7 @@ class Server(Object): args["events"] = events args = self.sanitize_args(path, method, args) - result = method(_env=environ, _client=client, **args) # call requested method + result = method(**args) # call requested method try: # 'default': takes care of non JSON serializable objects, @@ -581,6 +638,7 @@ class Server(Object): headers.append(('Content-Length', str(len(output)))) start_response(status, headers) + self.req.reset() return [output] @@ -588,12 +646,13 @@ class Server(Object): -class WardenHandler(Object): +class WardenHandler(ObjectReq): - def __init__(self, validator, db, auth, + def __init__(self, req, validator, db, auth, send_events_limit=100000, get_events_limit=100000, description=None): + ObjectReq.__init__(self, req) self.auth = auth self.db = db self.validator = validator @@ -603,19 +662,20 @@ class WardenHandler(Object): def __str__(self): - return "%s(validator=%s, db=%s, send_events_limit=%s, get_events_limit=%s, description=\"%s\")" % ( - type(self).__name__, type(self.validator).__name__, type(self.db).__name__, + return "%s(req=%s, validator=%s, db=%s, send_events_limit=%s, get_events_limit=%s, description=\"%s\")" % ( + type(self).__name__, type(self.req).__name__, type(self.validator).__name__, type(self.db).__name__, self.get_events_limit, self.send_events_limit, self.description) @expose - def getDebug(self, _env, _client): - auth = self.auth.authorize(_env, _client, 'getDebug', None, None) + def getDebug(self): + auth = self.auth.authorize(self.req.env, self.req.client, self.req.path, None) if not auth: - raise Error("I'm watching. Authorize.", 403, method='getDebug', detail={"client": _client}) + raise Error("I'm watching. Authorize.", 403, method='getDebug', detail={"client": self.req.client}) return { - "environment": _env, + "environment": self.req.env, + "client": self.req.client, "database": self.db.get_debug(), "system": { "uname": os.uname() @@ -635,10 +695,10 @@ class WardenHandler(Object): @expose - def getInfo(self, _env, _client): - auth = self.auth.authorize(_env, _client, 'getInfo', None, None) + def getInfo(self): + auth = self.auth.authorize(self.req.env, self.req.client, self.req.path, None) if not auth: - raise Error("I'm watching. Authorize.", 403, method='getDebug', detail={"client": _client}) + raise Error("I'm watching. Authorize.", 403, method='getDebug', detail={"client": self.req.client}) info = { "version": VERSION, @@ -651,14 +711,14 @@ class WardenHandler(Object): @expose - def getEvents(self, _env, _client, id=None, count=None, + def getEvents(self, id=None, count=None, cat=None, nocat=None, tag=None, notag=None, group=None, nogroup=None): - auth = self.auth.authorize(_env, _client, 'getEvents', None, None) + auth = self.auth.authorize(self.req.env, self.req.client, self.req.path, None) if not auth: - raise Error("I'm watching. Authorize.", 403, method='getDebug', detail={"client": _client}) + raise Error("I'm watching. Authorize.", 403, method='getDebug', detail={"client": self.req.client}) try: id = int(id[0]) @@ -667,14 +727,14 @@ class WardenHandler(Object): if id is None: try: - id = self.db.getLastReceivedId(_client) + id = self.db.getLastReceivedId(self.req.client) except Exception, e: logging.info("getEvents: cannot getLastReceivedId - " + type(e).__name__ + ": " + e) if id is None: # First access, remember the guy and get him last event id = self.db.getLastEventId() - self.db.insertLastReceivedId(_client, id) + self.db.insertLastReceivedId(self.req.client, id) return { "lastid": id, "events": [] @@ -688,17 +748,17 @@ class WardenHandler(Object): if self.get_events_limit: count = min(count, self.get_events_limit) - res = self.db.fetch_events(_client, id, count, cat, nocat, tag, notag, group, nogroup) + res = self.db.fetch_events(self.req.client, id, count, cat, nocat, tag, notag, group, nogroup) logging.info("getEvents(%d, %d, %s, %s, %s, %s, %s, %s): sending %d events" % ( id, count, cat, nocat, tag, notag, group, nogroup, len(res["events"]))) - self.db.insertLastReceivedId(_client, res['lastid']) + self.db.insertLastReceivedId(self.req.client, res['lastid']) return res @expose - def sendEvents(self, _env, _client, events=[]): + def sendEvents(self, events=[]): if not isinstance(events, list): raise Error("List of events expected.", 400, method="sendEvents") @@ -711,9 +771,9 @@ class WardenHandler(Object): for i, event in enumerate(events): ev_errs = [] - auth_cl = self.auth.authorize(_env, _client, 'sendEvents', event, None) + auth_cl = self.auth.authorize(self.req.env, self.req.client, self.req.path, event) if not auth_cl: - errs[i] = ["Client %i(%s) does not correspond with event Node info or is not allowed to write" % (_client["service"]["service_id"], _client["service"]["identity"])] + errs[i] = ["Client %i(%s) does not correspond with event Node info or is not allowed to write" % (self.req.client["service"]["service_id"], self.req.client["service"]["identity"])] continue v_errs = self.validator.check(event) @@ -824,23 +884,32 @@ def build_server(conf): # Object parameter conversions and defaults param_def = { "FileLogger": { + "req": {"type": obj, "default": "req"}, "filename": {"type": filepath, "default": path.join(path.dirname(__file__), path.splitext(path.split(__file__)[1])[0] + ".log")}, "level": {"type": loglevel, "default": "info"}, }, "SysLogger": { + "req": {"type": obj, "default": "req"}, "socket": {"type": filepath, "default": "/dev/log"}, "facility": {"type": facility, "default": "daemon"}, "level": {"type": loglevel, "default": "info"} }, - "NoAuthenticator": {}, + "NoAuthenticator": { + "req": {"type": obj, "default": "req"} + }, "X509Authenticator": { + "req": {"type": obj, "default": "req"}, "db": {"type": obj, "default": "db"} }, - "NoValidator": {}, + "NoValidator": { + "req": {"type": obj, "default": "req"}, + }, "JSONSchemaValidator": { + "req": {"type": obj, "default": "req"}, "filename": {"type": filepath, "default": path.join(path.dirname(__file__), "idea.schema")} }, "MySQL": { + "req": {"type": obj, "default": "req"}, "host": {"type": str, "default": "localhost"}, "user": {"type": str, "default": "warden"}, "password": {"type": str, "default": ""}, @@ -850,6 +919,7 @@ def build_server(conf): "tagmap_filename": {"type": filepath, "default": path.join(path.dirname(__file__), "tagmap_mysql.json")} }, "WardenHandler": { + "req": {"type": obj, "default": "req"}, "validator": {"type": obj, "default": "validator"}, "db": {"type": obj, "default": "DB"}, "auth": {"type": obj, "default": "auth"}, @@ -858,6 +928,7 @@ def build_server(conf): "description": {"type": str, "default": ""} }, "Server": { + "req": {"type": obj, "default": "req"}, "auth": {"type": obj, "default": "auth"}, "handler": {"type": obj, "default": "handler"} } @@ -916,6 +987,9 @@ def build_server(conf): # hope it at least ends up in webserver error log StreamLogger() + # Shared container for common data of ongoing WSGI request + objects["req"] = Request() + try: # Now try to init required objects for o in ("log", "db", "auth", "validator", "handler", "server"): -- GitLab