From a65d7ef19bf1e49082d8a62a9e6919433294debd Mon Sep 17 00:00:00 2001 From: Andy Librian Date: Sun, 31 Mar 2024 08:45:32 +0700 Subject: [PATCH] WIP attempt to consume all tarian-detector events --- go.mod | 2 +- go.sum | 4 ++-- pkg/nodeagent/nodeagent.go | 25 +++++++++++++------------ pkg/server/ingestion_worker.go | 26 +++++++++++++++++++++++++- 4 files changed, 41 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index 08c202a..0e79cd0 100644 --- a/go.mod +++ b/go.mod @@ -119,4 +119,4 @@ require ( sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect ) -replace github.com/intelops/tarian-detector => github.com/andylibrian/tarian-detector v0.0.0-20240329063104-78cf2b8cebcd +replace github.com/intelops/tarian-detector => github.com/andylibrian/tarian-detector v0.0.0-20240328182858-117380ab8670 diff --git a/go.sum b/go.sum index 1913de0..8ecc19a 100644 --- a/go.sum +++ b/go.sum @@ -41,8 +41,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= -github.com/andylibrian/tarian-detector v0.0.0-20240329063104-78cf2b8cebcd h1:Cz1gdyL11hflWpjAdu8s9yB2bOK0zKu/t11HUIePuXo= -github.com/andylibrian/tarian-detector v0.0.0-20240329063104-78cf2b8cebcd/go.mod h1:dXcRWq8AHABseHsjcnM8iJqwXCGX+dGGOR8kiXw1acY= +github.com/andylibrian/tarian-detector v0.0.0-20240328182858-117380ab8670 h1:NreLcDoqG8JM3X79HHRfl355wepviVLKtq6XWe0bMGs= +github.com/andylibrian/tarian-detector v0.0.0-20240328182858-117380ab8670/go.mod h1:dXcRWq8AHABseHsjcnM8iJqwXCGX+dGGOR8kiXw1acY= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= diff --git a/pkg/nodeagent/nodeagent.go b/pkg/nodeagent/nodeagent.go index 1807925..68eec03 100644 --- a/pkg/nodeagent/nodeagent.go +++ b/pkg/nodeagent/nodeagent.go @@ -2,7 +2,6 @@ package nodeagent import ( "context" - "encoding/json" "fmt" "path/filepath" "regexp" @@ -220,6 +219,11 @@ func (n *NodeAgent) loopTarianDetectorReadEvents(ctx context.Context) error { continue } + detectionDataType := event["eventId"].(string) + if detectionDataType != "sys_execve_entry" { + continue + } + pid := event["hostProcessId"].(uint32) // Retrieve the container ID. @@ -238,10 +242,7 @@ func (n *NodeAgent) loopTarianDetectorReadEvents(ctx context.Context) error { continue } - // TODO: sys_execve_entry could be added here - // But for kubectl exec, the detected entry comm is still the wrapper: runc:init - // With sys_execve_exit, the comm is the target process - detectionDataType := event["eventId"].(string) + // detectionDataType := event["eventId"].(string) if detectionDataType == "sys_execve_entry" { execEvent, err2 := n.execEventFromTarianDetector(event, containerID, pod) if err2 != nil { @@ -255,16 +256,16 @@ func (n *NodeAgent) loopTarianDetectorReadEvents(ctx context.Context) error { } } - n.logger.WithField("execEvent", execEvent).WithField("event", event).Info("DEBUG") + n.logger.WithField("execEvent", execEvent).WithField("event", event).Info("=============== DEBUG") } - byteData, err := json.Marshal(event) - if err != nil { - n.logger.Error("tarian-detector: error while marshaling event", "err", err) - continue - } + // byteData, err := json.Marshal(event) + // if err != nil { + // n.logger.Error("tarian-detector: error while marshaling event", "err", err) + // continue + // } - n.SendDetectionEventToClusterAgent(detectionDataType, string(byteData)) + // n.SendDetectionEventToClusterAgent(detectionDataType, string(byteData)) } } } diff --git a/pkg/server/ingestion_worker.go b/pkg/server/ingestion_worker.go index a81359d..c915262 100644 --- a/pkg/server/ingestion_worker.go +++ b/pkg/server/ingestion_worker.go @@ -43,12 +43,13 @@ func NewIngestionWorker(logger *logrus.Logger, eventStore store.EventStore, queu // - If there are errors during processing, they are logged. func (iw *IngestionWorker) Start() { go iw.loopConsumeQueue(iw.IngestionQueue) - go iw.loopConsumeQueue(iw.ingestionQueueForEventDetection) + go iw.loopConsumeQueueEventDetection(iw.ingestionQueueForEventDetection) } func (iw *IngestionWorker) loopConsumeQueue(queue protoqueue.QueueSubscriber) { for { msg, err := queue.NextMessage(&tarianpb.Event{}) + iw.logger.WithField("event", msg).Infof("loopConsumeQueue: got message") if err != nil { iw.logger.WithError(err).Error("error while processing event") continue @@ -68,3 +69,26 @@ func (iw *IngestionWorker) loopConsumeQueue(queue protoqueue.QueueSubscriber) { } } } + +func (iw *IngestionWorker) loopConsumeQueueEventDetection(queue protoqueue.QueueSubscriber) { + for { + msg, err := queue.NextMessage(&tarianpb.Event{}) + if err != nil { + // iw.logger.WithError(err).Error("error while processing event") + continue + } + + event, ok := msg.(*tarianpb.Event) + if !ok { + // iw.logger.WithError(err).Error("error while processing event") + continue + } + + event.ServerTimestamp = timestamppb.Now() + err = iw.eventStore.Add(event) + + if err != nil { + // iw.logger.WithError(err).Error("error while processing event") + } + } +}