Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: SSAPI mvp #1951

Draft
wants to merge 22 commits into
base: release/v1.64.0
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ opentelemetry-java-contrib-jmx-metrics.jar
VERSION.txt
release_deps
/tmp
/local

# OpAmp Files
collector*.yaml
Expand Down
1 change: 1 addition & 0 deletions docs/receivers.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ Below is a list of supported receivers with links to their documentation pages.
| SAP Netweaver Receiver | [sapnetweaverreceiver](../receiver/sapnetweaverreceiver/README.md) |
| SAPM Receiver | [sapmreceiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.113.0/receiver/sapmreceiver/README.md) |
| SNMP Receiver | [snmpreceiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.113.0/receiver/snmpreceiver/README.md) |
| Splunk Search API Receiver | [splunksearchapireceiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.113.0/receiver/splunksearchapireceiver/README.md) |
schmikei marked this conversation as resolved.
Show resolved Hide resolved
| Splunk HEC Receiver | [splunkhecreceiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.113.0/receiver/splunkhecreceiver/README.md) |
| StatsD Receiver | [statsdreceiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.113.0/receiver/statsdreceiver/README.md) |
| SQL Query Receiver | [sqlqueryreceiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.113.0/receiver/sqlqueryreceiver/README.md) |
Expand Down
2 changes: 2 additions & 0 deletions factories/receivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/observiq/bindplane-agent/receiver/pluginreceiver"
"github.com/observiq/bindplane-agent/receiver/routereceiver"
"github.com/observiq/bindplane-agent/receiver/sapnetweaverreceiver"
"github.com/observiq/bindplane-agent/receiver/splunksearchapireceiver"
"github.com/observiq/bindplane-agent/receiver/telemetrygeneratorreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/activedirectorydsreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/aerospikereceiver"
Expand Down Expand Up @@ -157,6 +158,7 @@ var defaultReceivers = []receiver.Factory{
sapnetweaverreceiver.NewFactory(),
simpleprometheusreceiver.NewFactory(),
snmpreceiver.NewFactory(),
splunksearchapireceiver.NewFactory(),
splunkhecreceiver.NewFactory(),
sqlqueryreceiver.NewFactory(),
sqlserverreceiver.NewFactory(),
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ require (
)

require (
github.com/observiq/bindplane-agent/receiver/splunksearchapireceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provider/aesprovider v0.113.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor v0.113.0
go.opentelemetry.io/collector/processor/processortest v0.113.0
Expand Down Expand Up @@ -871,6 +872,8 @@ replace github.com/observiq/bindplane-agent/internal/report => ./internal/report

replace github.com/observiq/bindplane-agent/internal/measurements => ./internal/measurements

replace github.com/observiq/bindplane-agent/receiver/splunksearchapireceiver => ./receiver/splunksearchapireceiver

// Does not build with windows and only used in configschema executable
// Relevant issue https://github.com/mattn/go-ieproxy/issues/45
replace github.com/mattn/go-ieproxy => github.com/mattn/go-ieproxy v0.0.1
Expand Down
1 change: 1 addition & 0 deletions receiver/splunksearchapireceiver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Splunk Search API Receiver
124 changes: 124 additions & 0 deletions receiver/splunksearchapireceiver/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package splunksearchapireceiver provides a receiver that uses the Splunk API to migrate event data.
package splunksearchapireceiver

import (
"bytes"
"encoding/json"
"encoding/xml"
"fmt"
"io"
"net/http"
)

func (ssapir *splunksearchapireceiver) createSearchJob(config *Config, search string) (CreateJobResponse, error) {
// fmt.Println("Creating search job for search: ", search)
endpoint := fmt.Sprintf("%s/services/search/jobs", config.Endpoint)

reqBody := fmt.Sprintf(`search=%s`, search)
req, err := http.NewRequest("POST", endpoint, bytes.NewBuffer([]byte(reqBody)))
if err != nil {
return CreateJobResponse{}, err
}
req.SetBasicAuth(config.Username, config.Password)

resp, err := ssapir.client.Do(req)
if err != nil {
return CreateJobResponse{}, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusCreated {
return CreateJobResponse{}, fmt.Errorf("failed to create search job: %d", resp.StatusCode)
}

var jobResponse CreateJobResponse
body, err := io.ReadAll(resp.Body)
if err != nil {
return CreateJobResponse{}, fmt.Errorf("failed to read search job status response: %v", err)
}

err = xml.Unmarshal(body, &jobResponse)
if err != nil {
return CreateJobResponse{}, fmt.Errorf("failed to unmarshal search job response: %v", err)
}
return jobResponse, nil
}

func (ssapir *splunksearchapireceiver) getJobStatus(config *Config, sid string) (JobStatusResponse, error) {
// fmt.Println("Getting job status")
endpoint := fmt.Sprintf("%s/services/search/v2/jobs/%s", config.Endpoint, sid)

req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
return JobStatusResponse{}, err
}
req.SetBasicAuth(config.Username, config.Password)

resp, err := ssapir.client.Do(req)
if err != nil {
return JobStatusResponse{}, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return JobStatusResponse{}, fmt.Errorf("failed to get search job status: %d", resp.StatusCode)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return JobStatusResponse{}, fmt.Errorf("failed to read search job status response: %v", err)
}
var jobStatusResponse JobStatusResponse
err = xml.Unmarshal(body, &jobStatusResponse)
if err != nil {
return JobStatusResponse{}, fmt.Errorf("failed to unmarshal search job response: %v", err)
}

return jobStatusResponse, nil
}

func (ssapir *splunksearchapireceiver) getSearchResults(config *Config, sid string) (SearchResults, error) {
endpoint := fmt.Sprintf("%s/services/search/v2/jobs/%s/results?output_mode=json", config.Endpoint, sid)
req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
return SearchResults{}, err
}
req.SetBasicAuth(config.Username, config.Password)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should also note that we should do some oauth token authentication as well. Not sure if you have a future task pending for that

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a ticket for that right now, but I'll make one


resp, err := ssapir.client.Do(req)
if err != nil {
return SearchResults{}, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return SearchResults{}, fmt.Errorf("failed to get search job results: %d", resp.StatusCode)
}

var searchResults SearchResults
body, err := io.ReadAll(resp.Body)
if err != nil {
return SearchResults{}, fmt.Errorf("failed to read search job results response: %v", err)
}
// fmt.Println("Body: ", string(body))
schmikei marked this conversation as resolved.
Show resolved Hide resolved
err = json.Unmarshal(body, &searchResults)
if err != nil {
return SearchResults{}, fmt.Errorf("failed to unmarshal search job results: %v", err)
}

return searchResults, nil
}
99 changes: 99 additions & 0 deletions receiver/splunksearchapireceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package splunksearchapireceiver

import (
"errors"
"strings"
"time"

"go.opentelemetry.io/collector/config/confighttp"
)

// Config struct to represent the configuration for the Splunk Search API receiver
type Config struct {
confighttp.ClientConfig `mapstructure:",squash"`
Username string `mapstructure:"splunk_username"`
Password string `mapstructure:"splunk_password"`
Searches []Search `mapstructure:"searches"`
}

// Search struct to represent a Splunk search
type Search struct {
Query string `mapstructure:"query"`
EarliestTime string `mapstructure:"earliest_time"`
LatestTime string `mapstructure:"latest_time"`
Limit int `mapstructure:"limit"`
}

// Validate validates the Splunk Search API receiver configuration
func (cfg *Config) Validate() error {
if cfg.Endpoint == "" {
return errors.New("missing Splunk server endpoint")
}
if cfg.Username == "" {
return errors.New("missing Splunk username")
}
if cfg.Password == "" {
return errors.New("missing Splunk password")
}
if len(cfg.Searches) == 0 {
return errors.New("at least one search must be provided")
}

for _, search := range cfg.Searches {
if search.Query == "" {
return errors.New("missing query in search")
}

// query implicitly starts with "search" command
if !strings.HasPrefix(search.Query, "search ") {
search.Query = "search " + search.Query
}

if strings.Contains(search.Query, "|") {
return errors.New("command chaining is not supported for queries")
schmikei marked this conversation as resolved.
Show resolved Hide resolved
}

if search.EarliestTime == "" {
return errors.New("missing earliest_time in search")
}
if search.LatestTime == "" {
return errors.New("missing latest_time in search")
}

// parse time strings to time.Time
earliestTime, err := time.Parse(time.RFC3339, search.EarliestTime)
if err != nil {
return errors.New("earliest_time failed to be parsed as RFC3339")
}

latestTime, err := time.Parse(time.RFC3339, search.LatestTime)
if err != nil {
return errors.New("latest_time failed to be parsed as RFC3339")
}

if earliestTime.UTC().After(latestTime.UTC()) {
return errors.New("earliest_time must be earlier than latest_time")
}
if earliestTime.UTC().After(time.Now().UTC()) {
schmikei marked this conversation as resolved.
Show resolved Hide resolved
return errors.New("earliest_time must be earlier than current time")
}
if latestTime.UTC().After(time.Now().UTC()) {
return errors.New("latest_time must be earlier than current time")
}
}
return nil
}
54 changes: 54 additions & 0 deletions receiver/splunksearchapireceiver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package splunksearchapireceiver

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
)

