diff --git a/aggregator/data.go b/aggregator/data.go index 415cef8..cf93f47 100644 --- a/aggregator/data.go +++ b/aggregator/data.go @@ -126,16 +126,8 @@ func (a *Aggregator) processk8s() { a.processPod(d) case k8s.SERVICE: a.processSvc(d) - case k8s.REPLICASET: - a.processReplicaSet(d) - case k8s.DEPLOYMENT: - a.processDeployment(d) - case k8s.ENDPOINTS: - a.processEndpoints(d) case k8s.CONTAINER: a.processContainer(d) - case k8s.DAEMONSET: - a.processDaemonSet(d) default: log.Logger.Warn().Msgf("unknown resource type %s", d.ResourceType) } diff --git a/aggregator/persist.go b/aggregator/persist.go index 62e9b88..d61ec37 100644 --- a/aggregator/persist.go +++ b/aggregator/persist.go @@ -5,7 +5,6 @@ import ( "github.com/ddosify/alaz/k8s" "github.com/ddosify/alaz/log" - appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" ) @@ -125,81 +124,6 @@ func (a *Aggregator) processSvc(d k8s.K8sResourceMessage) { } } -func (a *Aggregator) persistReplicaSet(dto datastore.ReplicaSet, eventType string) { - err := a.ds.PersistReplicaSet(dto, eventType) - if err != nil { - log.Logger.Error().Err(err).Msgf("error on persistReplicaset call to %s", eventType) - } -} - -func (a *Aggregator) processReplicaSet(d k8s.K8sResourceMessage) { - replicaSet := d.Object.(*appsv1.ReplicaSet) - - var ownerType, ownerID, ownerName string - if len(replicaSet.OwnerReferences) > 0 { - ownerType = replicaSet.OwnerReferences[0].Kind - ownerID = string(replicaSet.OwnerReferences[0].UID) - ownerName = replicaSet.OwnerReferences[0].Name - } else { - log.Logger.Debug().Msgf("ReplicaSet %s/%s has no owner, event: %s", replicaSet.Namespace, replicaSet.Name, d.EventType) - } - - dtoReplicaSet := datastore.ReplicaSet{ - UID: string(replicaSet.UID), - Name: ownerName, - Namespace: replicaSet.Namespace, - OwnerType: ownerType, - OwnerID: ownerID, - OwnerName: ownerName, - Replicas: replicaSet.Status.Replicas, - } - - switch d.EventType { - case k8s.ADD: - go a.persistReplicaSet(dtoReplicaSet, ADD) - case k8s.UPDATE: - go a.persistReplicaSet(dtoReplicaSet, UPDATE) - case k8s.DELETE: - go a.persistReplicaSet(dtoReplicaSet, DELETE) - } - -} - -func (a *Aggregator) processDeployment(d k8s.K8sResourceMessage) { - deployment := d.Object.(*appsv1.Deployment) - - dto := datastore.Deployment{ - UID: string(deployment.UID), - Name: deployment.Name, - Namespace: deployment.Namespace, - Replicas: deployment.Status.Replicas, - } - - switch d.EventType { - case k8s.ADD: - go func() { - err := a.ds.PersistDeployment(dto, ADD) - if err != nil { - log.Logger.Error().Err(err).Msgf("error on PersistDeployment call to %s, uid: %s", ADD, dto.UID) - } - }() - case k8s.UPDATE: - go func() { - err := a.ds.PersistDeployment(dto, UPDATE) - if err != nil { - log.Logger.Error().Err(err).Msgf("error on PersistDeployment call to %s, uid: %s", UPDATE, dto.UID) - } - }() - case k8s.DELETE: - go func() { - err := a.ds.PersistDeployment(dto, DELETE) - if err != nil { - log.Logger.Error().Err(err).Msgf("error on PersistDeployment call to %s, uid: %s", DELETE, dto.UID) - } - }() - } -} - func (a *Aggregator) processContainer(d k8s.K8sResourceMessage) { c := d.Object.(*k8s.Container) @@ -229,99 +153,3 @@ func (a *Aggregator) processContainer(d k8s.K8sResourceMessage) { // No need for delete container } } - -func (a *Aggregator) processEndpoints(ep k8s.K8sResourceMessage) { - endpoints := ep.Object.(*corev1.Endpoints) - - // subsets - adrs := []datastore.Address{} - - // subset[0].address -> ips - // subset[0].ports -> ports - - for _, subset := range endpoints.Subsets { - ips := []datastore.AddressIP{} - ports := []datastore.AddressPort{} - - for _, addr := range subset.Addresses { - // Probably external IP - if addr.TargetRef == nil { - ips = append(ips, datastore.AddressIP{ - IP: addr.IP, - }) - continue - } - - // TargetRef: Pod probably - ips = append(ips, datastore.AddressIP{ - Type: string(addr.TargetRef.Kind), - ID: string(addr.TargetRef.UID), - Name: addr.TargetRef.Name, - Namespace: addr.TargetRef.Namespace, - IP: addr.IP, - }) - } - - for _, port := range subset.Ports { - ports = append(ports, datastore.AddressPort{ - Port: port.Port, - Protocol: string(port.Protocol), - }) - } - - adrs = append(adrs, datastore.Address{ - IPs: ips, - Ports: ports, - }) - } - - dto := datastore.Endpoints{ - UID: string(endpoints.UID), - Name: endpoints.Name, - Namespace: endpoints.Namespace, - Addresses: adrs, - } - - switch ep.EventType { - case k8s.ADD: - go func() { - err := a.ds.PersistEndpoints(dto, ADD) - if err != nil { - log.Logger.Error().Err(err).Msgf("error on PersistEndpoints call to %s, uid: %s", ADD, dto.UID) - } - }() - case k8s.UPDATE: - go func() { - err := a.ds.PersistEndpoints(dto, UPDATE) - if err != nil { - log.Logger.Error().Err(err).Msgf("error on PersistEndpoints call to %s, uid: %s", UPDATE, dto.UID) - } - }() - case k8s.DELETE: - go func() { - err := a.ds.PersistEndpoints(dto, DELETE) - if err != nil { - log.Logger.Error().Err(err).Msgf("error on PersistEndpoints call to %s, uid: %s", DELETE, dto.UID) - } - }() - } -} - -func (a *Aggregator) processDaemonSet(d k8s.K8sResourceMessage) { - daemonSet := d.Object.(*appsv1.DaemonSet) - - dtoDaemonSet := datastore.DaemonSet{ - UID: string(daemonSet.UID), - Name: daemonSet.Name, - Namespace: daemonSet.Namespace, - } - - switch d.EventType { - case k8s.ADD: - go a.ds.PersistDaemonSet(dtoDaemonSet, ADD) - case k8s.UPDATE: - go a.ds.PersistDaemonSet(dtoDaemonSet, UPDATE) - case k8s.DELETE: - go a.ds.PersistDaemonSet(dtoDaemonSet, DELETE) - } -} diff --git a/ebpf/throughput/main.go b/ebpf/throughput/main.go index 6383a43..793eeb4 100644 --- a/ebpf/throughput/main.go +++ b/ebpf/throughput/main.go @@ -90,16 +90,14 @@ func DeployAndWait(ctx context.Context, ch chan interface{}, eventChan <-chan in bpfEvent := (*ThroughputEventBpf)(unsafe.Pointer(&record.RawSample[0])) - go func() { - ch <- ThroughputEvent{ - Timestamp: bpfEvent.Timestamp, - Size: bpfEvent.Size, - SPort: bpfEvent.SPort, - DPort: bpfEvent.DPort, - SAddr: fmt.Sprintf("%d.%d.%d.%d", bpfEvent.SAddr[0], bpfEvent.SAddr[1], bpfEvent.SAddr[2], bpfEvent.SAddr[3]), - DAddr: fmt.Sprintf("%d.%d.%d.%d", bpfEvent.DAddr[0], bpfEvent.DAddr[1], bpfEvent.DAddr[2], bpfEvent.DAddr[3]), - } - }() + ch <- ThroughputEvent{ + Timestamp: bpfEvent.Timestamp, + Size: bpfEvent.Size, + SPort: bpfEvent.SPort, + DPort: bpfEvent.DPort, + SAddr: fmt.Sprintf("%d.%d.%d.%d", bpfEvent.SAddr[0], bpfEvent.SAddr[1], bpfEvent.SAddr[2], bpfEvent.SAddr[3]), + DAddr: fmt.Sprintf("%d.%d.%d.%d", bpfEvent.DAddr[0], bpfEvent.DAddr[1], bpfEvent.DAddr[2], bpfEvent.DAddr[3]), + } } for { diff --git a/k8s/daemonset.go b/k8s/daemonset.go deleted file mode 100644 index cd0a5b1..0000000 --- a/k8s/daemonset.go +++ /dev/null @@ -1,31 +0,0 @@ -package k8s - -func getOnAddDaemonSetFunc(ch chan interface{}) func(interface{}) { - return func(obj interface{}) { - ch <- K8sResourceMessage{ - ResourceType: DAEMONSET, - EventType: ADD, - Object: obj, - } - } -} - -func getOnUpdateDaemonSetFunc(ch chan interface{}) func(interface{}, interface{}) { - return func(oldObj, newObj interface{}) { - ch <- K8sResourceMessage{ - ResourceType: DAEMONSET, - EventType: UPDATE, - Object: newObj, - } - } -} - -func getOnDeleteDaemonSetFunc(ch chan interface{}) func(interface{}) { - return func(obj interface{}) { - ch <- K8sResourceMessage{ - ResourceType: DAEMONSET, - EventType: DELETE, - Object: obj, - } - } -} diff --git a/k8s/deployment.go b/k8s/deployment.go deleted file mode 100644 index 51dac1c..0000000 --- a/k8s/deployment.go +++ /dev/null @@ -1,31 +0,0 @@ -package k8s - -func getOnAddDeploymentSetFunc(ch chan interface{}) func(interface{}) { - return func(obj interface{}) { - ch <- K8sResourceMessage{ - ResourceType: DEPLOYMENT, - EventType: ADD, - Object: obj, - } - } -} - -func getOnUpdateDeploymentSetFunc(ch chan interface{}) func(interface{}, interface{}) { - return func(oldObj, newObj interface{}) { - ch <- K8sResourceMessage{ - ResourceType: DEPLOYMENT, - EventType: UPDATE, - Object: newObj, - } - } -} - -func getOnDeleteDeploymentSetFunc(ch chan interface{}) func(interface{}) { - return func(obj interface{}) { - ch <- K8sResourceMessage{ - ResourceType: DEPLOYMENT, - EventType: DELETE, - Object: obj, - } - } -} diff --git a/k8s/endpoints.go b/k8s/endpoints.go deleted file mode 100644 index b5cd84e..0000000 --- a/k8s/endpoints.go +++ /dev/null @@ -1,31 +0,0 @@ -package k8s - -func getOnAddEndpointsSetFunc(ch chan interface{}) func(interface{}) { - return func(obj interface{}) { - ch <- K8sResourceMessage{ - ResourceType: ENDPOINTS, - EventType: ADD, - Object: obj, - } - } -} - -func getOnUpdateEndpointsSetFunc(ch chan interface{}) func(interface{}, interface{}) { - return func(oldObj, newObj interface{}) { - ch <- K8sResourceMessage{ - ResourceType: ENDPOINTS, - EventType: UPDATE, - Object: newObj, - } - } -} - -func getOnDeleteEndpointsSetFunc(ch chan interface{}) func(interface{}) { - return func(obj interface{}) { - ch <- K8sResourceMessage{ - ResourceType: ENDPOINTS, - EventType: DELETE, - Object: obj, - } - } -} diff --git a/k8s/informer.go b/k8s/informer.go index 7e5a949..0edaf32 100644 --- a/k8s/informer.go +++ b/k8s/informer.go @@ -13,7 +13,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/informers" - appsv1 "k8s.io/client-go/informers/apps/v1" v1 "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" @@ -26,13 +25,9 @@ import ( type K8SResourceType string const ( - SERVICE = "Service" - POD = "Pod" - REPLICASET = "ReplicaSet" - DEPLOYMENT = "Deployment" - ENDPOINTS = "Endpoints" - CONTAINER = "Container" - DAEMONSET = "DaemonSet" + SERVICE = "Service" + POD = "Pod" + CONTAINER = "Container" ) const ( @@ -48,12 +43,8 @@ type K8sCollector struct { stopper chan struct{} // stop signal for the informers doneChan chan struct{} // done signal for k8sCollector // watchers - podInformer v1.PodInformer - serviceInformer v1.ServiceInformer - replicasetInformer appsv1.ReplicaSetInformer - deploymentInformer appsv1.DeploymentInformer - endpointsInformer v1.EndpointsInformer - daemonsetInformer appsv1.DaemonSetInformer + podInformer v1.PodInformer + serviceInformer v1.ServiceInformer Events chan interface{} } @@ -70,22 +61,6 @@ func (k *K8sCollector) Init(events chan interface{}) error { k.serviceInformer = k.informersFactory.Core().V1().Services() k.watchers[SERVICE] = k.informersFactory.Core().V1().Services().Informer() - // ReplicaSet - k.replicasetInformer = k.informersFactory.Apps().V1().ReplicaSets() - k.watchers[REPLICASET] = k.replicasetInformer.Informer() - - // Deployment - k.deploymentInformer = k.informersFactory.Apps().V1().Deployments() - k.watchers[DEPLOYMENT] = k.deploymentInformer.Informer() - - // Endpoints - k.endpointsInformer = k.informersFactory.Core().V1().Endpoints() - k.watchers[ENDPOINTS] = k.endpointsInformer.Informer() - - // DaemonSet - k.daemonsetInformer = k.informersFactory.Apps().V1().DaemonSets() - k.watchers[DAEMONSET] = k.daemonsetInformer.Informer() - defer runtime.HandleCrash() // Add event handlers @@ -107,30 +82,6 @@ func (k *K8sCollector) Init(events chan interface{}) error { DeleteFunc: getOnDeleteServiceFunc(k.Events), }) - k.watchers[REPLICASET].AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: getOnAddReplicaSetFunc(k.Events), - UpdateFunc: getOnUpdateReplicaSetFunc(k.Events), - DeleteFunc: getOnDeleteReplicaSetFunc(k.Events), - }) - - k.watchers[DEPLOYMENT].AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: getOnAddDeploymentSetFunc(k.Events), - UpdateFunc: getOnUpdateDeploymentSetFunc(k.Events), - DeleteFunc: getOnDeleteDeploymentSetFunc(k.Events), - }) - - k.watchers[ENDPOINTS].AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: getOnAddEndpointsSetFunc(k.Events), - UpdateFunc: getOnUpdateEndpointsSetFunc(k.Events), - DeleteFunc: getOnDeleteEndpointsSetFunc(k.Events), - }) - - k.watchers[DAEMONSET].AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: getOnAddDaemonSetFunc(k.Events), - UpdateFunc: getOnUpdateDaemonSetFunc(k.Events), - DeleteFunc: getOnDeleteDaemonSetFunc(k.Events), - }) - wg := sync.WaitGroup{} wg.Add(len(k.watchers)) for _, watcher := range k.watchers { diff --git a/k8s/replicaset.go b/k8s/replicaset.go deleted file mode 100644 index d14ea9c..0000000 --- a/k8s/replicaset.go +++ /dev/null @@ -1,31 +0,0 @@ -package k8s - -func getOnAddReplicaSetFunc(ch chan interface{}) func(interface{}) { - return func(obj interface{}) { - ch <- K8sResourceMessage{ - ResourceType: REPLICASET, - EventType: ADD, - Object: obj, - } - } -} - -func getOnUpdateReplicaSetFunc(ch chan interface{}) func(interface{}, interface{}) { - return func(oldObj, newObj interface{}) { - ch <- K8sResourceMessage{ - ResourceType: REPLICASET, - EventType: UPDATE, - Object: newObj, - } - } -} - -func getOnDeleteReplicaSetFunc(ch chan interface{}) func(interface{}) { - return func(obj interface{}) { - ch <- K8sResourceMessage{ - ResourceType: REPLICASET, - EventType: DELETE, - Object: obj, - } - } -}