From 9b9dcf80ff21088dc9de6236db131a0eff8762eb Mon Sep 17 00:00:00 2001 From: Hisar Balik Date: Mon, 3 Feb 2025 09:35:46 +0100 Subject: [PATCH] feat: Add cloud provider info to the gateways (#1778) --- .k3d-kyma.yaml | 12 ++ config/rbac/role.yaml | 1 + ...ulti_pipeline_gateway_with_connectors.yaml | 8 +- ...ngle_pipeline_gateway_with_connectors.yaml | 4 +- .../config/gatewayprocs/k8s_attribute_proc.go | 20 +++ .../gatewayprocs/k8s_attribute_proc_test.go | 20 +++ .../config/gatewayprocs/resource_procs.go | 21 ++- .../gatewayprocs/resource_procs_test.go | 9 +- .../config/log/gateway/config.go | 6 +- .../config/log/gateway/config_builder.go | 11 +- .../config/log/gateway/config_builder_test.go | 68 ++++++-- .../config/log/gateway/processors.go | 6 +- .../config/log/gateway/processors_test.go | 33 ++-- .../config/log/gateway/testdata/config.yaml | 21 ++- .../config/metric/gateway/config.go | 2 +- .../config/metric/gateway/config_builder.go | 4 +- .../metric/gateway/config_builder_test.go | 60 +++++-- .../config/metric/gateway/processors.go | 4 +- .../config/metric/gateway/processors_test.go | 18 ++- .../config/metric/gateway/service.go | 2 +- .../config/metric/gateway/service_test.go | 44 ++--- .../metric/gateway/testdata/config.yaml | 21 ++- .../testdata/config_otlp_disabled.yaml | 21 ++- .../config/trace/gateway/config.go | 10 +- .../config/trace/gateway/config_builder.go | 11 +- .../trace/gateway/config_builder_test.go | 61 +++++-- .../config/trace/gateway/processors.go | 12 +- .../config/trace/gateway/processors_test.go | 38 +++-- .../config/trace/gateway/testdata/config.yaml | 21 ++- .../otel/mocks/gateway_config_builder.go | 22 +-- .../reconciler/logpipeline/otel/reconciler.go | 9 +- .../logpipeline/otel/reconciler_test.go | 6 +- .../reconciler/metricpipeline/reconciler.go | 4 + .../mocks/gateway_config_builder.go | 22 +-- .../reconciler/tracepipeline/reconciler.go | 9 +- .../tracepipeline/reconciler_test.go | 16 +- internal/resources/otelcollector/rbac.go | 2 +- .../otelcollector/testdata/log-gateway.yaml | 1 + .../testdata/metric-gateway-istio.yaml | 1 + .../testdata/metric-gateway.yaml | 1 + .../otelcollector/testdata/trace-gateway.yaml | 1 + internal/utils/k8s/cluster_info_getter.go | 54 +++++++ .../utils/k8s/cluster_info_getter_test.go | 56 +++++++ main.go | 11 +- test/e2e/metrics_runtime_input_test.go | 2 +- .../metrics_cloud_provider_attributes_test.go | 153 ++++++++++++++++++ test/testkit/k8s/configmap.go | 45 ++++++ 47 files changed, 812 insertions(+), 172 deletions(-) create mode 100644 internal/utils/k8s/cluster_info_getter.go create mode 100644 internal/utils/k8s/cluster_info_getter_test.go create mode 100644 test/integration/istio/metrics_cloud_provider_attributes_test.go create mode 100644 test/testkit/k8s/configmap.go diff --git a/.k3d-kyma.yaml b/.k3d-kyma.yaml index af1a4f8fb..1cd12e009 100644 --- a/.k3d-kyma.yaml +++ b/.k3d-kyma.yaml @@ -12,3 +12,15 @@ registries: name: kyma hostPort: '5001' +options: + k3s: + nodeLabels: + - label: topology.kubernetes.io/region=kyma-local + nodeFilters: + - server:* + - label: topology.kubernetes.io/zone=kyma-local + nodeFilters: + - server:* + - label: node.kubernetes.io/instance-type=local + nodeFilters: + - server:* diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index bcd7d2dcb..ef2ed5d71 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -11,6 +11,7 @@ rules: - "" resources: - secrets + - configmaps verbs: - get - list diff --git a/docs/contributor/assets/multi_pipeline_gateway_with_connectors.yaml b/docs/contributor/assets/multi_pipeline_gateway_with_connectors.yaml index 7a0994b64..45367f687 100644 --- a/docs/contributor/assets/multi_pipeline_gateway_with_connectors.yaml +++ b/docs/contributor/assets/multi_pipeline_gateway_with_connectors.yaml @@ -31,7 +31,7 @@ data: - filter/drop-if-input-source-runtime - filter/drop-if-input-source-prometheus - filter/drop-if-input-source-istio - - resource/insert-cluster-name + - resource/insert-cluster-attributes - batch exporters: - otlp/load-test-1 @@ -58,7 +58,7 @@ data: - filter/drop-if-input-source-runtime - filter/drop-if-input-source-prometheus - filter/drop-if-input-source-istio - - resource/insert-cluster-name + - resource/insert-cluster-attributes - batch exporters: - otlp/load-test-2 @@ -85,7 +85,7 @@ data: - filter/drop-if-input-source-runtime - filter/drop-if-input-source-prometheus - filter/drop-if-input-source-istio - - resource/insert-cluster-name + - resource/insert-cluster-attributes - batch exporters: - otlp/load-test-3 @@ -165,7 +165,7 @@ data: name: k8s.pod.uid - sources: - from: connection - resource/insert-cluster-name: + resource/insert-cluster-attributes: attributes: - action: insert key: k8s.cluster.name diff --git a/docs/contributor/assets/single_pipeline_gateway_with_connectors.yaml b/docs/contributor/assets/single_pipeline_gateway_with_connectors.yaml index 21811fd86..658279688 100644 --- a/docs/contributor/assets/single_pipeline_gateway_with_connectors.yaml +++ b/docs/contributor/assets/single_pipeline_gateway_with_connectors.yaml @@ -31,7 +31,7 @@ data: - filter/drop-if-input-source-runtime - filter/drop-if-input-source-prometheus - filter/drop-if-input-source-istio - - resource/insert-cluster-name + - resource/insert-cluster-attributes - batch exporters: - otlp/load-test-1 @@ -97,7 +97,7 @@ data: name: k8s.pod.uid - sources: - from: connection - resource/insert-cluster-name: + resource/insert-cluster-attributes: attributes: - action: insert key: k8s.cluster.name diff --git a/internal/otelcollector/config/gatewayprocs/k8s_attribute_proc.go b/internal/otelcollector/config/gatewayprocs/k8s_attribute_proc.go index 79ab0591f..a74e4fc7a 100644 --- a/internal/otelcollector/config/gatewayprocs/k8s_attribute_proc.go +++ b/internal/otelcollector/config/gatewayprocs/k8s_attribute_proc.go @@ -51,5 +51,25 @@ func extractLabels() []config.ExtractLabel { Key: "app", TagName: "kyma.app_name", }, + { + From: "node", + Key: "topology.kubernetes.io/region", + TagName: "cloud.region", + }, + { + From: "node", + Key: "topology.kubernetes.io/zone", + TagName: "cloud.availability_zone", + }, + { + From: "node", + Key: "node.kubernetes.io/instance-type", + TagName: "host.type", + }, + { + From: "node", + Key: "kubernetes.io/arch", + TagName: "host.arch", + }, } } diff --git a/internal/otelcollector/config/gatewayprocs/k8s_attribute_proc_test.go b/internal/otelcollector/config/gatewayprocs/k8s_attribute_proc_test.go index a4a5e2e91..72fc9f6cf 100644 --- a/internal/otelcollector/config/gatewayprocs/k8s_attribute_proc_test.go +++ b/internal/otelcollector/config/gatewayprocs/k8s_attribute_proc_test.go @@ -43,6 +43,26 @@ func TestK8sAttributesProcessorConfig(t *testing.T) { Key: "app", TagName: "kyma.app_name", }, + { + From: "node", + Key: "topology.kubernetes.io/region", + TagName: "cloud.region", + }, + { + From: "node", + Key: "topology.kubernetes.io/zone", + TagName: "cloud.availability_zone", + }, + { + From: "node", + Key: "node.kubernetes.io/instance-type", + TagName: "host.type", + }, + { + From: "node", + Key: "kubernetes.io/arch", + TagName: "host.arch", + }, } config := K8sAttributesProcessorConfig() diff --git a/internal/otelcollector/config/gatewayprocs/resource_procs.go b/internal/otelcollector/config/gatewayprocs/resource_procs.go index 15424ab52..31af7e402 100644 --- a/internal/otelcollector/config/gatewayprocs/resource_procs.go +++ b/internal/otelcollector/config/gatewayprocs/resource_procs.go @@ -4,13 +4,30 @@ import ( "github.com/kyma-project/telemetry-manager/internal/otelcollector/config" ) -func InsertClusterNameProcessorConfig() *config.ResourceProcessor { +func InsertClusterAttributesProcessorConfig(clusterName, cloudProvider string) *config.ResourceProcessor { + if cloudProvider != "" { + return &config.ResourceProcessor{ + Attributes: []config.AttributeAction{ + { + Action: "insert", + Key: "k8s.cluster.name", + Value: clusterName, + }, + { + Action: "insert", + Key: "cloud.provider", + Value: cloudProvider, + }, + }, + } + } + return &config.ResourceProcessor{ Attributes: []config.AttributeAction{ { Action: "insert", Key: "k8s.cluster.name", - Value: "${KUBERNETES_SERVICE_HOST}", + Value: clusterName, }, }, } diff --git a/internal/otelcollector/config/gatewayprocs/resource_procs_test.go b/internal/otelcollector/config/gatewayprocs/resource_procs_test.go index 61db91218..b6892c5ed 100644 --- a/internal/otelcollector/config/gatewayprocs/resource_procs_test.go +++ b/internal/otelcollector/config/gatewayprocs/resource_procs_test.go @@ -15,11 +15,16 @@ func TestInsertClusterNameProcessorConfig(t *testing.T) { { Action: "insert", Key: "k8s.cluster.name", - Value: "${KUBERNETES_SERVICE_HOST}", + Value: "test-cluster", + }, + { + Action: "insert", + Key: "cloud.provider", + Value: "test-cloud-provider", }, } - config := InsertClusterNameProcessorConfig() + config := InsertClusterAttributesProcessorConfig("test-cluster", "test-cloud-provider") require.ElementsMatch(expectedAttributeActions, config.Attributes, "Attributes should match") } diff --git a/internal/otelcollector/config/log/gateway/config.go b/internal/otelcollector/config/log/gateway/config.go index 654097e99..91159be7b 100644 --- a/internal/otelcollector/config/log/gateway/config.go +++ b/internal/otelcollector/config/log/gateway/config.go @@ -19,9 +19,9 @@ type Receivers struct { type Processors struct { config.BaseProcessors `yaml:",inline"` - K8sAttributes *config.K8sAttributesProcessor `yaml:"k8sattributes,omitempty"` - InsertClusterName *config.ResourceProcessor `yaml:"resource/insert-cluster-name,omitempty"` - DropKymaAttributes *config.ResourceProcessor `yaml:"resource/drop-kyma-attributes,omitempty"` + K8sAttributes *config.K8sAttributesProcessor `yaml:"k8sattributes,omitempty"` + InsertClusterAttributes *config.ResourceProcessor `yaml:"resource/insert-cluster-attributes,omitempty"` + DropKymaAttributes *config.ResourceProcessor `yaml:"resource/drop-kyma-attributes,omitempty"` } type Exporters map[string]Exporter diff --git a/internal/otelcollector/config/log/gateway/config_builder.go b/internal/otelcollector/config/log/gateway/config_builder.go index df3008dcf..7f8b1463b 100644 --- a/internal/otelcollector/config/log/gateway/config_builder.go +++ b/internal/otelcollector/config/log/gateway/config_builder.go @@ -22,14 +22,19 @@ type Builder struct { Reader client.Reader } -func (b *Builder) Build(ctx context.Context, pipelines []telemetryv1alpha1.LogPipeline) (*Config, otlpexporter.EnvVars, error) { +type BuildOptions struct { + ClusterName string + CloudProvider string +} + +func (b *Builder) Build(ctx context.Context, pipelines []telemetryv1alpha1.LogPipeline, opts BuildOptions) (*Config, otlpexporter.EnvVars, error) { cfg := &Config{ Base: config.Base{ Service: config.DefaultService(make(config.Pipelines)), Extensions: config.DefaultExtensions(), }, Receivers: makeReceiversConfig(), - Processors: makeProcessorsConfig(), + Processors: makeProcessorsConfig(opts), Exporters: make(Exporters), } @@ -99,7 +104,7 @@ func makePipelineConfig(exporterIDs ...string) config.Pipeline { Processors: []string{ "memory_limiter", "k8sattributes", - "resource/insert-cluster-name", + "resource/insert-cluster-attributes", "batch", }, Exporters: exporterIDs, diff --git a/internal/otelcollector/config/log/gateway/config_builder_test.go b/internal/otelcollector/config/log/gateway/config_builder_test.go index e64a44a8b..702ba9784 100644 --- a/internal/otelcollector/config/log/gateway/config_builder_test.go +++ b/internal/otelcollector/config/log/gateway/config_builder_test.go @@ -26,7 +26,12 @@ func TestBuildConfig(t *testing.T) { t.Run("otlp exporter endpoint", func(t *testing.T) { collectorConfig, envVars, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{ testutils.NewLogPipelineBuilder().WithName("test").WithOTLPOutput(testutils.OTLPEndpoint("http://localhost")).Build(), - }) + }, + BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }, + ) require.NoError(t, err) const endpointEnvVar = "OTLP_ENDPOINT_TEST" @@ -42,7 +47,10 @@ func TestBuildConfig(t *testing.T) { }) t.Run("secure", func(t *testing.T) { - collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{testutils.NewLogPipelineBuilder().WithName("test").WithOTLPOutput().Build()}) + collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{testutils.NewLogPipelineBuilder().WithName("test").WithOTLPOutput().Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) require.Contains(t, collectorConfig.Exporters, "otlp/test") @@ -52,7 +60,10 @@ func TestBuildConfig(t *testing.T) { t.Run("insecure", func(t *testing.T) { collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{ - testutils.NewLogPipelineBuilder().WithName("test-insecure").WithOTLPOutput(testutils.OTLPEndpoint("http://localhost")).Build()}) + testutils.NewLogPipelineBuilder().WithName("test-insecure").WithOTLPOutput(testutils.OTLPEndpoint("http://localhost")).Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) require.Contains(t, collectorConfig.Exporters, "otlp/test-insecure") @@ -63,6 +74,9 @@ func TestBuildConfig(t *testing.T) { t.Run("basic auth", func(t *testing.T) { collectorConfig, envVars, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{ testutils.NewLogPipelineBuilder().WithName("test-basic-auth").WithOTLPOutput(testutils.OTLPBasicAuth("user", "password")).Build(), + }, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", }) require.NoError(t, err) require.Contains(t, collectorConfig.Exporters, "otlp/test-basic-auth") @@ -82,6 +96,9 @@ func TestBuildConfig(t *testing.T) { t.Run("custom header", func(t *testing.T) { collectorConfig, envVars, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{ testutils.NewLogPipelineBuilder().WithName("test-custom-header").WithOTLPOutput(testutils.OTLPCustomHeader("Authorization", "TOKEN_VALUE", "Api-Token")).Build(), + }, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", }) require.NoError(t, err) require.Contains(t, collectorConfig.Exporters, "otlp/test-custom-header") @@ -99,6 +116,9 @@ func TestBuildConfig(t *testing.T) { t.Run("mtls", func(t *testing.T) { collectorConfig, envVars, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{ testutils.NewLogPipelineBuilder().WithName("test-mtls").WithOTLPOutput(testutils.OTLPClientTLSFromString("ca", "cert", "key")).Build(), + }, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", }) require.NoError(t, err) require.Contains(t, collectorConfig.Exporters, "otlp/test-mtls") @@ -115,7 +135,10 @@ func TestBuildConfig(t *testing.T) { }) t.Run("extensions", func(t *testing.T) { - collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{testutils.NewLogPipelineBuilder().WithOTLPOutput().Build()}) + collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{testutils.NewLogPipelineBuilder().WithOTLPOutput().Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) require.NotEmpty(t, collectorConfig.Extensions.HealthCheck.Endpoint) @@ -125,7 +148,10 @@ func TestBuildConfig(t *testing.T) { }) t.Run("telemetry", func(t *testing.T) { - collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{testutils.NewLogPipelineBuilder().WithOTLPOutput().Build()}) + collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{testutils.NewLogPipelineBuilder().WithOTLPOutput().Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) metricreaders := []config.MetricReader{ @@ -147,7 +173,10 @@ func TestBuildConfig(t *testing.T) { }) t.Run("single pipeline queue size", func(t *testing.T) { - collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{testutils.NewLogPipelineBuilder().WithName("test").WithOTLPOutput().Build()}) + collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{testutils.NewLogPipelineBuilder().WithName("test").WithOTLPOutput().Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) require.Equal(t, maxQueueSize, collectorConfig.Exporters["otlp/test"].OTLP.SendingQueue.QueueSize, "Pipeline should have the full queue size") }) @@ -156,7 +185,10 @@ func TestBuildConfig(t *testing.T) { collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{ testutils.NewLogPipelineBuilder().WithName("test-1").WithOTLPOutput().Build(), testutils.NewLogPipelineBuilder().WithName("test-2").WithOTLPOutput().Build(), - testutils.NewLogPipelineBuilder().WithName("test-3").WithOTLPOutput().Build()}) + testutils.NewLogPipelineBuilder().WithName("test-3").WithOTLPOutput().Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) @@ -167,7 +199,10 @@ func TestBuildConfig(t *testing.T) { }) t.Run("single pipeline topology", func(t *testing.T) { - collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{testutils.NewLogPipelineBuilder().WithName("test").WithOTLPOutput().Build()}) + collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{testutils.NewLogPipelineBuilder().WithName("test").WithOTLPOutput().Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) require.Contains(t, collectorConfig.Service.Pipelines, "logs/test") @@ -175,7 +210,7 @@ func TestBuildConfig(t *testing.T) { require.Equal(t, collectorConfig.Service.Pipelines["logs/test"].Processors[0], "memory_limiter") require.Equal(t, collectorConfig.Service.Pipelines["logs/test"].Processors[1], "k8sattributes") - require.Equal(t, collectorConfig.Service.Pipelines["logs/test"].Processors[2], "resource/insert-cluster-name") + require.Equal(t, collectorConfig.Service.Pipelines["logs/test"].Processors[2], "resource/insert-cluster-attributes") require.Equal(t, collectorConfig.Service.Pipelines["logs/test"].Processors[3], "batch") require.Contains(t, collectorConfig.Service.Pipelines["logs/test"].Exporters, "otlp/test") @@ -184,7 +219,10 @@ func TestBuildConfig(t *testing.T) { t.Run("multi pipeline topology", func(t *testing.T) { collectorConfig, envVars, err := sut.Build(context.Background(), []telemetryv1alpha1.LogPipeline{ testutils.NewLogPipelineBuilder().WithName("test-1").WithOTLPOutput().Build(), - testutils.NewLogPipelineBuilder().WithName("test-2").WithOTLPOutput().Build()}) + testutils.NewLogPipelineBuilder().WithName("test-2").WithOTLPOutput().Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) require.Contains(t, collectorConfig.Exporters, "otlp/test-1") @@ -195,7 +233,7 @@ func TestBuildConfig(t *testing.T) { require.Contains(t, collectorConfig.Service.Pipelines["logs/test-1"].Receivers, "otlp") require.Equal(t, collectorConfig.Service.Pipelines["logs/test-1"].Processors[0], "memory_limiter") require.Equal(t, collectorConfig.Service.Pipelines["logs/test-1"].Processors[1], "k8sattributes") - require.Equal(t, collectorConfig.Service.Pipelines["logs/test-1"].Processors[2], "resource/insert-cluster-name") + require.Equal(t, collectorConfig.Service.Pipelines["logs/test-1"].Processors[2], "resource/insert-cluster-attributes") require.Equal(t, collectorConfig.Service.Pipelines["logs/test-1"].Processors[3], "batch") require.Contains(t, collectorConfig.Service.Pipelines, "logs/test-2") @@ -203,7 +241,7 @@ func TestBuildConfig(t *testing.T) { require.Contains(t, collectorConfig.Service.Pipelines["logs/test-2"].Receivers, "otlp") require.Equal(t, collectorConfig.Service.Pipelines["logs/test-2"].Processors[0], "memory_limiter") require.Equal(t, collectorConfig.Service.Pipelines["logs/test-2"].Processors[1], "k8sattributes") - require.Equal(t, collectorConfig.Service.Pipelines["logs/test-2"].Processors[2], "resource/insert-cluster-name") + require.Equal(t, collectorConfig.Service.Pipelines["logs/test-2"].Processors[2], "resource/insert-cluster-attributes") require.Equal(t, collectorConfig.Service.Pipelines["logs/test-2"].Processors[3], "batch") require.Contains(t, envVars, "OTLP_ENDPOINT_TEST_1") @@ -213,6 +251,9 @@ func TestBuildConfig(t *testing.T) { t.Run("marshaling", func(t *testing.T) { config, _, err := sut.Build(context.Background(), []telemetryv1alpha1.LogPipeline{ testutils.NewLogPipelineBuilder().WithName("test").WithOTLPOutput().Build(), + }, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", }) require.NoError(t, err) @@ -230,6 +271,9 @@ func TestBuildConfig(t *testing.T) { t.Run("failed to make otlp exporter config", func(t *testing.T) { _, _, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{ testutils.NewLogPipelineBuilder().WithName("test-fail").WithOTLPOutput(testutils.OTLPBasicAuthFromSecret("nonexistent-secret", "default", "user", "password")).Build(), + }, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", }) require.Error(t, err) require.ErrorContains(t, err, "failed to make otlp exporter config") diff --git a/internal/otelcollector/config/log/gateway/processors.go b/internal/otelcollector/config/log/gateway/processors.go index 661926441..aaddddef3 100644 --- a/internal/otelcollector/config/log/gateway/processors.go +++ b/internal/otelcollector/config/log/gateway/processors.go @@ -5,14 +5,14 @@ import ( "github.com/kyma-project/telemetry-manager/internal/otelcollector/config/gatewayprocs" ) -func makeProcessorsConfig() Processors { +func makeProcessorsConfig(opts BuildOptions) Processors { return Processors{ BaseProcessors: config.BaseProcessors{ Batch: makeBatchProcessorConfig(), MemoryLimiter: makeMemoryLimiterConfig(), }, - K8sAttributes: gatewayprocs.K8sAttributesProcessorConfig(), - InsertClusterName: gatewayprocs.InsertClusterNameProcessorConfig(), + K8sAttributes: gatewayprocs.K8sAttributesProcessorConfig(), + InsertClusterAttributes: gatewayprocs.InsertClusterAttributesProcessorConfig(opts.ClusterName, opts.CloudProvider), } } diff --git a/internal/otelcollector/config/log/gateway/processors_test.go b/internal/otelcollector/config/log/gateway/processors_test.go index fbcf0ab05..7517fb930 100644 --- a/internal/otelcollector/config/log/gateway/processors_test.go +++ b/internal/otelcollector/config/log/gateway/processors_test.go @@ -16,18 +16,27 @@ func TestProcessors(t *testing.T) { fakeClient := fake.NewClientBuilder().Build() sut := Builder{Reader: fakeClient} - t.Run("insert cluster name processor", func(t *testing.T) { - collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{testutils.NewLogPipelineBuilder().WithOTLPOutput().Build()}) + t.Run("insert cluster attributes processor", func(t *testing.T) { + collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{testutils.NewLogPipelineBuilder().WithOTLPOutput().Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) - require.Equal(t, 1, len(collectorConfig.Processors.InsertClusterName.Attributes)) - require.Equal(t, "insert", collectorConfig.Processors.InsertClusterName.Attributes[0].Action) - require.Equal(t, "k8s.cluster.name", collectorConfig.Processors.InsertClusterName.Attributes[0].Key) - require.Equal(t, "${KUBERNETES_SERVICE_HOST}", collectorConfig.Processors.InsertClusterName.Attributes[0].Value) + require.Equal(t, 2, len(collectorConfig.Processors.InsertClusterAttributes.Attributes)) + require.Equal(t, "insert", collectorConfig.Processors.InsertClusterAttributes.Attributes[0].Action) + require.Equal(t, "k8s.cluster.name", collectorConfig.Processors.InsertClusterAttributes.Attributes[0].Key) + require.Equal(t, "test-cluster", collectorConfig.Processors.InsertClusterAttributes.Attributes[0].Value) + require.Equal(t, "insert", collectorConfig.Processors.InsertClusterAttributes.Attributes[1].Action) + require.Equal(t, "cloud.provider", collectorConfig.Processors.InsertClusterAttributes.Attributes[1].Key) + require.Equal(t, "test-cloud-provider", collectorConfig.Processors.InsertClusterAttributes.Attributes[1].Value) }) t.Run("memory limit processors", func(t *testing.T) { - collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{testutils.NewLogPipelineBuilder().WithOTLPOutput().Build()}) + collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{testutils.NewLogPipelineBuilder().WithOTLPOutput().Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) require.Equal(t, "1s", collectorConfig.Processors.MemoryLimiter.CheckInterval) @@ -36,7 +45,10 @@ func TestProcessors(t *testing.T) { }) t.Run("batch processors", func(t *testing.T) { - collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{testutils.NewLogPipelineBuilder().WithOTLPOutput().Build()}) + collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{testutils.NewLogPipelineBuilder().WithOTLPOutput().Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) require.Equal(t, 512, collectorConfig.Processors.Batch.SendBatchSize) @@ -45,7 +57,10 @@ func TestProcessors(t *testing.T) { }) t.Run("k8s attributes processors", func(t *testing.T) { - collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{testutils.NewLogPipelineBuilder().WithOTLPOutput().Build()}) + collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.LogPipeline{testutils.NewLogPipelineBuilder().WithOTLPOutput().Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) require.Equal(t, "serviceAccount", collectorConfig.Processors.K8sAttributes.AuthType) diff --git a/internal/otelcollector/config/log/gateway/testdata/config.yaml b/internal/otelcollector/config/log/gateway/testdata/config.yaml index db675e4f3..a3577c1f4 100644 --- a/internal/otelcollector/config/log/gateway/testdata/config.yaml +++ b/internal/otelcollector/config/log/gateway/testdata/config.yaml @@ -11,7 +11,7 @@ service: processors: - memory_limiter - k8sattributes - - resource/insert-cluster-name + - resource/insert-cluster-attributes - batch exporters: - otlp/test @@ -65,6 +65,18 @@ processors: - from: pod key: app tag_name: kyma.app_name + - from: node + key: topology.kubernetes.io/region + tag_name: cloud.region + - from: node + key: topology.kubernetes.io/zone + tag_name: cloud.availability_zone + - from: node + key: node.kubernetes.io/instance-type + tag_name: host.type + - from: node + key: kubernetes.io/arch + tag_name: host.arch pod_association: - sources: - from: resource_attribute @@ -74,11 +86,14 @@ processors: name: k8s.pod.uid - sources: - from: connection - resource/insert-cluster-name: + resource/insert-cluster-attributes: attributes: - action: insert key: k8s.cluster.name - value: ${KUBERNETES_SERVICE_HOST} + value: test-cluster + - action: insert + key: cloud.provider + value: test-cloud-provider exporters: otlp/test: endpoint: ${OTLP_ENDPOINT_TEST} diff --git a/internal/otelcollector/config/metric/gateway/config.go b/internal/otelcollector/config/metric/gateway/config.go index 40040b910..66013cf4c 100644 --- a/internal/otelcollector/config/metric/gateway/config.go +++ b/internal/otelcollector/config/metric/gateway/config.go @@ -49,7 +49,7 @@ type Processors struct { config.BaseProcessors `yaml:",inline"` K8sAttributes *config.K8sAttributesProcessor `yaml:"k8sattributes,omitempty"` - InsertClusterName *config.ResourceProcessor `yaml:"resource/insert-cluster-name,omitempty"` + InsertClusterAttributes *config.ResourceProcessor `yaml:"resource/insert-cluster-attributes,omitempty"` DropDiagnosticMetricsIfInputSourcePrometheus *FilterProcessor `yaml:"filter/drop-diagnostic-metrics-if-input-source-prometheus,omitempty"` DropDiagnosticMetricsIfInputSourceIstio *FilterProcessor `yaml:"filter/drop-diagnostic-metrics-if-input-source-istio,omitempty"` DropIfInputSourceRuntime *FilterProcessor `yaml:"filter/drop-if-input-source-runtime,omitempty"` diff --git a/internal/otelcollector/config/metric/gateway/config_builder.go b/internal/otelcollector/config/metric/gateway/config_builder.go index ac6841f69..954ff83a8 100644 --- a/internal/otelcollector/config/metric/gateway/config_builder.go +++ b/internal/otelcollector/config/metric/gateway/config_builder.go @@ -25,6 +25,8 @@ type Builder struct { type BuildOptions struct { GatewayNamespace string InstrumentationScopeVersion string + ClusterName string + CloudProvider string } func (b *Builder) Build(ctx context.Context, pipelines []telemetryv1alpha1.MetricPipeline, opts BuildOptions) (*Config, otlpexporter.EnvVars, error) { @@ -34,7 +36,7 @@ func (b *Builder) Build(ctx context.Context, pipelines []telemetryv1alpha1.Metri Extensions: config.DefaultExtensions(), }, Receivers: makeReceiversConfig(), - Processors: makeProcessorsConfig(), + Processors: makeProcessorsConfig(opts), Exporters: make(Exporters), Connectors: make(Connectors), } diff --git a/internal/otelcollector/config/metric/gateway/config_builder_test.go b/internal/otelcollector/config/metric/gateway/config_builder_test.go index 00abf67e3..067cb8570 100644 --- a/internal/otelcollector/config/metric/gateway/config_builder_test.go +++ b/internal/otelcollector/config/metric/gateway/config_builder_test.go @@ -29,7 +29,10 @@ func TestMakeConfig(t *testing.T) { []telemetryv1alpha1.MetricPipeline{ testutils.NewMetricPipelineBuilder().WithName("test").WithOTLPOutput(testutils.OTLPEndpoint("http://localhost")).Build(), }, - BuildOptions{}, + BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }, ) require.NoError(t, err) @@ -50,7 +53,10 @@ func TestMakeConfig(t *testing.T) { []telemetryv1alpha1.MetricPipeline{ testutils.NewMetricPipelineBuilder().WithName("test").WithOTLPOutput(testutils.OTLPEndpoint("https://localhost")).Build(), }, - BuildOptions{}, + BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }, ) require.NoError(t, err) require.Contains(t, collectorConfig.Exporters, "otlp/test") @@ -65,7 +71,10 @@ func TestMakeConfig(t *testing.T) { []telemetryv1alpha1.MetricPipeline{ testutils.NewMetricPipelineBuilder().WithName("test-insecure").WithOTLPOutput(testutils.OTLPEndpoint("http://localhost")).Build(), }, - BuildOptions{}, + BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }, ) require.NoError(t, err) require.Contains(t, collectorConfig.Exporters, "otlp/test-insecure") @@ -80,7 +89,10 @@ func TestMakeConfig(t *testing.T) { []telemetryv1alpha1.MetricPipeline{ testutils.NewMetricPipelineBuilder().WithName("test-basic-auth").WithOTLPOutput(testutils.OTLPBasicAuth("user", "password")).Build(), }, - BuildOptions{}, + BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }, ) require.NoError(t, err) require.Contains(t, collectorConfig.Exporters, "otlp/test-basic-auth") @@ -103,7 +115,10 @@ func TestMakeConfig(t *testing.T) { []telemetryv1alpha1.MetricPipeline{ testutils.NewMetricPipelineBuilder().WithName("test-custom-header").WithOTLPOutput(testutils.OTLPCustomHeader("Authorization", "TOKEN_VALUE", "Api-Token")).Build(), }, - BuildOptions{}, + BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }, ) require.NoError(t, err) require.Contains(t, collectorConfig.Exporters, "otlp/test-custom-header") @@ -124,7 +139,10 @@ func TestMakeConfig(t *testing.T) { []telemetryv1alpha1.MetricPipeline{ testutils.NewMetricPipelineBuilder().WithName("test-mtls").WithOTLPOutput(testutils.OTLPClientTLSFromString("ca", "cert", "key")).Build(), }, - BuildOptions{}, + BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }, ) require.NoError(t, err) require.Contains(t, collectorConfig.Exporters, "otlp/test-mtls") @@ -146,7 +164,10 @@ func TestMakeConfig(t *testing.T) { []telemetryv1alpha1.MetricPipeline{ testutils.NewMetricPipelineBuilder().Build(), }, - BuildOptions{}, + BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }, ) require.NoError(t, err) @@ -162,7 +183,10 @@ func TestMakeConfig(t *testing.T) { []telemetryv1alpha1.MetricPipeline{ testutils.NewMetricPipelineBuilder().Build(), }, - BuildOptions{}, + BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }, ) require.NoError(t, err) @@ -190,7 +214,10 @@ func TestMakeConfig(t *testing.T) { []telemetryv1alpha1.MetricPipeline{ testutils.NewMetricPipelineBuilder().WithName("test").Build(), }, - BuildOptions{}, + BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }, ) require.NoError(t, err) require.Equal(t, maxQueueSize, collectorConfig.Exporters["otlp/test"].OTLP.SendingQueue.QueueSize, "Pipeline should have the full queue size") @@ -204,7 +231,10 @@ func TestMakeConfig(t *testing.T) { testutils.NewMetricPipelineBuilder().WithName("test-2").Build(), testutils.NewMetricPipelineBuilder().WithName("test-3").Build(), }, - BuildOptions{}, + BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }, ) require.NoError(t, err) @@ -223,7 +253,10 @@ func TestMakeConfig(t *testing.T) { testutils.NewMetricPipelineBuilder().WithName("test-2").Build(), testutils.NewMetricPipelineBuilder().WithName("test-3").Build(), }, - BuildOptions{}, + BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }, ) require.NoError(t, err) @@ -259,7 +292,10 @@ func TestMakeConfig(t *testing.T) { WithOTLPInput(tt.withOTLPInput). WithOTLPOutput(testutils.OTLPEndpoint("https://localhost")).Build(), }, - BuildOptions{}, + BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }, ) require.NoError(t, err) diff --git a/internal/otelcollector/config/metric/gateway/processors.go b/internal/otelcollector/config/metric/gateway/processors.go index 39d5178cd..42b73ce49 100644 --- a/internal/otelcollector/config/metric/gateway/processors.go +++ b/internal/otelcollector/config/metric/gateway/processors.go @@ -10,14 +10,14 @@ import ( "github.com/kyma-project/telemetry-manager/internal/otelcollector/config/ottlexpr" ) -func makeProcessorsConfig() Processors { +func makeProcessorsConfig(opts BuildOptions) Processors { return Processors{ BaseProcessors: config.BaseProcessors{ Batch: makeBatchProcessorConfig(), MemoryLimiter: makeMemoryLimiterConfig(), }, K8sAttributes: gatewayprocs.K8sAttributesProcessorConfig(), - InsertClusterName: gatewayprocs.InsertClusterNameProcessorConfig(), + InsertClusterAttributes: gatewayprocs.InsertClusterAttributesProcessorConfig(opts.ClusterName, opts.CloudProvider), ResolveServiceName: makeResolveServiceNameConfig(), DropKymaAttributes: gatewayprocs.DropKymaAttributesProcessorConfig(), DeleteSkipEnrichmentAttribute: makeDeleteSkipEnrichmentAttributeConfig(), diff --git a/internal/otelcollector/config/metric/gateway/processors_test.go b/internal/otelcollector/config/metric/gateway/processors_test.go index 5635e4bc8..8fdf8fa7b 100644 --- a/internal/otelcollector/config/metric/gateway/processors_test.go +++ b/internal/otelcollector/config/metric/gateway/processors_test.go @@ -16,20 +16,26 @@ func TestProcessors(t *testing.T) { fakeClient := fake.NewClientBuilder().Build() sut := Builder{Reader: fakeClient} - t.Run("insert cluster name processor", func(t *testing.T) { + t.Run("insert cluster attributes processor", func(t *testing.T) { collectorConfig, _, err := sut.Build( ctx, []telemetryv1alpha1.MetricPipeline{ testutils.NewMetricPipelineBuilder().Build(), }, - BuildOptions{}, + BuildOptions{ + ClusterName: "CLUSTER_NAME", + CloudProvider: "CLOUD_PROVIDER", + }, ) require.NoError(t, err) - require.Equal(t, 1, len(collectorConfig.Processors.InsertClusterName.Attributes)) - require.Equal(t, "insert", collectorConfig.Processors.InsertClusterName.Attributes[0].Action) - require.Equal(t, "k8s.cluster.name", collectorConfig.Processors.InsertClusterName.Attributes[0].Key) - require.Equal(t, "${KUBERNETES_SERVICE_HOST}", collectorConfig.Processors.InsertClusterName.Attributes[0].Value) + require.Equal(t, 2, len(collectorConfig.Processors.InsertClusterAttributes.Attributes)) + require.Equal(t, "insert", collectorConfig.Processors.InsertClusterAttributes.Attributes[0].Action) + require.Equal(t, "k8s.cluster.name", collectorConfig.Processors.InsertClusterAttributes.Attributes[0].Key) + require.Equal(t, "CLUSTER_NAME", collectorConfig.Processors.InsertClusterAttributes.Attributes[0].Value) + require.Equal(t, "insert", collectorConfig.Processors.InsertClusterAttributes.Attributes[1].Action) + require.Equal(t, "cloud.provider", collectorConfig.Processors.InsertClusterAttributes.Attributes[1].Key) + require.Equal(t, "CLOUD_PROVIDER", collectorConfig.Processors.InsertClusterAttributes.Attributes[1].Value) }) t.Run("memory limit processors", func(t *testing.T) { diff --git a/internal/otelcollector/config/metric/gateway/service.go b/internal/otelcollector/config/metric/gateway/service.go index a6a00fd84..d0322b496 100644 --- a/internal/otelcollector/config/metric/gateway/service.go +++ b/internal/otelcollector/config/metric/gateway/service.go @@ -36,7 +36,7 @@ func makeOutputPipelineServiceConfig(pipeline *telemetryv1alpha1.MetricPipeline) processors = append(processors, "transform/set-instrumentation-scope-kyma") - processors = append(processors, "resource/insert-cluster-name", "resource/delete-skip-enrichment-attribute", "batch") + processors = append(processors, "resource/insert-cluster-attributes", "resource/delete-skip-enrichment-attribute", "batch") return config.Pipeline{ Receivers: []string{formatRoutingConnectorID(pipeline.Name), formatForwardConnectorID(pipeline.Name)}, diff --git a/internal/otelcollector/config/metric/gateway/service_test.go b/internal/otelcollector/config/metric/gateway/service_test.go index 65f11250c..3f35163c1 100644 --- a/internal/otelcollector/config/metric/gateway/service_test.go +++ b/internal/otelcollector/config/metric/gateway/service_test.go @@ -47,7 +47,7 @@ func TestService(t *testing.T) { "filter/drop-if-input-source-istio", "filter/drop-if-input-source-otlp", "transform/set-instrumentation-scope-kyma", - "resource/insert-cluster-name", + "resource/insert-cluster-attributes", "resource/delete-skip-enrichment-attribute", "batch", }, collectorConfig.Service.Pipelines["metrics/test-output"].Processors) @@ -81,7 +81,7 @@ func TestService(t *testing.T) { "filter/drop-if-input-source-runtime", "filter/drop-if-input-source-istio", "transform/set-instrumentation-scope-kyma", - "resource/insert-cluster-name", + "resource/insert-cluster-attributes", "resource/delete-skip-enrichment-attribute", "batch", }, collectorConfig.Service.Pipelines["metrics/test-output"].Processors) @@ -116,7 +116,7 @@ func TestService(t *testing.T) { "filter/drop-if-input-source-istio", "filter/drop-diagnostic-metrics-if-input-source-prometheus", "transform/set-instrumentation-scope-kyma", - "resource/insert-cluster-name", + "resource/insert-cluster-attributes", "resource/delete-skip-enrichment-attribute", "batch", }, collectorConfig.Service.Pipelines["metrics/test-output"].Processors) @@ -151,7 +151,7 @@ func TestService(t *testing.T) { "filter/drop-if-input-source-istio", "filter/drop-diagnostic-metrics-if-input-source-prometheus", "transform/set-instrumentation-scope-kyma", - "resource/insert-cluster-name", + "resource/insert-cluster-attributes", "resource/delete-skip-enrichment-attribute", "batch", }, collectorConfig.Service.Pipelines["metrics/test-output"].Processors) @@ -185,7 +185,7 @@ func TestService(t *testing.T) { "filter/drop-if-input-source-runtime", "filter/drop-if-input-source-prometheus", "transform/set-instrumentation-scope-kyma", - "resource/insert-cluster-name", + "resource/insert-cluster-attributes", "resource/delete-skip-enrichment-attribute", "batch", }, collectorConfig.Service.Pipelines["metrics/test-output"].Processors) @@ -220,7 +220,7 @@ func TestService(t *testing.T) { "filter/drop-if-input-source-prometheus", "filter/drop-diagnostic-metrics-if-input-source-istio", "transform/set-instrumentation-scope-kyma", - "resource/insert-cluster-name", + "resource/insert-cluster-attributes", "resource/delete-skip-enrichment-attribute", "batch", }, collectorConfig.Service.Pipelines["metrics/test-output"].Processors) @@ -255,7 +255,7 @@ func TestService(t *testing.T) { "filter/drop-if-input-source-prometheus", "filter/drop-diagnostic-metrics-if-input-source-istio", "transform/set-instrumentation-scope-kyma", - "resource/insert-cluster-name", + "resource/insert-cluster-attributes", "resource/delete-skip-enrichment-attribute", "batch", }, collectorConfig.Service.Pipelines["metrics/test-output"].Processors) @@ -290,7 +290,7 @@ func TestService(t *testing.T) { "filter/drop-if-input-source-prometheus", "filter/drop-if-input-source-istio", "transform/set-instrumentation-scope-kyma", - "resource/insert-cluster-name", + "resource/insert-cluster-attributes", "resource/delete-skip-enrichment-attribute", "batch", }, collectorConfig.Service.Pipelines["metrics/test-output"].Processors) @@ -325,7 +325,7 @@ func TestService(t *testing.T) { "filter/drop-if-input-source-prometheus", "filter/drop-if-input-source-istio", "transform/set-instrumentation-scope-kyma", - "resource/insert-cluster-name", + "resource/insert-cluster-attributes", "resource/delete-skip-enrichment-attribute", "batch", }, collectorConfig.Service.Pipelines["metrics/test-output"].Processors) @@ -373,7 +373,7 @@ func TestService(t *testing.T) { "filter/drop-if-input-source-istio", "filter/test-1-filter-by-namespace-runtime-input", "transform/set-instrumentation-scope-kyma", - "resource/insert-cluster-name", + "resource/insert-cluster-attributes", "resource/delete-skip-enrichment-attribute", "batch", }, collectorConfig.Service.Pipelines["metrics/test-1-output"].Processors) @@ -399,7 +399,7 @@ func TestService(t *testing.T) { "filter/test-2-filter-by-namespace-prometheus-input", "filter/drop-diagnostic-metrics-if-input-source-prometheus", "transform/set-instrumentation-scope-kyma", - "resource/insert-cluster-name", + "resource/insert-cluster-attributes", "resource/delete-skip-enrichment-attribute", "batch", }, collectorConfig.Service.Pipelines["metrics/test-2-output"].Processors) @@ -424,7 +424,7 @@ func TestService(t *testing.T) { "filter/drop-if-input-source-prometheus", "filter/drop-diagnostic-metrics-if-input-source-istio", "transform/set-instrumentation-scope-kyma", - "resource/insert-cluster-name", + "resource/insert-cluster-attributes", "resource/delete-skip-enrichment-attribute", "batch", }, collectorConfig.Service.Pipelines["metrics/test-3-output"].Processors) @@ -456,7 +456,7 @@ func TestService_RuntimeResources_Enabled(t *testing.T) { "filter/drop-if-input-source-prometheus", "filter/drop-if-input-source-istio", "transform/set-instrumentation-scope-kyma", - "resource/insert-cluster-name", + "resource/insert-cluster-attributes", "resource/delete-skip-enrichment-attribute", "batch", }, @@ -473,7 +473,7 @@ func TestService_RuntimeResources_Enabled(t *testing.T) { "filter/drop-if-input-source-istio", "filter/drop-runtime-pod-metrics", "transform/set-instrumentation-scope-kyma", - "resource/insert-cluster-name", + "resource/insert-cluster-attributes", "resource/delete-skip-enrichment-attribute", "batch", }, @@ -489,7 +489,7 @@ func TestService_RuntimeResources_Enabled(t *testing.T) { "filter/drop-if-input-source-istio", "filter/drop-runtime-container-metrics", "transform/set-instrumentation-scope-kyma", - "resource/insert-cluster-name", + "resource/insert-cluster-attributes", "resource/delete-skip-enrichment-attribute", "batch", }, @@ -505,7 +505,7 @@ func TestService_RuntimeResources_Enabled(t *testing.T) { "filter/drop-if-input-source-istio", "filter/drop-runtime-node-metrics", "transform/set-instrumentation-scope-kyma", - "resource/insert-cluster-name", + "resource/insert-cluster-attributes", "resource/delete-skip-enrichment-attribute", "batch", }, @@ -521,7 +521,7 @@ func TestService_RuntimeResources_Enabled(t *testing.T) { "filter/drop-if-input-source-istio", "filter/drop-runtime-volume-metrics", "transform/set-instrumentation-scope-kyma", - "resource/insert-cluster-name", + "resource/insert-cluster-attributes", "resource/delete-skip-enrichment-attribute", "batch", }, @@ -537,7 +537,7 @@ func TestService_RuntimeResources_Enabled(t *testing.T) { "filter/drop-if-input-source-istio", "filter/drop-runtime-deployment-metrics", "transform/set-instrumentation-scope-kyma", - "resource/insert-cluster-name", + "resource/insert-cluster-attributes", "resource/delete-skip-enrichment-attribute", "batch", }, @@ -553,7 +553,7 @@ func TestService_RuntimeResources_Enabled(t *testing.T) { "filter/drop-if-input-source-istio", "filter/drop-runtime-daemonset-metrics", "transform/set-instrumentation-scope-kyma", - "resource/insert-cluster-name", + "resource/insert-cluster-attributes", "resource/delete-skip-enrichment-attribute", "batch", }, @@ -569,7 +569,7 @@ func TestService_RuntimeResources_Enabled(t *testing.T) { "filter/drop-if-input-source-istio", "filter/drop-runtime-statefulset-metrics", "transform/set-instrumentation-scope-kyma", - "resource/insert-cluster-name", + "resource/insert-cluster-attributes", "resource/delete-skip-enrichment-attribute", "batch", }, @@ -585,7 +585,7 @@ func TestService_RuntimeResources_Enabled(t *testing.T) { "filter/drop-if-input-source-istio", "filter/drop-runtime-job-metrics", "transform/set-instrumentation-scope-kyma", - "resource/insert-cluster-name", + "resource/insert-cluster-attributes", "resource/delete-skip-enrichment-attribute", "batch", }, @@ -615,7 +615,7 @@ func TestService_RuntimeResources_Enabled(t *testing.T) { "filter/drop-runtime-statefulset-metrics", "filter/drop-runtime-job-metrics", "transform/set-instrumentation-scope-kyma", - "resource/insert-cluster-name", + "resource/insert-cluster-attributes", "resource/delete-skip-enrichment-attribute", "batch", }, diff --git a/internal/otelcollector/config/metric/gateway/testdata/config.yaml b/internal/otelcollector/config/metric/gateway/testdata/config.yaml index 8deeab183..7dacfb6b5 100644 --- a/internal/otelcollector/config/metric/gateway/testdata/config.yaml +++ b/internal/otelcollector/config/metric/gateway/testdata/config.yaml @@ -31,7 +31,7 @@ service: - filter/drop-if-input-source-prometheus - filter/drop-if-input-source-istio - transform/set-instrumentation-scope-kyma - - resource/insert-cluster-name + - resource/insert-cluster-attributes - resource/delete-skip-enrichment-attribute - batch exporters: @@ -108,6 +108,18 @@ processors: - from: pod key: app tag_name: kyma.app_name + - from: node + key: topology.kubernetes.io/region + tag_name: cloud.region + - from: node + key: topology.kubernetes.io/zone + tag_name: cloud.availability_zone + - from: node + key: node.kubernetes.io/instance-type + tag_name: host.type + - from: node + key: kubernetes.io/arch + tag_name: host.arch pod_association: - sources: - from: resource_attribute @@ -117,11 +129,14 @@ processors: name: k8s.pod.uid - sources: - from: connection - resource/insert-cluster-name: + resource/insert-cluster-attributes: attributes: - action: insert key: k8s.cluster.name - value: ${KUBERNETES_SERVICE_HOST} + value: test-cluster + - action: insert + key: cloud.provider + value: test-cloud-provider filter/drop-if-input-source-runtime: metrics: metric: diff --git a/internal/otelcollector/config/metric/gateway/testdata/config_otlp_disabled.yaml b/internal/otelcollector/config/metric/gateway/testdata/config_otlp_disabled.yaml index afbbdcbdd..af1f71ffa 100644 --- a/internal/otelcollector/config/metric/gateway/testdata/config_otlp_disabled.yaml +++ b/internal/otelcollector/config/metric/gateway/testdata/config_otlp_disabled.yaml @@ -32,7 +32,7 @@ service: - filter/drop-if-input-source-istio - filter/drop-if-input-source-otlp - transform/set-instrumentation-scope-kyma - - resource/insert-cluster-name + - resource/insert-cluster-attributes - resource/delete-skip-enrichment-attribute - batch exporters: @@ -109,6 +109,18 @@ processors: - from: pod key: app tag_name: kyma.app_name + - from: node + key: topology.kubernetes.io/region + tag_name: cloud.region + - from: node + key: topology.kubernetes.io/zone + tag_name: cloud.availability_zone + - from: node + key: node.kubernetes.io/instance-type + tag_name: host.type + - from: node + key: kubernetes.io/arch + tag_name: host.arch pod_association: - sources: - from: resource_attribute @@ -118,11 +130,14 @@ processors: name: k8s.pod.uid - sources: - from: connection - resource/insert-cluster-name: + resource/insert-cluster-attributes: attributes: - action: insert key: k8s.cluster.name - value: ${KUBERNETES_SERVICE_HOST} + value: test-cluster + - action: insert + key: cloud.provider + value: test-cloud-provider filter/drop-if-input-source-runtime: metrics: metric: diff --git a/internal/otelcollector/config/trace/gateway/config.go b/internal/otelcollector/config/trace/gateway/config.go index 02ae8c3f3..28b877b32 100644 --- a/internal/otelcollector/config/trace/gateway/config.go +++ b/internal/otelcollector/config/trace/gateway/config.go @@ -19,11 +19,11 @@ type Receivers struct { type Processors struct { config.BaseProcessors `yaml:",inline"` - K8sAttributes *config.K8sAttributesProcessor `yaml:"k8sattributes,omitempty"` - InsertClusterName *config.ResourceProcessor `yaml:"resource/insert-cluster-name,omitempty"` - DropNoisySpans FilterProcessor `yaml:"filter/drop-noisy-spans"` - ResolveServiceName *TransformProcessor `yaml:"transform/resolve-service-name,omitempty"` - DropKymaAttributes *config.ResourceProcessor `yaml:"resource/drop-kyma-attributes,omitempty"` + K8sAttributes *config.K8sAttributesProcessor `yaml:"k8sattributes,omitempty"` + InsertClusterAttributes *config.ResourceProcessor `yaml:"resource/insert-cluster-attributes,omitempty"` + DropNoisySpans FilterProcessor `yaml:"filter/drop-noisy-spans"` + ResolveServiceName *TransformProcessor `yaml:"transform/resolve-service-name,omitempty"` + DropKymaAttributes *config.ResourceProcessor `yaml:"resource/drop-kyma-attributes,omitempty"` } type FilterProcessor struct { diff --git a/internal/otelcollector/config/trace/gateway/config_builder.go b/internal/otelcollector/config/trace/gateway/config_builder.go index b6ff6b86c..70faca2d3 100644 --- a/internal/otelcollector/config/trace/gateway/config_builder.go +++ b/internal/otelcollector/config/trace/gateway/config_builder.go @@ -22,14 +22,19 @@ type Builder struct { Reader client.Reader } -func (b *Builder) Build(ctx context.Context, pipelines []telemetryv1alpha1.TracePipeline) (*Config, otlpexporter.EnvVars, error) { +type BuildOptions struct { + ClusterName string + CloudProvider string +} + +func (b *Builder) Build(ctx context.Context, pipelines []telemetryv1alpha1.TracePipeline, opts BuildOptions) (*Config, otlpexporter.EnvVars, error) { cfg := &Config{ Base: config.Base{ Service: config.DefaultService(make(config.Pipelines)), Extensions: config.DefaultExtensions(), }, Receivers: makeReceiversConfig(), - Processors: makeProcessorsConfig(), + Processors: makeProcessorsConfig(opts), Exporters: make(Exporters), } @@ -100,7 +105,7 @@ func makePipelineConfig(exporterIDs ...string) config.Pipeline { "memory_limiter", "k8sattributes", "filter/drop-noisy-spans", - "resource/insert-cluster-name", + "resource/insert-cluster-attributes", "transform/resolve-service-name", "resource/drop-kyma-attributes", "batch", diff --git a/internal/otelcollector/config/trace/gateway/config_builder_test.go b/internal/otelcollector/config/trace/gateway/config_builder_test.go index 451cd028f..dac8a08b8 100644 --- a/internal/otelcollector/config/trace/gateway/config_builder_test.go +++ b/internal/otelcollector/config/trace/gateway/config_builder_test.go @@ -26,6 +26,9 @@ func TestBuildConfig(t *testing.T) { t.Run("otlp exporter endpoint", func(t *testing.T) { collectorConfig, envVars, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{ testutils.NewTracePipelineBuilder().WithName("test").WithOTLPOutput(testutils.OTLPEndpoint("http://localhost")).Build(), + }, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", }) require.NoError(t, err) @@ -41,7 +44,10 @@ func TestBuildConfig(t *testing.T) { }) t.Run("secure", func(t *testing.T) { - collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{testutils.NewTracePipelineBuilder().WithName("test").Build()}) + collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{testutils.NewTracePipelineBuilder().WithName("test").Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) require.Contains(t, collectorConfig.Exporters, "otlp/test") @@ -51,7 +57,10 @@ func TestBuildConfig(t *testing.T) { t.Run("insecure", func(t *testing.T) { collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{ - testutils.NewTracePipelineBuilder().WithName("test-insecure").WithOTLPOutput(testutils.OTLPEndpoint("http://localhost")).Build()}) + testutils.NewTracePipelineBuilder().WithName("test-insecure").WithOTLPOutput(testutils.OTLPEndpoint("http://localhost")).Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) require.Contains(t, collectorConfig.Exporters, "otlp/test-insecure") @@ -62,6 +71,9 @@ func TestBuildConfig(t *testing.T) { t.Run("basic auth", func(t *testing.T) { collectorConfig, envVars, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{ testutils.NewTracePipelineBuilder().WithName("test-basic-auth").WithOTLPOutput(testutils.OTLPBasicAuth("user", "password")).Build(), + }, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", }) require.NoError(t, err) require.Contains(t, collectorConfig.Exporters, "otlp/test-basic-auth") @@ -81,6 +93,9 @@ func TestBuildConfig(t *testing.T) { t.Run("custom header", func(t *testing.T) { collectorConfig, envVars, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{ testutils.NewTracePipelineBuilder().WithName("test-custom-header").WithOTLPOutput(testutils.OTLPCustomHeader("Authorization", "TOKEN_VALUE", "Api-Token")).Build(), + }, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", }) require.NoError(t, err) require.Contains(t, collectorConfig.Exporters, "otlp/test-custom-header") @@ -98,6 +113,9 @@ func TestBuildConfig(t *testing.T) { t.Run("mtls", func(t *testing.T) { collectorConfig, envVars, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{ testutils.NewTracePipelineBuilder().WithName("test-mtls").WithOTLPOutput(testutils.OTLPClientTLSFromString("ca", "cert", "key")).Build(), + }, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", }) require.NoError(t, err) require.Contains(t, collectorConfig.Exporters, "otlp/test-mtls") @@ -114,7 +132,10 @@ func TestBuildConfig(t *testing.T) { }) t.Run("extensions", func(t *testing.T) { - collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{testutils.NewTracePipelineBuilder().Build()}) + collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{testutils.NewTracePipelineBuilder().Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) require.NotEmpty(t, collectorConfig.Extensions.HealthCheck.Endpoint) @@ -124,7 +145,10 @@ func TestBuildConfig(t *testing.T) { }) t.Run("telemetry", func(t *testing.T) { - collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{testutils.NewTracePipelineBuilder().Build()}) + collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{testutils.NewTracePipelineBuilder().Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) metricreaders := []config.MetricReader{ @@ -146,7 +170,10 @@ func TestBuildConfig(t *testing.T) { }) t.Run("single pipeline queue size", func(t *testing.T) { - collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{testutils.NewTracePipelineBuilder().WithName("test").Build()}) + collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{testutils.NewTracePipelineBuilder().WithName("test").Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) require.Equal(t, maxQueueSize, collectorConfig.Exporters["otlp/test"].OTLP.SendingQueue.QueueSize, "Pipeline should have the full queue size") }) @@ -155,7 +182,10 @@ func TestBuildConfig(t *testing.T) { collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{ testutils.NewTracePipelineBuilder().WithName("test-1").Build(), testutils.NewTracePipelineBuilder().WithName("test-2").Build(), - testutils.NewTracePipelineBuilder().WithName("test-3").Build()}) + testutils.NewTracePipelineBuilder().WithName("test-3").Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) @@ -166,7 +196,10 @@ func TestBuildConfig(t *testing.T) { }) t.Run("single pipeline topology", func(t *testing.T) { - collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{testutils.NewTracePipelineBuilder().WithName("test").Build()}) + collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{testutils.NewTracePipelineBuilder().WithName("test").Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) require.Contains(t, collectorConfig.Service.Pipelines, "traces/test") @@ -175,7 +208,7 @@ func TestBuildConfig(t *testing.T) { require.Equal(t, collectorConfig.Service.Pipelines["traces/test"].Processors[0], "memory_limiter") require.Equal(t, collectorConfig.Service.Pipelines["traces/test"].Processors[1], "k8sattributes") require.Equal(t, collectorConfig.Service.Pipelines["traces/test"].Processors[2], "filter/drop-noisy-spans") - require.Equal(t, collectorConfig.Service.Pipelines["traces/test"].Processors[3], "resource/insert-cluster-name") + require.Equal(t, collectorConfig.Service.Pipelines["traces/test"].Processors[3], "resource/insert-cluster-attributes") require.Equal(t, collectorConfig.Service.Pipelines["traces/test"].Processors[4], "transform/resolve-service-name") require.Equal(t, collectorConfig.Service.Pipelines["traces/test"].Processors[5], "resource/drop-kyma-attributes") require.Equal(t, collectorConfig.Service.Pipelines["traces/test"].Processors[6], "batch") @@ -186,7 +219,10 @@ func TestBuildConfig(t *testing.T) { t.Run("multi pipeline topology", func(t *testing.T) { collectorConfig, envVars, err := sut.Build(context.Background(), []telemetryv1alpha1.TracePipeline{ testutils.NewTracePipelineBuilder().WithName("test-1").Build(), - testutils.NewTracePipelineBuilder().WithName("test-2").Build()}) + testutils.NewTracePipelineBuilder().WithName("test-2").Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) require.Contains(t, collectorConfig.Exporters, "otlp/test-1") @@ -198,7 +234,7 @@ func TestBuildConfig(t *testing.T) { require.Equal(t, collectorConfig.Service.Pipelines["traces/test-1"].Processors[0], "memory_limiter") require.Equal(t, collectorConfig.Service.Pipelines["traces/test-1"].Processors[1], "k8sattributes") require.Equal(t, collectorConfig.Service.Pipelines["traces/test-1"].Processors[2], "filter/drop-noisy-spans") - require.Equal(t, collectorConfig.Service.Pipelines["traces/test-1"].Processors[3], "resource/insert-cluster-name") + require.Equal(t, collectorConfig.Service.Pipelines["traces/test-1"].Processors[3], "resource/insert-cluster-attributes") require.Equal(t, collectorConfig.Service.Pipelines["traces/test-1"].Processors[4], "transform/resolve-service-name") require.Equal(t, collectorConfig.Service.Pipelines["traces/test-1"].Processors[5], "resource/drop-kyma-attributes") require.Equal(t, collectorConfig.Service.Pipelines["traces/test-1"].Processors[6], "batch") @@ -209,7 +245,7 @@ func TestBuildConfig(t *testing.T) { require.Equal(t, collectorConfig.Service.Pipelines["traces/test-2"].Processors[0], "memory_limiter") require.Equal(t, collectorConfig.Service.Pipelines["traces/test-2"].Processors[1], "k8sattributes") require.Equal(t, collectorConfig.Service.Pipelines["traces/test-2"].Processors[2], "filter/drop-noisy-spans") - require.Equal(t, collectorConfig.Service.Pipelines["traces/test-2"].Processors[3], "resource/insert-cluster-name") + require.Equal(t, collectorConfig.Service.Pipelines["traces/test-2"].Processors[3], "resource/insert-cluster-attributes") require.Equal(t, collectorConfig.Service.Pipelines["traces/test-2"].Processors[4], "transform/resolve-service-name") require.Equal(t, collectorConfig.Service.Pipelines["traces/test-2"].Processors[5], "resource/drop-kyma-attributes") require.Equal(t, collectorConfig.Service.Pipelines["traces/test-2"].Processors[6], "batch") @@ -221,6 +257,9 @@ func TestBuildConfig(t *testing.T) { t.Run("marshaling", func(t *testing.T) { config, _, err := sut.Build(context.Background(), []telemetryv1alpha1.TracePipeline{ testutils.NewTracePipelineBuilder().WithName("test").Build(), + }, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", }) require.NoError(t, err) diff --git a/internal/otelcollector/config/trace/gateway/processors.go b/internal/otelcollector/config/trace/gateway/processors.go index f43275b62..c2677aa46 100644 --- a/internal/otelcollector/config/trace/gateway/processors.go +++ b/internal/otelcollector/config/trace/gateway/processors.go @@ -5,17 +5,17 @@ import ( "github.com/kyma-project/telemetry-manager/internal/otelcollector/config/gatewayprocs" ) -func makeProcessorsConfig() Processors { +func makeProcessorsConfig(opts BuildOptions) Processors { return Processors{ BaseProcessors: config.BaseProcessors{ Batch: makeBatchProcessorConfig(), MemoryLimiter: makeMemoryLimiterConfig(), }, - K8sAttributes: gatewayprocs.K8sAttributesProcessorConfig(), - InsertClusterName: gatewayprocs.InsertClusterNameProcessorConfig(), - DropNoisySpans: makeDropNoisySpansConfig(), - ResolveServiceName: makeResolveServiceNameConfig(), - DropKymaAttributes: gatewayprocs.DropKymaAttributesProcessorConfig(), + K8sAttributes: gatewayprocs.K8sAttributesProcessorConfig(), + InsertClusterAttributes: gatewayprocs.InsertClusterAttributesProcessorConfig(opts.ClusterName, opts.CloudProvider), + DropNoisySpans: makeDropNoisySpansConfig(), + ResolveServiceName: makeResolveServiceNameConfig(), + DropKymaAttributes: gatewayprocs.DropKymaAttributesProcessorConfig(), } } diff --git a/internal/otelcollector/config/trace/gateway/processors_test.go b/internal/otelcollector/config/trace/gateway/processors_test.go index a16de357b..01e968469 100644 --- a/internal/otelcollector/config/trace/gateway/processors_test.go +++ b/internal/otelcollector/config/trace/gateway/processors_test.go @@ -16,18 +16,27 @@ func TestProcessors(t *testing.T) { fakeClient := fake.NewClientBuilder().Build() sut := Builder{Reader: fakeClient} - t.Run("insert cluster name processor", func(t *testing.T) { - collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{testutils.NewTracePipelineBuilder().Build()}) + t.Run("insert cluster attributes processor", func(t *testing.T) { + collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{testutils.NewTracePipelineBuilder().Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) - require.Equal(t, 1, len(collectorConfig.Processors.InsertClusterName.Attributes)) - require.Equal(t, "insert", collectorConfig.Processors.InsertClusterName.Attributes[0].Action) - require.Equal(t, "k8s.cluster.name", collectorConfig.Processors.InsertClusterName.Attributes[0].Key) - require.Equal(t, "${KUBERNETES_SERVICE_HOST}", collectorConfig.Processors.InsertClusterName.Attributes[0].Value) + require.Equal(t, 2, len(collectorConfig.Processors.InsertClusterAttributes.Attributes)) + require.Equal(t, "insert", collectorConfig.Processors.InsertClusterAttributes.Attributes[0].Action) + require.Equal(t, "k8s.cluster.name", collectorConfig.Processors.InsertClusterAttributes.Attributes[0].Key) + require.Equal(t, "test-cluster", collectorConfig.Processors.InsertClusterAttributes.Attributes[0].Value) + require.Equal(t, "insert", collectorConfig.Processors.InsertClusterAttributes.Attributes[1].Action) + require.Equal(t, "cloud.provider", collectorConfig.Processors.InsertClusterAttributes.Attributes[1].Key) + require.Equal(t, "test-cloud-provider", collectorConfig.Processors.InsertClusterAttributes.Attributes[1].Value) }) t.Run("memory limit processors", func(t *testing.T) { - collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{testutils.NewTracePipelineBuilder().Build()}) + collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{testutils.NewTracePipelineBuilder().Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) require.Equal(t, "1s", collectorConfig.Processors.MemoryLimiter.CheckInterval) @@ -36,7 +45,10 @@ func TestProcessors(t *testing.T) { }) t.Run("batch processors", func(t *testing.T) { - collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{testutils.NewTracePipelineBuilder().Build()}) + collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{testutils.NewTracePipelineBuilder().Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) require.Equal(t, 512, collectorConfig.Processors.Batch.SendBatchSize) @@ -45,7 +57,10 @@ func TestProcessors(t *testing.T) { }) t.Run("k8s attributes processors", func(t *testing.T) { - collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{testutils.NewTracePipelineBuilder().Build()}) + collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{testutils.NewTracePipelineBuilder().Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) require.Equal(t, "serviceAccount", collectorConfig.Processors.K8sAttributes.AuthType) @@ -73,7 +88,10 @@ func TestProcessors(t *testing.T) { }) t.Run("filter processor", func(t *testing.T) { - collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{testutils.NewTracePipelineBuilder().Build()}) + collectorConfig, _, err := sut.Build(ctx, []telemetryv1alpha1.TracePipeline{testutils.NewTracePipelineBuilder().Build()}, BuildOptions{ + ClusterName: "test-cluster", + CloudProvider: "test-cloud-provider", + }) require.NoError(t, err) require.Equal(t, 9, len(collectorConfig.Processors.DropNoisySpans.Traces.Span), "Span filter list size is wrong") diff --git a/internal/otelcollector/config/trace/gateway/testdata/config.yaml b/internal/otelcollector/config/trace/gateway/testdata/config.yaml index c9d0ce5e9..bb04ebb0e 100644 --- a/internal/otelcollector/config/trace/gateway/testdata/config.yaml +++ b/internal/otelcollector/config/trace/gateway/testdata/config.yaml @@ -12,7 +12,7 @@ service: - memory_limiter - k8sattributes - filter/drop-noisy-spans - - resource/insert-cluster-name + - resource/insert-cluster-attributes - transform/resolve-service-name - resource/drop-kyma-attributes - batch @@ -68,6 +68,18 @@ processors: - from: pod key: app tag_name: kyma.app_name + - from: node + key: topology.kubernetes.io/region + tag_name: cloud.region + - from: node + key: topology.kubernetes.io/zone + tag_name: cloud.availability_zone + - from: node + key: node.kubernetes.io/instance-type + tag_name: host.type + - from: node + key: kubernetes.io/arch + tag_name: host.arch pod_association: - sources: - from: resource_attribute @@ -77,11 +89,14 @@ processors: name: k8s.pod.uid - sources: - from: connection - resource/insert-cluster-name: + resource/insert-cluster-attributes: attributes: - action: insert key: k8s.cluster.name - value: ${KUBERNETES_SERVICE_HOST} + value: test-cluster + - action: insert + key: cloud.provider + value: test-cloud-provider filter/drop-noisy-spans: traces: span: diff --git a/internal/reconciler/logpipeline/otel/mocks/gateway_config_builder.go b/internal/reconciler/logpipeline/otel/mocks/gateway_config_builder.go index 92e2d6362..f9eb01d7a 100644 --- a/internal/reconciler/logpipeline/otel/mocks/gateway_config_builder.go +++ b/internal/reconciler/logpipeline/otel/mocks/gateway_config_builder.go @@ -18,9 +18,9 @@ type GatewayConfigBuilder struct { mock.Mock } -// Build provides a mock function with given fields: ctx, pipelines -func (_m *GatewayConfigBuilder) Build(ctx context.Context, pipelines []v1alpha1.LogPipeline) (*gateway.Config, otlpexporter.EnvVars, error) { - ret := _m.Called(ctx, pipelines) +// Build provides a mock function with given fields: ctx, pipelines, opts +func (_m *GatewayConfigBuilder) Build(ctx context.Context, pipelines []v1alpha1.LogPipeline, opts gateway.BuildOptions) (*gateway.Config, otlpexporter.EnvVars, error) { + ret := _m.Called(ctx, pipelines, opts) if len(ret) == 0 { panic("no return value specified for Build") @@ -29,27 +29,27 @@ func (_m *GatewayConfigBuilder) Build(ctx context.Context, pipelines []v1alpha1. var r0 *gateway.Config var r1 otlpexporter.EnvVars var r2 error - if rf, ok := ret.Get(0).(func(context.Context, []v1alpha1.LogPipeline) (*gateway.Config, otlpexporter.EnvVars, error)); ok { - return rf(ctx, pipelines) + if rf, ok := ret.Get(0).(func(context.Context, []v1alpha1.LogPipeline, gateway.BuildOptions) (*gateway.Config, otlpexporter.EnvVars, error)); ok { + return rf(ctx, pipelines, opts) } - if rf, ok := ret.Get(0).(func(context.Context, []v1alpha1.LogPipeline) *gateway.Config); ok { - r0 = rf(ctx, pipelines) + if rf, ok := ret.Get(0).(func(context.Context, []v1alpha1.LogPipeline, gateway.BuildOptions) *gateway.Config); ok { + r0 = rf(ctx, pipelines, opts) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*gateway.Config) } } - if rf, ok := ret.Get(1).(func(context.Context, []v1alpha1.LogPipeline) otlpexporter.EnvVars); ok { - r1 = rf(ctx, pipelines) + if rf, ok := ret.Get(1).(func(context.Context, []v1alpha1.LogPipeline, gateway.BuildOptions) otlpexporter.EnvVars); ok { + r1 = rf(ctx, pipelines, opts) } else { if ret.Get(1) != nil { r1 = ret.Get(1).(otlpexporter.EnvVars) } } - if rf, ok := ret.Get(2).(func(context.Context, []v1alpha1.LogPipeline) error); ok { - r2 = rf(ctx, pipelines) + if rf, ok := ret.Get(2).(func(context.Context, []v1alpha1.LogPipeline, gateway.BuildOptions) error); ok { + r2 = rf(ctx, pipelines, opts) } else { r2 = ret.Error(2) } diff --git a/internal/reconciler/logpipeline/otel/reconciler.go b/internal/reconciler/logpipeline/otel/reconciler.go index 025336aed..9a077ad8f 100644 --- a/internal/reconciler/logpipeline/otel/reconciler.go +++ b/internal/reconciler/logpipeline/otel/reconciler.go @@ -27,7 +27,7 @@ import ( const defaultReplicaCount int32 = 2 type GatewayConfigBuilder interface { - Build(ctx context.Context, pipelines []telemetryv1alpha1.LogPipeline) (*gateway.Config, otlpexporter.EnvVars, error) + Build(ctx context.Context, pipelines []telemetryv1alpha1.LogPipeline, opts gateway.BuildOptions) (*gateway.Config, otlpexporter.EnvVars, error) } type GatewayApplierDeleter interface { @@ -171,7 +171,12 @@ func (r *Reconciler) isReconcilable(ctx context.Context, pipeline *telemetryv1al } func (r *Reconciler) reconcileLogGateway(ctx context.Context, pipeline *telemetryv1alpha1.LogPipeline, allPipelines []telemetryv1alpha1.LogPipeline) error { - collectorConfig, collectorEnvVars, err := r.gatewayConfigBuilder.Build(ctx, allPipelines) + clusterInfo := k8sutils.GetGardenerShootInfo(ctx, r.Client) + collectorConfig, collectorEnvVars, err := r.gatewayConfigBuilder.Build(ctx, allPipelines, gateway.BuildOptions{ + ClusterName: clusterInfo.ClusterName, + CloudProvider: clusterInfo.CloudProvider, + }) + if err != nil { return fmt.Errorf("failed to create collector config: %w", err) } diff --git a/internal/reconciler/logpipeline/otel/reconciler_test.go b/internal/reconciler/logpipeline/otel/reconciler_test.go index a40744256..f712b4612 100644 --- a/internal/reconciler/logpipeline/otel/reconciler_test.go +++ b/internal/reconciler/logpipeline/otel/reconciler_test.go @@ -45,7 +45,7 @@ func TestReconcile(t *testing.T) { gatewayApplierDeleterMock.On("ApplyResources", mock.Anything, mock.Anything, mock.Anything).Return(nil) gatewayConfigBuilderMock := &mocks.GatewayConfigBuilder{} - gatewayConfigBuilderMock.On("Build", mock.Anything, containsPipeline(pipeline)).Return(&gateway.Config{}, nil, nil).Times(1) + gatewayConfigBuilderMock.On("Build", mock.Anything, containsPipeline(pipeline), mock.Anything).Return(&gateway.Config{}, nil, nil).Times(1) gatewayProberStub := commonStatusStubs.NewDeploymentSetProber(workloadstatus.ErrDeploymentFetching) @@ -91,7 +91,7 @@ func TestReconcile(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&pipeline).WithStatusSubresource(&pipeline).Build() gatewayConfigBuilderMock := &mocks.GatewayConfigBuilder{} - gatewayConfigBuilderMock.On("Build", mock.Anything, containsPipeline(pipeline)).Return(&gateway.Config{}, nil, nil).Times(1) + gatewayConfigBuilderMock.On("Build", mock.Anything, containsPipeline(pipeline), mock.Anything).Return(&gateway.Config{}, nil, nil).Times(1) gatewayApplierDeleterMock := &mocks.GatewayApplierDeleter{} gatewayApplierDeleterMock.On("ApplyResources", mock.Anything, mock.Anything, mock.Anything).Return(nil) @@ -140,7 +140,7 @@ func TestReconcile(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&pipeline).WithStatusSubresource(&pipeline).Build() gatewayConfigBuilderMock := &mocks.GatewayConfigBuilder{} - gatewayConfigBuilderMock.On("Build", mock.Anything, containsPipeline(pipeline)).Return(&gateway.Config{}, nil, nil).Times(1) + gatewayConfigBuilderMock.On("Build", mock.Anything, containsPipeline(pipeline), mock.Anything).Return(&gateway.Config{}, nil, nil).Times(1) gatewayApplierDeleterMock := &mocks.GatewayApplierDeleter{} gatewayApplierDeleterMock.On("ApplyResources", mock.Anything, mock.Anything, mock.Anything).Return(nil) diff --git a/internal/reconciler/metricpipeline/reconciler.go b/internal/reconciler/metricpipeline/reconciler.go index 27cf06550..48380347c 100644 --- a/internal/reconciler/metricpipeline/reconciler.go +++ b/internal/reconciler/metricpipeline/reconciler.go @@ -240,9 +240,13 @@ func isMetricAgentRequired(pipeline *telemetryv1alpha1.MetricPipeline) bool { } func (r *Reconciler) reconcileMetricGateway(ctx context.Context, pipeline *telemetryv1alpha1.MetricPipeline, allPipelines []telemetryv1alpha1.MetricPipeline) error { + shootInfo := k8sutils.GetGardenerShootInfo(ctx, r.Client) + collectorConfig, collectorEnvVars, err := r.gatewayConfigBuilder.Build(ctx, allPipelines, gateway.BuildOptions{ GatewayNamespace: r.telemetryNamespace, InstrumentationScopeVersion: r.moduleVersion, + ClusterName: shootInfo.ClusterName, + CloudProvider: shootInfo.CloudProvider, }) if err != nil { diff --git a/internal/reconciler/tracepipeline/mocks/gateway_config_builder.go b/internal/reconciler/tracepipeline/mocks/gateway_config_builder.go index 90063d6de..cf6b802a0 100644 --- a/internal/reconciler/tracepipeline/mocks/gateway_config_builder.go +++ b/internal/reconciler/tracepipeline/mocks/gateway_config_builder.go @@ -18,9 +18,9 @@ type GatewayConfigBuilder struct { mock.Mock } -// Build provides a mock function with given fields: ctx, pipelines -func (_m *GatewayConfigBuilder) Build(ctx context.Context, pipelines []v1alpha1.TracePipeline) (*gateway.Config, otlpexporter.EnvVars, error) { - ret := _m.Called(ctx, pipelines) +// Build provides a mock function with given fields: ctx, pipelines, opts +func (_m *GatewayConfigBuilder) Build(ctx context.Context, pipelines []v1alpha1.TracePipeline, opts gateway.BuildOptions) (*gateway.Config, otlpexporter.EnvVars, error) { + ret := _m.Called(ctx, pipelines, opts) if len(ret) == 0 { panic("no return value specified for Build") @@ -29,27 +29,27 @@ func (_m *GatewayConfigBuilder) Build(ctx context.Context, pipelines []v1alpha1. var r0 *gateway.Config var r1 otlpexporter.EnvVars var r2 error - if rf, ok := ret.Get(0).(func(context.Context, []v1alpha1.TracePipeline) (*gateway.Config, otlpexporter.EnvVars, error)); ok { - return rf(ctx, pipelines) + if rf, ok := ret.Get(0).(func(context.Context, []v1alpha1.TracePipeline, gateway.BuildOptions) (*gateway.Config, otlpexporter.EnvVars, error)); ok { + return rf(ctx, pipelines, opts) } - if rf, ok := ret.Get(0).(func(context.Context, []v1alpha1.TracePipeline) *gateway.Config); ok { - r0 = rf(ctx, pipelines) + if rf, ok := ret.Get(0).(func(context.Context, []v1alpha1.TracePipeline, gateway.BuildOptions) *gateway.Config); ok { + r0 = rf(ctx, pipelines, opts) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*gateway.Config) } } - if rf, ok := ret.Get(1).(func(context.Context, []v1alpha1.TracePipeline) otlpexporter.EnvVars); ok { - r1 = rf(ctx, pipelines) + if rf, ok := ret.Get(1).(func(context.Context, []v1alpha1.TracePipeline, gateway.BuildOptions) otlpexporter.EnvVars); ok { + r1 = rf(ctx, pipelines, opts) } else { if ret.Get(1) != nil { r1 = ret.Get(1).(otlpexporter.EnvVars) } } - if rf, ok := ret.Get(2).(func(context.Context, []v1alpha1.TracePipeline) error); ok { - r2 = rf(ctx, pipelines) + if rf, ok := ret.Get(2).(func(context.Context, []v1alpha1.TracePipeline, gateway.BuildOptions) error); ok { + r2 = rf(ctx, pipelines, opts) } else { r2 = ret.Error(2) } diff --git a/internal/reconciler/tracepipeline/reconciler.go b/internal/reconciler/tracepipeline/reconciler.go index 60686541e..5fcd98ef2 100644 --- a/internal/reconciler/tracepipeline/reconciler.go +++ b/internal/reconciler/tracepipeline/reconciler.go @@ -44,7 +44,7 @@ import ( const defaultReplicaCount int32 = 2 type GatewayConfigBuilder interface { - Build(ctx context.Context, pipelines []telemetryv1alpha1.TracePipeline) (*gateway.Config, otlpexporter.EnvVars, error) + Build(ctx context.Context, pipelines []telemetryv1alpha1.TracePipeline, opts gateway.BuildOptions) (*gateway.Config, otlpexporter.EnvVars, error) } type GatewayApplierDeleter interface { @@ -217,7 +217,12 @@ func (r *Reconciler) isReconcilable(ctx context.Context, pipeline *telemetryv1al } func (r *Reconciler) reconcileTraceGateway(ctx context.Context, pipeline *telemetryv1alpha1.TracePipeline, allPipelines []telemetryv1alpha1.TracePipeline) error { - collectorConfig, collectorEnvVars, err := r.gatewayConfigBuilder.Build(ctx, allPipelines) + clusterInfo := k8sutils.GetGardenerShootInfo(ctx, r.Client) + + collectorConfig, collectorEnvVars, err := r.gatewayConfigBuilder.Build(ctx, allPipelines, gateway.BuildOptions{ + ClusterName: clusterInfo.ClusterName, + CloudProvider: clusterInfo.CloudProvider, + }) if err != nil { return fmt.Errorf("failed to create collector config: %w", err) } diff --git a/internal/reconciler/tracepipeline/reconciler_test.go b/internal/reconciler/tracepipeline/reconciler_test.go index 4cd1a7ee6..2bab74805 100644 --- a/internal/reconciler/tracepipeline/reconciler_test.go +++ b/internal/reconciler/tracepipeline/reconciler_test.go @@ -52,7 +52,7 @@ func TestReconcile(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&pipeline).WithStatusSubresource(&pipeline).Build() gatewayConfigBuilderMock := &mocks.GatewayConfigBuilder{} - gatewayConfigBuilderMock.On("Build", mock.Anything, containsPipeline(pipeline)).Return(&gateway.Config{}, nil, nil).Times(1) + gatewayConfigBuilderMock.On("Build", mock.Anything, containsPipeline(pipeline), mock.Anything).Return(&gateway.Config{}, nil, nil).Times(1) gatewayApplierDeleterMock := &mocks.GatewayApplierDeleter{} gatewayApplierDeleterMock.On("ApplyResources", mock.Anything, mock.Anything, mock.Anything).Return(nil) @@ -109,7 +109,7 @@ func TestReconcile(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&pipeline).WithStatusSubresource(&pipeline).Build() gatewayConfigBuilderMock := &mocks.GatewayConfigBuilder{} - gatewayConfigBuilderMock.On("Build", mock.Anything, containsPipeline(pipeline)).Return(&gateway.Config{}, nil, nil).Times(1) + gatewayConfigBuilderMock.On("Build", mock.Anything, containsPipeline(pipeline), mock.Anything).Return(&gateway.Config{}, nil, nil).Times(1) gatewayApplierDeleterMock := &mocks.GatewayApplierDeleter{} gatewayApplierDeleterMock.On("ApplyResources", mock.Anything, mock.Anything, mock.Anything).Return(nil) @@ -166,7 +166,7 @@ func TestReconcile(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&pipeline).WithStatusSubresource(&pipeline).Build() gatewayConfigBuilderMock := &mocks.GatewayConfigBuilder{} - gatewayConfigBuilderMock.On("Build", mock.Anything, containsPipeline(pipeline)).Return(&gateway.Config{}, nil, nil).Times(1) + gatewayConfigBuilderMock.On("Build", mock.Anything, containsPipeline(pipeline), mock.Anything).Return(&gateway.Config{}, nil, nil).Times(1) gatewayApplierDeleterMock := &mocks.GatewayApplierDeleter{} gatewayApplierDeleterMock.On("ApplyResources", mock.Anything, mock.Anything, mock.Anything).Return(nil) @@ -296,7 +296,7 @@ func TestReconcile(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&pipeline, secret).WithStatusSubresource(&pipeline).Build() gatewayConfigBuilderMock := &mocks.GatewayConfigBuilder{} - gatewayConfigBuilderMock.On("Build", mock.Anything, containsPipeline(pipeline)).Return(&gateway.Config{}, nil, nil).Times(1) + gatewayConfigBuilderMock.On("Build", mock.Anything, containsPipeline(pipeline), mock.Anything).Return(&gateway.Config{}, nil, nil).Times(1) gatewayApplierDeleterMock := &mocks.GatewayApplierDeleter{} gatewayApplierDeleterMock.On("ApplyResources", mock.Anything, mock.Anything, mock.Anything).Return(nil) @@ -506,7 +506,7 @@ func TestReconcile(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&pipeline).WithStatusSubresource(&pipeline).Build() gatewayConfigBuilderMock := &mocks.GatewayConfigBuilder{} - gatewayConfigBuilderMock.On("Build", mock.Anything, containsPipeline(pipeline)).Return(&gateway.Config{}, nil, nil).Times(1) + gatewayConfigBuilderMock.On("Build", mock.Anything, containsPipeline(pipeline), mock.Anything).Return(&gateway.Config{}, nil, nil).Times(1) gatewayApplierDeleterMock := &mocks.GatewayApplierDeleter{} gatewayApplierDeleterMock.On("ApplyResources", mock.Anything, mock.Anything, mock.Anything).Return(nil) @@ -640,7 +640,7 @@ func TestReconcile(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&pipeline).WithStatusSubresource(&pipeline).Build() gatewayConfigBuilderMock := &mocks.GatewayConfigBuilder{} - gatewayConfigBuilderMock.On("Build", mock.Anything, mock.Anything).Return(&gateway.Config{}, nil, nil) + gatewayConfigBuilderMock.On("Build", mock.Anything, mock.Anything, mock.Anything).Return(&gateway.Config{}, nil, nil) gatewayApplierDeleterMock := &mocks.GatewayApplierDeleter{} gatewayApplierDeleterMock.On("ApplyResources", mock.Anything, mock.Anything, mock.Anything).Return(nil) @@ -699,9 +699,9 @@ func TestReconcile(t *testing.T) { } if !tt.expectGatewayConfigured { - gatewayConfigBuilderMock.AssertNotCalled(t, "Build", mock.Anything, mock.Anything) + gatewayConfigBuilderMock.AssertNotCalled(t, "Build", mock.Anything, mock.Anything, mock.Anything) } else { - gatewayConfigBuilderMock.AssertCalled(t, "Build", mock.Anything, containsPipeline(pipeline)) + gatewayConfigBuilderMock.AssertCalled(t, "Build", mock.Anything, containsPipeline(pipeline), mock.Anything) } }) } diff --git a/internal/resources/otelcollector/rbac.go b/internal/resources/otelcollector/rbac.go index 890456916..98bcf87f6 100644 --- a/internal/resources/otelcollector/rbac.go +++ b/internal/resources/otelcollector/rbac.go @@ -220,7 +220,7 @@ func withK8sAttributeRules() ClusterRoleOption { // policy rules needed for the k8sattributeprocessor component k8sAttributeRules := []rbacv1.PolicyRule{{ APIGroups: []string{""}, - Resources: []string{"namespaces", "pods"}, + Resources: []string{"namespaces", "pods", "nodes"}, Verbs: []string{"get", "list", "watch"}, }, { APIGroups: []string{"apps"}, diff --git a/internal/resources/otelcollector/testdata/log-gateway.yaml b/internal/resources/otelcollector/testdata/log-gateway.yaml index 39eeab07f..3100cbe76 100644 --- a/internal/resources/otelcollector/testdata/log-gateway.yaml +++ b/internal/resources/otelcollector/testdata/log-gateway.yaml @@ -231,6 +231,7 @@ rules: resources: - namespaces - pods + - nodes verbs: - get - list diff --git a/internal/resources/otelcollector/testdata/metric-gateway-istio.yaml b/internal/resources/otelcollector/testdata/metric-gateway-istio.yaml index 74d9cc436..4db08459f 100644 --- a/internal/resources/otelcollector/testdata/metric-gateway-istio.yaml +++ b/internal/resources/otelcollector/testdata/metric-gateway-istio.yaml @@ -233,6 +233,7 @@ rules: resources: - namespaces - pods + - nodes verbs: - get - list diff --git a/internal/resources/otelcollector/testdata/metric-gateway.yaml b/internal/resources/otelcollector/testdata/metric-gateway.yaml index 9148cc42a..69c51d82a 100644 --- a/internal/resources/otelcollector/testdata/metric-gateway.yaml +++ b/internal/resources/otelcollector/testdata/metric-gateway.yaml @@ -231,6 +231,7 @@ rules: resources: - namespaces - pods + - nodes verbs: - get - list diff --git a/internal/resources/otelcollector/testdata/trace-gateway.yaml b/internal/resources/otelcollector/testdata/trace-gateway.yaml index b8ab59b71..0d2e56a26 100644 --- a/internal/resources/otelcollector/testdata/trace-gateway.yaml +++ b/internal/resources/otelcollector/testdata/trace-gateway.yaml @@ -231,6 +231,7 @@ rules: resources: - namespaces - pods + - nodes verbs: - get - list diff --git a/internal/utils/k8s/cluster_info_getter.go b/internal/utils/k8s/cluster_info_getter.go new file mode 100644 index 000000000..da6dd79cf --- /dev/null +++ b/internal/utils/k8s/cluster_info_getter.go @@ -0,0 +1,54 @@ +package k8s + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +const ( + defaultClusterName = "${KUBERNETES_SERVICE_HOST}" + gardenerShootNameAttributeName = "shootName" + gardenerCloudProviderAttributeName = "provider" + CloudProviderOpenStack = "openstack" + CloudProviderSAPConvergedCloud = "sap-converged-cloud" +) + +var defaultGardenerShootInfoCM = types.NamespacedName{ + Namespace: "kube-system", + Name: "shoot-info", +} + +type ClusterInfo struct { + ClusterName string + CloudProvider string +} + +func GetGardenerShootInfo(ctx context.Context, client client.Client) ClusterInfo { + shootInfo := corev1.ConfigMap{} + + err := client.Get(ctx, defaultGardenerShootInfoCM, &shootInfo) + + // The shoot-info config map is used to store information about the Gardener cluster, such as the cluster name and the cloud provider. + // If cluster in a non Gardener cluster, the shoot-info config map will not exist, so we return the default values. + if err != nil { + logf.FromContext(ctx).V(1).Info("Failed get shoot-info config map") + + return ClusterInfo{ClusterName: defaultClusterName} + } + + // The provider `openstack` is used to represent the SAP Converged Cloud provider. + cloudProvider := shootInfo.Data[gardenerCloudProviderAttributeName] + + if cloudProvider == CloudProviderOpenStack { + cloudProvider = CloudProviderSAPConvergedCloud + } + + return ClusterInfo{ + ClusterName: shootInfo.Data[gardenerShootNameAttributeName], + CloudProvider: cloudProvider, + } +} diff --git a/internal/utils/k8s/cluster_info_getter_test.go b/internal/utils/k8s/cluster_info_getter_test.go new file mode 100644 index 000000000..bcdf4122c --- /dev/null +++ b/internal/utils/k8s/cluster_info_getter_test.go @@ -0,0 +1,56 @@ +package k8s + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +func TestClusterInfoGetter(t *testing.T) { + t.Run("Gardener cluster", func(t *testing.T) { + shootInfo := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: "shoot-info", Namespace: "kube-system"}, + Data: map[string]string{ + "shootName": "test-cluster", + "provider": "test-provider", + }, + } + + fakeClient := fake.NewClientBuilder().WithObjects(shootInfo).Build() + + clusterInfo := GetGardenerShootInfo(context.Background(), fakeClient) + + require.Equal(t, clusterInfo.ClusterName, "test-cluster") + require.Equal(t, clusterInfo.CloudProvider, "test-provider") + }) + + t.Run("Gardener converged cloud", func(t *testing.T) { + shootInfo := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: "shoot-info", Namespace: "kube-system"}, + Data: map[string]string{ + "shootName": "test-cluster", + "provider": "openstack", + }, + } + + fakeClient := fake.NewClientBuilder().WithObjects(shootInfo).Build() + + clusterInfo := GetGardenerShootInfo(context.Background(), fakeClient) + + require.Equal(t, clusterInfo.ClusterName, "test-cluster") + require.Equal(t, clusterInfo.CloudProvider, "sap-converged-cloud") + }) + + t.Run("Non Gardener cluster", func(t *testing.T) { + fakeClient := fake.NewClientBuilder().WithObjects().Build() + + clusterInfo := GetGardenerShootInfo(context.Background(), fakeClient) + + require.Equal(t, clusterInfo.ClusterName, "${KUBERNETES_SERVICE_HOST}") + require.Equal(t, clusterInfo.CloudProvider, "") + }) +} diff --git a/main.go b/main.go index bf9d69f8f..3ace3cfa7 100644 --- a/main.go +++ b/main.go @@ -183,7 +183,7 @@ func run() error { &appsv1.Deployment{}: {Field: setNamespaceFieldSelector()}, &appsv1.ReplicaSet{}: {Field: setNamespaceFieldSelector()}, &appsv1.DaemonSet{}: {Field: setNamespaceFieldSelector()}, - &corev1.ConfigMap{}: {Field: setNamespaceFieldSelector()}, + &corev1.ConfigMap{}: {Namespaces: setConfigMapNamespaceFieldSelector()}, &corev1.ServiceAccount{}: {Field: setNamespaceFieldSelector()}, &corev1.Service{}: {Field: setNamespaceFieldSelector()}, &networkingv1.NetworkPolicy{}: {Field: setNamespaceFieldSelector()}, @@ -456,6 +456,15 @@ func setNamespaceFieldSelector() fields.Selector { return fields.SelectorFromSet(fields.Set{"metadata.namespace": telemetryNamespace}) } +func setConfigMapNamespaceFieldSelector() map[string]cache.Config { + return map[string]cache.Config{ + "kube-system": { + FieldSelector: fields.SelectorFromSet(fields.Set{"metadata.name": "shoot-info"}), + }, + telemetryNamespace: {}, + } +} + func createSelfMonitoringConfig() telemetry.SelfMonitorConfig { return telemetry.SelfMonitorConfig{ Config: selfmonitor.Config{ diff --git a/test/e2e/metrics_runtime_input_test.go b/test/e2e/metrics_runtime_input_test.go index 96e5ef875..a40dde19a 100644 --- a/test/e2e/metrics_runtime_input_test.go +++ b/test/e2e/metrics_runtime_input_test.go @@ -449,7 +449,7 @@ func backendContainsDesiredResourceAttributes(proxyClient *apiserverproxy.Client g.Expect(resp).To(HaveHTTPBody(HaveFlatMetrics( ContainElement(SatisfyAll( HaveName(Equal(metricName)), - HaveResourceAttributes(HaveKeys(ConsistOf(resourceAttributes))), + HaveResourceAttributes(HaveKeys(ContainElements(resourceAttributes))), )), ))) }, 3*periodic.TelemetryEventuallyTimeout, periodic.TelemetryInterval).Should(Succeed(), "Failed to find metric %s with resource attributes %v", metricName, resourceAttributes) diff --git a/test/integration/istio/metrics_cloud_provider_attributes_test.go b/test/integration/istio/metrics_cloud_provider_attributes_test.go new file mode 100644 index 000000000..b43506c8a --- /dev/null +++ b/test/integration/istio/metrics_cloud_provider_attributes_test.go @@ -0,0 +1,153 @@ +//go:build istio + +package istio + +import ( + "fmt" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "io" + "k8s.io/apimachinery/pkg/types" + "net/http" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/kyma-project/telemetry-manager/internal/otelcollector/ports" + testutils "github.com/kyma-project/telemetry-manager/internal/utils/test" + "github.com/kyma-project/telemetry-manager/test/testkit/apiserverproxy" + "github.com/kyma-project/telemetry-manager/test/testkit/assert" + kitk8s "github.com/kyma-project/telemetry-manager/test/testkit/k8s" + kitkyma "github.com/kyma-project/telemetry-manager/test/testkit/kyma" + . "github.com/kyma-project/telemetry-manager/test/testkit/matchers/metric" + "github.com/kyma-project/telemetry-manager/test/testkit/mocks/backend" + "github.com/kyma-project/telemetry-manager/test/testkit/mocks/prommetricgen" + "github.com/kyma-project/telemetry-manager/test/testkit/mocks/telemetrygen" + "github.com/kyma-project/telemetry-manager/test/testkit/periodic" + "github.com/kyma-project/telemetry-manager/test/testkit/suite" +) + +var _ = Describe(suite.ID(), Label(suite.LabelIntegration), Label(suite.LabelMetrics), Label(suite.LabelSetA), Ordered, func() { + Context("When metric pipelines with cloud provider resources metrics exist", Ordered, func() { + var ( + mockNs = suite.ID() + + backendName = suite.IDWithSuffix("resource-metrics") + pipelineName = suite.IDWithSuffix("resource-metrics") + backendURL string + + DeploymentName = suite.IDWithSuffix("deployment") + ) + + makeResources := func() []client.Object { + var objs []client.Object + objs = append(objs, kitk8s.NewNamespace(mockNs).K8sObject()) + objs = append(objs, kitk8s.NewConfigMap("shoot-info", "kube-system").WithData("shootName", "kyma-telemetry").WithData("provider", "k3d").WithLabel(kitk8s.PersistentLabelName, "true").K8sObject()) + + backend := backend.New(mockNs, backend.SignalTypeMetrics, backend.WithName(backendName)) + objs = append(objs, backend.K8sObjects()...) + backendURL = backend.ExportURL(proxyClient) + + pipeline := testutils.NewMetricPipelineBuilder(). + WithName(pipelineName). + WithRuntimeInput(true). + WithRuntimeInputContainerMetrics(true). + WithRuntimeInputPodMetrics(true). + WithRuntimeInputNodeMetrics(true). + WithRuntimeInputVolumeMetrics(true). + WithRuntimeInputDeploymentMetrics(false). + WithRuntimeInputStatefulSetMetrics(false). + WithRuntimeInputDaemonSetMetrics(false). + WithRuntimeInputJobMetrics(false). + WithOTLPOutput(testutils.OTLPEndpoint(backend.Endpoint())). + Build() + objs = append(objs, &pipeline) + + metricProducer := prommetricgen.New(mockNs) + + objs = append(objs, []client.Object{ + metricProducer.Pod().WithPrometheusAnnotations(prommetricgen.SchemeHTTP).K8sObject(), + metricProducer.Service().WithPrometheusAnnotations(prommetricgen.SchemeHTTP).K8sObject(), + }...) + + podSpec := telemetrygen.PodSpec(telemetrygen.SignalTypeMetrics) + + objs = append(objs, []client.Object{ + kitk8s.NewDeployment(DeploymentName, mockNs).WithPodSpec(podSpec).WithLabel("name", DeploymentName).K8sObject(), + }...) + + return objs + } + + BeforeAll(func() { + k8sObjects := makeResources() + + DeferCleanup(func() { + Expect(kitk8s.DeleteObjects(ctx, k8sClient, k8sObjects...)).Should(Succeed()) + }) + Expect(kitk8s.CreateObjects(ctx, k8sClient, k8sObjects...)).Should(Succeed()) + }) + + It("Should have healthy pipelines", func() { + assert.MetricPipelineHealthy(ctx, k8sClient, pipelineName) + }) + + It("Ensures the metric gateway deployment is ready", func() { + assert.DeploymentReady(ctx, k8sClient, kitkyma.MetricGatewayName) + }) + + It("Should have metrics backends running", func() { + assert.DeploymentReady(ctx, k8sClient, types.NamespacedName{Name: backendName, Namespace: mockNs}) + assert.ServiceReady(ctx, k8sClient, types.NamespacedName{Name: backendName, Namespace: mockNs}) + }) + + It("should have workloads created properly", func() { + assert.DeploymentReady(ctx, k8sClient, types.NamespacedName{Name: DeploymentName, Namespace: mockNs}) + }) + + It("Ensures accessibility of metric agent metrics endpoint", func() { + agentMetricsURL := proxyClient.ProxyURLForService(kitkyma.MetricAgentMetricsService.Namespace, kitkyma.MetricAgentMetricsService.Name, "metrics", ports.Metrics) + assert.EmitsOTelCollectorMetrics(proxyClient, agentMetricsURL) + }) + + Context("Pipeline A should deliver pod metrics", Ordered, func() { + It("Should deliver pod metrics with expected cloud resource attributes", func() { + backendContainsDesiredCloudResourceAttributes(proxyClient, backendURL, "cloud.region") + backendContainsDesiredCloudResourceAttributes(proxyClient, backendURL, "cloud.availability_zone") + backendContainsDesiredCloudResourceAttributes(proxyClient, backendURL, "host.type") + backendContainsDesiredCloudResourceAttributes(proxyClient, backendURL, "host.arch") + backendContainsDesiredCloudResourceAttributes(proxyClient, backendURL, "k8s.cluster.name") + backendContainsDesiredCloudResourceAttributes(proxyClient, backendURL, "cloud.provider") + }) + }) + }) +}) + +// Check for `ContainElements` for metrics present in the backend +func backendContainsMetricsDeliveredForResource(proxyClient *apiserverproxy.Client, backendExportURL string, resourceMetrics []string) { + Eventually(func(g Gomega) { + resp, err := proxyClient.Get(backendExportURL) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(resp).To(HaveHTTPStatus(http.StatusOK)) + defer resp.Body.Close() + + g.Expect(resp).To(HaveHTTPBody( + HaveFlatMetrics(HaveUniqueNamesForRuntimeScope(ContainElements(resourceMetrics))), + )) + }, 2*periodic.TelemetryEventuallyTimeout, periodic.TelemetryInterval).Should(Succeed(), "Failed to find metrics using ContainElements %v", resourceMetrics) +} + +func backendContainsDesiredCloudResourceAttributes(proxyClient *apiserverproxy.Client, backendExportURL, attribute string) { + Eventually(func(g Gomega) { + resp, err := proxyClient.Get(backendExportURL) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(resp).To(HaveHTTPStatus(http.StatusOK)) + + bodyContent, err := io.ReadAll(resp.Body) + defer resp.Body.Close() + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(bodyContent).To(HaveFlatMetrics( + ContainElement(SatisfyAll( + HaveResourceAttributes(HaveKey(attribute)), + )), + )) + }, periodic.TelemetryEventuallyTimeout, periodic.TelemetryInterval).Should(Succeed(), fmt.Sprintf("could not find metrics matching resource attribute %s", attribute)) +} diff --git a/test/testkit/k8s/configmap.go b/test/testkit/k8s/configmap.go new file mode 100644 index 000000000..a04b7d442 --- /dev/null +++ b/test/testkit/k8s/configmap.go @@ -0,0 +1,45 @@ +package k8s + +import ( + "maps" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +type ConfigMap struct { + name string + namespace string + data map[string]string + labels map[string]string +} + +func NewConfigMap(cfName, ns string) *ConfigMap { + return &ConfigMap{ + name: cfName, + namespace: ns, + data: make(map[string]string), + labels: make(map[string]string), + } +} + +func (c *ConfigMap) WithData(key, value string) *ConfigMap { + maps.Copy(c.data, map[string]string{key: value}) + return c +} + +func (c *ConfigMap) WithLabel(key, value string) *ConfigMap { + c.labels[key] = value + return c +} + +func (c *ConfigMap) K8sObject() *corev1.ConfigMap { + return &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: c.name, + Namespace: c.namespace, + Labels: c.labels, + }, + Data: c.data, + } +}