]> www.wagner.pp.ru Git - oss/ctypescrypto.git/blob - ctypescrypto/x509.py
Covered StackOfX509 by tests, fixed some typos in pkey docstrings
[oss/ctypescrypto.git] / ctypescrypto / x509.py
1 """
2 Implements interface to openssl X509 and X509Store structures, 
3 I.e allows to load, analyze and verify certificates.
4
5 X509Store objects are also used to verify other signed documets,
6 such as CMS, OCSP and timestamps.
7 """
8
9
10
11 from ctypes import c_void_p,create_string_buffer,c_long,c_int,POINTER,c_char_p,Structure,cast
12 from ctypescrypto.bio import Membio
13 from ctypescrypto.pkey import PKey
14 from ctypescrypto.oid import Oid
15 from ctypescrypto.exception import LibCryptoError
16 from ctypescrypto import libcrypto
17 from datetime import datetime
18 try:
19         from pytz import utc
20 except ImportError:
21         from datetime import timedelta,tzinfo
22         ZERO=timedelta(0)
23         class UTC(tzinfo):
24                 """tzinfo object for UTC. 
25                         If no pytz is available, we would use it.
26                 """
27
28                 def utcoffset(self, dt):
29                         return ZERO
30
31                 def tzname(self, dt):
32                         return "UTC"
33
34                 def dst(self, dt):
35                         return ZERO
36
37         utc=UTC()
38
39 __all__ = ['X509Error','X509Name','X509Store','StackOfX509']
40
41 class _validity(Structure):
42         """ ctypes representation of X509_VAL structure 
43                 needed to access certificate validity period, because openssl
44                 doesn't provide fuctions for it - only macros
45         """
46         _fields_ =      [('notBefore',c_void_p),('notAfter',c_void_p)]
47
48 class _cinf(Structure):
49         """ ctypes representtion of X509_CINF structure 
50             neede to access certificate data, which are accessable only
51                 via macros
52         """
53         _fields_ = [('version',c_void_p),
54                 ('serialNumber',c_void_p),
55                 ('sign_alg',c_void_p),
56                 ('issuer',c_void_p),
57                 ('validity',POINTER(_validity)),
58                 ('subject',c_void_p),
59                 ('pubkey',c_void_p),
60                 ('issuerUID',c_void_p),
61                 ('subjectUID',c_void_p),
62                 ('extensions',c_void_p),
63                 ]
64
65 class _x509(Structure):
66         """
67         ctypes represntation of X509 structure needed
68         to access certificate data which are accesable only via
69         macros, not functions
70         """
71         _fields_ = [('cert_info',POINTER(_cinf)),
72                                 ('sig_alg',c_void_p),
73                                 ('signature',c_void_p),
74                                 # There are a lot of parsed extension fields there
75                                 ]
76 _px509 = POINTER(_x509)
77
78 class X509Error(LibCryptoError):
79         """
80         Exception, generated when some openssl function fail
81         during X509 operation
82         """
83         pass
84
85
86 class X509Name(object):
87         """
88         Class which represents X.509 distinguished name - typically 
89         a certificate subject name or an issuer name.
90
91         Now used only to represent information, extracted from the
92         certificate. Potentially can be also used to build DN when creating
93         certificate signing request
94         """
95         # XN_FLAG_SEP_COMMA_PLUS & ASN1_STRFLG_UTF8_CONVERT
96         PRINT_FLAG=0x10010
97         ESC_MSB=4
98         def __init__(self,ptr=None,copy=False):
99                 """
100                 Creates a X509Name object
101                 @param ptr - pointer to X509_NAME C structure (as returned by some  OpenSSL functions
102                 @param copy - indicates that this structure have to be freed upon object destruction
103                 """
104                 if ptr is not None:
105                         self.ptr=ptr
106                         self.need_free=copy
107                         self.writable=False
108                 else:
109                         self.ptr=libcrypto.X509_NAME_new()
110                         self.need_free=True
111                         self.writable=True
112         def __del__(self):
113                 """
114                 Frees if neccessary
115                 """
116                 if self.need_free:
117                         libcrypto.X509_NAME_free(self.ptr)
118         def __str__(self):
119                 """
120                 Produces an ascii representation of the name, escaping all symbols > 0x80
121                 Probably it is not what you want, unless your native language is English
122                 """
123                 b=Membio()
124                 libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG | self.ESC_MSB)
125                 return str(b)
126         def __unicode__(self):
127                 """
128                 Produces unicode representation of the name. 
129                 """
130                 b=Membio()
131                 libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG)
132                 return unicode(b)
133         def __len__(self):
134                 """
135                 return number of components in the name
136                 """
137                 return libcrypto.X509_NAME_entry_count(self.ptr)
138         def __cmp__(self,other):
139                 """
140                 Compares X509 names
141                 """
142                 return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)
143         def __eq__(self,other):
144                 return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)==0
145
146         def __getitem__(self,key):
147                 if isinstance(key,Oid):
148                         # Return first matching field
149                         idx=libcrypto.X509_NAME_get_index_by_NID(self.ptr,key.nid,-1)
150                         if idx<0:
151                                 raise KeyError("Key not found "+str(Oid))
152                         entry=libcrypto.X509_NAME_get_entry(self.ptr,idx)
153                         s=libcrypto.X509_NAME_ENTRY_get_data(entry)
154                         b=Membio()
155                         libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
156                         return unicode(b)
157                 elif isinstance(key,(int,long)):
158                         # Return OID, string tuple
159                         entry=libcrypto.X509_NAME_get_entry(self.ptr,key)
160                         if entry is None:
161                                 raise IndexError("name entry index out of range")
162                         oid=Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry))
163                         s=libcrypto.X509_NAME_ENTRY_get_data(entry)
164                         b=Membio()
165                         libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
166                         return (oid,unicode(b))
167                 else:
168                         raise TypeError("X509 NAME can be indexed by Oids or integers only")
169
170         def __setitem__(self,key,val):
171                 if not self.writable:
172                         raise ValueError("Attempt to modify constant X509 object")
173                 else:
174                         raise NotImplementedError
175         def __delitem__(self,key):
176                 if not self.writable:
177                         raise ValueError("Attempt to modify constant X509 object")
178                 else:
179                         raise NotImplementedError
180
181 class _x509_ext(Structure):
182         """ Represens C structure X509_EXTENSION """
183         _fields_=[("object",c_void_p),
184                         ("critical",c_int),
185                         ("value",c_void_p)]
186
187 class X509_EXT(object):
188         """ Python object which represents a certificate extension """
189         def __init__(self,ptr,copy=False):
190                 """ Initializes from the pointer to X509_EXTENSION.
191                         If copy is True, creates a copy, otherwise just
192                         stores pointer.
193                 """
194                 if copy:
195                         self.ptr=libcrypto.X509_EXTENSION_dup(ptr)
196                 else:
197                         self.ptr=cast(ptr,POINTER(_x509_ext))
198         def __del__(self):
199                 libcrypto.X509_EXTENSION_free(self.ptr)
200         def __str__(self):
201                 b=Membio()
202                 libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0)
203                 return str(b)
204         def __unicode__(self):
205                 b=Membio()
206                 libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0)
207                 return unicode(b)
208         @property
209         def oid(self):
210                 return Oid.fromobj(self.ptr[0].object)
211         @property
212         def critical(self):     
213                 return self.ptr[0].critical >0
214 class _X509extlist(object):     
215         """
216         Represents list of certificate extensions
217         """
218         def __init__(self,cert):
219                 self.cert=cert
220         def __len__(self):
221                 return libcrypto.X509_get_ext_count(self.cert.cert)
222         def __getitem__(self,item):
223                 p=libcrypto.X509_get_ext(self.cert.cert,item)
224                 if p is None:
225                         raise IndexError
226                 return X509_EXT(p,True)
227         def find(self,oid):
228                 """
229                 Return list of extensions with given Oid
230                 """
231                 if not isinstance(oid,Oid):
232                         raise TypeError("Need crytypescrypto.oid.Oid as argument")
233                 found=[]
234                 l=-1
235                 end=len(self)
236                 while True:
237                         l=libcrypto.X509_get_ext_by_NID(self.cert.cert,oid.nid,l)
238                         if l>=end or l<0:
239                                  break
240                         found.append(self[l])
241                 return found
242         def find_critical(self,crit=True):
243                 """
244                 Return list of critical extensions (or list of non-cricital, if
245                 optional second argument is False
246                 """
247                 if crit:
248                         flag=1
249                 else:
250                         flag=0
251                 found=[]
252                 end=len(self)
253                 l=-1
254                 while True:
255                         l=libcrypto.X509_get_ext_by_critical(self.cert.cert,flag,l)
256                         if l>=end or l<0:
257                                  break
258                         found.append(self[l])
259                 return found                    
260
261 class X509(object):
262         """
263         Represents X.509 certificate. 
264         """
265         def __init__(self,data=None,ptr=None,format="PEM"):
266                 """
267                 Initializes certificate
268                 @param data - serialized certificate in PEM or DER format.
269                 @param ptr - pointer to X509, returned by some openssl function. 
270                         mutually exclusive with data
271                 @param format - specifies data format. "PEM" or "DER", default PEM
272                 """
273                 if ptr is not None:
274                         if data is not None: 
275                                 raise TypeError("Cannot use data and ptr simultaneously")
276                         self.cert = ptr
277                 elif data is None:
278                         raise TypeError("data argument is required")
279                 else:
280                         b=Membio(data)
281                         if format == "PEM":
282                                 self.cert=libcrypto.PEM_read_bio_X509(b.bio,None,None,None)
283                         else:
284                                 self.cert=libcrypto.d2i_X509_bio(b.bio,None)
285                         if self.cert is None:
286                                 raise X509Error("error reading certificate")
287                 self.extensions=_X509extlist(self)              
288         def __del__(self):
289                 """
290                 Frees certificate object
291                 """
292                 libcrypto.X509_free(self.cert)
293         def __str__(self):
294                 """ Returns der string of the certificate """
295                 b=Membio()
296                 if libcrypto.i2d_X509_bio(b.bio,self.cert)==0:
297                         raise X509Error("error serializing certificate")
298                 return str(b)
299         def __repr__(self):
300                 """ Returns valid call to the constructor """
301                 return "X509(data="+repr(str(self))+",format='DER')"
302         @property
303         def pubkey(self):
304                 """EVP PKEy object of certificate public key"""
305                 return PKey(ptr=libcrypto.X509_get_pubkey(self.cert,False))
306         def verify(self,store=None,chain=[],key=None):  
307                 """ 
308                 Verify self. Supports verification on both X509 store object 
309                 or just public issuer key
310                 @param store X509Store object.
311                 @param chain - list of X509 objects to add into verification
312                         context.These objects are untrusted, but can be used to
313                         build certificate chain up to trusted object in the store
314                 @param key - PKey object with open key to validate signature
315                 
316                 parameters store and key are mutually exclusive. If neither 
317                 is specified, attempts to verify self as self-signed certificate
318                 """
319                 if store is not None and key is not None:
320                         raise X509Error("key and store cannot be specified simultaneously")
321                 if store is not None:
322                         ctx=libcrypto.X509_STORE_CTX_new()
323                         if ctx is None:
324                                 raise X509Error("Error allocating X509_STORE_CTX")
325                         if chain is not None and len(chain)>0:
326                                 ch=StackOfX509(chain)
327                         else:
328                                 ch=None
329                         if libcrypto.X509_STORE_CTX_init(ctx,store.store,self.cert,ch) < 0:
330                                 raise X509Error("Error allocating X509_STORE_CTX")
331                         res= libcrypto.X509_verify_cert(ctx)
332                         libcrypto.X509_STORE_CTX_free(ctx)
333                         return res>0
334                 else:
335                         if key is None:
336                                 if self.issuer != self.subject:
337                                         # Not a self-signed certificate
338                                         return False
339                                 key = self.pubkey
340                         res = libcrypto.X509_verify(self.cert,key.key)
341                         if res < 0:
342                                 raise X509Error("X509_verify failed")
343                         return res>0
344                         
345         @property
346         def subject(self):
347                 """ X509Name for certificate subject name """
348                 return X509Name(libcrypto.X509_get_subject_name(self.cert))
349         @property
350         def issuer(self):
351                 """ X509Name for certificate issuer name """
352                 return X509Name(libcrypto.X509_get_issuer_name(self.cert))
353         @property
354         def serial(self):
355                 """ Serial number of certificate as integer """
356                 asnint=libcrypto.X509_get_serialNumber(self.cert)
357                 b=Membio()
358                 libcrypto.i2a_ASN1_INTEGER(b.bio,asnint)
359                 return int(str(b),16)
360         @property 
361         def version(self):
362                 """ certificate version as integer. Really certificate stores 0 for
363                 version 1 and 2 for version 3, but we return 1 and 3 """
364                 asn1int=cast(self.cert,_px509)[0].cert_info[0].version
365                 return libcrypto.ASN1_INTEGER_get(asn1int)+1
366         @property
367         def startDate(self):
368                 """ Certificate validity period start date """
369                 # Need deep poke into certificate structure (x)->cert_info->validity->notBefore 
370                 global utc
371                 asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notBefore
372                 b=Membio()
373                 libcrypto.ASN1_TIME_print(b.bio,asn1date)
374                 return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
375         @property
376         def endDate(self):
377                 """ Certificate validity period end date """
378                 # Need deep poke into certificate structure (x)->cert_info->validity->notAfter
379                 global utc
380                 asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notAfter
381                 b=Membio()
382                 libcrypto.ASN1_TIME_print(b.bio,asn1date)
383                 return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
384         def check_ca(self):
385                 """ Returns True if certificate is CA certificate """
386                 return libcrypto.X509_check_ca(self.cert)>0
387 class X509Store(object):
388         """
389                 Represents trusted certificate store. Can be used to lookup CA 
390                 certificates to verify
391
392                 @param file - file with several certificates and crls 
393                                 to load into store
394                 @param dir - hashed directory with certificates and crls
395                 @param default - if true, default verify location (directory) 
396                         is installed
397
398         """
399         def __init__(self,file=None,dir=None,default=False):
400                 """
401                 Creates X509 store and installs lookup method. Optionally initializes 
402                 by certificates from given file or directory.
403                 """
404                 #
405                 # Todo - set verification flags
406                 # 
407                 self.store=libcrypto.X509_STORE_new()
408                 if self.store is None:
409                         raise X509Error("allocating store")
410                 lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_file())
411                 if lookup is None:
412                         raise X509Error("error installing file lookup method")
413                 if (file is not None):
414                         if not libcrypto.X509_LOOKUP_ctrl(lookup,1,file,1,None)>0:
415                                 raise X509Error("error loading trusted certs from file "+file)
416                 lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_hash_dir())
417                 if lookup is None:
418                         raise X509Error("error installing hashed lookup method")
419                 if dir is not None:
420                         if not libcrypto.X509_LOOKUP_ctrl(lookup,2,dir,1,None)>0:
421                                 raise X509Error("error adding hashed  trusted certs dir "+dir)
422                 if default:
423                         if not libcrypto.X509_LOOKUP_ctrl(lookup,2,None,3,None)>0:
424                                 raise X509Error("error adding default trusted certs dir ")
425         def add_cert(self,cert):
426                 """
427                 Explicitely adds certificate to set of trusted in the store
428                 @param cert - X509 object to add
429                 """
430                 if not isinstance(cert,X509):
431                         raise TypeError("cert should be X509")
432                 libcrypto.X509_STORE_add_cert(self.store,cert.cert)
433         def add_callback(self,callback):
434                 """
435                 Installs callback function, which would receive detailed information
436                 about verified ceritificates
437                 """
438                 raise NotImplementedError
439         def setflags(self,flags):
440                 """
441                 Set certificate verification flags.
442                 @param flags - integer bit mask. See OpenSSL X509_V_FLAG_* constants
443                 """
444                 libcrypto.X509_STORE_set_flags(self.store,flags)        
445         def setpurpose(self,purpose):
446                 """
447                 Sets certificate purpose which verified certificate should match
448                 @param purpose - number from 1 to 9 or standard strind defined in Openssl
449                 possible strings - sslcient,sslserver, nssslserver, smimesign,smimeencrypt, crlsign, any,ocsphelper
450                 """
451                 if isinstance(purpose,str):
452                         purp_no=X509_PURPOSE_get_by_sname(purpose)
453                         if purp_no <=0:
454                                 raise X509Error("Invalid certificate purpose '"+purpose+"'")
455                 elif isinstance(purpose,int):
456                         purp_no = purpose
457                 if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0:
458                         raise X509Error("cannot set purpose")
459         def setdepth(self,depth):
460                 """
461                 Sets the verification depth i.e. max length of certificate chain
462                 which is acceptable
463                 """
464                 libcrypto.X509_STORE_set_depth(self.store,depth)
465         def settime(self, time):
466                 """
467                 Set point in time used to check validity of certificates for
468                 Time can be either python datetime object or number of seconds
469                 sinse epoch
470                 """
471                 if isinstance(time,datetime.datetime) or isinstance(time,datetime.date):
472                         d=int(time.strftime("%s"))
473                 elif isinstance(time,int):
474                         pass
475                 else:
476                         raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
477                 raise NotImplementedError
478 class StackOfX509(object):
479         """
480         Implements OpenSSL STACK_OF(X509) object.
481         It looks much like python container types
482         """
483         def __init__(self,certs=None,ptr=None,disposable=True):
484                 """
485                 Create stack
486                 @param certs - list of X509 objects. If specified, read-write
487                         stack is created and populated by these certificates
488                 @param ptr - pointer to OpenSSL STACK_OF(X509) as returned by
489                         some functions
490                 @param disposable - if True, stack created from object, returned
491                                 by function is copy, and can be modified and need to be
492                                 freeid. If false, it is just pointer into another
493                                 structure i.e. CMS_ContentInfo
494                 """
495                 if  ptr is None:
496                         self.need_free = True
497                         self.ptr=libcrypto.sk_new_null()
498                         if certs is not None:
499                                 for crt in certs:
500                                         self.append(crt)
501                 elif certs is not None:
502                                 raise ValueError("cannot handle certs an ptr simultaneously")
503                 else:
504                         self.need_free = disposable
505                         self.ptr=ptr
506         def __len__(self):
507                 return libcrypto.sk_num(self.ptr)
508         def __getitem__(self,index):
509                 if index <0 or index>=len(self):
510                         raise IndexError
511                 p=libcrypto.sk_value(self.ptr,index)
512                 return X509(ptr=libcrypto.X509_dup(p))
513         def __setitem__(self,index,value):
514                 if not self.need_free:
515                         raise ValueError("Stack is read-only")
516                 if index <0 or index>=len(self):
517                         raise IndexError
518                 if not isinstance(value,X509):
519                         raise TypeError('StackOfX508 can contain only X509 objects')
520                 p=libcrypto.sk_value(self.ptr,index)
521                 libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert))
522                 libcrypto.X509_free(p)
523         def __delitem__(self,index):    
524                 if not self.need_free:
525                         raise ValueError("Stack is read-only")
526                 if index <0 or index>=len(self):
527                         raise IndexError
528                 p=libcrypto.sk_delete(self.ptr,index)
529                 libcrypto.X509_free(p)
530         def __del__(self):
531                 if self.need_free:
532                         libcrypto.sk_pop_free(self.ptr,libcrypto.X509_free)
533         def append(self,value):
534                 if not self.need_free:
535                         raise ValueError("Stack is read-only")
536                 if not isinstance(value,X509):
537                         raise TypeError('StackOfX508 can contain only X509 objects')
538                 libcrypto.sk_push(self.ptr,libcrypto.X509_dup(value.cert))
539 libcrypto.i2a_ASN1_INTEGER.argtypes=(c_void_p,c_void_p)
540 libcrypto.ASN1_STRING_print_ex.argtypes=(c_void_p,c_void_p,c_long)
541 libcrypto.ASN1_TIME_print.argtypes=(c_void_p,c_void_p)
542 libcrypto.ASN1_INTEGER_get.argtypes=(c_void_p,)
543 libcrypto.ASN1_INTEGER_get.restype=c_long
544 libcrypto.X509_get_serialNumber.argtypes=(c_void_p,)
545 libcrypto.X509_get_serialNumber.restype=c_void_p
546 libcrypto.X509_NAME_ENTRY_get_object.restype=c_void_p
547 libcrypto.X509_NAME_ENTRY_get_object.argtypes=(c_void_p,)
548 libcrypto.OBJ_obj2nid.argtypes=(c_void_p,)
549 libcrypto.X509_NAME_get_entry.restype=c_void_p
550 libcrypto.X509_NAME_get_entry.argtypes=(c_void_p,c_int)
551 libcrypto.X509_STORE_new.restype=c_void_p
552 libcrypto.X509_STORE_add_lookup.restype=c_void_p
553 libcrypto.X509_STORE_add_lookup.argtypes=(c_void_p,c_void_p)
554 libcrypto.X509_LOOKUP_file.restype=c_void_p
555 libcrypto.X509_LOOKUP_hash_dir.restype=c_void_p
556 libcrypto.X509_LOOKUP_ctrl.restype=c_int
557 libcrypto.X509_LOOKUP_ctrl.argtypes=(c_void_p,c_int,c_char_p,c_long,POINTER(c_char_p))
558 libcrypto.X509_EXTENSION_dup.argtypes=(c_void_p,)
559 libcrypto.X509_EXTENSION_dup.restype=POINTER(_x509_ext)
560 libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int)
561 libcrypto.X509_get_ext.restype=c_void_p
562 libcrypto.X509_get_ext.argtypes=(c_void_p,c_int)
563 libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int)
564 libcrypto.sk_set.argtypes=(c_void_p,c_int,c_void_p)
565 libcrypto.sk_set.restype=c_void_p
566 libcrypto.sk_value.argtypes=(c_void_p,c_int)
567 libcrypto.sk_value.restype=c_void_p
568 libcrypto.X509_dup.restype=c_void_p
569 libcrypto.sk_new_null.restype=c_void_p
570 libcrypto.X509_dup.argtypes=(c_void_p,)