Skip to content

Commit

Permalink
refactoring of init
Browse files Browse the repository at this point in the history
  • Loading branch information
dimkouv committed Feb 12, 2025
1 parent 27c420b commit fbc34c4
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 34 deletions.
29 changes: 8 additions & 21 deletions commit/merkleroot/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,8 @@ type Observer interface {
// asyncObserver is an Observer implementation that fetches the data asynchronously.
type asyncObserver struct {
lggr logger.Logger
observer Observer
syncObserver observerImpl
cf context.CancelFunc
useSyncCalls bool // Will be true if the provided syncTimeout is 0, useful for testing
mu *sync.RWMutex
offRampNextSeqNums []plugintypes.SeqNumChain
onRampLatestSeqNums []plugintypes.SeqNumChain
Expand All @@ -326,24 +325,18 @@ type asyncObserver struct {
// newAsyncObserver creates a new asyncObserver.
// It fetches the data from the base observer asynchronously and caches the results.
// It fetches the data every tickDur and uses a timeout of syncTimeout to kill long RPC calls.
func newAsyncObserver(lggr logger.Logger, observer Observer, tickDur, syncTimeout time.Duration) *asyncObserver {
func newAsyncObserver(lggr logger.Logger, observer observerImpl, tickDur, syncTimeout time.Duration) *asyncObserver {
ctx, cf := context.WithCancel(context.Background())

o := &asyncObserver{
lggr: lggr,
observer: observer,
syncObserver: observer,
cf: cf,
mu: &sync.RWMutex{},
offRampNextSeqNums: make([]plugintypes.SeqNumChain, 0),
onRampLatestSeqNums: make([]plugintypes.SeqNumChain, 0),
}

if tickDur == 0 {
o.lggr.Debugw("using sync calls for async observer, tickDur set to 0")
o.useSyncCalls = true
return o
}

ticker := time.NewTicker(tickDur)
lggr.Debugw("async observer started", "tickDur", tickDur, "syncTimeout", syncTimeout)
o.start(ctx, ticker.C, syncTimeout)
Expand Down Expand Up @@ -374,13 +367,13 @@ func (o *asyncObserver) sync(ctx context.Context, syncTimeout time.Duration) {
{
id: "offRampNextSeqNums",
op: func(ctx context.Context) {
o.offRampNextSeqNums = o.observer.ObserveOffRampNextSeqNums(ctxSync)
o.offRampNextSeqNums = o.syncObserver.ObserveOffRampNextSeqNums(ctxSync)
},
},
{
id: "onRampLatestSeqNums",
op: func(ctx context.Context) {
o.onRampLatestSeqNums = o.observer.ObserveLatestOnRampSeqNums(ctxSync)
o.onRampLatestSeqNums = o.syncObserver.ObserveLatestOnRampSeqNums(ctxSync)
},
},
}
Expand Down Expand Up @@ -410,9 +403,6 @@ func (o *asyncObserver) applySyncOp(
// ObserveOffRampNextSeqNums observes the next sequence numbers for each source chain from the OffRamp.
// Values are fetched from observers state which are fetched async.
func (o *asyncObserver) ObserveOffRampNextSeqNums(_ context.Context) []plugintypes.SeqNumChain {
if o.useSyncCalls {
return o.observer.ObserveOffRampNextSeqNums(context.Background())
}
o.mu.RLock()
defer o.mu.RUnlock()
return o.offRampNextSeqNums
Expand All @@ -421,9 +411,6 @@ func (o *asyncObserver) ObserveOffRampNextSeqNums(_ context.Context) []plugintyp
// ObserveLatestOnRampSeqNums observes the latest onRamp sequence numbers for each configured source chain.
// Values are fetched from observers state which are fetched async.
func (o *asyncObserver) ObserveLatestOnRampSeqNums(_ context.Context) []plugintypes.SeqNumChain {
if o.useSyncCalls {
return o.observer.ObserveLatestOnRampSeqNums(context.Background())
}
o.mu.RLock()
defer o.mu.RUnlock()
return o.onRampLatestSeqNums
Expand All @@ -433,17 +420,17 @@ func (o *asyncObserver) ObserveLatestOnRampSeqNums(_ context.Context) []pluginty
// It directly calls the base observer since this values cannot be known in advance.
func (o *asyncObserver) ObserveMerkleRoots(
ctx context.Context, ranges []plugintypes.ChainRange) []cciptypes.MerkleRootChain {
return o.observer.ObserveMerkleRoots(ctx, ranges)
return o.syncObserver.ObserveMerkleRoots(ctx, ranges)
}

// ObserveRMNRemoteCfg observes the RMN Remote Config by directly calling the base observer since this value is cached.
func (o *asyncObserver) ObserveRMNRemoteCfg(ctx context.Context) rmntypes.RemoteConfig {
return o.observer.ObserveRMNRemoteCfg(ctx)
return o.syncObserver.ObserveRMNRemoteCfg(ctx)
}

// ObserveFChain observes the FChain by directly calling the base observer since this value is cached.
func (o *asyncObserver) ObserveFChain(ctx context.Context) map[cciptypes.ChainSelector]int {
return o.observer.ObserveFChain(ctx)
return o.syncObserver.ObserveFChain(ctx)
}

// Close closes the observer and releases any resources.
Expand Down
34 changes: 21 additions & 13 deletions commit/merkleroot/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,33 @@ func NewProcessor(
rmnHomeReader readerpkg.RMNHome,
metricsReporter MetricsReporter,
) *Processor {
var observer Observer
baseObserver := newObserverImpl(
lggr,
homeChain,
oracleID,
chainSupport,
ccipReader,
msgHasher,
)
if !offchainCfg.MerkleRootAsyncObserverDisabled {
observer = newAsyncObserver(
lggr,
baseObserver,
offchainCfg.MerkleRootAsyncObserverSyncFreq,
offchainCfg.MerkleRootAsyncObserverSyncTimeout,
)
} else {
observer = baseObserver
}

return &Processor{
oracleID: oracleID,
oracleIDToP2pID: oracleIDToP2pID,
offchainCfg: offchainCfg,
destChain: destChain,
lggr: lggr,
observer: newAsyncObserver(
lggr,
newObserverImpl(
lggr,
homeChain,
oracleID,
chainSupport,
ccipReader,
msgHasher,
),
offchainCfg.MerkleRootAsyncObserverSyncFreq,
offchainCfg.MerkleRootAsyncObserverSyncTimeout,
),
observer: observer,
ccipReader: ccipReader,
reportingCfg: reportingCfg,
chainSupport: chainSupport,
Expand Down

0 comments on commit fbc34c4

Please sign in to comment.