diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go
index 96d6f89d892fc..05de421b7d47b 100644
--- a/pkg/ingester/checkpoint_test.go
+++ b/pkg/ingester/checkpoint_test.go
@@ -459,9 +459,10 @@ func Test_SeriesIterator(t *testing.T) {
 	require.NoError(t, err)
 	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
+	wfm := newWriteFailuresManager(limits)
 
 	for i := 0; i < 3; i++ {
-		inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
+		inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), wfm, nil)
 		require.Nil(t, err)
 		require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream1}}))
 		require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream2}}))
@@ -507,9 +508,10 @@ func Benchmark_SeriesIterator(b *testing.B) {
 	require.NoError(b, err)
 	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
+	wfm := newWriteFailuresManager(limits)
 
 	for i := range instances {
-		inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
+		inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), wfm, nil)
 		require.NoError(b, inst.Push(context.Background(), &logproto.PushRequest{
diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go
index be04bb13d4709..cf3b51c60a7c2 100644
--- a/pkg/ingester/instance_test.go
+++ b/pkg/ingester/instance_test.go
@@ -10,23 +10,21 @@ import (
 	"testing"
 	"time"
 
-	"github.com/grafana/loki/v3/pkg/storage/types"
-	"github.com/grafana/loki/v3/pkg/util/httpreq"
-
-	"github.com/grafana/dskit/tenant"
-	"github.com/grafana/dskit/user"
-
-	"github.com/grafana/loki/v3/pkg/logql/log"
-
+	gokitlog "github.com/go-kit/log"
 	"github.com/grafana/dskit/backoff"
 	"github.com/grafana/dskit/flagext"
+	"github.com/grafana/dskit/tenant"
+	"github.com/grafana/dskit/user"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/stretchr/testify/require"
 
 	"github.com/grafana/loki/v3/pkg/distributor/shardstreams"
+	"github.com/grafana/loki/v3/pkg/distributor/writefailures"
 	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/logql"
+	"github.com/grafana/loki/v3/pkg/logql/log"
 	"github.com/grafana/loki/v3/pkg/logql/syntax"
 	"github.com/grafana/loki/v3/pkg/querier/astmapper"
 	"github.com/grafana/loki/v3/pkg/querier/plan"
@@ -34,7 +32,9 @@ import (
 	"github.com/grafana/loki/v3/pkg/storage/chunk"
 	"github.com/grafana/loki/v3/pkg/storage/config"
 	"github.com/grafana/loki/v3/pkg/storage/stores/index/seriesvolume"
+	"github.com/grafana/loki/v3/pkg/storage/types"
 	"github.com/grafana/loki/v3/pkg/util/constants"
+	"github.com/grafana/loki/v3/pkg/util/httpreq"
 )
 
 func defaultConfig() *Config {
@@ -78,8 +78,9 @@ func TestLabelsCollisions(t *testing.T) {
 	limits, err := runtime.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(t, err)
 	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
+	wfm := newWriteFailuresManager(limits)
 
-	i, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
+	i, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), wfm, nil)
 	require.Nil(t, err)
 
 	// avoid entries from the future.
@@ -106,8 +107,9 @@ func TestConcurrentPushes(t *testing.T) {
 	limits, err := runtime.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(t, err)
 	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
+	wfm := newWriteFailuresManager(limits)
 
-	inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
+	inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), wfm, nil)
 	require.Nil(t, err)
 
 	const (
@@ -158,8 +160,9 @@ func TestGetStreamRates(t *testing.T) {
 	limits, err := runtime.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(t, err)
 	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
+	wfm := newWriteFailuresManager(limits)
 
-	inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
+	inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), wfm, nil)
 	require.NoError(t, err)
 
 	const (
@@ -245,6 +248,7 @@ func TestSyncPeriod(t *testing.T) {
 	limits, err := runtime.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(t, err)
 	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
+	wfm := newWriteFailuresManager(limits)
 
 	const (
 		syncPeriod = 1 * time.Minute
@@ -253,7 +257,7 @@ func TestSyncPeriod(t *testing.T) {
 		minUtil    = 0.20
 	)
 
-	inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
+	inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), wfm, nil)
 	require.Nil(t, err)
 
 	lbls := makeRandomLabels()
@@ -290,6 +294,8 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) {
 	limits, err := runtime.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(t, err)
 	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
+	wfm := newWriteFailuresManager(limits)
+
 	indexShards := 2
 
 	// just some random values
@@ -298,7 +304,7 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) {
 	cfg.SyncMinUtilization = 0.20
 	cfg.IndexShards = indexShards
 
-	instance, err := newInstance(cfg, defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
+	instance, err := newInstance(cfg, defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), wfm, nil)
 	require.Nil(t, err)
 
 	currentTime := time.Now()
@@ -507,8 +513,9 @@ func Benchmark_PushInstance(b *testing.B) {
 	limits, err := runtime.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(b, err)
 	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
+	wfm := newWriteFailuresManager(limits)
 
-	i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
+	i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), wfm, nil)
 	ctx := context.Background()
 
 	for n := 0; n < b.N; n++ {
@@ -549,10 +556,11 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
 	limits, err := runtime.NewOverrides(l, nil)
 	require.NoError(b, err)
 	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
+	wfm := newWriteFailuresManager(limits)
 
 	ctx := context.Background()
 
-	inst, _ := newInstance(&Config{}, defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
+	inst, _ := newInstance(&Config{}, defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), wfm, nil)
 	expr, err := syntax.ParseLogSelector(`{namespace="foo",pod="bar",instance=~"10.*"}`, true)
 	require.NoError(b, err)
 	t, err := newTailer("foo", expr, nil, 10)
@@ -1089,6 +1097,7 @@ func TestStreamShardingUsage(t *testing.T) {
 	require.NoError(t, err)
 	limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
+	wfm := newWriteFailuresManager(limits)
 
 	defaultShardStreamsCfg := limiter.limits.ShardStreams("fake")
 	tenantShardStreamsCfg := limiter.limits.ShardStreams(customTenant1)
@@ -1107,10 +1116,9 @@ func TestStreamShardingUsage(t *testing.T) {
 
 	t.Run("invalid push returns error", func(t *testing.T) {
 		tracker := &mockUsageTracker{}
+		i, _ := newInstance(&Config{IndexShards: 1, OwnedStreamsCheckInterval: 1 * time.Second}, defaultPeriodConfigs, customTenant1, limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), wfm, tracker)
 
-		i, _ := newInstance(&Config{IndexShards: 1, OwnedStreamsCheckInterval: 1 * time.Second}, defaultPeriodConfigs, customTenant1, limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, tracker)
 		ctx := context.Background()
-
 		err = i.Push(ctx, &logproto.PushRequest{
 			Streams: []logproto.Stream{
 				{
@@ -1128,9 +1136,9 @@ func TestStreamShardingUsage(t *testing.T) {
 	})
 
 	t.Run("valid push returns no error", func(t *testing.T) {
-		i, _ := newInstance(&Config{IndexShards: 1, OwnedStreamsCheckInterval: 1 * time.Second}, defaultPeriodConfigs, customTenant2, limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
-		ctx := context.Background()
+		i, _ := newInstance(&Config{IndexShards: 1, OwnedStreamsCheckInterval: 1 * time.Second}, defaultPeriodConfigs, customTenant2, limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), wfm, nil)
 
+		ctx := context.Background()
 		err = i.Push(ctx, &logproto.PushRequest{
 			Streams: []logproto.Stream{
 				{
@@ -1461,7 +1469,7 @@ func defaultInstance(t *testing.T) *instance {
 		nil,
 		nil,
 		NewStreamRateCalculator(),
-		nil,
+		newWriteFailuresManager(overrides),
 		nil,
 	)
 	require.Nil(t, err)
@@ -1562,3 +1570,7 @@ func (m *mockUsageTracker) DiscardedBytesAdd(_ context.Context, _ string, _ stri
 // ReceivedBytesAdd implements push.UsageTracker.
 func (*mockUsageTracker) ReceivedBytesAdd(_ context.Context, _ string, _ time.Duration, _ labels.Labels, _ float64) {
 }
+
+func newWriteFailuresManager(limits *runtime.Overrides) *writefailures.Manager {
+	return writefailures.NewManager(gokitlog.NewNopLogger(), prometheus.NewPedanticRegistry(), writefailures.Cfg{}, limits, "ingester")
+}
diff --git a/pkg/ingester/recalculate_owned_streams_test.go b/pkg/ingester/recalculate_owned_streams_test.go
index 692068a54062d..e89a593bd880b 100644
--- a/pkg/ingester/recalculate_owned_streams_test.go
+++ b/pkg/ingester/recalculate_owned_streams_test.go
@@ -83,7 +83,7 @@ func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T)
 		nil,
 		nil,
 		NewStreamRateCalculator(),
-		nil,
+		newWriteFailuresManager(limits),
 		nil,
 	)
 	require.NoError(t, err)
diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go
index e3e1b01cfe278..077ccb53aef26 100644
--- a/pkg/ingester/stream.go
+++ b/pkg/ingester/stream.go
@@ -79,8 +79,6 @@ type stream struct {
 
 	chunkFormat          byte
 	chunkHeadBlockFormat chunkenc.HeadBlockFmt
-
-	configs writefailures.Limits
 }
 
 type chunkDesc struct {
@@ -365,13 +363,13 @@ func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry, usa
 }
 
 func (s *stream) handleLoggingOfDuplicateEntry(entry logproto.Entry) {
-	if s.configs == nil {
+	if s.writeFailures == nil {
 		return
 	}
-	if s.configs.LogDuplicateMetrics(s.tenant) {
+	if s.writeFailures.LogDuplicateMetrics(s.tenant) {
 		s.metrics.duplicateLogBytesTotal.WithLabelValues(s.tenant).Add(float64(len(entry.Line)))
 	}
-	if s.configs.LogDuplicateStreamInfo(s.tenant) {
+	if s.writeFailures.LogDuplicateStreamInfo(s.tenant) {
 		errMsg := fmt.Sprintf("duplicate log entry with size=%d at timestamp %s for stream %s", len(entry.Line), entry.Timestamp.Format(time.RFC3339), s.labelsString)
 		dupErr := errors.New(errMsg)
 		s.writeFailures.Log(s.tenant, dupErr)
diff --git a/pkg/pattern/ingester_test.go b/pkg/pattern/ingester_test.go
index 0b1404fe544a8..96ff088d65d4e 100644
--- a/pkg/pattern/ingester_test.go
+++ b/pkg/pattern/ingester_test.go
@@ -344,6 +344,10 @@ type fakeLimits struct {
 	metricAggregationEnabled bool
 }
 
+func (f *fakeLimits) LogPushRequestStreams(_ string) bool {
+	return false
+}
+
 func (f *fakeLimits) PatternIngesterTokenizableJSONFields(_ string) []string {
 	return []string{"log", "message", "msg", "msg_", "_msg", "content"}
 }
diff --git a/pkg/runtime/limits.go b/pkg/runtime/limits.go
index 0502135b6a833..fe5fc0e27d21a 100644
--- a/pkg/runtime/limits.go
+++ b/pkg/runtime/limits.go
@@ -247,6 +247,7 @@ type Limits struct {
 	S3SSEKMSKeyID             string `yaml:"s3_sse_kms_key_id" json:"s3_sse_kms_key_id" doc:"nocli|description=S3 server-side encryption KMS Key ID. Ignored if the SSE type override is not set."`
 	S3SSEKMSEncryptionContext string `yaml:"s3_sse_kms_encryption_context" json:"s3_sse_kms_encryption_context" doc:"nocli|description=S3 server-side encryption KMS encryption context. If unset and the key ID override is set, the encryption context will not be provided to S3. Ignored if the SSE type override is not set."`
 
+	// Per-tenant overrides that moved from operations_config to limits_config
 	LogStreamCreation     bool `yaml:"log_stream_creation" json:"log_stream_creation"`
 	LogPushRequest        bool `yaml:"log_push_request" json:"log_stream_request"`
 	LogPushRequestStreams bool `yaml:"log_push_request_streams" json:"log_push_request_streams"`