From adf1bfdebe28ffae6e5e4cf004353afca5f1d80c Mon Sep 17 00:00:00 2001 From: Haotian Yi Date: Sat, 15 Feb 2025 15:21:06 -0800 Subject: [PATCH 1/7] Add an interface for congestion window --- std/object/client_consume_seg.go | 9 +- std/object/congestion/congestion_window.go | 30 ++++++ .../congestion/congestion_window_aimd.go | 96 +++++++++++++++++++ .../congestion/congestion_window_fixed.go | 39 ++++++++ 4 files changed, 170 insertions(+), 4 deletions(-) create mode 100644 std/object/congestion/congestion_window.go create mode 100644 std/object/congestion/congestion_window_aimd.go create mode 100644 std/object/congestion/congestion_window_fixed.go diff --git a/std/object/client_consume_seg.go b/std/object/client_consume_seg.go index c3ae9f03..a160f7d8 100644 --- a/std/object/client_consume_seg.go +++ b/std/object/client_consume_seg.go @@ -8,6 +8,7 @@ import ( enc "github.com/named-data/ndnd/std/encoding" "github.com/named-data/ndnd/std/log" "github.com/named-data/ndnd/std/ndn" + "github.com/named-data/ndnd/std/object/congestion" ) // round-robin based segment fetcher @@ -25,7 +26,7 @@ type rrSegFetcher struct { // number of outstanding interests outstanding int // window size - window int + window congestion.CongestionWindow } func newRrSegFetcher(client *Client) rrSegFetcher { @@ -33,7 +34,7 @@ func newRrSegFetcher(client *Client) rrSegFetcher { mutex: sync.RWMutex{}, client: client, streams: make([]*ConsumeState, 0), - window: 100, + window: congestion.NewFixedCongestionWindow(100), // fixed window trial outstanding: 0, } } @@ -47,7 +48,7 @@ func (s *rrSegFetcher) String() string { func (s *rrSegFetcher) IsCongested() bool { s.mutex.RLock() defer s.mutex.RUnlock() - return s.outstanding >= s.window + return s.outstanding >= s.window.Size() } // add a stream to the fetch queue @@ -71,7 +72,7 @@ func (s *rrSegFetcher) findWork() *ConsumeState { s.mutex.Lock() defer s.mutex.Unlock() - if s.outstanding >= s.window { + if s.outstanding >= s.window.Size() { return nil } 
diff --git a/std/object/congestion/congestion_window.go b/std/object/congestion/congestion_window.go new file mode 100644 index 00000000..0af5a6ea --- /dev/null +++ b/std/object/congestion/congestion_window.go @@ -0,0 +1,30 @@ +package congestion + +import "time" + +// CongestionSignal represents signals to adjust the congestion window. +type CongestionSignal int + +const ( + SigData = iota // data is fetched + SigLoss // data loss detected + SigCongest // congestion detected (e.g. NACK with a reason of congestion) +) + +// Congestion window change event +type WindowEvent struct { + age time.Time // time of the event + cwnd int // new window size +} + +// CongestionWindow provides an interface for congestion control that manages a window +type CongestionWindow interface { + String() string + + EventChannel() <-chan WindowEvent // where window events are emitted + HandleSignal(signal CongestionSignal) // signal handler + + Size() int + IncreaseWindow() + DecreaseWindow() +} \ No newline at end of file diff --git a/std/object/congestion/congestion_window_aimd.go b/std/object/congestion/congestion_window_aimd.go new file mode 100644 index 00000000..37ce99ba --- /dev/null +++ b/std/object/congestion/congestion_window_aimd.go @@ -0,0 +1,96 @@ +package congestion + +import ( + "math" + "time" + + "github.com/named-data/ndnd/std/log" +) + +// AIMDCongestionControl is an implementation of CongestionWindow using Additive Increase Multiplicative Decrease algorithm +type AIMDCongestionWindow struct { + window int // window size + eventCh chan WindowEvent // channel for emitting window change event + + initCwnd int // initial window size + ssthresh int // slow start threshold + minSsthresh int // minimum slow start threshold + aiStep int // additive increase step + mdCoef float64 // multiplicative decrease coefficient + resetCwnd bool // whether to reset cwnd after decrease +} + +// TODO: should we bundle the parameters into an AIMDOption struct? 
+ +func NewAIMDCongestionWindow(cwnd int) *AIMDCongestionWindow { + return &AIMDCongestionWindow{ + window: cwnd, + eventCh: make(chan WindowEvent), + + initCwnd: cwnd, + ssthresh: math.MaxInt, + aiStep: 1, + mdCoef: 0.5, + resetCwnd: false, // defaults + } +} + +// log identifier +func (cw *AIMDCongestionWindow) String() string { + return "aimd-congestion-window" +} + +func (cw *AIMDCongestionWindow) Size() int { + return cw.window +} + +func (cw *AIMDCongestionWindow) IncreaseWindow() { + if cw.window < cw.ssthresh { + cw.window += cw.aiStep // additive increase + } else { + cw.window += cw.aiStep / cw.window // congestion avoidance + + // note: the congestion avoidance formula differs from RFC 5681 Section 3.1 + // recommendations and is borrowed from ndn-tools/catchunks, check + // https://github.com/named-data/ndn-tools/blob/130975c4be69d126fede77d47a50580d5e8b25b0/tools/chunks/catchunks/pipeline-interests-aimd.cpp#L45 + } + + cw.EmitWindowEvent(time.Now(), cw.window) // window change signal +} + +func (cw *AIMDCongestionWindow) DecreaseWindow() { + cw.ssthresh = int(math.Max(float64(cw.window) * cw.mdCoef, float64(cw.minSsthresh))) + + if cw.resetCwnd { + cw.window = cw.initCwnd + } else { + cw.window = cw.ssthresh + } + + cw.EmitWindowEvent(time.Now(), cw.window) // window change signal +} + +func (cw *AIMDCongestionWindow) EventChannel() <-chan WindowEvent { + return cw.eventCh +} + +func (cw *AIMDCongestionWindow) HandleSignal(signal CongestionSignal) { + switch signal { + case SigData: + cw.IncreaseWindow() + case SigLoss, SigCongest: + cw.DecreaseWindow() + default: + // no-op + } +} + +func (cw *AIMDCongestionWindow) EmitWindowEvent(age time.Time, cwnd int) { + // non-blocking send to the channel + select { + case cw.eventCh <- WindowEvent{age: age, cwnd: cwnd}: + default: + // if the channel is full, we log the change event + log.Debug(cw, "Window size changes", "window", cw.window) + } +} \ No newline at end of file diff --git 
a/std/object/congestion/congestion_window_fixed.go b/std/object/congestion/congestion_window_fixed.go new file mode 100644 index 00000000..ea3fd934 --- /dev/null +++ b/std/object/congestion/congestion_window_fixed.go @@ -0,0 +1,39 @@ +package congestion + +// FixedCongestionWindow is an implementation of CongestionWindow with a fixed window size +type FixedCongestionWindow struct { + window int // window size + eventCh chan WindowEvent // channel for emitting window change event +} + +func NewFixedCongestionWindow(cwnd int) *FixedCongestionWindow { + return &FixedCongestionWindow{ + window: cwnd, + eventCh: make(chan WindowEvent), + } +} + +// log identifier +func (cw *FixedCongestionWindow) String() string { + return "fixed-congestion-window" +} + +func (cw *FixedCongestionWindow) Size() int { + return cw.window +} + +func (cw *FixedCongestionWindow) IncreaseWindow() { + // intentionally left blank: window size is fixed +} + +func (cw *FixedCongestionWindow) DecreaseWindow() { + // intentionally left blank: window size is fixed +} + +func (cw *FixedCongestionWindow) EventChannel() <-chan WindowEvent { + return cw.eventCh +} + +func (cw *FixedCongestionWindow) HandleSignal(signal CongestionSignal) { + // intentionally left blank: fixed CW doesn't respond to signals +} \ No newline at end of file From 8564c63c3123a2a2876383775d88b12823231487 Mon Sep 17 00:00:00 2001 From: Haotian Yi Date: Thu, 20 Feb 2025 17:31:55 -0800 Subject: [PATCH 2/7] Refactor to support a retransmission queue --- std/object/client_consume_seg.go | 186 +++++++++++++++++++++++++------ 1 file changed, 152 insertions(+), 34 deletions(-) diff --git a/std/object/client_consume_seg.go b/std/object/client_consume_seg.go index a160f7d8..a2e41d5d 100644 --- a/std/object/client_consume_seg.go +++ b/std/object/client_consume_seg.go @@ -1,6 +1,7 @@ package object import ( + "container/list" "fmt" "slices" "sync" @@ -8,7 +9,9 @@ import ( enc
"github.com/named-data/ndnd/std/encoding" "github.com/named-data/ndnd/std/log" "github.com/named-data/ndnd/std/ndn" - "github.com/named-data/ndnd/std/object/congestion" + spec "github.com/named-data/ndnd/std/ndn/spec_2022" + cong "github.com/named-data/ndnd/std/object/congestion" + "github.com/named-data/ndnd/std/utils" ) // round-robin based segment fetcher @@ -26,7 +29,21 @@ type rrSegFetcher struct { // number of outstanding interests outstanding int // window size - window congestion.CongestionWindow + window cong.CongestionWindow + // retransmission queue + retxQueue *list.List + // remaining segments to be transmitted by state + txCounter map[*ConsumeState]int + // maximum number of retries + maxRetries int +} + +// retxEntry represents an entry in the retransmission queue +// it contains the consumer state, segment number and the number of retries left for the segment +type retxEntry struct { + state *ConsumeState + seg uint64 + retries int } func newRrSegFetcher(client *Client) rrSegFetcher { @@ -34,8 +51,11 @@ func newRrSegFetcher(client *Client) rrSegFetcher { mutex: sync.RWMutex{}, client: client, streams: make([]*ConsumeState, 0), - window: congestion.NewFixedCongestionWindow(100), // fixed window trial + window: cong.NewFixedCongestionWindow(100), outstanding: 0, + retxQueue: list.New(), + txCounter: make(map[*ConsumeState]int), + maxRetries: 3, } } @@ -72,10 +92,6 @@ func (s *rrSegFetcher) findWork() *ConsumeState { s.mutex.Lock() defer s.mutex.Unlock() - if s.outstanding >= s.window.Size() { - return nil - } - // round-robin selection of the next stream to fetch next := func() *ConsumeState { if len(s.streams) == 0 { @@ -133,34 +149,90 @@ func (s *rrSegFetcher) findWork() *ConsumeState { func (s *rrSegFetcher) check() { for { - state := s.findWork() - if state == nil { + log.Debug(nil, "Checking for work") + + // check if the window is full + if s.IsCongested() { + log.Debug(nil, "Window full", "size", s.window.Size()) + return // no need to generate 
new interests + } + + var ( + state *ConsumeState + seg uint64 + retries int = s.maxRetries // TODO: make it configurable + ) + + // if there are retransmissions, handle them first + if s.retxQueue.Len() > 0 { + log.Debug(nil, "Retransmitting") + + retx := s.retxQueue.Remove(s.retxQueue.Front()).(*retxEntry) + state = retx.state + seg = retx.seg + retries = retx.retries + + } else { // if no retransmissions, find a stream to work on + state = s.findWork() + if state == nil { + return + } + + // update window parameters + seg = uint64(state.wnd[2]) + state.wnd[2]++ + } + + // build interest + name := state.fetchName.Append(enc.NewSegmentComponent(seg)) + config := &ndn.InterestConfig{ + MustBeFresh: false, + Nonce: utils.ConvertNonce(s.client.engine.Timer().Nonce()), // new nonce for each call + } + var appParam enc.Wire = nil + var signer ndn.Signer = nil + + log.Debug(nil, "Building interest", "name", name, "config", config) + interest, err := s.client.Engine().Spec().MakeInterest(name, config, appParam, signer) + if err != nil { + s.handleResult(ndn.ExpressCallbackArgs{ + Result: ndn.InterestResultError, + Error: err, + }, state, seg, retries) + return + } + + // build express callback function + callback := func(args ndn.ExpressCallbackArgs) { + s.handleResult(args, state, seg, retries) + } + + // express interest + log.Debug(nil, "Expressing interest", "name", interest.FinalName) + err = s.client.Engine().Express(interest, callback) + if err != nil { + s.handleResult(ndn.ExpressCallbackArgs{ + Result: ndn.InterestResultError, + Error: err, + }, state, seg, retries) return } - // update window parameters - seg := uint64(state.wnd[2]) + // increment outstanding interest count + s.mutex.Lock() s.outstanding++ - state.wnd[2]++ - - // queue outgoing interest for the next segment - s.client.ExpressR(ndn.ExpressRArgs{ - Name: state.fetchName.Append(enc.NewSegmentComponent(seg)), - Config: &ndn.InterestConfig{ - MustBeFresh: false, - }, - Retries: 3, - Callback: 
func(args ndn.ExpressCallbackArgs) { - s.handleData(args, state) - }, - }) + s.mutex.Unlock() } } -// handleData is called when a data packet is received. +// handleResult is called when the result for an interest is ready. // It is necessary that this function be called only from one goroutine - the engine. -// The notable exception here is when there is a timeout, which has a separate goroutine. -func (s *rrSegFetcher) handleData(args ndn.ExpressCallbackArgs, state *ConsumeState) { +func (s *rrSegFetcher) handleResult(args ndn.ExpressCallbackArgs, state *ConsumeState, seg uint64, retries int) { + // get the name of the interest + var interestName enc.Name = state.fetchName.Append(enc.NewSegmentComponent(seg)) + log.Debug(nil, "Parsing interest result", "name", interestName) + + // decrement outstanding interest count s.mutex.Lock() s.outstanding-- s.mutex.Unlock() @@ -169,16 +241,41 @@ func (s *rrSegFetcher) handleData(args ndn.ExpressCallbackArgs, state *ConsumeSt return } - if args.Result == ndn.InterestResultError { - state.finalizeError(fmt.Errorf("%w: fetch seg failed: %w", ndn.ErrNetwork, args.Error)) - return - } + // handle the result + switch args.Result { + case ndn.InterestResultTimeout: + log.Debug(nil, "Interest timeout", "name", interestName) + + s.window.HandleSignal(cong.SigLoss) + s.enqueueForRetransmission(state, seg, retries - 1) + + case ndn.InterestResultNack: + log.Debug(nil, "Interest nack'd", "name", interestName) + + switch args.NackReason { + case spec.NackReasonDuplicate: + // ignore Nack for duplicates + case spec.NackReasonCongestion: + // congestion signal + s.window.HandleSignal(cong.SigCongest) + s.enqueueForRetransmission(state, seg, retries - 1) + default: + // treat as irrecoverable error for now + state.finalizeError(fmt.Errorf("%w: fetch seg failed with result: %s", ndn.ErrNetwork, args.Result)) + } - if args.Result != ndn.InterestResultData { + case ndn.InterestResultData: // data is successfully retrieved + 
s.handleData(args, state) + + default: // treat as irrecoverable error for now state.finalizeError(fmt.Errorf("%w: fetch seg failed with result: %s", ndn.ErrNetwork, args.Result)) - return } +} +// handleData is called when the interest result is processed and the data is ready to be validated. +// It is necessary that this function be called only from one goroutine - the engine. +// The notable exception here is when there is a timeout, which has a separate goroutine. +func (s *rrSegFetcher) handleData(args ndn.ExpressCallbackArgs, state *ConsumeState) { s.client.Validate(args.Data, args.SigCovered, func(valid bool, err error) { if !valid { state.finalizeError(fmt.Errorf("%w: validate seg failed: %w", ndn.ErrSecurity, err)) @@ -204,6 +301,7 @@ func (s *rrSegFetcher) handleValidatedData(args ndn.ExpressCallbackArgs, state * } state.segCnt = int(fbId.NumberVal()) + 1 + s.txCounter[state] = state.segCnt // number of segments to be transmitted for this state if state.segCnt > maxObjectSeg || state.segCnt <= 0 { state.finalizeError(fmt.Errorf("%w: invalid FinalBlockId=%d", ndn.ErrProtocol, state.segCnt)) return @@ -236,17 +334,23 @@ func (s *rrSegFetcher) handleValidatedData(args ndn.ExpressCallbackArgs, state * panic("[BUG] consume: nil data segment") } + // decrease transmission counter + s.mutex.Lock() + s.txCounter[state]-- + s.mutex.Unlock() + // if this is the first outstanding segment, move windows if state.wnd[1] == segNum { for state.wnd[1] < state.segCnt && state.content[state.wnd[1]] != nil { state.wnd[1]++ } - if state.wnd[1] == state.segCnt { + if state.wnd[1] == state.segCnt && s.txCounter[state] == 0 { log.Debug(s, "Stream completed successfully", "name", state.fetchName) s.mutex.Lock() s.remove(state) + delete(s.txCounter, state) s.mutex.Unlock() if !state.complete.Swap(true) { @@ -266,3 +370,17 @@ func (s *rrSegFetcher) handleValidatedData(args ndn.ExpressCallbackArgs, state * // s.outstanding) // } } + +// enqueueForRetransmission enqueues a segment 
for retransmission +// it registers retries and treats exhausted retries as irrecoverable errors +func (s *rrSegFetcher) enqueueForRetransmission(state *ConsumeState, seg uint64, retries int) { + if (retries == 0) { // retransmission exhausted + state.finalizeError(fmt.Errorf("%w: retries exhausted, segment number=%d", ndn.ErrNetwork, seg)) + return + } + + s.mutex.Lock() + defer s.mutex.Unlock() + + s.retxQueue.PushBack(&retxEntry{state, seg, retries}) +} \ No newline at end of file From 95b6b15b078d147d279c35a8536babafd6de0ec1 Mon Sep 17 00:00:00 2001 From: Haotian Yi Date: Sun, 23 Feb 2025 18:22:46 -0800 Subject: [PATCH 3/7] Fix bug: uninitialized minSsthresh --- std/object/congestion/congestion_window_aimd.go | 1 + 1 file changed, 1 insertion(+) diff --git a/std/object/congestion/congestion_window_aimd.go b/std/object/congestion/congestion_window_aimd.go index 37ce99ba..e6663ae9 100644 --- a/std/object/congestion/congestion_window_aimd.go +++ b/std/object/congestion/congestion_window_aimd.go @@ -29,6 +29,7 @@ func NewAIMDCongestionWindow(cwnd int) *AIMDCongestionWindow { initCwnd: cwnd, ssthresh: math.MaxInt, + minSsthresh: 2, aiStep: 1, mdCoef: 0.5, resetCwnd: false, // defaults From 40f1f0fd52bdab01860fb9702ff9645db9096862 Mon Sep 17 00:00:00 2001 From: Haotian Yi Date: Sun, 23 Feb 2025 18:28:20 -0800 Subject: [PATCH 4/7] Fix bug: not starting new work after timeouts and nacks --- std/object/client_consume_seg.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/std/object/client_consume_seg.go b/std/object/client_consume_seg.go index a2e41d5d..6bc1137f 100644 --- a/std/object/client_consume_seg.go +++ b/std/object/client_consume_seg.go @@ -270,6 +270,8 @@ func (s *rrSegFetcher) handleResult(args ndn.ExpressCallbackArgs, state *Consume default: // treat as irrecoverable error for now state.finalizeError(fmt.Errorf("%w: fetch seg failed with result: %s", ndn.ErrNetwork, args.Result)) } + + s.check() // check for more work } // handleData is 
called when the interest result is processed and the data is ready to be validated. @@ -282,7 +284,6 @@ func (s *rrSegFetcher) handleData(args ndn.ExpressCallbackArgs, state *ConsumeSt } else { s.handleValidatedData(args, state) } - s.check() }) } From c41f07d1311fc8e26d99db09120ca97ff41e5323 Mon Sep 17 00:00:00 2001 From: Haotian Yi Date: Sun, 23 Feb 2025 20:31:03 -0800 Subject: [PATCH 5/7] Add SigData emission --- std/object/client_consume_seg.go | 1 + 1 file changed, 1 insertion(+) diff --git a/std/object/client_consume_seg.go b/std/object/client_consume_seg.go index 6bc1137f..edc1eadd 100644 --- a/std/object/client_consume_seg.go +++ b/std/object/client_consume_seg.go @@ -266,6 +266,7 @@ func (s *rrSegFetcher) handleResult(args ndn.ExpressCallbackArgs, state *Consume case ndn.InterestResultData: // data is successfully retrieved s.handleData(args, state) + s.window.HandleSignal(cong.SigData) default: // treat as irrecoverable error for now state.finalizeError(fmt.Errorf("%w: fetch seg failed with result: %s", ndn.ErrNetwork, args.Result)) From 28560e28438bcd9bd7c4063167effd558f19049a Mon Sep 17 00:00:00 2001 From: Haotian Yi Date: Sun, 23 Feb 2025 20:40:26 -0800 Subject: [PATCH 6/7] Change internal window representation to float64 --- .../congestion/congestion_window_aimd.go | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/std/object/congestion/congestion_window_aimd.go b/std/object/congestion/congestion_window_aimd.go index e6663ae9..90a7339f 100644 --- a/std/object/congestion/congestion_window_aimd.go +++ b/std/object/congestion/congestion_window_aimd.go @@ -9,13 +9,13 @@ import ( // AIMDCongestionControl is an implementation of CongestionWindow using Additive Increase Multiplicative Decrease algorithm type AIMDCongestionWindow struct { - window int // window size + window float64 // window size - float64 to allow percentage growth in congestion avoidance phase eventCh chan WindowEvent // channel for emitting window 
change event - initCwnd int // initial window size - ssthresh int // slow start threshold - minSsthresh int // minimum slow start threshold - aiStep int // additive increase step + initCwnd float64 // initial window size + ssthresh float64 // slow start threshold + minSsthresh float64 // minimum slow start threshold + aiStep float64 // additive increase step mdCoef float64 // multiplicative decrease coefficient resetCwnd bool // whether to reset cwnd after decrease } @@ -24,13 +24,13 @@ type AIMDCongestionWindow struct { func NewAIMDCongestionWindow(cwnd int) *AIMDCongestionWindow { return &AIMDCongestionWindow{ - window: cwnd, + window: float64(cwnd), eventCh: make(chan WindowEvent), - initCwnd: cwnd, - ssthresh: math.MaxInt, - minSsthresh: 2, - aiStep: 1, + initCwnd: float64(cwnd), + ssthresh: math.MaxFloat64, + minSsthresh: 2.0, + aiStep: 1.0, mdCoef: 0.5, resetCwnd: false, // defaults } @@ -42,7 +42,7 @@ func (cw *AIMDCongestionWindow) String() string { } func (cw *AIMDCongestionWindow) Size() int { - return cw.window + return int(math.Floor(cw.window)) } func (cw *AIMDCongestionWindow) IncreaseWindow() { @@ -56,11 +56,11 @@ func (cw *AIMDCongestionWindow) IncreaseWindow() { // https://github.com/named-data/ndn-tools/blob/130975c4be69d126fede77d47a50580d5e8b25b0/tools/chunks/catchunks/pipeline-interests-aimd.cpp#L45 } - cw.EmitWindowEvent(time.Now(), cw.window) // window change signal + cw.EmitWindowEvent(time.Now(), cw.Size()) // window change signal } func (cw *AIMDCongestionWindow) DecreaseWindow() { - cw.ssthresh = int(math.Max(float64(cw.window) * cw.mdCoef, float64(cw.minSsthresh))) + cw.ssthresh = math.Max(cw.window * cw.mdCoef, cw.minSsthresh) if cw.resetCwnd { cw.window = cw.initCwnd @@ -68,7 +68,7 @@ func (cw *AIMDCongestionWindow) DecreaseWindow() { cw.window = cw.ssthresh } - cw.EmitWindowEvent(time.Now(), cw.window) // window change signal + cw.EmitWindowEvent(time.Now(), cw.Size()) // window change signal } func (cw *AIMDCongestionWindow) 
EventChannel() <-chan WindowEvent { From 7124f5bcb48b0d49b794a6c180f9e09133606f7b Mon Sep 17 00:00:00 2001 From: Haotian Yi Date: Mon, 24 Feb 2025 17:12:09 -0800 Subject: [PATCH 7/7] Fix bug: race conditions --- std/object/client_consume_seg.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/std/object/client_consume_seg.go b/std/object/client_consume_seg.go index edc1eadd..801b33db 100644 --- a/std/object/client_consume_seg.go +++ b/std/object/client_consume_seg.go @@ -167,7 +167,18 @@ func (s *rrSegFetcher) check() { if s.retxQueue.Len() > 0 { log.Debug(nil, "Retransmitting") - retx := s.retxQueue.Remove(s.retxQueue.Front()).(*retxEntry) + var retx *retxEntry + + s.mutex.Lock() + front := s.retxQueue.Front() + if front != nil { + retx = s.retxQueue.Remove(front).(*retxEntry) + s.mutex.Unlock() + } else { + s.mutex.Unlock() + continue + } + state = retx.state seg = retx.seg retries = retx.retries @@ -179,8 +190,10 @@ func (s *rrSegFetcher) check() { } // update window parameters + s.mutex.Lock() seg = uint64(state.wnd[2]) state.wnd[2]++ + s.mutex.Unlock() } // build interest