Skip to content

Commit

Permalink
Merge pull request #357 from kolbe/lock-name-hash
Browse files Browse the repository at this point in the history
use a hash for metadata lock name
  • Loading branch information
kolbe authored Dec 3, 2024
2 parents d1dc655 + 3a1f4b9 commit 1d7524e
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 31 deletions.
36 changes: 23 additions & 13 deletions pkg/dbconn/metadatalock.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"time"

"github.com/cashapp/spirit/pkg/table"

"github.com/cashapp/spirit/pkg/dbconn/sqlescape"
"github.com/siddontang/loggers"
)
Expand All @@ -23,12 +25,9 @@ type MetadataLock struct {
refreshInterval time.Duration
}

func NewMetadataLock(ctx context.Context, dsn string, lockName string, logger loggers.Advanced, optionFns ...func(*MetadataLock)) (*MetadataLock, error) {
if len(lockName) == 0 {
return nil, errors.New("metadata lock name is empty")
}
if len(lockName) > 64 {
return nil, fmt.Errorf("metadata lock name is too long: %d, max length is 64", len(lockName))
func NewMetadataLock(ctx context.Context, dsn string, table *table.TableInfo, logger loggers.Advanced, optionFns ...func(*MetadataLock)) (*MetadataLock, error) {
if table == nil {
return nil, errors.New("metadata lock table info is nil")
}

mdl := &MetadataLock{
Expand All @@ -52,28 +51,39 @@ func NewMetadataLock(ctx context.Context, dsn string, lockName string, logger lo
getLock := func() error {
// https://dev.mysql.com/doc/refman/8.0/en/locking-functions.html#function_get-lock
var answer int
stmt := sqlescape.MustEscapeSQL("SELECT GET_LOCK(%?, %?)", lockName, getLockTimeout.Seconds())
// Using the table name alone as the lock name runs into the 64-character limit on
// lock names and causes conflicts between tables with the same name in different schemas.
// Instead, we build the lock name from the schema name and the table name plus a hash of both.
// The schema name is truncated to 20 characters, the table name to 32 and the SHA-1 hash to 8,
// so with the '.' and '-' separators the lock name is at most 62 characters long.
// bizarrely_long_schema_name.thisisareallylongtablenamethisisareallylongtablename60charac ==>
// bizarrely_long_schem.thisisareallylongtablenamethisis-66fec116
//
// The computation of the hash is done server-side to simplify the whole process,
// but that means we can't easily log the actual lock name used. If you want to do that
// in the future, just add another MySQL round-trip to compute the lock name server-side
// and then use the returned string in the GET_LOCK call.
stmt := sqlescape.MustEscapeSQL("SELECT GET_LOCK( concat(left(%?,20),'.',left(%?,32),'-',left(sha1(concat(%?,%?)),8)), %?)", table.SchemaName, table.TableName, table.SchemaName, table.TableName, getLockTimeout.Seconds())
if err := dbConn.QueryRowContext(ctx, stmt).Scan(&answer); err != nil {
return fmt.Errorf("could not acquire metadata lock: %s", err)
}
if answer == 0 {
// 0 means the lock is held by another connection
// TODO: we could lookup the connection that holds the lock and report details about it
return fmt.Errorf("could not acquire metadata lock: %s, lock is held by another connection", lockName)
return fmt.Errorf("could not acquire metadata lock for %s.%s, lock is held by another connection", table.SchemaName, table.TableName)
} else if answer != 1 {
// probably we never get here, but just in case
return fmt.Errorf("could not acquire metadata lock: %s, GET_LOCK returned: %d", lockName, answer)
return fmt.Errorf("could not acquire metadata lock for %s.%s, GET_LOCK returned: %d", table.SchemaName, table.TableName, answer)
}
return nil
}

// Acquire the lock or return an error immediately
// We only Infof the initial acquisition.
logger.Infof("attempting to acquire metadata lock: %s", lockName)
logger.Infof("attempting to acquire metadata lock for %s.%s", table.SchemaName, table.TableName)
if err = getLock(); err != nil {
return nil, err
}
logger.Infof("acquired metadata lock: %s", lockName)
logger.Infof("acquired metadata lock: %s.%s", table.SchemaName, table.TableName)

// Setup background refresh runner
ctx, mdl.cancel = context.WithCancel(ctx)
Expand All @@ -85,7 +95,7 @@ func NewMetadataLock(ctx context.Context, dsn string, lockName string, logger lo
select {
case <-ctx.Done():
// Close the dedicated connection to release the lock
logger.Warnf("releasing metadata lock: %s", lockName)
logger.Warnf("releasing metadata lock for %s.%s", table.SchemaName, table.TableName)
mdl.closeCh <- dbConn.Close()
return
case <-ticker.C:
Expand All @@ -97,7 +107,7 @@ func NewMetadataLock(ctx context.Context, dsn string, lockName string, logger lo
// and we can try again on the next tick interval.
logger.Warnf("could not refresh metadata lock: %s", err)
} else {
logger.Debugf("refreshed metadata lock: %s", lockName)
logger.Debugf("refreshed metadata lock for %s.%s", table.SchemaName, table.TableName)
}
}
}
Expand Down
33 changes: 18 additions & 15 deletions pkg/dbconn/metadatalock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,39 @@ import (
"testing"
"time"

"github.com/cashapp/spirit/pkg/table"

"github.com/cashapp/spirit/pkg/testutils"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
)

func TestMetadataLock(t *testing.T) {
lockName := "test"
lockTableInfo := table.TableInfo{SchemaName: "test", TableName: "test"}
logger := logrus.New()
mdl, err := NewMetadataLock(context.Background(), testutils.DSN(), lockName, logger)
mdl, err := NewMetadataLock(context.Background(), testutils.DSN(), &lockTableInfo, logger)
assert.NoError(t, err)
assert.NotNil(t, mdl)

// Confirm a second lock cannot be acquired
_, err = NewMetadataLock(context.Background(), testutils.DSN(), lockName, logger)
_, err = NewMetadataLock(context.Background(), testutils.DSN(), &lockTableInfo, logger)
assert.ErrorContains(t, err, "lock is held by another connection")

// Close the original mdl
assert.NoError(t, mdl.Close())

// Confirm a new lock can be acquired
mdl3, err := NewMetadataLock(context.Background(), testutils.DSN(), lockName, logger)
mdl3, err := NewMetadataLock(context.Background(), testutils.DSN(), &lockTableInfo, logger)
assert.NoError(t, err)
assert.NoError(t, mdl3.Close())
}

func TestMetadataLockContextCancel(t *testing.T) {
lockName := "test-cancel"
lockTableInfo := table.TableInfo{SchemaName: "test", TableName: "test-cancel"}

logger := logrus.New()
ctx, cancel := context.WithCancel(context.Background())
mdl, err := NewMetadataLock(ctx, testutils.DSN(), lockName, logger)
mdl, err := NewMetadataLock(ctx, testutils.DSN(), &lockTableInfo, logger)
assert.NoError(t, err)
assert.NotNil(t, mdl)

Expand All @@ -46,16 +48,16 @@ func TestMetadataLockContextCancel(t *testing.T) {
<-mdl.closeCh

// Confirm the lock is released by acquiring a new one
mdl2, err := NewMetadataLock(context.Background(), testutils.DSN(), lockName, logger)
mdl2, err := NewMetadataLock(context.Background(), testutils.DSN(), &lockTableInfo, logger)
assert.NoError(t, err)
assert.NotNil(t, mdl2)
assert.NoError(t, mdl2.Close())
}

func TestMetadataLockRefresh(t *testing.T) {
lockName := "test-refresh"
lockTableInfo := table.TableInfo{SchemaName: "test", TableName: "test-refresh"}
logger := logrus.New()
mdl, err := NewMetadataLock(context.Background(), testutils.DSN(), lockName, logger, func(mdl *MetadataLock) {
mdl, err := NewMetadataLock(context.Background(), testutils.DSN(), &lockTableInfo, logger, func(mdl *MetadataLock) {
// override the refresh interval for faster testing
mdl.refreshInterval = 2 * time.Second
})
Expand All @@ -66,22 +68,23 @@ func TestMetadataLockRefresh(t *testing.T) {
time.Sleep(5 * time.Second)

// Confirm the lock is still held
_, err = NewMetadataLock(context.Background(), testutils.DSN(), lockName, logger)
_, err = NewMetadataLock(context.Background(), testutils.DSN(), &lockTableInfo, logger)
assert.ErrorContains(t, err, "lock is held by another connection")

// Close the lock
assert.NoError(t, mdl.Close())
}

func TestMetadataLockLength(t *testing.T) {
long := "thisisareallylongtablenamethisisareallylongtablenamethisisareallylongtablename"
empty := ""
lockTableInfo := table.TableInfo{SchemaName: "test", TableName: "thisisareallylongtablenamethisisareallylongtablenamethisisareallylongtablename"}
var empty *table.TableInfo

logger := logrus.New()

_, err := NewMetadataLock(context.Background(), testutils.DSN(), long, logger)
assert.ErrorContains(t, err, "metadata lock name is too long")
_, err := NewMetadataLock(context.Background(), testutils.DSN(), &lockTableInfo, logger)
// No error anymore after using a hash of the table name
assert.NoError(t, err)

_, err = NewMetadataLock(context.Background(), testutils.DSN(), empty, logger)
assert.ErrorContains(t, err, "metadata lock name is empty")
assert.ErrorContains(t, err, "metadata lock table info is nil")
}
2 changes: 1 addition & 1 deletion pkg/migration/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (r *Runner) Run(originalCtx context.Context) error {
}

// Take a metadata lock to prevent other migrations from running concurrently.
r.metadataLock, err = dbconn.NewMetadataLock(ctx, r.dsn(), r.table.TableName, r.logger)
r.metadataLock, err = dbconn.NewMetadataLock(ctx, r.dsn(), r.table, r.logger)
if err != nil {
return err
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/migration/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2711,6 +2711,8 @@ func TestResumeFromCheckpointE2EWithManualSentinel(t *testing.T) {
statusInterval = 500 * time.Millisecond

tableName := `resume_checkpoint_e2e_w_sentinel`
tableInfo := table.TableInfo{SchemaName: "test", TableName: tableName}

testutils.RunSQL(t, fmt.Sprintf(`DROP TABLE IF EXISTS %s, _%s_old, _%s_chkpnt, _%s_sentinel`, tableName, tableName, tableName, tableName))
table := fmt.Sprintf(`CREATE TABLE %s (
id int(11) NOT NULL AUTO_INCREMENT,
Expand Down Expand Up @@ -2766,8 +2768,8 @@ func TestResumeFromCheckpointE2EWithManualSentinel(t *testing.T) {
// Test that it's not possible to acquire a metadata lock for the same
// schema and table while the migration is running.
lock, err := dbconn.NewMetadataLock(ctx, testutils.DSN(),
tableName, &testLogger{})
assert.ErrorContains(t, err, "could not acquire metadata lock: resume_checkpoint_e2e_w_sentinel, lock is held by another connection")
&tableInfo, &testLogger{})
assert.ErrorContains(t, err, "could not acquire metadata lock for test.resume_checkpoint_e2e_w_sentinel, lock is held by another connection")
assert.Nil(t, lock)
break
}
Expand Down

0 comments on commit 1d7524e

Please sign in to comment.