fix: filter network change handling (#1270)
chaitanyaprem authored Jan 3, 2025
1 parent c0afa07 commit 4ef460c
Showing 3 changed files with 91 additions and 23 deletions.
108 changes: 88 additions & 20 deletions waku/v2/api/filter/filter_manager.go
@@ -26,6 +26,7 @@ import (
// filterSubscriptions is the map of filter subscription IDs to subscriptions

const filterSubBatchSize = 90
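// initNetworkConnType is a sentinel meaning no connection type has been reported yet (see OnConnectionStatusChange)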
const initNetworkConnType = 255

type appFilterMap map[string]filterConfig

@@ -43,6 +44,7 @@ type FilterManager struct {
filterConfigs appFilterMap // map of application filterID to {aggregatedFilterID, application ContentFilter}
waitingToSubQueue chan filterConfig
envProcessor EnevelopeProcessor
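	// last connection type reported via OnConnectionStatusChange; initNetworkConnType until the first report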
networkConnType byte
}

type SubDetails struct {
@@ -76,6 +78,7 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter
mgr.incompleteFilterBatch = make(map[string]filterConfig)
mgr.filterConfigs = make(appFilterMap)
mgr.waitingToSubQueue = make(chan filterConfig, 100)
mgr.networkConnType = initNetworkConnType

//parsing the subscribe params only to read the batchInterval passed.
mgr.params = new(subscribeParameters)
@@ -114,8 +117,8 @@ func (mgr *FilterManager) startFilterSubLoop() {
}
}

-// addFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch
-// once batchlimit is hit, all filters are subscribed to and new batch is created.
+// SubscribeFilter checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch.
+// Once the batch limit is hit, all filters in the batch are subscribed and a new batch is created.
// if node is not online, then batch is pushed to a queue to be picked up later for subscription and new batch is created

func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFilter) {
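(The body of SubscribeFilter is elided in this diff. As a rough illustration of the batching flow the comment above describes, here is a minimal, self-contained sketch; the type and field names are assumptions for illustration, not the actual implementation.)

package main

import "fmt"

const filterSubBatchSize = 90

// contentFilter is an illustrative stand-in for protocol.ContentFilter.
type contentFilter struct {
	pubsubTopic  string
	contentTopic string
}

// batcher is an illustrative stand-in for the batching state kept by FilterManager.
type batcher struct {
	online  bool
	batches map[string][]contentFilter // pubsubTopic -> filters in the current batch
	queue   chan []contentFilter       // batches parked while the node is offline
}

func (b *batcher) subscribeFilter(cf contentFilter) {
	b.batches[cf.pubsubTopic] = append(b.batches[cf.pubsubTopic], cf)
	if !b.online {
		// node is offline: park the batch in a queue to be picked up later
		b.queue <- b.batches[cf.pubsubTopic]
		delete(b.batches, cf.pubsubTopic)
		return
	}
	if len(b.batches[cf.pubsubTopic]) >= filterSubBatchSize {
		// batch limit hit: subscribe the whole batch as one request and start a new batch
		batch := b.batches[cf.pubsubTopic]
		delete(b.batches, cf.pubsubTopic)
		fmt.Printf("subscribing %d filters on %s\n", len(batch), batch[0].pubsubTopic)
	}
}

func main() {
	b := &batcher{online: true, batches: make(map[string][]contentFilter), queue: make(chan []contentFilter, 100)}
	for i := 0; i < filterSubBatchSize; i++ {
		b.subscribeFilter(contentFilter{pubsubTopic: "/waku/2/rs/16/32", contentTopic: fmt.Sprintf("/app/1/t%d/proto", i)})
	}
}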
@@ -182,37 +185,102 @@ func (mgr *FilterManager) NetworkChange() {
mgr.node.PingPeers() // ping all peers to check if subscriptions are alive
}

func (mgr *FilterManager) checkAndProcessQueue(pubsubTopic string) {
if len(mgr.waitingToSubQueue) > 0 {
for af := range mgr.waitingToSubQueue {
// TODO: change the below logic once topic specific health is implemented for lightClients
if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic {
// check if any filter subs are pending and subscribe them
mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter))
go mgr.subscribeAndRunLoop(af)
} else {
mgr.waitingToSubQueue <- af
}
if len(mgr.waitingToSubQueue) == 0 {
mgr.logger.Debug("no pending subscriptions")
break
}
}
}
}
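(checkAndProcessQueue treats the buffered channel as a FIFO and pushes non-matching entries back onto it. For illustration only, not part of the commit, a bounded variant of the same drain-or-requeue pattern that makes termination explicit by receiving each queued item at most once per pass:)

// drainMatching handles each currently-queued item once; non-matching items
// are requeued onto the same buffered channel for a later pass.
func drainMatching(queue chan string, want string, handle func(string)) {
	for i := len(queue); i > 0; i-- {
		item := <-queue
		if want == "" || want == item {
			handle(item)
		} else {
			queue <- item // requeue for a later, matching pass
		}
	}
}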

func (mgr *FilterManager) closeAndWait(wg *sync.WaitGroup, asub *SubDetails) {
defer wg.Done()
asub.cancel()
for {
env, ok := <-asub.sub.DataCh
if !ok {
mgr.logger.Debug("unsubscribed filter", zap.Strings("content-topics", asub.sub.ContentFilter.ContentTopics.ToList()))
return
}
// process any in-flight envelopes
err := mgr.envProcessor.OnNewEnvelope(env)
if err != nil {
mgr.logger.Error("invoking onNewEnvelopes error", zap.Error(err))
}
}
}
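(closeAndWait uses a cancel-then-drain idiom: cancel the producer, then keep receiving until the channel is closed, so in-flight items are processed rather than dropped. The same pattern in isolation, as an illustrative fragment:)

// cancelAndDrain stops the producer and consumes everything still buffered
// on ch until the producer closes it.
func cancelAndDrain(cancel func(), ch <-chan int, handle func(int)) {
	cancel()
	for v := range ch {
		handle(v)
	}
}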

func (mgr *FilterManager) resubscribeAllSubscriptions() {
	mgr.Lock()
	filterSubsCount := len(mgr.filterSubscriptions)
mgr.logger.Debug("unsubscribing all filter subscriptions", zap.Int("subs-count", filterSubsCount))
var wg sync.WaitGroup
wg.Add(len(mgr.filterSubscriptions))

	for _, asub := range mgr.filterSubscriptions {
		asub := asub // copy the loop variable so each goroutine gets its own SubDetails (pre-Go 1.22 semantics)
		go mgr.closeAndWait(&wg, &asub)
	}
mgr.filterSubscriptions = make(map[string]SubDetails)

mgr.Unlock()

	wg.Wait() // wait until all unsubscribes are done, to avoid a race between subscribe and unsubscribe

mgr.logger.Debug("unsubscribed all filter subscriptions", zap.Int("subs-count", filterSubsCount))

	// lock to copy the filterConfigs map; the lock can't be held while calling SubscribeFilter since the same lock is acquired inside it
mgr.Lock()
localMap := make(appFilterMap)
for filterID, config := range mgr.filterConfigs {
localMap[filterID] = config
}
mgr.Unlock()

for filterID, config := range localMap {
mgr.SubscribeFilter(filterID, config.contentFilter)
}

}
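(resubscribeAllSubscriptions snapshots filterConfigs under the lock and then iterates over the copy, so the mutex is never held across network calls. The same snapshot pattern in isolation; illustrative only, assumes Go 1.18+ and an import of "sync":)

// snapshot copies a shared map under its mutex so the caller can iterate the
// copy without holding the lock across slow operations such as subscribes.
func snapshot[K comparable, V any](mu *sync.Mutex, m map[K]V) map[K]V {
	mu.Lock()
	defer mu.Unlock()
	out := make(map[K]V, len(m))
	for k, v := range m {
		out[k] = v
	}
	return out
}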

// OnConnectionStatusChange is to be triggered when a connection status change is detected, either from offline to online or vice versa.
// Note that a pubsubTopic-specific change can be signalled by specifying pubsubTopic;
// an empty pubsubTopic indicates a node-wide connection status change, such as the node going offline or coming back online.
-func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool) {
+func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool, connType byte) {
subs := mgr.node.Subscriptions()
mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus),
zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs)))
	/*
		Skip the initialization case: when FilterManager is created, networkConnType is set to initNetworkConnType (255),
		so the first time the node goes online the connection type is only recorded and must not trigger a resubscribe.
		A change in connType means the local node's network has changed (e.g. a mobile device switching between wifi and cellular),
		which in turn means the local node's IP address has changed. This can cause issues in filter-push from which it never
		recovers, hence all filters are resubscribed.
	*/
	if mgr.networkConnType != initNetworkConnType &&
		mgr.networkConnType != connType {
// resubscribe all existing filters
go mgr.resubscribeAllSubscriptions()
}
	if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to online
mgr.onlineChecker.SetOnline(newStatus)
mgr.NetworkChange()
mgr.logger.Debug("switching from offline to online")
mgr.Lock()
-		if len(mgr.waitingToSubQueue) > 0 {
-			for af := range mgr.waitingToSubQueue {
-				// TODO: change the below logic once topic specific health is implemented for lightClients
-				if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic {
-					// check if any filter subs are pending and subscribe them
-					mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter))
-					go mgr.subscribeAndRunLoop(af)
-				} else {
-					mgr.waitingToSubQueue <- af
-				}
-				if len(mgr.waitingToSubQueue) == 0 {
-					mgr.logger.Debug("no pending subscriptions")
-					break
-				}
-			}
-		}
+		mgr.checkAndProcessQueue(pubsubTopic)
mgr.Unlock()
}

mgr.networkConnType = connType
mgr.onlineChecker.SetOnline(newStatus)
}
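(This commit does not show how a host application supplies connType. A hypothetical wiring is sketched below; the constant names and values are assumptions, with only initNetworkConnType = 255 reserved by the code above:)

// hypothetical host-side encoding of the active network; any values other
// than initNetworkConnType (255) would work
const (
	connTypeWifi     byte = 0
	connTypeCellular byte = 1
)

// onPlatformNetworkEvent translates a platform connectivity callback into an
// OnConnectionStatusChange call; a wifi<->cellular switch changes connType and
// therefore triggers the resubscribe path above.
func onPlatformNetworkEvent(fm *FilterManager, online bool, onWifi bool) {
	connType := connTypeCellular
	if onWifi {
		connType = connTypeWifi
	}
	fm.OnConnectionStatusChange("", online, connType) // empty pubsubTopic => node-wide change
}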

4 changes: 2 additions & 2 deletions waku/v2/api/filter/filter_test.go
@@ -161,9 +161,9 @@ func (s *FilterApiTestSuite) TestFilterManager() {
// Mock peers going down
s.LightNodeHost.Peerstore().RemovePeer(s.FullNodeHost.ID())

fm.OnConnectionStatusChange("", false)
fm.OnConnectionStatusChange("", false, 0)
time.Sleep(2 * time.Second)
fm.OnConnectionStatusChange("", true)
fm.OnConnectionStatusChange("", true, 0)
s.ConnectToFullNode(s.LightNode, s.FullNode)
time.Sleep(3 * time.Second)

2 changes: 1 addition & 1 deletion waku/v2/protocol/filter/client.go
@@ -175,7 +175,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Stream)
}

if !wf.subscriptions.IsSubscribedTo(peerID) {
logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID))
logger.Warn("received message push from unknown peer")
wf.metrics.RecordError(unknownPeerMessagePush)
//Send a wildcard unsubscribe to this peer so that further requests are not forwarded to us
if err := stream.Reset(); err != nil {
