Skip to content

Commit

Permalink
feat (telemetry) Handling synchronization when registering prometheus…
Browse files Browse the repository at this point in the history
… metrics (#113)

* Handling synchronization when registering prometheus metrics

* use sync.Map and lint

* fix linting

* use RWMap

* Using rwmap to cache metrics

* refactor

* Use lock for registration

* noop

* renaming

* renaming

* renaming

---------

Co-authored-by: Gordon <[email protected]>
  • Loading branch information
gordonbear and Gordon authored Jul 16, 2024
1 parent 221eb3c commit ea18f90
Showing 1 changed file with 136 additions and 92 deletions.
228 changes: 136 additions & 92 deletions telemetry/prometheus/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package prometheus
import (
"fmt"
"strings"
"sync"
"time"
"unicode"

"github.com/berachain/offchain-sdk/tools/rwstore"
"github.com/prometheus/client_golang/prometheus"
)

Expand All @@ -27,10 +29,13 @@ const (
type metrics struct {
cfg *Config

gaugeVecs map[string]*prometheus.GaugeVec
counterVecs map[string]*prometheus.CounterVec
histogramVecs map[string]*prometheus.HistogramVec
summaryVecs map[string]*prometheus.SummaryVec
gaugeVecs *rwstore.RWMap[string, *prometheus.GaugeVec]
counterVecs *rwstore.RWMap[string, *prometheus.CounterVec]
histogramVecs *rwstore.RWMap[string, *prometheus.HistogramVec]
summaryVecs *rwstore.RWMap[string, *prometheus.SummaryVec]

// Ensures thread-safe registration of metric vectors.
metricsRegistrationLock sync.Mutex
}

// NewMetrics initializes a new instance of Prometheus metrics.
Expand All @@ -45,10 +50,10 @@ func NewMetrics(cfg *Config) (*metrics, error) { //nolint:revive // only used as
return p, nil
}

p.gaugeVecs = make(map[string]*prometheus.GaugeVec, initialVecCapacity)
p.counterVecs = make(map[string]*prometheus.CounterVec, initialVecCapacity)
p.histogramVecs = make(map[string]*prometheus.HistogramVec, initialVecCapacity)
p.summaryVecs = make(map[string]*prometheus.SummaryVec, initialVecCapacity)
p.gaugeVecs = rwstore.NewRWMap[string, *prometheus.GaugeVec]()
p.counterVecs = rwstore.NewRWMap[string, *prometheus.CounterVec]()
p.histogramVecs = rwstore.NewRWMap[string, *prometheus.HistogramVec]()
p.summaryVecs = rwstore.NewRWMap[string, *prometheus.SummaryVec]()
return p, nil
}

Expand All @@ -64,17 +69,8 @@ func (p *metrics) Gauge(name string, value float64, _ float64, tags ...string) {

name = forceValidName(name)
labels, labelValues := parseTagsToLabelPairs(tags)
gaugeVec, exists := p.gaugeVecs[name]
if !exists {
gaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: name,
Namespace: p.cfg.Namespace,
Subsystem: p.cfg.Subsystem,
Help: name + " gauge",
}, labels)
prometheus.MustRegister(gaugeVec)
p.gaugeVecs[name] = gaugeVec
}

gaugeVec := p.getOrRegisterNewGagueVec(name, labels)
gaugeVec.WithLabelValues(labelValues...).Set(value)
}

Expand All @@ -86,17 +82,8 @@ func (p *metrics) Incr(name string, tags ...string) {

name = forceValidName(name)
labels, labelValues := parseTagsToLabelPairs(tags)
gaugeVec, exists := p.gaugeVecs[name]
if !exists {
gaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: name,
Namespace: p.cfg.Namespace,
Subsystem: p.cfg.Subsystem,
Help: name + " incr/decr gauge",
}, labels)
prometheus.MustRegister(gaugeVec)
p.gaugeVecs[name] = gaugeVec
}

gaugeVec := p.getOrRegisterNewGagueVec(name, labels)
gaugeVec.WithLabelValues(labelValues...).Inc()
}

Expand All @@ -108,17 +95,8 @@ func (p *metrics) Decr(name string, tags ...string) {

name = forceValidName(name)
labels, labelValues := parseTagsToLabelPairs(tags)
gaugeVec, exists := p.gaugeVecs[name]
if !exists {
gaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: name,
Namespace: p.cfg.Namespace,
Subsystem: p.cfg.Subsystem,
Help: name + " incr/decr gauge",
}, labels)
prometheus.MustRegister(gaugeVec)
p.gaugeVecs[name] = gaugeVec
}

gaugeVec := p.getOrRegisterNewGagueVec(name, labels)
gaugeVec.WithLabelValues(labelValues...).Dec()
}

Expand All @@ -130,17 +108,8 @@ func (p *metrics) Count(name string, value int64, tags ...string) {

name = forceValidName(name)
labels, labelValues := parseTagsToLabelPairs(tags)
counterVec, exists := p.counterVecs[name]
if !exists {
counterVec = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: name,
Namespace: p.cfg.Namespace,
Subsystem: p.cfg.Subsystem,
Help: name + " counter",
}, labels)
prometheus.MustRegister(counterVec)
p.counterVecs[name] = counterVec
}

counterVec := p.getOrRegisterNewCounterVec(name, labels)
counterVec.WithLabelValues(labelValues...).Add(float64(value))
}

Expand All @@ -152,17 +121,8 @@ func (p *metrics) IncMonotonic(name string, tags ...string) {

name = forceValidName(name)
labels, labelValues := parseTagsToLabelPairs(tags)
counterVec, exists := p.counterVecs[name]
if !exists {
counterVec = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: name,
Namespace: p.cfg.Namespace,
Subsystem: p.cfg.Subsystem,
Help: name + " counter",
}, labels)
prometheus.MustRegister(counterVec)
p.counterVecs[name] = counterVec
}

counterVec := p.getOrRegisterNewCounterVec(name, labels)
counterVec.WithLabelValues(labelValues...).Inc()
}

Expand All @@ -181,19 +141,33 @@ func (p *metrics) Histogram(name string, value float64, rate float64, tags ...st

name = forceValidName(name)
labels, labelValues := parseTagsToLabelPairs(tags)
histogramVec, exists := p.histogramVecs[name]
if !exists {
histogramVec = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: name,
Namespace: p.cfg.Namespace,
Subsystem: p.cfg.Subsystem,
Help: name + " histogram",
// The maximum covered stats range is rate * HistogramBucketCount
Buckets: prometheus.LinearBuckets(0, rate, p.cfg.HistogramBucketCount),
}, labels)
prometheus.MustRegister(histogramVec)
p.histogramVecs[name] = histogramVec

if histogramVec, exists := p.histogramVecs.Get(name); exists {
histogramVec.WithLabelValues(labelValues...).Observe(value)
return
}

p.metricsRegistrationLock.Lock()
defer p.metricsRegistrationLock.Unlock()

// Double-check in case metrics was registered while waiting for the lock.
if histogramVec, exists := p.histogramVecs.Get(name); exists {
histogramVec.WithLabelValues(labelValues...).Observe(value)
return
}

histogramVec := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: name,
Namespace: p.cfg.Namespace,
Subsystem: p.cfg.Subsystem,
Help: name + " histogram",
// The maximum covered stats range is rate * HistogramBucketCount
Buckets: prometheus.LinearBuckets(0, rate, p.cfg.HistogramBucketCount),
}, labels)

prometheus.MustRegister(histogramVec)
p.histogramVecs.Set(name, histogramVec)

histogramVec.WithLabelValues(labelValues...).Observe(value)
}

