Skip to content

Commit

Permalink
[CLIENT-2823] Support QueryDuration
Browse files Browse the repository at this point in the history
  • Loading branch information
khaf committed Mar 26, 2024
1 parent acdfa78 commit 723eae2
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 12 deletions.
25 changes: 15 additions & 10 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ const (
_INFO2_DURABLE_DELETE int = (1 << 4)
// Create only. Fail if record already exists.
_INFO2_CREATE_ONLY int = (1 << 5)

// Treat as long query, but relax read consistency.
_INFO2_RELAX_AP_LONG_QUERY = (1 << 6)
// Return a result for every operation.
_INFO2_RESPOND_ALL_OPS int = (1 << 7)

Expand Down Expand Up @@ -374,7 +375,7 @@ func (cmd *baseCommand) setReadForKeyOnly(policy *BasePolicy, key *Key) Error {
return err
}

cmd.writeHeaderRead(policy, _INFO1_READ|_INFO1_GET_ALL, 0, fieldCount, 0)
cmd.writeHeaderRead(policy, _INFO1_READ|_INFO1_GET_ALL, 0, 0, fieldCount, 0)
if err := cmd.writeKey(key, false); err != nil {
return err
}
Expand Down Expand Up @@ -421,7 +422,7 @@ func (cmd *baseCommand) setRead(policy *BasePolicy, key *Key, binNames []string)
if len(binNames) == 0 {
attr |= _INFO1_GET_ALL
}
cmd.writeHeaderRead(policy, attr, 0, fieldCount, len(binNames))
cmd.writeHeaderRead(policy, attr, 0, 0, fieldCount, len(binNames))

if err := cmd.writeKey(key, false); err != nil {
return err
Expand Down Expand Up @@ -1147,7 +1148,7 @@ func (cmd *baseCommand) setBatchRead(policy *BatchPolicy, keys []*Key, batch *ba
readAttr |= _INFO1_READ_MODE_AP_ALL
}

cmd.writeHeaderRead(&policy.BasePolicy, readAttr|_INFO1_BATCH, 0, fieldCount, 0)
cmd.writeHeaderRead(&policy.BasePolicy, readAttr|_INFO1_BATCH, 0, 0, fieldCount, 0)

if policy.FilterExpression != nil {
if err := cmd.writeFilterExpression(policy.FilterExpression, predSize); err != nil {
Expand Down Expand Up @@ -1282,7 +1283,7 @@ func (cmd *baseCommand) setBatchIndexRead(policy *BatchPolicy, records []*BatchR
readAttr |= _INFO1_READ_MODE_AP_ALL
}

cmd.writeHeaderRead(&policy.BasePolicy, readAttr|_INFO1_BATCH, 0, fieldCount, 0)
cmd.writeHeaderRead(&policy.BasePolicy, readAttr|_INFO1_BATCH, 0, 0, fieldCount, 0)

if policy.FilterExpression != nil {
if err := cmd.writeFilterExpression(policy.FilterExpression, predSize); err != nil {
Expand Down Expand Up @@ -1471,7 +1472,7 @@ func (cmd *baseCommand) setScan(policy *ScanPolicy, namespace *string, setName *
if len(binNames) > 0 {
operationCount = len(binNames)
}
cmd.writeHeaderRead(&policy.BasePolicy, readAttr, infoAttr, fieldCount, operationCount)
cmd.writeHeaderRead(&policy.BasePolicy, readAttr, 0, infoAttr, fieldCount, operationCount)

if namespace != nil {
cmd.writeFieldString(*namespace, NAMESPACE)
Expand Down Expand Up @@ -1731,17 +1732,21 @@ func (cmd *baseCommand) setQuery(policy *QueryPolicy, wpolicy *WritePolicy, stat
cmd.writeHeaderWrite(wpolicy, _INFO2_WRITE, fieldCount, operationCount)
} else {
readAttr := _INFO1_READ | _INFO1_NOBINDATA
writeAttr := 0

if policy.IncludeBinData {
readAttr = _INFO1_READ
}
if policy.ShortQuery {
if policy.ShortQuery || policy.ExpectedDuration == SHORT {
readAttr |= _INFO1_SHORT_QUERY
} else if policy.ExpectedDuration == LONG_RELAX_AP {
writeAttr |= _INFO2_RELAX_AP_LONG_QUERY
}
infoAttr := 0
if isNew {
infoAttr = _INFO3_PARTITION_DONE
}
cmd.writeHeaderRead(&policy.BasePolicy, readAttr, infoAttr, fieldCount, operationCount)
cmd.writeHeaderRead(&policy.BasePolicy, readAttr, writeAttr, infoAttr, fieldCount, operationCount)
}

if statement.Namespace != "" {
Expand Down Expand Up @@ -2113,7 +2118,7 @@ func (cmd *baseCommand) writeHeaderReadWrite(policy *WritePolicy, args *operateA
}

// Header write for read commands.
func (cmd *baseCommand) writeHeaderRead(policy *BasePolicy, readAttr, infoAttr, fieldCount, operationCount int) {
func (cmd *baseCommand) writeHeaderRead(policy *BasePolicy, readAttr, writeAttr, infoAttr, fieldCount, operationCount int) {
switch policy.ReadModeSC {
case ReadModeSCSession:
case ReadModeSCLinearize:
Expand All @@ -2135,7 +2140,7 @@ func (cmd *baseCommand) writeHeaderRead(policy *BasePolicy, readAttr, infoAttr,
// Write all header data except total size which must be written last.
cmd.dataBuffer[8] = _MSG_REMAINING_HEADER_SIZE // Message header length.
cmd.dataBuffer[9] = byte(readAttr)
cmd.dataBuffer[10] = 0
cmd.dataBuffer[10] = byte(writeAttr)
cmd.dataBuffer[11] = byte(infoAttr)

for i := 12; i < 18; i++ {
Expand Down
1 change: 1 addition & 0 deletions policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func (p *BasePolicy) compress() bool {
}

func (p *BasePolicy) grpc() *kvs.ReadPolicy {
// TODO: support ReadTouchTTLPercent in the future for the proxy client
return &kvs.ReadPolicy{
Replica: p.ReplicaPolicy.grpc(),
ReadModeSC: p.ReadModeSC.grpc(),
Expand Down
65 changes: 65 additions & 0 deletions query_durtion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2014-2022 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package aerospike

// QueryDuration defines the expected query duration. The server treats the query in different ways depending on the expected duration.
// This enum is ignored for aggregation queries, background queries and server versions < 6.0.
type QueryDuration int

const (
// LONG specifies that the query is expected to return more than 100 records per node. The server optimizes for a large record set in
// the following ways:
//
// Allow query to be run in multiple threads using the server's query threading configuration.
// Do not relax read consistency for AP namespaces.
// Add the query to the server's query monitor.
// Do not add the overall latency to the server's latency histogram.
// Do not allow server timeouts.
LONG = iota

// Short specifies that the query is expected to return less than 100 records per node. The server optimizes for a small record set in
// the following ways:
// Always run the query in one thread and ignore the server's query threading configuration.
// Allow query to be inlined directly on the server's service thread.
// Relax read consistency for AP namespaces.
// Do not add the query to the server's query monitor.
// Add the overall latency to the server's latency histogram.
// Allow server timeouts. The default server timeout for a short query is 1 second.
SHORT

// LONG_RELAX_AP will treat query as a LONG query, but relax read consistency for AP namespaces.
// This value is treated exactly like LONG for server versions < 7.1.
LONG_RELAX_AP
)

// func (rp QueryDuration) grpc() kvs.Replica {
// TODO: Support GRPC conversions for the proxy client
// switch rp {
// case MASTER:
// return kvs.Replica_MASTER
// case MASTER_PROLES:
// return kvs.Replica_MASTER_PROLES
// case RANDOM:
// return kvs.Replica_RANDOM
// case SEQUENCE:
// return kvs.Replica_SEQUENCE
// case PREFER_RACK:
// return kvs.Replica_PREFER_RACK
// }
// panic("UNREACHABLE")
// }
13 changes: 11 additions & 2 deletions query_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,21 @@ import (
type QueryPolicy struct {
MultiPolicy

// ShortQuery detemines wether query expected to return less than 100 records.
// Expected query duration. The server treats the query in different ways depending on the expected duration.
// This field is ignored for aggregation queries, background queries and server versions < 6.0.
//
// Default: LONG
ExpectedDuration QueryDuration

// ShortQuery determines wether query expected to return less than 100 records.
// If true, the server will optimize the query for a small record set.
// This field is ignored for aggregation queries, background queries
// and server versions 6.0+.
//
// Default: false
// This field is deprecated and will eventually be removed. Use ExpectedDuration instead.
// For backwards compatibility: If ShortQuery is true, the query is treated as a short query and
// ExpectedDuration is ignored. If shortQuery is false, ExpectedDuration is used defaults to {@link QueryDuration#LONG}.
ShortQuery bool
}

Expand Down Expand Up @@ -59,7 +68,7 @@ func (qp *QueryPolicy) grpc() *kvs.QueryPolicy {
MaxConcurrentNodes := uint32(qp.MaxConcurrentNodes)
IncludeBinData := qp.IncludeBinData
FailOnClusterChange := false //qp.FailOnClusterChange
ShortQuery := qp.ShortQuery
ShortQuery := qp.ShortQuery || qp.ExpectedDuration == SHORT
InfoTimeout := uint32(qp.SocketTimeout / time.Millisecond)

return &kvs.QueryPolicy{
Expand Down

0 comments on commit 723eae2

Please sign in to comment.