Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCIP-5216 Tracking latencies and simplification in the metrics code #598

Merged
merged 15 commits into from
Feb 12, 2025
Merged
2 changes: 0 additions & 2 deletions commit/chainfee/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,6 @@ func (p *processor) Observation(
ChainFeeUpdates: chainFeeUpdates,
TimestampNow: now,
}

p.metricsReporter.TrackChainFeeObservation(obs)
return obs, nil
}

Expand Down
5 changes: 3 additions & 2 deletions commit/chainfee/observation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

plugincommon2 "github.com/smartcontractkit/chainlink-ccip/internal/plugincommon"
"github.com/smartcontractkit/chainlink-ccip/internal/plugintypes"
"github.com/smartcontractkit/chainlink-ccip/mocks/internal_/plugincommon"
reader2 "github.com/smartcontractkit/chainlink-ccip/mocks/internal_/reader"
Expand Down Expand Up @@ -122,7 +123,7 @@ func Test_processor_Observation(t *testing.T) {
ccipReader: ccipReader,
oracleID: oracleID,
homeChain: homeChain,
metricsReporter: NoopMetrics{},
metricsReporter: plugincommon2.NoopReporter{},
}

supportedSet := mapset.NewSet(tc.supportedChains...)
Expand Down Expand Up @@ -316,7 +317,7 @@ func Test_unique_chain_filter_in_Observation(t *testing.T) {
ccipReader: ccipReader,
oracleID: oracleID,
homeChain: homeChain,
metricsReporter: NoopMetrics{},
metricsReporter: plugincommon2.NoopReporter{},
}

supportedSet := mapset.NewSet(tc.supportedChains...)
Expand Down
1 change: 0 additions & 1 deletion commit/chainfee/outcome.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ func (p *processor) Outcome(

lggr.Infow("Gas Prices Outcome", "gasPrices", gasPrices)
out := Outcome{GasPrices: gasPrices}
p.metricsReporter.TrackChainFeeOutcome(out)
return out, nil
}

Expand Down
2 changes: 1 addition & 1 deletion commit/chainfee/outcome_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func TestProcessor_Outcome(t *testing.T) {
RemoteGasPriceBatchWriteFrequency: tt.chainFeeWriteFrequency,
FeeInfo: tt.feeInfo,
},
metricsReporter: NoopMetrics{},
metricsReporter: plugincommon.NoopReporter{},
}

outcome, err := p.Outcome(ctx, Outcome{}, Query{}, tt.aos)
Expand Down
7 changes: 4 additions & 3 deletions commit/chainfee/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type processor struct {
ccipReader readerpkg.CCIPReader
cfg pluginconfig.CommitOffchainConfig
chainSupport plugincommon.ChainSupport
metricsReporter MetricsReporter
metricsReporter plugincommon.MetricsReporter
fRoleDON int
}

Expand All @@ -35,9 +35,9 @@ func NewProcessor(
offChainConfig pluginconfig.CommitOffchainConfig,
chainSupport plugincommon.ChainSupport,
fRoleDON int,
metricsReporter MetricsReporter,
metricsReporter plugincommon.MetricsReporter,
) plugincommon.PluginProcessor[Query, Observation, Outcome] {
return &processor{
p := &processor{
lggr: lggr,
oracleID: oracleID,
destChain: destChain,
Expand All @@ -48,6 +48,7 @@ func NewProcessor(
cfg: offChainConfig,
metricsReporter: metricsReporter,
}
return plugincommon.NewTrackedProcessor(lggr, p, processorLabel, metricsReporter)
}

func (p *processor) Query(ctx context.Context, prevOutcome Outcome) (Query, error) {
Expand Down
46 changes: 29 additions & 17 deletions commit/chainfee/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ import (
cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3"
)

const (
processorLabel = "chainfee"
gasPricesLabel = "gasPrices"
feeComponentsLabel = "feeComponents"
nativeTokenPricesLabel = "nativeTokenPrices"
chainFeeUpdatesLabel = "chainFeeUpdates"
)

type Query struct {
}

Expand All @@ -17,6 +25,12 @@ type Outcome struct {
GasPrices []cciptypes.GasPriceChain `json:"gasPrices"`
}

func (o Outcome) Stats() map[string]int {
return map[string]int{
gasPricesLabel: len(o.GasPrices),
}
}

type Observation struct {
// FeeComponents: from the source chains, via chain writer
FeeComponents map[cciptypes.ChainSelector]types.ChainFeeComponents `json:"feeComponents"`
Expand All @@ -28,6 +42,21 @@ type Observation struct {
TimestampNow time.Time `json:"timestamp"`
}

func (o Observation) Stats() map[string]int {
return map[string]int{
feeComponentsLabel: len(o.FeeComponents),
nativeTokenPricesLabel: len(o.NativeTokenPrices),
chainFeeUpdatesLabel: len(o.ChainFeeUpdates),
}
}

func (o Observation) IsEmpty() bool {
return len(o.FeeComponents) == 0 &&
len(o.NativeTokenPrices) == 0 &&
len(o.ChainFeeUpdates) == 0 &&
len(o.FChain) == 0 && o.TimestampNow.IsZero()
}

// AggregateObservation is the aggregation of a list of observations
type AggregateObservation struct {
FeeComponents map[cciptypes.ChainSelector][]types.ChainFeeComponents `json:"feeComponents"`
Expand All @@ -46,20 +75,3 @@ type Update struct {
ChainFee ComponentsUSDPrices `json:"chainFee"`
Timestamp time.Time `json:"timestamp"`
}

// MetricsReporter exposes only relevant methods for reporting chain fees from metrics.Reporter
type MetricsReporter interface {
TrackChainFeeObservation(obs Observation)
TrackChainFeeOutcome(outcome Outcome)
}

type NoopMetrics struct{}

func (n NoopMetrics) TrackChainFeeObservation(Observation) {}

func (n NoopMetrics) TrackChainFeeOutcome(Outcome) {}

func (o Observation) IsEmpty() bool {
return len(o.FeeComponents) == 0 && len(o.NativeTokenPrices) == 0 && len(o.ChainFeeUpdates) == 0 &&
len(o.FChain) == 0 && o.TimestampNow.IsZero()
}
1 change: 0 additions & 1 deletion commit/merkleroot/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func (p *Processor) Observation(
}
lggr.Infow("sending merkle root processor observation",
"observation", observation, "nextState", nextState, "observationDuration", time.Since(tStart))
p.metricsReporter.TrackMerkleObservation(observation, nextState.String())
return observation, nil
}

Expand Down
1 change: 0 additions & 1 deletion commit/merkleroot/outcome.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func (p *Processor) Outcome(
return Outcome{}, err
}

p.metricsReporter.TrackMerkleOutcome(outcome, nextState.String())
lggr.Infow("Sending Outcome",
"outcome", outcome, "nextState", nextState, "outcomeDuration", time.Since(tStart))
return outcome, nil
Expand Down
5 changes: 3 additions & 2 deletions commit/merkleroot/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewProcessor(
rmnCrypto cciptypes.RMNCrypto,
rmnHomeReader readerpkg.RMNHome,
metricsReporter MetricsReporter,
) *Processor {
) plugincommon.PluginProcessor[Query, Observation, Outcome] {
var observer Observer
baseObserver := newObserverImpl(
lggr,
Expand All @@ -75,7 +75,7 @@ func NewProcessor(
observer = baseObserver
}

return &Processor{
p := &Processor{
oracleID: oracleID,
oracleIDToP2pID: oracleIDToP2pID,
offchainCfg: offchainCfg,
Expand All @@ -90,6 +90,7 @@ func NewProcessor(
rmnHomeReader: rmnHomeReader,
metricsReporter: metricsReporter,
}
return plugincommon.NewTrackedProcessor(lggr, p, processorLabel, metricsReporter)
}

var _ plugincommon.PluginProcessor[Query, Observation, Outcome] = &Processor{}
Expand Down
41 changes: 36 additions & 5 deletions commit/merkleroot/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package merkleroot

import (
"sort"
"time"

"github.com/smartcontractkit/chainlink-ccip/commit/merkleroot/rmn"
rmntypes "github.com/smartcontractkit/chainlink-ccip/commit/merkleroot/rmn/types"
Expand All @@ -10,6 +11,13 @@ import (
cciptypes "github.com/smartcontractkit/chainlink-ccip/pkg/types/ccipocr3"
)

const (
processorLabel = "merkleroot"
rootsLabel = "roots"
messagesLabel = "messages"
rmnSignatureLabel = "rmnSignatures"
Comment on lines +15 to +18
Copy link
Contributor

@0xnogo 0xnogo Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the rootsLabel, messagesLabel, rmnSignatureLabel under the processorLabel? Meaning all the 3 labels will be dimensions of merkleroot metric?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the question (btw these labels were there before, I'm only moving them from one place to another)

)

type Query struct {
RetryRMNSignatures bool
RMNSignatures *rmn.ReportSignatures
Expand All @@ -24,6 +32,17 @@ type Observation struct {
FChain map[cciptypes.ChainSelector]int `json:"fChain"`
}

func (o Observation) Stats() map[string]int {
counts := map[string]int{
rootsLabel: len(o.MerkleRoots),
messagesLabel: 0,
}
for _, root := range o.MerkleRoots {
counts[messagesLabel] += root.SeqNumsRange.Length()
}
return counts
}

func (o Observation) IsEmpty() bool {
return len(o.MerkleRoots) == 0 &&
len(o.OnRampMaxSeqNums) == 0 &&
Expand Down Expand Up @@ -147,6 +166,18 @@ type Outcome struct {
RMNRemoteCfg rmntypes.RemoteConfig `json:"rmnRemoteCfg"`
}

func (o Outcome) Stats() map[string]int {
counts := map[string]int{
rootsLabel: len(o.RootsToReport),
rmnSignatureLabel: len(o.RMNReportSignatures),
messagesLabel: 0,
}
for _, root := range o.RootsToReport {
counts[messagesLabel] += root.SeqNumsRange.Length()
}
return counts
}

// Sort all fields of the given Outcome
func (o *Outcome) Sort() {
sort.Slice(o.RangesSelectedForReport, func(i, j int) bool {
Expand Down Expand Up @@ -210,15 +241,15 @@ func (p processorState) String() string {

// MetricsReporter exposes only relevant methods for reporting merkle roots from metrics.Reporter
type MetricsReporter interface {
TrackMerkleObservation(obs Observation, state string)
TrackMerkleOutcome(outcome Outcome, state string)
TrackRmnReport(latency float64, success bool)
TrackProcessorLatency(processor string, method string, latency time.Duration, err error)
TrackProcessorOutput(processor string, method plugincommon.MethodType, obs plugintypes.Trackable)
}

type NoopMetrics struct{}

func (n NoopMetrics) TrackMerkleObservation(Observation, string) {}
func (n NoopMetrics) TrackRmnReport(float64, bool) {}

func (n NoopMetrics) TrackMerkleOutcome(Outcome, string) {}
func (n NoopMetrics) TrackProcessorLatency(string, string, time.Duration, error) {}

func (n NoopMetrics) TrackRmnReport(float64, bool) {}
func (n NoopMetrics) TrackProcessorOutput(string, plugincommon.MethodType, plugintypes.Trackable) {}
Loading
Loading