]> www.wagner.pp.ru Git - oss/ctypescrypto.git/blob - ctypescrypto/x509.py
I've discovered Python's __all__ variable and now make use of it in all modules
[oss/ctypescrypto.git] / ctypescrypto / x509.py
1 """
2 Implements interface to openssl X509 and X509Store structures,
3 i.e. allows to load, analyze and verify certificates.
4
5 X509Store objects are also used to verify other signed documents,
6 such as CMS, OCSP and timestamps.
7 """
8
9
10
11 from ctypes import c_void_p,create_string_buffer,c_long,c_int,POINTER,c_char_p
12 from ctypescrypto.bio import Membio
13 from ctypescrypto.pkey import PKey
14 from ctypescrypto.oid import Oid
15 from ctypescrypto.exception import LibCryptoError
16 from ctypescrypto import libcrypto
17
# Public API of the module. X509 itself has to be listed, otherwise
# "from ctypescrypto.x509 import *" does not provide the certificate class.
__all__ = ['X509', 'X509Error', 'X509Name', 'X509Store', 'StackOfX509']
# X509_extlist is not exported yet, because it is not implemented
class X509Error(LibCryptoError):
    """
    Raised when an OpenSSL function fails in the course of
    an X509 operation.
    """
26
27
class X509Name:
    """
    Class which represents X.509 distinguished name - typically
    a certificate subject name or an issuer name.

    Now used only to represent information extracted from the
    certificate. Potentially can also be used to build a DN when creating
    a certificate signing request, but modification is not implemented yet.
    """
    # XN_FLAG_SEP_COMMA_PLUS | ASN1_STRFLG_UTF8_CONVERT
    PRINT_FLAG = 0x10010
    # ASN1_STRFLG_ESC_MSB - escape characters with the high bit set
    ESC_MSB = 4

    def __init__(self, ptr=None, copy=False):
        """
        Creates a X509Name object
        @param ptr - pointer to X509_NAME C structure (as returned by some
                OpenSSL functions). If omitted, new empty name is allocated
        @param copy - indicates that this structure has to be freed upon
                object destruction
        """
        if ptr is not None:
            self.ptr = ptr
            self.need_free = copy
            self.writable = False
        else:
            self.ptr = libcrypto.X509_NAME_new()
            self.need_free = True
            self.writable = True

    def __del__(self):
        """
        Frees the underlying structure if this object owns it
        """
        # getattr: __init__ may have failed before the attribute was set
        if getattr(self, "need_free", False):
            libcrypto.X509_NAME_free(self.ptr)

    def __str__(self):
        """
        Produces an ascii representation of the name, escaping all symbols > 0x80
        Probably it is not what you want, unless your native language is English
        """
        b = Membio()
        libcrypto.X509_NAME_print_ex(b.bio, self.ptr, 0,
                                     self.PRINT_FLAG | self.ESC_MSB)
        return str(b)

    def __unicode__(self):
        """
        Produces unicode representation of the name.
        """
        b = Membio()
        libcrypto.X509_NAME_print_ex(b.bio, self.ptr, 0, self.PRINT_FLAG)
        return unicode(b)

    def __len__(self):
        """
        return number of components in the name
        """
        return libcrypto.X509_NAME_entry_count(self.ptr)

    def __cmp__(self, other):
        """
        Compares X509 names
        """
        return libcrypto.X509_NAME_cmp(self.ptr, other.ptr)

    def __eq__(self, other):
        """
        True if both names compare equal according to X509_NAME_cmp
        """
        if not isinstance(other, X509Name):
            return NotImplemented
        return libcrypto.X509_NAME_cmp(self.ptr, other.ptr) == 0

    def __ne__(self, other):
        """
        Inverse of __eq__ (Python 2 does not derive it automatically)
        """
        if not isinstance(other, X509Name):
            return NotImplemented
        return libcrypto.X509_NAME_cmp(self.ptr, other.ptr) != 0

    def _entry_value(self, entry):
        """
        Returns value of the given X509_NAME_ENTRY as unicode string
        """
        s = libcrypto.X509_NAME_ENTRY_get_data(entry)
        b = Membio()
        libcrypto.ASN1_STRING_print_ex(b.bio, s, self.PRINT_FLAG)
        return unicode(b)

    def __getitem__(self, key):
        """
        Returns component of the name.
        @param key - either Oid (value of the first field with this
                OID is returned) or integer index (tuple (Oid, value)
                is returned)
        Raises KeyError if there is no field with given OID, IndexError
        if index is out of range and TypeError for any other key type.
        """
        if isinstance(key, Oid):
            # Return first matching field
            idx = libcrypto.X509_NAME_get_index_by_NID(self.ptr, key.nid, -1)
            if idx < 0:
                raise KeyError("Key not found " + repr(key))
            entry = libcrypto.X509_NAME_get_entry(self.ptr, idx)
            return self._entry_value(entry)
        if isinstance(key, int):
            # Return OID, string tuple
            entry = libcrypto.X509_NAME_get_entry(self.ptr, key)
            if not entry:
                raise IndexError("name entry index out of range")
            obj = libcrypto.X509_NAME_ENTRY_get_object(entry)
            nid = libcrypto.OBJ_obj2nid(obj)
            if nid == 0:
                # OID unknown to the library - fall back to dotted notation
                buf = create_string_buffer(80)
                length = libcrypto.OBJ_obj2txt(buf, 80, obj, 1)
                oid = Oid(buf[0:length])
            else:
                oid = Oid(nid)
            return (oid, self._entry_value(entry))
        raise TypeError("X509Name indices must be Oid or int")

    def __setitem__(self, key, val):
        """
        Modification of names is not implemented yet.
        Raises ValueError for read-only names (ones which wrap a
        pointer passed to the constructor), NotImplementedError otherwise.
        """
        if not self.writable:
            raise ValueError("Attempt to modify constant X509 object")
        # Do not pretend the assignment succeeded
        raise NotImplementedError
