Skip to content

Commit

Permalink
Merge pull request #1 from dyte-in/feat/general-optimizations
Browse files Browse the repository at this point in the history
feat: general optimizations
  • Loading branch information
AshishKumar4 authored Dec 13, 2023
2 parents 5139807 + ab72dae commit 2c44424
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 21 deletions.
44 changes: 31 additions & 13 deletions candidate_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
"github.com/pion/stun/v2"
)

var (
startTime = time.Now()
)

type candidateBase struct {
id string
networkType NetworkType
Expand All @@ -31,9 +35,9 @@ type candidateBase struct {

resolvedAddr net.Addr

lastSent atomic.Value
lastReceived atomic.Value
conn net.PacketConn
lastSentAfter atomic.Int64
lastReceivedAfter atomic.Int64
conn net.PacketConn

currAgent *Agent
closeCh chan struct{}
Expand Down Expand Up @@ -207,6 +211,7 @@ func (c *candidateBase) start(a *Agent, conn net.PacketConn, initializedCh <-cha
c.conn = conn
c.closeCh = make(chan struct{})
c.closedCh = make(chan struct{})
startTime = time.Now()

go c.recvLoop(initializedCh)
}
Expand Down Expand Up @@ -400,34 +405,47 @@ func (c *candidateBase) String() string {
// LastReceived returns a time.Time indicating the last time
// this candidate was received
func (c *candidateBase) LastReceived() time.Time {
if lastReceived, ok := c.lastReceived.Load().(time.Time); ok {
return lastReceived
// if lastReceived, ok := c.lastReceived.Load().(time.Time); ok {
// return lastReceived
// }
// return time.Time{}
diff := c.lastReceivedAfter.Load()
if diff > 0 {
return startTime.Add(time.Duration(diff))
}

return time.Time{}
}

func (c *candidateBase) setLastReceived(t time.Time) {
c.lastReceived.Store(t)
func (c *candidateBase) setLastReceived() {
c.lastReceivedAfter.Store(time.Since(startTime).Nanoseconds())
}

// LastSent returns a time.Time indicating the last time
// this candidate was sent
func (c *candidateBase) LastSent() time.Time {
if lastSent, ok := c.lastSent.Load().(time.Time); ok {
return lastSent
// if lastSent, ok := c.lastSent.Load().(time.Time); ok {
// return lastSent
// }
// return time.Time{}

diff := c.lastSentAfter.Load()
if diff > 0 {
return startTime.Add(time.Duration(diff))
}

return time.Time{}
}

func (c *candidateBase) setLastSent(t time.Time) {
c.lastSent.Store(t)
func (c *candidateBase) setLastSent() {
c.lastSentAfter.Store(time.Since(startTime).Nanoseconds())
}

func (c *candidateBase) seen(outbound bool) {
if outbound {
c.setLastSent(time.Now())
c.setLastSent() //time.Now())
} else {
c.setLastReceived(time.Now())
c.setLastReceived() //time.Now())
}
}

Expand Down
10 changes: 6 additions & 4 deletions candidate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,18 @@ func TestCandidateLastSent(t *testing.T) {
candidate := candidateBase{}
assert.Equal(t, candidate.LastSent(), time.Time{})
now := time.Now()
candidate.setLastSent(now)
assert.Equal(t, candidate.LastSent(), now)
candidate.setLastSent()
// assert.Equal(t, candidate.LastSent(), now)
assert.WithinDuration(t, candidate.LastSent(), now, time.Millisecond)
}

func TestCandidateLastReceived(t *testing.T) {
candidate := candidateBase{}
assert.Equal(t, candidate.LastReceived(), time.Time{})
now := time.Now()
candidate.setLastReceived(now)
assert.Equal(t, candidate.LastReceived(), now)
candidate.setLastReceived()
// assert.Equal(t, candidate.LastReceived(), now)
assert.WithinDuration(t, candidate.LastReceived(), now, time.Millisecond)
}

func TestCandidateFoundation(t *testing.T) {
Expand Down
25 changes: 21 additions & 4 deletions udp_muxed_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@ import (
"io"
"net"
"sync"
"sync/atomic"
"time"

"github.com/pion/logging"
"github.com/pion/transport/v3/packetio"
)

const (
iceConnectedTimeout = 25 * time.Second
)

type udpMuxedConnParams struct {
Mux *UDPMuxDefault
AddrPool *sync.Pool
Expand All @@ -33,13 +38,17 @@ type udpMuxedConn struct {
closedChan chan struct{}
closeOnce sync.Once
mu sync.Mutex

startAt time.Time
iceConnected atomic.Bool
}

func newUDPMuxedConn(params *udpMuxedConnParams) *udpMuxedConn {
p := &udpMuxedConn{
params: params,
buf: packetio.NewBuffer(),
closedChan: make(chan struct{}),
startAt: time.Now(),
}

return p
Expand Down Expand Up @@ -80,10 +89,18 @@ func (c *udpMuxedConn) WriteTo(buf []byte, rAddr net.Addr) (n int, err error) {
if c.isClosed() {
return 0, io.ErrClosedPipe
}
// Each time we write to a new address, we'll register it with the mux
addr := rAddr.String()
if !c.containsAddress(addr) {
c.addAddress(addr)

// Only check the address at the ICE connecting stage to reduce the check cost
if !c.iceConnected.Load() {
if time.Since(c.startAt) > iceConnectedTimeout {
c.iceConnected.Store(true)
} else {
// Each time we write to a new address, we'll register it with the mux
addr := rAddr.String()
if !c.containsAddress(addr) {
c.addAddress(addr)
}
}
}

return c.params.Mux.writeTo(buf, rAddr)
Expand Down

0 comments on commit 2c44424

Please sign in to comment.