Skip to content

Commit

Permalink
Add lifecycle docs on ingest-adapter appender.
Browse files Browse the repository at this point in the history
Signed-off-by: Harkishen-Singh <[email protected]>
  • Loading branch information
Harkishen-Singh committed Apr 27, 2022
1 parent a0cae7d commit 6a005c4
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 7 deletions.
2 changes: 1 addition & 1 deletion .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ dockers:
- go.sum
- go.mod
- pkg
- cmd
- migration-tool
image_templates:
- "timescale/prom-migrator:0.0.3"
- "timescale/prom-migrator:latest"
Expand Down
1 change: 0 additions & 1 deletion build/prom-migrator/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
FROM golang:1.18.1-alpine AS builder
COPY ./.git build/.git
COPY ./pkg build/pkg
COPY ./cmd build/cmd
COPY ./migration-tool build/migration-tool
COPY ./go.mod build/go.mod
COPY ./go.sum build/go.sum
Expand Down
30 changes: 29 additions & 1 deletion pkg/rules/adapters/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,19 @@ func NewIngestAdapter(ingestor *ingestor.DBIngestor) *ingestAdapter {
type appenderAdapter struct {
data map[string][]model.Insertable
ingestor *ingestor.DBIngestor
closed bool
}

// Appender creates a new appender for Prometheus rules manager.
// Lifecycle
// ---------
// An appender is a type that stores data belonging to a single transaction. A new appender is created
// in each evaluation of a rule in Prometheus. No appender should ingest data concurrently.
//
// The appended samples must become persistent only after a Commit(). If Commit() returns any error,
// Rollback() is called, after which, the appender must never be used.
//
// Note: The rule manager does not call Rollback() yet.
func (a ingestAdapter) Appender(_ context.Context) storage.Appender {
return &appenderAdapter{
data: make(map[string][]model.Insertable),
Expand All @@ -42,6 +53,9 @@ func (a ingestAdapter) Appender(_ context.Context) storage.Appender {
}

func (app *appenderAdapter) Append(_ storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
if err := app.shouldAppend(); err != nil {
return 0, err
}
series, metricName, err := app.ingestor.SeriesCache().GetSeriesFromProtos(util.LabelToPrompbLabels(l))
if err != nil {
return 0, fmt.Errorf("get series from protos: %w", err)
Expand All @@ -56,12 +70,18 @@ func (app *appenderAdapter) Append(_ storage.SeriesRef, l labels.Labels, t int64
}

func (app *appenderAdapter) AppendExemplar(_ storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
if err := app.shouldAppend(); err != nil {
return 0, err
}
// We do not support appending exemplars in recording rules since this is not yet implemented upstream.
// Once upstream implements this feature, we can modify this function.
return 0, nil
return 0, fmt.Errorf("promscale: appending exemplars in rules not implemented")
}

func (app *appenderAdapter) Commit() error {
if err := app.shouldAppend(); err != nil {
return err
}
// Note: InsertTs does 2 things:
// 1. Ingest series
// 2. Ingest samples
Expand All @@ -75,7 +95,15 @@ func (app *appenderAdapter) Commit() error {
return errors.WithMessage(err, "rules: error ingesting data into db-ingestor")
}

func (app *appenderAdapter) shouldAppend() error {
if app.closed {
return fmt.Errorf("cannot append: closed appender")
}
return nil
}

func (app *appenderAdapter) Rollback() error {
app.closed = true
app.data = map[string][]model.Insertable{}
app.ingestor = nil
return nil
Expand Down
10 changes: 8 additions & 2 deletions pkg/rules/adapters/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,14 @@ func (q querierAdapter) Select(sortSeries bool, hints *storage.SelectHints, matc
return seriesSet
}

func (q querierAdapter) LabelValues(name string, _ ...*labels.Matcher) ([]string, storage.Warnings, error) {
// Weak TODO: We need to implement the matchers.
func (q querierAdapter) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
if len(matchers) > 0 {
// Weak TODO: We need to implement the matchers.
// Note: We behave the same as Prometheus does at the moment.
// See https://github.com/prometheus/prometheus/blob/9558b9b54bd3d0cb1d63b9084f8cbcda6b0d72fb/tsdb/index/index.go#L1483
return nil, nil, fmt.Errorf("searching by matchers not implemented in LabelValues()")
}

return q.qr.LabelValues(name)
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/rules/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ func (m *manager) Run() error {
g.Add(func() error {
log.Info("msg", "starting discovery manager...")
return errors.WithMessage(m.discoveryManager.Run(), "error running discovery manager")
}, func(err error) {})
}, func(err error) {
log.Info("msg", "stopping discovery manager")
})

g.Add(func() error {
log.Info("msg", "starting notifier manager...")
Expand Down
5 changes: 4 additions & 1 deletion pkg/rules/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ func engineQueryFunc(engine *promscale_promql.Engine, q promscale_promql.Queryab
}
}

// Compile-time test to make sure that sizes of both vectors are the same.
// Note: This is a compile-time test to make sure that sizes of both vectors are the same.
// If the upstream vector changes, then this will block compilation. This protects the yoloVector
// function from breaking with unexpected changes in Prometheus mod version.
// We also have a unit test for this that compares the equality of these two structs.
var _ = [1]bool{}[unsafe.Sizeof(promscale_promql.Vector{})-unsafe.Sizeof(prometheus_promql.Vector{})] // #nosec

// My guess is this should be way faster than looping through individual samples
Expand Down
18 changes: 18 additions & 0 deletions pkg/rules/upstream_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package rules

import (
"fmt"
"reflect"
"testing"

"github.com/prometheus/prometheus/model/labels"
Expand All @@ -10,6 +12,22 @@ import (
promscale_promql "github.com/timescale/promscale/pkg/promql"
)

func TestVectorCompatibility(t *testing.T) {
// Note: We check for Sample for incompatibilities and not Vector
// since Vector is not a struct, rather a type on []Sample. Hence, we
// use Sample for checks as that's what the Vector relies on.
typA := reflect.ValueOf(prometheus_promql.Sample{}).Type()
typB := reflect.ValueOf(promscale_promql.Sample{}).Type()

numA := typA.NumField()
numB := typB.NumField()
require.Equal(t, numB, numA, "number of struct fields mismatch")

for i := 0; i < numA; i++ {
require.Equal(t, typB.Field(i).Type.Kind(), typA.Field(i).Type.Kind(), "mismatch in field type at ", fmt.Sprint(i))
}
}

func TestYoloVector(t *testing.T) {
cases := []struct {
name string
Expand Down

0 comments on commit 6a005c4

Please sign in to comment.