1 from ctypes import c_void_p,create_string_buffer,c_long,c_int,POINTER,c_char_p
2 from ctypescrypto.bio import Membio
3 from ctypescrypto.pkey import PKey
4 from ctypescrypto.oid import Oid
5 from ctypescrypto.exception import LibCryptoError
6 from ctypescrypto import libcrypto
class X509Error(LibCryptoError):
    """
    Exception raised when an OpenSSL function fails
    """
17 Class which represents X.509 distinguished name - typically
18 a certificate subject name or an issuer name.
20 # XN_FLAG_SEP_COMMA_PLUS & ASN1_STRFLG_UTF8_CONVERT
23 def __init__(self,ptr=None,copy=False):
25 Creates a X509Name object
26 @param ptr - pointer to X509_NAME C structure (as returned by some OpenSSL functions)
27 @param copy - indicates that this structure has to be freed upon object destruction
34 self.ptr=libcrypto.X509_NAME_new()
42 libcrypto.X509_NAME_free(self.ptr)
45 Produces an ascii representation of the name, escaping all symbols > 0x80
46 Probably it is not what you want, unless your native language is English
49 libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG | self.ESC_MSB)
51 def __unicode__(self):
53 Produces unicode representation of the name.
56 libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG)
60 return number of components in the name
62 return libcrypto.X509_NAME_entry_count(self.ptr)
def __cmp__(self, other):
    """
    Three-way comparison of two names.

    Hands both underlying X509_NAME pointers to X509_NAME_cmp and
    returns its integer result unchanged.
    """
    mine, theirs = self.ptr, other.ptr
    return libcrypto.X509_NAME_cmp(mine, theirs)
def __eq__(self, other):
    """
    Equality test for two names.

    @param other - object to compare with
    @return True when X509_NAME_cmp reports the names as equal, False
        when it does not, NotImplemented when other is not an X509Name
    """
    # Anything that is not a name has no X509_NAME pointer to hand to
    # OpenSSL; let Python fall back to its default comparison instead
    # of failing with AttributeError (or passing a foreign pointer).
    if not isinstance(other, X509Name):
        return NotImplemented
    return libcrypto.X509_NAME_cmp(self.ptr, other.ptr) == 0
def __getitem__(self, key):
    """
    Look up a component of the name.

    @param key - either an Oid, in which case the text of the first
        entry carrying that object identifier is returned, or an
        integer position, in which case an (Oid, text) tuple for that
        entry is returned
    @raises KeyError - no entry in the name carries the requested Oid
    @raises IndexError - integer position is out of range
    @raises TypeError - key is neither an Oid nor an integer
    """
    if isinstance(key, Oid):
        # Return first matching field
        idx = libcrypto.X509_NAME_get_index_by_NID(self.ptr, key.nid, -1)
        if idx < 0:
            # Report the key that was asked for, not the Oid class
            raise KeyError("Key not found " + repr(key))
        entry = libcrypto.X509_NAME_get_entry(self.ptr, idx)
        s = libcrypto.X509_NAME_ENTRY_get_data(entry)
        b = Membio()
        libcrypto.ASN1_STRING_print_ex(b.bio, s, self.PRINT_FLAG)
        return unicode(b)
    elif isinstance(key, int):
        # Return OID, string tuple
        entry = libcrypto.X509_NAME_get_entry(self.ptr, key)
        if entry is None:
            raise IndexError("name entry index out of range")
        obj = libcrypto.X509_NAME_ENTRY_get_object(entry)
        nid = libcrypto.OBJ_obj2nid(obj)
        # NOTE(review): assumes Oid() accepts either a numeric NID or
        # the textual form of an object identifier - confirm against
        # ctypescrypto.oid
        if nid == 0:
            # Object is not known by NID: use its numeric text form
            buf = create_string_buffer(80)
            txt_len = libcrypto.OBJ_obj2txt(buf, 80, obj, 1)
            oid = Oid(buf[0:txt_len])
        else:
            oid = Oid(nid)
        s = libcrypto.X509_NAME_ENTRY_get_data(entry)
        b = Membio()
        libcrypto.ASN1_STRING_print_ex(b.bio, s, self.PRINT_FLAG)
        return (oid, unicode(b))
    # Previously an unsupported key type fell through and returned None
    raise TypeError("X509 name can be indexed by Oid or integer only")
