diff --git a/cmd/config-translator/translator.go b/cmd/config-translator/translator.go index ad213dfd27..0df0e6e058 100644 --- a/cmd/config-translator/translator.go +++ b/cmd/config-translator/translator.go @@ -66,6 +66,7 @@ func initFlags() { mode := translatorUtil.DetectAgentMode(*inputMode) ctx.SetMode(mode) ctx.SetKubernetesMode(translatorUtil.DetectKubernetesMode(mode)) + ctx.SetWorkloadType(translatorUtil.DetectWorkloadType()) } /** diff --git a/translator/context/context.go b/translator/context/context.go index 00b790b63b..dcddc2b88f 100644 --- a/translator/context/context.go +++ b/translator/context/context.go @@ -38,6 +38,7 @@ type Context struct { outputTomlFilePath string mode string kubernetesMode string + workloadType string shortMode string credentials map[string]string proxy map[string]string @@ -99,6 +100,10 @@ func (ctx *Context) KubernetesMode() string { return ctx.kubernetesMode } +func (ctx *Context) WorkloadType() string { + return ctx.workloadType +} + func (ctx *Context) ShortMode() string { return ctx.shortMode } @@ -150,6 +155,10 @@ func (ctx *Context) SetKubernetesMode(mode string) { } } +func (ctx *Context) SetWorkloadType(workloadType string) { + ctx.workloadType = workloadType +} + func (ctx *Context) SetCredentials(creds map[string]string) { ctx.credentials = creds } diff --git a/translator/tocwconfig/sampleConfig/otlp_metrics_cloudwatchlogs_eks_config.yaml b/translator/tocwconfig/sampleConfig/otlp_metrics_cloudwatchlogs_eks_config.yaml index 5534b3815e..203799295c 100644 --- a/translator/tocwconfig/sampleConfig/otlp_metrics_cloudwatchlogs_eks_config.yaml +++ b/translator/tocwconfig/sampleConfig/otlp_metrics_cloudwatchlogs_eks_config.yaml @@ -70,6 +70,34 @@ processors: match_type: "" initial_value: 2 max_staleness: 0s + k8sattributes/hostOtlpMetrics/cloudwatchlogs: + auth_type: serviceAccount + context: "" + exclude: + pods: + - name: jaeger-agent + - name: jaeger-collector + extract: + metadata: + - k8s.namespace.name + - k8s.pod.name + - k8s.replicaset.name + - k8s.deployment.name + - k8s.daemonset.name + - k8s.statefulset.name + - k8s.cronjob.name + - k8s.job.name + - k8s.node.name + filter: + namespace: "" + node: "" + node_from_env_var: K8S_NODE_NAME + kube_config_path: "" + passthrough: false + pod_association: + - sources: + - from: connection + name: "" receivers: otlp/metrics: protocols: @@ -121,8 +149,9 @@ service: exporters: - awsemf processors: - - awsentity/service/otlp - cumulativetodelta/hostOtlpMetrics/cloudwatchlogs + - k8sattributes/hostOtlpMetrics/cloudwatchlogs + - awsentity/service/otlp - batch/hostOtlpMetrics/cloudwatchlogs receivers: - otlp/metrics diff --git a/translator/tocwconfig/sampleConfig/otlp_metrics_eks_config.yaml b/translator/tocwconfig/sampleConfig/otlp_metrics_eks_config.yaml index a55f30cb6d..3fecc84b86 100644 --- a/translator/tocwconfig/sampleConfig/otlp_metrics_eks_config.yaml +++ b/translator/tocwconfig/sampleConfig/otlp_metrics_eks_config.yaml @@ -55,6 +55,34 @@ processors: imds_retries: 1 middleware: agenthealth/statuscode refresh_interval_seconds: 0s + k8sattributes/hostOtlpMetrics: + auth_type: serviceAccount + context: "" + exclude: + pods: + - name: jaeger-agent + - name: jaeger-collector + extract: + metadata: + - k8s.namespace.name + - k8s.pod.name + - k8s.replicaset.name + - k8s.deployment.name + - k8s.daemonset.name + - k8s.statefulset.name + - k8s.cronjob.name + - k8s.job.name + - k8s.node.name + filter: + namespace: "" + node: "" + node_from_env_var: K8S_NODE_NAME + kube_config_path: "" + passthrough: false + pod_association: + - sources: + - from: connection + name: "" receivers: otlp/metrics: protocols: @@ -106,9 +134,10 @@ service: exporters: - awscloudwatch processors: - - awsentity/service/otlp - cumulativetodelta/hostOtlpMetrics - ec2tagger + - k8sattributes/hostOtlpMetrics + - awsentity/service/otlp receivers: - otlp/metrics telemetry: diff --git a/translator/tocwconfig/tocwconfig_test.go b/translator/tocwconfig/tocwconfig_test.go index 7b45433150..3cc5ab85b7 100644 --- a/translator/tocwconfig/tocwconfig_test.go +++ b/translator/tocwconfig/tocwconfig_test.go @@ -297,6 +297,7 @@ func TestOtlpMetricsConfigKubernetes(t *testing.T) { resetContext(t) context.CurrentContext().SetMode(config.ModeEC2) context.CurrentContext().SetKubernetesMode(config.ModeK8sEC2) + context.CurrentContext().SetWorkloadType("DaemonSet") context.CurrentContext().SetRunInContainer(true) checkTranslation(t, "otlp_metrics_eks_config", "linux", nil, "") checkTranslation(t, "otlp_metrics_eks_config", "darwin", nil, "") @@ -315,6 +316,7 @@ func TestOtlpMetricsEmfConfigKubernetes(t *testing.T) { resetContext(t) context.CurrentContext().SetMode(config.ModeEC2) context.CurrentContext().SetKubernetesMode(config.ModeK8sEC2) + context.CurrentContext().SetWorkloadType("DaemonSet") context.CurrentContext().SetRunInContainer(true) t.Setenv(config.HOST_NAME, "host_name_from_env") checkTranslation(t, "otlp_metrics_cloudwatchlogs_eks_config", "linux", nil, "") diff --git a/translator/translate/otel/pipeline/host/translator.go b/translator/translate/otel/pipeline/host/translator.go index 2e8502b536..ff0aa6f6e3 100644 --- a/translator/translate/otel/pipeline/host/translator.go +++ b/translator/translate/otel/pipeline/host/translator.go @@ -24,6 +24,7 @@ import ( "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/batchprocessor" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/cumulativetodeltaprocessor" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/ec2taggerprocessor" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/k8sattributesprocessor" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/metricsdecorator" "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/rollupprocessor" "github.com/aws/amazon-cloudwatch-agent/translator/util/ecsutil" @@ -106,6 +107,7 @@ func (t translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators, case common.PipelineNameHostOtlpMetrics: // TODO: For OTLP, the entity processor is only on K8S for now. Eventually this should be added to EC2 if currentContext.KubernetesMode() != "" { + translators.Processors.Set(k8sattributesprocessor.NewTranslatorWithName(t.name)) entityProcessor = awsentity.NewTranslatorWithEntityType(awsentity.Service, common.OtlpKey, false) } case common.PipelineNameHostCustomMetrics: diff --git a/translator/translate/otel/pipeline/host/translator_test.go b/translator/translate/otel/pipeline/host/translator_test.go index 8a23380093..cba19971d8 100644 --- a/translator/translate/otel/pipeline/host/translator_test.go +++ b/translator/translate/otel/pipeline/host/translator_test.go @@ -132,7 +132,7 @@ func TestTranslator(t *testing.T) { want: &want{ pipelineID: "metrics/hostOtlpMetrics", receivers: []string{"nop", "other"}, - processors: []string{"cumulativetodelta/hostOtlpMetrics", "awsentity/service/otlp"}, + processors: []string{"cumulativetodelta/hostOtlpMetrics", "k8sattributes/hostOtlpMetrics", "awsentity/service/otlp"}, exporters: []string{"awscloudwatch"}, extensions: []string{"agenthealth/metrics", "agenthealth/statuscode"}, }, @@ -190,7 +190,7 @@ func TestTranslator(t *testing.T) { want: &want{ pipelineID: "metrics/hostOtlpMetrics/cloudwatchlogs", receivers: []string{"nop", "other"}, - processors: []string{"cumulativetodelta/hostOtlpMetrics/cloudwatchlogs", "awsentity/service/otlp", "batch/hostOtlpMetrics/cloudwatchlogs"}, + processors: []string{"cumulativetodelta/hostOtlpMetrics/cloudwatchlogs", "k8sattributes/hostOtlpMetrics/cloudwatchlogs", "awsentity/service/otlp", "batch/hostOtlpMetrics/cloudwatchlogs"}, exporters: []string{"awsemf"}, extensions: []string{"agenthealth/logs", "agenthealth/statuscode"}, }, diff --git a/translator/translate/otel/processor/awsentity/translator.go b/translator/translate/otel/processor/awsentity/translator.go index ed81c702d0..b9fe6d5881 100644 --- a/translator/translate/otel/processor/awsentity/translator.go +++ b/translator/translate/otel/processor/awsentity/translator.go @@ -4,6 +4,7 @@ package awsentity import ( + "os" "strings" "go.opentelemetry.io/collector/component" @@ -55,8 +56,10 @@ func (t *translator) ID() component.ID { } func (t *translator) Translate(conf *confmap.Conf) (component.Config, error) { + ctx := context.CurrentContext() + // Do not send entity for ECS - if context.CurrentContext().RunInContainer() && ecsutil.GetECSUtilSingleton().IsECS() { + if ctx.RunInContainer() && ecsutil.GetECSUtilSingleton().IsECS() { return nil, nil } @@ -70,32 +73,47 @@ func (t *translator) Translate(conf *confmap.Conf) (component.Config, error) { cfg.ScrapeDatapointAttribute = true } - hostedInConfigKey := common.ConfigKey(common.LogsKey, common.MetricsCollectedKey, common.AppSignals, "hosted_in") - hostedIn, hostedInConfigured := common.GetString(conf, hostedInConfigKey) - if !hostedInConfigured { - hostedInConfigKey = common.ConfigKey(common.LogsKey, common.MetricsCollectedKey, common.AppSignalsFallback, "hosted_in") - hostedIn, hostedInConfigured = common.GetString(conf, hostedInConfigKey) - } - if common.IsAppSignalsKubernetes() { - if !hostedInConfigured { - hostedIn = util.GetClusterNameFromEc2Tagger() - } - } + cfg.KubernetesMode = ctx.KubernetesMode() //TODO: This logic is more or less identical to what AppSignals does. This should be moved to a common place for reuse - ctx := context.CurrentContext() - mode := ctx.KubernetesMode() - cfg.KubernetesMode = mode - - mode = ctx.Mode() if cfg.KubernetesMode != "" { - cfg.ClusterName = hostedIn + searchKeys := []string{ + common.ConfigKey(common.LogsKey, common.MetricsCollectedKey, common.AppSignals, "hosted_in"), + common.ConfigKey(common.LogsKey, common.MetricsCollectedKey, common.AppSignalsFallback, "hosted_in"), + common.ConfigKey(common.LogsKey, common.MetricsCollectedKey, common.KubernetesKey, "cluster_name"), + } + + var clusterName string + var found bool + + for _, path := range searchKeys { + val, ok := common.GetString(conf, path) + if ok && val != "" { + clusterName = val + found = true + break + } + } + + if !found { + envVarClusterName := os.Getenv("K8S_CLUSTER_NAME") + if envVarClusterName != "" { + clusterName = envVarClusterName + found = true + } + } + + if !found { + clusterName = util.GetClusterNameFromEc2Tagger() + } + + cfg.ClusterName = clusterName } // We want to keep platform config variable to be // anything that is non-Kubernetes related so the // processor can perform different logics for EKS // in EC2 or Non-EC2 - cfg.Platform = mode + cfg.Platform = ctx.Mode() return cfg, nil } diff --git a/translator/translate/otel/processor/awsentity/translator_test.go b/translator/translate/otel/processor/awsentity/translator_test.go index fb75ca8e1c..20bb59ee3c 100644 --- a/translator/translate/otel/processor/awsentity/translator_test.go +++ b/translator/translate/otel/processor/awsentity/translator_test.go @@ -20,6 +20,7 @@ func TestTranslate(t *testing.T) { input map[string]interface{} mode string kubernetesMode string + envClusterName string want *awsentity.Config }{ "OnlyProfile": { @@ -39,6 +40,35 @@ func TestTranslate(t *testing.T) { Platform: config.ModeEC2, }, }, + "KubernetesUnderLogs": { + input: map[string]interface{}{ + "logs": map[string]interface{}{ + "metrics_collected": map[string]interface{}{ + "kubernetes": map[string]interface{}{ + "cluster_name": "ci-logs", + }, + }, + }, + }, + mode: config.ModeEC2, + kubernetesMode: config.ModeEKS, + want: &awsentity.Config{ + ClusterName: "ci-logs", + KubernetesMode: config.ModeEKS, + Platform: config.ModeEC2, + }, + }, + "EnvVar": { + input: map[string]interface{}{}, + mode: config.ModeEC2, + kubernetesMode: config.ModeEKS, + envClusterName: "env-cluster", + want: &awsentity.Config{ + ClusterName: "env-cluster", + KubernetesMode: config.ModeEKS, + Platform: config.ModeEC2, + }, + }, "ECS": { input: map[string]interface{}{}, mode: config.ModeECS, @@ -56,6 +86,11 @@ func TestTranslate(t *testing.T) { context.CurrentContext().SetMode(testCase.mode) context.CurrentContext().SetKubernetesMode(testCase.kubernetesMode) } + if testCase.envClusterName != "" { + t.Setenv("K8S_CLUSTER_NAME", testCase.envClusterName) + } else { + t.Setenv("K8S_CLUSTER_NAME", "") + } tt := NewTranslator() assert.Equal(t, "awsentity", tt.ID().String()) conf := confmap.NewFromStringMap(testCase.input) diff --git a/translator/translate/otel/processor/k8sattributesprocessor/k8sattributes_agent.yaml b/translator/translate/otel/processor/k8sattributesprocessor/k8sattributes_agent.yaml new file mode 100644 index 0000000000..09469fb865 --- /dev/null +++ b/translator/translate/otel/processor/k8sattributesprocessor/k8sattributes_agent.yaml @@ -0,0 +1,16 @@ +pod_association: + - sources: + - from: connection +extract: + metadata: + - k8s.namespace.name + - k8s.pod.name + - k8s.replicaset.name + - k8s.deployment.name + - k8s.daemonset.name + - k8s.statefulset.name + - k8s.cronjob.name + - k8s.job.name + - k8s.node.name +filter: + node_from_env_var: K8S_NODE_NAME \ No newline at end of file diff --git a/translator/translate/otel/processor/k8sattributesprocessor/k8sattributes_gateway.yaml b/translator/translate/otel/processor/k8sattributesprocessor/k8sattributes_gateway.yaml new file mode 100644 index 0000000000..aa2ab8b119 --- /dev/null +++ b/translator/translate/otel/processor/k8sattributesprocessor/k8sattributes_gateway.yaml @@ -0,0 +1,14 @@ +pod_association: + - sources: + - from: connection +extract: + metadata: + - k8s.namespace.name + - k8s.pod.name + - k8s.replicaset.name + - k8s.deployment.name + - k8s.daemonset.name + - k8s.statefulset.name + - k8s.cronjob.name + - k8s.job.name + - k8s.node.name \ No newline at end of file diff --git a/translator/translate/otel/processor/k8sattributesprocessor/translator.go b/translator/translate/otel/processor/k8sattributesprocessor/translator.go new file mode 100644 index 0000000000..4893ef80c4 --- /dev/null +++ b/translator/translate/otel/processor/k8sattributesprocessor/translator.go @@ -0,0 +1,57 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package k8sattributesprocessor + +import ( + _ "embed" + "fmt" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor" + + "github.com/aws/amazon-cloudwatch-agent/translator/context" + "github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/processor" +) + +//go:embed k8sattributes_agent.yaml +var k8sAttributesAgentConfig string + +//go:embed k8sattributes_gateway.yaml +var k8sAttributesGatewayConfig string + +type translator struct { + name string + factory processor.Factory +} + +var _ common.Translator[component.Config] = (*translator)(nil) + +func NewTranslatorWithName(name string) common.Translator[component.Config] { + return &translator{name, k8sattributesprocessor.NewFactory()} +} + +func (t *translator) ID() component.ID { + return component.NewIDWithName(t.factory.Type(), t.name) +} + +func (t *translator) Translate(conf *confmap.Conf) (component.Config, error) { + cfg := t.factory.CreateDefaultConfig().(*k8sattributesprocessor.Config) + currentContext := context.CurrentContext() + + if currentContext.KubernetesMode() == "" { + return nil, fmt.Errorf("k8sattributesprocessor is not supported in this context") + } + + switch workloadType := currentContext.WorkloadType(); workloadType { + case "DaemonSet": + return common.GetYamlFileToYamlConfig(cfg, k8sAttributesAgentConfig) + case "Deployment", "StatefulSet": + return common.GetYamlFileToYamlConfig(cfg, k8sAttributesGatewayConfig) + default: + return nil, fmt.Errorf("k8sattributesprocessor is not supported for workload type: %s", workloadType) + } +} diff --git a/translator/translate/otel/processor/k8sattributesprocessor/translator_test.go b/translator/translate/otel/processor/k8sattributesprocessor/translator_test.go new file mode 100644 index 0000000000..4f77f9e96d --- /dev/null +++ b/translator/translate/otel/processor/k8sattributesprocessor/translator_test.go @@ -0,0 +1,138 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package k8sattributesprocessor + +import ( + "fmt" + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor" + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/confmap" + + "github.com/aws/amazon-cloudwatch-agent/translator/config" + "github.com/aws/amazon-cloudwatch-agent/translator/context" +) + +func TestTranslate(t *testing.T) { + testCases := map[string]struct { + input map[string]interface{} + mode string + kubernetesMode string + workloadType string + want *k8sattributesprocessor.Config + wantErr error + }{ + "DaemonSet": { + input: map[string]interface{}{}, + mode: config.ModeEC2, + kubernetesMode: config.ModeEKS, + workloadType: "DaemonSet", + want: &k8sattributesprocessor.Config{ + Association: []k8sattributesprocessor.PodAssociationConfig{ + { + Sources: []k8sattributesprocessor.PodAssociationSourceConfig{ + { + From: "connection", + }, + }, + }, + }, + Extract: k8sattributesprocessor.ExtractConfig{ + Metadata: []string{"k8s.namespace.name", "k8s.pod.name", "k8s.replicaset.name", "k8s.deployment.name", "k8s.daemonset.name", "k8s.statefulset.name", "k8s.cronjob.name", "k8s.job.name", "k8s.node.name"}, + }, + Filter: k8sattributesprocessor.FilterConfig{ + NodeFromEnvVar: "K8S_NODE_NAME", + }, + }, + }, + "Deployment": { + input: map[string]interface{}{}, + mode: config.ModeEC2, + kubernetesMode: config.ModeEKS, + workloadType: "Deployment", + want: &k8sattributesprocessor.Config{ + Association: []k8sattributesprocessor.PodAssociationConfig{ + { + Sources: []k8sattributesprocessor.PodAssociationSourceConfig{ + { + From: "connection", + }, + }, + }, + }, + Extract: k8sattributesprocessor.ExtractConfig{ + Metadata: []string{"k8s.namespace.name", "k8s.pod.name", "k8s.replicaset.name", "k8s.deployment.name", "k8s.daemonset.name", "k8s.statefulset.name", "k8s.cronjob.name", "k8s.job.name", "k8s.node.name"}, + }, + Filter: k8sattributesprocessor.FilterConfig{ + NodeFromEnvVar: "", + }, + }, + }, + "StatefulSet": { + input: map[string]interface{}{}, + mode: config.ModeEC2, + kubernetesMode: config.ModeEKS, + workloadType: "StatefulSet", + want: &k8sattributesprocessor.Config{ + Association: []k8sattributesprocessor.PodAssociationConfig{ + { + Sources: []k8sattributesprocessor.PodAssociationSourceConfig{ + { + From: "connection", + }, + }, + }, + }, + Extract: k8sattributesprocessor.ExtractConfig{ + Metadata: []string{"k8s.namespace.name", "k8s.pod.name", "k8s.replicaset.name", "k8s.deployment.name", "k8s.daemonset.name", "k8s.statefulset.name", "k8s.cronjob.name", "k8s.job.name", "k8s.node.name"}, + }, + Filter: k8sattributesprocessor.FilterConfig{ + NodeFromEnvVar: "", + }, + }, + }, + "NotKubernetes": { + input: map[string]interface{}{}, + mode: config.ModeEC2, + kubernetesMode: "", + workloadType: "Unknown", + wantErr: fmt.Errorf("k8sattributesprocessor is not supported in this context"), + }, + "Unknown": { + input: map[string]interface{}{}, + mode: config.ModeEC2, + kubernetesMode: config.ModeEKS, + workloadType: "Unknown", + wantErr: fmt.Errorf("k8sattributesprocessor is not supported for workload type: Unknown"), + }, + } + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + context.CurrentContext().SetMode(testCase.mode) + context.CurrentContext().SetKubernetesMode(testCase.kubernetesMode) + context.CurrentContext().SetWorkloadType(testCase.workloadType) + tt := NewTranslatorWithName("") + assert.Equal(t, "k8sattributes", tt.ID().String()) + conf := confmap.NewFromStringMap(testCase.input) + got, err := tt.Translate(conf) + + if testCase.wantErr != nil { + assert.Error(t, err) + assert.Equal(t, testCase.wantErr.Error(), err.Error()) + return + } + + assert.NoError(t, err) + if testCase.want == nil { + assert.Nil(t, got) + } else { + expect := got.(*k8sattributesprocessor.Config) + assert.Equal(t, testCase.want.Association, expect.Association) + assert.Equal(t, testCase.want.Extract, expect.Extract) + assert.Equal(t, testCase.want.Filter, expect.Filter) + } + }) + } +} diff --git a/translator/util/eksdetector/eksdetector.go b/translator/util/eksdetector/eksdetector.go index 58830883d6..ab8d19a778 100644 --- a/translator/util/eksdetector/eksdetector.go +++ b/translator/util/eksdetector/eksdetector.go @@ -6,6 +6,7 @@ package eksdetector import ( "context" "fmt" + "os" "sync" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -15,6 +16,7 @@ import ( type Detector interface { getConfigMap(namespace string, name string) (map[string]string, error) + getWorkloadType() (string, error) } type EksDetector struct { @@ -22,8 +24,9 @@ type EksDetector struct { } type IsEKSCache struct { - Value bool - Err error + Value bool + Workload string + Err error } const ( @@ -60,6 +63,7 @@ var ( once.Do(func() { var errors error var value bool + var workloadType string // Create eks detector eksDetector, err := NewDetector() if err != nil { @@ -72,8 +76,11 @@ var ( if err == nil { value = awsAuth != nil } + + // Get workload type + workloadType, _ = eksDetector.getWorkloadType() } - isEKSCacheSingleton = IsEKSCache{Value: value, Err: errors} + isEKSCacheSingleton = IsEKSCache{Value: value, Workload: workloadType, Err: errors} }) return isEKSCacheSingleton @@ -90,6 +97,33 @@ func (d *EksDetector) getConfigMap(namespace string, name string) (map[string]st return configMap.Data, nil } +func (d *EksDetector) getWorkloadType() (string, error) { + podName := os.Getenv("K8S_POD_NAME") + namespace := os.Getenv("K8S_NAMESPACE") + + if podName == "" || namespace == "" { + return "", fmt.Errorf("K8S_POD_NAME/K8S_NAMESPACE environment variables not set") + } + + pod, err := d.Clientset.CoreV1().Pods(namespace).Get(context.TODO(), podName, metav1.GetOptions{}) + if err != nil { + return "", fmt.Errorf("failed to get pod: %v", err) + } + + for _, owner := range pod.OwnerReferences { + switch owner.Kind { + case "DaemonSet": + return "DaemonSet", nil + case "StatefulSet": + return "StatefulSet", nil + case "ReplicaSet": + return "Deployment", nil + } + } + + return "Unknown", nil +} + func getClient() (kubernetes.Interface, error) { //Get cluster config confs, err := getInClusterConfig() diff --git a/translator/util/eksdetector/eksdetector_test.go b/translator/util/eksdetector/eksdetector_test.go index a5dafaf446..6c74fb0085 100644 --- a/translator/util/eksdetector/eksdetector_test.go +++ b/translator/util/eksdetector/eksdetector_test.go @@ -5,6 +5,7 @@ package eksdetector import ( "fmt" + "os" "testing" "github.com/stretchr/testify/assert" @@ -82,6 +83,134 @@ func Test_getConfigMap(t *testing.T) { assert.NotNil(t, res) } +func Test_getWorkloadType_EnvNotSet(t *testing.T) { + client := fake.NewSimpleClientset() + testDetector := &EksDetector{Clientset: client} + + workloadType, err := testDetector.getWorkloadType() + assert.Error(t, err) + assert.Contains(t, err.Error(), "K8S_POD_NAME/K8S_NAMESPACE environment variables not set") + assert.Equal(t, "", workloadType) +} + +func Test_getWorkloadType_PodNotFound(t *testing.T) { + os.Setenv("K8S_POD_NAME", "nonexistent-pod") + os.Setenv("K8S_NAMESPACE", "default") + + client := fake.NewSimpleClientset() + testDetector := &EksDetector{Clientset: client} + + workloadType, err := testDetector.getWorkloadType() + assert.Error(t, err) + assert.Equal(t, "", workloadType) +} + +func Test_getWorkloadType_DaemonSet(t *testing.T) { + podName := "test-pod" + namespace := "default" + os.Setenv("K8S_POD_NAME", podName) + os.Setenv("K8S_NAMESPACE", namespace) + + // Create a pod with an OwnerReference of kind DaemonSet + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Namespace: namespace, + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "DaemonSet", + Name: "test-daemonset", + }, + }, + }, + } + + client := fake.NewSimpleClientset(pod) + testDetector := &EksDetector{Clientset: client} + + workloadType, err := testDetector.getWorkloadType() + assert.NoError(t, err) + assert.Equal(t, "DaemonSet", workloadType) +} + +func Test_getWorkloadType_StatefulSet(t *testing.T) { + podName := "test-pod" + namespace := "default" + os.Setenv("K8S_POD_NAME", podName) + os.Setenv("K8S_NAMESPACE", namespace) + + // Create a pod with an OwnerReference of kind StatefulSet + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Namespace: namespace, + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "StatefulSet", + Name: "test-statefulset", + }, + }, + }, + } + + client := fake.NewSimpleClientset(pod) + testDetector := &EksDetector{Clientset: client} + + workloadType, err := testDetector.getWorkloadType() + assert.NoError(t, err) + assert.Equal(t, "StatefulSet", workloadType) +} + +func Test_getWorkloadType_ReplicaSet(t *testing.T) { + podName := "test-pod" + namespace := "default" + os.Setenv("K8S_POD_NAME", podName) + os.Setenv("K8S_NAMESPACE", namespace) + + // Create a pod with an OwnerReference of kind ReplicaSet + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Namespace: namespace, + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "ReplicaSet", + Name: "test-replicaset", + }, + }, + }, + } + + client := fake.NewSimpleClientset(pod) + testDetector := &EksDetector{Clientset: client} + + workloadType, err := testDetector.getWorkloadType() + assert.NoError(t, err) + assert.Equal(t, "Deployment", workloadType) +} + +func Test_getWorkloadType_Unknown(t *testing.T) { + podName := "test-pod" + namespace := "default" + os.Setenv("K8S_POD_NAME", podName) + os.Setenv("K8S_NAMESPACE", namespace) + + // Create a pod with no OwnerReferences + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Namespace: namespace, + }, + } + + client := fake.NewSimpleClientset(pod) + testDetector := &EksDetector{Clientset: client} + + workloadType, err := testDetector.getWorkloadType() + assert.NoError(t, err) + assert.Equal(t, "Unknown", workloadType) +} + func Test_getClientError(t *testing.T) { //InClusterConfig error getInClusterConfig = func() (*rest.Config, error) { diff --git a/translator/util/eksdetector/eksdetectortestutil.go b/translator/util/eksdetector/eksdetectortestutil.go index c3b14a48c5..da093a0a75 100644 --- a/translator/util/eksdetector/eksdetectortestutil.go +++ b/translator/util/eksdetector/eksdetectortestutil.go @@ -44,3 +44,8 @@ func (detector *MockDetector) getConfigMap(namespace string, name string) (map[s args := detector.Called(namespace, name) return args.Get(0).(map[string]string), args.Error(1) } + +func (detector *MockDetector) getWorkloadType() (string, error) { + args := detector.Called() + return args.Get(0).(string), args.Error(1) +} diff --git a/translator/util/sdkutil.go b/translator/util/sdkutil.go index d0bb20510b..4a7cb577fb 100644 --- a/translator/util/sdkutil.go +++ b/translator/util/sdkutil.go @@ -80,6 +80,16 @@ func DetectKubernetesMode(configuredMode string) string { } +func DetectWorkloadType() string { + isEKS := IsEKS() + + if isEKS.Err != nil { + return "" + } + + return isEKS.Workload +} + func SDKRegionWithCredsMap(mode string, credsConfig map[string]string) (region string) { credsMap := GetCredentials(mode, credsConfig) diff --git a/translator/util/sdkutil_test.go b/translator/util/sdkutil_test.go index 94d4d4f3c6..0db425a3a8 100644 --- a/translator/util/sdkutil_test.go +++ b/translator/util/sdkutil_test.go @@ -56,3 +56,19 @@ func TestDetectKubernetesMode(t *testing.T) { }) } } + +func TestDetectWorkloadType(t *testing.T) { + t.Run("IsEKS returns an error", func(t *testing.T) { + IsEKS = func() eksdetector.IsEKSCache { + return eksdetector.IsEKSCache{Err: fmt.Errorf("test error")} + } + require.Equal(t, "", DetectWorkloadType(), "Expected empty workload when IsEKS returns an error") + }) + + t.Run("IsEKS returns a valid workload", func(t *testing.T) { + IsEKS = func() eksdetector.IsEKSCache { + return eksdetector.IsEKSCache{Err: nil, Workload: "DaemonSet"} + } + require.Equal(t, "DaemonSet", DetectWorkloadType(), "Expected workload 'DaemonSet' when IsEKS returns no error") + }) +}