From 7c6bf04cae7be0a6852a781d04b9ab2dc3cea0fa Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 12 Jun 2024 09:41:29 -0600 Subject: [PATCH] reduce deadlocks in recopying chunks --- pkg/checksum/checker.go | 72 +++++++++++++++++++++++++++++++++++- pkg/migration/runner.go | 2 + pkg/migration/runner_test.go | 8 ++++ pkg/row/copier.go | 9 ++++- 4 files changed, 88 insertions(+), 3 deletions(-) diff --git a/pkg/checksum/checker.go b/pkg/checksum/checker.go index 4f065444..0dce905f 100644 --- a/pkg/checksum/checker.go +++ b/pkg/checksum/checker.go @@ -40,6 +40,7 @@ type Checker struct { logger loggers.Advanced fixDifferences bool differencesFound atomic.Uint64 + recopyLock sync.Mutex } type CheckerConfig struct { @@ -229,8 +230,75 @@ func (c *Checker) replaceChunk(chunk *table.Chunk) error { c.table.QuotedName, chunk.String(), ) - _, err := dbconn.RetryableTransaction(context.TODO(), c.db, false, dbconn.NewDBConfig(), deleteStmt, replaceStmt) - return err + // Note: historically this process has caused deadlocks between the DELETE statement + // in one replaceChunk and the REPLACE statement of another chunk. 
Inspection of + // SHOW ENGINE INNODB STATUS shows that this is not caused by locks on the PRIMARY KEY, + // but a unique secondary key (in our case an idempotence key): + // + // ------------------------ + // LATEST DETECTED DEADLOCK + // ------------------------ + // 2024-06-11 18:34:21 70676106989440 + // *** (1) TRANSACTION: + // TRANSACTION 15106308424, ACTIVE 4 sec updating or deleting + // mysql tables in use 1, locked 1 + // LOCK WAIT 620 lock struct(s), heap size 73848, 49663 row lock(s), undo log entries 49661 + // MySQL thread id 540806, OS thread handle 70369444421504, query id 409280999 10.137.84.232 updating + // DELETE FROM ``.`__new` WHERE `id` >= 1108588365 AND `id` < 1108688365 + // + // *** (1) HOLDS THE LOCK(S): + // RECORD LOCKS space id 1802 page no 26277057 n bits 232 index idempotence_key_idx of table ``.`__new` trx id 15106308424 lock_mode X locks rec but not gap + // Record lock, heap no 163 PHYSICAL RECORD: n_fields 2; compact format; info bits 32 + // 0: len 30; hex ; asc ; (total 62 bytes); + // 1: len 8; hex ; asc ;; + // + // + // *** (1) WAITING FOR THIS LOCK TO BE GRANTED: + // RECORD LOCKS space id 1802 page no 1945840 n bits 280 index idempotence_key_idx of table ``.`__new` trx id 15106308424 lock_mode X locks rec but not gap waiting + // Record lock, heap no 75 PHYSICAL RECORD: n_fields 2; compact format; info bits 0 + // 0: len 30; hex ; asc ; (total 62 bytes); + // 1: len 8; hex ; asc ;; + // + // + // *** (2) TRANSACTION: + // TRANSACTION 15106301192, ACTIVE 58 sec inserting + // mysql tables in use 2, locked 1 + // LOCK WAIT 220020 lock struct(s), heap size 27680888, 409429 row lock(s), undo log entries 162834 + // MySQL thread id 540264, OS thread handle 70369485823872, query id 409127061 10.137.84.232 executing + // REPLACE INTO ``.`__new` (`id`, FROM ``.`` WHERE `id` >= 1106488365 AND `id` < 1106588365 + // + // *** (2) HOLDS THE LOCK(S): + // RECORD LOCKS space id 1802 page no 1945840 n bits 280 index idempotence_key_idx 
of table ``.`__new` trx id 15106301192 lock_mode X + // Record lock, heap no 75 PHYSICAL RECORD: n_fields 2; compact format; info bits 0 + // 0: len 30; hex ; asc ; (total 62 bytes); + // 1: len 8; hex ; asc B yI;; + // + // + // *** (2) WAITING FOR THIS LOCK TO BE GRANTED: + // RECORD LOCKS space id 1802 page no 26277057 n bits 232 index idempotence_key_idx of table ``.`__new` trx id 15106301192 lock_mode X waiting + // Record lock, heap no 163 PHYSICAL RECORD: n_fields 2; compact format; info bits 32 + // 0: len 30; hex ; asc ; (total 62 bytes); + // 1: len 8; hex ; asc ;; + // + // *** WE ROLL BACK TRANSACTION (1) + // + // We don't need this to be an atomic transaction. We just need to delete from the _new table + // before the REPLACE, so that any since-deleted rows (which REPLACE would not remove) are gone first. + // By running the DELETE and the REPLACE as two separate transactions we should be able to remove + // the opportunity for this kind of deadlock. + // + // We further reduce the chance of deadlocks from the recopying process by only re-copying one chunk at a time. + // We may revisit this in the future, but since conflicts are expected to be rare, it should be fine for now. 
+ c.recopyLock.Lock() + defer c.recopyLock.Unlock() + + if _, err := dbconn.RetryableTransaction(context.TODO(), c.db, false, dbconn.NewDBConfig(), deleteStmt); err != nil { + return err + } + if _, err := dbconn.RetryableTransaction(context.TODO(), c.db, false, dbconn.NewDBConfig(), replaceStmt); err != nil { + return err + } + return nil } func (c *Checker) isHealthy(ctx context.Context) bool { diff --git a/pkg/migration/runner.go b/pkg/migration/runner.go index e26d66c6..481eba43 100644 --- a/pkg/migration/runner.go +++ b/pkg/migration/runner.go @@ -469,6 +469,7 @@ func (r *Runner) setup(ctx context.Context) error { Throttler: &throttler.Noop{}, Logger: r.logger, MetricsSink: r.metricsSink, + DBConfig: r.dbConfig, }) if err != nil { return err @@ -747,6 +748,7 @@ func (r *Runner) resumeFromCheckpoint(ctx context.Context) error { Throttler: &throttler.Noop{}, Logger: r.logger, MetricsSink: r.metricsSink, + DBConfig: r.dbConfig, }, lowWatermark, rowsCopied, rowsCopiedLogical) if err != nil { diff --git a/pkg/migration/runner_test.go b/pkg/migration/runner_test.go index 5e2b0d0c..318bfe01 100644 --- a/pkg/migration/runner_test.go +++ b/pkg/migration/runner_test.go @@ -740,6 +740,8 @@ func TestCheckpoint(t *testing.T) { // the migration process manually. 
r.db, err = dbconn.New(testutils.DSN(), dbconn.NewDBConfig()) assert.NoError(t, err) + r.dbConfig = dbconn.NewDBConfig() + // Get Table Info r.table = table.NewTableInfo(r.db, r.migration.Database, r.migration.Table) err = r.table.SetInfo(context.TODO()) @@ -1202,6 +1204,7 @@ func TestE2EBinlogSubscribingCompositeKey(t *testing.T) { Throttler: &throttler.Noop{}, Logger: m.logger, MetricsSink: &metrics.NoopSink{}, + DBConfig: dbconn.NewDBConfig(), }) assert.NoError(t, err) m.replClient.KeyAboveCopierCallback = m.copier.KeyAboveHighWatermark @@ -1330,6 +1333,7 @@ func TestE2EBinlogSubscribingNonCompositeKey(t *testing.T) { Throttler: &throttler.Noop{}, Logger: m.logger, MetricsSink: &metrics.NoopSink{}, + DBConfig: dbconn.NewDBConfig(), }) assert.NoError(t, err) m.replClient.KeyAboveCopierCallback = m.copier.KeyAboveHighWatermark @@ -1931,6 +1935,7 @@ func TestE2ERogueValues(t *testing.T) { Throttler: &throttler.Noop{}, Logger: m.logger, MetricsSink: &metrics.NoopSink{}, + DBConfig: dbconn.NewDBConfig(), }) assert.NoError(t, err) m.replClient.KeyAboveCopierCallback = m.copier.KeyAboveHighWatermark @@ -2075,6 +2080,7 @@ func TestResumeFromCheckpointPhantom(t *testing.T) { // Do the initial setup. 
m.db, err = dbconn.New(testutils.DSN(), dbconn.NewDBConfig()) assert.NoError(t, err) + m.dbConfig = dbconn.NewDBConfig() m.table = table.NewTableInfo(m.db, m.migration.Database, m.migration.Table) assert.NoError(t, m.table.SetInfo(ctx)) assert.NoError(t, m.createNewTable(ctx)) @@ -2093,6 +2099,7 @@ func TestResumeFromCheckpointPhantom(t *testing.T) { Throttler: &throttler.Noop{}, Logger: m.logger, MetricsSink: &metrics.NoopSink{}, + DBConfig: dbconn.NewDBConfig(), }) assert.NoError(t, err) m.replClient.KeyAboveCopierCallback = m.copier.KeyAboveHighWatermark @@ -2163,6 +2170,7 @@ func TestResumeFromCheckpointPhantom(t *testing.T) { assert.NoError(t, err) m.db, err = dbconn.New(testutils.DSN(), dbconn.NewDBConfig()) assert.NoError(t, err) + m.dbConfig = dbconn.NewDBConfig() m.table = table.NewTableInfo(m.db, m.migration.Database, m.migration.Table) assert.NoError(t, m.table.SetInfo(ctx)) // check we can resume from checkpoint. diff --git a/pkg/row/copier.go b/pkg/row/copier.go index 01da54ed..325b2bc6 100644 --- a/pkg/row/copier.go +++ b/pkg/row/copier.go @@ -49,6 +49,7 @@ type Copier struct { startTime time.Time ExecTime time.Duration Throttler throttler.Throttler + dbConfig *dbconn.DBConfig logger loggers.Advanced metricsSink metrics.Sink } @@ -60,6 +61,7 @@ type CopierConfig struct { Throttler throttler.Throttler Logger loggers.Advanced MetricsSink metrics.Sink + DBConfig *dbconn.DBConfig } // NewCopierDefaultConfig returns a default config for the copier. 
@@ -71,6 +73,7 @@ func NewCopierDefaultConfig() *CopierConfig { Throttler: &throttler.Noop{}, Logger: logrus.New(), MetricsSink: &metrics.NoopSink{}, + DBConfig: dbconn.NewDBConfig(), } } @@ -83,6 +86,9 @@ func NewCopier(db *sql.DB, tbl, newTable *table.TableInfo, config *CopierConfig) if err != nil { return nil, err } + if config.DBConfig == nil { + return nil, errors.New("dbConfig must be non-nil") + } return &Copier{ db: db, table: tbl, @@ -93,6 +99,7 @@ func NewCopier(db *sql.DB, tbl, newTable *table.TableInfo, config *CopierConfig) chunker: chunker, logger: config.Logger, metricsSink: config.MetricsSink, + dbConfig: config.DBConfig, }, nil } @@ -131,7 +138,7 @@ func (c *Copier) CopyChunk(ctx context.Context, chunk *table.Chunk) error { c.logger.Debugf("running chunk: %s, query: %s", chunk.String(), query) var affectedRows int64 var err error - if affectedRows, err = dbconn.RetryableTransaction(ctx, c.db, c.finalChecksum, dbconn.NewDBConfig(), query); err != nil { + if affectedRows, err = dbconn.RetryableTransaction(ctx, c.db, c.finalChecksum, c.dbConfig, query); err != nil { return err } atomic.AddUint64(&c.CopyRowsCount, uint64(affectedRows))