feat: congestion control for catchunks #132

Draft · wants to merge 7 commits into main
204 changes: 169 additions & 35 deletions std/object/client_consume_seg.go
@@ -1,13 +1,17 @@
package object

import (
"container/list"
"fmt"
"slices"
"sync"

enc "github.com/named-data/ndnd/std/encoding"
"github.com/named-data/ndnd/std/log"
"github.com/named-data/ndnd/std/ndn"
spec "github.com/named-data/ndnd/std/ndn/spec_2022"
cong "github.com/named-data/ndnd/std/object/congestion"
"github.com/named-data/ndnd/std/utils"
)

// round-robin based segment fetcher
@@ -25,16 +29,33 @@ type rrSegFetcher struct {
// number of outstanding interests
outstanding int
// window size
window int
window cong.CongestionWindow
// retransmission queue
retxQueue *list.List
// remaining segments to be transmitted, per consumer state
txCounter map[*ConsumeState]int
// maximum number of retries
maxRetries int
}

// retxEntry represents an entry in the retransmission queue
// it contains the consumer state, segment number and the number of retries left for the segment
type retxEntry struct {
state *ConsumeState
seg uint64
retries int
}

func newRrSegFetcher(client *Client) rrSegFetcher {
return rrSegFetcher{
mutex: sync.RWMutex{},
client: client,
streams: make([]*ConsumeState, 0),
window: 100,
window: cong.NewFixedCongestionWindow(100),
outstanding: 0,
retxQueue: list.New(),
txCounter: make(map[*ConsumeState]int),
maxRetries: 3,
}
}

@@ -47,7 +68,7 @@ func (s *rrSegFetcher) String() string {
func (s *rrSegFetcher) IsCongested() bool {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.outstanding >= s.window
return s.outstanding >= s.window.Size()
}

// add a stream to the fetch queue
@@ -71,10 +92,6 @@ func (s *rrSegFetcher) findWork() *ConsumeState {
s.mutex.Lock()
defer s.mutex.Unlock()

if s.outstanding >= s.window {
return nil
}

// round-robin selection of the next stream to fetch
next := func() *ConsumeState {
if len(s.streams) == 0 {
@@ -132,34 +149,103 @@ func (s *rrSegFetcher) findWork() *ConsumeState {

func (s *rrSegFetcher) check() {
for {
state := s.findWork()
if state == nil {
log.Debug(nil, "Checking for work")

// check if the window is full
if s.IsCongested() {
log.Debug(nil, "Window full", "size", s.window.Size())
return // no need to generate new interests
}

var (
state *ConsumeState
seg uint64
retries int = s.maxRetries // TODO: make it configurable
)

// if there are retransmissions, handle them first
if s.retxQueue.Len() > 0 {
log.Debug(nil, "Retransmitting")

var retx *retxEntry

s.mutex.Lock()
front := s.retxQueue.Front()
if front != nil {
retx = s.retxQueue.Remove(front).(*retxEntry)
s.mutex.Unlock()
} else {
s.mutex.Unlock()
continue
}

state = retx.state
seg = retx.seg
retries = retx.retries

} else { // if no retransmissions, find a stream to work on
state = s.findWork()
if state == nil {
return
}

// update window parameters
s.mutex.Lock()
seg = uint64(state.wnd[2])
state.wnd[2]++
s.mutex.Unlock()
}

// build interest
name := state.fetchName.Append(enc.NewSegmentComponent(seg))
config := &ndn.InterestConfig{
MustBeFresh: false,
Nonce: utils.ConvertNonce(s.client.engine.Timer().Nonce()), // new nonce for each call
}
var appParam enc.Wire = nil
var signer ndn.Signer = nil

log.Debug(nil, "Building interest", "name", name, "config", config)
interest, err := s.client.Engine().Spec().MakeInterest(name, config, appParam, signer)
if err != nil {
s.handleResult(ndn.ExpressCallbackArgs{
Result: ndn.InterestResultError,
Error: err,
}, state, seg, retries)
return
}

// update window parameters
seg := uint64(state.wnd[2])
// build express callback function
callback := func(args ndn.ExpressCallbackArgs) {
s.handleResult(args, state, seg, retries)
}

// express interest
log.Debug(nil, "Expressing interest", "name", interest.FinalName)
err = s.client.Engine().Express(interest, callback)
if err != nil {
s.handleResult(ndn.ExpressCallbackArgs{
Result: ndn.InterestResultError,
Error: err,
}, state, seg, retries)
return
}

// increment outstanding interest count
s.mutex.Lock()
s.outstanding++
state.wnd[2]++

// queue outgoing interest for the next segment
s.client.ExpressR(ndn.ExpressRArgs{
Name: state.fetchName.Append(enc.NewSegmentComponent(seg)),
Config: &ndn.InterestConfig{
MustBeFresh: false,
},
Retries: 3,
Callback: func(args ndn.ExpressCallbackArgs) {
s.handleData(args, state)
},
})
s.mutex.Unlock()
}
}

// handleData is called when a data packet is received.
// handleResult is called when the result for an interest is ready.
// It is necessary that this function be called only from one goroutine - the engine.
// The notable exception here is when there is a timeout, which has a separate goroutine.
func (s *rrSegFetcher) handleData(args ndn.ExpressCallbackArgs, state *ConsumeState) {
func (s *rrSegFetcher) handleResult(args ndn.ExpressCallbackArgs, state *ConsumeState, seg uint64, retries int) {
// get the name of the interest
var interestName enc.Name = state.fetchName.Append(enc.NewSegmentComponent(seg))
log.Debug(nil, "Parsing interest result", "name", interestName)

// decrement outstanding interest count
s.mutex.Lock()
s.outstanding--
s.mutex.Unlock()
@@ -168,23 +254,50 @@ func (s *rrSegFetcher) handleData(args ndn.ExpressCallbackArgs, state *ConsumeSt
return
}

if args.Result == ndn.InterestResultError {
state.finalizeError(fmt.Errorf("%w: fetch seg failed: %w", ndn.ErrNetwork, args.Error))
return
}
// handle the result
switch args.Result {
case ndn.InterestResultTimeout:
log.Debug(nil, "Interest timeout", "name", interestName)

s.window.HandleSignal(cong.SigLoss)
s.enqueueForRetransmission(state, seg, retries - 1)

case ndn.InterestResultNack:
log.Debug(nil, "Interest nack'd", "name", interestName)

switch args.NackReason {
case spec.NackReasonDuplicate:
// ignore Nack for duplicates
case spec.NackReasonCongestion:
// congestion signal
s.window.HandleSignal(cong.SigCongest)
s.enqueueForRetransmission(state, seg, retries - 1)
default:
// treat as irrecoverable error for now
state.finalizeError(fmt.Errorf("%w: fetch seg failed with result: %s", ndn.ErrNetwork, args.Result))
}

case ndn.InterestResultData: // data is successfully retrieved
s.handleData(args, state)
s.window.HandleSignal(cong.SigData)

if args.Result != ndn.InterestResultData {
default: // treat as irrecoverable error for now
state.finalizeError(fmt.Errorf("%w: fetch seg failed with result: %s", ndn.ErrNetwork, args.Result))
return
}

s.check() // check for more work
}

// handleData is called when the interest result is processed and the data is ready to be validated.
// It is necessary that this function be called only from one goroutine - the engine.
// The notable exception here is when there is a timeout, which has a separate goroutine.
func (s *rrSegFetcher) handleData(args ndn.ExpressCallbackArgs, state *ConsumeState) {
s.client.Validate(args.Data, args.SigCovered, func(valid bool, err error) {
if !valid {
state.finalizeError(fmt.Errorf("%w: validate seg failed: %w", ndn.ErrSecurity, err))
} else {
s.handleValidatedData(args, state)
}
s.check()
})
}

@@ -203,6 +316,7 @@ func (s *rrSegFetcher) handleValidatedData(args ndn.ExpressCallbackArgs, state *
}

state.segCnt = int(fbId.NumberVal()) + 1
s.txCounter[state] = state.segCnt // number of segments to be transmitted for this state
if state.segCnt > maxObjectSeg || state.segCnt <= 0 {
state.finalizeError(fmt.Errorf("%w: invalid FinalBlockId=%d", ndn.ErrProtocol, state.segCnt))
return
@@ -235,17 +349,23 @@ func (s *rrSegFetcher) handleValidatedData(args ndn.ExpressCallbackArgs, state *
panic("[BUG] consume: nil data segment")
}

// decrease transmission counter
s.mutex.Lock()
s.txCounter[state]--
s.mutex.Unlock()

// if this is the first outstanding segment, move windows
if state.wnd[1] == segNum {
for state.wnd[1] < state.segCnt && state.content[state.wnd[1]] != nil {
state.wnd[1]++
}

if state.wnd[1] == state.segCnt {
if state.wnd[1] == state.segCnt && s.txCounter[state] == 0 {
log.Debug(s, "Stream completed successfully", "name", state.fetchName)

s.mutex.Lock()
s.remove(state)
delete(s.txCounter, state)
s.mutex.Unlock()

if !state.complete.Swap(true) {
@@ -265,3 +385,17 @@ func (s *rrSegFetcher) handleValidatedData(args ndn.ExpressCallbackArgs, state *
// s.outstanding)
// }
}

// enqueueForRetransmission enqueues a segment for retransmission.
// It tracks the remaining retries and treats exhausted retries as an irrecoverable error.
func (s *rrSegFetcher) enqueueForRetransmission(state *ConsumeState, seg uint64, retries int) {
if retries == 0 { // retransmission exhausted
state.finalizeError(fmt.Errorf("%w: retries exhausted, segment number=%d", ndn.ErrNetwork, seg))
return
}

s.mutex.Lock()
defer s.mutex.Unlock()

s.retxQueue.PushBack(&retxEntry{state, seg, retries})
}
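
Note: newRrSegFetcher above constructs its window with cong.NewFixedCongestionWindow(100), but the fixed-window implementation itself is not included in this diff view. Assuming "fixed" means the size never reacts to signals, a minimal sketch of such an implementation of the CongestionWindow interface (added in the next file) might look like the following; the type and field names are hypothetical, not the PR's actual code.

package congestion

import "fmt"

// FixedCongestionWindow is a sketch of a congestion window whose size never changes.
// Hypothetical illustration only; the actual implementation is not shown in this diff.
type FixedCongestionWindow struct {
    cwnd   int
    events chan WindowEvent
}

func NewFixedCongestionWindow(cwnd int) *FixedCongestionWindow {
    return &FixedCongestionWindow{cwnd: cwnd, events: make(chan WindowEvent, 1)}
}

func (cw *FixedCongestionWindow) String() string {
    return fmt.Sprintf("fixed-congestion-window (size=%d)", cw.cwnd)
}

// EventChannel returns the channel on which window change events would be emitted;
// a fixed window never emits any.
func (cw *FixedCongestionWindow) EventChannel() <-chan WindowEvent { return cw.events }

// HandleSignal ignores all congestion signals: the window size is constant.
func (cw *FixedCongestionWindow) HandleSignal(signal CongestionSignal) {}

func (cw *FixedCongestionWindow) Size() int { return cw.cwnd }

// IncreaseWindow and DecreaseWindow are no-ops for a fixed-size window.
func (cw *FixedCongestionWindow) IncreaseWindow() {}
func (cw *FixedCongestionWindow) DecreaseWindow() {}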
30 changes: 30 additions & 0 deletions std/object/congestion/congestion_window.go
@@ -0,0 +1,30 @@
package congestion

import "time"

// CongestionSignal represents signals to adjust the congestion window.
type CongestionSignal int

const (
SigData = iota // data is fetched
SigLoss // data loss detected
SigCongest // congestion detected (e.g. NACK with a reason of congestion)
)

// Congestion window change event
type WindowEvent struct {
age time.Time // time of the event
cwnd int // new window size
}

// CongestionWindow provides an interface for congestion control that manages a window
type CongestionWindow interface {
String() string

EventChannel() <-chan WindowEvent // where window events are emitted
HandleSignal(signal CongestionSignal) // signal handler

Size() int
IncreaseWindow()
DecreaseWindow()
}
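
For context on how this interface is meant to be driven: handleResult above maps interest outcomes to SigData, SigLoss, and SigCongest, so an adaptive policy can later be swapped in behind the same interface. The following AIMD-style window is a hypothetical sketch (not part of this PR) of one such policy; the names and constants are illustrative, and it is not synchronized for concurrent use.

package congestion

import "time"

// AIMDCongestionWindow is a hypothetical additive-increase/multiplicative-decrease window,
// shown only to illustrate how HandleSignal could drive window adaptation.
type AIMDCongestionWindow struct {
    cwnd   int
    events chan WindowEvent
}

func NewAIMDCongestionWindow(initial int) *AIMDCongestionWindow {
    return &AIMDCongestionWindow{cwnd: initial, events: make(chan WindowEvent, 16)}
}

func (cw *AIMDCongestionWindow) String() string { return "aimd-congestion-window" }

func (cw *AIMDCongestionWindow) EventChannel() <-chan WindowEvent { return cw.events }

func (cw *AIMDCongestionWindow) Size() int { return cw.cwnd }

// IncreaseWindow grows the window by one segment (additive increase).
func (cw *AIMDCongestionWindow) IncreaseWindow() {
    cw.cwnd++
    cw.emit()
}

// DecreaseWindow halves the window (multiplicative decrease), keeping at least
// one outstanding interest.
func (cw *AIMDCongestionWindow) DecreaseWindow() {
    cw.cwnd /= 2
    if cw.cwnd < 1 {
        cw.cwnd = 1
    }
    cw.emit()
}

// HandleSignal grows the window on data and shrinks it on loss or congestion.
func (cw *AIMDCongestionWindow) HandleSignal(signal CongestionSignal) {
    switch signal {
    case SigData:
        cw.IncreaseWindow()
    case SigLoss, SigCongest:
        cw.DecreaseWindow()
    }
}

// emit publishes a window change event without blocking the fetcher if no one is reading.
func (cw *AIMDCongestionWindow) emit() {
    select {
    case cw.events <- WindowEvent{age: time.Now(), cwnd: cw.cwnd}:
    default:
    }
}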