]> www.wagner.pp.ru Git - oss/ctypescrypto.git/blobdiff - ctypescrypto/x509.py
Add X509 to __all__. Add pem() method to X509
[oss/ctypescrypto.git] / ctypescrypto / x509.py
index 5c2a50d99303392a483243dc3e7db7647b5ab6b3..bd81fdd01c207f04b024f5d75d3b673d97a9b628 100644 (file)
@@ -1,9 +1,80 @@
-from ctypes import c_void_p,create_string_buffer,c_long,c_int,POINTER,c_char_p
+"""
+Implements interface to openssl X509 and X509Store structures, 
+I.e allows to load, analyze and verify certificates.
+
+X509Store objects are also used to verify other signed documets,
+such as CMS, OCSP and timestamps.
+"""
+
+
+
+from ctypes import c_void_p,create_string_buffer,c_long,c_int,POINTER,c_char_p,Structure,cast
 from ctypescrypto.bio import Membio
 from ctypescrypto.pkey import PKey
 from ctypescrypto.oid import Oid
 from ctypescrypto.exception import LibCryptoError
 from ctypescrypto import libcrypto
+from datetime import datetime
+try:
+       from pytz import utc
+except ImportError:
+       from datetime import timedelta,tzinfo
+       ZERO=timedelta(0)
+       class UTC(tzinfo):
+               """tzinfo object for UTC. 
+                       If no pytz is available, we would use it.
+               """
+
+               def utcoffset(self, dt):
+                       return ZERO
+
+               def tzname(self, dt):
+                       return "UTC"
+
+               def dst(self, dt):
+                       return ZERO
+
+       utc=UTC()
+
+__all__ = ['X509','X509Error','X509Name','X509Store','StackOfX509']
+
+class _validity(Structure):
+       """ ctypes representation of X509_VAL structure 
+               needed to access certificate validity period, because openssl
+               doesn't provide fuctions for it - only macros
+       """
+       _fields_ =      [('notBefore',c_void_p),('notAfter',c_void_p)]
+
+class _cinf(Structure):
+       """ ctypes representtion of X509_CINF structure 
+           neede to access certificate data, which are accessable only
+               via macros
+       """
+       _fields_ = [('version',c_void_p),
+               ('serialNumber',c_void_p),
+               ('sign_alg',c_void_p),
+               ('issuer',c_void_p),
+               ('validity',POINTER(_validity)),
+               ('subject',c_void_p),
+               ('pubkey',c_void_p),
+               ('issuerUID',c_void_p),
+               ('subjectUID',c_void_p),
+               ('extensions',c_void_p),
+               ]
+
+class _x509(Structure):
+       """
+       ctypes represntation of X509 structure needed
+       to access certificate data which are accesable only via
+       macros, not functions
+       """
+       _fields_ = [('cert_info',POINTER(_cinf)),
+                               ('sig_alg',c_void_p),
+                               ('signature',c_void_p),
+                               # There are a lot of parsed extension fields there
+                               ]
+_px509 = POINTER(_x509)
+
 class X509Error(LibCryptoError):
        """
        Exception, generated when some openssl function fail
@@ -12,10 +83,14 @@ class X509Error(LibCryptoError):
        pass
 
 
-class X509Name:
+class X509Name(object):
        """
        Class which represents X.509 distinguished name - typically 
        a certificate subject name or an issuer name.
+
+       Now used only to represent information, extracted from the
+       certificate. Potentially can be also used to build DN when creating
+       certificate signing request
        """
        # XN_FLAG_SEP_COMMA_PLUS & ASN1_STRFLG_UTF8_CONVERT
        PRINT_FLAG=0x10010
@@ -73,52 +148,119 @@ class X509Name:
                        # Return first matching field
                        idx=libcrypto.X509_NAME_get_index_by_NID(self.ptr,key.nid,-1)
                        if idx<0:
-                               raise KeyError("Key not found "+repr(Oid))
+                               raise KeyError("Key not found "+str(Oid))
                        entry=libcrypto.X509_NAME_get_entry(self.ptr,idx)
                        s=libcrypto.X509_NAME_ENTRY_get_data(entry)
                        b=Membio()
                        libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
                        return unicode(b)
-               elif isinstance(key,int):
+               elif isinstance(key,(int,long)):
                        # Return OID, string tuple
                        entry=libcrypto.X509_NAME_get_entry(self.ptr,key)
                        if entry is None:
                                raise IndexError("name entry index out of range")
-                       obj=libcrypto.X509_NAME_ENTRY_get_object(entry)
-                       nid=libcrypto.OBJ_obj2nid(obj)
-                       if nid==0:
-                               buf=create_string_buffer(80)
-                               len=libcrypto.OBJ_obj2txt(buf,80,obj,1)
-                               oid=Oid(buf[0:len])
-                       else:
-                               oid=Oid(nid)
+                       oid=Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry))
                        s=libcrypto.X509_NAME_ENTRY_get_data(entry)
                        b=Membio()
                        libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
                        return (oid,unicode(b))
+               else:
+                       raise TypeError("X509 NAME can be indexed by Oids or integers only")
 
        def __setitem__(self,key,val):
                if not self.writable:
                        raise ValueError("Attempt to modify constant X509 object")
-class X509_extlist:
-       def __init__(self,ptr):
-               self.ptr=ptr
+               else:
+                       raise NotImplementedError
+       def __delitem__(self,key):
+               if not self.writable:
+                       raise ValueError("Attempt to modify constant X509 object")
+               else:
+                       raise NotImplementedError
+       def __hash__(self):
+               return libcrypto.X509_NAME_hash(self.ptr)
+
+class _x509_ext(Structure):
+       """ Represens C structure X509_EXTENSION """
+       _fields_=[("object",c_void_p),
+                       ("critical",c_int),
+                       ("value",c_void_p)]
+
+class X509_EXT(object):
+       """ Python object which represents a certificate extension """
+       def __init__(self,ptr,copy=False):
+               """ Initializes from the pointer to X509_EXTENSION.
+                       If copy is True, creates a copy, otherwise just
+                       stores pointer.
+               """
+               if copy:
+                       self.ptr=libcrypto.X509_EXTENSION_dup(ptr)
+               else:
+                       self.ptr=cast(ptr,POINTER(_x509_ext))
        def __del__(self):
-               libcrypto.X509_NAME_free(self.ptr)
+               libcrypto.X509_EXTENSION_free(self.ptr)
        def __str__(self):
-               raise NotImplementedError
+               b=Membio()
+               libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0)
+               return str(b)
+       def __unicode__(self):
+               b=Membio()
+               libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0)
+               return unicode(b)
+       @property
+       def oid(self):
+               return Oid.fromobj(self.ptr[0].object)
+       @property
+       def critical(self):     
+               return self.ptr[0].critical >0
+class _X509extlist(object):    
+       """
+       Represents list of certificate extensions
+       """
+       def __init__(self,cert):
+               self.cert=cert
        def __len__(self):
-               return libcrypto.X509_NAME_entry_count(self.ptr)
-
-       def __getattr__(self,key):
-               raise NotImplementedError
-       def __setattr__(self,key,val):
-               raise NotImplementedError
-
-       
-
+               return libcrypto.X509_get_ext_count(self.cert.cert)
+       def __getitem__(self,item):
+               p=libcrypto.X509_get_ext(self.cert.cert,item)
+               if p is None:
+                       raise IndexError
+               return X509_EXT(p,True)
+       def find(self,oid):
+               """
+               Return list of extensions with given Oid
+               """
+               if not isinstance(oid,Oid):
+                       raise TypeError("Need crytypescrypto.oid.Oid as argument")
+               found=[]
+               l=-1
+               end=len(self)
+               while True:
+                       l=libcrypto.X509_get_ext_by_NID(self.cert.cert,oid.nid,l)
+                       if l>=end or l<0:
+                                break
+                       found.append(self[l])
+               return found
+       def find_critical(self,crit=True):
+               """
+               Return list of critical extensions (or list of non-cricital, if
+               optional second argument is False
+               """
+               if crit:
+                       flag=1
+               else:
+                       flag=0
+               found=[]
+               end=len(self)
+               l=-1
+               while True:
+                       l=libcrypto.X509_get_ext_by_critical(self.cert.cert,flag,l)
+                       if l>=end or l<0:
+                                break
+                       found.append(self[l])
+               return found                    
 
-class X509:
+class X509(object):
        """
        Represents X.509 certificate. 
        """
@@ -144,6 +286,7 @@ class X509:
                                self.cert=libcrypto.d2i_X509_bio(b.bio,None)
                        if self.cert is None:
                                raise X509Error("error reading certificate")
+               self.extensions=_X509extlist(self)              
        def __del__(self):
                """
                Frees certificate object
@@ -162,6 +305,12 @@ class X509:
        def pubkey(self):
                """EVP PKEy object of certificate public key"""
                return PKey(ptr=libcrypto.X509_get_pubkey(self.cert,False))
+       def pem(self):
+               """ Returns PEM represntation of the certificate """
+               b=Membio()
+               if libcrypto.PEM_write_bio_X509(b.bio,self.cert)==0:
+                       raise X509Error("error serializing certificate")
+               return str(b)
        def verify(self,store=None,chain=[],key=None):  
                """ 
                Verify self. Supports verification on both X509 store object 
@@ -170,10 +319,10 @@ class X509:
                @param chain - list of X509 objects to add into verification
                        context.These objects are untrusted, but can be used to
                        build certificate chain up to trusted object in the store
-               @param key - PKey object
-               parameters stora and key are mutually exclusive. If neither is specified, attempts to verify
+               @param key - PKey object with open key to validate signature
                
-               itself as self-signed certificate
+               parameters store and key are mutually exclusive. If neither 
+               is specified, attempts to verify self as self-signed certificate
                """
                if store is not None and key is not None:
                        raise X509Error("key and store cannot be specified simultaneously")
@@ -216,29 +365,43 @@ class X509:
                b=Membio()
                libcrypto.i2a_ASN1_INTEGER(b.bio,asnint)
                return int(str(b),16)
+       @property 
+       def version(self):
+               """ certificate version as integer. Really certificate stores 0 for
+               version 1 and 2 for version 3, but we return 1 and 3 """
+               asn1int=cast(self.cert,_px509)[0].cert_info[0].version
+               return libcrypto.ASN1_INTEGER_get(asn1int)+1
        @property
        def startDate(self):
                """ Certificate validity period start date """
                # Need deep poke into certificate structure (x)->cert_info->validity->notBefore 
-               raise NotImplementedError
+               global utc
+               asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notBefore
+               b=Membio()
+               libcrypto.ASN1_TIME_print(b.bio,asn1date)
+               return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
        @property
        def endDate(self):
                """ Certificate validity period end date """
                # Need deep poke into certificate structure (x)->cert_info->validity->notAfter
-               raise NotImplementedError
-       def extensions(self):
-               """ Returns list of extensions """
-               raise NotImplementedError
+               global utc
+               asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notAfter
+               b=Membio()
+               libcrypto.ASN1_TIME_print(b.bio,asn1date)
+               return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
        def check_ca(self):
                """ Returns True if certificate is CA certificate """
                return libcrypto.X509_check_ca(self.cert)>0
-class X509Store:
+class X509Store(object):
        """
-               Represents trusted certificate store. Can be used to lookup CA certificates to verify
+               Represents trusted certificate store. Can be used to lookup CA 
+               certificates to verify
 
-               @param file - file with several certificates and crls to load into store
+               @param file - file with several certificates and crls 
+                               to load into store
                @param dir - hashed directory with certificates and crls
-               @param default - if true, default verify location (directory) is installed
+               @param default - if true, default verify location (directory) 
+                       is installed
 
        """
        def __init__(self,file=None,dir=None,default=False):
@@ -277,7 +440,7 @@ class X509Store:
                libcrypto.X509_STORE_add_cert(self.store,cert.cert)
        def add_callback(self,callback):
                """
-               Installs callbac function, which would receive detailed information
+               Installs callback function, which would receive detailed information
                about verified ceritificates
                """
                raise NotImplementedError
@@ -302,10 +465,16 @@ class X509Store:
                if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0:
                        raise X509Error("cannot set purpose")
        def setdepth(self,depth):
+               """
+               Sets the verification depth i.e. max length of certificate chain
+               which is acceptable
+               """
                libcrypto.X509_STORE_set_depth(self.store,depth)
        def settime(self, time):
                """
                Set point in time used to check validity of certificates for
+               Time can be either python datetime object or number of seconds
+               sinse epoch
                """
                if isinstance(time,datetime.datetime) or isinstance(time,datetime.date):
                        d=int(time.strftime("%s"))
@@ -314,7 +483,7 @@ class X509Store:
                else:
                        raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
                raise NotImplementedError
-class StackOfX509:
+class StackOfX509(object):
        """
        Implements OpenSSL STACK_OF(X509) object.
        It looks much like python container types
@@ -333,11 +502,11 @@ class StackOfX509:
                """
                if  ptr is None:
                        self.need_free = True
-                       self.ptr=libcrypt.sk_new_null()
+                       self.ptr=libcrypto.sk_new_null()
                        if certs is not None:
                                for crt in certs:
                                        self.append(crt)
-               elif not certs is None:
+               elif certs is not None:
                                raise ValueError("cannot handle certs an ptr simultaneously")
                else:
                        self.need_free = disposable
@@ -349,12 +518,15 @@ class StackOfX509:
                        raise IndexError
                p=libcrypto.sk_value(self.ptr,index)
                return X509(ptr=libcrypto.X509_dup(p))
-       def __putitem__(self,index,value):
+       def __setitem__(self,index,value):
                if not self.need_free:
                        raise ValueError("Stack is read-only")
                if index <0 or index>=len(self):
                        raise IndexError
-               p=libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert))
+               if not isinstance(value,X509):
+                       raise TypeError('StackOfX508 can contain only X509 objects')
+               p=libcrypto.sk_value(self.ptr,index)
+               libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert))
                libcrypto.X509_free(p)
        def __delitem__(self,index):    
                if not self.need_free:
@@ -369,9 +541,18 @@ class StackOfX509:
        def append(self,value):
                if not self.need_free:
                        raise ValueError("Stack is read-only")
+               if not isinstance(value,X509):
+                       raise TypeError('StackOfX508 can contain only X509 objects')
                libcrypto.sk_push(self.ptr,libcrypto.X509_dup(value.cert))
 libcrypto.i2a_ASN1_INTEGER.argtypes=(c_void_p,c_void_p)
 libcrypto.ASN1_STRING_print_ex.argtypes=(c_void_p,c_void_p,c_long)
+libcrypto.PEM_read_bio_X509.restype=c_void_p
+libcrypto.PEM_read_bio_X509.argtypes=(c_void_p,POINTER(c_void_p),c_void_p,c_void_p)
+libcrypto.PEM_write_bio_X509.restype=c_int
+libcrypto.PEM_write_bio_X509.argtypes=(c_void_p,c_void_p)
+libcrypto.ASN1_TIME_print.argtypes=(c_void_p,c_void_p)
+libcrypto.ASN1_INTEGER_get.argtypes=(c_void_p,)
+libcrypto.ASN1_INTEGER_get.restype=c_long
 libcrypto.X509_get_serialNumber.argtypes=(c_void_p,)
 libcrypto.X509_get_serialNumber.restype=c_void_p
 libcrypto.X509_NAME_ENTRY_get_object.restype=c_void_p
@@ -386,3 +567,18 @@ libcrypto.X509_LOOKUP_file.restype=c_void_p
 libcrypto.X509_LOOKUP_hash_dir.restype=c_void_p
 libcrypto.X509_LOOKUP_ctrl.restype=c_int
 libcrypto.X509_LOOKUP_ctrl.argtypes=(c_void_p,c_int,c_char_p,c_long,POINTER(c_char_p))
+libcrypto.X509_EXTENSION_dup.argtypes=(c_void_p,)
+libcrypto.X509_EXTENSION_dup.restype=POINTER(_x509_ext)
+libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int)
+libcrypto.X509_get_ext.restype=c_void_p
+libcrypto.X509_get_ext.argtypes=(c_void_p,c_int)
+libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int)
+libcrypto.sk_set.argtypes=(c_void_p,c_int,c_void_p)
+libcrypto.sk_set.restype=c_void_p
+libcrypto.sk_value.argtypes=(c_void_p,c_int)
+libcrypto.sk_value.restype=c_void_p
+libcrypto.X509_dup.restype=c_void_p
+libcrypto.sk_new_null.restype=c_void_p
+libcrypto.X509_dup.argtypes=(c_void_p,)
+libcrypto.X509_NAME_hash.restype=c_long
+libcrypto.X509_NAME_hash.argtypes=(c_void_p,)