]> www.wagner.pp.ru Git - oss/ctypescrypto.git/blob - ctypescrypto/x509.py
Added X509Name.__hash__
[oss/ctypescrypto.git] / ctypescrypto / x509.py
1 """
Implements interface to OpenSSL X509 and X509Store structures,
i.e. allows to load, analyze and verify certificates.

X509Store objects are also used to verify other signed documents,
such as CMS, OCSP and timestamps.
7 """
8
9
10
11 from ctypes import c_void_p,create_string_buffer,c_long,c_int,POINTER,c_char_p,Structure,cast
12 from ctypescrypto.bio import Membio
13 from ctypescrypto.pkey import PKey
14 from ctypescrypto.oid import Oid
15 from ctypescrypto.exception import LibCryptoError
16 from ctypescrypto import libcrypto
17 from datetime import datetime
try:
    from pytz import utc
except ImportError:
    # pytz is optional: without it a minimal zero-offset tzinfo is
    # defined here and used as ``utc`` instead.
    from datetime import timedelta, tzinfo

    ZERO = timedelta(0)

    class UTC(tzinfo):
        """
        Zero-offset tzinfo implementation named "UTC".
        Only defined (and used) when pytz cannot be imported.
        """

        def dst(self, dt):
            return ZERO

        def tzname(self, dt):
            return "UTC"

        def utcoffset(self, dt):
            return ZERO

    utc = UTC()
38
# Names exported by ``from ctypescrypto.x509 import *``.  The X509
# certificate class itself is part of the public interface, so it is
# listed together with the other public classes.
__all__ = ['X509', 'X509Error', 'X509Name', 'X509Store', 'StackOfX509']
40
class _validity(Structure):
    """
    ctypes mirror of the OpenSSL X509_VAL structure.

    It is needed to read the certificate validity period, because
    OpenSSL provides only macros (no functions) for that.
    """
    _fields_ = [
        ('notBefore', c_void_p),
        ('notAfter', c_void_p),
    ]
47
class _cinf(Structure):
    """
    ctypes mirror of the OpenSSL X509_CINF structure.

    It is needed to reach certificate data which are accessible only
    via macros.  Every member is kept as an opaque pointer except
    validity, which points to a _validity structure.
    """
    _fields_ = [
        ('version', c_void_p),
        ('serialNumber', c_void_p),
        ('sign_alg', c_void_p),
        ('issuer', c_void_p),
        ('validity', POINTER(_validity)),
        ('subject', c_void_p),
        ('pubkey', c_void_p),
        ('issuerUID', c_void_p),
        ('subjectUID', c_void_p),
        ('extensions', c_void_p),
    ]
64
class _x509(Structure):
    """
    ctypes mirror of the beginning of the OpenSSL X509 structure.

    It is needed to reach certificate data which are accessible only
    via macros, not functions.  Only the leading members are declared;
    the real structure continues with a lot of parsed extension fields.
    """
    _fields_ = [
        ('cert_info', POINTER(_cinf)),
        ('sig_alg', c_void_p),
        ('signature', c_void_p),
    ]


# Pointer type used to cast raw X509 pointers before poking into them
_px509 = POINTER(_x509)
77
class X509Error(LibCryptoError):
    """
    Raised when some OpenSSL function fails during an X509 operation
    """
