2 Implements interface to openssl X509 and X509Store structures,
3 i.e. allows loading, analyzing and verifying certificates.
5 X509Store objects are also used to verify other signed documents,
6 such as CMS, OCSP and timestamps.
11 from ctypes import c_void_p,create_string_buffer,c_long,c_int,POINTER,c_char_p,Structure,cast
12 from ctypescrypto.bio import Membio
13 from ctypescrypto.pkey import PKey
14 from ctypescrypto.oid import Oid
15 from ctypescrypto.exception import LibCryptoError
16 from ctypescrypto import libcrypto
17 from datetime import datetime
21 from datetime import timedelta
23 class UTC(datetime.tzinfo):
24 """tzinfo object for UTC.
25 If no pytz is available, we would use it.
28 def utcoffset(self, dt):
39 __all__ = ['X509Error','X509Name','X509Store','StackOfX509']
class _validity(Structure):
    """
    ctypes mirror of the OpenSSL X509_VAL structure.

    It exists so that the certificate validity period can be read
    straight out of the structure, since OpenSSL offers only macros,
    not functions, for that.
    """
    _fields_ = [
        ('notBefore', c_void_p),
        ('notAfter', c_void_p),
    ]
48 class _cinf(Structure):
49 """ ctypes representtion of X509_CINF structure
50 neede to access certificate data, which are accessable only
53 _fields_ = [('version',c_void_p),
54 ('serialNumber',c_void_p),
55 ('sign_alg',c_void_p),
57 ('validity',POINTER(_validity)),
60 ('issuerUID',c_void_p),
61 ('subjectUID',c_void_p),
62 ('extensions',c_void_p),
65 class _x509(Structure):
67 ctypes represntation of X509 structure needed
68 to access certificate data which are accesable only via
71 _fields_ = [('cert_info',POINTER(_cinf)),
73 ('signature',c_void_p),
74 # There are a lot of parsed extension fields there
76 _px509 = POINTER(_x509)
class X509Error(LibCryptoError):
    """
    Exception raised when an OpenSSL function used by this module fails.
    """
86 class X509Name(object):
88 Class which represents X.509 distinguished name - typically
89 a certificate subject name or an issuer name.
91 Now used only to represent information, extracted from the
92 certificate. Potentially can be also used to build DN when creating
93 certificate signing request
95 # XN_FLAG_SEP_COMMA_PLUS & ASN1_STRFLG_UTF8_CONVERT
98 def __init__(self,ptr=None,copy=False):
100 Creates a X509Name object
101 @param ptr - pointer to X509_NAME C structure (as returned by some OpenSSL functions
102 @param copy - indicates that this structure have to be freed upon object destruction
109 self.ptr=libcrypto.X509_NAME_new()
117 libcrypto.X509_NAME_free(self.ptr)
120 Produces an ascii representation of the name, escaping all symbols > 0x80
121 Probably it is not what you want, unless your native language is English
124 libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG | self.ESC_MSB)
126 def __unicode__(self):
128 Produces unicode representation of the name.
131 libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG)
135 return number of components in the name
137 return libcrypto.X509_NAME_entry_count(self.ptr)
138 def __cmp__(self,other):
142 return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)
def __eq__(self, other):
    """
    Report whether this name and other are equal.

    Both underlying pointers are handed to X509_NAME_cmp; a result of
    zero is treated as equality.
    """
    difference = libcrypto.X509_NAME_cmp(self.ptr, other.ptr)
    return difference == 0
def __getitem__(self, key):
    """
    Look up a component of the name.

    @param key - either an Oid or an integer position in the name
    @return for an Oid key, the printed value of the first entry
        carrying that OID; for an integer key, a tuple
        (Oid, printed value) describing the entry at that position

    Raises KeyError if no entry carries the given Oid, IndexError if
    the position is out of range, and TypeError for any other kind of
    key (such keys used to fall through and silently yield None).
    """
    if isinstance(key, Oid):
        # Searching from position -1 returns the first matching field
        idx = libcrypto.X509_NAME_get_index_by_NID(self.ptr, key.nid, -1)
        if idx < 0:
            # Report the key that was asked for, not the Oid class itself
            raise KeyError("Key not found " + repr(key))
        entry = libcrypto.X509_NAME_get_entry(self.ptr, idx)
        return self._entry_text(entry)
    if isinstance(key, int):
        entry = libcrypto.X509_NAME_get_entry(self.ptr, key)
        if entry is None:
            raise IndexError("name entry index out of range")
        oid = Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry))
        return (oid, self._entry_text(entry))
    raise TypeError("X509Name can be indexed only by Oid or integer")

def _entry_text(self, entry):
    """
    Print the value of one name entry with PRINT_FLAG and return it
    as a unicode string.
    """
    data = libcrypto.X509_NAME_ENTRY_get_data(entry)
    bio = Membio()
    libcrypto.ASN1_STRING_print_ex(bio.bio, data, self.PRINT_FLAG)
    return unicode(bio)
def __setitem__(self, key, val):
    """
    Assign a component of the name (modification is not implemented).

    @param key - component to assign
    @param val - value to store

    Raises ValueError when the name is not writable. For a writable
    name raises NotImplementedError, so that the assignment is not
    silently thrown away.
    """
    if not self.writable:
        raise ValueError("Attempt to modify constant X509 object")
    raise NotImplementedError
172 class _x509_ext(Structure):
173 """ Represens C structure X509_EXTENSION """
174 _fields_=[("object",c_void_p),
178 class X509_EXT(object):
179 """ Python object which represents a certificate extension """
180 def __init__(self,ptr,copy=False):
181 """ Initializes from the pointer to X509_EXTENSION.
182 If copy is True, creates a copy, otherwise just
186 self.ptr=libcrypto.X509_EXTENSION_dup(ptr)
188 self.ptr=cast(ptr,POINTER(_x509_ext))
190 libcrypto.X509_EXTENSION_free(self.ptr)
193 libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0)
194 libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int)
196 def __unicode__(self):
198 libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0)
202 return Oid.fromobj(self.ptr[0].object)
205 return self.ptr[0].critical >0
206 class _X509extlist(object):
208 Represents list of certificate extensions
210 def __init__(self,cert):
213 return libcrypto.X509_get_ext_count(self.cert.cert)
214 def __getitem__(self,item):
215 p=libcrypto.X509_get_ext(self.cert.cert,item)
218 return X509_EXT(p,True)
221 Return list of extensions with given Oid
223 if not isinstance(oid,Oid):
224 raise TypeError("Need crytypescrypto.oid.Oid as argument")
229 l=libcrypto.X509_get_ext_by_NID(self.cert.cert,oid.nid,l)
232 found.append(self[l])
234 def find_critical(self,crit=True):
236 Return list of critical extensions (or list of non-cricital, if
237 optional second argument is False
247 l=libcrypto.X509_get_ext_by_critical(self.cert.cert,flag,l)
250 found.append(self[l])
255 Represents X.509 certificate.
257 def __init__(self,data=None,ptr=None,format="PEM"):
259 Initializes certificate
260 @param data - serialized certificate in PEM or DER format.
261 @param ptr - pointer to X509, returned by some openssl function.
262 mutually exclusive with data
263 @param format - specifies data format. "PEM" or "DER", default PEM
267 raise TypeError("Cannot use data and ptr simultaneously")
270 raise TypeError("data argument is required")
274 self.cert=libcrypto.PEM_read_bio_X509(b.bio,None,None,None)
276 self.cert=libcrypto.d2i_X509_bio(b.bio,None)
277 if self.cert is None:
278 raise X509Error("error reading certificate")
279 self.extensions=_X509extlist(self)
282 Frees certificate object
284 libcrypto.X509_free(self.cert)
286 """ Returns der string of the certificate """
288 if libcrypto.i2d_X509_bio(b.bio,self.cert)==0:
289 raise X509Error("error serializing certificate")
292 """ Returns valid call to the constructor """
293 return "X509(data="+repr(str(self))+",format='DER')"
296 """EVP PKEy object of certificate public key"""
297 return PKey(ptr=libcrypto.X509_get_pubkey(self.cert,False))
298 def verify(self,store=None,chain=[],key=None):
300 Verify self. Supports verification on both X509 store object
301 or just public issuer key
302 @param store X509Store object.
303 @param chain - list of X509 objects to add into verification
304 context.These objects are untrusted, but can be used to
305 build certificate chain up to trusted object in the store
306 @param key - PKey object with open key to validate signature
308 parameters store and key are mutually exclusive. If neither
309 is specified, attempts to verify self as self-signed certificate
311 if store is not None and key is not None:
312 raise X509Error("key and store cannot be specified simultaneously")
313 if store is not None:
314 ctx=libcrypto.X509_STORE_CTX_new()
316 raise X509Error("Error allocating X509_STORE_CTX")
317 if chain is not None and len(chain)>0:
318 ch=StackOfX509(chain)
321 if libcrypto.X509_STORE_CTX_init(ctx,store.store,self.cert,ch) < 0:
322 raise X509Error("Error allocating X509_STORE_CTX")
323 res= libcrypto.X509_verify_cert(ctx)
324 libcrypto.X509_STORE_CTX_free(ctx)
328 if self.issuer != self.subject:
329 # Not a self-signed certificate
332 res = libcrypto.X509_verify(self.cert,key.key)
334 raise X509Error("X509_verify failed")
339 """ X509Name for certificate subject name """
340 return X509Name(libcrypto.X509_get_subject_name(self.cert))
343 """ X509Name for certificate issuer name """
344 return X509Name(libcrypto.X509_get_issuer_name(self.cert))
347 """ Serial number of certificate as integer """
348 asnint=libcrypto.X509_get_serialNumber(self.cert)
350 libcrypto.i2a_ASN1_INTEGER(b.bio,asnint)
351 return int(str(b),16)
354 """ certificate version as integer. Really certificate stores 0 for
355 version 1 and 2 for version 3, but we return 1 and 3 """
356 asn1int=cast(self.cert,_px509)[0].cert_info[0].version
357 return libcrypto.ASN1_INTEGER_get(asn1int)+1
360 """ Certificate validity period start date """
361 # Need deep poke into certificate structure (x)->cert_info->validity->notBefore
363 asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notBefore
365 libcrypto.ASN1_TIME_print(b.bio,asn1date)
366 return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
369 """ Certificate validity period end date """
370 # Need deep poke into certificate structure (x)->cert_info->validity->notAfter
372 asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notAfter
374 libcrypto.ASN1_TIME_print(b.bio,asn1date)
375 return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
377 """ Returns True if certificate is CA certificate """
378 return libcrypto.X509_check_ca(self.cert)>0
379 class X509Store(object):
381 Represents trusted certificate store. Can be used to lookup CA
382 certificates to verify
384 @param file - file with several certificates and crls
386 @param dir - hashed directory with certificates and crls
387 @param default - if true, default verify location (directory)
391 def __init__(self,file=None,dir=None,default=False):
393 Creates X509 store and installs lookup method. Optionally initializes
394 by certificates from given file or directory.
397 # Todo - set verification flags
399 self.store=libcrypto.X509_STORE_new()
400 if self.store is None:
401 raise X509Error("allocating store")
402 lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_file())
404 raise X509Error("error installing file lookup method")
405 if (file is not None):
406 if not libcrypto.X509_LOOKUP_ctrl(lookup,1,file,1,None)>0:
407 raise X509Error("error loading trusted certs from file "+file)
408 lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_hash_dir())
410 raise X509Error("error installing hashed lookup method")
412 if not libcrypto.X509_LOOKUP_ctrl(lookup,2,dir,1,None)>0:
413 raise X509Error("error adding hashed trusted certs dir "+dir)
415 if not libcrypto.X509_LOOKUP_ctrl(lookup,2,None,3,None)>0:
416 raise X509Error("error adding default trusted certs dir ")
def add_cert(self, cert):
    """
    Put a certificate directly into the store's set of trusted ones.
    @param cert - X509 object to add

    Raises TypeError when cert is not an X509 instance.
    """
    if isinstance(cert, X509):
        libcrypto.X509_STORE_add_cert(self.store, cert.cert)
    else:
        raise TypeError("cert should be X509")
def add_callback(self, callback):
    """
    Placeholder for installing a callback function that would be given
    detailed information about the certificates being verified.

    Not implemented: every call raises NotImplementedError.
    """
    raise NotImplementedError()
def setflags(self, flags):
    """
    Apply certificate verification flags to the store.
    @param flags - integer bit mask, see the OpenSSL X509_V_FLAG_* constants
    """
    store = self.store
    libcrypto.X509_STORE_set_flags(store, flags)
def setpurpose(self, purpose):
    """
    Sets certificate purpose which verified certificate should match
    @param purpose - number from 1 to 9 or standard string defined in
        OpenSSL. Possible strings - sslclient, sslserver, nssslserver,
        smimesign, smimeencrypt, crlsign, any, ocsphelper

    Raises X509Error if the string is not a known purpose name or the
    purpose cannot be set on the store, and TypeError if purpose is
    neither a string nor an integer.
    """
    if isinstance(purpose, str):
        # The lookup function lives in libcrypto; calling it as a bare
        # name raised NameError before any lookup could happen.
        purp_no = libcrypto.X509_PURPOSE_get_by_sname(purpose)
        # NOTE(review): OpenSSL appears to return a table index here
        # (-1 when the name is unknown), while X509_STORE_set_purpose
        # appears to expect a purpose id - confirm whether a conversion
        # between the two is needed.
        if purp_no < 0:
            raise X509Error("Invalid certificate purpose '" + purpose + "'")
    elif isinstance(purpose, int):
        purp_no = purpose
    else:
        # Previously purp_no was left unbound for such arguments
        raise TypeError("purpose should be a string or an integer")
    if libcrypto.X509_STORE_set_purpose(self.store, purp_no) <= 0:
        raise X509Error("cannot set purpose")
def setdepth(self, depth):
    """
    Set the verification depth, i.e. the maximum length of a
    certificate chain.
    @param depth - value passed on to X509_STORE_set_depth
    """
    store = self.store
    libcrypto.X509_STORE_set_depth(store, depth)
def settime(self, time):
    """
    Set point in time used to check validity of certificates for.

    @param time - python datetime.datetime or datetime.date object, or
        an integer number of seconds (presumably since the epoch)

    Raises TypeError for any other argument type. Setting the time is
    not implemented yet, so a valid argument ends in
    NotImplementedError.
    """
    # Imported locally under private names: the parameter shadows the
    # ``time`` module, and this file binds the name ``datetime`` to the
    # class, so ``datetime.datetime`` raised AttributeError.
    import datetime as _datetime
    import time as _time
    if isinstance(time, (_datetime.datetime, _datetime.date)):
        # Portable replacement for strftime("%s"), which is a
        # platform-specific extension; the fields are interpreted as
        # local time.
        seconds = int(_time.mktime(time.timetuple()))
    elif isinstance(time, int):
        seconds = time
    else:
        raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
    # ``seconds`` is the value a future implementation would pass to
    # OpenSSL; nothing consumes it yet.
    raise NotImplementedError
470 class StackOfX509(object):
472 Implements OpenSSL STACK_OF(X509) object.
473 It looks much like python container types
def __init__(self, certs=None, ptr=None, disposable=True):
    """
    Create a stack object.

    @param certs - list of X509 objects. If specified, read-write
        stack is created and populated by these certificates
    @param ptr - pointer to an existing OpenSSL STACK_OF(X509)
    @param disposable - if True, stack created from object, returned
        by function is copy, and can be modified and need to be
        freed. If false, it is just pointer into another
        structure i.e. CMS_ContentInfo

    certs and ptr are mutually exclusive; ValueError is raised when
    both are given.
    """
    if ptr is None:
        self.need_free = True
        # Was spelled ``libcrypt``, which raised NameError whenever a
        # new stack had to be allocated.
        self.ptr = libcrypto.sk_new_null()
        if certs is not None:
            for crt in certs:
                self.append(crt)
    elif certs is not None:
        raise ValueError("cannot handle certs an ptr simultaneously")
    else:
        self.need_free = disposable
        self.ptr = ptr
499 return libcrypto.sk_num(self.ptr)
500 def __getitem__(self,index):
501 if index <0 or index>=len(self):
503 p=libcrypto.sk_value(self.ptr,index)
504 return X509(ptr=libcrypto.X509_dup(p))
def __setitem__(self, index, value):
    """
    Replace the certificate at position index with a copy of value.

    @param index - position of the element to replace
    @param value - X509 object; a duplicate of it is stored

    Raises ValueError if the stack is read-only, IndexError if index
    is out of range and TypeError if value is not an X509 object.
    """
    if not self.need_free:
        raise ValueError("Stack is read-only")
    if index < 0 or index >= len(self):
        raise IndexError
    if not isinstance(value, X509):
        raise TypeError("StackOfX509 can contain only X509 objects")
    # Fetch the element being replaced first: per the OpenSSL stack
    # documentation sk_set returns the newly stored value, so freeing
    # its result released the certificate just put on the stack.
    old = libcrypto.sk_value(self.ptr, index)
    libcrypto.sk_set(self.ptr, index, libcrypto.X509_dup(value.cert))
    libcrypto.X509_free(old)

# Python routes ``stack[i] = cert`` to __setitem__; the historical
# name is kept as an alias for callers that invoke it explicitly.
__putitem__ = __setitem__
512 def __delitem__(self,index):
513 if not self.need_free:
514 raise ValueError("Stack is read-only")
515 if index <0 or index>=len(self):
517 p=libcrypto.sk_delete(self.ptr,index)
518 libcrypto.X509_free(p)
521 libcrypto.sk_pop_free(self.ptr,libcrypto.X509_free)
def append(self, value):
    """
    Push a duplicate of a certificate onto the stack.
    @param value - object whose ``cert`` pointer is duplicated

    Raises ValueError when the stack is read-only.
    """
    if self.need_free:
        libcrypto.sk_push(self.ptr, libcrypto.X509_dup(value.cert))
    else:
        raise ValueError("Stack is read-only")
526 libcrypto.i2a_ASN1_INTEGER.argtypes=(c_void_p,c_void_p)
527 libcrypto.ASN1_STRING_print_ex.argtypes=(c_void_p,c_void_p,c_long)
528 libcrypto.ASN1_TIME_print.argtypes=(c_void_p,c_void_p)
529 libcrypto.ASN1_INTEGER_get.argtypes=(c_void_p,)
530 libcrypto.ASN1_INTEGER_get.restype=c_long
531 libcrypto.X509_get_serialNumber.argtypes=(c_void_p,)
532 libcrypto.X509_get_serialNumber.restype=c_void_p
533 libcrypto.X509_NAME_ENTRY_get_object.restype=c_void_p
534 libcrypto.X509_NAME_ENTRY_get_object.argtypes=(c_void_p,)
535 libcrypto.OBJ_obj2nid.argtypes=(c_void_p,)
536 libcrypto.X509_NAME_get_entry.restype=c_void_p
537 libcrypto.X509_NAME_get_entry.argtypes=(c_void_p,c_int)
538 libcrypto.X509_STORE_new.restype=c_void_p
539 libcrypto.X509_STORE_add_lookup.restype=c_void_p
540 libcrypto.X509_STORE_add_lookup.argtypes=(c_void_p,c_void_p)
541 libcrypto.X509_LOOKUP_file.restype=c_void_p
542 libcrypto.X509_LOOKUP_hash_dir.restype=c_void_p
543 libcrypto.X509_LOOKUP_ctrl.restype=c_int
544 libcrypto.X509_LOOKUP_ctrl.argtypes=(c_void_p,c_int,c_char_p,c_long,POINTER(c_char_p))
545 libcrypto.X509_EXTENSION_dup.argtypes=(c_void_p,)
546 libcrypto.X509_EXTENSION_dup.restype=POINTER(_x509_ext)
547 libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int)
548 libcrypto.X509_get_ext.restype=c_void_p
549 libcrypto.X509_get_ext.argtypes=(c_void_p,c_int)