diff --git a/cdt_bitwise.go b/cdt_bitwise.go index 349d65c1..17202bae 100644 --- a/cdt_bitwise.go +++ b/cdt_bitwise.go @@ -481,12 +481,12 @@ func packCDTBitIfcVarParamsAsArray(packer BufferEx, opType int16, ctx []*CDTCont size += n for _, c := range ctx { - if n, err = packAInt64(packer, int64(c.id)); err != nil { + if n, err = packAInt64(packer, int64(c.Id)); err != nil { return size + n, err } size += n - if n, err = c.value.pack(packer); err != nil { + if n, err = c.Value.pack(packer); err != nil { return size + n, err } size += n diff --git a/cdt_context.go b/cdt_context.go index 32939982..5f5e1265 100644 --- a/cdt_context.go +++ b/cdt_context.go @@ -37,8 +37,8 @@ const ( // An array of CTX identifies location of the list/map on multiple // levels on nesting. type CDTContext struct { - id int - value Value + Id int + Value Value } // CDTContextToBase64 converts a []*CDTContext into a base64 encoded string. @@ -78,7 +78,7 @@ func Base64ToCDTContext(b64 string) ([]*CDTContext, Error) { res := make([]*CDTContext, 0, len(list)/2) for i := 0; i < len(list); i += 2 { - res = append(res, &CDTContext{id: list[i].(int), value: NewValue(list[i+1])}) + res = append(res, &CDTContext{Id: list[i].(int), Value: NewValue(list[i+1])}) } return res, nil @@ -86,18 +86,18 @@ func Base64ToCDTContext(b64 string) ([]*CDTContext, Error) { // String implements the Stringer interface for CDTContext func (ctx *CDTContext) String() string { - return fmt.Sprintf("CDTContext{id: %d, value: %s}", ctx.id, ctx.value.String()) + return fmt.Sprintf("CDTContext{id: %d, value: %s}", ctx.Id, ctx.Value.String()) } func (ctx *CDTContext) pack(cmd BufferEx) (int, Error) { size := 0 - sz, err := packAInt64(cmd, int64(ctx.id)) + sz, err := packAInt64(cmd, int64(ctx.Id)) size += sz if err != nil { return size, err } - sz, err = ctx.value.pack(cmd) + sz, err = ctx.Value.pack(cmd) size += sz return size, err diff --git a/cdt_list.go b/cdt_list.go index f5443ff8..7cdeff95 100644 --- a/cdt_list.go +++ b/cdt_list.go @@ -219,12 +219,12 @@ func packCDTParamsAsArray(packer BufferEx, opType int16, ctx []*CDTContext, para size += n for _, c := range ctx { - if n, err = packAInt64(packer, int64(c.id)); err != nil { + if n, err = packAInt64(packer, int64(c.Id)); err != nil { return size + n, err } size += n - if n, err = c.value.pack(packer); err != nil { + if n, err = c.Value.pack(packer); err != nil { return size + n, err } size += n @@ -289,12 +289,12 @@ func packCDTIfcVarParamsAsArray(packer BufferEx, opType int16, ctx []*CDTContext size += n for _, c := range ctx { - if n, err = packAInt64(packer, int64(c.id)); err != nil { + if n, err = packAInt64(packer, int64(c.Id)); err != nil { return size + n, err } size += n - if n, err = c.value.pack(packer); err != nil { + if n, err = c.Value.pack(packer); err != nil { return size + n, err } size += n @@ -368,24 +368,24 @@ func packCDTCreate(packer BufferEx, opType int16, ctx []*CDTContext, flag int, p for i := 0; i < last; i++ { c = ctx[i] - if n, err = packAInt64(packer, int64(c.id)); err != nil { + if n, err = packAInt64(packer, int64(c.Id)); err != nil { return size + n, err } size += n - if n, err = c.value.pack(packer); err != nil { + if n, err = c.Value.pack(packer); err != nil { return size + n, err } size += n } c = ctx[last] - if n, err = packAInt64(packer, int64(c.id|flag)); err != nil { + if n, err = packAInt64(packer, int64(c.Id|flag)); err != nil { return size + n, err } size += n - if n, err = c.value.pack(packer); err != nil { + if n, err = c.Value.pack(packer); err != nil { return size + n, err } size += n diff --git a/command.go b/command.go index 6c947ecf..e7ef433f 100644 --- a/command.go +++ b/command.go @@ -1844,7 +1844,7 @@ func (cmd *baseCommand) setQuery(policy *QueryPolicy, wpolicy *WritePolicy, stat if statement.functionName != "" { cmd.writeFieldHeader(1, UDF_OP) - if statement.returnData { + if statement.ReturnData { cmd.dataBuffer[cmd.dataOffset] = byte(1) } else { cmd.dataBuffer[cmd.dataOffset] = byte(2) diff --git a/exp_list.go b/exp_list.go index 5c0b9566..12753b58 100644 --- a/exp_list.go +++ b/exp_list.go @@ -637,7 +637,7 @@ func cdtListAddWrite( var returnType ExpType if len(ctx) == 0 { returnType = ExpTypeLIST - } else if (ctx[0].id & ctxTypeListIndex) == 0 { + } else if (ctx[0].Id & ctxTypeListIndex) == 0 { returnType = ExpTypeMAP } else { returnType = ExpTypeLIST diff --git a/exp_map.go b/exp_map.go index 658f2015..9d7804b7 100644 --- a/exp_map.go +++ b/exp_map.go @@ -809,7 +809,7 @@ func expMapAddWrite( var returnType ExpType if len(ctx) == 0 { returnType = ExpTypeMAP - } else if (ctx[0].id & ctxTypeListIndex) == 0 { + } else if (ctx[0].Id & ctxTypeListIndex) == 0 { returnType = ExpTypeMAP } else { returnType = ExpTypeLIST diff --git a/filter.go b/filter.go index f341e865..612e4b32 100644 --- a/filter.go +++ b/filter.go @@ -35,7 +35,7 @@ type Filter struct { // Value can be an integer, string or a blob (byte array). Byte arrays are only supported on server v7+. func NewEqualFilter(binName string, value interface{}, ctx ...*CDTContext) *Filter { val := NewValue(value) - return newFilter(binName, ICT_DEFAULT, val.GetType(), val, val, ctx) + return NewFilter(binName, ICT_DEFAULT, val.GetType(), val, val, ctx) } // NewRangeFilter creates a range filter for query. @@ -43,68 +43,68 @@ func NewEqualFilter(binName string, value interface{}, ctx ...*CDTContext) *Filt // String ranges are not supported. func NewRangeFilter(binName string, begin int64, end int64, ctx ...*CDTContext) *Filter { vBegin, vEnd := NewValue(begin), NewValue(end) - return newFilter(binName, ICT_DEFAULT, vBegin.GetType(), vBegin, vEnd, ctx) + return NewFilter(binName, ICT_DEFAULT, vBegin.GetType(), vBegin, vEnd, ctx) } // NewContainsFilter creates a contains filter for query on collection index. // Value can be an integer, string or a blob (byte array). Byte arrays are only supported on server v7+. func NewContainsFilter(binName string, indexCollectionType IndexCollectionType, value interface{}, ctx ...*CDTContext) *Filter { v := NewValue(value) - return newFilter(binName, indexCollectionType, v.GetType(), v, v, ctx) + return NewFilter(binName, indexCollectionType, v.GetType(), v, v, ctx) } // NewContainsRangeFilter creates a contains filter for query on ranges of data in a collection index. func NewContainsRangeFilter(binName string, indexCollectionType IndexCollectionType, begin, end int64, ctx ...*CDTContext) *Filter { vBegin, vEnd := NewValue(begin), NewValue(end) - return newFilter(binName, indexCollectionType, vBegin.GetType(), vBegin, vEnd, ctx) + return NewFilter(binName, indexCollectionType, vBegin.GetType(), vBegin, vEnd, ctx) } // NewGeoWithinRegionFilter creates a geospatial "within region" filter for query. // Argument must be a valid GeoJSON region. func NewGeoWithinRegionFilter(binName, region string, ctx ...*CDTContext) *Filter { v := NewStringValue(region) - return newFilter(binName, ICT_DEFAULT, ParticleType.GEOJSON, v, v, ctx) + return NewFilter(binName, ICT_DEFAULT, ParticleType.GEOJSON, v, v, ctx) } // NewGeoWithinRegionForCollectionFilter creates a geospatial "within region" filter for query on collection index. // Argument must be a valid GeoJSON region. func NewGeoWithinRegionForCollectionFilter(binName string, collectionType IndexCollectionType, region string, ctx ...*CDTContext) *Filter { v := NewStringValue(region) - return newFilter(binName, collectionType, ParticleType.GEOJSON, v, v, ctx) + return NewFilter(binName, collectionType, ParticleType.GEOJSON, v, v, ctx) } // NewGeoRegionsContainingPointFilter creates a geospatial "containing point" filter for query. // Argument must be a valid GeoJSON point. func NewGeoRegionsContainingPointFilter(binName, point string, ctx ...*CDTContext) *Filter { v := NewStringValue(point) - return newFilter(binName, ICT_DEFAULT, ParticleType.GEOJSON, v, v, ctx) + return NewFilter(binName, ICT_DEFAULT, ParticleType.GEOJSON, v, v, ctx) } // NewGeoRegionsContainingPointForCollectionFilter creates a geospatial "containing point" filter for query on collection index. // Argument must be a valid GeoJSON point. func NewGeoRegionsContainingPointForCollectionFilter(binName string, collectionType IndexCollectionType, point string, ctx ...*CDTContext) *Filter { v := NewStringValue(point) - return newFilter(binName, collectionType, ParticleType.GEOJSON, v, v, ctx) + return NewFilter(binName, collectionType, ParticleType.GEOJSON, v, v, ctx) } // NewGeoWithinRadiusFilter creates a geospatial "within radius" filter for query. // Arguments must be valid longitude/latitude/radius (meters) values. func NewGeoWithinRadiusFilter(binName string, lng, lat, radius float64, ctx ...*CDTContext) *Filter { rgnStr := fmt.Sprintf("{ \"type\": \"AeroCircle\", "+"\"coordinates\": [[%.8f, %.8f], %f] }", lng, lat, radius) - return newFilter(binName, ICT_DEFAULT, ParticleType.GEOJSON, NewValue(rgnStr), NewValue(rgnStr), ctx) + return NewFilter(binName, ICT_DEFAULT, ParticleType.GEOJSON, NewValue(rgnStr), NewValue(rgnStr), ctx) } // NewGeoWithinRadiusForCollectionFilter creates a geospatial "within radius" filter for query on collection index. // Arguments must be valid longitude/latitude/radius (meters) values. func NewGeoWithinRadiusForCollectionFilter(binName string, collectionType IndexCollectionType, lng, lat, radius float64, ctx ...*CDTContext) *Filter { rgnStr := fmt.Sprintf("{ \"type\": \"AeroCircle\", "+"\"coordinates\": [[%.8f, %.8f], %f] }", lng, lat, radius) - return newFilter(binName, collectionType, ParticleType.GEOJSON, NewValue(rgnStr), NewValue(rgnStr), ctx) + return NewFilter(binName, collectionType, ParticleType.GEOJSON, NewValue(rgnStr), NewValue(rgnStr), ctx) } // Create a filter for query. // Range arguments must be longs or integers which can be cast to longs. // String ranges are not supported. -func newFilter(name string, indexCollectionType IndexCollectionType, valueParticleType int, begin Value, end Value, ctx []*CDTContext) *Filter { +func NewFilter(name string, indexCollectionType IndexCollectionType, valueParticleType int, begin Value, end Value, ctx []*CDTContext) *Filter { return &Filter{ name: name, idxType: indexCollectionType, diff --git a/multi_command.go b/multi_command.go index dc8a175e..66bafcf5 100644 --- a/multi_command.go +++ b/multi_command.go @@ -221,7 +221,8 @@ func (cmd *baseMultiCommand) parseKey(fieldCount int, bval *int64) (*Key, Error) return nil, err.setNode(cmd.node) } case BVAL_ARRAY: - *bval = Buffer.LittleBytesToInt64(cmd.dataBuffer, 1) + v := Buffer.LittleBytesToInt64(cmd.dataBuffer, 1) + bval = &v } } @@ -291,8 +292,8 @@ func (cmd *baseMultiCommand) parseRecordResults(ifc command, receiveSize int) (b fieldCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 18)) opCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 20)) - var bval int64 - key, err := cmd.parseKey(fieldCount, &bval) + var bval *int64 + key, err := cmd.parseKey(fieldCount, bval) if err != nil { err = newNodeError(cmd.node, err) return false, err @@ -365,7 +366,7 @@ func (cmd *baseMultiCommand) parseRecordResults(ifc command, receiveSize int) (b // block forever, or panic in case the channel is closed in the meantime. select { // send back the result on the async channel - case cmd.recordset.records <- &Result{Record: newRecord(cmd.node, key, bins, generation, expiration), Err: nil}: + case cmd.recordset.records <- &Result{Record: newRecord(cmd.node, key, bins, generation, expiration), Err: nil, BVal: bval}: case <-cmd.recordset.cancelled: switch cmd.terminationErrorType { case types.SCAN_TERMINATED: diff --git a/partition_tracker.go b/partition_tracker.go index c6fc6744..8e6f80c4 100644 --- a/partition_tracker.go +++ b/partition_tracker.go @@ -279,14 +279,16 @@ func (pt *partitionTracker) setDigest(nodePartitions *nodePartitions, key *Key) } } -func (pt *partitionTracker) setLast(nodePartitions *nodePartitions, key *Key, bval int64) { +func (pt *partitionTracker) setLast(nodePartitions *nodePartitions, key *Key, bval *int64) { partitionId := key.PartitionId() if partitionId-pt.partitionBegin < 0 { panic(fmt.Sprintf("Partition mismatch: key.partitionId: %d, partitionBegin: %d", partitionId, pt.partitionBegin)) } ps := pt.partitions[partitionId-pt.partitionBegin] ps.Digest = key.digest[:] - ps.BVal = bval + if bval != nil { + ps.BVal = *bval + } // nodePartitions is nil in Proxy client if nodePartitions != nil { diff --git a/recordset.go b/recordset.go index ad8d27e6..cad725fb 100644 --- a/recordset.go +++ b/recordset.go @@ -29,6 +29,7 @@ import ( type Result struct { Record *Record Err Error + BVal *int64 } // String implements the Stringer interface diff --git a/statement.go b/statement.go index 050bc50e..45a9ca7b 100644 --- a/statement.go +++ b/statement.go @@ -34,7 +34,7 @@ type Statement struct { // If not set, the server will determine the index from the filter's bin name. IndexName string - // BinNames detemines bin names (optional) + // BinNames determines bin names (optional) BinNames []string // Filter determines query index filter (Optional). @@ -51,7 +51,7 @@ type Statement struct { TaskId uint64 // determines if the query should return data - returnData bool + ReturnData bool } // NewStatement initializes a new Statement instance. @@ -60,7 +60,7 @@ func NewStatement(ns string, set string, binNames ...string) *Statement { Namespace: ns, SetName: set, BinNames: binNames, - returnData: true, + ReturnData: true, TaskId: xornd.Uint64(), } } @@ -75,7 +75,8 @@ func (stmt *Statement) String() string { stmt.packageName, stmt.functionName, stmt.functionArgs, - stmt.TaskId, stmt.returnData, + stmt.TaskId, + stmt.ReturnData, ) } @@ -94,7 +95,7 @@ func (stmt *Statement) SetAggregateFunction(packageName string, functionName str stmt.packageName = packageName stmt.functionName = functionName stmt.functionArgs = functionArgs - stmt.returnData = returnData + stmt.ReturnData = returnData } // IsScan determines is the Statement is a full namespace/set scan or a selective Query. @@ -111,7 +112,7 @@ func (stmt *Statement) terminationError() types.ResultCode { // Always set the taskID client-side to a non-zero random value func (stmt *Statement) prepare(returnData bool) { - stmt.returnData = returnData + stmt.ReturnData = returnData } func (stmt *Statement) grpc(policy *QueryPolicy, ops []*Operation) *kvs.Statement {