]> www.wagner.pp.ru Git - oss/ctypescrypto.git/blobdiff - ctypescrypto/x509.py
I've discovered Python's __all__ variable and make use of it in all modles
[oss/ctypescrypto.git] / ctypescrypto / x509.py
index b933a134678e4823ed4b917d5584460402ca2e13..e2c97c669d114ddcbcb1e2f2707ecedc7e7c3751 100644 (file)
@@ -1,10 +1,22 @@
-from ctypes import c_void_p,create_string_buffer,c_long,c_int
+"""
+Implements interface to openssl X509 and X509Store structures, 
+I.e allows to load, analyze and verify certificates.
+
+X509Store objects are also used to verify other signed documets,
+such as CMS, OCSP and timestamps.
+"""
+
+
+
+from ctypes import c_void_p,create_string_buffer,c_long,c_int,POINTER,c_char_p
 from ctypescrypto.bio import Membio
 from ctypescrypto.pkey import PKey
 from ctypescrypto.oid import Oid
 from ctypescrypto.exception import LibCryptoError
 from ctypescrypto import libcrypto
 
+__all__ = ['X509Error','X509Name','X509Store','StackOfX509']
+# X509_extlist is not exported yet, because is not implemented 
 class X509Error(LibCryptoError):
        """
        Exception, generated when some openssl function fail
@@ -17,6 +29,10 @@ class X509Name:
        """
        Class which represents X.509 distinguished name - typically 
        a certificate subject name or an issuer name.
+
+       Now used only to represent information, extracted from the
+       certificate. Potentially can be also used to build DN when creating
+       certificate signing request
        """
        # XN_FLAG_SEP_COMMA_PLUS & ASN1_STRFLG_UTF8_CONVERT
        PRINT_FLAG=0x10010
@@ -163,14 +179,18 @@ class X509:
        def pubkey(self):
                """EVP PKEy object of certificate public key"""
                return PKey(ptr=libcrypto.X509_get_pubkey(self.cert,False))
-       def verify(self,store=None,key=None):   
+       def verify(self,store=None,chain=[],key=None):  
                """ 
                Verify self. Supports verification on both X509 store object 
                or just public issuer key
                @param store X509Store object.
-               @param key - PKey object
-               parameters are mutually exclusive. If neither is specified, attempts to verify
-               itself as self-signed certificate
+               @param chain - list of X509 objects to add into verification
+                       context.These objects are untrusted, but can be used to
+                       build certificate chain up to trusted object in the store
+               @param key - PKey object with open key to validate signature
+               
+               parameters store and key are mutually exclusive. If neither 
+               is specified, attempts to verify self as self-signed certificate
                """
                if store is not None and key is not None:
                        raise X509Error("key and store cannot be specified simultaneously")
@@ -178,22 +198,26 @@ class X509:
                        ctx=libcrypto.X509_STORE_CTX_new()
                        if ctx is None:
                                raise X509Error("Error allocating X509_STORE_CTX")
-                       if libcrypt.X509_STORE_CTX_init(ctx,store.ptr,self.cert,None) < 0:
+                       if chain is not None and len(chain)>0:
+                               ch=StackOfX509(chain)
+                       else:
+                               ch=None
+                       if libcrypto.X509_STORE_CTX_init(ctx,store.store,self.cert,ch) < 0:
                                raise X509Error("Error allocating X509_STORE_CTX")
-                       res= libcrypto.X509_verify_cert(ctx)>0
+                       res= libcrypto.X509_verify_cert(ctx)
                        libcrypto.X509_STORE_CTX_free(ctx)
-                       return res
+                       return res>0
                else:
                        if key is None:
                                if self.issuer != self.subject:
                                        # Not a self-signed certificate
                                        return False
                                key = self.pubkey
-                               res = libcrypto.X509_verify(self.cert,key.ptr)
-                               if res < 0:
-                                       raise X509Error("X509_verify failed")
-                               return res>0
-
+                       res = libcrypto.X509_verify(self.cert,key.key)
+                       if res < 0:
+                               raise X509Error("X509_verify failed")
+                       return res>0
+                       
        @property
        def subject(self):
                """ X509Name for certificate subject name """
@@ -220,14 +244,21 @@ class X509:
                # Need deep poke into certificate structure (x)->cert_info->validity->notAfter
                raise NotImplementedError
        def extensions(self):
+               """ Returns list of extensions """
                raise NotImplementedError
+       def check_ca(self):
+               """ Returns True if certificate is CA certificate """
+               return libcrypto.X509_check_ca(self.cert)>0
 class X509Store:
        """
-               Represents trusted certificate store. Can be used to lookup CA certificates to verify
+               Represents trusted certificate store. Can be used to lookup CA 
+               certificates to verify
 
-               @param file - file with several certificates and crls to load into store
+               @param file - file with several certificates and crls 
+                               to load into store
                @param dir - hashed directory with certificates and crls
