Skip to content
This repository has been archived by the owner on Sep 11, 2024. It is now read-only.

Add Support for setting SSEA name #296

Merged
merged 10 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,13 @@ There are four configuration properties to configure retry strategy exists.
integer overflow issues during delay calculation).
Default is `3`.

### AWS S3 server side encryption properties

- `aws.s3.sse.algorithm` - The name of the Server-side encryption algorithm to use for uploads. If unset the default SSE-S3 is used.
- To use SSE-S3 set to `AES256` or leave empty
- To use SSE-KMS set to `aws:kms`
- To use DSSE-KMS set to `aws:kms:dsse`

## Development

### Developing together with Commons library
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/io/aiven/kafka/connect/s3/S3OutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;
import org.slf4j.Logger;
Expand All @@ -52,17 +53,28 @@ public class S3OutputStream extends OutputStream {

private final int partSize;

private final String serverSideEncryptionAlgorithm;

private boolean closed = false;

public S3OutputStream(final String bucketName,
final String key,
final int partSize,
final AmazonS3 client) {
this(bucketName, key, partSize, client, null);
}

public S3OutputStream(final String bucketName,
final String key,
final int partSize,
final AmazonS3 client,
final String serverSideEncryptionAlgorithm) {
this.bucketName = bucketName;
this.key = key;
this.client = client;
this.partSize = partSize;
this.byteBuffer = ByteBuffer.allocate(partSize);
this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
}

@Override
Expand Down Expand Up @@ -93,11 +105,22 @@ public void write(final byte[] b, final int off, final int len) throws IOExcepti
private MultipartUpload newMultipartUpload() throws IOException {
logger.debug("Create new multipart upload request");
final var initialRequest = new InitiateMultipartUploadRequest(bucketName, key);
initialRequest.setObjectMetadata(this.buildObjectMetadata());
final var initiateResult = client.initiateMultipartUpload(initialRequest);
logger.debug("Upload ID: {}", initiateResult.getUploadId());
return new MultipartUpload(initiateResult.getUploadId());
}

private ObjectMetadata buildObjectMetadata() {
final ObjectMetadata metadata = new ObjectMetadata();

if (this.serverSideEncryptionAlgorithm != null) {
metadata.setSSEAlgorithm(this.serverSideEncryptionAlgorithm);
}

return metadata;
}

@Override
public void close() throws IOException {
if (closed) {
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,12 @@ public String version() {

private OutputStream newStreamFor(final String filename, final SinkRecord record) {
final var fullKey = config.usesFileNameTemplate() ? filename : oldFullKey(record);
return new S3OutputStream(config.getAwsS3BucketName(), fullKey, config.getAwsS3PartSize(), s3Client);
return new S3OutputStream(
config.getAwsS3BucketName(),
fullKey, config.getAwsS3PartSize(),
s3Client,
config.getServerSideEncryptionAlgorithmName()
);
}

private EndpointConfiguration newEndpointConfiguration(final S3SinkConfig config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public class S3SinkConfig extends AivenCommonConfig {
public static final String AWS_ACCESS_KEY_ID_CONFIG = "aws.access.key.id";
public static final String AWS_SECRET_ACCESS_KEY_CONFIG = "aws.secret.access.key";
public static final String AWS_S3_BUCKET_NAME_CONFIG = "aws.s3.bucket.name";
public static final String AWS_S3_SSE_ALGORITHM_CONFIG = "aws.s3.sse.algorithm";
public static final String AWS_S3_ENDPOINT_CONFIG = "aws.s3.endpoint";
public static final String AWS_S3_REGION_CONFIG = "aws.s3.region";
public static final String AWS_S3_PART_SIZE = "aws.s3.part.size.bytes";
Expand Down Expand Up @@ -228,6 +229,21 @@ private static void addAwsConfigGroup(final ConfigDef configDef) {
AWS_S3_BUCKET_NAME_CONFIG
);

// AWS S3 Server Side Encryption Algorithm configuration
// Example values: 'AES256' for S3-managed keys, 'aws:kms' for AWS KMS-managed keys
configDef.define(
AWS_S3_SSE_ALGORITHM_CONFIG,
Type.STRING,
null,
new ConfigDef.NonEmptyString(),
Importance.MEDIUM,
"AWS S3 Server Side Encryption Algorithm. Example values: 'AES256', 'aws:kms'.",
GROUP_AWS,
awsGroupCounter++,
ConfigDef.Width.NONE,
AWS_S3_SSE_ALGORITHM_CONFIG
);

configDef.define(
AWS_S3_ENDPOINT_CONFIG,
Type.STRING,
Expand Down Expand Up @@ -781,6 +797,10 @@ public String getAwsS3BucketName() {
: getString(AWS_S3_BUCKET);
}

public String getServerSideEncryptionAlgorithmName() {
return getString(AWS_S3_SSE_ALGORITHM_CONFIG);
}

public String getAwsS3Prefix() {
return Objects.nonNull(getString(AWS_S3_PREFIX_CONFIG))
? getString(AWS_S3_PREFIX_CONFIG)
Expand Down
23 changes: 23 additions & 0 deletions src/test/java/io/aiven/kafka/connect/s3/S3OutputStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class S3OutputStreamTest {

static final String UPLOAD_ID = "some_upload_id";

static final String SSEA_NAME = "AES256";

@Mock
AmazonS3 mockedAmazonS3;

Expand Down Expand Up @@ -108,6 +110,7 @@ void sendsInitialAndCompletionUploadRequests() throws Exception {

assertThat(initiateMultipartUploadRequest.getBucketName()).isEqualTo(BUCKET_NAME);
assertThat(initiateMultipartUploadRequest.getKey()).isEqualTo(FILE_KEY);
assertThat(initiateMultipartUploadRequest.getObjectMetadata().getContentLength()).isZero();

assertCompleteMultipartUploadRequest(
completeMultipartUploadRequestCaptor.getValue(),
Expand Down Expand Up @@ -137,6 +140,26 @@ void sendsAbortForAnyExceptionWhileWriting() {
assertAbortMultipartUploadRequest(abortMultipartUploadRequestCaptor.getValue());
}

@Test
void sendsServerSideEncryptionAlgorithmNameWhenPassed() throws Exception {
when(mockedAmazonS3.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class)))
.thenReturn(newInitiateMultipartUploadResult());
when(mockedAmazonS3.uploadPart(any(UploadPartRequest.class)))
.thenReturn(newUploadPartResult(1, "SOME_ETAG"));
when(mockedAmazonS3.completeMultipartUpload(any(CompleteMultipartUploadRequest.class)))
.thenReturn(new CompleteMultipartUploadResult());

try (final var out = new S3OutputStream(BUCKET_NAME, FILE_KEY, 100, mockedAmazonS3, SSEA_NAME)) {
out.write(1);
}

verify(mockedAmazonS3).initiateMultipartUpload(initiateMultipartUploadRequestCaptor.capture());

final var initiateMultipartUploadRequest = initiateMultipartUploadRequestCaptor.getValue();

assertThat(initiateMultipartUploadRequest.getObjectMetadata().getSSEAlgorithm()).isEqualTo(SSEA_NAME);
}

@Test
void sendsAbortForAnyExceptionWhenClose() throws Exception {
when(mockedAmazonS3.initiateMultipartUpload(any(InitiateMultipartUploadRequest.class)))
Expand Down
Loading