Skip to content

Commit

Permalink
Add k8sattributesprocessor to k8s otlp pipeline with workload type de…
Browse files Browse the repository at this point in the history
…tection
  • Loading branch information
musa-asad committed Feb 3, 2025
1 parent 77aa32a commit 58fa4a9
Show file tree
Hide file tree
Showing 18 changed files with 570 additions and 26 deletions.
1 change: 1 addition & 0 deletions cmd/config-translator/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func initFlags() {
mode := translatorUtil.DetectAgentMode(*inputMode)
ctx.SetMode(mode)
ctx.SetKubernetesMode(translatorUtil.DetectKubernetesMode(mode))
ctx.SetWorkloadType(translatorUtil.DetectWorkloadType())
}

/**
Expand Down
9 changes: 9 additions & 0 deletions translator/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Context struct {
outputTomlFilePath string
mode string
kubernetesMode string
workloadType string
shortMode string
credentials map[string]string
proxy map[string]string
Expand Down Expand Up @@ -99,6 +100,10 @@ func (ctx *Context) KubernetesMode() string {
return ctx.kubernetesMode
}

func (ctx *Context) WorkloadType() string {
return ctx.workloadType
}

func (ctx *Context) ShortMode() string {
return ctx.shortMode
}
Expand Down Expand Up @@ -150,6 +155,10 @@ func (ctx *Context) SetKubernetesMode(mode string) {
}
}

func (ctx *Context) SetWorkloadType(workloadType string) {
ctx.workloadType = workloadType
}

func (ctx *Context) SetCredentials(creds map[string]string) {
ctx.credentials = creds
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,34 @@ processors:
match_type: ""
initial_value: 2
max_staleness: 0s
k8sattributes/hostOtlpMetrics/cloudwatchlogs:
auth_type: serviceAccount
context: ""
exclude:
pods:
- name: jaeger-agent
- name: jaeger-collector
extract:
metadata:
- k8s.namespace.name
- k8s.pod.name
- k8s.replicaset.name
- k8s.deployment.name
- k8s.daemonset.name
- k8s.statefulset.name
- k8s.cronjob.name
- k8s.job.name
- k8s.node.name
filter:
namespace: ""
node: ""
node_from_env_var: K8S_NODE_NAME
kube_config_path: ""
passthrough: false
pod_association:
- sources:
- from: connection
name: ""
receivers:
otlp/metrics:
protocols:
Expand Down Expand Up @@ -121,8 +149,9 @@ service:
exporters:
- awsemf
processors:
- awsentity/service/otlp
- cumulativetodelta/hostOtlpMetrics/cloudwatchlogs
- k8sattributes/hostOtlpMetrics/cloudwatchlogs
- awsentity/service/otlp
- batch/hostOtlpMetrics/cloudwatchlogs
receivers:
- otlp/metrics
Expand Down
31 changes: 30 additions & 1 deletion translator/tocwconfig/sampleConfig/otlp_metrics_eks_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,34 @@ processors:
imds_retries: 1
middleware: agenthealth/statuscode
refresh_interval_seconds: 0s
k8sattributes/hostOtlpMetrics:
auth_type: serviceAccount
context: ""
exclude:
pods:
- name: jaeger-agent
- name: jaeger-collector
extract:
metadata:
- k8s.namespace.name
- k8s.pod.name
- k8s.replicaset.name
- k8s.deployment.name
- k8s.daemonset.name
- k8s.statefulset.name
- k8s.cronjob.name
- k8s.job.name
- k8s.node.name
filter:
namespace: ""
node: ""
node_from_env_var: K8S_NODE_NAME
kube_config_path: ""
passthrough: false
pod_association:
- sources:
- from: connection
name: ""
receivers:
otlp/metrics:
protocols:
Expand Down Expand Up @@ -106,9 +134,10 @@ service:
exporters:
- awscloudwatch
processors:
- awsentity/service/otlp
- cumulativetodelta/hostOtlpMetrics
- ec2tagger
- k8sattributes/hostOtlpMetrics
- awsentity/service/otlp
receivers:
- otlp/metrics
telemetry:
Expand Down
2 changes: 2 additions & 0 deletions translator/tocwconfig/tocwconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ func TestOtlpMetricsConfigKubernetes(t *testing.T) {
resetContext(t)
context.CurrentContext().SetMode(config.ModeEC2)
context.CurrentContext().SetKubernetesMode(config.ModeK8sEC2)
context.CurrentContext().SetWorkloadType("DaemonSet")
context.CurrentContext().SetRunInContainer(true)
checkTranslation(t, "otlp_metrics_eks_config", "linux", nil, "")
checkTranslation(t, "otlp_metrics_eks_config", "darwin", nil, "")
Expand All @@ -315,6 +316,7 @@ func TestOtlpMetricsEmfConfigKubernetes(t *testing.T) {
resetContext(t)
context.CurrentContext().SetMode(config.ModeEC2)
context.CurrentContext().SetKubernetesMode(config.ModeK8sEC2)
context.CurrentContext().SetWorkloadType("DaemonSet")
context.CurrentContext().SetRunInContainer(true)
t.Setenv(config.HOST_NAME, "host_name_from_env")
checkTranslation(t, "otlp_metrics_cloudwatchlogs_eks_config", "linux", nil, "")
Expand Down
2 changes: 2 additions & 0 deletions translator/translate/otel/pipeline/host/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/batchprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/cumulativetodeltaprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/ec2taggerprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/k8sattributesprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/metricsdecorator"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/rollupprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/util/ecsutil"
Expand Down Expand Up @@ -106,6 +107,7 @@ func (t translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators,
case common.PipelineNameHostOtlpMetrics:
// TODO: For OTLP, the entity processor is only on K8S for now. Eventually this should be added to EC2
if currentContext.KubernetesMode() != "" {
translators.Processors.Set(k8sattributesprocessor.NewTranslatorWithName(t.name))
entityProcessor = awsentity.NewTranslatorWithEntityType(awsentity.Service, common.OtlpKey, false)
}
case common.PipelineNameHostCustomMetrics:
Expand Down
4 changes: 2 additions & 2 deletions translator/translate/otel/pipeline/host/translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestTranslator(t *testing.T) {
want: &want{
pipelineID: "metrics/hostOtlpMetrics",
receivers: []string{"nop", "other"},
processors: []string{"cumulativetodelta/hostOtlpMetrics", "awsentity/service/otlp"},
processors: []string{"cumulativetodelta/hostOtlpMetrics", "k8sattributes/hostOtlpMetrics", "awsentity/service/otlp"},
exporters: []string{"awscloudwatch"},
extensions: []string{"agenthealth/metrics", "agenthealth/statuscode"},
},
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestTranslator(t *testing.T) {
want: &want{
pipelineID: "metrics/hostOtlpMetrics/cloudwatchlogs",
receivers: []string{"nop", "other"},
processors: []string{"cumulativetodelta/hostOtlpMetrics/cloudwatchlogs", "awsentity/service/otlp", "batch/hostOtlpMetrics/cloudwatchlogs"},
processors: []string{"cumulativetodelta/hostOtlpMetrics/cloudwatchlogs", "k8sattributes/hostOtlpMetrics/cloudwatchlogs", "awsentity/service/otlp", "batch/hostOtlpMetrics/cloudwatchlogs"},
exporters: []string{"awsemf"},
extensions: []string{"agenthealth/logs", "agenthealth/statuscode"},
},
Expand Down
56 changes: 37 additions & 19 deletions translator/translate/otel/processor/awsentity/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package awsentity

import (
"os"
"strings"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -55,8 +56,10 @@ func (t *translator) ID() component.ID {
}

func (t *translator) Translate(conf *confmap.Conf) (component.Config, error) {
ctx := context.CurrentContext()

// Do not send entity for ECS
if context.CurrentContext().RunInContainer() && ecsutil.GetECSUtilSingleton().IsECS() {
if ctx.RunInContainer() && ecsutil.GetECSUtilSingleton().IsECS() {
return nil, nil
}

Expand All @@ -70,32 +73,47 @@ func (t *translator) Translate(conf *confmap.Conf) (component.Config, error) {
cfg.ScrapeDatapointAttribute = true
}

hostedInConfigKey := common.ConfigKey(common.LogsKey, common.MetricsCollectedKey, common.AppSignals, "hosted_in")
hostedIn, hostedInConfigured := common.GetString(conf, hostedInConfigKey)
if !hostedInConfigured {
hostedInConfigKey = common.ConfigKey(common.LogsKey, common.MetricsCollectedKey, common.AppSignalsFallback, "hosted_in")
hostedIn, hostedInConfigured = common.GetString(conf, hostedInConfigKey)
}
if common.IsAppSignalsKubernetes() {
if !hostedInConfigured {
hostedIn = util.GetClusterNameFromEc2Tagger()
}
}
cfg.KubernetesMode = ctx.KubernetesMode()

//TODO: This logic is more or less identical to what AppSignals does. This should be moved to a common place for reuse
ctx := context.CurrentContext()
mode := ctx.KubernetesMode()
cfg.KubernetesMode = mode

mode = ctx.Mode()
if cfg.KubernetesMode != "" {
cfg.ClusterName = hostedIn
searchKeys := []string{
common.ConfigKey(common.LogsKey, common.MetricsCollectedKey, common.AppSignals, "hosted_in"),
common.ConfigKey(common.LogsKey, common.MetricsCollectedKey, common.AppSignalsFallback, "hosted_in"),
common.ConfigKey(common.LogsKey, common.MetricsCollectedKey, common.KubernetesKey, "cluster_name"),
}

var clusterName string
var found bool

for _, path := range searchKeys {
val, ok := common.GetString(conf, path)
if ok && val != "" {
clusterName = val
found = true
break
}
}

if !found {
envVarClusterName := os.Getenv("K8S_CLUSTER_NAME")
if envVarClusterName != "" {
clusterName = envVarClusterName
found = true
}
}

if !found {
clusterName = util.GetClusterNameFromEc2Tagger()
}

cfg.ClusterName = clusterName
}

// We want to keep platform config variable to be
// anything that is non-Kubernetes related so the
// processor can perform different logics for EKS
// in EC2 or Non-EC2
cfg.Platform = mode
cfg.Platform = ctx.Mode()
return cfg, nil
}
35 changes: 35 additions & 0 deletions translator/translate/otel/processor/awsentity/translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func TestTranslate(t *testing.T) {
input map[string]interface{}
mode string
kubernetesMode string
envClusterName string
want *awsentity.Config
}{
"OnlyProfile": {
Expand All @@ -39,6 +40,35 @@ func TestTranslate(t *testing.T) {
Platform: config.ModeEC2,
},
},
"KubernetesUnderLogs": {
input: map[string]interface{}{
"logs": map[string]interface{}{
"metrics_collected": map[string]interface{}{
"kubernetes": map[string]interface{}{
"cluster_name": "ci-logs",
},
},
},
},
mode: config.ModeEC2,
kubernetesMode: config.ModeEKS,
want: &awsentity.Config{
ClusterName: "ci-logs",
KubernetesMode: config.ModeEKS,
Platform: config.ModeEC2,
},
},
"EnvVar": {
input: map[string]interface{}{},
mode: config.ModeEC2,
kubernetesMode: config.ModeEKS,
envClusterName: "env-cluster",
want: &awsentity.Config{
ClusterName: "env-cluster",
KubernetesMode: config.ModeEKS,
Platform: config.ModeEC2,
},
},
"ECS": {
input: map[string]interface{}{},
mode: config.ModeECS,
Expand All @@ -56,6 +86,11 @@ func TestTranslate(t *testing.T) {
context.CurrentContext().SetMode(testCase.mode)
context.CurrentContext().SetKubernetesMode(testCase.kubernetesMode)
}
if testCase.envClusterName != "" {
t.Setenv("K8S_CLUSTER_NAME", testCase.envClusterName)
} else {
t.Setenv("K8S_CLUSTER_NAME", "")
}
tt := NewTranslator()
assert.Equal(t, "awsentity", tt.ID().String())
conf := confmap.NewFromStringMap(testCase.input)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
pod_association:
- sources:
- from: connection
extract:
metadata:
- k8s.namespace.name
- k8s.pod.name
- k8s.replicaset.name
- k8s.deployment.name
- k8s.daemonset.name
- k8s.statefulset.name
- k8s.cronjob.name
- k8s.job.name
- k8s.node.name
filter:
node_from_env_var: K8S_NODE_NAME
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
pod_association:
- sources:
- from: connection
extract:
metadata:
- k8s.namespace.name
- k8s.pod.name
- k8s.replicaset.name
- k8s.deployment.name
- k8s.daemonset.name
- k8s.statefulset.name
- k8s.cronjob.name
- k8s.job.name
- k8s.node.name
Loading

0 comments on commit 58fa4a9

Please sign in to comment.