diff --git a/cmd/goflow2/main.go b/cmd/goflow2/main.go index cc994eea..1e68f693 100644 --- a/cmd/goflow2/main.go +++ b/cmd/goflow2/main.go @@ -247,10 +247,11 @@ func main() { l.Info("starting collection") cfg := &utils.UDPReceiverConfig{ - Sockets: numSockets, - Workers: numWorkers, - QueueSize: queueSize, - Blocking: isBlocking, + Sockets: numSockets, + Workers: numWorkers, + QueueSize: queueSize, + Blocking: isBlocking, + ReceiverCallback: metrics.NewReceiverMetric(), } recv, err := utils.NewUDPReceiver(cfg) if err != nil { diff --git a/metrics/metrics.go b/metrics/metrics.go index 76972be6..c1972ee7 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -9,6 +9,22 @@ const ( ) var ( + MetricReceivedDroppedPackets = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "flow_dropped_packets_total", + Help: "Packets dropped before processing.", + Namespace: NAMESPACE, + }, + []string{"remote_ip", "local_ip", "local_port"}, + ) + MetricReceivedDroppedBytes = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "flow_dropped_bytes_total", + Help: "Bytes dropped before processing.", + Namespace: NAMESPACE, + }, + []string{"remote_ip", "local_ip", "local_port"}, + ) MetricTrafficBytes = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "flow_traffic_bytes_total", @@ -114,6 +130,9 @@ var ( ) func init() { + prometheus.MustRegister(MetricReceivedDroppedPackets) + prometheus.MustRegister(MetricReceivedDroppedBytes) + prometheus.MustRegister(MetricTrafficBytes) prometheus.MustRegister(MetricTrafficPackets) prometheus.MustRegister(MetricPacketSizeSum) diff --git a/metrics/receiver.go b/metrics/receiver.go new file mode 100644 index 00000000..e4408d99 --- /dev/null +++ b/metrics/receiver.go @@ -0,0 +1,32 @@ +package metrics + +import ( + "fmt" + + "github.com/netsampler/goflow2/v2/utils" + + "github.com/prometheus/client_golang/prometheus" +) + +type ReceiverMetric struct { +} + +func NewReceiverMetric() *ReceiverMetric { + return &ReceiverMetric{} +} + +func (r *ReceiverMetric) Dropped(pkt utils.Message) { + remote := pkt.Src.Addr().Unmap().String() + localIP := pkt.Dst.Addr().Unmap().String() + + port := fmt.Sprintf("%d", pkt.Dst.Port()) + size := len(pkt.Payload) + + labels := prometheus.Labels{ + "remote_ip": remote, + "local_ip": localIP, + "local_port": port, + } + MetricReceivedDroppedPackets.With(labels).Inc() + MetricReceivedDroppedBytes.With(labels).Add(float64(size)) +} diff --git a/utils/udp.go b/utils/udp.go index 6203dfc3..a0664ed7 100644 --- a/utils/udp.go +++ b/utils/udp.go @@ -10,6 +10,10 @@ import ( reuseport "github.com/libp2p/go-reuseport" ) +type ReceiverCallback interface { + Dropped(msg Message) +} + // Callback used to decode a UDP message type DecoderFunc func(msg interface{}) error @@ -48,6 +52,8 @@ type UDPReceiver struct { workers int sockets int + + cb ReceiverCallback } type UDPReceiverConfig struct { @@ -55,6 +61,8 @@ type UDPReceiverConfig struct { Sockets int Blocking bool QueueSize int + + ReceiverCallback ReceiverCallback } func NewUDPReceiver(cfg *UDPReceiverConfig) (*UDPReceiver, error) { @@ -80,6 +88,7 @@ func NewUDPReceiver(cfg *UDPReceiverConfig) (*UDPReceiver, error) { r.workers = cfg.Workers dispatchSize = cfg.QueueSize r.blocking = cfg.Blocking + r.cb = cfg.ReceiverCallback } if dispatchSize == 0 { @@ -171,6 +180,14 @@ func (r *UDPReceiver) receive(addr string, port int, started chan bool) error { case <-r.q: return nil default: + if r.cb != nil { + r.cb.Dropped(Message{ + Src: pkt.src.AddrPort(), + Dst: pkt.dst.AddrPort(), + Payload: pkt.payload[0:pkt.size], + Received: pkt.received, + }) + } packetPool.Put(pkt) // increase counter }