def __setitem__(self, key, val):
    """
    Item assignment on a name.

    @param key - component to modify
    @param val - new value
    @raises ValueError - the name is not writable
    @raises NotImplementedError - the name is writable; modification
        is not implemented
    """
    if not self.writable:
        raise ValueError("Attempt to modify constant X509 object")
    # Nothing performs the modification yet: fail loudly instead of
    # silently discarding the assignment.
    raise NotImplementedError
104 def __init__(self,ptr):
107 libcrypto.X509_NAME_free(self.ptr)
109 raise NotImplementedError
111 return libcrypto.X509_NAME_entry_count(self.ptr)
def __getattr__(self, key):
    """
    Attribute lookup hook - not implemented.

    Python calls this only for attributes that normal lookup did not
    find; every such lookup raises NotImplementedError.
    """
    raise NotImplementedError()
def __setattr__(self, key, val):
    """
    Attribute assignment hook - not implemented.

    NOTE(review): this intercepts every attribute assignment on the
    instance, including any made by the class itself - confirm that
    is intended.
    """
    raise NotImplementedError()
123 Represents X.509 certificate.
125 def __init__(self,data=None,ptr=None,format="PEM"):
127 Initializes certificate
128 @param data - serialized certificate in PEM or DER format.
129 @param ptr - pointer to X509, returned by some openssl function.
130 mutually exclusive with data
131 @param format - specifies data format. "PEM" or "DER", default PEM
135 raise TypeError("Cannot use data and ptr simultaneously")
138 raise TypeError("data argument is required")
142 self.cert=libcrypto.PEM_read_bio_X509(b.bio,None,None,None)
144 self.cert=libcrypto.d2i_X509_bio(b.bio,None)
145 if self.cert is None:
146 raise X509Error("error reading certificate")
149 Frees certificate object
151 libcrypto.X509_free(self.cert)
153 """ Returns der string of the certificate """
155 if libcrypto.i2d_X509_bio(b.bio,self.cert)==0:
156 raise X509Error("error serializing certificate")
159 """ Returns valid call to the constructor """
160 return "X509(data="+repr(str(self))+",format='DER')"
163 """EVP PKEy object of certificate public key"""
164 return PKey(ptr=libcrypto.X509_get_pubkey(self.cert,False))
def verify(self, store=None, chain=None, key=None):
    """
    Verify self. Supports verification on both X509 store object
    or just public issuer key

    @param store - X509Store object.
    @param chain - list of X509 objects to add into verification
        context. These objects are untrusted, but can be used to
        build certificate chain up to trusted object in the store
    @param key - PKey object

    Parameters store and key are mutually exclusive. If neither is
    specified, attempts to verify itself as self-signed certificate.

    @return True if verification succeeded, False otherwise
    @raises X509Error - both store and key were given, the store
        context could not be allocated or initialized, or X509_verify
        reported an error
    """
    if store is not None and key is not None:
        raise X509Error("key and store cannot be specified simultaneously")
    if store is not None:
        ctx = libcrypto.X509_STORE_CTX_new()
        # Covers both a NULL mapped to None and a plain integer 0
        if not ctx:
            raise X509Error("Error allocating X509_STORE_CTX")
        try:
            # ch stays referenced until the context is released, so the
            # stack it owns outlives the verification call
            ch = StackOfX509(chain) if chain else None
            ch_ptr = ch.ptr if ch is not None else None
            # X509_STORE_CTX_init reports failure with 0, not a negative
            if libcrypto.X509_STORE_CTX_init(ctx, store.store, self.cert, ch_ptr) <= 0:
                raise X509Error("Error initializing X509_STORE_CTX")
            res = libcrypto.X509_verify_cert(ctx)
        finally:
            # Release the context on every path, including errors
            libcrypto.X509_STORE_CTX_free(ctx)
        return res > 0
    if key is None:
        if self.issuer != self.subject:
            # Not a self-signed certificate
            return False
        # Self-signed: check the signature with the certificate's own key
        key = PKey(ptr=libcrypto.X509_get_pubkey(self.cert, False))
    res = libcrypto.X509_verify(self.cert, key.key)
    if res < 0:
        raise X509Error("X509_verify failed")
    return res > 0
