Skip to content

Commit a154eee

Browse files
committed
[hotfix] Fix compatible issue with 1.20
1 parent 7ada6f3 commit a154eee

File tree

1 file changed

+2
-29
lines changed

1 file changed

+2
-29
lines changed

flink-connector-elasticsearch-base/src/test/java/org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.java

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.flink.connector.elasticsearch.sink;
1919

20-
import org.apache.flink.api.common.operators.MailboxExecutor;
2120
import org.apache.flink.api.connector.sink2.SinkWriter;
2221
import org.apache.flink.api.connector.sink2.SinkWriter.Context;
2322
import org.apache.flink.api.java.tuple.Tuple2;
@@ -31,14 +30,13 @@
3130
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
3231
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
3332
import org.apache.flink.metrics.testutils.MetricListener;
33+
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
3434
import org.apache.flink.runtime.metrics.MetricNames;
3535
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
3636
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
3737
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
3838
import org.apache.flink.test.junit5.MiniClusterExtension;
39-
import org.apache.flink.util.FlinkRuntimeException;
4039
import org.apache.flink.util.TestLoggerExtension;
41-
import org.apache.flink.util.function.ThrowingRunnable;
4240

4341
import org.apache.http.HttpHost;
4442
import org.elasticsearch.action.ActionListener;
@@ -329,7 +327,7 @@ private static ElasticsearchWriter<Tuple2<Integer, String>> createWriter(
329327
new DefaultBulkResponseInspector(),
330328
new NetworkClientConfig(null, null, null, null, null, null, null, null),
331329
metricGroup,
332-
new TestMailbox());
330+
new SyncMailboxExecutor());
333331
}
334332

335333
private TestingSinkWriterMetricGroup getSinkWriterMetricGroup() {
@@ -481,29 +479,4 @@ GetResponse getResponse(String index, int id) throws IOException {
481479
return client.get(new GetRequest(index, Integer.toString(id)), RequestOptions.DEFAULT);
482480
}
483481
}
484-
485-
private static class TestMailbox implements MailboxExecutor {
486-
487-
@Override
488-
public void execute(
489-
ThrowingRunnable<? extends Exception> command,
490-
String descriptionFormat,
491-
Object... descriptionArgs) {
492-
try {
493-
command.run();
494-
} catch (Exception e) {
495-
throw new RuntimeException("Unexpected error", e);
496-
}
497-
}
498-
499-
@Override
500-
public void yield() throws InterruptedException, FlinkRuntimeException {
501-
Thread.sleep(100);
502-
}
503-
504-
@Override
505-
public boolean tryYield() throws FlinkRuntimeException {
506-
return false;
507-
}
508-
}
509482
}

0 commit comments

Comments
 (0)