diff --git a/warden3/contrib/warden_ra/warden_ra.py b/warden3/contrib/warden_ra/warden_ra.py index bb340842aa667f2fc0facf80592d205fcf4ffa0c..24a7f838cf92bba130e82123f319f46268bf997a 100755 --- a/warden3/contrib/warden_ra/warden_ra.py +++ b/warden3/contrib/warden_ra/warden_ra.py @@ -69,78 +69,6 @@ class Client(object): return str(self) + (str(self.opaque) if self.opaque and verbose else "") -class EjbcaRegistry(OpenSSLRegistry): - - status_ejbca_to_str = { - ejbcaws.STATUS_NEW: "Issuable", - ejbcaws.STATUS_GENERATED: "Passive", - ejbcaws.STATUS_INITIALIZED: "New", - ejbcaws.STATUS_HISTORICAL: "Disabled" - } - status_str_to_ejbca = dict((v, k) for k, v in status_ejbca_to_str.items()) - - def __init__(self, log, url, cert=None, key=None, - ca_name="", certificate_profile_name="", end_entity_profile_name="", - subject_dn_template="%s", username_suffix=""): - self.log = log - self.ejbca = ejbcaws.Ejbca(url, cert, key) - self.ca_name = ca_name - self.certificate_profile_name = certificate_profile_name - self.end_entity_profile_name = end_entity_profile_name - self.subject_dn_template = subject_dn_template - self.username_suffix = username_suffix - - def client_data(self, ejbca_data): - ejbca_username = ejbca_data["username"] - username = ejbca_username[:-len(self.username_suffix)] if ejbca_username.endswith(self.username_suffix) else ejbca_username - admins = [u if not u.startswith("RFC822NAME") else u[11:] for u in ejbca_data["subjectAltName"].split(",")] - status = self.status_ejbca_to_str.get(ejbca_data["status"], "Other") - return username, admins, status, None, ejbca_data - - def get_clients(self): - return [Client(*self.client_data(u)) for u in self.ejbca.get_users()] - - def get_client(self, name): - users = self.ejbca.find_user(ejbcaws.MATCH_WITH_USERNAME, ejbcaws.MATCH_TYPE_EQUALS, name + self.username_suffix) - if len(users) > 1: - raise LookupError("%d users %s found (more than one?!)" % (len(users), name)) - if not users: - return None - return Client(*self.client_data(users[0])) - - def save_client(self, client): - edata = client.opaque or dict( - caName=self.ca_name, - certificateProfileName=self.certificate_profile_name, - endEntityProfileName=self.end_entity_profile_name, - keyRecoverable=False, - sendNotification=False, - tokenType=ejbcaws.TOKEN_TYPE_USERGENERATED, - password = "".join((random.choice(string.ascii_letters + string.digits) for dummy in range(16))), - clearPwd = True, - username = client.name + self.username_suffix, - subjectDN = self.subject_dn_template % client.name - ) - edata["subjectAltName"] = ",".join(("RFC822NAME=%s" % a for a in client.admins)) - edata["status"] = self.status_str_to_ejbca.get(client.status, edata["status"]) - if client.pwd: - edata["password"] = client.pwd - edata["clearPwd"] = True - self.ejbca.edit_user(edata) - - def get_certs(self, client): - return self.ejbca.find_certs(client.opaque["username"], validOnly=False) - - def new_cert(self, client, csr, pwd): - cert = self.ejbca.pkcs10_request( - client.opaque["username"], - pwd, csr, 0, ejbcaws.RESPONSETYPE_CERTIFICATE) - return cert - - def __str__(self): - return self.ejbca.get_version() - - class OpenSSLRegistry(object): def __init__(self, log, base_dir, @@ -251,6 +179,78 @@ class OpenSSLRegistry(object): return "%s<%s>" % (type(self).__name__, self.base_dir) +class EjbcaRegistry(OpenSSLRegistry): + + status_ejbca_to_str = { + ejbcaws.STATUS_NEW: "Issuable", + ejbcaws.STATUS_GENERATED: "Passive", + ejbcaws.STATUS_INITIALIZED: "New", + ejbcaws.STATUS_HISTORICAL: "Disabled" + } + status_str_to_ejbca = dict((v, k) for k, v in status_ejbca_to_str.items()) + + def __init__(self, log, url, cert=None, key=None, + ca_name="", certificate_profile_name="", end_entity_profile_name="", + subject_dn_template="%s", username_suffix=""): + self.log = log + self.ejbca = ejbcaws.Ejbca(url, cert, key) + self.ca_name = ca_name + self.certificate_profile_name = certificate_profile_name + self.end_entity_profile_name = end_entity_profile_name + self.subject_dn_template = subject_dn_template + self.username_suffix = username_suffix + + def client_data(self, ejbca_data): + ejbca_username = ejbca_data["username"] + username = ejbca_username[:-len(self.username_suffix)] if ejbca_username.endswith(self.username_suffix) else ejbca_username + admins = [u if not u.startswith("RFC822NAME") else u[11:] for u in ejbca_data["subjectAltName"].split(",")] + status = self.status_ejbca_to_str.get(ejbca_data["status"], "Other") + return username, admins, status, None, ejbca_data + + def get_clients(self): + return [Client(*self.client_data(u)) for u in self.ejbca.get_users()] + + def get_client(self, name): + users = self.ejbca.find_user(ejbcaws.MATCH_WITH_USERNAME, ejbcaws.MATCH_TYPE_EQUALS, name + self.username_suffix) + if len(users) > 1: + raise LookupError("%d users %s found (more than one?!)" % (len(users), name)) + if not users: + return None + return Client(*self.client_data(users[0])) + + def save_client(self, client): + edata = client.opaque or dict( + caName=self.ca_name, + certificateProfileName=self.certificate_profile_name, + endEntityProfileName=self.end_entity_profile_name, + keyRecoverable=False, + sendNotification=False, + tokenType=ejbcaws.TOKEN_TYPE_USERGENERATED, + password = "".join((random.choice(string.ascii_letters + string.digits) for dummy in range(16))), + clearPwd = True, + username = client.name + self.username_suffix, + subjectDN = self.subject_dn_template % client.name + ) + edata["subjectAltName"] = ",".join(("RFC822NAME=%s" % a for a in client.admins)) + edata["status"] = self.status_str_to_ejbca.get(client.status, edata["status"]) + if client.pwd: + edata["password"] = client.pwd + edata["clearPwd"] = True + self.ejbca.edit_user(edata) + + def get_certs(self, client): + return self.ejbca.find_certs(client.opaque["username"], validOnly=False) + + def new_cert(self, client, csr, pwd): + cert = self.ejbca.pkcs10_request( + client.opaque["username"], + pwd, csr, 0, ejbcaws.RESPONSETYPE_CERTIFICATE) + return cert + + def __str__(self): + return self.ejbca.get_version() + + def format_cert(cert): return ( "Subject: %s\n"