-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmetrics.go
236 lines (204 loc) · 6.26 KB
/
metrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
package kafka
import (
"fmt"
"strings"
"github.com/Shopify/sarama"
metrics "github.com/rcrowley/go-metrics"
)
// Metric type names. They are stored in the Type field of MeterMetric and
// HistoMetric and used as the keys of metricType sets.
const (
	// meterMetricType is the type name of a meter metric.
	meterMetricType = `meter`
	// histoMetricType is the type name of a histogram metric.
	histoMetricType = `histogram`
)
// Metric measurement names. These are the keys read from RawMetric.Values
// when a RawMetric is converted into a MeterMetric or a HistoMetric.
const (
	// Global measurements (read for both meters and histograms).

	// MetricCount is the key of the count measurement.
	MetricCount = `count`

	// Meter measurements.

	// MetricOneRate is the key of the 1m.rate measurement.
	MetricOneRate = `1m.rate`
	// MetricFiveRate is the key of the 5m.rate measurement.
	MetricFiveRate = `5m.rate`
	// MetricFifteenRate is the key of the 15m.rate measurement.
	MetricFifteenRate = `15m.rate`
	// MetricMeanRate is the key of the mean.rate measurement.
	MetricMeanRate = `mean.rate`

	// Histogram measurements.

	// MetricMin is the key of the min measurement.
	MetricMin = `min`
	// MetricMax is the key of the max measurement.
	MetricMax = `max`
	// MetricSeventyFive is the key of the 75% measurement.
	MetricSeventyFive = `75%`
	// MetricNinetyNine is the key of the 99% measurement.
	MetricNinetyNine = `99%`
)
// metricType is a set of metric type names: a type name mapped to true is a
// member of the set.
type metricType map[string]bool

// meterMetric is the metricType set that contains only the meter type name.
// It is assigned to RawMetric.Type as-is (maps are shared, not copied), so it
// must not be mutated.
var meterMetric = metricType{meterMetricType: true}

// histoMetric is the metricType set that contains only the histogram type
// name. It is assigned to RawMetric.Type as-is (maps are shared, not copied),
// so it must not be mutated.
var histoMetric = metricType{histoMetricType: true}
// RawMetric contains the raw metric measurements and values.
type RawMetric struct {
	// Measurement is the metric name. Update uses it as the lookup key into
	// the metrics returned by the registry.
	Measurement string
	// Values holds the measurement values keyed by measurement name
	// (for example MetricCount). Update replaces it on every call.
	Values map[string]interface{}
	// Type is the detected metric type. When it is nil, getMetricType
	// derives it from the keys present in Values.
	Type metricType
}
// Update refreshes m.Values with the registry's current entry for
// m.Measurement and then resolves the metric type if it has not been set yet.
func (m *RawMetric) Update(r *metrics.Registry) {
	snapshot := (*r).GetAll()
	m.Values = snapshot[m.Measurement]
	m.getMetricType()
}
// getMetricType resolves m.Type from the keys present in m.Values. It does
// nothing when the type is already set, and only logs a warning when the
// metric has no measurement name or no values yet.
//
// Classification rules:
//   - any key containing "rate" makes the metric a meter;
//   - otherwise, any key that does not contain "count" makes it a histogram;
//   - otherwise the type is left as an empty (non-nil) set.
//
// Meter detection takes precedence so that the result does not depend on the
// random iteration order of the map when both kinds of keys are present.
func (m *RawMetric) getMetricType() {
	if m.Type != nil {
		return
	}
	if m.Measurement == "" || m.Values == nil {
		Warnf("metric not initialized!")
		return
	}

	// Start from an empty set: the type counts as resolved-but-unknown until
	// one of the keys below identifies it.
	m.Type = make(map[string]bool)
	for k := range m.Values {
		if strings.Contains(k, "rate") {
			// A rate key is decisive; stop scanning so that a later key
			// cannot overwrite the result.
			m.Type = meterMetric
			return
		}
		if !strings.Contains(k, "count") {
			// Histogram candidate; keep scanning in case a rate key follows.
			m.Type = histoMetric
		}
	}
}
// ConvertToMetric converts the RawMetric into a *MeterMetric or *HistoMetric,
// depending on m.Type, and returns a pointer to the resulting Metric.
//
// The returned pointer is never nil, but the Metric it points to is nil when
// m.Type identifies neither a meter nor a histogram.
//
// A measurement that is missing from m.Values, or that does not have the
// expected dynamic type (int64 for count/min/max, float64 for rates and
// percentiles), is reported as zero instead of panicking.
func (m *RawMetric) ConvertToMetric() *Metric {
	// Checked assertions: indexing a nil map or a missing key yields a nil
	// interface, on which an unchecked assertion would panic.
	intValue := func(key string) int64 {
		v, _ := m.Values[key].(int64)
		return v
	}
	floatValue := func(key string) float64 {
		v, _ := m.Values[key].(float64)
		return v
	}

	var converted Metric
	switch {
	case m.Type[meterMetricType]:
		converted = &MeterMetric{
			Measurement: m.Measurement,
			Type:        meterMetricType,
			Count:       intValue(MetricCount),
			OneRate:     floatValue(MetricOneRate),
			FiveRate:    floatValue(MetricFiveRate),
			FifteenRate: floatValue(MetricFifteenRate),
			MeanRate:    floatValue(MetricMeanRate),
		}
	case m.Type[histoMetricType]:
		converted = &HistoMetric{
			Measurement: m.Measurement,
			Type:        histoMetricType,
			Count:       intValue(MetricCount),
			Min:         intValue(MetricMin),
			Max:         intValue(MetricMax),
			SeventyFive: floatValue(MetricSeventyFive),
			NinetyNine:  floatValue(MetricNinetyNine),
		}
	}
	return &converted
}
// MetricCollection groups metrics by kind.
type MetricCollection struct {
	Meters     []MeterMetric // meter metrics in the collection
	Histograms []HistoMetric // histogram metrics in the collection
}

// MeterCount reports how many meters the collection currently holds.
func (mc *MetricCollection) MeterCount() int { return len(mc.Meters) }

// HistoCount reports how many histograms the collection currently holds.
func (mc *MetricCollection) HistoCount() int { return len(mc.Histograms) }
// AddFromRaw receives one or more RawMetric values, converts each of them
// with ConvertToMetric and appends the result to the matching collection
// through Add.
//
// Nil entries, and raw metrics whose conversion yields no Metric (the type is
// neither meter nor histogram), are skipped rather than dereferenced.
func (mc *MetricCollection) AddFromRaw(rawMetrics ...*RawMetric) {
	for _, raw := range rawMetrics {
		if raw == nil {
			continue
		}
		converted := raw.ConvertToMetric()
		// ConvertToMetric hands back a pointer to a nil Metric when the raw
		// metric's type is unresolved; there is nothing to add in that case.
		if converted == nil || *converted == nil {
			continue
		}
		mc.Add(converted)
	}
}
// Add receives one or more Metrics and appends a copy of each to the
// collection that matches its kind: *MeterMetric values reporting IsMeter go
// to Meters, *HistoMetric values reporting IsHisto go to Histograms.
//
// Entries that cannot be stored are skipped instead of panicking: nil
// pointers, nil Metric values, typed-nil metrics, metrics of any other
// concrete type, and metrics whose Type field does not match their concrete
// type.
func (mc *MetricCollection) Add(metrics ...*Metric) {
	for _, metric := range metrics {
		if metric == nil || *metric == nil {
			continue
		}
		// Switch on the concrete type so that the value can be copied
		// without an unchecked type assertion.
		switch m := (*metric).(type) {
		case *MeterMetric:
			if m != nil && m.IsMeter() {
				mc.Meters = append(mc.Meters, *m)
			}
		case *HistoMetric:
			if m != nil && m.IsHisto() {
				mc.Histograms = append(mc.Histograms, *m)
			}
		}
	}
}
// Metric represents a metric measurement from Kafka.
// In this file it is implemented by *MeterMetric and *HistoMetric.
type Metric interface {
	// GetType returns the metric type name.
	GetType() string
	// IsMeter reports whether the metric is a meter.
	IsMeter() bool
	// IsHisto reports whether the metric is a histogram.
	IsHisto() bool
}
// MeterMetric holds the measurements of a single meter.
type MeterMetric struct {
	Measurement string // name of the metric
	Type        string // metric type name
	Count       int64  // value of the count measurement

	// Values of the 1m.rate, 5m.rate, 15m.rate and mean.rate measurements.
	OneRate, FiveRate, FifteenRate, MeanRate float64
}

// GetType reports the metric type name stored in the Type field.
func (m *MeterMetric) GetType() string { return m.Type }

// IsMeter reports whether the metric type name is the meter type name.
func (m *MeterMetric) IsMeter() bool { return m.GetType() == meterMetricType }