class X509_extlist:
    """
    Placeholder for a list of certificate extensions. Not implemented
    yet: only stores the pointer passed to the constructor.

    Access to elements is meant to go through the container protocol,
    so the stubs are __getitem__/__setitem__. Overriding
    __getattr__/__setattr__ instead made it impossible even to store
    self.ptr in the constructor.
    """
    def __init__(self, ptr):
        """
        @param ptr - pointer to the underlying C structure
        """
        self.ptr = ptr

    def __del__(self):
        # NOTE(review): X509_NAME_free looks copied from X509Name and is
        # probably not the right destructor for an extension list - confirm
        # before this class gets used
        libcrypto.X509_NAME_free(self.ptr)

    def __str__(self):
        raise NotImplementedError

    def __len__(self):
        # NOTE(review): X509_NAME_entry_count looks copied from X509Name - confirm
        return libcrypto.X509_NAME_entry_count(self.ptr)

    def __getitem__(self, key):
        raise NotImplementedError

    def __setitem__(self, key, val):
        raise NotImplementedError
134
135         
136
137
class X509:
    """
    Represents X.509 certificate.
    """
    def __init__(self, data=None, ptr=None, format="PEM"):
        """
        Initializes certificate
        @param data - serialized certificate in PEM or DER format.
        @param ptr - pointer to X509, returned by some openssl function.
                mutually exclusive with data
        @param format - specifies data format. "PEM" or "DER", default PEM

        Raises TypeError if neither or both of data and ptr are given,
        X509Error if the data cannot be parsed.
        """
        if ptr is not None:
            if data is not None:
                raise TypeError("Cannot use data and ptr simultaneously")
            self.cert = ptr
        elif data is None:
            raise TypeError("data argument is required")
        else:
            b = Membio(data)
            if format == "PEM":
                self.cert = libcrypto.PEM_read_bio_X509(b.bio, None, None, None)
            else:
                self.cert = libcrypto.d2i_X509_bio(b.bio, None)
            # NULL is returned as None when the function has pointer
            # restype and as 0 otherwise - treat both as failure
            if not self.cert:
                self.cert = None
                raise X509Error("error reading certificate")

    def __del__(self):
        """
        Frees certificate object
        """
        # __init__ may have raised before self.cert was assigned
        cert = getattr(self, "cert", None)
        if cert is not None:
            libcrypto.X509_free(cert)

    def __str__(self):
        """ Returns der string of the certificate """
        b = Membio()
        if libcrypto.i2d_X509_bio(b.bio, self.cert) == 0:
            raise X509Error("error serializing certificate")
        return str(b)

    def __repr__(self):
        """ Returns valid call to the constructor """
        return "X509(data=" + repr(str(self)) + ",format='DER')"

    @property
    def pubkey(self):
        """EVP PKEy object of certificate public key"""
        # X509_get_pubkey takes the certificate as its only argument
        return PKey(ptr=libcrypto.X509_get_pubkey(self.cert))

    def verify(self, store=None, chain=None, key=None):
        """
        Verify self. Supports verification on both X509 store object
        or just public issuer key
        @param store X509Store object.
        @param chain - list of X509 objects to add into verification
                context. These objects are untrusted, but can be used to
                build certificate chain up to trusted object in the store
        @param key - PKey object with open key to validate signature

        parameters store and key are mutually exclusive. If neither
        is specified, attempts to verify self as self-signed certificate

        Returns True if certificate is valid. Raises X509Error if
        verification cannot be performed.
        """
        if store is not None and key is not None:
            raise X509Error("key and store cannot be specified simultaneously")
        if store is not None:
            ctx = libcrypto.X509_STORE_CTX_new()
            if not ctx:
                raise X509Error("Error allocating X509_STORE_CTX")
            try:
                # The stack object must stay referenced until verification
                # is done, because the context only borrows its pointer
                if chain is not None and len(chain) > 0:
                    stack = StackOfX509(chain)
                    chain_ptr = stack.ptr
                else:
                    stack = None
                    chain_ptr = None
                # X509_STORE_CTX_init returns 1 on success and 0 on failure
                if libcrypto.X509_STORE_CTX_init(ctx, store.store, self.cert,
                                                 chain_ptr) <= 0:
                    raise X509Error("Error initializing X509_STORE_CTX")
                res = libcrypto.X509_verify_cert(ctx)
                return res > 0
            finally:
                # Free the context on error paths too
                libcrypto.X509_STORE_CTX_free(ctx)
        else:
            if key is None:
                if self.issuer != self.subject:
                    # Not a self-signed certificate
                    return False
                key = self.pubkey
            res = libcrypto.X509_verify(self.cert, key.key)
            if res < 0:
                raise X509Error("X509_verify failed")
            return res > 0

    @property
    def subject(self):
        """ X509Name for certificate subject name """
        return X509Name(libcrypto.X509_get_subject_name(self.cert))

    @property
    def issuer(self):
        """ X509Name for certificate issuer name """
        return X509Name(libcrypto.X509_get_issuer_name(self.cert))

    @property
    def serial(self):
        """ Serial number of certificate as integer """
        asnint = libcrypto.X509_get_serialNumber(self.cert)
        b = Membio()
        # i2a_ASN1_INTEGER writes the number in hexadecimal
        libcrypto.i2a_ASN1_INTEGER(b.bio, asnint)
        return int(str(b), 16)

    @property
    def startDate(self):
        """ Certificate validity period start date """
        # Need deep poke into certificate structure (x)->cert_info->validity->notBefore
        raise NotImplementedError

    @property
    def endDate(self):
        """ Certificate validity period end date """
        # Need deep poke into certificate structure (x)->cert_info->validity->notAfter
        raise NotImplementedError

    def extensions(self):
        """ Returns list of extensions """
        raise NotImplementedError

    def check_ca(self):
        """ Returns True if certificate is CA certificate """
        return libcrypto.X509_check_ca(self.cert) > 0
