Skip to content

Commit

Permalink
refactor: link service names
Browse files Browse the repository at this point in the history
  • Loading branch information
pulsejet committed Dec 10, 2024
1 parent bac8924 commit 7cac9d7
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 84 deletions.
84 changes: 49 additions & 35 deletions face/link-service.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,43 +225,57 @@ func (l *linkServiceBase) SendPacket(packet *defn.Pkt) {
}
}

func (l *linkServiceBase) dispatchIncomingPacket(netPacket *defn.Pkt) {
// Hand off to network layer by dispatching to appropriate forwarding thread(s)
switch {
case netPacket.L3.Interest != nil:
netPacket.Name = netPacket.L3.Interest.NameV
thread := fw.HashNameToFwThread(netPacket.L3.Interest.NameV)
core.LogTrace(l, "Dispatched Interest to thread ", thread)
dispatch.GetFWThread(thread).QueueInterest(netPacket)
case netPacket.L3.Data != nil:
netPacket.Name = netPacket.L3.Data.NameV
if len(netPacket.PitToken) == 6 {
// Decode PitToken. If it's for us, it's a uint16 + uint32.
pitTokenThread := binary.BigEndian.Uint16(netPacket.PitToken)
fwThread := dispatch.GetFWThread(int(pitTokenThread))
if fwThread == nil { // invalid PIT token present
core.LogError(l, "Invalid PIT token attached to Data packet - DROP")
break
}

core.LogTrace(l, "Dispatched Data to thread ", pitTokenThread)
fwThread.QueueData(netPacket)
} else if l.Scope() == defn.Local {
// Only if from a local face (and therefore from a producer), dispatch to threads matching every prefix.
// We need to do this because producers do not attach PIT tokens to their data packets.
for _, thread := range fw.HashNameToAllPrefixFwThreads(netPacket.L3.Data.NameV) {
core.LogTrace(l, "Prefix dispatched local-origin Data packet to thread ", thread)
dispatch.GetFWThread(thread).QueueData(netPacket)
}
} else {
// Only exact-match for now (no CanBePrefix)
thread := fw.HashNameToFwThread(netPacket.L3.Data.NameV)
core.LogTrace(l, "Dispatched Data to thread ", thread)
dispatch.GetFWThread(thread).QueueData(netPacket)
// dispatchInterest hands an incoming Interest off to the forwarding thread
// that owns its name, chosen by hashing the Interest name.
// Panics if the packet is not an Interest (programmer error).
func (l *linkServiceBase) dispatchInterest(pkt *defn.Pkt) {
	interest := pkt.L3.Interest
	if interest == nil {
		panic("dispatchInterest called with packet that is not Interest")
	}

	// Cache the name on the packet so downstream code can access it cheaply.
	pkt.Name = interest.NameV

	// The name hash selects the owning forwarding thread.
	target := fw.HashNameToFwThread(pkt.Name)
	core.LogTrace(l, "Dispatched Interest to thread ", target)
	dispatch.GetFWThread(target).QueueInterest(pkt)
}

func (l *linkServiceBase) dispatchData(pkt *defn.Pkt) {
if pkt.L3.Data == nil {
panic("dispatchData called with packet that is not Data")
}

// Store name for easy access
pkt.Name = pkt.L3.Data.NameV

// Decode PitToken. If it's for us, it's a uint16 + uint32.
if len(pkt.PitToken) == 6 {
thread := binary.BigEndian.Uint16(pkt.PitToken)
fwThread := dispatch.GetFWThread(int(thread))
if fwThread == nil {
core.LogError(l, "Invalid PIT token attached to Data packet - DROP")
return
}
default:
core.LogError(l, "Cannot dispatch packet of unknown type ")

core.LogTrace(l, "Dispatched Data to thread ", thread)
fwThread.QueueData(pkt)
return
}

// Only if from a local face (and therefore from a producer), dispatch to
// threads matching every prefix. We need to do this because producers do
// not attach PIT tokens to their data packets.
if l.Scope() == defn.Local {
for _, thread := range fw.HashNameToAllPrefixFwThreads(pkt.Name) {
core.LogTrace(l, "Prefix dispatched local-origin Data packet to thread ", thread)
dispatch.GetFWThread(thread).QueueData(pkt)
}
return
}

// Only exact-match for now (no CanBePrefix)
thread := fw.HashNameToFwThread(pkt.Name)
core.LogTrace(l, "Dispatched Data to thread ", thread)
dispatch.GetFWThread(thread).QueueData(pkt)
}

func (l *linkServiceBase) Close() {
Expand Down
109 changes: 60 additions & 49 deletions face/ndnlp-link-service.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,28 +143,28 @@ func (l *NDNLPLinkService) Run(optNewFrame []byte) {
<-l.hasImplQuit
l.HasQuit <- true
}
func sendPacket(l *NDNLPLinkService, netPacket *defn.Pkt) {
wire := netPacket.Raw
func sendPacket(l *NDNLPLinkService, pkt *defn.Pkt) {
wire := pkt.Raw

if l.transport.State() != defn.Up {
core.LogWarn(l, "Attempted to send frame on down face - DROP and stop LinkService")
l.hasImplQuit <- true
return
}
// Counters
if netPacket.L3.Interest != nil {
if pkt.L3.Interest != nil {
l.nOutInterests++
} else if netPacket.L3.Data != nil {
} else if pkt.L3.Data != nil {
l.nOutData++
}

now := time.Now()

effectiveMtu := l.transport.MTU() - l.headerOverhead
if netPacket.PitToken != nil {
if pkt.PitToken != nil {
effectiveMtu -= pitTokenOverhead
}
if netPacket.CongestionMark != nil {
if pkt.CongestionMark != nil {
effectiveMtu -= congestionMarkOverhead
}

Expand Down Expand Up @@ -234,18 +234,18 @@ func sendPacket(l *NDNLPLinkService, netPacket *defn.Pkt) {
}

// PIT tokens
if len(netPacket.PitToken) > 0 {
fragments[0].PitToken = netPacket.PitToken
if len(pkt.PitToken) > 0 {
fragments[0].PitToken = pkt.PitToken
}

// Incoming face indication
if l.options.IsIncomingFaceIndicationEnabled && netPacket.IncomingFaceID != nil {
fragments[0].IncomingFaceId = netPacket.IncomingFaceID
if l.options.IsIncomingFaceIndicationEnabled && pkt.IncomingFaceID != nil {
fragments[0].IncomingFaceId = pkt.IncomingFaceID
}

// Congestion marking
if netPacket.CongestionMark != nil {
fragments[0].CongestionMark = netPacket.CongestionMark
if pkt.CongestionMark != nil {
fragments[0].CongestionMark = pkt.CongestionMark
}

// Send fragment(s)
Expand Down Expand Up @@ -278,36 +278,40 @@ func (l *NDNLPLinkService) runSend() {

for {
select {
case netPacket := <-l.sendQueue:
sendPacket(l, netPacket)
case pkt := <-l.sendQueue:
sendPacket(l, pkt)
case <-l.hasTransportQuit:
l.hasImplQuit <- true
return
}
}
}

func (l *NDNLPLinkService) handleIncomingFrame(rawFrame []byte) {
func (l *NDNLPLinkService) handleIncomingFrame(frame []byte) {
// We have to copy so receive transport buffer can be reused
wire := make([]byte, len(rawFrame))
copy(wire, rawFrame)
wire := make([]byte, len(frame))
copy(wire, frame)

// All incoming frames come through a link service
// Attempt to decode buffer into LpPacket
netPacket := &defn.Pkt{
pkt := &defn.Pkt{
IncomingFaceID: utils.IdPtr(l.faceID),
}
packet, _, e := spec.ReadPacket(enc.NewBufferReader(wire))
if e != nil {
core.LogError(l, e)

L2, _, err := spec.ReadPacket(enc.NewBufferReader(wire))
if err != nil {
core.LogError(l, err)
return
}
if packet.LpPacket == nil {

if L2.LpPacket == nil {
// Bare Data or Interest packet
netPacket.Raw = wire
netPacket.L3 = packet
pkt.Raw = wire
pkt.L3 = L2
} else {
fragment := packet.LpPacket.Fragment
// NDNLPv2 frame
LP := L2.LpPacket
fragment := LP.Fragment

// If there is no fragment, then IDLE packet, drop.
if len(fragment) == 0 {
Expand All @@ -317,52 +321,52 @@ func (l *NDNLPLinkService) handleIncomingFrame(rawFrame []byte) {

// Reassembly
if l.options.IsReassemblyEnabled {
if packet.LpPacket.Sequence == nil {
if LP.Sequence == nil {
core.LogInfo(l, "Received NDNLPv2 frame without Sequence but reassembly requires it - DROP")
return
}

fragIndex := uint64(0)
if packet.LpPacket.FragIndex != nil {
fragIndex = *packet.LpPacket.FragIndex
if LP.FragIndex != nil {
fragIndex = *LP.FragIndex
}
fragCount := uint64(1)
if packet.LpPacket.FragCount != nil {
fragCount = *packet.LpPacket.FragCount
if LP.FragCount != nil {
fragCount = *LP.FragCount
}
baseSequence := *packet.LpPacket.Sequence - fragIndex
baseSequence := *LP.Sequence - fragIndex

core.LogTrace(l, "Received fragment ", fragIndex, " of ", fragCount, " for ", baseSequence)
if fragIndex == 0 && fragCount == 1 {
// Bypass reassembly since only one fragment
} else {
fragment = l.reassemblePacket(packet.LpPacket, baseSequence, fragIndex, fragCount)
fragment = l.reassemblePacket(LP, baseSequence, fragIndex, fragCount)
if fragment == nil {
// Nothing more to be done, so return
return
}
}
} else if packet.LpPacket.FragCount != nil || packet.LpPacket.FragIndex != nil {
} else if LP.FragCount != nil || LP.FragIndex != nil {
core.LogWarn(l, "Received NDNLPv2 frame containing fragmentation fields but reassembly disabled - DROP")
return
}

// Congestion mark
netPacket.CongestionMark = packet.LpPacket.CongestionMark
pkt.CongestionMark = LP.CongestionMark

// Consumer-controlled forwarding (NextHopFaceId)
if l.options.IsConsumerControlledForwardingEnabled && packet.LpPacket.NextHopFaceId != nil {
netPacket.NextHopFaceID = packet.LpPacket.NextHopFaceId
if l.options.IsConsumerControlledForwardingEnabled && LP.NextHopFaceId != nil {
pkt.NextHopFaceID = LP.NextHopFaceId
}

// Local cache policy
if l.options.IsLocalCachePolicyEnabled && packet.LpPacket.CachePolicy != nil {
netPacket.CachePolicy = &packet.LpPacket.CachePolicy.CachePolicyType
if l.options.IsLocalCachePolicyEnabled && LP.CachePolicy != nil {
pkt.CachePolicy = &LP.CachePolicy.CachePolicyType
}

// PIT Token
if len(packet.LpPacket.PitToken) > 0 {
netPacket.PitToken = packet.LpPacket.PitToken
if len(LP.PitToken) > 0 {
pkt.PitToken = LP.PitToken
}

// Copy fragment to wire buffer
Expand All @@ -372,24 +376,31 @@ func (l *NDNLPLinkService) handleIncomingFrame(rawFrame []byte) {
}

// Parse inner packet in place
packet, _, e = spec.ReadPacket(enc.NewBufferReader(wire))
if e != nil {
L3, _, err := spec.ReadPacket(enc.NewBufferReader(wire))
if err != nil {
return
}
netPacket.Raw = wire
netPacket.L3 = packet
pkt.Raw = wire
pkt.L3 = L3
}
// Counters
if netPacket.L3.Interest != nil {

// Dispatch and update counters
if pkt.L3.Interest != nil {
l.nInInterests++
} else if netPacket.L3.Data != nil {
l.dispatchInterest(pkt)
} else if pkt.L3.Data != nil {
l.nInData++
l.dispatchData(pkt)
} else {
core.LogError(l, "Attempted dispatch packet of unknown type")
}
l.dispatchIncomingPacket(netPacket)
}

func (l *NDNLPLinkService) reassemblePacket(
frame *spec.LpPacket, baseSequence uint64, fragIndex uint64, fragCount uint64,
frame *spec.LpPacket,
baseSequence uint64,
fragIndex uint64,
fragCount uint64,
) enc.Wire {
_, hasSequence := l.partialMessageStore[baseSequence]
if !hasSequence {
Expand Down

0 comments on commit 7cac9d7

Please sign in to comment.