Skip to content
Snippets Groups Projects
warden_ra.py 12.9 KiB
Newer Older
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016, CESNET, z. s. p. o.
# Use of this source is governed by an ISC license, see LICENSE file.

import sys
import os
import string
import random
import struct
import argparse
import subprocess
import json
# *ph* server vulnerable to logjam, local openssl too new, use hammer to disable Diffie-Helmann
import ssl
ssl._DEFAULT_CIPHERS += ":!DH"

import ejbcaws

sys.path.append(os.path.join(os.path.dirname(__file__), "..", "..", "warden_server"))
from warden_server import Request, ObjectReq, StreamLogger, FileLogger, Server, expose


class EjbcaClient(object):

    def __init__(self, registry, ejbca_data=None):
        self.registry = registry
        self.ejbca_data = ejbca_data or {}

    @property
    def admins(self):
        return [u if not u.startswith("RFC822NAME") else u[11:] for u in self.ejbca_data["subjectAltName"].split(",")]

    @admins.setter
    def admins(self, emails):
        self.ejbca_data["subjectAltName"] = ",".join(("RFC822NAME=%s" % e for e in emails))

    @property
    def name(self):
        username = self.ejbca_data["username"]
        if not username.endswith(self.registry.username_suffix):
            raise ValueError(("Ejbca user username does not conform to config", self.ejbca_data))
        return username[:-len(self.registry.username_suffix)]

    @name.setter
    def name(self, new):
        self.ejbca_data["username"] = new + self.registry.username_suffix
        self.ejbca_data["subjectDN"] = self.registry.subjectDN_template % new

    @property
    def status(self):
        s = self.ejbca_data["status"]
        if s == ejbcaws.STATUS_NEW:
            return "Issuable"
        elif s == ejbcaws.STATUS_GENERATED:
            return "Passive"
        elif s == ejbcaws.STATUS_INITIALIZED:
            return "New"
        else:
            return "EJBCA status %d" % s

    def get_certs(self):
        return self.registry.ejbca.find_certs(self.ejbca_data["username"], validOnly=False)

    def allow_new_cert(self, pwd=None):
        self.ejbca_data["status"] = ejbcaws.STATUS_NEW
        if pwd is not None:
            self.ejbca_data["password"] = pwd
            self.ejbca_data["clearPwd"] = True

    def new_cert(self, csr, pwd):
        cert = self.registry.ejbca.pkcs10_request(
            self.ejbca_data["username"],
            pwd, csr, 0, ejbcaws.RESPONSETYPE_CERTIFICATE)
        return cert

    def __str__(self):
        return (
            "Client:   %s (%s)\n"
            "Admins:   %s\n"
            "Status:   %s\n"
        ) % (
            self.name,
            self.ejbca_data["subjectDN"],
            ", ".join(self.admins),
            self.status
        )

    def verbose_str(self):
        return "%s\n" % self.ejbca_data

    def save(self):
        self.registry.ejbca.edit_user(self.ejbca_data)


class EjbcaRegistry(object):

    def __init__(self, url, cert=None, key=None,
                 caName="", certificateProfileName="", endEntityProfileName="",
                 subjectDN_template="%s", username_suffix=""):
        self.ejbca = ejbcaws.Ejbca(url, cert, key)
        self.caName = caName
        self.certificateProfileName = certificateProfileName
        self.endEntityProfileName = endEntityProfileName
        self.subjectDN_template = subjectDN_template
        self.username_suffix = username_suffix

    def get_clients(self):
        return (EjbcaClient(registry=self, ejbca_data=data) for data in self.ejbca.get_users())

    def get_client(self, name):
        users = self.ejbca.find_user(ejbcaws.MATCH_WITH_USERNAME, ejbcaws.MATCH_TYPE_EQUALS, name + self.username_suffix)
        if len(users) > 1:
            raise LookupError("%d users %s found (more than one?!)" % (len(users), name))
        if not users:
            return None
        return EjbcaClient(registry=self, ejbca_data=users[0])

    def new_client(self, name, admins):
        user = self.get_client(name)
        if user:
            raise LookupError("Client %s already exists" % name)
        new_ejbca_data = dict(
            caName=self.caName,
            certificateProfileName=self.certificateProfileName,
            endEntityProfileName=self.endEntityProfileName,
            keyRecoverable=False,
            sendNotification=False,
            status=ejbcaws.STATUS_INITIALIZED,
            subjectAltName="",
            subjectDN="",
            tokenType=ejbcaws.TOKEN_TYPE_USERGENERATED,
            username="",
            password = "".join((random.choice(string.ascii_letters + string.digits) for dummy in range(16))),
            clearPwd = True
        )
        client = EjbcaClient(registry=self, ejbca_data=new_ejbca_data)
        client.name = name
        client.admins = admins
        return client

    def verbose_str(self):
        return self.ejbca.get_version()


def format_cert(cert):
    return (
        "Subject:     %s\n"
        "Validity:    %s - %s\n"
        "Serial:      %s\n"
        "Fingerprint: md5:%s, sha1:%s\n"
        "Issuer:      %s\n"
    ) % (
        cert.get_subject().as_text(),
        cert.get_not_before().get_datetime().isoformat(),
        cert.get_not_after().get_datetime().isoformat(),
        ":".join(["%02x" % ord(c) for c in struct.pack('!Q', cert.get_serial_number())]),
        cert.get_fingerprint("md5"),
        cert.get_fingerprint("sha1"),
        cert.get_issuer().as_text()
    )

# Server side

class NullAuthenticator(ObjectReq):

    def __init__(self, req):
        ObjectReq.__init__(self, req)


    def __str__(self):
        return "%s(req=%s)" % (type(self).__name__, type(self.req).__name__)


    def authenticate(self, env, args):
        return True


    def authorize(self, env, client, path, method):
        return True


class CertHandler(ObjectReq):

    def __init__(self, req, registry):
        ObjectReq.__init__(self, req)
        self.registry = registry

    @expose(read=1, debug=1)
    def getCert(self, csr_data=None, name=None, password=None):
        if not (name and password and csr_data):
            raise self.req.error(message="Wrong or missing arguments", error=400)
        client = self.registry.get_client(name[0])
        if not client:
            raise self.req.error(message="Unknown client", error=403)
        try:
            newcert = client.new_cert(csr_data, password)
        except Exception as e:
            raise self.req.error(message="Processing error", error=403, cause=e)
        return [("Content-Type", "application/x-x509-user-cert")], newcert.as_pem()


def build_server(conf):
    StreamLogger()
    req = Request()
    log = FileLogger(
        req,
        filename=os.path.join(os.path.dirname(__file__), os.path.splitext(os.path.split(__file__)[1])[0] + ".log"),
        level=logging.DEBUG)
    auth = NullAuthenticator(req)
    registry = EjbcaRegistry(**conf)
    handler = CertHandler(req, registry)
    server = Server(req, auth, handler)
    return server


def list_clients(registry, name=None, verbose=False):
    if name is not None:
        client = registry.get_client(name)
        if client is None:
            print "No such client."
            return
        else:
            clients = [client]
    else:
        clients = registry.get_clients()
    for client in clients:
        print(client)
        if verbose:
            print(client.verbose_str())
        for cert in sorted(client.get_certs(), key=lambda c: c.get_not_after().get_datetime()):
            print(format_cert(cert))
            if verbose:
                print(cert.as_text())


def register_client(registry, name, admins=None, verbose=False):
    try:
        client = registry.new_client(name, admins or [])
    except LookupError as e:
        print(e)
        return
    client.save()
    list_clients(registry, name, verbose)


def applicant(registry, name, password=None, verbose=False):
    client = registry.get_client(name)
    if password is None:
        password = "".join((random.choice(string.ascii_letters + string.digits) for dummy in range(16)))
        print("Application password is: %s\n" % password)
    client.allow_new_cert(pwd=password)
    client.save()
    list_clients(registry, name, verbose)


def request(registry, key, csr, verbose=False):
    openssl = subprocess.Popen(
        [
            "openssl", "req", "-new", "-nodes", "-batch",
            "-keyout", key,
            "-out", csr,
            "-config", "/dev/stdin"
        ], stdin=subprocess.PIPE
    )
    openssl.stdin.write(
        "distinguished_name=req_distinguished_name\n"
        "prompt=no\n"
        "\n"
        "[req_distinguished_name]\n"
        "commonName=dummy"
    )
    openssl.stdin.close()
    openssl.wait()
    if verbose:
        with open(csr, "r") as f:
            print(f.read())


def gen_cert(registry, name, csr, cert, password, verbose=False):
    with open(csr, "r") as f:
        csr_data = f.read()
    client = registry.get_client(name)
    newcert = client.new_cert(csr_data, password)
    print(format_cert(newcert))
    if verbose:
        print(newcert.as_text())
        print(newcert.as_pem())
    with open(cert, "w") as f:
        f.write(newcert.as_text())
        f.write(newcert.as_pem())


def get_args():
    argp = argparse.ArgumentParser(
        description="Warden server certificate registry", add_help=False)
    argp.add_argument("--help", action="help",
        help="show this help message and exit")
    argp.add_argument("-c", "--config",
        help="path to configuration file")
    argp.add_argument("-v", "--verbose", action="store_true", default=False,
        help="be more chatty")
    subargp = argp.add_subparsers(title="commands")

    subargp_list = subargp.add_parser("list", add_help=False,
        description="List registered clients.",
        help="list clients")
    subargp_list.set_defaults(command=list_clients)
    subargp_list.add_argument("--help", action="help",
        help="show this help message and exit")
    subargp_list.add_argument("--name", action="store", type=str,
        help="client name")

    subargp_reg = subargp.add_parser("register", add_help=False,
        description="Add client registration entry.",
        help="register client")
    subargp_reg.set_defaults(command=register_client)
    subargp_reg.add_argument("--help", action="help",
        help="show this help message and exit")
    subargp_reg.add_argument("--name", action="store", type=str,
        required=True, help="client name")
    subargp_reg.add_argument("--admins", action="store", type=str,
        nargs="*", help="administrator list")

    subargp_apply = subargp.add_parser("applicant", add_help=False,
        description="Set client into certificate application mode and set its password",
        help="allow for certificate application")
    subargp_apply.set_defaults(command=applicant)
    subargp_apply.add_argument("--help", action="help",
        help="show this help message and exit")
    subargp_apply.add_argument("--name", action="store", type=str,
        required=True, help="client name")
    subargp_apply.add_argument("--password", action="store", type=str,
        help="password for application (will be autogenerated if not set)")

    subargp_req = subargp.add_parser("request", add_help=False,
        description="Generate certificate request",
        help="generate CSR")
    subargp_req.set_defaults(command=request)
    subargp_req.add_argument("--help", action="help",
        help="show this help message and exit")
    subargp_req.add_argument("--key", action="store", type=str,
        required=True, help="file for saving the key")
    subargp_req.add_argument("--csr", action="store", type=str,
        required=True, help="file for saving the request")

    subargp_cert = subargp.add_parser("gencert", add_help=False,
        description="Request new certificate from registry",
        help="get new certificate")
    subargp_cert.set_defaults(command=gen_cert)
    subargp_cert.add_argument("--help", action="help",
        help="show this help message and exit")
    subargp_cert.add_argument("--name", action="store", type=str,
        required=True, help="client name")
    subargp_cert.add_argument("--csr", action="store", type=str,
        required=True, help="file for saving the request")
    subargp_cert.add_argument("--cert", action="store", type=str,
        required=True, help="file for saving the new certificate")
    subargp_cert.add_argument("--password", action="store", type=str,
        required=True, help="password for application")

    return argp.parse_args()


def read_cfg(path):
    with open(path, "r") as f:
        stripcomments = "\n".join((l for l in f if not l.lstrip().startswith(("#", "//"))))
        conf = json.loads(stripcomments)
    return conf


if __name__ == "__main__":
    args = get_args()
    config = read_cfg(os.path.join(os.path.dirname(__file__), args.config or "warden_ra.cfg"))
    registry = EjbcaRegistry(**config)
    if args.verbose:
        print(registry)
    command = args.command
    subargs = vars(args)
    del subargs["command"]
    del subargs["config"]
    sys.exit(command(registry, **subargs))