Skip to content
Snippets Groups Projects
warden_ra.py 14.8 KiB
Newer Older
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016, CESNET, z. s. p. o.
# Use of this source is governed by an ISC license, see LICENSE file.

import sys
import os
import string
import random
import struct
import argparse
import subprocess
import json
# *ph* server vulnerable to logjam, local openssl too new, use hammer to disable Diffie-Helmann
import ssl
ssl._DEFAULT_CIPHERS += ":!DH"

import ejbcaws

sys.path.append(os.path.join(os.path.dirname(__file__), "..", "warden-server"))
import warden_server
from warden_server import Request, ObjectBase, FileLogger, SysLogger, Server, expose, read_cfg

class EjbcaClient(object):

    def __init__(self, registry, ejbca_data=None):
        self.registry = registry
        self.ejbca_data = ejbca_data or {}

    @property
    def admins(self):
        return [u if not u.startswith("RFC822NAME") else u[11:] for u in self.ejbca_data["subjectAltName"].split(",")]

    @admins.setter
    def admins(self, emails):
        self.ejbca_data["subjectAltName"] = ",".join(("RFC822NAME=%s" % e for e in emails))

    @property
    def name(self):
        username = self.ejbca_data["username"]
        if not username.endswith(self.registry.username_suffix):
            raise ValueError(("Ejbca user username does not conform to config", self.ejbca_data))
        return username[:-len(self.registry.username_suffix)]

    @name.setter
    def name(self, new):
        self.ejbca_data["username"] = new + self.registry.username_suffix
        self.ejbca_data["subjectDN"] = self.registry.subject_dn_template % new

    @property
    def status(self):
        s = self.ejbca_data["status"]
        if s == ejbcaws.STATUS_NEW:
            return "Issuable"
        elif s == ejbcaws.STATUS_GENERATED:
            return "Passive"
        elif s == ejbcaws.STATUS_INITIALIZED:
            return "New"
        else:
            return "EJBCA status %d" % s

    def get_certs(self):
        return self.registry.ejbca.find_certs(self.ejbca_data["username"], validOnly=False)

    def allow_new_cert(self, pwd=None):
        self.ejbca_data["status"] = ejbcaws.STATUS_NEW
        if pwd is not None:
            self.ejbca_data["password"] = pwd
            self.ejbca_data["clearPwd"] = True

    def new_cert(self, csr, pwd):
        cert = self.registry.ejbca.pkcs10_request(
            self.ejbca_data["username"],
            pwd, csr, 0, ejbcaws.RESPONSETYPE_CERTIFICATE)
        return cert

    def __str__(self):
        return (
            "Client:   %s (%s)\n"
            "Admins:   %s\n"
            "Status:   %s\n"
        ) % (
            self.name,
            self.ejbca_data["subjectDN"],
            ", ".join(self.admins),
            self.status
        )

    def verbose_str(self):
        return "%s\n" % self.ejbca_data

    def save(self):
        self.registry.ejbca.edit_user(self.ejbca_data)


class EjbcaRegistry(object):

    def __init__(self, log, url, cert=None, key=None,
                 ca_name="", certificate_profile_name="", end_entity_profile_name="",
                 subject_dn_template="%s", username_suffix=""):
        self.log = log
        self.ejbca = ejbcaws.Ejbca(url, cert, key)
        self.ca_name = ca_name
        self.certificate_profile_name = certificate_profile_name
        self.end_entity_profile_name = end_entity_profile_name
        self.subject_dn_template = subject_dn_template
        self.username_suffix = username_suffix

    def get_clients(self):
        return (EjbcaClient(registry=self, ejbca_data=data) for data in self.ejbca.get_users())

    def get_client(self, name):
        users = self.ejbca.find_user(ejbcaws.MATCH_WITH_USERNAME, ejbcaws.MATCH_TYPE_EQUALS, name + self.username_suffix)
        if len(users) > 1:
            raise LookupError("%d users %s found (more than one?!)" % (len(users), name))
        if not users:
            return None
        return EjbcaClient(registry=self, ejbca_data=users[0])

    def new_client(self, name, admins):
        user = self.get_client(name)
        if user:
            raise LookupError("Client %s already exists" % name)
        new_ejbca_data = dict(
            caName=self.ca_name,
            certificateProfileName=self.certificate_profile_name,
            endEntityProfileName=self.end_entity_profile_name,
            keyRecoverable=False,
            sendNotification=False,
            status=ejbcaws.STATUS_INITIALIZED,
            subjectAltName="",
            subjectDN="",
            tokenType=ejbcaws.TOKEN_TYPE_USERGENERATED,
            username="",
            password = "".join((random.choice(string.ascii_letters + string.digits) for dummy in range(16))),
            clearPwd = True
        )
        client = EjbcaClient(registry=self, ejbca_data=new_ejbca_data)
        client.name = name
        client.admins = admins
        return client

    def verbose_str(self):
        return self.ejbca.get_version()


