
Commit 81d916f

[FLINK-35272][cdc-runtime] Transform supports omitting and renaming computed column
This closes #3285.
1 parent 2dabfc0 commit 81d916f


37 files changed: +4624 -1070 lines changed


docs/content.zh/docs/connectors/pipeline-connectors/doris.md

+1 -1

@@ -186,7 +186,7 @@ pipeline:
 <thead>
 <tr>
 <th class="text-left" style="width:10%;">CDC type</th>
-<th class="text-left" style="width:30%;">Doris type<a href="https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-types/Data-Types/BOOLEAN/"></a></th>
+<th class="text-left" style="width:30%;">Doris type</th>
 <th class="text-left" style="width:60%;">NOTE</th>
 </tr>
 </thead>

docs/content.zh/docs/core-concept/transform.md

+3 -1

@@ -252,7 +252,9 @@ transform:
 Tips: The format of table-options is `key1=value1,key2=value2`.

 ## Classification mapping
-Multiple transform rules can be defined to classify input data rows and apply different processings. For example, we may define a transform rule as follows:
+Multiple transform rules can be defined to classify input data rows and apply different processing.
+Only the first matched transform rule will apply.
+For example, we may define a transform rule as follows:

 ```yaml
 transform:

docs/content/docs/connectors/pipeline-connectors/doris.md

+1 -1

@@ -186,7 +186,7 @@ pipeline:
 <thead>
 <tr>
 <th class="text-left" style="width:10%;">Flink CDC Type</th>
-<th class="text-left" style="width:30%;"><a href="https://doris.apache.org/docs/dev/sql-manual/sql-types/Data-Types/BOOLEAN/">Doris Type</a></th>
+<th class="text-left" style="width:30%;">Doris Type</th>
 <th class="text-left" style="width:60%;">Note</th>
 </tr>
 </thead>

docs/content/docs/core-concept/transform.md

+3 -1

@@ -252,7 +252,9 @@ transform:
 Tips: The format of table-options is `key1=value1,key2=value2`.

 ## Classification mapping
-Multiple transform rules can be defined to classify input data rows and apply different processings. For example, we may define a transform rule as follows:
+Multiple transform rules can be defined to classify input data rows and apply different processing.
+Only the first matched transform rule will apply.
+For example, we may define a transform rule as follows:

 ```yaml
 transform:

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java

+186

@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.common.utils;

 import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
@@ -26,11 +27,21 @@
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypeFamily;
+import org.apache.flink.cdc.common.types.DataTypeRoot;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.DecimalType;
+
+import javax.annotation.Nullable;

 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;

 /** Utils for {@link Schema} to perform the ability of evolution. */
 @PublicEvolving
@@ -56,6 +67,181 @@ public static List<RecordData.FieldGetter> createFieldGetters(List<Column> colum
         return fieldGetters;
     }

+    /** Restore original data fields from RecordData structure. */
+    public static List<Object> restoreOriginalData(
+            @Nullable RecordData recordData, List<RecordData.FieldGetter> fieldGetters) {
+        if (recordData == null) {
+            return Collections.emptyList();
+        }
+        List<Object> actualFields = new ArrayList<>();
+        for (RecordData.FieldGetter fieldGetter : fieldGetters) {
+            actualFields.add(fieldGetter.getFieldOrNull(recordData));
+        }
+        return actualFields;
+    }
+
+    /** Merge compatible upstream schemas. */
+    public static Schema inferWiderSchema(List<Schema> schemas) {
+        if (schemas.isEmpty()) {
+            return null;
+        } else if (schemas.size() == 1) {
+            return schemas.get(0);
+        } else {
+            Schema outputSchema = null;
+            for (Schema schema : schemas) {
+                outputSchema = inferWiderSchema(outputSchema, schema);
+            }
+            return outputSchema;
+        }
+    }
+
+    /** Try to combine two schemas with potential incompatible type. */
+    @VisibleForTesting
+    public static Schema inferWiderSchema(@Nullable Schema lSchema, Schema rSchema) {
+        if (lSchema == null) {
+            return rSchema;
+        }
+        if (lSchema.getColumnCount() != rSchema.getColumnCount()) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Unable to merge schema %s and %s with different column counts.",
+                            lSchema, rSchema));
+        }
+        if (!lSchema.primaryKeys().equals(rSchema.primaryKeys())) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Unable to merge schema %s and %s with different primary keys.",
+                            lSchema, rSchema));
+        }
+        if (!lSchema.partitionKeys().equals(rSchema.partitionKeys())) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Unable to merge schema %s and %s with different partition keys.",
+                            lSchema, rSchema));
+        }
+        if (!lSchema.options().equals(rSchema.options())) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Unable to merge schema %s and %s with different options.",
+                            lSchema, rSchema));
+        }
+        if (!Objects.equals(lSchema.comment(), rSchema.comment())) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Unable to merge schema %s and %s with different comments.",
+                            lSchema, rSchema));
+        }
+
+        List<Column> leftColumns = lSchema.getColumns();
+        List<Column> rightColumns = rSchema.getColumns();
+
+        List<Column> mergedColumns =
+                IntStream.range(0, lSchema.getColumnCount())
+                        .mapToObj(i -> inferWiderColumn(leftColumns.get(i), rightColumns.get(i)))
+                        .collect(Collectors.toList());
+
+        return lSchema.copy(mergedColumns);
+    }
+
+    /** Try to combine two columns with potential incompatible type. */
+    @VisibleForTesting
+    public static Column inferWiderColumn(Column lColumn, Column rColumn) {
+        if (!Objects.equals(lColumn.getName(), rColumn.getName())) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Unable to merge column %s and %s with different name.",
+                            lColumn, rColumn));
+        }
+        if (!Objects.equals(lColumn.getComment(), rColumn.getComment())) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Unable to merge column %s and %s with different comments.",
+                            lColumn, rColumn));
+        }
+        return lColumn.copy(inferWiderType(lColumn.getType(), rColumn.getType()));
+    }
+
+    /** Try to combine given data types to a compatible wider data type. */
+    @VisibleForTesting
+    public static DataType inferWiderType(DataType lType, DataType rType) {
+        // Ignore nullability during data type merge
+        boolean nullable = lType.isNullable() || rType.isNullable();
+        lType = lType.notNull();
+        rType = rType.notNull();
+
+        DataType mergedType;
+        if (lType.equals(rType)) {
+            // identical type
+            mergedType = rType;
+        } else if (lType.is(DataTypeFamily.INTEGER_NUMERIC)
+                && rType.is(DataTypeFamily.INTEGER_NUMERIC)) {
+            mergedType = DataTypes.BIGINT();
+        } else if (lType.is(DataTypeFamily.CHARACTER_STRING)
+                && rType.is(DataTypeFamily.CHARACTER_STRING)) {
+            mergedType = DataTypes.STRING();
+        } else if (lType.is(DataTypeFamily.APPROXIMATE_NUMERIC)
+                && rType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
+            mergedType = DataTypes.DOUBLE();
+        } else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeRoot.DECIMAL)) {
+            // Merge two decimal types
+            DecimalType lhsDecimal = (DecimalType) lType;
+            DecimalType rhsDecimal = (DecimalType) rType;
+            int resultIntDigits =
+                    Math.max(
+                            lhsDecimal.getPrecision() - lhsDecimal.getScale(),
+                            rhsDecimal.getPrecision() - rhsDecimal.getScale());
+            int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
+            mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
+        } else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
+            // Merge decimal and int
+            DecimalType lhsDecimal = (DecimalType) lType;
+            mergedType =
+                    DataTypes.DECIMAL(
+                            Math.max(
+                                    lhsDecimal.getPrecision(),
+                                    lhsDecimal.getScale() + getNumericPrecision(rType)),
+                            lhsDecimal.getScale());
+        } else if (rType.is(DataTypeRoot.DECIMAL) && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
+            // Merge decimal and int
+            DecimalType rhsDecimal = (DecimalType) rType;
+            mergedType =
+                    DataTypes.DECIMAL(
+                            Math.max(
+                                    rhsDecimal.getPrecision(),
+                                    rhsDecimal.getScale() + getNumericPrecision(lType)),
+                            rhsDecimal.getScale());
+        } else {
+            throw new IllegalStateException(
+                    String.format("Incompatible types: \"%s\" and \"%s\"", lType, rType));
+        }
+
+        if (nullable) {
+            return mergedType.nullable();
+        } else {
+            return mergedType.notNull();
+        }
+    }
+
+    @VisibleForTesting
+    public static int getNumericPrecision(DataType dataType) {
+        if (dataType.is(DataTypeFamily.EXACT_NUMERIC)) {
+            if (dataType.is(DataTypeRoot.TINYINT)) {
+                return 3;
+            } else if (dataType.is(DataTypeRoot.SMALLINT)) {
+                return 5;
+            } else if (dataType.is(DataTypeRoot.INTEGER)) {
+                return 10;
+            } else if (dataType.is(DataTypeRoot.BIGINT)) {
+                return 19;
+            } else if (dataType.is(DataTypeRoot.DECIMAL)) {
+                return ((DecimalType) dataType).getPrecision();
+            }
+        }
+
+        throw new IllegalArgumentException(
+                "Failed to get precision of non-exact decimal type " + dataType);
+    }
+
     /** apply SchemaChangeEvent to the old schema and return the schema after changing. */
     public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent event) {
         if (event instanceof AddColumnEvent) {

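To make the new widening rules concrete, here is a minimal, hypothetical sketch of calling `SchemaUtils.inferWiderType` for the non-decimal branches. It only exercises behaviour visible in the diff above; the `DataTypes.INT()` and `DataTypes.VARCHAR(int)` factories and the `InferWiderTypeSketch` class name are assumptions, not part of this commit.

```java
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.SchemaUtils;

public class InferWiderTypeSketch {
    public static void main(String[] args) {
        // Assumption: DataTypes.INT() and DataTypes.VARCHAR(int) exist alongside
        // the factories used in the patch (BIGINT, STRING, DOUBLE, DECIMAL).

        // Two integer-family types widen to BIGINT; the result is nullable
        // because at least one of the inputs is nullable.
        DataType widenedInt =
                SchemaUtils.inferWiderType(
                        DataTypes.INT().nullable(), DataTypes.BIGINT().notNull());
        System.out.println(widenedInt);

        // Two character-string types widen to STRING; both inputs are NOT NULL,
        // so the merged type stays NOT NULL.
        DataType widenedString =
                SchemaUtils.inferWiderType(
                        DataTypes.VARCHAR(17).notNull(), DataTypes.STRING().notNull());
        System.out.println(widenedString);

        // Types from unrelated families (e.g. BIGINT vs. STRING) are rejected
        // with an IllegalStateException("Incompatible types: ...").
    }
}
```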
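The decimal branches are the least obvious part of `inferWiderType`, so a worked example of the precision/scale arithmetic may help. This sketch (the class name is hypothetical) uses only factory methods that already appear in the patch.

```java
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.SchemaUtils;

public class InferWiderDecimalSketch {
    public static void main(String[] args) {
        // DECIMAL(7, 2) keeps 5 integer digits, DECIMAL(6, 4) keeps 2.
        // Merged: max(5, 2) integer digits + max(2, 4) fraction digits -> DECIMAL(9, 4).
        DataType mergedDecimals =
                SchemaUtils.inferWiderType(DataTypes.DECIMAL(7, 2), DataTypes.DECIMAL(6, 4));
        System.out.println(mergedDecimals);

        // BIGINT counts as 19 digits of precision (see getNumericPrecision).
        // Merged with DECIMAL(5, 2): precision = max(5, 2 + 19) = 21, scale stays 2,
        // giving DECIMAL(21, 2), so neither integer nor fraction digits are lost.
        DataType mergedDecimalAndBigint =
                SchemaUtils.inferWiderType(DataTypes.DECIMAL(5, 2), DataTypes.BIGINT());
        System.out.println(mergedDecimalAndBigint);
    }
}
```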
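Finally, a sketch of how the schema-level merge composes these pieces: `inferWiderSchema` insists on identical column counts, primary keys, partition keys, options and comments, and only widens the column types position by position. The `Schema.newBuilder()`, `physicalColumn` and `primaryKey` builder calls and the class name below are assumptions used for illustration; they are not part of this diff.

```java
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.SchemaUtils;

public class InferWiderSchemaSketch {
    public static void main(String[] args) {
        // Assumption: Schema.newBuilder()/physicalColumn()/primaryKey() are available.
        Schema before =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.BIGINT())
                        .physicalColumn("price", DataTypes.DECIMAL(7, 2))
                        .primaryKey("id")
                        .build();
        Schema after =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.BIGINT())
                        .physicalColumn("price", DataTypes.DECIMAL(6, 4))
                        .primaryKey("id")
                        .build();

        // Layouts match, so the merge succeeds: "id" stays BIGINT,
        // while "price" is widened to DECIMAL(9, 4).
        Schema merged = SchemaUtils.inferWiderSchema(before, after);
        System.out.println(merged);
    }
}
```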
0 commit comments
