Skip to content

Commit

Permalink
[streaming] - Made namespace consistent in logging & put a null check…
Browse files Browse the repository at this point in the history
… to stop paincs on shutdown (#42315)

* made namespace consistent in logging & put a null check to stop paincs on shutdown
  • Loading branch information
ShourieG authored Jan 17, 2025
1 parent a122a9f commit ef3bd69
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- The `_id` generation process for S3 events has been updated to incorporate the LastModified field. This enhancement ensures that the `_id` is unique. {pull}42078[42078]
- Fix Netflow Template Sharing configuration handling. {pull}42080[42080]
- Updated websocket retry error code list to allow more scenarios to be retried which could have been missed previously. {pull}42218[42218]
- In the `streaming` input, prevent panics on shutdown with a null check and apply a consistent namespace to contextual data in debug logs. {pull}42315[42315]

*Heartbeat*

Expand Down
5 changes: 3 additions & 2 deletions x-pack/filebeat/input/streaming/crowdstrike.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (s *falconHoseStream) followSession(ctx context.Context, cli *http.Client,
if err != nil {
return state, Warning{fmt.Errorf("failed to decode discover body: %w", err)}
}
s.log.Debugw("stream discover metadata", "meta", mapstr.M(body.Meta))
s.log.Debugw("stream discover metadata", logp.Namespace(s.ns), "meta", mapstr.M(body.Meta))

var offset int
if cursor, ok := state["cursor"].(map[string]any); ok {
Expand Down Expand Up @@ -233,6 +233,7 @@ func (s *falconHoseStream) followSession(ctx context.Context, cli *http.Client,
err := dec.Decode(&msg)
if err != nil {
s.metrics.errorsTotal.Inc()
//nolint:errorlint // will not be a wrapped error here.
if err == io.EOF {
s.log.Info("stream ended, restarting")
return state, nil
Expand All @@ -241,7 +242,7 @@ func (s *falconHoseStream) followSession(ctx context.Context, cli *http.Client,
}
s.metrics.receivedBytesTotal.Add(uint64(len(msg)))
state["response"] = []byte(msg)
s.log.Debugw("received firehose message", logp.Namespace("falcon_hose"), "msg", debugMsg(msg))
s.log.Debugw("received firehose message", logp.Namespace(s.ns), "msg", debugMsg(msg))
err = s.process(ctx, state, s.cursor, s.now().In(time.UTC))
if err != nil {
s.log.Errorw("failed to process and publish data", "error", err)
Expand Down
12 changes: 7 additions & 5 deletions x-pack/filebeat/input/streaming/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,11 @@ func (s *websocketStream) FollowStream(ctx context.Context) error {

// ensures this is the last connection closed when the function returns
defer func() {
if err := c.Close(); err != nil {
s.metrics.errorsTotal.Inc()
s.log.Errorw("encountered an error while closing the websocket connection", "error", err)
if c != nil {
if err := c.Close(); err != nil {
s.metrics.errorsTotal.Inc()
s.log.Errorw("encountered an error while closing the websocket connection", "error", err)
}
}
}()

Expand Down Expand Up @@ -217,7 +219,7 @@ func (s *websocketStream) FollowStream(ctx context.Context) error {
}
s.metrics.receivedBytesTotal.Add(uint64(len(message)))
state["response"] = message
s.log.Debugw("received websocket message", logp.Namespace("websocket"), "msg", string(message))
s.log.Debugw("received websocket message", logp.Namespace(s.ns), "msg", string(message))
err = s.process(ctx, state, s.cursor, s.now().In(time.UTC))
if err != nil {
s.metrics.errorsTotal.Inc()
Expand Down Expand Up @@ -294,7 +296,7 @@ func handleConnectionResponse(resp *http.Response, metrics *inputMetrics, log *l
buf.WriteString("... truncated")
}

log.Debugw("websocket connection response", "body", &buf)
log.Debugw("websocket connection response", "http.response.body.content", &buf)
}
}

Expand Down

0 comments on commit ef3bd69

Please sign in to comment.