]> www.wagner.pp.ru Git - oss/ctypescrypto.git/blob - ctypescrypto/x509.py
Merge branch 'master' of https://github.com/vbwagner/ctypescrypto
[oss/ctypescrypto.git] / ctypescrypto / x509.py
1 from ctypes import c_void_p,create_string_buffer,c_long,c_int,POINTER,c_char_p
2 from ctypescrypto.bio import Membio
3 from ctypescrypto.pkey import PKey
4 from ctypescrypto.oid import Oid
5 from ctypescrypto.exception import LibCryptoError
6 from ctypescrypto import libcrypto
7 class X509Error(LibCryptoError):
8         """
9         Exception, generated when some openssl function fail
10         during X509 operation
11         """
12         pass
13
14
15 class X509Name:
16         """
17         Class which represents X.509 distinguished name - typically 
18         a certificate subject name or an issuer name.
19         """
20         # XN_FLAG_SEP_COMMA_PLUS & ASN1_STRFLG_UTF8_CONVERT
21         PRINT_FLAG=0x10010
22         ESC_MSB=4
23         def __init__(self,ptr=None,copy=False):
24                 """
25                 Creates a X509Name object
26                 @param ptr - pointer to X509_NAME C structure (as returned by some  OpenSSL functions
27                 @param copy - indicates that this structure have to be freed upon object destruction
28                 """
29                 if ptr is not None:
30                         self.ptr=ptr
31                         self.need_free=copy
32                         self.writable=False
33                 else:
34                         self.ptr=libcrypto.X509_NAME_new()
35                         self.need_free=True
36                         self.writable=True
37         def __del__(self):
38                 """
39                 Frees if neccessary
40                 """
41                 if self.need_free:
42                         libcrypto.X509_NAME_free(self.ptr)
43         def __str__(self):
44                 """
45                 Produces an ascii representation of the name, escaping all symbols > 0x80
46                 Probably it is not what you want, unless your native language is English
47                 """
48                 b=Membio()
49                 libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG | self.ESC_MSB)
50                 return str(b)
51         def __unicode__(self):
52                 """
53                 Produces unicode representation of the name. 
54                 """
55                 b=Membio()
56                 libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG)
57                 return unicode(b)
58         def __len__(self):
59                 """
60                 return number of components in the name
61                 """
62                 return libcrypto.X509_NAME_entry_count(self.ptr)
63         def __cmp__(self,other):
64                 """
65                 Compares X509 names
66                 """
67                 return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)
68         def __eq__(self,other):
69                 return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)==0
70
71         def __getitem__(self,key):
72                 if isinstance(key,Oid):
73                         # Return first matching field
74                         idx=libcrypto.X509_NAME_get_index_by_NID(self.ptr,key.nid,-1)
75                         if idx<0:
76                                 raise KeyError("Key not found "+repr(Oid))
77                         entry=libcrypto.X509_NAME_get_entry(self.ptr,idx)
78                         s=libcrypto.X509_NAME_ENTRY_get_data(entry)
79                         b=Membio()
80                         libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
81                         return unicode(b)
82                 elif isinstance(key,int):
83                         # Return OID, string tuple
84                         entry=libcrypto.X509_NAME_get_entry(self.ptr,key)
85                         if entry is None:
86                                 raise IndexError("name entry index out of range")
87                         obj=libcrypto.X509_NAME_ENTRY_get_object(entry)
88                         nid=libcrypto.OBJ_obj2nid(obj)
89                         if nid==0:
90                                 buf=create_string_buffer(80)
91                                 len=libcrypto.OBJ_obj2txt(buf,80,obj,1)
92                                 oid=Oid(buf[0:len])
93                         else:
94                                 oid=Oid(nid)
95                         s=libcrypto.X509_NAME_ENTRY_get_data(entry)
96                         b=Membio()
97                         libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG)
98                         return (oid,unicode(b))
99
100         def __setitem__(self,key,val):
101                 if not self.writable:
102                         raise ValueError("Attempt to modify constant X509 object")
103 class X509_extlist:
104         def __init__(self,ptr):
105                 self.ptr=ptr
106         def __del__(self):
107                 libcrypto.X509_NAME_free(self.ptr)
108         def __str__(self):
109                 raise NotImplementedError
110         def __len__(self):
111                 return libcrypto.X509_NAME_entry_count(self.ptr)
112
113         def __getattr__(self,key):
114                 raise NotImplementedError
115         def __setattr__(self,key,val):
116                 raise NotImplementedError
117
118         
119
120
121 class X509:
122         """
123         Represents X.509 certificate. 
124         """
125         def __init__(self,data=None,ptr=None,format="PEM"):
126                 """
127                 Initializes certificate
128                 @param data - serialized certificate in PEM or DER format.
129                 @param ptr - pointer to X509, returned by some openssl function. 
130                         mutually exclusive with data
131                 @param format - specifies data format. "PEM" or "DER", default PEM
132                 """
133                 if ptr is not None:
134                         if data is not None: 
135                                 raise TypeError("Cannot use data and ptr simultaneously")
136                         self.cert = ptr
137                 elif data is None:
138                         raise TypeError("data argument is required")
139                 else:
140                         b=Membio(data)
141                         if format == "PEM":
142                                 self.cert=libcrypto.PEM_read_bio_X509(b.bio,None,None,None)
143                         else:
144                                 self.cert=libcrypto.d2i_X509_bio(b.bio,None)
145                         if self.cert is None:
146                                 raise X509Error("error reading certificate")
147         def __del__(self):
148                 """
149                 Frees certificate object
150                 """
151                 libcrypto.X509_free(self.cert)
152         def __str__(self):
153                 """ Returns der string of the certificate """
154                 b=Membio()
155                 if libcrypto.i2d_X509_bio(b.bio,self.cert)==0:
156                         raise X509Error("error serializing certificate")
157                 return str(b)
158         def __repr__(self):
159                 """ Returns valid call to the constructor """
160                 return "X509(data="+repr(str(self))+",format='DER')"
161         @property
162         def pubkey(self):
163                 """EVP PKEy object of certificate public key"""
164                 return PKey(ptr=libcrypto.X509_get_pubkey(self.cert,False))
165         def verify(self,store=None,chain=[],key=None):  
166                 """ 
167                 Verify self. Supports verification on both X509 store object 
168                 or just public issuer key
169                 @param store X509Store object.
170                 @param chain - list of X509 objects to add into verification
171                         context.These objects are untrusted, but can be used to
172                         build certificate chain up to trusted object in the store
173                 @param key - PKey object
174                 parameters stora and key are mutually exclusive. If neither is specified, attempts to verify
175                 
176                 itself as self-signed certificate
177                 """
178                 if store is not None and key is not None:
179                         raise X509Error("key and store cannot be specified simultaneously")
180                 if store is not None:
181                         ctx=libcrypto.X509_STORE_CTX_new()
182                         if ctx is None:
183                                 raise X509Error("Error allocating X509_STORE_CTX")
184                         if chain is not None and len(chain)>0:
185                                 ch=StackOfX509(chain)
186                         else:
187                                 ch=None
188                         if libcrypto.X509_STORE_CTX_init(ctx,store.store,self.cert,ch) < 0:
189                                 raise X509Error("Error allocating X509_STORE_CTX")
190                         res= libcrypto.X509_verify_cert(ctx)
191                         libcrypto.X509_STORE_CTX_free(ctx)
192                         return res>0
193                 else:
194                         if key is None:
195                                 if self.issuer != self.subject:
196                                         # Not a self-signed certificate
197                                         return False
198                                 key = self.pubkey
199                         res = libcrypto.X509_verify(self.cert,key.key)
200                         if res < 0:
201                                 raise X509Error("X509_verify failed")
202                         return res>0
203                         
204         @property
205         def subject(self):
206                 """ X509Name for certificate subject name """
207                 return X509Name(libcrypto.X509_get_subject_name(self.cert))
208         @property
209         def issuer(self):
210                 """ X509Name for certificate issuer name """
211                 return X509Name(libcrypto.X509_get_issuer_name(self.cert))
212         @property
213         def serial(self):
214                 """ Serial number of certificate as integer """
215                 asnint=libcrypto.X509_get_serialNumber(self.cert)
216                 b=Membio()
217                 libcrypto.i2a_ASN1_INTEGER(b.bio,asnint)
218                 return int(str(b),16)
219         @property
220         def startDate(self):
221                 """ Certificate validity period start date """
222                 # Need deep poke into certificate structure (x)->cert_info->validity->notBefore 
223                 raise NotImplementedError
224         @property
225         def endDate(self):
226                 """ Certificate validity period end date """
227                 # Need deep poke into certificate structure (x)->cert_info->validity->notAfter
228                 raise NotImplementedError
229         def extensions(self):
230                 """ Returns list of extensions """
231                 raise NotImplementedError
232         def check_ca(self):
233                 """ Returns True if certificate is CA certificate """
234                 return libcrypto.X509_check_ca(self.cert)>0
235 class X509Store:
236         """
237                 Represents trusted certificate store. Can be used to lookup CA certificates to verify
238
239                 @param file - file with several certificates and crls to load into store
240                 @param dir - hashed directory with certificates and crls
241                 @param default - if true, default verify location (directory) is installed
242
243         """
244         def __init__(self,file=None,dir=None,default=False):
245                 """
246                 Creates X509 store and installs lookup method. Optionally initializes 
247                 by certificates from given file or directory.
248                 """
249                 #
250                 # Todo - set verification flags
251                 # 
252                 self.store=libcrypto.X509_STORE_new()
253                 if self.store is None:
254                         raise X509Error("allocating store")
255                 lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_file())
256                 if lookup is None:
257                         raise X509Error("error installing file lookup method")
258                 if (file is not None):
259                         if not libcrypto.X509_LOOKUP_ctrl(lookup,1,file,1,None)>0:
260                                 raise X509Error("error loading trusted certs from file "+file)
261                 lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_hash_dir())
262                 if lookup is None:
263                         raise X509Error("error installing hashed lookup method")
264                 if dir is not None:
265                         if not libcrypto.X509_LOOKUP_ctrl(lookup,2,dir,1,None)>0:
266                                 raise X509Error("error adding hashed  trusted certs dir "+dir)
267                 if default:
268                         if not libcrypto.X509_LOOKUP_ctrl(lookup,2,None,3,None)>0:
269                                 raise X509Error("error adding default trusted certs dir ")
270         def add_cert(self,cert):
271                 """
272                 Explicitely adds certificate to set of trusted in the store
273                 @param cert - X509 object to add
274                 """
275                 if not isinstance(cert,X509):
276                         raise TypeError("cert should be X509")
277                 libcrypto.X509_STORE_add_cert(self.store,cert.cert)
278         def add_callback(self,callback):
279                 """
280                 Installs callbac function, which would receive detailed information
281                 about verified ceritificates
282                 """
283                 raise NotImplementedError
284         def setflags(self,flags):
285                 """
286                 Set certificate verification flags.
287                 @param flags - integer bit mask. See OpenSSL X509_V_FLAG_* constants
288                 """
289                 libcrypto.X509_STORE_set_flags(self.store,flags)        
290         def setpurpose(self,purpose):
291                 """
292                 Sets certificate purpose which verified certificate should match
293                 @param purpose - number from 1 to 9 or standard strind defined in Openssl
294                 possible strings - sslcient,sslserver, nssslserver, smimesign,smimeencrypt, crlsign, any,ocsphelper
295                 """
296                 if isinstance(purpose,str):
297                         purp_no=X509_PURPOSE_get_by_sname(purpose)
298                         if purp_no <=0:
299                                 raise X509Error("Invalid certificate purpose '"+purpose+"'")
300                 elif isinstance(purpose,int):
301                         purp_no = purpose
302                 if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0:
303                         raise X509Error("cannot set purpose")
304         def setdepth(self,depth):
305                 libcrypto.X509_STORE_set_depth(self.store,depth)
306         def settime(self, time):
307                 """
308                 Set point in time used to check validity of certificates for
309                 """
310                 if isinstance(time,datetime.datetime) or isinstance(time,datetime.date):
311                         d=int(time.strftime("%s"))
312                 elif isinstance(time,int):
313                         pass
314                 else:
315                         raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
316                 raise NotImplementedError
317 class StackOfX509:
318         """
319         Implements OpenSSL STACK_OF(X509) object.
320         It looks much like python container types
321         """
322         def __init__(self,certs=None,ptr=None,disposable=True):
323                 """
324                 Create stack
325                 @param certs - list of X509 objects. If specified, read-write
326                         stack is created and populated by these certificates
327                 @param ptr - pointer to OpenSSL STACK_OF(X509) as returned by
328                         some functions
329                 @param disposable - if True, stack created from object, returned
330                                 by function is copy, and can be modified and need to be
331                                 freeid. If false, it is just pointer into another
332                                 structure i.e. CMS_ContentInfo
333                 """
334                 if  ptr is None:
335                         self.need_free = True
336                         self.ptr=libcrypt.sk_new_null()
337                         if certs is not None:
338                                 for crt in certs:
339                                         self.append(crt)
340                 elif not certs is None:
341                                 raise ValueError("cannot handle certs an ptr simultaneously")
342                 else:
343                         self.need_free = disposable
344                         self.ptr=ptr
345         def __len__(self):
346                 return libcrypto.sk_num(self.ptr)
347         def __getitem__(self,index):
348                 if index <0 or index>=len(self):
349                         raise IndexError
350                 p=libcrypto.sk_value(self.ptr,index)
351                 return X509(ptr=libcrypto.X509_dup(p))
352         def __putitem__(self,index,value):
353                 if not self.need_free:
354                         raise ValueError("Stack is read-only")
355                 if index <0 or index>=len(self):
356                         raise IndexError
357                 p=libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert))
358                 libcrypto.X509_free(p)
359         def __delitem__(self,index):    
360                 if not self.need_free:
361                         raise ValueError("Stack is read-only")
362                 if index <0 or index>=len(self):
363                         raise IndexError
364                 p=libcrypto.sk_delete(self.ptr,index)
365                 libcrypto.X509_free(p)
366         def __del__(self):
367                 if self.need_free:
368                         libcrypto.sk_pop_free(self.ptr,libcrypto.X509_free)
369         def append(self,value):
370                 if not self.need_free:
371                         raise ValueError("Stack is read-only")
372                 libcrypto.sk_push(self.ptr,libcrypto.X509_dup(value.cert))
373 libcrypto.i2a_ASN1_INTEGER.argtypes=(c_void_p,c_void_p)
374 libcrypto.ASN1_STRING_print_ex.argtypes=(c_void_p,c_void_p,c_long)
375 libcrypto.X509_get_serialNumber.argtypes=(c_void_p,)
376 libcrypto.X509_get_serialNumber.restype=c_void_p
377 libcrypto.X509_NAME_ENTRY_get_object.restype=c_void_p
378 libcrypto.X509_NAME_ENTRY_get_object.argtypes=(c_void_p,)
379 libcrypto.OBJ_obj2nid.argtypes=(c_void_p,)
380 libcrypto.X509_NAME_get_entry.restype=c_void_p
381 libcrypto.X509_NAME_get_entry.argtypes=(c_void_p,c_int)
382 libcrypto.X509_STORE_new.restype=c_void_p
383 libcrypto.X509_STORE_add_lookup.restype=c_void_p
384 libcrypto.X509_STORE_add_lookup.argtypes=(c_void_p,c_void_p)
385 libcrypto.X509_LOOKUP_file.restype=c_void_p
386 libcrypto.X509_LOOKUP_hash_dir.restype=c_void_p
387 libcrypto.X509_LOOKUP_ctrl.restype=c_int
388 libcrypto.X509_LOOKUP_ctrl.argtypes=(c_void_p,c_int,c_char_p,c_long,POINTER(c_char_p))