]> www.wagner.pp.ru Git - oss/ctypescrypto.git/blob - ctypescrypto/x509.py
Implemented minimal cert extension support
[oss/ctypescrypto.git] / ctypescrypto / x509.py
1 """
2 Implements interface to openssl X509 and X509Store structures,
3 i.e. allows to load, analyze and verify certificates.
4
5 X509Store objects are also used to verify other signed documents,
6 such as CMS, OCSP and timestamps.
7 """
8
9
10
11 from ctypes import c_void_p,create_string_buffer,c_long,c_int,POINTER,c_char_p,Structure,cast
12 from ctypescrypto.bio import Membio
13 from ctypescrypto.pkey import PKey
14 from ctypescrypto.oid import Oid
15 from ctypescrypto.exception import LibCryptoError
16 from ctypescrypto import libcrypto
17 from datetime import datetime
try:
    from pytz import utc
except ImportError:
    # No pytz available: provide a minimal UTC tzinfo of our own.
    # The base class has to be imported explicitly. The module-level name
    # ``datetime`` is the datetime *class*, whose ``tzinfo`` attribute is a
    # descriptor rather than the ``tzinfo`` base class.
    from datetime import timedelta, tzinfo

    ZERO = timedelta(0)

    class UTC(tzinfo):
        """tzinfo object for UTC.
        Used when pytz is not available.
        """

        def utcoffset(self, dt):
            return ZERO

        def tzname(self, dt):
            return "UTC"

        def dst(self, dt):
            return ZERO

    utc = UTC()
38
# Public API of the module. X509 is the main class and has to be exported too.
__all__ = ['X509', 'X509Error', 'X509Name', 'X509Store', 'StackOfX509']
40
class _validity(Structure):
    """
    ctypes mirror of the X509_VAL structure.

    Declared here because the certificate validity period is read
    directly from the structure: openssl offers only macros for it,
    not functions.
    """
    _fields_ = [(field, c_void_p) for field in ('notBefore', 'notAfter')]
47
class _cinf(Structure):
    """
    ctypes mirror of the X509_CINF structure.

    Needed to reach certificate data which are accessible only
    via macros.
    """
    _fields_ = [
        ('version', c_void_p),
        ('serialNumber', c_void_p),
        ('sign_alg', c_void_p),
        ('issuer', c_void_p),
        # The only member that is dereferenced from Python.
        ('validity', POINTER(_validity)),
        ('subject', c_void_p),
        ('pubkey', c_void_p),
        ('issuerUID', c_void_p),
        ('subjectUID', c_void_p),
        ('extensions', c_void_p),
    ]
64
class _x509(Structure):
    """
    ctypes mirror of the leading part of the X509 structure.

    Needed to reach certificate data which are accessible only via
    macros, not functions.
    """
    _fields_ = [
        ('cert_info', POINTER(_cinf)),
        ('sig_alg', c_void_p),
        ('signature', c_void_p),
        # There are a lot of parsed extension fields after these;
        # they are not declared here.
    ]


# Pointer type used to cast raw certificate pointers.
_px509 = POINTER(_x509)
77
class X509Error(LibCryptoError):
    """
    Raised when an openssl function fails during an X509 operation.
    """
