From 7a95cb86633215805a4b4b7d9399c71c38f43584 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Mon, 12 Feb 2024 15:25:51 +0100 Subject: [PATCH] Configurable bloom tokenizer and block settings (#11889) **What this PR does / why we need it**: This PR makes the max block size configurable and wires up the already existing nGramLen and nGramSkip settings. **Special notes for your reviewer**: **Checklist** - [ ] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [ ] Tests updated - [ ] `CHANGELOG.md` updated - [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) - [ ] If the change is deprecating or removing a configuration option, update the `deprecated-config.yaml` and `deleted-config.yaml` files respectively in the `tools/deprecated-config-checker` directory. 
[Example PR](https://github.com/grafana/loki/pull/10840/commits/0d4416a4b03739583349934b96f272fb4f685d15) --- docs/sources/configure/_index.md | 6 ++++++ pkg/bloomcompactor/bloomcompactor.go | 1 + pkg/bloomcompactor/config.go | 1 + pkg/bloomcompactor/controller.go | 10 +++++++++- pkg/validation/limits.go | 26 +++++++++++++++++--------- 5 files changed, 34 insertions(+), 10 deletions(-) diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index c3854e434f4da..e79a2503176fc 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -3140,6 +3140,12 @@ shard_streams: # CLI flag: -bloom-gateway.cache-key-interval [bloom_gateway_cache_key_interval: <duration> | default = 15m] +# The maximum bloom block size. A value of 0 sets an unlimited size. Default is +# 200MB. The actual block size might exceed this limit since blooms will be +# added to blocks until the block exceeds the maximum block size. +# CLI flag: -bloom-compactor.max-block-size +[bloom_compactor_max_block_size: <int> | default = 200MB] + # Allow user to send structured metadata in push payload. 
# CLI flag: -validation.allow-structured-metadata [allow_structured_metadata: <boolean> | default = false] diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index a91b6796e5360..8a3e7c6266c1d 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -98,6 +98,7 @@ func New( c.tsdbStore, c.bloomStore, chunkLoader, + c.limits, c.metrics, c.logger, ) diff --git a/pkg/bloomcompactor/config.go b/pkg/bloomcompactor/config.go index 37c8844378465..37aac3310829a 100644 --- a/pkg/bloomcompactor/config.go +++ b/pkg/bloomcompactor/config.go @@ -63,6 +63,7 @@ type Limits interface { BloomNGramLength(tenantID string) int BloomNGramSkip(tenantID string) int BloomFalsePositiveRate(tenantID string) float64 + BloomCompactorMaxBlockSize(tenantID string) int } // TODO(owen-d): Remove this type in favor of config.DayTime diff --git a/pkg/bloomcompactor/controller.go b/pkg/bloomcompactor/controller.go index 92a6f8ca5f7c5..cf6fff090f0ae 100644 --- a/pkg/bloomcompactor/controller.go +++ b/pkg/bloomcompactor/controller.go @@ -21,6 +21,7 @@ type SimpleBloomController struct { bloomStore bloomshipper.Store chunkLoader ChunkLoader metrics *Metrics + limits Limits // TODO(owen-d): add metrics logger log.Logger @@ -30,6 +31,7 @@ func NewSimpleBloomController( tsdbStore TSDBStore, blockStore bloomshipper.Store, chunkLoader ChunkLoader, + limits Limits, metrics *Metrics, logger log.Logger, ) *SimpleBloomController { @@ -38,6 +40,7 @@ func NewSimpleBloomController( bloomStore: blockStore, chunkLoader: chunkLoader, metrics: metrics, + limits: limits, logger: logger, } } @@ -110,6 +113,11 @@ func (s *SimpleBloomController) buildBlocks( return errors.Wrap(err, "failed to create plan") } + nGramSize := uint64(s.limits.BloomNGramLength(tenant)) + nGramSkip := uint64(s.limits.BloomNGramSkip(tenant)) + maxBlockSize := uint64(s.limits.BloomCompactorMaxBlockSize(tenant)) + blockOpts := v1.NewBlockOptions(nGramSize, nGramSkip, maxBlockSize) 
+ // 4. Generate Blooms // Now that we have the gaps, we will generate a bloom block for each gap. // We can accelerate this by using existing blocks which may already contain @@ -148,7 +156,7 @@ func (s *SimpleBloomController) buildBlocks( gen := NewSimpleBloomGenerator( tenant, - v1.DefaultBlockOptions, // TODO(salvacorts) make block options configurable + blockOpts, seriesItr, s.chunkLoader, preExistingBlocks, diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index e3052c1781b89..262631643c723 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -56,6 +56,7 @@ const ( defaultMaxStructuredMetadataSize = "64kb" defaultMaxStructuredMetadataCount = 128 + defaultBloomCompactorMaxBlockSize = "200MB" ) // Limits describe all the limits for users; can be used to describe global default @@ -187,15 +188,16 @@ type Limits struct { BloomGatewayShardSize int `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size"` BloomGatewayEnabled bool `yaml:"bloom_gateway_enable_filtering" json:"bloom_gateway_enable_filtering"` - BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"` - BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"` - BloomCompactorEnabled bool `yaml:"bloom_compactor_enable_compaction" json:"bloom_compactor_enable_compaction"` - BloomCompactorChunksBatchSize int `yaml:"bloom_compactor_chunks_batch_size" json:"bloom_compactor_chunks_batch_size"` - BloomNGramLength int `yaml:"bloom_ngram_length" json:"bloom_ngram_length"` - BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip"` - BloomFalsePositiveRate float64 `yaml:"bloom_false_positive_rate" json:"bloom_false_positive_rate"` - BloomGatewayBlocksDownloadingParallelism int `yaml:"bloom_gateway_blocks_downloading_parallelism" json:"bloom_gateway_blocks_downloading_parallelism"` - BloomGatewayCacheKeyInterval time.Duration 
`yaml:"bloom_gateway_cache_key_interval" json:"bloom_gateway_cache_key_interval"` + BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"` + BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"` + BloomCompactorEnabled bool `yaml:"bloom_compactor_enable_compaction" json:"bloom_compactor_enable_compaction"` + BloomCompactorChunksBatchSize int `yaml:"bloom_compactor_chunks_batch_size" json:"bloom_compactor_chunks_batch_size"` + BloomNGramLength int `yaml:"bloom_ngram_length" json:"bloom_ngram_length"` + BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip"` + BloomFalsePositiveRate float64 `yaml:"bloom_false_positive_rate" json:"bloom_false_positive_rate"` + BloomGatewayBlocksDownloadingParallelism int `yaml:"bloom_gateway_blocks_downloading_parallelism" json:"bloom_gateway_blocks_downloading_parallelism"` + BloomGatewayCacheKeyInterval time.Duration `yaml:"bloom_gateway_cache_key_interval" json:"bloom_gateway_cache_key_interval"` + BloomCompactorMaxBlockSize flagext.ByteSize `yaml:"bloom_compactor_max_block_size" json:"bloom_compactor_max_block_size"` AllowStructuredMetadata bool `yaml:"allow_structured_metadata,omitempty" json:"allow_structured_metadata,omitempty" doc:"description=Allow user to send structured metadata in push payload."` MaxStructuredMetadataSize flagext.ByteSize `yaml:"max_structured_metadata_size" json:"max_structured_metadata_size" doc:"description=Maximum size accepted for structured metadata per log line."` @@ -333,6 +335,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.Float64Var(&l.BloomFalsePositiveRate, "bloom-compactor.false-positive-rate", 0.01, "Scalable Bloom Filter desired false-positive rate.") f.IntVar(&l.BloomGatewayBlocksDownloadingParallelism, "bloom-gateway.blocks-downloading-parallelism", 50, "Maximum number of blocks will be downloaded in parallel by the Bloom Gateway.") 
f.DurationVar(&l.BloomGatewayCacheKeyInterval, "bloom-gateway.cache-key-interval", 15*time.Minute, "Interval for computing the cache key in the Bloom Gateway.") + _ = l.BloomCompactorMaxBlockSize.Set(defaultBloomCompactorMaxBlockSize) + f.Var(&l.BloomCompactorMaxBlockSize, "bloom-compactor.max-block-size", "The maximum bloom block size. A value of 0 sets an unlimited size. Default is 200MB. The actual block size might exceed this limit since blooms will be added to blocks until the block exceeds the maximum block size.") l.ShardStreams = &shardstreams.Config{} l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f) @@ -882,6 +886,10 @@ func (o *Overrides) BloomNGramSkip(userID string) int { return o.getOverridesForUser(userID).BloomNGramSkip } +func (o *Overrides) BloomCompactorMaxBlockSize(userID string) int { + return o.getOverridesForUser(userID).BloomCompactorMaxBlockSize.Val() +} + func (o *Overrides) BloomFalsePositiveRate(userID string) float64 { return o.getOverridesForUser(userID).BloomFalsePositiveRate }