Skip to content

Commit

Permalink
Refactor discarded metrics to report retention_hours
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanGuedes committed Jan 21, 2025
1 parent 6532b4b commit 87db09e
Show file tree
Hide file tree
Showing 15 changed files with 177 additions and 71 deletions.
6 changes: 6 additions & 0 deletions pkg/compactor/deletion/tenant_request_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"time"

"github.com/grafana/dskit/user"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/validation"
)

Expand Down Expand Up @@ -81,3 +83,7 @@ func (f *fakeLimits) RetentionPeriod(userID string) time.Duration {
// StreamRetention returns the per-stream retention overrides configured
// for the given tenant in this test fixture.
func (f *fakeLimits) StreamRetention(userID string) []validation.StreamRetention {
	limit := f.getLimitForUser(userID)
	return limit.streamRetention
}

func (f *fakeLimits) RetentionHours(userID string, labels labels.Labels) string {

Check warning on line 87 in pkg/compactor/deletion/tenant_request_handler_test.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'labels' seems to be unused, consider removing or renaming it as _ (revive)
return util.RetentionHours(f.getLimitForUser(userID).retentionPeriod)
}
68 changes: 47 additions & 21 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
// We use the heuristic of 1 sample per TS to size the array.
// We also work out the hash value at the same time.
streams := make([]KeyedStream, 0, len(req.Streams))
validatedLineSize := 0
validatedLineCount := 0
validationMetrics := newValidationMetrics()

var validationErrors util.GroupedErrors

Expand Down Expand Up @@ -494,6 +493,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}
}

tenantRetentionHours := d.validator.Limits.RetentionHours(tenantID, nil)

func() {
sp := opentracing.SpanFromContext(ctx)
if sp != nil {
Expand All @@ -513,23 +514,25 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
d.truncateLines(validationContext, &stream)

var lbs labels.Labels
lbs, stream.Labels, stream.Hash, err = d.parseStreamLabels(validationContext, stream.Labels, stream)
lbs, stream.Labels, stream.Hash, err = d.parseStreamLabels(validationContext, stream.Labels, stream, tenantRetentionHours)
if err != nil {
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, tenantID).Add(float64(len(stream.Entries)))
validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, tenantID, tenantRetentionHours).Add(float64(len(stream.Entries)))
discardedBytes := util.EntriesTotalSize(stream.Entries)
validation.DiscardedBytes.WithLabelValues(validation.InvalidLabels, tenantID).Add(float64(discardedBytes))
validation.DiscardedBytes.WithLabelValues(validation.InvalidLabels, tenantID, tenantRetentionHours).Add(float64(discardedBytes))
continue
}

retentionHours := d.validator.Limits.RetentionHours(tenantID, lbs)

if missing, lbsMissing := d.missingEnforcedLabels(lbs, tenantID); missing {
err := fmt.Errorf(validation.MissingEnforcedLabelsErrorMsg, strings.Join(lbsMissing, ","), tenantID)
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
validation.DiscardedSamples.WithLabelValues(validation.MissingEnforcedLabels, tenantID).Add(float64(len(stream.Entries)))
validation.DiscardedSamples.WithLabelValues(validation.MissingEnforcedLabels, tenantID, retentionHours).Add(float64(len(stream.Entries)))
discardedBytes := util.EntriesTotalSize(stream.Entries)
validation.DiscardedBytes.WithLabelValues(validation.MissingEnforcedLabels, tenantID).Add(float64(discardedBytes))
validation.DiscardedBytes.WithLabelValues(validation.MissingEnforcedLabels, tenantID, retentionHours).Add(float64(discardedBytes))
continue
}

Expand All @@ -538,7 +541,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
prevTs := stream.Entries[0].Timestamp

for _, entry := range stream.Entries {
if err := d.validator.ValidateEntry(ctx, validationContext, lbs, entry); err != nil {
if err := d.validator.ValidateEntry(ctx, validationContext, lbs, entry, retentionHours); err != nil {
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
continue
Expand Down Expand Up @@ -593,8 +596,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}

n++
validatedLineSize += util.EntryTotalSize(&entry)
validatedLineCount++
validationMetrics.compute(entry, retentionHours)
pushSize += len(entry.Line)
}
stream.Entries = stream.Entries[:n]
Expand All @@ -618,7 +620,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}

if block, until, retStatusCode := d.validator.ShouldBlockIngestion(validationContext, now); block {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validatedLineCount, validatedLineSize, validation.BlockedIngestion)
d.trackDiscardedData(ctx, req, validationContext, tenantID, validationMetrics, validation.BlockedIngestion, tenantRetentionHours)

