Skip to content
This repository has been archived by the owner on Sep 11, 2024. It is now read-only.

Add Support for setting SSEA name #296

Merged
merged 10 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,13 @@ There are four configuration properties to configure retry strategy exists.
integer overflow issues during delay calculation).
Default is `3`.

### AWS S3 server side encryption properties

- `aws.s3.sse.algorithm` - The name of the Server-side encryption algorithm to use for uploads. If unset the default SSE-S3 is used.
- To use SSE-S3 set to `AES256` or leave empty
- To use SSE-KMS set to `aws:kms`
- To use DSSE-KMS set to `aws:kms:dsse`

## Development

### Developing together with Commons library
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/io/aiven/kafka/connect/s3/S3OutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;
import org.slf4j.Logger;
Expand All @@ -52,17 +53,28 @@ public class S3OutputStream extends OutputStream {

private final int partSize;

private final String serverSideEncryptionAlgorithm;

private boolean closed = false;

public S3OutputStream(final String bucketName,
final String key,
final int partSize,
final AmazonS3 client) {
this(bucketName, key, partSize, client, null);
}

public S3OutputStream(final String bucketName,
final String key,
final int partSize,
final AmazonS3 client,
final String serverSideEncryptionAlgorithm) {
this.bucketName = bucketName;
this.key = key;
this.client = client;
this.partSize = partSize;
this.byteBuffer = ByteBuffer.allocate(partSize);
this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
}

@Override
Expand Down Expand Up @@ -93,11 +105,22 @@ public void write(final byte[] b, final int off, final int len) throws IOExcepti
private MultipartUpload newMultipartUpload() throws IOException {
logger.debug("Create new multipart upload request");
final var initialRequest = new InitiateMultipartUploadRequest(bucketName, key);
initialRequest.setObjectMetadata(this.buildObjectMetadata());
final var initiateResult = client.initiateMultipartUpload(initialRequest);
logger.debug("Upload ID: {}", initiateResult.getUploadId());
return new MultipartUpload(initiateResult.getUploadId());
}

private ObjectMetadata buildObjectMetadata() {
final ObjectMetadata metadata = new ObjectMetadata();

if (this.serverSideEncryptionAlgorithm != null) {
metadata.setSSEAlgorithm(this.serverSideEncryptionAlgorithm);
}

return metadata;
}

@Override
public void close() throws IOException {
if (closed) {
Expand Down
8 changes: 7 additions & 1 deletion src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,13 @@ public String version() {

private OutputStream newStreamFor(final String filename, final SinkRecord record) {
final var fullKey = config.usesFileNameTemplate() ? filename : oldFullKey(record);
return new S3OutputStream(config.getAwsS3BucketName(), fullKey, config.getAwsS3PartSize(), s3Client);
return new S3OutputStream(
config.getAwsS3BucketName(),
fullKey,
config.getAwsS3PartSize(),
s3Client,
config.getServerSideEncryptionAlgorithmName()
);
}

private EndpointConfiguration newEndpointConfiguration(final S3SinkConfig config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public class S3SinkConfig extends AivenCommonConfig {
public static final String AWS_ACCESS_KEY_ID_CONFIG = "aws.access.key.id";
public static final String AWS_SECRET_ACCESS_KEY_CONFIG = "aws.secret.access.key";
public static final String AWS_S3_BUCKET_NAME_CONFIG = "aws.s3.bucket.name";
public static final String AWS_S3_SSE_ALGORITHM_CONFIG = "aws.s3.sse.algorithm";
public static final String AWS_S3_ENDPOINT_CONFIG = "aws.s3.endpoint";
public static final String AWS_S3_REGION_CONFIG = "aws.s3.region";
public static final String AWS_S3_PART_SIZE = "aws.s3.part.size.bytes";
Expand Down Expand Up @@ -228,6 +229,21 @@ private static void addAwsConfigGroup(final ConfigDef configDef) {
AWS_S3_BUCKET_NAME_CONFIG
);

// AWS S3 Server Side Encryption Algorithm configuration
// Example values: 'AES256' for S3-managed keys, 'aws:kms' for AWS KMS-managed keys
configDef.define(
AWS_S3_SSE_ALGORITHM_CONFIG,
Type.STRING,
null,
new ConfigDef.NonEmptyString(),
Importance.MEDIUM,
"AWS S3 Server Side Encryption Algorithm. Example values: 'AES256', 'aws:kms'.",
GROUP_AWS,
awsGroupCounter++,
ConfigDef.Width.NONE,
AWS_S3_SSE_ALGORITHM_CONFIG
);

configDef.define(
AWS_S3_ENDPOINT_CONFIG,
Type.STRING,
Expand Down Expand Up @@ -781,6 +797,10 @@ public String getAwsS3BucketName() {
: getString(AWS_S3_BUCKET);
}

public String getServerSideEncryptionAlgorithmName() {
return getString(AWS_S3_SSE_ALGORITHM_CONFIG);
}

public String getAwsS3Prefix() {
return Objects.nonNull(getString(AWS_S3_PREFIX_CONFIG))
? getString(AWS_S3_PREFIX_CONFIG)
Expand Down
23 changes: 23 additions & 0 deletions src/test/java/io/aiven/kafka/connect/s3/S3OutputStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class S3OutputStreamTest {

static final String UPLOAD_ID = "some_upload_id";

static final String SSEA_NAME = "AES256";

@Mock
AmazonS3 mockedAmazonS3;

Expand Down Expand Up @@ -108,6 +110,7 @@ void sendsInitialAndCompletionUploadRequests() throws Exception {

assertThat(initiateMultipartUploadRequest.getBucketName()).isEqualTo(BUCKET_NAME);
assertThat(initiateMultipartUploadRequest.getKey()).isEqualTo(FILE_KEY);
assertThat(initiateMultipartUploadRequest.getObjectMetadata().getContentLength()).isZero();

assertCompleteMultipartUploadRequest(
completeMultipartUploadRequestCaptor.getValue(),
Expand Down Expand Up @@ -137,6 +140,26 @@ void sendsAbortForAnyExceptionWhileWriting() {
assertAbortMultipartUploadRequest(abortMultipartUploadRequestCaptor.getValue());
}

@Test
void sendsServerSideEncryptionAlgorithmNameWhenPassed() throws Exception {
when(mockedAmazonS3.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class)))
.thenReturn(newInitiateMultipartUploadResult());
when(mockedAmazonS3.uploadPart(any(UploadPartRequest.class)))
.thenReturn(newUploadPartResult(1, "SOME_ETAG"));
when(mockedAmazonS3.completeMultipartUpload(any(CompleteMultipartUploadRequest.class)))
.thenReturn(new CompleteMultipartUploadResult());

try (final var out = new S3OutputStream(BUCKET_NAME, FILE_KEY, 100, mockedAmazonS3, SSEA_NAME)) {
out.write(1);
}

verify(mockedAmazonS3).initiateMultipartUpload(initiateMultipartUploadRequestCaptor.capture());

final var initiateMultipartUploadRequest = initiateMultipartUploadRequestCaptor.getValue();

assertThat(initiateMultipartUploadRequest.getObjectMetadata().getSSEAlgorithm()).isEqualTo(SSEA_NAME);
}

@Test
void sendsAbortForAnyExceptionWhenClose() throws Exception {
when(mockedAmazonS3.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class)))
Expand Down
Loading