class X509Store:
    """
    Represents trusted certificate store. Can be used to lookup CA
    certificates to verify

    @param file - file with several certificates and crls
            to load into store
    @param dir - hashed directory with certificates and crls
    @param default - if true, default verify location (directory)
            is installed
    """
    # NOTE(review): the store allocated in the constructor is never released
    # with X509_STORE_free - confirm that no other module keeps the raw
    # pointer before adding a destructor

    # X509_LOOKUP_ctrl commands (X509_L_FILE_LOAD, X509_L_ADD_DIR)
    _L_FILE_LOAD = 1
    _L_ADD_DIR = 2
    # File type arguments (X509_FILETYPE_PEM, X509_FILETYPE_DEFAULT)
    _FILETYPE_PEM = 1
    _FILETYPE_DEFAULT = 3

    def __init__(self, file=None, dir=None, default=False):
        """
        Creates X509 store and installs lookup method. Optionally initializes
        by certificates from given file or directory.
        Raises X509Error if any of the OpenSSL calls fails.
        """
        #
        # Todo - set verification flags
        #
        self.store = libcrypto.X509_STORE_new()
        if not self.store:
            raise X509Error("allocating store")
        lookup = libcrypto.X509_STORE_add_lookup(self.store,
                                                 libcrypto.X509_LOOKUP_file())
        if not lookup:
            raise X509Error("error installing file lookup method")
        if file is not None:
            if not libcrypto.X509_LOOKUP_ctrl(lookup, self._L_FILE_LOAD, file,
                                              self._FILETYPE_PEM, None) > 0:
                raise X509Error("error loading trusted certs from file " + file)
        lookup = libcrypto.X509_STORE_add_lookup(self.store,
                                                 libcrypto.X509_LOOKUP_hash_dir())
        if not lookup:
            raise X509Error("error installing hashed lookup method")
        if dir is not None:
            if not libcrypto.X509_LOOKUP_ctrl(lookup, self._L_ADD_DIR, dir,
                                              self._FILETYPE_PEM, None) > 0:
                raise X509Error("error adding hashed  trusted certs dir " + dir)
        if default:
            if not libcrypto.X509_LOOKUP_ctrl(lookup, self._L_ADD_DIR, None,
                                              self._FILETYPE_DEFAULT, None) > 0:
                raise X509Error("error adding default trusted certs dir ")

    def add_cert(self, cert):
        """
        Explicitly adds certificate to set of trusted in the store
        @param cert - X509 object to add
        """
        if not isinstance(cert, X509):
            raise TypeError("cert should be X509")
        libcrypto.X509_STORE_add_cert(self.store, cert.cert)

    def add_callback(self, callback):
        """
        Installs callback function, which would receive detailed information
        about verified certificates
        """
        raise NotImplementedError

    def setflags(self, flags):
        """
        Set certificate verification flags.
        @param flags - integer bit mask. See OpenSSL X509_V_FLAG_* constants
        """
        libcrypto.X509_STORE_set_flags(self.store, flags)

    def setpurpose(self, purpose):
        """
        Sets certificate purpose which verified certificate should match
        @param purpose - number from 1 to 9 or standard string defined in OpenSSL
        possible strings - sslclient, sslserver, nssslserver, smimesign,
        smimeencrypt, crlsign, any, ocsphelper

        Raises X509Error if purpose is unknown or cannot be set,
        TypeError if purpose is neither string nor integer.
        """
        if isinstance(purpose, str):
            libcrypto.X509_PURPOSE_get_by_sname.argtypes = (c_char_p,)
            # Returns index in the purpose table (0 is valid), -1 if not found
            idx = libcrypto.X509_PURPOSE_get_by_sname(purpose)
            if idx < 0:
                raise X509Error("Invalid certificate purpose '" + purpose + "'")
            # X509_STORE_set_purpose expects purpose id, not the table index
            libcrypto.X509_PURPOSE_get0.restype = c_void_p
            libcrypto.X509_PURPOSE_get0.argtypes = (c_int,)
            libcrypto.X509_PURPOSE_get_id.argtypes = (c_void_p,)
            purp_no = libcrypto.X509_PURPOSE_get_id(
                libcrypto.X509_PURPOSE_get0(idx))
        elif isinstance(purpose, int):
            purp_no = purpose
        else:
            raise TypeError("purpose should be a string or an integer")
        if libcrypto.X509_STORE_set_purpose(self.store, purp_no) <= 0:
            raise X509Error("cannot set purpose")

    def setdepth(self, depth):
        """
        Sets maximum depth of the certificate chain
        @param depth - integer passed to X509_STORE_set_depth
        """
        libcrypto.X509_STORE_set_depth(self.store, depth)

    def settime(self, time):
        """
        Set point in time used to check validity of certificates for.
        Not implemented yet: the argument is only validated.
        @param time - datetime.date, datetime.datetime or integer

        Raises TypeError for unsupported argument types and
        NotImplementedError otherwise.
        """
        # Imported here, because the module itself does not import datetime
        import datetime
        # datetime.datetime is a subclass of datetime.date
        if not isinstance(time, (datetime.date, int)):
            raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
        raise NotImplementedError
class StackOfX509:
    """
    Implements OpenSSL STACK_OF(X509) object.
    It looks much like python container types
    """
    def __init__(self, certs=None, ptr=None, disposable=True):
        """
        Create stack
        @param certs - list of X509 objects. If specified, read-write
                stack is created and populated by these certificates
        @param ptr - pointer to OpenSSL STACK_OF(X509) as returned by
                some functions
        @param disposable - if True, stack created from object, returned
                by function is copy, and can be modified and need to be
                freed. If false, it is just pointer into another
                structure i.e. CMS_ContentInfo

        Raises ValueError if both certs and ptr are specified.
        """
        if ptr is not None and certs is not None:
            raise ValueError("cannot handle certs an ptr simultaneously")
        self._declare_prototypes()
        if ptr is None:
            stack = libcrypto.sk_new_null()
            if not stack:
                raise X509Error("Error allocating STACK_OF(X509)")
            self.ptr = stack
            self.need_free = True
            if certs is not None:
                for crt in certs:
                    self.append(crt)
        else:
            self.need_free = disposable
            self.ptr = ptr

    def _declare_prototypes(self):
        """
        Declares pointer types of the stack functions, so pointers are not
        truncated to C int on 64-bit platforms
        """
        libcrypto.sk_new_null.restype = c_void_p
        libcrypto.sk_num.argtypes = (c_void_p,)
        libcrypto.sk_value.restype = c_void_p
        libcrypto.sk_value.argtypes = (c_void_p, c_int)
        libcrypto.sk_set.restype = c_void_p
        libcrypto.sk_set.argtypes = (c_void_p, c_int, c_void_p)
        libcrypto.sk_delete.restype = c_void_p
        libcrypto.sk_delete.argtypes = (c_void_p, c_int)
        libcrypto.sk_push.argtypes = (c_void_p, c_void_p)
        libcrypto.sk_pop_free.restype = None
        libcrypto.sk_pop_free.argtypes = (c_void_p, c_void_p)
        libcrypto.X509_dup.restype = c_void_p
        libcrypto.X509_dup.argtypes = (c_void_p,)
        libcrypto.X509_free.argtypes = (c_void_p,)

    def _dup(self, value):
        """
        Returns pointer to a new copy of the certificate held by X509 object
        """
        if not isinstance(value, X509):
            raise TypeError("StackOfX509 can contain only X509 objects")
        copy = libcrypto.X509_dup(value.cert)
        if not copy:
            raise X509Error("Error duplicating certificate")
        return copy

    def __len__(self):
        """ Returns number of certificates in the stack """
        return libcrypto.sk_num(self.ptr)

    def __getitem__(self, index):
        """
        Returns copy of the certificate at given position as X509 object.
        Raises IndexError if index is out of range.
        """
        if index < 0 or index >= len(self):
            raise IndexError
        p = libcrypto.sk_value(self.ptr, index)
        return X509(ptr=libcrypto.X509_dup(p))

    def __setitem__(self, index, value):
        """
        Replaces certificate at given position by copy of the value.
        Raises ValueError if stack is read-only, IndexError if index
        is out of range, TypeError if value is not X509 object.
        """
        if not self.need_free:
            raise ValueError("Stack is read-only")
        if index < 0 or index >= len(self):
            raise IndexError
        # sk_set returns the element just stored, not the replaced one,
        # so the old element has to be fetched before it is overwritten
        old = libcrypto.sk_value(self.ptr, index)
        libcrypto.sk_set(self.ptr, index, self._dup(value))
        libcrypto.X509_free(old)

    # Former (non-protocol) name of __setitem__, kept for compatibility
    __putitem__ = __setitem__

    def __delitem__(self, index):
        """
        Removes certificate at given position and frees it.
        Raises ValueError if stack is read-only, IndexError if index
        is out of range.
        """
        if not self.need_free:
            raise ValueError("Stack is read-only")
        if index < 0 or index >= len(self):
            raise IndexError
        p = libcrypto.sk_delete(self.ptr, index)
        libcrypto.X509_free(p)

    def __del__(self):
        """ Frees stack with all certificates in it, if stack is owned """
        # getattr: __init__ may have raised before the attribute was set
        if getattr(self, "need_free", False):
            libcrypto.sk_pop_free(self.ptr, libcrypto.X509_free)

    def append(self, value):
        """
        Adds copy of the certificate to the end of stack
        @param value - X509 object
        Raises ValueError if stack is read-only, TypeError if value
        is not X509 object.
        """
        if not self.need_free:
            raise ValueError("Stack is read-only")
        libcrypto.sk_push(self.ptr, self._dup(value))
