Skip to content
Snippets Groups Projects
warden_server.py 83.2 KiB
Newer Older
            subquery = []
            for name in (group or nogroup):
                escaped_name = name.replace('&', '&&').replace("_", "&_").replace("%", "&%")  # escape for LIKE
                subquery.append("c.name = %s")                 # exact client
                params.append(name)
                subquery.append("c.name LIKE CONCAT(%s, '.%%') ESCAPE '&'")   # whole subtree
                params.append(escaped_name)

            query.append(" AND %s (%s)" %
                (self._get_not(group), " OR ".join(subquery)))

        query.append(" AND e.valid = 1 LIMIT %s")
        params.append(count)

        return ["".join(query)], [params], 0

    def _build_store_events_event(self, client, event, raw_event):
        """Build query and params for event insertion"""
        return (
            [
                "INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s)",
                "SELECT LAST_INSERT_ID() AS id"
            ],
            [(client.id, raw_event), ()],
            1
        )

    def _build_store_events_categories(self, event_id, cat_ids):
        """Build the multi-row INSERT mapping ``event_id`` to each category id.

        The VALUES placeholder list comes from ``self._get_comma_perc_n(2,
        cat_ids)``; the single parameter tuple is the flattened sequence
        ``event_id, cat_id, event_id, cat_id, ...``. The third element of
        the returned triple is ``None``.
        """
        placeholders = self._get_comma_perc_n(2, cat_ids)
        flat = []
        for cat_id in cat_ids:
            flat.extend((event_id, cat_id))
        statement = "INSERT INTO event_category_mapping (event_id,category_id) VALUES " + placeholders
        return [statement], [tuple(flat)], None

    def _build_store_events_tags(self, event_id, tag_ids):
        """Build the multi-row INSERT mapping ``event_id`` to each tag id.

        The VALUES placeholder list comes from ``self._get_comma_perc_n(2,
        tag_ids)``; the single parameter tuple is the flattened sequence
        ``event_id, tag_id, event_id, tag_id, ...``. The third element of
        the returned triple is ``None``.
        """
        placeholders = self._get_comma_perc_n(2, tag_ids)
        flat = []
        for tag_id in tag_ids:
            flat.extend((event_id, tag_id))
        statement = "INSERT INTO event_tag_mapping (event_id,tag_id) VALUES " + placeholders
        return [statement], [tuple(flat)], None

    def _build_insert_last_received_id(self, client, id):
        """Build query and params for insertion of the last event id received by client"""
        return (
            ["INSERT INTO last_events(client_id, event_id, timestamp) VALUES(%s, %s, NOW())"],
            [(client.id, id)],
            None
        )

    def _build_get_last_event_id(self):
        """Build query and params for querying the id of the last inserted event"""
        return ["SELECT MAX(id) as id FROM events"], [()], 0

    def _build_get_last_received_id(self, client):
        """Build query and params for querying the last event id received by client"""
        return (
            ["SELECT event_id as id FROM last_events WHERE client_id = %s ORDER BY last_events.id DESC LIMIT 1"],
            [(client.id,)],
            0
        )

    def _build_load_maps_tags(self):
        """Build the statements replacing the ``tags`` table with ``self.tagmap``.

        Two statements are produced: a DELETE of every row, then one
        multi-row INSERT whose parameters are the flattened ``(num, tag)``
        pairs of ``self.tagmap``. The third element of the returned triple
        is ``None``.
        """
        placeholders = self._get_comma_perc_n(2, self.tagmap)
        flat = []
        for tag, num in self.tagmap.items():
            flat.extend((num, tag))
        statements = [
            "DELETE FROM tags",
            "INSERT INTO tags(id, tag) VALUES " + placeholders,
        ]
        return statements, [(), tuple(flat)], None

    def _build_load_maps_cats(self):
        """Build the statements replacing the ``categories`` table with ``self.catmap``.

        Each ``self.catmap`` key is split on its first dot into category
        and subcategory (``None`` when there is no dot). The rows are
        flattened as ``num, category, subcategory, key`` for one multi-row
        INSERT that follows a DELETE of every existing row.
        """
        flat = []
        for full_name, num in self.catmap.items():
            category, dot, remainder = full_name.partition(".")
            subcategory = remainder if dot else None
            flat += [num, category, subcategory, full_name]

        placeholders = self._get_comma_perc_n(4, self.catmap)
        statements = [
            "DELETE FROM categories",
            "INSERT INTO categories(id, category, subcategory, cat_subcat) VALUES " + placeholders,
        ]
        return statements, [(), tuple(flat)], None

    def _build_purge_lastlog(self, days):
        """Build query and params for purging stored client last event mapping older than days"""
        return (
            [
                "DELETE FROM last_events "
                " USING last_events LEFT JOIN ("
                "    SELECT MAX(id) AS last FROM last_events"
                "    GROUP BY client_id"
                " ) AS maxids ON last=id"
                " WHERE timestamp < DATE_SUB(CURDATE(), INTERVAL %s DAY) AND last IS NULL",
            ],
            [(days,)],
            0
        )

    def _build_purge_events_get_id(self, days):
        """Build query and params to get largest event id of events older than days"""
        return (
            [
                "SELECT MAX(id) as id"
                "  FROM events"
                "  WHERE received < DATE_SUB(CURDATE(), INTERVAL %s DAY)"
            ],
            [(days,)],
            0
        )

    def _build_purge_events_events(self, id_):
        """Build query and params to remove events older then days and their mappings"""
        return (
            [
                "DELETE FROM event_category_mapping WHERE event_id <= %s",
                "DELETE FROM event_tag_mapping WHERE event_id <= %s",
                "DELETE FROM events WHERE id <= %s",
            ],
            [(id_,), (id_,), (id_,)],
            2
        )
class PostgreSQL(DataBase):
Jakub Maloštík's avatar
Jakub Maloštík committed

    def __init__(
            self, req, log, host, user, password, dbname, port, retry_count,
            retry_pause, event_size_limit, catmap_filename, tagmap_filename):
        """Initialise the PostgreSQL backend and bind the psycopg2 modules.

        All arguments are forwarded unchanged to the parent initialiser.
        psycopg2 is imported here rather than at module level, so the
        driver is only needed when this backend is instantiated; the
        modules are kept on the instance as ``db``, ``ppgsql`` and
        ``ppgextra``.
        """
        # NOTE(review): super(DataBase, self) starts the lookup *after*
        # DataBase in the MRO, so DataBase.__init__ itself is skipped -
        # confirm this is intended and not meant to be DataBase.__init__.
        super(DataBase, self).__init__(
            req, log, host, user, password, dbname, port, retry_count,
            retry_pause, event_size_limit, catmap_filename, tagmap_filename)

        import psycopg2 as db
        from psycopg2 import sql as ppgsql
        import psycopg2.extras as ppgextra
        self.db = db
        self.ppgsql = ppgsql
        self.ppgextra = ppgextra

    def connect(self):
        """Open a database connection and store it in ``self.con``.

        The connection uses the instance's host, user, password, dbname and
        port, with ``RealDictCursor`` as the cursor factory.
        """
        settings = {
            "host": self.host,
            "user": self.user,
            "password": self.password,
            "dbname": self.dbname,
            "port": self.port,
            "cursor_factory": self.ppgextra.RealDictCursor,
        }
        self.con = self.db.connect(**settings)

    def _build_get_client_by_name(self, cert_names=None, name=None, secret=None):
        """Build query and params for client lookup"""
