From 81df2852562604f2c6cdde2dc00f146649745b26 Mon Sep 17 00:00:00 2001 From: Ashish Kumar Singh Date: Wed, 13 Dec 2023 10:09:04 +0000 Subject: [PATCH 1/2] feat: only check address at ice connecting stage --- udp_muxed_conn.go | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/udp_muxed_conn.go b/udp_muxed_conn.go index e69c307..843e779 100644 --- a/udp_muxed_conn.go +++ b/udp_muxed_conn.go @@ -8,12 +8,17 @@ import ( "io" "net" "sync" + "sync/atomic" "time" "github.com/pion/logging" "github.com/pion/transport/v3/packetio" ) +const ( + iceConnectedTimeout = 25 * time.Second +) + type udpMuxedConnParams struct { Mux *UDPMuxDefault AddrPool *sync.Pool @@ -33,6 +38,9 @@ type udpMuxedConn struct { closedChan chan struct{} closeOnce sync.Once mu sync.Mutex + + startAt time.Time + iceConnected atomic.Bool } func newUDPMuxedConn(params *udpMuxedConnParams) *udpMuxedConn { @@ -40,6 +48,7 @@ func newUDPMuxedConn(params *udpMuxedConnParams) *udpMuxedConn { params: params, buf: packetio.NewBuffer(), closedChan: make(chan struct{}), + startAt: time.Now(), } return p @@ -80,10 +89,18 @@ func (c *udpMuxedConn) WriteTo(buf []byte, rAddr net.Addr) (n int, err error) { if c.isClosed() { return 0, io.ErrClosedPipe } - // Each time we write to a new address, we'll register it with the mux - addr := rAddr.String() - if !c.containsAddress(addr) { - c.addAddress(addr) + + // Only check the address at the ICE connecting stage to reduce the check cost + if !c.iceConnected.Load() { + if time.Since(c.startAt) > iceConnectedTimeout { + c.iceConnected.Store(true) + } else { + // Each time we write to a new address, we'll register it with the mux + addr := rAddr.String() + if !c.containsAddress(addr) { + c.addAddress(addr) + } + } } return c.params.Mux.writeTo(buf, rAddr) From ab72dae786861d146cd2706f125850220fb384c6 Mon Sep 17 00:00:00 2001 From: Ashish Kumar Singh Date: Wed, 13 Dec 2023 12:57:30 +0000 Subject: [PATCH 2/2] feat: reduce time.now calls and use atomic int instead of value --- candidate_base.go | 44 +++++++++++++++++++++++++++++++------------- candidate_test.go | 10 ++++++---- 2 files changed, 37 insertions(+), 17 deletions(-) diff --git a/candidate_base.go b/candidate_base.go index dad95d2..0e2b99b 100644 --- a/candidate_base.go +++ b/candidate_base.go @@ -18,6 +18,10 @@ import ( "github.com/pion/stun/v2" ) +var ( + startTime = time.Now() +) + type candidateBase struct { id string networkType NetworkType @@ -31,9 +35,9 @@ type candidateBase struct { resolvedAddr net.Addr - lastSent atomic.Value - lastReceived atomic.Value - conn net.PacketConn + lastSentAfter atomic.Int64 + lastReceivedAfter atomic.Int64 + conn net.PacketConn currAgent *Agent closeCh chan struct{} @@ -207,6 +211,7 @@ func (c *candidateBase) start(a *Agent, conn net.PacketConn, initializedCh <-cha c.conn = conn c.closeCh = make(chan struct{}) c.closedCh = make(chan struct{}) + startTime = time.Now() go c.recvLoop(initializedCh) } @@ -400,34 +405,47 @@ func (c *candidateBase) String() string { // LastReceived returns a time.Time indicating the last time // this candidate was received func (c *candidateBase) LastReceived() time.Time { - if lastReceived, ok := c.lastReceived.Load().(time.Time); ok { - return lastReceived + // if lastReceived, ok := c.lastReceived.Load().(time.Time); ok { + // return lastReceived + // } + // return time.Time{} + diff := c.lastReceivedAfter.Load() + if diff > 0 { + return startTime.Add(time.Duration(diff)) } + return time.Time{} } -func (c *candidateBase) setLastReceived(t time.Time) { - c.lastReceived.Store(t) +func (c *candidateBase) setLastReceived() { + c.lastReceivedAfter.Store(time.Since(startTime).Nanoseconds()) } // LastSent returns a time.Time indicating the last time // this candidate was sent func (c *candidateBase) LastSent() time.Time { - if lastSent, ok := c.lastSent.Load().(time.Time); ok { - return lastSent + // if lastSent, ok := c.lastSent.Load().(time.Time); ok { + // return lastSent + // } + // return time.Time{} + + diff := c.lastSentAfter.Load() + if diff > 0 { + return startTime.Add(time.Duration(diff)) } + return time.Time{} } -func (c *candidateBase) setLastSent(t time.Time) { - c.lastSent.Store(t) +func (c *candidateBase) setLastSent() { + c.lastSentAfter.Store(time.Since(startTime).Nanoseconds()) } func (c *candidateBase) seen(outbound bool) { if outbound { - c.setLastSent(time.Now()) + c.setLastSent() //time.Now()) } else { - c.setLastReceived(time.Now()) + c.setLastReceived() //time.Now()) } } diff --git a/candidate_test.go b/candidate_test.go index 9fb7ebc..69dd3ec 100644 --- a/candidate_test.go +++ b/candidate_test.go @@ -183,16 +183,18 @@ func TestCandidateLastSent(t *testing.T) { candidate := candidateBase{} assert.Equal(t, candidate.LastSent(), time.Time{}) now := time.Now() - candidate.setLastSent(now) - assert.Equal(t, candidate.LastSent(), now) + candidate.setLastSent() + // assert.Equal(t, candidate.LastSent(), now) + assert.WithinDuration(t, candidate.LastSent(), now, time.Millisecond) } func TestCandidateLastReceived(t *testing.T) { candidate := candidateBase{} assert.Equal(t, candidate.LastReceived(), time.Time{}) now := time.Now() - candidate.setLastReceived(now) - assert.Equal(t, candidate.LastReceived(), now) + candidate.setLastReceived() + // assert.Equal(t, candidate.LastReceived(), now) + assert.WithinDuration(t, candidate.LastReceived(), now, time.Millisecond) } func TestCandidateFoundation(t *testing.T) {