Skip to content

Commit

Permalink
Replace bloomcompactor.DayTable with config.DayTime
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Feb 12, 2024
1 parent 0468c60 commit 85ed7b2
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 92 deletions.
12 changes: 6 additions & 6 deletions pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,11 @@ func runWithRetries(

type tenantTable struct {
tenant string
table DayTable
table config.DayTime
ownershipRange v1.FingerprintBounds
}

func (c *Compactor) tenants(ctx context.Context, table DayTable) (v1.Iterator[string], error) {
func (c *Compactor) tenants(ctx context.Context, table config.DayTime) (v1.Iterator[string], error) {
tenants, err := c.tsdbStore.UsersForPeriod(ctx, table)
if err != nil {
return nil, errors.Wrap(err, "getting tenants")
Expand Down Expand Up @@ -207,7 +207,7 @@ func (c *Compactor) runOne(ctx context.Context) error {
func (c *Compactor) tables(ts time.Time) *dayRangeIterator {
from := model.TimeFromUnixNano(ts.Add(-c.cfg.MaxCompactionAge).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod))
through := model.TimeFromUnixNano(ts.Add(-c.cfg.MinCompactionAge).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod))
return newDayRangeIterator(DayTable(from), DayTable(through))
return newDayRangeIterator(config.NewDayTime(from), config.NewDayTime(through))
}

func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
Expand Down Expand Up @@ -285,10 +285,10 @@ func (c *Compactor) compactTenantTable(ctx context.Context, tt tenantTable) erro
}

type dayRangeIterator struct {
min, max, cur DayTable
min, max, cur config.DayTime
}

func newDayRangeIterator(min, max DayTable) *dayRangeIterator {
func newDayRangeIterator(min, max config.DayTime) *dayRangeIterator {
return &dayRangeIterator{min: min, max: max, cur: min.Dec()}
}

Expand All @@ -297,7 +297,7 @@ func (r *dayRangeIterator) Next() bool {
return r.cur.Before(r.max)
}

func (r *dayRangeIterator) At() DayTable {
func (r *dayRangeIterator) At() config.DayTime {
return r.cur
}

Expand Down
39 changes: 0 additions & 39 deletions pkg/bloomcompactor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,8 @@ package bloomcompactor

import (
"flag"
"fmt"
"time"

"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/downloads"
"github.com/grafana/loki/pkg/util/ring"
)
Expand Down Expand Up @@ -65,37 +60,3 @@ type Limits interface {
BloomFalsePositiveRate(tenantID string) float64
BloomCompactorMaxBlockSize(tenantID string) int
}

// TODO(owen-d): Remove this type in favor of config.DayTime
type DayTable model.Time

func (d DayTable) String() string {
return fmt.Sprintf("%d", d.ModelTime().Time().UnixNano()/int64(config.ObjectStorageIndexRequiredPeriod))
}

func (d DayTable) Inc() DayTable {
return DayTable(d.ModelTime().Add(config.ObjectStorageIndexRequiredPeriod))
}

func (d DayTable) Dec() DayTable {
return DayTable(d.ModelTime().Add(-config.ObjectStorageIndexRequiredPeriod))
}

func (d DayTable) Before(other DayTable) bool {
return d.ModelTime().Before(model.Time(other))
}

func (d DayTable) After(other DayTable) bool {
return d.ModelTime().After(model.Time(other))
}

func (d DayTable) ModelTime() model.Time {
return model.Time(d)
}

func (d DayTable) Bounds() bloomshipper.Interval {
return bloomshipper.Interval{
Start: model.Time(d),
End: model.Time(d.Inc()),
}
}
13 changes: 5 additions & 8 deletions pkg/bloomcompactor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/pkg/errors"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
)
Expand Down Expand Up @@ -54,7 +55,7 @@ func (s *SimpleBloomController) rwFn() (v1.BlockWriter, v1.BlockReader) {

func (s *SimpleBloomController) buildBlocks(
ctx context.Context,
table DayTable,
table config.DayTime,
tenant string,
ownershipRange v1.FingerprintBounds,
) error {
Expand All @@ -77,15 +78,11 @@ func (s *SimpleBloomController) buildBlocks(
}

// 2. Fetch metas
bounds := table.Bounds()
metas, err := s.bloomStore.FetchMetas(
ctx,
bloomshipper.MetaSearchParams{
TenantID: tenant,
Interval: bloomshipper.Interval{
Start: bounds.Start,
End: bounds.End,
},
Interval: bloomshipper.NewInterval(table.Bounds()),
Keyspace: ownershipRange,
},
)
Expand Down Expand Up @@ -185,7 +182,7 @@ func (s *SimpleBloomController) buildBlocks(

if err := client.PutBlock(
ctx,
bloomshipper.BlockFrom(tenant, table.String(), blk),
bloomshipper.BlockFrom(tenant, table.Table(), blk),
); err != nil {
level.Error(logger).Log("msg", "failed to write block", "err", err)
closePreExistingBlocks()
Expand Down Expand Up @@ -215,7 +212,7 @@ func (s *SimpleBloomController) buildBlocks(

func (s *SimpleBloomController) loadWorkForGap(
ctx context.Context,
table DayTable,
table config.DayTime,
tenant string,
id tsdb.Identifier,
gap gapWithBlocks,
Expand Down
34 changes: 17 additions & 17 deletions pkg/bloomcompactor/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ const (
)

type TSDBStore interface {
UsersForPeriod(ctx context.Context, table DayTable) ([]string, error)
ResolveTSDBs(ctx context.Context, table DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error)
UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error)
ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error)
LoadTSDB(
ctx context.Context,
table DayTable,
table config.DayTime,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
Expand All @@ -48,13 +48,13 @@ func NewBloomTSDBStore(storage storage.Client) *BloomTSDBStore {
}
}

func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table DayTable) ([]string, error) {
_, users, err := b.storage.ListFiles(ctx, table.String(), false)
func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error) {
_, users, err := b.storage.ListFiles(ctx, table.Table(), false)
return users, err
}

func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
indices, err := b.storage.ListUserFiles(ctx, table.String(), tenant, false)
func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
indices, err := b.storage.ListUserFiles(ctx, table.Table(), tenant, false)
if err != nil {
return nil, errors.Wrap(err, "failed to list user files")
}
Expand All @@ -79,12 +79,12 @@ func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table DayTable, tenan

func (b *BloomTSDBStore) LoadTSDB(
ctx context.Context,
table DayTable,
table config.DayTime,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
) (v1.CloseableIterator[*v1.Series], error) {
data, err := b.storage.GetUserFile(ctx, table.String(), tenant, id.Name())
data, err := b.storage.GetUserFile(ctx, table.Table(), tenant, id.Name())
if err != nil {
return nil, errors.Wrap(err, "failed to get file")
}
Expand Down Expand Up @@ -233,11 +233,11 @@ func NewTSDBStores(
return res, nil
}

func (s *TSDBStores) storeForPeriod(table DayTable) (TSDBStore, error) {
func (s *TSDBStores) storeForPeriod(table config.DayTime) (TSDBStore, error) {
for i := len(s.schemaCfg.Configs) - 1; i >= 0; i-- {
period := s.schemaCfg.Configs[i]

if !table.Before(DayTable(period.From.Time)) {
if !table.Before(period.From) {
// we have the desired period config

if s.stores[i] != nil {
Expand All @@ -249,19 +249,19 @@ func (s *TSDBStores) storeForPeriod(table DayTable) (TSDBStore, error) {
return nil, errors.Errorf(
"store for period is not of TSDB type (%s) while looking up store for (%v)",
period.IndexType,
table.ModelTime().Time(),
table,
)
}

}

return nil, fmt.Errorf(
"There is no store matching no matching period found for table (%v) -- too early",
table.ModelTime().Time(),
"there is no store matching no matching period found for table (%v) -- too early",
table,
)
}

func (s *TSDBStores) UsersForPeriod(ctx context.Context, table DayTable) ([]string, error) {
func (s *TSDBStores) UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error) {
store, err := s.storeForPeriod(table)
if err != nil {
return nil, err
Expand All @@ -270,7 +270,7 @@ func (s *TSDBStores) UsersForPeriod(ctx context.Context, table DayTable) ([]stri
return store.UsersForPeriod(ctx, table)
}

func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
store, err := s.storeForPeriod(table)
if err != nil {
return nil, err
Expand All @@ -281,7 +281,7 @@ func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table DayTable, tenant st

func (s *TSDBStores) LoadTSDB(
ctx context.Context,
table DayTable,
table config.DayTime,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomgateway/multiplexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
"github.com/oklog/ulid"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/bloomcompactor"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/config"
)

const (
Expand Down Expand Up @@ -70,7 +70,7 @@ type Task struct {
ctx context.Context

// TODO(chaudum): Investigate how to remove that.
table bloomcompactor.DayTable
table config.DayTime
}

// NewTask returns a new Task that can be enqueued to the task queue.
Expand Down
11 changes: 6 additions & 5 deletions pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (

"github.com/go-kit/log"

"github.com/grafana/loki/pkg/bloomcompactor"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

Expand All @@ -35,7 +35,7 @@ type processor struct {
}

func (p *processor) run(ctx context.Context, tasks []Task) error {
for ts, tasks := range group(tasks, func(t Task) bloomcompactor.DayTable { return t.table }) {
for ts, tasks := range group(tasks, func(t Task) config.DayTime { return t.table }) {
tenant := tasks[0].Tenant
err := p.processTasks(ctx, tenant, ts, []v1.FingerprintBounds{{Min: 0, Max: math.MaxUint64}}, tasks)
if err != nil {
Expand All @@ -51,11 +51,12 @@ func (p *processor) run(ctx context.Context, tasks []Task) error {
return nil
}

func (p *processor) processTasks(ctx context.Context, tenant string, day bloomcompactor.DayTable, keyspaces []v1.FingerprintBounds, tasks []Task) error {
func (p *processor) processTasks(ctx context.Context, tenant string, day config.DayTime, keyspaces []v1.FingerprintBounds, tasks []Task) error {
minFpRange, maxFpRange := getFirstLast(keyspaces)
interval := bloomshipper.NewInterval(day.Bounds())
metaSearch := bloomshipper.MetaSearchParams{
TenantID: tenant,
Interval: day.Bounds(),
Interval: interval,
Keyspace: v1.FingerprintBounds{Min: minFpRange.Min, Max: maxFpRange.Max},
}
metas, err := p.store.FetchMetas(ctx, metaSearch)
Expand All @@ -64,7 +65,7 @@ func (p *processor) processTasks(ctx context.Context, tenant string, day bloomco
}
p.metrics.metasFetched.WithLabelValues(p.id).Observe(float64(len(metas)))

blocksRefs := bloomshipper.BlocksForMetas(metas, day.Bounds(), keyspaces)
blocksRefs := bloomshipper.BlocksForMetas(metas, interval, keyspaces)
return p.processBlocks(ctx, partition(tasks, blocksRefs))
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/bloomgateway/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

"github.com/grafana/loki/pkg/bloomcompactor"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/util/constants"
)
Expand Down Expand Up @@ -110,7 +110,7 @@ func TestProcessor(t *testing.T) {
Start: now.Add(-1 * time.Hour),
End: now,
},
table: bloomcompactor.DayTable(truncateDay(now)),
table: config.NewDayTime(truncateDay(now)),
}
filters := []syntax.LineFilter{
{Ty: 0, Match: "no match"},
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestProcessor(t *testing.T) {
Start: now.Add(-1 * time.Hour),
End: now,
},
table: bloomcompactor.DayTable(truncateDay(now)),
table: config.NewDayTime(truncateDay(now)),
}
filters := []syntax.LineFilter{
{Ty: 0, Match: "no match"},
Expand Down
6 changes: 3 additions & 3 deletions pkg/bloomgateway/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"github.com/prometheus/prometheus/model/labels"
"golang.org/x/exp/slices"

"github.com/grafana/loki/pkg/bloomcompactor"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

Expand Down Expand Up @@ -122,7 +122,7 @@ func partitionFingerprintRange(tasks []Task, blocks []bloomshipper.BlockRef) (re

type seriesWithBounds struct {
bounds model.Interval
table bloomcompactor.DayTable
table config.DayTime
series []*logproto.GroupedChunkRefs
}

Expand Down Expand Up @@ -174,7 +174,7 @@ func partitionRequest(req *logproto.FilterChunkRefRequest) []seriesWithBounds {
Start: minTs,
End: maxTs,
},
table: bloomcompactor.DayTable(day),
table: config.NewDayTime(day),
series: res,
})
}
Expand Down
Loading

0 comments on commit 85ed7b2

Please sign in to comment.