From fbc34c486ffb180812d7bef85b1ea7e377aeb403 Mon Sep 17 00:00:00 2001 From: dimkouv Date: Wed, 12 Feb 2025 14:15:49 +0200 Subject: [PATCH] refactoring of init --- commit/merkleroot/observation.go | 29 ++++++++------------------- commit/merkleroot/processor.go | 34 ++++++++++++++++++++------------ 2 files changed, 29 insertions(+), 34 deletions(-) diff --git a/commit/merkleroot/observation.go b/commit/merkleroot/observation.go index 03001f2e2..4c0bb8744 100644 --- a/commit/merkleroot/observation.go +++ b/commit/merkleroot/observation.go @@ -315,9 +315,8 @@ type Observer interface { // asyncObserver is an Observer implementation that fetches the data asynchronously. type asyncObserver struct { lggr logger.Logger - observer Observer + syncObserver observerImpl cf context.CancelFunc - useSyncCalls bool // Will be true if the provided syncTimeout is 0, useful for testing mu *sync.RWMutex offRampNextSeqNums []plugintypes.SeqNumChain onRampLatestSeqNums []plugintypes.SeqNumChain @@ -326,24 +325,18 @@ type asyncObserver struct { // newAsyncObserver creates a new asyncObserver. // It fetches the data from the base observer asynchronously and caches the results. // It fetches the data every tickDur and uses a timeout of syncTimeout to kill long RPC calls. -func newAsyncObserver(lggr logger.Logger, observer Observer, tickDur, syncTimeout time.Duration) *asyncObserver { +func newAsyncObserver(lggr logger.Logger, observer observerImpl, tickDur, syncTimeout time.Duration) *asyncObserver { ctx, cf := context.WithCancel(context.Background()) o := &asyncObserver{ lggr: lggr, - observer: observer, + syncObserver: observer, cf: cf, mu: &sync.RWMutex{}, offRampNextSeqNums: make([]plugintypes.SeqNumChain, 0), onRampLatestSeqNums: make([]plugintypes.SeqNumChain, 0), } - if tickDur == 0 { - o.lggr.Debugw("using sync calls for async observer, tickDur set to 0") - o.useSyncCalls = true - return o - } - ticker := time.NewTicker(tickDur) lggr.Debugw("async observer started", "tickDur", tickDur, "syncTimeout", syncTimeout) o.start(ctx, ticker.C, syncTimeout) @@ -374,13 +367,13 @@ func (o *asyncObserver) sync(ctx context.Context, syncTimeout time.Duration) { { id: "offRampNextSeqNums", op: func(ctx context.Context) { - o.offRampNextSeqNums = o.observer.ObserveOffRampNextSeqNums(ctxSync) + o.offRampNextSeqNums = o.syncObserver.ObserveOffRampNextSeqNums(ctxSync) }, }, { id: "onRampLatestSeqNums", op: func(ctx context.Context) { - o.onRampLatestSeqNums = o.observer.ObserveLatestOnRampSeqNums(ctxSync) + o.onRampLatestSeqNums = o.syncObserver.ObserveLatestOnRampSeqNums(ctxSync) }, }, } @@ -410,9 +403,6 @@ func (o *asyncObserver) applySyncOp( // ObserveOffRampNextSeqNums observes the next sequence numbers for each source chain from the OffRamp. // Values are fetched from observers state which are fetched async. func (o *asyncObserver) ObserveOffRampNextSeqNums(_ context.Context) []plugintypes.SeqNumChain { - if o.useSyncCalls { - return o.observer.ObserveOffRampNextSeqNums(context.Background()) - } o.mu.RLock() defer o.mu.RUnlock() return o.offRampNextSeqNums @@ -421,9 +411,6 @@ func (o *asyncObserver) ObserveOffRampNextSeqNums(_ context.Context) []plugintyp // ObserveLatestOnRampSeqNums observes the latest onRamp sequence numbers for each configured source chain. // Values are fetched from observers state which are fetched async. func (o *asyncObserver) ObserveLatestOnRampSeqNums(_ context.Context) []plugintypes.SeqNumChain { - if o.useSyncCalls { - return o.observer.ObserveLatestOnRampSeqNums(context.Background()) - } o.mu.RLock() defer o.mu.RUnlock() return o.onRampLatestSeqNums @@ -433,17 +420,17 @@ func (o *asyncObserver) ObserveLatestOnRampSeqNums(_ context.Context) []pluginty // It directly calls the base observer since this values cannot be known in advance. func (o *asyncObserver) ObserveMerkleRoots( ctx context.Context, ranges []plugintypes.ChainRange) []cciptypes.MerkleRootChain { - return o.observer.ObserveMerkleRoots(ctx, ranges) + return o.syncObserver.ObserveMerkleRoots(ctx, ranges) } // ObserveRMNRemoteCfg observes the RMN Remote Config by directly calling the base observer since this value is cached. func (o *asyncObserver) ObserveRMNRemoteCfg(ctx context.Context) rmntypes.RemoteConfig { - return o.observer.ObserveRMNRemoteCfg(ctx) + return o.syncObserver.ObserveRMNRemoteCfg(ctx) } // ObserveFChain observes the FChain by directly calling the base observer since this value is cached. func (o *asyncObserver) ObserveFChain(ctx context.Context) map[cciptypes.ChainSelector]int { - return o.observer.ObserveFChain(ctx) + return o.syncObserver.ObserveFChain(ctx) } // Close closes the observer and releases any resources. diff --git a/commit/merkleroot/processor.go b/commit/merkleroot/processor.go index 02246bebf..ed11450a0 100644 --- a/commit/merkleroot/processor.go +++ b/commit/merkleroot/processor.go @@ -55,25 +55,33 @@ func NewProcessor( rmnHomeReader readerpkg.RMNHome, metricsReporter MetricsReporter, ) *Processor { + var observer Observer + baseObserver := newObserverImpl( + lggr, + homeChain, + oracleID, + chainSupport, + ccipReader, + msgHasher, + ) + if !offchainCfg.MerkleRootAsyncObserverDisabled { + observer = newAsyncObserver( + lggr, + baseObserver, + offchainCfg.MerkleRootAsyncObserverSyncFreq, + offchainCfg.MerkleRootAsyncObserverSyncTimeout, + ) + } else { + observer = baseObserver + } + return &Processor{ oracleID: oracleID, oracleIDToP2pID: oracleIDToP2pID, offchainCfg: offchainCfg, destChain: destChain, lggr: lggr, - observer: newAsyncObserver( - lggr, - newObserverImpl( - lggr, - homeChain, - oracleID, - chainSupport, - ccipReader, - msgHasher, - ), - offchainCfg.MerkleRootAsyncObserverSyncFreq, - offchainCfg.MerkleRootAsyncObserverSyncTimeout, - ), + observer: observer, ccipReader: ccipReader, reportingCfg: reportingCfg, chainSupport: chainSupport,