# Prototypes of the libcrypto functions used in this module.
# Without restype ctypes assumes that a function returns C int, and without
# argtypes python integers are passed as C int. Both truncate pointers on
# 64-bit platforms, and a NULL result is returned as None (which the
# "is None" checks in this module rely on) only when restype is c_void_p.
# Flags declared as unsigned long in C are described as c_long here,
# as it is already done for ASN1_STRING_print_ex.

# ASN.1 helpers
libcrypto.i2a_ASN1_INTEGER.argtypes = (c_void_p, c_void_p)
libcrypto.ASN1_STRING_print_ex.argtypes = (c_void_p, c_void_p, c_long)
libcrypto.OBJ_obj2nid.argtypes = (c_void_p,)
libcrypto.OBJ_obj2txt.argtypes = (c_char_p, c_int, c_void_p, c_int)

# X509_NAME
libcrypto.X509_NAME_new.restype = c_void_p
libcrypto.X509_NAME_free.argtypes = (c_void_p,)
libcrypto.X509_NAME_print_ex.argtypes = (c_void_p, c_void_p, c_int, c_long)
libcrypto.X509_NAME_entry_count.argtypes = (c_void_p,)
libcrypto.X509_NAME_cmp.argtypes = (c_void_p, c_void_p)
libcrypto.X509_NAME_get_index_by_NID.argtypes = (c_void_p, c_int, c_int)
libcrypto.X509_NAME_get_entry.restype = c_void_p
libcrypto.X509_NAME_get_entry.argtypes = (c_void_p, c_int)
libcrypto.X509_NAME_ENTRY_get_object.restype = c_void_p
libcrypto.X509_NAME_ENTRY_get_object.argtypes = (c_void_p,)
libcrypto.X509_NAME_ENTRY_get_data.restype = c_void_p
libcrypto.X509_NAME_ENTRY_get_data.argtypes = (c_void_p,)

