Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various fixes #64

Merged
merged 13 commits into from
Dec 11, 2024
5 changes: 4 additions & 1 deletion face/internal-transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ func (t *InternalTransport) sendFrame(frame []byte) {
t.nOutBytes += uint64(len(frame))

core.LogDebug(t, "Sending frame of size ", len(frame))
t.recvQueue <- frame

frameCopy := make([]byte, len(frame))
copy(frameCopy, frame)
t.recvQueue <- frameCopy
}

func (t *InternalTransport) runReceive() {
Expand Down
19 changes: 12 additions & 7 deletions face/ndnlp-link-service.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func sendPacket(l *NDNLPLinkService, netPacket *ndn_defn.PendingPacket) {
nFragments := int((len(wire) + effectiveMtu - 1) / effectiveMtu)
lastFragSize := len(wire) - effectiveMtu*(nFragments-1)
fragments = make([]*spec.LpPacket, nFragments)
reader := enc.NewWireReader(enc.Wire{wire})
reader := enc.NewBufferReader(wire)
for i := 0; i < nFragments; i++ {
if i < nFragments-1 {
frag, err := reader.ReadWire(effectiveMtu)
Expand Down Expand Up @@ -291,11 +291,8 @@ func (l *NDNLPLinkService) handleIncomingFrame(rawFrame []byte) {
// We have to copy so receive transport buffer can be reused
wire := make([]byte, len(rawFrame))
copy(wire, rawFrame)
l.processIncomingFrame(wire)
}

func (l *NDNLPLinkService) processIncomingFrame(wire []byte) {
// all incoming frames come through a link service
// All incoming frames come through a link service
// Attempt to decode buffer into LpPacket
netPacket := &ndn_defn.PendingPacket{
IncomingFaceID: utils.IdPtr(l.faceID),
Expand Down Expand Up @@ -367,11 +364,19 @@ func (l *NDNLPLinkService) processIncomingFrame(wire []byte) {
if len(packet.LpPacket.PitToken) > 0 {
netPacket.PitToken = packet.LpPacket.PitToken
}
packet, _, e = spec.ReadPacket(enc.NewWireReader(fragment))

// Copy fragment to wire buffer
wire = wire[:0]
for _, b := range fragment {
wire = append(wire, b...)
}

// Parse inner packet in place
packet, _, e = spec.ReadPacket(enc.NewBufferReader(wire))
if e != nil {
return
}
netPacket.RawBytes = fragment.Join()
netPacket.RawBytes = wire
netPacket.EncPacket = packet
}
// Counters
Expand Down
41 changes: 23 additions & 18 deletions fw/bestroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (
enc "github.com/zjkmxy/go-ndn/pkg/encoding"
)

// BestRoute is a forwarding strategy that forwards Interests to the nexthop with the lowest cost.
// BestRoute is a forwarding strategy that forwards Interests
// to the nexthop with the lowest cost.
type BestRoute struct {
StrategyBase
}
Expand All @@ -27,46 +28,50 @@ func init() {
StrategyVersions["best-route"] = []uint64{1}
}

// Instantiate creates a new instance of the BestRoute strategy.
func (s *BestRoute) Instantiate(fwThread *Thread) {
s.NewStrategyBase(fwThread, enc.Component{
Typ: enc.TypeGenericNameComponent, Val: []byte("best-route"),
}, 1, "BestRoute")
}

// AfterContentStoreHit ...
func (s *BestRoute) AfterContentStoreHit(pendingPacket *ndn_defn.PendingPacket, pitEntry table.PitEntry, inFace uint64) {
// Send downstream
core.LogTrace(s, "AfterContentStoreHit: Forwarding content store hit Data=", pendingPacket.NameCache,
" to FaceID=", inFace)
s.SendData(pendingPacket, pitEntry, inFace, 0) // 0 indicates ContentStore is source
func (s *BestRoute) AfterContentStoreHit(
packet *ndn_defn.PendingPacket,
pitEntry table.PitEntry,
inFace uint64,
) {
core.LogTrace(s, "AfterContentStoreHit: Forwarding content store hit Data=", packet.NameCache, " to FaceID=", inFace)
s.SendData(packet, pitEntry, inFace, 0) // 0 indicates ContentStore is source
}

// AfterReceiveData ...
func (s *BestRoute) AfterReceiveData(pendingPacket *ndn_defn.PendingPacket, pitEntry table.PitEntry, inFace uint64) {
func (s *BestRoute) AfterReceiveData(
packet *ndn_defn.PendingPacket,
pitEntry table.PitEntry,
inFace uint64,
) {
core.LogTrace(s, "AfterReceiveData: Data=", ", ", len(pitEntry.InRecords()), " In-Records")
for faceID := range pitEntry.InRecords() {
core.LogTrace(s, "AfterReceiveData: Forwarding Data=", pendingPacket.NameCache, " to FaceID=", faceID)
s.SendData(pendingPacket, pitEntry, faceID, inFace)
core.LogTrace(s, "AfterReceiveData: Forwarding Data=", packet.NameCache, " to FaceID=", faceID)
s.SendData(packet, pitEntry, faceID, inFace)
}
}

// AfterReceiveInterest ...
func (s *BestRoute) AfterReceiveInterest(
pendingPacket *ndn_defn.PendingPacket, pitEntry table.PitEntry, inFace uint64, nexthops []*table.FibNextHopEntry,
packet *ndn_defn.PendingPacket,
pitEntry table.PitEntry,
inFace uint64,
nexthops []*table.FibNextHopEntry,
) {
sort.Slice(nexthops, func(i, j int) bool { return nexthops[i].Cost < nexthops[j].Cost })
for _, nh := range nexthops {
core.LogTrace(s, "AfterReceiveInterest: Forwarding Interest=", pendingPacket.NameCache, " to FaceID=", nh.Nexthop)
if sent := s.SendInterest(pendingPacket, pitEntry, nh.Nexthop, inFace); sent {
core.LogTrace(s, "AfterReceiveInterest: Forwarding Interest=", packet.NameCache, " to FaceID=", nh.Nexthop)
if sent := s.SendInterest(packet, pitEntry, nh.Nexthop, inFace); sent {
return
}
}

core.LogDebug(s, "AfterReceiveInterest: No usable nexthop for Interest=", pendingPacket.NameCache, " - DROP")
core.LogDebug(s, "AfterReceiveInterest: No usable nexthop for Interest=", packet.NameCache, " - DROP")
}

// BeforeSatisfyInterest ...
func (s *BestRoute) BeforeSatisfyInterest(pitEntry table.PitEntry, inFace uint64) {
// This does nothing in BestRoute
}
43 changes: 22 additions & 21 deletions fw/multicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,49 +26,50 @@ func init() {
StrategyVersions["multicast"] = []uint64{1}
}

// Instantiate creates a new instance of the Multicast strategy.
func (s *Multicast) Instantiate(fwThread *Thread) {
s.NewStrategyBase(fwThread, enc.Component{
Typ: enc.TypeGenericNameComponent, Val: []byte("multicast"),
}, 1, "Multicast")
}

// AfterContentStoreHit ...
func (s *Multicast) AfterContentStoreHit(pendingPacket *ndn_defn.PendingPacket, pitEntry table.PitEntry, inFace uint64) {
// Send downstream
core.LogTrace(s, "AfterContentStoreHit: Forwarding content store hit Data=", pendingPacket.NameCache,
" to FaceID=", inFace)
s.SendData(pendingPacket, pitEntry, inFace, 0) // 0 indicates ContentStore is source
func (s *Multicast) AfterContentStoreHit(
packet *ndn_defn.PendingPacket,
pitEntry table.PitEntry,
inFace uint64,
) {
core.LogTrace(s, "AfterContentStoreHit: Forwarding content store hit Data=", packet.NameCache, " to FaceID=", inFace)
s.SendData(packet, pitEntry, inFace, 0) // 0 indicates ContentStore is source
}

// AfterReceiveData ...
func (s *Multicast) AfterReceiveData(pendingPacket *ndn_defn.PendingPacket, pitEntry table.PitEntry, inFace uint64) {
core.LogTrace(s, "AfterReceiveData: Data=", pendingPacket.EncPacket.Data.NameV, ", ",
len(pitEntry.InRecords()), " In-Records")
func (s *Multicast) AfterReceiveData(
packet *ndn_defn.PendingPacket,
pitEntry table.PitEntry,
inFace uint64,
) {
core.LogTrace(s, "AfterReceiveData: Data=", packet.NameCache, ", ", len(pitEntry.InRecords()), " In-Records")
for faceID := range pitEntry.InRecords() {
core.LogTrace(s, "AfterReceiveData: Forwarding Data=", pendingPacket.EncPacket.Data.NameV, " to FaceID=", faceID)
s.SendData(pendingPacket, pitEntry, faceID, inFace)
core.LogTrace(s, "AfterReceiveData: Forwarding Data=", packet.NameCache, " to FaceID=", faceID)
s.SendData(packet, pitEntry, faceID, inFace)
}
}

// AfterReceiveInterest ...
func (s *Multicast) AfterReceiveInterest(
pendingPacket *ndn_defn.PendingPacket, pitEntry table.PitEntry, inFace uint64, nexthops []*table.FibNextHopEntry,
packet *ndn_defn.PendingPacket,
pitEntry table.PitEntry,
inFace uint64,
nexthops []*table.FibNextHopEntry,
) {
if len(nexthops) == 0 {
core.LogDebug(s, "AfterReceiveInterest: No nexthop for Interest=", pendingPacket.NameCache,
" - DROP")
core.LogDebug(s, "AfterReceiveInterest: No nexthop for Interest=", packet.NameCache, " - DROP")
return
}

for _, nexthop := range nexthops {
core.LogTrace(s, "AfterReceiveInterest: Forwarding Interest=", pendingPacket.NameCache,
" to FaceID=", nexthop.Nexthop)
s.SendInterest(pendingPacket, pitEntry, nexthop.Nexthop, inFace)
core.LogTrace(s, "AfterReceiveInterest: Forwarding Interest=", packet.NameCache, " to FaceID=", nexthop.Nexthop)
s.SendInterest(packet, pitEntry, nexthop.Nexthop, inFace)
}
}

// BeforeSatisfyInterest ...
func (s *Multicast) BeforeSatisfyInterest(pitEntry table.PitEntry, inFace uint64) {
// This does nothing in Multicast
}
41 changes: 30 additions & 11 deletions fw/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ package fw
import (
"strconv"

"github.com/named-data/YaNFD/core"
"github.com/named-data/YaNFD/ndn_defn"
"github.com/named-data/YaNFD/table"
enc "github.com/zjkmxy/go-ndn/pkg/encoding"
Expand All @@ -25,11 +24,22 @@ type Strategy interface {
String() string
GetName() enc.Name

AfterContentStoreHit(pendingPacket *ndn_defn.PendingPacket, pitEntry table.PitEntry, inFace uint64)
AfterReceiveData(pendingPacket *ndn_defn.PendingPacket, pitEntry table.PitEntry, inFace uint64)
AfterContentStoreHit(
packet *ndn_defn.PendingPacket,
pitEntry table.PitEntry,
inFace uint64)
AfterReceiveData(
packet *ndn_defn.PendingPacket,
pitEntry table.PitEntry,
inFace uint64)
AfterReceiveInterest(
pendingPacket *ndn_defn.PendingPacket, pitEntry table.PitEntry, inFace uint64, nexthops []*table.FibNextHopEntry)
BeforeSatisfyInterest(pitEntry table.PitEntry, inFace uint64)
packet *ndn_defn.PendingPacket,
pitEntry table.PitEntry,
inFace uint64,
nexthops []*table.FibNextHopEntry)
BeforeSatisfyInterest(
pitEntry table.PitEntry,
inFace uint64)
}

// StrategyBase provides common helper methods for YaNFD forwarding strategies.
Expand All @@ -44,14 +54,17 @@ type StrategyBase struct {

// NewStrategyBase is a helper that allows specific strategies to initialize the base.
func (s *StrategyBase) NewStrategyBase(
fwThread *Thread, strategyName enc.Component, version uint64, strategyLogName string,
fwThread *Thread,
strategyName enc.Component,
version uint64,
strategyLogName string,
) {
var err error
s.thread = fwThread
s.threadID = s.thread.threadID
s.name, err = enc.NameFromStr(StrategyPrefix)
if err != nil {
core.LogFatal(s, "StrategyPrefix is a bad NDN Name")
panic("StrategyPrefix is a bad NDN Name")
}
s.strategyName = strategyName
s.name = append(s.name, strategyName, enc.NewVersionComponent(version))
Expand All @@ -70,19 +83,25 @@ func (s *StrategyBase) GetName() enc.Name {

// SendInterest sends an Interest on the specified face.
func (s *StrategyBase) SendInterest(
pendingPacket *ndn_defn.PendingPacket, pitEntry table.PitEntry, nexthop uint64, inFace uint64,
packet *ndn_defn.PendingPacket,
pitEntry table.PitEntry,
nexthop uint64,
inFace uint64,
) bool {
return s.thread.processOutgoingInterest(pendingPacket, pitEntry, nexthop, inFace)
return s.thread.processOutgoingInterest(packet, pitEntry, nexthop, inFace)
}

// SendData sends a Data packet on the specified face.
func (s *StrategyBase) SendData(
pendingPacket *ndn_defn.PendingPacket, pitEntry table.PitEntry, nexthop uint64, inFace uint64,
packet *ndn_defn.PendingPacket,
pitEntry table.PitEntry,
nexthop uint64,
inFace uint64,
) {
var pitToken []byte
if inRecord, ok := pitEntry.InRecords()[nexthop]; ok {
pitToken = inRecord.PitToken
delete(pitEntry.InRecords(), nexthop)
}
s.thread.processOutgoingData(pendingPacket, nexthop, pitToken, inFace)
s.thread.processOutgoingData(packet, nexthop, pitToken, inFace)
}
Loading
Loading