Skip to content
Snippets Groups Projects
Commit 93b7cd42 authored by Jakub Maloštík's avatar Jakub Maloštík
Browse files

Add PostgreSQL option

parent b383c6a2
No related branches found
No related tags found
No related merge requests found
...@@ -42,13 +42,13 @@ B. Dependencies ...@@ -42,13 +42,13 @@ B. Dependencies
2. Python modules 2. Python modules
python-mysqldb 5.3.3+ python-mysqldb 5.3.3+ | python-psycopg2 2.8.6+
python-m2crypto 0.20+ python-m2crypto 0.20+
jsonschema 2.4+ jsonschema 2.4+
3. Database 3. Database
MySQL | MariaDB >= 5.5 MySQL | MariaDB >= 5.5 | PostgreSQL >= 13
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
C. Installation C. Installation
...@@ -70,14 +70,28 @@ C. Installation ...@@ -70,14 +70,28 @@ C. Installation
> GRANT ALL ON warden3.* TO `warden`@`localhost`; > GRANT ALL ON warden3.* TO `warden`@`localhost`;
> FLUSH PRIVILEGES; > FLUSH PRIVILEGES;
# psql
> CREATE DATABASE warden3 ENCODING 'UTF-8';
> CREATE ROLE "warden" LOGIN PASSWORD 'example';
> GRANT ALL ON DATABASE "warden3" TO "warden";
* Create necessary table structure * Create necessary table structure
mysql -p -u warden warden3 < warden_3.0_mysql.sql mysql -p -u warden warden3 < warden_3.0_mysql.sql
or
psql -U warden -h localhost warden3 < warden_3.0_postgres.sql
* Get up to date Idea schema * Get up to date Idea schema
wget -O warden_server/idea.schema https://idea.cesnet.cz/_media/en/idea0.schema wget -O warden_server/idea.schema https://idea.cesnet.cz/_media/en/idea0.schema
* Load category and tag maps into database (This step is optional for MySQL dbms)
./warden_server.py loadmaps
* Enable mod_wsgi, mod_ssl, include Warden configuration * Enable mod_wsgi, mod_ssl, include Warden configuration
This depends heavily on your distribution and Apache configuration. This depends heavily on your distribution and Apache configuration.
...@@ -129,7 +143,7 @@ particular implementation object of the aspect, for example type of logger ...@@ -129,7 +143,7 @@ particular implementation object of the aspect, for example type of logger
Sections and their "type" objects can be: Sections and their "type" objects can be:
Log: FileLogger, SysLogger Log: FileLogger, SysLogger
DB: MySQL DB: MySQL, PostgreSQL
Auth: X509Authenticator, X509NameAuthenticator, Auth: X509Authenticator, X509NameAuthenticator,
X509MixMatchAuthenticator,PlainAuthenticator X509MixMatchAuthenticator,PlainAuthenticator
Validator: JSONSchemaValidator, NoValidator Validator: JSONSchemaValidator, NoValidator
...@@ -190,6 +204,20 @@ object from particular section list is used ("FileLogger" for example). ...@@ -190,6 +204,20 @@ object from particular section list is used ("FileLogger" for example).
tagmap_filename": IDEA node type mapping to database ids, defaults to tagmap_filename": IDEA node type mapping to database ids, defaults to
"tagmap_db.json" at installation directory "tagmap_db.json" at installation directory
PostgreSQL: database storage backend
host: database server host, default "localhost"
user: database user, default "warden"
password: database password
dbname: database name, default "warden3"
port: database server port, default 5432
retry_pause: retry in case of database errors, in seconds, defaults to 5
retry_count: number of retries, defaults to 3
event_size_limit: max size of serialized event, defaults to 5 MB
catmap_filename: IDEA category mapping to database ids, defaults to
"catmap_db.json" at installation directory
tagmap_filename": IDEA node type mapping to database ids, defaults to
"tagmap_db.json" at installation directory
WardenHandler: Main Warden RPC worker WardenHandler: Main Warden RPC worker
send_events_limit: max events sent in one bunch, defaults to 10000 send_events_limit: max events sent in one bunch, defaults to 10000
get_events_limit: max events received in one bunch, defaults to 10000 get_events_limit: max events received in one bunch, defaults to 10000
...@@ -286,10 +314,8 @@ warden_server.py purge [--help] [-l] [-e] [-d DAYS] ...@@ -286,10 +314,8 @@ warden_server.py purge [--help] [-l] [-e] [-d DAYS]
warden_server.py loadmaps [--help] warden_server.py loadmaps [--help]
Load 'categories' and 'tags' table from 'catmap_db.json' and Load 'categories' and 'tags' table from 'catmap_db.json' and
'tagmap_db.json'. Note that this is NOT needed for server at all, load 'tagmap_db.json'. Note also that previous content of both tables
them into db at will, should you need to run your own specific SQL queries will be lost.
on data directly. Note also that previous content of both tables will be
lost.
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
Copyright (C) 2011-2015 Cesnet z.s.p.o Copyright (C) 2011-2015 Cesnet z.s.p.o
...@@ -1187,6 +1187,297 @@ class MySQL(DataBaseAPIv2): ...@@ -1187,6 +1187,297 @@ class MySQL(DataBaseAPIv2):
) )
class PostgreSQL(DataBaseAPIv2):
def __init__(
self, req, log, host, user, password, dbname, port, retry_count,
retry_pause, event_size_limit, catmap_filename, tagmap_filename):
super().__init__(req, log, host, user, password, dbname, port, retry_count,
retry_pause, event_size_limit, catmap_filename, tagmap_filename)
import psycopg2 as db
from psycopg2 import sql as ppgsql
import psycopg2.extras as ppgextra
self.db = db
self.ppgsql = ppgsql
self.ppgextra = ppgextra
def connect(self):
self.con = self.db.connect(
host=self.host, user=self.user, password=self.password,
dbname=self.dbname, port=self.port, cursor_factory=self.ppgextra.RealDictCursor)
def _build_get_client_by_name(self, cert_names=None, name=None, secret=None):
"""Build query and params for client lookup"""
query = ["SELECT * FROM clients WHERE valid = 1"]
params = []
if name:
query.append(" AND name = %s")
params.append(name.lower())
if secret:
query.append(" AND secret = %s")
params.append(secret)
if cert_names:
query.append(" AND hostname IN (%s)" % self._get_comma_perc(cert_names))
params.extend(n.lower() for n in cert_names)
return ["".join(query)], [params], 0
def _build_get_clients(self, id):
"""Build query and params for client lookup by id"""
query = ["SELECT * FROM clients"]
params = []
if id:
query.append("WHERE id = %s")
params.append(id)
query.append("ORDER BY id")
return [" ".join(query)], [params], 0
def _build_add_modify_client(self, id, **kwargs):
"""Build query and params for adding/modifying client"""
fields = set(Client._fields) - {"id", "registered"}
cols, params = map(
list,
zip(
*(
(k, None) # disable secret
if k == "secret" and v == "" else
(k, v)
for k, v in kwargs.items()
if v is not None and k in fields
)
)
)
if id is None:
query = self.ppgsql.SQL('INSERT INTO clients ("registered", {}) VALUES (NOW(), {}) RETURNING id').format(
self.ppgsql.SQL(", ").join(map(self.ppgsql.Identifier, cols)),
self.ppgsql.SQL(", ").join(self.ppgsql.Placeholder() * len(cols))
)
elif not cols:
return ["SELECT %s AS id"], [(id,)], 0
else:
query = self.ppgsql.SQL("UPDATE clients SET {} WHERE id = {} RETURNING id").format(
self.ppgsql.SQL(", ").join(
self.ppgsql.SQL("{} = {}").format(
self.ppgsql.Identifier(col),
self.ppgsql.Placeholder()
) for col in cols
),
self.ppgsql.Placeholder()
)
params.append(id)
return [query], [params], 0
def _build_get_debug_version(self):
return ["SELECT setting AS version FROM pg_settings WHERE name = 'server_version'"], [()], 0
def _build_get_debug_tablestat(self):
return [
"SELECT "
'tablename AS "Name", '
'relnatts AS "Columns", '
'n_live_tup AS "Rows", '
'pg_catalog.pg_size_pretty(pg_catalog.pg_table_size(oid)) AS "Table_size", '
'pg_catalog.pg_size_pretty(pg_catalog.pg_indexes_size(oid)) AS "Index_size", '
'coll.collations AS "Collations" '
"FROM "
"pg_catalog.pg_tables tbls "
"LEFT OUTER JOIN pg_catalog.pg_class cls "
"ON tbls.tablename=cls.relname "
"LEFT OUTER JOIN pg_catalog.pg_stat_user_tables sut "
"ON tbls.tablename=sut.relname "
"LEFT OUTER JOIN ("
"SELECT "
"table_name, "
"string_agg("
"DISTINCT COALESCE("
"collation_name, "
"("
"SELECT "
"datcollate "
"FROM "
"pg_catalog.pg_database "
"WHERE "
"datname=%s"
")"
"), "
"','"
") AS collations "
"FROM "
"information_schema.columns "
"GROUP BY "
"table_name"
") coll "
"ON tbls.tablename=coll.table_name "
"WHERE "
"tbls.schemaname='public' "
"AND tbls.tableowner=%s"
], [(self.dbname, self.user)], 0
def _load_event_json(self, data):
"""Return decoded json from data loaded from database, if unable to decode, return None"""
try:
return json.loads(data.tobytes())
except Exception:
return None
def _build_fetch_events(
self, client, id, count,
cat, nocat, tag, notag, group, nogroup):
query = ["SELECT e.id, e.data FROM clients c RIGHT JOIN events e ON c.id = e.client_id WHERE e.id > %s"]
params = [id or 0]
if cat or nocat:
cats = self.getMaps(self.catmap, (cat or nocat))
query.append(
" AND e.id %s IN (SELECT event_id FROM event_category_mapping WHERE category_id IN (%s))" %
(self._get_not(cat), self._get_comma_perc(cats))
)
params.extend(cats)
if tag or notag:
tags = self.getMaps(self.tagmap, (tag or notag))
query.append(
" AND e.id %s IN (SELECT event_id FROM event_tag_mapping WHERE tag_id IN (%s))" %
(self._get_not(tag), self._get_comma_perc(tags))
)
params.extend(tags)
if group or nogroup:
subquery = []
for name in group or nogroup:
name = name.lower() # assumes only lowercase names
escaped_name = name.replace('&', '&&').replace("_", "&_").replace("%", "&%") # escape for LIKE
subquery.append("c.name = %s") # exact client
params.append(name)
subquery.append("c.name LIKE %s || '.%%' ESCAPE '&'") # whole subtree
params.append(escaped_name)
query.append(" AND %s (%s)" % (self._get_not(group), " OR ".join(subquery)))
query.append(" AND e.valid = 1 LIMIT %s")
params.append(count)
return ["".join(query)], [params], 0
def _build_store_events_event(self, client, event, raw_event):
"""Build query and params for event insertion"""
return (
["INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s) RETURNING id"],
[(client.id, raw_event)],
0
)
def _build_store_events_categories(self, event_id, cat_ids):
"""Build query and params for insertion of event-categories mapping"""
return (
["INSERT INTO event_category_mapping (event_id,category_id) VALUES " +
self._get_comma_perc_n(2, cat_ids)],
[tuple(param for cat_id in cat_ids for param in (event_id, cat_id))],
None
)
def _build_store_events_tags(self, event_id, tag_ids):
"""Build query and params for insertion of event-tags mapping"""
return (
["INSERT INTO event_tag_mapping (event_id,tag_id) VALUES " +
self._get_comma_perc_n(2, tag_ids)],
[tuple(param for tag_id in tag_ids for param in (event_id, tag_id))],
None
)
def _build_insert_last_received_id(self, client, id):
"""Build query and params for insertion of the last event id received by client"""
return (
["INSERT INTO last_events(client_id, event_id, timestamp) VALUES(%s, %s, NOW())"],
[(client.id, None if id == 1 else id)],
None
)
def _build_get_last_event_id(self):
"""Build query and params for querying the id of the last inserted event"""
return ["SELECT MAX(id) as id FROM events"], [()], 0
def _build_get_last_received_id(self, client):
"""Build query and params for querying the last event id received by client"""
return (
["SELECT event_id as id FROM last_events WHERE client_id = %s ORDER BY last_events.id DESC LIMIT 1"],
[(client.id,)],
0
)
def _build_load_maps_tags(self):
"""Build query and params for updating the tag map"""
return (
[
"ALTER TABLE event_tag_mapping DROP CONSTRAINT event_tag_mapping_tag_id_fk",
"DELETE FROM tags",
"INSERT INTO tags(id, tag) VALUES " +
self._get_comma_perc_n(2, self.tagmap),
'ALTER TABLE event_tag_mapping ADD CONSTRAINT "event_tag_mapping_tag_id_fk" FOREIGN KEY ("tag_id") REFERENCES "tags" ("id")'
],
[(), (), tuple(param for tag, num in self.tagmap.items() for param in (num, tag)), ()],
None
)
def _build_load_maps_cats(self):
"""Build query and params for updating the catetgory map"""
params = []
for cat_subcat, num in self.catmap.items():
catsplit = cat_subcat.split(".", 1)
category = catsplit[0]
subcategory = catsplit[1] if len(catsplit) > 1 else None
params.extend((num, category, subcategory, cat_subcat))
return (
[
"ALTER TABLE event_category_mapping DROP CONSTRAINT event_category_mapping_category_id_fk",
"DELETE FROM categories",
"INSERT INTO categories(id, category, subcategory, cat_subcat) VALUES " +
self._get_comma_perc_n(4, self.catmap),
'ALTER TABLE event_category_mapping ADD CONSTRAINT "event_category_mapping_category_id_fk" FOREIGN KEY ("category_id") REFERENCES "categories" ("id")'
],
[(), (), tuple(params), ()],
None
)
def _build_purge_lastlog(self, days):
"""Build query and params for purging stored client last event mapping older than days"""
return (
[
"DELETE FROM last_events "
" USING last_events le LEFT JOIN ("
" SELECT MAX(id) AS last FROM last_events"
" GROUP BY client_id"
" ) AS maxids ON maxids.last=le.id"
" WHERE le.timestamp < CURRENT_DATE - INTERVAL %s DAY AND maxids.last IS NULL"
],
[(str(days),)],
0
)
def _build_purge_events_get_id(self, days):
"""Build query and params to get largest event id of events older than days"""
return (
[
"SELECT MAX(id) as id"
" FROM events"
" WHERE received < CURRENT_DATE - INTERVAL %s DAY"
],
[(str(days),)],
0
)
def _build_purge_events_events(self, id_):
"""Build query and params to remove events older then days and their mappings"""
return ["DELETE FROM events WHERE id <= %s"], [(id_,)], 0
def expose(read=1, write=0, debug=0): def expose(read=1, write=0, debug=0):
def expose_deco(meth): def expose_deco(meth):
...@@ -1576,7 +1867,7 @@ section_order = ("log", "db", "auth", "validator", "handler", "server") ...@@ -1576,7 +1867,7 @@ section_order = ("log", "db", "auth", "validator", "handler", "server")
# "type" keyword in section may be used to choose other # "type" keyword in section may be used to choose other
section_def = { section_def = {
"log": [FileLogger, SysLogger], "log": [FileLogger, SysLogger],
"db": [MySQL], "db": [MySQL, PostgreSQL],
"auth": [X509NameAuthenticator, PlainAuthenticator, X509Authenticator, X509MixMatchAuthenticator], "auth": [X509NameAuthenticator, PlainAuthenticator, X509Authenticator, X509MixMatchAuthenticator],
"validator": [JSONSchemaValidator, NoValidator], "validator": [JSONSchemaValidator, NoValidator],
"handler": [WardenHandler], "handler": [WardenHandler],
...@@ -1639,6 +1930,20 @@ param_def = { ...@@ -1639,6 +1930,20 @@ param_def = {
"catmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "catmap_db.json")}, "catmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "catmap_db.json")},
"tagmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "tagmap_db.json")} "tagmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "tagmap_db.json")}
}, },
PostgreSQL: {
"req": {"type": "obj", "default": "req"},
"log": {"type": "obj", "default": "log"},
"host": {"type": "str", "default": "localhost"},
"user": {"type": "str", "default": "warden"},
"password": {"type": "str", "default": ""},
"dbname": {"type": "str", "default": "warden3"},
"port": {"type": "natural", "default": 5432},
"retry_pause": {"type": "natural", "default": 3},
"retry_count": {"type": "natural", "default": 3},
"event_size_limit": {"type": "natural", "default": 5*1024*1024},
"catmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "catmap_db.json")},
"tagmap_filename": {"type": "filepath", "default": path.join(path.dirname(__file__), "tagmap_db.json")}
},
WardenHandler: { WardenHandler: {
"req": {"type": "obj", "default": "req"}, "req": {"type": "obj", "default": "req"},
"log": {"type": "obj", "default": "log"}, "log": {"type": "obj", "default": "log"},
...@@ -1995,8 +2300,6 @@ def get_args(): ...@@ -1995,8 +2300,6 @@ def get_args():
"loadmaps", add_help=False, "loadmaps", add_help=False,
description=( description=(
"Load 'categories' and 'tags' table from 'catmap_db.json' and 'tagmap_db.json'." "Load 'categories' and 'tags' table from 'catmap_db.json' and 'tagmap_db.json'."
" Note that this is NOT needed for server at all, load them into db at will,"
" should you need to run your own specific SQL queries on data directly."
" Note also that previous content of both tables will be lost."), " Note also that previous content of both tables will be lost."),
help="load catmap and tagmap into db") help="load catmap and tagmap into db")
subargp_loadmaps.set_defaults(command=load_maps) subargp_loadmaps.set_defaults(command=load_maps)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment