2 Implements interface to openssl X509 and X509Store structures,
3 I.e allows to load, analyze and verify certificates.
5 X509Store objects are also used to verify other signed documents,
6 such as CMS, OCSP and timestamps.
11 from ctypes import c_void_p,create_string_buffer,c_long,c_int,POINTER,c_char_p,Structure,cast
12 from ctypescrypto.bio import Membio
13 from ctypescrypto.pkey import PKey
14 from ctypescrypto.oid import Oid
15 from ctypescrypto.exception import LibCryptoError
16 from ctypescrypto import libcrypto
17 from datetime import datetime
21 from datetime import timedelta,tzinfo
24 """tzinfo object for UTC.
25 If no pytz is available, we would use it.
28 def utcoffset(self, dt):
39 __all__ = ['X509Error','X509Name','X509Store','StackOfX509']
41 class _validity(Structure):
42 """ ctypes representation of X509_VAL structure
43 needed to access certificate validity period, because openssl
44 doesn't provide functions for it - only macros
46 _fields_ = [('notBefore',c_void_p),('notAfter',c_void_p)]
48 class _cinf(Structure):
49 """ ctypes representation of X509_CINF structure
50 needed to access certificate data, which are accessible only
53 _fields_ = [('version',c_void_p),
54 ('serialNumber',c_void_p),
55 ('sign_alg',c_void_p),
57 ('validity',POINTER(_validity)),
60 ('issuerUID',c_void_p),
61 ('subjectUID',c_void_p),
62 ('extensions',c_void_p),
65 class _x509(Structure):
67 ctypes representation of X509 structure needed
68 to access certificate data which are accessible only via
71 _fields_ = [('cert_info',POINTER(_cinf)),
73 ('signature',c_void_p),
74 # There are a lot of parsed extension fields there
76 _px509 = POINTER(_x509)
78 class X509Error(LibCryptoError):
80 Exception, generated when some openssl function fail
86 class X509Name(object):
88 Class which represents X.509 distinguished name - typically
89 a certificate subject name or an issuer name.
91 Now used only to represent information, extracted from the
92 certificate. Potentially can be also used to build DN when creating
93 certificate signing request
95 # XN_FLAG_SEP_COMMA_PLUS & ASN1_STRFLG_UTF8_CONVERT
98 def __init__(self,ptr=None,copy=False):
100 Creates a X509Name object
101 @param ptr - pointer to X509_NAME C structure (as returned by some OpenSSL functions
102 @param copy - indicates that this structure have to be freed upon object destruction
109 self.ptr=libcrypto.X509_NAME_new()
117 libcrypto.X509_NAME_free(self.ptr)
120 Produces an ascii representation of the name, escaping all symbols > 0x80
121 Probably it is not what you want, unless your native language is English
124 libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG | self.ESC_MSB)
126 def __unicode__(self):
128 Produces unicode representation of the name.
131 libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG)
135 return number of components in the name
137 return libcrypto.X509_NAME_entry_count(self.ptr)
138 def __cmp__(self,other):
142 return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)
143 def __eq__(self,other):
144 return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)==0
def __getitem__(self,key):
    """
    Looks up a component of the name.

    @param key - either an Oid or an integer index

    If key is an Oid, returns unicode value of the first entry which
    has this OID. If key is an integer, returns tuple (Oid, unicode
    value) for the entry at this position.

    Raises KeyError if there is no entry with given OID, IndexError
    if integer index is out of range and TypeError if key is neither
    Oid nor integer
    """
    if isinstance(key,Oid):
        # Return first matching field
        idx=libcrypto.X509_NAME_get_index_by_NID(self.ptr,key.nid,-1)
        if idx<0:
            # Report the OID which was asked for, not the Oid class
            raise KeyError("Key not found "+str(key))
        entry=libcrypto.X509_NAME_get_entry(self.ptr,idx)
        s=libcrypto.X509_NAME_ENTRY_get_data(entry)
        b=Membio()
        libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
        return unicode(b)
    elif isinstance(key,int):
        # Return OID, string tuple
        entry=libcrypto.X509_NAME_get_entry(self.ptr,key)
        if entry is None:
            raise IndexError("name entry index out of range")
        oid=Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry))
        s=libcrypto.X509_NAME_ENTRY_get_data(entry)
        b=Membio()
        libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
        return (oid,unicode(b))
    else:
        # Do not silently return None for unsupported key types
        raise TypeError("X509Name index should be Oid or integer")
168 def __setitem__(self,key,val):
169 if not self.writable:
170 raise ValueError("Attempt to modify constant X509 object")
172 raise NotImplementedError
174 class _x509_ext(Structure):
175 """ Represents C structure X509_EXTENSION """
176 _fields_=[("object",c_void_p),
180 class X509_EXT(object):
181 """ Python object which represents a certificate extension """
182 def __init__(self,ptr,copy=False):
183 """ Initializes from the pointer to X509_EXTENSION.
184 If copy is True, creates a copy, otherwise just
188 self.ptr=libcrypto.X509_EXTENSION_dup(ptr)
190 self.ptr=cast(ptr,POINTER(_x509_ext))
192 libcrypto.X509_EXTENSION_free(self.ptr)
195 libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0)
197 def __unicode__(self):
199 libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0)
203 return Oid.fromobj(self.ptr[0].object)
206 return self.ptr[0].critical >0
207 class _X509extlist(object):
209 Represents list of certificate extensions
211 def __init__(self,cert):
214 return libcrypto.X509_get_ext_count(self.cert.cert)
215 def __getitem__(self,item):
216 p=libcrypto.X509_get_ext(self.cert.cert,item)
219 return X509_EXT(p,True)
222 Return list of extensions with given Oid
224 if not isinstance(oid,Oid):
225 raise TypeError("Need crytypescrypto.oid.Oid as argument")
230 l=libcrypto.X509_get_ext_by_NID(self.cert.cert,oid.nid,l)
233 found.append(self[l])
235 def find_critical(self,crit=True):
237 Return list of critical extensions (or list of non-critical, if
238 optional second argument is False
248 l=libcrypto.X509_get_ext_by_critical(self.cert.cert,flag,l)
251 found.append(self[l])
256 Represents X.509 certificate.
258 def __init__(self,data=None,ptr=None,format="PEM"):
260 Initializes certificate
261 @param data - serialized certificate in PEM or DER format.
262 @param ptr - pointer to X509, returned by some openssl function.
263 mutually exclusive with data
264 @param format - specifies data format. "PEM" or "DER", default PEM
268 raise TypeError("Cannot use data and ptr simultaneously")
271 raise TypeError("data argument is required")
275 self.cert=libcrypto.PEM_read_bio_X509(b.bio,None,None,None)
277 self.cert=libcrypto.d2i_X509_bio(b.bio,None)
278 if self.cert is None:
279 raise X509Error("error reading certificate")
280 self.extensions=_X509extlist(self)
283 Frees certificate object
285 libcrypto.X509_free(self.cert)
287 """ Returns der string of the certificate """
289 if libcrypto.i2d_X509_bio(b.bio,self.cert)==0:
290 raise X509Error("error serializing certificate")
293 """ Returns valid call to the constructor """
294 return "X509(data="+repr(str(self))+",format='DER')"
297 """EVP PKEy object of certificate public key"""
298 return PKey(ptr=libcrypto.X509_get_pubkey(self.cert,False))
def verify(self,store=None,chain=None,key=None):
    """
    Verify self. Supports verification on both X509 store object
    or just public issuer key
    @param store X509Store object.
    @param chain - list of X509 objects to add into verification
        context. These objects are untrusted, but can be used to
        build certificate chain up to trusted object in the store
    @param key - PKey object with open key to validate signature

    parameters store and key are mutually exclusive. If neither
    is specified, attempts to verify self as self-signed certificate

    Returns True if verification succeeded and False otherwise.
    Raises X509Error if both store and key are specified, if
    verification context cannot be created or initialized, or if
    X509_verify reports an error
    """
    if store is not None and key is not None:
        raise X509Error("key and store cannot be specified simultaneously")
    if store is not None:
        ctx=libcrypto.X509_STORE_CTX_new()
        # Covers both None and 0 as representation of NULL pointer
        if not ctx:
            raise X509Error("Error allocating X509_STORE_CTX")
        try:
            # None and empty list both mean "no untrusted certificates"
            if chain is not None and len(chain)>0:
                ch=StackOfX509(chain)
            else:
                ch=None
            # X509_STORE_CTX_init returns 1 on success and 0 on failure,
            # so failure is never negative
            if libcrypto.X509_STORE_CTX_init(ctx,store.store,self.cert,ch) <= 0:
                raise X509Error("Error allocating X509_STORE_CTX")
            res=libcrypto.X509_verify_cert(ctx)
        finally:
            # Context has to be released on error paths as well
            libcrypto.X509_STORE_CTX_free(ctx)
        return res>0
    if key is None:
        if self.issuer != self.subject:
            # Not a self-signed certificate
            return False
        key=self.pubkey
    res=libcrypto.X509_verify(self.cert,key.key)
    if res<0:
        raise X509Error("X509_verify failed")
    return res>0
340 """ X509Name for certificate subject name """
341 return X509Name(libcrypto.X509_get_subject_name(self.cert))
344 """ X509Name for certificate issuer name """
345 return X509Name(libcrypto.X509_get_issuer_name(self.cert))
348 """ Serial number of certificate as integer """
349 asnint=libcrypto.X509_get_serialNumber(self.cert)
351 libcrypto.i2a_ASN1_INTEGER(b.bio,asnint)
352 return int(str(b),16)
355 """ certificate version as integer. Really certificate stores 0 for
356 version 1 and 2 for version 3, but we return 1 and 3 """
357 asn1int=cast(self.cert,_px509)[0].cert_info[0].version
358 return libcrypto.ASN1_INTEGER_get(asn1int)+1
361 """ Certificate validity period start date """
362 # Need deep poke into certificate structure (x)->cert_info->validity->notBefore
364 asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notBefore
366 libcrypto.ASN1_TIME_print(b.bio,asn1date)
367 return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
370 """ Certificate validity period end date """
371 # Need deep poke into certificate structure (x)->cert_info->validity->notAfter
373 asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notAfter
375 libcrypto.ASN1_TIME_print(b.bio,asn1date)
376 return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
378 """ Returns True if certificate is CA certificate """
379 return libcrypto.X509_check_ca(self.cert)>0
380 class X509Store(object):
382 Represents trusted certificate store. Can be used to lookup CA
383 certificates to verify
385 @param file - file with several certificates and crls
387 @param dir - hashed directory with certificates and crls
388 @param default - if true, default verify location (directory)
392 def __init__(self,file=None,dir=None,default=False):
394 Creates X509 store and installs lookup method. Optionally initializes
395 by certificates from given file or directory.
398 # Todo - set verification flags
400 self.store=libcrypto.X509_STORE_new()
401 if self.store is None:
402 raise X509Error("allocating store")
403 lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_file())
405 raise X509Error("error installing file lookup method")
406 if (file is not None):
407 if not libcrypto.X509_LOOKUP_ctrl(lookup,1,file,1,None)>0:
408 raise X509Error("error loading trusted certs from file "+file)
409 lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_hash_dir())
411 raise X509Error("error installing hashed lookup method")
413 if not libcrypto.X509_LOOKUP_ctrl(lookup,2,dir,1,None)>0:
414 raise X509Error("error adding hashed trusted certs dir "+dir)
416 if not libcrypto.X509_LOOKUP_ctrl(lookup,2,None,3,None)>0:
417 raise X509Error("error adding default trusted certs dir ")
def add_cert(self,cert):
    """
    Explicitly adds certificate to the set of trusted certificates
    in the store
    @param cert - X509 object to add

    Raises TypeError if cert is not an X509 object
    """
    if isinstance(cert,X509):
        libcrypto.X509_STORE_add_cert(self.store,cert.cert)
        return
    raise TypeError("cert should be X509")
426 def add_callback(self,callback):
428 Installs callback function, which would receive detailed information
429 about verified certificates
431 raise NotImplementedError
432 def setflags(self,flags):
434 Set certificate verification flags.
435 @param flags - integer bit mask. See OpenSSL X509_V_FLAG_* constants
437 libcrypto.X509_STORE_set_flags(self.store,flags)
def setpurpose(self,purpose):
    """
    Sets certificate purpose which verified certificate should match
    @param purpose - number from 1 to 9 or standard string defined in OpenSSL
        possible strings - sslclient, sslserver, nssslserver, smimesign,
        smimeencrypt, crlsign, any, ocsphelper

    Raises X509Error if purpose name is not recognized or purpose
    cannot be set, TypeError if purpose is neither string nor integer
    """
    if isinstance(purpose,str):
        # Lookup function lives in libcrypto; as a bare name it
        # raised NameError
        purp_no=libcrypto.X509_PURPOSE_get_by_sname(purpose)
        # NOTE(review): OpenSSL describes the result as an index with
        # -1 meaning "not found" - confirm that the <=0 test and use of
        # the result as purpose id are intended
        if purp_no<=0:
            raise X509Error("Invalid certificate purpose '"+purpose+"'")
    elif isinstance(purpose,int):
        purp_no=purpose
    else:
        # Without this branch purp_no would be unbound below
        raise TypeError("purpose should be string or integer")
    if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0:
        raise X509Error("cannot set purpose")
452 def setdepth(self,depth):
454 Sets the verification depth i.e. max length of certificate chain
457 libcrypto.X509_STORE_set_depth(self.store,depth)
def settime(self, time):
    """
    Set point in time used to check validity of certificates for
    Time can be either python datetime object or number of seconds
    since epoch

    @param time - datetime.date, datetime.datetime or integer

    Raises TypeError for argument of any other type. Passing the time
    to the store is not implemented yet, so NotImplementedError is
    raised once the argument has been validated
    """
    # Module level name "datetime" is the class (from datetime import
    # datetime), so datetime.datetime and datetime.date cannot be
    # reached through it. The parameter shadows the time module, hence
    # the second alias.
    import datetime as _datetime
    import time as _time
    if isinstance(time,_datetime.date):
        # datetime.datetime is a subclass of datetime.date. mktime treats
        # the value as local time and, unlike strftime("%s"), is
        # available on every platform
        d=int(_time.mktime(time.timetuple()))
    elif isinstance(time,int):
        d=time
    else:
        raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
    # d holds seconds since epoch; nothing consumes it yet
    raise NotImplementedError
471 class StackOfX509(object):
473 Implements OpenSSL STACK_OF(X509) object.
474 It looks much like python container types
476 def __init__(self,certs=None,ptr=None,disposable=True):
479 @param certs - list of X509 objects. If specified, read-write
480 stack is created and populated by these certificates
481 @param ptr - pointer to OpenSSL STACK_OF(X509) as returned by
483 @param disposable - if True, stack created from object, returned
484 by function is copy, and can be modified and need to be
485 freeid. If false, it is just pointer into another
486 structure i.e. CMS_ContentInfo
489 self.need_free = True
490 self.ptr=libcrypt.sk_new_null()
491 if certs is not None:
494 elif not certs is None:
495 raise ValueError("cannot handle certs an ptr simultaneously")
497 self.need_free = disposable
500 return libcrypto.sk_num(self.ptr)
501 def __getitem__(self,index):
502 if index <0 or index>=len(self):
504 p=libcrypto.sk_value(self.ptr,index)
505 return X509(ptr=libcrypto.X509_dup(p))
506 def __putitem__(self,index,value):
507 if not self.need_free:
508 raise ValueError("Stack is read-only")
509 if index <0 or index>=len(self):
511 p=libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert))
512 libcrypto.X509_free(p)
513 def __delitem__(self,index):
514 if not self.need_free:
515 raise ValueError("Stack is read-only")
516 if index <0 or index>=len(self):
518 p=libcrypto.sk_delete(self.ptr,index)
519 libcrypto.X509_free(p)
522 libcrypto.sk_pop_free(self.ptr,libcrypto.X509_free)
def append(self,value):
    """
    Pushes copy of the certificate to the end of the stack
    @param value - X509 object; its cert pointer is duplicated

    Raises ValueError if stack is read-only
    """
    if self.need_free:
        duplicate=libcrypto.X509_dup(value.cert)
        libcrypto.sk_push(self.ptr,duplicate)
    else:
        raise ValueError("Stack is read-only")
# Prototypes of libcrypto functions used in this module. ctypes treats
# result of a function as C int unless restype is set, so functions
# which return pointers are declared explicitly.

# ASN.1 primitives
libcrypto.i2a_ASN1_INTEGER.argtypes=(c_void_p,c_void_p)
libcrypto.ASN1_STRING_print_ex.argtypes=(c_void_p,c_void_p,c_long)
libcrypto.ASN1_TIME_print.argtypes=(c_void_p,c_void_p)
libcrypto.ASN1_INTEGER_get.argtypes=(c_void_p,)
libcrypto.ASN1_INTEGER_get.restype=c_long
libcrypto.OBJ_obj2nid.argtypes=(c_void_p,)

# Certificate fields
libcrypto.X509_get_serialNumber.argtypes=(c_void_p,)
libcrypto.X509_get_serialNumber.restype=c_void_p

# Distinguished name entries
libcrypto.X509_NAME_get_entry.restype=c_void_p
libcrypto.X509_NAME_get_entry.argtypes=(c_void_p,c_int)
libcrypto.X509_NAME_ENTRY_get_object.restype=c_void_p
libcrypto.X509_NAME_ENTRY_get_object.argtypes=(c_void_p,)
# Result of this function is passed to ASN1_STRING_print_ex as a
# pointer, so it must not be truncated to int
libcrypto.X509_NAME_ENTRY_get_data.restype=c_void_p
libcrypto.X509_NAME_ENTRY_get_data.argtypes=(c_void_p,)

# Certificate store and lookup methods
libcrypto.X509_STORE_new.restype=c_void_p
libcrypto.X509_STORE_add_lookup.restype=c_void_p
libcrypto.X509_STORE_add_lookup.argtypes=(c_void_p,c_void_p)
libcrypto.X509_LOOKUP_file.restype=c_void_p
libcrypto.X509_LOOKUP_hash_dir.restype=c_void_p
libcrypto.X509_LOOKUP_ctrl.restype=c_int
libcrypto.X509_LOOKUP_ctrl.argtypes=(c_void_p,c_int,c_char_p,c_long,POINTER(c_char_p))

# Extensions
libcrypto.X509_EXTENSION_dup.argtypes=(c_void_p,)
libcrypto.X509_EXTENSION_dup.restype=POINTER(_x509_ext)
libcrypto.X509_get_ext.restype=c_void_p
libcrypto.X509_get_ext.argtypes=(c_void_p,c_int)
libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int)