diff --git a/pkg/dbconn/metadatalock.go b/pkg/dbconn/metadatalock.go index 29547a9..ee58977 100644 --- a/pkg/dbconn/metadatalock.go +++ b/pkg/dbconn/metadatalock.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha1" "database/sql" + "encoding/hex" "errors" "fmt" "time" @@ -26,6 +27,7 @@ type MetadataLock struct { closeCh chan error refreshInterval time.Duration db *sql.DB + lockName string } func NewMetadataLock(ctx context.Context, dsn string, table *table.TableInfo, logger loggers.Advanced, optionFns ...func(*MetadataLock)) (*MetadataLock, error) { @@ -51,8 +53,6 @@ func NewMetadataLock(ctx context.Context, dsn string, table *table.TableInfo, lo return nil, err } - var lockName string - // Function to acquire the lock getLock := func() error { // https://dev.mysql.com/doc/refman/8.0/en/locking-functions.html#function_get-lock @@ -63,26 +63,28 @@ func NewMetadataLock(ctx context.Context, dsn string, table *table.TableInfo, lo // The hash is truncated to 8 characters to avoid the maximum lock length. // bizarrely_long_schema_name.thisisareallylongtablenamethisisareallylongtablename60charac ==> // bizarrely_long_schem.thisisareallylongtablenamethisis-66fec116 - lockName = computeLockName(table) - stmt := sqlescape.MustEscapeSQL("SELECT GET_LOCK(%?, %?)", lockName, getLockTimeout.Seconds()) + mdl.lockName = computeLockName(table) + stmt := sqlescape.MustEscapeSQL("SELECT GET_LOCK(%?, %?)", mdl.lockName, getLockTimeout.Seconds()) if err := mdl.db.QueryRowContext(ctx, stmt).Scan(&answer); err != nil { return fmt.Errorf("could not acquire metadata lock: %s", err) } if answer == 0 { // 0 means the lock is held by another connection // TODO: we could lookup the connection that holds the lock and report details about it - logger.Warnf("could not acquire metadata lock for %s, lock is held by another connection", lockName) - return fmt.Errorf("could not acquire metadata lock for %s, lock is held by another connection", lockName) + logger.Warnf("could not acquire metadata lock for %s, lock is held by another connection", mdl.lockName) + + // TODO: we could deal in error codes instead of string contains checks. + return fmt.Errorf("could not acquire metadata lock for %s, lock is held by another connection", mdl.lockName) } else if answer != 1 { // probably we never get here, but just in case - return fmt.Errorf("could not acquire metadata lock %s, GET_LOCK returned: %d", lockName, answer) + return fmt.Errorf("could not acquire metadata lock %s, GET_LOCK returned: %d", mdl.lockName, answer) } return nil } // Acquire the lock or return an error immediately // We only Infof the initial acquisition. - logger.Infof("attempting to acquire metadata lock %s", lockName) + logger.Infof("attempting to acquire metadata lock %s", mdl.lockName) if err = getLock(); err != nil { return nil, err } @@ -98,7 +100,7 @@ func NewMetadataLock(ctx context.Context, dsn string, table *table.TableInfo, lo select { case <-ctx.Done(): // Close the dedicated connection to release the lock - logger.Warnf("releasing metadata lock for %s", lockName) + logger.Warnf("releasing metadata lock for %s", mdl.lockName) mdl.closeCh <- mdl.db.Close() return case <-ticker.C: @@ -131,7 +133,7 @@ func NewMetadataLock(ctx context.Context, dsn string, table *table.TableInfo, lo logger.Infof("re-acquired metadata lock after re-establishing connection: %s.%s", table.SchemaName, table.TableName) } else { - logger.Debugf("refreshed metadata lock for %s", lockName) + logger.Debugf("refreshed metadata lock for %s", mdl.lockName) } } } @@ -157,6 +159,10 @@ func (m *MetadataLock) CloseDBConnection(logger loggers.Advanced) error { return nil } +func (m *MetadataLock) GetLockName() string { + return m.lockName +} + func computeLockName(table *table.TableInfo) string { schemaNamePart := table.SchemaName if len(schemaNamePart) > 20 { @@ -170,7 +176,7 @@ func computeLockName(table *table.TableInfo) string { hash := sha1.New() hash.Write([]byte(table.SchemaName + table.TableName)) - hashPart := fmt.Sprintf("%x", hash.Sum(nil))[:8] + hashPart := hex.EncodeToString(hash.Sum(nil))[:8] lockName := fmt.Sprintf("%s.%s-%s", schemaNamePart, tableNamePart, hashPart) return lockName diff --git a/pkg/migration/runner_test.go b/pkg/migration/runner_test.go index 1a5a094..5307aec 100644 --- a/pkg/migration/runner_test.go +++ b/pkg/migration/runner_test.go @@ -2769,7 +2769,10 @@ func TestResumeFromCheckpointE2EWithManualSentinel(t *testing.T) { // as tablename while the migration is running. lock, err := dbconn.NewMetadataLock(ctx, testutils.DSN(), &tableInfo, &testLogger{}) - assert.ErrorContains(t, err, "could not acquire metadata lock for test.resume_checkpoint_e2e_w_sentinel, lock is held by another connection") + assert.Error(t, err) + if lock != nil { + assert.ErrorContains(t, err, fmt.Sprintf("could not acquire metadata lock for %s, lock is held by another connection", lock.GetLockName())) + } assert.Nil(t, lock) break }