]> www.wagner.pp.ru Git - oss/ctypescrypto.git/commitdiff
Partially implemented X509 object. Added unicode support to BIO
authorVictor Wagner <wagner@atlas-kard.ru>
Fri, 24 Oct 2014 13:50:59 +0000 (17:50 +0400)
committerVictor Wagner <wagner@atlas-kard.ru>
Fri, 24 Oct 2014 13:50:59 +0000 (17:50 +0400)
ctypescrypto/bio.py
ctypescrypto/x509.py
tests/testbio.py

index 642b96b72c140c5c74382bf01a1c5fd8823d6c55..f8150f72cebc25db4b8604a33c88f6a1c64f727d 100644 (file)
@@ -3,12 +3,12 @@ from ctypes import c_char_p, c_void_p, c_int, string_at, c_long,POINTER,byref, c
 class Membio:
        """ 
                Provides interface to OpenSSL memory bios 
-               use str() to get contents of writable bio
+               use str() or unicode() to get contents of writable bio
                use bio member to pass to libcrypto function
        """
        def __init__(self,data=None):
                """ If data is specified, creates read-only BIO. If data is
-                       None, creates writable BIO
+                       None, creates writable BIO, contents of which can be retrieved by str() or unicode()
                """
                if data is None:
                        method=libcrypto.BIO_s_mem()
@@ -16,13 +16,28 @@ class Membio:
                else:
                        self.bio=libcrypto.BIO_new_mem_buf(c_char_p(data),len(data))
        def __del__(self):
+               """
+               Cleans up memory used by bio
+               """
                libcrypto.BIO_free(self.bio)
                del(self.bio)
        def __str__(self):
+               """
+               Returns current contents of buffer as byte string
+               """
                p=c_char_p(None)
                l=libcrypto.BIO_ctrl(self.bio,3,0,byref(p))
                return string_at(p,l)
+       def __unicode__(self):
+               """
+               Attempts to interpret current contents of buffer as UTF-8 string and convert it to unicode
+               """
+               return str(self).decode("utf-8")
        def read(self,length=None):
+               """
+               Reads data from readble BIO. For test purposes.
+               @param length - if specifed, limits amount of data read. If not BIO is read until end of buffer
+               """
                if not length is None:
                        if type(length)!=type(0):
                                raise TypeError("length to read should be number")
@@ -50,13 +65,21 @@ class Membio:
                        return out      
 
        def write(self,data):
+               """
+               Writes data to writable bio. For test purposes
+               """
+               if isinstance(data,unicode):
+                       data=data.encode("utf-8")
                r=libcrypto.BIO_write(self.bio,data,len(data))
                if r==-2:
                        raise NotImplementedError("Function not supported by this BIO")
                if r<len(data):
                        raise IOError("Not all data were successfully written")
-
-#FIXME TODO - BIO should have stream-like interface
+       def reset(self):
+               """
+               Resets the read-only bio to start and discards all data from writable bio
+               """
+               libcrypto.BIO_ctrl(self.bio,1,0,None)
 libcrypto.BIO_s_mem.restype=c_void_p
 libcrypto.BIO_new.restype=c_void_p
 libcrypto.BIO_new.argtypes=(c_void_p,)
@@ -67,3 +90,4 @@ libcrypto.BIO_write.argtypes=(c_void_p,c_char_p,c_int)
 libcrypto.BIO_free.argtypes=(c_void_p,)
 libcrypto.BIO_new_mem_buf.restype=c_void_p
 libcrypto.BIO_new_mem_buf.argtypes=(c_char_p,c_int)
+libcrypto.BIO_ctrl.argtypes=(c_void_p,c_int,c_int,c_void_p)
index 04addada19460549d89982b9f1ae3134d858a791..b933a134678e4823ed4b917d5584460402ca2e13 100644 (file)
@@ -1,4 +1,4 @@
-from ctypes import c_void_p
+from ctypes import c_void_p,create_string_buffer,c_long,c_int
 from ctypescrypto.bio import Membio
 from ctypescrypto.pkey import PKey
 from ctypescrypto.oid import Oid
@@ -6,54 +6,131 @@ from ctypescrypto.exception import LibCryptoError
 from ctypescrypto import libcrypto
 
 class X509Error(LibCryptoError):
+       """
+       Exception, generated when some openssl function fail
+       during X509 operation
+       """
        pass
 
 
 class X509Name:
-       def __init__(self,ptr):
-               self.ptr=ptr
+       """
+       Class which represents X.509 distinguished name - typically 
+       a certificate subject name or an issuer name.
+       """
+       # XN_FLAG_SEP_COMMA_PLUS & ASN1_STRFLG_UTF8_CONVERT
+       PRINT_FLAG=0x10010
+       ESC_MSB=4
+       def __init__(self,ptr=None,copy=False):
+               """
+               Creates a X509Name object
+               @param ptr - pointer to X509_NAME C structure (as returned by some  OpenSSL functions
+               @param copy - indicates that this structure have to be freed upon object destruction
+               """
+               if ptr is not None:
+                       self.ptr=ptr
+                       self.need_free=copy
+                       self.writable=False
+               else:
+                       self.ptr=libcrypto.X509_NAME_new()
+                       self.need_free=True
+                       self.writable=True
        def __del__(self):
-               libcrypto.X509_NAME_free(self.ptr)
+               """
+               Frees if neccessary
+               """
+               if self.need_free:
+                       libcrypto.X509_NAME_free(self.ptr)
        def __str__(self):
+               """
+               Produces an ascii representation of the name, escaping all symbols > 0x80
+               Probably it is not what you want, unless your native language is English
+               """
                b=Membio()
-               libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,PRING_FLAG)
-               return str(b).decode("utf-8")
-
+               libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG | self.ESC_MSB)
+               return str(b)
+       def __unicode__(self):
+               """
+               Produces unicode representation of the name. 
+               """
+               b=Membio()
+               libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG)
+               return unicode(b)
        def __len__(self):
+               """
+               return number of components in the name
+               """
                return libcrypto.X509_NAME_entry_count(self.ptr)
+       def __cmp__(self,other):
+               """
+               Compares X509 names
+               """
+               return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)
+       def __eq__(self,other):
+               return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)==0
 
-       def __getattr__(self,key):
+       def __getitem__(self,key):
                if isinstance(key,Oid):
-               # Return list of strings
-                       raise NotImpemented     
+                       # Return first matching field
+                       idx=libcrypto.X509_NAME_get_index_by_NID(self.ptr,key.nid,-1)
+                       if idx<0:
+                               raise KeyError("Key not found "+repr(Oid))
+                       entry=libcrypto.X509_NAME_get_entry(self.ptr,idx)
+                       s=libcrypto.X509_NAME_ENTRY_get_data(entry)
+                       b=Membio()
+                       libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
+                       return unicode(b)
                elif isinstance(key,int):
-                       # Return OID, sting tuple
-                       raise NotImplemented
-               else:
-                       raise TypeError("X509 name can be indexed with oids and numbers only")
+                       # Return OID, string tuple
+                       entry=libcrypto.X509_NAME_get_entry(self.ptr,key)
+                       if entry is None:
+                               raise IndexError("name entry index out of range")
+                       obj=libcrypto.X509_NAME_ENTRY_get_object(entry)
+                       nid=libcrypto.OBJ_obj2nid(obj)
+                       if nid==0:
+                               buf=create_string_buffer(80)
+                               len=libcrypto.OBJ_obj2txt(buf,80,obj,1)
+                               oid=Oid(buf[0:len])
+                       else:
+                               oid=Oid(nid)
+                       s=libcrypto.X509_NAME_ENTRY_get_data(entry)
+                       b=Membio()
+                       libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
+                       return (oid,unicode(b))
 
-       def __setattr__(self,key,val):
-               pass
+       def __setitem__(self,key,val):
+               if not self.writable:
+                       raise ValueError("Attempt to modify constant X509 object")
 class X509_extlist:
        def __init__(self,ptr):
                self.ptr=ptr
        def __del__(self):
                libcrypto.X509_NAME_free(self.ptr)
        def __str__(self):
-               raise NotImplemented
+               raise NotImplementedError
        def __len__(self):
                return libcrypto.X509_NAME_entry_count(self.ptr)
 
        def __getattr__(self,key):
-               raise NotImplemented
+               raise NotImplementedError
        def __setattr__(self,key,val):
-               raise NotImplemented
+               raise NotImplementedError
 
        
 
 
 class X509:
+       """
+       Represents X.509 certificate. 
+       """
        def __init__(self,data=None,ptr=None,format="PEM"):
+               """
+               Initializes certificate
+               @param data - serialized certificate in PEM or DER format.
+               @param ptr - pointer to X509, returned by some openssl function. 
+                       mutually exclusive with data
+               @param format - specifies data format. "PEM" or "DER", default PEM
+               """
                if ptr is not None:
                        if data is not None: 
                                raise TypeError("Cannot use data and ptr simultaneously")
@@ -69,18 +146,54 @@ class X509:
                        if self.cert is None:
                                raise X509Error("error reading certificate")
        def __del__(self):
+               """
+               Frees certificate object
+               """
                libcrypto.X509_free(self.cert)
        def __str__(self):
                """ Returns der string of the certificate """
                b=Membio()
                if libcrypto.i2d_X509_bio(b.bio,self.cert)==0:
                        raise X509Error("error serializing certificate")
+               return str(b)
+       def __repr__(self):
+               """ Returns valid call to the constructor """
+               return "X509(data="+repr(str(self))+",format='DER')"
        @property
        def pubkey(self):
                """EVP PKEy object of certificate public key"""
                return PKey(ptr=libcrypto.X509_get_pubkey(self.cert,False))
-       def verify(self,key):   
-               """ Verify self on given issuer key """
+       def verify(self,store=None,key=None):   
+               """ 
+               Verify self. Supports verification on both X509 store object 
+               or just public issuer key
+               @param store X509Store object.
+               @param key - PKey object
+               parameters are mutually exclusive. If neither is specified, attempts to verify
+               itself as self-signed certificate
+               """
+               if store is not None and key is not None:
+                       raise X509Error("key and store cannot be specified simultaneously")
+               if store is not None:
+                       ctx=libcrypto.X509_STORE_CTX_new()
+                       if ctx is None:
+                               raise X509Error("Error allocating X509_STORE_CTX")
+                       if libcrypt.X509_STORE_CTX_init(ctx,store.ptr,self.cert,None) < 0:
+                               raise X509Error("Error allocating X509_STORE_CTX")
+                       res= libcrypto.X509_verify_cert(ctx)>0
+                       libcrypto.X509_STORE_CTX_free(ctx)
+                       return res
+               else:
+                       if key is None:
+                               if self.issuer != self.subject:
+                                       # Not a self-signed certificate
+                                       return False
+                               key = self.pubkey
+                               res = libcrypto.X509_verify(self.cert,key.ptr)
+                               if res < 0:
+                                       raise X509Error("X509_verify failed")
+                               return res>0
+
        @property
        def subject(self):
                """ X509Name for certificate subject name """
@@ -92,14 +205,96 @@ class X509:
        @property
        def serial(self):
                """ Serial number of certificate as integer """
-               return
+               asnint=libcrypto.X509_get_serialNumber(self.cert)
+               b=Membio()
+               libcrypto.i2a_ASN1_INTEGER(b.bio,asnint)
+               return int(str(b),16)
        @property
        def startDate(self):
                """ Certificate validity period start date """
-               raise NotImplemented
+               # Need deep poke into certificate structure (x)->cert_info->validity->notBefore 
+               raise NotImplementedError
        @property
        def endDate(self):
                """ Certificate validity period end date """
-               raise NotImplemented
+               # Need deep poke into certificate structure (x)->cert_info->validity->notAfter
+               raise NotImplementedError
        def extensions(self):
-               raise NotImplemented
+               raise NotImplementedError
+class X509Store:
+       """
+               Represents trusted certificate store. Can be used to lookup CA certificates to verify
+
+               @param file - file with several certificates and crls to load into store
+               @param dir - hashed directory with certificates and crls
+               @param default - if true, default verify location (directory) is installed
+
+       """
+       def __init__(self,file=None,dir=None,default=False):
+               """
+               Creates X509 store and installs lookup method. Optionally initializes 
+               by certificates from given file or directory.
+               """
+               #
+               # Todo - set verification flags
+               # 
+               self.store=libcrypto.X509_STORE_new()
+               lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_file())
+               if lookup is None:
+                       raise X509Error("error installing file lookup method")
+               if (file is not None):
+                       if not libcrypto.X509_LOOKUP_loadfile(lookup,file,1):
+                               raise X509Error("error loading trusted certs from file "+file)
+               
+               lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_hash_dir())
+               if lookup is None:
+                       raise X509Error("error installing hashed lookup method")
+               if dir is not None:
+                       if not libcrypto.X509_LOOKUP_add_dir(lookup,dir,1):
+                               raise X509Error("error adding hashed  trusted certs dir "+dir)
+               if default:
+                       if not libcrypto.X509_LOOKUP.add_dir(lookup,None,3):
+                               raise X509Error("error adding default trusted certs dir ")
+       def add_cert(self,cert):
+               """
+               Explicitely adds certificate to set of trusted in the store
+               @param cert - X509 object to add
+               """
+               if not isinstance(cert,X509):
+                       raise TypeError("cert should be X509")
+               libcrypto.X509_STORE_add_cert(self.store,cert.cert)
+       def add_callback(self,callback):
+               """
+               Installs callbac function, which would receive detailed information
+               about verified ceritificates
+               """
+               raise NotImplementedError
+       def setflags(self,flags):
+               """
+               Set certificate verification flags.
+               @param flags - integer bit mask. See OpenSSL X509_V_FLAG_* constants
+               """
+               libcrypto.X509_STORE_set_flags(self.store,flags)        
+       def setpurpose(self,purpose):
+               """
+               Sets certificate purpose which verified certificate should match
+               @param purpose - number from 1 to 9 or standard strind defined in Openssl
+               possible strings - sslcient,sslserver, nssslserver, smimesign,smimeencrypt, crlsign, any,ocsphelper
+               """
+               if isinstance(purpose,str):
+                       purp_no=X509_PURPOSE_get_by_sname(purpose)
+                       if purp_no <=0:
+                               raise X509Error("Invalid certificate purpose '"+purpose+"'")
+               elif isinstance(purpose,int):
+                       purp_no = purpose
+               if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0:
+                       raise X509Error("cannot set purpose")
+libcrypto.i2a_ASN1_INTEGER.argtypes=(c_void_p,c_void_p)
+libcrypto.ASN1_STRING_print_ex.argtypes=(c_void_p,c_void_p,c_long)
+libcrypto.X509_get_serialNumber.argtypes=(c_void_p,)
+libcrypto.X509_get_serialNumber.restype=c_void_p
+libcrypto.X509_NAME_ENTRY_get_object.restype=c_void_p
+libcrypto.X509_NAME_ENTRY_get_object.argtypes=(c_void_p,)
+libcrypto.OBJ_obj2nid.argtypes=(c_void_p,)
+libcrypto.X509_NAME_get_entry.restype=c_void_p
+libcrypto.X509_NAME_get_entry.argtypes=(c_void_p,c_int)
index d174a83bff31485d292ba631492561e3fec25f26..1bd2e1f1bf68b068e04b3f4fd8450c12e70741e7 100644 (file)
@@ -8,6 +8,15 @@ class TestRead(unittest.TestCase):
                data=bio.read()
                del bio
                self.assertEqual(data,s)
+       def test_reset(self):
+               s="A quick brown fox jumps over a lazy dog"
+               bio=Membio(s)
+               data=bio.read()
+               bio.reset()
+               data2=bio.read()
+               del bio
+               self.assertEqual(data,data2)
+               self.assertEqual(data,s)
        def test_readlongstr(self):
                poem='''Eyes of grey--a sodden quay,
 Driving rain and falling tears,
@@ -83,6 +92,15 @@ class TestWrite(unittest.TestCase):
                b.write("the lazy dog.")
                self.assertEqual(str(b),"A quick brown fox jumps over the lazy dog.")
 
-
+       def test_unicode(self):
+               b=Membio()
+               s='\xd0\xba\xd0\xb0\xd0\xba \xd1\x8d\xd1\x82\xd0\xbe \xd0\xbf\xd0\xbe-\xd1\x80\xd1\x83\xd1\x81\xd1\x81\xd0\xba\xd0\xb8'
+               b.write(s)
+               self.assertEqual(unicode(b),u'\u043a\u0430\u043a \u044d\u0442\u043e \u043f\u043e-\u0440\u0443\u0441\u0441\u043a\u0438')
+       def test_unicode2(self):
+               b=Membio()
+               u=u'\u043a\u0430\u043a \u044d\u0442\u043e \u043f\u043e-\u0440\u0443\u0441\u0441\u043a\u0438'
+               b.write(u)
+               self.assertEqual(unicode(b),u)
 if __name__ == '__main__':
        unittest.main()