diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java index 61c55ef8ef6..3afa7128d13 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java @@ -188,6 +188,10 @@ protected DataType inferBytes(Object value, Schema schema) { protected DataType inferStruct(Object value, Schema schema) { Struct struct = (Struct) value; if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) { + if (struct == null) { + // set the default value + return DataTypes.DECIMAL(DecimalType.DEFAULT_PRECISION, DecimalType.DEFAULT_SCALE); + } SpecialValueDecimal decimal = VariableScaleDecimal.toLogical(struct); BigDecimal bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO); return DataTypes.DECIMAL(bigDecimal.precision(), bigDecimal.scale()); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleConnectorITCase.java index 08c0c9bac41..6eaa38c97ed 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/table/OracleConnectorITCase.java @@ -638,6 +638,9 @@ void testConsumingNumericColumns(boolean parallelismSnapshot) throws Exception { statement.execute( "INSERT INTO debezium.test_numeric_table " + "VALUES (11000000001, 1, 99, 9999, 987654321, 20000000000000000001, 987654321.87654321, 2147483648, 1024.965, 1024.965)"); + statement.execute( + "INSERT INTO debezium.test_numeric_table " + + "VALUES (11000000002, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)"); } String sourceDDL = @@ -709,7 +712,8 @@ void testConsumingNumericColumns(boolean parallelismSnapshot) throws Exception { List expected = Arrays.asList( "+I[11000000000, false, 98, 9998, 987654320, 20000000000000000000, 987654321.12345678, 2147483647, 1024.955, 1024.955]", - "+I[11000000001, true, 99, 9999, 987654321, 20000000000000000001, 987654321.87654321, 2147483648, 1024.965, 1024.965]"); + "+I[11000000001, true, 99, 9999, 987654321, 20000000000000000001, 987654321.87654321, 2147483648, 1024.965, 1024.965]", + "+I[11000000002, null, null, null, null, null, null, null, null, null]"); List actual = TestValuesTableFactory.getRawResultsAsStrings("test_numeric_sink"); Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);