206 """ X509Name for certificate subject name """
207 return X509Name(libcrypto.X509_get_subject_name(self.cert))
210 """ X509Name for certificate issuer name """
211 return X509Name(libcrypto.X509_get_issuer_name(self.cert))
214 """ Serial number of certificate as integer """
215 asnint=libcrypto.X509_get_serialNumber(self.cert)
217 libcrypto.i2a_ASN1_INTEGER(b.bio,asnint)
218 return int(str(b),16)
221 """ Certificate validity period start date """
222 # Need deep poke into certificate structure (x)->cert_info->validity->notBefore
223 raise NotImplementedError
226 """ Certificate validity period end date """
227 # Need deep poke into certificate structure (x)->cert_info->validity->notAfter
228 raise NotImplementedError
def extensions(self):
    """
    Returns list of extensions

    Not implemented: always raises NotImplementedError.
    """
    raise NotImplementedError()
233 """ Returns True if certificate is CA certificate """
234 return libcrypto.X509_check_ca(self.cert)>0
237 Represents trusted certificate store. Can be used to lookup CA certificates to verify
239 @param file - file with several certificates and crls to load into store
240 @param dir - hashed directory with certificates and crls
241 @param default - if true, default verify location (directory) is installed
244 def __init__(self,file=None,dir=None,default=False):
246 Creates X509 store and installs lookup method. Optionally initializes
247 by certificates from given file or directory.
250 # Todo - set verification flags
252 self.store=libcrypto.X509_STORE_new()
253 if self.store is None:
254 raise X509Error("allocating store")
255 lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_file())
257 raise X509Error("error installing file lookup method")
258 if (file is not None):
259 if not libcrypto.X509_LOOKUP_ctrl(lookup,1,file,1,None)>0:
260 raise X509Error("error loading trusted certs from file "+file)
261 lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_hash_dir())
263 raise X509Error("error installing hashed lookup method")
265 if not libcrypto.X509_LOOKUP_ctrl(lookup,2,dir,1,None)>0:
266 raise X509Error("error adding hashed trusted certs dir "+dir)
268 if not libcrypto.X509_LOOKUP_ctrl(lookup,2,None,3,None)>0:
269 raise X509Error("error adding default trusted certs dir ")
def add_cert(self, cert):
    """
    Explicitly adds a certificate to the set of trusted ones in the store

    @param cert - X509 object to add
    @raises TypeError - cert is not an X509 instance
    """
    if isinstance(cert, X509):
        # NOTE(review): the result of X509_STORE_add_cert is not inspected
        libcrypto.X509_STORE_add_cert(self.store, cert.cert)
        return
    raise TypeError("cert should be X509")
def add_callback(self, callback):
    """
    Installs callback function, which would receive detailed information
    about verified certificates

    Not implemented: always raises NotImplementedError.
    """
    raise NotImplementedError()
def setflags(self, flags):
    """
    Set certificate verification flags.

    @param flags - integer bit mask. See OpenSSL X509_V_FLAG_* constants
    """
    store_ptr = self.store
    libcrypto.X509_STORE_set_flags(store_ptr, flags)
