Add support for RX threads.
This should support NFQ in both autofp and workers mode. It also
adds very basic support for the IPS packet metrics, such as

  suricata_ips_blocked_packets_total

Closes #17
awelzel committed Aug 5, 2024
1 parent 73713b0 commit 6b85c1e
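
For context, a minimal sketch of the per-thread "ips" block in Suricata's dump-counters output that this commit starts exporting. The thread name, counter values, and the exact "message"/"threads" nesting are assumptions modeled on the new test data below, not a verbatim dump:

package main

import (
    "encoding/json"
    "fmt"
    "log"
)

// Illustrative dump-counters fragment; the field names follow the keys the
// new perThreadIpsMetrics entries read (accepted, blocked, rejected, replaced).
const dump = `{
  "message": {
    "threads": {
      "W-NFQ#0": {
        "ips": {"accepted": 10, "blocked": 3, "rejected": 0, "replaced": 0}
      }
    }
  }
}`

func main() {
    var counters map[string]any
    if err := json.Unmarshal([]byte(dump), &counters); err != nil {
        log.Fatal(err)
    }

    threads := counters["message"].(map[string]any)["threads"].(map[string]any)
    for name, t := range threads {
        thread := t.(map[string]any)
        // Like the new handleIps below, only emit anything when "ips" exists.
        if ips, ok := thread["ips"].(map[string]any); ok {
            fmt.Printf("%s: blocked=%v\n", name, ips["blocked"])
        }
    }
}

Each numeric field becomes a per-thread counter, e.g. suricata_ips_blocked_packets_total{thread="W-NFQ#0"}, which can then be aggregated across threads in Prometheus with something like sum(rate(suricata_ips_blocked_packets_total[5m])).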
Showing 5 changed files with 13,393 additions and 37 deletions.
117 changes: 87 additions & 30 deletions main.go
@@ -201,6 +201,19 @@ var (
newPerThreadCounterMetric("flow_bypassed", "local_capture_bytes_total", "", "local_capture_bytes"),
}

perThreadIpsMetrics = []metricInfo{
newPerThreadCounterMetric("ips", "accepted_packets_total", "", "accepted"),
newPerThreadCounterMetric("ips", "blocked_packets_total", "", "blocked"),
newPerThreadCounterMetric("ips", "rejected_packets_total", "", "rejected"),
newPerThreadCounterMetric("ips", "replaced_packets_total", "", "replaced"),
}

perThreadTcpMetricsReceive = []metricInfo{
newPerThreadCounterMetric("tcp", "syn_packets_total", "", "syn"),
newPerThreadCounterMetric("tcp", "synack_packets_total", "", "synack"),
newPerThreadCounterMetric("tcp", "rst_packets_total", "", "rst"),
}

// From .thread.tcp
perThreadTcpMetrics = []metricInfo{
// New in 7.0.0
@@ -220,9 +233,6 @@ var (
newPerThreadCounterMetric("tcp", "invalid_checksum_packets_total", "", "invalid_checksum"),
// Removed in 7.0.0: 0360cb654293c333e3be70204705fa7ec328512e
newPerThreadCounterMetric("tcp", "no_flow_total", "", "no_flow").Optional(),
newPerThreadCounterMetric("tcp", "syn_packets_total", "", "syn"),
newPerThreadCounterMetric("tcp", "synack_packets_total", "", "synack"),
newPerThreadCounterMetric("tcp", "rst_packets_total", "", "rst"),
newPerThreadCounterMetric("tcp", "midstream_pickups_total", "", "midstream_pickups"),
newPerThreadCounterMetric("tcp", "pkt_on_wrong_thread_total", "", "pkt_on_wrong_thread"),
newPerThreadCounterMetric("tcp", "segment_memcap_drop_total", "", "segment_memcap_drop"),
@@ -533,7 +543,12 @@ func handleNapatechMetrics(ch chan<- prometheus.Metric, message map[string]any)
}
}

func handleWorkerThread(ch chan<- prometheus.Metric, threadName string, thread map[string]any) {
// Handle the stats shared between RX and worker threads.
//
// Depending on the runmode (autofp or workers), the "capture" entry
// lives in the RX threads or in the worker threads, respectively.
func handleReceiveCommon(ch chan<- prometheus.Metric, threadName string, thread map[string]any) {

if capture, ok := thread["capture"].(map[string]any); ok {
for _, m := range perThreadCaptureMetrics {
if cm := newConstMetric(m, capture, threadName); cm != nil {
@@ -564,58 +579,95 @@ }
}
}

tcp := thread["tcp"].(map[string]any)
for _, m := range perThreadTcpMetrics {
if cm := newConstMetric(m, tcp, threadName); cm != nil {
ch <- cm
}
}

flow := thread["flow"].(map[string]any)
for _, m := range perThreadFlowMetrics {
if cm := newConstMetric(m, flow, threadName); cm != nil {
ch <- cm
}
}

wrk := flow["wrk"].(map[string]any)
for _, m := range perThreadFlowWrkMetrics {
if cm := newConstMetric(m, wrk, threadName); cm != nil {
// Convert all decoder entries that look like numbers
// into perThreadDecoder metrics with a "kind" label.
decoder := thread["decoder"].(map[string]any)
for _, m := range perThreadDecoderMetrics {
if cm := newConstMetric(m, decoder, threadName); cm != nil {
ch <- cm
}
}

// Defrag stats from worker and receive threads.
defrag := thread["defrag"].(map[string]any)
defragIpv4 := defrag["ipv4"].(map[string]any)
defragIpv6 := defrag["ipv6"].(map[string]any)

for _, m := range perThreadDefragIpv4Metrics {
if cm := newConstMetric(m, defragIpv4, threadName); cm != nil {
ch <- cm
}
}

for _, m := range perThreadDefragIpv6Metrics {
if cm := newConstMetric(m, defragIpv6, threadName); cm != nil {
ch <- cm
}
}

for _, m := range perThreadDefragMetrics {
if cm := newConstMetric(m, defrag, threadName); cm != nil {
ch <- cm
}
}

detect := thread["detect"].(map[string]any)
for _, m := range perThreadDetectMetrics {
if cm := newConstMetric(m, detect, threadName); cm != nil {
tcp := thread["tcp"].(map[string]any)
for _, m := range perThreadTcpMetricsReceive {
if cm := newConstMetric(m, tcp, threadName); cm != nil {
ch <- cm
}
}
}

// Convert all decoder entries that look like numbers
// into perThreadDecoder metrics with a "kind" label.
decoder := thread["decoder"].(map[string]any)
for _, m := range perThreadDecoderMetrics {
if cm := newConstMetric(m, decoder, threadName); cm != nil {
func handleIps(ch chan<- prometheus.Metric, threadName string, thread map[string]any) {
// Extract basic IPS metrics if they exist.
if ips, ok := thread["ips"].(map[string]any); ok {
for _, m := range perThreadIpsMetrics {
if cm := newConstMetric(m, ips, threadName); cm != nil {
ch <- cm
}
}
}
}

// Receive threads share the common layout with worker threads.
func handleReceiveThread(ch chan<- prometheus.Metric, threadName string, thread map[string]any) {
handleReceiveCommon(ch, threadName, thread)
handleIps(ch, threadName, thread)
}

func handleTransmitThread(ch chan<- prometheus.Metric, threadName string, thread map[string]any) {
handleIps(ch, threadName, thread)
}

func handleWorkerThread(ch chan<- prometheus.Metric, threadName string, thread map[string]any) {
handleReceiveCommon(ch, threadName, thread)
handleIps(ch, threadName, thread)

tcp := thread["tcp"].(map[string]any)
for _, m := range perThreadTcpMetrics {
if cm := newConstMetric(m, tcp, threadName); cm != nil {
ch <- cm
}
}

flow := thread["flow"].(map[string]any)
for _, m := range perThreadFlowMetrics {
if cm := newConstMetric(m, flow, threadName); cm != nil {
ch <- cm
}
}

wrk := flow["wrk"].(map[string]any)
for _, m := range perThreadFlowWrkMetrics {
if cm := newConstMetric(m, wrk, threadName); cm != nil {
ch <- cm
}
}

detect := thread["detect"].(map[string]any)
for _, m := range perThreadDetectMetrics {
if cm := newConstMetric(m, detect, threadName); cm != nil {
ch <- cm
}
}
@@ -753,8 +805,12 @@ func produceMetrics(ch chan<- prometheus.Metric, counters map[string]any) {
// Produce per thread metrics
for threadName, thread_ := range message["threads"].(map[string]any) {
if thread, ok := thread_.(map[string]any); ok {
if strings.HasPrefix(threadName, "W#") {
if strings.HasPrefix(threadName, "W#") || strings.HasPrefix(threadName, "W-") {
handleWorkerThread(ch, threadName, thread)
} else if strings.HasPrefix(threadName, "RX") {
handleReceiveThread(ch, threadName, thread)
} else if strings.HasPrefix(threadName, "TX") {
handleTransmitThread(ch, threadName, thread)
} else if strings.HasPrefix(threadName, "FM") {
handleFlowManagerThread(ch, threadName, thread)
} else if strings.HasPrefix(threadName, "FR") {
@@ -829,6 +885,7 @@ func main() {
r.MustRegister(&suricataCollector{NewSuricataClient(*socketPath), sync.Mutex{}})

http.Handle(*path, promhttp.HandlerFor(r, promhttp.HandlerOpts{}))

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
_, err := w.Write([]byte(`<html>
<head><title>Suricata Exporter</title></head>
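For reference, a rough sketch of the layout difference handleReceiveCommon has to cope with: in autofp runmode the "capture" counters sit on the RX threads while "flow" and "detect" counters only appear on the worker threads; in workers runmode a single worker thread carries all of them. Thread and field names here are assumptions loosely modeled on the 7.0.6 afpacket testdata, not a verbatim dump:

package main

import (
    "encoding/json"
    "fmt"
    "log"
)

// Hypothetical autofp-mode fragment: "capture" only on the RX thread,
// "flow"/"detect" only on the worker thread, decoder and TCP handshake
// counters on both -- which is why the afpacket autofp test expects the
// capture metric on 2 threads but the decoder/tcp metrics on all 8.
const autofpFragment = `{
  "threads": {
    "RX#01": {
      "capture": {"kernel_packets": 1000, "kernel_drops": 0},
      "decoder": {"pkts": 1000},
      "tcp":     {"syn": 40, "synack": 38, "rst": 2}
    },
    "W#01": {
      "decoder": {"pkts": 500},
      "tcp":     {"syn": 20, "synack": 19, "rst": 1},
      "flow":    {"wrk": {}},
      "detect":  {"alert": 0}
    }
  }
}`

func main() {
    var msg map[string]any
    if err := json.Unmarshal([]byte(autofpFragment), &msg); err != nil {
        log.Fatal(err)
    }
    for name, t := range msg["threads"].(map[string]any) {
        thread := t.(map[string]any)
        _, hasCapture := thread["capture"]
        _, hasDetect := thread["detect"]
        fmt.Printf("%s: capture=%v detect=%v\n", name, hasCapture, hasDetect)
    }
}
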
120 changes: 113 additions & 7 deletions main_test.go
@@ -59,6 +59,17 @@ func aggregateMetrics(metrics []prometheus.Metric) map[string][]testMetric {
return result
}

func sortedThreadNames(tms []testMetric) string {
tns := make([]string, len(tms)) // thread names
for i, tm := range tms {
tns[i] = tm.labels["thread"]
}

sort.Strings(tns)

return fmt.Sprintf("%v", tns)
}

// Helper converting *prometheus.Metric to something easier to use for testing.
func testMetricFromMetric(m prometheus.Metric) testMetric {
desc := m.Desc()
@@ -216,13 +227,7 @@ func TestDump604AFPacket(t *testing.T) {
t.Errorf("Unexpected number of suricata_kernel_packets metrics: %v", len(tms))
}

tns := make([]string, len(tms)) // thread names
for i, tm := range tms {
tns[i] = tm.labels["thread"]
}

sort.Strings(tns)
threadNames := fmt.Sprintf("%v", tns)
threadNames := sortedThreadNames(tms)
if threadNames != "[W#01-wlp0s20f3 W#02-wlp0s20f3 W#03-wlp0s20f3 W#04-wlp0s20f3 W#05-wlp0s20f3 W#06-wlp0s20f3 W#07-wlp0s20f3 W#08-wlp0s20f3]" {
t.Errorf("Unexpected threadNames: %v", threadNames)
}
@@ -354,3 +359,104 @@ func TestDump701(t *testing.T) {
t.Errorf("Unexpected number of suricata_flow_mgr_flows_checked_total: %v", len(tms))
}
}

func TestDump706NFQAutoFP(t *testing.T) {
data, err := os.ReadFile("./testdata/dump-counters-7.0.6-nfq-autofp.json")
if err != nil {
log.Panicf("Unable to open file: %s", err)
}

var counters map[string]any
err = json.Unmarshal(data, &counters)
if err != nil {
t.Error(err)
}

metrics := produceMetricsHelper(counters)
agged := aggregateMetrics(metrics)

tms := agged["suricata_ips_blocked_packets_total"]
if len(tms) != 14 {
t.Errorf("Unexpected number of suricata_ips_blocked_total: %v", len(tms))
}

threadNames := sortedThreadNames(tms)
if threadNames != "[RX-NFQ#0 RX-NFQ#1 RX-NFQ#2 RX-NFQ#3 TX#00 TX#01 TX#02 TX#03 W#01 W#02 W#03 W#04 W#05 W#06]" {
t.Errorf("Wrong threads %v", threadNames)
}
}

func TestDump706NFQWorkers(t *testing.T) {
data, err := os.ReadFile("./testdata/dump-counters-7.0.6-nfq-workers.json")
if err != nil {
log.Panicf("Unable to open file: %s", err)
}

var counters map[string]any
err = json.Unmarshal(data, &counters)
if err != nil {
t.Error(err)
}

metrics := produceMetricsHelper(counters)
agged := aggregateMetrics(metrics)

tms := agged["suricata_ips_blocked_packets_total"]
if len(tms) != 4 {
t.Errorf("Unexpected number of suricata_ips_blocked_total: %v", len(tms))
}

threadNames := sortedThreadNames(tms)
if threadNames != "[W-NFQ#0 W-NFQ#1 W-NFQ#2 W-NFQ#3]" {
t.Errorf("Wrong threads %v", threadNames)
}
}

func TestDump706AFPacketAutoFP(t *testing.T) {
data, err := os.ReadFile("./testdata/dump-counters-7.0.6-afpacket-autofp.json")
if err != nil {
log.Panicf("Unable to open file: %s", err)
}

var counters map[string]any
err = json.Unmarshal(data, &counters)
if err != nil {
t.Error(err)
}

metrics := produceMetricsHelper(counters)
agged := aggregateMetrics(metrics)
tms, ok := agged["suricata_capture_kernel_packets_total"] // test metrics
if !ok {
t.Errorf("Failed to find suricata_capture_kernel_packets metrics")
}

if len(tms) != 2 {
t.Errorf("Unexpected number of suricata_kernel_packets metrics: %v", len(tms))
}

threadNames := sortedThreadNames(tms)
if threadNames != "[RX#01 RX#02]" {
t.Errorf("Wrong threads %v", threadNames)
}

tms, ok = agged["suricata_decoder_packets_total"]
if !ok {
t.Errorf("Failed to find suricata_decoder_packets_total metrics")
}

// Decoder stats are reported for rx and worker threads.
if len(tms) != 8 {
t.Errorf("Unexpected number of suricata_decoder_packets_total metrics: %v", len(tms))
}

tms, ok = agged["suricata_tcp_syn_packets_total"]
if !ok {
t.Errorf("Failed to find suricata_tcp_syn_packets_total")
}

// TCP metrics are reported for rx and worker threads.
if len(tms) != 8 {
t.Errorf("Unexpected number of suricata_decoder_packets_total metrics: %v", len(tms))
}
}