Jakub Maloštík's avatar
Jakub Maloštík committed
        params = []
        if name:
            query.append(" AND name = %s")
            params.append(name.lower())
        if secret:
            query.append(" AND secret = %s")
            params.append(secret)
        if cert_names:
            query.append(" AND hostname IN (%s)" % self._get_comma_perc(cert_names))
            params.extend(n.lower() for n in cert_names)

        return ["".join(query)], [params], 0

    def _build_get_clients(self, id):
        """Build query and params for client lookup by id"""
        query = ["SELECT * FROM clients"]
        params = []
        if id:
            query.append("WHERE id = %s")
            params.append(id)
        query.append("ORDER BY id")

        return [" ".join(query)], [params], 0

    def _build_add_modify_client(self, id, **kwargs):
        """Build query and params for adding/modifying client.

        Only keyword arguments that name a ``Client`` field other than
        ``id``/``registered`` and whose value is not ``None`` are used.
        An empty-string ``secret`` is stored as NULL (secret disabled).

        * ``id is None``: INSERT a new row with ``registered`` set to
          ``NOW()``, returning the new id.
        * ``id`` given but nothing to change: a no-op SELECT echoing ``id``.
        * otherwise: UPDATE the listed columns of row ``id``, returning it.

        Column names are rendered through ``psycopg2.sql.Identifier`` and
        values are always bound parameters.
        """
        sql = self.ppgsql
        fields = set(Client._fields) - {"id", "registered"}

        # Collect columns and values in two parallel lists. Unpacking
        # zip(*pairs) instead raises ValueError when no pair qualifies,
        # which made the "nothing to change" branch below unreachable.
        cols = []
        params = []
        for key, value in kwargs.items():
            if value is None or key not in fields:
                continue
            cols.append(key)
            params.append(None if key == "secret" and value == "" else value)  # disable secret

        if id is None:
            # "registered"/NOW() lead the lists, so the statement stays
            # valid SQL even when no other column is supplied.
            columns = [sql.Identifier("registered")] + [sql.Identifier(col) for col in cols]
            values = [sql.SQL("NOW()")] + [sql.Placeholder() for _ in cols]
            query = sql.SQL("INSERT INTO clients ({}) VALUES ({}) RETURNING id").format(
                sql.SQL(", ").join(columns),
                sql.SQL(", ").join(values)
            )
        elif not cols:
            return ["SELECT %s AS id"], [(id,)], 0
        else:
            assignments = [
                sql.SQL("{} = {}").format(sql.Identifier(col), sql.Placeholder())
                for col in cols
            ]
            query = sql.SQL("UPDATE clients SET {} WHERE id = {} RETURNING id").format(
                sql.SQL(", ").join(assignments),
                sql.Placeholder()
            )
            params.append(id)

        return [query], [params], 0

    def _build_get_debug_version(self):
        return ["SELECT setting AS version FROM pg_settings WHERE name = 'server_version'"], [()], 0

    def _build_get_debug_tablestat(self):
        return [
            "SELECT "
                'tablename AS "Name", '
                'relnatts AS "Columns", '
                'n_live_tup AS "Rows", '
                'pg_catalog.pg_size_pretty(pg_catalog.pg_table_size(oid)) AS "Table_size", '
                'pg_catalog.pg_size_pretty(pg_catalog.pg_indexes_size(oid)) AS "Index_size", '
                'coll.collations AS "Collations" '
            "FROM "
                "pg_catalog.pg_tables tbls "
                "LEFT OUTER JOIN pg_catalog.pg_class cls "
                "ON tbls.tablename=cls.relname "
                "LEFT OUTER JOIN pg_catalog.pg_stat_user_tables sut "
                "ON tbls.tablename=sut.relname "
                "LEFT OUTER JOIN ("
                    "SELECT "
                        "table_name, "
                        "string_agg("
                            "DISTINCT COALESCE("
                                "collation_name, "
                                "("
                                    "SELECT "
                                        "datcollate "
                                    "FROM "
                                        "pg_catalog.pg_database "
                                    "WHERE "
                                        "datname=%s"
                                ")"
                            "), "
                            "','"
                        ") AS collations "
                    "FROM "
                        "information_schema.columns "
                    "GROUP BY "
                        "table_name"
                ") coll "
                "ON tbls.tablename=coll.table_name "
            "WHERE "
                "tbls.schemaname='public' "
                "AND tbls.tableowner=%s"
        ], [(self.dbname, self.user)], 0

    def _load_event_json(self, data):
        """Return decoded json from data loaded from database, if unable to decode, return None"""
        try:
            return json.loads(data.tobytes())
        except Exception:
            return None

    def _build_fetch_events(
            self, client, id, count,
            cat, nocat, tag, notag, group, nogroup):

        query = ["SELECT e.id, e.data FROM clients c RIGHT JOIN events e ON c.id = e.client_id WHERE e.id > %s"]
        params = [id or 0]

        if cat or nocat:
            cats = self.getMaps(self.catmap, (cat or nocat))
            query.append(
                " AND e.id %s IN (SELECT event_id FROM event_category_mapping WHERE category_id IN (%s))" %
                (self._get_not(cat), self._get_comma_perc(cats))
            )
            params.extend(cats)

        if tag or notag:
            tags = self.getMaps(self.tagmap, (tag or notag))
            query.append(
                " AND e.id %s IN (SELECT event_id FROM event_tag_mapping WHERE tag_id IN (%s))" %
                (self._get_not(tag), self._get_comma_perc(tags))
            )
            params.extend(tags)

        if group or nogroup:
            subquery = []
            for name in group or nogroup:
                name = name.lower()  # assumes only lowercase names
                escaped_name = name.replace('&', '&&').replace("_", "&_").replace("%", "&%")  # escape for LIKE
                subquery.append("c.name = %s")                          # exact client
                params.append(name)
                subquery.append("c.name LIKE %s || '.%%' ESCAPE '&'")   # whole subtree
                params.append(escaped_name)

            query.append(" AND %s (%s)" % (self._get_not(group), " OR ".join(subquery)))

