From 95a81d8536c15245f8b6bddb097d87cc71abdb3a Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Tue, 29 Oct 2024 15:37:54 -0400 Subject: [PATCH 1/2] [FLINK-36633] Add support for Flink 1.20 in Flink OpenSearch connector Signed-off-by: Andriy Redko --- .github/workflows/weekly.yml | 7 +++- .../opensearch/test/DockerImageVersions.java | 4 +-- .../sink/OpensearchSinkBuilderTest.java | 22 ++----------- .../sink/OpensearchWriterITCase.java | 31 ++--------------- .../sink/Opensearch2WriterITCase.java | 31 ++--------------- pom.xml | 33 ++++++++++++------- 6 files changed, 36 insertions(+), 92 deletions(-) diff --git a/.github/workflows/weekly.yml b/.github/workflows/weekly.yml index 6db10f2..b8e7cf5 100644 --- a/.github/workflows/weekly.yml +++ b/.github/workflows/weekly.yml @@ -37,7 +37,12 @@ jobs: jdk: '8, 11, 17, 21', branch: main }, { - flink: 1.20-SNAPSHOT, + flink: 1.20-SNAPSHOT, + jdk: '8, 11, 17, 21', + branch: main + }, + { + flink: 1.20, jdk: '8, 11, 17, 21', branch: main }, { diff --git a/flink-connector-opensearch-base/src/test/java/org/apache/flink/connector/opensearch/test/DockerImageVersions.java b/flink-connector-opensearch-base/src/test/java/org/apache/flink/connector/opensearch/test/DockerImageVersions.java index 1ee0b2e..57bcc85 100644 --- a/flink-connector-opensearch-base/src/test/java/org/apache/flink/connector/opensearch/test/DockerImageVersions.java +++ b/flink-connector-opensearch-base/src/test/java/org/apache/flink/connector/opensearch/test/DockerImageVersions.java @@ -22,6 +22,6 @@ * integration tests. */ public class DockerImageVersions { - public static final String OPENSEARCH_1 = "opensearchproject/opensearch:1.3.15"; - public static final String OPENSEARCH_2 = "opensearchproject/opensearch:2.13.0"; + public static final String OPENSEARCH_1 = "opensearchproject/opensearch:1.3.19"; + public static final String OPENSEARCH_2 = "opensearchproject/opensearch:2.17.1"; } diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java index cf35411..bb38777 100644 --- a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java +++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java @@ -17,17 +17,15 @@ package org.apache.flink.connector.opensearch.sink; -import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.connector.sink2.Sink.InitContext; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; -import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.SimpleUserCodeClassLoader; import org.apache.flink.util.TestLoggerExtension; -import org.apache.flink.util.function.ThrowingRunnable; import org.apache.http.HttpHost; import org.junit.jupiter.api.DynamicTest; @@ -163,8 +161,7 @@ void testOverrideBulkResponseInspectorFactory() { TestingSinkWriterMetricGroup.getSinkWriterMetricGroup( new UnregisteredMetricsGroup())); - Mockito.when(sinkInitContext.getMailboxExecutor()) - .thenReturn(new OpensearchSinkBuilderTest.DummyMailboxExecutor()); + Mockito.when(sinkInitContext.getMailboxExecutor()).thenReturn(new SyncMailboxExecutor()); Mockito.when(sinkInitContext.getProcessingTimeService()) .thenReturn(new TestProcessingTimeService()); Mockito.when(sinkInitContext.getUserCodeClassLoader()) @@ -176,21 +173,6 @@ void testOverrideBulkResponseInspectorFactory() { assertThat(called).isTrue(); } - private static class DummyMailboxExecutor implements MailboxExecutor { - private DummyMailboxExecutor() {} - - public void execute( - ThrowingRunnable command, - String descriptionFormat, - Object... descriptionArgs) {} - - public void yield() throws InterruptedException, FlinkRuntimeException {} - - public boolean tryYield() throws FlinkRuntimeException { - return false; - } - } - private OpensearchSinkBuilder createEmptyBuilder() { return new OpensearchSinkBuilder<>(); } diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java index fa7fc16..dd7c545 100644 --- a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java +++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java @@ -17,7 +17,6 @@ package org.apache.flink.connector.opensearch.sink; -import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.opensearch.OpensearchUtil; @@ -27,11 +26,10 @@ import org.apache.flink.metrics.groups.OperatorIOMetricGroup; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.metrics.testutils.MetricListener; +import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; -import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.TestLoggerExtension; -import org.apache.flink.util.function.ThrowingRunnable; import org.apache.http.HttpHost; import org.junit.jupiter.api.AfterEach; @@ -331,7 +329,7 @@ private OpensearchWriter> createWriter( null, true), metricGroup, - new TestMailbox(), + new SyncMailboxExecutor(), new DefaultRestClientFactory(), new DefaultBulkResponseInspector(failureHandler)); } @@ -376,29 +374,4 @@ public void emit( } } } - - private static class TestMailbox implements MailboxExecutor { - - @Override - public void execute( - ThrowingRunnable command, - String descriptionFormat, - Object... descriptionArgs) { - try { - command.run(); - } catch (Exception e) { - throw new RuntimeException("Unexpected error", e); - } - } - - @Override - public void yield() throws InterruptedException, FlinkRuntimeException { - Thread.sleep(100); - } - - @Override - public boolean tryYield() throws FlinkRuntimeException { - return false; - } - } } diff --git a/flink-connector-opensearch2/src/test/java/org/apache/flink/connector/opensearch/sink/Opensearch2WriterITCase.java b/flink-connector-opensearch2/src/test/java/org/apache/flink/connector/opensearch/sink/Opensearch2WriterITCase.java index 92d0c09..b092d1a 100644 --- a/flink-connector-opensearch2/src/test/java/org/apache/flink/connector/opensearch/sink/Opensearch2WriterITCase.java +++ b/flink-connector-opensearch2/src/test/java/org/apache/flink/connector/opensearch/sink/Opensearch2WriterITCase.java @@ -17,7 +17,6 @@ package org.apache.flink.connector.opensearch.sink; -import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.opensearch.OpensearchUtil; @@ -27,11 +26,10 @@ import org.apache.flink.metrics.groups.OperatorIOMetricGroup; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.metrics.testutils.MetricListener; +import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; -import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.TestLoggerExtension; -import org.apache.flink.util.function.ThrowingRunnable; import org.apache.http.HttpHost; import org.junit.jupiter.api.AfterEach; @@ -331,7 +329,7 @@ private Opensearch2Writer> createWriter( null, true), metricGroup, - new TestMailbox(), + new SyncMailboxExecutor(), new DefaultRestClientFactory(), new DefaultBulkResponseInspector(failureHandler)); } @@ -376,29 +374,4 @@ public void emit( } } } - - private static class TestMailbox implements MailboxExecutor { - - @Override - public void execute( - ThrowingRunnable command, - String descriptionFormat, - Object... descriptionArgs) { - try { - command.run(); - } catch (Exception e) { - throw new RuntimeException("Unexpected error", e); - } - } - - @Override - public void yield() throws InterruptedException, FlinkRuntimeException { - Thread.sleep(100); - } - - @Override - public boolean tryYield() throws FlinkRuntimeException { - return false; - } - } } diff --git a/pom.xml b/pom.xml index e775f25..f351ee2 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ under the License. org.apache.flink flink-connector-parent - 1.0.0 + 1.1.0 4.0.0 @@ -55,7 +55,7 @@ under the License. - 1.18.0 + 1.20.0 1.26.1 2.16.2 5.10.2 @@ -64,7 +64,7 @@ under the License. 3.12.4 false - 1.0.0-1.16 + 1.0.0-1.19 1.7.36 2.17.2 @@ -370,18 +370,11 @@ under the License. test - - - net.bytebuddy - byte-buddy - 1.14.10 - - net.bytebuddy byte-buddy-agent - 1.14.10 + 1.14.13 @@ -409,6 +402,24 @@ under the License. flink-sql-connector-opensearch2 + + + spotless + + [21,) + + + + + com.diffplug.spotless + spotless-maven-plugin + + true + + + + + From 3086eb83ed68d896fab3878285ecf7989c522972 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Wed, 30 Oct 2024 13:54:56 -0400 Subject: [PATCH 2/2] Address code review comments Signed-off-by: Andriy Redko --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index f351ee2..3ce4108 100644 --- a/pom.xml +++ b/pom.xml @@ -300,7 +300,7 @@ under the License. net.bytebuddy byte-buddy - 1.14.13 + 1.14.19 @@ -374,7 +374,7 @@ under the License. net.bytebuddy byte-buddy-agent - 1.14.13 + 1.14.19