]> www.wagner.pp.ru Git - oss/ctypescrypto.git/blob - ctypescrypto/x509.py
bd056d32f6227bcba4593e26fd79d06504ee5097
[oss/ctypescrypto.git] / ctypescrypto / x509.py
1 """
2 Implements an interface to the OpenSSL X509 and X509Store structures,
3 i.e. allows loading, analyzing and verifying certificates.
4
5 X509Store objects are also used to verify other signed documents,
6 such as CMS, OCSP and timestamps.
7 """
8
9
10
11 from ctypes import c_void_p,create_string_buffer,c_long,c_int,POINTER,c_char_p,Structure,cast
12 from ctypescrypto.bio import Membio
13 from ctypescrypto.pkey import PKey
14 from ctypescrypto.oid import Oid
15 from ctypescrypto.exception import LibCryptoError
16 from ctypescrypto import libcrypto
17 from datetime import datetime
try:
        from pytz import utc
except ImportError:
        # pytz is optional: without it we provide a minimal UTC tzinfo.
        # The base class must be imported explicitly - the module-level name
        # ``datetime`` is the datetime *class*, whose ``tzinfo`` attribute is
        # a descriptor and cannot be subclassed.
        from datetime import timedelta, tzinfo
        ZERO=timedelta(0)
        class UTC(tzinfo):
                """tzinfo object for UTC.
                        Used as a fallback when pytz is not available.
                """

                def utcoffset(self, dt):
                        """ Offset from UTC is always zero """
                        return ZERO

                def tzname(self, dt):
                        """ Name of the time zone """
                        return "UTC"

                def dst(self, dt):
                        """ UTC has no daylight saving time """
                        return ZERO

        utc=UTC()
38
# Public names of the module. X509_extlist is deliberately left out
# because it is not implemented yet.
__all__ = ['X509','X509Error','X509Name','X509Store','StackOfX509']
40
class _validity(Structure):
        """
        ctypes mirror of the X509_VAL structure.

        It is needed to reach the certificate validity period, because
        openssl exposes it only through macros, not through functions.
        """
        _fields_ = [
                ('notBefore',c_void_p),
                ('notAfter',c_void_p),
        ]
47
class _cinf(Structure):
        """
        ctypes mirror of the X509_CINF structure.

        It is needed to reach certificate data which openssl makes
        accessible only via macros.
        """
        _fields_ = [
                ('version',c_void_p),
                ('serialNumber',c_void_p),
                ('sign_alg',c_void_p),
                ('issuer',c_void_p),
                ('validity',POINTER(_validity)),
                ('subject',c_void_p),
                ('pubkey',c_void_p),
        ]
61
class _x509(Structure):
        """
        ctypes mirror of the leading part of the X509 structure.

        It is needed to reach certificate data which openssl makes
        accessible only via macros, not functions.
        """
        # Only the first members are declared; the real structure goes on
        # with a lot of parsed extension fields which are not needed here.
        _fields_ = [
                ('cert_info',POINTER(_cinf)),
                ('sig_alg',c_void_p),
                ('signature',c_void_p),
        ]

# Pointer type used to cast a raw X509 pointer for field access
_px509 = POINTER(_x509)
74
75 # X509_extlist is not exported yet, because it is not implemented
class X509Error(LibCryptoError):
        """
        Raised when an openssl function fails during an X509 operation.
        """
