Skip to content

Commit bae1c46

Browse files
committed
[FLINK-37000] Fix MySQL CDC failing to handle datetime values prior to 1970 properly
Signed-off-by: yuxiqian <[email protected]>
1 parent ddb5f00 commit bae1c46

File tree

9 files changed

+1411
-4
lines changed

9 files changed

+1411
-4
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlAncientDateAndTimeITCase.java

+413
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
-- Licensed to the Apache Software Foundation (ASF) under one or more
2+
-- contributor license agreements. See the NOTICE file distributed with
3+
-- this work for additional information regarding copyright ownership.
4+
-- The ASF licenses this file to You under the Apache License, Version 2.0
5+
-- (the "License"); you may not use this file except in compliance with
6+
-- the License. You may obtain a copy of the License at
7+
--
8+
-- http://www.apache.org/licenses/LICENSE-2.0
9+
--
10+
-- Unless required by applicable law or agreed to in writing, software
11+
-- distributed under the License is distributed on an "AS IS" BASIS,
12+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
-- See the License for the specific language governing permissions and
14+
-- limitations under the License.
15+
16+
CREATE TABLE ancient_times
17+
(
18+
id INT NOT NULL AUTO_INCREMENT,
19+
date_col DATE DEFAULT '0017-08-12',
20+
datetime_0_col DATETIME(0) DEFAULT '0016-07-13 17:17:17',
21+
datetime_1_col DATETIME(1) DEFAULT '0015-06-14 17:17:17.1',
22+
datetime_2_col DATETIME(2) DEFAULT '0014-05-15 17:17:17.12',
23+
datetime_3_col DATETIME(3) DEFAULT '0013-04-16 17:17:17.123',
24+
datetime_4_col DATETIME(4) DEFAULT '0012-03-17 17:17:17.1234',
25+
datetime_5_col DATETIME(5) DEFAULT '0011-02-18 17:17:17.12345',
26+
datetime_6_col DATETIME(6) DEFAULT '0010-01-19 17:17:17.123456',
27+
PRIMARY KEY (id)
28+
);
29+
30+
INSERT INTO ancient_times VALUES (
31+
DEFAULT,
32+
DEFAULT,
33+
DEFAULT,
34+
DEFAULT,
35+
DEFAULT,
36+
DEFAULT,
37+
DEFAULT,
38+
DEFAULT,
39+
DEFAULT
40+
);
41+
42+
INSERT INTO ancient_times VALUES (
43+
DEFAULT,
44+
'0000-00-00',
45+
'0000-00-00 00:00:00',
46+
'0000-00-00 00:00:00.0',
47+
'0000-00-00 00:00:00.00',
48+
'0000-00-00 00:00:00.000',
49+
'0000-00-00 00:00:00.0000',
50+
'0000-00-00 00:00:00.00000',
51+
'0000-00-00 00:00:00.000000'
52+
);
53+
54+
INSERT INTO ancient_times VALUES (
55+
DEFAULT,
56+
'0001-01-01',
57+
'0001-01-01 16:16:16',
58+
'0001-01-01 16:16:16.1',
59+
'0001-01-01 16:16:16.12',
60+
'0001-01-01 16:16:16.123',
61+
'0001-01-01 16:16:16.1234',
62+
'0001-01-01 16:16:16.12345',
63+
'0001-01-01 16:16:16.123456'
64+
);
65+
66+
INSERT INTO ancient_times VALUES (
67+
DEFAULT,
68+
'0002-02-02',
69+
'0002-02-02 15:15:15',
70+
'0002-02-02 15:15:15.1',
71+
'0002-02-02 15:15:15.12',
72+
'0002-02-02 15:15:15.123',
73+
'0002-02-02 15:15:15.1234',
74+
'0002-02-02 15:15:15.12345',
75+
'0002-02-02 15:15:15.123456'
76+
);
77+
78+
INSERT INTO ancient_times VALUES (
79+
DEFAULT,
80+
'0033-03-03',
81+
'0033-03-03 14:14:14',
82+
'0033-03-03 14:14:14.1',
83+
'0033-03-03 14:14:14.12',
84+
'0033-03-03 14:14:14.123',
85+
'0033-03-03 14:14:14.1234',
86+
'0033-03-03 14:14:14.12345',
87+
'0033-03-03 14:14:14.123456'
88+
);
89+
90+
INSERT INTO ancient_times VALUES (
91+
DEFAULT,
92+
'0444-04-04',
93+
'0444-04-04 13:13:13',
94+
'0444-04-04 13:13:13.1',
95+
'0444-04-04 13:13:13.12',
96+
'0444-04-04 13:13:13.123',
97+
'0444-04-04 13:13:13.1234',
98+
'0444-04-04 13:13:13.12345',
99+
'0444-04-04 13:13:13.123456'
100+
);
101+
102+
INSERT INTO ancient_times VALUES (
103+
DEFAULT,
104+
'1969-12-31',
105+
'1969-12-31 12:12:12',
106+
'1969-12-31 12:12:12.1',
107+
'1969-12-31 12:12:12.12',
108+
'1969-12-31 12:12:12.123',
109+
'1969-12-31 12:12:12.1234',
110+
'1969-12-31 12:12:12.12345',
111+
'1969-12-31 12:12:12.123456'
112+
);
113+
114+
INSERT INTO ancient_times VALUES (
115+
DEFAULT,
116+
'2019-12-31',
117+
'2019-12-31 23:11:11',
118+
'2019-12-31 23:11:11.1',
119+
'2019-12-31 23:11:11.12',
120+
'2019-12-31 23:11:11.123',
121+
'2019-12-31 23:11:11.1234',
122+
'2019-12-31 23:11:11.12345',
123+
'2019-12-31 23:11:11.123456'
124+
);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS,
10+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
# See the License for the specific language governing permissions and
12+
# limitations under the License.
13+
14+
# For advice on how to change settings please see
15+
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
16+
17+
[mysqld]
18+
#
19+
# Remove leading # and set to the amount of RAM for the most important data
20+
# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
21+
# innodb_buffer_pool_size = 128M
22+
#
23+
# Remove leading # to turn on a very important data integrity option: logging
24+
# changes to the binary log between backups.
25+
# log_bin
26+
#
27+
# Remove leading # to set options mainly useful for reporting servers.
28+
# The server defaults are faster for transactions and fast SELECTs.
29+
# Adjust sizes as needed, experiment to find the optimal values.
30+
# join_buffer_size = 128M
31+
# sort_buffer_size = 2M
32+
# read_rnd_buffer_size = 2M
33+
skip-host-cache
34+
skip-name-resolve
35+
#datadir=/var/lib/mysql
36+
#socket=/var/lib/mysql/mysql.sock
37+
secure-file-priv=/var/lib/mysql
38+
user=mysql
39+
40+
# Disabling symbolic-links is recommended to prevent assorted security risks
41+
symbolic-links=0
42+
43+
#log-error=/var/log/mysqld.log
44+
#pid-file=/var/run/mysqld/mysqld.pid
45+
46+
# ----------------------------------------------
47+
# Enable the binlog for replication & CDC
48+
# ----------------------------------------------
49+
50+
# Enable binary replication log and set the prefix, expiration, and log format.
51+
# The prefix is arbitrary; expiration can be short for integration tests but would
52+
# be longer on a production system. Row-level info is required for ingest to work.
53+
# Server ID is required, but its value will vary on production systems.
54+
server-id = 223344
55+
log_bin = mysql-bin
56+
expire_logs_days = 1
57+
binlog_format = row
58+
sql_mode = "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION"

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -309,10 +309,12 @@ protected Object convertToTimestamp(Object dbzObj, Schema schema) {
309309
return TimestampData.fromMillis((Long) dbzObj);
310310
case MicroTimestamp.SCHEMA_NAME:
311311
long micro = (long) dbzObj;
312-
return TimestampData.fromMillis(micro / 1000, (int) (micro % 1000 * 1000));
312+
return TimestampData.fromMillis(
313+
Math.floorDiv(micro, 1000), (int) (Math.floorMod(micro, 1000) * 1000));
313314
case NanoTimestamp.SCHEMA_NAME:
314315
long nano = (long) dbzObj;
315-
return TimestampData.fromMillis(nano / 1000_000, (int) (nano % 1000_000));
316+
return TimestampData.fromMillis(
317+
Math.floorDiv(nano, 1000_000), (int) (Math.floorMod(nano, 1000_000)));
316318
}
317319
}
318320
throw new IllegalArgumentException(

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -481,12 +481,17 @@ public Object convert(Object dbzObj, Schema schema) {
481481
return TimestampData.fromEpochMillis((Long) dbzObj);
482482
case MicroTimestamp.SCHEMA_NAME:
483483
long micro = (long) dbzObj;
484+
// Use Math#floorDiv and Math#floorMod instead of `/` and `%`, because the
485+
// epoch-based value is negative for timestamps prior to 1970, and `/` and `%`
486+
// would truncate toward zero and yield a negative sub-millisecond part.
484487
return TimestampData.fromEpochMillis(
485-
micro / 1000, (int) (micro % 1000 * 1000));
488+
Math.floorDiv(micro, 1000),
489+
(int) (Math.floorMod(micro, 1000) * 1000));
486490
case NanoTimestamp.SCHEMA_NAME:
487491
long nano = (long) dbzObj;
488492
return TimestampData.fromEpochMillis(
489-
nano / 1000_000, (int) (nano % 1000_000));
493+
Math.floorDiv(nano, 1000_000),
494+
(int) (Math.floorMod(nano, 1000_000)));
490495
}
491496
}
492497
LocalDateTime localDateTime =

0 commit comments

Comments
 (0)