From f2cc4b4f7cbe2580083b30672fd0bbb668a74ac4 Mon Sep 17 00:00:00 2001 From: Abhisek Dwivedi Date: Tue, 19 Sep 2023 12:22:16 +0530 Subject: [PATCH 01/12] Added support for PDB and readiness --- api/v1/aerospikecluster_mutating_webhook.go | 10 +- api/v1/aerospikecluster_types.go | 10 +- api/v1/aerospikecluster_validating_webhook.go | 23 +++++ api/v1/zz_generated.deepcopy.go | 10 ++ .../asdb.aerospike.com_aerospikeclusters.yaml | 16 ++++ ...rnetes-operator.clusterserviceversion.yaml | 9 +- config/rbac/role.yaml | 9 ++ controllers/aerospikecluster_controller.go | 1 + controllers/pod.go | 7 ++ controllers/poddistruptionbudget.go | 96 +++++++++++++++++++ controllers/reconciler.go | 11 +++ controllers/statefulset.go | 65 ++++++++++++- ..._aerospikeclusters.asdb.aerospike.com.yaml | 16 ++++ ...erospike-operator-manager-clusterrole.yaml | 9 ++ test/cluster_helper.go | 57 ++++++++++- test/poddisruptionbudget_test.go | 77 +++++++++++++++ test/sample_files_test.go | 2 +- test/utils.go | 5 + 18 files changed, 423 insertions(+), 10 deletions(-) create mode 100644 controllers/poddistruptionbudget.go create mode 100644 test/poddisruptionbudget_test.go diff --git a/api/v1/aerospikecluster_mutating_webhook.go b/api/v1/aerospikecluster_mutating_webhook.go index 4dc2f371c..ed6a44019 100644 --- a/api/v1/aerospikecluster_mutating_webhook.go +++ b/api/v1/aerospikecluster_mutating_webhook.go @@ -25,6 +25,7 @@ import ( "gomodules.xyz/jsonpatch/v2" v1 "k8s.io/api/admission/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/webhook" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" @@ -69,6 +70,13 @@ func (c *AerospikeCluster) Default(operation v1.Operation) admission.Response { } func (c *AerospikeCluster) setDefaults(asLog logr.Logger) error { + // Set maxUnavailable default + if c.Spec.MaxUnavailable == nil { + // Set default maxUnavailable to 1 + maxUnavailable := intstr.FromInt(1) + c.Spec.MaxUnavailable = &maxUnavailable + } + // Set network defaults c.Spec.AerospikeNetworkPolicy.setDefaults(c.ObjectMeta.Namespace) @@ -504,7 +512,7 @@ func setDefaultNetworkConf( ) } // Override these sections - // TODO: These values lines will be replaces with runtime info by script in init-container + // TODO: These values lines will be replaced with runtime info by script in init-container // See if we can get better way to make template serviceDefaults := map[string]interface{}{} srvPort := GetServicePort(configSpec) diff --git a/api/v1/aerospikecluster_types.go b/api/v1/aerospikecluster_types.go index 9346610f5..4f0ad56bc 100644 --- a/api/v1/aerospikecluster_types.go +++ b/api/v1/aerospikecluster_types.go @@ -37,6 +37,10 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability // Aerospike cluster size // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Cluster Size" Size int32 `json:"size"` + // MaxUnavailable is the percentage/number of pods that can be allowed to go down or unavailable before application + // disruption. This value is used to create PodDisruptionBudget. Defaults to 1. 
+ // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Max Unavailable" + MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"` // Aerospike server image // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Server Image" Image string `json:"image"` @@ -578,10 +582,12 @@ type AerospikeStorageSpec struct { //nolint:govet // for readability // AerospikeClusterStatusSpec captures the current status of the cluster. type AerospikeClusterStatusSpec struct { //nolint:govet // for readability // Aerospike cluster size - // +operator-sdk:csv:customresourcedefinitions:type=status,displayName="Cluster Size" Size int32 `json:"size,omitempty"` // Aerospike server image Image string `json:"image,omitempty"` + // MaxUnavailable is the percentage/number of pods that can be allowed to go down or unavailable before application + // disruption. This value is used to create PodDisruptionBudget. Defaults to 1. + MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"` // If set true then multiple pods can be created per Kubernetes Node. // This will create a NodePort service for each Pod. // NodePort, as the name implies, opens a specific port on all the Kubernetes Nodes , @@ -869,6 +875,7 @@ func CopySpecToStatus(spec *AerospikeClusterSpec) (*AerospikeClusterStatusSpec, status.Size = spec.Size status.Image = spec.Image + status.MaxUnavailable = spec.MaxUnavailable // Storage statusStorage := AerospikeStorageSpec{} @@ -958,6 +965,7 @@ func CopyStatusToSpec(status *AerospikeClusterStatusSpec) (*AerospikeClusterSpec spec.Size = status.Size spec.Image = status.Image + spec.MaxUnavailable = status.MaxUnavailable // Storage specStorage := AerospikeStorageSpec{} diff --git a/api/v1/aerospikecluster_validating_webhook.go b/api/v1/aerospikecluster_validating_webhook.go index 96d8e9454..f7e319314 100644 --- a/api/v1/aerospikecluster_validating_webhook.go +++ b/api/v1/aerospikecluster_validating_webhook.go @@ -167,6 +167,11 @@ func (c *AerospikeCluster) validate(aslog logr.Logger) error { return fmt.Errorf("invalid cluster size 0") } + // Validate MaxUnavailable for PodDisruptionBudget + if err := c.validateMaxUnavailable(); err != nil { + return err + } + // Validate Image version version, err := GetImageVersion(c.Spec.Image) if err != nil { @@ -2190,3 +2195,21 @@ func validateIntOrStringField(value *intstr.IntOrString, fieldPath string) error return nil } + +func (c *AerospikeCluster) validateMaxUnavailable() error { + // safe check for corner cases when mutation webhook somehow didn't work + if c.Spec.MaxUnavailable == nil { + return fmt.Errorf("maxUnavailable cannot be nil. Mutation webhook didn't work") + } + + if err := validateIntOrStringField(c.Spec.MaxUnavailable, "spec.maxUnavailable"); err != nil { + return err + } + + // TODO: Do we need such types of check? Maybe > size/2 etc + if c.Spec.MaxUnavailable.IntValue() > int(c.Spec.Size) { + return fmt.Errorf("maxUnavailable %s cannot be greater than size", c.Spec.MaxUnavailable.String()) + } + + return nil +} diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 6696bf022..d28a8d239 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -154,6 +154,11 @@ func (in *AerospikeClusterList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
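The new `maxUnavailable` field is an `intstr.IntOrString`, and the `validateMaxUnavailable` hunk above compares it via `IntValue()`. A small standalone sketch (not part of the patch) of how that type behaves; the percentage case is worth keeping in mind, because `IntValue()` silently yields 0 for a string like `"20%"`:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	count := intstr.FromInt(1)
	percent := intstr.FromString("20%")

	// IntValue parses plain numeric strings, but returns 0 for
	// percentage strings, so a percentage form would sail straight
	// through an IntValue-based bound check.
	fmt.Println(count.IntValue(), percent.IntValue()) // 1 0

	// The size-aware helper resolves a percentage against a total:
	// 20% of a 5-pod cluster, rounded down, is 1.
	scaled, err := intstr.GetScaledValueFromIntOrPercent(&percent, 5, false)
	if err != nil {
		panic(err)
	}
	fmt.Println(scaled) // 1
}
```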
func (in *AerospikeClusterSpec) DeepCopyInto(out *AerospikeClusterSpec) { *out = *in + if in.MaxUnavailable != nil { + in, out := &in.MaxUnavailable, &out.MaxUnavailable + *out = new(intstr.IntOrString) + **out = **in + } in.Storage.DeepCopyInto(&out.Storage) if in.AerospikeAccessControl != nil { in, out := &in.AerospikeAccessControl, &out.AerospikeAccessControl @@ -221,6 +226,11 @@ func (in *AerospikeClusterStatus) DeepCopy() *AerospikeClusterStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *AerospikeClusterStatusSpec) DeepCopyInto(out *AerospikeClusterStatusSpec) { *out = *in + if in.MaxUnavailable != nil { + in, out := &in.MaxUnavailable, &out.MaxUnavailable + *out = new(intstr.IntOrString) + **out = **in + } in.Storage.DeepCopyInto(&out.Storage) if in.AerospikeAccessControl != nil { in, out := &in.AerospikeAccessControl, &out.AerospikeAccessControl diff --git a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml index 266989bf2..b154f1768 100644 --- a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml +++ b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml @@ -280,6 +280,14 @@ spec: image: description: Aerospike server image type: string + maxUnavailable: + anyOf: + - type: integer + - type: string + description: MaxUnavailable is the percentage/number of pods that + can be allowed to go down or unavailable before application disruption. + This value is used to create PodDisruptionBudget. Defaults to 1. + x-kubernetes-int-or-string: true operatorClientCert: description: Certificates to connect to Aerospike. properties: @@ -8907,6 +8915,14 @@ spec: image: description: Aerospike server image type: string + maxUnavailable: + anyOf: + - type: integer + - type: string + description: MaxUnavailable is the percentage/number of pods that + can be allowed to go down or unavailable before application disruption. + This value is used to create PodDisruptionBudget. Defaults to 1. + x-kubernetes-int-or-string: true multiPodPerHost: description: "If set true then multiple pods can be created per Kubernetes Node. This will create a NodePort service for each Pod. NodePort, diff --git a/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml b/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml index ed26895a5..7d00a72af 100644 --- a/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml +++ b/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml @@ -50,6 +50,11 @@ spec: - description: Aerospike server image displayName: Server Image path: image + - description: MaxUnavailable is the percentage/number of pods that can be allowed + to go down or unavailable before application disruption. This value is used + to create PodDisruptionBudget. Defaults to 1. + displayName: Max Unavailable + path: maxUnavailable - description: Certificates to connect to Aerospike. displayName: Operator Client Cert path: operatorClientCert @@ -78,10 +83,6 @@ spec: resource. 
displayName: Validation Policy path: validationPolicy - statusDescriptors: - - description: Aerospike cluster size - displayName: Cluster Size - path: size version: v1 - description: AerospikeCluster is the schema for the AerospikeCluster API displayName: Aerospike Cluster diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 981820e2c..4cd176f8d 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -111,3 +111,12 @@ rules: - patch - update - watch +- apiGroups: + - policy + resources: + - poddisruptionbudgets + verbs: + - create + - get + - patch + - update diff --git a/controllers/aerospikecluster_controller.go b/controllers/aerospikecluster_controller.go index 2c2f8f441..701e9b9a4 100644 --- a/controllers/aerospikecluster_controller.go +++ b/controllers/aerospikecluster_controller.go @@ -86,6 +86,7 @@ type RackState struct { // +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=secrets,verbs=get // +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=get;create;update;patch //nolint:lll // marker // +kubebuilder:rbac:groups=asdb.aerospike.com,resources=aerospikeclusters,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=asdb.aerospike.com,resources=aerospikeclusters/status,verbs=get;update;patch diff --git a/controllers/pod.go b/controllers/pod.go index 1244e6f3e..302a3fe80 100644 --- a/controllers/pod.go +++ b/controllers/pod.go @@ -147,6 +147,13 @@ func (r *SingleClusterReconciler) getRollingRestartTypePod( r.Log.Info("Aerospike rack storage changed. Need rolling restart") } + if r.isReadinessPortUpdated(pod) { + restartType = mergeRestartType(restartType, podRestart) + + r.Log.Info("Aerospike readiness tcp port changed. Need rolling restart", + "newTCPPort", r.getReadinessProbe().TCPSocket.String()) + } + return restartType } diff --git a/controllers/poddistruptionbudget.go b/controllers/poddistruptionbudget.go new file mode 100644 index 000000000..1b78e0b2a --- /dev/null +++ b/controllers/poddistruptionbudget.go @@ -0,0 +1,96 @@ +package controllers + +import ( + "context" + "fmt" + + v1 "k8s.io/api/policy/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + "github.com/aerospike/aerospike-kubernetes-operator/pkg/utils" +) + +func (r *SingleClusterReconciler) createOrUpdatePDB() error { + ls := utils.LabelsForAerospikeCluster(r.aeroCluster.Name) + pdb := &v1.PodDisruptionBudget{} + + err := r.Client.Get( + context.TODO(), types.NamespacedName{ + Name: r.aeroCluster.Name, Namespace: r.aeroCluster.Namespace, + }, pdb, + ) + if err != nil { + if !errors.IsNotFound(err) { + return err + } + + r.Log.Info("Create PodDisruptionBudget") + + pdb = &v1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Name: r.aeroCluster.Name, + Namespace: r.aeroCluster.Namespace, + Labels: ls, + }, + Spec: v1.PodDisruptionBudgetSpec{ + MaxUnavailable: r.aeroCluster.Spec.MaxUnavailable, + Selector: &metav1.LabelSelector{ + MatchLabels: ls, + }, + }, + } + + // This will be true only for old existing CRs. For new operator versions, this field will be + // set by default to 1 by mutating webhook. 
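Before the nil-check that follows, it may help to see the whole object `createOrUpdatePDB` is assembling. A minimal, self-contained sketch, with the cluster name, namespace, and labels as illustrative placeholders:

```go
package main

import (
	"fmt"

	policyv1 "k8s.io/api/policy/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

// buildPDB assembles the same shape of object the reconciler creates:
// one PDB per cluster, named after the CR, selecting pods by the
// cluster labels. All names here are made up for the example.
func buildPDB(name, ns string, ls map[string]string, maxUnavailable intstr.IntOrString) *policyv1.PodDisruptionBudget {
	return &policyv1.PodDisruptionBudget{
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: ns, Labels: ls},
		Spec: policyv1.PodDisruptionBudgetSpec{
			MaxUnavailable: &maxUnavailable,
			Selector:       &metav1.LabelSelector{MatchLabels: ls},
		},
	}
}

func main() {
	pdb := buildPDB("aerocluster", "aerospike",
		map[string]string{"app": "aerospike-cluster"}, intstr.FromInt(1))
	fmt.Println(pdb.Name, pdb.Spec.MaxUnavailable.String())
}
```

One PDB per cluster, owned by the CR and kept in sync on every reconcile pass, is the design choice the rest of this file implements.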
+ if pdb.Spec.MaxUnavailable == nil { + maxUnavailable := intstr.FromInt(1) + pdb.Spec.MaxUnavailable = &maxUnavailable + } + + // Set AerospikeCluster instance as the owner and controller + err = controllerutil.SetControllerReference( + r.aeroCluster, pdb, r.Scheme, + ) + if err != nil { + return err + } + + if err = r.Client.Create( + context.TODO(), pdb, createOption, + ); err != nil { + return fmt.Errorf( + "failed to create PodDisruptionBudget: %v", + err, + ) + } + + r.Log.Info("Created new PodDisruptionBudget") + + return nil + } + + r.Log.Info( + "PodDisruptionBudget already exist. Updating existing PodDisruptionBudget if required", "name", + utils.NamespacedName(r.aeroCluster.Namespace, r.aeroCluster.Name), + ) + + if pdb.Spec.MaxUnavailable.String() != r.aeroCluster.Spec.MaxUnavailable.String() { + pdb.Spec.MaxUnavailable = r.aeroCluster.Spec.MaxUnavailable + if err = r.Client.Update( + context.TODO(), pdb, updateOption, + ); err != nil { + return fmt.Errorf( + "failed to update PodDisruptionBudget: %v", + err, + ) + } + + r.Log.Info("Updated PodDisruptionBudget") + } + + return nil +} diff --git a/controllers/reconciler.go b/controllers/reconciler.go index 58b4417bb..f03c22a49 100644 --- a/controllers/reconciler.go +++ b/controllers/reconciler.go @@ -113,6 +113,17 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { return res.getResult() } + if err := r.createOrUpdatePDB(); err != nil { + r.Log.Error(err, "Failed to create PodDisruptionBudget") + r.Recorder.Eventf( + r.aeroCluster, corev1.EventTypeWarning, "PodDisruptionBudgetCreateFailed", + "Failed to create PodDisruptionBudget %s/%s", + r.aeroCluster.Namespace, r.aeroCluster.Name, + ) + + return reconcile.Result{}, err + } + if err := r.createOrUpdateSTSLoadBalancerSvc(); err != nil { r.Log.Error(err, "Failed to create LoadBalancer service") r.Recorder.Eventf( diff --git a/controllers/statefulset.go b/controllers/statefulset.go index 1d36c7596..e24bc6b3d 100644 --- a/controllers/statefulset.go +++ b/controllers/statefulset.go @@ -15,6 +15,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -162,9 +163,9 @@ func (r *SingleClusterReconciler) createSTS( Ports: ports, Env: envVarList, VolumeMounts: getDefaultAerospikeContainerVolumeMounts(), + ReadinessProbe: r.getReadinessProbe(), }, }, - Volumes: getDefaultSTSVolumes(r.aeroCluster, rackState), }, }, @@ -201,6 +202,51 @@ func (r *SingleClusterReconciler) createSTS( return r.getSTS(rackState) } +func (r *SingleClusterReconciler) getReadinessProbe() *corev1.Probe { + var readinessPort *int + + if _, tlsPort := asdbv1.GetTLSNameAndPort(r.aeroCluster.Spec.AerospikeConfig, asdbv1.ServicePortName); tlsPort != nil { + readinessPort = tlsPort + } else { + readinessPort = asdbv1.GetServicePort(r.aeroCluster.Spec.AerospikeConfig) + } + + return &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.IntOrString{ + Type: intstr.Int, + IntVal: int32(*readinessPort), + }, + }, + }, + InitialDelaySeconds: 2, + PeriodSeconds: 5, + } +} + +func (r *SingleClusterReconciler) isReadinessPortUpdated(pod *corev1.Pod) bool { + for idx := range pod.Spec.Containers { + container := &pod.Spec.Containers[idx] + + if container.Name != asdbv1.AerospikeServerContainerName { + continue 
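The restart decision in `isReadinessPortUpdated` (continued below) compares the textual form of the TCP socket action rather than comparing fields one by one. A standalone sketch of that comparison, assuming the plain service port 3000 and TLS port 4333 used elsewhere in this patch's tests:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

// tcpProbe builds the same readiness probe shape as getReadinessProbe:
// a TCP check against the Aerospike service (or TLS) port.
func tcpProbe(port int) *corev1.Probe {
	return &corev1.Probe{
		ProbeHandler: corev1.ProbeHandler{
			TCPSocket: &corev1.TCPSocketAction{Port: intstr.FromInt(port)},
		},
		InitialDelaySeconds: 2,
		PeriodSeconds:       5,
	}
}

func main() {
	current, desired := tcpProbe(3000), tcpProbe(4333)

	// The generated String() form is stable, so a simple string
	// comparison is enough to detect a port change.
	fmt.Println(current.TCPSocket.String() != desired.TCPSocket.String()) // true
}
```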
+ } + + // ignore if readiness probe is not set. Avoid rolling restart for old versions of operator + if container.ReadinessProbe == nil { + return false + } + + if container.ReadinessProbe.TCPSocket != nil && + container.ReadinessProbe.TCPSocket.String() != r.getReadinessProbe().TCPSocket.String() { + return true + } + } + + return false +} + func (r *SingleClusterReconciler) deleteSTS(st *appsv1.StatefulSet) error { r.Log.Info("Delete statefulset") // No need to do cleanup pods after deleting sts @@ -596,6 +642,9 @@ func (r *SingleClusterReconciler) updateSTS( // Container. r.updateContainerImages(statefulSet) + // Updates the readiness probe TCP Port if changed for the aerospike server container + r.updateReadinessProbe(statefulSet) + // This should be called before updating storage r.initializeSTSStorage(statefulSet, rackState) @@ -1095,6 +1144,20 @@ func (r *SingleClusterReconciler) updateContainerImages(statefulset *appsv1.Stat updateImage(statefulset.Spec.Template.Spec.InitContainers) } +func (r *SingleClusterReconciler) updateReadinessProbe(statefulSet *appsv1.StatefulSet) { + for idx := range statefulSet.Spec.Template.Spec.Containers { + container := &statefulSet.Spec.Template.Spec.Containers[idx] + + if container.Name != asdbv1.AerospikeServerContainerName { + continue + } + + container.ReadinessProbe = r.getReadinessProbe() + + break + } +} + func (r *SingleClusterReconciler) updateAerospikeInitContainerImage(statefulSet *appsv1.StatefulSet) error { for idx := range statefulSet.Spec.Template.Spec.InitContainers { container := &statefulSet.Spec.Template.Spec.InitContainers[idx] diff --git a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml index 266989bf2..b154f1768 100644 --- a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml +++ b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml @@ -280,6 +280,14 @@ spec: image: description: Aerospike server image type: string + maxUnavailable: + anyOf: + - type: integer + - type: string + description: MaxUnavailable is the percentage/number of pods that + can be allowed to go down or unavailable before application disruption. + This value is used to create PodDisruptionBudget. Defaults to 1. + x-kubernetes-int-or-string: true operatorClientCert: description: Certificates to connect to Aerospike. properties: @@ -8907,6 +8915,14 @@ spec: image: description: Aerospike server image type: string + maxUnavailable: + anyOf: + - type: integer + - type: string + description: MaxUnavailable is the percentage/number of pods that + can be allowed to go down or unavailable before application disruption. + This value is used to create PodDisruptionBudget. Defaults to 1. + x-kubernetes-int-or-string: true multiPodPerHost: description: "If set true then multiple pods can be created per Kubernetes Node. This will create a NodePort service for each Pod. 
NodePort, diff --git a/helm-charts/aerospike-kubernetes-operator/templates/aerospike-operator-manager-clusterrole.yaml b/helm-charts/aerospike-kubernetes-operator/templates/aerospike-operator-manager-clusterrole.yaml index c379e7f0e..adf721767 100644 --- a/helm-charts/aerospike-kubernetes-operator/templates/aerospike-operator-manager-clusterrole.yaml +++ b/helm-charts/aerospike-kubernetes-operator/templates/aerospike-operator-manager-clusterrole.yaml @@ -116,4 +116,13 @@ rules: - patch - update - watch +- apiGroups: + - policy + resources: + - poddisruptionbudgets + verbs: + - create + - get + - patch + - update {{- end }} diff --git a/test/cluster_helper.go b/test/cluster_helper.go index b1263a8c7..8746dcb50 100644 --- a/test/cluster_helper.go +++ b/test/cluster_helper.go @@ -16,6 +16,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "sigs.k8s.io/controller-runtime/pkg/client" @@ -86,6 +87,12 @@ func rollingRestartClusterByEnablingTLS( return err } + // Port should be changed to service tls-port + err = validateReadinessProbe(ctx, k8sClient, aeroCluster, serviceTLSPort) + if err != nil { + return err + } + network := aeroCluster.Spec.AerospikeConfig.Value["network"].(map[string]interface{}) serviceNetwork := network[asdbv1.ServicePortName].(map[string]interface{}) fabricNetwork := network[asdbv1.FabricPortName].(map[string]interface{}) @@ -100,7 +107,12 @@ func rollingRestartClusterByEnablingTLS( network[asdbv1.HeartbeatPortName] = heartbeartNetwork aeroCluster.Spec.AerospikeConfig.Value["network"] = network - return updateCluster(k8sClient, ctx, aeroCluster) + err = updateCluster(k8sClient, ctx, aeroCluster) + if err != nil { + return err + } + + return validateReadinessProbe(ctx, k8sClient, aeroCluster, serviceTLSPort) } func rollingRestartClusterByDisablingTLS( @@ -129,10 +141,22 @@ func rollingRestartClusterByDisablingTLS( return err } + // port should remain same i.e. 
service tls-port + err = validateReadinessProbe(ctx, k8sClient, aeroCluster, serviceTLSPort) + if err != nil { + return err + } + aeroCluster.Spec.AerospikeConfig.Value["network"] = getNetworkConfig() aeroCluster.Spec.OperatorClientCertSpec = nil - return updateCluster(k8sClient, ctx, aeroCluster) + err = updateCluster(k8sClient, ctx, aeroCluster) + if err != nil { + return err + } + + // Port should be updated to service non-tls port + return validateReadinessProbe(ctx, k8sClient, aeroCluster, servicePort) } func scaleUpClusterTestWithNSDeviceHandling( @@ -542,6 +566,35 @@ func validateMigrateFillDelay( return err } +// validate readiness port +func validateReadinessProbe(ctx goctx.Context, k8sClient client.Client, aeroCluster *asdbv1.AerospikeCluster, + requiredPort int) error { + for podName := range aeroCluster.Status.Pods { + pod := &corev1.Pod{} + if err := k8sClient.Get(ctx, types.NamespacedName{ + Namespace: aeroCluster.Namespace, + Name: podName, + }, pod); err != nil { + return err + } + + for idx := range pod.Spec.Containers { + container := &pod.Spec.Containers[idx] + + if container.Name != asdbv1.AerospikeServerContainerName { + continue + } + + if !reflect.DeepEqual(container.ReadinessProbe.TCPSocket.Port, intstr.FromInt(requiredPort)) { + return fmt.Errorf("readiness tcp port mismatch, expected: %v, found: %v", + intstr.FromInt(requiredPort), container.ReadinessProbe.TCPSocket.Port) + } + } + } + + return nil +} + func validateDirtyVolumes( ctx goctx.Context, k8sClient client.Client, clusterNamespacedName types.NamespacedName, expectedVolumes []string, diff --git a/test/poddisruptionbudget_test.go b/test/poddisruptionbudget_test.go new file mode 100644 index 000000000..dab2e1592 --- /dev/null +++ b/test/poddisruptionbudget_test.go @@ -0,0 +1,77 @@ +package test + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + policyv1 "k8s.io/api/policy/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + + asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" +) + +var _ = Describe( + "PodDisruptionBudget", func() { + ctx := context.TODO() + aeroCluster := &asdbv1.AerospikeCluster{} + maxAvailable := intstr.FromInt(2) + clusterNamespacedName := getNamespacedName("pdb-test-cluster", namespace) + + BeforeEach(func() { + aeroCluster = createDummyAerospikeCluster( + clusterNamespacedName, 2, + ) + }) + + AfterEach(func() { + Expect(deleteCluster(k8sClient, ctx, aeroCluster)).NotTo(HaveOccurred()) + }) + + It("Validate create PDB with default maxUnavailable", func() { + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + validatePDB(ctx, aeroCluster, 1) + }) + + It("Validate create PDB with specified maxUnavailable", func() { + aeroCluster.Spec.MaxUnavailable = &maxAvailable + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + validatePDB(ctx, aeroCluster, 2) + }) + + It("Validate update PDB", func() { + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + validatePDB(ctx, aeroCluster, 1) + + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + // Update maxUnavailable + By("Update maxUnavailable to 2") + aeroCluster.Spec.MaxUnavailable = &maxAvailable + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + validatePDB(ctx, aeroCluster, 2) + }) + }) + +func validatePDB(ctx context.Context, aerocluster *asdbv1.AerospikeCluster, expectedMaxUnavailable int) { + pdb := policyv1.PodDisruptionBudget{} + err := k8sClient.Get(ctx, types.NamespacedName{ + Namespace: aerocluster.Namespace, + Name: aerocluster.Name, + }, &pdb) + Expect(err).ToNot(HaveOccurred()) + + // Validate PDB + Expect(pdb.Spec.MaxUnavailable.IntValue()).To(Equal(expectedMaxUnavailable)) + Expect(pdb.Status.ExpectedPods).To(Equal(aerocluster.Spec.Size)) + Expect(pdb.Status.CurrentHealthy).To(Equal(aerocluster.Spec.Size)) + Expect(pdb.Status.DisruptionsAllowed).To(Equal(int32(expectedMaxUnavailable))) + Expect(pdb.Status.DesiredHealthy).To(Equal(aerocluster.Spec.Size - int32(expectedMaxUnavailable))) +} diff --git a/test/sample_files_test.go b/test/sample_files_test.go index f0cecea23..18f638fdd 100644 --- a/test/sample_files_test.go +++ b/test/sample_files_test.go @@ -139,7 +139,7 @@ func getSamplesFiles() ([]string, error) { } // Files/Dirs ignored are: - // 1.PMEM sample file as hardware is not available + // 1. PMEM sample file as hardware is not available // 2. XDR related files as they are separately tested // 3. 
All files which are not CR samples if strings.Contains(path, "pmem_cluster_cr.yaml") || strings.Contains(path, "xdr_") || diff --git a/test/utils.go b/test/utils.go index 07b885ac0..b1918f37a 100644 --- a/test/utils.go +++ b/test/utils.go @@ -35,6 +35,11 @@ var ( pkgLog = ctrl.Log.WithName("test") ) +const ( + servicePort = 3000 + serviceTLSPort = 4333 +) + var secrets map[string][]byte var cacertSecrets map[string][]byte From 2346ff96dc9cf3ea8c6eea61b5b9b1bc53986ed5 Mon Sep 17 00:00:00 2001 From: Abhisek Dwivedi Date: Fri, 22 Sep 2023 13:11:58 +0530 Subject: [PATCH 02/12] Added support for PDB and readiness --- api/v1/aerospikecluster_mutating_webhook.go | 3 +- api/v1/aerospikecluster_validating_webhook.go | 41 +++++++++- controllers/aerospikecluster_controller.go | 30 +++++++ controllers/poddistruptionbudget.go | 80 ++++++++++++------- test/cluster_helper.go | 12 ++- test/cluster_test.go | 7 ++ test/utils.go | 5 -- 7 files changed, 140 insertions(+), 38 deletions(-) diff --git a/api/v1/aerospikecluster_mutating_webhook.go b/api/v1/aerospikecluster_mutating_webhook.go index ed6a44019..41e79ebf4 100644 --- a/api/v1/aerospikecluster_mutating_webhook.go +++ b/api/v1/aerospikecluster_mutating_webhook.go @@ -70,9 +70,8 @@ func (c *AerospikeCluster) Default(operation v1.Operation) admission.Response { } func (c *AerospikeCluster) setDefaults(asLog logr.Logger) error { - // Set maxUnavailable default + // Set maxUnavailable default to 1 if c.Spec.MaxUnavailable == nil { - // Set default maxUnavailable to 1 maxUnavailable := intstr.FromInt(1) c.Spec.MaxUnavailable = &maxUnavailable } diff --git a/api/v1/aerospikecluster_validating_webhook.go b/api/v1/aerospikecluster_validating_webhook.go index f7e319314..245f1385d 100644 --- a/api/v1/aerospikecluster_validating_webhook.go +++ b/api/v1/aerospikecluster_validating_webhook.go @@ -2206,9 +2206,44 @@ func (c *AerospikeCluster) validateMaxUnavailable() error { return err } - // TODO: Do we need such types of check? 
Maybe > size/2 etc - if c.Spec.MaxUnavailable.IntValue() > int(c.Spec.Size) { - return fmt.Errorf("maxUnavailable %s cannot be greater than size", c.Spec.MaxUnavailable.String()) + maxUnavailable := int(c.Spec.Size) + + // If Size is 1, then ignore it for maxUnavailable calculation as it will anyway result in data loss + if maxUnavailable == 1 { + return nil + } + + for idx := range c.Spec.RackConfig.Racks { + rack := &c.Spec.RackConfig.Racks[idx] + nsList := rack.AerospikeConfig.Value["namespaces"].([]interface{}) + + for _, nsInterface := range nsList { + rfInterface, exists := nsInterface.(map[string]interface{})["replication-factor"] + if !exists { + // Default RF is 2 if not given + maxUnavailable = 2 + continue + } + + rf, err := GetIntType(rfInterface) + if err != nil { + return fmt.Errorf("namespace replication-factor %v", err) + } + + // If RF is 1, then ignore it for maxUnavailable calculation as it will anyway result in data loss + if rf == 1 { + continue + } + + if rf < maxUnavailable { + maxUnavailable = rf + } + } + } + + if c.Spec.MaxUnavailable.IntValue() >= maxUnavailable { + return fmt.Errorf("maxUnavailable %s cannot be greater than or equal to %v", + c.Spec.MaxUnavailable.String(), maxUnavailable) } return nil diff --git a/controllers/aerospikecluster_controller.go b/controllers/aerospikecluster_controller.go index 701e9b9a4..d23cc9956 100644 --- a/controllers/aerospikecluster_controller.go +++ b/controllers/aerospikecluster_controller.go @@ -6,8 +6,11 @@ import ( "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" + policyv1 "k8s.io/api/policy/v1" + policyv1beta1 "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/api/errors" k8sRuntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/discovery" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" @@ -37,6 +40,8 @@ var ( } ) +var PDBbGvk = policyv1beta1.SchemeGroupVersion.WithKind("PodDisruptionBudget") + // AerospikeClusterReconciler reconciles AerospikeClusters type AerospikeClusterReconciler struct { client.Client @@ -49,6 +54,10 @@ type AerospikeClusterReconciler struct { // SetupWithManager sets up the controller with the Manager func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { + if err := r.setupPdbAPI(r.KubeConfig); err != nil { + return err + } + return ctrl.NewControllerManagedBy(mgr). For(&asdbv1.AerospikeCluster{}). Owns( @@ -72,6 +81,27 @@ func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } +// setupPdbAPI sets up the pdb api version to use as per the k8s version. +// TODO: Move to v1 when minimum supported k8s version is 1.21 +func (r *AerospikeClusterReconciler) setupPdbAPI(config *rest.Config) error { + discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(config) + + resources, err := discoveryClient.ServerResourcesForGroupVersion("policy/v1") + if err != nil { + r.Log.Info("Could not get ServerResourcesForGroupVersion for policy/v1, falling back to policy/v1beta1") + return nil + } + + for i := range resources.APIResources { + if resources.APIResources[i].Kind == "PodDisruptionBudget" { + PDBbGvk = policyv1.SchemeGroupVersion.WithKind("PodDisruptionBudget") + return nil + } + } + + return nil +} + // RackState contains the rack configuration and rack size. 
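The replication-factor bound introduced above condenses to a few lines. A model of the rule, not the operator's code verbatim: start from the cluster size, tighten to the smallest replication-factor greater than 1 across rack namespaces, and require the configured `maxUnavailable` to stay strictly below the result. Here `rfs` is assumed to already carry the default of 2 for namespaces that omit `replication-factor`:

```go
package main

import "fmt"

// allowedBound returns the exclusive upper bound on maxUnavailable.
func allowedBound(size int, rfs []int) int {
	bound := size
	for _, rf := range rfs {
		if rf == 1 {
			continue // RF 1 loses data on any disruption; skipped
		}
		if rf < bound {
			bound = rf
		}
	}
	return bound
}

func main() {
	// A 5-pod cluster with namespaces at RF 2 and RF 3:
	// maxUnavailable 1 passes, 2 is rejected (2 >= 2).
	fmt.Println(allowedBound(5, []int{2, 3})) // 2
}
```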
type RackState struct { Rack *asdbv1.Rack diff --git a/controllers/poddistruptionbudget.go b/controllers/poddistruptionbudget.go index 1b78e0b2a..580a390a0 100644 --- a/controllers/poddistruptionbudget.go +++ b/controllers/poddistruptionbudget.go @@ -3,54 +3,66 @@ package controllers import ( "context" "fmt" + "strconv" - v1 "k8s.io/api/policy/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/intstr" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" "github.com/aerospike/aerospike-kubernetes-operator/pkg/utils" ) func (r *SingleClusterReconciler) createOrUpdatePDB() error { + podList, err := r.getClusterPodList() + if err != nil { + return err + } + + for podIdx := range podList.Items { + pod := &podList.Items[podIdx] + + for containerIdx := range pod.Spec.Containers { + if pod.Spec.Containers[containerIdx].Name != asdbv1.AerospikeServerContainerName { + continue + } + + if pod.Spec.Containers[containerIdx].ReadinessProbe == nil { + r.Log.Info("Pod found without ReadinessProbe, skipping PodDisruptionBudget", + "name", pod.Name) + return nil + } + } + } + ls := utils.LabelsForAerospikeCluster(r.aeroCluster.Name) - pdb := &v1.PodDisruptionBudget{} + // TODO: Move to concrete object when minimum supported k8s version is 1.21 + pdb := &unstructured.Unstructured{} + pdb.SetGroupVersionKind(PDBbGvk) - err := r.Client.Get( + if err := r.Client.Get( context.TODO(), types.NamespacedName{ Name: r.aeroCluster.Name, Namespace: r.aeroCluster.Namespace, }, pdb, - ) - if err != nil { + ); err != nil { if !errors.IsNotFound(err) { return err } r.Log.Info("Create PodDisruptionBudget") - pdb = &v1.PodDisruptionBudget{ - ObjectMeta: metav1.ObjectMeta{ - Name: r.aeroCluster.Name, - Namespace: r.aeroCluster.Namespace, - Labels: ls, - }, - Spec: v1.PodDisruptionBudgetSpec{ - MaxUnavailable: r.aeroCluster.Spec.MaxUnavailable, - Selector: &metav1.LabelSelector{ - MatchLabels: ls, - }, + pdb.SetName(r.aeroCluster.Name) + pdb.SetNamespace(r.aeroCluster.Namespace) + pdb.SetLabels(ls) + pdb.Object["spec"] = map[string]interface{}{ + "maxUnavailable": r.aeroCluster.Spec.MaxUnavailable, + "selector": &metav1.LabelSelector{ + MatchLabels: ls, }, } - // This will be true only for old existing CRs. For new operator versions, this field will be - // set by default to 1 by mutating webhook. 
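`setupPdbAPI` above selects the PDB API version by probing the discovery endpoint. A standalone sketch of the same probe (this detection is removed again two commits later, when `policy/v1beta1` support is dropped); the kubeconfig path is only for illustration:

```go
package main

import (
	"fmt"

	policyv1 "k8s.io/api/policy/v1"
	policyv1beta1 "k8s.io/api/policy/v1beta1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/tools/clientcmd"
)

// pdbGVK mirrors setupPdbAPI: use policy/v1 when the server serves it
// (Kubernetes 1.21+), otherwise fall back to policy/v1beta1.
func pdbGVK(dc discovery.DiscoveryInterface) schema.GroupVersionKind {
	if res, err := dc.ServerResourcesForGroupVersion("policy/v1"); err == nil {
		for i := range res.APIResources {
			if res.APIResources[i].Kind == "PodDisruptionBudget" {
				return policyv1.SchemeGroupVersion.WithKind("PodDisruptionBudget")
			}
		}
	}
	return policyv1beta1.SchemeGroupVersion.WithKind("PodDisruptionBudget")
}

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	fmt.Println(pdbGVK(discovery.NewDiscoveryClientForConfigOrDie(cfg)))
}
```

Probing discovery once at manager startup, rather than per reconcile, keeps the version choice cheap and consistent for the process lifetime.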
- if pdb.Spec.MaxUnavailable == nil { - maxUnavailable := intstr.FromInt(1) - pdb.Spec.MaxUnavailable = &maxUnavailable - } - // Set AerospikeCluster instance as the owner and controller err = controllerutil.SetControllerReference( r.aeroCluster, pdb, r.Scheme, @@ -78,9 +90,23 @@ func (r *SingleClusterReconciler) createOrUpdatePDB() error { utils.NamespacedName(r.aeroCluster.Namespace, r.aeroCluster.Name), ) - if pdb.Spec.MaxUnavailable.String() != r.aeroCluster.Spec.MaxUnavailable.String() { - pdb.Spec.MaxUnavailable = r.aeroCluster.Spec.MaxUnavailable - if err = r.Client.Update( + var value string + + maxUnavailable := pdb.Object["spec"].(map[string]interface{})["maxUnavailable"] + + // Type casting is required because of unstructured object + if val, ok := maxUnavailable.(string); ok { + value = val + } else { + value = strconv.Itoa(int(maxUnavailable.(int64))) + } + + if value != r.aeroCluster.Spec.MaxUnavailable.String() { + spec := pdb.Object["spec"].(map[string]interface{}) + spec["maxUnavailable"] = r.aeroCluster.Spec.MaxUnavailable + pdb.Object["spec"] = spec + + if err := r.Client.Update( context.TODO(), pdb, updateOption, ); err != nil { return fmt.Errorf( diff --git a/test/cluster_helper.go b/test/cluster_helper.go index 8746dcb50..58b9834fa 100644 --- a/test/cluster_helper.go +++ b/test/cluster_helper.go @@ -112,6 +112,11 @@ func rollingRestartClusterByEnablingTLS( return err } + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + if err != nil { + return err + } + return validateReadinessProbe(ctx, k8sClient, aeroCluster, serviceTLSPort) } @@ -155,8 +160,13 @@ func rollingRestartClusterByDisablingTLS( return err } + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + if err != nil { + return err + } + // Port should be updated to service non-tls port - return validateReadinessProbe(ctx, k8sClient, aeroCluster, servicePort) + return validateReadinessProbe(ctx, k8sClient, aeroCluster, serviceNonTLSPort) } func scaleUpClusterTestWithNSDeviceHandling( diff --git a/test/cluster_test.go b/test/cluster_test.go index 697256638..d8b2cfe01 100644 --- a/test/cluster_test.go +++ b/test/cluster_test.go @@ -374,6 +374,13 @@ func DeployClusterForAllImagesPost490(ctx goctx.Context) { err = deployCluster(k8sClient, ctx, aeroCluster) Expect(err).ToNot(HaveOccurred()) + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + By("Validating Readiness probe") + err = validateReadinessProbe(ctx, k8sClient, aeroCluster, serviceTLSPort) + Expect(err).ToNot(HaveOccurred()) + _ = deleteCluster(k8sClient, ctx, aeroCluster) }, ) diff --git a/test/utils.go b/test/utils.go index b1918f37a..07b885ac0 100644 --- a/test/utils.go +++ b/test/utils.go @@ -35,11 +35,6 @@ var ( pkgLog = ctrl.Log.WithName("test") ) -const ( - servicePort = 3000 - serviceTLSPort = 4333 -) - var secrets map[string][]byte var cacertSecrets map[string][]byte From 7fd810f5af1e0e37178fc663dc0c8d1a8d9a5ae9 Mon Sep 17 00:00:00 2001 From: Abhisek Dwivedi Date: Thu, 19 Oct 2023 11:06:38 +0530 Subject: [PATCH 03/12] Added test-cases --- test/poddisruptionbudget_test.go | 68 +++++++++++++++++++++----------- 1 file changed, 44 insertions(+), 24 deletions(-) diff --git a/test/poddisruptionbudget_test.go b/test/poddisruptionbudget_test.go index dab2e1592..a864fbade 100644 --- a/test/poddisruptionbudget_test.go +++ b/test/poddisruptionbudget_test.go @@ -16,7 +16,7 @@ var _ = Describe( "PodDisruptionBudget", func() { ctx := context.TODO() aeroCluster := 
&asdbv1.AerospikeCluster{} - maxAvailable := intstr.FromInt(2) + maxAvailable := intstr.FromInt(0) clusterNamespacedName := getNamespacedName("pdb-test-cluster", namespace) BeforeEach(func() { @@ -29,34 +29,54 @@ var _ = Describe( Expect(deleteCluster(k8sClient, ctx, aeroCluster)).NotTo(HaveOccurred()) }) - It("Validate create PDB with default maxUnavailable", func() { - err := deployCluster(k8sClient, ctx, aeroCluster) - Expect(err).ToNot(HaveOccurred()) - validatePDB(ctx, aeroCluster, 1) - }) + Context("Valid Operations", func() { + It("Validate create PDB with default maxUnavailable", func() { + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + validatePDB(ctx, aeroCluster, 1) + }) - It("Validate create PDB with specified maxUnavailable", func() { - aeroCluster.Spec.MaxUnavailable = &maxAvailable - err := deployCluster(k8sClient, ctx, aeroCluster) - Expect(err).ToNot(HaveOccurred()) - validatePDB(ctx, aeroCluster, 2) - }) + It("Validate create PDB with specified maxUnavailable", func() { + aeroCluster.Spec.MaxUnavailable = &maxAvailable + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + validatePDB(ctx, aeroCluster, 0) + }) + + It("Validate update PDB", func() { + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + validatePDB(ctx, aeroCluster, 1) - It("Validate update PDB", func() { - err := deployCluster(k8sClient, ctx, aeroCluster) - Expect(err).ToNot(HaveOccurred()) - validatePDB(ctx, aeroCluster, 1) + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + + // Update maxUnavailable + By("Update maxUnavailable to 0") + aeroCluster.Spec.MaxUnavailable = &maxAvailable + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + validatePDB(ctx, aeroCluster, 0) + }) + }) - aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) - Expect(err).ToNot(HaveOccurred()) + Context("Invalid Operations", func() { + value := intstr.FromInt(3) - // Update maxUnavailable - By("Update maxUnavailable to 2") - aeroCluster.Spec.MaxUnavailable = &maxAvailable + It("Should fail if maxUnavailable is greater than size", func() { + aeroCluster.Spec.MaxUnavailable = &value + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).To(HaveOccurred()) + }) - err = updateCluster(k8sClient, ctx, aeroCluster) - Expect(err).ToNot(HaveOccurred()) - validatePDB(ctx, aeroCluster, 2) + It("Should fail if maxUnavailable is greater than RF", func() { + aeroCluster.Spec.Size = 3 + value := intstr.FromInt(3) + aeroCluster.Spec.MaxUnavailable = &value + err := deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).To(HaveOccurred()) + }) }) }) From 9e089efa87e1ebc6f64f78dc8a759759114ba286 Mon Sep 17 00:00:00 2001 From: Abhisek Dwivedi Date: Wed, 6 Dec 2023 13:41:40 +0530 Subject: [PATCH 04/12] Removed support for v1beta1 PDB --- controllers/aerospikecluster_controller.go | 30 -------------------- controllers/poddistruptionbudget.go | 32 +++++----------------- 2 files changed, 7 insertions(+), 55 deletions(-) diff --git a/controllers/aerospikecluster_controller.go b/controllers/aerospikecluster_controller.go index d23cc9956..701e9b9a4 100644 --- a/controllers/aerospikecluster_controller.go +++ b/controllers/aerospikecluster_controller.go @@ -6,11 +6,8 @@ import ( "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" - policyv1 "k8s.io/api/policy/v1" - policyv1beta1 "k8s.io/api/policy/v1beta1" 
"k8s.io/apimachinery/pkg/api/errors" k8sRuntime "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/discovery" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" @@ -40,8 +37,6 @@ var ( } ) -var PDBbGvk = policyv1beta1.SchemeGroupVersion.WithKind("PodDisruptionBudget") - // AerospikeClusterReconciler reconciles AerospikeClusters type AerospikeClusterReconciler struct { client.Client @@ -54,10 +49,6 @@ type AerospikeClusterReconciler struct { // SetupWithManager sets up the controller with the Manager func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { - if err := r.setupPdbAPI(r.KubeConfig); err != nil { - return err - } - return ctrl.NewControllerManagedBy(mgr). For(&asdbv1.AerospikeCluster{}). Owns( @@ -81,27 +72,6 @@ func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -// setupPdbAPI sets up the pdb api version to use as per the k8s version. -// TODO: Move to v1 when minimum supported k8s version is 1.21 -func (r *AerospikeClusterReconciler) setupPdbAPI(config *rest.Config) error { - discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(config) - - resources, err := discoveryClient.ServerResourcesForGroupVersion("policy/v1") - if err != nil { - r.Log.Info("Could not get ServerResourcesForGroupVersion for policy/v1, falling back to policy/v1beta1") - return nil - } - - for i := range resources.APIResources { - if resources.APIResources[i].Kind == "PodDisruptionBudget" { - PDBbGvk = policyv1.SchemeGroupVersion.WithKind("PodDisruptionBudget") - return nil - } - } - - return nil -} - // RackState contains the rack configuration and rack size. type RackState struct { Rack *asdbv1.Rack diff --git a/controllers/poddistruptionbudget.go b/controllers/poddistruptionbudget.go index 580a390a0..bb422ddb8 100644 --- a/controllers/poddistruptionbudget.go +++ b/controllers/poddistruptionbudget.go @@ -3,11 +3,10 @@ package controllers import ( "context" "fmt" - "strconv" + v1 "k8s.io/api/policy/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -38,9 +37,7 @@ func (r *SingleClusterReconciler) createOrUpdatePDB() error { } ls := utils.LabelsForAerospikeCluster(r.aeroCluster.Name) - // TODO: Move to concrete object when minimum supported k8s version is 1.21 - pdb := &unstructured.Unstructured{} - pdb.SetGroupVersionKind(PDBbGvk) + pdb := &v1.PodDisruptionBudget{} if err := r.Client.Get( context.TODO(), types.NamespacedName{ @@ -56,11 +53,9 @@ func (r *SingleClusterReconciler) createOrUpdatePDB() error { pdb.SetName(r.aeroCluster.Name) pdb.SetNamespace(r.aeroCluster.Namespace) pdb.SetLabels(ls) - pdb.Object["spec"] = map[string]interface{}{ - "maxUnavailable": r.aeroCluster.Spec.MaxUnavailable, - "selector": &metav1.LabelSelector{ - MatchLabels: ls, - }, + pdb.Spec.MaxUnavailable = r.aeroCluster.Spec.MaxUnavailable + pdb.Spec.Selector = &metav1.LabelSelector{ + MatchLabels: ls, } // Set AerospikeCluster instance as the owner and controller @@ -90,21 +85,8 @@ func (r *SingleClusterReconciler) createOrUpdatePDB() error { utils.NamespacedName(r.aeroCluster.Namespace, r.aeroCluster.Name), ) - var value string - - maxUnavailable := pdb.Object["spec"].(map[string]interface{})["maxUnavailable"] - - // Type casting is required because of unstructured object - if val, ok := maxUnavailable.(string); ok { - value = val 
- } else { - value = strconv.Itoa(int(maxUnavailable.(int64))) - } - - if value != r.aeroCluster.Spec.MaxUnavailable.String() { - spec := pdb.Object["spec"].(map[string]interface{}) - spec["maxUnavailable"] = r.aeroCluster.Spec.MaxUnavailable - pdb.Object["spec"] = spec + if pdb.Spec.MaxUnavailable.String() != r.aeroCluster.Spec.MaxUnavailable.String() { + pdb.Spec.MaxUnavailable = r.aeroCluster.Spec.MaxUnavailable if err := r.Client.Update( context.TODO(), pdb, updateOption, From 65f379246e471cfa31359bbb3631338428f1ba30 Mon Sep 17 00:00:00 2001 From: Abhisek Dwivedi Date: Tue, 12 Dec 2023 15:29:31 +0530 Subject: [PATCH 05/12] Added support for k8sNodeBlockList --- api/v1/aerospikecluster_types.go | 30 +++++++++- api/v1/zz_generated.deepcopy.go | 15 +++++ .../asdb.aerospike.com_aerospikeclusters.yaml | 60 +++++++++++++++++-- ...rnetes-operator.clusterserviceversion.yaml | 4 ++ controllers/aero_info_calls.go | 2 +- controllers/pod.go | 41 +++++++++++-- controllers/pvc.go | 26 ++++++++ controllers/rack.go | 60 +++++++++++++++++-- controllers/reconciler.go | 2 +- controllers/statefulset.go | 38 ++++++++++-- ..._aerospikeclusters.asdb.aerospike.com.yaml | 60 +++++++++++++++++-- pkg/utils/pod.go | 3 +- test/k8snode_block_list_test.go | 1 + 13 files changed, 310 insertions(+), 32 deletions(-) create mode 100644 test/k8snode_block_list_test.go diff --git a/api/v1/aerospikecluster_types.go b/api/v1/aerospikecluster_types.go index 4f0ad56bc..028cb68ab 100644 --- a/api/v1/aerospikecluster_types.go +++ b/api/v1/aerospikecluster_types.go @@ -78,6 +78,9 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability // RosterNodeBlockList is a list of blocked nodeIDs from roster in a strong-consistency setup // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Roster Node BlockList" RosterNodeBlockList []string `json:"rosterNodeBlockList,omitempty"` + // K8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods. + // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Kubernetes Node BlockList" + K8sNodeBlockList []string `json:"k8sNodeBlockList,omitempty"` } type SeedsFinderServices struct { @@ -568,9 +571,12 @@ type AerospikeStorageSpec struct { //nolint:govet // for readability // BlockVolumePolicy contains default policies for block volumes. BlockVolumePolicy AerospikePersistentVolumePolicySpec `json:"blockVolumePolicy,omitempty"` - // CleanupThreads contains maximum number of cleanup threads(dd or blkdiscard) per init container. + // CleanupThreads contains the maximum number of cleanup threads(dd or blkdiscard) per init container. CleanupThreads int `json:"cleanupThreads,omitempty"` + // LocalStorageClasses contains a list of storage classes which provisions local volumes. + LocalStorageClasses []string `json:"localStorageClasses,omitempty"` + // Volumes list to attach to created pods. // +patchMergeKey=name // +patchStrategy=merge @@ -633,6 +639,8 @@ type AerospikeClusterStatusSpec struct { //nolint:govet // for readability SeedsFinderServices SeedsFinderServices `json:"seedsFinderServices,omitempty"` // RosterNodeBlockList is a list of blocked nodeIDs from roster in a strong-consistency setup RosterNodeBlockList []string `json:"rosterNodeBlockList,omitempty"` + // K8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods. 
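`CopySpecToStatus` above deep-copies the block list rather than aliasing it. The essence, reduced to plain Go as a sketch (`lib.DeepCopy` in the operator is generic; for a `[]string` it amounts to this):

```go
package main

import "fmt"

// copyBlockList copies only when non-empty and never aliases, so later
// edits to the spec slice cannot leak into status.
func copyBlockList(src []string) []string {
	if len(src) == 0 {
		return nil
	}
	dst := make([]string, len(src))
	copy(dst, src)
	return dst
}

func main() {
	spec := []string{"worker-a"}
	status := copyBlockList(spec)
	spec[0] = "worker-b"
	fmt.Println(status[0]) // still "worker-a": no shared backing array
}
```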
+ K8sNodeBlockList []string `json:"k8sNodeBlockList,omitempty"` } // AerospikeClusterStatus defines the observed state of AerospikeCluster @@ -956,6 +964,16 @@ func CopySpecToStatus(spec *AerospikeClusterSpec) (*AerospikeClusterStatusSpec, status.RosterNodeBlockList = rosterNodeBlockList } + if len(spec.K8sNodeBlockList) != 0 { + var k8sNodeBlockList []string + + lib.DeepCopy( + &k8sNodeBlockList, &spec.K8sNodeBlockList, + ) + + status.K8sNodeBlockList = k8sNodeBlockList + } + return &status, nil } @@ -1047,5 +1065,15 @@ func CopyStatusToSpec(status *AerospikeClusterStatusSpec) (*AerospikeClusterSpec spec.RosterNodeBlockList = rosterNodeBlockList } + if len(status.K8sNodeBlockList) != 0 { + var k8sNodeBlockList []string + + lib.DeepCopy( + &k8sNodeBlockList, &status.K8sNodeBlockList, + ) + + spec.K8sNodeBlockList = k8sNodeBlockList + } + return &spec, nil } diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index d28a8d239..489353971 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -188,6 +188,11 @@ func (in *AerospikeClusterSpec) DeepCopyInto(out *AerospikeClusterSpec) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.K8sNodeBlockList != nil { + in, out := &in.K8sNodeBlockList, &out.K8sNodeBlockList + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AerospikeClusterSpec. @@ -265,6 +270,11 @@ func (in *AerospikeClusterStatusSpec) DeepCopyInto(out *AerospikeClusterStatusSp *out = make([]string, len(*in)) copy(*out, *in) } + if in.K8sNodeBlockList != nil { + in, out := &in.K8sNodeBlockList, &out.K8sNodeBlockList + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AerospikeClusterStatusSpec. @@ -662,6 +672,11 @@ func (in *AerospikeStorageSpec) DeepCopyInto(out *AerospikeStorageSpec) { *out = *in in.FileSystemVolumePolicy.DeepCopyInto(&out.FileSystemVolumePolicy) in.BlockVolumePolicy.DeepCopyInto(&out.BlockVolumePolicy) + if in.LocalStorageClasses != nil { + in, out := &in.LocalStorageClasses, &out.LocalStorageClasses + *out = make([]string, len(*in)) + copy(*out, *in) + } if in.Volumes != nil { in, out := &in.Volumes, &out.Volumes *out = make([]VolumeSpec, len(*in)) diff --git a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml index b154f1768..fdd1e6820 100644 --- a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml +++ b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml @@ -280,6 +280,12 @@ spec: image: description: Aerospike server image type: string + k8sNodeBlockList: + description: K8sNodeBlockList is a list of Kubernetes nodes which + are not used for Aerospike pods. + items: + type: string + type: array maxUnavailable: anyOf: - type: integer @@ -5739,7 +5745,7 @@ spec: type: string type: object cleanupThreads: - description: CleanupThreads contains maximum number + description: CleanupThreads contains the maximum number of cleanup threads(dd or blkdiscard) per init container. type: integer filesystemVolumePolicy: @@ -5794,6 +5800,12 @@ spec: - deleteFiles type: string type: object + localStorageClasses: + description: LocalStorageClasses contains a list of + storage classes which provisions local volumes. + items: + type: string + type: array volumes: description: Volumes list to attach to created pods. 
items: @@ -7427,7 +7439,7 @@ spec: type: string type: object cleanupThreads: - description: CleanupThreads contains maximum number + description: CleanupThreads contains the maximum number of cleanup threads(dd or blkdiscard) per init container. type: integer filesystemVolumePolicy: @@ -7482,6 +7494,12 @@ spec: - deleteFiles type: string type: object + localStorageClasses: + description: LocalStorageClasses contains a list of + storage classes which provisions local volumes. + items: + type: string + type: array volumes: description: Volumes list to attach to created pods. items: @@ -8131,7 +8149,7 @@ spec: type: string type: object cleanupThreads: - description: CleanupThreads contains maximum number of cleanup + description: CleanupThreads contains the maximum number of cleanup threads(dd or blkdiscard) per init container. type: integer filesystemVolumePolicy: @@ -8186,6 +8204,12 @@ spec: - deleteFiles type: string type: object + localStorageClasses: + description: LocalStorageClasses contains a list of storage classes + which provisions local volumes. + items: + type: string + type: array volumes: description: Volumes list to attach to created pods. items: @@ -8915,6 +8939,12 @@ spec: image: description: Aerospike server image type: string + k8sNodeBlockList: + description: K8sNodeBlockList is a list of Kubernetes nodes which + are not used for Aerospike pods. + items: + type: string + type: array maxUnavailable: anyOf: - type: integer @@ -14502,7 +14532,7 @@ spec: type: string type: object cleanupThreads: - description: CleanupThreads contains maximum number + description: CleanupThreads contains the maximum number of cleanup threads(dd or blkdiscard) per init container. type: integer filesystemVolumePolicy: @@ -14557,6 +14587,12 @@ spec: - deleteFiles type: string type: object + localStorageClasses: + description: LocalStorageClasses contains a list of + storage classes which provisions local volumes. + items: + type: string + type: array volumes: description: Volumes list to attach to created pods. items: @@ -16190,7 +16226,7 @@ spec: type: string type: object cleanupThreads: - description: CleanupThreads contains maximum number + description: CleanupThreads contains the maximum number of cleanup threads(dd or blkdiscard) per init container. type: integer filesystemVolumePolicy: @@ -16245,6 +16281,12 @@ spec: - deleteFiles type: string type: object + localStorageClasses: + description: LocalStorageClasses contains a list of + storage classes which provisions local volumes. + items: + type: string + type: array volumes: description: Volumes list to attach to created pods. items: @@ -16945,7 +16987,7 @@ spec: type: string type: object cleanupThreads: - description: CleanupThreads contains maximum number of cleanup + description: CleanupThreads contains the maximum number of cleanup threads(dd or blkdiscard) per init container. type: integer filesystemVolumePolicy: @@ -17000,6 +17042,12 @@ spec: - deleteFiles type: string type: object + localStorageClasses: + description: LocalStorageClasses contains a list of storage classes + which provisions local volumes. + items: + type: string + type: array volumes: description: Volumes list to attach to created pods. 
items: diff --git a/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml b/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml index 7d00a72af..e9cec80a0 100644 --- a/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml +++ b/config/manifests/bases/aerospike-kubernetes-operator.clusterserviceversion.yaml @@ -50,6 +50,10 @@ spec: - description: Aerospike server image displayName: Server Image path: image + - description: K8sNodeBlockList is a list of Kubernetes nodes which are not + used for Aerospike pods. + displayName: Kubernetes Node BlockList + path: k8sNodeBlockList - description: MaxUnavailable is the percentage/number of pods that can be allowed to go down or unavailable before application disruption. This value is used to create PodDisruptionBudget. Defaults to 1. diff --git a/controllers/aero_info_calls.go b/controllers/aero_info_calls.go index 093da495b..4290a5d06 100644 --- a/controllers/aero_info_calls.go +++ b/controllers/aero_info_calls.go @@ -42,7 +42,7 @@ func (r *SingleClusterReconciler) waitForMultipleNodesSafeStopReady( return reconcileSuccess() } - // Remove a node only if cluster is stable + // Remove a node only if the cluster is stable if err := r.waitForAllSTSToBeReady(ignorablePodNames); err != nil { return reconcileError(fmt.Errorf("failed to wait for cluster to be ready: %v", err)) } diff --git a/controllers/pod.go b/controllers/pod.go index 302a3fe80..93ff94c55 100644 --- a/controllers/pod.go +++ b/controllers/pod.go @@ -63,6 +63,7 @@ func (r *SingleClusterReconciler) getRollingRestartTypeMap( return nil, err } + blockedK8sNodes := sets.NewString(r.aeroCluster.Spec.K8sNodeBlockList...) requiredConfHash := confMap.Data[aerospikeConfHashFileName] for idx := range pods { @@ -70,6 +71,15 @@ func (r *SingleClusterReconciler) getRollingRestartTypeMap( continue } + if blockedK8sNodes.Has(pods[idx].Spec.NodeName) { + r.Log.Info("Pod found in blocked nodes list, will be migrated to a different node", + "podName", pods[idx].Name) + + restartTypeMap[pods[idx].Name] = podRestart + + continue + } + podStatus := r.aeroCluster.Status.Pods[pods[idx].Name] if addedNSDevices == nil && podStatus.AerospikeConfigHash != requiredConfHash { // Fetching all block devices that have been added in namespaces. @@ -260,6 +270,7 @@ func (r *SingleClusterReconciler) restartPods( } restartedPods := make([]*corev1.Pod, 0, len(podsToRestart)) + blockedK8sNodes := sets.NewString(r.aeroCluster.Spec.K8sNodeBlockList...) for idx := range podsToRestart { pod := podsToRestart[idx] @@ -267,12 +278,21 @@ func (r *SingleClusterReconciler) restartPods( restartType := restartTypeMap[pod.Name] if restartType == quickRestart { - // If ASD restart fails then go ahead and restart the pod + // If ASD restart fails, then go ahead and restart the pod if err := r.restartASDInPod(rackState, pod); err == nil { continue } } + if blockedK8sNodes.Has(pod.Spec.NodeName) { + r.Log.Info("Pod found in blocked nodes list, deleting corresponding local PVCs if any", + "podName", pod.Name) + + if err := r.deleteLocalPVCs(rackState, pod); err != nil { + return reconcileError(err) + } + } + if err := r.Client.Delete(context.TODO(), pod); err != nil { r.Log.Error(err, "Failed to delete pod") return reconcileError(err) @@ -414,16 +434,27 @@ func (r *SingleClusterReconciler) deletePodAndEnsureImageUpdated( return reconcileError(err) } + blockedK8sNodes := sets.NewString(r.aeroCluster.Spec.K8sNodeBlockList...) 
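Several hunks in this commit build the same string set from `spec.k8sNodeBlockList`. A minimal sketch of the membership test that drives the migration decision; node names are made up:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// The reconciler builds this set once per pass; any pod whose
	// Spec.NodeName is a member is marked for podRestart so it
	// reschedules onto a different node.
	blocked := sets.NewString("worker-a", "worker-b")

	for _, node := range []string{"worker-a", "worker-c"} {
		fmt.Printf("%s blocked: %v\n", node, blocked.Has(node))
	}
}
```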
+
 	// Delete pods
-	for _, p := range podsToUpdate {
-		if err := r.Client.Delete(context.TODO(), p); err != nil {
+	for _, pod := range podsToUpdate {
+		if blockedK8sNodes.Has(pod.Spec.NodeName) {
+			r.Log.Info("Pod found in blocked nodes list, deleting corresponding local PVCs if any",
+				"podName", pod.Name)
+
+			if err := r.deleteLocalPVCs(rackState, pod); err != nil {
+				return reconcileError(err)
+			}
+		}
+
+		if err := r.Client.Delete(context.TODO(), pod); err != nil {
 			return reconcileError(err)
 		}
 
-		r.Log.V(1).Info("Pod deleted", "podName", p.Name)
+		r.Log.V(1).Info("Pod deleted", "podName", pod.Name)
 		r.Recorder.Eventf(
 			r.aeroCluster, corev1.EventTypeNormal, "PodWaitUpdate",
-			"[rack-%d] Waiting to update Pod %s", rackState.Rack.ID, p.Name,
+			"[rack-%d] Waiting to update Pod %s", rackState.Rack.ID, pod.Name,
 		)
 	}
diff --git a/controllers/pvc.go b/controllers/pvc.go
index f354ce77e..7777b0d2e 100644
--- a/controllers/pvc.go
+++ b/controllers/pvc.go
@@ -103,6 +103,32 @@ func (r *SingleClusterReconciler) removePVCsAsync(
 	return deletedPVCs, nil
 }
 
+func (r *SingleClusterReconciler) deleteLocalPVCs(rackState *RackState, pod *corev1.Pod) error {
+	pvcItems, err := r.getPodsPVCList([]string{pod.Name}, rackState.Rack.ID)
+	if err != nil {
+		return fmt.Errorf("could not find pvc for pod %v: %v", pod.Name, err)
+	}
+
+	for idx := range pvcItems {
+		pvcStorageClass := pvcItems[idx].Spec.StorageClassName
+		if pvcStorageClass == nil {
+			r.Log.Info("PVC does not have storageClass set, no need to delete PVC", "pvcName", pvcItems[idx].Name)
+
+			continue
+		}
+
+		if utils.ContainsString(rackState.Rack.Storage.LocalStorageClasses, *pvcStorageClass) {
+			if err := r.Client.Delete(context.TODO(), &pvcItems[idx]); err != nil {
+				return fmt.Errorf(
+					"could not delete pvc %s: %v", pvcItems[idx].Name, err,
+				)
+			}
+		}
+	}
+
+	return nil
+}
+
 func (r *SingleClusterReconciler) waitForPVCTermination(deletedPVCs []corev1.PersistentVolumeClaim) error {
 	if len(deletedPVCs) == 0 {
 		return nil
diff --git a/controllers/rack.go b/controllers/rack.go
index 66e269349..2748c59f9 100644
--- a/controllers/rack.go
+++ b/controllers/rack.go
@@ -350,7 +350,7 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat
 	// Always update configMap. We won't be able to find if a rack's config, and it's pod config is in sync or not
 	// Checking rack.spec, rack.status will not work.
 	// We may change config, let some pods restart with new config and then change config back to original value.
-	// Now rack.spec, rack.status will be same, but few pods will have changed config.
+	// Now rack.spec, rack.status will be the same, but a few pods will have changed config.
 	// So a check based on spec and status will skip configMap update.
 	// Hence, a rolling restart of pod will never bring pod to desired config
 	if err := r.updateSTSConfigMap(
@@ -420,7 +420,14 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat
 	}
 
 	if r.aeroCluster.Spec.RackConfig.MaxIgnorablePods != nil {
-		if res := r.handleNSOrDeviceRemovalForIgnorablePods(rackState, ignorablePodNames); !res.isSuccess {
+		if res = r.handleNSOrDeviceRemovalForIgnorablePods(rackState, ignorablePodNames); !res.isSuccess {
+			return found, res
+		}
+	}
+
+	if r.aeroCluster.Spec.K8sNodeBlockList != nil {
+		found, res = r.handleK8sNodeBlockListPods(found, rackState, ignorablePodNames)
+		if !res.isSuccess {
 			return found, res
 		}
 	}
@@ -502,9 +509,9 @@ func (r *SingleClusterReconciler) reconcileRack(
 	}
 
 	if failedPods == nil {
-		// revert migrate-fill-delay to original value if it was set to 0 during scale down
-		// Reset will be done if there is Scale down or Rack redistribution
-		// This check won't cover scenario where scale down operation was done and then reverted to previous value
+		// Revert migrate-fill-delay to its original value if it was set to 0 during scale down.
+		// Reset will be done if there is a scale-down or Rack redistribution.
+		// This check won't cover a scenario where a scale-down operation was done and then reverted to the previous value
 		// before the scale down could complete.
 		if (r.aeroCluster.Status.Size > r.aeroCluster.Spec.Size) ||
 			(!r.IsStatusEmpty() && len(r.aeroCluster.Status.RackConfig.Racks) != len(r.aeroCluster.Spec.RackConfig.Racks)) {
@@ -1061,6 +1068,49 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet,
 	return found, reconcileSuccess()
 }
 
+func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1.StatefulSet, rackState *RackState,
+	ignorablePodNames sets.Set[string],
+) (*appsv1.StatefulSet, reconcileResult) {
+	if err := r.updateSTS(statefulSet, rackState); err != nil {
+		return statefulSet, reconcileError(
+			fmt.Errorf("upgrade rack : %v", err),
+		)
+	}
+
+	podList, err := r.getOrderedRackPodList(rackState.Rack.ID)
+	if err != nil {
+		return statefulSet, reconcileError(fmt.Errorf("failed to list pods: %v", err))
+	}
+
+	blockedK8sNodes := sets.NewString(r.aeroCluster.Spec.K8sNodeBlockList...)
+
+	for idx := range podList {
+		pod := podList[idx]
+
+		if blockedK8sNodes.Has(pod.Spec.NodeName) {
+			r.Log.Info("Pod found in blocked nodes list, migrating to a different node",
+				"podName", pod.Name)
+
+			if res := r.waitForMultipleNodesSafeStopReady([]*corev1.Pod{pod}, ignorablePodNames); !res.isSuccess {
+				return statefulSet, res
+			}
+
+			restartTypeMap := map[string]RestartType{
+				pod.Name: podRestart,
+			}
+
+			if res := r.restartPods(rackState, []*corev1.Pod{pod}, restartTypeMap); !res.isSuccess {
+				return statefulSet, reconcileError(err)
+			}
+
+			// handle next pod on blocked node in subsequent Reconcile.
+			return statefulSet, reconcileRequeueAfter(1)
+		}
+	}
+
+	return statefulSet, reconcileSuccess()
+}
+
 func (r *SingleClusterReconciler) needRollingRestartRack(rackState *RackState, ignorablePodNames sets.Set[string]) (
 	needRestart bool, restartTypeMap map[string]RestartType, err error,
 ) {
diff --git a/controllers/reconciler.go b/controllers/reconciler.go
index f03c22a49..54e40f877 100644
--- a/controllers/reconciler.go
+++ b/controllers/reconciler.go
@@ -45,7 +45,7 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) {
 		r.aeroCluster.Status,
 	)
 
-	// Check DeletionTimestamp to see if cluster is being deleted
+	// Check DeletionTimestamp to see if the cluster is being deleted
 	if !r.aeroCluster.ObjectMeta.DeletionTimestamp.IsZero() {
 		r.Log.V(1).Info("Deleting AerospikeCluster")
 		// The cluster is being deleted
diff --git a/controllers/statefulset.go b/controllers/statefulset.go
index e24bc6b3d..d075b5754 100644
--- a/controllers/statefulset.go
+++ b/controllers/statefulset.go
@@ -17,6 +17,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/client-go/util/retry"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 
@@ -651,11 +652,16 @@ func (r *SingleClusterReconciler) updateSTS(
 	// TODO: Add validation. device, file, both should not exist in same storage class
 	r.updateSTSStorage(statefulSet, rackState)
 
-	// Save the updated stateful set.
-	// Can we optimize this? Update stateful set only if there is any change
-	// in it.
-	err := r.Client.Update(context.TODO(), statefulSet, updateOption)
-	if err != nil {
+	if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+		found, err := r.getSTS(rackState)
+		if err != nil {
+			return err
+		}
+
+		// Save the updated stateful set.
+		found.Spec = statefulSet.Spec
+		return r.Client.Update(context.TODO(), found, updateOption)
+	}); err != nil {
 		return fmt.Errorf(
 			"failed to update StatefulSet %s: %v",
 			statefulSet.Name,
@@ -919,6 +925,16 @@ func (r *SingleClusterReconciler) updateSTSSchedulingPolicy(
 		)
 	}
 
+	if r.aeroCluster.Spec.K8sNodeBlockList != nil {
+		matchExpressions = append(
+			matchExpressions, corev1.NodeSelectorRequirement{
+				Key:      "kubernetes.io/hostname",
+				Operator: corev1.NodeSelectorOpNotIn,
+				Values:   r.aeroCluster.Spec.K8sNodeBlockList,
+			},
+		)
+	}
+
 	if len(matchExpressions) != 0 {
 		if affinity.NodeAffinity == nil {
 			affinity.NodeAffinity = &corev1.NodeAffinity{}
@@ -1526,8 +1542,18 @@ func getSTSContainerPort(
 	multiPodPerHost bool, aeroConf *asdbv1.AerospikeConfigSpec,
 ) []corev1.ContainerPort {
 	ports := make([]corev1.ContainerPort, 0, len(defaultContainerPorts))
+	portNames := make([]string, 0, len(defaultContainerPorts))
+
+	// Sort the defaultContainerPorts keys so the map is traversed in a deterministic order.
+	// This helps reduce unnecessary STS object updates.
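+	// (Go randomizes map iteration order, so iterating over defaultContainerPorts directly
+	// could emit the container ports in a different order on every reconcile.)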
+	for portName := range defaultContainerPorts {
+		portNames = append(portNames, portName)
+	}
+
+	sort.Strings(portNames)
 
-	for portName, portInfo := range defaultContainerPorts {
+	for _, portName := range portNames {
+		portInfo := defaultContainerPorts[portName]
 		configPort := asdbv1.GetPortFromConfig(
 			aeroConf, portInfo.connectionType, portInfo.configParam,
 		)
diff --git a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml
index b154f1768..fdd1e6820 100644
--- a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml
+++ b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml
@@ -280,6 +280,12 @@ spec:
            image:
              description: Aerospike server image
              type: string
+            k8sNodeBlockList:
+              description: K8sNodeBlockList is a list of Kubernetes nodes which
+                are not used for Aerospike pods.
+              items:
+                type: string
+              type: array
            maxUnavailable:
              anyOf:
              - type: integer
@@ -5739,7 +5745,7 @@ spec:
                              type: string
                            type: object
                          cleanupThreads:
-                            description: CleanupThreads contains maximum number
+                            description: CleanupThreads contains the maximum number
                              of cleanup threads(dd or blkdiscard) per init container.
                            type: integer
                          filesystemVolumePolicy:
@@ -5794,6 +5800,12 @@ spec:
                                - deleteFiles
                                type: string
                            type: object
+                          localStorageClasses:
+                            description: LocalStorageClasses contains a list of
+                              storage classes which provision local volumes.
+                            items:
+                              type: string
+                            type: array
                          volumes:
                            description: Volumes list to attach to created pods.
                            items:
@@ -7427,7 +7439,7 @@ spec:
                              type: string
                            type: object
                          cleanupThreads:
-                            description: CleanupThreads contains maximum number
+                            description: CleanupThreads contains the maximum number
                              of cleanup threads(dd or blkdiscard) per init container.
                            type: integer
                          filesystemVolumePolicy:
@@ -7482,6 +7494,12 @@ spec:
                                - deleteFiles
                                type: string
                            type: object
+                          localStorageClasses:
+                            description: LocalStorageClasses contains a list of
+                              storage classes which provision local volumes.
+                            items:
+                              type: string
+                            type: array
                          volumes:
                            description: Volumes list to attach to created pods.
                            items:
@@ -8131,7 +8149,7 @@ spec:
                  type: string
                type: object
              cleanupThreads:
-                description: CleanupThreads contains maximum number of cleanup
+                description: CleanupThreads contains the maximum number of cleanup
                  threads(dd or blkdiscard) per init container.
                type: integer
              filesystemVolumePolicy:
@@ -8186,6 +8204,12 @@ spec:
                    - deleteFiles
                    type: string
                type: object
+              localStorageClasses:
+                description: LocalStorageClasses contains a list of storage classes
+                  which provision local volumes.
+                items:
+                  type: string
+                type: array
              volumes:
                description: Volumes list to attach to created pods.
                items:
@@ -8915,6 +8939,12 @@ spec:
            image:
              description: Aerospike server image
              type: string
+            k8sNodeBlockList:
+              description: K8sNodeBlockList is a list of Kubernetes nodes which
+                are not used for Aerospike pods.
+              items:
+                type: string
+              type: array
            maxUnavailable:
              anyOf:
              - type: integer
@@ -14502,7 +14532,7 @@ spec:
                              type: string
                            type: object
                          cleanupThreads:
-                            description: CleanupThreads contains maximum number
+                            description: CleanupThreads contains the maximum number
                              of cleanup threads(dd or blkdiscard) per init container.
                            type: integer
                          filesystemVolumePolicy:
@@ -14557,6 +14587,12 @@ spec:
                                - deleteFiles
                                type: string
                            type: object
+                          localStorageClasses:
+                            description: LocalStorageClasses contains a list of
+                              storage classes which provision local volumes.
+                            items:
+                              type: string
+                            type: array
                          volumes:
                            description: Volumes list to attach to created pods.
                            items:
@@ -16190,7 +16226,7 @@ spec:
                              type: string
                            type: object
                          cleanupThreads:
-                            description: CleanupThreads contains maximum number
+                            description: CleanupThreads contains the maximum number
                              of cleanup threads(dd or blkdiscard) per init container.
                            type: integer
                          filesystemVolumePolicy:
@@ -16245,6 +16281,12 @@ spec:
                                - deleteFiles
                                type: string
                            type: object
+                          localStorageClasses:
+                            description: LocalStorageClasses contains a list of
+                              storage classes which provision local volumes.
+                            items:
+                              type: string
+                            type: array
                          volumes:
                            description: Volumes list to attach to created pods.
                            items:
@@ -16945,7 +16987,7 @@ spec:
                  type: string
                type: object
              cleanupThreads:
-                description: CleanupThreads contains maximum number of cleanup
+                description: CleanupThreads contains the maximum number of cleanup
                  threads(dd or blkdiscard) per init container.
                type: integer
              filesystemVolumePolicy:
@@ -17000,6 +17042,12 @@ spec:
                    - deleteFiles
                    type: string
                type: object
+              localStorageClasses:
+                description: LocalStorageClasses contains a list of storage classes
+                  which provision local volumes.
+                items:
+                  type: string
+                type: array
              volumes:
                description: Volumes list to attach to created pods.
                items:
diff --git a/pkg/utils/pod.go b/pkg/utils/pod.go
index b7c687dd8..629f6c9b2 100644
--- a/pkg/utils/pod.go
+++ b/pkg/utils/pod.go
@@ -212,7 +212,8 @@ func isPodError(reason string) bool {
 
 func IsPodReasonUnschedulable(pod *corev1.Pod) bool {
 	for _, condition := range pod.Status.Conditions {
-		if condition.Type == corev1.PodScheduled && condition.Reason == corev1.PodReasonUnschedulable {
+		if condition.Type == corev1.PodScheduled && (condition.Reason == corev1.PodReasonUnschedulable ||
+			condition.Reason == corev1.PodReasonSchedulerError) {
 			return true
 		}
 	}
diff --git a/test/k8snode_block_list_test.go b/test/k8snode_block_list_test.go
new file mode 100644
index 000000000..56e540407
--- /dev/null
+++ b/test/k8snode_block_list_test.go
@@ -0,0 +1 @@
+package test

From 771a00e43ecaf2be7f36b597ae4ab180c087bfb7 Mon Sep 17 00:00:00 2001
From: Abhisek Dwivedi
Date: Tue, 19 Dec 2023 14:06:20 +0530
Subject: [PATCH 06/12] Added test-cases

---
 api/v1/aerospikecluster_types.go              | 23 +-
 .../asdb.aerospike.com_aerospikeclusters.yaml | 15 +-
 controllers/pod.go                            | 26 ++-
 controllers/rack.go                           | 64 ++++--
 controllers/reconciler.go                     | 13 +-
 ..._aerospikeclusters.asdb.aerospike.com.yaml | 15 +-
 test/cluster_helper.go                        |  2 +-
 test/cluster_test.go                          |  4 +-
 test/k8snode_block_list_test.go               | 206 ++++++++++++++++++
 test/utils.go                                 |  5 +
 10 files changed, 339 insertions(+), 34 deletions(-)

diff --git a/api/v1/aerospikecluster_types.go b/api/v1/aerospikecluster_types.go
index 028cb68ab..d6649870a 100644
--- a/api/v1/aerospikecluster_types.go
+++ b/api/v1/aerospikecluster_types.go
@@ -25,6 +25,21 @@ import (
 	lib "github.com/aerospike/aerospike-management-lib"
 )
 
+// +kubebuilder:validation:Enum=AerospikeClusterInProgress;AerospikeClusterCompleted;AerospikeClusterError
+type AerospikeClusterPhase string
+
+// These are the valid phases of Aerospike cluster.
+const (
+	// AerospikeClusterInProgress means the Aerospike cluster operations are in-progress state.
+	// This phase denotes that changes are gradually rolling out to the cluster.
+ AerospikeClusterInProgress AerospikeClusterPhase = "InProgress" + // AerospikeClusterCompleted means the Aerospike cluster has been deployed/upgraded successfully and is ready to use. + AerospikeClusterCompleted AerospikeClusterPhase = "Completed" + // AerospikeClusterError means the Aerospike cluster is in error state because of some reason like misconfiguration, + // infra issues, etc. + AerospikeClusterError AerospikeClusterPhase = "Error" +) + // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. // AerospikeClusterSpec defines the desired state of AerospikeCluster @@ -660,7 +675,10 @@ type AerospikeClusterStatus struct { //nolint:govet // for readability // This is map instead of the conventional map as list convention to allow each pod to patch update its own // status. The map key is the name of the pod. // +patchStrategy=strategic - Pods map[string]AerospikePodStatus `json:"pods" patchStrategy:"strategic"` + Pods map[string]AerospikePodStatus `json:"pods,omitempty" patchStrategy:"strategic"` + + // Phase denotes the current phase of Aerospike cluster operation. + Phase AerospikeClusterPhase `json:"phase,omitempty"` } // AerospikeNetworkType specifies the type of network address to use. @@ -848,9 +866,10 @@ type AerospikePodStatus struct { //nolint:govet // for readability //+kubebuilder:storageversion // +kubebuilder:printcolumn:name="Size",type=string,JSONPath=`.spec.size` // +kubebuilder:printcolumn:name="Image",type=string,JSONPath=`.spec.image` -// +kubebuilder:printcolumn:name="MultiPodPerHost",type=boolean,JSONPath=`.spec.podSpec.MultiPodPerHost` +// +kubebuilder:printcolumn:name="MultiPodPerHost",type=boolean,JSONPath=`.spec.podSpec.multiPodPerHost` // +kubebuilder:printcolumn:name="HostNetwork",type=boolean,JSONPath=`.spec.podSpec.hostNetwork` // +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" +// +kubebuilder:printcolumn:name="Phase",type="string",JSONPath=".status.phase" // AerospikeCluster is the schema for the AerospikeCluster API // +operator-sdk:csv:customresourcedefinitions:displayName="Aerospike Cluster",resources={{Service, v1},{Pod,v1},{StatefulSet,v1}} diff --git a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml index fdd1e6820..d5b116add 100644 --- a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml +++ b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml @@ -21,7 +21,7 @@ spec: - jsonPath: .spec.image name: Image type: string - - jsonPath: .spec.podSpec.MultiPodPerHost + - jsonPath: .spec.podSpec.multiPodPerHost name: MultiPodPerHost type: boolean - jsonPath: .spec.podSpec.hostNetwork @@ -30,6 +30,9 @@ spec: - jsonPath: .metadata.creationTimestamp name: Age type: date + - jsonPath: .status.phase + name: Phase + type: string name: v1 schema: openAPIV3Schema: @@ -9012,6 +9015,14 @@ spec: list by the operator type: string type: object + phase: + description: Phase denotes the current phase of Aerospike cluster + operation. + enum: + - AerospikeClusterInProgress + - AerospikeClusterCompleted + - AerospikeClusterError + type: string podSpec: description: Additional configuration for create Aerospike pods. 
properties: @@ -17537,8 +17548,6 @@ spec: - skipWorkDirValidate - skipXdrDlogFileValidate type: object - required: - - pods type: object type: object served: true diff --git a/controllers/pod.go b/controllers/pod.go index 93ff94c55..859bb4989 100644 --- a/controllers/pod.go +++ b/controllers/pod.go @@ -171,7 +171,7 @@ func (r *SingleClusterReconciler) rollingRestartPods( rackState *RackState, podsToRestart []*corev1.Pod, ignorablePodNames sets.Set[string], restartTypeMap map[string]RestartType, ) reconcileResult { - failedPods, activePods := getFailedAndActivePods(podsToRestart, ignorablePodNames) + failedPods, activePods := getFailedAndActivePods(podsToRestart) // If already dead node (failed pod) then no need to check node safety, migration if len(failedPods) != 0 { @@ -377,13 +377,9 @@ func (r *SingleClusterReconciler) ensurePodsRunningAndReady(podsToCheck []*corev return reconcileRequeueAfter(10) } -func getFailedAndActivePods(pods []*corev1.Pod, ignorablePodNames sets.Set[string], -) (failedPods, activePods []*corev1.Pod) { +func getFailedAndActivePods(pods []*corev1.Pod) (failedPods, activePods []*corev1.Pod) { for idx := range pods { pod := pods[idx] - if ignorablePodNames.Has(pod.Name) { - continue - } if err := utils.CheckPodFailed(pod); err != nil { failedPods = append(failedPods, pod) @@ -396,10 +392,26 @@ func getFailedAndActivePods(pods []*corev1.Pod, ignorablePodNames sets.Set[strin return failedPods, activePods } +func getNonIgnorablePods(pods []*corev1.Pod, ignorablePodNames sets.Set[string], +) []*corev1.Pod { + nonIgnorablePods := make([]*corev1.Pod, 0, len(pods)) + + for idx := range pods { + pod := pods[idx] + if ignorablePodNames.Has(pod.Name) { + continue + } + + nonIgnorablePods = append(nonIgnorablePods, pod) + } + + return nonIgnorablePods +} + func (r *SingleClusterReconciler) safelyDeletePodsAndEnsureImageUpdated( rackState *RackState, podsToUpdate []*corev1.Pod, ignorablePodNames sets.Set[string], ) reconcileResult { - failedPods, activePods := getFailedAndActivePods(podsToUpdate, ignorablePodNames) + failedPods, activePods := getFailedAndActivePods(podsToUpdate) // If already dead node (failed pod) then no need to check node safety, migration if len(failedPods) != 0 { diff --git a/controllers/rack.go b/controllers/rack.go index 2748c59f9..d90085617 100644 --- a/controllers/rack.go +++ b/controllers/rack.go @@ -81,7 +81,9 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult { ) } - failedPods, _ := getFailedAndActivePods(podList, ignorablePodNames) + failedPods, _ := getFailedAndActivePods(podList) + // remove ignorable pods from failedPods + failedPods = getNonIgnorablePods(failedPods, ignorablePodNames) if len(failedPods) != 0 { r.Log.Info("Reconcile the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", failedPods) @@ -107,7 +109,9 @@ func (r *SingleClusterReconciler) reconcileRacks() reconcileResult { ) } - failedPods, _ = getFailedAndActivePods(podList, ignorablePodNames) + failedPods, _ = getFailedAndActivePods(podList) + // remove ignorable pods from failedPods + failedPods = getNonIgnorablePods(failedPods, ignorablePodNames) if len(failedPods) != 0 { r.Log.Info("Restart the failed pods in the Rack", "rackID", state.Rack.ID, "failedPods", failedPods) @@ -426,7 +430,7 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat } if r.aeroCluster.Spec.K8sNodeBlockList != nil { - found, res = r.handleK8sNodeBlockListPods(found, rackState, ignorablePodNames) + found, res = 
r.handleK8sNodeBlockListPods(found, rackState, ignorablePodNames, failedPods) if !res.isSuccess { return found, res } @@ -1069,7 +1073,7 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet, } func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1.StatefulSet, rackState *RackState, - ignorablePodNames sets.Set[string], + ignorablePodNames sets.Set[string], failedPods []*corev1.Pod, ) (*appsv1.StatefulSet, reconcileResult) { if err := r.updateSTS(statefulSet, rackState); err != nil { return statefulSet, reconcileError( @@ -1077,13 +1081,27 @@ func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1 ) } - podList, err := r.getOrderedRackPodList(rackState.Rack.ID) - if err != nil { - return statefulSet, reconcileError(fmt.Errorf("failed to list pods: %v", err)) + var ( + podList []*corev1.Pod + err error + ) + + if len(failedPods) != 0 { + podList = failedPods + } else { + // List the pods for this aeroCluster's statefulset + podList, err = r.getOrderedRackPodList(rackState.Rack.ID) + if err != nil { + return statefulSet, reconcileError(fmt.Errorf("failed to list pods: %v", err)) + } } blockedK8sNodes := sets.NewString(r.aeroCluster.Spec.K8sNodeBlockList...) + var podsToRestart []*corev1.Pod + + restartTypeMap := make(map[string]RestartType) + for idx := range podList { pod := podList[idx] @@ -1091,19 +1109,35 @@ func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1 r.Log.Info("Pod found in blocked nodes list, migrating to a different node", "podName", pod.Name) - if res := r.waitForMultipleNodesSafeStopReady([]*corev1.Pod{pod}, ignorablePodNames); !res.isSuccess { - return statefulSet, res - } + podsToRestart = append(podsToRestart, pod) - restartTypeMap := map[string]RestartType{ + restartTypeMap = map[string]RestartType{ pod.Name: podRestart, } + } + } - if res := r.restartPods(rackState, []*corev1.Pod{pod}, restartTypeMap); !res.isSuccess { - return statefulSet, reconcileError(err) - } + podsBatchList := r.getPodsBatchToRestart(podsToRestart, len(podList)) + + // Restart batch of pods + if len(podsBatchList) > 0 { + // Handle one batch + podsBatch := podsBatchList[0] - // handle next pod on blocked node in subsequent Reconcile. + r.Log.Info( + "Calculated batch for Pod migration to different nodes", + "rackPodList", getPodNames(podList), + "rearrangedPods", getPodNames(podsToRestart), + "podsBatch", getPodNames(podsBatch), + "rollingUpdateBatchSize", r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, + ) + + if res := r.rollingRestartPods(rackState, podsBatch, ignorablePodNames, restartTypeMap); !res.isSuccess { + return statefulSet, res + } + + // Handle next batch in subsequent Reconcile. 
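+			// Only the first batch is handled here; the requeue below lets the operator re-list
+			// the pods and re-evaluate cluster state before migrating the remaining pods.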
+ if len(podsBatchList) > 1 { return statefulSet, reconcileRequeueAfter(1) } } diff --git a/controllers/reconciler.go b/controllers/reconciler.go index 54e40f877..b9da43092 100644 --- a/controllers/reconciler.go +++ b/controllers/reconciler.go @@ -69,7 +69,17 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { return reconcile.Result{}, nil } - // The cluster is not being deleted, add finalizer in not added already + // Set the status to AerospikeClusterInProgress before starting any operations + if r.aeroCluster.Status.Phase != asdbv1.AerospikeClusterInProgress { + r.aeroCluster.Status.Phase = asdbv1.AerospikeClusterInProgress + + if err := r.Client.Status().Update(context.Background(), r.aeroCluster); err != nil { + r.Log.Error(err, "Failed to set cluster status to AerospikeClusterInProgress") + return reconcile.Result{}, err + } + } + + // The cluster is not being deleted, add finalizer if not added already if err := r.addFinalizer(finalizerName); err != nil { r.Log.Error(err, "Failed to add finalizer") return reconcile.Result{}, err @@ -364,6 +374,7 @@ func (r *SingleClusterReconciler) updateStatus() error { } newAeroCluster.Status.AerospikeClusterStatusSpec = *specToStatus + newAeroCluster.Status.Phase = asdbv1.AerospikeClusterCompleted err = r.patchStatus(newAeroCluster) if err != nil { diff --git a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml index fdd1e6820..d5b116add 100644 --- a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml +++ b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml @@ -21,7 +21,7 @@ spec: - jsonPath: .spec.image name: Image type: string - - jsonPath: .spec.podSpec.MultiPodPerHost + - jsonPath: .spec.podSpec.multiPodPerHost name: MultiPodPerHost type: boolean - jsonPath: .spec.podSpec.hostNetwork @@ -30,6 +30,9 @@ spec: - jsonPath: .metadata.creationTimestamp name: Age type: date + - jsonPath: .status.phase + name: Phase + type: string name: v1 schema: openAPIV3Schema: @@ -9012,6 +9015,14 @@ spec: list by the operator type: string type: object + phase: + description: Phase denotes the current phase of Aerospike cluster + operation. + enum: + - AerospikeClusterInProgress + - AerospikeClusterCompleted + - AerospikeClusterError + type: string podSpec: description: Additional configuration for create Aerospike pods. properties: @@ -17537,8 +17548,6 @@ spec: - skipWorkDirValidate - skipXdrDlogFileValidate type: object - required: - - pods type: object type: object served: true diff --git a/test/cluster_helper.go b/test/cluster_helper.go index 58b9834fa..cc138f397 100644 --- a/test/cluster_helper.go +++ b/test/cluster_helper.go @@ -759,7 +759,7 @@ func deployClusterWithTO( if err != nil { return err } - // Wait for aerocluster to reach desired cluster size. + // Wait for aerocluster to reach the desired cluster size. 
return waitForAerospikeCluster( k8sClient, ctx, aeroCluster, int(aeroCluster.Spec.Size), retryInterval, timeout, diff --git a/test/cluster_test.go b/test/cluster_test.go index d8b2cfe01..afd16d76e 100644 --- a/test/cluster_test.go +++ b/test/cluster_test.go @@ -243,7 +243,7 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) { Namespace: clusterNamespacedName.Namespace}, pod) Expect(err).ToNot(HaveOccurred()) - pod.Spec.Containers[0].Image = "wrong-image" + pod.Spec.Containers[0].Image = wrongImage err = k8sClient.Update(ctx, pod) Expect(err).ToNot(HaveOccurred()) @@ -282,7 +282,7 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) { Namespace: clusterNamespacedName.Namespace}, pod) Expect(err).ToNot(HaveOccurred()) - pod.Spec.Containers[0].Image = "wrong-image" + pod.Spec.Containers[0].Image = wrongImage err = k8sClient.Update(ctx, pod) Expect(err).ToNot(HaveOccurred()) diff --git a/test/k8snode_block_list_test.go b/test/k8snode_block_list_test.go index 56e540407..635df6c6d 100644 --- a/test/k8snode_block_list_test.go +++ b/test/k8snode_block_list_test.go @@ -1 +1,207 @@ package test + +import ( + "context" + "fmt" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + + asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1" +) + +const ( + wrongImage = "wrong-image" +) + +var _ = Describe( + "K8sNodeBlockList", func() { + ctx := context.TODO() + Context( + "Migrate pods from K8s blocked nodes", func() { + clusterName := "k8s-node-block-cluster" + clusterNamespacedName := getNamespacedName(clusterName, namespace) + podName := clusterName + "-0-0" + aeroCluster := &asdbv1.AerospikeCluster{} + oldK8sNode := "" + oldPvcInfo := make(map[string]types.UID) + + var err error + + BeforeEach( + func() { + aeroCluster = createDummyAerospikeCluster( + clusterNamespacedName, 3, + ) + aeroCluster.Spec.PodSpec.MultiPodPerHost = false + err = deployCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + pod := &corev1.Pod{} + err = k8sClient.Get(ctx, getNamespacedName(podName, namespace), pod) + Expect(err).ToNot(HaveOccurred()) + oldK8sNode = pod.Spec.NodeName + oldPvcInfo, err = extractPodPVC(pod) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + AfterEach( + func() { + err = deleteCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + }, + ) + + It( + "Should migrate the pods from blocked nodes to other nodes", func() { + By("Blocking the k8s node") + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + aeroCluster.Spec.K8sNodeBlockList = []string{oldK8sNode} + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + By("Verifying if the pod is migrated to other nodes and pod pvcs are not deleted") + validatePodAndPVCMigration(ctx, podName, oldK8sNode, oldPvcInfo, true) + }, + ) + + It( + "Should migrate the pods from blocked nodes to other nodes along with rolling "+ + "restart", func() { + By("Blocking the k8s node and updating aerospike config") + aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName) + Expect(err).ToNot(HaveOccurred()) + aeroCluster.Spec.K8sNodeBlockList = []string{oldK8sNode} + aeroCluster.Spec.AerospikeConfig.Value["service"].(map[string]interface{})["proto-fd-max"] = + defaultProtofdmax + + err = updateCluster(k8sClient, ctx, aeroCluster) + Expect(err).ToNot(HaveOccurred()) + + By("Verifying if the pod is 
migrated to other nodes and pod pvcs are not deleted")
+						validatePodAndPVCMigration(ctx, podName, oldK8sNode, oldPvcInfo, true)
+					},
+				)
+
+				It(
+					"Should migrate the pods from blocked nodes to other nodes and delete corresponding "+
+						"local PVCs", func() {
+						By("Blocking the k8s node")
+						aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
+						Expect(err).ToNot(HaveOccurred())
+						aeroCluster.Spec.K8sNodeBlockList = []string{oldK8sNode}
+						aeroCluster.Spec.Storage.LocalStorageClasses = []string{storageClass}
+						err = updateCluster(k8sClient, ctx, aeroCluster)
+						Expect(err).ToNot(HaveOccurred())
+
+						By("Verifying if the pod is migrated to other nodes and pod local pvcs are deleted")
+						validatePodAndPVCMigration(ctx, podName, oldK8sNode, oldPvcInfo, false)
+					},
+				)
+
+				It(
+					"Should migrate the failed pods from blocked nodes to other nodes with maxIgnorablePod", func() {
+						By(fmt.Sprintf("Fail %s aerospike pod", podName))
+						pod := &corev1.Pod{}
+						err := k8sClient.Get(ctx, types.NamespacedName{Name: podName,
+							Namespace: clusterNamespacedName.Namespace}, pod)
+						Expect(err).ToNot(HaveOccurred())
+
+						pod.Spec.Containers[0].Image = wrongImage
+						err = k8sClient.Update(ctx, pod)
+						Expect(err).ToNot(HaveOccurred())
+
+						By("Blocking the k8s node and setting maxIgnorablePod to 1")
+						aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
+						Expect(err).ToNot(HaveOccurred())
+						maxIgnorablePods := intstr.FromInt(1)
+						aeroCluster.Spec.RackConfig.MaxIgnorablePods = &maxIgnorablePods
+						aeroCluster.Spec.K8sNodeBlockList = []string{oldK8sNode}
+						err = updateCluster(k8sClient, ctx, aeroCluster)
+						Expect(err).ToNot(HaveOccurred())
+
+						By("Verifying if the failed pod is migrated to other nodes and pod pvcs are not deleted")
+						validatePodAndPVCMigration(ctx, podName, oldK8sNode, oldPvcInfo, true)
+					},
+				)
+			},
+		)
+	},
+)
+
+func extractPodPVC(pod *corev1.Pod) (map[string]types.UID, error) {
+	pvcUIDMap := make(map[string]types.UID)
+
+	for idx := range pod.Spec.Volumes {
+		if pod.Spec.Volumes[idx].PersistentVolumeClaim != nil {
+			pvcUIDMap[pod.Spec.Volumes[idx].PersistentVolumeClaim.ClaimName] = ""
+		}
+	}
+
+	for p := range pvcUIDMap {
+		pvc := &corev1.PersistentVolumeClaim{}
+		if err := k8sClient.Get(context.TODO(), getNamespacedName(p, pod.Namespace), pvc); err != nil {
+			return nil, err
+		}
+
+		pvcUIDMap[p] = pvc.UID
+	}
+
+	return pvcUIDMap, nil
+}
+
+// validatePVCDeletion compares the current UIDs of the pod's PVCs against the recorded ones.
+// With shouldDelete=false it asserts that the old PVCs were deleted (and recreated with new UIDs);
+// with shouldDelete=true it asserts that the original PVCs were retained.
+func validatePVCDeletion(ctx context.Context, pvcUIDMap map[string]types.UID, shouldDelete bool) error {
+	pvc := &corev1.PersistentVolumeClaim{}
+
+	for pvcName, pvcUID := range pvcUIDMap {
+		pvcNamespacesName := getNamespacedName(
+			pvcName, namespace,
+		)
+
+		if err := k8sClient.Get(ctx, pvcNamespacesName, pvc); err != nil {
+			return err
+		}
+
+		if shouldDelete && pvc.UID != pvcUID {
+			return fmt.Errorf("PVC %s is unintentionally deleted", pvcName)
+		}
+
+		if !shouldDelete && pvc.UID == pvcUID {
+			return fmt.Errorf("PVC
%s is not deleted", pvcName) + } + } + + return nil +} + +func validatePodAndPVCMigration(ctx context.Context, podName, oldK8sNode string, + oldPvcInfo map[string]types.UID, shouldDelete bool) { + pod := &corev1.Pod{} + err := k8sClient.Get(ctx, getNamespacedName(podName, namespace), pod) + Expect(err).ToNot(HaveOccurred()) + Expect(pod.Spec.NodeName).ToNot(Equal(oldK8sNode)) + + err = validatePVCDeletion(ctx, oldPvcInfo, shouldDelete) + Expect(err).ToNot(HaveOccurred()) +} diff --git a/test/utils.go b/test/utils.go index 07b885ac0..feac18cbd 100644 --- a/test/utils.go +++ b/test/utils.go @@ -325,6 +325,11 @@ func isClusterStateValid( return false } + if newCluster.Status.Phase != asdbv1.AerospikeClusterCompleted { + pkgLog.Info("Cluster phase is not set to Completed") + return false + } + return true } From 08e941bc783160d18ceed945f962afd500be2538 Mon Sep 17 00:00:00 2001 From: Abhisek Dwivedi Date: Thu, 21 Dec 2023 14:25:00 +0530 Subject: [PATCH 07/12] Added Error phase in CR status --- api/v1/aerospikecluster_types.go | 5 +- .../asdb.aerospike.com_aerospikeclusters.yaml | 6 +- controllers/reconciler.go | 88 ++++++++++++++----- ..._aerospikeclusters.asdb.aerospike.com.yaml | 6 +- 4 files changed, 74 insertions(+), 31 deletions(-) diff --git a/api/v1/aerospikecluster_types.go b/api/v1/aerospikecluster_types.go index d6649870a..5836e86f3 100644 --- a/api/v1/aerospikecluster_types.go +++ b/api/v1/aerospikecluster_types.go @@ -25,7 +25,7 @@ import ( lib "github.com/aerospike/aerospike-management-lib" ) -// +kubebuilder:validation:Enum=AerospikeClusterInProgress;AerospikeClusterCompleted;AerospikeClusterError +// +kubebuilder:validation:Enum=InProgress;Completed;Error type AerospikeClusterPhase string // These are the valid phases of Aerospike cluster. @@ -675,7 +675,8 @@ type AerospikeClusterStatus struct { //nolint:govet // for readability // This is map instead of the conventional map as list convention to allow each pod to patch update its own // status. The map key is the name of the pod. // +patchStrategy=strategic - Pods map[string]AerospikePodStatus `json:"pods,omitempty" patchStrategy:"strategic"` + // +optional + Pods map[string]AerospikePodStatus `json:"pods" patchStrategy:"strategic"` // Phase denotes the current phase of Aerospike cluster operation. Phase AerospikeClusterPhase `json:"phase,omitempty"` diff --git a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml index d5b116add..afcf4c448 100644 --- a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml +++ b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml @@ -9019,9 +9019,9 @@ spec: description: Phase denotes the current phase of Aerospike cluster operation. enum: - - AerospikeClusterInProgress - - AerospikeClusterCompleted - - AerospikeClusterError + - InProgress + - Completed + - Error type: string podSpec: description: Additional configuration for create Aerospike pods. 
diff --git a/controllers/reconciler.go b/controllers/reconciler.go index b9da43092..53fd4a142 100644 --- a/controllers/reconciler.go +++ b/controllers/reconciler.go @@ -39,12 +39,24 @@ type SingleClusterReconciler struct { Log logr.Logger } -func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { +func (r *SingleClusterReconciler) Reconcile() (result ctrl.Result, recErr error) { r.Log.V(1).Info( "AerospikeCluster", "Spec", r.aeroCluster.Spec, "Status", r.aeroCluster.Status, ) + // Set the status phase to Error if the recError is not nil + // recErr is only set when reconcile failure should result in Error phase of the cluster + defer func() { + if recErr != nil { + r.Log.Error(recErr, "Reconcile failed") + + if err := r.setStatusPhase(asdbv1.AerospikeClusterError); err != nil { + recErr = err + } + } + }() + // Check DeletionTimestamp to see if the cluster is being deleted if !r.aeroCluster.ObjectMeta.DeletionTimestamp.IsZero() { r.Log.V(1).Info("Deleting AerospikeCluster") @@ -70,13 +82,8 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { } // Set the status to AerospikeClusterInProgress before starting any operations - if r.aeroCluster.Status.Phase != asdbv1.AerospikeClusterInProgress { - r.aeroCluster.Status.Phase = asdbv1.AerospikeClusterInProgress - - if err := r.Client.Status().Update(context.Background(), r.aeroCluster); err != nil { - r.Log.Error(err, "Failed to set cluster status to AerospikeClusterInProgress") - return reconcile.Result{}, err - } + if err := r.setStatusPhase(asdbv1.AerospikeClusterInProgress); err != nil { + return reconcile.Result{}, err } // The cluster is not being deleted, add finalizer if not added already @@ -107,7 +114,9 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { r.aeroCluster.Namespace, r.aeroCluster.Name, ) - return reconcile.Result{}, err + recErr = err + + return reconcile.Result{}, recErr } // Reconcile all racks @@ -118,9 +127,11 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { "Failed to reconcile Racks for cluster %s/%s", r.aeroCluster.Namespace, r.aeroCluster.Name, ) + + recErr = res.err } - return res.getResult() + return res.result, recErr } if err := r.createOrUpdatePDB(); err != nil { @@ -131,7 +142,9 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { r.aeroCluster.Namespace, r.aeroCluster.Name, ) - return reconcile.Result{}, err + recErr = err + + return reconcile.Result{}, recErr } if err := r.createOrUpdateSTSLoadBalancerSvc(); err != nil { @@ -142,7 +155,9 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { r.aeroCluster.Namespace, r.aeroCluster.Name, ) - return reconcile.Result{}, err + recErr = err + + return reconcile.Result{}, recErr } ignorablePodNames, err := r.getIgnorablePods(nil, getConfiguredRackStateList(r.aeroCluster)) @@ -165,16 +180,19 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { return reconcile.Result{}, e } - if err := deployment.InfoQuiesceUndo( + if err = deployment.InfoQuiesceUndo( r.Log, r.getClientPolicy(), allHostConns, ); err != nil { r.Log.Error(err, "Failed to check for Quiesced nodes") - return reconcile.Result{}, err + + recErr = err + + return reconcile.Result{}, recErr } // Setup access control. 
- if err := r.validateAndReconcileAccessControl(ignorablePodNames); err != nil { + if err = r.validateAndReconcileAccessControl(ignorablePodNames); err != nil { r.Log.Error(err, "Failed to Reconcile access control") r.Recorder.Eventf( r.aeroCluster, corev1.EventTypeWarning, "ACLUpdateFailed", @@ -182,11 +200,13 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { r.aeroCluster.Name, ) - return reconcile.Result{}, err + recErr = err + + return reconcile.Result{}, recErr } // Update the AerospikeCluster status. - if err := r.updateAccessControlStatus(); err != nil { + if err = r.updateAccessControlStatus(); err != nil { r.Log.Error(err, "Failed to update AerospikeCluster access control status") r.Recorder.Eventf( r.aeroCluster, corev1.EventTypeWarning, "StatusUpdateFailed", @@ -194,7 +214,9 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { r.aeroCluster.Namespace, r.aeroCluster.Name, ) - return reconcile.Result{}, err + recErr = err + + return reconcile.Result{}, recErr } // Use policy from spec after setting up access control @@ -208,25 +230,32 @@ func (r *SingleClusterReconciler) Reconcile() (ctrl.Result, error) { false, ignorablePodNames, ); !res.isSuccess { r.Log.Error(res.err, "Failed to revert migrate-fill-delay") - return reconcile.Result{}, res.err + + recErr = res.err + + return reconcile.Result{}, recErr } if asdbv1.IsClusterSCEnabled(r.aeroCluster) { if !r.IsStatusEmpty() { if res := r.waitForClusterStability(policy, allHostConns); !res.isSuccess { - return res.result, res.err + recErr = res.err + + return res.result, recErr } } // Setup roster - if err := r.getAndSetRoster(policy, r.aeroCluster.Spec.RosterNodeBlockList, ignorablePodNames); err != nil { + if err = r.getAndSetRoster(policy, r.aeroCluster.Spec.RosterNodeBlockList, ignorablePodNames); err != nil { r.Log.Error(err, "Failed to set roster for cluster") - return reconcile.Result{}, err + recErr = err + + return reconcile.Result{}, recErr } } // Update the AerospikeCluster status. - if err := r.updateStatus(); err != nil { + if err = r.updateStatus(); err != nil { r.Log.Error(err, "Failed to update AerospikeCluster status") r.Recorder.Eventf( r.aeroCluster, corev1.EventTypeWarning, "StatusUpdateFailed", @@ -388,6 +417,19 @@ func (r *SingleClusterReconciler) updateStatus() error { return nil } +func (r *SingleClusterReconciler) setStatusPhase(phase asdbv1.AerospikeClusterPhase) error { + if r.aeroCluster.Status.Phase != phase { + r.aeroCluster.Status.Phase = phase + + if err := r.Client.Status().Update(context.Background(), r.aeroCluster); err != nil { + r.Log.Error(err, fmt.Sprintf("Failed to set cluster status to %s", phase)) + return err + } + } + + return nil +} + func (r *SingleClusterReconciler) updateAccessControlStatus() error { if r.aeroCluster.Spec.AerospikeAccessControl == nil { return nil diff --git a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml index d5b116add..afcf4c448 100644 --- a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml +++ b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml @@ -9019,9 +9019,9 @@ spec: description: Phase denotes the current phase of Aerospike cluster operation. 
                 enum:
-                - AerospikeClusterInProgress
-                - AerospikeClusterCompleted
-                - AerospikeClusterError
+                - InProgress
+                - Completed
+                - Error
                 type: string
               podSpec:
                 description: Additional configuration for create Aerospike pods.

From 7366d6d6940049acf10786e0b9d9fe4c176934fb Mon Sep 17 00:00:00 2001
From: Abhisek Dwivedi
Date: Fri, 29 Dec 2023 11:39:45 +0530
Subject: [PATCH 08/12] Incorporate review comments

---
 controllers/poddistruptionbudget.go | 6 ++++--
 controllers/pvc.go                  | 5 ++++-
 controllers/rack.go                 | 3 ++-
 controllers/reconciler.go           | 2 +-
 4 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/controllers/poddistruptionbudget.go b/controllers/poddistruptionbudget.go
index bb422ddb8..d4ad4d291 100644
--- a/controllers/poddistruptionbudget.go
+++ b/controllers/poddistruptionbudget.go
@@ -75,7 +75,8 @@ func (r *SingleClusterReconciler) createOrUpdatePDB() error {
 		)
 	}
 
-	r.Log.Info("Created new PodDisruptionBudget")
+	r.Log.Info("Created new PodDisruptionBudget", "name",
+		utils.NamespacedName(r.aeroCluster.Namespace, r.aeroCluster.Name))
 
 	return nil
 }
@@ -97,7 +98,8 @@ func (r *SingleClusterReconciler) createOrUpdatePDB() error {
 			)
 		}
 
-		r.Log.Info("Updated PodDisruptionBudget")
+		r.Log.Info("Updated PodDisruptionBudget", "name",
+			utils.NamespacedName(r.aeroCluster.Namespace, r.aeroCluster.Name))
 	}
 
 	return nil
diff --git a/controllers/pvc.go b/controllers/pvc.go
index 7777b0d2e..7a316e617 100644
--- a/controllers/pvc.go
+++ b/controllers/pvc.go
@@ -6,6 +6,7 @@ import (
 	"time"
 
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/labels"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
@@ -103,6 +104,8 @@ func (r *SingleClusterReconciler) removePVCsAsync(
 	return deletedPVCs, nil
 }
 
+// deleteLocalPVCs deletes PVCs that were created using local storage classes.
+// It considers the user-given LocalStorageClasses list from the spec to determine whether a PVC is local or not.
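+// PVCs whose storage class is not in that list are skipped, and a PVC that is already deleted is ignored.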
 func (r *SingleClusterReconciler) deleteLocalPVCs(rackState *RackState, pod *corev1.Pod) error {
 	pvcItems, err := r.getPodsPVCList([]string{pod.Name}, rackState.Rack.ID)
 	if err != nil {
 		return fmt.Errorf("could not find pvc for pod %v: %v", pod.Name, err)
@@ -118,7 +121,7 @@ func (r *SingleClusterReconciler) deleteLocalPVCs(rackState *RackState, pod *cor
 	}
 
 	if utils.ContainsString(rackState.Rack.Storage.LocalStorageClasses, *pvcStorageClass) {
-		if err := r.Client.Delete(context.TODO(), &pvcItems[idx]); err != nil {
+		if err := r.Client.Delete(context.TODO(), &pvcItems[idx]); err != nil && !errors.IsNotFound(err) {
 			return fmt.Errorf(
 				"could not delete pvc %s: %v", pvcItems[idx].Name, err,
 			)
diff --git a/controllers/rack.go b/controllers/rack.go
index d90085617..55dab77f3 100644
--- a/controllers/rack.go
+++ b/controllers/rack.go
@@ -429,7 +429,8 @@ func (r *SingleClusterReconciler) upgradeOrRollingRestartRack(found *appsv1.Stat
 		}
 	}
 
-	if r.aeroCluster.Spec.K8sNodeBlockList != nil {
+	// Handle k8sNodeBlockList pods only if it has changed
+	if !reflect.DeepEqual(r.aeroCluster.Spec.K8sNodeBlockList, r.aeroCluster.Status.K8sNodeBlockList) {
 		found, res = r.handleK8sNodeBlockListPods(found, rackState, ignorablePodNames, failedPods)
 		if !res.isSuccess {
 			return found, res
diff --git a/controllers/reconciler.go b/controllers/reconciler.go
index 53fd4a142..bef0c522b 100644
--- a/controllers/reconciler.go
+++ b/controllers/reconciler.go
@@ -45,7 +45,7 @@ func (r *SingleClusterReconciler) Reconcile() (result ctrl.Result, recErr error)
 		r.aeroCluster.Status,
 	)
 
-	// Set the status phase to Error if the recError is not nil
+	// Set the status phase to Error if the recErr is not nil
 	// recErr is only set when reconcile failure should result in Error phase of the cluster
 	defer func() {
 		if recErr != nil {

From a7659bd19defc8cc6f52f1e63dd1dc3938087876 Mon Sep 17 00:00:00 2001
From: Abhisek Dwivedi
Date: Mon, 12 Feb 2024 19:26:57 +0530
Subject: [PATCH 09/12] Incorporate review comments

---
 api/v1/aerospikecluster_mutating_webhook.go   |  2 +-
 api/v1/aerospikecluster_types.go              | 21 ++++++++++++++-----
 api/v1/aerospikecluster_validating_webhook.go | 17 ++++++++-------
 .../asdb.aerospike.com_aerospikeclusters.yaml |  6 +++++-
 controllers/poddistruptionbudget.go           |  4 ++--
 controllers/rack.go                           |  6 ++----
 controllers/statefulset.go                    |  2 +-
 ..._aerospikeclusters.asdb.aerospike.com.yaml |  6 +++++-
 test/k8snode_block_list_test.go               | 12 ++++++++++-
 9 files changed, 52 insertions(+), 24 deletions(-)

diff --git a/api/v1/aerospikecluster_mutating_webhook.go b/api/v1/aerospikecluster_mutating_webhook.go
index 41e79ebf4..085c7baec 100644
--- a/api/v1/aerospikecluster_mutating_webhook.go
+++ b/api/v1/aerospikecluster_mutating_webhook.go
@@ -511,7 +511,7 @@ func setDefaultNetworkConf(
 		)
 	}
 	// Override these sections
-	// TODO: These values lines will be replaced with runtime info by script in init-container
+	// TODO: These value lines will be replaced with runtime info by the akoinit binary in the init-container
 	// See if we can get better way to make template
 	serviceDefaults := map[string]interface{}{}
 	srvPort := GetServicePort(configSpec)
diff --git a/api/v1/aerospikecluster_types.go b/api/v1/aerospikecluster_types.go
index 5836e86f3..93e995ddf 100644
--- a/api/v1/aerospikecluster_types.go
+++ b/api/v1/aerospikecluster_types.go
@@ -25,7 +25,7 @@ import (
 	lib "github.com/aerospike/aerospike-management-lib"
 )
 
-// +kubebuilder:validation:Enum=AerospikeClusterInProgress;AerospikeClusterCompleted;AerospikeClusterError
+// +kubebuilder:validation:Enum=InProgress;Completed;Error
 type AerospikeClusterPhase string
 
 // These are the valid phases of Aerospike cluster.
 const (
-	// AerospikeClusterInProgress means the Aerospike cluster operations are in-progress state.
+	// AerospikeClusterInProgress means the Aerospike cluster CR is being reconciled and its operations are in progress.
 	// This phase denotes that changes are gradually rolling out to the cluster.
+	// For example, when the Aerospike server version is upgraded in the CR, the InProgress phase is set until the
+	// upgrade is completed.
 	AerospikeClusterInProgress AerospikeClusterPhase = "InProgress"
-	// AerospikeClusterCompleted means the Aerospike cluster has been deployed/upgraded successfully and is ready to use.
+	// AerospikeClusterCompleted means the Aerospike cluster CR has been reconciled. This phase denotes that the cluster
+	// has been deployed/upgraded successfully and is ready to use.
+	// For example, when the Aerospike server version is upgraded in the CR, the Completed phase is set after the
+	// upgrade is completed.
 	AerospikeClusterCompleted AerospikeClusterPhase = "Completed"
-	// AerospikeClusterError means the Aerospike cluster is in error state because of some reason like misconfiguration,
-	// infra issues, etc.
+	// AerospikeClusterError means the Aerospike cluster operation is in an error state for reasons such as
+	// misconfiguration or infra issues.
+	// For example, when the Aerospike server version is upgraded in the CR, the Error phase is set if the upgrade
+	// fails due to an issue such as a wrong image.
 	AerospikeClusterError AerospikeClusterPhase = "Error"
 )
@@ -54,6 +61,7 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability
 	Size int32 `json:"size"`
 	// MaxUnavailable is the percentage/number of pods that can be allowed to go down or unavailable before application
 	// disruption. This value is used to create PodDisruptionBudget. Defaults to 1.
+	// Refer Aerospike documentation for more details.
 	// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Max Unavailable"
 	MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"`
 	// Aerospike server image
@@ -93,8 +101,11 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability
 	// RosterNodeBlockList is a list of blocked nodeIDs from roster in a strong-consistency setup
 	// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Roster Node BlockList"
 	RosterNodeBlockList []string `json:"rosterNodeBlockList,omitempty"`
-	// K8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods.
+	// K8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods. Pods are not scheduled on
+	// these nodes. Pods are migrated from these nodes if already present. This is useful for the maintenance of
+	// Kubernetes nodes.
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Kubernetes Node BlockList" + // +kubebuilder:validation:MinItems:=1 K8sNodeBlockList []string `json:"k8sNodeBlockList,omitempty"` } diff --git a/api/v1/aerospikecluster_validating_webhook.go b/api/v1/aerospikecluster_validating_webhook.go index f9b89bf78..cf6c2fdf8 100644 --- a/api/v1/aerospikecluster_validating_webhook.go +++ b/api/v1/aerospikecluster_validating_webhook.go @@ -2187,10 +2187,10 @@ func (c *AerospikeCluster) validateMaxUnavailable() error { return err } - maxUnavailable := int(c.Spec.Size) + safeMaxUnavailable := int(c.Spec.Size) // If Size is 1, then ignore it for maxUnavailable calculation as it will anyway result in data loss - if maxUnavailable == 1 { + if safeMaxUnavailable == 1 { return nil } @@ -2202,7 +2202,7 @@ func (c *AerospikeCluster) validateMaxUnavailable() error { rfInterface, exists := nsInterface.(map[string]interface{})["replication-factor"] if !exists { // Default RF is 2 if not given - maxUnavailable = 2 + safeMaxUnavailable = 2 continue } @@ -2216,15 +2216,16 @@ func (c *AerospikeCluster) validateMaxUnavailable() error { continue } - if rf < maxUnavailable { - maxUnavailable = rf + if rf < safeMaxUnavailable { + safeMaxUnavailable = rf } } } - if c.Spec.MaxUnavailable.IntValue() >= maxUnavailable { - return fmt.Errorf("maxUnavailable %s cannot be greater than or equal to %v", - c.Spec.MaxUnavailable.String(), maxUnavailable) + if c.Spec.MaxUnavailable.IntValue() >= safeMaxUnavailable { + return fmt.Errorf("maxUnavailable %s cannot be greater than or equal to %v as it may result in "+ + "data loss. Set it to a lower value", + c.Spec.MaxUnavailable.String(), safeMaxUnavailable) } return nil diff --git a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml index 6f79690b8..2934b5a39 100644 --- a/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml +++ b/config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml @@ -285,9 +285,12 @@ spec: type: string k8sNodeBlockList: description: K8sNodeBlockList is a list of Kubernetes nodes which - are not used for Aerospike pods. + are not used for Aerospike pods. Pods are not scheduled on these + nodes. Pods are migrated from these nodes if already present. This + is useful for the maintenance of Kubernetes nodes. items: type: string + minItems: 1 type: array maxUnavailable: anyOf: @@ -296,6 +299,7 @@ spec: description: MaxUnavailable is the percentage/number of pods that can be allowed to go down or unavailable before application disruption. This value is used to create PodDisruptionBudget. Defaults to 1. + Refer Aerospike documentation for more details. x-kubernetes-int-or-string: true operatorClientCert: description: Certificates to connect to Aerospike. diff --git a/controllers/poddistruptionbudget.go b/controllers/poddistruptionbudget.go index d4ad4d291..4d0bb2065 100644 --- a/controllers/poddistruptionbudget.go +++ b/controllers/poddistruptionbudget.go @@ -29,8 +29,8 @@ func (r *SingleClusterReconciler) createOrUpdatePDB() error { } if pod.Spec.Containers[containerIdx].ReadinessProbe == nil { - r.Log.Info("Pod found without ReadinessProbe, skipping PodDisruptionBudget", - "name", pod.Name) + r.Log.Info("Pod found without ReadinessProbe, skipping PodDisruptionBudget. 
Refer Aerospike "+ + "documentation for more details.", "name", pod.Name) return nil } } diff --git a/controllers/rack.go b/controllers/rack.go index 55dab77f3..fae60b782 100644 --- a/controllers/rack.go +++ b/controllers/rack.go @@ -1078,7 +1078,7 @@ func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1 ) (*appsv1.StatefulSet, reconcileResult) { if err := r.updateSTS(statefulSet, rackState); err != nil { return statefulSet, reconcileError( - fmt.Errorf("upgrade rack : %v", err), + fmt.Errorf("k8s node block list processing failed: %v", err), ) } @@ -1112,9 +1112,7 @@ func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1 podsToRestart = append(podsToRestart, pod) - restartTypeMap = map[string]RestartType{ - pod.Name: podRestart, - } + restartTypeMap[pod.Name] = podRestart } } diff --git a/controllers/statefulset.go b/controllers/statefulset.go index aee87afa1..f726fe49b 100644 --- a/controllers/statefulset.go +++ b/controllers/statefulset.go @@ -925,7 +925,7 @@ func (r *SingleClusterReconciler) updateSTSSchedulingPolicy( ) } - if r.aeroCluster.Spec.K8sNodeBlockList != nil { + if len(r.aeroCluster.Spec.K8sNodeBlockList) > 0 { matchExpressions = append( matchExpressions, corev1.NodeSelectorRequirement{ Key: "kubernetes.io/hostname", diff --git a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml index 6f79690b8..2934b5a39 100644 --- a/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml +++ b/helm-charts/aerospike-kubernetes-operator/crds/customresourcedefinition_aerospikeclusters.asdb.aerospike.com.yaml @@ -285,9 +285,12 @@ spec: type: string k8sNodeBlockList: description: K8sNodeBlockList is a list of Kubernetes nodes which - are not used for Aerospike pods. + are not used for Aerospike pods. Pods are not scheduled on these + nodes. Pods are migrated from these nodes if already present. This + is useful for the maintenance of Kubernetes nodes. items: type: string + minItems: 1 type: array maxUnavailable: anyOf: @@ -296,6 +299,7 @@ spec: description: MaxUnavailable is the percentage/number of pods that can be allowed to go down or unavailable before application disruption. This value is used to create PodDisruptionBudget. Defaults to 1. + Refer Aerospike documentation for more details. x-kubernetes-int-or-string: true operatorClientCert: description: Certificates to connect to Aerospike. 
diff --git a/test/k8snode_block_list_test.go b/test/k8snode_block_list_test.go
index 635df6c6d..502eab962 100644
--- a/test/k8snode_block_list_test.go
+++ b/test/k8snode_block_list_test.go
@@ -24,7 +24,7 @@ var _ = Describe(
 	"Migrate pods from K8s blocked nodes", func() {
 		clusterName := "k8s-node-block-cluster"
 		clusterNamespacedName := getNamespacedName(clusterName, namespace)
-		podName := clusterName + "-0-0"
+		podName := clusterName + "-1-0"
 		aeroCluster := &asdbv1.AerospikeCluster{}
 		oldK8sNode := ""
 		oldPvcInfo := make(map[string]types.UID)
@@ -36,6 +36,16 @@ var _ = Describe(
 				aeroCluster = createDummyAerospikeCluster(
 					clusterNamespacedName, 3,
 				)
+
+				batchSize := intstr.FromString("100%")
+				rackConf := asdbv1.RackConfig{
+					Racks:                  getDummyRackConf(1, 2),
+					RollingUpdateBatchSize: &batchSize,
+					Namespaces:             []string{"test"},
+				}
+
+				aeroCluster.Spec.RackConfig = rackConf
+				aeroCluster.Spec.PodSpec.MultiPodPerHost = false
 
 				err = deployCluster(k8sClient, ctx, aeroCluster)
 				Expect(err).ToNot(HaveOccurred())

From bba00322f145f84d2efdbbc705dc8cbf059e360e Mon Sep 17 00:00:00 2001
From: Abhisek Dwivedi
Date: Wed, 14 Feb 2024 20:51:20 +0530
Subject: [PATCH 10/12] Fixed test-cases

---
 test/cluster_test.go         | 20 +++++++++++++-------
 test/large_reconcile_test.go | 18 ++++++++++--------
 2 files changed, 23 insertions(+), 15 deletions(-)

diff --git a/test/cluster_test.go b/test/cluster_test.go
index cd2badd6e..f28ec7709 100644
--- a/test/cluster_test.go
+++ b/test/cluster_test.go
@@ -173,13 +173,19 @@ func clusterWithMaxIgnorablePod(ctx goctx.Context) {
 		It(
 			"Should allow cluster operations with pending pod", func() {
 				By("Set MaxIgnorablePod and Rolling restart cluster")
-				aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
-				Expect(err).ToNot(HaveOccurred())
-				val := intstr.FromInt(1)
-				aeroCluster.Spec.RackConfig.MaxIgnorablePods = &val
-				aeroCluster.Spec.AerospikeConfig.Value["service"].(map[string]interface{})["proto-fd-max"] = int64(18000)
-				err = updateCluster(k8sClient, ctx, aeroCluster)
-				Expect(err).ToNot(HaveOccurred())
+
+				// As the pod is in pending state, the CR object will be updated continuously.
+				// The update is wrapped in Eventually to retry on object conflict errors.
+				Eventually(func() error {
+					aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
+					Expect(err).ToNot(HaveOccurred())
+					val := intstr.FromInt(1)
+					aeroCluster.Spec.RackConfig.MaxIgnorablePods = &val
+					aeroCluster.Spec.AerospikeConfig.Value["service"].(map[string]interface{})["proto-fd-max"] =
+						int64(18000)
+
+					return updateCluster(k8sClient, ctx, aeroCluster)
+				}, 1*time.Minute).ShouldNot(HaveOccurred())
 
 				By("Upgrade version")
 				aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
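The hunk above and the ones in the remaining commits converge on the same recipe: re-fetch the object inside the retry loop so that each attempt writes against a fresh resourceVersion. Outside Ginkgo, client-go expresses this as retry.RetryOnConflict. A sketch of the equivalent against a controller-runtime client, assuming the operator's asdbv1 module path as used in these tests; the Eventually wrapper achieves the same effect with a Gomega-native timeout:

package main

import (
	"context"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/retry"
	"sigs.k8s.io/controller-runtime/pkg/client"

	asdbv1 "github.com/aerospike/aerospike-kubernetes-operator/api/v1"
)

// updateSizeWithRetry re-reads the cluster and reapplies the size change
// whenever the API server rejects the update with a resource-version
// conflict, instead of failing on the first stale write.
func updateSizeWithRetry(ctx context.Context, c client.Client,
	key types.NamespacedName, size int32) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		aeroCluster := &asdbv1.AerospikeCluster{}
		if err := c.Get(ctx, key, aeroCluster); err != nil {
			return err
		}

		aeroCluster.Spec.Size = size

		return c.Update(ctx, aeroCluster)
	})
}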
diff --git a/test/large_reconcile_test.go b/test/large_reconcile_test.go
index 2944e7f44..c881a3acb 100644
--- a/test/large_reconcile_test.go
+++ b/test/large_reconcile_test.go
@@ -73,15 +73,17 @@ var _ = Describe(
 				err = k8sClient.Update(goctx.TODO(), aeroCluster)
 				Expect(err).ToNot(HaveOccurred())
 
-				// Change size to 4 immediately
-				aeroCluster, err = getCluster(
-					k8sClient, ctx, clusterNamespacedName,
-				)
-				Expect(err).ToNot(HaveOccurred())
+				// Change the size to 4 immediately; wrapped in Eventually to retry on object conflict errors.
+				Eventually(func() error {
+					aeroCluster, err = getCluster(
+						k8sClient, ctx, clusterNamespacedName,
+					)
+					Expect(err).ToNot(HaveOccurred())
 
-				aeroCluster.Spec.Size = 4
-				err = k8sClient.Update(goctx.TODO(), aeroCluster)
-				Expect(err).ToNot(HaveOccurred())
+					aeroCluster.Spec.Size = 4
+
+					return k8sClient.Update(goctx.TODO(), aeroCluster)
+				}, 1*time.Minute).ShouldNot(HaveOccurred())
 
 				// Cluster size should never go below 4,
 				// as only one node is removed at a time and before reducing 2nd node, we changed the size to 4

From 1690b28aa0a8cb05caa1104d920a0f4b5449ebc6 Mon Sep 17 00:00:00 2001
From: Abhisek Dwivedi
Date: Tue, 20 Feb 2024 13:31:07 +0530
Subject: [PATCH 11/12] Fixed test-cases

---
 test/large_reconcile_test.go | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/test/large_reconcile_test.go b/test/large_reconcile_test.go
index c881a3acb..26d731455 100644
--- a/test/large_reconcile_test.go
+++ b/test/large_reconcile_test.go
@@ -144,14 +144,16 @@ var _ = Describe(
 				Expect(err).ToNot(HaveOccurred())
 
 				// Change build back to original
-				aeroCluster, err = getCluster(
-					k8sClient, ctx, clusterNamespacedName,
-				)
-				Expect(err).ToNot(HaveOccurred())
-				err = UpdateClusterImage(aeroCluster, latestImage)
-				Expect(err).ToNot(HaveOccurred())
-				err = k8sClient.Update(goctx.TODO(), aeroCluster)
-				Expect(err).ToNot(HaveOccurred())
+				Eventually(func() error {
+					aeroCluster, err = getCluster(
+						k8sClient, ctx, clusterNamespacedName,
+					)
+					Expect(err).ToNot(HaveOccurred())
+
+					err = UpdateClusterImage(aeroCluster, latestImage)
+					Expect(err).ToNot(HaveOccurred())
+					return k8sClient.Update(goctx.TODO(), aeroCluster)
+				}, 1*time.Minute).ShouldNot(HaveOccurred())
 
 				// Only 1 pod need upgrade
 				err = waitForClusterUpgrade(
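The last commit in the series, below, pins the two racks to real zones so that the pod-to-node placement the test asserts on is deterministic. The getZones helper it calls is not defined anywhere in this series; a plausible reconstruction, assuming it collects the distinct topology.kubernetes.io/zone labels from the cluster's nodes:

package test

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// getZones lists all nodes and returns the de-duplicated set of zone labels,
// so tests can pin racks to real zones. Hypothetical reconstruction of the
// helper referenced by the test below.
func getZones(ctx context.Context, c client.Client) ([]string, error) {
	nodeList := &corev1.NodeList{}
	if err := c.List(ctx, nodeList); err != nil {
		return nil, err
	}

	seen := map[string]bool{}
	zones := make([]string, 0, len(nodeList.Items))

	for idx := range nodeList.Items {
		zone := nodeList.Items[idx].Labels["topology.kubernetes.io/zone"]
		if zone != "" && !seen[zone] {
			seen[zone] = true
			zones = append(zones, zone)
		}
	}

	return zones, nil
}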
From b8397bca09420073e1526cb1445b82068cf0480f Mon Sep 17 00:00:00 2001
From: Abhisek Dwivedi
Date: Wed, 28 Feb 2024 12:57:36 +0530
Subject: [PATCH 12/12] Fixed test-cases

---
 test/k8snode_block_list_test.go | 24 ++++++++++++++++++++----
 test/large_reconcile_test.go    | 16 +++++++++-------
 2 files changed, 29 insertions(+), 11 deletions(-)

diff --git a/test/k8snode_block_list_test.go b/test/k8snode_block_list_test.go
index 502eab962..9580576cb 100644
--- a/test/k8snode_block_list_test.go
+++ b/test/k8snode_block_list_test.go
@@ -24,12 +24,15 @@ var _ = Describe(
 	"Migrate pods from K8s blocked nodes", func() {
 		clusterName := "k8s-node-block-cluster"
 		clusterNamespacedName := getNamespacedName(clusterName, namespace)
-		podName := clusterName + "-1-0"
+		podName := clusterName + "-2-0"
 		aeroCluster := &asdbv1.AerospikeCluster{}
 		oldK8sNode := ""
 		oldPvcInfo := make(map[string]types.UID)
 
-		var err error
+		var (
+			err   error
+			zones []string
+		)
 
 		BeforeEach(
 			func() {
@@ -37,9 +40,22 @@ var _ = Describe(
 					clusterNamespacedName, 3,
 				)
 
+				// Zones are fetched to distribute the pods across nodes in different zones.
+				zones, err = getZones(ctx, k8sClient)
+				Expect(err).ToNot(HaveOccurred())
+
+				zone1 := zones[0]
+				zone2 := zones[0]
+				if len(zones) > 1 {
+					zone2 = zones[1]
+				}
+
 				batchSize := intstr.FromString("100%")
 				rackConf := asdbv1.RackConfig{
-					Racks:                  getDummyRackConf(1, 2),
+					Racks: []asdbv1.Rack{
+						{ID: 1, Zone: zone1},
+						{ID: 2, Zone: zone2},
+					},
 					RollingUpdateBatchSize: &batchSize,
 					Namespaces:             []string{"test"},
 				}
@@ -145,7 +161,7 @@
 				By("Blocking the k8s node and setting maxIgnorablePod to 1")
 				aeroCluster, err = getCluster(k8sClient, ctx, clusterNamespacedName)
 				Expect(err).ToNot(HaveOccurred())
-				maxIgnorablePods := intstr.FromInt(1)
+				maxIgnorablePods := intstr.FromInt32(1)
 				aeroCluster.Spec.RackConfig.MaxIgnorablePods = &maxIgnorablePods
 				aeroCluster.Spec.K8sNodeBlockList = []string{oldK8sNode}
 				err = updateCluster(k8sClient, ctx, aeroCluster)
diff --git a/test/large_reconcile_test.go b/test/large_reconcile_test.go
index 26d731455..6e59732b0 100644
--- a/test/large_reconcile_test.go
+++ b/test/large_reconcile_test.go
@@ -111,14 +111,16 @@ var _ = Describe(
 				Expect(err).ToNot(HaveOccurred())
 
 				// Change config back to original value
-				aeroCluster, err = getCluster(
-					k8sClient, ctx, clusterNamespacedName,
-				)
-				Expect(err).ToNot(HaveOccurred())
+				Eventually(func() error {
+					aeroCluster, err = getCluster(
+						k8sClient, ctx, clusterNamespacedName,
+					)
+					Expect(err).ToNot(HaveOccurred())
 
-				aeroCluster.Spec.AerospikeConfig.Value["service"].(map[string]interface{})["proto-fd-max"] = defaultProtofdmax
-				err = k8sClient.Update(goctx.TODO(), aeroCluster)
-				Expect(err).ToNot(HaveOccurred())
+					aeroCluster.Spec.AerospikeConfig.Value["service"].(map[string]interface{})["proto-fd-max"] = defaultProtofdmax
+
+					return k8sClient.Update(goctx.TODO(), aeroCluster)
+				}, 1*time.Minute).ShouldNot(HaveOccurred())
 
 				// Cluster status should never get updated with old conf "tempConf"
 				err = waitForClusterRollingRestart(