Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.17](backport #42212) [filebeat][streaming] - Added OAuth2 support with auto token refresh for websockets #42244

Merged
merged 2 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Refactor & cleanup with updates to default values and documentation. {pull}41834[41834]
- Added support for retry configuration in GCS input. {issue}11580[11580] {pull}41862[41862]
- Added default values in the streaming input for websocket retries and put a cap on retry wait time to be lesser than equal to the maximum defined wait time. {pull}42012[42012]
- Added OAuth2 support with auto token refresh for websocket streaming input. {issue}41989[41989] {pull}42212[42212]
- Added infinite & blanket retry options to websockets and improved logging and retry logic. {pull}42225[42225]

*Auditbeat*
Expand Down
45 changes: 44 additions & 1 deletion x-pack/filebeat/docs/inputs/input-streaming.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The websocket streaming input supports:
** Basic
** Bearer
** Custom
** OAuth2.0

NOTE: The `streaming` input websocket handler does not currently support XML messages. Auto-reconnects are also not supported at the moment so reconnection will occur on input restart.

Expand Down Expand Up @@ -113,7 +114,7 @@ This will include any sensitive or secret information kept in the `state` object

==== Authentication

The websocket streaming input supports authentication via Basic token authentication, Bearer token authentication and authentication via a custom auth config. Unlike REST inputs Basic Authentication contains a basic auth token, Bearer Authentication contains a bearer token and custom auth contains any combination of custom header and value. These token/key values are are added to the request headers and are not exposed to the `state` object. The custom auth configuration is useful for constructing requests that require custom headers and values for authentication. The basic and bearer token configurations will always use the `Authorization` header and prepend the token with `Basic` or `Bearer` respectively.
The websocket streaming input supports authentication via Basic token authentication, Bearer token authentication, authentication via a custom auth config and OAuth2 based authentication. Unlike REST inputs Basic Authentication contains a basic auth token, Bearer Authentication contains a bearer token and custom auth contains any combination of custom header and value. These token/key values are are added to the request headers and are not exposed to the `state` object. The custom auth configuration is useful for constructing requests that require custom headers and values for authentication. The basic and bearer token configurations will always use the `Authorization` header and prepend the token with `Basic` or `Bearer` respectively.

Example configurations with authentication:

Expand Down Expand Up @@ -166,6 +167,48 @@ filebeat.inputs:
token_url: https://api.crowdstrike.com/oauth2/token
----

==== Websocket OAuth2.0

The `websocket` streaming input supports OAuth2.0 authentication. The `auth` configuration field is used to specify the OAuth2.0 configuration. These values are not exposed to the `state` object.

The `auth` configuration field has the following subfields:

- `client_id`: The client ID to use for OAuth2.0 authentication.
- `client_secret`: The client secret to use for OAuth2.0 authentication.
- `token_url`: The token URL to use for OAuth2.0 authentication.
- `scopes`: The scopes to use for OAuth2.0 authentication.
- `endpoint_params`: The endpoint parameters to use for OAuth2.0 authentication.
- `auth_style`: The authentication style to use for OAuth2.0 authentication. If left unset, the style will be automatically detected.
- `token_expiry_buffer`: Minimum valid time remaining before attempting an OAuth2 token renewal. The default value is `2m`.

**Explanations for `auth_style` and `token_expiry_buffer`:**

- `auth_style`: The authentication style to use for OAuth2.0 authentication which determines how the values of sensitive information like `client_id` and `client_secret` are sent in the token request. The default style value is automatically inferred and used appropriately if no value is provided. The `auth_style` configuration field is optional and can be used to specify the authentication style to use for OAuth2.0 authentication. The `auth_style` configuration field supports the following configurable values:

* `in_header`: The `client_id` and `client_secret` is sent in the header as a base64 encoded `Authorization` header.
* `in_params`: The `client_id` and `client_secret` is sent in the request body along with the other OAuth2 parameters.

- `token_expiry_buffer`: The token expiry buffer to use for OAuth2.0 authentication. The `token_expiry_buffer` is used as a safety net to ensure that the token does not expire before the input can refresh it. The `token_expiry_buffer` configuration field is optional. If the `token_expiry_buffer` configuration field is not set, the default value of `2m` is used.

NOTE: We recommend leaving the `auth_style` configuration field unset (automatically inferred internally) for most scenarios, except where manual intervention is required.

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: streaming
auth:
client_id: a23fcea2643868ef1a41565a1a8a1c7c
client_secret: c3VwZXJzZWNyZXRfY2xpZW50X3NlY3JldF9zaGhoaGgK
token_url: https://api.sample-url.com/oauth2/token
scopes: ["read", "write"]
endpoint_params:
param1: value1
param2: value2
auth_style: in_params
token_expiry_buffer: 5m
url: wss://localhost:443/_stream
----

[[input-state-streaming]]
==== Input state

Expand Down
47 changes: 42 additions & 5 deletions x-pack/filebeat/input/streaming/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,17 @@ import (
"regexp"
"time"

"golang.org/x/oauth2"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
)

const (
authStyleInHeader = "in_header"
authStyleInParams = "in_params"
)

type config struct {
// Type is the type of the stream being followed. The
// zero value indicates websocket.
Expand Down Expand Up @@ -85,11 +92,30 @@ type customAuthConfig struct {

type oAuth2Config struct {
// common oauth fields
ClientID string `config:"client_id"`
ClientSecret string `config:"client_secret"`
EndpointParams map[string][]string `config:"endpoint_params"`
Scopes []string `config:"scopes"`
TokenURL string `config:"token_url"`
AuthStyle string `config:"auth_style"`
ClientID string `config:"client_id"`
ClientSecret string `config:"client_secret"`
EndpointParams url.Values `config:"endpoint_params"`
Scopes []string `config:"scopes"`
TokenExpiryBuffer time.Duration `config:"token_expiry_buffer" validate:"min=0"`
TokenURL string `config:"token_url"`
// accessToken is only used internally to set the initial headers via formHeader() if oauth2 is enabled
accessToken string
}

func (o oAuth2Config) isEnabled() bool {
return o.ClientID != "" && o.ClientSecret != "" && o.TokenURL != ""
}

func (o oAuth2Config) getAuthStyle() oauth2.AuthStyle {
switch o.AuthStyle {
case authStyleInHeader:
return oauth2.AuthStyleInHeader
case authStyleInParams:
return oauth2.AuthStyleInParams
default:
return oauth2.AuthStyleAutoDetect
}
}

type urlConfig struct {
Expand Down Expand Up @@ -144,6 +170,12 @@ func (c config) Validate() error {
return errors.New("wait_min must be less than or equal to wait_max")
}
}

if c.Auth.OAuth2.isEnabled() {
if c.Auth.OAuth2.AuthStyle != authStyleInHeader && c.Auth.OAuth2.AuthStyle != authStyleInParams && c.Auth.OAuth2.AuthStyle != "" {
return fmt.Errorf("unsupported auth style: %s", c.Auth.OAuth2.AuthStyle)
}
}
return nil
}

Expand Down Expand Up @@ -173,6 +205,11 @@ func defaultConfig() config {
Transport: httpcommon.HTTPTransportSettings{
Timeout: 180 * time.Second,
},
Auth: authConfig{
OAuth2: oAuth2Config{
TokenExpiryBuffer: 2 * time.Minute,
},
},
Retry: &retry{
MaxAttempts: 5,
WaitMin: 1 * time.Second,
Expand Down
73 changes: 73 additions & 0 deletions x-pack/filebeat/input/streaming/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,79 @@ var configTests = []struct {
"url": "wss://localhost:443/v1/stream",
},
},
{
name: "valid_authStyle_default",
config: map[string]interface{}{
"auth": map[string]interface{}{
"client_id": "a_client_id",
"client_secret": "a_client_secret",
"token_url": "https://localhost:443/token",
},
"url": "wss://localhost:443/v1/stream",
},
},
{
name: "valid_authStyle_in_params",
config: map[string]interface{}{
"auth": map[string]interface{}{
"auth_style": "in_params",
"client_id": "a_client_id",
"client_secret": "a_client_secret",
"token_url": "https://localhost:443/token",
},
"url": "wss://localhost:443/v1/stream",
},
},
{
name: "valid_authStyle_in_header",
config: map[string]interface{}{
"auth": map[string]interface{}{
"auth_style": "in_header",
"client_id": "a_client_id",
"client_secret": "a_client_secret",
"token_url": "https://localhost:443/token",
},
"url": "wss://localhost:443/v1/stream",
},
},
{
name: "invalid_authStyle",
config: map[string]interface{}{
"auth": map[string]interface{}{
"auth_style": "in_query",
"client_id": "a_client_id",
"client_secret": "a_client_secret",
"token_url": "https://localhost:443/token",
},
"url": "wss://localhost:443/v1/stream",
},
wantErr: fmt.Errorf("unsupported auth style: in_query accessing config"),
},
{
name: "valid_tokenExpiryBuffer",
config: map[string]interface{}{
"auth": map[string]interface{}{
"client_id": "a_client_id",
"client_secret": "a_client_secret",
"token_url": "https://localhost:443/token",
"token_expiry_buffer": "5m",
},
"url": "wss://localhost:443/v1/stream",
},
},
{
name: "invalid_tokenExpiryBuffer",
config: map[string]interface{}{
"auth": map[string]interface{}{
"client_id": "a_client_id",
"client_secret": "a_client_secret",
"token_url": "https://localhost:443/token",
"token_expiry_buffer": "-1s",
},
"url": "wss://localhost:443/v1/stream",
},
wantErr: fmt.Errorf("requires duration >= 0 accessing 'auth.token_expiry_buffer'"),
},
}

func TestConfig(t *testing.T) {
Expand Down
6 changes: 4 additions & 2 deletions x-pack/filebeat/input/streaming/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
)

type input struct {
stream StreamFollower

Check failure on line 29 in x-pack/filebeat/input/streaming/input.go

View workflow job for this annotation

GitHub Actions / lint (linux)

field `stream` is unused (unused)

time func() time.Time
cfg config
Expand Down Expand Up @@ -378,12 +378,14 @@
func formHeader(cfg config) map[string][]string {
header := make(map[string][]string)
switch {
case cfg.Auth.CustomAuth != nil:
header[cfg.Auth.CustomAuth.Header] = []string{cfg.Auth.CustomAuth.Value}
case cfg.Auth.OAuth2.accessToken != "":
header["Authorization"] = []string{"Bearer " + cfg.Auth.OAuth2.accessToken}
case cfg.Auth.BearerToken != "":
header["Authorization"] = []string{"Bearer " + cfg.Auth.BearerToken}
case cfg.Auth.BasicToken != "":
header["Authorization"] = []string{"Basic " + cfg.Auth.BasicToken}
case cfg.Auth.CustomAuth != nil:
header[cfg.Auth.CustomAuth.Header] = []string{cfg.Auth.CustomAuth.Value}
}
return header
}
1 change: 0 additions & 1 deletion x-pack/filebeat/input/streaming/input_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func cursorConfigure(cfg *conf.C) ([]inputcursor.Source, inputcursor.Input, erro
if err := cfg.Unpack(&src.cfg); err != nil {
return nil, nil, err
}

if src.cfg.Program == "" {
// set default program
src.cfg.Program = `
Expand Down
Loading
Loading