From a481a98e0151a8420c2bd6ce78f8b64068795776 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Mon, 6 Jan 2025 11:27:03 +0800 Subject: [PATCH] add: tests with json deserializer Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com> --- .../source/MySqlAncientDateAndTimeITCase.java | 361 +++++++++++++++--- .../table/MySqlAncientDateAndTimeITCase.java | 16 +- .../resources/ddl/ancient_date_and_time.sql | 32 +- 3 files changed, 335 insertions(+), 74 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java index 13e3dd29a2b..ca1a91b4ed0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java @@ -22,6 +22,7 @@ import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; @@ -35,6 +36,9 @@ import org.apache.flink.types.RowUtils; import org.apache.flink.util.CloseableIterator; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import org.assertj.core.api.Assertions; import org.junit.After; import org.junit.AfterClass; @@ -105,7 +109,7 @@ public void after() { * shifted to 1971 ~ 2069. */ @Test - public void testAncientDateAndTimeWithTimeAdjuster() throws Exception { + public void testAncientDateAndTimeWithTimeAdjusterWithRowDataDeserializer() throws Exception { // LocalDate.ofEpochDay reference: // +---------------------------------------------------------------------------------+ // | 17390 | 11323 | 11720 | 23072 | -557266 | -1 | 18261 | @@ -115,6 +119,7 @@ public void testAncientDateAndTimeWithTimeAdjuster() throws Exception { MYSQL_CONTAINER, ancientDatabase, true, + DeserializerType.ROW_DATA, Arrays.asList( "+I[1, 17390, 2016-07-13T17:17:17, 2015-06-14T17:17:17.100, 2014-05-15T17:17:17.120, 2013-04-16T17:17:17.123, 2012-03-17T17:17:17.123400, 2011-02-18T17:17:17.123450, 2010-01-19T17:17:17.123456]", "+I[2, null, null, null, null, null, null, null, null]", @@ -127,7 +132,8 @@ public void testAncientDateAndTimeWithTimeAdjuster() throws Exception { } @Test - public void testAncientDateAndTimeWithoutTimeAdjuster() throws Exception { + public void testAncientDateAndTimeWithoutTimeAdjusterWithRowDataDeserializer() + throws Exception { // LocalDate.ofEpochDay reference: // +---------------------------------------------------------------------------------+ // | -713095 | -719162 | -718765 | -707413 | -557266 | -1 | 18261 | @@ -137,6 +143,7 @@ public void testAncientDateAndTimeWithoutTimeAdjuster() throws Exception { MYSQL_CONTAINER, ancientDatabase, false, + DeserializerType.ROW_DATA, Arrays.asList( "+I[1, -713095, 0016-07-13T17:17:17, 0015-06-14T17:17:17.100, 0014-05-15T17:17:17.120, 0013-04-16T17:17:17.123, 0012-03-17T17:17:17.123400, 0011-02-18T17:17:17.123450, 0010-01-19T17:17:17.123456]", "+I[2, null, null, null, null, null, null, null, null]", @@ -148,69 +155,303 @@ public void testAncientDateAndTimeWithoutTimeAdjuster() throws Exception { "+I[8, 18261, 2019-12-31T23:11:11, 2019-12-31T23:11:11.100, 2019-12-31T23:11:11.120, 2019-12-31T23:11:11.123, 2019-12-31T23:11:11.123400, 2019-12-31T23:11:11.123450, 2019-12-31T23:11:11.123456]")); } + @Test + public void testAncientDateAndTimeWithTimeAdjusterWithJsonDeserializer() throws Exception { + // LocalDate.ofEpochDay reference: + // + // +---------------------------------------------------------------------------------+ + // | 17390 | 11323 | 11720 | 23072 | -557266 | -1 | 18261 | + // | 2017/8/12 | 2001/1/1 | 2002/2/2 | 2033/3/3 | 0444/4/4 | 1969/12/31 | 2019/12/31 | + // +---------------------------------------------------------------------------------+ + // + // LocalDateTime.ofEpochSecond reference: + // + // Row 1: + // 1468430237000 -> 2016-07-13T17:17:17 + // 1434302237100 -> 2015-06-14T17:17:17.100 + // 1400174237120 -> 2014-05-15T17:17:17.120 + // 1366132637123 -> 2013-04-16T17:17:17.123 + // 1332004637123400 -> 2012-03-17T17:17:17.123400 + // 1298049437123450 -> 2011-02-18T17:17:17.123450 + // 1263921437123456 -> 2010-01-19T17:17:17.123456 + // + // Row 2: + // (null) + // + // Row 3: + // 978365776000 -> 2001-01-01T16:16:16 + // 978365776100 -> 2001-01-01T16:16:16.100 + // 978365776120 -> 2001-01-01T16:16:16.120 + // 978365776123 -> 2001-01-01T16:16:16.123 + // 978365776123400 -> 2001-01-01T16:16:16.123400 + // 978365776123450 -> 2001-01-01T16:16:16.123450 + // 978365776123456 -> 2001-01-01T16:16:16.123456 + // + // Row 4: + // 1012662915000 -> 2002-02-02T15:15:15 + // 1012662915100 -> 2002-02-02T15:15:15.100 + // 1012662915120 -> 2002-02-02T15:15:15.120 + // 1012662915123 -> 2002-02-02T15:15:15.123 + // 1012662915123400 -> 2002-02-02T15:15:15.123400 + // 1012662915123450 -> 2002-02-02T15:15:15.123450 + // 1012662915123456 -> 2002-02-02T15:15:15.123456 + // + // Row 5: + // 1993472054000 -> 2033-03-03T14:14:14 + // 1993472054100 -> 2033-03-03T14:14:14.100 + // 1993472054120 -> 2033-03-03T14:14:14.120 + // 1993472054123 -> 2033-03-03T14:14:14.123 + // 1993472054123400 -> 2033-03-03T14:14:14.123400 + // 1993472054123450 -> 2033-03-03T14:14:14.123450 + // 1993472054123456 -> 2033-03-03T14:14:14.123456 + // + // Row 6: + // -48147734807000 -> 0444-04-04T13:13:13 + // -48147734806900 -> 0444-04-04T13:13:13.100 + // -48147734806880 -> 0444-04-04T13:13:13.120 + // -48147734806877 -> 0444-04-04T13:13:13.123 + // -48147734806876600 -> 0444-04-04T13:13:13.000123400 + // -48147734806876550 -> 0444-04-04T13:13:13.000123450 + // -48147734806876544 -> 0444-04-04T13:13:13.000123456 + // + // Row 7: + // -42468000 -> 1969-12-31T12:12:12 + // -42467900 -> 1969-12-31T12:12:12.100 + // -42467880 -> 1969-12-31T12:12:12.120 + // -42467877 -> 1969-12-31T12:12:12.123 + // -42467876600 -> 1969-12-31T12:12:12.123400 + // -42467876550 -> 1969-12-31T12:12:12.123450 + // -42467876544 -> 1969-12-31T12:12:12.123456 + // + // Row 8: + // 1577833871000 -> 2019-12-31T23:11:11 + // 1577833871100 -> 2019-12-31T23:11:11.100 + // 1577833871120 -> 2019-12-31T23:11:11.120 + // 1577833871123 -> 2019-12-31T23:11:11.123 + // 1577833871123400 -> 2019-12-31T23:11:11.123400 + // 1577833871123450 -> 2019-12-31T23:11:11.123450 + // 1577833871123456 -> 2019-12-31T23:11:11.123456 + + runGenericAncientDateAndTimeTest( + MYSQL_CONTAINER, + ancientDatabase, + true, + DeserializerType.JSON, + Arrays.asList( + "{\"id\":\"AQ==\",\"date_col\":17390,\"datetime_0_col\":1468430237000,\"datetime_1_col\":1434302237100,\"datetime_2_col\":1400174237120,\"datetime_3_col\":1366132637123,\"datetime_4_col\":1332004637123400,\"datetime_5_col\":1298049437123450,\"datetime_6_col\":1263921437123456}", + "{\"id\":\"Ag==\",\"date_col\":null,\"datetime_0_col\":null,\"datetime_1_col\":null,\"datetime_2_col\":null,\"datetime_3_col\":null,\"datetime_4_col\":null,\"datetime_5_col\":null,\"datetime_6_col\":null}", + "{\"id\":\"Aw==\",\"date_col\":11323,\"datetime_0_col\":978365776000,\"datetime_1_col\":978365776100,\"datetime_2_col\":978365776120,\"datetime_3_col\":978365776123,\"datetime_4_col\":978365776123400,\"datetime_5_col\":978365776123450,\"datetime_6_col\":978365776123456}", + "{\"id\":\"BA==\",\"date_col\":11720,\"datetime_0_col\":1012662915000,\"datetime_1_col\":1012662915100,\"datetime_2_col\":1012662915120,\"datetime_3_col\":1012662915123,\"datetime_4_col\":1012662915123400,\"datetime_5_col\":1012662915123450,\"datetime_6_col\":1012662915123456}", + "{\"id\":\"BQ==\",\"date_col\":23072,\"datetime_0_col\":1993472054000,\"datetime_1_col\":1993472054100,\"datetime_2_col\":1993472054120,\"datetime_3_col\":1993472054123,\"datetime_4_col\":1993472054123400,\"datetime_5_col\":1993472054123450,\"datetime_6_col\":1993472054123456}", + "{\"id\":\"Bg==\",\"date_col\":-557266,\"datetime_0_col\":-48147734807000,\"datetime_1_col\":-48147734806900,\"datetime_2_col\":-48147734806880,\"datetime_3_col\":-48147734806877,\"datetime_4_col\":-48147734806876600,\"datetime_5_col\":-48147734806876550,\"datetime_6_col\":-48147734806876544}", + "{\"id\":\"Bw==\",\"date_col\":-1,\"datetime_0_col\":-42468000,\"datetime_1_col\":-42467900,\"datetime_2_col\":-42467880,\"datetime_3_col\":-42467877,\"datetime_4_col\":-42467876600,\"datetime_5_col\":-42467876550,\"datetime_6_col\":-42467876544}", + "{\"id\":\"CA==\",\"date_col\":18261,\"datetime_0_col\":1577833871000,\"datetime_1_col\":1577833871100,\"datetime_2_col\":1577833871120,\"datetime_3_col\":1577833871123,\"datetime_4_col\":1577833871123400,\"datetime_5_col\":1577833871123450,\"datetime_6_col\":1577833871123456}")); + } + + @Test + public void testAncientDateAndTimeWithoutTimeAdjusterWithJsonDeserializer() throws Exception { + // LocalDate.ofEpochDay reference: + // + // +---------------------------------------------------------------------------------+ + // | -713095 | -719162 | -718765 | -707413 | -557266 | -1 | 18261 | + // | 0017/8/12 | 0001/1/1 | 0002/2/2 | 0033/3/3 | 0444/4/4 | 1969/12/31 | 2019/12/31 | + // +---------------------------------------------------------------------------------+ + // + // LocalDateTime.ofEpochSecond reference: + // + // Row 1: + // -61645473763000 -> 0016-07-13T17:17:17 + // -61679601762900 -> 0015-06-14T17:17:17.100 + // -61713729762880 -> 0014-05-15T17:17:17.120 + // -61747771362877 -> 0013-04-16T17:17:17.123 + // -61781899362876600 -> 0012-03-17T17:17:17.123400 + // -61815854562876550 -> 0011-02-18T17:17:17.123450 + // -61849982562876544 -> 0010-01-19T17:17:17.123456 + // + // Row 2: + // (null) + // + // Row 3: + // -62135538224000 -> 0001-01-01T16:16:16 + // -62135538223900 -> 0001-01-01T16:16:16.100 + // -62135538223880 -> 0001-01-01T16:16:16.120 + // -62135538223877 -> 0001-01-01T16:16:16.123 + // -62135538223876600 -> 0001-01-01T16:16:16.123400 + // -62135538223876550 -> 0001-01-01T16:16:16.123450 + // -62135538223876544 -> 0001-01-01T16:16:16.123456 + // + // Row 4: + // -62101241085000 -> 0002-02-02T15:15:15 + // -62101241084900 -> 0002-02-02T15:15:15.100 + // -62101241084880 -> 0002-02-02T15:15:15.120 + // -62101241084877 -> 0002-02-02T15:15:15.123 + // -62101241084876600 -> 0002-02-02T15:15:15.123400 + // -62101241084876550 -> 0002-02-02T15:15:15.123450 + // -62101241084876544 -> 0002-02-02T15:15:15.123456 + // + // Row 5: + // -61120431946000 -> 0033-03-03T14:14:14 + // -61120431945900 -> 0033-03-03T14:14:14.100 + // -61120431945880 -> 0033-03-03T14:14:14.120 + // -61120431945877 -> 0033-03-03T14:14:14.123 + // -61120431945876600 -> 0033-03-03T14:14:14.123400 + // -61120431945876550 -> 0033-03-03T14:14:14.123450 + // -61120431945876544 -> 0033-03-03T14:14:14.123456 + // + // + // Row 6: + // -48147734807000 -> 0444-04-04T13:13:13 + // -48147734806900 -> 0444-04-04T13:13:13.100 + // -48147734806880 -> 0444-04-04T13:13:13.120 + // -48147734806877 -> 0444-04-04T13:13:13.123 + // -48147734806876600 -> 0444-04-04T13:13:13.000123400 + // -48147734806876550 -> 0444-04-04T13:13:13.000123450 + // -48147734806876544 -> 0444-04-04T13:13:13.000123456 + // + // Row 7: + // -42468000 -> 1969-12-31T12:12:12 + // -42467900 -> 1969-12-31T12:12:12.100 + // -42467880 -> 1969-12-31T12:12:12.120 + // -42467877 -> 1969-12-31T12:12:12.123 + // -42467876600 -> 1969-12-31T12:12:12.123400 + // -42467876550 -> 1969-12-31T12:12:12.123450 + // -42467876544 -> 1969-12-31T12:12:12.123456 + // + // Row 8: + // 1577833871000 -> 2019-12-31T23:11:11 + // 1577833871100 -> 2019-12-31T23:11:11.100 + // 1577833871120 -> 2019-12-31T23:11:11.120 + // 1577833871123 -> 2019-12-31T23:11:11.123 + // 1577833871123400 -> 2019-12-31T23:11:11.123400 + // 1577833871123450 -> 2019-12-31T23:11:11.123450 + // 1577833871123456 -> 2019-12-31T23:11:11.123456 + + runGenericAncientDateAndTimeTest( + MYSQL_CONTAINER, + ancientDatabase, + false, + DeserializerType.JSON, + Arrays.asList( + "{\"id\":\"AQ==\",\"date_col\":-713095,\"datetime_0_col\":-61645473763000,\"datetime_1_col\":-61679601762900,\"datetime_2_col\":-61713729762880,\"datetime_3_col\":-61747771362877,\"datetime_4_col\":-61781899362876600,\"datetime_5_col\":-61815854562876550,\"datetime_6_col\":-61849982562876544}", + "{\"id\":\"Ag==\",\"date_col\":null,\"datetime_0_col\":null,\"datetime_1_col\":null,\"datetime_2_col\":null,\"datetime_3_col\":null,\"datetime_4_col\":null,\"datetime_5_col\":null,\"datetime_6_col\":null}", + "{\"id\":\"Aw==\",\"date_col\":-719162,\"datetime_0_col\":-62135538224000,\"datetime_1_col\":-62135538223900,\"datetime_2_col\":-62135538223880,\"datetime_3_col\":-62135538223877,\"datetime_4_col\":-62135538223876600,\"datetime_5_col\":-62135538223876550,\"datetime_6_col\":-62135538223876544}", + "{\"id\":\"BA==\",\"date_col\":-718765,\"datetime_0_col\":-62101241085000,\"datetime_1_col\":-62101241084900,\"datetime_2_col\":-62101241084880,\"datetime_3_col\":-62101241084877,\"datetime_4_col\":-62101241084876600,\"datetime_5_col\":-62101241084876550,\"datetime_6_col\":-62101241084876544}", + "{\"id\":\"BQ==\",\"date_col\":-707413,\"datetime_0_col\":-61120431946000,\"datetime_1_col\":-61120431945900,\"datetime_2_col\":-61120431945880,\"datetime_3_col\":-61120431945877,\"datetime_4_col\":-61120431945876600,\"datetime_5_col\":-61120431945876550,\"datetime_6_col\":-61120431945876544}", + "{\"id\":\"Bg==\",\"date_col\":-557266,\"datetime_0_col\":-48147734807000,\"datetime_1_col\":-48147734806900,\"datetime_2_col\":-48147734806880,\"datetime_3_col\":-48147734806877,\"datetime_4_col\":-48147734806876600,\"datetime_5_col\":-48147734806876550,\"datetime_6_col\":-48147734806876544}", + "{\"id\":\"Bw==\",\"date_col\":-1,\"datetime_0_col\":-42468000,\"datetime_1_col\":-42467900,\"datetime_2_col\":-42467880,\"datetime_3_col\":-42467877,\"datetime_4_col\":-42467876600,\"datetime_5_col\":-42467876550,\"datetime_6_col\":-42467876544}", + "{\"id\":\"CA==\",\"date_col\":18261,\"datetime_0_col\":1577833871000,\"datetime_1_col\":1577833871100,\"datetime_2_col\":1577833871120,\"datetime_3_col\":1577833871123,\"datetime_4_col\":1577833871123400,\"datetime_5_col\":1577833871123450,\"datetime_6_col\":1577833871123456}")); + } + private void runGenericAncientDateAndTimeTest( MySqlContainer container, UniqueDatabase database, boolean enableTimeAdjuster, + DeserializerType deserializerType, List expectedResults) throws Exception { - // Build deserializer - DataType dataType = - DataTypes.ROW( - DataTypes.FIELD("id", DataTypes.INT()), - DataTypes.FIELD("date_col", DataTypes.DATE()), - DataTypes.FIELD("datetime_0_col", DataTypes.TIMESTAMP(0)), - DataTypes.FIELD("datetime_1_col", DataTypes.TIMESTAMP(1)), - DataTypes.FIELD("datetime_2_col", DataTypes.TIMESTAMP(2)), - DataTypes.FIELD("datetime_3_col", DataTypes.TIMESTAMP(3)), - DataTypes.FIELD("datetime_4_col", DataTypes.TIMESTAMP(4)), - DataTypes.FIELD("datetime_5_col", DataTypes.TIMESTAMP(5)), - DataTypes.FIELD("datetime_6_col", DataTypes.TIMESTAMP(6))); - LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType); - InternalTypeInfo typeInfo = InternalTypeInfo.of(logicalType); - RowDataDebeziumDeserializeSchema deserializer = - RowDataDebeziumDeserializeSchema.newBuilder() - .setPhysicalRowType((RowType) dataType.getLogicalType()) - .setResultTypeInfo(typeInfo) - .build(); - Properties dbzProperties = new Properties(); - dbzProperties.put("enable.time.adjuster", String.valueOf(enableTimeAdjuster)); - // Build source - MySqlSource mySqlSource = - MySqlSource.builder() - .hostname(container.getHost()) - .port(container.getDatabasePort()) - .databaseList(database.getDatabaseName()) - .serverTimeZone("UTC") - .tableList(database.getDatabaseName() + ".ancient_times") - .username(database.getUsername()) - .password(database.getPassword()) - .serverId(getServerId()) - .deserializer(deserializer) - .startupOptions(StartupOptions.initial()) - .debeziumProperties(dbzProperties) - .build(); + switch (deserializerType) { + case ROW_DATA: + { + // Build deserializer + DataType dataType = + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD("date_col", DataTypes.DATE()), + DataTypes.FIELD("datetime_0_col", DataTypes.TIMESTAMP(0)), + DataTypes.FIELD("datetime_1_col", DataTypes.TIMESTAMP(1)), + DataTypes.FIELD("datetime_2_col", DataTypes.TIMESTAMP(2)), + DataTypes.FIELD("datetime_3_col", DataTypes.TIMESTAMP(3)), + DataTypes.FIELD("datetime_4_col", DataTypes.TIMESTAMP(4)), + DataTypes.FIELD("datetime_5_col", DataTypes.TIMESTAMP(5)), + DataTypes.FIELD("datetime_6_col", DataTypes.TIMESTAMP(6))); + LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType); + InternalTypeInfo typeInfo = InternalTypeInfo.of(logicalType); + RowDataDebeziumDeserializeSchema deserializer = + RowDataDebeziumDeserializeSchema.newBuilder() + .setPhysicalRowType((RowType) dataType.getLogicalType()) + .setResultTypeInfo(typeInfo) + .build(); + + Properties dbzProperties = new Properties(); + dbzProperties.put("enable.time.adjuster", String.valueOf(enableTimeAdjuster)); + // Build source + MySqlSource mySqlSource = + MySqlSource.builder() + .hostname(container.getHost()) + .port(container.getDatabasePort()) + .databaseList(database.getDatabaseName()) + .serverTimeZone("UTC") + .tableList(database.getDatabaseName() + ".ancient_times") + .username(database.getUsername()) + .password(database.getPassword()) + .serverId(getServerId()) + .deserializer(deserializer) + .startupOptions(StartupOptions.initial()) + .debeziumProperties(dbzProperties) + .build(); - try (CloseableIterator iterator = - env.fromSource( - mySqlSource, - WatermarkStrategy.noWatermarks(), - "Backfill Skipped Source") - .executeAndCollect()) { - Assertions.assertThat(fetchRows(iterator, expectedResults.size())) - .containsExactlyInAnyOrderElementsOf(expectedResults); + try (CloseableIterator iterator = + env.fromSource( + mySqlSource, + WatermarkStrategy.noWatermarks(), + "Fetch results") + .executeAndCollect()) { + List results = fetchRows(iterator, expectedResults.size()); + Assertions.assertThat(convertRowDataToRowString(results)) + .containsExactlyInAnyOrderElementsOf(expectedResults); + } + } + break; + case JSON: + { + JsonDebeziumDeserializationSchema deserializer = + new JsonDebeziumDeserializationSchema(); + + Properties dbzProperties = new Properties(); + dbzProperties.put("enable.time.adjuster", String.valueOf(enableTimeAdjuster)); + // Build source + MySqlSource mySqlSource = + MySqlSource.builder() + .hostname(container.getHost()) + .port(container.getDatabasePort()) + .databaseList(database.getDatabaseName()) + .serverTimeZone("UTC") + .tableList(database.getDatabaseName() + ".ancient_times") + .username(database.getUsername()) + .password(database.getPassword()) + .serverId(getServerId()) + .deserializer(deserializer) + .startupOptions(StartupOptions.initial()) + .debeziumProperties(dbzProperties) + .build(); + + try (CloseableIterator iterator = + env.fromSource( + mySqlSource, + WatermarkStrategy.noWatermarks(), + "Fetch results") + .executeAndCollect()) { + List results = fetchRows(iterator, expectedResults.size()); + Assertions.assertThat(convertJsonToRowString(results)) + .containsExactlyInAnyOrderElementsOf(expectedResults); + } + } + break; + default: + throw new IllegalArgumentException( + "Unknown deserializer type: " + deserializerType); } } - private static List fetchRows(Iterator iter, int size) { - List rows = new ArrayList<>(size); + private static List fetchRows(Iterator iter, int size) { + List rows = new ArrayList<>(size); while (size > 0 && iter.hasNext()) { - RowData row = iter.next(); + T row = iter.next(); rows.add(row); size--; } - return convertRowDataToRowString(rows); + return rows; } private static List convertRowDataToRowString(List rows) { @@ -245,6 +486,21 @@ private static List convertRowDataToRowString(List rows) { .collect(Collectors.toList()); } + private static List convertJsonToRowString(List rows) { + ObjectMapper mapper = new ObjectMapper(); + return rows.stream() + .map( + row -> { + try { + JsonNode node = mapper.readTree(row); + return node.get("after").toString(); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + } + private static Object wrap(RowData row, int index, BiFunction getter) { if (row.isNullAt(index)) { return null; @@ -257,4 +513,9 @@ private String getServerId() { int serverId = random.nextInt(100) + 5400; return serverId + "-" + (serverId + env.getParallelism()); } + + enum DeserializerType { + JSON, + ROW_DATA + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlAncientDateAndTimeITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlAncientDateAndTimeITCase.java index b8e2510a495..b9ceaa78341 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlAncientDateAndTimeITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlAncientDateAndTimeITCase.java @@ -246,14 +246,14 @@ private static void createBinlogEvents(UniqueDatabase database) throws SQLExcept statement.execute( "INSERT INTO ancient_times VALUES (\n" + " DEFAULT,\n" - + " DEFAULT,\n" - + " DEFAULT,\n" - + " DEFAULT,\n" - + " DEFAULT,\n" - + " DEFAULT,\n" - + " DEFAULT,\n" - + " DEFAULT,\n" - + " DEFAULT\n" + + " '0017-08-12',\n" + + " '0016-07-13 17:17:17',\n" + + " '0015-06-14 17:17:17.1',\n" + + " '0014-05-15 17:17:17.12',\n" + + " '0013-04-16 17:17:17.123',\n" + + " '0012-03-17 17:17:17.1234',\n" + + " '0011-02-18 17:17:17.12345',\n" + + " '0010-01-19 17:17:17.123456'\n" + ");"); statement.execute( "INSERT INTO ancient_times VALUES (\n" diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/ancient_date_and_time.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/ancient_date_and_time.sql index 878c687c350..e4ff4e595c2 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/ancient_date_and_time.sql +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/ancient_date_and_time.sql @@ -16,27 +16,27 @@ CREATE TABLE ancient_times ( id SERIAL, - date_col DATE DEFAULT '0017-08-12', - datetime_0_col DATETIME(0) DEFAULT '0016-07-13 17:17:17', - datetime_1_col DATETIME(1) DEFAULT '0015-06-14 17:17:17.1', - datetime_2_col DATETIME(2) DEFAULT '0014-05-15 17:17:17.12', - datetime_3_col DATETIME(3) DEFAULT '0013-04-16 17:17:17.123', - datetime_4_col DATETIME(4) DEFAULT '0012-03-17 17:17:17.1234', - datetime_5_col DATETIME(5) DEFAULT '0011-02-18 17:17:17.12345', - datetime_6_col DATETIME(6) DEFAULT '0010-01-19 17:17:17.123456', + date_col DATE, + datetime_0_col DATETIME(0), + datetime_1_col DATETIME(1), + datetime_2_col DATETIME(2), + datetime_3_col DATETIME(3), + datetime_4_col DATETIME(4), + datetime_5_col DATETIME(5), + datetime_6_col DATETIME(6), PRIMARY KEY (id) ); INSERT INTO ancient_times VALUES ( DEFAULT, - DEFAULT, - DEFAULT, - DEFAULT, - DEFAULT, - DEFAULT, - DEFAULT, - DEFAULT, - DEFAULT + '0017-08-12', + '0016-07-13 17:17:17', + '0015-06-14 17:17:17.1', + '0014-05-15 17:17:17.12', + '0013-04-16 17:17:17.123', + '0012-03-17 17:17:17.1234', + '0011-02-18 17:17:17.12345', + '0010-01-19 17:17:17.123456' ); INSERT INTO ancient_times VALUES (