84
85
class X509Name:
    """
    Class which represents X.509 distinguished name - typically
    a certificate subject name or an issuer name.

    Now used only to represent information, extracted from the
    certificate. Potentially can be also used to build DN when creating
    certificate signing request
    """
    # XN_FLAG_SEP_COMMA_PLUS | ASN1_STRFLG_UTF8_CONVERT
    PRINT_FLAG = 0x10010
    # Extra print flag used by __str__ to escape non-ascii characters
    ESC_MSB = 4

    def __init__(self, ptr=None, copy=False):
        """
        Creates a X509Name object
        @param ptr - pointer to X509_NAME C structure (as returned by some
                OpenSSL functions). If omitted, a new empty writable name
                is allocated.
        @param copy - indicates that this structure have to be freed upon
                object destruction
        """
        if ptr is not None:
            self.ptr = ptr
            self.need_free = copy
            self.writable = False
        else:
            self.ptr = libcrypto.X509_NAME_new()
            self.need_free = True
            self.writable = True

    def __del__(self):
        """
        Frees the underlying structure if this object owns it
        """
        # getattr guards against objects whose __init__ did not complete
        if getattr(self, 'need_free', False):
            libcrypto.X509_NAME_free(self.ptr)

    def __str__(self):
        """
        Produces an ascii representation of the name, escaping all
        symbols > 0x80. Probably it is not what you want, unless your
        native language is English
        """
        b = Membio()
        libcrypto.X509_NAME_print_ex(b.bio, self.ptr, 0,
                                     self.PRINT_FLAG | self.ESC_MSB)
        return str(b)

    def __unicode__(self):
        """
        Produces unicode representation of the name.
        """
        b = Membio()
        libcrypto.X509_NAME_print_ex(b.bio, self.ptr, 0, self.PRINT_FLAG)
        return unicode(b)

    def __len__(self):
        """
        return number of components in the name
        """
        return libcrypto.X509_NAME_entry_count(self.ptr)

    def __cmp__(self, other):
        """
        Compares X509 names
        """
        return libcrypto.X509_NAME_cmp(self.ptr, other.ptr)

    def __eq__(self, other):
        """ True when X509_NAME_cmp reports both names as equal """
        return libcrypto.X509_NAME_cmp(self.ptr, other.ptr) == 0

    def __ne__(self, other):
        """ Explicit inverse of __eq__, so != never disagrees with == """
        return not self.__eq__(other)

    def _entry_value(self, entry):
        """ Prints the value of a name entry and returns it as unicode """
        s = libcrypto.X509_NAME_ENTRY_get_data(entry)
        b = Membio()
        libcrypto.ASN1_STRING_print_ex(b.bio, s, self.PRINT_FLAG)
        return unicode(b)

    def __getitem__(self, key):
        """
        Returns a component of the name.
        @param key - either an Oid (returns value of the first field with
                this oid as unicode) or an integer index (returns tuple
                (Oid, unicode value))
        Raises KeyError if no field has the given oid, IndexError if the
        index is out of range and TypeError for any other key type.
        """
        if isinstance(key, Oid):
            # Return first matching field
            idx = libcrypto.X509_NAME_get_index_by_NID(self.ptr, key.nid, -1)
            if idx < 0:
                raise KeyError("Key not found " + repr(key))
            entry = libcrypto.X509_NAME_get_entry(self.ptr, idx)
            return self._entry_value(entry)
        if isinstance(key, int):
            # Return OID, string tuple
            entry = libcrypto.X509_NAME_get_entry(self.ptr, key)
            if entry is None:
                raise IndexError("name entry index out of range")
            oid = Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry))
            return (oid, self._entry_value(entry))
        raise TypeError("X509Name indices must be Oid or int")

    def __setitem__(self, key, val):
        """
        Modification of names is not implemented yet.
        Raises ValueError for read-only names and NotImplementedError
        for writable ones, rather than silently ignoring the assignment.
        """
        if not self.writable:
            raise ValueError("Attempt to modify constant X509 object")
        raise NotImplementedError

171
class _x509_ext(Structure):
    """ ctypes mirror of the C structure X509_EXTENSION """
    _fields_ = [
        ("object", c_void_p),
        ("critical", c_int),
        ("value", c_void_p),
    ]
177
class X509_EXT(object):
    """ Python object which represents a certificate extension """
    # Flags passed to X509V3_EXT_print by both text conversions
    PRINT_FLAG = 0x20010

    def __init__(self, ptr, copy=False):
        """ Initializes from the pointer to X509_EXTENSION.
            If copy is True, creates a copy which is owned (and later
            freed) by this object, otherwise just stores the pointer,
            which stays owned by whoever supplied it.
        """
        if copy:
            self.ptr = libcrypto.X509_EXTENSION_dup(ptr)
            self.need_free = True
        else:
            self.ptr = cast(ptr, POINTER(_x509_ext))
            # Borrowed pointer: freeing it here would release memory
            # that still belongs to another structure.
            self.need_free = False

    def __del__(self):
        """ Frees the extension only if this object owns a copy """
        if getattr(self, 'need_free', False):
            libcrypto.X509_EXTENSION_free(self.ptr)

    def __str__(self):
        """ Printable representation produced by X509V3_EXT_print """
        b = Membio()
        libcrypto.X509V3_EXT_print(b.bio, self.ptr, self.PRINT_FLAG, 0)
        return str(b)

    def __unicode__(self):
        """ Same as __str__, but converted to unicode """
        b = Membio()
        libcrypto.X509V3_EXT_print(b.bio, self.ptr, self.PRINT_FLAG, 0)
        return unicode(b)

    @property
    def oid(self):
        """ Oid of the extension """
        return Oid.fromobj(self.ptr[0].object)

    @property
    def critical(self):
        """ True if the extension is marked critical """
        return self.ptr[0].critical > 0
class _X509extlist(object):
    """
    Sequence-like view over the extensions of one certificate
    """
    def __init__(self, cert):
        # X509 object whose extensions are exposed
        self.cert = cert

    def __len__(self):
        return libcrypto.X509_get_ext_count(self.cert.cert)

    def __getitem__(self, item):
        ext_ptr = libcrypto.X509_get_ext(self.cert.cert, item)
        if ext_ptr is not None:
            # Hand out an independent copy of the extension
            return X509_EXT(ext_ptr, True)
        raise IndexError

    def find(self, oid):
        """
        Collects all extensions carrying the given Oid into a list
        """
        if not isinstance(oid, Oid):
            raise TypeError("Need crytypescrypto.oid.Oid as argument")
        matches = []
        count = len(self)
        # Each lookup continues after the previously found position
        pos = libcrypto.X509_get_ext_by_NID(self.cert.cert, oid.nid, -1)
        while 0 <= pos < count:
            matches.append(self[pos])
            pos = libcrypto.X509_get_ext_by_NID(self.cert.cert, oid.nid, pos)
        return matches

    def find_critical(self, crit=True):
        """
        Collects all critical extensions into a list (or all
        non-critical ones when the optional argument is False)
        """
        flag = 1 if crit else 0
        matches = []
        count = len(self)
        pos = libcrypto.X509_get_ext_by_critical(self.cert.cert, flag, -1)
        while 0 <= pos < count:
            matches.append(self[pos])
            pos = libcrypto.X509_get_ext_by_critical(self.cert.cert, flag, pos)
        return matches

252
class X509(object):
    """
    Represents X.509 certificate.
    """
    def __init__(self, data=None, ptr=None, format="PEM"):
        """
        Initializes certificate
        @param data - serialized certificate in PEM or DER format.
        @param ptr - pointer to X509, returned by some openssl function.
                mutually exclusive with data
        @param format - specifies data format. "PEM" or "DER", default PEM

        Raises TypeError if both or neither of data and ptr are given,
        X509Error if the data cannot be parsed.
        """
        if ptr is not None:
            if data is not None:
                raise TypeError("Cannot use data and ptr simultaneously")
            self.cert = ptr
        elif data is None:
            raise TypeError("data argument is required")
        else:
            b = Membio(data)
            if format == "PEM":
                self.cert = libcrypto.PEM_read_bio_X509(b.bio, None, None,
                                                        None)
            else:
                self.cert = libcrypto.d2i_X509_bio(b.bio, None)
            # A NULL result shows up as None or as integer 0, depending
            # on the restype declared for the reading function.
            if not self.cert:
                raise X509Error("error reading certificate")
        self.extensions = _X509extlist(self)

    def __del__(self):
        """
        Frees certificate object
        """
        # self.cert is missing when __init__ raised before assigning it
        cert = getattr(self, 'cert', None)
        if cert is not None:
            libcrypto.X509_free(cert)

    def __str__(self):
        """ Returns der string of the certificate """
        b = Membio()
        if libcrypto.i2d_X509_bio(b.bio, self.cert) == 0:
            raise X509Error("error serializing certificate")
        return str(b)

    def __repr__(self):
        """ Returns valid call to the constructor """
        return "X509(data=" + repr(str(self)) + ",format='DER')"

    @property
    def pubkey(self):
        """EVP PKEy object of certificate public key"""
        # X509_get_pubkey takes the certificate as its only argument
        return PKey(ptr=libcrypto.X509_get_pubkey(self.cert))

    def verify(self, store=None, chain=None, key=None):
        """
        Verify self. Supports verification on both X509 store object
        or just public issuer key
        @param store X509Store object.
        @param chain - list of X509 objects to add into verification
                context. These objects are untrusted, but can be used to
                build certificate chain up to trusted object in the store
        @param key - PKey object with open key to validate signature

        parameters store and key are mutually exclusive. If neither
        is specified, attempts to verify self as self-signed certificate

        Returns True if verification succeeds, False otherwise.
        Raises X509Error if both store and key are given or if openssl
        reports an error.
        """
        if store is not None and key is not None:
            raise X509Error("key and store cannot be specified simultaneously")
        if store is not None:
            ctx = libcrypto.X509_STORE_CTX_new()
            if not ctx:
                raise X509Error("Error allocating X509_STORE_CTX")
            try:
                if chain is not None and len(chain) > 0:
                    # Keep the wrapper alive in a local variable for the
                    # whole verification; C code gets the raw pointer.
                    untrusted = StackOfX509(chain)
                    chain_ptr = untrusted.ptr
                else:
                    untrusted = None
                    chain_ptr = None
                # X509_STORE_CTX_init returns 1 on success and 0 on error
                if libcrypto.X509_STORE_CTX_init(ctx, store.store, self.cert,
                                                 chain_ptr) <= 0:
                    raise X509Error("Error initializing X509_STORE_CTX")
                return libcrypto.X509_verify_cert(ctx) > 0
            finally:
                # Release the context on every path, including errors
                libcrypto.X509_STORE_CTX_free(ctx)
        if key is None:
            if self.issuer != self.subject:
                # Not a self-signed certificate
                return False
            key = self.pubkey
        res = libcrypto.X509_verify(self.cert, key.key)
        if res < 0:
            raise X509Error("X509_verify failed")
        return res > 0

    @property
    def subject(self):
        """ X509Name for certificate subject name """
        return X509Name(libcrypto.X509_get_subject_name(self.cert))

    @property
    def issuer(self):
        """ X509Name for certificate issuer name """
        return X509Name(libcrypto.X509_get_issuer_name(self.cert))

    @property
    def serial(self):
        """ Serial number of certificate as integer """
        asnint = libcrypto.X509_get_serialNumber(self.cert)
        b = Membio()
        libcrypto.i2a_ASN1_INTEGER(b.bio, asnint)
        # i2a_ASN1_INTEGER output is parsed as a hexadecimal number
        return int(str(b), 16)

    @property
    def version(self):
        """ certificate version as integer. Really certificate stores 0 for
        version 1 and 2 for version 3, but we return 1 and 3 """
        asn1int = cast(self.cert, _px509)[0].cert_info[0].version
        return libcrypto.ASN1_INTEGER_get(asn1int) + 1

    def _validity_field(self, name):
        """ Reads notBefore/notAfter by poking into the certificate
        structure and returns it as datetime with utc tzinfo """
        validity = cast(self.cert, _px509)[0].cert_info[0].validity[0]
        b = Membio()
        libcrypto.ASN1_TIME_print(b.bio, getattr(validity, name))
        parsed = datetime.strptime(str(b), "%b %d %H:%M:%S %Y %Z")
        return parsed.replace(tzinfo=utc)

    @property
    def startDate(self):
        """ Certificate validity period start date """
        # (x)->cert_info->validity->notBefore
        return self._validity_field('notBefore')

    @property
    def endDate(self):
        """ Certificate validity period end date """
        # (x)->cert_info->validity->notAfter
        return self._validity_field('notAfter')

    def check_ca(self):
        """ Returns True if certificate is CA certificate """
        return libcrypto.X509_check_ca(self.cert) > 0
class X509Store:
    """
    Represents trusted certificate store. Can be used to lookup CA
    certificates to verify

    @param file - file with several certificates and crls
            to load into store
    @param dir - hashed directory with certificates and crls
    @param default - if true, default verify location (directory)
            is installed
    """
    # X509_LOOKUP_ctrl commands and file types, as defined in the
    # OpenSSL headers
    _L_FILE_LOAD = 1
    _L_ADD_DIR = 2
    _FILETYPE_PEM = 1
    _FILETYPE_DEFAULT = 3

    def __init__(self, file=None, dir=None, default=False):
        """
        Creates X509 store and installs lookup method. Optionally initializes
        by certificates from given file or directory.
        Raises X509Error if any of openssl calls fails.
        """
        #
        # Todo - set verification flags
        #
        self.store = libcrypto.X509_STORE_new()
        if self.store is None:
            raise X509Error("allocating store")
        lookup = libcrypto.X509_STORE_add_lookup(self.store,
                                                 libcrypto.X509_LOOKUP_file())
        if lookup is None:
            raise X509Error("error installing file lookup method")
        if file is not None:
            if not libcrypto.X509_LOOKUP_ctrl(lookup, self._L_FILE_LOAD, file,
                                              self._FILETYPE_PEM, None) > 0:
                raise X509Error("error loading trusted certs from file " + file)
        lookup = libcrypto.X509_STORE_add_lookup(
            self.store, libcrypto.X509_LOOKUP_hash_dir())
        if lookup is None:
            raise X509Error("error installing hashed lookup method")
        if dir is not None:
            if not libcrypto.X509_LOOKUP_ctrl(lookup, self._L_ADD_DIR, dir,
                                              self._FILETYPE_PEM, None) > 0:
                raise X509Error("error adding hashed  trusted certs dir " + dir)
        if default:
            if not libcrypto.X509_LOOKUP_ctrl(lookup, self._L_ADD_DIR, None,
                                              self._FILETYPE_DEFAULT,
                                              None) > 0:
                raise X509Error("error adding default trusted certs dir ")

    def add_cert(self, cert):
        """
        Explicitly adds certificate to set of trusted in the store
        @param cert - X509 object to add
        """
        if not isinstance(cert, X509):
            raise TypeError("cert should be X509")
        libcrypto.X509_STORE_add_cert(self.store, cert.cert)

    def add_callback(self, callback):
        """
        Installs callback function, which would receive detailed information
        about verified certificates. Not implemented yet.
        """
        raise NotImplementedError

    def setflags(self, flags):
        """
        Set certificate verification flags.
        @param flags - integer bit mask. See OpenSSL X509_V_FLAG_* constants
        """
        libcrypto.X509_STORE_set_flags(self.store, flags)

    def setpurpose(self, purpose):
        """
        Sets certificate purpose which verified certificate should match
        @param purpose - number from 1 to 9 or standard string defined in
        Openssl. Possible strings - sslclient, sslserver, nssslserver,
        smimesign, smimeencrypt, crlsign, any, ocsphelper

        Raises X509Error for unknown purpose names or if the purpose
        cannot be set, TypeError if purpose is neither string nor integer.
        """
        if isinstance(purpose, str):
            # X509_PURPOSE_get_by_sname returns an index into the purpose
            # table (-1 if the name is unknown), not a purpose id. The id
            # has to be read from the table entry.
            idx = libcrypto.X509_PURPOSE_get_by_sname(purpose)
            if idx < 0:
                raise X509Error("Invalid certificate purpose '" + purpose + "'")
            get0 = libcrypto.X509_PURPOSE_get0
            get0.restype = c_void_p
            get0.argtypes = (c_int,)
            get_id = libcrypto.X509_PURPOSE_get_id
            get_id.argtypes = (c_void_p,)
            purp_no = get_id(get0(idx))
        elif isinstance(purpose, int):
            purp_no = purpose
        else:
            raise TypeError("purpose should be string or integer")
        if libcrypto.X509_STORE_set_purpose(self.store, purp_no) <= 0:
            raise X509Error("cannot set purpose")

    def setdepth(self, depth):
        """
        Sets the verification depth i.e. max length of certificate chain
        which is acceptable
        """
        libcrypto.X509_STORE_set_depth(self.store, depth)

    def settime(self, time):
        """
        Set point in time used to check validity of certificates for
        Time can be either python datetime object or number of seconds
        since epoch

        Only the argument type is validated at the moment: TypeError is
        raised for unsupported types, NotImplementedError otherwise.
        """
        # The module-level name ``datetime`` is the datetime class, so the
        # date class has to be imported here. datetime is a subclass of
        # date, hence one check covers both.
        from datetime import date
        if not isinstance(time, (date, int)):
            raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
        raise NotImplementedError
class StackOfX509:
    """
    Implements OpenSSL STACK_OF(X509) object.
    It looks much like python container types
    """
    def __init__(self, certs=None, ptr=None, disposable=True):
        """
        Create stack
        @param certs - list of X509 objects. If specified, read-write
                stack is created and populated by these certificates
        @param ptr - pointer to OpenSSL STACK_OF(X509) as returned by
                some functions
        @param disposable - if True, stack created from object, returned
                by function is copy, and can be modified and need to be
                freed. If false, it is just pointer into another
                structure i.e. CMS_ContentInfo

        Raises ValueError if both certs and ptr are given.
        """
        if ptr is None:
            self.need_free = True
            self.ptr = libcrypto.sk_new_null()
            if certs is not None:
                for crt in certs:
                    self.append(crt)
        elif certs is not None:
            raise ValueError("cannot handle certs an ptr simultaneously")
        else:
            self.need_free = disposable
            self.ptr = ptr

    def __len__(self):
        return libcrypto.sk_num(self.ptr)

    def __getitem__(self, index):
        """ Returns a copy of the certificate at the given position """
        if index < 0 or index >= len(self):
            raise IndexError
        p = libcrypto.sk_value(self.ptr, index)
        return X509(ptr=libcrypto.X509_dup(p))

    def __setitem__(self, index, value):
        """
        Replaces the certificate at the given position by a copy of value.
        Raises ValueError for read-only stacks, IndexError if index is
        out of range.
        """
        if not self.need_free:
            raise ValueError("Stack is read-only")
        if index < 0 or index >= len(self):
            raise IndexError
        # sk_set returns the element just stored, not the one it replaced.
        # So the old element is fetched first and freed after replacement.
        old = libcrypto.sk_value(self.ptr, index)
        libcrypto.sk_set(self.ptr, index, libcrypto.X509_dup(value.cert))
        libcrypto.X509_free(old)

    # Former (non-protocol) name of __setitem__, kept for compatibility
    __putitem__ = __setitem__

    def __delitem__(self, index):
        """ Removes the certificate at the given position and frees it """
        if not self.need_free:
            raise ValueError("Stack is read-only")
        if index < 0 or index >= len(self):
            raise IndexError
        p = libcrypto.sk_delete(self.ptr, index)
        libcrypto.X509_free(p)

    def __del__(self):
        """ Frees the stack with all its certificates if it is owned """
        # getattr guards against objects whose __init__ did not complete
        if getattr(self, 'need_free', False):
            libcrypto.sk_pop_free(self.ptr, libcrypto.X509_free)

    def append(self, value):
        """ Adds a copy of the given X509 object to the end of the stack """
        if not self.need_free:
            raise ValueError("Stack is read-only")
        libcrypto.sk_push(self.ptr, libcrypto.X509_dup(value.cert))