82
83
class X509Name:
        """
        Class which represents X.509 distinguished name - typically
        a certificate subject name or an issuer name.

        Now used only to represent information, extracted from the
        certificate. Potentially can be also used to build DN when creating
        certificate signing request
        """
        # XN_FLAG_SEP_COMMA_PLUS combined with ASN1_STRFLG_UTF8_CONVERT
        PRINT_FLAG=0x10010
        # Extra flag used by __str__ to escape non-ASCII characters
        ESC_MSB=4
        def __init__(self,ptr=None,copy=False):
                """
                Creates a X509Name object
                @param ptr - pointer to X509_NAME C structure (as returned by some
                        OpenSSL functions). If omitted, a new empty name is allocated
                @param copy - indicates that this structure has to be freed upon
                        object destruction
                """
                if ptr is not None:
                        self.ptr=ptr
                        self.need_free=copy
                        self.writable=False
                else:
                        self.ptr=libcrypto.X509_NAME_new()
                        self.need_free=True
                        self.writable=True
        def __del__(self):
                """
                Frees the underlying structure if necessary
                """
                # getattr guards against objects whose __init__ did not finish
                if getattr(self,"need_free",False):
                        libcrypto.X509_NAME_free(self.ptr)
        def __str__(self):
                """
                Produces an ascii representation of the name, escaping all symbols > 0x80
                Probably it is not what you want, unless your native language is English
                """
                b=Membio()
                libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG | self.ESC_MSB)
                return str(b)
        def __unicode__(self):
                """
                Produces unicode representation of the name.
                """
                b=Membio()
                libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG)
                return unicode(b)
        def __len__(self):
                """
                Returns number of components in the name
                """
                return libcrypto.X509_NAME_entry_count(self.ptr)
        def __cmp__(self,other):
                """
                Compares X509 names
                """
                return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)
        def __eq__(self,other):
                """ True if X509_NAME_cmp reports the names as equal """
                return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)==0
        def __ne__(self,other):
                """ Inverse of __eq__, so that != always agrees with == """
                return not self.__eq__(other)

        def _entry_text(self,entry):
                """ Returns the value of a X509_NAME_ENTRY as unicode string """
                s=libcrypto.X509_NAME_ENTRY_get_data(entry)
                b=Membio()
                libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
                return unicode(b)

        def __getitem__(self,key):
                """
                Looks up a component of the name.
                @param key - either an Oid or an integer index
                @return for an Oid key - value of the first matching field,
                        for an integer key - tuple (Oid, value)
                Raises KeyError if there is no field with given Oid, IndexError
                if index is out of range and TypeError for any other key type
                """
                if isinstance(key,Oid):
                        # Return first matching field
                        idx=libcrypto.X509_NAME_get_index_by_NID(self.ptr,key.nid,-1)
                        if idx<0:
                                raise KeyError("Key not found "+repr(key))
                        entry=libcrypto.X509_NAME_get_entry(self.ptr,idx)
                        return self._entry_text(entry)
                elif isinstance(key,int):
                        # Return OID, string tuple
                        entry=libcrypto.X509_NAME_get_entry(self.ptr,key)
                        if entry is None:
                                raise IndexError("name entry index out of range")
                        obj=libcrypto.X509_NAME_ENTRY_get_object(entry)
                        nid=libcrypto.OBJ_obj2nid(obj)
                        if nid==0:
                                # No NID for this object, so build the Oid
                                # from its text form instead
                                buf=create_string_buffer(80)
                                length=libcrypto.OBJ_obj2txt(buf,80,obj,1)
                                oid=Oid(buf[0:length])
                        else:
                                oid=Oid(nid)
                        return (oid,self._entry_text(entry))
                raise TypeError("X509Name key should be Oid or integer")

        def __setitem__(self,key,val):
                """
                Modification of the name is not implemented yet.
                Raises ValueError for names which belong to another structure
                and NotImplementedError otherwise
                """
                if not self.writable:
                        raise ValueError("Attempt to modify constant X509 object")
                # Do not pretend that the assignment succeeded
                raise NotImplementedError
