From 04b618168f83f1632c4fd8dfd077598cee87f048 Mon Sep 17 00:00:00 2001
From: lspgn <lspgn@users.noreply.github.com>
Date: Sun, 19 May 2024 10:04:12 -0700
Subject: [PATCH 1/5] fix: udp receiver not passing errors

---
 utils/udp.go | 26 +++++++++++++++++++++-----
 1 file changed, 21 insertions(+), 5 deletions(-)

diff --git a/utils/udp.go b/utils/udp.go
index a0664ed7..6cf54f15 100644
--- a/utils/udp.go
+++ b/utils/udp.go
@@ -130,10 +130,10 @@ func (r *UDPReceiver) Errors() <-chan error {
 
 func (r *UDPReceiver) receive(addr string, port int, started chan bool) error {
 	pconn, err := reuseport.ListenPacket("udp", fmt.Sprintf("%s:%d", addr, port))
-	close(started)
 	if err != nil {
 		return err
 	}
+	close(started) // indicates receiver is setup
 
 	q := make(chan bool)
 	// function to quit
@@ -243,20 +243,34 @@ func (r *UDPReceiver) decoders(workers int, decodeFunc DecoderFunc) error {
 }
 
 // Starts the UDP receiving workers
-func (r *UDPReceiver) receivers(sockets int, addr string, port int) error {
+func (r *UDPReceiver) receivers(sockets int, addr string, port int) (rErr error) {
 	for i := 0; i < sockets; i++ {
+		if rErr != nil { // do not instanciate the rest of the receivers
+			break
+		}
+
 		r.wg.Add(1)
-		started := make(chan bool)
+		started := make(chan bool) // indicates receiver setup is complete
 		go func() {
 			defer r.wg.Done()
 			if err := r.receive(addr, port, started); err != nil {
-				r.logError(&ReceiverError{err})
+				err = &ReceiverError{err}
+
+				select {
+				case <-started:
+				default: // in case the receiver is not started yet
+					rErr = err
+					close(started)
+					return
+				}
+
+				r.logError(err)
 			}
 		}()
 		<-started
 	}
 
-	return nil
+	return rErr
 }
 
 // Start UDP receivers and the processing routines
@@ -269,9 +283,11 @@ func (r *UDPReceiver) Start(addr string, port int, decodeFunc DecoderFunc) error
 	}
 
 	if err := r.decoders(r.workers, decodeFunc); err != nil {
+		r.Stop()
 		return err
 	}
 	if err := r.receivers(r.workers, addr, port); err != nil {
+		r.Stop()
 		return err
 	}
 	return nil

From 7f450b6a3fb48480065592659466d3df211bf6cb Mon Sep 17 00:00:00 2001
From: lspgn <lspgn@users.noreply.github.com>
Date: Sun, 19 May 2024 10:12:56 -0700
Subject: [PATCH 2/5] split initialization of receive function

---
 utils/udp.go | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/utils/udp.go b/utils/udp.go
index 6cf54f15..7364c32c 100644
--- a/utils/udp.go
+++ b/utils/udp.go
@@ -150,6 +150,11 @@ func (r *UDPReceiver) receive(addr string, port int, started chan bool) error {
 	if !ok {
 		return err
 	}
+
+	return r.receiveRoutine(udpconn)
+}
+
+func (r *UDPReceiver) receiveRoutine(udpconn *net.UDPConn) (err error) {
 	localAddr, _ := udpconn.LocalAddr().(*net.UDPAddr)
 
 	for {

From 9fc1d308a79a534cabe9d5669bee72e7b73279f2 Mon Sep 17 00:00:00 2001
From: lspgn <lspgn@users.noreply.github.com>
Date: Sun, 19 May 2024 10:18:23 -0700
Subject: [PATCH 3/5] fix error

---
 utils/udp.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/utils/udp.go b/utils/udp.go
index 7364c32c..ad5fc73b 100644
--- a/utils/udp.go
+++ b/utils/udp.go
@@ -148,7 +148,7 @@ func (r *UDPReceiver) receive(addr string, port int, started chan bool) error {
 
 	udpconn, ok := pconn.(*net.UDPConn)
 	if !ok {
-		return err
+		return fmt.Errorf("not a udp connection")
 	}
 
 	return r.receiveRoutine(udpconn)

From 3b13f8fa7eb21dceabf56533b486d3a6416d2dfb Mon Sep 17 00:00:00 2001
From: lspgn <lspgn@users.noreply.github.com>
Date: Sun, 19 May 2024 10:24:57 -0700
Subject: [PATCH 4/5] use sockets count

---
 utils/udp.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/utils/udp.go b/utils/udp.go
index ad5fc73b..f2e24ccf 100644
--- a/utils/udp.go
+++ b/utils/udp.go
@@ -291,7 +291,7 @@ func (r *UDPReceiver) Start(addr string, port int, decodeFunc DecoderFunc) error
 		r.Stop()
 		return err
 	}
-	if err := r.receivers(r.workers, addr, port); err != nil {
+	if err := r.receivers(r.sockets, addr, port); err != nil {
 		r.Stop()
 		return err
 	}

From 09680256ab2a48d145db7ea40b28d59ad5ad562d Mon Sep 17 00:00:00 2001
From: lspgn <lspgn@users.noreply.github.com>
Date: Sun, 19 May 2024 12:04:06 -0700
Subject: [PATCH 5/5] add brackets to IPv6

---
 utils/udp.go | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/utils/udp.go b/utils/udp.go
index f2e24ccf..7efe73d7 100644
--- a/utils/udp.go
+++ b/utils/udp.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"net"
 	"net/netip"
+	"strings"
 	"sync"
 	"time"
 
@@ -129,6 +130,10 @@ func (r *UDPReceiver) Errors() <-chan error {
 }
 
 func (r *UDPReceiver) receive(addr string, port int, started chan bool) error {
+	if strings.IndexRune(addr, ':') >= 0 && strings.IndexRune(addr, '[') == -1 {
+		addr = "[" + addr + "]"
+	}
+
 	pconn, err := reuseport.ListenPacket("udp", fmt.Sprintf("%s:%d", addr, port))
 	if err != nil {
 		return err