From f58eec0a233253400a2f4f0202702d98cfcbaeba Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 24 Jul 2024 08:34:25 -0600 Subject: [PATCH] Update AllChangesFlushed --- pkg/repl/client.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/pkg/repl/client.go b/pkg/repl/client.go index 408d037..78ca439 100644 --- a/pkg/repl/client.go +++ b/pkg/repl/client.go @@ -229,12 +229,19 @@ func (c *Client) SetPos(pos mysql.Position) { } func (c *Client) AllChangesFlushed() bool { - if c.GetDeltaLen() > 0 { - return false - } + deltaLen := c.GetDeltaLen() c.Lock() defer c.Unlock() - return c.canal.SyncedPosition().Compare(c.binlogPosSynced) == 0 + + // We check if the position canal is up to is the same position + // as what we've made changes for. If this is zero it's a good + // indicator that we are up to date. However, because the + // "server lock" is not a global lock, it's possible that the synced + // position could still advance. + if c.canal.SyncedPosition().Compare(c.binlogPosSynced) != 0 { + c.logger.Warnf("Binlog reader info canal-position=%v synced-position=%v. Discrepancies could be due to modifications on other tables.", c.canal.SyncedPosition(), c.binlogPosSynced) + } + return deltaLen == 0 } func (c *Client) GetBinlogApplyPosition() mysql.Position {