From 287e8a5b1d7f5a8129619f733adbfc8b2de3e4c7 Mon Sep 17 00:00:00 2001 From: Victor Wagner Date: Sun, 28 Jun 2015 00:34:15 +0300 Subject: [PATCH] Fixed most pylint warning. Incompatibile interface changes: DigestType methods size, block_size and name become properties --- ctypescrypto/__init__.py | 4 +- ctypescrypto/bio.py | 125 +++++---- ctypescrypto/cipher.py | 213 ++++++++------ ctypescrypto/cms.py | 275 ++++++++++-------- ctypescrypto/digest.py | 163 ++++++----- ctypescrypto/ec.py | 81 +++--- ctypescrypto/engine.py | 41 +-- ctypescrypto/exception.py | 51 ++-- ctypescrypto/mac.py | 4 +- ctypescrypto/oid.py | 153 +++++----- ctypescrypto/pbkdf2.py | 39 +-- ctypescrypto/pkey.py | 431 +++++++++++++++------------- ctypescrypto/rand.py | 57 ++-- ctypescrypto/x509.py | 571 +++++++++++++++++++++----------------- setup.py | 2 +- tests/testdigest.py | 48 ++-- 16 files changed, 1257 insertions(+), 1001 deletions(-) diff --git a/ctypescrypto/__init__.py b/ctypescrypto/__init__.py index 1963188..ba1329e 100644 --- a/ctypescrypto/__init__.py +++ b/ctypescrypto/__init__.py @@ -4,7 +4,7 @@ """ -from ctypes import CDLL,c_char_p +from ctypes import CDLL, c_char_p def config(filename=None): """ @@ -16,5 +16,5 @@ def config(filename=None): __all__ = ['config'] libcrypto = CDLL("libcrypto.so.1.0.0") -libcrypto.OPENSSL_config.argtypes=(c_char_p,) +libcrypto.OPENSSL_config.argtypes = (c_char_p, ) libcrypto.OPENSSL_add_all_algorithms_conf() diff --git a/ctypescrypto/bio.py b/ctypescrypto/bio.py index 2c54212..5040d0c 100644 --- a/ctypescrypto/bio.py +++ b/ctypescrypto/bio.py @@ -2,97 +2,112 @@ Interface to OpenSSL BIO library """ from ctypescrypto import libcrypto -from ctypes import c_char_p, c_void_p, c_int, string_at, c_long,POINTER,byref, create_string_buffer +from ctypes import c_char_p, c_void_p, c_int, string_at, c_long +from ctypes import POINTER, byref, create_string_buffer class Membio(object): - """ - Provides interface to OpenSSL memory bios - use str() or unicode() to get contents of writable bio - use bio member to pass to libcrypto function """ - def __init__(self,data=None): - """ If data is specified, creates read-only BIO. If data is - None, creates writable BIO, contents of which can be retrieved by str() or unicode() + Provides interface to OpenSSL memory bios + use str() or unicode() to get contents of writable bio + use bio member to pass to libcrypto function + """ + def __init__(self, data=None): + """ + If data is specified, creates read-only BIO. If data is + None, creates writable BIO, contents of which can be retrieved + by str() or unicode() """ if data is None: - method=libcrypto.BIO_s_mem() - self.bio=libcrypto.BIO_new(method) + method = libcrypto.BIO_s_mem() + self.bio = libcrypto.BIO_new(method) else: - self.bio=libcrypto.BIO_new_mem_buf(c_char_p(data),len(data)) + self.bio = libcrypto.BIO_new_mem_buf(c_char_p(data), len(data)) + def __del__(self): """ Cleans up memory used by bio """ libcrypto.BIO_free(self.bio) - del(self.bio) + del self.bio + def __str__(self): """ Returns current contents of buffer as byte string """ - p=c_char_p(None) - l=libcrypto.BIO_ctrl(self.bio,3,0,byref(p)) - return string_at(p,l) + string_ptr = c_char_p(None) + string_len = libcrypto.BIO_ctrl(self.bio, 3, 0, byref(string_ptr)) + return string_at(string_ptr, string_len) + def __unicode__(self): """ - Attempts to interpret current contents of buffer as UTF-8 string and convert it to unicode + Attempts to interpret current contents of buffer as UTF-8 string + and convert it to unicode """ return str(self).decode("utf-8") - def read(self,length=None): + + def read(self, length=None): """ Reads data from readble BIO. For test purposes. - @param length - if specifed, limits amount of data read. If not BIO is read until end of buffer + @param length - if specifed, limits amount of data read. + If not BIO is read until end of buffer """ if not length is None: - if not isinstance(length,(int,long)): + if not isinstance(length, (int, long)): raise TypeError("length to read should be number") - buf=create_string_buffer(length) - readbytes=libcrypto.BIO_read(self.bio,buf,length) - if readbytes==-2: - raise NotImplementedError("Function is not supported by this BIO") - if readbytes==-1: + buf = create_string_buffer(length) + readbytes = libcrypto.BIO_read(self.bio, buf, length) + if readbytes == -2: + raise NotImplementedError("Function is not supported by" + + "this BIO") + if readbytes == -1: raise IOError - if readbytes==0: + if readbytes == 0: return "" return buf.raw[:readbytes] else: - buf=create_string_buffer(1024) - out="" - r=1 - while r>0: - r=libcrypto.BIO_read(self.bio,buf,1024) - if r==-2: - raise NotImplementedError("Function is not supported by this BIO") - if r==-1: + buf = create_string_buffer(1024) + out = "" + readbytes = 1 + while readbytes > 0: + readbytes = libcrypto.BIO_read(self.bio, buf, 1024) + if readbytes == -2: + raise NotImplementedError("Function is not supported by " + + "this BIO") + if readbytes == -1: raise IOError - if (r>0): - out+=buf.raw[:r] - return out + if readbytes > 0: + out += buf.raw[:readbytes] + return out - def write(self,data): + def write(self, data): """ Writes data to writable bio. For test purposes """ - if isinstance(data,unicode): - data=data.encode("utf-8") - r=libcrypto.BIO_write(self.bio,data,len(data)) - if r==-2: + if isinstance(data, unicode): + data = data.encode("utf-8") + else: + data = str(data) + + written = libcrypto.BIO_write(self.bio, data, len(data)) + if written == -2: raise NotImplementedError("Function not supported by this BIO") - if r + Return cipher flags. Low three bits of the flags encode + cipher mode (see mode). Higher bits is combinatuon of + EVP_CIPH* constants defined in the """ return libcrypto.EVP_CIPHER_flags(self.cipher) + def mode(self): """ - Returns cipher mode as string constant like CBC, OFB etc. + Returns cipher mode as string constant like CBC, OFB etc. """ return CIPHER_MODES[self.flags() & 0x7] + def algo(self): """ - Return cipher's algorithm name, derived from OID + Return cipher's algorithm name, derived from OID """ - return self.oid().shortname() + return self.oid().shortname() + def oid(self): """ - Returns ASN.1 object identifier of the cipher as - ctypescrypto.oid.Oid object + Returns ASN.1 object identifier of the cipher as + ctypescrypto.oid.Oid object """ return Oid(libcrypto.EVP_CIPHER_nid(self.cipher)) -class Cipher: +class Cipher(object): """ - Performs actual encrypton decryption - Note that object keeps some internal state. - To obtain full ciphertext (or plaintext during decihpering) - user should concatenate results of all calls of update with - result of finish + Performs actual encrypton decryption + Note that object keeps some internal state. + To obtain full ciphertext (or plaintext during decihpering) + user should concatenate results of all calls of update with + result of finish """ - def __init__(self, cipher_type, key, iv, encrypt=True): + def __init__(self, cipher_type, key, iv, encrypt=True): """ - Initializing cipher instance. + Initializing cipher instance. - @param cipher_type - CipherType object - @param key = binary string representing the key - @param iv - binary string representing initializtion vector - @param encrypt - if True(default) we ere encrypting. - Otherwise decrypting + @param cipher_type - CipherType object + @param key = binary string representing the key + @param iv - binary string representing initializtion vector + @param encrypt - if True(default) we ere encrypting. + Otherwise decrypting """ self._clean_ctx() @@ -117,23 +131,31 @@ class Cipher: if self.ctx == 0: raise CipherError("Unable to create cipher context") self.encrypt = encrypt - enc=1 if encrypt else 0 + enc = 1 if encrypt else 0 if not iv is None and len(iv) != cipher_type.iv_length(): raise ValueError("Invalid IV length for this algorithm") - + if len(key) != cipher_type.key_length(): if (cipher_type.flags() & 8) != 0: # Variable key length cipher. - result = libcrypto.EVP_CipherInit_ex(self.ctx, cipher_type.cipher, None, None, None, c_int(enc)) - result = libcrypto.EVP_CIPHER_CTX_set_key_length(self.ctx,len(key)) + result = libcrypto.EVP_CipherInit_ex(self.ctx, + cipher_type.cipher, + None, None, None, + c_int(enc)) + result = libcrypto.EVP_CIPHER_CTX_set_key_length(self.ctx, + len(key)) if result == 0: self._clean_ctx() raise CipherError("Unable to set key length") - result = libcrypto.EVP_CipherInit_ex(self.ctx, None, None, key_ptr, iv_ptr, c_int(enc)) + result = libcrypto.EVP_CipherInit_ex(self.ctx, None, None, + key_ptr, iv_ptr, + c_int(enc)) else: raise ValueError("Invalid key length for this algorithm") else: - result = libcrypto.EVP_CipherInit_ex(self.ctx, cipher_type.cipher, None, key_ptr, iv_ptr, c_int(enc)) + result = libcrypto.EVP_CipherInit_ex(self.ctx, cipher_type.cipher, + None, key_ptr, iv_ptr, + c_int(enc)) if result == 0: self._clean_ctx() raise CipherError("Unable to initialize cipher") @@ -142,69 +164,74 @@ class Cipher: self.cipher_finalized = False def __del__(self): + """ + We define _clean_ctx() to do all the cleanup + """ self._clean_ctx() def padding(self, padding=True): """ - Sets padding mode of the cipher + Sets padding mode of the cipher """ - padding_flag=1 if padding else 0 + padding_flag = 1 if padding else 0 libcrypto.EVP_CIPHER_CTX_set_padding(self.ctx, padding_flag) def update(self, data): """ - Performs actual encrypton/decrypion + Performs actual encrypton/decrypion - @param data - part of the plain text/ciphertext to process - @returns - part of ciphercext/plain text + @param data - part of the plain text/ciphertext to process + @returns - part of ciphercext/plain text - Passd chunk of text doeesn't need to contain full ciher - blocks. If neccessery, part of passed data would be kept - internally until next data would be received or finish - called + Passed chunk of text doesn't need to contain full ciher + blocks. If neccessery, part of passed data would be kept + internally until next data would be received or finish + called """ - if self.cipher_finalized : + if self.cipher_finalized: raise CipherError("No updates allowed") - if not isinstance(data,str): + if not isinstance(data, str): raise TypeError("A string is expected") if len(data) == 0: return "" - outbuf=create_string_buffer(self.block_size+len(data)) - outlen=c_int(0) - ret=libcrypto.EVP_CipherUpdate(self.ctx,outbuf,byref(outlen), - data,len(data)) - if ret <=0: + outbuf = create_string_buffer(self.block_size+len(data)) + outlen = c_int(0) + ret = libcrypto.EVP_CipherUpdate(self.ctx, outbuf, byref(outlen), + data, len(data)) + if ret <= 0: self._clean_ctx() - self.cipher_finalized=True - del self.ctx + self.cipher_finalized = True raise CipherError("problem processing data") - return outbuf.raw[:outlen.value] - + return outbuf.raw[:int(outlen.value)] + def finish(self): """ - Finalizes processing. If some data are kept in the internal - state, they would be processed and returned. + Finalizes processing. If some data are kept in the internal + state, they would be processed and returned. """ - if self.cipher_finalized : + if self.cipher_finalized: raise CipherError("Cipher operation is already completed") - outbuf=create_string_buffer(self.block_size) + outbuf = create_string_buffer(self.block_size) self.cipher_finalized = True - outlen=c_int(0) - result = libcrypto.EVP_CipherFinal_ex(self.ctx,outbuf , byref(outlen)) + outlen = c_int(0) + result = libcrypto.EVP_CipherFinal_ex(self.ctx, outbuf, byref(outlen)) if result == 0: self._clean_ctx() raise CipherError("Unable to finalize cipher") - if outlen.value>0: - return outbuf.raw[:outlen.value] + if outlen.value > 0: + return outbuf.raw[:int(outlen.value)] else: return "" - + def _clean_ctx(self): + """ + Cleans up cipher ctx and deallocates it + """ try: if self.ctx is not None: libcrypto.EVP_CIPHER_CTX_cleanup(self.ctx) libcrypto.EVP_CIPHER_CTX_free(self.ctx) - del(self.ctx) + del self.ctx except AttributeError: pass self.cipher_finalized = True @@ -213,18 +240,20 @@ class Cipher: # # Used C function block_size # -libcrypto.EVP_CIPHER_block_size.argtypes=(c_void_p,) -libcrypto.EVP_CIPHER_CTX_cleanup.argtypes=(c_void_p,) -libcrypto.EVP_CIPHER_CTX_free.argtypes=(c_void_p,) -libcrypto.EVP_CIPHER_CTX_new.restype=c_void_p -libcrypto.EVP_CIPHER_CTX_set_padding.argtypes=(c_void_p,c_int) -libcrypto.EVP_CipherFinal_ex.argtypes=(c_void_p,c_char_p,POINTER(c_int)) -libcrypto.EVP_CIPHER_flags.argtypes=(c_void_p,) -libcrypto.EVP_CipherInit_ex.argtypes=(c_void_p,c_void_p,c_void_p,c_char_p,c_char_p,c_int) -libcrypto.EVP_CIPHER_iv_length.argtypes=(c_void_p,) -libcrypto.EVP_CIPHER_key_length.argtypes=(c_void_p,) -libcrypto.EVP_CIPHER_nid.argtypes=(c_void_p,) -libcrypto.EVP_CipherUpdate.argtypes=(c_void_p,c_char_p,POINTER(c_int),c_char_p,c_int) -libcrypto.EVP_get_cipherbyname.restype=c_void_p -libcrypto.EVP_get_cipherbyname.argtypes=(c_char_p,) -libcrypto.EVP_CIPHER_CTX_set_key_length.argtypes=(c_void_p,c_int) +libcrypto.EVP_CIPHER_block_size.argtypes = (c_void_p, ) +libcrypto.EVP_CIPHER_CTX_cleanup.argtypes = (c_void_p, ) +libcrypto.EVP_CIPHER_CTX_free.argtypes = (c_void_p, ) +libcrypto.EVP_CIPHER_CTX_new.restype = c_void_p +libcrypto.EVP_CIPHER_CTX_set_padding.argtypes = (c_void_p, c_int) +libcrypto.EVP_CipherFinal_ex.argtypes = (c_void_p, c_char_p, POINTER(c_int)) +libcrypto.EVP_CIPHER_flags.argtypes = (c_void_p, ) +libcrypto.EVP_CipherInit_ex.argtypes = (c_void_p, c_void_p, c_void_p, c_char_p, + c_char_p, c_int) +libcrypto.EVP_CIPHER_iv_length.argtypes = (c_void_p, ) +libcrypto.EVP_CIPHER_key_length.argtypes = (c_void_p, ) +libcrypto.EVP_CIPHER_nid.argtypes = (c_void_p, ) +libcrypto.EVP_CipherUpdate.argtypes = (c_void_p, c_char_p, POINTER(c_int), + c_char_p, c_int) +libcrypto.EVP_get_cipherbyname.restype = c_void_p +libcrypto.EVP_get_cipherbyname.argtypes = (c_char_p, ) +libcrypto.EVP_CIPHER_CTX_set_key_length.argtypes = (c_void_p, c_int) diff --git a/ctypescrypto/cms.py b/ctypescrypto/cms.py index 68f9c3f..bc27e9e 100644 --- a/ctypescrypto/cms.py +++ b/ctypescrypto/cms.py @@ -10,11 +10,12 @@ create it from raw data and neccessary certificates. """ -from ctypes import c_int, c_void_p, c_char_p, c_int +from ctypes import c_int, c_void_p, c_char_p, c_int, c_uint, c_size_t, POINTER from ctypescrypto.exception import LibCryptoError from ctypescrypto import libcrypto from ctypescrypto.bio import Membio from ctypescrypto.oid import Oid +from ctypescrypto.x509 import StackOfX509 class CMSError(LibCryptoError): """ @@ -24,108 +25,112 @@ class CMSError(LibCryptoError): class Flags: """ - Constants for flags passed to the CMS methods. + Constants for flags passed to the CMS methods. Can be OR-ed together """ - TEXT=1 - NOCERTS=2 - NO_CONTENT_VERIFY=4 - NO_ATTR_VERIFY=8 - NO_SIGS=NO_CONTENT_VERIFY|NO_ATTR_VERIFY - NOINTERN=0x10 - NO_SIGNER_CERT_VERIFY=0x20 - NO_VERIFY=0x20 - DETACHED=0x40 - BINARY=0x80 - NOATTR=0x100 - NOSMIMECAP =0x200 - NOOLDMIMETYPE=0x400 - CRLFEOL=0x800 - STREAM=0x1000 - NOCRL=0x2000 - PARTIAL=0x4000 - REUSE_DIGEST=0x8000 - USE_KEYID=0x10000 - DEBUG_DECRYPT=0x20000 + TEXT = 1 + NOCERTS = 2 + NO_CONTENT_VERIFY = 4 + NO_ATTR_VERIFY = 8 + NO_SIGS = NO_CONTENT_VERIFY | NO_ATTR_VERIFY + NOINTERN = 0x10 + NO_SIGNER_CERT_VERIFY = 0x20 + NO_VERIFY = 0x20 + DETACHED = 0x40 + BINARY = 0x80 + NOATTR = 0x100 + NOSMIMECAP = 0x200 + NOOLDMIMETYPE = 0x400 + CRLFEOL = 0x800 + STREAM = 0x1000 + NOCRL = 0x2000 + PARTIAL = 0x4000 + REUSE_DIGEST = 0x8000 + USE_KEYID = 0x10000 + DEBUG_DECRYPT = 0x20000 -def CMS(data,format="PEM"): +def CMS(data, format="PEM"): """ Parses CMS data and returns either SignedData or EnvelopedData object """ - b=Membio(data) + bio = Membio(data) if format == "PEM": - ptr=libcrypto.PEM_read_bio_CMS(b.bio,None,None,None) + ptr = libcrypto.PEM_read_bio_CMS(bio.bio, None, None, None) else: - ptr=libcrypto.d2i_CMS_bio(b.bio,None) + ptr = libcrypto.d2i_CMS_bio(bio.bio, None) typeoid = Oid(libcrypto.OBJ_obj2nid(libcrypto.CMS_get0_type(ptr))) - if typeoid.shortname()=="pkcs7-signedData": + if typeoid.shortname() == "pkcs7-signedData": return SignedData(ptr) - elif typeoid.shortname()=="pkcs7-envelopedData": + elif typeoid.shortname() == "pkcs7-envelopedData": return EnvelopedData(ptr) - elif typeoid.shortname()=="pkcs7-encryptedData": + elif typeoid.shortname() == "pkcs7-encryptedData": return EncryptedData(ptr) else: raise NotImplementedError("cannot handle "+typeoid.shortname()) -class CMSBase(object): +class CMSBase(object): """ Common ancessor for all CMS types. Implements serializatio/deserialization """ - def __init__(self,ptr=None): - self.ptr=ptr + def __init__(self, ptr=None): + self.ptr = ptr def __str__(self): """ Serialize in DER format """ - b=Membio() - if not libcrypto.i2d_CMS_bio(b.bio,self.ptr): + bio = Membio() + if not libcrypto.i2d_CMS_bio(bio.bio, self.ptr): raise CMSError("writing CMS to PEM") - return str(b) + return str(bio) def pem(self): """ Serialize in PEM format """ - b=Membio() - if not libcrypto.PEM_write_bio_CMS(b.bio,self.ptr): + bio = Membio() + if not libcrypto.PEM_write_bio_CMS(bio.bio, self.ptr): raise CMSError("writing CMS to PEM") - return str(b) - - - + return str(bio) + + +#pylint: disable=R0921 class SignedData(CMSBase): + """ + Represents signed message (signeddata CMS type) + """ @staticmethod - def create(data,cert,pkey,flags=Flags.BINARY,certs=[]): + def create(data, cert, pkey, flags=Flags.BINARY, certs=None): """ Creates SignedData message by signing data with pkey and certificate. @param data - data to sign + @param cert - signer's certificate @param pkey - pkey object with private key to sign @param flags - OReed combination of Flags constants @param certs - list of X509 objects to include into CMS """ if not pkey.cansign: raise ValueError("Specified keypair has no private part") - if cert.pubkey!=pkey: + if cert.pubkey != pkey: raise ValueError("Certificate doesn't match public key") - b=Membio(data) - if certs is not None and len(certs)>0: - certstack=StackOfX509(certs) + bio = Membio(data) + if certs is not None and len(certs) > 0: + certstack = StackOfX509(certs) else: - certstack=None - ptr=libcrypto.CMS_sign(cert.cert,pkey.ptr,certstack,b.bio,flags) + certstack = None + ptr = libcrypto.CMS_sign(cert.cert, pkey.ptr, certstack, bio.bio, flags) if ptr is None: raise CMSError("signing message") return SignedData(ptr) - def sign(self,cert,pkey,md=None,data=None,flags=Flags.BINARY): + def sign(self, cert, pkey, digest_type=None, data=None, flags=Flags.BINARY): """ Adds another signer to already signed message @param cert - signer's certificate @param pkey - signer's private key - @param md - message digest to use as DigestType object + @param digest_type - message digest to use as DigestType object (if None - default for key would be used) @param data - data to sign (if detached and Flags.REUSE_DIGEST is not specified) @@ -133,22 +138,21 @@ class SignedData(CMSBase): """ if not pkey.cansign: raise ValueError("Specified keypair has no private part") - if cert.pubkey!=pkey: + if cert.pubkey != pkey: raise ValueError("Certificate doesn't match public key") - p1=libcrypto.CMS_sign_add1_Signer(self.ptr,cert.cert,pkey.ptr, - md.digest,flags) - if p1 is None: + if libcrypto.CMS_sign_add1_Signer(self.ptr, cert.cert, pkey.ptr, + digest_type.digest, flags) is None: raise CMSError("adding signer") - if flags & Flags.REUSE_DIGEST==0: + if flags & Flags.REUSE_DIGEST == 0: if data is not None: - b=Membio(data) - biodata=b.bio + bio = Membio(data) + biodata = bio.bio else: - biodata=None - res= libcrypto.CMS_final(self.ptr,biodata,None,flags) - if res<=0: - raise CMSError - def verify(self,store,flags,data=None,certs=[]): + biodata = None + res = libcrypto.CMS_final(self.ptr, biodata, None, flags) + if res <= 0: + raise CMSError("Cannot finalize CMS") + def verify(self, store, flags, data=None, certs=None): """ Verifies signature under CMS message using trusted cert store @@ -160,42 +164,47 @@ class SignedData(CMSBase): sertificates to search for signing certificates @returns True if signature valid, False otherwise """ - bio=None - if data!=None: - b=Membio(data) - bio=b.bio - if certs is not None and len(certs)>0: - certstack=StackOfX509(certs) + bio = None + if data != None: + bio_obj = Membio(data) + bio = bio_obj.bio + if certs is not None and len(certs) > 0: + certstack = StackOfX509(certs) else: - certstack=None - res=libcrypto.CMS_verify(self.ptr,certstack,store.store,bio,None,flags) - return res>0 - @property + certstack = None + res = libcrypto.CMS_verify(self.ptr, certstack, store.store, bio, + None, flags) + return res > 0 + + @property def signers(self): """ Return list of signer's certificates """ - p=libcrypto.CMS_get0_signers(self.ptr) - if p is None: - raise CMSError - return StackOfX509(ptr=p,disposable=False) + signerlist = libcrypto.CMS_get0_signers(self.ptr) + if signerlist is None: + raise CMSError("Cannot get signers") + return StackOfX509(ptr=signerlist, disposable=False) + @property def data(self): """ Returns signed data if present in the message """ - b=Membio() - if not libcrypto.CMS_verify(self.ptr,None,None,None,b.bio,Flags.NO_VERIFY): + bio = Membio() + if not libcrypto.CMS_verify(self.ptr, None, None, None, bio.bio, + Flags.NO_VERIFY): raise CMSError("extract data") - return str(b) - def addcert(self,cert): + return str(bio) + + def addcert(self, cert): """ Adds a certificate (probably intermediate CA) to the SignedData structure """ - if libcrypto.CMS_add1_cert(self.ptr,cert.cert)<=0: - raise CMSError("adding cert") - def addcrl(self,crl): + if libcrypto.CMS_add1_cert(self.ptr, cert.cert) <= 0: + raise CMSError("Cannot add cert") + def addcrl(self, crl): """ Adds a CRL to the signed data structure """ @@ -205,10 +214,10 @@ class SignedData(CMSBase): """ List of the certificates contained in the structure """ - p=CMS_get1_certs(self.ptr) - if p is None: + certstack = libcrypto.CMS_get1_certs(self.ptr) + if certstack is None: raise CMSError("getting certs") - return StackOfX509(ptr=p,disposable=True) + return StackOfX509(ptr=certstack, disposable=True) @property def crls(self): """ @@ -217,8 +226,12 @@ class SignedData(CMSBase): raise NotImplementedError class EnvelopedData(CMSBase): + """ + Represents EnvelopedData CMS, i.e. message encrypted with session + keys, encrypted with recipient's public keys + """ @staticmethod - def create(recipients,data,cipher,flags=0): + def create(recipients, data, cipher, flags=0): """ Creates and encrypts message @param recipients - list of X509 objects @@ -226,13 +239,15 @@ class EnvelopedData(CMSBase): @param cipher - CipherType object @param flags - flag """ - recp=StackOfX509(recipients) - b=Membio(data) - p=libcrypto.CMS_encrypt(recp.ptr,b.bio,cipher.cipher_type,flags) - if p is None: + recp = StackOfX509(recipients) + bio = Membio(data) + cms_ptr = libcrypto.CMS_encrypt(recp.ptr, bio.bio, cipher.cipher_type, + flags) + if cms_ptr is None: raise CMSError("encrypt EnvelopedData") - return EnvelopedData(p) - def decrypt(self,pkey,cert,flags=0): + return EnvelopedData(cms_ptr) + + def decrypt(self, pkey, cert, flags=0): """ Decrypts message @param pkey - private key to decrypt @@ -245,15 +260,20 @@ class EnvelopedData(CMSBase): raise ValueError("Specified keypair has no private part") if pkey != cert.pubkey: raise ValueError("Certificate doesn't match private key") - b=Membio() - res=libcrypto.CMS_decrypt(self.ptr,pkey.ptr,cert.ccert,None,b.bio,flags) - if res<=0: + bio = Membio() + res = libcrypto.CMS_decrypt(self.ptr, pkey.ptr, cert.ccert, None, + bio.bio, flags) + if res <= 0: raise CMSError("decrypting CMS") - return str(b) + return str(bio) class EncryptedData(CMSBase): + """ + Represents encrypted data CMS structure, i.e. encrypted + with symmetric key, shared by sender and recepient. + """ @staticmethod - def create(data,cipher,key,flags=0): + def create(data, cipher, key, flags=0): """ Creates an EncryptedData message. @param data data to encrypt @@ -262,24 +282,61 @@ class EncryptedData(CMSBase): @param key - byte array used as simmetic key @param flags - OR-ed combination of Flags constant """ - b=Membio(data) - ptr=libcrypto.CMS_EncryptedData_encrypt(b.bio,cipher.cipher_type,key,len(key),flags) + bio = Membio(data) + ptr = libcrypto.CMS_EncryptedData_encrypt(bio.bio, cipher.cipher_type, + key, len(key), flags) if ptr is None: raise CMSError("encrypt data") return EncryptedData(ptr) - def decrypt(self,key,flags=0): + + def decrypt(self, key, flags=0): """ Decrypts encrypted data message @param key - symmetic key to decrypt @param flags - OR-ed combination of Flags constant """ - b=Membio() - if libcrypto.CMS_EncryptedData_decrypt(self.ptr,key,len(key),None, - b.bio,flags)<=0: - raise CMSError("decrypt data") - return str(b) + bio = Membio() + if libcrypto.CMS_EncryptedData_decrypt(self.ptr, key, len(key), None, + bio.bio, flags) <= 0: + raise CMSError("decrypt data") + return str(bio) -__all__=['CMS','CMSError','Flags','SignedData','EnvelopedData','EncryptedData'] +__all__ = ['CMS', 'CMSError', 'Flags', 'SignedData', 'EnvelopedData', + 'EncryptedData'] -libcrypto.CMS_verify.restype=c_int -libcrypto.CMS_verify.argtypes=(c_void_p,c_void_p,c_void_p,c_void_p,c_void_p,c_int) +libcrypto.CMS_add1_cert.restype = c_int +libcrypto.CMS_add1_cert.argtypes = (c_void_p, c_void_p) +libcrypto.CMS_decrypt.restype = c_int +libcrypto.CMS_decrypt.argtypes = (c_void_p, c_void_p, c_void_p, + c_void_p, c_void_p, c_uint) +libcrypto.CMS_encrypt.restype = c_void_p +libcrypto.CMS_encrypt.argtypes = (c_void_p, c_void_p, c_void_p, c_uint) +libcrypto.CMS_EncryptedData_decrypt.restype = c_int +libcrypto.CMS_EncryptedData_decrypt.argtypes = (c_void_p, c_char_p, c_size_t, + c_void_p, c_void_p, c_uint) +libcrypto.CMS_EncryptedData_encrypt.restype = c_void_p +libcrypto.CMS_EncryptedData_encrypt.argtypes = (c_void_p, c_void_p, c_char_p, + c_size_t, c_uint) +libcrypto.CMS_final.restype = c_int +libcrypto.CMS_final.argtypes = (c_void_p, c_void_p, c_void_p, c_uint) +libcrypto.CMS_get0_signers.restype = c_void_p +libcrypto.CMS_get0_signers.argtypes = (c_void_p, ) +libcrypto.CMS_get1_certs.restype = c_void_p +libcrypto.CMS_get1_certs.argtypes = (c_void_p, ) +libcrypto.CMS_sign.restype = c_void_p +libcrypto.CMS_sign.argtypes = (c_void_p, c_void_p, c_void_p, c_void_p, c_uint) +libcrypto.CMS_sign_add1_Signer.restype = c_void_p +libcrypto.CMS_sign_add1_Signer.argtypes = (c_void_p, c_void_p, c_void_p, + c_void_p, c_uint) +libcrypto.CMS_verify.restype = c_int +libcrypto.CMS_verify.argtypes = (c_void_p, c_void_p, c_void_p, c_void_p, + c_void_p, c_int) +libcrypto.d2i_CMS_bio.restype = c_void_p +libcrypto.d2i_CMS_bio.argtypes = (c_void_p, POINTER(c_void_p)) +libcrypto.i2d_CMS_bio.restype = c_int +libcrypto.i2d_CMS_bio.argtypes = (c_void_p, c_void_p) +libcrypto.PEM_read_bio_CMS.restype = c_void_p +libcrypto.PEM_read_bio_CMS.argtypes = (c_void_p, POINTER(c_void_p), + c_void_p, c_void_p) +libcrypto.PEM_write_bio_CMS.restype = c_int +libcrypto.PEM_write_bio_CMS.argtypes = (c_void_p, c_void_p) diff --git a/ctypescrypto/digest.py b/ctypescrypto/digest.py index 6ebb82a..8a9a74e 100644 --- a/ctypescrypto/digest.py +++ b/ctypescrypto/digest.py @@ -1,114 +1,130 @@ """ - Implements interface to OpenSSL EVP_Digest* functions. +Implements interface to OpenSSL EVP_Digest* functions. - Interface made as close to hashlib as possible. +Interface made as close to hashlib as possible. - This module is really an excess effort. Hashlib allows access to - mostly same functionality except oids and nids of hashing - algortithms (which might be needed for private key operations). +This module is really an excess effort. Hashlib allows access to +mostly same functionality except oids and nids of hashing +algortithms (which might be needed for private key operations). - hashlib even allows to use engine-provided digests if it is build - with dinamically linked libcrypto - so use - ctypescrypto.engine.set_default("gost",xFFFF) and md_gost94 - algorithm would be available both to this module and hashlib. +hashlib even allows to use engine-provided digests if it is build +with dinamically linked libcrypto - so use +ctypescrypto.engine.set_default("gost",xFFFF) and md_gost94 +algorithm would be available both to this module and hashlib. """ -from ctypes import c_int, c_char_p, c_void_p, POINTER, c_long,c_longlong, create_string_buffer,byref +from ctypes import c_int, c_char_p, c_void_p, POINTER, c_long, c_longlong +from ctypes import create_string_buffer, byref from ctypescrypto import libcrypto from ctypescrypto.exception import LibCryptoError from ctypescrypto.oid import Oid DIGEST_ALGORITHMS = ("MD5", "SHA1", "SHA224", "SHA256", "SHA384", "SHA512") -__all__ = ['DigestError','Digest','DigestType','new'] +__all__ = ['DigestError', 'Digest', 'DigestType', 'new'] class DigestError(LibCryptoError): + """ Exception raised if some OpenSSL function returns error """ pass def new(algname): """ - Behaves just like hashlib.new. Creates digest object by - algorithm name + Behaves just like hashlib.new. Creates digest object by + algorithm name """ - md=DigestType(algname) - return Digest(md) + + digest_type = DigestType(algname) + return Digest(digest_type) class DigestType(object): """ - - Represents EVP_MD object - constant structure which describes - digest algorithm - + Represents EVP_MD object - constant structure which describes + digest algorithm """ - def __init__(self, digest_name): + def __init__(self, digest_name): """ - Finds digest by its name. You can pass Oid object instead of - name. + Finds digest by its name. You can pass Oid object instead of + name. - Special case is when None is passed as name. In this case - unitialized digest is created, and can be initalized later - by setting its digest attribute to pointer to EVP_MD + Special case is when None is passed as name. In this case + unitialized digest is created, and can be initalized later + by setting its digest attribute to pointer to EVP_MD """ if digest_name is None: - return - if isinstance(digest_name,Oid): - self.digest_name=digest_name.longname() - self.digest=libcrypto.EVP_get_digestbyname(self.digest_name) + return + + if isinstance(digest_name, Oid): + self.digest_name = digest_name.longname() else: self.digest_name = str(digest_name) - self.digest = libcrypto.EVP_get_digestbyname(self.digest_name) + self.digest = libcrypto.EVP_get_digestbyname(self.digest_name) if self.digest is None: raise DigestError("Unknown digest: %s" % self.digest_name) @property def name(self): - if not hasattr(self,'digest_name'): - self.digest_name=Oid(libcrypto.EVP_MD_type(self.digest)).longname() + """ Returns name of the digest """ + if not hasattr(self, 'digest_name'): + self.digest_name = Oid(libcrypto.EVP_MD_type(self.digest) + ).longname() return self.digest_name + def __del__(self): + """ Empty destructor for constant object """ pass + + @property def digest_size(self): + """ Returns size of digest """ return libcrypto.EVP_MD_size(self.digest) + + @property def block_size(self): + """ Returns block size of the digest """ return libcrypto.EVP_MD_block_size(self.digest) + + @property def oid(self): + """ Returns Oid object of digest type """ return Oid(libcrypto.EVP_MD_type(self.digest)) class Digest(object): """ - Represents EVP_MD_CTX object which actually used to calculate - digests. - + Represents EVP_MD_CTX object which actually used to calculate + digests. """ - def __init__(self,digest_type): + + def __init__(self, digest_type): """ - Initializes digest using given type. + Initializes digest using given type. """ - self._clean_ctx() self.ctx = libcrypto.EVP_MD_CTX_create() if self.ctx is None: raise DigestError("Unable to create digest context") + self.digest_out = None + self.digest_finalized = False result = libcrypto.EVP_DigestInit_ex(self.ctx, digest_type.digest, None) if result == 0: self._clean_ctx() raise DigestError("Unable to initialize digest") self.digest_type = digest_type - self.digest_size = self.digest_type.digest_size() - self.block_size = self.digest_type.block_size() + self.digest_size = self.digest_type.digest_size + self.block_size = self.digest_type.block_size def __del__(self): + """ Uses _clean_ctx internal method """ self._clean_ctx() def update(self, data, length=None): """ - Hashes given byte string + Hashes given byte string - @param data - string to hash - @param length - if not specifed, entire string is hashed, - otherwise only first length bytes + @param data - string to hash + @param length - if not specifed, entire string is hashed, + otherwise only first length bytes """ if self.digest_finalized: raise DigestError("No updates allowed") - if not isinstance(data,str): + if not isinstance(data, str): raise TypeError("A string is expected") if length is None: length = len(data) @@ -116,12 +132,12 @@ class Digest(object): raise ValueError("Specified length is greater than length of data") result = libcrypto.EVP_DigestUpdate(self.ctx, c_char_p(data), length) if result != 1: - raise DigestError, "Unable to update digest" - + raise DigestError("Unable to update digest") + def digest(self, data=None): """ - Finalizes digest operation and return digest value - Optionally hashes more data before finalizing + Finalizes digest operation and return digest value + Optionally hashes more data before finalizing """ if self.digest_finalized: return self.digest_out.raw[:self.digest_size] @@ -129,31 +145,36 @@ class Digest(object): self.update(data) self.digest_out = create_string_buffer(256) length = c_long(0) - result = libcrypto.EVP_DigestFinal_ex(self.ctx, self.digest_out, byref(length)) - if result != 1 : + result = libcrypto.EVP_DigestFinal_ex(self.ctx, self.digest_out, + byref(length)) + if result != 1: raise DigestError("Unable to finalize digest") self.digest_finalized = True return self.digest_out.raw[:self.digest_size] def copy(self): """ - Creates copy of the digest CTX to allow to compute digest - while being able to hash more data + Creates copy of the digest CTX to allow to compute digest + while being able to hash more data """ - new_digest=Digest(self.digest_type) - libcrypto.EVP_MD_CTX_copy(new_digest.ctx,self.ctx) + + new_digest = Digest(self.digest_type) + libcrypto.EVP_MD_CTX_copy(new_digest.ctx, self.ctx) return new_digest def _clean_ctx(self): + """ + Clears and deallocates context + """ try: if self.ctx is not None: libcrypto.EVP_MD_CTX_destroy(self.ctx) - del(self.ctx) + del self.ctx except AttributeError: pass self.digest_out = None self.digest_finalized = False - def hexdigest(self,data=None): + def hexdigest(self, data=None): """ Returns digest in the hexadecimal form. For compatibility with hashlib @@ -164,13 +185,23 @@ class Digest(object): # Declare function result and argument types libcrypto.EVP_get_digestbyname.restype = c_void_p -libcrypto.EVP_get_digestbyname.argtypes = (c_char_p,) +libcrypto.EVP_get_digestbyname.argtypes = (c_char_p, ) libcrypto.EVP_MD_CTX_create.restype = c_void_p -libcrypto.EVP_DigestInit_ex.argtypes = (c_void_p,c_void_p,c_void_p) -libcrypto.EVP_DigestUpdate.argtypes = (c_void_p,c_char_p,c_longlong) -libcrypto.EVP_DigestFinal_ex.argtypes = (c_void_p,c_char_p,POINTER(c_long)) -libcrypto.EVP_MD_CTX_destroy.argtypes = (c_void_p,) -libcrypto.EVP_MD_CTX_copy.argtypes=(c_void_p, c_void_p) -libcrypto.EVP_MD_type.argtypes=(c_void_p,) -libcrypto.EVP_MD_size.argtypes=(c_void_p,) -libcrypto.EVP_MD_block_size.argtypes=(c_void_p,) +# libcrypto.EVP_MD_CTX_create has no arguments +libcrypto.EVP_DigestInit_ex.restupe = c_int +libcrypto.EVP_DigestInit_ex.argtypes = (c_void_p, c_void_p, c_void_p) +libcrypto.EVP_DigestUpdate.restype = c_int +libcrypto.EVP_DigestUpdate.argtypes = (c_void_p, c_char_p, c_longlong) +libcrypto.EVP_DigestFinal_ex.restype = c_int +libcrypto.EVP_DigestFinal_ex.argtypes = (c_void_p, c_char_p, POINTER(c_long)) +libcrypto.EVP_MD_CTX_destroy.argtypes = (c_void_p, ) +libcrypto.EVP_MD_CTX_copy.restype = c_int +libcrypto.EVP_MD_CTX_copy.argtypes = (c_void_p, c_void_p) +libcrypto.EVP_MD_type.argtypes = (c_void_p, ) +libcrypto.EVP_MD_size.argtypes = (c_void_p, ) +libcrypto.EVP_MD_block_size.restype = c_int +libcrypto.EVP_MD_block_size.argtypes = (c_void_p, ) +libcrypto.EVP_MD_size.restype = c_int +libcrypto.EVP_MD_size.argtypes = (c_void_p, ) +libcrypto.EVP_MD_type.restype = c_int +libcrypto.EVP_MD_type.argtypes = (c_void_p, ) diff --git a/ctypescrypto/ec.py b/ctypescrypto/ec.py index 7240b39..ef95fb6 100644 --- a/ctypescrypto/ec.py +++ b/ctypescrypto/ec.py @@ -2,78 +2,79 @@ Support for EC keypair operation missing form public libcrypto API """ from ctypescrypto.pkey import PKey, PKeyError -from ctypes import c_void_p,c_char_p,c_int,byref +from ctypes import c_void_p, c_char_p, c_int, byref from ctypescrypto import libcrypto -__all__ = [ 'create'] +__all__ = ['create'] -def create(curve,data): +def create(curve, data): """ - Creates EC keypair from the just secret key and curve name - - @param curve - name of elliptic curve - @param num - byte array or long number representing key + Creates EC keypair from the just secret key and curve name + + @param curve - name of elliptic curve + @param num - byte array or long number representing key """ - ec=libcrypto.EC_KEY_new_by_curve_name(curve.nid) - if ec is None: + ec_key = libcrypto.EC_KEY_new_by_curve_name(curve.nid) + if ec_key is None: raise PKeyError("EC_KEY_new_by_curvename") - group=libcrypto.EC_KEY_get0_group(ec) + group = libcrypto.EC_KEY_get0_group(ec_key) if group is None: raise PKeyError("EC_KEY_get0_group") - libcrypto.EC_GROUP_set_asn1_flag(group,1) - raw_key=libcrypto.BN_new() - if isinstance(data,int): - BN_hex2bn(byref(raw_key),hex(data)) + libcrypto.EC_GROUP_set_asn1_flag(group, 1) + raw_key = libcrypto.BN_new() + if isinstance(data, int): + libcrypto.BN_hex2bn(byref(raw_key), hex(data)) else: if raw_key is None: raise PKeyError("BN_new") - if libcrypto.BN_bin2bn(data,len(data),raw_key) is None: + if libcrypto.BN_bin2bn(data, len(data), raw_key) is None: raise PKeyError("BN_bin2bn") - ctx=libcrypto.BN_CTX_new() + ctx = libcrypto.BN_CTX_new() if ctx is None: raise PKeyError("BN_CTX_new") - order=libcrypto.BN_new() + order = libcrypto.BN_new() if order is None: raise PKeyError("BN_new") priv_key = libcrypto.BN_new() if priv_key is None: raise PKeyError("BN_new") - if libcrypto.EC_GROUP_get_order(group,order,ctx) <=0: + if libcrypto.EC_GROUP_get_order(group, order, ctx) <= 0: raise PKeyError("EC_GROUP_get_order") - if libcrypto.BN_nnmod(priv_key,raw_key,order,ctx) <=0: + if libcrypto.BN_nnmod(priv_key, raw_key, order, ctx) <= 0: raise PKeyError("BN_nnmod") - if libcrypto.EC_KEY_set_private_key(ec,priv_key)<=0: + if libcrypto.EC_KEY_set_private_key(ec_key, priv_key) <= 0: raise PKeyError("EC_KEY_set_private_key") - pub_key=libcrypto.EC_POINT_new(group) + pub_key = libcrypto.EC_POINT_new(group) if pub_key is None: raise PKeyError("EC_POINT_new") - if libcrypto.EC_POINT_mul(group,pub_key,priv_key,None,None,ctx)<=0: + if libcrypto.EC_POINT_mul(group, pub_key, priv_key, None, None, ctx) <= 0: raise PKeyError("EC_POINT_mul") - if libcrypto.EC_KEY_set_public_key(ec,pub_key)<=0: + if libcrypto.EC_KEY_set_public_key(ec_key, pub_key) <= 0: raise PKeyError("EC_KEY_set_public_key") libcrypto.BN_free(raw_key) libcrypto.BN_free(order) libcrypto.BN_free(priv_key) libcrypto.BN_CTX_free(ctx) - p=libcrypto.EVP_PKEY_new() - if p is None: - raise PKeyError("EVP_PKEY_new") - if libcrypto.EVP_PKEY_set1_EC_KEY(p,ec)<=0: + pkey = libcrypto.EVP_PKEY_new() + if pkey is None: + raise PKeyError("EVP_PKEY_new") + if libcrypto.EVP_PKEY_set1_EC_KEY(pkey, ec_key) <= 0: raise PKeyError("EVP_PKEY_set1_EC_KEY") - libcrypto.EC_KEY_free(ec) - return PKey(ptr=p,cansign=True) + libcrypto.EC_KEY_free(ec_key) + return PKey(ptr=pkey, cansign=True) -libcrypto.EVP_PKEY_new.restype=c_void_p -libcrypto.BN_new.restype=c_void_p -libcrypto.BN_free.argtypes=(c_void_p,) -libcrypto.BN_CTX_new.restype=c_void_p -libcrypto.BN_CTX_free.argtypes=(c_void_p,) -libcrypto.BN_bin2bn.argtypes=(c_char_p,c_int,c_void_p) -libcrypto.EC_KEY_set_private_key.argtypes=(c_void_p,c_void_p) -libcrypto.EC_POINT_new.argtypes=(c_void_p,) -libcrypto.EC_POINT_new.restype=c_void_p -libcrypto.EC_POINT_mul.argtypes=(c_void_p,c_void_p,c_void_p,c_void_p,c_void_p,c_void_p) -libcrypto.EC_KEY_set_public_key.argtypes=(c_void_p,c_void_p) +libcrypto.EVP_PKEY_new.restype = c_void_p +libcrypto.BN_new.restype = c_void_p +libcrypto.BN_free.argtypes = (c_void_p, ) +libcrypto.BN_CTX_new.restype = c_void_p +libcrypto.BN_CTX_free.argtypes = (c_void_p, ) +libcrypto.BN_bin2bn.argtypes = (c_char_p, c_int, c_void_p) +libcrypto.EC_KEY_set_private_key.argtypes = (c_void_p, c_void_p) +libcrypto.EC_POINT_new.argtypes = (c_void_p, ) +libcrypto.EC_POINT_new.restype = c_void_p +libcrypto.EC_POINT_mul.argtypes = (c_void_p, c_void_p, c_void_p, c_void_p, + c_void_p, c_void_p) +libcrypto.EC_KEY_set_public_key.argtypes = (c_void_p, c_void_p) diff --git a/ctypescrypto/engine.py b/ctypescrypto/engine.py index 0446d48..c2858e6 100644 --- a/ctypescrypto/engine.py +++ b/ctypescrypto/engine.py @@ -1,38 +1,39 @@ """ engine loading and configuration """ -from ctypes import * +from ctypes import c_void_p, c_char_p, c_int from ctypescrypto import libcrypto from ctypescrypto.exception import LibCryptoError -__all__=['default','set_default'] +__all__ = ['default', 'set_default'] -default=None +default = None def set_default(engine): """ - Loads specified engine and sets it as default for all - algorithms, supported by it + Loads specified engine and sets it as default for all + algorithms, supported by it """ global default - e=libcrypto.ENGINE_by_id(engine) - if e is None: + eng = libcrypto.ENGINE_by_id(engine) + if eng is None: # Try load engine - e = libcrypto.ENGINE_by_id("dynamic") - if e is None: + eng = libcrypto.ENGINE_by_id("dynamic") + if eng is None: raise LibCryptoError("Cannot get 'dynamic' engine") - if not libcrypto.ENGINE_ctrl_cmd_string(e,"SO_PATH",engine,0): + if not libcrypto.ENGINE_ctrl_cmd_string(eng, "SO_PATH", engine, 0): raise LibCryptoError("Cannot execute ctrl cmd SO_PATH") - if not libcrypto.ENGINE_ctrl_cmd_string(e,"LOAD",None,0): + if not libcrypto.ENGINE_ctrl_cmd_string(eng, "LOAD", None, 0): raise LibCryptoError("Cannot execute ctrl cmd LOAD") - if e is None: - raise ValueError("Cannot find engine "+engine) - libcrypto.ENGINE_set_default(e,c_int(0xFFFF)) - default=e + if eng is None: + raise ValueError("Cannot find engine " + engine) + libcrypto.ENGINE_set_default(eng, c_int(0xFFFF)) + default = eng # Declare function result and arguments for used functions -libcrypto.ENGINE_by_id.restype=c_void_p -libcrypto.ENGINE_by_id.argtypes=(c_char_p,) -libcrypto.ENGINE_set_default.argtypes=(c_void_p,c_int) -libcrypto.ENGINE_ctrl_cmd_string.argtypes=(c_void_p,c_char_p,c_char_p,c_int) -libcrypto.ENGINE_finish.argtypes=(c_char_p,) +libcrypto.ENGINE_by_id.restype = c_void_p +libcrypto.ENGINE_by_id.argtypes = (c_char_p, ) +libcrypto.ENGINE_set_default.argtypes = (c_void_p, c_int) +libcrypto.ENGINE_ctrl_cmd_string.argtypes = (c_void_p, c_char_p, c_char_p, + c_int) +libcrypto.ENGINE_finish.argtypes = (c_char_p, ) diff --git a/ctypescrypto/exception.py b/ctypescrypto/exception.py index 0e6fc43..5eec20a 100644 --- a/ctypescrypto/exception.py +++ b/ctypescrypto/exception.py @@ -1,50 +1,39 @@ """ Exception which extracts libcrypto error information """ -from ctypes import * +from ctypes import c_ulong, c_char_p, create_string_buffer from ctypescrypto import libcrypto -strings_loaded=False +strings_loaded = False -__all__ = ['LibCryptoError','clear_err_stack'] - -def _check_null(s): - """ - Handle transparently NULL returned from error reporting functions - instead of strings - """ - if s is None: - return "" - return s +__all__ = ['LibCryptoError', 'clear_err_stack'] class LibCryptoError(Exception): """ - Exception for libcrypto errors. Adds all the info, which can be - extracted from internal (per-thread) libcrypto error stack to the message, - passed to the constructor. + Exception for libcrypto errors. Adds all the info, which can be + extracted from internal (per-thread) libcrypto error stack to the message, + passed to the constructor. """ - def __init__(self,msg): + def __init__(self, msg): global strings_loaded if not strings_loaded: libcrypto.ERR_load_crypto_strings() strings_loaded = True - e=libcrypto.ERR_get_error() - m = msg - while e != 0: - m+="\n\t"+_check_null(libcrypto.ERR_lib_error_string(e))+":"+\ - _check_null(libcrypto.ERR_func_error_string(e))+":"+\ - _check_null(libcrypto.ERR_reason_error_string(e)) - e=libcrypto.ERR_get_error() - self.args=(m,) + err_code = libcrypto.ERR_get_error() + mesg = msg + buf = create_string_buffer(128) + while err_code != 0: + mesg += "\n\t" + libcrypto.ERR_error_string(err_code, buf) + err_code = libcrypto.ERR_get_error() + super(LibCryptoError, self).__init__(mesg) def clear_err_stack(): """ - Clears internal libcrypto err stack. Call it if you've checked - return code and processed exceptional situation, so subsequent - raising of the LibCryptoError wouldn't list already handled errors + Clears internal libcrypto err stack. Call it if you've checked + return code and processed exceptional situation, so subsequent + raising of the LibCryptoError wouldn't list already handled errors """ libcrypto.ERR_clear_error() - -libcrypto.ERR_lib_error_string.restype=c_char_p -libcrypto.ERR_func_error_string.restype=c_char_p -libcrypto.ERR_reason_error_string.restype=c_char_p +libcrypto.ERR_get_error.restype = c_ulong +libcrypto.ERR_error_string.restype = c_char_p +libcrypto.ERR_error_string.argtypes = (c_ulong, c_char_p) diff --git a/ctypescrypto/mac.py b/ctypescrypto/mac.py index 7b9381e..0a96cb0 100644 --- a/ctypescrypto/mac.py +++ b/ctypescrypto/mac.py @@ -67,8 +67,8 @@ class MAC(Digest): for (name,val) in kwargs.items(): if libcrypto.EVP_PKEY_CTX_ctrl_str(pctx,name,val)<=0: raise DigestError("Unable to set mac parameter") - self.digest_size = self.digest_type.digest_size() - self.block_size = self.digest_type.block_size() + self.digest_size = self.digest_type.digest_size + self.block_size = self.digest_type.block_size def digest(self,data=None): """ Method digest is redefined to return keyed MAC value instead of diff --git a/ctypescrypto/oid.py b/ctypescrypto/oid.py index 85b3aa0..28e12ba 100644 --- a/ctypescrypto/oid.py +++ b/ctypescrypto/oid.py @@ -1,58 +1,82 @@ -""" - Interface to OpenSSL object identifier database. +""" +Interface to OpenSSL object identifier database. + +It is primarily intended to deal with OIDs which are compiled into the +database or defined in the openssl configuration files. - It is primarily intended to deal with OIDs which are compiled into the - database or defined in the openssl configuration files. +But see create() function. - But see create() function +OpenSSL maintains database of OIDs, which contain long and short +human-readable names, which correspond to Oid as well as canonical +dotted-decimal representation, and links it to small integer, named +numeric identifier or 'nid'. Most OpenSSL functions which deals with +ASN.1 structures such as certificates or cryptographic messages, +expect or return nids, but it is very bad idea to hardcode nids into +your app, because it can change after mere recompilation of OpenSSL +library. +This module provides Oid object which represents entry to OpenSSL +OID database. """ from ctypescrypto import libcrypto from ctypes import c_char_p, c_void_p, c_int, create_string_buffer +from ctypescrypto.exception import LibCryptoError -__all__ = ['Oid','create','cleanup'] +__all__ = ['Oid', 'create', 'cleanup'] class Oid(object): """ - Represents an OID. It can be consturucted by textual - representation like Oid("commonName") or Oid("CN"), - dotted-decimal Oid("1.2.3.4") or using OpenSSL numeric - identifer (NID), which is typically returned or required by - OpenSSL API functions. If object is consturcted from textual - representation which is not present in the database, it fails - with ValueError + Represents an OID (ASN.1 Object identifier). - attribute nid - contains object nid. + It can be consturucted by textual + representation like Oid("commonName") or Oid("CN"), + dotted-decimal Oid("1.2.3.4") or using OpenSSL numeric + identifer (NID), which is typically returned or required by + OpenSSL API functions. If object is consturcted from textual + representation which is not present in the database, it fails + with ValueError + attribute nid - contains object nid. """ - def __init__(self,value): - " Object constuctor. Accepts string or integer" - if isinstance(value,unicode): - value=value.encode('ascii') - if isinstance(value,str): - self.nid=libcrypto.OBJ_txt2nid(value) - if self.nid==0: - raise ValueError("Cannot find object %s in the database"%(value)) - elif isinstance(value,(int,long)): - cn=libcrypto.OBJ_nid2sn(value) - if cn is None: - raise ValueError("No such nid %d in the database"%(value)) - self.nid=value + def __init__(self, value): + """ + Object constuctor. Accepts string, integer, or another Oid + object. + + Integer should be OpenSSL numeric identifier (nid) as returned + by some libcrypto function or extracted from some libcrypto + structure + """ + if isinstance(value, unicode): + value = value.encode('ascii') + if isinstance(value, str): + self.nid = libcrypto.OBJ_txt2nid(value) + if self.nid == 0: + raise ValueError("Cannot find object %s in the database" % + value) + elif isinstance(value, (int, long)): + short = libcrypto.OBJ_nid2sn(value) + if short is None: + raise ValueError("No such nid %d in the database" % value) + self.nid = value + elif isinstance(value, Oid): + self.nid = value.nid else: raise TypeError("Cannot convert this type to object identifier") def __hash__(self): - " Returns NID " + " Hash of object is equal to nid because Oids with same nid are same" return self.nid - def __cmp__(self,other): + def __cmp__(self, other): " Compares NIDs of two objects " - return self.nid-other.nid + return self.nid - other.nid def __str__(self): - " Default string representation of Oid is dotted-decimal" + " Default string representation of Oid is dotted-decimal " return self.dotted() def __repr__(self): - return "Oid('%s')"%(self.dotted()) + " Returns constructor call of Oid with dotted representation " + return "Oid('%s')" % (self.dotted()) def shortname(self): " Returns short name if any " return libcrypto.OBJ_nid2sn(self.nid) @@ -61,57 +85,60 @@ class Oid(object): return libcrypto.OBJ_nid2ln(self.nid) def dotted(self): " Returns dotted-decimal reperesentation " - obj=libcrypto.OBJ_nid2obj(self.nid) - buf=create_string_buffer(256) - libcrypto.OBJ_obj2txt(buf,256,obj,1) + obj = libcrypto.OBJ_nid2obj(self.nid) + buf = create_string_buffer(256) + libcrypto.OBJ_obj2txt(buf, 256, obj, 1) return buf.value @staticmethod def fromobj(obj): """ Creates an OID object from the pointer to ASN1_OBJECT c structure. - Strictly for internal use + This method intended for internal use for submodules which deal + with libcrypto ASN1 parsing functions, such as x509 or CMS """ - nid=libcrypto.OBJ_obj2nid(obj) - if nid==0: - buf=create_string_buffer(80) - l=libcrypto.OBJ_obj2txt(buf,80,obj,1) - oid=create(buf[0:l],buf[0:l],buf[0:l]) + nid = libcrypto.OBJ_obj2nid(obj) + if nid == 0: + buf = create_string_buffer(80) + dotted_len = libcrypto.OBJ_obj2txt(buf, 80, obj, 1) + dotted = buf[:dotted_len] + oid = create(dotted, dotted, dotted) else: - oid=Oid(nid) + oid = Oid(nid) return oid -def create(dotted,shortname,longname): +def create(dotted, shortname, longname): """ - Creates new OID in the database + Creates new OID in the database + + @param dotted - dotted-decimal representation of new OID + @param shortname - short name for new OID + @param longname - long name for new OID + + @returns Oid object corresponding to new OID - @param dotted - dotted-decimal representation of new OID - @param shortname - short name for new OID - @param longname - long name for new OID + This function should be used with exreme care. Whenever + possible, it is better to add new OIDs via OpenSSL configuration + file - @returns Oid object corresponding to new OID - - This function should be used with exreme care. Whenever - possible, it is better to add new OIDs via OpenSSL configuration - file + Results of calling this function twice for same OIDor for + Oid alredy in database are undefined - Results of calling this function twice for same OIDor for - Oid alredy in database are undefined """ - nid=libcrypto.OBJ_create(dotted,shortname,longname) + nid = libcrypto.OBJ_create(dotted, shortname, longname) if nid == 0: raise LibCryptoError("Problem adding new OID to the database") return Oid(nid) def cleanup(): """ - Removes all the objects, dynamically added by current - application from database. + Removes all the objects, dynamically added by current + application from database. """ libcrypto.OBJ_cleanup() -libcrypto.OBJ_nid2sn.restype=c_char_p -libcrypto.OBJ_nid2ln.restype=c_char_p -libcrypto.OBJ_nid2obj.restype=c_void_p -libcrypto.OBJ_obj2txt.argtypes=(c_char_p,c_int,c_void_p,c_int) -libcrypto.OBJ_txt2nid.argtupes=(c_char_p,) -libcrypto.OBJ_create.argtypes=(c_char_p,c_char_p,c_char_p) +libcrypto.OBJ_nid2sn.restype = c_char_p +libcrypto.OBJ_nid2ln.restype = c_char_p +libcrypto.OBJ_nid2obj.restype = c_void_p +libcrypto.OBJ_obj2txt.argtypes = (c_char_p, c_int, c_void_p, c_int) +libcrypto.OBJ_txt2nid.argtupes = (c_char_p, ) +libcrypto.OBJ_create.argtypes = (c_char_p, c_char_p, c_char_p) diff --git a/ctypescrypto/pbkdf2.py b/ctypescrypto/pbkdf2.py index 85a99a5..bde567b 100644 --- a/ctypescrypto/pbkdf2.py +++ b/ctypescrypto/pbkdf2.py @@ -3,33 +3,34 @@ PKCS5 PBKDF2 function. """ -from ctypes import c_char_p,c_int, c_void_p, create_string_buffer +from ctypes import c_char_p, c_int, c_void_p, create_string_buffer from ctypescrypto import libcrypto from ctypescrypto.digest import DigestType +from ctypescrypto.exception import LibCryptoError __all__ = ['pbkdf2'] -def pbkdf2(password,salt,outlen,digesttype="sha1",iterations=2000): +def pbkdf2(password, salt, outlen, digesttype="sha1", iterations=2000): """ - Interface to PKCS5_PBKDF2_HMAC function - Parameters: - - @param password - password to derive key from - @param salt - random salt to use for key derivation - @param outlen - number of bytes to derive - @param digesttype - name of digest to use to use (default sha1) - @param iterations - number of iterations to use + Interface to PKCS5_PBKDF2_HMAC function + Parameters: - @returns outlen bytes of key material derived from password and salt + @param password - password to derive key from + @param salt - random salt to use for key derivation + @param outlen - number of bytes to derive + @param digesttype - name of digest to use to use (default sha1) + @param iterations - number of iterations to use + + @returns outlen bytes of key material derived from password and salt """ - dt=DigestType(digesttype) - out=create_string_buffer(outlen) - res=libcrypto.PKCS5_PBKDF2_HMAC(password,len(password),salt,len(salt), - iterations,dt.digest,outlen,out) - if res<=0: + dgst = DigestType(digesttype) + out = create_string_buffer(outlen) + res = libcrypto.PKCS5_PBKDF2_HMAC(password, len(password), salt, len(salt), + iterations, dgst.digest, outlen, out) + if res <= 0: raise LibCryptoError("error computing PBKDF2") return out.raw -libcrypto.PKCS5_PBKDF2_HMAC.argtypes=(c_char_p,c_int,c_char_p,c_int,c_int, - c_void_p,c_int,c_char_p) -libcrypto.PKCS5_PBKDF2_HMAC.restupe=c_int +libcrypto.PKCS5_PBKDF2_HMAC.argtypes = (c_char_p, c_int, c_char_p, c_int, c_int, + c_void_p, c_int, c_char_p) +libcrypto.PKCS5_PBKDF2_HMAC.restupe = c_int diff --git a/ctypescrypto/pkey.py b/ctypescrypto/pkey.py index 1e70784..59da723 100644 --- a/ctypescrypto/pkey.py +++ b/ctypescrypto/pkey.py @@ -5,297 +5,354 @@ PKey object of this module is wrapper around OpenSSL EVP_PKEY object. """ -from ctypes import c_char_p,c_void_p,byref,c_int,c_long, c_longlong, create_string_buffer,CFUNCTYPE,POINTER +from ctypes import c_char_p, c_void_p, c_int, c_long, POINTER +from ctypes import create_string_buffer, byref, memmove, CFUNCTYPE from ctypescrypto import libcrypto -from ctypescrypto.exception import LibCryptoError,clear_err_stack +from ctypescrypto.exception import LibCryptoError, clear_err_stack from ctypescrypto.bio import Membio -import sys -__all__ = ['PKeyError','password_callback','PKey'] +__all__ = ['PKeyError', 'password_callback', 'PKey', 'PW_CALLBACK_FUNC'] class PKeyError(LibCryptoError): + """ Exception thrown if libcrypto finctions return an error """ pass -CALLBACK_FUNC=CFUNCTYPE(c_int,c_char_p,c_int,c_int,c_char_p) -def password_callback(buf,length,rwflag,u): +PW_CALLBACK_FUNC = CFUNCTYPE(c_int, c_char_p, c_int, c_int, c_char_p) +""" Function type for pem password callback """ + +def password_callback(buf, length, rwflag, userdata): """ - Example password callback for private key. Assumes that - password is store in the userdata parameter, so allows to pass password + Example password callback for private key. Assumes that + password is stored in the userdata parameter, so allows to pass password from constructor arguments to the libcrypto keyloading functions """ - cnt=len(u) - if length0 - def derive(self,peerkey,**kwargs): + return ret > 0 + + def derive(self, peerkey, **kwargs): """ - Derives shared key (DH,ECDH,VKO 34.10). Requires - private key available + Derives shared key (DH,ECDH,VKO 34.10). Requires + private key available - @param peerkey - other key (may be public only) + @param peerkey - other key (may be public only) - Keyword parameters are algorithm-specific + Keyword parameters are algorithm-specific """ - ctx=libcrypto.EVP_PKEY_CTX_new(self.key,None) + if not self.cansign: + raise ValueError("No private key available") + ctx = libcrypto.EVP_PKEY_CTX_new(self.key, None) if ctx is None: raise PKeyError("Initailizing derive context") - if libcrypto.EVP_PKEY_derive_init(ctx)<1: + if libcrypto.EVP_PKEY_derive_init(ctx) < 1: raise PKeyError("derive_init") - - self._configure_context(ctx,kwargs,["ukm"]) - if libcrypto.EVP_PKEY_derive_set_peer(ctx,peerkey.key)<=0: + # This is workaround around missing functionality in GOST engine + # it provides only numeric control command to set UKM, not + # string one. + self._configure_context(ctx, kwargs, ["ukm"]) + if libcrypto.EVP_PKEY_derive_set_peer(ctx, peerkey.key) <= 0: raise PKeyError("Cannot set peer key") if "ukm" in kwargs: - if libcrypto.EVP_PKEY_CTX_ctrl(ctx,-1,1<<10,8,8,kwargs["ukm"])<=0: + # We just hardcode numeric command to set UKM here + if libcrypto.EVP_PKEY_CTX_ctrl(ctx, -1, 1 << 10, 8, 8, + kwargs["ukm"]) <= 0: raise PKeyError("Cannot set UKM") - keylen=c_long(0) - if libcrypto.EVP_PKEY_derive(ctx,None,byref(keylen))<=0: + keylen = c_long(0) + if libcrypto.EVP_PKEY_derive(ctx, None, byref(keylen)) <= 0: raise PKeyError("computing shared key length") - buf=create_string_buffer(keylen.value) - if libcrypto.EVP_PKEY_derive(ctx,buf,byref(keylen))<=0: + buf = create_string_buffer(keylen.value) + if libcrypto.EVP_PKEY_derive(ctx, buf, byref(keylen)) <= 0: raise PKeyError("computing actual shared key") libcrypto.EVP_PKEY_CTX_free(ctx) - return buf.raw[:keylen.value] + return buf.raw[:int(keylen.value)] + @staticmethod - def generate(algorithm,**kwargs): + def generate(algorithm, **kwargs): """ - Generates new private-public key pair for given algorithm - (string like 'rsa','ec','gost2001') and algorithm-specific - parameters. + Generates new private-public key pair for given algorithm + (string like 'rsa','ec','gost2001') and algorithm-specific + parameters. - Algorithm specific paramteers for RSA: + Algorithm specific paramteers for RSA: - rsa_keygen_bits=number - size of key to be generated - rsa_keygen_pubexp - RSA public expontent(default 65537) + rsa_keygen_bits=number - size of key to be generated + rsa_keygen_pubexp - RSA public expontent(default 65537) - Algorithm specific parameters for DSA,DH and EC + Algorithm specific parameters for DSA,DH and EC - paramsfrom=PKey object + paramsfrom=PKey object - copy parameters of newly generated key from existing key + copy parameters of newly generated key from existing key - Algorithm specific parameters for GOST2001 + Algorithm specific parameters for GOST2001 - paramset= paramset name where name is one of - 'A','B','C','XA','XB','test' + paramset= paramset name where name is one of + 'A','B','C','XA','XB','test' - paramsfrom does work too + paramsfrom does work too """ - tmpeng=c_void_p(None) - ameth=libcrypto.EVP_PKEY_asn1_find_str(byref(tmpeng),algorithm,-1) + tmpeng = c_void_p(None) + ameth = libcrypto.EVP_PKEY_asn1_find_str(byref(tmpeng), algorithm, -1) if ameth is None: - raise PKeyError("Algorithm %s not foind\n"%(algname)) + raise PKeyError("Algorithm %s not foind\n"%(algorithm)) clear_err_stack() - pkey_id=c_int(0) - libcrypto.EVP_PKEY_asn1_get0_info(byref(pkey_id),None,None,None,None,ameth) + pkey_id = c_int(0) + libcrypto.EVP_PKEY_asn1_get0_info(byref(pkey_id), None, None, None, + None, ameth) #libcrypto.ENGINE_finish(tmpeng) if "paramsfrom" in kwargs: - ctx=libcrypto.EVP_PKEY_CTX_new(kwargs["paramsfrom"].key,None) + ctx = libcrypto.EVP_PKEY_CTX_new(kwargs["paramsfrom"].key, None) else: - ctx=libcrypto.EVP_PKEY_CTX_new_id(pkey_id,None) + ctx = libcrypto.EVP_PKEY_CTX_new_id(pkey_id, None) # FIXME support EC curve as keyword param by invoking paramgen # operation if ctx is None: - raise PKeyError("Creating context for key type %d"%(pkey_id.value)) - if libcrypto.EVP_PKEY_keygen_init(ctx) <=0 : + raise PKeyError("Creating context for key type %d"%(pkey_id.value)) + if libcrypto.EVP_PKEY_keygen_init(ctx) <= 0: raise PKeyError("keygen_init") - PKey._configure_context(ctx,kwargs,["paramsfrom"]) - key=c_void_p(None) - if libcrypto.EVP_PKEY_keygen(ctx,byref(key))<=0: + PKey._configure_context(ctx, kwargs, ["paramsfrom"]) + key = c_void_p(None) + if libcrypto.EVP_PKEY_keygen(ctx, byref(key)) <= 0: raise PKeyError("Error generating key") libcrypto.EVP_PKEY_CTX_free(ctx) - return PKey(ptr=key,cansign=True) - def exportpub(self,format="PEM"): + return PKey(ptr=key, cansign=True) + + def exportpub(self, format="PEM"): """ - Returns public key as PEM or DER structure. + Returns public key as PEM or DER structure. """ - b=Membio() + bio = Membio() if format == "PEM": - r=libcrypto.PEM_write_bio_PUBKEY(b.bio,self.key) + retcode = libcrypto.PEM_write_bio_PUBKEY(bio.bio, self.key) else: - r=libcrypto.i2d_PUBKEY_bio(b.bio,self.key) - if r==0: + retcode = libcrypto.i2d_PUBKEY_bio(bio.bio, self.key) + if retcode == 0: raise PKeyError("error serializing public key") - return str(b) - def exportpriv(self,format="PEM",password=None,cipher=None): + return str(bio) + + def exportpriv(self, format="PEM", password=None, cipher=None, + callback=_cb): """ - Returns private key as PEM or DER Structure. - If password and cipher are specified, encrypts key - on given password, using given algorithm. Cipher must be - an ctypescrypto.cipher.CipherType object + Returns private key as PEM or DER Structure. + If password and cipher are specified, encrypts key + on given password, using given algorithm. Cipher must be + an ctypescrypto.cipher.CipherType object """ - b=Membio() + bio = Membio() if cipher is None: - evp_cipher=None + evp_cipher = None else: - if password is None: - raise NotImplementedError("Interactive password entry is not supported") - evp_cipher=cipher.cipher + evp_cipher = cipher.cipher if format == "PEM": - r=libcrypto.PEM_write_bio_PrivateKey(b.bio,self.key,evp_cipher,None,0,_cb, - password) + ret = libcrypto.PEM_write_bio_PrivateKey(bio.bio, self.key, + evp_cipher, None, 0, + callback, + c_char_p(password)) else: - if cipher is not None: - raise NotImplementedError("Der-formatted encrypted keys are not supported") - r=libcrypto.i2d_PrivateKey_bio(b.bio,self.key) - if r==0: + ret = libcrypto.i2d_PKCS8PrivateKey_bio(bio.bio, self.key, + evp_cipher, None, 0, + callback, + c_char_p(password)) + if ret == 0: raise PKeyError("error serializing private key") - return str(b) + return str(bio) + @staticmethod - def _configure_context(ctx,opts,skip=[]): + def _configure_context(ctx, opts, skip=()): """ - Configures context of public key operations - @param ctx - context to configure - @param opts - dictionary of options (from kwargs of calling - function) - @param skip - list of options which shouldn't be passed to - context + Configures context of public key operations + @param ctx - context to configure + @param opts - dictionary of options (from kwargs of calling + function) + @param skip - list of options which shouldn't be passed to + context """ for oper in opts: if oper in skip: continue - rv=libcrypto.EVP_PKEY_CTX_ctrl_str(ctx,oper,str(opts[oper])) - if rv==-2: - raise PKeyError("Parameter %s is not supported by key"%(oper,)) - if rv<1: - raise PKeyError("Error setting parameter %s"%(oper,)) + ret = libcrypto.EVP_PKEY_CTX_ctrl_str(ctx, oper, str(opts[oper])) + if ret == -2: + raise PKeyError("Parameter %s is not supported by key" % oper) + if ret < 1: + raise PKeyError("Error setting parameter %s" % oper) # Declare function prototypes -libcrypto.EVP_PKEY_cmp.argtypes=(c_void_p,c_void_p) -libcrypto.PEM_read_bio_PrivateKey.restype=c_void_p -libcrypto.PEM_read_bio_PrivateKey.argtypes=(c_void_p,POINTER(c_void_p),CALLBACK_FUNC,c_char_p) -libcrypto.PEM_read_bio_PUBKEY.restype=c_void_p -libcrypto.PEM_read_bio_PUBKEY.argtypes=(c_void_p,POINTER(c_void_p),CALLBACK_FUNC,c_char_p) -libcrypto.d2i_PUBKEY_bio.restype=c_void_p -libcrypto.d2i_PUBKEY_bio.argtypes=(c_void_p,c_void_p) -libcrypto.d2i_PrivateKey_bio.restype=c_void_p -libcrypto.d2i_PrivateKey_bio.argtypes=(c_void_p,c_void_p) -libcrypto.EVP_PKEY_print_public.argtypes=(c_void_p,c_void_p,c_int,c_void_p) -libcrypto.EVP_PKEY_asn1_find_str.restype=c_void_p -libcrypto.EVP_PKEY_asn1_find_str.argtypes=(c_void_p,c_char_p,c_int) -libcrypto.EVP_PKEY_asn1_get0_info.restype=c_int -libcrypto.EVP_PKEY_asn1_get0_info.argtypes=(POINTER(c_int),POINTER(c_int),POINTER(c_int),POINTER(c_char_p), POINTER(c_char_p),c_void_p) -libcrypto.EVP_PKEY_cmp.restype=c_int -libcrypto.EVP_PKEY_cmp.argtypes=(c_void_p,c_void_p) -libcrypto.EVP_PKEY_CTX_ctrl_str.restype=c_int -libcrypto.EVP_PKEY_CTX_ctrl_str.argtypes=(c_void_p,c_void_p,c_void_p) -libcrypto.EVP_PKEY_CTX_ctrl.restype=c_int -libcrypto.EVP_PKEY_CTX_ctrl.argtypes=(c_void_p,c_int,c_int,c_int,c_int,c_void_p) -libcrypto.EVP_PKEY_CTX_free.argtypes=(c_void_p,) -libcrypto.EVP_PKEY_CTX_new.restype=c_void_p -libcrypto.EVP_PKEY_CTX_new.argtypes=(c_void_p,c_void_p) -libcrypto.EVP_PKEY_CTX_new_id.restype=c_void_p -libcrypto.EVP_PKEY_CTX_new_id.argtypes=(c_int,c_void_p) -libcrypto.EVP_PKEY_derive.restype=c_int -libcrypto.EVP_PKEY_derive.argtypes=(c_void_p,c_char_p,POINTER(c_long)) -libcrypto.EVP_PKEY_derive_init.restype=c_int -libcrypto.EVP_PKEY_derive_init.argtypes=(c_void_p,) -libcrypto.EVP_PKEY_derive_set_peer.restype=c_int -libcrypto.EVP_PKEY_derive_set_peer.argtypes=(c_void_p,c_void_p) -libcrypto.EVP_PKEY_free.argtypes=(c_void_p,) -libcrypto.EVP_PKEY_keygen.restype=c_int -libcrypto.EVP_PKEY_keygen.argtypes=(c_void_p,c_void_p) -libcrypto.EVP_PKEY_keygen_init.restype=c_int -libcrypto.EVP_PKEY_keygen_init.argtypes=(c_void_p,) -libcrypto.EVP_PKEY_sign.restype=c_int -libcrypto.EVP_PKEY_sign.argtypes=(c_void_p,c_char_p,POINTER(c_long),c_char_p,c_long) -libcrypto.EVP_PKEY_sign_init.restype=c_int -libcrypto.EVP_PKEY_sign_init.argtypes=(c_void_p,) -libcrypto.EVP_PKEY_verify.restype=c_int -libcrypto.EVP_PKEY_verify.argtypes=(c_void_p,c_char_p,c_long,c_char_p,c_long) -libcrypto.EVP_PKEY_verify_init.restype=c_int -libcrypto.EVP_PKEY_verify_init.argtypes=(c_void_p,) -libcrypto.PEM_write_bio_PrivateKey.argtypes=(c_void_p,c_void_p,c_void_p,c_char_p,c_int,CALLBACK_FUNC,c_char_p) -libcrypto.PEM_write_bio_PUBKEY.argtypes=(c_void_p,c_void_p) -libcrypto.i2d_PUBKEY_bio.argtypes=(c_void_p,c_void_p) -libcrypto.i2d_PrivateKey_bio.argtypes=(c_void_p,c_void_p) -libcrypto.ENGINE_finish.argtypes=(c_void_p,) +libcrypto.EVP_PKEY_cmp.argtypes = (c_void_p, c_void_p) +libcrypto.PEM_read_bio_PrivateKey.restype = c_void_p +libcrypto.PEM_read_bio_PrivateKey.argtypes = (c_void_p, POINTER(c_void_p), + PW_CALLBACK_FUNC, c_char_p) +libcrypto.PEM_read_bio_PUBKEY.restype = c_void_p +libcrypto.PEM_read_bio_PUBKEY.argtypes = (c_void_p, POINTER(c_void_p), + PW_CALLBACK_FUNC, c_char_p) +libcrypto.d2i_PUBKEY_bio.restype = c_void_p +libcrypto.d2i_PUBKEY_bio.argtypes = (c_void_p, c_void_p) +libcrypto.d2i_PrivateKey_bio.restype = c_void_p +libcrypto.d2i_PrivateKey_bio.argtypes = (c_void_p, c_void_p) +libcrypto.EVP_PKEY_print_public.argtypes = (c_void_p, c_void_p, c_int, c_void_p) +libcrypto.EVP_PKEY_asn1_find_str.restype = c_void_p +libcrypto.EVP_PKEY_asn1_find_str.argtypes = (c_void_p, c_char_p, c_int) +libcrypto.EVP_PKEY_asn1_get0_info.restype = c_int +libcrypto.EVP_PKEY_asn1_get0_info.argtypes = (POINTER(c_int), POINTER(c_int), + POINTER(c_int), POINTER(c_char_p), + POINTER(c_char_p), c_void_p) +libcrypto.EVP_PKEY_cmp.restype = c_int +libcrypto.EVP_PKEY_cmp.argtypes = (c_void_p, c_void_p) +libcrypto.EVP_PKEY_CTX_ctrl_str.restype = c_int +libcrypto.EVP_PKEY_CTX_ctrl_str.argtypes = (c_void_p, c_void_p, c_void_p) +libcrypto.EVP_PKEY_CTX_ctrl.restype = c_int +libcrypto.EVP_PKEY_CTX_ctrl.argtypes = (c_void_p, c_int, c_int, c_int, c_int, + c_void_p) +libcrypto.EVP_PKEY_CTX_free.argtypes = (c_void_p, ) +libcrypto.EVP_PKEY_CTX_new.restype = c_void_p +libcrypto.EVP_PKEY_CTX_new.argtypes = (c_void_p, c_void_p) +libcrypto.EVP_PKEY_CTX_new_id.restype = c_void_p +libcrypto.EVP_PKEY_CTX_new_id.argtypes = (c_int, c_void_p) +libcrypto.EVP_PKEY_derive.restype = c_int +libcrypto.EVP_PKEY_derive.argtypes = (c_void_p, c_char_p, POINTER(c_long)) +libcrypto.EVP_PKEY_derive_init.restype = c_int +libcrypto.EVP_PKEY_derive_init.argtypes = (c_void_p, ) +libcrypto.EVP_PKEY_derive_set_peer.restype = c_int +libcrypto.EVP_PKEY_derive_set_peer.argtypes = (c_void_p, c_void_p) +libcrypto.EVP_PKEY_free.argtypes = (c_void_p,) +libcrypto.EVP_PKEY_keygen.restype = c_int +libcrypto.EVP_PKEY_keygen.argtypes = (c_void_p, c_void_p) +libcrypto.EVP_PKEY_keygen_init.restype = c_int +libcrypto.EVP_PKEY_keygen_init.argtypes = (c_void_p, ) +libcrypto.EVP_PKEY_sign.restype = c_int +libcrypto.EVP_PKEY_sign.argtypes = (c_void_p, c_char_p, POINTER(c_long), + c_char_p, c_long) +libcrypto.EVP_PKEY_sign_init.restype = c_int +libcrypto.EVP_PKEY_sign_init.argtypes = (c_void_p, ) +libcrypto.EVP_PKEY_verify.restype = c_int +libcrypto.EVP_PKEY_verify.argtypes = (c_void_p, c_char_p, c_long, c_char_p, + c_long) +libcrypto.EVP_PKEY_verify_init.restype = c_int +libcrypto.EVP_PKEY_verify_init.argtypes = (c_void_p, ) +libcrypto.PEM_write_bio_PrivateKey.argtypes = (c_void_p, c_void_p, c_void_p, + c_char_p, c_int, + PW_CALLBACK_FUNC, c_char_p) +libcrypto.PEM_write_bio_PUBKEY.argtypes = (c_void_p, c_void_p) +libcrypto.i2d_PUBKEY_bio.argtypes = (c_void_p, c_void_p) +libcrypto.i2d_PKCS8PrivateKey_bio.argtypes = (c_void_p, c_void_p, c_void_p, + c_char_p, c_int, + PW_CALLBACK_FUNC, c_char_p) +libcrypto.ENGINE_finish.argtypes = (c_void_p, ) diff --git a/ctypescrypto/rand.py b/ctypescrypto/rand.py index 92e67e9..d9e966d 100644 --- a/ctypescrypto/rand.py +++ b/ctypescrypto/rand.py @@ -6,40 +6,41 @@ from ctypes import create_string_buffer, c_char_p, c_int, c_double from ctypescrypto import libcrypto from ctypescrypto.exception import LibCryptoError -__all__ = ['RandError','bytes','pseudo_bytes','seed','status'] +__all__ = ['RandError', 'bytes', 'pseudo_bytes', 'seed', 'status'] class RandError(LibCryptoError): + """ Exception raised when openssl function return error """ pass -def bytes( num, check_result=False): +def bytes(num, check_result=False): """ - Returns num bytes of cryptographically strong pseudo-random - bytes. If checkc_result is True, raises error if PRNG is not - seeded enough + Returns num bytes of cryptographically strong pseudo-random + bytes. If checkc_result is True, raises error if PRNG is not + seeded enough """ - if num <= 0 : + if num <= 0: raise ValueError("'num' should be > 0") - buffer = create_string_buffer(num) - result = libcrypto.RAND_bytes(buffer, num) + buf = create_string_buffer(num) + result = libcrypto.RAND_bytes(buf, num) if check_result and result == 0: raise RandError("Random Number Generator not seeded sufficiently") - return buffer.raw[:num] + return buf.raw[:num] def pseudo_bytes(num): """ - Returns num bytes of pseudo random data. Pseudo- random byte - sequences generated by pseudo_bytes() will be unique if - they are of sufficient length, but are not necessarily - unpredictable. They can be used for non-cryptographic purposes - and for certain purposes in cryptographic protocols, but usually - not for key generation etc. + Returns num bytes of pseudo random data. Pseudo- random byte + sequences generated by pseudo_bytes() will be unique if + they are of sufficient length, but are not necessarily + unpredictable. They can be used for non-cryptographic purposes + and for certain purposes in cryptographic protocols, but usually + not for key generation etc. """ - if num <= 0 : + if num <= 0: raise ValueError("'num' should be > 0") - buffer = create_string_buffer(num) - libcrypto.RAND_pseudo_bytes(buffer, num) - return buffer.raw[:num] + buf = create_string_buffer(num) + libcrypto.RAND_pseudo_bytes(buf, num) + return buf.raw[:num] def seed(data, entropy=None): """ @@ -47,24 +48,24 @@ def seed(data, entropy=None): If entropy is not None, it should be floating point(double) value estimating amount of entropy in the data (in bytes). """ - if not isinstance(data,str): + if not isinstance(data, str): raise TypeError("A string is expected") ptr = c_char_p(data) size = len(data) if entropy is None: libcrypto.RAND_seed(ptr, size) - else : + else: libcrypto.RAND_add(ptr, size, entropy) def status(): """ - Returns 1 if random generator is sufficiently seeded and 0 - otherwise + Returns 1 if random generator is sufficiently seeded and 0 + otherwise """ return libcrypto.RAND_status() - -libcrypto.RAND_add.argtypes=(c_char_p,c_int,c_double) -libcrypto.RAND_seed.argtypes=(c_char_p,c_int) -libcrypto.RAND_pseudo_bytes.argtypes=(c_char_p,c_int) -libcrypto.RAND_bytes.argtypes=(c_char_p,c_int) + +libcrypto.RAND_add.argtypes = (c_char_p, c_int, c_double) +libcrypto.RAND_seed.argtypes = (c_char_p, c_int) +libcrypto.RAND_pseudo_bytes.argtypes = (c_char_p, c_int) +libcrypto.RAND_bytes.argtypes = (c_char_p, c_int) diff --git a/ctypescrypto/x509.py b/ctypescrypto/x509.py index 159956d..40263cf 100644 --- a/ctypescrypto/x509.py +++ b/ctypescrypto/x509.py @@ -1,5 +1,5 @@ """ -Implements interface to openssl X509 and X509Store structures, +Implements interface to openssl X509 and X509Store structures, I.e allows to load, analyze and verify certificates. X509Store objects are also used to verify other signed documets, @@ -8,23 +8,23 @@ such as CMS, OCSP and timestamps. -from ctypes import c_void_p,create_string_buffer,c_long,c_int,POINTER,c_char_p,Structure,cast +from ctypes import c_void_p, c_long, c_int, POINTER, c_char_p, Structure, cast from ctypescrypto.bio import Membio from ctypescrypto.pkey import PKey from ctypescrypto.oid import Oid from ctypescrypto.exception import LibCryptoError from ctypescrypto import libcrypto from datetime import datetime + try: from pytz import utc except ImportError: - from datetime import timedelta,tzinfo - ZERO=timedelta(0) + from datetime import timedelta, tzinfo + ZERO = timedelta(0) class UTC(tzinfo): - """tzinfo object for UTC. + """tzinfo object for UTC. If no pytz is available, we would use it. """ - def utcoffset(self, dt): return ZERO @@ -34,33 +34,33 @@ except ImportError: def dst(self, dt): return ZERO - utc=UTC() + utc = UTC() -__all__ = ['X509','X509Error','X509Name','X509Store','StackOfX509'] +__all__ = ['X509', 'X509Error', 'X509Name', 'X509Store', 'StackOfX509'] class _validity(Structure): - """ ctypes representation of X509_VAL structure + """ ctypes representation of X509_VAL structure needed to access certificate validity period, because openssl doesn't provide fuctions for it - only macros """ - _fields_ = [('notBefore',c_void_p),('notAfter',c_void_p)] + _fields_ = [('notBefore', c_void_p), ('notAfter', c_void_p)] class _cinf(Structure): - """ ctypes representtion of X509_CINF structure + """ ctypes representtion of X509_CINF structure neede to access certificate data, which are accessable only via macros """ - _fields_ = [('version',c_void_p), - ('serialNumber',c_void_p), - ('sign_alg',c_void_p), - ('issuer',c_void_p), - ('validity',POINTER(_validity)), - ('subject',c_void_p), - ('pubkey',c_void_p), - ('issuerUID',c_void_p), - ('subjectUID',c_void_p), - ('extensions',c_void_p), - ] + _fields_ = [('version', c_void_p), + ('serialNumber', c_void_p), + ('sign_alg', c_void_p), + ('issuer', c_void_p), + ('validity', POINTER(_validity)), + ('subject', c_void_p), + ('pubkey', c_void_p), + ('issuerUID', c_void_p), + ('subjectUID', c_void_p), + ('extensions', c_void_p), + ] class _x509(Structure): """ @@ -68,11 +68,11 @@ class _x509(Structure): to access certificate data which are accesable only via macros, not functions """ - _fields_ = [('cert_info',POINTER(_cinf)), - ('sig_alg',c_void_p), - ('signature',c_void_p), + _fields_ = [('cert_info', POINTER(_cinf)), + ('sig_alg', c_void_p), + ('signature', c_void_p), # There are a lot of parsed extension fields there - ] + ] _px509 = POINTER(_x509) class X509Error(LibCryptoError): @@ -85,7 +85,7 @@ class X509Error(LibCryptoError): class X509Name(object): """ - Class which represents X.509 distinguished name - typically + Class which represents X.509 distinguished name - typically a certificate subject name or an issuer name. Now used only to represent information, extracted from the @@ -93,22 +93,25 @@ class X509Name(object): certificate signing request """ # XN_FLAG_SEP_COMMA_PLUS & ASN1_STRFLG_UTF8_CONVERT - PRINT_FLAG=0x10010 - ESC_MSB=4 - def __init__(self,ptr=None,copy=False): + PRINT_FLAG = 0x10010 + ESC_MSB = 4 + def __init__(self, ptr=None, copy=False): """ Creates a X509Name object - @param ptr - pointer to X509_NAME C structure (as returned by some OpenSSL functions - @param copy - indicates that this structure have to be freed upon object destruction + @param ptr - pointer to X509_NAME C structure (as returned by some + OpenSSL functions + @param copy - indicates that this structure have to be freed upon + object destruction """ if ptr is not None: - self.ptr=ptr - self.need_free=copy - self.writable=False + self.ptr = ptr + self.need_free = copy + self.writable = False else: - self.ptr=libcrypto.X509_NAME_new() - self.need_free=True - self.writable=True + self.ptr = libcrypto.X509_NAME_new() + self.need_free = True + self.writable = True + def __del__(self): """ Frees if neccessary @@ -117,62 +120,65 @@ class X509Name(object): libcrypto.X509_NAME_free(self.ptr) def __str__(self): """ - Produces an ascii representation of the name, escaping all symbols > 0x80 - Probably it is not what you want, unless your native language is English + Produces an ascii representation of the name, escaping all + symbols > 0x80. Probably it is not what you want, unless + your native language is English """ - b=Membio() - libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG | self.ESC_MSB) - return str(b) + bio = Membio() + libcrypto.X509_NAME_print_ex(bio.bio, self.ptr, 0, + self.PRINT_FLAG | self.ESC_MSB) + return str(bio) + def __unicode__(self): """ - Produces unicode representation of the name. + Produces unicode representation of the name. """ - b=Membio() - libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG) - return unicode(b) + bio = Membio() + libcrypto.X509_NAME_print_ex(bio.bio, self.ptr, 0, self.PRINT_FLAG) + return unicode(bio) def __len__(self): """ return number of components in the name """ return libcrypto.X509_NAME_entry_count(self.ptr) - def __cmp__(self,other): + def __cmp__(self, other): """ Compares X509 names """ - return libcrypto.X509_NAME_cmp(self.ptr,other.ptr) - def __eq__(self,other): - return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)==0 + return libcrypto.X509_NAME_cmp(self.ptr, other.ptr) + def __eq__(self, other): + return libcrypto.X509_NAME_cmp(self.ptr, other.ptr) == 0 - def __getitem__(self,key): - if isinstance(key,Oid): + def __getitem__(self, key): + if isinstance(key, Oid): # Return first matching field - idx=libcrypto.X509_NAME_get_index_by_NID(self.ptr,key.nid,-1) - if idx<0: - raise KeyError("Key not found "+str(Oid)) - entry=libcrypto.X509_NAME_get_entry(self.ptr,idx) - s=libcrypto.X509_NAME_ENTRY_get_data(entry) - b=Membio() - libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG) - return unicode(b) - elif isinstance(key,(int,long)): + idx = libcrypto.X509_NAME_get_index_by_NID(self.ptr, key.nid, -1) + if idx < 0: + raise KeyError("Key not found " + str(Oid)) + entry = libcrypto.X509_NAME_get_entry(self.ptr, idx) + value = libcrypto.X509_NAME_ENTRY_get_data(entry) + bio = Membio() + libcrypto.ASN1_STRING_print_ex(bio.bio, value, self.PRINT_FLAG) + return unicode(bio) + elif isinstance(key, (int, long)): # Return OID, string tuple - entry=libcrypto.X509_NAME_get_entry(self.ptr,key) + entry = libcrypto.X509_NAME_get_entry(self.ptr, key) if entry is None: raise IndexError("name entry index out of range") - oid=Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry)) - s=libcrypto.X509_NAME_ENTRY_get_data(entry) - b=Membio() - libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG) - return (oid,unicode(b)) + oid = Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry)) + value = libcrypto.X509_NAME_ENTRY_get_data(entry) + bio = Membio() + libcrypto.ASN1_STRING_print_ex(bio.bio, value, self.PRINT_FLAG) + return (oid, unicode(bio)) else: raise TypeError("X509 NAME can be indexed by Oids or integers only") - def __setitem__(self,key,val): + def __setitem__(self, key, val): if not self.writable: raise ValueError("Attempt to modify constant X509 object") else: raise NotImplementedError - def __delitem__(self,key): + def __delitem__(self, key): if not self.writable: raise ValueError("Attempt to modify constant X509 object") else: @@ -182,111 +188,140 @@ class X509Name(object): class _x509_ext(Structure): """ Represens C structure X509_EXTENSION """ - _fields_=[("object",c_void_p), - ("critical",c_int), - ("value",c_void_p)] + _fields_ = [("object", c_void_p), + ("critical", c_int), + ("value", c_void_p) + ] class X509_EXT(object): """ Python object which represents a certificate extension """ - def __init__(self,ptr,copy=False): + def __init__(self, ptr, copy=False): """ Initializes from the pointer to X509_EXTENSION. If copy is True, creates a copy, otherwise just stores pointer. """ if copy: - self.ptr=libcrypto.X509_EXTENSION_dup(ptr) + self.ptr = libcrypto.X509_EXTENSION_dup(ptr) else: - self.ptr=cast(ptr,POINTER(_x509_ext)) + self.ptr = cast(ptr, POINTER(_x509_ext)) def __del__(self): libcrypto.X509_EXTENSION_free(self.ptr) def __str__(self): - b=Membio() - libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0) - return str(b) + bio = Membio() + libcrypto.X509V3_EXT_print(bio.bio, self.ptr, 0x20010, 0) + return str(bio) def __unicode__(self): - b=Membio() - libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0) - return unicode(b) + bio = Membio() + libcrypto.X509V3_EXT_print(bio.bio, self.ptr, 0x20010, 0) + return unicode(bio) @property def oid(self): + "Returns OID of the extension" return Oid.fromobj(self.ptr[0].object) @property - def critical(self): - return self.ptr[0].critical >0 -class _X509extlist(object): + def critical(self): + "Returns True if extensin have critical flag set" + return self.ptr[0].critical > 0 + +class _X509extlist(object): """ - Represents list of certificate extensions + Represents list of certificate extensions. Really it keeps + reference to certificate object """ - def __init__(self,cert): - self.cert=cert + def __init__(self, cert): + """ + Initialize from X509 object + """ + self.cert = cert + def __len__(self): + """ + Returns number of extensions + """ return libcrypto.X509_get_ext_count(self.cert.cert) - def __getitem__(self,item): - p=libcrypto.X509_get_ext(self.cert.cert,item) - if p is None: + + def __getitem__(self, item): + """ + Returns extension by index, creating a copy + """ + ext_ptr = libcrypto.X509_get_ext(self.cert.cert, item) + if ext_ptr is None: raise IndexError - return X509_EXT(p,True) - def find(self,oid): + return X509_EXT(ext_ptr, True) + def find(self, oid): """ Return list of extensions with given Oid """ - if not isinstance(oid,Oid): + if not isinstance(oid, Oid): raise TypeError("Need crytypescrypto.oid.Oid as argument") - found=[] - l=-1 - end=len(self) + found = [] + index = -1 + end = len(self) while True: - l=libcrypto.X509_get_ext_by_NID(self.cert.cert,oid.nid,l) - if l>=end or l<0: - break - found.append(self[l]) + index = libcrypto.X509_get_ext_by_NID(self.cert.cert, oid.nid, + index) + if index >= end or index < 0: + break + found.append(self[index]) return found - def find_critical(self,crit=True): + + def find_critical(self, crit=True): """ Return list of critical extensions (or list of non-cricital, if optional second argument is False """ if crit: - flag=1 + flag = 1 else: - flag=0 - found=[] - end=len(self) - l=-1 + flag = 0 + found = [] + end = len(self) + index = -1 while True: - l=libcrypto.X509_get_ext_by_critical(self.cert.cert,flag,l) - if l>=end or l<0: - break - found.append(self[l]) - return found + index = libcrypto.X509_get_ext_by_critical(self.cert.cert, flag, + index) + if index >= end or index < 0: + break + found.append(self[index]) + return found + +def _X509__asn1date_to_datetime(asn1date): + """ + Converts openssl ASN1_TIME object to python datetime.datetime + """ + bio = Membio() + libcrypto.ASN1_TIME_print(bio.bio, asn1date) + pydate = datetime.strptime(str(bio), "%b %d %H:%M:%S %Y %Z") + return pydate.replace(tzinfo=utc) class X509(object): """ - Represents X.509 certificate. + Represents X.509 certificate. """ - def __init__(self,data=None,ptr=None,format="PEM"): + def __init__(self, data=None, ptr=None, format="PEM"): """ Initializes certificate @param data - serialized certificate in PEM or DER format. - @param ptr - pointer to X509, returned by some openssl function. + @param ptr - pointer to X509, returned by some openssl function. mutually exclusive with data @param format - specifies data format. "PEM" or "DER", default PEM """ if ptr is not None: - if data is not None: + if data is not None: raise TypeError("Cannot use data and ptr simultaneously") self.cert = ptr elif data is None: raise TypeError("data argument is required") else: - b=Membio(data) + bio = Membio(data) if format == "PEM": - self.cert=libcrypto.PEM_read_bio_X509(b.bio,None,None,None) + self.cert = libcrypto.PEM_read_bio_X509(bio.bio, None, None, + None) else: - self.cert=libcrypto.d2i_X509_bio(b.bio,None) + self.cert = libcrypto.d2i_X509_bio(bio.bio, None) if self.cert is None: raise X509Error("error reading certificate") - self.extensions=_X509extlist(self) + self.extensions = _X509extlist(self) def __del__(self): """ Frees certificate object @@ -294,62 +329,63 @@ class X509(object): libcrypto.X509_free(self.cert) def __str__(self): """ Returns der string of the certificate """ - b=Membio() - if libcrypto.i2d_X509_bio(b.bio,self.cert)==0: + bio = Membio() + if libcrypto.i2d_X509_bio(bio.bio, self.cert) == 0: raise X509Error("error serializing certificate") - return str(b) + return str(bio) def __repr__(self): """ Returns valid call to the constructor """ - return "X509(data="+repr(str(self))+",format='DER')" + return "X509(data=" + repr(str(self)) + ",format='DER')" @property def pubkey(self): """EVP PKEy object of certificate public key""" - return PKey(ptr=libcrypto.X509_get_pubkey(self.cert,False)) + return PKey(ptr=libcrypto.X509_get_pubkey(self.cert, False)) def pem(self): """ Returns PEM represntation of the certificate """ - b=Membio() - if libcrypto.PEM_write_bio_X509(b.bio,self.cert)==0: + bio = Membio() + if libcrypto.PEM_write_bio_X509(bio.bio, self.cert) == 0: raise X509Error("error serializing certificate") - return str(b) - def verify(self,store=None,chain=[],key=None): - """ - Verify self. Supports verification on both X509 store object + return str(bio) + def verify(self, store=None, chain=None, key=None): + """ + Verify self. Supports verification on both X509 store object or just public issuer key @param store X509Store object. @param chain - list of X509 objects to add into verification context.These objects are untrusted, but can be used to build certificate chain up to trusted object in the store @param key - PKey object with open key to validate signature - - parameters store and key are mutually exclusive. If neither + + parameters store and key are mutually exclusive. If neither is specified, attempts to verify self as self-signed certificate """ if store is not None and key is not None: raise X509Error("key and store cannot be specified simultaneously") if store is not None: - ctx=libcrypto.X509_STORE_CTX_new() + ctx = libcrypto.X509_STORE_CTX_new() if ctx is None: raise X509Error("Error allocating X509_STORE_CTX") - if chain is not None and len(chain)>0: - ch=StackOfX509(chain) + if chain is not None and len(chain) > 0: + chain_ptr = StackOfX509(chain).ptr else: - ch=None - if libcrypto.X509_STORE_CTX_init(ctx,store.store,self.cert,ch) < 0: + chain_ptr = None + if libcrypto.X509_STORE_CTX_init(ctx, store.store, self.cert, + chain_ptr) < 0: raise X509Error("Error allocating X509_STORE_CTX") - res= libcrypto.X509_verify_cert(ctx) + res = libcrypto.X509_verify_cert(ctx) libcrypto.X509_STORE_CTX_free(ctx) - return res>0 + return res > 0 else: if key is None: if self.issuer != self.subject: # Not a self-signed certificate return False key = self.pubkey - res = libcrypto.X509_verify(self.cert,key.key) + res = libcrypto.X509_verify(self.cert, key.key) if res < 0: raise X509Error("X509_verify failed") - return res>0 - + return res > 0 + @property def subject(self): """ X509Name for certificate subject name """ @@ -361,134 +397,139 @@ class X509(object): @property def serial(self): """ Serial number of certificate as integer """ - asnint=libcrypto.X509_get_serialNumber(self.cert) - b=Membio() - libcrypto.i2a_ASN1_INTEGER(b.bio,asnint) - return int(str(b),16) - @property + asnint = libcrypto.X509_get_serialNumber(self.cert) + bio = Membio() + libcrypto.i2a_ASN1_INTEGER(bio.bio, asnint) + return int(str(bio), 16) + @property def version(self): - """ certificate version as integer. Really certificate stores 0 for - version 1 and 2 for version 3, but we return 1 and 3 """ - asn1int=cast(self.cert,_px509)[0].cert_info[0].version - return libcrypto.ASN1_INTEGER_get(asn1int)+1 + """ + certificate version as integer. Really certificate stores 0 for + version 1 and 2 for version 3, but we return 1 and 3 + """ + asn1int = cast(self.cert, _px509)[0].cert_info[0].version + return libcrypto.ASN1_INTEGER_get(asn1int) + 1 @property def startDate(self): """ Certificate validity period start date """ - # Need deep poke into certificate structure (x)->cert_info->validity->notBefore - global utc - asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notBefore - b=Membio() - libcrypto.ASN1_TIME_print(b.bio,asn1date) - return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc) + # Need deep poke into certificate structure + # (x)->cert_info->validity->notBefore + asn1 = cast(self.cert, _px509)[0].cert_info[0].validity[0].notBefore + return __asn1date_to_datetime(asn1) @property def endDate(self): """ Certificate validity period end date """ - # Need deep poke into certificate structure (x)->cert_info->validity->notAfter - global utc - asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notAfter - b=Membio() - libcrypto.ASN1_TIME_print(b.bio,asn1date) - return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc) + # Need deep poke into certificate structure + # (x)->cert_info->validity->notAfter + asn1 = cast(self.cert, _px509)[0].cert_info[0].validity[0].notAfter + return __asn1date_to_datetime(asn1) def check_ca(self): """ Returns True if certificate is CA certificate """ - return libcrypto.X509_check_ca(self.cert)>0 + return libcrypto.X509_check_ca(self.cert) > 0 + class X509Store(object): """ - Represents trusted certificate store. Can be used to lookup CA - certificates to verify + Represents trusted certificate store. Can be used to lookup CA + certificates to verify - @param file - file with several certificates and crls - to load into store - @param dir - hashed directory with certificates and crls - @param default - if true, default verify location (directory) - is installed + @param file - file with several certificates and crls + to load into store + @param dir - hashed directory with certificates and crls + @param default - if true, default verify location (directory) + is installed """ - def __init__(self,file=None,dir=None,default=False): + def __init__(self, file=None, dir=None, default=False): """ - Creates X509 store and installs lookup method. Optionally initializes + Creates X509 store and installs lookup method. Optionally initializes by certificates from given file or directory. """ # # Todo - set verification flags - # - self.store=libcrypto.X509_STORE_new() + # + self.store = libcrypto.X509_STORE_new() if self.store is None: raise X509Error("allocating store") - lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_file()) + lookup = libcrypto.X509_STORE_add_lookup(self.store, + libcrypto.X509_LOOKUP_file()) if lookup is None: raise X509Error("error installing file lookup method") - if (file is not None): - if not libcrypto.X509_LOOKUP_ctrl(lookup,1,file,1,None)>0: + if file is not None: + if not libcrypto.X509_LOOKUP_ctrl(lookup, 1, file, 1, None) > 0: raise X509Error("error loading trusted certs from file "+file) - lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_hash_dir()) + lookup = libcrypto.X509_STORE_add_lookup(self.store, + libcrypto.X509_LOOKUP_hash_dir()) if lookup is None: raise X509Error("error installing hashed lookup method") if dir is not None: - if not libcrypto.X509_LOOKUP_ctrl(lookup,2,dir,1,None)>0: + if not libcrypto.X509_LOOKUP_ctrl(lookup, 2, dir, 1, None) > 0: raise X509Error("error adding hashed trusted certs dir "+dir) if default: - if not libcrypto.X509_LOOKUP_ctrl(lookup,2,None,3,None)>0: + if not libcrypto.X509_LOOKUP_ctrl(lookup, 2, None, 3, None) > 0: raise X509Error("error adding default trusted certs dir ") - def add_cert(self,cert): + def add_cert(self, cert): """ Explicitely adds certificate to set of trusted in the store @param cert - X509 object to add """ - if not isinstance(cert,X509): + if not isinstance(cert, X509): raise TypeError("cert should be X509") - libcrypto.X509_STORE_add_cert(self.store,cert.cert) - def add_callback(self,callback): + libcrypto.X509_STORE_add_cert(self.store, cert.cert) + def add_callback(self, callback): """ Installs callback function, which would receive detailed information about verified ceritificates """ raise NotImplementedError - def setflags(self,flags): + def setflags(self, flags): """ Set certificate verification flags. @param flags - integer bit mask. See OpenSSL X509_V_FLAG_* constants """ - libcrypto.X509_STORE_set_flags(self.store,flags) - def setpurpose(self,purpose): + libcrypto.X509_STORE_set_flags(self.store, flags) + def setpurpose(self, purpose): """ Sets certificate purpose which verified certificate should match - @param purpose - number from 1 to 9 or standard strind defined in Openssl - possible strings - sslcient,sslserver, nssslserver, smimesign,smimeencrypt, crlsign, any,ocsphelper - """ - if isinstance(purpose,str): - purp_no=X509_PURPOSE_get_by_sname(purpose) - if purp_no <=0: - raise X509Error("Invalid certificate purpose '"+purpose+"'") - elif isinstance(purpose,int): + @param purpose - number from 1 to 9 or standard strind defined + in Openssl + possible strings - sslcient,sslserver, nssslserver, smimesign,i + smimeencrypt, crlsign, any, ocsphelper + """ + if isinstance(purpose, str): + purp_no = libcrypto.X509_PURPOSE_get_by_sname(purpose) + if purp_no <= 0: + raise X509Error("Invalid certificate purpose '%s'" % purpose) + elif isinstance(purpose, int): purp_no = purpose - if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0: + if libcrypto.X509_STORE_set_purpose(self.store, purp_no) <= 0: raise X509Error("cannot set purpose") - def setdepth(self,depth): + def setdepth(self, depth): """ Sets the verification depth i.e. max length of certificate chain which is acceptable """ - libcrypto.X509_STORE_set_depth(self.store,depth) + libcrypto.X509_STORE_set_depth(self.store, depth) def settime(self, time): """ Set point in time used to check validity of certificates for Time can be either python datetime object or number of seconds sinse epoch """ - if isinstance(time,datetime.datetime) or isinstance(time,datetime.date): - d=int(time.strftime("%s")) - elif isinstance(time,int): - pass + if isinstance(time, datetime) or isinstance(time, + datetime.date): + seconds = int(time.strftime("%s")) + elif isinstance(time, int): + seconds = time else: - raise TypeError("datetime.date, datetime.datetime or integer is required as time argument") + raise TypeError("datetime.date, datetime.datetime or integer " + + "is required as time argument") raise NotImplementedError class StackOfX509(object): """ Implements OpenSSL STACK_OF(X509) object. It looks much like python container types """ - def __init__(self,certs=None,ptr=None,disposable=True): + def __init__(self, certs=None, ptr=None, disposable=True): """ Create stack @param certs - list of X509 objects. If specified, read-write @@ -502,83 +543,89 @@ class StackOfX509(object): """ if ptr is None: self.need_free = True - self.ptr=libcrypto.sk_new_null() + self.ptr = libcrypto.sk_new_null() if certs is not None: for crt in certs: self.append(crt) elif certs is not None: - raise ValueError("cannot handle certs an ptr simultaneously") + raise ValueError("cannot handle certs an ptr simultaneously") else: self.need_free = disposable - self.ptr=ptr + self.ptr = ptr def __len__(self): return libcrypto.sk_num(self.ptr) - def __getitem__(self,index): - if index <0 or index>=len(self): + def __getitem__(self, index): + if index < 0 or index >= len(self): raise IndexError - p=libcrypto.sk_value(self.ptr,index) + p = libcrypto.sk_value(self.ptr, index) return X509(ptr=libcrypto.X509_dup(p)) - def __setitem__(self,index,value): + def __setitem__(self, index, value): if not self.need_free: raise ValueError("Stack is read-only") - if index <0 or index>=len(self): + if index < 0 or index >= len(self): raise IndexError - if not isinstance(value,X509): + if not isinstance(value, X509): raise TypeError('StackOfX508 can contain only X509 objects') - p=libcrypto.sk_value(self.ptr,index) - libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert)) + p = libcrypto.sk_value(self.ptr, index) + libcrypto.sk_set(self.ptr, index, libcrypto.X509_dup(value.cert)) libcrypto.X509_free(p) - def __delitem__(self,index): + def __delitem__(self, index): if not self.need_free: raise ValueError("Stack is read-only") - if index <0 or index>=len(self): + if index < 0 or index >= len(self): raise IndexError - p=libcrypto.sk_delete(self.ptr,index) + p = libcrypto.sk_delete(self.ptr, index) libcrypto.X509_free(p) def __del__(self): if self.need_free: - libcrypto.sk_pop_free(self.ptr,libcrypto.X509_free) - def append(self,value): + libcrypto.sk_pop_free(self.ptr, libcrypto.X509_free) + def append(self, value): + """ Adds certificate to stack """ if not self.need_free: raise ValueError("Stack is read-only") - if not isinstance(value,X509): + if not isinstance(value, X509): raise TypeError('StackOfX508 can contain only X509 objects') - libcrypto.sk_push(self.ptr,libcrypto.X509_dup(value.cert)) -libcrypto.i2a_ASN1_INTEGER.argtypes=(c_void_p,c_void_p) -libcrypto.ASN1_STRING_print_ex.argtypes=(c_void_p,c_void_p,c_long) -libcrypto.PEM_read_bio_X509.restype=c_void_p -libcrypto.PEM_read_bio_X509.argtypes=(c_void_p,POINTER(c_void_p),c_void_p,c_void_p) -libcrypto.PEM_write_bio_X509.restype=c_int -libcrypto.PEM_write_bio_X509.argtypes=(c_void_p,c_void_p) -libcrypto.ASN1_TIME_print.argtypes=(c_void_p,c_void_p) -libcrypto.ASN1_INTEGER_get.argtypes=(c_void_p,) -libcrypto.ASN1_INTEGER_get.restype=c_long -libcrypto.X509_get_serialNumber.argtypes=(c_void_p,) -libcrypto.X509_get_serialNumber.restype=c_void_p -libcrypto.X509_NAME_ENTRY_get_object.restype=c_void_p -libcrypto.X509_NAME_ENTRY_get_object.argtypes=(c_void_p,) -libcrypto.OBJ_obj2nid.argtypes=(c_void_p,) -libcrypto.X509_NAME_get_entry.restype=c_void_p -libcrypto.X509_NAME_get_entry.argtypes=(c_void_p,c_int) -libcrypto.X509_STORE_new.restype=c_void_p -libcrypto.X509_STORE_add_lookup.restype=c_void_p -libcrypto.X509_STORE_add_lookup.argtypes=(c_void_p,c_void_p) -libcrypto.X509_LOOKUP_file.restype=c_void_p -libcrypto.X509_LOOKUP_hash_dir.restype=c_void_p -libcrypto.X509_LOOKUP_ctrl.restype=c_int -libcrypto.X509_LOOKUP_ctrl.argtypes=(c_void_p,c_int,c_char_p,c_long,POINTER(c_char_p)) -libcrypto.X509_EXTENSION_dup.argtypes=(c_void_p,) -libcrypto.X509_EXTENSION_dup.restype=POINTER(_x509_ext) -libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int) -libcrypto.X509_get_ext.restype=c_void_p -libcrypto.X509_get_ext.argtypes=(c_void_p,c_int) -libcrypto.X509V3_EXT_print.argtypes=(c_void_p,POINTER(_x509_ext),c_long,c_int) -libcrypto.sk_set.argtypes=(c_void_p,c_int,c_void_p) -libcrypto.sk_set.restype=c_void_p -libcrypto.sk_value.argtypes=(c_void_p,c_int) -libcrypto.sk_value.restype=c_void_p -libcrypto.X509_dup.restype=c_void_p -libcrypto.sk_new_null.restype=c_void_p -libcrypto.X509_dup.argtypes=(c_void_p,) -libcrypto.X509_NAME_hash.restype=c_long -libcrypto.X509_NAME_hash.argtypes=(c_void_p,) + libcrypto.sk_push(self.ptr, libcrypto.X509_dup(value.cert)) + +libcrypto.i2a_ASN1_INTEGER.argtypes = (c_void_p, c_void_p) +libcrypto.ASN1_STRING_print_ex.argtypes = (c_void_p, c_void_p, c_long) +libcrypto.PEM_read_bio_X509.restype = c_void_p +libcrypto.PEM_read_bio_X509.argtypes = (c_void_p, POINTER(c_void_p), + c_void_p, c_void_p) +libcrypto.PEM_write_bio_X509.restype = c_int +libcrypto.PEM_write_bio_X509.argtypes = (c_void_p, c_void_p) +libcrypto.ASN1_TIME_print.argtypes = (c_void_p, c_void_p) +libcrypto.ASN1_INTEGER_get.argtypes = (c_void_p, ) +libcrypto.ASN1_INTEGER_get.restype = c_long +libcrypto.X509_get_serialNumber.argtypes = (c_void_p, ) +libcrypto.X509_get_serialNumber.restype = c_void_p +libcrypto.X509_NAME_ENTRY_get_object.restype = c_void_p +libcrypto.X509_NAME_ENTRY_get_object.argtypes = (c_void_p, ) +libcrypto.OBJ_obj2nid.argtypes = (c_void_p, ) +libcrypto.X509_NAME_get_entry.restype = c_void_p +libcrypto.X509_NAME_get_entry.argtypes = (c_void_p, c_int) +libcrypto.X509_STORE_new.restype = c_void_p +libcrypto.X509_STORE_add_lookup.restype = c_void_p +libcrypto.X509_STORE_add_lookup.argtypes = (c_void_p, c_void_p) +libcrypto.X509_LOOKUP_file.restype = c_void_p +libcrypto.X509_LOOKUP_hash_dir.restype = c_void_p +libcrypto.X509_LOOKUP_ctrl.restype = c_int +libcrypto.X509_LOOKUP_ctrl.argtypes = (c_void_p, c_int, c_char_p, c_long, + POINTER(c_char_p)) +libcrypto.X509_EXTENSION_dup.argtypes = (c_void_p, ) +libcrypto.X509_EXTENSION_dup.restype = POINTER(_x509_ext) +libcrypto.X509V3_EXT_print.argtypes = (c_void_p, POINTER(_x509_ext), c_long, + c_int) +libcrypto.X509_get_ext.restype = c_void_p +libcrypto.X509_get_ext.argtypes = (c_void_p, c_int) +libcrypto.X509V3_EXT_print.argtypes = (c_void_p, POINTER(_x509_ext), c_long, + c_int) +libcrypto.sk_set.argtypes = (c_void_p, c_int, c_void_p) +libcrypto.sk_set.restype = c_void_p +libcrypto.sk_value.argtypes = (c_void_p, c_int) +libcrypto.sk_value.restype = c_void_p +libcrypto.X509_dup.restype = c_void_p +libcrypto.sk_new_null.restype = c_void_p +libcrypto.X509_dup.argtypes = (c_void_p, ) +libcrypto.X509_NAME_hash.restype = c_long +libcrypto.X509_NAME_hash.argtypes = (c_void_p, ) diff --git a/setup.py b/setup.py index 6d95b46..c9babe9 100644 --- a/setup.py +++ b/setup.py @@ -33,7 +33,7 @@ class MyTests(distutils.cmd.Command): setup( name="ctypescrypto", - version="0.2.7", + version="0.3.0", description="CTypes-based interface for some OpenSSL libcrypto features", author="Victor Wagner", author_email="vitus@wagner.pp.ru", diff --git a/tests/testdigest.py b/tests/testdigest.py index 120d73c..7a48ed0 100644 --- a/tests/testdigest.py +++ b/tests/testdigest.py @@ -6,46 +6,46 @@ import unittest class TestDigestType(unittest.TestCase): def test_md4(self): d=digest.DigestType("md4") - self.assertEqual(d.digest_size(),16) - self.assertEqual(d.block_size(),64) - self.assertEqual(d.oid(),Oid("md4")) + self.assertEqual(d.digest_size,16) + self.assertEqual(d.block_size,64) + self.assertEqual(d.oid,Oid("md4")) self.assertEqual(d.name,'md4') def test_md5(self): d=digest.DigestType("md5") - self.assertEqual(d.digest_size(),16) - self.assertEqual(d.block_size(),64) - self.assertEqual(d.oid(),Oid("md5")) + self.assertEqual(d.digest_size,16) + self.assertEqual(d.block_size,64) + self.assertEqual(d.oid,Oid("md5")) self.assertEqual(d.name,'md5') def test_sha1(self): d=digest.DigestType("sha1") - self.assertEqual(d.digest_size(),20) - self.assertEqual(d.block_size(),64) - self.assertEqual(d.oid(),Oid("sha1")) + self.assertEqual(d.digest_size,20) + self.assertEqual(d.block_size,64) + self.assertEqual(d.oid,Oid("sha1")) self.assertEqual(d.name,'sha1') def test_sha256(self): d=digest.DigestType("sha256") - self.assertEqual(d.digest_size(),32) - self.assertEqual(d.block_size(),64) - self.assertEqual(d.oid(),Oid("sha256")) + self.assertEqual(d.digest_size,32) + self.assertEqual(d.block_size,64) + self.assertEqual(d.oid,Oid("sha256")) self.assertEqual(d.name,'sha256') def test_sha384(self): d=digest.DigestType("sha384") - self.assertEqual(d.digest_size(),48) - self.assertEqual(d.block_size(),128) - self.assertEqual(d.oid(),Oid("sha384")) + self.assertEqual(d.digest_size,48) + self.assertEqual(d.block_size,128) + self.assertEqual(d.oid,Oid("sha384")) self.assertEqual(d.name,'sha384') def test_sha512(self): d=digest.DigestType("sha512") - self.assertEqual(d.digest_size(),64) - self.assertEqual(d.block_size(),128) - self.assertEqual(d.oid(),Oid("sha512")) + self.assertEqual(d.digest_size,64) + self.assertEqual(d.block_size,128) + self.assertEqual(d.oid,Oid("sha512")) self.assertEqual(d.name,'sha512') def test_createfromoid(self): oid=Oid('sha256') d=digest.DigestType(oid) - self.assertEqual(d.digest_size(),32) - self.assertEqual(d.block_size(),64) - self.assertEqual(d.oid(),Oid("sha256")) + self.assertEqual(d.digest_size,32) + self.assertEqual(d.block_size,64) + self.assertEqual(d.oid,Oid("sha256")) self.assertEqual(d.name,'sha256') def test_createfromEVP_MD(self): d1=digest.DigestType("sha256") @@ -53,9 +53,9 @@ class TestDigestType(unittest.TestCase): with self.assertRaises(AttributeError): s=d2.name d2.digest=d1.digest - self.assertEqual(d2.digest_size(),32) - self.assertEqual(d2.block_size(),64) - self.assertEqual(d2.oid(),Oid("sha256")) + self.assertEqual(d2.digest_size,32) + self.assertEqual(d2.block_size,64) + self.assertEqual(d2.oid,Oid("sha256")) self.assertEqual(d2.name,'sha256') def test_invalidDigest(self): with self.assertRaises(digest.DigestError): -- 2.39.2