From 4ef460cb951a72b2a0be2b3f23347ef8f569ea08 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Fri, 3 Jan 2025 15:47:27 +0530 Subject: [PATCH] fix: filter network change handling (#1270) --- waku/v2/api/filter/filter_manager.go | 108 ++++++++++++++++++++++----- waku/v2/api/filter/filter_test.go | 4 +- waku/v2/protocol/filter/client.go | 2 +- 3 files changed, 91 insertions(+), 23 deletions(-) diff --git a/waku/v2/api/filter/filter_manager.go b/waku/v2/api/filter/filter_manager.go index 665d577bd..66762238f 100644 --- a/waku/v2/api/filter/filter_manager.go +++ b/waku/v2/api/filter/filter_manager.go @@ -26,6 +26,7 @@ import ( // filterSubscriptions is the map of filter subscription IDs to subscriptions const filterSubBatchSize = 90 +const initNetworkConnType = 255 type appFilterMap map[string]filterConfig @@ -43,6 +44,7 @@ type FilterManager struct { filterConfigs appFilterMap // map of application filterID to {aggregatedFilterID, application ContentFilter} waitingToSubQueue chan filterConfig envProcessor EnevelopeProcessor + networkConnType byte } type SubDetails struct { @@ -76,6 +78,7 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter mgr.incompleteFilterBatch = make(map[string]filterConfig) mgr.filterConfigs = make(appFilterMap) mgr.waitingToSubQueue = make(chan filterConfig, 100) + mgr.networkConnType = initNetworkConnType //parsing the subscribe params only to read the batchInterval passed. mgr.params = new(subscribeParameters) @@ -114,8 +117,8 @@ func (mgr *FilterManager) startFilterSubLoop() { } } -// addFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch -// once batchlimit is hit, all filters are subscribed to and new batch is created. 
+// SubscribeFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch +// once batch-limit is hit, all filters are subscribed to and new batch is created. // if node is not online, then batch is pushed to a queue to be picked up later for subscription and new batch is created func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFilter) { @@ -182,37 +185,102 @@ func (mgr *FilterManager) NetworkChange() { mgr.node.PingPeers() // ping all peers to check if subscriptions are alive } +func (mgr *FilterManager) checkAndProcessQueue(pubsubTopic string) { + if len(mgr.waitingToSubQueue) > 0 { + for af := range mgr.waitingToSubQueue { + // TODO: change the below logic once topic specific health is implemented for lightClients + if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic { + // check if any filter subs are pending and subscribe them + mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter)) + go mgr.subscribeAndRunLoop(af) + } else { + mgr.waitingToSubQueue <- af + } + if len(mgr.waitingToSubQueue) == 0 { + mgr.logger.Debug("no pending subscriptions") + break + } + } + } +} + +func (mgr *FilterManager) closeAndWait(wg *sync.WaitGroup, asub *SubDetails) { + defer wg.Done() + asub.cancel() + for { + env, ok := <-asub.sub.DataCh + if !ok { + mgr.logger.Debug("unsubscribed filter", zap.Strings("content-topics", asub.sub.ContentFilter.ContentTopics.ToList())) + return + } + // process any in-flight envelopes + err := mgr.envProcessor.OnNewEnvelope(env) + if err != nil { + mgr.logger.Error("invoking onNewEnvelopes error", zap.Error(err)) + } + } +} + +func (mgr *FilterManager) resubscribeAllSubscriptions() { + filterSubsCount := len(mgr.filterSubscriptions) + mgr.Lock() + mgr.logger.Debug("unsubscribing all filter subscriptions", zap.Int("subs-count", filterSubsCount)) + var wg 
sync.WaitGroup + wg.Add(len(mgr.filterSubscriptions)) + + for _, asub := range mgr.filterSubscriptions { + go mgr.closeAndWait(&wg, &asub) + } + mgr.filterSubscriptions = make(map[string]SubDetails) + + mgr.Unlock() + + wg.Wait() //Waiting till all unsubs are done to avoid race between sub and unsub + + mgr.logger.Debug("unsubscribed all filter subscriptions", zap.Int("subs-count", filterSubsCount)) + + // locking to protect filterConfigs map, can't lock while calling subscribe as same lock is acquired inside subscribe + mgr.Lock() + localMap := make(appFilterMap) + for filterID, config := range mgr.filterConfigs { + localMap[filterID] = config + } + mgr.Unlock() + + for filterID, config := range localMap { + mgr.SubscribeFilter(filterID, config.contentFilter) + } + +} + // OnConnectionStatusChange to be triggered when connection status change is detected either from offline to online or vice-versa // Note that pubsubTopic specific change can be triggered by specifying pubsubTopic, // if pubsubTopic is empty it indicates complete connection status change such as node went offline or came back online. -func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool) { +func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool, connType byte) { subs := mgr.node.Subscriptions() mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus), zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs))) + /* + The initialization check is needed because networkConnType is set to 255 (initNetworkConnType) when the filterManager is created; when the node goes online for the first time + the network conn type is set, and without this check that first update would trigger a resubscribe, which is not desired. + A change in connType refers to the scenario where the local node's network has changed, e.g. a mobile device switching between wifi and cellular, + which in turn means the IP address of the local node has changed. 
+ This can cause issues in filter-push where it never recovers, hence all filters are resubscribed. + */ + if mgr.networkConnType != initNetworkConnType && + mgr.networkConnType != connType { // + // resubscribe all existing filters + go mgr.resubscribeAllSubscriptions() + } if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to Online mgr.onlineChecker.SetOnline(newStatus) mgr.NetworkChange() mgr.logger.Debug("switching from offline to online") mgr.Lock() - if len(mgr.waitingToSubQueue) > 0 { - for af := range mgr.waitingToSubQueue { - // TODO: change the below logic once topic specific health is implemented for lightClients - if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic { - // check if any filter subs are pending and subscribe them - mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter)) - go mgr.subscribeAndRunLoop(af) - } else { - mgr.waitingToSubQueue <- af - } - if len(mgr.waitingToSubQueue) == 0 { - mgr.logger.Debug("no pending subscriptions") - break - } - } - } + mgr.checkAndProcessQueue(pubsubTopic) mgr.Unlock() } - + mgr.networkConnType = connType mgr.onlineChecker.SetOnline(newStatus) } diff --git a/waku/v2/api/filter/filter_test.go b/waku/v2/api/filter/filter_test.go index 8a5f2d408..8a720ea66 100644 --- a/waku/v2/api/filter/filter_test.go +++ b/waku/v2/api/filter/filter_test.go @@ -161,9 +161,9 @@ func (s *FilterApiTestSuite) TestFilterManager() { // Mock peers going down s.LightNodeHost.Peerstore().RemovePeer(s.FullNodeHost.ID()) - fm.OnConnectionStatusChange("", false) + fm.OnConnectionStatusChange("", false, 0) time.Sleep(2 * time.Second) - fm.OnConnectionStatusChange("", true) + fm.OnConnectionStatusChange("", true, 0) s.ConnectToFullNode(s.LightNode, s.FullNode) time.Sleep(3 * time.Second) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 8fbcd91c1..c1e762e9b 100644 --- 
a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -175,7 +175,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea } if !wf.subscriptions.IsSubscribedTo(peerID) { - logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID)) + logger.Warn("received message push from unknown peer") wf.metrics.RecordError(unknownPeerMessagePush) //Send a wildcard unsubscribe to this peer so that further requests are not forwarded to us if err := stream.Reset(); err != nil {