Commit 3f066ed

[FLINK-35109] Bump to Flink 1.19 and support Flink 1.20
Also:
- adds the migration support tests up to 1.20
- bumps Kafka-client to 3.6.2
1 parent 9b97c51 commit 3f066ed

41 files changed (+297, -66 lines)


Diff for: .github/workflows/push_pr.yml

+4 -6

@@ -28,12 +28,10 @@ jobs:
   compile_and_test:
     strategy:
       matrix:
-        flink: [ 1.17.2 ]
-        jdk: [ '8, 11' ]
+        flink: [ 1.19.1 ]
+        jdk: [ '8, 11, 17, 21' ]
         include:
-          - flink: 1.18.1
-            jdk: '8, 11, 17'
-          - flink: 1.19.0
+          - flink: 1.20.0
             jdk: '8, 11, 17, 21'
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
@@ -42,7 +40,7 @@ jobs:
   python_test:
     strategy:
       matrix:
-        flink: [ 1.17.2, 1.18.1, 1.19.0 ]
+        flink: [ 1.19.1, 1.20.0 ]
     uses: apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
     with:
       flink_version: ${{ matrix.flink }}

Diff for: .github/workflows/weekly.yml

+5 -19

@@ -30,35 +30,21 @@ jobs:
     strategy:
       matrix:
         flink_branches: [{
-          flink: 1.17-SNAPSHOT,
-          branch: main
-        }, {
-          flink: 1.18-SNAPSHOT,
-          jdk: '8, 11, 17',
-          branch: main
-        }, {
           flink: 1.19-SNAPSHOT,
-          jdk: '8, 11, 17, 21',
           branch: main
         }, {
           flink: 1.20-SNAPSHOT,
-          jdk: '8, 11, 17, 21',
           branch: main
         }, {
-          flink: 1.17.2,
-          branch: v3.1
-        }, {
-          flink: 1.18.1,
-          jdk: '8, 11, 17',
-          branch: v3.1
+          flink: 1.19.1,
+          branch: v3.2
         }, {
-          flink: 1.19.0,
-          branch: v3.1,
-          jdk: '8, 11, 17, 21',
+          flink: 1.20.0,
+          branch: v3.2
         }]
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
       flink_version: ${{ matrix.flink_branches.flink }}
       connector_branch: ${{ matrix.flink_branches.branch }}
-      jdk_version: ${{ matrix.flink_branches.jdk || '8, 11' }}
+      jdk_version: ${{ matrix.flink_branches.jdk || '8, 11, 17, 21' }}
       run_dependency_convergence: false

Diff for: flink-connector-kafka/pom.xml

+7 -5

@@ -39,7 +39,8 @@ under the License.
         FlinkKafkaProducerBaseTest --> --add-opens=java.base/java.lang=ALL-UNNAMED <!--
         FlinkKafkaProducerBaseTest --> --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED <!--
         FlinkKafkaConsumerBaseTest --> --add-opens=java.base/java.util=ALL-UNNAMED <!--
-        KafkaProducerExactlyOnceITCase --> --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED </flink.connector.module.config>
+        KafkaProducerExactlyOnceITCase --> --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
+        </flink.connector.module.config>
     </properties>

     <dependencies>
@@ -81,10 +82,10 @@ under the License.
             <version>${kafka.version}</version>
         </dependency>

-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>

         <!-- Tests -->

@@ -211,6 +212,7 @@ under the License.
             <version>${flink.version}</version>
             <scope>test</scope>
         </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-planner_${scala.binary.version}</artifactId>

Diff for: flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java

+1 -1

@@ -92,7 +92,7 @@ public class FlinkKafkaConsumerBaseMigrationTest {

     @Parameterized.Parameters(name = "Migration Savepoint: {0}")
     public static Collection<FlinkVersion> parameters() {
-        return FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_16);
+        return FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.current());
     }

     public FlinkKafkaConsumerBaseMigrationTest(FlinkVersion testMigrateVersion) {

Diff for: flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java

+1 -1

@@ -42,7 +42,7 @@
 public class FlinkKafkaProducerMigrationTest extends KafkaMigrationTestBase {
     @Parameterized.Parameters(name = "Migration Savepoint: {0}")
     public static Collection<FlinkVersion> parameters() {
-        return FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_16);
+        return FlinkVersion.rangeOf(FlinkVersion.v1_12, FlinkVersion.current());
     }

     public FlinkKafkaProducerMigrationTest(FlinkVersion testMigrateVersion) {
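Both migration tests parameterize over the savepoint versions returned by FlinkVersion.rangeOf, so bumping the upper bound to FlinkVersion.current() automatically picks up new savepoint formats as the connector's supported Flink version advances. Below is a minimal sketch of how that range expands into test parameters; FlinkVersion, rangeOf and current() are the APIs used in the diffs above, while the class and the printing are illustrative only.

import org.apache.flink.FlinkVersion;

import java.util.Collection;

public class MigrationVersionsSketch {
    public static void main(String[] args) {
        // Inclusive range of savepoint versions; each element becomes one
        // JUnit parameter, named "Migration Savepoint: {0}" in the tests above.
        Collection<FlinkVersion> versions =
                FlinkVersion.rangeOf(FlinkVersion.v1_12, FlinkVersion.current());
        versions.forEach(v -> System.out.println("Migration Savepoint: " + v));
    }
}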

Diff for: flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializerUpgradeTest.java

+1 -1

@@ -20,9 +20,9 @@

 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerMatchers;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer;
+import org.apache.flink.streaming.connectors.kafka.testutils.TypeSerializerMatchers;
 import org.apache.flink.streaming.connectors.kafka.testutils.TypeSerializerUpgradeTestBase;

 import org.hamcrest.Matcher;

Diff for: flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java

-4

@@ -29,7 +29,6 @@
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;

 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.Before;
@@ -65,7 +64,6 @@ public void testKafkaDebeziumChangelogSource() throws Exception {
         tableConf.set(
                 ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1));
         tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
-        tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");

         // ---------- Write the Debezium json into Kafka -------------------
         List<String> lines = readLines("debezium-data-schema-exclude.txt");
@@ -194,7 +192,6 @@ public void testKafkaCanalChangelogSource() throws Exception {
         tableConf.set(
                 ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1));
         tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
-        tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");

         // ---------- Write the Canal json into Kafka -------------------
         List<String> lines = readLines("canal-data.txt");
@@ -335,7 +332,6 @@ public void testKafkaMaxwellChangelogSource() throws Exception {
         tableConf.set(
                 ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1));
         tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
-        tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");

         // ---------- Write the Maxwell json into Kafka -------------------
         List<String> lines = readLines("maxwell-data.txt");
Diff for: flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/ThreadContextClassLoader.java

+41

@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import java.io.Closeable;
+
+/**
+ * Utility class to temporarily use a different classloader as the thread context classloader.
+ *
+ * <p>Temporarily copied from flink-core to avoid dependency on flink-core.
+ */
+public class ThreadContextClassLoader implements Closeable {
+
+    private final ClassLoader originalThreadContextClassLoader;
+
+    public ThreadContextClassLoader(ClassLoader newThreadContextClassLoader) {
+        this.originalThreadContextClassLoader = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(newThreadContextClassLoader);
+    }
+
+    @Override
+    public void close() {
+        Thread.currentThread().setContextClassLoader(originalThreadContextClassLoader);
+    }
+}
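Because ThreadContextClassLoader implements Closeable, it is built for try-with-resources: the constructor swaps in the new context classloader and close() restores the original one even if the guarded block throws. A minimal usage sketch (not part of the commit; the child classloader and the class wrapping it are illustrative only):

import org.apache.flink.streaming.connectors.kafka.testutils.ThreadContextClassLoader;

import java.net.URL;
import java.net.URLClassLoader;

public class ThreadContextClassLoaderExample {
    public static void main(String[] args) {
        // Illustrative child classloader; any ClassLoader works here.
        ClassLoader child =
                new URLClassLoader(
                        new URL[0], ThreadContextClassLoaderExample.class.getClassLoader());
        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(child)) {
            // Code in this block observes 'child' as the thread context classloader,
            // e.g. when loading service providers or test resources.
            System.out.println("inside: " + Thread.currentThread().getContextClassLoader());
        }
        // The original context classloader has been restored at this point,
        // even if the block above threw an exception.
        System.out.println("after:  " + Thread.currentThread().getContextClassLoader());
    }
}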
