Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove bloomcompactor.DayTable in favour of config.DayTime #11917

Merged
merged 2 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,11 @@ func runWithRetries(

type tenantTable struct {
tenant string
table DayTable
table config.DayTime
ownershipRange v1.FingerprintBounds
}

func (c *Compactor) tenants(ctx context.Context, table DayTable) (v1.Iterator[string], error) {
func (c *Compactor) tenants(ctx context.Context, table config.DayTime) (v1.Iterator[string], error) {
tenants, err := c.tsdbStore.UsersForPeriod(ctx, table)
if err != nil {
return nil, errors.Wrap(err, "getting tenants")
Expand Down Expand Up @@ -214,10 +214,9 @@ func (c *Compactor) tables(ts time.Time) *dayRangeIterator {
from := ts.Add(-maxCompactionPeriod).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) * int64(config.ObjectStorageIndexRequiredPeriod)
through := ts.Add(-minCompactionPeriod).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) * int64(config.ObjectStorageIndexRequiredPeriod)

fromDay := DayTable(model.TimeFromUnixNano(from))
throughDay := DayTable(model.TimeFromUnixNano(through))
fromDay := config.NewDayTime(model.TimeFromUnixNano(from))
throughDay := config.NewDayTime(model.TimeFromUnixNano(through))
return newDayRangeIterator(fromDay, throughDay)

}

func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
Expand Down Expand Up @@ -295,10 +294,10 @@ func (c *Compactor) compactTenantTable(ctx context.Context, tt tenantTable) erro
}

type dayRangeIterator struct {
min, max, cur DayTable
min, max, cur config.DayTime
}

func newDayRangeIterator(min, max DayTable) *dayRangeIterator {
func newDayRangeIterator(min, max config.DayTime) *dayRangeIterator {
return &dayRangeIterator{min: min, max: max, cur: min.Dec()}
}

Expand All @@ -307,7 +306,7 @@ func (r *dayRangeIterator) Next() bool {
return r.cur.Before(r.max)
}

func (r *dayRangeIterator) At() DayTable {
func (r *dayRangeIterator) At() config.DayTime {
return r.cur
}

Expand Down
38 changes: 0 additions & 38 deletions pkg/bloomcompactor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ import (
"fmt"
"time"

"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/downloads"
"github.com/grafana/loki/pkg/util/ring"
)
Expand Down Expand Up @@ -70,37 +66,3 @@ type Limits interface {
BloomFalsePositiveRate(tenantID string) float64
BloomCompactorMaxBlockSize(tenantID string) int
}

// TODO(owen-d): Remove this type in favor of config.DayTime
type DayTable model.Time

func (d DayTable) String() string {
return fmt.Sprintf("%d", d.ModelTime().Time().UnixNano()/int64(config.ObjectStorageIndexRequiredPeriod))
}

func (d DayTable) Inc() DayTable {
return DayTable(d.ModelTime().Add(config.ObjectStorageIndexRequiredPeriod))
}

func (d DayTable) Dec() DayTable {
return DayTable(d.ModelTime().Add(-config.ObjectStorageIndexRequiredPeriod))
}

func (d DayTable) Before(other DayTable) bool {
return d.ModelTime().Before(model.Time(other))
}

func (d DayTable) After(other DayTable) bool {
return d.ModelTime().After(model.Time(other))
}

func (d DayTable) ModelTime() model.Time {
return model.Time(d)
}

func (d DayTable) Bounds() bloomshipper.Interval {
return bloomshipper.Interval{
Start: model.Time(d),
End: model.Time(d.Inc()),
}
}
13 changes: 5 additions & 8 deletions pkg/bloomcompactor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/pkg/errors"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
)
Expand Down Expand Up @@ -54,7 +55,7 @@ func (s *SimpleBloomController) rwFn() (v1.BlockWriter, v1.BlockReader) {

func (s *SimpleBloomController) buildBlocks(
ctx context.Context,
table DayTable,
table config.DayTime,
tenant string,
ownershipRange v1.FingerprintBounds,
) error {
Expand All @@ -77,15 +78,11 @@ func (s *SimpleBloomController) buildBlocks(
}

// 2. Fetch metas
bounds := table.Bounds()
metas, err := s.bloomStore.FetchMetas(
ctx,
bloomshipper.MetaSearchParams{
TenantID: tenant,
Interval: bloomshipper.Interval{
Start: bounds.Start,
End: bounds.End,
},
Interval: bloomshipper.NewInterval(table.Bounds()),
Keyspace: ownershipRange,
},
)
Expand Down Expand Up @@ -184,7 +181,7 @@ func (s *SimpleBloomController) buildBlocks(
blockCt++
blk := newBlocks.At()

built, err := bloomshipper.BlockFrom(tenant, table.String(), blk)
built, err := bloomshipper.BlockFrom(tenant, table.Table(), blk)
if err != nil {
level.Error(logger).Log("msg", "failed to build block", "err", err)
return errors.Wrap(err, "failed to build block")
Expand Down Expand Up @@ -222,7 +219,7 @@ func (s *SimpleBloomController) buildBlocks(

func (s *SimpleBloomController) loadWorkForGap(
ctx context.Context,
table DayTable,
table config.DayTime,
tenant string,
id tsdb.Identifier,
gap gapWithBlocks,
Expand Down
34 changes: 17 additions & 17 deletions pkg/bloomcompactor/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ const (
)

type TSDBStore interface {
UsersForPeriod(ctx context.Context, table DayTable) ([]string, error)
ResolveTSDBs(ctx context.Context, table DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error)
UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error)
ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error)
LoadTSDB(
ctx context.Context,
table DayTable,
table config.DayTime,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
Expand All @@ -49,13 +49,13 @@ func NewBloomTSDBStore(storage storage.Client) *BloomTSDBStore {
}
}

func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table DayTable) ([]string, error) {
_, users, err := b.storage.ListFiles(ctx, table.String(), true) // bypass cache for ease of testing
func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error) {
_, users, err := b.storage.ListFiles(ctx, table.Table(), true) // bypass cache for ease of testing
return users, err
}

func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
indices, err := b.storage.ListUserFiles(ctx, table.String(), tenant, true) // bypass cache for ease of testing
func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
indices, err := b.storage.ListUserFiles(ctx, table.Table(), tenant, true) // bypass cache for ease of testing
if err != nil {
return nil, errors.Wrap(err, "failed to list user files")
}
Expand All @@ -80,14 +80,14 @@ func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table DayTable, tenan

func (b *BloomTSDBStore) LoadTSDB(
ctx context.Context,
table DayTable,
table config.DayTime,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
) (v1.CloseableIterator[*v1.Series], error) {
withCompression := id.Name() + gzipExtension

data, err := b.storage.GetUserFile(ctx, table.String(), tenant, withCompression)
data, err := b.storage.GetUserFile(ctx, table.Table(), tenant, withCompression)
if err != nil {
return nil, errors.Wrap(err, "failed to get file")
}
Expand Down Expand Up @@ -244,11 +244,11 @@ func NewTSDBStores(
return res, nil
}

func (s *TSDBStores) storeForPeriod(table DayTable) (TSDBStore, error) {
func (s *TSDBStores) storeForPeriod(table config.DayTime) (TSDBStore, error) {
for i := len(s.schemaCfg.Configs) - 1; i >= 0; i-- {
period := s.schemaCfg.Configs[i]

if !table.Before(DayTable(period.From.Time)) {
if !table.Before(period.From) {
// we have the desired period config

if s.stores[i] != nil {
Expand All @@ -260,19 +260,19 @@ func (s *TSDBStores) storeForPeriod(table DayTable) (TSDBStore, error) {
return nil, errors.Errorf(
"store for period is not of TSDB type (%s) while looking up store for (%v)",
period.IndexType,
table.ModelTime().Time(),
table,
)
}

}

return nil, fmt.Errorf(
"There is no store matching no matching period found for table (%v) -- too early",
table.ModelTime().Time(),
"there is no store matching no matching period found for table (%v) -- too early",
table,
)
}

func (s *TSDBStores) UsersForPeriod(ctx context.Context, table DayTable) ([]string, error) {
func (s *TSDBStores) UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error) {
store, err := s.storeForPeriod(table)
if err != nil {
return nil, err
Expand All @@ -281,7 +281,7 @@ func (s *TSDBStores) UsersForPeriod(ctx context.Context, table DayTable) ([]stri
return store.UsersForPeriod(ctx, table)
}

func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
store, err := s.storeForPeriod(table)
if err != nil {
return nil, err
Expand All @@ -292,7 +292,7 @@ func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table DayTable, tenant st

func (s *TSDBStores) LoadTSDB(
ctx context.Context,
table DayTable,
table config.DayTime,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
tasksCh := make(chan Task, len(tasks))
for _, task := range tasks {
task := task
level.Info(logger).Log("msg", "enqueue task", "task", task.ID, "day", task.day, "series", len(task.series))
level.Info(logger).Log("msg", "enqueue task", "task", task.ID, "table", task.table, "series", len(task.series))
g.queue.Enqueue(tenantID, []string{}, task, func() {
// When enqueuing, we also add the task to the pending tasks
g.pendingTasks.Add(task.ID, task)
Expand Down
7 changes: 4 additions & 3 deletions pkg/bloomgateway/multiplexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/config"
)

const (
Expand Down Expand Up @@ -69,7 +70,7 @@ type Task struct {
ctx context.Context

// TODO(chaudum): Investigate how to remove that.
day model.Time
table config.DayTime
}

// NewTask returns a new Task that can be enqueued to the task queue.
Expand All @@ -89,7 +90,7 @@ func NewTask(ctx context.Context, tenantID string, refs seriesWithBounds, filter
filters: filters,
series: refs.series,
bounds: refs.bounds,
day: refs.day,
table: refs.table,
ctx: ctx,
done: make(chan struct{}),
responses: make([]v1.Output, 0, len(refs.series)),
Expand Down Expand Up @@ -129,7 +130,7 @@ func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task {
filters: t.filters,
series: series,
bounds: t.bounds,
day: t.day,
table: t.table,
ctx: t.ctx,
done: make(chan struct{}),
responses: make([]v1.Output, 0, len(series)),
Expand Down
10 changes: 5 additions & 5 deletions pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"time"

"github.com/go-kit/log"
"github.com/prometheus/common/model"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

Expand All @@ -35,10 +35,9 @@ type processor struct {
}

func (p *processor) run(ctx context.Context, tasks []Task) error {
for ts, tasks := range group(tasks, func(t Task) model.Time { return t.day }) {
interval := bloomshipper.NewInterval(ts, ts.Add(Day))
for ts, tasks := range group(tasks, func(t Task) config.DayTime { return t.table }) {
tenant := tasks[0].Tenant
err := p.processTasks(ctx, tenant, interval, []v1.FingerprintBounds{{Min: 0, Max: math.MaxUint64}}, tasks)
err := p.processTasks(ctx, tenant, ts, []v1.FingerprintBounds{{Min: 0, Max: math.MaxUint64}}, tasks)
if err != nil {
for _, task := range tasks {
task.CloseWithError(err)
Expand All @@ -52,8 +51,9 @@ func (p *processor) run(ctx context.Context, tasks []Task) error {
return nil
}

func (p *processor) processTasks(ctx context.Context, tenant string, interval bloomshipper.Interval, keyspaces []v1.FingerprintBounds, tasks []Task) error {
func (p *processor) processTasks(ctx context.Context, tenant string, day config.DayTime, keyspaces []v1.FingerprintBounds, tasks []Task) error {
minFpRange, maxFpRange := getFirstLast(keyspaces)
interval := bloomshipper.NewInterval(day.Bounds())
metaSearch := bloomshipper.MetaSearchParams{
TenantID: tenant,
Interval: interval,
Expand Down
5 changes: 3 additions & 2 deletions pkg/bloomgateway/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.uber.org/atomic"

"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/util/constants"
)
Expand Down Expand Up @@ -109,7 +110,7 @@ func TestProcessor(t *testing.T) {
Start: now.Add(-1 * time.Hour),
End: now,
},
day: truncateDay(now),
table: config.NewDayTime(truncateDay(now)),
}
filters := []syntax.LineFilter{
{Ty: 0, Match: "no match"},
Expand Down Expand Up @@ -153,7 +154,7 @@ func TestProcessor(t *testing.T) {
Start: now.Add(-1 * time.Hour),
End: now,
},
day: truncateDay(now),
table: config.NewDayTime(truncateDay(now)),
}
filters := []syntax.LineFilter{
{Ty: 0, Match: "no match"},
Expand Down
5 changes: 3 additions & 2 deletions pkg/bloomgateway/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

Expand Down Expand Up @@ -121,7 +122,7 @@ func partitionFingerprintRange(tasks []Task, blocks []bloomshipper.BlockRef) (re

type seriesWithBounds struct {
bounds model.Interval
day model.Time
table config.DayTime
series []*logproto.GroupedChunkRefs
}

Expand Down Expand Up @@ -173,7 +174,7 @@ func partitionRequest(req *logproto.FilterChunkRefRequest) []seriesWithBounds {
Start: minTs,
End: maxTs,
},
day: day,
table: config.NewDayTime(day),
series: res,
})
}
Expand Down
Loading
Loading