Skip to content

Commit

Permalink
face: fix multicast udp transport
Browse files Browse the repository at this point in the history
  • Loading branch information
pulsejet committed Dec 14, 2024
1 parent c91acbc commit 541bb50
Showing 1 changed file with 78 additions and 57 deletions.
135 changes: 78 additions & 57 deletions face/multicast-udp-transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ package face

import (
"errors"
"fmt"
"net"
"strconv"
"strings"

"github.com/named-data/YaNFD/core"
defn "github.com/named-data/YaNFD/defn"
Expand All @@ -35,24 +36,21 @@ func MakeMulticastUDPTransport(localURI *defn.URI) (*MulticastUDPTransport, erro
return nil, core.ErrNotCanonical
}

t := new(MulticastUDPTransport)
// Get local interface
localIf, err := InterfaceByIP(net.ParseIP(localURI.PathHost()))
if err != nil || localIf == nil {
core.LogError(t, "Unable to get interface for local URI ", localURI, ": ", err)
}

// Get remote Uri
var remote string
if localURI.Scheme() == "udp4" {
t.makeTransportBase(
defn.DecodeURIString("udp4://"+udp4MulticastAddress+":"+strconv.FormatUint(uint64(UDPMulticastPort), 10)),
localURI, PersistencyPermanent, defn.NonLocal, defn.MultiAccess, defn.MaxNDNPacketSize)
remote = fmt.Sprintf("udp4://%s:%d", udp4MulticastAddress, UDPMulticastPort)
} else if localURI.Scheme() == "udp6" {
t.makeTransportBase(
defn.DecodeURIString("udp6://["+udp6MulticastAddress+"%"+localIf.Name+"]:"+
strconv.FormatUint(uint64(UDPMulticastPort), 10)),
localURI, PersistencyPermanent, defn.NonLocal, defn.MultiAccess, defn.MaxNDNPacketSize)
remote = fmt.Sprintf("udp6://[%s]:%d", udp6MulticastAddress, UDPMulticastPort)
}
t.scope = defn.NonLocal

// Create transport
t := &MulticastUDPTransport{}
t.makeTransportBase(
defn.DecodeURIString(remote),
localURI, PersistencyPermanent,
defn.NonLocal, defn.MultiAccess,
defn.MaxNDNPacketSize)

// Format group and local addresses
t.groupAddr.IP = net.ParseIP(t.remoteURI.PathHost())
Expand All @@ -64,32 +62,51 @@ func MakeMulticastUDPTransport(localURI *defn.URI) (*MulticastUDPTransport, erro

// Configure dialer so we can allow address reuse
t.dialer = &net.Dialer{LocalAddr: &t.localAddr, Control: impl.SyscallReuseAddr}
t.running.Store(true)

// Create send connection
sendConn, err := t.dialer.Dial(t.remoteURI.Scheme(), t.groupAddr.String())
err := t.connectSend()
if err != nil {
return nil, errors.New("Unable to create send connection to group address: " + err.Error())
t.Close()
return nil, err
}

t.sendConn = sendConn.(*net.UDPConn)
t.running.Store(true)

// Create receive connection
t.recvConn, err = net.ListenMulticastUDP(t.remoteURI.Scheme(), localIf, &t.groupAddr)
err = t.connectRecv()
if err != nil {
return nil, errors.New("Unable to create receive connection for group address on " +
localIf.Name + ": " + err.Error())
t.Close()
return nil, err
}

return t, nil
}

func (t *MulticastUDPTransport) connectSend() error {
sendConn, err := t.dialer.Dial(t.remoteURI.Scheme(), t.groupAddr.String())
if err != nil {
return errors.New("Unable to create send connection to group address: " + err.Error())
}
t.sendConn = sendConn.(*net.UDPConn)
return nil
}

func (t *MulticastUDPTransport) connectRecv() error {
localIf, err := InterfaceByIP(net.ParseIP(t.localURI.PathHost()))
if err != nil || localIf == nil {
return fmt.Errorf("unable to get interface for local URI %s: %s", t.localURI, err.Error())
}

t.recvConn, err = net.ListenMulticastUDP(t.remoteURI.Scheme(), localIf, &t.groupAddr)
if err != nil {
return fmt.Errorf("unable to create receive conn for group %s: %s", localIf.Name, err.Error())
}
return nil
}

func (t *MulticastUDPTransport) String() string {
return "MulticastUDPTransport, FaceID=" + strconv.FormatUint(t.faceID, 10) +
", RemoteURI=" + t.remoteURI.String() + ", LocalURI=" + t.localURI.String()
return fmt.Sprintf("MulticastUDPTransport, FaceID=%d, RemoteURI=%s, LocalURI=%s", t.faceID, t.remoteURI, t.localURI)
}

// SetPersistency changes the persistency of the face.
func (t *MulticastUDPTransport) SetPersistency(persistency Persistency) bool {
if persistency == t.persistency {
return true
Expand All @@ -103,7 +120,6 @@ func (t *MulticastUDPTransport) SetPersistency(persistency Persistency) bool {
return false
}

// GetSendQueueSize returns the current size of the send queue.
func (t *MulticastUDPTransport) GetSendQueueSize() uint64 {
rawConn, err := t.recvConn.SyscallConn()
if err != nil {
Expand All @@ -113,59 +129,64 @@ func (t *MulticastUDPTransport) GetSendQueueSize() uint64 {
}

func (t *MulticastUDPTransport) sendFrame(frame []byte) {
if !t.running.Load() {
return
}

if len(frame) > t.MTU() {
core.LogWarn(t, "Attempted to send frame larger than MTU - DROP")
return
}

core.LogDebug(t, "Sending frame of size ", len(frame))
_, err := t.sendConn.Write(frame)
if err != nil {
core.LogWarn(t, "Unable to send on socket - DROP")
t.sendConn.Close()
sendConn, err := t.dialer.Dial(t.remoteURI.Scheme(), t.groupAddr.String())
if err != nil {
core.LogError(t, "Unable to create send connection to group address: ", err)

// Re-create the socket if connection is still running
if t.running.Load() {
err = t.connectSend()
if err != nil {
core.LogError(t, "Unable to re-create send connection: ", err)
return
}
}
t.sendConn = sendConn.(*net.UDPConn)
}

t.nOutBytes += uint64(len(frame))
}

func (t *MulticastUDPTransport) runReceive() {
recvBuf := make([]byte, defn.MaxNDNPacketSize)
defer t.Close()

for {
readSize, remoteAddr, err := t.recvConn.ReadFromUDP(recvBuf)
err := readTlvStream(t.recvConn, func(b []byte) {
t.nInBytes += uint64(len(b))
t.linkService.handleIncomingFrame(b)
}, func(err error) bool {
// Same as unicast UDP transport
return strings.Contains(err.Error(), "connection refused")
})
if err != nil {
// Re-create the socket
localIf, err := InterfaceByIP(net.ParseIP(t.localURI.PathHost()))
if err != nil || localIf == nil {
core.LogError(t, "Unable to get interface for local URI ", t.localURI, ": ", err)
core.LogWarn(t, "Unable to read from socket (", err, ") - Face DOWN")

// Re-create the socket if connection is still running
if t.running.Load() {
err = t.connectRecv()
if err != nil {
core.LogError(t, "Unable to re-create receive connection: ", err)
return
}
}
t.recvConn, _ = net.ListenMulticastUDP(t.remoteURI.Scheme(), localIf, &t.groupAddr)

}

core.LogTrace(t, "Receive of size ", readSize, " from ", remoteAddr)
t.nInBytes += uint64(readSize)

if readSize > defn.MaxNDNPacketSize {
core.LogWarn(t, "Received too much data without valid TLV block - DROP")
}
if readSize <= 0 {
core.LogInfo(t, "Socket close.")
continue
}

// Packet was successfully received, send up to link service
t.linkService.handleIncomingFrame(recvBuf[:readSize])
}
}

func (t *MulticastUDPTransport) Close() {
if t.running.Swap(false) {
if t.sendConn != nil && t.recvConn != nil {
if t.sendConn != nil {
t.sendConn.Close()
}
if t.recvConn != nil {
t.recvConn.Close()
}
}
Expand Down

0 comments on commit 541bb50

Please sign in to comment.