Skip to content

Commit

Permalink
support vscale
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyelei committed Oct 15, 2024
1 parent 44d7459 commit 22689c1
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 39 deletions.
28 changes: 24 additions & 4 deletions pkg/action/template/cluster_operations_template.cue
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ options: {
component: string
instance: string
componentNames: [...string]
instanceTPLNames: [...string]
rebuildInstanceFrom: [
...{
componentName: string
Expand Down Expand Up @@ -140,21 +141,40 @@ content: {
verticalScaling: [ for _, cName in options.componentNames {
componentName: cName
requests: {
if options.memory != "" {
if options.memory != "" && len(options.instanceTPLNames) == 0 {
memory: options.memory
}
if options.cpu != "" {
if options.cpu != "" && len(options.instanceTPLNames) == 0 {
cpu: options.cpu
}
}
limits: {
if options.memory != "" {
if options.memory != "" && len(options.instanceTPLNames) == 0 {
memory: options.memory
}
if options.cpu != "" {
if options.cpu != "" && len(options.instanceTPLNames) == 0 {
cpu: options.cpu
}
}
instances: [ for _, tplName in options.instanceTPLNames {
name: tplName
requests: {
if options.memory != "" {
memory: options.memory
}
if options.cpu != "" {
cpu: options.cpu
}
}
limits: {
if options.memory != "" {
memory: options.memory
}
if options.cpu != "" {
cpu: options.cpu
}
}
}]
}]
}
if options.type == "Reconfiguring" {
Expand Down
23 changes: 23 additions & 0 deletions pkg/cluster/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,29 @@ func GetDefaultCompName(cd *appsv1alpha1.ClusterDefinition) (string, error) {
return "", fmt.Errorf("failed to get the default component definition name")
}

// IsShardingComponent reports whether compName refers to a sharding
// definition (rather than a plain component) in the cluster spec.
func IsShardingComponent(cluster *kbappsv1.Cluster, compName string) bool {
	// The spec lookup returns nil when no sharding with that name exists,
	// so the nil check is the whole predicate.
	return cluster.Spec.GetShardingByName(compName) != nil
}

// ComponentNameLabelKey returns the label key that selects resources of
// compName: the sharding-name key for sharding components, otherwise the
// plain component-name key.
func ComponentNameLabelKey(cluster *kbappsv1.Cluster, compName string) string {
	if IsShardingComponent(cluster, compName) {
		return constant.KBAppShardingNameLabelKey
	}
	return constant.KBAppComponentLabelKey
}

// GetComponentSpec resolves compName to its ClusterComponentSpec. For a
// sharding component it returns the sharding's template spec; otherwise it
// falls back to the plain component lookup (nil when compName is unknown).
func GetComponentSpec(cluster *kbappsv1.Cluster, compName string) *kbappsv1.ClusterComponentSpec {
	if sharding := cluster.Spec.GetShardingByName(compName); sharding != nil {
		return &sharding.Template
	}
	return cluster.Spec.GetComponentByName(compName)
}

func GetClusterByName(dynamic dynamic.Interface, name string, namespace string) (*kbappsv1.Cluster, error) {
cluster := &kbappsv1.Cluster{}
if err := util.GetK8SClientObject(dynamic, cluster, types.ClusterGVR(), namespace, name); err != nil {
Expand Down
84 changes: 51 additions & 33 deletions pkg/cmd/cluster/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type OperationsOptions struct {
AutoApprove bool `json:"-"`
ComponentNames []string `json:"componentNames,omitempty"`
OpsRequestName string `json:"opsRequestName"`
InstanceTPLNames []string `json:"instanceTPLNames,omitempty"`
TTLSecondsAfterSucceed int `json:"ttlSecondsAfterSucceed"`
Force bool `json:"force"`

Expand Down Expand Up @@ -158,6 +159,7 @@ func (o *OperationsOptions) addCommonFlags(cmd *cobra.Command, f cmdutil.Factory
cmd.Flags().IntVar(&o.TTLSecondsAfterSucceed, "ttlSecondsAfterSucceed", 0, "Time to live after the OpsRequest succeed")
cmd.Flags().StringVar(&o.DryRun, "dry-run", "none", `Must be "client", or "server". If with client strategy, only print the object that would be sent, and no data is actually sent. If with server strategy, submit the server-side request, but no data is persistent.`)
cmd.Flags().Lookup("dry-run").NoOptDefVal = "unchanged"
cmd.Flags().BoolVar(&o.EditBeforeCreate, "edit", o.EditBeforeCreate, "Edit the API resource before creating")
if o.HasComponentNamesFlag {
flags.AddComponentsFlag(f, cmd, &o.ComponentNames, "Component names to this operations")
}
Expand Down Expand Up @@ -338,55 +340,68 @@ func (o *OperationsOptions) validateUpgrade(cluster *appsv1.Cluster) error {
}
return nil
}
return o.handleComponentOps(cluster, validateCompSpec)
}

func (o *OperationsOptions) handleComponentOps(cluster *appsv1.Cluster, handleF func(compSpec appsv1.ClusterComponentSpec, compName string) error) error {
for _, v := range cluster.Spec.ComponentSpecs {
if !slices.Contains(o.ComponentNames, v.Name) {
continue
}
if err := validateCompSpec(v, v.Name); err != nil {
if err := handleF(v, v.Name); err != nil {
return err
}
}
for _, v := range cluster.Spec.ShardingSpecs {
if !slices.Contains(o.ComponentNames, v.Name) {
continue
}
if err := validateCompSpec(v.Template, v.Name); err != nil {
if err := handleF(v.Template, v.Name); err != nil {
return err
}
}
return nil
}

func (o *OperationsOptions) validateVolumeExpansion() error {
func (o *OperationsOptions) validateVolumeExpansion(clusterObj *appsv1.Cluster) error {
if len(o.VCTNames) == 0 {
return fmt.Errorf("missing volume-claim-templates")
}
if len(o.Storage) == 0 {
return fmt.Errorf("missing storage")
}

for _, cName := range o.ComponentNames {
for _, vctName := range o.VCTNames {
labels := fmt.Sprintf("%s=%s,%s=%s,%s=%s",
constant.AppInstanceLabelKey, o.Name,
constant.KBAppComponentLabelKey, cName,
cluster.ComponentNameLabelKey(clusterObj, cName), cName,
constant.VolumeClaimTemplateNameLabelKey, vctName,
)
pvcs, err := o.Client.CoreV1().PersistentVolumeClaims(o.Namespace).List(context.Background(),
metav1.ListOptions{LabelSelector: labels, Limit: 1})
metav1.ListOptions{LabelSelector: labels, Limit: 20})
if err != nil {
return err
}
if len(pvcs.Items) == 0 {
continue
var pvc *corev1.PersistentVolumeClaim
for _, pvcItem := range pvcs.Items {
if pvcItem.Labels[constant.KBAppComponentInstanceTemplateLabelKey] == "" {
pvc = &pvcItem
break
}
}
if pvc == nil {
return nil
}
pvc := pvcs.Items[0]
specStorage := pvc.Spec.Resources.Requests.Storage()
statusStorage := pvc.Status.Capacity.Storage()
targetStorage, err := resource.ParseQuantity(o.Storage)
if err != nil {
return fmt.Errorf("cannot parse '%v', %v", o.Storage, err)
}
if targetStorage.Cmp(*statusStorage) < 0 {
return fmt.Errorf(`requested storage size of volumeClaimTemplate "%s" can not less than status.capacity.storage "%s" `,
vctName, statusStorage.String())
}
// determine whether the opsRequest is a recovery action for volume expansion failure
if specStorage.Cmp(targetStorage) > 0 &&
statusStorage.Cmp(targetStorage) <= 0 {
Expand All @@ -404,7 +419,7 @@ func (o *OperationsOptions) validateVScale(cluster *appsv1.Cluster) error {
return fmt.Errorf("cpu or memory must be specified")
}

fillResource := func(comp *appsv1.ClusterComponentSpec) error {
fillResource := func(comp appsv1.ClusterComponentSpec, compName string) error {
requests := make(corev1.ResourceList)
if o.CPU != "" {
cpu, err := resource.ParseQuantity(o.CPU)
Expand All @@ -420,23 +435,9 @@ func (o *OperationsOptions) validateVScale(cluster *appsv1.Cluster) error {
}
requests[corev1.ResourceMemory] = memory
}
requests.DeepCopyInto(&comp.Resources.Requests)
requests.DeepCopyInto(&comp.Resources.Limits)
return nil
}

for _, name := range o.ComponentNames {
for _, comp := range cluster.Spec.ComponentSpecs {
if comp.Name != name {
continue
}
if err := fillResource(&comp); err != nil {
return err
}
}
}

return nil
return o.handleComponentOps(cluster, fillResource)
}

// Validate command flags or args is legal
Expand All @@ -459,7 +460,7 @@ func (o *OperationsOptions) Validate() error {
}
switch o.OpsType {
case opsv1alpha1.VolumeExpansionType:
if err = o.validateVolumeExpansion(); err != nil {
if err = o.validateVolumeExpansion(cluster); err != nil {
return err
}
case opsv1alpha1.UpgradeType:
Expand All @@ -485,15 +486,29 @@ func (o *OperationsOptions) Validate() error {
return nil
}

func (o *OperationsOptions) validateComponents(cluster *appsv1.Cluster) error {
func (o *OperationsOptions) validateComponents(clusterObj *appsv1.Cluster) error {
validateInstances := func(instances []appsv1.InstanceTemplate, componentName string) error {
for _, v := range o.InstanceTPLNames {
var exist bool
for _, ins := range instances {
if v == ins.Name {
exist = true
break
}
}
if !exist {
return fmt.Errorf(`can not found the instance template "%s" in the component "%s"`, v, componentName)
}
}
return nil
}
for _, compName := range o.ComponentNames {
compSpec := cluster.Spec.GetComponentByName(compName)
if compSpec != nil {
continue
compSpec := cluster.GetComponentSpec(clusterObj, compName)
if compSpec == nil {
return fmt.Errorf(`can not found the component "%s" in cluster "%s"`, compName, clusterObj.Name)
}
shardingSpec := cluster.Spec.GetShardingByName(compName)
if shardingSpec == nil {
return fmt.Errorf(`can not found the component "%s" in cluster "%s"`, compName, cluster.Name)
if err := validateInstances(compSpec.Instances, compName); err != nil {
return err
}
}
return nil
Expand Down Expand Up @@ -820,6 +835,8 @@ func NewVerticalScalingCmd(f cmdutil.Factory, streams genericiooptions.IOStreams
},
}
o.addCommonFlags(cmd, f)
cmd.Flags().StringSliceVar(&o.InstanceTPLNames, "instance-tpl", nil, "vertically scaling the specified instance template in the specified component")
util.CheckErr(flags.CompletedInstanceTemplatesFlag(cmd, f, "instance-tpl"))
cmd.Flags().StringVar(&o.CPU, "cpu", "", "Request and limit size of component cpu")
cmd.Flags().StringVar(&o.Memory, "memory", "", "Request and limit size of component memory")
cmd.Flags().BoolVar(&o.AutoApprove, "auto-approve", false, "Skip interactive approval before vertically scaling the cluster")
Expand Down Expand Up @@ -880,6 +897,7 @@ func NewVolumeExpansionCmd(f cmdutil.Factory, streams genericiooptions.IOStreams
cmdutil.CheckErr(o.Run())
},
}
// TODO: supports to volume expand the vcts of the instance templates?
o.addCommonFlags(cmd, f)
cmd.Flags().StringSliceVarP(&o.VCTNames, "volume-claim-templates", "t", nil, "VolumeClaimTemplate names in components (required)")
cmd.Flags().StringVar(&o.Storage, "storage", "", "Volume storage size (required)")
Expand Down
28 changes: 26 additions & 2 deletions pkg/util/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,10 @@ func autoCompleteClusterComponent(cmd *cobra.Command, f cmdutil.Factory, flag st

func CompletedInstanceFlag(cmd *cobra.Command, f cmdutil.Factory, flag string) error {
autoComplete := func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
var clusterName string
if len(args) == 0 {
return nil, cobra.ShellCompDirectiveNoFileComp
}
clusterName = args[0]
clusterName := args[0]
namespace, _, _ := f.ToRawKubeConfigLoader().Namespace()
cli, _ := f.KubernetesClientSet()
pods, err := cli.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{
Expand All @@ -257,3 +256,28 @@ func CompletedInstanceFlag(cmd *cobra.Command, f cmdutil.Factory, flag string) e
}
return cmd.RegisterFlagCompletionFunc(flag, autoComplete)
}

// CompletedInstanceTemplatesFlag registers shell completion for the given
// flag, suggesting every instance-template name declared by the target
// cluster's component specs and sharding templates.
func CompletedInstanceTemplatesFlag(cmd *cobra.Command, f cmdutil.Factory, flag string) error {
	return cmd.RegisterFlagCompletionFunc(flag, func(c *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
		if len(args) == 0 {
			return nil, cobra.ShellCompDirectiveNoFileComp
		}
		ns, _, _ := f.ToRawKubeConfigLoader().Namespace()
		dynamicClient, _ := f.DynamicClient()
		clusterObj := &appsv1alpha1.Cluster{}
		// Best-effort fetch: on failure the spec stays empty and no
		// template names are suggested.
		_ = util.GetK8SClientObject(dynamicClient, clusterObj, types.ClusterGVR(), ns, util.GetClusterNameFromArgsOrFlag(c, args))
		var names []string
		for _, compSpec := range clusterObj.Spec.ComponentSpecs {
			for _, tpl := range compSpec.Instances {
				names = append(names, tpl.Name)
			}
		}
		for _, shardingSpec := range clusterObj.Spec.ShardingSpecs {
			for _, tpl := range shardingSpec.Template.Instances {
				names = append(names, tpl.Name)
			}
		}
		return names, cobra.ShellCompDirectiveNoFileComp
	})
}

0 comments on commit 22689c1

Please sign in to comment.