Jakub Maloštík's avatar
Jakub Maloštík committed
        params.append(count)

        return ["".join(query)], [params], 0

    def _build_store_events_event(self, client, event, raw_event):
        """Build query and params for event insertion"""
        return (
            ["INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s) RETURNING id"],
            [(client.id, raw_event)],
            0
        )

    def _build_store_events_categories(self, event_id, cat_ids):
        """Build the multi-row INSERT mapping ``event_id`` to each category id.

        The VALUES placeholder list comes from ``self._get_comma_perc_n(2,
        cat_ids)``; the single parameter tuple is the flattened sequence
        ``event_id, cat_id, event_id, cat_id, ...``. The third element of
        the returned triple is ``None``.
        """
        placeholders = self._get_comma_perc_n(2, cat_ids)
        flat = []
        for cat_id in cat_ids:
            flat.extend((event_id, cat_id))
        statement = "INSERT INTO event_category_mapping (event_id,category_id) VALUES " + placeholders
        return [statement], [tuple(flat)], None

    def _build_store_events_tags(self, event_id, tag_ids):
        """Build the multi-row INSERT mapping ``event_id`` to each tag id.

        The VALUES placeholder list comes from ``self._get_comma_perc_n(2,
        tag_ids)``; the single parameter tuple is the flattened sequence
        ``event_id, tag_id, event_id, tag_id, ...``. The third element of
        the returned triple is ``None``.
        """
        placeholders = self._get_comma_perc_n(2, tag_ids)
        flat = []
        for tag_id in tag_ids:
            flat.extend((event_id, tag_id))
        statement = "INSERT INTO event_tag_mapping (event_id,tag_id) VALUES " + placeholders
        return [statement], [tuple(flat)], None

    def _build_insert_last_received_id(self, client, id):
        """Build query and params for insertion of the last event id received by client"""
        return (
            ["INSERT INTO last_events(client_id, event_id, timestamp) VALUES(%s, %s, NOW())"],
            [(client.id, None if id == 1 else id)],
            None
        )

    def _build_get_last_event_id(self):
        """Build query and params for querying the id of the last inserted event"""
        return ["SELECT MAX(id) as id FROM events"], [()], 0

    def _build_get_last_received_id(self, client):
        """Build query and params for querying the last event id received by client"""
        return (
            ["SELECT event_id as id FROM last_events WHERE client_id = %s ORDER BY last_events.id DESC LIMIT 1"],
            [(client.id,)],
            0
        )

    def _build_load_maps_tags(self):
        """Build the statements replacing the ``tags`` table with ``self.tagmap``.

        The foreign key from ``event_tag_mapping`` is dropped first and
        re-added last, so the DELETE and the multi-row INSERT in between
        run without that constraint in place. Only the INSERT takes
        parameters: the flattened ``(num, tag)`` pairs of ``self.tagmap``.
        """
        placeholders = self._get_comma_perc_n(2, self.tagmap)
        flat = []
        for tag, num in self.tagmap.items():
            flat.extend((num, tag))

        drop_fk = "ALTER TABLE event_tag_mapping DROP CONSTRAINT event_tag_mapping_tag_id_fk"
        wipe = "DELETE FROM tags"
        refill = "INSERT INTO tags(id, tag) VALUES " + placeholders
        add_fk = 'ALTER TABLE event_tag_mapping ADD CONSTRAINT "event_tag_mapping_tag_id_fk" FOREIGN KEY ("tag_id") REFERENCES "tags" ("id")'
        return [drop_fk, wipe, refill, add_fk], [(), (), tuple(flat), ()], None

    def _build_load_maps_cats(self):
        """Build the statements replacing the ``categories`` table with ``self.catmap``.

        Each ``self.catmap`` key is split on its first dot into category
        and subcategory (``None`` when there is no dot). The foreign key
        from ``event_category_mapping`` is dropped first and re-added
        last, around the DELETE and the multi-row INSERT.
        """
        flat = []
        for full_name, num in self.catmap.items():
            category, dot, remainder = full_name.partition(".")
            subcategory = remainder if dot else None
            flat += [num, category, subcategory, full_name]

        drop_fk = "ALTER TABLE event_category_mapping DROP CONSTRAINT event_category_mapping_category_id_fk"
        wipe = "DELETE FROM categories"
        refill = (
            "INSERT INTO categories(id, category, subcategory, cat_subcat) VALUES " +
            self._get_comma_perc_n(4, self.catmap)
        )
        add_fk = 'ALTER TABLE event_category_mapping ADD CONSTRAINT "event_category_mapping_category_id_fk" FOREIGN KEY ("category_id") REFERENCES "categories" ("id")'
        return [drop_fk, wipe, refill, add_fk], [(), (), tuple(flat), ()], None

    def _build_purge_lastlog(self, days):
        """Build query and params for purging stored client last event mapping older than days"""
        return (
            [
                "DELETE FROM last_events "
                " USING last_events le LEFT JOIN ("
                "    SELECT MAX(id) AS last FROM last_events"
                "    GROUP BY client_id"
                " ) AS maxids ON maxids.last=le.id"
                " WHERE le.timestamp < CURRENT_DATE - INTERVAL %s DAY AND maxids.last IS NULL"
            ],
            [(str(days),)],
            0
        )

    def _build_purge_events_get_id(self, days):
        """Build query and params to get largest event id of events older than days"""
        return (
            [
                "SELECT MAX(id) as id"
                "  FROM events"
                "  WHERE received < CURRENT_DATE - INTERVAL %s DAY"
            ],
            [(str(days),)],
            0
        )

    def _build_purge_events_events(self, id_):
        """Build query and params to remove events older then days and their mappings"""
        return ["DELETE FROM events WHERE id <= %s"], [(id_,)], 0



    def expose_deco(meth):
        meth.exposed = True
        meth.read = read
        meth.write = write
        meth.debug = debug
        if not hasattr(meth, "arguments"):
            meth.arguments = get_method_params(meth)
    def __init__(self, req, log, auth, handler):
        """Set up the base object with ``req``/``log`` and keep the collaborators.

        ``auth`` is later asked to authenticate and authorize requests;
        ``handler`` is the object whose attributes are looked up by
        request path.
        """
        ObjectBase.__init__(self, req, log)
        self.handler = handler
        self.auth = auth

    def sanitize_args(self, path, func, args, exclude=("self", "post")):
        """Strip arguments the target method must not or cannot receive.

        ``args`` is modified in place and also returned. Two groups of keys
        are removed, each reported once through ``self.log.info``:

        * internal names listed in ``exclude`` - dropped silently towards
          the client, so they are never echoed back in an error message;
        * names not present in ``func.arguments`` - ignored for forward
          compatibility with clients sending newer parameters.

        ``path`` is accepted for interface compatibility and is not used.
        The default for ``exclude`` is an immutable tuple, so it cannot be
        altered between calls. Removed names are logged in sorted order,
        which keeps log lines stable.
        """
        internal = sorted(set(args).intersection(exclude))
        for name in internal:
            del args[name]
        if internal:
            self.log.info("sanitize_args: Called with internal args: %s" % ", ".join(internal))

        surplus = sorted(set(args) - set(func.arguments))
        for name in surplus:
            del args[name]
        if surplus:
            self.log.info("sanitize_args: Called with superfluous args: %s" % ", ".join(surplus))

        return args

    def wsgi_app(self, environ, start_response, exc_info=None):
        """WSGI entry point: dispatch one request to a handler method.

        The request path (leading slashes stripped) names an attribute of
        ``self.handler`` that must carry an ``exposed`` attribute. The
        client is authenticated and authorized, the query-string arguments
        are sanitized, the body is read and the method is called as
        ``method(post_data, **args)``; it returns ``(headers, output)``.

        Any ``Error`` raised on the way, and any other exception wrapped
        as a 500 error, is rendered as the JSON response body with the
        matching HTTP status and is logged. Returns a one-element list
        holding the encoded body.
        """
        path = environ.get("PATH_INFO", "").lstrip("/")
        self.req.reset(env=environ, path=path)
        output = ""
        status = "200 OK"
        headers = [('Content-type', 'application/json')]
        exception = None

        try:
            try:
                method = getattr(self.handler, path)
                method.exposed    # dummy access to trigger AttributeError
            except Exception:
                raise self.req.error(message="You've fallen off the cliff.", error=404)

            self.req.args = args = parse_qs(environ.get('QUERY_STRING', ""))

            self.req.client = client = self.auth.authenticate(environ, args)
            if not client:
                raise self.req.error(message="I'm watching. Authenticate.", error=403)

            auth = self.auth.authorize(self.req.env, self.req.client, self.req.path, method)
            if not auth:
                raise self.req.error(message="I'm watching. Not authorized.", error=403, client=client.name)

            args = self.sanitize_args(path, method, args)

            # Based on RFC2616, section 4.4 we SHOULD respond with 400 (bad request) or 411
            # (length required) if content length was not specified. We choose not to, to
            # preserve compatibility with clients deployed in the wild, which use POST for
            # all requests (even those without payload, with no specified content length).
            # According to PEP3333, section "Input and Error Streams", the application SHOULD
            # NOT attempt to read more data than specified by CONTENT_LENGTH. As stated in
            # section "environ Variables", CONTENT_LENGTH may be empty (string) or absent.
            try:
                content_length = int(environ.get('CONTENT_LENGTH', 0))
            except ValueError:
                content_length = 0

            try:
                post_data = environ['wsgi.input'].read(content_length)
            except Exception:
                raise self.req.error(message="Data read error.", error=408, exc=sys.exc_info())

            headers, output = method(post_data, **args)

        except Error as e:
            exception = e
        except Exception:
            exception = self.req.error(message="Server exception", error=500, exc=sys.exc_info())

        # Render the error response for BOTH kinds of failure; an Error
        # raised above must not fall through as "200 OK" with empty body.
        if exception is not None:
            status = "%d %s" % exception.get_http_err_msg()
            output = json.dumps(exception.to_dict(), default=lambda v: str(v))
            exception.log(self.log)

        # Make sure everything is properly encoded - JSON and various function
        # may spit out unicode instead of str and it gets propagated up (str
        # + unicode = unicode).
        # For Python2 the right thing would be to be unicode correct among whole
        # source and always decode on input (json module does that for us) and
        # on output here.
        # For Python3 strings are internally unicode so no decoding on input is
        # necessary. For output, "status" must be unicode string, "output" must
        # be encoded bytes array, what is done here. Important: for Python 3 we
        # define: unicode = str
        if isinstance(status, unicode) and sys.version_info[0] < 3:
            status = status.encode("utf-8")
        if isinstance(output, unicode):
            output = output.encode("utf-8")
        headers.append(('Content-Length', str(len(output))))
        start_response(status, headers)
        return [output]

    __call__ = wsgi_app


