Skip to content

Commit

Permalink
equality
Browse files Browse the repository at this point in the history
  • Loading branch information
thampiotr committed Jan 30, 2025
1 parent e30b3f4 commit 75c444e
Show file tree
Hide file tree
Showing 19 changed files with 290 additions and 112 deletions.
1 change: 1 addition & 0 deletions internal/component/common/relabel/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

type LabelBuilder interface {
Get(label string) string
// TODO(thampiotr): test that Set and Del can be called while iterating.
Range(f func(label string, value string))
Set(label string, val string)
Del(ns ...string)
Expand Down
62 changes: 42 additions & 20 deletions internal/component/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (c *Component) runDiscovery(ctx context.Context, d DiscovererWithMetrics) {
}

func toAlloyTargets(cache map[string]*targetgroup.Group) []Target {
// logDebugInfo(cache)
logDebugInfo(cache)
targetsCount := 0
for _, group := range cache {
targetsCount += len(group.Targets)
Expand All @@ -236,24 +236,46 @@ func toAlloyTargets(cache map[string]*targetgroup.Group) []Target {
return allTargets
}

//
// var lastLogTime = time.Now()
// func logDebugInfo(c map[string]*targetgroup.Group) {
// if time.Since(lastLogTime) < time.Second*10 {
// return
// }
// numGroups := len(c)
// fmt.Printf("DEBUG ============================================\n")
// fmt.Printf("numGroups: %v\n", numGroups)
// for name, group := range c {
// fmt.Printf("group: %q, common labels: %v, targets: %v\n", name, len(group.Labels), len(group.Targets))
// avgTargetLabels := 0
// for _, target := range group.Targets {
// avgTargetLabels += len(target)
// }
// fmt.Printf("avgTargetLabels: %v\n", float32(avgTargetLabels)/float32(len(group.Targets)))
// }
// lastLogTime = time.Now()
// }
var lastLogTime = time.Now()

/*
SOME SAMPLE OF WHAT WE SEE IN dev:
grafana-agent numGroups: 7666
grafana-agent group: "pod/kube-system/node-local-dns-mt9gj", common labels: 26, targets: 4
grafana-agent avgTargetLabels: 7.25
grafana-agent group: "pod/kube-system/ip-masq-agent-t6lcw", common labels: 20, targets: 1
grafana-agent avgTargetLabels: 5
grafana-agent group: "pod/kube-system/calico-node-qgm4m", common labels: 20, targets: 3
grafana-agent avgTargetLabels: 5
grafana-agent group: "pod/kube-system/calico-node-gs88j", common labels: 20, targets: 3
grafana-agent avgTargetLabels: 5
grafana-agent group: "pod/kube-system/ip-masq-agent-jc4rl", common labels: 20, targets: 1
grafana-agent avgTargetLabels: 5
grafana-agent avgTargetLabels: 3
grafana-agent group: "node/gke-dev-us-central-0-main-n2s32-0-fc4-8660b9f7-fnm2", common labels: 64, targets: 1
grafana-agent group: "pod/kube-system/ip-masq-agent-nl44p", common labels: 20, targets: 1
grafana-agent avgTargetLabels: 5
grafana-agent group: "pod/kube-system/kube-proxy-gke-dev-us-central-0-cache-n2hc8-1-fc-ff37d44f-58mj", common labels: 22, targets: 1
grafana-agent avgTargetLabels: 5
grafana-agent group: "pod/kube-system/node-local-dns-q4b7k", common labels: 26, targets: 4
*/
func logDebugInfo(c map[string]*targetgroup.Group) {
if time.Since(lastLogTime) < time.Second*10 {
return
}
numGroups := len(c)
fmt.Printf("DEBUG ============================================\n")
fmt.Printf("numGroups: %v\n", numGroups)
for name, group := range c {
fmt.Printf("group: %q, common labels: %v, targets: %v\n", name, len(group.Labels), len(group.Targets))
avgTargetLabels := 0
for _, target := range group.Targets {
avgTargetLabels += len(target)
}
fmt.Printf("avgTargetLabels: %v\n", float32(avgTargetLabels)/float32(len(group.Targets)))
}
lastLogTime = time.Now()
}

func (c *Component) LiveDebugging(_ int) {}
19 changes: 16 additions & 3 deletions internal/component/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/runtime/equality"
"github.com/grafana/alloy/internal/service/livedebugging"
)

Expand Down Expand Up @@ -175,7 +176,7 @@ func TestDiscoveryUpdates(t *testing.T) {
require.EventuallyWithT(t, func(t *assert.CollectT) {
publishedExportsMut.Lock()
defer publishedExportsMut.Unlock()
assert.Equal(t, tc.expectedInitialExports, publishedExports)
assertExportsEqual(t, tc.expectedInitialExports, publishedExports)
}, 3*time.Second, time.Millisecond)

discoverer = newFakeDiscoverer()
Expand All @@ -188,7 +189,7 @@ func TestDiscoveryUpdates(t *testing.T) {
require.EventuallyWithT(t, func(t *assert.CollectT) {
publishedExportsMut.Lock()
defer publishedExportsMut.Unlock()
assert.Equal(t, tc.expectedUpdatedExports, publishedExports)
assertExportsEqual(t, tc.expectedUpdatedExports, publishedExports)
}, 3*time.Second, time.Millisecond)

ctxCancel()
Expand All @@ -197,12 +198,24 @@ func TestDiscoveryUpdates(t *testing.T) {
require.EventuallyWithT(t, func(t *assert.CollectT) {
publishedExportsMut.Lock()
defer publishedExportsMut.Unlock()
assert.Equal(t, tc.expectedFinalExports, publishedExports)
assertExportsEqual(t, tc.expectedFinalExports, publishedExports)
}, 3*time.Second, time.Millisecond)
})
}
}

func assertExportsEqual(t *assert.CollectT, expected []component.Exports, actual []component.Exports) {
if actual == nil {
assert.NotNil(t, actual)
return
}
equal := equality.DeepEqual(expected, actual)
assert.True(t, equal, "expected and actual exports are different")
if !equal { // also do assert.Equal to get a nice diff view if there is an issue.
assert.Equal(t, expected, actual)
}
}

/*
on darwin/arm64/Apple M2:
Benchmark_ToAlloyTargets-8 150 7549967 ns/op 12768249 B/op 40433 allocs/op
Expand Down
18 changes: 13 additions & 5 deletions internal/component/discovery/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
modellabels "github.com/prometheus/prometheus/model/labels"

"github.com/grafana/alloy/internal/runtime/equality"
"github.com/grafana/alloy/syntax"
)

Expand All @@ -30,6 +31,7 @@ var (
_ syntax.Capsule = Target{}
_ syntax.ConvertibleIntoCapsule = Target{}
_ syntax.ConvertibleFromCapsule = &Target{}
_ equality.CustomEquality = Target{}
)

func ComponentTargetsToPromTargetGroups(jobName string, tgs []Target) map[string][]*targetgroup.Group {
Expand Down Expand Up @@ -61,10 +63,8 @@ func NewEmptyTarget() Target {
return NewTargetFromLabelSet(make(commonlabels.LabelSet))
}

func NewTargetFromLabelSet(targetLabels commonlabels.LabelSet) Target {
return Target{
own: targetLabels,
}
func NewTargetFromLabelSet(ls commonlabels.LabelSet) Target {
return NewTargetFromSpecificAndBaseLabelSet(ls, nil)
}

// TODO(thampiotr): discovery.*
Expand Down Expand Up @@ -273,10 +273,18 @@ func (t Target) String() string {
s = append(s, fmt.Sprintf("%q=%q", key, value))
return true
})
slices.Sort(s)
return fmt.Sprintf("{%s}", strings.Join(s, ", "))
}

func (t Target) Equal(other Target) bool {
func (t Target) Equals(other any) bool {
if ot, ok := other.(Target); ok {
return t.EqualsTarget(ot)
}
return false
}

func (t Target) EqualsTarget(other Target) bool {
if t.Len() != other.Len() {
return false
}
Expand Down
80 changes: 34 additions & 46 deletions internal/component/discovery/target_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,46 +6,14 @@ import (
"github.com/grafana/alloy/internal/component/common/relabel"
)

type TargetBuilder interface {
Delete(key string)
Set(key, value string)
MergeWith(t Target)
Target() Target
}

type mapTargetBuilder struct {
t map[string]string
}

func NewTargetBuilder() TargetBuilder {
return &mapTargetBuilder{t: make(map[string]string)}
}

func NewTargetBuilderFrom(t Target) TargetBuilder {
return &mapTargetBuilder{t: t.AsMap()}
}

func (m *mapTargetBuilder) Delete(key string) {
delete(m.t, key)
}

func (m *mapTargetBuilder) Set(key, value string) {
m.t[key] = value
}

func (m *mapTargetBuilder) MergeWith(t Target) {
t.ForEachLabel(func(key string, value string) bool {
m.t[key] = value
return true
})
}
const initialLabelsBuilderCapacity = 5

func (m *mapTargetBuilder) Target() Target {
return NewTargetFromMap(m.t)
type TargetBuilderTwo interface {
relabel.LabelBuilder
Target() Target
SetKV(kv ...string) TargetBuilderTwo
}

const initialLabelsBuilderCapacity = 5

// TODO(thampiotr): maybe all targets can have toDel map ready?
type labelsBuilder struct {
group commonlabels.LabelSet
Expand All @@ -55,11 +23,32 @@ type labelsBuilder struct {
toDel map[string]struct{}
}

func NewLabelsBuilderFromTarget(t Target) relabel.LabelBuilder {
return &labelsBuilder{
group: t.group,
own: t.own,
// TODO(thampiotr): we could postulate that builder is throw-away after .ToTarget() and use object pool for these.
func (l labelsBuilder) SetKV(kv ...string) TargetBuilderTwo {
for i := 0; i < len(kv); i += 2 {
l.Set(kv[i], kv[i+1])
}
return l
}

// NewLabelsBuilder creates an empty labels builder.
func NewLabelsBuilder() TargetBuilderTwo {
return labelsBuilder{
group: nil,
own: make(commonlabels.LabelSet),
toAdd: make(map[string]string),
toDel: make(map[string]struct{}),
}
}

func NewLabelsBuilderFromTarget(t Target) TargetBuilderTwo {
return NewLabelsBuilderFromLabelSets(t.group, t.own)
}

func NewLabelsBuilderFromLabelSets(group, own commonlabels.LabelSet) TargetBuilderTwo {
return labelsBuilder{
group: group,
own: own,
// TODO(thampiotr): we could postulate that builder is throw-away after .Target() and use object pool for these.
toAdd: make(map[string]string, initialLabelsBuilderCapacity),
toDel: make(map[string]struct{}, initialLabelsBuilderCapacity),
}
Expand All @@ -68,7 +57,7 @@ func NewLabelsBuilderFromTarget(t Target) relabel.LabelBuilder {
func NewTargetFromLabelsBuilder(lb relabel.LabelBuilder) Target {
// Use optimised path if possible
if ilb, ok := lb.(*labelsBuilder); ok {
return ilb.ToTarget() // TODO(thampiotr): it could be part of interface and we'd avoid this type checking
return ilb.Target() // TODO(thampiotr): it could be part of interface and we'd avoid this type checking
}

// TODO(thampiotr): remove this, just checking for now
Expand Down Expand Up @@ -129,7 +118,7 @@ func (l labelsBuilder) Del(labels ...string) {
}

// TODO(thampiotr): this can be more optimal still...
func (l labelsBuilder) ToTarget() Target {
func (l labelsBuilder) Target() Target {
if len(l.toAdd) == 0 && len(l.toDel) == 0 {
return NewTargetFromSpecificAndBaseLabelSet(l.own, l.group)
}
Expand Down Expand Up @@ -175,8 +164,7 @@ func (l labelsBuilder) ToTarget() Target {
}
if modifyGroup {
// TODO(thampiotr): When relabeling a lot of targets we possibly will produce a lot of l.groups that will
// all be the same. So maybe we need a step to consolidate them? Or rewrite the relabeling
// logic to work on collection of targets with groups.
// all be the same. So maybe we need a step to consolidate them?
newGroup = make(commonlabels.LabelSet, len(l.group))
for k, v := range l.group {
if _, ok := l.toDel[string(k)]; ok {
Expand Down
42 changes: 29 additions & 13 deletions internal/component/discovery/target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/stretchr/testify/require"

"github.com/grafana/alloy/internal/runtime/equality"
"github.com/grafana/alloy/syntax/parser"
"github.com/grafana/alloy/syntax/vm"
)
Expand Down Expand Up @@ -42,29 +43,27 @@ func TestDecodeMap(t *testing.T) {
slices.Sort(seen)
require.Equal(t, []string{"a=5", "b=10"}, seen)

// Some loggers print targets out, check it's all good. But without caring about order.
str := fmt.Sprintf("%s", actual)
valid := str == `{a="5", b="10"}` || str == `{b="10", a="5"}`
require.True(t, valid)
// Some loggers print targets out, check it's all good.
require.Equal(t, `{"a"="5", "b"="10"}`, fmt.Sprintf("%s", actual))
}

func TestTargetBuilder(t *testing.T) {
target := NewTargetFromMap(map[string]string{"a": "5", "b": "10"})
builder := NewTargetBuilderFrom(target)
builder := NewLabelsBuilderFromTarget(target)

builder.Set("foo", "bar")
get, ok := builder.Target().Get("foo")
require.True(t, ok)
require.Equal(t, "bar", get)

builder = NewTargetBuilderFrom(target)
builder.Delete("foo")
builder = NewLabelsBuilderFromTarget(target)
builder.Del("foo")
get, ok = builder.Target().Get("foo")
require.False(t, ok)
require.Equal(t, "", get)

// Test setting on empty target (verifies it won't panic)
NewTargetBuilder().Set("foo", "bar")
NewLabelsBuilder().Set("foo", "bar")
}

func TestConvertFromNative(t *testing.T) {
Expand All @@ -86,18 +85,35 @@ func TestConvertFromNative(t *testing.T) {
NewTargetFromMap(map[string]string{"nae": "nae", "boom": "bap"}),
}

require.Equal(t, expected, toAlloyTargets(map[string]*targetgroup.Group{"test": nativeGroup}))
require.True(t, equality.DeepEqual(expected, toAlloyTargets(map[string]*targetgroup.Group{"test": nativeGroup})))
}

func TestEquals(t *testing.T) {
func TestEquals_Basic(t *testing.T) {
// NOTE: if we start caching anything as a field, the equality may break. We should test it.
t1 := NewTargetFromMap(map[string]string{"hip": "hop", "boom": "bap"})
require.Equal(t, 2, t1.Labels().Len())
t2 := NewTargetBuilderFrom(t1)
t2.Set("boom", "bap")
tb := NewLabelsBuilderFromTarget(t1)
tb.Set("boom", "bap")
t2 := tb.Target()
// This is a way commonly used in tests.
require.Equal(t, t1, t2)
// This is the way exports are compared in BuiltinComponentNode.setExports, and it's important for performance that
// Targets equality is working correctly.
require.True(t, reflect.DeepEqual(t1, t2.Target()))
require.True(t, reflect.DeepEqual(t1, t2))
}

// TODO(thampiotr): will need a lot more tests like this and with a builder
func TestEquals_Custom(t *testing.T) {
t1 := NewTargetFromSpecificAndBaseLabelSet(
model.LabelSet{"foo": "bar"},
model.LabelSet{"hip": "hop"},
)
t2 := NewTargetFromSpecificAndBaseLabelSet(
nil,
model.LabelSet{"hip": "hop", "foo": "bar"},
)
require.NotEqual(t, t1, t2)
require.True(t, t1.EqualsTarget(t2))
}

func Benchmark_Targets_TypicalPipeline(b *testing.B) {
Expand Down
Loading

0 comments on commit 75c444e

Please sign in to comment.