Skip to content

Commit

Permalink
Code cleaning in callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
pappz committed Jan 30, 2025
1 parent d6c2aab commit 2067836
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 44 deletions.
28 changes: 9 additions & 19 deletions client/internal/peer/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,21 +135,11 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu
semaphore: semaphore,
}

rFns := WorkerRelayCallbacks{
OnConnReady: conn.relayConnectionIsReady,
OnDisconnected: conn.onWorkerRelayStateDisconnected,
}

wFns := WorkerICECallbacks{
OnConnReady: conn.iCEConnectionIsReady,
OnStatusChanged: conn.onWorkerICEStateDisconnected,
}

ctrl := isController(config)
conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, relayManager, rFns)
conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, conn, relayManager)

relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
conn.workerICE, err = NewWorkerICE(ctx, connLog, config, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally, wFns)
conn.workerICE, err = NewWorkerICE(ctx, connLog, config, conn, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -304,7 +294,7 @@ func (conn *Conn) GetKey() string {
}

// configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected
func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICEConnInfo) {
func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEConnInfo) {
conn.mu.Lock()
defer conn.mu.Unlock()

Expand Down Expand Up @@ -376,15 +366,15 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon
}

// todo review to make sense to handle connecting and disconnected status also?
func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) {
func (conn *Conn) onICEStateDisconnected() {
conn.mu.Lock()
defer conn.mu.Unlock()

if conn.ctx.Err() != nil {
return
}

conn.log.Tracef("ICE connection state changed to %s", newState)
conn.log.Tracef("ICE connection state changed to disconnected")

if conn.wgProxyICE != nil {
if err := conn.wgProxyICE.CloseConn(); err != nil {
Expand All @@ -404,8 +394,8 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) {
conn.currentConnPriority = connPriorityRelay
}

changed := conn.statusICE.Get() != newState && newState != StatusConnecting
conn.statusICE.Set(newState)
changed := conn.statusICE.Get() != StatusDisconnected
conn.statusICE.Set(StatusDisconnected)

conn.guard.SetICEConnDisconnected(changed)

Expand All @@ -422,7 +412,7 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) {
}
}

func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) {
func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
conn.mu.Lock()
defer conn.mu.Unlock()

Expand Down Expand Up @@ -474,7 +464,7 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) {
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr)
}