def format_cert(cert):
    return (
        "Subject:     %s\n"
        "Validity:    %s - %s\n"
        "Serial:      %s\n"
        "Fingerprint: md5:%s, sha1:%s\n"
        "Issuer:      %s\n"
    ) % (
        cert.get_subject().as_text(),
        cert.get_not_before().get_datetime().isoformat(),
        cert.get_not_after().get_datetime().isoformat(),
        ":".join(["%02x" % ord(c) for c in struct.pack('!Q', cert.get_serial_number())]),
        cert.get_fingerprint("md5"),
        cert.get_fingerprint("sha1"),
        cert.get_issuer().as_text()
    )

    def __init__(self, req, log):
        ObjectBase.__init__(self, req, log)


    def __str__(self):
        return "%s(req=%s)" % (type(self).__name__, type(self.req).__name__)


    def authenticate(self, env, args):
        return True


    def authorize(self, env, client, path, method):
        return True


    def __init__(self, req, log, registry):
        ObjectBase.__init__(self, req, log)
    def getCert(self, csr_data=None, name=None, password=None):
        if not (name and password and csr_data):
            raise self.req.error(message="Wrong or missing arguments", error=400, client=name[0], password=password[0])
        client = self.registry.get_client(name[0])
        if not client:
            raise self.req.error(message="Unknown client", error=403, client=name[0], password=password[0])
        self.log.info("Client: %s" % client.name)
        try:
            newcert = client.new_cert(csr_data, password)
        except Exception as e:
            raise self.req.error(message="Processing error", error=403, exc=sys.exc_info())
        self.log.info("Generated.")
        return [("Content-Type", "application/x-x509-user-cert")], newcert.as_pem()
# Order in which the base objects must get initialized
section_order = ("log", "auth", "registry", "handler", "server")

# List of sections and objects, configured by them
# First object in each object list is the default one, otherwise
# "type" keyword in section may be used to choose other
section_def = {
    "log": [FileLogger, SysLogger],
    "auth": [NullAuthenticator],
    "registry": [EjbcaRegistry],
    "handler": [CertHandler],
    "server": [Server]
}

# Object parameter conversions and defaults
param_def = {
    FileLogger: warden_server.param_def[FileLogger],
    SysLogger: warden_server.param_def[SysLogger],
    Server: warden_server.param_def[Server],
    NullAuthenticator: {
        "req": {"type": "obj", "default": "req"},
        "log": {"type": "obj", "default": "log"}
    },
    EjbcaRegistry: {
        "log":  {"type": "obj", "default": "log"},
        "url": {"type": "str", "default": "https://ejbca.example.org/ejbca/ejbcaws/ejbcaws?wsdl"},
        "cert": {"type": "filepath", "default": os.path.join(os.path.dirname(__file__), "warden_ra.cert.pem")},
        "key": {"type": "filepath", "default": os.path.join(os.path.dirname(__file__), "warden_ra.key.pem")},
        "ca_name": {"type": "str", "default": "Example CA"},
        "certificate_profile_name": {"type": "str", "default": "Example"},
        "end_entity_profile_name": {"type": "str", "default": "Example EE"},
        "subject_dn_template": {"type": "str", "default": "DC=cz,DC=example-ca,DC=warden,CN=%s"},
        "username_suffix": {"type": "str", "default": "@warden"}
    },
    CertHandler: {
        "req": {"type": "obj", "default": "req"},
        "log": {"type": "obj", "default": "log"},
        "registry": {"type": "obj", "default": "registry"}
    }
}

param_def[FileLogger]["filename"] = {"type": "filepath", "default": os.path.join(os.path.dirname(__file__), os.path.splitext(os.path.split(__file__)[1])[0] + ".log")}


    return warden_server.build_server(conf, section_order, section_def, param_def)

def list_clients(registry, name=None, verbose=False):
    if name is not None:
        client = registry.get_client(name)
        if client is None:
            print "No such client."
            return
        else:
            print(client)
            if verbose:
                print(client.verbose_str())
            for cert in sorted(client.get_certs(), key=lambda c: c.get_not_after().get_datetime()):
                print(format_cert(cert))
                if verbose:
                    print(cert.as_text())
    else:
        clients = registry.get_clients()
        for client in sorted (clients, key=lambda c: c.name):
            print(client)
                print(client.verbose_str())


def register_client(registry, name, admins=None, verbose=False):
    try:
        client = registry.new_client(name, admins or [])
    except LookupError as e:
        print(e)
        return
    client.save()
    list_clients(registry, name, verbose)


