Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37191][mysql] Avoid back filling if the lowWatermark of BinlogSplit is equal to highWatermark. #3902

Merged
merged 4 commits into from
Feb 11, 2025

Conversation

lvyanquan
Copy link
Contributor

@lvyanquan lvyanquan commented Feb 4, 2025

To fix timeout in test case of PolardbxSourceITCase.

Avoid extra backfill when there is no binlog event between lowWatermark and highWatermark.

@lvyanquan
Copy link
Contributor Author

@yuxiqian @leonardBang PTAL.

@leonardBang
Copy link
Contributor

CI failed....

Copy link
Contributor

@yuxiqian yuxiqian Feb 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BinlogOffset#compareTo is a general-purposed comparison method, and this change might affect more than backfill necessity check. Is it possible to tweak SnapshotSplitReader#isBackfillRequired only?

@yuxiqian
Copy link
Contributor

yuxiqian commented Feb 6, 2025

Seems testBinlogOffsetCompareWithSnapshotAndBinlogPhase is failing due to this change. DataStream migration test is irrelevant, and should be fixed in #3884.

@lvyanquan
Copy link
Contributor Author

lvyanquan commented Feb 6, 2025

Hi, @JNSimba.

I found that the change of #3415 will cause the following problem:

  1. Even though there is not record between lowWatermark and highWatermark, we still need a backfill procedure, and the timestamp of highWatermark is greater than lowWatermark.
  2. If there is no more record after highWatermark, the backfill procedure will never stop, as we will need to wait for a record that is at or after the highWatermark(as the following code displayed).

I thank that the change of #3415 mainly to solve the problem of the comparson of highWatermark and the last record, but in reality, we don't need to read this record, so we can skip it directly.

Notice that this will not cause data loss, as this record will be read in backfill phase.

What do you think?

@JNSimba
Copy link
Member

JNSimba commented Feb 6, 2025

@lvyanquan Is the backfill phase not stopped because the ts_sec of offsetContext is always 0, which is less than the ts_sec of highwatermark(binlogSplit.getEndingOffset)?

final BinlogOffset currentBinlogOffset =
RecordUtils.getBinlogPosition(offsetContext.getOffset());
// reach the high watermark, the binlog reader should finished
if (currentBinlogOffset.isAtOrAfter(binlogSplit.getEndingOffset())) {

  1. If so, the following situation may also cause problems.
    Since the server_id of highwatermark is equal to 0, but the server_id of offsetContext is not equal to 0, the comparison will trigger the comparison of timestamp, so the ts_sec of offset will be less than the ts_sec of highwatermark, and it will also get stuck?
    if (serverId != targetServerId) {
    // These are from different servers, and their binlog coordinates are not related. So
    // the only thing we can do
    // is compare timestamps, and we have to assume that the server timestamps can be
    // compared ...
    long timestamp = this.getTimestampSec();
    long targetTimestamp = that.getTimestampSec();
    return Long.compare(timestamp, targetTimestamp);
    }

@leonardBang leonardBang requested a review from yuxiqian February 7, 2025 04:05
@lvyanquan
Copy link
Contributor Author

lvyanquan commented Feb 7, 2025

@lvyanquan Is the backfill phase not stopped because the ts_sec of offsetContext is always 0, which is less than the ts_sec of highwatermark(binlogSplit.getEndingOffset)?

Yes.

and it will also get stuck?

Make sense. So I would like to revert the change of adding timestamp to highWaterMark and skip the record that is equal to the offset of highWaterMark.

The test case of BinlogSplitReaderTest will be remained to verify this.

Copy link
Member

@JNSimba JNSimba left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor

@yuxiqian yuxiqian left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pulled and tested locally, I could confirm that PolarDBX test case could run without problem.

But seems BinlogSplitReaderTest#testBinlogOffsetCompareWithSnapshotAndBinlogPhase (introduced in #3415) will always fail on this assertion:

assertTrue(sourceRecords.isEmpty());
Expecting empty but was: [SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1739012804, file=mysql-bin.000003, pos=15691, row=1, server_id=223344, event=2}} ConnectRecord{topic='mysql_binlog_source.customer_tesuwt.customers', kafkaPartition=null, key=Struct{id=999999}, keySchema=Schema{mysql_binlog_source.customer_tesuwt.customers.Key:STRUCT}, value=Struct{after=Struct{id=999999,name=user_22,address=Shanghai,phone_number=123567891234},source=Struct{version=1.9.8.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1739012804000,db=customer_tesuwt,table=customers,server_id=223344,file=mysql-bin.000003,pos=15831,row=0,thread=5},op=c,ts_ms=1739012808026}, valueSchema=Schema{mysql_binlog_source.customer_tesuwt.customers.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]

@@ -232,7 +232,9 @@ public int compareTo(BinlogOffset that) {
// compared ...
long timestamp = this.getTimestampSec();
long targetTimestamp = that.getTimestampSec();
return Long.compare(timestamp, targetTimestamp);
if (timestamp != 0 && targetTimestamp != 0) {
return Long.compare(timestamp, targetTimestamp);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is necessary because that if the TimestampSec is 0, the assumption of These are from different servers, and their binlog coordinates are not related is not correct, these two BinlogOffsets could come from the same server.

@lvyanquan lvyanquan requested a review from yuxiqian February 11, 2025 03:49
Copy link
Contributor

@yuxiqian yuxiqian left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pulled, ran tests locally and looks good.

Considering BinlogOffset is part of public API and has complex comparison logic, maybe we can add some tests later to cover it in case if we break something accidentally again in the future.

@leonardBang leonardBang merged commit cd1fb6f into apache:master Feb 11, 2025
26 of 28 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants