Skip to content

Commit f980346

Browse files
authored
[FLINK-36633] Add support for Flink 1.20 in Flink OpenSearch connector
Signed-off-by: Andriy Redko <[email protected]>
1 parent 926c127 commit f980346

File tree

6 files changed

+37
-93
lines changed

6 files changed

+37
-93
lines changed

.github/workflows/weekly.yml

+6-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,12 @@ jobs:
3737
jdk: '8, 11, 17, 21',
3838
branch: main
3939
}, {
40-
flink: 1.20-SNAPSHOT,
40+
flink: 1.20-SNAPSHOT,
41+
jdk: '8, 11, 17, 21',
42+
branch: main
43+
},
44+
{
45+
flink: 1.20,
4146
jdk: '8, 11, 17, 21',
4247
branch: main
4348
}, {

flink-connector-opensearch-base/src/test/java/org/apache/flink/connector/opensearch/test/DockerImageVersions.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,6 @@
2222
* integration tests.
2323
*/
2424
public class DockerImageVersions {
25-
public static final String OPENSEARCH_1 = "opensearchproject/opensearch:1.3.15";
26-
public static final String OPENSEARCH_2 = "opensearchproject/opensearch:2.13.0";
25+
public static final String OPENSEARCH_1 = "opensearchproject/opensearch:1.3.19";
26+
public static final String OPENSEARCH_2 = "opensearchproject/opensearch:2.17.1";
2727
}

flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java

+2-20
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,15 @@
1717

1818
package org.apache.flink.connector.opensearch.sink;
1919

20-
import org.apache.flink.api.common.operators.MailboxExecutor;
2120
import org.apache.flink.api.connector.sink2.Sink.InitContext;
2221
import org.apache.flink.connector.base.DeliveryGuarantee;
2322
import org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
2423
import org.apache.flink.metrics.MetricGroup;
2524
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
25+
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
2626
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
27-
import org.apache.flink.util.FlinkRuntimeException;
2827
import org.apache.flink.util.SimpleUserCodeClassLoader;
2928
import org.apache.flink.util.TestLoggerExtension;
30-
import org.apache.flink.util.function.ThrowingRunnable;
3129

3230
import org.apache.http.HttpHost;
3331
import org.junit.jupiter.api.DynamicTest;
@@ -163,8 +161,7 @@ void testOverrideBulkResponseInspectorFactory() {
163161
TestingSinkWriterMetricGroup.getSinkWriterMetricGroup(
164162
new UnregisteredMetricsGroup()));
165163

166-
Mockito.when(sinkInitContext.getMailboxExecutor())
167-
.thenReturn(new OpensearchSinkBuilderTest.DummyMailboxExecutor());
164+
Mockito.when(sinkInitContext.getMailboxExecutor()).thenReturn(new SyncMailboxExecutor());
168165
Mockito.when(sinkInitContext.getProcessingTimeService())
169166
.thenReturn(new TestProcessingTimeService());
170167
Mockito.when(sinkInitContext.getUserCodeClassLoader())
@@ -176,21 +173,6 @@ void testOverrideBulkResponseInspectorFactory() {
176173
assertThat(called).isTrue();
177174
}
178175

179-
private static class DummyMailboxExecutor implements MailboxExecutor {
180-
private DummyMailboxExecutor() {}
181-
182-
public void execute(
183-
ThrowingRunnable<? extends Exception> command,
184-
String descriptionFormat,
185-
Object... descriptionArgs) {}
186-
187-
public void yield() throws InterruptedException, FlinkRuntimeException {}
188-
189-
public boolean tryYield() throws FlinkRuntimeException {
190-
return false;
191-
}
192-
}
193-
194176
private OpensearchSinkBuilder<Object> createEmptyBuilder() {
195177
return new OpensearchSinkBuilder<>();
196178
}

flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java

+2-29
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.flink.connector.opensearch.sink;
1919

20-
import org.apache.flink.api.common.operators.MailboxExecutor;
2120
import org.apache.flink.api.connector.sink2.SinkWriter;
2221
import org.apache.flink.api.java.tuple.Tuple2;
2322
import org.apache.flink.connector.opensearch.OpensearchUtil;
@@ -27,11 +26,10 @@
2726
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
2827
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
2928
import org.apache.flink.metrics.testutils.MetricListener;
29+
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
3030
import org.apache.flink.runtime.metrics.MetricNames;
3131
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
32-
import org.apache.flink.util.FlinkRuntimeException;
3332
import org.apache.flink.util.TestLoggerExtension;
34-
import org.apache.flink.util.function.ThrowingRunnable;
3533

3634
import org.apache.http.HttpHost;
3735
import org.junit.jupiter.api.AfterEach;
@@ -331,7 +329,7 @@ private OpensearchWriter<Tuple2<Integer, String>> createWriter(
331329
null,
332330
true),
333331
metricGroup,
334-
new TestMailbox(),
332+
new SyncMailboxExecutor(),
335333
new DefaultRestClientFactory(),
336334
new DefaultBulkResponseInspector(failureHandler));
337335
}
@@ -376,29 +374,4 @@ public void emit(
376374
}
377375
}
378376
}
379-
380-
private static class TestMailbox implements MailboxExecutor {
381-
382-
@Override
383-
public void execute(
384-
ThrowingRunnable<? extends Exception> command,
385-
String descriptionFormat,
386-
Object... descriptionArgs) {
387-
try {
388-
command.run();
389-
} catch (Exception e) {
390-
throw new RuntimeException("Unexpected error", e);
391-
}
392-
}
393-
394-
@Override
395-
public void yield() throws InterruptedException, FlinkRuntimeException {
396-
Thread.sleep(100);
397-
}
398-
399-
@Override
400-
public boolean tryYield() throws FlinkRuntimeException {
401-
return false;
402-
}
403-
}
404377
}

