Skip to content
Snippets Groups Projects
Commit 92c7487c authored by Pavel Kácha's avatar Pavel Kácha
Browse files

Warden certificate registration authority initial commit

parent d069a1e6
Branches
Tags
No related merge requests found
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016, CESNET, z. s. p. o.
# Use of this source is governed by an ISC license, see LICENSE file.
import urllib2
import httplib
import socket
import base64
import suds.transport.http
import suds.client
import M2Crypto
STATUS_FAILED = 11
STATUS_GENERATED = 40
STATUS_HISTORICAL = 60
STATUS_INITIALIZED = 20
STATUS_INPROCESS = 30
STATUS_KEYRECOVERY = 70
STATUS_NEW = 10
STATUS_REVOKED = 50
MATCH_TYPE_BEGINSWITH = 1
MATCH_TYPE_CONTAINS = 2
MATCH_TYPE_EQUALS = 0
MATCH_WITH_CA = 5
MATCH_WITH_CERTIFICATEPROFILE = 4
MATCH_WITH_COMMONNAME = 101
MATCH_WITH_COUNTRY = 112
MATCH_WITH_DIRECTORYNAME = 204
MATCH_WITH_DN = 7
MATCH_WITH_DNSERIALNUMBER = 102
MATCH_WITH_DNSNAME = 201
MATCH_WITH_DOMAINCOMPONENT = 111
MATCH_WITH_EDIPARTNAME = 205
MATCH_WITH_EMAIL = 1
MATCH_WITH_ENDENTITYPROFILE = 3
MATCH_WITH_GIVENNAME = 103
MATCH_WITH_GUID = 209
MATCH_WITH_INITIALS = 104
MATCH_WITH_IPADDRESS = 202
MATCH_WITH_LOCALE = 109
MATCH_WITH_ORGANIZATION = 108
MATCH_WITH_ORGANIZATIONUNIT = 107
MATCH_WITH_REGISTEREDID = 207
MATCH_WITH_RFC822NAME = 200
MATCH_WITH_STATE = 110
MATCH_WITH_STATUS = 2
MATCH_WITH_SURNAME = 105
MATCH_WITH_TITLE = 106
MATCH_WITH_TOKEN = 6
MATCH_WITH_UID = 100
MATCH_WITH_UPN = 208
MATCH_WITH_URI = 206
MATCH_WITH_USERNAME = 0
MATCH_WITH_X400ADDRESS = 203
TOKEN_TYPE_JKS = "JKS"
TOKEN_TYPE_P12 = "P12"
TOKEN_TYPE_PEM = "PEM"
TOKEN_TYPE_USERGENERATED = "USERGENERATED"
VIEW_RIGHTS = "/view_end_entity"
EDIT_RIGHTS = "/edit_end_entity"
CREATE_RIGHTS = "/create_end_entity"
DELETE_RIGHTS = "/delete_end_entity"
REVOKE_RIGHTS = "/revoke_end_entity"
HISTORY_RIGHTS = "/view_end_entity_history"
APPROVAL_RIGHTS = "/approve_end_entity"
HARDTOKEN_RIGHTS = "/view_hardtoken"
HARDTOKEN_PUKDATA_RIGHTS = "/view_hardtoken/puk_data"
KEYRECOVERY_RIGHTS = "/keyrecovery"
ENDENTITYPROFILEBASE = "/endentityprofilesrules"
ENDENTITYPROFILEPREFIX = "/endentityprofilesrules/"
USERDATASOURCEBASE = "/userdatasourcesrules"
USERDATASOURCEPREFIX = "/userdatasourcesrules/"
UDS_FETCH_RIGHTS = "/fetch_userdata"
UDS_REMOVE_RIGHTS = "/remove_userdata"
CABASE = "/ca"
CAPREFIX = "/ca/"
ROLE_PUBLICWEBUSER = "/public_web_user"
ROLE_ADMINISTRATOR = "/administrator"
ROLE_SUPERADMINISTRATOR = "/super_administrator"
REGULAR_CAFUNCTIONALTY = "/ca_functionality"
REGULAR_CABASICFUNCTIONS = "/ca_functionality/basic_functions"
REGULAR_ACTIVATECA = "/ca_functionality/basic_functions/activate_ca"
REGULAR_RENEWCA = "/ca_functionality/renew_ca"
REGULAR_VIEWCERTIFICATE = "/ca_functionality/view_certificate"
REGULAR_APPROVECAACTION = "/ca_functionality/approve_caaction"
REGULAR_CREATECRL = "/ca_functionality/create_crl"
REGULAR_EDITCERTIFICATEPROFILES = "/ca_functionality/edit_certificate_profiles"
REGULAR_CREATECERTIFICATE = "/ca_functionality/create_certificate"
REGULAR_STORECERTIFICATE = "/ca_functionality/store_certificate"
REGULAR_RAFUNCTIONALITY = "/ra_functionality"
REGULAR_EDITENDENTITYPROFILES = "/ra_functionality/edit_end_entity_profiles"
REGULAR_EDITUSERDATASOURCES = "/ra_functionality/edit_user_data_sources"
REGULAR_VIEWENDENTITY = "/ra_functionality/view_end_entity"
REGULAR_CREATEENDENTITY = "/ra_functionality/create_end_entity"
REGULAR_EDITENDENTITY = "/ra_functionality/edit_end_entity"
REGULAR_DELETEENDENTITY = "/ra_functionality/delete_end_entity"
REGULAR_REVOKEENDENTITY = "/ra_functionality/revoke_end_entity"
REGULAR_VIEWENDENTITYHISTORY = "/ra_functionality/view_end_entity_history"
REGULAR_APPROVEENDENTITY = "/ra_functionality/approve_end_entity"
REGULAR_LOGFUNCTIONALITY = "/log_functionality"
REGULAR_VIEWLOG = "/log_functionality/view_log"
REGULAR_LOGCONFIGURATION = "/log_functionality/edit_log_configuration"
REGULAR_LOG_CUSTOM_EVENTS = "/log_functionality/log_custom_events"
REGULAR_SYSTEMFUNCTIONALITY = "/system_functionality"
REGULAR_EDITADMINISTRATORPRIVILEDGES = "/system_functionality/edit_administrator_privileges"
REGULAR_EDITSYSTEMCONFIGURATION = "/system_functionality/edit_systemconfiguration"
REGULAR_VIEWHARDTOKENS = "/ra_functionality/view_hardtoken"
REGULAR_VIEWPUKS = "/ra_functionality/view_hardtoken/puk_data"
REGULAR_KEYRECOVERY = "/ra_functionality/keyrecovery"
HARDTOKEN_HARDTOKENFUNCTIONALITY = "/hardtoken_functionality"
HARDTOKEN_EDITHARDTOKENISSUERS = "/hardtoken_functionality/edit_hardtoken_issuers"
HARDTOKEN_EDITHARDTOKENPROFILES = "/hardtoken_functionality/edit_hardtoken_profiles"
HARDTOKEN_ISSUEHARDTOKENS = "/hardtoken_functionality/issue_hardtokens"
HARDTOKEN_ISSUEHARDTOKENADMINISTRATORS = "/hardtoken_functionality/issue_hardtoken_administrators"
RESPONSETYPE_CERTIFICATE = "CERTIFICATE"
RESPONSETYPE_PKCS7 = "PKCS7"
RESPONSETYPE_PKCS7WITHCHAIN = "PKCS7WITHCHAIN"
NOT_REVOKED = -1
REVOKATION_REASON_UNSPECIFIED = 0
REVOKATION_REASON_KEYCOMPROMISE = 1
REVOKATION_REASON_CACOMPROMISE = 2
REVOKATION_REASON_AFFILIATIONCHANGED = 3
REVOKATION_REASON_SUPERSEDED = 4
REVOKATION_REASON_CESSATIONOFOPERATION = 5
REVOKATION_REASON_CERTIFICATEHOLD = 6
REVOKATION_REASON_REMOVEFROMCRL = 8
REVOKATION_REASON_PRIVILEGESWITHDRAWN = 9
REVOKATION_REASON_AACOMPROMISE = 10
class HTTPSClientAuthHandler(urllib2.HTTPSHandler):
def __init__(self, key, cert):
urllib2.HTTPSHandler.__init__(self)
self.key = key
self.cert = cert
def https_open(self, req):
return self.do_open(self.get_connection, req, context=self._context)
def get_connection(self, host, timeout=5, context=None):
return httplib.HTTPSConnection(host, key_file=self.key, cert_file=self.cert, timeout=timeout, context=context)
class HTTPSClientCertTransport(suds.transport.http.HttpTransport):
def __init__(self, key, cert, *args, **kwargs):
suds.transport.http.HttpTransport.__init__(self, *args, **kwargs)
self.key = key
self.cert = cert
def u2open(self, u2request):
tm = self.options.timeout
url = urllib2.build_opener(HTTPSClientAuthHandler(self.key, self.cert))
if self.u2ver() < 2.6:
socket.setdefaulttimeout(tm)
return url.open(u2request)
else:
return url.open(u2request, timeout=tm)
class Ejbca(object):
def __init__(self, url, cert=None, key=None):
self.url = url
self.cert = cert
self.key = key
self.transport = HTTPSClientCertTransport(self.key, self.cert) if self.cert else None
self.wsclient = suds.client.Client(self.url, transport=self.transport)
def get_version(self):
return self.wsclient.service.getEjbcaVersion()
def get_users(self):
return self.find_user(MATCH_WITH_DN, MATCH_TYPE_CONTAINS, "=")
def find_user(self, matchwith, matchtype, matchvalue):
usermatch = self.wsclient.factory.create('userMatch')
usermatch.matchwith = matchwith
usermatch.matchtype = matchtype
usermatch.matchvalue = matchvalue
return self.wsclient.service.findUser(usermatch)
def edit_user(self, user):
return self.wsclient.service.editUser(user)
def _decode_ejbca_cert(self, double_mess):
single_mess = base64.b64decode(double_mess)
cert_data = base64.b64decode(single_mess)
cert = M2Crypto.X509.load_cert_string(cert_data, M2Crypto.X509.FORMAT_DER)
return cert
def pkcs10_request(self, username, password, pkcs10, hardTokenSN, responseType):
res = self.wsclient.service.pkcs10Request(
arg0=username,
arg1=password,
arg2=pkcs10,
arg3=hardTokenSN,
arg4=responseType)
return self._decode_ejbca_cert(res["data"])
def find_certs(self, loginName, validOnly=False):
reslist = self.wsclient.service.findCerts(
arg0=loginName,
arg1=validOnly)
certs = []
for res in reslist:
double_mess = res["certificateData"]
if double_mess is not None:
cert = self._decode_ejbca_cert(double_mess)
cert.ejbca_status = res["type"]
certs.append(cert)
return certs
{
"url": "https://ejbca.example.org/ejbca/ejbcaws/ejbcaws?wsdl",
"cert": "warden_ra.cert.pem",
"key": "warden_ra.key.pem",
"caName": "Example CA",
"certificateProfileName": "Example",
"endEntityProfileName": "Example EE",
"subjectDN_template": "DC=cz,DC=example-ca,DC=warden,CN=%s",
"username_suffix": "@warden"
}
\ No newline at end of file
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016, CESNET, z. s. p. o.
# Use of this source is governed by an ISC license, see LICENSE file.
import sys
import os
import string
import random
import struct
import argparse
import subprocess
import json
# *ph* server vulnerable to logjam, local openssl too new, use hammer to disable Diffie-Helmann
import ssl
ssl._DEFAULT_CIPHERS += ":!DH"
import ejbcaws
class EjbcaClient(object):
def __init__(self, registry, ejbca_data=None):
self.registry = registry
self.ejbca_data = ejbca_data or {}
@property
def admins(self):
return [u if not u.startswith("RFC822NAME") else u[11:] for u in self.ejbca_data["subjectAltName"].split(",")]
@admins.setter
def admins(self, emails):
self.ejbca_data["subjectAltName"] = ",".join(("RFC822NAME=%s" % e for e in emails))
@property
def name(self):
username = self.ejbca_data["username"]
if not username.endswith(self.registry.username_suffix):
raise ValueError(("Ejbca user username does not conform to config", self.ejbca_data))
return username[:-len(self.registry.username_suffix)]
@name.setter
def name(self, new):
self.ejbca_data["username"] = new + self.registry.username_suffix
self.ejbca_data["subjectDN"] = self.registry.subjectDN_template % new
@property
def status(self):
s = self.ejbca_data["status"]
if s == ejbcaws.STATUS_NEW:
return "Issuable"
elif s == ejbcaws.STATUS_GENERATED:
return "Passive"
elif s == ejbcaws.STATUS_INITIALIZED:
return "New"
else:
return "EJBCA status %d" % s
def get_certs(self):
return self.registry.ejbca.find_certs(self.ejbca_data["username"], validOnly=False)
def allow_new_cert(self, pwd=None):
self.ejbca_data["status"] = ejbcaws.STATUS_NEW
if pwd is not None:
self.ejbca_data["password"] = pwd
self.ejbca_data["clearPwd"] = True
def new_cert(self, csr, pwd):
cert = self.registry.ejbca.pkcs10_request(
self.ejbca_data["username"],
pwd, csr, 0, ejbcaws.RESPONSETYPE_CERTIFICATE)
return cert
def __str__(self):
return (
"Client: %s (%s)\n"
"Admins: %s\n"
"Status: %s\n"
) % (
self.name,
self.ejbca_data["subjectDN"],
", ".join(self.admins),
self.status
)
def verbose_str(self):
return "%s\n" % self.ejbca_data
def save(self):
self.registry.ejbca.edit_user(self.ejbca_data)
class EjbcaRegistry(object):
def __init__(self, url, cert=None, key=None,
caName="", certificateProfileName="", endEntityProfileName="",
subjectDN_template="%s", username_suffix=""):
self.ejbca = ejbcaws.Ejbca(url, cert, key)
self.caName = caName
self.certificateProfileName = certificateProfileName
self.endEntityProfileName = endEntityProfileName
self.subjectDN_template = subjectDN_template
self.username_suffix = username_suffix
def get_clients(self):
return (EjbcaClient(registry=self, ejbca_data=data) for data in self.ejbca.get_users())
def get_client(self, name):
users = self.ejbca.find_user(ejbcaws.MATCH_WITH_USERNAME, ejbcaws.MATCH_TYPE_EQUALS, name + self.username_suffix)
if len(users) > 1:
raise LookupError("%d users %s found (more than one?!)" % (len(users), name))
if not users:
return None
return EjbcaClient(registry=self, ejbca_data=users[0])
def new_client(self, name, admins):
user = self.get_client(name)
if user:
raise LookupError("Client %s already exists" % name)
new_ejbca_data = dict(
caName=self.caName,
certificateProfileName=self.certificateProfileName,
endEntityProfileName=self.endEntityProfileName,
keyRecoverable=False,
sendNotification=False,
status=ejbcaws.STATUS_INITIALIZED,
subjectAltName="",
subjectDN="",
tokenType=ejbcaws.TOKEN_TYPE_USERGENERATED,
username="")
client = EjbcaClient(registry=self, ejbca_data=new_ejbca_data)
client.name = name
client.admins = admins
return client
def verbose_str(self):
return self.ejbca.get_version()
def format_cert(cert):
return (
"Subject: %s\n"
"Validity: %s - %s\n"
"Serial: %s\n"
"Fingerprint: md5:%s, sha1:%s\n"
"Issuer: %s\n"
) % (
cert.get_subject().as_text(),
cert.get_not_before().get_datetime().isoformat(),
cert.get_not_after().get_datetime().isoformat(),
":".join(["%02x" % ord(c) for c in struct.pack('!Q', cert.get_serial_number())]),
cert.get_fingerprint("md5"),
cert.get_fingerprint("sha1"),
cert.get_issuer().as_text()
)
# Command line arguments
def list_clients(registry, name=None, verbose=False):
if name is not None:
client = registry.get_client(name)
if client is None:
print "No such client."
return
else:
clients = [client]
else:
clients = registry.get_clients()
for client in clients:
print(client)
if verbose:
print(client.verbose_str())
for cert in sorted(client.get_certs(), key=lambda c: c.get_not_after()):
print(format_cert(cert))
if verbose:
print(cert.as_text())
def register_client(registry, name, admins=None, verbose=False):
try:
client = registry.new_client(name, admins or [])
except LookupError as e:
print(e)
return
client.save()
list_clients(registry, name, verbose)
def applicant(registry, name, password=None, verbose=False):
client = registry.get_client(name)
if password is None:
password = "".join((random.choice(string.ascii_letters + string.digits) for dummy in range(16)))
print("Application password is: %s\n" % password)
client.allow_new_cert(pwd=password)
client.save()
list_clients(registry, name, verbose)
def request(registry, key, csr, verbose=False):
openssl = subprocess.Popen(
[
"openssl", "req", "-new", "-nodes", "-batch",
"-keyout", key,
"-out", csr,
"-config", "/dev/stdin"
], stdin=subprocess.PIPE
)
openssl.stdin.write(
"distinguished_name=req_distinguished_name\n"
"prompt=no\n"
"\n"
"[req_distinguished_name]\n"
"commonName=dummy"
)
openssl.stdin.close()
openssl.wait()
if verbose:
with open(csr, "r") as f:
print(f.read())
def gen_cert(registry, name, csr, cert, password, verbose=False):
with open(csr, "r") as f:
csr_data = f.read()
client = registry.get_client(name)
newcert = client.new_cert(csr_data, password)
print(format_cert(newcert))
if verbose:
print(newcert.as_text())
print(newcert.as_pem())
with open(cert, "w") as f:
f.write(newcert.as_text())
f.write(newcert.as_pem())
def get_args():
argp = argparse.ArgumentParser(
description="Warden server certificate registry", add_help=False)
argp.add_argument("--help", action="help",
help="show this help message and exit")
argp.add_argument("-c", "--config",
help="path to configuration file")
argp.add_argument("-v", "--verbose", action="store_true", default=False,
help="be more chatty")
subargp = argp.add_subparsers(title="commands")
subargp_list = subargp.add_parser("list", add_help=False,
description="List registered clients.",
help="list clients")
subargp_list.set_defaults(command=list_clients)
subargp_list.add_argument("--help", action="help",
help="show this help message and exit")
subargp_list.add_argument("--name", action="store", type=str,
help="client name")
subargp_reg = subargp.add_parser("register", add_help=False,
description="Add client registration entry.",
help="register client")
subargp_reg.set_defaults(command=register_client)
subargp_reg.add_argument("--help", action="help",
help="show this help message and exit")
subargp_reg.add_argument("--name", action="store", type=str,
required=True, help="client name")
subargp_reg.add_argument("--admins", action="store", type=str,
nargs="*", help="administrator list")
subargp_apply = subargp.add_parser("applicant", add_help=False,
description="Set client into certificate application mode and set its password",
help="allow for certificate application")
subargp_apply.set_defaults(command=applicant)
subargp_apply.add_argument("--help", action="help",
help="show this help message and exit")
subargp_apply.add_argument("--name", action="store", type=str,
required=True, help="client name")
subargp_apply.add_argument("--password", action="store", type=str,
help="password for application (will be autogenerated if not set)")
subargp_req = subargp.add_parser("request", add_help=False,
description="Generate certificate request",
help="generate CSR")
subargp_req.set_defaults(command=request)
subargp_req.add_argument("--help", action="help",
help="show this help message and exit")
subargp_req.add_argument("--key", action="store", type=str,
required=True, help="file for saving the key")
subargp_req.add_argument("--csr", action="store", type=str,
required=True, help="file for saving the request")
subargp_cert = subargp.add_parser("gencert", add_help=False,
description="Request new certificate from registry",
help="get new certificate")
subargp_cert.set_defaults(command=gen_cert)
subargp_cert.add_argument("--help", action="help",
help="show this help message and exit")
subargp_cert.add_argument("--name", action="store", type=str,
required=True, help="client name")
subargp_cert.add_argument("--csr", action="store", type=str,
required=True, help="file for saving the request")
subargp_cert.add_argument("--cert", action="store", type=str,
required=True, help="file for saving the new certificate")
subargp_cert.add_argument("--password", action="store", type=str,
required=True, help="password for application")
return argp.parse_args()
def read_cfg(path):
with open(path, "r") as f:
stripcomments = "\n".join((l for l in f if not l.lstrip().startswith(("#", "//"))))
conf = json.loads(stripcomments)
return conf
if __name__ == "__main__":
args = get_args()
config = read_cfg(os.path.join(os.path.dirname(__file__), args.config or "warden_ra.cfg"))
registry = EjbcaRegistry(**config)
if args.verbose:
print(registry)
command = args.command
subargs = vars(args)
del subargs["command"]
del subargs["config"]
sys.exit(command(registry, **subargs))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment