Skip to content

Commit

Permalink
Update AllChangesFlushed
Browse files Browse the repository at this point in the history
  • Loading branch information
morgo committed Jul 24, 2024
1 parent b6e7399 commit f58eec0
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions pkg/repl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,19 @@ func (c *Client) SetPos(pos mysql.Position) {
}

func (c *Client) AllChangesFlushed() bool {
if c.GetDeltaLen() > 0 {
return false
}
deltaLen := c.GetDeltaLen()
c.Lock()
defer c.Unlock()
return c.canal.SyncedPosition().Compare(c.binlogPosSynced) == 0

// We check if the position canal is up to is the same position
// as what we've made changes for. If this is zero it's a good
// indicator that we are up to date. However, because the
// "server lock" is not a global lock, it's possible that the synced
// position could still advance.
if c.canal.SyncedPosition().Compare(c.binlogPosSynced) != 0 {
c.logger.Warnf("Binlog reader info canal-position=%v synced-position=%v. Discrepancies could be due to modifications on other tables.", c.canal.SyncedPosition(), c.binlogPosSynced)
}
return deltaLen == 0
}

func (c *Client) GetBinlogApplyPosition() mysql.Position {
Expand Down

0 comments on commit f58eec0

Please sign in to comment.