err = fmt.Errorf(validation.BlockedIngestionErrorMsg, tenantID, until.Format(time.RFC3339), retStatusCode)
d.writeFailuresManager.Log(tenantID, err)
Expand All @@ -632,10 +634,10 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return nil, httpgrpc.Errorf(retStatusCode, "%s", err.Error())
}

if !d.ingestionRateLimiter.AllowN(now, tenantID, validatedLineSize) {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validatedLineCount, validatedLineSize, validation.RateLimited)
if !d.ingestionRateLimiter.AllowN(now, tenantID, validationMetrics.lineSize) {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validationMetrics, validation.RateLimited, tenantRetentionHours)

err = fmt.Errorf(validation.RateLimitedErrorMsg, tenantID, int(d.ingestionRateLimiter.Limit(now, tenantID)), validatedLineCount, validatedLineSize)
err = fmt.Errorf(validation.RateLimitedErrorMsg, tenantID, int(d.ingestionRateLimiter.Limit(now, tenantID)), validationMetrics.lineCount, validationMetrics.lineSize)
d.writeFailuresManager.Log(tenantID, err)
// Return a 429 to indicate to the client they are being rate limited
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "%s", err.Error())
Expand Down Expand Up @@ -769,16 +771,18 @@ func (d *Distributor) trackDiscardedData(
req *logproto.PushRequest,
validationContext validationContext,
tenantID string,
validatedLineCount int,
validatedLineSize int,
validationMetrics validationMetrics,
reason string,
tenantRetentionHours string,
) {
validation.DiscardedSamples.WithLabelValues(reason, tenantID).Add(float64(validatedLineCount))
validation.DiscardedBytes.WithLabelValues(reason, tenantID).Add(float64(validatedLineSize))
for retentionHours, count := range validationMetrics.lineCountPerRetentionHours {
validation.DiscardedSamples.WithLabelValues(reason, tenantID, retentionHours).Add(float64(count))
validation.DiscardedBytes.WithLabelValues(reason, tenantID, retentionHours).Add(float64(validationMetrics.lineSizePerRetentionHours[retentionHours]))
}

if d.usageTracker != nil {
for _, stream := range req.Streams {
lbs, _, _, err := d.parseStreamLabels(validationContext, stream.Labels, stream)
lbs, _, _, err := d.parseStreamLabels(validationContext, stream.Labels, stream, tenantRetentionHours)
if err != nil {
continue
}
Expand Down Expand Up @@ -1157,7 +1161,7 @@ type labelData struct {
hash uint64
}

func (d *Distributor) parseStreamLabels(vContext validationContext, key string, stream logproto.Stream) (labels.Labels, string, uint64, error) {
func (d *Distributor) parseStreamLabels(vContext validationContext, key string, stream logproto.Stream, tenantRetentionHours string) (labels.Labels, string, uint64, error) {
if val, ok := d.labelCache.Get(key); ok {
return val.ls, val.ls.String(), val.hash, nil
}
Expand All @@ -1167,7 +1171,7 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string,
return nil, "", 0, fmt.Errorf(validation.InvalidLabelsErrorMsg, key, err)
}

if err := d.validator.ValidateLabels(vContext, ls, stream); err != nil {
if err := d.validator.ValidateLabels(vContext, ls, stream, tenantRetentionHours); err != nil {
return nil, "", 0, err
}

Expand Down Expand Up @@ -1260,3 +1264,25 @@ func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger l
func (d *Distributor) HealthyInstancesCount() int {
return int(d.healthyInstancesCount.Load())
}

// validationMetrics accumulates statistics about log entries that passed
// validation during a single push request, tracked both in aggregate and
// broken down by retention-hours label so discarded-data metrics can be
// attributed to the correct retention period.
type validationMetrics struct {
	lineSizePerRetentionHours  map[string]int // validated bytes, keyed by retention-hours label
	lineCountPerRetentionHours map[string]int // validated line count, keyed by retention-hours label
	lineSize                   int            // total validated bytes across all retention periods
	lineCount                  int            // total validated line count across all retention periods
}

// newValidationMetrics returns a validationMetrics value with its
// per-retention-hours maps initialized and ready for use.
func newValidationMetrics() validationMetrics {
	var m validationMetrics
	m.lineSizePerRetentionHours = map[string]int{}
	m.lineCountPerRetentionHours = map[string]int{}
	return m
}

// compute folds one validated entry into the running totals, attributing
// its size and count to the given retention-hours bucket as well as to
// the request-wide aggregates.
func (v *validationMetrics) compute(entry logproto.Entry, retentionHours string) {
	size := util.EntryTotalSize(&entry)
	v.lineCount++
	v.lineSize += size
	v.lineCountPerRetentionHours[retentionHours]++
	v.lineSizePerRetentionHours[retentionHours] += size
}
7 changes: 5 additions & 2 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/runtime"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/constants"
fe "github.com/grafana/loki/v3/pkg/util/flagext"
loki_flagext "github.com/grafana/loki/v3/pkg/util/flagext"
Expand Down Expand Up @@ -1226,14 +1227,15 @@ func BenchmarkShardStream(b *testing.B) {
func Benchmark_SortLabelsOnPush(b *testing.B) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
retentionHours := util.RetentionHours(time.Duration(limits.RetentionPeriod))
distributors, _ := prepare(&testing.T{}, 1, 5, limits, nil)
d := distributors[0]
request := makeWriteRequest(10, 10)
vCtx := d.validator.getValidationContextForTime(testTime, "123")
for n := 0; n < b.N; n++ {
stream := request.Streams[0]
stream.Labels = `{buzz="f", a="b"}`
_, _, _, err := d.parseStreamLabels(vCtx, stream.Labels, stream)
_, _, _, err := d.parseStreamLabels(vCtx, stream.Labels, stream, retentionHours)
if err != nil {
panic("parseStreamLabels fail,err:" + err.Error())
}
Expand Down Expand Up @@ -1273,6 +1275,7 @@ func TestParseStreamLabels(t *testing.T) {
},
} {
limits := tc.generateLimits()
retentionHours := util.RetentionHours(time.Duration(limits.RetentionPeriod))
distributors, _ := prepare(&testing.T{}, 1, 5, limits, nil)
d := distributors[0]

Expand All @@ -1281,7 +1284,7 @@ func TestParseStreamLabels(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
lbs, lbsString, hash, err := d.parseStreamLabels(vCtx, tc.origLabels, logproto.Stream{
Labels: tc.origLabels,
})
}, retentionHours)
if tc.expectedErr != nil {
require.Equal(t, tc.expectedErr, err)
return
Expand Down
2 changes: 2 additions & 0 deletions pkg/distributor/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/grafana/loki/v3/pkg/compactor/retention"
"github.com/grafana/loki/v3/pkg/distributor/shardstreams"
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/prometheus/prometheus/model/labels"
)

// Limits is an interface for distributor limits/related configs
Expand Down Expand Up @@ -35,6 +36,7 @@ type Limits interface {
MaxStructuredMetadataSize(userID string) int
MaxStructuredMetadataCount(userID string) int
OTLPConfig(userID string) push.OTLPConfig
RetentionHours(userID string, labels labels.Labels) string

BlockIngestionUntil(userID string) time.Time
BlockIngestionStatusCode(userID string) int
Expand Down
44 changes: 22 additions & 22 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val
}

// ValidateEntry returns an error if the entry is invalid and report metrics for invalid entries accordingly.
func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, labels labels.Labels, entry logproto.Entry) error {
func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, labels labels.Labels, entry logproto.Entry, retentionHours string) error {
ts := entry.Timestamp.UnixNano()
validation.LineLengthHist.Observe(float64(len(entry.Line)))
structuredMetadataCount := len(entry.StructuredMetadata)
Expand All @@ -97,8 +97,8 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la
// Makes time string on the error message formatted consistently.
formatedEntryTime := entry.Timestamp.Format(timeFormat)
formatedRejectMaxAgeTime := time.Unix(0, vCtx.rejectOldSampleMaxAge).Format(timeFormat)
validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID).Add(entrySize)
validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID, retentionHours).Inc()
validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, vCtx.userID, retentionHours).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.GreaterThanMaxSampleAge, labels, entrySize)
}
Expand All @@ -107,8 +107,8 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la

if ts > vCtx.creationGracePeriod {
formatedEntryTime := entry.Timestamp.Format(timeFormat)
validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, vCtx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, vCtx.userID).Add(entrySize)
validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, vCtx.userID, retentionHours).Inc()
validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, vCtx.userID, retentionHours).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.TooFarInFuture, labels, entrySize)
}
Expand All @@ -120,8 +120,8 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la
// an orthogonal concept (we need not use ValidateLabels in this context)
// but the upstream cortex_validation pkg uses it, so we keep this
// for parity.
validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, vCtx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, vCtx.userID).Add(entrySize)
validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, vCtx.userID, retentionHours).Inc()
validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, vCtx.userID, retentionHours).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.LineTooLong, labels, entrySize)
}
Expand All @@ -130,26 +130,26 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la