flink-connector-opensearch2/src/test/java/org/apache/flink/connector/opensearch/sink/Opensearch2WriterITCase.java

+2-29
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.flink.connector.opensearch.sink;
1919

20-
import org.apache.flink.api.common.operators.MailboxExecutor;
2120
import org.apache.flink.api.connector.sink2.SinkWriter;
2221
import org.apache.flink.api.java.tuple.Tuple2;
2322
import org.apache.flink.connector.opensearch.OpensearchUtil;
@@ -27,11 +26,10 @@
2726
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
2827
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
2928
import org.apache.flink.metrics.testutils.MetricListener;
29+
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
3030
import org.apache.flink.runtime.metrics.MetricNames;
3131
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
32-
import org.apache.flink.util.FlinkRuntimeException;
3332
import org.apache.flink.util.TestLoggerExtension;
34-
import org.apache.flink.util.function.ThrowingRunnable;
3533

3634
import org.apache.http.HttpHost;
3735
import org.junit.jupiter.api.AfterEach;
@@ -331,7 +329,7 @@ private Opensearch2Writer<Tuple2<Integer, String>> createWriter(
331329
null,
332330
true),
333331
metricGroup,
334-
new TestMailbox(),
332+
new SyncMailboxExecutor(),
335333
new DefaultRestClientFactory(),
336334
new DefaultBulkResponseInspector(failureHandler));
337335
}
@@ -376,29 +374,4 @@ public void emit(
376374
}
377375
}
378376
}
379-
380-
private static class TestMailbox implements MailboxExecutor {
381-
382-
@Override
383-
public void execute(
384-
ThrowingRunnable<? extends Exception> command,
385-
String descriptionFormat,
386-
Object... descriptionArgs) {
387-
try {
388-
command.run();
389-
} catch (Exception e) {
390-
throw new RuntimeException("Unexpected error", e);
391-
}
392-
}
393-
394-
@Override
395-
public void yield() throws InterruptedException, FlinkRuntimeException {
396-
Thread.sleep(100);
397-
}
398-
399-
@Override
400-
public boolean tryYield() throws FlinkRuntimeException {
401-
return false;
402-
}
403-
}
404377
}

pom.xml

+23-12
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ under the License.
2222
<parent>
2323
<groupId>org.apache.flink</groupId>
2424
<artifactId>flink-connector-parent</artifactId>
25-
<version>1.0.0</version>
25+
<version>1.1.0</version>
2626
</parent>
2727

2828
<modelVersion>4.0.0</modelVersion>
@@ -55,7 +55,7 @@ under the License.
5555
</modules>
5656

5757
<properties>
58-
<flink.version>1.18.0</flink.version>
58+
<flink.version>1.20.0</flink.version>
5959
<commons-compress.version>1.26.1</commons-compress.version>
6060
<jackson-bom.version>2.16.2</jackson-bom.version>
6161
<junit5.version>5.10.2</junit5.version>
@@ -64,7 +64,7 @@ under the License.
6464
<mockito.version>3.12.4</mockito.version>
6565

6666
<japicmp.skip>false</japicmp.skip>
67-
<japicmp.referenceVersion>1.0.0-1.16</japicmp.referenceVersion>
67+
<japicmp.referenceVersion>1.0.0-1.19</japicmp.referenceVersion>
6868

6969
<slf4j.version>1.7.36</slf4j.version>
7070
<log4j.version>2.17.2</log4j.version>
@@ -300,7 +300,7 @@ under the License.
300300
<dependency>
301301
<groupId>net.bytebuddy</groupId>
302302
<artifactId>byte-buddy</artifactId>
303-
<version>1.14.13</version>
303+
<version>1.14.19</version>
304304
</dependency>
305305

306306
<!-- For dependency convergence -->
@@ -370,18 +370,11 @@ under the License.
370370
<scope>test</scope>
371371
</dependency>
372372

373-
<dependency>
374-
<!-- mockito/powermock mismatch -->
375-
<groupId>net.bytebuddy</groupId>
376-
<artifactId>byte-buddy</artifactId>
377-
<version>1.14.10</version>
378-
</dependency>
379-
380373
<dependency>
381374
<!-- mockito/powermock mismatch -->
382375
<groupId>net.bytebuddy</groupId>
383376
<artifactId>byte-buddy-agent</artifactId>
384-
<version>1.14.10</version>
377+
<version>1.14.19</version>
385378
</dependency>
386379
</dependencies>
387380
</dependencyManagement>
@@ -409,6 +402,24 @@ under the License.
409402
<module>flink-sql-connector-opensearch2</module>
410403
</modules>
411404
</profile>
405+
<!-- The spotless comes from flink-connector-parent and needs version bump to support JDK-21 -->
406+
<profile>
407+
<id>spotless</id>
408+
<activation>
409+
<jdk>[21,)</jdk>
410+
</activation>
411+
<build>
412+
<plugins>
413+
<plugin>
414+
<groupId>com.diffplug.spotless</groupId>
415+
<artifactId>spotless-maven-plugin</artifactId>
416+
<configuration>
417+
<skip>true</skip>
418+
</configuration>
419+
</plugin>
420+
</plugins>
421+
</build>
422+
</profile>
412423
</profiles>
413424

414425
<build>

0 commit comments

Comments
 (0)