+
+{{< top >}}
diff --git a/docs/content/docs/connectors/pipeline-connectors/maxcompute.md b/docs/content/docs/connectors/pipeline-connectors/maxcompute.md
new file mode 100644
index 00000000000..d1d39a6457a
--- /dev/null
+++ b/docs/content/docs/connectors/pipeline-connectors/maxcompute.md
@@ -0,0 +1,322 @@
+---
+title: "MaxCompute"
+weight: 7
+type: docs
+aliases:
+ - /connectors/maxcompute
+---
+
+
+
+# MaxCompute Connector
+
+The MaxCompute connector can be used as the *Data Sink* of the pipeline, writing data
+to [MaxCompute](https://www.aliyun.com/product/odps). This document describes how to set up the MaxCompute connector.
+
+## What can the connector do?
+
+* Automatically create tables if they do not exist
+* Schema change synchronization
+* Data synchronization
+
+## Example
+
+A pipeline that reads data from MySQL and writes it to MaxCompute can be defined as follows:
+
+```yaml
+source:
+ type: mysql
+ name: MySQL Source
+ hostname: 127.0.0.1
+ port: 3306
+ username: admin
+ password: pass
+ tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
+ server-id: 5401-5404
+
+sink:
+ type: maxcompute
+ name: MaxCompute Sink
+ accessId: ak
+ accessKey: sk
+ endpoint: endpoint
+ project: flink_cdc
+  bucketsNum: 8
+
+pipeline:
+ name: MySQL to MaxCompute Pipeline
+ parallelism: 2
+```
+
+## Connector Options
+
+| Option | Required | Default | Type | Description |
+|--------|----------|---------|------|-------------|
+| type | required | (none) | String | Specify what connector to use; here it should be `'maxcompute'`. |
+| name | optional | (none) | String | The name of the sink. |
+| accessId | required | (none) | String | AccessKey ID of an Alibaba Cloud account or RAM user. You can obtain it on the AccessKey management page. |
+| accessKey | required | (none) | String | AccessKey Secret corresponding to the AccessKey ID. You can obtain it on the AccessKey management page. |
+| endpoint | required | (none) | String | The connection address of the MaxCompute service. Configure the endpoint according to the region selected when creating the MaxCompute project and the network connection method. For the values for each region and network, refer to Endpoint. |
+| project | required | (none) | String | The name of the MaxCompute project. You can log in to the MaxCompute console and obtain the project name on the Workspace > Project Management page. |
+| tunnelEndpoint | optional | (none) | String | The connection address of the MaxCompute Tunnel service. Typically, this is auto-routed based on the region of the specified project; it is needed only in special network environments, such as when using a proxy. |
+| quotaName | optional | (none) | String | The name of the exclusive resource group for MaxCompute data transfer. If not specified, the shared resource group is used. For details, refer to Using exclusive resource groups for MaxCompute. |
+| stsToken | optional | (none) | String | Required when authenticating with a temporary access token (STS token) issued by a RAM role. |
+| bucketsNum | optional | 16 | Integer | The number of buckets used when auto-creating MaxCompute Delta tables. For usage, refer to Delta Table Overview. |
+| compressAlgorithm | optional | zlib | String | The data compression algorithm used when writing to MaxCompute. Currently supports `raw` (no compression), `zlib`, and `snappy`. |
+| totalBatchSize | optional | 64MB | String | The size of the in-memory data buffer at the partition level (table level for non-partitioned tables). Buffers of different partitions (tables) are independent; data is written to MaxCompute when the threshold is reached. |
+| bucketBatchSize | optional | 4MB | String | The size of the in-memory data buffer at the bucket level. Effective only when writing to Delta tables. Buffers of different buckets are independent; bucket data is written to MaxCompute when the threshold is reached. |
+| numCommitThreads | optional | 16 | Integer | The number of partitions (tables) that can be processed simultaneously during the checkpoint stage. |
+| numFlushConcurrent | optional | 4 | Integer | The number of buckets that can be written to MaxCompute simultaneously. Effective only when writing to Delta tables. |
+
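+For reference, a sink block that also sets several of the optional write options might look like the
+following sketch. The credential and endpoint values are placeholders, and the option values shown
+here are illustrative rather than recommendations:
+
+```yaml
+sink:
+  type: maxcompute
+  name: MaxCompute Sink
+  accessId: ak
+  accessKey: sk
+  endpoint: endpoint
+  project: flink_cdc
+  bucketsNum: 16
+  compressAlgorithm: snappy
+  totalBatchSize: 128MB
+  bucketBatchSize: 8MB
+  numCommitThreads: 16
+  numFlushConcurrent: 4
+```
+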
+## Usage Instructions
+
+* The connector supports automatic table creation, automatically mapping the location and data types between MaxCompute tables and source tables (see the mapping tables below). When the source table has a primary key, a MaxCompute Delta table is automatically created; otherwise, a regular MaxCompute table (Append table) is created.
+* When writing to a regular MaxCompute table (Append table), the delete operation will be ignored, and the update operation will be treated as an insert operation.
+* Currently, only at-least-once is supported. Delta tables can achieve idempotent writes due to their primary key characteristics.
+* For synchronization of table structure changes:
+ * A new column can only be added as the last column.
+ * A column type can only be changed to a compatible type. For compatible types, refer to [ALTER TABLE](https://help.aliyun.com/zh/maxcompute/user-guide/alter-table).
+
+## Table Location Mapping
+When the connector automatically creates tables, it uses the following mapping to map the location information of the source tables to the location of the MaxCompute tables. Note that when the MaxCompute project does not support the Schema model, each synchronization job can only synchronize one MySQL database (the same applies to other data sources; in that case the connector ignores the TableId.namespace information).
+
+| Abstract in Flink CDC | MaxCompute Location | MySQL Location |
+|-----------------------|---------------------|----------------|
+| project in the configuration file | project | (none) |
+| TableId.namespace | schema (only when the MaxCompute project supports the Schema model; otherwise this configuration is ignored) | database |
+| TableId.tableName | table | table |
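+
+As an illustration of this mapping (the table names here are hypothetical), with `project: flink_cdc`
+in the sink configuration, a MySQL table `mydb.orders` would be written to the following MaxCompute
+locations:
+
+```yaml
+# MaxCompute project with the Schema model enabled:
+#   flink_cdc.mydb.orders   # project.schema.table
+# MaxCompute project without the Schema model:
+#   flink_cdc.orders        # the MySQL database name (TableId.namespace) is ignored
+```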
+
+## Data Type Mapping
+
+| Flink Type | MaxCompute Type |
+|------------|-----------------|
+| CHAR/VARCHAR | STRING |
+| BOOLEAN | BOOLEAN |
+| BINARY/VARBINARY | BINARY |
+| DECIMAL | DECIMAL |
+| TINYINT | TINYINT |
+| SMALLINT | SMALLINT |
+| INTEGER | INTEGER |
+| BIGINT | BIGINT |
+| FLOAT | FLOAT |
+| DOUBLE | DOUBLE |
+| TIME_WITHOUT_TIME_ZONE | STRING |
+| DATE | DATE |
+| TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_NTZ |
+| TIMESTAMP_WITH_LOCAL_TIME_ZONE | TIMESTAMP |
+| TIMESTAMP_WITH_TIME_ZONE | TIMESTAMP |
+| ARRAY | ARRAY |
+| MAP | MAP |
+| ROW | STRUCT |
+
+{{< top >}}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/pom.xml
new file mode 100644
index 00000000000..5587db0bed6
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/pom.xml
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-cdc-pipeline-connectors</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>flink-cdc-pipeline-connector-maxcompute</artifactId>
+    <name>flink-cdc-pipeline-connector-maxcompute</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-cdc-composer</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+            <version>1.15</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-sdk-core</artifactId>
+            <version>0.50.6-public</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>aliyun-java-auth</artifactId>
+                    <groupId>com.aliyun</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>guava</artifactId>
+                    <groupId>com.google.guava</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>icu4j</artifactId>
+                    <groupId>com.ibm.icu</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>snappy-java</artifactId>
+                    <groupId>org.xerial.snappy</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>1.19.8</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <shadeTestJar>false</shadeTestJar>
+                            <artifactSet>
+                                <includes>
+                                    <include>*:*</include>
+                                </includes>
+                            </artifactSet>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSink.java
new file mode 100644
index 00000000000..d147333766c
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSink.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute;
+
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;
+import org.apache.flink.cdc.common.sink.DataSink;
+import org.apache.flink.cdc.common.sink.EventSinkProvider;
+import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
+import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeWriteOptions;
+import org.apache.flink.cdc.connectors.maxcompute.sink.MaxComputeEventSink;
+import org.apache.flink.cdc.connectors.maxcompute.sink.MaxComputeHashFunctionProvider;
+
+/** A {@link DataSink} for "MaxCompute" connector. */
+public class MaxComputeDataSink implements DataSink {
+ private final MaxComputeOptions options;
+ private final MaxComputeWriteOptions writeOptions;
+
+ public MaxComputeDataSink(MaxComputeOptions options, MaxComputeWriteOptions writeOptions) {
+ this.options = options;
+ this.writeOptions = writeOptions;
+ }
+
+ @Override
+ public EventSinkProvider getEventSinkProvider() {
+ return FlinkSinkProvider.of(new MaxComputeEventSink(options, writeOptions));
+ }
+
+ @Override
+ public MetadataApplier getMetadataApplier() {
+ return new MaxComputeMetadataApplier(options);
+ }
+
+ @Override
+ public HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider() {
+ return new MaxComputeHashFunctionProvider(options);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSinkFactory.java
new file mode 100644
index 00000000000..6f83853785a
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSinkFactory.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute;
+
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.factories.DataSinkFactory;
+import org.apache.flink.cdc.common.pipeline.PipelineOptions;
+import org.apache.flink.cdc.common.sink.DataSink;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeWriteOptions;
+import org.apache.flink.configuration.MemorySize;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** A {@link DataSinkFactory} for "MaxCompute" connector. */
+public class MaxComputeDataSinkFactory implements DataSinkFactory {
+
+ private static final String IDENTIFIER = "maxcompute";
+
+ @Override
+ public DataSink createDataSink(Context context) {
+ MaxComputeOptions options =
+ extractMaxComputeOptions(
+ context.getFactoryConfiguration(), context.getPipelineConfiguration());
+ MaxComputeWriteOptions writeOptions =
+ extractMaxComputeWriteOptions(context.getFactoryConfiguration());
+ return new MaxComputeDataSink(options, writeOptions);
+ }
+
+ private MaxComputeOptions extractMaxComputeOptions(
+ Configuration factoryConfiguration, Configuration pipelineConfiguration) {
+ String accessId = factoryConfiguration.get(MaxComputeDataSinkOptions.ACCESS_ID);
+ String accessKey = factoryConfiguration.get(MaxComputeDataSinkOptions.ACCESS_KEY);
+ String endpoint = factoryConfiguration.get(MaxComputeDataSinkOptions.ENDPOINT);
+ String project = factoryConfiguration.get(MaxComputeDataSinkOptions.PROJECT);
+ String tunnelEndpoint = factoryConfiguration.get(MaxComputeDataSinkOptions.TUNNEL_ENDPOINT);
+ String quotaName = factoryConfiguration.get(MaxComputeDataSinkOptions.QUOTA_NAME);
+ String stsToken = factoryConfiguration.get(MaxComputeDataSinkOptions.STS_TOKEN);
+ int bucketsNum = factoryConfiguration.get(MaxComputeDataSinkOptions.BUCKETS_NUM);
+
+ String schemaOperatorUid =
+ pipelineConfiguration.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID);
+ return MaxComputeOptions.builder(accessId, accessKey, endpoint, project)
+ .withTunnelEndpoint(tunnelEndpoint)
+ .withQuotaName(quotaName)
+ .withStsToken(stsToken)
+ .withBucketsNum(bucketsNum)
+ .withSchemaOperatorUid(schemaOperatorUid)
+ .build();
+ }
+
+ private MaxComputeWriteOptions extractMaxComputeWriteOptions(
+ Configuration factoryConfiguration) {
+ int numCommitThread =
+ factoryConfiguration.get(MaxComputeDataSinkOptions.NUM_COMMIT_THREADS);
+ String compressAlgorithm =
+ factoryConfiguration.get(MaxComputeDataSinkOptions.COMPRESS_ALGORITHM);
+ int flushConcurrent =
+ factoryConfiguration.get(MaxComputeDataSinkOptions.NUM_FLUSH_CONCURRENT);
+ long maxBufferSize =
+ MemorySize.parse(
+ factoryConfiguration.get(
+ MaxComputeDataSinkOptions.TOTAL_BATCH_SIZE))
+ .getBytes();
+ long maxSlotSize =
+ MemorySize.parse(
+ factoryConfiguration.get(
+ MaxComputeDataSinkOptions.BUCKET_BATCH_SIZE))
+ .getBytes();
+
+ return MaxComputeWriteOptions.builder()
+ .withNumCommitThread(numCommitThread)
+ .withCompressAlgorithm(compressAlgorithm)
+ .withFlushConcurrent(flushConcurrent)
+ .withMaxBufferSize(maxBufferSize)
+ .withSlotBufferSize(maxSlotSize)
+ .build();
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> requiredOptions = new HashSet<>();
+ requiredOptions.add(MaxComputeDataSinkOptions.ACCESS_ID);
+ requiredOptions.add(MaxComputeDataSinkOptions.ACCESS_KEY);
+ requiredOptions.add(MaxComputeDataSinkOptions.ENDPOINT);
+ requiredOptions.add(MaxComputeDataSinkOptions.PROJECT);
+ return requiredOptions;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> optionalOptions = new HashSet<>();
+ // options
+ optionalOptions.add(MaxComputeDataSinkOptions.TUNNEL_ENDPOINT);
+ optionalOptions.add(MaxComputeDataSinkOptions.QUOTA_NAME);
+ optionalOptions.add(MaxComputeDataSinkOptions.STS_TOKEN);
+ optionalOptions.add(MaxComputeDataSinkOptions.BUCKETS_NUM);
+ // write options
+ optionalOptions.add(MaxComputeDataSinkOptions.NUM_COMMIT_THREADS);
+ optionalOptions.add(MaxComputeDataSinkOptions.COMPRESS_ALGORITHM);
+ optionalOptions.add(MaxComputeDataSinkOptions.NUM_FLUSH_CONCURRENT);
+ optionalOptions.add(MaxComputeDataSinkOptions.TOTAL_BATCH_SIZE);
+ optionalOptions.add(MaxComputeDataSinkOptions.BUCKET_BATCH_SIZE);
+
+ return optionalOptions;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSinkOptions.java
new file mode 100644
index 00000000000..e28272b9108
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSinkOptions.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute;
+
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+
+/** Options for MaxCompute Data Sink. */
+public class MaxComputeDataSinkOptions {
+ // basic options.
+ public static final ConfigOption<String> ACCESS_ID =
+ ConfigOptions.key("accessId")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("MaxCompute user access id.");
+
+ public static final ConfigOption<String> ACCESS_KEY =
+ ConfigOptions.key("accessKey")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("MaxCompute user access key.");
+
+ public static final ConfigOption<String> ENDPOINT =
+ ConfigOptions.key("endpoint")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("MaxCompute endpoint.");
+
+ public static final ConfigOption<String> PROJECT =
+ ConfigOptions.key("project")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("MaxCompute project.");
+
+ public static final ConfigOption<String> TUNNEL_ENDPOINT =
+ ConfigOptions.key("tunnelEndpoint")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("MaxCompute tunnel end point.");
+ public static final ConfigOption<String> QUOTA_NAME =
+ ConfigOptions.key("quotaName")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "MaxCompute tunnel quota name; note that this is not the quota nickname.");
+
+ public static final ConfigOption<String> STS_TOKEN =
+ ConfigOptions.key("stsToken")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("MaxCompute sts token.");
+
+ public static final ConfigOption<Integer> BUCKETS_NUM =
+ ConfigOptions.key("bucketsNum")
+ .intType()
+ .defaultValue(16)
+ .withDescription(
+ "The number of buckets of the MaxCompute Delta table when automatically creating the table.");
+
+ // write options.
+ public static final ConfigOption<String> COMPRESS_ALGORITHM =
+ ConfigOptions.key("compressAlgorithm")
+ .stringType()
+ .defaultValue("zlib")
+ .withDescription(
+ "The compress algorithm of data upload to MaxCompute, support 'zlib', 'snappy', 'raw'.");
+
+ public static final ConfigOption<String> TOTAL_BATCH_SIZE =
+ ConfigOptions.key("totalBatchSize")
+ .stringType()
+ .defaultValue("64MB")
+ .withDescription("The max batch size of data upload to MaxCompute.");
+
+ public static final ConfigOption<String> BUCKET_BATCH_SIZE =
+ ConfigOptions.key("bucketBatchSize")
+ .stringType()
+ .defaultValue("4MB")
+ .withDescription(
+ "The max batch size of data per bucket when uploading to MaxCompute.");
+
+ public static final ConfigOption<Integer> NUM_COMMIT_THREADS =
+ ConfigOptions.key("numCommitThreads")
+ .intType()
+ .defaultValue(16)
+ .withDescription("The number of threads used to commit data to MaxCompute.");
+
+ public static final ConfigOption<Integer> NUM_FLUSH_CONCURRENT =
+ ConfigOptions.key("numFlushConcurrent")
+ .intType()
+ .defaultValue(4)
+ .withDescription("The number of buckets that can be flushed concurrently.");
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeMetadataApplier.java
new file mode 100644
index 00000000000..bafe980ec72
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeMetadataApplier.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute;
+
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
+import org.apache.flink.cdc.common.event.DropTableEvent;
+import org.apache.flink.cdc.common.event.RenameColumnEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TruncateTableEvent;
+import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
+import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
+import org.apache.flink.cdc.connectors.maxcompute.utils.MaxComputeUtils;
+import org.apache.flink.cdc.connectors.maxcompute.utils.SchemaEvolutionUtils;
+import org.apache.flink.cdc.connectors.maxcompute.utils.TypeConvertUtils;
+
+import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.Table;
+import com.aliyun.odps.TableSchema;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link MetadataApplier} for "MaxCompute" connector. */
+public class MaxComputeMetadataApplier implements MetadataApplier {
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(MaxComputeMetadataApplier.class);
+
+ private final MaxComputeOptions maxComputeOptions;
+
+ public MaxComputeMetadataApplier(MaxComputeOptions maxComputeOptions) {
+ this.maxComputeOptions = maxComputeOptions;
+ }
+
+ @Override
+ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
+ LOG.info("MaxCompute apply schema change event: {}", schemaChangeEvent);
+ try {
+ if (schemaChangeEvent instanceof CreateTableEvent) {
+ CreateTableEvent createTableEvent = (CreateTableEvent) schemaChangeEvent;
+ if (MaxComputeUtils.isTableExist(maxComputeOptions, createTableEvent.tableId())) {
+ Table table =
+ MaxComputeUtils.getTable(maxComputeOptions, createTableEvent.tableId());
+ TableSchema expectSchema =
+ TypeConvertUtils.toMaxCompute(createTableEvent.getSchema());
+ if (!MaxComputeUtils.schemaEquals(table.getSchema(), expectSchema)) {
+ throw new IllegalStateException(
+ "The schema of create table event is not equals to exist table schema, please drop/rename exist table before flink cdc task start.");
+ }
+ if (!CollectionUtils.isEqualCollection(
+ createTableEvent.getSchema().primaryKeys(), table.getPrimaryKey())) {
+ throw new IllegalStateException(
+ "The primary key of create table event is not equals to exist table primary key, please drop/rename exist table before flink cdc task start.");
+ }
+ } else {
+ SchemaEvolutionUtils.createTable(
+ maxComputeOptions,
+ createTableEvent.tableId(),
+ createTableEvent.getSchema());
+ }
+ } else if (schemaChangeEvent instanceof AlterColumnTypeEvent) {
+ AlterColumnTypeEvent alterColumnTypeEvent =
+ (AlterColumnTypeEvent) schemaChangeEvent;
+ SchemaEvolutionUtils.alterColumnType(
+ maxComputeOptions,
+ alterColumnTypeEvent.tableId(),
+ alterColumnTypeEvent.getTypeMapping());
+ } else if (schemaChangeEvent instanceof DropColumnEvent) {
+ DropColumnEvent dropColumnEvent = (DropColumnEvent) schemaChangeEvent;
+ SchemaEvolutionUtils.dropColumn(
+ maxComputeOptions,
+ dropColumnEvent.tableId(),
+ dropColumnEvent.getDroppedColumnNames());
+ } else if (schemaChangeEvent instanceof RenameColumnEvent) {
+ RenameColumnEvent renameColumnEvent = (RenameColumnEvent) schemaChangeEvent;
+ SchemaEvolutionUtils.renameColumn(
+ maxComputeOptions,
+ renameColumnEvent.tableId(),
+ renameColumnEvent.getNameMapping());
+ } else if (schemaChangeEvent instanceof AddColumnEvent) {
+ AddColumnEvent addColumnEvent = (AddColumnEvent) schemaChangeEvent;
+ SchemaEvolutionUtils.addColumns(
+ maxComputeOptions,
+ addColumnEvent.tableId(),
+ addColumnEvent.getAddedColumns());
+ } else if (schemaChangeEvent instanceof DropTableEvent) {
+ DropTableEvent dropTableEvent = (DropTableEvent) schemaChangeEvent;
+ SchemaEvolutionUtils.dropTable(maxComputeOptions, dropTableEvent.tableId());
+ } else if (schemaChangeEvent instanceof TruncateTableEvent) {
+ TruncateTableEvent truncateTableEvent = (TruncateTableEvent) schemaChangeEvent;
+ SchemaEvolutionUtils.truncateTable(maxComputeOptions, truncateTableEvent.tableId());
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported schema change event: "
+ + schemaChangeEvent.getClass().getName());
+ }
+ } catch (OdpsException e) {
+ throw new SchemaEvolveException(schemaChangeEvent, e.getMessage(), e);
+ }
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/common/Constant.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/common/Constant.java
new file mode 100644
index 00000000000..ed85024af72
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/common/Constant.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.common;
+
+/** Constants used for the MaxCompute connector. */
+public class Constant {
+ public static final String TUNNEL_SESSION_ID = "tunnel_session_id";
+ public static final String MAXCOMPUTE_PARTITION_NAME = "maxcompute_partition_name";
+ public static final String SCHEMA_ENABLE_FLAG = "odps.schema.model.enabled";
+
+ public static final String PIPELINE_SESSION_MANAGE_OPERATOR_UID =
+ "$$_session_manage_operator_$$";
+
+ public static final String END_OF_SESSION = "end_of_session";
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/common/FlinkOdpsException.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/common/FlinkOdpsException.java
new file mode 100755
index 00000000000..61e1041b837
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/common/FlinkOdpsException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.common;
+
+/** Exception thrown by Flink MaxCompute Connector. */
+public class FlinkOdpsException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public FlinkOdpsException(Throwable cause) {
+ super(cause);
+ }
+
+ public FlinkOdpsException(String message) {
+ super(message);
+ }
+
+ public FlinkOdpsException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/common/SessionIdentifier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/common/SessionIdentifier.java
new file mode 100644
index 00000000000..4cdab849ccc
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/common/SessionIdentifier.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.common;
+
+import org.apache.flink.cdc.common.event.TableId;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * A Session is uniquely identified through {@link TableId} and {@link String partitionName}. When
+ * the Session is successfully created, this class can also carry the sessionId. Note that sessionId
+ * does not participate in the comparison of hashcode and equals.
+ */
+public class SessionIdentifier implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final String project;
+ private final String schema;
+ private final String table;
+ private final String partitionName;
+
+ /** The sessionId does not participate in hashCode and equals. */
+ private String sessionId;
+
+ public SessionIdentifier(String project, String schema, String table, String partitionName) {
+ this(project, schema, table, partitionName, null);
+ }
+
+ public SessionIdentifier(
+ String project, String schema, String table, String partitionName, String sessionId) {
+ this.project = project;
+ this.schema = schema;
+ this.table = table;
+ this.partitionName = partitionName;
+ this.sessionId = sessionId;
+ }
+
+ public static SessionIdentifier of(
+ String project, String schema, String table, String partitionName) {
+ return new SessionIdentifier(project, schema, table, partitionName);
+ }
+
+ public static SessionIdentifier of(
+ String project, String schema, String table, String partitionName, String sessionId) {
+ return new SessionIdentifier(project, schema, table, partitionName, sessionId);
+ }
+
+ public String getProject() {
+ return project;
+ }
+
+ public String getSchema() {
+ return schema;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public String getPartitionName() {
+ return partitionName;
+ }
+
+ public String getSessionId() {
+ return sessionId;
+ }
+
+ @Override
+ public String toString() {
+ return "SessionIdentifier{"
+ + "project='"
+ + project
+ + '\''
+ + ", schema='"
+ + schema
+ + '\''
+ + ", table='"
+ + table
+ + '\''
+ + ", partitionName='"
+ + partitionName
+ + '\''
+ + ", sessionId='"
+ + sessionId
+ + '\''
+ + '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SessionIdentifier that = (SessionIdentifier) o;
+ return Objects.equals(project, that.project)
+ && Objects.equals(schema, that.schema)
+ && Objects.equals(table, that.table)
+ && Objects.equals(partitionName, that.partitionName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(project, schema, table, partitionName);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/common/UncheckedOdpsException.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/common/UncheckedOdpsException.java
new file mode 100644
index 00000000000..a4eedfbd9cc
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/common/UncheckedOdpsException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.common;
+
+import com.aliyun.odps.OdpsException;
+
+/** A wrapper for {@link OdpsException} that rethrows it as a {@link RuntimeException}. */
+public class UncheckedOdpsException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+ private final OdpsException cause;
+
+ public UncheckedOdpsException(OdpsException cause) {
+ super(cause);
+ this.cause = cause;
+ }
+
+ @Override
+ public OdpsException getCause() {
+ return cause;
+ }
+
+ @Override
+ public String getMessage() {
+ return cause.getMessage() + ", requestId: " + cause.getRequestId();
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/SessionManageCoordinatedOperatorFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/SessionManageCoordinatedOperatorFactory.java
new file mode 100644
index 00000000000..870983bdcf6
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/SessionManageCoordinatedOperatorFactory.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.coordinator;
+
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeWriteOptions;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator.Provider;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+
+/** The {@link AbstractStreamOperatorFactory} for {@link SessionManageOperator}. */
+public class SessionManageCoordinatedOperatorFactory extends AbstractStreamOperatorFactory<Event>
+ implements CoordinatedOperatorFactory<Event>, OneInputStreamOperatorFactory<Event, Event> {
+ private static final long serialVersionUID = 1L;
+ private final MaxComputeOptions options;
+ private final MaxComputeWriteOptions writeOptions;
+ private final String schemaOperatorUid;
+
+ public SessionManageCoordinatedOperatorFactory(
+ MaxComputeOptions options,
+ MaxComputeWriteOptions writeOptions,
+ String schemaOperatorUid) {
+ this.options = options;
+ this.writeOptions = writeOptions;
+ this.schemaOperatorUid = schemaOperatorUid;
+ }
+
+ @Override
+ public <T extends StreamOperator<Event>> T createStreamOperator(
+ StreamOperatorParameters<Event> parameters) {
+ OperatorIDGenerator schemaOperatorIdGenerator = new OperatorIDGenerator(schemaOperatorUid);
+ SessionManageOperator operator =
+ new SessionManageOperator(options, schemaOperatorIdGenerator.generate());
+ TaskOperatorEventGateway taskOperatorEventGateway =
+ parameters
+ .getContainingTask()
+ .getEnvironment()
+ .getOperatorCoordinatorEventGateway();
+ operator.setup(
+ parameters.getContainingTask(),
+ parameters.getStreamConfig(),
+ parameters.getOutput());
+ operator.setTaskOperatorEventGateway(taskOperatorEventGateway);
+ parameters
+ .getOperatorEventDispatcher()
+ .registerEventHandler(operator.getOperatorID(), operator);
+ return (T) operator;
+ }
+
+ @Override
+ public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+ return SessionManageOperator.class;
+ }
+
+ @Override
+ public Provider getCoordinatorProvider(String operatorName, OperatorID operatorID) {
+ return new SessionManageCoordinator.SessionManageCoordinatorProvider(
+ operatorName, operatorID, options, writeOptions);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/SessionManageCoordinator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/SessionManageCoordinator.java
new file mode 100644
index 00000000000..24b86cffc9b
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/SessionManageCoordinator.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.coordinator;
+
+import org.apache.flink.cdc.common.utils.StringUtils;
+import org.apache.flink.cdc.connectors.maxcompute.common.Constant;
+import org.apache.flink.cdc.connectors.maxcompute.common.FlinkOdpsException;
+import org.apache.flink.cdc.connectors.maxcompute.common.SessionIdentifier;
+import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.CommitSessionRequest;
+import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.CreateSessionRequest;
+import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.CreateSessionResponse;
+import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.WaitForFlushSuccessRequest;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeWriteOptions;
+import org.apache.flink.cdc.connectors.maxcompute.utils.MaxComputeUtils;
+import org.apache.flink.cdc.connectors.maxcompute.utils.RetryUtils;
+import org.apache.flink.cdc.connectors.maxcompute.utils.SessionCommitCoordinateHelper;
+import org.apache.flink.cdc.connectors.maxcompute.writer.MaxComputeWriter;
+import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * An {@link OperatorCoordinator} used to manage Sessions; it is responsible for handling the {@link
+ * CreateSessionRequest} and {@link CommitSessionRequest} sent by the operator.
+ */
+public class SessionManageCoordinator implements OperatorCoordinator, CoordinationRequestHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SessionManageCoordinator.class);
+ private final String operatorName;
+ private final MaxComputeOptions options;
+ private final MaxComputeWriteOptions writeOptions;
+ private final int parallelism;
+ private SessionCommitCoordinateHelper sessionCommitCoordinateHelper;
+ private Map<SessionIdentifier, MaxComputeWriter> sessionCache;
+ private Map<String, SessionIdentifier> sessionIdMap;
+ private CompletableFuture<CoordinationResponse>[] waitingFlushFutures;
+ private ExecutorService executor;
+
+ private SessionManageCoordinator(
+ String operatorName,
+ Context context,
+ MaxComputeOptions options,
+ MaxComputeWriteOptions writeOptions) {
+ this.operatorName = operatorName;
+ this.parallelism = context.currentParallelism();
+ this.options = options;
+ this.writeOptions = writeOptions;
+ }
+
+ @Override
+ public void start() {
+ LOG.info("Starting SessionManageCoordinator {}.", operatorName);
+
+ this.sessionCache = new HashMap<>();
+ this.sessionIdMap = new HashMap<>();
+ // start the executor
+ this.executor = Executors.newFixedThreadPool(writeOptions.getNumCommitThread());
+
+ this.waitingFlushFutures = new CompletableFuture[parallelism];
+ this.sessionCommitCoordinateHelper = new SessionCommitCoordinateHelper(parallelism);
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (this.executor != null) {
+ this.executor.shutdown();
+ }
+ }
+
+ @Override
+ public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
+ // nothing to do
+ }
+
+ private MaxComputeWriter createWriter(SessionIdentifier identifier) {
+ String partitionName = identifier.getPartitionName();
+ if (!StringUtils.isNullOrWhitespaceOnly(partitionName)) {
+ RetryUtils.executeUnchecked(
+ () -> {
+ MaxComputeUtils.createPartitionIfAbsent(
+ options,
+ identifier.getSchema(),
+ identifier.getTable(),
+ partitionName);
+ return null;
+ });
+ }
+ try {
+ MaxComputeWriter writer =
+ MaxComputeWriter.batchWriter(options, writeOptions, identifier);
+ LOG.info("Create session for table {}, sessionId {}.", identifier, writer.getId());
+ return writer;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @Override
+ public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
+ executor.execute(
+ () -> {
+ try {
+ result.complete(new byte[0]);
+ } catch (Throwable throwable) {
+ ExceptionUtils.rethrowIfFatalErrorOrOOM(throwable);
+ // when a checkpoint fails, throws directly.
+ result.completeExceptionally(
+ new CompletionException(
+ String.format(
+ "Failed to checkpoint Session %s for source %s",
+ checkpointId, this.getClass().getSimpleName()),
+ throwable));
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
+ CoordinationRequest request) {
+ LOG.info("Received coordination request {}.", request);
+ if (request instanceof CommitSessionRequest) {
+ CommitSessionRequest commitSessionRequest = (CommitSessionRequest) request;
+
+ CompletableFuture<CoordinationResponse> future =
+ sessionCommitCoordinateHelper.commit(
+ commitSessionRequest.getOperatorIndex(),
+ commitSessionRequest.getSessionId());
+ String toSubmitSessionId = sessionCommitCoordinateHelper.getToCommitSessionId();
+ while (sessionCommitCoordinateHelper.isCommitting() && toSubmitSessionId != null) {
+ commitSession(toSubmitSessionId);
+ toSubmitSessionId = sessionCommitCoordinateHelper.getToCommitSessionId();
+ }
+ if (!sessionCommitCoordinateHelper.isCommitting()) {
+ sessionCommitCoordinateHelper.commitSuccess(Constant.END_OF_SESSION, true);
+ sessionCommitCoordinateHelper.clear();
+
+ if (!sessionCache.isEmpty()) {
+ throw new FlinkOdpsException(
+ "sessionCache not empty: " + sessionCache.keySet());
+ }
+ completeAllFlushFutures();
+ }
+ return future;
+ } else if (request instanceof WaitForFlushSuccessRequest) {
+ CompletableFuture<CoordinationResponse> waitingFlushFuture = new CompletableFuture<>();
+ waitingFlushFutures[((WaitForFlushSuccessRequest) request).getOperatorIndex()] =
+ waitingFlushFuture;
+ return waitingFlushFuture;
+ } else if (request instanceof CreateSessionRequest) {
+ SessionIdentifier sessionIdentifier = ((CreateSessionRequest) request).getIdentifier();
+ if (!sessionCache.containsKey(sessionIdentifier)) {
+ MaxComputeWriter writer = createWriter(sessionIdentifier);
+ sessionCache.put(sessionIdentifier, writer);
+ sessionIdMap.put(writer.getId(), sessionIdentifier);
+ }
+ return CompletableFuture.completedFuture(
+ CoordinationResponseUtils.wrap(
+ new CreateSessionResponse(
+ sessionCache.get(sessionIdentifier).getId())));
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ private void commitSession(String toSubmitSessionId) {
+ MaxComputeWriter writer = sessionCache.remove(sessionIdMap.remove(toSubmitSessionId));
+ AtomicBoolean isSuccess = new AtomicBoolean(true);
+ LOG.info("start commit writer {}.", toSubmitSessionId);
+ try {
+ Future<?> future =
+ executor.submit(
+ () -> {
+ try {
+ writer.commit();
+ } catch (Throwable throwable) {
+ ExceptionUtils.rethrowIfFatalErrorOrOOM(throwable);
+ LOG.warn(
+ "Failed to commit writer {}.",
+ writer.getId(),
+ throwable);
+ isSuccess.set(false);
+ }
+ });
+ future.get();
+ } catch (Exception e) {
+ isSuccess.set(false);
+ }
+ sessionCommitCoordinateHelper.commitSuccess(toSubmitSessionId, isSuccess.get());
+ }
+
+ private void completeAllFlushFutures() {
+ for (CompletableFuture<CoordinationResponse> waitingFlushFuture : waitingFlushFutures) {
+ waitingFlushFuture.complete(null);
+ }
+ Arrays.fill(waitingFlushFutures, null);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ // nothing to do
+ }
+
+ @Override
+ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData)
+ throws Exception {
+ // nothing to do
+ }
+
+ @Override
+ public void subtaskReset(int subtask, long checkpointId) {
+ // nothing to do
+ }
+
+ @Override
+ public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) {
+ // nothing to do
+ }
+
+ @Override
+ public void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway) {
+ // nothing to do
+ }
+
+ /**
+ * The {@link org.apache.flink.runtime.operators.coordination.OperatorCoordinator.Provider} of
+ * {@link SessionManageCoordinator}.
+ */
+ public static class SessionManageCoordinatorProvider implements Provider {
+
+ private final OperatorID operatorID;
+ private final String operatorName;
+ private final MaxComputeOptions options;
+ private final MaxComputeWriteOptions writeOptions;
+
+ public SessionManageCoordinatorProvider(
+ String operatorName,
+ OperatorID operatorID,
+ MaxComputeOptions options,
+ MaxComputeWriteOptions writeOptions) {
+ this.operatorName = operatorName;
+ this.operatorID = operatorID;
+
+ this.options = options;
+ this.writeOptions = writeOptions;
+ }
+
+ /** Gets the ID of the operator to which the coordinator belongs. */
+ @Override
+ public OperatorID getOperatorId() {
+ return operatorID;
+ }
+
+ /**
+ * Creates the {@code OperatorCoordinator}, using the given context.
+ *
+ * @param context
+ */
+ @Override
+ public OperatorCoordinator create(Context context) throws Exception {
+ return new SessionManageCoordinator(operatorName, context, options, writeOptions);
+ }
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/SessionManageOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/SessionManageOperator.java
new file mode 100644
index 00000000000..3538bb3c6b5
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/SessionManageOperator.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.coordinator;
+
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.common.event.OperationType;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.connectors.maxcompute.common.Constant;
+import org.apache.flink.cdc.connectors.maxcompute.common.SessionIdentifier;
+import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.CreateSessionRequest;
+import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.CreateSessionResponse;
+import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.WaitForFlushSuccessRequest;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
+import org.apache.flink.cdc.connectors.maxcompute.utils.MaxComputeUtils;
+import org.apache.flink.cdc.connectors.maxcompute.utils.TypeConvertUtils;
+import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
+import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.SerializedValue;
+
+import com.aliyun.odps.PartitionSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Processes a {@link DataChangeEvent}, extracting data and encapsulating it into a {@link
+ * SessionIdentifier}, and then sends a {@link CreateSessionRequest} to the {@link
+ * SessionManageCoordinator} to create a writing session. Subsequently, it incorporates the
+ * SessionId into the metadata of the {@link DataChangeEvent} for downstream processing.
+ */
+public class SessionManageOperator extends AbstractStreamOperator<Event>
+        implements OneInputStreamOperator<Event, Event>, OperatorEventHandler, BoundedOneInput {
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(SessionManageOperator.class);
+
+ /** TODO: a tricky way to get an Operator from sink. */
+ public static SessionManageOperator instance;
+
+ private final MaxComputeOptions options;
+ private final OperatorID schemaOperatorUid;
+
+ private transient TaskOperatorEventGateway taskOperatorEventGateway;
+    private transient Map<SessionIdentifier, String> sessionCache;
+    private transient Map<TableId, Schema> schemaMaps;
+    private transient Map<TableId, List<RecordData.FieldGetter>> fieldGetterMaps;
+ private transient SchemaEvolutionClient schemaEvolutionClient;
+
+    private transient Future<CoordinationResponse> snapshotFlushSuccess;
+ private transient int indexOfThisSubtask;
+    /**
+     * endInput is triggered before prepareSnapshotPreBarrier, so this flag records that the
+     * WaitForFlushSuccessRequest has already been sent in endInput and must not be sent again.
+     */
+ private transient boolean endOfInput;
+
+ public SessionManageOperator(MaxComputeOptions options, OperatorID schemaOperatorUid) {
+ this.chainingStrategy = ChainingStrategy.ALWAYS;
+ this.options = options;
+ this.schemaOperatorUid = schemaOperatorUid;
+ }
+
+ @Override
+ public void open() throws Exception {
+ this.sessionCache = new HashMap<>();
+ this.schemaMaps = new HashMap<>();
+ this.fieldGetterMaps = new HashMap<>();
+ SessionManageOperator.instance = this;
+ }
+
+ @Override
+ public void setup(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<Event>> output) {
+ super.setup(containingTask, config, output);
+ schemaEvolutionClient =
+ new SchemaEvolutionClient(
+ containingTask.getEnvironment().getOperatorCoordinatorEventGateway(),
+ schemaOperatorUid);
+ indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+ }
+
+ @Override
+    public void processElement(StreamRecord<Event> element) throws Exception {
+ if (element.getValue() instanceof DataChangeEvent) {
+ DataChangeEvent dataChangeEvent = (DataChangeEvent) element.getValue();
+ TableId tableId = dataChangeEvent.tableId();
+            // Because this operator sits between SchemaOperator and DataSinkWriterOperator, no
+            // schema is available here when the CreateTableEvent has been lost.
+ if (!schemaMaps.containsKey(tableId)) {
+ emitLatestSchema(tableId);
+ }
+ String partitionName =
+ extractPartition(
+ dataChangeEvent.op() == OperationType.DELETE
+ ? dataChangeEvent.before()
+ : dataChangeEvent.after(),
+ tableId);
+ SessionIdentifier sessionIdentifier =
+ SessionIdentifier.of(
+ options.getProject(),
+ MaxComputeUtils.getSchema(options, tableId),
+ tableId.getTableName(),
+ partitionName);
+ if (!sessionCache.containsKey(sessionIdentifier)) {
+ CreateSessionResponse response =
+ (CreateSessionResponse)
+ sendRequestToOperator(new CreateSessionRequest(sessionIdentifier));
+ sessionCache.put(sessionIdentifier, response.getSessionId());
+ }
+ dataChangeEvent
+ .meta()
+ .put(Constant.TUNNEL_SESSION_ID, sessionCache.get(sessionIdentifier));
+ dataChangeEvent.meta().put(Constant.MAXCOMPUTE_PARTITION_NAME, partitionName);
+ output.collect(new StreamRecord<>(dataChangeEvent));
+ } else if (element.getValue() instanceof FlushEvent) {
+ LOG.info(
+ "operator {} handle FlushEvent begin, wait for sink writers flush success",
+ indexOfThisSubtask);
+ sessionCache.clear();
+            Future<CoordinationResponse> waitForSuccess =
+ submitRequestToOperator(new WaitForFlushSuccessRequest(indexOfThisSubtask));
+ output.collect(element);
+ // wait for sink writers flush success
+ waitForSuccess.get();
+ LOG.info(
+ "operator {} handle FlushEvent end, all sink writers flush success",
+ indexOfThisSubtask);
+ } else if (element.getValue() instanceof CreateTableEvent) {
+ TableId tableId = ((CreateTableEvent) element.getValue()).tableId();
+ Schema schema = ((CreateTableEvent) element.getValue()).getSchema();
+ schemaMaps.put(tableId, schema);
+ fieldGetterMaps.put(tableId, TypeConvertUtils.createFieldGetters(schema));
+ output.collect(element);
+ } else if (element.getValue() instanceof SchemaChangeEvent) {
+ SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) element.getValue();
+ TableId tableId = schemaChangeEvent.tableId();
+ Schema newSchema =
+ SchemaUtils.applySchemaChangeEvent(schemaMaps.get(tableId), schemaChangeEvent);
+ schemaMaps.put(tableId, newSchema);
+ fieldGetterMaps.put(tableId, TypeConvertUtils.createFieldGetters(newSchema));
+ output.collect(element);
+ } else {
+ output.collect(element);
+ LOG.warn("unknown element {}", element.getValue());
+ }
+ }
+
+ private void emitLatestSchema(TableId tableId) throws Exception {
+        Optional<Schema> schema = schemaEvolutionClient.getLatestOriginalSchema(tableId);
+ if (schema.isPresent()) {
+ Schema latestSchema = schema.get();
+ schemaMaps.put(tableId, latestSchema);
+ fieldGetterMaps.put(tableId, TypeConvertUtils.createFieldGetters(latestSchema));
+ output.collect(new StreamRecord<>(new CreateTableEvent(tableId, latestSchema)));
+ } else {
+ throw new RuntimeException(
+ "Could not find schema message from SchemaRegistry for " + tableId);
+ }
+ }
+
+ @Override
+ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+ super.prepareSnapshotPreBarrier(checkpointId);
+ if (endOfInput) {
+ return;
+ }
+ LOG.info(
+ "operator {} prepare snapshot, wait for sink writers flush success",
+ indexOfThisSubtask);
+ // wait for sink writers flush success
+ waitLastSnapshotFlushSuccess();
+ snapshotFlushSuccess =
+ submitRequestToOperator(
+ new WaitForFlushSuccessRequest(
+ getRuntimeContext().getIndexOfThisSubtask()));
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ super.snapshotState(context);
+ sessionCache.clear();
+ waitLastSnapshotFlushSuccess();
+ LOG.info("operator {} snapshot end, all sink writers flush success", indexOfThisSubtask);
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ this.endOfInput = true;
+ LOG.info(
+ "operator {} end of input, wait for sink writers flush success",
+ indexOfThisSubtask);
+ waitLastSnapshotFlushSuccess();
+ snapshotFlushSuccess =
+ submitRequestToOperator(
+ new WaitForFlushSuccessRequest(
+ getRuntimeContext().getIndexOfThisSubtask()));
+ }
+
+ private void waitLastSnapshotFlushSuccess() throws Exception {
+ if (snapshotFlushSuccess != null) {
+ snapshotFlushSuccess.get();
+ snapshotFlushSuccess = null;
+ }
+ }
+
+    /** Partition columns always come after the data columns. */
+ private String extractPartition(RecordData recordData, TableId tableId) {
+ Schema schema = schemaMaps.get(tableId);
+ int partitionKeyCount = schema.partitionKeys().size();
+ if (partitionKeyCount == 0) {
+ return null;
+ }
+ int columnCount = schema.getColumnCount();
+        List<RecordData.FieldGetter> fieldGetters = fieldGetterMaps.get(tableId);
+
+ PartitionSpec partitionSpec = new PartitionSpec();
+ for (int i = 0; i < partitionKeyCount; i++) {
+ RecordData.FieldGetter fieldGetter =
+ fieldGetters.get(columnCount - partitionKeyCount - 1 + i);
+ Object value = fieldGetter.getFieldOrNull(recordData);
+ partitionSpec.set(schema.partitionKeys().get(i), Objects.toString(value));
+ }
+ return partitionSpec.toString(true, true);
+ }
+
+ @Override
+ public void handleOperatorEvent(OperatorEvent evt) {
+ // handle event
+ }
+
+    /** Called from CreateSessionCoordinatedOperatorFactory. */
+ public void setTaskOperatorEventGateway(TaskOperatorEventGateway taskOperatorEventGateway) {
+ this.taskOperatorEventGateway = taskOperatorEventGateway;
+ }
+
+ public CoordinationResponse sendRequestToOperator(CoordinationRequest request)
+ throws IOException, ExecutionException, InterruptedException {
+        CompletableFuture<CoordinationResponse> responseFuture =
+ taskOperatorEventGateway.sendRequestToCoordinator(
+ getOperatorID(), new SerializedValue<>(request));
+ return CoordinationResponseUtils.unwrap(responseFuture.get());
+ }
+
+    public Future<CoordinationResponse> submitRequestToOperator(CoordinationRequest request)
+ throws IOException {
+ return taskOperatorEventGateway.sendRequestToCoordinator(
+ getOperatorID(), new SerializedValue<>(request));
+ }
+}
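
The operator above keys Tunnel sessions by project, schema, table, and partition, and caches the session id so that later events bound for the same destination reuse the session instead of asking the coordinator again. Below is a minimal, framework-free sketch of that caching idea; `SessionCacheSketch` and `Key` are hypothetical names, and the fake ids stand in for the real `CreateSessionRequest` round-trip:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch of the per-(project, schema, table, partition) session cache. */
public class SessionCacheSketch {

    /** Simplified stand-in for SessionIdentifier. */
    static final class Key {
        final String project;
        final String schema;
        final String table;
        final String partition;

        Key(String project, String schema, String table, String partition) {
            this.project = project;
            this.schema = schema;
            this.table = table;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key k = (Key) o;
            return Objects.equals(project, k.project)
                    && Objects.equals(schema, k.schema)
                    && Objects.equals(table, k.table)
                    && Objects.equals(partition, k.partition);
        }

        @Override
        public int hashCode() {
            return Objects.hash(project, schema, table, partition);
        }
    }

    private final Map<Key, String> sessionCache = new HashMap<>();
    private int counter = 0;

    /** Returns the cached session id, creating a fake one on first use of the key. */
    String sessionFor(String project, String schema, String table, String partition) {
        return sessionCache.computeIfAbsent(
                new Key(project, schema, table, partition), k -> "session-" + (++counter));
    }

    public static void main(String[] args) {
        SessionCacheSketch sketch = new SessionCacheSketch();
        System.out.println(sketch.sessionFor("flink_cdc", "default", "orders", "pt=20240101"));
        // The same destination reuses the cached session id.
        System.out.println(sketch.sessionFor("flink_cdc", "default", "orders", "pt=20240101"));
        // A different partition opens a new session.
        System.out.println(sketch.sessionFor("flink_cdc", "default", "orders", "pt=20240102"));
    }
}
```

In the real operator the cached value comes from the CreateSessionResponse, and the cache is cleared on every FlushEvent and snapshot so that a fresh session is opened per checkpoint round.
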
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/message/CommitSessionRequest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/message/CommitSessionRequest.java
new file mode 100644
index 00000000000..c12ed2049e5
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/message/CommitSessionRequest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.coordinator.message;
+
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+
+/**
+ * The commit session request from {@link
+ * org.apache.flink.cdc.connectors.maxcompute.sink.MaxComputeEventWriter} to {@link
+ * org.apache.flink.cdc.connectors.maxcompute.coordinator.SessionManageCoordinator}. It is a kind
+ * of {@link SyncRequest}.
+ */
+public class CommitSessionRequest implements CoordinationRequest {
+ private static final long serialVersionUID = 1L;
+
+ private final int operatorIndex;
+ private final String sessionId;
+
+ public CommitSessionRequest(int operatorIndex, String sessionId) {
+ this.operatorIndex = operatorIndex;
+ this.sessionId = sessionId;
+ }
+
+ public int getOperatorIndex() {
+ return operatorIndex;
+ }
+
+ public String getSessionId() {
+ return sessionId;
+ }
+
+ @Override
+ public String toString() {
+ return "CommitSessionRequest{"
+ + "operatorIndex="
+ + operatorIndex
+ + ", sessionId='"
+ + sessionId
+ + '\''
+ + '}';
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/message/CommitSessionResponse.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/message/CommitSessionResponse.java
new file mode 100644
index 00000000000..105dca3bc7d
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/message/CommitSessionResponse.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.coordinator.message;
+
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+
+/**
+ * Response for a {@link CommitSessionRequest}. This response is sent from {@link
+ * org.apache.flink.cdc.connectors.maxcompute.coordinator.SessionManageCoordinator} to {@link
+ * org.apache.flink.cdc.connectors.maxcompute.sink.MaxComputeEventWriter}.
+ *
+ * <p>A successful response indicates that all sessions have been committed, allowing the writer to
+ * proceed to the next round of writing. Otherwise, if any session has not been successfully
+ * committed, all task managers are instructed to reset to the latest checkpoint in order to retry
+ * the operation.
+ */
+public class CommitSessionResponse implements CoordinationResponse {
+ private static final long serialVersionUID = 1L;
+
+ private final boolean success;
+
+ public CommitSessionResponse(boolean success) {
+ this.success = success;
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/message/CreateSessionRequest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/message/CreateSessionRequest.java
new file mode 100644
index 00000000000..fe8dac29724
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/message/CreateSessionRequest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.coordinator.message;
+
+import org.apache.flink.cdc.connectors.maxcompute.common.SessionIdentifier;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+
+/**
+ * Represents a request sent from a {@link
+ * org.apache.flink.cdc.connectors.maxcompute.coordinator.SessionManageOperator} to the {@link
+ * org.apache.flink.cdc.connectors.maxcompute.coordinator.SessionManageCoordinator}.
+ *
+ * <p>When a {@link org.apache.flink.cdc.common.event.DataChangeEvent} indicates a new session, a
+ * {@link CreateSessionRequest} is sent to the coordinator to handle the session creation process.
+ *
+ * <p>A {@link SessionIdentifier} is used to identify the session.
+ */
+public class CreateSessionRequest implements CoordinationRequest {
+ private static final long serialVersionUID = 1L;
+
+ private SessionIdentifier identifier;
+
+ public CreateSessionRequest(SessionIdentifier identifier) {
+ this.identifier = identifier;
+ }
+
+ public SessionIdentifier getIdentifier() {
+ return identifier;
+ }
+
+ public void setIdentifier(SessionIdentifier identifier) {
+ this.identifier = identifier;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/message/CreateSessionResponse.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/message/CreateSessionResponse.java
new file mode 100644
index 00000000000..c650d0240a3
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/message/CreateSessionResponse.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.coordinator.message;
+
+import org.apache.flink.cdc.connectors.maxcompute.common.SessionIdentifier;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+
+/**
+ * The response to a {@link CreateSessionRequest}, which contains the sessionId for the specific
+ * {@link SessionIdentifier}.
+ */
+public class CreateSessionResponse implements CoordinationResponse {
+
+ private final String sessionId;
+
+ public CreateSessionResponse(String sessionId) {
+ this.sessionId = sessionId;
+ }
+
+ public String getSessionId() {
+ return sessionId;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/message/SyncRequest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/message/SyncRequest.java
new file mode 100644
index 00000000000..02ba9f915c5
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/message/SyncRequest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.coordinator.message;
+
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+
+/**
+ * A request signaling the {@link
+ * org.apache.flink.cdc.connectors.maxcompute.coordinator.SessionManageCoordinator} to await
+ * requests from all operators. Upon receiving this request from every operator, the coordinator
+ * proceeds to send a response.
+ */
+public class SyncRequest implements CoordinationRequest {
+ private static final long serialVersionUID = 1L;
+
+ private final int operatorIndex;
+
+ public SyncRequest(int operatorIndex) {
+ this.operatorIndex = operatorIndex;
+ }
+
+ public int getOperatorIndex() {
+ return operatorIndex;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/message/WaitForFlushSuccessRequest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/message/WaitForFlushSuccessRequest.java
new file mode 100644
index 00000000000..8a8baec6e74
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/message/WaitForFlushSuccessRequest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.coordinator.message;
+
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+
+/** The {@link OperatorEvent} used to synchronize the SessionManageOperator and the SinkWriter. */
+public class WaitForFlushSuccessRequest implements CoordinationRequest {
+ private static final long serialVersionUID = 1L;
+
+ private final int operatorIndex;
+
+ public WaitForFlushSuccessRequest(int operatorIndex) {
+ this.operatorIndex = operatorIndex;
+ }
+
+ public int getOperatorIndex() {
+ return operatorIndex;
+ }
+
+ @Override
+ public String toString() {
+ return "WaitForFlushSuccessRequest{" + "operatorIndex=" + operatorIndex + '}';
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/options/MaxComputeOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/options/MaxComputeOptions.java
new file mode 100644
index 00000000000..87ebd50391f
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/options/MaxComputeOptions.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.options;
+
+import org.apache.flink.cdc.connectors.maxcompute.utils.MaxComputeUtils;
+
+import java.io.Serializable;
+
+/** basic options for MaxCompute. */
+public class MaxComputeOptions implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String accessId;
+ private final String accessKey;
+ private final String endpoint;
+ private final String project;
+ private final String tunnelEndpoint;
+ private final boolean supportSchema;
+ private final String quotaName;
+ private final String stsToken;
+ private final int bucketsNum;
+ private final String schemaOperatorUid;
+
+ private MaxComputeOptions(Builder builder) {
+ this.accessId = builder.accessId;
+ this.accessKey = builder.accessKey;
+ this.endpoint = builder.endpoint;
+ this.project = builder.project;
+ this.tunnelEndpoint = builder.tunnelEndpoint;
+ this.quotaName = builder.quotaName;
+ this.stsToken = builder.stsToken;
+ this.bucketsNum = builder.bucketsNum;
+ this.supportSchema = MaxComputeUtils.supportSchema(this);
+ this.schemaOperatorUid = builder.schemaOperatorUid;
+ }
+
+ public static Builder builder(
+ String accessId, String accessKey, String endpoint, String project) {
+ return new Builder(accessId, accessKey, endpoint, project);
+ }
+
+ public String getTunnelEndpoint() {
+ return tunnelEndpoint;
+ }
+
+ public String getAccessId() {
+ return accessId;
+ }
+
+ public String getAccessKey() {
+ return accessKey;
+ }
+
+ public String getEndpoint() {
+ return endpoint;
+ }
+
+ public String getProject() {
+ return project;
+ }
+
+ public String getQuotaName() {
+ return quotaName;
+ }
+
+ public String getStsToken() {
+ return stsToken;
+ }
+
+ public boolean isSupportSchema() {
+ return supportSchema;
+ }
+
+ public int getBucketsNum() {
+ return bucketsNum;
+ }
+
+ public String getSchemaOperatorUid() {
+ return schemaOperatorUid;
+ }
+
+ /** builder for maxcompute options. */
+ public static class Builder {
+
+ private final String accessId;
+ private final String accessKey;
+ private final String endpoint;
+ private final String project;
+ private String tunnelEndpoint;
+ private String quotaName;
+ private String stsToken;
+ private String schemaOperatorUid;
+ private int bucketsNum = 16;
+
+ public Builder(String accessId, String accessKey, String endpoint, String project) {
+ this.accessId = accessId;
+ this.accessKey = accessKey;
+ this.endpoint = endpoint;
+ this.project = project;
+ }
+
+ public Builder withTunnelEndpoint(String tunnelEndpoint) {
+ this.tunnelEndpoint = tunnelEndpoint;
+ return this;
+ }
+
+ public Builder withQuotaName(String quotaName) {
+ this.quotaName = quotaName;
+ return this;
+ }
+
+ public Builder withStsToken(String stsToken) {
+ this.stsToken = stsToken;
+ return this;
+ }
+
+ public Builder withBucketsNum(int bucketsNum) {
+ this.bucketsNum = bucketsNum;
+ return this;
+ }
+
+ public Builder withSchemaOperatorUid(String schemaOperatorUid) {
+ this.schemaOperatorUid = schemaOperatorUid;
+ return this;
+ }
+
+ public MaxComputeOptions build() {
+ return new MaxComputeOptions(this);
+ }
+ }
+}
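
A short sketch of how these options might be assembled with the builder above. The access key, endpoint, and project values are placeholders, and note that `build()` calls `MaxComputeUtils.supportSchema`, so it contacts the MaxCompute service and needs a reachable endpoint:

```java
import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;

public class MaxComputeOptionsExample {
    public static void main(String[] args) {
        // Placeholder credentials and endpoints; replace with real values.
        MaxComputeOptions options =
                MaxComputeOptions.builder("ak", "sk", "http://odps.example.endpoint", "flink_cdc")
                        .withTunnelEndpoint("http://tunnel.example.endpoint") // optional, for proxies
                        .withQuotaName("my_quota") // optional exclusive resource group
                        .withBucketsNum(16) // buckets for auto-created Delta tables
                        .withSchemaOperatorUid("schema-operator-uid")
                        .build(); // probes the project to decide whether schemas are supported
        System.out.println(options.getProject() + ", buckets=" + options.getBucketsNum());
    }
}
```
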
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/options/MaxComputeWriteOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/options/MaxComputeWriteOptions.java
new file mode 100644
index 00000000000..c746063e6b6
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/options/MaxComputeWriteOptions.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.options;
+
+import java.io.Serializable;
+
+/** extended options for maxcompute. */
+public class MaxComputeWriteOptions implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final int flushConcurrent;
+ private final long maxBufferSize;
+ private final long slotBufferSize;
+ private final int numCommitThread;
+ private final String compressAlgorithm;
+
+ private MaxComputeWriteOptions(Builder builder) {
+ this.flushConcurrent = builder.flushConcurrent;
+ this.maxBufferSize = builder.maxBufferSize;
+ this.slotBufferSize = builder.slotBufferSize;
+ this.numCommitThread = builder.numCommitThread;
+ this.compressAlgorithm = builder.compressAlgorithm;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public int getFlushConcurrent() {
+ return flushConcurrent;
+ }
+
+ public long getMaxBufferSize() {
+ return maxBufferSize;
+ }
+
+ public long getSlotBufferSize() {
+ return slotBufferSize;
+ }
+
+ public int getNumCommitThread() {
+ return numCommitThread;
+ }
+
+ public String getCompressAlgorithm() {
+ return compressAlgorithm;
+ }
+
+ /** builder for maxcompute write options. */
+ public static class Builder {
+ private int flushConcurrent = 2;
+ private long maxBufferSize = 64 * 1024 * 1024L;
+ private long slotBufferSize = 1024 * 1024L;
+ private int numCommitThread = 16;
+ private String compressAlgorithm = "zlib";
+
+ public Builder withFlushConcurrent(int flushConcurrent) {
+ this.flushConcurrent = flushConcurrent;
+ return this;
+ }
+
+ public Builder withMaxBufferSize(long maxBufferSize) {
+ this.maxBufferSize = maxBufferSize;
+ return this;
+ }
+
+ public Builder withSlotBufferSize(long slotBufferSize) {
+ this.slotBufferSize = slotBufferSize;
+ return this;
+ }
+
+ public Builder withNumCommitThread(int numCommitThread) {
+ this.numCommitThread = numCommitThread;
+ return this;
+ }
+
+ public Builder withCompressAlgorithm(String compressAlgorithm) {
+ this.compressAlgorithm = compressAlgorithm;
+ return this;
+ }
+
+ public MaxComputeWriteOptions build() {
+ return new MaxComputeWriteOptions(this);
+ }
+ }
+}
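
The write options only hold local buffering and compression knobs, so building them is purely in-memory. A small sketch with the defaults spelled out explicitly:

```java
import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeWriteOptions;

public class MaxComputeWriteOptionsExample {
    public static void main(String[] args) {
        MaxComputeWriteOptions writeOptions =
                MaxComputeWriteOptions.builder()
                        .withCompressAlgorithm("zlib") // raw, zlib, lz4 or snappy
                        .withMaxBufferSize(64 * 1024 * 1024L) // total in-memory buffer
                        .withSlotBufferSize(1024 * 1024L) // per-slot buffer
                        .withFlushConcurrent(2)
                        .withNumCommitThread(16)
                        .build();
        System.out.println(writeOptions.getCompressAlgorithm());
    }
}
```
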
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/sink/MaxComputeEventSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/sink/MaxComputeEventSink.java
new file mode 100644
index 00000000000..f860a003979
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/sink/MaxComputeEventSink.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.sink;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.connectors.maxcompute.common.Constant;
+import org.apache.flink.cdc.connectors.maxcompute.coordinator.SessionManageCoordinatedOperatorFactory;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeWriteOptions;
+import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
+import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+
+import java.io.IOException;
+
+/** A {@link Sink} of {@link Event} to MaxCompute. */
+public class MaxComputeEventSink implements Sink<Event>, WithPreWriteTopology<Event> {
+ private static final long serialVersionUID = 1L;
+ private final MaxComputeOptions options;
+ private final MaxComputeWriteOptions writeOptions;
+
+ public MaxComputeEventSink(MaxComputeOptions options, MaxComputeWriteOptions writeOptions) {
+ this.options = options;
+ this.writeOptions = writeOptions;
+ }
+
+ @Override
+    public DataStream<Event> addPreWriteTopology(DataStream<Event> inputDataStream) {
+        SingleOutputStreamOperator<Event> stream =
+ inputDataStream.transform(
+ "SessionManageOperator",
+ new EventTypeInfo(),
+ new SessionManageCoordinatedOperatorFactory(
+ options, writeOptions, options.getSchemaOperatorUid()));
+ stream.uid(Constant.PIPELINE_SESSION_MANAGE_OPERATOR_UID);
+ return stream;
+ }
+
+ @Override
+    public SinkWriter<Event> createWriter(InitContext context) throws IOException {
+ return new MaxComputeEventWriter(options, writeOptions, context);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/sink/MaxComputeEventWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/sink/MaxComputeEventWriter.java
new file mode 100644
index 00000000000..3e8a4c77d45
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/sink/MaxComputeEventWriter.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.sink;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.OperationType;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.connectors.maxcompute.common.Constant;
+import org.apache.flink.cdc.connectors.maxcompute.common.SessionIdentifier;
+import org.apache.flink.cdc.connectors.maxcompute.coordinator.SessionManageOperator;
+import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.CommitSessionRequest;
+import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.CommitSessionResponse;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeWriteOptions;
+import org.apache.flink.cdc.connectors.maxcompute.utils.MaxComputeUtils;
+import org.apache.flink.cdc.connectors.maxcompute.utils.TypeConvertUtils;
+import org.apache.flink.cdc.connectors.maxcompute.writer.MaxComputeWriter;
+import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.util.Preconditions;
+
+import com.aliyun.odps.data.ArrayRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/** A {@link SinkWriter} of {@link Event} for MaxCompute. */
+public class MaxComputeEventWriter implements SinkWriter<Event> {
+ private static final Logger LOG = LoggerFactory.getLogger(MaxComputeEventWriter.class);
+
+ private final Sink.InitContext context;
+ private final MaxComputeOptions options;
+ private final MaxComputeWriteOptions writeOptions;
+    private final Map<String, MaxComputeWriter> writerMap;
+    private final Map<TableId, Schema> schemaCache;
+
+ public MaxComputeEventWriter(
+ MaxComputeOptions options,
+ MaxComputeWriteOptions writeOptions,
+ Sink.InitContext context) {
+ this.context = context;
+ this.options = options;
+ this.writeOptions = writeOptions;
+
+ this.writerMap = new HashMap<>();
+ this.schemaCache = new HashMap<>();
+ }
+
+ @Override
+ public void write(Event element, Context context) throws IOException {
+ if (element instanceof DataChangeEvent) {
+ DataChangeEvent dataChangeEvent = (DataChangeEvent) element;
+ String sessionId = dataChangeEvent.meta().get(Constant.TUNNEL_SESSION_ID);
+ String partitionName = dataChangeEvent.meta().get(Constant.MAXCOMPUTE_PARTITION_NAME);
+ if (!writerMap.containsKey(sessionId)) {
+ LOG.info(
+ "Sink writer {} start to create session {}.",
+ this.context.getSubtaskId(),
+ sessionId);
+ SessionIdentifier sessionIdentifier =
+ SessionIdentifier.of(
+ options.getProject(),
+ MaxComputeUtils.getSchema(options, dataChangeEvent.tableId()),
+ dataChangeEvent.tableId().getTableName(),
+ partitionName,
+ sessionId);
+ writerMap.put(
+ sessionId,
+ MaxComputeWriter.batchWriter(options, writeOptions, sessionIdentifier));
+ }
+ MaxComputeWriter writer = writerMap.get(sessionId);
+ ArrayRecord record = writer.newElement();
+
+ if (dataChangeEvent.op() != OperationType.DELETE) {
+ TypeConvertUtils.toMaxComputeRecord(
+ schemaCache.get(dataChangeEvent.tableId()),
+ dataChangeEvent.after(),
+ record);
+ writer.write(record);
+ } else {
+ TypeConvertUtils.toMaxComputeRecord(
+ schemaCache.get(dataChangeEvent.tableId()),
+ dataChangeEvent.before(),
+ record);
+ writer.delete(record);
+ }
+ } else if (element instanceof CreateTableEvent) {
+ CreateTableEvent createTableEvent = (CreateTableEvent) element;
+ schemaCache.put(createTableEvent.tableId(), createTableEvent.getSchema());
+ } else if (element instanceof SchemaChangeEvent) {
+ SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) element;
+ TableId tableId = schemaChangeEvent.tableId();
+ Schema newSchema =
+ SchemaUtils.applySchemaChangeEvent(schemaCache.get(tableId), schemaChangeEvent);
+ schemaCache.put(tableId, newSchema);
+ }
+ }
+
+ @Override
+ public void flush(boolean endOfInput) throws IOException, InterruptedException {
+ SessionManageOperator operator = SessionManageOperator.instance;
+ Preconditions.checkNotNull(
+ operator,
+                "SessionManageOperator cannot be null, please set 'pipeline.operator-chaining' to true to avoid this issue.");
+ LOG.info("Sink writer {} start to flush.", context.getSubtaskId());
+        List<Future<CoordinationResponse>> responses = new ArrayList<>(writerMap.size() + 1);
+ writerMap.entrySet().stream()
+ .sorted(Map.Entry.comparingByKey())
+ .forEach(
+ entry -> {
+ try {
+ entry.getValue().flush();
+                                Future<CoordinationResponse> future =
+ operator.submitRequestToOperator(
+ new CommitSessionRequest(
+ context.getSubtaskId(), entry.getKey()));
+                                responses.add(future);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ writerMap.clear();
+        Future<CoordinationResponse> future =
+ operator.submitRequestToOperator(
+ new CommitSessionRequest(context.getSubtaskId(), Constant.END_OF_SESSION));
+        responses.add(future);
+ try {
+            for (Future<CoordinationResponse> response : responses) {
+ CommitSessionResponse commitSessionResponse =
+ CoordinationResponseUtils.unwrap(response.get());
+ if (!commitSessionResponse.isSuccess()) {
+ throw new IOException(
+                            "JobManager failed to commit the session; restart all TaskManagers to retry.");
+ }
+ }
+ } catch (ExecutionException e) {
+ throw new IOException(e);
+ }
+ LOG.info("Sink writer {} flush success.", context.getSubtaskId());
+ }
+
+ @Override
+ public void close() throws Exception {
+ // do nothing
+ }
+}
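
The flush path above follows a small protocol: flush each open session in sorted order, submit one commit request per session plus a final end-of-sessions marker, then wait for every acknowledgement and fail the checkpoint if any commit was unsuccessful. A stripped-down sketch of that pattern, with a hypothetical `commit` function standing in for the coordinator round-trip:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical sketch of the flush-then-commit pattern used at checkpoint time. */
public class FlushProtocolSketch {
    static final String END_OF_SESSION = "end_of_session";

    /** Flushes sessions in a deterministic order, then waits for every commit acknowledgement. */
    static void flushAndCommit(
            Map<String, Runnable> openSessions, Function<String, Future<Boolean>> commit)
            throws IOException {
        List<Future<Boolean>> acks = new ArrayList<>();
        new TreeMap<>(openSessions)
                .forEach(
                        (sessionId, flush) -> {
                            flush.run(); // flush buffered records of this session
                            acks.add(commit.apply(sessionId));
                        });
        openSessions.clear();
        acks.add(commit.apply(END_OF_SESSION)); // marker: no more commits from this subtask
        try {
            for (Future<Boolean> ack : acks) {
                if (!ack.get()) {
                    throw new IOException("commit failed, fail the checkpoint to retry");
                }
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Map<String, Runnable> sessions = new TreeMap<>();
        sessions.put("session-2", () -> System.out.println("flushed session-2"));
        sessions.put("session-1", () -> System.out.println("flushed session-1"));
        flushAndCommit(sessions, id -> CompletableFuture.completedFuture(true));
        System.out.println("all sessions committed");
    }
}
```

Throwing from flush is what pushes the job back to the last checkpoint, which matches the retry semantics described in the CommitSessionResponse javadoc.
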
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/sink/MaxComputeHashFunctionProvider.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/sink/MaxComputeHashFunctionProvider.java
new file mode 100644
index 00000000000..4be46f2f413
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/sink/MaxComputeHashFunctionProvider.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.sink;
+
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.OperationType;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.function.HashFunction;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
+import org.apache.flink.cdc.connectors.maxcompute.utils.TypeConvertUtils;
+
+import com.aliyun.odps.tunnel.hasher.TypeHasher;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Hash function for MaxCompute that distributes data change events to different MaxCompute sink
+ * subtasks by primary key.
+ */
+public class MaxComputeHashFunctionProvider implements HashFunctionProvider<DataChangeEvent> {
+ private static final long serialVersionUID = 1L;
+ private final int bucketSize;
+
+ public MaxComputeHashFunctionProvider(MaxComputeOptions options) {
+ this.bucketSize = options.getBucketsNum();
+ }
+
+ @Override
+    public HashFunction<DataChangeEvent> getHashFunction(
+            @Nullable TableId tableId, Schema schema) {
+ return new MaxComputeHashFunction(schema, bucketSize);
+ }
+
+    static class MaxComputeHashFunction implements HashFunction<DataChangeEvent> {
+ private final int bucketSize;
+        private final List<RecordData.FieldGetter> primaryKeyGetters;
+
+ public MaxComputeHashFunction(Schema schema, int bucketSize) {
+ primaryKeyGetters = createFieldGetters(schema);
+ this.bucketSize = bucketSize;
+ }
+
+ @Override
+ public int hashcode(DataChangeEvent event) {
+            List<Integer> hashes = new ArrayList<>();
+ RecordData data =
+ event.op().equals(OperationType.DELETE) ? event.before() : event.after();
+ for (RecordData.FieldGetter primaryKeyGetter : primaryKeyGetters) {
+ Object object = primaryKeyGetter.getFieldOrNull(data);
+ int hash =
+ object == null
+ ? 0
+ : TypeHasher.hash(
+ TypeConvertUtils.inferMaxComputeType(object), object);
+ hashes.add(hash);
+ }
+ return TypeHasher.CombineHashVal(hashes) % bucketSize;
+ }
+
+        private List<RecordData.FieldGetter> createFieldGetters(Schema schema) {
+            List<RecordData.FieldGetter> fieldGetters =
+                    new ArrayList<>(schema.primaryKeys().size());
+ schema.primaryKeys().stream()
+ .mapToInt(
+ pk -> {
+ int index = schema.getColumnNames().indexOf(pk);
+ if (index == -1) {
+ throw new IllegalStateException(
+ String.format(
+ "Unable to find column \"%s\" which is defined as primary key",
+ pk));
+ }
+ return index;
+ })
+ .forEach(
+ primaryKeyPosition ->
+ fieldGetters.add(
+ TypeConvertUtils.createFieldGetter(
+ schema.getColumns()
+ .get(primaryKeyPosition)
+ .getType(),
+ primaryKeyPosition)));
+ return fieldGetters;
+ }
+ }
+}
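
The hash function above hashes every primary-key value, combines the per-column hashes, and maps the result onto the bucket count so that rows with the same key always reach the same writer subtask. A simplified standalone sketch of that idea, using a plain Java hash combiner instead of the ODPS `TypeHasher`:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/** Hypothetical sketch of primary-key bucketing; not the ODPS hashing algorithm itself. */
public class BucketHashSketch {

    /** Combines per-column hashes and maps them onto [0, bucketSize). */
    static int bucketOf(List<Object> primaryKeyValues, int bucketSize) {
        int combined = 0;
        for (Object value : primaryKeyValues) {
            int h = value == null ? 0 : Objects.hashCode(value); // stand-in for TypeHasher.hash
            combined = 31 * combined + h; // stand-in for CombineHashVal
        }
        return Math.floorMod(combined, bucketSize); // non-negative bucket index
    }

    public static void main(String[] args) {
        int bucketSize = 16;
        // Rows with identical primary keys map to the same bucket, and therefore the same writer.
        System.out.println(bucketOf(Arrays.asList(42L, "order-1"), bucketSize));
        System.out.println(bucketOf(Arrays.asList(42L, "order-1"), bucketSize));
        System.out.println(bucketOf(Arrays.asList(43L, "order-2"), bucketSize));
    }
}
```
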
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/MaxComputeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/MaxComputeUtils.java
new file mode 100644
index 00000000000..a6832dbe55f
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/MaxComputeUtils.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.utils;
+
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.connectors.maxcompute.common.Constant;
+import org.apache.flink.cdc.connectors.maxcompute.common.SessionIdentifier;
+import org.apache.flink.cdc.connectors.maxcompute.common.UncheckedOdpsException;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeWriteOptions;
+
+import com.aliyun.odps.Column;
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.PartitionSpec;
+import com.aliyun.odps.Table;
+import com.aliyun.odps.TableSchema;
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.account.StsAccount;
+import com.aliyun.odps.tunnel.Configuration;
+import com.aliyun.odps.tunnel.TableTunnel;
+import com.aliyun.odps.tunnel.io.CompressOption;
+import com.aliyun.odps.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/** Common utilities used by the MaxCompute connector. */
+public class MaxComputeUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MaxComputeUtils.class);
+
+ public static Odps getOdps(MaxComputeOptions maxComputeOptions) {
+ Account account;
+ if (StringUtils.isNullOrEmpty(maxComputeOptions.getStsToken())) {
+ account =
+ new AliyunAccount(
+ maxComputeOptions.getAccessId(), maxComputeOptions.getAccessKey());
+ } else {
+ account =
+ new StsAccount(
+ maxComputeOptions.getAccessId(),
+ maxComputeOptions.getAccessKey(),
+ maxComputeOptions.getStsToken());
+ }
+ Odps odps = new Odps(account);
+ odps.setEndpoint(maxComputeOptions.getEndpoint());
+ odps.setTunnelEndpoint(maxComputeOptions.getTunnelEndpoint());
+ odps.setDefaultProject(maxComputeOptions.getProject());
+ odps.getRestClient().setReadTimeout(60);
+ odps.getRestClient().setConnectTimeout(60);
+ odps.setUserAgent("Flink CDC");
+ return odps;
+ }
+
+ public static TableTunnel getTunnel(
+ MaxComputeOptions maxComputeOptions, MaxComputeWriteOptions writeOptions) {
+ Odps odps = getOdps(maxComputeOptions);
+ Configuration configuration =
+ Configuration.builder(odps)
+ .withRetryLogger(RetryUtils.getRetryLogger())
+ .withRetryPolicy(new RetryUtils.FlinkDefaultRetryPolicy())
+ .withCompressOptions(
+ MaxComputeUtils.compressOptionOf(
+ writeOptions.getCompressAlgorithm()))
+ .withQuotaName(maxComputeOptions.getQuotaName())
+ .build();
+ TableTunnel tunnel = new TableTunnel(odps, configuration);
+ if (!StringUtils.isNullOrEmpty(maxComputeOptions.getTunnelEndpoint())) {
+ tunnel.setEndpoint(maxComputeOptions.getTunnelEndpoint());
+ }
+ return tunnel;
+ }
+
+ public static Table getTable(MaxComputeOptions maxComputeOptions, TableId tableId) {
+ Odps odps = getOdps(maxComputeOptions);
+ if (maxComputeOptions.isSupportSchema()) {
+ return odps.tables()
+ .get(
+ maxComputeOptions.getProject(),
+ tableId.getNamespace(),
+ tableId.getTableName());
+ } else {
+ return odps.tables().get(tableId.getTableName());
+ }
+ }
+
+ public static TableSchema getTableSchema(MaxComputeOptions options, TableId tableId) {
+ Odps odps = getOdps(options);
+ if (options.isSupportSchema()) {
+ return odps.tables()
+ .get(options.getProject(), tableId.getNamespace(), tableId.getTableName())
+ .getSchema();
+ } else {
+ return odps.tables().get(options.getProject(), tableId.getTableName()).getSchema();
+ }
+ }
+
+ public static boolean supportSchema(MaxComputeOptions maxComputeOptions) {
+ Odps odps = getOdps(maxComputeOptions);
+ try {
+ boolean flag =
+ Boolean.parseBoolean(
+ odps.projects().get().getProperty(Constant.SCHEMA_ENABLE_FLAG));
+            LOG.info("project {} supports schema: {}", maxComputeOptions.getProject(), flag);
+ return flag;
+ } catch (OdpsException e) {
+ throw new UncheckedOdpsException(e);
+ }
+ }
+
+ public static CompressOption compressOptionOf(String compressAlgo) {
+ CompressOption.CompressAlgorithm compressAlgorithm;
+ switch (compressAlgo) {
+ case "raw":
+ compressAlgorithm = CompressOption.CompressAlgorithm.ODPS_RAW;
+ break;
+ case "zlib":
+ compressAlgorithm = CompressOption.CompressAlgorithm.ODPS_ZLIB;
+ break;
+ case "lz4":
+ compressAlgorithm = CompressOption.CompressAlgorithm.ODPS_LZ4_FRAME;
+ break;
+ case "snappy":
+ compressAlgorithm = CompressOption.CompressAlgorithm.ODPS_SNAPPY;
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "unknown compress algo: "
+ + compressAlgo
+ + " , only support raw, zlib, lz4, snappy");
+ }
+ return new CompressOption(compressAlgorithm, 1, 0);
+ }
+
+ public static boolean isTableExist(MaxComputeOptions maxComputeOptions, TableId tableId) {
+ Odps odps = getOdps(maxComputeOptions);
+ try {
+ if (maxComputeOptions.isSupportSchema()) {
+ return odps.tables()
+ .exists(
+ odps.getDefaultProject(),
+ tableId.getNamespace(),
+ tableId.getTableName());
+ } else {
+ return odps.tables().exists(tableId.getTableName());
+ }
+ } catch (OdpsException e) {
+ throw new UncheckedOdpsException(e);
+ }
+ }
+
+ public static boolean schemaEquals(TableSchema currentSchema, TableSchema expectSchema) {
+        List<Column> currentColumns = currentSchema.getAllColumns();
+        List<Column> expectColumns = expectSchema.getAllColumns();
+ if (currentColumns.size() != expectColumns.size()
+ || currentSchema.getColumns().size() != expectSchema.getColumns().size()) {
+ LOG.error(
+ "current column size not equals to expect column size: {}, {}",
+ currentColumns.size(),
+ expectColumns.size());
+ return false;
+ }
+ for (int i = 0; i < currentColumns.size(); i++) {
+ if (!currentColumns.get(i).getName().equalsIgnoreCase(expectColumns.get(i).getName())) {
+ LOG.error(
+ "current column {} name not equals to expect column name: {}",
+ currentColumns.get(i).getName(),
+ expectColumns.get(i).getName());
+ return false;
+ }
+ if (!currentColumns
+ .get(i)
+ .getTypeInfo()
+ .getTypeName()
+ .equals(expectColumns.get(i).getTypeInfo().getTypeName())) {
+ LOG.error(
+ "current column {} type not equals to expect column type: {}",
+ currentColumns.get(i).getTypeInfo().getTypeName(),
+ expectColumns.get(i).getTypeInfo().getTypeName());
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public static void createPartitionIfAbsent(
+ MaxComputeOptions options, String schema, String table, String partitionName)
+ throws OdpsException {
+ Odps odps = getOdps(options);
+ if (options.isSupportSchema()) {
+ if (StringUtils.isNullOrEmpty(schema)) {
+ LOG.info(
+ "create partition {} in {}.default.{}",
+ partitionName,
+ options.getProject(),
+ table);
+ odps.tables()
+ .get(options.getProject(), "default", table)
+ .createPartition(new PartitionSpec(partitionName), true);
+ } else {
+ LOG.info(
+ "create partition {} in {}.{}.{}",
+ partitionName,
+ options.getProject(),
+ schema,
+ table);
+ odps.tables()
+ .get(options.getProject(), schema, table)
+ .createPartition(new PartitionSpec(partitionName), true);
+ }
+ } else {
+ LOG.info("create partition {} in {}.{}", partitionName, options.getProject(), table);
+ odps.tables()
+ .get(options.getProject(), table)
+ .createPartition(new PartitionSpec(partitionName), true);
+ }
+ }
+
+ public static String getSchema(MaxComputeOptions options, TableId tableId) {
+ if (options.isSupportSchema()) {
+ if (tableId.getNamespace() == null) {
+ return "default";
+ } else {
+ return tableId.getNamespace();
+ }
+ } else {
+ return null;
+ }
+ }
+
+ public static boolean isTransactionalTable(
+ MaxComputeOptions options, SessionIdentifier sessionIdentifier) {
+ Odps odps = getOdps(options);
+ return odps.tables()
+ .get(
+ sessionIdentifier.getProject(),
+ sessionIdentifier.getSchema(),
+ sessionIdentifier.getTable())
+ .isTransactional();
+ }
+}
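
One rule from the utilities above worth highlighting: when the project has schemas enabled, a table id without a namespace falls back to the `default` schema, and when schemas are disabled no schema name is used at all. A tiny standalone sketch of that resolution logic (hypothetical `resolveSchema`, no ODPS SDK calls):

```java
/** Hypothetical sketch mirroring MaxComputeUtils.getSchema's fallback behavior. */
public class SchemaResolutionSketch {

    /** Returns null for non-schema projects; otherwise falls back to "default". */
    static String resolveSchema(boolean projectSupportsSchema, String namespace) {
        if (!projectSupportsSchema) {
            return null;
        }
        return namespace == null ? "default" : namespace;
    }

    public static void main(String[] args) {
        System.out.println(resolveSchema(true, null));    // default
        System.out.println(resolveSchema(true, "sales")); // sales
        System.out.println(resolveSchema(false, "sales")); // null (schemas disabled)
    }
}
```
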
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/RetryUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/RetryUtils.java
new file mode 100644
index 00000000000..1d7b7d3d151
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/RetryUtils.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.utils;
+
+import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.rest.RestClient;
+import com.aliyun.odps.tunnel.io.TunnelRetryHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Retry utilities that execute a callable with a configurable number of retries. Set MAX_RETRIES and RETRY_DELAY
+ * at the start of {@link org.apache.flink.cdc.connectors.maxcompute.sink.MaxComputeEventSink}.
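+ *
+ * <p>Illustrative usage (the table name is a placeholder): {@code Table t =
+ * RetryUtils.execute(() -> odps.tables().get("demo_table"));} retries the call up to three times,
+ * sleeping five seconds between attempts, before wrapping the last failure in an IOException.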
+ */
+public class RetryUtils {
+ public static final Logger LOG = LoggerFactory.getLogger(RetryUtils.class);
+ private static final int DEFAULT_MAX_RETRIES = 3;
+ private static final long DEFAULT_RETRY_DELAY = 5000;
+ private static final RetryLogger RETRY_LOGGER = new RetryLogger();
+
+ public static RetryLogger getRetryLogger() {
+ return RETRY_LOGGER;
+ }
+
+ /**
+ * Executes a callable with default retry strategy.
+ *
+ * @param callable the task to be executed
+ * @param <T> the type of the task's result
+ * @return the task result
+ * @throws IOException If the task fails after all retries
+ */
+ public static <T> T execute(Callable<T> callable) throws IOException {
+ return execute(callable, DEFAULT_MAX_RETRIES, DEFAULT_RETRY_DELAY);
+ }
+
+ /**
+ * Executes a callable with specific retry strategy.
+ *
+ * @param callable the task to be executed
+ * @param maxRetries the maximum number of retries
+ * @param retryDelay the delay between retries in milliseconds
+ * @param <T> the type of the task's result
+ * @return the task result
+ * @throws IOException If the task fails after all retries
+ */
+ public static <T> T execute(Callable<T> callable, int maxRetries, long retryDelay)
+ throws IOException {
+ int attempt = 0;
+ while (true) {
+ try {
+ return callable.call();
+ } catch (Exception e) {
+ attempt++;
+ if (attempt > maxRetries) {
+ if (e instanceof OdpsException) {
+ throw new IOException(
+ "Failed after retries. RequestId: "
+ + ((OdpsException) e).getRequestId(),
+ e);
+ }
+ throw new IOException("Failed after retries", e);
+ }
+ try {
+ RETRY_LOGGER.onRetryLog(e, attempt, retryDelay);
+ Thread.sleep(retryDelay);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Retry interrupted", ie);
+ }
+ }
+ }
+ }
+
+ /**
+ * Executes a callable with default retry strategy and unchecked exceptions.
+ *
+ * @param callable the task to be executed
+ * @param <T> the type of the task's result
+ * @return the task result
+ */
+ public static <T> T executeUnchecked(Callable<T> callable) {
+ return executeUnchecked(callable, DEFAULT_MAX_RETRIES, DEFAULT_RETRY_DELAY);
+ }
+
+ /**
+ * Executes a callable with specific retry strategy and unchecked exceptions.
+ *
+ * @param callable the task to be executed
+ * @param maxRetries the maximum number of retries
+ * @param retryDelay the delay between retries in milliseconds
+ * @param <T> the type of the task's result
+ * @return the task result
+ */
+ public static <T> T executeUnchecked(Callable<T> callable, int maxRetries, long retryDelay) {
+ try {
+ return execute(callable, maxRetries, retryDelay);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ static class RetryLogger extends RestClient.RetryLogger {
+ @Override
+ public void onRetryLog(Throwable e, long retryCount, long retrySleepTime) {
+ // Log the exception and retry details
+ LOG.warn(
+ "Retry attempt #{} failed. Exception: {}. Sleeping for {} ms before next attempt.",
+ retryCount,
+ e.getMessage(),
+ retrySleepTime,
+ e);
+ }
+ }
+
+ // retry 3 times and wait 5 seconds for each retry
+ static class FlinkDefaultRetryPolicy implements TunnelRetryHandler.RetryPolicy {
+ @Override
+ public boolean shouldRetry(Exception e, int attempt) {
+ return attempt <= DEFAULT_MAX_RETRIES;
+ }
+
+ @Override
+ public long getRetryWaitTime(int attempt) {
+ return TimeUnit.MILLISECONDS.toMillis(DEFAULT_RETRY_DELAY);
+ }
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java
new file mode 100644
index 00000000000..fc31d8f95a6
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.utils;
+
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.utils.StringUtils;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
+import org.apache.flink.util.CollectionUtil;
+
+import com.aliyun.odps.Instance;
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.Table;
+import com.aliyun.odps.TableSchema;
+import com.aliyun.odps.Tables;
+import com.aliyun.odps.task.SQLTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Schema evolution utils for MaxCompute. */
+public class SchemaEvolutionUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(SchemaEvolutionUtils.class);
+ private static final Map<String, String> unsupportSchemahints = new HashMap<>();
+ private static final Map<String, String> supportSchemaHints = new HashMap<>();
+
+ static {
+ unsupportSchemahints.put("odps.sql.type.system.odps2", "true");
+ unsupportSchemahints.put("odps.sql.decimal.odps2", "true");
+ unsupportSchemahints.put("odps.sql.allow.schema.evolution", "true");
+
+ supportSchemaHints.put("odps.sql.type.system.odps2", "true");
+ supportSchemaHints.put("odps.sql.decimal.odps2", "true");
+ supportSchemaHints.put("odps.namespace.schema", "true");
+ supportSchemaHints.put("odps.sql.allow.namespace.schema", "true");
+ supportSchemaHints.put("odps.sql.allow.schema.evolution", "true");
+ }
+
+ private SchemaEvolutionUtils() {}
+
+ /**
+ * Equivalent to running SQL like: {@code create table table_name (col_name1 type1 comment [,
+ * col_name2 type2 ...]);}.
+ */
+ public static void createTable(MaxComputeOptions options, TableId tableId, Schema schema)
+ throws OdpsException {
+ Odps odps = MaxComputeUtils.getOdps(options);
+ TableSchema tableSchema = TypeConvertUtils.toMaxCompute(schema);
+ if (options.isSupportSchema()
+ && !StringUtils.isNullOrWhitespaceOnly(tableId.getNamespace())) {
+ LOG.info("create schema {}", tableId.getNamespace());
+ odps.schemas()
+ .create(
+ odps.getDefaultProject(),
+ tableId.getNamespace(),
+ "generate by Flink CDC",
+ true);
+ }
+ Tables.TableCreator tableCreator =
+ odps.tables()
+ .newTableCreator(
+ odps.getDefaultProject(), tableId.getTableName(), tableSchema)
+ .withHints(unsupportSchemahints)
+ .ifNotExists()
+ .debug();
+ if (!CollectionUtil.isNullOrEmpty(schema.primaryKeys())) {
+ tableCreator
+ .transactionTable()
+ .withBucketNum(options.getBucketsNum())
+ .withPrimaryKeys(schema.primaryKeys());
+ }
+ if (options.isSupportSchema()) {
+ if (StringUtils.isNullOrWhitespaceOnly(tableId.getNamespace())) {
+ tableCreator.withSchemaName("default").withHints(supportSchemaHints);
+ } else {
+ tableCreator.withSchemaName(tableId.getNamespace()).withHints(supportSchemaHints);
+ }
+ }
+ LOG.info("create table {}, schema {}", getFullTableName(options, tableId), schema);
+ tableCreator.create();
+ }
+
+ /**
+ * Equivalent to running SQL like: {@code alter table table_name add columns (col_name1 type1
+ * comment [, col_name2 type2 ...]);}.
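+ *
+ * <p>For example, appending a column "age" of Flink type INT would generate SQL roughly like
+ * {@code alter table `proj`.`db`.`tbl` add columns (age INT comment 'INT');} ("proj", "db" and
+ * "tbl" are placeholder names).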
+ */
+ public static void addColumns(
+ MaxComputeOptions options,
+ TableId tableId,
+ List<AddColumnEvent.ColumnWithPosition> columns)
+ throws OdpsException {
+ Odps odps = MaxComputeUtils.getOdps(options);
+
+ StringBuilder sqlBuilder =
+ new StringBuilder(
+ "alter table " + getFullTableName(options, tableId) + " add columns (");
+
+ for (AddColumnEvent.ColumnWithPosition addColumn : columns) {
+ if (addColumn.getPosition() == AddColumnEvent.ColumnPosition.LAST) {
+ sqlBuilder
+ .append(addColumn.getAddColumn().getName())
+ .append(" ")
+ .append(string(addColumn.getAddColumn().getType()))
+ .append(" comment '")
+ .append(addColumn.getAddColumn().getType().asSummaryString())
+ .append("',");
+ } else {
+ throw new UnsupportedOperationException(
+ "Not support position: "
+ + addColumn.getPosition()
+ + " "
+ + addColumn.getExistedColumnName());
+ }
+ }
+ // remove ','
+ sqlBuilder.deleteCharAt(sqlBuilder.length() - 1);
+ sqlBuilder.append(");");
+
+ Instance instance =
+ SQLTask.run(
+ odps,
+ odps.getDefaultProject(),
+ sqlBuilder.toString(),
+ options.isSupportSchema() ? supportSchemaHints : unsupportSchemahints,
+ null);
+ LOG.info("execute add column task: `{}`, instanceId: {}", sqlBuilder, instance.getId());
+ instance.waitForSuccess();
+ }
+
+ /**
+ * Equivalent to running SQL like: {@code alter table table_name change column old_column_name
+ * new_column_name new_data_type;} and {@code alter table table_name change column col_name
+ * comment 'col_comment';}.
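+ *
+ * <p>For example, a mapping entry of column "price" to Flink DECIMAL(10, 2) would generate SQL
+ * roughly like {@code alter table `proj`.`tbl` change column price price DECIMAL(10,2);}
+ * (table names are placeholders).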
+ */
+ public static void alterColumnType(
+ MaxComputeOptions options, TableId tableId, Map<String, DataType> typeMapping)
+ throws OdpsException {
+ Odps odps = MaxComputeUtils.getOdps(options);
+
+ String prefix = "alter table " + getFullTableName(options, tableId) + " change column ";
+
+ for (Map.Entry<String, DataType> entry : typeMapping.entrySet()) {
+ String alterColumnSql =
+ prefix
+ + entry.getKey()
+ + " "
+ + entry.getKey()
+ + " "
+ + string(entry.getValue())
+ + ";";
+ Instance instance =
+ SQLTask.run(
+ odps,
+ odps.getDefaultProject(),
+ alterColumnSql,
+ options.isSupportSchema() ? supportSchemaHints : unsupportSchemahints,
+ null);
+ LOG.info(
+ "execute alter column task: `{}`, instanceId: {}",
+ alterColumnSql,
+ instance.getId());
+ instance.waitForSuccess();
+ }
+ }
+
+ /**
+ * Equivalent to running SQL like: {@code alter table table_name drop columns col_name1[, col_name2...];}.
+ */
+ public static void dropColumn(
+ MaxComputeOptions options, TableId tableId, List<String> droppedColumnNames)
+ throws OdpsException {
+ Odps odps = MaxComputeUtils.getOdps(options);
+ StringBuilder sqlBuilder =
+ new StringBuilder(
+ "alter table " + getFullTableName(options, tableId) + " drop columns ");
+ for (String column : droppedColumnNames) {
+ sqlBuilder.append(column).append(",");
+ }
+ // remove ','
+ sqlBuilder.deleteCharAt(sqlBuilder.length() - 1);
+ sqlBuilder.append(";");
+ Instance instance =
+ SQLTask.run(
+ odps,
+ odps.getDefaultProject(),
+ sqlBuilder.toString(),
+ options.isSupportSchema() ? supportSchemaHints : unsupportSchemahints,
+ null);
+ LOG.info("execute drop column task: `{}`, instanceId: {}", sqlBuilder, instance.getId());
+ instance.waitForSuccess();
+ }
+
+ /**
+ * Equivalent to running SQL like: {@code alter table table_name change column old_col_name rename
+ * to new_col_name;}.
+ */
+ public static void renameColumn(
+ MaxComputeOptions options, TableId tableId, Map<String, String> nameMapping)
+ throws OdpsException {
+ Odps odps = MaxComputeUtils.getOdps(options);
+ String prefix = "alter table " + getFullTableName(options, tableId) + " change column ";
+ for (Map.Entry<String, String> entry : nameMapping.entrySet()) {
+ String sql = prefix + entry.getKey() + " rename to " + entry.getValue() + ";";
+ Instance instance =
+ SQLTask.run(
+ odps,
+ odps.getDefaultProject(),
+ sql,
+ options.isSupportSchema() ? supportSchemaHints : unsupportSchemahints,
+ null);
+ LOG.info("execute rename column task: `{}`, instanceId: {}", sql, instance.getId());
+ instance.waitForSuccess();
+ }
+ }
+
+ public static void dropTable(MaxComputeOptions options, TableId tableId) throws OdpsException {
+ Odps odps = MaxComputeUtils.getOdps(options);
+ Table table = MaxComputeUtils.getTable(options, tableId);
+ odps.tables().delete(table.getProject(), table.getSchemaName(), table.getName(), false);
+ }
+
+ public static void truncateTable(MaxComputeOptions options, TableId tableId)
+ throws OdpsException {
+ Table table = MaxComputeUtils.getTable(options, tableId);
+ table.truncate();
+ }
+
+ private static String getFullTableName(MaxComputeOptions options, TableId tableId) {
+ if (options.isSupportSchema()) {
+ if (StringUtils.isNullOrWhitespaceOnly(tableId.getNamespace())) {
+ return "`" + options.getProject() + "`.`default`.`" + tableId.getTableName() + "`";
+ } else {
+ return "`"
+ + options.getProject()
+ + "`.`"
+ + tableId.getNamespace()
+ + "`.`"
+ + tableId.getTableName()
+ + "`";
+ }
+ } else {
+ return "`" + options.getProject() + "`.`" + tableId.getTableName() + "`";
+ }
+ }
+
+ private static String string(DataType dataType) {
+ return TypeConvertUtils.toMaxCompute(dataType).getTypeName();
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SessionCommitCoordinateHelper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SessionCommitCoordinateHelper.java
new file mode 100644
index 00000000000..e6b1cb42b34
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SessionCommitCoordinateHelper.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.utils;
+
+import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.connectors.maxcompute.common.Constant;
+import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.CommitSessionResponse;
+import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The SessionCommitCoordinateHelper class is responsible for coordinating and controlling the order of
+ * session submissions by multiple concurrent executors in a distributed processing environment. It
+ * ensures that: 1. Each executor must submit sessions in ascending order by session ID. 2. Each
+ * executor must submit a Constant.END_OF_SESSION as a terminator after completing its session
+ * submissions.
+ *
+ *
+ * <p>Working Principle: - Maintains an array of queues (toCommitSessionIds), with each queue
+ * corresponding to an executor, to isolate the session submissions of different executors. This
+ * maintains the independence and submission order of each executor. - Executors submit session IDs
+ * sequentially by invoking the commit() method. The commit operation simply enqueues the session ID
+ * into the corresponding executor's queue. - The getToCommitSessionId() method is tasked with
+ * selecting the smallest session ID across all executors that has been "submitted" or is "no longer
+ * required" for submission, allowing for further processing. "Submitted" means that the session ID
+ * has been submitted by all executors; "no longer required" assumes that any subsequent session IDs
+ * that are yet to be submitted will always be greater than the currently chosen ID. - Once a
+ * session ID is selected by the getToCommitSessionId() method, it's removed from all executors'
+ * queues, indicating that the session ID has been processed. This process ensures ordered
+ * processing of the sessions and allows the system to efficiently progress. - Each processing step
+ * of the session IDs is based on a key assumption: that any subsequent session ID submissions will
+ * always be greater than the current processed session ID. This is guaranteed by the fact that each
+ * executor commits to submitting session IDs in order and submits a special terminator
+ * (Constant.END_OF_SESSION) at the end.
+ *
+ *
+ * <p>Note: - The class presupposes that all session IDs are comparable, and each executor strictly
+ * adheres to the submission order of session IDs in ascending order. Any behavior that deviates
+ * from this principle may lead to unpredictable outcomes, as it contravenes the fundamental
+ * assumption of the class's design. - The introduction of Constant.END_OF_SESSION as a terminator
+ * is a key aspect of this coordination strategy, as it provides a clear signal for recognizing the
+ * completion status of an executor, allowing the system to determine whether all relevant sessions
+ * have been processed.
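+ *
+ * <p>Illustrative example with two executors: executor 0 commits "s1", "s2", END_OF_SESSION and
+ * executor 1 commits "s1", END_OF_SESSION. Successive calls to getToCommitSessionId() then return
+ * "s1" (the smallest head of both queues), "s2" (END_OF_SESSION compares greater than any other
+ * id), and finally null while switching isCommitting() to false once every queue is headed by
+ * END_OF_SESSION.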
+ */
+public class SessionCommitCoordinateHelper {
+ private static final Logger LOG = LoggerFactory.getLogger(SessionCommitCoordinateHelper.class);
+ private final Queue<String>[] toCommitSessionIds;
+ private final Map<String, CompletableFuture<CoordinationResponse>> toCommitFutures;
+ /**
+ * If any string is {@link Constant#END_OF_SESSION}, it should be considered larger than any
+ * other non-{@link Constant#END_OF_SESSION} string.
+ */
+ private final Comparator<String> comparator =
+ (String a, String b) -> {
+ if (a.equals(Constant.END_OF_SESSION) || b.equals(Constant.END_OF_SESSION)) {
+ if (a.equals(b)) {
+ return 0;
+ } else if (a.equals(Constant.END_OF_SESSION)) {
+ return 1;
+ } else {
+ return -1;
+ }
+ }
+ return a.compareTo(b);
+ };
+
+ private boolean isCommitting;
+
+ public SessionCommitCoordinateHelper(int parallelism) {
+ Preconditions.checkArgument(parallelism > 0);
+ isCommitting = true;
+ toCommitFutures = new HashMap<>();
+ toCommitSessionIds = new ArrayDeque[parallelism];
+
+ for (int i = 0; i < parallelism; i++) {
+ toCommitSessionIds[i] = new ArrayDeque<>();
+ }
+ }
+
+ public void clear() {
+ for (Queue<String> toCommitSessionId : toCommitSessionIds) {
+ toCommitSessionId.clear();
+ }
+ toCommitFutures.clear();
+ isCommitting = true;
+ }
+
+ public CompletableFuture<CoordinationResponse> commit(int subtaskId, String sessionId) {
+ LOG.info("subtask {} commit sessionId: {}", subtaskId, sessionId);
+ toCommitSessionIds[subtaskId].offer(sessionId);
+ if (toCommitFutures.containsKey(sessionId)) {
+ return toCommitFutures.get(sessionId);
+ }
+ CompletableFuture<CoordinationResponse> future = new CompletableFuture<>();
+ toCommitFutures.putIfAbsent(sessionId, future);
+ return future;
+ }
+
+ public String getToCommitSessionId() {
+ String peekSession = null;
+ for (Queue<String> commitSessionId : toCommitSessionIds) {
+ if (commitSessionId.isEmpty()) {
+ return null;
+ }
+ if (peekSession == null) {
+ peekSession = commitSessionId.peek();
+ } else {
+ if (comparator.compare(commitSessionId.peek(), peekSession) < 0) {
+ peekSession = commitSessionId.peek();
+ }
+ }
+ }
+ // peekSession cannot be null here.
+ if (peekSession.equals(Constant.END_OF_SESSION)) {
+ isCommitting = false;
+ return null;
+ }
+ for (Queue<String> toCommitSessionId : toCommitSessionIds) {
+ if (toCommitSessionId.peek().equals(peekSession)) {
+ toCommitSessionId.poll();
+ }
+ }
+ return peekSession;
+ }
+
+ public void commitSuccess(String sessionId, boolean success) {
+ toCommitFutures
+ .get(sessionId)
+ .complete(CoordinationResponseUtils.wrap(new CommitSessionResponse(success)));
+ }
+
+ public boolean isCommitting() {
+ return isCommitting;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/TypeConvertUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/TypeConvertUtils.java
new file mode 100644
index 00000000000..e87ad27e7ca
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/TypeConvertUtils.java
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.utils;
+
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.MapData;
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.ArrayType;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.MapType;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+
+import com.aliyun.odps.Column;
+import com.aliyun.odps.OdpsType;
+import com.aliyun.odps.TableSchema;
+import com.aliyun.odps.data.ArrayRecord;
+import com.aliyun.odps.data.Binary;
+import com.aliyun.odps.data.SimpleStruct;
+import com.aliyun.odps.data.Struct;
+import com.aliyun.odps.table.utils.Preconditions;
+import com.aliyun.odps.type.StructTypeInfo;
+import com.aliyun.odps.type.TypeInfo;
+import com.aliyun.odps.type.TypeInfoFactory;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.cdc.common.types.DataTypeChecks.getFieldCount;
+import static org.apache.flink.cdc.common.types.DataTypeChecks.getPrecision;
+import static org.apache.flink.cdc.common.types.DataTypeChecks.getScale;
+
+/**
+ * Data type mapping from Flink CDC types to MaxCompute types and the corresponding Java type
+ * representation; see {@link #toMaxCompute(DataType)} for the concrete mapping.
+ */
+public class TypeConvertUtils {
+
+ /** This method drops the primary key definition of flinkSchema; primary key columns are only marked as not null. */
+ public static TableSchema toMaxCompute(Schema flinkSchema) {
+ Preconditions.checkNotNull(flinkSchema, "flink Schema");
+ TableSchema tableSchema = new TableSchema();
+ Set<String> partitionKeys = new HashSet<>(flinkSchema.partitionKeys());
+ List<org.apache.flink.cdc.common.schema.Column> columns = flinkSchema.getColumns();
+ Set<String> pkSet = new HashSet<>(flinkSchema.primaryKeys());
+
+ for (int i = 0; i < flinkSchema.getColumnCount(); i++) {
+ org.apache.flink.cdc.common.schema.Column flinkColumn = columns.get(i);
+ Column odpsColumn = toMaxCompute(flinkColumn, pkSet.contains(flinkColumn.getName()));
+ if (partitionKeys.contains(flinkColumn.getName())) {
+ tableSchema.addPartitionColumn(odpsColumn);
+ } else {
+ tableSchema.addColumn(odpsColumn);
+ }
+ }
+ return tableSchema;
+ }
+
+ public static Column toMaxCompute(
+ org.apache.flink.cdc.common.schema.Column flinkColumn, boolean notNull) {
+ Preconditions.checkNotNull(flinkColumn, "flink Schema Column");
+ DataType type = flinkColumn.getType();
+ Column.ColumnBuilder columnBuilder =
+ Column.newBuilder(flinkColumn.getName(), toMaxCompute(type));
+ if (!type.isNullable() || notNull) {
+ columnBuilder.notNull();
+ }
+ return columnBuilder.build();
+ }
+
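+ /**
+ * Maps a Flink CDC {@link DataType} to the corresponding MaxCompute {@link TypeInfo}, for
+ * example CHAR/VARCHAR/TIME to STRING, TIMESTAMP without time zone to TIMESTAMP_NTZ and
+ * TIMESTAMP with (local) time zone to TIMESTAMP; unsupported type roots raise an
+ * {@link UnsupportedOperationException}.
+ */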
+ public static TypeInfo toMaxCompute(DataType type) {
+ switch (type.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ case TIME_WITHOUT_TIME_ZONE:
+ return TypeInfoFactory.STRING;
+ case BOOLEAN:
+ return TypeInfoFactory.BOOLEAN;
+ case BINARY:
+ case VARBINARY:
+ return TypeInfoFactory.BINARY;
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) type;
+ return TypeInfoFactory.getDecimalTypeInfo(
+ decimalType.getPrecision(), decimalType.getScale());
+ case TINYINT:
+ return TypeInfoFactory.TINYINT;
+ case SMALLINT:
+ return TypeInfoFactory.SMALLINT;
+ case INTEGER:
+ return TypeInfoFactory.INT;
+ case BIGINT:
+ return TypeInfoFactory.BIGINT;
+ case FLOAT:
+ return TypeInfoFactory.FLOAT;
+ case DOUBLE:
+ return TypeInfoFactory.DOUBLE;
+ case DATE:
+ return TypeInfoFactory.DATE;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return TypeInfoFactory.TIMESTAMP_NTZ;
+ case TIMESTAMP_WITH_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return TypeInfoFactory.TIMESTAMP;
+ case ARRAY:
+ ArrayType arrayType = (ArrayType) type;
+ return TypeInfoFactory.getArrayTypeInfo(toMaxCompute(arrayType.getElementType()));
+ case MAP:
+ MapType mapType = (MapType) type;
+ return TypeInfoFactory.getMapTypeInfo(
+ toMaxCompute(mapType.getKeyType()), toMaxCompute(mapType.getValueType()));
+ case ROW:
+ RowType rowType = (RowType) type;
+ return TypeInfoFactory.getStructTypeInfo(
+ rowType.getFieldNames(),
+ rowType.getFieldTypes().stream()
+ .map(TypeConvertUtils::toMaxCompute)
+ .collect(Collectors.toList()));
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + type);
+ }
+ }
+
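+ /**
+ * Copies field values from a Flink {@link RecordData} into a MaxCompute {@link ArrayRecord}.
+ * The trailing partition-key fields of the Flink schema are not written, and the remaining
+ * field count must match the number of columns of the target record.
+ */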
+ public static void toMaxComputeRecord(Schema flinkSchema, RecordData from, ArrayRecord to) {
+ Preconditions.checkNotNull(from, "flink record data");
+ Preconditions.checkNotNull(to, "maxcompute arrayRecord");
+ int partitionKeyCount = flinkSchema.partitionKeys().size();
+
+ List<RecordData.FieldGetter> fieldGetters = createFieldGetters(flinkSchema);
+
+ if (to.getColumnCount() != (fieldGetters.size() - partitionKeyCount)) {
+ throw new IllegalArgumentException(
+ "record data count not match, odps {"
+ + Arrays.stream(to.getColumns())
+ .map(c -> c.getName() + " " + c.getTypeInfo().getTypeName())
+ .collect(Collectors.joining(", "))
+ + "} count "
+ + to.getColumnCount()
+ + "vs flink {"
+ + flinkSchema
+ + "} count "
+ + (fieldGetters.size() - partitionKeyCount));
+ }
+ for (int i = 0; i < (fieldGetters.size() - partitionKeyCount); i++) {
+ Object value = fieldGetters.get(i).getFieldOrNull(from);
+ to.set(i, value);
+ }
+ }
+
+ /**
+ * Creates a list of {@link RecordData.FieldGetter} from the given {@link Schema} to get objects
+ * from a RecordData.
+ *
+ * <p>This method is a modified version of {@link SchemaUtils#createFieldGetters(Schema)}, which
+ * returns MaxCompute Java types.
+ */
+ public static List<RecordData.FieldGetter> createFieldGetters(Schema schema) {
+ List<RecordData.FieldGetter> fieldGetters = new ArrayList<>(schema.getColumns().size());
+ for (int i = 0; i < schema.getColumns().size(); i++) {
+ fieldGetters.add(createFieldGetter(schema.getColumns().get(i).getType(), i));
+ }
+ return fieldGetters;
+ }
+
+ /**
+ * Creates an accessor for getting elements in an internal RecordData structure at the given
+ * position.
+ *
+ * <p>This method is a modified version of {@link RecordData#createFieldGetter(DataType, int)},
+ * which returns MaxCompute Java types.
+ *
+ * @param fieldType the element type of the RecordData
+ * @param fieldPos the element position of the RecordData
+ */
+ public static RecordData.FieldGetter createFieldGetter(DataType fieldType, int fieldPos) {
+ final RecordData.FieldGetter fieldGetter;
+ // ordered by type root definition
+ switch (fieldType.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ fieldGetter = record -> record.getString(fieldPos).toString();
+ break;
+ case BOOLEAN:
+ fieldGetter = record -> record.getBoolean(fieldPos);
+ break;
+ case BINARY:
+ case VARBINARY:
+ fieldGetter = record -> new Binary(record.getBinary(fieldPos));
+ break;
+ case DECIMAL:
+ final int decimalPrecision = getPrecision(fieldType);
+ final int decimalScale = getScale(fieldType);
+ fieldGetter =
+ record ->
+ record.getDecimal(fieldPos, decimalPrecision, decimalScale)
+ .toBigDecimal();
+ break;
+ case TINYINT:
+ fieldGetter = record -> record.getByte(fieldPos);
+ break;
+ case SMALLINT:
+ fieldGetter = record -> record.getShort(fieldPos);
+ break;
+ case INTEGER:
+ fieldGetter = record -> record.getInt(fieldPos);
+ break;
+ case DATE:
+ fieldGetter = record -> LocalDate.ofEpochDay(record.getInt(fieldPos));
+ break;
+ case TIME_WITHOUT_TIME_ZONE:
+ fieldGetter =
+ record -> LocalTime.ofNanoOfDay(record.getInt(fieldPos) * 1000L).toString();
+ break;
+ case BIGINT:
+ fieldGetter = record -> record.getLong(fieldPos);
+ break;
+ case FLOAT:
+ fieldGetter = record -> record.getFloat(fieldPos);
+ break;
+ case DOUBLE:
+ fieldGetter = record -> record.getDouble(fieldPos);
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ fieldGetter =
+ record ->
+ record.getTimestamp(fieldPos, getPrecision(fieldType))
+ .toLocalDateTime();
+ break;
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ fieldGetter =
+ record ->
+ record.getLocalZonedTimestampData(fieldPos, getPrecision(fieldType))
+ .toInstant();
+ break;
+ case TIMESTAMP_WITH_TIME_ZONE:
+ fieldGetter =
+ record ->
+ record.getZonedTimestamp(fieldPos, getPrecision(fieldType))
+ .toInstant();
+ break;
+ case ARRAY:
+ fieldGetter =
+ record -> {
+ ArrayData array = record.getArray(fieldPos);
+ DataType elementType = ((ArrayType) fieldType).getElementType();
+ ArrayData.ElementGetter elementGetter =
+ createElementGetter(elementType);
+ List