def json_wrapper(method):
    def meth_deco(self, post, **args):
        if "events" in get_method_params(method):
                events = json.loads(post.decode('utf-8')) if post else None
Pavel Kácha's avatar
Pavel Kácha committed
                raise self.req.error(
                    message="Deserialization error.", error=400,
                    exc=sys.exc_info(), args=post, parser=str(e))
            if events:
                args["events"] = events

        result = method(self, **args)   # call requested method

        try:
            # 'default': takes care of non JSON serializable objects,
            # which could (although shouldn't) appear in handler code
            output = json.dumps(result, default=lambda v: str(v))
        except Exception as e:
Pavel Kácha's avatar
Pavel Kácha committed
            raise self.req.error(message="Serialization error", error=500, exc=sys.exc_info(), args=str(result))

        return [('Content-type', 'application/json')], output

    try:
        meth_deco.arguments = method.arguments
    except AttributeError:
        meth_deco.arguments = get_method_params(method)
Pavel Kácha's avatar
Pavel Kácha committed
    def __init__(
            self, req, log, validator, db, auth,
            send_events_limit=500, get_events_limit=1000,
        ObjectBase.__init__(self, req, log)
Michal Kostenec's avatar
Michal Kostenec committed
        self.auth = auth
        self.db = db
        self.validator = validator
        self.send_events_limit = send_events_limit
        self.get_events_limit = get_events_limit
        self.description = description

Pavel Kácha's avatar
Pavel Kácha committed
        return {
            "client": self.req.client._asdict(),
            "database": self.db.get_debug(),
            "system": {
                "python": sys.version,
                "uname": os.uname()
            },
            "process": {
                "cwd": unicode(os.getcwd()),
                "pid": os.getpid(),
                "ppid": os.getppid(),
                "pgrp": os.getpgrp(),
                "uid": os.getuid(),
                "gid": os.getgid(),
                "euid": os.geteuid(),
                "egid": os.getegid(),
                "groups": os.getgroups()
            }
Pavel Kácha's avatar
Pavel Kácha committed
        }
        info = {
            "version": VERSION,
            "send_events_limit": self.send_events_limit,
            "get_events_limit": self.get_events_limit
        }
        if self.description:
            info["description"] = self.description
        return info

Pavel Kácha's avatar
Pavel Kácha committed
    def getEvents(
            self, id=None, count=None,
            cat=None, nocat=None,
            tag=None, notag=None,
            group=None, nogroup=None):

        try:
            id = int(id[0])
        except (ValueError, TypeError, IndexError):
            # If client was already here, fetch server notion of his last id
Michal Kostenec's avatar
Michal Kostenec committed
            try:
                id = self.db.getLastReceivedId(self.req.client)
            except Exception as e:
                self.log.info("cannot getLastReceivedId - " + type(e).__name__ + ": " + str(e))
Pavel Kácha's avatar
Pavel Kácha committed

            # First access, remember the guy and get him last id
            id = self.db.getLastEventId()
            self.db.insertLastReceivedId(self.req.client, id)
Michal Kostenec's avatar
Michal Kostenec committed
            return {
                "lastid": id,
                "events": []
            }

Pavel Kácha's avatar
Pavel Kácha committed
        if id <= 0:
            # Client wants to get only last N events and reset server notion of last id
            id += self.db.getLastEventId()
Pavel Kácha's avatar
Pavel Kácha committed
            if id < 0: id = 0
            count = int(count[0])
        except (ValueError, TypeError, IndexError):
            count = self.get_events_limit

        if self.get_events_limit:
            count = min(count, self.get_events_limit)
        count = max(0, count)
        res = self.db.fetch_events(self.req.client, id, count, cat, nocat, tag, notag, group, nogroup)
        self.db.insertLastReceivedId(self.req.client, res['lastid'])
        self.log.info("sending %d events, lastid is %i" % (len(res["events"]), res["lastid"]))
        try:
            ev_id = event['Node'][0]['Name'].lower()
Pavel Kácha's avatar
Pavel Kácha committed
        except (KeyError, TypeError, IndexError):
            # Event does not bear valid Node attribute
            return [{"error": 422, "message": "Event does not bear valid Node attribute"}]
            return [{"error": 422, "message": "Node does not correspond with saving client"}]
    def add_event_nums(self, ilist, events, errlist):
            err.setdefault("events", []).extend(ilist)
            ev_ids = err.setdefault("events_id", [])
            for i in ilist:
                event = events[i]
Pavel Kácha's avatar
Pavel Kácha committed
                except (KeyError, TypeError, ValueError):
        if not isinstance(events, list):
            raise self.req.error(message="List of events expected.", error=400)
Pavel Kácha's avatar
Pavel Kácha committed
        if len(events) > self.send_events_limit:
            errs.extend(self.add_event_nums(range(self.send_events_limit, len(events)), events, [
                {"error": 507, "message": "Too much events in one batch.", "send_events_limit": self.send_events_limit}]))
        events_tosend = []
        events_raw = []
        events_nums = []
        for i, event in enumerate(events[0:self.send_events_limit]):
            v_errs = self.validator.check(event)
            if v_errs:
                errs.extend(self.add_event_nums([i], events, v_errs))
            node_errs = self.check_node(event, self.req.client.name)
                errs.extend(self.add_event_nums([i], events, node_errs))
            if self.req.client.test and 'Test' not in event.get('Category', []):
Pavel Kácha's avatar
Pavel Kácha committed
                errs.extend(
                    self.add_event_nums([i], events, [{
                        "error": 422,
                        "message": "You're allowed to send only messages, containing \"Test\" among categories.",
                        "categories": event.get('Category', [])}]))
Pavel Kácha's avatar
Pavel Kácha committed
                continue
            raw_event = json.dumps(event)
            if len(raw_event) >= self.db.event_size_limit:
Pavel Kácha's avatar
Pavel Kácha committed
                errs.extend(
                    self.add_event_nums([i], events, [
                        {"error": 413, "message": "Event too long (>%i B)" % self.db.event_size_limit}
                    ]))
            events_tosend.append(event)
            events_raw.append(raw_event)
            events_nums.append(i)

        db_errs = self.db.store_events(self.req.client, events_tosend, events_raw)
        if db_errs:
            errs.extend(self.add_event_nums(events_nums, events_tosend, db_errs))
            saved = 0
        else:
            saved = len(events_tosend)
        self.log.info("Saved %i events" % saved)


def read_ini(path):
    """Read an INI file into a dict of dicts keyed by lowercased section name.

    Raises ``Error`` when ``path`` could not be read. Option values are
    returned exactly as the parser yields them; sections without any
    option do not appear in the result.
    """
    parser = ConfigParser.RawConfigParser()
    loaded = parser.read(path)
    if not (loaded and path in loaded):
        # Logging is not set up yet, hopefully this ends up in the webserver log
        raise Error(message="Unable to read config: %s" % path)

    data = {}
    for sect in parser.sections():
        lsect = sect.lower()
        for opt in parser.options(sect):
            data.setdefault(lsect, {})[opt] = parser.get(sect, opt)
    return data


def read_cfg(path):
    """Read a JSON configuration file, ignoring whole-line comments.

    Lines whose first non-blank characters are ``#`` or ``//`` are dropped
    before parsing. The file is expected to hold an object of objects;
    the keys of both levels are lowercased, values are left untouched.
    """
    with io.open(path, "r", encoding="utf-8") as f:
        stripcomments = "\n".join(
            line for line in f if not line.lstrip().startswith(("#", "//")))
        conf = json.loads(stripcomments)

    # Lowercase keys
    return {
        sect.lower(): {subkey.lower(): val for subkey, val in subsect.items()}
        for sect, subsect in conf.items()
    }


def fallback_wsgi(environ, start_response, exc_info=None):
    """Minimal WSGI application answering every request with a 503 error.

    Logs a critical message, calls `start_response` with a 503 status and a
    JSON content type, and returns a single-element body holding a JSON
    document with the error code and message. `environ` and `exc_info` are
    accepted for interface compatibility and are not used.
    """
    # If server does not start, set up simple server, returning
    # Warden JSON compliant error message
    error = 503
    message = "Server not running due to initialization error"
    headers = [('Content-type', 'application/json')]

    logline = "Error(%d): %s" % (error, message)
    status = "%d %s" % (error, message)
    output = '{"errors": [{"error": %d, "message": "%s"}]}' % (
        error, message)

    logging.getLogger(__name__).critical(logline)
    start_response(status, headers)
    # WSGI requires the response body to be an iterable of bytestrings
    return [output.encode("utf-8")]


# Base objects must be initialized in exactly this order
section_order = (
    "log",
    "db",
    "auth",
    "validator",
    "handler",
    "server",
)

# List of sections and objects, configured by them
# First object in each object list is the default one, otherwise
# "type" keyword in section may be used to choose other
section_def = {
    "log": [FileLogger, SysLogger],
    "db": [MySQL, PostgreSQL],
    "auth": [X509NameAuthenticator, PlainAuthenticator, X509Authenticator, X509MixMatchAuthenticator],
    "validator": [JSONSchemaValidator, NoValidator],
    "handler": [WardenHandler],
    "server": [Server]
}

# Object parameter conversions and defaults
# Maps each configurable class to its keyword parameters. For every parameter
# "type" names the converter applied to the raw value (see conv_dict in
# build_server) and "default" is the raw value used when the config section
# does not set the key. Parameters of type "obj" refer to other, already
# initialized sections and cannot be set from the config file.
param_def = {
    FileLogger: {
        "req": {"type": "obj", "default": "req"},
        "filename": {"type": "filepath", "default": path.join(path.dirname(__file__), path.splitext(path.split(__file__)[1])[0] + ".log")},
        "level": {"type": "loglevel", "default": "info"},
    },
    SysLogger: {
        "req": {"type": "obj", "default": "req"},
        "socket": {"type": "filepath", "default": "/dev/log"},
        "facility": {"type": "facility", "default": "daemon"},
        "level": {"type": "loglevel", "default": "info"}
    },
    PlainAuthenticator: {
        "req": {"type": "obj", "default": "req"},
        "log": {"type": "obj", "default": "log"},
        "db": {"type": "obj", "default": "db"}
    },
    X509Authenticator: {
        "req": {"type": "obj", "default": "req"},
        "log": {"type": "obj", "default": "log"},
        "db": {"type": "obj", "default": "db"}
    },
    X509NameAuthenticator: {
        "req": {"type": "obj", "default": "req"},
        "log": {"type": "obj", "default": "log"},
        "db": {"type": "obj", "default": "db"}
    },
    X509MixMatchAuthenticator: {
        "req": {"type": "obj", "default": "req"},
        "log": {"type": "obj", "default": "log"},
        "db": {"type": "obj", "default": "db"}
    },
    NoValidator: {
        "req": {"type": "obj", "default": "req"},
        "log": {"type": "obj", "default": "log"},
    },
    JSONSchemaValidator: {
        "req": {"type": "obj", "default": "req"},
        "log": {"type": "obj", "default": "log"},
        "filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "idea.schema")}
    },
    MySQL: {
        "req": {"type": "obj", "default": "req"},
        "log": {"type": "obj", "default": "log"},
        "host": {"type": "str", "default": "localhost"},
        "user": {"type": "str", "default": "warden"},
        "password": {"type": "str", "default": ""},
        "dbname": {"type": "str", "default": "warden3"},
        "port": {"type": "natural", "default": 3306},
        "retry_pause": {"type": "natural", "default": 3},
        "retry_count": {"type": "natural", "default": 3},
        "event_size_limit": {"type": "natural", "default": 5*1024*1024},
        "catmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "catmap_db.json")},
        "tagmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "tagmap_db.json")}
    },
    PostgreSQL: {
        "req": {"type": "obj", "default": "req"},
        "log": {"type": "obj", "default": "log"},
        "host": {"type": "str", "default": "localhost"},
        "user": {"type": "str", "default": "warden"},
        "password": {"type": "str", "default": ""},
        "dbname": {"type": "str", "default": "warden3"},
        "port": {"type": "natural", "default": 5432},
        "retry_pause": {"type": "natural", "default": 3},
        "retry_count": {"type": "natural", "default": 3},
        "event_size_limit": {"type": "natural", "default": 5*1024*1024},
        "catmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "catmap_db.json")},
        "tagmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "tagmap_db.json")}
    },
    WardenHandler: {
        "req": {"type": "obj", "default": "req"},
        "log": {"type": "obj", "default": "log"},
        "validator": {"type": "obj", "default": "validator"},
        "db": {"type": "obj", "default": "DB"},
        "auth": {"type": "obj", "default": "auth"},
        "send_events_limit": {"type": "natural", "default": 500},
        "get_events_limit": {"type": "natural", "default": 1000},
        "description": {"type": "str", "default": ""}
    },
    Server: {
        "req": {"type": "obj", "default": "req"},
        "log": {"type": "obj", "default": "log"},
        "auth": {"type": "obj", "default": "auth"},
        "handler": {"type": "obj", "default": "handler"}
    }
}


def build_server(conf, section_order=section_order, section_def=section_def, param_def=param_def):

    objects = {}    # Already initialized objects

    # Functions for validation and conversion of config values
    def facility(name):
        return int(getattr(logging.handlers.SysLogHandler, "LOG_" + name.upper()))

    def loglevel(name):
        return int(getattr(logging, name.upper()))

    def natural(name):
        num = int(name)
Pavel Kácha's avatar
Pavel Kácha committed
        if num < 1:
            raise ValueError("Not a natural number")
        return num

    def filepath(name):
        # Make paths relative to dir of this script
        return path.join(path.dirname(__file__), name)

        return objects[name.lower()]

    # Typedef dictionary
    conv_dict = {
        "facility": facility,
        "loglevel": loglevel,
        "natural": natural,
        "filepath": filepath,
        "obj": obj,
        "str": str
    }

    def init_obj(sect_name):
        config = dict(conf.get(sect_name, {}))
        sect_name = sect_name.lower()
        sect_def = section_def[sect_name]

        try:    # Object type defined?
            objtype = config["type"]
            del config["type"]
        except KeyError:    # No, fetch default object type for this section
Pavel Kácha's avatar
Pavel Kácha committed
        else:  # Yes, get corresponding class/callable
            names = [o.__name__ for o in sect_def]
            try:
                idx = names.index(objtype)
            except ValueError:
                raise KeyError("Unknown type %s in section %s" % (objtype, sect_name))

        # No surplus parameters? Disallow also 'obj' attributes, these are only
        # to provide default referenced section
        for name in config:
            if name not in params or (name in params and params[name]["type"] == "obj"):
                raise KeyError("Unknown key %s in section %s" % (name, sect_name))

        # Process parameters
        kwargs = {}
        for name, definition in params.items():
            raw_val = config.get(name, definition["default"])
            try:
                type_callable = conv_dict[definition["type"]]
                val = type_callable(raw_val)
            except Exception:
                raise KeyError("Bad value \"%s\" for %s in section %s" % (raw_val, name, sect_name))
            kwargs[name] = val

        try:
        except Exception as e:
            raise KeyError("Cannot initialize %s from section %s: %s" % (
                cls.__name__, sect_name, str(e)))
Pavel Kácha's avatar
Pavel Kácha committed
        objects[sect_name] = obj_inst
            # Log only objects here, functions must take care of themselves
            objects["log"].info("Initialized %s" % str(obj_inst))