Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

repl: revert to inject-noise based master wait #339

Merged
merged 1 commit into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.22

require (
github.com/alecthomas/kong v0.7.1
github.com/go-mysql-org/go-mysql v1.8.1-0.20240728143959-24fbb5be92c3
github.com/go-mysql-org/go-mysql v1.8.1-0.20240805131754-ccf204bf2b2a
github.com/go-sql-driver/mysql v1.7.1
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32
github.com/pingcap/tidb/pkg/parser v0.0.0-20231103042308-035ad5ccbe67
Expand Down Expand Up @@ -40,5 +40,3 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/go-mysql-org/go-mysql => github.com/morgo/go-mysql v0.0.0-20240809144225-c94bbea2a85e
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-mysql-org/go-mysql v1.8.1-0.20240805131754-ccf204bf2b2a h1:VO6kiE9ex1uNaCCgDz/q0EhTueLrr3vmxkjJpU2x6pk=
github.com/go-mysql-org/go-mysql v1.8.1-0.20240805131754-ccf204bf2b2a/go.mod h1:+SgFgTlqjqOQoMc98n9oyUWEgn2KkOL1VmXDoq2ONOs=
github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
Expand All @@ -33,8 +35,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/morgo/go-mysql v0.0.0-20240809144225-c94bbea2a85e h1:2qxqEqsL1z1psCF6r6m+2FlW7b1iItFjILzWJiiwqt0=
github.com/morgo/go-mysql v0.0.0-20240809144225-c94bbea2a85e/go.mod h1:+SgFgTlqjqOQoMc98n9oyUWEgn2KkOL1VmXDoq2ONOs=
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32 h1:m5ZsBa5o/0CkzZXfXLaThzKuR85SnHHetqBCpzQ30h8=
Expand Down
47 changes: 40 additions & 7 deletions pkg/repl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,7 @@ func (c *Client) Run() (err error) {
cfg.Password = c.password
cfg.Logger = NewLogWrapper(c.logger) // wrapper to filter the noise.
cfg.IncludeTableRegex = []string{fmt.Sprintf("^%s\\.%s$", c.table.SchemaName, c.table.TableName)}
cfg.Dump.ExecutionPath = "" // skip dump
cfg.DisableFlushBinlogWhileWaiting = true // can't guarantee privileges exist.
cfg.Dump.ExecutionPath = "" // skip dump
if dbconn.IsRDSHost(cfg.Addr) {
// create a new TLSConfig for RDS
// It needs to be a copy because sharing a global pointer
Expand Down Expand Up @@ -404,10 +403,15 @@ func (c *Client) FlushUnderTableLock(ctx context.Context, lock *dbconn.TableLock
if err := c.flush(ctx, true, lock); err != nil {
return err
}
// Wait for the changes flushed to be received.
if err := c.BlockWait(ctx); err != nil {
return err
}
// TODO: Wait for the changes flushed to be received.
// Ideally we can call c.BlockWait() when
// https://github.com/cashapp/spirit/issues/337 merges.
// For now, we skip block wait. Because the check in AllChangesFlushed()
// now only returns a warning, this is fine.
// if err := c.BlockWait(ctx); err != nil {
// return err
// }

// Do a final flush
return c.flush(ctx, true, lock)
}
Expand Down Expand Up @@ -711,7 +715,36 @@ func (c *Client) StartPeriodicFlush(ctx context.Context, interval time.Duration)
// **Caveat** Unless you are calling this from Flush(), calling this DOES NOT ensure that
// changes have been applied to the database.
func (c *Client) BlockWait(ctx context.Context) error {
return c.canal.CatchMasterPos(DefaultTimeout)
targetPos, err := c.canal.GetMasterPos() // what the server is at.
if err != nil {
return err
}
for i := 100; ; i++ {
if err := c.injectBinlogNoise(ctx); err != nil {
return err
}
canalPos := c.canal.SyncedPosition()
if i%100 == 0 {
// Print status every 100 loops = 10s
c.logger.Infof("blocking until we have read all binary logs: current-pos=%s target-pos=%s", canalPos, targetPos)
}
if canalPos.Compare(targetPos) >= 0 {
break
}
time.Sleep(100 * time.Millisecond)
}
return nil
}

// injectBinlogNoise is used to inject some noise into the binlog stream
// This helps ensure that we are "past" a binary log position if there is some off-by-one
// problem where the most recent canal event is not yet updating the canal SyncedPosition,
// and there are no current changes on the MySQL server to advance itself.
// Note: We can not update the table or the newTable, because this intentionally
// causes a panic (c.tableChanged() is called).
func (c *Client) injectBinlogNoise(ctx context.Context) error {
tblName := fmt.Sprintf("_%s_chkpnt", c.table.TableName)
return dbconn.Exec(ctx, c.db, "ALTER TABLE %n.%n AUTO_INCREMENT=0", c.table.SchemaName, tblName)
}

func (c *Client) keyHasChanged(key []interface{}, deleted bool) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/repl/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,10 @@ func TestBlockWait(t *testing.T) {
db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig())
assert.NoError(t, err)

testutils.RunSQL(t, "DROP TABLE IF EXISTS blockwaitt1, blockwaitt2")
testutils.RunSQL(t, "DROP TABLE IF EXISTS blockwaitt1, blockwaitt2, _blockwaitt1_chkpnt")
testutils.RunSQL(t, "CREATE TABLE blockwaitt1 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE blockwaitt2 (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _blockwaitt1_chkpnt (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")

t1 := table.NewTableInfo(db, "test", "blockwaitt1")
assert.NoError(t, t1.SetInfo(context.TODO()))
Expand Down
Loading