class X509_extlist:
        """
        Placeholder for a list of certificate extensions. Not implemented.

        NOTE(review): the class cannot be instantiated as written -
        __setattr__ below raises unconditionally, so the attribute
        assignment in __init__ raises NotImplementedError.
        NOTE(review): __del__ and __len__ call the X509_NAME functions
        which X509Name uses; presumably copied from there - confirm which
        OpenSSL calls are right for an extension list before implementing.
        """
        def __init__(self,ptr):
                # Goes through __setattr__, which raises NotImplementedError
                self.ptr=ptr
        def __del__(self):
                # Reading self.ptr falls back to __getattr__ when the
                # attribute was never stored
                libcrypto.X509_NAME_free(self.ptr)
        def __str__(self):
                raise NotImplementedError
        def __len__(self):
                return libcrypto.X509_NAME_entry_count(self.ptr)

        def __getattr__(self,key):
                # Called for every attribute not found by normal lookup
                raise NotImplementedError
        def __setattr__(self,key,val):
                # Called for every attribute assignment, including self.ptr
                raise NotImplementedError
190
191         
192
193
class X509:
        """
        Represents X.509 certificate.
        """
        def __init__(self,data=None,ptr=None,format="PEM"):
                """
                Initializes certificate
                @param data - serialized certificate in PEM or DER format.
                @param ptr - pointer to X509, returned by some openssl function.
                        mutually exclusive with data
                @param format - specifies data format. "PEM" or "DER", default PEM

                Raises TypeError if both or neither of data and ptr are given,
                X509Error if data cannot be parsed
                """
                if ptr is not None:
                        if data is not None:
                                raise TypeError("Cannot use data and ptr simultaneously")
                        self.cert = ptr
                elif data is None:
                        raise TypeError("data argument is required")
                else:
                        b=Membio(data)
                        if format == "PEM":
                                cert=libcrypto.PEM_read_bio_X509(b.bio,None,None,None)
                        else:
                                cert=libcrypto.d2i_X509_bio(b.bio,None)
                        # NULL comes back as 0 or None depending on the declared
                        # restype, so test truthiness instead of identity
                        if not cert:
                                raise X509Error("error reading certificate")
                        self.cert=cert
        def __del__(self):
                """
                Frees certificate object
                """
                # cert is not set if __init__ raised an exception
                cert=getattr(self,"cert",None)
                if cert is not None:
                        libcrypto.X509_free(cert)
        def __str__(self):
                """ Returns der string of the certificate """
                b=Membio()
                if libcrypto.i2d_X509_bio(b.bio,self.cert)==0:
                        raise X509Error("error serializing certificate")
                return str(b)
        def __repr__(self):
                """ Returns valid call to the constructor """
                return "X509(data="+repr(str(self))+",format='DER')"
        @property
        def pubkey(self):
                """EVP PKEy object of certificate public key"""
                # X509_get_pubkey takes the certificate as its only argument
                return PKey(ptr=libcrypto.X509_get_pubkey(self.cert))
        def verify(self,store=None,chain=None,key=None):
                """
                Verify self. Supports verification on both X509 store object
                or just public issuer key
                @param store X509Store object.
                @param chain - list of X509 objects to add into verification
                        context.These objects are untrusted, but can be used to
                        build certificate chain up to trusted object in the store
                @param key - PKey object with open key to validate signature

                parameters store and key are mutually exclusive. If neither
                is specified, attempts to verify self as self-signed certificate

                @return True if verification succeeds, False otherwise
                Raises X509Error if verification cannot be set up or if
                X509_verify reports an error
                """
                if store is not None and key is not None:
                        raise X509Error("key and store cannot be specified simultaneously")
                if store is not None:
                        ctx=libcrypto.X509_STORE_CTX_new()
                        if not ctx:
                                raise X509Error("Error allocating X509_STORE_CTX")
                        try:
                                if chain is not None and len(chain)>0:
                                        # Keep the wrapper referenced while the
                                        # context uses its pointer
                                        stack=StackOfX509(chain)
                                        chain_ptr=stack.ptr
                                else:
                                        stack=None
                                        chain_ptr=None
                                # X509_STORE_CTX_init returns 1 on success, 0 on failure
                                if libcrypto.X509_STORE_CTX_init(ctx,store.store,self.cert,chain_ptr) <= 0:
                                        raise X509Error("Error initializing X509_STORE_CTX")
                                res=libcrypto.X509_verify_cert(ctx)
                        finally:
                                # Free the context on error paths too
                                libcrypto.X509_STORE_CTX_free(ctx)
                        return res>0
                else:
                        if key is None:
                                if self.issuer != self.subject:
                                        # Not a self-signed certificate
                                        return False
                                key = self.pubkey
                        res = libcrypto.X509_verify(self.cert,key.key)
                        if res < 0:
                                raise X509Error("X509_verify failed")
                        return res>0

        @property
        def subject(self):
                """ X509Name for certificate subject name """
                return X509Name(libcrypto.X509_get_subject_name(self.cert))
        @property
        def issuer(self):
                """ X509Name for certificate issuer name """
                return X509Name(libcrypto.X509_get_issuer_name(self.cert))
        @property
        def serial(self):
                """ Serial number of certificate as integer """
                asnint=libcrypto.X509_get_serialNumber(self.cert)
                b=Membio()
                # i2a_ASN1_INTEGER writes the number as hexadecimal text
                libcrypto.i2a_ASN1_INTEGER(b.bio,asnint)
                return int(str(b),16)
        @property
        def version(self):
                """ certificate version as integer. Really certificate stores 0 for
                version 1 and 2 for version 3, but we return 1 and 3 """
                asn1int=cast(self.cert,_px509)[0].cert_info[0].version
                return libcrypto.ASN1_INTEGER_get(asn1int)+1
        def _asn1_time_to_datetime(self,asn1date):
                """ Converts ASN1_TIME pointer into datetime object marked as UTC """
                b=Membio()
                libcrypto.ASN1_TIME_print(b.bio,asn1date)
                return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc)
        @property
        def startDate(self):
                """ Certificate validity period start date """
                # Need deep poke into certificate structure (x)->cert_info->validity->notBefore
                validity=cast(self.cert,_px509)[0].cert_info[0].validity[0]
                return self._asn1_time_to_datetime(validity.notBefore)
        @property
        def endDate(self):
                """ Certificate validity period end date """
                # Need deep poke into certificate structure (x)->cert_info->validity->notAfter
                validity=cast(self.cert,_px509)[0].cert_info[0].validity[0]
                return self._asn1_time_to_datetime(validity.notAfter)
        def extensions(self):
                """ Returns list of extensions. Not implemented yet """
                raise NotImplementedError
        def check_ca(self):
                """ Returns True if certificate is CA certificate """
                return libcrypto.X509_check_ca(self.cert)>0
