Skip to content

Commit

Permalink
More generalized handling of lock errors
Browse files Browse the repository at this point in the history
  • Loading branch information
latenssi committed Sep 2, 2021
1 parent 6aa7328 commit 7f703c5
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 5 deletions.
10 changes: 6 additions & 4 deletions chain_events/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,9 @@ func (l *Listener) Start() *Listener {
}

if err := l.initHeight(); err != nil {
if strings.Contains(err.Error(), "could not obtain lock on row") {
// Skip as another listener is already handling this
_, ok := err.(*LockError)
if ok {
// Skip LockError as it means another listener is already handling this
} else {
panic(err)
}
Expand Down Expand Up @@ -132,8 +133,9 @@ func (l *Listener) Start() *Listener {
})

if lockErr != nil {
if strings.Contains(lockErr.Error(), "could not obtain lock on row") {
// Skip as another listener is already handling this round
_, ok := lockErr.(*LockError)
if ok {
// Skip on LockError as it means another listener is already handling this round
continue
}
l.handleError(lockErr)
Expand Down
8 changes: 8 additions & 0 deletions chain_events/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,11 @@ package chain_events
type Store interface {
LockedStatus(func(*ListenerStatus) error) error
}

type LockError struct {
Err error
}

func (e *LockError) Error() string {
return e.Err.Error()
}
11 changes: 10 additions & 1 deletion chain_events/store_gorm.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package chain_events

import (
"strings"

"gorm.io/gorm"
"gorm.io/gorm/clause"
)
Expand All @@ -18,7 +20,7 @@ func NewGormStore(db *gorm.DB) *GormStore {
// It takes a function 'fn' as argument. In the context of 'fn' 'status' is locked.
// Uses NOWAIT modifier on the lock so simultaneous requests can be ignored.
func (s *GormStore) LockedStatus(fn func(status *ListenerStatus) error) error {
return s.db.Transaction(func(tx *gorm.DB) error {
txErr := s.db.Transaction(func(tx *gorm.DB) error {
status := ListenerStatus{}

if err := tx.Clauses(clause.Locking{Strength: "UPDATE", Options: "NOWAIT"}).FirstOrCreate(&status).Error; err != nil {
Expand All @@ -35,4 +37,11 @@ func (s *GormStore) LockedStatus(fn func(status *ListenerStatus) error) error {

return nil // commit
})

// Need to handle implementation specific error message
if txErr != nil && strings.Contains(txErr.Error(), "could not obtain lock on row") {
return &LockError{Err: txErr}
}

return txErr
}

0 comments on commit 7f703c5

Please sign in to comment.