diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index b4d890d45443..faa8c6b58d27 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -273,6 +273,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Ensure all responses sent by HTTP Endpoint are HTML-escaped. {pull}39329[39329]
- Update CEL mito extensions to v1.11.0 to improve type checking. {pull}39460[39460]
- Improve logging of request and response with request trace logging in error conditions. {pull}39455[39455]
+- Implement Elastic Agent status and health reporting for CEL Filebeat input. {pull}39209[39209]
- Add HTTP metrics to CEL input. {issue}39501[39501] {pull}39503[39503]
- Add default user-agent to CEL HTTP requests. {issue}39502[39502] {pull}39587[39587]
- Improve reindexing support in security module pipelines. {issue}38224[38224] {pull}[]
diff --git a/filebeat/input/v2/compat/compat.go b/filebeat/input/v2/compat/compat.go
index a8d2c0e8cb2e..d84c9e39bfd1 100644
--- a/filebeat/input/v2/compat/compat.go
+++ b/filebeat/input/v2/compat/compat.go
@@ -31,6 +31,7 @@ import (
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
+ "github.com/elastic/beats/v7/libbeat/management/status"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/go-concert/ctxtool"
@@ -50,13 +51,14 @@ type factory struct {
// On stop the runner triggers the shutdown signal and waits until the input
// has returned.
type runner struct {
- id string
- log *logp.Logger
- agent *beat.Info
- wg sync.WaitGroup
- sig ctxtool.CancelContext
- input v2.Input
- connector beat.PipelineConnector
+ id string
+ log *logp.Logger
+ agent *beat.Info
+ wg sync.WaitGroup
+ sig ctxtool.CancelContext
+ input v2.Input
+ connector beat.PipelineConnector
+ statusReporter status.StatusReporter
}
// RunnerFactory creates a cfgfile.RunnerFactory from an input Loader that is
@@ -109,6 +111,10 @@ func (f *factory) Create(
}, nil
}
+func (r *runner) SetStatusReporter(reported status.StatusReporter) {
+ r.statusReporter = reported
+}
+
func (r *runner) String() string { return r.input.Name() }
func (r *runner) Start() {
@@ -121,10 +127,11 @@ func (r *runner) Start() {
log.Infof("Input '%s' starting", name)
err := r.input.Run(
v2.Context{
- ID: r.id,
- Agent: *r.agent,
- Logger: log,
- Cancelation: r.sig,
+ ID: r.id,
+ Agent: *r.agent,
+ Logger: log,
+ Cancelation: r.sig,
+ StatusReporter: r.statusReporter,
},
r.connector,
)
@@ -140,6 +147,7 @@ func (r *runner) Stop() {
r.sig.Cancel()
r.wg.Wait()
r.log.Infof("Input '%s' stopped (runner)", r.input.Name())
+ r.statusReporter = nil
}
func configID(config *conf.C) (string, error) {
diff --git a/filebeat/input/v2/input.go b/filebeat/input/v2/input.go
index 30b8ad333b15..9b78bc427aec 100644
--- a/filebeat/input/v2/input.go
+++ b/filebeat/input/v2/input.go
@@ -22,6 +22,7 @@ import (
"time"
"github.com/elastic/beats/v7/libbeat/beat"
+ "github.com/elastic/beats/v7/libbeat/management/status"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
@@ -83,6 +84,17 @@ type Context struct {
// Cancelation is used by Beats to signal the input to shutdown.
Cancelation Canceler
+
+ // StatusReporter provides a method to update the status of the underlying unit
+ // that maps to the config. Note: Under standalone execution of Filebeat this is
+ // expected to be nil.
+ StatusReporter status.StatusReporter
+}
+
+func (c Context) UpdateStatus(status status.Status, msg string) {
+ if c.StatusReporter != nil {
+ c.StatusReporter.UpdateStatus(status, msg)
+ }
}
// TestContext provides the Input Test function with common environmental
diff --git a/libbeat/cfgfile/list.go b/libbeat/cfgfile/list.go
index 1f16e23ab4ff..d557ffa25c24 100644
--- a/libbeat/cfgfile/list.go
+++ b/libbeat/cfgfile/list.go
@@ -29,6 +29,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/diagnostics"
"github.com/elastic/beats/v7/libbeat/common/reload"
+ "github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
@@ -153,6 +154,12 @@ func (r *RunnerList) Reload(configs []*reload.ConfigWithMeta) error {
r.logger.Debugf("Starting runner: %s", runner)
r.runners[hash] = runner
+ if config.StatusReporter != nil {
+ if runnerWithStatus, ok := runner.(status.WithStatusReporter); ok {
+ runnerWithStatus.SetStatusReporter(config.StatusReporter)
+ }
+ }
+
runner.Start()
moduleStarts.Add(1)
if config.DiagCallback != nil {
diff --git a/libbeat/common/reload/reload.go b/libbeat/common/reload/reload.go
index 279b7fd26b00..7021796d28c2 100644
--- a/libbeat/common/reload/reload.go
+++ b/libbeat/common/reload/reload.go
@@ -21,6 +21,7 @@ import (
"fmt"
"sync"
+ "github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
)
@@ -47,6 +48,11 @@ type ConfigWithMeta struct {
// InputUnitID is the unit's ID that generated this ConfigWithMeta
InputUnitID string
+
+ // StatusReporter provides a method to update the status of the underlying unit
+ // that maps to the config. Note: Under standalone execution of a Beat this is
+ // expected to be nil.
+ StatusReporter status.StatusReporter
}
// ReloadableList provides a method to reload the configuration of a list of entities
@@ -160,7 +166,7 @@ func (r *Registry) GetReloadableOutput() Reloadable {
func (r *Registry) GetRegisteredNames() []string {
r.RLock()
defer r.RUnlock()
- var names []string
+ names := make([]string, 0, len(r.confs)+len(r.confsLists))
for name := range r.confs {
names = append(names, name)
diff --git a/libbeat/management/management.go b/libbeat/management/management.go
index 177642b33988..6770e87538d7 100644
--- a/libbeat/management/management.go
+++ b/libbeat/management/management.go
@@ -21,49 +21,19 @@ import (
"sync"
"github.com/elastic/beats/v7/libbeat/common/reload"
+ "github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)
-// Status describes the current status of the beat.
-type Status int
-
-//go:generate stringer -type=Status
-const (
- // Unknown is initial status when none has been reported.
- Unknown Status = iota
- // Starting is status describing application is starting.
- Starting
- // Configuring is status describing application is configuring.
- Configuring
- // Running is status describing application is running.
- Running
- // Degraded is status describing application is degraded.
- Degraded
- // Failed is status describing application is failed. This status should
- // only be used in the case the beat should stop running as the failure
- // cannot be recovered.
- Failed
- // Stopping is status describing application is stopping.
- Stopping
- // Stopped is status describing application is stopped.
- Stopped
-)
-
// DebugK used as key for all things central management
var DebugK = "centralmgmt"
-// StatusReporter provides a method to update current status of the beat.
-type StatusReporter interface {
- // UpdateStatus called when the status of the beat has changed.
- UpdateStatus(status Status, msg string)
-}
-
// Manager interacts with the beat to provide status updates and to receive
// configurations.
type Manager interface {
- StatusReporter
+ status.StatusReporter
// Enabled returns true if manager is enabled.
Enabled() bool
@@ -133,7 +103,7 @@ func NewManager(cfg *config.C, registry *reload.Registry) (Manager, error) {
}
return &fallbackManager{
logger: logp.NewLogger("mgmt"),
- status: Unknown,
+ status: status.Unknown,
msg: "",
}, nil
}
@@ -152,13 +122,13 @@ func SetManagerFactory(factory ManagerFactory) {
type fallbackManager struct {
logger *logp.Logger
lock sync.Mutex
- status Status
+ status status.Status
msg string
stopFunc func()
stopOnce sync.Once
}
-func (n *fallbackManager) UpdateStatus(status Status, msg string) {
+func (n *fallbackManager) UpdateStatus(status status.Status, msg string) {
n.lock.Lock()
defer n.lock.Unlock()
if n.status != status || n.msg != msg {
diff --git a/libbeat/management/status/status.go b/libbeat/management/status/status.go
new file mode 100644
index 000000000000..d4a9e5c3f8dc
--- /dev/null
+++ b/libbeat/management/status/status.go
@@ -0,0 +1,55 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package status
+
+// Status describes the current status of the beat.
+type Status int
+
+//go:generate go run golang.org/x/tools/cmd/stringer -type=Status
+const (
+ // Unknown is initial status when none has been reported.
+ Unknown Status = iota
+ // Starting is status describing unit is starting.
+ Starting
+ // Configuring is status describing unit is configuring.
+ Configuring
+ // Running is status describing unit is running.
+ Running
+ // Degraded is status describing unit is degraded.
+ Degraded
+ // Failed is status describing unit is failed. This status should
+ // only be used in the case the beat should stop running as the failure
+ // cannot be recovered.
+ Failed
+ // Stopping is status describing unit is stopping.
+ Stopping
+ // Stopped is status describing unit is stopped.
+ Stopped
+)
+
+// StatusReporter provides a method to update current status of a unit.
+type StatusReporter interface {
+ // UpdateStatus updates the status of the unit.
+ UpdateStatus(status Status, msg string)
+}
+
+// WithStatusReporter provides a method to set a status reporter
+type WithStatusReporter interface {
+ // SetStatusReporter sets the status reporter
+ SetStatusReporter(reporter StatusReporter)
+}
diff --git a/libbeat/management/status_string.go b/libbeat/management/status/status_string.go
similarity index 92%
rename from libbeat/management/status_string.go
rename to libbeat/management/status/status_string.go
index d26703bb4f99..a26ebcc43223 100644
--- a/libbeat/management/status_string.go
+++ b/libbeat/management/status/status_string.go
@@ -17,7 +17,7 @@
// Code generated by "stringer -type=Status"; DO NOT EDIT.
-package management
+package status
import "strconv"
@@ -32,11 +32,12 @@ func _() {
_ = x[Degraded-4]
_ = x[Failed-5]
_ = x[Stopping-6]
+ _ = x[Stopped-7]
}
-const _Status_name = "UnknownStartingConfiguringRunningDegradedFailedStopping"
+const _Status_name = "UnknownStartingConfiguringRunningDegradedFailedStoppingStopped"
-var _Status_index = [...]uint8{0, 7, 15, 26, 33, 41, 47, 55}
+var _Status_index = [...]uint8{0, 7, 15, 26, 33, 41, 47, 55, 62}
func (i Status) String() string {
if i < 0 || i >= Status(len(_Status_index)-1) {
diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go
index f0795377b0ec..7e8a7584c280 100644
--- a/x-pack/filebeat/input/cel/input.go
+++ b/x-pack/filebeat/input/cel/input.go
@@ -23,7 +23,7 @@ import (
"strings"
"time"
- retryablehttp "github.com/hashicorp/go-retryablehttp"
+ "github.com/hashicorp/go-retryablehttp"
"github.com/icholy/digest"
"github.com/rcrowley/go-metrics"
"go.elastic.co/ecszap"
@@ -39,6 +39,7 @@ import (
inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/feature"
+ "github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/beats/v7/libbeat/version"
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog"
@@ -102,17 +103,26 @@ func (input) Test(src inputcursor.Source, _ v2.TestContext) error {
// context cancellation or type invalidity errors, any other error will be retried.
func (input) Run(env v2.Context, src inputcursor.Source, crsr inputcursor.Cursor, pub inputcursor.Publisher) error {
var cursor map[string]interface{}
+ env.UpdateStatus(status.Starting, "")
if !crsr.IsNew() { // Allow the user to bootstrap the program if needed.
err := crsr.Unpack(&cursor)
if err != nil {
+ env.UpdateStatus(status.Failed, "failed to unpack cursor: "+err.Error())
return err
}
}
- return input{}.run(env, src.(*source), cursor, pub)
+
+ err := input{}.run(env, src.(*source), cursor, pub)
+ if err != nil {
+ env.UpdateStatus(status.Failed, "failed to run: "+err.Error())
+ return err
+ }
+ env.UpdateStatus(status.Stopped, "")
+ return nil
}
// sanitizeFileName returns name with ":" and "/" replaced with "_", removing repeated instances.
-// The request.tracer.filename may have ":" when a httpjson input has cursor config and
+// The request.tracer.filename may have ":" when a cel input has cursor config and
// the macOS Finder will treat this as path-separator and causes to show up strange filepaths.
func sanitizeFileName(name string) string {
name = strings.ReplaceAll(name, ":", string(filepath.Separator))
@@ -171,6 +181,7 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
goodURL := cfg.Resource.URL.String()
state["url"] = goodURL
metrics.resource.Set(goodURL)
+ env.UpdateStatus(status.Running, "")
// On entry, state is expected to be in the shape:
//
// {
@@ -210,6 +221,8 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
budget = *cfg.MaxExecutions
waitUntil time.Time
)
+ // Keep track of whether CEL is degraded for this periodic run.
+ var isDegraded bool
for {
if wait := time.Until(waitUntil); wait > 0 {
// We have a special-case wait for when we have a zero limit.
@@ -243,7 +256,9 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
return err
}
log.Errorw("failed evaluation", "error", err)
+ env.UpdateStatus(status.Degraded, "failed evaluation: "+err.Error())
}
+ isDegraded = err != nil
metrics.celProcessingTime.Update(time.Since(start).Nanoseconds())
if trace != nil {
log.Debugw("final transaction", "transaction.id", trace.TxID())
@@ -350,6 +365,8 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
e, ok := state["events"]
if !ok {
log.Error("unexpected missing events array from evaluation")
+ env.UpdateStatus(status.Degraded, "unexpected missing events array from evaluation")
+ isDegraded = true
}
var events []interface{}
switch e := e.(type) {
@@ -363,6 +380,8 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
return nil
}
log.Errorw("single event object returned by evaluation", "event", e)
+ env.UpdateStatus(status.Degraded, "single event object returned by evaluation")
+ isDegraded = true
events = []interface{}{e}
// Make sure the cursor is not updated.
delete(state, "cursor")
@@ -388,6 +407,8 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
if ok {
if len(cursors) != len(events) {
log.Errorw("unexpected cursor list length", "cursors", len(cursors), "events", len(events))
+ env.UpdateStatus(status.Degraded, "unexpected cursor list length")
+ isDegraded = true
// But try to continue.
if len(cursors) < len(events) {
cursors = nil
@@ -438,6 +459,8 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
if err != nil {
hadPublicationError = true
log.Errorw("error publishing event", "error", err)
+ env.UpdateStatus(status.Degraded, "error publishing event: "+err.Error())
+ isDegraded = true
cursors = nil // We are lost, so retry with this event's cursor,
continue // but continue with the events that we have without
// advancing the cursor. This allows us to potentially publish the
@@ -454,6 +477,11 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
return err
}
}
+
+ if !isDegraded {
+ env.UpdateStatus(status.Running, "")
+ }
+
metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds())
// Advance the cursor to the final state if there was no error during
@@ -473,6 +501,7 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
budget--
if budget <= 0 {
log.Warnw("exceeding maximum number of CEL executions", "limit", *cfg.MaxExecutions)
+ env.UpdateStatus(status.Degraded, "exceeding maximum number of CEL executions")
return nil
}
}
diff --git a/x-pack/filebeat/input/cel/integration_test.go b/x-pack/filebeat/input/cel/integration_test.go
new file mode 100644
index 000000000000..fa9e5bf017c6
--- /dev/null
+++ b/x-pack/filebeat/input/cel/integration_test.go
@@ -0,0 +1,521 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+//go:build integration
+
+package cel_test
+
+import (
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "reflect"
+ "testing"
+ "time"
+
+ "google.golang.org/protobuf/types/known/structpb"
+
+ "github.com/elastic/beats/v7/libbeat/tests/integration"
+ filebeat "github.com/elastic/beats/v7/x-pack/filebeat/cmd"
+ "github.com/elastic/elastic-agent-client/v7/pkg/client/mock"
+ "github.com/elastic/elastic-agent-client/v7/pkg/proto"
+)
+
+// TestCheckinV2 is an integration test that checks that CEL input reports in
+// the expected statuses. Specifically it configures a filebeat instance to
+// run a CEL input with two streams as well as it monitors the reported state
+// by spawning an elastic-agent V2 mock server.
+// This test also spawns two http servers for making the CEL input streams
+// to report different states that are checked to match the expected states.
+func TestCheckinV2(t *testing.T) {
+ // make sure there is an ES instance running
+ integration.EnsureESIsRunning(t)
+ esConnectionDetails := integration.GetESURL(t, "http")
+ outputHosts := []interface{}{fmt.Sprintf("%s://%s:%s", esConnectionDetails.Scheme, esConnectionDetails.Hostname(), esConnectionDetails.Port())}
+ outputUsername := esConnectionDetails.User.Username()
+ outputPassword, _ := esConnectionDetails.User.Password()
+ outputProtocol := esConnectionDetails.Scheme
+
+ invalidResponse := []byte("invalid json")
+ validResponse := []byte("{\"ip\":\"0.0.0.0\"}")
+
+ // http server for the first CEL input stream
+ serverOneResponse := validResponse
+ svrOne := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ _, _ = w.Write(serverOneResponse)
+ }))
+ defer svrOne.Close()
+
+ // http server for the second CEL input stream
+ serverTwoResponse := validResponse
+ svrTwo := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ _, _ = w.Write(serverTwoResponse)
+ }))
+ defer svrTwo.Close()
+
+ // allStreams is an elastic-agent configuration with an ES output and one CEL
+ // input with two streams.
+ allStreams := []*proto.UnitExpected{
+ {
+ Id: "output-unit",
+ Type: proto.UnitType_OUTPUT,
+ ConfigStateIdx: 0,
+ State: proto.State_HEALTHY,
+ LogLevel: proto.UnitLogLevel_INFO,
+ Config: &proto.UnitExpectedConfig{
+ Id: "default",
+ Type: "elasticsearch",
+ Name: "elasticsearch",
+ Source: integration.RequireNewStruct(t, map[string]interface{}{
+ "type": "elasticsearch",
+ "hosts": outputHosts,
+ "username": outputUsername,
+ "password": outputPassword,
+ "protocol": outputProtocol,
+ "enabled": true,
+ "ssl.verification_mode": "none",
+ }),
+ },
+ },
+ {
+ Id: "input-unit-1",
+ Type: proto.UnitType_INPUT,
+ ConfigStateIdx: 0,
+ State: proto.State_HEALTHY,
+ LogLevel: proto.UnitLogLevel_DEBUG,
+ Config: &proto.UnitExpectedConfig{
+ Id: "cel-cel-1e8b33de-d54a-45cd-90da-23ed71c482e5",
+ Type: "cel",
+ Name: "cel-1",
+ Source: integration.RequireNewStruct(t, map[string]interface{}{
+ "use_output": "default",
+ "revision": 0,
+ }),
+ DataStream: &proto.DataStream{
+ Namespace: "default",
+ },
+ Meta: &proto.Meta{
+ Package: &proto.Package{
+ Name: "cel",
+ Version: "1.9.0",
+ },
+ },
+ Streams: []*proto.Stream{
+ {
+ Id: "cel-cel.cel-1e8b33de-d54a-45cd-90da-23ed71c482e2",
+ DataStream: &proto.DataStream{
+ Dataset: "cel.cel",
+ },
+ Source: integration.RequireNewStruct(t, map[string]interface{}{
+ "interval": "10s",
+ "program": `bytes(get(state.url).Body).as(body,{"events":[body.decode_json()]})`,
+ "redact.delete": false,
+ "regexp": nil,
+ "resource.url": svrOne.URL,
+ "publisher_pipeline.disable_host": true,
+ }),
+ },
+ {
+ Id: "cel-cel.cel-1e8b33de-d54a-45cd-90da-ffffffc482e2",
+ DataStream: &proto.DataStream{
+ Dataset: "cel.cel",
+ },
+ Source: integration.RequireNewStruct(t, map[string]interface{}{
+ "interval": "10s",
+ "program": `bytes(get(state.url).Body).as(body,{"events":[body.decode_json()]})`,
+ "redact.delete": false,
+ "regexp": nil,
+ "resource.url": svrTwo.URL,
+ "publisher_pipeline.disable_host": true,
+ }),
+ },
+ },
+ },
+ },
+ }
+
+ // oneStream is an elastic-agent configuration with an ES output and one CEL
+ // input with one stream. Effectively this is the same as allStreams with
+ // stream cel-cel.cel-1e8b33de-d54a-45cd-90da-ffffffc482e2 removed.
+ oneStream := []*proto.UnitExpected{
+ {
+ Id: "output-unit",
+ Type: proto.UnitType_OUTPUT,
+ ConfigStateIdx: 0,
+ State: proto.State_HEALTHY,
+ LogLevel: proto.UnitLogLevel_INFO,
+ Config: &proto.UnitExpectedConfig{
+ Id: "default",
+ Type: "elasticsearch",
+ Name: "elasticsearch",
+ Source: integration.RequireNewStruct(t, map[string]interface{}{
+ "type": "elasticsearch",
+ "hosts": outputHosts,
+ "username": outputUsername,
+ "password": outputPassword,
+ "protocol": outputProtocol,
+ "enabled": true,
+ "ssl.verification_mode": "none",
+ }),
+ },
+ },
+ {
+ Id: "input-unit-1",
+ Type: proto.UnitType_INPUT,
+ ConfigStateIdx: 0,
+ State: proto.State_HEALTHY,
+ LogLevel: proto.UnitLogLevel_DEBUG,
+ Config: &proto.UnitExpectedConfig{
+ Id: "cel-cel-1e8b33de-d54a-45cd-90da-23ed71c482e5",
+ Type: "cel",
+ Name: "cel-1",
+ Source: integration.RequireNewStruct(t, map[string]interface{}{
+ "use_output": "default",
+ "revision": 0,
+ }),
+ DataStream: &proto.DataStream{
+ Namespace: "default",
+ },
+ Meta: &proto.Meta{
+ Package: &proto.Package{
+ Name: "cel",
+ Version: "1.9.0",
+ },
+ },
+ Streams: []*proto.Stream{
+ {
+ Id: "cel-cel.cel-1e8b33de-d54a-45cd-90da-23ed71c482e2",
+ DataStream: &proto.DataStream{
+ Dataset: "cel.cel",
+ },
+ Source: integration.RequireNewStruct(t, map[string]interface{}{
+ "interval": "10s",
+ "program": `bytes(get(state.url).Body).as(body,{"events":[body.decode_json()]})`,
+ "redact.delete": false,
+ "regexp": nil,
+ "resource.url": svrOne.URL,
+ "publisher_pipeline.disable_host": true,
+ }),
+ },
+ },
+ },
+ },
+ }
+
+ // noStream is an elastic-agent configuration with just an ES output.
+ noStream := []*proto.UnitExpected{
+ {
+ Id: "output-unit",
+ Type: proto.UnitType_OUTPUT,
+ ConfigStateIdx: 0,
+ State: proto.State_HEALTHY,
+ LogLevel: proto.UnitLogLevel_INFO,
+ Config: &proto.UnitExpectedConfig{
+ Id: "default",
+ Type: "elasticsearch",
+ Name: "elasticsearch",
+ Source: integration.RequireNewStruct(t, map[string]interface{}{
+ "type": "elasticsearch",
+ "hosts": outputHosts,
+ "username": outputUsername,
+ "password": outputPassword,
+ "protocol": outputProtocol,
+ "enabled": true,
+ "ssl.verification_mode": "none",
+ }),
+ },
+ },
+ }
+
+ // elastic-agent management V2 mock server
+ observedStates := make(chan *proto.CheckinObserved)
+ expectedUnits := make(chan []*proto.UnitExpected)
+ done := make(chan struct{})
+ server := &mock.StubServerV2{
+ CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
+ select {
+ case observedStates <- observed:
+ return &proto.CheckinExpected{
+ Units: <-expectedUnits,
+ }
+ case <-done:
+ return nil
+ }
+ },
+ ActionImpl: func(response *proto.ActionResponse) error {
+ return nil
+ },
+ }
+ if err := server.Start(); err != nil {
+ t.Fatalf("failed to start StubServerV2 server: %v", err)
+ }
+ defer server.Stop()
+
+ // It's necessary to change os.Args so filebeat.Filebeat() can read the
+ // appropriate args at beat.Execute().
+ initialOSArgs := os.Args
+ os.Args = []string{
+ "filebeat",
+ "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port),
+ "-E", "management.enabled=true",
+ "-E", "management.restart_on_output_change=true",
+ }
+ defer func() {
+ os.Args = initialOSArgs
+ }()
+
+ beat := filebeat.Filebeat()
+ beatRunErr := make(chan error)
+ go func() {
+ defer close(beatRunErr)
+ beatRunErr <- beat.Execute()
+ }()
+
+ // slice of funcs that check if the observed states match the expected ones.
+ // They return true if they match and false if they don't as well as a slice
+ // of units expected for the server to respond with.
+ checks := []func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected){
+ func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) {
+ // Wait for all healthy.
+ unitState, payload := extractStateAndPayload(observed, "input-unit-1")
+ if unitState != proto.State_HEALTHY {
+ return false, allStreams
+ }
+
+ if !reflect.DeepEqual(map[string]interface{}{
+ "streams": map[string]interface{}{
+ "cel-cel.cel-1e8b33de-d54a-45cd-90da-23ed71c482e2": map[string]interface{}{
+ "status": "HEALTHY",
+ "error": "",
+ },
+ "cel-cel.cel-1e8b33de-d54a-45cd-90da-ffffffc482e2": map[string]interface{}{
+ "status": "HEALTHY",
+ "error": "",
+ },
+ },
+ }, payload) {
+ return false, allStreams
+ }
+
+ serverOneResponse = invalidResponse
+
+ return true, allStreams
+ },
+ func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) {
+ // Wait for one degraded.
+ unitState, payload := extractStateAndPayload(observed, "input-unit-1")
+ if unitState != proto.State_DEGRADED {
+ return false, allStreams
+ }
+
+ if !reflect.DeepEqual(map[string]interface{}{
+ "streams": map[string]interface{}{
+ "cel-cel.cel-1e8b33de-d54a-45cd-90da-23ed71c482e2": map[string]interface{}{
+ "status": "DEGRADED",
+ "error": "failed evaluation: failed eval: ERROR: :1:30: failed to unmarshal JSON message: invalid character 'i' looking for beginning of value\n | bytes(get(state.url).Body).as(body,{\"events\":[body.decode_json()]})\n | .............................^",
+ },
+ "cel-cel.cel-1e8b33de-d54a-45cd-90da-ffffffc482e2": map[string]interface{}{
+ "status": "HEALTHY",
+ "error": "",
+ },
+ },
+ }, payload) {
+ return false, allStreams
+ }
+
+ serverTwoResponse = invalidResponse
+ return true, allStreams
+ },
+ func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) {
+ // Wait for all degraded.
+ unitState, payload := extractStateAndPayload(observed, "input-unit-1")
+ if unitState != proto.State_DEGRADED {
+ return false, allStreams
+ }
+
+ if !reflect.DeepEqual(map[string]interface{}{
+ "streams": map[string]interface{}{
+ "cel-cel.cel-1e8b33de-d54a-45cd-90da-23ed71c482e2": map[string]interface{}{
+ "status": "DEGRADED",
+ "error": "failed evaluation: failed eval: ERROR: :1:30: failed to unmarshal JSON message: invalid character 'i' looking for beginning of value\n | bytes(get(state.url).Body).as(body,{\"events\":[body.decode_json()]})\n | .............................^",
+ },
+ "cel-cel.cel-1e8b33de-d54a-45cd-90da-ffffffc482e2": map[string]interface{}{
+ "status": "DEGRADED",
+ "error": "failed evaluation: failed eval: ERROR: :1:30: failed to unmarshal JSON message: invalid character 'i' looking for beginning of value\n | bytes(get(state.url).Body).as(body,{\"events\":[body.decode_json()]})\n | .............................^",
+ },
+ },
+ }, payload) {
+ return false, allStreams
+ }
+
+ serverOneResponse = validResponse
+ serverTwoResponse = validResponse
+ return true, allStreams
+ },
+ func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) {
+ // Wait for all healthy.
+ unitState, payload := extractStateAndPayload(observed, "input-unit-1")
+ if unitState != proto.State_HEALTHY {
+ return false, allStreams
+ }
+
+ if !reflect.DeepEqual(map[string]interface{}{
+ "streams": map[string]interface{}{
+ "cel-cel.cel-1e8b33de-d54a-45cd-90da-23ed71c482e2": map[string]interface{}{
+ "status": "HEALTHY",
+ "error": "",
+ },
+ "cel-cel.cel-1e8b33de-d54a-45cd-90da-ffffffc482e2": map[string]interface{}{
+ "status": "HEALTHY",
+ "error": "",
+ },
+ },
+ }, payload) {
+ return false, allStreams
+ }
+
+ serverTwoResponse = invalidResponse
+ return true, allStreams
+ },
+ func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) {
+ // Wait for all healthy.
+ unitState, payload := extractStateAndPayload(observed, "input-unit-1")
+ if unitState != proto.State_DEGRADED {
+ return false, allStreams
+ }
+
+ if !reflect.DeepEqual(map[string]interface{}{
+ "streams": map[string]interface{}{
+ "cel-cel.cel-1e8b33de-d54a-45cd-90da-23ed71c482e2": map[string]interface{}{
+ "status": "HEALTHY",
+ "error": "",
+ },
+ "cel-cel.cel-1e8b33de-d54a-45cd-90da-ffffffc482e2": map[string]interface{}{
+ "status": "DEGRADED",
+ "error": "failed evaluation: failed eval: ERROR: :1:30: failed to unmarshal JSON message: invalid character 'i' looking for beginning of value\n | bytes(get(state.url).Body).as(body,{\"events\":[body.decode_json()]})\n | .............................^",
+ },
+ },
+ }, payload) {
+ return false, allStreams
+ }
+
+ setInputUnitsConfigStateIdx(oneStream, 1)
+ return true, oneStream
+ },
+ func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) {
+ unitState, payload := extractStateAndPayload(observed, "input-unit-1")
+ if unitState != proto.State_HEALTHY {
+ return false, oneStream
+ }
+
+ if !reflect.DeepEqual(map[string]interface{}{
+ "streams": map[string]interface{}{
+ "cel-cel.cel-1e8b33de-d54a-45cd-90da-23ed71c482e2": map[string]interface{}{
+ "status": "HEALTHY",
+ "error": "",
+ },
+ },
+ }, payload) {
+ return false, oneStream
+ }
+ setInputUnitsConfigStateIdx(noStream, 2)
+ return true, noStream
+ },
+ func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) {
+ _, payload := extractStateAndPayload(observed, "input-unit-1")
+ if payload != nil {
+ return false, noStream
+ }
+
+ serverOneResponse = validResponse
+ serverTwoResponse = validResponse
+
+ setInputUnitsConfigStateIdx(allStreams, 3)
+ return true, allStreams
+ },
+ func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) {
+ // Wait for all healthy.
+ unitState, payload := extractStateAndPayload(observed, "input-unit-1")
+ if unitState != proto.State_HEALTHY {
+ return false, allStreams
+ }
+
+ if !reflect.DeepEqual(map[string]interface{}{
+ "streams": map[string]interface{}{
+ "cel-cel.cel-1e8b33de-d54a-45cd-90da-23ed71c482e2": map[string]interface{}{
+ "status": "HEALTHY",
+ "error": "",
+ },
+ "cel-cel.cel-1e8b33de-d54a-45cd-90da-ffffffc482e2": map[string]interface{}{
+ "status": "HEALTHY",
+ "error": "",
+ },
+ },
+ }, payload) {
+ return false, allStreams
+ }
+
+ setInputUnitsConfigStateIdx(noStream, 4)
+ return true, noStream
+ },
+ func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) {
+ _, payload := extractStateAndPayload(observed, "input-unit-1")
+ if payload != nil {
+ return false, noStream
+ }
+
+ return true, []*proto.UnitExpected{}
+ },
+ func(t *testing.T, observed *proto.CheckinObserved) (bool, []*proto.UnitExpected) {
+ return len(observed.Units) == 0, []*proto.UnitExpected{}
+ },
+ }
+
+ const wait = 3 * time.Minute
+ timer := time.NewTimer(wait)
+ defer timer.Stop()
+ for len(checks) > 0 {
+ select {
+ case observed := <-observedStates:
+ matched, expected := checks[0](t, observed)
+ expectedUnits <- expected
+ if !matched {
+ continue
+ }
+ timer.Reset(wait)
+ checks = checks[1:]
+ case err := <-beatRunErr:
+ if err != nil {
+ t.Fatalf("beat run err: %v", err)
+ }
+ case <-timer.C:
+ t.Fatal("timeout waiting for checkin")
+ }
+ }
+}
+
+func extractStateAndPayload(observed *proto.CheckinObserved, inputID string) (proto.State, map[string]interface{}) {
+ for _, unit := range observed.GetUnits() {
+ if unit.Id == inputID {
+ return unit.GetState(), unit.Payload.AsMap()
+ }
+ }
+
+ return -1, nil
+}
+
+func setInputUnitsConfigStateIdx(units []*proto.UnitExpected, idx uint64) {
+ for _, unit := range units {
+ if unit.Type != proto.UnitType_INPUT {
+ continue
+ }
+
+ if unit.Config == nil {
+ return
+ }
+ unit.ConfigStateIdx = idx
+ unit.Config.Source.Fields["revision"] = structpb.NewNumberValue(float64(idx))
+ }
+}
diff --git a/x-pack/libbeat/management/managerV2.go b/x-pack/libbeat/management/managerV2.go
index 6152f4f5306f..ea59249012a8 100644
--- a/x-pack/libbeat/management/managerV2.go
+++ b/x-pack/libbeat/management/managerV2.go
@@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/features"
lbmanagement "github.com/elastic/beats/v7/libbeat/management"
+ "github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/libbeat/version"
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
@@ -37,7 +38,7 @@ import (
// since there's a type disagreement with the `client.DiagnosticHook` argument, and due to licensing issues we can't import the agent client types into the reloader
type diagnosticHandler struct {
log *logp.Logger
- client *client.Unit
+ client *agentUnit
}
func (handler diagnosticHandler) Register(name string, description string, filename string, contentType string, callback func() []byte) {
@@ -70,13 +71,13 @@ type BeatV2Manager struct {
// track individual units given to us by the V2 API
mx sync.Mutex
- units map[unitKey]*client.Unit
+ units map[unitKey]*agentUnit
actions []client.Action
forceReload bool
// status is reported as a whole for every unit sent to this component
// hopefully this can be improved in the future to be separated per unit
- status lbmanagement.Status
+ status status.Status
message string
payload map[string]interface{}
@@ -165,7 +166,8 @@ func NewV2AgentManager(config *conf.C, registry *reload.Registry) (lbmanagement.
Meta: map[string]string{
"commit": version.Commit(),
"build_time": version.BuildTime().String(),
- }}
+ },
+ }
var agentClient client.V2
var err error
if c.InsecureGRPCURLForTesting != "" && c.Enabled {
@@ -202,8 +204,8 @@ func NewV2AgentManagerWithClient(config *Config, registry *reload.Registry, agen
config: config,
logger: log.Named("V2-manager"),
registry: registry,
- units: make(map[unitKey]*client.Unit),
- status: lbmanagement.Running,
+ units: make(map[unitKey]*agentUnit),
+ status: status.Running,
message: "Healthy",
stopChan: make(chan struct{}, 1),
changeDebounce: time.Second,
@@ -241,7 +243,7 @@ func (cm *BeatV2Manager) RegisterDiagnosticHook(name string, description string,
}
// UpdateStatus updates the manager with the current status for the beat.
-func (cm *BeatV2Manager) UpdateStatus(status lbmanagement.Status, msg string) {
+func (cm *BeatV2Manager) UpdateStatus(status status.Status, msg string) {
cm.mx.Lock()
defer cm.mx.Unlock()
@@ -312,8 +314,8 @@ func (cm *BeatV2Manager) RegisterAction(action client.Action) {
for _, unit := range cm.units {
// actions are only registered on input units (not a requirement by Agent but
// don't see a need in beats to support actions on an output at the moment)
- if unit.Type() == client.UnitTypeInput {
- unit.RegisterAction(action)
+ if clientUnit := unit; clientUnit != nil && clientUnit.Type() == client.UnitTypeInput {
+ clientUnit.RegisterAction(action)
}
}
}
@@ -341,8 +343,8 @@ func (cm *BeatV2Manager) UnregisterAction(action client.Action) {
for _, unit := range cm.units {
// actions are only registered on input units (not a requirement by Agent but
// don't see a need in beats to support actions on an output at the moment)
- if unit.Type() == client.UnitTypeInput {
- unit.UnregisterAction(action)
+ if clientUnit := unit; clientUnit != nil && clientUnit.Type() == client.UnitTypeInput {
+ clientUnit.UnregisterAction(action)
}
}
}
@@ -364,7 +366,6 @@ func (cm *BeatV2Manager) SetPayload(payload map[string]interface{}) {
// Errors while starting/reloading inputs are already reported by unit, but
// the shutdown process is still not being handled by unit.
func (cm *BeatV2Manager) updateStatuses() {
- status := getUnitState(cm.status)
message := cm.message
payload := cm.payload
@@ -375,7 +376,7 @@ func (cm *BeatV2Manager) updateStatuses() {
// `reload` method and will be marked stopped in that code path)
continue
}
- err := unit.UpdateState(status, message, payload)
+ err := unit.UpdateState(cm.status, message, payload)
if err != nil {
cm.logger.Errorf("Failed to update unit %s status: %s", unit.ID(), err)
}
@@ -386,21 +387,29 @@ func (cm *BeatV2Manager) updateStatuses() {
// Unit manager
// ================================
-func (cm *BeatV2Manager) addUnit(unit *client.Unit) {
+func (cm *BeatV2Manager) upsertUnit(unit *client.Unit) {
cm.mx.Lock()
defer cm.mx.Unlock()
- cm.units[unitKey{unit.Type(), unit.ID()}] = unit
+
+ aUnit, ok := cm.units[unitKey{unit.Type(), unit.ID()}]
+ if ok {
+ aUnit.update(unit)
+ } else {
+ unitLogger := cm.logger.Named(fmt.Sprintf("state-unit-%s", unit.ID()))
+ aUnit = newAgentUnit(unit, unitLogger)
+ cm.units[unitKey{unit.Type(), unit.ID()}] = aUnit
+ }
// update specific unit to starting
- _ = unit.UpdateState(client.UnitStateStarting, "Starting", nil)
+ _ = aUnit.UpdateState(status.Starting, "Starting", nil)
// register the already registered actions (only on input units)
for _, action := range cm.actions {
- unit.RegisterAction(action)
+ aUnit.RegisterAction(action)
}
}
-func (cm *BeatV2Manager) modifyUnit(unit *client.Unit) {
+func (cm *BeatV2Manager) updateUnit(unit *client.Unit) {
// `unit` is already in `cm.units` no need to add it to the map again
// but the lock still needs to be held so reload can be triggered
cm.mx.Lock()
@@ -411,27 +420,32 @@ func (cm *BeatV2Manager) modifyUnit(unit *client.Unit) {
// is reflected here. As this deals with modifications, they're already present.
// Only the state needs to be updated.
+ aUnit, ok := cm.units[unitKey{unit.Type(), unit.ID()}]
+ if !ok {
+ cm.logger.Infof("BeatV2Manager.updateUnit Unit %s not found", unit.ID())
+ return
+ }
+
+ aUnit.update(unit)
+
expected := unit.Expected()
if expected.State == client.UnitStateStopped {
// expected to be stopped; needs to stop this unit
- _ = unit.UpdateState(client.UnitStateStopping, "Stopping", nil)
+ _ = aUnit.UpdateState(status.Stopping, "Stopping", nil)
} else {
// update specific unit to configuring
- _ = unit.UpdateState(client.UnitStateConfiguring, "Configuring", nil)
+ _ = aUnit.UpdateState(status.Configuring, "Configuring", nil)
}
}
-func (cm *BeatV2Manager) deleteUnit(unit *client.Unit) {
- // a unit will only be deleted once it has reported stopped so nothing
- // more needs to be done other than cleaning up the reference to the unit
+func (cm *BeatV2Manager) softDeleteUnit(unit *client.Unit) {
cm.mx.Lock()
- delete(cm.units, unitKey{unit.Type(), unit.ID()})
- empty := len(cm.units) == 0
- cm.mx.Unlock()
+ defer cm.mx.Unlock()
- // stop the entire beat when all units removed
- if empty && cm.stopOnEmptyUnits {
- cm.stopBeat()
+ key := unitKey{unit.Type(), unit.ID()}
+
+ if aUnit, ok := cm.units[key]; ok {
+ aUnit.markAsDeleted()
}
}
@@ -482,7 +496,7 @@ func (cm *BeatV2Manager) unitListen() {
cm.logger.Debug("Received sighup, stopping")
}
cm.isRunning = false
- cm.UpdateStatus(lbmanagement.Stopping, "Stopping")
+ cm.UpdateStatus(status.Stopping, "Stopping")
return
case change := <-cm.client.UnitChanges():
cm.logger.Infof(
@@ -494,26 +508,39 @@ func (cm *BeatV2Manager) unitListen() {
// Within the context of how we send config to beats, I'm not sure if there is a difference between
// A unit add and a unit change, since either way we can't do much more than call the reloader
case client.UnitChangedAdded:
- cm.addUnit(change.Unit)
+ cm.upsertUnit(change.Unit)
// reset can be called here because `<-t.C` is handled in the same select
t.Reset(cm.changeDebounce)
case client.UnitChangedModified:
- cm.modifyUnit(change.Unit)
+ cm.updateUnit(change.Unit)
// reset can be called here because `<-t.C` is handled in the same select
t.Reset(cm.changeDebounce)
case client.UnitChangedRemoved:
- cm.deleteUnit(change.Unit)
+ // necessary to soft-delete here and follow up with the actual deletion of units
+ // in `<-t.C` to avoid deleting a unit that will be re-created before `<-t.C`
+ // expires where the respective runners will not reload; actual deleting here
+ // can cause a runner to lose ref to a unit
+ cm.softDeleteUnit(change.Unit)
}
case <-t.C:
// a copy of the units is used for reload to prevent the holding of the `cm.mx`.
// it could be possible that sending the configuration to reload could cause the `UpdateStatus`
// to be called on the manager causing it to try and grab the `cm.mx` lock, causing a deadlock.
cm.mx.Lock()
- units := make(map[unitKey]*client.Unit, len(cm.units))
+ units := make(map[unitKey]*agentUnit, len(cm.units))
for k, u := range cm.units {
+ if u.softDeleted {
+ delete(cm.units, k)
+ continue
+ }
units[k] = u
}
cm.mx.Unlock()
+
+ if len(cm.units) == 0 && cm.stopOnEmptyUnits {
+ cm.stopBeat()
+ }
+
cm.reload(units)
if cm.forceReload {
// Restart the debounce timer so we try to reload the inputs.
@@ -528,7 +555,7 @@ func (cm *BeatV2Manager) stopBeat() {
return
}
cm.logger.Debugf("Stopping beat")
- cm.UpdateStatus(lbmanagement.Stopping, "Stopping")
+ cm.UpdateStatus(status.Stopping, "Stopping")
cm.isRunning = false
cm.stopMut.Lock()
@@ -539,19 +566,19 @@ func (cm *BeatV2Manager) stopBeat() {
cm.beatStop.Do(cm.stopFunc)
}
cm.client.Stop()
- cm.UpdateStatus(lbmanagement.Stopped, "Stopped")
+ cm.UpdateStatus(status.Stopped, "Stopped")
if cm.errCanceller != nil {
cm.errCanceller()
cm.errCanceller = nil
}
}
-func (cm *BeatV2Manager) reload(units map[unitKey]*client.Unit) {
+func (cm *BeatV2Manager) reload(units map[unitKey]*agentUnit) {
lowestLevel := client.UnitLogLevelError
- var outputUnit *client.Unit
- var inputUnits []*client.Unit
- var stoppingUnits []*client.Unit
- healthyInputs := map[string]*client.Unit{}
+ var outputUnit *agentUnit
+ var inputUnits []*agentUnit
+ var stoppingUnits []*agentUnit
+ healthyInputs := map[string]*agentUnit{}
unitErrors := map[string][]error{}
// as the very last action, set the state of the failed units
@@ -559,7 +586,7 @@ func (cm *BeatV2Manager) reload(units map[unitKey]*client.Unit) {
for _, unit := range units {
errs := unitErrors[unit.ID()]
if len(errs) != 0 {
- _ = unit.UpdateState(client.UnitStateFailed, errors.Join(errs...).Error(), nil)
+ _ = unit.UpdateState(status.Failed, errors.Join(errs...).Error(), nil)
}
}
}()
@@ -631,14 +658,14 @@ func (cm *BeatV2Manager) reload(units map[unitKey]*client.Unit) {
cm.logger.Errorw("could not start output", "error", err)
msg := fmt.Sprintf("could not start output: %s", err)
- if err := outputUnit.UpdateState(client.UnitStateFailed, msg, nil); err != nil {
+ if err := outputUnit.UpdateState(status.Failed, msg, nil); err != nil {
cm.logger.Errorw("setting output state", "error", err)
}
return
}
- if err := outputUnit.UpdateState(client.UnitStateHealthy, "Healthy", nil); err != nil {
+ if err := outputUnit.UpdateState(status.Running, "Healthy", nil); err != nil {
cm.logger.Errorw("setting output state", "error", err)
}
@@ -661,7 +688,7 @@ func (cm *BeatV2Manager) reload(units map[unitKey]*client.Unit) {
// report the stopping units as stopped
for _, unit := range stoppingUnits {
- _ = unit.UpdateState(client.UnitStateStopped, "Stopped", nil)
+ _ = unit.UpdateState(status.Stopped, "Stopped", nil)
}
// now update the statuses of all units that contain only healthy
@@ -675,7 +702,7 @@ func (cm *BeatV2Manager) reload(units map[unitKey]*client.Unit) {
continue
}
- err := unit.UpdateState(client.UnitStateHealthy, "Healthy", nil)
+ err := unit.UpdateState(status.Running, "Healthy", nil)
if err != nil {
cm.logger.Errorf("Failed to update unit %s status: %s", unit.ID(), err)
}
@@ -688,7 +715,7 @@ func (cm *BeatV2Manager) reload(units map[unitKey]*client.Unit) {
//
// In any other case, the bool is always false and the error will be non nil
// if any error has occurred.
-func (cm *BeatV2Manager) reloadOutput(unit *client.Unit) (bool, error) {
+func (cm *BeatV2Manager) reloadOutput(unit *agentUnit) (bool, error) {
// Assuming that the output reloadable isn't a list, see createBeater() in cmd/instance/beat.go
output := cm.registry.GetReloadableOutput()
if output == nil {
@@ -722,7 +749,7 @@ func (cm *BeatV2Manager) reloadOutput(unit *client.Unit) (bool, error) {
if cm.stopOnOutputReload && cm.lastOutputCfg != nil {
cm.logger.Info("beat is restarting because output changed")
- _ = unit.UpdateState(client.UnitStateStopping, "Restarting", nil)
+ _ = unit.UpdateState(status.Stopping, "Restarting", nil)
cm.Stop()
return true, nil
}
@@ -745,7 +772,7 @@ func (cm *BeatV2Manager) reloadOutput(unit *client.Unit) (bool, error) {
return false, nil
}
-func (cm *BeatV2Manager) reloadInputs(inputUnits []*client.Unit) error {
+func (cm *BeatV2Manager) reloadInputs(inputUnits []*agentUnit) error {
obj := cm.registry.GetInputList()
if obj == nil {
return fmt.Errorf("failed to find beat reloadable type 'input'")
@@ -768,9 +795,10 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*client.Unit) error {
}
// add diag callbacks for unit
// we want to add the diagnostic handler that's specific to the unit, and not the gobal diagnostic handler
- for _, in := range inputCfg {
+ for idx, in := range inputCfg {
in.DiagCallback = diagnosticHandler{client: unit, log: cm.logger.Named("diagnostic-manager")}
in.InputUnitID = unit.ID()
+ in.StatusReporter = unit.GetReporterForStreamByIndex(idx)
}
inputCfgs[unit.ID()] = expected.Config
inputBeatCfgs = append(inputBeatCfgs, inputCfg...)
@@ -892,30 +920,6 @@ func (cm *BeatV2Manager) handleDebugYaml() []byte {
return data
}
-func getUnitState(status lbmanagement.Status) client.UnitState {
- switch status {
- case lbmanagement.Unknown:
- // must be started if its unknown
- return client.UnitStateStarting
- case lbmanagement.Starting:
- return client.UnitStateStarting
- case lbmanagement.Configuring:
- return client.UnitStateConfiguring
- case lbmanagement.Running:
- return client.UnitStateHealthy
- case lbmanagement.Degraded:
- return client.UnitStateDegraded
- case lbmanagement.Failed:
- return client.UnitStateFailed
- case lbmanagement.Stopping:
- return client.UnitStateStopping
- case lbmanagement.Stopped:
- return client.UnitStateStopped
- }
- // unknown again?
- return client.UnitStateStarting
-}
-
func getZapcoreLevel(ll client.UnitLogLevel) (zapcore.Level, bool) {
switch ll {
case client.UnitLogLevelError:
diff --git a/x-pack/libbeat/management/unit.go b/x-pack/libbeat/management/unit.go
new file mode 100644
index 000000000000..729be62c77a7
--- /dev/null
+++ b/x-pack/libbeat/management/unit.go
@@ -0,0 +1,414 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package management
+
+import (
+ "fmt"
+ "sync"
+
+ "github.com/elastic/beats/v7/libbeat/management/status"
+ "github.com/elastic/elastic-agent-client/v7/pkg/client"
+ "github.com/elastic/elastic-agent-libs/logp"
+)
+
+// unitState is the current state of a unit
+type unitState struct {
+ state status.Status
+ msg string
+}
+
+type clientUnit interface {
+ ID() string
+ Type() client.UnitType
+ Expected() client.Expected
+ UpdateState(state client.UnitState, message string, payload map[string]interface{}) error
+ RegisterAction(action client.Action)
+ UnregisterAction(action client.Action)
+ RegisterDiagnosticHook(name string, description string, filename string, contentType string, hook client.DiagnosticHook)
+}
+
+// agentUnit implements status.StatusReporter and holds an unitState
+// for the input as well as a unitState for each stream of
+// the input in when this a client.UnitTypeInput.
+type agentUnit struct {
+ softDeleted bool
+ mtx sync.Mutex
+ logger *logp.Logger
+ clientUnit clientUnit
+ inputLevelState unitState
+ streamIDs []string
+ streamStates map[string]unitState
+}
+
+// getUnitState converts status.Status to client.UnitState
+func getUnitState(s status.Status) client.UnitState {
+ switch s {
+ case status.Unknown:
+ // must be started if its unknown
+ return client.UnitStateStarting
+ case status.Starting:
+ return client.UnitStateStarting
+ case status.Configuring:
+ return client.UnitStateConfiguring
+ case status.Running:
+ return client.UnitStateHealthy
+ case status.Degraded:
+ return client.UnitStateDegraded
+ case status.Failed:
+ return client.UnitStateFailed
+ case status.Stopping:
+ return client.UnitStateStopping
+ case status.Stopped:
+ return client.UnitStateStopped
+ default:
+ // as this is an unknown state, return failed to get some attention
+ return client.UnitStateFailed
+ }
+}
+
+// getUnitState converts status.Status to client.UnitState
+func getStatus(s client.UnitState) status.Status {
+ switch s {
+ case client.UnitStateStarting:
+ return status.Starting
+ case client.UnitStateConfiguring:
+ return status.Configuring
+ case client.UnitStateHealthy:
+ return status.Running
+ case client.UnitStateDegraded:
+ return status.Degraded
+ case client.UnitStateFailed:
+ return status.Failed
+ case client.UnitStateStopping:
+ return status.Stopping
+ case client.UnitStateStopped:
+ return status.Stopped
+ default:
+ return status.Unknown
+ }
+}
+
+func getStreamStates(expected client.Expected) (map[string]unitState, []string) {
+ expectedCfg := expected.Config
+
+ if expectedCfg == nil {
+ return nil, nil
+ }
+
+ streamStates := make(map[string]unitState, len(expectedCfg.Streams))
+ streamIDs := make([]string, len(expectedCfg.Streams))
+
+ for idx, stream := range expectedCfg.Streams {
+ streamState := unitState{
+ state: status.Unknown,
+ msg: "",
+ }
+
+ if id := stream.GetId(); id != "" {
+ streamIDs[idx] = id
+ streamStates[id] = streamState
+ continue
+ }
+
+ if cfgName := expectedCfg.GetName(); cfgName != "" {
+ id := fmt.Sprintf("%s.[%d]", cfgName, idx)
+ streamIDs[idx] = id
+ streamStates[id] = streamState
+ continue
+ }
+
+ id := fmt.Sprintf("%s.[%d]", expectedCfg.GetId(), idx)
+ streamIDs[idx] = id
+ streamStates[id] = streamState
+ }
+
+ return streamStates, streamIDs
+}
+
+// newAgentUnit creates a new agentUnit. In case the supplied client.Unit is of type
+// client.UnitTypeInput it initializes the streamStates with a unitState.Unknown
+func newAgentUnit(cu clientUnit, log *logp.Logger) *agentUnit {
+ var (
+ streamStates map[string]unitState
+ streamIDs []string
+ )
+
+ if cu.Type() == client.UnitTypeInput {
+ streamStates, streamIDs = getStreamStates(cu.Expected())
+ }
+
+ return &agentUnit{
+ clientUnit: cu,
+ logger: log,
+ streamIDs: streamIDs,
+ streamStates: streamStates,
+ }
+}
+
+// RegisterAction registers action handler for this unit.
+func (u *agentUnit) RegisterAction(action client.Action) {
+ u.mtx.Lock()
+ defer u.mtx.Unlock()
+
+ if u.clientUnit == nil {
+ return
+ }
+
+ u.clientUnit.RegisterAction(action)
+}
+
+// UnregisterAction unregisters action handler with the client.
+func (u *agentUnit) UnregisterAction(action client.Action) {
+ u.mtx.Lock()
+ defer u.mtx.Unlock()
+
+ if u.clientUnit == nil {
+ return
+ }
+
+ u.clientUnit.UnregisterAction(action)
+}
+
+func (u *agentUnit) RegisterDiagnosticHook(name string, description string, filename string, contentType string, hook client.DiagnosticHook) {
+ u.mtx.Lock()
+ defer u.mtx.Unlock()
+
+ if u.clientUnit == nil {
+ return
+ }
+
+ u.clientUnit.RegisterDiagnosticHook(name, description, filename, contentType, hook)
+}
+
+func (u *agentUnit) Expected() client.Expected {
+ u.mtx.Lock()
+ defer u.mtx.Unlock()
+
+ if u.clientUnit == nil {
+ return client.Expected{}
+ }
+
+ return u.clientUnit.Expected()
+}
+
+func (u *agentUnit) ID() string {
+ u.mtx.Lock()
+ defer u.mtx.Unlock()
+
+ if u.clientUnit == nil {
+ return ""
+ }
+
+ return u.clientUnit.ID()
+}
+
+// calcState calculates the current state of the unit.
+func (u *agentUnit) calcState() (status.Status, string) {
+ // for type output return the unit state directly as it has no streams
+ if u.clientUnit.Type() == client.UnitTypeOutput {
+ return u.inputLevelState.state, u.inputLevelState.msg
+ }
+
+ // if inputLevelState state is not running return the inputLevelState state
+ if u.inputLevelState.state != status.Running {
+ return u.inputLevelState.state, u.inputLevelState.msg
+ }
+
+ // inputLevelState state is marked as running, check the stream states
+ reportedStatus := status.Running
+ reportedMsg := "Healthy"
+ for _, streamState := range u.streamStates {
+ switch streamState.state {
+ case status.Degraded:
+ if reportedStatus != status.Degraded {
+ reportedStatus = status.Degraded
+ reportedMsg = streamState.msg
+ }
+ case status.Failed:
+ // return the first failed stream
+ return streamState.state, streamState.msg
+ }
+ }
+
+ return reportedStatus, reportedMsg
+}
+
+// Type of the unit.
+func (u *agentUnit) Type() client.UnitType {
+ u.mtx.Lock()
+ defer u.mtx.Unlock()
+
+ if u.clientUnit == nil {
+ return client.UnitTypeInput
+ }
+
+ return u.clientUnit.Type()
+}
+
+// UpdateState updates the state for the unit.
+func (u *agentUnit) UpdateState(state status.Status, msg string, payload map[string]interface{}) error {
+ u.mtx.Lock()
+ defer u.mtx.Unlock()
+
+ if u.clientUnit == nil {
+ return nil
+ }
+
+ if u.inputLevelState.state == state && u.inputLevelState.msg == msg {
+ return nil
+ }
+
+ u.inputLevelState = unitState{
+ state: state,
+ msg: msg,
+ }
+
+ state, msg = u.calcState()
+
+ if u.clientUnit.Type() == client.UnitTypeOutput || len(u.streamStates) == 0 {
+ return u.clientUnit.UpdateState(getUnitState(state), msg, payload)
+ }
+
+ streamsPayload := make(map[string]interface{}, len(u.streamStates))
+
+ for streamID, streamState := range u.streamStates {
+ streamsPayload[streamID] = map[string]interface{}{
+ "status": getUnitState(streamState.state).String(),
+ "error": streamState.msg,
+ }
+ }
+
+ if payload == nil {
+ payload = make(map[string]interface{})
+ }
+
+ payload["streams"] = streamsPayload
+
+ return u.clientUnit.UpdateState(getUnitState(state), msg, payload)
+}
+
+// updateStateForStream updates the state for a specific stream in the agent unit.
+func (u *agentUnit) updateStateForStream(streamID string, state status.Status, msg string) {
+ u.mtx.Lock()
+ defer u.mtx.Unlock()
+
+ if u.clientUnit == nil || u.streamStates == nil {
+ return
+ }
+
+ if _, ok := u.streamStates[streamID]; !ok {
+ return
+ }
+
+ if u.streamStates[streamID].state == state {
+ return
+ }
+
+ u.streamStates[streamID] = unitState{
+ state: state,
+ msg: msg,
+ }
+
+ state, msg = u.calcState()
+
+ streamsPayload := make(map[string]interface{}, len(u.streamStates))
+
+ for id, streamState := range u.streamStates {
+ streamsPayload[id] = map[string]interface{}{
+ "status": getUnitState(streamState.state).String(),
+ "error": streamState.msg,
+ }
+ }
+
+ payload := map[string]interface{}{
+ "streams": streamsPayload,
+ }
+
+ if err := u.clientUnit.UpdateState(getUnitState(state), msg, payload); err != nil {
+ u.logger.Warnf("failed to update state for input %s: %v", u.ID(), err)
+ }
+}
+
+func (u *agentUnit) update(cu *client.Unit) {
+ u.mtx.Lock()
+ defer u.mtx.Unlock()
+
+ u.softDeleted = false
+ u.clientUnit = cu
+
+ inputStatus := getStatus(cu.Expected().State)
+ if u.inputLevelState.state != inputStatus {
+ u.inputLevelState = unitState{
+ state: inputStatus,
+ }
+ }
+
+ newStreamStates, newStreamIDs := getStreamStates(cu.Expected())
+
+ for key, state := range newStreamStates {
+ if _, exists := u.streamStates[key]; exists {
+ continue
+ }
+
+ u.streamStates[key] = state
+ }
+
+ for key := range u.streamStates {
+ if _, exists := newStreamStates[key]; !exists {
+ delete(u.streamStates, key)
+ }
+ }
+
+ switch {
+ case len(newStreamIDs) != len(u.streamIDs):
+ u.streamIDs = newStreamIDs
+ default:
+ for idx, streamID := range u.streamIDs {
+ if newStreamIDs[idx] != streamID {
+ u.streamIDs = newStreamIDs
+ break
+ }
+ }
+ }
+}
+
+func (u *agentUnit) markAsDeleted() {
+ u.mtx.Lock()
+ defer u.mtx.Unlock()
+
+ u.softDeleted = true
+}
+
+// GetReporterForStreamByIndex returns a status reporter for the stream at the given index.
+// Note if the index is out of range it returns nil. It is up to the caller to check the return value.
+func (u *agentUnit) GetReporterForStreamByIndex(idx int) status.StatusReporter {
+ u.mtx.Lock()
+ defer u.mtx.Unlock()
+
+ if idx >= len(u.streamIDs) {
+ return nil
+ }
+
+ return &streamStatusReporter{
+ id: u.streamIDs[idx],
+ unit: u,
+ }
+}
+
+// streamStatusReporter implements status.StatusReporter
+type streamStatusReporter struct {
+ id string
+ unit *agentUnit
+}
+
+// UpdateStatus updates the status of the stream unit.
+func (s *streamStatusReporter) UpdateStatus(state status.Status, msg string) {
+ s.unit.updateStateForStream(s.id, state, msg)
+}
+
+// ID of the stream unit.
+func (s *streamStatusReporter) ID() string {
+ return s.id
+}
diff --git a/x-pack/libbeat/management/unit_test.go b/x-pack/libbeat/management/unit_test.go
new file mode 100644
index 000000000000..9684ff5e16e9
--- /dev/null
+++ b/x-pack/libbeat/management/unit_test.go
@@ -0,0 +1,215 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package management
+
+import (
+ "testing"
+
+ "github.com/elastic/elastic-agent-client/v7/pkg/client"
+ "github.com/elastic/elastic-agent-client/v7/pkg/proto"
+
+ "github.com/elastic/beats/v7/libbeat/management/status"
+)
+
+func TestUnitUpdate(t *testing.T) {
+
+ type StatusUpdate struct {
+ status status.Status
+ msg string
+ }
+
+ const (
+ Healthy = "Healthy"
+ Failed = "Failed"
+ Degraded = "Degraded"
+ )
+
+ unitCfg := &mockClientUnit{
+ expected: client.Expected{
+ Config: &proto.UnitExpectedConfig{
+ Id: "inputLevelState-1",
+ Streams: []*proto.Stream{
+ {Id: "stream-1"},
+ {Id: "stream-2"},
+ },
+ },
+ },
+ }
+
+ cases := []struct {
+ name string
+ unit *mockClientUnit
+ inputLevelStatus StatusUpdate
+ streamStates map[string]StatusUpdate
+ expectedUnitStatus client.UnitState
+ expectedUnitMsg string
+ }{
+ {
+ name: "all running",
+ unit: unitCfg,
+ inputLevelStatus: StatusUpdate{status.Running, Healthy},
+ streamStates: map[string]StatusUpdate{
+ "stream-1": {status.Running, Healthy},
+ "stream-2": {status.Running, Healthy},
+ },
+ expectedUnitStatus: client.UnitStateHealthy,
+ expectedUnitMsg: Healthy,
+ },
+ {
+ name: "inputLevelState failed",
+ unit: unitCfg,
+ inputLevelStatus: StatusUpdate{status.Failed, Failed},
+ streamStates: map[string]StatusUpdate{
+ "stream-1": {status.Running, Healthy},
+ "stream-2": {status.Running, Healthy},
+ },
+ expectedUnitStatus: client.UnitStateFailed,
+ expectedUnitMsg: Failed,
+ },
+ {
+ name: "inputLevelState stopping",
+ unit: unitCfg,
+ inputLevelStatus: StatusUpdate{status.Stopping, ""},
+ streamStates: map[string]StatusUpdate{
+ "stream-1": {status.Running, Healthy},
+ "stream-2": {status.Running, Healthy},
+ },
+ expectedUnitStatus: client.UnitStateStopping,
+ expectedUnitMsg: "",
+ },
+ {
+ name: "inputLevelState configuring",
+ unit: unitCfg,
+ inputLevelStatus: StatusUpdate{status.Configuring, ""},
+ streamStates: map[string]StatusUpdate{
+ "stream-1": {status.Running, Healthy},
+ "stream-2": {status.Running, Healthy},
+ },
+ expectedUnitStatus: client.UnitStateConfiguring,
+ expectedUnitMsg: "",
+ },
+ {
+ name: "inputLevelState starting",
+ unit: unitCfg,
+ inputLevelStatus: StatusUpdate{status.Starting, ""},
+ streamStates: map[string]StatusUpdate{
+ "stream-1": {status.Running, Healthy},
+ "stream-2": {status.Running, Healthy},
+ },
+ expectedUnitStatus: client.UnitStateStarting,
+ expectedUnitMsg: "",
+ },
+ {
+ name: "inputLevelState degraded",
+ unit: unitCfg,
+ inputLevelStatus: StatusUpdate{status.Degraded, Degraded},
+ streamStates: map[string]StatusUpdate{
+ "stream-1": {status.Running, Healthy},
+ "stream-2": {status.Running, Healthy},
+ },
+ expectedUnitStatus: client.UnitStateDegraded,
+ expectedUnitMsg: Degraded,
+ },
+ {
+ name: "one stream failed the other running",
+ unit: unitCfg,
+ inputLevelStatus: StatusUpdate{status.Running, Healthy},
+ streamStates: map[string]StatusUpdate{
+ "stream-1": {status.Failed, Failed},
+ "stream-2": {status.Running, Healthy},
+ },
+ expectedUnitStatus: client.UnitStateFailed,
+ expectedUnitMsg: Failed,
+ },
+ {
+ name: "one stream failed the other degraded",
+ unit: unitCfg,
+ inputLevelStatus: StatusUpdate{status.Running, Healthy},
+ streamStates: map[string]StatusUpdate{
+ "stream-1": {status.Failed, Failed},
+ "stream-2": {status.Degraded, Degraded},
+ },
+ expectedUnitStatus: client.UnitStateFailed,
+ expectedUnitMsg: Failed,
+ },
+ {
+ name: "one stream running the other degraded",
+ unit: unitCfg,
+ inputLevelStatus: StatusUpdate{status.Running, Healthy},
+ streamStates: map[string]StatusUpdate{
+ "stream-1": {status.Running, Healthy},
+ "stream-2": {status.Degraded, Degraded},
+ },
+ expectedUnitStatus: client.UnitStateDegraded,
+ expectedUnitMsg: Degraded,
+ },
+ {
+ name: "both streams degraded",
+ unit: unitCfg,
+ inputLevelStatus: StatusUpdate{status.Running, Healthy},
+ streamStates: map[string]StatusUpdate{
+ "stream-1": {status.Degraded, Degraded},
+ "stream-2": {status.Degraded, Degraded},
+ },
+ expectedUnitStatus: client.UnitStateDegraded,
+ expectedUnitMsg: Degraded,
+ },
+ }
+
+ for _, c := range cases {
+ t.Run(c.name, func(t *testing.T) {
+ aUnit := newAgentUnit(c.unit, nil)
+ err := aUnit.UpdateState(c.inputLevelStatus.status, c.inputLevelStatus.msg, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ for id, state := range c.streamStates {
+ aUnit.updateStateForStream(id, state.status, state.msg)
+ }
+
+ if c.unit.reportedState != c.expectedUnitStatus {
+ t.Errorf("expected unit status %s, got %s", c.expectedUnitStatus, aUnit.inputLevelState.state)
+ }
+
+ if c.unit.reportedMsg != c.expectedUnitMsg {
+ t.Errorf("expected unit msg %s, got %s", c.expectedUnitStatus, aUnit.inputLevelState.state)
+ }
+ })
+ }
+}
+
+type mockClientUnit struct {
+ expected client.Expected
+ reportedState client.UnitState
+ reportedMsg string
+}
+
+func (u *mockClientUnit) Expected() client.Expected {
+ return u.expected
+}
+
+func (u *mockClientUnit) UpdateState(state client.UnitState, msg string, _ map[string]interface{}) error {
+ u.reportedState = state
+ u.reportedMsg = msg
+ return nil
+}
+
+func (u *mockClientUnit) ID() string {
+ return "inputLevelState-1"
+}
+
+func (u *mockClientUnit) Type() client.UnitType {
+ return client.UnitTypeInput
+}
+
+func (u *mockClientUnit) RegisterAction(_ client.Action) {
+}
+
+func (u *mockClientUnit) UnregisterAction(_ client.Action) {
+}
+
+func (u *mockClientUnit) RegisterDiagnosticHook(_ string, _ string, _ string, _ string, _ client.DiagnosticHook) {
+}