Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
kenanfarukcakir committed Aug 21, 2023
1 parent 964cf4e commit b48838e
Show file tree
Hide file tree
Showing 16 changed files with 12 additions and 253 deletions.
25 changes: 2 additions & 23 deletions aggregator/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package aggregator

// aggregate data from different sources
// 1. k8s
// 2. containerd
// 2. containerd (TODO)
// 3. ebpf
// 4. cgroup (TODO)
// 5. docker (TODO)
Expand Down Expand Up @@ -32,7 +32,6 @@ import (
type Aggregator struct {
// listen to events from different sources
k8sChan <-chan interface{}
crChan <-chan interface{}
ebpfChan <-chan interface{}

// store the service map
Expand Down Expand Up @@ -64,8 +63,7 @@ type SocketMap struct {
}

type ClusterInfo struct {
mu sync.RWMutex
// TODO: If pod has more than one container, we need to differentiate
mu sync.RWMutex
PodIPToPodUid map[string]types.UID `json:"podIPToPodUid"`
ServiceIPToServiceUid map[string]types.UID `json:"serviceIPToServiceUid"`

Expand Down Expand Up @@ -144,7 +142,6 @@ func NewAggregator(k8sChan <-chan interface{}, crChan <-chan interface{}, ebpfCh

return &Aggregator{
k8sChan: k8sChan,
crChan: crChan,
ebpfChan: ebpfChan,
clusterInfo: clusterInfo,
ds: ds,
Expand All @@ -154,7 +151,6 @@ func NewAggregator(k8sChan <-chan interface{}, crChan <-chan interface{}, ebpfCh

func (a *Aggregator) Run() {
go a.processk8s()
go a.processCR()
go a.processEbpf()
}

Expand Down Expand Up @@ -213,13 +209,6 @@ func (a *Aggregator) processk8s() {
}
}

func (a *Aggregator) processCR() {
for data := range a.crChan {
// TODO
_ = data
}
}

func (a *Aggregator) processEbpf() {
for data := range a.ebpfChan {
bpfEvent, ok := data.(ebpf.BpfEvent)
Expand Down Expand Up @@ -574,16 +563,6 @@ func (a *Aggregator) processL7(d l7_req.L7Event) {
reqDto.FromType, reqDto.ToType = reqDto.ToType, reqDto.FromType
}

// TODO: remove this, for debugging
if d.Protocol == l7_req.L7_PROTOCOL_HTTP && d.Status == 0 {
log.Logger.Warn().Str("payload", string(d.Payload[0:d.PayloadSize])).
Str("fromIP", reqDto.FromIP).Uint16("fromPort", reqDto.FromPort).
Str("toIP", reqDto.ToIP).Uint16("toPort", reqDto.ToPort).
Str("protocol", reqDto.Protocol).Str("method", reqDto.Method).
Uint32("pid", d.Pid).Uint64("fd", d.Fd).
Str("path", reqDto.Path).Msg("http call with status 0 aggregator")
}

go func() {
err := a.ds.PersistRequest(reqDto)
if err != nil {
Expand Down
7 changes: 3 additions & 4 deletions aggregator/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (a *Aggregator) processDeployment(d k8s.K8sResourceMessage) {
go func() {
err := a.ds.PersistDeployment(dto, DELETE)
if err != nil {
log.Logger.Error().Err(err).Msgf("error on PersistDeployment call to %s, uid: %s", UPDATE, dto.UID)
log.Logger.Error().Err(err).Msgf("error on PersistDeployment call to %s, uid: %s", DELETE, dto.UID)
}
}()
}
Expand All @@ -201,7 +201,6 @@ func (a *Aggregator) processContainer(d k8s.K8sResourceMessage) {
c := d.Object.(*k8s.Container)

dto := datastore.Container{
UID: c.UID,
Name: c.Name,
Namespace: c.Namespace,
PodUID: c.PodUID,
Expand All @@ -214,14 +213,14 @@ func (a *Aggregator) processContainer(d k8s.K8sResourceMessage) {
go func() {
err := a.ds.PersistContainer(dto, ADD)
if err != nil {
log.Logger.Error().Err(err).Msgf("error on PersistContainer call to %s, uid: %s", UPDATE, dto.UID)
log.Logger.Error().Err(err).Msgf("error on PersistContainer call to %s", ADD)
}
}()
case k8s.UPDATE:
go func() {
err := a.ds.PersistContainer(dto, UPDATE)
if err != nil {
log.Logger.Error().Err(err).Msgf("error on PersistContainer call to %s, uid: %s", UPDATE, dto.UID)
log.Logger.Error().Err(err).Msgf("error on PersistContainer call to %s", UPDATE)
}
}()
// No need for delete container
Expand Down
2 changes: 1 addition & 1 deletion aggregator/sock_line_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func TestSocketLine(t *testing.T) {
}

func TestXxx(t *testing.T) {
assumedInterval := uint64(2 * time.Second) // TODO: make configurable
assumedInterval := uint64(2 * time.Second)
nl := NewSocketLine(1, 0)

wg := sync.WaitGroup{}
Expand Down
7 changes: 1 addition & 6 deletions aggregator/sock_num_line.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,6 @@ func (nl *SocketLine) GetValue(timestamp uint64) (*SockInfo, error) {

// Return the value associated with the closest previous timestamp

// TODO: lru cache
// if no new values are added, return from cache
// A client that uses same socket for a long time will have a lot of requests
// no need to search for the same value again and again

nl.Values[index-1].LastMatch = uint64(time.Now().UnixNano())
return nl.Values[index-1].SockInfo, nil
}
Expand Down Expand Up @@ -121,7 +116,7 @@ func (nl *SocketLine) DeleteUnused() {
}

// assumedInterval is inversely proportional to the number of requests being discarded
assumedInterval := uint64(5 * time.Minute) // TODO: make configurable
assumedInterval := uint64(5 * time.Minute)

// delete all values that
// closed and its LastMatch + assumedInterval < lastMatchedReqTime
Expand Down
127 changes: 0 additions & 127 deletions cruntimes/containerd.go

This file was deleted.

26 changes: 0 additions & 26 deletions cruntimes/runtime.go

This file was deleted.

5 changes: 2 additions & 3 deletions datastore/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type BackendDS struct {
containerEventChan chan interface{} // *ContainerEvent
dsEventChan chan interface{} // *DaemonSetEvent

// TODO:
// TODO add:
// statefulset
// job
// cronjob
Expand Down Expand Up @@ -469,8 +469,7 @@ func newHandler(logger nodeExportLogger) *nodeExporterHandler {
}

if innerHandler, err := h.innerHandler(); err != nil {
// TODO: remove panic
panic(fmt.Sprintf("Couldn't create metrics handler: %s", err))
log.Logger.Error().Msgf("Couldn't create metrics handler: %s", err)
} else {
h.inner = innerHandler
}
Expand Down
1 change: 0 additions & 1 deletion datastore/dto.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ type Address struct {
}

type Container struct {
UID string `json:"uid"` // TODO: remove
Name string `json:"name"`
Namespace string `json:"namespace"`
PodUID string `json:"pod"` // Pod UID
Expand Down
1 change: 0 additions & 1 deletion datastore/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ func convertEpToEpEvent(ep Endpoints, eventType string) EpEvent {

func convertContainerToContainerEvent(c Container, eventType string) ContainerEvent {
return ContainerEvent{
UID: c.UID,
EventType: eventType,
Name: c.Name,
Namespace: c.Namespace,
Expand Down
Binary file modified ebpf/l7_req/bpf_bpfeb.o
Binary file not shown.
Binary file modified ebpf/l7_req/bpf_bpfel.o
Binary file not shown.
6 changes: 0 additions & 6 deletions ebpf/l7_req/l7.c
Original file line number Diff line number Diff line change
Expand Up @@ -445,12 +445,6 @@ int process_exit_of_syscalls_read_recvfrom(void* ctx, __s64 ret) {
bpf_map_delete_elem(&active_reads, &id);
bpf_map_delete_elem(&active_l7_requests, &k);

// TODO: remove this, only for debugging
if (e->protocol == PROTOCOL_HTTP && e->status == 0) {
char msgCtx[] = "http call with status code 0, fd: %ld, pid: %ld";
bpf_trace_printk(msgCtx, sizeof(msgCtx), e->fd,e->pid);
}

long r = bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
if (r < 0) {
char msg[] = "could not write to l7_events - %ld";
Expand Down
8 changes: 0 additions & 8 deletions ebpf/l7_req/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,14 +346,6 @@ func DeployAndWait(parentCtx context.Context, ch chan interface{}) {
method = "Unknown"
}

// TODO: remove this, for debugging
if protocol == L7_PROTOCOL_HTTP && l7Event.Status == 0 {
log.Logger.Warn().Str("protocol", protocol).Str("method", method).
Str("payload", string(l7Event.Payload[:l7Event.PayloadSize])).
Uint32("pid", l7Event.Pid).
Msg("http call with status 0 at ebpf map read")
}

ch <- L7Event{
Fd: l7Event.Fd,
Pid: l7Event.Pid,
Expand Down
6 changes: 3 additions & 3 deletions ebpf/tcp_state/tcp_sockets.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include "../../headers/bpf.h"
#include "../../headers/common.h"
#include "../../headers/tcp.h"
#include "../headers/bpf.h"
#include "../headers/common.h"
#include "../headers/tcp.h"

#include <bpf/bpf_core_read.h>
#include <bpf/bpf_helpers.h>
Expand Down
6 changes: 0 additions & 6 deletions k8s/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package k8s

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/uuid"
)

type Container struct {
UID string `json:"uid"` // TODO: remove
Name string `json:"name"`
Namespace string `json:"namespace"`
PodUID string `json:"pod"` // Pod UID
Expand Down Expand Up @@ -37,7 +35,6 @@ func getContainers(pod *corev1.Pod) []*Container {
}

containers = append(containers, &Container{
UID: string(uuid.NewUUID()), // TODO: container has no UID, remove?
Name: container.Name,
Namespace: pod.Namespace,
PodUID: string(pod.UID),
Expand Down Expand Up @@ -73,9 +70,6 @@ func getOnUpdatePodFunc(ch chan interface{}) func(interface{}, interface{}) {
return func(oldObj, newObj interface{}) {
pod := newObj.(*corev1.Pod)

// TODO: detect changes in containers ?? , we have no uid
// backend must all delete and add again in case of update

containers := getContainers(pod)
ch <- K8sResourceMessage{
ResourceType: POD,
Expand Down
Loading

0 comments on commit b48838e

Please sign in to comment.