diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 24964dd18..9d022e7a0 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -51,6 +51,7 @@ type WakuRelay struct { minPeersToPublish int + hostMutex sync.Mutex topicValidatorMutex sync.RWMutex topicValidators map[string][]validatorFn defaultTopicValidators []validatorFn @@ -103,25 +104,31 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou } func (w *WakuRelay) peerScoreInspector(peerScoresSnapshots map[peer.ID]*pubsub.PeerScoreSnapshot) { - if w.host == nil { + w.hostMutex.Lock() + host := w.host + w.hostMutex.Unlock() + + if host == nil { return } for pid, snap := range peerScoresSnapshots { if snap.Score < w.peerScoreThresholds.GraylistThreshold { // Disconnect bad peers - err := w.host.Network().ClosePeer(pid) + err := host.Network().ClosePeer(pid) if err != nil { w.log.Error("could not disconnect peer", logging.HostID("peer", pid), zap.Error(err)) } } - _ = w.host.Peerstore().(wps.WakuPeerstore).SetScore(pid, snap.Score) + _ = host.Peerstore().(wps.WakuPeerstore).SetScore(pid, snap.Score) } } // SetHost sets the host to be able to mount or consume a protocol func (w *WakuRelay) SetHost(h host.Host) { + w.hostMutex.Lock() w.host = h + w.hostMutex.Unlock() } // Start initiates the WakuRelay protocol @@ -155,8 +162,8 @@ func (w *WakuRelay) PubSub() *pubsub.PubSub { // Topics returns a list of all the pubsub topics currently subscribed to func (w *WakuRelay) Topics() []string { - defer w.topicsMutex.RUnlock() w.topicsMutex.RLock() + defer w.topicsMutex.RUnlock() var result []string for topic := range w.topics {