diff --git a/buildspecs/s3-regression-tests.yml b/buildspecs/s3-regression-tests.yml new file mode 100644 index 000000000000..51ec28445c8e --- /dev/null +++ b/buildspecs/s3-regression-tests.yml @@ -0,0 +1,14 @@ +version: 0.2 + +phases: + build: + commands: + - mvn clean install -P s3-regression-tests -pl :s3-tests -am -T1C $MAVEN_OPTIONS + - echo $MAVEN_OPTIONS + finally: + - mkdir -p codebuild-test-reports + - find ./ -name 'TEST-*.xml' -type f -exec cp {} codebuild-test-reports/ \; +reports: + ChecksumsTests: + files: + - 'codebuild-test-reports/**/*' diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java index b39f7b568a11..1ad90667a657 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java @@ -393,8 +393,8 @@ static AsyncRequestBody fromInputStream(Consumer + + s3-regression-tests + + + doRelease + + + + + true + true + true + true + true + true + + + + + org.apache.maven.plugins + maven-failsafe-plugin + ${maven-failsafe-plugin.version} + + + integration-test + + integration-test + verify + + + + + -Xmx12g -Xms4g -XX:+AllowRedefinitionToAddDeleteMethods + + **/*RegressionTesting.java + + false + + + + + + + + + endpoint-tests diff --git a/test/s3-tests/README.md b/test/s3-tests/README.md new file mode 100644 index 000000000000..c8a458f30372 --- /dev/null +++ b/test/s3-tests/README.md @@ -0,0 +1,31 @@ +# SDK Regression Tests for Amazon S3 + +## Description +This module contains SDK regression tests for Amazon S3 streaming operations with various SDK configurations. + + +## How to run + +### Credentials + +The tests require valid AWS credentials to be available in the default credential file under the `aws-test-account` profile. + +### Run the tests + +- Run from your IDE + +- Run from maven command line + +``` +mvn clean install -P s3-regression-tests -pl :stability-tests +``` + +## Adding New Tests + +- The tests are built using [JUnit 5](https://junit.org/junit5/). Make sure you are using the correct APIs and mixing of + Junit 4 and Junit 5 APIs on the same test can have unexpected results. + +- All tests should have the suffix of `RegressionTesting`, eg: `` + + + diff --git a/test/s3-tests/pom.xml b/test/s3-tests/pom.xml index 5ce83df5680a..3df269d4e930 100644 --- a/test/s3-tests/pom.xml +++ b/test/s3-tests/pom.xml @@ -152,6 +152,23 @@ ${awsjavasdk.version} test + + software.amazon.awssdk + http-auth-aws-crt + ${awsjavasdk.version} + test + + + software.amazon.awssdk + s3-transfer-manager + ${awsjavasdk.version} + test + + + software.amazon.awssdk + test-utils + test + diff --git a/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/checksum/ChecksumIntegrationTesting.java b/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/checksum/ChecksumIntegrationTesting.java deleted file mode 100644 index ab927aa9eda0..000000000000 --- a/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/checksum/ChecksumIntegrationTesting.java +++ /dev/null @@ -1,1061 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.services.s3.checksum; - -import static org.assertj.core.api.Assertions.assertThat; - -import io.reactivex.Flowable; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assumptions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain; -import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; -import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; -import software.amazon.awssdk.auth.signer.S3SignerExecutionAttribute; -import software.amazon.awssdk.awscore.AwsClient; -import software.amazon.awssdk.awscore.exception.AwsErrorDetails; -import software.amazon.awssdk.checksums.DefaultChecksumAlgorithm; -import software.amazon.awssdk.checksums.SdkChecksum; -import software.amazon.awssdk.core.async.AsyncRequestBody; -import software.amazon.awssdk.core.checksums.RequestChecksumCalculation; -import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; -import software.amazon.awssdk.core.interceptor.Context; -import software.amazon.awssdk.core.interceptor.ExecutionAttributes; -import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; -import software.amazon.awssdk.core.sync.RequestBody; -import software.amazon.awssdk.http.SdkHttpMethod; -import software.amazon.awssdk.http.SdkHttpRequest; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.s3.S3AsyncClient; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.BucketAccelerateStatus; -import software.amazon.awssdk.services.s3.model.BucketLocationConstraint; -import software.amazon.awssdk.services.s3.model.CreateBucketConfiguration; -import software.amazon.awssdk.services.s3.model.CreateBucketRequest; -import software.amazon.awssdk.services.s3.model.DataRedundancy; -import software.amazon.awssdk.services.s3.model.Delete; -import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; -import software.amazon.awssdk.services.s3.model.GlacierJobParameters; -import software.amazon.awssdk.services.s3.model.LocationInfo; -import software.amazon.awssdk.services.s3.model.LocationType; -import software.amazon.awssdk.services.s3.model.ObjectIdentifier; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; -import software.amazon.awssdk.services.s3.model.PutObjectResponse; -import software.amazon.awssdk.services.s3.model.RestoreObjectRequest; -import software.amazon.awssdk.services.s3.model.RestoreRequest; -import software.amazon.awssdk.services.s3.model.S3Exception; -import software.amazon.awssdk.services.s3.model.StorageClass; -import software.amazon.awssdk.services.s3.model.Tier; -import software.amazon.awssdk.services.s3control.S3ControlClient; -import software.amazon.awssdk.services.s3control.model.CreateMultiRegionAccessPointRequest; -import software.amazon.awssdk.services.s3control.model.GetMultiRegionAccessPointResponse; -import software.amazon.awssdk.services.s3control.model.MultiRegionAccessPointStatus; -import software.amazon.awssdk.services.s3control.model.S3ControlException; -import software.amazon.awssdk.services.sts.StsClient; -import software.amazon.awssdk.utils.BinaryUtils; -import software.amazon.awssdk.utils.CompletableFutureUtils; -import software.amazon.awssdk.utils.FunctionalUtils; -import software.amazon.awssdk.utils.Logger; -import software.amazon.awssdk.crt.Log; - -public class ChecksumIntegrationTesting { - private static final String BUCKET_NAME_PREFIX = "do-not-delete-checksums-"; - private static final String MRAP_NAME = "do-not-delete-checksum-testing"; - private static final String AP_NAME = "do-not-delete-checksum-testing-ap"; - private static final String EOZ_SUFFIX = "--usw2-az3--x-s3"; - - private static final Logger LOG = Logger.loggerFor(ChecksumIntegrationTesting.class); - private static final Region REGION = Region.US_WEST_2; - private static final String TEST_CREDENTIALS_PROFILE_NAME = "aws-test-account"; - - - public static final AwsCredentialsProviderChain CREDENTIALS_PROVIDER_CHAIN = - AwsCredentialsProviderChain.of(ProfileCredentialsProvider.builder() - .profileName(TEST_CREDENTIALS_PROFILE_NAME) - .build(), - DefaultCredentialsProvider.create()); - - private static final SdkChecksum CRC32 = SdkChecksum.forAlgorithm(DefaultChecksumAlgorithm.CRC32); - - private static final ExecutorService ASYNC_REQUEST_BODY_EXECUTOR = Executors.newSingleThreadExecutor(); - - private static String accountId; - private static String bucketName; - private static String mrapArn; - private static String eozBucket; - private static String apArn; - - private static S3ControlClient s3Control; - private static S3Client s3; - private static StsClient sts; - - private static Path testFile; - - private Map> bucketCleanup = new HashMap<>(); - - @BeforeAll - static void setup() throws InterruptedException, IOException { - // Log.initLoggingToStdout(Log.LogLevel.Trace); - - s3 = S3Client.builder() - .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN) - .region(REGION) - .build(); - - s3Control = S3ControlClient.builder() - .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN) - .region(REGION) - .build(); - - sts = StsClient.builder().credentialsProvider(CREDENTIALS_PROVIDER_CHAIN) - .region(REGION) - .build(); - - accountId = getAccountId(); - - bucketName = createBucket(); - - mrapArn = createMrap(); - - eozBucket = createEozBucket(); - - apArn = createAccessPoint(); - - testFile = createRandomFile(); - } - - @AfterEach - public void methodCleanup() { - bucketCleanup.forEach((bt, keys) -> { - String bucket = bucketForType(bt); - keys.forEach(k -> s3.deleteObject(r -> r.bucket(bucket).key(k))); - }); - - bucketCleanup.clear(); - } - - @AfterAll - public static void cleanup() { - ASYNC_REQUEST_BODY_EXECUTOR.shutdownNow(); - } - - private void assumeNotAccessPointWithPathStyle(TestConfig config) { - BucketType bucketType = config.getBucketType(); - Assumptions.assumeFalse(config.isForcePathStyle() && bucketType.isArnType(), - "Path style doesn't work with ARN type buckets"); - } - - private void assumeNotAccelerateWithPathStyle(TestConfig config) { - Assumptions.assumeFalse(config.isForcePathStyle() && config.isAccelerateEnabled(), - "Path style doesn't work with Accelerate"); - } - - private void assumeNotAccelerateWithArnType(TestConfig config) { - Assumptions.assumeFalse(config.isAccelerateEnabled() && config.getBucketType().isArnType(), - "Accelerate doesn't work with ARN buckets"); - } - - private void assumeNotAccelerateWithEoz(TestConfig config) { - Assumptions.assumeFalse(config.isAccelerateEnabled() && config.getBucketType() == BucketType.EOZ, - "Accelerate is not supported with Express One Zone"); - } - - // Request checksum required - @ParameterizedTest - @MethodSource("testConfigs") - void deleteObject(TestConfig config) throws Exception { - assumeNotAccessPointWithPathStyle(config); - assumeNotAccelerateWithPathStyle(config); - assumeNotAccelerateWithArnType(config); - assumeNotAccelerateWithEoz(config); - - String bucket = bucketForType(config.getBucketType()); - String key = putRandomObject(config.getBucketType()); - TestCallable callable = null; - try { - DeleteObjectsRequest req = DeleteObjectsRequest.builder() - .bucket(bucket) - .delete(Delete.builder() - .objects(ObjectIdentifier.builder() - .key(key) - .build()) - .build()) - .build(); - - callable = callDeleteObjects(req, config); - callable.runnable.call(); - } finally { - if (callable != null) { - callable.client.close(); - } - } - } - - // Request checksum optional - @ParameterizedTest - @MethodSource("testConfigs") - void restoreObject(TestConfig config) throws Exception { - assumeNotAccessPointWithPathStyle(config); - assumeNotAccelerateWithPathStyle(config); - assumeNotAccelerateWithArnType(config); - - Assumptions.assumeFalse(config.getBucketType() == BucketType.EOZ, - "Restore is not supported for S3 Express"); - - String bucket = bucketForType(config.getBucketType()); - String key = putRandomArchivedObject(config.getBucketType()); - TestCallable callable = null; - try { - RestoreObjectRequest request = RestoreObjectRequest.builder() - .bucket(bucket) - .key(key) - .restoreRequest(RestoreRequest.builder() - .days(5) - .glacierJobParameters(GlacierJobParameters.builder() - .tier(Tier.STANDARD) - .build()) - .build()) - .build(); - - callable = callRestoreObject(request, config); - callable.runnable.call(); - } finally { - if (callable != null) { - callable.client.close(); - } - } - } - - @ParameterizedTest - @MethodSource("uploadConfigs") - void putObject(UploadConfig config) throws Exception { - assumeNotAccelerateWithPathStyle(config.getBaseConfig()); - assumeNotAccessPointWithPathStyle(config.getBaseConfig()); - assumeNotAccelerateWithArnType(config.getBaseConfig()); - assumeNotAccelerateWithEoz(config.getBaseConfig()); - - // For testing purposes, ContentProvider is Publisher for async clients - // There is no way to create AsyncRequestBody with a Publisher and also provide the content length - Assumptions.assumeFalse(config.getBodyType() == BodyType.CONTENT_PROVIDER_WITH_LENGTH - && config.getBaseConfig().getFlavor().isAsync(), - "No way to create AsyncRequestBody by giving both an Publisher and the content length"); - - // Payload signing doesn't work correctly for async java based - Assumptions.assumeFalse(config.getBaseConfig().getFlavor() == S3ClientFlavor.ASYNC_JAVA_BASED - && (config.getBaseConfig().isPayloadSigning() - // MRAP requires body signing - || config.getBaseConfig().getBucketType() == BucketType.MRAP), - "Async payload signing doesn't work with Java based clients"); - - // For testing purposes, ContentProvider is Publisher for async clients - // Async java based clients don't currently support unknown content-length bodies - Assumptions.assumeFalse(config.getBaseConfig().getFlavor() == S3ClientFlavor.ASYNC_JAVA_BASED - && config.getBodyType() == BodyType.CONTENT_PROVIDER_NO_LENGTH, - "Async Java based support unknown content length"); - - BucketType bucketType = config.getBaseConfig().getBucketType(); - - String bucket = bucketForType(bucketType); - String key = randomKey(); - - PutObjectRequest request = PutObjectRequest.builder() - .bucket(bucket) - .key(key) - .build(); - - - RequestRecorder recorder = new RequestRecorder(); - - ClientOverrideConfiguration.Builder overrideConfiguration = - ClientOverrideConfiguration.builder() - .addExecutionInterceptor(recorder); - - if (config.getBaseConfig().isPayloadSigning()) { - overrideConfiguration.addExecutionInterceptor(new EnablePayloadSigningInterceptor()); - } - - TestCallable callable = null; - try { - - Long actualContentLength = null; - boolean requestBodyHasContentLength = false; - String actualCrc32; - - if (!config.getBaseConfig().getFlavor().isAsync()) { - TestRequestBody body = getRequestBody(config.getBodyType()); - callable = callPutObject(request, body, config.getBaseConfig(), overrideConfiguration.build()); - actualContentLength = body.getActualContentLength(); - requestBodyHasContentLength = body.optionalContentLength().isPresent(); - actualCrc32 = body.getChecksum(); - } else { - TestAsyncBody body = getAsyncRequestBody(config.getBodyType()); - callable = callPutObject(request, body, config.getBaseConfig(), overrideConfiguration.build()); - actualContentLength = body.getActualContentLength(); - requestBodyHasContentLength = body.getAsyncRequestBody().contentLength().isPresent(); - actualCrc32 = body.getChecksum(); - } - - PutObjectResponse response = callable.runnable.call(); - - recordObjectToCleanup(bucketType, key); - - // We only validate when configured to WHEN_SUPPORTED since checksums are optional for PutObject - if (config.getBaseConfig().getRequestChecksumValidation() == RequestChecksumCalculation.WHEN_SUPPORTED - // CRT switches to MPU under the hood which doesn't support checksums - && config.getBaseConfig().getFlavor() != S3ClientFlavor.ASYNC_CRT) { - assertThat(response.checksumCRC32()).isEqualTo(actualCrc32); - } - - // We can't set an execution interceptor when using CRT - if (config.getBaseConfig().getFlavor() != S3ClientFlavor.ASYNC_CRT) { - assertThat(recorder.getRequests()).isNotEmpty(); - - for (SdkHttpRequest httpRequest : recorder.getRequests()) { - // skip any non-PUT requests, e.g. GetSession for EOZ requests - if (httpRequest.method() != SdkHttpMethod.PUT) { - continue; - } - - String payloadSha = httpRequest.firstMatchingHeader("x-amz-content-sha256").get(); - if (payloadSha.startsWith("STREAMING")) { - String decodedContentLength = httpRequest.firstMatchingHeader("x-amz-decoded-content-length").get(); - assertThat(Long.parseLong(decodedContentLength)).isEqualTo(actualContentLength); - } else { - Optional contentLength = httpRequest.firstMatchingHeader("Content-Length"); - if (requestBodyHasContentLength) { - assertThat(Long.parseLong(contentLength.get())).isEqualTo(actualContentLength); - } - } - } - } - } finally { - if (callable != null) { - callable.client.close(); - } - } - } - - private TestCallable callDeleteObjects(DeleteObjectsRequest request, TestConfig config) { - AwsClient toClose; - Callable runnable = null; - - if (config.getFlavor().isAsync()) { - S3AsyncClient s3Async = makeAsyncClient(config, null); - toClose = s3Async; - runnable = () -> { - CompletableFutureUtils.joinLikeSync(s3Async.deleteObjects(request)); - return null; - }; - } else { - S3Client s3 = makeSyncClient(config, null); - toClose = s3; - runnable = () -> { - s3.deleteObjects(request); - return null; - }; - } - - return new TestCallable<>(toClose, runnable); - } - - private TestCallable callRestoreObject(RestoreObjectRequest request, TestConfig config) { - AwsClient toClose; - Callable callable = null; - - if (config.getFlavor().isAsync()) { - S3AsyncClient s3Async = makeAsyncClient(config, null); - toClose = s3Async; - callable = () -> { - s3Async.restoreObject(request).join(); - return null; - }; - } else { - S3Client s3 = makeSyncClient(config, null); - toClose = s3; - callable = () -> { - s3.restoreObject(request); - return null; - }; - } - - return new TestCallable<>(toClose, callable); - } - - private TestCallable callPutObject(PutObjectRequest request, TestRequestBody requestBody, TestConfig config, - ClientOverrideConfiguration overrideConfiguration) throws IOException { - S3Client s3Client = makeSyncClient(config, overrideConfiguration); - Callable callable = () -> { - try { - return s3Client.putObject(request, requestBody); - } catch (Exception e) { - throw new RuntimeException(e); - } - }; - return new TestCallable<>(s3Client, callable); - } - - private TestCallable callPutObject(PutObjectRequest request, TestAsyncBody requestBody, TestConfig config, - ClientOverrideConfiguration overrideConfiguration) throws IOException { - S3AsyncClient s3Client = makeAsyncClient(config, overrideConfiguration); - Callable callable = () -> { - try { - CompletableFuture future = s3Client.putObject(request, requestBody.getAsyncRequestBody()); - return CompletableFutureUtils.joinLikeSync(future); - } catch (Exception e) { - throw new RuntimeException(e); - } - }; - return new TestCallable<>(s3Client, callable); - } - - private static class TestCallable { - private AwsClient client; - private Callable runnable; - - TestCallable(AwsClient client, Callable runnable) { - this.client = client; - this.runnable = runnable; - } - } - - private S3Client makeSyncClient(TestConfig config, ClientOverrideConfiguration overrideConfiguration) { - switch (config.getFlavor()) { - case JAVA_BASED: - return S3Client.builder() - .forcePathStyle(config.isForcePathStyle()) - .requestChecksumCalculation(config.getRequestChecksumValidation()) - .region(REGION) - .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN) - .accelerate(config.isAccelerateEnabled()) - .overrideConfiguration(overrideConfiguration) - .build(); - default: - throw new RuntimeException("Unsupported sync flavor: " + config.getFlavor()); - } - } - - private S3AsyncClient makeAsyncClient(TestConfig config, ClientOverrideConfiguration overrideConfiguration) { - switch (config.getFlavor()) { - case ASYNC_JAVA_BASED: - return S3AsyncClient.builder() - .forcePathStyle(config.isForcePathStyle()) - .requestChecksumCalculation(config.getRequestChecksumValidation()) - .region(REGION) - .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN) - .accelerate(config.isAccelerateEnabled()) - .overrideConfiguration(overrideConfiguration) - .build(); - case ASYNC_CRT: { - if (overrideConfiguration != null) { - LOG.warn(() -> "Override configuration cannot be set for Async S3 CRT!"); - } - return S3AsyncClient.crtBuilder() - .forcePathStyle(config.isForcePathStyle()) - .requestChecksumCalculation(config.getRequestChecksumValidation()) - .region(REGION) - .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN) - .accelerate(config.isAccelerateEnabled()) - .build(); - } - default: - throw new RuntimeException("Unsupported async flavor: " + config.getFlavor()); - } - } - - private static String bucketForType(BucketType type) { - switch (type) { - case STANDARD_BUCKET: - return bucketName; - case MRAP: - return mrapArn; - case EOZ: - return eozBucket; - case ACCESS_POINT: - return apArn; - default: - throw new RuntimeException("Unknown bucket type: " + type); - } - } - - enum BucketType { - STANDARD_BUCKET(false), - ACCESS_POINT(true), - // Multi-region access point - MRAP(true), - // Express one zone/S3 express - EOZ(false), - ; - - private boolean arnType; - - private BucketType(boolean arnType) { - this.arnType = arnType; - } - - public boolean isArnType() { - return arnType; - } - } - - enum S3ClientFlavor { - JAVA_BASED(false), - ASYNC_JAVA_BASED(true), - - ASYNC_CRT(true) - ; - - private boolean async; - - private S3ClientFlavor(boolean async) { - this.async = async; - } - - public boolean isAsync() { - return async; - } - } - - static class UploadConfig { - private TestConfig baseConfig; - private BodyType bodyType; - - public TestConfig getBaseConfig() { - return baseConfig; - } - - public void setBaseConfig(TestConfig baseConfig) { - this.baseConfig = baseConfig; - } - - public BodyType getBodyType() { - return bodyType; - } - - public void setBodyType(BodyType bodyType) { - this.bodyType = bodyType; - } - - @Override - public String toString() { - return "UploadConfig{" + - "baseConfig=" + baseConfig + - ", bodyType=" + bodyType + - '}'; - } - } - - static class TestRequestBody extends RequestBody { - private final long contentLength; - private final String checksum; - - protected TestRequestBody(RequestBody wrapped, long contentLength, String checksum) { - super(wrapped.contentStreamProvider(), wrapped.optionalContentLength().orElse(null), wrapped.contentType()); - this.contentLength = contentLength; - this.checksum = checksum; - } - - public long getActualContentLength() { - return contentLength; - } - - public String getChecksum() { - return checksum; - } - } - - private static class TestAsyncBody { - private final AsyncRequestBody asyncRequestBody; - private final long actualContentLength; - private final String checksum; - - private TestAsyncBody(AsyncRequestBody asyncRequestBody, long actualContentLength, String checksum) { - this.asyncRequestBody = asyncRequestBody; - this.actualContentLength = actualContentLength; - this.checksum = checksum; - } - - public AsyncRequestBody getAsyncRequestBody() { - return asyncRequestBody; - } - - public long getActualContentLength() { - return actualContentLength; - } - - public String getChecksum() { - return checksum; - } - - } - - static class TestConfig { - private S3ClientFlavor flavor; - private BucketType bucketType; - private boolean forcePathStyle; - private RequestChecksumCalculation requestChecksumValidation; - private boolean accelerateEnabled; - private boolean payloadSigning; - - public S3ClientFlavor getFlavor() { - return flavor; - } - - public void setFlavor(S3ClientFlavor flavor) { - this.flavor = flavor; - } - - public BucketType getBucketType() { - return bucketType; - } - - public void setBucketType(BucketType bucketType) { - this.bucketType = bucketType; - } - - public boolean isForcePathStyle() { - return forcePathStyle; - } - - public void setForcePathStyle(boolean forcePathStyle) { - this.forcePathStyle = forcePathStyle; - } - - public RequestChecksumCalculation getRequestChecksumValidation() { - return requestChecksumValidation; - } - - public void setRequestChecksumValidation(RequestChecksumCalculation requestChecksumValidation) { - this.requestChecksumValidation = requestChecksumValidation; - } - - public boolean isAccelerateEnabled() { - return accelerateEnabled; - } - - public void setAccelerateEnabled(boolean accelerateEnabled) { - this.accelerateEnabled = accelerateEnabled; - } - - public boolean isPayloadSigning() { - return payloadSigning; - } - - public void setPayloadSigning(boolean payloadSigning) { - this.payloadSigning = payloadSigning; - } - - @Override - public String toString() { - return "[" + - "flavor=" + flavor + - ", bucketType=" + bucketType + - ", forcePathStyle=" + forcePathStyle + - ", requestChecksumValidation=" + requestChecksumValidation + - ", accelerateEnabled=" + accelerateEnabled + - ", payloadSigning=" + payloadSigning + - ']'; - } - } - - static List testConfigs() { - List configs = new ArrayList<>(); - - boolean[] forcePathStyle = {true, false}; - RequestChecksumCalculation[] checksumValidations = {RequestChecksumCalculation.WHEN_REQUIRED, - RequestChecksumCalculation.WHEN_SUPPORTED}; - boolean[] accelerateEnabled = {true, false}; - boolean[] payloadSigningEnabled = {true, false}; - for (boolean pathStyle : forcePathStyle) { - for (RequestChecksumCalculation checksumValidation : checksumValidations) { - for (S3ClientFlavor flavor : S3ClientFlavor.values()) { - for (BucketType bucketType : BucketType.values()) { - for (boolean accelerate : accelerateEnabled) { - for (boolean payloadSigning : payloadSigningEnabled) { - TestConfig testConfig = new TestConfig(); - testConfig.setFlavor(flavor); - testConfig.setBucketType(bucketType); - testConfig.setForcePathStyle(pathStyle); - testConfig.setRequestChecksumValidation(checksumValidation); - testConfig.setAccelerateEnabled(accelerate); - testConfig.setPayloadSigning(payloadSigning); - configs.add(testConfig); - } - } - } - } - } - } - - return configs; - } - - enum BodyType { - INPUTSTREAM_RESETABLE, - INPUTSTREAM_NOT_RESETABLE, - - STRING, - - FILE, - - CONTENT_PROVIDER_WITH_LENGTH, - - CONTENT_PROVIDER_NO_LENGTH - } - - private static List uploadConfigs() { - List configs = new ArrayList<>(); - - for (BodyType bodyType : BodyType.values()) { - for (TestConfig baseConfig : testConfigs()) { - UploadConfig config = new UploadConfig(); - config.setBaseConfig(baseConfig); - config.setBodyType(bodyType); - configs.add(config); - } - } - return configs; - } - - private String putRandomObject(BucketType bucketType) { - String key = randomKey(); - String bucketName = bucketForType(bucketType); - s3.putObject(r -> r.bucket(bucketName).key(key), RequestBody.fromString("hello")); - recordObjectToCleanup(bucketType, key); - return key; - } - - - private String putRandomArchivedObject(BucketType bucketType) { - String key = randomKey(); - String bucketName = bucketForType(bucketType); - s3.putObject(r -> r.bucket(bucketName).key(key).storageClass(StorageClass.GLACIER), RequestBody.fromString("hello")); - recordObjectToCleanup(bucketType, key); - return key; - } - - private TestRequestBody getRequestBody(BodyType bodyType) throws IOException { - switch (bodyType) { - case STRING: { - String content = "Hello world"; - long contentLength = content.getBytes(StandardCharsets.UTF_8).length; - return new TestRequestBody(RequestBody.fromString("Hello world"), contentLength, crc32(content)); - } - case FILE: - return new TestRequestBody(RequestBody.fromFile(testFile), Files.size(testFile), crc32(testFile)); - case CONTENT_PROVIDER_NO_LENGTH: { - RequestBody wrapped = - RequestBody.fromContentProvider(() -> FunctionalUtils.invokeSafely(() -> Files.newInputStream(testFile)), - "application/octet-stream"); - - return new TestRequestBody(wrapped, Files.size(testFile), crc32(testFile)); - } - case CONTENT_PROVIDER_WITH_LENGTH: { - long contentLength = Files.size(testFile); - RequestBody wrapped = RequestBody.fromContentProvider(() -> FunctionalUtils.invokeSafely(() -> Files.newInputStream(testFile)), - Files.size(testFile), - "application/octet-stream"); - return new TestRequestBody(wrapped, contentLength, crc32(testFile)); - } - case INPUTSTREAM_RESETABLE: { - byte[] content = "Hello world".getBytes(StandardCharsets.UTF_8); - RequestBody wrapped = RequestBody.fromInputStream(new ByteArrayInputStream(content), content.length); - return new TestRequestBody(wrapped, content.length, crc32(content)); - } - case INPUTSTREAM_NOT_RESETABLE: { - byte[] content = "Hello world".getBytes(StandardCharsets.UTF_8); - RequestBody wrapped = RequestBody.fromInputStream(new NonResettableByteStream(content), content.length); - return new TestRequestBody(wrapped, content.length, crc32(content)); - } - default: - throw new RuntimeException("Unsupported body type: " + bodyType); - } - } - - private TestAsyncBody getAsyncRequestBody(BodyType bodyType) throws IOException { - switch (bodyType) { - case STRING: { - String content = "Hello world"; - long contentLength = content.getBytes(StandardCharsets.UTF_8).length; - return new TestAsyncBody(AsyncRequestBody.fromString(content), contentLength, crc32(content)); - } - case FILE: { - long contentLength = Files.size(testFile); - return new TestAsyncBody(AsyncRequestBody.fromFile(testFile), contentLength, crc32(testFile)); - } - case INPUTSTREAM_RESETABLE: { - byte[] content = "Hello world".getBytes(StandardCharsets.UTF_8); - AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromInputStream(new ByteArrayInputStream(content), - (long) content.length, - ASYNC_REQUEST_BODY_EXECUTOR); - return new TestAsyncBody(asyncRequestBody, content.length, crc32(content)); - } - case INPUTSTREAM_NOT_RESETABLE: { - byte[] content = "Hello world".getBytes(StandardCharsets.UTF_8); - AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromInputStream(new NonResettableByteStream(content), - (long) content.length, - ASYNC_REQUEST_BODY_EXECUTOR); - return new TestAsyncBody(asyncRequestBody, content.length, crc32(content)); - } - case CONTENT_PROVIDER_NO_LENGTH: { - byte[] content = "Hello world".getBytes(StandardCharsets.UTF_8); - Flowable publisher = Flowable.just(ByteBuffer.wrap(content)); - AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromPublisher(publisher); - return new TestAsyncBody(asyncRequestBody, content.length, crc32(content)); - } - default: - throw new RuntimeException("Unsupported async body type: " + bodyType); - } - } - - private String randomKey() { - return BinaryUtils.toHex(UUID.randomUUID().toString().getBytes()); - } - - private static String getAccountId() { - return sts.getCallerIdentity().account(); - } - - private static String getBucketName() { - return BUCKET_NAME_PREFIX + accountId; - } - - private static String createAccessPoint() { - try { - s3Control.getAccessPoint(r -> r.accountId(accountId).name(AP_NAME)); - } catch (S3ControlException e) { - if (e.awsErrorDetails().sdkHttpResponse().statusCode() != 404) { - throw e; - } - - s3Control.createAccessPoint(r -> r.bucket(bucketName).name(AP_NAME).accountId(accountId)); - } - - return waitForApToBeReady(); - } - - private static String createMrap() throws InterruptedException { - try { - s3Control.getMultiRegionAccessPoint(r -> r.accountId(accountId).name(MRAP_NAME)); - } catch (S3ControlException e) { - if (e.awsErrorDetails().sdkHttpResponse().statusCode() != 404) { - throw e; - } - - CreateMultiRegionAccessPointRequest createMrap = - CreateMultiRegionAccessPointRequest.builder() - .accountId(accountId) - .details(d -> d.name(MRAP_NAME) - .regions(software.amazon.awssdk.services.s3control.model.Region.builder() - .bucket(bucketName) - .build())) - .build(); - - s3Control.createMultiRegionAccessPoint(createMrap); - } - - return waitForMrapToBeReady(); - } - - private static String createBucket() { - String name = getBucketName(); - LOG.debug(() -> "Creating bucket: " + name); - createBucket(name, 3); - s3.putBucketAccelerateConfiguration(r -> r.bucket(name) - .accelerateConfiguration(c -> c.status(BucketAccelerateStatus.ENABLED))); - return name; - } - - private static String createEozBucket() { - String eozBucketName = getBucketName() + EOZ_SUFFIX; - LOG.debug(() -> "Creating EOZ bucket: " + eozBucketName); - CreateBucketConfiguration cfg = CreateBucketConfiguration.builder() - .bucket(info -> info.dataRedundancy(DataRedundancy.SINGLE_AVAILABILITY_ZONE) - .type(software.amazon.awssdk.services.s3.model.BucketType.DIRECTORY)) - .location(LocationInfo.builder() - .name("usw2-az3") - .type(LocationType.AVAILABILITY_ZONE) - .build()) - .build(); - - try { - s3.createBucket(r -> r.bucket(eozBucketName).createBucketConfiguration(cfg)); - } catch (S3Exception e) { - AwsErrorDetails awsErrorDetails = e.awsErrorDetails(); - if (!"BucketAlreadyOwnedByYou".equals(awsErrorDetails.errorCode())) { - throw e; - } - } - return eozBucketName; - } - - private static String waitForMrapToBeReady() throws InterruptedException { - GetMultiRegionAccessPointResponse getMrapResponse = null; - - Instant waitStart = Instant.now(); - boolean initial = true; - do { - if (!initial) { - Thread.sleep(Duration.ofSeconds(10).toMillis()); - initial = true; - } - GetMultiRegionAccessPointResponse response = s3Control.getMultiRegionAccessPoint(r -> r.accountId(accountId).name(MRAP_NAME)); - LOG.debug(() -> "Wait response: " + response); - getMrapResponse = response; - } while (MultiRegionAccessPointStatus.READY != getMrapResponse.accessPoint().status() - && Duration.between(Instant.now(), waitStart).compareTo(Duration.ofMinutes(5)) < 0); - - return "arn:aws:s3::" + accountId + ":accesspoint/" + getMrapResponse.accessPoint().alias(); - } - - private static String waitForApToBeReady() { - return s3Control.getAccessPoint(r -> r.accountId(accountId).name(AP_NAME)).accessPointArn(); - } - - private static void createBucket(String bucketName, int retryCount) { - try { - s3.createBucket( - CreateBucketRequest.builder() - .bucket(bucketName) - .createBucketConfiguration( - CreateBucketConfiguration.builder() - .locationConstraint(BucketLocationConstraint.US_WEST_2) - .build()) - .build()); - } catch (S3Exception e) { - LOG.debug(() -> "Error attempting to create bucket: " + bucketName); - if (e.awsErrorDetails().errorCode().equals("BucketAlreadyOwnedByYou")) { - LOG.debug(() -> String.format("%s bucket already exists, likely leaked by a previous run%n", bucketName)); - } else if (e.awsErrorDetails().errorCode().equals("TooManyBuckets")) { - LOG.debug(() -> "Printing all buckets for debug:"); - s3.listBuckets().buckets().forEach(l -> LOG.debug(l::toString)); - if (retryCount < 2) { - LOG.debug(() -> "Retrying..."); - createBucket(bucketName, retryCount + 1); - } else { - throw e; - } - } else { - throw e; - } - } - - s3.waiter().waitUntilBucketExists(r -> r.bucket(bucketName)); - } - - private static Path createRandomFile() throws IOException { - Path tmp = Files.createTempFile(null, null); - byte[] randomBytes = new byte[1024]; - new Random().nextBytes(randomBytes); - try (OutputStream os = Files.newOutputStream(tmp)) { - for (int i = 0; i < 16; ++i) { - os.write(randomBytes); - } - } - return tmp; - } - - private static class NonResettableByteStream extends ByteArrayInputStream { - public NonResettableByteStream(byte[] buf) { - super(buf); - } - - @Override - public boolean markSupported() { - return false; - } - - @Override - public synchronized void reset() { - throw new UnsupportedOperationException(); - } - } - - private static String crc32(String s) { - return crc32(s.getBytes(StandardCharsets.UTF_8)); - } - - private static String crc32(byte[] bytes) { - CRC32.reset(); - CRC32.update(bytes); - return BinaryUtils.toBase64(CRC32.getChecksumBytes()); - } - - private static String crc32(Path p) throws IOException { - CRC32.reset(); - - byte[] buff = new byte[4096]; - int read; - try (InputStream is = Files.newInputStream(p)) { - while (true) { - read = is.read(buff); - if (read == -1) { - break; - } - CRC32.update(buff, 0, read); - } - } - - return BinaryUtils.toBase64(CRC32.getChecksumBytes()); - } - - private void recordObjectToCleanup(BucketType type, String key) { - bucketCleanup.computeIfAbsent(type, k -> new ArrayList<>()).add(key); - } - - private static class RequestRecorder implements ExecutionInterceptor { - private final List requests = new ArrayList<>(); - @Override - public void beforeTransmission(Context.BeforeTransmission context, ExecutionAttributes executionAttributes) { - requests.add(context.httpRequest()); - } - - public List getRequests() { - return requests; - } - } - - private static class EnablePayloadSigningInterceptor implements ExecutionInterceptor { - @Override - public void beforeExecution(Context.BeforeExecution context, ExecutionAttributes executionAttributes) { - executionAttributes.putAttribute(S3SignerExecutionAttribute.ENABLE_PAYLOAD_SIGNING, true); - ExecutionInterceptor.super.beforeExecution(context, executionAttributes); - } - } -} diff --git a/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/BaseS3RegressionTest.java b/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/BaseS3RegressionTest.java new file mode 100644 index 000000000000..dceb91781f32 --- /dev/null +++ b/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/BaseS3RegressionTest.java @@ -0,0 +1,121 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.s3.regression; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3control.S3ControlClient; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.utils.Logger; + +public abstract class BaseS3RegressionTest { + private static final Logger LOG = Logger.loggerFor(BaseS3RegressionTest.class); + + private static final String BUCKET_NAME_PREFIX = "do-not-delete-checksums-"; + private static final String MRAP_NAME = "do-not-delete-checksum-testing"; + private static final String AP_NAME = "do-not-delete-checksum-testing-ap"; + private static final String EOZ_SUFFIX = "--usw2-az3--x-s3"; + protected static final Region REGION = Region.US_WEST_2; + + protected static final String TEST_CREDENTIALS_PROFILE_NAME = "aws-test-account"; + protected static final AwsCredentialsProviderChain CREDENTIALS_PROVIDER_CHAIN = + AwsCredentialsProviderChain.of(ProfileCredentialsProvider.builder() + .profileName(TEST_CREDENTIALS_PROFILE_NAME) + .build(), + DefaultCredentialsProvider.create()); + + protected static String accountId; + protected static String bucketName; + protected static String mrapArn; + protected static String eozBucket; + protected static String apArn; + + protected static S3ControlClient s3Control; + protected static S3Client s3; + protected static StsClient sts; + + private Map> bucketCleanup = new HashMap<>(); + + @BeforeAll + static void setup() throws InterruptedException, IOException { + // Log.initLoggingToStdout(Log.LogLevel.Trace); + + s3 = S3Client.builder() + .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN) + .region(REGION) + .build(); + + s3Control = S3ControlClient.builder() + .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN) + .region(REGION) + .build(); + + sts = StsClient.builder().credentialsProvider(CREDENTIALS_PROVIDER_CHAIN) + .region(REGION) + .build(); + + accountId = S3ChecksumsTestUtils.getAccountId(sts); + bucketName = S3ChecksumsTestUtils.createBucket(s3, getBucketName(), LOG); + mrapArn = S3ChecksumsTestUtils.createMrap(s3Control, accountId, MRAP_NAME, bucketName, LOG); + eozBucket = S3ChecksumsTestUtils.createEozBucket(s3, getBucketName() + EOZ_SUFFIX, LOG); + apArn = S3ChecksumsTestUtils.createAccessPoint(s3Control, accountId, AP_NAME, bucketName); + + } + + @AfterEach + public void methodCleanup() { + bucketCleanup.forEach((bt, keys) -> { + String bucket = bucketForType(bt); + keys.forEach(k -> s3.deleteObject(r -> r.bucket(bucket).key(k))); + }); + + bucketCleanup.clear(); + } + + protected void recordObjectToCleanup(BucketType type, String key) { + bucketCleanup.computeIfAbsent(type, k -> new ArrayList<>()).add(key); + } + + protected static String getBucketName() { + return BUCKET_NAME_PREFIX + accountId; + } + + protected static String bucketForType(BucketType type) { + switch (type) { + case STANDARD_BUCKET: + return bucketName; + case MRAP: + return mrapArn; + case EOZ: + return eozBucket; + case ACCESS_POINT: + return apArn; + default: + throw new RuntimeException("Unknown bucket type: " + type); + } + } + +} diff --git a/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/BucketType.java b/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/BucketType.java new file mode 100644 index 000000000000..e7f2ebe36276 --- /dev/null +++ b/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/BucketType.java @@ -0,0 +1,36 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.s3.regression; + +public enum BucketType { + STANDARD_BUCKET(false), + ACCESS_POINT(true), + // Multi-region access point + MRAP(true), + // Express one zone/S3 express + EOZ(false), + ; + + private final boolean arnType; + + BucketType(boolean arnType) { + this.arnType = arnType; + } + + public boolean isArnType() { + return arnType; + } +} diff --git a/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/ControlPlaneOperationRegressionTesting.java b/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/ControlPlaneOperationRegressionTesting.java new file mode 100644 index 000000000000..13c402322cd3 --- /dev/null +++ b/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/ControlPlaneOperationRegressionTesting.java @@ -0,0 +1,185 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.s3.regression; + +import static software.amazon.awssdk.services.s3.regression.S3ChecksumsTestUtils.assumeNotAccelerateWithArnType; +import static software.amazon.awssdk.services.s3.regression.S3ChecksumsTestUtils.assumeNotAccelerateWithEoz; +import static software.amazon.awssdk.services.s3.regression.S3ChecksumsTestUtils.assumeNotAccelerateWithPathStyle; +import static software.amazon.awssdk.services.s3.regression.S3ChecksumsTestUtils.assumeNotAccessPointWithPathStyle; +import static software.amazon.awssdk.services.s3.regression.S3ChecksumsTestUtils.makeAsyncClient; +import static software.amazon.awssdk.services.s3.regression.S3ChecksumsTestUtils.makeSyncClient; + +import java.util.List; +import java.util.concurrent.Callable; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import software.amazon.awssdk.awscore.AwsClient; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.Delete; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; +import software.amazon.awssdk.services.s3.model.GlacierJobParameters; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3.model.RestoreObjectRequest; +import software.amazon.awssdk.services.s3.model.RestoreRequest; +import software.amazon.awssdk.services.s3.model.StorageClass; +import software.amazon.awssdk.services.s3.model.Tier; +import software.amazon.awssdk.utils.CompletableFutureUtils; +import software.amazon.awssdk.utils.Logger; + +public class ControlPlaneOperationRegressionTesting extends BaseS3RegressionTest { + private static final Logger LOG = Logger.loggerFor(ControlPlaneOperationRegressionTesting.class); + + // Request checksum required + @ParameterizedTest + @MethodSource("testConfigs") + void deleteObject(TestConfig config) throws Exception { + assumeNotAccessPointWithPathStyle(config); + assumeNotAccelerateWithPathStyle(config); + assumeNotAccelerateWithArnType(config); + assumeNotAccelerateWithEoz(config); + + LOG.debug(() -> "Running deleteObject with config: " + config.toString()); + + String bucket = bucketForType(config.getBucketType()); + String key = putRandomObject(config.getBucketType()); + TestCallable callable = null; + try { + DeleteObjectsRequest req = DeleteObjectsRequest.builder() + .bucket(bucket) + .delete(Delete.builder() + .objects(ObjectIdentifier.builder() + .key(key) + .build()) + .build()) + .build(); + + callable = callDeleteObjects(req, config); + callable.runnable().call(); + } finally { + if (callable != null) { + callable.client().close(); + } + } + } + + // Request checksum optional + @ParameterizedTest + @MethodSource("testConfigs") + void restoreObject(TestConfig config) throws Exception { + assumeNotAccessPointWithPathStyle(config); + assumeNotAccelerateWithPathStyle(config); + assumeNotAccelerateWithArnType(config); + + Assumptions.assumeFalse(config.getBucketType() == BucketType.EOZ, + "Restore is not supported for S3 Express"); + + LOG.debug(() -> "Running restoreObject with config: " + config); + + String bucket = bucketForType(config.getBucketType()); + String key = putRandomArchivedObject(config.getBucketType()); + TestCallable callable = null; + try { + RestoreObjectRequest request = RestoreObjectRequest.builder() + .bucket(bucket) + .key(key) + .restoreRequest(RestoreRequest.builder() + .days(5) + .glacierJobParameters(GlacierJobParameters.builder() + .tier(Tier.STANDARD) + .build()) + .build()) + .build(); + + callable = callRestoreObject(request, config); + callable.runnable().call(); + } finally { + if (callable != null) { + callable.client().close(); + } + } + } + + private TestCallable callDeleteObjects(DeleteObjectsRequest request, TestConfig config) { + AwsClient toClose; + Callable runnable = null; + + if (config.getFlavor().isAsync()) { + S3AsyncClient s3Async = makeAsyncClient(config, REGION, CREDENTIALS_PROVIDER_CHAIN); + toClose = s3Async; + runnable = () -> { + CompletableFutureUtils.joinLikeSync(s3Async.deleteObjects(request)); + return null; + }; + } else { + S3Client s3 = makeSyncClient(config, REGION, CREDENTIALS_PROVIDER_CHAIN); + toClose = s3; + runnable = () -> { + s3.deleteObjects(request); + return null; + }; + } + + return new TestCallable<>(toClose, runnable); + } + + private TestCallable callRestoreObject(RestoreObjectRequest request, TestConfig config) { + AwsClient toClose; + Callable callable = null; + + if (config.getFlavor().isAsync()) { + S3AsyncClient s3Async = makeAsyncClient(config, REGION, CREDENTIALS_PROVIDER_CHAIN); + toClose = s3Async; + callable = () -> { + s3Async.restoreObject(request).join(); + return null; + }; + } else { + S3Client s3 = makeSyncClient(config, REGION, CREDENTIALS_PROVIDER_CHAIN); + toClose = s3; + callable = () -> { + s3.restoreObject(request); + return null; + }; + } + + return new TestCallable<>(toClose, callable); + } + + static List testConfigs() { + return TestConfig.testConfigs(); + } + + private String putRandomObject(BucketType bucketType) { + String key = S3ChecksumsTestUtils.randomKey(); + String bucketName = bucketForType(bucketType); + s3.putObject(r -> r.bucket(bucketName).key(key), RequestBody.fromString("hello")); + recordObjectToCleanup(bucketType, key); + return key; + } + + private String putRandomArchivedObject(BucketType bucketType) { + String key = S3ChecksumsTestUtils.randomKey(); + String bucketName = bucketForType(bucketType); + s3.putObject(r -> r.bucket(bucketName).key(key).storageClass(StorageClass.GLACIER), RequestBody.fromString("hello")); + recordObjectToCleanup(bucketType, key); + return key; + } + + +} diff --git a/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/DownloadStreamingRegressionTesting.java b/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/DownloadStreamingRegressionTesting.java new file mode 100644 index 000000000000..6f74393c49d4 --- /dev/null +++ b/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/DownloadStreamingRegressionTesting.java @@ -0,0 +1,612 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.s3.regression; + +import static org.assertj.core.api.Assertions.assertThat; +import static software.amazon.awssdk.services.s3.regression.ControlPlaneOperationRegressionTesting.testConfigs; +import static software.amazon.awssdk.services.s3.regression.S3ChecksumsTestUtils.assumeNotAccelerateWithArnType; +import static software.amazon.awssdk.services.s3.regression.S3ChecksumsTestUtils.assumeNotAccelerateWithEoz; +import static software.amazon.awssdk.services.s3.regression.S3ChecksumsTestUtils.assumeNotAccelerateWithPathStyle; +import static software.amazon.awssdk.services.s3.regression.S3ChecksumsTestUtils.assumeNotAccessPointWithPathStyle; +import static software.amazon.awssdk.services.s3.regression.S3ChecksumsTestUtils.crc32; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.async.ResponsePublisher; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.core.sync.ResponseTransformer; +import software.amazon.awssdk.http.AbortableInputStream; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.ChecksumMode; +import software.amazon.awssdk.services.s3.model.CompletedPart; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.UploadPartResponse; +import software.amazon.awssdk.testutils.InputStreamUtils; +import software.amazon.awssdk.utils.Logger; +import software.amazon.awssdk.utils.StringUtils; +import software.amazon.awssdk.utils.ToString; + +public class DownloadStreamingRegressionTesting extends BaseS3RegressionTest { + private static final Logger LOG = Logger.loggerFor(DownloadStreamingRegressionTesting.class); + + static ObjectWithCRC smallObject; + static ObjectWithCRC largeObject; + static ObjectWithCRC largeObjectMulti; + + private static Path tempDirPath; + + private List pathsToDelete; + + @BeforeAll + static void init() throws Exception { + tempDirPath = createTempDir("DownloadStreamingIntegrationTesting"); + smallObject = uploadObjectSmall(); // 16 KiB + largeObject = uploadObjectLarge(); // 80 MiB + largeObjectMulti = uploadMultiPartObject(); // 80 MiB, default multipart config + } + + @AfterAll + static void cleanup() { + for (BucketType bucketType : BucketType.values()) { + String bucket = bucketForType(bucketType); + s3.deleteObject(req -> req.bucket(bucket).key(smallObject.key())); + s3.deleteObject(req -> req.bucket(bucket).key(largeObject.key())); + s3.deleteObject(req -> req.bucket(bucket).key(largeObjectMulti.key())); + } + } + + @BeforeEach + void setupMethod() { + pathsToDelete = new ArrayList<>(); + } + + @AfterEach + void testCleanup() { + pathsToDelete.forEach(p -> { + try { + Files.delete(p); + } catch (Exception e) { + LOG.error(() -> String.format("Unable to delete file %s", p.toString()), e); + } + }); + } + + @ParameterizedTest + @MethodSource("downloadConfigs") + void downloadObject(DownloadConfig config) throws Exception { + assumeNotAccelerateWithPathStyle(config.baseConfig()); + assumeNotAccessPointWithPathStyle(config.baseConfig()); + assumeNotAccelerateWithArnType(config.baseConfig()); + assumeNotAccelerateWithEoz(config.baseConfig()); + + LOG.debug(() -> "Running downloadObject with config: " + config); + + String key = config.contentSize().s3Object().key(); + GetObjectRequest.Builder b = GetObjectRequest.builder() + .bucket(bucketForType(config.baseConfig().getBucketType())) + .key(key); + if (config.checksumModeEnabled()) { + b.checksumMode(ChecksumMode.ENABLED); + } + + GetObjectRequest request = b.build(); + + CallResponse response; + switch (config.baseConfig().getFlavor()) { + case STANDARD_SYNC: { + response = callSyncGetObject(config, request); + break; + } + case STANDARD_ASYNC: + case MULTIPART_ENABLED: + case CRT_BASED: { + response = callAsyncGetObject(request, config); + break; + } + default: + throw new RuntimeException("Unsupported java client flavor: " + config.baseConfig().getFlavor()); + } + + String receivedContentCRC32 = crc32(response.content()); + String s3Crc32 = response.crc32(); + if (config.checksumModeEnabled() && StringUtils.isNotBlank(s3Crc32)) { + assertThat(receivedContentCRC32) + .withFailMessage("Mismatch with s3 crc32 for config " + config) + .isEqualTo(s3Crc32); + } + String expectedCRC32 = config.contentSize().s3Object().crc32(); + assertThat(receivedContentCRC32) + .withFailMessage("Mismatch with calculated crc32 for config " + config) + .isEqualTo(expectedCRC32); + } + + // 16 KiB + static ObjectWithCRC uploadObjectSmall() throws IOException { + String name = String.format("%s-%s.dat", System.currentTimeMillis(), UUID.randomUUID()); + LOG.debug(() -> "test setup - uploading small test object: " + name); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + byte[] rand = new byte[1024]; + for (int i = 0; i < 16; i++) { + new Random().nextBytes(rand); + os.write(rand); + } + byte[] fullContent = os.toByteArray(); + String crc32 = crc32(fullContent); + for (BucketType bucketType : BucketType.values()) { + String bucket = bucketForType(bucketType); + PutObjectRequest req = PutObjectRequest + .builder() + .bucket(bucket) + .key(name) + .build(); + s3.putObject(req, RequestBody.fromBytes(fullContent)); + } + return new ObjectWithCRC(name, crc32); + } + + // 80 MiB + static ObjectWithCRC uploadObjectLarge() throws IOException { + String name = String.format("%s-%s.dat", System.currentTimeMillis(), UUID.randomUUID()); + LOG.debug(() -> "test setup - uploading large test object: " + name); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + byte[] rand = new byte[1024 * 1024]; + for (int i = 0; i < 80; i++) { + new Random().nextBytes(rand); + os.write(rand); + } + byte[] fullContent = os.toByteArray(); + String crc32 = crc32(fullContent); + for (BucketType bucketType : BucketType.values()) { + String bucket = bucketForType(bucketType); + PutObjectRequest req = PutObjectRequest + .builder() + .bucket(bucket) + .key(name) + .build(); + + s3.putObject(req, RequestBody.fromBytes(fullContent)); + } + return new ObjectWithCRC(name, crc32); + } + + // 80MiB, multipart default config + static ObjectWithCRC uploadMultiPartObject() throws Exception { + String name = String.format("%s-%s.dat", System.currentTimeMillis(), UUID.randomUUID()); + LOG.debug(() -> "test setup - uploading large test object - multipart: " + name); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + byte[] rand = new byte[8 * 1024 * 1024]; + for (int i = 0; i < 10; i++) { + new Random().nextBytes(rand); + os.write(rand); + } + byte[] fullContent = os.toByteArray(); + String crc32 = crc32(fullContent); + for (BucketType bucketType : BucketType.values()) { + doMultipartUpload(bucketType, name, fullContent); + } + return new ObjectWithCRC(name, crc32); + } + + static void doMultipartUpload(BucketType bucketType, String objectName, byte[] content) { + String bucket = bucketForType(bucketType); + LOG.debug(() -> String.format("Uploading multipart object for bucket type: %s - %s", bucketType, bucket)); + CreateMultipartUploadRequest createMulti = CreateMultipartUploadRequest.builder() + .bucket(bucket) + .key(objectName) + .build(); + + CreateMultipartUploadResponse res = s3.createMultipartUpload(createMulti); + String uploadId = res.uploadId(); + + List completedParts = new ArrayList<>(); + int partAmount = 10; + int partSize = 8 * 1024 * 1024; + for (int i = 0; i < partAmount; i++) { + final int partNumber = i + 1; + int startIndex = partSize * i; + int endIndex = startIndex + partSize; + byte[] partContent = Arrays.copyOfRange(content, startIndex, endIndex); + LOG.debug(() -> "Uploading part: " + partNumber); + UploadPartResponse partResponse = s3.uploadPart(req -> req.partNumber(partNumber) + .uploadId(uploadId) + .key(objectName) + .bucket(bucket), + RequestBody.fromBytes(partContent)); + completedParts.add(CompletedPart.builder() + .eTag(partResponse.eTag()) + .partNumber(partNumber) + .build()); + LOG.debug(() -> String.format("done part %s - etag: %s: ", partNumber, partResponse.eTag())); + } + + LOG.debug(() -> "Finishing MPU, completed parts: " + completedParts); + + s3.completeMultipartUpload(req -> req.multipartUpload(u -> u.parts(completedParts)) + .bucket(bucket) + .key(objectName) + .uploadId(uploadId)); + s3.waiter().waitUntilObjectExists(r -> r.bucket(bucket).key(objectName), + c -> c.waitTimeout(Duration.ofMinutes(5))); + } + + private static List downloadConfigs() { + List configs = new ArrayList<>(); + for (ResponseTransformerType responseTransformerType : ResponseTransformerType.values()) { + for (TestConfig baseConfig : testConfigs()) { + for (ContentSize contentSize : ContentSize.values()) { + DownloadConfig checksumEnabled = + new DownloadConfig(baseConfig, responseTransformerType, contentSize, true); + DownloadConfig checksumDisabled = + new DownloadConfig(baseConfig, responseTransformerType, contentSize, false); + configs.add(checksumEnabled); + configs.add(checksumDisabled); + } + } + } + return configs; + } + + CallResponse callSyncGetObject(DownloadConfig config, GetObjectRequest request) throws IOException { + S3Client s3Client = makeSyncClient(config.baseConfig()); + + byte[] content; + String s3Crc32 = null; + switch (config.responseTransformerType()) { + case FILE: { + String filename = request.key(); + Path filePath = Paths.get(tempDirPath.toString(), filename); + pathsToDelete.add(filePath); + GetObjectResponse res = s3Client.getObject(request, ResponseTransformer.toFile(filePath)); + s3Crc32 = res.checksumCRC32(); + content = Files.readAllBytes(filePath); + break; + } + + case BYTES: { + ResponseBytes res = s3Client.getObject(request, ResponseTransformer.toBytes()); + content = res.asByteArray(); + s3Crc32 = res.response().checksumCRC32(); + break; + } + + case INPUT_STREAM: { + ResponseInputStream res = s3Client.getObject(request, ResponseTransformer.toInputStream()); + content = InputStreamUtils.drainInputStream(res); + s3Crc32 = res.response().checksumCRC32(); + break; + } + + case OUTPUT_STREAM: { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + GetObjectResponse res = s3Client.getObject(request, ResponseTransformer.toOutputStream(os)); + content = os.toByteArray(); + s3Crc32 = res.checksumCRC32(); + break; + } + + case UNMANAGED: { + UnmanagedResponseTransformer tr = new UnmanagedResponseTransformer(); + s3Client.getObject(request, ResponseTransformer.unmanaged(tr)); + content = tr.content; + s3Crc32 = tr.response().checksumCRC32(); + break; + } + + case PUBLISHER: + Assumptions.abort("Skipping 'publisher' transformer type for sync client: " + config); + content = null; + break; + + default: + throw new UnsupportedOperationException("unsupported response transformer type: " + config.responseTransformerType()); + + } + s3Client.close(); + return new CallResponse(content, s3Crc32); + } + + CallResponse callAsyncGetObject(GetObjectRequest request, DownloadConfig config) throws Exception { + S3AsyncClient s3AsyncClient = makeAsyncClient(config.baseConfig()); + + byte[] content; + String s3crc32 = null; + switch (config.responseTransformerType()) { + case FILE: { + String filename = randomFileName(); + Path filePath = Paths.get(tempDirPath.toString(), filename); + pathsToDelete.add(filePath); + GetObjectResponse res = s3AsyncClient.getObject(request, AsyncResponseTransformer.toFile(filePath)) + .get(5, TimeUnit.MINUTES); + content = Files.readAllBytes(filePath); + s3crc32 = res.checksumCRC32(); + break; + } + + case BYTES: { + ResponseBytes res = s3AsyncClient.getObject(request, AsyncResponseTransformer.toBytes()) + .get(5, TimeUnit.MINUTES); + content = res.asByteArray(); + s3crc32 = res.response().checksumCRC32(); + break; + } + + case INPUT_STREAM: { + ResponseInputStream res = s3AsyncClient.getObject(request, + AsyncResponseTransformer.toBlockingInputStream()) + .get(5, TimeUnit.MINUTES); + content = InputStreamUtils.drainInputStream(res); + s3crc32 = res.response().checksumCRC32(); + break; + } + + case PUBLISHER: { + ResponsePublisher res = s3AsyncClient.getObject(request, + AsyncResponseTransformer.toPublisher()) + .get(5, TimeUnit.MINUTES); + ContentConsumer consumer = new ContentConsumer(); + CompletableFuture fut = res.subscribe(consumer); + fut.get(5, TimeUnit.MINUTES); + content = consumer.getFullContent(); + s3crc32 = res.response().checksumCRC32(); + break; + } + + case OUTPUT_STREAM: + case UNMANAGED: + Assumptions.abort(String.format("Skipping '%s' transformer type for async client: %s", + config.responseTransformerType(), config)); + content = null; + break; + default: + throw new UnsupportedOperationException("unsupported response transformer type: " + config.responseTransformerType()); + } + s3AsyncClient.close(); + return new CallResponse(content, s3crc32); + } + + private static class CallResponse { + byte[] content; + String crc32; + + public CallResponse(byte[] content, String crc32) { + this.content = content; + this.crc32 = crc32; + } + + public byte[] content() { + return content; + } + + public String crc32() { + return crc32; + } + } + + enum ResponseTransformerType { + FILE, + BYTES, + INPUT_STREAM, + OUTPUT_STREAM, + UNMANAGED, + PUBLISHER + } + + private String randomFileName() { + return String.format("%s-%S", System.currentTimeMillis(), UUID.randomUUID()); + } + + static class DownloadConfig { + private TestConfig baseConfig; + private ResponseTransformerType responseTransformerType; + private ContentSize contentSize; + private boolean checksumModeEnabled; + + public DownloadConfig(TestConfig baseConfig, ResponseTransformerType responseTransformerType, + ContentSize contentSize, boolean checksumModeEnabled) { + this.baseConfig = baseConfig; + this.responseTransformerType = responseTransformerType; + this.contentSize = contentSize; + this.checksumModeEnabled = checksumModeEnabled; + } + + public TestConfig baseConfig() { + return this.baseConfig; + } + + public ResponseTransformerType responseTransformerType() { + return responseTransformerType; + } + + public ContentSize contentSize() { + return contentSize; + } + + private boolean checksumModeEnabled() { + return this.checksumModeEnabled; + } + + @Override + public String toString() { + return ToString.builder("DownloadConfig") + .add("baseConfig", baseConfig) + .add("responseTransformerType", responseTransformerType) + .add("contentSize", contentSize) + .add("checksumModeEnabled", checksumModeEnabled) + .build(); + } + } + + private static Path createTempDir(String path) { + try { + return Files.createDirectories(Paths.get(path)); + } catch (Exception e) { + LOG.error(() -> "Unable to create directory", e); + throw new RuntimeException(e); + } + } + + private S3Client makeSyncClient(TestConfig config) { + switch (config.getFlavor()) { + case STANDARD_SYNC: + return S3Client.builder() + .forcePathStyle(config.isForcePathStyle()) + .requestChecksumCalculation(config.getRequestChecksumValidation()) + .region(REGION) + .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN) + .accelerate(config.isAccelerateEnabled()) + .build(); + default: + throw new RuntimeException("Unsupported sync flavor: " + config.getFlavor()); + } + } + + private S3AsyncClient makeAsyncClient(TestConfig config) { + switch (config.getFlavor()) { + case STANDARD_ASYNC: + return S3AsyncClient.builder() + .forcePathStyle(config.isForcePathStyle()) + .requestChecksumCalculation(config.getRequestChecksumValidation()) + .region(REGION) + .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN) + .accelerate(config.isAccelerateEnabled()) + .build(); + case MULTIPART_ENABLED: + return S3AsyncClient.builder() + .forcePathStyle(config.isForcePathStyle()) + .requestChecksumCalculation(config.getRequestChecksumValidation()) + .region(REGION) + .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN) + .accelerate(config.isAccelerateEnabled()) + .multipartEnabled(true) + .build(); + case CRT_BASED: { + return S3AsyncClient.crtBuilder() + .forcePathStyle(config.isForcePathStyle()) + .requestChecksumCalculation(config.getRequestChecksumValidation()) + .region(REGION) + .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN) + .accelerate(config.isAccelerateEnabled()) + .build(); + } + default: + throw new RuntimeException("Unsupported async flavor: " + config.getFlavor()); + } + } + + enum ContentSize { + SMALL, + LARGE, + LARGE_MULTI; + + ObjectWithCRC s3Object() { + switch (this) { + case SMALL: + return smallObject; + case LARGE: + return largeObject; + case LARGE_MULTI: + return largeObjectMulti; + default: + throw new IllegalArgumentException("Unknown ContentSize " + this); + } + } + } + + private static class ObjectWithCRC { + private String key; + private String crc32; + + public ObjectWithCRC(String key, String crc32) { + this.key = key; + this.crc32 = crc32; + } + + public String key() { + return key; + } + + public String crc32() { + return crc32; + } + } + + private static class UnmanagedResponseTransformer implements ResponseTransformer { + byte[] content; + GetObjectResponse response; + + @Override + public byte[] transform(GetObjectResponse response, AbortableInputStream inputStream) throws Exception { + this.response = response; + this.content = InputStreamUtils.drainInputStream(inputStream); // stream will be closed + return content; + } + + public GetObjectResponse response() { + return this.response; + } + } + + private static class ContentConsumer implements Consumer { + private List buffs = new ArrayList<>(); + + @Override + public void accept(ByteBuffer byteBuffer) { + buffs.add(byteBuffer); + } + + byte[] getFullContent() { + int totalSize = buffs.stream() + .mapToInt(ByteBuffer::remaining) + .sum(); + byte[] result = new byte[totalSize]; + int offset = 0; + for (ByteBuffer buff : buffs) { + int length = buff.remaining(); + buff.get(result, offset, length); + offset += length; + } + return result; + } + } +} diff --git a/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/S3ChecksumsTestUtils.java b/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/S3ChecksumsTestUtils.java new file mode 100644 index 000000000000..96d0f1c70b26 --- /dev/null +++ b/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/S3ChecksumsTestUtils.java @@ -0,0 +1,375 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.s3.regression; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.time.Instant; +import java.util.Random; +import java.util.UUID; +import org.junit.jupiter.api.Assumptions; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.awscore.exception.AwsErrorDetails; +import software.amazon.awssdk.checksums.DefaultChecksumAlgorithm; +import software.amazon.awssdk.checksums.SdkChecksum; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.BucketAccelerateStatus; +import software.amazon.awssdk.services.s3.model.BucketLocationConstraint; +import software.amazon.awssdk.services.s3.model.CreateBucketConfiguration; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.DataRedundancy; +import software.amazon.awssdk.services.s3.model.LocationInfo; +import software.amazon.awssdk.services.s3.model.LocationType; +import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.services.s3control.S3ControlClient; +import software.amazon.awssdk.services.s3control.model.CreateMultiRegionAccessPointRequest; +import software.amazon.awssdk.services.s3control.model.GetMultiRegionAccessPointResponse; +import software.amazon.awssdk.services.s3control.model.MultiRegionAccessPointStatus; +import software.amazon.awssdk.services.s3control.model.S3ControlException; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.transfer.s3.S3TransferManager; +import software.amazon.awssdk.utils.BinaryUtils; +import software.amazon.awssdk.utils.Logger; + +public final class S3ChecksumsTestUtils { + + private static final SdkChecksum CRC32 = SdkChecksum.forAlgorithm(DefaultChecksumAlgorithm.CRC32); + + private S3ChecksumsTestUtils() { + } + + public static String createBucket(S3Client s3, String name, Logger log) { + log.debug(() -> "Creating bucket: " + name); + createBucket(s3, name, 3, log); + s3.putBucketAccelerateConfiguration(r -> r.bucket(name) + .accelerateConfiguration(c -> c.status(BucketAccelerateStatus.ENABLED))); + return name; + } + + public static void createBucket(S3Client s3, String bucketName, int retryCount, Logger log) { + try { + s3.createBucket( + CreateBucketRequest.builder() + .bucket(bucketName) + .createBucketConfiguration( + CreateBucketConfiguration.builder() + .locationConstraint(BucketLocationConstraint.US_WEST_2) + .build()) + .build()); + } catch (S3Exception e) { + log.debug(() -> "Error attempting to create bucket: " + bucketName); + if ("BucketAlreadyOwnedByYou".equals(e.awsErrorDetails().errorCode())) { + log.debug(() -> String.format("%s bucket already exists, likely leaked by a previous run%n", bucketName)); + } else if ("TooManyBuckets".equals(e.awsErrorDetails().errorCode())) { + log.debug(() -> "Printing all buckets for debug:"); + s3.listBuckets().buckets().forEach(l -> log.debug(l::toString)); + if (retryCount < 2) { + log.debug(() -> "Retrying..."); + createBucket(s3, bucketName, retryCount + 1, log); + } else { + throw e; + } + } else { + throw e; + } + } + + s3.waiter().waitUntilBucketExists(r -> r.bucket(bucketName)); + } + + public static String createEozBucket(S3Client s3, String bucketName, Logger log) { + String eozBucketName = bucketName; + log.debug(() -> "Creating EOZ bucket: " + eozBucketName); + CreateBucketConfiguration cfg = + CreateBucketConfiguration.builder() + .bucket(info -> info.dataRedundancy(DataRedundancy.SINGLE_AVAILABILITY_ZONE) + .type(software.amazon.awssdk.services.s3.model.BucketType.DIRECTORY)) + .location(LocationInfo.builder() + .name("usw2-az3") + .type(LocationType.AVAILABILITY_ZONE) + .build()) + .build(); + + try { + s3.createBucket(r -> r.bucket(eozBucketName).createBucketConfiguration(cfg)); + } catch (S3Exception e) { + AwsErrorDetails awsErrorDetails = e.awsErrorDetails(); + if (!"BucketAlreadyOwnedByYou".equals(awsErrorDetails.errorCode())) { + throw e; + } + } + return eozBucketName; + } + + public static String createMrap(S3ControlClient s3Control, String accountId, String mrapName, String bucketName, Logger log) + throws InterruptedException { + try { + s3Control.getMultiRegionAccessPoint(r -> r.accountId(accountId).name(mrapName)); + } catch (S3ControlException e) { + if (e.awsErrorDetails().sdkHttpResponse().statusCode() != 404) { + throw e; + } + + CreateMultiRegionAccessPointRequest createMrap = + CreateMultiRegionAccessPointRequest.builder() + .accountId(accountId) + .details(d -> d.name(mrapName) + .regions(software.amazon.awssdk.services.s3control.model.Region.builder() + .bucket(bucketName) + .build())) + .build(); + + s3Control.createMultiRegionAccessPoint(createMrap); + } + + return waitForMrapToBeReady(s3Control, accountId, mrapName, log); + } + + private static String waitForMrapToBeReady(S3ControlClient s3Control, String accountId, String mrapName, Logger log) + throws InterruptedException { + GetMultiRegionAccessPointResponse getMrapResponse = null; + + Instant waitStart = Instant.now(); + boolean initial = true; + do { + if (!initial) { + Thread.sleep(Duration.ofSeconds(10).toMillis()); + initial = true; + } + GetMultiRegionAccessPointResponse response = + s3Control.getMultiRegionAccessPoint(r -> r.accountId(accountId).name(mrapName)); + log.debug(() -> "Wait response: " + response); + getMrapResponse = response; + } while (MultiRegionAccessPointStatus.READY != getMrapResponse.accessPoint().status() + && Duration.between(Instant.now(), waitStart).compareTo(Duration.ofMinutes(5)) < 0); + + return "arn:aws:s3::" + accountId + ":accesspoint/" + getMrapResponse.accessPoint().alias(); + } + + public static String getAccountId(StsClient sts) { + return sts.getCallerIdentity().account(); + } + + public static String createAccessPoint(S3ControlClient s3Control, String accountId, String apName, String bucketName) { + try { + s3Control.getAccessPoint(r -> r.accountId(accountId).name(apName)); + } catch (S3ControlException e) { + if (e.awsErrorDetails().sdkHttpResponse().statusCode() != 404) { + throw e; + } + + s3Control.createAccessPoint(r -> r.bucket(bucketName).name(apName).accountId(accountId)); + } + + // wait for AP to be ready + return s3Control.getAccessPoint(r -> r.accountId(accountId).name(apName)).accessPointArn(); + } + + + public static void assumeNotAccessPointWithPathStyle(TestConfig config) { + BucketType bucketType = config.getBucketType(); + Assumptions.assumeFalse(config.isForcePathStyle() && bucketType.isArnType(), + "Path style doesn't work with ARN type buckets"); + } + + public static void assumeNotAccelerateWithPathStyle(TestConfig config) { + Assumptions.assumeFalse(config.isForcePathStyle() && config.isAccelerateEnabled(), + "Path style doesn't work with Accelerate"); + } + + public static void assumeNotAccelerateWithArnType(TestConfig config) { + Assumptions.assumeFalse(config.isAccelerateEnabled() && config.getBucketType().isArnType(), + "Accelerate doesn't work with ARN buckets"); + } + + public static void assumeNotAccelerateWithEoz(TestConfig config) { + Assumptions.assumeFalse(config.isAccelerateEnabled() && config.getBucketType() == BucketType.EOZ, + "Accelerate is not supported with Express One Zone"); + } + + + public static String crc32(String s) { + return crc32(s.getBytes(StandardCharsets.UTF_8)); + } + + public static String crc32(byte[] bytes) { + CRC32.reset(); + CRC32.update(bytes); + return BinaryUtils.toBase64(CRC32.getChecksumBytes()); + } + + public static String crc32(Path p) throws IOException { + CRC32.reset(); + + byte[] buff = new byte[4096]; + int read; + try (InputStream is = Files.newInputStream(p)) { + while (true) { + read = is.read(buff); + if (read == -1) { + break; + } + CRC32.update(buff, 0, read); + } + } + + return BinaryUtils.toBase64(CRC32.getChecksumBytes()); + } + + public static S3Client makeSyncClient(TestConfig config, Region region, AwsCredentialsProvider provider) { + switch (config.getFlavor()) { + case STANDARD_SYNC: + return S3Client.builder() + .forcePathStyle(config.isForcePathStyle()) + .requestChecksumCalculation(config.getRequestChecksumValidation()) + .region(region) + .credentialsProvider(provider) + .accelerate(config.isAccelerateEnabled()) + .build(); + default: + throw new RuntimeException("Unsupported sync flavor: " + config.getFlavor()); + } + } + + public static S3AsyncClient makeAsyncClient(TestConfig config, Region region, AwsCredentialsProvider provider) { + switch (config.getFlavor()) { + case STANDARD_ASYNC: + return S3AsyncClient.builder() + .forcePathStyle(config.isForcePathStyle()) + .requestChecksumCalculation(config.getRequestChecksumValidation()) + .region(region) + .credentialsProvider(provider) + .accelerate(config.isAccelerateEnabled()) + .build(); + case MULTIPART_ENABLED: + return S3AsyncClient.builder() + .forcePathStyle(config.isForcePathStyle()) + .requestChecksumCalculation(config.getRequestChecksumValidation()) + .region(region) + .credentialsProvider(provider) + .accelerate(config.isAccelerateEnabled()) + .multipartEnabled(true) + .build(); + case CRT_BASED: { + return S3AsyncClient.crtBuilder() + .forcePathStyle(config.isForcePathStyle()) + .requestChecksumCalculation(config.getRequestChecksumValidation()) + .region(region) + .credentialsProvider(provider) + .accelerate(config.isAccelerateEnabled()) + .build(); + } + default: + throw new RuntimeException("Unsupported async flavor: " + config.getFlavor()); + } + } + + public static S3Client makeSyncClient(TestConfig config, ClientOverrideConfiguration overrideConfiguration, + Region region, AwsCredentialsProvider provider) { + switch (config.getFlavor()) { + case STANDARD_SYNC: + return S3Client.builder() + .overrideConfiguration(overrideConfiguration) + .forcePathStyle(config.isForcePathStyle()) + .requestChecksumCalculation(config.getRequestChecksumValidation()) + .region(region) + .credentialsProvider(provider) + .accelerate(config.isAccelerateEnabled()) + .build(); + default: + throw new RuntimeException("Unsupported sync flavor: " + config.getFlavor()); + } + } + + public static S3AsyncClient makeAsyncClient(TestConfig config, ClientOverrideConfiguration overrideConfiguration, + Region region, AwsCredentialsProvider provider) { + switch (config.getFlavor()) { + case STANDARD_ASYNC: + return S3AsyncClient.builder() + .overrideConfiguration(overrideConfiguration) + .forcePathStyle(config.isForcePathStyle()) + .requestChecksumCalculation(config.getRequestChecksumValidation()) + .region(region) + .credentialsProvider(provider) + .accelerate(config.isAccelerateEnabled()) + .build(); + case MULTIPART_ENABLED: + return S3AsyncClient.builder() + .overrideConfiguration(overrideConfiguration) + .forcePathStyle(config.isForcePathStyle()) + .requestChecksumCalculation(config.getRequestChecksumValidation()) + .region(region) + .credentialsProvider(provider) + .accelerate(config.isAccelerateEnabled()) + .multipartEnabled(true) + .build(); + case CRT_BASED: { + return S3AsyncClient.crtBuilder() + .forcePathStyle(config.isForcePathStyle()) + .requestChecksumCalculation(config.getRequestChecksumValidation()) + .region(region) + .credentialsProvider(provider) + .accelerate(config.isAccelerateEnabled()) + .build(); + } + default: + throw new RuntimeException("Unsupported async flavor: " + config.getFlavor()); + } + } + + public static S3TransferManager makeTm(TestConfig config, ClientOverrideConfiguration overrideConfiguration, + Region region, AwsCredentialsProvider provider) { + S3AsyncClient s3AsyncClient = makeAsyncClient(config, overrideConfiguration, region, provider); + return S3TransferManager.builder().s3Client(s3AsyncClient).build(); + } + + public static Path createRandomFile16KB() throws IOException { + Path tmp = Files.createTempFile(null, null); + byte[] randomBytes = new byte[1024]; + new Random().nextBytes(randomBytes); + try (OutputStream os = Files.newOutputStream(tmp)) { + for (int i = 0; i < 16; ++i) { + os.write(randomBytes); + } + } + return tmp; + } + + public static Path createRandomFile80MB() throws IOException { + Path tmp = Files.createTempFile(null, null); + byte[] randomBytes = new byte[1024 * 1024]; + new Random().nextBytes(randomBytes); + try (OutputStream os = Files.newOutputStream(tmp)) { + for (int i = 0; i < 80; ++i) { + os.write(randomBytes); + } + } + return tmp; + } + + public static String randomKey() { + return BinaryUtils.toHex(UUID.randomUUID().toString().getBytes()); + } + +} diff --git a/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/S3ClientFlavor.java b/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/S3ClientFlavor.java new file mode 100644 index 000000000000..54ddcd87e8e5 --- /dev/null +++ b/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/S3ClientFlavor.java @@ -0,0 +1,34 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.s3.regression; + +public enum S3ClientFlavor { + STANDARD_SYNC(false), + STANDARD_ASYNC(true), + MULTIPART_ENABLED(true), + CRT_BASED(true) + ; + + private final boolean async; + + private S3ClientFlavor(boolean async) { + this.async = async; + } + + public boolean isAsync() { + return async; + } +} diff --git a/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/TestCallable.java b/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/TestCallable.java new file mode 100644 index 000000000000..03e3629e7dfd --- /dev/null +++ b/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/TestCallable.java @@ -0,0 +1,37 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.s3.regression; + +import java.util.concurrent.Callable; +import software.amazon.awssdk.utils.SdkAutoCloseable; + +public class TestCallable { + private SdkAutoCloseable client; + private Callable runnable; + + TestCallable(SdkAutoCloseable client, Callable runnable) { + this.client = client; + this.runnable = runnable; + } + + public SdkAutoCloseable client() { + return client; + } + + public Callable runnable() { + return runnable; + } +} diff --git a/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/TestConfig.java b/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/TestConfig.java new file mode 100644 index 000000000000..3203faa3213a --- /dev/null +++ b/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/TestConfig.java @@ -0,0 +1,108 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.s3.regression; + +import java.util.ArrayList; +import java.util.List; +import software.amazon.awssdk.core.checksums.RequestChecksumCalculation; + +public class TestConfig { + private S3ClientFlavor flavor; + private BucketType bucketType; + private boolean forcePathStyle; + private RequestChecksumCalculation requestChecksumValidation; + private boolean accelerateEnabled; + + public S3ClientFlavor getFlavor() { + return flavor; + } + + public void setFlavor(S3ClientFlavor flavor) { + this.flavor = flavor; + } + + public BucketType getBucketType() { + return bucketType; + } + + public void setBucketType(BucketType bucketType) { + this.bucketType = bucketType; + } + + public boolean isForcePathStyle() { + return forcePathStyle; + } + + public void setForcePathStyle(boolean forcePathStyle) { + this.forcePathStyle = forcePathStyle; + } + + public RequestChecksumCalculation getRequestChecksumValidation() { + return requestChecksumValidation; + } + + public void setRequestChecksumValidation(RequestChecksumCalculation requestChecksumValidation) { + this.requestChecksumValidation = requestChecksumValidation; + } + + public boolean isAccelerateEnabled() { + return accelerateEnabled; + } + + public void setAccelerateEnabled(boolean accelerateEnabled) { + this.accelerateEnabled = accelerateEnabled; + } + + @Override + public String toString() { + return "[" + + "flavor=" + flavor + + ", bucketType=" + bucketType + + ", forcePathStyle=" + forcePathStyle + + ", requestChecksumValidation=" + requestChecksumValidation + + ", accelerateEnabled=" + accelerateEnabled + + ']'; + } + + public static List testConfigs() { + List configs = new ArrayList<>(); + + boolean[] forcePathStyle = {true, false}; + RequestChecksumCalculation[] checksumValidations = {RequestChecksumCalculation.WHEN_REQUIRED, + RequestChecksumCalculation.WHEN_SUPPORTED}; + boolean[] accelerateEnabled = {true, false}; + for (boolean pathStyle : forcePathStyle) { + for (RequestChecksumCalculation checksumValidation : checksumValidations) { + for (S3ClientFlavor flavor : S3ClientFlavor.values()) { + for (BucketType bucketType : BucketType.values()) { + for (boolean accelerate : accelerateEnabled) { + TestConfig testConfig = new TestConfig(); + testConfig.setFlavor(flavor); + testConfig.setBucketType(bucketType); + testConfig.setForcePathStyle(pathStyle); + testConfig.setRequestChecksumValidation(checksumValidation); + testConfig.setAccelerateEnabled(accelerate); + configs.add(testConfig); + } + } + } + } + } + + return configs; + } + +} diff --git a/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/UploadStreamingRegressionTesting.java b/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/UploadStreamingRegressionTesting.java new file mode 100644 index 000000000000..85f532d5a140 --- /dev/null +++ b/test/s3-tests/src/it/java/software/amazon/awssdk/services/s3/regression/UploadStreamingRegressionTesting.java @@ -0,0 +1,828 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.s3.regression; + +import static org.assertj.core.api.Assertions.assertThat; +import static software.amazon.awssdk.services.s3.regression.S3ChecksumsTestUtils.assumeNotAccelerateWithArnType; +import static software.amazon.awssdk.services.s3.regression.S3ChecksumsTestUtils.assumeNotAccelerateWithEoz; +import static software.amazon.awssdk.services.s3.regression.S3ChecksumsTestUtils.assumeNotAccelerateWithPathStyle; +import static software.amazon.awssdk.services.s3.regression.S3ChecksumsTestUtils.assumeNotAccessPointWithPathStyle; +import static software.amazon.awssdk.services.s3.regression.S3ChecksumsTestUtils.crc32; +import static software.amazon.awssdk.services.s3.regression.S3ChecksumsTestUtils.makeAsyncClient; +import static software.amazon.awssdk.services.s3.regression.S3ChecksumsTestUtils.makeSyncClient; +import static software.amazon.awssdk.services.s3.regression.S3ClientFlavor.MULTIPART_ENABLED; +import static software.amazon.awssdk.services.s3.regression.S3ClientFlavor.STANDARD_ASYNC; +import static software.amazon.awssdk.services.s3.regression.TestConfig.testConfigs; + +import io.reactivex.Flowable; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Consumer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import software.amazon.awssdk.auth.signer.S3SignerExecutionAttribute; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody; +import software.amazon.awssdk.core.async.BlockingOutputStreamAsyncRequestBody; +import software.amazon.awssdk.core.checksums.RequestChecksumCalculation; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.interceptor.Context; +import software.amazon.awssdk.core.interceptor.ExecutionAttributes; +import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.http.SdkHttpMethod; +import software.amazon.awssdk.http.SdkHttpRequest; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.ChecksumMode; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.transfer.s3.S3TransferManager; +import software.amazon.awssdk.transfer.s3.model.CompletedUpload; +import software.amazon.awssdk.transfer.s3.model.Upload; +import software.amazon.awssdk.utils.CancellableOutputStream; +import software.amazon.awssdk.utils.CompletableFutureUtils; +import software.amazon.awssdk.utils.FunctionalUtils; +import software.amazon.awssdk.utils.Logger; + +public class UploadStreamingRegressionTesting extends BaseS3RegressionTest { + private static final Logger LOG = Logger.loggerFor(UploadStreamingRegressionTesting.class); + + private static final ExecutorService ASYNC_REQUEST_BODY_EXECUTOR = Executors.newSingleThreadExecutor(); + + static final byte[] smallContent = "Hello world".getBytes(StandardCharsets.UTF_8); + static final byte[] largeContent = largeContent(); + static final String smallContentCrc32 = crc32(smallContent); + static final String largeContentCrc32 = crc32(largeContent); + + static String smallContentCRC32ForBuffersAPI; + static String largeContentCRC32ForBuffersAPI; + + private static Path testFileSmall; + private static Path testFileLarge; + + @BeforeAll + static void setupClass() throws IOException { + testFileSmall = S3ChecksumsTestUtils.createRandomFile16KB(); + testFileLarge = S3ChecksumsTestUtils.createRandomFile80MB(); + + // used in RequestBody.*buffers(...) API + // we calculate crc32 once to try to accelerate test execution + byte[] crcArraySmallContentForBuffersApi = new byte[smallContent.length + smallContent.length]; + System.arraycopy(smallContent, 0, crcArraySmallContentForBuffersApi, 0, smallContent.length); + System.arraycopy(smallContent, 0, crcArraySmallContentForBuffersApi, smallContent.length, smallContent.length); + smallContentCRC32ForBuffersAPI = crc32(crcArraySmallContentForBuffersApi); + + byte[] crcArrayLargeContentForBuffersApi = new byte[largeContent.length + largeContent.length]; + System.arraycopy(largeContent, 0, crcArrayLargeContentForBuffersApi, 0, largeContent.length); + System.arraycopy(largeContent, 0, crcArrayLargeContentForBuffersApi, largeContent.length, largeContent.length); + largeContentCRC32ForBuffersAPI = crc32(crcArrayLargeContentForBuffersApi); + } + + @AfterAll + public static void cleanup() { + ASYNC_REQUEST_BODY_EXECUTOR.shutdownNow(); + } + + @ParameterizedTest + @MethodSource("uploadConfigs") + void putObject(UploadConfig config) throws Exception { + assumeNotAccelerateWithPathStyle(config.getBaseConfig()); + assumeNotAccessPointWithPathStyle(config.getBaseConfig()); + assumeNotAccelerateWithArnType(config.getBaseConfig()); + assumeNotAccelerateWithEoz(config.getBaseConfig()); + + // For testing purposes, ContentProvider is Publisher for async clients + // There is no way to create AsyncRequestBody with a Publisher and also provide the content length + S3ClientFlavor flavor = config.getBaseConfig().getFlavor(); + Assumptions.assumeFalse(config.getBodyType() == BodyType.CONTENT_PROVIDER_WITH_LENGTH + && flavor.isAsync(), + "No way to create AsyncRequestBody by giving both an Publisher and the content length"); + + // Payload signing doesn't work correctly for async java based + // TODO(sra-identity-auth) remove when chunked encoding support is added in async code path + Assumptions.assumeFalse( + (flavor == STANDARD_ASYNC || flavor == MULTIPART_ENABLED) + && (config.payloadSigning() + // MRAP requires body signing + || config.getBaseConfig().getBucketType() == BucketType.MRAP), + "Async payload signing doesn't work with Java based clients"); + + // For testing purposes, ContentProvider is Publisher for async clients + // Async java based clients don't currently support unknown content-length bodies + Assumptions.assumeFalse( + flavor == STANDARD_ASYNC + && config.getBodyType() == BodyType.CONTENT_PROVIDER_NO_LENGTH, + "Async Java based support unknown content length"); + + LOG.debug(() -> "Running putObject with config: " + config); + + BucketType bucketType = config.getBaseConfig().getBucketType(); + + String bucket = bucketForType(bucketType); + String key = S3ChecksumsTestUtils.randomKey(); + + PutObjectRequest request = PutObjectRequest.builder() + .bucket(bucket) + .key(key) + .build(); + + + RequestRecorder recorder = new RequestRecorder(); + + ClientOverrideConfiguration.Builder overrideConfiguration = + ClientOverrideConfiguration.builder() + .addExecutionInterceptor(recorder); + + if (config.payloadSigning()) { + overrideConfiguration.addExecutionInterceptor(new EnablePayloadSigningInterceptor()); + } + + TestCallable callable = null; + try { + + Long actualContentLength = null; + boolean requestBodyHasContentLength = false; + String actualCrc32; + + if (!flavor.isAsync()) { + TestRequestBody body = getRequestBody(config.getBodyType(), config.getContentSize()); + callable = callPutObject(request, body, config.getBaseConfig(), overrideConfiguration.build()); + actualContentLength = body.getActualContentLength(); + requestBodyHasContentLength = body.optionalContentLength().isPresent(); + actualCrc32 = body.getChecksum(); + } else if (flavor == MULTIPART_ENABLED) { + TestAsyncBody body = getAsyncRequestBody(config.getBodyType(), config.contentSize); + callable = callTmUpload(request, body, config.getBaseConfig(), overrideConfiguration.build()); + actualContentLength = body.getActualContentLength(); + requestBodyHasContentLength = body.getAsyncRequestBody().contentLength().isPresent(); + actualCrc32 = body.getChecksum(); + } else { + TestAsyncBody body = getAsyncRequestBody(config.getBodyType(), config.contentSize); + callable = callPutObject(request, body, config.getBaseConfig(), overrideConfiguration.build()); + actualContentLength = body.getActualContentLength(); + requestBodyHasContentLength = body.getAsyncRequestBody().contentLength().isPresent(); + actualCrc32 = body.getChecksum(); + } + + PutObjectResponse response = callable.runnable().call(); + + recordObjectToCleanup(bucketType, key); + + // mpu not supported + if (flavor == MULTIPART_ENABLED) { + return; + } + + // We only validate when configured to WHEN_SUPPORTED since checksums are optional for PutObject + if (config.getBaseConfig().getRequestChecksumValidation() == RequestChecksumCalculation.WHEN_SUPPORTED + // CRT switches to MPU under the hood which doesn't support checksums + && flavor != S3ClientFlavor.CRT_BASED) { + assertThat(response.checksumCRC32()).isEqualTo(actualCrc32); + } + + // We can't set an execution interceptor when using CRT + if (flavor == S3ClientFlavor.CRT_BASED) { + return; + } + + assertThat(recorder.getRequests()).isNotEmpty(); + + for (SdkHttpRequest httpRequest : recorder.getRequests()) { + // skip any non-PUT requests, e.g. GetSession for EOZ requests + if (httpRequest.method() != SdkHttpMethod.PUT) { + continue; + } + + String payloadSha = httpRequest.firstMatchingHeader("x-amz-content-sha256").get(); + if (payloadSha.startsWith("STREAMING")) { + String decodedContentLength = httpRequest.firstMatchingHeader("x-amz-decoded-content-length").get(); + assertThat(Long.parseLong(decodedContentLength)).isEqualTo(actualContentLength); + verifyChecksumResponsePayload(config, key, actualCrc32); + } else { + Optional contentLength = httpRequest.firstMatchingHeader("Content-Length"); + if (requestBodyHasContentLength) { + assertThat(Long.parseLong(contentLength.get())).isEqualTo(actualContentLength); + } + } + } + + } finally { + if (callable != null) { + callable.client().close(); + } + } + } + + private void verifyChecksumResponsePayload(UploadConfig config, String key, String expectedCRC32) { + String bucket = bucketForType(config.getBaseConfig().getBucketType()); + ResponseInputStream response = s3.getObject(req -> req.checksumMode(ChecksumMode.ENABLED) + .key(key) + .bucket(bucket)); + assertThat(response.response().checksumCRC32()).isEqualTo(expectedCRC32); + + } + + private TestCallable callPutObject(PutObjectRequest request, TestRequestBody requestBody, + TestConfig config, + ClientOverrideConfiguration overrideConfiguration) { + S3Client s3Client = makeSyncClient(config, overrideConfiguration, REGION, CREDENTIALS_PROVIDER_CHAIN); + Callable callable = () -> { + try { + return s3Client.putObject(request, requestBody); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + return new TestCallable<>(s3Client, callable); + } + + private TestCallable callPutObject(PutObjectRequest request, TestAsyncBody requestBody, TestConfig config, + ClientOverrideConfiguration overrideConfiguration) { + S3AsyncClient s3Client = makeAsyncClient(config, overrideConfiguration, REGION, CREDENTIALS_PROVIDER_CHAIN); + Callable callable = () -> { + try { + AsyncRequestBody asyncRequestBody = requestBody.getAsyncRequestBody(); + CompletableFuture future = s3Client.putObject(request, asyncRequestBody); + performWriteIfNeeded(requestBody); + return CompletableFutureUtils.joinLikeSync(future); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + return new TestCallable<>(s3Client, callable); + } + + private TestCallable callTmUpload(PutObjectRequest request, TestAsyncBody requestBody, TestConfig config, + ClientOverrideConfiguration overrideConfiguration) { + S3TransferManager transferManager = S3ChecksumsTestUtils.makeTm(config, overrideConfiguration, + REGION, CREDENTIALS_PROVIDER_CHAIN); + Callable callable = () -> { + try { + Upload upload = transferManager.upload( + r -> r.requestBody(requestBody.getAsyncRequestBody()).putObjectRequest(request)); + performWriteIfNeeded(requestBody); + CompletedUpload completedUpload = CompletableFutureUtils.joinLikeSync(upload.completionFuture()); + return completedUpload.response(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + return new TestCallable<>(transferManager, callable); + } + + private TestRequestBody getRequestBody(BodyType bodyType, ContentSize contentSize) throws IOException { + switch (bodyType) { + case STRING: { + String content = contentSize.stringContent(); + return new TestRequestBody(RequestBody.fromString(content), + content.getBytes(StandardCharsets.UTF_8).length, + crc32(content)); + } + case FILE: + return new TestRequestBody(RequestBody.fromFile(contentSize.fileContent()), + Files.size(contentSize.fileContent()), crc32(contentSize.fileContent())); + case CONTENT_PROVIDER_NO_LENGTH: { + RequestBody wrapped = + RequestBody.fromContentProvider(() -> FunctionalUtils.invokeSafely(() -> Files.newInputStream(contentSize.fileContent())), + "application/octet-stream"); + + return new TestRequestBody(wrapped, Files.size(contentSize.fileContent()), crc32(contentSize.fileContent())); + } + case CONTENT_PROVIDER_WITH_LENGTH: { + long contentLength = Files.size(contentSize.fileContent()); + RequestBody wrapped = + RequestBody.fromContentProvider(() -> FunctionalUtils.invokeSafely(() -> Files.newInputStream(contentSize.fileContent())), + Files.size(contentSize.fileContent()), + "application/octet-stream"); + return new TestRequestBody(wrapped, contentLength, crc32(contentSize.fileContent())); + } + case INPUTSTREAM_RESETABLE: { + byte[] content = contentSize.byteContent(); + RequestBody wrapped = RequestBody.fromInputStream(new ByteArrayInputStream(content), content.length); + return new TestRequestBody(wrapped, content.length, contentSize.precalculatedCrc32()); + } + case INPUTSTREAM_NOT_RESETABLE: { + byte[] content = contentSize.byteContent(); + RequestBody wrapped = RequestBody.fromInputStream(new NonResettableByteStream(content), content.length); + return new TestRequestBody(wrapped, content.length, contentSize.precalculatedCrc32()); + } + case BYTES: { + byte[] content = contentSize.byteContent(); + RequestBody wrapped = RequestBody.fromBytes(content); + return new TestRequestBody(wrapped, content.length, contentSize.precalculatedCrc32()); + } + case BYTE_BUFFER: { + byte[] content = contentSize.byteContent(); + RequestBody wrapped = RequestBody.fromByteBuffer(ByteBuffer.wrap(content)); + return new TestRequestBody(wrapped, content.length, contentSize.precalculatedCrc32()); + } + case REMAINING_BYTE_BUFFER: { + byte[] content = contentSize.byteContent(); + ByteBuffer buff = ByteBuffer.wrap(content); + int offset = 2; + buff.position(offset); + RequestBody asyncRequestBody = RequestBody.fromRemainingByteBuffer(buff); + byte[] crcArray = Arrays.copyOfRange(content, offset, content.length); + return new TestRequestBody(asyncRequestBody, content.length - offset, crc32(crcArray)); + } + case BUFFERS: + case BUFFERS_REMAINING: + case BUFFERS_UNSAFE: + case BUFFERS_REMAINING_UNSAFE: + case BYTES_UNSAFE: + case BYTE_BUFFER_UNSAFE: + case REMAINING_BYTE_BUFFER_UNSAFE: + case BLOCKING_INPUT_STREAM: + case BLOCKING_OUTPUT_STREAM: + case INPUTSTREAM_NO_LENGTH: + Assumptions.abort("Test BodyType not supported for sync client: " + bodyType); + default: + throw new RuntimeException("Unsupported body type: " + bodyType); + } + } + + private TestAsyncBody getAsyncRequestBody(BodyType bodyType, ContentSize contentSize) throws IOException { + switch (bodyType) { + case STRING: { + String content = contentSize.stringContent(); + return new TestAsyncBody(AsyncRequestBody.fromString(content), content.getBytes(StandardCharsets.UTF_8).length, + crc32(content), bodyType); + } + case FILE: { + long contentLength = Files.size(contentSize.fileContent()); + return new TestAsyncBody(AsyncRequestBody.fromFile(contentSize.fileContent()), contentLength, + crc32(contentSize.fileContent()), bodyType); + } + case INPUTSTREAM_RESETABLE: { + byte[] content = contentSize.byteContent(); + AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromInputStream(new ByteArrayInputStream(content), + (long) content.length, + ASYNC_REQUEST_BODY_EXECUTOR); + return new TestAsyncBody(asyncRequestBody, content.length, contentSize.precalculatedCrc32(), bodyType); + } + case INPUTSTREAM_NOT_RESETABLE: { + byte[] content = contentSize.byteContent(); + AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromInputStream(new NonResettableByteStream(content), + (long) content.length, + ASYNC_REQUEST_BODY_EXECUTOR); + return new TestAsyncBody(asyncRequestBody, content.length, contentSize.precalculatedCrc32(), bodyType); + } + case INPUTSTREAM_NO_LENGTH: { + byte[] content = contentSize.byteContent(); + AsyncRequestBody asyncRequestBody = AsyncRequestBody + .fromInputStream(conf -> conf.inputStream(new ByteArrayInputStream(content)) + .executor(ASYNC_REQUEST_BODY_EXECUTOR)); + return new TestAsyncBody(asyncRequestBody, content.length, contentSize.precalculatedCrc32(), bodyType); + } + case CONTENT_PROVIDER_NO_LENGTH: { + byte[] content = contentSize.byteContent(); + Flowable publisher = Flowable.just(ByteBuffer.wrap(content)); + AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromPublisher(publisher); + return new TestAsyncBody(asyncRequestBody, content.length, contentSize.precalculatedCrc32(), bodyType); + } + case BYTES: { + byte[] content = contentSize.byteContent(); + AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromBytes(content); + return new TestAsyncBody(asyncRequestBody, content.length, contentSize.precalculatedCrc32(), bodyType); + } + case BYTE_BUFFER: { + byte[] content = contentSize.byteContent(); + AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromByteBuffer(ByteBuffer.wrap(content)); + return new TestAsyncBody(asyncRequestBody, content.length, contentSize.precalculatedCrc32(), bodyType); + } + case REMAINING_BYTE_BUFFER: { + byte[] content = contentSize.byteContent(); + ByteBuffer buff = ByteBuffer.wrap(content); + int offset = 2; + buff.position(offset); + AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromRemainingByteBuffer(buff); + byte[] crcArray = Arrays.copyOfRange(content, offset, content.length); + return new TestAsyncBody(asyncRequestBody, content.length - offset, crc32(crcArray), bodyType); + } + case BYTES_UNSAFE: { + byte[] content = contentSize.byteContent(); + AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromBytesUnsafe(content); + return new TestAsyncBody(asyncRequestBody, content.length, contentSize.precalculatedCrc32(), bodyType); + } + case BYTE_BUFFER_UNSAFE: { + byte[] content = contentSize.byteContent(); + AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromByteBufferUnsafe(ByteBuffer.wrap(content)); + return new TestAsyncBody(asyncRequestBody, content.length, contentSize.precalculatedCrc32(), bodyType); + } + case REMAINING_BYTE_BUFFER_UNSAFE: { + byte[] content = contentSize.byteContent(); + ByteBuffer buff = ByteBuffer.wrap(content); + int offset = 2; + buff.position(offset); + AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromRemainingByteBufferUnsafe(buff); + byte[] crcArray = Arrays.copyOfRange(content, offset, content.length); + return new TestAsyncBody(asyncRequestBody, content.length - offset, crc32(crcArray), bodyType); + } + case BUFFERS: { + byte[] content1 = contentSize.byteContent(); + byte[] content2 = contentSize.byteContent(); + AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromByteBuffers(ByteBuffer.wrap(content1), + ByteBuffer.wrap(content2)); + return new TestAsyncBody(asyncRequestBody, + content1.length + content2.length, + contentSize.precalculatedCrc32forBuffersAPI(), + bodyType); + } + case BUFFERS_REMAINING: { + byte[] content1 = contentSize.byteContent(); + byte[] content2 = contentSize.byteContent(); + AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromRemainingByteBuffers(ByteBuffer.wrap(content1), + ByteBuffer.wrap(content2)); + byte[] crcArray = new byte[content2.length + content2.length]; + System.arraycopy(content1, 0, crcArray, 0, content1.length); + System.arraycopy(content2, 0, crcArray, content1.length, content2.length); + return new TestAsyncBody(asyncRequestBody, + content1.length + content2.length, + contentSize.precalculatedCrc32forBuffersAPI(), + bodyType); + } + case BUFFERS_UNSAFE: { + byte[] content1 = contentSize.byteContent(); + byte[] content2 = contentSize.byteContent(); + AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromByteBuffersUnsafe(ByteBuffer.wrap(content1), + ByteBuffer.wrap(content2)); + byte[] crcArray = new byte[content2.length + content2.length]; + System.arraycopy(content1, 0, crcArray, 0, content1.length); + System.arraycopy(content2, 0, crcArray, content1.length, content2.length); + return new TestAsyncBody(asyncRequestBody, + content1.length + content2.length, + contentSize.precalculatedCrc32forBuffersAPI(), + bodyType); + } + case BUFFERS_REMAINING_UNSAFE: { + byte[] content1 = contentSize.byteContent(); + byte[] content2 = contentSize.byteContent(); + AsyncRequestBody asyncRequestBody = AsyncRequestBody.fromRemainingByteBuffersUnsafe(ByteBuffer.wrap(content1), + ByteBuffer.wrap(content2)); + byte[] crcArray = new byte[content2.length + content2.length]; + System.arraycopy(content1, 0, crcArray, 0, content1.length); + System.arraycopy(content2, 0, crcArray, content1.length, content2.length); + return new TestAsyncBody(asyncRequestBody, + content1.length + content2.length, + contentSize.precalculatedCrc32forBuffersAPI(), + bodyType); + } + case BLOCKING_INPUT_STREAM: { + byte[] content = contentSize.byteContent(); + long streamToSendLength = content.length; + BlockingInputStreamAsyncRequestBody body = AsyncRequestBody.forBlockingInputStream(streamToSendLength); + return new TestAsyncBodyForBlockingInputStream(body, + new ByteArrayInputStream(content), + content.length, + contentSize.precalculatedCrc32(), + bodyType); + } + case BLOCKING_OUTPUT_STREAM: { + byte[] content = contentSize.byteContent(); + long streamToSendLength = content.length; + BlockingOutputStreamAsyncRequestBody body = AsyncRequestBody.forBlockingOutputStream(streamToSendLength); + Consumer bodyWrite = outputStream -> { + try { + outputStream.write(content); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + }; + return new TestAsyncBodyForBlockingOutputStream(body, + bodyWrite, + content.length, + contentSize.precalculatedCrc32(), + bodyType); + } + default: + throw new RuntimeException("Unsupported async body type: " + bodyType); + } + } + + void performWriteIfNeeded(TestAsyncBody requestBody) throws IOException { + if (requestBody.bodyType == BodyType.BLOCKING_INPUT_STREAM) { + BlockingInputStreamAsyncRequestBody body = (BlockingInputStreamAsyncRequestBody) requestBody.asyncRequestBody; + InputStream inputStream = ((TestAsyncBodyForBlockingInputStream) requestBody).inputStream; + body.writeInputStream(inputStream); + inputStream.close(); + } + if (requestBody.bodyType == BodyType.BLOCKING_OUTPUT_STREAM) { + TestAsyncBodyForBlockingOutputStream body = (TestAsyncBodyForBlockingOutputStream) requestBody; + CancellableOutputStream outputStream = + ((BlockingOutputStreamAsyncRequestBody) body.getAsyncRequestBody()).outputStream(); + body.bodyWrite.accept(outputStream); + outputStream.close(); + } + } + + private static List uploadConfigs() { + List configs = new ArrayList<>(); + + boolean[] payloadSigningEnabled = {true, false}; + for (BodyType bodyType : BodyType.values()) { + for (TestConfig baseConfig : testConfigs()) { + for (ContentSize size : ContentSize.values()) { + for (boolean payloadSigning : payloadSigningEnabled) { + UploadConfig config = new UploadConfig(); + config.setPayloadSigning(payloadSigning); + config.setBaseConfig(baseConfig); + config.setBodyType(bodyType); + config.setContentSize(size); + configs.add(config); + } + } + } + } + return configs; + } + + static class UploadConfig { + private TestConfig baseConfig; + private BodyType bodyType; + private ContentSize contentSize; + private boolean payloadSigning; + + public void setPayloadSigning(boolean payloadSigning) { + this.payloadSigning = payloadSigning; + } + + public boolean payloadSigning() { + return payloadSigning; + } + + public TestConfig getBaseConfig() { + return baseConfig; + } + + public void setBaseConfig(TestConfig baseConfig) { + this.baseConfig = baseConfig; + } + + public BodyType getBodyType() { + return bodyType; + } + + public void setBodyType(BodyType bodyType) { + this.bodyType = bodyType; + } + + public void setContentSize(ContentSize contentSize) { + this.contentSize = contentSize; + } + + public ContentSize getContentSize() { + return this.contentSize; + } + + @Override + public String toString() { + return "UploadConfig{" + + "baseConfig=" + baseConfig + + ", bodyType=" + bodyType + + ", contentSize=" + contentSize + + '}'; + } + + } + + enum BodyType { + INPUTSTREAM_RESETABLE, + INPUTSTREAM_NOT_RESETABLE, + INPUTSTREAM_NO_LENGTH, + + STRING, + + FILE, + + CONTENT_PROVIDER_WITH_LENGTH, + + CONTENT_PROVIDER_NO_LENGTH, + + BYTES, + BYTE_BUFFER, + REMAINING_BYTE_BUFFER, + + BYTES_UNSAFE, + BYTE_BUFFER_UNSAFE, + REMAINING_BYTE_BUFFER_UNSAFE, + + BUFFERS, + BUFFERS_REMAINING, + BUFFERS_UNSAFE, + BUFFERS_REMAINING_UNSAFE, + + BLOCKING_INPUT_STREAM, + BLOCKING_OUTPUT_STREAM + } + + enum ContentSize { + SMALL, + LARGE; // 200 MiB + + byte[] byteContent() { + switch (this) { + case SMALL: + return smallContent; + case LARGE: + return largeContent; + default: + throw new IllegalArgumentException("not supported ContentSize " + this); + } + } + + String stringContent() { + switch (this) { + case SMALL: + return "Hello World!"; + case LARGE: + return new String(largeContent(), StandardCharsets.UTF_8); + default: + throw new IllegalArgumentException("not supported ContentSize " + this); + } + } + + Path fileContent() { + switch (this) { + case SMALL: + return testFileSmall; + case LARGE: + return testFileLarge; + default: + throw new IllegalArgumentException("not supported ContentSize " + this); + } + } + + String precalculatedCrc32() { + switch (this) { + case SMALL: + return smallContentCrc32; + case LARGE: + return largeContentCrc32; + default: + throw new IllegalArgumentException("not supported ContentSize " + this); + } + } + + String precalculatedCrc32forBuffersAPI() { + switch (this) { + case SMALL: + return smallContentCRC32ForBuffersAPI; + case LARGE: + return largeContentCRC32ForBuffersAPI; + default: + throw new IllegalArgumentException("not supported ContentSize " + this); + } + } + + } + + private static byte[] largeContent() { + // 80 MiB + Random r = new Random(); + byte[] b = new byte[80 * 1024 * 1024]; + r.nextBytes(b); + return b; + } + + static class TestRequestBody extends RequestBody { + private final long contentLength; + private final String checksum; + + protected TestRequestBody(RequestBody wrapped, long contentLength, String checksum) { + super(wrapped.contentStreamProvider(), wrapped.optionalContentLength().orElse(null), wrapped.contentType()); + this.contentLength = contentLength; + this.checksum = checksum; + } + + public long getActualContentLength() { + return contentLength; + } + + public String getChecksum() { + return checksum; + } + } + + private static class TestAsyncBody { + private final AsyncRequestBody asyncRequestBody; + private final long actualContentLength; + private final String checksum; + private final BodyType bodyType; + + private TestAsyncBody(AsyncRequestBody asyncRequestBody, long actualContentLength, String checksum, BodyType bodyType) { + this.asyncRequestBody = asyncRequestBody; + this.actualContentLength = actualContentLength; + this.checksum = checksum; + this.bodyType = bodyType; + } + + public AsyncRequestBody getAsyncRequestBody() { + return asyncRequestBody; + } + + public long getActualContentLength() { + return actualContentLength; + } + + public String getChecksum() { + return checksum; + } + } + + private static class TestAsyncBodyForBlockingOutputStream extends TestAsyncBody { + private final Consumer bodyWrite; + + private TestAsyncBodyForBlockingOutputStream(AsyncRequestBody asyncRequestBody, + Consumer bodyWrite, + long actualContentLength, + String checksum, + BodyType bodyType) { + super(asyncRequestBody, actualContentLength, checksum, bodyType); + this.bodyWrite = bodyWrite; + } + } + + private static class TestAsyncBodyForBlockingInputStream extends TestAsyncBody { + private final InputStream inputStream; + + private TestAsyncBodyForBlockingInputStream(AsyncRequestBody asyncRequestBody, + InputStream inputStream, + long actualContentLength, + String checksum, + BodyType bodyType) { + super(asyncRequestBody, actualContentLength, checksum, bodyType); + this.inputStream = inputStream; + } + } + + private static class RequestRecorder implements ExecutionInterceptor { + private final List requests = new ArrayList<>(); + + @Override + public void beforeTransmission(Context.BeforeTransmission context, ExecutionAttributes executionAttributes) { + requests.add(context.httpRequest()); + } + + public List getRequests() { + return requests; + } + } + + private static class EnablePayloadSigningInterceptor implements ExecutionInterceptor { + @Override + public void beforeExecution(Context.BeforeExecution context, ExecutionAttributes executionAttributes) { + executionAttributes.putAttribute(S3SignerExecutionAttribute.ENABLE_PAYLOAD_SIGNING, true); + ExecutionInterceptor.super.beforeExecution(context, executionAttributes); + } + } + + private static class NonResettableByteStream extends ByteArrayInputStream { + public NonResettableByteStream(byte[] buf) { + super(buf); + } + + @Override + public boolean markSupported() { + return false; + } + + @Override + public synchronized void reset() { + throw new UnsupportedOperationException(); + } + } + +} diff --git a/test/s3-tests/src/it/resources/junit-platform.properties b/test/s3-tests/src/it/resources/junit-platform.properties new file mode 100644 index 000000000000..2c0aa9e64b6a --- /dev/null +++ b/test/s3-tests/src/it/resources/junit-platform.properties @@ -0,0 +1,20 @@ +# +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). +# You may not use this file except in compliance with the License. +# A copy of the License is located at +# +# http://aws.amazon.com/apache2.0 +# +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing +# permissions and limitations under the License. +# + +junit.jupiter.execution.parallel.enabled = true +junit.jupiter.execution.parallel.config.dynamic.factor = 8 + +junit.jupiter.execution.parallel.mode.default = same_thread +junit.jupiter.execution.parallel.mode.classes.default = concurrent \ No newline at end of file diff --git a/test/s3-tests/src/it/resources/log4j2.xml b/test/s3-tests/src/it/resources/log4j2.xml index 0ae8e7c45738..23536e3e7586 100644 --- a/test/s3-tests/src/it/resources/log4j2.xml +++ b/test/s3-tests/src/it/resources/log4j2.xml @@ -24,6 +24,7 @@ +