Skip to content

Commit

Permalink
Merge pull request #170 from pPrecel/keda-duplication
Browse files Browse the repository at this point in the history
Cover case when Keda CR is duplicated
  • Loading branch information
kyma-bot authored May 25, 2023
2 parents d8bcfdf + 212ca73 commit a62e0f9
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 1 deletion.
13 changes: 13 additions & 0 deletions api/v1alpha1/keda_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,16 @@ const (
StateProcessing = "Processing"
StateDeleting = "Deleting"

ServedTrue = "True"
ServedFalse = "False"

ConditionReasonDeploymentUpdateErr = ConditionReason("KedaDeploymentUpdateErr")
ConditionReasonVerificationErr = ConditionReason("VerificationErr")
ConditionReasonVerified = ConditionReason("Verified")
ConditionReasonApplyObjError = ConditionReason("ApplyObjError")
ConditionReasonVerification = ConditionReason("Verification")
ConditionReasonInitialized = ConditionReason("Initialized")
ConditionReasonKedaDuplicated = ConditionReason("KedaDuplicated")

ConditionTypeInstalled = ConditionType("Installed")
OperatorLogLevelDebug = OperatorLogLevel("debug")
Expand Down Expand Up @@ -330,8 +334,17 @@ func (k *Keda) UpdateStateDeletion() {
k.Status.State = StateDeleting
}

func (k *Keda) UpdateServed(served string) {
k.Status.Served = served
}

func (k *Keda) IsServedEmpty() bool {
return k.Status.Served == ""
}

type Status struct {
State string `json:"state"`
Served string `json:"served"`
Conditions []metav1.Condition `json:"conditions,omitempty"`
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/reconciler/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ loop:

func NewFsm(log *zap.SugaredLogger, cfg Cfg, k8s K8s) Fsm {
return &fsm{
fn: sFnTakeSnapshot,
fn: sFnServedFilter,
Cfg: cfg,
log: log,
K8s: k8s,
Expand Down
54 changes: 54 additions & 0 deletions pkg/reconciler/served_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package reconciler

import (
"context"
"fmt"
"github.com/kyma-project/keda-manager/api/v1alpha1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func sFnServedFilter(ctx context.Context, r *fsm, s *systemState) (stateFn, *ctrl.Result, error) {
if s.instance.IsServedEmpty() {

// keda CRs check
servedKeda, err := findServedKeda(ctx, r.Client)
if err != nil {
return stopWithErrorAndNoRequeue(err)
}

s.instance.UpdateServed(v1alpha1.ServedTrue)
if servedKeda != nil {
s.instance.UpdateServed(v1alpha1.ServedFalse)
s.instance.UpdateStateFromErr(v1alpha1.ConditionTypeInstalled, v1alpha1.ConditionReasonKedaDuplicated,
fmt.Errorf("only one instance of Keda is allowed (current served instance: %s/%s)",
servedKeda.GetNamespace(), servedKeda.GetName()))
}

return stopWithRequeue()
}

if s.instance.Status.Served == v1alpha1.ServedFalse {
return nil, nil, nil
}

return switchState(sFnTakeSnapshot)
}

func findServedKeda(ctx context.Context, c client.Client) (*v1alpha1.Keda, error) {
var kedaList v1alpha1.KedaList

err := c.List(ctx, &kedaList)

if err != nil {
return nil, err
}

for _, item := range kedaList.Items {
if !item.IsServedEmpty() && item.Status.Served == v1alpha1.ServedTrue {
return &item, nil
}
}

return nil, nil
}
158 changes: 158 additions & 0 deletions pkg/reconciler/served_filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package reconciler

import (
"context"
"fmt"
"reflect"
"runtime"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/onsi/gomega"

"github.com/kyma-project/keda-manager/api/v1alpha1"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiruntime "k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func Test_sFnServedFilter(t *testing.T) {
t.Run("skip processing when served is false", func(t *testing.T) {
s := &systemState{
instance: v1alpha1.Keda{
Status: v1alpha1.Status{
Served: v1alpha1.ServedFalse,
},
},
}

nextFn, result, err := sFnServedFilter(context.TODO(), nil, s)

require.Nil(t, err)
require.Nil(t, nextFn)
require.Nil(t, result)
})

t.Run("do next step when served is true", func(t *testing.T) {
s := &systemState{
instance: v1alpha1.Keda{
Status: v1alpha1.Status{
Served: v1alpha1.ServedTrue,
},
},
}

nextFn, result, err := sFnServedFilter(context.TODO(), nil, s)

require.Nil(t, err)
requireEqualFunc(t, sFnTakeSnapshot, nextFn)
require.Nil(t, result)
})

t.Run("set served value from nil to true when there is no served keda on cluster", func(t *testing.T) {
s := &systemState{
instance: v1alpha1.Keda{
Status: v1alpha1.Status{},
},
}

r := &fsm{
K8s: K8s{
Client: func() client.Client {
scheme := apiruntime.NewScheme()
require.NoError(t, v1alpha1.AddToScheme(scheme))

return fake.NewClientBuilder().
WithScheme(scheme).
WithRuntimeObjects(
fixServedKeda("test-1", "default", ""),
fixServedKeda("test-2", "keda-test", v1alpha1.ServedFalse),
fixServedKeda("test-3", "keda-test-2", ""),
fixServedKeda("test-4", "default", v1alpha1.ServedFalse),
).Build()
}(),
},
}

nextFn, result, err := sFnServedFilter(context.TODO(), r, s)

require.Nil(t, err)
requireEqualFunc(t, sFnUpdateStatus(&ctrl.Result{Requeue: true}, nil), nextFn)
require.Nil(t, result)

require.Equal(t, v1alpha1.ServedTrue, s.instance.Status.Served)
})

t.Run("set served value from nil to false and set condition to error when there is at lease one served keda on cluster", func(t *testing.T) {
s := &systemState{
instance: v1alpha1.Keda{
Status: v1alpha1.Status{},
},
}

r := &fsm{
K8s: K8s{
Client: func() client.Client {
scheme := apiruntime.NewScheme()
require.NoError(t, v1alpha1.AddToScheme(scheme))

return fake.NewClientBuilder().
WithScheme(scheme).
WithRuntimeObjects(
fixServedKeda("test-1", "default", v1alpha1.ServedFalse),
fixServedKeda("test-2", "keda-test", v1alpha1.ServedTrue),
fixServedKeda("test-3", "keda-test-2", ""),
fixServedKeda("test-4", "default", v1alpha1.ServedFalse),
).Build()
}(),
},
}

nextFn, result, err := sFnServedFilter(context.TODO(), r, s)

require.Nil(t, err)
requireEqualFunc(t, sFnUpdateStatus(&ctrl.Result{Requeue: true}, nil), nextFn)
require.Nil(t, result)

require.Equal(t, v1alpha1.StateError, s.instance.Status.State)
require.Equal(t, v1alpha1.ServedFalse, s.instance.Status.Served)

expectedCondition := metav1.Condition{
Type: string(v1alpha1.ConditionTypeInstalled),
Status: "False",
Reason: string(v1alpha1.ConditionReasonKedaDuplicated),
Message: "only one instance of Keda is allowed (current served instance: keda-test/test-2)",
}
opt := cmp.Comparer(func(x, y metav1.Condition) bool {
return x.Type == y.Type && x.Status == y.Status && x.Reason == y.Reason && x.Message == y.Message
})
g := gomega.NewWithT(t)
g.Expect(s.instance.Status.Conditions).Should(gomega.ContainElement(gomega.BeComparableTo(expectedCondition, opt)))
})
}

func fixServedKeda(name, namespace string, served string) *v1alpha1.Keda {
return &v1alpha1.Keda{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Status: v1alpha1.Status{
Served: served,
},
}
}

func requireEqualFunc(t *testing.T, expected, actual stateFn) {
expectedValueOf := reflect.ValueOf(expected)
actualValueOf := reflect.ValueOf(actual)
require.True(t, expectedValueOf.Pointer() == actualValueOf.Pointer(),
fmt.Sprintf("expected '%s', got '%s", getFnName(expected), getFnName(actual)))
}

func getFnName(fn stateFn) string {
return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
}

0 comments on commit a62e0f9

Please sign in to comment.