Skip to content

Commit c296556

Browse files
committed
added CustomSigningSuiteOnlyCMM; refactoring
1 parent 6bfc02f commit c296556

File tree

3 files changed

+124
-56
lines changed

3 files changed

+124
-56
lines changed
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
"""Example to create a custom crypto material manager class."""
4+
5+
import aws_encryption_sdk
6+
from aws_encryption_sdk import CommitmentPolicy
7+
from aws_encryption_sdk.materials_managers.base import CryptoMaterialsManager
8+
9+
10+
# Custom CMM implementation.
11+
# This CMM only allows encryption/decryption using signing algorithms.
12+
# It wraps an underlying CMM implementation and checks its materials
13+
# to ensure that it is only using signed encryption algorithms.
14+
class CustomSigningSuiteOnlyCMM(CryptoMaterialsManager):
15+
"""Example custom crypto materials manager class."""
16+
17+
def __init__(self, cmm: CryptoMaterialsManager) -> None:
18+
super().__init__()
19+
self.underlying_cmm = cmm
20+
21+
def get_encryption_materials(self, request):
22+
"""Provides encryption materials appropriate for the request for the custom CMM.
23+
24+
:param EncryptionMaterialsRequest request: Request object to provide to a
25+
crypto material manager's `get_encryption_materials` method.
26+
:returns: Encryption materials
27+
:rtype: EncryptionMaterials
28+
"""
29+
materials = self.underlying_cmm.get_encryption_materials(request)
30+
if not materials.algorithm.is_signing():
31+
raise AssertionError(
32+
"Algorithm provided to CustomSigningSuiteOnlyCMM"
33+
+ " is not a supported signing algorithm: " + materials.algorithm
34+
)
35+
return materials
36+
37+
def decrypt_materials(self, request):
38+
"""Provider decryption materials appropriate for the request.
39+
40+
:param DecryptionMaterialsRequest request: Request object to provide to a
41+
crypto material manager's `decrypt_materials` method.
42+
"""
43+
if not request.algorithm.is_signing():
44+
raise AssertionError(
45+
"Algorithm provided to CustomSigningSuiteOnlyCMM"
46+
+ " is not a supported signing algorithm: " + request.algorithm
47+
)
48+
return self.underlying_cmm.decrypt_materials(request)
49+
50+
51+
def encrypt_decrypt_with_cmm(
52+
cmm: CryptoMaterialsManager,
53+
source_plaintext: str
54+
):
55+
"""Encrypts and decrypts a string using a CMM.
56+
57+
:param CryptoMaterialsManager cmm: CMM to use for encryption and decryption
58+
:param bytes source_plaintext: Data to encrypt
59+
"""
60+
# Set up an encryption client with an explicit commitment policy. Note that if you do not explicitly choose a
61+
# commitment policy, REQUIRE_ENCRYPT_REQUIRE_DECRYPT is used by default.
62+
client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT)
63+
64+
# Encrypt the plaintext source data
65+
ciphertext, encryptor_header = client.encrypt(
66+
source=source_plaintext,
67+
materials_manager=cmm
68+
)
69+
70+
# Decrypt the ciphertext
71+
cycled_plaintext, decrypted_header = client.decrypt(
72+
source=ciphertext,
73+
materials_manager=cmm
74+
)
75+
76+
# Verify that the "cycled" (encrypted, then decrypted) plaintext is identical to the source plaintext
77+
assert cycled_plaintext == source_plaintext
78+
79+
# Verify that the encryption context used in the decrypt operation includes all key pairs from
80+
# the encrypt operation. (The SDK can add pairs, so don't require an exact match.)
81+
#
82+
# In production, always use a meaningful encryption context. In this sample, we omit the
83+
# encryption context (no key pairs).
84+
assert all(
85+
pair in decrypted_header.encryption_context.items() for pair in encryptor_header.encryption_context.items()
86+
)
Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,51 @@
11
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
3-
"""Test suite for encryption and decryption using V3 defualt CMM."""
3+
"""Test suite for encryption and decryption using custom CMM."""
44

55
import botocore.session
66
import pytest
77

8-
from ...src.legacy.v3_default_cmm import encrypt_decrypt_with_v3_default_cmm
8+
import aws_encryption_sdk
9+
from ...src.legacy.custom_cmm_example import encrypt_decrypt_with_cmm, CustomSigningSuiteOnlyCMM
10+
from .v3_default_cmm import V3DefaultCryptoMaterialsManager
911
from .examples_test_utils import get_cmk_arn, static_plaintext
1012

1113
pytestmark = [pytest.mark.examples]
1214

1315

