Skip to content

Commit

Permalink
Use multiple of default replication factor for dynamic replication
Browse files Browse the repository at this point in the history
Instead of replicating blocks to all store-gateways when they are eligible
for dynamic replication, adjust the replication factor by a multiple of the
default. This reduces disk and memory requirements for large tenants.

Related #10382
Related #9944

Signed-off-by: Nick Pillitteri <[email protected]>
  • Loading branch information
56quarters committed Feb 12, 2025
1 parent bcd6c87 commit 5b98e3f
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 93 deletions.
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -11725,6 +11725,17 @@
"fieldFlag": "store-gateway.dynamic-replication.max-time-threshold",
"fieldType": "duration",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "multiple",
"required": false,
"desc": "Multiple of the default replication factor that should be used for recent blocks. Minimum value is 2",
"fieldValue": null,
"fieldDefaultValue": 2,
"fieldFlag": "store-gateway.dynamic-replication.multiple",
"fieldType": "int",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -3203,6 +3203,8 @@ Usage of ./cmd/mimir/mimir:
[experimental] Use a higher number of replicas for recent blocks. Useful to spread query load more evenly at the cost of slightly higher disk usage.
-store-gateway.dynamic-replication.max-time-threshold duration
[experimental] Threshold of the most recent sample in a block used to determine it is eligible for higher than default replication. If a block has samples within this amount of time, it is considered recent and will be owned by more replicas. (default 25h0m0s)
-store-gateway.dynamic-replication.multiple int
[experimental] Multiple of the default replication factor that should be used for recent blocks. Minimum value is 2 (default 2)
-store-gateway.enabled-tenants comma-separated-list-of-strings
Comma separated list of tenants that can be loaded by the store-gateway. If specified, only blocks for these tenants will be loaded by the store-gateway, otherwise all tenants can be loaded. Subject to sharding.
-store-gateway.sharding-ring.auto-forget-enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4959,6 +4959,11 @@ dynamic_replication:
# CLI flag: -store-gateway.dynamic-replication.max-time-threshold
[max_time_threshold: <duration> | default = 25h]
# (experimental) Multiple of the default replication factor that should be
# used for recent blocks. Minimum value is 2
# CLI flag: -store-gateway.dynamic-replication.multiple
[multiple: <int> | default = 2]
# (advanced) Comma separated list of tenants that can be loaded by the
# store-gateway. If specified, only blocks for these tenants will be loaded by
# the store-gateway, otherwise all tenants can be loaded. Subject to sharding.
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa
var dynamicReplication storegateway.DynamicReplication = storegateway.NewNopDynamicReplication()
if gatewayCfg.DynamicReplication.Enabled {
dynamicReplication = storegateway.NewMaxTimeDynamicReplication(
gatewayCfg.DynamicReplication.MaxTimeThreshold,
gatewayCfg,
// Keep syncing blocks to store-gateways for a grace period (3 times the sync interval) to
// ensure they are not unloaded while they are still being queried.
mimir_tsdb.NewBlockDiscoveryDelayMultiplier*storageCfg.BucketStore.SyncInterval,
Expand Down
6 changes: 2 additions & 4 deletions pkg/querier/blocks_store_replicated_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,13 @@ func (s *blocksStoreReplicationSet) stopping(_ error) error {
func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blocks bucketindex.Blocks, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) {
blocksByAddr := make(map[string][]ulid.ULID)
instances := make(map[string]ring.InstanceDesc)

userRing := storegateway.GetShuffleShardingSubring(s.storesRing, userID, s.limits)
replicationOption := ring.WithReplicationFactor(userRing.InstancesCount())

// Find the replication set of each block we need to query.
for _, block := range blocks {
var ringOpts []ring.Option
if s.dynamicReplication.EligibleForQuerying(block) {
ringOpts = append(ringOpts, replicationOption)
if eligible, replicationFactor := s.dynamicReplication.EligibleForQuerying(block); eligible {
ringOpts = append(ringOpts, ring.WithReplicationFactor(replicationFactor))
}

// Note that we don't pass buffers since we retain instances from the returned replication set.
Expand Down
57 changes: 35 additions & 22 deletions pkg/storegateway/dynamic_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,30 @@ import (

// Sentinel errors returned by DynamicReplicationConfig.Validate when dynamic
// replication is enabled with an out-of-range setting.
var (
// Returned when MaxTimeThreshold is less than one hour.
errInvalidDynamicReplicationMaxTimeThreshold = errors.New("invalid dynamic replication max time threshold, the value must be at least one hour")
// Returned when Multiple is less than 2.
errInvalidDynamicReplicationFactor = errors.New("invalid dynamic replication factor, the value must be at least 2")
)

// DynamicReplicationConfig holds the settings that control whether recent
// blocks are replicated to more store-gateways than the default replication
// factor. All fields are tagged as experimental.
type DynamicReplicationConfig struct {
// Enabled turns on the use of a higher number of replicas for recent blocks.
Enabled bool `yaml:"enabled" category:"experimental"`
// MaxTimeThreshold is how recent the most recent sample in a block must be
// for the block to be eligible for higher than default replication.
MaxTimeThreshold time.Duration `yaml:"max_time_threshold" category:"experimental"`
// Multiple is the multiple of the default replication factor used for recent
// blocks. Validate rejects values below 2 when Enabled is set.
Multiple int `yaml:"multiple" category:"experimental"`
}

// RegisterFlagsWithPrefix registers the dynamic replication flags on f, binding
// each one to the matching field of cfg. Every flag name is prefix followed by
// "dynamic-replication.<setting>". The defaults are: disabled, a max time
// threshold of 25 hours, and a multiple of 2.
func (cfg *DynamicReplicationConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
f.BoolVar(&cfg.Enabled, prefix+"dynamic-replication.enabled", false, "Use a higher number of replicas for recent blocks. Useful to spread query load more evenly at the cost of slightly higher disk usage.")
f.DurationVar(&cfg.MaxTimeThreshold, prefix+"dynamic-replication.max-time-threshold", 25*time.Hour, "Threshold of the most recent sample in a block used to determine it is eligible for higher than default replication. If a block has samples within this amount of time, it is considered recent and will be owned by more replicas.")
f.IntVar(&cfg.Multiple, prefix+"dynamic-replication.multiple", 2, "Multiple of the default replication factor that should be used for recent blocks. Minimum value is 2")
}

func (cfg *DynamicReplicationConfig) Validate() error {
if cfg.Enabled && cfg.MaxTimeThreshold < time.Hour {
return errInvalidDynamicReplicationMaxTimeThreshold
if cfg.Enabled {
if cfg.MaxTimeThreshold < time.Hour {
return errInvalidDynamicReplicationMaxTimeThreshold
}

if cfg.Multiple < 2 {
return errInvalidDynamicReplicationFactor
}
}

return nil
Expand All @@ -41,13 +50,15 @@ type ReplicatedBlock interface {
// DynamicReplication determines if a TSDB block is eligible to be synced to and queried from more
// store-gateways than the configured replication factor based on metadata about the block.
type DynamicReplication interface {
// EligibleForSync returns true if the block can be synced to more than the configured (via
// replication factor) number of store-gateways, false otherwise.
EligibleForSync(b ReplicatedBlock) bool
// EligibleForSync returns true if the block can be synced to more than the default number of
// store-gateways and the appropriate replication factor to use, false and an undefined replication
// factor otherwise.
EligibleForSync(b ReplicatedBlock) (bool, int)

// EligibleForQuerying returns true if the block can be safely queried from more than the
// configured (via replication factor) number of store-gateways, false otherwise.
EligibleForQuerying(b ReplicatedBlock) bool
// default number of store-gateways and the appropriate replication factor to use, false and
// an undefined replication factor otherwise.
EligibleForQuerying(b ReplicatedBlock) (bool, int)
}

func NewNopDynamicReplication() *NopDynamicReplication {
Expand All @@ -57,19 +68,20 @@ func NewNopDynamicReplication() *NopDynamicReplication {
// NopDynamicReplication is a DynamicReplication implementation that always returns false.
type NopDynamicReplication struct{}

func (n NopDynamicReplication) EligibleForSync(ReplicatedBlock) bool {
return false
func (n NopDynamicReplication) EligibleForSync(ReplicatedBlock) (bool, int) {
return false, 0
}

func (n NopDynamicReplication) EligibleForQuerying(ReplicatedBlock) bool {
return false
func (n NopDynamicReplication) EligibleForQuerying(ReplicatedBlock) (bool, int) {
return false, 0
}

func NewMaxTimeDynamicReplication(maxTime time.Duration, gracePeriod time.Duration) *MaxTimeDynamicReplication {
func NewMaxTimeDynamicReplication(cfg Config, gracePeriod time.Duration) *MaxTimeDynamicReplication {
return &MaxTimeDynamicReplication{
maxTimeThreshold: maxTime,
gracePeriod: gracePeriod,
now: time.Now,
maxTimeThreshold: cfg.DynamicReplication.MaxTimeThreshold,
replicationFactor: cfg.DynamicReplication.Multiple * cfg.ShardingRing.ReplicationFactor,
gracePeriod: gracePeriod,
now: time.Now,
}
}

Expand All @@ -78,22 +90,23 @@ func NewMaxTimeDynamicReplication(maxTime time.Duration, gracePeriod time.Durati
// recent sample) is. A grace period can optionally be used to ensure that blocks are
// synced to store-gateways until they are no longer being queried.
type MaxTimeDynamicReplication struct {
maxTimeThreshold time.Duration
gracePeriod time.Duration
now func() time.Time
maxTimeThreshold time.Duration
replicationFactor int
gracePeriod time.Duration
now func() time.Time
}

func (e *MaxTimeDynamicReplication) EligibleForSync(b ReplicatedBlock) bool {
func (e *MaxTimeDynamicReplication) EligibleForSync(b ReplicatedBlock) (bool, int) {
now := e.now()
maxTimeDelta := now.Sub(b.GetMaxTime())
// We keep syncing blocks for `gracePeriod` after they are no longer eligible for
// querying to ensure that they are not unloaded by store-gateways while still being
// queried.
return maxTimeDelta <= (e.maxTimeThreshold + e.gracePeriod)
return maxTimeDelta <= (e.maxTimeThreshold + e.gracePeriod), e.replicationFactor
}

func (e *MaxTimeDynamicReplication) EligibleForQuerying(b ReplicatedBlock) bool {
func (e *MaxTimeDynamicReplication) EligibleForQuerying(b ReplicatedBlock) (bool, int) {
now := e.now()
maxTimeDelta := now.Sub(b.GetMaxTime())
return maxTimeDelta <= e.maxTimeThreshold
return maxTimeDelta <= e.maxTimeThreshold, e.replicationFactor
}
20 changes: 17 additions & 3 deletions pkg/storegateway/dynamic_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,18 @@ func TestMaxTimeExpandedReplication(t *testing.T) {
// Round "now" to the nearest millisecond since we are using millisecond precision
// for min/max times for the blocks.
now := time.Now().Round(time.Millisecond)
replication := NewMaxTimeDynamicReplication(25*time.Hour, 45*time.Minute)
cfg := Config{
DynamicReplication: DynamicReplicationConfig{
Enabled: true,
MaxTimeThreshold: 25 * time.Hour,
Multiple: 2,
},
ShardingRing: RingConfig{
ReplicationFactor: 3,
},
}

replication := NewMaxTimeDynamicReplication(cfg, 45*time.Minute)
replication.now = func() time.Time { return now }

type testCase struct {
Expand Down Expand Up @@ -69,11 +80,14 @@ func TestMaxTimeExpandedReplication(t *testing.T) {

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
canSync := replication.EligibleForSync(&tc.block)
canQuery := replication.EligibleForQuerying(&tc.block)
canSync, rfSync := replication.EligibleForSync(&tc.block)
canQuery, rfQuery := replication.EligibleForQuerying(&tc.block)

require.Equal(t, tc.expectedSync, canSync, "expected to be able/not-able to sync block %+v using %+v", tc.block, replication)
require.Equal(t, 6, rfSync, "expected dynamic replication factor of 6")

require.Equal(t, tc.expectedQuery, canQuery, "expected to be able/not-able to query block %+v using %+v", tc.block, replication)
require.Equal(t, 6, rfQuery, "expected dynamic replication factor of 6")
})
}
}
2 changes: 1 addition & 1 deletion pkg/storegateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func newStoreGateway(gatewayCfg Config, storageCfg mimir_tsdb.BlocksStorageConfi
var dynamicReplication DynamicReplication = NewNopDynamicReplication()
if gatewayCfg.DynamicReplication.Enabled {
dynamicReplication = NewMaxTimeDynamicReplication(
gatewayCfg.DynamicReplication.MaxTimeThreshold,
gatewayCfg,
// Keep syncing blocks to store-gateways for a grace period (3 times the sync interval) to
// ensure they are not unloaded while they are still being queried.
mimir_tsdb.NewBlockDiscoveryDelayMultiplier*storageCfg.BucketStore.SyncInterval,
Expand Down
5 changes: 2 additions & 3 deletions pkg/storegateway/sharding_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,13 @@ func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string,
}

r := GetShuffleShardingSubring(s.r, userID, s.limits)
replicationOption := ring.WithReplicationFactor(r.InstancesCount())
bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
bufOption := ring.WithBuffers(bufDescs, bufHosts, bufZones)

for blockID := range metas {
ringOpts := []ring.Option{bufOption}
if s.dynamicReplication.EligibleForSync(metas[blockID]) {
ringOpts = append(ringOpts, replicationOption)
if eligible, replicationFactor := s.dynamicReplication.EligibleForSync(metas[blockID]); eligible {
ringOpts = append(ringOpts, ring.WithReplicationFactor(replicationFactor))
}

// Check if the block is owned by the store-gateway
Expand Down
Loading

0 comments on commit 5b98e3f

Please sign in to comment.