Expand All @@ -206,25 +180,38 @@ func (p *metrics) Time(name string, value time.Duration, tags ...string) {

name = forceValidName(name)
labels, labelValues := parseTagsToLabelPairs(tags)
summaryVec, exists := p.summaryVecs[name]
if !exists {
summaryVec = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: name,
Namespace: p.cfg.Namespace,
Subsystem: p.cfg.Subsystem,
Help: name + " timing summary",
Objectives: map[float64]float64{
quantile50: errorMargin50,
quantile90: errorMargin90,
quantile99: errorMargin99,
},
}, labels)
prometheus.MustRegister(summaryVec)
p.summaryVecs[name] = summaryVec

if summaryVec, exists := p.summaryVecs.Get(name); exists {
// Convert time.Duration to seconds since Prometheus prefers base units
// see https://prometheus.io/docs/practices/naming/#base-units
summaryVec.WithLabelValues(labels...).Observe(value.Seconds())
return
}

p.metricsRegistrationLock.Lock()
defer p.metricsRegistrationLock.Unlock()

// Double-check in case metrics was registered while waiting for the lock.
if summaryVec, exists := p.summaryVecs.Get(name); exists {
summaryVec.WithLabelValues(labels...).Observe(value.Seconds())
return
}

// Convert time.Duration to seconds since Prometheus prefers base units
// see https://prometheus.io/docs/practices/naming/#base-units
summaryVec := prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: name,
Namespace: p.cfg.Namespace,
Subsystem: p.cfg.Subsystem,
Help: name + " timing summary",
Objectives: map[float64]float64{
quantile50: errorMargin50,
quantile90: errorMargin90,
quantile99: errorMargin99,
},
}, labels)

prometheus.MustRegister(summaryVec)
p.summaryVecs.Set(name, summaryVec)

summaryVec.WithLabelValues(labelValues...).Observe(value.Seconds())
}

Expand Down Expand Up @@ -279,3 +266,60 @@ func setDefaultCfg(cfg *Config) {
cfg.HistogramBucketCount = DefaultBucketCount
}
}

// Helper method to get or register a GaugeVec.
func (p *metrics) getOrRegisterNewGagueVec(name string, labels []string) *prometheus.GaugeVec {
// Attempt to read from the RWMap without metricsRegistrationLock.
if gaugeVec, exists := p.gaugeVecs.Get(name); exists {
return gaugeVec
}

p.metricsRegistrationLock.Lock()
defer p.metricsRegistrationLock.Unlock()

// Double-check in case it was created while waiting for the lock.
if gaugeVec, exists := p.gaugeVecs.Get(name); exists {
return gaugeVec
}

// Create a new GaugeVec and register it
gaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: name,
Namespace: p.cfg.Namespace,
Subsystem: p.cfg.Subsystem,
Help: name + " gauge",
}, labels)

prometheus.MustRegister(gaugeVec)
p.gaugeVecs.Set(name, gaugeVec)
return gaugeVec
}

// Helper method to get or register a CounterVec.
func (p *metrics) getOrRegisterNewCounterVec(name string, labels []string) *prometheus.CounterVec {
// Attempt to read from the RWMap without metricsRegistrationLock.
if counterVec, exists := p.counterVecs.Get(name); exists {
return counterVec
}

p.metricsRegistrationLock.Lock()
defer p.metricsRegistrationLock.Unlock()

// Double-check in case it was created while waiting for the lock.
if counterVec, exists := p.counterVecs.Get(name); exists {
return counterVec
}

// Create a new CounterVec and register it
counterVec := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: name,
Namespace: p.cfg.Namespace,
Subsystem: p.cfg.Subsystem,
Help: name + " counter",
}, labels)

// Register the CounterVec or get the already registered one.
prometheus.MustRegister(counterVec)
p.counterVecs.Set(name, counterVec)
return counterVec
}

0 comments on commit ea18f90

Please sign in to comment.