]> www.wagner.pp.ru Git - oss/ctypescrypto.git/blob - ctypescrypto/x509.py
159956d72f457dc3640a95db75007092f8408bb0
[oss/ctypescrypto.git] / ctypescrypto / x509.py
1 """
2 Implements interface to openssl X509 and X509Store structures,
3 i.e. allows to load, analyze and verify certificates.
4
5 X509Store objects are also used to verify other signed documents,
6 such as CMS, OCSP and timestamps.
7 """
8
9
10
11 from ctypes import c_void_p,create_string_buffer,c_long,c_int,POINTER,c_char_p,Structure,cast
12 from ctypescrypto.bio import Membio
13 from ctypescrypto.pkey import PKey
14 from ctypescrypto.oid import Oid
15 from ctypescrypto.exception import LibCryptoError
16 from ctypescrypto import libcrypto
17 from datetime import datetime
try:
    from pytz import utc
except ImportError:
    # pytz is optional: provide a minimal stand-in tzinfo for UTC.
    from datetime import timedelta, tzinfo

    ZERO = timedelta(0)

    class UTC(tzinfo):
        """
        Fixed zero-offset tzinfo representing UTC.
        Only defined (and used) when pytz cannot be imported.
        """

        def utcoffset(self, dt):
            # UTC is never shifted relative to itself
            return ZERO

        def dst(self, dt):
            # no daylight saving adjustment in UTC
            return ZERO

        def tzname(self, dt):
            return "UTC"

    utc = UTC()
38
39 __all__ = ['X509','X509Error','X509Name','X509Store','StackOfX509']
40
41 class _validity(Structure):
42     """ ctypes representation of X509_VAL structure 
43         needed to access certificate validity period, because openssl
44         doesn't provide fuctions for it - only macros
45     """
46     _fields_ =  [('notBefore',c_void_p),('notAfter',c_void_p)]
47
class _cinf(Structure):
    """
    ctypes mirror of the OpenSSL X509_CINF structure.

    Needed to reach certificate data which openssl exposes only
    through macros.
    """
    _fields_ = [
        ('version', c_void_p),
        ('serialNumber', c_void_p),
        ('sign_alg', c_void_p),
        ('issuer', c_void_p),
        # the only member followed further: validity[0].notBefore/notAfter
        ('validity', POINTER(_validity)),
        ('subject', c_void_p),
        ('pubkey', c_void_p),
        ('issuerUID', c_void_p),
        ('subjectUID', c_void_p),
        ('extensions', c_void_p),
    ]
64
class _x509(Structure):
    """
    ctypes mirror of the leading members of the OpenSSL X509 structure.

    Needed to reach certificate data which is accessible only via
    macros, not functions.
    """
    _fields_ = [
        ('cert_info', POINTER(_cinf)),
        ('sig_alg', c_void_p),
        ('signature', c_void_p),
        # The real structure continues with a lot of parsed extension
        # fields, which are not declared here
    ]


# Pointer type used to cast raw certificate pointers for member access
_px509 = POINTER(_x509)
77
class X509Error(LibCryptoError):
    """
    Raised when an openssl function fails during an X509 operation.
    """
84
85
class X509Name(object):
    """
    Class which represents X.509 distinguished name - typically
    a certificate subject name or an issuer name.

    Now used only to represent information, extracted from the
    certificate. Potentially can be also used to build DN when creating
    certificate signing request
    """
    # XN_FLAG_SEP_COMMA_PLUS | ASN1_STRFLG_UTF8_CONVERT
    PRINT_FLAG=0x10010
    # ASN1_STRFLG_ESC_MSB - escape characters with the high bit set
    ESC_MSB=4
    def __init__(self,ptr=None,copy=False):
        """
        Creates a X509Name object
        @param ptr - pointer to X509_NAME C structure (as returned by some
            OpenSSL functions). If omitted, a new empty name is allocated
        @param copy - indicates that this structure have to be freed upon
            object destruction
        Raises X509Error if a new name cannot be allocated
        """
        if ptr is not None:
            self.ptr=ptr
            self.need_free=copy
            self.writable=False
        else:
            # Mark as "nothing to free" first, so that __del__ stays safe
            # if the allocation below fails
            self.need_free=False
            self.writable=True
            self.ptr=libcrypto.X509_NAME_new()
            if not self.ptr:
                raise X509Error("error allocating X509_NAME")
            self.need_free=True
    def __del__(self):
        """
        Frees the underlying structure if this object owns it
        """
        # getattr: __del__ also runs for objects whose __init__ failed
        if getattr(self,'need_free',False):
            libcrypto.X509_NAME_free(self.ptr)
    def __str__(self):
        """
        Produces an ascii representation of the name, escaping all symbols > 0x80
        Probably it is not what you want, unless your native language is English
        """
        b=Membio()
        libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG | self.ESC_MSB)
        return str(b)
    def __unicode__(self):
        """
        Produces unicode representation of the name.
        """
        b=Membio()
        libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG)
        return unicode(b)
    def __len__(self):
        """
        return number of components in the name
        """
        return libcrypto.X509_NAME_entry_count(self.ptr)
    def __cmp__(self,other):
        """
        Compares X509 names. Objects of other types are not comparable
        """
        if not isinstance(other,X509Name):
            return NotImplemented
        return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)
    def __eq__(self,other):
        """
        Two names are equal when X509_NAME_cmp reports no difference
        """
        if not isinstance(other,X509Name):
            return NotImplemented
        return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)==0
    def __ne__(self,other):
        """
        Explicit inverse of __eq__, so that != never depends on __cmp__
        """
        if not isinstance(other,X509Name):
            return NotImplemented
        return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)!=0

    def _entry_text(self,entry):
        """
        Returns value of given X509_NAME_ENTRY as unicode string
        """
        s=libcrypto.X509_NAME_ENTRY_get_data(entry)
        b=Membio()
        libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
        return unicode(b)

    def __getitem__(self,key):
        """
        Indexed by Oid - returns value of the first field with this oid,
        raises KeyError if there is none.
        Indexed by integer - returns (Oid, value) tuple for the field in
        this position, raises IndexError if position is out of range.
        Any other key type raises TypeError
        """
        if isinstance(key,Oid):
            # Return first matching field
            idx=libcrypto.X509_NAME_get_index_by_NID(self.ptr,key.nid,-1)
            if idx<0:
                # report the key which was looked up, not the Oid class
                raise KeyError("Key not found "+str(key))
            entry=libcrypto.X509_NAME_get_entry(self.ptr,idx)
            return self._entry_text(entry)
        elif isinstance(key,(int,long)):
            # Return OID, string tuple
            entry=libcrypto.X509_NAME_get_entry(self.ptr,key)
            if entry is None:
                raise IndexError("name entry index out of range")
            oid=Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry))
            return (oid,self._entry_text(entry))
        else:
            raise TypeError("X509 NAME can be indexed by Oids or integers only")

    def __setitem__(self,key,val):
        """
        Modification is not implemented yet. Names obtained from existing
        structures are read-only and raise ValueError
        """
        if not self.writable:
            raise ValueError("Attempt to modify constant X509 object")
        else:
            raise NotImplementedError
    def __delitem__(self,key):
        """
        Deletion is not implemented yet. Names obtained from existing
        structures are read-only and raise ValueError
        """
        if not self.writable:
            raise ValueError("Attempt to modify constant X509 object")
        else:
            raise NotImplementedError
    def __hash__(self):
        """
        Hash of the name as computed by X509_NAME_hash
        """
        return libcrypto.X509_NAME_hash(self.ptr)
182
183 class _x509_ext(Structure):
184     """ Represens C structure X509_EXTENSION """
185     _fields_=[("object",c_void_p),
186             ("critical",c_int),
187             ("value",c_void_p)]
188
class X509_EXT(object):
    """ Python wrapper around a single certificate extension """
    def __init__(self,ptr,copy=False):
        """
        Wraps the given pointer to X509_EXTENSION.
        @param ptr - pointer to X509_EXTENSION structure
        @param copy - if True, the structure is duplicated and the
            duplicate is wrapped; otherwise the pointer itself is kept
        """
        self.ptr=libcrypto.X509_EXTENSION_dup(ptr) if copy else cast(ptr,POINTER(_x509_ext))
    def __del__(self):
        # NOTE(review): the structure is freed even when it was not
        # duplicated (copy=False) - confirm that such callers really
        # hand ownership over to this object
        libcrypto.X509_EXTENSION_free(self.ptr)
    def _render(self):
        """ Prints the extension into a fresh Membio and returns it """
        out=Membio()
        libcrypto.X509V3_EXT_print(out.bio,self.ptr,0x20010,0)
        return out
    def __str__(self):
        return str(self._render())
    def __unicode__(self):
        return unicode(self._render())
    @property
    def oid(self):
        """ Oid object identifying the extension """
        return Oid.fromobj(self.ptr.contents.object)
    @property
    def critical(self):
        """ True if the critical flag of the extension is set """
        return self.ptr.contents.critical >0
216 class _X509extlist(object): 
217     """
218     Represents list of certificate extensions
219     """
220     def __init__(self,cert):
221         self.cert=cert
222     def __len__(self):
223         return libcrypto.X509_get_ext_count(self.cert.cert)
224     def __getitem__(self,item):
225         p=libcrypto.X509_get_ext(self.cert.cert,item)
226         if p is None:
227             raise IndexError
228         return X509_EXT(p,True)
229     def find(self,oid):
230         """
231         Return list of extensions with given Oid
232         """
233         if not isinstance(oid,Oid):
234             raise TypeError("Need crytypescrypto.oid.Oid as argument")
235         found=[]
236         l=-1
237         end=len(self)
238         while True:
239             l=libcrypto.X509_get_ext_by_NID(self.cert.cert,oid.nid,l)
240             if l>=end or l<0:
241                  break
242             found.append(self[l])
243         return found
244     def find_critical(self,crit=True):
245         """
246         Return list of critical extensions (or list of non-cricital, if
247         optional second argument is False
248         """
249         if crit:
250             flag=1
251         else:
252             flag=0
253         found=[]
254         end=len(self)
255         l=-1
256         while True:
257             l=libcrypto.X509_get_ext_by_critical(self.cert.cert,flag,l)
258             if l>=end or l<0:
259                  break
260             found.append(self[l])
261         return found            
262
class X509(object):
    """
    Represents X.509 certificate.
    """
    def __init__(self,data=None,ptr=None,format="PEM"):
        """
        Initializes certificate
        @param data - serialized certificate in PEM or DER format.
        @param ptr - pointer to X509, returned by some openssl function.
            mutually exclusive with data
        @param format - specifies data format. "PEM" or "DER", default PEM

        Raises TypeError if both or none of data and ptr are given and
        X509Error if data cannot be parsed
        """
        if ptr is not None:
            if data is not None:
                raise TypeError("Cannot use data and ptr simultaneously")
            self.cert = ptr
        elif data is None:
            raise TypeError("data argument is required")
        else:
            b=Membio(data)
            if format == "PEM":
                self.cert=libcrypto.PEM_read_bio_X509(b.bio,None,None,None)
            else:
                self.cert=libcrypto.d2i_X509_bio(b.bio,None)
            # "not" catches NULL regardless of whether it is reported
            # as None or as 0
            if not self.cert:
                raise X509Error("error reading certificate")
    def __del__(self):
        """
        Frees certificate object
        """
        # getattr: __del__ also runs for objects whose __init__ failed
        cert=getattr(self,'cert',None)
        if cert:
            libcrypto.X509_free(cert)
    @property
    def extensions(self):
        """
        List-like object with certificate extensions.
        Created on demand: storing it in the certificate would make
        a reference cycle between two objects, one of which has __del__
        """
        return _X509extlist(self)
    def __str__(self):
        """ Returns der string of the certificate """
        b=Membio()
        if libcrypto.i2d_X509_bio(b.bio,self.cert)==0:
            raise X509Error("error serializing certificate")
        return str(b)
    def __repr__(self):
        """ Returns valid call to the constructor """
        return "X509(data="+repr(str(self))+",format='DER')"
    @property
    def pubkey(self):
        """EVP PKEy object of certificate public key"""
        # X509_get_pubkey takes the certificate only
        return PKey(ptr=libcrypto.X509_get_pubkey(self.cert))
    def pem(self):
        """ Returns PEM representation of the certificate """
        b=Membio()
        if libcrypto.PEM_write_bio_X509(b.bio,self.cert)==0:
            raise X509Error("error serializing certificate")
        return str(b)
    def verify(self,store=None,chain=None,key=None):
        """
        Verify self. Supports verification on both X509 store object
        or just public issuer key
        @param store X509Store object.
        @param chain - list of X509 objects to add into verification
            context.These objects are untrusted, but can be used to
            build certificate chain up to trusted object in the store
        @param key - PKey object with open key to validate signature

        parameters store and key are mutually exclusive. If neither
        is specified, attempts to verify self as self-signed certificate

        Returns True if verification succeeds, False otherwise.
        Raises X509Error if verification cannot be performed
        """
        if store is not None and key is not None:
            raise X509Error("key and store cannot be specified simultaneously")
        if store is not None:
            ctx=libcrypto.X509_STORE_CTX_new()
            if not ctx:
                raise X509Error("Error allocating X509_STORE_CTX")
            try:
                if chain:
                    # The local name keeps the stack alive until the
                    # verification is finished; openssl gets raw pointer
                    untrusted=StackOfX509(chain)
                    chain_ptr=untrusted.ptr
                else:
                    untrusted=None
                    chain_ptr=None
                # X509_STORE_CTX_init reports failure by returning 0
                if libcrypto.X509_STORE_CTX_init(ctx,store.store,self.cert,chain_ptr) <= 0:
                    raise X509Error("Error initializing X509_STORE_CTX")
                res=libcrypto.X509_verify_cert(ctx)
                return res>0
            finally:
                # the context is released on every path, errors included
                libcrypto.X509_STORE_CTX_free(ctx)
        else:
            if key is None:
                if self.issuer != self.subject:
                    # Not a self-signed certificate
                    return False
                key = self.pubkey
            res = libcrypto.X509_verify(self.cert,key.key)
            if res < 0:
                raise X509Error("X509_verify failed")
            return res>0

    def _borrowed_name(self,ptr):
        """
        Wraps X509_NAME which lives inside this certificate.
        The returned object keeps a reference to the certificate, so the
        name cannot outlive the memory it points to
        """
        name=X509Name(ptr)
        name._owner=self
        return name
    @property
    def subject(self):
        """ X509Name for certificate subject name """
        return self._borrowed_name(libcrypto.X509_get_subject_name(self.cert))
    @property
    def issuer(self):
        """ X509Name for certificate issuer name """
        return self._borrowed_name(libcrypto.X509_get_issuer_name(self.cert))
    @property
    def serial(self):
        """ Serial number of certificate as integer """
        asnint=libcrypto.X509_get_serialNumber(self.cert)
        b=Membio()
        libcrypto.i2a_ASN1_INTEGER(b.bio,asnint)
        # i2a_ASN1_INTEGER output is parsed as hexadecimal
        return int(str(b),16)
    @property
    def version(self):
        """ certificate version as integer. Really certificate stores 0 for
        version 1 and 2 for version 3, but we return 1 and 3 """
        asn1int=cast(self.cert,_px509)[0].cert_info[0].version
        return libcrypto.ASN1_INTEGER_get(asn1int)+1
    @staticmethod
    def _parse_time(asn1date):
        """
        Converts ASN1_TIME structure into timezone-aware datetime
        by parsing the text printed by ASN1_TIME_print
        """
        b=Membio()
        libcrypto.ASN1_TIME_print(b.bio,asn1date)
        return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
    @property
    def startDate(self):
        """ Certificate validity period start date """
        # Need deep poke into certificate structure (x)->cert_info->validity->notBefore
        return self._parse_time(cast(self.cert,_px509)[0].cert_info[0].validity[0].notBefore)
    @property
    def endDate(self):
        """ Certificate validity period end date """
        # Need deep poke into certificate structure (x)->cert_info->validity->notAfter
        return self._parse_time(cast(self.cert,_px509)[0].cert_info[0].validity[0].notAfter)
    def check_ca(self):
        """ Returns True if certificate is CA certificate """
        return libcrypto.X509_check_ca(self.cert)>0
class X509Store(object):
    """
        Represents trusted certificate store. Can be used to lookup CA
        certificates to verify

        @param file - file with several certificates and crls
                to load into store
        @param dir - hashed directory with certificates and crls
        @param default - if true, default verify location (directory)
            is installed

    """
    def __init__(self,file=None,dir=None,default=False):
        """
        Creates X509 store and installs lookup method. Optionally initializes
        by certificates from given file or directory.
        Raises X509Error if any of these steps fails
        """
        #
        # Todo - set verification flags
        #
        self.store=libcrypto.X509_STORE_new()
        if self.store is None:
            raise X509Error("allocating store")
        lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_file())
        if lookup is None:
            raise X509Error("error installing file lookup method")
        if file is not None:
            # command 1 - X509_L_FILE_LOAD, type 1 - X509_FILETYPE_PEM
            if libcrypto.X509_LOOKUP_ctrl(lookup,1,file,1,None)<=0:
                raise X509Error("error loading trusted certs from file "+file)
        lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_hash_dir())
        if lookup is None:
            raise X509Error("error installing hashed lookup method")
        if dir is not None:
            # command 2 - X509_L_ADD_DIR, type 1 - X509_FILETYPE_PEM
            if libcrypto.X509_LOOKUP_ctrl(lookup,2,dir,1,None)<=0:
                raise X509Error("error adding hashed  trusted certs dir "+dir)
        if default:
            # type 3 - X509_FILETYPE_DEFAULT
            if libcrypto.X509_LOOKUP_ctrl(lookup,2,None,3,None)<=0:
                raise X509Error("error adding default trusted certs dir ")
    def add_cert(self,cert):
        """
        Explicitly adds certificate to set of trusted in the store
        @param cert - X509 object to add
        """
        if not isinstance(cert,X509):
            raise TypeError("cert should be X509")
        libcrypto.X509_STORE_add_cert(self.store,cert.cert)
    def add_callback(self,callback):
        """
        Installs callback function, which would receive detailed information
        about verified certificates. Not implemented yet
        """
        raise NotImplementedError
    def setflags(self,flags):
        """
        Set certificate verification flags.
        @param flags - integer bit mask. See OpenSSL X509_V_FLAG_* constants
        """
        libcrypto.X509_STORE_set_flags(self.store,flags)
    @staticmethod
    def _purpose_id(sname):
        """
        Translates purpose short name into numeric purpose id.
        X509_PURPOSE_get_by_sname returns an index in the table of
        purposes (-1 if the name is unknown), not the id itself, so
        the id is read from the table entry
        """
        get_index=libcrypto.X509_PURPOSE_get_by_sname
        get_index.argtypes=(c_char_p,)
        get_index.restype=c_int
        get_entry=libcrypto.X509_PURPOSE_get0
        get_entry.argtypes=(c_int,)
        # pointer result - must not be squeezed into C int
        get_entry.restype=c_void_p
        get_id=libcrypto.X509_PURPOSE_get_id
        get_id.argtypes=(c_void_p,)
        get_id.restype=c_int
        idx=get_index(sname)
        if idx<0:
            raise X509Error("Invalid certificate purpose '"+sname+"'")
        return get_id(get_entry(idx))
    def setpurpose(self,purpose):
        """
        Sets certificate purpose which verified certificate should match
        @param purpose - number from 1 to 9 or standard string defined in Openssl
        possible strings - sslclient,sslserver, nssslserver, smimesign,smimeencrypt, crlsign, any,ocsphelper

        Raises TypeError if purpose is neither string nor integer and
        X509Error if purpose is unknown or cannot be set
        """
        if isinstance(purpose,str):
            purp_no=self._purpose_id(purpose)
        elif isinstance(purpose,int):
            purp_no = purpose
        else:
            raise TypeError("purpose should be string or integer")
        if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0:
            raise X509Error("cannot set purpose")
    def setdepth(self,depth):
        """
        Sets the verification depth i.e. max length of certificate chain
        which is acceptable
        """
        libcrypto.X509_STORE_set_depth(self.store,depth)
    def settime(self, time):
        """
        Set point in time used to check validity of certificates for
        Time can be either python datetime object or number of seconds
        since epoch

        Only the type of the argument is checked so far (TypeError on
        mismatch); the operation itself is not implemented yet
        """
        # datetime.datetime is a subclass of datetime.date, so one check
        # covers both
        from datetime import date
        if not isinstance(time,(date,int)):
            raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
        raise NotImplementedError
class StackOfX509(object):
    """
    Implements OpenSSL STACK_OF(X509) object.
    It looks much like python container types
    """
    def __init__(self,certs=None,ptr=None,disposable=True):
        """
        Create stack
        @param certs - list of X509 objects. If specified, read-write
            stack is created and populated by these certificates
        @param ptr - pointer to OpenSSL STACK_OF(X509) as returned by
            some functions
        @param disposable - if True, stack created from object, returned
                by function is copy, and can be modified and need to be
                freed. If false, it is just pointer into another
                structure i.e. CMS_ContentInfo

        certs and ptr are mutually exclusive (ValueError)
        """
        if ptr is None:
            # Nothing to free until the stack is really allocated
            self.need_free = False
            self.ptr=libcrypto.sk_new_null()
            if not self.ptr:
                raise X509Error("error allocating stack of certificates")
            self.need_free = True
            if certs is not None:
                for crt in certs:
                    self.append(crt)
        elif certs is not None:
            raise ValueError("cannot handle certs an ptr simultaneously")
        else:
            self.need_free = disposable
            self.ptr=ptr
    @staticmethod
    def _dup(cert):
        """
        Returns pointer to a fresh copy of certificate structure.
        Raises X509Error if openssl cannot make the copy
        """
        copy=libcrypto.X509_dup(cert)
        if not copy:
            raise X509Error("error duplicating certificate")
        return copy
    def __len__(self):
        """ Number of certificates in the stack """
        return libcrypto.sk_num(self.ptr)
    def __getitem__(self,index):
        """
        Returns X509 object with a copy of certificate in given position.
        Negative indexes are not supported
        """
        if index <0 or index>=len(self):
            raise IndexError
        p=libcrypto.sk_value(self.ptr,index)
        return X509(ptr=self._dup(p))
    def __setitem__(self,index,value):
        """
        Replaces certificate in given position by a copy of value
        """
        if not self.need_free:
            raise ValueError("Stack is read-only")
        if index <0 or index>=len(self):
            raise IndexError
        if not isinstance(value,X509):
            raise TypeError('StackOfX509 can contain only X509 objects')
        # Make the copy first: on failure the stack stays untouched
        copy=self._dup(value.cert)
        p=libcrypto.sk_value(self.ptr,index)
        libcrypto.sk_set(self.ptr,index,copy)
        libcrypto.X509_free(p)
    def __delitem__(self,index):
        """
        Removes certificate in given position from the stack and frees it
        """
        if not self.need_free:
            raise ValueError("Stack is read-only")
        if index <0 or index>=len(self):
            raise IndexError
        p=libcrypto.sk_delete(self.ptr,index)
        libcrypto.X509_free(p)
    def __del__(self):
        """
        Frees the stack with all its certificates, if the stack is owned
        """
        # getattr: __del__ also runs for objects whose __init__ failed
        if getattr(self,'need_free',False):
            libcrypto.sk_pop_free(self.ptr,libcrypto.X509_free)
    def append(self,value):
        """
        Appends a copy of certificate to the stack
        @param value - X509 object
        """
        if not self.need_free:
            raise ValueError("Stack is read-only")
        if not isinstance(value,X509):
            raise TypeError('StackOfX509 can contain only X509 objects')
        libcrypto.sk_push(self.ptr,self._dup(value.cert))
# Prototypes of the libcrypto functions used in this module.
# Without argtypes/restype ctypes treats every integer argument and
# every result as C int, which truncates pointers on 64-bit platforms,
# so each function which takes or returns a pointer is declared here.
# Parameters declared unsigned long in C are described as c_long, as the
# declarations below always did; the size is the same.

# ASN.1 primitives
libcrypto.i2a_ASN1_INTEGER.argtypes=(c_void_p,c_void_p)
libcrypto.ASN1_STRING_print_ex.argtypes=(c_void_p,c_void_p,c_long)
libcrypto.ASN1_TIME_print.argtypes=(c_void_p,c_void_p)
libcrypto.ASN1_INTEGER_get.argtypes=(c_void_p,)
libcrypto.ASN1_INTEGER_get.restype=c_long
libcrypto.OBJ_obj2nid.argtypes=(c_void_p,)

# Reading, writing and examining certificates
libcrypto.PEM_read_bio_X509.restype=c_void_p
libcrypto.PEM_read_bio_X509.argtypes=(c_void_p,POINTER(c_void_p),c_void_p,c_void_p)
libcrypto.PEM_write_bio_X509.restype=c_int
libcrypto.PEM_write_bio_X509.argtypes=(c_void_p,c_void_p)
libcrypto.d2i_X509_bio.restype=c_void_p
libcrypto.d2i_X509_bio.argtypes=(c_void_p,POINTER(c_void_p))
libcrypto.i2d_X509_bio.restype=c_int
libcrypto.i2d_X509_bio.argtypes=(c_void_p,c_void_p)
libcrypto.X509_free.restype=None
libcrypto.X509_free.argtypes=(c_void_p,)
libcrypto.X509_dup.restype=c_void_p
libcrypto.X509_dup.argtypes=(c_void_p,)
libcrypto.X509_get_pubkey.restype=c_void_p
libcrypto.X509_get_pubkey.argtypes=(c_void_p,)
libcrypto.X509_get_subject_name.restype=c_void_p
libcrypto.X509_get_subject_name.argtypes=(c_void_p,)
libcrypto.X509_get_issuer_name.restype=c_void_p
libcrypto.X509_get_issuer_name.argtypes=(c_void_p,)
libcrypto.X509_get_serialNumber.argtypes=(c_void_p,)
libcrypto.X509_get_serialNumber.restype=c_void_p
libcrypto.X509_check_ca.restype=c_int
libcrypto.X509_check_ca.argtypes=(c_void_p,)
libcrypto.X509_verify.restype=c_int
libcrypto.X509_verify.argtypes=(c_void_p,c_void_p)

# Distinguished names
libcrypto.X509_NAME_new.restype=c_void_p
libcrypto.X509_NAME_new.argtypes=()
libcrypto.X509_NAME_free.restype=None
libcrypto.X509_NAME_free.argtypes=(c_void_p,)
libcrypto.X509_NAME_print_ex.restype=c_int
libcrypto.X509_NAME_print_ex.argtypes=(c_void_p,c_void_p,c_int,c_long)
libcrypto.X509_NAME_entry_count.restype=c_int
libcrypto.X509_NAME_entry_count.argtypes=(c_void_p,)
libcrypto.X509_NAME_cmp.restype=c_int
libcrypto.X509_NAME_cmp.argtypes=(c_void_p,c_void_p)
libcrypto.X509_NAME_get_index_by_NID.restype=c_int
libcrypto.X509_NAME_get_index_by_NID.argtypes=(c_void_p,c_int,c_int)
libcrypto.X509_NAME_get_entry.restype=c_void_p
libcrypto.X509_NAME_get_entry.argtypes=(c_void_p,c_int)
libcrypto.X509_NAME_ENTRY_get_object.restype=c_void_p
libcrypto.X509_NAME_ENTRY_get_object.argtypes=(c_void_p,)
libcrypto.X509_NAME_ENTRY_get_data.restype=c_void_p
libcrypto.X509_NAME_ENTRY_get_data.argtypes=(c_void_p,)
libcrypto.X509_NAME_hash.restype=c_long
libcrypto.X509_NAME_hash.argtypes=(c_void_p,)

