Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

K0-314: Added support for batch scale-down #279

Merged
merged 8 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/v1/aerospikecluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ type RackConfig struct { //nolint:govet // for readability
// RollingUpdateBatchSize is the percentage/number of rack pods that will be restarted simultaneously
// +optional
RollingUpdateBatchSize *intstr.IntOrString `json:"rollingUpdateBatchSize,omitempty"`
// ScaleDownBatchSize is the percentage/number of rack pods that will be scaled down simultaneously
sud82 marked this conversation as resolved.
Show resolved Hide resolved
// +optional
ScaleDownBatchSize *intstr.IntOrString `json:"scaleDownBatchSize,omitempty"`
// MaxIgnorablePods is the maximum number/percentage of pending/failed pods in a rack that are ignored while
// assessing cluster stability. Pods identified using this value are not considered part of the cluster.
// Additionally, in SC mode clusters, these pods are removed from the roster.
Expand Down
100 changes: 66 additions & 34 deletions api/v1/aerospikecluster_validating_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,38 +602,15 @@ func (c *AerospikeCluster) validateRackConfig(_ logr.Logger) error {
}

// Validate batch upgrade/restart param
if c.Spec.RackConfig.RollingUpdateBatchSize != nil {
if err := validateIntOrStringField(c.Spec.RackConfig.RollingUpdateBatchSize,
"spec.rackConfig.rollingUpdateBatchSize"); err != nil {
return err
}

if len(c.Spec.RackConfig.Racks) < 2 {
return fmt.Errorf("can not use rackConfig.RollingUpdateBatchSize when number of racks is less than two")
}

nsConfsNamespaces := c.getNsConfsForNamespaces()
for ns, nsConf := range nsConfsNamespaces {
if !isNameExist(c.Spec.RackConfig.Namespaces, ns) {
return fmt.Errorf(
"can not use rackConfig.RollingUpdateBatchSize when there is any non-rack enabled namespace %s", ns,
)
}

if nsConf.noOfRacksForNamespaces <= 1 {
return fmt.Errorf(
"can not use rackConfig.RollingUpdateBatchSize when namespace `%s` is configured in only one rack",
ns,
)
}
if err := c.validateBatchSize(c.Spec.RackConfig.RollingUpdateBatchSize,
"spec.rackConfig.rollingUpdateBatchSize"); err != nil {
return err
}

if nsConf.replicationFactor <= 1 {
return fmt.Errorf(
"can not use rackConfig.RollingUpdateBatchSize when namespace `%s` is configured with replication-factor 1",
ns,
)
}
}
// Validate batch scaleDown param
if err := c.validateBatchSize(c.Spec.RackConfig.ScaleDownBatchSize,
"spec.rackConfig.scaleDownBatchSize"); err != nil {
return err
}

// Validate MaxIgnorablePods param
Expand All @@ -652,11 +629,11 @@ type nsConf struct {
replicationFactor int
}

func (c *AerospikeCluster) getNsConfsForNamespaces() map[string]nsConf {
func getNsConfForNamespaces(rackConfig RackConfig) map[string]nsConf {
nsConfs := map[string]nsConf{}

for idx := range c.Spec.RackConfig.Racks {
rack := &c.Spec.RackConfig.Racks[idx]
for idx := range rackConfig.Racks {
rack := &rackConfig.Racks[idx]
nsList := rack.AerospikeConfig.Value["namespaces"].([]interface{})

for _, nsInterface := range nsList {
Expand Down Expand Up @@ -2173,6 +2150,61 @@ func (c *AerospikeCluster) validateNetworkPolicy(namespace string) error {
return nil
}

func (c *AerospikeCluster) validateBatchSize(batchSize *intstr.IntOrString, fieldPath string) error {
if batchSize == nil {
return nil
}

if err := validateIntOrStringField(batchSize, fieldPath); err != nil {
return err
}

validateRacksForBatchSize := func(rackConfig RackConfig) error {
if len(rackConfig.Racks) < 2 {
return fmt.Errorf("can not use %s when number of racks is less than two", fieldPath)
}

nsConfsNamespaces := getNsConfForNamespaces(rackConfig)
for ns, nsConf := range nsConfsNamespaces {
if !isNameExist(rackConfig.Namespaces, ns) {
return fmt.Errorf(
"can not use %s when there is any non-rack enabled namespace %s", fieldPath, ns,
)
}

if nsConf.noOfRacksForNamespaces <= 1 {
return fmt.Errorf(
"can not use %s when namespace `%s` is configured in only one rack", fieldPath, ns,
)
}

if nsConf.replicationFactor <= 1 {
return fmt.Errorf(
"can not use %s when namespace `%s` is configured with replication-factor 1", fieldPath,
ns,
)
}
}

return nil
}

// validate rackConf from spec
if err := validateRacksForBatchSize(c.Spec.RackConfig); err != nil {
return err
}

// If the status is not nil, validate rackConf from status to restrict batch-size update
// when old rackConfig is not valid for batch-size
if c.Status.AerospikeConfig != nil {
if err := validateRacksForBatchSize(c.Status.RackConfig); err != nil {
return fmt.Errorf("status invalid for %s: update, %v", fieldPath, err)
}
}

return nil
}

func validateIntOrStringField(value *intstr.IntOrString, fieldPath string) error {
randomNumber := 100
// Just validate if value is valid number or string.
Expand Down
5 changes: 5 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions config/crd/bases/asdb.aerospike.com_aerospikeclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8686,6 +8686,13 @@ spec:
description: RollingUpdateBatchSize is the percentage/number of
rack pods that will be restarted simultaneously
x-kubernetes-int-or-string: true
scaleDownBatchSize:
anyOf:
- type: integer
- type: string
description: ScaleDownBatchSize is the percentage/number of rack
pods that will be scaled down simultaneously
x-kubernetes-int-or-string: true
type: object
rosterNodeBlockList:
description: RosterNodeBlockList is a list of blocked nodeIDs from
Expand Down Expand Up @@ -18134,6 +18141,13 @@ spec:
description: RollingUpdateBatchSize is the percentage/number of
rack pods that will be restarted simultaneously
x-kubernetes-int-or-string: true
scaleDownBatchSize:
anyOf:
- type: integer
- type: string
description: ScaleDownBatchSize is the percentage/number of rack
pods that will be scaled down simultaneously
x-kubernetes-int-or-string: true
type: object
resources:
description: 'Define resources requests and limits for Aerospike Server
Expand Down
4 changes: 2 additions & 2 deletions controllers/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,10 +893,10 @@ func (r *SingleClusterReconciler) getClusterPodList() (
return podList, nil
}

func (r *SingleClusterReconciler) isAnyPodInImageFailedState(podList []corev1.Pod, ignorablePodNames sets.Set[string],
func (r *SingleClusterReconciler) isAnyPodInImageFailedState(podList []*corev1.Pod, ignorablePodNames sets.Set[string],
) bool {
for idx := range podList {
pod := &podList[idx]
pod := podList[idx]
if ignorablePodNames.Has(pod.Name) {
continue
}
Expand Down
94 changes: 56 additions & 38 deletions controllers/rack.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,12 +670,12 @@ func (r *SingleClusterReconciler) scaleUpRack(

// No need for this? But if image is bad then new pod will also come up
// with bad node.
podList, err := r.getRackPodList(rackState.Rack.ID)
podList, err := r.getOrderedRackPodList(rackState.Rack.ID)
if err != nil {
return found, reconcileError(fmt.Errorf("failed to list pods: %v", err))
}

if r.isAnyPodInImageFailedState(podList.Items, ignorablePodNames) {
if r.isAnyPodInImageFailedState(podList, ignorablePodNames) {
return found, reconcileError(fmt.Errorf("cannot scale up AerospikeCluster. A pod is already in failed state"))
}

Expand All @@ -686,8 +686,8 @@ func (r *SingleClusterReconciler) scaleUpRack(

// Ensure none of the to be launched pods are active.
for _, newPodName := range newPodNames {
for idx := range podList.Items {
if podList.Items[idx].Name == newPodName {
for idx := range podList {
if podList[idx].Name == newPodName {
return found, reconcileError(
fmt.Errorf(
"pod %s yet to be launched is still present",
Expand Down Expand Up @@ -805,7 +805,7 @@ func (r *SingleClusterReconciler) upgradeRack(statefulSet *appsv1.StatefulSet, r
podsBatchList[0] = podsToUpgrade
} else {
// Create batch of pods
podsBatchList = r.getPodsBatchToRestart(podsToUpgrade, len(podList))
podsBatchList = r.getPodBatch(podsToUpgrade, len(podList))
sud82 marked this conversation as resolved.
Show resolved Hide resolved
}

if len(podsBatchList) > 0 {
Expand Down Expand Up @@ -871,8 +871,8 @@ func (r *SingleClusterReconciler) scaleDownRack(
}

r.Log.Info(
"ScaleDown AerospikeCluster statefulset", "desiredSz", desiredSize,
"currentSz", *found.Spec.Replicas,
"ScaleDown AerospikeCluster statefulset", "desiredSize", desiredSize,
"currentSize", *found.Spec.Replicas,
)
r.Recorder.Eventf(
r.aeroCluster, corev1.EventTypeNormal, "RackScaleDown",
Expand All @@ -881,32 +881,53 @@ func (r *SingleClusterReconciler) scaleDownRack(
desiredSize,
)

oldPodList, err := r.getRackPodList(rackState.Rack.ID)
oldPodList, err := r.getOrderedRackPodList(rackState.Rack.ID)
if err != nil {
return found, reconcileError(fmt.Errorf("failed to list pods: %v", err))
}

if r.isAnyPodInImageFailedState(oldPodList.Items, ignorablePodNames) {
if r.isAnyPodInImageFailedState(oldPodList, ignorablePodNames) {
return found, reconcileError(fmt.Errorf("cannot scale down AerospikeCluster. A pod is already in failed state"))
}

// code flow will reach this stage only when found.Spec.Replicas > desiredSize

// maintain list of removed pods. It will be used for alumni-reset and tip-clear
var pod *corev1.Pod
// Code flow will reach this stage only when found.Spec.Replicas > desiredSize
// Maintain a list of removed pods. It will be used for alumni-reset and tip-clear

policy := r.getClientPolicy()
diffPods := *found.Spec.Replicas - desiredSize

podsBatchList := r.getPodBatch(oldPodList[:diffPods], len(oldPodList))
sud82 marked this conversation as resolved.
Show resolved Hide resolved

// Handle one batch
podsBatch := podsBatchList[0]

r.Log.Info(
"Calculated batch for Pod scale-down",
"rackPodList", getPodNames(oldPodList),
"podsBatch", getPodNames(podsBatch),
"scaleDownBatchSize", r.aeroCluster.Spec.RackConfig.ScaleDownBatchSize,
)

var (
runningPods []*corev1.Pod
isAnyPodRunningAndReady bool
)

podName := getSTSPodName(found.Name, *found.Spec.Replicas-1)
for idx := range podsBatch {
if utils.IsPodRunningAndReady(podsBatch[idx]) {
runningPods = append(runningPods, podsBatch[idx])
isAnyPodRunningAndReady = true

pod = utils.GetPod(podName, oldPodList.Items)
continue
}

isPodRunningAndReady := utils.IsPodRunningAndReady(pod)
ignorablePodNames.Insert(podsBatch[idx].Name)
}

// Ignore safe stop check if pod is not running.
// Ignore safe stop check if all pods in the batch are not running.
// Ignore migrate-fill-delay if pod is not running. Deleting this pod will not lead to any migration.
if isPodRunningAndReady {
if res := r.waitForMultipleNodesSafeStopReady([]*corev1.Pod{pod}, ignorablePodNames); !res.isSuccess {
if isAnyPodRunningAndReady {
if res := r.waitForMultipleNodesSafeStopReady(runningPods, ignorablePodNames); !res.isSuccess {
// The pod is running and is unsafe to terminate.
return found, res
}
Expand All @@ -916,15 +937,14 @@ func (r *SingleClusterReconciler) scaleDownRack(
// This check ensures that migrate-fill-delay is not set while processing failed racks.
// setting migrate-fill-delay will fail if there are any failed pod
if res := r.setMigrateFillDelay(
policy, &rackState.Rack.AerospikeConfig, true,
ignorablePodNames.Insert(pod.Name),
policy, &rackState.Rack.AerospikeConfig, true, ignorablePodNames,
); !res.isSuccess {
return found, res
}
}

// Update new object with new size
newSize := *found.Spec.Replicas - 1
newSize := *found.Spec.Replicas - int32(len(podsBatch))
found.Spec.Replicas = &newSize

if err = r.Client.Update(
Expand All @@ -938,9 +958,10 @@ func (r *SingleClusterReconciler) scaleDownRack(
)
}

// No need for these checks if pod was not running.
// These checks will fail if there is any other pod in failed state.
if isPodRunningAndReady {
// Consider these checks if any pod in the batch is running and ready.
// If all the pods are not running then we can safely ignore these checks.
// These checks will fail if there is any other pod in failed state outside the batch.
if isAnyPodRunningAndReady {
// Wait for pods to get terminated
if err = r.waitForSTSToBeReady(found, ignorablePodNames); err != nil {
r.Log.Error(err, "Failed to wait for statefulset to be ready")
Expand Down Expand Up @@ -989,18 +1010,20 @@ func (r *SingleClusterReconciler) scaleDownRack(

found = nFound

if err := r.cleanupPods([]string{podName}, rackState); err != nil {
podNames := getPodNames(podsBatch)

if err := r.cleanupPods(podNames, rackState); err != nil {
return nFound, reconcileError(
fmt.Errorf(
"failed to cleanup pod %s: %v", podName, err,
"failed to cleanup pod %s: %v", podNames, err,
),
)
}

r.Log.Info("Pod Removed", "podName", podName)
r.Log.Info("Pod Removed", "podNames", podNames)
r.Recorder.Eventf(
r.aeroCluster, corev1.EventTypeNormal, "PodDeleted",
"[rack-%d] Deleted Pod %s", rackState.Rack.ID, pod.Name,
"[rack-%d] Deleted Pods %s", rackState.Rack.ID, podNames,
)

r.Recorder.Eventf(
Expand Down Expand Up @@ -1043,12 +1066,7 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet,
}
}

pods := make([]corev1.Pod, 0, len(podList))
for idx := range podList {
pods = append(pods, *podList[idx])
}

if len(failedPods) != 0 && r.isAnyPodInImageFailedState(pods, ignorablePodNames) {
if len(failedPods) != 0 && r.isAnyPodInImageFailedState(podList, ignorablePodNames) {
return found, reconcileError(
fmt.Errorf(
"cannot Rolling restart AerospikeCluster. " +
Expand Down Expand Up @@ -1098,7 +1116,7 @@ func (r *SingleClusterReconciler) rollingRestartRack(found *appsv1.StatefulSet,
podsBatchList[0] = podsToRestart
} else {
// Create batch of pods
podsBatchList = r.getPodsBatchToRestart(podsToRestart, len(podList))
podsBatchList = r.getPodBatch(podsToRestart, len(podList))
}

// Restart batch of pods
Expand Down Expand Up @@ -1187,7 +1205,7 @@ func (r *SingleClusterReconciler) handleK8sNodeBlockListPods(statefulSet *appsv1
}
}

podsBatchList := r.getPodsBatchToRestart(podsToRestart, len(podList))
podsBatchList := r.getPodBatch(podsToRestart, len(podList))

// Restart batch of pods
if len(podsBatchList) > 0 {
Expand Down Expand Up @@ -1844,7 +1862,7 @@ func getOriginalPath(path string) string {
return path
}

func (r *SingleClusterReconciler) getPodsBatchToRestart(podList []*corev1.Pod, rackSize int) [][]*corev1.Pod {
func (r *SingleClusterReconciler) getPodBatch(podList []*corev1.Pod, rackSize int) [][]*corev1.Pod {
// Error is already handled in validation
rollingUpdateBatchSize, _ := intstr.GetScaledValueFromIntOrPercent(
r.aeroCluster.Spec.RackConfig.RollingUpdateBatchSize, rackSize, false,
Expand Down
Loading
Loading