Skip to content

Commit b6c4291

Browse files
committed
[hotfix] Update to support Flink 1.20.0
Signed-off-by: Andriy Redko <[email protected]>
1 parent 926c127 commit b6c4291

File tree

6 files changed

+17
-84
lines changed

6 files changed

+17
-84
lines changed

.github/workflows/weekly.yml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,12 @@ jobs:
3737
jdk: '8, 11, 17, 21',
3838
branch: main
3939
}, {
40-
flink: 1.20-SNAPSHOT,
40+
flink: 1.20-SNAPSHOT,
41+
jdk: '8, 11, 17, 21',
42+
branch: main
43+
},
44+
{
45+
flink: 1.20,
4146
jdk: '8, 11, 17, 21',
4247
branch: main
4348
}, {

flink-connector-opensearch-base/src/test/java/org/apache/flink/connector/opensearch/test/DockerImageVersions.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,6 @@
2222
* integration tests.
2323
*/
2424
public class DockerImageVersions {
25-
public static final String OPENSEARCH_1 = "opensearchproject/opensearch:1.3.15";
26-
public static final String OPENSEARCH_2 = "opensearchproject/opensearch:2.13.0";
25+
public static final String OPENSEARCH_1 = "opensearchproject/opensearch:1.3.19";
26+
public static final String OPENSEARCH_2 = "opensearchproject/opensearch:2.17.1";
2727
}

flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,15 @@
1717

1818
package org.apache.flink.connector.opensearch.sink;
1919

20-
import org.apache.flink.api.common.operators.MailboxExecutor;
2120
import org.apache.flink.api.connector.sink2.Sink.InitContext;
2221
import org.apache.flink.connector.base.DeliveryGuarantee;
2322
import org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
2423
import org.apache.flink.metrics.MetricGroup;
2524
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
25+
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
2626
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
27-
import org.apache.flink.util.FlinkRuntimeException;
2827
import org.apache.flink.util.SimpleUserCodeClassLoader;
2928
import org.apache.flink.util.TestLoggerExtension;
30-
import org.apache.flink.util.function.ThrowingRunnable;
3129

3230
import org.apache.http.HttpHost;
3331
import org.junit.jupiter.api.DynamicTest;
@@ -163,8 +161,7 @@ void testOverrideBulkResponseInspectorFactory() {
163161
TestingSinkWriterMetricGroup.getSinkWriterMetricGroup(
164162
new UnregisteredMetricsGroup()));
165163

166-
Mockito.when(sinkInitContext.getMailboxExecutor())
167-
.thenReturn(new OpensearchSinkBuilderTest.DummyMailboxExecutor());
164+
Mockito.when(sinkInitContext.getMailboxExecutor()).thenReturn(new SyncMailboxExecutor());
168165
Mockito.when(sinkInitContext.getProcessingTimeService())
169166
.thenReturn(new TestProcessingTimeService());
170167
Mockito.when(sinkInitContext.getUserCodeClassLoader())
@@ -176,21 +173,6 @@ void testOverrideBulkResponseInspectorFactory() {
176173
assertThat(called).isTrue();
177174
}
178175

179-
private static class DummyMailboxExecutor implements MailboxExecutor {
180-
private DummyMailboxExecutor() {}
181-
182-
public void execute(
183-
ThrowingRunnable<? extends Exception> command,
184-
String descriptionFormat,
185-
Object... descriptionArgs) {}
186-
187-
public void yield() throws InterruptedException, FlinkRuntimeException {}
188-
189-
public boolean tryYield() throws FlinkRuntimeException {
190-
return false;
191-
}
192-
}
193-
194176
private OpensearchSinkBuilder<Object> createEmptyBuilder() {
195177
return new OpensearchSinkBuilder<>();
196178
}

flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.flink.connector.opensearch.sink;
1919

20-
import org.apache.flink.api.common.operators.MailboxExecutor;
2120
import org.apache.flink.api.connector.sink2.SinkWriter;
2221
import org.apache.flink.api.java.tuple.Tuple2;
2322
import org.apache.flink.connector.opensearch.OpensearchUtil;
@@ -27,11 +26,10 @@
2726
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
2827
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
2928
import org.apache.flink.metrics.testutils.MetricListener;
29+
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
3030
import org.apache.flink.runtime.metrics.MetricNames;
3131
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
32-
import org.apache.flink.util.FlinkRuntimeException;
3332
import org.apache.flink.util.TestLoggerExtension;
34-
import org.apache.flink.util.function.ThrowingRunnable;
3533

3634
import org.apache.http.HttpHost;
3735
import org.junit.jupiter.api.AfterEach;
@@ -331,7 +329,7 @@ private OpensearchWriter<Tuple2<Integer, String>> createWriter(
331329
null,
332330
true),
333331
metricGroup,
334-
new TestMailbox(),
332+
new SyncMailboxExecutor(),
335333
new DefaultRestClientFactory(),
336334
new DefaultBulkResponseInspector(failureHandler));
337335
}
@@ -376,29 +374,4 @@ public void emit(
376374
}
377375
}
378376
}
379-
380-
private static class TestMailbox implements MailboxExecutor {
381-
382-
@Override
383-
public void execute(
384-
ThrowingRunnable<? extends Exception> command,
385-
String descriptionFormat,
386-
Object... descriptionArgs) {
387-
try {
388-
command.run();
389-
} catch (Exception e) {
390-
throw new RuntimeException("Unexpected error", e);
391-
}
392-
}
393-
394-
@Override
395-
public void yield() throws InterruptedException, FlinkRuntimeException {
396-
Thread.sleep(100);
397-
}
398-
399-
@Override
400-
public boolean tryYield() throws FlinkRuntimeException {
401-
return false;
402-
}
403-
}
404377
}

flink-connector-opensearch2/src/test/java/org/apache/flink/connector/opensearch/sink/Opensearch2WriterITCase.java

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.flink.connector.opensearch.sink;
1919

20-
import org.apache.flink.api.common.operators.MailboxExecutor;
2120
import org.apache.flink.api.connector.sink2.SinkWriter;
2221
import org.apache.flink.api.java.tuple.Tuple2;
2322
import org.apache.flink.connector.opensearch.OpensearchUtil;
@@ -27,11 +26,10 @@
2726
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
2827
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
2928
import org.apache.flink.metrics.testutils.MetricListener;
29+
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
3030
import org.apache.flink.runtime.metrics.MetricNames;
3131
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
32-
import org.apache.flink.util.FlinkRuntimeException;
3332
import org.apache.flink.util.TestLoggerExtension;
34-
import org.apache.flink.util.function.ThrowingRunnable;
3533

3634
import org.apache.http.HttpHost;
3735
import org.junit.jupiter.api.AfterEach;
@@ -331,7 +329,7 @@ private Opensearch2Writer<Tuple2<Integer, String>> createWriter(
331329
null,
332330
true),
333331
metricGroup,
334-
new TestMailbox(),
332+
new SyncMailboxExecutor(),
335333
new DefaultRestClientFactory(),
336334
new DefaultBulkResponseInspector(failureHandler));
337335
}
@@ -376,29 +374,4 @@ public void emit(
376374
}
377375
}
378376
}
379-
380-
private static class TestMailbox implements MailboxExecutor {
381-
382-
@Override
383-
public void execute(
384-
ThrowingRunnable<? extends Exception> command,
385-
String descriptionFormat,
386-
Object... descriptionArgs) {
387-
try {
388-
command.run();
389-
} catch (Exception e) {
390-
throw new RuntimeException("Unexpected error", e);
391-
}
392-
}
393-
394-
@Override
395-
public void yield() throws InterruptedException, FlinkRuntimeException {
396-
Thread.sleep(100);
397-
}
398-
399-
@Override
400-
public boolean tryYield() throws FlinkRuntimeException {
401-
return false;
402-
}
403-
}
404377
}

pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ under the License.
2222
<parent>
2323
<groupId>org.apache.flink</groupId>
2424
<artifactId>flink-connector-parent</artifactId>
25-
<version>1.0.0</version>
25+
<version>1.1.0</version>
2626
</parent>
2727

2828
<modelVersion>4.0.0</modelVersion>
@@ -55,7 +55,7 @@ under the License.
5555
</modules>
5656

5757
<properties>
58-
<flink.version>1.18.0</flink.version>
58+
<flink.version>1.20.0</flink.version>
5959
<commons-compress.version>1.26.1</commons-compress.version>
6060
<jackson-bom.version>2.16.2</jackson-bom.version>
6161
<junit5.version>5.10.2</junit5.version>
@@ -64,7 +64,7 @@ under the License.
6464
<mockito.version>3.12.4</mockito.version>
6565

6666
<japicmp.skip>false</japicmp.skip>
67-
<japicmp.referenceVersion>1.0.0-1.16</japicmp.referenceVersion>
67+
<japicmp.referenceVersion>1.0.0-1.19</japicmp.referenceVersion>
6868

6969
<slf4j.version>1.7.36</slf4j.version>
7070
<log4j.version>2.17.2</log4j.version>

0 commit comments

Comments
 (0)