From 04b618168f83f1632c4fd8dfd077598cee87f048 Mon Sep 17 00:00:00 2001 From: lspgn <lspgn@users.noreply.github.com> Date: Sun, 19 May 2024 10:04:12 -0700 Subject: [PATCH 1/5] fix: udp receiver not passing errors --- utils/udp.go | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/utils/udp.go b/utils/udp.go index a0664ed7..6cf54f15 100644 --- a/utils/udp.go +++ b/utils/udp.go @@ -130,10 +130,10 @@ func (r *UDPReceiver) Errors() <-chan error { func (r *UDPReceiver) receive(addr string, port int, started chan bool) error { pconn, err := reuseport.ListenPacket("udp", fmt.Sprintf("%s:%d", addr, port)) - close(started) if err != nil { return err } + close(started) // indicates receiver is setup q := make(chan bool) // function to quit @@ -243,20 +243,34 @@ func (r *UDPReceiver) decoders(workers int, decodeFunc DecoderFunc) error { } // Starts the UDP receiving workers -func (r *UDPReceiver) receivers(sockets int, addr string, port int) error { +func (r *UDPReceiver) receivers(sockets int, addr string, port int) (rErr error) { for i := 0; i < sockets; i++ { + if rErr != nil { // do not instanciate the rest of the receivers + break + } + r.wg.Add(1) - started := make(chan bool) + started := make(chan bool) // indicates receiver setup is complete go func() { defer r.wg.Done() if err := r.receive(addr, port, started); err != nil { - r.logError(&ReceiverError{err}) + err = &ReceiverError{err} + + select { + case <-started: + default: // in case the receiver is not started yet + rErr = err + close(started) + return + } + + r.logError(err) } }() <-started } - return nil + return rErr } // Start UDP receivers and the processing routines @@ -269,9 +283,11 @@ func (r *UDPReceiver) Start(addr string, port int, decodeFunc DecoderFunc) error } if err := r.decoders(r.workers, decodeFunc); err != nil { + r.Stop() return err } if err := r.receivers(r.workers, addr, port); err != nil { + r.Stop() return err } return nil From 7f450b6a3fb48480065592659466d3df211bf6cb Mon Sep 17 00:00:00 2001 From: lspgn <lspgn@users.noreply.github.com> Date: Sun, 19 May 2024 10:12:56 -0700 Subject: [PATCH 2/5] split initialization of receive function --- utils/udp.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/utils/udp.go b/utils/udp.go index 6cf54f15..7364c32c 100644 --- a/utils/udp.go +++ b/utils/udp.go @@ -150,6 +150,11 @@ func (r *UDPReceiver) receive(addr string, port int, started chan bool) error { if !ok { return err } + + return r.receiveRoutine(udpconn) +} + +func (r *UDPReceiver) receiveRoutine(udpconn *net.UDPConn) (err error) { localAddr, _ := udpconn.LocalAddr().(*net.UDPAddr) for { From 9fc1d308a79a534cabe9d5669bee72e7b73279f2 Mon Sep 17 00:00:00 2001 From: lspgn <lspgn@users.noreply.github.com> Date: Sun, 19 May 2024 10:18:23 -0700 Subject: [PATCH 3/5] fix error --- utils/udp.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/udp.go b/utils/udp.go index 7364c32c..ad5fc73b 100644 --- a/utils/udp.go +++ b/utils/udp.go @@ -148,7 +148,7 @@ func (r *UDPReceiver) receive(addr string, port int, started chan bool) error { udpconn, ok := pconn.(*net.UDPConn) if !ok { - return err + return fmt.Errorf("not a udp connection") } return r.receiveRoutine(udpconn) From 3b13f8fa7eb21dceabf56533b486d3a6416d2dfb Mon Sep 17 00:00:00 2001 From: lspgn <lspgn@users.noreply.github.com> Date: Sun, 19 May 2024 10:24:57 -0700 Subject: [PATCH 4/5] use sockets count --- utils/udp.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/udp.go b/utils/udp.go index ad5fc73b..f2e24ccf 100644 --- a/utils/udp.go +++ b/utils/udp.go @@ -291,7 +291,7 @@ func (r *UDPReceiver) Start(addr string, port int, decodeFunc DecoderFunc) error r.Stop() return err } - if err := r.receivers(r.workers, addr, port); err != nil { + if err := r.receivers(r.sockets, addr, port); err != nil { r.Stop() return err } From 09680256ab2a48d145db7ea40b28d59ad5ad562d Mon Sep 17 00:00:00 2001 From: lspgn <lspgn@users.noreply.github.com> Date: Sun, 19 May 2024 12:04:06 -0700 Subject: [PATCH 5/5] add brackets to IPv6 --- utils/udp.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/utils/udp.go b/utils/udp.go index f2e24ccf..7efe73d7 100644 --- a/utils/udp.go +++ b/utils/udp.go @@ -4,6 +4,7 @@ import ( "fmt" "net" "net/netip" + "strings" "sync" "time" @@ -129,6 +130,10 @@ func (r *UDPReceiver) Errors() <-chan error { } func (r *UDPReceiver) receive(addr string, port int, started chan bool) error { + if strings.IndexRune(addr, ':') >= 0 && strings.IndexRune(addr, '[') == -1 { + addr = "[" + addr + "]" + } + pconn, err := reuseport.ListenPacket("udp", fmt.Sprintf("%s:%d", addr, port)) if err != nil { return err