Skip to content

Commit

Permalink
Merge pull request #371 from Kiran01bm/mdlrefresh-stuff
Browse files Browse the repository at this point in the history
Enhance the metadata lock (MDL) refresh runner to resiliently handle connection closures and temporary network issues
  • Loading branch information
morgo authored Feb 13, 2025
2 parents 6170ede + bf7d314 commit f96a72c
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 3 deletions.
42 changes: 39 additions & 3 deletions pkg/dbconn/metadatalock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dbconn

import (
"context"
"database/sql"
"errors"
"fmt"
"time"
Expand All @@ -23,6 +24,7 @@ type MetadataLock struct {
cancel context.CancelFunc
closeCh chan error
refreshInterval time.Duration
db *sql.DB
}

func NewMetadataLock(ctx context.Context, dsn string, table *table.TableInfo, logger loggers.Advanced, optionFns ...func(*MetadataLock)) (*MetadataLock, error) {
Expand All @@ -42,7 +44,8 @@ func NewMetadataLock(ctx context.Context, dsn string, table *table.TableInfo, lo
// Setup the dedicated connection for this lock
dbConfig := NewDBConfig()
dbConfig.MaxOpenConnections = 1
dbConn, err := New(dsn, dbConfig)
var err error
mdl.db, err = New(dsn, dbConfig)
if err != nil {
return nil, err
}
Expand All @@ -63,12 +66,15 @@ func NewMetadataLock(ctx context.Context, dsn string, table *table.TableInfo, lo
// in the future, just add another MySQL round-trip to compute the lock name server-side
// and then use the returned string in the GET_LOCK call.
stmt := sqlescape.MustEscapeSQL("SELECT GET_LOCK( concat(left(%?,20),'.',left(%?,32),'-',left(sha1(concat(%?,%?)),8)), %?)", table.SchemaName, table.TableName, table.SchemaName, table.TableName, getLockTimeout.Seconds())
if err := dbConn.QueryRowContext(ctx, stmt).Scan(&answer); err != nil {
if err := mdl.db.QueryRowContext(ctx, stmt).Scan(&answer); err != nil {
return fmt.Errorf("could not acquire metadata lock: %s", err)
}
if answer == 0 {
// 0 means the lock is held by another connection
// TODO: we could lookup the connection that holds the lock and report details about it
logger.Warnf("could not acquire metadata lock for %s.%s, lock is held by another connection", table.SchemaName, table.TableName)

// TODO: we could deal in error codes instead of string contains checks.
return fmt.Errorf("could not acquire metadata lock for %s.%s, lock is held by another connection", table.SchemaName, table.TableName)
} else if answer != 1 {
// probably we never get here, but just in case
Expand Down Expand Up @@ -96,7 +102,7 @@ func NewMetadataLock(ctx context.Context, dsn string, table *table.TableInfo, lo
case <-ctx.Done():
// Close the dedicated connection to release the lock
logger.Warnf("releasing metadata lock for %s.%s", table.SchemaName, table.TableName)
mdl.closeCh <- dbConn.Close()
mdl.closeCh <- mdl.db.Close()
return
case <-ticker.C:
if err = getLock(); err != nil {
Expand All @@ -106,6 +112,27 @@ func NewMetadataLock(ctx context.Context, dsn string, table *table.TableInfo, lo
// that we did not make. This makes it a warning, not an error,
// and we can try again on the next tick interval.
logger.Warnf("could not refresh metadata lock: %s", err)

// try to close the existing connection
if closeErr := mdl.db.Close(); closeErr != nil {
logger.Warnf("could not close database connection: %s", closeErr)
continue
}

// try to re-establish the connection
mdl.db, err = New(dsn, dbConfig)
if err != nil {
logger.Warnf("could not re-establish database connection: %s", err)
continue
}

// try to acquire the lock again with the new connection
if err = getLock(); err != nil {
logger.Warnf("could not acquire metadata lock after re-establishing connection: %s", err)
continue
}

logger.Infof("re-acquired metadata lock after re-establishing connection: %s.%s", table.SchemaName, table.TableName)
} else {
logger.Debugf("refreshed metadata lock for %s.%s", table.SchemaName, table.TableName)
}
Expand All @@ -123,3 +150,12 @@ func (m *MetadataLock) Close() error {
// Wait for the dedicated connection to be closed and return its error (if any)
return <-m.closeCh
}

// CloseDBConnection closes the database connection stored on the
// MetadataLock, if one is set. It first logs an informational message via
// logger, then returns the error from closing the connection. When no
// connection is set it returns nil.
func (m *MetadataLock) CloseDBConnection(logger loggers.Advanced) error {
	logger.Infof("About to close MetadataLock database connection")
	if m.db == nil {
		// Nothing to close.
		return nil
	}
	return m.db.Close()
}
40 changes: 40 additions & 0 deletions pkg/dbconn/metadatalock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/cashapp/spirit/pkg/table"
"github.com/siddontang/loggers"

"github.com/cashapp/spirit/pkg/testutils"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -57,6 +58,7 @@ func TestMetadataLockContextCancel(t *testing.T) {
func TestMetadataLockRefresh(t *testing.T) {
lockTableInfo := table.TableInfo{SchemaName: "test", TableName: "test-refresh"}
logger := logrus.New()

mdl, err := NewMetadataLock(context.Background(), testutils.DSN(), &lockTableInfo, logger, func(mdl *MetadataLock) {
// override the refresh interval for faster testing
mdl.refreshInterval = 2 * time.Second
Expand Down Expand Up @@ -88,3 +90,41 @@ func TestMetadataLockLength(t *testing.T) {
_, err = NewMetadataLock(context.Background(), testutils.DSN(), empty, logger)
assert.ErrorContains(t, err, "metadata lock table info is nil")
}

// simulateConnectionClose simulates a temporary network issue by closing the
// MetadataLock's database connection through CloseDBConnection, asserting
// that the close succeeded, and then pausing for one second before returning.
func simulateConnectionClose(t *testing.T, mdl *MetadataLock, logger loggers.Advanced) {
	// Mark this function as a test helper so that assertion failures are
	// reported at the calling test's line rather than inside this function.
	t.Helper()

	// close the existing connection to simulate a network issue
	err := mdl.CloseDBConnection(logger)
	assert.NoError(t, err)

	// wait a bit to ensure the connection is closed
	time.Sleep(1 * time.Second)
}

// TestMetadataLockRefreshWithConnIssueSimulation acquires a metadata lock
// with a 2 second refresh interval, closes its database connection to
// simulate a network issue, waits for a refresh tick, and then confirms the
// lock is still held by checking that a second acquisition attempt fails with
// "lock is held by another connection".
func TestMetadataLockRefreshWithConnIssueSimulation(t *testing.T) {
	lockTableInfo := table.TableInfo{SchemaName: "test", TableName: "test-refresh"}
	logger := logrus.New()
	logger.SetLevel(logrus.DebugLevel)

	// create a new MetadataLock with a short refresh interval for testing
	mdl, err := NewMetadataLock(context.Background(), testutils.DSN(), &lockTableInfo, logger, func(mdl *MetadataLock) {
		mdl.refreshInterval = 2 * time.Second
	})
	// Abort immediately if setup failed: the assert helpers only record the
	// failure, and continuing with a nil mdl would panic further down.
	if !assert.NoError(t, err) || !assert.NotNil(t, mdl) {
		t.FailNow()
	}

	time.Sleep(4 * time.Second)

	// simulate a temporary network issue by closing the connection
	simulateConnectionClose(t, mdl, logger)

	// wait for the refresh interval to trigger the connection failure and recovery
	time.Sleep(4 * time.Second)

	// confirm the lock is still held by attempting to acquire it with a new connection
	second, err := NewMetadataLock(context.Background(), testutils.DSN(), &lockTableInfo, logger)
	assert.ErrorContains(t, err, "lock is held by another connection")
	if err == nil && second != nil {
		// The second acquisition unexpectedly succeeded (already reported as
		// a failure above); release it so its lock does not outlive the test.
		assert.NoError(t, second.Close())
	}

	// close the lock
	assert.NoError(t, mdl.Close())
}

0 comments on commit f96a72c

Please sign in to comment.