
Commit 8c266e8

[FLINK-35109] Bump to Flink 1.19 and support Flink 1.20
Also:
- adds the migration support tests up to 1.20
- bumps Kafka-client to 3.6.2
1 parent 01dafc1 commit 8c266e8

41 files changed, +286 -54 lines changed


.github/workflows/push_pr.yml

Lines changed: 4 additions & 4 deletions

@@ -28,10 +28,10 @@ jobs:
   compile_and_test:
     strategy:
       matrix:
-        flink: [ 1.18.1 ]
-        jdk: [ '8, 11, 17' ]
+        flink: [ 1.19.1 ]
+        jdk: [ '8, 11, 17, 21' ]
         include:
-          - flink: 1.19.0
+          - flink: 1.20.0
             jdk: '8, 11, 17, 21'
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
@@ -40,7 +40,7 @@ jobs:
   python_test:
     strategy:
       matrix:
-        flink: [ 1.18.1, 1.19.0 ]
+        flink: [ 1.19.1, 1.20.0 ]
     uses: apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
     with:
       flink_version: ${{ matrix.flink }}

.github/workflows/weekly.yml

Lines changed: 4 additions & 17 deletions

@@ -30,34 +30,21 @@ jobs:
     strategy:
       matrix:
         flink_branches: [{
-          flink: 1.18-SNAPSHOT,
-          branch: main
-        }, {
           flink: 1.19-SNAPSHOT,
-          jdk: '8, 11, 17, 21',
           branch: main
         }, {
           flink: 1.20-SNAPSHOT,
-          jdk: '8, 11, 17, 21',
           branch: main
         }, {
-          flink: 1.18.1,
+          flink: 1.19.1,
           branch: v3.2
         }, {
-          flink: 1.19.0,
-          branch: v3.2,
-          jdk: '8, 11, 17, 21',
-        }, {
-          flink: 1.18.1,
-          branch: v3.1
-        }, {
-          flink: 1.19.0,
-          branch: v3.1,
-          jdk: '8, 11, 17, 21',
+          flink: 1.20.0,
+          branch: v3.2
         }]
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
       flink_version: ${{ matrix.flink_branches.flink }}
       connector_branch: ${{ matrix.flink_branches.branch }}
-      jdk_version: ${{ matrix.flink_branches.jdk || '8, 11, 17' }}
+      jdk_version: ${{ matrix.flink_branches.jdk || '8, 11, 17, 21' }}
       run_dependency_convergence: false

flink-connector-kafka/pom.xml

Lines changed: 7 additions & 5 deletions

@@ -39,7 +39,8 @@ under the License.
         FlinkKafkaProducerBaseTest --> --add-opens=java.base/java.lang=ALL-UNNAMED <!--
         FlinkKafkaProducerBaseTest --> --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED <!--
         FlinkKafkaConsumerBaseTest --> --add-opens=java.base/java.util=ALL-UNNAMED <!--
-        KafkaProducerExactlyOnceITCase --> --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED </flink.connector.module.config>
+        KafkaProducerExactlyOnceITCase --> --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
+        </flink.connector.module.config>
     </properties>
 
     <dependencies>
@@ -81,10 +82,10 @@ under the License.
             <version>${kafka.version}</version>
         </dependency>
 
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
 
         <!-- Tests -->
 
@@ -211,6 +212,7 @@ under the License.
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
+
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
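
The flink.connector.module.config property above exists because several tests reflectively access java.base internals, which JDK 16+ forbids unless the module is explicitly opened. A minimal sketch of the failure mode those --add-opens flags prevent (illustrative only, not code from this repository; the String field lookup is just a stand-in for the tests' reflective access):

import java.lang.reflect.Field;

public class AddOpensSketch {
    public static void main(String[] args) throws Exception {
        // Reflective access into a java.base class, similar in spirit to what
        // the connector tests do to JDK internals.
        Field value = String.class.getDeclaredField("value");

        // On JDK 16+ this throws InaccessibleObjectException unless the JVM
        // was launched with --add-opens=java.base/java.lang=ALL-UNNAMED,
        // the kind of flag the Maven property above collects for the test JVM.
        value.setAccessible(true);
        System.out.println("java.lang is open to the unnamed module");
    }
}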

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java

Lines changed: 1 addition & 1 deletion

@@ -92,7 +92,7 @@ public class FlinkKafkaConsumerBaseMigrationTest {
 
     @Parameterized.Parameters(name = "Migration Savepoint: {0}")
     public static Collection<FlinkVersion> parameters() {
-        return FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_16);
+        return FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.current());
     }
 
     public FlinkKafkaConsumerBaseMigrationTest(FlinkVersion testMigrateVersion) {
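
The change above lifts the fixed v1_16 upper bound, so the savepoint migration test now parameterizes over every version up to the one being built and picks up future releases automatically. A hypothetical driver (not part of the commit) showing what the two ranges expand to, using the same FlinkVersion helpers the test calls:

import org.apache.flink.FlinkVersion;

public class MigrationRangeSketch {
    public static void main(String[] args) {
        // Old bound: a fixed set ending at 1.16, frozen until someone edits it.
        System.out.println(FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_16));

        // New bound: tracks the Flink version the build compiles against, so
        // each new Flink release is covered without touching the test.
        System.out.println(FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.current()));
    }
}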

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java

Lines changed: 1 addition & 1 deletion

@@ -42,7 +42,7 @@
 public class FlinkKafkaProducerMigrationTest extends KafkaMigrationTestBase {
     @Parameterized.Parameters(name = "Migration Savepoint: {0}")
     public static Collection<FlinkVersion> parameters() {
-        return FlinkVersion.rangeOf(FlinkVersion.v1_8, FlinkVersion.v1_16);
+        return FlinkVersion.rangeOf(FlinkVersion.v1_12, FlinkVersion.current());
     }
 
     public FlinkKafkaProducerMigrationTest(FlinkVersion testMigrateVersion) {

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializerUpgradeTest.java

Lines changed: 1 addition & 1 deletion

@@ -20,9 +20,9 @@
 
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerMatchers;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer;
+import org.apache.flink.streaming.connectors.kafka.testutils.TypeSerializerMatchers;
 import org.apache.flink.streaming.connectors.kafka.testutils.TypeSerializerUpgradeTestBase;
 
 import org.hamcrest.Matcher;

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java

Lines changed: 0 additions & 4 deletions

@@ -29,7 +29,6 @@
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.Before;
@@ -65,7 +64,6 @@ public void testKafkaDebeziumChangelogSource() throws Exception {
         tableConf.set(
                 ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1));
         tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
-        tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
 
         // ---------- Write the Debezium json into Kafka -------------------
         List<String> lines = readLines("debezium-data-schema-exclude.txt");
@@ -194,7 +192,6 @@ public void testKafkaCanalChangelogSource() throws Exception {
         tableConf.set(
                 ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1));
         tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
-        tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
 
         // ---------- Write the Canal json into Kafka -------------------
         List<String> lines = readLines("canal-data.txt");
@@ -335,7 +332,6 @@ public void testKafkaMaxwellChangelogSource() throws Exception {
         tableConf.set(
                 ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1));
         tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
-        tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
 
         // ---------- Write the Maxwell json into Kafka -------------------
         List<String> lines = readLines("maxwell-data.txt");
New file: org/apache/flink/streaming/connectors/kafka/testutils/ThreadContextClassLoader.java

Lines changed: 41 additions & 0 deletions

@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import java.io.Closeable;
+
+/**
+ * Utility class to temporarily use a different classloader as the thread context classloader.
+ *
+ * <p>Temporarily copied from flink-core to avoid dependency on flink-core.
+ */
+public class ThreadContextClassLoader implements Closeable {
+
+    private final ClassLoader originalThreadContextClassLoader;
+
+    public ThreadContextClassLoader(ClassLoader newThreadContextClassLoader) {
+        this.originalThreadContextClassLoader = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(newThreadContextClassLoader);
+    }
+
+    @Override
+    public void close() {
+        Thread.currentThread().setContextClassLoader(originalThreadContextClassLoader);
+    }
+}
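
Because the new class implements Closeable, it slots into try-with-resources: the constructor installs the given classloader as the thread context classloader, and close() restores the previous one even if the guarded block throws. A hypothetical usage sketch (not part of the commit; the URLClassLoader is just a stand-in for whatever loader a test needs):

import java.net.URL;
import java.net.URLClassLoader;

import org.apache.flink.streaming.connectors.kafka.testutils.ThreadContextClassLoader;

public class ThreadContextClassLoaderUsage {
    public static void main(String[] args) {
        // Any classloader works; an empty URLClassLoader keeps the sketch self-contained.
        ClassLoader temporary = new URLClassLoader(new URL[0]);

        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(temporary)) {
            // Inside this block the thread's context classloader is 'temporary'.
            assert Thread.currentThread().getContextClassLoader() == temporary;
        }
        // Here close() has run and the original context classloader is back.
    }
}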
