2 Implements interface to openssl X509 and X509Store structures,
3 I.e allows to load, analyze and verify certificates.
5 X509Store objects are also used to verify other signed documets,
6 such as CMS, OCSP and timestamps.
11 from ctypes import c_void_p, c_long, c_ulong, c_int, POINTER, c_char_p, Structure, cast
12 from ctypescrypto.bio import Membio
13 from ctypescrypto.pkey import PKey
14 from ctypescrypto.oid import Oid
15 from ctypescrypto.exception import LibCryptoError
16 from ctypescrypto import libcrypto
17 from datetime import datetime
22 from datetime import timedelta, tzinfo
25 """tzinfo object for UTC.
26 If no pytz is available, we would use it.
28 def utcoffset(self, dt):
39 __all__ = ['X509', 'X509Error', 'X509Name', 'X509Store', 'StackOfX509']
41 if hasattr(libcrypto,"X509_get_version"):
43 # If it is OpenSSL 1.1 or above, use accessor functions
44 _X509_get_version = libcrypto.X509_get_version
45 _X509_get_version.restype = c_long
46 _X509_get_version.argtypes = (c_void_p,)
48 _X509_get_notBefore=libcrypto.X509_getm_notBefore
49 _X509_get_notBefore.restype = c_void_p
50 _X509_get_notBefore.argtypes = (c_void_p,)
52 _X509_get_notAfter=libcrypto.X509_getm_notAfter
53 _X509_get_notAfter.restype = c_void_p
54 _X509_get_notAfter.argtypes = (c_void_p,)
56 # Otherwise declare X509 structure internals and define deep poke
58 class _validity(Structure):
59 """ ctypes representation of X509_VAL structure
60 needed to access certificate validity period, because openssl
61 doesn't provide fuctions for it - only macros
63 _fields_ = [('notBefore', c_void_p), ('notAfter', c_void_p)]
65 class _cinf(Structure):
66 """ ctypes representtion of X509_CINF structure
67 neede to access certificate data, which are accessable only
70 _fields_ = [('version', c_void_p),
71 ('serialNumber', c_void_p),
72 ('sign_alg', c_void_p),
74 ('validity', POINTER(_validity)),
75 ('subject', c_void_p),
77 ('issuerUID', c_void_p),
78 ('subjectUID', c_void_p),
79 ('extensions', c_void_p),
82 class _x509(Structure):
84 ctypes represntation of X509 structure needed
85 to access certificate data which are accesable only via
88 _fields_ = [('cert_info', POINTER(_cinf)),
89 ('sig_alg', c_void_p),
90 ('signature', c_void_p),
91 # There are a lot of parsed extension fields there
93 _px509 = POINTER(_x509)
94 def _X509_get_version(ptr):
95 asn1int = cast(ptr, _px509)[0].cert_info[0].version
96 return libcrypto.ASN1_INTEGER_get(asn1int)
98 def _X509_get_notBefore(ptr):
99 # (x)->cert_info->validity->notBefore
100 return cast(ptr, _px509)[0].cert_info[0].validity[0].notBefore
101 def _X509_get_notAfter(ptr):
102 return cast(ptr, _px509)[0].cert_info[0].validity[0].notAfter
104 if hasattr(libcrypto,'sk_num'):
105 sk_num = libcrypto.sk_num
106 sk_set = libcrypto.sk_set
107 sk_value = libcrypto.sk_value
108 sk_delete = libcrypto.sk_delete
109 sk_new_null = libcrypto.sk_new_null
110 sk_pop_free = libcrypto.sk_pop_free
111 sk_push = libcrypto.sk_push
113 sk_num = libcrypto.OPENSSL_sk_num
114 sk_set = libcrypto.OPENSSL_sk_set
115 sk_value = libcrypto.OPENSSL_sk_value
116 sk_delete = libcrypto.OPENSSL_sk_delete
117 sk_new_null = libcrypto.OPENSSL_sk_new_null
118 sk_pop_free = libcrypto.OPENSSL_sk_pop_free
119 sk_push = libcrypto.OPENSSL_sk_push
120 class X509Error(LibCryptoError):
122 Exception, generated when some openssl function fail
123 during X509 operation
128 class X509Name(object):
130 Class which represents X.509 distinguished name - typically
131 a certificate subject name or an issuer name.
133 Now used only to represent information, extracted from the
134 certificate. Potentially can be also used to build DN when creating
135 certificate signing request
137 # XN_FLAG_SEP_COMMA_PLUS & ASN1_STRFLG_UTF8_CONVERT
140 def __init__(self, ptr=None, copy=False):
142 Creates a X509Name object
143 @param ptr - pointer to X509_NAME C structure (as returned by some
145 @param copy - indicates that this structure have to be freed upon
150 self.need_free = copy
151 self.writable = False
153 self.ptr = libcrypto.X509_NAME_new()
154 self.need_free = True
162 libcrypto.X509_NAME_free(self.ptr)
165 Produces an ascii representation of the name, escaping all
166 symbols > 0x80. Probably it is not what you want, unless
167 your native language is English
170 libcrypto.X509_NAME_print_ex(bio.bio, self.ptr, 0,
171 self.PRINT_FLAG | self.ESC_MSB)
174 def __unicode__(self):
176 Produces unicode representation of the name.
179 libcrypto.X509_NAME_print_ex(bio.bio, self.ptr, 0, self.PRINT_FLAG)
183 return number of components in the name
185 return libcrypto.X509_NAME_entry_count(self.ptr)
186 def __cmp__(self, other):
190 return libcrypto.X509_NAME_cmp(self.ptr, other.ptr)
191 def __eq__(self, other):
192 return libcrypto.X509_NAME_cmp(self.ptr, other.ptr) == 0
194 def __getitem__(self, key):
195 if isinstance(key, Oid):
196 # Return first matching field
197 idx = libcrypto.X509_NAME_get_index_by_NID(self.ptr, key.nid, -1)
199 raise KeyError("Key not found " + str(Oid))
200 entry = libcrypto.X509_NAME_get_entry(self.ptr, idx)
201 value = libcrypto.X509_NAME_ENTRY_get_data(entry)
203 libcrypto.ASN1_STRING_print_ex(bio.bio, value, self.PRINT_FLAG)
205 elif isinstance(key, (int, long)):
206 # Return OID, string tuple
207 entry = libcrypto.X509_NAME_get_entry(self.ptr, key)
209 raise IndexError("name entry index out of range")
210 oid = Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry))
211 value = libcrypto.X509_NAME_ENTRY_get_data(entry)
213 libcrypto.ASN1_STRING_print_ex(bio.bio, value, self.PRINT_FLAG)
214 return (oid, unicode(bio))
216 raise TypeError("X509 NAME can be indexed by Oids or integers only")
218 def __setitem__(self, key, val):
219 if not self.writable:
220 raise ValueError("Attempt to modify constant X509 object")
222 raise NotImplementedError
223 def __delitem__(self, key):
224 if not self.writable:
225 raise ValueError("Attempt to modify constant X509 object")
227 raise NotImplementedError
229 return libcrypto.X509_NAME_hash(self.ptr)
231 class _x509_ext(Structure):
232 """ Represens C structure X509_EXTENSION """
233 _fields_ = [("object", c_void_p),
238 class X509_EXT(object):
239 """ Python object which represents a certificate extension """
240 def __init__(self, ptr, copy=False):
241 """ Initializes from the pointer to X509_EXTENSION.
242 If copy is True, creates a copy, otherwise just
246 self.ptr = libcrypto.X509_EXTENSION_dup(ptr)
248 self.ptr = cast(ptr, POINTER(_x509_ext))
250 libcrypto.X509_EXTENSION_free(self.ptr)
253 libcrypto.X509V3_EXT_print(bio.bio, self.ptr, 0x20010, 0)
255 def __unicode__(self):
257 libcrypto.X509V3_EXT_print(bio.bio, self.ptr, 0x20010, 0)
261 "Returns OID of the extension"
262 return Oid.fromobj(self.ptr[0].object)
265 "Returns True if extensin have critical flag set"
266 return self.ptr[0].critical > 0
268 class _X509extlist(object):
270 Represents list of certificate extensions. Really it keeps
271 reference to certificate object
273 def __init__(self, cert):
275 Initialize from X509 object
281 Returns number of extensions
283 return libcrypto.X509_get_ext_count(self.cert.cert)
285 def __getitem__(self, item):
287 Returns extension by index, creating a copy
289 ext_ptr = libcrypto.X509_get_ext(self.cert.cert, item)
292 return X509_EXT(ext_ptr, True)
295 Return list of extensions with given Oid
297 if not isinstance(oid, Oid):
298 raise TypeError("Need crytypescrypto.oid.Oid as argument")
303 index = libcrypto.X509_get_ext_by_NID(self.cert.cert, oid.nid,
305 if index >= end or index < 0:
307 found.append(self[index])
310 def find_critical(self, crit=True):
312 Return list of critical extensions (or list of non-cricital, if
313 optional second argument is False
323 index = libcrypto.X509_get_ext_by_critical(self.cert.cert, flag,
325 if index >= end or index < 0:
327 found.append(self[index])
330 def _X509__asn1date_to_datetime(asn1date):
332 Converts openssl ASN1_TIME object to python datetime.datetime
335 libcrypto.ASN1_TIME_print(bio.bio, asn1date)
336 pydate = datetime.strptime(str(bio), "%b %d %H:%M:%S %Y %Z")
337 return pydate.replace(tzinfo=utc)
341 Represents X.509 certificate.
343 def __init__(self, data=None, ptr=None, format="PEM"):
345 Initializes certificate
346 @param data - serialized certificate in PEM or DER format.
347 @param ptr - pointer to X509, returned by some openssl function.
348 mutually exclusive with data
349 @param format - specifies data format. "PEM" or "DER", default PEM
353 raise TypeError("Cannot use data and ptr simultaneously")
356 raise TypeError("data argument is required")
360 self.cert = libcrypto.PEM_read_bio_X509(bio.bio, None, None,
363 self.cert = libcrypto.d2i_X509_bio(bio.bio, None)
364 if self.cert is None:
365 raise X509Error("error reading certificate")
366 self.extensions = _X509extlist(self)
369 Frees certificate object
371 libcrypto.X509_free(self.cert)
373 """ Returns der string of the certificate """
375 if libcrypto.i2d_X509_bio(bio.bio, self.cert) == 0:
376 raise X509Error("error serializing certificate")
379 """ Returns valid call to the constructor """
380 return "X509(data=" + repr(str(self)) + ",format='DER')"
383 """EVP PKEy object of certificate public key"""
384 return PKey(ptr=libcrypto.X509_get_pubkey(self.cert, False))
386 """ Returns PEM represntation of the certificate """
388 if libcrypto.PEM_write_bio_X509(bio.bio, self.cert) == 0:
389 raise X509Error("error serializing certificate")
391 def verify(self, store=None, chain=None, key=None):
393 Verify self. Supports verification on both X509 store object
394 or just public issuer key
395 @param store X509Store object.
396 @param chain - list of X509 objects to add into verification
397 context.These objects are untrusted, but can be used to
398 build certificate chain up to trusted object in the store
399 @param key - PKey object with open key to validate signature
401 parameters store and key are mutually exclusive. If neither
402 is specified, attempts to verify self as self-signed certificate
404 if store is not None and key is not None:
405 raise X509Error("key and store cannot be specified simultaneously")
406 if store is not None:
407 ctx = libcrypto.X509_STORE_CTX_new()
409 raise X509Error("Error allocating X509_STORE_CTX")
410 if chain is not None and len(chain) > 0:
411 chain_ptr = StackOfX509(chain).ptr
414 if libcrypto.X509_STORE_CTX_init(ctx, store.store, self.cert,
416 raise X509Error("Error allocating X509_STORE_CTX")
417 res = libcrypto.X509_verify_cert(ctx)
418 libcrypto.X509_STORE_CTX_free(ctx)
422 if self.issuer != self.subject:
423 # Not a self-signed certificate
426 res = libcrypto.X509_verify(self.cert, key.key)
428 raise X509Error("X509_verify failed")
433 """ X509Name for certificate subject name """
434 return X509Name(libcrypto.X509_get_subject_name(self.cert))
437 """ X509Name for certificate issuer name """
438 return X509Name(libcrypto.X509_get_issuer_name(self.cert))
441 """ Serial number of certificate as integer """
442 asnint = libcrypto.X509_get_serialNumber(self.cert)
444 libcrypto.i2a_ASN1_INTEGER(bio.bio, asnint)
445 return int(str(bio), 16)
449 certificate version as integer. Really certificate stores 0 for
450 version 1 and 2 for version 3, but we return 1 and 3
452 return _X509_get_version(self.cert) + 1
455 """ Certificate validity period start date """
456 asn1 = _X509_get_notBefore(self.cert)
457 return __asn1date_to_datetime(asn1)
460 """ Certificate validity period end date """
461 asn1 = _X509_get_notAfter(self.cert)
462 return __asn1date_to_datetime(asn1)
464 """ Returns True if certificate is CA certificate """
465 return libcrypto.X509_check_ca(self.cert) > 0
467 class X509Store(object):
469 Represents trusted certificate store. Can be used to lookup CA
470 certificates to verify
472 @param file - file with several certificates and crls
474 @param dir - hashed directory with certificates and crls
475 @param default - if true, default verify location (directory)
479 def __init__(self, file=None, dir=None, default=False):
481 Creates X509 store and installs lookup method. Optionally initializes
482 by certificates from given file or directory.
485 # Todo - set verification flags
487 self.store = libcrypto.X509_STORE_new()
488 if self.store is None:
489 raise X509Error("allocating store")
490 lookup = libcrypto.X509_STORE_add_lookup(self.store,
491 libcrypto.X509_LOOKUP_file())
493 raise X509Error("error installing file lookup method")
495 if not libcrypto.X509_LOOKUP_ctrl(lookup, 1, file, 1, None) > 0:
496 raise X509Error("error loading trusted certs from file "+file)
497 lookup = libcrypto.X509_STORE_add_lookup(self.store,
498 libcrypto.X509_LOOKUP_hash_dir())
500 raise X509Error("error installing hashed lookup method")
502 if not libcrypto.X509_LOOKUP_ctrl(lookup, 2, dir, 1, None) > 0:
503 raise X509Error("error adding hashed trusted certs dir "+dir)
505 if not libcrypto.X509_LOOKUP_ctrl(lookup, 2, None, 3, None) > 0:
506 raise X509Error("error adding default trusted certs dir ")
507 def add_cert(self, cert):
509 Explicitely adds certificate to set of trusted in the store
510 @param cert - X509 object to add
512 if not isinstance(cert, X509):
513 raise TypeError("cert should be X509")
514 libcrypto.X509_STORE_add_cert(self.store, cert.cert)
515 def add_callback(self, callback):
517 Installs callback function, which would receive detailed information
518 about verified ceritificates
520 raise NotImplementedError
521 def setflags(self, flags):
523 Set certificate verification flags.
524 @param flags - integer bit mask. See OpenSSL X509_V_FLAG_* constants
526 libcrypto.X509_STORE_set_flags(self.store, flags)
527 def setpurpose(self, purpose):
529 Sets certificate purpose which verified certificate should match
530 @param purpose - number from 1 to 9 or standard strind defined
532 possible strings - sslcient,sslserver, nssslserver, smimesign,i
533 smimeencrypt, crlsign, any, ocsphelper
535 if isinstance(purpose, str):
536 purp_no = libcrypto.X509_PURPOSE_get_by_sname(purpose)
538 raise X509Error("Invalid certificate purpose '%s'" % purpose)
539 elif isinstance(purpose, int):
541 if libcrypto.X509_STORE_set_purpose(self.store, purp_no) <= 0:
542 raise X509Error("cannot set purpose")
543 def setdepth(self, depth):
545 Sets the verification depth i.e. max length of certificate chain
548 libcrypto.X509_STORE_set_depth(self.store, depth)
549 def settime(self, time):
551 Set point in time used to check validity of certificates for
552 Time can be either python datetime object or number of seconds
555 if isinstance(time, datetime) or isinstance(time,
557 seconds = int(time.strftime("%s"))
558 elif isinstance(time, int):
561 raise TypeError("datetime.date, datetime.datetime or integer " +
562 "is required as time argument")
563 raise NotImplementedError
564 class StackOfX509(object):
566 Implements OpenSSL STACK_OF(X509) object.
567 It looks much like python container types
569 def __init__(self, certs=None, ptr=None, disposable=True):
572 @param certs - list of X509 objects. If specified, read-write
573 stack is created and populated by these certificates
574 @param ptr - pointer to OpenSSL STACK_OF(X509) as returned by
576 @param disposable - if True, stack created from object, returned
577 by function is copy, and can be modified and need to be
578 freeid. If false, it is just pointer into another
579 structure i.e. CMS_ContentInfo
581 self.need_free = False
583 self.need_free = True
584 self.ptr = sk_new_null()
585 if certs is not None:
588 elif certs is not None:
589 raise ValueError("cannot handle certs an ptr simultaneously")
591 self.need_free = disposable
594 return sk_num(self.ptr)
595 def __getitem__(self, index):
596 if index < 0 or index >= len(self):
598 p = sk_value(self.ptr, index)
599 return X509(ptr=libcrypto.X509_dup(p))
600 def __setitem__(self, index, value):
601 if not self.need_free:
602 raise ValueError("Stack is read-only")
603 if index < 0 or index >= len(self):
605 if not isinstance(value, X509):
606 raise TypeError('StackOfX509 can contain only X509 objects')
607 p = sk_value(self.ptr, index)
608 sk_set(self.ptr, index, libcrypto.X509_dup(value.cert))
609 libcrypto.X509_free(p)
610 def __delitem__(self, index):
611 if not self.need_free:
612 raise ValueError("Stack is read-only")
613 if index < 0 or index >= len(self):
615 p = sk_delete(self.ptr, index)
616 libcrypto.X509_free(p)
619 sk_pop_free(self.ptr, libcrypto.X509_free)
620 def append(self, value):
621 """ Adds certificate to stack """
622 if not self.need_free:
623 raise ValueError("Stack is read-only")
624 if not isinstance(value, X509):
625 raise TypeError('StackOfX509 can contain only X509 objects')
626 sk_push(self.ptr, libcrypto.X509_dup(value.cert))
628 libcrypto.d2i_X509_bio.argtypes = (c_void_p,POINTER(c_void_p))
629 libcrypto.X509_free.argtypes = (c_void_p,)
630 libcrypto.X509_dup.restype = c_void_p
631 libcrypto.X509_dup.argtypes = (c_void_p, )
632 libcrypto.i2a_ASN1_INTEGER.argtypes = (c_void_p, c_void_p)
633 libcrypto.ASN1_STRING_print_ex.argtypes = (c_void_p, c_void_p, c_long)
634 libcrypto.PEM_read_bio_X509.restype = c_void_p
635 libcrypto.PEM_read_bio_X509.argtypes = (c_void_p, POINTER(c_void_p),
637 libcrypto.PEM_write_bio_X509.restype = c_int
638 libcrypto.PEM_write_bio_X509.argtypes = (c_void_p, c_void_p)
639 libcrypto.ASN1_TIME_print.argtypes = (c_void_p, c_void_p)
640 libcrypto.ASN1_INTEGER_get.argtypes = (c_void_p, )
641 libcrypto.ASN1_INTEGER_get.restype = c_long
642 libcrypto.X509_check_ca.argtypes = (c_void_p, )
643 libcrypto.X509_get_serialNumber.argtypes = (c_void_p, )
644 libcrypto.X509_get_serialNumber.restype = c_void_p
645 libcrypto.X509_get_subject_name.argtypes = (c_void_p, )
646 libcrypto.X509_get_subject_name.restype = c_void_p
647 libcrypto.X509_get_issuer_name.argtypes = (c_void_p, )
648 libcrypto.X509_get_issuer_name.restype = c_void_p
649 libcrypto.X509_NAME_ENTRY_get_object.restype = c_void_p
650 libcrypto.X509_NAME_ENTRY_get_object.argtypes = (c_void_p, )
651 libcrypto.X509_NAME_ENTRY_get_data.restype = c_void_p
652 libcrypto.X509_NAME_ENTRY_get_data.argtypes = (c_void_p, )
653 libcrypto.OBJ_obj2nid.argtypes = (c_void_p, )
654 libcrypto.X509_NAME_get_entry.restype = c_void_p
655 libcrypto.X509_NAME_get_entry.argtypes = (c_void_p, c_int)
656 libcrypto.X509_STORE_new.restype = c_void_p
657 libcrypto.X509_STORE_add_lookup.restype = c_void_p
658 libcrypto.X509_STORE_add_lookup.argtypes = (c_void_p, c_void_p)
659 libcrypto.X509_STORE_add_cert.argtypes = (c_void_p, c_void_p)
660 libcrypto.X509_STORE_CTX_new.restype = c_void_p
661 libcrypto.X509_STORE_CTX_free.argtypes = (c_void_p,)
662 libcrypto.X509_STORE_CTX_init.argtypes = (c_void_p, c_void_p, c_void_p,
664 libcrypto.X509_STORE_set_depth.argtypes = (c_void_p, c_int)
665 libcrypto.X509_STORE_set_flags.argtypes = (c_void_p, c_ulong)
666 libcrypto.X509_STORE_set_purpose.argtypes = (c_void_p, c_int)
667 libcrypto.X509_LOOKUP_file.restype = c_void_p
668 libcrypto.X509_LOOKUP_hash_dir.restype = c_void_p
669 libcrypto.X509_LOOKUP_ctrl.restype = c_int
670 libcrypto.X509_LOOKUP_ctrl.argtypes = (c_void_p, c_int, c_char_p, c_long,
672 libcrypto.X509_EXTENSION_free.argtypes = (c_void_p, )
673 libcrypto.X509_EXTENSION_dup.argtypes = (c_void_p, )
674 libcrypto.X509_EXTENSION_dup.restype = POINTER(_x509_ext)
675 libcrypto.X509V3_EXT_print.argtypes = (c_void_p, POINTER(_x509_ext), c_long,
677 libcrypto.X509_get_ext.restype = c_void_p
678 libcrypto.X509_get_ext.argtypes = (c_void_p, c_int)
679 libcrypto.X509_get_ext_by_critical.argtypes = (c_void_p, c_int, c_int)
680 libcrypto.X509_get_ext_by_NID.argtypes = (c_void_p, c_int, c_int)
681 libcrypto.X509_get_ext_count.argtypes = (c_void_p, )
682 libcrypto.X509_get_pubkey.restype = c_void_p
683 libcrypto.X509_get_pubkey.argtypes = (c_void_p, )
684 libcrypto.X509V3_EXT_print.argtypes = (c_void_p, POINTER(_x509_ext), c_long,
686 libcrypto.X509_LOOKUP_file.restype = c_void_p
687 libcrypto.X509_LOOKUP_hash_dir.restype = c_void_p
688 libcrypto.X509_NAME_cmp.argtypes = (c_void_p, c_void_p)
689 libcrypto.X509_NAME_entry_count.argtypes = (c_void_p,)
690 libcrypto.X509_NAME_free.argtypes = (c_void_p,)
691 libcrypto.X509_NAME_new.restype = c_void_p
692 libcrypto.X509_NAME_print_ex.argtypes = (c_void_p, c_void_p, c_int, c_ulong)
693 libcrypto.X509_PURPOSE_get_by_sname.argtypes=(c_char_p,)
694 libcrypto.X509_verify.argtypes = (c_void_p, c_void_p)
695 libcrypto.X509_verify_cert.argtypes = (c_void_p,)
696 sk_num.restype = c_int
697 sk_num.argtypes= (c_void_p,)
698 sk_set.argtypes = (c_void_p, c_int, c_void_p)
699 sk_set.restype = c_void_p
700 sk_value.argtypes = (c_void_p, c_int)
701 sk_value.restype = c_void_p
702 sk_delete.argtypes = (c_void_p, c_int)
703 sk_delete.restype = c_void_p
704 sk_new_null.restype = c_void_p
705 sk_pop_free.argtypes = (c_void_p, c_void_p)
706 sk_push.argtypes = (c_void_p, c_void_p)
707 libcrypto.X509_NAME_hash.restype = c_long
708 libcrypto.X509_NAME_hash.argtypes = (c_void_p, )
709 libcrypto.X509_NAME_get_index_by_NID.argtypes = (c_void_p, c_int, c_int)