Skip to content
Snippets Groups Projects
Commit 6a603da9 authored by Pavel Kácha's avatar Pavel Kácha
Browse files

Added Request class for request data shared among instances. Will simplify...

Added Request class for request data shared among instances. Will simplify error handling and logging
parent d27ef6b7
No related branches found
No related tags found
No related merge requests found
......@@ -116,25 +116,25 @@ def StreamLogger(stream=sys.stderr, level=logging.INFO):
def FileLogger(filename, level=logging.INFO):
def FileLogger(req, filename, level=logging.INFO):
fhand = logging.FileHandler(filename)
fform = logging.Formatter('%(asctime)s %(filename)s[%(process)d]: (%(levelname)s) %(message)s')
fhand.setFormatter(fform)
logger = get_clean_root_logger(level)
logger.addHandler(fhand)
logging.info("Initialized FileLogger(filename=\"%s\", \"%s\")" % (filename, level))
logging.info("Initialized FileLogger(req=%s, filename=\"%s\", \"%s\")" % (type(req).__name__, filename, level))
def SysLogger(socket="/dev/log", facility=logging.handlers.SysLogHandler.LOG_DAEMON, level=logging.INFO):
def SysLogger(req, socket="/dev/log", facility=logging.handlers.SysLogHandler.LOG_DAEMON, level=logging.INFO):
fhand = logging.handlers.SysLogHandler(address=socket, facility=facility)
fform = logging.Formatter('%(filename)s[%(process)d]: (%(levelname)s) %(message)s')
fhand.setFormatter(fform)
logger = get_clean_root_logger(level)
logger.addHandler(fhand)
logging.info("Initialized SysLogger(socket=\"%s\", facility=\"%s\", level=\"%s\")" % (socket, facility, level))
logging.info("Initialized SysLogger(req=%s, socket=\"%s\", facility=\"%s\", level=\"%s\")" % (type(req).__name__, socket, facility, level))
......@@ -145,29 +145,74 @@ class Object(object):
class NoAuthenticator(Object):
class Request(Object):
""" Simple container for info about ongoing request.
One instance gets created before server startup, and all other
configured objects get it as parameter during instantiation.
Server then takes care of populating this instance on the start
of wsgi request (and resetting at the end). All other objects
then can find this actual request info in their own self.req.
However, only Server.wsgi_app, handler (WardenHandler) exposed
methods and logging related objects should use self.req directly.
All other objects should use self.req only as source of data for
error/exception handling/logging, and should take/return
necessary data as arguments/return values for clarity on
which data their main codepaths work with.
"""
def __init__(self):
Object.__init__(self)
self.reset()
def __str__(self):
return "%s()" % (type(self).__name__, str(self.env), str(self.client))
def reset(self, env=None, client=None, path=None):
self.env = {} if env is None else env
self.client = {} if client is None else client
self.path = "" if path is None else path
class ObjectReq(Object):
def __init__(self, req):
Object.__init__(self)
self.req = req
def __str__(self):
return "%s(req=%s)" % (type(self).__name__, type(self.req).__name__)
class NoAuthenticator(ObjectReq):
def __init__(self, req):
ObjectReq.__init__(self, req)
def authenticate (self, env):
return "anybody" # or None
def authorize(self, env, client, method, event, args):
def authorize(self, env, client, path, event):
return (client is not None)
class X509Authenticator(NoAuthenticator):
def __init__(self, db):
def __init__(self, req, db):
NoAuthenticator.__init__(self, req)
self.db = db
NoAuthenticator.__init__(self)
def __str__(self):
return "%s(db=%s)" % (type(self).__name__, type(self.db).__name__)
return "%s(req=%s, db=%s)" % (type(self).__name__, type(self.req).__name__, type(self.db).__name__)
def get_cert_dns_names(self, pem):
......@@ -191,15 +236,15 @@ class X509Authenticator(NoAuthenticator):
return self.db.get_client_by_name(names)
def authorize(self, env, client, method, event, args):
def authorize(self, env, client, path, event):
# Authorize for debug
if (method == 'getDebug'):
if (path == 'getDebug'):
if not client["debug"]:
logging.info("Auth failed: client does not have debug enabled")
return None
return client
if method in ['getInfo', 'getEvents']:
if path in ['getInfo', 'getEvents']:
return client
try:
......@@ -219,7 +264,7 @@ class X509Authenticator(NoAuthenticator):
client["service"] = service
# Authorize for sending events
if (method == "sendEvents"):
if (path == "sendEvents"):
if not (service["write"] or service["test"]):
logging.info("Auth failed: service %i (%s) is not allowed to write or test" % (service["service_id"], identity))
return None
......@@ -232,7 +277,15 @@ class X509Authenticator(NoAuthenticator):
return client
class NoValidator(Object):
class NoValidator(ObjectReq):
def __init__(self, req):
ObjectReq.__init__(self, req)
def __str__(self):
return "%s(req=%s)" % (type(self).__name__, type(self.req).__name__)
def check(self, event):
return []
......@@ -240,7 +293,8 @@ class NoValidator(Object):
class JSONSchemaValidator(NoValidator):
def __init__(self, filename=None):
def __init__(self, req, filename=None):
NoValidator.__init__(self, req)
self.path = filename or path.join(path.dirname(__file__), "idea.schema")
with open(self.path) as f:
self.schema = json.load(f)
......@@ -248,7 +302,7 @@ class JSONSchemaValidator(NoValidator):
def __str__(self):
return "%s(filename=\"%s\")" % (type(self).__name__, self.path)
return "%s(req=%s, filename=\"%s\")" % (type(self).__name__, type(self.req).__name__, self.path)
def check(self, event):
......@@ -270,9 +324,10 @@ class JSONSchemaValidator(NoValidator):
class MySQL(Object):
class MySQL(ObjectReq):
def __init__(self, host, user, password, dbname, port, catmap_filename, tagmap_filename):
def __init__(self, req, host, user, password, dbname, port, catmap_filename, tagmap_filename):
ObjectReq.__init__(self, req)
self.host = host
self.user = user
self.password = password
......@@ -295,8 +350,8 @@ class MySQL(Object):
def __str__(self):
return "%s(host='%s', user='%s', dbname='%s', port=%d, catmap_filename=\"%s\", tagmap_filename=\"%s\")" % (
type(self).__name__, self.host, self.user, self.dbname, self.port, self.catmap_filename, self.tagmap_filename)
return "%s(req=%s, host='%s', user='%s', dbname='%s', port=%d, catmap_filename=\"%s\", tagmap_filename=\"%s\")" % (
type(self).__name__, type(self.req).__name__, self.host, self.user, self.dbname, self.port, self.catmap_filename, self.tagmap_filename)
def get_client_by_name(self, name):
......@@ -480,18 +535,19 @@ def expose(meth):
return meth
class Server(Object):
class Server(ObjectReq):
def __init__(self, auth, handler):
def __init__(self, req, auth, handler):
ObjectReq.__init__(self, req)
self.auth = auth
self.handler = handler
def __str__(self):
return "%s(auth=%s, handler=%s)" % (type(self).__name__, type(self.auth).__name__, type(self.handler).__name__)
return "%s(req=%s, auth=%s, handler=%s)" % (type(self).__name__, type(self.req).__name__, type(self.auth).__name__, type(self.handler).__name__)
def sanitize_args(self, path, func, args, exclude=["self", "_env", "_client"]):
def sanitize_args(self, path, func, args, exclude=["self"]):
# silently remove internal args, these should never be used
# but if somebody does, we do not expose them by error message
intargs = set(args).intersection(exclude)
......@@ -513,6 +569,7 @@ class Server(Object):
def wsgi_app(self, environ, start_response, exc_info=None):
path = environ.get("PATH_INFO", "").lstrip("/")
self.req.reset(env=environ, path=path)
output = ""
status = "200 OK"
headers = [('Content-type', 'application/json')]
......@@ -530,7 +587,7 @@ class Server(Object):
except Exception:
raise Error("You've fallen of the cliff.", 404, method=path)
client = self.auth.authenticate(environ)
self.req.client = client = self.auth.authenticate(environ)
if not client:
raise Error("I'm watching. Authenticate.", 403, method=path)
......@@ -546,7 +603,7 @@ class Server(Object):
args["events"] = events
args = self.sanitize_args(path, method, args)
result = method(_env=environ, _client=client, **args) # call requested method
result = method(**args) # call requested method
try:
# 'default': takes care of non JSON serializable objects,
......@@ -581,6 +638,7 @@ class Server(Object):
headers.append(('Content-Length', str(len(output))))
start_response(status, headers)
self.req.reset()
return [output]
......@@ -588,12 +646,13 @@ class Server(Object):
class WardenHandler(Object):
class WardenHandler(ObjectReq):
def __init__(self, validator, db, auth,
def __init__(self, req, validator, db, auth,
send_events_limit=100000, get_events_limit=100000,
description=None):
ObjectReq.__init__(self, req)
self.auth = auth
self.db = db
self.validator = validator
......@@ -603,19 +662,20 @@ class WardenHandler(Object):
def __str__(self):
return "%s(validator=%s, db=%s, send_events_limit=%s, get_events_limit=%s, description=\"%s\")" % (
type(self).__name__, type(self.validator).__name__, type(self.db).__name__,
return "%s(req=%s, validator=%s, db=%s, send_events_limit=%s, get_events_limit=%s, description=\"%s\")" % (
type(self).__name__, type(self.req).__name__, type(self.validator).__name__, type(self.db).__name__,
self.get_events_limit, self.send_events_limit, self.description)
@expose
def getDebug(self, _env, _client):
auth = self.auth.authorize(_env, _client, 'getDebug', None, None)
def getDebug(self):
auth = self.auth.authorize(self.req.env, self.req.client, self.req.path, None)
if not auth:
raise Error("I'm watching. Authorize.", 403, method='getDebug', detail={"client": _client})
raise Error("I'm watching. Authorize.", 403, method='getDebug', detail={"client": self.req.client})
return {
"environment": _env,
"environment": self.req.env,
"client": self.req.client,
"database": self.db.get_debug(),
"system": {
"uname": os.uname()
......@@ -635,10 +695,10 @@ class WardenHandler(Object):
@expose
def getInfo(self, _env, _client):
auth = self.auth.authorize(_env, _client, 'getInfo', None, None)
def getInfo(self):
auth = self.auth.authorize(self.req.env, self.req.client, self.req.path, None)
if not auth:
raise Error("I'm watching. Authorize.", 403, method='getDebug', detail={"client": _client})
raise Error("I'm watching. Authorize.", 403, method='getDebug', detail={"client": self.req.client})
info = {
"version": VERSION,
......@@ -651,14 +711,14 @@ class WardenHandler(Object):
@expose
def getEvents(self, _env, _client, id=None, count=None,
def getEvents(self, id=None, count=None,
cat=None, nocat=None,
tag=None, notag=None,
group=None, nogroup=None):
auth = self.auth.authorize(_env, _client, 'getEvents', None, None)
auth = self.auth.authorize(self.req.env, self.req.client, self.req.path, None)
if not auth:
raise Error("I'm watching. Authorize.", 403, method='getDebug', detail={"client": _client})
raise Error("I'm watching. Authorize.", 403, method='getDebug', detail={"client": self.req.client})
try:
id = int(id[0])
......@@ -667,14 +727,14 @@ class WardenHandler(Object):
if id is None:
try:
id = self.db.getLastReceivedId(_client)
id = self.db.getLastReceivedId(self.req.client)
except Exception, e:
logging.info("getEvents: cannot getLastReceivedId - " + type(e).__name__ + ": " + e)
if id is None:
# First access, remember the guy and get him last event
id = self.db.getLastEventId()
self.db.insertLastReceivedId(_client, id)
self.db.insertLastReceivedId(self.req.client, id)
return {
"lastid": id,
"events": []
......@@ -688,17 +748,17 @@ class WardenHandler(Object):
if self.get_events_limit:
count = min(count, self.get_events_limit)
res = self.db.fetch_events(_client, id, count, cat, nocat, tag, notag, group, nogroup)
res = self.db.fetch_events(self.req.client, id, count, cat, nocat, tag, notag, group, nogroup)
logging.info("getEvents(%d, %d, %s, %s, %s, %s, %s, %s): sending %d events" % (
id, count, cat, nocat, tag, notag, group, nogroup, len(res["events"])))
self.db.insertLastReceivedId(_client, res['lastid'])
self.db.insertLastReceivedId(self.req.client, res['lastid'])
return res
@expose
def sendEvents(self, _env, _client, events=[]):
def sendEvents(self, events=[]):
if not isinstance(events, list):
raise Error("List of events expected.", 400, method="sendEvents")
......@@ -711,9 +771,9 @@ class WardenHandler(Object):
for i, event in enumerate(events):
ev_errs = []
auth_cl = self.auth.authorize(_env, _client, 'sendEvents', event, None)
auth_cl = self.auth.authorize(self.req.env, self.req.client, self.req.path, event)
if not auth_cl:
errs[i] = ["Client %i(%s) does not correspond with event Node info or is not allowed to write" % (_client["service"]["service_id"], _client["service"]["identity"])]
errs[i] = ["Client %i(%s) does not correspond with event Node info or is not allowed to write" % (self.req.client["service"]["service_id"], self.req.client["service"]["identity"])]
continue
v_errs = self.validator.check(event)
......@@ -824,23 +884,32 @@ def build_server(conf):
# Object parameter conversions and defaults
param_def = {
"FileLogger": {
"req": {"type": obj, "default": "req"},
"filename": {"type": filepath, "default": path.join(path.dirname(__file__), path.splitext(path.split(__file__)[1])[0] + ".log")},
"level": {"type": loglevel, "default": "info"},
},
"SysLogger": {
"req": {"type": obj, "default": "req"},
"socket": {"type": filepath, "default": "/dev/log"},
"facility": {"type": facility, "default": "daemon"},
"level": {"type": loglevel, "default": "info"}
},
"NoAuthenticator": {},
"NoAuthenticator": {
"req": {"type": obj, "default": "req"}
},
"X509Authenticator": {
"req": {"type": obj, "default": "req"},
"db": {"type": obj, "default": "db"}
},
"NoValidator": {},
"NoValidator": {
"req": {"type": obj, "default": "req"},
},
"JSONSchemaValidator": {
"req": {"type": obj, "default": "req"},
"filename": {"type": filepath, "default": path.join(path.dirname(__file__), "idea.schema")}
},
"MySQL": {
"req": {"type": obj, "default": "req"},
"host": {"type": str, "default": "localhost"},
"user": {"type": str, "default": "warden"},
"password": {"type": str, "default": ""},
......@@ -850,6 +919,7 @@ def build_server(conf):
"tagmap_filename": {"type": filepath, "default": path.join(path.dirname(__file__), "tagmap_mysql.json")}
},
"WardenHandler": {
"req": {"type": obj, "default": "req"},
"validator": {"type": obj, "default": "validator"},
"db": {"type": obj, "default": "DB"},
"auth": {"type": obj, "default": "auth"},
......@@ -858,6 +928,7 @@ def build_server(conf):
"description": {"type": str, "default": ""}
},
"Server": {
"req": {"type": obj, "default": "req"},
"auth": {"type": obj, "default": "auth"},
"handler": {"type": obj, "default": "handler"}
}
......@@ -916,6 +987,9 @@ def build_server(conf):
# hope it at least ends up in webserver error log
StreamLogger()
# Shared container for common data of ongoing WSGI request
objects["req"] = Request()
try:
# Now try to init required objects
for o in ("log", "db", "auth", "validator", "handler", "server"):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment