Skip to content

Commit

Permalink
Add status reporting for Journald input (#42462)
Browse files Browse the repository at this point in the history
This commit adds the status reporting for the Journald input. It also
adds a debug log to the `UpdateStatus` function from `v2.Context`.

(cherry picked from commit 76f4086)
  • Loading branch information
belimawr authored and mergify[bot] committed Jan 31, 2025
1 parent 9550d0d commit 2595db9
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Added OAuth2 support with auto token refresh for websocket streaming input. {issue}41989[41989] {pull}42212[42212]
- Added infinite & blanket retry options to websockets and improved logging and retry logic. {pull}42225[42225]
- Introduce ignore older and start timestamp filters for AWS S3 input. {pull}41804[41804]
- Journald input now can report its status to Elastic-Agent {issue}39791[39791] {pull}42462[42462]

*Auditbeat*

Expand Down
62 changes: 53 additions & 9 deletions filebeat/input/journald/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
conf "github.com/elastic/elastic-agent-libs/config"
Expand All @@ -40,10 +41,11 @@ import (
)

type inputTestingEnvironment struct {
t *testing.T
workingDir string
stateStore *testInputStore
pipeline *mockPipelineConnector
t *testing.T
workingDir string
stateStore *testInputStore
pipeline *mockPipelineConnector
statusReporter *mockStatusReporter

pluginInitOnce sync.Once
plugin v2.Plugin
Expand All @@ -54,10 +56,11 @@ type inputTestingEnvironment struct {

func newInputTestingEnvironment(t *testing.T) *inputTestingEnvironment {
return &inputTestingEnvironment{
t: t,
workingDir: t.TempDir(),
stateStore: openTestStatestore(),
pipeline: &mockPipelineConnector{},
t: t,
workingDir: t.TempDir(),
stateStore: openTestStatestore(),
pipeline: &mockPipelineConnector{},
statusReporter: &mockStatusReporter{},
}
}

Expand Down Expand Up @@ -95,7 +98,7 @@ func (e *inputTestingEnvironment) startInput(ctx context.Context, inp v2.Input)
}
}()

inputCtx := v2.Context{Logger: logp.L(), Cancelation: ctx}
inputCtx := v2.Context{Logger: logp.L(), Cancelation: ctx, StatusReporter: e.statusReporter}
if err := inp.Run(inputCtx, e.pipeline); err != nil {
e.t.Errorf("input 'Run' method returned an error: %s", err)
}
Expand Down Expand Up @@ -125,6 +128,25 @@ func (e *inputTestingEnvironment) waitUntilEventCount(count int) {
}, 5*time.Second, 10*time.Millisecond, &msg)
}

func (e *inputTestingEnvironment) RequireStatuses(expected []statusUpdate) {
t := e.t
t.Helper()
got := e.statusReporter.GetUpdates()
if len(got) != len(expected) {
t.Fatalf("expecting %d updates, got %d", len(expected), len(got))
}

for i := range expected {
g, e := got[i], expected[i]
if g != e {
t.Errorf(
"expecting [%d] status update to be {state:%s, msg:%s}, got {state:%s, msg:%s}",
i, e.state.String(), e.msg, g.state.String(), g.msg,
)
}
}
}

type testInputStore struct {
registry *statestore.Registry
}
Expand Down Expand Up @@ -251,3 +273,25 @@ func blockingACKer(starter context.Context) beat.EventListener {
}
})
}

type statusUpdate struct {
state status.Status
msg string
}

type mockStatusReporter struct {
mutex sync.RWMutex
updates []statusUpdate
}

func (m *mockStatusReporter) UpdateStatus(status status.Status, msg string) {
m.mutex.Lock()
m.updates = append(m.updates, statusUpdate{status, msg})
m.mutex.Unlock()
}

func (m *mockStatusReporter) GetUpdates() []statusUpdate {
m.mutex.RLock()
defer m.mutex.RUnlock()
return append([]statusUpdate{}, m.updates...)
}
16 changes: 13 additions & 3 deletions filebeat/input/journald/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
input "github.com/elastic/beats/v7/filebeat/input/v2"
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/beats/v7/libbeat/reader/parser"
conf "github.com/elastic/elastic-agent-libs/config"
Expand Down Expand Up @@ -154,6 +155,8 @@ func (inp *journald) Run(
logger := ctx.Logger.
With("path", src.Name()).
With("input_id", inp.ID)

ctx.UpdateStatus(status.Starting, "Starting")
currentCheckpoint := initCheckpoint(logger, cursor)

mode := inp.Seek
Expand All @@ -173,7 +176,9 @@ func (inp *journald) Run(
journalctl.Factory,
)
if err != nil {
return fmt.Errorf("could not start journal reader: %w", err)
wrappedErr := fmt.Errorf("could not start journal reader: %w", err)
ctx.UpdateStatus(status.Failed, wrappedErr.Error())
return wrappedErr
}

defer reader.Close()
Expand All @@ -186,6 +191,7 @@ func (inp *journald) Run(
saveRemoteHostname: inp.SaveRemoteHostname,
})

ctx.UpdateStatus(status.Running, "Running")
for {
entry, err := parser.Next()
if err != nil {
Expand All @@ -197,14 +203,18 @@ func (inp *journald) Run(
case errors.Is(err, journalctl.ErrRestarting):
continue
default:
logger.Errorf("could not read event: %s", err)
msg := fmt.Sprintf("could not read event: %s", err)
ctx.UpdateStatus(status.Failed, msg)
logger.Error(msg)
return err
}
}

event := entry.ToEvent()
if err := publisher.Publish(event, event.Private); err != nil {
logger.Errorf("could not publish event: %s", err)
msg := fmt.Sprintf("could not publish event: %s", err)
ctx.UpdateStatus(status.Failed, msg)
logger.Errorf(msg)
return err
}
}
Expand Down
28 changes: 28 additions & 0 deletions filebeat/input/journald/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalfield"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand Down Expand Up @@ -330,6 +331,33 @@ func TestReaderAdapterCanHandleNonStringFields(t *testing.T) {
}
}

func TestInputCanReportStatus(t *testing.T) {
out := decompress(t, filepath.Join("testdata", "multiple-boots.journal.gz"))

env := newInputTestingEnvironment(t)
cfg := mapstr.M{
"paths": []string{out},
}
inp := env.mustCreateInput(cfg)

ctx, cancelInput := context.WithCancel(context.Background())
t.Cleanup(cancelInput)

env.startInput(ctx, inp)
env.waitUntilEventCount(6)

env.RequireStatuses([]statusUpdate{
{
state: status.Starting,
msg: "Starting",
},
{
state: status.Running,
msg: "Running",
},
})
}

func decompress(t *testing.T, namegz string) string {
t.Helper()

Expand Down
1 change: 1 addition & 0 deletions filebeat/input/v2/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type Context struct {

func (c Context) UpdateStatus(status status.Status, msg string) {
if c.StatusReporter != nil {
c.Logger.Debugf("updating status, status: '%s', message: '%s'", status.String(), msg)
c.StatusReporter.UpdateStatus(status, msg)
}
}
Expand Down

0 comments on commit 2595db9

Please sign in to comment.