]> www.wagner.pp.ru Git - oss/ctypescrypto.git/blob - ctypescrypto/x509.py
40263cffea5d1e26528bb989231a92808bfc86a2
[oss/ctypescrypto.git] / ctypescrypto / x509.py
1 """
2 Implements interface to openssl X509 and X509Store structures,
3 I.e allows to load, analyze and verify certificates.
4
5 X509Store objects are also used to verify other signed documets,
6 such as CMS, OCSP and timestamps.
7 """
8
9
10
11 from ctypes import c_void_p, c_long, c_int, POINTER, c_char_p, Structure, cast
12 from ctypescrypto.bio import Membio
13 from ctypescrypto.pkey import PKey
14 from ctypescrypto.oid import Oid
15 from ctypescrypto.exception import LibCryptoError
16 from ctypescrypto import libcrypto
17 from datetime import datetime
18
19 try:
20     from pytz import utc
21 except ImportError:
22     from datetime import timedelta, tzinfo
23     ZERO = timedelta(0)
24     class UTC(tzinfo):
25         """tzinfo object for UTC.
26             If no pytz is available, we would use it.
27         """
28         def utcoffset(self, dt):
29             return ZERO
30
31         def tzname(self, dt):
32             return "UTC"
33
34         def dst(self, dt):
35             return ZERO
36
37     utc = UTC()
38
39 __all__ = ['X509', 'X509Error', 'X509Name', 'X509Store', 'StackOfX509']
40
41 class _validity(Structure):
42     """ ctypes representation of X509_VAL structure
43         needed to access certificate validity period, because openssl
44         doesn't provide fuctions for it - only macros
45     """
46     _fields_ = [('notBefore', c_void_p), ('notAfter', c_void_p)]
47
48 class _cinf(Structure):
49     """ ctypes representtion of X509_CINF structure
50         neede to access certificate data, which are accessable only
51         via macros
52     """
53     _fields_ = [('version', c_void_p),
54                 ('serialNumber', c_void_p),
55                 ('sign_alg', c_void_p),
56                 ('issuer', c_void_p),
57                 ('validity', POINTER(_validity)),
58                 ('subject', c_void_p),
59                 ('pubkey', c_void_p),
60                 ('issuerUID', c_void_p),
61                 ('subjectUID', c_void_p),
62                 ('extensions', c_void_p),
63                ]
64
65 class _x509(Structure):
66     """
67     ctypes represntation of X509 structure needed
68     to access certificate data which are accesable only via
69     macros, not functions
70     """
71     _fields_ = [('cert_info', POINTER(_cinf)),
72                 ('sig_alg', c_void_p),
73                 ('signature', c_void_p),
74                 # There are a lot of parsed extension fields there
75                ]
76 _px509 = POINTER(_x509)
77
78 class X509Error(LibCryptoError):
79     """
80     Exception, generated when some openssl function fail
81     during X509 operation
82     """
83     pass
84
85
86 class X509Name(object):
87     """
88     Class which represents X.509 distinguished name - typically
89     a certificate subject name or an issuer name.
90
91     Now used only to represent information, extracted from the
92     certificate. Potentially can be also used to build DN when creating
93     certificate signing request
94     """
95     # XN_FLAG_SEP_COMMA_PLUS & ASN1_STRFLG_UTF8_CONVERT
96     PRINT_FLAG = 0x10010
97     ESC_MSB = 4
98     def __init__(self, ptr=None, copy=False):
99         """
100         Creates a X509Name object
101         @param ptr - pointer to X509_NAME C structure (as returned by some
102                      OpenSSL functions
103         @param copy - indicates that this structure have to be freed upon
104                       object destruction
105         """
106         if ptr is not None:
107             self.ptr = ptr
108             self.need_free = copy
109             self.writable = False
110         else:
111             self.ptr = libcrypto.X509_NAME_new()
112             self.need_free = True
113             self.writable = True
114
115     def __del__(self):
116         """
117         Frees if neccessary
118         """
119         if self.need_free:
120             libcrypto.X509_NAME_free(self.ptr)
121     def __str__(self):
122         """
123         Produces an ascii representation of the name, escaping all
124         symbols > 0x80.  Probably it is not what you want, unless
125         your native language is English
126         """
127         bio = Membio()
128         libcrypto.X509_NAME_print_ex(bio.bio, self.ptr, 0,
129                                      self.PRINT_FLAG | self.ESC_MSB)
130         return str(bio)
131
132     def __unicode__(self):
133         """
134         Produces unicode representation of the name.
135         """
136         bio = Membio()
137         libcrypto.X509_NAME_print_ex(bio.bio, self.ptr, 0, self.PRINT_FLAG)
138         return unicode(bio)
139     def __len__(self):
140         """
141         return number of components in the name
142         """
143         return libcrypto.X509_NAME_entry_count(self.ptr)
144     def __cmp__(self, other):
145         """
146         Compares X509 names
147         """
148         return libcrypto.X509_NAME_cmp(self.ptr, other.ptr)
149     def __eq__(self, other):
150         return libcrypto.X509_NAME_cmp(self.ptr, other.ptr) == 0
151
152     def __getitem__(self, key):
153         if isinstance(key, Oid):
154             # Return first matching field
155             idx = libcrypto.X509_NAME_get_index_by_NID(self.ptr, key.nid, -1)
156             if idx < 0:
157                 raise KeyError("Key not found " + str(Oid))
158             entry = libcrypto.X509_NAME_get_entry(self.ptr, idx)
159             value = libcrypto.X509_NAME_ENTRY_get_data(entry)
160             bio = Membio()
161             libcrypto.ASN1_STRING_print_ex(bio.bio, value, self.PRINT_FLAG)
162             return unicode(bio)
163         elif isinstance(key, (int, long)):
164             # Return OID, string tuple
165             entry = libcrypto.X509_NAME_get_entry(self.ptr, key)
166             if entry is None:
167                 raise IndexError("name entry index out of range")
168             oid = Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry))
169             value = libcrypto.X509_NAME_ENTRY_get_data(entry)
170             bio = Membio()
171             libcrypto.ASN1_STRING_print_ex(bio.bio, value, self.PRINT_FLAG)
172             return (oid, unicode(bio))
173         else:
174             raise TypeError("X509 NAME can be indexed by Oids or integers only")
175
176     def __setitem__(self, key, val):
177         if not self.writable:
178             raise ValueError("Attempt to modify constant X509 object")
179         else:
180             raise NotImplementedError
181     def __delitem__(self, key):
182         if not self.writable:
183             raise ValueError("Attempt to modify constant X509 object")
184         else:
185             raise NotImplementedError
186     def __hash__(self):
187         return libcrypto.X509_NAME_hash(self.ptr)
188
189 class _x509_ext(Structure):
190     """ Represens C structure X509_EXTENSION """
191     _fields_ = [("object", c_void_p),
192                 ("critical", c_int),
193                 ("value", c_void_p)
194                ]
195
196 class X509_EXT(object):
197     """ Python object which represents a certificate extension """
198     def __init__(self, ptr, copy=False):
199         """ Initializes from the pointer to X509_EXTENSION.
200             If copy is True, creates a copy, otherwise just
201             stores pointer.
202         """
203         if copy:
204             self.ptr = libcrypto.X509_EXTENSION_dup(ptr)
205         else:
206             self.ptr = cast(ptr, POINTER(_x509_ext))
207     def __del__(self):
208         libcrypto.X509_EXTENSION_free(self.ptr)
209     def __str__(self):
210         bio = Membio()
211         libcrypto.X509V3_EXT_print(bio.bio, self.ptr, 0x20010, 0)
212         return str(bio)
213     def __unicode__(self):
214         bio = Membio()
215         libcrypto.X509V3_EXT_print(bio.bio, self.ptr, 0x20010, 0)
216         return unicode(bio)
217     @property
218     def oid(self):
219         "Returns OID of the extension"
220         return Oid.fromobj(self.ptr[0].object)
221     @property
222     def critical(self):
223         "Returns True if extensin have critical flag set"
224         return self.ptr[0].critical > 0
225
226 class _X509extlist(object):
227     """
228     Represents list of certificate extensions. Really it keeps
229     reference to certificate object
230     """
231     def __init__(self, cert):
232         """
233         Initialize from X509 object
234         """
235         self.cert = cert
236
237     def __len__(self):
238         """
239         Returns number of extensions
240         """
241         return libcrypto.X509_get_ext_count(self.cert.cert)
242
243     def __getitem__(self, item):
244         """
245         Returns extension by index, creating a copy
246         """
247         ext_ptr = libcrypto.X509_get_ext(self.cert.cert, item)
248         if ext_ptr is None:
249             raise IndexError
250         return X509_EXT(ext_ptr, True)
251     def find(self, oid):
252         """
253         Return list of extensions with given Oid
254         """
255         if not isinstance(oid, Oid):
256             raise TypeError("Need crytypescrypto.oid.Oid as argument")
257         found = []
258         index = -1
259         end = len(self)
260         while True:
261             index = libcrypto.X509_get_ext_by_NID(self.cert.cert, oid.nid,
262                                                   index)
263             if index >= end or index < 0:
264                 break
265             found.append(self[index])
266         return found
267
268     def find_critical(self, crit=True):
269         """
270         Return list of critical extensions (or list of non-cricital, if
271         optional second argument is False
272         """
273         if crit:
274             flag = 1
275         else:
276             flag = 0
277         found = []
278         end = len(self)
279         index = -1
280         while True:
281             index = libcrypto.X509_get_ext_by_critical(self.cert.cert, flag,
282                                                        index)
283             if index >= end or index < 0:
284                 break
285             found.append(self[index])
286         return found
287
288 def _X509__asn1date_to_datetime(asn1date):
289     """ 
290     Converts openssl ASN1_TIME object to python datetime.datetime
291     """
292     bio = Membio()
293     libcrypto.ASN1_TIME_print(bio.bio, asn1date)
294     pydate = datetime.strptime(str(bio), "%b %d %H:%M:%S %Y %Z")
295     return pydate.replace(tzinfo=utc)
296
297 class X509(object):
298     """
299     Represents X.509 certificate.
300     """
301     def __init__(self, data=None, ptr=None, format="PEM"):
302         """
303         Initializes certificate
304         @param data - serialized certificate in PEM or DER format.
305         @param ptr - pointer to X509, returned by some openssl function.
306             mutually exclusive with data
307         @param format - specifies data format. "PEM" or "DER", default PEM
308         """
309         if ptr is not None:
310             if data is not None:
311                 raise TypeError("Cannot use data and ptr simultaneously")
312             self.cert = ptr
313         elif data is None:
314             raise TypeError("data argument is required")
315         else:
316             bio = Membio(data)
317             if format == "PEM":
318                 self.cert = libcrypto.PEM_read_bio_X509(bio.bio, None, None,
319                                                         None)
320             else:
321                 self.cert = libcrypto.d2i_X509_bio(bio.bio, None)
322             if self.cert is None:
323                 raise X509Error("error reading certificate")
324         self.extensions = _X509extlist(self)
325     def __del__(self):
326         """
327         Frees certificate object
328         """
329         libcrypto.X509_free(self.cert)
330     def __str__(self):
331         """ Returns der string of the certificate """
332         bio = Membio()
333         if libcrypto.i2d_X509_bio(bio.bio, self.cert) == 0:
334             raise X509Error("error serializing certificate")
335         return str(bio)
336     def __repr__(self):
337         """ Returns valid call to the constructor """
338         return "X509(data=" + repr(str(self)) + ",format='DER')"
339     @property
340     def pubkey(self):
341         """EVP PKEy object of certificate public key"""
342         return PKey(ptr=libcrypto.X509_get_pubkey(self.cert, False))
343     def pem(self):
344         """ Returns PEM represntation of the certificate """
345         bio = Membio()
346         if libcrypto.PEM_write_bio_X509(bio.bio, self.cert) == 0:
347             raise X509Error("error serializing certificate")
348         return str(bio)
349     def verify(self, store=None, chain=None, key=None):
350         """
351         Verify self. Supports verification on both X509 store object
352         or just public issuer key
353         @param store X509Store object.
354         @param chain - list of X509 objects to add into verification
355             context.These objects are untrusted, but can be used to
356             build certificate chain up to trusted object in the store
357         @param key - PKey object with open key to validate signature
358
359         parameters store and key are mutually exclusive. If neither
360         is specified, attempts to verify self as self-signed certificate
361         """
362         if store is not None and key is not None:
363             raise X509Error("key and store cannot be specified simultaneously")
364         if store is not None:
365             ctx = libcrypto.X509_STORE_CTX_new()
366             if ctx is None:
367                 raise X509Error("Error allocating X509_STORE_CTX")
368             if chain is not None and len(chain) > 0:
369                 chain_ptr = StackOfX509(chain).ptr
370             else:
371                 chain_ptr = None
372             if libcrypto.X509_STORE_CTX_init(ctx, store.store, self.cert,
373                                              chain_ptr) < 0:
374                 raise X509Error("Error allocating X509_STORE_CTX")
375             res = libcrypto.X509_verify_cert(ctx)
376             libcrypto.X509_STORE_CTX_free(ctx)
377             return res > 0
378         else:
379             if key is None:
380                 if self.issuer != self.subject:
381                     # Not a self-signed certificate
382                     return False
383                 key = self.pubkey
384             res = libcrypto.X509_verify(self.cert, key.key)
385             if res < 0:
386                 raise X509Error("X509_verify failed")
387             return res > 0
388
389     @property
390     def subject(self):
391         """ X509Name for certificate subject name """
392         return X509Name(libcrypto.X509_get_subject_name(self.cert))
393     @property
394     def issuer(self):
395         """ X509Name for certificate issuer name """
396         return X509Name(libcrypto.X509_get_issuer_name(self.cert))
397     @property
398     def serial(self):
399         """ Serial number of certificate as integer """
400         asnint = libcrypto.X509_get_serialNumber(self.cert)
401         bio = Membio()
402         libcrypto.i2a_ASN1_INTEGER(bio.bio, asnint)
403         return int(str(bio), 16)
404     @property
405     def version(self):
406         """
407         certificate version as integer. Really certificate stores 0 for
408         version 1 and 2 for version 3, but we return 1 and 3
409         """
410         asn1int = cast(self.cert, _px509)[0].cert_info[0].version
411         return libcrypto.ASN1_INTEGER_get(asn1int) + 1
412     @property
413     def startDate(self):
414         """ Certificate validity period start date """
415         # Need deep poke into certificate structure
416         # (x)->cert_info->validity->notBefore
417         asn1 = cast(self.cert, _px509)[0].cert_info[0].validity[0].notBefore
418         return __asn1date_to_datetime(asn1)
419     @property
420     def endDate(self):
421         """ Certificate validity period end date """
422         # Need deep poke into certificate structure
423         # (x)->cert_info->validity->notAfter
424         asn1 = cast(self.cert, _px509)[0].cert_info[0].validity[0].notAfter
425         return __asn1date_to_datetime(asn1)
426     def check_ca(self):
427         """ Returns True if certificate is CA certificate """
428         return libcrypto.X509_check_ca(self.cert) > 0
429
430 class X509Store(object):
431     """
432     Represents trusted certificate store. Can be used to lookup CA
433     certificates to verify
434
435     @param file - file with several certificates and crls
436             to load into store
437     @param dir - hashed directory with certificates and crls
438     @param default - if true, default verify location (directory)
439         is installed
440
441     """
442     def __init__(self, file=None, dir=None, default=False):
443         """
444         Creates X509 store and installs lookup method. Optionally initializes
445         by certificates from given file or directory.
446         """
447         #
448         # Todo - set verification flags
449         #
450         self.store = libcrypto.X509_STORE_new()
451         if self.store is None:
452             raise X509Error("allocating store")
453         lookup = libcrypto.X509_STORE_add_lookup(self.store,
454                                                  libcrypto.X509_LOOKUP_file())
455         if lookup is None:
456             raise X509Error("error installing file lookup method")
457         if file is not None:
458             if not libcrypto.X509_LOOKUP_ctrl(lookup, 1, file, 1, None) > 0:
459                 raise X509Error("error loading trusted certs from file "+file)
460         lookup = libcrypto.X509_STORE_add_lookup(self.store,
461                                              libcrypto.X509_LOOKUP_hash_dir())
462         if lookup is None:
463             raise X509Error("error installing hashed lookup method")
464         if dir is not None:
465             if not libcrypto.X509_LOOKUP_ctrl(lookup, 2, dir, 1, None) > 0:
466                 raise X509Error("error adding hashed  trusted certs dir "+dir)
467         if default:
468             if not libcrypto.X509_LOOKUP_ctrl(lookup, 2, None, 3, None) > 0:
469                 raise X509Error("error adding default trusted certs dir ")
470     def add_cert(self, cert):
471         """
472         Explicitely adds certificate to set of trusted in the store
473         @param cert - X509 object to add
474         """
475         if not isinstance(cert, X509):
476             raise TypeError("cert should be X509")
477         libcrypto.X509_STORE_add_cert(self.store, cert.cert)
478     def add_callback(self, callback):
479         """
480         Installs callback function, which would receive detailed information
481         about verified ceritificates
482         """
483         raise NotImplementedError
484     def setflags(self, flags):
485         """
486         Set certificate verification flags.
487         @param flags - integer bit mask. See OpenSSL X509_V_FLAG_* constants
488         """
489         libcrypto.X509_STORE_set_flags(self.store, flags)
490     def setpurpose(self, purpose):
491         """
492         Sets certificate purpose which verified certificate should match
493         @param purpose - number from 1 to 9 or standard strind defined
494                          in Openssl
495         possible strings - sslcient,sslserver, nssslserver, smimesign,i
496                          smimeencrypt, crlsign, any, ocsphelper
497         """
498         if isinstance(purpose, str):
499             purp_no = libcrypto.X509_PURPOSE_get_by_sname(purpose)
500             if purp_no <= 0:
501                 raise X509Error("Invalid certificate purpose '%s'" % purpose)
502         elif isinstance(purpose, int):
503             purp_no = purpose
504         if libcrypto.X509_STORE_set_purpose(self.store, purp_no) <= 0:
505             raise X509Error("cannot set purpose")
506     def setdepth(self, depth):
507         """
508         Sets the verification depth i.e. max length of certificate chain
509         which is acceptable
510         """
511         libcrypto.X509_STORE_set_depth(self.store, depth)
512     def settime(self, time):
513         """
514         Set point in time used to check validity of certificates for
515         Time can be either python datetime object or number of seconds
516         sinse epoch
517         """
518         if isinstance(time, datetime) or isinstance(time,
519                                                              datetime.date):
520             seconds = int(time.strftime("%s"))
521         elif isinstance(time, int):
522             seconds = time
523         else:
524             raise TypeError("datetime.date, datetime.datetime or integer " +
525                             "is required as time argument")
526         raise NotImplementedError
527 class StackOfX509(object):
528     """
529     Implements OpenSSL STACK_OF(X509) object.
530     It looks much like python container types
531     """
532     def __init__(self, certs=None, ptr=None, disposable=True):
533         """
534         Create stack
535         @param certs - list of X509 objects. If specified, read-write
536             stack is created and populated by these certificates
537         @param ptr - pointer to OpenSSL STACK_OF(X509) as returned by
538             some functions
539         @param disposable - if True, stack created from object, returned
540                 by function is copy, and can be modified and need to be
541                 freeid. If false, it is just pointer into another
542                 structure i.e. CMS_ContentInfo
543         """
544         if  ptr is None:
545             self.need_free = True
546             self.ptr = libcrypto.sk_new_null()
547             if certs is not None:
548                 for crt in certs:
549                     self.append(crt)
550         elif certs is not None:
551             raise ValueError("cannot handle certs an ptr simultaneously")
552         else:
553             self.need_free = disposable
554             self.ptr = ptr
555     def __len__(self):
556         return libcrypto.sk_num(self.ptr)
557     def __getitem__(self, index):
558         if index < 0 or index >= len(self):
559             raise IndexError
560         p = libcrypto.sk_value(self.ptr, index)
561         return X509(ptr=libcrypto.X509_dup(p))
562     def __setitem__(self, index, value):
563         if not self.need_free:
564             raise ValueError("Stack is read-only")
565         if index < 0 or index >= len(self):
566             raise IndexError
567         if not isinstance(value, X509):
568             raise TypeError('StackOfX508 can contain only X509 objects')
569         p = libcrypto.sk_value(self.ptr, index)
570         libcrypto.sk_set(self.ptr, index, libcrypto.X509_dup(value.cert))
571         libcrypto.X509_free(p)
572     def __delitem__(self, index):
573         if not self.need_free:
574             raise ValueError("Stack is read-only")
575         if index < 0 or index >= len(self):
576             raise IndexError
577         p = libcrypto.sk_delete(self.ptr, index)
578         libcrypto.X509_free(p)
579     def __del__(self):
580         if self.need_free:
581             libcrypto.sk_pop_free(self.ptr, libcrypto.X509_free)
582     def append(self, value):
583         """ Adds certificate to stack """
584         if not self.need_free:
585             raise ValueError("Stack is read-only")
586         if not isinstance(value, X509):
587             raise TypeError('StackOfX508 can contain only X509 objects')
588         libcrypto.sk_push(self.ptr, libcrypto.X509_dup(value.cert))
589
590 libcrypto.i2a_ASN1_INTEGER.argtypes = (c_void_p, c_void_p)
591 libcrypto.ASN1_STRING_print_ex.argtypes = (c_void_p, c_void_p, c_long)
592 libcrypto.PEM_read_bio_X509.restype = c_void_p
593 libcrypto.PEM_read_bio_X509.argtypes = (c_void_p, POINTER(c_void_p),
594                                         c_void_p, c_void_p)
595 libcrypto.PEM_write_bio_X509.restype = c_int
596 libcrypto.PEM_write_bio_X509.argtypes = (c_void_p, c_void_p)
597 libcrypto.ASN1_TIME_print.argtypes = (c_void_p, c_void_p)
598 libcrypto.ASN1_INTEGER_get.argtypes = (c_void_p, )
599 libcrypto.ASN1_INTEGER_get.restype = c_long
600 libcrypto.X509_get_serialNumber.argtypes = (c_void_p, )
601 libcrypto.X509_get_serialNumber.restype = c_void_p
602 libcrypto.X509_NAME_ENTRY_get_object.restype = c_void_p
603 libcrypto.X509_NAME_ENTRY_get_object.argtypes = (c_void_p, )
604 libcrypto.OBJ_obj2nid.argtypes = (c_void_p, )
605 libcrypto.X509_NAME_get_entry.restype = c_void_p
606 libcrypto.X509_NAME_get_entry.argtypes = (c_void_p, c_int)
607 libcrypto.X509_STORE_new.restype = c_void_p
608 libcrypto.X509_STORE_add_lookup.restype = c_void_p
609 libcrypto.X509_STORE_add_lookup.argtypes = (c_void_p, c_void_p)
610 libcrypto.X509_LOOKUP_file.restype = c_void_p
611 libcrypto.X509_LOOKUP_hash_dir.restype = c_void_p
612 libcrypto.X509_LOOKUP_ctrl.restype = c_int
613 libcrypto.X509_LOOKUP_ctrl.argtypes = (c_void_p, c_int, c_char_p, c_long,
614                                        POINTER(c_char_p))
615 libcrypto.X509_EXTENSION_dup.argtypes = (c_void_p, )
616 libcrypto.X509_EXTENSION_dup.restype = POINTER(_x509_ext)
617 libcrypto.X509V3_EXT_print.argtypes = (c_void_p, POINTER(_x509_ext), c_long,
618                                        c_int)
619 libcrypto.X509_get_ext.restype = c_void_p
620 libcrypto.X509_get_ext.argtypes = (c_void_p, c_int)
621 libcrypto.X509V3_EXT_print.argtypes = (c_void_p, POINTER(_x509_ext), c_long,
622                                        c_int)
623 libcrypto.sk_set.argtypes = (c_void_p, c_int, c_void_p)
624 libcrypto.sk_set.restype = c_void_p
625 libcrypto.sk_value.argtypes = (c_void_p, c_int)
626 libcrypto.sk_value.restype = c_void_p
627 libcrypto.X509_dup.restype = c_void_p
628 libcrypto.sk_new_null.restype = c_void_p
629 libcrypto.X509_dup.argtypes = (c_void_p, )
630 libcrypto.X509_NAME_hash.restype = c_long
631 libcrypto.X509_NAME_hash.argtypes = (c_void_p, )