def applicant(registry, name, password=None, verbose=False):
    client = registry.get_client(name)
    if password is None:
        password = "".join((random.choice(string.ascii_letters + string.digits) for dummy in range(16)))
        print("Application password is: %s\n" % password)
    client.allow_new_cert(pwd=password)
    client.save()
    list_clients(registry, name, verbose)


def request(registry, key, csr, verbose=False):
    openssl = subprocess.Popen(
        [
            "openssl", "req", "-new", "-nodes", "-batch",
            "-keyout", key,
            "-out", csr,
            "-config", "/dev/stdin"
        ], stdin=subprocess.PIPE
    )
    openssl.stdin.write(
        "distinguished_name=req_distinguished_name\n"
        "prompt=no\n"
        "\n"
        "[req_distinguished_name]\n"
        "commonName=dummy"
    )
    openssl.stdin.close()
    openssl.wait()
    if verbose:
        with open(csr, "r") as f:
            print(f.read())


def gen_cert(registry, name, csr, cert, password, verbose=False):
    with open(csr, "r") as f:
        csr_data = f.read()
    client = registry.get_client(name)
    newcert = client.new_cert(csr_data, password)
    print(format_cert(newcert))
    if verbose:
        print(newcert.as_text())
        print(newcert.as_pem())
    with open(cert, "w") as f:
        f.write(newcert.as_text())
        f.write(newcert.as_pem())


def get_args():
    argp = argparse.ArgumentParser(
        description="Warden server certificate registry", add_help=False)
    argp.add_argument("--help", action="help",
        help="show this help message and exit")
    argp.add_argument("-c", "--config",
        help="path to configuration file")
    argp.add_argument("-v", "--verbose", action="store_true", default=False,
        help="be more chatty")
    subargp = argp.add_subparsers(title="commands")

    subargp_list = subargp.add_parser("list", add_help=False,
        description="List registered clients.",
        help="list clients")
    subargp_list.set_defaults(command=list_clients)
    subargp_list.add_argument("--help", action="help",
        help="show this help message and exit")
    subargp_list.add_argument("--name", action="store", type=str,
        help="client name")

    subargp_reg = subargp.add_parser("register", add_help=False,
        description="Add client registration entry.",
        help="register client")
    subargp_reg.set_defaults(command=register_client)
    subargp_reg.add_argument("--help", action="help",
        help="show this help message and exit")
    subargp_reg.add_argument("--name", action="store", type=str,
        required=True, help="client name")
    subargp_reg.add_argument("--admins", action="store", type=str,
        nargs="*", help="administrator list")

    subargp_apply = subargp.add_parser("applicant", add_help=False,
        description="Set client into certificate application mode and set its password",
        help="allow for certificate application")
    subargp_apply.set_defaults(command=applicant)
    subargp_apply.add_argument("--help", action="help",
        help="show this help message and exit")
    subargp_apply.add_argument("--name", action="store", type=str,
        required=True, help="client name")
    subargp_apply.add_argument("--password", action="store", type=str,
        help="password for application (will be autogenerated if not set)")

    subargp_req = subargp.add_parser("request", add_help=False,
        description="Generate certificate request",
        help="generate CSR")
    subargp_req.set_defaults(command=request)
    subargp_req.add_argument("--help", action="help",
        help="show this help message and exit")
    subargp_req.add_argument("--key", action="store", type=str,
        required=True, help="file for saving the key")
    subargp_req.add_argument("--csr", action="store", type=str,
        required=True, help="file for saving the request")

    subargp_cert = subargp.add_parser("gencert", add_help=False,
        description="Request new certificate from registry",
        help="get new certificate")
    subargp_cert.set_defaults(command=gen_cert)
    subargp_cert.add_argument("--help", action="help",
        help="show this help message and exit")
    subargp_cert.add_argument("--name", action="store", type=str,
        required=True, help="client name")
    subargp_cert.add_argument("--csr", action="store", type=str,
        required=True, help="file for saving the request")
    subargp_cert.add_argument("--cert", action="store", type=str,
        required=True, help="file for saving the new certificate")
    subargp_cert.add_argument("--password", action="store", type=str,
        required=True, help="password for application")

    return argp.parse_args()


if __name__ == "__main__":
    args = get_args()
    config = os.path.join(os.path.dirname(__file__), args.config or "warden_ra.cfg")
    server = build_server(read_cfg(config))
    registry = server.handler.registry
    if args.verbose:
        print(registry)
    command = args.command
    subargs = vars(args)
    del subargs["command"]
    del subargs["config"]
    sys.exit(command(registry, **subargs))