Skip to content

Commit

Permalink
vecindex: add pacer for vector index operations
Browse files Browse the repository at this point in the history
The pacer will limit the rate of foreground insert and delete operations in the
vector index such that background split and merge operations can keep up. It
does this by setting the allowed ops/sec and then delaying operations that
would otherwise exceed this limit.

This PR introduces the pacer, but does not change the vector index to use it.
That will come in a later PR. However, this PR does simulate usage of the pacer
under different interesting conditions, including edge cases.

Epic: CRDB-42943

Release note: None
  • Loading branch information
andy-kimball committed Jan 14, 2025
1 parent ada0ea7 commit 2e10bb8
Show file tree
Hide file tree
Showing 14 changed files with 1,411 additions and 7 deletions.
6 changes: 3 additions & 3 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -4495,10 +4495,10 @@ def go_deps():
name = "com_github_guptarohit_asciigraph",
build_file_proto_mode = "disable_global",
importpath = "github.com/guptarohit/asciigraph",
sha256 = "c2b81da57a50425d313a684efd13d9741c4e9df4c3cca92dea34d562d34271a1",
strip_prefix = "github.com/guptarohit/asciigraph@v0.5.5",
sha256 = "ec30034bd6d082f3242a5410ae1d02d9a4d164504e735f8448766461207be5a5",
strip_prefix = "github.com/guptarohit/asciigraph@v0.7.3",
urls = [
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/guptarohit/asciigraph/com_github_guptarohit_asciigraph-v0.5.5.zip",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/guptarohit/asciigraph/com_github_guptarohit_asciigraph-v0.7.3.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion build/bazelutil/distdir_files.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ DISTDIR_FILES = {
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/grpc-ecosystem/go-grpc-prometheus/com_github_grpc_ecosystem_go_grpc_prometheus-v1.2.0.zip": "124dfc63aa52611a2882417e685c0452d4d99d64c13836a6a6747675e911fc17",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/grpc-ecosystem/grpc-gateway/com_github_grpc_ecosystem_grpc_gateway-v1.16.0.zip": "377b03aef288b34ed894449d3ddba40d525dd7fb55de6e79045cdf499e7fe565",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/gsterjov/go-libsecret/com_github_gsterjov_go_libsecret-v0.0.0-20161001094733-a6f4afe4910c.zip": "cffe0a452fd3f00e4d07730caeb254417a720d907294b5b4a3428322655fb130",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/guptarohit/asciigraph/com_github_guptarohit_asciigraph-v0.5.5.zip": "c2b81da57a50425d313a684efd13d9741c4e9df4c3cca92dea34d562d34271a1",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/guptarohit/asciigraph/com_github_guptarohit_asciigraph-v0.7.3.zip": "ec30034bd6d082f3242a5410ae1d02d9a4d164504e735f8448766461207be5a5",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/hailocab/go-hostpool/com_github_hailocab_go_hostpool-v0.0.0-20160125115350-e80d13ce29ed.zip": "faf2b985681cda77ab928976b620b790585e364b6aff351483227d474db85e9a",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/hashicorp/consul/api/com_github_hashicorp_consul_api-v1.10.1.zip": "a84081dcb2361b540bb787871abedc0f9569c09637f5b5c40e973500a4402a82",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/hashicorp/consul/sdk/com_github_hashicorp_consul_sdk-v0.8.0.zip": "cf29fff6c000ee67eda1b8cacec9648d06944e3cdbb80e2e22dc0165708974c6",
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ require (
github.com/goware/modvendor v0.5.0
github.com/grafana/grafana-openapi-client-go v0.0.0-20240215164046-eb0e60d27cb7
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/guptarohit/asciigraph v0.5.5
github.com/guptarohit/asciigraph v0.7.3
github.com/influxdata/influxdb-client-go/v2 v2.3.1-0.20210518120617-5d1fff431040
github.com/irfansharif/recorder v0.0.0-20211218081646-a21b46510fd6
github.com/jackc/pgx/v5 v5.4.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1307,6 +1307,8 @@ github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
github.com/guptarohit/asciigraph v0.5.5 h1:ccFnUF8xYIOUPPY3tmdvRyHqmn1MYI9iv1pLKX+/ZkQ=
github.com/guptarohit/asciigraph v0.5.5/go.mod h1:dYl5wwK4gNsnFf9Zp+l06rFiDZ5YtXM6x7SRWZ3KGag=
github.com/guptarohit/asciigraph v0.7.3 h1:p05XDDn7cBTWiBqWb30mrwxd6oU0claAjqeytllnsPY=
github.com/guptarohit/asciigraph v0.7.3/go.mod h1:dYl5wwK4gNsnFf9Zp+l06rFiDZ5YtXM6x7SRWZ3KGag=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE=
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/vecindex/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
"fixup_processor.go",
"index_stats.go",
"kmeans.go",
"pacer.go",
"split_data.go",
"vector_index.go",
],
Expand All @@ -26,6 +27,7 @@ go_library(
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/vector",
"@com_github_cockroachdb_crlib//crtime",
"@com_github_cockroachdb_errors//:errors",
"@org_golang_x_exp//slices",
"@org_gonum_v1_gonum//stat",
Expand All @@ -38,6 +40,7 @@ go_test(
"fixup_processor_test.go",
"index_stats_test.go",
"kmeans_test.go",
"pacer_test.go",
"vector_index_test.go",
],
data = glob(["testdata/**"]),
Expand All @@ -52,8 +55,10 @@ go_test(
"//pkg/util/num32",
"//pkg/util/stop",
"//pkg/util/vector",
"@com_github_cockroachdb_crlib//crtime",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_guptarohit_asciigraph//:asciigraph",
"@com_github_stretchr_testify//require",
"@org_gonum_v1_gonum//floats/scalar",
"@org_gonum_v1_gonum//stat",
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/vecindex/fixup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const (
// maxFixups specifies the maximum number of pending index fixups that can be
// enqueued by foreground threads, waiting for processing. Hitting this limit
// indicates the background goroutine has fallen far behind.
const maxFixups = 100
const maxFixups = 200

// fixup describes an index fixup so that it can be enqueued for processing.
// Each fixup type needs to have some subset of the fields defined.
Expand Down
290 changes: 290 additions & 0 deletions pkg/sql/vecindex/pacer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package vecindex

import (
"time"

"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/crlib/crtime"
)

// targetQueuedFixups is the number of fixups that are allowed in the queue
// before throttling may begin. Note that even if the current queue size is
// below this threshold, throttling will still occur if the queue size is
// increasing at too high of a rate. Also, this is a "soft" target; as long as
// the size is reasonably close, the pacer won't do much.
const targetQueuedFixups = 5

// maxQueueSizeRate clamps the measured change in queue size over the course of
// one second, either positive or negative. This avoids pacer overreaction to
// brief bursts of change in small intervals.
const maxQueueSizeRate = 5

// gradualQueueSizeMax specifies the max rate of change when the fixup queue
// size needs to be reduced. For example, if the current fixup queue size is 50,
// this is much bigger than the allowed size of 5. However, rather than attempt
// to reduce the size from 50 to 5 in one step, this setting tries to reduce it
// in increments of 2 fixups per second.
const gradualQueueSizeMax = 2

// deltaFactor governs how quickly the pacer makes changes to the allowed
// ops/sec. A higher factor value makes the pacer more responsive to changes,
// but increases how much it will overshoot the point of equilibrium.
const deltaFactor = 2

// pacer limits the rate of foreground insert and delete operations in the
// vector index such that background split and merge operations can keep up. It
// does this by setting the allowed ops/sec and then delaying operations that
// would otherwise exceed this limit.
//
// During normal operation, the pacer sets ops/sec at a level that tries to
// maintain the fixup queue at its current size (i.e. change rate of zero).
// However, there are two cases in which it will target a non-zero change rate:
//
// 1. If the fixup queue is empty (or nearly so) and operations are being
// delayed, then the pacer will raise allowed ops/sec, since it might be set
// too low (as evidenced by a small queue).
// 2. If the fixup queue size is > targetQueuedFixups, then the pacer will
// reduce allowed ops/sec in an attempt to reduce queue size. It does this
// in increments, with the goal of reducing queue size over time rather than
// all at once.
type pacer struct {
// monoNow measures elapsed time and can be mocked for testing.
monoNow func() crtime.Mono

mu struct {
syncutil.Mutex

// lastOpAt records the time of the last insert or delete operation that
// updated the token bucket.
lastOpAt crtime.Mono

// lastUpdateAt records the time of the last update to allowed ops/sec.
lastUpdateAt crtime.Mono

// lastQueuedFixups remembers the size of the fixup queue when the last
// insert or delete operation was executed. It's used to observe the delta
// in queue size since that time.
lastQueuedFixups int

// queueSizeRate estimates how much the size of the fixup queue has changed
// over the last second. It is computed as an exponential moving average
// (EMA) and clamped to +-maxQueueSizeRate.
queueSizeRate float64

// allowedOpsPerSec is the maximum rate of insert or delete operations
// that is allowed by the pacer.
allowedOpsPerSec float64

// currentTokens tracks how many tokens are currently in the bucket. Each
// token represents one insert or delete operation. When currentTokens
// drops below zero, operations will be delayed.
currentTokens float64

// delayed is true if the pacer has delayed an insert or delete operation
// since the last update to allowed ops/sec.
delayed bool
}
}

// Init sets up the pacer. "initialOpsPerSec" initializes the token bucket
// refill rate that governs how many insert or delete operations run per second.
// This value will automatically change over time, but a more accurate initial
// value can decrease "ramp up" time for the pacer as it learns the optimal
// pace. "initialFixups" specifies the initial number of fixups in the queue
// (used for testing).
func (p *pacer) Init(initialOpsPerSec int, initialFixups int, monoNow func() crtime.Mono) {
p.monoNow = monoNow
p.mu.lastUpdateAt = monoNow()
p.mu.allowedOpsPerSec = float64(initialOpsPerSec)
p.mu.lastQueuedFixups = initialFixups
}

// OnFixup is called when the size of the fixup queue has changed because a
// fixup has been added or removed to/from the queue by the vector index.
func (p *pacer) OnFixup(queuedFixups int) {
p.mu.Lock()
defer p.mu.Unlock()

// Compute elapsed time since the last update to allowed ops/sec.
now := p.monoNow()
sinceUpdate := now.Sub(p.mu.lastUpdateAt)
if sinceUpdate == 0 {
// Avoid division by zero.
sinceUpdate = 1
}
p.mu.lastUpdateAt = now

p.updateOpsPerSecLocked(sinceUpdate, queuedFixups)
}

// OnInsertOrDelete is called when an insert or delete operation is about to be
// run by the vector index. It takes the current size of the fixup queue and
// based on that, returns how much time to delay before running the operation.
// This ensures that background index maintenance operations do not fall too far
// behind foreground operations.
func (p *pacer) OnInsertOrDelete(queuedFixups int) time.Duration {
p.mu.Lock()
defer p.mu.Unlock()

// Fast path: if there are enough tokens in the bucket, no need for delay.
p.mu.currentTokens--
if p.mu.currentTokens >= 0 {
return 0
}

// If it's been at least a second since allowed ops/sec was updated, do so
// now. This handles an edge case where ops/sec is being throttled so heavily
// (e.g. 1 op/sec) that fixups are rare, and it takes too long to increase
// allowed ops/sec.
now := p.monoNow()
sinceUpdate := now.Sub(p.mu.lastUpdateAt)
if sinceUpdate >= time.Second {
p.mu.lastUpdateAt = now
p.updateOpsPerSecLocked(sinceUpdate, queuedFixups)
}

// Compute elapsed time since the last insert or delete operation that
// updated the token bucket.
sinceOp := now.Sub(p.mu.lastOpAt)
p.mu.lastOpAt = now

// Add tokens to the bucket based on elapsed time. Allow bucket to contain
// up to one second of excess tokens.
p.mu.currentTokens += p.mu.allowedOpsPerSec * sinceOp.Seconds()
if p.mu.currentTokens > p.mu.allowedOpsPerSec {
p.mu.currentTokens = p.mu.allowedOpsPerSec
}

if p.mu.currentTokens >= 0 {
// Enough tokens, no delay.
return 0
}

// The token bucket has gone into "debt", so return the pacing delay that
// enforces the allowed ops/sec. This is the inverse of allowed ops/sec, or
// the "time between operations". It is multiplied by the number of tokens
// of other waiting operations (i.e. the token debt). For example, if the
// allowed ops/sec is 1000, then operations should have an average 1 ms
// interval between them. If three operations arrive in immediate succession,
// then the first should have 0 ms delay, the second should have 1 ms delay,
// and the third should have 2 ms delay, and so on.
p.mu.delayed = true
return time.Duration(float64(time.Second) * -p.mu.currentTokens / p.mu.allowedOpsPerSec)
}

// OnInsertOrDeleteCanceled should be called when an insert or delete operation
// has its context canceled while waiting out its pacing delay. Because the
// operation was never completed, its token should not be consumed. Without
// this, a repeated sequence of cancellations could cause the token bucket to go
// increasingly negative, causing ever-increasing delays.
func (p *pacer) OnInsertOrDeleteCanceled() {
p.mu.Lock()
defer p.mu.Unlock()
p.mu.currentTokens++
}

// updateOpsPerSecLocked updates the allowed ops/sec based on the number of
// fixups in the queue. Updates are scaled by the amount of time that's elapsed
// since the last call to updateOpsPerSecLocked. Allowing sub-second elapsed
// increments allows the pacer to be significantly more responsive.
func (p *pacer) updateOpsPerSecLocked(elapsed time.Duration, queuedFixups int) {
// Remember if any operation was throttled since the last call to update.
delayed := p.mu.delayed
p.mu.delayed = false

// Calculate the desired rate of change in the fixup queue size over the next
// second.
var desiredQueueSizeRate float64
if queuedFixups > targetQueuedFixups {
// If the fixup queue is too large, reduce it a gradual rate that's
// proportional to its distance from the target. Never reduce it more
// than gradualQueueSizeMax.
const gradualRateFactor = 10
desiredQueueSizeRate = float64(targetQueuedFixups-queuedFixups) / gradualRateFactor
desiredQueueSizeRate = max(desiredQueueSizeRate, -gradualQueueSizeMax)
} else if queuedFixups <= 1 {
// If the fixup queue is empty or has just one fixup, then it could be
// that background fixups are happening fast enough. However, it's also
// possible that the fixup queue is small because the pacer is heavily
// throttling operations. Sharply increase allowed ops/sec, up to the
// target, in case that's true.
desiredQueueSizeRate = float64(targetQueuedFixups - queuedFixups)
}

// Calculate the actual rate of change in the fixup queue size over the last
// second.
actualQueueSizeRate := p.calculateQueueSizeRateLocked(elapsed, queuedFixups)

// Calculate the net rate that's needed to match the desired rate. For
// example, if we desire to decrease the queue size by 2 fixups/sec, but the
// queue is actually growing at 2 fixups/sec, then we need a net decrease of
// 4 fixups/sec.
netQueueSizeRate := desiredQueueSizeRate - actualQueueSizeRate
netQueueSizeRate = max(min(netQueueSizeRate, maxQueueSizeRate), -maxQueueSizeRate)

// Do not increase allowed ops/sec if operations are not being throttled.
// Otherwise, if there's little or no activity, the pacer would never stop
// increasing allowed ops/sec.
if netQueueSizeRate > 0 && !delayed {
return
}

// Determine how much to change allowed ops/sec to achieve the net change in
// fixup queue size over the next second. When allowed ops/sec is small,
// allow it to ramp quickly by starting with a minimum delta of 10 ops/sec.
const minDeltaOpsPerSec = 10
deltaOpsPerSec := max(p.mu.allowedOpsPerSec, minDeltaOpsPerSec)
if netQueueSizeRate < 0 {
// Decrease ops/sec up to some % of its current value. For example, if
// deltaFactor is 2, then it can decrease by up to 50% of its current
// value.
deltaOpsPerSec = deltaOpsPerSec - deltaOpsPerSec/deltaFactor
} else {
// Increase ops/sec by some % of its current value. For example, if
// deltaFactor is 2, then it can increase by up to 100% of its current
// value.
deltaOpsPerSec = deltaOpsPerSec*deltaFactor - deltaOpsPerSec
}

// Scale the change in ops/sec by the magnitude of desired change with respect
// to the max allowed change.
deltaOpsPerSec *= netQueueSizeRate / maxQueueSizeRate

// Scale the delta based on the elapsed time. For example, if we want to
// decrease ops/sec by 200, but it's been only 0.5 seconds since the last
// fixup, then we need to change ops/sec by -200 * 0.5 = -100. This allows
// for multiple micro-adjustments over the course of a second that add up to
// the full adjustment (if the trend doesn't change).
deltaOpsPerSec = deltaOpsPerSec * min(elapsed.Seconds(), 1)

// Update allowed ops/sec, but don't let it fall below 1, even in case where,
// for example, fixups are somehow blocked.
p.mu.allowedOpsPerSec = max(p.mu.allowedOpsPerSec+deltaOpsPerSec, 1)
}

// calculateQueueSizeRateLocked calculates the exponential moving average (EMA)
// of the rate of change in the fixup queue size, over the last second.
func (p *pacer) calculateQueueSizeRateLocked(elapsed time.Duration, queuedFixups int) float64 {
// Calculate the rate of change in the fixup queue size over the elapsed time
// period.
queueSizeRate := float64(queuedFixups-p.mu.lastQueuedFixups) / elapsed.Seconds()
p.mu.lastQueuedFixups = queuedFixups

// Factor that sample into the EMA by weighting it according to the elapsed
// time (clamped to 1 second max).
alpha := min(elapsed.Seconds(), 1)
p.mu.queueSizeRate = alpha*queueSizeRate + (1-alpha)*p.mu.queueSizeRate

// Clamp the overall rate of change in order to prevent anomalies when a large
// number of fixups are generated in a short amount of time (e.g. because of
// statistical clustering or a backlog of fixups that is suddenly added to
// the queue).
p.mu.queueSizeRate = max(min(p.mu.queueSizeRate, maxQueueSizeRate), -maxQueueSizeRate)

return p.mu.queueSizeRate
}
Loading

0 comments on commit 2e10bb8

Please sign in to comment.