Implements an interface to the OpenSSL X509 and X509Store structures,
i.e. allows loading, analyzing and verifying certificates.
X509Store objects are also used to verify other signed documents,
such as CMS, OCSP and timestamps.
11 from ctypes import c_void_p,create_string_buffer,c_long,c_int,POINTER,c_char_p,Structure,cast
12 from ctypescrypto.bio import Membio
13 from ctypescrypto.pkey import PKey
14 from ctypescrypto.oid import Oid
15 from ctypescrypto.exception import LibCryptoError
16 from ctypescrypto import libcrypto
17 from datetime import datetime
21 from datetime import timedelta,tzinfo
24 """tzinfo object for UTC.
25 If no pytz is available, we would use it.
28 def utcoffset(self, dt):
39 __all__ = ['X509','X509Error','X509Name','X509Store','StackOfX509']
class _validity(Structure):
    """
    ctypes mirror of the OpenSSL X509_VAL structure.

    Needed to read the certificate validity period directly from the
    structure, because OpenSSL provides only macros for it, not
    functions.
    """
    _fields_ = [
        ('notBefore', c_void_p),
        ('notAfter', c_void_p),
    ]
48 class _cinf(Structure):
49 """ ctypes representtion of X509_CINF structure
50 neede to access certificate data, which are accessable only
53 _fields_ = [('version',c_void_p),
54 ('serialNumber',c_void_p),
55 ('sign_alg',c_void_p),
57 ('validity',POINTER(_validity)),
60 ('issuerUID',c_void_p),
61 ('subjectUID',c_void_p),
62 ('extensions',c_void_p),
65 class _x509(Structure):
67 ctypes represntation of X509 structure needed
68 to access certificate data which are accesable only via
71 _fields_ = [('cert_info',POINTER(_cinf)),
73 ('signature',c_void_p),
74 # There are a lot of parsed extension fields there
76 _px509 = POINTER(_x509)
78 class X509Error(LibCryptoError):
80 Exception, generated when some openssl function fail
86 class X509Name(object):
88 Class which represents X.509 distinguished name - typically
89 a certificate subject name or an issuer name.
91 Now used only to represent information, extracted from the
92 certificate. Potentially can be also used to build DN when creating
93 certificate signing request
95 # XN_FLAG_SEP_COMMA_PLUS & ASN1_STRFLG_UTF8_CONVERT
98 def __init__(self,ptr=None,copy=False):
100 Creates a X509Name object
101 @param ptr - pointer to X509_NAME C structure (as returned by some OpenSSL functions
102 @param copy - indicates that this structure have to be freed upon object destruction
109 self.ptr=libcrypto.X509_NAME_new()
117 libcrypto.X509_NAME_free(self.ptr)
120 Produces an ascii representation of the name, escaping all symbols > 0x80
121 Probably it is not what you want, unless your native language is English
124 libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG | self.ESC_MSB)
126 def __unicode__(self):
128 Produces unicode representation of the name.
131 libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG)
135 return number of components in the name
137 return libcrypto.X509_NAME_entry_count(self.ptr)
def __cmp__(self, other):
    """
    Three-way comparison of two names (Python 2 comparison protocol).

    Returns the raw result of X509_NAME_cmp applied to the underlying
    structures of self and other.
    """
    outcome = libcrypto.X509_NAME_cmp(self.ptr, other.ptr)
    return outcome
143 def __eq__(self,other):
144 return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)==0
def __getitem__(self, key):
    """
    Looks up a component of the name.

    @param key - either an Oid or an integer index
    Returns the unicode value of the first entry matching the Oid, or
    an (Oid, unicode value) tuple for the entry at the integer index.
    Raises KeyError if no entry has the given Oid, IndexError if the
    index is out of range and TypeError for any other kind of key.
    """
    if isinstance(key, Oid):
        # Return first matching field
        idx = libcrypto.X509_NAME_get_index_by_NID(self.ptr, key.nid, -1)
        if idx < 0:
            # Report the Oid that was asked for, not the Oid class itself
            raise KeyError("Key not found " + str(key))
        entry = libcrypto.X509_NAME_get_entry(self.ptr, idx)
        s = libcrypto.X509_NAME_ENTRY_get_data(entry)
        b = Membio()
        libcrypto.ASN1_STRING_print_ex(b.bio, s, self.PRINT_FLAG)
        return unicode(b)
    elif isinstance(key, (int, long)):
        # Return OID, string tuple
        entry = libcrypto.X509_NAME_get_entry(self.ptr, key)
        if entry is None:
            raise IndexError("name entry index out of range")
        oid = Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry))
        s = libcrypto.X509_NAME_ENTRY_get_data(entry)
        b = Membio()
        libcrypto.ASN1_STRING_print_ex(b.bio, s, self.PRINT_FLAG)
        return (oid, unicode(b))
    else:
        raise TypeError("X509 NAME can be indexed by Oids or integers only")
170 def __setitem__(self,key,val):
171 if not self.writable:
172 raise ValueError("Attempt to modify constant X509 object")
174 raise NotImplementedError
175 def __delitem__(self,key):
176 if not self.writable:
177 raise ValueError("Attempt to modify constant X509 object")
179 raise NotImplementedError
181 return libcrypto.X509_NAME_hash(self.ptr)
183 class _x509_ext(Structure):
184 """ Represens C structure X509_EXTENSION """
185 _fields_=[("object",c_void_p),
189 class X509_EXT(object):
190 """ Python object which represents a certificate extension """
191 def __init__(self,ptr,copy=False):
192 """ Initializes from the pointer to X509_EXTENSION.
193 If copy is True, creates a copy, otherwise just
197 self.ptr=libcrypto.X509_EXTENSION_dup(ptr)
199 self.ptr=cast(ptr,POINTER(_x509_ext))
201 libcrypto.X509_EXTENSION_free(self.ptr)
204 libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0)
206 def __unicode__(self):
208 libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0)
212 return Oid.fromobj(self.ptr[0].object)
215 return self.ptr[0].critical >0
216 class _X509extlist(object):
218 Represents list of certificate extensions
220 def __init__(self,cert):
223 return libcrypto.X509_get_ext_count(self.cert.cert)
224 def __getitem__(self,item):
225 p=libcrypto.X509_get_ext(self.cert.cert,item)
228 return X509_EXT(p,True)
231 Return list of extensions with given Oid
233 if not isinstance(oid,Oid):
234 raise TypeError("Need crytypescrypto.oid.Oid as argument")
239 l=libcrypto.X509_get_ext_by_NID(self.cert.cert,oid.nid,l)
242 found.append(self[l])
244 def find_critical(self,crit=True):
246 Return list of critical extensions (or list of non-cricital, if
247 optional second argument is False
257 l=libcrypto.X509_get_ext_by_critical(self.cert.cert,flag,l)
260 found.append(self[l])
265 Represents X.509 certificate.
267 def __init__(self,data=None,ptr=None,format="PEM"):
269 Initializes certificate
270 @param data - serialized certificate in PEM or DER format.
271 @param ptr - pointer to X509, returned by some openssl function.
272 mutually exclusive with data
273 @param format - specifies data format. "PEM" or "DER", default PEM
277 raise TypeError("Cannot use data and ptr simultaneously")
280 raise TypeError("data argument is required")
284 self.cert=libcrypto.PEM_read_bio_X509(b.bio,None,None,None)
286 self.cert=libcrypto.d2i_X509_bio(b.bio,None)
287 if self.cert is None:
288 raise X509Error("error reading certificate")
289 self.extensions=_X509extlist(self)
292 Frees certificate object
294 libcrypto.X509_free(self.cert)
296 """ Returns der string of the certificate """
298 if libcrypto.i2d_X509_bio(b.bio,self.cert)==0:
299 raise X509Error("error serializing certificate")
302 """ Returns valid call to the constructor """
303 return "X509(data="+repr(str(self))+",format='DER')"
306 """EVP PKEy object of certificate public key"""
307 return PKey(ptr=libcrypto.X509_get_pubkey(self.cert,False))
309 """ Returns PEM represntation of the certificate """
311 if libcrypto.PEM_write_bio_X509(b.bio,self.cert)==0:
312 raise X509Error("error serializing certificate")
314 def verify(self,store=None,chain=[],key=None):
316 Verify self. Supports verification on both X509 store object
317 or just public issuer key
318 @param store X509Store object.
319 @param chain - list of X509 objects to add into verification
320 context.These objects are untrusted, but can be used to
321 build certificate chain up to trusted object in the store
322 @param key - PKey object with open key to validate signature
324 parameters store and key are mutually exclusive. If neither
325 is specified, attempts to verify self as self-signed certificate
327 if store is not None and key is not None:
328 raise X509Error("key and store cannot be specified simultaneously")
329 if store is not None:
330 ctx=libcrypto.X509_STORE_CTX_new()
332 raise X509Error("Error allocating X509_STORE_CTX")
333 if chain is not None and len(chain)>0:
334 ch=StackOfX509(chain)
337 if libcrypto.X509_STORE_CTX_init(ctx,store.store,self.cert,ch) < 0:
338 raise X509Error("Error allocating X509_STORE_CTX")
339 res= libcrypto.X509_verify_cert(ctx)
340 libcrypto.X509_STORE_CTX_free(ctx)
344 if self.issuer != self.subject:
345 # Not a self-signed certificate
348 res = libcrypto.X509_verify(self.cert,key.key)
350 raise X509Error("X509_verify failed")
355 """ X509Name for certificate subject name """
356 return X509Name(libcrypto.X509_get_subject_name(self.cert))
359 """ X509Name for certificate issuer name """
360 return X509Name(libcrypto.X509_get_issuer_name(self.cert))
363 """ Serial number of certificate as integer """
364 asnint=libcrypto.X509_get_serialNumber(self.cert)
366 libcrypto.i2a_ASN1_INTEGER(b.bio,asnint)
367 return int(str(b),16)
370 """ certificate version as integer. Really certificate stores 0 for
371 version 1 and 2 for version 3, but we return 1 and 3 """
372 asn1int=cast(self.cert,_px509)[0].cert_info[0].version
373 return libcrypto.ASN1_INTEGER_get(asn1int)+1
376 """ Certificate validity period start date """
377 # Need deep poke into certificate structure (x)->cert_info->validity->notBefore
379 asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notBefore
381 libcrypto.ASN1_TIME_print(b.bio,asn1date)
382 return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
385 """ Certificate validity period end date """
386 # Need deep poke into certificate structure (x)->cert_info->validity->notAfter
388 asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notAfter
390 libcrypto.ASN1_TIME_print(b.bio,asn1date)
391 return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
393 """ Returns True if certificate is CA certificate """
394 return libcrypto.X509_check_ca(self.cert)>0
395 class X509Store(object):
397 Represents trusted certificate store. Can be used to lookup CA
398 certificates to verify
400 @param file - file with several certificates and crls
402 @param dir - hashed directory with certificates and crls
403 @param default - if true, default verify location (directory)
407 def __init__(self,file=None,dir=None,default=False):
409 Creates X509 store and installs lookup method. Optionally initializes
410 by certificates from given file or directory.
413 # Todo - set verification flags
415 self.store=libcrypto.X509_STORE_new()
416 if self.store is None:
417 raise X509Error("allocating store")
418 lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_file())
420 raise X509Error("error installing file lookup method")
421 if (file is not None):
422 if not libcrypto.X509_LOOKUP_ctrl(lookup,1,file,1,None)>0:
423 raise X509Error("error loading trusted certs from file "+file)
424 lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_hash_dir())
426 raise X509Error("error installing hashed lookup method")
428 if not libcrypto.X509_LOOKUP_ctrl(lookup,2,dir,1,None)>0:
429 raise X509Error("error adding hashed trusted certs dir "+dir)
431 if not libcrypto.X509_LOOKUP_ctrl(lookup,2,None,3,None)>0:
432 raise X509Error("error adding default trusted certs dir ")
def add_cert(self, cert):
    """
    Explicitly adds a certificate to the set of trusted certificates
    in the store.
    @param cert - X509 object to add
    Raises TypeError if cert is not an X509 object.
    """
    if isinstance(cert, X509):
        libcrypto.X509_STORE_add_cert(self.store, cert.cert)
        return
    raise TypeError("cert should be X509")
441 def add_callback(self,callback):
443 Installs callback function, which would receive detailed information
444 about verified ceritificates
446 raise NotImplementedError
def setflags(self, flags):
    """
    Sets certificate verification flags on the store.
    @param flags - integer bit mask. See OpenSSL X509_V_FLAG_* constants
    """
    store_ptr = self.store
    libcrypto.X509_STORE_set_flags(store_ptr, flags)
453 def setpurpose(self,purpose):
455 Sets certificate purpose which verified certificate should match
456 @param purpose - number from 1 to 9 or standard strind defined in Openssl
457 possible strings - sslcient,sslserver, nssslserver, smimesign,smimeencrypt, crlsign, any,ocsphelper
459 if isinstance(purpose,str):
460 purp_no=X509_PURPOSE_get_by_sname(purpose)
462 raise X509Error("Invalid certificate purpose '"+purpose+"'")
463 elif isinstance(purpose,int):
465 if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0:
466 raise X509Error("cannot set purpose")
def setdepth(self, depth):
    """
    Sets the verification depth, i.e. the maximum length of the
    certificate chain.
    @param depth - value passed to X509_STORE_set_depth
    """
    store_ptr = self.store
    libcrypto.X509_STORE_set_depth(store_ptr, depth)
473 def settime(self, time):
475 Set point in time used to check validity of certificates for
476 Time can be either python datetime object or number of seconds
479 if isinstance(time,datetime.datetime) or isinstance(time,datetime.date):
480 d=int(time.strftime("%s"))
481 elif isinstance(time,int):
484 raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
485 raise NotImplementedError
486 class StackOfX509(object):
488 Implements OpenSSL STACK_OF(X509) object.
489 It looks much like python container types
491 def __init__(self,certs=None,ptr=None,disposable=True):
494 @param certs - list of X509 objects. If specified, read-write
495 stack is created and populated by these certificates
496 @param ptr - pointer to OpenSSL STACK_OF(X509) as returned by
498 @param disposable - if True, stack created from object, returned
499 by function is copy, and can be modified and need to be
500 freeid. If false, it is just pointer into another
501 structure i.e. CMS_ContentInfo
504 self.need_free = True
505 self.ptr=libcrypto.sk_new_null()
506 if certs is not None:
509 elif certs is not None:
510 raise ValueError("cannot handle certs an ptr simultaneously")
512 self.need_free = disposable
515 return libcrypto.sk_num(self.ptr)
def __getitem__(self, index):
    """
    Returns a new X509 object built from a duplicate of the stack
    element at the given position.

    Raises IndexError if index is negative or not less than len(self).
    """
    out_of_range = index < 0 or index >= len(self)
    if out_of_range:
        raise IndexError
    element = libcrypto.sk_value(self.ptr, index)
    duplicate = libcrypto.X509_dup(element)
    return X509(ptr=duplicate)
521 def __setitem__(self,index,value):
522 if not self.need_free:
523 raise ValueError("Stack is read-only")
524 if index <0 or index>=len(self):
526 if not isinstance(value,X509):
527 raise TypeError('StackOfX508 can contain only X509 objects')
528 p=libcrypto.sk_value(self.ptr,index)
529 libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert))
530 libcrypto.X509_free(p)
531 def __delitem__(self,index):
532 if not self.need_free:
533 raise ValueError("Stack is read-only")
534 if index <0 or index>=len(self):
536 p=libcrypto.sk_delete(self.ptr,index)
537 libcrypto.X509_free(p)
540 libcrypto.sk_pop_free(self.ptr,libcrypto.X509_free)
541 def append(self,value):
542 if not self.need_free:
543 raise ValueError("Stack is read-only")
544 if not isinstance(value,X509):
545 raise TypeError('StackOfX508 can contain only X509 objects')
546 libcrypto.sk_push(self.ptr,libcrypto.X509_dup(value.cert))
# ctypes argument and return type declarations for the libcrypto
# functions used in this module, grouped by the kind of object they
# operate on.

# ASN.1 primitives and object identifiers
libcrypto.i2a_ASN1_INTEGER.argtypes = (c_void_p, c_void_p)
libcrypto.ASN1_INTEGER_get.argtypes = (c_void_p,)
libcrypto.ASN1_INTEGER_get.restype = c_long
libcrypto.ASN1_STRING_print_ex.argtypes = (c_void_p, c_void_p, c_long)
libcrypto.ASN1_TIME_print.argtypes = (c_void_p, c_void_p)
libcrypto.OBJ_obj2nid.argtypes = (c_void_p,)

# PEM reading and writing of certificates
libcrypto.PEM_read_bio_X509.argtypes = (c_void_p, POINTER(c_void_p), c_void_p, c_void_p)
libcrypto.PEM_read_bio_X509.restype = c_void_p
libcrypto.PEM_write_bio_X509.argtypes = (c_void_p, c_void_p)
libcrypto.PEM_write_bio_X509.restype = c_int

# Certificates
libcrypto.X509_get_serialNumber.argtypes = (c_void_p,)
libcrypto.X509_get_serialNumber.restype = c_void_p
libcrypto.X509_dup.argtypes = (c_void_p,)
libcrypto.X509_dup.restype = c_void_p

# Distinguished names
libcrypto.X509_NAME_get_entry.argtypes = (c_void_p, c_int)
libcrypto.X509_NAME_get_entry.restype = c_void_p
libcrypto.X509_NAME_ENTRY_get_object.argtypes = (c_void_p,)
libcrypto.X509_NAME_ENTRY_get_object.restype = c_void_p
libcrypto.X509_NAME_hash.argtypes = (c_void_p,)
libcrypto.X509_NAME_hash.restype = c_long

# Certificate extensions
libcrypto.X509_get_ext.argtypes = (c_void_p, c_int)
libcrypto.X509_get_ext.restype = c_void_p
libcrypto.X509_EXTENSION_dup.argtypes = (c_void_p,)
libcrypto.X509_EXTENSION_dup.restype = POINTER(_x509_ext)
libcrypto.X509V3_EXT_print.argtypes = (c_void_p, POINTER(_x509_ext), c_long, c_int)

# Certificate store and lookup methods
libcrypto.X509_STORE_new.restype = c_void_p
libcrypto.X509_STORE_add_lookup.argtypes = (c_void_p, c_void_p)
libcrypto.X509_STORE_add_lookup.restype = c_void_p
libcrypto.X509_LOOKUP_file.restype = c_void_p
libcrypto.X509_LOOKUP_hash_dir.restype = c_void_p
libcrypto.X509_LOOKUP_ctrl.argtypes = (c_void_p, c_int, c_char_p, c_long, POINTER(c_char_p))
libcrypto.X509_LOOKUP_ctrl.restype = c_int

# Generic stack operations
libcrypto.sk_new_null.restype = c_void_p
libcrypto.sk_value.argtypes = (c_void_p, c_int)
libcrypto.sk_value.restype = c_void_p
libcrypto.sk_set.argtypes = (c_void_p, c_int, c_void_p)
libcrypto.sk_set.restype = c_void_p