]> www.wagner.pp.ru Git - oss/ctypescrypto.git/blob - ctypescrypto/x509.py
bd81fdd01c207f04b024f5d75d3b673d97a9b628
[oss/ctypescrypto.git] / ctypescrypto / x509.py
1 """
Implements an interface to the OpenSSL X509 and X509Store structures,
i.e. allows loading, analyzing and verifying certificates.

X509Store objects are also used to verify other signed documents,
such as CMS, OCSP and timestamps.
7 """
8
9
10
11 from ctypes import c_void_p,create_string_buffer,c_long,c_int,POINTER,c_char_p,Structure,cast
12 from ctypescrypto.bio import Membio
13 from ctypescrypto.pkey import PKey
14 from ctypescrypto.oid import Oid
15 from ctypescrypto.exception import LibCryptoError
16 from ctypescrypto import libcrypto
17 from datetime import datetime
try:
    from pytz import utc
except ImportError:
    # pytz is optional; without it a minimal zero-offset tzinfo is used.
    from datetime import timedelta, tzinfo

    ZERO = timedelta(0)

    class UTC(tzinfo):
        """
        tzinfo object for UTC, used when pytz is not installed.
        """

        def dst(self, dt):
            """No daylight saving adjustment."""
            return ZERO

        def tzname(self, dt):
            """Name of the zone."""
            return "UTC"

        def utcoffset(self, dt):
            """Offset from UTC, which is always zero."""
            return ZERO

    utc = UTC()
38
# Names exported by a star-import of this module.
# NOTE(review): X509_EXT is defined in this module but not listed here -
# confirm whether that omission is intentional.
__all__ = ['X509','X509Error','X509Name','X509Store','StackOfX509']
40
class _validity(Structure):
    """
    ctypes mirror of the OpenSSL X509_VAL structure.

    Needed to reach the certificate validity period, because OpenSSL
    provides no functions for it - only macros.
    """
    # Both members are declared as opaque pointers.
    _fields_ = [
        ('notBefore', c_void_p),
        ('notAfter', c_void_p),
    ]
47
class _cinf(Structure):
    """
    ctypes mirror of the OpenSSL X509_CINF structure.

    Needed to access certificate data which is reachable only via
    macros. Every member is an opaque pointer except ``validity``,
    which points to a ``_validity`` structure.
    """
    _fields_ = [
        ('version', c_void_p),
        ('serialNumber', c_void_p),
        ('sign_alg', c_void_p),
        ('issuer', c_void_p),
        ('validity', POINTER(_validity)),
        ('subject', c_void_p),
        ('pubkey', c_void_p),
        ('issuerUID', c_void_p),
        ('subjectUID', c_void_p),
        ('extensions', c_void_p),
    ]
64
class _x509(Structure):
    """
    ctypes mirror of the leading part of the OpenSSL X509 structure.

    Needed to access certificate data which is reachable only via
    macros, not functions. Only the first three members are declared;
    the parsed extension fields that follow them are left out.
    """
    _fields_ = [
        ('cert_info', POINTER(_cinf)),
        ('sig_alg', c_void_p),
        ('signature', c_void_p),
        # There are a lot of parsed extension fields there
    ]


# Pointer type used to cast a raw X509 pointer for field access.
_px509 = POINTER(_x509)
77
class X509Error(LibCryptoError):
    """
    Exception raised when an OpenSSL function fails during
    an X509 operation.
    """
84
85
class X509Name(object):
    """
    Class which represents an X.509 distinguished name - typically
    a certificate subject name or an issuer name.

    Currently used only to represent information extracted from a
    certificate. Item assignment and deletion are not implemented yet.
    """
    # XN_FLAG_SEP_COMMA_PLUS & ASN1_STRFLG_UTF8_CONVERT
    PRINT_FLAG = 0x10010
    ESC_MSB = 4

    def __init__(self, ptr=None, copy=False):
        """
        Creates a X509Name object
        @param ptr - pointer to X509_NAME C structure (as returned by some
            OpenSSL functions). If omitted, a new empty name is allocated.
        @param copy - indicates that this structure has to be freed upon
            object destruction
        """
        # Set first, so __del__ stays harmless if allocation fails below.
        self.need_free = False
        if ptr is not None:
            self.ptr = ptr
            self.need_free = copy
            self.writable = False
        else:
            self.ptr = libcrypto.X509_NAME_new()
            if not self.ptr:
                raise X509Error("Error allocating X509_NAME")
            self.need_free = True
            self.writable = True

    def __del__(self):
        """
        Frees the underlying structure if this object owns it
        """
        # getattr guards against objects whose __init__ never completed.
        if getattr(self, 'need_free', False):
            libcrypto.X509_NAME_free(self.ptr)

    def __str__(self):
        """
        Produces an ascii representation of the name, escaping all symbols > 0x80
        Probably it is not what you want, unless your native language is English
        """
        b = Membio()
        libcrypto.X509_NAME_print_ex(b.bio, self.ptr, 0,
                                     self.PRINT_FLAG | self.ESC_MSB)
        return str(b)

    def __unicode__(self):
        """
        Produces unicode representation of the name.
        """
        b = Membio()
        libcrypto.X509_NAME_print_ex(b.bio, self.ptr, 0, self.PRINT_FLAG)
        return unicode(b)

    def __len__(self):
        """
        Returns number of components in the name
        """
        return libcrypto.X509_NAME_entry_count(self.ptr)

    def __cmp__(self, other):
        """
        Compares X509 names using X509_NAME_cmp
        """
        if not isinstance(other, X509Name):
            return NotImplemented
        return libcrypto.X509_NAME_cmp(self.ptr, other.ptr)

    def __eq__(self, other):
        """
        Two names are equal when X509_NAME_cmp reports no difference.
        Objects of other types are left to the default comparison.
        """
        if not isinstance(other, X509Name):
            return NotImplemented
        return libcrypto.X509_NAME_cmp(self.ptr, other.ptr) == 0

    def __ne__(self, other):
        """
        Inverse of __eq__
        """
        same = self.__eq__(other)
        if same is NotImplemented:
            return same
        return not same

    def _entry_text(self, entry):
        """
        Returns the value of a name entry printed with PRINT_FLAG
        """
        s = libcrypto.X509_NAME_ENTRY_get_data(entry)
        b = Membio()
        libcrypto.ASN1_STRING_print_ex(b.bio, s, self.PRINT_FLAG)
        return unicode(b)

    def __getitem__(self, key):
        """
        Looks up a name component.
        @param key - either an Oid, in which case the value of the first
            matching field is returned, or an integer index, in which
            case an (Oid, value) tuple is returned
        Raises KeyError for an Oid which is not present, IndexError for
        an index out of range and TypeError for any other key type.
        """
        if isinstance(key, Oid):
            # Return first matching field
            idx = libcrypto.X509_NAME_get_index_by_NID(self.ptr, key.nid, -1)
            if idx < 0:
                # Report the requested key, not the Oid class itself
                raise KeyError("Key not found " + str(key))
            entry = libcrypto.X509_NAME_get_entry(self.ptr, idx)
            return self._entry_text(entry)
        elif isinstance(key, (int, long)):
            # Return OID, string tuple
            entry = libcrypto.X509_NAME_get_entry(self.ptr, key)
            if entry is None:
                raise IndexError("name entry index out of range")
            oid = Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry))
            return (oid, self._entry_text(entry))
        else:
            raise TypeError("X509 NAME can be indexed by Oids or integers only")

    def __setitem__(self, key, val):
        """
        Not implemented; read-only names raise ValueError
        """
        if not self.writable:
            raise ValueError("Attempt to modify constant X509 object")
        else:
            raise NotImplementedError

    def __delitem__(self, key):
        """
        Not implemented; read-only names raise ValueError
        """
        if not self.writable:
            raise ValueError("Attempt to modify constant X509 object")
        else:
            raise NotImplementedError

    def __hash__(self):
        """
        Hash of the name as computed by X509_NAME_hash
        """
        return libcrypto.X509_NAME_hash(self.ptr)
182
class _x509_ext(Structure):
    """
    ctypes mirror of the C structure X509_EXTENSION
    """
    _fields_ = [
        ("object", c_void_p),
        ("critical", c_int),
        ("value", c_void_p),
    ]
188
class X509_EXT(object):
    """
    Python object wrapping a single certificate extension
    """

    def __init__(self, ptr, copy=False):
        """
        Initializes from a pointer to X509_EXTENSION.
        @param ptr - pointer to the X509_EXTENSION structure
        @param copy - if True, the extension is duplicated and the
            duplicate is wrapped; otherwise the pointer itself is stored
        """
        # NOTE(review): __del__ frees self.ptr in both cases, so with
        # copy=False ownership of ptr presumably passes to this object -
        # confirm against callers.
        if not copy:
            self.ptr = cast(ptr, POINTER(_x509_ext))
        else:
            self.ptr = libcrypto.X509_EXTENSION_dup(ptr)

    def __del__(self):
        """
        Frees the wrapped extension
        """
        libcrypto.X509_EXTENSION_free(self.ptr)

    def _printed(self):
        """
        Prints the extension into a fresh Membio and returns that Membio
        """
        out = Membio()
        libcrypto.X509V3_EXT_print(out.bio, self.ptr, 0x20010, 0)
        return out

    def __str__(self):
        return str(self._printed())

    def __unicode__(self):
        return unicode(self._printed())

    @property
    def oid(self):
        """Oid of the extension"""
        return Oid.fromobj(self.ptr.contents.object)

    @property
    def critical(self):
        """True if the critical field of the extension is positive"""
        return self.ptr.contents.critical > 0
class _X509extlist(object):
    """
    List-like view of the extensions of a certificate
    """

    def __init__(self, cert):
        """
        @param cert - X509 object whose extensions are exposed
        """
        self.cert = cert

    def __len__(self):
        """
        Number of extensions in the certificate
        """
        return libcrypto.X509_get_ext_count(self.cert.cert)

    def __getitem__(self, item):
        """
        Returns a copy of the extension at the given position as X509_EXT.
        Raises IndexError if OpenSSL returns no extension for it.
        """
        ext_ptr = libcrypto.X509_get_ext(self.cert.cert, item)
        if ext_ptr is None:
            raise IndexError
        return X509_EXT(ext_ptr, True)

    def _collect(self, lookup, selector):
        """
        Repeatedly calls lookup(cert, selector, last_position), starting
        from -1, and gathers the extensions at the returned positions
        until a position outside the extension list is returned.
        """
        matches = []
        limit = len(self)
        position = lookup(self.cert.cert, selector, -1)
        while 0 <= position < limit:
            matches.append(self[position])
            position = lookup(self.cert.cert, selector, position)
        return matches

    def find(self, oid):
        """
        Return list of extensions with given Oid
        """
        if not isinstance(oid, Oid):
            raise TypeError("Need crytypescrypto.oid.Oid as argument")
        return self._collect(libcrypto.X509_get_ext_by_NID, oid.nid)

    def find_critical(self, crit=True):
        """
        Return list of critical extensions (or list of non-critical ones,
        if the optional argument is False)
        """
        return self._collect(libcrypto.X509_get_ext_by_critical,
                             1 if crit else 0)
262
class X509(object):
    """
    Represents X.509 certificate.
    """

    def __init__(self, data=None, ptr=None, format="PEM"):
        """
        Initializes certificate
        @param data - serialized certificate in PEM or DER format.
        @param ptr - pointer to X509, returned by some openssl function.
            mutually exclusive with data
        @param format - specifies data format. "PEM" or "DER", default PEM

        Raises TypeError if both or neither of data and ptr are given and
        X509Error if the data cannot be parsed.
        """
        # Set first, so __del__ stays harmless if construction fails below.
        self.cert = None
        if ptr is not None:
            if data is not None:
                raise TypeError("Cannot use data and ptr simultaneously")
            self.cert = ptr
        elif data is None:
            raise TypeError("data argument is required")
        else:
            b = Membio(data)
            if format == "PEM":
                self.cert = libcrypto.PEM_read_bio_X509(b.bio, None, None, None)
            else:
                self.cert = libcrypto.d2i_X509_bio(b.bio, None)
            # A NULL result shows up as None or as 0, depending on whether
            # a pointer restype is declared for the function.
            if not self.cert:
                self.cert = None
                raise X509Error("error reading certificate")

    @property
    def extensions(self):
        """
        List-like view of the certificate extensions.

        Built on demand rather than stored on the instance: a stored
        view would refer back to this object and form a reference cycle,
        and Python 2 never collects cycles containing objects with
        __del__, so the certificate would never be freed.
        """
        return _X509extlist(self)

    def __del__(self):
        """
        Frees certificate object
        """
        cert = getattr(self, 'cert', None)
        if cert:
            libcrypto.X509_free(cert)

    def __str__(self):
        """ Returns der string of the certificate """
        b = Membio()
        if libcrypto.i2d_X509_bio(b.bio, self.cert) == 0:
            raise X509Error("error serializing certificate")
        return str(b)

    def __repr__(self):
        """ Returns valid call to the constructor """
        return "X509(data=" + repr(str(self)) + ",format='DER')"

    @property
    def pubkey(self):
        """EVP PKey object of certificate public key"""
        # X509_get_pubkey takes the certificate as its only argument.
        key_ptr = libcrypto.X509_get_pubkey(self.cert)
        if not key_ptr:
            raise X509Error("error extracting public key")
        return PKey(ptr=key_ptr)

    def pem(self):
        """ Returns PEM representation of the certificate """
        b = Membio()
        if libcrypto.PEM_write_bio_X509(b.bio, self.cert) == 0:
            raise X509Error("error serializing certificate")
        return str(b)

    def verify(self, store=None, chain=None, key=None):
        """
        Verify self. Supports verification on both X509 store object
        or just public issuer key
        @param store X509Store object.
        @param chain - list of X509 objects to add into verification
            context. These objects are untrusted, but can be used to
            build certificate chain up to trusted object in the store
        @param key - PKey object with open key to validate signature

        parameters store and key are mutually exclusive. If neither
        is specified, attempts to verify self as self-signed certificate

        Returns True if verification succeeded, False otherwise.
        Raises X509Error if OpenSSL reports an error.
        """
        if store is not None and key is not None:
            raise X509Error("key and store cannot be specified simultaneously")
        if store is not None:
            ctx = libcrypto.X509_STORE_CTX_new()
            if not ctx:
                raise X509Error("Error allocating X509_STORE_CTX")
            try:
                # The stack object must stay referenced until the context
                # is freed; OpenSSL gets only its raw pointer.
                if chain:
                    ch = StackOfX509(chain)
                    ch_ptr = ch.ptr
                else:
                    ch = None
                    ch_ptr = None
                # X509_STORE_CTX_init returns 1 on success and 0 on failure
                if libcrypto.X509_STORE_CTX_init(ctx, store.store, self.cert,
                                                 ch_ptr) <= 0:
                    raise X509Error("Error initializing X509_STORE_CTX")
                res = libcrypto.X509_verify_cert(ctx)
            finally:
                libcrypto.X509_STORE_CTX_free(ctx)
            return res > 0
        else:
            if key is None:
                if self.issuer != self.subject:
                    # Not a self-signed certificate
                    return False
                key = self.pubkey
            res = libcrypto.X509_verify(self.cert, key.key)
            if res < 0:
                raise X509Error("X509_verify failed")
            return res > 0

    @property
    def subject(self):
        """ X509Name for certificate subject name """
        return X509Name(libcrypto.X509_get_subject_name(self.cert))

    @property
    def issuer(self):
        """ X509Name for certificate issuer name """
        return X509Name(libcrypto.X509_get_issuer_name(self.cert))

    @property
    def serial(self):
        """ Serial number of certificate as integer """
        asnint = libcrypto.X509_get_serialNumber(self.cert)
        b = Membio()
        libcrypto.i2a_ASN1_INTEGER(b.bio, asnint)
        return int(str(b), 16)

    @property
    def version(self):
        """ certificate version as integer. Really certificate stores 0 for
        version 1 and 2 for version 3, but we return 1 and 3 """
        asn1int = cast(self.cert, _px509)[0].cert_info[0].version
        return libcrypto.ASN1_INTEGER_get(asn1int) + 1

    def _asn1_time_to_datetime(self, asn1date):
        """
        Prints an ASN1_TIME with ASN1_TIME_print and parses the printed
        text into a datetime carrying the utc tzinfo
        """
        b = Membio()
        libcrypto.ASN1_TIME_print(b.bio, asn1date)
        parsed = datetime.strptime(str(b), "%b %d %H:%M:%S %Y %Z")
        return parsed.replace(tzinfo=utc)

    @property
    def startDate(self):
        """ Certificate validity period start date """
        # Need deep poke into certificate structure (x)->cert_info->validity->notBefore
        validity = cast(self.cert, _px509)[0].cert_info[0].validity[0]
        return self._asn1_time_to_datetime(validity.notBefore)

    @property
    def endDate(self):
        """ Certificate validity period end date """
        # Need deep poke into certificate structure (x)->cert_info->validity->notAfter
        validity = cast(self.cert, _px509)[0].cert_info[0].validity[0]
        return self._asn1_time_to_datetime(validity.notAfter)

    def check_ca(self):
        """ Returns True if certificate is CA certificate """
        return libcrypto.X509_check_ca(self.cert) > 0
class X509Store(object):
    """
    Represents trusted certificate store. Can be used to lookup CA
    certificates to verify

    @param file - file with several certificates and crls
        to load into store
    @param dir - hashed directory with certificates and crls
    @param default - if true, default verify location (directory)
        is installed
    """

    def __init__(self, file=None, dir=None, default=False):
        """
        Creates X509 store and installs lookup method. Optionally initializes
        by certificates from given file or directory.

        Raises X509Error if the store or a lookup method cannot be set up
        or the certificates cannot be loaded.
        """
        #
        # Todo - set verification flags
        #
        self.store = libcrypto.X509_STORE_new()
        if self.store is None:
            raise X509Error("allocating store")
        lookup = libcrypto.X509_STORE_add_lookup(self.store,
                                                 libcrypto.X509_LOOKUP_file())
        if lookup is None:
            raise X509Error("error installing file lookup method")
        if file is not None:
            # Numeric arguments follow the OpenSSL headers:
            # command 1 is X509_L_FILE_LOAD, type 1 is X509_FILETYPE_PEM
            if not libcrypto.X509_LOOKUP_ctrl(lookup, 1, file, 1, None) > 0:
                raise X509Error("error loading trusted certs from file " + file)
        lookup = libcrypto.X509_STORE_add_lookup(self.store,
                                                 libcrypto.X509_LOOKUP_hash_dir())
        if lookup is None:
            raise X509Error("error installing hashed lookup method")
        if dir is not None:
            # command 2 is X509_L_ADD_DIR
            if not libcrypto.X509_LOOKUP_ctrl(lookup, 2, dir, 1, None) > 0:
                raise X509Error("error adding hashed  trusted certs dir " + dir)
        if default:
            # type 3 is X509_FILETYPE_DEFAULT
            if not libcrypto.X509_LOOKUP_ctrl(lookup, 2, None, 3, None) > 0:
                raise X509Error("error adding default trusted certs dir ")

    def add_cert(self, cert):
        """
        Explicitly adds certificate to set of trusted in the store
        @param cert - X509 object to add
        """
        if not isinstance(cert, X509):
            raise TypeError("cert should be X509")
        libcrypto.X509_STORE_add_cert(self.store, cert.cert)

    def add_callback(self, callback):
        """
        Installs callback function, which would receive detailed information
        about verified certificates. Not implemented yet.
        """
        raise NotImplementedError

    def setflags(self, flags):
        """
        Set certificate verification flags.
        @param flags - integer bit mask. See OpenSSL X509_V_FLAG_* constants
        """
        libcrypto.X509_STORE_set_flags(self.store, flags)

    def setpurpose(self, purpose):
        """
        Sets certificate purpose which verified certificate should match
        @param purpose - number from 1 to 9 or standard string defined in
            OpenSSL. Possible strings - sslclient, sslserver, nssslserver,
            smimesign, smimeencrypt, crlsign, any, ocsphelper

        Raises X509Error if the purpose name is unknown or cannot be set
        and TypeError if purpose is neither a string nor an integer.
        """
        if isinstance(purpose, str):
            # X509_PURPOSE_get_by_sname returns an index into the purpose
            # table (-1 if the name is unknown), not a purpose id, so the
            # index has to be translated before it can be used.
            idx = libcrypto.X509_PURPOSE_get_by_sname(purpose)
            if idx < 0:
                raise X509Error("Invalid certificate purpose '" + purpose + "'")
            get0 = libcrypto.X509_PURPOSE_get0
            get0.restype = c_void_p
            get0.argtypes = (c_int,)
            get_id = libcrypto.X509_PURPOSE_get_id
            get_id.restype = c_int
            get_id.argtypes = (c_void_p,)
            purp_no = get_id(get0(idx))
        elif isinstance(purpose, int):
            purp_no = purpose
        else:
            raise TypeError("purpose should be a string or an integer")
        if libcrypto.X509_STORE_set_purpose(self.store, purp_no) <= 0:
            raise X509Error("cannot set purpose")

    def setdepth(self, depth):
        """
        Sets the verification depth i.e. max length of certificate chain
        which is acceptable
        """
        libcrypto.X509_STORE_set_depth(self.store, depth)

    def settime(self, time):
        """
        Set point in time used to check validity of certificates for.
        Time can be either python date/datetime object or number of seconds
        since epoch.

        Only the argument type is validated so far: TypeError is raised
        for unsupported types, NotImplementedError otherwise.
        """
        # The module-level name ``datetime`` is the class, not the module,
        # so the date type is imported here. datetime.datetime is a
        # subclass of datetime.date, hence one check covers both.
        from datetime import date
        if not isinstance(time, (date, int)):
            raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
        raise NotImplementedError
class StackOfX509(object):
    """
    Implements OpenSSL STACK_OF(X509) object.
    It looks much like python container types
    """

    def __init__(self, certs=None, ptr=None, disposable=True):
        """
        Create stack
        @param certs - list of X509 objects. If specified, read-write
            stack is created and populated by these certificates
        @param ptr - pointer to OpenSSL STACK_OF(X509) as returned by
            some functions
        @param disposable - if True, stack created from object, returned
            by function is copy, and can be modified and need to be
            freed. If false, it is just pointer into another
            structure i.e. CMS_ContentInfo

        Raises ValueError if both certs and ptr are given.
        """
        # Set first, so __del__ stays harmless if we raise below.
        self.need_free = False
        self.ptr = None
        if ptr is None:
            self.ptr = libcrypto.sk_new_null()
            if not self.ptr:
                raise X509Error("Error allocating stack")
            self.need_free = True
            if certs is not None:
                for crt in certs:
                    self.append(crt)
        elif certs is not None:
            raise ValueError("cannot handle certs an ptr simultaneously")
        else:
            self.need_free = disposable
            self.ptr = ptr

    def _dup_cert(self, value):
        """
        Returns pointer to a duplicate of the certificate held by the
        given X509 object. Raises TypeError for other objects and
        X509Error if duplication fails.
        """
        if not isinstance(value, X509):
            raise TypeError('StackOfX509 can contain only X509 objects')
        dup = libcrypto.X509_dup(value.cert)
        if not dup:
            raise X509Error("Error duplicating certificate")
        return dup

    def __len__(self):
        """ Number of certificates in the stack """
        return libcrypto.sk_num(self.ptr)

    def __getitem__(self, index):
        """ Returns a copy of the certificate at given index as X509 """
        if index < 0 or index >= len(self):
            raise IndexError
        p = libcrypto.sk_value(self.ptr, index)
        dup = libcrypto.X509_dup(p)
        if not dup:
            raise X509Error("Error duplicating certificate")
        return X509(ptr=dup)

    def __setitem__(self, index, value):
        """ Replaces certificate at given index by a copy of value """
        if not self.need_free:
            raise ValueError("Stack is read-only")
        if index < 0 or index >= len(self):
            raise IndexError
        # Duplicate before touching the stack, so that a failure leaves
        # the old element in place.
        dup = self._dup_cert(value)
        p = libcrypto.sk_value(self.ptr, index)
        libcrypto.sk_set(self.ptr, index, dup)
        libcrypto.X509_free(p)

    def __delitem__(self, index):
        """ Removes certificate at given index from the stack and frees it """
        if not self.need_free:
            raise ValueError("Stack is read-only")
        if index < 0 or index >= len(self):
            raise IndexError
        p = libcrypto.sk_delete(self.ptr, index)
        libcrypto.X509_free(p)

    def __del__(self):
        """ Frees the stack and its certificates if this object owns them """
        if getattr(self, 'need_free', False) and self.ptr:
            libcrypto.sk_pop_free(self.ptr, libcrypto.X509_free)

    def append(self, value):
        """
        Appends a copy of certificate to the stack
        @param value - X509 object
        """
        if not self.need_free:
            raise ValueError("Stack is read-only")
        dup = self._dup_cert(value)
        # sk_push returns 0 on failure; the copy is still ours then.
        if libcrypto.sk_push(self.ptr, dup) == 0:
            libcrypto.X509_free(dup)
            raise X509Error("Error adding certificate to stack")
# ctypes prototypes for the libcrypto functions used in this module.
#
# Without a prototype ctypes passes Python integers as C int and reads
# the result as C int, which corrupts pointers on platforms where a
# pointer is wider than an int.
libcrypto.i2a_ASN1_INTEGER.argtypes = (c_void_p, c_void_p)
libcrypto.ASN1_STRING_print_ex.argtypes = (c_void_p, c_void_p, c_long)
libcrypto.PEM_read_bio_X509.restype = c_void_p
libcrypto.PEM_read_bio_X509.argtypes = (c_void_p, POINTER(c_void_p), c_void_p, c_void_p)
libcrypto.PEM_write_bio_X509.restype = c_int
libcrypto.PEM_write_bio_X509.argtypes = (c_void_p, c_void_p)
libcrypto.ASN1_TIME_print.argtypes = (c_void_p, c_void_p)
libcrypto.ASN1_INTEGER_get.argtypes = (c_void_p,)
libcrypto.ASN1_INTEGER_get.restype = c_long
libcrypto.X509_get_serialNumber.argtypes = (c_void_p,)
libcrypto.X509_get_serialNumber.restype = c_void_p
libcrypto.X509_NAME_ENTRY_get_object.restype = c_void_p
libcrypto.X509_NAME_ENTRY_get_object.argtypes = (c_void_p,)
libcrypto.OBJ_obj2nid.argtypes = (c_void_p,)
libcrypto.X509_NAME_get_entry.restype = c_void_p
libcrypto.X509_NAME_get_entry.argtypes = (c_void_p, c_int)
libcrypto.X509_STORE_new.restype = c_void_p
libcrypto.X509_STORE_add_lookup.restype = c_void_p
libcrypto.X509_STORE_add_lookup.argtypes = (c_void_p, c_void_p)
libcrypto.X509_LOOKUP_file.restype = c_void_p
libcrypto.X509_LOOKUP_hash_dir.restype = c_void_p
libcrypto.X509_LOOKUP_ctrl.restype = c_int
libcrypto.X509_LOOKUP_ctrl.argtypes = (c_void_p, c_int, c_char_p, c_long, POINTER(c_char_p))
libcrypto.X509_EXTENSION_dup.argtypes = (c_void_p,)
libcrypto.X509_EXTENSION_dup.restype = POINTER(_x509_ext)
libcrypto.X509V3_EXT_print.argtypes = (c_void_p, POINTER(_x509_ext), c_long, c_int)
libcrypto.X509_get_ext.restype = c_void_p
libcrypto.X509_get_ext.argtypes = (c_void_p, c_int)
libcrypto.sk_set.argtypes = (c_void_p, c_int, c_void_p)
libcrypto.sk_set.restype = c_void_p
libcrypto.sk_value.argtypes = (c_void_p, c_int)
libcrypto.sk_value.restype = c_void_p
libcrypto.X509_dup.restype = c_void_p
libcrypto.X509_dup.argtypes = (c_void_p,)
libcrypto.sk_new_null.restype = c_void_p
libcrypto.X509_NAME_hash.restype = c_long
libcrypto.X509_NAME_hash.argtypes = (c_void_p,)


