From: Victor Wagner Date: Thu, 5 Jun 2014 14:11:37 +0000 (+0400) Subject: rewrote cipher module X-Git-Url: https://www.wagner.pp.ru/gitweb/?p=oss%2Fctypescrypto.git;a=commitdiff_plain;h=bab34d05e2c951375e9d20097da3752acc83ccf7 rewrote cipher module --- diff --git a/README.md b/README.md index 9abd63b..dfcab2b 100644 --- a/README.md +++ b/README.md @@ -37,13 +37,11 @@ digests.py - Interface to EVP\_Digest\* family of functions. Status: fully implemented and covered by tests ciphers.py - Interface to EVP\_Cipher family of function. - Status: Needs complete rewriting and test coverage. Idea to keep - cleartext in python variable until entire text would be passed to - update is EVIL. + Status: Needs documenting and test coverage pkey.py - Low-level private key operations (like pkey, genpkey and p keyutl command line ops), all via algorithm-agnostic EVP interface. - Status: Designed and mostly implemented but not yet covered by tests + Status: Designed and started to implement but not yet covered by tests exception.py OpenSSL error stack to python exception conversion Implemented. diff --git a/ctypescrypto/cipher.py b/ctypescrypto/cipher.py index 73a14ce..4fb5767 100644 --- a/ctypescrypto/cipher.py +++ b/ctypescrypto/cipher.py @@ -1,39 +1,52 @@ -from ctypes import * +from ctypes import create_string_buffer,c_char_p,c_void_p,c_int,c_long,byref +from ctypescrypto import libcrypto +from ctypescryto.exception import LibCrytoError CIPHER_ALGORITHMS = ("DES", "DES-EDE3", "BF", "AES-128", "AES-192", "AES-256") -CIPHER_MODES = ("CBC", "CFB", "OFB", "ECB") +CIPHER_MODES = ("STREAM","ECB","CBC", "CFB", "OFB", "CTR","GCM") -class CipherError(Exception): +# + +class CipherError(LibCryptoError): pass +def new(algname,key,encrypt=True,iv=None): + ct=CipherType(algname) + return Cipher(ct,key,iv,encrypt) + class CipherType: - def __init__(self, libcrypto, cipher_algo, cipher_mode): - self.libcrypto = libcrypto - self.cipher_algo = cipher_algo - self.cipher_mode = cipher_mode - cipher_name = "-".join([self.cipher_algo, self.cipher_mode]) - self.cipher = self.libcrypto.EVP_get_cipherbyname(cipher_name) - if self.cipher == 0: + def __init__(self, cipher_name): + self.cipher = libcrypto.EVP_get_cipherbyname(cipher_name) + if self.cipher is None: raise CipherError, "Unknown cipher: %s" % cipher_name def __del__(self): pass - + def block_size(self): + return libcrypto.EVP_CIHPER_block_size(self.cipher) + def key_length(self): + return libcrypto.EVP_CIPHER_key_length(self.cipher) + def iv_length(self): + return libcrypto.EVP_CIPHER_iv_length(self.cipher) + def flags(self): + return libcrypto.EVP_CIPHER_flags(self.cipher) + def mode(self): + return CIPHER_MODES[self.flags & 0x7] def algo(self): - return self.cipher_algo - + return self.oid().short_name() def mode(self): return self.cipher_mode + def oid(self): + return Oid(libcrypto.EVP_CIPHER_nid(self.cipher)) class Cipher: - def __init__(self, libcrypto, cipher_type, key, iv, encrypt=True): - self.libcrypto = libcrypto + def __init__(self, cipher_type, key, iv, encrypt=True): self._clean_ctx() key_ptr = c_char_p(key) iv_ptr = c_char_p(iv) - self.ctx = self.libcrypto.EVP_CIPHER_CTX_new(cipher_type.cipher, None, key_ptr, iv_ptr) + self.ctx = libcrypto.EVP_CIPHER_CTX_new(cipher_type.cipher, None, key_ptr, iv_ptr) if self.ctx == 0: raise CipherError, "Unable to create cipher context" self.encrypt = encrypt @@ -41,8 +54,9 @@ class Cipher: enc = 1 else: enc = 0 - result = self.libcrypto.EVP_CipherInit_ex(self.ctx, cipher_type.cipher, None, key_ptr, iv_ptr, c_int(enc)) + result = libcrypto.EVP_CipherInit_ex(self.ctx, cipher_type.cipher, None, key_ptr, iv_ptr, c_int(enc)) self.cipher_type = cipher_type + self.block_size = self.cipher_type.block_size() if result == 0: self._clean_ctx() raise CipherError, "Unable to initialize cipher" @@ -55,7 +69,7 @@ class Cipher: padding_flag = 1 else: padding_flag = 0 - self.libcrypto.EVP_CIPHER_CTX_set_padding(self.ctx, padding_flag) + libcrypto.EVP_CIPHER_CTX_set_padding(self.ctx, padding_flag) def update(self, data): if self.cipher_finalized : @@ -64,29 +78,30 @@ class Cipher: raise TypeError, "A string is expected" if len(data) <= 0: return "" - self.data = self.data + data + outbuf=string_buffer_create(self.blocsize+len(data)) + outlen=c_int(0) + ret=libcrypto.EVP_CipherUpdate(self.ctx,outbuf,byref(outlen), + data,len(data)) + if ret <=0: + self._clean_ctx() + self.cipher_finalized=True + del self.ctx + raise CipherError("problem processing data") + return outbuf.raw[:outlen] - def finish(self, data=None): - if data is not None: - self.update(data) - return self._finish() - - def _finish(self): + def finish(self): if self.cipher_finalized : raise CipherError, "Cipher operation is already completed" - self.cipher_out = create_string_buffer(len(self.data) + 32) - result = self.libcrypto.EVP_CipherUpdate(self.ctx, byref(self.cipher_out), byref(self.cipher_out_len), c_char_p(self.data), len(self.data)) - if result == 0: - self._clean_ctx() - raise CipherError, "Unable to update cipher" + outbuf=create_string_buffer(self.block_size) self.cipher_finalized = True - update_data = self.cipher_out.raw[:self.cipher_out_len.value] - result = self.libcrypto.EVP_CipherFinal_ex(self.ctx, byref(self.cipher_out), byref(self.cipher_out_len)) + result = self.libcrypto.EVP_CipherFinal_ex(self.ctx,outbuf , byref(outlen)) if result == 0: self._clean_ctx() raise CipherError, "Unable to finalize cipher" - final_data = self.cipher_out.raw[:self.cipher_out_len.value] - return update_data + final_data + if outlen>0: + return outbuf.raw[:outlen] + else + return "" def _clean_ctx(self): try: @@ -96,7 +111,4 @@ class Cipher: del(self.ctx) except AttributeError: pass - self.cipher_out = None - self.cipher_out_len = c_long(0) - self.data = "" - self.cipher_finalized = False \ No newline at end of file + self.cipher_finalized = True