Skip to content

Commit ec700df

Browse files
committed
Fix transformed schema merging logic & Fix missing rewrite in CASE WHEN statement & Remove unused containFilteredComputedColumn field
1 parent 07324de commit ec700df

File tree

6 files changed

+447
-50
lines changed

6 files changed

+447
-50
lines changed

Diff for: flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java

+164
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.cdc.common.utils;
1919

2020
import org.apache.flink.cdc.common.annotation.PublicEvolving;
21+
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
2122
import org.apache.flink.cdc.common.data.RecordData;
2223
import org.apache.flink.cdc.common.event.AddColumnEvent;
2324
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
@@ -26,14 +27,21 @@
2627
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
2728
import org.apache.flink.cdc.common.schema.Column;
2829
import org.apache.flink.cdc.common.schema.Schema;
30+
import org.apache.flink.cdc.common.types.DataType;
31+
import org.apache.flink.cdc.common.types.DataTypeFamily;
32+
import org.apache.flink.cdc.common.types.DataTypeRoot;
33+
import org.apache.flink.cdc.common.types.DataTypes;
34+
import org.apache.flink.cdc.common.types.DecimalType;
2935

3036
import javax.annotation.Nullable;
3137

3238
import java.util.ArrayList;
3339
import java.util.Collections;
3440
import java.util.LinkedList;
3541
import java.util.List;
42+
import java.util.Objects;
3643
import java.util.stream.Collectors;
44+
import java.util.stream.IntStream;
3745

3846
/** Utils for {@link Schema} to perform the ability of evolution. */
3947
@PublicEvolving
@@ -72,6 +80,162 @@ public static List<Object> restoreOriginalData(
7280
return actualFields;
7381
}
7482