class X509Store:
        """
                Represents trusted certificate store. Can be used to lookup CA
                certificates to verify

                @param file - file with several certificates and crls
                                to load into store
                @param dir - hashed directory with certificates and crls
                @param default - if true, default verify location (directory)
                        is installed

        """
        def __init__(self,file=None,dir=None,default=False):
                """
                Creates X509 store and installs lookup method. Optionally initializes
                by certificates from given file or directory.

                Raises X509Error if the store cannot be allocated, a lookup
                method cannot be installed or a location cannot be loaded
                """
                #
                # Todo - set verification flags
                #
                self.store=libcrypto.X509_STORE_new()
                if self.store is None:
                        raise X509Error("allocating store")
                lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_file())
                if lookup is None:
                        raise X509Error("error installing file lookup method")
                if (file is not None):
                        # command 1 with type 1: load a PEM file
                        if not libcrypto.X509_LOOKUP_ctrl(lookup,1,file,1,None)>0:
                                raise X509Error("error loading trusted certs from file "+file)
                lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_hash_dir())
                if lookup is None:
                        raise X509Error("error installing hashed lookup method")
                if dir is not None:
                        # command 2 with type 1: add a directory of PEM files
                        if not libcrypto.X509_LOOKUP_ctrl(lookup,2,dir,1,None)>0:
                                raise X509Error("error adding hashed  trusted certs dir "+dir)
                if default:
                        # command 2 with type 3: add the default directory
                        if not libcrypto.X509_LOOKUP_ctrl(lookup,2,None,3,None)>0:
                                raise X509Error("error adding default trusted certs dir ")
        def add_cert(self,cert):
                """
                Explicitly adds certificate to set of trusted in the store
                @param cert - X509 object to add
                Raises TypeError if cert is not an X509 object
                """
                if not isinstance(cert,X509):
                        raise TypeError("cert should be X509")
                libcrypto.X509_STORE_add_cert(self.store,cert.cert)
        def add_callback(self,callback):
                """
                Installs callback function, which would receive detailed information
                about verified certificates. Not implemented yet
                """
                raise NotImplementedError
        def setflags(self,flags):
                """
                Set certificate verification flags.
                @param flags - integer bit mask. See OpenSSL X509_V_FLAG_* constants
                """
                libcrypto.X509_STORE_set_flags(self.store,flags)
        def setpurpose(self,purpose):
                """
                Sets certificate purpose which verified certificate should match
                @param purpose - number from 1 to 9 or standard string defined in Openssl
                possible strings - sslclient,sslserver, nssslserver, smimesign,smimeencrypt, crlsign, any,ocsphelper

                Raises TypeError if purpose is neither string nor integer,
                X509Error if the purpose is unknown or cannot be set
                """
                if isinstance(purpose,str):
                        # X509_PURPOSE_get_by_sname returns an index into the
                        # purpose table (-1 if not found), not the purpose id
                        # which X509_STORE_set_purpose expects
                        idx=libcrypto.X509_PURPOSE_get_by_sname(purpose)
                        if idx<0:
                                raise X509Error("Invalid certificate purpose '"+purpose+"'")
                        # Declare prototypes so that the pointer is not truncated
                        libcrypto.X509_PURPOSE_get0.restype=c_void_p
                        libcrypto.X509_PURPOSE_get0.argtypes=(c_int,)
                        libcrypto.X509_PURPOSE_get_id.restype=c_int
                        libcrypto.X509_PURPOSE_get_id.argtypes=(c_void_p,)
                        purp_no=libcrypto.X509_PURPOSE_get_id(libcrypto.X509_PURPOSE_get0(idx))
                elif isinstance(purpose,int):
                        purp_no = purpose
                else:
                        raise TypeError("purpose should be string or integer")
                if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0:
                        raise X509Error("cannot set purpose")
        def setdepth(self,depth):
                """
                Passes verification depth to X509_STORE_set_depth
                @param depth - integer
                """
                libcrypto.X509_STORE_set_depth(self.store,depth)
        def settime(self, time):
                """
                Set point in time used to check validity of certificates for.
                Not implemented yet, only the argument type is checked.
                @param time - datetime.date, datetime.datetime or integer
                Raises TypeError for any other type of time
                """
                # The module-level name datetime is the datetime class, so the
                # date class has to be imported here. It also covers datetime
                # objects, which are a subclass of date.
                from datetime import date
                if not isinstance(time,(date,int)):
                        raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
                raise NotImplementedError
