From 72146cabfc07590667c8cf741ad0b42c908a49bf Mon Sep 17 00:00:00 2001 From: Chang Chen Chien Date: Sun, 12 Nov 2023 16:48:52 +0800 Subject: [PATCH] feat: Align go-redis ProbabilisticCmdable (#406) * init commit * WIP: Add new command types * copy tests from go-redis * add go-redis test url * WIP: scaffolding BF* cmd TODO: CacheCompat * add BF* to CacheCompat * WIP: Compat * WIP: TDigest * fill all commands * WIP: Finish BF.INFO cmd * WIP: BF.INFO: use generic to handle BfInfoArgType * introduce baseCmd * minor tweak * change tag from cmd to redis * make tests for BF.INFO pass * add notes for argument in rueidiscompat.Scan() * implement newScanDumpCmd * implement newCMSInfoCmd * implement CF.* * implement TopK.*, TDigest.* * minor tweaks * revert command spec for TDIGEST.ADD, allow multiple option in value * revert command spec for TDIGEST.BYRANK to avoid breaking change * revert command for CMS.MERGE * use str instead of fmt.Sprint to avoid unexpected any to string conversion * revert command spec for TDIGEST.BYREVRANK * minor tweaks * revert TDIGEST.ADD: remove multiple:true in values block * replace fmt.Sprint() by str() --- rueidiscompat/adapter.go | 650 +++++++++++++++++++++++++++++++ rueidiscompat/adapter_test.go | 714 ++++++++++++++++++++++++++++++++++ rueidiscompat/command.go | 310 +++++++++++++++ rueidiscompat/hscan.go | 1 + 4 files changed, 1675 insertions(+) diff --git a/rueidiscompat/adapter.go b/rueidiscompat/adapter.go index 99be17e9..3120f7da 100644 --- a/rueidiscompat/adapter.go +++ b/rueidiscompat/adapter.go @@ -388,6 +388,81 @@ type Cmdable interface { // TODO ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *StringCmd GearsCmdable + ProbabilisticCmdable +} + +// https://github.com/redis/go-redis/blob/af4872cbd0de349855ce3f0978929c2f56eb995f/probabilistic.go#L10 +type ProbabilisticCmdable interface { + BFAdd(ctx context.Context, key string, element interface{}) *BoolCmd + BFCard(ctx context.Context, key string) *IntCmd + BFExists(ctx context.Context, key string, element interface{}) *BoolCmd + BFInfo(ctx context.Context, key string) *BFInfoCmd + BFInfoArg(ctx context.Context, key, option string) *BFInfoCmd + BFInfoCapacity(ctx context.Context, key string) *BFInfoCmd + BFInfoSize(ctx context.Context, key string) *BFInfoCmd + BFInfoFilters(ctx context.Context, key string) *BFInfoCmd + BFInfoItems(ctx context.Context, key string) *BFInfoCmd + BFInfoExpansion(ctx context.Context, key string) *BFInfoCmd + BFInsert(ctx context.Context, key string, options *BFInsertOptions, elements ...interface{}) *BoolSliceCmd + BFMAdd(ctx context.Context, key string, elements ...interface{}) *BoolSliceCmd + BFMExists(ctx context.Context, key string, elements ...interface{}) *BoolSliceCmd + BFReserve(ctx context.Context, key string, errorRate float64, capacity int64) *StatusCmd + BFReserveExpansion(ctx context.Context, key string, errorRate float64, capacity, expansion int64) *StatusCmd + BFReserveNonScaling(ctx context.Context, key string, errorRate float64, capacity int64) *StatusCmd + BFReserveWithArgs(ctx context.Context, key string, options *BFReserveOptions) *StatusCmd + BFScanDump(ctx context.Context, key string, iterator int64) *ScanDumpCmd + BFLoadChunk(ctx context.Context, key string, iterator int64, data interface{}) *StatusCmd + + CFAdd(ctx context.Context, key string, element interface{}) *BoolCmd + CFAddNX(ctx context.Context, key string, element interface{}) *BoolCmd + CFCount(ctx context.Context, key string, element interface{}) *IntCmd + CFDel(ctx context.Context, key string, element interface{}) *BoolCmd + CFExists(ctx context.Context, key string, element interface{}) *BoolCmd + CFInfo(ctx context.Context, key string) *CFInfoCmd + CFInsert(ctx context.Context, key string, options *CFInsertOptions, elements ...interface{}) *BoolSliceCmd + CFInsertNX(ctx context.Context, key string, options *CFInsertOptions, elements ...interface{}) *IntSliceCmd + CFMExists(ctx context.Context, key string, elements ...interface{}) *BoolSliceCmd + CFReserve(ctx context.Context, key string, capacity int64) *StatusCmd + CFReserveWithArgs(ctx context.Context, key string, options *CFReserveOptions) *StatusCmd + CFReserveExpansion(ctx context.Context, key string, capacity int64, expansion int64) *StatusCmd + CFReserveBucketSize(ctx context.Context, key string, capacity int64, bucketsize int64) *StatusCmd + CFReserveMaxIterations(ctx context.Context, key string, capacity int64, maxiterations int64) *StatusCmd + CFScanDump(ctx context.Context, key string, iterator int64) *ScanDumpCmd + CFLoadChunk(ctx context.Context, key string, iterator int64, data interface{}) *StatusCmd + + CMSIncrBy(ctx context.Context, key string, elements ...interface{}) *IntSliceCmd + CMSInfo(ctx context.Context, key string) *CMSInfoCmd + CMSInitByDim(ctx context.Context, key string, width, height int64) *StatusCmd + CMSInitByProb(ctx context.Context, key string, errorRate, probability float64) *StatusCmd + CMSMerge(ctx context.Context, destKey string, sourceKeys ...string) *StatusCmd + CMSMergeWithWeight(ctx context.Context, destKey string, sourceKeys map[string]int64) *StatusCmd + CMSQuery(ctx context.Context, key string, elements ...interface{}) *IntSliceCmd + + TopKAdd(ctx context.Context, key string, elements ...interface{}) *StringSliceCmd + TopKCount(ctx context.Context, key string, elements ...interface{}) *IntSliceCmd + TopKIncrBy(ctx context.Context, key string, elements ...interface{}) *StringSliceCmd + TopKInfo(ctx context.Context, key string) *TopKInfoCmd + TopKList(ctx context.Context, key string) *StringSliceCmd + TopKListWithCount(ctx context.Context, key string) *MapStringIntCmd + TopKQuery(ctx context.Context, key string, elements ...interface{}) *BoolSliceCmd + TopKReserve(ctx context.Context, key string, k int64) *StatusCmd + TopKReserveWithOptions(ctx context.Context, key string, k int64, width, depth int64, decay float64) *StatusCmd + + TDigestAdd(ctx context.Context, key string, elements ...float64) *StatusCmd + TDigestByRank(ctx context.Context, key string, rank ...uint64) *FloatSliceCmd + TDigestByRevRank(ctx context.Context, key string, rank ...uint64) *FloatSliceCmd + TDigestCDF(ctx context.Context, key string, elements ...float64) *FloatSliceCmd + TDigestCreate(ctx context.Context, key string) *StatusCmd + TDigestCreateWithCompression(ctx context.Context, key string, compression int64) *StatusCmd + TDigestInfo(ctx context.Context, key string) *TDigestInfoCmd + TDigestMax(ctx context.Context, key string) *FloatCmd + TDigestMin(ctx context.Context, key string) *FloatCmd + TDigestMerge(ctx context.Context, destKey string, options *TDigestMergeOptions, sourceKeys ...string) *StatusCmd + TDigestQuantile(ctx context.Context, key string, elements ...float64) *FloatSliceCmd + TDigestRank(ctx context.Context, key string, values ...float64) *IntSliceCmd + TDigestReset(ctx context.Context, key string) *StatusCmd + TDigestRevRank(ctx context.Context, key string, values ...float64) *IntSliceCmd + TDigestTrimmedMean(ctx context.Context, key string, lowCutQuantile, highCutQuantile float64) *FloatCmd } // Align with go-redis @@ -2879,6 +2954,7 @@ func (c *Compat) TFunctionLoad(ctx context.Context, lib string) *StatusCmd { return newStatusCmd(resp) } +// FIXME: should check nil of options func (c *Compat) TFunctionLoadArgs(ctx context.Context, lib string, options *TFunctionLoadOptions) *StatusCmd { b := c.client.B() var cmd cmds.Completed @@ -2970,6 +3046,473 @@ func (c *Compat) TFCallASYNCArgs(ctx context.Context, libName string, funcName s return newCmd(resp) } +func (c *Compat) BFAdd(ctx context.Context, key string, element interface{}) *BoolCmd { + cmd := c.client.B().BfAdd().Key(key).Item(str(element)).Build() + return newBoolCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) BFCard(ctx context.Context, key string) *IntCmd { + cmd := c.client.B().BfCard().Key(key).Build() + return newIntCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) BFExists(ctx context.Context, key string, element interface{}) *BoolCmd { + cmd := c.client.B().BfExists().Key(key).Item(str(element)).Build() + return newBoolCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) BFInfo(ctx context.Context, key string) *BFInfoCmd { + cmd := c.client.B().BfInfo().Key(key).Build() + return newBFInfoCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) BFInfoArg(ctx context.Context, key, option string) *BFInfoCmd { + switch option { + case "CAPACITY": + return c.BFInfoCapacity(ctx, key) + case "SIZE": + return c.BFInfoSize(ctx, key) + case "FILTERS": + return c.BFInfoFilters(ctx, key) + case "ITEMS": + return c.BFInfoItems(ctx, key) + case "EXPANSION": + return c.BFInfoExpansion(ctx, key) + default: + panic(fmt.Sprintf("unknown option %v", option)) + } +} + +func (c *Compat) BFInfoCapacity(ctx context.Context, key string) *BFInfoCmd { + cmd := c.client.B().BfInfo().Key(key).Capacity().Build() + return newBFInfoCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) BFInfoSize(ctx context.Context, key string) *BFInfoCmd { + cmd := c.client.B().BfInfo().Key(key).Size().Build() + return newBFInfoCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) BFInfoFilters(ctx context.Context, key string) *BFInfoCmd { + cmd := c.client.B().BfInfo().Key(key).Filters().Build() + return newBFInfoCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) BFInfoItems(ctx context.Context, key string) *BFInfoCmd { + cmd := c.client.B().BfInfo().Key(key).Items().Build() + return newBFInfoCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) BFInfoExpansion(ctx context.Context, key string) *BFInfoCmd { + cmd := c.client.B().BfInfo().Key(key).Expansion().Build() + return newBFInfoCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) BFInsert(ctx context.Context, key string, options *BFInsertOptions, elements ...interface{}) *BoolSliceCmd { + _cmd := c.client.B(). + BfInsert(). + Key(key). + Capacity(options.Capacity). + Error(options.Error). + Expansion(options.Expansion) + if options.NonScaling { + _cmd.Nonscaling() + } + if options.NoCreate { + _cmd.Nocreate() + } + items := _cmd.Items() + for _, e := range elements { + items.Item(str(e)) + } + cmd := (cmds.BfInsertItem)(items).Build() + return newBoolSliceCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) BFMAdd(ctx context.Context, key string, elements ...interface{}) *BoolSliceCmd { + cmd := c.client.B().BfMadd().Key(key) + var last cmds.BfMaddItem + for _, e := range elements { + last = cmd.Item(str(e)) + } + return newBoolSliceCmd(c.client.Do(ctx, last.Build())) +} + +func (c *Compat) BFMExists(ctx context.Context, key string, elements ...interface{}) *BoolSliceCmd { + cmd := c.client.B().BfMexists().Key(key) + var last cmds.BfMexistsItem + for _, e := range elements { + last = cmd.Item(str(e)) + } + return newBoolSliceCmd(c.client.Do(ctx, last.Build())) +} + +func (c *Compat) BFReserve(ctx context.Context, key string, errorRate float64, capacity int64) *StatusCmd { + cmd := c.client.B().BfReserve().Key(key).ErrorRate(errorRate).Capacity(capacity).Build() + return newStatusCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) BFReserveExpansion(ctx context.Context, key string, errorRate float64, capacity, expansion int64) *StatusCmd { + cmd := c.client.B().BfReserve().Key(key).ErrorRate(errorRate).Capacity(capacity).Expansion(expansion).Build() + return newStatusCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) BFReserveNonScaling(ctx context.Context, key string, errorRate float64, capacity int64) *StatusCmd { + cmd := c.client.B().BfReserve().Key(key).ErrorRate(errorRate).Capacity(capacity).Nonscaling().Build() + return newStatusCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) BFReserveWithArgs(ctx context.Context, key string, options *BFReserveOptions) *StatusCmd { + cmd := c.client.B().BfReserve().Key(key).ErrorRate(options.Error).Capacity(options.Capacity).Expansion(options.Expansion) + if options.NonScaling { + cmd.Nonscaling() + } + return newStatusCmd(c.client.Do(ctx, cmd.Build())) +} + +func (c *Compat) BFScanDump(ctx context.Context, key string, iterator int64) *ScanDumpCmd { + cmd := c.client.B().BfScandump().Key(key).Iterator(iterator).Build() + return newScanDumpCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) BFLoadChunk(ctx context.Context, key string, iterator int64, data interface{}) *StatusCmd { + cmd := c.client.B().BfLoadchunk().Key(key).Iterator(iterator).Data(str(data)).Build() + return newStatusCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) CFAdd(ctx context.Context, key string, element interface{}) *BoolCmd { + cmd := c.client.B().CfAdd().Key(key).Item(str(element)).Build() + return newBoolCmd(c.client.Do(ctx, cmd)) +} +func (c *Compat) CFAddNX(ctx context.Context, key string, element interface{}) *BoolCmd { + cmd := c.client.B().CfAddnx().Key(key).Item(str(element)).Build() + return newBoolCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) CFCount(ctx context.Context, key string, element interface{}) *IntCmd { + cmd := c.client.B().CfCount().Key(key).Item(str(element)).Build() + return newIntCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) CFDel(ctx context.Context, key string, element interface{}) *BoolCmd { + cmd := c.client.B().CfDel().Key(key).Item(str(element)).Build() + return newBoolCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) CFExists(ctx context.Context, key string, element interface{}) *BoolCmd { + cmd := c.client.B().CfExists().Key(key).Item(str(element)).Build() + return newBoolCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) CFInfo(ctx context.Context, key string) *CFInfoCmd { + cmd := c.client.B().CfInfo().Key(key).Build() + return newCFInfoCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) CFInsert(ctx context.Context, key string, options *CFInsertOptions, elements ...interface{}) *BoolSliceCmd { + _cmd := c.client.B().CfInsert().Key(key) + if options != nil { + _cmd.Capacity(options.Capacity) + if options.NoCreate { + _cmd.Nocreate() + } + } + items := _cmd.Items() + for _, e := range elements { + items.Item(str(e)) + } + cmd := (cmds.CfInsertItem)(items).Build() + return newBoolSliceCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) CFInsertNX(ctx context.Context, key string, options *CFInsertOptions, elements ...interface{}) *IntSliceCmd { + _cmd := c.client.B().CfInsertnx().Key(key).Capacity(options.Capacity) + if options.NoCreate { + _cmd.Nocreate() + } + items := _cmd.Items() + for _, e := range elements { + items.Item(str(e)) + } + cmd := (cmds.CfInsertItem)(items).Build() + return newIntSliceCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) CFMExists(ctx context.Context, key string, elements ...interface{}) *BoolSliceCmd { + _cmd := c.client.B().CfMexists().Key(key) + for _, e := range elements { + _cmd.Item(str(e)) + } + cmd := (cmds.CfMexistsItem)(_cmd).Build() + return newBoolSliceCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) CFReserve(ctx context.Context, key string, capacity int64) *StatusCmd { + cmd := c.client.B().CfReserve().Key(key).Capacity(capacity).Build() + return newStatusCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) CFReserveWithArgs(ctx context.Context, key string, options *CFReserveOptions) *StatusCmd { + cmd := c.client.B(). + CfReserve(). + Key(key). + Capacity(options.Capacity). + Bucketsize(options.BucketSize). + Maxiterations(options.MaxIterations). + Expansion(options.Expansion). + Build() + return newStatusCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) CFReserveExpansion(ctx context.Context, key string, capacity int64, expansion int64) *StatusCmd { + cmd := c.client.B(). + CfReserve(). + Key(key). + Capacity(capacity). + Expansion(expansion). + Build() + return newStatusCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) CFReserveBucketSize(ctx context.Context, key string, capacity int64, bucketsize int64) *StatusCmd { + cmd := c.client.B(). + CfReserve(). + Key(key). + Capacity(capacity). + Bucketsize(bucketsize). + Build() + return newStatusCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) CFReserveMaxIterations(ctx context.Context, key string, capacity int64, maxiterations int64) *StatusCmd { + cmd := c.client.B(). + CfReserve(). + Key(key). + Capacity(capacity). + Maxiterations(maxiterations). + Build() + return newStatusCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) CFScanDump(ctx context.Context, key string, iterator int64) *ScanDumpCmd { + cmd := c.client.B().CfScandump().Key(key).Iterator(iterator).Build() + return newScanDumpCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) CFLoadChunk(ctx context.Context, key string, iterator int64, data interface{}) *StatusCmd { + cmd := c.client.B().CfLoadchunk().Key(key).Iterator(iterator).Data(str(data)).Build() + return newStatusCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) CMSIncrBy(ctx context.Context, key string, elements ...interface{}) *IntSliceCmd { + _cmd := c.client.B().CmsIncrby().Key(key) + for i := 0; i < len(elements); i += 2 { + _cmd.Item(str(elements[i])).Increment((int64)(elements[i+1].(int))) + } + cmd := (cmds.CmsIncrbyItemsIncrement)(_cmd).Build() + return newIntSliceCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) CMSInfo(ctx context.Context, key string) *CMSInfoCmd { + cmd := c.client.B().CmsInfo().Key(key).Build() + return newCMSInfoCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) CMSInitByDim(ctx context.Context, key string, width, height int64) *StatusCmd { + cmd := c.client.B().CmsInitbydim().Key(key).Width(width).Depth(height).Build() + return newStatusCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) CMSInitByProb(ctx context.Context, key string, errorRate, probability float64) *StatusCmd { + cmd := c.client.B().CmsInitbyprob().Key(key).Error(errorRate).Probability(probability).Build() + return newStatusCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) CMSMerge(ctx context.Context, destKey string, sourceKeys ...string) *StatusCmd { + cmd := c.client.B().CmsMerge().Destination(destKey).Numkeys((int64)(len(sourceKeys))).Source(sourceKeys...).Build() + return newStatusCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) CMSMergeWithWeight(ctx context.Context, destKey string, sourceKeys map[string]int64) *StatusCmd { + _cmd := c.client.B().CmsMerge().Destination(destKey).Numkeys((int64)(len(sourceKeys))) + keys := make([]string, 0, len(sourceKeys)) + for k := range sourceKeys { + keys = append(keys, k) + } + for _, k := range keys { + _cmd.Source(k) + } + wCmd := (cmds.CmsMergeSource)(_cmd).Weights() + for _, k := range keys { + // weight should be integer + // we converts int64 to float64 to avoid API breaking change + wCmd.Weight((float64)(sourceKeys[k])) + } + cmd := (cmds.CmsMergeWeightWeight)(wCmd).Build() + return newStatusCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) CMSQuery(ctx context.Context, key string, elements ...interface{}) *IntSliceCmd { + _cmd := c.client.B().CmsQuery().Key(key) + for _, e := range elements { + _cmd.Item(str(e)) + } + cmd := (cmds.CmsQueryItem)(_cmd).Build() + return newIntSliceCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) TopKAdd(ctx context.Context, key string, elements ...interface{}) *StringSliceCmd { + _cmd := c.client.B().TopkAdd().Key(key) + for _, e := range elements { + _cmd.Items(str(e)) + } + cmd := (cmds.TopkAddItems)(_cmd).Build() + return newStringSliceCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) TopKCount(ctx context.Context, key string, elements ...interface{}) *IntSliceCmd { + _cmd := c.client.B().TopkCount().Key(key) + for _, e := range elements { + _cmd.Item(str(e)) + } + cmd := (cmds.TopkCountItem)(_cmd).Build() + return newIntSliceCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) TopKIncrBy(ctx context.Context, key string, elements ...interface{}) *StringSliceCmd { + _cmd := c.client.B().TopkIncrby().Key(key) + for _, e := range elements { + _cmd.Item(str(e)) + } + cmd := (cmds.TopkIncrbyItemsIncrement)(_cmd).Build() + return newStringSliceCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) TopKInfo(ctx context.Context, key string) *TopKInfoCmd { + cmd := c.client.B().TopkInfo().Key(key).Build() + return newTopKInfoCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) TopKList(ctx context.Context, key string) *StringSliceCmd { + cmd := c.client.B().TopkList().Key(key).Build() + return newStringSliceCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) TopKListWithCount(ctx context.Context, key string) *MapStringIntCmd { + cmd := c.client.B().TopkList().Key(key).Withcount().Build() + return newMapStringIntCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) TopKQuery(ctx context.Context, key string, elements ...interface{}) *BoolSliceCmd { + _cmd := c.client.B().TopkQuery().Key(key) + for _, e := range elements { + _cmd.Item(str(e)) + } + cmd := (cmds.TopkQueryItem)(_cmd).Build() + return newBoolSliceCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) TopKReserve(ctx context.Context, key string, k int64) *StatusCmd { + cmd := c.client.B().TopkReserve().Key(key).Topk(k).Build() + return newStatusCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) TopKReserveWithOptions(ctx context.Context, key string, k int64, width, depth int64, decay float64) *StatusCmd { + cmd := c.client.B().TopkReserve().Key(key).Topk(k).Width(width).Depth(depth).Decay(decay).Build() + return newStatusCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) TDigestAdd(ctx context.Context, key string, elements ...float64) *StatusCmd { + _cmd := c.client.B().TdigestAdd().Key(key) + for _, e := range elements { + _cmd.Value(e) + } + cmd := (cmds.TdigestAddValuesValue)(_cmd).Build() + return newStatusCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) TDigestByRank(ctx context.Context, key string, rank ...uint64) *FloatSliceCmd { + _cmd := c.client.B().TdigestByrank().Key(key) + for _, r := range rank { + _cmd.Rank((float64)(r)) + } + cmd := (cmds.TdigestByrankRank)(_cmd).Build() + return newFloatSliceCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) TDigestByRevRank(ctx context.Context, key string, rank ...uint64) *FloatSliceCmd { + _cmd := c.client.B().TdigestByrevrank().Key(key) + for _, r := range rank { + _cmd.ReverseRank((float64)(r)) + } + cmd := (cmds.TdigestByrevrankReverseRank)(_cmd).Build() + return newFloatSliceCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) TDigestCDF(ctx context.Context, key string, elements ...float64) *FloatSliceCmd { + cmd := c.client.B().TdigestCdf().Key(key).Value(elements...).Build() + return newFloatSliceCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) TDigestCreate(ctx context.Context, key string) *StatusCmd { + cmd := c.client.B().TdigestCreate().Key(key).Build() + return newStatusCmd(c.client.Do(ctx, cmd)) + +} +func (c *Compat) TDigestCreateWithCompression(ctx context.Context, key string, compression int64) *StatusCmd { + cmd := c.client.B().TdigestCreate().Key(key).Compression(compression).Build() + return newStatusCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) TDigestInfo(ctx context.Context, key string) *TDigestInfoCmd { + cmd := c.client.B().TdigestInfo().Key(key).Build() + return newTDigestInfoCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) TDigestMax(ctx context.Context, key string) *FloatCmd { + cmd := c.client.B().TdigestMax().Key(key).Build() + return newFloatCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) TDigestMin(ctx context.Context, key string) *FloatCmd { + cmd := c.client.B().TdigestMin().Key(key).Build() + return newFloatCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) TDigestMerge(ctx context.Context, destKey string, options *TDigestMergeOptions, sourceKeys ...string) *StatusCmd { + _cmd := c.client.B().TdigestMerge().DestinationKey(destKey).Numkeys(int64(len(sourceKeys))).SourceKey(sourceKeys...).Compression(options.Compression) + if options.Override { + _cmd.Override() + } + cmd := (cmds.TdigestMergeOverride)(_cmd).Build() + return newStatusCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) TDigestQuantile(ctx context.Context, key string, elements ...float64) *FloatSliceCmd { + cmd := c.client.B().TdigestQuantile().Key(key).Quantile(elements...).Build() + return newFloatSliceCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) TDigestRank(ctx context.Context, key string, values ...float64) *IntSliceCmd { + cmd := c.client.B().TdigestRank().Key(key).Value(values...).Build() + return newIntSliceCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) TDigestReset(ctx context.Context, key string) *StatusCmd { + cmd := c.client.B().TdigestReset().Key(key).Build() + return newStatusCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) TDigestRevRank(ctx context.Context, key string, values ...float64) *IntSliceCmd { + cmd := c.client.B().TdigestRevrank().Key(key).Value(values...).Build() + return newIntSliceCmd(c.client.Do(ctx, cmd)) +} + +func (c *Compat) TDigestTrimmedMean(ctx context.Context, key string, lowCutQuantile, highCutQuantile float64) *FloatCmd { + cmd := c.client.B().TdigestTrimmedMean().Key(key).LowCutQuantile(lowCutQuantile).HighCutQuantile(highCutQuantile).Build() + return newFloatCmd(c.client.Do(ctx, cmd)) +} + func (c CacheCompat) BitCount(ctx context.Context, key string, bitCount *BitCount) *IntCmd { var resp rueidis.RedisResult if bitCount == nil { @@ -3413,6 +3956,113 @@ func (c CacheCompat) ZScore(ctx context.Context, key, member string) *FloatCmd { return newFloatCmd(resp) } +func (c CacheCompat) BFExists(ctx context.Context, key string, element interface{}) *BoolCmd { + cmd := c.client.B().BfExists().Key(key).Item(str(element)).Cache() + resp := c.client.DoCache(ctx, cmd, c.ttl) + return newBoolCmd(resp) +} + +func (c CacheCompat) BFInfo(ctx context.Context, key string) *BFInfoCmd { + cmd := c.client.B().BfInfo().Key(key).Cache() + resp := c.client.DoCache(ctx, cmd, c.ttl) + return newBFInfoCmd(resp) +} + +func (c CacheCompat) BFInfoArg(ctx context.Context, key, option string) *BFInfoCmd { + switch option { + case "CAPACITY": + return c.BFInfoCapacity(ctx, key) + case "SIZE": + return c.BFInfoSize(ctx, key) + case "FILTERS": + return c.BFInfoFilters(ctx, key) + case "ITEMS": + return c.BFInfoItems(ctx, key) + case "EXPANSION": + return c.BFInfoExpansion(ctx, key) + default: + panic(fmt.Sprintf("unknown option %v", option)) + } +} + +func (c CacheCompat) BFInfoCapacity(ctx context.Context, key string) *BFInfoCmd { + cmd := c.client.B().BfInfo().Key(key).Capacity().Cache() + resp := c.client.DoCache(ctx, cmd, c.ttl) + return newBFInfoCmd(resp) +} + +func (c CacheCompat) BFInfoSize(ctx context.Context, key string) *BFInfoCmd { + cmd := c.client.B().BfInfo().Key(key).Size().Cache() + resp := c.client.DoCache(ctx, cmd, c.ttl) + return newBFInfoCmd(resp) +} + +func (c CacheCompat) BFInfoFilters(ctx context.Context, key string) *BFInfoCmd { + cmd := c.client.B().BfInfo().Key(key).Filters().Cache() + resp := c.client.DoCache(ctx, cmd, c.ttl) + return newBFInfoCmd(resp) +} + +func (c CacheCompat) BFInfoItems(ctx context.Context, key string) *BFInfoCmd { + cmd := c.client.B().BfInfo().Key(key).Items().Cache() + resp := c.client.DoCache(ctx, cmd, c.ttl) + return newBFInfoCmd(resp) +} + +func (c CacheCompat) BFInfoExpansion(ctx context.Context, key string) *BFInfoCmd { + cmd := c.client.B().BfInfo().Key(key).Expansion().Cache() + resp := c.client.DoCache(ctx, cmd, c.ttl) + return newBFInfoCmd(resp) +} + +func (c *CacheCompat) CFCount(ctx context.Context, key string, element interface{}) *IntCmd { + cmd := c.client.B().CfCount().Key(key).Item(str(element)).Cache() + return newIntCmd(c.client.DoCache(ctx, cmd, c.ttl)) +} + +func (c *CacheCompat) CFExists(ctx context.Context, key string, element interface{}) *BoolCmd { + cmd := c.client.B().CfExists().Key(key).Item(str(element)).Cache() + return newBoolCmd(c.client.DoCache(ctx, cmd, c.ttl)) +} + +func (c *CacheCompat) CFInfo(ctx context.Context, key string) *CFInfoCmd { + cmd := c.client.B().CfInfo().Key(key).Cache() + return newCFInfoCmd(c.client.DoCache(ctx, cmd, c.ttl)) +} + +func (c *CacheCompat) CMSInfo(ctx context.Context, key string) *CMSInfoCmd { + cmd := c.client.B().CmsInfo().Key(key).Cache() + return newCMSInfoCmd(c.client.DoCache(ctx, cmd, c.ttl)) +} + +func (c *CacheCompat) CMSQuery(ctx context.Context, key string, elements ...interface{}) *IntSliceCmd { + _cmd := c.client.B().CmsQuery().Key(key) + for _, e := range elements { + _cmd.Item(str(e)) + } + cmd := (cmds.CmsQueryItem)(_cmd).Cache() + return newIntSliceCmd(c.client.DoCache(ctx, cmd, c.ttl)) +} + +func (c *CacheCompat) TopKInfo(ctx context.Context, key string) *TopKInfoCmd { + cmd := c.client.B().TopkInfo().Key(key).Cache() + return newTopKInfoCmd(c.client.DoCache(ctx, cmd, c.ttl)) +} + +func (c *CacheCompat) TopKList(ctx context.Context, key string) *StringSliceCmd { + cmd := c.client.B().TopkList().Key(key).Cache() + return newStringSliceCmd(c.client.DoCache(ctx, cmd, c.ttl)) +} + +func (c *CacheCompat) TopKQuery(ctx context.Context, key string, elements ...interface{}) *BoolSliceCmd { + _cmd := c.client.B().TopkQuery().Key(key) + for _, e := range elements { + _cmd.Item(str(e)) + } + cmd := (cmds.TopkQueryItem)(_cmd).Cache() + return newBoolSliceCmd(c.client.DoCache(ctx, cmd, c.ttl)) +} + func str(arg any) string { if arg == nil { return "" diff --git a/rueidiscompat/adapter_test.go b/rueidiscompat/adapter_test.go index 57db2cea..dc8ddd42 100644 --- a/rueidiscompat/adapter_test.go +++ b/rueidiscompat/adapter_test.go @@ -30,6 +30,7 @@ import ( "context" "encoding/json" "fmt" + "math" "strconv" "strings" "testing" @@ -8811,6 +8812,719 @@ func testAdapterCache(resp3 bool) { Expect(resultAdd).To(BeEquivalentTo("bar")) }) }) + // https://github.com/redis/go-redis/blob/master/probabilistic_test.go#L14 + Describe("ProbabilisticCmdable", func() { + ctx := context.TODO() + BeforeEach(func() { + Expect(adapter.FlushDB(ctx).Err()).NotTo(HaveOccurred()) + }) + Describe("bloom", Label("bloom"), func() { + It("should BFAdd", Label("bloom", "bfadd"), func() { + resultAdd, err := adapter.BFAdd(ctx, "testbf1", 1).Result() + + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd).To(BeTrue()) + + resultInfo, err := adapter.BFInfo(ctx, "testbf1").Result() + + Expect(err).NotTo(HaveOccurred()) + Expect(resultInfo).To(BeAssignableToTypeOf(BFInfo{})) + Expect(resultInfo.ItemsInserted).To(BeEquivalentTo(int64(1))) + }) + + It("should BFCard", Label("bloom", "bfcard"), func() { + // This is a probabilistic data structure, and it's not always guaranteed that we will get back + // the exact number of inserted items, during hash collisions + // But with such a low number of items (only 3), + // the probability of a collision is very low, so we can expect to get back the exact number of items + _, err := adapter.BFAdd(ctx, "testbf1", "item1").Result() + Expect(err).NotTo(HaveOccurred()) + _, err = adapter.BFAdd(ctx, "testbf1", "item2").Result() + Expect(err).NotTo(HaveOccurred()) + _, err = adapter.BFAdd(ctx, "testbf1", 3).Result() + Expect(err).NotTo(HaveOccurred()) + + result, err := adapter.BFCard(ctx, "testbf1").Result() + + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeEquivalentTo(int64(3))) + }) + + It("should BFExists", Label("bloom", "bfexists"), func() { + exists, err := adapter.BFExists(ctx, "testbf1", "item1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(exists).To(BeFalse()) + + _, err = adapter.BFAdd(ctx, "testbf1", "item1").Result() + Expect(err).NotTo(HaveOccurred()) + + exists, err = adapter.BFExists(ctx, "testbf1", "item1").Result() + + Expect(err).NotTo(HaveOccurred()) + Expect(exists).To(BeTrue()) + }) + + It("should BFInfo and BFReserve", Label("bloom", "bfinfo", "bfreserve"), func() { + err := adapter.BFReserve(ctx, "testbf1", 0.001, 2000).Err() + Expect(err).NotTo(HaveOccurred()) + + result, err := adapter.BFInfo(ctx, "testbf1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeAssignableToTypeOf(BFInfo{})) + Expect(result.Capacity).To(BeEquivalentTo(int64(2000))) + }) + + It("should BFInfoCapacity, BFInfoSize, BFInfoFilters, BFInfoItems, BFInfoExpansion, ", Label("bloom", "bfinfocapacity", "bfinfosize", "bfinfofilters", "bfinfoitems", "bfinfoexpansion"), func() { + err := adapter.BFReserve(ctx, "testbf1", 0.001, 2000).Err() + Expect(err).NotTo(HaveOccurred()) + + result, err := adapter.BFInfoCapacity(ctx, "testbf1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result.Capacity).To(BeEquivalentTo(int64(2000))) + + result, err = adapter.BFInfoItems(ctx, "testbf1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result.ItemsInserted).To(BeEquivalentTo(int64(0))) + + result, err = adapter.BFInfoSize(ctx, "testbf1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result.Size).To(BeEquivalentTo(int64(4056))) + + err = adapter.BFReserveExpansion(ctx, "testbf2", 0.001, 2000, 3).Err() + Expect(err).NotTo(HaveOccurred()) + + result, err = adapter.BFInfoFilters(ctx, "testbf2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result.Filters).To(BeEquivalentTo(int64(1))) + + result, err = adapter.BFInfoExpansion(ctx, "testbf2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result.ExpansionRate).To(BeEquivalentTo(int64(3))) + }) + + It("should BFInsert", Label("bloom", "bfinsert"), func() { + options := &BFInsertOptions{ + Capacity: 2000, + Error: 0.001, + Expansion: 3, + NonScaling: false, + NoCreate: true, + } + + resultInsert, err := adapter.BFInsert(ctx, "testbf1", options, "item1").Result() + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("not found")) + + options = &BFInsertOptions{ + Capacity: 2000, + Error: 0.001, + Expansion: 3, + NonScaling: false, + NoCreate: false, + } + + resultInsert, err = adapter.BFInsert(ctx, "testbf1", options, "item1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(resultInsert)).To(BeEquivalentTo(1)) + + exists, err := adapter.BFExists(ctx, "testbf1", "item1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(exists).To(BeTrue()) + + result, err := adapter.BFInfo(ctx, "testbf1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeAssignableToTypeOf(BFInfo{})) + Expect(result.Capacity).To(BeEquivalentTo(int64(2000))) + Expect(result.ExpansionRate).To(BeEquivalentTo(int64(3))) + }) + + It("should BFMAdd", Label("bloom", "bfmadd"), func() { + resultAdd, err := adapter.BFMAdd(ctx, "testbf1", "item1", "item2", "item3").Result() + + Expect(err).NotTo(HaveOccurred()) + Expect(len(resultAdd)).To(Equal(3)) + + resultInfo, err := adapter.BFInfo(ctx, "testbf1").Result() + + Expect(err).NotTo(HaveOccurred()) + Expect(resultInfo).To(BeAssignableToTypeOf(BFInfo{})) + Expect(resultInfo.ItemsInserted).To(BeEquivalentTo(int64(3))) + resultAdd2, err := adapter.BFMAdd(ctx, "testbf1", "item1", "item2", "item4").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultAdd2[0]).To(BeFalse()) + Expect(resultAdd2[1]).To(BeFalse()) + Expect(resultAdd2[2]).To(BeTrue()) + }) + + It("should BFMExists", Label("bloom", "bfmexists"), func() { + exist, err := adapter.BFMExists(ctx, "testbf1", "item1", "item2", "item3").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(exist)).To(Equal(3)) + Expect(exist[0]).To(BeFalse()) + Expect(exist[1]).To(BeFalse()) + Expect(exist[2]).To(BeFalse()) + + _, err = adapter.BFMAdd(ctx, "testbf1", "item1", "item2", "item3").Result() + Expect(err).NotTo(HaveOccurred()) + + exist, err = adapter.BFMExists(ctx, "testbf1", "item1", "item2", "item3", "item4").Result() + + Expect(err).NotTo(HaveOccurred()) + Expect(len(exist)).To(Equal(4)) + Expect(exist[0]).To(BeTrue()) + Expect(exist[1]).To(BeTrue()) + Expect(exist[2]).To(BeTrue()) + Expect(exist[3]).To(BeFalse()) + }) + + It("should BFReserveExpansion", Label("bloom", "bfreserveexpansion"), func() { + err := adapter.BFReserveExpansion(ctx, "testbf1", 0.001, 2000, 3).Err() + Expect(err).NotTo(HaveOccurred()) + + result, err := adapter.BFInfo(ctx, "testbf1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeAssignableToTypeOf(BFInfo{})) + Expect(result.Capacity).To(BeEquivalentTo(int64(2000))) + Expect(result.ExpansionRate).To(BeEquivalentTo(int64(3))) + }) + + It("should BFReserveNonScaling", Label("bloom", "bfreservenonscaling"), func() { + err := adapter.BFReserveNonScaling(ctx, "testbfns1", 0.001, 1000).Err() + Expect(err).NotTo(HaveOccurred()) + + _, err = adapter.BFInfo(ctx, "testbfns1").Result() + Expect(err).NotTo(HaveOccurred()) + }) + + It("should BFScanDump and BFLoadChunk", Label("bloom", "bfscandump", "bfloadchunk"), func() { + err := adapter.BFReserve(ctx, "testbfsd1", 0.001, 3000).Err() + Expect(err).NotTo(HaveOccurred()) + for i := 0; i < 1000; i++ { + adapter.BFAdd(ctx, "testbfsd1", i) + } + infBefore := adapter.BFInfoSize(ctx, "testbfsd1") + fd := []ScanDump{} + sd, err := adapter.BFScanDump(ctx, "testbfsd1", 0).Result() + for { + if sd.Iter == 0 { + break + } + Expect(err).NotTo(HaveOccurred()) + fd = append(fd, sd) + sd, err = adapter.BFScanDump(ctx, "testbfsd1", sd.Iter).Result() + } + adapter.Del(ctx, "testbfsd1") + for _, e := range fd { + adapter.BFLoadChunk(ctx, "testbfsd1", e.Iter, e.Data) + } + infAfter := adapter.BFInfoSize(ctx, "testbfsd1") + Expect(infBefore).To(BeEquivalentTo(infAfter)) + }) + + It("should BFReserveWithArgs", Label("bloom", "bfreserveargs"), func() { + options := &BFReserveOptions{ + Capacity: 2000, + Error: 0.001, + Expansion: 3, + NonScaling: false, + } + err := adapter.BFReserveWithArgs(ctx, "testbf", options).Err() + Expect(err).NotTo(HaveOccurred()) + + result, err := adapter.BFInfo(ctx, "testbf").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeAssignableToTypeOf(BFInfo{})) + Expect(result.Capacity).To(BeEquivalentTo(int64(2000))) + Expect(result.ExpansionRate).To(BeEquivalentTo(int64(3))) + }) + }) + + Describe("cuckoo", Label("cuckoo"), func() { + It("should CFAdd", Label("cuckoo", "cfadd"), func() { + add, err := adapter.CFAdd(ctx, "testcf1", "item1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(add).To(BeTrue()) + + exists, err := adapter.CFExists(ctx, "testcf1", "item1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(exists).To(BeTrue()) + + info, err := adapter.CFInfo(ctx, "testcf1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(info).To(BeAssignableToTypeOf(CFInfo{})) + Expect(info.NumItemsInserted).To(BeEquivalentTo(int64(1))) + }) + + It("should CFAddNX", Label("cuckoo", "cfaddnx"), func() { + add, err := adapter.CFAddNX(ctx, "testcf1", "item1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(add).To(BeTrue()) + + exists, err := adapter.CFExists(ctx, "testcf1", "item1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(exists).To(BeTrue()) + + result, err := adapter.CFAddNX(ctx, "testcf1", "item1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeFalse()) + + info, err := adapter.CFInfo(ctx, "testcf1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(info).To(BeAssignableToTypeOf(CFInfo{})) + Expect(info.NumItemsInserted).To(BeEquivalentTo(int64(1))) + }) + + It("should CFCount", Label("cuckoo", "cfcount"), func() { + err := adapter.CFAdd(ctx, "testcf1", "item1").Err() + cnt, err := adapter.CFCount(ctx, "testcf1", "item1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(cnt).To(BeEquivalentTo(int64(1))) + + err = adapter.CFAdd(ctx, "testcf1", "item1").Err() + Expect(err).NotTo(HaveOccurred()) + + cnt, err = adapter.CFCount(ctx, "testcf1", "item1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(cnt).To(BeEquivalentTo(int64(2))) + }) + + It("should CFDel and CFExists", Label("cuckoo", "cfdel", "cfexists"), func() { + err := adapter.CFAdd(ctx, "testcf1", "item1").Err() + Expect(err).NotTo(HaveOccurred()) + + exists, err := adapter.CFExists(ctx, "testcf1", "item1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(exists).To(BeTrue()) + + del, err := adapter.CFDel(ctx, "testcf1", "item1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(del).To(BeTrue()) + + exists, err = adapter.CFExists(ctx, "testcf1", "item1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(exists).To(BeFalse()) + }) + + It("should CFInfo and CFReserve", Label("cuckoo", "cfinfo", "cfreserve"), func() { + err := adapter.CFReserve(ctx, "testcf1", 1000).Err() + Expect(err).NotTo(HaveOccurred()) + err = adapter.CFReserveExpansion(ctx, "testcfe1", 1000, 1).Err() + Expect(err).NotTo(HaveOccurred()) + err = adapter.CFReserveBucketSize(ctx, "testcfbs1", 1000, 4).Err() + Expect(err).NotTo(HaveOccurred()) + err = adapter.CFReserveMaxIterations(ctx, "testcfmi1", 1000, 10).Err() + Expect(err).NotTo(HaveOccurred()) + + result, err := adapter.CFInfo(ctx, "testcf1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeAssignableToTypeOf(CFInfo{})) + }) + + It("should CFScanDump and CFLoadChunk", Label("bloom", "cfscandump", "cfloadchunk"), func() { + err := adapter.CFReserve(ctx, "testcfsd1", 1000).Err() + Expect(err).NotTo(HaveOccurred()) + for i := 0; i < 1000; i++ { + Item := fmt.Sprintf("item%d", i) + adapter.CFAdd(ctx, "testcfsd1", Item) + } + infBefore := adapter.CFInfo(ctx, "testcfsd1") + fd := []ScanDump{} + sd, err := adapter.CFScanDump(ctx, "testcfsd1", 0).Result() + for { + if sd.Iter == 0 { + break + } + Expect(err).NotTo(HaveOccurred()) + fd = append(fd, sd) + sd, err = adapter.CFScanDump(ctx, "testcfsd1", sd.Iter).Result() + } + adapter.Del(ctx, "testcfsd1") + for _, e := range fd { + adapter.CFLoadChunk(ctx, "testcfsd1", e.Iter, e.Data) + } + infAfter := adapter.CFInfo(ctx, "testcfsd1") + Expect(infBefore).To(BeEquivalentTo(infAfter)) + }) + + It("should CFInfo and CFReserveWithArgs", Label("cuckoo", "cfinfo", "cfreserveargs"), func() { + args := &CFReserveOptions{ + Capacity: 2048, + BucketSize: 3, + MaxIterations: 15, + Expansion: 2, + } + + err := adapter.CFReserveWithArgs(ctx, "testcf1", args).Err() + Expect(err).NotTo(HaveOccurred()) + + result, err := adapter.CFInfo(ctx, "testcf1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(BeAssignableToTypeOf(CFInfo{})) + Expect(result.BucketSize).To(BeEquivalentTo(int64(3))) + Expect(result.MaxIteration).To(BeEquivalentTo(int64(15))) + Expect(result.ExpansionRate).To(BeEquivalentTo(int64(2))) + }) + + It("should CFInsert", Label("cuckoo", "cfinsert"), func() { + args := &CFInsertOptions{ + Capacity: 3000, + NoCreate: true, + } + + result, err := adapter.CFInsert(ctx, "testcf1", args, "item1", "item2", "item3").Result() + Expect(err).To(HaveOccurred()) + + args = &CFInsertOptions{ + Capacity: 3000, + NoCreate: false, + } + + result, err = adapter.CFInsert(ctx, "testcf1", args, "item1", "item2", "item3").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).To(BeEquivalentTo(3)) + }) + + It("should CFInsertNX", Label("cuckoo", "cfinsertnx"), func() { + args := &CFInsertOptions{ + Capacity: 3000, + NoCreate: true, + } + + result, err := adapter.CFInsertNX(ctx, "testcf1", args, "item1", "item2", "item2").Result() + Expect(err).To(HaveOccurred()) + + args = &CFInsertOptions{ + Capacity: 3000, + NoCreate: false, + } + + result, err = adapter.CFInsertNX(ctx, "testcf2", args, "item1", "item2", "item2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).To(BeEquivalentTo(3)) + Expect(result[0]).To(BeEquivalentTo(int64(1))) + Expect(result[1]).To(BeEquivalentTo(int64(1))) + Expect(result[2]).To(BeEquivalentTo(int64(0))) + }) + + It("should CFMexists", Label("cuckoo", "cfmexists"), func() { + err := adapter.CFInsert(ctx, "testcf1", nil, "item1", "item2", "item3").Err() + Expect(err).NotTo(HaveOccurred()) + + result, err := adapter.CFMExists(ctx, "testcf1", "item1", "item2", "item3", "item4").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).To(BeEquivalentTo(4)) + Expect(result[0]).To(BeTrue()) + Expect(result[1]).To(BeTrue()) + Expect(result[2]).To(BeTrue()) + Expect(result[3]).To(BeFalse()) + }) + }) + + Describe("CMS", Label("cms"), func() { + It("should CMSIncrBy", Label("cms", "cmsincrby"), func() { + err := adapter.CMSInitByDim(ctx, "testcms1", 5, 10).Err() + Expect(err).NotTo(HaveOccurred()) + + result, err := adapter.CMSIncrBy(ctx, "testcms1", "item1", 1, "item2", 2, "item3", 3).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).To(BeEquivalentTo(3)) + Expect(result[0]).To(BeEquivalentTo(int64(1))) + Expect(result[1]).To(BeEquivalentTo(int64(2))) + Expect(result[2]).To(BeEquivalentTo(int64(3))) + }) + + It("should CMSInitByDim and CMSInfo", Label("cms", "cmsinitbydim", "cmsinfo"), func() { + err := adapter.CMSInitByDim(ctx, "testcms1", 5, 10).Err() + Expect(err).NotTo(HaveOccurred()) + + info, err := adapter.CMSInfo(ctx, "testcms1").Result() + Expect(err).NotTo(HaveOccurred()) + + Expect(info).To(BeAssignableToTypeOf(CMSInfo{})) + Expect(info.Width).To(BeEquivalentTo(int64(5))) + Expect(info.Depth).To(BeEquivalentTo(int64(10))) + }) + + It("should CMSInitByProb", Label("cms", "cmsinitbyprob"), func() { + err := adapter.CMSInitByProb(ctx, "testcms1", 0.002, 0.01).Err() + Expect(err).NotTo(HaveOccurred()) + + info, err := adapter.CMSInfo(ctx, "testcms1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(info).To(BeAssignableToTypeOf(CMSInfo{})) + }) + + It("should CMSMerge, CMSMergeWithWeight and CMSQuery", Label("cms", "cmsmerge", "cmsquery"), func() { + err := adapter.CMSMerge(ctx, "destCms1", "testcms2", "testcms3").Err() + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("CMS: key does not exist")) + + err = adapter.CMSInitByDim(ctx, "destCms1", 5, 10).Err() + Expect(err).NotTo(HaveOccurred()) + err = adapter.CMSInitByDim(ctx, "destCms2", 5, 10).Err() + Expect(err).NotTo(HaveOccurred()) + err = adapter.CMSInitByDim(ctx, "cms1", 2, 20).Err() + Expect(err).NotTo(HaveOccurred()) + err = adapter.CMSInitByDim(ctx, "cms2", 3, 20).Err() + Expect(err).NotTo(HaveOccurred()) + + err = adapter.CMSMerge(ctx, "destCms1", "cms1", "cms2").Err() + Expect(err).To(MatchError("CMS: width/depth is not equal")) + + adapter.Del(ctx, "cms1", "cms2") + + err = adapter.CMSInitByDim(ctx, "cms1", 5, 10).Err() + Expect(err).NotTo(HaveOccurred()) + err = adapter.CMSInitByDim(ctx, "cms2", 5, 10).Err() + Expect(err).NotTo(HaveOccurred()) + + adapter.CMSIncrBy(ctx, "cms1", "item1", 1, "item2", 2) + adapter.CMSIncrBy(ctx, "cms2", "item2", 2, "item3", 3) + + err = adapter.CMSMerge(ctx, "destCms1", "cms1", "cms2").Err() + Expect(err).NotTo(HaveOccurred()) + + result, err := adapter.CMSQuery(ctx, "destCms1", "item1", "item2", "item3").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).To(BeEquivalentTo(3)) + Expect(result[0]).To(BeEquivalentTo(int64(1))) + Expect(result[1]).To(BeEquivalentTo(int64(4))) + Expect(result[2]).To(BeEquivalentTo(int64(3))) + + sourceSketches := map[string]int64{ + "cms1": 1, + "cms2": 2, + } + err = adapter.CMSMergeWithWeight(ctx, "destCms2", sourceSketches).Err() + Expect(err).NotTo(HaveOccurred()) + + result, err = adapter.CMSQuery(ctx, "destCms2", "item1", "item2", "item3").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(result)).To(BeEquivalentTo(3)) + Expect(result[0]).To(BeEquivalentTo(int64(1))) + Expect(result[1]).To(BeEquivalentTo(int64(6))) + Expect(result[2]).To(BeEquivalentTo(int64(6))) + }) + }) + + Describe("TopK", Label("topk"), func() { + It("should TopKReserve, TopKInfo, TopKAdd, TopKQuery, TopKCount, TopKIncrBy, TopKList, TopKListWithCount", Label("topk", "topkreserve", "topkinfo", "topkadd", "topkquery", "topkcount", "topkincrby", "topklist", "topklistwithcount"), func() { + err := adapter.TopKReserve(ctx, "topk1", 3).Err() + Expect(err).NotTo(HaveOccurred()) + + resultInfo, err := adapter.TopKInfo(ctx, "topk1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultInfo.K).To(BeEquivalentTo(int64(3))) + + resultAdd, err := adapter.TopKAdd(ctx, "topk1", "item1", "item2", 3, "item1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(resultAdd)).To(BeEquivalentTo(int64(4))) + + resultQuery, err := adapter.TopKQuery(ctx, "topk1", "item1", "item2", 4, 3).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(resultQuery)).To(BeEquivalentTo(4)) + Expect(resultQuery[0]).To(BeTrue()) + Expect(resultQuery[1]).To(BeTrue()) + Expect(resultQuery[2]).To(BeFalse()) + Expect(resultQuery[3]).To(BeTrue()) + + resultCount, err := adapter.TopKCount(ctx, "topk1", "item1", "item2", "item3").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(resultCount)).To(BeEquivalentTo(3)) + Expect(resultCount[0]).To(BeEquivalentTo(int64(2))) + Expect(resultCount[1]).To(BeEquivalentTo(int64(1))) + Expect(resultCount[2]).To(BeEquivalentTo(int64(0))) + + resultIncr, err := adapter.TopKIncrBy(ctx, "topk1", "item1", 5, "item2", 10).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(resultIncr)).To(BeEquivalentTo(2)) + + resultCount, err = adapter.TopKCount(ctx, "topk1", "item1", "item2", "item3").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(resultCount)).To(BeEquivalentTo(3)) + Expect(resultCount[0]).To(BeEquivalentTo(int64(7))) + Expect(resultCount[1]).To(BeEquivalentTo(int64(11))) + Expect(resultCount[2]).To(BeEquivalentTo(int64(0))) + + resultList, err := adapter.TopKList(ctx, "topk1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(resultList)).To(BeEquivalentTo(3)) + Expect(resultList).To(ContainElements("item2", "item1", "3")) + + resultListWithCount, err := adapter.TopKListWithCount(ctx, "topk1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(resultListWithCount)).To(BeEquivalentTo(3)) + Expect(resultListWithCount["3"]).To(BeEquivalentTo(int64(1))) + Expect(resultListWithCount["item1"]).To(BeEquivalentTo(int64(7))) + Expect(resultListWithCount["item2"]).To(BeEquivalentTo(int64(11))) + }) + + It("should TopKReserveWithOptions", Label("topk", "topkreservewithoptions"), func() { + err := adapter.TopKReserveWithOptions(ctx, "topk1", 3, 1500, 8, 0.5).Err() + Expect(err).NotTo(HaveOccurred()) + + resultInfo, err := adapter.TopKInfo(ctx, "topk1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resultInfo.K).To(BeEquivalentTo(int64(3))) + Expect(resultInfo.Width).To(BeEquivalentTo(int64(1500))) + Expect(resultInfo.Depth).To(BeEquivalentTo(int64(8))) + Expect(resultInfo.Decay).To(BeEquivalentTo(0.5)) + }) + }) + + Describe("t-digest", Label("tdigest"), func() { + It("should TDigestAdd, TDigestCreate, TDigestInfo, TDigestByRank, TDigestByRevRank, TDigestCDF, TDigestMax, TDigestMin, TDigestQuantile, TDigestRank, TDigestRevRank, TDigestTrimmedMean, TDigestReset, ", Label("tdigest", "tdigestadd", "tdigestcreate", "tdigestinfo", "tdigestbyrank", "tdigestbyrevrank", "tdigestcdf", "tdigestmax", "tdigestmin", "tdigestquantile", "tdigestrank", "tdigestrevrank", "tdigesttrimmedmean", "tdigestreset"), func() { + err := adapter.TDigestCreate(ctx, "tdigest1").Err() + Expect(err).NotTo(HaveOccurred()) + + info, err := adapter.TDigestInfo(ctx, "tdigest1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(info.Observations).To(BeEquivalentTo(int64(0))) + + // Test with empty sketch + byRank, err := adapter.TDigestByRank(ctx, "tdigest1", 0, 1, 2, 3).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(byRank)).To(BeEquivalentTo(4)) + + byRevRank, err := adapter.TDigestByRevRank(ctx, "tdigest1", 0, 1, 2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(byRevRank)).To(BeEquivalentTo(3)) + + cdf, err := adapter.TDigestCDF(ctx, "tdigest1", 15, 35, 70).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(cdf)).To(BeEquivalentTo(3)) + + max, err := adapter.TDigestMax(ctx, "tdigest1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(math.IsNaN(max)).To(BeTrue()) + + min, err := adapter.TDigestMin(ctx, "tdigest1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(math.IsNaN(min)).To(BeTrue()) + + quantile, err := adapter.TDigestQuantile(ctx, "tdigest1", 0.1, 0.2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(quantile)).To(BeEquivalentTo(2)) + + rank, err := adapter.TDigestRank(ctx, "tdigest1", 10, 20).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(rank)).To(BeEquivalentTo(2)) + + revRank, err := adapter.TDigestRevRank(ctx, "tdigest1", 10, 20).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(revRank)).To(BeEquivalentTo(2)) + + trimmedMean, err := adapter.TDigestTrimmedMean(ctx, "tdigest1", 0.1, 0.6).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(math.IsNaN(trimmedMean)).To(BeTrue()) + + // Add elements + err = adapter.TDigestAdd(ctx, "tdigest1", 10, 20, 30, 40, 50, 60, 70, 80, 90, 100).Err() + Expect(err).NotTo(HaveOccurred()) + + info, err = adapter.TDigestInfo(ctx, "tdigest1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(info.Observations).To(BeEquivalentTo(int64(10))) + + byRank, err = adapter.TDigestByRank(ctx, "tdigest1", 0, 1, 2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(byRank)).To(BeEquivalentTo(3)) + Expect(byRank[0]).To(BeEquivalentTo(float64(10))) + Expect(byRank[1]).To(BeEquivalentTo(float64(20))) + Expect(byRank[2]).To(BeEquivalentTo(float64(30))) + + byRevRank, err = adapter.TDigestByRevRank(ctx, "tdigest1", 0, 1, 2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(byRevRank)).To(BeEquivalentTo(3)) + Expect(byRevRank[0]).To(BeEquivalentTo(float64(100))) + Expect(byRevRank[1]).To(BeEquivalentTo(float64(90))) + Expect(byRevRank[2]).To(BeEquivalentTo(float64(80))) + + cdf, err = adapter.TDigestCDF(ctx, "tdigest1", 15, 35, 70).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(cdf)).To(BeEquivalentTo(3)) + Expect(cdf[0]).To(BeEquivalentTo(0.1)) + Expect(cdf[1]).To(BeEquivalentTo(0.3)) + Expect(cdf[2]).To(BeEquivalentTo(0.65)) + + max, err = adapter.TDigestMax(ctx, "tdigest1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(max).To(BeEquivalentTo(float64(100))) + + min, err = adapter.TDigestMin(ctx, "tdigest1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(min).To(BeEquivalentTo(float64(10))) + + quantile, err = adapter.TDigestQuantile(ctx, "tdigest1", 0.1, 0.2).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(quantile)).To(BeEquivalentTo(2)) + Expect(quantile[0]).To(BeEquivalentTo(float64(20))) + Expect(quantile[1]).To(BeEquivalentTo(float64(30))) + + rank, err = adapter.TDigestRank(ctx, "tdigest1", 10, 20).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(rank)).To(BeEquivalentTo(2)) + Expect(rank[0]).To(BeEquivalentTo(int64(0))) + Expect(rank[1]).To(BeEquivalentTo(int64(1))) + + revRank, err = adapter.TDigestRevRank(ctx, "tdigest1", 10, 20).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(len(revRank)).To(BeEquivalentTo(2)) + Expect(revRank[0]).To(BeEquivalentTo(int64(9))) + Expect(revRank[1]).To(BeEquivalentTo(int64(8))) + + trimmedMean, err = adapter.TDigestTrimmedMean(ctx, "tdigest1", 0.1, 0.6).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(trimmedMean).To(BeEquivalentTo(float64(40))) + + reset, err := adapter.TDigestReset(ctx, "tdigest1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(reset).To(BeEquivalentTo("OK")) + }) + + It("should TDigestCreateWithCompression", Label("tdigest", "tcreatewithcompression"), func() { + err := adapter.TDigestCreateWithCompression(ctx, "tdigest1", 2000).Err() + Expect(err).NotTo(HaveOccurred()) + + info, err := adapter.TDigestInfo(ctx, "tdigest1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(info.Compression).To(BeEquivalentTo(int64(2000))) + }) + + It("should TDigestMerge", Label("tdigest", "tmerge"), func() { + err := adapter.TDigestCreate(ctx, "tdigest1").Err() + Expect(err).NotTo(HaveOccurred()) + err = adapter.TDigestAdd(ctx, "tdigest1", 10, 20, 30, 40, 50, 60, 70, 80, 90, 100).Err() + Expect(err).NotTo(HaveOccurred()) + + err = adapter.TDigestCreate(ctx, "tdigest2").Err() + Expect(err).NotTo(HaveOccurred()) + err = adapter.TDigestAdd(ctx, "tdigest2", 15, 25, 35, 45, 55, 65, 75, 85, 95, 105).Err() + Expect(err).NotTo(HaveOccurred()) + + err = adapter.TDigestCreate(ctx, "tdigest3").Err() + Expect(err).NotTo(HaveOccurred()) + err = adapter.TDigestAdd(ctx, "tdigest3", 50, 60, 70, 80, 90, 100, 110, 120, 130, 140).Err() + Expect(err).NotTo(HaveOccurred()) + + options := &TDigestMergeOptions{ + Compression: 1000, + Override: false, + } + err = adapter.TDigestMerge(ctx, "tdigest1", options, "tdigest2", "tdigest3").Err() + Expect(err).NotTo(HaveOccurred()) + + info, err := adapter.TDigestInfo(ctx, "tdigest1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(info.Observations).To(BeEquivalentTo(int64(30))) + Expect(info.Compression).To(BeEquivalentTo(int64(1000))) + + max, err := adapter.TDigestMax(ctx, "tdigest1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(max).To(BeEquivalentTo(float64(140))) + }) + }) + }) } func libCode(libName string) string { diff --git a/rueidiscompat/command.go b/rueidiscompat/command.go index 7044a135..657009cc 100644 --- a/rueidiscompat/command.go +++ b/rueidiscompat/command.go @@ -2860,3 +2860,313 @@ func newMapStringInterfaceSliceCmd(res rueidis.RedisResult) *MapStringInterfaceS } return out } + +type baseCmd[T any] struct { + err error + val T +} + +func (cmd *baseCmd[T]) SetVal(val T) { + cmd.val = val +} + +func (cmd *baseCmd[T]) Val() T { + return cmd.val +} + +func (cmd *baseCmd[T]) Err() error { + return cmd.err +} + +func (cmd *baseCmd[T]) Result() (T, error) { + return cmd.Val(), cmd.Err() +} + +type BFInsertOptions struct { + Capacity int64 + Error float64 + Expansion int64 + NonScaling bool + NoCreate bool +} + +type BFReserveOptions struct { + Capacity int64 + Error float64 + Expansion int64 + NonScaling bool +} + +type CFReserveOptions struct { + Capacity int64 + BucketSize int64 + MaxIterations int64 + Expansion int64 +} + +type CFInsertOptions struct { + Capacity int64 + NoCreate bool +} + +type BFInfo struct { + Capacity int64 `redis:"Capacity"` + Size int64 `redis:"Size"` + Filters int64 `redis:"Number of filters"` + ItemsInserted int64 `redis:"Number of items inserted"` + ExpansionRate int64 `redis:"Expansion rate"` +} + +type BFInfoCmd struct { + baseCmd[BFInfo] +} + +func newBFInfoCmd(res rueidis.RedisResult) *BFInfoCmd { + cmd := &BFInfoCmd{} + info := BFInfo{} + if err := res.Error(); err != nil { + cmd.err = err + return cmd + } + m, err := res.AsIntMap() + if err != nil { + cmd.err = err + return cmd + } + keys := make([]string, 0, len(m)) + values := make([]any, 0, len(m)) + for k, v := range m { + keys = append(keys, k) + values = append(values, strconv.FormatInt(v, 10)) + } + if err := Scan(&info, keys, values); err != nil { + cmd.err = err + return cmd + } + cmd.SetVal(info) + return cmd +} + +type ScanDump struct { + Iter int64 + Data string +} + +type ScanDumpCmd struct { + baseCmd[ScanDump] +} + +func newScanDumpCmd(res rueidis.RedisResult) *ScanDumpCmd { + cmd := &ScanDumpCmd{} + scanDump := ScanDump{} + if err := res.Error(); err != nil { + cmd.err = err + return cmd + } + arr, err := res.ToArray() + if err != nil { + cmd.err = err + return cmd + } + if len(arr) != 2 { + panic(fmt.Sprintf("wrong length of redis message, got %v, want %v", len(arr), 2)) + } + iter, err := arr[0].AsInt64() + if err != nil { + cmd.err = err + return cmd + } + data, err := arr[1].ToString() + if err != nil { + cmd.err = err + return cmd + } + scanDump.Iter = iter + scanDump.Data = data + cmd.SetVal(scanDump) + return cmd +} + +type CFInfo struct { + Size int64 `redis:"Size"` + NumBuckets int64 `redis:"Number of buckets"` + NumFilters int64 `redis:"Number of filters"` + NumItemsInserted int64 `redis:"Number of items inserted"` + NumItemsDeleted int64 `redis:"Number of items deleted"` + BucketSize int64 `redis:"Bucket size"` + ExpansionRate int64 `redis:"Expansion rate"` + MaxIteration int64 `redis:"Max iterations"` +} + +type CFInfoCmd struct { + baseCmd[CFInfo] +} + +func newCFInfoCmd(res rueidis.RedisResult) *CFInfoCmd { + cmd := &CFInfoCmd{} + info := CFInfo{} + m, err := res.AsMap() + if err != nil { + cmd.err = err + return cmd + } + keys := make([]string, 0, len(m)) + values := make([]any, 0, len(m)) + for k, v := range m { + keys = append(keys, k) + val, err := v.AsInt64() + if err != nil { + cmd.err = err + return cmd + } + values = append(values, strconv.FormatInt(val, 10)) + } + if err := Scan(&info, keys, values); err != nil { + cmd.err = err + return cmd + } + cmd.SetVal(info) + return cmd +} + +type CMSInfo struct { + Width int64 `redis:"width"` + Depth int64 `redis:"depth"` + Count int64 `redis:"count"` +} + +type CMSInfoCmd struct { + baseCmd[CMSInfo] +} + +func newCMSInfoCmd(res rueidis.RedisResult) *CMSInfoCmd { + cmd := &CMSInfoCmd{} + info := CMSInfo{} + m, err := res.AsIntMap() + if err != nil { + cmd.err = err + return cmd + } + keys := make([]string, 0, len(m)) + values := make([]any, 0, len(m)) + for k, v := range m { + keys = append(keys, k) + values = append(values, strconv.FormatInt(v, 10)) + } + if err := Scan(&info, keys, values); err != nil { + cmd.err = err + return cmd + } + cmd.SetVal(info) + return cmd +} + +type TopKInfo struct { + K int64 `redis:"k"` + Width int64 `redis:"width"` + Depth int64 `redis:"depth"` + Decay float64 `redis:"decay"` +} + +type TopKInfoCmd struct { + baseCmd[TopKInfo] +} + +func newTopKInfoCmd(res rueidis.RedisResult) *TopKInfoCmd { + cmd := &TopKInfoCmd{} + info := TopKInfo{} + m, err := res.ToMap() + if err != nil { + cmd.err = err + return cmd + } + keys := make([]string, 0, len(m)) + values := make([]any, 0, len(m)) + for k, v := range m { + keys = append(keys, k) + switch k { + case "k", "width", "depth": + intVal, err := v.AsInt64() + if err != nil { + cmd.err = err + return cmd + } + values = append(values, strconv.FormatInt(intVal, 10)) + case "decay": + decay, err := v.AsFloat64() + if err != nil { + cmd.err = err + return cmd + } + // args of strconv.FormatFloat is copied from cmds.TopkReserveParamsDepth.Decay + values = append(values, strconv.FormatFloat(decay, 'f', -1, 64)) + default: + panic("unexpected key") + } + } + if err := Scan(&info, keys, values); err != nil { + cmd.err = err + return cmd + } + cmd.SetVal(info) + return cmd +} + +type MapStringIntCmd struct { + baseCmd[map[string]int64] +} + +func newMapStringIntCmd(res rueidis.RedisResult) *MapStringIntCmd { + cmd := &MapStringIntCmd{} + m, err := res.AsIntMap() + if err != nil { + cmd.err = err + return cmd + } + cmd.SetVal(m) + return cmd +} + +// Ref: https://redis.io/commands/tdigest.info/ +type TDigestInfo struct { + Compression int64 `redis:"Compression"` + Capacity int64 `redis:"Capacity"` + MergedNodes int64 `redis:"Merged nodes"` + UnmergedNodes int64 `redis:"UnmergedNodes"` + MergedWeight int64 `redis:"MergedWeight"` + UnmergedWeight int64 `redis:"Unmerged weight"` + Observations int64 `redis:"Observations"` + TotalCompressions int64 `redis:"Total compressions"` + MemoryUsage int64 `redis:"Memory usage"` +} + +type TDigestInfoCmd struct { + baseCmd[TDigestInfo] +} + +func newTDigestInfoCmd(res rueidis.RedisResult) *TDigestInfoCmd { + cmd := &TDigestInfoCmd{} + info := TDigestInfo{} + m, err := res.AsIntMap() + if err != nil { + cmd.err = err + return cmd + } + keys := make([]string, 0, len(m)) + values := make([]any, 0, len(m)) + for k, v := range m { + keys = append(keys, k) + values = append(values, strconv.FormatInt(v, 10)) + } + if err := Scan(&info, keys, values); err != nil { + cmd.err = err + return cmd + } + cmd.SetVal(info) + return cmd +} + +type TDigestMergeOptions struct { + Compression int64 + Override bool +} diff --git a/rueidiscompat/hscan.go b/rueidiscompat/hscan.go index 8a3c531f..a0401401 100644 --- a/rueidiscompat/hscan.go +++ b/rueidiscompat/hscan.go @@ -99,6 +99,7 @@ func Struct(dst interface{}) (StructValue, error) { // Scan scans the results from a key-value Redis map result set to a destination struct. // The Redis keys are matched to the struct's field with the `redis` tag. +// NOTE: vals' element's underlying type should be string func Scan(dst interface{}, keys []string, vals []interface{}) error { if len(keys) != len(vals) { return errors.New("args should have the same number of keys and vals")