83+
/** Merge compatible upstream schemas. */
84+
public static Schema mergeCompatibleUpstreamSchema(List<Schema> schemas) {
85+
if (schemas.isEmpty()) {
86+
return null;
87+
} else if (schemas.size() == 1) {
88+
return schemas.get(0);
89+
} else {
90+
Schema outputSchema = null;
91+
for (Schema schema : schemas) {
92+
outputSchema = mergeSchema(outputSchema, schema);
93+
}
94+
return outputSchema;
95+
}
96+
}
97+
98+
/** Try to combine two schemas with potential incompatible type. */
99+
@VisibleForTesting
100+
public static Schema mergeSchema(@Nullable Schema lhs, Schema rhs) {
101+
if (lhs == null) {
102+
return rhs;
103+
}
104+
if (lhs.getColumnCount() != rhs.getColumnCount()) {
105+
throw new IllegalStateException(
106+
String.format(
107+
"Unable to merge schema %s and %s with different column counts.",
108+
lhs, rhs));
109+
}
110+
if (!lhs.primaryKeys().equals(rhs.primaryKeys())) {
111+
throw new IllegalStateException(
112+
String.format(
113+
"Unable to merge schema %s and %s with different primary keys.",
114+
lhs, rhs));
115+
}
116+
if (!lhs.partitionKeys().equals(rhs.partitionKeys())) {
117+
throw new IllegalStateException(
118+
String.format(
119+
"Unable to merge schema %s and %s with different partition keys.",
120+
lhs, rhs));
121+
}
122+
if (!lhs.options().equals(rhs.options())) {
123+
throw new IllegalStateException(
124+
String.format(
125+
"Unable to merge schema %s and %s with different options.", lhs, rhs));
126+
}
127+
if (!Objects.equals(lhs.comment(), rhs.comment())) {
128+
throw new IllegalStateException(
129+
String.format(
130+
"Unable to merge schema %s and %s with different comments.", lhs, rhs));
131+
}
132+
133+
List<Column> leftColumns = lhs.getColumns();
134+
List<Column> rightColumns = rhs.getColumns();
135+
136+
List<Column> mergedColumns =
137+
IntStream.range(0, lhs.getColumnCount())
138+
.mapToObj(i -> mergeColumn(leftColumns.get(i), rightColumns.get(i)))
139+
.collect(Collectors.toList());
140+
141+
return lhs.copy(mergedColumns);
142+
}
143+
144+
/** Try to combine two columns with potential incompatible type. */
145+
@VisibleForTesting
146+
public static Column mergeColumn(Column lhs, Column rhs) {
147+
if (!Objects.equals(lhs.getName(), rhs.getName())) {
148+
throw new IllegalStateException(
149+
String.format(
150+
"Unable to merge column %s and %s with different name.", lhs, rhs));
151+
}
152+
if (!Objects.equals(lhs.getComment(), rhs.getComment())) {
153+
throw new IllegalStateException(
154+
String.format(
155+
"Unable to merge column %s and %s with different comments.", lhs, rhs));
156+
}
157+
return lhs.copy(mergeDataType(lhs.getType(), rhs.getType()));
158+
}
159+
160+
/** Try to combine given data types to a compatible wider data type. */
161+
@VisibleForTesting
162+
public static DataType mergeDataType(DataType lhs, DataType rhs) {
163+
// Ignore nullability during data type merge
164+
boolean nullable = lhs.isNullable() || rhs.isNullable();
165+
lhs = lhs.notNull();
166+
rhs = rhs.notNull();
167+
168+
DataType mergedType;
169+
if (lhs.equals(rhs)) {
170+
// identical type
171+
mergedType = rhs;
172+
} else if (lhs.is(DataTypeFamily.INTEGER_NUMERIC)
173+
&& rhs.is(DataTypeFamily.INTEGER_NUMERIC)) {
174+
mergedType = DataTypes.BIGINT();
175+
} else if (lhs.is(DataTypeFamily.CHARACTER_STRING)
176+
&& rhs.is(DataTypeFamily.CHARACTER_STRING)) {
177+
mergedType = DataTypes.STRING();
178+
} else if (lhs.is(DataTypeFamily.APPROXIMATE_NUMERIC)
179+
&& rhs.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
180+
mergedType = DataTypes.DOUBLE();
181+
} else if (lhs.is(DataTypeRoot.DECIMAL) && rhs.is(DataTypeRoot.DECIMAL)) {
182+
// Merge two decimal types
183+
DecimalType lhsDecimal = (DecimalType) lhs;
184+
DecimalType rhsDecimal = (DecimalType) rhs;
185+
mergedType =
186+
DataTypes.DECIMAL(
187+
Math.max(lhsDecimal.getPrecision(), rhsDecimal.getPrecision()),
188+
Math.max(lhsDecimal.getScale(), rhsDecimal.getScale()));
189+
} else if (lhs.is(DataTypeRoot.DECIMAL) && rhs.is(DataTypeFamily.EXACT_NUMERIC)) {
190+
// Merge decimal and int
191+
DecimalType lhsDecimal = (DecimalType) lhs;
192+
mergedType =
193+
DataTypes.DECIMAL(
194+
Math.max(
195+
lhsDecimal.getPrecision(),
196+
lhsDecimal.getScale() + getNumericPrecision(rhs)),
197+
lhsDecimal.getScale());
198+
} else if (rhs.is(DataTypeRoot.DECIMAL) && lhs.is(DataTypeFamily.EXACT_NUMERIC)) {
199+
// Merge decimal and int
200+
DecimalType rhsDecimal = (DecimalType) rhs;
201+
mergedType =
202+
DataTypes.DECIMAL(
203+
Math.max(
204+
rhsDecimal.getPrecision(),
205+
rhsDecimal.getScale() + getNumericPrecision(lhs)),
206+
rhsDecimal.getScale());
207+
} else {
208+
throw new IllegalStateException(
209+
String.format("Incompatible types: \"%s\" and \"%s\"", lhs, rhs));
210+
}
211+
212+
if (nullable) {
213+
return mergedType.nullable();
214+
} else {
215+
return mergedType.notNull();
216+
}
217+
}
218+
219+
@VisibleForTesting
220+
public static int getNumericPrecision(DataType dataType) {
221+
if (dataType.is(DataTypeFamily.EXACT_NUMERIC)) {
222+
if (dataType.is(DataTypeRoot.TINYINT)) {
223+
return 3;
224+
} else if (dataType.is(DataTypeRoot.SMALLINT)) {
225+
return 5;
226+
} else if (dataType.is(DataTypeRoot.INTEGER)) {
227+
return 10;
228+
} else if (dataType.is(DataTypeRoot.BIGINT)) {
229+
return 19;
230+
} else if (dataType.is(DataTypeRoot.DECIMAL)) {
231+
return ((DecimalType) dataType).getPrecision();
232+
}
233+
}
234+
235+
throw new IllegalArgumentException(
236+
"Failed to get precision of non-exact decimal type " + dataType);
237+
}
238+
75239
/** apply SchemaChangeEvent to the old schema and return the schema after changing. */
76240
public static Schema applySchemaChangeEvent(Schema schema, SchemaChangeEvent event) {
77241
if (event instanceof AddColumnEvent) {

0 commit comments

Comments
 (0)