Skip to content

Commit abb397f

Browse files
committed
Address comments
1 parent 6f53955 commit abb397f

File tree

16 files changed

+276
-167
lines changed

16 files changed

+276
-167
lines changed

Diff for: flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -81,23 +81,23 @@ public static List<Object> restoreOriginalData(
8181
}
8282

8383
/** Merge compatible upstream schemas. */
84-
public static Schema mergeCompatibleSchemas(List<Schema> schemas) {
84+
public static Schema inferWiderSchema(List<Schema> schemas) {
8585
if (schemas.isEmpty()) {
8686
return null;
8787
} else if (schemas.size() == 1) {
8888
return schemas.get(0);
8989
} else {
9090
Schema outputSchema = null;
9191
for (Schema schema : schemas) {
92-
outputSchema = mergeSchema(outputSchema, schema);
92+
outputSchema = inferWiderSchema(outputSchema, schema);
9393
}
9494
return outputSchema;
9595
}
9696
}
9797

9898
/** Try to combine two schemas with potential incompatible type. */
9999
@VisibleForTesting
100-
public static Schema mergeSchema(@Nullable Schema lSchema, Schema rSchema) {
100+
public static Schema inferWiderSchema(@Nullable Schema lSchema, Schema rSchema) {
101101
if (lSchema == null) {
102102
return rSchema;
103103
}
@@ -137,15 +137,15 @@ public static Schema mergeSchema(@Nullable Schema lSchema, Schema rSchema) {
137137

138138
List<Column> mergedColumns =
139139
IntStream.range(0, lSchema.getColumnCount())
140-
.mapToObj(i -> mergeColumn(leftColumns.get(i), rightColumns.get(i)))
140+
.mapToObj(i -> inferWiderColumn(leftColumns.get(i), rightColumns.get(i)))
141141
.collect(Collectors.toList());
142142

143143
return lSchema.copy(mergedColumns);
144144
}
145145

146146
/** Try to combine two columns with potential incompatible type. */
147147
@VisibleForTesting
148-
public static Column mergeColumn(Column lColumn, Column rColumn) {
148+
public static Column inferWiderColumn(Column lColumn, Column rColumn) {
149149
if (!Objects.equals(lColumn.getName(), rColumn.getName())) {
150150
throw new IllegalStateException(
151151
String.format(
@@ -158,12 +158,12 @@ public static Column mergeColumn(Column lColumn, Column rColumn) {
158158
"Unable to merge column %s and %s with different comments.",
159159
lColumn, rColumn));
160160
}
161-
return lColumn.copy(mergeDataType(lColumn.getType(), rColumn.getType()));
161+
return lColumn.copy(inferWiderType(lColumn.getType(), rColumn.getType()));
162162
}
163163

164164
/** Try to combine given data types to a compatible wider data type. */
165165
@VisibleForTesting
166-
public static DataType mergeDataType(DataType lType, DataType rType) {
166+
public static DataType inferWiderType(DataType lType, DataType rType) {
167167
// Ignore nullability during data type merge
168168
boolean nullable = lType.isNullable() || rType.isNullable();
169169
lType = lType.notNull();

Diff for: flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java

+54-51
Original file line numberDiff line numberDiff line change
@@ -180,166 +180,169 @@ public void testGetNumericPrecision() {
180180
}
181181

182182
@Test
183-
public void testMergeDataType() {
184-
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.BINARY(17), DataTypes.BINARY(17)))
183+
public void testInferWiderType() {
184+
Assertions.assertThat(
185+
SchemaUtils.inferWiderType(DataTypes.BINARY(17), DataTypes.BINARY(17)))
185186
.isEqualTo(DataTypes.BINARY(17));
186187
Assertions.assertThat(
187-
SchemaUtils.mergeDataType(DataTypes.VARBINARY(17), DataTypes.VARBINARY(17)))
188+
SchemaUtils.inferWiderType(
189+
DataTypes.VARBINARY(17), DataTypes.VARBINARY(17)))
188190
.isEqualTo(DataTypes.VARBINARY(17));
189-
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.BYTES(), DataTypes.BYTES()))
191+
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.BYTES(), DataTypes.BYTES()))
190192
.isEqualTo(DataTypes.BYTES());
191-
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.BOOLEAN(), DataTypes.BOOLEAN()))
193+
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.BOOLEAN(), DataTypes.BOOLEAN()))
192194
.isEqualTo(DataTypes.BOOLEAN());
193-
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.INT()))
195+
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.INT()))
194196
.isEqualTo(DataTypes.INT());
195-
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.TINYINT(), DataTypes.TINYINT()))
197+
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.TINYINT(), DataTypes.TINYINT()))
196198
.isEqualTo(DataTypes.TINYINT());
197-
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.SMALLINT(), DataTypes.SMALLINT()))
199+
Assertions.assertThat(
200+
SchemaUtils.inferWiderType(DataTypes.SMALLINT(), DataTypes.SMALLINT()))
198201
.isEqualTo(DataTypes.SMALLINT());
199-
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.BIGINT(), DataTypes.BIGINT()))
202+
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.BIGINT(), DataTypes.BIGINT()))
200203
.isEqualTo(DataTypes.BIGINT());
201-
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.FLOAT(), DataTypes.FLOAT()))
204+
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.FLOAT(), DataTypes.FLOAT()))
202205
.isEqualTo(DataTypes.FLOAT());
203-
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.DOUBLE(), DataTypes.DOUBLE()))
206+
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.DOUBLE(), DataTypes.DOUBLE()))
204207
.isEqualTo(DataTypes.DOUBLE());
205-
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.CHAR(17), DataTypes.CHAR(17)))
208+
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.CHAR(17), DataTypes.CHAR(17)))
206209
.isEqualTo(DataTypes.CHAR(17));
207210
Assertions.assertThat(
208-
SchemaUtils.mergeDataType(DataTypes.VARCHAR(17), DataTypes.VARCHAR(17)))
211+
SchemaUtils.inferWiderType(DataTypes.VARCHAR(17), DataTypes.VARCHAR(17)))
209212
.isEqualTo(DataTypes.VARCHAR(17));
210-
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.STRING(), DataTypes.STRING()))
213+
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.STRING(), DataTypes.STRING()))
211214
.isEqualTo(DataTypes.STRING());
212215
Assertions.assertThat(
213-
SchemaUtils.mergeDataType(
216+
SchemaUtils.inferWiderType(
214217
DataTypes.DECIMAL(17, 7), DataTypes.DECIMAL(17, 7)))
215218
.isEqualTo(DataTypes.DECIMAL(17, 7));
216-
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.DATE(), DataTypes.DATE()))
219+
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.DATE(), DataTypes.DATE()))
217220
.isEqualTo(DataTypes.DATE());
218-
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.TIME(), DataTypes.TIME()))
221+
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.TIME(), DataTypes.TIME()))
219222
.isEqualTo(DataTypes.TIME());
220-
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.TIME(6), DataTypes.TIME(6)))
223+
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.TIME(6), DataTypes.TIME(6)))
221224
.isEqualTo(DataTypes.TIME(6));
222225
Assertions.assertThat(
223-
SchemaUtils.mergeDataType(DataTypes.TIMESTAMP(), DataTypes.TIMESTAMP()))
226+
SchemaUtils.inferWiderType(DataTypes.TIMESTAMP(), DataTypes.TIMESTAMP()))
224227
.isEqualTo(DataTypes.TIMESTAMP());
225228
Assertions.assertThat(
226-
SchemaUtils.mergeDataType(DataTypes.TIMESTAMP(3), DataTypes.TIMESTAMP(3)))
229+
SchemaUtils.inferWiderType(DataTypes.TIMESTAMP(3), DataTypes.TIMESTAMP(3)))
227230
.isEqualTo(DataTypes.TIMESTAMP(3));
228231
Assertions.assertThat(
229-
SchemaUtils.mergeDataType(
232+
SchemaUtils.inferWiderType(
230233
DataTypes.TIMESTAMP_TZ(), DataTypes.TIMESTAMP_TZ()))
231234
.isEqualTo(DataTypes.TIMESTAMP_TZ());
232235
Assertions.assertThat(
233-
SchemaUtils.mergeDataType(
236+
SchemaUtils.inferWiderType(
234237
DataTypes.TIMESTAMP_TZ(3), DataTypes.TIMESTAMP_TZ(3)))
235238
.isEqualTo(DataTypes.TIMESTAMP_TZ(3));
236239
Assertions.assertThat(
237-
SchemaUtils.mergeDataType(
240+
SchemaUtils.inferWiderType(
238241
DataTypes.TIMESTAMP_LTZ(), DataTypes.TIMESTAMP_LTZ()))
239242
.isEqualTo(DataTypes.TIMESTAMP_LTZ());
240243
Assertions.assertThat(
241-
SchemaUtils.mergeDataType(
244+
SchemaUtils.inferWiderType(
242245
DataTypes.TIMESTAMP_LTZ(3), DataTypes.TIMESTAMP_LTZ(3)))
243246
.isEqualTo(DataTypes.TIMESTAMP_LTZ(3));
244247
Assertions.assertThat(
245-
SchemaUtils.mergeDataType(
248+
SchemaUtils.inferWiderType(
246249
DataTypes.ARRAY(DataTypes.INT()), DataTypes.ARRAY(DataTypes.INT())))
247250
.isEqualTo(DataTypes.ARRAY(DataTypes.INT()));
248251
Assertions.assertThat(
249-
SchemaUtils.mergeDataType(
252+
SchemaUtils.inferWiderType(
250253
DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()),
251254
DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())))
252255
.isEqualTo(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()));
253256

254257
// Test compatible widening cast
255-
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.BIGINT()))
258+
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.BIGINT()))
256259
.isEqualTo(DataTypes.BIGINT());
257-
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.VARCHAR(17), DataTypes.STRING()))
260+
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.VARCHAR(17), DataTypes.STRING()))
258261
.isEqualTo(DataTypes.STRING());
259-
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.FLOAT(), DataTypes.DOUBLE()))
262+
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.FLOAT(), DataTypes.DOUBLE()))
260263
.isEqualTo(DataTypes.DOUBLE());
261-
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.DECIMAL(4, 0)))
264+
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.DECIMAL(4, 0)))
262265
.isEqualTo(DataTypes.DECIMAL(10, 0));
263-
Assertions.assertThat(SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.DECIMAL(10, 5)))
266+
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.DECIMAL(10, 5)))
264267
.isEqualTo(DataTypes.DECIMAL(15, 5));
265268
Assertions.assertThat(
266-
SchemaUtils.mergeDataType(DataTypes.BIGINT(), DataTypes.DECIMAL(10, 5)))
269+
SchemaUtils.inferWiderType(DataTypes.BIGINT(), DataTypes.DECIMAL(10, 5)))
267270
.isEqualTo(DataTypes.DECIMAL(24, 5));
268271
Assertions.assertThat(
269-
SchemaUtils.mergeDataType(
272+
SchemaUtils.inferWiderType(
270273
DataTypes.DECIMAL(5, 4), DataTypes.DECIMAL(10, 2)))
271274
.isEqualTo(DataTypes.DECIMAL(12, 4));
272275

273276
// Test merging with nullability
274277
Assertions.assertThat(
275-
SchemaUtils.mergeDataType(
278+
SchemaUtils.inferWiderType(
276279
DataTypes.INT().notNull(), DataTypes.INT().notNull()))
277280
.isEqualTo(DataTypes.INT().notNull());
278281
Assertions.assertThat(
279-
SchemaUtils.mergeDataType(
282+
SchemaUtils.inferWiderType(
280283
DataTypes.INT().nullable(), DataTypes.INT().notNull()))
281284
.isEqualTo(DataTypes.INT().nullable());
282285
Assertions.assertThat(
283-
SchemaUtils.mergeDataType(
286+
SchemaUtils.inferWiderType(
284287
DataTypes.INT().notNull(), DataTypes.INT().nullable()))
285288
.isEqualTo(DataTypes.INT().nullable());
286289
Assertions.assertThat(
287-
SchemaUtils.mergeDataType(
290+
SchemaUtils.inferWiderType(
288291
DataTypes.INT().nullable(), DataTypes.INT().nullable()))
289292
.isEqualTo(DataTypes.INT().nullable());
290293

291294
// incompatible type merges test
292295
Assertions.assertThatThrownBy(
293-
() -> SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.DOUBLE()))
296+
() -> SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.DOUBLE()))
294297
.isExactlyInstanceOf(IllegalStateException.class);
295298

296299
Assertions.assertThatThrownBy(
297300
() ->
298-
SchemaUtils.mergeDataType(
301+
SchemaUtils.inferWiderType(
299302
DataTypes.DECIMAL(17, 0), DataTypes.DOUBLE()))
300303
.isExactlyInstanceOf(IllegalStateException.class);
301304
Assertions.assertThatThrownBy(
302-
() -> SchemaUtils.mergeDataType(DataTypes.INT(), DataTypes.STRING()))
305+
() -> SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.STRING()))
303306
.isExactlyInstanceOf(IllegalStateException.class);
304307
}
305308

