From 6075293df57874741a5cdba80c6463d2dfb10746 Mon Sep 17 00:00:00 2001 From: Daren Date: Tue, 27 May 2025 14:00:49 +0100 Subject: [PATCH] [FLINK-37733] Externalise DynamoDB connector IT Test to E2E test package --- .../pom.xml | 117 ++++++++++++++++++ .../sink/test}/DynamoDbSinkITCase.java | 22 +++- .../sink/test}/TestRequestMapper.java | 2 +- .../test}/DynamoDbDynamicSinkITCase.java | 7 +- .../testutils}/DockerImageVersions.java | 2 +- .../dynamodb/testutils/DynamoDBHelpers.java | 0 .../dynamodb/testutils/DynamoDbContainer.java | 0 .../connector/dynamodb/testutils/Item.java | 6 +- .../connector/dynamodb/testutils/Items.java | 2 +- .../src/test/resources/create-table.sql | 0 .../src/test/resources/datagen.sql | 0 .../src/test/resources/log4j2-test.properties | 28 +++++ flink-connector-aws-e2e-tests/pom.xml | 3 + 13 files changed, 178 insertions(+), 11 deletions(-) create mode 100644 flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/pom.xml rename {flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink => flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/sink/test}/DynamoDbSinkITCase.java (94%) rename {flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink => flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/sink/test}/TestRequestMapper.java (97%) rename {flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table => flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/table/test}/DynamoDbDynamicSinkITCase.java (96%) rename {flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util => flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/testutils}/DockerImageVersions.java (96%) rename {flink-connector-aws/flink-connector-dynamodb => flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests}/src/test/java/org/apache/flink/connector/dynamodb/testutils/DynamoDBHelpers.java (100%) rename {flink-connector-aws/flink-connector-dynamodb => flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests}/src/test/java/org/apache/flink/connector/dynamodb/testutils/DynamoDbContainer.java (100%) rename {flink-connector-aws/flink-connector-dynamodb => flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests}/src/test/java/org/apache/flink/connector/dynamodb/testutils/Item.java (90%) rename {flink-connector-aws/flink-connector-dynamodb => flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests}/src/test/java/org/apache/flink/connector/dynamodb/testutils/Items.java (97%) rename {flink-connector-aws/flink-connector-dynamodb => flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests}/src/test/resources/create-table.sql (100%) rename {flink-connector-aws/flink-connector-dynamodb => flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests}/src/test/resources/datagen.sql (100%) create mode 100644 flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/resources/log4j2-test.properties diff --git a/flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/pom.xml b/flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/pom.xml new file mode 100644 index 000000000..098bd7f02 --- /dev/null +++ b/flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/pom.xml @@ -0,0 +1,117 @@ + + + + + + + flink-connector-aws-e2e-tests-parent + org.apache.flink + 5.1-SNAPSHOT + + + 4.0.0 + + flink-connector-dynamodb-e2e-tests + Flink : Connectors : AWS : E2E Tests : Amazon DynamoDB + jar + + + + org.apache.flink + flink-streaming-java + ${flink.version} + test + + + + org.apache.flink + flink-table-api-java + ${flink.version} + test + + + + org.apache.flink + flink-table-runtime + ${flink.version} + test + + + + org.apache.flink + flink-table-planner-loader + ${flink.version} + test + + + + org.apache.flink + flink-connector-dynamodb + ${project.version} + test + + + + org.apache.flink + flink-connector-aws-base + ${project.version} + test + + + + org.apache.flink + flink-sql-connector-dynamodb + ${project.version} + test + + + + + com.google.guava + guava + test + + + + com.fasterxml.jackson.core + jackson-databind + test + + + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + test + + + + software.amazon.awssdk + dynamodb + test + + + + software.amazon.awssdk + dynamodb-enhanced + test + + + diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkITCase.java b/flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/sink/test/DynamoDbSinkITCase.java similarity index 94% rename from flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkITCase.java rename to flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/sink/test/DynamoDbSinkITCase.java index dafcac86c..418e3a772 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkITCase.java +++ b/flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/sink/test/DynamoDbSinkITCase.java @@ -16,14 +16,19 @@ * limitations under the License. */ -package org.apache.flink.connector.dynamodb.sink; +package org.apache.flink.connector.dynamodb.sink.test; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.dynamodb.sink.DynamoDbSink; +import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest; +import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType; +import org.apache.flink.connector.dynamodb.testutils.DockerImageVersions; import org.apache.flink.connector.dynamodb.testutils.DynamoDBHelpers; import org.apache.flink.connector.dynamodb.testutils.DynamoDbContainer; import org.apache.flink.connector.dynamodb.testutils.Item; import org.apache.flink.connector.dynamodb.testutils.Items; -import org.apache.flink.connector.dynamodb.util.DockerImageVersions; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -347,4 +352,17 @@ public Scenario withClientProperties(Properties properties) { return this; } } + + private static class TestDynamoDbElementConverter + implements ElementConverter, DynamoDbWriteRequest> { + + @Override + public DynamoDbWriteRequest apply( + Map elements, SinkWriter.Context context) { + return DynamoDbWriteRequest.builder() + .setType(DynamoDbWriteRequestType.PUT) + .setItem(elements) + .build(); + } + } } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/TestRequestMapper.java b/flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/sink/test/TestRequestMapper.java similarity index 97% rename from flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/TestRequestMapper.java rename to flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/sink/test/TestRequestMapper.java index 27e91fce4..cc4ec7cff 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/TestRequestMapper.java +++ b/flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/sink/test/TestRequestMapper.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.dynamodb.sink; +package org.apache.flink.connector.dynamodb.sink.test; import org.apache.flink.api.common.functions.RichMapFunction; diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkITCase.java b/flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/table/test/DynamoDbDynamicSinkITCase.java similarity index 96% rename from flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkITCase.java rename to flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/table/test/DynamoDbDynamicSinkITCase.java index e1b434bdb..51dcece2d 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkITCase.java +++ b/flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/table/test/DynamoDbDynamicSinkITCase.java @@ -16,12 +16,13 @@ * limitations under the License. */ -package org.apache.flink.connector.dynamodb.table; +package org.apache.flink.connector.dynamodb.table.test; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.connector.dynamodb.table.DynamoDbDynamicSink; +import org.apache.flink.connector.dynamodb.testutils.DockerImageVersions; import org.apache.flink.connector.dynamodb.testutils.DynamoDBHelpers; import org.apache.flink.connector.dynamodb.testutils.DynamoDbContainer; -import org.apache.flink.connector.dynamodb.util.DockerImageVersions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; @@ -49,7 +50,7 @@ import java.util.UUID; import java.util.concurrent.ExecutionException; -/** Integration test for {@link org.apache.flink.connector.dynamodb.table.DynamoDbDynamicSink}. */ +/** Integration test for {@link DynamoDbDynamicSink}. */ @Testcontainers @ExtendWith(MiniClusterExtension.class) public class DynamoDbDynamicSinkITCase { diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DockerImageVersions.java b/flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/testutils/DockerImageVersions.java similarity index 96% rename from flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DockerImageVersions.java rename to flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/testutils/DockerImageVersions.java index 5fce066e0..f702d7fcb 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DockerImageVersions.java +++ b/flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/testutils/DockerImageVersions.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.dynamodb.util; +package org.apache.flink.connector.dynamodb.testutils; /** * Utility class for defining the image names and versions of Docker containers used during the Java diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/testutils/DynamoDBHelpers.java b/flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/testutils/DynamoDBHelpers.java similarity index 100% rename from flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/testutils/DynamoDBHelpers.java rename to flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/testutils/DynamoDBHelpers.java diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/testutils/DynamoDbContainer.java b/flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/testutils/DynamoDbContainer.java similarity index 100% rename from flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/testutils/DynamoDbContainer.java rename to flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/testutils/DynamoDbContainer.java diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/testutils/Item.java b/flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/testutils/Item.java similarity index 90% rename from flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/testutils/Item.java rename to flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/testutils/Item.java index 8ff0cb47f..b3a838f4e 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/testutils/Item.java +++ b/flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/testutils/Item.java @@ -26,15 +26,15 @@ /** DynamoDB item container. */ public class Item { - public static Item.ItemBuilder builder() { - return new Item.ItemBuilder(); + public static ItemBuilder builder() { + return new ItemBuilder(); } /** Builder to constrict DynamoDB item. */ public static final class ItemBuilder { Map item = new HashMap<>(); - public Item.ItemBuilder attr(String name, String value) { + public ItemBuilder attr(String name, String value) { item.put(name, AttributeValue.builder().s(value).build()); return this; } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/testutils/Items.java b/flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/testutils/Items.java similarity index 97% rename from flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/testutils/Items.java rename to flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/testutils/Items.java index 83356d2c2..337eba5c5 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/testutils/Items.java +++ b/flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/java/org/apache/flink/connector/dynamodb/testutils/Items.java @@ -28,7 +28,7 @@ public class Items { public static ItemsBuilder builder() { - return new Items.ItemsBuilder(); + return new ItemsBuilder(); } /** Builder to constrict DynamoDB items. */ diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/resources/create-table.sql b/flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/resources/create-table.sql similarity index 100% rename from flink-connector-aws/flink-connector-dynamodb/src/test/resources/create-table.sql rename to flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/resources/create-table.sql diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/resources/datagen.sql b/flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/resources/datagen.sql similarity index 100% rename from flink-connector-aws/flink-connector-dynamodb/src/test/resources/datagen.sql rename to flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/resources/datagen.sql diff --git a/flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/resources/log4j2-test.properties b/flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/resources/log4j2-test.properties new file mode 100644 index 000000000..835c2ec9a --- /dev/null +++ b/flink-connector-aws-e2e-tests/flink-connector-dynamodb-e2e-tests/src/test/resources/log4j2-test.properties @@ -0,0 +1,28 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = OFF +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n diff --git a/flink-connector-aws-e2e-tests/pom.xml b/flink-connector-aws-e2e-tests/pom.xml index d7227369e..bd1090dd9 100644 --- a/flink-connector-aws-e2e-tests/pom.xml +++ b/flink-connector-aws-e2e-tests/pom.xml @@ -42,6 +42,7 @@ under the License. flink-connector-aws-kinesis-streams-e2e-tests flink-connector-kinesis-e2e-tests flink-connector-aws-sqs-e2e-tests + flink-connector-dynamodb-e2e-tests flink-formats-avro-glue-schema-registry-e2e-tests flink-formats-json-glue-schema-registry-e2e-tests @@ -90,6 +91,7 @@ under the License. ${project.basedir} requires-aws-credentials + --add-opens java.base/java.util=ALL-UNNAMED @@ -121,6 +123,7 @@ under the License. ${project.basedir} + --add-opens java.base/java.util=ALL-UNNAMED