2 Implements interface to openssl X509 and X509Store structures,
3 I.e allows to load, analyze and verify certificates.
5 X509Store objects are also used to verify other signed documets,
6 such as CMS, OCSP and timestamps.
11 from ctypes import c_void_p,create_string_buffer,c_long,c_int,POINTER,c_char_p,Structure,cast
12 from ctypescrypto.bio import Membio
13 from ctypescrypto.pkey import PKey
14 from ctypescrypto.oid import Oid
15 from ctypescrypto.exception import LibCryptoError
16 from ctypescrypto import libcrypto
17 from datetime import datetime
21 from datetime import timedelta,tzinfo
24 """tzinfo object for UTC.
25 If no pytz is available, we would use it.
28 def utcoffset(self, dt):
39 __all__ = ['X509Error','X509Name','X509Store','StackOfX509']
41 class _validity(Structure):
42 """ ctypes representation of X509_VAL structure
43 needed to access certificate validity period, because openssl
44 doesn't provide fuctions for it - only macros
46 _fields_ = [('notBefore',c_void_p),('notAfter',c_void_p)]
48 class _cinf(Structure):
49 """ ctypes representtion of X509_CINF structure
50 neede to access certificate data, which are accessable only
53 _fields_ = [('version',c_void_p),
54 ('serialNumber',c_void_p),
55 ('sign_alg',c_void_p),
57 ('validity',POINTER(_validity)),
60 ('issuerUID',c_void_p),
61 ('subjectUID',c_void_p),
62 ('extensions',c_void_p),
65 class _x509(Structure):
67 ctypes represntation of X509 structure needed
68 to access certificate data which are accesable only via
71 _fields_ = [('cert_info',POINTER(_cinf)),
73 ('signature',c_void_p),
74 # There are a lot of parsed extension fields there
76 _px509 = POINTER(_x509)
78 class X509Error(LibCryptoError):
80 Exception, generated when some openssl function fail
86 class X509Name(object):
88 Class which represents X.509 distinguished name - typically
89 a certificate subject name or an issuer name.
91 Now used only to represent information, extracted from the
92 certificate. Potentially can be also used to build DN when creating
93 certificate signing request
95 # XN_FLAG_SEP_COMMA_PLUS & ASN1_STRFLG_UTF8_CONVERT
98 def __init__(self,ptr=None,copy=False):
100 Creates a X509Name object
101 @param ptr - pointer to X509_NAME C structure (as returned by some OpenSSL functions
102 @param copy - indicates that this structure have to be freed upon object destruction
109 self.ptr=libcrypto.X509_NAME_new()
117 libcrypto.X509_NAME_free(self.ptr)
120 Produces an ascii representation of the name, escaping all symbols > 0x80
121 Probably it is not what you want, unless your native language is English
124 libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG | self.ESC_MSB)
126 def __unicode__(self):
128 Produces unicode representation of the name.
131 libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG)
135 return number of components in the name
137 return libcrypto.X509_NAME_entry_count(self.ptr)
138 def __cmp__(self,other):
142 return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)
143 def __eq__(self,other):
144 return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)==0
146 def __getitem__(self,key):
147 if isinstance(key,Oid):
148 # Return first matching field
149 idx=libcrypto.X509_NAME_get_index_by_NID(self.ptr,key.nid,-1)
151 raise KeyError("Key not found "+str(Oid))
152 entry=libcrypto.X509_NAME_get_entry(self.ptr,idx)
153 s=libcrypto.X509_NAME_ENTRY_get_data(entry)
155 libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
157 elif isinstance(key,(int,long)):
158 # Return OID, string tuple
159 entry=libcrypto.X509_NAME_get_entry(self.ptr,key)
161 raise IndexError("name entry index out of range")
162 oid=Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry))
163 s=libcrypto.X509_NAME_ENTRY_get_data(entry)
165 libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
166 return (oid,unicode(b))
168 raise TypeError("X509 NAME can be indexed by Oids or integers only")
170 def __setitem__(self,key,val):
171 if not self.writable:
172 raise ValueError("Attempt to modify constant X509 object")
174 raise NotImplementedError
175 def __delitem__(self,key):
176 if not self.writable:
177 raise ValueError("Attempt to modify constant X509 object")
179 raise NotImplementedError
181 class _x509_ext(Structure):
182 """ Represens C structure X509_EXTENSION """
183 _fields_=[("object",c_void_p),
187 class X509_EXT(object):
188 """ Python object which represents a certificate extension """
189 def __init__(self,ptr,copy=False):
190 """ Initializes from the pointer to X509_EXTENSION.
191 If copy is True, creates a copy, otherwise just
195 self.ptr=libcrypto.X509_EXTENSION_dup(ptr)
197 self.ptr=cast(ptr,POINTER(_x509_ext))
199 libcrypto.X509_EXTENSION_free(self.ptr)
202 libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0)
204 def __unicode__(self):
206 libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0)
210 return Oid.fromobj(self.ptr[0].object)
213 return self.ptr[0].critical >0
214 class _X509extlist(object):
216 Represents list of certificate extensions
218 def __init__(self,cert):
221 return libcrypto.X509_get_ext_count(self.cert.cert)
222 def __getitem__(self,item):
223 p=libcrypto.X509_get_ext(self.cert.cert,item)
226 return X509_EXT(p,True)
229 Return list of extensions with given Oid
231 if not isinstance(oid,Oid):
232 raise TypeError("Need crytypescrypto.oid.Oid as argument")
237 l=libcrypto.X509_get_ext_by_NID(self.cert.cert,oid.nid,l)
240 found.append(self[l])
242 def find_critical(self,crit=True):
244 Return list of critical extensions (or list of non-cricital, if
245 optional second argument is False
255 l=libcrypto.X509_get_ext_by_critical(self.cert.cert,flag,l)
258 found.append(self[l])
263 Represents X.509 certificate.
265 def __init__(self,data=None,ptr=None,format="PEM"):
267 Initializes certificate
268 @param data - serialized certificate in PEM or DER format.
269 @param ptr - pointer to X509, returned by some openssl function.
270 mutually exclusive with data
271 @param format - specifies data format. "PEM" or "DER", default PEM
275 raise TypeError("Cannot use data and ptr simultaneously")
278 raise TypeError("data argument is required")
282 self.cert=libcrypto.PEM_read_bio_X509(b.bio,None,None,None)
284 self.cert=libcrypto.d2i_X509_bio(b.bio,None)
285 if self.cert is None:
286 raise X509Error("error reading certificate")
287 self.extensions=_X509extlist(self)
290 Frees certificate object
292 libcrypto.X509_free(self.cert)
294 """ Returns der string of the certificate """
296 if libcrypto.i2d_X509_bio(b.bio,self.cert)==0:
297 raise X509Error("error serializing certificate")
300 """ Returns valid call to the constructor """
301 return "X509(data="+repr(str(self))+",format='DER')"
304 """EVP PKEy object of certificate public key"""
305 return PKey(ptr=libcrypto.X509_get_pubkey(self.cert,False))
306 def verify(self,store=None,chain=[],key=None):
308 Verify self. Supports verification on both X509 store object
309 or just public issuer key
310 @param store X509Store object.
311 @param chain - list of X509 objects to add into verification
312 context.These objects are untrusted, but can be used to
313 build certificate chain up to trusted object in the store
314 @param key - PKey object with open key to validate signature
316 parameters store and key are mutually exclusive. If neither
317 is specified, attempts to verify self as self-signed certificate
319 if store is not None and key is not None:
320 raise X509Error("key and store cannot be specified simultaneously")
321 if store is not None:
322 ctx=libcrypto.X509_STORE_CTX_new()
324 raise X509Error("Error allocating X509_STORE_CTX")
325 if chain is not None and len(chain)>0:
326 ch=StackOfX509(chain)
329 if libcrypto.X509_STORE_CTX_init(ctx,store.store,self.cert,ch) < 0:
330 raise X509Error("Error allocating X509_STORE_CTX")
331 res= libcrypto.X509_verify_cert(ctx)
332 libcrypto.X509_STORE_CTX_free(ctx)
336 if self.issuer != self.subject:
337 # Not a self-signed certificate
340 res = libcrypto.X509_verify(self.cert,key.key)
342 raise X509Error("X509_verify failed")
347 """ X509Name for certificate subject name """
348 return X509Name(libcrypto.X509_get_subject_name(self.cert))
351 """ X509Name for certificate issuer name """
352 return X509Name(libcrypto.X509_get_issuer_name(self.cert))
355 """ Serial number of certificate as integer """
356 asnint=libcrypto.X509_get_serialNumber(self.cert)
358 libcrypto.i2a_ASN1_INTEGER(b.bio,asnint)
359 return int(str(b),16)
362 """ certificate version as integer. Really certificate stores 0 for
363 version 1 and 2 for version 3, but we return 1 and 3 """
364 asn1int=cast(self.cert,_px509)[0].cert_info[0].version
365 return libcrypto.ASN1_INTEGER_get(asn1int)+1
368 """ Certificate validity period start date """
369 # Need deep poke into certificate structure (x)->cert_info->validity->notBefore
371 asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notBefore
373 libcrypto.ASN1_TIME_print(b.bio,asn1date)
374 return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
377 """ Certificate validity period end date """
378 # Need deep poke into certificate structure (x)->cert_info->validity->notAfter
380 asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notAfter
382 libcrypto.ASN1_TIME_print(b.bio,asn1date)
383 return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
385 """ Returns True if certificate is CA certificate """
386 return libcrypto.X509_check_ca(self.cert)>0
387 class X509Store(object):
389 Represents trusted certificate store. Can be used to lookup CA
390 certificates to verify
392 @param file - file with several certificates and crls
394 @param dir - hashed directory with certificates and crls
395 @param default - if true, default verify location (directory)
399 def __init__(self,file=None,dir=None,default=False):
401 Creates X509 store and installs lookup method. Optionally initializes
402 by certificates from given file or directory.
405 # Todo - set verification flags
407 self.store=libcrypto.X509_STORE_new()
408 if self.store is None:
409 raise X509Error("allocating store")
410 lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_file())
412 raise X509Error("error installing file lookup method")
413 if (file is not None):
414 if not libcrypto.X509_LOOKUP_ctrl(lookup,1,file,1,None)>0:
415 raise X509Error("error loading trusted certs from file "+file)
416 lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_hash_dir())
418 raise X509Error("error installing hashed lookup method")
420 if not libcrypto.X509_LOOKUP_ctrl(lookup,2,dir,1,None)>0:
421 raise X509Error("error adding hashed trusted certs dir "+dir)
423 if not libcrypto.X509_LOOKUP_ctrl(lookup,2,None,3,None)>0:
424 raise X509Error("error adding default trusted certs dir ")
425 def add_cert(self,cert):
427 Explicitely adds certificate to set of trusted in the store
428 @param cert - X509 object to add
430 if not isinstance(cert,X509):
431 raise TypeError("cert should be X509")
432 libcrypto.X509_STORE_add_cert(self.store,cert.cert)
433 def add_callback(self,callback):
435 Installs callback function, which would receive detailed information
436 about verified ceritificates
438 raise NotImplementedError
439 def setflags(self,flags):
441 Set certificate verification flags.
442 @param flags - integer bit mask. See OpenSSL X509_V_FLAG_* constants
444 libcrypto.X509_STORE_set_flags(self.store,flags)
445 def setpurpose(self,purpose):
447 Sets certificate purpose which verified certificate should match
448 @param purpose - number from 1 to 9 or standard strind defined in Openssl
449 possible strings - sslcient,sslserver, nssslserver, smimesign,smimeencrypt, crlsign, any,ocsphelper
451 if isinstance(purpose,str):
452 purp_no=X509_PURPOSE_get_by_sname(purpose)
454 raise X509Error("Invalid certificate purpose '"+purpose+"'")
455 elif isinstance(purpose,int):
457 if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0:
458 raise X509Error("cannot set purpose")
459 def setdepth(self,depth):
461 Sets the verification depth i.e. max length of certificate chain
464 libcrypto.X509_STORE_set_depth(self.store,depth)
465 def settime(self, time):
467 Set point in time used to check validity of certificates for
468 Time can be either python datetime object or number of seconds
471 if isinstance(time,datetime.datetime) or isinstance(time,datetime.date):
472 d=int(time.strftime("%s"))
473 elif isinstance(time,int):
476 raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
477 raise NotImplementedError
478 class StackOfX509(object):
480 Implements OpenSSL STACK_OF(X509) object.
481 It looks much like python container types
483 def __init__(self,certs=None,ptr=None,disposable=True):
486 @param certs - list of X509 objects. If specified, read-write
487 stack is created and populated by these certificates
488 @param ptr - pointer to OpenSSL STACK_OF(X509) as returned by
490 @param disposable - if True, stack created from object, returned
491 by function is copy, and can be modified and need to be
492 freeid. If false, it is just pointer into another
493 structure i.e. CMS_ContentInfo
496 self.need_free = True
497 self.ptr=libcrypto.sk_new_null()
498 if certs is not None:
501 elif certs is not None:
502 raise ValueError("cannot handle certs an ptr simultaneously")
504 self.need_free = disposable
507 return libcrypto.sk_num(self.ptr)
508 def __getitem__(self,index):
509 if index <0 or index>=len(self):
511 p=libcrypto.sk_value(self.ptr,index)
512 return X509(ptr=libcrypto.X509_dup(p))
513 def __setitem__(self,index,value):
514 if not self.need_free:
515 raise ValueError("Stack is read-only")
516 if index <0 or index>=len(self):
518 if not isinstance(value,X509):
519 raise TypeError('StackOfX508 can contain only X509 objects')
520 p=libcrypto.sk_value(self.ptr,index)
521 libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert))
522 libcrypto.X509_free(p)
523 def __delitem__(self,index):
524 if not self.need_free:
525 raise ValueError("Stack is read-only")
526 if index <0 or index>=len(self):
528 p=libcrypto.sk_delete(self.ptr,index)
529 libcrypto.X509_free(p)
532 libcrypto.sk_pop_free(self.ptr,libcrypto.X509_free)
533 def append(self,value):
534 if not self.need_free:
535 raise ValueError("Stack is read-only")
536 if not isinstance(value,X509):
537 raise TypeError('StackOfX508 can contain only X509 objects')
538 libcrypto.sk_push(self.ptr,libcrypto.X509_dup(value.cert))
539 libcrypto.i2a_ASN1_INTEGER.argtypes=(c_void_p,c_void_p)
540 libcrypto.ASN1_STRING_print_ex.argtypes=(c_void_p,c_void_p,c_long)
541 libcrypto.ASN1_TIME_print.argtypes=(c_void_p,c_void_p)
542 libcrypto.ASN1_INTEGER_get.argtypes=(c_void_p,)
543 libcrypto.ASN1_INTEGER_get.restype=c_long
544 libcrypto.X509_get_serialNumber.argtypes=(c_void_p,)
545 libcrypto.X509_get_serialNumber.restype=c_void_p
546 libcrypto.X509_NAME_ENTRY_get_object.restype=c_void_p
547 libcrypto.X509_NAME_ENTRY_get_object.argtypes=(c_void_p,)
548 libcrypto.OBJ_obj2nid.argtypes=(c_void_p,)
549 libcrypto.X509_NAME_get_entry.restype=c_void_p
550 libcrypto.X509_NAME_get_entry.argtypes=(c_void_p,c_int)
551 libcrypto.X509_STORE_new.restype=c_void_p
552 libcrypto.X509_STORE_add_lookup.restype=c_void_p
553 libcrypto.X509_STORE_add_lookup.argtypes=(c_void_p,c_void_p)
554 libcrypto.X509_LOOKUP_file.restype=c_void_p
555 libcrypto.X509_LOOKUP_hash_dir.restype=c_void_p
556 libcrypto.X509_LOOKUP_ctrl.restype=c_int
557 libcrypto.X509_LOOKUP_ctrl.argtypes=(c_void_p,c_int,c_char_p,c_long,POINTER(c_char_p))
558 libcrypto.X509_EXTENSION_dup.argtypes=(c_void_p,)
559 libcrypto.X509_EXTENSION_dup.restype=POINTER(_x509_ext)
560 libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int)
561 libcrypto.X509_get_ext.restype=c_void_p
562 libcrypto.X509_get_ext.argtypes=(c_void_p,c_int)
563 libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int)
564 libcrypto.sk_set.argtypes=(c_void_p,c_int,c_void_p)
565 libcrypto.sk_set.restype=c_void_p
566 libcrypto.sk_value.argtypes=(c_void_p,c_int)
567 libcrypto.sk_value.restype=c_void_p
568 libcrypto.X509_dup.restype=c_void_p
569 libcrypto.sk_new_null.restype=c_void_p
570 libcrypto.X509_dup.argtypes=(c_void_p,)