84
85
class X509Name(object):
    """
    Represents an X.509 distinguished name - typically a certificate
    subject name or an issuer name.

    Currently used only to present information extracted from a
    certificate.  Potentially it can also be used to build a DN when
    creating a certificate signing request.
    """
    # XN_FLAG_SEP_COMMA_PLUS | ASN1_STRFLGS_UTF8_CONVERT
    PRINT_FLAG = 0x10010
    # Additional print flag used by __str__ to get an escaped, pure ascii
    # representation
    ESC_MSB = 4

    def __init__(self, ptr=None, copy=False):
        """
        Creates a X509Name object

        @param ptr - pointer to X509_NAME C structure (as returned by some
                OpenSSL functions). If omitted, a new empty writable
                name is allocated and owned by this object.
        @param copy - indicates that the structure passed as ptr has to
                be freed upon object destruction
        """
        if ptr is not None:
            self.ptr = ptr
            self.need_free = copy
            self.writable = False
        else:
            self.ptr = libcrypto.X509_NAME_new()
            self.need_free = True
            self.writable = True

    def __del__(self):
        """
        Frees the underlying structure if this object owns it
        """
        # getattr: __init__ may have failed before need_free was assigned
        if getattr(self, 'need_free', False):
            libcrypto.X509_NAME_free(self.ptr)

    def __str__(self):
        """
        Produces an ascii representation of the name, escaping all symbols > 0x80
        Probably it is not what you want, unless your native language is English
        """
        b = Membio()
        libcrypto.X509_NAME_print_ex(b.bio, self.ptr, 0,
                                     self.PRINT_FLAG | self.ESC_MSB)
        return str(b)

    def __unicode__(self):
        """
        Produces unicode representation of the name.
        """
        b = Membio()
        libcrypto.X509_NAME_print_ex(b.bio, self.ptr, 0, self.PRINT_FLAG)
        return unicode(b)

    def __len__(self):
        """
        Returns number of components in the name
        """
        return libcrypto.X509_NAME_entry_count(self.ptr)

    def __cmp__(self, other):
        """
        Compares X509 names using X509_NAME_cmp.
        Objects of other types are not comparable (NotImplemented).
        """
        if not isinstance(other, X509Name):
            return NotImplemented
        return libcrypto.X509_NAME_cmp(self.ptr, other.ptr)

    def __eq__(self, other):
        """
        Two names are equal when X509_NAME_cmp reports no difference.
        Comparison with an object of another type yields NotImplemented
        instead of failing on the missing ptr attribute.
        """
        if not isinstance(other, X509Name):
            return NotImplemented
        return libcrypto.X509_NAME_cmp(self.ptr, other.ptr) == 0

    def __ne__(self, other):
        """
        Inverse of __eq__
        """
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __getitem__(self, key):
        """
        Accesses name components.

        @param key - either an Oid or an integer index.
                For an Oid the value of the first matching field is
                returned as unicode string (KeyError if there is none).
                For an integer an (Oid, unicode value) tuple is returned
                (IndexError if there is no such entry).
        Raises TypeError for any other type of key.
        """
        if isinstance(key, Oid):
            # Return first matching field
            idx = libcrypto.X509_NAME_get_index_by_NID(self.ptr, key.nid, -1)
            if idx < 0:
                # report the requested key, not the Oid class itself
                raise KeyError("Key not found " + str(key))
            entry = libcrypto.X509_NAME_get_entry(self.ptr, idx)
            s = libcrypto.X509_NAME_ENTRY_get_data(entry)
            b = Membio()
            libcrypto.ASN1_STRING_print_ex(b.bio, s, self.PRINT_FLAG)
            return unicode(b)
        elif isinstance(key, (int, long)):
            # Return OID, string tuple
            entry = libcrypto.X509_NAME_get_entry(self.ptr, key)
            if entry is None:
                raise IndexError("name entry index out of range")
            oid = Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry))
            s = libcrypto.X509_NAME_ENTRY_get_data(entry)
            b = Membio()
            libcrypto.ASN1_STRING_print_ex(b.bio, s, self.PRINT_FLAG)
            return (oid, unicode(b))
        else:
            raise TypeError("X509 NAME can be indexed by Oids or integers only")

    def __setitem__(self, key, val):
        """
        Modification is not implemented yet; names which wrap a pointer
        received from OpenSSL are constant and raise ValueError.
        """
        if not self.writable:
            raise ValueError("Attempt to modify constant X509 object")
        else:
            raise NotImplementedError

    def __delitem__(self, key):
        """
        Deletion is not implemented yet; names which wrap a pointer
        received from OpenSSL are constant and raise ValueError.
        """
        if not self.writable:
            raise ValueError("Attempt to modify constant X509 object")
        else:
            raise NotImplementedError

    def __hash__(self):
        """
        Returns the value computed by X509_NAME_hash
        """
        return libcrypto.X509_NAME_hash(self.ptr)
182
class _x509_ext(Structure):
    """ ctypes representation of the C structure X509_EXTENSION """
    _fields_ = [
        ("object", c_void_p),
        ("critical", c_int),
        ("value", c_void_p),
    ]
188
class X509_EXT(object):
    """ Python wrapper around a single certificate extension """

    def __init__(self, ptr, copy=False):
        """
        Wraps the pointer to X509_EXTENSION.

        @param ptr - pointer to the X509_EXTENSION structure
        @param copy - if True, the structure is duplicated and the
                duplicate is wrapped; otherwise the pointer itself is
                stored (cast to the structure pointer type).

        NOTE(review): the stored pointer is freed on destruction in
        both cases - confirm that callers hand over ownership when
        copy is False.
        """
        self.ptr = (libcrypto.X509_EXTENSION_dup(ptr) if copy
                    else cast(ptr, POINTER(_x509_ext)))

    def __del__(self):
        libcrypto.X509_EXTENSION_free(self.ptr)

    def _printed(self):
        """ Prints the extension into a fresh memory BIO and returns it """
        out = Membio()
        libcrypto.X509V3_EXT_print(out.bio, self.ptr, 0x20010, 0)
        return out

    def __str__(self):
        return str(self._printed())

    def __unicode__(self):
        return unicode(self._printed())

    @property
    def oid(self):
        """ Oid object built from the extension's object field """
        ext = self.ptr[0]
        return Oid.fromobj(ext.object)

    @property
    def critical(self):
        """ True if the critical field of the extension is positive """
        ext = self.ptr[0]
        return ext.critical > 0
class _X509extlist(object):
    """
    List-like view of the extensions of a certificate
    """

    def __init__(self, cert):
        """ Remembers the certificate object whose extensions are listed """
        self.cert = cert

    def __len__(self):
        """ Number of extensions as reported by X509_get_ext_count """
        return libcrypto.X509_get_ext_count(self.cert.cert)

    def __getitem__(self, item):
        """
        Returns a copy of the extension with the given index as X509_EXT.
        Raises IndexError when OpenSSL returns no extension for it.
        """
        ext_ptr = libcrypto.X509_get_ext(self.cert.cert, item)
        if ext_ptr is not None:
            return X509_EXT(ext_ptr, True)
        raise IndexError

    def find(self, oid):
        """
        Return list of extensions with given Oid
        """
        if not isinstance(oid, Oid):
            raise TypeError("Need crytypescrypto.oid.Oid as argument")
        matches = []
        total = len(self)
        # each lookup continues after the position found by the previous one
        pos = libcrypto.X509_get_ext_by_NID(self.cert.cert, oid.nid, -1)
        while 0 <= pos < total:
            matches.append(self[pos])
            pos = libcrypto.X509_get_ext_by_NID(self.cert.cert, oid.nid, pos)
        return matches

    def find_critical(self, crit=True):
        """
        Return list of critical extensions (or list of non-critical ones,
        if the optional argument is False)
        """
        flag = 1 if crit else 0
        matches = []
        total = len(self)
        # each lookup continues after the position found by the previous one
        pos = libcrypto.X509_get_ext_by_critical(self.cert.cert, flag, -1)
        while 0 <= pos < total:
            matches.append(self[pos])
            pos = libcrypto.X509_get_ext_by_critical(self.cert.cert, flag, pos)
        return matches
262
class X509(object):
    """
    Represents X.509 certificate.
    """

    def __init__(self, data=None, ptr=None, format="PEM"):
        """
        Initializes certificate
        @param data - serialized certificate in PEM or DER format.
        @param ptr - pointer to X509, returned by some openssl function.
                mutually exclusive with data
        @param format - specifies data format. "PEM" or "DER", default PEM

        Raises TypeError if both or none of data and ptr are given and
        X509Error if the certificate cannot be parsed.
        """
        if ptr is not None:
            if data is not None:
                raise TypeError("Cannot use data and ptr simultaneously")
            self.cert = ptr
        elif data is None:
            raise TypeError("data argument is required")
        else:
            b = Membio(data)
            if format == "PEM":
                cert = libcrypto.PEM_read_bio_X509(b.bio, None, None, None)
            else:
                cert = libcrypto.d2i_X509_bio(b.bio, None)
            # A NULL result arrives as None or as 0 depending on the
            # declared restype, so test truthiness instead of "is None"
            if not cert:
                raise X509Error("error reading certificate")
            self.cert = cert

    def __del__(self):
        """
        Frees certificate object
        """
        # getattr: __init__ may have raised before cert was assigned
        cert = getattr(self, 'cert', None)
        if cert:
            libcrypto.X509_free(cert)

    def __str__(self):
        """ Returns der string of the certificate """
        b = Membio()
        if libcrypto.i2d_X509_bio(b.bio, self.cert) == 0:
            raise X509Error("error serializing certificate")
        return str(b)

    def __repr__(self):
        """ Returns valid call to the constructor """
        return "X509(data=" + repr(str(self)) + ",format='DER')"

    @property
    def extensions(self):
        """
        List-like object giving access to the certificate extensions.

        It is created on access rather than stored in the instance:
        a stored list would reference the certificate back, and such a
        reference cycle together with __del__ prevents Python 2 from
        ever collecting (and freeing) the certificate.
        """
        return _X509extlist(self)

    @property
    def pubkey(self):
        """EVP PKEy object of certificate public key"""
        # X509_get_pubkey takes the certificate as its only argument
        return PKey(ptr=libcrypto.X509_get_pubkey(self.cert))

    def verify(self, store=None, chain=None, key=None):
        """
        Verify self. Supports verification on both X509 store object
        or just public issuer key
        @param store X509Store object.
        @param chain - list of X509 objects to add into verification
                context. These objects are untrusted, but can be used to
                build certificate chain up to trusted object in the store
        @param key - PKey object with open key to validate signature

        parameters store and key are mutually exclusive. If neither
        is specified, attempts to verify self as self-signed certificate

        Returns True if verification succeeds, False otherwise.
        Raises X509Error if both store and key are given or if an
        OpenSSL call fails.
        """
        if store is not None and key is not None:
            raise X509Error("key and store cannot be specified simultaneously")
        if store is not None:
            ctx = libcrypto.X509_STORE_CTX_new()
            if not ctx:
                raise X509Error("Error allocating X509_STORE_CTX")
            try:
                if chain is not None and len(chain) > 0:
                    # The stack object must stay alive until verification
                    # is finished, as it owns the C stack.  OpenSSL needs
                    # the C pointer, not the Python wrapper; c_void_p
                    # keeps the pointer from being narrowed to C int.
                    stack = StackOfX509(chain)
                    chain_ptr = c_void_p(stack.ptr)
                else:
                    stack = None
                    chain_ptr = None
                # X509_STORE_CTX_init returns 1 on success and 0 on error
                if libcrypto.X509_STORE_CTX_init(ctx, store.store, self.cert,
                                                 chain_ptr) <= 0:
                    raise X509Error("Error initializing X509_STORE_CTX")
                return libcrypto.X509_verify_cert(ctx) > 0
            finally:
                # free the context on every path, including errors
                libcrypto.X509_STORE_CTX_free(ctx)
        else:
            if key is None:
                if self.issuer != self.subject:
                    # Not a self-signed certificate
                    return False
                key = self.pubkey
            res = libcrypto.X509_verify(self.cert, key.key)
            if res < 0:
                raise X509Error("X509_verify failed")
            return res > 0

    @property
    def subject(self):
        """ X509Name for certificate subject name """
        return X509Name(libcrypto.X509_get_subject_name(self.cert))

    @property
    def issuer(self):
        """ X509Name for certificate issuer name """
        return X509Name(libcrypto.X509_get_issuer_name(self.cert))

    @property
    def serial(self):
        """ Serial number of certificate as integer """
        asnint = libcrypto.X509_get_serialNumber(self.cert)
        b = Membio()
        # i2a_ASN1_INTEGER prints the number in hexadecimal
        libcrypto.i2a_ASN1_INTEGER(b.bio, asnint)
        return int(str(b), 16)

    @property
    def version(self):
        """ certificate version as integer. Really certificate stores 0 for
        version 1 and 2 for version 3, but we return 1 and 3 """
        asn1int = cast(self.cert, _px509)[0].cert_info[0].version
        return libcrypto.ASN1_INTEGER_get(asn1int) + 1

    def _asn1_time_to_datetime(self, asn1date):
        """ Converts pointer to ASN1_TIME into datetime with utc tzinfo """
        b = Membio()
        libcrypto.ASN1_TIME_print(b.bio, asn1date)
        parsed = datetime.strptime(str(b), "%b %d %H:%M:%S %Y %Z")
        return parsed.replace(tzinfo=utc)

    @property
    def startDate(self):
        """ Certificate validity period start date """
        # Need deep poke into certificate structure (x)->cert_info->validity->notBefore
        validity = cast(self.cert, _px509)[0].cert_info[0].validity[0]
        return self._asn1_time_to_datetime(validity.notBefore)

    @property
    def endDate(self):
        """ Certificate validity period end date """
        # Need deep poke into certificate structure (x)->cert_info->validity->notAfter
        validity = cast(self.cert, _px509)[0].cert_info[0].validity[0]
        return self._asn1_time_to_datetime(validity.notAfter)

    def check_ca(self):
        """ Returns True if certificate is CA certificate """
        return libcrypto.X509_check_ca(self.cert) > 0
class X509Store(object):
    """
    Represents trusted certificate store. Can be used to lookup CA
    certificates to verify

    @param file - file with several certificates and crls
            to load into store
    @param dir - hashed directory with certificates and crls
    @param default - if true, default verify location (directory)
            is installed
    """

    def __init__(self, file=None, dir=None, default=False):
        """
        Creates X509 store and installs lookup method. Optionally initializes
        by certificates from given file or directory.

        Raises X509Error if the store or a lookup method cannot be
        created or if loading of the given locations fails.
        """
        #
        # Todo - set verification flags
        #
        self.store = libcrypto.X509_STORE_new()
        if self.store is None:
            raise X509Error("allocating store")
        lookup = libcrypto.X509_STORE_add_lookup(self.store,
                                                 libcrypto.X509_LOOKUP_file())
        if lookup is None:
            raise X509Error("error installing file lookup method")
        if file is not None:
            if libcrypto.X509_LOOKUP_ctrl(lookup, 1, file, 1, None) <= 0:
                raise X509Error("error loading trusted certs from file " + file)
        lookup = libcrypto.X509_STORE_add_lookup(self.store,
                                                 libcrypto.X509_LOOKUP_hash_dir())
        if lookup is None:
            raise X509Error("error installing hashed lookup method")
        if dir is not None:
            if libcrypto.X509_LOOKUP_ctrl(lookup, 2, dir, 1, None) <= 0:
                raise X509Error("error adding hashed  trusted certs dir " + dir)
        if default:
            if libcrypto.X509_LOOKUP_ctrl(lookup, 2, None, 3, None) <= 0:
                raise X509Error("error adding default trusted certs dir ")

    def add_cert(self, cert):
        """
        Explicitly adds certificate to set of trusted in the store
        @param cert - X509 object to add
        """
        if not isinstance(cert, X509):
            raise TypeError("cert should be X509")
        libcrypto.X509_STORE_add_cert(self.store, cert.cert)

    def add_callback(self, callback):
        """
        Installs callback function, which would receive detailed information
        about verified certificates. Not implemented yet.
        """
        raise NotImplementedError

    def setflags(self, flags):
        """
        Set certificate verification flags.
        @param flags - integer bit mask. See OpenSSL X509_V_FLAG_* constants
        """
        libcrypto.X509_STORE_set_flags(self.store, flags)

    def setpurpose(self, purpose):
        """
        Sets certificate purpose which verified certificate should match
        @param purpose - number from 1 to 9 or standard string defined in Openssl
        possible strings - sslclient, sslserver, nssslserver, smimesign,
        smimeencrypt, crlsign, any, ocsphelper

        Raises X509Error if the purpose is unknown or cannot be set and
        TypeError if purpose is neither a string nor an integer.
        """
        if isinstance(purpose, str):
            # X509_PURPOSE_get_by_sname returns an index into the purpose
            # table (-1 if the name is unknown), not the purpose id which
            # X509_STORE_set_purpose expects, so the index is translated.
            idx = libcrypto.X509_PURPOSE_get_by_sname(purpose)
            if idx < 0:
                raise X509Error("Invalid certificate purpose '" + purpose + "'")
            get0 = libcrypto.X509_PURPOSE_get0
            get0.restype = c_void_p
            get0.argtypes = (c_int,)
            get_id = libcrypto.X509_PURPOSE_get_id
            get_id.restype = c_int
            get_id.argtypes = (c_void_p,)
            purp_no = get_id(get0(idx))
        elif isinstance(purpose, int):
            purp_no = purpose
        else:
            raise TypeError("purpose should be a string or an integer")
        if libcrypto.X509_STORE_set_purpose(self.store, purp_no) <= 0:
            raise X509Error("cannot set purpose")

    def setdepth(self, depth):
        """
        Sets the verification depth i.e. max length of certificate chain
        which is acceptable
        """
        libcrypto.X509_STORE_set_depth(self.store, depth)

    def settime(self, time):
        """
        Set point in time used to check validity of certificates for
        Time can be either python datetime object or number of seconds
        since epoch

        Only the argument type is validated so far (TypeError for
        unsupported types); setting the time itself is not implemented
        and NotImplementedError is raised for valid arguments.
        """
        # Module-level "datetime" is the class, so date is imported here;
        # datetime is a subclass of date and one check covers both.
        from datetime import date
        if not isinstance(time, (date, int)):
            raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
        raise NotImplementedError
class StackOfX509(object):
    """
    Implements OpenSSL STACK_OF(X509) object.
    It looks much like python container types
    """

    def __init__(self, certs=None, ptr=None, disposable=True):
        """
        Create stack
        @param certs - list of X509 objects. If specified, read-write
                stack is created and populated by these certificates
        @param ptr - pointer to OpenSSL STACK_OF(X509) as returned by
                some functions
        @param disposable - if True, stack created from object, returned
                by function is copy, and can be modified and need to be
                freed. If false, it is just pointer into another
                structure i.e. CMS_ContentInfo

        Raises ValueError if both certs and ptr are specified.
        """
        if ptr is None:
            self.need_free = True
            self.ptr = libcrypto.sk_new_null()
            if certs is not None:
                for crt in certs:
                    self.append(crt)
        elif certs is not None:
            raise ValueError("cannot handle certs and ptr simultaneously")
        else:
            self.need_free = disposable
            self.ptr = ptr

    def __len__(self):
        """ Number of certificates in the stack """
        return libcrypto.sk_num(self.ptr)

    def __getitem__(self, index):
        """
        Returns a copy of the certificate at given position as X509.
        Raises IndexError if index is outside of 0..len-1.
        """
        if index < 0 or index >= len(self):
            raise IndexError
        p = libcrypto.sk_value(self.ptr, index)
        return X509(ptr=libcrypto.X509_dup(p))

    def __setitem__(self, index, value):
        """
        Replaces the certificate at given position by a copy of value.
        Raises ValueError for read-only stack, IndexError for bad index
        and TypeError if value is not X509.
        """
        if not self.need_free:
            raise ValueError("Stack is read-only")
        if index < 0 or index >= len(self):
            raise IndexError
        if not isinstance(value, X509):
            raise TypeError('StackOfX509 can contain only X509 objects')
        # remember the old element: it is freed after being replaced
        p = libcrypto.sk_value(self.ptr, index)
        libcrypto.sk_set(self.ptr, index, libcrypto.X509_dup(value.cert))
        libcrypto.X509_free(p)

    def __delitem__(self, index):
        """
        Removes the certificate at given position and frees it.
        Raises ValueError for read-only stack, IndexError for bad index.
        """
        if not self.need_free:
            raise ValueError("Stack is read-only")
        if index < 0 or index >= len(self):
            raise IndexError
        p = libcrypto.sk_delete(self.ptr, index)
        libcrypto.X509_free(p)

    def __del__(self):
        """ Frees the stack with all its certificates if it is owned """
        # getattr: __init__ may have raised before need_free was assigned
        if getattr(self, 'need_free', False):
            libcrypto.sk_pop_free(self.ptr, libcrypto.X509_free)

    def append(self, value):
        """
        Adds a copy of the certificate to the end of the stack.
        Raises ValueError for read-only stack and TypeError if value
        is not X509.
        """
        if not self.need_free:
            raise ValueError("Stack is read-only")
        if not isinstance(value, X509):
            raise TypeError('StackOfX509 can contain only X509 objects')
        libcrypto.sk_push(self.ptr, libcrypto.X509_dup(value.cert))
# ctypes prototypes of the libcrypto functions used in this module.
# Without an explicit restype ctypes treats the result as C int, and
# plain integers passed without argtypes are converted to C int too.
# Both would cut pointers down to the size of int, and a NULL result
# would be returned as 0 instead of None.  So every function which
# returns or receives a pointer is declared here.

# ASN.1 helpers
libcrypto.i2a_ASN1_INTEGER.argtypes = (c_void_p, c_void_p)
libcrypto.ASN1_STRING_print_ex.argtypes = (c_void_p, c_void_p, c_long)
libcrypto.ASN1_TIME_print.argtypes = (c_void_p, c_void_p)
libcrypto.ASN1_INTEGER_get.argtypes = (c_void_p,)
libcrypto.ASN1_INTEGER_get.restype = c_long
libcrypto.OBJ_obj2nid.argtypes = (c_void_p,)

# X509_NAME
libcrypto.X509_NAME_new.restype = c_void_p
libcrypto.X509_NAME_free.argtypes = (c_void_p,)
libcrypto.X509_NAME_print_ex.argtypes = (c_void_p, c_void_p, c_int, c_long)
libcrypto.X509_NAME_entry_count.argtypes = (c_void_p,)
libcrypto.X509_NAME_cmp.argtypes = (c_void_p, c_void_p)
libcrypto.X509_NAME_get_index_by_NID.argtypes = (c_void_p, c_int, c_int)
libcrypto.X509_NAME_get_entry.restype = c_void_p
libcrypto.X509_NAME_get_entry.argtypes = (c_void_p, c_int)
libcrypto.X509_NAME_ENTRY_get_object.restype = c_void_p
libcrypto.X509_NAME_ENTRY_get_object.argtypes = (c_void_p,)
libcrypto.X509_NAME_ENTRY_get_data.restype = c_void_p
libcrypto.X509_NAME_ENTRY_get_data.argtypes = (c_void_p,)
libcrypto.X509_NAME_hash.restype = c_long
libcrypto.X509_NAME_hash.argtypes = (c_void_p,)

