2 Implements an interface to the OpenSSL X509 and X509Store structures,
3 i.e. it allows loading, analyzing and verifying certificates.
5 X509Store objects are also used to verify other signed documents,
6 such as CMS, OCSP and timestamps.
11 from ctypes import c_void_p,create_string_buffer,c_long,c_int,POINTER,c_char_p,Structure,cast
12 from ctypescrypto.bio import Membio
13 from ctypescrypto.pkey import PKey
14 from ctypescrypto.oid import Oid
15 from ctypescrypto.exception import LibCryptoError
16 from ctypescrypto import libcrypto
17 from datetime import datetime
21 from datetime import timedelta,tzinfo
24 """tzinfo object for UTC.
25 If no pytz is available, we would use it.
28 def utcoffset(self, dt):
39 __all__ = ['X509Error','X509Name','X509Store','StackOfX509']
class _validity(Structure):
    """ctypes mirror of the OpenSSL X509_VAL structure.

    Needed to reach the certificate validity period, because OpenSSL
    does not provide functions for it - only macros.
    """
    # Both members are kept as opaque pointers.
    _fields_ = [
        ('notBefore', c_void_p),
        ('notAfter', c_void_p),
    ]
48 class _cinf(Structure):
49 """ ctypes representation of X509_CINF structure
50 needed to access certificate data, which are accessible only
53 _fields_ = [('version',c_void_p),
54 ('serialNumber',c_void_p),
55 ('sign_alg',c_void_p),
57 ('validity',POINTER(_validity)),
60 ('issuerUID',c_void_p),
61 ('subjectUID',c_void_p),
62 ('extensions',c_void_p),
65 class _x509(Structure):
67 ctypes representation of X509 structure needed
68 to access certificate data which are accessible only via
71 _fields_ = [('cert_info',POINTER(_cinf)),
73 ('signature',c_void_p),
74 # There are a lot of parsed extension fields there
76 _px509 = POINTER(_x509)
class X509Error(LibCryptoError):
    """Exception generated when some OpenSSL function fails."""
86 class X509Name(object):
88 Class which represents X.509 distinguished name - typically
89 a certificate subject name or an issuer name.
91 Now used only to represent information, extracted from the
92 certificate. Potentially can be also used to build DN when creating
93 certificate signing request
95 # XN_FLAG_SEP_COMMA_PLUS & ASN1_STRFLG_UTF8_CONVERT
98 def __init__(self,ptr=None,copy=False):
100 Creates a X509Name object
101 @param ptr - pointer to X509_NAME C structure (as returned by some OpenSSL functions
102 @param copy - indicates that this structure have to be freed upon object destruction
109 self.ptr=libcrypto.X509_NAME_new()
117 libcrypto.X509_NAME_free(self.ptr)
120 Produces an ascii representation of the name, escaping all symbols > 0x80
121 Probably it is not what you want, unless your native language is English
124 libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG | self.ESC_MSB)
126 def __unicode__(self):
128 Produces unicode representation of the name.
131 libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG)
135 return number of components in the name
137 return libcrypto.X509_NAME_entry_count(self.ptr)
def __cmp__(self, other):
    """Three-way comparison of two names.

    Returns whatever libcrypto's X509_NAME_cmp reports for the two
    underlying name pointers.
    """
    ordering = libcrypto.X509_NAME_cmp(self.ptr, other.ptr)
    return ordering
def __eq__(self, other):
    """Return True when X509_NAME_cmp reports no difference between names."""
    difference = libcrypto.X509_NAME_cmp(self.ptr, other.ptr)
    return difference == 0
def __getitem__(self, key):
    """Look up a name component by Oid or by position.

    @param key - an Oid: the value of the first matching field is
        returned as a unicode string;
        an integer: the (Oid, unicode string) tuple of the entry at
        that position is returned.
    Raises KeyError when no field with the given Oid exists, IndexError
    when the integer position has no entry, and TypeError for any other
    key type.
    """
    if isinstance(key, Oid):
        # Return first matching field (search starts from the beginning: -1)
        idx = libcrypto.X509_NAME_get_index_by_NID(self.ptr, key.nid, -1)
        if idx < 0:
            # Report the Oid that was actually requested, not the Oid class.
            raise KeyError("Key not found " + str(key))
        entry = libcrypto.X509_NAME_get_entry(self.ptr, idx)
        s = libcrypto.X509_NAME_ENTRY_get_data(entry)
        b = Membio()
        libcrypto.ASN1_STRING_print_ex(b.bio, s, self.PRINT_FLAG)
        return unicode(b)
    elif isinstance(key, (int, long)):
        # Return OID, string tuple
        entry = libcrypto.X509_NAME_get_entry(self.ptr, key)
        if entry is None:
            raise IndexError("name entry index out of range")
        oid = Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry))
        s = libcrypto.X509_NAME_ENTRY_get_data(entry)
        b = Membio()
        libcrypto.ASN1_STRING_print_ex(b.bio, s, self.PRINT_FLAG)
        return (oid, unicode(b))
    else:
        raise TypeError("X509 NAME can be indexed by Oids or integers only")
170 def __setitem__(self,key,val):
171 if not self.writable:
172 raise ValueError("Attempt to modify constant X509 object")
174 raise NotImplementedError
175 def __delitem__(self,key):
176 if not self.writable:
177 raise ValueError("Attempt to modify constant X509 object")
179 raise NotImplementedError
181 return libcrypto.X509_NAME_hash(self.ptr)
183 class _x509_ext(Structure):
184 """ Represens C structure X509_EXTENSION """
185 _fields_=[("object",c_void_p),
189 class X509_EXT(object):
190 """ Python object which represents a certificate extension """
def __init__(self, ptr, copy=False):
    """Initialize from a pointer to X509_EXTENSION.

    @param ptr - pointer to the C X509_EXTENSION structure
    @param copy - when True the extension is duplicated with
        X509_EXTENSION_dup; otherwise the given pointer is only cast to
        the _x509_ext structure pointer type and kept.
    """
    self.ptr = (libcrypto.X509_EXTENSION_dup(ptr) if copy
                else cast(ptr, POINTER(_x509_ext)))
201 libcrypto.X509_EXTENSION_free(self.ptr)
204 libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0)
206 def __unicode__(self):
208 libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0)
212 return Oid.fromobj(self.ptr[0].object)
215 return self.ptr[0].critical >0
216 class _X509extlist(object):
218 Represents list of certificate extensions
220 def __init__(self,cert):
223 return libcrypto.X509_get_ext_count(self.cert.cert)
def __getitem__(self, item):
    """Return the extension at position item as an X509_EXT copy.

    Raises IndexError when libcrypto returns no extension for the index.
    """
    ext_ptr = libcrypto.X509_get_ext(self.cert.cert, item)
    if ext_ptr is None:
        raise IndexError
    # Second argument True asks X509_EXT to duplicate the extension.
    return X509_EXT(ext_ptr, True)
231 Return list of extensions with given Oid
233 if not isinstance(oid,Oid):
234 raise TypeError("Need crytypescrypto.oid.Oid as argument")
239 l=libcrypto.X509_get_ext_by_NID(self.cert.cert,oid.nid,l)
242 found.append(self[l])
244 def find_critical(self,crit=True):
246 Return list of critical extensions (or list of non-cricital, if
247 optional second argument is False
257 l=libcrypto.X509_get_ext_by_critical(self.cert.cert,flag,l)
260 found.append(self[l])
265 Represents X.509 certificate.
267 def __init__(self,data=None,ptr=None,format="PEM"):
269 Initializes certificate
270 @param data - serialized certificate in PEM or DER format.
271 @param ptr - pointer to X509, returned by some openssl function.
272 mutually exclusive with data
273 @param format - specifies data format. "PEM" or "DER", default PEM
277 raise TypeError("Cannot use data and ptr simultaneously")
280 raise TypeError("data argument is required")
284 self.cert=libcrypto.PEM_read_bio_X509(b.bio,None,None,None)
286 self.cert=libcrypto.d2i_X509_bio(b.bio,None)
287 if self.cert is None:
288 raise X509Error("error reading certificate")
289 self.extensions=_X509extlist(self)
292 Frees certificate object
294 libcrypto.X509_free(self.cert)
296 """ Returns der string of the certificate """
298 if libcrypto.i2d_X509_bio(b.bio,self.cert)==0:
299 raise X509Error("error serializing certificate")
302 """ Returns valid call to the constructor """
303 return "X509(data="+repr(str(self))+",format='DER')"
306 """EVP PKEy object of certificate public key"""
307 return PKey(ptr=libcrypto.X509_get_pubkey(self.cert,False))
308 def verify(self,store=None,chain=[],key=None):
310 Verify self. Supports verification on both X509 store object
311 or just public issuer key
312 @param store X509Store object.
313 @param chain - list of X509 objects to add into verification
314 context.These objects are untrusted, but can be used to
315 build certificate chain up to trusted object in the store
316 @param key - PKey object with open key to validate signature
318 parameters store and key are mutually exclusive. If neither
319 is specified, attempts to verify self as self-signed certificate
321 if store is not None and key is not None:
322 raise X509Error("key and store cannot be specified simultaneously")
323 if store is not None:
324 ctx=libcrypto.X509_STORE_CTX_new()
326 raise X509Error("Error allocating X509_STORE_CTX")
327 if chain is not None and len(chain)>0:
328 ch=StackOfX509(chain)
331 if libcrypto.X509_STORE_CTX_init(ctx,store.store,self.cert,ch) < 0:
332 raise X509Error("Error allocating X509_STORE_CTX")
333 res= libcrypto.X509_verify_cert(ctx)
334 libcrypto.X509_STORE_CTX_free(ctx)
338 if self.issuer != self.subject:
339 # Not a self-signed certificate
342 res = libcrypto.X509_verify(self.cert,key.key)
344 raise X509Error("X509_verify failed")
349 """ X509Name for certificate subject name """
350 return X509Name(libcrypto.X509_get_subject_name(self.cert))
353 """ X509Name for certificate issuer name """
354 return X509Name(libcrypto.X509_get_issuer_name(self.cert))
357 """ Serial number of certificate as integer """
358 asnint=libcrypto.X509_get_serialNumber(self.cert)
360 libcrypto.i2a_ASN1_INTEGER(b.bio,asnint)
361 return int(str(b),16)
364 """ certificate version as integer. Really certificate stores 0 for
365 version 1 and 2 for version 3, but we return 1 and 3 """
366 asn1int=cast(self.cert,_px509)[0].cert_info[0].version
367 return libcrypto.ASN1_INTEGER_get(asn1int)+1
370 """ Certificate validity period start date """
371 # Need deep poke into certificate structure (x)->cert_info->validity->notBefore
373 asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notBefore
375 libcrypto.ASN1_TIME_print(b.bio,asn1date)
376 return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
379 """ Certificate validity period end date """
380 # Need deep poke into certificate structure (x)->cert_info->validity->notAfter
382 asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notAfter
384 libcrypto.ASN1_TIME_print(b.bio,asn1date)
385 return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
387 """ Returns True if certificate is CA certificate """
388 return libcrypto.X509_check_ca(self.cert)>0
389 class X509Store(object):
391 Represents trusted certificate store. Can be used to lookup CA
392 certificates to verify
394 @param file - file with several certificates and crls
396 @param dir - hashed directory with certificates and crls
397 @param default - if true, default verify location (directory)
def __init__(self, file=None, dir=None, default=False):
    """Create an X509 store and install its lookup methods.

    Optionally initializes the store with certificates from the given
    file or directory.

    @param file - file with several certificates and crls
    @param dir - hashed directory with certificates and crls
    @param default - if true, default verify location (directory)
        is added as well
    Raises X509Error when the store cannot be allocated, a lookup method
    cannot be installed or a location cannot be loaded.
    """
    # Todo - set verification flags
    self.store = libcrypto.X509_STORE_new()
    if self.store is None:
        raise X509Error("allocating store")

    # File based lookup.
    file_lookup = libcrypto.X509_STORE_add_lookup(
        self.store, libcrypto.X509_LOOKUP_file())
    if file_lookup is None:
        raise X509Error("error installing file lookup method")
    if file is not None and libcrypto.X509_LOOKUP_ctrl(file_lookup, 1, file, 1, None) <= 0:
        raise X509Error("error loading trusted certs from file " + file)

    # Hashed directory lookup.
    dir_lookup = libcrypto.X509_STORE_add_lookup(
        self.store, libcrypto.X509_LOOKUP_hash_dir())
    if dir_lookup is None:
        raise X509Error("error installing hashed lookup method")
    if dir is not None and libcrypto.X509_LOOKUP_ctrl(dir_lookup, 2, dir, 1, None) <= 0:
        raise X509Error("error adding hashed trusted certs dir " + dir)
    if default and libcrypto.X509_LOOKUP_ctrl(dir_lookup, 2, None, 3, None) <= 0:
        raise X509Error("error adding default trusted certs dir ")
def add_cert(self, cert):
    """Explicitly add a certificate to the set of trusted ones in the store.

    @param cert - X509 object to add
    Raises TypeError when cert is not an X509 instance.
    """
    if isinstance(cert, X509):
        libcrypto.X509_STORE_add_cert(self.store, cert.cert)
        return
    raise TypeError("cert should be X509")
def add_callback(self, callback):
    """Install a callback receiving detailed information about verified
    certificates.

    Not implemented yet: always raises NotImplementedError.
    """
    raise NotImplementedError()
def setflags(self, flags):
    """Set certificate verification flags on the store.

    @param flags - integer bit mask. See OpenSSL X509_V_FLAG_* constants
    """
    store = self.store
    libcrypto.X509_STORE_set_flags(store, flags)
def setpurpose(self, purpose):
    """Set the certificate purpose which a verified certificate should match.

    @param purpose - number from 1 to 9 or a standard string defined in
        OpenSSL. Possible strings - sslclient, sslserver, nssslserver,
        smimesign, smimeencrypt, crlsign, any, ocsphelper
    Raises X509Error when the name is unknown or OpenSSL refuses the
    purpose, TypeError when purpose is neither a string nor an integer.
    """
    if isinstance(purpose, str):
        # X509_PURPOSE_get_by_sname returns an index into the purpose
        # table (-1 when the name is unknown), not a purpose id, so the
        # index has to be translated before it is handed to the store.
        idx = libcrypto.X509_PURPOSE_get_by_sname(purpose)
        if idx < 0:
            raise X509Error("Invalid certificate purpose '" + purpose + "'")
        # Declare pointer-typed prototypes so the X509_PURPOSE pointer is
        # not truncated to a C int on 64-bit platforms.
        libcrypto.X509_PURPOSE_get0.restype = c_void_p
        libcrypto.X509_PURPOSE_get0.argtypes = (c_int,)
        libcrypto.X509_PURPOSE_get_id.argtypes = (c_void_p,)
        purp_no = libcrypto.X509_PURPOSE_get_id(
            libcrypto.X509_PURPOSE_get0(idx))
    elif isinstance(purpose, int):
        purp_no = purpose
    else:
        raise TypeError("purpose should be a string or an integer")
    if libcrypto.X509_STORE_set_purpose(self.store, purp_no) <= 0:
        raise X509Error("cannot set purpose")
def setdepth(self, depth):
    """Set the verification depth, i.e. the max length of a certificate chain.

    @param depth - value handed to X509_STORE_set_depth
    """
    store = self.store
    libcrypto.X509_STORE_set_depth(store, depth)
def settime(self, time):
    """Set the point in time used to check validity of certificates.

    @param time - either a python datetime/date object or an integer
        number of seconds
    Raises TypeError for any other argument type. Applying the time to
    the store is not implemented yet, so a valid argument currently ends
    in NotImplementedError.
    """
    # Imported locally under distinct names: the parameter shadows the
    # ``time`` module and the file-level ``datetime`` name is the class,
    # not the module, so ``datetime.date`` would not be a type there.
    import datetime as _dt
    from time import mktime

    if isinstance(time, (_dt.datetime, _dt.date)):
        # Portable replacement for the glibc-only strftime("%s"):
        # interpret the value as local time, like "%s" does.
        seconds = int(mktime(time.timetuple()))
    elif isinstance(time, int):
        seconds = time
    else:
        raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
    # ``seconds`` is what a future implementation would pass to OpenSSL.
    raise NotImplementedError
480 class StackOfX509(object):
482 Implements OpenSSL STACK_OF(X509) object.
483 It looks much like python container types
485 def __init__(self,certs=None,ptr=None,disposable=True):
488 @param certs - list of X509 objects. If specified, read-write
489 stack is created and populated by these certificates
490 @param ptr - pointer to OpenSSL STACK_OF(X509) as returned by
492 @param disposable - if True, stack created from object, returned
493 by function is copy, and can be modified and need to be
494 freeid. If false, it is just pointer into another
495 structure i.e. CMS_ContentInfo
498 self.need_free = True
499 self.ptr=libcrypto.sk_new_null()
500 if certs is not None:
503 elif certs is not None:
504 raise ValueError("cannot handle certs an ptr simultaneously")
506 self.need_free = disposable
509 return libcrypto.sk_num(self.ptr)
510 def __getitem__(self,index):
511 if index <0 or index>=len(self):
513 p=libcrypto.sk_value(self.ptr,index)
514 return X509(ptr=libcrypto.X509_dup(p))
515 def __setitem__(self,index,value):
516 if not self.need_free:
517 raise ValueError("Stack is read-only")
518 if index <0 or index>=len(self):
520 if not isinstance(value,X509):
521 raise TypeError('StackOfX508 can contain only X509 objects')
522 p=libcrypto.sk_value(self.ptr,index)
523 libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert))
524 libcrypto.X509_free(p)
525 def __delitem__(self,index):
526 if not self.need_free:
527 raise ValueError("Stack is read-only")
528 if index <0 or index>=len(self):
530 p=libcrypto.sk_delete(self.ptr,index)
531 libcrypto.X509_free(p)
534 libcrypto.sk_pop_free(self.ptr,libcrypto.X509_free)
535 def append(self,value):
536 if not self.need_free:
537 raise ValueError("Stack is read-only")
538 if not isinstance(value,X509):
539 raise TypeError('StackOfX508 can contain only X509 objects')
540 libcrypto.sk_push(self.ptr,libcrypto.X509_dup(value.cert))
# ctypes prototypes for the libcrypto entry points declared by this module.
# Pointer-returning functions get restype=c_void_p so that the returned
# address is not treated as a C int (the ctypes default).

# ASN.1 helpers
libcrypto.i2a_ASN1_INTEGER.argtypes = (c_void_p, c_void_p)
libcrypto.ASN1_STRING_print_ex.argtypes = (c_void_p, c_void_p, c_long)
libcrypto.ASN1_TIME_print.argtypes = (c_void_p, c_void_p)
libcrypto.ASN1_INTEGER_get.argtypes = (c_void_p,)
libcrypto.ASN1_INTEGER_get.restype = c_long
libcrypto.OBJ_obj2nid.argtypes = (c_void_p,)

# Certificates
libcrypto.X509_get_serialNumber.argtypes = (c_void_p,)
libcrypto.X509_get_serialNumber.restype = c_void_p
libcrypto.X509_dup.argtypes = (c_void_p,)
libcrypto.X509_dup.restype = c_void_p

# Distinguished names
libcrypto.X509_NAME_ENTRY_get_object.argtypes = (c_void_p,)
libcrypto.X509_NAME_ENTRY_get_object.restype = c_void_p
libcrypto.X509_NAME_get_entry.argtypes = (c_void_p, c_int)
libcrypto.X509_NAME_get_entry.restype = c_void_p
libcrypto.X509_NAME_hash.argtypes = (c_void_p,)
libcrypto.X509_NAME_hash.restype = c_long

