Skip to content

Commit

Permalink
feat: Whitelist metrics scraped by self-monitor (#982)
Browse files Browse the repository at this point in the history
  • Loading branch information
Stanislav Khalash authored Apr 18, 2024
1 parent 2e5d730 commit 7729fb5
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 54 deletions.
3 changes: 1 addition & 2 deletions internal/reconciler/telemetry/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/kyma-project/telemetry-manager/internal/k8sutils"
"github.com/kyma-project/telemetry-manager/internal/overrides"
"github.com/kyma-project/telemetry-manager/internal/resources/selfmonitor"
"github.com/kyma-project/telemetry-manager/internal/selfmonitor/alertrules"
"github.com/kyma-project/telemetry-manager/internal/selfmonitor/config"
"github.com/kyma-project/telemetry-manager/internal/webhookcert"
)
Expand Down Expand Up @@ -158,7 +157,7 @@ func (r *Reconciler) reconcileSelfMonitor(ctx context.Context, telemetry operato
return fmt.Errorf("failed to marshal selfmonitor config: %w", err)
}

rules := alertrules.MakeRules()
rules := config.MakeRules()
rulesYAML, err := yaml.Marshal(rules)
if err != nil {
return fmt.Errorf("failed to marshal rules: %w", err)
Expand Down
27 changes: 26 additions & 1 deletion internal/selfmonitor/config/config_builder.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"strings"
"time"
)

Expand Down Expand Up @@ -93,7 +94,7 @@ func makeScrapeConfig(scrapeNamespace string) []ScrapeConfig {
{
SourceLabels: []string{"__name__"},
Action: Keep,
Regex: "(otelcol_.*|fluentbit_.*|telemetry_.*)",
Regex: srapableMetricsRegex(),
},
// The following relabel configs add an artificial pipeline_name label to the Fluent Bit and OTel Collector metrics to simplify pipeline matching
// For Fluent Bit metrics, the pipeline_name is based on the name label. Note that a regex group matching Kubernetes resource names (alphanumerical chars and hyphens) is used to extract the pipeline name.
Expand All @@ -119,3 +120,27 @@ func makeScrapeConfig(scrapeNamespace string) []ScrapeConfig {
},
}
}

func srapableMetricsRegex() string {
fluentBitMetrics := []string{
metricFluentBitOutputProcBytesTotal,
metricFluentBitOutputDroppedRecordsTotal,
metricFluentBitInputBytesTotal,
metricFluentBitBufferUsageBytes,
}

otelCollectorMetrics := []string{
metricOtelCollectorExporterSent,
metricOtelCollectorExporterSendFailed,
metricOtelCollectorExporterQueueSize,
metricOtelCollectorExporterQueueCapacity,
metricOtelCollectorExporterEnqueueFailed,
metricOtelCollectorReceiverRefused,
}

for i := range otelCollectorMetrics {
otelCollectorMetrics[i] += "_.*"
}

return strings.Join(append(fluentBitMetrics, otelCollectorMetrics...), "|")
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package alertrules
package config

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
package alertrules
package config

import (
"fmt"
)

const (
fluentBitMetricsServiceName = "telemetry-fluent-bit-metrics"

metricFluentBitOutputProcBytesTotal = "fluentbit_output_proc_bytes_total"
metricFluentBitInputBytesTotal = "fluentbit_input_bytes_total"
metricFluentBitOutputDroppedRecordsTotal = "fluentbit_output_dropped_records_total"
metricFluentBitBufferUsageBytes = "telemetry_fsbuffer_usage_bytes"
)

type fluentBitRuleBuilder struct {
Expand All @@ -20,7 +29,7 @@ func (rb fluentBitRuleBuilder) rules() []Rule {
func (rb fluentBitRuleBuilder) exporterSentRule() Rule {
return Rule{
Alert: rb.namePrefix() + RuleNameLogAgentExporterSentLogs,
Expr: rate("fluentbit_output_proc_bytes_total", selectService(fluentBitMetricsServiceName)).
Expr: rate(metricFluentBitOutputProcBytesTotal, selectService(fluentBitMetricsServiceName)).
sumBy(labelPipelineName).
greaterThan(0).
build(),
Expand All @@ -30,7 +39,7 @@ func (rb fluentBitRuleBuilder) exporterSentRule() Rule {
func (rb fluentBitRuleBuilder) receiverReadRule() Rule {
return Rule{
Alert: rb.namePrefix() + RuleNameLogAgentReceiverReadLogs,
Expr: rate("fluentbit_input_bytes_total", selectService(fluentBitMetricsServiceName)).
Expr: rate(metricFluentBitInputBytesTotal, selectService(fluentBitMetricsServiceName)).
sumBy(labelPipelineName).
greaterThan(0).
build(),
Expand All @@ -40,7 +49,7 @@ func (rb fluentBitRuleBuilder) receiverReadRule() Rule {
func (rb fluentBitRuleBuilder) exporterDroppedRule() Rule {
return Rule{
Alert: rb.namePrefix() + RuleNameLogAgentExporterDroppedLogs,
Expr: rate("fluentbit_output_dropped_records_total", selectService(fluentBitMetricsServiceName)).
Expr: rate(metricFluentBitOutputDroppedRecordsTotal, selectService(fluentBitMetricsServiceName)).
sumBy(labelPipelineName).
greaterThan(0).
build(),
Expand All @@ -50,14 +59,14 @@ func (rb fluentBitRuleBuilder) exporterDroppedRule() Rule {
func (rb fluentBitRuleBuilder) bufferInUseRule() Rule {
return Rule{
Alert: rb.namePrefix() + RuleNameLogAgentBufferInUse,
Expr: "telemetry_fsbuffer_usage_bytes > 300000000",
Expr: fmt.Sprintf("%s > 300000000", metricFluentBitBufferUsageBytes),
}
}

func (rb fluentBitRuleBuilder) bufferFullRule() Rule {
return Rule{
Alert: rb.namePrefix() + RuleNameLogAgentBufferFull,
Expr: "telemetry_fsbuffer_usage_bytes > 900000000",
Expr: fmt.Sprintf("%s > 900000000", metricFluentBitBufferUsageBytes),
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
package alertrules
package config

import (
"fmt"
)

const (
metricOtelCollectorExporterSent = "otelcol_exporter_sent"
metricOtelCollectorExporterSendFailed = "otelcol_exporter_send_failed"
metricOtelCollectorExporterQueueSize = "otelcol_exporter_queue_size"
metricOtelCollectorExporterQueueCapacity = "otelcol_exporter_queue_capacity"
metricOtelCollectorExporterEnqueueFailed = "otelcol_exporter_enqueue_failed"
metricOtelCollectorReceiverRefused = "otelcol_receiver_refused"
)

type otelCollectorRuleBuilder struct {
serviceName string
dataType string
Expand All @@ -20,8 +29,12 @@ func (rb otelCollectorRuleBuilder) rules() []Rule {
}
}

func (rb otelCollectorRuleBuilder) formatMetricName(baseMetricName string) string {
return fmt.Sprintf("%s_%s", baseMetricName, rb.dataType)
}

func (rb otelCollectorRuleBuilder) exporterSentRule() Rule {
metric := fmt.Sprintf("otelcol_exporter_sent_%s", rb.dataType)
metric := rb.formatMetricName(metricOtelCollectorExporterSent)
return Rule{
Alert: rb.namePrefix + RuleNameGatewayExporterSentData,
Expr: rate(metric, selectService(rb.serviceName)).
Expand All @@ -32,7 +45,7 @@ func (rb otelCollectorRuleBuilder) exporterSentRule() Rule {
}

func (rb otelCollectorRuleBuilder) exporterDroppedRule() Rule {
metric := fmt.Sprintf("otelcol_exporter_send_failed_%s", rb.dataType)
metric := rb.formatMetricName(metricOtelCollectorExporterSendFailed)
return Rule{
Alert: rb.namePrefix + RuleNameGatewayExporterDroppedData,
Expr: rate(metric, selectService(rb.serviceName)).
Expand All @@ -45,15 +58,15 @@ func (rb otelCollectorRuleBuilder) exporterDroppedRule() Rule {
func (rb otelCollectorRuleBuilder) exporterQueueAlmostFullRule() Rule {
return Rule{
Alert: rb.namePrefix + RuleNameGatewayExporterQueueAlmostFull,
Expr: div("otelcol_exporter_queue_size", "otelcol_exporter_queue_capacity", selectService(rb.serviceName)).
Expr: div(metricOtelCollectorExporterQueueSize, metricOtelCollectorExporterQueueCapacity, selectService(rb.serviceName)).
maxBy(labelPipelineName).
greaterThan(0.8).
build(),
}
}

func (rb otelCollectorRuleBuilder) exporterEnqueueFailedRule() Rule {
metric := fmt.Sprintf("otelcol_exporter_enqueue_failed_%s", rb.dataType)
metric := rb.formatMetricName(metricOtelCollectorExporterEnqueueFailed)
return Rule{
Alert: rb.namePrefix + RuleNameGatewayExporterEnqueueFailed,
Expr: rate(metric, selectService(rb.serviceName)).
Expand All @@ -64,7 +77,7 @@ func (rb otelCollectorRuleBuilder) exporterEnqueueFailedRule() Rule {
}

func (rb otelCollectorRuleBuilder) receiverRefusedRule() Rule {
metric := fmt.Sprintf("otelcol_receiver_refused_%s", rb.dataType)
metric := rb.formatMetricName(metricOtelCollectorReceiverRefused)
return Rule{
Alert: rb.namePrefix + RuleNameGatewayReceiverRefusedData,
Expr: rate(metric, selectService(rb.serviceName)).
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package alertrules
package config

import (
"strings"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package alertrules
package config

import (
"testing"
Expand Down
2 changes: 1 addition & 1 deletion internal/selfmonitor/config/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ scrape_configs:
action: replace
metric_relabel_configs:
- source_labels: [__name__]
regex: (otelcol_.*|fluentbit_.*|telemetry_.*)
regex: fluentbit_output_proc_bytes_total|fluentbit_output_dropped_records_total|fluentbit_input_bytes_total|telemetry_fsbuffer_usage_bytes|otelcol_exporter_sent_.*|otelcol_exporter_send_failed_.*|otelcol_exporter_queue_size_.*|otelcol_exporter_queue_capacity_.*|otelcol_exporter_enqueue_failed_.*|otelcol_receiver_refused_.*
action: keep
- source_labels: [__name__, name]
regex: fluentbit_.+;([a-zA-Z0-9-]+)
Expand Down
32 changes: 16 additions & 16 deletions internal/selfmonitor/prober/log_pipeline_prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/kyma-project/telemetry-manager/internal/selfmonitor/alertrules"
"github.com/kyma-project/telemetry-manager/internal/selfmonitor/config"
)

type LogPipelineProber struct {
Expand Down Expand Up @@ -50,41 +50,41 @@ func (p *LogPipelineProber) Probe(ctx context.Context, pipelineName string) (Log
}

func (p *LogPipelineProber) allDataDropped(alerts []promv1.Alert, pipelineName string) bool {
exporterSentLogs := p.evaluateRule(alerts, alertrules.RuleNameLogAgentExporterSentLogs, pipelineName)
exporterDroppedLogs := p.evaluateRule(alerts, alertrules.RuleNameLogAgentExporterDroppedLogs, pipelineName)
bufferFull := p.evaluateRule(alerts, alertrules.RuleNameLogAgentBufferFull, pipelineName)
exporterSentLogs := p.evaluateRule(alerts, config.RuleNameLogAgentExporterSentLogs, pipelineName)
exporterDroppedLogs := p.evaluateRule(alerts, config.RuleNameLogAgentExporterDroppedLogs, pipelineName)
bufferFull := p.evaluateRule(alerts, config.RuleNameLogAgentBufferFull, pipelineName)
return !exporterSentLogs && (exporterDroppedLogs || bufferFull)
}

func (p *LogPipelineProber) someDataDropped(alerts []promv1.Alert, pipelineName string) bool {
exporterSentLogs := p.evaluateRule(alerts, alertrules.RuleNameLogAgentExporterSentLogs, pipelineName)
exporterDroppedLogs := p.evaluateRule(alerts, alertrules.RuleNameLogAgentExporterDroppedLogs, pipelineName)
bufferFull := p.evaluateRule(alerts, alertrules.RuleNameLogAgentBufferFull, pipelineName)
exporterSentLogs := p.evaluateRule(alerts, config.RuleNameLogAgentExporterSentLogs, pipelineName)
exporterDroppedLogs := p.evaluateRule(alerts, config.RuleNameLogAgentExporterDroppedLogs, pipelineName)
bufferFull := p.evaluateRule(alerts, config.RuleNameLogAgentBufferFull, pipelineName)
return exporterSentLogs && (exporterDroppedLogs || bufferFull)
}

func (p *LogPipelineProber) noLogsDelivered(alerts []promv1.Alert, pipelineName string) bool {
receiverReadLogs := p.evaluateRule(alerts, alertrules.RuleNameLogAgentReceiverReadLogs, pipelineName)
exporterSentLogs := p.evaluateRule(alerts, alertrules.RuleNameLogAgentExporterSentLogs, pipelineName)
receiverReadLogs := p.evaluateRule(alerts, config.RuleNameLogAgentReceiverReadLogs, pipelineName)
exporterSentLogs := p.evaluateRule(alerts, config.RuleNameLogAgentExporterSentLogs, pipelineName)
return receiverReadLogs && !exporterSentLogs
}

func (p *LogPipelineProber) bufferFillingUp(alerts []promv1.Alert, pipelineName string) bool {
return p.evaluateRule(alerts, alertrules.RuleNameLogAgentBufferInUse, pipelineName)
return p.evaluateRule(alerts, config.RuleNameLogAgentBufferInUse, pipelineName)
}

func (p *LogPipelineProber) healthy(alerts []promv1.Alert, pipelineName string) bool {
// The pipeline is healthy if none of the following conditions are met:
bufferInUse := p.evaluateRule(alerts, alertrules.RuleNameLogAgentBufferInUse, pipelineName)
bufferFull := p.evaluateRule(alerts, alertrules.RuleNameLogAgentBufferFull, pipelineName)
exporterDroppedLogs := p.evaluateRule(alerts, alertrules.RuleNameLogAgentExporterDroppedLogs, pipelineName)
bufferInUse := p.evaluateRule(alerts, config.RuleNameLogAgentBufferInUse, pipelineName)
bufferFull := p.evaluateRule(alerts, config.RuleNameLogAgentBufferFull, pipelineName)
exporterDroppedLogs := p.evaluateRule(alerts, config.RuleNameLogAgentExporterDroppedLogs, pipelineName)

// The pipeline is healthy if either no logs are being read or all logs are being sent
receiverReadLogs := p.evaluateRule(alerts, alertrules.RuleNameLogAgentReceiverReadLogs, pipelineName)
exporterSentLogs := p.evaluateRule(alerts, alertrules.RuleNameLogAgentExporterSentLogs, pipelineName)
receiverReadLogs := p.evaluateRule(alerts, config.RuleNameLogAgentReceiverReadLogs, pipelineName)
exporterSentLogs := p.evaluateRule(alerts, config.RuleNameLogAgentExporterSentLogs, pipelineName)
return !(bufferInUse || bufferFull || exporterDroppedLogs) && (!receiverReadLogs || exporterSentLogs)
}

func (p *LogPipelineProber) evaluateRule(alerts []promv1.Alert, alertName, pipelineName string) bool {
return evaluateRuleWithMatcher(alerts, alertName, pipelineName, alertrules.MatchesLogPipelineRule)
return evaluateRuleWithMatcher(alerts, alertName, pipelineName, config.MatchesLogPipelineRule)
}
30 changes: 15 additions & 15 deletions internal/selfmonitor/prober/otel_pipeline_prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/kyma-project/telemetry-manager/internal/selfmonitor/alertrules"
"github.com/kyma-project/telemetry-manager/internal/selfmonitor/config"
)

// OTelPipelineProber is a prober for OTel Collector pipelines
Expand All @@ -24,11 +24,11 @@ type OTelPipelineProbeResult struct {
}

func NewMetricPipelineProber(selfMonitorName types.NamespacedName) (*OTelPipelineProber, error) {
return newOTelPipelineProber(selfMonitorName, alertrules.MatchesMetricPipelineRule)
return newOTelPipelineProber(selfMonitorName, config.MatchesMetricPipelineRule)
}

func NewTracePipelineProber(selfMonitorName types.NamespacedName) (*OTelPipelineProber, error) {
return newOTelPipelineProber(selfMonitorName, alertrules.MatchesTracePipelineRule)
return newOTelPipelineProber(selfMonitorName, config.MatchesTracePipelineRule)
}

func newOTelPipelineProber(selfMonitorName types.NamespacedName, matcher matcherFunc) (*OTelPipelineProber, error) {
Expand Down Expand Up @@ -61,34 +61,34 @@ func (p *OTelPipelineProber) Probe(ctx context.Context, pipelineName string) (OT
}

func (p *OTelPipelineProber) allDataDropped(alerts []promv1.Alert, pipelineName string) bool {
exporterSentFiring := p.evaluateRule(alerts, alertrules.RuleNameGatewayExporterSentData, pipelineName)
exporterDroppedFiring := p.evaluateRule(alerts, alertrules.RuleNameGatewayExporterDroppedData, pipelineName)
exporterEnqueueFailedFiring := p.evaluateRule(alerts, alertrules.RuleNameGatewayExporterEnqueueFailed, pipelineName)
exporterSentFiring := p.evaluateRule(alerts, config.RuleNameGatewayExporterSentData, pipelineName)
exporterDroppedFiring := p.evaluateRule(alerts, config.RuleNameGatewayExporterDroppedData, pipelineName)
exporterEnqueueFailedFiring := p.evaluateRule(alerts, config.RuleNameGatewayExporterEnqueueFailed, pipelineName)

return !exporterSentFiring && (exporterDroppedFiring || exporterEnqueueFailedFiring)
}

func (p *OTelPipelineProber) someDataDropped(alerts []promv1.Alert, pipelineName string) bool {
exporterSentFiring := p.evaluateRule(alerts, alertrules.RuleNameGatewayExporterSentData, pipelineName)
exporterDroppedFiring := p.evaluateRule(alerts, alertrules.RuleNameGatewayExporterDroppedData, pipelineName)
exporterEnqueueFailedFiring := p.evaluateRule(alerts, alertrules.RuleNameGatewayExporterEnqueueFailed, pipelineName)
exporterSentFiring := p.evaluateRule(alerts, config.RuleNameGatewayExporterSentData, pipelineName)
exporterDroppedFiring := p.evaluateRule(alerts, config.RuleNameGatewayExporterDroppedData, pipelineName)
exporterEnqueueFailedFiring := p.evaluateRule(alerts, config.RuleNameGatewayExporterEnqueueFailed, pipelineName)

return exporterSentFiring && (exporterDroppedFiring || exporterEnqueueFailedFiring)
}

func (p *OTelPipelineProber) queueAlmostFull(alerts []promv1.Alert, pipelineName string) bool {
return p.evaluateRule(alerts, alertrules.RuleNameGatewayExporterQueueAlmostFull, pipelineName)
return p.evaluateRule(alerts, config.RuleNameGatewayExporterQueueAlmostFull, pipelineName)
}

func (p *OTelPipelineProber) throttling(alerts []promv1.Alert, pipelineName string) bool {
return p.evaluateRule(alerts, alertrules.RuleNameGatewayReceiverRefusedData, pipelineName)
return p.evaluateRule(alerts, config.RuleNameGatewayReceiverRefusedData, pipelineName)
}

func (p *OTelPipelineProber) healthy(alerts []promv1.Alert, pipelineName string) bool {
return !(p.evaluateRule(alerts, alertrules.RuleNameGatewayExporterDroppedData, pipelineName) ||
p.evaluateRule(alerts, alertrules.RuleNameGatewayExporterQueueAlmostFull, pipelineName) ||
p.evaluateRule(alerts, alertrules.RuleNameGatewayExporterEnqueueFailed, pipelineName) ||
p.evaluateRule(alerts, alertrules.RuleNameGatewayReceiverRefusedData, pipelineName))
return !(p.evaluateRule(alerts, config.RuleNameGatewayExporterDroppedData, pipelineName) ||
p.evaluateRule(alerts, config.RuleNameGatewayExporterQueueAlmostFull, pipelineName) ||
p.evaluateRule(alerts, config.RuleNameGatewayExporterEnqueueFailed, pipelineName) ||
p.evaluateRule(alerts, config.RuleNameGatewayReceiverRefusedData, pipelineName))
}

func (p *OTelPipelineProber) evaluateRule(alerts []promv1.Alert, alertName, pipelineName string) bool {
Expand Down
8 changes: 4 additions & 4 deletions internal/selfmonitor/webhook/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"

telemetryv1alpha1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1alpha1"
"github.com/kyma-project/telemetry-manager/internal/selfmonitor/alertrules"
"github.com/kyma-project/telemetry-manager/internal/selfmonitor/config"
)

type Handler struct {
Expand Down Expand Up @@ -137,7 +137,7 @@ func (h *Handler) toMetricPipelineReconcileEvents(ctx context.Context, alerts []
for i := range metricPipelines.Items {
pipelineName := metricPipelines.Items[i].GetName()
for _, alert := range alerts {
if alertrules.MatchesMetricPipelineRule(alert.Labels, alertrules.RulesAny, pipelineName) {
if config.MatchesMetricPipelineRule(alert.Labels, config.RulesAny, pipelineName) {
events = append(events, event.GenericEvent{Object: &metricPipelines.Items[i]})
}
}
Expand All @@ -156,7 +156,7 @@ func (h *Handler) toTracePipelineReconcileEvents(ctx context.Context, alerts []A
for i := range tracePipelines.Items {
pipelineName := tracePipelines.Items[i].GetName()
for _, alert := range alerts {
if alertrules.MatchesTracePipelineRule(alert.Labels, alertrules.RulesAny, pipelineName) {
if config.MatchesTracePipelineRule(alert.Labels, config.RulesAny, pipelineName) {
events = append(events, event.GenericEvent{Object: &tracePipelines.Items[i]})
}
}
Expand All @@ -175,7 +175,7 @@ func (h *Handler) toLogPipelineReconcileEvents(ctx context.Context, alerts []Ale
for i := range logPipelines.Items {
pipelineName := logPipelines.Items[i].GetName()
for _, alert := range alerts {
if alertrules.MatchesLogPipelineRule(alert.Labels, alertrules.RulesAny, pipelineName) {
if config.MatchesLogPipelineRule(alert.Labels, config.RulesAny, pipelineName) {
events = append(events, event.GenericEvent{Object: &logPipelines.Items[i]})
}
}
Expand Down

0 comments on commit 7729fb5

Please sign in to comment.