diff --git a/api/v1alpha1/keda_types.go b/api/v1alpha1/keda_types.go index a1b91450..ce5c73f2 100644 --- a/api/v1alpha1/keda_types.go +++ b/api/v1alpha1/keda_types.go @@ -37,12 +37,16 @@ const ( StateProcessing = "Processing" StateDeleting = "Deleting" + ServedTrue = "True" + ServedFalse = "False" + ConditionReasonDeploymentUpdateErr = ConditionReason("KedaDeploymentUpdateErr") ConditionReasonVerificationErr = ConditionReason("VerificationErr") ConditionReasonVerified = ConditionReason("Verified") ConditionReasonApplyObjError = ConditionReason("ApplyObjError") ConditionReasonVerification = ConditionReason("Verification") ConditionReasonInitialized = ConditionReason("Initialized") + ConditionReasonKedaDuplicated = ConditionReason("KedaDuplicated") ConditionTypeInstalled = ConditionType("Installed") OperatorLogLevelDebug = OperatorLogLevel("debug") @@ -330,8 +334,17 @@ func (k *Keda) UpdateStateDeletion() { k.Status.State = StateDeleting } +func (k *Keda) UpdateServed(served string) { + k.Status.Served = served +} + +func (k *Keda) IsServedEmpty() bool { + return k.Status.Served == "" +} + type Status struct { State string `json:"state"` + Served string `json:"served"` Conditions []metav1.Condition `json:"conditions,omitempty"` } diff --git a/pkg/reconciler/fsm.go b/pkg/reconciler/fsm.go index 7bbe09ba..db989030 100644 --- a/pkg/reconciler/fsm.go +++ b/pkg/reconciler/fsm.go @@ -203,7 +203,7 @@ loop: func NewFsm(log *zap.SugaredLogger, cfg Cfg, k8s K8s) Fsm { return &fsm{ - fn: sFnTakeSnapshot, + fn: sFnServedFilter, Cfg: cfg, log: log, K8s: k8s, diff --git a/pkg/reconciler/served_filter.go b/pkg/reconciler/served_filter.go new file mode 100644 index 00000000..6dd50e82 --- /dev/null +++ b/pkg/reconciler/served_filter.go @@ -0,0 +1,54 @@ +package reconciler + +import ( + "context" + "fmt" + "github.com/kyma-project/keda-manager/api/v1alpha1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func sFnServedFilter(ctx context.Context, r *fsm, s *systemState) (stateFn, *ctrl.Result, error) { + if s.instance.IsServedEmpty() { + + // keda CRs check + servedKeda, err := findServedKeda(ctx, r.Client) + if err != nil { + return stopWithErrorAndNoRequeue(err) + } + + s.instance.UpdateServed(v1alpha1.ServedTrue) + if servedKeda != nil { + s.instance.UpdateServed(v1alpha1.ServedFalse) + s.instance.UpdateStateFromErr(v1alpha1.ConditionTypeInstalled, v1alpha1.ConditionReasonKedaDuplicated, + fmt.Errorf("only one instance of Keda is allowed (current served instance: %s/%s)", + servedKeda.GetNamespace(), servedKeda.GetName())) + } + + return stopWithRequeue() + } + + if s.instance.Status.Served == v1alpha1.ServedFalse { + return nil, nil, nil + } + + return switchState(sFnTakeSnapshot) +} + +func findServedKeda(ctx context.Context, c client.Client) (*v1alpha1.Keda, error) { + var kedaList v1alpha1.KedaList + + err := c.List(ctx, &kedaList) + + if err != nil { + return nil, err + } + + for _, item := range kedaList.Items { + if !item.IsServedEmpty() && item.Status.Served == v1alpha1.ServedTrue { + return &item, nil + } + } + + return nil, nil +} diff --git a/pkg/reconciler/served_filter_test.go b/pkg/reconciler/served_filter_test.go new file mode 100644 index 00000000..664b3f0e --- /dev/null +++ b/pkg/reconciler/served_filter_test.go @@ -0,0 +1,158 @@ +package reconciler + +import ( + "context" + "fmt" + "reflect" + "runtime" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/onsi/gomega" + + "github.com/kyma-project/keda-manager/api/v1alpha1" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apiruntime "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +func Test_sFnServedFilter(t *testing.T) { + t.Run("skip processing when served is false", func(t *testing.T) { + s := &systemState{ + instance: v1alpha1.Keda{ + Status: v1alpha1.Status{ + Served: v1alpha1.ServedFalse, + }, + }, + } + + nextFn, result, err := sFnServedFilter(context.TODO(), nil, s) + + require.Nil(t, err) + require.Nil(t, nextFn) + require.Nil(t, result) + }) + + t.Run("do next step when served is true", func(t *testing.T) { + s := &systemState{ + instance: v1alpha1.Keda{ + Status: v1alpha1.Status{ + Served: v1alpha1.ServedTrue, + }, + }, + } + + nextFn, result, err := sFnServedFilter(context.TODO(), nil, s) + + require.Nil(t, err) + requireEqualFunc(t, sFnTakeSnapshot, nextFn) + require.Nil(t, result) + }) + + t.Run("set served value from nil to true when there is no served keda on cluster", func(t *testing.T) { + s := &systemState{ + instance: v1alpha1.Keda{ + Status: v1alpha1.Status{}, + }, + } + + r := &fsm{ + K8s: K8s{ + Client: func() client.Client { + scheme := apiruntime.NewScheme() + require.NoError(t, v1alpha1.AddToScheme(scheme)) + + return fake.NewClientBuilder(). + WithScheme(scheme). + WithRuntimeObjects( + fixServedKeda("test-1", "default", ""), + fixServedKeda("test-2", "keda-test", v1alpha1.ServedFalse), + fixServedKeda("test-3", "keda-test-2", ""), + fixServedKeda("test-4", "default", v1alpha1.ServedFalse), + ).Build() + }(), + }, + } + + nextFn, result, err := sFnServedFilter(context.TODO(), r, s) + + require.Nil(t, err) + requireEqualFunc(t, sFnUpdateStatus(&ctrl.Result{Requeue: true}, nil), nextFn) + require.Nil(t, result) + + require.Equal(t, v1alpha1.ServedTrue, s.instance.Status.Served) + }) + + t.Run("set served value from nil to false and set condition to error when there is at lease one served keda on cluster", func(t *testing.T) { + s := &systemState{ + instance: v1alpha1.Keda{ + Status: v1alpha1.Status{}, + }, + } + + r := &fsm{ + K8s: K8s{ + Client: func() client.Client { + scheme := apiruntime.NewScheme() + require.NoError(t, v1alpha1.AddToScheme(scheme)) + + return fake.NewClientBuilder(). + WithScheme(scheme). + WithRuntimeObjects( + fixServedKeda("test-1", "default", v1alpha1.ServedFalse), + fixServedKeda("test-2", "keda-test", v1alpha1.ServedTrue), + fixServedKeda("test-3", "keda-test-2", ""), + fixServedKeda("test-4", "default", v1alpha1.ServedFalse), + ).Build() + }(), + }, + } + + nextFn, result, err := sFnServedFilter(context.TODO(), r, s) + + require.Nil(t, err) + requireEqualFunc(t, sFnUpdateStatus(&ctrl.Result{Requeue: true}, nil), nextFn) + require.Nil(t, result) + + require.Equal(t, v1alpha1.StateError, s.instance.Status.State) + require.Equal(t, v1alpha1.ServedFalse, s.instance.Status.Served) + + expectedCondition := metav1.Condition{ + Type: string(v1alpha1.ConditionTypeInstalled), + Status: "False", + Reason: string(v1alpha1.ConditionReasonKedaDuplicated), + Message: "only one instance of Keda is allowed (current served instance: keda-test/test-2)", + } + opt := cmp.Comparer(func(x, y metav1.Condition) bool { + return x.Type == y.Type && x.Status == y.Status && x.Reason == y.Reason && x.Message == y.Message + }) + g := gomega.NewWithT(t) + g.Expect(s.instance.Status.Conditions).Should(gomega.ContainElement(gomega.BeComparableTo(expectedCondition, opt))) + }) +} + +func fixServedKeda(name, namespace string, served string) *v1alpha1.Keda { + return &v1alpha1.Keda{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Status: v1alpha1.Status{ + Served: served, + }, + } +} + +func requireEqualFunc(t *testing.T, expected, actual stateFn) { + expectedValueOf := reflect.ValueOf(expected) + actualValueOf := reflect.ValueOf(actual) + require.True(t, expectedValueOf.Pointer() == actualValueOf.Pointer(), + fmt.Sprintf("expected '%s', got '%s", getFnName(expected), getFnName(actual))) +} + +func getFnName(fn stateFn) string { + return runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() +}