From 07531bfb51a2f89bc5dd68520a8d72ca5f7edea6 Mon Sep 17 00:00:00 2001
From: "north.lin" <37775475+qg-lin@users.noreply.github.com>
Date: Fri, 17 Jan 2025 11:21:01 +0800
Subject: [PATCH 1/2] [FLINK-35152][pipeline-connector/doris] Support create
doris auto partition table
---
.../connectors/pipeline-connectors/doris.md | 19 ++
.../connectors/pipeline-connectors/doris.md | 19 ++
.../pom.xml | 2 +-
.../doris/factory/DorisDataSinkFactory.java | 6 +-
.../connectors/doris/sink/DorisDataSink.java | 4 +-
.../doris/sink/DorisDataSinkOptions.java | 23 ++
.../doris/sink/DorisEventSerializer.java | 39 +++-
.../doris/sink/DorisMetadataApplier.java | 6 +
.../doris/utils/DorisSchemaUtils.java | 107 +++++++++
.../doris/sink/DorisEventSerializerTest.java | 187 +++++++++++++++
.../doris/sink/DorisSchemaUtilsTest.java | 213 ++++++++++++++++++
11 files changed, 619 insertions(+), 6 deletions(-)
create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/utils/DorisSchemaUtils.java
create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializerTest.java
create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaUtilsTest.java
diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/doris.md b/docs/content.zh/docs/connectors/pipeline-connectors/doris.md
index 307abc13817..2318463e016 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/doris.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/doris.md
@@ -182,6 +182,25 @@ pipeline:
查看更多关于 Doris Table 的属性
+
+ table.create.auto-partition.properties.* |
+ optional |
+ (none) |
+ String |
+ 创建自动分区表的配置。
+ 当前仅支持DATE/DATETIME类型列的AUTO RANGE PARTITION,分区函数为date_trunc ,且Doris版本必须大于2.1.6,查看更多关于 Doris自动分区
+ 支持的属性有:
+ table.create.auto-partition.properties.include 包含的经过route后的表集合,用逗号分隔,支持正则表达式;
+ table.create.auto-partition.properties.exclude 排除的经过route后的表集合,用逗号分隔,支持正则表达式;
+ table.create.auto-partition.properties.default_partition_key 默认分区键;
+ table.create.auto-partition.properties.default_partition_unit 默认分区单位;
+ table.create.auto-partition.properties.DB.TABLE.partition_key 特定表的分区键,如未配置取默认分区键;
+ table.create.auto-partition.properties.DB.TABLE.partition_unit 特定表的分区单位,如未配置取默认分区单位。
+ 注意:
+ 1: 如果分区键不为DATE/DATETIME类型,则不会创建分区表。
+ 2: Doris AUTO RANGE PARTITION不支持NULLABLE列作为分区列,如果您配置的分区键的值为空或者表创建完成后新增了NULLABLE分区列,系统将自动填充默认值(DATE类型为1970-01-01 ,DATETIME类型为1970-01-01 00:00:00 ),请选择合适的分区键。
+ |
+
diff --git a/docs/content/docs/connectors/pipeline-connectors/doris.md b/docs/content/docs/connectors/pipeline-connectors/doris.md
index cee412c16e8..8b93eec2d8f 100644
--- a/docs/content/docs/connectors/pipeline-connectors/doris.md
+++ b/docs/content/docs/connectors/pipeline-connectors/doris.md
@@ -182,6 +182,25 @@ pipeline:
See more about Doris Table Properties
+
+ table.create.auto-partition.properties.* |
+ optional |
+ (none) |
+ String |
+ Create the auto partition Properties configuration of the table.
+ Currently the partition function only supports date_trunc, and the partition column supports only DATE or DATETIME types, and the version of Doris must be greater than 2.1.6. See more about Doris Auto Partitioning
+ These properties are supported now:
+ table.create.auto-partition.properties.include A collection of tables after route to include, separated by commas, supports regular expressions;
+ table.create.auto-partition.properties.exclude A collection of tables after route to exclude, separated by commas, supports regular expressions;
+ table.create.auto-partition.properties.default_partition_key The default partition key;
+ table.create.auto-partition.properties.default_partition_unit The default partition unit;
+ table.create.auto-partition.properties.DB.TABLE.partition_key The partition key of a specific table. If not set, the default partition key is used;
+ table.create.auto-partition.properties.DB.TABLE.partition_unit The partition unit of a specific table. If not set, the default partition unit is used.
+ Note:
+ 1: If the partition key is not DATE/DATETIME type, auto partition tables won't be created.
+ 2: Doris AUTO RANGE PARTITION does not support NULLABLE columns as partition key, if Flink CDC gets a NULL value or a NULLABLE partition key was added after the table was created, it will automatically be filled with a default value(DATE:1970-01-01 , DATETIME:1970-01-01 00:00:00 ), so choosing a suitable partition key is very important.
+ |
+
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
index 3257debfe15..bb0256f0957 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/pom.xml
@@ -27,7 +27,7 @@ limitations under the License.
flink-cdc-pipeline-connector-doris
- 24.0.1
+ 24.1.0
8.0.26
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/factory/DorisDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/factory/DorisDataSinkFactory.java
index 4891309d414..31d0836fe15 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/factory/DorisDataSinkFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/factory/DorisDataSinkFactory.java
@@ -58,6 +58,7 @@
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_MAX_RETRIES;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_USE_CACHE;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.STREAM_LOAD_PROP_PREFIX;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PROPERTIES_PREFIX;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.USERNAME;
@@ -67,7 +68,10 @@ public class DorisDataSinkFactory implements DataSinkFactory {
@Override
public DataSink createDataSink(Context context) {
FactoryHelper.createFactoryHelper(this, context)
- .validateExcept(TABLE_CREATE_PROPERTIES_PREFIX, STREAM_LOAD_PROP_PREFIX);
+ .validateExcept(
+ TABLE_CREATE_PROPERTIES_PREFIX,
+ STREAM_LOAD_PROP_PREFIX,
+ TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX);
Configuration config = context.getFactoryConfiguration();
DorisOptions.Builder optionsBuilder = DorisOptions.builder();
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisDataSink.java
index 130d2170c64..ed2e9cb3e23 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisDataSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisDataSink.java
@@ -62,14 +62,14 @@ public EventSinkProvider getEventSinkProvider() {
dorisOptions,
readOptions,
executionOptions,
- new DorisEventSerializer(zoneId)));
+ new DorisEventSerializer(zoneId, configuration)));
} else {
return FlinkSinkProvider.of(
new DorisBatchSink<>(
dorisOptions,
readOptions,
executionOptions,
- new DorisEventSerializer(zoneId)));
+ new DorisEventSerializer(zoneId, configuration)));
}
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisDataSinkOptions.java
index 5d58bbc8288..2e5d62de244 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisDataSinkOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisDataSinkOptions.java
@@ -158,6 +158,29 @@ public class DorisDataSinkOptions {
public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
// Prefix for Doris Create table.
public static final String TABLE_CREATE_PROPERTIES_PREFIX = "table.create.properties.";
+ // Prefix for Doris Create auto partition table.
+ public static final String TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX =
+ "table.create.auto-partition.properties.";
+ public static final String TABLE_CREATE_PARTITION_KEY = "partition-key";
+ public static final String TABLE_CREATE_PARTITION_UNIT = "partition-unit";
+
+ public static final String TABLE_CREATE_DEFAULT_PARTITION_KEY =
+ "default-" + TABLE_CREATE_PARTITION_KEY;
+ public static final String TABLE_CREATE_DEFAULT_PARTITION_UNIT =
+ "default-" + TABLE_CREATE_PARTITION_UNIT;
+
+ public static final String TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_KEY =
+ TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX + TABLE_CREATE_DEFAULT_PARTITION_KEY;
+ public static final String TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_UNIT =
+ TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX + TABLE_CREATE_DEFAULT_PARTITION_UNIT;
+
+ public static final String TABLE_CREATE_PARTITION_INCLUDE = "include";
+ public static final String TABLE_CREATE_PARTITION_EXCLUDE = "exclude";
+
+ public static final String TABLE_CREATE_AUTO_PARTITION_PROPERTIES_INCLUDE =
+ TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX + TABLE_CREATE_PARTITION_INCLUDE;
+ public static final String TABLE_CREATE_AUTO_PARTITION_PROPERTIES_EXCLUDE =
+ TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX + TABLE_CREATE_PARTITION_EXCLUDE;
public static Map getPropertiesByPrefix(
Configuration tableOptions, String prefix) {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializer.java
index a1cb9182d1a..983a2c91690 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializer.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializer.java
@@ -17,6 +17,8 @@
package org.apache.flink.cdc.connectors.doris.sink;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
@@ -26,8 +28,14 @@
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DateType;
+import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
+import org.apache.flink.cdc.common.types.TimestampType;
+import org.apache.flink.cdc.common.types.ZonedTimestampType;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.connectors.doris.utils.DorisSchemaUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -42,6 +50,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import static org.apache.doris.flink.sink.util.DeleteOperation.addDeleteSign;
@@ -61,8 +70,11 @@ public class DorisEventSerializer implements DorisRecordSerializer {
/** ZoneId from pipeline config to support timestamp with local time zone. */
public final ZoneId pipelineZoneId;
- public DorisEventSerializer(ZoneId zoneId) {
+ public final Configuration dorisConfig;
+
+ public DorisEventSerializer(ZoneId zoneId, Configuration config) {
pipelineZoneId = zoneId;
+ dorisConfig = config;
}
@Override
@@ -108,6 +120,30 @@ private DorisRecord applyDataChangeEvent(DataChangeEvent event) throws JsonProce
throw new UnsupportedOperationException("Unsupport Operation " + op);
}
+ // get partition info from config
+ Tuple2 partitionInfo =
+ DorisSchemaUtils.getPartitionInfo(dorisConfig, schema, tableId);
+ if (!Objects.isNull(partitionInfo)) {
+ String partitionKey = partitionInfo.f0;
+ Object partitionValue = valueMap.get(partitionKey);
+ // fill partition column by default value if null
+ if (Objects.isNull(partitionValue)) {
+ schema.getColumn(partitionKey)
+ .ifPresent(
+ column -> {
+ DataType dataType = column.getType();
+ if (dataType instanceof DateType) {
+ valueMap.put(partitionKey, DorisSchemaUtils.DEFAULT_DATE);
+ } else if (dataType instanceof LocalZonedTimestampType
+ || dataType instanceof TimestampType
+ || dataType instanceof ZonedTimestampType) {
+ valueMap.put(
+ partitionKey, DorisSchemaUtils.DEFAULT_DATETIME);
+ }
+ });
+ }
+ }
+
return DorisRecord.of(
tableId.getSchemaName(),
tableId.getTableName(),
@@ -121,7 +157,6 @@ public Map serializerRecord(RecordData recordData, Schema schema
Preconditions.checkState(
columns.size() == recordData.getArity(),
"Column size does not match the data size");
-
for (int i = 0; i < recordData.getArity(); i++) {
DorisRowConverter.SerializationConverter converter =
DorisRowConverter.createNullableExternalConverter(
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
index 1d6476152b4..a2014c1571f 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
@@ -17,6 +17,7 @@
package org.apache.flink.cdc.connectors.doris.sink;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
@@ -39,6 +40,7 @@
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.ZonedTimestampType;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
+import org.apache.flink.cdc.connectors.doris.utils.DorisSchemaUtils;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
@@ -162,6 +164,10 @@ private void applyCreateTableEvent(CreateTableEvent event) throws SchemaEvolveEx
DorisDataSinkOptions.getPropertiesByPrefix(
config, TABLE_CREATE_PROPERTIES_PREFIX);
tableSchema.setProperties(tableProperties);
+
+ Tuple2 partitionInfo =
+ DorisSchemaUtils.getPartitionInfo(config, schema, tableId);
+ tableSchema.setPartitionInfo(partitionInfo);
schemaChangeManager.createTable(tableSchema);
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/utils/DorisSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/utils/DorisSchemaUtils.java
new file mode 100644
index 00000000000..7d780ef23cb
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/utils/DorisSchemaUtils.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.doris.utils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.schema.Selectors;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DateType;
+import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
+import org.apache.flink.cdc.common.types.TimestampType;
+import org.apache.flink.cdc.common.types.ZonedTimestampType;
+import org.apache.flink.cdc.common.utils.StringUtils;
+import org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions;
+
+import java.util.Map;
+
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_DEFAULT_PARTITION_KEY;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_DEFAULT_PARTITION_UNIT;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PARTITION_EXCLUDE;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PARTITION_INCLUDE;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PARTITION_KEY;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PARTITION_UNIT;
+
+/** Utilities for doris schema. */
+public class DorisSchemaUtils {
+
+ public static final String DEFAULT_DATE = "1970-01-01";
+ public static final String DEFAULT_DATETIME = "1970-01-01 00:00:00";
+
+ /**
+ * Get partition info by config. Currently only supports DATE/DATETIME AUTO RANGE
+ * PARTITION (date_trunc) and the Doris version should be greater than 2.1.6.
+ *
+ * @param config sink configuration carrying the auto-partition properties
+ * @param schema table schema used to validate the partition column's type
+ * @param tableId table id (after route) matched against include/exclude selectors
+ * @return Tuple2 of partition key and unit, or null if auto partition does not apply
+ */
+ public static Tuple2 getPartitionInfo(
+ Configuration config, Schema schema, TableId tableId) {
+ Map autoPartitionProperties =
+ DorisDataSinkOptions.getPropertiesByPrefix(
+ config, TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX);
+ String excludes = autoPartitionProperties.get(TABLE_CREATE_PARTITION_EXCLUDE);
+ if (!StringUtils.isNullOrWhitespaceOnly(excludes)) {
+ Selectors selectExclude =
+ new Selectors.SelectorsBuilder().includeTables(excludes).build();
+ if (selectExclude.isMatch(tableId)) {
+ return null;
+ }
+ }
+
+ String includes = autoPartitionProperties.get(TABLE_CREATE_PARTITION_INCLUDE);
+ if (!StringUtils.isNullOrWhitespaceOnly(includes)) {
+ Selectors selectInclude =
+ new Selectors.SelectorsBuilder().includeTables(includes).build();
+ if (!selectInclude.isMatch(tableId)) {
+ return null;
+ }
+ }
+
+ String partitionKey =
+ autoPartitionProperties.get(
+ tableId.identifier() + "." + TABLE_CREATE_PARTITION_KEY);
+ String partitionUnit =
+ autoPartitionProperties.get(
+ tableId.identifier() + "." + TABLE_CREATE_PARTITION_UNIT);
+ if (StringUtils.isNullOrWhitespaceOnly(partitionKey)) {
+ partitionKey = autoPartitionProperties.get(TABLE_CREATE_DEFAULT_PARTITION_KEY);
+ }
+ if (StringUtils.isNullOrWhitespaceOnly(partitionUnit)) {
+ partitionUnit = autoPartitionProperties.get(TABLE_CREATE_DEFAULT_PARTITION_UNIT);
+ }
+ // check key emptiness FIRST so a missing/blank key never reaches the schema lookup
+ if (!StringUtils.isNullOrWhitespaceOnly(partitionKey)
+ && schema.getColumn(partitionKey).isPresent()) {
+
+ DataType dataType = schema.getColumn(partitionKey).get().getType();
+ if (dataType instanceof LocalZonedTimestampType
+ || dataType instanceof TimestampType
+ || dataType instanceof ZonedTimestampType
+ || dataType instanceof DateType) {
+ return new Tuple2<>(partitionKey, partitionUnit);
+ }
+ }
+ return null;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializerTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializerTest.java
new file mode 100644
index 00000000000..8cbd3554cd5
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisEventSerializerTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.doris.sink;
+
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.connectors.doris.utils.DorisSchemaUtils;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_KEY;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_UNIT;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_AUTO_PARTITION_PROPERTIES_INCLUDE;
+
+/** A test for {@link org.apache.flink.cdc.connectors.doris.sink.DorisEventSerializer} . */
+public class DorisEventSerializerTest {
+
+ private ObjectMapper objectMapper = new ObjectMapper();
+ private static DorisEventSerializer dorisEventSerializer;
+
+ private static final TableId TABLE_ID = TableId.parse("doris_database.doris_table");
+ private static final Schema SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.STRING())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("create_date", DataTypes.DATE())
+ .physicalColumn("create_time", DataTypes.TIMESTAMP())
+ .primaryKey("id")
+ .build();
+ private static final BinaryRecordDataGenerator RECORD_DATA_GENERATOR =
+ new BinaryRecordDataGenerator(((RowType) SCHEMA.toRowDataType()));
+
+ @Test
+ public void testDataChangeEventWithDateTimePartitionColumn() throws IOException {
+ Map configMap = new HashMap<>();
+ configMap.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_INCLUDE, "doris_database.\\.*");
+ configMap.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_KEY, "create_time");
+ configMap.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_UNIT, "year");
+
+ Configuration dorisConfig = Configuration.fromMap(configMap);
+ dorisEventSerializer = new DorisEventSerializer(ZoneId.of("UTC"), dorisConfig);
+
+ CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_ID, SCHEMA);
+ dorisEventSerializer.serialize(createTableEvent);
+
+ LocalDateTime localDateTime =
+ LocalDateTime.ofInstant(Instant.parse("2025-01-16T08:00:00Z"), ZoneId.of("Z"));
+ DataChangeEvent dataChangeEvent =
+ DataChangeEvent.insertEvent(
+ TABLE_ID,
+ RECORD_DATA_GENERATOR.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ new BinaryStringData("flink"),
+ (int) LocalDate.of(2025, 1, 16).toEpochDay(),
+ TimestampData.fromLocalDateTime(localDateTime),
+ }));
+
+ DorisRecord dorisRecord = dorisEventSerializer.serialize(dataChangeEvent);
+ JsonNode jsonNode = objectMapper.readTree(dorisRecord.getRow());
+ Assert.assertEquals("2025-01-16 08:00:00.000000", jsonNode.get("create_time").asText());
+ }
+
+ @Test
+ public void testDataChangeEventIfDatetimePartitionColumnIsNull() throws IOException {
+ Map configMap = new HashMap<>();
+ configMap.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_INCLUDE, "doris_database.\\.*");
+ configMap.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_KEY, "create_time");
+ configMap.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_UNIT, "year");
+
+ Configuration dorisConfig = Configuration.fromMap(configMap);
+ dorisEventSerializer = new DorisEventSerializer(ZoneId.of("UTC"), dorisConfig);
+
+ CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_ID, SCHEMA);
+ dorisEventSerializer.serialize(createTableEvent);
+
+ DataChangeEvent dataChangeEvent =
+ DataChangeEvent.insertEvent(
+ TABLE_ID,
+ RECORD_DATA_GENERATOR.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ new BinaryStringData("flink"),
+ (int) LocalDate.of(2025, 1, 16).toEpochDay(),
+ null,
+ }));
+
+ DorisRecord dorisRecord = dorisEventSerializer.serialize(dataChangeEvent);
+ JsonNode jsonNode = objectMapper.readTree(dorisRecord.getRow());
+ Assert.assertEquals(
+ DorisSchemaUtils.DEFAULT_DATETIME, jsonNode.get("create_time").asText());
+ }
+
+ @Test
+ public void testDataChangeEventWithDatePartitionColumn() throws IOException {
+ Map configMap = new HashMap<>();
+ configMap.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_INCLUDE, "doris_database.\\.*");
+ configMap.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_KEY, "create_date");
+ configMap.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_UNIT, "year");
+
+ Configuration dorisConfig = Configuration.fromMap(configMap);
+ dorisEventSerializer = new DorisEventSerializer(ZoneId.of("UTC"), dorisConfig);
+
+ CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_ID, SCHEMA);
+ dorisEventSerializer.serialize(createTableEvent);
+
+ DataChangeEvent dataChangeEvent =
+ DataChangeEvent.insertEvent(
+ TABLE_ID,
+ RECORD_DATA_GENERATOR.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ new BinaryStringData("flink"),
+ (int) LocalDate.of(2025, 1, 16).toEpochDay(),
+ null,
+ }));
+
+ DorisRecord dorisRecord = dorisEventSerializer.serialize(dataChangeEvent);
+ JsonNode jsonNode = objectMapper.readTree(dorisRecord.getRow());
+ Assert.assertEquals("2025-01-16", jsonNode.get("create_date").asText());
+ }
+
+ @Test
+ public void testDataChangeEventIfDatePartitionColumnIsNull() throws IOException {
+ Map configMap = new HashMap<>();
+ configMap.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_INCLUDE, "doris_database.\\.*");
+ configMap.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_KEY, "create_date");
+ configMap.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_UNIT, "year");
+
+ Configuration dorisConfig = Configuration.fromMap(configMap);
+ dorisEventSerializer = new DorisEventSerializer(ZoneId.of("UTC"), dorisConfig);
+
+ CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_ID, SCHEMA);
+ dorisEventSerializer.serialize(createTableEvent);
+
+ DataChangeEvent dataChangeEvent =
+ DataChangeEvent.insertEvent(
+ TABLE_ID,
+ RECORD_DATA_GENERATOR.generate(
+ new Object[] {
+ new BinaryStringData("1"),
+ new BinaryStringData("flink"),
+ null,
+ TimestampData.fromMillis(System.currentTimeMillis()),
+ }));
+
+ DorisRecord dorisRecord = dorisEventSerializer.serialize(dataChangeEvent);
+ JsonNode jsonNode = objectMapper.readTree(dorisRecord.getRow());
+ Assert.assertEquals(DorisSchemaUtils.DEFAULT_DATE, jsonNode.get("create_date").asText());
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaUtilsTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaUtilsTest.java
new file mode 100644
index 00000000000..400232a806b
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisSchemaUtilsTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.doris.sink;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.PhysicalColumn;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.connectors.doris.utils.DorisSchemaUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_KEY;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_UNIT;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_AUTO_PARTITION_PROPERTIES_EXCLUDE;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_AUTO_PARTITION_PROPERTIES_INCLUDE;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PARTITION_KEY;
+import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PARTITION_UNIT;
+
+/** A test for {@link org.apache.flink.cdc.connectors.doris.utils.DorisSchemaUtils}. */
+public class DorisSchemaUtilsTest {
+ private static final String SPECIFIC_TABLE_IDENTIFIER = "doris_database.partition_table";
+ private static final TableId TABLE_ID = TableId.parse(SPECIFIC_TABLE_IDENTIFIER);
+
+ private static final Schema SCHEMA =
+ Schema.newBuilder()
+ .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+ .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null, "2.71828"))
+ .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null, "Alice"))
+ .column(new PhysicalColumn("create_time", DataTypes.TIMESTAMP(), null, null))
+ .column(new PhysicalColumn("create_time2", DataTypes.TIMESTAMP(), null, null))
+ .primaryKey("id")
+ .build();
+
+ @Test
+ public void testPartitionInfoByIncludeAllAndDefaultPartitionKey() {
+ Map map = new HashMap<>();
+ map.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_INCLUDE, "doris_database.\\.*");
+ map.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_KEY, "create_time");
+ map.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_UNIT, "year");
+
+ Configuration config = Configuration.fromMap(map);
+
+ Tuple2 partitionInfo =
+ DorisSchemaUtils.getPartitionInfo(config, SCHEMA, TABLE_ID);
+        Assert.assertEquals(new Tuple2<>("create_time", "year"), partitionInfo);
+ }
+
+ @Test
+ public void testPartitionInfoByExclude() {
+ Map map = new HashMap<>();
+ map.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_EXCLUDE, SPECIFIC_TABLE_IDENTIFIER);
+ map.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_KEY, "create_time");
+ map.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_UNIT, "year");
+
+ Configuration config = Configuration.fromMap(map);
+
+ Tuple2 partitionInfo =
+ DorisSchemaUtils.getPartitionInfo(config, SCHEMA, TABLE_ID);
+ Assert.assertNull(partitionInfo);
+ }
+
+ @Test
+ public void testPartitionInfoByIncludeSpecificTableAndDefaultPartitionKey() {
+ Map map = new HashMap<>();
+ map.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_INCLUDE, SPECIFIC_TABLE_IDENTIFIER);
+ map.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_KEY, "create_time");
+ map.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_UNIT, "year");
+
+ Configuration config = Configuration.fromMap(map);
+
+ Tuple2 partitionInfo =
+ DorisSchemaUtils.getPartitionInfo(config, SCHEMA, TABLE_ID);
+ Assert.assertEquals(new Tuple2<>("create_time", "year"), partitionInfo);
+ }
+
+ @Test
+ public void testPartitionInfoBySpecificTable() {
+ Map map = new HashMap<>();
+ map.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_KEY, "create1_time");
+ map.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_UNIT, "month");
+ map.put(
+ TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX
+ + SPECIFIC_TABLE_IDENTIFIER
+ + "."
+ + TABLE_CREATE_PARTITION_KEY,
+ "create_time");
+ map.put(
+ TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX
+ + SPECIFIC_TABLE_IDENTIFIER
+ + "."
+ + TABLE_CREATE_PARTITION_UNIT,
+ "year");
+
+ Configuration config = Configuration.fromMap(map);
+
+ Tuple2 partitionInfo =
+ DorisSchemaUtils.getPartitionInfo(config, SCHEMA, TABLE_ID);
+ Assert.assertEquals(new Tuple2<>("create_time", "year"), partitionInfo);
+ }
+
+ @Test
+ public void testPartitionInfoByNonDateOrDatetimeType() {
+ Map map = new HashMap<>();
+ map.put(
+ TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX
+ + SPECIFIC_TABLE_IDENTIFIER
+ + "."
+ + TABLE_CREATE_PARTITION_KEY,
+ "create_time");
+ map.put(
+ TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX
+ + SPECIFIC_TABLE_IDENTIFIER
+ + "."
+ + TABLE_CREATE_PARTITION_UNIT,
+ "year");
+
+ Configuration config = Configuration.fromMap(map);
+
+ Schema schema =
+ Schema.newBuilder()
+ .column(new PhysicalColumn("id", DataTypes.INT().notNull(), null))
+ .column(new PhysicalColumn("number", DataTypes.DOUBLE(), null, "2.71828"))
+ .column(new PhysicalColumn("name", DataTypes.VARCHAR(17), null, "Alice"))
+ .column(new PhysicalColumn("create_time", DataTypes.STRING(), null, null))
+ .primaryKey("id")
+ .build();
+
+ Tuple2 partitionInfo =
+ DorisSchemaUtils.getPartitionInfo(config, schema, TABLE_ID);
+ Assert.assertNull(partitionInfo);
+ }
+
+ @Test
+ public void testPartitionInfoByIncludeAllAndSpecificPartitionKey() {
+ Map map = new HashMap<>();
+ map.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_INCLUDE, "doris_database.\\.*");
+ map.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_KEY, "create_time");
+ map.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_UNIT, "year");
+ map.put(
+ TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX
+ + SPECIFIC_TABLE_IDENTIFIER
+ + "."
+ + TABLE_CREATE_PARTITION_KEY,
+ "create_time2");
+ map.put(
+ TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX
+ + SPECIFIC_TABLE_IDENTIFIER
+ + "."
+ + TABLE_CREATE_PARTITION_UNIT,
+ "month");
+
+ Configuration config = Configuration.fromMap(map);
+
+ Tuple2 partitionInfo =
+ DorisSchemaUtils.getPartitionInfo(config, SCHEMA, TABLE_ID);
+ Assert.assertEquals(new Tuple2<>("create_time2", "month"), partitionInfo);
+ }
+
+ @Test
+ public void testPartitionInfoByIncludeAndExclude() {
+ Map map = new HashMap<>();
+ map.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_INCLUDE, "doris_database.p\\.*");
+ map.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_EXCLUDE, "doris_database.partition_\\.*");
+ map.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_KEY, "create_time");
+ map.put(TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_UNIT, "year");
+ map.put(
+ TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX
+ + SPECIFIC_TABLE_IDENTIFIER
+ + "."
+ + TABLE_CREATE_PARTITION_KEY,
+ "create_time2");
+ map.put(
+ TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX
+ + SPECIFIC_TABLE_IDENTIFIER
+ + "."
+ + TABLE_CREATE_PARTITION_UNIT,
+ "month");
+
+ Configuration config = Configuration.fromMap(map);
+
+ Tuple2 partitionInfo =
+ DorisSchemaUtils.getPartitionInfo(config, SCHEMA, TABLE_ID);
+ Assert.assertNull(partitionInfo);
+
+ TableId newTableId = TableId.parse("doris_database.part_table");
+ Tuple2 newPartitionInfo =
+ DorisSchemaUtils.getPartitionInfo(config, SCHEMA, newTableId);
+ Assert.assertEquals(new Tuple2<>("create_time", "year"), newPartitionInfo);
+ }
+}
From 9a21c0468501103557feec9a8af791c410bb9892 Mon Sep 17 00:00:00 2001
From: "north.lin" <37775475+qg-lin@users.noreply.github.com>
Date: Fri, 17 Jan 2025 19:08:40 +0800
Subject: [PATCH 2/2] [FLINK-35152][pipeline-connector/doris] Support create
doris auto partition table
---
.../connectors/pipeline-connectors/doris.md | 8 +-
.../connectors/pipeline-connectors/doris.md | 8 +-
.../doris/sink/DorisMetadataApplier.java | 5 +-
.../doris/utils/DorisSchemaUtils.java | 88 ++++++++++++-------
4 files changed, 68 insertions(+), 41 deletions(-)
diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/doris.md b/docs/content.zh/docs/connectors/pipeline-connectors/doris.md
index 2318463e016..5c40055e4ac 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/doris.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/doris.md
@@ -192,10 +192,10 @@ pipeline:
支持的属性有:
table.create.auto-partition.properties.include
包含的经过route后的表集合,用逗号分隔,支持正则表达式;
table.create.auto-partition.properties.exclude
排除的经过route后的表集合,用逗号分隔,支持正则表达式;
- table.create.auto-partition.properties.default_partition_key
默认分区键;
- table.create.auto-partition.properties.default_partition_unit
默认分区单位;
- table.create.auto-partition.properties.DB.TABLE.partition_key
特定表的分区键,如未配置取默认分区键;
- table.create.auto-partition.properties.DB.TABLE.partition_unit
特定表的分区单位,如未配置取默认分区单位。
+ table.create.auto-partition.properties.default-partition-key
默认分区键;
+ table.create.auto-partition.properties.default-partition-unit
默认分区单位;
+ table.create.auto-partition.properties.DB.TABLE.partition-key
特定表的分区键,如未配置取默认分区键;
+ table.create.auto-partition.properties.DB.TABLE.partition-unit
特定表的分区单位,如未配置取默认分区单位。
注意:
1: 如果分区键不为DATE/DATETIME类型,则不会创建分区表。
2: Doris AUTO RANGE PARTITION不支持NULLABLE列作为分区列,如果您配置的分区键的值为空或者表创建完成后新增了NULLABLE分区列,系统将自动填充默认值(DATE类型为1970-01-01
,DATETIME类型为1970-01-01 00:00:00
),请选择合适的分区键。
diff --git a/docs/content/docs/connectors/pipeline-connectors/doris.md b/docs/content/docs/connectors/pipeline-connectors/doris.md
index 8b93eec2d8f..049f87c3f69 100644
--- a/docs/content/docs/connectors/pipeline-connectors/doris.md
+++ b/docs/content/docs/connectors/pipeline-connectors/doris.md
@@ -192,10 +192,10 @@ pipeline:
These properties are supported now:
table.create.auto-partition.properties.include
A collection of tables after route to include, separated by commas, supports regular expressions;
table.create.auto-partition.properties.exclude
A collection of tables after route to exclude, separated by commas, supports regular expressions;
- table.create.auto-partition.properties.default_partition_key
The default partition key;
- table.create.auto-partition.properties.default_partition_unit
The default partition unit;
- table.create.auto-partition.properties.DB.TABLE.partition_key
The partition key of a specific table. If not set, the default partition key is used;
- table.create.auto-partition.properties.DB.TABLE.partition_unit
The partition unit of a specific table. If not set, the default partition unit is used.
+ table.create.auto-partition.properties.default-partition-key
The default partition key;
+ table.create.auto-partition.properties.default-partition-unit
The default partition unit;
+ table.create.auto-partition.properties.DB.TABLE.partition-key
The partition key of a specific table. If not set, the default partition key is used;
+ table.create.auto-partition.properties.DB.TABLE.partition-unit
The partition unit of a specific table. If not set, the default partition unit is used.
Note:
1: If the partition key is not DATE/DATETIME type, auto partition tables won't be created.
2: Doris AUTO RANGE PARTITION does not support NULLABLE columns as the partition key. If Flink CDC gets a NULL value, or a NULLABLE partition column was added after the table was created, it will automatically fill it with a default value (DATE: 1970-01-01
, DATETIME: 1970-01-01 00:00:00
), so choosing a suitable partition key is very important.
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
index a2014c1571f..559b8557bc4 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
@@ -167,7 +167,10 @@ private void applyCreateTableEvent(CreateTableEvent event) throws SchemaEvolveEx
Tuple2 partitionInfo =
DorisSchemaUtils.getPartitionInfo(config, schema, tableId);
- tableSchema.setPartitionInfo(partitionInfo);
+ if (partitionInfo != null) {
+ LOG.info("Partition info of {} is: {}.", tableId.identifier(), partitionInfo);
+ tableSchema.setPartitionInfo(partitionInfo);
+ }
schemaChangeManager.createTable(tableSchema);
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/utils/DorisSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/utils/DorisSchemaUtils.java
index 7d780ef23cb..649e0af1c3b 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/utils/DorisSchemaUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/utils/DorisSchemaUtils.java
@@ -60,48 +60,72 @@ public static Tuple2 getPartitionInfo(
Map autoPartitionProperties =
DorisDataSinkOptions.getPropertiesByPrefix(
config, TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX);
- String excludes = autoPartitionProperties.get(TABLE_CREATE_PARTITION_EXCLUDE);
+ if (autoPartitionProperties.isEmpty()) {
+ return null;
+ }
+
+ if (isExcluded(autoPartitionProperties, tableId)
+ || !isIncluded(autoPartitionProperties, tableId)) {
+ return null;
+ }
+
+ String partitionKey =
+ getPartitionProperty(
+ autoPartitionProperties,
+ tableId,
+ TABLE_CREATE_PARTITION_KEY,
+ TABLE_CREATE_DEFAULT_PARTITION_KEY);
+ if (partitionKey == null || !schema.getColumn(partitionKey).isPresent()) {
+ return null;
+ }
+
+ String partitionUnit =
+ getPartitionProperty(
+ autoPartitionProperties,
+ tableId,
+ TABLE_CREATE_PARTITION_UNIT,
+ TABLE_CREATE_DEFAULT_PARTITION_UNIT);
+ if (partitionUnit == null) {
+ return null;
+ }
+
+ DataType dataType = schema.getColumn(partitionKey).get().getType();
+ return isValidDataType(dataType) ? new Tuple2<>(partitionKey, partitionUnit) : null;
+ }
+
+ private static boolean isExcluded(Map properties, TableId tableId) {
+ String excludes = properties.get(TABLE_CREATE_PARTITION_EXCLUDE);
if (!StringUtils.isNullOrWhitespaceOnly(excludes)) {
Selectors selectExclude =
new Selectors.SelectorsBuilder().includeTables(excludes).build();
- if (selectExclude.isMatch(tableId)) {
- return null;
- }
+ return selectExclude.isMatch(tableId);
}
+ return false;
+ }
- String includes = autoPartitionProperties.get(TABLE_CREATE_PARTITION_INCLUDE);
+ private static boolean isIncluded(Map properties, TableId tableId) {
+ String includes = properties.get(TABLE_CREATE_PARTITION_INCLUDE);
if (!StringUtils.isNullOrWhitespaceOnly(includes)) {
Selectors selectInclude =
new Selectors.SelectorsBuilder().includeTables(includes).build();
- if (!selectInclude.isMatch(tableId)) {
- return null;
- }
- }
-
- String partitionKey =
- autoPartitionProperties.get(
- tableId.identifier() + "." + TABLE_CREATE_PARTITION_KEY);
- String partitionUnit =
- autoPartitionProperties.get(
- tableId.identifier() + "." + TABLE_CREATE_PARTITION_UNIT);
- if (StringUtils.isNullOrWhitespaceOnly(partitionKey)) {
- partitionKey = autoPartitionProperties.get(TABLE_CREATE_DEFAULT_PARTITION_KEY);
- }
- if (StringUtils.isNullOrWhitespaceOnly(partitionUnit)) {
- partitionUnit = autoPartitionProperties.get(TABLE_CREATE_DEFAULT_PARTITION_UNIT);
+ return selectInclude.isMatch(tableId);
}
+ return true;
+ }
- if (schema.getColumn(partitionKey).isPresent()
- && !StringUtils.isNullOrWhitespaceOnly(partitionKey)) {
+ private static String getPartitionProperty(
+ Map properties,
+ TableId tableId,
+ String specificKey,
+ String defaultKey) {
+ String key = properties.get(tableId.identifier() + "." + specificKey);
+ return StringUtils.isNullOrWhitespaceOnly(key) ? properties.get(defaultKey) : key;
+ }
- DataType dataType = schema.getColumn(partitionKey).get().getType();
- if (dataType instanceof LocalZonedTimestampType
- || dataType instanceof TimestampType
- || dataType instanceof ZonedTimestampType
- || dataType instanceof DateType) {
- return new Tuple2<>(partitionKey, partitionUnit);
- }
- }
- return null;
+ private static boolean isValidDataType(DataType dataType) {
+ return dataType instanceof LocalZonedTimestampType
+ || dataType instanceof TimestampType
+ || dataType instanceof ZonedTimestampType
+ || dataType instanceof DateType;
}
}