if structuredMetadataCount > 0 {
if !vCtx.allowStructuredMetadata {
validation.DiscardedSamples.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID).Add(entrySize)
validation.DiscardedSamples.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID, retentionHours).Inc()
validation.DiscardedBytes.WithLabelValues(validation.DisallowedStructuredMetadata, vCtx.userID, retentionHours).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.DisallowedStructuredMetadata, labels, entrySize)
}
return fmt.Errorf(validation.DisallowedStructuredMetadataErrorMsg, labels)
}

if maxSize := vCtx.maxStructuredMetadataSize; maxSize != 0 && structuredMetadataSizeBytes > maxSize {
validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID).Add(entrySize)
validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID, retentionHours).Inc()
validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooLarge, vCtx.userID, retentionHours).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.StructuredMetadataTooLarge, labels, entrySize)
}
return fmt.Errorf(validation.StructuredMetadataTooLargeErrorMsg, labels, structuredMetadataSizeBytes, vCtx.maxStructuredMetadataSize)
}

if maxCount := vCtx.maxStructuredMetadataCount; maxCount != 0 && structuredMetadataCount > maxCount {
validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID).Add(entrySize)
validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID, retentionHours).Inc()
validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooMany, vCtx.userID, retentionHours).Add(entrySize)
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx, vCtx.userID, validation.StructuredMetadataTooMany, labels, entrySize)
}
Expand All @@ -161,9 +161,9 @@ func (v Validator) ValidateEntry(ctx context.Context, vCtx validationContext, la
}

// Validate labels returns an error if the labels are invalid
func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, stream logproto.Stream) error {
func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, stream logproto.Stream, retentionHours string) error {
if len(ls) == 0 {
validation.DiscardedSamples.WithLabelValues(validation.MissingLabels, ctx.userID).Inc()
validation.DiscardedSamples.WithLabelValues(validation.MissingLabels, ctx.userID, retentionHours).Inc()
return fmt.Errorf(validation.MissingLabelsErrorMsg)
}

Expand All @@ -180,20 +180,20 @@ func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, strea
}

if numLabelNames > ctx.maxLabelNamesPerSeries {
updateMetrics(validation.MaxLabelNamesPerSeries, ctx.userID, stream)
updateMetrics(validation.MaxLabelNamesPerSeries, ctx.userID, stream, retentionHours)
return fmt.Errorf(validation.MaxLabelNamesPerSeriesErrorMsg, stream.Labels, numLabelNames, ctx.maxLabelNamesPerSeries)
}

lastLabelName := ""
for _, l := range ls {
if len(l.Name) > ctx.maxLabelNameLength {
updateMetrics(validation.LabelNameTooLong, ctx.userID, stream)
updateMetrics(validation.LabelNameTooLong, ctx.userID, stream, retentionHours)
return fmt.Errorf(validation.LabelNameTooLongErrorMsg, stream.Labels, l.Name)
} else if len(l.Value) > ctx.maxLabelValueLength {
updateMetrics(validation.LabelValueTooLong, ctx.userID, stream)
updateMetrics(validation.LabelValueTooLong, ctx.userID, stream, retentionHours)
return fmt.Errorf(validation.LabelValueTooLongErrorMsg, stream.Labels, l.Value)
} else if cmp := strings.Compare(lastLabelName, l.Name); cmp == 0 {
updateMetrics(validation.DuplicateLabelNames, ctx.userID, stream)
updateMetrics(validation.DuplicateLabelNames, ctx.userID, stream, retentionHours)
return fmt.Errorf(validation.DuplicateLabelNamesErrorMsg, stream.Labels, l.Name)
}
lastLabelName = l.Name
Expand All @@ -210,8 +210,8 @@ func (v Validator) ShouldBlockIngestion(ctx validationContext, now time.Time) (b
return now.Before(ctx.blockIngestionUntil), ctx.blockIngestionUntil, ctx.blockIngestionStatusCode
}

func updateMetrics(reason, userID string, stream logproto.Stream) {
validation.DiscardedSamples.WithLabelValues(reason, userID).Add(float64(len(stream.Entries)))
func updateMetrics(reason, userID string, stream logproto.Stream, retentionHours string) {
validation.DiscardedSamples.WithLabelValues(reason, userID, retentionHours).Add(float64(len(stream.Entries)))
bytes := util.EntriesTotalSize(stream.Entries)
validation.DiscardedBytes.WithLabelValues(reason, userID).Add(float64(bytes))
validation.DiscardedBytes.WithLabelValues(reason, userID, retentionHours).Add(float64(bytes))
}
Loading

0 comments on commit 87db09e

Please sign in to comment.