Merge pull request #295 from cashapp/mtocker-fix-deadlocks
reduce deadlocks in recopying chunks
morgo authored Jun 12, 2024
2 parents 2658705 + 7c6bf04 commit 788655a
Showing 4 changed files with 88 additions and 3 deletions.
72 changes: 70 additions & 2 deletions pkg/checksum/checker.go
@@ -40,6 +40,7 @@ type Checker struct {
logger loggers.Advanced
fixDifferences bool
differencesFound atomic.Uint64
recopyLock sync.Mutex
}

type CheckerConfig struct {
@@ -229,8 +230,75 @@ func (c *Checker) replaceChunk(chunk *table.Chunk) error {
c.table.QuotedName,
chunk.String(),
)
_, err := dbconn.RetryableTransaction(context.TODO(), c.db, false, dbconn.NewDBConfig(), deleteStmt, replaceStmt)
return err
// Note: historically this process has caused deadlocks between the DELETE statement
// in one replaceChunk and the REPLACE statement for another chunk. Inspection of
// SHOW ENGINE INNODB STATUS shows that the deadlock is not caused by locks on the PRIMARY KEY,
// but by a unique secondary key (in our case an idempotence key):
//
// ------------------------
// LATEST DETECTED DEADLOCK
// ------------------------
// 2024-06-11 18:34:21 70676106989440
// *** (1) TRANSACTION:
// TRANSACTION 15106308424, ACTIVE 4 sec updating or deleting
// mysql tables in use 1, locked 1
// LOCK WAIT 620 lock struct(s), heap size 73848, 49663 row lock(s), undo log entries 49661
// MySQL thread id 540806, OS thread handle 70369444421504, query id 409280999 10.137.84.232 <snip> updating
// DELETE FROM `<snip>`.`_<snip>_new` WHERE `id` >= 1108588365 AND `id` < 1108688365
//
// *** (1) HOLDS THE LOCK(S):
// RECORD LOCKS space id 1802 page no 26277057 n bits 232 index idempotence_key_idx of table `<snip>`.`_<snip>_new` trx id 15106308424 lock_mode X locks rec but not gap
// Record lock, heap no 163 PHYSICAL RECORD: n_fields 2; compact format; info bits 32
// 0: len 30; hex <snip>; asc <snip>; (total 62 bytes);
// 1: len 8; hex <snip>; asc <snip>;;
//
//
// *** (1) WAITING FOR THIS LOCK TO BE GRANTED:
// RECORD LOCKS space id 1802 page no 1945840 n bits 280 index idempotence_key_idx of table `<snip>`.`_<snip>_new` trx id 15106308424 lock_mode X locks rec but not gap waiting
// Record lock, heap no 75 PHYSICAL RECORD: n_fields 2; compact format; info bits 0
// 0: len 30; hex <snip>; asc <snip>; (total 62 bytes);
// 1: len 8; hex <snip>; asc <snip>;;
//
//
// *** (2) TRANSACTION:
// TRANSACTION 15106301192, ACTIVE 58 sec inserting
// mysql tables in use 2, locked 1
// LOCK WAIT 220020 lock struct(s), heap size 27680888, 409429 row lock(s), undo log entries 162834
// MySQL thread id 540264, OS thread handle 70369485823872, query id 409127061 10.137.84.232 <snip> executing
// REPLACE INTO `<snip>`.`_<snip>_new` (`id`, <snip> FROM `<snip>`.`<snip>` WHERE `id` >= 1106488365 AND `id` < 1106588365
//
// *** (2) HOLDS THE LOCK(S):
// RECORD LOCKS space id 1802 page no 1945840 n bits 280 index idempotence_key_idx of table `<snip>`.`_<snip>_new` trx id 15106301192 lock_mode X
// Record lock, heap no 75 PHYSICAL RECORD: n_fields 2; compact format; info bits 0
// 0: len 30; hex <snip>; asc <snip>; (total 62 bytes);
// 1: len 8; hex <snip>; asc B yI;;
//
//
// *** (2) WAITING FOR THIS LOCK TO BE GRANTED:
// RECORD LOCKS space id 1802 page no 26277057 n bits 232 index idempotence_key_idx of table `<snip>`.`_<snip>_new` trx id 15106301192 lock_mode X waiting
// Record lock, heap no 163 PHYSICAL RECORD: n_fields 2; compact format; info bits 32
// 0: len 30; hex <snip>; asc <snip>; (total 62 bytes);
// 1: len 8; hex <snip>; asc <snip>;;
//
// *** WE ROLL BACK TRANSACTION (1)
//
// We don't need this to be a single atomic transaction. We just need the DELETE on the _new
// table to run first, so that any rows deleted from the source since the copy (which a REPLACE
// alone would not remove) are cleaned up before the REPLACE runs. By doing this as two
// transactions we should be able to eliminate the opportunity for this deadlock.
//
// We further reduce the chance of deadlocks in the recopy process by only re-copying one chunk
// at a time. We may revisit this in the future, but since conflicts are expected to be rare,
// this should be fine for now.
c.recopyLock.Lock()
defer c.recopyLock.Unlock()

if _, err := dbconn.RetryableTransaction(context.TODO(), c.db, false, dbconn.NewDBConfig(), deleteStmt); err != nil {
return err
}
if _, err := dbconn.RetryableTransaction(context.TODO(), c.db, false, dbconn.NewDBConfig(), replaceStmt); err != nil {
return err
}
return nil
}

func (c *Checker) isHealthy(ctx context.Context) bool {
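
The core of the fix above is to stop running the DELETE and REPLACE inside one transaction and to serialize recopies behind a mutex. The following standalone sketch illustrates that pattern only; the checker type, the runQuery helper, and the statements are simplified stand-ins for the real Checker, dbconn.RetryableTransaction, and generated SQL, not the project's actual code.

package recopy

import (
	"context"
	"database/sql"
	"sync"
)

type checker struct {
	db         *sql.DB
	recopyLock sync.Mutex
}

// runQuery is a simplified stand-in for dbconn.RetryableTransaction: it runs
// a single statement in its own transaction.
func runQuery(ctx context.Context, db *sql.DB, stmt string) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	if _, err := tx.ExecContext(ctx, stmt); err != nil {
		_ = tx.Rollback()
		return err
	}
	return tx.Commit()
}

// replaceChunk re-copies a single chunk. The mutex ensures only one chunk is
// re-copied at a time, and the DELETE and REPLACE run in separate transactions
// so two concurrent recopies cannot deadlock on a unique secondary index.
func (c *checker) replaceChunk(ctx context.Context, deleteStmt, replaceStmt string) error {
	c.recopyLock.Lock()
	defer c.recopyLock.Unlock()

	// Delete first so rows removed from the source since the copy (which a
	// REPLACE alone would not touch) are gone before the REPLACE runs.
	if err := runQuery(ctx, c.db, deleteStmt); err != nil {
		return err
	}
	return runQuery(ctx, c.db, replaceStmt)
}
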
2 changes: 2 additions & 0 deletions pkg/migration/runner.go
@@ -469,6 +469,7 @@ func (r *Runner) setup(ctx context.Context) error {
Throttler: &throttler.Noop{},
Logger: r.logger,
MetricsSink: r.metricsSink,
DBConfig: r.dbConfig,
})
if err != nil {
return err
@@ -747,6 +748,7 @@ func (r *Runner) resumeFromCheckpoint(ctx context.Context) error {
Throttler: &throttler.Noop{},
Logger: r.logger,
MetricsSink: r.metricsSink,
DBConfig: r.dbConfig,
}, lowWatermark, rowsCopied, rowsCopiedLogical)

if err != nil {
8 changes: 8 additions & 0 deletions pkg/migration/runner_test.go
@@ -740,6 +740,8 @@ func TestCheckpoint(t *testing.T) {
// the migration process manually.
r.db, err = dbconn.New(testutils.DSN(), dbconn.NewDBConfig())
assert.NoError(t, err)
r.dbConfig = dbconn.NewDBConfig()

// Get Table Info
r.table = table.NewTableInfo(r.db, r.migration.Database, r.migration.Table)
err = r.table.SetInfo(context.TODO())
@@ -1202,6 +1204,7 @@ func TestE2EBinlogSubscribingCompositeKey(t *testing.T) {
Throttler: &throttler.Noop{},
Logger: m.logger,
MetricsSink: &metrics.NoopSink{},
DBConfig: dbconn.NewDBConfig(),
})
assert.NoError(t, err)
m.replClient.KeyAboveCopierCallback = m.copier.KeyAboveHighWatermark
@@ -1330,6 +1333,7 @@ func TestE2EBinlogSubscribingNonCompositeKey(t *testing.T) {
Throttler: &throttler.Noop{},
Logger: m.logger,
MetricsSink: &metrics.NoopSink{},
DBConfig: dbconn.NewDBConfig(),
})
assert.NoError(t, err)
m.replClient.KeyAboveCopierCallback = m.copier.KeyAboveHighWatermark
@@ -1931,6 +1935,7 @@ func TestE2ERogueValues(t *testing.T) {
Throttler: &throttler.Noop{},
Logger: m.logger,
MetricsSink: &metrics.NoopSink{},
DBConfig: dbconn.NewDBConfig(),
})
assert.NoError(t, err)
m.replClient.KeyAboveCopierCallback = m.copier.KeyAboveHighWatermark
@@ -2075,6 +2080,7 @@ func TestResumeFromCheckpointPhantom(t *testing.T) {
// Do the initial setup.
m.db, err = dbconn.New(testutils.DSN(), dbconn.NewDBConfig())
assert.NoError(t, err)
m.dbConfig = dbconn.NewDBConfig()
m.table = table.NewTableInfo(m.db, m.migration.Database, m.migration.Table)
assert.NoError(t, m.table.SetInfo(ctx))
assert.NoError(t, m.createNewTable(ctx))
@@ -2093,6 +2099,7 @@
Throttler: &throttler.Noop{},
Logger: m.logger,
MetricsSink: &metrics.NoopSink{},
DBConfig: dbconn.NewDBConfig(),
})
assert.NoError(t, err)
m.replClient.KeyAboveCopierCallback = m.copier.KeyAboveHighWatermark
@@ -2163,6 +2170,7 @@ func TestResumeFromCheckpointPhantom(t *testing.T) {
assert.NoError(t, err)
m.db, err = dbconn.New(testutils.DSN(), dbconn.NewDBConfig())
assert.NoError(t, err)
m.dbConfig = dbconn.NewDBConfig()
m.table = table.NewTableInfo(m.db, m.migration.Database, m.migration.Table)
assert.NoError(t, m.table.SetInfo(ctx))
// check we can resume from checkpoint.
9 changes: 8 additions & 1 deletion pkg/row/copier.go
@@ -49,6 +49,7 @@ type Copier struct {
startTime time.Time
ExecTime time.Duration
Throttler throttler.Throttler
dbConfig *dbconn.DBConfig
logger loggers.Advanced
metricsSink metrics.Sink
}
@@ -60,6 +61,7 @@ type CopierConfig struct {
Throttler throttler.Throttler
Logger loggers.Advanced
MetricsSink metrics.Sink
DBConfig *dbconn.DBConfig
}

// NewCopierDefaultConfig returns a default config for the copier.
@@ -71,6 +73,7 @@ func NewCopierDefaultConfig() *CopierConfig {
Throttler: &throttler.Noop{},
Logger: logrus.New(),
MetricsSink: &metrics.NoopSink{},
DBConfig: dbconn.NewDBConfig(),
}
}

@@ -83,6 +86,9 @@ func NewCopier(db *sql.DB, tbl, newTable *table.TableInfo, config *CopierConfig)
if err != nil {
return nil, err
}
if config.DBConfig == nil {
return nil, errors.New("dbConfig must be non-nil")
}
return &Copier{
db: db,
table: tbl,
@@ -93,6 +99,7 @@ func NewCopier(db *sql.DB, tbl, newTable *table.TableInfo, config *CopierConfig)
chunker: chunker,
logger: config.Logger,
metricsSink: config.MetricsSink,
dbConfig: config.DBConfig,
}, nil
}

@@ -131,7 +138,7 @@ func (c *Copier) CopyChunk(ctx context.Context, chunk *table.Chunk) error {
c.logger.Debugf("running chunk: %s, query: %s", chunk.String(), query)
var affectedRows int64
var err error
if affectedRows, err = dbconn.RetryableTransaction(ctx, c.db, c.finalChecksum, dbconn.NewDBConfig(), query); err != nil {
if affectedRows, err = dbconn.RetryableTransaction(ctx, c.db, c.finalChecksum, c.dbConfig, query); err != nil {
return err
}
atomic.AddUint64(&c.CopyRowsCount, uint64(affectedRows))
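
With this change the copier carries its own *dbconn.DBConfig* and NewCopier rejects a nil one (NewCopierDefaultConfig already fills in dbconn.NewDBConfig()). A minimal usage sketch, assuming the package paths and the CopierConfig fields visible in this diff; the surrounding wiring is illustrative only.

// Sketch only: import paths and the newChunkCopier wrapper are assumptions, not project code.
func newChunkCopier(db *sql.DB, tbl, newTable *table.TableInfo) (*row.Copier, error) {
	cfg := row.NewCopierDefaultConfig()
	cfg.DBConfig = dbconn.NewDBConfig() // must be non-nil; NewCopier returns an error otherwise
	return row.NewCopier(db, tbl, newTable, cfg)
}
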
