Skip to content

Commit

Permalink
Merge pull request #374 from Kiran01bm/compose-lockname-on-client
Browse files Browse the repository at this point in the history
compose lock name on spirit side - clients acquiring locks can get to know their exact lock name
  • Loading branch information
Kiran01bm authored Feb 19, 2025
2 parents 704b7ce + acf1a96 commit 76c1998
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 13 deletions.
46 changes: 34 additions & 12 deletions pkg/dbconn/metadatalock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package dbconn

import (
"context"
"crypto/sha1"
"database/sql"
"encoding/hex"
"errors"
"fmt"
"time"
Expand All @@ -25,6 +27,7 @@ type MetadataLock struct {
closeCh chan error
refreshInterval time.Duration
db *sql.DB
lockName string
}

func NewMetadataLock(ctx context.Context, dsn string, table *table.TableInfo, logger loggers.Advanced, optionFns ...func(*MetadataLock)) (*MetadataLock, error) {
Expand Down Expand Up @@ -60,32 +63,28 @@ func NewMetadataLock(ctx context.Context, dsn string, table *table.TableInfo, lo
// The hash is truncated to 8 characters to avoid the maximum lock length.
// bizarrely_long_schema_name.thisisareallylongtablenamethisisareallylongtablename60charac ==>
// bizarrely_long_schem.thisisareallylongtablenamethisis-66fec116
//
// The computation of the hash is done server-side to simplify the whole process,
// but that means we can't easily log the actual lock name used. If you want to do that
// in the future, just add another MySQL round-trip to compute the lock name server-side
// and then use the returned string in the GET_LOCK call.
stmt := sqlescape.MustEscapeSQL("SELECT GET_LOCK( concat(left(%?,20),'.',left(%?,32),'-',left(sha1(concat(%?,%?)),8)), %?)", table.SchemaName, table.TableName, table.SchemaName, table.TableName, getLockTimeout.Seconds())
mdl.lockName = computeLockName(table)
stmt := sqlescape.MustEscapeSQL("SELECT GET_LOCK(%?, %?)", mdl.lockName, getLockTimeout.Seconds())
if err := mdl.db.QueryRowContext(ctx, stmt).Scan(&answer); err != nil {
return fmt.Errorf("could not acquire metadata lock: %s", err)
}
if answer == 0 {
// 0 means the lock is held by another connection
// TODO: we could lookup the connection that holds the lock and report details about it
logger.Warnf("could not acquire metadata lock for %s.%s, lock is held by another connection", table.SchemaName, table.TableName)
logger.Warnf("could not acquire metadata lock for %s, lock is held by another connection", mdl.lockName)

// TODO: we could deal in error codes instead of string contains checks.
return fmt.Errorf("could not acquire metadata lock for %s.%s, lock is held by another connection", table.SchemaName, table.TableName)
return fmt.Errorf("could not acquire metadata lock for %s, lock is held by another connection", mdl.lockName)
} else if answer != 1 {
// probably we never get here, but just in case
return fmt.Errorf("could not acquire metadata lock for %s.%s, GET_LOCK returned: %d", table.SchemaName, table.TableName, answer)
return fmt.Errorf("could not acquire metadata lock %s, GET_LOCK returned: %d", mdl.lockName, answer)
}
return nil
}

// Acquire the lock or return an error immediately
// We only Infof the initial acquisition.
logger.Infof("attempting to acquire metadata lock for %s.%s", table.SchemaName, table.TableName)
logger.Infof("attempting to acquire metadata lock %s", mdl.lockName)
if err = getLock(); err != nil {
return nil, err
}
Expand All @@ -101,7 +100,7 @@ func NewMetadataLock(ctx context.Context, dsn string, table *table.TableInfo, lo
select {
case <-ctx.Done():
// Close the dedicated connection to release the lock
logger.Warnf("releasing metadata lock for %s.%s", table.SchemaName, table.TableName)
logger.Warnf("releasing metadata lock for %s", mdl.lockName)
mdl.closeCh <- mdl.db.Close()
return
case <-ticker.C:
Expand Down Expand Up @@ -134,7 +133,7 @@ func NewMetadataLock(ctx context.Context, dsn string, table *table.TableInfo, lo

logger.Infof("re-acquired metadata lock after re-establishing connection: %s.%s", table.SchemaName, table.TableName)
} else {
logger.Debugf("refreshed metadata lock for %s.%s", table.SchemaName, table.TableName)
logger.Debugf("refreshed metadata lock for %s", mdl.lockName)
}
}
}
Expand All @@ -159,3 +158,26 @@ func (m *MetadataLock) CloseDBConnection(logger loggers.Advanced) error {
}
return nil
}

func (m *MetadataLock) GetLockName() string {
return m.lockName
}

func computeLockName(table *table.TableInfo) string {
schemaNamePart := table.SchemaName
if len(schemaNamePart) > 20 {
schemaNamePart = schemaNamePart[:20]
}

tableNamePart := table.TableName
if len(tableNamePart) > 32 {
tableNamePart = tableNamePart[:32]
}

hash := sha1.New()
hash.Write([]byte(table.SchemaName + table.TableName))
hashPart := hex.EncodeToString(hash.Sum(nil))[:8]

lockName := fmt.Sprintf("%s.%s-%s", schemaNamePart, tableNamePart, hashPart)
return lockName
}
22 changes: 22 additions & 0 deletions pkg/dbconn/metadatalock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,28 @@ func TestMetadataLockRefresh(t *testing.T) {
assert.NoError(t, mdl.Close())
}

func TestComputeLockName(t *testing.T) {
tests := []struct {
table *table.TableInfo
expected string
}{
{
table: &table.TableInfo{SchemaName: "shortschema", TableName: "shorttable"},
expected: "shortschema.shorttable-",
},
{
table: &table.TableInfo{SchemaName: "averylongschemanamethatexceeds20chars", TableName: "averylongtablenamewhichexceeds32characters"},
expected: "averylongschemanamet.averylongtablenamewhichexceeds32-",
},
}

for _, test := range tests {
lockName := computeLockName(test.table)
assert.Contains(t, lockName, test.expected, "Lock name should contain the expected prefix")
assert.Len(t, lockName, len(test.expected)+8, "Lock name should have the correct length")
}
}

func TestMetadataLockLength(t *testing.T) {
lockTableInfo := table.TableInfo{SchemaName: "test", TableName: "thisisareallylongtablenamethisisareallylongtablenamethisisareallylongtablename"}
var empty *table.TableInfo
Expand Down
5 changes: 4 additions & 1 deletion pkg/migration/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2769,7 +2769,10 @@ func TestResumeFromCheckpointE2EWithManualSentinel(t *testing.T) {
// as tablename while the migration is running.
lock, err := dbconn.NewMetadataLock(ctx, testutils.DSN(),
&tableInfo, &testLogger{})
assert.ErrorContains(t, err, "could not acquire metadata lock for test.resume_checkpoint_e2e_w_sentinel, lock is held by another connection")
assert.Error(t, err)
if lock != nil {
assert.ErrorContains(t, err, fmt.Sprintf("could not acquire metadata lock for %s, lock is held by another connection", lock.GetLockName()))
}
assert.Nil(t, lock)
break
}
Expand Down

0 comments on commit 76c1998

Please sign in to comment.