var (
typeStr = component.MustNewType("splunksearchapi")
)

func createDefaultConfig() component.Config {
return &Config{
ClientConfig: confighttp.NewDefaultClientConfig(),
}
}

func createLogsReceiver(_ context.Context,
params receiver.Settings,
cfg component.Config,
consumer consumer.Logs,
) (receiver.Logs, error) {
ssapirConfig := cfg.(*Config)
ssapir := &splunksearchapireceiver{
logger: params.Logger,
logsConsumer: consumer,
config: ssapirConfig,
settings: params.TelemetrySettings,
}
return ssapir, nil
}

// NewFactory creates a factory for Splunk Search API receiver
func NewFactory() receiver.Factory {
return receiver.NewFactory(typeStr, createDefaultConfig, receiver.WithLogs(createLogsReceiver, component.StabilityLevelAlpha))
}
51 changes: 51 additions & 0 deletions receiver/splunksearchapireceiver/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
module github.com/open-telemetry/opentelemtry-collector-contrib/receiver/splunksearchapireceiver

go 1.22.5

require (
go.opentelemetry.io/collector/component v0.113.0
go.opentelemetry.io/collector/consumer v0.113.0
go.opentelemetry.io/collector/pdata v1.19.0
go.opentelemetry.io/collector/receiver v0.112.0
go.uber.org/zap v1.27.0
)

require (
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.8.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/rs/cors v1.11.1 // indirect
go.opentelemetry.io/collector/client v1.19.0 // indirect
go.opentelemetry.io/collector/config/configauth v0.113.0 // indirect
go.opentelemetry.io/collector/config/configcompression v1.19.0 // indirect
go.opentelemetry.io/collector/config/configopaque v1.19.0 // indirect
go.opentelemetry.io/collector/config/configtls v1.19.0 // indirect
go.opentelemetry.io/collector/config/internal v0.113.0 // indirect
go.opentelemetry.io/collector/extension v0.113.0 // indirect
go.opentelemetry.io/collector/extension/auth v0.113.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect
)

require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
go.opentelemetry.io/collector/config/confighttp v0.113.0
go.opentelemetry.io/collector/config/configtelemetry v0.113.0 // indirect
go.opentelemetry.io/collector/pipeline v0.112.0 // indirect
go.opentelemetry.io/otel v1.31.0 // indirect
go.opentelemetry.io/otel/metric v1.31.0 // indirect
go.opentelemetry.io/otel/trace v1.31.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.19.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect
google.golang.org/grpc v1.67.1 // indirect
google.golang.org/protobuf v1.35.1 // indirect
)
Loading
Loading