16+
def test_custom_cmm_example():
17+
"""Test method for encryption and decryption using V3 default CMM."""
18+
plaintext = static_plaintext
19+
cmk_arn = get_cmk_arn()
20+
botocore_session = botocore.session.Session()
21+
22+
# Create a KMS master key provider.
23+
kms_kwargs = dict(key_ids=[cmk_arn])
24+
if botocore_session is not None:
25+
kms_kwargs["botocore_session"] = botocore_session
26+
master_key_provider = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(**kms_kwargs)
27+
28+
# Create the V3 default CMM (V3DefaultCryptoMaterialsManager) using the master_key_provider
29+
cmm = CustomSigningSuiteOnlyCMM(master_key_provider=master_key_provider)
30+
31+
encrypt_decrypt_with_cmm(cmm=cmm,
32+
source_plaintext=plaintext)
33+
34+
1435
def test_v3_default_cmm():
15-
"""Test method for encryption and decryption using V3 defualt CMM."""
36+
"""Test method for encryption and decryption using V3 default CMM."""
1637
plaintext = static_plaintext
1738
cmk_arn = get_cmk_arn()
18-
encrypt_decrypt_with_v3_default_cmm(key_arn=cmk_arn,
19-
source_plaintext=plaintext,
20-
botocore_session=botocore.session.Session())
39+
botocore_session = botocore.session.Session()
40+
41+
# Create a KMS master key provider.
42+
kms_kwargs = dict(key_ids=[cmk_arn])
43+
if botocore_session is not None:
44+
kms_kwargs["botocore_session"] = botocore_session
45+
master_key_provider = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(**kms_kwargs)
46+
47+
# Create the V3 default CMM (V3DefaultCryptoMaterialsManager) using the master_key_provider
48+
cmm = V3DefaultCryptoMaterialsManager(master_key_provider=master_key_provider)
49+
50+
encrypt_decrypt_with_cmm(cmm=cmm,
51+
source_plaintext=plaintext)

examples/src/legacy/v3_default_cmm.py renamed to examples/test/legacy/v3_default_cmm.py

Lines changed: 1 addition & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
3-
"""Default crypto material manager class example for ESDK V3."""
3+
"""Copy-paste of the V3 default CMM."""
44
import logging
55

66
import attr
77

8-
import aws_encryption_sdk
98
from aws_encryption_sdk.exceptions import MasterKeyProviderError, SerializationError
109
from aws_encryption_sdk.identifiers import CommitmentPolicy
1110
from aws_encryption_sdk.internal.crypto.authentication import Signer, Verifier
@@ -156,51 +155,3 @@ def decrypt_materials(self, request):
156155
)
157156

158157
return DecryptionMaterials(data_key=data_key, verification_key=verification_key)
159-
160-
161-
def encrypt_decrypt_with_v3_default_cmm(key_arn,
162-
source_plaintext,
163-
botocore_session):
164-
"""Encrypts and decrypts a string using a V3 default CMM.
165-
166-
:param str key_arn: Amazon Resource Name (ARN) of the KMS CMK
167-
:param bytes source_plaintext: Data to encrypt
168-
:param botocore_session: existing botocore session instance
169-
:type botocore_session: botocore.session.Session
170-
"""
171-
# Set up an encryption client with an explicit commitment policy. Note that if you do not explicitly choose a
172-
# commitment policy, REQUIRE_ENCRYPT_REQUIRE_DECRYPT is used by default.
173-
client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT)
174-
175-
# Create a KMS master key provider.
176-
kms_kwargs = dict(key_ids=[key_arn])
177-
if botocore_session is not None:
178-
kms_kwargs["botocore_session"] = botocore_session
179-
master_key_provider = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(**kms_kwargs)
180-
181-
# Create the V3 default CMM (V3DefaultCryptoMaterialsManager) using the master_key_provider
182-
default_cmm = V3DefaultCryptoMaterialsManager(master_key_provider=master_key_provider)
183-
184-
# Encrypt the plaintext source data
185-
ciphertext, encryptor_header = client.encrypt(
186-
source=source_plaintext,
187-
materials_manager=default_cmm
188-
)
189-
190-
# Decrypt the ciphertext
191-
cycled_plaintext, decrypted_header = client.decrypt(
192-
source=ciphertext,
193-
key_provider=master_key_provider
194-
)
195-
196-
# Verify that the "cycled" (encrypted, then decrypted) plaintext is identical to the source plaintext
197-
assert cycled_plaintext == source_plaintext
198-
199-
# Verify that the encryption context used in the decrypt operation includes all key pairs from
200-
# the encrypt operation. (The SDK can add pairs, so don't require an exact match.)
201-
#
202-
# In production, always use a meaningful encryption context. In this sample, we omit the
203-
# encryption context (no key pairs).
204-
assert all(
205-
pair in decrypted_header.encryption_context.items() for pair in encryptor_header.encryption_context.items()
206-
)

0 commit comments

Comments
 (0)