Skip to content

Commit

Permalink
[streaming] - Made namespace consistent in logging & put a null check…
Browse files Browse the repository at this point in the history
… to stop paincs on shutdown (#42315)

* made namespace consistent in logging & put a null check to stop paincs on shutdown

(cherry picked from commit ef3bd69)

# Conflicts:
#	x-pack/filebeat/input/streaming/crowdstrike.go
#	x-pack/filebeat/input/streaming/websocket.go
  • Loading branch information
ShourieG authored and mergify[bot] committed Jan 17, 2025
1 parent 27de6d3 commit 9e8e892
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 5 deletions.
24 changes: 24 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,30 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Ensure netflow custom field configuration is applied. {issue}40735[40735] {pull}40730[40730]
- Fix a bug in Salesforce input to only handle responses with 200 status code {pull}41015[41015]
- Updated websocket retry error code list to allow more scenarios to be retried which could have been missed previously. {pull}42218[42218]
- Fixed failed job handling and removed false-positive error logs in the GCS input. {pull}41142[41142]
- Bump github.com/elastic/go-sfdc dependency used by x-pack/filebeat/input/salesforce. {pull}41192[41192]
- Log bad handshake details when websocket connection fails {pull}41300[41300]
- Improve modification time handling for entities and entity deletion logic in the Active Directory entityanalytics input. {pull}41179[41179]
- Journald input now can read events from all boots {issue}41083[41083] {pull}41244[41244]
- Fix double encoding of client_secret in the Entity Analytics input's Azure Active Directory provider {pull}41393[41393]
- Fix aws region in aws-s3 input s3 polling mode. {pull}41572[41572]
- Fix errors in SQS host resolution in the `aws-s3` input when using custom (non-AWS) endpoints. {pull}41504[41504]
- Fix double encoding of client_secret in the Entity Analytics input's Azure Active Directory provider {pull}41393[41393]
- The azure-eventhub input now correctly reports its status to the Elastic Agent on fatal errors {pull}41469[41469]
- Add support for Access Points in the `aws-s3` input. {pull}41495[41495]
- Fix the "No such input type exist: 'salesforce'" error on the Windows/AIX platform. {pull}41664[41664]
- Fix missing key in streaming input logging. {pull}41600[41600]
- Improve S3 object size metric calculation to support situations where Content-Length is not available. {pull}41755[41755]
- Fix handling of http_endpoint request exceeding memory limits. {issue}41764[41764] {pull}41765[41765]
- Rate limiting fixes in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41583[41583]
- Redact authorization headers in HTTPJSON debug logs. {pull}41920[41920]
- Further rate limiting fix in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977]
- Fix streaming input handling of invalid or empty websocket messages. {pull}42036[42036]
- Fix awss3 document ID construction when using the CSV decoder. {pull}42019[42019]
- The `_id` generation process for S3 events has been updated to incorporate the LastModified field. This enhancement ensures that the `_id` is unique. {pull}42078[42078]
- Fix Netflow Template Sharing configuration handling. {pull}42080[42080]
- Updated websocket retry error code list to allow more scenarios to be retried which could have been missed previously. {pull}42218[42218]
- In the `streaming` input, prevent panics on shutdown with a null check and apply a consistent namespace to contextual data in debug logs. {pull}42315[42315]

*Heartbeat*

Expand Down
7 changes: 6 additions & 1 deletion x-pack/filebeat/input/streaming/crowdstrike.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (s *falconHoseStream) followSession(ctx context.Context, cli *http.Client,
if err != nil {
return state, Warning{fmt.Errorf("failed to decode discover body: %w", err)}
}
s.log.Debugw("stream discover metadata", "meta", mapstr.M(body.Meta))
s.log.Debugw("stream discover metadata", logp.Namespace(s.ns), "meta", mapstr.M(body.Meta))

var offset int
if cursor, ok := state["cursor"].(map[string]any); ok {
Expand Down Expand Up @@ -233,6 +233,7 @@ func (s *falconHoseStream) followSession(ctx context.Context, cli *http.Client,
err := dec.Decode(&msg)
if err != nil {
s.metrics.errorsTotal.Inc()
//nolint:errorlint // will not be a wrapped error here.
if err == io.EOF {
s.log.Info("stream ended, restarting")
return state, nil
Expand All @@ -241,7 +242,11 @@ func (s *falconHoseStream) followSession(ctx context.Context, cli *http.Client,
}
s.metrics.receivedBytesTotal.Add(uint64(len(msg)))
state["response"] = []byte(msg)
<<<<<<< HEAD

Check failure on line 245 in x-pack/filebeat/input/streaming/crowdstrike.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

expected statement, found '<<' (typecheck)
s.log.Debugw("received firehose message", logp.Namespace("falcon_hose"), debugMsg(msg))
=======
s.log.Debugw("received firehose message", logp.Namespace(s.ns), "msg", debugMsg(msg))
>>>>>>> ef3bd69d3 ([streaming] - Made namespace consistent in logging & put a null check to stop paincs on shutdown (#42315))

Check failure on line 249 in x-pack/filebeat/input/streaming/crowdstrike.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

illegal character U+0023 '#' (typecheck)
err = s.process(ctx, state, s.cursor, s.now().In(time.UTC))
if err != nil {
s.log.Errorw("failed to process and publish data", "error", err)
Expand Down
14 changes: 10 additions & 4 deletions x-pack/filebeat/input/streaming/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,11 @@ func (s *websocketStream) FollowStream(ctx context.Context) error {

// ensures this is the last connection closed when the function returns
defer func() {
if err := c.Close(); err != nil {
s.metrics.errorsTotal.Inc()
s.log.Errorw("encountered an error while closing the websocket connection", "error", err)
if c != nil {
if err := c.Close(); err != nil {
s.metrics.errorsTotal.Inc()
s.log.Errorw("encountered an error while closing the websocket connection", "error", err)
}
}
}()

Expand Down Expand Up @@ -217,7 +219,11 @@ func (s *websocketStream) FollowStream(ctx context.Context) error {
}
s.metrics.receivedBytesTotal.Add(uint64(len(message)))
state["response"] = message
<<<<<<< HEAD

Check failure on line 222 in x-pack/filebeat/input/streaming/websocket.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

expected statement, found '<<' (typecheck)
s.log.Debugw("received websocket message", logp.Namespace("websocket"), string(message))
=======
s.log.Debugw("received websocket message", logp.Namespace(s.ns), "msg", string(message))
>>>>>>> ef3bd69d3 ([streaming] - Made namespace consistent in logging & put a null check to stop paincs on shutdown (#42315))

Check failure on line 226 in x-pack/filebeat/input/streaming/websocket.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

illegal character U+0023 '#' (typecheck)
err = s.process(ctx, state, s.cursor, s.now().In(time.UTC))
if err != nil {
s.metrics.errorsTotal.Inc()
Expand Down Expand Up @@ -294,7 +300,7 @@ func handleConnectionResponse(resp *http.Response, metrics *inputMetrics, log *l
buf.WriteString("... truncated")
}

log.Debugw("websocket connection response", "body", &buf)
log.Debugw("websocket connection response", "http.response.body.content", &buf)
}
}

Expand Down

0 comments on commit 9e8e892

Please sign in to comment.