diff --git a/internal/component/common/relabel/process.go b/internal/component/common/relabel/process.go index 6346b089b8..5a99589d06 100644 --- a/internal/component/common/relabel/process.go +++ b/internal/component/common/relabel/process.go @@ -14,6 +14,7 @@ import ( type LabelBuilder interface { Get(label string) string + // TODO(thampiotr): test that Set and Del can be called while iterating. Range(f func(label string, value string)) Set(label string, val string) Del(ns ...string) diff --git a/internal/component/discovery/discovery.go b/internal/component/discovery/discovery.go index 40cfe406b2..be05c73fe6 100644 --- a/internal/component/discovery/discovery.go +++ b/internal/component/discovery/discovery.go @@ -220,7 +220,7 @@ func (c *Component) runDiscovery(ctx context.Context, d DiscovererWithMetrics) { } func toAlloyTargets(cache map[string]*targetgroup.Group) []Target { - // logDebugInfo(cache) + logDebugInfo(cache) targetsCount := 0 for _, group := range cache { targetsCount += len(group.Targets) @@ -236,24 +236,46 @@ func toAlloyTargets(cache map[string]*targetgroup.Group) []Target { return allTargets } -// -// var lastLogTime = time.Now() -// func logDebugInfo(c map[string]*targetgroup.Group) { -// if time.Since(lastLogTime) < time.Second*10 { -// return -// } -// numGroups := len(c) -// fmt.Printf("DEBUG ============================================\n") -// fmt.Printf("numGroups: %v\n", numGroups) -// for name, group := range c { -// fmt.Printf("group: %q, common labels: %v, targets: %v\n", name, len(group.Labels), len(group.Targets)) -// avgTargetLabels := 0 -// for _, target := range group.Targets { -// avgTargetLabels += len(target) -// } -// fmt.Printf("avgTargetLabels: %v\n", float32(avgTargetLabels)/float32(len(group.Targets))) -// } -// lastLogTime = time.Now() -// } +var lastLogTime = time.Now() + +/* +SOME SAMPLE OF WHAT WE SEE IN dev: + +grafana-agent numGroups: 7666 +grafana-agent group: "pod/kube-system/node-local-dns-mt9gj", common labels: 26, targets: 4 +grafana-agent avgTargetLabels: 7.25 +grafana-agent group: "pod/kube-system/ip-masq-agent-t6lcw", common labels: 20, targets: 1 +grafana-agent avgTargetLabels: 5 +grafana-agent group: "pod/kube-system/calico-node-qgm4m", common labels: 20, targets: 3 +grafana-agent avgTargetLabels: 5 +grafana-agent group: "pod/kube-system/calico-node-gs88j", common labels: 20, targets: 3 +grafana-agent avgTargetLabels: 5 +grafana-agent group: "pod/kube-system/ip-masq-agent-jc4rl", common labels: 20, targets: 1 +grafana-agent avgTargetLabels: 5 +grafana-agent avgTargetLabels: 3 +grafana-agent group: "node/gke-dev-us-central-0-main-n2s32-0-fc4-8660b9f7-fnm2", common labels: 64, targets: 1 +grafana-agent group: "pod/kube-system/ip-masq-agent-nl44p", common labels: 20, targets: 1 +grafana-agent avgTargetLabels: 5 +grafana-agent group: "pod/kube-system/kube-proxy-gke-dev-us-central-0-cache-n2hc8-1-fc-ff37d44f-58mj", common labels: 22, targets: 1 +grafana-agent avgTargetLabels: 5 +grafana-agent group: "pod/kube-system/node-local-dns-q4b7k", common labels: 26, targets: 4 +*/ +func logDebugInfo(c map[string]*targetgroup.Group) { + if time.Since(lastLogTime) < time.Second*10 { + return + } + numGroups := len(c) + fmt.Printf("DEBUG ============================================\n") + fmt.Printf("numGroups: %v\n", numGroups) + for name, group := range c { + fmt.Printf("group: %q, common labels: %v, targets: %v\n", name, len(group.Labels), len(group.Targets)) + avgTargetLabels := 0 + for _, target := range group.Targets { + avgTargetLabels += len(target) + } + fmt.Printf("avgTargetLabels: %v\n", float32(avgTargetLabels)/float32(len(group.Targets))) + } + lastLogTime = time.Now() +} func (c *Component) LiveDebugging(_ int) {} diff --git a/internal/component/discovery/discovery_test.go b/internal/component/discovery/discovery_test.go index 9b05fd603a..407ff5d7ef 100644 --- a/internal/component/discovery/discovery_test.go +++ b/internal/component/discovery/discovery_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/internal/service/livedebugging" ) @@ -175,7 +176,7 @@ func TestDiscoveryUpdates(t *testing.T) { require.EventuallyWithT(t, func(t *assert.CollectT) { publishedExportsMut.Lock() defer publishedExportsMut.Unlock() - assert.Equal(t, tc.expectedInitialExports, publishedExports) + assertExportsEqual(t, tc.expectedInitialExports, publishedExports) }, 3*time.Second, time.Millisecond) discoverer = newFakeDiscoverer() @@ -188,7 +189,7 @@ func TestDiscoveryUpdates(t *testing.T) { require.EventuallyWithT(t, func(t *assert.CollectT) { publishedExportsMut.Lock() defer publishedExportsMut.Unlock() - assert.Equal(t, tc.expectedUpdatedExports, publishedExports) + assertExportsEqual(t, tc.expectedUpdatedExports, publishedExports) }, 3*time.Second, time.Millisecond) ctxCancel() @@ -197,12 +198,24 @@ func TestDiscoveryUpdates(t *testing.T) { require.EventuallyWithT(t, func(t *assert.CollectT) { publishedExportsMut.Lock() defer publishedExportsMut.Unlock() - assert.Equal(t, tc.expectedFinalExports, publishedExports) + assertExportsEqual(t, tc.expectedFinalExports, publishedExports) }, 3*time.Second, time.Millisecond) }) } } +func assertExportsEqual(t *assert.CollectT, expected []component.Exports, actual []component.Exports) { + if actual == nil { + assert.NotNil(t, actual) + return + } + equal := equality.DeepEqual(expected, actual) + assert.True(t, equal, "expected and actual exports are different") + if !equal { // also do assert.Equal to get a nice diff view if there is an issue. + assert.Equal(t, expected, actual) + } +} + /* on darwin/arm64/Apple M2: Benchmark_ToAlloyTargets-8 150 7549967 ns/op 12768249 B/op 40433 allocs/op diff --git a/internal/component/discovery/target.go b/internal/component/discovery/target.go index 8dba1f5bab..3e84576909 100644 --- a/internal/component/discovery/target.go +++ b/internal/component/discovery/target.go @@ -10,6 +10,7 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" modellabels "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/syntax" ) @@ -30,6 +31,7 @@ var ( _ syntax.Capsule = Target{} _ syntax.ConvertibleIntoCapsule = Target{} _ syntax.ConvertibleFromCapsule = &Target{} + _ equality.CustomEquality = Target{} ) func ComponentTargetsToPromTargetGroups(jobName string, tgs []Target) map[string][]*targetgroup.Group { @@ -61,10 +63,8 @@ func NewEmptyTarget() Target { return NewTargetFromLabelSet(make(commonlabels.LabelSet)) } -func NewTargetFromLabelSet(targetLabels commonlabels.LabelSet) Target { - return Target{ - own: targetLabels, - } +func NewTargetFromLabelSet(ls commonlabels.LabelSet) Target { + return NewTargetFromSpecificAndBaseLabelSet(ls, nil) } // TODO(thampiotr): discovery.* @@ -273,10 +273,18 @@ func (t Target) String() string { s = append(s, fmt.Sprintf("%q=%q", key, value)) return true }) + slices.Sort(s) return fmt.Sprintf("{%s}", strings.Join(s, ", ")) } -func (t Target) Equal(other Target) bool { +func (t Target) Equals(other any) bool { + if ot, ok := other.(Target); ok { + return t.EqualsTarget(ot) + } + return false +} + +func (t Target) EqualsTarget(other Target) bool { if t.Len() != other.Len() { return false } diff --git a/internal/component/discovery/target_builder.go b/internal/component/discovery/target_builder.go index 115c0c0906..e90154a41d 100644 --- a/internal/component/discovery/target_builder.go +++ b/internal/component/discovery/target_builder.go @@ -6,46 +6,14 @@ import ( "github.com/grafana/alloy/internal/component/common/relabel" ) -type TargetBuilder interface { - Delete(key string) - Set(key, value string) - MergeWith(t Target) - Target() Target -} - -type mapTargetBuilder struct { - t map[string]string -} - -func NewTargetBuilder() TargetBuilder { - return &mapTargetBuilder{t: make(map[string]string)} -} - -func NewTargetBuilderFrom(t Target) TargetBuilder { - return &mapTargetBuilder{t: t.AsMap()} -} - -func (m *mapTargetBuilder) Delete(key string) { - delete(m.t, key) -} - -func (m *mapTargetBuilder) Set(key, value string) { - m.t[key] = value -} - -func (m *mapTargetBuilder) MergeWith(t Target) { - t.ForEachLabel(func(key string, value string) bool { - m.t[key] = value - return true - }) -} +const initialLabelsBuilderCapacity = 5 -func (m *mapTargetBuilder) Target() Target { - return NewTargetFromMap(m.t) +type TargetBuilderTwo interface { + relabel.LabelBuilder + Target() Target + SetKV(kv ...string) TargetBuilderTwo } -const initialLabelsBuilderCapacity = 5 - // TODO(thampiotr): maybe all targets can have toDel map ready? type labelsBuilder struct { group commonlabels.LabelSet @@ -55,11 +23,32 @@ type labelsBuilder struct { toDel map[string]struct{} } -func NewLabelsBuilderFromTarget(t Target) relabel.LabelBuilder { - return &labelsBuilder{ - group: t.group, - own: t.own, - // TODO(thampiotr): we could postulate that builder is throw-away after .ToTarget() and use object pool for these. +func (l labelsBuilder) SetKV(kv ...string) TargetBuilderTwo { + for i := 0; i < len(kv); i += 2 { + l.Set(kv[i], kv[i+1]) + } + return l +} + +// NewLabelsBuilder creates an empty labels builder. +func NewLabelsBuilder() TargetBuilderTwo { + return labelsBuilder{ + group: nil, + own: make(commonlabels.LabelSet), + toAdd: make(map[string]string), + toDel: make(map[string]struct{}), + } +} + +func NewLabelsBuilderFromTarget(t Target) TargetBuilderTwo { + return NewLabelsBuilderFromLabelSets(t.group, t.own) +} + +func NewLabelsBuilderFromLabelSets(group, own commonlabels.LabelSet) TargetBuilderTwo { + return labelsBuilder{ + group: group, + own: own, + // TODO(thampiotr): we could postulate that builder is throw-away after .Target() and use object pool for these. toAdd: make(map[string]string, initialLabelsBuilderCapacity), toDel: make(map[string]struct{}, initialLabelsBuilderCapacity), } @@ -68,7 +57,7 @@ func NewLabelsBuilderFromTarget(t Target) relabel.LabelBuilder { func NewTargetFromLabelsBuilder(lb relabel.LabelBuilder) Target { // Use optimised path if possible if ilb, ok := lb.(*labelsBuilder); ok { - return ilb.ToTarget() // TODO(thampiotr): it could be part of interface and we'd avoid this type checking + return ilb.Target() // TODO(thampiotr): it could be part of interface and we'd avoid this type checking } // TODO(thampiotr): remove this, just checking for now @@ -129,7 +118,7 @@ func (l labelsBuilder) Del(labels ...string) { } // TODO(thampiotr): this can be more optimal still... -func (l labelsBuilder) ToTarget() Target { +func (l labelsBuilder) Target() Target { if len(l.toAdd) == 0 && len(l.toDel) == 0 { return NewTargetFromSpecificAndBaseLabelSet(l.own, l.group) } @@ -175,8 +164,7 @@ func (l labelsBuilder) ToTarget() Target { } if modifyGroup { // TODO(thampiotr): When relabeling a lot of targets we possibly will produce a lot of l.groups that will - // all be the same. So maybe we need a step to consolidate them? Or rewrite the relabeling - // logic to work on collection of targets with groups. + // all be the same. So maybe we need a step to consolidate them? newGroup = make(commonlabels.LabelSet, len(l.group)) for k, v := range l.group { if _, ok := l.toDel[string(k)]; ok { diff --git a/internal/component/discovery/target_test.go b/internal/component/discovery/target_test.go index 76be3dc8ed..eba309118c 100644 --- a/internal/component/discovery/target_test.go +++ b/internal/component/discovery/target_test.go @@ -13,6 +13,7 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/stretchr/testify/require" + "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/syntax/parser" "github.com/grafana/alloy/syntax/vm" ) @@ -42,29 +43,27 @@ func TestDecodeMap(t *testing.T) { slices.Sort(seen) require.Equal(t, []string{"a=5", "b=10"}, seen) - // Some loggers print targets out, check it's all good. But without caring about order. - str := fmt.Sprintf("%s", actual) - valid := str == `{a="5", b="10"}` || str == `{b="10", a="5"}` - require.True(t, valid) + // Some loggers print targets out, check it's all good. + require.Equal(t, `{"a"="5", "b"="10"}`, fmt.Sprintf("%s", actual)) } func TestTargetBuilder(t *testing.T) { target := NewTargetFromMap(map[string]string{"a": "5", "b": "10"}) - builder := NewTargetBuilderFrom(target) + builder := NewLabelsBuilderFromTarget(target) builder.Set("foo", "bar") get, ok := builder.Target().Get("foo") require.True(t, ok) require.Equal(t, "bar", get) - builder = NewTargetBuilderFrom(target) - builder.Delete("foo") + builder = NewLabelsBuilderFromTarget(target) + builder.Del("foo") get, ok = builder.Target().Get("foo") require.False(t, ok) require.Equal(t, "", get) // Test setting on empty target (verifies it won't panic) - NewTargetBuilder().Set("foo", "bar") + NewLabelsBuilder().Set("foo", "bar") } func TestConvertFromNative(t *testing.T) { @@ -86,18 +85,35 @@ func TestConvertFromNative(t *testing.T) { NewTargetFromMap(map[string]string{"nae": "nae", "boom": "bap"}), } - require.Equal(t, expected, toAlloyTargets(map[string]*targetgroup.Group{"test": nativeGroup})) + require.True(t, equality.DeepEqual(expected, toAlloyTargets(map[string]*targetgroup.Group{"test": nativeGroup}))) } -func TestEquals(t *testing.T) { +func TestEquals_Basic(t *testing.T) { // NOTE: if we start caching anything as a field, the equality may break. We should test it. t1 := NewTargetFromMap(map[string]string{"hip": "hop", "boom": "bap"}) require.Equal(t, 2, t1.Labels().Len()) - t2 := NewTargetBuilderFrom(t1) - t2.Set("boom", "bap") + tb := NewLabelsBuilderFromTarget(t1) + tb.Set("boom", "bap") + t2 := tb.Target() + // This is a way commonly used in tests. + require.Equal(t, t1, t2) // This is the way exports are compared in BuiltinComponentNode.setExports, and it's important for performance that // Targets equality is working correctly. - require.True(t, reflect.DeepEqual(t1, t2.Target())) + require.True(t, reflect.DeepEqual(t1, t2)) +} + +// TODO(thampiotr): will need a lot more tests like this and with a builder +func TestEquals_Custom(t *testing.T) { + t1 := NewTargetFromSpecificAndBaseLabelSet( + model.LabelSet{"foo": "bar"}, + model.LabelSet{"hip": "hop"}, + ) + t2 := NewTargetFromSpecificAndBaseLabelSet( + nil, + model.LabelSet{"hip": "hop", "foo": "bar"}, + ) + require.NotEqual(t, t1, t2) + require.True(t, t1.EqualsTarget(t2)) } func Benchmark_Targets_TypicalPipeline(b *testing.B) { diff --git a/internal/runtime/componenttest/componenttest.go b/internal/runtime/componenttest/componenttest.go index dc5c1b9e2d..4526ba00c9 100644 --- a/internal/runtime/componenttest/componenttest.go +++ b/internal/runtime/componenttest/componenttest.go @@ -5,19 +5,21 @@ import ( "context" "fmt" "os" - "reflect" "sync" "time" - "github.com/grafana/alloy/internal/service/labelstore" - "github.com/grafana/alloy/internal/service/livedebugging" "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" + "github.com/grafana/alloy/internal/runtime/equality" + "github.com/grafana/alloy/internal/service/labelstore" + "github.com/grafana/alloy/internal/service/livedebugging" + "github.com/go-kit/log" + "go.opentelemetry.io/otel/trace/noop" + "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/runtime/logging" - "go.opentelemetry.io/otel/trace/noop" ) // A Controller is a testing controller which controls a single component. @@ -66,7 +68,7 @@ func NewControllerFromReg(l log.Logger, reg component.Registration) *Controller func (c *Controller) onStateChange(e component.Exports) { c.exportsMut.Lock() - changed := !reflect.DeepEqual(c.exports, e) + changed := !equality.DeepEqual(c.exports, e) c.exports = e c.exportsMut.Unlock() diff --git a/internal/runtime/equality/deep_equal.go b/internal/runtime/equality/deep_equal.go new file mode 100644 index 0000000000..bb8ee4429b --- /dev/null +++ b/internal/runtime/equality/deep_equal.go @@ -0,0 +1,2 @@ +package equality + diff --git a/internal/runtime/equality/equality.go b/internal/runtime/equality/equality.go new file mode 100644 index 0000000000..eee9e27807 --- /dev/null +++ b/internal/runtime/equality/equality.go @@ -0,0 +1,122 @@ +package equality + +import ( + "reflect" +) + +var customEqualityType = reflect.TypeOf((*CustomEquality)(nil)).Elem() + +// CustomEquality allows to define custom Equals implementation. This can be used, for example, with exported types, +// so that the Runtime can short-circuit propagating updates when it is not necessary. +type CustomEquality interface { + Equals(other any) bool +} + +// DeepEqual is a wrapper around reflect.DeepEqual, which first checks if arguments implement CustomEquality. If they +// do, their Equals method is used for comparison instead of reflect.DeepEqual. +// For simplicity, DeepEqual requires x and y to be of the same type before calling CustomEquality.Equals. +func DeepEqual(x, y any) bool { + if x == nil || y == nil { + return x == y + } + v1 := reflect.ValueOf(x) + v2 := reflect.ValueOf(y) + + // See if we can compare them using CustomEquality + if r := deepCustomEqual(v1, v2); r.compared { + return r.isEqual + } + // Otherwise fall back to reflect.DeepEqual + return reflect.DeepEqual(x, y) +} + +type result struct { + compared bool + isEqual bool +} + +func successfulCompare(isEqual bool) result { return result{compared: true, isEqual: isEqual} } + +var ( + couldNotCompare = result{compared: false, isEqual: false} + comparedAndEqual = result{compared: true, isEqual: true} +) + +func deepCustomEqual(v1, v2 reflect.Value) result { + if !v1.IsValid() || !v2.IsValid() { + return couldNotCompare + } + + if v1.Type() != v2.Type() { + return couldNotCompare + } + + if v1.Type().Implements(customEqualityType) { + return successfulCompare(v1.Interface().(CustomEquality).Equals(v2.Interface())) + } + + // Somewhat redundant, but just in case: + if v2.Type().Implements(customEqualityType) { + return successfulCompare(v2.Interface().(CustomEquality).Equals(v1.Interface())) + } + + switch v1.Kind() { + case reflect.Array: + for i := 0; i < v1.Len(); i++ { + partResult := deepCustomEqual(v1.Index(i), v2.Index(i)) + if !partResult.compared || !partResult.isEqual { + return partResult + } + } + return comparedAndEqual + case reflect.Slice: + if v1.IsNil() != v2.IsNil() { + return couldNotCompare + } + if v1.Len() != v2.Len() { + return couldNotCompare + } + for i := 0; i < v1.Len(); i++ { + partResult := deepCustomEqual(v1.Index(i), v2.Index(i)) + if !partResult.compared || !partResult.isEqual { + return partResult + } + } + return comparedAndEqual + case reflect.Interface, reflect.Pointer: + if v1.IsNil() || v2.IsNil() { + return couldNotCompare + } + return deepCustomEqual(v1.Elem(), v2.Elem()) + case reflect.Struct: + for i, n := 0, v1.NumField(); i < n; i++ { + partResult := deepCustomEqual(v1.Field(i), v2.Field(i)) + if !partResult.compared || !partResult.isEqual { + return partResult + } + } + return comparedAndEqual + case reflect.Map: + if v1.IsNil() != v2.IsNil() { + return couldNotCompare + } + if v1.Len() != v2.Len() { + return couldNotCompare + } + iter := v1.MapRange() + for iter.Next() { + val1 := iter.Value() + val2 := v2.MapIndex(iter.Key()) + if !val1.IsValid() || !val2.IsValid() { + return couldNotCompare + } + partResult := deepCustomEqual(val1, val2) + if !partResult.compared || !partResult.isEqual { + return partResult + } + } + return comparedAndEqual + default: + return couldNotCompare + } +} diff --git a/internal/runtime/internal/controller/node_builtin_component.go b/internal/runtime/internal/controller/node_builtin_component.go index 43e4893a2f..cc6fba1fa2 100644 --- a/internal/runtime/internal/controller/node_builtin_component.go +++ b/internal/runtime/internal/controller/node_builtin_component.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/internal/runtime/logging" "github.com/grafana/alloy/internal/runtime/tracing" "github.com/grafana/alloy/syntax/ast" @@ -282,7 +283,7 @@ func (cn *BuiltinComponentNode) evaluate(scope *vm.Scope) error { return nil } - if reflect.DeepEqual(cn.args, argsCopyValue) { + if equality.DeepEqual(cn.args, argsCopyValue) { // Ignore components which haven't changed. This reduces the cost of // calling evaluate for components where evaluation is expensive (e.g., if // re-evaluating requires re-starting some internal logic). @@ -371,7 +372,7 @@ func (cn *BuiltinComponentNode) setExports(e component.Exports) { var changed bool cn.exportsMut.Lock() - if !reflect.DeepEqual(cn.exports, e) { + if !equality.DeepEqual(cn.exports, e) { changed = true cn.exports = e } diff --git a/internal/runtime/internal/controller/node_custom_component.go b/internal/runtime/internal/controller/node_custom_component.go index 52fff29bc0..a06f4b8193 100644 --- a/internal/runtime/internal/controller/node_custom_component.go +++ b/internal/runtime/internal/controller/node_custom_component.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "path" - "reflect" "strings" "sync" "time" @@ -12,6 +11,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/syntax/ast" "github.com/grafana/alloy/syntax/vm" ) @@ -264,7 +264,7 @@ func (cn *CustomComponentNode) setExports(e component.Exports) { var changed bool cn.exportsMut.Lock() - if !reflect.DeepEqual(cn.exports, e) { + if !equality.DeepEqual(cn.exports, e) { changed = true cn.exports = e } diff --git a/internal/runtime/internal/controller/node_service.go b/internal/runtime/internal/controller/node_service.go index 85619b2d34..899cb10c47 100644 --- a/internal/runtime/internal/controller/node_service.go +++ b/internal/runtime/internal/controller/node_service.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/internal/service" "github.com/grafana/alloy/syntax/ast" "github.com/grafana/alloy/syntax/vm" @@ -104,7 +105,7 @@ func (sn *ServiceNode) Evaluate(scope *vm.Scope) error { // since services expect a non-pointer. argsCopyValue := reflect.ValueOf(argsPointer).Elem().Interface() - if reflect.DeepEqual(sn.args, argsCopyValue) { + if equality.DeepEqual(sn.args, argsCopyValue) { // Ignore arguments which haven't changed. This reduces the cost of calling // evaluate for services where evaluation is expensive (e.g., if // re-evaluating requires re-starting some internal logic). diff --git a/internal/runtime/internal/controller/value_cache.go b/internal/runtime/internal/controller/value_cache.go index 009c592d74..f1319db82b 100644 --- a/internal/runtime/internal/controller/value_cache.go +++ b/internal/runtime/internal/controller/value_cache.go @@ -2,10 +2,10 @@ package controller import ( "fmt" - "reflect" "sync" "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/syntax/vm" ) @@ -96,7 +96,7 @@ func (vc *valueCache) CacheModuleExportValue(name string, value any) { v, found := vc.moduleExports[name] if !found { vc.moduleChangedIndex++ - } else if !reflect.DeepEqual(v, value) { + } else if !equality.DeepEqual(v, value) { vc.moduleChangedIndex++ } diff --git a/internal/runtime/internal/importsource/import_file.go b/internal/runtime/internal/importsource/import_file.go index e4691d9ed5..d0e10e02c6 100644 --- a/internal/runtime/internal/importsource/import_file.go +++ b/internal/runtime/internal/importsource/import_file.go @@ -7,14 +7,15 @@ import ( "io/fs" "os" "path/filepath" - "reflect" "strings" "sync" "time" "github.com/go-kit/log" + "github.com/grafana/alloy/internal/component" filedetector "github.com/grafana/alloy/internal/filedetector" + "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/internal/util" "github.com/grafana/alloy/syntax/vm" @@ -84,7 +85,7 @@ func (im *ImportFile) Evaluate(scope *vm.Scope) error { return fmt.Errorf("decoding configuration: %w", err) } - if reflect.DeepEqual(im.args, arguments) { + if equality.DeepEqual(im.args, arguments) { return nil } im.args = arguments diff --git a/internal/runtime/internal/importsource/import_git.go b/internal/runtime/internal/importsource/import_git.go index 02ee1ed675..29f3ef8b22 100644 --- a/internal/runtime/internal/importsource/import_git.go +++ b/internal/runtime/internal/importsource/import_git.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "path/filepath" - "reflect" "strings" "sync" "time" @@ -13,6 +12,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/internal/vcs" "github.com/grafana/alloy/syntax" @@ -93,7 +93,7 @@ func (im *ImportGit) Evaluate(scope *vm.Scope) error { return fmt.Errorf("decoding configuration: %w", err) } - if reflect.DeepEqual(im.args, arguments) { + if equality.DeepEqual(im.args, arguments) { return nil } @@ -208,7 +208,7 @@ func (im *ImportGit) Update(args component.Arguments) (err error) { // Create or update the repo field. // Failure to update repository makes the module loader temporarily use cached contents on disk - if im.repo == nil || !reflect.DeepEqual(repoOpts, im.repoOpts) { + if im.repo == nil || !equality.DeepEqual(repoOpts, im.repoOpts) { r, err := vcs.NewGitRepo(context.Background(), im.repoPath, repoOpts) if err != nil { if errors.As(err, &vcs.UpdateFailedError{}) { diff --git a/internal/runtime/internal/importsource/import_http.go b/internal/runtime/internal/importsource/import_http.go index 815748f8f6..73f19eced0 100644 --- a/internal/runtime/internal/importsource/import_http.go +++ b/internal/runtime/internal/importsource/import_http.go @@ -5,12 +5,12 @@ import ( "fmt" "net/http" "path" - "reflect" "time" "github.com/grafana/alloy/internal/component" common_config "github.com/grafana/alloy/internal/component/common/config" remote_http "github.com/grafana/alloy/internal/component/remote/http" + "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/syntax/vm" ) @@ -84,7 +84,7 @@ func (im *ImportHTTP) Evaluate(scope *vm.Scope) error { im.arguments = arguments } - if reflect.DeepEqual(im.arguments, arguments) { + if equality.DeepEqual(im.arguments, arguments) { return nil } diff --git a/internal/runtime/internal/importsource/import_string.go b/internal/runtime/internal/importsource/import_string.go index a8a1249fc4..e1ec0644eb 100644 --- a/internal/runtime/internal/importsource/import_string.go +++ b/internal/runtime/internal/importsource/import_string.go @@ -3,9 +3,9 @@ package importsource import ( "context" "fmt" - "reflect" "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/syntax/alloytypes" "github.com/grafana/alloy/syntax/vm" ) @@ -37,7 +37,7 @@ func (im *ImportString) Evaluate(scope *vm.Scope) error { return fmt.Errorf("decoding configuration: %w", err) } - if reflect.DeepEqual(im.arguments, arguments) { + if equality.DeepEqual(im.arguments, arguments) { return nil } im.arguments = arguments diff --git a/internal/runtime/internal/testcomponents/module/git/git.go b/internal/runtime/internal/testcomponents/module/git/git.go index b4cf2fb984..12100d2bfb 100644 --- a/internal/runtime/internal/testcomponents/module/git/git.go +++ b/internal/runtime/internal/testcomponents/module/git/git.go @@ -5,13 +5,14 @@ import ( "context" "errors" "path/filepath" - "reflect" "sync" "time" "github.com/go-kit/log" + "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/featuregate" + "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/internal/runtime/internal/testcomponents/module" "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/internal/vcs" @@ -199,7 +200,7 @@ func (c *Component) Update(args component.Arguments) (err error) { // Create or update the repo field. // Failure to update repository makes the module loader temporarily use cached contents on disk - if c.repo == nil || !reflect.DeepEqual(repoOpts, c.repoOpts) { + if c.repo == nil || !equality.DeepEqual(repoOpts, c.repoOpts) { r, err := vcs.NewGitRepo(context.Background(), repoPath, repoOpts) if err != nil { if errors.As(err, &vcs.UpdateFailedError{}) { diff --git a/internal/runtime/internal/testcomponents/module/module.go b/internal/runtime/internal/testcomponents/module/module.go index f7a420b2c4..f9d3c81d1f 100644 --- a/internal/runtime/internal/testcomponents/module/module.go +++ b/internal/runtime/internal/testcomponents/module/module.go @@ -3,11 +3,11 @@ package module import ( "context" "fmt" - "reflect" "sync" "time" "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/runtime/equality" "github.com/grafana/alloy/internal/runtime/logging/level" ) @@ -44,7 +44,7 @@ func NewModuleComponent(o component.Options) (*ModuleComponent, error) { // It will set the component health in addition to return the error so that the consumer can rely on either or both. // If the content is the same as the last time it was successfully loaded, it will not be reloaded. func (c *ModuleComponent) LoadAlloySource(args map[string]any, contentValue string) error { - if reflect.DeepEqual(args, c.getLatestArgs()) && contentValue == c.getLatestContent() { + if equality.DeepEqual(args, c.getLatestArgs()) && contentValue == c.getLatestContent() { return nil }