From 4367362775bc1936c63cb0bff16a71affc31f8a2 Mon Sep 17 00:00:00 2001 From: Victor Wagner Date: Sat, 20 Dec 2014 14:58:54 +0300 Subject: [PATCH] Implemented minimal cert extension support --- ctypescrypto/oid.py | 18 ++++++- ctypescrypto/x509.py | 125 +++++++++++++++++++++++++++++++++---------- tests/testx509.py | 35 +++++++++++- 3 files changed, 146 insertions(+), 32 deletions(-) diff --git a/ctypescrypto/oid.py b/ctypescrypto/oid.py index 7d4fc21..6941ce4 100644 --- a/ctypescrypto/oid.py +++ b/ctypescrypto/oid.py @@ -12,7 +12,7 @@ from ctypes import c_char_p, c_void_p, c_int, create_string_buffer __all__ = ['Oid','create','cleanup'] -class Oid: +class Oid(object): """ Represents an OID. It can be consturucted by textual representation like Oid("commonName") or Oid("CN"), @@ -58,11 +58,25 @@ class Oid: " Returns logn name if any " return libcrypto.OBJ_nid2ln(self.nid) def dotted(self): - " Returns dotted-decimal reperesntation " + " Returns dotted-decimal reperesentation " obj=libcrypto.OBJ_nid2obj(self.nid) buf=create_string_buffer(256) libcrypto.OBJ_obj2txt(buf,256,obj,1) return buf.value + @staticmethod + def fromobj(obj): + """ + Creates an OID object from the pointer to ASN1_OBJECT c structure. + Strictly for internal use + """ + nid=libcrypto.OBJ_obj2nid(obj) + if nid==0: + buf=create_string_buffer(80) + l=libcrypto.OBJ_obj2txt(buf,80,obj,1) + oid=create(buf[0:l],buf[0:l],buf[0:l]) + else: + oid=Oid(nid) + return oid def create(dotted,shortname,longname): """ diff --git a/ctypescrypto/x509.py b/ctypescrypto/x509.py index bd056d3..358ce7d 100644 --- a/ctypescrypto/x509.py +++ b/ctypescrypto/x509.py @@ -57,6 +57,9 @@ class _cinf(Structure): ('validity',POINTER(_validity)), ('subject',c_void_p), ('pubkey',c_void_p), + ('issuerUID',c_void_p), + ('subjectUID',c_void_p), + ('extensions',c_void_p), ] class _x509(Structure): @@ -72,7 +75,6 @@ class _x509(Structure): ] _px509 = POINTER(_x509) -# X509_extlist is not exported yet, because is not implemented class X509Error(LibCryptoError): """ Exception, generated when some openssl function fail @@ -157,14 +159,7 @@ class X509Name: entry=libcrypto.X509_NAME_get_entry(self.ptr,key) if entry is None: raise IndexError("name entry index out of range") - obj=libcrypto.X509_NAME_ENTRY_get_object(entry) - nid=libcrypto.OBJ_obj2nid(obj) - if nid==0: - buf=create_string_buffer(80) - len=libcrypto.OBJ_obj2txt(buf,80,obj,1) - oid=Oid(buf[0:len]) - else: - oid=Oid(nid) + oid=Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry)) s=libcrypto.X509_NAME_ENTRY_get_data(entry) b=Membio() libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG) @@ -173,25 +168,89 @@ class X509Name: def __setitem__(self,key,val): if not self.writable: raise ValueError("Attempt to modify constant X509 object") -class X509_extlist: - def __init__(self,ptr): - self.ptr=ptr + +class _x509_ext(Structure): + """ Represens C structure X509_EXTENSION """ + _fields_=[("object",c_void_p), + ("critical",c_int), + ("value",c_void_p)] + +class X509_EXT(object): + """ Python object which represents a certificate extension """ + def __init__(self,ptr,copy=False): + """ Initializes from the pointer to X509_EXTENSION. + If copy is True, creates a copy, otherwise just + stores pointer. + """ + if copy: + self.ptr=libcrypto.X509_EXTENSION_dup(ptr) + else: + self.ptr=cast(ptr,POINTER(_x509_ext)) def __del__(self): - libcrypto.X509_NAME_free(self.ptr) + libcrypto.X509_EXTENSION_free(self.ptr) def __str__(self): - raise NotImplementedError + b=Membio() + libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0) + libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int) + return str(b) + def __unicode__(self): + b=Membio() + libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0) + return unicode(b) + @property + def oid(self): + return Oid.fromobj(self.ptr[0].object) + @property + def critical(self): + return self.ptr[0].critical >0 +class _X509extlist(object): + """ + Represents list of certificate extensions + """ + def __init__(self,cert): + self.cert=cert def __len__(self): - return libcrypto.X509_NAME_entry_count(self.ptr) - - def __getattr__(self,key): - raise NotImplementedError - def __setattr__(self,key,val): - raise NotImplementedError - - - + return libcrypto.X509_get_ext_count(self.cert.cert) + def __getitem__(self,item): + p=libcrypto.X509_get_ext(self.cert.cert,item) + if p is None: + raise IndexError + return X509_EXT(p,True) + def find(self,oid): + """ + Return list of extensions with given Oid + """ + if not isinstance(oid,Oid): + raise TypeError("Need crytypescrypto.oid.Oid as argument") + found=[] + l=-1 + end=len(self) + while True: + l=libcrypto.X509_get_ext_by_NID(self.cert.cert,oid.nid,l) + if l>=end or l<0: + break + found.append(self[l]) + return found + def find_critical(self,crit=True): + """ + Return list of critical extensions (or list of non-cricital, if + optional second argument is False + """ + if crit: + flag=1 + else: + flag=0 + found=[] + end=len(self) + l=-1 + while True: + l=libcrypto.X509_get_ext_by_critical(self.cert.cert,flag,l) + if l>=end or l<0: + break + found.append(self[l]) + return found -class X509: +class X509(object): """ Represents X.509 certificate. """ @@ -217,6 +276,7 @@ class X509: self.cert=libcrypto.d2i_X509_bio(b.bio,None) if self.cert is None: raise X509Error("error reading certificate") + self.extensions=_X509extlist(self) def __del__(self): """ Frees certificate object @@ -313,9 +373,6 @@ class X509: b=Membio() libcrypto.ASN1_TIME_print(b.bio,asn1date) return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc) - def extensions(self): - """ Returns list of extensions """ - raise NotImplementedError def check_ca(self): """ Returns True if certificate is CA certificate """ return libcrypto.X509_check_ca(self.cert)>0 @@ -367,7 +424,7 @@ class X509Store: libcrypto.X509_STORE_add_cert(self.store,cert.cert) def add_callback(self,callback): """ - Installs callbac function, which would receive detailed information + Installs callback function, which would receive detailed information about verified ceritificates """ raise NotImplementedError @@ -392,10 +449,16 @@ class X509Store: if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0: raise X509Error("cannot set purpose") def setdepth(self,depth): + """ + Sets the verification depth i.e. max length of certificate chain + which is acceptable + """ libcrypto.X509_STORE_set_depth(self.store,depth) def settime(self, time): """ Set point in time used to check validity of certificates for + Time can be either python datetime object or number of seconds + sinse epoch """ if isinstance(time,datetime.datetime) or isinstance(time,datetime.date): d=int(time.strftime("%s")) @@ -479,4 +542,8 @@ libcrypto.X509_LOOKUP_file.restype=c_void_p libcrypto.X509_LOOKUP_hash_dir.restype=c_void_p libcrypto.X509_LOOKUP_ctrl.restype=c_int libcrypto.X509_LOOKUP_ctrl.argtypes=(c_void_p,c_int,c_char_p,c_long,POINTER(c_char_p)) - +libcrypto.X509_EXTENSION_dup.argtypes=(c_void_p,) +libcrypto.X509_EXTENSION_dup.restype=POINTER(_x509_ext) +libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int) +libcrypto.X509_get_ext.restype=c_void_p +libcrypto.X509_get_ext.argtypes=(c_void_p,c_int) diff --git a/tests/testx509.py b/tests/testx509.py index 011d487..a54417b 100644 --- a/tests/testx509.py +++ b/tests/testx509.py @@ -148,6 +148,40 @@ zVMSW4SOwg/H7ZMZ2cn6j1g0djIvruFQFGHUqFijyDATI+/GJYw2jxyA self.assertTrue(ca.check_ca()) notca=X509(self.cert1) self.assertFalse(notca.check_ca()) + def test_extension_count(self): + cert=X509(self.cert1) + self.assertTrue(len(cert.extensions),4) + ca_cert=X509(self.ca_cert) + self.assertEqual(len(ca_cert.extensions),3) + def test_extension_outofrange(self): + cert=X509(self.cert1) + with self.assertRaises(IndexError): + cert.extensions[4] + with self.assertRaises(IndexError): + cert.extensions[-1] + def test_extension_oid(self): + cert=X509(self.cert1) + ext=cert.extensions[0] + ext_id=ext.oid + self.assertTrue(isinstance(ext_id,Oid)) + self.assertEqual(ext_id,Oid('basicConstraints')) + def text_extension_text(self): + cert=X509(self.cert1) + ext=cert.extensions[0] + self.assertEqual(str(ext),'CA:FALSE') + def test_extenson_find(self): + cert=X509(self.cert1) + exts=cert.extensions.find(Oid('subjectAltName')) + self.assertEqual(len(exts),1) + self.assertEqual(exts[0].oid,Oid('subjectAltName')) + def test_extenson_critical(self): + cert=X509(self.digicert_cert) + crit_exts=cert.extensions.find_critical() + self.assertEqual(len(crit_exts),2) + other_exts=cert.extensions.find_critical(False) + self.assertEqual(len(crit_exts)+len(other_exts),len(cert.extensions)) + self.assertEqual(crit_exts[0].critical,True) + self.assertEqual(other_exts[0].critical,False) def test_verify_by_key(self): ca=X509(self.ca_cert) pubkey=ca.pubkey @@ -181,7 +215,6 @@ zVMSW4SOwg/H7ZMZ2cn6j1g0djIvruFQFGHUqFijyDATI+/GJYw2jxyA # signed by some commercial CA should be rejected too self.assertFalse(gitcert.verify(store)) trusted.close() - pass def test_verify_by_dirstore(self): pass if __name__ == '__main__': -- 2.39.2