-from ctypes import c_void_p,create_string_buffer,c_long,c_int
+"""
+Implements interface to openssl X509 and X509Store structures,
+I.e allows to load, analyze and verify certificates.
+
+X509Store objects are also used to verify other signed documets,
+such as CMS, OCSP and timestamps.
+"""
+
+
+
+from ctypes import c_void_p,create_string_buffer,c_long,c_int,POINTER,c_char_p,Structure,cast
from ctypescrypto.bio import Membio
from ctypescrypto.pkey import PKey
from ctypescrypto.oid import Oid
from ctypescrypto.exception import LibCryptoError
from ctypescrypto import libcrypto
+from datetime import datetime
+try:
+ from pytz import utc
+except ImportError:
+ from datetime import timedelta
+ ZERO=timedelta(0)
+ class UTC(datetime.tzinfo):
+ """tzinfo object for UTC.
+ If no pytz is available, we would use it.
+ """
+
+ def utcoffset(self, dt):
+ return ZERO
+
+ def tzname(self, dt):
+ return "UTC"
+
+ def dst(self, dt):
+ return ZERO
+
+ utc=UTC()
+
+__all__ = ['X509Error','X509Name','X509Store','StackOfX509']
+class _validity(Structure):
+ """ ctypes representation of X509_VAL structure
+ needed to access certificate validity period, because openssl
+ doesn't provide fuctions for it - only macros
+ """
+ _fields_ = [('notBefore',c_void_p),('notAfter',c_void_p)]
+
+class _cinf(Structure):
+ """ ctypes representtion of X509_CINF structure
+ neede to access certificate data, which are accessable only
+ via macros
+ """
+ _fields_ = [('version',c_void_p),
+ ('serialNumber',c_void_p),
+ ('sign_alg',c_void_p),
+ ('issuer',c_void_p),
+ ('validity',POINTER(_validity)),
+ ('subject',c_void_p),
+ ('pubkey',c_void_p),
+ ]
+
+class _x509(Structure):
+ """
+ ctypes represntation of X509 structure needed
+ to access certificate data which are accesable only via
+ macros, not functions
+ """
+ _fields_ = [('cert_info',POINTER(_cinf)),
+ ('sig_alg',c_void_p),
+ ('signature',c_void_p),
+ # There are a lot of parsed extension fields there
+ ]
+_px509 = POINTER(_x509)
+
+# X509_extlist is not exported yet, because is not implemented
class X509Error(LibCryptoError):
"""
Exception, generated when some openssl function fail
"""
Class which represents X.509 distinguished name - typically
a certificate subject name or an issuer name.
+
+ Now used only to represent information, extracted from the
+ certificate. Potentially can be also used to build DN when creating
+ certificate signing request
"""
# XN_FLAG_SEP_COMMA_PLUS & ASN1_STRFLG_UTF8_CONVERT
PRINT_FLAG=0x10010
def pubkey(self):
"""EVP PKEy object of certificate public key"""
return PKey(ptr=libcrypto.X509_get_pubkey(self.cert,False))
- def verify(self,store=None,key=None):
+ def verify(self,store=None,chain=[],key=None):
"""
Verify self. Supports verification on both X509 store object
or just public issuer key
@param store X509Store object.
- @param key - PKey object
- parameters are mutually exclusive. If neither is specified, attempts to verify
- itself as self-signed certificate
+ @param chain - list of X509 objects to add into verification
+ context.These objects are untrusted, but can be used to
+ build certificate chain up to trusted object in the store
+ @param key - PKey object with open key to validate signature
+
+ parameters store and key are mutually exclusive. If neither
+ is specified, attempts to verify self as self-signed certificate
"""
if store is not None and key is not None:
raise X509Error("key and store cannot be specified simultaneously")
ctx=libcrypto.X509_STORE_CTX_new()
if ctx is None:
raise X509Error("Error allocating X509_STORE_CTX")
- if libcrypto.X509_STORE_CTX_init(ctx,store.ptr,self.cert,None) < 0:
+ if chain is not None and len(chain)>0:
+ ch=StackOfX509(chain)
+ else:
+ ch=None
+ if libcrypto.X509_STORE_CTX_init(ctx,store.store,self.cert,ch) < 0:
raise X509Error("Error allocating X509_STORE_CTX")
res= libcrypto.X509_verify_cert(ctx)
libcrypto.X509_STORE_CTX_free(ctx)
b=Membio()
libcrypto.i2a_ASN1_INTEGER(b.bio,asnint)
return int(str(b),16)
+ @property
+ def version(self):
+ """ certificate version as integer. Really certificate stores 0 for
+ version 1 and 2 for version 3, but we return 1 and 3 """
+ asn1int=cast(self.cert,_px509)[0].cert_info[0].version
+ return libcrypto.ASN1_INTEGER_get(asn1int)+1
@property
def startDate(self):
""" Certificate validity period start date """
# Need deep poke into certificate structure (x)->cert_info->validity->notBefore
- raise NotImplementedError
+ global utc
+ asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notBefore
+ b=Membio()
+ libcrypto.ASN1_TIME_print(b.bio,asn1date)
+ return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
@property
def endDate(self):
""" Certificate validity period end date """
# Need deep poke into certificate structure (x)->cert_info->validity->notAfter
- raise NotImplementedError
+ global utc
+ asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notAfter
+ b=Membio()
+ libcrypto.ASN1_TIME_print(b.bio,asn1date)
+ return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
def extensions(self):
""" Returns list of extensions """
raise NotImplementedError
return libcrypto.X509_check_ca(self.cert)>0
class X509Store:
"""
- Represents trusted certificate store. Can be used to lookup CA certificates to verify
+ Represents trusted certificate store. Can be used to lookup CA
+ certificates to verify
- @param file - file with several certificates and crls to load into store
+ @param file - file with several certificates and crls
+ to load into store
@param dir - hashed directory with certificates and crls
- @param default - if true, default verify location (directory) is installed
+ @param default - if true, default verify location (directory)
+ is installed
"""
def __init__(self,file=None,dir=None,default=False):
# Todo - set verification flags
#
self.store=libcrypto.X509_STORE_new()
+ if self.store is None:
+ raise X509Error("allocating store")
lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_file())
if lookup is None:
raise X509Error("error installing file lookup method")
if (file is not None):
- if not libcrypto.X509_LOOKUP_loadfile(lookup,file,1):
+ if not libcrypto.X509_LOOKUP_ctrl(lookup,1,file,1,None)>0:
raise X509Error("error loading trusted certs from file "+file)
-
lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_hash_dir())
if lookup is None:
raise X509Error("error installing hashed lookup method")
if dir is not None:
- if not libcrypto.X509_LOOKUP_add_dir(lookup,dir,1):
+ if not libcrypto.X509_LOOKUP_ctrl(lookup,2,dir,1,None)>0:
raise X509Error("error adding hashed trusted certs dir "+dir)
if default:
- if not libcrypto.X509_LOOKUP.add_dir(lookup,None,3):
+ if not libcrypto.X509_LOOKUP_ctrl(lookup,2,None,3,None)>0:
raise X509Error("error adding default trusted certs dir ")
def add_cert(self,cert):
"""
purp_no = purpose
if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0:
raise X509Error("cannot set purpose")
+ def setdepth(self,depth):
+ libcrypto.X509_STORE_set_depth(self.store,depth)
+ def settime(self, time):
+ """
+ Set point in time used to check validity of certificates for
+ """
+ if isinstance(time,datetime.datetime) or isinstance(time,datetime.date):
+ d=int(time.strftime("%s"))
+ elif isinstance(time,int):
+ pass
+ else:
+ raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
+ raise NotImplementedError
+class StackOfX509:
+ """
+ Implements OpenSSL STACK_OF(X509) object.
+ It looks much like python container types
+ """
+ def __init__(self,certs=None,ptr=None,disposable=True):
+ """
+ Create stack
+ @param certs - list of X509 objects. If specified, read-write
+ stack is created and populated by these certificates
+ @param ptr - pointer to OpenSSL STACK_OF(X509) as returned by
+ some functions
+ @param disposable - if True, stack created from object, returned
+ by function is copy, and can be modified and need to be
+ freeid. If false, it is just pointer into another
+ structure i.e. CMS_ContentInfo
+ """
+ if ptr is None:
+ self.need_free = True
+ self.ptr=libcrypt.sk_new_null()
+ if certs is not None:
+ for crt in certs:
+ self.append(crt)
+ elif not certs is None:
+ raise ValueError("cannot handle certs an ptr simultaneously")
+ else:
+ self.need_free = disposable
+ self.ptr=ptr
+ def __len__(self):
+ return libcrypto.sk_num(self.ptr)
+ def __getitem__(self,index):
+ if index <0 or index>=len(self):
+ raise IndexError
+ p=libcrypto.sk_value(self.ptr,index)
+ return X509(ptr=libcrypto.X509_dup(p))
+ def __putitem__(self,index,value):
+ if not self.need_free:
+ raise ValueError("Stack is read-only")
+ if index <0 or index>=len(self):
+ raise IndexError
+ p=libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert))
+ libcrypto.X509_free(p)
+ def __delitem__(self,index):
+ if not self.need_free:
+ raise ValueError("Stack is read-only")
+ if index <0 or index>=len(self):
+ raise IndexError
+ p=libcrypto.sk_delete(self.ptr,index)
+ libcrypto.X509_free(p)
+ def __del__(self):
+ if self.need_free:
+ libcrypto.sk_pop_free(self.ptr,libcrypto.X509_free)
+ def append(self,value):
+ if not self.need_free:
+ raise ValueError("Stack is read-only")
+ libcrypto.sk_push(self.ptr,libcrypto.X509_dup(value.cert))
libcrypto.i2a_ASN1_INTEGER.argtypes=(c_void_p,c_void_p)
libcrypto.ASN1_STRING_print_ex.argtypes=(c_void_p,c_void_p,c_long)
+libcrypto.ASN1_TIME_print.argtypes=(c_void_p,c_void_p)
+libcrypto.ASN1_INTEGER_get.argtypes=(c_void_p,)
+libcrypto.ASN1_INTEGER_get.restype=c_long
libcrypto.X509_get_serialNumber.argtypes=(c_void_p,)
libcrypto.X509_get_serialNumber.restype=c_void_p
libcrypto.X509_NAME_ENTRY_get_object.restype=c_void_p
libcrypto.OBJ_obj2nid.argtypes=(c_void_p,)
libcrypto.X509_NAME_get_entry.restype=c_void_p
libcrypto.X509_NAME_get_entry.argtypes=(c_void_p,c_int)
+libcrypto.X509_STORE_new.restype=c_void_p
+libcrypto.X509_STORE_add_lookup.restype=c_void_p
+libcrypto.X509_STORE_add_lookup.argtypes=(c_void_p,c_void_p)
+libcrypto.X509_LOOKUP_file.restype=c_void_p
+libcrypto.X509_LOOKUP_hash_dir.restype=c_void_p
+libcrypto.X509_LOOKUP_ctrl.restype=c_int
+libcrypto.X509_LOOKUP_ctrl.argtypes=(c_void_p,c_int,c_char_p,c_long,POINTER(c_char_p))
+