diff --git a/metricbeat/mb/event.go b/metricbeat/mb/event.go index fb6907b6396f..ebd0af668e94 100644 --- a/metricbeat/mb/event.go +++ b/metricbeat/mb/event.go @@ -223,6 +223,9 @@ type PartialMetricsError struct { } func (p PartialMetricsError) Error() string { + if p.Err == nil { + return "" + } return p.Err.Error() } diff --git a/metricbeat/module/system/_meta/config.yml b/metricbeat/module/system/_meta/config.yml index 4f3a66dcfa90..f48f318182e7 100644 --- a/metricbeat/module/system/_meta/config.yml +++ b/metricbeat/module/system/_meta/config.yml @@ -17,6 +17,7 @@ process.include_top_n: by_cpu: 5 # include top 5 processes by CPU by_memory: 5 # include top 5 processes by memory + degrade_on_partial: false # mark metricset as degraded if partial metrics are emitted # Configure the mount point of the host’s filesystem for use in monitoring a host from within a container # hostfs: "/hostfs" diff --git a/metricbeat/module/system/process/process.go b/metricbeat/module/system/process/process.go index f84c0b6027a0..c00e60f6bfd6 100644 --- a/metricbeat/module/system/process/process.go +++ b/metricbeat/module/system/process/process.go @@ -45,9 +45,10 @@ func init() { // MetricSet that fetches process metrics. type MetricSet struct { mb.BaseMetricSet - stats *process.Stats - perCPU bool - setpid int + stats *process.Stats + perCPU bool + setpid int + degradeOnPartial bool } // New creates and returns a new MetricSet. @@ -72,7 +73,12 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if config.Pid != 0 && config.Procs[0] != ".*" { logp.L().Warnf("`process.pid` set to %d, but `processes` is set to a non-default value. 
Metricset will only report metrics for pid %d", config.Pid, config.Pid) } - + degradedConf := struct { + DegradeOnPartial bool `config:"degrade_on_partial"` + }{} + if err := base.Module().UnpackConfig(&degradedConf); err != nil { + logp.L().Warnf("Failed to unpack config; degraded mode will be disabled for partial metrics: %v", err) + } m := &MetricSet{ BaseMetricSet: base, stats: &process.Stats{ @@ -88,7 +94,8 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { IgnoreRootCgroups: true, }, }, - perCPU: config.IncludePerCPU, + perCPU: config.IncludePerCPU, + degradeOnPartial: degradedConf.DegradeOnPartial, } m.setpid = config.Pid @@ -119,6 +126,9 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { // return only if the error is fatal in nature return fmt.Errorf("process stats: %w", err) } else if (err != nil && errors.Is(err, process.NonFatalErr{})) { + if m.degradeOnPartial { + return fmt.Errorf("error fetching process list: %w", err) + } err = mb.PartialMetricsError{Err: err} } @@ -138,6 +148,9 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { // return only if the error is fatal in nature return fmt.Errorf("error fetching pid %d: %w", m.setpid, err) } else if (err != nil && errors.Is(err, process.NonFatalErr{})) { + if m.degradeOnPartial { + return fmt.Errorf("error fetching process list: %w", err) + } err = mb.PartialMetricsError{Err: err} } // if error is non-fatal, emit partial metrics. 
diff --git a/metricbeat/module/system/process/process_test.go b/metricbeat/module/system/process/process_test.go index 18841b68c09e..8920d0d0efef 100644 --- a/metricbeat/module/system/process/process_test.go +++ b/metricbeat/module/system/process/process_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/assert" + "github.com/elastic/beats/v7/metricbeat/mb" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" _ "github.com/elastic/beats/v7/metricbeat/module/system" "github.com/elastic/elastic-agent-libs/logp" @@ -55,6 +56,32 @@ func TestFetch(t *testing.T) { events[0].BeatEvent("system", "process").Fields.StringToPrint()) } +func TestFetchDegradeOnPartial(t *testing.T) { + logp.DevelopmentSetup() + config := getConfig() + config["degrade_on_partial"] = true + + f := mbtest.NewReportingMetricSetV2Error(t, config) + events, errs := mbtest.ReportingFetchV2Error(f) + if len(errs) > 0 { + for _, err := range errs { + assert.NotErrorIsf(t, err, &mb.PartialMetricsError{}, "Expected non-fatal error, got %v", err) + } + } else { + assert.NotEmpty(t, events) + + events, errs = mbtest.ReportingFetchV2Error(f) + for _, err := range errs { + assert.ErrorIsf(t, err, process.NonFatalErr{}, "Expected non-fatal error, got %v", err) + } + assert.NotEmpty(t, events) + + t.Logf("fetched %d events, showing events[0]:", len(events)) + t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), + events[0].BeatEvent("system", "process").Fields.StringToPrint()) + } +} + func TestFetchSinglePid(t *testing.T) { logp.DevelopmentSetup() diff --git a/metricbeat/module/system/process_summary/process_summary.go b/metricbeat/module/system/process_summary/process_summary.go index cc8d5e385525..69c08c957e74 100644 --- a/metricbeat/module/system/process_summary/process_summary.go +++ b/metricbeat/module/system/process_summary/process_summary.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/transform/typeconv" "github.com/elastic/beats/v7/metricbeat/mb" 
"github.com/elastic/beats/v7/metricbeat/mb/parse" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-system-metrics/metric/system/process" "github.com/elastic/elastic-agent-system-metrics/metric/system/resolve" @@ -50,17 +51,28 @@ func init() { // multiple fetch calls. type MetricSet struct { mb.BaseMetricSet - sys resolve.Resolver + sys resolve.Resolver + degradeOnPartial bool } // New create a new instance of the MetricSet // Part of new is also setting up the configuration by processing additional // configuration entries if needed. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - sys := base.Module().(resolve.Resolver) + sys, ok := base.Module().(resolve.Resolver) + if !ok { + return nil, fmt.Errorf("resolver cannot be cast from the module") + } + degradedConf := struct { + DegradeOnPartial bool `config:"degrade_on_partial"` + }{} + if err := base.Module().UnpackConfig(&degradedConf); err != nil { + logp.L().Warnf("Failed to unpack config; degraded mode will be disabled for partial metrics: %v", err) + } return &MetricSet{ - BaseMetricSet: base, - sys: sys, + BaseMetricSet: base, + sys: sys, + degradeOnPartial: degradedConf.DegradeOnPartial, }, nil } @@ -74,6 +86,9 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { // return only if the error is fatal in nature return fmt.Errorf("error fetching process list: %w", degradeErr) } else if (degradeErr != nil && errors.Is(degradeErr, process.NonFatalErr{})) { + if m.degradeOnPartial { + return fmt.Errorf("error fetching process list: %w", degradeErr) + } degradeErr = mb.PartialMetricsError{Err: degradeErr} } diff --git a/metricbeat/module/system/process_summary/process_summary_test.go b/metricbeat/module/system/process_summary/process_summary_test.go index 042148f37133..48dbf09cfe3e 100644 --- a/metricbeat/module/system/process_summary/process_summary_test.go +++ b/metricbeat/module/system/process_summary/process_summary_test.go @@ 
-25,6 +25,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/metricbeat/mb" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" _ "github.com/elastic/beats/v7/metricbeat/module/system" "github.com/elastic/elastic-agent-libs/logp" @@ -58,6 +59,27 @@ func TestFetch(t *testing.T) { require.NoError(t, err) } +func TestFetchDegradeOnPartial(t *testing.T) { + logp.DevelopmentSetup() + config := getConfig() + config["degrade_on_partial"] = true + + f := mbtest.NewReportingMetricSetV2Error(t, config) + events, errs := mbtest.ReportingFetchV2Error(f) + if len(errs) > 0 { + for _, err := range errs { + assert.NotErrorIsf(t, err, &mb.PartialMetricsError{}, "Expected non-fatal error, got %v", err) + } + } else { + require.NotEmpty(t, events) + event := events[0].BeatEvent("system", "process_summary").Fields + t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), + event.StringToPrint()) + + _, err := event.GetValue("system.process.summary") + require.NoError(t, err) + } +} func TestStateNames(t *testing.T) { logp.DevelopmentSetup() @@ -80,7 +102,8 @@ func TestStateNames(t *testing.T) { assert.NotZero(t, event["total"]) var sum int - total := event["total"].(int) + total, ok := event["total"].(int) + require.Truef(t, ok, "Expected int got %T", event["total"]) for key, val := range event { if key == "total" { continue diff --git a/metricbeat/modules.d/system.yml b/metricbeat/modules.d/system.yml index 4123ea00f332..20cd4564033a 100644 --- a/metricbeat/modules.d/system.yml +++ b/metricbeat/modules.d/system.yml @@ -20,6 +20,7 @@ process.include_top_n: by_cpu: 5 # include top 5 processes by CPU by_memory: 5 # include top 5 processes by memory + degrade_on_partial: false # mark metricset as degraded if partial metrics are emitted # Configure the mount point of the host’s filesystem for use in monitoring a host from within a container # hostfs: "/hostfs"