Skip to content

Commit

Permalink
distributor: adjust request limit for replication sets
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Varankin <[email protected]>
  • Loading branch information
narqo committed Feb 14, 2025
1 parent 64c405d commit 0ce2492
Showing 1 changed file with 40 additions and 5 deletions.
45 changes: 40 additions & 5 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2754,23 +2754,25 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
return nil, err
}

resultLimit := math.MaxInt
if hints != nil && hints.Limit > 0 {
resultLimit = hints.Limit
req.Limit = int64(adjustRequestLimitForReplicationSets(ctx, d, replicationSets, resultLimit))
}

resps, err := forReplicationSets(ctx, d, replicationSets, func(ctx context.Context, client ingester_client.IngesterClient) (*ingester_client.MetricsForLabelMatchersResponse, error) {
return client.MetricsForLabelMatchers(ctx, req)
})
if err != nil {
return nil, err
}

metricsLimit := math.MaxInt
if hints != nil && hints.Limit > 0 {
metricsLimit = hints.Limit
}
metrics := map[uint64]labels.Labels{}
respsLoop:
for _, resp := range resps {
ms := ingester_client.FromMetricsForLabelMatchersResponse(resp)
for _, m := range ms {
if len(metrics) >= metricsLimit {
if len(metrics) >= resultLimit {
break respsLoop
}
metrics[labels.StableHash(m)] = m
Expand All @@ -2789,6 +2791,39 @@ respsLoop:
return result, nil
}

// adjustRequestLimitForReplicationSets recalculated the limit with respect to replication sets.
// The returned value is the approximation, a query to an individual instance in the replication sets needs to be limited with.
func adjustRequestLimitForReplicationSets(ctx context.Context, d *Distributor, replicationSets []ring.ReplicationSet, limit int) int {
if limit == 0 {
return limit
}

var shardSize int
if d.cfg.IngestStorageConfig.Enabled {
// When ingest storage is enabled, each partition, represented by one replication set is owned by only one ingester.
// So we use the number of replication sets to count the number of shards.
shardSize = len(replicationSets)
} else if len(replicationSets) == 1 {
// We expect to always have exactly 1 replication set when ingest storage is disabled.
// In classic Mimir the total number of shards (ingestion-tenant-shard-size) is the number of ingesters in the shard across all zones.
shardSize = len(replicationSets[0].Instances) / d.ingestersRing.ReplicationFactor()
}
if shardSize == 0 {
return limit
}

newLimit := limit / shardSize

spanLog := spanlogger.FromContext(ctx, d.log)
spanLog.DebugLog(
"msg", "the limit of query needs to be adjusted to account for request sharding",
"original", limit,
"updated", newLimit,
)

return newLimit
}

// MetricsMetadata returns the metrics metadata based on the provided req.
func (d *Distributor) MetricsMetadata(ctx context.Context, req *ingester_client.MetricsMetadataRequest) ([]scrape.MetricMetadata, error) {
replicationSets, err := d.getIngesterReplicationSetsForQuery(ctx)
Expand Down

0 comments on commit 0ce2492

Please sign in to comment.