Skip to content

Commit cd1fb6f

Browse files
authored
[FLINK-37191][cdc-connector/mysql] Avoid back filling when lowWatermark is equal to highWatermark in BinlogSplit
This closes #3902
1 parent 0a4c256 commit cd1fb6f

File tree

3 files changed

+5
-18
lines changed

3 files changed

+5
-18
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
292292
private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) {
293293
// the existed tables those have finished snapshot reading
294294
if (maxSplitHighWatermarkMap.containsKey(tableId)
295-
&& position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
295+
&& position.isAfter(maxSplitHighWatermarkMap.get(tableId))) {
296296
pureBinlogPhaseTables.add(tableId);
297297
return true;
298298
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java

-10
Original file line numberDiff line numberDiff line change
@@ -154,11 +154,6 @@ protected SnapshotResult<MySqlOffsetContext> doExecute(
154154
hooks.getPreLowWatermarkAction().accept(jdbcConnection, snapshotSplit);
155155
}
156156
final BinlogOffset lowWatermark = DebeziumUtils.currentBinlogOffset(jdbcConnection);
157-
lowWatermark
158-
.getOffset()
159-
.put(
160-
BinlogOffset.TIMESTAMP_KEY,
161-
String.valueOf(clock.currentTime().getEpochSecond()));
162157
LOG.info(
163158
"Snapshot step 1 - Determining low watermark {} for split {}",
164159
lowWatermark,
@@ -192,11 +187,6 @@ protected SnapshotResult<MySqlOffsetContext> doExecute(
192187
} else {
193188
// Get the current binlog offset as HW
194189
highWatermark = DebeziumUtils.currentBinlogOffset(jdbcConnection);
195-
highWatermark
196-
.getOffset()
197-
.put(
198-
BinlogOffset.TIMESTAMP_KEY,
199-
String.valueOf(clock.currentTime().getEpochSecond()));
200190
}
201191

202192
LOG.info(

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java

+4-7
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,9 @@ public int compareTo(BinlogOffset that) {
232232
// compared ...
233233
long timestamp = this.getTimestampSec();
234234
long targetTimestamp = that.getTimestampSec();
235-
return Long.compare(timestamp, targetTimestamp);
235+
if (timestamp != 0 && targetTimestamp != 0) {
236+
return Long.compare(timestamp, targetTimestamp);
237+
}
236238
}
237239

238240
// First compare the MySQL binlog filenames
@@ -251,12 +253,7 @@ public int compareTo(BinlogOffset that) {
251253
}
252254

253255
// The completed events are the same, so compare the row number ...
254-
if (this.getRestartSkipRows() != that.getRestartSkipRows()) {
255-
return Long.compare(this.getRestartSkipRows(), that.getRestartSkipRows());
256-
}
257-
258-
// The skip rows are the same, so compare the timestamp ...
259-
return Long.compare(this.getTimestampSec(), that.getTimestampSec());
256+
return Long.compare(this.getRestartSkipRows(), that.getRestartSkipRows());
260257
}
261258

262259
public boolean isAtOrBefore(BinlogOffset that) {

0 commit comments

Comments
 (0)