Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KO-198, KO-264: Migrate pods from the given list of k8s nodes #265

Merged
merged 16 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion api/v1/aerospikecluster_mutating_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"gomodules.xyz/jsonpatch/v2"
v1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
Expand Down Expand Up @@ -69,6 +70,12 @@ func (c *AerospikeCluster) Default(operation v1.Operation) admission.Response {
}

func (c *AerospikeCluster) setDefaults(asLog logr.Logger) error {
// Set maxUnavailable default to 1
if c.Spec.MaxUnavailable == nil {
maxUnavailable := intstr.FromInt(1)
c.Spec.MaxUnavailable = &maxUnavailable
}

// Set network defaults
c.Spec.AerospikeNetworkPolicy.setDefaults(c.ObjectMeta.Namespace)

Expand Down Expand Up @@ -504,7 +511,7 @@ func setDefaultNetworkConf(
)
}
// Override these sections
// TODO: These values lines will be replaces with runtime info by script in init-container
// TODO: These values lines will be replaced with runtime info by script in init-container
sud82 marked this conversation as resolved.
Show resolved Hide resolved
// See if we can get better way to make template
serviceDefaults := map[string]interface{}{}
srvPort := GetServicePort(configSpec)
Expand Down
62 changes: 59 additions & 3 deletions api/v1/aerospikecluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,21 @@ import (
lib "github.com/aerospike/aerospike-management-lib"
)

// AerospikeClusterPhase is the string-typed phase of an Aerospike cluster operation.
// The kubebuilder enum marker below lists the accepted values.
// +kubebuilder:validation:Enum=InProgress;Completed;Error
type AerospikeClusterPhase string

// These are the valid phases of Aerospike cluster.
const (
// AerospikeClusterInProgress means that operations on the Aerospike cluster are in progress.
// This phase denotes that changes are gradually rolling out to the cluster.
AerospikeClusterInProgress AerospikeClusterPhase = "InProgress"
// AerospikeClusterCompleted means the Aerospike cluster has been deployed/upgraded successfully and is ready to use.
sud82 marked this conversation as resolved.
Show resolved Hide resolved
AerospikeClusterCompleted AerospikeClusterPhase = "Completed"
// AerospikeClusterError means the Aerospike cluster is in error state because of some reason like misconfiguration,
// infra issues, etc.
AerospikeClusterError AerospikeClusterPhase = "Error"
)

// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.

// AerospikeClusterSpec defines the desired state of AerospikeCluster
Expand All @@ -37,6 +52,10 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability
// Aerospike cluster size
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Cluster Size"
Size int32 `json:"size"`
// MaxUnavailable is the number or percentage of pods that are allowed to be unavailable at the same time without
// disrupting the application. This value is used to create PodDisruptionBudget. Defaults to 1.
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Max Unavailable"
MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"`
// Aerospike server image
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Server Image"
Image string `json:"image"`
Expand Down Expand Up @@ -74,6 +93,9 @@ type AerospikeClusterSpec struct { //nolint:govet // for readability
// RosterNodeBlockList is a list of blocked nodeIDs from roster in a strong-consistency setup
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Roster Node BlockList"
RosterNodeBlockList []string `json:"rosterNodeBlockList,omitempty"`
// K8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods.
sud82 marked this conversation as resolved.
Show resolved Hide resolved
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Kubernetes Node BlockList"
K8sNodeBlockList []string `json:"k8sNodeBlockList,omitempty"`
}

