Skip to content
Snippets Groups Projects
Commit f700411f authored by Pavel Kácha's avatar Pavel Kácha
Browse files

Fixed grave omission - switched object order

parent ca9a3ffc
No related branches found
No related tags found
No related merge requests found
......@@ -69,78 +69,6 @@ class Client(object):
return str(self) + (str(self.opaque) if self.opaque and verbose else "")
class EjbcaRegistry(OpenSSLRegistry):
status_ejbca_to_str = {
ejbcaws.STATUS_NEW: "Issuable",
ejbcaws.STATUS_GENERATED: "Passive",
ejbcaws.STATUS_INITIALIZED: "New",
ejbcaws.STATUS_HISTORICAL: "Disabled"
}
status_str_to_ejbca = dict((v, k) for k, v in status_ejbca_to_str.items())
def __init__(self, log, url, cert=None, key=None,
ca_name="", certificate_profile_name="", end_entity_profile_name="",
subject_dn_template="%s", username_suffix=""):
self.log = log
self.ejbca = ejbcaws.Ejbca(url, cert, key)
self.ca_name = ca_name
self.certificate_profile_name = certificate_profile_name
self.end_entity_profile_name = end_entity_profile_name
self.subject_dn_template = subject_dn_template
self.username_suffix = username_suffix
def client_data(self, ejbca_data):
ejbca_username = ejbca_data["username"]
username = ejbca_username[:-len(self.username_suffix)] if ejbca_username.endswith(self.username_suffix) else ejbca_username
admins = [u if not u.startswith("RFC822NAME") else u[11:] for u in ejbca_data["subjectAltName"].split(",")]
status = self.status_ejbca_to_str.get(ejbca_data["status"], "Other")
return username, admins, status, None, ejbca_data
def get_clients(self):
return [Client(*self.client_data(u)) for u in self.ejbca.get_users()]
def get_client(self, name):
users = self.ejbca.find_user(ejbcaws.MATCH_WITH_USERNAME, ejbcaws.MATCH_TYPE_EQUALS, name + self.username_suffix)
if len(users) > 1:
raise LookupError("%d users %s found (more than one?!)" % (len(users), name))
if not users:
return None
return Client(*self.client_data(users[0]))
def save_client(self, client):
edata = client.opaque or dict(
caName=self.ca_name,
certificateProfileName=self.certificate_profile_name,
endEntityProfileName=self.end_entity_profile_name,
keyRecoverable=False,
sendNotification=False,
tokenType=ejbcaws.TOKEN_TYPE_USERGENERATED,
password = "".join((random.choice(string.ascii_letters + string.digits) for dummy in range(16))),
clearPwd = True,
username = client.name + self.username_suffix,
subjectDN = self.subject_dn_template % client.name
)
edata["subjectAltName"] = ",".join(("RFC822NAME=%s" % a for a in client.admins))
edata["status"] = self.status_str_to_ejbca.get(client.status, edata["status"])
if client.pwd:
edata["password"] = client.pwd
edata["clearPwd"] = True
self.ejbca.edit_user(edata)
def get_certs(self, client):
return self.ejbca.find_certs(client.opaque["username"], validOnly=False)
def new_cert(self, client, csr, pwd):
cert = self.ejbca.pkcs10_request(
client.opaque["username"],
pwd, csr, 0, ejbcaws.RESPONSETYPE_CERTIFICATE)
return cert
def __str__(self):
return self.ejbca.get_version()
class OpenSSLRegistry(object):
def __init__(self, log, base_dir,
......@@ -251,6 +179,78 @@ class OpenSSLRegistry(object):
return "%s<%s>" % (type(self).__name__, self.base_dir)
class EjbcaRegistry(OpenSSLRegistry):
status_ejbca_to_str = {
ejbcaws.STATUS_NEW: "Issuable",
ejbcaws.STATUS_GENERATED: "Passive",
ejbcaws.STATUS_INITIALIZED: "New",
ejbcaws.STATUS_HISTORICAL: "Disabled"
}
status_str_to_ejbca = dict((v, k) for k, v in status_ejbca_to_str.items())
def __init__(self, log, url, cert=None, key=None,
ca_name="", certificate_profile_name="", end_entity_profile_name="",
subject_dn_template="%s", username_suffix=""):
self.log = log
self.ejbca = ejbcaws.Ejbca(url, cert, key)
self.ca_name = ca_name
self.certificate_profile_name = certificate_profile_name
self.end_entity_profile_name = end_entity_profile_name
self.subject_dn_template = subject_dn_template
self.username_suffix = username_suffix
def client_data(self, ejbca_data):
ejbca_username = ejbca_data["username"]
username = ejbca_username[:-len(self.username_suffix)] if ejbca_username.endswith(self.username_suffix) else ejbca_username
admins = [u if not u.startswith("RFC822NAME") else u[11:] for u in ejbca_data["subjectAltName"].split(",")]
status = self.status_ejbca_to_str.get(ejbca_data["status"], "Other")
return username, admins, status, None, ejbca_data
def get_clients(self):
return [Client(*self.client_data(u)) for u in self.ejbca.get_users()]
def get_client(self, name):
users = self.ejbca.find_user(ejbcaws.MATCH_WITH_USERNAME, ejbcaws.MATCH_TYPE_EQUALS, name + self.username_suffix)
if len(users) > 1:
raise LookupError("%d users %s found (more than one?!)" % (len(users), name))
if not users:
return None
return Client(*self.client_data(users[0]))
def save_client(self, client):
edata = client.opaque or dict(
caName=self.ca_name,
certificateProfileName=self.certificate_profile_name,
endEntityProfileName=self.end_entity_profile_name,
keyRecoverable=False,
sendNotification=False,
tokenType=ejbcaws.TOKEN_TYPE_USERGENERATED,
password = "".join((random.choice(string.ascii_letters + string.digits) for dummy in range(16))),
clearPwd = True,
username = client.name + self.username_suffix,
subjectDN = self.subject_dn_template % client.name
)
edata["subjectAltName"] = ",".join(("RFC822NAME=%s" % a for a in client.admins))
edata["status"] = self.status_str_to_ejbca.get(client.status, edata["status"])
if client.pwd:
edata["password"] = client.pwd
edata["clearPwd"] = True
self.ejbca.edit_user(edata)
def get_certs(self, client):
return self.ejbca.find_certs(client.opaque["username"], validOnly=False)
def new_cert(self, client, csr, pwd):
cert = self.ejbca.pkcs10_request(
client.opaque["username"],
pwd, csr, 0, ejbcaws.RESPONSETYPE_CERTIFICATE)
return cert
def __str__(self):
return self.ejbca.get_version()
def format_cert(cert):
return (
"Subject: %s\n"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment