Skip to content

Commit

Permalink
fixup! Merge limits_config and runtime_config
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Jan 15, 2025
1 parent df145c4 commit 79fb5d3
Show file tree
Hide file tree
Showing 45 changed files with 218 additions and 312 deletions.
10 changes: 5 additions & 5 deletions integration/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
"github.com/grafana/loki/v3/integration/util"

"github.com/grafana/loki/v3/pkg/loki"
"github.com/grafana/loki/v3/pkg/runtime"
"github.com/grafana/loki/v3/pkg/storage"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/util/cfg"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/validation"
)

var configTemplate = template.Must(template.New("").Parse(`
Expand Down Expand Up @@ -509,14 +509,14 @@ func (c *Component) Restart() error {
}

type runtimeConfigValues struct {
TenantLimits map[string]*validation.Limits `yaml:"overrides"`
TenantLimits map[string]*runtime.Limits `yaml:"overrides"`
}

func (c *Component) SetTenantLimits(tenant string, limits validation.Limits) error {
func (c *Component) SetTenantLimits(tenant string, limits runtime.Limits) error {
rcv := runtimeConfigValues{}
rcv.TenantLimits = c.loki.TenantLimits.AllByUserID()
if rcv.TenantLimits == nil {
rcv.TenantLimits = map[string]*validation.Limits{}
rcv.TenantLimits = map[string]*runtime.Limits{}
}
rcv.TenantLimits[tenant] = &limits

Expand All @@ -528,7 +528,7 @@ func (c *Component) SetTenantLimits(tenant string, limits validation.Limits) err
return os.WriteFile(c.overridesFile, config, 0640) // #nosec G306 -- this is fencing off the "other" permissions
}

func (c *Component) GetTenantLimits(tenant string) validation.Limits {
func (c *Component) GetTenantLimits(tenant string) runtime.Limits {
limits := c.loki.TenantLimits.TenantLimits(tenant)
if limits == nil {
return c.loki.Cfg.LimitsConfig
Expand Down
3 changes: 2 additions & 1 deletion pkg/bloombuild/planner/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/grafana/loki/v3/pkg/bloombuild/planner/queue"
"github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies"
"github.com/grafana/loki/v3/pkg/compactor/retention"
)

// Config configures the bloom-planner component.
Expand Down Expand Up @@ -50,7 +51,7 @@ func (cfg *Config) Validate() error {
}

type Limits interface {
RetentionLimits
retention.Limits
strategies.Limits
BloomCreationEnabled(tenantID string) bool
BloomBuildMaxBuilders(tenantID string) int
Expand Down
15 changes: 4 additions & 11 deletions pkg/bloombuild/planner/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"

"github.com/grafana/loki/v3/pkg/compactor/retention"
"github.com/grafana/loki/v3/pkg/runtime"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
storageconfig "github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/validation"
)

type RetentionConfig struct {
Expand All @@ -40,16 +40,9 @@ func (cfg *RetentionConfig) Validate() error {
return nil
}

type RetentionLimits interface {
RetentionPeriod(userID string) time.Duration
StreamRetention(userID string) []runtime.StreamRetention
AllByUserID() map[string]*validation.Limits
DefaultLimits() *validation.Limits
}

type RetentionManager struct {
cfg RetentionConfig
limits RetentionLimits
limits retention.Limits
bloomStore bloomshipper.StoreBase
metrics *Metrics
logger log.Logger
Expand All @@ -61,7 +54,7 @@ type RetentionManager struct {

func NewRetentionManager(
cfg RetentionConfig,
limits RetentionLimits,
limits retention.Limits,
bloomStore bloomshipper.StoreBase,
metrics *Metrics,
logger log.Logger,
Expand Down Expand Up @@ -214,7 +207,7 @@ func findLongestRetention(globalRetention time.Duration, streamRetention []runti
return globalRetention
}

func retentionByTenant(limits RetentionLimits) map[string]time.Duration {
func retentionByTenant(limits retention.Limits) map[string]time.Duration {
all := limits.AllByUserID()
if len(all) == 0 {
return nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/bloombuild/planner/retention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/bloombuild/planner/plannertest"
"github.com/grafana/loki/v3/pkg/compactor/retention"
"github.com/grafana/loki/v3/pkg/runtime"
"github.com/grafana/loki/v3/pkg/storage"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
Expand Down Expand Up @@ -420,7 +421,7 @@ func TestFindLongestRetention(t *testing.T) {
func TestSmallestRetention(t *testing.T) {
for _, tc := range []struct {
name string
limits RetentionLimits
limits retention.Limits
expectedRetention time.Duration
expectedHasRetention bool
}{
Expand Down
5 changes: 2 additions & 3 deletions pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
"github.com/grafana/loki/v3/pkg/storage/types"
"github.com/grafana/loki/v3/pkg/util/mempool"
"github.com/grafana/loki/v3/pkg/validation"
)

func stringSlice[T fmt.Stringer](s []T) []string {
Expand All @@ -54,8 +53,8 @@ func groupRefs(t *testing.T, chunkRefs []*logproto.ChunkRef) []*logproto.Grouped
return grouped
}

func newLimits() *validation.Overrides {
limits := validation.Limits{}
func newLimits() *runtime.Overrides {
limits := runtime.Limits{}
flagext.DefaultValues(&limits)
limits.BloomGatewayEnabled = true
limits.BloomGatewayShardSize = 1
Expand Down
2 changes: 1 addition & 1 deletion pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,9 @@ type storeContainer struct {
}

type Limits interface {
runtime.ExportedLimits
deletion.Limits
retention.Limits
DefaultLimits() *runtime.Limits
}

func NewCompactor(cfg Config, objectStoreClients map[config.DayTime]client.ObjectClient, deleteStoreClient client.ObjectClient, schemaConfig config.SchemaConfig, limits Limits, r prometheus.Registerer, metricsNamespace string) (*Compactor, error) {
Expand Down
3 changes: 1 addition & 2 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/util/constants"
loki_net "github.com/grafana/loki/v3/pkg/util/net"
"github.com/grafana/loki/v3/pkg/validation"
)

const indexTablePrefix = "table_"
Expand Down Expand Up @@ -55,7 +54,7 @@ func setupTestCompactor(t *testing.T, objectClients map[config.DayTime]client.Ob

require.NoError(t, cfg.Validate())

defaultLimits := validation.Limits{}
defaultLimits := runtime.Limits{}
flagext.DefaultValues(&defaultLimits)
require.NoError(t, defaultLimits.RetentionPeriod.Set("30d"))

Expand Down
3 changes: 1 addition & 2 deletions pkg/compactor/retention/expiration.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ type expirationChecker struct {
}

type Limits interface {
runtime.ExportedLimits
RetentionPeriod(userID string) time.Duration
StreamRetention(userID string) []runtime.StreamRetention
AllByUserID() map[string]*runtime.Limits
DefaultLimits() *runtime.Limits
}

func NewExpirationChecker(limits Limits) ExpirationChecker {
Expand Down
4 changes: 2 additions & 2 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ func Test_IncrementTimestamp(t *testing.T) {
}

func Test_MissingEnforcedLabels(t *testing.T) {
limits := &validation.Limits{}
limits := &runtime.Limits{}
flagext.DefaultValues(limits)

limits.EnforcedLabels = []string{"app", "env"}
Expand All @@ -454,7 +454,7 @@ func Test_MissingEnforcedLabels(t *testing.T) {
}

func Test_PushWithEnforcedLabels(t *testing.T) {
limits := &validation.Limits{}
limits := &runtime.Limits{}
flagext.DefaultValues(limits)

// makeWriteRequest only contains a `{foo="bar"}` label.
Expand Down
9 changes: 4 additions & 5 deletions pkg/distributor/field_detection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,16 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/plog"

"github.com/grafana/loki/pkg/push"
loghttp_push "github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/runtime"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/validation"

"github.com/grafana/loki/pkg/push"
)

func Test_DetectLogLevels(t *testing.T) {
setup := func(discoverLogLevels bool) (*validation.Limits, *mockIngester) {
limits := &validation.Limits{}
setup := func(discoverLogLevels bool) (*runtime.Limits, *mockIngester) {
limits := &runtime.Limits{}
flagext.DefaultValues(limits)

limits.DiscoverLogLevels = discoverLogLevels
Expand Down
9 changes: 3 additions & 6 deletions pkg/distributor/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,13 @@ import (
"testing"

"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/user"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/runtime"

"github.com/grafana/dskit/flagext"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/validation"
)

func TestDistributorRingHandler(t *testing.T) {
Expand Down Expand Up @@ -64,7 +61,7 @@ func TestDistributorRingHandler(t *testing.T) {
}

func TestRequestParserWrapping(t *testing.T) {
limits := &validation.Limits{}
limits := &runtime.Limits{}
flagext.DefaultValues(limits)
limits.RejectOldSamples = false
distributors, _ := prepare(t, 1, 3, limits, nil)
Expand Down
6 changes: 3 additions & 3 deletions pkg/distributor/ratestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"time"

"github.com/grafana/loki/v3/pkg/distributor/shardstreams"
"github.com/grafana/loki/v3/pkg/validation"
"github.com/grafana/loki/v3/pkg/runtime"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -338,8 +338,8 @@ type fakeOverrides struct {
enabled bool
}

func (c *fakeOverrides) AllByUserID() map[string]*validation.Limits {
return map[string]*validation.Limits{
func (c *fakeOverrides) AllByUserID() map[string]*runtime.Limits {
return map[string]*runtime.Limits{
"ingester0": {
ShardStreams: shardstreams.Config{
Enabled: c.enabled,
Expand Down
3 changes: 1 addition & 2 deletions pkg/ingester/recalculate_owned_streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/grafana/loki/v3/pkg/runtime"
lokiring "github.com/grafana/loki/v3/pkg/util/ring"
"github.com/grafana/loki/v3/pkg/validation"
)

func Test_recalculateOwnedStreams_newRecalculateOwnedStreamsIngester(t *testing.T) {
Expand Down Expand Up @@ -65,7 +64,7 @@ func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T)
},
}

limits, err := runtime.NewOverrides(validation.Limits{
limits, err := runtime.NewOverrides(runtime.Limits{
MaxGlobalStreamsPerUser: 100,
UseOwnedStreamCount: testData.featureEnabled,
}, nil)
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingester/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func TestStreamIterator(t *testing.T) {
func TestEntryErrorCorrectlyReported(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.MaxChunkAge = time.Minute
l := validation.Limits{
l := runtime.Limits{
PerStreamRateLimit: 15,
PerStreamRateLimitBurst: 15,
}
Expand Down Expand Up @@ -455,7 +455,7 @@ func TestUnorderedPush(t *testing.T) {
}

func TestPushRateLimit(t *testing.T) {
l := validation.Limits{
l := runtime.Limits{
PerStreamRateLimit: 10,
PerStreamRateLimitBurst: 10,
}
Expand Down Expand Up @@ -494,7 +494,7 @@ func TestPushRateLimit(t *testing.T) {
}

func TestPushRateLimitAllOrNothing(t *testing.T) {
l := validation.Limits{
l := runtime.Limits{
PerStreamRateLimit: 10,
PerStreamRateLimitBurst: 10,
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/ingester/streams_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/validation"
runtime "github.com/grafana/loki/v3/pkg/runtime"
)

func TestStreamsMap(t *testing.T) {
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
limits, err := runtime.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
chunkfmt, headfmt := defaultChunkFormat(t)
Expand All @@ -31,7 +31,6 @@ func TestStreamsMap(t *testing.T) {
NewStreamRateCalculator(),
NilMetrics,
nil,
nil,
),
newStream(
chunkfmt,
Expand All @@ -47,7 +46,6 @@ func TestStreamsMap(t *testing.T) {
NewStreamRateCalculator(),
NilMetrics,
nil,
nil,
),
}
var s *stream
Expand Down
4 changes: 2 additions & 2 deletions pkg/logcli/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql"
"github.com/grafana/loki/v3/pkg/loki"
"github.com/grafana/loki/v3/pkg/runtime"
"github.com/grafana/loki/v3/pkg/storage"
chunk "github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/config"
Expand All @@ -31,7 +32,6 @@ import (
"github.com/grafana/loki/v3/pkg/util/constants"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/marshal"
"github.com/grafana/loki/v3/pkg/validation"
)

const schemaConfigFilename = "schemaconfig"
Expand Down Expand Up @@ -464,7 +464,7 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
return err
}

limits, err := validation.NewOverrides(conf.LimitsConfig, nil)
limits, err := runtime.NewOverrides(conf.LimitsConfig, nil)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 79fb5d3

Please sign in to comment.