Skip to content

Commit

Permalink
support listing only and exclusively not in-cluster ("not cached") co…
Browse files Browse the repository at this point in the history
…ntent

* add lsmsg flag
  - reuse the slot previously allocated by deprecated and removed one (in re: caching ls ais:// pages)
* CLI: add `--not-cached` option
* with refactoring

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Feb 10, 2025
1 parent 1c0b9fd commit 048342b
Show file tree
Hide file tree
Showing 26 changed files with 156 additions and 111 deletions.
2 changes: 1 addition & 1 deletion ais/backend/ais.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func (m *AISbp) ListObjects(remoteBck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoRe
}
remoteMsg := msg.Clone()
remoteMsg.PageSize = calcPageSize(remoteMsg.PageSize, remoteBck.MaxPageSize())
remoteMsg.ClearFlag(apc.LsDiff | apc.LsObjCached)
remoteMsg.ClearFlag(apc.LsDiff | apc.LsCached)

// TODO:
// Currently, not encoding xaction (aka request) `UUID` from the remote cluster
Expand Down
12 changes: 8 additions & 4 deletions ais/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -1652,7 +1652,7 @@ func (p *proxy) listObjects(w http.ResponseWriter, r *http.Request, bck *meta.Bc
// default props & flags => user-provided message
switch {
case lsmsg.Props == "":
if lsmsg.IsFlagSet(apc.LsObjCached) {
if lsmsg.IsFlagSet(apc.LsCached) {
lsmsg.AddProps(apc.GetPropsDefaultAIS...)
} else {
lsmsg.AddProps(apc.GetPropsMinimal...)
Expand All @@ -1664,7 +1664,7 @@ func (p *proxy) listObjects(w http.ResponseWriter, r *http.Request, bck *meta.Bc
lsmsg.SetFlag(apc.LsNameSize)
}
if bck.IsHT() || lsmsg.IsFlagSet(apc.LsArchDir) {
lsmsg.SetFlag(apc.LsObjCached)
lsmsg.SetFlag(apc.LsCached)
}

// do page
Expand Down Expand Up @@ -1778,8 +1778,12 @@ func (p *proxy) lsPage(bck *meta.Bck, amsg *apc.ActMsg, lsmsg *apc.LsoMsg, hdr h

// list-objects flow control helper
func (p *proxy) _lsofc(bck *meta.Bck, lsmsg *apc.LsoMsg, smap *smapX) (tsi *meta.Snode, listRemote, wantOnlyRemote bool, err error) {
listRemote = bck.IsRemote() && !lsmsg.IsFlagSet(apc.LsObjCached)
listRemote = bck.IsRemote() && !lsmsg.IsFlagSet(apc.LsCached)
if !listRemote {
if lsmsg.IsFlagSet(apc.LsNotCached) {
err = fmt.Errorf("bucket %s is not remote - cannot list 'not cached' objects (by definition, all respective objects are in-cluster)",
bck.Cname(""))
}
return
}
if bck.Props.BID == 0 {
Expand All @@ -1789,7 +1793,7 @@ func (p *proxy) _lsofc(bck *meta.Bck, lsmsg *apc.LsoMsg, smap *smapX) (tsi *meta
debug.Assert(lsmsg.IsFlagSet(apc.LsDontAddRemote))
wantOnlyRemote = true
if !lsmsg.WantOnlyRemoteProps() {
err = fmt.Errorf("cannot list remote not-in-cluster bucket %s for not-only-remote object properties: %q",
err = fmt.Errorf("cannot list remote and not in-cluster bucket %s for not-only-remote object properties: %q",
bck.Cname(""), lsmsg.Props)
return
}
Expand Down
12 changes: 6 additions & 6 deletions ais/test/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,7 @@ func TestListObjectsRemoteCached(t *testing.T) {
tlog.Logf("list remote objects with evict=%t\n", evict)
m.remotePuts(evict)

msg := &apc.LsoMsg{PageSize: 10, Flags: apc.LsObjCached}
msg := &apc.LsoMsg{PageSize: 10, Flags: apc.LsCached}
msg.AddProps(apc.GetPropsDefaultAIS...)
msg.AddProps(apc.GetPropsVersion)

Expand Down Expand Up @@ -1128,7 +1128,7 @@ func TestListObjectsRandPageSize(t *testing.T) {
num: rand.IntN(5000) + 1000,
fileSize: 128,
}
msg = &apc.LsoMsg{Flags: apc.LsObjCached}
msg = &apc.LsoMsg{Flags: apc.LsCached}
)

if !bck.IsAIS() {
Expand Down Expand Up @@ -2522,7 +2522,7 @@ func TestCopyBucket(t *testing.T) {
msg := &apc.LsoMsg{}
msg.AddProps(apc.GetPropsVersion)
if test.dstRemote {
msg.Flags = apc.LsObjCached
msg.Flags = apc.LsCached
}

dstBckList, err := api.ListObjects(baseParams, dstm.bck, msg, api.ListArgs{})
Expand Down Expand Up @@ -2581,7 +2581,7 @@ func TestCopyBucketSync(t *testing.T) {
tassert.Errorf(t, len(m.objNames) == m.num, "expected %d in the source bucket, got %d", m.num, len(m.objNames))

tlog.Logf("list source %s objects\n", cliBck.Cname(""))
msg := &apc.LsoMsg{Prefix: m.prefix, Flags: apc.LsObjCached}
msg := &apc.LsoMsg{Prefix: m.prefix, Flags: apc.LsCached}
lst, err := api.ListObjects(baseParams, m.bck, msg, api.ListArgs{})
tassert.CheckFatal(t, err)
tassert.Errorf(t, len(lst.Entries) == m.num, "expected %d present (cached) in the source bucket, got %d", m.num, len(lst.Entries))
Expand Down Expand Up @@ -3075,7 +3075,7 @@ func TestBackendBucket(t *testing.T) {
)

// Check if cached listing works correctly.
cacheMsg := &apc.LsoMsg{Flags: apc.LsObjCached, Prefix: m.prefix}
cacheMsg := &apc.LsoMsg{Flags: apc.LsCached, Prefix: m.prefix}
aisObjList, err = api.ListObjects(baseParams, aisBck, cacheMsg, api.ListArgs{})
tassert.CheckFatal(t, err)
tassert.Fatalf(
Expand Down Expand Up @@ -3435,7 +3435,7 @@ func TestBucketListAndSummary(t *testing.T) {
} else {
msg := &apc.LsoMsg{PageSize: int64(min(m.num/3, 256))} // mult. pages
if test.cached {
msg.Flags = apc.LsObjCached
msg.Flags = apc.LsCached
}
lst, err := api.ListObjects(baseParams, m.bck, msg, api.ListArgs{})
tassert.CheckFatal(t, err)
Expand Down
2 changes: 1 addition & 1 deletion ais/test/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ func (m *ioContext) ensureNumCopies(baseParams api.BaseParams, expectedCopies in
tassert.CheckFatal(m.t, err)

// List Bucket - primarily for the copies
msg := &apc.LsoMsg{Flags: apc.LsObjCached, Prefix: m.prefix}
msg := &apc.LsoMsg{Flags: apc.LsCached, Prefix: m.prefix}
msg.AddProps(apc.GetPropsCopies, apc.GetPropsAtime, apc.GetPropsStatus)
objectList, err := api.ListObjects(baseParams, m.bck, msg, api.ListArgs{})
tassert.CheckFatal(m.t, err)
Expand Down
2 changes: 1 addition & 1 deletion ais/test/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1727,7 +1727,7 @@ func TestOperationsWithRanges(t *testing.T) {
)
if evict {
xid, err = api.EvictMultiObj(baseParams, b, nil /*lst objnames*/, test.rangeStr)
msg.Flags = apc.LsObjCached
msg.Flags = apc.LsCached
kind = apc.ActEvictObjects
} else {
xid, err = api.DeleteMultiObj(baseParams, b, nil /*lst objnames*/, test.rangeStr)
Expand Down
4 changes: 2 additions & 2 deletions ais/test/objprops_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,10 +610,10 @@ func testListObjects(t *testing.T, proxyURL string, bck cmn.Bck, msg *apc.LsoMsg
case msg == nil:
tlog.Logf("LIST %s []\n", bck.String())
case msg.Prefix == "" && msg.PageSize == 0 && msg.ContinuationToken == "":
tlog.Logf("LIST %s [cached: %t]\n", bck.String(), msg.IsFlagSet(apc.LsObjCached))
tlog.Logf("LIST %s [cached: %t]\n", bck.String(), msg.IsFlagSet(apc.LsCached))
default:
tlog.Logf("LIST %s [prefix: %q, page_size: %d, cached: %t, token: %q]\n",
bck.String(), msg.Prefix, msg.PageSize, msg.IsFlagSet(apc.LsObjCached), msg.ContinuationToken)
bck.String(), msg.Prefix, msg.PageSize, msg.IsFlagSet(apc.LsCached), msg.ContinuationToken)
}
baseParams := tools.BaseAPIParams(proxyURL)
resList, err := api.ListObjects(baseParams, bck, msg, api.ListArgs{})
Expand Down
6 changes: 3 additions & 3 deletions ais/test/regression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestListObjectsCloudGetLocation(t *testing.T) {

m.puts()

listObjectsMsg := &apc.LsoMsg{Props: apc.GetPropsLocation, Flags: apc.LsObjCached}
listObjectsMsg := &apc.LsoMsg{Props: apc.GetPropsLocation, Flags: apc.LsCached}
lst, err := api.ListObjects(baseParams, bck, listObjectsMsg, api.ListArgs{})
tassert.CheckFatal(t, err)

Expand Down Expand Up @@ -778,7 +778,7 @@ func TestPrefetchList(t *testing.T) {
}

msg := &apc.LsoMsg{}
msg.SetFlag(apc.LsObjCached)
msg.SetFlag(apc.LsCached)
lst, err := api.ListObjects(baseParams, bck, msg, api.ListArgs{})
tassert.CheckFatal(t, err)
if len(lst.Entries) != m.num {
Expand Down Expand Up @@ -905,7 +905,7 @@ func TestPrefetchRange(t *testing.T) {
}

msg := &apc.LsoMsg{Prefix: m.prefix}
msg.SetFlag(apc.LsObjCached)
msg.SetFlag(apc.LsCached)
lst, err := api.ListObjects(baseParams, bck, msg, api.ListArgs{})
tassert.CheckFatal(t, err)
if len(lst.Entries) < len(files) {
Expand Down
46 changes: 26 additions & 20 deletions api/apc/lsmsg.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,24 @@ const (

// LsoMsg flags
const (
// Applies to objects from the buckets with remote backends (e.g., to optimize-out listing remotes)
// See related Flt* enum
LsObjCached = 1 << iota
// only list in-cluster objects, i.e., those from the respective remote bucket that are present ("cached")
// see also: flt* enum and `LsNotCached` below
LsCached = 1 << iota

LsMissing // include missing main obj (with copy existing)
// include missing main obj (with copy existing)
LsMissing

LsDeleted // include obj-s marked for deletion (TODO: not implemented yet)
// include obj-s marked for deletion (TODO: not implemented yet)
LsDeleted

LsArchDir // expand archives as directories
// expand archives as directories
LsArchDir

LsNameOnly // return only object names and, spearately, statuses
LsNameSize // same as above and size (minor speedup)
// return only object names and, separately, statuses
LsNameOnly

// same as above and size (minor speedup)
LsNameSize

// Background: ============================================================
// as far as AIS is concerned, adding a (confirmed to exist)
Expand All @@ -50,35 +56,35 @@ const (
// * `QparamDontHeadRemote` (this package)
LsDontHeadRemote

// To list remote buckets without adding them to aistore
// list remote buckets without adding them to aistore
// See also:
// * cmd/cli/cli/const.go for `dontAddRemoteFlag`
// * `QparamDontAddRemote` (this package)
LsDontAddRemote

// Deprecated
_useListObjsCache //nolint:unused // Kept for backward compatibility.
// the strict opposite of `LsCached`: list only those objects that are not in-cluster ("not cached")
LsNotCached

// For remote buckets - list only remote props (aka `wantOnlyRemote`). When false,
// for remote buckets - list only remote props (aka `wantOnlyRemote`). When false,
// the default that's being used is: `WantOnlyRemoteProps` - see below.
// When true, the request gets executed in a pass-through fashion whereby a single ais target
// simply forwards it to the associated remote backend and delivers the results as is to the
// requesting proxy and, subsequently, to client.
LsWantOnlyRemoteProps

// List objects without recursion (POSIX-wise).
// See related feature flag: feat.DontOptimizeVirtualDir
// list objects without recursion (POSIX-wise).
// see related feature flag: feat.DontOptimizeVirtualDir
LsNoRecursion

// Bidirectional (remote <-> in-cluster) diff requires remote metadata-capable (`HasVersioningMD`) buckets;
// bidirectional (remote <-> in-cluster) diff requires remote metadata-capable (`HasVersioningMD`) buckets;
// it entails:
// - checking whether remote version exists,
// and if it does,
// - checking whether it differs from its in-cluster copy.
// See related `cmn.LsoEnt` flags: `EntryVerChanged` and `EntryVerRemoved`, respectively.
// see related `cmn.LsoEnt` flags: `EntryVerChanged` and `EntryVerRemoved`, respectively.
LsDiff

// Do not return virtual subdirectories - do not include them as `cmn.LsoEnt` entries
// do not return virtual subdirectories - do not include them as `cmn.LsoEnt` entries
LsNoDirs
)

Expand Down Expand Up @@ -154,7 +160,7 @@ type LsoMsg struct {
StartAfter string `json:"start_after,omitempty"` // start listing after (AIS buckets only)
ContinuationToken string `json:"continuation_token"` // => LsoResult.ContinuationToken => LsoMsg.ContinuationToken
SID string `json:"target"` // selected target to solely execute backend.list-objects
Flags uint64 `json:"flags,string"` // enum {LsObjCached, ...} - "LsoMsg flags" above
Flags uint64 `json:"flags,string"` // enum {LsCached, ...} - "LsoMsg flags" above
PageSize int64 `json:"pagesize"` // max entries returned by list objects call
}

Expand Down Expand Up @@ -224,7 +230,7 @@ func (lsmsg *LsoMsg) Str(cname string) string {
}

sb.WriteString(", flags:")
if lsmsg.IsFlagSet(LsObjCached) {
if lsmsg.IsFlagSet(LsCached) {
sb.WriteString("cached,")
}
if lsmsg.IsFlagSet(LsMissing) {
Expand All @@ -249,7 +255,7 @@ func (lsmsg *LsoMsg) Str(cname string) string {
return s[:len(s)-1]
}

// LsoMsg flags enum: LsObjCached, ...
// LsoMsg flags enum: LsCached, ...
func (lsmsg *LsoMsg) SetFlag(flag uint64) { lsmsg.Flags |= flag }
func (lsmsg *LsoMsg) ClearFlag(flag uint64) { lsmsg.Flags &= ^flag }
func (lsmsg *LsoMsg) IsFlagSet(flags uint64) bool { return lsmsg.Flags&flags == flags }
Expand Down
2 changes: 1 addition & 1 deletion bench/tools/aisloader/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ func listObjectNames(p *params) ([]string, error) {
msg = &apc.LsoMsg{Prefix: p.subDir}
)
if cached {
msg.Flags |= apc.LsObjCached // remote bucket: in-cluster objects only
msg.Flags |= apc.LsCached // remote bucket: in-cluster objects only
}
if !listDirs {
msg.Flags |= apc.LsNoDirs // aisloader's default (to override, use --list-dirs)
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/cli/arch_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ var (
Name: cmdList,
Usage: "List archived content (supported formats: " + archFormats + ")",
ArgsUsage: optionalShardArgument,
Flags: sortFlags(rmFlags(bucketCmdsFlags[commandList], listArchFlag)),
Flags: sortFlags(rmFlags(lsCmdFlags, listArchFlag)),
Action: listArchHandler,
BashComplete: bucketCompletions(bcmplop{}),
}
Expand Down
77 changes: 40 additions & 37 deletions cmd/cli/cli/bucket_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,43 @@ const evictUsage = "Evict one remote bucket, multiple remote buckets, or\n" +
indent1 + "\t- 'evict gs://abc --template \"shard-{0000..9999}.tar.lz4\"'\t- evict the matching range (prefix + brace expansion);\n" +
indent1 + "\t- 'evict \"gs://abc/shard-{0000..9999}.tar.lz4\"'\t- same as above (notice double quotes)"

// flags
var (
// flags
lsCmdFlags = []cli.Flag{
allObjsOrBcksFlag,
listCachedFlag,
listNotCachedFlag,
nameOnlyFlag,
objPropsFlag,
regexLsAnyFlag,
templateFlag,
listObjPrefixFlag,
pageSizeFlag,
pagedFlag,
objLimitFlag,
refreshFlag,
showUnmatchedFlag,
noHeaderFlag,
noFooterFlag,
maxPagesFlag,
startAfterFlag,
bckSummaryFlag,
noRecursFlag,
noDirsFlag,
dontHeadRemoteFlag,
dontAddRemoteFlag,
listArchFlag,
unitsFlag,
silentFlag,
dontWaitFlag,
diffFlag,
countAndTimeFlag,
// bucket inventory
useInventoryFlag,
invNameFlag,
invIDFlag,
}

bucketCmdsFlags = map[string][]cli.Flag{
commandCreate: {
ignoreErrorFlag,
Expand Down Expand Up @@ -157,52 +192,20 @@ var (
},
cmdResetBprops: {},

commandList: {
allObjsOrBcksFlag,
listObjCachedFlag,
nameOnlyFlag,
objPropsFlag,
regexLsAnyFlag,
templateFlag,
listObjPrefixFlag,
pageSizeFlag,
pagedFlag,
objLimitFlag,
refreshFlag,
showUnmatchedFlag,
noHeaderFlag,
noFooterFlag,
maxPagesFlag,
startAfterFlag,
bckSummaryFlag,
noRecursFlag,
noDirsFlag,
dontHeadRemoteFlag,
dontAddRemoteFlag,
listArchFlag,
unitsFlag,
silentFlag,
dontWaitFlag,
diffFlag,
countAndTimeFlag,
// bucket inventory
useInventoryFlag,
invNameFlag,
invIDFlag,
},

cmdLRU: {
enableFlag,
disableFlag,
},
}
)

// commands
// commands
var (
bucketsObjectsCmdList = cli.Command{
Name: commandList,
Usage: listAnyUsage,
ArgsUsage: lsAnyCommandArgument,
Flags: sortFlags(bucketCmdsFlags[commandList]),
Flags: sortFlags(lsCmdFlags),
Action: listAnyHandler,
BashComplete: bucketCompletions(bcmplop{}),
}
Expand Down
Loading

0 comments on commit 048342b

Please sign in to comment.