-               @param default - if true, default verify location (directory) is installed
+               @param default - if true, default verify location (directory) 
+                       is installed
 
        """
        def __init__(self,file=None,dir=None,default=False):
@@ -239,21 +270,22 @@ class X509Store:
                # Todo - set verification flags
                # 
                self.store=libcrypto.X509_STORE_new()
+               if self.store is None:
+                       raise X509Error("allocating store")
                lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_file())
                if lookup is None:
                        raise X509Error("error installing file lookup method")
                if (file is not None):
-                       if not libcrypto.X509_LOOKUP_loadfile(lookup,file,1):
+                       if not libcrypto.X509_LOOKUP_ctrl(lookup,1,file,1,None)>0:
                                raise X509Error("error loading trusted certs from file "+file)
-               
                lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_hash_dir())
                if lookup is None:
                        raise X509Error("error installing hashed lookup method")
                if dir is not None:
-                       if not libcrypto.X509_LOOKUP_add_dir(lookup,dir,1):
+                       if not libcrypto.X509_LOOKUP_ctrl(lookup,2,dir,1,None)>0:
                                raise X509Error("error adding hashed  trusted certs dir "+dir)
                if default:
-                       if not libcrypto.X509_LOOKUP.add_dir(lookup,None,3):
+                       if not libcrypto.X509_LOOKUP_ctrl(lookup,2,None,3,None)>0:
                                raise X509Error("error adding default trusted certs dir ")
        def add_cert(self,cert):
                """
@@ -289,6 +321,75 @@ class X509Store:
                        purp_no = purpose
                if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0:
                        raise X509Error("cannot set purpose")
+       def setdepth(self,depth):
+               libcrypto.X509_STORE_set_depth(self.store,depth)
+       def settime(self, time):
+               """
+               Set point in time used to check validity of certificates for
+               """
+               if isinstance(time,datetime.datetime) or isinstance(time,datetime.date):
+                       d=int(time.strftime("%s"))
+               elif isinstance(time,int):
+                       pass
+               else:
+                       raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
+               raise NotImplementedError
+class StackOfX509:
+       """
+       Implements OpenSSL STACK_OF(X509) object.
+       It looks much like python container types
+       """
+       def __init__(self,certs=None,ptr=None,disposable=True):
+               """
+               Create stack
+               @param certs - list of X509 objects. If specified, read-write
+                       stack is created and populated by these certificates
+               @param ptr - pointer to OpenSSL STACK_OF(X509) as returned by
+                       some functions
+               @param disposable - if True, stack created from object, returned
+                               by function is copy, and can be modified and need to be
+                               freeid. If false, it is just pointer into another
+                               structure i.e. CMS_ContentInfo
+               """
+               if  ptr is None:
+                       self.need_free = True
+                       self.ptr=libcrypt.sk_new_null()
+                       if certs is not None:
+                               for crt in certs:
+                                       self.append(crt)
+               elif not certs is None:
+                               raise ValueError("cannot handle certs an ptr simultaneously")
+               else:
+                       self.need_free = disposable
+                       self.ptr=ptr
+       def __len__(self):
+               return libcrypto.sk_num(self.ptr)
+       def __getitem__(self,index):
+               if index <0 or index>=len(self):
+                       raise IndexError
+               p=libcrypto.sk_value(self.ptr,index)
+               return X509(ptr=libcrypto.X509_dup(p))
+       def __putitem__(self,index,value):
+               if not self.need_free:
+                       raise ValueError("Stack is read-only")
+               if index <0 or index>=len(self):
+                       raise IndexError
+               p=libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert))
+               libcrypto.X509_free(p)
+       def __delitem__(self,index):    
+               if not self.need_free:
+                       raise ValueError("Stack is read-only")
+               if index <0 or index>=len(self):
+                       raise IndexError
+               p=libcrypto.sk_delete(self.ptr,index)
+               libcrypto.X509_free(p)
+       def __del__(self):
+               if self.need_free:
+                       libcrypto.sk_pop_free(self.ptr,libcrypto.X509_free)
+       def append(self,value):
+               if not self.need_free:
+                       raise ValueError("Stack is read-only")
+               libcrypto.sk_push(self.ptr,libcrypto.X509_dup(value.cert))
 libcrypto.i2a_ASN1_INTEGER.argtypes=(c_void_p,c_void_p)
 libcrypto.ASN1_STRING_print_ex.argtypes=(c_void_p,c_void_p,c_long)
 libcrypto.X509_get_serialNumber.argtypes=(c_void_p,)
@@ -298,3 +399,10 @@ libcrypto.X509_NAME_ENTRY_get_object.argtypes=(c_void_p,)
 libcrypto.OBJ_obj2nid.argtypes=(c_void_p,)
 libcrypto.X509_NAME_get_entry.restype=c_void_p
 libcrypto.X509_NAME_get_entry.argtypes=(c_void_p,c_int)
+libcrypto.X509_STORE_new.restype=c_void_p
+libcrypto.X509_STORE_add_lookup.restype=c_void_p
+libcrypto.X509_STORE_add_lookup.argtypes=(c_void_p,c_void_p)
+libcrypto.X509_LOOKUP_file.restype=c_void_p
+libcrypto.X509_LOOKUP_hash_dir.restype=c_void_p
+libcrypto.X509_LOOKUP_ctrl.restype=c_int
+libcrypto.X509_LOOKUP_ctrl.argtypes=(c_void_p,c_int,c_char_p,c_long,POINTER(c_char_p))