def _declare(name, restype, *argtypes):
    """
    Sets restype and argtypes of the libcrypto function with given name.

    A symbol which is absent from the loaded library is skipped, so that
    importing this module does not start to fail because of a function
    which may never be called.
    """
    try:
        func = getattr(libcrypto, name)
    except AttributeError:
        return
    func.restype = restype
    func.argtypes = argtypes


# Functions which are called in this module but had no prototype before.
_declare('X509_NAME_new', c_void_p)
_declare('X509_NAME_free', None, c_void_p)
_declare('X509_NAME_print_ex', c_int, c_void_p, c_void_p, c_int, c_long)
_declare('X509_NAME_entry_count', c_int, c_void_p)
_declare('X509_NAME_cmp', c_int, c_void_p, c_void_p)
_declare('X509_NAME_get_index_by_NID', c_int, c_void_p, c_int, c_int)
_declare('X509_NAME_ENTRY_get_data', c_void_p, c_void_p)
_declare('X509_EXTENSION_free', None, POINTER(_x509_ext))
_declare('X509_get_ext_count', c_int, c_void_p)
_declare('X509_get_ext_by_NID', c_int, c_void_p, c_int, c_int)
_declare('X509_get_ext_by_critical', c_int, c_void_p, c_int, c_int)
_declare('d2i_X509_bio', c_void_p, c_void_p, POINTER(c_void_p))
_declare('i2d_X509_bio', c_int, c_void_p, c_void_p)
_declare('X509_free', None, c_void_p)
_declare('X509_get_pubkey', c_void_p, c_void_p)
_declare('X509_get_subject_name', c_void_p, c_void_p)
_declare('X509_get_issuer_name', c_void_p, c_void_p)
_declare('X509_check_ca', c_int, c_void_p)
_declare('X509_verify', c_int, c_void_p, c_void_p)
_declare('X509_verify_cert', c_int, c_void_p)
_declare('X509_STORE_CTX_new', c_void_p)
_declare('X509_STORE_CTX_init', c_int, c_void_p, c_void_p, c_void_p, c_void_p)
_declare('X509_STORE_CTX_free', None, c_void_p)
_declare('X509_STORE_add_cert', c_int, c_void_p, c_void_p)
_declare('X509_STORE_set_flags', c_int, c_void_p, c_long)
_declare('X509_STORE_set_purpose', c_int, c_void_p, c_int)
_declare('X509_STORE_set_depth', c_int, c_void_p, c_int)
_declare('X509_PURPOSE_get_by_sname', c_int, c_char_p)
_declare('sk_num', c_int, c_void_p)
_declare('sk_push', c_int, c_void_p, c_void_p)
_declare('sk_delete', c_void_p, c_void_p, c_int)
# The second argument is the element destructor, passed as a function pointer
_declare('sk_pop_free', None, c_void_p, c_void_p)