def setpurpose(self, purpose):
    """
    Sets certificate purpose which verified certificate should match

    @param purpose - number from 1 to 9 or standard string defined in
        OpenSSL. Possible strings - sslclient, sslserver, nssslserver,
        smimesign, smimeencrypt, crlsign, any, ocsphelper
    @raises X509Error - the purpose name is unknown or OpenSSL refused
        to set the purpose
    @raises TypeError - purpose is neither a string nor an integer
    """
    if isinstance(purpose, str):
        idx = libcrypto.X509_PURPOSE_get_by_sname(purpose)
        if idx < 0:
            raise X509Error("Invalid certificate purpose '" + purpose + "'")
        # The lookup yields an index into OpenSSL's purpose table, while
        # X509_STORE_set_purpose expects the purpose identifier, so
        # translate one into the other.
        libcrypto.X509_PURPOSE_get0.restype = c_void_p
        libcrypto.X509_PURPOSE_get_id.argtypes = (c_void_p,)
        purp_no = libcrypto.X509_PURPOSE_get_id(libcrypto.X509_PURPOSE_get0(idx))
    elif isinstance(purpose, int):
        purp_no = purpose
    else:
        raise TypeError("purpose should be string or integer")
    if libcrypto.X509_STORE_set_purpose(self.store, purp_no) <= 0:
        raise X509Error("cannot set purpose")
def setdepth(self, depth):
    """
    Passes depth to X509_STORE_set_depth for this store.

    @param depth - value handed to OpenSSL unchanged
    """
    store_ptr = self.store
    libcrypto.X509_STORE_set_depth(store_ptr, depth)
def settime(self, time):
    """
    Set point in time used to check validity of certificates for

    @param time - datetime.date, datetime.datetime or integer
    @raises TypeError - time is of any other type
    @raises NotImplementedError - always, once the argument has been
        validated: applying the time to the store is not implemented
    """
    # Imported here because the module never imports datetime itself
    import datetime
    # datetime.datetime is a subclass of datetime.date
    if isinstance(time, datetime.date):
        # "%s" is a platform-specific strftime extension
        d = int(time.strftime("%s"))
    elif isinstance(time, int):
        d = time
    else:
        raise TypeError("datetime.date, datetime.datetime or integer is required as time argument")
    raise NotImplementedError
319 Implements OpenSSL STACK_OF(X509) object.
320 It looks much like python container types
def __init__(self, certs=None, ptr=None, disposable=True):
    """
    Create a stack of certificates.

    @param certs - list of X509 objects. If specified, read-write
        stack is created and populated by these certificates
    @param ptr - pointer to OpenSSL STACK_OF(X509) as returned by
        some OpenSSL function. Mutually exclusive with certs
    @param disposable - if True, stack created from ptr is a copy
        which can be modified and needs to be freed. If False, it is
        just a pointer into another structure, i.e. CMS_ContentInfo
    @raises ValueError - both certs and ptr were given
    """
    if ptr is None:
        # Fresh stack owned by this object
        self.need_free = True
        self.ptr = libcrypto.sk_new_null()
        if certs is not None:
            for crt in certs:
                self.append(crt)
    elif not certs is None:
        raise ValueError("cannot handle certs an ptr simultaneously")
    else:
        # Wrap an existing stack; ownership follows disposable
        self.need_free = disposable
        self.ptr = ptr
346 return libcrypto.sk_num(self.ptr)
def __getitem__(self, index):
    """
    Return a copy of the certificate stored at position index.

    @param index - zero-based position in the stack
    @return new X509 object wrapping a duplicate of the stored
        certificate
    @raises IndexError - index is negative or beyond the last element
    """
    if index < 0 or index >= len(self):
        # Never hand an out-of-range position to sk_value
        raise IndexError
    p = libcrypto.sk_value(self.ptr, index)
    return X509(ptr=libcrypto.X509_dup(p))