func (conn *Conn) onWorkerRelayStateDisconnected() {
func (conn *Conn) onRelayDisconnected() {
conn.mu.Lock()
defer conn.mu.Unlock()

Expand Down
15 changes: 5 additions & 10 deletions client/internal/peer/worker_ice.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,15 @@ type ICEConnInfo struct {
RelayedOnLocal bool
}

type WorkerICECallbacks struct {
OnConnReady func(ConnPriority, ICEConnInfo)
OnStatusChanged func(ConnStatus)
}

type WorkerICE struct {
ctx context.Context
log *log.Entry
config ConnConfig
conn *Conn
signaler *Signaler
iFaceDiscover stdnet.ExternalIFaceDiscover
statusRecorder *Status
hasRelayOnLocally bool
conn WorkerICECallbacks

agent *ice.Agent
muxAgent sync.Mutex
Expand All @@ -60,16 +55,16 @@ type WorkerICE struct {
lastKnownState ice.ConnectionState
}

func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, hasRelayOnLocally bool, callBacks WorkerICECallbacks) (*WorkerICE, error) {
func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, conn *Conn, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, hasRelayOnLocally bool) (*WorkerICE, error) {
w := &WorkerICE{
ctx: ctx,
log: log,
config: config,
conn: conn,
signaler: signaler,
iFaceDiscover: ifaceDiscover,
statusRecorder: statusRecorder,
hasRelayOnLocally: hasRelayOnLocally,
conn: callBacks,
}

localUfrag, localPwd, err := icemaker.GenerateICECredentials()
Expand Down Expand Up @@ -155,7 +150,7 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
RelayedOnLocal: isRelayCandidate(pair.Local),
}
w.log.Debugf("on ICE conn read to use ready")
go w.conn.OnConnReady(selectedPriority(pair), ci)
go w.conn.onICEConnectionIsReady(selectedPriority(pair), ci)
}

// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer.
Expand Down Expand Up @@ -220,7 +215,7 @@ func (w *WorkerICE) reCreateAgent(agentCancel context.CancelFunc, candidates []i
case ice.ConnectionStateFailed, ice.ConnectionStateDisconnected:
if w.lastKnownState != ice.ConnectionStateDisconnected {
w.lastKnownState = ice.ConnectionStateDisconnected
w.conn.OnStatusChanged(StatusDisconnected)
w.conn.onICEStateDisconnected()
}
w.closeAgent(agentCancel)
default:
Expand Down
25 changes: 10 additions & 15 deletions client/internal/peer/worker_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,12 @@ type RelayConnInfo struct {
rosenpassAddr string
}

type WorkerRelayCallbacks struct {
OnConnReady func(RelayConnInfo)
OnDisconnected func()
}

type WorkerRelay struct {
log *log.Entry
isController bool
config ConnConfig
conn *Conn
relayManager relayClient.ManagerService
callBacks WorkerRelayCallbacks

relayedConn net.Conn
relayLock sync.Mutex
Expand All @@ -38,13 +33,13 @@ type WorkerRelay struct {
wgWatcher *WGWatcher
}

func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, relayManager relayClient.ManagerService, callbacks WorkerRelayCallbacks) *WorkerRelay {
func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, conn *Conn, relayManager relayClient.ManagerService) *WorkerRelay {
r := &WorkerRelay{
log: log,
isController: ctrl,
config: config,
conn: conn,
relayManager: relayManager,
callBacks: callbacks,
wgWatcher: NewWGWatcher(log, config.WgConfig.WgInterface, config.Key),
}
return r
Expand Down Expand Up @@ -81,23 +76,23 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
w.relayedConn = relayedConn
w.relayLock.Unlock()

err = w.relayManager.AddCloseListener(srv, w.onRelayMGDisconnected)
err = w.relayManager.AddCloseListener(srv, w.onRelayClientDisconnected)
if err != nil {
log.Errorf("failed to add close listener: %s", err)
_ = relayedConn.Close()
return
}

w.log.Debugf("peer conn opened via Relay: %s", srv)
go w.callBacks.OnConnReady(RelayConnInfo{
go w.conn.onRelayConnectionIsReady(RelayConnInfo{
relayedConn: relayedConn,
rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey,
rosenpassAddr: remoteOfferAnswer.RosenpassAddr,
})
}

func (w *WorkerRelay) EnableWgWatcher(ctx context.Context) {
w.wgWatcher.EnableWgWatcher(ctx, w.disconnected)
w.wgWatcher.EnableWgWatcher(ctx, w.onWGDisconnected)
}

func (w *WorkerRelay) DisableWgWatcher() {
Expand Down Expand Up @@ -128,12 +123,12 @@ func (w *WorkerRelay) CloseConn() {
}
}

func (w *WorkerRelay) disconnected() {
func (w *WorkerRelay) onWGDisconnected() {
w.relayLock.Lock()
_ = w.relayedConn.Close()
w.relayLock.Unlock()

w.callBacks.OnDisconnected()
w.conn.onRelayDisconnected()
}

func (w *WorkerRelay) isRelaySupported(answer *OfferAnswer) bool {
Expand All @@ -150,7 +145,7 @@ func (w *WorkerRelay) preferredRelayServer(myRelayAddress, remoteRelayAddress st
return remoteRelayAddress
}

func (w *WorkerRelay) onRelayMGDisconnected() {
func (w *WorkerRelay) onRelayClientDisconnected() {
w.wgWatcher.DisableWgWatcher()
go w.callBacks.OnDisconnected()
go w.conn.onRelayDisconnected()
}

0 comments on commit 2067836

Please sign in to comment.