type SeedsFinderServices struct {
Expand Down Expand Up @@ -564,9 +586,12 @@ type AerospikeStorageSpec struct { //nolint:govet // for readability
// BlockVolumePolicy contains default policies for block volumes.
BlockVolumePolicy AerospikePersistentVolumePolicySpec `json:"blockVolumePolicy,omitempty"`

// CleanupThreads contains maximum number of cleanup threads(dd or blkdiscard) per init container.
// CleanupThreads contains the maximum number of cleanup threads(dd or blkdiscard) per init container.
CleanupThreads int `json:"cleanupThreads,omitempty"`

// LocalStorageClasses contains a list of storage classes which provision local volumes.
LocalStorageClasses []string `json:"localStorageClasses,omitempty"`

// Volumes list to attach to created pods.
// +patchMergeKey=name
// +patchStrategy=merge
Expand All @@ -578,10 +603,12 @@ type AerospikeStorageSpec struct { //nolint:govet // for readability
// AerospikeClusterStatusSpec captures the current status of the cluster.
type AerospikeClusterStatusSpec struct { //nolint:govet // for readability
// Aerospike cluster size
// +operator-sdk:csv:customresourcedefinitions:type=status,displayName="Cluster Size"
Size int32 `json:"size,omitempty"`
// Aerospike server image
Image string `json:"image,omitempty"`
// MaxUnavailable is the number or percentage of pods that are allowed to be unavailable at the same time without
// disrupting the application. This value is used to create PodDisruptionBudget. Defaults to 1.
MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"`
// If set true then multiple pods can be created per Kubernetes Node.
// This will create a NodePort service for each Pod.
// NodePort, as the name implies, opens a specific port on all the Kubernetes Nodes,
Expand Down Expand Up @@ -627,6 +654,8 @@ type AerospikeClusterStatusSpec struct { //nolint:govet // for readability
SeedsFinderServices SeedsFinderServices `json:"seedsFinderServices,omitempty"`
// RosterNodeBlockList is a list of blocked nodeIDs from roster in a strong-consistency setup
RosterNodeBlockList []string `json:"rosterNodeBlockList,omitempty"`
// K8sNodeBlockList is a list of Kubernetes nodes which are not used for Aerospike pods.
K8sNodeBlockList []string `json:"k8sNodeBlockList,omitempty"`
}

// AerospikeClusterStatus defines the observed state of AerospikeCluster
Expand All @@ -646,7 +675,11 @@ type AerospikeClusterStatus struct { //nolint:govet // for readability
// This is a map, rather than the conventional list of named items, so that each pod can patch update its own
// status. The map key is the name of the pod.
// +patchStrategy=strategic
// +optional
sud82 marked this conversation as resolved.
Show resolved Hide resolved
Pods map[string]AerospikePodStatus `json:"pods" patchStrategy:"strategic"`

// Phase denotes the current phase of Aerospike cluster operation.
Phase AerospikeClusterPhase `json:"phase,omitempty"`
}

// AerospikeNetworkType specifies the type of network address to use.
Expand Down Expand Up @@ -834,9 +867,10 @@ type AerospikePodStatus struct { //nolint:govet // for readability
//+kubebuilder:storageversion
// +kubebuilder:printcolumn:name="Size",type=string,JSONPath=`.spec.size`
// +kubebuilder:printcolumn:name="Image",type=string,JSONPath=`.spec.image`
// +kubebuilder:printcolumn:name="MultiPodPerHost",type=boolean,JSONPath=`.spec.podSpec.MultiPodPerHost`
// +kubebuilder:printcolumn:name="MultiPodPerHost",type=boolean,JSONPath=`.spec.podSpec.multiPodPerHost`
// +kubebuilder:printcolumn:name="HostNetwork",type=boolean,JSONPath=`.spec.podSpec.hostNetwork`
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
// +kubebuilder:printcolumn:name="Phase",type="string",JSONPath=".status.phase"

// AerospikeCluster is the schema for the AerospikeCluster API
// +operator-sdk:csv:customresourcedefinitions:displayName="Aerospike Cluster",resources={{Service, v1},{Pod,v1},{StatefulSet,v1}}
Expand Down Expand Up @@ -869,6 +903,7 @@ func CopySpecToStatus(spec *AerospikeClusterSpec) (*AerospikeClusterStatusSpec,

status.Size = spec.Size
status.Image = spec.Image
status.MaxUnavailable = spec.MaxUnavailable

// Storage
statusStorage := AerospikeStorageSpec{}
Expand Down Expand Up @@ -949,6 +984,16 @@ func CopySpecToStatus(spec *AerospikeClusterSpec) (*AerospikeClusterStatusSpec,
status.RosterNodeBlockList = rosterNodeBlockList
}

if len(spec.K8sNodeBlockList) != 0 {
var k8sNodeBlockList []string

lib.DeepCopy(
&k8sNodeBlockList, &spec.K8sNodeBlockList,
)

status.K8sNodeBlockList = k8sNodeBlockList
}

return &status, nil
}

Expand All @@ -958,6 +1003,7 @@ func CopyStatusToSpec(status *AerospikeClusterStatusSpec) (*AerospikeClusterSpec

spec.Size = status.Size
spec.Image = status.Image
spec.MaxUnavailable = status.MaxUnavailable

// Storage
specStorage := AerospikeStorageSpec{}
Expand Down Expand Up @@ -1039,5 +1085,15 @@ func CopyStatusToSpec(status *AerospikeClusterStatusSpec) (*AerospikeClusterSpec
spec.RosterNodeBlockList = rosterNodeBlockList
}

if len(status.K8sNodeBlockList) != 0 {
var k8sNodeBlockList []string

lib.DeepCopy(
&k8sNodeBlockList, &status.K8sNodeBlockList,
)

spec.K8sNodeBlockList = k8sNodeBlockList
}

return &spec, nil
}
58 changes: 58 additions & 0 deletions api/v1/aerospikecluster_validating_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ func (c *AerospikeCluster) validate(aslog logr.Logger) error {
return fmt.Errorf("invalid cluster size 0")
}

// Validate MaxUnavailable for PodDisruptionBudget
if err := c.validateMaxUnavailable(); err != nil {
return err
}

// Validate Image version
version, err := GetImageVersion(c.Spec.Image)
if err != nil {
Expand Down Expand Up @@ -2190,3 +2195,56 @@ func validateIntOrStringField(value *intstr.IntOrString, fieldPath string) error

return nil
}

func (c *AerospikeCluster) validateMaxUnavailable() error {
// safe check for corner cases when mutation webhook somehow didn't work
if c.Spec.MaxUnavailable == nil {
return fmt.Errorf("maxUnavailable cannot be nil. Mutation webhook didn't work")
}

if err := validateIntOrStringField(c.Spec.MaxUnavailable, "spec.maxUnavailable"); err != nil {
return err
}

maxUnavailable := int(c.Spec.Size)
sud82 marked this conversation as resolved.
Show resolved Hide resolved

// If Size is 1, then ignore it for maxUnavailable calculation as it will anyway result in data loss
if maxUnavailable == 1 {
return nil
}

for idx := range c.Spec.RackConfig.Racks {
rack := &c.Spec.RackConfig.Racks[idx]
nsList := rack.AerospikeConfig.Value["namespaces"].([]interface{})

for _, nsInterface := range nsList {
rfInterface, exists := nsInterface.(map[string]interface{})["replication-factor"]
if !exists {
// Default RF is 2 if not given
maxUnavailable = 2
continue
}

rf, err := GetIntType(rfInterface)
if err != nil {
return fmt.Errorf("namespace replication-factor %v", err)
}

// If RF is 1, then ignore it for maxUnavailable calculation as it will anyway result in data loss
if rf == 1 {
continue
}

if rf < maxUnavailable {
maxUnavailable = rf
}
}
}

if c.Spec.MaxUnavailable.IntValue() >= maxUnavailable {
sud82 marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("maxUnavailable %s cannot be greater than or equal to %v",
c.Spec.MaxUnavailable.String(), maxUnavailable)
}

return nil
}
25 changes: 25 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading