]> www.wagner.pp.ru Git - oss/ctypescrypto.git/blob - ctypescrypto/x509.py
4f086328f2ee143afed0c939c6efbb9d7631be22
[oss/ctypescrypto.git] / ctypescrypto / x509.py
1 """
2 Implements interface to openssl X509 and X509Store structures, 
3 I.e allows to load, analyze and verify certificates.
4
5 X509Store objects are also used to verify other signed documets,
6 such as CMS, OCSP and timestamps.
7 """
8
9
10
11 from ctypes import c_void_p,create_string_buffer,c_long,c_int,POINTER,c_char_p,Structure,cast
12 from ctypescrypto.bio import Membio
13 from ctypescrypto.pkey import PKey
14 from ctypescrypto.oid import Oid
15 from ctypescrypto.exception import LibCryptoError
16 from ctypescrypto import libcrypto
17 from datetime import datetime
18 try:
19         from pytz import utc
20 except ImportError:
21         from datetime import timedelta,tzinfo
22         ZERO=timedelta(0)
23         class UTC(tzinfo):
24                 """tzinfo object for UTC. 
25                         If no pytz is available, we would use it.
26                 """
27
28                 def utcoffset(self, dt):
29                         return ZERO
30
31                 def tzname(self, dt):
32                         return "UTC"
33
34                 def dst(self, dt):
35                         return ZERO
36
37         utc=UTC()
38
39 __all__ = ['X509Error','X509Name','X509Store','StackOfX509']
40
41 class _validity(Structure):
42         """ ctypes representation of X509_VAL structure 
43                 needed to access certificate validity period, because openssl
44                 doesn't provide fuctions for it - only macros
45         """
46         _fields_ =      [('notBefore',c_void_p),('notAfter',c_void_p)]
47
48 class _cinf(Structure):
49         """ ctypes representtion of X509_CINF structure 
50             neede to access certificate data, which are accessable only
51                 via macros
52         """
53         _fields_ = [('version',c_void_p),
54                 ('serialNumber',c_void_p),
55                 ('sign_alg',c_void_p),
56                 ('issuer',c_void_p),
57                 ('validity',POINTER(_validity)),
58                 ('subject',c_void_p),
59                 ('pubkey',c_void_p),
60                 ('issuerUID',c_void_p),
61                 ('subjectUID',c_void_p),
62                 ('extensions',c_void_p),
63                 ]
64
65 class _x509(Structure):
66         """
67         ctypes represntation of X509 structure needed
68         to access certificate data which are accesable only via
69         macros, not functions
70         """
71         _fields_ = [('cert_info',POINTER(_cinf)),
72                                 ('sig_alg',c_void_p),
73                                 ('signature',c_void_p),
74                                 # There are a lot of parsed extension fields there
75                                 ]
76 _px509 = POINTER(_x509)
77
78 class X509Error(LibCryptoError):
79         """
80         Exception, generated when some openssl function fail
81         during X509 operation
82         """
83         pass
84
85
86 class X509Name(object):
87         """
88         Class which represents X.509 distinguished name - typically 
89         a certificate subject name or an issuer name.
90
91         Now used only to represent information, extracted from the
92         certificate. Potentially can be also used to build DN when creating
93         certificate signing request
94         """
95         # XN_FLAG_SEP_COMMA_PLUS & ASN1_STRFLG_UTF8_CONVERT
96         PRINT_FLAG=0x10010
97         ESC_MSB=4
98         def __init__(self,ptr=None,copy=False):
99                 """
100                 Creates a X509Name object
101                 @param ptr - pointer to X509_NAME C structure (as returned by some  OpenSSL functions
102                 @param copy - indicates that this structure have to be freed upon object destruction
103                 """
104                 if ptr is not None:
105                         self.ptr=ptr
106                         self.need_free=copy
107                         self.writable=False
108                 else:
109                         self.ptr=libcrypto.X509_NAME_new()
110                         self.need_free=True
111                         self.writable=True
112         def __del__(self):
113                 """
114                 Frees if neccessary
115                 """
116                 if self.need_free:
117                         libcrypto.X509_NAME_free(self.ptr)
118         def __str__(self):
119                 """
120                 Produces an ascii representation of the name, escaping all symbols > 0x80
121                 Probably it is not what you want, unless your native language is English
122                 """
123                 b=Membio()
124                 libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG | self.ESC_MSB)
125                 return str(b)
126         def __unicode__(self):
127                 """
128                 Produces unicode representation of the name. 
129                 """
130                 b=Membio()
131                 libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG)
132                 return unicode(b)
133         def __len__(self):
134                 """
135                 return number of components in the name
136                 """
137                 return libcrypto.X509_NAME_entry_count(self.ptr)
138         def __cmp__(self,other):
139                 """
140                 Compares X509 names
141                 """
142                 return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)
143         def __eq__(self,other):
144                 return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)==0
145
146         def __getitem__(self,key):
147                 if isinstance(key,Oid):
148                         # Return first matching field
149                         idx=libcrypto.X509_NAME_get_index_by_NID(self.ptr,key.nid,-1)
150                         if idx<0:
151                                 raise KeyError("Key not found "+str(Oid))
152                         entry=libcrypto.X509_NAME_get_entry(self.ptr,idx)
153                         s=libcrypto.X509_NAME_ENTRY_get_data(entry)
154                         b=Membio()
155                         libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
156                         return unicode(b)
157                 elif isinstance(key,int):
158                         # Return OID, string tuple
159                         entry=libcrypto.X509_NAME_get_entry(self.ptr,key)
160                         if entry is None:
161                                 raise IndexError("name entry index out of range")
162                         oid=Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry))
163                         s=libcrypto.X509_NAME_ENTRY_get_data(entry)
164                         b=Membio()
165                         libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
166                         return (oid,unicode(b))
167
168         def __setitem__(self,key,val):
169                 if not self.writable:
170                         raise ValueError("Attempt to modify constant X509 object")
171                 else:
172                         raise NotImplementedError
173
174 class _x509_ext(Structure):
175         """ Represens C structure X509_EXTENSION """
176         _fields_=[("object",c_void_p),
177                         ("critical",c_int),
178                         ("value",c_void_p)]
179
180 class X509_EXT(object):
181         """ Python object which represents a certificate extension """
182         def __init__(self,ptr,copy=False):
183                 """ Initializes from the pointer to X509_EXTENSION.
184                         If copy is True, creates a copy, otherwise just
185                         stores pointer.
186                 """
187                 if copy:
188                         self.ptr=libcrypto.X509_EXTENSION_dup(ptr)
189                 else:
190                         self.ptr=cast(ptr,POINTER(_x509_ext))
191         def __del__(self):
192                 libcrypto.X509_EXTENSION_free(self.ptr)
193         def __str__(self):
194                 b=Membio()
195                 libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0)
196                 return str(b)
197         def __unicode__(self):
198                 b=Membio()
199                 libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0)
200                 return unicode(b)
201         @property
202         def oid(self):
203                 return Oid.fromobj(self.ptr[0].object)
204         @property
205         def critical(self):     
206                 return self.ptr[0].critical >0
207 class _X509extlist(object):     
208         """
209         Represents list of certificate extensions
210         """
211         def __init__(self,cert):
212                 self.cert=cert
213         def __len__(self):
214                 return libcrypto.X509_get_ext_count(self.cert.cert)
215         def __getitem__(self,item):
216                 p=libcrypto.X509_get_ext(self.cert.cert,item)
217                 if p is None:
218                         raise IndexError
219                 return X509_EXT(p,True)
220         def find(self,oid):
221                 """
222                 Return list of extensions with given Oid
223                 """
224                 if not isinstance(oid,Oid):
225                         raise TypeError("Need crytypescrypto.oid.Oid as argument")
226                 found=[]
227                 l=-1
228                 end=len(self)
229                 while True:
230                         l=libcrypto.X509_get_ext_by_NID(self.cert.cert,oid.nid,l)
231                         if l>=end or l<0:
232                                  break
233                         found.append(self[l])
234                 return found
235         def find_critical(self,crit=True):
236                 """
237                 Return list of critical extensions (or list of non-cricital, if
238                 optional second argument is False
239                 """
240                 if crit:
241                         flag=1
242                 else:
243                         flag=0
244                 found=[]
245                 end=len(self)
246                 l=-1
247                 while True:
248                         l=libcrypto.X509_get_ext_by_critical(self.cert.cert,flag,l)
249                         if l>=end or l<0:
250                                  break
251                         found.append(self[l])
252                 return found                    
253
254 class X509(object):
255         """
256         Represents X.509 certificate. 
257         """
258         def __init__(self,data=None,ptr=None,format="PEM"):
259                 """
260                 Initializes certificate
261                 @param data - serialized certificate in PEM or DER format.
262                 @param ptr - pointer to X509, returned by some openssl function. 
263                         mutually exclusive with data
264                 @param format - specifies data format. "PEM" or "DER", default PEM
265                 """
266                 if ptr is not None:
267                         if data is not None: 
268                                 raise TypeError("Cannot use data and ptr simultaneously")
269                         self.cert = ptr
270                 elif data is None:
271                         raise TypeError("data argument is required")
272                 else:
273                         b=Membio(data)
274                         if format == "PEM":
275                                 self.cert=libcrypto.PEM_read_bio_X509(b.bio,None,None,None)
276                         else:
277                                 self.cert=libcrypto.d2i_X509_bio(b.bio,None)
278                         if self.cert is None:
279                                 raise X509Error("error reading certificate")
280                 self.extensions=_X509extlist(self)              
281         def __del__(self):
282                 """
283                 Frees certificate object
284                 """
285                 libcrypto.X509_free(self.cert)
286         def __str__(self):
287                 """ Returns der string of the certificate """
288                 b=Membio()
289                 if libcrypto.i2d_X509_bio(b.bio,self.cert)==0:
290                         raise X509Error("error serializing certificate")
291                 return str(b)
292         def __repr__(self):
293                 """ Returns valid call to the constructor """
294                 return "X509(data="+repr(str(self))+",format='DER')"
295         @property
296         def pubkey(self):
297                 """EVP PKEy object of certificate public key"""
298                 return PKey(ptr=libcrypto.X509_get_pubkey(self.cert,False))
299         def verify(self,store=None,chain=[],key=None):  
300                 """ 
301                 Verify self. Supports verification on both X509 store object 
302                 or just public issuer key
303                 @param store X509Store object.
304                 @param chain - list of X509 objects to add into verification
305                         context.These objects are untrusted, but can be used to
306                         build certificate chain up to trusted object in the store
307                 @param key - PKey object with open key to validate signature
308                 
309                 parameters store and key are mutually exclusive. If neither 
310                 is specified, attempts to verify self as self-signed certificate
311                 """
312                 if store is not None and key is not None:
313                         raise X509Error("key and store cannot be specified simultaneously")
314                 if store is not None:
315                         ctx=libcrypto.X509_STORE_CTX_new()
316                         if ctx is None:
317                                 raise X509Error("Error allocating X509_STORE_CTX")
318                         if chain is not None and len(chain)>0:
319                                 ch=StackOfX509(chain)
320                         else:
321                                 ch=None
322                         if libcrypto.X509_STORE_CTX_init(ctx,store.store,self.cert,ch) < 0:
323                                 raise X509Error("Error allocating X509_STORE_CTX")
324                         res= libcrypto.X509_verify_cert(ctx)
325                         libcrypto.X509_STORE_CTX_free(ctx)
326                         return res>0
327                 else:
328                         if key is None:
329                                 if self.issuer != self.subject:
330                                         # Not a self-signed certificate
331                                         return False
332                                 key = self.pubkey
333                         res = libcrypto.X509_verify(self.cert,key.key)
334                         if res < 0:
335                                 raise X509Error("X509_verify failed")
336                         return res>0
337                         
338         @property
339         def subject(self):
340                 """ X509Name for certificate subject name """
341                 return X509Name(libcrypto.X509_get_subject_name(self.cert))
342         @property
343         def issuer(self):
344                 """ X509Name for certificate issuer name """
345                 return X509Name(libcrypto.X509_get_issuer_name(self.cert))
346         @property
347         def serial(self):
348                 """ Serial number of certificate as integer """
349                 asnint=libcrypto.X509_get_serialNumber(self.cert)
350                 b=Membio()
351                 libcrypto.i2a_ASN1_INTEGER(b.bio,asnint)
352                 return int(str(b),16)
353         @property 
354         def version(self):
355                 """ certificate version as integer. Really certificate stores 0 for
356                 version 1 and 2 for version 3, but we return 1 and 3 """
357                 asn1int=cast(self.cert,_px509)[0].cert_info[0].version
358                 return libcrypto.ASN1_INTEGER_get(asn1int)+1
359         @property
360         def startDate(self):
361                 """ Certificate validity period start date """
362                 # Need deep poke into certificate structure (x)->cert_info->validity->notBefore 
363                 global utc
364                 asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notBefore
365                 b=Membio()
366                 libcrypto.ASN1_TIME_print(b.bio,asn1date)
367                 return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
368         @property
369         def endDate(self):
370                 """ Certificate validity period end date """
371                 # Need deep poke into certificate structure (x)->cert_info->validity->notAfter
372                 global utc
373                 asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notAfter
374                 b=Membio()
375                 libcrypto.ASN1_TIME_print(b.bio,asn1date)
376                 return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
377         def check_ca(self):
378                 """ Returns True if certificate is CA certificate """
379                 return libcrypto.X509_check_ca(self.cert)>0
380 class X509Store(object):
381         """
382                 Represents trusted certificate store. Can be used to lookup CA 
383                 certificates to verify
384
385                 @param file - file with several certificates and crls 
386                                 to load into store
387                 @param dir - hashed directory with certificates and crls
388                 @param default - if true, default verify location (directory) 
389                         is installed
390
391         """
392         def __init__(self,file=None,dir=None,default=False):
393                 """
394                 Creates X509 store and installs lookup method. Optionally initializes 
395                 by certificates from given file or directory.
396                 """
397                 #
398                 # Todo - set verification flags
399                 # 
400                 self.store=libcrypto.X509_STORE_new()
401                 if self.store is None:
402                         raise X509Error("allocating store")
403                 lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_file())
404                 if lookup is None:
405                         raise X509Error("error installing file lookup method")
406                 if (file is not None):
407                         if not libcrypto.X509_LOOKUP_ctrl(lookup,1,file,1,None)>0:
408                                 raise X509Error("error loading trusted certs from file "+file)
409                 lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_hash_dir())
410                 if lookup is None:
411                         raise X509Error("error installing hashed lookup method")
412                 if dir is not None:
413                         if not libcrypto.X509_LOOKUP_ctrl(lookup,2,dir,1,None)>0:
414                                 raise X509Error("error adding hashed  trusted certs dir "+dir)
415                 if default:
416                         if not libcrypto.X509_LOOKUP_ctrl(lookup,2,None,3,None)>0:
417                                 raise X509Error("error adding default trusted certs dir ")
418         def add_cert(self,cert):
419                 """
420                 Explicitely adds certificate to set of trusted in the store
421                 @param cert - X509 object to add
422                 """
423                 if not isinstance(cert,X509):
424                         raise TypeError("cert should be X509")
425                 libcrypto.X509_STORE_add_cert(self.store,cert.cert)
426         def add_callback(self,callback):
427                 """
428                 Installs callback function, which would receive detailed information
429                 about verified ceritificates
430                 """
431                 raise NotImplementedError
432         def setflags(self,flags):
433                 """
434                 Set certificate verification flags.
435                 @param flags - integer bit mask. See OpenSSL X509_V_FLAG_* constants
436                 """
437                 libcrypto.X509_STORE_set_flags(self.store,flags)        
438         def setpurpose(self,purpose):
439                 """
440                 Sets certificate purpose which verified certificate should match
441                 @param purpose - number from 1 to 9 or standard strind defined in Openssl
442                 possible strings - sslcient,sslserver, nssslserver, smimesign,smimeencrypt, crlsign, any,ocsphelper
443                 """
444                 if isinstance(purpose,str):
445                         purp_no=X509_PURPOSE_get_by_sname(purpose)
446                         if purp_no <=0:
447                                 raise X509Error("Invalid certificate purpose '"+purpose+"'")
448                 elif isinstance(purpose,int):
449                         purp_no = purpose
450                 if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0:
451                         raise X509Error("cannot set purpose")
452         def setdepth(self,depth):
453                 """
454                 Sets the verification depth i.e. max length of certificate chain
455                 which is acceptable
456                 """
457                 libcrypto.X509_STORE_set_depth(self.store,depth)
458         def settime(self, time):
459                 """
460                 Set point in time used to check validity of certificates for
461                 Time can be either python datetime object or number of seconds
462                 sinse epoch
463                 """
464                 if isinstance(time,datetime.datetime) or isinstance(time,datetime.date):
465                         d=int(time.strftime("%s"))
466                 elif isinstance(time,int):
467                         pass
468                 else:
469                         raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
470                 raise NotImplementedError
471 class StackOfX509(object):
472         """
473         Implements OpenSSL STACK_OF(X509) object.
474         It looks much like python container types
475         """
476         def __init__(self,certs=None,ptr=None,disposable=True):
477                 """
478                 Create stack
479                 @param certs - list of X509 objects. If specified, read-write
480                         stack is created and populated by these certificates
481                 @param ptr - pointer to OpenSSL STACK_OF(X509) as returned by
482                         some functions
483                 @param disposable - if True, stack created from object, returned
484                                 by function is copy, and can be modified and need to be
485                                 freeid. If false, it is just pointer into another
486                                 structure i.e. CMS_ContentInfo
487                 """
488                 if  ptr is None:
489                         self.need_free = True
490                         self.ptr=libcrypt.sk_new_null()
491                         if certs is not None:
492                                 for crt in certs:
493                                         self.append(crt)
494                 elif not certs is None:
495                                 raise ValueError("cannot handle certs an ptr simultaneously")
496                 else:
497                         self.need_free = disposable
498                         self.ptr=ptr
499         def __len__(self):
500                 return libcrypto.sk_num(self.ptr)
501         def __getitem__(self,index):
502                 if index <0 or index>=len(self):
503                         raise IndexError
504                 p=libcrypto.sk_value(self.ptr,index)
505                 return X509(ptr=libcrypto.X509_dup(p))
506         def __putitem__(self,index,value):
507                 if not self.need_free:
508                         raise ValueError("Stack is read-only")
509                 if index <0 or index>=len(self):
510                         raise IndexError
511                 p=libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert))
512                 libcrypto.X509_free(p)
513         def __delitem__(self,index):    
514                 if not self.need_free:
515                         raise ValueError("Stack is read-only")
516                 if index <0 or index>=len(self):
517                         raise IndexError
518                 p=libcrypto.sk_delete(self.ptr,index)
519                 libcrypto.X509_free(p)
520         def __del__(self):
521                 if self.need_free:
522                         libcrypto.sk_pop_free(self.ptr,libcrypto.X509_free)
523         def append(self,value):
524                 if not self.need_free:
525                         raise ValueError("Stack is read-only")
526                 libcrypto.sk_push(self.ptr,libcrypto.X509_dup(value.cert))
527 libcrypto.i2a_ASN1_INTEGER.argtypes=(c_void_p,c_void_p)
528 libcrypto.ASN1_STRING_print_ex.argtypes=(c_void_p,c_void_p,c_long)
529 libcrypto.ASN1_TIME_print.argtypes=(c_void_p,c_void_p)
530 libcrypto.ASN1_INTEGER_get.argtypes=(c_void_p,)
531 libcrypto.ASN1_INTEGER_get.restype=c_long
532 libcrypto.X509_get_serialNumber.argtypes=(c_void_p,)
533 libcrypto.X509_get_serialNumber.restype=c_void_p
534 libcrypto.X509_NAME_ENTRY_get_object.restype=c_void_p
535 libcrypto.X509_NAME_ENTRY_get_object.argtypes=(c_void_p,)
536 libcrypto.OBJ_obj2nid.argtypes=(c_void_p,)
537 libcrypto.X509_NAME_get_entry.restype=c_void_p
538 libcrypto.X509_NAME_get_entry.argtypes=(c_void_p,c_int)
539 libcrypto.X509_STORE_new.restype=c_void_p
540 libcrypto.X509_STORE_add_lookup.restype=c_void_p
541 libcrypto.X509_STORE_add_lookup.argtypes=(c_void_p,c_void_p)
542 libcrypto.X509_LOOKUP_file.restype=c_void_p
543 libcrypto.X509_LOOKUP_hash_dir.restype=c_void_p
544 libcrypto.X509_LOOKUP_ctrl.restype=c_int
545 libcrypto.X509_LOOKUP_ctrl.argtypes=(c_void_p,c_int,c_char_p,c_long,POINTER(c_char_p))
546 libcrypto.X509_EXTENSION_dup.argtypes=(c_void_p,)
547 libcrypto.X509_EXTENSION_dup.restype=POINTER(_x509_ext)
548 libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int)
549 libcrypto.X509_get_ext.restype=c_void_p
550 libcrypto.X509_get_ext.argtypes=(c_void_p,c_int)
551 libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int)