// IsHisto reports whether the metric type name is the histogram type name.
func (m *MeterMetric) IsHisto() bool { return m.GetType() == histoMetricType }
// HistoMetric holds the measurements of a single histogram.
type HistoMetric struct {
	Measurement string // name of the metric
	Type        string // metric type name

	// Values of the count, min and max measurements.
	Count, Min, Max int64

	// Values of the 75% and 99% measurements.
	SeventyFive, NinetyNine float64
}

// GetType reports the metric type name stored in the Type field.
func (m *HistoMetric) GetType() string { return m.Type }

// IsMeter reports whether the metric type name is the meter type name.
func (m *HistoMetric) IsMeter() bool { return m.GetType() == meterMetricType }

// IsHisto reports whether the metric type name is the histogram type name.
func (m *HistoMetric) IsHisto() bool { return m.GetType() == histoMetricType }
// Histogram sample parameters, taken from the Sarama GoDocs. They are passed
// to metrics.NewExpDecaySample when a new histogram is created.
const (
	// metricsReservoirSize is the first argument given to NewExpDecaySample.
	metricsReservoirSize = 1028
	// metricsAlphaFactor is the second argument given to NewExpDecaySample.
	metricsAlphaFactor = 0.015
)
// getOrRegisterHistogram asks registry r for the metric called name, passing
// a constructor for a histogram backed by an exponentially-decaying sample
// (metricsReservoirSize, metricsAlphaFactor), and returns the result as a
// metrics.Histogram.
//
// The type assertion is unchecked: it panics if r returns a metric of another
// type for name.
func getOrRegisterHistogram(name string, r metrics.Registry) metrics.Histogram {
	newHistogram := func() metrics.Histogram {
		sample := metrics.NewExpDecaySample(metricsReservoirSize, metricsAlphaFactor)
		return metrics.NewHistogram(sample)
	}
	registered := r.GetOrRegister(name, newHistogram)
	return registered.(metrics.Histogram)
}
// getMetricNameForBroker returns the per-broker metric name
// "<name>-for-broker-<id>", where <id> is broker.ID().
//
// name is passed as a formatting argument rather than being spliced into the
// format string, so a '%' inside name is emitted verbatim.
func getMetricNameForBroker(name string, broker *sarama.Broker) string {
	// Use broker id like the Java client as it does not contain '.' or ':' characters that
	// can be interpreted as special character by monitoring tool (e.g. Graphite)
	return fmt.Sprintf("%s-for-broker-%d", name, broker.ID())
}
// getOrRegisterBrokerMeter builds the per-broker metric name for name and
// broker, and gets or registers a meter under that name in registry r.
func getOrRegisterBrokerMeter(name string, broker *sarama.Broker, r metrics.Registry) metrics.Meter {
	brokerMetricName := getMetricNameForBroker(name, broker)
	return metrics.GetOrRegisterMeter(brokerMetricName, r)
}
// getOrRegisterBrokerHistogram builds the per-broker metric name for name and
// broker, and gets or registers a histogram under that name in registry r.
func getOrRegisterBrokerHistogram(name string, broker *sarama.Broker, r metrics.Registry) metrics.Histogram {
	brokerMetricName := getMetricNameForBroker(name, broker)
	return getOrRegisterHistogram(brokerMetricName, r)
}
// getMetricNameForTopic returns the per-topic metric name
// "<name>-for-topic-<topic>", with every '.' in topic replaced by '_'.
//
// name is passed as a formatting argument rather than being spliced into the
// format string, so a '%' inside name is emitted verbatim.
func getMetricNameForTopic(name string, topic string) string {
	// Convert dot to _ since reporters like Graphite typically use dot to represent hierarchy
	// cf. KAFKA-1902 and KAFKA-2337
	sanitizedTopic := strings.Replace(topic, ".", "_", -1)
	return fmt.Sprintf("%s-for-topic-%s", name, sanitizedTopic)
}
// getOrRegisterTopicMeter builds the per-topic metric name for name and
// topic, and gets or registers a meter under that name in registry r.
func getOrRegisterTopicMeter(name string, topic string, r metrics.Registry) metrics.Meter {
	topicMetricName := getMetricNameForTopic(name, topic)
	return metrics.GetOrRegisterMeter(topicMetricName, r)
}
// getOrRegisterTopicHistogram builds the per-topic metric name for name and
// topic, and gets or registers a histogram under that name in registry r.
func getOrRegisterTopicHistogram(name string, topic string, r metrics.Registry) metrics.Histogram {
	topicMetricName := getMetricNameForTopic(name, topic)
	return getOrRegisterHistogram(topicMetricName, r)
}