Skip to content

Commit fa05175

Browse files
committed
add round 0 test
1 parent 8c73d29 commit fa05175

File tree

2 files changed

+16
-11
lines changed

2 files changed

+16
-11
lines changed

Diff for: flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java

+16-9
Original file line numberDiff line numberDiff line change
@@ -2185,7 +2185,7 @@ void testTransformWithLargeLiterals() throws Exception {
21852185
}
21862186

21872187
@Test
2188-
void testFloorCeilAndRoundFunctionWithLiterals() throws Exception {
2188+
void testFloorCeilAndRoundFunction() throws Exception {
21892189
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
21902190

21912191
// Setup value source
@@ -2248,7 +2248,14 @@ void testFloorCeilAndRoundFunctionWithLiterals() throws Exception {
22482248
+ "ROUND(bigint_col, 2) AS round_bigint,"
22492249
+ "ROUND(float_col, 2) AS round_float,"
22502250
+ "ROUND(double_col, 2) AS round_double,"
2251-
+ "ROUND(decimal_col, 2) AS round_decimal",
2251+
+ "ROUND(decimal_col, 2) AS round_decimal,"
2252+
+ "ROUND(tinyint_col, 0) AS round_0_tinyint,"
2253+
+ "ROUND(smallint_col, 0) AS round_0_smallint,"
2254+
+ "ROUND(int_col, 0) AS round_0_int,"
2255+
+ "ROUND(bigint_col, 0) AS round_0_bigint,"
2256+
+ "ROUND(float_col, 0) AS round_0_float,"
2257+
+ "ROUND(double_col, 0) AS round_0_double,"
2258+
+ "ROUND(decimal_col, 0) AS round_0_decimal",
22522259
null,
22532260
"id",
22542261
null,
@@ -2267,16 +2274,16 @@ void testFloorCeilAndRoundFunctionWithLiterals() throws Exception {
22672274

22682275
assertThat(outputEvents)
22692276
.containsExactly(
2270-
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`tinyint_col` TINYINT,`smallint_col` SMALLINT,`int_col` INT,`bigint_col` BIGINT,`float_col` FLOAT,`double_col` DOUBLE,`decimal_col` DECIMAL(10, 3),`ceil_tinyint` TINYINT,`ceil_smallint` SMALLINT,`ceil_int` INT,`ceil_bigint` BIGINT,`ceil_float` FLOAT,`ceil_double` DOUBLE,`ceil_decimal` DECIMAL(10, 0),`ceiling_tinyint` TINYINT,`ceiling_smallint` SMALLINT,`ceiling_int` INT,`ceiling_bigint` BIGINT,`ceiling_float` FLOAT,`ceiling_double` DOUBLE,`ceiling_decimal` DECIMAL(10, 0),`floor_tinyint` TINYINT,`floor_smallint` SMALLINT,`floor_int` INT,`floor_bigint` BIGINT,`floor_float` FLOAT,`floor_double` DOUBLE,`floor_decimal` DECIMAL(10, 0),`round_tinyint` TINYINT,`round_smallint` SMALLINT,`round_int` INT,`round_bigint` BIGINT,`round_float` FLOAT,`round_double` DOUBLE,`round_decimal` DECIMAL(10, 2)}, primaryKeys=id, options=()}",
2271-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, 1, 1, 1, 1, 1.1, 1.1, 1.100, 1, 1, 1, 1, 2.0, 2.0, 2, 1, 1, 1, 1, 2.0, 2.0, 2, 1, 1, 1, 1, 1.0, 1.0, 1, 1, 1, 1, 1, 1.1, 1.1, 1.10], op=INSERT, meta=()}",
2272-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4, 4, 4, 4, 4, 4.44, 4.44, 4.440, 4, 4, 4, 4, 5.0, 5.0, 5, 4, 4, 4, 4, 5.0, 5.0, 5, 4, 4, 4, 4, 4.0, 4.0, 4, 4, 4, 4, 4, 4.44, 4.44, 4.44], op=INSERT, meta=()}",
2273-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5, 5, 5, 5, 5, 5.555, 5.555, 5.555, 5, 5, 5, 5, 6.0, 6.0, 6, 5, 5, 5, 5, 6.0, 6.0, 6, 5, 5, 5, 5, 5.0, 5.0, 5, 5, 5, 5, 5, 5.56, 5.56, 5.56], op=INSERT, meta=()}",
2274-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[9, 9, 9, 9, 9, 1.0E7, 9999999.999, 9999999.999, 9, 9, 9, 9, 1.0E7, 1.0E7, 10000000, 9, 9, 9, 9, 1.0E7, 1.0E7, 10000000, 9, 9, 9, 9, 1.0E7, 9999999.0, 9999999, 9, 9, 9, 9, 1.0E7, 1.0E7, 10000000.00], op=INSERT, meta=()}",
2275-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[0, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}");
2277+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`tinyint_col` TINYINT,`smallint_col` SMALLINT,`int_col` INT,`bigint_col` BIGINT,`float_col` FLOAT,`double_col` DOUBLE,`decimal_col` DECIMAL(10, 3),`ceil_tinyint` TINYINT,`ceil_smallint` SMALLINT,`ceil_int` INT,`ceil_bigint` BIGINT,`ceil_float` FLOAT,`ceil_double` DOUBLE,`ceil_decimal` DECIMAL(10, 0),`ceiling_tinyint` TINYINT,`ceiling_smallint` SMALLINT,`ceiling_int` INT,`ceiling_bigint` BIGINT,`ceiling_float` FLOAT,`ceiling_double` DOUBLE,`ceiling_decimal` DECIMAL(10, 0),`floor_tinyint` TINYINT,`floor_smallint` SMALLINT,`floor_int` INT,`floor_bigint` BIGINT,`floor_float` FLOAT,`floor_double` DOUBLE,`floor_decimal` DECIMAL(10, 0),`round_tinyint` TINYINT,`round_smallint` SMALLINT,`round_int` INT,`round_bigint` BIGINT,`round_float` FLOAT,`round_double` DOUBLE,`round_decimal` DECIMAL(10, 2),`round_0_tinyint` TINYINT,`round_0_smallint` SMALLINT,`round_0_int` INT,`round_0_bigint` BIGINT,`round_0_float` FLOAT,`round_0_double` DOUBLE,`round_0_decimal` DECIMAL(8, 0)}, primaryKeys=id, options=()}",
2278+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, 1, 1, 1, 1, 1.1, 1.1, 1.100, 1, 1, 1, 1, 2.0, 2.0, 2, 1, 1, 1, 1, 2.0, 2.0, 2, 1, 1, 1, 1, 1.0, 1.0, 1, 1, 1, 1, 1, 1.1, 1.1, 1.10, 1, 1, 1, 1, 1.0, 1.0, 1], op=INSERT, meta=()}",
2279+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4, 4, 4, 4, 4, 4.44, 4.44, 4.440, 4, 4, 4, 4, 5.0, 5.0, 5, 4, 4, 4, 4, 5.0, 5.0, 5, 4, 4, 4, 4, 4.0, 4.0, 4, 4, 4, 4, 4, 4.44, 4.44, 4.44, 4, 4, 4, 4, 4.0, 4.0, 4], op=INSERT, meta=()}",
2280+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5, 5, 5, 5, 5, 5.555, 5.555, 5.555, 5, 5, 5, 5, 6.0, 6.0, 6, 5, 5, 5, 5, 6.0, 6.0, 6, 5, 5, 5, 5, 5.0, 5.0, 5, 5, 5, 5, 5, 5.56, 5.56, 5.56, 5, 5, 5, 5, 6.0, 6.0, 6], op=INSERT, meta=()}",
2281+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[9, 9, 9, 9, 9, 1.0E7, 9999999.999, 9999999.999, 9, 9, 9, 9, 1.0E7, 1.0E7, 10000000, 9, 9, 9, 9, 1.0E7, 1.0E7, 10000000, 9, 9, 9, 9, 1.0E7, 9999999.0, 9999999, 9, 9, 9, 9, 1.0E7, 1.0E7, 10000000.00, 9, 9, 9, 9, 1.0E7, 1.0E7, 10000000], op=INSERT, meta=()}",
2282+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[0, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}");
22762283
}
22772284

22782285
@Test
2279-
void testAbsFunctionWithLiterals() throws Exception {
2286+
void testAbsFunction() throws Exception {
22802287
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
22812288

22822289
// Setup value source

Diff for: flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java

-2
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,6 @@ public void writeBinary(int pos, byte[] bytes) {
145145

146146
@Override
147147
public void writeDecimal(int pos, DecimalData value, int precision) {
148-
assert value == null || (value.precision() <= precision);
149-
150148
if (DecimalData.isCompact(precision)) {
151149
assert value != null;
152150
writeLong(pos, value.toUnscaledLong());

0 commit comments

Comments
 (0)