Skip to content

Commit

Permalink
[CLIENT-2721] Make PartitionFilter.Retry public
Browse files Browse the repository at this point in the history
  • Loading branch information
khaf committed Dec 14, 2023
1 parent 3eab345 commit 0fbd578
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
2 changes: 1 addition & 1 deletion partition_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type PartitionFilter struct {
// Partitions encapsulates the cursor for the progress of the scan/query to be used for pagination.
Partitions []*PartitionStatus
Done bool
retry bool
Retry bool
}

// NewPartitionFilterAll creates a partition filter that
Expand Down
18 changes: 9 additions & 9 deletions partition_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,11 @@ func newPartitionTracker(policy *MultiPolicy, filter *PartitionFilter, nodes []*

if len(filter.Partitions) == 0 {
filter.Partitions = pt.initPartitions(policy, filter.Count, filter.Digest)
filter.retry = true
filter.Retry = true
} else {
// Retry all partitions when maxRecords not specified.
if policy.MaxRecords <= 0 {
filter.retry = true
filter.Retry = true
}

// Reset replica sequence and last node used.
Expand Down Expand Up @@ -173,7 +173,7 @@ func (pt *partitionTracker) assignPartitionsToNodes(cluster *Cluster, namespace
}

p := NewPartitionForReplicaPolicy(namespace, pt.replica)
retry := (pt.partitionFilter == nil || pt.partitionFilter.retry) && (pt.iteration == 1)
retry := (pt.partitionFilter == nil || pt.partitionFilter.Retry) && (pt.iteration == 1)

for _, part := range pt.partitions {
if retry || part.Retry {
Expand Down Expand Up @@ -212,7 +212,7 @@ func (pt *partitionTracker) assignPartitionsToNodes(cluster *Cluster, namespace
// Global retry will be set to false if the scan/query completes normally and maxRecords
// is specified.
if pt.partitionFilter != nil {
pt.partitionFilter.retry = true
pt.partitionFilter.Retry = true
}

pt.recordCount = nil
Expand Down Expand Up @@ -311,7 +311,7 @@ func (pt *partitionTracker) isComplete(cluster *Cluster, policy *BasePolicy) (bo
if partsUnavailable == 0 {
if pt.maxRecords <= 0 {
if pt.partitionFilter != nil {
pt.partitionFilter.retry = false
pt.partitionFilter.Retry = false
pt.partitionFilter.Done = true
}
} else if pt.iteration > 1 {
Expand All @@ -320,7 +320,7 @@ func (pt *partitionTracker) isComplete(cluster *Cluster, policy *BasePolicy) (bo
// next iteration. If that node finally succeeds, the other original nodes still
// need to be retried if partition state is reused in the next scan/query command.
// Force retry on all node partitions.
pt.partitionFilter.retry = true
pt.partitionFilter.Retry = true
pt.partitionFilter.Done = false
}
} else {
Expand All @@ -339,7 +339,7 @@ func (pt *partitionTracker) isComplete(cluster *Cluster, policy *BasePolicy) (bo
}

if pt.partitionFilter != nil {
pt.partitionFilter.retry = false
pt.partitionFilter.Retry = false
pt.partitionFilter.Done = done
}
} else {
Expand All @@ -353,7 +353,7 @@ func (pt *partitionTracker) isComplete(cluster *Cluster, policy *BasePolicy) (bo
}

if pt.partitionFilter != nil {
pt.partitionFilter.retry = false
pt.partitionFilter.Retry = false
pt.partitionFilter.Done = (recordCount == 0)
}
}
Expand Down Expand Up @@ -438,7 +438,7 @@ func (pt *partitionTracker) markRetry(nodePartitions *nodePartitions) {
func (pt *partitionTracker) partitionError() {
// Mark all partitions for retry on fatal errors.
if pt.partitionFilter != nil {
pt.partitionFilter.retry = true
pt.partitionFilter.Retry = true
}
}

Expand Down

0 comments on commit 0fbd578

Please sign in to comment.