
Commit 947dffc

[FLINK-37122][build] Try to be compatible with old flink version 1.18.x

This closes #3859

1 parent 7997f51

9 files changed: +23 −21 lines

docs/content.zh/docs/get-started/quickstart/mysql-to-doris.md (+4 −4)

@@ -33,11 +33,11 @@ under the License.
 Prepare a Linux or MacOS machine with Docker installed.
 
 ### Prepare a Flink Standalone cluster
-1. Download [Flink 1.18.0](https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz) and extract it to get the flink-1.18.0 directory.
-   Use the following command to change into the Flink directory and set FLINK_HOME to the directory containing flink-1.18.0.
+1. Download [Flink 1.19.1](https://archive.apache.org/dist/flink/flink-1.19.1/flink-1.19.1-bin-scala_2.12.tgz) and extract it to get the flink-1.19.1 directory.
+   Use the following command to change into the Flink directory and set FLINK_HOME to the directory containing flink-1.19.1.
 
 ```shell
-cd flink-1.18.0
+cd flink-1.19.1
 ```
 
 2. Enable checkpointing by appending the following parameters to the conf/flink-conf.yaml configuration file, taking a checkpoint every 3 seconds.

@@ -338,7 +338,7 @@ Flink CDC provides configuration for routing a source table's schema/data to other table names,
 ```shell
 docker-compose down
 ```
-Run the following command in the Flink directory `flink-1.18.0` to stop the Flink cluster:
+Run the following command in the Flink directory `flink-1.19.1` to stop the Flink cluster:
 
 ```shell
 ./bin/stop-cluster.sh
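Step 2 of each quickstart references checkpoint parameters that fall outside these hunks. A plausible sketch of the corresponding conf/flink-conf.yaml entry, matching the 3-second interval the text describes (the exact keys are not shown in this diff):

```yaml
# Assumed flink-conf.yaml fragment: take a checkpoint every 3 seconds.
execution.checkpointing.interval: 3000
```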

docs/content.zh/docs/get-started/quickstart/mysql-to-starrocks.md (+3 −3)

@@ -33,11 +33,11 @@ under the License.
 Prepare a Linux or MacOS machine with Docker installed.
 
 ### Prepare a Flink Standalone cluster
-1. Download [Flink 1.18.0](https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz) and extract it to get the flink-1.18.0 directory.
+1. Download [Flink 1.19.1](https://archive.apache.org/dist/flink/flink-1.19.1/flink-1.19.1-bin-scala_2.12.tgz) and extract it to get the flink-1.19.1 directory.
 Use the following command to change into the Flink directory and set FLINK_HOME to the directory containing flink-1.18.0.
 
 ```shell
-cd flink-1.18.0
+cd flink-1.19.1
 ```
 
 2. Enable checkpointing by appending the following parameters to the conf/flink-conf.yaml configuration file, taking a checkpoint every 3 seconds.

@@ -306,7 +306,7 @@ Flink CDC provides configuration for routing a source table's schema/data to other table names,
 docker-compose down
 ```
 
-Run the following command in the Flink directory `flink-1.18.0` to stop the Flink cluster:
+Run the following command in the Flink directory `flink-1.19.1` to stop the Flink cluster:
 
 ```shell
 ./bin/stop-cluster.sh

docs/content/docs/get-started/quickstart/mysql-to-doris.md (+4 −4)

@@ -35,11 +35,11 @@ without a single line of Java/Scala code or IDE installation.
 Prepare a Linux or MacOS computer with Docker installed.
 
 ### Prepare Flink Standalone cluster
-1. Download [Flink 1.18.0](https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz), unzip and get the flink-1.18.0 directory.
-   Use the following command to navigate to the Flink directory and set FLINK_HOME to the directory where flink-1.18.0 is located.
+1. Download [Flink 1.19.1](https://archive.apache.org/dist/flink/flink-1.19.1/flink-1.19.1-bin-scala_2.12.tgz), unzip and get the flink-1.19.1 directory.
+   Use the following command to navigate to the Flink directory and set FLINK_HOME to the directory where flink-1.19.1 is located.
 
 ```shell
-cd flink-1.18.0
+cd flink-1.19.1
 ```
 
 2. Enable checkpointing by appending the following parameters to the conf/flink-conf.yaml configuration file to perform a checkpoint every 3 seconds.

@@ -343,7 +343,7 @@ After finishing the tutorial, run the following command to stop all containers i
 ```shell
 docker-compose down
 ```
-Run the following command to stop the Flink cluster in the directory of Flink `flink-1.18.0`:
+Run the following command to stop the Flink cluster in the directory of Flink `flink-1.19.1`:
 
 ```shell
 ./bin/stop-cluster.sh

docs/content/docs/get-started/quickstart/mysql-to-starrocks.md (+4 −4)

@@ -35,11 +35,11 @@ without a single line of Java/Scala code or IDE installation.
 Prepare a Linux or MacOS computer with Docker installed.
 
 ### Prepare Flink Standalone cluster
-1. Download [Flink 1.18.0](https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz), unzip and get the flink-1.18.0 directory.
-   Use the following command to navigate to the Flink directory and set FLINK_HOME to the directory where flink-1.18.0 is located.
+1. Download [Flink 1.19.1](https://archive.apache.org/dist/flink/flink-1.19.1/flink-1.19.1-bin-scala_2.12.tgz), unzip and get the flink-1.19.1 directory.
+   Use the following command to navigate to the Flink directory and set FLINK_HOME to the directory where flink-1.19.1 is located.
 
 ```shell
-cd flink-1.18.0
+cd flink-1.19.1
 ```
 
 2. Enable checkpointing by appending the following parameters to the conf/flink-conf.yaml configuration file to perform a checkpoint every 3 seconds.

@@ -310,7 +310,7 @@ After finishing the tutorial, run the following command to stop all containers i
 docker-compose down
 ```
 
-Run the following command to stop the Flink cluster in the directory of Flink `flink-1.18.0`:
+Run the following command to stop the Flink cluster in the directory of Flink `flink-1.19.1`:
 
 ```shell
 ./bin/stop-cluster.sh

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/testsource/source/DistributedSourceFunction.java (+2 −2)

@@ -78,8 +78,8 @@ public DistributedSourceFunction(int numOfTables, boolean distributedTables) {
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
         iotaCounter = 0;
-        subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
-        parallelism = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
+        subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+        parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
         if (distributedTables) {
             tables =
                     IntStream.range(0, numOfTables)
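This hunk, like the three operator hunks below, replaces the Flink 1.19 accessor chain `getRuntimeContext().getTaskInfo().getIndexOfThisSubtask()` with the older `getRuntimeContext().getIndexOfThisSubtask()`, which exists in 1.18 as well. An alternative would be a reflection-based shim that prefers the newer API when present; the sketch below illustrates that pattern only, with hypothetical class names, and is not code from this commit:

```java
import java.lang.reflect.Method;

// Illustrative cross-version shim: probe for the newer getTaskInfo()
// accessor and fall back to the older direct method. Method names mirror
// Flink's RuntimeContext API; the class itself is hypothetical.
public class SubtaskIndexCompat {

    /**
     * Returns ctx.getTaskInfo().getIndexOfThisSubtask() when getTaskInfo()
     * exists (Flink 1.19+), otherwise ctx.getIndexOfThisSubtask() (1.18).
     */
    public static int indexOfThisSubtask(Object ctx) throws Exception {
        try {
            Method getTaskInfo = ctx.getClass().getMethod("getTaskInfo");
            Object taskInfo = getTaskInfo.invoke(ctx);
            Method idx = taskInfo.getClass().getMethod("getIndexOfThisSubtask");
            return (int) idx.invoke(taskInfo);
        } catch (NoSuchMethodException e) {
            // Older runtime: the accessor lives directly on the context.
            Method idx = ctx.getClass().getMethod("getIndexOfThisSubtask");
            return (int) idx.invoke(ctx);
        }
    }
}
```

The commit takes the simpler route of calling the older accessor directly, which is still present in 1.19 (deprecated there in favor of the TaskInfo-based API).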

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java (+3 −1)

@@ -287,7 +287,9 @@ public final void subtaskReset(int subTaskId, long checkpointId) {
     @Override
     public final void executionAttemptFailed(
             int subTaskId, int attemptNumber, @Nullable Throwable reason) {
-        failedReasons.put(subTaskId, reason);
+        if (reason != null) {
+            failedReasons.put(subTaskId, reason);
+        }
     }

     @Override
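The added null check matters because `reason` is `@Nullable`, while maps such as `ConcurrentHashMap` reject null values outright. The demo below assumes `failedReasons` is backed by a `ConcurrentHashMap` (its declaration is outside this hunk, so that backing is an assumption):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class NullValueGuardDemo {
    // Assumption: a concurrent map like the coordinator might use.
    // ConcurrentHashMap permits neither null keys nor null values.
    static final Map<Integer, Throwable> failedReasons = new ConcurrentHashMap<>();

    static void executionAttemptFailed(int subTaskId, Throwable reason) {
        if (reason != null) { // the guard added by this commit
            failedReasons.put(subTaskId, reason);
        }
    }

    public static void main(String[] args) {
        executionAttemptFailed(0, null); // silently skipped by the guard
        executionAttemptFailed(1, new RuntimeException("boom"));
        System.out.println(failedReasons.size()); // prints 1
        try {
            failedReasons.put(2, null); // what the unguarded code risked
        } catch (NullPointerException e) {
            System.out.println("null value rejected");
        }
    }
}
```

Without the guard, a failure report that carries no cause would crash the coordinator with a `NullPointerException` instead of being recorded.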

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java (+1 −1)

@@ -97,7 +97,7 @@ public SchemaOperator(
     @Override
     public void open() throws Exception {
         super.open();
-        subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+        subTaskId = getRuntimeContext().getIndexOfThisSubtask();
         upstreamSchemaTable = HashBasedTable.create();
         evolvedSchemaMap = new HashMap<>();
         tableIdRouter = new TableIdRouter(routingRules);

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java (+1 −1)

@@ -133,7 +133,7 @@ public void open() throws Exception {
         this.schemaOperatorMetrics =
                 new SchemaOperatorMetrics(
                         getRuntimeContext().getMetricGroup(), schemaChangeBehavior);
-        this.subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+        this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
         this.originalSchemaMap = new HashMap<>();
         this.evolvedSchemaMap = new HashMap<>();
         this.router = new TableIdRouter(routingRules);

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/DistributedPrePartitionOperator.java (+1 −1)

@@ -63,7 +63,7 @@ public DistributedPrePartitionOperator(
     @Override
     public void open() throws Exception {
         super.open();
-        subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+        subTaskId = getRuntimeContext().getIndexOfThisSubtask();
         schemaMap = new HashMap<>();
         hashFunctionMap = new HashMap<>();
     }
