From a5c323a3d012d67f5d04ef90144edc4ce675c56f Mon Sep 17 00:00:00 2001
From: Leonardo Parente <23251360+leoparente@users.noreply.github.com>
Date: Mon, 8 Apr 2024 15:13:07 -0300
Subject: [PATCH 1/2] Fix netflow port numbers and start/finish

---
 src/inputs/flow/NetflowData.h | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/src/inputs/flow/NetflowData.h b/src/inputs/flow/NetflowData.h
index 0eb8424c5..a6e6bfc08 100644
--- a/src/inputs/flow/NetflowData.h
+++ b/src/inputs/flow/NetflowData.h
@@ -87,11 +87,11 @@ static bool process_netflow_v1(NFSample *sample)
         memcpy(flow_sample.dst_ip.data(), &nf1_flow->dest_ip, sizeof(uint32_t));
         memcpy(flow_sample.nexthop_ip.data(), &nf1_flow->nexthop_ip, sizeof(uint32_t));
 
-        flow_sample.src_port = nf1_flow->src_port;
-        flow_sample.dst_port = nf1_flow->dest_port;
+        flow_sample.src_port = be16toh(nf1_flow->src_port);
+        flow_sample.dst_port = be16toh(nf1_flow->dest_port);
 
-        flow_sample.flow_start = nf1_flow->flow_start;
-        flow_sample.flow_finish = nf1_flow->flow_finish;
+        flow_sample.flow_start = be32toh(nf1_flow->flow_start);
+        flow_sample.flow_finish = be32toh(nf1_flow->flow_finish);
 
         flow_sample.if_index_in = be16toh(nf1_flow->if_index_in);
         flow_sample.if_index_out = be16toh(nf1_flow->if_index_out);
@@ -136,11 +136,11 @@ static bool process_netflow_v5(NFSample *sample)
         memcpy(flow_sample.dst_ip.data(), &nf5_flow->dest_ip, sizeof(uint32_t));
         memcpy(flow_sample.nexthop_ip.data(), &nf5_flow->nexthop_ip, sizeof(uint32_t));
 
-        flow_sample.src_port = nf5_flow->src_port;
-        flow_sample.dst_port = nf5_flow->dest_port;
+        flow_sample.src_port = be16toh(nf5_flow->src_port);
+        flow_sample.dst_port = be16toh(nf5_flow->dest_port);
 
-        flow_sample.flow_start = nf5_flow->flow_start;
-        flow_sample.flow_finish = nf5_flow->flow_finish;
+        flow_sample.flow_start = be32toh(nf5_flow->flow_start);
+        flow_sample.flow_finish = be32toh(nf5_flow->flow_finish);
 
         flow_sample.if_index_in = be16toh(nf5_flow->if_index_in);
         flow_sample.if_index_out = be16toh(nf5_flow->if_index_out);
@@ -190,11 +190,11 @@ static bool process_netflow_v7(NFSample *sample)
         memcpy(flow_sample.dst_ip.data(), &nf7_flow->dest_ip, sizeof(uint32_t));
         memcpy(flow_sample.nexthop_ip.data(), &nf7_flow->nexthop_ip, sizeof(uint32_t));
 
-        flow_sample.src_port = nf7_flow->src_port;
-        flow_sample.dst_port = nf7_flow->dest_port;
+        flow_sample.src_port = be16toh(nf7_flow->src_port);
+        flow_sample.dst_port = be16toh(nf7_flow->dest_port);
 
-        flow_sample.flow_start = nf7_flow->flow_start;
-        flow_sample.flow_finish = nf7_flow->flow_finish;
+        flow_sample.flow_start = be32toh(nf7_flow->flow_start);
+        flow_sample.flow_finish = be32toh(nf7_flow->flow_finish);
 
         flow_sample.if_index_in = be16toh(nf7_flow->if_index_in);
         flow_sample.if_index_out = be16toh(nf7_flow->if_index_out);

From 17941931ef85c9fe3ee3d68f37330dd7442bb7e2 Mon Sep 17 00:00:00 2001
From: Leonardo Parente <23251360+leoparente@users.noreply.github.com>
Date: Mon, 8 Apr 2024 15:50:05 -0300
Subject: [PATCH 2/2] Add unit tests for netflow v5

---
 src/handlers/flow/test_flows.cpp |  46 +++++++++++++++++++++++++++++--
 src/tests/fixtures/nf5.pcap      | Bin 0 -> 332 bytes
 2 files changed, 44 insertions(+), 2 deletions(-)
 create mode 100644 src/tests/fixtures/nf5.pcap

diff --git a/src/handlers/flow/test_flows.cpp b/src/handlers/flow/test_flows.cpp
index a0aeded68..0f6ff3caa 100644
--- a/src/handlers/flow/test_flows.cpp
+++ b/src/handlers/flow/test_flows.cpp
@@ -1,6 +1,6 @@
 #include <catch2/catch_test_macros.hpp>
-#include <catch2/matchers/catch_matchers.hpp>
 #include <catch2/catch_test_visor.hpp>
+#include <catch2/matchers/catch_matchers.hpp>
 
 #include "FlowInputStream.h"
 #include "FlowStreamHandler.h"
@@ -399,7 +399,49 @@ TEST_CASE("Parse sflow stream with interfaces filter", "[sflow][flow]")
     CHECK(j["devices"]["192.168.0.11"]["interfaces"]["37"]["top_out_src_ips_and_port_bytes"][0]["estimate"] == nullptr);
 }
 
-TEST_CASE("Parse netflow stream", "[netflow][flow]")
+TEST_CASE("Parse netflow v5 stream", "[netflow][flow]")
+{
+
+    FlowInputStream stream{"netflow-test"};
+    stream.config_set("flow_type", "netflow");
+    stream.config_set("pcap_file", "tests/fixtures/nf5.pcap");
+
+    visor::Config c;
+    auto stream_proxy = stream.add_event_proxy(c);
+    c.config_set<uint64_t>("num_periods", 1);
+    FlowStreamHandler flow_handler{"flow-test", stream_proxy, &c};
+    flow_handler.config_set<visor::Configurable::StringList>("enable", visor::Configurable::StringList({"top_tos"}));
+
+    flow_handler.start();
+    stream.start();
+    stream.stop();
+    flow_handler.stop();
+
+    auto event_data = flow_handler.metrics()->bucket(0)->event_data_locked();
+
+    // confirmed with wireshark
+    CHECK(event_data.num_events->value() == 2);
+    CHECK(event_data.num_samples->value() == 2);
+
+    nlohmann::json j;
+    flow_handler.metrics()->bucket(0)->to_json(j);
+
+    CHECK(j["devices"]["10.0.0.2"]["records_flows"] == 3);
+    CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["cardinality"]["dst_ips_out"] == 2);
+    CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["cardinality"]["src_ips_in"] == 1);
+    CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["cardinality"]["dst_ports_out"] == 3);
+    CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["cardinality"]["src_ports_in"] == 2);
+    CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["top_in_src_ips_bytes"][0]["estimate"] == 1720);
+    CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["top_in_src_ips_packets"][0]["estimate"] == 33);
+    CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["top_in_dscp_bytes"][0]["estimate"] == 1220);
+    CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["top_in_dscp_bytes"][0]["name"] == "CS6");
+    CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["top_in_ecn_packets"][0]["estimate"] == 33);
+    CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["top_in_ecn_packets"][0]["name"] == "Not-ECT");
+    CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["top_in_dst_ports_bytes"][0]["estimate"] == 1128);
+    CHECK(j["devices"]["10.0.0.2"]["interfaces"]["0"]["top_in_dst_ports_bytes"][0]["name"] == "telnet");
+}
+
+TEST_CASE("Parse netflow v9 stream", "[netflow][flow]")
 {
 
     FlowInputStream stream{"netflow-test"};
diff --git a/src/tests/fixtures/nf5.pcap b/src/tests/fixtures/nf5.pcap
new file mode 100644
index 0000000000000000000000000000000000000000..43d60baaa8aff08b2836815f49feb23eefaa528f
GIT binary patch
literal 332
zcmca|c+)~A1{MYcU}0bcava(v;(trBGb{qKL3q{fGYrfEAd*RdgTa-7VTw^c1H*s8
zlBZk@3`{`G{H2(KouPrfmw}amiNQzYUA#oQgY>pnaH9{b;9&$}CWK93i(+7WCJrVJ
z1_r?cFg{2<$ZQxNNJ}#?uw=maA*>AI3=C`sV0>4gi;p_9GZX;@Kp4ZtDMke#7nh2l
txHzEv8PLUy3_cnSKo`5p_-6v8A+}<10V@Lo;}@8X3=BXAGcfLe@c}^~F{1zg

literal 0
HcmV?d00001