]> www.wagner.pp.ru Git - oss/ctypescrypto.git/blob - ctypescrypto/cms.py
1e533875ded29e44f0604b7b95fffc8dca7a6f87
[oss/ctypescrypto.git] / ctypescrypto / cms.py
1 """
2 Implements operations with CMS EnvelopedData and SignedData messages
3
4 Contains function CMS() which parses CMS message and creates either
5 EnvelopedData or SignedData objects (EncryptedData and CompressedData
6 can be easily added, because OpenSSL contain nessesary function)
7
8 Each of these objects contains create() static method which is used to
9 create it from raw data and neccessary certificates.
10
11
12 """
13 from ctypes import c_int, c_void_p, c_char_p, c_int, c_uint, c_size_t, POINTER
14 from ctypescrypto.exception import LibCryptoError
15 from ctypescrypto import libcrypto
16 from ctypescrypto.bio import Membio
17 from ctypescrypto.oid import Oid
18 from ctypescrypto.x509 import StackOfX509
19
20 class CMSError(LibCryptoError):
21     """
22     Exception which is raised when error occurs
23     """
24     pass
25
26 class Flags:
27     """
28     Constants for flags passed to the CMS methods.
29     Can be OR-ed together
30     """
31     TEXT = 1
32     NOCERTS = 2
33     NO_CONTENT_VERIFY = 4
34     NO_ATTR_VERIFY = 8
35     NO_SIGS = NO_CONTENT_VERIFY | NO_ATTR_VERIFY
36     NOINTERN = 0x10
37     NO_SIGNER_CERT_VERIFY = 0x20
38     NO_VERIFY = 0x20
39     DETACHED = 0x40
40     BINARY = 0x80
41     NOATTR = 0x100
42     NOSMIMECAP = 0x200
43     NOOLDMIMETYPE = 0x400
44     CRLFEOL = 0x800
45     STREAM = 0x1000
46     NOCRL = 0x2000
47     PARTIAL = 0x4000
48     REUSE_DIGEST = 0x8000
49     USE_KEYID = 0x10000
50     DEBUG_DECRYPT = 0x20000
51
52 def CMS(data, format="PEM"):
53     """
54     Parses CMS data and returns either SignedData or EnvelopedData
55     object
56     """
57     bio = Membio(data)
58     if format == "PEM":
59         ptr = libcrypto.PEM_read_bio_CMS(bio.bio, None, None, None)
60     else:
61         ptr = libcrypto.d2i_CMS_bio(bio.bio, None)
62     typeoid = Oid(libcrypto.OBJ_obj2nid(libcrypto.CMS_get0_type(ptr)))
63     if typeoid.shortname() == "pkcs7-signedData":
64         return SignedData(ptr)
65     elif typeoid.shortname() == "pkcs7-envelopedData":
66         return EnvelopedData(ptr)
67     elif typeoid.shortname() == "pkcs7-encryptedData":
68         return EncryptedData(ptr)
69     else:
70         raise NotImplementedError("cannot handle "+typeoid.shortname())
71
72 class CMSBase(object):
73     """
74     Common ancessor for all CMS types.
75     Implements serializatio/deserialization
76     """
77     def __init__(self, ptr=None):
78         self.ptr = ptr
79     def __str__(self):
80         """
81         Serialize in DER format
82         """
83         bio = Membio()
84         if not libcrypto.i2d_CMS_bio(bio.bio, self.ptr):
85             raise CMSError("writing CMS to PEM")
86         return str(bio)
87
88     def pem(self):
89         """
90         Serialize in PEM format
91         """
92         bio = Membio()
93         if not libcrypto.PEM_write_bio_CMS(bio.bio, self.ptr):
94             raise CMSError("writing CMS to PEM")
95         return str(bio)
96
97
98 #pylint: disable=R0921
99 class SignedData(CMSBase):
100     """
101     Represents signed message (signeddata CMS type)
102     """
103     @staticmethod
104     def create(data, cert, pkey, flags=Flags.BINARY, certs=None):
105         """
106             Creates SignedData message by signing data with pkey and
107             certificate.
108
109             @param data - data to sign
110             @param cert - signer's certificate
111             @param pkey - pkey object with private key to sign
112             @param flags - OReed combination of Flags constants
113             @param certs - list of X509 objects to include into CMS
114         """
115         if not pkey.cansign:
116             raise ValueError("Specified keypair has no private part")
117         if cert.pubkey != pkey:
118             raise ValueError("Certificate doesn't match public key")
119         bio = Membio(data)
120         if certs is not None and len(certs) > 0:
121             certstack = StackOfX509(certs)
122         else:
123             certstack = None
124         ptr = libcrypto.CMS_sign(cert.cert, pkey.ptr, certstack, bio.bio, flags)
125         if ptr is None:
126             raise CMSError("signing message")
127         return SignedData(ptr)
128     def sign(self, cert, pkey, digest_type=None, data=None, flags=Flags.BINARY):
129         """
130             Adds another signer to already signed message
131             @param cert - signer's certificate
132             @param pkey - signer's private key
133             @param digest_type - message digest to use as DigestType object
134                 (if None - default for key would be used)
135             @param data - data to sign (if detached and
136                     Flags.REUSE_DIGEST is not specified)
137             @param flags - ORed combination of Flags consants
138         """
139         if not pkey.cansign:
140             raise ValueError("Specified keypair has no private part")
141         if cert.pubkey != pkey:
142             raise ValueError("Certificate doesn't match public key")
143         if libcrypto.CMS_add1_signer(self.ptr, cert.cert, pkey.ptr,
144                                           digest_type.digest, flags) is None:
145             raise CMSError("adding signer")
146         if flags & Flags.REUSE_DIGEST == 0:
147             if data is not None:
148                 bio = Membio(data)
149                 biodata = bio.bio
150             else:
151                 biodata = None
152             res = libcrypto.CMS_final(self.ptr, biodata, None, flags)
153             if res <= 0:
154                 raise CMSError("Cannot finalize CMS")
155     def verify(self, store, flags, data=None, certs=None):
156         """
157         Verifies signature under CMS message using trusted cert store
158
159         @param store -  X509Store object with trusted certs
160         @param flags - OR-ed combination of flag consants
161         @param data - message data, if messge has detached signature
162         param certs - list of certificates to use during verification
163                 If Flags.NOINTERN is specified, these are only
164                 sertificates to search for signing certificates
165         @returns True if signature valid, False otherwise
166         """
167         bio = None
168         if data != None:
169             bio_obj = Membio(data)
170             bio = bio_obj.bio
171         if certs is not None and len(certs) > 0:
172             certstack = StackOfX509(certs)
173         else:
174             certstack = None
175         res = libcrypto.CMS_verify(self.ptr, certstack, store.store, bio,
176                                    None, flags)
177         return res > 0
178
179     @property
180     def signers(self):
181         """
182         Return list of signer's certificates
183         """
184         signerlist = libcrypto.CMS_get0_signers(self.ptr)
185         if signerlist is None:
186             raise CMSError("Cannot get signers")
187         return StackOfX509(ptr=signerlist, disposable=False)
188
189     @property
190     def data(self):
191         """
192         Returns signed data if present in the message
193         """
194         bio = Membio()
195         if not libcrypto.CMS_verify(self.ptr, None, None, None, bio.bio,
196                                     Flags.NO_VERIFY):
197             raise CMSError("extract data")
198         return str(bio)
199
200     def addcert(self, cert):
201         """
202         Adds a certificate (probably intermediate CA) to the SignedData
203         structure
204         """
205         if libcrypto.CMS_add1_cert(self.ptr, cert.cert) <= 0:
206             raise CMSError("Cannot add cert")
207     def addcrl(self, crl):
208         """
209         Adds a CRL to the signed data structure
210         """
211         raise NotImplementedError
212     @property
213     def certs(self):
214         """
215         List of the certificates contained in the structure
216         """
217         certstack = libcrypto.CMS_get1_certs(self.ptr)
218         if certstack is None:
219             raise CMSError("getting certs")
220         return StackOfX509(ptr=certstack, disposable=True)
221     @property
222     def crls(self):
223         """
224         List of the CRLs contained in the structure
225         """
226         raise NotImplementedError
227
228 class EnvelopedData(CMSBase):
229     """
230     Represents EnvelopedData CMS, i.e. message encrypted with session
231     keys, encrypted with recipient's public keys
232     """
233     @staticmethod
234     def create(recipients, data, cipher, flags=0):
235         """
236         Creates and encrypts message
237         @param recipients - list of X509 objects
238         @param data - contents of the message
239         @param cipher - CipherType object
240         @param flags - flag
241         """
242         recp = StackOfX509(recipients)
243         bio = Membio(data)
244         cms_ptr = libcrypto.CMS_encrypt(recp.ptr, bio.bio, cipher.cipher_type,
245                                         flags)
246         if cms_ptr is None:
247             raise CMSError("encrypt EnvelopedData")
248         return EnvelopedData(cms_ptr)
249
250     def decrypt(self, pkey, cert, flags=0):
251         """
252         Decrypts message
253         @param pkey - private key to decrypt
254         @param cert - certificate of this private key (to find
255             neccessary RecipientInfo
256         @param flags - flags
257         @returns - decrypted data
258         """
259         if not pkey.cansign:
260             raise ValueError("Specified keypair has no private part")
261         if pkey != cert.pubkey:
262             raise ValueError("Certificate doesn't match private key")
263         bio = Membio()
264         res = libcrypto.CMS_decrypt(self.ptr, pkey.ptr, cert.ccert, None,
265                                     bio.bio, flags)
266         if res <= 0:
267             raise CMSError("decrypting CMS")
268         return str(bio)
269
270 class EncryptedData(CMSBase):
271     """
272     Represents encrypted data CMS structure, i.e. encrypted
273     with symmetric key, shared by sender and recepient.
274     """
275     @staticmethod
276     def create(data, cipher, key, flags=0):
277         """
278         Creates an EncryptedData message.
279         @param data data to encrypt
280         @param cipher cipher.CipherType object represening required
281                 cipher type
282         @param key - byte array used as simmetic key
283         @param flags - OR-ed combination of Flags constant
284         """
285         bio = Membio(data)
286         ptr = libcrypto.CMS_EncryptedData_encrypt(bio.bio, cipher.cipher_type,
287                                                   key, len(key), flags)
288         if ptr is None:
289             raise CMSError("encrypt data")
290         return EncryptedData(ptr)
291
292     def decrypt(self, key, flags=0):
293         """
294         Decrypts encrypted data message
295         @param key - symmetic key to decrypt
296         @param flags - OR-ed combination of Flags constant
297         """
298         bio = Membio()
299         if libcrypto.CMS_EncryptedData_decrypt(self.ptr, key, len(key), None,
300                                                bio.bio, flags) <= 0:
301             raise CMSError("decrypt data")
302         return str(bio)
303
304 __all__ = ['CMS', 'CMSError', 'Flags', 'SignedData', 'EnvelopedData',
305            'EncryptedData']
306
307 libcrypto.CMS_add1_cert.restype = c_int
308 libcrypto.CMS_add1_cert.argtypes = (c_void_p, c_void_p)
309 libcrypto.CMS_decrypt.restype = c_int
310 libcrypto.CMS_decrypt.argtypes = (c_void_p, c_void_p, c_void_p,
311                                   c_void_p, c_void_p, c_uint)
312 libcrypto.CMS_encrypt.restype = c_void_p
313 libcrypto.CMS_encrypt.argtypes = (c_void_p, c_void_p, c_void_p, c_uint)
314 libcrypto.CMS_EncryptedData_decrypt.restype = c_int
315 libcrypto.CMS_EncryptedData_decrypt.argtypes = (c_void_p, c_char_p, c_size_t,
316                                                 c_void_p, c_void_p, c_uint)
317 libcrypto.CMS_EncryptedData_encrypt.restype = c_void_p
318 libcrypto.CMS_EncryptedData_encrypt.argtypes = (c_void_p, c_void_p, c_char_p,
319                                                 c_size_t, c_uint)
320 libcrypto.CMS_final.restype = c_int
321 libcrypto.CMS_final.argtypes = (c_void_p, c_void_p, c_void_p, c_uint)
322 libcrypto.CMS_get0_signers.restype = c_void_p
323 libcrypto.CMS_get0_signers.argtypes = (c_void_p, )
324 libcrypto.CMS_get1_certs.restype = c_void_p
325 libcrypto.CMS_get1_certs.argtypes = (c_void_p, )
326 libcrypto.CMS_sign.restype = c_void_p
327 libcrypto.CMS_sign.argtypes = (c_void_p, c_void_p, c_void_p, c_void_p, c_uint)
328 libcrypto.CMS_add1_signer.restype = c_void_p
329 libcrypto.CMS_add1_signer.argtypes = (c_void_p, c_void_p, c_void_p,
330                                            c_void_p, c_uint)
331 libcrypto.CMS_verify.restype = c_int
332 libcrypto.CMS_verify.argtypes = (c_void_p, c_void_p, c_void_p, c_void_p,
333                                  c_void_p, c_int)
334 libcrypto.d2i_CMS_bio.restype = c_void_p
335 libcrypto.d2i_CMS_bio.argtypes = (c_void_p, POINTER(c_void_p))
336 libcrypto.i2d_CMS_bio.restype = c_int
337 libcrypto.i2d_CMS_bio.argtypes = (c_void_p, c_void_p)
338 libcrypto.PEM_read_bio_CMS.restype = c_void_p
339 libcrypto.PEM_read_bio_CMS.argtypes = (c_void_p, POINTER(c_void_p),
340                                        c_void_p, c_void_p)
341 libcrypto.PEM_write_bio_CMS.restype = c_int
342 libcrypto.PEM_write_bio_CMS.argtypes = (c_void_p, c_void_p)