From 2595db9b6e916556d8785a9d5573131028d87476 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Fri, 31 Jan 2025 10:48:37 -0500 Subject: [PATCH] Add status reporting for Journald input (#42462) This commit adds the status reporting for the Journald input. It also adds a debug log to the `UpdateStatus` function from `v2.Context`. (cherry picked from commit 76f40863c9918ee1a691a2bb29cbab98a5d83878) --- CHANGELOG.next.asciidoc | 1 + filebeat/input/journald/environment_test.go | 62 ++++++++++++++++++--- filebeat/input/journald/input.go | 16 +++++- filebeat/input/journald/input_test.go | 28 ++++++++++ filebeat/input/v2/input.go | 1 + 5 files changed, 96 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 444c2f6d0767..d7138f4bbc41 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -329,6 +329,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Added OAuth2 support with auto token refresh for websocket streaming input. {issue}41989[41989] {pull}42212[42212] - Added infinite & blanket retry options to websockets and improved logging and retry logic. {pull}42225[42225] - Introduce ignore older and start timestamp filters for AWS S3 input. {pull}41804[41804] +- Journald input now can report its status to Elastic-Agent {issue}39791[39791] {pull}42462[42462] *Auditbeat* diff --git a/filebeat/input/journald/environment_test.go b/filebeat/input/journald/environment_test.go index 9ea77d017d15..5b6a8fcf35c2 100644 --- a/filebeat/input/journald/environment_test.go +++ b/filebeat/input/journald/environment_test.go @@ -32,6 +32,7 @@ import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/acker" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/libbeat/statestore/storetest" conf "github.com/elastic/elastic-agent-libs/config" @@ -40,10 +41,11 @@ import ( ) type inputTestingEnvironment struct { - t *testing.T - workingDir string - stateStore *testInputStore - pipeline *mockPipelineConnector + t *testing.T + workingDir string + stateStore *testInputStore + pipeline *mockPipelineConnector + statusReporter *mockStatusReporter pluginInitOnce sync.Once plugin v2.Plugin @@ -54,10 +56,11 @@ type inputTestingEnvironment struct { func newInputTestingEnvironment(t *testing.T) *inputTestingEnvironment { return &inputTestingEnvironment{ - t: t, - workingDir: t.TempDir(), - stateStore: openTestStatestore(), - pipeline: &mockPipelineConnector{}, + t: t, + workingDir: t.TempDir(), + stateStore: openTestStatestore(), + pipeline: &mockPipelineConnector{}, + statusReporter: &mockStatusReporter{}, } } @@ -95,7 +98,7 @@ func (e *inputTestingEnvironment) startInput(ctx context.Context, inp v2.Input) } }() - inputCtx := v2.Context{Logger: logp.L(), Cancelation: ctx} + inputCtx := v2.Context{Logger: logp.L(), Cancelation: ctx, StatusReporter: e.statusReporter} if err := inp.Run(inputCtx, e.pipeline); err != nil { e.t.Errorf("input 'Run' method returned an error: %s", err) } @@ -125,6 +128,25 @@ func (e *inputTestingEnvironment) waitUntilEventCount(count int) { }, 5*time.Second, 10*time.Millisecond, &msg) } +func (e *inputTestingEnvironment) RequireStatuses(expected []statusUpdate) { + t := e.t + t.Helper() + got := e.statusReporter.GetUpdates() + if len(got) != len(expected) { + t.Fatalf("expecting %d updates, got %d", len(expected), len(got)) + } + + for i := range expected { + g, e := got[i], expected[i] + if g != e { + t.Errorf( + "expecting [%d] status update to be {state:%s, msg:%s}, got {state:%s, msg:%s}", + i, e.state.String(), e.msg, g.state.String(), g.msg, + ) + } + } +} + type testInputStore struct { registry *statestore.Registry } @@ -251,3 +273,25 @@ func blockingACKer(starter context.Context) beat.EventListener { } }) } + +type statusUpdate struct { + state status.Status + msg string +} + +type mockStatusReporter struct { + mutex sync.RWMutex + updates []statusUpdate +} + +func (m *mockStatusReporter) UpdateStatus(status status.Status, msg string) { + m.mutex.Lock() + m.updates = append(m.updates, statusUpdate{status, msg}) + m.mutex.Unlock() +} + +func (m *mockStatusReporter) GetUpdates() []statusUpdate { + m.mutex.RLock() + defer m.mutex.RUnlock() + return append([]statusUpdate{}, m.updates...) +} diff --git a/filebeat/input/journald/input.go b/filebeat/input/journald/input.go index 0ab3c5481775..da019832c4fc 100644 --- a/filebeat/input/journald/input.go +++ b/filebeat/input/journald/input.go @@ -29,6 +29,7 @@ import ( input "github.com/elastic/beats/v7/filebeat/input/v2" cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/libbeat/reader" "github.com/elastic/beats/v7/libbeat/reader/parser" conf "github.com/elastic/elastic-agent-libs/config" @@ -154,6 +155,8 @@ func (inp *journald) Run( logger := ctx.Logger. With("path", src.Name()). With("input_id", inp.ID) + + ctx.UpdateStatus(status.Starting, "Starting") currentCheckpoint := initCheckpoint(logger, cursor) mode := inp.Seek @@ -173,7 +176,9 @@ func (inp *journald) Run( journalctl.Factory, ) if err != nil { - return fmt.Errorf("could not start journal reader: %w", err) + wrappedErr := fmt.Errorf("could not start journal reader: %w", err) + ctx.UpdateStatus(status.Failed, wrappedErr.Error()) + return wrappedErr } defer reader.Close() @@ -186,6 +191,7 @@ func (inp *journald) Run( saveRemoteHostname: inp.SaveRemoteHostname, }) + ctx.UpdateStatus(status.Running, "Running") for { entry, err := parser.Next() if err != nil { @@ -197,14 +203,18 @@ func (inp *journald) Run( case errors.Is(err, journalctl.ErrRestarting): continue default: - logger.Errorf("could not read event: %s", err) + msg := fmt.Sprintf("could not read event: %s", err) + ctx.UpdateStatus(status.Failed, msg) + logger.Error(msg) return err } } event := entry.ToEvent() if err := publisher.Publish(event, event.Private); err != nil { - logger.Errorf("could not publish event: %s", err) + msg := fmt.Sprintf("could not publish event: %s", err) + ctx.UpdateStatus(status.Failed, msg) + logger.Errorf(msg) return err } } diff --git a/filebeat/input/journald/input_test.go b/filebeat/input/journald/input_test.go index 776115d5d8ac..731f7efb3be3 100644 --- a/filebeat/input/journald/input_test.go +++ b/filebeat/input/journald/input_test.go @@ -37,6 +37,7 @@ import ( "github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalfield" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -330,6 +331,33 @@ func TestReaderAdapterCanHandleNonStringFields(t *testing.T) { } } +func TestInputCanReportStatus(t *testing.T) { + out := decompress(t, filepath.Join("testdata", "multiple-boots.journal.gz")) + + env := newInputTestingEnvironment(t) + cfg := mapstr.M{ + "paths": []string{out}, + } + inp := env.mustCreateInput(cfg) + + ctx, cancelInput := context.WithCancel(context.Background()) + t.Cleanup(cancelInput) + + env.startInput(ctx, inp) + env.waitUntilEventCount(6) + + env.RequireStatuses([]statusUpdate{ + { + state: status.Starting, + msg: "Starting", + }, + { + state: status.Running, + msg: "Running", + }, + }) +} + func decompress(t *testing.T, namegz string) string { t.Helper() diff --git a/filebeat/input/v2/input.go b/filebeat/input/v2/input.go index f62cc149a936..63824d776301 100644 --- a/filebeat/input/v2/input.go +++ b/filebeat/input/v2/input.go @@ -97,6 +97,7 @@ type Context struct { func (c Context) UpdateStatus(status status.Status, msg string) { if c.StatusReporter != nil { + c.Logger.Debugf("updating status, status: '%s', message: '%s'", status.String(), msg) c.StatusReporter.UpdateStatus(status, msg) } }