Skip to content

Commit

Permalink
WIP attempt to consume all tarian-detector events
Browse files Browse the repository at this point in the history
  • Loading branch information
andylibrian committed Mar 31, 2024
1 parent b60ae12 commit a65d7ef
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 16 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,4 @@ require (
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
)

replace github.com/intelops/tarian-detector => github.com/andylibrian/tarian-detector v0.0.0-20240329063104-78cf2b8cebcd
replace github.com/intelops/tarian-detector => github.com/andylibrian/tarian-detector v0.0.0-20240328182858-117380ab8670
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
github.com/andylibrian/tarian-detector v0.0.0-20240329063104-78cf2b8cebcd h1:Cz1gdyL11hflWpjAdu8s9yB2bOK0zKu/t11HUIePuXo=
github.com/andylibrian/tarian-detector v0.0.0-20240329063104-78cf2b8cebcd/go.mod h1:dXcRWq8AHABseHsjcnM8iJqwXCGX+dGGOR8kiXw1acY=
github.com/andylibrian/tarian-detector v0.0.0-20240328182858-117380ab8670 h1:NreLcDoqG8JM3X79HHRfl355wepviVLKtq6XWe0bMGs=
github.com/andylibrian/tarian-detector v0.0.0-20240328182858-117380ab8670/go.mod h1:dXcRWq8AHABseHsjcnM8iJqwXCGX+dGGOR8kiXw1acY=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
Expand Down
25 changes: 13 additions & 12 deletions pkg/nodeagent/nodeagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package nodeagent

import (
"context"
"encoding/json"
"fmt"
"path/filepath"
"regexp"
Expand Down Expand Up @@ -220,6 +219,11 @@ func (n *NodeAgent) loopTarianDetectorReadEvents(ctx context.Context) error {
continue
}

detectionDataType := event["eventId"].(string)
if detectionDataType != "sys_execve_entry" {
continue
}

pid := event["hostProcessId"].(uint32)

// Retrieve the container ID.
Expand All @@ -238,10 +242,7 @@ func (n *NodeAgent) loopTarianDetectorReadEvents(ctx context.Context) error {
continue
}

// TODO: sys_execve_entry could be added here
// But for kubectl exec, the detected entry comm is still the wrapper: runc:init
// With sys_execve_exit, the comm is the target process
detectionDataType := event["eventId"].(string)
// detectionDataType := event["eventId"].(string)
if detectionDataType == "sys_execve_entry" {
execEvent, err2 := n.execEventFromTarianDetector(event, containerID, pod)
if err2 != nil {
Expand All @@ -255,16 +256,16 @@ func (n *NodeAgent) loopTarianDetectorReadEvents(ctx context.Context) error {
}
}

n.logger.WithField("execEvent", execEvent).WithField("event", event).Info("DEBUG")
n.logger.WithField("execEvent", execEvent).WithField("event", event).Info("=============== DEBUG")
}

byteData, err := json.Marshal(event)
if err != nil {
n.logger.Error("tarian-detector: error while marshaling event", "err", err)
continue
}
// byteData, err := json.Marshal(event)
// if err != nil {
// n.logger.Error("tarian-detector: error while marshaling event", "err", err)
// continue
// }

n.SendDetectionEventToClusterAgent(detectionDataType, string(byteData))
// n.SendDetectionEventToClusterAgent(detectionDataType, string(byteData))
}
}
}
Expand Down
26 changes: 25 additions & 1 deletion pkg/server/ingestion_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,13 @@ func NewIngestionWorker(logger *logrus.Logger, eventStore store.EventStore, queu
// - If there are errors during processing, they are logged.
func (iw *IngestionWorker) Start() {
go iw.loopConsumeQueue(iw.IngestionQueue)
go iw.loopConsumeQueue(iw.ingestionQueueForEventDetection)
go iw.loopConsumeQueueEventDetection(iw.ingestionQueueForEventDetection)
}

func (iw *IngestionWorker) loopConsumeQueue(queue protoqueue.QueueSubscriber) {
for {
msg, err := queue.NextMessage(&tarianpb.Event{})
iw.logger.WithField("event", msg).Infof("loopConsumeQueue: got message")
if err != nil {
iw.logger.WithError(err).Error("error while processing event")
continue
Expand All @@ -68,3 +69,26 @@ func (iw *IngestionWorker) loopConsumeQueue(queue protoqueue.QueueSubscriber) {
}
}
}

func (iw *IngestionWorker) loopConsumeQueueEventDetection(queue protoqueue.QueueSubscriber) {
for {
msg, err := queue.NextMessage(&tarianpb.Event{})
if err != nil {
// iw.logger.WithError(err).Error("error while processing event")
continue
}

event, ok := msg.(*tarianpb.Event)
if !ok {
// iw.logger.WithError(err).Error("error while processing event")
continue
}

event.ServerTimestamp = timestamppb.Now()
err = iw.eventStore.Add(event)

if err != nil {
// iw.logger.WithError(err).Error("error while processing event")
}
}
}

0 comments on commit a65d7ef

Please sign in to comment.