X-Git-Url: https://www.wagner.pp.ru/gitweb/?p=oss%2Fctypescrypto.git;a=blobdiff_plain;f=ctypescrypto%2Fpkey.py;h=97fe4b131dd8c175db95a90168451523d9871517;hp=af31a6725bc5ed04726ee7c401bed94e3ee331e6;hb=968dd1b70b51b9df1a5ee3f7c6d1645a536fb7e0;hpb=305f347018c18fa4183f8ec76177336881b713c9 diff --git a/ctypescrypto/pkey.py b/ctypescrypto/pkey.py index af31a67..97fe4b1 100644 --- a/ctypescrypto/pkey.py +++ b/ctypescrypto/pkey.py @@ -7,7 +7,7 @@ PKey object of this module is wrapper around OpenSSL EVP_PKEY object. from ctypes import c_char, c_char_p, c_void_p, c_int, c_long, POINTER from ctypes import create_string_buffer, byref, memmove, CFUNCTYPE -from ctypescrypto import libcrypto +from ctypescrypto import libcrypto,pyver,bintype,chartype from ctypescrypto.exception import LibCryptoError, clear_err_stack from ctypescrypto.bio import Membio @@ -32,18 +32,35 @@ def _password_callback(c): if c is None: return PW_CALLBACK_FUNC(0) if callable(c): - def __cb(buf, length, rwflag, userdata): - pwd = c(rwflag) - cnt = min(len(pwd),length) - memmove(buf,pwd, cnt) - return cnt + if pyver ==2 : + def __cb(buf, length, rwflag, userdata): + pwd = c(rwflag) + cnt = min(len(pwd),length) + memmove(buf,pwd, cnt) + return cnt + else: + def __cb(buf, length, rwflag, userdata): + pwd = c(rwflag).encode("utf-8") + cnt = min(len(pwd),length) + memmove(buf,pwd, cnt) + return cnt else: + if pyver > 2: + c=c.encode("utf-8") def __cb(buf,length,rwflag,userdata): cnt=min(len(c),length) memmove(buf,c,cnt) return cnt return PW_CALLBACK_FUNC(__cb) +def _keybio(blob, format): + # But DER string should be binary + if format == "PEM" and isinstance(blob,chartype): + return Membio(blob.encode("ascii"),clone=True) + elif isinstance(blob,bintype): + return Membio(blob) + else: + raise TypeError("Key should be either blob or PEM string") class PKey(object): """ @@ -100,7 +117,7 @@ class PKey(object): if not pubkey is None: raise TypeError("Just one of ptr, pubkey or privkey can " + "be specified") - bio = Membio(privkey) + bio=_keybio(privkey,format) self.cansign = True if format == "PEM": self.key = libcrypto.PEM_read_bio_PrivateKey(bio.bio, None, @@ -116,7 +133,7 @@ class PKey(object): if self.key is None: raise PKeyError("error parsing private key") elif not pubkey is None: - bio = Membio(pubkey) + bio = _keybio(pubkey,format) self.cansign = False if format == "PEM": self.key = libcrypto.PEM_read_bio_PUBKEY(bio.bio, None, @@ -132,7 +149,8 @@ class PKey(object): def __del__(self): """ Frees EVP_PKEY object (note, it is reference counted) """ - libcrypto.EVP_PKEY_free(self.key) + if hasattr(self,"key"): + libcrypto.EVP_PKEY_free(self.key) def __eq__(self, other): """ Compares two public keys. If one has private key and other @@ -257,7 +275,11 @@ class PKey(object): paramsfrom does work too """ tmpeng = c_void_p(None) - ameth = libcrypto.EVP_PKEY_asn1_find_str(byref(tmpeng), algorithm, -1) + if isinstance(algorithm, chartype): + alg = algorithm.encode("ascii") + else: + alg = algorithm + ameth = libcrypto.EVP_PKEY_asn1_find_str(byref(tmpeng), alg, -1) if ameth is None: raise PKeyError("Algorithm %s not foind\n"%(algorithm)) clear_err_stack() @@ -317,14 +339,17 @@ class PKey(object): evp_cipher, None, 0, _password_callback(password), None) + if ret ==0: + raise PKeyError("error serializing private key") + return str(bio) else: ret = libcrypto.i2d_PKCS8PrivateKey_bio(bio.bio, self.key, evp_cipher, None, 0, _password_callback(password), None) - if ret == 0: - raise PKeyError("error serializing private key") - return str(bio) + if ret ==0: + raise PKeyError("error serializing private key") + return bintype(bio) @staticmethod def _configure_context(ctx, opts, skip=()): @@ -340,7 +365,20 @@ class PKey(object): for oper in opts: if oper in skip: continue - ret = libcrypto.EVP_PKEY_CTX_ctrl_str(ctx, oper, str(opts[oper])) + if isinstance(oper,chartype): + op = oper.encode("ascii") + else: + op = oper + if isinstance(opts[oper],chartype): + value = opts[oper].encode("ascii") + elif isinstance(opts[oper],bintype): + value = opts[oper] + else: + if pyver == 2: + value = str(opts[oper]) + else: + value = str(opts[oper]).encode('ascii') + ret = libcrypto.EVP_PKEY_CTX_ctrl_str(ctx, op, value) if ret == -2: raise PKeyError("Parameter %s is not supported by key" % oper) if ret < 1: