From b55abe4dd62a4da9eefeb831e9226b8027e2da38 Mon Sep 17 00:00:00 2001 From: Rick Date: Mon, 3 Feb 2025 18:05:51 -0500 Subject: [PATCH 1/2] Add Confused Deputy Prevention (#1503) --- .github/workflows/ec2-integration-test.yml | 6 +- cfg/aws/credentials.go | 36 ++++++++- cfg/aws/credentials_test.go | 89 ++++++++++++++++++++++ cfg/envconfig/envconfig.go | 4 + 4 files changed, 131 insertions(+), 4 deletions(-) create mode 100644 cfg/aws/credentials_test.go diff --git a/.github/workflows/ec2-integration-test.yml b/.github/workflows/ec2-integration-test.yml index 2a31c4d468..5d1eb6245c 100644 --- a/.github/workflows/ec2-integration-test.yml +++ b/.github/workflows/ec2-integration-test.yml @@ -92,9 +92,11 @@ jobs: terraform init if terraform apply --auto-approve \ - -var="ssh_key_value=${{env.PRIVATE_KEY}}" -var="github_test_repo=${{ inputs.test_repo_url }}" \ + -var="ssh_key_value=${{env.PRIVATE_KEY}}" \ + -var="github_test_repo=${{ inputs.test_repo_url }}" \ -var="test_name=${{ matrix.arrays.os }}" \ - -var="cwa_github_sha=${{inputs.github_sha}}" -var="install_agent=${{ matrix.arrays.installAgentCommand }}" \ + -var="cwa_github_sha=${{inputs.github_sha}}" \ + -var="install_agent=${{ matrix.arrays.installAgentCommand }}" \ -var="github_test_repo_branch=${{inputs.test_repo_branch}}" \ -var="ec2_instance_type=${{ matrix.arrays.instanceType }}" \ -var="user=${{ matrix.arrays.username }}" \ diff --git a/cfg/aws/credentials.go b/cfg/aws/credentials.go index c0867ba258..7701d6b7f3 100644 --- a/cfg/aws/credentials.go +++ b/cfg/aws/credentials.go @@ -16,9 +16,11 @@ import ( "github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds" "github.com/aws/aws-sdk-go/aws/credentials/stscreds" "github.com/aws/aws-sdk-go/aws/endpoints" + "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sts" + "github.com/aws/amazon-cloudwatch-agent/cfg/envconfig" "github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent" ) @@ -174,7 +176,7 @@ func (s *stsCredentialProvider) Retrieve() (credentials.Value, error) { func newStsCredentials(c client.ConfigProvider, roleARN string, region string) *credentials.Credentials { regional := &stscreds.AssumeRoleProvider{ - Client: sts.New(c, &aws.Config{ + Client: newStsClient(c, &aws.Config{ Region: aws.String(region), STSRegionalEndpoint: endpoints.RegionalSTSEndpoint, HTTPClient: &http.Client{Timeout: 1 * time.Minute}, @@ -188,7 +190,7 @@ func newStsCredentials(c client.ConfigProvider, roleARN string, region string) * fallbackRegion := getFallbackRegion(region) partitional := &stscreds.AssumeRoleProvider{ - Client: sts.New(c, &aws.Config{ + Client: newStsClient(c, &aws.Config{ Region: aws.String(fallbackRegion), Endpoint: aws.String(getFallbackEndpoint(fallbackRegion)), STSRegionalEndpoint: endpoints.RegionalSTSEndpoint, @@ -203,6 +205,36 @@ func newStsCredentials(c client.ConfigProvider, roleARN string, region string) * return credentials.NewCredentials(&stsCredentialProvider{regional: regional, partitional: partitional}) } +const ( + SourceArnHeaderKey = "x-amz-source-arn" + SourceAccountHeaderKey = "x-amz-source-account" +) + +// newStsClient creates a new STS client with the provided config and options. +// Additionally, if specific environment variables are set, it also appends the confused deputy headers to requests +// made by the client. These headers allow resource-based policies to limit the permissions that a service has to +// a specific resource. 
Note that BOTH environment variables need to contain non-empty values in order for the headers +// to be set. +// +// See https://docs.aws.amazon.com/IAM/latest/UserGuide/confused-deputy.html#cross-service-confused-deputy-prevention +func newStsClient(p client.ConfigProvider, cfgs ...*aws.Config) *sts.STS { + + sourceAccount := os.Getenv(envconfig.AmzSourceAccount) + sourceArn := os.Getenv(envconfig.AmzSourceArn) + + client := sts.New(p, cfgs...) + if sourceAccount != "" && sourceArn != "" { + client.Handlers.Sign.PushFront(func(r *request.Request) { + r.HTTPRequest.Header.Set(SourceArnHeaderKey, sourceArn) + r.HTTPRequest.Header.Set(SourceAccountHeaderKey, sourceAccount) + }) + + log.Printf("I! Found confused deputy header environment variables: source account: %q, source arn: %q", sourceAccount, sourceArn) + } + + return client +} + // The partitional STS endpoint used to fallback when regional STS endpoint is not activated. func getFallbackEndpoint(region string) string { partition := getPartition(region) diff --git a/cfg/aws/credentials_test.go b/cfg/aws/credentials_test.go new file mode 100644 index 0000000000..722590db60 --- /dev/null +++ b/cfg/aws/credentials_test.go @@ -0,0 +1,89 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package aws + +import ( + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/awstesting/mock" + "github.com/aws/aws-sdk-go/service/sts" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/aws/amazon-cloudwatch-agent/cfg/envconfig" +) + +func TestConfusedDeputyHeaders(t *testing.T) { + tests := []struct { + name string + envSourceArn string + envSourceAccount string + expectedHeaderArn string + expectedHeaderAccount string + }{ + { + name: "unpopulated", + envSourceArn: "", + envSourceAccount: "", + expectedHeaderArn: "", + expectedHeaderAccount: "", + }, + { + name: "both populated", + envSourceArn: "arn:aws:ec2:us-east-1:474668408639:instance/i-08293cd9825754f7c", + envSourceAccount: "539247453986", + expectedHeaderArn: "arn:aws:ec2:us-east-1:474668408639:instance/i-08293cd9825754f7c", + expectedHeaderAccount: "539247453986", + }, + { + name: "only source arn populated", + envSourceArn: "arn:aws:ec2:us-east-1:474668408639:instance/i-08293cd9825754f7c", + envSourceAccount: "", + expectedHeaderArn: "", + expectedHeaderAccount: "", + }, + { + name: "only source account populated", + envSourceArn: "", + envSourceAccount: "539247453986", + expectedHeaderArn: "", + expectedHeaderAccount: "", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + t.Setenv(envconfig.AmzSourceAccount, tt.envSourceAccount) + t.Setenv(envconfig.AmzSourceArn, tt.envSourceArn) + + client := newStsClient(mock.Session, &aws.Config{ + // These are examples credentials pulled from: + // https://docs.aws.amazon.com/STS/latest/APIReference/API_GetAccessKeyInfo.html + Credentials: credentials.NewStaticCredentials("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", ""), + Region: aws.String("us-east-1"), + }) + + request, _ := client.AssumeRoleRequest(&sts.AssumeRoleInput{ + // We aren't going to actually make the assume role call, we are just going + // to verify the headers are present once signed so the RoleArn and RoleSessionName + // arguments are irrelevant. Fill them out with something so the request is valid. 
+ RoleArn: aws.String("arn:aws:iam::012345678912:role/XXXXXXXX"), + RoleSessionName: aws.String("MockSession"), + }) + + // Headers are generated after the request is signed (but before it's sent) + err := request.Sign() + require.NoError(t, err) + + headerSourceArn := request.HTTPRequest.Header.Get(SourceArnHeaderKey) + assert.Equal(t, tt.expectedHeaderArn, headerSourceArn) + + headerSourceAccount := request.HTTPRequest.Header.Get(SourceAccountHeaderKey) + assert.Equal(t, tt.expectedHeaderAccount, headerSourceAccount) + }) + } + +} diff --git a/cfg/envconfig/envconfig.go b/cfg/envconfig/envconfig.go index 3fd4b637d2..afbf4918de 100644 --- a/cfg/envconfig/envconfig.go +++ b/cfg/envconfig/envconfig.go @@ -32,6 +32,10 @@ const ( CWConfigContent = "CW_CONFIG_CONTENT" CWOtelConfigContent = "CW_OTEL_CONFIG_CONTENT" CWAgentMergedOtelConfig = "CWAGENT_MERGED_OTEL_CONFIG" + + // confused deputy prevention related headers + AmzSourceAccount = "AMZ_SOURCE_ACCOUNT" // populates the "x-amz-source-account" header + AmzSourceArn = "AMZ_SOURCE_ARN" // populates the "x-amz-source-arn" header ) const ( From c37b9f8f93d260980eb94c21c8fb7e739561582f Mon Sep 17 00:00:00 2001 From: Ping Xiang <64551395+pxaws@users.noreply.github.com> Date: Tue, 4 Feb 2025 14:32:21 -0800 Subject: [PATCH 2/2] refactor list and watch logic out (#1523) Co-authored-by: Ping Xiang <> --- .../internal/resolver/kubernetes.go | 393 -------- .../internal/resolver/kubernetes_test.go | 902 ------------------ .../internal/resolver/kubernetes_utils.go | 37 + .../resolver/kubernetes_utils_test.go | 229 +++++ .../internal/resolver/podwatcher.go | 198 ++++ .../internal/resolver/podwatcher_test.go | 517 ++++++++++ .../internal/resolver/servicetoworkload.go | 81 ++ .../resolver/servicetoworkload_test.go | 101 ++ .../internal/resolver/servicewatcher.go | 114 +++ .../internal/resolver/servicewatcher_test.go | 106 ++ 10 files changed, 1383 insertions(+), 1295 deletions(-) create mode 100644 plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_utils_test.go create mode 100644 plugins/processors/awsapplicationsignals/internal/resolver/podwatcher.go create mode 100644 plugins/processors/awsapplicationsignals/internal/resolver/podwatcher_test.go create mode 100644 plugins/processors/awsapplicationsignals/internal/resolver/servicetoworkload.go create mode 100644 plugins/processors/awsapplicationsignals/internal/resolver/servicetoworkload_test.go create mode 100644 plugins/processors/awsapplicationsignals/internal/resolver/servicewatcher.go create mode 100644 plugins/processors/awsapplicationsignals/internal/resolver/servicewatcher_test.go diff --git a/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes.go b/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes.go index 753af4e60e..262c61cf64 100644 --- a/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes.go +++ b/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes.go @@ -11,14 +11,11 @@ import ( "sync" "time" - mapset "github.com/deckarep/golang-set/v2" "go.opentelemetry.io/collector/pdata/pcommon" semconv "go.opentelemetry.io/collector/semconv/v1.22.0" "go.uber.org/zap" - corev1 "k8s.io/api/core/v1" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/common" @@ -56,24 +53,6 @@ type kubernetesResolver struct { safeStopCh *safeChannel // trace and metric processors 
share the same kubernetesResolver and might close the same channel separately } -// a safe channel which can be closed multiple times -type safeChannel struct { - sync.Mutex - - ch chan struct{} - closed bool -} - -func (sc *safeChannel) Close() { - sc.Lock() - defer sc.Unlock() - - if !sc.closed { - close(sc.ch) - sc.closed = true - } -} - var ( once sync.Once instance *kubernetesResolver @@ -84,378 +63,6 @@ func jitterSleep(seconds int) { time.Sleep(jitter) } -// Deleter represents a type that can delete a key from a map after a certain delay. -type Deleter interface { - DeleteWithDelay(m *sync.Map, key interface{}) -} - -// TimedDeleter deletes a key after a specified delay. -type TimedDeleter struct { - Delay time.Duration -} - -func (td *TimedDeleter) DeleteWithDelay(m *sync.Map, key interface{}) { - go func() { - time.Sleep(td.Delay) - m.Delete(key) - }() -} - -func (s *serviceWatcher) onAddOrUpdateService(service *corev1.Service) { - // service can also have an external IP (or ingress IP) that could be accessed - // this field can be either an IP address (in some edge case) or a hostname (see "EXTERNAL-IP" column in "k get svc" output) - // [ec2-user@ip-172-31-11-104 one-step]$ k get svc -A - // NAMESPACE NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE - // default pet-clinic-frontend ClusterIP 10.100.216.182 8080/TCP 108m - // default vets-service ClusterIP 10.100.62.167 8083/TCP 108m - // default visits-service ClusterIP 10.100.96.5 8082/TCP 108m - // ingress-nginx default-http-backend ClusterIP 10.100.11.231 80/TCP 108m - // ingress-nginx ingress-nginx LoadBalancer 10.100.154.5 aex7997ece08c435dbd2b912fd5aa5bd-5372117830.xxxxx.elb.amazonaws.com 80:32080/TCP,443:32081/TCP,9113:30410/TCP 108m - // kube-system kube-dns ClusterIP 10.100.0.10 - // - // we ignore such case for now and may need to consider it in the future - if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != corev1.ClusterIPNone { - s.ipToServiceAndNamespace.Store(service.Spec.ClusterIP, getServiceAndNamespace(service)) - } - labelSet := mapset.NewSet[string]() - for key, value := range service.Spec.Selector { - labelSet.Add(key + "=" + value) - } - if labelSet.Cardinality() > 0 { - s.serviceAndNamespaceToSelectors.Store(getServiceAndNamespace(service), labelSet) - } -} - -func (s *serviceWatcher) onDeleteService(service *corev1.Service, deleter Deleter) { - if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != corev1.ClusterIPNone { - deleter.DeleteWithDelay(s.ipToServiceAndNamespace, service.Spec.ClusterIP) - } - deleter.DeleteWithDelay(s.serviceAndNamespaceToSelectors, getServiceAndNamespace(service)) -} - -func (p *podWatcher) removeHostNetworkRecords(pod *corev1.Pod) { - for _, port := range getHostNetworkPorts(pod) { - p.deleter.DeleteWithDelay(p.ipToPod, pod.Status.HostIP+":"+port) - } -} - -func (p *podWatcher) handlePodAdd(pod *corev1.Pod) { - if pod.Spec.HostNetwork && pod.Status.HostIP != "" { - for _, port := range getHostNetworkPorts(pod) { - p.ipToPod.Store(pod.Status.HostIP+":"+port, pod.Name) - } - } - if pod.Status.PodIP != "" { - p.ipToPod.Store(pod.Status.PodIP, pod.Name) - } -} - -func (p *podWatcher) handlePodUpdate(newPod *corev1.Pod, oldPod *corev1.Pod) { - // HostNetwork is an immutable field - if newPod.Spec.HostNetwork && oldPod.Status.HostIP != newPod.Status.HostIP { - if oldPod.Status.HostIP != "" { - p.logger.Debug("deleting host ip from cache", zap.String("hostNetwork", oldPod.Status.HostIP)) - p.removeHostNetworkRecords(oldPod) - } - if newPod.Status.HostIP != "" { - for _, 
port := range getHostNetworkPorts(newPod) { - p.ipToPod.Store(newPod.Status.HostIP+":"+port, newPod.Name) - } - } - } - if oldPod.Status.PodIP != newPod.Status.PodIP { - if oldPod.Status.PodIP != "" { - p.logger.Debug("deleting pod ip from cache", zap.String("podNetwork", oldPod.Status.PodIP)) - p.deleter.DeleteWithDelay(p.ipToPod, oldPod.Status.PodIP) - } - if newPod.Status.PodIP != "" { - p.ipToPod.Store(newPod.Status.PodIP, newPod.Name) - } - } -} - -func (p *podWatcher) onAddOrUpdatePod(pod, oldPod *corev1.Pod) { - if oldPod == nil { - p.handlePodAdd(pod) - } else { - p.handlePodUpdate(pod, oldPod) - } - - workloadAndNamespace := getWorkloadAndNamespace(pod) - - if workloadAndNamespace != "" { - p.podToWorkloadAndNamespace.Store(pod.Name, workloadAndNamespace) - podLabels := mapset.NewSet[string]() - for key, value := range pod.ObjectMeta.Labels { - podLabels.Add(key + "=" + value) - } - if podLabels.Cardinality() > 0 { - p.workloadAndNamespaceToLabels.Store(workloadAndNamespace, podLabels) - } - if oldPod == nil { - p.workloadPodCount[workloadAndNamespace]++ - p.logger.Debug("Added pod", zap.String("pod", pod.Name), zap.String("workload", workloadAndNamespace), zap.Int("count", p.workloadPodCount[workloadAndNamespace])) - } - } -} - -func (p *podWatcher) onDeletePod(obj interface{}) { - pod := obj.(*corev1.Pod) - if pod.Spec.HostNetwork && pod.Status.HostIP != "" { - p.logger.Debug("deleting host ip from cache", zap.String("hostNetwork", pod.Status.HostIP)) - p.removeHostNetworkRecords(pod) - } - if pod.Status.PodIP != "" { - p.logger.Debug("deleting pod ip from cache", zap.String("podNetwork", pod.Status.PodIP)) - p.deleter.DeleteWithDelay(p.ipToPod, pod.Status.PodIP) - } - - if workloadKey, ok := p.podToWorkloadAndNamespace.Load(pod.Name); ok { - workloadAndNamespace := workloadKey.(string) - p.workloadPodCount[workloadAndNamespace]-- - p.logger.Debug("decrementing pod count", zap.String("workload", workloadAndNamespace), zap.Int("podCount", p.workloadPodCount[workloadAndNamespace])) - if p.workloadPodCount[workloadAndNamespace] == 0 { - p.deleter.DeleteWithDelay(p.workloadAndNamespaceToLabels, workloadAndNamespace) - } - } else { - p.logger.Error("failed to load pod workloadKey", zap.String("pod", pod.Name)) - } - p.deleter.DeleteWithDelay(p.podToWorkloadAndNamespace, pod.Name) -} - -type podWatcher struct { - ipToPod *sync.Map - podToWorkloadAndNamespace *sync.Map - workloadAndNamespaceToLabels *sync.Map - workloadPodCount map[string]int - logger *zap.Logger - informer cache.SharedIndexInformer - deleter Deleter -} - -func newPodWatcher(logger *zap.Logger, informer cache.SharedIndexInformer, deleter Deleter) *podWatcher { - return &podWatcher{ - ipToPod: &sync.Map{}, - podToWorkloadAndNamespace: &sync.Map{}, - workloadAndNamespaceToLabels: &sync.Map{}, - workloadPodCount: make(map[string]int), - logger: logger, - informer: informer, - deleter: deleter, - } -} - -func (p *podWatcher) run(stopCh chan struct{}) { - p.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - pod := obj.(*corev1.Pod) - p.logger.Debug("list and watch for pod: ADD " + pod.Name) - p.onAddOrUpdatePod(pod, nil) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - pod := newObj.(*corev1.Pod) - oldPod := oldObj.(*corev1.Pod) - p.logger.Debug("list and watch for pods: UPDATE " + pod.Name) - p.onAddOrUpdatePod(pod, oldPod) - }, - DeleteFunc: func(obj interface{}) { - pod := obj.(*corev1.Pod) - p.logger.Debug("list and watch for pods: DELETE " + pod.Name) - 
p.onDeletePod(obj) - }, - }) - - go p.informer.Run(stopCh) - -} - -func (p *podWatcher) waitForCacheSync(stopCh chan struct{}) { - if !cache.WaitForNamedCacheSync("podWatcher", stopCh, p.informer.HasSynced) { - p.logger.Fatal("timed out waiting for kubernetes pod watcher caches to sync") - } - - p.logger.Info("podWatcher: Cache synced") -} - -type serviceWatcher struct { - ipToServiceAndNamespace *sync.Map - serviceAndNamespaceToSelectors *sync.Map - logger *zap.Logger - informer cache.SharedIndexInformer - deleter Deleter -} - -func newServiceWatcher(logger *zap.Logger, informer cache.SharedIndexInformer, deleter Deleter) *serviceWatcher { - return &serviceWatcher{ - ipToServiceAndNamespace: &sync.Map{}, - serviceAndNamespaceToSelectors: &sync.Map{}, - logger: logger, - informer: informer, - deleter: deleter, - } -} - -func (s *serviceWatcher) Run(stopCh chan struct{}) { - s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - service := obj.(*corev1.Service) - s.logger.Debug("list and watch for services: ADD " + service.Name) - s.onAddOrUpdateService(service) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - service := newObj.(*corev1.Service) - s.logger.Debug("list and watch for services: UPDATE " + service.Name) - s.onAddOrUpdateService(service) - }, - DeleteFunc: func(obj interface{}) { - service := obj.(*corev1.Service) - s.logger.Debug("list and watch for services: DELETE " + service.Name) - s.onDeleteService(service, s.deleter) - }, - }) - go s.informer.Run(stopCh) -} - -func (s *serviceWatcher) waitForCacheSync(stopCh chan struct{}) { - if !cache.WaitForNamedCacheSync("serviceWatcher", stopCh, s.informer.HasSynced) { - s.logger.Fatal("timed out waiting for kubernetes service watcher caches to sync") - } - - s.logger.Info("serviceWatcher: Cache synced") -} - -type serviceToWorkloadMapper struct { - serviceAndNamespaceToSelectors *sync.Map - workloadAndNamespaceToLabels *sync.Map - serviceToWorkload *sync.Map - logger *zap.Logger - deleter Deleter -} - -func newServiceToWorkloadMapper(serviceAndNamespaceToSelectors, workloadAndNamespaceToLabels, serviceToWorkload *sync.Map, logger *zap.Logger, deleter Deleter) *serviceToWorkloadMapper { - return &serviceToWorkloadMapper{ - serviceAndNamespaceToSelectors: serviceAndNamespaceToSelectors, - workloadAndNamespaceToLabels: workloadAndNamespaceToLabels, - serviceToWorkload: serviceToWorkload, - logger: logger, - deleter: deleter, - } -} - -func (m *serviceToWorkloadMapper) mapServiceToWorkload() { - m.logger.Debug("Map service to workload at:", zap.Time("time", time.Now())) - - m.serviceAndNamespaceToSelectors.Range(func(key, value interface{}) bool { - var workloads []string - serviceAndNamespace := key.(string) - _, serviceNamespace := extractResourceAndNamespace(serviceAndNamespace) - serviceLabels := value.(mapset.Set[string]) - - m.workloadAndNamespaceToLabels.Range(func(workloadKey, labelsValue interface{}) bool { - labels := labelsValue.(mapset.Set[string]) - workloadAndNamespace := workloadKey.(string) - _, workloadNamespace := extractResourceAndNamespace(workloadAndNamespace) - if workloadNamespace == serviceNamespace && workloadNamespace != "" && serviceLabels.IsSubset(labels) { - m.logger.Debug("Found workload for service", zap.String("service", serviceAndNamespace), zap.String("workload", workloadAndNamespace)) - workloads = append(workloads, workloadAndNamespace) - } - - return true - }) - - if len(workloads) > 1 { - m.logger.Info("Multiple workloads found for service. 
You will get unexpected results.", zap.String("service", serviceAndNamespace), zap.Strings("workloads", workloads)) - } else if len(workloads) == 1 { - m.serviceToWorkload.Store(serviceAndNamespace, workloads[0]) - } else { - m.logger.Debug("No workload found for service", zap.String("service", serviceAndNamespace)) - m.deleter.DeleteWithDelay(m.serviceToWorkload, serviceAndNamespace) - } - return true - }) -} - -func (m *serviceToWorkloadMapper) Start(stopCh chan struct{}) { - // do the first mapping immediately - m.mapServiceToWorkload() - m.logger.Debug("First-time map service to workload at:", zap.Time("time", time.Now())) - - go func() { - for { - select { - case <-stopCh: - return - case <-time.After(time.Minute + 30*time.Second): - m.mapServiceToWorkload() - m.logger.Debug("Map service to workload at:", zap.Time("time", time.Now())) - } - } - }() -} - -// minimizePod removes fields that could contain large objects, and retain essential -// fields needed for IP/name translation. The following fields must be kept: -// - ObjectMeta: Namespace, Name, Labels, OwnerReference -// - Spec: HostNetwork, ContainerPorts -// - Status: PodIP/s, HostIP/s -func minimizePod(obj interface{}) (interface{}, error) { - if pod, ok := obj.(*corev1.Pod); ok { - pod.Annotations = nil - pod.Finalizers = nil - pod.ManagedFields = nil - - pod.Spec.Volumes = nil - pod.Spec.InitContainers = nil - pod.Spec.EphemeralContainers = nil - pod.Spec.ImagePullSecrets = nil - pod.Spec.HostAliases = nil - pod.Spec.SchedulingGates = nil - pod.Spec.ResourceClaims = nil - pod.Spec.Tolerations = nil - pod.Spec.Affinity = nil - - pod.Status.InitContainerStatuses = nil - pod.Status.ContainerStatuses = nil - pod.Status.EphemeralContainerStatuses = nil - - for i := 0; i < len(pod.Spec.Containers); i++ { - c := &pod.Spec.Containers[i] - c.Image = "" - c.Command = nil - c.Args = nil - c.EnvFrom = nil - c.Env = nil - c.Resources = corev1.ResourceRequirements{} - c.VolumeMounts = nil - c.VolumeDevices = nil - c.SecurityContext = nil - } - } - return obj, nil -} - -// minimizeService removes fields that could contain large objects, and retain essential -// fields needed for IP/name translation. 
The following fields must be kept: -// - ObjectMeta: Namespace, Name -// - Spec: Selectors, ClusterIP -func minimizeService(obj interface{}) (interface{}, error) { - if svc, ok := obj.(*corev1.Service); ok { - svc.Annotations = nil - svc.Finalizers = nil - svc.ManagedFields = nil - - svc.Spec.LoadBalancerSourceRanges = nil - svc.Spec.SessionAffinityConfig = nil - svc.Spec.IPFamilies = nil - svc.Spec.IPFamilyPolicy = nil - svc.Spec.InternalTrafficPolicy = nil - svc.Spec.InternalTrafficPolicy = nil - - svc.Status.Conditions = nil - } - return obj, nil -} - func getKubernetesResolver(platformCode, clusterName string, logger *zap.Logger) subResolver { once.Do(func() { config, err := clientcmd.BuildConfigFromFlags("", "") diff --git a/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_test.go b/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_test.go index 254a02c14c..65728bd605 100644 --- a/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_test.go +++ b/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_test.go @@ -9,15 +9,11 @@ import ( "strings" "sync" "testing" - "time" - mapset "github.com/deckarep/golang-set/v2" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/pdata/pcommon" semconv "go.opentelemetry.io/collector/semconv/v1.22.0" "go.uber.org/zap" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/common" "github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsapplicationsignals/config" @@ -34,677 +30,6 @@ func (md *MockDeleter) DeleteWithDelay(m *sync.Map, key interface{}) { var mockDeleter = &MockDeleter{} -// TestAttachNamespace function -func TestAttachNamespace(t *testing.T) { - result := attachNamespace("testResource", "testNamespace") - if result != "testResource@testNamespace" { - t.Errorf("attachNamespace was incorrect, got: %s, want: %s.", result, "testResource@testNamespace") - } -} - -// TestGetServiceAndNamespace function -func TestGetServiceAndNamespace(t *testing.T) { - service := &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testService", - Namespace: "testNamespace", - }, - } - result := getServiceAndNamespace(service) - if result != "testService@testNamespace" { - t.Errorf("getServiceAndNamespace was incorrect, got: %s, want: %s.", result, "testService@testNamespace") - } -} - -// TestExtractResourceAndNamespace function -func TestExtractResourceAndNamespace(t *testing.T) { - // Test normal case - name, namespace := extractResourceAndNamespace("testService@testNamespace") - if name != "testService" || namespace != "testNamespace" { - t.Errorf("extractResourceAndNamespace was incorrect, got: %s and %s, want: %s and %s.", name, namespace, "testService", "testNamespace") - } - - // Test invalid case - name, namespace = extractResourceAndNamespace("invalid") - if name != "" || namespace != "" { - t.Errorf("extractResourceAndNamespace was incorrect, got: %s and %s, want: %s and %s.", name, namespace, "", "") - } -} - -func TestExtractWorkloadNameFromRS(t *testing.T) { - testCases := []struct { - name string - replicaSetName string - want string - shouldErr bool - }{ - { - name: "Valid ReplicaSet Name", - replicaSetName: "my-deployment-5859ffc7ff", - want: "my-deployment", - shouldErr: false, - }, - { - name: "Invalid ReplicaSet Name - No Hyphen", - replicaSetName: "mydeployment5859ffc7ff", - want: "", - shouldErr: true, - }, - { - name: "Invalid 
ReplicaSet Name - Less Than 10 Suffix Characters", - replicaSetName: "my-deployment-bc2", - want: "", - shouldErr: true, - }, - { - name: "Invalid ReplicaSet Name - More Than 10 Suffix Characters", - replicaSetName: "my-deployment-5859ffc7ffx", - want: "", - shouldErr: true, - }, - { - name: "Invalid ReplicaSet Name - Invalid Characters in Suffix", - replicaSetName: "my-deployment-aeiou12345", - want: "", - shouldErr: true, - }, - { - name: "Invalid ReplicaSet Name - Empty String", - replicaSetName: "", - want: "", - shouldErr: true, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - got, err := extractWorkloadNameFromRS(tc.replicaSetName) - - if (err != nil) != tc.shouldErr { - t.Errorf("extractWorkloadNameFromRS() error = %v, wantErr %v", err, tc.shouldErr) - return - } - - if got != tc.want { - t.Errorf("extractWorkloadNameFromRS() = %v, want %v", got, tc.want) - } - }) - } -} - -func TestExtractWorkloadNameFromPodName(t *testing.T) { - testCases := []struct { - name string - podName string - want string - shouldErr bool - }{ - { - name: "Valid Pod Name", - podName: "my-replicaset-bc24f", - want: "my-replicaset", - shouldErr: false, - }, - { - name: "Invalid Pod Name - No Hyphen", - podName: "myreplicasetbc24f", - want: "", - shouldErr: true, - }, - { - name: "Invalid Pod Name - Less Than 5 Suffix Characters", - podName: "my-replicaset-bc2", - want: "", - shouldErr: true, - }, - { - name: "Invalid Pod Name - More Than 5 Suffix Characters", - podName: "my-replicaset-bc24f5", - want: "", - shouldErr: true, - }, - { - name: "Invalid Pod Name - Empty String", - podName: "", - want: "", - shouldErr: true, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - got, err := extractWorkloadNameFromPodName(tc.podName) - - if (err != nil) != tc.shouldErr { - t.Errorf("extractWorkloadNameFromPodName() error = %v, wantErr %v", err, tc.shouldErr) - return - } - - if got != tc.want { - t.Errorf("extractWorkloadNameFromPodName() = %v, want %v", got, tc.want) - } - }) - } -} - -// TestGetWorkloadAndNamespace function -func TestGetWorkloadAndNamespace(t *testing.T) { - // Test ReplicaSet case - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testPod", - Namespace: "testNamespace", - OwnerReferences: []metav1.OwnerReference{ - { - Kind: "ReplicaSet", - Name: "testDeployment-5d68bc5f49", - }, - }, - }, - } - result := getWorkloadAndNamespace(pod) - if result != "testDeployment@testNamespace" { - t.Errorf("getDeploymentAndNamespace was incorrect, got: %s, want: %s.", result, "testDeployment@testNamespace") - } - - // Test StatefulSet case - pod.ObjectMeta.OwnerReferences[0].Kind = "StatefulSet" - pod.ObjectMeta.OwnerReferences[0].Name = "testStatefulSet" - result = getWorkloadAndNamespace(pod) - if result != "testStatefulSet@testNamespace" { - t.Errorf("getWorkloadAndNamespace was incorrect, got: %s, want: %s.", result, "testStatefulSet@testNamespace") - } - - // Test Other case - pod.ObjectMeta.OwnerReferences[0].Kind = "Other" - pod.ObjectMeta.OwnerReferences[0].Name = "testOther" - result = getWorkloadAndNamespace(pod) - if result != "" { - t.Errorf("getWorkloadAndNamespace was incorrect, got: %s, want: %s.", result, "") - } - - // Test no OwnerReferences case - pod.ObjectMeta.OwnerReferences = nil - result = getWorkloadAndNamespace(pod) - if result != "" { - t.Errorf("getWorkloadAndNamespace was incorrect, got: %s, want: %s.", result, "") - } -} - -func TestServiceToWorkloadMapper_MapServiceToWorkload(t *testing.T) { - logger, _ 
:= zap.NewDevelopment() - - serviceAndNamespaceToSelectors := &sync.Map{} - workloadAndNamespaceToLabels := &sync.Map{} - serviceToWorkload := &sync.Map{} - - serviceAndNamespaceToSelectors.Store("service1@namespace1", mapset.NewSet("label1=value1", "label2=value2")) - workloadAndNamespaceToLabels.Store("deployment1@namespace1", mapset.NewSet("label1=value1", "label2=value2", "label3=value3")) - - mapper := newServiceToWorkloadMapper(serviceAndNamespaceToSelectors, workloadAndNamespaceToLabels, serviceToWorkload, logger, mockDeleter) - mapper.mapServiceToWorkload() - - if _, ok := serviceToWorkload.Load("service1@namespace1"); !ok { - t.Errorf("Expected service1@namespace1 to be mapped to a workload, but it was not") - } -} - -func TestServiceToWorkloadMapper_MapServiceToWorkload_NoWorkload(t *testing.T) { - logger, _ := zap.NewDevelopment() - - serviceAndNamespaceToSelectors := &sync.Map{} - workloadAndNamespaceToLabels := &sync.Map{} - serviceToWorkload := &sync.Map{} - - // Add a service with no matching workload - serviceAndNamespace := "service@namespace" - serviceAndNamespaceToSelectors.Store(serviceAndNamespace, mapset.NewSet("label1=value1")) - serviceToWorkload.Store(serviceAndNamespace, "workload@namespace") - - mapper := newServiceToWorkloadMapper(serviceAndNamespaceToSelectors, workloadAndNamespaceToLabels, serviceToWorkload, logger, mockDeleter) - mapper.mapServiceToWorkload() - - // Check that the service was deleted from serviceToWorkload - if _, ok := serviceToWorkload.Load(serviceAndNamespace); ok { - t.Errorf("Service was not deleted from serviceToWorkload") - } -} - -func TestServiceToWorkloadMapper_MapServiceToWorkload_MultipleWorkloads(t *testing.T) { - logger, _ := zap.NewDevelopment() - - serviceAndNamespaceToSelectors := &sync.Map{} - workloadAndNamespaceToLabels := &sync.Map{} - serviceToWorkload := &sync.Map{} - - serviceAndNamespace := "service@namespace" - serviceAndNamespaceToSelectors.Store(serviceAndNamespace, mapset.NewSet("label1=value1", "label2=value2")) - - // Add two workloads with matching labels to the service - workloadAndNamespaceToLabels.Store("workload1@namespace", mapset.NewSet("label1=value1", "label2=value2", "label3=value3")) - workloadAndNamespaceToLabels.Store("workload2@namespace", mapset.NewSet("label1=value1", "label2=value2", "label4=value4")) - - mapper := newServiceToWorkloadMapper(serviceAndNamespaceToSelectors, workloadAndNamespaceToLabels, serviceToWorkload, logger, mockDeleter) - mapper.mapServiceToWorkload() - - // Check that the service does not map to any workload - if _, ok := serviceToWorkload.Load(serviceAndNamespace); ok { - t.Errorf("Unexpected mapping of service to multiple workloads") - } -} - -func TestMapServiceToWorkload_StopsWhenSignaled(t *testing.T) { - logger, _ := zap.NewDevelopment() - - serviceAndNamespaceToSelectors := &sync.Map{} - workloadAndNamespaceToLabels := &sync.Map{} - serviceToWorkload := &sync.Map{} - - stopchan := make(chan struct{}) - - // Signal the stopchan to stop after 100 milliseconds - time.AfterFunc(100*time.Millisecond, func() { - close(stopchan) - }) - - mapper := newServiceToWorkloadMapper(serviceAndNamespaceToSelectors, workloadAndNamespaceToLabels, serviceToWorkload, logger, mockDeleter) - - start := time.Now() - mapper.Start(stopchan) - duration := time.Since(start) - - // Check that the function stopped in a reasonable time after the stop signal - if duration > 200*time.Millisecond { - t.Errorf("mapServiceToWorkload did not stop in a reasonable time after the stop signal, duration: 
%v", duration) - } -} - -func TestOnAddOrUpdateService(t *testing.T) { - // Create a fake service - service := &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "myservice", - Namespace: "mynamespace", - }, - Spec: corev1.ServiceSpec{ - ClusterIP: "1.2.3.4", - Selector: map[string]string{ - "app": "myapp", - }, - }, - } - - // Create the maps - ipToServiceAndNamespace := &sync.Map{} - serviceAndNamespaceToSelectors := &sync.Map{} - - // Call the function - svcWatcher := newServiceWatcherForTesting(ipToServiceAndNamespace, serviceAndNamespaceToSelectors) - svcWatcher.onAddOrUpdateService(service) - - // Check that the maps contain the expected entries - if _, ok := ipToServiceAndNamespace.Load("1.2.3.4"); !ok { - t.Errorf("ipToServiceAndNamespace does not contain the service IP") - } - if _, ok := serviceAndNamespaceToSelectors.Load("myservice@mynamespace"); !ok { - t.Errorf("serviceAndNamespaceToSelectors does not contain the service") - } -} - -func TestOnDeleteService(t *testing.T) { - // Create a fake service - service := &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "myservice", - Namespace: "mynamespace", - }, - Spec: corev1.ServiceSpec{ - ClusterIP: "1.2.3.4", - Selector: map[string]string{ - "app": "myapp", - }, - }, - } - - // Create the maps and add the service to them - ipToServiceAndNamespace := &sync.Map{} - ipToServiceAndNamespace.Store("1.2.3.4", "myservice@mynamespace") - serviceAndNamespaceToSelectors := &sync.Map{} - serviceAndNamespaceToSelectors.Store("myservice@mynamespace", mapset.NewSet("app=myapp")) - - // Call the function - svcWatcher := newServiceWatcherForTesting(ipToServiceAndNamespace, serviceAndNamespaceToSelectors) - svcWatcher.onDeleteService(service, mockDeleter) - - // Check that the maps do not contain the service - if _, ok := ipToServiceAndNamespace.Load("1.2.3.4"); ok { - t.Errorf("ipToServiceAndNamespace still contains the service IP") - } - if _, ok := serviceAndNamespaceToSelectors.Load("myservice@mynamespace"); ok { - t.Errorf("serviceAndNamespaceToSelectors still contains the service") - } -} - -func TestOnAddOrUpdatePod(t *testing.T) { - t.Run("pod with both PodIP and HostIP", func(t *testing.T) { - ipToPod := &sync.Map{} - podToWorkloadAndNamespace := &sync.Map{} - workloadAndNamespaceToLabels := &sync.Map{} - workloadPodCount := map[string]int{} - - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testPod", - Namespace: "testNamespace", - OwnerReferences: []metav1.OwnerReference{ - { - Kind: "ReplicaSet", - Name: "testDeployment-598b89cd8d", - }, - }, - }, - Status: corev1.PodStatus{ - PodIP: "1.2.3.4", - HostIP: "5.6.7.8", - }, - } - - poWatcher := newPodWatcherForTesting(ipToPod, podToWorkloadAndNamespace, workloadAndNamespaceToLabels, workloadPodCount) - poWatcher.onAddOrUpdatePod(pod, nil) - - // Test the mappings in ipToPod - if podName, _ := ipToPod.Load("1.2.3.4"); podName != "testPod" { - t.Errorf("ipToPod was incorrect, got: %s, want: %s.", podName, "testPod") - } - - // Test the mapping in podToWorkloadAndNamespace - if depAndNamespace, _ := podToWorkloadAndNamespace.Load("testPod"); depAndNamespace != "testDeployment@testNamespace" { - t.Errorf("podToWorkloadAndNamespace was incorrect, got: %s, want: %s.", depAndNamespace, "testDeployment@testNamespace") - } - - // Test the count in workloadPodCount - if count := workloadPodCount["testDeployment@testNamespace"]; count != 1 { - t.Errorf("workloadPodCount was incorrect, got: %d, want: %d.", count, 1) - } - }) - - t.Run("pod with only HostIP", func(t 
*testing.T) { - ipToPod := &sync.Map{} - podToWorkloadAndNamespace := &sync.Map{} - workloadAndNamespaceToLabels := &sync.Map{} - workloadPodCount := map[string]int{} - - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testPod", - Namespace: "testNamespace", - OwnerReferences: []metav1.OwnerReference{ - { - Kind: "ReplicaSet", - Name: "testDeployment-7b74958fb8", - }, - }, - }, - Status: corev1.PodStatus{ - HostIP: "5.6.7.8", - }, - Spec: corev1.PodSpec{ - HostNetwork: true, - Containers: []corev1.Container{ - { - Ports: []corev1.ContainerPort{ - { - HostPort: int32(8080), - }, - }, - }, - }, - }, - } - - poWatcher := newPodWatcherForTesting(ipToPod, podToWorkloadAndNamespace, workloadAndNamespaceToLabels, workloadPodCount) - poWatcher.onAddOrUpdatePod(pod, nil) - - // Test the mappings in ipToPod - if podName, _ := ipToPod.Load("5.6.7.8:8080"); podName != "testPod" { - t.Errorf("ipToPod was incorrect, got: %s, want: %s.", podName, "testPod") - } - - // Test the mapping in podToWorkloadAndNamespace - if depAndNamespace, _ := podToWorkloadAndNamespace.Load("testPod"); depAndNamespace != "testDeployment@testNamespace" { - t.Errorf("podToWorkloadAndNamespace was incorrect, got: %s, want: %s.", depAndNamespace, "testDeployment@testNamespace") - } - - // Test the count in workloadPodCount - if count := workloadPodCount["testDeployment@testNamespace"]; count != 1 { - t.Errorf("workloadPodCount was incorrect, got: %d, want: %d.", count, 1) - } - }) - - t.Run("pod updated with different set of labels", func(t *testing.T) { - ipToPod := &sync.Map{} - podToWorkloadAndNamespace := &sync.Map{} - workloadAndNamespaceToLabels := &sync.Map{} - - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testPod", - Namespace: "testNamespace", - Labels: map[string]string{ - "label1": "value1", - "label2": "value2", - }, - OwnerReferences: []metav1.OwnerReference{ - { - Kind: "ReplicaSet", - Name: "testDeployment-5d68bc5f49", - }, - }, - }, - Status: corev1.PodStatus{ - HostIP: "5.6.7.8", - }, - Spec: corev1.PodSpec{ - HostNetwork: true, - Containers: []corev1.Container{ - { - Ports: []corev1.ContainerPort{ - {HostPort: 8080}, - }, - }, - }, - }, - } - - // add the pod - poWatcher := newPodWatcherForTesting(ipToPod, podToWorkloadAndNamespace, workloadAndNamespaceToLabels, map[string]int{}) - poWatcher.onAddOrUpdatePod(pod, nil) - - // Test the mappings in ipToPod - if podName, ok := ipToPod.Load("5.6.7.8:8080"); !ok && podName != "testPod" { - t.Errorf("ipToPod[%s] was incorrect, got: %s, want: %s.", "5.6.7.8:8080", podName, "testPod") - } - - // Test the mapping in workloadAndNamespaceToLabels - labels, _ := workloadAndNamespaceToLabels.Load("testDeployment@testNamespace") - expectedLabels := []string{"label1=value1", "label2=value2"} - for _, label := range expectedLabels { - if !labels.(mapset.Set[string]).Contains(label) { - t.Errorf("deploymentAndNamespaceToLabels was incorrect, got: %v, want: %s.", labels, label) - } - } - - pod2 := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testPod", - Namespace: "testNamespace", - Labels: map[string]string{ - "label1": "value1", - "label2": "value2", - "label3": "value3", - }, - OwnerReferences: []metav1.OwnerReference{ - { - Kind: "ReplicaSet", - Name: "testDeployment-5d68bc5f49", - }, - }, - }, - Status: corev1.PodStatus{ - PodIP: "1.2.3.4", - HostIP: "5.6.7.8", - }, - Spec: corev1.PodSpec{ - HostNetwork: true, - Containers: []corev1.Container{ - { - Ports: []corev1.ContainerPort{ - {HostPort: 8080}, - }, - }, - }, - }, - } - - // add 
the pod - poWatcher.onAddOrUpdatePod(pod2, pod) - - // Test the mappings in ipToPod - if podName, ok := ipToPod.Load("5.6.7.8:8080"); !ok && podName != "testPod" { - t.Errorf("ipToPod[%s] was incorrect, got: %s, want: %s.", "5.6.7.8:8080", podName, "testPod") - } - - if podName, ok := ipToPod.Load("1.2.3.4"); !ok && podName != "testPod" { - t.Errorf("ipToPod[%s] was incorrect, got: %s, want: %s.", "1.2.3.4", podName, "testPod") - } - // Test the mapping in workloadAndNamespaceToLabels - labels, _ = workloadAndNamespaceToLabels.Load("testDeployment@testNamespace") - expectedLabels = []string{"label1=value1", "label2=value2", "label3=value3"} - for _, label := range expectedLabels { - if !labels.(mapset.Set[string]).Contains(label) { - t.Errorf("workloadAndNamespaceToLabels was incorrect, got: %v, want: %s.", labels, label) - } - } - }) -} - -func TestOnDeletePod(t *testing.T) { - t.Run("pod with both PodIP and HostIP", func(t *testing.T) { - ipToPod := &sync.Map{} - podToWorkloadAndNamespace := &sync.Map{} - workloadAndNamespaceToLabels := &sync.Map{} - workloadPodCount := map[string]int{} - - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testPod", - Namespace: "testNamespace", - OwnerReferences: []metav1.OwnerReference{ - { - Kind: "ReplicaSet", - Name: "testDeployment-xyz", - }, - }, - }, - Status: corev1.PodStatus{ - PodIP: "1.2.3.4", - HostIP: "5.6.7.8", - }, - } - - // Assume the pod has already been added - ipToPod.Store(pod.Status.PodIP, pod.Name) - ipToPod.Store(pod.Status.HostIP, pod.Name) - podToWorkloadAndNamespace.Store(pod.Name, "testDeployment@testNamespace") - workloadAndNamespaceToLabels.Store("testDeployment@testNamespace", "testLabels") - workloadPodCount["testDeployment@testNamespace"] = 1 - - poWatcher := newPodWatcherForTesting(ipToPod, podToWorkloadAndNamespace, workloadAndNamespaceToLabels, workloadPodCount) - poWatcher.onDeletePod(pod) - - // Test if the entries in ipToPod and podToWorkloadAndNamespace have been deleted - if _, ok := ipToPod.Load("1.2.3.4"); ok { - t.Errorf("ipToPod deletion was incorrect, key: %s still exists", "1.2.3.4") - } - - if _, ok := podToWorkloadAndNamespace.Load("testPod"); ok { - t.Errorf("podToWorkloadAndNamespace deletion was incorrect, key: %s still exists", "testPod") - } - - // Test if the count in workloadPodCount has been decremented and the entry in workloadAndNamespaceToLabels has been deleted - if count := workloadPodCount["testDeployment@testNamespace"]; count != 0 { - t.Errorf("workloadPodCount was incorrect, got: %d, want: %d.", count, 0) - } - - if _, ok := workloadAndNamespaceToLabels.Load("testDeployment@testNamespace"); ok { - t.Errorf("workloadAndNamespaceToLabels deletion was incorrect, key: %s still exists", "testDeployment@testNamespace") - } - }) - - t.Run("pod with only HostIP and some network ports", func(t *testing.T) { - ipToPod := &sync.Map{} - podToWorkloadAndNamespace := &sync.Map{} - workloadAndNamespaceToLabels := &sync.Map{} - workloadPodCount := map[string]int{} - - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testPod", - Namespace: "testNamespace", - OwnerReferences: []metav1.OwnerReference{ - { - Kind: "ReplicaSet", - Name: "testDeployment-xyz", - }, - }, - }, - Status: corev1.PodStatus{ - HostIP: "5.6.7.8", - }, - Spec: corev1.PodSpec{ - HostNetwork: true, - Containers: []corev1.Container{ - { - Ports: []corev1.ContainerPort{ - { - HostPort: int32(8080), - }, - }, - }, - }, - }, - } - - // Assume the pod has already been added - ipToPod.Store(pod.Status.HostIP, pod.Name) 
- ipToPod.Store(pod.Status.HostIP+":8080", pod.Name) - podToWorkloadAndNamespace.Store(pod.Name, "testDeployment@testNamespace") - workloadAndNamespaceToLabels.Store("testDeployment@testNamespace", "testLabels") - workloadPodCount["testDeployment@testNamespace"] = 1 - - poWatcher := newPodWatcherForTesting(ipToPod, podToWorkloadAndNamespace, workloadAndNamespaceToLabels, workloadPodCount) - poWatcher.onDeletePod(pod) - - // Test if the entries in ipToPod and podToWorkloadAndNamespace have been deleted - if _, ok := ipToPod.Load("5.6.7.8:8080"); ok { - t.Errorf("ipToPod deletion was incorrect, key: %s still exists", "5.6.7.8:8080") - } - - if _, ok := podToWorkloadAndNamespace.Load("testPod"); ok { - t.Errorf("podToDeploymentAndNamespace deletion was incorrect, key: %s still exists", "testPod") - } - - // Test if the count in workloadPodCount has been decremented and the entry in workloadAndNamespaceToLabels has been deleted - if count := workloadPodCount["testDeployment@testNamespace"]; count != 0 { - t.Errorf("workloadPodCount was incorrect, got: %d, want: %d.", count, 0) - } - - if _, ok := workloadAndNamespaceToLabels.Load("testDeployment@testNamespace"); ok { - t.Errorf("workloadAndNamespaceToLabels deletion was incorrect, key: %s still exists", "testDeployment@testNamespace") - } - }) -} - func TestEksResolver(t *testing.T) { logger, _ := zap.NewProduction() ctx := context.Background() @@ -1064,230 +389,3 @@ func TestK8sResourceAttributesResolverOnK8SOnPrem(t *testing.T) { }) } } - -func TestExtractIPPort(t *testing.T) { - // Test valid IP:Port - ip, port, ok := extractIPPort("192.0.2.0:8080") - assert.Equal(t, "192.0.2.0", ip) - assert.Equal(t, "8080", port) - assert.True(t, ok) - - // Test invalid IP:Port - ip, port, ok = extractIPPort("192.0.2:8080") - assert.Equal(t, "", ip) - assert.Equal(t, "", port) - assert.False(t, ok) - - // Test IP only - ip, port, ok = extractIPPort("192.0.2.0") - assert.Equal(t, "", ip) - assert.Equal(t, "", port) - assert.False(t, ok) -} - -func TestFilterPodIPFields(t *testing.T) { - meta := metav1.ObjectMeta{ - Name: "test", - Namespace: "default", - Labels: map[string]string{ - "name": "app", - }, - } - pod := &corev1.Pod{ - ObjectMeta: meta, - Spec: corev1.PodSpec{ - HostNetwork: true, - Containers: []corev1.Container{ - {}, - }, - }, - Status: corev1.PodStatus{}, - } - newPod, err := minimizePod(pod) - assert.Nil(t, err) - assert.Empty(t, getHostNetworkPorts(newPod.(*corev1.Pod))) - - podStatus := corev1.PodStatus{ - PodIP: "192.168.0.12", - HostIPs: []corev1.HostIP{ - { - IP: "132.168.3.12", - }, - }, - } - pod = &corev1.Pod{ - ObjectMeta: meta, - Spec: corev1.PodSpec{ - HostNetwork: true, - Containers: []corev1.Container{ - { - Ports: []corev1.ContainerPort{ - {HostPort: 8080}, - }, - }, - }, - }, - Status: podStatus, - } - newPod, err = minimizePod(pod) - assert.Nil(t, err) - assert.Equal(t, "app", newPod.(*corev1.Pod).Labels["name"]) - assert.Equal(t, []string{"8080"}, getHostNetworkPorts(newPod.(*corev1.Pod))) - assert.Equal(t, podStatus, newPod.(*corev1.Pod).Status) - - pod = &corev1.Pod{ - Spec: corev1.PodSpec{ - HostNetwork: true, - Containers: []corev1.Container{ - { - Ports: []corev1.ContainerPort{ - {HostPort: 8080}, - {HostPort: 8081}, - }, - }, - }, - }, - Status: podStatus, - } - newPod, err = minimizePod(pod) - assert.Nil(t, err) - assert.Equal(t, []string{"8080", "8081"}, getHostNetworkPorts(newPod.(*corev1.Pod))) - assert.Equal(t, podStatus, newPod.(*corev1.Pod).Status) -} - -func TestFilterServiceIPFields(t *testing.T) { - meta := 
metav1.ObjectMeta{ - Name: "test", - Namespace: "default", - } - svc := &corev1.Service{ - ObjectMeta: meta, - Spec: corev1.ServiceSpec{ - Selector: map[string]string{ - "name": "app", - }, - ClusterIP: "10.0.12.4", - }, - } - newSvc, err := minimizeService(svc) - assert.Nil(t, err) - assert.Equal(t, "10.0.12.4", newSvc.(*corev1.Service).Spec.ClusterIP) - assert.Equal(t, "app", newSvc.(*corev1.Service).Spec.Selector["name"]) -} - -func TestHandlePodUpdate(t *testing.T) { - testCases := []struct { - name string - oldPod *corev1.Pod - newPod *corev1.Pod - initialIPToPod map[string]string - expectedIPToPod map[string]string - }{ - { - name: "Old and New Pod Do Not Use Host Network, Different Pod IPs", - oldPod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "mypod", - }, - Status: corev1.PodStatus{ - PodIP: "10.0.0.3", - }, - Spec: corev1.PodSpec{ - HostNetwork: false, - }, - }, - newPod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "mypod", - }, - Status: corev1.PodStatus{ - PodIP: "10.0.0.4", - }, - Spec: corev1.PodSpec{ - HostNetwork: false, - }, - }, - initialIPToPod: map[string]string{ - "10.0.0.3": "mypod", - }, - expectedIPToPod: map[string]string{ - "10.0.0.4": "mypod", - }, - }, - { - name: "Old Pod Has Empty PodIP, New Pod Does Not Use Host Network, Non-Empty Pod IP", - oldPod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "mypod", - }, - Status: corev1.PodStatus{ - PodIP: "", - }, - Spec: corev1.PodSpec{ - HostNetwork: false, - }, - }, - newPod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "mypod", - }, - Status: corev1.PodStatus{ - PodIP: "10.0.0.5", - }, - Spec: corev1.PodSpec{ - HostNetwork: false, - }, - }, - initialIPToPod: map[string]string{}, - expectedIPToPod: map[string]string{ - "10.0.0.5": "mypod", - }, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - ipToPod := &sync.Map{} - // Initialize ipToPod map - for k, v := range tc.initialIPToPod { - ipToPod.Store(k, v) - } - poWatcher := newPodWatcherForTesting(ipToPod, nil, nil, map[string]int{}) - poWatcher.handlePodUpdate(tc.newPod, tc.oldPod) - - // Now validate that ipToPod map has been updated correctly - for key, expectedValue := range tc.expectedIPToPod { - val, ok := ipToPod.Load(key) - if !ok || val.(string) != expectedValue { - t.Errorf("Expected record for %v to be %v, got %v", key, expectedValue, val) - } - } - // Validate that old keys have been removed - for key := range tc.initialIPToPod { - if _, ok := tc.expectedIPToPod[key]; !ok { - if _, ok := ipToPod.Load(key); ok { - t.Errorf("Expected record for %v to be removed, but it was not", key) - } - } - } - }) - } -} - -func newServiceWatcherForTesting(ipToServiceAndNamespace, serviceAndNamespaceToSelectors *sync.Map) *serviceWatcher { - logger, _ := zap.NewDevelopment() - return &serviceWatcher{ipToServiceAndNamespace, serviceAndNamespaceToSelectors, logger, nil, nil} -} - -func newPodWatcherForTesting(ipToPod, podToWorkloadAndNamespace, workloadAndNamespaceToLabels *sync.Map, workloadPodCount map[string]int) *podWatcher { - logger, _ := zap.NewDevelopment() - return &podWatcher{ - ipToPod: ipToPod, - podToWorkloadAndNamespace: podToWorkloadAndNamespace, - workloadAndNamespaceToLabels: workloadAndNamespaceToLabels, - workloadPodCount: workloadPodCount, - logger: logger, - informer: nil, - deleter: mockDeleter, - } -} diff --git a/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_utils.go b/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_utils.go index 
32befd4c79..11bc68a3b0 100644 --- a/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_utils.go +++ b/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_utils.go @@ -10,6 +10,8 @@ import ( "regexp" "strconv" "strings" + "sync" + "time" corev1 "k8s.io/api/core/v1" ) @@ -140,3 +142,38 @@ func isIP(ipString string) bool { ip := net.ParseIP(ipString) return ip != nil } + +// a safe channel which can be closed multiple times +type safeChannel struct { + sync.Mutex + + ch chan struct{} + closed bool +} + +func (sc *safeChannel) Close() { + sc.Lock() + defer sc.Unlock() + + if !sc.closed { + close(sc.ch) + sc.closed = true + } +} + +// Deleter represents a type that can delete a key from a map after a certain delay. +type Deleter interface { + DeleteWithDelay(m *sync.Map, key interface{}) +} + +// TimedDeleter deletes a key after a specified delay. +type TimedDeleter struct { + Delay time.Duration +} + +func (td *TimedDeleter) DeleteWithDelay(m *sync.Map, key interface{}) { + go func() { + time.Sleep(td.Delay) + m.Delete(key) + }() +} diff --git a/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_utils_test.go b/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_utils_test.go new file mode 100644 index 0000000000..f8552aedab --- /dev/null +++ b/plugins/processors/awsapplicationsignals/internal/resolver/kubernetes_utils_test.go @@ -0,0 +1,229 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package resolver + +import ( + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// TestAttachNamespace function +func TestAttachNamespace(t *testing.T) { + result := attachNamespace("testResource", "testNamespace") + if result != "testResource@testNamespace" { + t.Errorf("attachNamespace was incorrect, got: %s, want: %s.", result, "testResource@testNamespace") + } +} + +// TestGetServiceAndNamespace function +func TestGetServiceAndNamespace(t *testing.T) { + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testService", + Namespace: "testNamespace", + }, + } + result := getServiceAndNamespace(service) + if result != "testService@testNamespace" { + t.Errorf("getServiceAndNamespace was incorrect, got: %s, want: %s.", result, "testService@testNamespace") + } +} + +// TestExtractResourceAndNamespace function +func TestExtractResourceAndNamespace(t *testing.T) { + // Test normal case + name, namespace := extractResourceAndNamespace("testService@testNamespace") + if name != "testService" || namespace != "testNamespace" { + t.Errorf("extractResourceAndNamespace was incorrect, got: %s and %s, want: %s and %s.", name, namespace, "testService", "testNamespace") + } + + // Test invalid case + name, namespace = extractResourceAndNamespace("invalid") + if name != "" || namespace != "" { + t.Errorf("extractResourceAndNamespace was incorrect, got: %s and %s, want: %s and %s.", name, namespace, "", "") + } +} + +func TestExtractWorkloadNameFromRS(t *testing.T) { + testCases := []struct { + name string + replicaSetName string + want string + shouldErr bool + }{ + { + name: "Valid ReplicaSet Name", + replicaSetName: "my-deployment-5859ffc7ff", + want: "my-deployment", + shouldErr: false, + }, + { + name: "Invalid ReplicaSet Name - No Hyphen", + replicaSetName: "mydeployment5859ffc7ff", + want: "", + shouldErr: true, + }, + { + name: "Invalid ReplicaSet Name - Less Than 10 Suffix 
Characters", + replicaSetName: "my-deployment-bc2", + want: "", + shouldErr: true, + }, + { + name: "Invalid ReplicaSet Name - More Than 10 Suffix Characters", + replicaSetName: "my-deployment-5859ffc7ffx", + want: "", + shouldErr: true, + }, + { + name: "Invalid ReplicaSet Name - Invalid Characters in Suffix", + replicaSetName: "my-deployment-aeiou12345", + want: "", + shouldErr: true, + }, + { + name: "Invalid ReplicaSet Name - Empty String", + replicaSetName: "", + want: "", + shouldErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + got, err := extractWorkloadNameFromRS(tc.replicaSetName) + + if (err != nil) != tc.shouldErr { + t.Errorf("extractWorkloadNameFromRS() error = %v, wantErr %v", err, tc.shouldErr) + return + } + + if got != tc.want { + t.Errorf("extractWorkloadNameFromRS() = %v, want %v", got, tc.want) + } + }) + } +} + +func TestExtractWorkloadNameFromPodName(t *testing.T) { + testCases := []struct { + name string + podName string + want string + shouldErr bool + }{ + { + name: "Valid Pod Name", + podName: "my-replicaset-bc24f", + want: "my-replicaset", + shouldErr: false, + }, + { + name: "Invalid Pod Name - No Hyphen", + podName: "myreplicasetbc24f", + want: "", + shouldErr: true, + }, + { + name: "Invalid Pod Name - Less Than 5 Suffix Characters", + podName: "my-replicaset-bc2", + want: "", + shouldErr: true, + }, + { + name: "Invalid Pod Name - More Than 5 Suffix Characters", + podName: "my-replicaset-bc24f5", + want: "", + shouldErr: true, + }, + { + name: "Invalid Pod Name - Empty String", + podName: "", + want: "", + shouldErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + got, err := extractWorkloadNameFromPodName(tc.podName) + + if (err != nil) != tc.shouldErr { + t.Errorf("extractWorkloadNameFromPodName() error = %v, wantErr %v", err, tc.shouldErr) + return + } + + if got != tc.want { + t.Errorf("extractWorkloadNameFromPodName() = %v, want %v", got, tc.want) + } + }) + } +} + +// TestGetWorkloadAndNamespace function +func TestGetWorkloadAndNamespace(t *testing.T) { + // Test ReplicaSet case + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testPod", + Namespace: "testNamespace", + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "ReplicaSet", + Name: "testDeployment-5d68bc5f49", + }, + }, + }, + } + result := getWorkloadAndNamespace(pod) + if result != "testDeployment@testNamespace" { + t.Errorf("getDeploymentAndNamespace was incorrect, got: %s, want: %s.", result, "testDeployment@testNamespace") + } + + // Test StatefulSet case + pod.ObjectMeta.OwnerReferences[0].Kind = "StatefulSet" + pod.ObjectMeta.OwnerReferences[0].Name = "testStatefulSet" + result = getWorkloadAndNamespace(pod) + if result != "testStatefulSet@testNamespace" { + t.Errorf("getWorkloadAndNamespace was incorrect, got: %s, want: %s.", result, "testStatefulSet@testNamespace") + } + + // Test Other case + pod.ObjectMeta.OwnerReferences[0].Kind = "Other" + pod.ObjectMeta.OwnerReferences[0].Name = "testOther" + result = getWorkloadAndNamespace(pod) + if result != "" { + t.Errorf("getWorkloadAndNamespace was incorrect, got: %s, want: %s.", result, "") + } + + // Test no OwnerReferences case + pod.ObjectMeta.OwnerReferences = nil + result = getWorkloadAndNamespace(pod) + if result != "" { + t.Errorf("getWorkloadAndNamespace was incorrect, got: %s, want: %s.", result, "") + } +} + +func TestExtractIPPort(t *testing.T) { + // Test valid IP:Port + ip, port, ok := extractIPPort("192.0.2.0:8080") + 
assert.Equal(t, "192.0.2.0", ip) + assert.Equal(t, "8080", port) + assert.True(t, ok) + + // Test invalid IP:Port + ip, port, ok = extractIPPort("192.0.2:8080") + assert.Equal(t, "", ip) + assert.Equal(t, "", port) + assert.False(t, ok) + + // Test IP only + ip, port, ok = extractIPPort("192.0.2.0") + assert.Equal(t, "", ip) + assert.Equal(t, "", port) + assert.False(t, ok) +} diff --git a/plugins/processors/awsapplicationsignals/internal/resolver/podwatcher.go b/plugins/processors/awsapplicationsignals/internal/resolver/podwatcher.go new file mode 100644 index 0000000000..97b5d0a978 --- /dev/null +++ b/plugins/processors/awsapplicationsignals/internal/resolver/podwatcher.go @@ -0,0 +1,198 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package resolver + +import ( + "sync" + + mapset "github.com/deckarep/golang-set/v2" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" +) + +func (p *podWatcher) removeHostNetworkRecords(pod *corev1.Pod) { + for _, port := range getHostNetworkPorts(pod) { + p.deleter.DeleteWithDelay(p.ipToPod, pod.Status.HostIP+":"+port) + } +} + +func (p *podWatcher) handlePodAdd(pod *corev1.Pod) { + if pod.Spec.HostNetwork && pod.Status.HostIP != "" { + for _, port := range getHostNetworkPorts(pod) { + p.ipToPod.Store(pod.Status.HostIP+":"+port, pod.Name) + } + } + if pod.Status.PodIP != "" { + p.ipToPod.Store(pod.Status.PodIP, pod.Name) + } +} + +func (p *podWatcher) handlePodUpdate(newPod *corev1.Pod, oldPod *corev1.Pod) { + // HostNetwork is an immutable field + if newPod.Spec.HostNetwork && oldPod.Status.HostIP != newPod.Status.HostIP { + if oldPod.Status.HostIP != "" { + p.logger.Debug("deleting host ip from cache", zap.String("hostNetwork", oldPod.Status.HostIP)) + p.removeHostNetworkRecords(oldPod) + } + if newPod.Status.HostIP != "" { + for _, port := range getHostNetworkPorts(newPod) { + p.ipToPod.Store(newPod.Status.HostIP+":"+port, newPod.Name) + } + } + } + if oldPod.Status.PodIP != newPod.Status.PodIP { + if oldPod.Status.PodIP != "" { + p.logger.Debug("deleting pod ip from cache", zap.String("podNetwork", oldPod.Status.PodIP)) + p.deleter.DeleteWithDelay(p.ipToPod, oldPod.Status.PodIP) + } + if newPod.Status.PodIP != "" { + p.ipToPod.Store(newPod.Status.PodIP, newPod.Name) + } + } +} + +func (p *podWatcher) onAddOrUpdatePod(pod, oldPod *corev1.Pod) { + if oldPod == nil { + p.handlePodAdd(pod) + } else { + p.handlePodUpdate(pod, oldPod) + } + + workloadAndNamespace := getWorkloadAndNamespace(pod) + + if workloadAndNamespace != "" { + p.podToWorkloadAndNamespace.Store(pod.Name, workloadAndNamespace) + podLabels := mapset.NewSet[string]() + for key, value := range pod.ObjectMeta.Labels { + podLabels.Add(key + "=" + value) + } + if podLabels.Cardinality() > 0 { + p.workloadAndNamespaceToLabels.Store(workloadAndNamespace, podLabels) + } + if oldPod == nil { + p.workloadPodCount[workloadAndNamespace]++ + p.logger.Debug("Added pod", zap.String("pod", pod.Name), zap.String("workload", workloadAndNamespace), zap.Int("count", p.workloadPodCount[workloadAndNamespace])) + } + } +} + +func (p *podWatcher) onDeletePod(obj interface{}) { + pod := obj.(*corev1.Pod) + if pod.Spec.HostNetwork && pod.Status.HostIP != "" { + p.logger.Debug("deleting host ip from cache", zap.String("hostNetwork", pod.Status.HostIP)) + p.removeHostNetworkRecords(pod) + } + if pod.Status.PodIP != "" { + p.logger.Debug("deleting pod ip from cache", zap.String("podNetwork", pod.Status.PodIP)) + 
p.deleter.DeleteWithDelay(p.ipToPod, pod.Status.PodIP) + } + + if workloadKey, ok := p.podToWorkloadAndNamespace.Load(pod.Name); ok { + workloadAndNamespace := workloadKey.(string) + p.workloadPodCount[workloadAndNamespace]-- + p.logger.Debug("decrementing pod count", zap.String("workload", workloadAndNamespace), zap.Int("podCount", p.workloadPodCount[workloadAndNamespace])) + if p.workloadPodCount[workloadAndNamespace] == 0 { + p.deleter.DeleteWithDelay(p.workloadAndNamespaceToLabels, workloadAndNamespace) + } + } else { + p.logger.Error("failed to load pod workloadKey", zap.String("pod", pod.Name)) + } + p.deleter.DeleteWithDelay(p.podToWorkloadAndNamespace, pod.Name) +} + +type podWatcher struct { + ipToPod *sync.Map + podToWorkloadAndNamespace *sync.Map + workloadAndNamespaceToLabels *sync.Map + workloadPodCount map[string]int + logger *zap.Logger + informer cache.SharedIndexInformer + deleter Deleter +} + +func newPodWatcher(logger *zap.Logger, informer cache.SharedIndexInformer, deleter Deleter) *podWatcher { + return &podWatcher{ + ipToPod: &sync.Map{}, + podToWorkloadAndNamespace: &sync.Map{}, + workloadAndNamespaceToLabels: &sync.Map{}, + workloadPodCount: make(map[string]int), + logger: logger, + informer: informer, + deleter: deleter, + } +} + +func (p *podWatcher) run(stopCh chan struct{}) { + p.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + pod := obj.(*corev1.Pod) + p.logger.Debug("list and watch for pod: ADD " + pod.Name) + p.onAddOrUpdatePod(pod, nil) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + pod := newObj.(*corev1.Pod) + oldPod := oldObj.(*corev1.Pod) + p.logger.Debug("list and watch for pods: UPDATE " + pod.Name) + p.onAddOrUpdatePod(pod, oldPod) + }, + DeleteFunc: func(obj interface{}) { + pod := obj.(*corev1.Pod) + p.logger.Debug("list and watch for pods: DELETE " + pod.Name) + p.onDeletePod(obj) + }, + }) + + go p.informer.Run(stopCh) + +} + +func (p *podWatcher) waitForCacheSync(stopCh chan struct{}) { + if !cache.WaitForNamedCacheSync("podWatcher", stopCh, p.informer.HasSynced) { + p.logger.Fatal("timed out waiting for kubernetes pod watcher caches to sync") + } + + p.logger.Info("podWatcher: Cache synced") +} + +// minimizePod removes fields that could contain large objects, and retain essential +// fields needed for IP/name translation. 
The following fields must be kept: +// - ObjectMeta: Namespace, Name, Labels, OwnerReference +// - Spec: HostNetwork, ContainerPorts +// - Status: PodIP/s, HostIP/s +func minimizePod(obj interface{}) (interface{}, error) { + if pod, ok := obj.(*corev1.Pod); ok { + pod.Annotations = nil + pod.Finalizers = nil + pod.ManagedFields = nil + + pod.Spec.Volumes = nil + pod.Spec.InitContainers = nil + pod.Spec.EphemeralContainers = nil + pod.Spec.ImagePullSecrets = nil + pod.Spec.HostAliases = nil + pod.Spec.SchedulingGates = nil + pod.Spec.ResourceClaims = nil + pod.Spec.Tolerations = nil + pod.Spec.Affinity = nil + + pod.Status.InitContainerStatuses = nil + pod.Status.ContainerStatuses = nil + pod.Status.EphemeralContainerStatuses = nil + + for i := 0; i < len(pod.Spec.Containers); i++ { + c := &pod.Spec.Containers[i] + c.Image = "" + c.Command = nil + c.Args = nil + c.EnvFrom = nil + c.Env = nil + c.Resources = corev1.ResourceRequirements{} + c.VolumeMounts = nil + c.VolumeDevices = nil + c.SecurityContext = nil + } + } + return obj, nil +} diff --git a/plugins/processors/awsapplicationsignals/internal/resolver/podwatcher_test.go b/plugins/processors/awsapplicationsignals/internal/resolver/podwatcher_test.go new file mode 100644 index 0000000000..02b45a1f42 --- /dev/null +++ b/plugins/processors/awsapplicationsignals/internal/resolver/podwatcher_test.go @@ -0,0 +1,517 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package resolver + +import ( + "sync" + "testing" + + mapset "github.com/deckarep/golang-set/v2" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func newPodWatcherForTesting(ipToPod, podToWorkloadAndNamespace, workloadAndNamespaceToLabels *sync.Map, workloadPodCount map[string]int) *podWatcher { + logger, _ := zap.NewDevelopment() + return &podWatcher{ + ipToPod: ipToPod, + podToWorkloadAndNamespace: podToWorkloadAndNamespace, + workloadAndNamespaceToLabels: workloadAndNamespaceToLabels, + workloadPodCount: workloadPodCount, + logger: logger, + informer: nil, + deleter: mockDeleter, + } +} + +func TestOnAddOrUpdatePod(t *testing.T) { + t.Run("pod with both PodIP and HostIP", func(t *testing.T) { + ipToPod := &sync.Map{} + podToWorkloadAndNamespace := &sync.Map{} + workloadAndNamespaceToLabels := &sync.Map{} + workloadPodCount := map[string]int{} + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testPod", + Namespace: "testNamespace", + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "ReplicaSet", + Name: "testDeployment-598b89cd8d", + }, + }, + }, + Status: corev1.PodStatus{ + PodIP: "1.2.3.4", + HostIP: "5.6.7.8", + }, + } + + poWatcher := newPodWatcherForTesting(ipToPod, podToWorkloadAndNamespace, workloadAndNamespaceToLabels, workloadPodCount) + poWatcher.onAddOrUpdatePod(pod, nil) + + // Test the mappings in ipToPod + if podName, _ := ipToPod.Load("1.2.3.4"); podName != "testPod" { + t.Errorf("ipToPod was incorrect, got: %s, want: %s.", podName, "testPod") + } + + // Test the mapping in podToWorkloadAndNamespace + if depAndNamespace, _ := podToWorkloadAndNamespace.Load("testPod"); depAndNamespace != "testDeployment@testNamespace" { + t.Errorf("podToWorkloadAndNamespace was incorrect, got: %s, want: %s.", depAndNamespace, "testDeployment@testNamespace") + } + + // Test the count in workloadPodCount + if count := workloadPodCount["testDeployment@testNamespace"]; count != 1 { + t.Errorf("workloadPodCount 
was incorrect, got: %d, want: %d.", count, 1) + } + }) + + t.Run("pod with only HostIP", func(t *testing.T) { + ipToPod := &sync.Map{} + podToWorkloadAndNamespace := &sync.Map{} + workloadAndNamespaceToLabels := &sync.Map{} + workloadPodCount := map[string]int{} + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testPod", + Namespace: "testNamespace", + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "ReplicaSet", + Name: "testDeployment-7b74958fb8", + }, + }, + }, + Status: corev1.PodStatus{ + HostIP: "5.6.7.8", + }, + Spec: corev1.PodSpec{ + HostNetwork: true, + Containers: []corev1.Container{ + { + Ports: []corev1.ContainerPort{ + { + HostPort: int32(8080), + }, + }, + }, + }, + }, + } + + poWatcher := newPodWatcherForTesting(ipToPod, podToWorkloadAndNamespace, workloadAndNamespaceToLabels, workloadPodCount) + poWatcher.onAddOrUpdatePod(pod, nil) + + // Test the mappings in ipToPod + if podName, _ := ipToPod.Load("5.6.7.8:8080"); podName != "testPod" { + t.Errorf("ipToPod was incorrect, got: %s, want: %s.", podName, "testPod") + } + + // Test the mapping in podToWorkloadAndNamespace + if depAndNamespace, _ := podToWorkloadAndNamespace.Load("testPod"); depAndNamespace != "testDeployment@testNamespace" { + t.Errorf("podToWorkloadAndNamespace was incorrect, got: %s, want: %s.", depAndNamespace, "testDeployment@testNamespace") + } + + // Test the count in workloadPodCount + if count := workloadPodCount["testDeployment@testNamespace"]; count != 1 { + t.Errorf("workloadPodCount was incorrect, got: %d, want: %d.", count, 1) + } + }) + + t.Run("pod updated with different set of labels", func(t *testing.T) { + ipToPod := &sync.Map{} + podToWorkloadAndNamespace := &sync.Map{} + workloadAndNamespaceToLabels := &sync.Map{} + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testPod", + Namespace: "testNamespace", + Labels: map[string]string{ + "label1": "value1", + "label2": "value2", + }, + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "ReplicaSet", + Name: "testDeployment-5d68bc5f49", + }, + }, + }, + Status: corev1.PodStatus{ + HostIP: "5.6.7.8", + }, + Spec: corev1.PodSpec{ + HostNetwork: true, + Containers: []corev1.Container{ + { + Ports: []corev1.ContainerPort{ + {HostPort: 8080}, + }, + }, + }, + }, + } + + // add the pod + poWatcher := newPodWatcherForTesting(ipToPod, podToWorkloadAndNamespace, workloadAndNamespaceToLabels, map[string]int{}) + poWatcher.onAddOrUpdatePod(pod, nil) + + // Test the mappings in ipToPod + if podName, ok := ipToPod.Load("5.6.7.8:8080"); !ok && podName != "testPod" { + t.Errorf("ipToPod[%s] was incorrect, got: %s, want: %s.", "5.6.7.8:8080", podName, "testPod") + } + + // Test the mapping in workloadAndNamespaceToLabels + labels, _ := workloadAndNamespaceToLabels.Load("testDeployment@testNamespace") + expectedLabels := []string{"label1=value1", "label2=value2"} + for _, label := range expectedLabels { + if !labels.(mapset.Set[string]).Contains(label) { + t.Errorf("deploymentAndNamespaceToLabels was incorrect, got: %v, want: %s.", labels, label) + } + } + + pod2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testPod", + Namespace: "testNamespace", + Labels: map[string]string{ + "label1": "value1", + "label2": "value2", + "label3": "value3", + }, + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "ReplicaSet", + Name: "testDeployment-5d68bc5f49", + }, + }, + }, + Status: corev1.PodStatus{ + PodIP: "1.2.3.4", + HostIP: "5.6.7.8", + }, + Spec: corev1.PodSpec{ + HostNetwork: true, + Containers: 
[]corev1.Container{ + { + Ports: []corev1.ContainerPort{ + {HostPort: 8080}, + }, + }, + }, + }, + } + + // add the pod + poWatcher.onAddOrUpdatePod(pod2, pod) + + // Test the mappings in ipToPod + if podName, ok := ipToPod.Load("5.6.7.8:8080"); !ok && podName != "testPod" { + t.Errorf("ipToPod[%s] was incorrect, got: %s, want: %s.", "5.6.7.8:8080", podName, "testPod") + } + + if podName, ok := ipToPod.Load("1.2.3.4"); !ok && podName != "testPod" { + t.Errorf("ipToPod[%s] was incorrect, got: %s, want: %s.", "1.2.3.4", podName, "testPod") + } + // Test the mapping in workloadAndNamespaceToLabels + labels, _ = workloadAndNamespaceToLabels.Load("testDeployment@testNamespace") + expectedLabels = []string{"label1=value1", "label2=value2", "label3=value3"} + for _, label := range expectedLabels { + if !labels.(mapset.Set[string]).Contains(label) { + t.Errorf("workloadAndNamespaceToLabels was incorrect, got: %v, want: %s.", labels, label) + } + } + }) +} + +func TestOnDeletePod(t *testing.T) { + t.Run("pod with both PodIP and HostIP", func(t *testing.T) { + ipToPod := &sync.Map{} + podToWorkloadAndNamespace := &sync.Map{} + workloadAndNamespaceToLabels := &sync.Map{} + workloadPodCount := map[string]int{} + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testPod", + Namespace: "testNamespace", + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "ReplicaSet", + Name: "testDeployment-xyz", + }, + }, + }, + Status: corev1.PodStatus{ + PodIP: "1.2.3.4", + HostIP: "5.6.7.8", + }, + } + + // Assume the pod has already been added + ipToPod.Store(pod.Status.PodIP, pod.Name) + ipToPod.Store(pod.Status.HostIP, pod.Name) + podToWorkloadAndNamespace.Store(pod.Name, "testDeployment@testNamespace") + workloadAndNamespaceToLabels.Store("testDeployment@testNamespace", "testLabels") + workloadPodCount["testDeployment@testNamespace"] = 1 + + poWatcher := newPodWatcherForTesting(ipToPod, podToWorkloadAndNamespace, workloadAndNamespaceToLabels, workloadPodCount) + poWatcher.onDeletePod(pod) + + // Test if the entries in ipToPod and podToWorkloadAndNamespace have been deleted + if _, ok := ipToPod.Load("1.2.3.4"); ok { + t.Errorf("ipToPod deletion was incorrect, key: %s still exists", "1.2.3.4") + } + + if _, ok := podToWorkloadAndNamespace.Load("testPod"); ok { + t.Errorf("podToWorkloadAndNamespace deletion was incorrect, key: %s still exists", "testPod") + } + + // Test if the count in workloadPodCount has been decremented and the entry in workloadAndNamespaceToLabels has been deleted + if count := workloadPodCount["testDeployment@testNamespace"]; count != 0 { + t.Errorf("workloadPodCount was incorrect, got: %d, want: %d.", count, 0) + } + + if _, ok := workloadAndNamespaceToLabels.Load("testDeployment@testNamespace"); ok { + t.Errorf("workloadAndNamespaceToLabels deletion was incorrect, key: %s still exists", "testDeployment@testNamespace") + } + }) + + t.Run("pod with only HostIP and some network ports", func(t *testing.T) { + ipToPod := &sync.Map{} + podToWorkloadAndNamespace := &sync.Map{} + workloadAndNamespaceToLabels := &sync.Map{} + workloadPodCount := map[string]int{} + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testPod", + Namespace: "testNamespace", + OwnerReferences: []metav1.OwnerReference{ + { + Kind: "ReplicaSet", + Name: "testDeployment-xyz", + }, + }, + }, + Status: corev1.PodStatus{ + HostIP: "5.6.7.8", + }, + Spec: corev1.PodSpec{ + HostNetwork: true, + Containers: []corev1.Container{ + { + Ports: []corev1.ContainerPort{ + { + HostPort: int32(8080), + }, + 
}, + }, + }, + }, + } + + // Assume the pod has already been added + ipToPod.Store(pod.Status.HostIP, pod.Name) + ipToPod.Store(pod.Status.HostIP+":8080", pod.Name) + podToWorkloadAndNamespace.Store(pod.Name, "testDeployment@testNamespace") + workloadAndNamespaceToLabels.Store("testDeployment@testNamespace", "testLabels") + workloadPodCount["testDeployment@testNamespace"] = 1 + + poWatcher := newPodWatcherForTesting(ipToPod, podToWorkloadAndNamespace, workloadAndNamespaceToLabels, workloadPodCount) + poWatcher.onDeletePod(pod) + + // Test if the entries in ipToPod and podToWorkloadAndNamespace have been deleted + if _, ok := ipToPod.Load("5.6.7.8:8080"); ok { + t.Errorf("ipToPod deletion was incorrect, key: %s still exists", "5.6.7.8:8080") + } + + if _, ok := podToWorkloadAndNamespace.Load("testPod"); ok { + t.Errorf("podToDeploymentAndNamespace deletion was incorrect, key: %s still exists", "testPod") + } + + // Test if the count in workloadPodCount has been decremented and the entry in workloadAndNamespaceToLabels has been deleted + if count := workloadPodCount["testDeployment@testNamespace"]; count != 0 { + t.Errorf("workloadPodCount was incorrect, got: %d, want: %d.", count, 0) + } + + if _, ok := workloadAndNamespaceToLabels.Load("testDeployment@testNamespace"); ok { + t.Errorf("workloadAndNamespaceToLabels deletion was incorrect, key: %s still exists", "testDeployment@testNamespace") + } + }) +} + +func TestHandlePodUpdate(t *testing.T) { + testCases := []struct { + name string + oldPod *corev1.Pod + newPod *corev1.Pod + initialIPToPod map[string]string + expectedIPToPod map[string]string + }{ + { + name: "Old and New Pod Do Not Use Host Network, Different Pod IPs", + oldPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "mypod", + }, + Status: corev1.PodStatus{ + PodIP: "10.0.0.3", + }, + Spec: corev1.PodSpec{ + HostNetwork: false, + }, + }, + newPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "mypod", + }, + Status: corev1.PodStatus{ + PodIP: "10.0.0.4", + }, + Spec: corev1.PodSpec{ + HostNetwork: false, + }, + }, + initialIPToPod: map[string]string{ + "10.0.0.3": "mypod", + }, + expectedIPToPod: map[string]string{ + "10.0.0.4": "mypod", + }, + }, + { + name: "Old Pod Has Empty PodIP, New Pod Does Not Use Host Network, Non-Empty Pod IP", + oldPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "mypod", + }, + Status: corev1.PodStatus{ + PodIP: "", + }, + Spec: corev1.PodSpec{ + HostNetwork: false, + }, + }, + newPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "mypod", + }, + Status: corev1.PodStatus{ + PodIP: "10.0.0.5", + }, + Spec: corev1.PodSpec{ + HostNetwork: false, + }, + }, + initialIPToPod: map[string]string{}, + expectedIPToPod: map[string]string{ + "10.0.0.5": "mypod", + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ipToPod := &sync.Map{} + // Initialize ipToPod map + for k, v := range tc.initialIPToPod { + ipToPod.Store(k, v) + } + poWatcher := newPodWatcherForTesting(ipToPod, nil, nil, map[string]int{}) + poWatcher.handlePodUpdate(tc.newPod, tc.oldPod) + + // Now validate that ipToPod map has been updated correctly + for key, expectedValue := range tc.expectedIPToPod { + val, ok := ipToPod.Load(key) + if !ok || val.(string) != expectedValue { + t.Errorf("Expected record for %v to be %v, got %v", key, expectedValue, val) + } + } + // Validate that old keys have been removed + for key := range tc.initialIPToPod { + if _, ok := tc.expectedIPToPod[key]; !ok { + if _, ok := ipToPod.Load(key); ok { 
+ t.Errorf("Expected record for %v to be removed, but it was not", key) + } + } + } + }) + } +} + +func TestFilterPodIPFields(t *testing.T) { + meta := metav1.ObjectMeta{ + Name: "test", + Namespace: "default", + Labels: map[string]string{ + "name": "app", + }, + } + pod := &corev1.Pod{ + ObjectMeta: meta, + Spec: corev1.PodSpec{ + HostNetwork: true, + Containers: []corev1.Container{ + {}, + }, + }, + Status: corev1.PodStatus{}, + } + newPod, err := minimizePod(pod) + assert.Nil(t, err) + assert.Empty(t, getHostNetworkPorts(newPod.(*corev1.Pod))) + + podStatus := corev1.PodStatus{ + PodIP: "192.168.0.12", + HostIPs: []corev1.HostIP{ + { + IP: "132.168.3.12", + }, + }, + } + pod = &corev1.Pod{ + ObjectMeta: meta, + Spec: corev1.PodSpec{ + HostNetwork: true, + Containers: []corev1.Container{ + { + Ports: []corev1.ContainerPort{ + {HostPort: 8080}, + }, + }, + }, + }, + Status: podStatus, + } + newPod, err = minimizePod(pod) + assert.Nil(t, err) + assert.Equal(t, "app", newPod.(*corev1.Pod).Labels["name"]) + assert.Equal(t, []string{"8080"}, getHostNetworkPorts(newPod.(*corev1.Pod))) + assert.Equal(t, podStatus, newPod.(*corev1.Pod).Status) + + pod = &corev1.Pod{ + Spec: corev1.PodSpec{ + HostNetwork: true, + Containers: []corev1.Container{ + { + Ports: []corev1.ContainerPort{ + {HostPort: 8080}, + {HostPort: 8081}, + }, + }, + }, + }, + Status: podStatus, + } + newPod, err = minimizePod(pod) + assert.Nil(t, err) + assert.Equal(t, []string{"8080", "8081"}, getHostNetworkPorts(newPod.(*corev1.Pod))) + assert.Equal(t, podStatus, newPod.(*corev1.Pod).Status) +} diff --git a/plugins/processors/awsapplicationsignals/internal/resolver/servicetoworkload.go b/plugins/processors/awsapplicationsignals/internal/resolver/servicetoworkload.go new file mode 100644 index 0000000000..6cdc69f69e --- /dev/null +++ b/plugins/processors/awsapplicationsignals/internal/resolver/servicetoworkload.go @@ -0,0 +1,81 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: MIT + +package resolver + +import ( + "sync" + "time" + + mapset "github.com/deckarep/golang-set/v2" + "go.uber.org/zap" +) + +type serviceToWorkloadMapper struct { + serviceAndNamespaceToSelectors *sync.Map + workloadAndNamespaceToLabels *sync.Map + serviceToWorkload *sync.Map + logger *zap.Logger + deleter Deleter +} + +func newServiceToWorkloadMapper(serviceAndNamespaceToSelectors, workloadAndNamespaceToLabels, serviceToWorkload *sync.Map, logger *zap.Logger, deleter Deleter) *serviceToWorkloadMapper { + return &serviceToWorkloadMapper{ + serviceAndNamespaceToSelectors: serviceAndNamespaceToSelectors, + workloadAndNamespaceToLabels: workloadAndNamespaceToLabels, + serviceToWorkload: serviceToWorkload, + logger: logger, + deleter: deleter, + } +} + +func (m *serviceToWorkloadMapper) mapServiceToWorkload() { + m.logger.Debug("Map service to workload at:", zap.Time("time", time.Now())) + + m.serviceAndNamespaceToSelectors.Range(func(key, value interface{}) bool { + var workloads []string + serviceAndNamespace := key.(string) + _, serviceNamespace := extractResourceAndNamespace(serviceAndNamespace) + serviceLabels := value.(mapset.Set[string]) + + m.workloadAndNamespaceToLabels.Range(func(workloadKey, labelsValue interface{}) bool { + labels := labelsValue.(mapset.Set[string]) + workloadAndNamespace := workloadKey.(string) + _, workloadNamespace := extractResourceAndNamespace(workloadAndNamespace) + if workloadNamespace == serviceNamespace && workloadNamespace != "" && serviceLabels.IsSubset(labels) { + m.logger.Debug("Found workload for service", zap.String("service", serviceAndNamespace), zap.String("workload", workloadAndNamespace)) + workloads = append(workloads, workloadAndNamespace) + } + + return true + }) + + if len(workloads) > 1 { + m.logger.Info("Multiple workloads found for service. You will get unexpected results.", zap.String("service", serviceAndNamespace), zap.Strings("workloads", workloads)) + } else if len(workloads) == 1 { + m.serviceToWorkload.Store(serviceAndNamespace, workloads[0]) + } else { + m.logger.Debug("No workload found for service", zap.String("service", serviceAndNamespace)) + m.deleter.DeleteWithDelay(m.serviceToWorkload, serviceAndNamespace) + } + return true + }) +} + +func (m *serviceToWorkloadMapper) Start(stopCh chan struct{}) { + // do the first mapping immediately + m.mapServiceToWorkload() + m.logger.Debug("First-time map service to workload at:", zap.Time("time", time.Now())) + + go func() { + for { + select { + case <-stopCh: + return + case <-time.After(time.Minute + 30*time.Second): + m.mapServiceToWorkload() + m.logger.Debug("Map service to workload at:", zap.Time("time", time.Now())) + } + } + }() +} diff --git a/plugins/processors/awsapplicationsignals/internal/resolver/servicetoworkload_test.go b/plugins/processors/awsapplicationsignals/internal/resolver/servicetoworkload_test.go new file mode 100644 index 0000000000..b2589542e1 --- /dev/null +++ b/plugins/processors/awsapplicationsignals/internal/resolver/servicetoworkload_test.go @@ -0,0 +1,101 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: MIT + +package resolver + +import ( + "sync" + "testing" + "time" + + mapset "github.com/deckarep/golang-set/v2" + "go.uber.org/zap" +) + +func TestMapServiceToWorkload(t *testing.T) { + logger, _ := zap.NewDevelopment() + + serviceAndNamespaceToSelectors := &sync.Map{} + workloadAndNamespaceToLabels := &sync.Map{} + serviceToWorkload := &sync.Map{} + + serviceAndNamespaceToSelectors.Store("service1@namespace1", mapset.NewSet("label1=value1", "label2=value2")) + workloadAndNamespaceToLabels.Store("deployment1@namespace1", mapset.NewSet("label1=value1", "label2=value2", "label3=value3")) + + mapper := newServiceToWorkloadMapper(serviceAndNamespaceToSelectors, workloadAndNamespaceToLabels, serviceToWorkload, logger, mockDeleter) + mapper.mapServiceToWorkload() + + if _, ok := serviceToWorkload.Load("service1@namespace1"); !ok { + t.Errorf("Expected service1@namespace1 to be mapped to a workload, but it was not") + } +} + +func TestMapServiceToWorkload_NoWorkload(t *testing.T) { + logger, _ := zap.NewDevelopment() + + serviceAndNamespaceToSelectors := &sync.Map{} + workloadAndNamespaceToLabels := &sync.Map{} + serviceToWorkload := &sync.Map{} + + // Add a service with no matching workload + serviceAndNamespace := "service@namespace" + serviceAndNamespaceToSelectors.Store(serviceAndNamespace, mapset.NewSet("label1=value1")) + serviceToWorkload.Store(serviceAndNamespace, "workload@namespace") + + mapper := newServiceToWorkloadMapper(serviceAndNamespaceToSelectors, workloadAndNamespaceToLabels, serviceToWorkload, logger, mockDeleter) + mapper.mapServiceToWorkload() + + // Check that the service was deleted from serviceToWorkload + if _, ok := serviceToWorkload.Load(serviceAndNamespace); ok { + t.Errorf("Service was not deleted from serviceToWorkload") + } +} + +func TestMapServiceToWorkload_MultipleWorkloads(t *testing.T) { + logger, _ := zap.NewDevelopment() + + serviceAndNamespaceToSelectors := &sync.Map{} + workloadAndNamespaceToLabels := &sync.Map{} + serviceToWorkload := &sync.Map{} + + serviceAndNamespace := "service@namespace" + serviceAndNamespaceToSelectors.Store(serviceAndNamespace, mapset.NewSet("label1=value1", "label2=value2")) + + // Add two workloads with matching labels to the service + workloadAndNamespaceToLabels.Store("workload1@namespace", mapset.NewSet("label1=value1", "label2=value2", "label3=value3")) + workloadAndNamespaceToLabels.Store("workload2@namespace", mapset.NewSet("label1=value1", "label2=value2", "label4=value4")) + + mapper := newServiceToWorkloadMapper(serviceAndNamespaceToSelectors, workloadAndNamespaceToLabels, serviceToWorkload, logger, mockDeleter) + mapper.mapServiceToWorkload() + + // Check that the service does not map to any workload + if _, ok := serviceToWorkload.Load(serviceAndNamespace); ok { + t.Errorf("Unexpected mapping of service to multiple workloads") + } +} + +func TestStopsWhenSignaled(t *testing.T) { + logger, _ := zap.NewDevelopment() + + serviceAndNamespaceToSelectors := &sync.Map{} + workloadAndNamespaceToLabels := &sync.Map{} + serviceToWorkload := &sync.Map{} + + stopchan := make(chan struct{}) + + // Signal the stopchan to stop after 100 milliseconds + time.AfterFunc(100*time.Millisecond, func() { + close(stopchan) + }) + + mapper := newServiceToWorkloadMapper(serviceAndNamespaceToSelectors, workloadAndNamespaceToLabels, serviceToWorkload, logger, mockDeleter) + + start := time.Now() + mapper.Start(stopchan) + duration := time.Since(start) + + // Check that the function stopped in a reasonable time after 
the stop signal + if duration > 200*time.Millisecond { + t.Errorf("mapServiceToWorkload did not stop in a reasonable time after the stop signal, duration: %v", duration) + } +} diff --git a/plugins/processors/awsapplicationsignals/internal/resolver/servicewatcher.go b/plugins/processors/awsapplicationsignals/internal/resolver/servicewatcher.go new file mode 100644 index 0000000000..da84309821 --- /dev/null +++ b/plugins/processors/awsapplicationsignals/internal/resolver/servicewatcher.go @@ -0,0 +1,114 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package resolver + +import ( + "sync" + + mapset "github.com/deckarep/golang-set/v2" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" +) + +type serviceWatcher struct { + ipToServiceAndNamespace *sync.Map + serviceAndNamespaceToSelectors *sync.Map + logger *zap.Logger + informer cache.SharedIndexInformer + deleter Deleter +} + +func newServiceWatcher(logger *zap.Logger, informer cache.SharedIndexInformer, deleter Deleter) *serviceWatcher { + return &serviceWatcher{ + ipToServiceAndNamespace: &sync.Map{}, + serviceAndNamespaceToSelectors: &sync.Map{}, + logger: logger, + informer: informer, + deleter: deleter, + } +} + +func (s *serviceWatcher) Run(stopCh chan struct{}) { + s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + service := obj.(*corev1.Service) + s.logger.Debug("list and watch for services: ADD " + service.Name) + s.onAddOrUpdateService(service) + }, + UpdateFunc: func(_, newObj interface{}) { + service := newObj.(*corev1.Service) + s.logger.Debug("list and watch for services: UPDATE " + service.Name) + s.onAddOrUpdateService(service) + }, + DeleteFunc: func(obj interface{}) { + service := obj.(*corev1.Service) + s.logger.Debug("list and watch for services: DELETE " + service.Name) + s.onDeleteService(service, s.deleter) + }, + }) + go s.informer.Run(stopCh) +} + +func (s *serviceWatcher) waitForCacheSync(stopCh chan struct{}) { + if !cache.WaitForNamedCacheSync("serviceWatcher", stopCh, s.informer.HasSynced) { + s.logger.Fatal("timed out waiting for kubernetes service watcher caches to sync") + } + + s.logger.Info("serviceWatcher: Cache synced") +} + +func (s *serviceWatcher) onAddOrUpdateService(service *corev1.Service) { + // service can also have an external IP (or ingress IP) that could be accessed + // this field can be either an IP address (in some edge case) or a hostname (see "EXTERNAL-IP" column in "k get svc" output) + // [ec2-user@ip-172-31-11-104 one-step]$ k get svc -A + // NAMESPACE NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE + // default pet-clinic-frontend ClusterIP 10.100.216.182 8080/TCP 108m + // default vets-service ClusterIP 10.100.62.167 8083/TCP 108m + // default visits-service ClusterIP 10.100.96.5 8082/TCP 108m + // ingress-nginx default-http-backend ClusterIP 10.100.11.231 80/TCP 108m + // ingress-nginx ingress-nginx LoadBalancer 10.100.154.5 aex7997ece08c435dbd2b912fd5aa5bd-5372117830.xxxxx.elb.amazonaws.com 80:32080/TCP,443:32081/TCP,9113:30410/TCP 108m + // kube-system kube-dns ClusterIP 10.100.0.10 + // + // we ignore such case for now and may need to consider it in the future + if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != corev1.ClusterIPNone { + s.ipToServiceAndNamespace.Store(service.Spec.ClusterIP, getServiceAndNamespace(service)) + } + labelSet := mapset.NewSet[string]() + for key, value := range service.Spec.Selector { + labelSet.Add(key + 
"=" + value) + } + if labelSet.Cardinality() > 0 { + s.serviceAndNamespaceToSelectors.Store(getServiceAndNamespace(service), labelSet) + } +} + +func (s *serviceWatcher) onDeleteService(service *corev1.Service, deleter Deleter) { + if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != corev1.ClusterIPNone { + deleter.DeleteWithDelay(s.ipToServiceAndNamespace, service.Spec.ClusterIP) + } + deleter.DeleteWithDelay(s.serviceAndNamespaceToSelectors, getServiceAndNamespace(service)) +} + +// minimizeService removes fields that could contain large objects, and retain essential +// fields needed for IP/name translation. The following fields must be kept: +// - ObjectMeta: Namespace, Name +// - Spec: Selectors, ClusterIP +func minimizeService(obj interface{}) (interface{}, error) { + if svc, ok := obj.(*corev1.Service); ok { + svc.Annotations = nil + svc.Finalizers = nil + svc.ManagedFields = nil + + svc.Spec.LoadBalancerSourceRanges = nil + svc.Spec.SessionAffinityConfig = nil + svc.Spec.IPFamilies = nil + svc.Spec.IPFamilyPolicy = nil + svc.Spec.InternalTrafficPolicy = nil + svc.Spec.InternalTrafficPolicy = nil + + svc.Status.Conditions = nil + } + return obj, nil +} diff --git a/plugins/processors/awsapplicationsignals/internal/resolver/servicewatcher_test.go b/plugins/processors/awsapplicationsignals/internal/resolver/servicewatcher_test.go new file mode 100644 index 0000000000..9e2bbdeeb0 --- /dev/null +++ b/plugins/processors/awsapplicationsignals/internal/resolver/servicewatcher_test.go @@ -0,0 +1,106 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT + +package resolver + +import ( + "sync" + "testing" + + mapset "github.com/deckarep/golang-set/v2" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func newServiceWatcherForTesting(ipToServiceAndNamespace, serviceAndNamespaceToSelectors *sync.Map) *serviceWatcher { + logger, _ := zap.NewDevelopment() + return &serviceWatcher{ipToServiceAndNamespace, serviceAndNamespaceToSelectors, logger, nil, nil} +} + +func TestOnAddOrUpdateService(t *testing.T) { + // Create a fake service + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "myservice", + Namespace: "mynamespace", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "1.2.3.4", + Selector: map[string]string{ + "app": "myapp", + }, + }, + } + + // Create the maps + ipToServiceAndNamespace := &sync.Map{} + serviceAndNamespaceToSelectors := &sync.Map{} + + // Call the function + svcWatcher := newServiceWatcherForTesting(ipToServiceAndNamespace, serviceAndNamespaceToSelectors) + svcWatcher.onAddOrUpdateService(service) + + // Check that the maps contain the expected entries + if _, ok := ipToServiceAndNamespace.Load("1.2.3.4"); !ok { + t.Errorf("ipToServiceAndNamespace does not contain the service IP") + } + if _, ok := serviceAndNamespaceToSelectors.Load("myservice@mynamespace"); !ok { + t.Errorf("serviceAndNamespaceToSelectors does not contain the service") + } +} + +func TestOnDeleteService(t *testing.T) { + // Create a fake service + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "myservice", + Namespace: "mynamespace", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "1.2.3.4", + Selector: map[string]string{ + "app": "myapp", + }, + }, + } + + // Create the maps and add the service to them + ipToServiceAndNamespace := &sync.Map{} + ipToServiceAndNamespace.Store("1.2.3.4", "myservice@mynamespace") + 
serviceAndNamespaceToSelectors := &sync.Map{} + serviceAndNamespaceToSelectors.Store("myservice@mynamespace", mapset.NewSet("app=myapp")) + + // Call the function + svcWatcher := newServiceWatcherForTesting(ipToServiceAndNamespace, serviceAndNamespaceToSelectors) + svcWatcher.onDeleteService(service, mockDeleter) + + // Check that the maps do not contain the service + if _, ok := ipToServiceAndNamespace.Load("1.2.3.4"); ok { + t.Errorf("ipToServiceAndNamespace still contains the service IP") + } + if _, ok := serviceAndNamespaceToSelectors.Load("myservice@mynamespace"); ok { + t.Errorf("serviceAndNamespaceToSelectors still contains the service") + } +} + +func TestFilterServiceIPFields(t *testing.T) { + meta := metav1.ObjectMeta{ + Name: "test", + Namespace: "default", + } + svc := &corev1.Service{ + ObjectMeta: meta, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{ + "name": "app", + }, + ClusterIP: "10.0.12.4", + }, + } + newSvc, err := minimizeService(svc) + assert.Nil(t, err) + assert.Equal(t, "10.0.12.4", newSvc.(*corev1.Service).Spec.ClusterIP) + assert.Equal(t, "app", newSvc.(*corev1.Service).Spec.Selector["name"]) +}
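
A brief illustration of the delayed-deletion helper introduced in kubernetes_utils.go: DeleteWithDelay schedules the removal of a key from a sync.Map on a background goroutine, presumably so that lookups racing with a pod or service deletion still resolve for a short grace period. The sketch below is a minimal, self-contained usage example; the IP address, pod name, and the 100 ms delay are invented for illustration.

package main

import (
    "fmt"
    "sync"
    "time"
)

// TimedDeleter mirrors the helper added in kubernetes_utils.go: DeleteWithDelay
// removes a key from a sync.Map only after Delay has elapsed, on a background
// goroutine, so the entry stays readable for a short grace period.
type TimedDeleter struct {
    Delay time.Duration
}

func (td *TimedDeleter) DeleteWithDelay(m *sync.Map, key interface{}) {
    go func() {
        time.Sleep(td.Delay)
        m.Delete(key)
    }()
}

func main() {
    // Hypothetical cache entry; the IP and pod name are made up.
    ipToPod := &sync.Map{}
    ipToPod.Store("10.0.0.4", "checkout-7d9f8c6b5d-x2k9q")

    deleter := &TimedDeleter{Delay: 100 * time.Millisecond}
    deleter.DeleteWithDelay(ipToPod, "10.0.0.4")

    // The mapping is still resolvable right after the delete is scheduled.
    if v, ok := ipToPod.Load("10.0.0.4"); ok {
        fmt.Println("still mapped to", v)
    }

    // Once the delay has passed, the entry is gone.
    time.Sleep(200 * time.Millisecond)
    if _, ok := ipToPod.Load("10.0.0.4"); !ok {
        fmt.Println("entry removed")
    }
}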
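
podwatcher.go keys its ipToPod cache differently depending on the pod's networking mode: ordinary pods are stored under their PodIP, while host-network pods get one "HostIP:hostPort" entry per declared host port. The following sketch approximates that keying rule; cacheKeys is a hypothetical helper (the patch does this inside handlePodAdd and getHostNetworkPorts), and the pod values are made up.

package main

import (
    "fmt"
    "strconv"

    corev1 "k8s.io/api/core/v1"
)

// cacheKeys returns the keys under which a pod would be stored in ipToPod:
// host-network pods get one "HostIP:hostPort" key per declared host port, and
// any pod with a PodIP also gets a plain PodIP key.
func cacheKeys(pod *corev1.Pod) []string {
    var keys []string
    if pod.Spec.HostNetwork && pod.Status.HostIP != "" {
        for _, container := range pod.Spec.Containers {
            for _, port := range container.Ports {
                if port.HostPort != 0 {
                    keys = append(keys, pod.Status.HostIP+":"+strconv.Itoa(int(port.HostPort)))
                }
            }
        }
    }
    if pod.Status.PodIP != "" {
        keys = append(keys, pod.Status.PodIP)
    }
    return keys
}

func main() {
    // Made-up host-network pod exposing one host port.
    pod := &corev1.Pod{
        Spec: corev1.PodSpec{
            HostNetwork: true,
            Containers: []corev1.Container{
                {Ports: []corev1.ContainerPort{{HostPort: 8080}}},
            },
        },
        Status: corev1.PodStatus{HostIP: "5.6.7.8", PodIP: "5.6.7.8"},
    }
    fmt.Println(cacheKeys(pod)) // [5.6.7.8:8080 5.6.7.8]
}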
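
servicetoworkload.go maps a service to a workload when the service's selector labels form a subset of the labels observed on the workload's pods in the same namespace, and it refuses to store a mapping when more than one workload matches. A minimal sketch of the subset check using the same mapset library; the service name and label values are invented.

package main

import (
    "fmt"

    mapset "github.com/deckarep/golang-set/v2"
)

func main() {
    // Selector labels recorded for a service (values are invented).
    serviceSelector := mapset.NewSet("app=vets-service", "tier=backend")

    // Labels observed on the pods of two candidate workloads.
    matching := mapset.NewSet("app=vets-service", "tier=backend", "pod-template-hash=5d68bc5f49")
    other := mapset.NewSet("app=visits-service", "tier=backend")

    // A workload is a candidate when every selector label appears on its pods.
    fmt.Println(serviceSelector.IsSubset(matching)) // true
    fmt.Println(serviceSelector.IsSubset(other))    // false
}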