Skip to content

Commit

Permalink
Use bloomcompactor.DayTable in gateway code
Browse files Browse the repository at this point in the history
Where previously a model.Time was used to define the day of a task.

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Feb 12, 2024
1 parent d58031e commit 378a903
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 20 deletions.
2 changes: 1 addition & 1 deletion pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
tasksCh := make(chan Task, len(tasks))
for _, task := range tasks {
task := task
level.Info(logger).Log("msg", "enqueue task", "task", task.ID, "day", task.day, "series", len(task.series))
level.Info(logger).Log("msg", "enqueue task", "task", task.ID, "table", task.table, "series", len(task.series))
g.queue.Enqueue(tenantID, []string{}, task, func() {
// When enqueuing, we also add the task to the pending tasks
g.pendingTasks.Add(task.ID, task)
Expand Down
7 changes: 4 additions & 3 deletions pkg/bloomgateway/multiplexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/oklog/ulid"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/bloomcompactor"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
Expand Down Expand Up @@ -69,7 +70,7 @@ type Task struct {
ctx context.Context

// TODO(chaudum): Investigate how to remove that.
day model.Time
table bloomcompactor.DayTable
}

// NewTask returns a new Task that can be enqueued to the task queue.
Expand All @@ -89,7 +90,7 @@ func NewTask(ctx context.Context, tenantID string, refs seriesWithBounds, filter
filters: filters,
series: refs.series,
bounds: refs.bounds,
day: refs.day,
table: refs.table,
ctx: ctx,
done: make(chan struct{}),
responses: make([]v1.Output, 0, len(refs.series)),
Expand Down Expand Up @@ -129,7 +130,7 @@ func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task {
filters: t.filters,
series: series,
bounds: t.bounds,
day: t.day,
table: t.table,
ctx: t.ctx,
done: make(chan struct{}),
responses: make([]v1.Output, 0, len(series)),
Expand Down
13 changes: 6 additions & 7 deletions pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"time"

"github.com/go-kit/log"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/bloomcompactor"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)
Expand All @@ -35,10 +35,9 @@ type processor struct {
}

func (p *processor) run(ctx context.Context, tasks []Task) error {
for ts, tasks := range group(tasks, func(t Task) model.Time { return t.day }) {
interval := bloomshipper.NewInterval(ts, ts.Add(Day))
for ts, tasks := range group(tasks, func(t Task) bloomcompactor.DayTable { return t.table }) {
tenant := tasks[0].Tenant
err := p.processTasks(ctx, tenant, interval, []v1.FingerprintBounds{{Min: 0, Max: math.MaxUint64}}, tasks)
err := p.processTasks(ctx, tenant, ts, []v1.FingerprintBounds{{Min: 0, Max: math.MaxUint64}}, tasks)
if err != nil {
for _, task := range tasks {
task.CloseWithError(err)
Expand All @@ -52,11 +51,11 @@ func (p *processor) run(ctx context.Context, tasks []Task) error {
return nil
}

func (p *processor) processTasks(ctx context.Context, tenant string, interval bloomshipper.Interval, keyspaces []v1.FingerprintBounds, tasks []Task) error {
func (p *processor) processTasks(ctx context.Context, tenant string, day bloomcompactor.DayTable, keyspaces []v1.FingerprintBounds, tasks []Task) error {
minFpRange, maxFpRange := getFirstLast(keyspaces)
metaSearch := bloomshipper.MetaSearchParams{
TenantID: tenant,
Interval: interval,
Interval: day.Bounds(),
Keyspace: v1.FingerprintBounds{Min: minFpRange.Min, Max: maxFpRange.Max},
}
metas, err := p.store.FetchMetas(ctx, metaSearch)
Expand All @@ -65,7 +64,7 @@ func (p *processor) processTasks(ctx context.Context, tenant string, interval bl
}
p.metrics.metasFetched.WithLabelValues(p.id).Observe(float64(len(metas)))

blocksRefs := bloomshipper.BlocksForMetas(metas, interval, keyspaces)
blocksRefs := bloomshipper.BlocksForMetas(metas, day.Bounds(), keyspaces)
return p.processBlocks(ctx, partition(tasks, blocksRefs))
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/bloomgateway/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

"github.com/grafana/loki/pkg/bloomcompactor"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/util/constants"
Expand Down Expand Up @@ -109,7 +110,7 @@ func TestProcessor(t *testing.T) {
Start: now.Add(-1 * time.Hour),
End: now,
},
day: truncateDay(now),
table: bloomcompactor.DayTable(truncateDay(now)),
}
filters := []syntax.LineFilter{
{Ty: 0, Match: "no match"},
Expand Down Expand Up @@ -153,7 +154,7 @@ func TestProcessor(t *testing.T) {
Start: now.Add(-1 * time.Hour),
End: now,
},
day: truncateDay(now),
table: bloomcompactor.DayTable(truncateDay(now)),
}
filters := []syntax.LineFilter{
{Ty: 0, Match: "no match"},
Expand Down
5 changes: 3 additions & 2 deletions pkg/bloomgateway/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"golang.org/x/exp/slices"

"github.com/grafana/loki/pkg/bloomcompactor"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
Expand Down Expand Up @@ -121,7 +122,7 @@ func partitionFingerprintRange(tasks []Task, blocks []bloomshipper.BlockRef) (re

type seriesWithBounds struct {
bounds model.Interval
day model.Time
table bloomcompactor.DayTable
series []*logproto.GroupedChunkRefs
}

Expand Down Expand Up @@ -173,7 +174,7 @@ func partitionRequest(req *logproto.FilterChunkRefRequest) []seriesWithBounds {
Start: minTs,
End: maxTs,
},
day: day,
table: bloomcompactor.DayTable(day),
series: res,
})
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/bloomcompactor"
"github.com/grafana/loki/pkg/logproto"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/config"
Expand Down Expand Up @@ -176,7 +177,7 @@ func TestPartitionRequest(t *testing.T) {
exp: []seriesWithBounds{
{
bounds: model.Interval{Start: ts.Add(-60 * time.Minute), End: ts.Add(-45 * time.Minute)},
day: mktime("2024-01-24 00:00"),
table: bloomcompactor.DayTable(mktime("2024-01-24 00:00")),
series: []*logproto.GroupedChunkRefs{
{
Fingerprint: 0x00,
Expand Down Expand Up @@ -217,7 +218,7 @@ func TestPartitionRequest(t *testing.T) {
exp: []seriesWithBounds{
{
bounds: model.Interval{Start: ts.Add(-23 * time.Hour), End: ts.Add(-22 * time.Hour)},
day: mktime("2024-01-23 00:00"),
table: bloomcompactor.DayTable(mktime("2024-01-23 00:00")),
series: []*logproto.GroupedChunkRefs{
{
Fingerprint: 0x00,
Expand All @@ -229,7 +230,7 @@ func TestPartitionRequest(t *testing.T) {
},
{
bounds: model.Interval{Start: ts.Add(-2 * time.Hour), End: ts.Add(-1 * time.Hour)},
day: mktime("2024-01-24 00:00"),
table: bloomcompactor.DayTable(mktime("2024-01-24 00:00")),
series: []*logproto.GroupedChunkRefs{
{
Fingerprint: 0x01,
Expand Down Expand Up @@ -258,7 +259,7 @@ func TestPartitionRequest(t *testing.T) {
exp: []seriesWithBounds{
{
bounds: model.Interval{Start: ts.Add(-13 * time.Hour), End: ts.Add(-11 * time.Hour)},
day: mktime("2024-01-23 00:00"),
table: bloomcompactor.DayTable(mktime("2024-01-23 00:00")),
series: []*logproto.GroupedChunkRefs{
{
Fingerprint: 0x00,
Expand All @@ -270,7 +271,7 @@ func TestPartitionRequest(t *testing.T) {
},
{
bounds: model.Interval{Start: ts.Add(-13 * time.Hour), End: ts.Add(-11 * time.Hour)},
day: mktime("2024-01-24 00:00"),
table: bloomcompactor.DayTable(mktime("2024-01-24 00:00")),
series: []*logproto.GroupedChunkRefs{
{
Fingerprint: 0x00,
Expand Down

0 comments on commit 378a903

Please sign in to comment.