diff --git a/gateway/connections/state.go b/gateway/connections/state.go index f6909a4..42ac436 100644 --- a/gateway/connections/state.go +++ b/gateway/connections/state.go @@ -33,6 +33,7 @@ import ( "context" "crypto/tls" "fmt" + "github.com/go-zookeeper/zk" "github.com/openconfig/gnmi/errlist" "sync" "time" @@ -236,22 +237,27 @@ func (t *ConnectionState) connectWithLock(connectionSlot *semaphore.Weighted) { var connectionSlotAcquired = false for !t.stopped { if !connectionSlotAcquired { + t.config.Log.Info().Msgf("Target %s: Acquiring connection slot", t.name) connectionSlotAcquired = connectionSlot.TryAcquire(1) } if connectionSlotAcquired { if !t.ConnectionLockAcquired { + t.config.Log.Info().Msgf("Target %s: Acquiring lock", t.name) var err error t.ConnectionLockAcquired, err = t.lock.Try() if err != nil { - t.config.Log.Error().Msgf("error while trying to acquire lock: %v", err) + t.config.Log.Error().Msgf("Target %s: error while trying to acquire lock: %v", t.name, err) + time.Sleep(2 * time.Second) } } if t.ConnectionLockAcquired { t.config.Log.Info().Msgf("Target %s: Lock acquired", t.name) t.doConnect() - err := t.lock.Unlock() - if err != nil { - t.config.Log.Warn().Err(err).Msgf("Target %s: error while releasing lock: %v", t.name, err) + if t.lock.LockAcquired() { + err := t.lock.Unlock() + if err != nil && err != zk.ErrNotLocked { + t.config.Log.Error().Msgf("Target %s: error while releasing lock: %v", t.name, err) + } } t.ConnectionLockAcquired = false t.config.Log.Info().Msgf("Target %s: Lock released", t.name) @@ -260,6 +266,7 @@ func (t *ConnectionState) connectWithLock(connectionSlot *semaphore.Weighted) { } } } + t.config.Log.Info().Msgf("Target %s: Stopped", t.name) if connectionSlotAcquired { connectionSlot.Release(1) } diff --git a/gateway/connections/zookeeper.go b/gateway/connections/zookeeper.go index 57cc1b8..dede961 100644 --- a/gateway/connections/zookeeper.go +++ b/gateway/connections/zookeeper.go @@ -66,7 +66,9 @@ func (c *ZookeeperConnectionManager) eventListener(zkEvents <-chan zk.Event) { for _, targetConfig := range c.connections { if targetConfig.useLock { err := targetConfig.unlock() - c.config.Log.Error().Msgf("error while unlocking target: %v", err) + if err != nil { + c.config.Log.Error().Msgf("error while unlocking target: %v", err) + } } } c.connectionsMutex.Unlock() diff --git a/gateway/gateway.go b/gateway/gateway.go index 28901a3..42b2931 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -425,7 +425,7 @@ func (g *Gateway) zookeeperEventHandler(zkEventChan <-chan zk.Event) { switch event.State { case zk.StateDisconnected: disconnectedCount++ - if disconnectedCount > 5 { + if disconnectedCount > 50 { panic("too many Zookeeper disconnects") } case zk.StateHasSession: diff --git a/gateway/locking/local.go b/gateway/locking/local.go index 125db95..f025f22 100644 --- a/gateway/locking/local.go +++ b/gateway/locking/local.go @@ -40,6 +40,10 @@ func NewNonBlockingLock(id string, member string) DistributedLocker { } } +func (l *NonBlockingLock) LockAcquired() bool { + return l.acquired +} + func (l *NonBlockingLock) GetMember(id string) (string, error) { val, exists := registry.Load(id) if exists { diff --git a/gateway/locking/locker.go b/gateway/locking/locker.go index 82b0241..b5c22c7 100644 --- a/gateway/locking/locker.go +++ b/gateway/locking/locker.go @@ -18,14 +18,16 @@ package locking // DistributedLocker is an interface for creating non-blocking locks // among distributed processes. type DistributedLocker interface { + // LockAcquired returns true if the lock is currently acquired. + LockAcquired() bool // Try to acquire the lock. If the lock is already acquired return true and // a deadlock error. Try() (bool, error) // Unlock the lock. Unlock() error - // Return the ID for this lock. + // ID returns the ID or lock path for this lock. ID() string - // Get the member that currently has the lock for the ID, if it's currently + // GetMember gets the member that currently has the lock for the provided ID, if it's currently // locked, otherwise return an empty string. GetMember(id string) (string, error) } diff --git a/gateway/locking/zookeeper.go b/gateway/locking/zookeeper.go index 78dbf34..3b676dc 100644 --- a/gateway/locking/zookeeper.go +++ b/gateway/locking/zookeeper.go @@ -46,22 +46,27 @@ package locking import ( "fmt" + "github.com/rs/zerolog/log" "strconv" "strings" + "sync" "time" "github.com/go-zookeeper/zk" ) +var _ DistributedLocker = new(ZookeeperNonBlockingLock) + type ZookeeperNonBlockingLock struct { acquired bool conn *zk.Conn // The member that is holding the lock. This is usually the address and port where the cluster member is reachable. - member string - id string - acl []zk.ACL - lockPath string - seq int + member string + id string + acl []zk.ACL + lockPath string + seq int + unlockMutex sync.Mutex } // NewZookeeperNonBlockingLock creates a new lock instance using the provided connection, path, and acl. @@ -77,6 +82,10 @@ func NewZookeeperNonBlockingLock(conn *zk.Conn, id string, member string, acl [] } } +func (l *ZookeeperNonBlockingLock) LockAcquired() bool { + return l.acquired +} + func GetMember(conn *zk.Conn, id string) (string, error) { trimmedID := "/" + strings.Trim(id, "/") _, lowestSeqPath, err := lowestSeqChild(conn, trimmedID) @@ -229,13 +238,16 @@ func (l *ZookeeperNonBlockingLock) lowestSeqChild(path string) (int, string, err func (l *ZookeeperNonBlockingLock) watchState() { currentState := l.conn.State() - for l.acquired && (currentState == zk.StateConnected || currentState == zk.StateHasSession) { + for l.LockAcquired() && (currentState == zk.StateConnected || currentState == zk.StateHasSession) { time.Sleep(500 * time.Millisecond) currentState = l.conn.State() } - // disconnected - if l.acquired { - l.released() + // zk is disconnected + if l.LockAcquired() { + err := l.Unlock() + if err != nil { + log.Error().Msgf("watchState: unable to unlock zookeeper lock: %v", err) + } } } @@ -243,19 +255,19 @@ func (l *ZookeeperNonBlockingLock) watchState() { // this Lock instance than ErrNotLocked is returned. // This should only be called if we're still connected. func (l *ZookeeperNonBlockingLock) Unlock() error { + l.unlockMutex.Lock() + defer l.unlockMutex.Unlock() if l.lockPath == "" { return zk.ErrNotLocked } if err := l.conn.Delete(l.lockPath, -1); err != nil { return fmt.Errorf("unable to release lock gracefully: %s", err) } - l.released() - //log.Info().Msg("Cluster lock released.") - return nil -} -func (l *ZookeeperNonBlockingLock) released() { + // reset the lock internals l.lockPath = "" l.seq = 0 l.acquired = false + log.Info().Msg("Cluster lock released.") + return nil }