Skip to content

Commit

Permalink
skip-lookup followed by list-remote (fix)
Browse files Browse the repository at this point in the history
* refactor and fix the relevant snippet
* main loop: check 'aborted' also after getting next page

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Feb 12, 2025
1 parent e95f3ac commit 7762faf
Showing 1 changed file with 46 additions and 17 deletions.
63 changes: 46 additions & 17 deletions xact/xs/lso.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ loop:
r.IncPending()
resp := r.doPage()
r.DecPending()
if r.IsAborted() {
break loop
}
if resp.Err == nil {
// report heterogeneous stats (x-list is an exception)
r.ObjsAdd(len(resp.Lst.Entries), 0)
Expand Down Expand Up @@ -413,22 +416,7 @@ func (r *LsoXact) nextPageR() (err error) {
r.wiCnt.Inc()

if r.walk.this {
nentries := allocLsoEntries()
page, err = npg.nextPageR(nentries)
r.walk.last = page.ContinuationToken == ""

if !r.walk.wor && !r.IsAborted() {
if err == nil {
// bcast page
err = r.bcast(page)
}
if err == nil && !r.walk.dontPopulate {
err = npg.filterAddLmeta(page)
}
if err != nil {
r.sendTerm(r.msg.UUID, nil, err)
}
}
page, err = r.thisPageR(npg)
} else {
debug.Assert(!r.msg.WantOnlyRemoteProps() && /*same*/ !r.walk.wor)
select {
Expand Down Expand Up @@ -465,6 +453,45 @@ ex:
return
}

func (r *LsoXact) thisPageR(npg *npgCtx) (page *cmn.LsoRes, err error) {
var (
aborted bool
nentries = allocLsoEntries()
)
page, err = npg.nextPageR(nentries)
if err != nil {
goto rerr
}
r.walk.last = page.ContinuationToken == ""

if r.walk.wor {
return page, nil
}
if aborted = r.IsAborted(); aborted {
goto rerr
}

// bcast page
if err = r.bcast(page); err != nil {
goto rerr
}
// populate
if !r.walk.dontPopulate {
if err = npg.filterAddLmeta(page); err != nil {
goto rerr
}
}

return page, nil

rerr:
if aborted && err == nil {
err = r.AbortErr()
}
r.sendTerm(r.msg.UUID, nil, err)
return page, err
}

func (r *LsoXact) bcast(page *cmn.LsoRes) (err error) {
if r.p.dm == nil { // single target
return nil
Expand Down Expand Up @@ -688,7 +715,9 @@ func (r *LsoXact) recv(hdr *transport.ObjHdr, objReader io.Reader, err error) er
debug.Assert(lsoIsRemote(r.p.Bck, r.msg.IsFlagSet(apc.LsCached)))

if hdr.Opcode == opcodeAbrt {
err = errors.New(hdr.ObjName) // definitely see `streamingX.sendTerm()`
// TODO: consider r.Abort(err); today it'll idle for a while
// see: streamingX.sendTerm
err = errors.New(hdr.ObjName)
}
if err != nil && !cos.IsEOF(err) {
nlog.Errorln(core.T.String(), r.String(), len(r.remtCh), err)
Expand Down

0 comments on commit 7762faf

Please sign in to comment.