diff --git a/pkg/testing/fixture.go b/pkg/testing/fixture.go index 8996845174..794d82c67a 100644 --- a/pkg/testing/fixture.go +++ b/pkg/testing/fixture.go @@ -726,7 +726,7 @@ func (f *Fixture) ExecStatus(ctx context.Context, opts ...process.CmdOption) (Ag status := AgentStatusOutput{} if uerr := json.Unmarshal(out, &status); uerr != nil { return AgentStatusOutput{}, - fmt.Errorf("could not unmarshal agent status output: %w", errors.Join(uerr, err)) + fmt.Errorf("could not unmarshal agent status output: %w:\n%s", errors.Join(uerr, err), out) } else if status.IsZero() { return status, fmt.Errorf("agent status output is empty: %w", err) } diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index 937dbdd365..ebd0221a8b 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -9,6 +9,7 @@ package integration import ( "bytes" "context" + "encoding/base64" "errors" "fmt" "net/url" @@ -21,9 +22,11 @@ import ( "text/template" "time" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/testing/estools" "github.com/elastic/elastic-agent/pkg/control/v2/client" aTesting "github.com/elastic/elastic-agent/pkg/testing" @@ -1101,6 +1104,273 @@ service: require.True(t, err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded), "Retrieved unexpected error: %s", err.Error()) } +func TestHybridAgentE2E(t *testing.T) { + // This test is a hybrid agent test that ingests a single log with + // filebeat and fbreceiver. It then compares the final documents in + // Elasticsearch to ensure they have no meaningful differences. + info := define.Require(t, define.Requirements{ + Group: Default, + Local: true, + OS: []define.OS{ + {Type: define.Windows}, + {Type: define.Linux}, + {Type: define.Darwin}, + }, + Stack: &define.Stack{}, + }) + tmpDir := t.TempDir() + numEvents := 1 + fbIndex := "logs-generic-default" + fbReceiverIndex := "logs-generic-default" + + inputFile, err := os.CreateTemp(tmpDir, "input-*.log") + require.NoError(t, err, "failed to create input log file") + inputFilePath := inputFile.Name() + for i := 0; i < numEvents; i++ { + _, err = inputFile.Write([]byte(fmt.Sprintf("Line %d", i))) + require.NoErrorf(t, err, "failed to write line %d to temp file", i) + _, err = inputFile.Write([]byte("\n")) + require.NoErrorf(t, err, "failed to write newline to input file") + time.Sleep(100 * time.Millisecond) + } + err = inputFile.Close() + require.NoError(t, err, "failed to close data input file") + + t.Cleanup(func() { + if t.Failed() { + contents, err := os.ReadFile(inputFilePath) + if err != nil { + t.Logf("no data file to import at %s", inputFilePath) + return + } + t.Logf("contents of input file: %s\n", string(contents)) + } + }) + + type configOptions struct { + InputPath string + HomeDir string + ESEndpoint string + ESApiKey string + BeatsESApiKey string + FBReceiverIndex string + } + esEndpoint, err := getESHost() + require.NoError(t, err, "error getting elasticsearch endpoint") + esApiKey, err := createESApiKey(info.ESClient) + require.NoError(t, err, "error creating API key") + require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) + + configTemplate := `agent.logging.level: info +agent.logging.to_stderr: true +inputs: + - id: filestream-filebeat + type: filestream + paths: + - {{.InputPath}} + prospector.scanner.fingerprint.enabled: false + file_identity.native: ~ + use_output: default + queue.mem.flush.timeout: 0s + path.home: {{.HomeDir}}/filebeat +outputs: + default: + type: elasticsearch + hosts: [{{.ESEndpoint}}] + api_key: {{.BeatsESApiKey}} + compression_level: 0 +receivers: + filebeatreceiver: + filebeat: + inputs: + - type: filestream + id: filestream-fbreceiver + enabled: true + paths: + - {{.InputPath}} + prospector.scanner.fingerprint.enabled: false + file_identity.native: ~ + processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_fields: + fields: + dataset: generic + namespace: default + type: logs + target: data_stream + - add_fields: + fields: + dataset: generic + target: event + output: + otelconsumer: + logging: + level: info + selectors: + - '*' + path.home: {{.HomeDir}}/fbreceiver + queue.mem.flush.timeout: 0s +exporters: + debug: + use_internal_logger: false + verbosity: detailed + elasticsearch/log: + endpoints: + - {{.ESEndpoint}} + compression: none + api_key: {{.ESApiKey}} + logs_index: {{.FBReceiverIndex}} + batcher: + enabled: true + flush_timeout: 1s + mapping: + mode: bodymap +service: + pipelines: + logs: + receivers: + - filebeatreceiver + exporters: + - elasticsearch/log + - debug +` + + beatsApiKey, err := base64.StdEncoding.DecodeString(esApiKey.Encoded) + require.NoError(t, err, "error decoding api key") + + var configBuffer bytes.Buffer + require.NoError(t, + template.Must(template.New("config").Parse(configTemplate)).Execute(&configBuffer, + configOptions{ + InputPath: inputFilePath, + HomeDir: tmpDir, + ESEndpoint: esEndpoint, + ESApiKey: esApiKey.Encoded, + BeatsESApiKey: string(beatsApiKey), + FBReceiverIndex: fbReceiverIndex, + })) + configContents := configBuffer.Bytes() + t.Cleanup(func() { + if t.Failed() { + t.Logf("Contents of agent config file:\n%s\n", string(configContents)) + } + }) + + // Now we can actually create the fixture and run it + fixture, err := define.NewFixtureFromLocalBuild(t, define.Version()) + require.NoError(t, err) + + ctx, cancel := testcontext.WithDeadline(t, context.Background(), time.Now().Add(5*time.Minute)) + defer cancel() + + err = fixture.Prepare(ctx) + require.NoError(t, err) + err = fixture.Configure(ctx, configContents) + require.NoError(t, err) + + cmd, err := fixture.PrepareAgentCommand(ctx, nil) + require.NoError(t, err) + cmd.WaitDelay = 1 * time.Second + + var output strings.Builder + cmd.Stderr = &output + cmd.Stdout = &output + + err = cmd.Start() + require.NoError(t, err) + + t.Cleanup(func() { + if t.Failed() { + t.Log("Elastic-Agent output:") + t.Log(output.String()) + } + }) + + require.Eventually(t, func() bool { + err = fixture.IsHealthy(ctx) + if err != nil { + t.Logf("waiting for agent healthy: %s", err.Error()) + return false + } + return true + }, 1*time.Minute, 1*time.Second) + + var docs estools.Documents + actualHits := &struct { + Hits int + }{} + require.Eventually(t, + func() bool { + findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer findCancel() + + docs, err = estools.GetLogsForIndexWithContext(findCtx, info.ESClient, ".ds-"+fbIndex+"*", map[string]interface{}{ + "log.file.path": inputFilePath, + }) + require.NoError(t, err) + + actualHits.Hits = docs.Hits.Total.Value + + return actualHits.Hits == numEvents*2 // filebeat + fbreceiver + }, + 1*time.Minute, 1*time.Second, + "Expected %d logs in elasticsearch, got: %v", numEvents, actualHits) + + doc1 := docs.Hits.Hits[0].Source + doc2 := docs.Hits.Hits[1].Source + ignoredFields := []string{ + // Expected to change between filebeat and fbreceiver + "@timestamp", + "agent.ephemeral_id", + "agent.id", + + // Missing from fbreceiver doc + "elastic_agent.id", + "elastic_agent.snapshot", + "elastic_agent.version", + + // TODO: fbreceiver adds metadata fields that are internal in filebeat. + // Remove this once https://github.com/elastic/beats/pull/42412 + // is available in agent. + "@metadata.beat", + "@metadata.type", + "@metadata.version", + } + + assertMapsEqual(t, doc1, doc2, ignoredFields, "expected documents to be equal") + cancel() + cmd.Wait() +} + +func assertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg string) { + t.Helper() + + flatM1 := m1.Flatten() + flatM2 := m2.Flatten() + for _, f := range ignoredFields { + hasKeyM1, _ := flatM1.HasKey(f) + hasKeyM2, _ := flatM2.HasKey(f) + + if !hasKeyM1 && !hasKeyM2 { + assert.Failf(t, msg, "ignored field %q does not exist in either map, please remove it from the ignored fields", f) + } + + // If the ignored field exists and is equal in both maps then it shouldn't be ignored + if hasKeyM1 && hasKeyM2 { + valM1, _ := flatM1.GetValue(f) + valM2, _ := flatM2.GetValue(f) + if valM1 == valM2 { + assert.Failf(t, msg, "ignored field %q is equal in both maps, please remove it from the ignored fields", f) + } + } + + flatM1.Delete(f) + flatM2.Delete(f) + } + require.Equal(t, "", cmp.Diff(flatM1, flatM2), "expected maps to be equal") +} + func TestFBOtelRestartE2E(t *testing.T) { // This test ensures that filebeatreceiver is able to deliver logs even // in advent of a collector restart.