Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36651][Connectors/Elasticsearch] Fix IT test not compatible with 1.20, drop main branch tests for 1.18 #110

Merged
merged 1 commit into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions .github/workflows/push_pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,8 @@ jobs:
compile_and_test:
strategy:
matrix:
flink: [ 1.18-SNAPSHOT ]
vahmed-hamdy marked this conversation as resolved.
Show resolved Hide resolved
jdk: [ '8, 11, 17' ]
include:
- flink: 1.19-SNAPSHOT
jdk: '8, 11, 17, 21'
- flink: 1.20-SNAPSHOT
jdk: '8, 11, 17, 21'
flink: [ 1.19-SNAPSHOT, 1.20-SNAPSHOT ]
jdk: [ '8, 11, 17, 21' ]
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
with:
flink_version: ${{ matrix.flink }}
Expand Down
14 changes: 4 additions & 10 deletions .github/workflows/weekly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,21 @@ jobs:
strategy:
matrix:
flink_branches: [{
flink: 1.18-SNAPSHOT,
jdk: '8, 11, 17',
branch: main
}, {
flink: 1.19-SNAPSHOT,
jdk: '8, 11, 17, 21',
branch: main
vahmed-hamdy marked this conversation as resolved.
Show resolved Hide resolved
}, {
flink: 1.20-SNAPSHOT,
jdk: '8, 11, 17, 21',
branch: main
}, {
flink: 1.18.1,
branch: v3.0
}, {
flink: 1.19.0,
branch: v3.0
}, {
flink: 1.20.0,
branch: main
}]
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
with:
flink_version: ${{ matrix.flink_branches.flink }}
connector_branch: ${{ matrix.flink_branches.branch }}
jdk_version: ${{ matrix.flink_branches.jdk || '8, 11' }}
jdk_version: ${{ matrix.flink_branches.jdk || '8, 11, 17, 21' }}
run_dependency_convergence: false
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.flink.connector.elasticsearch.sink;

import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.SinkWriter.Context;
import org.apache.flink.api.java.tuple.Tuple2;
Expand All @@ -31,14 +30,13 @@
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.function.ThrowingRunnable;

import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -329,7 +327,7 @@ private static ElasticsearchWriter<Tuple2<Integer, String>> createWriter(
new DefaultBulkResponseInspector(),
new NetworkClientConfig(null, null, null, null, null, null, null, null),
metricGroup,
new TestMailbox());
new SyncMailboxExecutor());
}

private TestingSinkWriterMetricGroup getSinkWriterMetricGroup() {
Expand Down Expand Up @@ -481,29 +479,4 @@ GetResponse getResponse(String index, int id) throws IOException {
return client.get(new GetRequest(index, Integer.toString(id)), RequestOptions.DEFAULT);
}
}

private static class TestMailbox implements MailboxExecutor {

@Override
public void execute(
ThrowingRunnable<? extends Exception> command,
String descriptionFormat,
Object... descriptionArgs) {
try {
command.run();
} catch (Exception e) {
throw new RuntimeException("Unexpected error", e);
}
}

@Override
public void yield() throws InterruptedException, FlinkRuntimeException {
Thread.sleep(100);
}

@Override
public boolean tryYield() throws FlinkRuntimeException {
return false;
}
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ under the License.
</modules>

<properties>
<flink.version>1.18.0</flink.version>
<flink.version>1.20.0</flink.version>

<jackson-bom.version>2.15.3</jackson-bom.version>
<junit4.version>4.13.2</junit4.version>
Expand Down