# X509 certificate
libcrypto.PEM_read_bio_X509.restype = c_void_p
libcrypto.PEM_read_bio_X509.argtypes = (c_void_p, POINTER(c_void_p),
                                        c_void_p, c_void_p)
libcrypto.d2i_X509_bio.restype = c_void_p
libcrypto.d2i_X509_bio.argtypes = (c_void_p, POINTER(c_void_p))
libcrypto.i2d_X509_bio.argtypes = (c_void_p, c_void_p)
libcrypto.X509_free.argtypes = (c_void_p,)
libcrypto.X509_dup.restype = c_void_p
libcrypto.X509_dup.argtypes = (c_void_p,)
libcrypto.X509_get_pubkey.restype = c_void_p
libcrypto.X509_get_pubkey.argtypes = (c_void_p,)
libcrypto.X509_get_subject_name.restype = c_void_p
libcrypto.X509_get_subject_name.argtypes = (c_void_p,)
libcrypto.X509_get_issuer_name.restype = c_void_p
libcrypto.X509_get_issuer_name.argtypes = (c_void_p,)
libcrypto.X509_get_serialNumber.argtypes = (c_void_p,)
libcrypto.X509_get_serialNumber.restype = c_void_p
libcrypto.X509_check_ca.argtypes = (c_void_p,)
libcrypto.X509_verify.argtypes = (c_void_p, c_void_p)

# Verification context
libcrypto.X509_STORE_CTX_new.restype = c_void_p
libcrypto.X509_STORE_CTX_init.argtypes = (c_void_p, c_void_p,
                                          c_void_p, c_void_p)
libcrypto.X509_verify_cert.argtypes = (c_void_p,)
libcrypto.X509_STORE_CTX_free.argtypes = (c_void_p,)

# X509_STORE and lookup methods
libcrypto.X509_STORE_new.restype = c_void_p
libcrypto.X509_STORE_add_lookup.restype = c_void_p
libcrypto.X509_STORE_add_lookup.argtypes = (c_void_p, c_void_p)
libcrypto.X509_STORE_add_cert.argtypes = (c_void_p, c_void_p)
libcrypto.X509_STORE_set_flags.argtypes = (c_void_p, c_long)
libcrypto.X509_STORE_set_purpose.argtypes = (c_void_p, c_int)
libcrypto.X509_STORE_set_depth.argtypes = (c_void_p, c_int)
libcrypto.X509_LOOKUP_file.restype = c_void_p
libcrypto.X509_LOOKUP_hash_dir.restype = c_void_p
libcrypto.X509_LOOKUP_ctrl.restype = c_int
libcrypto.X509_LOOKUP_ctrl.argtypes = (c_void_p, c_int, c_char_p, c_long,
                                       POINTER(c_char_p))