# X509 certificate
libcrypto.PEM_read_bio_X509.restype = c_void_p
libcrypto.PEM_read_bio_X509.argtypes = (c_void_p, c_void_p, c_void_p, c_void_p)
libcrypto.d2i_X509_bio.restype = c_void_p
libcrypto.d2i_X509_bio.argtypes = (c_void_p, c_void_p)
libcrypto.i2d_X509_bio.argtypes = (c_void_p, c_void_p)
libcrypto.X509_free.argtypes = (c_void_p,)
libcrypto.X509_dup.restype = c_void_p
libcrypto.X509_dup.argtypes = (c_void_p,)
libcrypto.X509_get_pubkey.restype = c_void_p
libcrypto.X509_get_pubkey.argtypes = (c_void_p,)
libcrypto.X509_get_subject_name.restype = c_void_p
libcrypto.X509_get_subject_name.argtypes = (c_void_p,)
libcrypto.X509_get_issuer_name.restype = c_void_p
libcrypto.X509_get_issuer_name.argtypes = (c_void_p,)
libcrypto.X509_get_serialNumber.argtypes = (c_void_p,)
libcrypto.X509_get_serialNumber.restype = c_void_p
libcrypto.X509_check_ca.argtypes = (c_void_p,)
libcrypto.X509_verify.argtypes = (c_void_p, c_void_p)

# Certificate extensions
libcrypto.X509_EXTENSION_dup.argtypes = (c_void_p,)
libcrypto.X509_EXTENSION_dup.restype = POINTER(_x509_ext)
libcrypto.X509_EXTENSION_free.argtypes = (POINTER(_x509_ext),)
libcrypto.X509V3_EXT_print.argtypes = (c_void_p, POINTER(_x509_ext), c_long, c_int)
libcrypto.X509_get_ext.restype = c_void_p
libcrypto.X509_get_ext.argtypes = (c_void_p, c_int)
libcrypto.X509_get_ext_count.argtypes = (c_void_p,)
libcrypto.X509_get_ext_by_NID.argtypes = (c_void_p, c_int, c_int)
libcrypto.X509_get_ext_by_critical.argtypes = (c_void_p, c_int, c_int)

# X509_STORE and verification context
libcrypto.X509_STORE_new.restype = c_void_p
libcrypto.X509_STORE_add_lookup.restype = c_void_p
libcrypto.X509_STORE_add_lookup.argtypes = (c_void_p, c_void_p)
libcrypto.X509_STORE_add_cert.argtypes = (c_void_p, c_void_p)
libcrypto.X509_STORE_set_flags.argtypes = (c_void_p, c_long)
libcrypto.X509_STORE_set_purpose.argtypes = (c_void_p, c_int)
libcrypto.X509_STORE_set_depth.argtypes = (c_void_p, c_int)
libcrypto.X509_LOOKUP_file.restype = c_void_p
libcrypto.X509_LOOKUP_hash_dir.restype = c_void_p
libcrypto.X509_LOOKUP_ctrl.restype = c_int
libcrypto.X509_LOOKUP_ctrl.argtypes = (c_void_p, c_int, c_char_p, c_long, POINTER(c_char_p))
libcrypto.X509_STORE_CTX_new.restype = c_void_p
libcrypto.X509_STORE_CTX_init.argtypes = (c_void_p, c_void_p, c_void_p, c_void_p)
libcrypto.X509_STORE_CTX_free.argtypes = (c_void_p,)
libcrypto.X509_verify_cert.argtypes = (c_void_p,)

# Stack functions
libcrypto.sk_new_null.restype = c_void_p
libcrypto.sk_num.argtypes = (c_void_p,)
libcrypto.sk_push.argtypes = (c_void_p, c_void_p)
libcrypto.sk_set.argtypes = (c_void_p, c_int, c_void_p)
libcrypto.sk_set.restype = c_void_p
libcrypto.sk_value.argtypes = (c_void_p, c_int)
libcrypto.sk_value.restype = c_void_p
libcrypto.sk_delete.argtypes = (c_void_p, c_int)
libcrypto.sk_delete.restype = c_void_p
# the second argument is the function used to free every element
libcrypto.sk_pop_free.argtypes = (c_void_p, c_void_p)