[FLINK-35152][pipeline-connector/doris] Support create doris auto partition table #3871

Open
wants to merge 2 commits into
base: master
19 changes: 19 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/doris.md
@@ -182,6 +182,25 @@ pipeline:
See more about <a href="https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-statements/Data-Definition-Statements/Create/CREATE-TABLE/"> Doris Table properties</a></td>
</td>
</tr>
<tr>
<td>table.create.auto-partition.properties.*</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
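<td>Configuration for creating auto-partitioned tables.<br/>
Currently only AUTO RANGE PARTITION on DATE/DATETIME columns is supported, with <code>date_trunc</code> as the partition function, and the Doris version must be greater than 2.1.6. See more about <a href="https://doris.apache.org/docs/table-design/data-partitioning/auto-partitioning">Doris auto partitioning</a><br/>
The following properties are supported:<br/>
<code>table.create.auto-partition.properties.include</code>: the set of tables (named as they are after routing) to include, separated by commas; regular expressions are supported;<br/>
<code>table.create.auto-partition.properties.exclude</code>: the set of tables (named as they are after routing) to exclude, separated by commas; regular expressions are supported;<br/>
<code>table.create.auto-partition.properties.default-partition-key</code>: the default partition key;<br/>
<code>table.create.auto-partition.properties.default-partition-unit</code>: the default partition unit;<br/>
<code>table.create.auto-partition.properties.DB.TABLE.partition-key</code>: the partition key of a specific table; if not set, the default partition key is used;<br/>
<code>table.create.auto-partition.properties.DB.TABLE.partition-unit</code>: the partition unit of a specific table; if not set, the default partition unit is used.<br/>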
Review comment (Contributor): Could we add these properties through transform?

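Note:<br/>
1: If the partition key is not of type DATE/DATETIME, no partitioned table is created.<br/>
2: Doris AUTO RANGE PARTITION does not support NULLABLE columns as the partition column. If the value of the configured partition key is NULL, or a NULLABLE partition column is added after the table has been created, the system automatically fills in a default value (<code>1970-01-01</code> for DATE, <code>1970-01-01 00:00:00</code> for DATETIME). Please choose a suitable partition key.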
</td>
</tr>
</tbody>
</table>
</div>
19 changes: 19 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/doris.md
@@ -182,6 +182,25 @@ pipeline:
See more about <a href="https://doris.apache.org/docs/dev/sql-manual/sql-statements/Data-Definition-Statements/Create/CREATE-TABLE/"> Doris Table Properties</a></td>
</td>
</tr>
<tr>
<td>table.create.auto-partition.properties.*</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Configuration for creating auto-partitioned tables.<br/>
Currently only AUTO RANGE PARTITION is supported: the partition function must be <code>date_trunc</code>, the partition column must be of type DATE or DATETIME, and the Doris version must be greater than 2.1.6. See more about <a href="https://doris.apache.org/docs/table-design/data-partitioning/auto-partitioning">Doris Auto Partitioning</a><br/>
The following properties are supported:<br/>
<code>table.create.auto-partition.properties.include</code>: the set of tables (named as they are after routing) to include, separated by commas; regular expressions are supported;<br/>
<code>table.create.auto-partition.properties.exclude</code>: the set of tables (named as they are after routing) to exclude, separated by commas; regular expressions are supported;<br/>
Review comment (Contributor): It's better not to expose the concept of route to users, since it makes the option hard to understand.

<code>table.create.auto-partition.properties.default-partition-key</code>: the default partition key;<br/>
<code>table.create.auto-partition.properties.default-partition-unit</code>: the default partition unit;<br/>
<code>table.create.auto-partition.properties.DB.TABLE.partition-key</code>: the partition key of a specific table; if not set, the default partition key is used;<br/>
<code>table.create.auto-partition.properties.DB.TABLE.partition-unit</code>: the partition unit of a specific table; if not set, the default partition unit is used.<br/>
Note:<br/>
1: If the partition key is not of type DATE/DATETIME, no auto-partitioned table is created.<br/>
2: Doris AUTO RANGE PARTITION does not support NULLABLE columns as the partition key. If Flink CDC receives a NULL value for the partition key, or a NULLABLE partition column is added after the table has been created, the value is automatically filled with a default (DATE: <code>1970-01-01</code>, DATETIME: <code>1970-01-01 00:00:00</code>), so choosing a suitable partition key is important.
</td>
</tr>
</tbody>
</table>
</div>
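The fallback from a table-specific setting to the default one, as described in the table above, can be illustrated with a small stand-alone sketch. It is not the connector's code: the class `AutoPartitionLookupSketch`, the method `resolve`, and the table and column names are hypothetical, and the keys are written with the hyphenated suffixes (`partition-key`, `partition-unit`) that `DorisDataSinkOptions` defines in this change, with the `table.create.auto-partition.properties.` prefix assumed to be already removed.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of the "table-specific, else default" lookup. */
final class AutoPartitionLookupSketch {

    /**
     * Returns the value configured for "<tableIdentifier>.<suffix>" if it is set and not blank,
     * otherwise the value configured for "default-<suffix>" (which may be null).
     */
    static String resolve(Map<String, String> props, String tableIdentifier, String suffix) {
        String specific = props.get(tableIdentifier + "." + suffix);
        if (specific != null && !specific.trim().isEmpty()) {
            return specific;
        }
        return props.get("default-" + suffix);
    }

    public static void main(String[] args) {
        // Assumed example configuration; keys are shown without the common prefix.
        Map<String, String> props = new HashMap<>();
        props.put("default-partition-key", "create_time");
        props.put("default-partition-unit", "day");
        props.put("app_db.orders.partition-key", "order_date");

        System.out.println(resolve(props, "app_db.orders", "partition-key"));
        System.out.println(resolve(props, "app_db.orders", "partition-unit"));
        System.out.println(resolve(props, "app_db.users", "partition-key"));
    }
}
```

With this assumed configuration, `app_db.orders` uses its own partition key but inherits the default unit, while `app_db.users` falls back to the default for both.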
@@ -27,7 +27,7 @@ limitations under the License.
<name>flink-cdc-pipeline-connector-doris</name>

<properties>
- <doris.connector.version>24.0.1</doris.connector.version>
<doris.connector.version>24.1.0</doris.connector.version>
<mysql.connector.version>8.0.26</mysql.connector.version>
</properties>

@@ -58,6 +58,7 @@
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_MAX_RETRIES;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.SINK_USE_CACHE;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.STREAM_LOAD_PROP_PREFIX;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PROPERTIES_PREFIX;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.USERNAME;

@@ -67,7 +68,10 @@ public class DorisDataSinkFactory implements DataSinkFactory {
@Override
public DataSink createDataSink(Context context) {
FactoryHelper.createFactoryHelper(this, context)
- .validateExcept(TABLE_CREATE_PROPERTIES_PREFIX, STREAM_LOAD_PROP_PREFIX);
.validateExcept(
TABLE_CREATE_PROPERTIES_PREFIX,
STREAM_LOAD_PROP_PREFIX,
TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX);

Configuration config = context.getFactoryConfiguration();
DorisOptions.Builder optionsBuilder = DorisOptions.builder();
@@ -62,14 +62,14 @@ public EventSinkProvider getEventSinkProvider() {
dorisOptions,
readOptions,
executionOptions,
- new DorisEventSerializer(zoneId)));
new DorisEventSerializer(zoneId, configuration)));
} else {
return FlinkSinkProvider.of(
new DorisBatchSink<>(
dorisOptions,
readOptions,
executionOptions,
- new DorisEventSerializer(zoneId)));
new DorisEventSerializer(zoneId, configuration)));
}
}

@@ -158,6 +158,29 @@ public class DorisDataSinkOptions {
public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
// Prefix for Doris Create table.
public static final String TABLE_CREATE_PROPERTIES_PREFIX = "table.create.properties.";
// Prefix for Doris Create auto partition table.
public static final String TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX =
"table.create.auto-partition.properties.";
public static final String TABLE_CREATE_PARTITION_KEY = "partition-key";
public static final String TABLE_CREATE_PARTITION_UNIT = "partition-unit";

public static final String TABLE_CREATE_DEFAULT_PARTITION_KEY =
"default-" + TABLE_CREATE_PARTITION_KEY;
public static final String TABLE_CREATE_DEFAULT_PARTITION_UNIT =
"default-" + TABLE_CREATE_PARTITION_UNIT;

public static final String TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_KEY =
TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX + TABLE_CREATE_DEFAULT_PARTITION_KEY;
public static final String TABLE_CREATE_AUTO_PARTITION_PROPERTIES_DEFAULT_PARTITION_UNIT =
TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX + TABLE_CREATE_DEFAULT_PARTITION_UNIT;

public static final String TABLE_CREATE_PARTITION_INCLUDE = "include";
public static final String TABLE_CREATE_PARTITION_EXCLUDE = "exclude";

public static final String TABLE_CREATE_AUTO_PARTITION_PROPERTIES_INCLUDE =
TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX + TABLE_CREATE_PARTITION_INCLUDE;
public static final String TABLE_CREATE_AUTO_PARTITION_PROPERTIES_EXCLUDE =
TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX + TABLE_CREATE_PARTITION_EXCLUDE;
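
The short names above (`include`, `default-partition-key`, and so on) are later looked up directly in the map returned by `getPropertiesByPrefix`, which suggests that the helper strips the prefix from each key. Its body is not part of this diff, so the following is only a sketch under that assumption; `PrefixOptionsSketch` and `byPrefix` are hypothetical names, and a plain `Map` stands in for the Flink CDC `Configuration`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a prefix-based option lookup; not the connector's implementation. */
final class PrefixOptionsSketch {

    static final String AUTO_PARTITION_PREFIX = "table.create.auto-partition.properties.";

    /** Keeps only the entries whose key starts with the prefix and removes the prefix from the key. */
    static Map<String, String> byPrefix(Map<String, String> options, String prefix) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : options.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed sink options; only the two prefixed entries survive the filter.
        Map<String, String> options = new HashMap<>();
        options.put("fenodes", "127.0.0.1:8030");
        options.put(AUTO_PARTITION_PREFIX + "include", "app_db\\..*");
        options.put(AUTO_PARTITION_PREFIX + "default-partition-unit", "day");

        System.out.println(byPrefix(options, AUTO_PARTITION_PREFIX));
    }
}
```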

public static Map<String, String> getPropertiesByPrefix(
Configuration tableOptions, String prefix) {
@@ -17,6 +17,8 @@

package org.apache.flink.cdc.connectors.doris.sink;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
@@ -26,8 +28,14 @@
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DateType;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.ZonedTimestampType;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.doris.utils.DorisSchemaUtils;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -42,6 +50,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.apache.doris.flink.sink.util.DeleteOperation.addDeleteSign;

@@ -61,8 +70,11 @@ public class DorisEventSerializer implements DorisRecordSerializer<Event> {
/** ZoneId from pipeline config to support timestamp with local time zone. */
public final ZoneId pipelineZoneId;

- public DorisEventSerializer(ZoneId zoneId) {
public final Configuration dorisConfig;

public DorisEventSerializer(ZoneId zoneId, Configuration config) {
pipelineZoneId = zoneId;
dorisConfig = config;
}

@Override
@@ -108,6 +120,30 @@ private DorisRecord applyDataChangeEvent(DataChangeEvent event) throws JsonProce
throw new UnsupportedOperationException("Unsupport Operation " + op);
}

// get partition info from config
Tuple2<String, String> partitionInfo =
DorisSchemaUtils.getPartitionInfo(dorisConfig, schema, tableId);
if (!Objects.isNull(partitionInfo)) {
String partitionKey = partitionInfo.f0;
Object partitionValue = valueMap.get(partitionKey);
// fill partition column by default value if null
if (Objects.isNull(partitionValue)) {
schema.getColumn(partitionKey)
.ifPresent(
column -> {
DataType dataType = column.getType();
if (dataType instanceof DateType) {
valueMap.put(partitionKey, DorisSchemaUtils.DEFAULT_DATE);
} else if (dataType instanceof LocalZonedTimestampType
|| dataType instanceof TimestampType
|| dataType instanceof ZonedTimestampType) {
valueMap.put(
partitionKey, DorisSchemaUtils.DEFAULT_DATETIME);
}
});
}
}
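
The default-filling step above can be read in isolation as follows. This is a simplified sketch, not the serializer itself: `PartitionDefaultFillSketch`, `ColumnKind`, and `fillIfNull` are hypothetical, and the enum stands in for the `DateType` / timestamp-type checks on the Flink CDC `DataType`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, dependency-free version of the "fill NULL partition value" step. */
final class PartitionDefaultFillSketch {

    /** Simplified stand-in for the column's data type. */
    enum ColumnKind { DATE, DATETIME, OTHER }

    static final String DEFAULT_DATE = "1970-01-01";
    static final String DEFAULT_DATETIME = "1970-01-01 00:00:00";

    /** Replaces a missing or NULL partition value with the type-specific default. */
    static void fillIfNull(Map<String, Object> row, String partitionKey, ColumnKind kind) {
        if (row.get(partitionKey) != null) {
            return; // a real value is present, nothing to do
        }
        switch (kind) {
            case DATE:
                row.put(partitionKey, DEFAULT_DATE);
                break;
            case DATETIME:
                row.put(partitionKey, DEFAULT_DATETIME);
                break;
            default:
                break; // other types are left untouched
        }
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("id", 1);
        row.put("create_time", null);
        fillIfNull(row, "create_time", ColumnKind.DATETIME);
        System.out.println(row.get("create_time"));
    }
}
```

One consequence worth keeping in mind is that all rows with a NULL partition value end up in the 1970 partition, which is why the documentation asks for a carefully chosen partition key.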

return DorisRecord.of(
tableId.getSchemaName(),
tableId.getTableName(),
@@ -121,7 +157,6 @@ public Map<String, Object> serializerRecord(RecordData recordData, Schema schema
Preconditions.checkState(
columns.size() == recordData.getArity(),
"Column size does not match the data size");

for (int i = 0; i < recordData.getArity(); i++) {
DorisRowConverter.SerializationConverter converter =
DorisRowConverter.createNullableExternalConverter(
@@ -17,6 +17,7 @@

package org.apache.flink.cdc.connectors.doris.sink;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
@@ -39,6 +40,7 @@
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.ZonedTimestampType;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.cdc.connectors.doris.utils.DorisSchemaUtils;
import org.apache.flink.util.CollectionUtil;

import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
@@ -162,6 +164,10 @@ private void applyCreateTableEvent(CreateTableEvent event) throws SchemaEvolveEx
DorisDataSinkOptions.getPropertiesByPrefix(
config, TABLE_CREATE_PROPERTIES_PREFIX);
tableSchema.setProperties(tableProperties);

Tuple2<String, String> partitionInfo =
DorisSchemaUtils.getPartitionInfo(config, schema, tableId);
tableSchema.setPartitionInfo(partitionInfo);
schemaChangeManager.createTable(tableSchema);
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
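
The DDL generation itself happens inside the Doris connector and is not shown in this diff. As a rough illustration of what the `(partitionKey, partitionUnit)` pair is expected to turn into, the sketch below builds an `AUTO PARTITION BY RANGE` clause with `date_trunc`, following the syntax described in the Doris auto partitioning documentation. The class `AutoPartitionClauseSketch`, the method `clause`, and the exact string layout are assumptions, not the connector's output.

```java
/** Hypothetical builder for a Doris AUTO RANGE PARTITION clause; for illustration only. */
final class AutoPartitionClauseSketch {

    /** Builds the clause for the given partition column and date_trunc unit (e.g. "day"). */
    static String clause(String partitionKey, String partitionUnit) {
        return "AUTO PARTITION BY RANGE (date_trunc(`"
                + partitionKey
                + "`, '"
                + partitionUnit
                + "')) ()";
    }

    public static void main(String[] args) {
        // Assumed column name and unit.
        System.out.println(clause("create_time", "day"));
    }
}
```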
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.doris.utils;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DateType;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.ZonedTimestampType;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions;

import java.util.Map;

import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_DEFAULT_PARTITION_KEY;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_DEFAULT_PARTITION_UNIT;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PARTITION_EXCLUDE;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PARTITION_INCLUDE;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PARTITION_KEY;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PARTITION_UNIT;

/** Utilities for doris schema. */
public class DorisSchemaUtils {

public static final String DEFAULT_DATE = "1970-01-01";
public static final String DEFAULT_DATETIME = "1970-01-01 00:00:00";

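/**
* Gets the auto partition info (partition key and partition unit) of a table from the sink
* configuration. Currently only AUTO RANGE PARTITION on DATE/TIMESTAMP columns is supported,
* and the Doris version should be greater than 2.1.6.
*
* @param config the Doris sink configuration
* @param schema the schema of the table
* @param tableId the identifier of the table
* @return a tuple of (partition key, partition unit), or null if the table is excluded, is not
* included, or its partition key is missing or not a DATE/TIMESTAMP column
*/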
public static Tuple2<String, String> getPartitionInfo(
Configuration config, Schema schema, TableId tableId) {
Map<String, String> autoPartitionProperties =
DorisDataSinkOptions.getPropertiesByPrefix(
config, TABLE_CREATE_AUTO_PARTITION_PROPERTIES_PREFIX);
String excludes = autoPartitionProperties.get(TABLE_CREATE_PARTITION_EXCLUDE);
if (!StringUtils.isNullOrWhitespaceOnly(excludes)) {
Selectors selectExclude =
new Selectors.SelectorsBuilder().includeTables(excludes).build();
if (selectExclude.isMatch(tableId)) {
return null;
}
}

String includes = autoPartitionProperties.get(TABLE_CREATE_PARTITION_INCLUDE);
if (!StringUtils.isNullOrWhitespaceOnly(includes)) {
Selectors selectInclude =
new Selectors.SelectorsBuilder().includeTables(includes).build();
if (!selectInclude.isMatch(tableId)) {
return null;
}
}

String partitionKey =
autoPartitionProperties.get(
tableId.identifier() + "." + TABLE_CREATE_PARTITION_KEY);
String partitionUnit =
autoPartitionProperties.get(
tableId.identifier() + "." + TABLE_CREATE_PARTITION_UNIT);
if (StringUtils.isNullOrWhitespaceOnly(partitionKey)) {
partitionKey = autoPartitionProperties.get(TABLE_CREATE_DEFAULT_PARTITION_KEY);
}
if (StringUtils.isNullOrWhitespaceOnly(partitionUnit)) {
partitionUnit = autoPartitionProperties.get(TABLE_CREATE_DEFAULT_PARTITION_UNIT);
}

if (!StringUtils.isNullOrWhitespaceOnly(partitionKey)
&& schema.getColumn(partitionKey).isPresent()) {

DataType dataType = schema.getColumn(partitionKey).get().getType();
if (dataType instanceof LocalZonedTimestampType
|| dataType instanceof TimestampType
|| dataType instanceof ZonedTimestampType
|| dataType instanceof DateType) {
return new Tuple2<>(partitionKey, partitionUnit);
}
}
return null;
}
}
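
The include/exclude decision at the top of `getPartitionInfo` can be summarized as: a table matching `exclude` is never auto-partitioned, and when `include` is set only matching tables are. The sketch below reproduces that order with `java.util.regex` as a simplified substitute for the Flink CDC `Selectors` class, whose own pattern syntax is not replicated here; `IncludeExcludeSketch` and its methods are hypothetical.

```java
import java.util.regex.Pattern;

/** Hypothetical regex-based stand-in for the include/exclude check; not the Selectors API. */
final class IncludeExcludeSketch {

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    /** Returns true if the identifier fully matches any of the comma-separated patterns. */
    static boolean matchesAny(String commaSeparatedPatterns, String tableIdentifier) {
        for (String pattern : commaSeparatedPatterns.split(",")) {
            String trimmed = pattern.trim();
            if (!trimmed.isEmpty() && Pattern.matches(trimmed, tableIdentifier)) {
                return true;
            }
        }
        return false;
    }

    /** Exclusion is checked first; an unset include list means "all tables". */
    static boolean isSelected(String includes, String excludes, String tableIdentifier) {
        if (!isBlank(excludes) && matchesAny(excludes, tableIdentifier)) {
            return false;
        }
        if (!isBlank(includes) && !matchesAny(includes, tableIdentifier)) {
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        // Assumed patterns and table names.
        String includes = "app_db\\..*";
        String excludes = "app_db\\.tmp_.*";
        System.out.println(isSelected(includes, excludes, "app_db.orders"));
        System.out.println(isSelected(includes, excludes, "app_db.tmp_import"));
        System.out.println(isSelected(includes, excludes, "other_db.orders"));
    }
}
```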