Skip to content
Snippets Groups Projects
warden_server.py 56.4 KiB
Newer Older
        info = {
            "version": VERSION,
            "send_events_limit": self.send_events_limit,
            "get_events_limit": self.get_events_limit
        }
        if self.description:
            info["description"] = self.description
        return info


    def getEvents(self, id=None, count=None,
            cat=None, nocat=None,
            tag=None, notag=None,
            group=None, nogroup=None):

        try:
            id = int(id[0])
        except (ValueError, TypeError, IndexError):
            # If client was already here, fetch server notion of his last id
Michal Kostenec's avatar
Michal Kostenec committed
            try:
                id = self.db.getLastReceivedId(self.req.client)
Michal Kostenec's avatar
Michal Kostenec committed
            except Exception, e:
                logging.getLogger(__name__).info("cannot getLastReceivedId - " + type(e).__name__ + ": " + str(e))
            # First access, remember the guy and get him last id
            id = self.db.getLastEventId()
            self.db.insertLastReceivedId(self.req.client, id)
Michal Kostenec's avatar
Michal Kostenec committed
            return {
                "lastid": id,
                "events": []
            }

        if id<=0:
            # Client wants to get only last N events and reset server notion of last id
            id += self.db.getLastEventId()
            if id<0: id=0

            count = int(count[0])
        except (ValueError, TypeError, IndexError):
            count = self.get_events_limit

        if self.get_events_limit:
            count = min(count, self.get_events_limit)

        res = self.db.fetch_events(self.req.client, id, count, cat, nocat, tag, notag, group, nogroup)
        self.db.insertLastReceivedId(self.req.client, res['lastid'])
        logging.getLogger(__name__).info("sending %d events, lastid is %i" % (len(res["events"]), res["lastid"]))
        try:
            ev_id = event['Node'][0]['Name'].lower()
        except (KeyError, TypeError):
            # Event does not bear valid Node attribute
            return [{"error": 422, "message": "Event does not bear valid Node attribute"}]
            return [{"error": 422, "message": "Node does not correspond with saving client"}]
    def add_event_nums(self, ilist, events, errlist):
            err.setdefault("events", []).extend(ilist)
            ev_ids = err.setdefault("events_id", [])
            for i in ilist:
                event = events[i]
                try:
                    id = event["ID"]
                except (AttributeError, TypeError, ValueError):
                    id = None
        if not isinstance(events, list):
            raise self.req.error(message="List of events expected.", error=400)
        if len(events)>self.send_events_limit:
            errs.extend(
                self.add_event_nums(range(self.send_events_limit, len(events)), events,
                    [{"error": 507, "message": "Too much events in one batch.",
                      "send_events_limit": self.send_events_limit}]))
        events_tosend = []
        events_raw = []
        events_nums = []
        for i, event in enumerate(events[0:self.send_events_limit]):
            v_errs = self.validator.check(event)
            if v_errs:
                errs.extend(self.add_event_nums([i], events, v_errs))
            node_errs = self.check_node(event, self.req.client.name)
                errs.extend(self.add_event_nums([i], events, node_errs))
                continue

            if self.req.client.test and not 'Test' in event.get('Category', []):
                errs.extend(self.add_event_nums([i], events, [{"error": 422,
                    "message": "You're allowed to send only messages, containing \"Test\" among categories.",
                    "categories": event.get('Category', [])}]))
Pavel Kácha's avatar
Pavel Kácha committed
                continue
            raw_event = json.dumps(event)
            if len(raw_event) >= self.db.event_size_limit:
                errs.extend(self.add_event_nums([i], events, [{"error": 413, "message": "Event too long (>%i B)" % self.event_size_limit}]))
            events_tosend.append(event)
            events_raw.append(raw_event)
            events_nums.append(i)

        db_errs = self.db.store_events(self.req.client, events_tosend, events_raw)
        if db_errs:
            errs.extend(self.add_event_nums(events_nums, events_tosend, db_errs))
            saved = 0
        else:
            saved = len(events_tosend)
        logging.getLogger(__name__).info("Saved %i events" % saved)



