From 766de77591d848bb229ec25ab6b19fb902ee7971 Mon Sep 17 00:00:00 2001 From: Victor Wagner Date: Mon, 27 Oct 2014 15:12:37 +0300 Subject: [PATCH] StackOfX509 (untested) --- ctypescrypto/cms.py | 42 +++++++++++++++++----- ctypescrypto/x509.py | 83 ++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 113 insertions(+), 12 deletions(-) diff --git a/ctypescrypto/cms.py b/ctypescrypto/cms.py index f1a4dbb..2c03898 100644 --- a/ctypescrypto/cms.py +++ b/ctypescrypto/cms.py @@ -97,20 +97,26 @@ class CMSBase: class SignedData(CMSBase): @staticmethod - def create(data,cert,pkey,flags=Flags.BINARY): + def create(data,cert,pkey,flags=Flags.BINARY,certs=[]): """ Creates SignedData message by signing data with pkey and certificate. @param data - data to sign @param pkey - pkey object with private key to sign + @param flags - OReed combination of Flags constants + @param certs - list of X509 objects to include into CMS """ if not pkey.cansign: raise ValueError("Specified keypair has no private part") if cert.pubkey!=pkey: raise ValueError("Certificate doesn't match public key") b=Membio(data) - ptr=libcrypto.CMS_sign(cert.cert,pkey.ptr,None,b.bio,flags) + if certs is not None and len(certs)>0: + certstack=StackOfX509(certs) + else: + certstack=None + ptr=libcrypto.CMS_sign(cert.cert,pkey.ptr,certstack,b.bio,flags) if ptr is None: raise CMSError("signing message") return SignedData(ptr) @@ -142,27 +148,37 @@ class SignedData(CMSBase): res= libcrypto.CMS_final(self.ptr,biodata,None,flags) if res<=0: raise CMSError - def verify(self,store,flags,data=None): + def verify(self,store,flags,data=None,certs=[]): """ Verifies signature under CMS message using trusted cert store @param store - X509Store object with trusted certs @param flags - OR-ed combination of flag consants @param data - message data, if messge has detached signature + param certs - list of certificates to use during verification + If Flags.NOINTERN is specified, these are only + sertificates to search for signing certificates @returns True if signature valid, False otherwise """ bio=None if data!=None: b=Membio(data) bio=b.bio - res=libcrypto.CMS_verify(self.ptr,None,store.store,bio,None,flags) + if certs is not None and len(certs)>0: + certstack=StackOfX509(certs) + else: + certstack=None + res=libcrypto.CMS_verify(self.ptr,certstack,store.store,bio,None,flags) return res>0 @property def signers(self,store=None): """ Return list of signer's certificates """ - raise NotImplementedError + p=libcrypto.CMS_get0_signers(self.ptr) + if p is None: + raise CMSError + return StackOfX509(ptr=p,disposable=False) @property def data(self): """ @@ -177,7 +193,8 @@ class SignedData(CMSBase): Adds a certificate (probably intermediate CA) to the SignedData structure """ - raise NotImplementedError + if libcrypto.CMS_add1_cert(self.ptr,cert.cert)<=0: + raise CMSError("adding cert") def addcrl(self,crl): """ Adds a CRL to the signed data structure @@ -188,7 +205,10 @@ class SignedData(CMSBase): """ List of the certificates contained in the structure """ - raise NotImplementedError + p=CMS_get1_certs(self.ptr) + if p is None: + raise CMSError("getting certs") + return StackOfX509(ptr=p,disposable=True) @property def crls(self): """ @@ -206,8 +226,12 @@ class EnvelopedData(CMSBase): @param cipher - CipherType object @param flags - flag """ - # Need to be able to handle OPENSSL stacks - raise NotImplementedError + recp=StackOfX509(recipients) + b=Membio(data) + p=libcrypto.CMS_encrypt(recp.ptr,b.bio,cipher.cipher_type,flags) + if p is None: + raise CMSError("encrypt EnvelopedData") + return EnvelopedData(p) def decrypt(self,pkey,cert,flags=0): """ Decrypts message diff --git a/ctypescrypto/x509.py b/ctypescrypto/x509.py index d254c5a..5c2a50d 100644 --- a/ctypescrypto/x509.py +++ b/ctypescrypto/x509.py @@ -162,13 +162,17 @@ class X509: def pubkey(self): """EVP PKEy object of certificate public key""" return PKey(ptr=libcrypto.X509_get_pubkey(self.cert,False)) - def verify(self,store=None,key=None): + def verify(self,store=None,chain=[],key=None): """ Verify self. Supports verification on both X509 store object or just public issuer key @param store X509Store object. + @param chain - list of X509 objects to add into verification + context.These objects are untrusted, but can be used to + build certificate chain up to trusted object in the store @param key - PKey object - parameters are mutually exclusive. If neither is specified, attempts to verify + parameters stora and key are mutually exclusive. If neither is specified, attempts to verify + itself as self-signed certificate """ if store is not None and key is not None: @@ -177,7 +181,11 @@ class X509: ctx=libcrypto.X509_STORE_CTX_new() if ctx is None: raise X509Error("Error allocating X509_STORE_CTX") - if libcrypto.X509_STORE_CTX_init(ctx,store.store,self.cert,None) < 0: + if chain is not None and len(chain)>0: + ch=StackOfX509(chain) + else: + ch=None + if libcrypto.X509_STORE_CTX_init(ctx,store.store,self.cert,ch) < 0: raise X509Error("Error allocating X509_STORE_CTX") res= libcrypto.X509_verify_cert(ctx) libcrypto.X509_STORE_CTX_free(ctx) @@ -293,6 +301,75 @@ class X509Store: purp_no = purpose if libcrypto.X509_STORE_set_purpose(self.store,purp_no)<=0: raise X509Error("cannot set purpose") + def setdepth(self,depth): + libcrypto.X509_STORE_set_depth(self.store,depth) + def settime(self, time): + """ + Set point in time used to check validity of certificates for + """ + if isinstance(time,datetime.datetime) or isinstance(time,datetime.date): + d=int(time.strftime("%s")) + elif isinstance(time,int): + pass + else: + raise TypeError("datetime.date, datetime.datetime or integer is required as time argument") + raise NotImplementedError +class StackOfX509: + """ + Implements OpenSSL STACK_OF(X509) object. + It looks much like python container types + """ + def __init__(self,certs=None,ptr=None,disposable=True): + """ + Create stack + @param certs - list of X509 objects. If specified, read-write + stack is created and populated by these certificates + @param ptr - pointer to OpenSSL STACK_OF(X509) as returned by + some functions + @param disposable - if True, stack created from object, returned + by function is copy, and can be modified and need to be + freeid. If false, it is just pointer into another + structure i.e. CMS_ContentInfo + """ + if ptr is None: + self.need_free = True + self.ptr=libcrypt.sk_new_null() + if certs is not None: + for crt in certs: + self.append(crt) + elif not certs is None: + raise ValueError("cannot handle certs an ptr simultaneously") + else: + self.need_free = disposable + self.ptr=ptr + def __len__(self): + return libcrypto.sk_num(self.ptr) + def __getitem__(self,index): + if index <0 or index>=len(self): + raise IndexError + p=libcrypto.sk_value(self.ptr,index) + return X509(ptr=libcrypto.X509_dup(p)) + def __putitem__(self,index,value): + if not self.need_free: + raise ValueError("Stack is read-only") + if index <0 or index>=len(self): + raise IndexError + p=libcrypto.sk_set(self.ptr,index,libcrypto.X509_dup(value.cert)) + libcrypto.X509_free(p) + def __delitem__(self,index): + if not self.need_free: + raise ValueError("Stack is read-only") + if index <0 or index>=len(self): + raise IndexError + p=libcrypto.sk_delete(self.ptr,index) + libcrypto.X509_free(p) + def __del__(self): + if self.need_free: + libcrypto.sk_pop_free(self.ptr,libcrypto.X509_free) + def append(self,value): + if not self.need_free: + raise ValueError("Stack is read-only") + libcrypto.sk_push(self.ptr,libcrypto.X509_dup(value.cert)) libcrypto.i2a_ASN1_INTEGER.argtypes=(c_void_p,c_void_p) libcrypto.ASN1_STRING_print_ex.argtypes=(c_void_p,c_void_p,c_long) libcrypto.X509_get_serialNumber.argtypes=(c_void_p,) -- 2.39.2