Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • Pavel.Valach/warden
1 result
Show changes
......@@ -4,24 +4,39 @@
# Copyright (C) 2011-2015 Cesnet z.s.p.o
# Use of this source is governed by a 3-clause BSD-style license, see LICENSE file.
from __future__ import print_function
import sys
import os
import io
from os import path
import logging
import logging.handlers
import ConfigParser
from traceback import format_tb
import M2Crypto.X509
import json
import MySQLdb as my
import MySQLdb.cursors as mycursors
import re
import email.utils
from traceback import format_tb
from collections import namedtuple
from time import sleep
from urlparse import parse_qs
from os import path
from random import randint
from contextlib import closing
import M2Crypto.X509
import MySQLdb as my
import MySQLdb.cursors as mycursors
if sys.version_info[0] >= 3:
import configparser as ConfigParser
from urllib.parse import parse_qs
unicode = str
def get_method_params(method):
return method.__code__.co_varnames[:method.__code__.co_argcount]
else:
import ConfigParser
from urlparse import parse_qs
def get_method_params(method):
return method.func_code.co_varnames[:method.func_code.co_argcount]
# for local version of up to date jsonschema
sys.path.append(path.join(path.dirname(__file__), "..", "lib"))
......@@ -29,7 +44,8 @@ sys.path.append(path.join(path.dirname(__file__), "..", "lib"))
from jsonschema import Draft4Validator
VERSION = "3.0-beta2"
VERSION = "3.0-beta3"
class Error(Exception):
......@@ -40,11 +56,9 @@ class Error(Exception):
if errors:
self.errors.extend(errors)
def append(self, _events=None, **kwargs):
self.errors.append(kwargs)
def get_http_err_msg(self):
try:
err = self.errors[0]["error"]
......@@ -63,11 +77,9 @@ class Error(Exception):
msg = "Multiple errors"
return err, msg
def __str__(self):
return "\n".join(self.str_err(e) for e in self.errors)
def log(self, logger, prio=logging.ERROR):
for e in self.errors:
logger.log(prio, self.str_err(e))
......@@ -78,7 +90,6 @@ class Error(Exception):
if debug:
logger.debug(debug)
def str_err(self, e):
out = []
out.append("Error(%s) %s " % (e.get("error", 0), e.get("message", "Unknown error")))
......@@ -86,7 +97,6 @@ class Error(Exception):
out.append("(cause was %s: %s)" % (e["exc"][0].__name__, str(e["exc"][1])))
return "".join(out)
def str_info(self, e):
ecopy = dict(e) # shallow copy
ecopy.pop("req_id", None)
......@@ -100,10 +110,9 @@ class Error(Exception):
out = ""
return out
def str_debug(self, e):
out = []
if not "exc" in e or not e["exc"]:
if not e.get("exc"):
return ""
exc_tb = e["exc"][2]
if exc_tb:
......@@ -111,7 +120,6 @@ class Error(Exception):
out.extend(format_tb(exc_tb))
return "".join(out)
def to_dict(self):
errlist = []
for e in self.errors:
......@@ -126,7 +134,6 @@ class Error(Exception):
return d
def get_clean_root_logger(level=logging.INFO):
""" Attempts to get logging module into clean slate state """
......@@ -141,6 +148,7 @@ def get_clean_root_logger(level=logging.INFO):
logger = logging.getLogger(__name__)
logger.setLevel(level)
while logger.handlers:
logger.handlers[0].close()
logger.removeHandler(logger.handlers[0])
while logger.filters:
logger.removeFilter(logger.filters[0])
......@@ -148,7 +156,6 @@ def get_clean_root_logger(level=logging.INFO):
return logger
def StreamLogger(stream=sys.stderr, level=logging.DEBUG):
""" Fallback handler just for setup, not meant to be used from
configuration file because during wsgi query stdout/stderr
......@@ -163,7 +170,6 @@ def StreamLogger(stream=sys.stderr, level=logging.DEBUG):
return logger
class LogRequestFilter(logging.Filter):
""" Filter class, instance of which is added to logger class to add
info about request automatically into every logline, no matter
......@@ -174,7 +180,6 @@ class LogRequestFilter(logging.Filter):
logging.Filter.__init__(self)
self.req = req
def filter(self, record):
if self.req.env:
record.req_preamble = "%08x/%s: " % (self.req.req_id or 0, self.req.path)
......@@ -183,7 +188,6 @@ class LogRequestFilter(logging.Filter):
return True
def FileLogger(req, filename, level=logging.INFO):
fhand = logging.FileHandler(filename)
......@@ -193,11 +197,10 @@ def FileLogger(req, filename, level=logging.INFO):
logger = get_clean_root_logger(level)
logger.addFilter(ffilt)
logger.addHandler(fhand)
logger.info("Initialized FileLogger(req=%s, filename=\"%s\", level=\"%d\")" % (type(req).__name__, filename, level))
logger.info("Initialized FileLogger(req=%r, filename=\"%s\", level=%s)" % (req, filename, level))
return logger
def SysLogger(req, socket="/dev/log", facility=logging.handlers.SysLogHandler.LOG_DAEMON, level=logging.INFO):
fhand = logging.handlers.SysLogHandler(address=socket, facility=facility)
......@@ -207,21 +210,21 @@ def SysLogger(req, socket="/dev/log", facility=logging.handlers.SysLogHandler.LO
logger = get_clean_root_logger(level)
logger.addFilter(ffilt)
logger.addHandler(fhand)
logger.info("Initialized SysLogger(req=%s, socket=\"%s\", facility=\"%d\", level=\"%d\")" % (type(req).__name__, socket, facility, level))
logger.info("Initialized SysLogger(req=%r, socket=\"%s\", facility=\"%d\", level=%s)" % (req, socket, facility, level))
return logger
Client = namedtuple("Client",
["id", "registered", "requestor", "hostname", "name",
Client = namedtuple("Client", [
"id", "registered", "requestor", "hostname", "name",
"secret", "valid", "read", "debug", "write", "test", "note"])
class Object(object):
def __str__(self):
return "%s()" % type(self).__name__
attrs = get_method_params(self.__init__)[1:]
eq_str = ["%s=%r" % (attr, getattr(self, attr, None)) for attr in attrs]
return "%s(%s)" % (type(self).__name__, ", ".join(eq_str))
class Request(Object):
......@@ -241,15 +244,6 @@ class Request(Object):
which data their main codepaths work with.
"""
def __init__(self):
Object.__init__(self)
self.reset()
def __str__(self):
return "%s()" % (type(self).__name__, str(self.env), str(self.client))
def reset(self, env=None, client=None, path=None, req_id=None):
self.env = env
self.client = client
......@@ -259,12 +253,12 @@ class Request(Object):
else:
self.req_id = 0 if env is None else randint(0x00000000, 0xFFFFFFFF)
__init__ = reset
def error(self, **kwargs):
return Error(self.path, self.req_id, **kwargs)
class ObjectBase(Object):
def __init__(self, req, log):
......@@ -273,23 +267,13 @@ class ObjectBase(Object):
self.log = log
def __str__(self):
return "%s(req=%s)" % (type(self).__name__, type(self.req).__name__)
class PlainAuthenticator(ObjectBase):
def __init__(self, req, log, db):
ObjectBase.__init__(self, req, log)
self.db = db
def __str__(self):
return "%s(req=%s, db=%s)" % (type(self).__name__, type(self.req).__name__, type(self.db).__name__)
def authenticate(self, env, args, hostnames = None, check_secret = True):
def authenticate(self, env, args, hostnames=None, check_secret=True):
name = args.get("client", [None])[0]
secret = args.get("secret", [None])[0] if check_secret else None
......@@ -314,7 +298,6 @@ class PlainAuthenticator(ObjectBase):
return client
def authorize(self, env, client, path, method):
if method.debug:
if not client.debug:
......@@ -346,9 +329,9 @@ class X509Authenticator(PlainAuthenticator):
commons = [n.get_data().as_text() for n in subj.get_entries_by_nid(subj.nid["CN"])]
try:
extstrs = cert.get_ext("subjectAltName").get_value().split(",")
extstrs = cert.get_ext("subjectAltName").get_value().split(",")
except LookupError:
extstrs = []
extstrs = []
extstrs = [val.strip() for val in extstrs]
altnames = [val[4:] for val in extstrs if val.startswith("DNS:")]
......@@ -356,17 +339,17 @@ class X509Authenticator(PlainAuthenticator):
firstcommon = commons[0]
return [firstcommon] + list(set(altnames+commons) - set([firstcommon]))
def is_verified_by_apache(self, env, args):
# Allows correct work while SSLVerifyClient both "optional" and "required"
verify = env.get("SSL_CLIENT_VERIFY")
if verify == "SUCCESS":
return True
exception = self.req.error(message="authenticate: certificate verification failed", error=403, args = args, ssl_client_verify=verify, cert=env.get("SSL_CLIENT_CERT"))
exception = self.req.error(
message="authenticate: certificate verification failed",
error=403, args=args, ssl_client_verify=verify, cert=env.get("SSL_CLIENT_CERT"))
exception.log(self.log)
return False
def authenticate(self, env, args):
if not self.is_verified_by_apache(env, args):
return None
......@@ -374,12 +357,14 @@ class X509Authenticator(PlainAuthenticator):
try:
cert_names = self.get_cert_dns_names(env["SSL_CLIENT_CERT"])
except:
exception = self.req.error(message="authenticate: cannot get or parse certificate from env", error=403, exc=sys.exc_info(), env=env)
exception = self.req.error(
message="authenticate: cannot get or parse certificate from env",
error=403, exc=sys.exc_info(), env=env)
exception.log(self.log)
return None
return PlainAuthenticator.authenticate(self, env, args, hostnames = cert_names)
return PlainAuthenticator.authenticate(self, env, args, hostnames=cert_names)
class X509NameAuthenticator(X509Authenticator):
......@@ -390,16 +375,20 @@ class X509NameAuthenticator(X509Authenticator):
try:
cert_name = env["SSL_CLIENT_S_DN_CN"]
except:
exception = self.req.error(message="authenticate: cannot get or parse certificate from env", error=403, exc=sys.exc_info(), env=env)
exception = self.req.error(
message="authenticate: cannot get or parse certificate from env",
error=403, exc=sys.exc_info(), env=env)
exception.log(self.log)
return None
if cert_name != args.setdefault("client", [cert_name])[0]:
exception = self.req.error(message="authenticate: client name does not correspond with certificate", error=403, cn = cert_name, args = args)
exception = self.req.error(
message="authenticate: client name does not correspond with certificate",
error=403, cn=cert_name, args=args)
exception.log(self.log)
return None
return PlainAuthenticator.authenticate(self, env, args, check_secret = False)
return PlainAuthenticator.authenticate(self, env, args, check_secret=False)
class X509MixMatchAuthenticator(X509Authenticator):
......@@ -409,7 +398,6 @@ class X509MixMatchAuthenticator(X509Authenticator):
self.hostname_auth = X509Authenticator(req, log, db)
self.name_auth = X509NameAuthenticator(req, log, db)
def authenticate(self, env, args):
if not self.is_verified_by_apache(env, args):
return None
......@@ -417,11 +405,13 @@ class X509MixMatchAuthenticator(X509Authenticator):
try:
cert_name = env["SSL_CLIENT_S_DN_CN"]
except:
exception = self.req.error(message="authenticate: cannot get or parse certificate from env", error=403, exc=sys.exc_info(), env=env)
exception = self.req.error(
message="authenticate: cannot get or parse certificate from env",
error=403, exc=sys.exc_info(), env=env)
exception.log(self.log)
return None
name = args.get("client", [None])[0]
secret = args.get("secret", [None])[0]
secret = args.get("secret", [None])[0]
# Client names are in reverse notation than DNS, client name should
# thus never be the same as machine hostname (if it is, client
......@@ -446,11 +436,6 @@ class NoValidator(ObjectBase):
def __init__(self, req, log):
ObjectBase.__init__(self, req, log)
def __str__(self):
return "%s(req=%s)" % (type(self).__name__, type(self.req).__name__)
def check(self, event):
return []
......@@ -460,15 +445,10 @@ class JSONSchemaValidator(NoValidator):
def __init__(self, req, log, filename=None):
NoValidator.__init__(self, req, log)
self.path = filename or path.join(path.dirname(__file__), "idea.schema")
with open(self.path) as f:
with io.open(self.path, "r", encoding="utf-8") as f:
self.schema = json.load(f)
self.validator = Draft4Validator(self.schema)
def __str__(self):
return "%s(req=%s, filename=\"%s\")" % (type(self).__name__, type(self.req).__name__, self.path)
def check(self, event):
def sortkey(k):
......@@ -489,10 +469,10 @@ class JSONSchemaValidator(NoValidator):
return res
class MySQL(ObjectBase):
def __init__(self, req, log, host, user, password, dbname, port, retry_count,
def __init__(
self, req, log, host, user, password, dbname, port, retry_count,
retry_pause, event_size_limit, catmap_filename, tagmap_filename):
ObjectBase.__init__(self, req, log)
self.host = host
......@@ -507,27 +487,21 @@ class MySQL(ObjectBase):
self.catmap_filename = catmap_filename
self.tagmap_filename = tagmap_filename
with open(catmap_filename, "r") as catmap_fd:
with io.open(catmap_filename, "r", encoding="utf-8") as catmap_fd:
self.catmap = json.load(catmap_fd)
self.catmap_other = self.catmap["Other"] # Catch error soon, avoid lookup later
with open(tagmap_filename, "r") as tagmap_fd:
with io.open(tagmap_filename, "r", encoding="utf-8") as tagmap_fd:
self.tagmap = json.load(tagmap_fd)
self.tagmap_other = self.catmap["Other"] # Catch error soon, avoid lookup later
self.tagmap_other = self.tagmap["Other"] # Catch error soon, avoid lookup later
self.con = None
def __str__(self):
return "%s(req=%s, host='%s', user='%s', dbname='%s', port=%d, retry_count=%d, retry_pause=%d, catmap_filename=\"%s\", tagmap_filename=\"%s\")" % (
type(self).__name__, type(self.req).__name__, self.host, self.user, self.dbname, self.port, self.retry_count, self.retry_pause, self.catmap_filename, self.tagmap_filename)
def connect(self):
self.con = my.connect(host=self.host, user=self.user, passwd=self.password,
self.con = my.connect(
host=self.host, user=self.user, passwd=self.password,
db=self.dbname, port=self.port, cursorclass=mycursors.DictCursor)
def close(self):
try:
if self.con:
......@@ -536,10 +510,8 @@ class MySQL(ObjectBase):
pass
self.con = None
__del__ = close
def repeat(self):
""" Allows for graceful repeating of transactions self.retry_count
times. Unsuccessful attempts wait for self.retry_pause until
......@@ -562,7 +534,6 @@ class MySQL(ObjectBase):
self.retry_attempt -= 1
yield self
def __enter__(self):
""" Context manager protocol. Guarantees that transaction will
get either commited or rolled back in case of database
......@@ -579,7 +550,6 @@ class MySQL(ObjectBase):
self.retry_attempt = 0
return self
def __exit__(self, exc_type, exc_val, exc_tb):
""" Context manager protocol. If db exception is fired and
self.retry_attempt is not zero, it is only logged and
......@@ -604,7 +574,6 @@ class MySQL(ObjectBase):
self.log.info("Database error (%d attempts left): %s %s" % (self.retry_attempt, exc_type.__name__, exc_val))
return True
def query(self, *args, **kwargs):
if not self.con:
self.connect()
......@@ -613,15 +582,12 @@ class MySQL(ObjectBase):
crs.execute(*args, **kwargs)
return crs
def _get_comma_perc(self, l):
return ','.join(['%s'] * len(l))
def _get_not(self, b):
return "" if b else "NOT"
def get_client_by_name(self, cert_names=None, name=None, secret=None):
query = ["SELECT * FROM clients WHERE valid = 1"]
params = []
......@@ -638,15 +604,14 @@ class MySQL(ObjectBase):
for attempt in self.repeat():
with attempt as db:
rows = db.query("".join(query), params).fetchall()
if len(rows)>1:
self.log.warn("get_client_by_name: query returned more than one result (cert_names = %s, name = %s, secret = %s): %s" % (cert_names, name, secret, ", ".join([str(Client(**row)) for row in rows])))
if len(rows) > 1:
self.log.warning(
"get_client_by_name: query returned more than one result (cert_names = %s, name = %s, secret = %s): %s" % (
cert_names, name, secret, ", ".join([str(Client(**row)) for row in rows])))
return None
return Client(**rows[0]) if rows else None
def get_clients(self, id=None):
query = ["SELECT * FROM clients"]
params = []
......@@ -659,7 +624,6 @@ class MySQL(ObjectBase):
rows = db.query(" ".join(query), params).fetchall()
return [Client(**row) for row in rows]
def add_modify_client(self, id=None, **kwargs):
query = []
params = []
......@@ -688,7 +652,6 @@ class MySQL(ObjectBase):
newid = crs.lastrowid if id is None else id
return newid
def get_debug(self):
for attempt in self.repeat():
with attempt as db:
......@@ -700,33 +663,36 @@ class MySQL(ObjectBase):
"tables": tablestat
}
def getMaps(self, section, variables):
maps = []
for v in variables:
try:
mapped = section[v]
except KeyError:
raise self.req.error(message="Wrong tag or category used in query.", error=422,
exc=sys.exc_info(), key=v)
raise self.req.error(
message="Wrong tag or category used in query.",
error=422, exc=sys.exc_info(), key=v)
maps.append(mapped)
return set(maps) # unique
def fetch_events(self, client, id, count,
def fetch_events(
self, client, id, count,
cat=None, nocat=None,
tag=None, notag=None,
group=None, nogroup=None):
if cat and nocat:
raise self.req.error(message="Unrealizable conditions. Choose cat or nocat option.", error=422,
cat=cat, nocat=nocat)
raise self.req.error(
message="Unrealizable conditions. Choose cat or nocat option.",
error=422, cat=cat, nocat=nocat)
if tag and notag:
raise self.req.error(message="Unrealizable conditions. Choose tag or notag option.", error=422,
tag=tag, notag=notag)
raise self.req.error(
message="Unrealizable conditions. Choose tag or notag option.",
error=422, tag=tag, notag=notag)
if group and nogroup:
raise self.req.error(message="Unrealizable conditions. Choose group or nogroup option.", error=422,
group=group, nogroup=nogroup)
raise self.req.error(
message="Unrealizable conditions. Choose group or nogroup option.",
error=422, group=group, nogroup=nogroup)
query = ["SELECT e.id, e.data FROM clients c RIGHT JOIN events e ON c.id = e.client_id WHERE e.id > %s"]
params = [id or 0]
......@@ -748,10 +714,11 @@ class MySQL(ObjectBase):
if group or nogroup:
subquery = []
for name in (group or nogroup):
subquery.append("c.name = %s") # exact client
escaped_name = name.replace('&', '&&').replace("_", "&_").replace("%", "&%") # escape for LIKE
subquery.append("c.name = %s") # exact client
params.append(name)
subquery.append("c.name LIKE %s") # whole subtree
params.append(name + ".%")
subquery.append("c.name LIKE CONCAT(%s, '.%%') ESCAPE '&'") # whole subtree
params.append(escaped_name)
query.append(" AND %s (%s)" % (self._get_not(group), " OR ".join(subquery)))
......@@ -778,8 +745,9 @@ class MySQL(ObjectBase):
# Note that we use Error object just for proper formatting,
# but do not raise it; from client perspective invalid
# events get skipped silently.
err = self.req.error(message="Unable to deserialize JSON event from db, id=%s" % r["id"], error=500,
exc=sys.exc_info(), id=r["id"])
err = self.req.error(
message="Unable to deserialize JSON event from db, id=%s" % r["id"],
error=500, exc=sys.exc_info(), id=r["id"])
err.log(self.log, prio=logging.WARNING)
events.append(e)
......@@ -788,13 +756,13 @@ class MySQL(ObjectBase):
"events": events
}
def store_events(self, client, events, events_raw):
try:
for attempt in self.repeat():
with attempt as db:
for event, raw_event in zip(events, events_raw):
lastid = db.query("INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s)",
lastid = db.query(
"INSERT INTO events (received,client_id,data) VALUES (NOW(), %s, %s)",
(client.id, raw_event)).lastrowid
catlist = event.get('Category', ["Other"])
......@@ -816,51 +784,50 @@ class MySQL(ObjectBase):
exception.log(self.log)
return [{"error": 500, "message": "DB error %s" % type(e).__name__}]
def insertLastReceivedId(self, client, id):
self.log.debug("insertLastReceivedId: id %i for client %i(%s)" % (id, client.id, client.hostname))
for attempt in self.repeat():
with attempt as db:
db.query("INSERT INTO last_events(client_id, event_id, timestamp) VALUES(%s, %s, NOW())", (client.id, id))
def getLastEventId(self):
for attempt in self.repeat():
with attempt as db:
row = db.query("SELECT MAX(id) as id FROM events").fetchall()[0]
return row['id'] or 1
def getLastReceivedId(self, client):
for attempt in self.repeat():
with attempt as db:
res = db.query("SELECT event_id as id FROM last_events WHERE client_id = %s ORDER BY last_events.id DESC LIMIT 1", (client.id,)).fetchall()
res = db.query(
"SELECT event_id as id FROM last_events WHERE client_id = %s ORDER BY last_events.id DESC LIMIT 1",
(client.id,)).fetchall()
try:
row = res[0]
except IndexError:
id = None
self.log.debug("getLastReceivedId: probably first access, unable to get id for client %i(%s)" % (client.id, client.hostname))
self.log.debug("getLastReceivedId: probably first access, unable to get id for client %i(%s)" % (
client.id, client.hostname))
else:
id = row["id"]
self.log.debug("getLastReceivedId: id %i for client %i(%s)" % (id, client.id, client.hostname))
return id
def load_maps(self):
with self as db:
db.query("DELETE FROM tags")
for tag, num in self.tagmap.iteritems():
for tag, num in self.tagmap.items():
db.query("INSERT INTO tags(id, tag) VALUES (%s, %s)", (num, tag))
db.query("DELETE FROM categories")
for cat_subcat, num in self.catmap.iteritems():
for cat_subcat, num in self.catmap.items():
catsplit = cat_subcat.split(".", 1)
category = catsplit[0]
subcategory = catsplit[1] if len(catsplit)>1 else None
db.query("INSERT INTO categories(id, category, subcategory, cat_subcat) VALUES (%s, %s, %s, %s)",
subcategory = catsplit[1] if len(catsplit) > 1 else None
db.query(
"INSERT INTO categories(id, category, subcategory, cat_subcat) VALUES (%s, %s, %s, %s)",
(num, category, subcategory, cat_subcat))
def purge_lastlog(self, days):
with self as db:
return db.query(
......@@ -872,7 +839,6 @@ class MySQL(ObjectBase):
" WHERE timestamp < DATE_SUB(CURDATE(), INTERVAL %s DAY) AND last IS NULL",
(days,)).rowcount
def purge_events(self, days):
with self as db:
affected = 0
......@@ -890,7 +856,6 @@ class MySQL(ObjectBase):
return affected
def expose(read=1, write=0, debug=0):
def expose_deco(meth):
......@@ -899,7 +864,7 @@ def expose(read=1, write=0, debug=0):
meth.write = write
meth.debug = debug
if not hasattr(meth, "arguments"):
meth.arguments = meth.func_code.co_varnames[:meth.func_code.co_argcount]
meth.arguments = get_method_params(meth)
return meth
return expose_deco
......@@ -912,11 +877,6 @@ class Server(ObjectBase):
self.auth = auth
self.handler = handler
def __str__(self):
return "%s(req=%s, auth=%s, handler=%s)" % (type(self).__name__, type(self.req).__name__, type(self.auth).__name__, type(self.handler).__name__)
def sanitize_args(self, path, func, args, exclude=["self", "post"]):
# silently remove internal args, these should never be used
# but if somebody does, we do not expose them by error message
......@@ -936,7 +896,6 @@ class Server(ObjectBase):
return args
def wsgi_app(self, environ, start_response, exc_info=None):
path = environ.get("PATH_INFO", "").lstrip("/")
self.req.reset(env=environ, path=path)
......@@ -964,8 +923,20 @@ class Server(ObjectBase):
args = self.sanitize_args(path, method, args)
# Based on RFC2616, section 4.4 we SHOULD respond with 400 (bad request) or 411
# (length required) if content length was not specified. We choose not to, to
# preserve compatibility with clients deployed in the wild, which use POST for
# all requests (even those without payload, with no specified content length).
# According to PEP3333, section "Input and Error Streams", the application SHOULD
# NOT attempt to read more data than specified by CONTENT_LENGTH. As stated in
# section "environ Variables", CONTENT_LENGTH may be empty (string) or absent.
try:
post_data = environ['wsgi.input'].read()
content_length = int(environ.get('CONTENT_LENGTH', 0))
except ValueError:
content_length = 0
try:
post_data = environ['wsgi.input'].read(content_length)
except:
raise self.req.error(message="Data read error.", error=408, exc=sys.exc_info())
......@@ -983,10 +954,15 @@ class Server(ObjectBase):
# Make sure everything is properly encoded - JSON and various function
# may spit out unicode instead of str and it gets propagated up (str
# + unicode = unicode). However, the right thing would be to be unicode
# correct among whole source and always decode on input (json module
# does that for us) and on output here.
if isinstance(status, unicode):
# + unicode = unicode).
# For Python2 the right thing would be to be unicode correct among whole
# source and always decode on input (json module does that for us) and
# on output here.
# For Python3 strings are internally unicode so no decoding on input is
# necessary. For output, "status" must be unicode string, "output" must
# be encoded bytes array, what is done here. Important: for Python 3 we
# define: unicode = str
if isinstance(status, unicode) and sys.version_info[0] < 3:
status = status.encode("utf-8")
if isinstance(output, unicode):
output = output.encode("utf-8")
......@@ -995,19 +971,17 @@ class Server(ObjectBase):
self.req.reset()
return [output]
__call__ = wsgi_app
def json_wrapper(method):
def meth_deco(self, post, **args):
if "events" in method.func_code.co_varnames[0:method.func_code.co_argcount]:
if "events" in get_method_params(method):
try:
events = json.loads(post) if post else None
events = json.loads(post.decode('utf-8')) if post else None
except Exception as e:
raise self.req.error(message="Deserialization error.", error=400,
raise self.req.error(
message="Deserialization error.", error=400,
exc=sys.exc_info(), args=post, parser=str(e))
if events:
args["events"] = events
......@@ -1019,21 +993,21 @@ def json_wrapper(method):
# which could (although shouldn't) appear in handler code
output = json.dumps(result, default=lambda v: str(v))
except Exception as e:
raise self.req.error(message="Serialization error", error=500,
exc=sys.exc_info(), args=str(result))
raise self.req.error(message="Serialization error", error=500, exc=sys.exc_info(), args=str(result))
return [('Content-type', 'application/json')], output
try:
meth_deco.arguments = method.arguments
except AttributeError:
meth_deco.arguments = method.func_code.co_varnames[:method.func_code.co_argcount]
meth_deco.arguments = get_method_params(method)
return meth_deco
class WardenHandler(ObjectBase):
def __init__(self, req, log, validator, db, auth,
def __init__(
self, req, log, validator, db, auth,
send_events_limit=500, get_events_limit=1000,
description=None):
......@@ -1045,25 +1019,19 @@ class WardenHandler(ObjectBase):
self.get_events_limit = get_events_limit
self.description = description
def __str__(self):
return "%s(req=%s, validator=%s, db=%s, send_events_limit=%s, get_events_limit=%s, description=\"%s\")" % (
type(self).__name__, type(self.req).__name__, type(self.validator).__name__, type(self.db).__name__,
self.get_events_limit, self.send_events_limit, self.description)
@expose(read=1, debug=1)
@json_wrapper
def getDebug(self):
return {
"environment": self.req.env,
"client": self.req.client.__dict__,
"client": self.req.client._asdict(),
"database": self.db.get_debug(),
"system": {
"python": sys.version,
"uname": os.uname()
},
"process": {
"cwd": os.getcwdu(),
"cwd": unicode(os.getcwd()),
"pid": os.getpid(),
"ppid": os.getppid(),
"pgrp": os.getpgrp(),
......@@ -1075,7 +1043,6 @@ class WardenHandler(ObjectBase):
}
}
@expose(read=1)
@json_wrapper
def getInfo(self):
......@@ -1088,10 +1055,10 @@ class WardenHandler(ObjectBase):
info["description"] = self.description
return info
@expose(read=1)
@json_wrapper
def getEvents(self, id=None, count=None,
def getEvents(
self, id=None, count=None,
cat=None, nocat=None,
tag=None, notag=None,
group=None, nogroup=None):
......@@ -1105,9 +1072,9 @@ class WardenHandler(ObjectBase):
# If client was already here, fetch server notion of his last id
try:
id = self.db.getLastReceivedId(self.req.client)
except Exception, e:
except Exception as e:
self.log.info("cannot getLastReceivedId - " + type(e).__name__ + ": " + str(e))
if id is None:
# First access, remember the guy and get him last id
id = self.db.getLastEventId()
......@@ -1117,10 +1084,10 @@ class WardenHandler(ObjectBase):
"events": []
}
if id<=0:
if id <= 0:
# Client wants to get only last N events and reset server notion of last id
id += self.db.getLastEventId()
if id<0: id=0
if id < 0: id = 0
try:
count = int(count[0])
......@@ -1129,6 +1096,7 @@ class WardenHandler(ObjectBase):
if self.get_events_limit:
count = min(count, self.get_events_limit)
count = max(0, count)
res = self.db.fetch_events(self.req.client, id, count, cat, nocat, tag, notag, group, nogroup)
......@@ -1138,7 +1106,6 @@ class WardenHandler(ObjectBase):
return res
def check_node(self, event, name):
try:
ev_id = event['Node'][0]['Name'].lower()
......@@ -1149,7 +1116,6 @@ class WardenHandler(ObjectBase):
return [{"error": 422, "message": "Node does not correspond with saving client"}]
return []
def add_event_nums(self, ilist, events, errlist):
for err in errlist:
err.setdefault("events", []).extend(ilist)
......@@ -1163,7 +1129,6 @@ class WardenHandler(ObjectBase):
ev_ids.append(id)
return errlist
@expose(write=1)
@json_wrapper
def sendEvents(self, events=[]):
......@@ -1171,11 +1136,9 @@ class WardenHandler(ObjectBase):
raise self.req.error(message="List of events expected.", error=400)
errs = []
if len(events)>self.send_events_limit:
errs.extend(
self.add_event_nums(range(self.send_events_limit, len(events)), events,
[{"error": 507, "message": "Too much events in one batch.",
"send_events_limit": self.send_events_limit}]))
if len(events) > self.send_events_limit:
errs.extend(self.add_event_nums(range(self.send_events_limit, len(events)), events, [
{"error": 507, "message": "Too much events in one batch.", "send_events_limit": self.send_events_limit}]))
saved = 0
events_tosend = []
......@@ -1192,15 +1155,20 @@ class WardenHandler(ObjectBase):
errs.extend(self.add_event_nums([i], events, node_errs))
continue
if self.req.client.test and not 'Test' in event.get('Category', []):
errs.extend(self.add_event_nums([i], events, [{"error": 422,
"message": "You're allowed to send only messages, containing \"Test\" among categories.",
"categories": event.get('Category', [])}]))
if self.req.client.test and 'Test' not in event.get('Category', []):
errs.extend(
self.add_event_nums([i], events, [{
"error": 422,
"message": "You're allowed to send only messages, containing \"Test\" among categories.",
"categories": event.get('Category', [])}]))
continue
raw_event = json.dumps(event)
if len(raw_event) >= self.db.event_size_limit:
errs.extend(self.add_event_nums([i], events, [{"error": 413, "message": "Event too long (>%i B)" % self.db.event_size_limit}]))
errs.extend(
self.add_event_nums([i], events, [
{"error": 413, "message": "Event too long (>%i B)" % self.db.event_size_limit}
]))
continue
events_tosend.append(event)
......@@ -1221,32 +1189,32 @@ class WardenHandler(ObjectBase):
return {"saved": saved}
def read_ini(path):
c = ConfigParser.RawConfigParser()
res = c.read(path)
if not res or not path in res:
if not res or path not in res:
# We don't have loggin yet, hopefully this will go into webserver log
raise Error(message="Unable to read config: %s" % path)
data = {}
for sect in c.sections():
for opts in c.options(sect):
lsect = sect.lower()
if not lsect in data:
if lsect not in data:
data[lsect] = {}
data[lsect][opts] = c.get(sect, opts)
return data
def read_cfg(path):
with open(path, "r") as f:
with io.open(path, "r", encoding="utf-8") as f:
stripcomments = "\n".join((l for l in f if not l.lstrip().startswith(("#", "//"))))
conf = json.loads(stripcomments)
# Lowercase keys
conf = dict((sect.lower(), dict(
(subkey.lower(), val) for subkey, val in subsect.iteritems())
) for sect, subsect in conf.iteritems())
conf = dict((
sect.lower(), dict(
(subkey.lower(), val) for subkey, val in subsect.items())
) for sect, subsect in conf.items())
return conf
......@@ -1255,8 +1223,8 @@ def fallback_wsgi(environ, start_response, exc_info=None):
# If server does not start, set up simple server, returning
# Warden JSON compliant error message
error=503
message="Server not running due to initialization error"
error = 503
message = "Server not running due to initialization error"
headers = [('Content-type', 'application/json')]
logline = "Error(%d): %s" % (error, message)
......@@ -1278,7 +1246,7 @@ section_order = ("log", "db", "auth", "validator", "handler", "server")
section_def = {
"log": [FileLogger, SysLogger],
"db": [MySQL],
"auth": [X509Authenticator, PlainAuthenticator, X509NameAuthenticator, X509MixMatchAuthenticator],
"auth": [X509NameAuthenticator, PlainAuthenticator, X509Authenticator, X509MixMatchAuthenticator],
"validator": [JSONSchemaValidator, NoValidator],
"handler": [WardenHandler],
"server": [Server]
......@@ -1363,7 +1331,6 @@ def build_server(conf, section_order=section_order, section_def=section_def, par
objects = {} # Already initialized objects
# Functions for validation and conversion of config values
def facility(name):
return int(getattr(logging.handlers.SysLogHandler, "LOG_" + name.upper()))
......@@ -1373,7 +1340,7 @@ def build_server(conf, section_order=section_order, section_def=section_def, par
def natural(name):
num = int(name)
if num<1:
if num < 1:
raise ValueError("Not a natural number")
return num
......@@ -1384,7 +1351,6 @@ def build_server(conf, section_order=section_order, section_def=section_def, par
def obj(name):
return objects[name.lower()]
# Typedef dictionary
conv_dict = {
"facility": facility,
......@@ -1395,9 +1361,8 @@ def build_server(conf, section_order=section_order, section_def=section_def, par
"str": str
}
def init_obj(sect_name):
config = conf.get(sect_name, {})
config = dict(conf.get(sect_name, {}))
sect_name = sect_name.lower()
sect_def = section_def[sect_name]
......@@ -1406,7 +1371,7 @@ def build_server(conf, section_order=section_order, section_def=section_def, par
del config["type"]
except KeyError: # No, fetch default object type for this section
cls = sect_def[0]
else: # Yes, get corresponding class/callable
else: # Yes, get corresponding class/callable
names = [o.__name__ for o in sect_def]
try:
idx = names.index(objtype)
......@@ -1424,7 +1389,7 @@ def build_server(conf, section_order=section_order, section_def=section_def, par
# Process parameters
kwargs = {}
for name, definition in params.iteritems():
for name, definition in params.items():
raw_val = config.get(name, definition["default"])
try:
type_callable = conv_dict[definition["type"]]
......@@ -1468,12 +1433,11 @@ def build_server(conf, section_order=section_order, section_def=section_def, par
return objects["server"]
# Command line utilities
def check_config():
# If we got so far, server object got set up fine
print >>sys.stderr, "Looks clear."
print("Looks clear.", file=sys.stderr)
return 0
......@@ -1483,7 +1447,7 @@ def list_clients(id=None):
col_width = [max(len(val) for val in col) for col in zip(*(lines+[Client._fields]))]
divider = ["-" * l for l in col_width]
for line in [Client._fields, divider] + lines:
print " ".join([val.ljust(width) for val, width in zip(line, col_width)])
print(" ".join([val.ljust(width) for val, width in zip(line, col_width)]))
return 0
......@@ -1502,56 +1466,54 @@ def modify_client(**kwargs):
def isValidHostname(hostname):
if len(hostname) > 255:
return False
if hostname.endswith("."): # A single trailing dot is legal
hostname = hostname[:-1] # strip exactly one dot from the right, if present
disallowed = re.compile("[^A-Z\d-]", re.IGNORECASE)
return all( # Split by labels and verify individually
(label and len(label) <= 63 # length is within proper range
and not label.startswith("-") and not label.endswith("-") # no bordering hyphens
and not disallowed.search(label)) # contains only legal characters
if hostname.endswith("."): # A single trailing dot is legal
hostname = hostname[:-1] # strip exactly one dot from the right, if present
disallowed = re.compile(r"[^A-Z\d-]", re.IGNORECASE)
return all( # Split by labels and verify individually
(label and len(label) <= 63 # length is within proper range
and not label.startswith("-") and not label.endswith("-") # no bordering hyphens
and not disallowed.search(label)) # contains only legal characters
for label in hostname.split("."))
def isValidNSID(nsid):
allowed = re.compile("^(?:[a-zA-Z_][a-zA-Z0-9_]*\\.)*[a-zA-Z_][a-zA-Z0-9_]*$")
allowed = re.compile(r"^(?:[a-zA-Z_][a-zA-Z0-9_]*\.)*[a-zA-Z_][a-zA-Z0-9_]*$")
return allowed.match(nsid)
def isValidEmail(mail):
mails = (email.utils.parseaddr(m) for m in mail.split(","))
allowed = re.compile("^[a-zA-Z0-9_.%!+-]+@[a-zA-Z0-9-.]+$") # just basic check
valid = (allowed.match(ms[1]) for ms in mails)
allowed = re.compile(r"(^[a-zA-Z0-9_ .%!+-]*(?=<.*>))?(^|(<(?=.*(>))))[a-zA-Z0-9_.%!+-]+@[a-zA-Z0-9-.]+\4?$") # just basic check
valid = (allowed.match(ms.strip())for ms in mail.split(','))
return all(valid)
def isValidID(id):
client = server.handler.db.get_clients(id)
return client and True or False
if kwargs["name"] is not None:
kwargs["name"] = kwargs["name"].lower()
if not isValidNSID(kwargs["name"]):
print >>sys.stderr, "Invalid client name \"%s\"." % kwargs["name"]
print("Invalid client name \"%s\"." % kwargs["name"], file=sys.stderr)
return 254
if kwargs["hostname"] is not None:
kwargs["hostname"] = kwargs["hostname"].lower()
if not isValidHostname(kwargs["hostname"]):
print >>sys.stderr, "Invalid hostname \"%s\"." % kwargs["hostname"]
print("Invalid hostname \"%s\"." % kwargs["hostname"], file=sys.stderr)
return 253
if kwargs["requestor"] is not None and not isValidEmail(kwargs["requestor"]):
print >>sys.stderr, "Invalid requestor email \"%s\"." % kwargs["requestor"]
print("Invalid requestor email \"%s\"." % kwargs["requestor"], file=sys.stderr)
return 252
if kwargs["id"] is not None and not isValidID(kwargs["id"]):
print >>sys.stderr, "Invalid id \"%s\"." % kwargs["id"]
print("Invalid id \"%s\"." % kwargs["id"], file=sys.stderr)
return 251
for c in server.handler.db.get_clients():
if kwargs["name"] is not None and kwargs["name"].lower()==c.name:
print >>sys.stderr, "Clash with existing name: %s" % str(c)
if kwargs["name"] is not None and kwargs["name"].lower() == c.name:
print("Clash with existing name: %s" % str(c), file=sys.stderr)
return 250
if kwargs["secret"] is not None and kwargs["secret"]==c.secret:
print >>sys.stderr, "Clash with existing secret: %s" % str(c)
if kwargs["secret"] is not None and kwargs["secret"] == c.secret:
print("Clash with existing secret: %s" % str(c), file=sys.stderr)
return 249
newid = server.handler.db.add_modify_client(**kwargs)
......@@ -1569,51 +1531,62 @@ def purge(days=30, lastlog=None, events=None):
lastlog = events = True
if lastlog:
count = server.handler.db.purge_lastlog(days)
print "Purged %d lastlog entries." % count
print("Purged %d lastlog entries." % count)
if events:
count = server.handler.db.purge_events(days)
print "Purged %d events." % count
print("Purged %d events." % count)
return 0
def add_client_args(subargp, mod=False):
subargp.add_argument("--help", action="help", help="show this help message and exit")
if mod:
subargp.add_argument("-i", "--id", required=True, type=int,
subargp.add_argument(
"-i", "--id", required=True, type=int,
help="client id")
subargp.add_argument("-n", "--name", required=not mod,
subargp.add_argument(
"-n", "--name", required=not mod,
help="client name (in dotted reverse path notation)")
subargp.add_argument("-h", "--hostname", required=not mod,
subargp.add_argument(
"-h", "--hostname", required=not mod,
help="client FQDN hostname")
subargp.add_argument("-r", "--requestor", required=not mod,
subargp.add_argument(
"-r", "--requestor", required=not mod,
help="requestor email")
subargp.add_argument("-s", "--secret",
subargp.add_argument(
"-s", "--secret",
help="authentication token (use explicit empty string to disable)")
subargp.add_argument("--note",
subargp.add_argument(
"--note",
help="client freetext description")
reg_valid = subargp.add_mutually_exclusive_group(required=False)
reg_valid.add_argument("--valid", action="store_const", const=1, default=None,
reg_valid.add_argument(
"--valid", action="store_const", const=1, default=None,
help="valid client (default)")
reg_valid.add_argument("--novalid", action="store_const", const=0, dest="valid", default=None)
reg_read = subargp.add_mutually_exclusive_group(required=False)
reg_read.add_argument("--read", action="store_const", const=1, default=None,
reg_read.add_argument(
"--read", action="store_const", const=1, default=None,
help="client is allowed to read (default)")
reg_read.add_argument("--noread", action="store_const", const=0, dest="read", default=None)
reg_write = subargp.add_mutually_exclusive_group(required=False)
reg_write.add_argument("--nowrite", action="store_const", const=0, dest="write", default=None,
reg_write.add_argument(
"--nowrite", action="store_const", const=0, dest="write", default=None,
help="client is allowed to send (default - no)")
reg_write.add_argument("--write", action="store_const", const=1, default=None)
reg_debug = subargp.add_mutually_exclusive_group(required=False)
reg_debug.add_argument("--nodebug", action="store_const", const=0, dest="debug", default=None,
reg_debug.add_argument(
"--nodebug", action="store_const", const=0, dest="debug", default=None,
help="client is allowed receive debug output (default - no)")
reg_debug.add_argument("--debug", action="store_const", const=1, default=None)
reg_test = subargp.add_mutually_exclusive_group(required=False)
reg_test.add_argument("--test", action="store_const", const=1, default=None,
reg_test.add_argument(
"--test", action="store_const", const=1, default=None,
help="client is yet in testing phase (default - yes)")
reg_test.add_argument("--notest", action="store_const", const=0, dest="test", default=None)
......@@ -1622,71 +1595,88 @@ def get_args():
import argparse
argp = argparse.ArgumentParser(
description="Warden server " + VERSION, add_help=False)
argp.add_argument("--help", action="help",
argp.add_argument(
"--help", action="help",
help="show this help message and exit")
argp.add_argument("-c", "--config",
argp.add_argument(
"-c", "--config",
help="path to configuration file")
subargp = argp.add_subparsers(title="commands")
subargp = argp.add_subparsers(title="commands", dest="command")
subargp.required = True
subargp_check = subargp.add_parser("check", add_help=False,
subargp_check = subargp.add_parser(
"check", add_help=False,
description="Try to setup server based on configuration file.",
help="check configuration")
subargp_check.set_defaults(command=check_config)
subargp_check.add_argument("--help", action="help",
subargp_check.add_argument(
"--help", action="help",
help="show this help message and exit")
subargp_reg = subargp.add_parser("register", add_help=False,
subargp_reg = subargp.add_parser(
"register", add_help=False,
description="Add new client registration entry.",
help="register new client")
subargp_reg.set_defaults(command=register_client)
add_client_args(subargp_reg)
subargp_mod = subargp.add_parser("modify", add_help=False,
subargp_mod = subargp.add_parser(
"modify", add_help=False,
description="Modify details of client registration entry.",
help="modify client registration")
subargp_mod.set_defaults(command=modify_client)
add_client_args(subargp_mod, mod=True)
subargp_list = subargp.add_parser("list", add_help=False,
subargp_list = subargp.add_parser(
"list", add_help=False,
description="List details of client registration entries.",
help="list registered clients")
subargp_list.set_defaults(command=list_clients)
subargp_list.add_argument("--help", action="help",
subargp_list.add_argument(
"--help", action="help",
help="show this help message and exit")
subargp_list.add_argument("--id", action="store", type=int,
subargp_list.add_argument(
"--id", action="store", type=int,
help="client id", default=None)
subargp_purge = subargp.add_parser("purge", add_help=False,
description=
subargp_purge = subargp.add_parser(
"purge", add_help=False,
description=(
"Purge old events or lastlog records."
" Note that lastlog purge retains at least one newest record for each"
" client, even if it is more than number of 'days' old.",
" client, even if it is more than number of 'days' old."),
help="purge old events or lastlog records")
subargp_purge.set_defaults(command=purge)
subargp_purge.add_argument("--help", action="help",
subargp_purge.add_argument(
"--help", action="help",
help="show this help message and exit")
subargp_purge.add_argument("-l", "--lastlog", action="store_true", dest="lastlog", default=None,
subargp_purge.add_argument(
"-l", "--lastlog", action="store_true", dest="lastlog", default=None,
help="purge lastlog records")
subargp_purge.add_argument("-e", "--events", action="store_true", dest="events", default=None,
subargp_purge.add_argument(
"-e", "--events", action="store_true", dest="events", default=None,
help="purge events")
subargp_purge.add_argument("-d", "--days", action="store", dest="days", type=int, default=30,
subargp_purge.add_argument(
"-d", "--days", action="store", dest="days", type=int, default=30,
help="records older than 'days' back from today will get purged")
subargp_loadmaps = subargp.add_parser("loadmaps", add_help=False,
description=
subargp_loadmaps = subargp.add_parser(
"loadmaps", add_help=False,
description=(
"Load 'categories' and 'tags' table from 'catmap_mysql.json' and 'tagmap_mysql.json'."
" Note that this is NOT needed for server at all, load them into db at will,"
" should you need to run your own specific SQL queries on data directly."
" Note also that previous content of both tables will be lost.",
" Note also that previous content of both tables will be lost."),
help="load catmap and tagmap into db")
subargp_loadmaps.set_defaults(command=load_maps)
subargp_loadmaps.add_argument("--help", action="help",
subargp_loadmaps.add_argument(
"--help", action="help",
help="show this help message and exit")
return argp.parse_args()
if __name__=="__main__":
if __name__ == "__main__":
args = get_args()
config = path.join(path.dirname(__file__), args.config or "warden_server.cfg")
server = build_server(read_cfg(config))
......@@ -1695,6 +1685,6 @@ if __name__=="__main__":
del subargs["command"]
del subargs["config"]
if not server or server is fallback_wsgi:
print >>sys.stderr, "Failed initialization, check configured log targets for reasons."
print("Failed initialization, check configured log targets for reasons.", file=sys.stderr)
sys.exit(255)
sys.exit(command(**subargs))