def read_ini(path):
    c = ConfigParser.RawConfigParser()
    res = c.read(path)
    if not res or not path in res:
        # We don't have loggin yet, hopefully this will go into webserver log
        raise Error(message="Unable to read config: %s" % path)
    data = {}
    for sect in c.sections():
        for opts in c.options(sect):
            lsect = sect.lower()
            if not lsect in data:
                data[lsect] = {}
            data[lsect][opts] = c.get(sect, opts)
    return data


def read_cfg(path):
    with open(path, "r") as f:
        stripcomments = "\n".join((l for l in f if not l.lstrip().startswith(("#", "//"))))
        conf = json.loads(stripcomments)

    # Lowercase keys
    conf = dict((sect.lower(), dict(
        (subkey.lower(), val) for subkey, val in subsect.iteritems())
    ) for sect, subsect in conf.iteritems())

    return conf


def fallback_wsgi(environ, start_response, exc_info=None):

    # If server does not start, set up simple server, returning
    # Warden JSON compliant error message
    error=503
    message="Server not running due to initialization error"
    headers = [('Content-type', 'application/json')]

    logline = "Error(%d): %s" % (error, message)
    status = "%d %s" % (error, message)
    output = '{"errors": [{"error": %d, "message": "%s"}]}' % (
    logging.getLogger(__name__).critical(logline)
    start_response(status, headers)
    return [output]


def build_server(conf):

    # Functions for validation and conversion of config values
    def facility(name):
        return int(getattr(logging.handlers.SysLogHandler, "LOG_" + name.upper()))

    def loglevel(name):
        return int(getattr(logging, name.upper()))

    def natural(name):
        num = int(name)
        if num<1:
            raise ValueError("Not a natural number")
        return num

    def filepath(name):
        # Make paths relative to dir of this script
        return path.join(path.dirname(__file__), name)

    def objdef(name):
        return objects[name.lower()]

    obj = objdef    # Draw into local namespace for init_obj

    objects = {}    # Already initialized objects

    # List of sections and objects, configured by them
    # First object in each object list is the default one, otherwise
    # "type" keyword in section may be used to choose other
    section_def = {
        "log": ["FileLogger", "SysLogger"],
Pavel Kácha's avatar
Pavel Kácha committed
        "db": ["MySQL"],
        "auth": ["X509Authenticator", "PlainAuthenticator", "X509NameAuthenticator", "X509MixMatchAuthenticator"],
        "validator": ["JSONSchemaValidator", "NoValidator"],
        "handler": ["WardenHandler"],
        "server": ["Server"]
    }

    # Object parameter conversions and defaults
    param_def = {
        "FileLogger": {
            "req": {"type": obj, "default": "req"},
            "filename": {"type": filepath, "default": path.join(path.dirname(__file__), path.splitext(path.split(__file__)[1])[0] + ".log")},
            "level": {"type": loglevel, "default": "info"},
        },
        "SysLogger": {
            "req": {"type": obj, "default": "req"},
            "socket": {"type": filepath, "default": "/dev/log"},
            "facility": {"type": facility, "default": "daemon"},
            "level": {"type": loglevel, "default": "info"}
        },
        "PlainAuthenticator": {
            "req": {"type": obj, "default": "req"},
            "db": {"type": obj, "default": "db"}
        "X509Authenticator": {
            "req": {"type": obj, "default": "req"},
            "db": {"type": obj, "default": "db"}
        },
        "X509NameAuthenticator": {
            "req": {"type": obj, "default": "req"},
            "db": {"type": obj, "default": "db"}
        },
        "NoValidator": {
            "req": {"type": obj, "default": "req"},
        },
        "JSONSchemaValidator": {
            "req": {"type": obj, "default": "req"},
            "filename": {"type": filepath, "default": path.join(path.dirname(__file__), "idea.schema")}
        },
Pavel Kácha's avatar
Pavel Kácha committed
        "MySQL": {
            "req": {"type": obj, "default": "req"},
Pavel Kácha's avatar
Pavel Kácha committed
            "host": {"type": str, "default": "localhost"},
            "user": {"type": str, "default": "warden"},
            "password": {"type": str, "default": ""},
Michal Kostenec's avatar
Michal Kostenec committed
            "dbname": {"type": str, "default": "warden3"},
            "retry_pause": {"type": natural, "default": 5},
            "retry_count": {"type": natural, "default": 3},
            "event_size_limit": {"type": natural, "default": 5*1024*1024},
            "catmap_filename": {"type": filepath, "default": path.join(path.dirname(__file__), "catmap_mysql.json")},
            "tagmap_filename": {"type": filepath, "default": path.join(path.dirname(__file__), "tagmap_mysql.json")}
Pavel Kácha's avatar
Pavel Kácha committed
        },
        "WardenHandler": {
            "req": {"type": obj, "default": "req"},
            "validator": {"type": obj, "default": "validator"},
            "db": {"type": obj, "default": "DB"},
Michal Kostenec's avatar
Michal Kostenec committed
            "auth": {"type": obj, "default": "auth"},
            "send_events_limit": {"type": natural, "default": 500},
            "get_events_limit": {"type": natural, "default": 1000},
            "description": {"type": str, "default": ""}
        },
        "Server": {
            "req": {"type": obj, "default": "req"},
            "auth": {"type": obj, "default": "auth"},
            "handler": {"type": obj, "default": "handler"}
        }
    }

    def init_obj(sect_name):
        config = conf.get(sect_name, {})
        sect_name = sect_name.lower()
        sect_def = section_def[sect_name]

        try:    # Object type defined?
            objtype = config["type"]
            del config["type"]
        except KeyError:    # No, fetch default object type for this section
            objtype = sect_def[0]
        else:
            if not objtype in sect_def:
                raise KeyError("Unknown type %s in section %s" % (objtype, sect_name))

        params = param_def[objtype]

        # No surplus parameters? Disallow also 'obj' attributes, these are only
        # to provide default referenced section
        for name in config:
            if name not in params or (name in params and params[name]["type"] is objdef):
                raise KeyError("Unknown key %s in section %s" % (name, sect_name))

        # Process parameters
        kwargs = {}
        for name, definition in params.iteritems():
            raw_val = config.get(name, definition["default"])
            try:
                val = definition["type"](raw_val)
            except Exception:
                raise KeyError("Bad value \"%s\" for %s in section %s" % (raw_val, name, sect_name))
            kwargs[name] = val

        cls = globals()[objtype]   # get class/function type
        try:
            obj = cls(**kwargs)         # run it
        except Exception as e:
            raise KeyError("Cannot initialize %s from section %s: %s" % (
                objtype, sect_name, str(e)))

        if isinstance(obj, Object):
            # Log only objects here, functions must take care of themselves
            logging.getLogger(__name__).info("Initialized %s" % str(obj))

        objects[sect_name] = obj
        return obj

    # Init logging with at least simple stderr StreamLogger
    # Dunno if it's ok within wsgi, but we have no other choice, let's
    # hope it at least ends up in webserver error log
    StreamLogger()

    # Shared container for common data of ongoing WSGI request
    objects["req"] = Request()

    try:
        # Now try to init required objects
        for o in ("log", "db", "auth", "validator", "handler", "server"):
            init_obj(o)
    except Exception as e:
        logging.getLogger(__name__).critical(str(e))
        logging.getLogger(__name__).debug("", exc_info=sys.exc_info())
        return fallback_wsgi

    logging.getLogger(__name__).info("Server ready")

# Command line utilities

def check_config():
    # If we got so far, server object got set up fine
    print >>sys.stderr, "Looks clear."
    return 0


def list_clients(id=None):
    clients = server.handler.db.get_clients(id)
    lines = [[str(getattr(client, col)) for col in Client._fields] for client in clients]
    col_width = [max(len(val) for val in col) for col in zip(*(lines+[Client._fields]))]
    divider = ["-" * l for l in col_width]
    for line in [Client._fields, divider] + lines:
        print " ".join([val.ljust(width) for val, width in zip(line, col_width)])


def register_client(**kwargs):
    # argparse does _always_ return something, so we cannot rely on missing arguments
    if kwargs["valid"] is None: kwargs["valid"] = 1
    if kwargs["read"] is None: kwargs["read"] = 1
    if kwargs["write"] is None: kwargs["write"] = 0
    if kwargs["debug"] is None: kwargs["debug"] = 0
    if kwargs["test"] is None: kwargs["test"] = 1
    modify_client(id=None, **kwargs)
def modify_client(**kwargs):

    def isValidHostname(hostname):
        if len(hostname) > 255:
            return False
        if hostname.endswith("."): # A single trailing dot is legal
            hostname = hostname[:-1] # strip exactly one dot from the right, if present
        disallowed = re.compile("[^A-Z\d-]", re.IGNORECASE)
        return all( # Split by labels and verify individually
            (label and len(label) <= 63 # length is within proper range
             and not label.startswith("-") and not label.endswith("-") # no bordering hyphens
             and not disallowed.search(label)) # contains only legal characters
            for label in hostname.split("."))

    def isValidNSID(nsid):
        allowed = re.compile("^(?:[a-zA-Z_][a-zA-Z0-9_]*\\.)*[a-zA-Z_][a-zA-Z0-9_]*$")
        return allowed.match(nsid)

    def isValidEmail(mail):
        mails = (email.utils.parseaddr(m) for m in mail.split(","))
        allowed = re.compile("^[a-zA-Z0-9_.%!+-]+@[a-zA-Z0-9-.]+$") # just basic check
        valid = (allowed.match(ms[1]) for ms in mails)
        return all(valid)

    def isValidID(id):
        client = server.handler.db.get_clients(id)
        return client and True or False


    if kwargs["name"] is not None:
        kwargs["name"] = kwargs["name"].lower()
        if not isValidNSID(kwargs["name"]):
            print >>sys.stderr, "Invalid client name \"%s\"." % kwargs["name"]
            return 254
    if kwargs["hostname"] is not None:
        kwargs["hostname"] = kwargs["hostname"].lower()
        if not isValidHostname(kwargs["hostname"]):
            print >>sys.stderr, "Invalid hostname \"%s\"." % kwargs["hostname"]
            return 254
    if kwargs["requestor"] is not None and not isValidEmail(kwargs["requestor"]):
        print >>sys.stderr, "Invalid requestor email \"%s\"." % kwargs["requestor"]
    if kwargs["id"] is not None and not isValidID(kwargs["id"]):
        print >>sys.stderr, "Invalid id \"%s\"." % kwargs["id"]
    for c in server.handler.db.get_clients():
        if kwargs["name"] is not None and kwargs["name"].lower()==c.name:
            print >>sys.stderr, "Clash with existing name: %s" % str(c)
            return 254
        if kwargs["secret"] is not None and kwargs["secret"]==c.secret:
            print >>sys.stderr, "Clash with existing secret: %s" % str(c)
            return 254
    newid = server.handler.db.add_modify_client(**kwargs)
def purge(days=30, lastlog=None, events=None):
    if lastlog is None and events is None:
        lastlog = events = True
    if lastlog:
        count = server.handler.db.purge_lastlog(days)
        print "Purged %d lastlog entries." % count
    if events:
        count = server.handler.db.purge_events(days)
        print "Purged %d events." % count


def add_client_args(subargp, mod=False):
    subargp.add_argument("--help", action="help", help="show this help message and exit")
    if mod:
        subargp.add_argument("-i", "--id", required=True, type=int,
            help="client id")
    subargp.add_argument("-n", "--name", required=not mod,
        help="client name (in dotted reverse path notation)")
    subargp.add_argument("-h", "--hostname", required=not mod,
        help="client FQDN hostname")
    subargp.add_argument("-r", "--requestor", required=not mod,
        help="requestor email")
    subargp.add_argument("-s", "--secret",
        help="authentication token")
    subargp.add_argument("--note",
        help="client freetext description")

    reg_valid = subargp.add_mutually_exclusive_group(required=False)
    reg_valid.add_argument("--valid", action="store_const", const=1, default=None,
        help="valid client (default)")
    reg_valid.add_argument("--novalid", action="store_const", const=0, dest="valid", default=None)

    reg_read = subargp.add_mutually_exclusive_group(required=False)
    reg_read.add_argument("--read", action="store_const", const=1, default=None,
        help="client is allowed to read (default)")
    reg_read.add_argument("--noread", action="store_const", const=0, dest="read", default=None)

    reg_write = subargp.add_mutually_exclusive_group(required=False)
    reg_write.add_argument("--nowrite", action="store_const", const=0, dest="write", default=None,
        help="client is allowed to send (default - no)")
    reg_write.add_argument("--write", action="store_const", const=1, default=None)

    reg_debug = subargp.add_mutually_exclusive_group(required=False)
    reg_debug.add_argument("--nodebug", action="store_const", const=0, dest="debug", default=None,
        help="client is allowed receive debug output (default - no)")
    reg_debug.add_argument("--debug", action="store_const", const=1, default=None)

    reg_test = subargp.add_mutually_exclusive_group(required=False)
    reg_test.add_argument("--test", action="store_const", const=1, default=None,
        help="client is yet in testing phase (default - yes)")
    reg_test.add_argument("--notest", action="store_const", const=0, dest="test", default=None)


def get_args():
    import argparse
    argp = argparse.ArgumentParser(
        description="Warden server " + VERSION, add_help=False)
    argp.add_argument("--help", action="help",
        help="show this help message and exit")
    argp.add_argument("-c", "--config",
        help="path to configuration file")
    subargp = argp.add_subparsers(title="commands")

    subargp_check = subargp.add_parser("check", add_help=False,
        description="Try to setup server based on configuration file.",
        help="check configuration")
    subargp_check.set_defaults(command=check_config)
    subargp_check.add_argument("--help", action="help",
        help="show this help message and exit")

    subargp_reg = subargp.add_parser("register", add_help=False,
        description="Add new client registration entry.",
        help="register new client")
    subargp_reg.set_defaults(command=register_client)
    add_client_args(subargp_reg)

    subargp_mod = subargp.add_parser("modify", add_help=False,
        description="Modify details of client registration entry.",
        help="modify client registration")
    subargp_mod.set_defaults(command=modify_client)
    add_client_args(subargp_mod, mod=True)

    subargp_list = subargp.add_parser("list", add_help=False,
        description="List details of client registration entries.",
        help="list registered clients")
    subargp_list.set_defaults(command=list_clients)
    subargp_list.add_argument("--help", action="help",
        help="show this help message and exit")
    subargp_list.add_argument("--id", action="store", type=int,
        help="client id", default=None)

    subargp_purge = subargp.add_parser("purge", add_help=False,
        description=
            "Purge old events or lastlog records."
            " Note that lastlog purge retains at least one newest record for each"
            " client, even if it is more than number of 'days' old.",
        help="purge old events or lastlog records")
    subargp_purge.set_defaults(command=purge)
    subargp_purge.add_argument("--help", action="help",
        help="show this help message and exit")
    subargp_purge.add_argument("-l", "--lastlog", action="store_true", dest="lastlog", default=None,
        help="purge lastlog records")
    subargp_purge.add_argument("-e", "--events", action="store_true", dest="events", default=None,
        help="purge events")
    subargp_purge.add_argument("-d", "--days", action="store", dest="days", type=int, default=30,
        help="records older than 'days' back from today will get purged")

    subargp_loadmaps = subargp.add_parser("loadmaps", add_help=False,
        description=
            "Load 'categories' and 'tags' table from 'catmap_mysql.json' and 'tagmap_mysql.json'."
            " Note that this is NOT needed for server at all, load them into db at will,"
            " should you need to run your own specific SQL queries on data directly."
            " Note also that previous content of both tables will be lost.",
        help="load catmap and tagmap into db")
    subargp_loadmaps.set_defaults(command=load_maps)
    subargp_loadmaps.add_argument("--help", action="help",
        help="show this help message and exit")

    return argp.parse_args()


if __name__=="__main__":
    args = get_args()
    config = path.join(path.dirname(__file__), args.config or "warden_server.cfg")
    server = build_server(read_cfg(config))
    command = args.command
    subargs = vars(args)
    del subargs["command"]
    del subargs["config"]
    if not server or server is fallback_wsgi:
        print >>sys.stderr, "Failed initialization, check configured log targets for reasons."
        sys.exit(255)
    sys.exit(command(**subargs))