# ctypes prototypes of the libcrypto functions used in this module.
# Without an explicit restype ctypes assumes a C int result, and without
# argtypes it passes python integers as C int. Both truncate pointers on
# 64-bit platforms, so every function which returns or receives a pointer
# is declared here.
libcrypto.i2a_ASN1_INTEGER.argtypes=(c_void_p,c_void_p)
libcrypto.ASN1_STRING_print_ex.argtypes=(c_void_p,c_void_p,c_long)
libcrypto.ASN1_TIME_print.argtypes=(c_void_p,c_void_p)
libcrypto.ASN1_INTEGER_get.argtypes=(c_void_p,)
libcrypto.ASN1_INTEGER_get.restype=c_long
libcrypto.X509_get_serialNumber.argtypes=(c_void_p,)
libcrypto.X509_get_serialNumber.restype=c_void_p
libcrypto.X509_NAME_ENTRY_get_object.restype=c_void_p
libcrypto.X509_NAME_ENTRY_get_object.argtypes=(c_void_p,)
libcrypto.OBJ_obj2nid.argtypes=(c_void_p,)
libcrypto.X509_NAME_get_entry.restype=c_void_p
libcrypto.X509_NAME_get_entry.argtypes=(c_void_p,c_int)
libcrypto.X509_STORE_new.restype=c_void_p
libcrypto.X509_STORE_add_lookup.restype=c_void_p
libcrypto.X509_STORE_add_lookup.argtypes=(c_void_p,c_void_p)
libcrypto.X509_LOOKUP_file.restype=c_void_p
libcrypto.X509_LOOKUP_hash_dir.restype=c_void_p
libcrypto.X509_LOOKUP_ctrl.restype=c_int
libcrypto.X509_LOOKUP_ctrl.argtypes=(c_void_p,c_int,c_char_p,c_long,POINTER(c_char_p))
libcrypto.X509_EXTENSION_dup.argtypes=(c_void_p,)
libcrypto.X509_EXTENSION_dup.restype=POINTER(_x509_ext)
libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int)
libcrypto.X509_get_ext.restype=c_void_p
libcrypto.X509_get_ext.argtypes=(c_void_p,c_int)
# X509_NAME handling
libcrypto.X509_NAME_new.restype=c_void_p
libcrypto.X509_NAME_free.argtypes=(c_void_p,)
libcrypto.X509_NAME_print_ex.argtypes=(c_void_p,c_void_p,c_int,c_long)
libcrypto.X509_NAME_entry_count.argtypes=(c_void_p,)
libcrypto.X509_NAME_cmp.argtypes=(c_void_p,c_void_p)
libcrypto.X509_NAME_get_index_by_NID.argtypes=(c_void_p,c_int,c_int)
libcrypto.X509_NAME_ENTRY_get_data.restype=c_void_p
libcrypto.X509_NAME_ENTRY_get_data.argtypes=(c_void_p,)
# Certificate extensions
libcrypto.X509_EXTENSION_free.argtypes=(c_void_p,)
libcrypto.X509_get_ext_count.argtypes=(c_void_p,)
libcrypto.X509_get_ext_by_NID.argtypes=(c_void_p,c_int,c_int)
libcrypto.X509_get_ext_by_critical.argtypes=(c_void_p,c_int,c_int)
# Certificates
libcrypto.PEM_read_bio_X509.restype=c_void_p
libcrypto.PEM_read_bio_X509.argtypes=(c_void_p,c_void_p,c_void_p,c_void_p)
libcrypto.d2i_X509_bio.restype=c_void_p
libcrypto.d2i_X509_bio.argtypes=(c_void_p,c_void_p)
libcrypto.i2d_X509_bio.argtypes=(c_void_p,c_void_p)
libcrypto.X509_free.argtypes=(c_void_p,)
libcrypto.X509_dup.restype=c_void_p
libcrypto.X509_dup.argtypes=(c_void_p,)
libcrypto.X509_get_pubkey.restype=c_void_p
libcrypto.X509_get_pubkey.argtypes=(c_void_p,)
libcrypto.X509_get_subject_name.restype=c_void_p
libcrypto.X509_get_subject_name.argtypes=(c_void_p,)
libcrypto.X509_get_issuer_name.restype=c_void_p
libcrypto.X509_get_issuer_name.argtypes=(c_void_p,)
libcrypto.X509_check_ca.argtypes=(c_void_p,)
libcrypto.X509_verify.argtypes=(c_void_p,c_void_p)
# Verification context and store
libcrypto.X509_STORE_CTX_new.restype=c_void_p
libcrypto.X509_STORE_CTX_init.argtypes=(c_void_p,c_void_p,c_void_p,c_void_p)
libcrypto.X509_STORE_CTX_free.argtypes=(c_void_p,)
libcrypto.X509_verify_cert.argtypes=(c_void_p,)
libcrypto.X509_STORE_add_cert.argtypes=(c_void_p,c_void_p)
libcrypto.X509_STORE_set_flags.argtypes=(c_void_p,c_long)
libcrypto.X509_STORE_set_purpose.argtypes=(c_void_p,c_int)
# STACK_OF(X509)
libcrypto.sk_new_null.restype=c_void_p
libcrypto.sk_num.argtypes=(c_void_p,)
libcrypto.sk_value.restype=c_void_p
libcrypto.sk_value.argtypes=(c_void_p,c_int)
libcrypto.sk_set.restype=c_void_p
libcrypto.sk_set.argtypes=(c_void_p,c_int,c_void_p)
libcrypto.sk_delete.restype=c_void_p
libcrypto.sk_delete.argtypes=(c_void_p,c_int)
libcrypto.sk_push.argtypes=(c_void_p,c_void_p)
libcrypto.sk_pop_free.argtypes=(c_void_p,c_void_p)
549 libcrypto.X509_get_ext.argtypes=(c_void_p,c_int)