Skip to content

Commit

Permalink
Export private fields and add Recordset.BVal
Browse files Browse the repository at this point in the history
  • Loading branch information
khaf committed Mar 11, 2024
1 parent bdaabc2 commit 933ff2d
Show file tree
Hide file tree
Showing 11 changed files with 47 additions and 42 deletions.
4 changes: 2 additions & 2 deletions cdt_bitwise.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,12 +481,12 @@ func packCDTBitIfcVarParamsAsArray(packer BufferEx, opType int16, ctx []*CDTCont
size += n

for _, c := range ctx {
if n, err = packAInt64(packer, int64(c.id)); err != nil {
if n, err = packAInt64(packer, int64(c.Id)); err != nil {
return size + n, err
}
size += n

if n, err = c.value.pack(packer); err != nil {
if n, err = c.Value.pack(packer); err != nil {
return size + n, err
}
size += n
Expand Down
12 changes: 6 additions & 6 deletions cdt_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ const (
// An array of CTX identifies location of the list/map on multiple
// levels on nesting.
type CDTContext struct {
id int
value Value
Id int
Value Value
}

// CDTContextToBase64 converts a []*CDTContext into a base64 encoded string.
Expand Down Expand Up @@ -78,26 +78,26 @@ func Base64ToCDTContext(b64 string) ([]*CDTContext, Error) {

res := make([]*CDTContext, 0, len(list)/2)
for i := 0; i < len(list); i += 2 {
res = append(res, &CDTContext{id: list[i].(int), value: NewValue(list[i+1])})
res = append(res, &CDTContext{Id: list[i].(int), Value: NewValue(list[i+1])})
}

return res, nil
}

// String implements the Stringer interface for CDTContext
func (ctx *CDTContext) String() string {
return fmt.Sprintf("CDTContext{id: %d, value: %s}", ctx.id, ctx.value.String())
return fmt.Sprintf("CDTContext{id: %d, value: %s}", ctx.Id, ctx.Value.String())
}

func (ctx *CDTContext) pack(cmd BufferEx) (int, Error) {
size := 0
sz, err := packAInt64(cmd, int64(ctx.id))
sz, err := packAInt64(cmd, int64(ctx.Id))
size += sz
if err != nil {
return size, err
}

sz, err = ctx.value.pack(cmd)
sz, err = ctx.Value.pack(cmd)
size += sz

return size, err
Expand Down
16 changes: 8 additions & 8 deletions cdt_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,12 @@ func packCDTParamsAsArray(packer BufferEx, opType int16, ctx []*CDTContext, para
size += n

for _, c := range ctx {
if n, err = packAInt64(packer, int64(c.id)); err != nil {
if n, err = packAInt64(packer, int64(c.Id)); err != nil {
return size + n, err
}
size += n

if n, err = c.value.pack(packer); err != nil {
if n, err = c.Value.pack(packer); err != nil {
return size + n, err
}
size += n
Expand Down Expand Up @@ -289,12 +289,12 @@ func packCDTIfcVarParamsAsArray(packer BufferEx, opType int16, ctx []*CDTContext
size += n

for _, c := range ctx {
if n, err = packAInt64(packer, int64(c.id)); err != nil {
if n, err = packAInt64(packer, int64(c.Id)); err != nil {
return size + n, err
}
size += n

if n, err = c.value.pack(packer); err != nil {
if n, err = c.Value.pack(packer); err != nil {
return size + n, err
}
size += n
Expand Down Expand Up @@ -368,24 +368,24 @@ func packCDTCreate(packer BufferEx, opType int16, ctx []*CDTContext, flag int, p

for i := 0; i < last; i++ {
c = ctx[i]
if n, err = packAInt64(packer, int64(c.id)); err != nil {
if n, err = packAInt64(packer, int64(c.Id)); err != nil {
return size + n, err
}
size += n

if n, err = c.value.pack(packer); err != nil {
if n, err = c.Value.pack(packer); err != nil {
return size + n, err
}
size += n
}

c = ctx[last]
if n, err = packAInt64(packer, int64(c.id|flag)); err != nil {
if n, err = packAInt64(packer, int64(c.Id|flag)); err != nil {
return size + n, err
}
size += n

if n, err = c.value.pack(packer); err != nil {
if n, err = c.Value.pack(packer); err != nil {
return size + n, err
}
size += n
Expand Down
2 changes: 1 addition & 1 deletion command.go
Original file line number Diff line number Diff line change
Expand Up @@ -1844,7 +1844,7 @@ func (cmd *baseCommand) setQuery(policy *QueryPolicy, wpolicy *WritePolicy, stat

if statement.functionName != "" {
cmd.writeFieldHeader(1, UDF_OP)
if statement.returnData {
if statement.ReturnData {
cmd.dataBuffer[cmd.dataOffset] = byte(1)
} else {
cmd.dataBuffer[cmd.dataOffset] = byte(2)
Expand Down
2 changes: 1 addition & 1 deletion exp_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ func cdtListAddWrite(
var returnType ExpType
if len(ctx) == 0 {
returnType = ExpTypeLIST
} else if (ctx[0].id & ctxTypeListIndex) == 0 {
} else if (ctx[0].Id & ctxTypeListIndex) == 0 {
returnType = ExpTypeMAP
} else {
returnType = ExpTypeLIST
Expand Down
2 changes: 1 addition & 1 deletion exp_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ func expMapAddWrite(
var returnType ExpType
if len(ctx) == 0 {
returnType = ExpTypeMAP
} else if (ctx[0].id & ctxTypeListIndex) == 0 {
} else if (ctx[0].Id & ctxTypeListIndex) == 0 {
returnType = ExpTypeMAP
} else {
returnType = ExpTypeLIST
Expand Down
22 changes: 11 additions & 11 deletions filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,76 +35,76 @@ type Filter struct {
// Value can be an integer, string or a blob (byte array). Byte arrays are only supported on server v7+.
func NewEqualFilter(binName string, value interface{}, ctx ...*CDTContext) *Filter {
val := NewValue(value)
return newFilter(binName, ICT_DEFAULT, val.GetType(), val, val, ctx)
return NewFilter(binName, ICT_DEFAULT, val.GetType(), val, val, ctx)
}

// NewRangeFilter creates a range filter for query.
// Range arguments must be int64 values.
// String ranges are not supported.
func NewRangeFilter(binName string, begin int64, end int64, ctx ...*CDTContext) *Filter {
vBegin, vEnd := NewValue(begin), NewValue(end)
return newFilter(binName, ICT_DEFAULT, vBegin.GetType(), vBegin, vEnd, ctx)
return NewFilter(binName, ICT_DEFAULT, vBegin.GetType(), vBegin, vEnd, ctx)
}

// NewContainsFilter creates a contains filter for query on collection index.
// Value can be an integer, string or a blob (byte array). Byte arrays are only supported on server v7+.
func NewContainsFilter(binName string, indexCollectionType IndexCollectionType, value interface{}, ctx ...*CDTContext) *Filter {
v := NewValue(value)
return newFilter(binName, indexCollectionType, v.GetType(), v, v, ctx)
return NewFilter(binName, indexCollectionType, v.GetType(), v, v, ctx)
}

// NewContainsRangeFilter creates a contains filter for query on ranges of data in a collection index.
func NewContainsRangeFilter(binName string, indexCollectionType IndexCollectionType, begin, end int64, ctx ...*CDTContext) *Filter {
vBegin, vEnd := NewValue(begin), NewValue(end)
return newFilter(binName, indexCollectionType, vBegin.GetType(), vBegin, vEnd, ctx)
return NewFilter(binName, indexCollectionType, vBegin.GetType(), vBegin, vEnd, ctx)
}

// NewGeoWithinRegionFilter creates a geospatial "within region" filter for query.
// Argument must be a valid GeoJSON region.
func NewGeoWithinRegionFilter(binName, region string, ctx ...*CDTContext) *Filter {
v := NewStringValue(region)
return newFilter(binName, ICT_DEFAULT, ParticleType.GEOJSON, v, v, ctx)
return NewFilter(binName, ICT_DEFAULT, ParticleType.GEOJSON, v, v, ctx)
}

// NewGeoWithinRegionForCollectionFilter creates a geospatial "within region" filter for query on collection index.
// Argument must be a valid GeoJSON region.
func NewGeoWithinRegionForCollectionFilter(binName string, collectionType IndexCollectionType, region string, ctx ...*CDTContext) *Filter {
v := NewStringValue(region)
return newFilter(binName, collectionType, ParticleType.GEOJSON, v, v, ctx)
return NewFilter(binName, collectionType, ParticleType.GEOJSON, v, v, ctx)
}

// NewGeoRegionsContainingPointFilter creates a geospatial "containing point" filter for query.
// Argument must be a valid GeoJSON point.
func NewGeoRegionsContainingPointFilter(binName, point string, ctx ...*CDTContext) *Filter {
v := NewStringValue(point)
return newFilter(binName, ICT_DEFAULT, ParticleType.GEOJSON, v, v, ctx)
return NewFilter(binName, ICT_DEFAULT, ParticleType.GEOJSON, v, v, ctx)
}

// NewGeoRegionsContainingPointForCollectionFilter creates a geospatial "containing point" filter for query on collection index.
// Argument must be a valid GeoJSON point.
func NewGeoRegionsContainingPointForCollectionFilter(binName string, collectionType IndexCollectionType, point string, ctx ...*CDTContext) *Filter {
v := NewStringValue(point)
return newFilter(binName, collectionType, ParticleType.GEOJSON, v, v, ctx)
return NewFilter(binName, collectionType, ParticleType.GEOJSON, v, v, ctx)
}

// NewGeoWithinRadiusFilter creates a geospatial "within radius" filter for query.
// Arguments must be valid longitude/latitude/radius (meters) values.
func NewGeoWithinRadiusFilter(binName string, lng, lat, radius float64, ctx ...*CDTContext) *Filter {
rgnStr := fmt.Sprintf("{ \"type\": \"AeroCircle\", "+"\"coordinates\": [[%.8f, %.8f], %f] }", lng, lat, radius)
return newFilter(binName, ICT_DEFAULT, ParticleType.GEOJSON, NewValue(rgnStr), NewValue(rgnStr), ctx)
return NewFilter(binName, ICT_DEFAULT, ParticleType.GEOJSON, NewValue(rgnStr), NewValue(rgnStr), ctx)
}

// NewGeoWithinRadiusForCollectionFilter creates a geospatial "within radius" filter for query on collection index.
// Arguments must be valid longitude/latitude/radius (meters) values.
func NewGeoWithinRadiusForCollectionFilter(binName string, collectionType IndexCollectionType, lng, lat, radius float64, ctx ...*CDTContext) *Filter {
rgnStr := fmt.Sprintf("{ \"type\": \"AeroCircle\", "+"\"coordinates\": [[%.8f, %.8f], %f] }", lng, lat, radius)
return newFilter(binName, collectionType, ParticleType.GEOJSON, NewValue(rgnStr), NewValue(rgnStr), ctx)
return NewFilter(binName, collectionType, ParticleType.GEOJSON, NewValue(rgnStr), NewValue(rgnStr), ctx)
}

// Create a filter for query.
// Range arguments must be longs or integers which can be cast to longs.
// String ranges are not supported.
func newFilter(name string, indexCollectionType IndexCollectionType, valueParticleType int, begin Value, end Value, ctx []*CDTContext) *Filter {
func NewFilter(name string, indexCollectionType IndexCollectionType, valueParticleType int, begin Value, end Value, ctx []*CDTContext) *Filter {
return &Filter{
name: name,
idxType: indexCollectionType,
Expand Down
9 changes: 5 additions & 4 deletions multi_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ func (cmd *baseMultiCommand) parseKey(fieldCount int, bval *int64) (*Key, Error)
return nil, err.setNode(cmd.node)
}
case BVAL_ARRAY:
*bval = Buffer.LittleBytesToInt64(cmd.dataBuffer, 1)
v := Buffer.LittleBytesToInt64(cmd.dataBuffer, 1)
bval = &v
}
}

Expand Down Expand Up @@ -291,8 +292,8 @@ func (cmd *baseMultiCommand) parseRecordResults(ifc command, receiveSize int) (b
fieldCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 18))
opCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 20))

var bval int64
key, err := cmd.parseKey(fieldCount, &bval)
var bval *int64
key, err := cmd.parseKey(fieldCount, bval)
if err != nil {
err = newNodeError(cmd.node, err)
return false, err
Expand Down Expand Up @@ -365,7 +366,7 @@ func (cmd *baseMultiCommand) parseRecordResults(ifc command, receiveSize int) (b
// block forever, or panic in case the channel is closed in the meantime.
select {
// send back the result on the async channel
case cmd.recordset.records <- &Result{Record: newRecord(cmd.node, key, bins, generation, expiration), Err: nil}:
case cmd.recordset.records <- &Result{Record: newRecord(cmd.node, key, bins, generation, expiration), Err: nil, BVal: bval}:
case <-cmd.recordset.cancelled:
switch cmd.terminationErrorType {
case types.SCAN_TERMINATED:
Expand Down
6 changes: 4 additions & 2 deletions partition_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,14 +279,16 @@ func (pt *partitionTracker) setDigest(nodePartitions *nodePartitions, key *Key)
}
}

func (pt *partitionTracker) setLast(nodePartitions *nodePartitions, key *Key, bval int64) {
func (pt *partitionTracker) setLast(nodePartitions *nodePartitions, key *Key, bval *int64) {
partitionId := key.PartitionId()
if partitionId-pt.partitionBegin < 0 {
panic(fmt.Sprintf("Partition mismatch: key.partitionId: %d, partitionBegin: %d", partitionId, pt.partitionBegin))
}
ps := pt.partitions[partitionId-pt.partitionBegin]
ps.Digest = key.digest[:]
ps.BVal = bval
if bval != nil {
ps.BVal = *bval
}

// nodePartitions is nil in Proxy client
if nodePartitions != nil {
Expand Down
1 change: 1 addition & 0 deletions recordset.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
type Result struct {
Record *Record
Err Error
BVal *int64
}

// String implements the Stringer interface
Expand Down
13 changes: 7 additions & 6 deletions statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Statement struct {
// If not set, the server will determine the index from the filter's bin name.
IndexName string

// BinNames detemines bin names (optional)
// BinNames determines bin names (optional)
BinNames []string

// Filter determines query index filter (Optional).
Expand All @@ -51,7 +51,7 @@ type Statement struct {
TaskId uint64

// determines if the query should return data
returnData bool
ReturnData bool
}

// NewStatement initializes a new Statement instance.
Expand All @@ -60,7 +60,7 @@ func NewStatement(ns string, set string, binNames ...string) *Statement {
Namespace: ns,
SetName: set,
BinNames: binNames,
returnData: true,
ReturnData: true,
TaskId: xornd.Uint64(),
}
}
Expand All @@ -75,7 +75,8 @@ func (stmt *Statement) String() string {
stmt.packageName,
stmt.functionName,
stmt.functionArgs,
stmt.TaskId, stmt.returnData,
stmt.TaskId,
stmt.ReturnData,
)
}

Expand All @@ -94,7 +95,7 @@ func (stmt *Statement) SetAggregateFunction(packageName string, functionName str
stmt.packageName = packageName
stmt.functionName = functionName
stmt.functionArgs = functionArgs
stmt.returnData = returnData
stmt.ReturnData = returnData
}

// IsScan determines is the Statement is a full namespace/set scan or a selective Query.
Expand All @@ -111,7 +112,7 @@ func (stmt *Statement) terminationError() types.ResultCode {

// Always set the taskID client-side to a non-zero random value
func (stmt *Statement) prepare(returnData bool) {
stmt.returnData = returnData
stmt.ReturnData = returnData
}

func (stmt *Statement) grpc(policy *QueryPolicy, ops []*Operation) *kvs.Statement {
Expand Down

0 comments on commit 933ff2d

Please sign in to comment.