# Certificate extensions
libcrypto.X509_EXTENSION_dup.argtypes=(c_void_p,)
libcrypto.X509_EXTENSION_dup.restype=POINTER(_x509_ext)
libcrypto.X509_EXTENSION_free.restype=None
libcrypto.X509_EXTENSION_free.argtypes=(c_void_p,)
libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int)
libcrypto.X509_get_ext.restype=c_void_p
libcrypto.X509_get_ext.argtypes=(c_void_p,c_int)
libcrypto.X509_get_ext_count.restype=c_int
libcrypto.X509_get_ext_count.argtypes=(c_void_p,)
libcrypto.X509_get_ext_by_NID.restype=c_int
libcrypto.X509_get_ext_by_NID.argtypes=(c_void_p,c_int,c_int)
libcrypto.X509_get_ext_by_critical.restype=c_int
libcrypto.X509_get_ext_by_critical.argtypes=(c_void_p,c_int,c_int)

# Certificate store and verification context
libcrypto.X509_STORE_new.restype=c_void_p
libcrypto.X509_STORE_add_lookup.restype=c_void_p
libcrypto.X509_STORE_add_lookup.argtypes=(c_void_p,c_void_p)
libcrypto.X509_LOOKUP_file.restype=c_void_p
libcrypto.X509_LOOKUP_hash_dir.restype=c_void_p
libcrypto.X509_LOOKUP_ctrl.restype=c_int
libcrypto.X509_LOOKUP_ctrl.argtypes=(c_void_p,c_int,c_char_p,c_long,POINTER(c_char_p))
libcrypto.X509_STORE_add_cert.restype=c_int
libcrypto.X509_STORE_add_cert.argtypes=(c_void_p,c_void_p)
libcrypto.X509_STORE_set_flags.restype=c_int
libcrypto.X509_STORE_set_flags.argtypes=(c_void_p,c_long)
libcrypto.X509_STORE_set_purpose.restype=c_int
libcrypto.X509_STORE_set_purpose.argtypes=(c_void_p,c_int)
libcrypto.X509_STORE_set_depth.restype=c_int
libcrypto.X509_STORE_set_depth.argtypes=(c_void_p,c_int)
libcrypto.X509_PURPOSE_get_by_sname.restype=c_int
libcrypto.X509_PURPOSE_get_by_sname.argtypes=(c_char_p,)
libcrypto.X509_STORE_CTX_new.restype=c_void_p
libcrypto.X509_STORE_CTX_new.argtypes=()
libcrypto.X509_STORE_CTX_init.restype=c_int
libcrypto.X509_STORE_CTX_init.argtypes=(c_void_p,c_void_p,c_void_p,c_void_p)
libcrypto.X509_STORE_CTX_free.restype=None
libcrypto.X509_STORE_CTX_free.argtypes=(c_void_p,)
libcrypto.X509_verify_cert.restype=c_int
libcrypto.X509_verify_cert.argtypes=(c_void_p,)

# Generic stack functions, used for STACK_OF(X509)
libcrypto.sk_new_null.restype=c_void_p
libcrypto.sk_num.restype=c_int
libcrypto.sk_num.argtypes=(c_void_p,)
libcrypto.sk_set.argtypes=(c_void_p,c_int,c_void_p)
libcrypto.sk_set.restype=c_void_p
libcrypto.sk_value.argtypes=(c_void_p,c_int)
libcrypto.sk_value.restype=c_void_p
libcrypto.sk_delete.restype=c_void_p
libcrypto.sk_delete.argtypes=(c_void_p,c_int)
libcrypto.sk_push.restype=c_int
libcrypto.sk_push.argtypes=(c_void_p,c_void_p)
libcrypto.sk_pop_free.restype=None
libcrypto.sk_pop_free.argtypes=(c_void_p,c_void_p)
582 libcrypto.X509_dup.argtypes=(c_void_p,)
583 libcrypto.X509_NAME_hash.restype=c_long
584 libcrypto.X509_NAME_hash.argtypes=(c_void_p,)