class StackOfX509:
        """
        Implements OpenSSL STACK_OF(X509) object.
        It looks much like python container types
        """
        def __init__(self,certs=None,ptr=None,disposable=True):
                """
                Create stack
                @param certs - list of X509 objects. If specified, read-write
                        stack is created and populated by these certificates
                @param ptr - pointer to OpenSSL STACK_OF(X509) as returned by
                        some functions
                @param disposable - if True, stack created from object, returned
                                by function is copy, and can be modified and need to be
                                freed. If false, it is just pointer into another
                                structure i.e. CMS_ContentInfo

                Raises ValueError if both certs and ptr are specified
                """
                if ptr is None:
                        self.need_free = True
                        self.ptr=libcrypto.sk_new_null()
                        if certs is not None:
                                for crt in certs:
                                        self.append(crt)
                elif certs is not None:
                        raise ValueError("cannot handle certs an ptr simultaneously")
                else:
                        self.need_free = disposable
                        self.ptr=ptr
        def __len__(self):
                """ Number of certificates in the stack """
                return libcrypto.sk_num(self.ptr)
        def __getitem__(self,index):
                """
                Returns copy of the certificate at given position as X509 object
                Raises IndexError if index is out of range
                """
                if index <0 or index>=len(self):
                        raise IndexError
                p=libcrypto.sk_value(self.ptr,index)
                return X509(ptr=libcrypto.X509_dup(p))
        def __setitem__(self,index,value):
                """
                Replaces certificate at given position by copy of value
                Raises ValueError if stack is read-only, IndexError if
                index is out of range
                """
                if not self.need_free:
                        raise ValueError("Stack is read-only")
                if index <0 or index>=len(self):
                        raise IndexError
                # sk_set returns the element just stored, so the replaced
                # certificate has to be fetched before it is overwritten
                old=libcrypto.sk_value(self.ptr,index)
                libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert))
                libcrypto.X509_free(old)
        # Former name of the method, kept for code which calls it directly
        __putitem__=__setitem__
        def __delitem__(self,index):
                """
                Removes certificate at given position and frees it
                Raises ValueError if stack is read-only, IndexError if
                index is out of range
                """
                if not self.need_free:
                        raise ValueError("Stack is read-only")
                if index <0 or index>=len(self):
                        raise IndexError
                p=libcrypto.sk_delete(self.ptr,index)
                libcrypto.X509_free(p)
        def __del__(self):
                """ Frees the stack with all its certificates if we own it """
                # need_free is not set if __init__ raised an exception
                if getattr(self,"need_free",False):
                        libcrypto.sk_pop_free(self.ptr,libcrypto.X509_free)
        def append(self,value):
                """
                Adds copy of certificate to the end of the stack
                @param value - X509 object
                Raises ValueError if stack is read-only
                """
                if not self.need_free:
                        raise ValueError("Stack is read-only")
                libcrypto.sk_push(self.ptr,libcrypto.X509_dup(value.cert))
# ctypes prototypes of the libcrypto functions used above which take or
# return pointers.
# NOTE(review): several functions called in this module (for instance
# X509_get_pubkey, X509_dup and the sk_* family) get no prototype here -
# confirm that they are declared elsewhere or add them.

# ASN.1 helpers
libcrypto.ASN1_INTEGER_get.restype=c_long
libcrypto.ASN1_INTEGER_get.argtypes=(c_void_p,)
libcrypto.ASN1_STRING_print_ex.argtypes=(c_void_p,c_void_p,c_long)
libcrypto.ASN1_TIME_print.argtypes=(c_void_p,c_void_p)
libcrypto.i2a_ASN1_INTEGER.argtypes=(c_void_p,c_void_p)
libcrypto.OBJ_obj2nid.argtypes=(c_void_p,)

# Certificate and name access
libcrypto.X509_get_serialNumber.restype=c_void_p
libcrypto.X509_get_serialNumber.argtypes=(c_void_p,)
libcrypto.X509_NAME_get_entry.restype=c_void_p
libcrypto.X509_NAME_get_entry.argtypes=(c_void_p,c_int)
libcrypto.X509_NAME_ENTRY_get_object.restype=c_void_p
libcrypto.X509_NAME_ENTRY_get_object.argtypes=(c_void_p,)

# Certificate store and lookup methods
libcrypto.X509_STORE_new.restype=c_void_p
libcrypto.X509_STORE_add_lookup.restype=c_void_p
libcrypto.X509_STORE_add_lookup.argtypes=(c_void_p,c_void_p)
libcrypto.X509_LOOKUP_file.restype=c_void_p
libcrypto.X509_LOOKUP_hash_dir.restype=c_void_p
libcrypto.X509_LOOKUP_ctrl.restype=c_int
libcrypto.X509_LOOKUP_ctrl.argtypes=(c_void_p,c_int,c_char_p,c_long,POINTER(c_char_p))
482