Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Jan 15, 2025
1 parent b13711c commit c8037f7
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 28 deletions.
6 changes: 4 additions & 2 deletions pkg/ingester/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,10 @@ func Test_SeriesIterator(t *testing.T) {
require.NoError(t, err)

limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
wfm := newWriteFailuresManager(limits)

for i := 0; i < 3; i++ {
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), wfm, nil)
require.Nil(t, err)
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream1}}))
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream2}}))
Expand Down Expand Up @@ -507,9 +508,10 @@ func Benchmark_SeriesIterator(b *testing.B) {
require.NoError(b, err)

limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
wfm := newWriteFailuresManager(limits)

for i := range instances {
inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), wfm, nil)

require.NoError(b,
inst.Push(context.Background(), &logproto.PushRequest{
Expand Down
52 changes: 32 additions & 20 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,31 @@ import (
"testing"
"time"

"github.com/grafana/loki/v3/pkg/storage/types"
"github.com/grafana/loki/v3/pkg/util/httpreq"

"github.com/grafana/dskit/tenant"
"github.com/grafana/dskit/user"

"github.com/grafana/loki/v3/pkg/logql/log"

gokitlog "github.com/go-kit/log"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/tenant"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/distributor/shardstreams"
"github.com/grafana/loki/v3/pkg/distributor/writefailures"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql"
"github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/querier/astmapper"
"github.com/grafana/loki/v3/pkg/querier/plan"
"github.com/grafana/loki/v3/pkg/runtime"
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/index/seriesvolume"
"github.com/grafana/loki/v3/pkg/storage/types"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/httpreq"
)

func defaultConfig() *Config {
Expand Down Expand Up @@ -78,8 +78,9 @@ func TestLabelsCollisions(t *testing.T) {
limits, err := runtime.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
wfm := newWriteFailuresManager(limits)

i, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
i, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), wfm, nil)
require.Nil(t, err)

// avoid entries from the future.
Expand All @@ -106,8 +107,9 @@ func TestConcurrentPushes(t *testing.T) {
limits, err := runtime.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
wfm := newWriteFailuresManager(limits)

inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), wfm, nil)
require.Nil(t, err)

const (
Expand Down Expand Up @@ -158,8 +160,9 @@ func TestGetStreamRates(t *testing.T) {
limits, err := runtime.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
wfm := newWriteFailuresManager(limits)

inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), wfm, nil)
require.NoError(t, err)

const (
Expand Down Expand Up @@ -245,6 +248,7 @@ func TestSyncPeriod(t *testing.T) {
limits, err := runtime.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
wfm := newWriteFailuresManager(limits)

const (
syncPeriod = 1 * time.Minute
Expand All @@ -253,7 +257,7 @@ func TestSyncPeriod(t *testing.T) {
minUtil = 0.20
)

inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), wfm, nil)
require.Nil(t, err)

lbls := makeRandomLabels()
Expand Down Expand Up @@ -290,6 +294,8 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) {
limits, err := runtime.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
wfm := newWriteFailuresManager(limits)

indexShards := 2

// just some random values
Expand All @@ -298,7 +304,7 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) {
cfg.SyncMinUtilization = 0.20
cfg.IndexShards = indexShards

instance, err := newInstance(cfg, defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
instance, err := newInstance(cfg, defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), wfm, nil)
require.Nil(t, err)

currentTime := time.Now()
Expand Down Expand Up @@ -507,8 +513,9 @@ func Benchmark_PushInstance(b *testing.B) {
limits, err := runtime.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(b, err)
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
wfm := newWriteFailuresManager(limits)

i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), wfm, nil)
ctx := context.Background()

for n := 0; n < b.N; n++ {
Expand Down Expand Up @@ -549,10 +556,11 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
limits, err := runtime.NewOverrides(l, nil)
require.NoError(b, err)
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
wfm := newWriteFailuresManager(limits)

ctx := context.Background()

inst, _ := newInstance(&Config{}, defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
inst, _ := newInstance(&Config{}, defaultPeriodConfigs, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), wfm, nil)
expr, err := syntax.ParseLogSelector(`{namespace="foo",pod="bar",instance=~"10.*"}`, true)
require.NoError(b, err)
t, err := newTailer("foo", expr, nil, 10)
Expand Down Expand Up @@ -1089,6 +1097,7 @@ func TestStreamShardingUsage(t *testing.T) {
require.NoError(t, err)

limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
wfm := newWriteFailuresManager(limits)

defaultShardStreamsCfg := limiter.limits.ShardStreams("fake")
tenantShardStreamsCfg := limiter.limits.ShardStreams(customTenant1)
Expand All @@ -1107,10 +1116,9 @@ func TestStreamShardingUsage(t *testing.T) {

t.Run("invalid push returns error", func(t *testing.T) {
tracker := &mockUsageTracker{}
i, _ := newInstance(&Config{IndexShards: 1, OwnedStreamsCheckInterval: 1 * time.Second}, defaultPeriodConfigs, customTenant1, limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), wfm, tracker)

i, _ := newInstance(&Config{IndexShards: 1, OwnedStreamsCheckInterval: 1 * time.Second}, defaultPeriodConfigs, customTenant1, limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, tracker)
ctx := context.Background()

err = i.Push(ctx, &logproto.PushRequest{
Streams: []logproto.Stream{
{
Expand All @@ -1128,9 +1136,9 @@ func TestStreamShardingUsage(t *testing.T) {
})

t.Run("valid push returns no error", func(t *testing.T) {
i, _ := newInstance(&Config{IndexShards: 1, OwnedStreamsCheckInterval: 1 * time.Second}, defaultPeriodConfigs, customTenant2, limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
ctx := context.Background()
i, _ := newInstance(&Config{IndexShards: 1, OwnedStreamsCheckInterval: 1 * time.Second}, defaultPeriodConfigs, customTenant2, limiter, noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), wfm, nil)

ctx := context.Background()
err = i.Push(ctx, &logproto.PushRequest{
Streams: []logproto.Stream{
{
Expand Down Expand Up @@ -1461,7 +1469,7 @@ func defaultInstance(t *testing.T) *instance {
nil,
nil,
NewStreamRateCalculator(),
nil,
newWriteFailuresManager(overrides),
nil,
)
require.Nil(t, err)
Expand Down Expand Up @@ -1562,3 +1570,7 @@ func (m *mockUsageTracker) DiscardedBytesAdd(_ context.Context, _ string, _ stri
// ReceivedBytesAdd implements push.UsageTracker.
func (*mockUsageTracker) ReceivedBytesAdd(_ context.Context, _ string, _ time.Duration, _ labels.Labels, _ float64) {
}

func newWriteFailuresManager(limits *runtime.Overrides) *writefailures.Manager {
return writefailures.NewManager(gokitlog.NewNopLogger(), prometheus.NewPedanticRegistry(), writefailures.Cfg{}, limits, "ingester")
}
2 changes: 1 addition & 1 deletion pkg/ingester/recalculate_owned_streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T)
nil,
nil,
NewStreamRateCalculator(),
nil,
newWriteFailuresManager(limits),
nil,
)
require.NoError(t, err)
Expand Down
8 changes: 3 additions & 5 deletions pkg/ingester/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ type stream struct {

chunkFormat byte
chunkHeadBlockFormat chunkenc.HeadBlockFmt

configs writefailures.Limits
}

type chunkDesc struct {
Expand Down Expand Up @@ -365,13 +363,13 @@ func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry, usa
}

func (s *stream) handleLoggingOfDuplicateEntry(entry logproto.Entry) {
if s.configs == nil {
if s.writeFailures == nil {
return
}
if s.configs.LogDuplicateMetrics(s.tenant) {
if s.writeFailures.LogDuplicateMetrics(s.tenant) {
s.metrics.duplicateLogBytesTotal.WithLabelValues(s.tenant).Add(float64(len(entry.Line)))
}
if s.configs.LogDuplicateStreamInfo(s.tenant) {
if s.writeFailures.LogDuplicateStreamInfo(s.tenant) {
errMsg := fmt.Sprintf("duplicate log entry with size=%d at timestamp %s for stream %s", len(entry.Line), entry.Timestamp.Format(time.RFC3339), s.labelsString)
dupErr := errors.New(errMsg)
s.writeFailures.Log(s.tenant, dupErr)
Expand Down
4 changes: 4 additions & 0 deletions pkg/pattern/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@ type fakeLimits struct {
metricAggregationEnabled bool
}

func (f *fakeLimits) LogPushRequestStreams(_ string) bool {
return false
}

func (f *fakeLimits) PatternIngesterTokenizableJSONFields(_ string) []string {
return []string{"log", "message", "msg", "msg_", "_msg", "content"}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/runtime/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ type Limits struct {
S3SSEKMSKeyID string `yaml:"s3_sse_kms_key_id" json:"s3_sse_kms_key_id" doc:"nocli|description=S3 server-side encryption KMS Key ID. Ignored if the SSE type override is not set."`
S3SSEKMSEncryptionContext string `yaml:"s3_sse_kms_encryption_context" json:"s3_sse_kms_encryption_context" doc:"nocli|description=S3 server-side encryption KMS encryption context. If unset and the key ID override is set, the encryption context will not be provided to S3. Ignored if the SSE type override is not set."`

// Per-tenant overrides that moved from operations_config to limits_config
LogStreamCreation bool `yaml:"log_stream_creation" json:"log_stream_creation"`
LogPushRequest bool `yaml:"log_push_request" json:"log_stream_request"`
LogPushRequestStreams bool `yaml:"log_push_request_streams" json:"log_push_request_streams"`
Expand Down

0 comments on commit c8037f7

Please sign in to comment.