Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[usm] add regular and raw tracepoints /sched_process_exit #33943

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
12 changes: 12 additions & 0 deletions pkg/collector/corechecks/ebpf/probe/ebpfcheck/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -965,3 +965,15 @@ func hashMapNumberOfEntriesWithHelper(mp *ebpf.Map, mapid ebpf.MapID, mphCache *

return int64(res), nil
}

// HashMapNumberOfEntries returns the number of entries in the map
func HashMapNumberOfEntries(mp *ebpf.Map) (int64, error) {
if isPerCPU(mp.Type()) {
return -1, fmt.Errorf("unsupported map type: %s", mp.String())
}
buffers := entryCountBuffers{
keysBufferSizeLimit: 0, // No limit
valuesBufferSizeLimit: 0, // No limit
}
return hashMapNumberOfEntriesWithIteration(mp, &buffers, 1)
}
22 changes: 22 additions & 0 deletions pkg/network/ebpf/c/protocols/flush.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,26 @@ int tracepoint__net__netif_receive_skb(void *ctx) {
return 0;
}

SEC("tracepoint/sched/sched_process_exit")
int tracepoint__sched__sched_process_exit(void *ctx) {
CHECK_BPF_PROGRAM_BYPASSED()
u64 pid_tgid = bpf_get_current_pid_tgid();

bpf_map_delete_elem(&ssl_read_args, &pid_tgid);
bpf_map_delete_elem(&ssl_read_ex_args, &pid_tgid);

return 0;
}

SEC("raw_tracepoint/sched_process_exit")
int raw_tracepoint__sched_process_exit(void *ctx) {
CHECK_BPF_PROGRAM_BYPASSED()
u64 pid_tgid = bpf_get_current_pid_tgid();

bpf_map_delete_elem(&ssl_read_args, &pid_tgid);
bpf_map_delete_elem(&ssl_read_ex_args, &pid_tgid);

return 0;
}

#endif
1 change: 1 addition & 0 deletions pkg/network/protocols/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import "C"
type ConnTuple = C.conn_tuple_t
type SslSock C.ssl_sock_t
type SslReadArgs C.ssl_read_args_t
type SslReadExArgs C.ssl_read_ex_args_t

type EbpfEvent C.http_event_t
type EbpfTx C.http_transaction_t
Expand Down
5 changes: 5 additions & 0 deletions pkg/network/protocols/http/types_linux.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 38 additions & 1 deletion pkg/network/usm/ebpf_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"slices"
"unsafe"

manager "github.com/DataDog/ebpf-manager"
"github.com/cilium/ebpf"
"github.com/davecgh/go-spew/spew"

Expand All @@ -35,7 +34,9 @@ import (
"github.com/DataDog/datadog-agent/pkg/network/tracer/offsetguess"
"github.com/DataDog/datadog-agent/pkg/network/usm/buildmode"
"github.com/DataDog/datadog-agent/pkg/network/usm/utils"
"github.com/DataDog/datadog-agent/pkg/util/kernel"
"github.com/DataDog/datadog-agent/pkg/util/log"
manager "github.com/DataDog/ebpf-manager"
)

var (
Expand Down Expand Up @@ -149,6 +150,29 @@ func newEBPFProgram(c *config.Config, connectionProtocolMap *ebpf.Map) (*ebpfPro
}
}

if rawTracepointsSupported() {
// use a raw tracepoint on a supported kernel to intercept terminated threads and clear the corresponding maps.
mgr.Probes = append(mgr.Probes, []*manager.Probe{
{
ProbeIdentificationPair: manager.ProbeIdentificationPair{
EBPFFuncName: "raw_tracepoint__sched_process_exit",
UID: probeUID,
},
TracepointName: "sched_process_exit",
},
}...)
} else {
// use a regular tracepoint to intercept terminated threads.
mgr.Probes = append(mgr.Probes, []*manager.Probe{
{
ProbeIdentificationPair: manager.ProbeIdentificationPair{
EBPFFuncName: "tracepoint__sched__sched_process_exit",
UID: probeUID,
},
},
}...)
}

program := &ebpfProgram{
Manager: ddebpf.NewManager(mgr, "usm", &ebpftelemetry.ErrorsTelemetryModifier{}),
cfg: c,
Expand Down Expand Up @@ -462,6 +486,14 @@ func (e *ebpfProgram) init(buf bytecode.AssetReader, options manager.Options) er
}
}

if rawTracepointsSupported() {
// exclude regular tracepoint if kernel supports raw tracepoint
options.ExcludedFunctions = append(options.ExcludedFunctions, "tracepoint__sched__sched_process_exit")
} else {
//exclude a raw tracepoint if kernel does not support it.
options.ExcludedFunctions = append(options.ExcludedFunctions, "raw_tracepoint__sched_process_exit")
}

err := e.InitWithOptions(buf, &options)
if err != nil {
cleanup()
Expand All @@ -474,6 +506,11 @@ func (e *ebpfProgram) init(buf bytecode.AssetReader, options manager.Options) er
return err
}

func rawTracepointsSupported() bool {
kversion, err := kernel.HostVersion()
return err == nil && kversion >= kernel.VersionCode(4, 17, 0)
}

func getAssetName(module string, debug bool) string {
if debug {
return fmt.Sprintf("%s-debug.o", module)
Expand Down
34 changes: 30 additions & 4 deletions pkg/network/usm/ebpf_ssl.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,8 +499,9 @@ func (o *sslProgram) ConfigureOptions(_ *manager.Manager, options *manager.Optio
}

// PreStart is called before the start of the provided eBPF manager.
func (o *sslProgram) PreStart(*manager.Manager) error {
func (o *sslProgram) PreStart(m *manager.Manager) error {
o.watcher.Start()
o.watcher.SetEbpfManager(m)
o.istioMonitor.Start()
o.nodeJSMonitor.Start()
return nil
Expand Down Expand Up @@ -534,9 +535,34 @@ func (o *sslProgram) DumpMaps(w io.Writer, mapName string, currentMap *ebpf.Map)
io.WriteString(w, "Map: '"+mapName+"', key: 'C.__u64', value: 'C.ssl_read_args_t'\n")
iter := currentMap.Iterate()
var key uint64
var value http.SslReadArgs
for iter.Next(unsafe.Pointer(&key), unsafe.Pointer(&value)) {
spew.Fdump(w, key, value)
// The wrapper struct prevents 'Fdump' from accessing content of pointers.
a := struct {
Ctx unsafe.Pointer
Buf unsafe.Pointer
}{
Ctx: unsafe.Pointer(http.SslReadArgs{}.Ctx),
Buf: unsafe.Pointer(http.SslReadArgs{}.Buf),
}
for iter.Next(unsafe.Pointer(&key), unsafe.Pointer(&a)) {
spew.Fdump(w, key, a)
}

case "ssl_read_ex_args": // maps/ssl_read_ex_args (BPF_MAP_TYPE_HASH), key C.__u64, value C.ssl_read_ex_args_t
io.WriteString(w, "Map: '"+mapName+"', key: 'C.__u64', value: 'C.ssl_read_ex_args_t'\n")
iter := currentMap.Iterate()
var key uint64
// The wrapper struct prevents 'Fdump' from accessing content of pointers.
a := struct {
Ctx unsafe.Pointer
Buf unsafe.Pointer
SizeOutParam unsafe.Pointer
}{
Ctx: unsafe.Pointer(http.SslReadExArgs{}.Ctx),
Buf: unsafe.Pointer(http.SslReadExArgs{}.Buf),
SizeOutParam: unsafe.Pointer(http.SslReadExArgs{}.Out_param),
}
for iter.Next(unsafe.Pointer(&key), unsafe.Pointer(&a)) {
spew.Fdump(w, key, a)
}

case "bio_new_socket_args": // maps/bio_new_socket_args (BPF_MAP_TYPE_HASH), key C.__u64, value C.__u32
Expand Down
168 changes: 168 additions & 0 deletions pkg/network/usm/monitor_tls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/DataDog/datadog-agent/pkg/collector/corechecks/ebpf/probe/ebpfcheck"
"github.com/DataDog/datadog-agent/pkg/ebpf/ebpftest"
"github.com/DataDog/datadog-agent/pkg/eventmonitor/consumers"
consumerstestutil "github.com/DataDog/datadog-agent/pkg/eventmonitor/consumers/testutil"
Expand All @@ -46,8 +47,10 @@ import (
usmtestutil "github.com/DataDog/datadog-agent/pkg/network/usm/testutil"
"github.com/DataDog/datadog-agent/pkg/network/usm/utils"
"github.com/DataDog/datadog-agent/pkg/process/monitor"
"github.com/DataDog/datadog-agent/pkg/util/kernel"
globalutils "github.com/DataDog/datadog-agent/pkg/util/testutil"
dockerutils "github.com/DataDog/datadog-agent/pkg/util/testutil/docker"
manager "github.com/DataDog/ebpf-manager"
)

type tlsSuite struct {
Expand Down Expand Up @@ -991,3 +994,168 @@ func (s *tlsSuite) TestNodeJSTLS() {
}
}
}

// TestSSLReadArgsMaps verifies proper clearance of SSL-related kernel maps.
func (s *tlsSuite) TestSSLReadArgsMaps() {
t := s.T()
// setup monitor
cfg := utils.NewUSMEmptyConfig()
cfg.EnableNativeTLSMonitoring = true

addressOfHTTPPythonServer := "127.0.0.1:4443"
cmd := testutil.HTTPPythonServer(t, addressOfHTTPPythonServer, testutil.Options{
EnableTLS: true,
})

monitor := setupUSMTLSMonitor(t, cfg, reInitEventConsumer)
// Giving the tracer time to install the hooks
utils.WaitForProgramsToBeTraced(t, consts.USMModuleName, "shared_libraries", cmd.Process.Pid, utils.ManualTracingFallbackEnabled)
if !utils.IsProgramTraced(consts.USMModuleName, "shared_libraries", cmd.Process.Pid) {
return
}

// find probes for the programs we want to manipulate
probeSSLReadEx := getProbeByName(monitor.ebpfProgram.Manager.Manager, "uretprobe__SSL_read_ex")
require.NotNil(t, probeSSLReadEx)

probeProcExit := getProbeProcExit(t, monitor)
require.NotNil(t, probeProcExit)

// find the map
readExArgsMap, _, _ := monitor.ebpfProgram.Manager.Manager.GetMap("ssl_read_ex_args")
require.NotNil(t, readExArgsMap)

// create client
client, requestFn := simpleGetRequestsGenerator(t, addressOfHTTPPythonServer)

units := []struct {
name string
expected int64
preRun func(t *testing.T)
postRun func(t *testing.T)
}{
{
// disable both 'SSL_read_ex' and 'sched_process_exit', check the map is not empty
name: "eBPF programs disabled",
expected: 1,
preRun: func(t *testing.T) {
cleanProtocolMaps(t, "ssl", monitor.ebpfProgram.Manager.Manager)
detachProbe(t, monitor.ebpfProgram.Manager.Manager, probeSSLReadEx)
err := probeProcExit.Pause()
assert.NoError(t, err)
},
postRun: func(*testing.T) {
client.CloseIdleConnections()
},
},
{
// disable both 'SSL_read_ex' and 'sched_process_exit', ensure the cleaner properly clears the map
name: "periodic cleaner",
expected: 0,
preRun: func(t *testing.T) {
cleanProtocolMaps(t, "ssl", monitor.ebpfProgram.Manager.Manager)
detachProbe(t, monitor.ebpfProgram.Manager.Manager, probeSSLReadEx)
err := probeProcExit.Pause()
assert.NoError(t, err)
},
postRun: func(t *testing.T) {
client.CloseIdleConnections()
err := monitor.ebpfProgram.cleanDeadPidsInSslMaps()
assert.NoError(t, err)
},
},
{
// check if only 'sched_process_exit' is present and process terminates then the map is empty.
// must be last test case, because it terminates the server
name: "check terminated server",
expected: 0,
preRun: func(t *testing.T) {
cleanProtocolMaps(t, "ssl", monitor.ebpfProgram.Manager.Manager)
err := probeProcExit.Resume()
assert.NoError(t, err)
},
postRun: func(*testing.T) {
client.CloseIdleConnections()
cmd.Process.Kill()
cmd.Wait()
},
},
}
for _, unit := range units {
t.Run(unit.name, func(t *testing.T) {
unit.preRun(t)

// send requests to server
for i := 0; i < numberOfRequests; i++ {
requestFn()
}
unit.postRun(t)

require.Eventually(t, func() bool {
num, err := ebpfcheck.HashMapNumberOfEntries(readExArgsMap)
assert.NoError(t, err)

return num == unit.expected
}, 2*time.Second, 200*time.Millisecond, "unexpected map entries")
})
if t.Failed() {
t.Logf("Unexpect number of entries in the map")
ebpftest.DumpMapsTestHelper(t, monitor.DumpMaps, "ssl_read_ex_args")
}
}
}

// getProbeByName returns the probe of running or paused program that match the input name
func getProbeByName(m *manager.Manager, name string) *manager.Probe {
for _, probe := range m.Probes {
if probe.EBPFFuncName == name && probe.IsRunning() {
return probe
}
}
return nil
}

// getProbeProcExit returns the 'Probe' for the process exit program, either as a regular trace or a raw trace.
func getProbeProcExit(t *testing.T, monitor *Monitor) *manager.Probe {
probeRawProcExit := getProbeByName(monitor.ebpfProgram.Manager.Manager, "raw_tracepoint__sched_process_exit")
if probeRawProcExit != nil {
return probeRawProcExit
}
if kversion, err := kernel.HostVersion(); err == nil && kversion >= kernel.VersionCode(4, 17, 0) {
// raw tracepoints are supported on kernel>=4.17
require.FailNow(t, "raw tracepoint missing despite kernel supports it")
}
probeProcExit := getProbeByName(monitor.ebpfProgram.Manager.Manager, "tracepoint__sched__sched_process_exit")
return probeProcExit
}

func detachProbe(t *testing.T, m *manager.Manager, probe *manager.Probe) {
id := manager.ProbeIdentificationPair{
UID: probe.UID,
EBPFFuncName: probe.EBPFFuncName,
}
err := m.DetachHook(id)
assert.NoError(t, err)
}

// cleanDeadPidsInSslMaps finds activated 'sslProgram' and calls map cleaner.
func (e *ebpfProgram) cleanDeadPidsInSslMaps() error {
for _, prot := range e.enabledProtocols {
if prot.Instance.Name() == "openssl" {
switch prot.Instance.(type) {
case *sslProgram:
p, ok := prot.Instance.(*sslProgram)
if ok {
return p.cleanDeadPidsInMaps(e.Manager.Manager)
}
default:
}
}
}
return nil
}

// cleanDeadPidsInMaps clears terminated processes from SSL-related kernel maps.
func (o *sslProgram) cleanDeadPidsInMaps(manager *manager.Manager) error {
return o.watcher.CleanDeadPidsInMaps(manager, []string{"ssl_read_args", "ssl_read_ex_args"}, nil)
}
Loading
Loading