# Certificate stores and lookup methods
libcrypto.X509_STORE_new.restype = c_void_p
libcrypto.X509_STORE_add_lookup.argtypes = (c_void_p, c_void_p)
libcrypto.X509_STORE_add_lookup.restype = c_void_p
libcrypto.X509_LOOKUP_file.restype = c_void_p
libcrypto.X509_LOOKUP_hash_dir.restype = c_void_p
libcrypto.X509_LOOKUP_ctrl.argtypes = (c_void_p, c_int, c_char_p, c_long, POINTER(c_char_p))
libcrypto.X509_LOOKUP_ctrl.restype = c_int

# Extensions
libcrypto.X509_EXTENSION_dup.argtypes = (c_void_p,)
libcrypto.X509_EXTENSION_dup.restype = POINTER(_x509_ext)
libcrypto.X509V3_EXT_print.argtypes = (c_void_p, POINTER(_x509_ext), c_long, c_int)
libcrypto.X509_get_ext.argtypes = (c_void_p, c_int)
libcrypto.X509_get_ext.restype = c_void_p

# Generic stack primitives
libcrypto.sk_new_null.restype = c_void_p
libcrypto.sk_set.argtypes = (c_void_p, c_int, c_void_p)
libcrypto.sk_set.restype = c_void_p
libcrypto.sk_value.argtypes = (c_void_p, c_int)
libcrypto.sk_value.restype = c_void_p