Newer
Older
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
def json_wrapper(method):
def meth_deco(self, post, **args):
if "events" in method.func_code.co_varnames[0:method.func_code.co_argcount]:
try:
events = json.loads(post) if post else None
except Exception as e:
raise self.req.error(message="Deserialization error.", error=400,
exc=sys.exc_info(), args=post, parser=str(e))
if events:
args["events"] = events
result = method(self, **args) # call requested method
try:
# 'default': takes care of non JSON serializable objects,
# which could (although shouldn't) appear in handler code
output = json.dumps(result, default=lambda v: str(v))
except Exception as e:
raise self.req.error(message="Serialization error", error=500,
exc=sys.exc_info(), args=str(result))
return [('Content-type', 'application/json')], output
try:
meth_deco.arguments = method.arguments
except AttributeError:
meth_deco.arguments = method.func_code.co_varnames[:method.func_code.co_argcount]
return meth_deco
class WardenHandler(ObjectBase):
def __init__(self, req, log, validator, db, auth,
send_events_limit=500, get_events_limit=1000,
ObjectBase.__init__(self, req, log)
self.db = db
self.validator = validator
self.send_events_limit = send_events_limit
self.get_events_limit = get_events_limit
self.description = description
def __str__(self):
return "%s(req=%s, validator=%s, db=%s, send_events_limit=%s, get_events_limit=%s, description=\"%s\")" % (
type(self).__name__, type(self.req).__name__, type(self.validator).__name__, type(self.db).__name__,
self.get_events_limit, self.send_events_limit, self.description)

Pavel Kácha
committed
@expose(read=1, debug=1)
@json_wrapper
def getDebug(self):
"environment": self.req.env,

Pavel Kácha
committed
"client": self.req.client.__dict__,
"database": self.db.get_debug(),
"system": {
"uname": os.uname()
},
"process": {
"cwd": os.getcwdu(),
"pid": os.getpid(),
"ppid": os.getppid(),
"pgrp": os.getpgrp(),
"uid": os.getuid(),
"gid": os.getgid(),
"euid": os.geteuid(),
"egid": os.getegid(),
"groups": os.getgroups()
}

Pavel Kácha
committed
@expose(read=1)
@json_wrapper
def getInfo(self):
info = {
"version": VERSION,
"send_events_limit": self.send_events_limit,
"get_events_limit": self.get_events_limit
}
if self.description:
info["description"] = self.description
return info

Pavel Kácha
committed
@expose(read=1)
@json_wrapper
def getEvents(self, id=None, count=None,
cat=None, nocat=None,
tag=None, notag=None,
group=None, nogroup=None):
try:
id = int(id[0])
except (ValueError, TypeError, IndexError):

Pavel Kácha
committed
# If client was already here, fetch server notion of his last id
id = self.db.getLastReceivedId(self.req.client)
self.log.info("cannot getLastReceivedId - " + type(e).__name__ + ": " + str(e))

Pavel Kácha
committed
# First access, remember the guy and get him last id
id = self.db.getLastEventId()
self.db.insertLastReceivedId(self.req.client, id)
return {
"lastid": id,
"events": []
}

Pavel Kácha
committed
if id<=0:
# Client wants to get only last N events and reset server notion of last id
id += self.db.getLastEventId()
if id<0: id=0
count = int(count[0])
except (ValueError, TypeError, IndexError):
count = self.get_events_limit
if self.get_events_limit:
count = min(count, self.get_events_limit)
res = self.db.fetch_events(self.req.client, id, count, cat, nocat, tag, notag, group, nogroup)
self.db.insertLastReceivedId(self.req.client, res['lastid'])
self.log.info("sending %d events, lastid is %i" % (len(res["events"]), res["lastid"]))
def check_node(self, event, name):

Pavel Kácha
committed
try:
ev_id = event['Node'][0]['Name'].lower()

Pavel Kácha
committed
# Event does not bear valid Node attribute
return [{"error": 422, "message": "Event does not bear valid Node attribute"}]
if ev_id != name:
return [{"error": 422, "message": "Node does not correspond with saving client"}]

Pavel Kácha
committed
return []

Pavel Kácha
committed
def add_event_nums(self, ilist, events, errlist):

Pavel Kácha
committed
for err in errlist:

Pavel Kácha
committed
err.setdefault("events", []).extend(ilist)
ev_ids = err.setdefault("events_id", [])
for i in ilist:
event = events[i]
try:
id = event["ID"]
id = None

Pavel Kácha
committed
ev_ids.append(id)

Pavel Kácha
committed
return errlist

Pavel Kácha
committed
@expose(write=1)
@json_wrapper
def sendEvents(self, events=[]):
if not isinstance(events, list):

Pavel Kácha
committed
raise self.req.error(message="List of events expected.", error=400)

Pavel Kácha
committed
errs = []
if len(events)>self.send_events_limit:

Pavel Kácha
committed
errs.extend(
self.add_event_nums(range(self.send_events_limit, len(events)), events,
[{"error": 507, "message": "Too much events in one batch.",
"send_events_limit": self.send_events_limit}]))
saved = 0
events_tosend = []
events_raw = []
events_nums = []

Pavel Kácha
committed
for i, event in enumerate(events[0:self.send_events_limit]):
v_errs = self.validator.check(event)
if v_errs:

Pavel Kácha
committed
errs.extend(self.add_event_nums([i], events, v_errs))
continue
node_errs = self.check_node(event, self.req.client.name)

Pavel Kácha
committed
if node_errs:

Pavel Kácha
committed
errs.extend(self.add_event_nums([i], events, node_errs))

Pavel Kácha
committed
continue
if self.req.client.test and not 'Test' in event.get('Category', []):

Pavel Kácha
committed
errs.extend(self.add_event_nums([i], events, [{"error": 422,

Pavel Kácha
committed
"message": "You're allowed to send only messages, containing \"Test\" among categories.",

Pavel Kácha
committed
"categories": event.get('Category', [])}]))

Pavel Kácha
committed
raw_event = json.dumps(event)
if len(raw_event) >= self.db.event_size_limit:
errs.extend(self.add_event_nums([i], events, [{"error": 413, "message": "Event too long (>%i B)" % self.event_size_limit}]))

Pavel Kácha
committed
continue
events_tosend.append(event)
events_raw.append(raw_event)
events_nums.append(i)
db_errs = self.db.store_events(self.req.client, events_tosend, events_raw)
if db_errs:
errs.extend(self.add_event_nums(events_nums, events_tosend, db_errs))
saved = 0
else:
saved = len(events_tosend)
self.log.info("Saved %i events" % saved)

Pavel Kácha
committed
if errs:

Pavel Kácha
committed
raise self.req.error(errors=errs)

Pavel Kácha
committed
return {"saved": saved}
def read_ini(path):
c = ConfigParser.RawConfigParser()
res = c.read(path)
if not res or not path in res:
# We don't have loggin yet, hopefully this will go into webserver log

Pavel Kácha
committed
raise Error(message="Unable to read config: %s" % path)
data = {}
for sect in c.sections():
for opts in c.options(sect):
lsect = sect.lower()
if not lsect in data:
data[lsect] = {}
data[lsect][opts] = c.get(sect, opts)
return data
def read_cfg(path):
with open(path, "r") as f:
stripcomments = "\n".join((l for l in f if not l.lstrip().startswith(("#", "//"))))
conf = json.loads(stripcomments)
# Lowercase keys
conf = dict((sect.lower(), dict(
(subkey.lower(), val) for subkey, val in subsect.iteritems())
) for sect, subsect in conf.iteritems())
return conf
def fallback_wsgi(environ, start_response, exc_info=None):
# If server does not start, set up simple server, returning
# Warden JSON compliant error message
error=503
message="Server not running due to initialization error"
headers = [('Content-type', 'application/json')]
logline = "Error(%d): %s" % (error, message)
status = "%d %s" % (error, message)

Pavel Kácha
committed
output = '{"errors": [{"error": %d, "message": "%s"}]}' % (
logging.getLogger(__name__).critical(logline)
start_response(status, headers)
return [output]

Pavel Kácha
committed
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
# Order in which the base objects must get initialized
section_order = ("log", "db", "auth", "validator", "handler", "server")
# List of sections and objects, configured by them
# First object in each object list is the default one, otherwise
# "type" keyword in section may be used to choose other
section_def = {
"log": [FileLogger, SysLogger],
"db": [MySQL],
"auth": [X509Authenticator, PlainAuthenticator, X509NameAuthenticator, X509MixMatchAuthenticator],
"validator": [JSONSchemaValidator, NoValidator],
"handler": [WardenHandler],
"server": [Server]
}
# Object parameter conversions and defaults
param_def = {
FileLogger: {
"req": {"type": "obj", "default": "req"},
"filename": {"type": "filepath", "default": path.join(path.dirname(__file__), path.splitext(path.split(__file__)[1])[0] + ".log")},
"level": {"type": "loglevel", "default": "info"},
},
SysLogger: {
"req": {"type": "obj", "default": "req"},
"socket": {"type": "filepath", "default": "/dev/log"},
"facility": {"type": "facility", "default": "daemon"},
"level": {"type": "loglevel", "default": "info"}
},
PlainAuthenticator: {
"req": {"type": "obj", "default": "req"},
"log": {"type": "obj", "default": "log"},
"db": {"type": "obj", "default": "db"}
},
X509Authenticator: {
"req": {"type": "obj", "default": "req"},
"log": {"type": "obj", "default": "log"},
"db": {"type": "obj", "default": "db"}
},
X509NameAuthenticator: {
"req": {"type": "obj", "default": "req"},
"log": {"type": "obj", "default": "log"},
"db": {"type": "obj", "default": "db"}
},

Pavel Kácha
committed
X509MixMatchAuthenticator: {
"req": {"type": "obj", "default": "req"},
"log": {"type": "obj", "default": "log"},
"db": {"type": "obj", "default": "db"}
},

Pavel Kácha
committed
NoValidator: {
"req": {"type": "obj", "default": "req"},
"log": {"type": "obj", "default": "log"},
},
JSONSchemaValidator: {
"req": {"type": "obj", "default": "req"},
"log": {"type": "obj", "default": "log"},
"filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "idea.schema")}
},
MySQL: {
"req": {"type": "obj", "default": "req"},
"log": {"type": "obj", "default": "log"},
"host": {"type": "str", "default": "localhost"},
"user": {"type": "str", "default": "warden"},
"password": {"type": "str", "default": ""},
"dbname": {"type": "str", "default": "warden3"},
"port": {"type": "natural", "default": 3306},

Pavel Kácha
committed
"retry_pause": {"type": "natural", "default": 3},

Pavel Kácha
committed
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
"retry_count": {"type": "natural", "default": 3},
"event_size_limit": {"type": "natural", "default": 5*1024*1024},
"catmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "catmap_mysql.json")},
"tagmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "tagmap_mysql.json")}
},
WardenHandler: {
"req": {"type": "obj", "default": "req"},
"log": {"type": "obj", "default": "log"},
"validator": {"type": "obj", "default": "validator"},
"db": {"type": "obj", "default": "DB"},
"auth": {"type": "obj", "default": "auth"},
"send_events_limit": {"type": "natural", "default": 500},
"get_events_limit": {"type": "natural", "default": 1000},
"description": {"type": "str", "default": ""}
},
Server: {
"req": {"type": "obj", "default": "req"},
"log": {"type": "obj", "default": "log"},
"auth": {"type": "obj", "default": "auth"},
"handler": {"type": "obj", "default": "handler"}
}
}
def build_server(conf, section_order=section_order, section_def=section_def, param_def=param_def):
objects = {} # Already initialized objects
# Functions for validation and conversion of config values
def facility(name):
return int(getattr(logging.handlers.SysLogHandler, "LOG_" + name.upper()))
def loglevel(name):
return int(getattr(logging, name.upper()))
def natural(name):
num = int(name)
if num<1:
raise ValueError("Not a natural number")
return num
def filepath(name):
# Make paths relative to dir of this script
return path.join(path.dirname(__file__), name)

Pavel Kácha
committed
def obj(name):
return objects[name.lower()]

Pavel Kácha
committed
# Typedef dictionary
conv_dict = {
"facility": facility,
"loglevel": loglevel,
"natural": natural,
"filepath": filepath,
"obj": obj,
"str": str
}
def init_obj(sect_name):
config = conf.get(sect_name, {})
sect_name = sect_name.lower()
sect_def = section_def[sect_name]
try: # Object type defined?
objtype = config["type"]
del config["type"]
except KeyError: # No, fetch default object type for this section

Pavel Kácha
committed
cls = sect_def[0]
else: # Yes, get corresponding class/callable
names = [o.__name__ for o in sect_def]
try:
idx = names.index(objtype)
except ValueError:
raise KeyError("Unknown type %s in section %s" % (objtype, sect_name))

Pavel Kácha
committed
cls = sect_def[idx]

Pavel Kácha
committed
params = param_def[cls]
# No surplus parameters? Disallow also 'obj' attributes, these are only
# to provide default referenced section
for name in config:

Pavel Kácha
committed
if name not in params or (name in params and params[name]["type"] == "obj"):
raise KeyError("Unknown key %s in section %s" % (name, sect_name))
# Process parameters
kwargs = {}
for name, definition in params.iteritems():
raw_val = config.get(name, definition["default"])
try:

Pavel Kácha
committed
type_callable = conv_dict[definition["type"]]
val = type_callable(raw_val)
except Exception:
raise KeyError("Bad value \"%s\" for %s in section %s" % (raw_val, name, sect_name))
kwargs[name] = val
try:

Pavel Kácha
committed
obj_inst = cls(**kwargs) # run it
except Exception as e:
raise KeyError("Cannot initialize %s from section %s: %s" % (

Pavel Kácha
committed
if isinstance(obj_inst, Object):
# Log only objects here, functions must take care of themselves
objects["log"].info("Initialized %s" % str(obj_inst))

Pavel Kácha
committed
return obj_inst
# Init logging with at least simple stderr StreamLogger
# Dunno if it's ok within wsgi, but we have no other choice, let's
# hope it at least ends up in webserver error log
# Shared container for common data of ongoing WSGI request
objects["req"] = Request()
try:
# Now try to init required objects

Pavel Kácha
committed
for o in section_order:
init_obj(o)
except Exception as e:
objects["log"].critical(str(e))
objects["log"].debug("", exc_info=sys.exc_info())
return objects["server"]
# Command line utilities
def check_config():
# If we got so far, server object got set up fine
print >>sys.stderr, "Looks clear."
return 0
def list_clients(id=None):
clients = server.handler.db.get_clients(id)
lines = [[str(getattr(client, col)) for col in Client._fields] for client in clients]
col_width = [max(len(val) for val in col) for col in zip(*(lines+[Client._fields]))]
divider = ["-" * l for l in col_width]
for line in [Client._fields, divider] + lines:
print " ".join([val.ljust(width) for val, width in zip(line, col_width)])
return 0
def register_client(**kwargs):
# argparse does _always_ return something, so we cannot rely on missing arguments
if kwargs["valid"] is None: kwargs["valid"] = 1
if kwargs["read"] is None: kwargs["read"] = 1
if kwargs["write"] is None: kwargs["write"] = 0
if kwargs["debug"] is None: kwargs["debug"] = 0
if kwargs["test"] is None: kwargs["test"] = 1
return modify_client(id=None, **kwargs)
def modify_client(**kwargs):
def isValidHostname(hostname):
if len(hostname) > 255:
return False
if hostname.endswith("."): # A single trailing dot is legal
hostname = hostname[:-1] # strip exactly one dot from the right, if present
disallowed = re.compile("[^A-Z\d-]", re.IGNORECASE)
return all( # Split by labels and verify individually
(label and len(label) <= 63 # length is within proper range
and not label.startswith("-") and not label.endswith("-") # no bordering hyphens
and not disallowed.search(label)) # contains only legal characters
for label in hostname.split("."))
def isValidNSID(nsid):
allowed = re.compile("^(?:[a-zA-Z_][a-zA-Z0-9_]*\\.)*[a-zA-Z_][a-zA-Z0-9_]*$")
return allowed.match(nsid)
def isValidEmail(mail):
mails = (email.utils.parseaddr(m) for m in mail.split(","))
allowed = re.compile("^[a-zA-Z0-9_.%!+-]+@[a-zA-Z0-9-.]+$") # just basic check
valid = (allowed.match(ms[1]) for ms in mails)
return all(valid)
def isValidID(id):
client = server.handler.db.get_clients(id)
return client and True or False
if kwargs["name"] is not None:
kwargs["name"] = kwargs["name"].lower()
if not isValidNSID(kwargs["name"]):
print >>sys.stderr, "Invalid client name \"%s\"." % kwargs["name"]
return 254
if kwargs["hostname"] is not None:
kwargs["hostname"] = kwargs["hostname"].lower()
if not isValidHostname(kwargs["hostname"]):
print >>sys.stderr, "Invalid hostname \"%s\"." % kwargs["hostname"]
return 253
if kwargs["requestor"] is not None and not isValidEmail(kwargs["requestor"]):
print >>sys.stderr, "Invalid requestor email \"%s\"." % kwargs["requestor"]
return 252
if kwargs["id"] is not None and not isValidID(kwargs["id"]):
print >>sys.stderr, "Invalid id \"%s\"." % kwargs["id"]
return 251
for c in server.handler.db.get_clients():
if kwargs["name"] is not None and kwargs["name"].lower()==c.name:
print >>sys.stderr, "Clash with existing name: %s" % str(c)
return 250
if kwargs["secret"] is not None and kwargs["secret"]==c.secret:
print >>sys.stderr, "Clash with existing secret: %s" % str(c)
return 249
newid = server.handler.db.add_modify_client(**kwargs)
return list_clients(id=newid)
def load_maps():
server.handler.db.load_maps()
return 0
def purge(days=30, lastlog=None, events=None):
if lastlog is None and events is None:
lastlog = events = True
if lastlog:
count = server.handler.db.purge_lastlog(days)
print "Purged %d lastlog entries." % count
if events:
count = server.handler.db.purge_events(days)
print "Purged %d events." % count
return 0
def add_client_args(subargp, mod=False):
subargp.add_argument("--help", action="help", help="show this help message and exit")
if mod:
subargp.add_argument("-i", "--id", required=True, type=int,
help="client id")
subargp.add_argument("-n", "--name", required=not mod,
help="client name (in dotted reverse path notation)")
subargp.add_argument("-h", "--hostname", required=not mod,
help="client FQDN hostname")
subargp.add_argument("-r", "--requestor", required=not mod,
help="requestor email")
subargp.add_argument("-s", "--secret",

Pavel Kácha
committed
help="authentication token (use explicit empty string to disable)")
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
subargp.add_argument("--note",
help="client freetext description")
reg_valid = subargp.add_mutually_exclusive_group(required=False)
reg_valid.add_argument("--valid", action="store_const", const=1, default=None,
help="valid client (default)")
reg_valid.add_argument("--novalid", action="store_const", const=0, dest="valid", default=None)
reg_read = subargp.add_mutually_exclusive_group(required=False)
reg_read.add_argument("--read", action="store_const", const=1, default=None,
help="client is allowed to read (default)")
reg_read.add_argument("--noread", action="store_const", const=0, dest="read", default=None)
reg_write = subargp.add_mutually_exclusive_group(required=False)
reg_write.add_argument("--nowrite", action="store_const", const=0, dest="write", default=None,
help="client is allowed to send (default - no)")
reg_write.add_argument("--write", action="store_const", const=1, default=None)
reg_debug = subargp.add_mutually_exclusive_group(required=False)
reg_debug.add_argument("--nodebug", action="store_const", const=0, dest="debug", default=None,
help="client is allowed receive debug output (default - no)")
reg_debug.add_argument("--debug", action="store_const", const=1, default=None)
reg_test = subargp.add_mutually_exclusive_group(required=False)
reg_test.add_argument("--test", action="store_const", const=1, default=None,
help="client is yet in testing phase (default - yes)")
reg_test.add_argument("--notest", action="store_const", const=0, dest="test", default=None)
def get_args():
import argparse
argp = argparse.ArgumentParser(
description="Warden server " + VERSION, add_help=False)
argp.add_argument("--help", action="help",
help="show this help message and exit")
argp.add_argument("-c", "--config",
help="path to configuration file")
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
subargp = argp.add_subparsers(title="commands")
subargp_check = subargp.add_parser("check", add_help=False,
description="Try to setup server based on configuration file.",
help="check configuration")
subargp_check.set_defaults(command=check_config)
subargp_check.add_argument("--help", action="help",
help="show this help message and exit")
subargp_reg = subargp.add_parser("register", add_help=False,
description="Add new client registration entry.",
help="register new client")
subargp_reg.set_defaults(command=register_client)
add_client_args(subargp_reg)
subargp_mod = subargp.add_parser("modify", add_help=False,
description="Modify details of client registration entry.",
help="modify client registration")
subargp_mod.set_defaults(command=modify_client)
add_client_args(subargp_mod, mod=True)
subargp_list = subargp.add_parser("list", add_help=False,
description="List details of client registration entries.",
help="list registered clients")
subargp_list.set_defaults(command=list_clients)
subargp_list.add_argument("--help", action="help",
help="show this help message and exit")
subargp_list.add_argument("--id", action="store", type=int,
help="client id", default=None)
subargp_purge = subargp.add_parser("purge", add_help=False,
description=
"Purge old events or lastlog records."
" Note that lastlog purge retains at least one newest record for each"
" client, even if it is more than number of 'days' old.",
help="purge old events or lastlog records")
subargp_purge.set_defaults(command=purge)
subargp_purge.add_argument("--help", action="help",
help="show this help message and exit")
subargp_purge.add_argument("-l", "--lastlog", action="store_true", dest="lastlog", default=None,
help="purge lastlog records")
subargp_purge.add_argument("-e", "--events", action="store_true", dest="events", default=None,
help="purge events")
subargp_purge.add_argument("-d", "--days", action="store", dest="days", type=int, default=30,
help="records older than 'days' back from today will get purged")
subargp_loadmaps = subargp.add_parser("loadmaps", add_help=False,
description=
"Load 'categories' and 'tags' table from 'catmap_mysql.json' and 'tagmap_mysql.json'."
" Note that this is NOT needed for server at all, load them into db at will,"
" should you need to run your own specific SQL queries on data directly."
" Note also that previous content of both tables will be lost.",
help="load catmap and tagmap into db")
subargp_loadmaps.set_defaults(command=load_maps)
subargp_loadmaps.add_argument("--help", action="help",
help="show this help message and exit")
args = get_args()
config = path.join(path.dirname(__file__), args.config or "warden_server.cfg")
server = build_server(read_cfg(config))
command = args.command
subargs = vars(args)
del subargs["command"]
del subargs["config"]
if not server or server is fallback_wsgi:
print >>sys.stderr, "Failed initialization, check configured log targets for reasons."
sys.exit(255)
sys.exit(command(**subargs))