Skip to content

Commit

Permalink
face: fix PitToken race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
pulsejet committed Dec 17, 2024
1 parent 895d1e2 commit 4b3e306
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 53 deletions.
8 changes: 7 additions & 1 deletion dispatch/face.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@ type Face interface {

State() defn.State

SendPacket(packet *defn.Pkt)
SendPacket(out OutPkt)
}

type OutPkt struct {
Pkt *defn.Pkt
PitToken []byte
InFace *uint64
}

// FaceDispatch is used to allow forwarding to interact with faces without a circular dependency issue.
Expand Down
10 changes: 5 additions & 5 deletions face/link-service.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type LinkService interface {
Run(initial []byte)

// Add a packet to the send queue for this link service
SendPacket(packet *defn.Pkt)
SendPacket(out dispatch.OutPkt)
// Synchronously handle an incoming frame and dispatch to fw
handleIncomingFrame(frame []byte)

Expand All @@ -63,7 +63,7 @@ type linkServiceBase struct {
faceID uint64
transport transport
stopped chan bool
sendQueue chan *defn.Pkt
sendQueue chan dispatch.OutPkt

// Counters
nInInterests uint64
Expand Down Expand Up @@ -93,7 +93,7 @@ func (l *linkServiceBase) SetFaceID(faceID uint64) {

func (l *linkServiceBase) makeLinkServiceBase() {
l.stopped = make(chan bool)
l.sendQueue = make(chan *defn.Pkt, faceQueueSize)
l.sendQueue = make(chan dispatch.OutPkt, faceQueueSize)
}

//
Expand Down Expand Up @@ -207,9 +207,9 @@ func (l *linkServiceBase) Close() {
//

// SendPacket adds a packet to the send queue for this link service
func (l *linkServiceBase) SendPacket(packet *defn.Pkt) {
func (l *linkServiceBase) SendPacket(out dispatch.OutPkt) {
select {
case l.sendQueue <- packet:
case l.sendQueue <- out:
// Packet queued successfully
core.LogTrace(l, "Queued packet for Link Service")
default:
Expand Down
35 changes: 19 additions & 16 deletions face/ndnlp-link-service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/named-data/YaNFD/core"
defn "github.com/named-data/YaNFD/defn"
"github.com/named-data/YaNFD/dispatch"
enc "github.com/zjkmxy/go-ndn/pkg/encoding"
spec "github.com/zjkmxy/go-ndn/pkg/ndn/spec_2022"
"github.com/zjkmxy/go-ndn/pkg/utils"
Expand Down Expand Up @@ -171,7 +172,8 @@ func (l *NDNLPLinkService) runSend() {
}
}

func sendPacket(l *NDNLPLinkService, pkt *defn.Pkt) {
func sendPacket(l *NDNLPLinkService, out dispatch.OutPkt) {
pkt := out.Pkt
wire := pkt.Raw

// Counters
Expand Down Expand Up @@ -239,14 +241,15 @@ func sendPacket(l *NDNLPLinkService, pkt *defn.Pkt) {
}

// Congestion marking
congestionMark := pkt.CongestionMark // from upstream
if congestionMarking {
// GetSendQueueSize is expensive, so only check every 1/2 of the threshold
// and only if we can mark congestion for this particular packet
if l.congestionCheck > l.options.DefaultCongestionThresholdBytes {
if now.After(l.lastTimeCongestionMarked.Add(l.options.BaseCongestionMarkingInterval)) &&
l.transport.GetSendQueueSize() > l.options.DefaultCongestionThresholdBytes {
core.LogWarn(l, "Marking congestion")
fragments[0].CongestionMark = utils.IdPtr[uint64](1)
congestionMark = utils.IdPtr[uint64](1) // ours
l.lastTimeCongestionMarked = now
}

Expand All @@ -256,23 +259,23 @@ func sendPacket(l *NDNLPLinkService, pkt *defn.Pkt) {
l.congestionCheck += uint64(len(wire)) // approx
}

// PIT tokens
if len(pkt.PitToken) > 0 {
fragments[0].PitToken = pkt.PitToken
}
// Send fragment(s)
for _, fragment := range fragments {
// PIT tokens
if len(out.PitToken) > 0 {
fragment.PitToken = out.PitToken
}

// Incoming face indication
if l.options.IsIncomingFaceIndicationEnabled && pkt.IncomingFaceID != nil {
fragments[0].IncomingFaceId = pkt.IncomingFaceID
}
// Incoming face indication
if l.options.IsIncomingFaceIndicationEnabled && out.InFace != nil {
fragment.IncomingFaceId = out.InFace
}

// Congestion marking
if pkt.CongestionMark != nil {
fragments[0].CongestionMark = pkt.CongestionMark
}
// Congestion marking
if congestionMark != nil {
fragment.CongestionMark = congestionMark
}

// Send fragment(s)
for _, fragment := range fragments {
pkt := &spec.Packet{
LpPacket: fragment,
}
Expand Down
60 changes: 29 additions & 31 deletions fw/thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,8 @@ func (t *Thread) processIncomingInterest(packet *defn.Pkt) {
*interest.HopLimitV -= 1
}

// Get PIT token (if any)
incomingPitToken := make([]byte, 0)
if len(packet.PitToken) > 0 {
incomingPitToken = make([]byte, len(packet.PitToken))
copy(incomingPitToken, packet.PitToken)
core.LogTrace(t, "OnIncomingInterest: ", packet.Name, ", FaceID=", incomingFace.FaceID(), ", Has PitToken")
} else {
core.LogTrace(t, "OnIncomingInterest: ", packet.Name, ", FaceID=", incomingFace.FaceID())
}
// Log PIT token (if any)
core.LogTrace(t, "OnIncomingInterest: ", packet.Name, ", FaceID=", incomingFace.FaceID(), ", PitTokenL=", len(packet.PitToken))

// Check if violates /localhost
if incomingFace.Scope() == defn.NonLocal &&
Expand Down Expand Up @@ -260,7 +253,7 @@ func (t *Thread) processIncomingInterest(packet *defn.Pkt) {

// Add in-record and determine if already pending
// this looks like custom interest again, but again can be changed without much issue?
_, isAlreadyPending := pitEntry.InsertInRecord(interest, incomingFace.FaceID(), incomingPitToken)
_, isAlreadyPending := pitEntry.InsertInRecord(interest, incomingFace.FaceID(), packet.PitToken)

if !isAlreadyPending {
core.LogTrace(t, "Interest ", packet.Name, " is not pending")
Expand Down Expand Up @@ -299,21 +292,25 @@ func (t *Thread) processIncomingInterest(packet *defn.Pkt) {
if packet.NextHopFaceID != nil {
if dispatch.GetFace(*packet.NextHopFaceID) != nil {
core.LogTrace(t, "NextHopFaceId is set for Interest ", packet.Name, " - dispatching directly to face")
dispatch.GetFace(*packet.NextHopFaceID).SendPacket(packet)
dispatch.GetFace(*packet.NextHopFaceID).SendPacket(dispatch.OutPkt{
Pkt: packet,
PitToken: packet.PitToken, // TODO: ??
InFace: packet.IncomingFaceID,
})
} else {
core.LogInfo(t, "Non-existent face specified in NextHopFaceId for Interest ", packet.Name, " - DROP")
}
return
}

// Pass to strategy AfterReceiveInterest pipeline
var nexthops []*table.FibNextHopEntry
if fhName == nil {
nexthops = table.FibStrategyTable.FindNextHopsEnc(interest.NameV)
} else {
nexthops = table.FibStrategyTable.FindNextHopsEnc(fhName)
// Use forwarding hint if present
lookupName := interest.NameV
if fhName != nil {
lookupName = fhName
}

// Pass to strategy AfterReceiveInterest pipeline
nexthops := table.FibStrategyTable.FindNextHopsEnc(lookupName)
strategy.AfterReceiveInterest(packet, pitEntry, incomingFace.FaceID(), nexthops)
}

Expand Down Expand Up @@ -353,16 +350,18 @@ func (t *Thread) processOutgoingInterest(

t.NOutInterests++

// Make new PIT token if needed
pitToken := make([]byte, 6)
binary.BigEndian.PutUint16(pitToken, uint16(t.threadID))
binary.BigEndian.PutUint32(pitToken[2:], pitEntry.Token())

// Send on outgoing face
packet.IncomingFaceID = utils.IdPtr(inFace)
outgoingFace.SendPacket(dispatch.OutPkt{
Pkt: packet,
PitToken: pitToken,
InFace: utils.IdPtr(inFace),
})

// Make new PIT token if needed
if len(packet.PitToken) != 6 {
packet.PitToken = make([]byte, 6)
}
binary.BigEndian.PutUint16(packet.PitToken, uint16(t.threadID))
binary.BigEndian.PutUint32(packet.PitToken[2:], pitEntry.Token())
outgoingFace.SendPacket(packet)
return true
}

Expand Down Expand Up @@ -517,10 +516,9 @@ func (t *Thread) processOutgoingData(
t.NSatisfiedInterests++

// Send on outgoing face
if len(packet.PitToken) != len(pitToken) {
packet.PitToken = make([]byte, len(pitToken))
}
copy(packet.PitToken, pitToken)
packet.IncomingFaceID = utils.IdPtr(uint64(inFace))
outgoingFace.SendPacket(packet)
outgoingFace.SendPacket(dispatch.OutPkt{
Pkt: packet,
PitToken: pitToken,
InFace: utils.IdPtr(inFace),
})
}

0 comments on commit 4b3e306

Please sign in to comment.