def __putitem__(self, index, value):
    """
    Replace the certificate at position index with a copy of value.

    @param index - zero-based position in the stack
    @param value - X509 object whose certificate is duplicated into
        the stack
    @raises ValueError - the stack is read-only
    @raises IndexError - index is negative or beyond the last element
    """
    if not self.need_free:
        raise ValueError("Stack is read-only")
    if index < 0 or index >= len(self):
        raise IndexError
    # sk_set returns the pointer it has just stored, so the element
    # being replaced has to be fetched before it is overwritten;
    # freeing sk_set's result would destroy the new entry.
    old = libcrypto.sk_value(self.ptr, index)
    libcrypto.sk_set(self.ptr, index, libcrypto.X509_dup(value.cert))
    libcrypto.X509_free(old)
# Python's item-assignment protocol looks for __setitem__
__setitem__ = __putitem__
def __delitem__(self, index):
    """
    Remove the certificate at position index and free it.

    @param index - zero-based position in the stack
    @raises ValueError - the stack is read-only
    @raises IndexError - index is negative or beyond the last element
    """
    if not self.need_free:
        raise ValueError("Stack is read-only")
    if index < 0 or index >= len(self):
        raise IndexError
    # The value returned by sk_delete is handed to X509_free
    removed = libcrypto.sk_delete(self.ptr, index)
    libcrypto.X509_free(removed)
368 libcrypto.sk_pop_free(self.ptr,libcrypto.X509_free)
def append(self, value):
    """
    Push a duplicate of value's certificate onto the stack.

    @param value - X509 object to add
    @raises ValueError - the stack is read-only
    """
    if self.need_free:
        duplicate = libcrypto.X509_dup(value.cert)
        libcrypto.sk_push(self.ptr, duplicate)
        return
    raise ValueError("Stack is read-only")
# ctypes prototypes for the libcrypto functions used in this module.
libcrypto.i2a_ASN1_INTEGER.argtypes = (c_void_p, c_void_p)
libcrypto.ASN1_STRING_print_ex.argtypes = (c_void_p, c_void_p, c_long)
libcrypto.X509_get_serialNumber.argtypes = (c_void_p,)
libcrypto.X509_get_serialNumber.restype = c_void_p
libcrypto.X509_NAME_ENTRY_get_object.restype = c_void_p
libcrypto.X509_NAME_ENTRY_get_object.argtypes = (c_void_p,)
libcrypto.OBJ_obj2nid.argtypes = (c_void_p,)
libcrypto.X509_NAME_get_entry.restype = c_void_p
libcrypto.X509_NAME_get_entry.argtypes = (c_void_p, c_int)
libcrypto.X509_STORE_new.restype = c_void_p
libcrypto.X509_STORE_add_lookup.restype = c_void_p
libcrypto.X509_STORE_add_lookup.argtypes = (c_void_p, c_void_p)
libcrypto.X509_LOOKUP_file.restype = c_void_p
libcrypto.X509_LOOKUP_hash_dir.restype = c_void_p
libcrypto.X509_LOOKUP_ctrl.restype = c_int
libcrypto.X509_LOOKUP_ctrl.argtypes = (c_void_p, c_int, c_char_p, c_long, POINTER(c_char_p))
# ctypes assumes a C int return value unless told otherwise. The
# functions below return pointers; without a declared restype those
# pointers are truncated where a pointer is wider than int, and a NULL
# result arrives as 0 rather than None, so "is None" checks on their
# results never fire.
libcrypto.X509_NAME_new.restype = c_void_p
libcrypto.X509_NAME_ENTRY_get_data.restype = c_void_p
libcrypto.X509_NAME_ENTRY_get_data.argtypes = (c_void_p,)
libcrypto.X509_get_subject_name.restype = c_void_p
libcrypto.X509_get_issuer_name.restype = c_void_p
libcrypto.X509_get_pubkey.restype = c_void_p
libcrypto.PEM_read_bio_X509.restype = c_void_p
libcrypto.d2i_X509_bio.restype = c_void_p
libcrypto.X509_dup.restype = c_void_p
libcrypto.X509_STORE_CTX_new.restype = c_void_p