Commit

Use multiple of default replication factor for dynamic replication (#10637)

Instead of replicating blocks to all store-gateways when they are eligible
for dynamic replication, adjust the replication factor by a multiple of the
default. This reduces disk and memory requirements for large tenants.

Related #10382
Related #9944

Signed-off-by: Nick Pillitteri <[email protected]>
56quarters authored Feb 13, 2025
1 parent bcd6c87 commit a54bc12
Showing 12 changed files with 150 additions and 93 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -48,6 +48,7 @@
* [ENHANCEMENT] Ruler: Adds support for filtering results from rule status endpoint by `file[]`, `rule_group[]` and `rule_name[]`. #10589
* [ENHANCEMENT] Query-frontend: Add option to "spin off" subqueries as actual range queries, so that they benefit from query acceleration techniques such as sharding, splitting and caching. To enable this, set the `-query-frontend.spin-off-instant-subqueries-to-url=<url>` option on the frontend and the `instant_queries_with_subquery_spin_off` per-tenant override with regular expressions matching the queries to enable. #10460 #10603 #10621
* [ENHANCEMENT] Querier, ingester: The series API respects passed `limit` parameter. #10620
* [ENHANCEMENT] Store-gateway: Add experimental settings under `-store-gateway.dynamic-replication` to allow more than the default of 3 store-gateways to own recent blocks. #10382 #10637
* [BUGFIX] Distributor: Use a boolean to track changes while merging the ReplicaDesc components, rather than comparing the objects directly. #10185
* [BUGFIX] Querier: fix timeout responding to query-frontend when response size is very close to `-querier.frontend-client.grpc-max-send-msg-size`. #10154
* [BUGFIX] Query-frontend and querier: show warning/info annotations in some cases where they were missing (if a lazy querier was used). #10277
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -11725,6 +11725,17 @@
"fieldFlag": "store-gateway.dynamic-replication.max-time-threshold",
"fieldType": "duration",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "multiple",
"required": false,
"desc": "Multiple of the default replication factor that should be used for recent blocks. Minimum value is 2",
"fieldValue": null,
"fieldDefaultValue": 2,
"fieldFlag": "store-gateway.dynamic-replication.multiple",
"fieldType": "int",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -3203,6 +3203,8 @@ Usage of ./cmd/mimir/mimir:
[experimental] Use a higher number of replicas for recent blocks. Useful to spread query load more evenly at the cost of slightly higher disk usage.
-store-gateway.dynamic-replication.max-time-threshold duration
[experimental] Threshold of the most recent sample in a block used to determine it is eligible for higher than default replication. If a block has samples within this amount of time, it is considered recent and will be owned by more replicas. (default 25h0m0s)
-store-gateway.dynamic-replication.multiple int
[experimental] Multiple of the default replication factor that should be used for recent blocks. Minimum value is 2 (default 2)
-store-gateway.enabled-tenants comma-separated-list-of-strings
Comma separated list of tenants that can be loaded by the store-gateway. If specified, only blocks for these tenants will be loaded by the store-gateway, otherwise all tenants can be loaded. Subject to sharding.
-store-gateway.sharding-ring.auto-forget-enabled
1 change: 1 addition & 0 deletions docs/sources/mimir/configure/about-versioning.md
@@ -190,6 +190,7 @@ The following features are currently experimental:
- `-query-scheduler.querier-forget-delay`
- Store-gateway
- Eagerly loading some blocks on startup even when lazy loading is enabled `-blocks-storage.bucket-store.index-header.eager-loading-startup-enabled`
- Allow more than the default of 3 store-gateways to own recent blocks `-store-gateway.dynamic-replication`
- Read-write deployment mode
- API endpoints:
- `/api/v1/user_limits`
@@ -4959,6 +4959,11 @@ dynamic_replication:
# CLI flag: -store-gateway.dynamic-replication.max-time-threshold
[max_time_threshold: <duration> | default = 25h]
# (experimental) Multiple of the default replication factor that should be
# used for recent blocks. Minimum value is 2
# CLI flag: -store-gateway.dynamic-replication.multiple
[multiple: <int> | default = 2]
# (advanced) Comma separated list of tenants that can be loaded by the
# store-gateway. If specified, only blocks for these tenants will be loaded by
# the store-gateway, otherwise all tenants can be loaded. Subject to sharding.
2 changes: 1 addition & 1 deletion pkg/querier/blocks_store_queryable.go
@@ -247,7 +247,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa
var dynamicReplication storegateway.DynamicReplication = storegateway.NewNopDynamicReplication()
if gatewayCfg.DynamicReplication.Enabled {
dynamicReplication = storegateway.NewMaxTimeDynamicReplication(
gatewayCfg.DynamicReplication.MaxTimeThreshold,
gatewayCfg,
// Keep syncing blocks to store-gateways for a grace period (3 times the sync interval) to
// ensure they are not unloaded while they are still being queried.
mimir_tsdb.NewBlockDiscoveryDelayMultiplier*storageCfg.BucketStore.SyncInterval,
6 changes: 2 additions & 4 deletions pkg/querier/blocks_store_replicated_set.go
@@ -104,15 +104,13 @@ func (s *blocksStoreReplicationSet) stopping(_ error) error {
func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blocks bucketindex.Blocks, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) {
blocksByAddr := make(map[string][]ulid.ULID)
instances := make(map[string]ring.InstanceDesc)

userRing := storegateway.GetShuffleShardingSubring(s.storesRing, userID, s.limits)
replicationOption := ring.WithReplicationFactor(userRing.InstancesCount())

// Find the replication set of each block we need to query.
for _, block := range blocks {
var ringOpts []ring.Option
if s.dynamicReplication.EligibleForQuerying(block) {
ringOpts = append(ringOpts, replicationOption)
if eligible, replicationFactor := s.dynamicReplication.EligibleForQuerying(block); eligible {
ringOpts = append(ringOpts, ring.WithReplicationFactor(replicationFactor))
}

// Note that we don't pass buffers since we retain instances from the returned replication set.
57 changes: 35 additions & 22 deletions pkg/storegateway/dynamic_replication.go
@@ -10,21 +10,30 @@ import (

var (
errInvalidDynamicReplicationMaxTimeThreshold = errors.New("invalid dynamic replication max time threshold, the value must be at least one hour")
errInvalidDynamicReplicationFactor = errors.New("invalid dynamic replication factor, the value must be at least 2")
)

type DynamicReplicationConfig struct {
Enabled bool `yaml:"enabled" category:"experimental"`
MaxTimeThreshold time.Duration `yaml:"max_time_threshold" category:"experimental"`
Multiple int `yaml:"multiple" category:"experimental"`
}

func (cfg *DynamicReplicationConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
f.BoolVar(&cfg.Enabled, prefix+"dynamic-replication.enabled", false, "Use a higher number of replicas for recent blocks. Useful to spread query load more evenly at the cost of slightly higher disk usage.")
f.DurationVar(&cfg.MaxTimeThreshold, prefix+"dynamic-replication.max-time-threshold", 25*time.Hour, "Threshold of the most recent sample in a block used to determine it is eligible for higher than default replication. If a block has samples within this amount of time, it is considered recent and will be owned by more replicas.")
f.IntVar(&cfg.Multiple, prefix+"dynamic-replication.multiple", 2, "Multiple of the default replication factor that should be used for recent blocks. Minimum value is 2")
}

func (cfg *DynamicReplicationConfig) Validate() error {
if cfg.Enabled && cfg.MaxTimeThreshold < time.Hour {
return errInvalidDynamicReplicationMaxTimeThreshold
if cfg.Enabled {
if cfg.MaxTimeThreshold < time.Hour {
return errInvalidDynamicReplicationMaxTimeThreshold
}

if cfg.Multiple < 2 {
return errInvalidDynamicReplicationFactor
}
}

return nil
@@ -41,13 +50,15 @@ type ReplicatedBlock interface {
// DynamicReplication determines if a TSDB block is eligible to be synced to and queried from more
// store-gateways than the configured replication factor based on metadata about the block.
type DynamicReplication interface {
// EligibleForSync returns true if the block can be synced to more than the configured (via
// replication factor) number of store-gateways, false otherwise.
EligibleForSync(b ReplicatedBlock) bool
// EligibleForSync returns true if the block can be synced to more than the default number of
// store-gateways and the appropriate replication factor to use, false and an undefined replication
// factor otherwise.
EligibleForSync(b ReplicatedBlock) (bool, int)

// EligibleForQuerying returns true if the block can be safely queried from more than the
// configured (via replication factor) number of store-gateways, false otherwise.
EligibleForQuerying(b ReplicatedBlock) bool
// default number of store-gateways and the appropriate replication factor to use, false and
// an undefined replication factor otherwise.
EligibleForQuerying(b ReplicatedBlock) (bool, int)
}

func NewNopDynamicReplication() *NopDynamicReplication {
Expand All @@ -57,19 +68,20 @@ func NewNopDynamicReplication() *NopDynamicReplication {
// NopDynamicReplication is a DynamicReplication implementation that always returns false.
type NopDynamicReplication struct{}

func (n NopDynamicReplication) EligibleForSync(ReplicatedBlock) bool {
return false
func (n NopDynamicReplication) EligibleForSync(ReplicatedBlock) (bool, int) {
return false, 0
}

func (n NopDynamicReplication) EligibleForQuerying(ReplicatedBlock) bool {
return false
func (n NopDynamicReplication) EligibleForQuerying(ReplicatedBlock) (bool, int) {
return false, 0
}

func NewMaxTimeDynamicReplication(maxTime time.Duration, gracePeriod time.Duration) *MaxTimeDynamicReplication {
func NewMaxTimeDynamicReplication(cfg Config, gracePeriod time.Duration) *MaxTimeDynamicReplication {
return &MaxTimeDynamicReplication{
maxTimeThreshold: maxTime,
gracePeriod: gracePeriod,
now: time.Now,
maxTimeThreshold: cfg.DynamicReplication.MaxTimeThreshold,
replicationFactor: cfg.DynamicReplication.Multiple * cfg.ShardingRing.ReplicationFactor,
gracePeriod: gracePeriod,
now: time.Now,
}
}

@@ -78,22 +90,23 @@ func NewMaxTimeDynamicReplication(maxTime time.Duration, gracePeriod time.Durati
// recent sample) is. A grace period can optionally be used to ensure that blocks are
// synced to store-gateways until they are no longer being queried.
type MaxTimeDynamicReplication struct {
maxTimeThreshold time.Duration
gracePeriod time.Duration
now func() time.Time
maxTimeThreshold time.Duration
replicationFactor int
gracePeriod time.Duration
now func() time.Time
}

func (e *MaxTimeDynamicReplication) EligibleForSync(b ReplicatedBlock) bool {
func (e *MaxTimeDynamicReplication) EligibleForSync(b ReplicatedBlock) (bool, int) {
now := e.now()
maxTimeDelta := now.Sub(b.GetMaxTime())
// We keep syncing blocks for `gracePeriod` after they are no longer eligible for
// querying to ensure that they are not unloaded by store-gateways while still being
// queried.
return maxTimeDelta <= (e.maxTimeThreshold + e.gracePeriod)
return maxTimeDelta <= (e.maxTimeThreshold + e.gracePeriod), e.replicationFactor
}

func (e *MaxTimeDynamicReplication) EligibleForQuerying(b ReplicatedBlock) bool {
func (e *MaxTimeDynamicReplication) EligibleForQuerying(b ReplicatedBlock) (bool, int) {
now := e.now()
maxTimeDelta := now.Sub(b.GetMaxTime())
return maxTimeDelta <= e.maxTimeThreshold
return maxTimeDelta <= e.maxTimeThreshold, e.replicationFactor
}
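The behaviour introduced in this file can be sketched as follows. This is a simplified illustration and not the Mimir implementation: the `policy` type, `newPolicy`, the `example*` constants, and the use of a block "age" (now minus the block's max time) in place of `ReplicatedBlock` are all assumptions made for the example. The values mirror the defaults and test inputs shown in the diff (default replication factor 3, multiple 2, 25h threshold, 45m grace period).

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Hypothetical example values. 25h and 2 match the flag defaults in the diff;
// 3 and 45m are the values used by the unit test in this commit.
const (
	exampleDefaultRF = 3
	exampleMultiple  = 2
	exampleThreshold = 25 * time.Hour
	exampleGrace     = 45 * time.Minute
)

var errMultipleTooSmall = errors.New("multiple must be at least 2")

// policy is a simplified, hypothetical stand-in for MaxTimeDynamicReplication.
// It works on a block's age instead of on block metadata.
type policy struct {
	maxTimeThreshold  time.Duration
	gracePeriod       time.Duration
	replicationFactor int
}

// newPolicy validates the multiple and precomputes the replication factor
// as multiple * default, which is the core change in this commit.
func newPolicy(defaultRF, multiple int, threshold, grace time.Duration) (policy, error) {
	if multiple < 2 {
		return policy{}, errMultipleTooSmall
	}
	return policy{
		maxTimeThreshold:  threshold,
		gracePeriod:       grace,
		replicationFactor: multiple * defaultRF,
	}, nil
}

// eligibleForSync keeps blocks eligible for an extra grace period so they are
// not unloaded while queries may still reach them.
func (p policy) eligibleForSync(age time.Duration) (bool, int) {
	return age <= p.maxTimeThreshold+p.gracePeriod, p.replicationFactor
}

// eligibleForQuerying uses the threshold alone, without the grace period.
func (p policy) eligibleForQuerying(age time.Duration) (bool, int) {
	return age <= p.maxTimeThreshold, p.replicationFactor
}

func main() {
	p, err := newPolicy(exampleDefaultRF, exampleMultiple, exampleThreshold, exampleGrace)
	if err != nil {
		panic(err)
	}
	ages := []time.Duration{time.Hour, exampleThreshold + 30*time.Minute, 48 * time.Hour}
	for _, age := range ages {
		canSync, rf := p.eligibleForSync(age)
		canQuery, _ := p.eligibleForQuerying(age)
		fmt.Printf("age=%v sync=%t query=%t rf=%d\n", age, canSync, canQuery, rf)
	}
}
```

A block whose age falls between the threshold and the threshold plus the grace period is still synced at the higher replication factor but is no longer queried at it, which is the asymmetry the comments in the diff describe.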
20 changes: 17 additions & 3 deletions pkg/storegateway/dynamic_replication_test.go
@@ -15,7 +15,18 @@ func TestMaxTimeExpandedReplication(t *testing.T) {
// Round "now" to the nearest millisecond since we are using millisecond precision
// for min/max times for the blocks.
now := time.Now().Round(time.Millisecond)
replication := NewMaxTimeDynamicReplication(25*time.Hour, 45*time.Minute)
cfg := Config{
DynamicReplication: DynamicReplicationConfig{
Enabled: true,
MaxTimeThreshold: 25 * time.Hour,
Multiple: 2,
},
ShardingRing: RingConfig{
ReplicationFactor: 3,
},
}

replication := NewMaxTimeDynamicReplication(cfg, 45*time.Minute)
replication.now = func() time.Time { return now }

type testCase struct {
@@ -69,11 +80,14 @@ func TestMaxTimeExpandedReplication(t *testing.T) {

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
canSync := replication.EligibleForSync(&tc.block)
canQuery := replication.EligibleForQuerying(&tc.block)
canSync, rfSync := replication.EligibleForSync(&tc.block)
canQuery, rfQuery := replication.EligibleForQuerying(&tc.block)

require.Equal(t, tc.expectedSync, canSync, "expected to be able/not-able to sync block %+v using %+v", tc.block, replication)
require.Equal(t, 6, rfSync, "expected dynamic replication factor of 6")

require.Equal(t, tc.expectedQuery, canQuery, "expected to be able/not-able to query block %+v using %+v", tc.block, replication)
require.Equal(t, 6, rfQuery, "expected dynamic replication factor of 6")
})
}
}
2 changes: 1 addition & 1 deletion pkg/storegateway/gateway.go
@@ -182,7 +182,7 @@ func newStoreGateway(gatewayCfg Config, storageCfg mimir_tsdb.BlocksStorageConfi
var dynamicReplication DynamicReplication = NewNopDynamicReplication()
if gatewayCfg.DynamicReplication.Enabled {
dynamicReplication = NewMaxTimeDynamicReplication(
gatewayCfg.DynamicReplication.MaxTimeThreshold,
gatewayCfg,
// Keep syncing blocks to store-gateways for a grace period (3 times the sync interval) to
// ensure they are not unloaded while they are still being queried.
mimir_tsdb.NewBlockDiscoveryDelayMultiplier*storageCfg.BucketStore.SyncInterval,
5 changes: 2 additions & 3 deletions pkg/storegateway/sharding_strategy.go
@@ -113,14 +113,13 @@ func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string,
}

r := GetShuffleShardingSubring(s.r, userID, s.limits)
replicationOption := ring.WithReplicationFactor(r.InstancesCount())
bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
bufOption := ring.WithBuffers(bufDescs, bufHosts, bufZones)

for blockID := range metas {
ringOpts := []ring.Option{bufOption}
if s.dynamicReplication.EligibleForSync(metas[blockID]) {
ringOpts = append(ringOpts, replicationOption)
if eligible, replicationFactor := s.dynamicReplication.EligibleForSync(metas[blockID]); eligible {
ringOpts = append(ringOpts, ring.WithReplicationFactor(replicationFactor))
}

// Check if the block is owned by the store-gateway
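The caller-side pattern used here and in `GetClientsFor` can be illustrated with a small self-contained sketch. Every name in it (`lookup`, `ringOption`, `withReplicationFactor`, `fixedPolicy`, `effectiveReplicationFactor`) is a hypothetical stand-in chosen for the example, not a dskit or Mimir API. It only aims to show how a per-block replication factor is appended as an option when the block is eligible, and how the default applies otherwise.

```go
package main

import "fmt"

// lookup is a hypothetical stand-in for the state a ring lookup is configured with.
type lookup struct {
	replicationFactor int
}

// ringOption is a hypothetical stand-in for a functional option on a ring lookup.
type ringOption func(*lookup)

// withReplicationFactor overrides the replication factor for a single lookup.
func withReplicationFactor(rf int) ringOption {
	return func(l *lookup) { l.replicationFactor = rf }
}

// dynamicReplication mirrors the two-value return shape adopted in this commit.
type dynamicReplication interface {
	eligibleForSync(recent bool) (bool, int)
}

// fixedPolicy treats a block as eligible when it is recent and always
// reports the same precomputed replication factor.
type fixedPolicy struct {
	rf int
}

func (f fixedPolicy) eligibleForSync(recent bool) (bool, int) {
	return recent, f.rf
}

// effectiveReplicationFactor builds the option list the way the diff does:
// the override is appended only when the block is eligible.
func effectiveReplicationFactor(defaultRF int, d dynamicReplication, recent bool) int {
	var opts []ringOption
	if eligible, rf := d.eligibleForSync(recent); eligible {
		opts = append(opts, withReplicationFactor(rf))
	}

	l := lookup{replicationFactor: defaultRF}
	for _, opt := range opts {
		opt(&l)
	}
	return l.replicationFactor
}

func main() {
	p := fixedPolicy{rf: 6}
	fmt.Println("recent block:", effectiveReplicationFactor(3, p, true))
	fmt.Println("old block:", effectiveReplicationFactor(3, p, false))
}
```

Compared with the previous approach of passing the subring's instance count, the replication factor now comes from the policy itself, so the call site no longer needs to precompute a shared option outside the loop.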