306309
@Test
307-
public void testMergeColumn() {
310+
public void testInferWiderColumn() {
308311
// Test normal merges
309312
Assertions.assertThat(
310-
SchemaUtils.mergeColumn(
313+
SchemaUtils.inferWiderColumn(
311314
Column.physicalColumn("Column1", DataTypes.INT()),
312315
Column.physicalColumn("Column1", DataTypes.BIGINT())))
313316
.isEqualTo(Column.physicalColumn("Column1", DataTypes.BIGINT()));
314317

315318
Assertions.assertThat(
316-
SchemaUtils.mergeColumn(
319+
SchemaUtils.inferWiderColumn(
317320
Column.physicalColumn("Column2", DataTypes.FLOAT()),
318321
Column.physicalColumn("Column2", DataTypes.DOUBLE())))
319322
.isEqualTo(Column.physicalColumn("Column2", DataTypes.DOUBLE()));
320323

321324
// Test merging columns with incompatible types
322325
Assertions.assertThatThrownBy(
323326
() ->
324-
SchemaUtils.mergeColumn(
327+
SchemaUtils.inferWiderColumn(
325328
Column.physicalColumn("Column3", DataTypes.INT()),
326329
Column.physicalColumn("Column3", DataTypes.STRING())))
327330
.isExactlyInstanceOf(IllegalStateException.class);
328331

329332
// Test merging with incompatible names
330333
Assertions.assertThatThrownBy(
331334
() ->
332-
SchemaUtils.mergeColumn(
335+
SchemaUtils.inferWiderColumn(
333336
Column.physicalColumn("Column4", DataTypes.INT()),
334337
Column.physicalColumn("AnotherColumn4", DataTypes.INT())))
335338
.isExactlyInstanceOf(IllegalStateException.class);
336339
}
337340

338341
@Test
339-
public void testMergeSchema() {
342+
public void testInferWiderSchema() {
340343
// Test normal merges
341344
Assertions.assertThat(
342-
SchemaUtils.mergeSchema(
345+
SchemaUtils.inferWiderSchema(
343346
Schema.newBuilder()
344347
.physicalColumn("Column1", DataTypes.INT())
345348
.physicalColumn("Column2", DataTypes.DOUBLE())
@@ -363,7 +366,7 @@ public void testMergeSchema() {
363366
// Test merging with incompatible types
364367
Assertions.assertThatThrownBy(
365368
() ->
366-
SchemaUtils.mergeSchema(
369+
SchemaUtils.inferWiderSchema(
367370
Schema.newBuilder()
368371
.physicalColumn("Column1", DataTypes.INT())
369372
.physicalColumn("Column2", DataTypes.DOUBLE())
@@ -381,7 +384,7 @@ public void testMergeSchema() {
381384
// Test merging with incompatible column names
382385
Assertions.assertThatThrownBy(
383386
() ->
384-
SchemaUtils.mergeSchema(
387+
SchemaUtils.inferWiderSchema(
385388
Schema.newBuilder()
386389
.physicalColumn("Column1", DataTypes.INT())
387390
.physicalColumn("Column2", DataTypes.DOUBLE())
@@ -399,7 +402,7 @@ public void testMergeSchema() {
399402
// Test merging with different column counts
400403
Assertions.assertThatThrownBy(
401404
() ->
402-
SchemaUtils.mergeSchema(
405+
SchemaUtils.inferWiderSchema(
403406
Schema.newBuilder()
404407
.physicalColumn("Column1", DataTypes.INT())
405408
.physicalColumn("Column2", DataTypes.DOUBLE())
@@ -418,7 +421,7 @@ public void testMergeSchema() {
418421
// Test merging with incompatible schema metadata
419422
Assertions.assertThatThrownBy(
420423
() ->
421-
SchemaUtils.mergeSchema(
424+
SchemaUtils.inferWiderSchema(
422425
Schema.newBuilder()
423426
.physicalColumn("Column1", DataTypes.INT())
424427
.physicalColumn("Column2", DataTypes.DOUBLE())

Diff for: flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
359359
new TransformDef(
360360
"default_namespace.default_schema.table1",
361361
"*,concat(col1,'1') as col12",
362-
"col1 = '1'",
362+
"col1 = '1' OR col1 = '999'",
363363
"col1",
364364
"col12",
365365
"key1=value1",

0 commit comments

Comments
 (0)