Skip to content

Commit

Permalink
export LockName so clients aquiring locks can get their lockname
Browse files Browse the repository at this point in the history
  • Loading branch information
Kiran01bm committed Feb 16, 2025
1 parent e2d5a30 commit 43e42a7
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 11 deletions.
25 changes: 15 additions & 10 deletions pkg/dbconn/metadatalock.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type MetadataLock struct {
closeCh chan error
refreshInterval time.Duration
db *sql.DB
lockName string
}

func NewMetadataLock(ctx context.Context, dsn string, table *table.TableInfo, logger loggers.Advanced, optionFns ...func(*MetadataLock)) (*MetadataLock, error) {
Expand All @@ -51,8 +52,6 @@ func NewMetadataLock(ctx context.Context, dsn string, table *table.TableInfo, lo
return nil, err
}

var lockName string

// Function to acquire the lock
getLock := func() error {
// https://dev.mysql.com/doc/refman/8.0/en/locking-functions.html#function_get-lock
Expand All @@ -63,26 +62,28 @@ func NewMetadataLock(ctx context.Context, dsn string, table *table.TableInfo, lo
// The hash is truncated to 8 characters to avoid the maximum lock length.
// bizarrely_long_schema_name.thisisareallylongtablenamethisisareallylongtablename60charac ==>
// bizarrely_long_schem.thisisareallylongtablenamethisis-66fec116
lockName = computeLockName(table)
stmt := sqlescape.MustEscapeSQL("SELECT GET_LOCK(%?, %?)", lockName, getLockTimeout.Seconds())
mdl.lockName = computeLockName(table)
stmt := sqlescape.MustEscapeSQL("SELECT GET_LOCK(%?, %?)", mdl.lockName, getLockTimeout.Seconds())
if err := mdl.db.QueryRowContext(ctx, stmt).Scan(&answer); err != nil {
return fmt.Errorf("could not acquire metadata lock: %s", err)
}
if answer == 0 {
// 0 means the lock is held by another connection
// TODO: we could lookup the connection that holds the lock and report details about it
logger.Warnf("could not acquire metadata lock for %s, lock is held by another connection", lockName)
return fmt.Errorf("could not acquire metadata lock for %s, lock is held by another connection", lockName)
logger.Warnf("could not acquire metadata lock for %s, lock is held by another connection", mdl.lockName)

// TODO: we could deal in error codes instead of string contains checks.
return fmt.Errorf("could not acquire metadata lock for %s, lock is held by another connection", mdl.lockName)
} else if answer != 1 {
// probably we never get here, but just in case
return fmt.Errorf("could not acquire metadata lock %s, GET_LOCK returned: %d", lockName, answer)
return fmt.Errorf("could not acquire metadata lock %s, GET_LOCK returned: %d", mdl.lockName, answer)
}
return nil
}

// Acquire the lock or return an error immediately
// We only Infof the initial acquisition.
logger.Infof("attempting to acquire metadata lock %s", lockName)
logger.Infof("attempting to acquire metadata lock %s", mdl.lockName)
if err = getLock(); err != nil {
return nil, err
}
Expand All @@ -98,7 +99,7 @@ func NewMetadataLock(ctx context.Context, dsn string, table *table.TableInfo, lo
select {
case <-ctx.Done():
// Close the dedicated connection to release the lock
logger.Warnf("releasing metadata lock for %s", lockName)
logger.Warnf("releasing metadata lock for %s", mdl.lockName)
mdl.closeCh <- mdl.db.Close()
return
case <-ticker.C:
Expand Down Expand Up @@ -131,7 +132,7 @@ func NewMetadataLock(ctx context.Context, dsn string, table *table.TableInfo, lo

logger.Infof("re-acquired metadata lock after re-establishing connection: %s.%s", table.SchemaName, table.TableName)
} else {
logger.Debugf("refreshed metadata lock for %s", lockName)
logger.Debugf("refreshed metadata lock for %s", mdl.lockName)
}
}
}
Expand All @@ -157,6 +158,10 @@ func (m *MetadataLock) CloseDBConnection(logger loggers.Advanced) error {
return nil
}

func (m *MetadataLock) GetLockName() string {
return m.lockName
}

func computeLockName(table *table.TableInfo) string {
schemaNamePart := table.SchemaName
if len(schemaNamePart) > 20 {
Expand Down
5 changes: 4 additions & 1 deletion pkg/migration/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2769,7 +2769,10 @@ func TestResumeFromCheckpointE2EWithManualSentinel(t *testing.T) {
// as tablename while the migration is running.
lock, err := dbconn.NewMetadataLock(ctx, testutils.DSN(),
&tableInfo, &testLogger{})
assert.ErrorContains(t, err, "could not acquire metadata lock for test.resume_checkpoint_e2e_w_sentinel, lock is held by another connection")
assert.Error(t, err)
if lock != nil {
assert.ErrorContains(t, err, fmt.Sprintf("could not acquire metadata lock for %s, lock is held by another connection", lock.GetLockName()))
}
assert.Nil(t, lock)
break
}
Expand Down

0 comments on commit 43e42a7

Please sign in to comment.