From: Victor Wagner Date: Sat, 27 Jun 2015 16:33:06 +0000 (+0300) Subject: Converted tabs to spaces to make pylint happy X-Git-Url: https://www.wagner.pp.ru/gitweb/?p=oss%2Fctypescrypto.git;a=commitdiff_plain;h=954b6dc9e3312f8d8b49f20f8466e6d2a8342f35 Converted tabs to spaces to make pylint happy --- diff --git a/ctypescrypto/__init__.py b/ctypescrypto/__init__.py index 7ae7125..1963188 100644 --- a/ctypescrypto/__init__.py +++ b/ctypescrypto/__init__.py @@ -1,5 +1,5 @@ """ - Interface to some libcrypto functions + Interface to some libcrypto functions """ @@ -7,11 +7,11 @@ from ctypes import CDLL,c_char_p def config(filename=None): - """ - Loads OpenSSL Config file. If none are specified, loads default - (compiled in) one - """ - libcrypto.OPENSSL_config(filename) + """ + Loads OpenSSL Config file. If none are specified, loads default + (compiled in) one + """ + libcrypto.OPENSSL_config(filename) __all__ = ['config'] diff --git a/ctypescrypto/bio.py b/ctypescrypto/bio.py index b1700b9..2c54212 100644 --- a/ctypescrypto/bio.py +++ b/ctypescrypto/bio.py @@ -4,85 +4,85 @@ Interface to OpenSSL BIO library from ctypescrypto import libcrypto from ctypes import c_char_p, c_void_p, c_int, string_at, c_long,POINTER,byref, create_string_buffer class Membio(object): - """ - Provides interface to OpenSSL memory bios - use str() or unicode() to get contents of writable bio - use bio member to pass to libcrypto function - """ - def __init__(self,data=None): - """ If data is specified, creates read-only BIO. If data is - None, creates writable BIO, contents of which can be retrieved by str() or unicode() - """ - if data is None: - method=libcrypto.BIO_s_mem() - self.bio=libcrypto.BIO_new(method) - else: - self.bio=libcrypto.BIO_new_mem_buf(c_char_p(data),len(data)) - def __del__(self): - """ - Cleans up memory used by bio - """ - libcrypto.BIO_free(self.bio) - del(self.bio) - def __str__(self): - """ - Returns current contents of buffer as byte string - """ - p=c_char_p(None) - l=libcrypto.BIO_ctrl(self.bio,3,0,byref(p)) - return string_at(p,l) - def __unicode__(self): - """ - Attempts to interpret current contents of buffer as UTF-8 string and convert it to unicode - """ - return str(self).decode("utf-8") - def read(self,length=None): - """ - Reads data from readble BIO. For test purposes. - @param length - if specifed, limits amount of data read. If not BIO is read until end of buffer - """ - if not length is None: - if not isinstance(length,(int,long)): - raise TypeError("length to read should be number") - buf=create_string_buffer(length) - readbytes=libcrypto.BIO_read(self.bio,buf,length) - if readbytes==-2: - raise NotImplementedError("Function is not supported by this BIO") - if readbytes==-1: - raise IOError - if readbytes==0: - return "" - return buf.raw[:readbytes] - else: - buf=create_string_buffer(1024) - out="" - r=1 - while r>0: - r=libcrypto.BIO_read(self.bio,buf,1024) - if r==-2: - raise NotImplementedError("Function is not supported by this BIO") - if r==-1: - raise IOError - if (r>0): - out+=buf.raw[:r] - return out + """ + Provides interface to OpenSSL memory bios + use str() or unicode() to get contents of writable bio + use bio member to pass to libcrypto function + """ + def __init__(self,data=None): + """ If data is specified, creates read-only BIO. If data is + None, creates writable BIO, contents of which can be retrieved by str() or unicode() + """ + if data is None: + method=libcrypto.BIO_s_mem() + self.bio=libcrypto.BIO_new(method) + else: + self.bio=libcrypto.BIO_new_mem_buf(c_char_p(data),len(data)) + def __del__(self): + """ + Cleans up memory used by bio + """ + libcrypto.BIO_free(self.bio) + del(self.bio) + def __str__(self): + """ + Returns current contents of buffer as byte string + """ + p=c_char_p(None) + l=libcrypto.BIO_ctrl(self.bio,3,0,byref(p)) + return string_at(p,l) + def __unicode__(self): + """ + Attempts to interpret current contents of buffer as UTF-8 string and convert it to unicode + """ + return str(self).decode("utf-8") + def read(self,length=None): + """ + Reads data from readble BIO. For test purposes. + @param length - if specifed, limits amount of data read. If not BIO is read until end of buffer + """ + if not length is None: + if not isinstance(length,(int,long)): + raise TypeError("length to read should be number") + buf=create_string_buffer(length) + readbytes=libcrypto.BIO_read(self.bio,buf,length) + if readbytes==-2: + raise NotImplementedError("Function is not supported by this BIO") + if readbytes==-1: + raise IOError + if readbytes==0: + return "" + return buf.raw[:readbytes] + else: + buf=create_string_buffer(1024) + out="" + r=1 + while r>0: + r=libcrypto.BIO_read(self.bio,buf,1024) + if r==-2: + raise NotImplementedError("Function is not supported by this BIO") + if r==-1: + raise IOError + if (r>0): + out+=buf.raw[:r] + return out - def write(self,data): - """ - Writes data to writable bio. For test purposes - """ - if isinstance(data,unicode): - data=data.encode("utf-8") - r=libcrypto.BIO_write(self.bio,data,len(data)) - if r==-2: - raise NotImplementedError("Function not supported by this BIO") - if r - """ - return libcrypto.EVP_CIPHER_flags(self.cipher) - def mode(self): - """ - Returns cipher mode as string constant like CBC, OFB etc. - """ - return CIPHER_MODES[self.flags() & 0x7] - def algo(self): - """ - Return cipher's algorithm name, derived from OID - """ - return self.oid().shortname() - def oid(self): - """ - Returns ASN.1 object identifier of the cipher as - ctypescrypto.oid.Oid object - """ - return Oid(libcrypto.EVP_CIPHER_nid(self.cipher)) + """ + Describes cihper algorihm. Can be used to produce cipher + instance and to get various information about cihper + """ + + def __init__(self, cipher_name): + """ + Constructs cipher algortihm using textual name as in openssl + command line + """ + self.cipher = libcrypto.EVP_get_cipherbyname(cipher_name) + if self.cipher is None: + raise CipherError("Unknown cipher: %s" % cipher_name) + + def __del__(self): + pass + def block_size(self): + """ + Returns block size of the cipher + """ + return libcrypto.EVP_CIPHER_block_size(self.cipher) + def key_length(self): + """ + Returns key length of the cipher + """ + return libcrypto.EVP_CIPHER_key_length(self.cipher) + def iv_length(self): + """ + Returns initialization vector length of the cipher + """ + return libcrypto.EVP_CIPHER_iv_length(self.cipher) + def flags(self): + """ + Return cipher flags. Low three bits of the flags encode + cipher mode (see mode). Higher bits is combinatuon of + EVP_CIPH* constants defined in the + """ + return libcrypto.EVP_CIPHER_flags(self.cipher) + def mode(self): + """ + Returns cipher mode as string constant like CBC, OFB etc. + """ + return CIPHER_MODES[self.flags() & 0x7] + def algo(self): + """ + Return cipher's algorithm name, derived from OID + """ + return self.oid().shortname() + def oid(self): + """ + Returns ASN.1 object identifier of the cipher as + ctypescrypto.oid.Oid object + """ + return Oid(libcrypto.EVP_CIPHER_nid(self.cipher)) class Cipher: - """ - Performs actual encrypton decryption - Note that object keeps some internal state. - To obtain full ciphertext (or plaintext during decihpering) - user should concatenate results of all calls of update with - result of finish - """ - def __init__(self, cipher_type, key, iv, encrypt=True): - """ - Initializing cipher instance. - - @param cipher_type - CipherType object - @param key = binary string representing the key - @param iv - binary string representing initializtion vector - @param encrypt - if True(default) we ere encrypting. - Otherwise decrypting - - """ - self._clean_ctx() - # Check key and iv length - if key is None: - raise ValueError("No key specified") - - key_ptr = c_char_p(key) - iv_ptr = c_char_p(iv) - self.ctx = libcrypto.EVP_CIPHER_CTX_new() - if self.ctx == 0: - raise CipherError("Unable to create cipher context") - self.encrypt = encrypt - enc=1 if encrypt else 0 - if not iv is None and len(iv) != cipher_type.iv_length(): - raise ValueError("Invalid IV length for this algorithm") - - if len(key) != cipher_type.key_length(): - if (cipher_type.flags() & 8) != 0: - # Variable key length cipher. - result = libcrypto.EVP_CipherInit_ex(self.ctx, cipher_type.cipher, None, None, None, c_int(enc)) - result = libcrypto.EVP_CIPHER_CTX_set_key_length(self.ctx,len(key)) - if result == 0: - self._clean_ctx() - raise CipherError("Unable to set key length") - result = libcrypto.EVP_CipherInit_ex(self.ctx, None, None, key_ptr, iv_ptr, c_int(enc)) - else: - raise ValueError("Invalid key length for this algorithm") - else: - result = libcrypto.EVP_CipherInit_ex(self.ctx, cipher_type.cipher, None, key_ptr, iv_ptr, c_int(enc)) - if result == 0: - self._clean_ctx() - raise CipherError("Unable to initialize cipher") - self.cipher_type = cipher_type - self.block_size = self.cipher_type.block_size() - self.cipher_finalized = False - - def __del__(self): - self._clean_ctx() - - def padding(self, padding=True): - """ - Sets padding mode of the cipher - """ - padding_flag=1 if padding else 0 - libcrypto.EVP_CIPHER_CTX_set_padding(self.ctx, padding_flag) - - def update(self, data): - """ - Performs actual encrypton/decrypion - - @param data - part of the plain text/ciphertext to process - @returns - part of ciphercext/plain text - - Passd chunk of text doeesn't need to contain full ciher - blocks. If neccessery, part of passed data would be kept - internally until next data would be received or finish - called - """ - if self.cipher_finalized : - raise CipherError("No updates allowed") - if not isinstance(data,str): - raise TypeError("A string is expected") - if len(data) == 0: - return "" - outbuf=create_string_buffer(self.block_size+len(data)) - outlen=c_int(0) - ret=libcrypto.EVP_CipherUpdate(self.ctx,outbuf,byref(outlen), - data,len(data)) - if ret <=0: - self._clean_ctx() - self.cipher_finalized=True - del self.ctx - raise CipherError("problem processing data") - return outbuf.raw[:outlen.value] - - def finish(self): - """ - Finalizes processing. If some data are kept in the internal - state, they would be processed and returned. - """ - if self.cipher_finalized : - raise CipherError("Cipher operation is already completed") - outbuf=create_string_buffer(self.block_size) - self.cipher_finalized = True - outlen=c_int(0) - result = libcrypto.EVP_CipherFinal_ex(self.ctx,outbuf , byref(outlen)) - if result == 0: - self._clean_ctx() - raise CipherError("Unable to finalize cipher") - if outlen.value>0: - return outbuf.raw[:outlen.value] - else: - return "" - - def _clean_ctx(self): - try: - if self.ctx is not None: - libcrypto.EVP_CIPHER_CTX_cleanup(self.ctx) - libcrypto.EVP_CIPHER_CTX_free(self.ctx) - del(self.ctx) - except AttributeError: - pass - self.cipher_finalized = True + """ + Performs actual encrypton decryption + Note that object keeps some internal state. + To obtain full ciphertext (or plaintext during decihpering) + user should concatenate results of all calls of update with + result of finish + """ + def __init__(self, cipher_type, key, iv, encrypt=True): + """ + Initializing cipher instance. + + @param cipher_type - CipherType object + @param key = binary string representing the key + @param iv - binary string representing initializtion vector + @param encrypt - if True(default) we ere encrypting. + Otherwise decrypting + + """ + self._clean_ctx() + # Check key and iv length + if key is None: + raise ValueError("No key specified") + + key_ptr = c_char_p(key) + iv_ptr = c_char_p(iv) + self.ctx = libcrypto.EVP_CIPHER_CTX_new() + if self.ctx == 0: + raise CipherError("Unable to create cipher context") + self.encrypt = encrypt + enc=1 if encrypt else 0 + if not iv is None and len(iv) != cipher_type.iv_length(): + raise ValueError("Invalid IV length for this algorithm") + + if len(key) != cipher_type.key_length(): + if (cipher_type.flags() & 8) != 0: + # Variable key length cipher. + result = libcrypto.EVP_CipherInit_ex(self.ctx, cipher_type.cipher, None, None, None, c_int(enc)) + result = libcrypto.EVP_CIPHER_CTX_set_key_length(self.ctx,len(key)) + if result == 0: + self._clean_ctx() + raise CipherError("Unable to set key length") + result = libcrypto.EVP_CipherInit_ex(self.ctx, None, None, key_ptr, iv_ptr, c_int(enc)) + else: + raise ValueError("Invalid key length for this algorithm") + else: + result = libcrypto.EVP_CipherInit_ex(self.ctx, cipher_type.cipher, None, key_ptr, iv_ptr, c_int(enc)) + if result == 0: + self._clean_ctx() + raise CipherError("Unable to initialize cipher") + self.cipher_type = cipher_type + self.block_size = self.cipher_type.block_size() + self.cipher_finalized = False + + def __del__(self): + self._clean_ctx() + + def padding(self, padding=True): + """ + Sets padding mode of the cipher + """ + padding_flag=1 if padding else 0 + libcrypto.EVP_CIPHER_CTX_set_padding(self.ctx, padding_flag) + + def update(self, data): + """ + Performs actual encrypton/decrypion + + @param data - part of the plain text/ciphertext to process + @returns - part of ciphercext/plain text + + Passd chunk of text doeesn't need to contain full ciher + blocks. If neccessery, part of passed data would be kept + internally until next data would be received or finish + called + """ + if self.cipher_finalized : + raise CipherError("No updates allowed") + if not isinstance(data,str): + raise TypeError("A string is expected") + if len(data) == 0: + return "" + outbuf=create_string_buffer(self.block_size+len(data)) + outlen=c_int(0) + ret=libcrypto.EVP_CipherUpdate(self.ctx,outbuf,byref(outlen), + data,len(data)) + if ret <=0: + self._clean_ctx() + self.cipher_finalized=True + del self.ctx + raise CipherError("problem processing data") + return outbuf.raw[:outlen.value] + + def finish(self): + """ + Finalizes processing. If some data are kept in the internal + state, they would be processed and returned. + """ + if self.cipher_finalized : + raise CipherError("Cipher operation is already completed") + outbuf=create_string_buffer(self.block_size) + self.cipher_finalized = True + outlen=c_int(0) + result = libcrypto.EVP_CipherFinal_ex(self.ctx,outbuf , byref(outlen)) + if result == 0: + self._clean_ctx() + raise CipherError("Unable to finalize cipher") + if outlen.value>0: + return outbuf.raw[:outlen.value] + else: + return "" + + def _clean_ctx(self): + try: + if self.ctx is not None: + libcrypto.EVP_CIPHER_CTX_cleanup(self.ctx) + libcrypto.EVP_CIPHER_CTX_free(self.ctx) + del(self.ctx) + except AttributeError: + pass + self.cipher_finalized = True # diff --git a/ctypescrypto/cms.py b/ctypescrypto/cms.py index 5a54bde..68f9c3f 100644 --- a/ctypescrypto/cms.py +++ b/ctypescrypto/cms.py @@ -17,267 +17,267 @@ from ctypescrypto.bio import Membio from ctypescrypto.oid import Oid class CMSError(LibCryptoError): - """ - Exception which is raised when error occurs - """ - pass + """ + Exception which is raised when error occurs + """ + pass class Flags: - """ - Constants for flags passed to the CMS methods. - Can be OR-ed together - """ - TEXT=1 - NOCERTS=2 - NO_CONTENT_VERIFY=4 - NO_ATTR_VERIFY=8 - NO_SIGS=NO_CONTENT_VERIFY|NO_ATTR_VERIFY - NOINTERN=0x10 - NO_SIGNER_CERT_VERIFY=0x20 - NO_VERIFY=0x20 - DETACHED=0x40 - BINARY=0x80 - NOATTR=0x100 - NOSMIMECAP =0x200 - NOOLDMIMETYPE=0x400 - CRLFEOL=0x800 - STREAM=0x1000 - NOCRL=0x2000 - PARTIAL=0x4000 - REUSE_DIGEST=0x8000 - USE_KEYID=0x10000 - DEBUG_DECRYPT=0x20000 + """ + Constants for flags passed to the CMS methods. + Can be OR-ed together + """ + TEXT=1 + NOCERTS=2 + NO_CONTENT_VERIFY=4 + NO_ATTR_VERIFY=8 + NO_SIGS=NO_CONTENT_VERIFY|NO_ATTR_VERIFY + NOINTERN=0x10 + NO_SIGNER_CERT_VERIFY=0x20 + NO_VERIFY=0x20 + DETACHED=0x40 + BINARY=0x80 + NOATTR=0x100 + NOSMIMECAP =0x200 + NOOLDMIMETYPE=0x400 + CRLFEOL=0x800 + STREAM=0x1000 + NOCRL=0x2000 + PARTIAL=0x4000 + REUSE_DIGEST=0x8000 + USE_KEYID=0x10000 + DEBUG_DECRYPT=0x20000 def CMS(data,format="PEM"): - """ - Parses CMS data and returns either SignedData or EnvelopedData - object - """ - b=Membio(data) - if format == "PEM": - ptr=libcrypto.PEM_read_bio_CMS(b.bio,None,None,None) - else: - ptr=libcrypto.d2i_CMS_bio(b.bio,None) - typeoid = Oid(libcrypto.OBJ_obj2nid(libcrypto.CMS_get0_type(ptr))) - if typeoid.shortname()=="pkcs7-signedData": - return SignedData(ptr) - elif typeoid.shortname()=="pkcs7-envelopedData": - return EnvelopedData(ptr) - elif typeoid.shortname()=="pkcs7-encryptedData": - return EncryptedData(ptr) - else: - raise NotImplementedError("cannot handle "+typeoid.shortname()) + """ + Parses CMS data and returns either SignedData or EnvelopedData + object + """ + b=Membio(data) + if format == "PEM": + ptr=libcrypto.PEM_read_bio_CMS(b.bio,None,None,None) + else: + ptr=libcrypto.d2i_CMS_bio(b.bio,None) + typeoid = Oid(libcrypto.OBJ_obj2nid(libcrypto.CMS_get0_type(ptr))) + if typeoid.shortname()=="pkcs7-signedData": + return SignedData(ptr) + elif typeoid.shortname()=="pkcs7-envelopedData": + return EnvelopedData(ptr) + elif typeoid.shortname()=="pkcs7-encryptedData": + return EncryptedData(ptr) + else: + raise NotImplementedError("cannot handle "+typeoid.shortname()) class CMSBase(object): - """ - Common ancessor for all CMS types. - Implements serializatio/deserialization - """ - def __init__(self,ptr=None): - self.ptr=ptr - def __str__(self): - """ - Serialize in DER format - """ - b=Membio() - if not libcrypto.i2d_CMS_bio(b.bio,self.ptr): - raise CMSError("writing CMS to PEM") - return str(b) + """ + Common ancessor for all CMS types. + Implements serializatio/deserialization + """ + def __init__(self,ptr=None): + self.ptr=ptr + def __str__(self): + """ + Serialize in DER format + """ + b=Membio() + if not libcrypto.i2d_CMS_bio(b.bio,self.ptr): + raise CMSError("writing CMS to PEM") + return str(b) - def pem(self): - """ - Serialize in PEM format - """ - b=Membio() - if not libcrypto.PEM_write_bio_CMS(b.bio,self.ptr): - raise CMSError("writing CMS to PEM") - return str(b) - - - + def pem(self): + """ + Serialize in PEM format + """ + b=Membio() + if not libcrypto.PEM_write_bio_CMS(b.bio,self.ptr): + raise CMSError("writing CMS to PEM") + return str(b) + + + class SignedData(CMSBase): - @staticmethod - def create(data,cert,pkey,flags=Flags.BINARY,certs=[]): - """ - Creates SignedData message by signing data with pkey and - certificate. + @staticmethod + def create(data,cert,pkey,flags=Flags.BINARY,certs=[]): + """ + Creates SignedData message by signing data with pkey and + certificate. - @param data - data to sign - @param pkey - pkey object with private key to sign - @param flags - OReed combination of Flags constants - @param certs - list of X509 objects to include into CMS - """ - if not pkey.cansign: - raise ValueError("Specified keypair has no private part") - if cert.pubkey!=pkey: - raise ValueError("Certificate doesn't match public key") - b=Membio(data) - if certs is not None and len(certs)>0: - certstack=StackOfX509(certs) - else: - certstack=None - ptr=libcrypto.CMS_sign(cert.cert,pkey.ptr,certstack,b.bio,flags) - if ptr is None: - raise CMSError("signing message") - return SignedData(ptr) - def sign(self,cert,pkey,md=None,data=None,flags=Flags.BINARY): - """ - Adds another signer to already signed message - @param cert - signer's certificate - @param pkey - signer's private key - @param md - message digest to use as DigestType object - (if None - default for key would be used) - @param data - data to sign (if detached and - Flags.REUSE_DIGEST is not specified) - @param flags - ORed combination of Flags consants - """ - if not pkey.cansign: - raise ValueError("Specified keypair has no private part") - if cert.pubkey!=pkey: - raise ValueError("Certificate doesn't match public key") - p1=libcrypto.CMS_sign_add1_Signer(self.ptr,cert.cert,pkey.ptr, - md.digest,flags) - if p1 is None: - raise CMSError("adding signer") - if flags & Flags.REUSE_DIGEST==0: - if data is not None: - b=Membio(data) - biodata=b.bio - else: - biodata=None - res= libcrypto.CMS_final(self.ptr,biodata,None,flags) - if res<=0: - raise CMSError - def verify(self,store,flags,data=None,certs=[]): - """ - Verifies signature under CMS message using trusted cert store + @param data - data to sign + @param pkey - pkey object with private key to sign + @param flags - OReed combination of Flags constants + @param certs - list of X509 objects to include into CMS + """ + if not pkey.cansign: + raise ValueError("Specified keypair has no private part") + if cert.pubkey!=pkey: + raise ValueError("Certificate doesn't match public key") + b=Membio(data) + if certs is not None and len(certs)>0: + certstack=StackOfX509(certs) + else: + certstack=None + ptr=libcrypto.CMS_sign(cert.cert,pkey.ptr,certstack,b.bio,flags) + if ptr is None: + raise CMSError("signing message") + return SignedData(ptr) + def sign(self,cert,pkey,md=None,data=None,flags=Flags.BINARY): + """ + Adds another signer to already signed message + @param cert - signer's certificate + @param pkey - signer's private key + @param md - message digest to use as DigestType object + (if None - default for key would be used) + @param data - data to sign (if detached and + Flags.REUSE_DIGEST is not specified) + @param flags - ORed combination of Flags consants + """ + if not pkey.cansign: + raise ValueError("Specified keypair has no private part") + if cert.pubkey!=pkey: + raise ValueError("Certificate doesn't match public key") + p1=libcrypto.CMS_sign_add1_Signer(self.ptr,cert.cert,pkey.ptr, + md.digest,flags) + if p1 is None: + raise CMSError("adding signer") + if flags & Flags.REUSE_DIGEST==0: + if data is not None: + b=Membio(data) + biodata=b.bio + else: + biodata=None + res= libcrypto.CMS_final(self.ptr,biodata,None,flags) + if res<=0: + raise CMSError + def verify(self,store,flags,data=None,certs=[]): + """ + Verifies signature under CMS message using trusted cert store - @param store - X509Store object with trusted certs - @param flags - OR-ed combination of flag consants - @param data - message data, if messge has detached signature - param certs - list of certificates to use during verification - If Flags.NOINTERN is specified, these are only - sertificates to search for signing certificates - @returns True if signature valid, False otherwise - """ - bio=None - if data!=None: - b=Membio(data) - bio=b.bio - if certs is not None and len(certs)>0: - certstack=StackOfX509(certs) - else: - certstack=None - res=libcrypto.CMS_verify(self.ptr,certstack,store.store,bio,None,flags) - return res>0 - @property - def signers(self): - """ - Return list of signer's certificates - """ - p=libcrypto.CMS_get0_signers(self.ptr) - if p is None: - raise CMSError - return StackOfX509(ptr=p,disposable=False) - @property - def data(self): - """ - Returns signed data if present in the message - """ - b=Membio() - if not libcrypto.CMS_verify(self.ptr,None,None,None,b.bio,Flags.NO_VERIFY): - raise CMSError("extract data") - return str(b) - def addcert(self,cert): - """ - Adds a certificate (probably intermediate CA) to the SignedData - structure - """ - if libcrypto.CMS_add1_cert(self.ptr,cert.cert)<=0: - raise CMSError("adding cert") - def addcrl(self,crl): - """ - Adds a CRL to the signed data structure - """ - raise NotImplementedError - @property - def certs(self): - """ - List of the certificates contained in the structure - """ - p=CMS_get1_certs(self.ptr) - if p is None: - raise CMSError("getting certs") - return StackOfX509(ptr=p,disposable=True) - @property - def crls(self): - """ - List of the CRLs contained in the structure - """ - raise NotImplementedError + @param store - X509Store object with trusted certs + @param flags - OR-ed combination of flag consants + @param data - message data, if messge has detached signature + param certs - list of certificates to use during verification + If Flags.NOINTERN is specified, these are only + sertificates to search for signing certificates + @returns True if signature valid, False otherwise + """ + bio=None + if data!=None: + b=Membio(data) + bio=b.bio + if certs is not None and len(certs)>0: + certstack=StackOfX509(certs) + else: + certstack=None + res=libcrypto.CMS_verify(self.ptr,certstack,store.store,bio,None,flags) + return res>0 + @property + def signers(self): + """ + Return list of signer's certificates + """ + p=libcrypto.CMS_get0_signers(self.ptr) + if p is None: + raise CMSError + return StackOfX509(ptr=p,disposable=False) + @property + def data(self): + """ + Returns signed data if present in the message + """ + b=Membio() + if not libcrypto.CMS_verify(self.ptr,None,None,None,b.bio,Flags.NO_VERIFY): + raise CMSError("extract data") + return str(b) + def addcert(self,cert): + """ + Adds a certificate (probably intermediate CA) to the SignedData + structure + """ + if libcrypto.CMS_add1_cert(self.ptr,cert.cert)<=0: + raise CMSError("adding cert") + def addcrl(self,crl): + """ + Adds a CRL to the signed data structure + """ + raise NotImplementedError + @property + def certs(self): + """ + List of the certificates contained in the structure + """ + p=CMS_get1_certs(self.ptr) + if p is None: + raise CMSError("getting certs") + return StackOfX509(ptr=p,disposable=True) + @property + def crls(self): + """ + List of the CRLs contained in the structure + """ + raise NotImplementedError class EnvelopedData(CMSBase): - @staticmethod - def create(recipients,data,cipher,flags=0): - """ - Creates and encrypts message - @param recipients - list of X509 objects - @param data - contents of the message - @param cipher - CipherType object - @param flags - flag - """ - recp=StackOfX509(recipients) - b=Membio(data) - p=libcrypto.CMS_encrypt(recp.ptr,b.bio,cipher.cipher_type,flags) - if p is None: - raise CMSError("encrypt EnvelopedData") - return EnvelopedData(p) - def decrypt(self,pkey,cert,flags=0): - """ - Decrypts message - @param pkey - private key to decrypt - @param cert - certificate of this private key (to find - neccessary RecipientInfo - @param flags - flags - @returns - decrypted data - """ - if not pkey.cansign: - raise ValueError("Specified keypair has no private part") - if pkey != cert.pubkey: - raise ValueError("Certificate doesn't match private key") - b=Membio() - res=libcrypto.CMS_decrypt(self.ptr,pkey.ptr,cert.ccert,None,b.bio,flags) - if res<=0: - raise CMSError("decrypting CMS") - return str(b) + @staticmethod + def create(recipients,data,cipher,flags=0): + """ + Creates and encrypts message + @param recipients - list of X509 objects + @param data - contents of the message + @param cipher - CipherType object + @param flags - flag + """ + recp=StackOfX509(recipients) + b=Membio(data) + p=libcrypto.CMS_encrypt(recp.ptr,b.bio,cipher.cipher_type,flags) + if p is None: + raise CMSError("encrypt EnvelopedData") + return EnvelopedData(p) + def decrypt(self,pkey,cert,flags=0): + """ + Decrypts message + @param pkey - private key to decrypt + @param cert - certificate of this private key (to find + neccessary RecipientInfo + @param flags - flags + @returns - decrypted data + """ + if not pkey.cansign: + raise ValueError("Specified keypair has no private part") + if pkey != cert.pubkey: + raise ValueError("Certificate doesn't match private key") + b=Membio() + res=libcrypto.CMS_decrypt(self.ptr,pkey.ptr,cert.ccert,None,b.bio,flags) + if res<=0: + raise CMSError("decrypting CMS") + return str(b) class EncryptedData(CMSBase): - @staticmethod - def create(data,cipher,key,flags=0): - """ - Creates an EncryptedData message. - @param data data to encrypt - @param cipher cipher.CipherType object represening required - cipher type - @param key - byte array used as simmetic key - @param flags - OR-ed combination of Flags constant - """ - b=Membio(data) - ptr=libcrypto.CMS_EncryptedData_encrypt(b.bio,cipher.cipher_type,key,len(key),flags) - if ptr is None: - raise CMSError("encrypt data") - return EncryptedData(ptr) - def decrypt(self,key,flags=0): - """ - Decrypts encrypted data message - @param key - symmetic key to decrypt - @param flags - OR-ed combination of Flags constant - """ - b=Membio() - if libcrypto.CMS_EncryptedData_decrypt(self.ptr,key,len(key),None, - b.bio,flags)<=0: - raise CMSError("decrypt data") - return str(b) + @staticmethod + def create(data,cipher,key,flags=0): + """ + Creates an EncryptedData message. + @param data data to encrypt + @param cipher cipher.CipherType object represening required + cipher type + @param key - byte array used as simmetic key + @param flags - OR-ed combination of Flags constant + """ + b=Membio(data) + ptr=libcrypto.CMS_EncryptedData_encrypt(b.bio,cipher.cipher_type,key,len(key),flags) + if ptr is None: + raise CMSError("encrypt data") + return EncryptedData(ptr) + def decrypt(self,key,flags=0): + """ + Decrypts encrypted data message + @param key - symmetic key to decrypt + @param flags - OR-ed combination of Flags constant + """ + b=Membio() + if libcrypto.CMS_EncryptedData_decrypt(self.ptr,key,len(key),None, + b.bio,flags)<=0: + raise CMSError("decrypt data") + return str(b) __all__=['CMS','CMSError','Flags','SignedData','EnvelopedData','EncryptedData'] diff --git a/ctypescrypto/digest.py b/ctypescrypto/digest.py index 586a1fb..6ebb82a 100644 --- a/ctypescrypto/digest.py +++ b/ctypescrypto/digest.py @@ -1,16 +1,16 @@ """ - Implements interface to OpenSSL EVP_Digest* functions. + Implements interface to OpenSSL EVP_Digest* functions. - Interface made as close to hashlib as possible. + Interface made as close to hashlib as possible. - This module is really an excess effort. Hashlib allows access to - mostly same functionality except oids and nids of hashing - algortithms (which might be needed for private key operations). + This module is really an excess effort. Hashlib allows access to + mostly same functionality except oids and nids of hashing + algortithms (which might be needed for private key operations). - hashlib even allows to use engine-provided digests if it is build - with dinamically linked libcrypto - so use - ctypescrypto.engine.set_default("gost",xFFFF) and md_gost94 - algorithm would be available both to this module and hashlib. + hashlib even allows to use engine-provided digests if it is build + with dinamically linked libcrypto - so use + ctypescrypto.engine.set_default("gost",xFFFF) and md_gost94 + algorithm would be available both to this module and hashlib. """ from ctypes import c_int, c_char_p, c_void_p, POINTER, c_long,c_longlong, create_string_buffer,byref @@ -22,144 +22,144 @@ DIGEST_ALGORITHMS = ("MD5", "SHA1", "SHA224", "SHA256", "SHA384", "SHA512") __all__ = ['DigestError','Digest','DigestType','new'] class DigestError(LibCryptoError): - pass + pass def new(algname): - """ - Behaves just like hashlib.new. Creates digest object by - algorithm name - """ - md=DigestType(algname) - return Digest(md) + """ + Behaves just like hashlib.new. Creates digest object by + algorithm name + """ + md=DigestType(algname) + return Digest(md) class DigestType(object): - """ - - Represents EVP_MD object - constant structure which describes - digest algorithm - - """ - def __init__(self, digest_name): - """ - Finds digest by its name. You can pass Oid object instead of - name. - - Special case is when None is passed as name. In this case - unitialized digest is created, and can be initalized later - by setting its digest attribute to pointer to EVP_MD - """ - if digest_name is None: - return - if isinstance(digest_name,Oid): - self.digest_name=digest_name.longname() - self.digest=libcrypto.EVP_get_digestbyname(self.digest_name) - else: - self.digest_name = str(digest_name) - self.digest = libcrypto.EVP_get_digestbyname(self.digest_name) - if self.digest is None: - raise DigestError("Unknown digest: %s" % self.digest_name) - - @property - def name(self): - if not hasattr(self,'digest_name'): - self.digest_name=Oid(libcrypto.EVP_MD_type(self.digest)).longname() - return self.digest_name - def __del__(self): - pass - def digest_size(self): - return libcrypto.EVP_MD_size(self.digest) - def block_size(self): - return libcrypto.EVP_MD_block_size(self.digest) - def oid(self): - return Oid(libcrypto.EVP_MD_type(self.digest)) + """ + + Represents EVP_MD object - constant structure which describes + digest algorithm + + """ + def __init__(self, digest_name): + """ + Finds digest by its name. You can pass Oid object instead of + name. + + Special case is when None is passed as name. In this case + unitialized digest is created, and can be initalized later + by setting its digest attribute to pointer to EVP_MD + """ + if digest_name is None: + return + if isinstance(digest_name,Oid): + self.digest_name=digest_name.longname() + self.digest=libcrypto.EVP_get_digestbyname(self.digest_name) + else: + self.digest_name = str(digest_name) + self.digest = libcrypto.EVP_get_digestbyname(self.digest_name) + if self.digest is None: + raise DigestError("Unknown digest: %s" % self.digest_name) + + @property + def name(self): + if not hasattr(self,'digest_name'): + self.digest_name=Oid(libcrypto.EVP_MD_type(self.digest)).longname() + return self.digest_name + def __del__(self): + pass + def digest_size(self): + return libcrypto.EVP_MD_size(self.digest) + def block_size(self): + return libcrypto.EVP_MD_block_size(self.digest) + def oid(self): + return Oid(libcrypto.EVP_MD_type(self.digest)) class Digest(object): - """ - Represents EVP_MD_CTX object which actually used to calculate - digests. - - """ - def __init__(self,digest_type): - """ - Initializes digest using given type. - """ - self._clean_ctx() - self.ctx = libcrypto.EVP_MD_CTX_create() - if self.ctx is None: - raise DigestError("Unable to create digest context") - result = libcrypto.EVP_DigestInit_ex(self.ctx, digest_type.digest, None) - if result == 0: - self._clean_ctx() - raise DigestError("Unable to initialize digest") - self.digest_type = digest_type - self.digest_size = self.digest_type.digest_size() - self.block_size = self.digest_type.block_size() - - def __del__(self): - self._clean_ctx() - - def update(self, data, length=None): - """ - Hashes given byte string - - @param data - string to hash - @param length - if not specifed, entire string is hashed, - otherwise only first length bytes - """ - if self.digest_finalized: - raise DigestError("No updates allowed") - if not isinstance(data,str): - raise TypeError("A string is expected") - if length is None: - length = len(data) - elif length > len(data): - raise ValueError("Specified length is greater than length of data") - result = libcrypto.EVP_DigestUpdate(self.ctx, c_char_p(data), length) - if result != 1: - raise DigestError, "Unable to update digest" - - def digest(self, data=None): - """ - Finalizes digest operation and return digest value - Optionally hashes more data before finalizing - """ - if self.digest_finalized: - return self.digest_out.raw[:self.digest_size] - if data is not None: - self.update(data) - self.digest_out = create_string_buffer(256) - length = c_long(0) - result = libcrypto.EVP_DigestFinal_ex(self.ctx, self.digest_out, byref(length)) - if result != 1 : - raise DigestError("Unable to finalize digest") - self.digest_finalized = True - return self.digest_out.raw[:self.digest_size] - def copy(self): - """ - Creates copy of the digest CTX to allow to compute digest - while being able to hash more data - """ - new_digest=Digest(self.digest_type) - libcrypto.EVP_MD_CTX_copy(new_digest.ctx,self.ctx) - return new_digest - - def _clean_ctx(self): - try: - if self.ctx is not None: - libcrypto.EVP_MD_CTX_destroy(self.ctx) - del(self.ctx) - except AttributeError: - pass - self.digest_out = None - self.digest_finalized = False - - def hexdigest(self,data=None): - """ - Returns digest in the hexadecimal form. For compatibility - with hashlib - """ - from base64 import b16encode - return b16encode(self.digest(data)) + """ + Represents EVP_MD_CTX object which actually used to calculate + digests. + + """ + def __init__(self,digest_type): + """ + Initializes digest using given type. + """ + self._clean_ctx() + self.ctx = libcrypto.EVP_MD_CTX_create() + if self.ctx is None: + raise DigestError("Unable to create digest context") + result = libcrypto.EVP_DigestInit_ex(self.ctx, digest_type.digest, None) + if result == 0: + self._clean_ctx() + raise DigestError("Unable to initialize digest") + self.digest_type = digest_type + self.digest_size = self.digest_type.digest_size() + self.block_size = self.digest_type.block_size() + + def __del__(self): + self._clean_ctx() + + def update(self, data, length=None): + """ + Hashes given byte string + + @param data - string to hash + @param length - if not specifed, entire string is hashed, + otherwise only first length bytes + """ + if self.digest_finalized: + raise DigestError("No updates allowed") + if not isinstance(data,str): + raise TypeError("A string is expected") + if length is None: + length = len(data) + elif length > len(data): + raise ValueError("Specified length is greater than length of data") + result = libcrypto.EVP_DigestUpdate(self.ctx, c_char_p(data), length) + if result != 1: + raise DigestError, "Unable to update digest" + + def digest(self, data=None): + """ + Finalizes digest operation and return digest value + Optionally hashes more data before finalizing + """ + if self.digest_finalized: + return self.digest_out.raw[:self.digest_size] + if data is not None: + self.update(data) + self.digest_out = create_string_buffer(256) + length = c_long(0) + result = libcrypto.EVP_DigestFinal_ex(self.ctx, self.digest_out, byref(length)) + if result != 1 : + raise DigestError("Unable to finalize digest") + self.digest_finalized = True + return self.digest_out.raw[:self.digest_size] + def copy(self): + """ + Creates copy of the digest CTX to allow to compute digest + while being able to hash more data + """ + new_digest=Digest(self.digest_type) + libcrypto.EVP_MD_CTX_copy(new_digest.ctx,self.ctx) + return new_digest + + def _clean_ctx(self): + try: + if self.ctx is not None: + libcrypto.EVP_MD_CTX_destroy(self.ctx) + del(self.ctx) + except AttributeError: + pass + self.digest_out = None + self.digest_finalized = False + + def hexdigest(self,data=None): + """ + Returns digest in the hexadecimal form. For compatibility + with hashlib + """ + from base64 import b16encode + return b16encode(self.digest(data)) # Declare function result and argument types diff --git a/ctypescrypto/ec.py b/ctypescrypto/ec.py index 047aad9..7240b39 100644 --- a/ctypescrypto/ec.py +++ b/ctypescrypto/ec.py @@ -8,60 +8,60 @@ from ctypescrypto import libcrypto __all__ = [ 'create'] def create(curve,data): - """ - Creates EC keypair from the just secret key and curve name - - @param curve - name of elliptic curve - @param num - byte array or long number representing key - """ - ec=libcrypto.EC_KEY_new_by_curve_name(curve.nid) - if ec is None: - raise PKeyError("EC_KEY_new_by_curvename") - group=libcrypto.EC_KEY_get0_group(ec) - if group is None: - raise PKeyError("EC_KEY_get0_group") - libcrypto.EC_GROUP_set_asn1_flag(group,1) - raw_key=libcrypto.BN_new() - if isinstance(data,int): - BN_hex2bn(byref(raw_key),hex(data)) - else: - if raw_key is None: - raise PKeyError("BN_new") - if libcrypto.BN_bin2bn(data,len(data),raw_key) is None: - raise PKeyError("BN_bin2bn") - ctx=libcrypto.BN_CTX_new() - if ctx is None: - raise PKeyError("BN_CTX_new") - order=libcrypto.BN_new() - if order is None: - raise PKeyError("BN_new") - priv_key = libcrypto.BN_new() - if priv_key is None: - raise PKeyError("BN_new") - if libcrypto.EC_GROUP_get_order(group,order,ctx) <=0: - raise PKeyError("EC_GROUP_get_order") - if libcrypto.BN_nnmod(priv_key,raw_key,order,ctx) <=0: - raise PKeyError("BN_nnmod") - if libcrypto.EC_KEY_set_private_key(ec,priv_key)<=0: - raise PKeyError("EC_KEY_set_private_key") - pub_key=libcrypto.EC_POINT_new(group) - if pub_key is None: - raise PKeyError("EC_POINT_new") - if libcrypto.EC_POINT_mul(group,pub_key,priv_key,None,None,ctx)<=0: - raise PKeyError("EC_POINT_mul") - if libcrypto.EC_KEY_set_public_key(ec,pub_key)<=0: - raise PKeyError("EC_KEY_set_public_key") - libcrypto.BN_free(raw_key) - libcrypto.BN_free(order) - libcrypto.BN_free(priv_key) - libcrypto.BN_CTX_free(ctx) - p=libcrypto.EVP_PKEY_new() - if p is None: - raise PKeyError("EVP_PKEY_new") - if libcrypto.EVP_PKEY_set1_EC_KEY(p,ec)<=0: - raise PKeyError("EVP_PKEY_set1_EC_KEY") - libcrypto.EC_KEY_free(ec) - return PKey(ptr=p,cansign=True) + """ + Creates EC keypair from the just secret key and curve name + + @param curve - name of elliptic curve + @param num - byte array or long number representing key + """ + ec=libcrypto.EC_KEY_new_by_curve_name(curve.nid) + if ec is None: + raise PKeyError("EC_KEY_new_by_curvename") + group=libcrypto.EC_KEY_get0_group(ec) + if group is None: + raise PKeyError("EC_KEY_get0_group") + libcrypto.EC_GROUP_set_asn1_flag(group,1) + raw_key=libcrypto.BN_new() + if isinstance(data,int): + BN_hex2bn(byref(raw_key),hex(data)) + else: + if raw_key is None: + raise PKeyError("BN_new") + if libcrypto.BN_bin2bn(data,len(data),raw_key) is None: + raise PKeyError("BN_bin2bn") + ctx=libcrypto.BN_CTX_new() + if ctx is None: + raise PKeyError("BN_CTX_new") + order=libcrypto.BN_new() + if order is None: + raise PKeyError("BN_new") + priv_key = libcrypto.BN_new() + if priv_key is None: + raise PKeyError("BN_new") + if libcrypto.EC_GROUP_get_order(group,order,ctx) <=0: + raise PKeyError("EC_GROUP_get_order") + if libcrypto.BN_nnmod(priv_key,raw_key,order,ctx) <=0: + raise PKeyError("BN_nnmod") + if libcrypto.EC_KEY_set_private_key(ec,priv_key)<=0: + raise PKeyError("EC_KEY_set_private_key") + pub_key=libcrypto.EC_POINT_new(group) + if pub_key is None: + raise PKeyError("EC_POINT_new") + if libcrypto.EC_POINT_mul(group,pub_key,priv_key,None,None,ctx)<=0: + raise PKeyError("EC_POINT_mul") + if libcrypto.EC_KEY_set_public_key(ec,pub_key)<=0: + raise PKeyError("EC_KEY_set_public_key") + libcrypto.BN_free(raw_key) + libcrypto.BN_free(order) + libcrypto.BN_free(priv_key) + libcrypto.BN_CTX_free(ctx) + p=libcrypto.EVP_PKEY_new() + if p is None: + raise PKeyError("EVP_PKEY_new") + if libcrypto.EVP_PKEY_set1_EC_KEY(p,ec)<=0: + raise PKeyError("EVP_PKEY_set1_EC_KEY") + libcrypto.EC_KEY_free(ec) + return PKey(ptr=p,cansign=True) libcrypto.EVP_PKEY_new.restype=c_void_p diff --git a/ctypescrypto/engine.py b/ctypescrypto/engine.py index 898b20d..0446d48 100644 --- a/ctypescrypto/engine.py +++ b/ctypescrypto/engine.py @@ -10,25 +10,25 @@ __all__=['default','set_default'] default=None def set_default(engine): - """ - Loads specified engine and sets it as default for all - algorithms, supported by it - """ - global default - e=libcrypto.ENGINE_by_id(engine) - if e is None: - # Try load engine - e = libcrypto.ENGINE_by_id("dynamic") - if e is None: - raise LibCryptoError("Cannot get 'dynamic' engine") - if not libcrypto.ENGINE_ctrl_cmd_string(e,"SO_PATH",engine,0): - raise LibCryptoError("Cannot execute ctrl cmd SO_PATH") - if not libcrypto.ENGINE_ctrl_cmd_string(e,"LOAD",None,0): - raise LibCryptoError("Cannot execute ctrl cmd LOAD") - if e is None: - raise ValueError("Cannot find engine "+engine) - libcrypto.ENGINE_set_default(e,c_int(0xFFFF)) - default=e + """ + Loads specified engine and sets it as default for all + algorithms, supported by it + """ + global default + e=libcrypto.ENGINE_by_id(engine) + if e is None: + # Try load engine + e = libcrypto.ENGINE_by_id("dynamic") + if e is None: + raise LibCryptoError("Cannot get 'dynamic' engine") + if not libcrypto.ENGINE_ctrl_cmd_string(e,"SO_PATH",engine,0): + raise LibCryptoError("Cannot execute ctrl cmd SO_PATH") + if not libcrypto.ENGINE_ctrl_cmd_string(e,"LOAD",None,0): + raise LibCryptoError("Cannot execute ctrl cmd LOAD") + if e is None: + raise ValueError("Cannot find engine "+engine) + libcrypto.ENGINE_set_default(e,c_int(0xFFFF)) + default=e # Declare function result and arguments for used functions libcrypto.ENGINE_by_id.restype=c_void_p diff --git a/ctypescrypto/exception.py b/ctypescrypto/exception.py index 30c138e..0e6fc43 100644 --- a/ctypescrypto/exception.py +++ b/ctypescrypto/exception.py @@ -8,41 +8,41 @@ strings_loaded=False __all__ = ['LibCryptoError','clear_err_stack'] def _check_null(s): - """ - Handle transparently NULL returned from error reporting functions - instead of strings - """ - if s is None: - return "" - return s + """ + Handle transparently NULL returned from error reporting functions + instead of strings + """ + if s is None: + return "" + return s class LibCryptoError(Exception): - """ - Exception for libcrypto errors. Adds all the info, which can be - extracted from internal (per-thread) libcrypto error stack to the message, - passed to the constructor. - """ - def __init__(self,msg): - global strings_loaded - if not strings_loaded: - libcrypto.ERR_load_crypto_strings() - strings_loaded = True - e=libcrypto.ERR_get_error() - m = msg - while e != 0: - m+="\n\t"+_check_null(libcrypto.ERR_lib_error_string(e))+":"+\ - _check_null(libcrypto.ERR_func_error_string(e))+":"+\ - _check_null(libcrypto.ERR_reason_error_string(e)) - e=libcrypto.ERR_get_error() - self.args=(m,) + """ + Exception for libcrypto errors. Adds all the info, which can be + extracted from internal (per-thread) libcrypto error stack to the message, + passed to the constructor. + """ + def __init__(self,msg): + global strings_loaded + if not strings_loaded: + libcrypto.ERR_load_crypto_strings() + strings_loaded = True + e=libcrypto.ERR_get_error() + m = msg + while e != 0: + m+="\n\t"+_check_null(libcrypto.ERR_lib_error_string(e))+":"+\ + _check_null(libcrypto.ERR_func_error_string(e))+":"+\ + _check_null(libcrypto.ERR_reason_error_string(e)) + e=libcrypto.ERR_get_error() + self.args=(m,) def clear_err_stack(): - """ - Clears internal libcrypto err stack. Call it if you've checked - return code and processed exceptional situation, so subsequent - raising of the LibCryptoError wouldn't list already handled errors - """ - libcrypto.ERR_clear_error() + """ + Clears internal libcrypto err stack. Call it if you've checked + return code and processed exceptional situation, so subsequent + raising of the LibCryptoError wouldn't list already handled errors + """ + libcrypto.ERR_clear_error() libcrypto.ERR_lib_error_string.restype=c_char_p diff --git a/ctypescrypto/mac.py b/ctypescrypto/mac.py index 48ade09..7b9381e 100644 --- a/ctypescrypto/mac.py +++ b/ctypescrypto/mac.py @@ -13,75 +13,75 @@ from ctypes import c_int,c_char_p, c_void_p, c_size_t,POINTER,create_string_buff __all__ = ['MAC','DigestError'] class MAC(Digest): - """ - This object represents MAC context. It is quite simular - to digest algorithm. It is simular to hmac objects provided - by standard library - """ - def __init__(self,algorithm,key,digest=None,**kwargs): - """ - Constructor has to obligatory arguments: - - @param algorithm - which is name of MAC algorithm i.e 'hmac' or - 'gost-mac' or equivalent Oid object - @param key - byte buffer with key. + """ + This object represents MAC context. It is quite simular + to digest algorithm. It is simular to hmac objects provided + by standard library + """ + def __init__(self,algorithm,key,digest=None,**kwargs): + """ + Constructor has to obligatory arguments: + + @param algorithm - which is name of MAC algorithm i.e 'hmac' or + 'gost-mac' or equivalent Oid object + @param key - byte buffer with key. - Optional parameters are: - digest - Oid or name of the digest algorithm to use. If none - specified, OpenSSL will try to derive one from the MAC - algorithm (or if algorithm is hmac, we'll substititute md5 - for compatibility with standard hmac module + Optional parameters are: + digest - Oid or name of the digest algorithm to use. If none + specified, OpenSSL will try to derive one from the MAC + algorithm (or if algorithm is hmac, we'll substititute md5 + for compatibility with standard hmac module - any other keyword argument is passed to EVP_PKEY_CTX as string - option. + any other keyword argument is passed to EVP_PKEY_CTX as string + option. - """ - if isinstance(algorithm,str): - self.algorithm=Oid(algorithm) - elif isinstance(algorithm,Oid): - self.algorithm=algorithm - else: - raise TypeError("Algorthm must be string or Oid") - if self.algorithm==Oid('hmac') and digest is None: - digest='md5' - self.name=self.algorithm.shortname().lower() - if digest is not None: - self.digest_type=DigestType(digest) - self.name+='-'+self.digest_type.digest_name - d=self.digest_type.digest - else: - self.digest_type=None - d=None - self.key=libcrypto.EVP_PKEY_new_mac_key(self.algorithm.nid,None,key,len(key)) - if self.key is None: - raise DigestError("EVP_PKEY_new_mac_key") - pctx=c_void_p() - self.ctx = libcrypto.EVP_MD_CTX_create() - if self.ctx == 0: - raise DigestError("Unable to create digest context") - if libcrypto.EVP_DigestSignInit(self.ctx,pointer(pctx),d,None,self.key) <= 0: - raise DigestError("Unable to intialize digest context") - self.digest_finalized=False - if self.digest_type is None: - self.digest_type=DigestType(Oid(libcrypto.EVP_MD_type(libcrypto.EVP_MD_CTX_md(self.ctx)))) - for (name,val) in kwargs.items(): - if libcrypto.EVP_PKEY_CTX_ctrl_str(pctx,name,val)<=0: - raise DigestError("Unable to set mac parameter") - self.digest_size = self.digest_type.digest_size() - self.block_size = self.digest_type.block_size() - def digest(self,data=None): - """ - Method digest is redefined to return keyed MAC value instead of - just digest. - """ - if data is not None: - self.update(data) - b=create_string_buffer(256) - size=c_size_t(256) - if libcrypto.EVP_DigestSignFinal(self.ctx,b,pointer(size))<=0: - raise DigestError('SignFinal') - self.digest_finalized=True - return b.raw[:size.value] + """ + if isinstance(algorithm,str): + self.algorithm=Oid(algorithm) + elif isinstance(algorithm,Oid): + self.algorithm=algorithm + else: + raise TypeError("Algorthm must be string or Oid") + if self.algorithm==Oid('hmac') and digest is None: + digest='md5' + self.name=self.algorithm.shortname().lower() + if digest is not None: + self.digest_type=DigestType(digest) + self.name+='-'+self.digest_type.digest_name + d=self.digest_type.digest + else: + self.digest_type=None + d=None + self.key=libcrypto.EVP_PKEY_new_mac_key(self.algorithm.nid,None,key,len(key)) + if self.key is None: + raise DigestError("EVP_PKEY_new_mac_key") + pctx=c_void_p() + self.ctx = libcrypto.EVP_MD_CTX_create() + if self.ctx == 0: + raise DigestError("Unable to create digest context") + if libcrypto.EVP_DigestSignInit(self.ctx,pointer(pctx),d,None,self.key) <= 0: + raise DigestError("Unable to intialize digest context") + self.digest_finalized=False + if self.digest_type is None: + self.digest_type=DigestType(Oid(libcrypto.EVP_MD_type(libcrypto.EVP_MD_CTX_md(self.ctx)))) + for (name,val) in kwargs.items(): + if libcrypto.EVP_PKEY_CTX_ctrl_str(pctx,name,val)<=0: + raise DigestError("Unable to set mac parameter") + self.digest_size = self.digest_type.digest_size() + self.block_size = self.digest_type.block_size() + def digest(self,data=None): + """ + Method digest is redefined to return keyed MAC value instead of + just digest. + """ + if data is not None: + self.update(data) + b=create_string_buffer(256) + size=c_size_t(256) + if libcrypto.EVP_DigestSignFinal(self.ctx,b,pointer(size))<=0: + raise DigestError('SignFinal') + self.digest_finalized=True + return b.raw[:size.value] libcrypto.EVP_DigestSignFinal.argtypes=(c_void_p,c_char_p,POINTER(c_size_t)) libcrypto.EVP_DigestSignFinal.restype=c_int diff --git a/ctypescrypto/oid.py b/ctypescrypto/oid.py index 8caa57a..85b3aa0 100644 --- a/ctypescrypto/oid.py +++ b/ctypescrypto/oid.py @@ -1,4 +1,4 @@ -""" +""" Interface to OpenSSL object identifier database. It is primarily intended to deal with OIDs which are compiled into the @@ -13,101 +13,101 @@ from ctypes import c_char_p, c_void_p, c_int, create_string_buffer __all__ = ['Oid','create','cleanup'] class Oid(object): - """ - Represents an OID. It can be consturucted by textual - representation like Oid("commonName") or Oid("CN"), - dotted-decimal Oid("1.2.3.4") or using OpenSSL numeric - identifer (NID), which is typically returned or required by - OpenSSL API functions. If object is consturcted from textual - representation which is not present in the database, it fails - with ValueError + """ + Represents an OID. It can be consturucted by textual + representation like Oid("commonName") or Oid("CN"), + dotted-decimal Oid("1.2.3.4") or using OpenSSL numeric + identifer (NID), which is typically returned or required by + OpenSSL API functions. If object is consturcted from textual + representation which is not present in the database, it fails + with ValueError - attribute nid - contains object nid. + attribute nid - contains object nid. - """ + """ - def __init__(self,value): - " Object constuctor. Accepts string or integer" - if isinstance(value,unicode): - value=value.encode('ascii') - if isinstance(value,str): - self.nid=libcrypto.OBJ_txt2nid(value) - if self.nid==0: - raise ValueError("Cannot find object %s in the database"%(value)) - elif isinstance(value,(int,long)): - cn=libcrypto.OBJ_nid2sn(value) - if cn is None: - raise ValueError("No such nid %d in the database"%(value)) - self.nid=value - else: - raise TypeError("Cannot convert this type to object identifier") - def __hash__(self): - " Returns NID " - return self.nid - def __cmp__(self,other): - " Compares NIDs of two objects " - return self.nid-other.nid - def __str__(self): - " Default string representation of Oid is dotted-decimal" - return self.dotted() - def __repr__(self): - return "Oid('%s')"%(self.dotted()) - def shortname(self): - " Returns short name if any " - return libcrypto.OBJ_nid2sn(self.nid) - def longname(self): - " Returns logn name if any " - return libcrypto.OBJ_nid2ln(self.nid) - def dotted(self): - " Returns dotted-decimal reperesentation " - obj=libcrypto.OBJ_nid2obj(self.nid) - buf=create_string_buffer(256) - libcrypto.OBJ_obj2txt(buf,256,obj,1) - return buf.value - @staticmethod - def fromobj(obj): - """ - Creates an OID object from the pointer to ASN1_OBJECT c structure. - Strictly for internal use - """ - nid=libcrypto.OBJ_obj2nid(obj) - if nid==0: - buf=create_string_buffer(80) - l=libcrypto.OBJ_obj2txt(buf,80,obj,1) - oid=create(buf[0:l],buf[0:l],buf[0:l]) - else: - oid=Oid(nid) - return oid + def __init__(self,value): + " Object constuctor. Accepts string or integer" + if isinstance(value,unicode): + value=value.encode('ascii') + if isinstance(value,str): + self.nid=libcrypto.OBJ_txt2nid(value) + if self.nid==0: + raise ValueError("Cannot find object %s in the database"%(value)) + elif isinstance(value,(int,long)): + cn=libcrypto.OBJ_nid2sn(value) + if cn is None: + raise ValueError("No such nid %d in the database"%(value)) + self.nid=value + else: + raise TypeError("Cannot convert this type to object identifier") + def __hash__(self): + " Returns NID " + return self.nid + def __cmp__(self,other): + " Compares NIDs of two objects " + return self.nid-other.nid + def __str__(self): + " Default string representation of Oid is dotted-decimal" + return self.dotted() + def __repr__(self): + return "Oid('%s')"%(self.dotted()) + def shortname(self): + " Returns short name if any " + return libcrypto.OBJ_nid2sn(self.nid) + def longname(self): + " Returns logn name if any " + return libcrypto.OBJ_nid2ln(self.nid) + def dotted(self): + " Returns dotted-decimal reperesentation " + obj=libcrypto.OBJ_nid2obj(self.nid) + buf=create_string_buffer(256) + libcrypto.OBJ_obj2txt(buf,256,obj,1) + return buf.value + @staticmethod + def fromobj(obj): + """ + Creates an OID object from the pointer to ASN1_OBJECT c structure. + Strictly for internal use + """ + nid=libcrypto.OBJ_obj2nid(obj) + if nid==0: + buf=create_string_buffer(80) + l=libcrypto.OBJ_obj2txt(buf,80,obj,1) + oid=create(buf[0:l],buf[0:l],buf[0:l]) + else: + oid=Oid(nid) + return oid def create(dotted,shortname,longname): - """ - Creates new OID in the database + """ + Creates new OID in the database - @param dotted - dotted-decimal representation of new OID - @param shortname - short name for new OID - @param longname - long name for new OID + @param dotted - dotted-decimal representation of new OID + @param shortname - short name for new OID + @param longname - long name for new OID - @returns Oid object corresponding to new OID - - This function should be used with exreme care. Whenever - possible, it is better to add new OIDs via OpenSSL configuration - file + @returns Oid object corresponding to new OID + + This function should be used with exreme care. Whenever + possible, it is better to add new OIDs via OpenSSL configuration + file - Results of calling this function twice for same OIDor for - Oid alredy in database are undefined - """ - nid=libcrypto.OBJ_create(dotted,shortname,longname) - if nid == 0: - raise LibCryptoError("Problem adding new OID to the database") - return Oid(nid) + Results of calling this function twice for same OIDor for + Oid alredy in database are undefined + """ + nid=libcrypto.OBJ_create(dotted,shortname,longname) + if nid == 0: + raise LibCryptoError("Problem adding new OID to the database") + return Oid(nid) def cleanup(): - """ - Removes all the objects, dynamically added by current - application from database. - """ - libcrypto.OBJ_cleanup() + """ + Removes all the objects, dynamically added by current + application from database. + """ + libcrypto.OBJ_cleanup() libcrypto.OBJ_nid2sn.restype=c_char_p libcrypto.OBJ_nid2ln.restype=c_char_p diff --git a/ctypescrypto/pbkdf2.py b/ctypescrypto/pbkdf2.py index c2e01bb..85a99a5 100644 --- a/ctypescrypto/pbkdf2.py +++ b/ctypescrypto/pbkdf2.py @@ -10,26 +10,26 @@ from ctypescrypto.digest import DigestType __all__ = ['pbkdf2'] def pbkdf2(password,salt,outlen,digesttype="sha1",iterations=2000): - """ - Interface to PKCS5_PBKDF2_HMAC function - Parameters: - - @param password - password to derive key from - @param salt - random salt to use for key derivation - @param outlen - number of bytes to derive - @param digesttype - name of digest to use to use (default sha1) - @param iterations - number of iterations to use + """ + Interface to PKCS5_PBKDF2_HMAC function + Parameters: + + @param password - password to derive key from + @param salt - random salt to use for key derivation + @param outlen - number of bytes to derive + @param digesttype - name of digest to use to use (default sha1) + @param iterations - number of iterations to use - @returns outlen bytes of key material derived from password and salt - """ - dt=DigestType(digesttype) - out=create_string_buffer(outlen) - res=libcrypto.PKCS5_PBKDF2_HMAC(password,len(password),salt,len(salt), - iterations,dt.digest,outlen,out) - if res<=0: - raise LibCryptoError("error computing PBKDF2") - return out.raw + @returns outlen bytes of key material derived from password and salt + """ + dt=DigestType(digesttype) + out=create_string_buffer(outlen) + res=libcrypto.PKCS5_PBKDF2_HMAC(password,len(password),salt,len(salt), + iterations,dt.digest,outlen,out) + if res<=0: + raise LibCryptoError("error computing PBKDF2") + return out.raw libcrypto.PKCS5_PBKDF2_HMAC.argtypes=(c_char_p,c_int,c_char_p,c_int,c_int, - c_void_p,c_int,c_char_p) + c_void_p,c_int,c_char_p) libcrypto.PKCS5_PBKDF2_HMAC.restupe=c_int diff --git a/ctypescrypto/pkey.py b/ctypescrypto/pkey.py index 56e8746..1e70784 100644 --- a/ctypescrypto/pkey.py +++ b/ctypescrypto/pkey.py @@ -13,242 +13,242 @@ import sys __all__ = ['PKeyError','password_callback','PKey'] class PKeyError(LibCryptoError): - pass + pass CALLBACK_FUNC=CFUNCTYPE(c_int,c_char_p,c_int,c_int,c_char_p) def password_callback(buf,length,rwflag,u): - """ - Example password callback for private key. Assumes that - password is store in the userdata parameter, so allows to pass password - from constructor arguments to the libcrypto keyloading functions - """ - cnt=len(u) - if length0 - def derive(self,peerkey,**kwargs): - """ - Derives shared key (DH,ECDH,VKO 34.10). Requires - private key available + def verify(self,digest,signature,**kwargs): + """ + Verifies given signature on given digest + Returns True if Ok, False if don't match + Keyword arguments allows to set algorithm-specific + parameters + """ + ctx=libcrypto.EVP_PKEY_CTX_new(self.key,None) + if ctx is None: + raise PKeyError("Initailizing verify context") + if libcrypto.EVP_PKEY_verify_init(ctx)<1: + raise PKeyError("verify_init") + self._configure_context(ctx,kwargs) + rv=libcrypto.EVP_PKEY_verify(ctx,signature,len(signature),digest,len(digest)) + if rv<0: + raise PKeyError("Signature verification") + libcrypto.EVP_PKEY_CTX_free(ctx) + return rv>0 + def derive(self,peerkey,**kwargs): + """ + Derives shared key (DH,ECDH,VKO 34.10). Requires + private key available - @param peerkey - other key (may be public only) + @param peerkey - other key (may be public only) - Keyword parameters are algorithm-specific - """ - ctx=libcrypto.EVP_PKEY_CTX_new(self.key,None) - if ctx is None: - raise PKeyError("Initailizing derive context") - if libcrypto.EVP_PKEY_derive_init(ctx)<1: - raise PKeyError("derive_init") + Keyword parameters are algorithm-specific + """ + ctx=libcrypto.EVP_PKEY_CTX_new(self.key,None) + if ctx is None: + raise PKeyError("Initailizing derive context") + if libcrypto.EVP_PKEY_derive_init(ctx)<1: + raise PKeyError("derive_init") - - self._configure_context(ctx,kwargs,["ukm"]) - if libcrypto.EVP_PKEY_derive_set_peer(ctx,peerkey.key)<=0: - raise PKeyError("Cannot set peer key") - if "ukm" in kwargs: - if libcrypto.EVP_PKEY_CTX_ctrl(ctx,-1,1<<10,8,8,kwargs["ukm"])<=0: - raise PKeyError("Cannot set UKM") - keylen=c_long(0) - if libcrypto.EVP_PKEY_derive(ctx,None,byref(keylen))<=0: - raise PKeyError("computing shared key length") - buf=create_string_buffer(keylen.value) - if libcrypto.EVP_PKEY_derive(ctx,buf,byref(keylen))<=0: - raise PKeyError("computing actual shared key") - libcrypto.EVP_PKEY_CTX_free(ctx) - return buf.raw[:keylen.value] - @staticmethod - def generate(algorithm,**kwargs): - """ - Generates new private-public key pair for given algorithm - (string like 'rsa','ec','gost2001') and algorithm-specific - parameters. + + self._configure_context(ctx,kwargs,["ukm"]) + if libcrypto.EVP_PKEY_derive_set_peer(ctx,peerkey.key)<=0: + raise PKeyError("Cannot set peer key") + if "ukm" in kwargs: + if libcrypto.EVP_PKEY_CTX_ctrl(ctx,-1,1<<10,8,8,kwargs["ukm"])<=0: + raise PKeyError("Cannot set UKM") + keylen=c_long(0) + if libcrypto.EVP_PKEY_derive(ctx,None,byref(keylen))<=0: + raise PKeyError("computing shared key length") + buf=create_string_buffer(keylen.value) + if libcrypto.EVP_PKEY_derive(ctx,buf,byref(keylen))<=0: + raise PKeyError("computing actual shared key") + libcrypto.EVP_PKEY_CTX_free(ctx) + return buf.raw[:keylen.value] + @staticmethod + def generate(algorithm,**kwargs): + """ + Generates new private-public key pair for given algorithm + (string like 'rsa','ec','gost2001') and algorithm-specific + parameters. - Algorithm specific paramteers for RSA: + Algorithm specific paramteers for RSA: - rsa_keygen_bits=number - size of key to be generated - rsa_keygen_pubexp - RSA public expontent(default 65537) + rsa_keygen_bits=number - size of key to be generated + rsa_keygen_pubexp - RSA public expontent(default 65537) - Algorithm specific parameters for DSA,DH and EC + Algorithm specific parameters for DSA,DH and EC - paramsfrom=PKey object + paramsfrom=PKey object - copy parameters of newly generated key from existing key + copy parameters of newly generated key from existing key - Algorithm specific parameters for GOST2001 + Algorithm specific parameters for GOST2001 - paramset= paramset name where name is one of - 'A','B','C','XA','XB','test' + paramset= paramset name where name is one of + 'A','B','C','XA','XB','test' - paramsfrom does work too - """ - tmpeng=c_void_p(None) - ameth=libcrypto.EVP_PKEY_asn1_find_str(byref(tmpeng),algorithm,-1) - if ameth is None: - raise PKeyError("Algorithm %s not foind\n"%(algname)) - clear_err_stack() - pkey_id=c_int(0) - libcrypto.EVP_PKEY_asn1_get0_info(byref(pkey_id),None,None,None,None,ameth) - #libcrypto.ENGINE_finish(tmpeng) - if "paramsfrom" in kwargs: - ctx=libcrypto.EVP_PKEY_CTX_new(kwargs["paramsfrom"].key,None) - else: - ctx=libcrypto.EVP_PKEY_CTX_new_id(pkey_id,None) - # FIXME support EC curve as keyword param by invoking paramgen - # operation - if ctx is None: - raise PKeyError("Creating context for key type %d"%(pkey_id.value)) - if libcrypto.EVP_PKEY_keygen_init(ctx) <=0 : - raise PKeyError("keygen_init") - PKey._configure_context(ctx,kwargs,["paramsfrom"]) - key=c_void_p(None) - if libcrypto.EVP_PKEY_keygen(ctx,byref(key))<=0: - raise PKeyError("Error generating key") - libcrypto.EVP_PKEY_CTX_free(ctx) - return PKey(ptr=key,cansign=True) - def exportpub(self,format="PEM"): - """ - Returns public key as PEM or DER structure. - """ - b=Membio() - if format == "PEM": - r=libcrypto.PEM_write_bio_PUBKEY(b.bio,self.key) - else: - r=libcrypto.i2d_PUBKEY_bio(b.bio,self.key) - if r==0: - raise PKeyError("error serializing public key") - return str(b) - def exportpriv(self,format="PEM",password=None,cipher=None): - """ - Returns private key as PEM or DER Structure. - If password and cipher are specified, encrypts key - on given password, using given algorithm. Cipher must be - an ctypescrypto.cipher.CipherType object - """ - b=Membio() - if cipher is None: - evp_cipher=None - else: - if password is None: - raise NotImplementedError("Interactive password entry is not supported") - evp_cipher=cipher.cipher - if format == "PEM": - r=libcrypto.PEM_write_bio_PrivateKey(b.bio,self.key,evp_cipher,None,0,_cb, - password) - else: - if cipher is not None: - raise NotImplementedError("Der-formatted encrypted keys are not supported") - r=libcrypto.i2d_PrivateKey_bio(b.bio,self.key) - if r==0: - raise PKeyError("error serializing private key") - return str(b) - @staticmethod - def _configure_context(ctx,opts,skip=[]): - """ - Configures context of public key operations - @param ctx - context to configure - @param opts - dictionary of options (from kwargs of calling - function) - @param skip - list of options which shouldn't be passed to - context - """ + paramsfrom does work too + """ + tmpeng=c_void_p(None) + ameth=libcrypto.EVP_PKEY_asn1_find_str(byref(tmpeng),algorithm,-1) + if ameth is None: + raise PKeyError("Algorithm %s not foind\n"%(algname)) + clear_err_stack() + pkey_id=c_int(0) + libcrypto.EVP_PKEY_asn1_get0_info(byref(pkey_id),None,None,None,None,ameth) + #libcrypto.ENGINE_finish(tmpeng) + if "paramsfrom" in kwargs: + ctx=libcrypto.EVP_PKEY_CTX_new(kwargs["paramsfrom"].key,None) + else: + ctx=libcrypto.EVP_PKEY_CTX_new_id(pkey_id,None) + # FIXME support EC curve as keyword param by invoking paramgen + # operation + if ctx is None: + raise PKeyError("Creating context for key type %d"%(pkey_id.value)) + if libcrypto.EVP_PKEY_keygen_init(ctx) <=0 : + raise PKeyError("keygen_init") + PKey._configure_context(ctx,kwargs,["paramsfrom"]) + key=c_void_p(None) + if libcrypto.EVP_PKEY_keygen(ctx,byref(key))<=0: + raise PKeyError("Error generating key") + libcrypto.EVP_PKEY_CTX_free(ctx) + return PKey(ptr=key,cansign=True) + def exportpub(self,format="PEM"): + """ + Returns public key as PEM or DER structure. + """ + b=Membio() + if format == "PEM": + r=libcrypto.PEM_write_bio_PUBKEY(b.bio,self.key) + else: + r=libcrypto.i2d_PUBKEY_bio(b.bio,self.key) + if r==0: + raise PKeyError("error serializing public key") + return str(b) + def exportpriv(self,format="PEM",password=None,cipher=None): + """ + Returns private key as PEM or DER Structure. + If password and cipher are specified, encrypts key + on given password, using given algorithm. Cipher must be + an ctypescrypto.cipher.CipherType object + """ + b=Membio() + if cipher is None: + evp_cipher=None + else: + if password is None: + raise NotImplementedError("Interactive password entry is not supported") + evp_cipher=cipher.cipher + if format == "PEM": + r=libcrypto.PEM_write_bio_PrivateKey(b.bio,self.key,evp_cipher,None,0,_cb, + password) + else: + if cipher is not None: + raise NotImplementedError("Der-formatted encrypted keys are not supported") + r=libcrypto.i2d_PrivateKey_bio(b.bio,self.key) + if r==0: + raise PKeyError("error serializing private key") + return str(b) + @staticmethod + def _configure_context(ctx,opts,skip=[]): + """ + Configures context of public key operations + @param ctx - context to configure + @param opts - dictionary of options (from kwargs of calling + function) + @param skip - list of options which shouldn't be passed to + context + """ - for oper in opts: - if oper in skip: - continue - rv=libcrypto.EVP_PKEY_CTX_ctrl_str(ctx,oper,str(opts[oper])) - if rv==-2: - raise PKeyError("Parameter %s is not supported by key"%(oper,)) - if rv<1: - raise PKeyError("Error setting parameter %s"%(oper,)) + for oper in opts: + if oper in skip: + continue + rv=libcrypto.EVP_PKEY_CTX_ctrl_str(ctx,oper,str(opts[oper])) + if rv==-2: + raise PKeyError("Parameter %s is not supported by key"%(oper,)) + if rv<1: + raise PKeyError("Error setting parameter %s"%(oper,)) # Declare function prototypes libcrypto.EVP_PKEY_cmp.argtypes=(c_void_p,c_void_p) libcrypto.PEM_read_bio_PrivateKey.restype=c_void_p diff --git a/ctypescrypto/rand.py b/ctypescrypto/rand.py index 4bc073a..92e67e9 100644 --- a/ctypescrypto/rand.py +++ b/ctypescrypto/rand.py @@ -1,5 +1,5 @@ """ - Interface to the OpenSSL pseudo-random generator + Interface to the OpenSSL pseudo-random generator """ from ctypes import create_string_buffer, c_char_p, c_int, c_double @@ -9,61 +9,61 @@ from ctypescrypto.exception import LibCryptoError __all__ = ['RandError','bytes','pseudo_bytes','seed','status'] class RandError(LibCryptoError): - pass + pass def bytes( num, check_result=False): - """ - Returns num bytes of cryptographically strong pseudo-random - bytes. If checkc_result is True, raises error if PRNG is not - seeded enough - """ + """ + Returns num bytes of cryptographically strong pseudo-random + bytes. If checkc_result is True, raises error if PRNG is not + seeded enough + """ - if num <= 0 : - raise ValueError("'num' should be > 0") - buffer = create_string_buffer(num) - result = libcrypto.RAND_bytes(buffer, num) - if check_result and result == 0: - raise RandError("Random Number Generator not seeded sufficiently") - return buffer.raw[:num] + if num <= 0 : + raise ValueError("'num' should be > 0") + buffer = create_string_buffer(num) + result = libcrypto.RAND_bytes(buffer, num) + if check_result and result == 0: + raise RandError("Random Number Generator not seeded sufficiently") + return buffer.raw[:num] def pseudo_bytes(num): - """ - Returns num bytes of pseudo random data. Pseudo- random byte - sequences generated by pseudo_bytes() will be unique if - they are of sufficient length, but are not necessarily - unpredictable. They can be used for non-cryptographic purposes - and for certain purposes in cryptographic protocols, but usually - not for key generation etc. - """ - if num <= 0 : - raise ValueError("'num' should be > 0") - buffer = create_string_buffer(num) - libcrypto.RAND_pseudo_bytes(buffer, num) - return buffer.raw[:num] + """ + Returns num bytes of pseudo random data. Pseudo- random byte + sequences generated by pseudo_bytes() will be unique if + they are of sufficient length, but are not necessarily + unpredictable. They can be used for non-cryptographic purposes + and for certain purposes in cryptographic protocols, but usually + not for key generation etc. + """ + if num <= 0 : + raise ValueError("'num' should be > 0") + buffer = create_string_buffer(num) + libcrypto.RAND_pseudo_bytes(buffer, num) + return buffer.raw[:num] def seed(data, entropy=None): - """ - Seeds random generator with data. - If entropy is not None, it should be floating point(double) - value estimating amount of entropy in the data (in bytes). - """ - if not isinstance(data,str): - raise TypeError("A string is expected") - ptr = c_char_p(data) - size = len(data) - if entropy is None: - libcrypto.RAND_seed(ptr, size) - else : - libcrypto.RAND_add(ptr, size, entropy) + """ + Seeds random generator with data. + If entropy is not None, it should be floating point(double) + value estimating amount of entropy in the data (in bytes). + """ + if not isinstance(data,str): + raise TypeError("A string is expected") + ptr = c_char_p(data) + size = len(data) + if entropy is None: + libcrypto.RAND_seed(ptr, size) + else : + libcrypto.RAND_add(ptr, size, entropy) def status(): - """ - Returns 1 if random generator is sufficiently seeded and 0 - otherwise - """ + """ + Returns 1 if random generator is sufficiently seeded and 0 + otherwise + """ - return libcrypto.RAND_status() - + return libcrypto.RAND_status() + libcrypto.RAND_add.argtypes=(c_char_p,c_int,c_double) libcrypto.RAND_seed.argtypes=(c_char_p,c_int) libcrypto.RAND_pseudo_bytes.argtypes=(c_char_p,c_int) diff --git a/ctypescrypto/x509.py b/ctypescrypto/x509.py index bd81fdd..159956d 100644 --- a/ctypescrypto/x509.py +++ b/ctypescrypto/x509.py @@ -16,534 +16,534 @@ from ctypescrypto.exception import LibCryptoError from ctypescrypto import libcrypto from datetime import datetime try: - from pytz import utc + from pytz import utc except ImportError: - from datetime import timedelta,tzinfo - ZERO=timedelta(0) - class UTC(tzinfo): - """tzinfo object for UTC. - If no pytz is available, we would use it. - """ + from datetime import timedelta,tzinfo + ZERO=timedelta(0) + class UTC(tzinfo): + """tzinfo object for UTC. + If no pytz is available, we would use it. + """ - def utcoffset(self, dt): - return ZERO + def utcoffset(self, dt): + return ZERO - def tzname(self, dt): - return "UTC" + def tzname(self, dt): + return "UTC" - def dst(self, dt): - return ZERO + def dst(self, dt): + return ZERO - utc=UTC() + utc=UTC() __all__ = ['X509','X509Error','X509Name','X509Store','StackOfX509'] class _validity(Structure): - """ ctypes representation of X509_VAL structure - needed to access certificate validity period, because openssl - doesn't provide fuctions for it - only macros - """ - _fields_ = [('notBefore',c_void_p),('notAfter',c_void_p)] + """ ctypes representation of X509_VAL structure + needed to access certificate validity period, because openssl + doesn't provide fuctions for it - only macros + """ + _fields_ = [('notBefore',c_void_p),('notAfter',c_void_p)] class _cinf(Structure): - """ ctypes representtion of X509_CINF structure - neede to access certificate data, which are accessable only - via macros - """ - _fields_ = [('version',c_void_p), - ('serialNumber',c_void_p), - ('sign_alg',c_void_p), - ('issuer',c_void_p), - ('validity',POINTER(_validity)), - ('subject',c_void_p), - ('pubkey',c_void_p), - ('issuerUID',c_void_p), - ('subjectUID',c_void_p), - ('extensions',c_void_p), - ] + """ ctypes representtion of X509_CINF structure + neede to access certificate data, which are accessable only + via macros + """ + _fields_ = [('version',c_void_p), + ('serialNumber',c_void_p), + ('sign_alg',c_void_p), + ('issuer',c_void_p), + ('validity',POINTER(_validity)), + ('subject',c_void_p), + ('pubkey',c_void_p), + ('issuerUID',c_void_p), + ('subjectUID',c_void_p), + ('extensions',c_void_p), + ] class _x509(Structure): - """ - ctypes represntation of X509 structure needed - to access certificate data which are accesable only via - macros, not functions - """ - _fields_ = [('cert_info',POINTER(_cinf)), - ('sig_alg',c_void_p), - ('signature',c_void_p), - # There are a lot of parsed extension fields there - ] + """ + ctypes represntation of X509 structure needed + to access certificate data which are accesable only via + macros, not functions + """ + _fields_ = [('cert_info',POINTER(_cinf)), + ('sig_alg',c_void_p), + ('signature',c_void_p), + # There are a lot of parsed extension fields there + ] _px509 = POINTER(_x509) class X509Error(LibCryptoError): - """ - Exception, generated when some openssl function fail - during X509 operation - """ - pass + """ + Exception, generated when some openssl function fail + during X509 operation + """ + pass class X509Name(object): - """ - Class which represents X.509 distinguished name - typically - a certificate subject name or an issuer name. + """ + Class which represents X.509 distinguished name - typically + a certificate subject name or an issuer name. - Now used only to represent information, extracted from the - certificate. Potentially can be also used to build DN when creating - certificate signing request - """ - # XN_FLAG_SEP_COMMA_PLUS & ASN1_STRFLG_UTF8_CONVERT - PRINT_FLAG=0x10010 - ESC_MSB=4 - def __init__(self,ptr=None,copy=False): - """ - Creates a X509Name object - @param ptr - pointer to X509_NAME C structure (as returned by some OpenSSL functions - @param copy - indicates that this structure have to be freed upon object destruction - """ - if ptr is not None: - self.ptr=ptr - self.need_free=copy - self.writable=False - else: - self.ptr=libcrypto.X509_NAME_new() - self.need_free=True - self.writable=True - def __del__(self): - """ - Frees if neccessary - """ - if self.need_free: - libcrypto.X509_NAME_free(self.ptr) - def __str__(self): - """ - Produces an ascii representation of the name, escaping all symbols > 0x80 - Probably it is not what you want, unless your native language is English - """ - b=Membio() - libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG | self.ESC_MSB) - return str(b) - def __unicode__(self): - """ - Produces unicode representation of the name. - """ - b=Membio() - libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG) - return unicode(b) - def __len__(self): - """ - return number of components in the name - """ - return libcrypto.X509_NAME_entry_count(self.ptr) - def __cmp__(self,other): - """ - Compares X509 names - """ - return libcrypto.X509_NAME_cmp(self.ptr,other.ptr) - def __eq__(self,other): - return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)==0 + Now used only to represent information, extracted from the + certificate. Potentially can be also used to build DN when creating + certificate signing request + """ + # XN_FLAG_SEP_COMMA_PLUS & ASN1_STRFLG_UTF8_CONVERT + PRINT_FLAG=0x10010 + ESC_MSB=4 + def __init__(self,ptr=None,copy=False): + """ + Creates a X509Name object + @param ptr - pointer to X509_NAME C structure (as returned by some OpenSSL functions + @param copy - indicates that this structure have to be freed upon object destruction + """ + if ptr is not None: + self.ptr=ptr + self.need_free=copy + self.writable=False + else: + self.ptr=libcrypto.X509_NAME_new() + self.need_free=True + self.writable=True + def __del__(self): + """ + Frees if neccessary + """ + if self.need_free: + libcrypto.X509_NAME_free(self.ptr) + def __str__(self): + """ + Produces an ascii representation of the name, escaping all symbols > 0x80 + Probably it is not what you want, unless your native language is English + """ + b=Membio() + libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG | self.ESC_MSB) + return str(b) + def __unicode__(self): + """ + Produces unicode representation of the name. + """ + b=Membio() + libcrypto.X509_NAME_print_ex(b.bio,self.ptr,0,self.PRINT_FLAG) + return unicode(b) + def __len__(self): + """ + return number of components in the name + """ + return libcrypto.X509_NAME_entry_count(self.ptr) + def __cmp__(self,other): + """ + Compares X509 names + """ + return libcrypto.X509_NAME_cmp(self.ptr,other.ptr) + def __eq__(self,other): + return libcrypto.X509_NAME_cmp(self.ptr,other.ptr)==0 - def __getitem__(self,key): - if isinstance(key,Oid): - # Return first matching field - idx=libcrypto.X509_NAME_get_index_by_NID(self.ptr,key.nid,-1) - if idx<0: - raise KeyError("Key not found "+str(Oid)) - entry=libcrypto.X509_NAME_get_entry(self.ptr,idx) - s=libcrypto.X509_NAME_ENTRY_get_data(entry) - b=Membio() - libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG) - return unicode(b) - elif isinstance(key,(int,long)): - # Return OID, string tuple - entry=libcrypto.X509_NAME_get_entry(self.ptr,key) - if entry is None: - raise IndexError("name entry index out of range") - oid=Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry)) - s=libcrypto.X509_NAME_ENTRY_get_data(entry) - b=Membio() - libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG) - return (oid,unicode(b)) - else: - raise TypeError("X509 NAME can be indexed by Oids or integers only") + def __getitem__(self,key): + if isinstance(key,Oid): + # Return first matching field + idx=libcrypto.X509_NAME_get_index_by_NID(self.ptr,key.nid,-1) + if idx<0: + raise KeyError("Key not found "+str(Oid)) + entry=libcrypto.X509_NAME_get_entry(self.ptr,idx) + s=libcrypto.X509_NAME_ENTRY_get_data(entry) + b=Membio() + libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG) + return unicode(b) + elif isinstance(key,(int,long)): + # Return OID, string tuple + entry=libcrypto.X509_NAME_get_entry(self.ptr,key) + if entry is None: + raise IndexError("name entry index out of range") + oid=Oid.fromobj(libcrypto.X509_NAME_ENTRY_get_object(entry)) + s=libcrypto.X509_NAME_ENTRY_get_data(entry) + b=Membio() + libcrypto.ASN1_STRING_print_ex(b.bio,s,self.PRINT_FLAG) + return (oid,unicode(b)) + else: + raise TypeError("X509 NAME can be indexed by Oids or integers only") - def __setitem__(self,key,val): - if not self.writable: - raise ValueError("Attempt to modify constant X509 object") - else: - raise NotImplementedError - def __delitem__(self,key): - if not self.writable: - raise ValueError("Attempt to modify constant X509 object") - else: - raise NotImplementedError - def __hash__(self): - return libcrypto.X509_NAME_hash(self.ptr) + def __setitem__(self,key,val): + if not self.writable: + raise ValueError("Attempt to modify constant X509 object") + else: + raise NotImplementedError + def __delitem__(self,key): + if not self.writable: + raise ValueError("Attempt to modify constant X509 object") + else: + raise NotImplementedError + def __hash__(self): + return libcrypto.X509_NAME_hash(self.ptr) class _x509_ext(Structure): - """ Represens C structure X509_EXTENSION """ - _fields_=[("object",c_void_p), - ("critical",c_int), - ("value",c_void_p)] + """ Represens C structure X509_EXTENSION """ + _fields_=[("object",c_void_p), + ("critical",c_int), + ("value",c_void_p)] class X509_EXT(object): - """ Python object which represents a certificate extension """ - def __init__(self,ptr,copy=False): - """ Initializes from the pointer to X509_EXTENSION. - If copy is True, creates a copy, otherwise just - stores pointer. - """ - if copy: - self.ptr=libcrypto.X509_EXTENSION_dup(ptr) - else: - self.ptr=cast(ptr,POINTER(_x509_ext)) - def __del__(self): - libcrypto.X509_EXTENSION_free(self.ptr) - def __str__(self): - b=Membio() - libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0) - return str(b) - def __unicode__(self): - b=Membio() - libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0) - return unicode(b) - @property - def oid(self): - return Oid.fromobj(self.ptr[0].object) - @property - def critical(self): - return self.ptr[0].critical >0 -class _X509extlist(object): - """ - Represents list of certificate extensions - """ - def __init__(self,cert): - self.cert=cert - def __len__(self): - return libcrypto.X509_get_ext_count(self.cert.cert) - def __getitem__(self,item): - p=libcrypto.X509_get_ext(self.cert.cert,item) - if p is None: - raise IndexError - return X509_EXT(p,True) - def find(self,oid): - """ - Return list of extensions with given Oid - """ - if not isinstance(oid,Oid): - raise TypeError("Need crytypescrypto.oid.Oid as argument") - found=[] - l=-1 - end=len(self) - while True: - l=libcrypto.X509_get_ext_by_NID(self.cert.cert,oid.nid,l) - if l>=end or l<0: - break - found.append(self[l]) - return found - def find_critical(self,crit=True): - """ - Return list of critical extensions (or list of non-cricital, if - optional second argument is False - """ - if crit: - flag=1 - else: - flag=0 - found=[] - end=len(self) - l=-1 - while True: - l=libcrypto.X509_get_ext_by_critical(self.cert.cert,flag,l) - if l>=end or l<0: - break - found.append(self[l]) - return found + """ Python object which represents a certificate extension """ + def __init__(self,ptr,copy=False): + """ Initializes from the pointer to X509_EXTENSION. + If copy is True, creates a copy, otherwise just + stores pointer. + """ + if copy: + self.ptr=libcrypto.X509_EXTENSION_dup(ptr) + else: + self.ptr=cast(ptr,POINTER(_x509_ext)) + def __del__(self): + libcrypto.X509_EXTENSION_free(self.ptr) + def __str__(self): + b=Membio() + libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0) + return str(b) + def __unicode__(self): + b=Membio() + libcrypto.X509V3_EXT_print(b.bio,self.ptr,0x20010,0) + return unicode(b) + @property + def oid(self): + return Oid.fromobj(self.ptr[0].object) + @property + def critical(self): + return self.ptr[0].critical >0 +class _X509extlist(object): + """ + Represents list of certificate extensions + """ + def __init__(self,cert): + self.cert=cert + def __len__(self): + return libcrypto.X509_get_ext_count(self.cert.cert) + def __getitem__(self,item): + p=libcrypto.X509_get_ext(self.cert.cert,item) + if p is None: + raise IndexError + return X509_EXT(p,True) + def find(self,oid): + """ + Return list of extensions with given Oid + """ + if not isinstance(oid,Oid): + raise TypeError("Need crytypescrypto.oid.Oid as argument") + found=[] + l=-1 + end=len(self) + while True: + l=libcrypto.X509_get_ext_by_NID(self.cert.cert,oid.nid,l) + if l>=end or l<0: + break + found.append(self[l]) + return found + def find_critical(self,crit=True): + """ + Return list of critical extensions (or list of non-cricital, if + optional second argument is False + """ + if crit: + flag=1 + else: + flag=0 + found=[] + end=len(self) + l=-1 + while True: + l=libcrypto.X509_get_ext_by_critical(self.cert.cert,flag,l) + if l>=end or l<0: + break + found.append(self[l]) + return found class X509(object): - """ - Represents X.509 certificate. - """ - def __init__(self,data=None,ptr=None,format="PEM"): - """ - Initializes certificate - @param data - serialized certificate in PEM or DER format. - @param ptr - pointer to X509, returned by some openssl function. - mutually exclusive with data - @param format - specifies data format. "PEM" or "DER", default PEM - """ - if ptr is not None: - if data is not None: - raise TypeError("Cannot use data and ptr simultaneously") - self.cert = ptr - elif data is None: - raise TypeError("data argument is required") - else: - b=Membio(data) - if format == "PEM": - self.cert=libcrypto.PEM_read_bio_X509(b.bio,None,None,None) - else: - self.cert=libcrypto.d2i_X509_bio(b.bio,None) - if self.cert is None: - raise X509Error("error reading certificate") - self.extensions=_X509extlist(self) - def __del__(self): - """ - Frees certificate object - """ - libcrypto.X509_free(self.cert) - def __str__(self): - """ Returns der string of the certificate """ - b=Membio() - if libcrypto.i2d_X509_bio(b.bio,self.cert)==0: - raise X509Error("error serializing certificate") - return str(b) - def __repr__(self): - """ Returns valid call to the constructor """ - return "X509(data="+repr(str(self))+",format='DER')" - @property - def pubkey(self): - """EVP PKEy object of certificate public key""" - return PKey(ptr=libcrypto.X509_get_pubkey(self.cert,False)) - def pem(self): - """ Returns PEM represntation of the certificate """ - b=Membio() - if libcrypto.PEM_write_bio_X509(b.bio,self.cert)==0: - raise X509Error("error serializing certificate") - return str(b) - def verify(self,store=None,chain=[],key=None): - """ - Verify self. Supports verification on both X509 store object - or just public issuer key - @param store X509Store object. - @param chain - list of X509 objects to add into verification - context.These objects are untrusted, but can be used to - build certificate chain up to trusted object in the store - @param key - PKey object with open key to validate signature - - parameters store and key are mutually exclusive. If neither - is specified, attempts to verify self as self-signed certificate - """ - if store is not None and key is not None: - raise X509Error("key and store cannot be specified simultaneously") - if store is not None: - ctx=libcrypto.X509_STORE_CTX_new() - if ctx is None: - raise X509Error("Error allocating X509_STORE_CTX") - if chain is not None and len(chain)>0: - ch=StackOfX509(chain) - else: - ch=None - if libcrypto.X509_STORE_CTX_init(ctx,store.store,self.cert,ch) < 0: - raise X509Error("Error allocating X509_STORE_CTX") - res= libcrypto.X509_verify_cert(ctx) - libcrypto.X509_STORE_CTX_free(ctx) - return res>0 - else: - if key is None: - if self.issuer != self.subject: - # Not a self-signed certificate - return False - key = self.pubkey - res = libcrypto.X509_verify(self.cert,key.key) - if res < 0: - raise X509Error("X509_verify failed") - return res>0 - - @property - def subject(self): - """ X509Name for certificate subject name """ - return X509Name(libcrypto.X509_get_subject_name(self.cert)) - @property - def issuer(self): - """ X509Name for certificate issuer name """ - return X509Name(libcrypto.X509_get_issuer_name(self.cert)) - @property - def serial(self): - """ Serial number of certificate as integer """ - asnint=libcrypto.X509_get_serialNumber(self.cert) - b=Membio() - libcrypto.i2a_ASN1_INTEGER(b.bio,asnint) - return int(str(b),16) - @property - def version(self): - """ certificate version as integer. Really certificate stores 0 for - version 1 and 2 for version 3, but we return 1 and 3 """ - asn1int=cast(self.cert,_px509)[0].cert_info[0].version - return libcrypto.ASN1_INTEGER_get(asn1int)+1 - @property - def startDate(self): - """ Certificate validity period start date """ - # Need deep poke into certificate structure (x)->cert_info->validity->notBefore - global utc - asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notBefore - b=Membio() - libcrypto.ASN1_TIME_print(b.bio,asn1date) - return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc) - @property - def endDate(self): - """ Certificate validity period end date """ - # Need deep poke into certificate structure (x)->cert_info->validity->notAfter - global utc - asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notAfter - b=Membio() - libcrypto.ASN1_TIME_print(b.bio,asn1date) - return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc) - def check_ca(self): - """ Returns True if certificate is CA certificate """ - return libcrypto.X509_check_ca(self.cert)>0 + """ + Represents X.509 certificate. + """ + def __init__(self,data=None,ptr=None,format="PEM"): + """ + Initializes certificate + @param data - serialized certificate in PEM or DER format. + @param ptr - pointer to X509, returned by some openssl function. + mutually exclusive with data + @param format - specifies data format. "PEM" or "DER", default PEM + """ + if ptr is not None: + if data is not None: + raise TypeError("Cannot use data and ptr simultaneously") + self.cert = ptr + elif data is None: + raise TypeError("data argument is required") + else: + b=Membio(data) + if format == "PEM": + self.cert=libcrypto.PEM_read_bio_X509(b.bio,None,None,None) + else: + self.cert=libcrypto.d2i_X509_bio(b.bio,None) + if self.cert is None: + raise X509Error("error reading certificate") + self.extensions=_X509extlist(self) + def __del__(self): + """ + Frees certificate object + """ + libcrypto.X509_free(self.cert) + def __str__(self): + """ Returns der string of the certificate """ + b=Membio() + if libcrypto.i2d_X509_bio(b.bio,self.cert)==0: + raise X509Error("error serializing certificate") + return str(b) + def __repr__(self): + """ Returns valid call to the constructor """ + return "X509(data="+repr(str(self))+",format='DER')" + @property + def pubkey(self): + """EVP PKEy object of certificate public key""" + return PKey(ptr=libcrypto.X509_get_pubkey(self.cert,False)) + def pem(self): + """ Returns PEM represntation of the certificate """ + b=Membio() + if libcrypto.PEM_write_bio_X509(b.bio,self.cert)==0: + raise X509Error("error serializing certificate") + return str(b) + def verify(self,store=None,chain=[],key=None): + """ + Verify self. Supports verification on both X509 store object + or just public issuer key + @param store X509Store object. + @param chain - list of X509 objects to add into verification + context.These objects are untrusted, but can be used to + build certificate chain up to trusted object in the store + @param key - PKey object with open key to validate signature + + parameters store and key are mutually exclusive. If neither + is specified, attempts to verify self as self-signed certificate + """ + if store is not None and key is not None: + raise X509Error("key and store cannot be specified simultaneously") + if store is not None: + ctx=libcrypto.X509_STORE_CTX_new() + if ctx is None: + raise X509Error("Error allocating X509_STORE_CTX") + if chain is not None and len(chain)>0: + ch=StackOfX509(chain) + else: + ch=None + if libcrypto.X509_STORE_CTX_init(ctx,store.store,self.cert,ch) < 0: + raise X509Error("Error allocating X509_STORE_CTX") + res= libcrypto.X509_verify_cert(ctx) + libcrypto.X509_STORE_CTX_free(ctx) + return res>0 + else: + if key is None: + if self.issuer != self.subject: + # Not a self-signed certificate + return False + key = self.pubkey + res = libcrypto.X509_verify(self.cert,key.key) + if res < 0: + raise X509Error("X509_verify failed") + return res>0 + + @property + def subject(self): + """ X509Name for certificate subject name """ + return X509Name(libcrypto.X509_get_subject_name(self.cert)) + @property + def issuer(self): + """ X509Name for certificate issuer name """ + return X509Name(libcrypto.X509_get_issuer_name(self.cert)) + @property + def serial(self): + """ Serial number of certificate as integer """ + asnint=libcrypto.X509_get_serialNumber(self.cert) + b=Membio() + libcrypto.i2a_ASN1_INTEGER(b.bio,asnint) + return int(str(b),16) + @property + def version(self): + """ certificate version as integer. Really certificate stores 0 for + version 1 and 2 for version 3, but we return 1 and 3 """ + asn1int=cast(self.cert,_px509)[0].cert_info[0].version + return libcrypto.ASN1_INTEGER_get(asn1int)+1 + @property + def startDate(self): + """ Certificate validity period start date """ + # Need deep poke into certificate structure (x)->cert_info->validity->notBefore + global utc + asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notBefore + b=Membio() + libcrypto.ASN1_TIME_print(b.bio,asn1date) + return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc) + @property + def endDate(self): + """ Certificate validity period end date """ + # Need deep poke into certificate structure (x)->cert_info->validity->notAfter + global utc + asn1date=cast(self.cert,_px509)[0].cert_info[0].validity[0].notAfter + b=Membio() + libcrypto.ASN1_TIME_print(b.bio,asn1date) + return datetime.strptime(str(b),"%b %d %H:%M:%S %Y %Z").replace(tzinfo=utc) + def check_ca(self): + """ Returns True if certificate is CA certificate """ + return libcrypto.X509_check_ca(self.cert)>0 class X509Store(object): - """ - Represents trusted certificate store. Can be used to lookup CA - certificates to verify + """ + Represents trusted certificate store. Can be used to lookup CA + certificates to verify - @param file - file with several certificates and crls - to load into store - @param dir - hashed directory with certificates and crls - @param default - if true, default verify location (directory) - is installed + @param file - file with several certificates and crls + to load into store + @param dir - hashed directory with certificates and crls + @param default - if true, default verify location (directory) + is installed - """ - def __init__(self,file=None,dir=None,default=False): - """ - Creates X509 store and installs lookup method. Optionally initializes - by certificates from given file or directory. - """ - # - # Todo - set verification flags - # - self.store=libcrypto.X509_STORE_new() - if self.store is None: - raise X509Error("allocating store") - lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_file()) - if lookup is None: - raise X509Error("error installing file lookup method") - if (file is not None): - if not libcrypto.X509_LOOKUP_ctrl(lookup,1,file,1,None)>0: - raise X509Error("error loading trusted certs from file "+file) - lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_hash_dir()) - if lookup is None: - raise X509Error("error installing hashed lookup method") - if dir is not None: - if not libcrypto.X509_LOOKUP_ctrl(lookup,2,dir,1,None)>0: - raise X509Error("error adding hashed trusted certs dir "+dir) - if default: - if not libcrypto.X509_LOOKUP_ctrl(lookup,2,None,3,None)>0: - raise X509Error("error adding default trusted certs dir ") - def add_cert(self,cert): - """ - Explicitely adds certificate to set of trusted in the store - @param cert - X509 object to add - """ - if not isinstance(cert,X509): - raise TypeError("cert should be X509") - libcrypto.X509_STORE_add_cert(self.store,cert.cert) - def add_callback(self,callback): - """ - Installs callback function, which would receive detailed information - about verified ceritificates - """ - raise NotImplementedError - def setflags(self,flags): - """ - Set certificate verification flags. - @param flags - integer bit mask. See OpenSSL X509_V_FLAG_* constants - """ - libcrypto.X509_STORE_set_flags(self.store,flags) - def setpurpose(self,purpose): - """ - Sets certificate purpose which verified certificate should match - @param purpose - number from 1 to 9 or standard strind defined in Openssl - possible strings - sslcient,sslserver, nssslserver, smimesign,smimeencrypt, crlsign, any,ocsphelper - """ - if isinstance(purpose,str): - purp_no=X509_PURPOSE_get_by_sname(purpose) - if purp_no <=0: - raise X509Error("Invalid certificate purpose '"+purpose+"'") - elif isinstance(purpose,int): - purp_no = purpose - if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0: - raise X509Error("cannot set purpose") - def setdepth(self,depth): - """ - Sets the verification depth i.e. max length of certificate chain - which is acceptable - """ - libcrypto.X509_STORE_set_depth(self.store,depth) - def settime(self, time): - """ - Set point in time used to check validity of certificates for - Time can be either python datetime object or number of seconds - sinse epoch - """ - if isinstance(time,datetime.datetime) or isinstance(time,datetime.date): - d=int(time.strftime("%s")) - elif isinstance(time,int): - pass - else: - raise TypeError("datetime.date, datetime.datetime or integer is required as time argument") - raise NotImplementedError + """ + def __init__(self,file=None,dir=None,default=False): + """ + Creates X509 store and installs lookup method. Optionally initializes + by certificates from given file or directory. + """ + # + # Todo - set verification flags + # + self.store=libcrypto.X509_STORE_new() + if self.store is None: + raise X509Error("allocating store") + lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_file()) + if lookup is None: + raise X509Error("error installing file lookup method") + if (file is not None): + if not libcrypto.X509_LOOKUP_ctrl(lookup,1,file,1,None)>0: + raise X509Error("error loading trusted certs from file "+file) + lookup=libcrypto.X509_STORE_add_lookup(self.store,libcrypto.X509_LOOKUP_hash_dir()) + if lookup is None: + raise X509Error("error installing hashed lookup method") + if dir is not None: + if not libcrypto.X509_LOOKUP_ctrl(lookup,2,dir,1,None)>0: + raise X509Error("error adding hashed trusted certs dir "+dir) + if default: + if not libcrypto.X509_LOOKUP_ctrl(lookup,2,None,3,None)>0: + raise X509Error("error adding default trusted certs dir ") + def add_cert(self,cert): + """ + Explicitely adds certificate to set of trusted in the store + @param cert - X509 object to add + """ + if not isinstance(cert,X509): + raise TypeError("cert should be X509") + libcrypto.X509_STORE_add_cert(self.store,cert.cert) + def add_callback(self,callback): + """ + Installs callback function, which would receive detailed information + about verified ceritificates + """ + raise NotImplementedError + def setflags(self,flags): + """ + Set certificate verification flags. + @param flags - integer bit mask. See OpenSSL X509_V_FLAG_* constants + """ + libcrypto.X509_STORE_set_flags(self.store,flags) + def setpurpose(self,purpose): + """ + Sets certificate purpose which verified certificate should match + @param purpose - number from 1 to 9 or standard strind defined in Openssl + possible strings - sslcient,sslserver, nssslserver, smimesign,smimeencrypt, crlsign, any,ocsphelper + """ + if isinstance(purpose,str): + purp_no=X509_PURPOSE_get_by_sname(purpose) + if purp_no <=0: + raise X509Error("Invalid certificate purpose '"+purpose+"'") + elif isinstance(purpose,int): + purp_no = purpose + if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0: + raise X509Error("cannot set purpose") + def setdepth(self,depth): + """ + Sets the verification depth i.e. max length of certificate chain + which is acceptable + """ + libcrypto.X509_STORE_set_depth(self.store,depth) + def settime(self, time): + """ + Set point in time used to check validity of certificates for + Time can be either python datetime object or number of seconds + sinse epoch + """ + if isinstance(time,datetime.datetime) or isinstance(time,datetime.date): + d=int(time.strftime("%s")) + elif isinstance(time,int): + pass + else: + raise TypeError("datetime.date, datetime.datetime or integer is required as time argument") + raise NotImplementedError class StackOfX509(object): - """ - Implements OpenSSL STACK_OF(X509) object. - It looks much like python container types - """ - def __init__(self,certs=None,ptr=None,disposable=True): - """ - Create stack - @param certs - list of X509 objects. If specified, read-write - stack is created and populated by these certificates - @param ptr - pointer to OpenSSL STACK_OF(X509) as returned by - some functions - @param disposable - if True, stack created from object, returned - by function is copy, and can be modified and need to be - freeid. If false, it is just pointer into another - structure i.e. CMS_ContentInfo - """ - if ptr is None: - self.need_free = True - self.ptr=libcrypto.sk_new_null() - if certs is not None: - for crt in certs: - self.append(crt) - elif certs is not None: - raise ValueError("cannot handle certs an ptr simultaneously") - else: - self.need_free = disposable - self.ptr=ptr - def __len__(self): - return libcrypto.sk_num(self.ptr) - def __getitem__(self,index): - if index <0 or index>=len(self): - raise IndexError - p=libcrypto.sk_value(self.ptr,index) - return X509(ptr=libcrypto.X509_dup(p)) - def __setitem__(self,index,value): - if not self.need_free: - raise ValueError("Stack is read-only") - if index <0 or index>=len(self): - raise IndexError - if not isinstance(value,X509): - raise TypeError('StackOfX508 can contain only X509 objects') - p=libcrypto.sk_value(self.ptr,index) - libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert)) - libcrypto.X509_free(p) - def __delitem__(self,index): - if not self.need_free: - raise ValueError("Stack is read-only") - if index <0 or index>=len(self): - raise IndexError - p=libcrypto.sk_delete(self.ptr,index) - libcrypto.X509_free(p) - def __del__(self): - if self.need_free: - libcrypto.sk_pop_free(self.ptr,libcrypto.X509_free) - def append(self,value): - if not self.need_free: - raise ValueError("Stack is read-only") - if not isinstance(value,X509): - raise TypeError('StackOfX508 can contain only X509 objects') - libcrypto.sk_push(self.ptr,libcrypto.X509_dup(value.cert)) + """ + Implements OpenSSL STACK_OF(X509) object. + It looks much like python container types + """ + def __init__(self,certs=None,ptr=None,disposable=True): + """ + Create stack + @param certs - list of X509 objects. If specified, read-write + stack is created and populated by these certificates + @param ptr - pointer to OpenSSL STACK_OF(X509) as returned by + some functions + @param disposable - if True, stack created from object, returned + by function is copy, and can be modified and need to be + freeid. If false, it is just pointer into another + structure i.e. CMS_ContentInfo + """ + if ptr is None: + self.need_free = True + self.ptr=libcrypto.sk_new_null() + if certs is not None: + for crt in certs: + self.append(crt) + elif certs is not None: + raise ValueError("cannot handle certs an ptr simultaneously") + else: + self.need_free = disposable + self.ptr=ptr + def __len__(self): + return libcrypto.sk_num(self.ptr) + def __getitem__(self,index): + if index <0 or index>=len(self): + raise IndexError + p=libcrypto.sk_value(self.ptr,index) + return X509(ptr=libcrypto.X509_dup(p)) + def __setitem__(self,index,value): + if not self.need_free: + raise ValueError("Stack is read-only") + if index <0 or index>=len(self): + raise IndexError + if not isinstance(value,X509): + raise TypeError('StackOfX508 can contain only X509 objects') + p=libcrypto.sk_value(self.ptr,index) + libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert)) + libcrypto.X509_free(p) + def __delitem__(self,index): + if not self.need_free: + raise ValueError("Stack is read-only") + if index <0 or index>=len(self): + raise IndexError + p=libcrypto.sk_delete(self.ptr,index) + libcrypto.X509_free(p) + def __del__(self): + if self.need_free: + libcrypto.sk_pop_free(self.ptr,libcrypto.X509_free) + def append(self,value): + if not self.need_free: + raise ValueError("Stack is read-only") + if not isinstance(value,X509): + raise TypeError('StackOfX508 can contain only X509 objects') + libcrypto.sk_push(self.ptr,libcrypto.X509_dup(value.cert)) libcrypto.i2a_ASN1_INTEGER.argtypes=(c_void_p,c_void_p) libcrypto.ASN1_STRING_print_ex.argtypes=(c_void_p,c_void_p,c_long) libcrypto.PEM_read_bio_X509.restype=c_void_p diff --git a/setup.py b/setup.py index 8f62662..6d95b46 100644 --- a/setup.py +++ b/setup.py @@ -3,42 +3,42 @@ import distutils.cmd import sys,os class MyTests(distutils.cmd.Command): - user_options=[] - def initialize_options(self): - pass - def finalize_options(self): - pass - def run(self): - sys.path.insert(0,os.getcwd()) - import unittest - result=unittest.TextTestResult(sys.stdout,True,True) - suite= unittest.defaultTestLoader.discover("./tests") - print "Discovered %d test cases"%suite.countTestCases() - result.buffer=True - suite.run(result) - print "" - if not result.wasSuccessful(): - if len(result.errors): - print "============ Errors disovered =================" - for r in result.errors: - print r[0],":",r[1] - - if len(result.failures): - print "============ Failures disovered =================" - for r in result.failures: - print r[0],":",r[1] - sys.exit(1) - else: - print "All tests successful" + user_options=[] + def initialize_options(self): + pass + def finalize_options(self): + pass + def run(self): + sys.path.insert(0,os.getcwd()) + import unittest + result=unittest.TextTestResult(sys.stdout,True,True) + suite= unittest.defaultTestLoader.discover("./tests") + print "Discovered %d test cases"%suite.countTestCases() + result.buffer=True + suite.run(result) + print "" + if not result.wasSuccessful(): + if len(result.errors): + print "============ Errors disovered =================" + for r in result.errors: + print r[0],":",r[1] + + if len(result.failures): + print "============ Failures disovered =================" + for r in result.failures: + print r[0],":",r[1] + sys.exit(1) + else: + print "All tests successful" setup( - name="ctypescrypto", - version="0.2.7", - description="CTypes-based interface for some OpenSSL libcrypto features", - author="Victor Wagner", - author_email="vitus@wagner.pp.ru", - url="https://github.com/vbwagner/ctypescrypto", - packages=["ctypescrypto"], - cmdclass={"test":MyTests} + name="ctypescrypto", + version="0.2.7", + description="CTypes-based interface for some OpenSSL libcrypto features", + author="Victor Wagner", + author_email="vitus@wagner.pp.ru", + url="https://github.com/vbwagner/ctypescrypto", + packages=["ctypescrypto"], + cmdclass={"test":MyTests} ) diff --git a/tests/testbio.py b/tests/testbio.py index c8fc313..81f2162 100644 --- a/tests/testbio.py +++ b/tests/testbio.py @@ -2,37 +2,37 @@ from ctypescrypto.bio import Membio import unittest class TestRead(unittest.TestCase): - def test_readshortstr(self): - s="A quick brown fox jumps over a lazy dog" - bio=Membio(s) - data=bio.read() - self.assertEqual(data,s) - data2=bio.read() - self.assertEqual(data2,"") - del bio - def test_readwithlen(self): - s="A quick brown fox jumps over a lazy dog" - bio=Membio(s) - data=bio.read(len(s)) - self.assertEqual(data,s) - data2=bio.read(5) - self.assertEqual(data2,"") - def test_readwrongtype(self): - s="A quick brown fox jumps over a lazy dog" - bio=Membio(s) - with self.assertRaises(TypeError): - data=bio.read("5") - def test_reset(self): - s="A quick brown fox jumps over a lazy dog" - bio=Membio(s) - data=bio.read() - bio.reset() - data2=bio.read() - del bio - self.assertEqual(data,data2) - self.assertEqual(data,s) - def test_readlongstr(self): - poem='''Eyes of grey--a sodden quay, + def test_readshortstr(self): + s="A quick brown fox jumps over a lazy dog" + bio=Membio(s) + data=bio.read() + self.assertEqual(data,s) + data2=bio.read() + self.assertEqual(data2,"") + del bio + def test_readwithlen(self): + s="A quick brown fox jumps over a lazy dog" + bio=Membio(s) + data=bio.read(len(s)) + self.assertEqual(data,s) + data2=bio.read(5) + self.assertEqual(data2,"") + def test_readwrongtype(self): + s="A quick brown fox jumps over a lazy dog" + bio=Membio(s) + with self.assertRaises(TypeError): + data=bio.read("5") + def test_reset(self): + s="A quick brown fox jumps over a lazy dog" + bio=Membio(s) + data=bio.read() + bio.reset() + data2=bio.read() + del bio + self.assertEqual(data,data2) + self.assertEqual(data,s) + def test_readlongstr(self): + poem='''Eyes of grey--a sodden quay, Driving rain and falling tears, As the steamer wears to sea In a parting storm of cheers. @@ -82,39 +82,39 @@ And a maiden showed me grace, Four-and-forty times would I Sing the Lovers' Litany: "Love like ours can never die!"''' - bio=Membio(poem) - data=bio.read() - self.assertEqual(data,poem) - del bio - def test_readparts(self): - s="A quick brown fox jumps over the lazy dog" - bio=Membio(s) - a=bio.read(10) - self.assertEqual(a,s[0:10]) - b=bio.read(9) - self.assertEqual(b,s[10:19]) - c=bio.read() - self.assertEqual(c,s[19:]) - d=bio.read() - self.assertEqual(d,"") + bio=Membio(poem) + data=bio.read() + self.assertEqual(data,poem) + del bio + def test_readparts(self): + s="A quick brown fox jumps over the lazy dog" + bio=Membio(s) + a=bio.read(10) + self.assertEqual(a,s[0:10]) + b=bio.read(9) + self.assertEqual(b,s[10:19]) + c=bio.read() + self.assertEqual(c,s[19:]) + d=bio.read() + self.assertEqual(d,"") class TestWrite(unittest.TestCase): - def test_write(self): - b=Membio() - b.write("A quick brown ") - b.write("fox jumps over ") - b.write("the lazy dog.") - self.assertEqual(str(b),"A quick brown fox jumps over the lazy dog.") + def test_write(self): + b=Membio() + b.write("A quick brown ") + b.write("fox jumps over ") + b.write("the lazy dog.") + self.assertEqual(str(b),"A quick brown fox jumps over the lazy dog.") - def test_unicode(self): - b=Membio() - s='\xd0\xba\xd0\xb0\xd0\xba \xd1\x8d\xd1\x82\xd0\xbe \xd0\xbf\xd0\xbe-\xd1\x80\xd1\x83\xd1\x81\xd1\x81\xd0\xba\xd0\xb8' - b.write(s) - self.assertEqual(unicode(b),u'\u043a\u0430\u043a \u044d\u0442\u043e \u043f\u043e-\u0440\u0443\u0441\u0441\u043a\u0438') - def test_unicode2(self): - b=Membio() - u=u'\u043a\u0430\u043a \u044d\u0442\u043e \u043f\u043e-\u0440\u0443\u0441\u0441\u043a\u0438' - b.write(u) - self.assertEqual(unicode(b),u) + def test_unicode(self): + b=Membio() + s='\xd0\xba\xd0\xb0\xd0\xba \xd1\x8d\xd1\x82\xd0\xbe \xd0\xbf\xd0\xbe-\xd1\x80\xd1\x83\xd1\x81\xd1\x81\xd0\xba\xd0\xb8' + b.write(s) + self.assertEqual(unicode(b),u'\u043a\u0430\u043a \u044d\u0442\u043e \u043f\u043e-\u0440\u0443\u0441\u0441\u043a\u0438') + def test_unicode2(self): + b=Membio() + u=u'\u043a\u0430\u043a \u044d\u0442\u043e \u043f\u043e-\u0440\u0443\u0441\u0441\u043a\u0438' + b.write(u) + self.assertEqual(unicode(b),u) if __name__ == '__main__': - unittest.main() + unittest.main() diff --git a/tests/testcipher.py b/tests/testcipher.py index 7fa5a97..2236943 100644 --- a/tests/testcipher.py +++ b/tests/testcipher.py @@ -5,98 +5,98 @@ import sys class TestCipherType(unittest.TestCase): - def test_ciphdescbc(self): - ct=cipher.CipherType("des-cbc") - self.assertEqual(ct.block_size(),8) - self.assertEqual(ct.key_length(),8) - self.assertEqual(ct.iv_length(),8) - self.assertEqual(ct.algo(),'DES-CBC') - self.assertEqual(ct.oid().shortname(),"DES-CBC") - self.assertEqual(ct.mode(),"CBC") - def test_ciphaesofb(self): - ct=cipher.CipherType("aes-256-ofb") - self.assertEqual(ct.block_size(),1) - self.assertEqual(ct.key_length(),32) - self.assertEqual(ct.iv_length(),16) - self.assertEqual(ct.algo(),'AES-256-OFB') - self.assertEqual(ct.oid().shortname(),"AES-256-OFB") - self.assertEqual(ct.mode(),"OFB") - def test_unknowncipher(self): - with self.assertRaises(cipher.CipherError): - ct=cipher.CipherType("no-such-cipher") + def test_ciphdescbc(self): + ct=cipher.CipherType("des-cbc") + self.assertEqual(ct.block_size(),8) + self.assertEqual(ct.key_length(),8) + self.assertEqual(ct.iv_length(),8) + self.assertEqual(ct.algo(),'DES-CBC') + self.assertEqual(ct.oid().shortname(),"DES-CBC") + self.assertEqual(ct.mode(),"CBC") + def test_ciphaesofb(self): + ct=cipher.CipherType("aes-256-ofb") + self.assertEqual(ct.block_size(),1) + self.assertEqual(ct.key_length(),32) + self.assertEqual(ct.iv_length(),16) + self.assertEqual(ct.algo(),'AES-256-OFB') + self.assertEqual(ct.oid().shortname(),"AES-256-OFB") + self.assertEqual(ct.mode(),"OFB") + def test_unknowncipher(self): + with self.assertRaises(cipher.CipherError): + ct=cipher.CipherType("no-such-cipher") class TestEncryptDecrypt(unittest.TestCase): - def test_cons_nokey(self): - ct=cipher.CipherType("DES-CBC") - with self.assertRaises(ValueError): - c=cipher.Cipher(ct,None,None) - def test_blockcipher(self): - data="sdfdsddf" - key='abcdabcd' - c=cipher.new("DES-CBC",key) - enc=c.update(data)+c.finish() - # See if padding is added by default - self.assertEqual(len(enc),16) - d=cipher.new("DES-CBC",key,encrypt=False) - dec=d.update(enc)+d.finish() - self.assertEqual(data,dec) - def test_blockcipher_nopadding(self): - data="sdfdsddf" - key='abcdabcd' - c=cipher.new("DES-CBC",key) - c.padding(False) - enc=c.update(data)+c.finish() - # See if padding is added by default - self.assertEqual(len(enc),8) - d=cipher.new("DES-CBC",key,encrypt=False) - d.padding(False) - dec=d.update(enc)+d.finish() - self.assertEqual(data,dec) - def test_ofb_cipher(self): - data="sdfdsddfxx" - key='abcdabcd' - iv='abcdabcd' - c=cipher.new("DES-OFB",key,iv=iv) - enc=c.update(data)+c.finish() - # See if padding is added by default - self.assertEqual(len(enc),len(data)) - d=cipher.new("DES-OFB",key,encrypt=False,iv=iv) - dec=d.update(enc)+d.finish() - self.assertEqual(data,dec) + def test_cons_nokey(self): + ct=cipher.CipherType("DES-CBC") + with self.assertRaises(ValueError): + c=cipher.Cipher(ct,None,None) + def test_blockcipher(self): + data="sdfdsddf" + key='abcdabcd' + c=cipher.new("DES-CBC",key) + enc=c.update(data)+c.finish() + # See if padding is added by default + self.assertEqual(len(enc),16) + d=cipher.new("DES-CBC",key,encrypt=False) + dec=d.update(enc)+d.finish() + self.assertEqual(data,dec) + def test_blockcipher_nopadding(self): + data="sdfdsddf" + key='abcdabcd' + c=cipher.new("DES-CBC",key) + c.padding(False) + enc=c.update(data)+c.finish() + # See if padding is added by default + self.assertEqual(len(enc),8) + d=cipher.new("DES-CBC",key,encrypt=False) + d.padding(False) + dec=d.update(enc)+d.finish() + self.assertEqual(data,dec) + def test_ofb_cipher(self): + data="sdfdsddfxx" + key='abcdabcd' + iv='abcdabcd' + c=cipher.new("DES-OFB",key,iv=iv) + enc=c.update(data)+c.finish() + # See if padding is added by default + self.assertEqual(len(enc),len(data)) + d=cipher.new("DES-OFB",key,encrypt=False,iv=iv) + dec=d.update(enc)+d.finish() + self.assertEqual(data,dec) - def test_ofb_noiv(self): - data="sdfdsddfxx" - encryptkey='abcdabcd'*4 - decryptkey=encryptkey[0:5]+encryptkey[5:] + def test_ofb_noiv(self): + data="sdfdsddfxx" + encryptkey='abcdabcd'*4 + decryptkey=encryptkey[0:5]+encryptkey[5:] - c=cipher.new("AES-256-OFB",encryptkey) - enc=c.update(data)+c.finish() - # See if padding is added by default - self.assertEqual(len(enc),len(data)) - d=cipher.new("AES-256-OFB",decryptkey,encrypt=False) - dec=d.update(enc)+d.finish() - self.assertEqual(data,dec) - def test_wrong_keylength(self): - data="sdfsdfxxx" - key="abcdabcd" - with self.assertRaises(ValueError): - c=cipher.new("AES-128-OFB",key) - def test_wrong_ivlength(self): - key="abcdabcdabcdabcd" - iv="xxxxx" - with self.assertRaises(ValueError): - c=cipher.new("AES-128-OFB",key,iv=iv) - def test_variable_keylength(self): - encryptkey="abcdefabcdefghtlgvasdgdgsdgdsg" - data="asdfsdfsdfsdfsdfsdfsdfsdf" - iv="abcdefgh" - c=cipher.new("bf-ofb",encryptkey,iv=iv) - ciphertext=c.update(data)+c.finish() - decryptkey=encryptkey[0:5]+encryptkey[5:] - d=cipher.new("bf-ofb",decryptkey,encrypt=False,iv=iv) - deciph=d.update(ciphertext)+d.finish() - self.assertEqual(deciph,data) + c=cipher.new("AES-256-OFB",encryptkey) + enc=c.update(data)+c.finish() + # See if padding is added by default + self.assertEqual(len(enc),len(data)) + d=cipher.new("AES-256-OFB",decryptkey,encrypt=False) + dec=d.update(enc)+d.finish() + self.assertEqual(data,dec) + def test_wrong_keylength(self): + data="sdfsdfxxx" + key="abcdabcd" + with self.assertRaises(ValueError): + c=cipher.new("AES-128-OFB",key) + def test_wrong_ivlength(self): + key="abcdabcdabcdabcd" + iv="xxxxx" + with self.assertRaises(ValueError): + c=cipher.new("AES-128-OFB",key,iv=iv) + def test_variable_keylength(self): + encryptkey="abcdefabcdefghtlgvasdgdgsdgdsg" + data="asdfsdfsdfsdfsdfsdfsdfsdf" + iv="abcdefgh" + c=cipher.new("bf-ofb",encryptkey,iv=iv) + ciphertext=c.update(data)+c.finish() + decryptkey=encryptkey[0:5]+encryptkey[5:] + d=cipher.new("bf-ofb",decryptkey,encrypt=False,iv=iv) + deciph=d.update(ciphertext)+d.finish() + self.assertEqual(deciph,data) if __name__ == '__main__': - unittest.main() + unittest.main() diff --git a/tests/testdigest.py b/tests/testdigest.py index 741c1d6..120d73c 100644 --- a/tests/testdigest.py +++ b/tests/testdigest.py @@ -4,157 +4,157 @@ from base64 import b16decode,b16encode import unittest class TestDigestType(unittest.TestCase): - def test_md4(self): - d=digest.DigestType("md4") - self.assertEqual(d.digest_size(),16) - self.assertEqual(d.block_size(),64) - self.assertEqual(d.oid(),Oid("md4")) - self.assertEqual(d.name,'md4') - def test_md5(self): - d=digest.DigestType("md5") - self.assertEqual(d.digest_size(),16) - self.assertEqual(d.block_size(),64) - self.assertEqual(d.oid(),Oid("md5")) - self.assertEqual(d.name,'md5') - def test_sha1(self): - d=digest.DigestType("sha1") - self.assertEqual(d.digest_size(),20) - self.assertEqual(d.block_size(),64) - self.assertEqual(d.oid(),Oid("sha1")) - self.assertEqual(d.name,'sha1') - def test_sha256(self): - d=digest.DigestType("sha256") - self.assertEqual(d.digest_size(),32) - self.assertEqual(d.block_size(),64) - self.assertEqual(d.oid(),Oid("sha256")) - self.assertEqual(d.name,'sha256') - def test_sha384(self): - d=digest.DigestType("sha384") - self.assertEqual(d.digest_size(),48) - self.assertEqual(d.block_size(),128) - self.assertEqual(d.oid(),Oid("sha384")) - self.assertEqual(d.name,'sha384') - def test_sha512(self): - d=digest.DigestType("sha512") - self.assertEqual(d.digest_size(),64) - self.assertEqual(d.block_size(),128) - self.assertEqual(d.oid(),Oid("sha512")) - self.assertEqual(d.name,'sha512') - def test_createfromoid(self): - oid=Oid('sha256') - d=digest.DigestType(oid) - self.assertEqual(d.digest_size(),32) - self.assertEqual(d.block_size(),64) - self.assertEqual(d.oid(),Oid("sha256")) - self.assertEqual(d.name,'sha256') - def test_createfromEVP_MD(self): - d1=digest.DigestType("sha256") - d2=digest.DigestType(None) - with self.assertRaises(AttributeError): - s=d2.name - d2.digest=d1.digest - self.assertEqual(d2.digest_size(),32) - self.assertEqual(d2.block_size(),64) - self.assertEqual(d2.oid(),Oid("sha256")) - self.assertEqual(d2.name,'sha256') - def test_invalidDigest(self): - with self.assertRaises(digest.DigestError): - d=digest.DigestType("no-such-digest") + def test_md4(self): + d=digest.DigestType("md4") + self.assertEqual(d.digest_size(),16) + self.assertEqual(d.block_size(),64) + self.assertEqual(d.oid(),Oid("md4")) + self.assertEqual(d.name,'md4') + def test_md5(self): + d=digest.DigestType("md5") + self.assertEqual(d.digest_size(),16) + self.assertEqual(d.block_size(),64) + self.assertEqual(d.oid(),Oid("md5")) + self.assertEqual(d.name,'md5') + def test_sha1(self): + d=digest.DigestType("sha1") + self.assertEqual(d.digest_size(),20) + self.assertEqual(d.block_size(),64) + self.assertEqual(d.oid(),Oid("sha1")) + self.assertEqual(d.name,'sha1') + def test_sha256(self): + d=digest.DigestType("sha256") + self.assertEqual(d.digest_size(),32) + self.assertEqual(d.block_size(),64) + self.assertEqual(d.oid(),Oid("sha256")) + self.assertEqual(d.name,'sha256') + def test_sha384(self): + d=digest.DigestType("sha384") + self.assertEqual(d.digest_size(),48) + self.assertEqual(d.block_size(),128) + self.assertEqual(d.oid(),Oid("sha384")) + self.assertEqual(d.name,'sha384') + def test_sha512(self): + d=digest.DigestType("sha512") + self.assertEqual(d.digest_size(),64) + self.assertEqual(d.block_size(),128) + self.assertEqual(d.oid(),Oid("sha512")) + self.assertEqual(d.name,'sha512') + def test_createfromoid(self): + oid=Oid('sha256') + d=digest.DigestType(oid) + self.assertEqual(d.digest_size(),32) + self.assertEqual(d.block_size(),64) + self.assertEqual(d.oid(),Oid("sha256")) + self.assertEqual(d.name,'sha256') + def test_createfromEVP_MD(self): + d1=digest.DigestType("sha256") + d2=digest.DigestType(None) + with self.assertRaises(AttributeError): + s=d2.name + d2.digest=d1.digest + self.assertEqual(d2.digest_size(),32) + self.assertEqual(d2.block_size(),64) + self.assertEqual(d2.oid(),Oid("sha256")) + self.assertEqual(d2.name,'sha256') + def test_invalidDigest(self): + with self.assertRaises(digest.DigestError): + d=digest.DigestType("no-such-digest") class TestIface(unittest.TestCase): - """ Test all methods with one algorithms """ - msg="A quick brown fox jumps over the lazy dog." - dgst="00CFFE7312BF9CA73584F24BDF7DF1D028340397" - def test_cons(self): - md=digest.DigestType("sha1") - dgst=digest.Digest(md) - dgst.update(self.msg) - self.assertEqual(dgst.digest_size,20) - self.assertEqual(dgst.hexdigest(),self.dgst) - def test_digestwithdata(self): - md=digest.DigestType("sha1") - dgst=digest.Digest(md) - self.assertEqual(dgst.digest(self.msg),b16decode(self.dgst)) - def test_length(self): - l=len(self.msg) - msg=self.msg+" Dog barks furiously." - dgst=digest.new("sha1") - dgst.update(msg,length=l) - self.assertEqual(dgst.hexdigest(),self.dgst) - def test_badlength(self): - l=len(self.msg) - dgst=digest.new("sha1") - with self.assertRaises(ValueError): - dgst.update(self.msg,length=l+1) - def test_bindigest(self): - dgst=digest.new("sha1") - dgst.update(self.msg) - self.assertEqual(dgst.digest_size,20) - self.assertEqual(dgst.digest(),b16decode(self.dgst,True)) - def test_duplicatedigest(self): - dgst=digest.new("sha1") - dgst.update(self.msg) - v1=dgst.digest() - v2=dgst.digest() - self.assertEqual(v1,v2) - def test_updatefinalized(self): - dgst=digest.new("sha1") - dgst.update(self.msg) - h=dgst.hexdigest() - with self.assertRaises(digest.DigestError): - dgst.update(self.msg) - def test_wrongtype(self): - dgst=digest.new("sha1") - with self.assertRaises(TypeError): - dgst.update(['a','b','c']) - with self.assertRaises(TypeError): - dgst.update(18) - with self.assertRaises(TypeError): - dgst.update({"a":"b","c":"D"}) - with self.assertRaises(TypeError): - dgst.update(u'\u0430\u0431') - def test_copy(self): - dgst=digest.new("sha1") - dgst.update("A quick brown fox jumps over ") - d2=dgst.copy() - dgst.update("the lazy dog.") - value1=dgst.hexdigest() - d2.update("the fat pig.") - value2=d2.hexdigest() - self.assertEqual(value1,"00CFFE7312BF9CA73584F24BDF7DF1D028340397") - self.assertEqual(value2,"5328F33739BEC2A15B6A30F17D3BC13CC11A7C78") + """ Test all methods with one algorithms """ + msg="A quick brown fox jumps over the lazy dog." + dgst="00CFFE7312BF9CA73584F24BDF7DF1D028340397" + def test_cons(self): + md=digest.DigestType("sha1") + dgst=digest.Digest(md) + dgst.update(self.msg) + self.assertEqual(dgst.digest_size,20) + self.assertEqual(dgst.hexdigest(),self.dgst) + def test_digestwithdata(self): + md=digest.DigestType("sha1") + dgst=digest.Digest(md) + self.assertEqual(dgst.digest(self.msg),b16decode(self.dgst)) + def test_length(self): + l=len(self.msg) + msg=self.msg+" Dog barks furiously." + dgst=digest.new("sha1") + dgst.update(msg,length=l) + self.assertEqual(dgst.hexdigest(),self.dgst) + def test_badlength(self): + l=len(self.msg) + dgst=digest.new("sha1") + with self.assertRaises(ValueError): + dgst.update(self.msg,length=l+1) + def test_bindigest(self): + dgst=digest.new("sha1") + dgst.update(self.msg) + self.assertEqual(dgst.digest_size,20) + self.assertEqual(dgst.digest(),b16decode(self.dgst,True)) + def test_duplicatedigest(self): + dgst=digest.new("sha1") + dgst.update(self.msg) + v1=dgst.digest() + v2=dgst.digest() + self.assertEqual(v1,v2) + def test_updatefinalized(self): + dgst=digest.new("sha1") + dgst.update(self.msg) + h=dgst.hexdigest() + with self.assertRaises(digest.DigestError): + dgst.update(self.msg) + def test_wrongtype(self): + dgst=digest.new("sha1") + with self.assertRaises(TypeError): + dgst.update(['a','b','c']) + with self.assertRaises(TypeError): + dgst.update(18) + with self.assertRaises(TypeError): + dgst.update({"a":"b","c":"D"}) + with self.assertRaises(TypeError): + dgst.update(u'\u0430\u0431') + def test_copy(self): + dgst=digest.new("sha1") + dgst.update("A quick brown fox jumps over ") + d2=dgst.copy() + dgst.update("the lazy dog.") + value1=dgst.hexdigest() + d2.update("the fat pig.") + value2=d2.hexdigest() + self.assertEqual(value1,"00CFFE7312BF9CA73584F24BDF7DF1D028340397") + self.assertEqual(value2,"5328F33739BEC2A15B6A30F17D3BC13CC11A7C78") class TestAlgo(unittest.TestCase): - """ Test all statdard algorithms """ - def test_md5(self): - d=digest.new("md5") - self.assertEqual(d.digest_size,16) - d.update("A quick brown fox jumps over the lazy dog.") - self.assertEqual(d.hexdigest(),"DF756A3769FCAB0A261880957590C768") + """ Test all statdard algorithms """ + def test_md5(self): + d=digest.new("md5") + self.assertEqual(d.digest_size,16) + d.update("A quick brown fox jumps over the lazy dog.") + self.assertEqual(d.hexdigest(),"DF756A3769FCAB0A261880957590C768") - def test_md4(self): - d=digest.new("md4") - d.update("A quick brown fox jumps over the lazy dog.") - self.assertEqual(d.digest_size,16) - self.assertEqual(d.hexdigest(),"FAAED595A3E38BBF0D9B4B98021D200F") - def test_sha256(self): - d=digest.new("sha256") - d.update("A quick brown fox jumps over the lazy dog.") - self.assertEqual(d.digest_size,32) - self.assertEqual(d.hexdigest(),"FFCA2587CFD4846E4CB975B503C9EB940F94566AA394E8BD571458B9DA5097D5") - def test_sha384(self): - d=digest.new("sha384") - d.update("A quick brown fox jumps over the lazy dog.") - self.assertEqual(d.digest_size,48) - self.assertEqual(d.hexdigest(),"C7D71B1BA81D0DD028E79C7E75CF2F83169C14BA732CA5A2AD731151584E9DE843C1A314077D62B96B03367F72E126D8") - def test_sha512(self): - d=digest.new("sha512") - self.assertEqual(d.digest_size,64) - d.update("A quick brown fox jumps over the lazy dog.") - self.assertEqual(d.hexdigest(),"3045575CF3B873DD656F5F3426E04A4ACD11950BB2538772EE14867002B408E21FF18EF7F7B2CAB484A3C1C0BE3F8ACC4AED536A427353C7748DC365FC1A8646") - def test_wrongdigest(self): - with self.assertRaises(digest.DigestError): - dgst=digest.new("no-such-digest") + def test_md4(self): + d=digest.new("md4") + d.update("A quick brown fox jumps over the lazy dog.") + self.assertEqual(d.digest_size,16) + self.assertEqual(d.hexdigest(),"FAAED595A3E38BBF0D9B4B98021D200F") + def test_sha256(self): + d=digest.new("sha256") + d.update("A quick brown fox jumps over the lazy dog.") + self.assertEqual(d.digest_size,32) + self.assertEqual(d.hexdigest(),"FFCA2587CFD4846E4CB975B503C9EB940F94566AA394E8BD571458B9DA5097D5") + def test_sha384(self): + d=digest.new("sha384") + d.update("A quick brown fox jumps over the lazy dog.") + self.assertEqual(d.digest_size,48) + self.assertEqual(d.hexdigest(),"C7D71B1BA81D0DD028E79C7E75CF2F83169C14BA732CA5A2AD731151584E9DE843C1A314077D62B96B03367F72E126D8") + def test_sha512(self): + d=digest.new("sha512") + self.assertEqual(d.digest_size,64) + d.update("A quick brown fox jumps over the lazy dog.") + self.assertEqual(d.hexdigest(),"3045575CF3B873DD656F5F3426E04A4ACD11950BB2538772EE14867002B408E21FF18EF7F7B2CAB484A3C1C0BE3F8ACC4AED536A427353C7748DC365FC1A8646") + def test_wrongdigest(self): + with self.assertRaises(digest.DigestError): + dgst=digest.new("no-such-digest") if __name__ == "__main__": - unittest.main() + unittest.main() diff --git a/tests/testec.py b/tests/testec.py index 496a747..40623c2 100644 --- a/tests/testec.py +++ b/tests/testec.py @@ -6,27 +6,27 @@ import unittest class TestEcCreation(unittest.TestCase): - ec1priv="""-----BEGIN PRIVATE KEY----- + ec1priv="""-----BEGIN PRIVATE KEY----- MIGEAgEAMBAGByqGSM49AgEGBSuBBAAKBG0wawIBAQQgKnG6neqZvB98EEuuxnHs fv+L/5abuNNG20wzUqRpncOhRANCAARWKXWeUZ6WiCKZ2kHx87jmJyx0G3ZB1iQC +Gp2AJYswbQPhGPigKolzIbZYfwnn7QOca6N8QDhPAn3QQK8trZI -----END PRIVATE KEY----- """ - bigkey="""-----BEGIN PRIVATE KEY----- + bigkey="""-----BEGIN PRIVATE KEY----- MHUCAQAwEAYHKoZIzj0CAQYFK4EEAAoEXjBcAgEBBBEBRVEjGVC3X8RALaFzL8m+ vqFEA0IABJFmwom5+QXlX549+fadfzVrSiIJX4lPRxVxSqS1Zgav8YHrlmvkrLXP +eFrZtgJvpTiFPBsk/0JEJmvmEmSVec= -----END PRIVATE KEY----- """ - def test_keyone(self): - key=create(Oid("secp256k1"),b16decode("2A71BA9DEA99BC1F7C104BAEC671EC7EFF8BFF969BB8D346DB4C3352A4699DC3",True)) - - out=key.exportpriv() - self.assertEqual(out,self.ec1priv) + def test_keyone(self): + key=create(Oid("secp256k1"),b16decode("2A71BA9DEA99BC1F7C104BAEC671EC7EFF8BFF969BB8D346DB4C3352A4699DC3",True)) + + out=key.exportpriv() + self.assertEqual(out,self.ec1priv) - def test_bignum(self): - keyval='\xff'*32 - key=create(Oid("secp256k1"),keyval) - self.assertEqual(key.exportpriv(),self.bigkey) + def test_bignum(self): + keyval='\xff'*32 + key=create(Oid("secp256k1"),keyval) + self.assertEqual(key.exportpriv(),self.bigkey) if __name__ == "__main__": - unittest.main() + unittest.main() diff --git a/tests/testmac.py b/tests/testmac.py index 1083b48..8e5f333 100644 --- a/tests/testmac.py +++ b/tests/testmac.py @@ -6,38 +6,38 @@ from ctypescrypto.engine import set_default import unittest class TestMac(unittest.TestCase): - def test_hmac_default(self): - d=MAC('hmac',key='1234'*4) - d.update('The Quick brown fox jumps over the lazy dog\n') - self.assertEqual(d.name,'hmac-md5') - self.assertEqual(d.hexdigest(),'A9C16D91CDF2A99273B72336D0D16B56') - def test_hmac_digestdataa(self): - d=MAC('hmac',key='1234'*4) - h=d.hexdigest('The Quick brown fox jumps over the lazy dog\n') - self.assertEqual(d.name,'hmac-md5') - self.assertEqual(h,'A9C16D91CDF2A99273B72336D0D16B56') - def test_hmac_byoid(self): - d=MAC(Oid('hmac'),key='1234'*4) - d.update('The Quick brown fox jumps over the lazy dog\n') - self.assertEqual(d.name,'hmac-md5') - self.assertEqual(d.hexdigest(),'A9C16D91CDF2A99273B72336D0D16B56') - def test_mac_wrongtype(self): - with self.assertRaises(TypeError): - d=MAC(Oid('hmac').nid,key='1234'*4) - def test_hmac_sha256(self): - d=MAC('hmac',key='1234'*16,digest='sha256') - d.update('The Quick brown fox jumps over the lazy dog\n') - self.assertEqual(d.name,'hmac-sha256') - self.assertEqual(d.hexdigest(),'BEBA086E1C67200664DCDEEC697D99DB1A8DAA72933A36B708FC5FD568173095') - def test_gostmac(self): - set_default('gost') - d=MAC('gost-mac',key='1234'*8) - d.update('The Quick brown fox jumps over the lazy dog\n') - self.assertEqual(d.name,'gost-mac') - self.assertEqual(d.digest_size,4) - self.assertEqual(d.hexdigest(),'76F25AE3') - with self.assertRaisesRegexp(DigestError,"invalid mac key length"): - d=MAC('gost-mac',key='1234'*4) + def test_hmac_default(self): + d=MAC('hmac',key='1234'*4) + d.update('The Quick brown fox jumps over the lazy dog\n') + self.assertEqual(d.name,'hmac-md5') + self.assertEqual(d.hexdigest(),'A9C16D91CDF2A99273B72336D0D16B56') + def test_hmac_digestdataa(self): + d=MAC('hmac',key='1234'*4) + h=d.hexdigest('The Quick brown fox jumps over the lazy dog\n') + self.assertEqual(d.name,'hmac-md5') + self.assertEqual(h,'A9C16D91CDF2A99273B72336D0D16B56') + def test_hmac_byoid(self): + d=MAC(Oid('hmac'),key='1234'*4) + d.update('The Quick brown fox jumps over the lazy dog\n') + self.assertEqual(d.name,'hmac-md5') + self.assertEqual(d.hexdigest(),'A9C16D91CDF2A99273B72336D0D16B56') + def test_mac_wrongtype(self): + with self.assertRaises(TypeError): + d=MAC(Oid('hmac').nid,key='1234'*4) + def test_hmac_sha256(self): + d=MAC('hmac',key='1234'*16,digest='sha256') + d.update('The Quick brown fox jumps over the lazy dog\n') + self.assertEqual(d.name,'hmac-sha256') + self.assertEqual(d.hexdigest(),'BEBA086E1C67200664DCDEEC697D99DB1A8DAA72933A36B708FC5FD568173095') + def test_gostmac(self): + set_default('gost') + d=MAC('gost-mac',key='1234'*8) + d.update('The Quick brown fox jumps over the lazy dog\n') + self.assertEqual(d.name,'gost-mac') + self.assertEqual(d.digest_size,4) + self.assertEqual(d.hexdigest(),'76F25AE3') + with self.assertRaisesRegexp(DigestError,"invalid mac key length"): + d=MAC('gost-mac',key='1234'*4) if __name__ == "__main__": - unittest.main() + unittest.main() diff --git a/tests/testoids.py b/tests/testoids.py index 0ff2564..981b1bd 100644 --- a/tests/testoids.py +++ b/tests/testoids.py @@ -2,74 +2,74 @@ from ctypescrypto.oid import Oid,create,cleanup import unittest class TestStandard(unittest.TestCase): - def test_cn(self): - o=Oid("2.5.4.3") - self.assertEqual(repr(o),"Oid('2.5.4.3')") - self.assertEqual(o.dotted(),"2.5.4.3") - self.assertEqual(str(o),"2.5.4.3") - self.assertEqual(o.shortname(),"CN") - self.assertEqual(o.longname(),"commonName") - def test_getnid(self): - o=Oid("2.5.4.3") - x=Oid("CN") - self.assertEqual(o.nid,x.nid) - self.assertEqual(o,x) - self.assertEqual(hash(o),hash(x)) + def test_cn(self): + o=Oid("2.5.4.3") + self.assertEqual(repr(o),"Oid('2.5.4.3')") + self.assertEqual(o.dotted(),"2.5.4.3") + self.assertEqual(str(o),"2.5.4.3") + self.assertEqual(o.shortname(),"CN") + self.assertEqual(o.longname(),"commonName") + def test_getnid(self): + o=Oid("2.5.4.3") + x=Oid("CN") + self.assertEqual(o.nid,x.nid) + self.assertEqual(o,x) + self.assertEqual(hash(o),hash(x)) - def test_cons2(self): - o=Oid("2.5.4.3") - x=Oid("commonName") - self.assertEqual(o.nid,x.nid) - def test_bynid(self): - o=Oid("2.5.4.3") - x=Oid(o.nid) - self.assertEqual(o.nid,x.nid) - def test_fromunicode(self): - o=Oid(u'commonName') - self.assertEqual(o.shortname(),'CN') - def test_wrongoid(self): - with self.assertRaises(ValueError): - o=Oid("1.2.3.4.5.6.7.8.10.111.1111") - def test_wrongname(self): - with self.assertRaises(ValueError): - o=Oid("No such oid in the database") - def test_wrongnid(self): - with self.assertRaises(ValueError): - o=Oid(9999999) - def test_wrongtype(self): - with self.assertRaises(TypeError): - o=Oid([2,5,3,4]) + def test_cons2(self): + o=Oid("2.5.4.3") + x=Oid("commonName") + self.assertEqual(o.nid,x.nid) + def test_bynid(self): + o=Oid("2.5.4.3") + x=Oid(o.nid) + self.assertEqual(o.nid,x.nid) + def test_fromunicode(self): + o=Oid(u'commonName') + self.assertEqual(o.shortname(),'CN') + def test_wrongoid(self): + with self.assertRaises(ValueError): + o=Oid("1.2.3.4.5.6.7.8.10.111.1111") + def test_wrongname(self): + with self.assertRaises(ValueError): + o=Oid("No such oid in the database") + def test_wrongnid(self): + with self.assertRaises(ValueError): + o=Oid(9999999) + def test_wrongtype(self): + with self.assertRaises(TypeError): + o=Oid([2,5,3,4]) class TestCustom(unittest.TestCase): - def testCreate(self): - d='1.2.643.100.3' - sn="SNILS" - long_name="Russian Pension security number" - o=create(d,sn,long_name) - self.assertEqual(str(o),d) - self.assertEqual(o.shortname(),sn) - self.assertEqual(o.longname(),long_name) - def testLookup(self): - d='1.2.643.100.3' - sn="SNILS" - long_name="Russian Pension security number" - o=create(d,sn,long_name) - x=Oid(sn) - self.assertEqual(o,x) - def testCleanup(self): - d='1.2.643.100.3' - sn="SNILS" - long_name="Russian Pension security number" - o=create(d,sn,long_name) - cleanup() - with self.assertRaises(ValueError): - x=Oid(sn) - def tearDown(self): - # Always call cleanup before next test - cleanup() - + def testCreate(self): + d='1.2.643.100.3' + sn="SNILS" + long_name="Russian Pension security number" + o=create(d,sn,long_name) + self.assertEqual(str(o),d) + self.assertEqual(o.shortname(),sn) + self.assertEqual(o.longname(),long_name) + def testLookup(self): + d='1.2.643.100.3' + sn="SNILS" + long_name="Russian Pension security number" + o=create(d,sn,long_name) + x=Oid(sn) + self.assertEqual(o,x) + def testCleanup(self): + d='1.2.643.100.3' + sn="SNILS" + long_name="Russian Pension security number" + o=create(d,sn,long_name) + cleanup() + with self.assertRaises(ValueError): + x=Oid(sn) + def tearDown(self): + # Always call cleanup before next test + cleanup() + if __name__ == '__main__': - unittest.main() + unittest.main() diff --git a/tests/testpbkdf.py b/tests/testpbkdf.py index dac46b4..251ce2d 100644 --- a/tests/testpbkdf.py +++ b/tests/testpbkdf.py @@ -2,21 +2,21 @@ from ctypescrypto.pbkdf2 import pbkdf2 import unittest class TestPBKDF2(unittest.TestCase): - answersha1='\xc13\xb3\xc8\x80\xc2\t\x01\xdaR]\x08\x03\xaf>\x85\xed\x9bU\xf0\x89\n\x81Ctu\xee\xe3\xfe\xd9\xfd\x85\xe2"\x8c\xfbQ\xfeb4\x8f(ZF\xfd\xc3w\x13' - answersha256='oY\xaf\xf7\xfeB7@\xa80%\t\'\xd5r0\xbe\xb4\xf7\xe6TQ\xd2|Tx\xc0e\xff[0a\xe56\xec\xff\xda\xcd\xed~\xbde\xad"\xe8\t\x01o' - answersha1_1000='\xe9\xfe\xbf\xf5K\xfc\xe6h\xfd\xe3\x01\xac\xc8Uc\xcc\x9d\xc7\x1e\xf6\xf8\xd7\xaa\xef\x06se\xbe\x0e^e"\xefa\xba\xe1\xb0\x0b\xc1;\xcd\x05G<\xcc\rE\xfb' - def test_defaults(self): - d=pbkdf2("password","saltsalt",48) - self.assertEqual(d,self.answersha1) - def test_sha1(self): - d=pbkdf2("password","saltsalt",48,digesttype="sha1",iterations=2000) - self.assertEqual(d,self.answersha1) - def test_1000iter(self): - d=pbkdf2("password","saltsalt",48,digesttype="sha1",iterations=1000) - self.assertEqual(d,self.answersha1_1000) - def test_sha256(self): - d=pbkdf2("password","\01\02\03\04\0abc",48,digesttype="sha256") - self.assertEqual(d,self.answersha256) - + answersha1='\xc13\xb3\xc8\x80\xc2\t\x01\xdaR]\x08\x03\xaf>\x85\xed\x9bU\xf0\x89\n\x81Ctu\xee\xe3\xfe\xd9\xfd\x85\xe2"\x8c\xfbQ\xfeb4\x8f(ZF\xfd\xc3w\x13' + answersha256='oY\xaf\xf7\xfeB7@\xa80%\t\'\xd5r0\xbe\xb4\xf7\xe6TQ\xd2|Tx\xc0e\xff[0a\xe56\xec\xff\xda\xcd\xed~\xbde\xad"\xe8\t\x01o' + answersha1_1000='\xe9\xfe\xbf\xf5K\xfc\xe6h\xfd\xe3\x01\xac\xc8Uc\xcc\x9d\xc7\x1e\xf6\xf8\xd7\xaa\xef\x06se\xbe\x0e^e"\xefa\xba\xe1\xb0\x0b\xc1;\xcd\x05G<\xcc\rE\xfb' + def test_defaults(self): + d=pbkdf2("password","saltsalt",48) + self.assertEqual(d,self.answersha1) + def test_sha1(self): + d=pbkdf2("password","saltsalt",48,digesttype="sha1",iterations=2000) + self.assertEqual(d,self.answersha1) + def test_1000iter(self): + d=pbkdf2("password","saltsalt",48,digesttype="sha1",iterations=1000) + self.assertEqual(d,self.answersha1_1000) + def test_sha256(self): + d=pbkdf2("password","\01\02\03\04\0abc",48,digesttype="sha256") + self.assertEqual(d,self.answersha256) + if __name__ == "__main__": - unittest.main() + unittest.main() diff --git a/tests/testpkey.py b/tests/testpkey.py index 00aee7a..7e71e8c 100644 --- a/tests/testpkey.py +++ b/tests/testpkey.py @@ -3,13 +3,13 @@ import unittest from base64 import b64decode, b16decode def pem2der(s): - start=s.find('-----\n') - finish=s.rfind('\n-----END') - data=s[start+6:finish] - return b64decode(data) + start=s.find('-----\n') + finish=s.rfind('\n-----END') + data=s[start+6:finish] + return b64decode(data) class TestPKey(unittest.TestCase): - rsa="""-----BEGIN PRIVATE KEY----- + rsa="""-----BEGIN PRIVATE KEY----- MIICdQIBADANBgkqhkiG9w0BAQEFAASCAl8wggJbAgEAAoGBAL9CzVZu9bczTmB8 776pPUoPo6WbAfwQqqiGrj91bk2mYE+MNLo4yIQH45IcwGzkyS8+YyQJf8Bux5BC oZ2nwzXm5+JZkxkN1mtMzit2D7/hHmrZLoSbr0sxXFrD4a35RI4hXnSK9Sk01sXA @@ -26,7 +26,7 @@ Ao6uTm8fnkD4C836wS4mYAPqwRBK1JvnEXEQee9irf+ip89BAg74ViTcGF9lwJwQ gOM+X5Db+3pK -----END PRIVATE KEY----- """ - rsakeytext="""Public-Key: (1024 bit) + rsakeytext="""Public-Key: (1024 bit) Modulus: 00:bf:42:cd:56:6e:f5:b7:33:4e:60:7c:ef:be:a9: 3d:4a:0f:a3:a5:9b:01:fc:10:aa:a8:86:ae:3f:75: @@ -39,13 +39,13 @@ Modulus: 1b:a4:85:ab:b0:87:7b:78:2f Exponent: 65537 (0x10001) """ - ec1priv="""-----BEGIN PRIVATE KEY----- + ec1priv="""-----BEGIN PRIVATE KEY----- MIGEAgEAMBAGByqGSM49AgEGBSuBBAAKBG0wawIBAQQgKnG6neqZvB98EEuuxnHs fv+L/5abuNNG20wzUqRpncOhRANCAARWKXWeUZ6WiCKZ2kHx87jmJyx0G3ZB1iQC +Gp2AJYswbQPhGPigKolzIbZYfwnn7QOca6N8QDhPAn3QQK8trZI -----END PRIVATE KEY----- """ - ec1keytext="""Public-Key: (256 bit) + ec1keytext="""Public-Key: (256 bit) pub: 04:56:29:75:9e:51:9e:96:88:22:99:da:41:f1:f3: b8:e6:27:2c:74:1b:76:41:d6:24:02:f8:6a:76:00: @@ -54,66 +54,66 @@ pub: 02:bc:b6:b6:48 ASN1 OID: secp256k1 """ - ec1pub="""-----BEGIN PUBLIC KEY----- + ec1pub="""-----BEGIN PUBLIC KEY----- MFYwEAYHKoZIzj0CAQYFK4EEAAoDQgAEVil1nlGelogimdpB8fO45icsdBt2QdYk AvhqdgCWLMG0D4Rj4oCqJcyG2WH8J5+0DnGujfEA4TwJ90ECvLa2SA== -----END PUBLIC KEY----- """ - - def test_unencrypted_pem(self): - key=PKey(privkey=self.rsa) - self.assertIsNotNone(key.key) - self.assertEqual(str(key),self.rsakeytext) - def test_export_priv_pem(self): - key=PKey(privkey=self.ec1priv) - out=key.exportpriv() - self.assertEqual(self.ec1priv,out) - def test_unencrypted_pem_ec(self): - - key=PKey(privkey=self.ec1priv) - self.assertIsNotNone(key.key) - self.assertEqual(str(key),self.ec1keytext) - def test_unencrypted_der_ec(self): - key=PKey(privkey=pem2der(self.ec1priv),format="DER") - self.assertIsNotNone(key.key) - self.assertEqual(str(key),self.ec1keytext) - def test_pubkey_pem(self): - key=PKey(pubkey=self.ec1pub) - self.assertIsNotNone(key.key) - self.assertEqual(str(key),self.ec1keytext) - def test_pubkey_der(self): - key=PKey(pubkey=pem2der(self.ec1pub),format="DER") - self.assertIsNotNone(key.key) - self.assertEqual(str(key),self.ec1keytext) - def test_compare(self): - key1=PKey(privkey=self.ec1priv) - self.assertIsNotNone(key1.key) - key2=PKey(pubkey=self.ec1pub) - self.assertIsNotNone(key2.key) - self.assertEqual(key1,key2) - def test_sign(self): - signer=PKey(privkey=self.ec1priv) - digest=b16decode("FFCA2587CFD4846E4CB975B503C9EB940F94566AA394E8BD571458B9DA5097D5") - signature=signer.sign(digest) - self.assertTrue(len(signature)>0) - verifier=PKey(pubkey=self.ec1pub) - self.assertTrue(verifier.verify(digest,signature)) - def test_generate(self): - newkey=PKey.generate("rsa") - self.assertIsNotNone(newkey.key) - s=str(newkey) - self.assertEqual(s[:s.find("\n")],"Public-Key: (1024 bit)") - def test_generate_params(self): - newkey=PKey.generate("rsa",rsa_keygen_bits=2048) - self.assertIsNotNone(newkey.key) - s=str(newkey) - self.assertEqual(s[:s.find("\n")],"Public-Key: (2048 bit)") - def test_generate_ec(self): - templkey=PKey(pubkey=self.ec1pub) - newkey=PKey.generate("ec",paramsfrom=templkey) - self.assertIsNotNone(newkey.key) - s=str(newkey) - self.assertEqual(s[:s.find("\n")],"Public-Key: (256 bit)") - self.assertNotEqual(str(templkey),str(newkey)) + + def test_unencrypted_pem(self): + key=PKey(privkey=self.rsa) + self.assertIsNotNone(key.key) + self.assertEqual(str(key),self.rsakeytext) + def test_export_priv_pem(self): + key=PKey(privkey=self.ec1priv) + out=key.exportpriv() + self.assertEqual(self.ec1priv,out) + def test_unencrypted_pem_ec(self): + + key=PKey(privkey=self.ec1priv) + self.assertIsNotNone(key.key) + self.assertEqual(str(key),self.ec1keytext) + def test_unencrypted_der_ec(self): + key=PKey(privkey=pem2der(self.ec1priv),format="DER") + self.assertIsNotNone(key.key) + self.assertEqual(str(key),self.ec1keytext) + def test_pubkey_pem(self): + key=PKey(pubkey=self.ec1pub) + self.assertIsNotNone(key.key) + self.assertEqual(str(key),self.ec1keytext) + def test_pubkey_der(self): + key=PKey(pubkey=pem2der(self.ec1pub),format="DER") + self.assertIsNotNone(key.key) + self.assertEqual(str(key),self.ec1keytext) + def test_compare(self): + key1=PKey(privkey=self.ec1priv) + self.assertIsNotNone(key1.key) + key2=PKey(pubkey=self.ec1pub) + self.assertIsNotNone(key2.key) + self.assertEqual(key1,key2) + def test_sign(self): + signer=PKey(privkey=self.ec1priv) + digest=b16decode("FFCA2587CFD4846E4CB975B503C9EB940F94566AA394E8BD571458B9DA5097D5") + signature=signer.sign(digest) + self.assertTrue(len(signature)>0) + verifier=PKey(pubkey=self.ec1pub) + self.assertTrue(verifier.verify(digest,signature)) + def test_generate(self): + newkey=PKey.generate("rsa") + self.assertIsNotNone(newkey.key) + s=str(newkey) + self.assertEqual(s[:s.find("\n")],"Public-Key: (1024 bit)") + def test_generate_params(self): + newkey=PKey.generate("rsa",rsa_keygen_bits=2048) + self.assertIsNotNone(newkey.key) + s=str(newkey) + self.assertEqual(s[:s.find("\n")],"Public-Key: (2048 bit)") + def test_generate_ec(self): + templkey=PKey(pubkey=self.ec1pub) + newkey=PKey.generate("ec",paramsfrom=templkey) + self.assertIsNotNone(newkey.key) + s=str(newkey) + self.assertEqual(s[:s.find("\n")],"Public-Key: (256 bit)") + self.assertNotEqual(str(templkey),str(newkey)) if __name__ == "__main__": - unittest.main() + unittest.main() diff --git a/tests/testrand.py b/tests/testrand.py index ec861ba..853a491 100644 --- a/tests/testrand.py +++ b/tests/testrand.py @@ -2,27 +2,27 @@ from ctypescrypto.rand import * import unittest class TestRand(unittest.TestCase): - def test_bytes(self): - b=bytes(100) - self.assertEqual(len(b),100) - b2=bytes(100) - self.assertNotEqual(b,b2) - def test_pseudo_bytes(self): - b=pseudo_bytes(100) - self.assertEqual(len(b),100) - b2=pseudo_bytes(100) - self.assertNotEqual(b,b2) - def test_seed(self): - b="aaqwrwfsagdsgdsfgdsfgdfsgdsfgdsgfdsfgdsfg" - seed(b) - # Check if no segfault here - def test_entropy(self): - b="aaqwrwfsagdsgdsfgdsfgdfsgdsfgdsgfdsfgdsfg" - seed(b,2.25) - # Check if no segfault here - def test_Status(self): - i=status() - self.assertEqual(i,1) + def test_bytes(self): + b=bytes(100) + self.assertEqual(len(b),100) + b2=bytes(100) + self.assertNotEqual(b,b2) + def test_pseudo_bytes(self): + b=pseudo_bytes(100) + self.assertEqual(len(b),100) + b2=pseudo_bytes(100) + self.assertNotEqual(b,b2) + def test_seed(self): + b="aaqwrwfsagdsgdsfgdsfgdfsgdsfgdsgfdsfgdsfg" + seed(b) + # Check if no segfault here + def test_entropy(self): + b="aaqwrwfsagdsgdsfgdsfgdfsgdsfgdsgfdsfgdsfg" + seed(b,2.25) + # Check if no segfault here + def test_Status(self): + i=status() + self.assertEqual(i,1) if __name__ == '__main__': - unittest.main() + unittest.main() diff --git a/tests/testx509.py b/tests/testx509.py index 50b3b6a..d083b1c 100644 --- a/tests/testx509.py +++ b/tests/testx509.py @@ -10,7 +10,7 @@ import unittest class TestCertInfo(unittest.TestCase): - ca_cert="""-----BEGIN CERTIFICATE----- + ca_cert="""-----BEGIN CERTIFICATE----- MIIEDzCCAvegAwIBAgIJAN9Ejmna3JJ7MA0GCSqGSIb3DQEBBQUAMIGdMQswCQYD VQQGEwJSVTEVMBMGA1UECAwM0JzQvtGB0LrQstCwMTAwLgYDVQQKDCfQo9C00L7R gdGC0L7QstC10YDRj9GO0YnQuNC5INGG0LXQvdGC0YAxIjAgBgNVBAMMGdCS0LjQ @@ -35,7 +35,7 @@ vFY/2KV16/FdBovTFWMyKrzlYHm0Wgt28IWqhocq/golLfvkz3VAkLQvOF2i6hNc 4feBv69SRTsTCFN9PtJCtxPX/K9LZKeccBKgGjrHQpAF+JU= -----END CERTIFICATE----- """ - cert1="""-----BEGIN CERTIFICATE----- + cert1="""-----BEGIN CERTIFICATE----- MIIEDzCCAvegAwIBAgIJAN9Ejmna3JJ8MA0GCSqGSIb3DQEBBQUAMIGdMQswCQYD VQQGEwJSVTEVMBMGA1UECAwM0JzQvtGB0LrQstCwMTAwLgYDVQQKDCfQo9C00L7R gdGC0L7QstC10YDRj9GO0YnQuNC5INGG0LXQvdGC0YAxIjAgBgNVBAMMGdCS0LjQ @@ -60,7 +60,7 @@ uo291iEXyooazJdbWwZwcwk7WrNNKhqktPTg0X1ZHNnGwOAGPzwNJFGPeFj71r0t aFWU5EMRKaZK75keXq/RdaOAenl+nKF6xA2XHDhGgdndFfY= -----END CERTIFICATE----- """ - pubkey1="""-----BEGIN PUBLIC KEY----- + pubkey1="""-----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEArQSfrrxNROyzNEz60G2E HBP+E4BL0b1QytGAZZiQp2XIhhQeb7mx+c4mpwgvD7/IdAcK+YVGx78nfY723T3w G48U7HzFNbLvNDycxyXecXbvCmRsxPy8TxkwPf6TIT3UcixtwMMqZFqlAtSTDmOO @@ -70,7 +70,7 @@ iFTXJP8/Au8kfezlA4b+eS81zWq2BFvNlBQsgf04S88oew0CuBBgtjUIIw7XZkS0 3QIDAQAB -----END PUBLIC KEY----- """ - digicert_cert="""digicert.crt + digicert_cert="""digicert.crt -----BEGIN CERTIFICATE----- MIIG5jCCBc6gAwIBAgIQAze5KDR8YKauxa2xIX84YDANBgkqhkiG9w0BAQUFADBs MQswCQYDVQQGEwJVUzEVMBMGA1UEChMMRGlnaUNlcnQgSW5jMRkwFwYDVQQLExB3 @@ -111,203 +111,203 @@ BEkAui6mSnKDcp33C4ypieez12Qf1uNgywPE3IjpnSUBAHHLA7QpYCWP+UbRe3Gu zVMSW4SOwg/H7ZMZ2cn6j1g0djIvruFQFGHUqFijyDATI+/GJYw2jxyA -----END CERTIFICATE----- """ - def test_readpubkey(self): - c=X509(self.cert1) - p=c.pubkey - self.assertEqual(p.exportpub(),self.pubkey1) - def test_pem(self): - c=X509(self.cert1) - self.assertEqual(c.pem(),self.cert1) - def test_subject(self): - c=X509(self.cert1) - self.assertEqual(unicode(c.subject),u'C=RU,ST=Москва,L=Москва,O=Частное лицо,CN=Виктор Вагнер') - def test_subject_str(self): - c=X509(self.cert1) - self.assertEqual(str(c.subject),b'C=RU,ST=\\D0\\9C\\D0\\BE\\D1\\81\\D0\\BA\\D0\\B2\\D0\\B0,L=\\D0\\9C\\D0\\BE\\D1\\81\\D0\\BA\\D0\\B2\\D0\\B0,O=\\D0\\A7\\D0\\B0\\D1\\81\\D1\\82\\D0\\BD\\D0\\BE\\D0\\B5 \\D0\\BB\\D0\\B8\\D1\\86\\D0\\BE,CN=\\D0\\92\\D0\\B8\\D0\\BA\\D1\\82\\D0\\BE\\D1\\80 \\D0\\92\\D0\\B0\\D0\\B3\\D0\\BD\\D0\\B5\\D1\\80') - def test_subject_len(self): - c=X509(self.cert1) - self.assertEqual(len(c.subject),5) - def test_issuer(self): - c=X509(self.cert1) - self.assertEqual(unicode(c.issuer),u'C=RU,ST=Москва,O=Удостоверяющий центр,CN=Виктор Вагнер,emailAddress=vitus@wagner.pp.ru') - def test_subjectfields(self): - c=X509(self.cert1) - self.assertEqual(c.subject[Oid("C")],"RU") - with self.assertRaises(TypeError): - x=c.subject["CN"] - self.assertEqual(c.subject[Oid("L")],u'\u041c\u043e\u0441\u043a\u0432\u0430') - def test_subjectmodify(self): - c=X509(self.cert1) - with self.assertRaises(ValueError): - c.subject[Oid("CN")]=u'Foo' - with self.assertRaises(ValueError): - del c.subject[Oid('CN')] - def test_subjectbadsubfield(self): - c=X509(self.cert1) - with self.assertRaises(KeyError): - x=c.subject[Oid("streetAddress")] - def test_subjectfieldindex(self): - c=X509(self.cert1) - self.assertEqual(repr(c.subject[0]),repr((Oid('C'),u'RU'))) - def test_subjectbadindex(self): - c=X509(self.cert1) - with self.assertRaises(IndexError): - x=c.subject[11] - with self.assertRaises(IndexError): - x=c.subject[-1] - def test_notBefore(self): - c=X509(self.cert1) - self.assertEqual(c.startDate,datetime.datetime(2014,10,26,19,07,17,0,utc)) - def test_notAfter(self): - c=X509(self.cert1) - self.assertEqual(c.endDate,datetime.datetime(2024,10,23,19,7,17,0,utc)) - def test_subjectHash(self): - c=X509(self.cert1) - self.assertEqual(hash(c.subject),0x1f3ed722) - def test_issuerHash(self): - c=X509(self.cert1) - self.assertEqual(hash(c.issuer),0x7d3ea8c3) - def test_namecomp(self): - c=X509(self.cert1) - ca=X509(self.ca_cert) - self.assertEqual(c.issuer,ca.subject) - self.assertNotEqual(c.subject,c.issuer) - self.assertEqual(ca.issuer,ca.subject) - def test_serial(self): - c=X509(self.cert1) - self.assertEqual(c.serial,0xDF448E69DADC927CL) - def test_version(self): - c=X509(self.cert1) - self.assertEqual(c.version,3) - def test_ca_cert(self): - ca=X509(self.ca_cert) - self.assertTrue(ca.check_ca()) - notca=X509(self.cert1) - self.assertFalse(notca.check_ca()) - def test_extension_count(self): - cert=X509(self.cert1) - self.assertTrue(len(cert.extensions),4) - ca_cert=X509(self.ca_cert) - self.assertEqual(len(ca_cert.extensions),3) - def test_extension_outofrange(self): - cert=X509(self.cert1) - with self.assertRaises(IndexError): - cert.extensions[4] - with self.assertRaises(IndexError): - cert.extensions[-1] - def test_extension_oid(self): - cert=X509(self.cert1) - ext=cert.extensions[0] - ext_id=ext.oid - self.assertTrue(isinstance(ext_id,Oid)) - self.assertEqual(ext_id,Oid('basicConstraints')) - def test_extension_text(self): - cert=X509(self.cert1) - ext=cert.extensions[0] - self.assertEqual(str(ext),'CA:FALSE') - self.assertEqual(unicode(ext),u'CA:FALSE') - def test_extenson_find(self): - cert=X509(self.cert1) - exts=cert.extensions.find(Oid('subjectAltName')) - self.assertEqual(len(exts),1) - self.assertEqual(exts[0].oid,Oid('subjectAltName')) - def test_extension_bad_find(self): - cert=X509(self.cert1) - with self.assertRaises(TypeError): - exts=cert.extensions.find('subjectAltName') - def test_extenson_critical(self): - cert=X509(self.digicert_cert) - crit_exts=cert.extensions.find_critical() - self.assertEqual(len(crit_exts),2) - other_exts=cert.extensions.find_critical(False) - self.assertEqual(len(crit_exts)+len(other_exts),len(cert.extensions)) - self.assertEqual(crit_exts[0].critical,True) - self.assertEqual(other_exts[0].critical,False) - def test_verify_by_key(self): - ca=X509(self.ca_cert) - pubkey=ca.pubkey - self.assertTrue(ca.verify(key=pubkey)) - c=X509(self.cert1) - pk2=c.pubkey - self.assertFalse(c.verify(key=pk2)) - self.assertTrue(c.verify(key=pubkey)) - def test_verify_self_singed(self): - ca=X509(self.ca_cert) - self.assertTrue(ca.verify()) - def test_default_filestore(self): - store=X509Store(default=True) - c1=X509(self.cert1) - # Cert signed by our CA shouldn't be successfully verified - # by default CA store - self.assertFalse(c1.verify(store)) - # but cert, downloaded from some commercial CA - should. - c2=X509(self.digicert_cert) - self.assertTrue(c2.verify(store)) - def test_verify_by_filestore(self): - trusted=NamedTemporaryFile() - trusted.write(self.ca_cert) - trusted.flush() - goodcert=X509(self.cert1) - badcert=X509(self.cert1[0:-30]+"GG"+self.cert1[-28:]) - gitcert=X509(self.digicert_cert) - store=X509Store(file=trusted.name) - # We should successfuly verify certificate signed by our CA cert - self.assertTrue(goodcert.verify(store)) - # We should reject corrupted certificate - self.assertFalse(badcert.verify(store)) - # And if we specify explicitely certificate file, certificate, - # signed by some commercial CA should be rejected too - self.assertFalse(gitcert.verify(store)) - trusted.close() - def test_verify_by_dirstore(self): - pass - def test_certstack1(self): - l=[] - l.append(X509(self.cert1)) - self.assertEqual(unicode(l[0].subject[Oid('CN')]),u'Виктор Вагнер') - l.append(X509(self.ca_cert)) - l.append(X509(self.digicert_cert)) - stack=StackOfX509(certs=l) - self.assertEqual(len(stack),3) - self.assertTrue(isinstance(stack[1],X509)) - self.assertEqual(unicode(stack[0].subject[Oid('CN')]),u'Виктор Вагнер') - with self.assertRaises(IndexError): - c=stack[-1] - with self.assertRaises(IndexError): - c=stack[3] - del stack[1] - self.assertEqual(len(stack),2) - self.assertEqual(unicode(stack[0].subject[Oid('CN')]),u'Виктор Вагнер') - self.assertEqual(unicode(stack[1].subject[Oid('CN')]),u'DigiCert High Assurance EV CA-1') - def test_certstack2(self): - stack=StackOfX509() - stack.append(X509(self.cert1)) - stack.append(X509(self.ca_cert)) - c=stack[1] - stack[1]=X509(self.digicert_cert) - self.assertEqual(len(stack),2) - self.assertEqual(unicode(stack[1].subject[Oid('CN')]),u'DigiCert High Assurance EV CA-1') - with self.assertRaises(IndexError): - stack[-1]=c - with self.assertRaises(IndexError): - stack[3]=c - with self.assertRaises(TypeError): - stack[0]=self.cert1 - with self.assertRaises(TypeError): - stack.append(self.cert1) - def test_certstack3(self): - l=[] - l.append(X509(self.cert1)) - self.assertEqual(unicode(l[0].subject[Oid('CN')]),u'Виктор Вагнер') - l.append(X509(self.ca_cert)) - l.append(X509(self.digicert_cert)) - stack=StackOfX509(certs=l) - stack2=StackOfX509(ptr=stack.ptr,disposable=False) - with self.assertRaises(ValueError): - stack3=StackOfX509(ptr=stack.ptr,certs=l) - with self.assertRaises(ValueError): - stack2[1]=l[0] - with self.assertRaises(ValueError): - stack2.append(l[0]) + def test_readpubkey(self): + c=X509(self.cert1) + p=c.pubkey + self.assertEqual(p.exportpub(),self.pubkey1) + def test_pem(self): + c=X509(self.cert1) + self.assertEqual(c.pem(),self.cert1) + def test_subject(self): + c=X509(self.cert1) + self.assertEqual(unicode(c.subject),u'C=RU,ST=Москва,L=Москва,O=Частное лицо,CN=Виктор Вагнер') + def test_subject_str(self): + c=X509(self.cert1) + self.assertEqual(str(c.subject),b'C=RU,ST=\\D0\\9C\\D0\\BE\\D1\\81\\D0\\BA\\D0\\B2\\D0\\B0,L=\\D0\\9C\\D0\\BE\\D1\\81\\D0\\BA\\D0\\B2\\D0\\B0,O=\\D0\\A7\\D0\\B0\\D1\\81\\D1\\82\\D0\\BD\\D0\\BE\\D0\\B5 \\D0\\BB\\D0\\B8\\D1\\86\\D0\\BE,CN=\\D0\\92\\D0\\B8\\D0\\BA\\D1\\82\\D0\\BE\\D1\\80 \\D0\\92\\D0\\B0\\D0\\B3\\D0\\BD\\D0\\B5\\D1\\80') + def test_subject_len(self): + c=X509(self.cert1) + self.assertEqual(len(c.subject),5) + def test_issuer(self): + c=X509(self.cert1) + self.assertEqual(unicode(c.issuer),u'C=RU,ST=Москва,O=Удостоверяющий центр,CN=Виктор Вагнер,emailAddress=vitus@wagner.pp.ru') + def test_subjectfields(self): + c=X509(self.cert1) + self.assertEqual(c.subject[Oid("C")],"RU") + with self.assertRaises(TypeError): + x=c.subject["CN"] + self.assertEqual(c.subject[Oid("L")],u'\u041c\u043e\u0441\u043a\u0432\u0430') + def test_subjectmodify(self): + c=X509(self.cert1) + with self.assertRaises(ValueError): + c.subject[Oid("CN")]=u'Foo' + with self.assertRaises(ValueError): + del c.subject[Oid('CN')] + def test_subjectbadsubfield(self): + c=X509(self.cert1) + with self.assertRaises(KeyError): + x=c.subject[Oid("streetAddress")] + def test_subjectfieldindex(self): + c=X509(self.cert1) + self.assertEqual(repr(c.subject[0]),repr((Oid('C'),u'RU'))) + def test_subjectbadindex(self): + c=X509(self.cert1) + with self.assertRaises(IndexError): + x=c.subject[11] + with self.assertRaises(IndexError): + x=c.subject[-1] + def test_notBefore(self): + c=X509(self.cert1) + self.assertEqual(c.startDate,datetime.datetime(2014,10,26,19,07,17,0,utc)) + def test_notAfter(self): + c=X509(self.cert1) + self.assertEqual(c.endDate,datetime.datetime(2024,10,23,19,7,17,0,utc)) + def test_subjectHash(self): + c=X509(self.cert1) + self.assertEqual(hash(c.subject),0x1f3ed722) + def test_issuerHash(self): + c=X509(self.cert1) + self.assertEqual(hash(c.issuer),0x7d3ea8c3) + def test_namecomp(self): + c=X509(self.cert1) + ca=X509(self.ca_cert) + self.assertEqual(c.issuer,ca.subject) + self.assertNotEqual(c.subject,c.issuer) + self.assertEqual(ca.issuer,ca.subject) + def test_serial(self): + c=X509(self.cert1) + self.assertEqual(c.serial,0xDF448E69DADC927CL) + def test_version(self): + c=X509(self.cert1) + self.assertEqual(c.version,3) + def test_ca_cert(self): + ca=X509(self.ca_cert) + self.assertTrue(ca.check_ca()) + notca=X509(self.cert1) + self.assertFalse(notca.check_ca()) + def test_extension_count(self): + cert=X509(self.cert1) + self.assertTrue(len(cert.extensions),4) + ca_cert=X509(self.ca_cert) + self.assertEqual(len(ca_cert.extensions),3) + def test_extension_outofrange(self): + cert=X509(self.cert1) + with self.assertRaises(IndexError): + cert.extensions[4] + with self.assertRaises(IndexError): + cert.extensions[-1] + def test_extension_oid(self): + cert=X509(self.cert1) + ext=cert.extensions[0] + ext_id=ext.oid + self.assertTrue(isinstance(ext_id,Oid)) + self.assertEqual(ext_id,Oid('basicConstraints')) + def test_extension_text(self): + cert=X509(self.cert1) + ext=cert.extensions[0] + self.assertEqual(str(ext),'CA:FALSE') + self.assertEqual(unicode(ext),u'CA:FALSE') + def test_extenson_find(self): + cert=X509(self.cert1) + exts=cert.extensions.find(Oid('subjectAltName')) + self.assertEqual(len(exts),1) + self.assertEqual(exts[0].oid,Oid('subjectAltName')) + def test_extension_bad_find(self): + cert=X509(self.cert1) + with self.assertRaises(TypeError): + exts=cert.extensions.find('subjectAltName') + def test_extenson_critical(self): + cert=X509(self.digicert_cert) + crit_exts=cert.extensions.find_critical() + self.assertEqual(len(crit_exts),2) + other_exts=cert.extensions.find_critical(False) + self.assertEqual(len(crit_exts)+len(other_exts),len(cert.extensions)) + self.assertEqual(crit_exts[0].critical,True) + self.assertEqual(other_exts[0].critical,False) + def test_verify_by_key(self): + ca=X509(self.ca_cert) + pubkey=ca.pubkey + self.assertTrue(ca.verify(key=pubkey)) + c=X509(self.cert1) + pk2=c.pubkey + self.assertFalse(c.verify(key=pk2)) + self.assertTrue(c.verify(key=pubkey)) + def test_verify_self_singed(self): + ca=X509(self.ca_cert) + self.assertTrue(ca.verify()) + def test_default_filestore(self): + store=X509Store(default=True) + c1=X509(self.cert1) + # Cert signed by our CA shouldn't be successfully verified + # by default CA store + self.assertFalse(c1.verify(store)) + # but cert, downloaded from some commercial CA - should. + c2=X509(self.digicert_cert) + self.assertTrue(c2.verify(store)) + def test_verify_by_filestore(self): + trusted=NamedTemporaryFile() + trusted.write(self.ca_cert) + trusted.flush() + goodcert=X509(self.cert1) + badcert=X509(self.cert1[0:-30]+"GG"+self.cert1[-28:]) + gitcert=X509(self.digicert_cert) + store=X509Store(file=trusted.name) + # We should successfuly verify certificate signed by our CA cert + self.assertTrue(goodcert.verify(store)) + # We should reject corrupted certificate + self.assertFalse(badcert.verify(store)) + # And if we specify explicitely certificate file, certificate, + # signed by some commercial CA should be rejected too + self.assertFalse(gitcert.verify(store)) + trusted.close() + def test_verify_by_dirstore(self): + pass + def test_certstack1(self): + l=[] + l.append(X509(self.cert1)) + self.assertEqual(unicode(l[0].subject[Oid('CN')]),u'Виктор Вагнер') + l.append(X509(self.ca_cert)) + l.append(X509(self.digicert_cert)) + stack=StackOfX509(certs=l) + self.assertEqual(len(stack),3) + self.assertTrue(isinstance(stack[1],X509)) + self.assertEqual(unicode(stack[0].subject[Oid('CN')]),u'Виктор Вагнер') + with self.assertRaises(IndexError): + c=stack[-1] + with self.assertRaises(IndexError): + c=stack[3] + del stack[1] + self.assertEqual(len(stack),2) + self.assertEqual(unicode(stack[0].subject[Oid('CN')]),u'Виктор Вагнер') + self.assertEqual(unicode(stack[1].subject[Oid('CN')]),u'DigiCert High Assurance EV CA-1') + def test_certstack2(self): + stack=StackOfX509() + stack.append(X509(self.cert1)) + stack.append(X509(self.ca_cert)) + c=stack[1] + stack[1]=X509(self.digicert_cert) + self.assertEqual(len(stack),2) + self.assertEqual(unicode(stack[1].subject[Oid('CN')]),u'DigiCert High Assurance EV CA-1') + with self.assertRaises(IndexError): + stack[-1]=c + with self.assertRaises(IndexError): + stack[3]=c + with self.assertRaises(TypeError): + stack[0]=self.cert1 + with self.assertRaises(TypeError): + stack.append(self.cert1) + def test_certstack3(self): + l=[] + l.append(X509(self.cert1)) + self.assertEqual(unicode(l[0].subject[Oid('CN')]),u'Виктор Вагнер') + l.append(X509(self.ca_cert)) + l.append(X509(self.digicert_cert)) + stack=StackOfX509(certs=l) + stack2=StackOfX509(ptr=stack.ptr,disposable=False) + with self.assertRaises(ValueError): + stack3=StackOfX509(ptr=stack.ptr,certs=l) + with self.assertRaises(ValueError): + stack2[1]=l[0] + with self.assertRaises(ValueError): + stack2.append(l[0]) if __name__ == '__main__': - unittest.main() + unittest.main()