Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
leon-inf committed Nov 14, 2024
1 parent 6a3d8ce commit 871a813
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 321 deletions.
7 changes: 5 additions & 2 deletions controllers/apps/cluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ var _ = Describe("Cluster Controller", func() {
Create(&testCtx).
GetObject()

By("Create a bpt obj")
testdp.CreateBackupPolicyTpl(&testCtx, compDefObj.Name)

By("Wait objects available")
Eventually(testapps.CheckObj(&testCtx, client.ObjectKeyFromObject(compDefObj),
func(g Gomega, compDef *appsv1.ComponentDefinition) {
Expand Down Expand Up @@ -303,7 +306,7 @@ var _ = Describe("Cluster Controller", func() {
ml, client.InNamespace(clusterKey.Namespace))).Should(HaveLen(defaultShardCount))

By("checking backup policy")
backupPolicyName := generateBackupPolicyName(clusterKey.Name, compTplName, false)
backupPolicyName := generateBackupPolicyName(clusterKey.Name, compTplName)
backupPolicyKey := client.ObjectKey{Name: backupPolicyName, Namespace: clusterKey.Namespace}
Eventually(testapps.CheckObj(&testCtx, backupPolicyKey, func(g Gomega, bp *dpv1alpha1.BackupPolicy) {
g.Expect(bp.Spec.Targets).Should(HaveLen(defaultShardCount))
Expand Down Expand Up @@ -1068,7 +1071,7 @@ var _ = Describe("Cluster Controller", func() {
}

By("checking backup policy")
backupPolicyName := generateBackupPolicyName(clusterKey.Name, defaultCompName, false)
backupPolicyName := generateBackupPolicyName(clusterKey.Name, defaultCompName)
backupPolicyKey := client.ObjectKey{Name: backupPolicyName, Namespace: clusterKey.Namespace}
backupPolicy := &dpv1alpha1.BackupPolicy{}
Eventually(testapps.CheckObjExists(&testCtx, backupPolicyKey, backupPolicy, true)).Should(Succeed())
Expand Down
26 changes: 3 additions & 23 deletions controllers/apps/component_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ var _ = Describe("Component Controller", func() {
testapps.ClearResourcesWithRemoveFinalizerOption(&testCtx, generics.PodSignature, true, inNS, ml)
testapps.ClearResourcesWithRemoveFinalizerOption(&testCtx, generics.BackupSignature, true, inNS, ml)
testapps.ClearResourcesWithRemoveFinalizerOption(&testCtx, generics.BackupPolicySignature, true, inNS, ml)
testapps.ClearResourcesWithRemoveFinalizerOption(&testCtx, generics.VolumeSnapshotSignature, true, inNS)
// non-namespaced
testapps.ClearResources(&testCtx, generics.BackupPolicyTemplateSignature, ml)
testapps.ClearResources(&testCtx, generics.ActionSetSignature, ml)
Expand Down Expand Up @@ -603,9 +602,6 @@ var _ = Describe("Component Controller", func() {
AddVolumeClaimTemplate(testapps.LogVolumeName, pvcSpec)
})

// REVIEW: this test flow, wait for running phase?
testk8s.MockEnableVolumeSnapshot(&testCtx, testk8s.DefaultStorageClassName)

horizontalScale(int(updatedReplicas), testk8s.DefaultStorageClassName, compDefName)
}

Expand Down Expand Up @@ -1557,11 +1553,6 @@ var _ = Describe("Component Controller", func() {
})).ShouldNot(HaveOccurred())
Eventually(testapps.GetClusterComponentPhase(&testCtx, clusterKey, compName)).Should(Equal(kbappsv1.RunningComponentPhase))

By("the restore container has been removed from init containers")
Eventually(testapps.CheckObj(&testCtx, client.ObjectKeyFromObject(its), func(g Gomega, tmpIts *workloads.InstanceSet) {
g.Expect(tmpIts.Spec.Template.Spec.InitContainers).Should(BeEmpty())
})).Should(Succeed())

By("clean up annotations after cluster running")
Eventually(testapps.CheckObj(&testCtx, clusterKey, func(g Gomega, tmpCluster *kbappsv1.Cluster) {
g.Expect(tmpCluster.Status.Phase).Should(Equal(kbappsv1.RunningClusterPhase))
Expand Down Expand Up @@ -1746,7 +1737,7 @@ var _ = Describe("Component Controller", func() {
testHorizontalScale(defaultCompName, compDefObj.Name, 3, 0)
})

Context("with different backup methods", func() {
Context("scale-out multiple components", func() {
createNWaitClusterObj := func(components map[string]string,
processor func(compName string, factory *testapps.MockClusterFactory),
withFixedName ...bool) {
Expand All @@ -1773,7 +1764,8 @@ var _ = Describe("Component Controller", func() {
waitForCreatingResourceCompletely(clusterKey, compNames...)
}

testMultiCompHScale := func() {
// TODO
It("h-scale with data actions", func() {
compNameNDef := map[string]string{
fmt.Sprintf("%s-0", defaultCompName): compDefObj.Name,
fmt.Sprintf("%s-1", defaultCompName): compDefObj.Name,
Expand All @@ -1790,17 +1782,6 @@ var _ = Describe("Component Controller", func() {
}, false)

horizontalScale(int(updatedReplicas), testk8s.DefaultStorageClassName, compDefObj.Name)
}

It("h-scale with volume snapshot", func() {
testk8s.MockEnableVolumeSnapshot(&testCtx, testk8s.DefaultStorageClassName)
testMultiCompHScale()
})

// TODO
It("h-scale with data actions", func() {
testk8s.MockDisableVolumeSnapshot(&testCtx, testk8s.DefaultStorageClassName)
testMultiCompHScale()
})
})
})
Expand All @@ -1825,7 +1806,6 @@ var _ = Describe("Component Controller", func() {

It("scale-out", func() {
testVolumeExpansion(compDefObj, defaultCompName, mockStorageClass)
testk8s.MockEnableVolumeSnapshot(&testCtx, mockStorageClass.Name)
horizontalScale(5, mockStorageClass.Name, compDefObj.Name)
})
})
Expand Down
31 changes: 7 additions & 24 deletions controllers/apps/transformer_cluster_backup_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,6 @@ func (r *clusterBackupPolicyTransformer) Transform(ctx graph.TransformContext, d
if backupPolicy == nil {
return
}
if bpBuilder.isHScaleTPL {
r.V(1).Info("Skip creating backup schedule for the h-scale backup policy template", "template", bpBuilder.backupPolicyTPL.Name)
return
}
// build the data protection backup schedule from the template.
oldBackupSchedule, newBackupSchedule := bpBuilder.transformBackupSchedule(backupPolicy)
// merge cluster backup configuration into the backup schedule.
Expand Down Expand Up @@ -120,8 +116,8 @@ func (r *clusterBackupPolicyTransformer) Transform(ctx graph.TransformContext, d
graphCli.DependOn(dag, backupPolicy, comps...)
}

transformBackupPolicyAndSchedule := func(bpt *dpv1alpha1.BackupPolicyTemplate, compSpec *appsv1.ClusterComponentSpec, componentName string, isSharding, isHScaleTPL bool) error {
bpBuilder := newBackupPolicyBuilder(r, compSpec, bpt, componentName, isSharding, isHScaleTPL)
transformBackupPolicyAndSchedule := func(bpt *dpv1alpha1.BackupPolicyTemplate, compSpec *appsv1.ClusterComponentSpec, componentName string, isSharding bool) error {
bpBuilder := newBackupPolicyBuilder(r, compSpec, bpt, componentName, isSharding)
policy := transformBackupPolicy(bpBuilder)
// only merge the first backupSchedule for the cluster backup.
transformBackupSchedule(bpBuilder, policy)
Expand All @@ -136,7 +132,7 @@ func (r *clusterBackupPolicyTransformer) Transform(ctx graph.TransformContext, d
if bpt == nil {
return nil
}
return transformBackupPolicyAndSchedule(bpt, compSpec, componentName, isSharding, false)
return transformBackupPolicyAndSchedule(bpt, compSpec, componentName, isSharding)
}

for i := range r.Cluster.Spec.ComponentSpecs {
Expand Down Expand Up @@ -173,7 +169,6 @@ type backupPolicyBuilder struct {
record.EventRecorder
logr.Logger
Cluster *appsv1.Cluster
isHScaleTPL bool
backupPolicyTPL *dpv1alpha1.BackupPolicyTemplate
compSpec *appsv1.ClusterComponentSpec
componentName string
Expand All @@ -184,8 +179,7 @@ func newBackupPolicyBuilder(r *clusterBackupPolicyTransformer,
compSpec *appsv1.ClusterComponentSpec,
backupPolicyTPL *dpv1alpha1.BackupPolicyTemplate,
componentName string,
isSharding,
isHScaleTPL bool) *backupPolicyBuilder {
isSharding bool) *backupPolicyBuilder {
return &backupPolicyBuilder{
Context: r.Context,
Client: r.Client,
Expand All @@ -195,14 +189,13 @@ func newBackupPolicyBuilder(r *clusterBackupPolicyTransformer,
compSpec: compSpec,
backupPolicyTPL: backupPolicyTPL,
componentName: componentName,
isHScaleTPL: isHScaleTPL,
isSharding: isSharding,
}
}

// transformBackupPolicy transforms backup policy template to backup policy.
func (r *backupPolicyBuilder) transformBackupPolicy() (*dpv1alpha1.BackupPolicy, *dpv1alpha1.BackupPolicy) {
backupPolicyName := generateBackupPolicyName(r.Cluster.Name, r.componentName, r.isHScaleTPL)
backupPolicyName := generateBackupPolicyName(r.Cluster.Name, r.componentName)
backupPolicy := &dpv1alpha1.BackupPolicy{}
if err := r.Client.Get(r.Context, client.ObjectKey{
Namespace: r.Cluster.Namespace,
Expand Down Expand Up @@ -639,16 +632,9 @@ func (r *backupPolicyBuilder) mergeClusterBackup(
return backupSchedule
}

func (r *backupPolicyBuilder) defaultPolicyAnnotationValue() string {
if r.isHScaleTPL {
return "false"
}
return trueVal
}

func (r *backupPolicyBuilder) buildAnnotations() map[string]string {
annotations := map[string]string{
dptypes.DefaultBackupPolicyAnnotationKey: r.defaultPolicyAnnotationValue(),
dptypes.DefaultBackupPolicyAnnotationKey: trueVal,
constant.BackupPolicyTemplateAnnotationKey: r.backupPolicyTPL.Name,
}
if r.backupPolicyTPL.Annotations[dptypes.ReconfigureRefAnnotationKey] != "" {
Expand Down Expand Up @@ -691,10 +677,7 @@ func (r *backupPolicyBuilder) buildTargetPodLabels(role string, fullCompName str
}

// generateBackupPolicyName generates the backup policy name which is created from backup policy template.
func generateBackupPolicyName(clusterName, componentName string, isHScaleTPL bool) string {
if isHScaleTPL {
return fmt.Sprintf("%s-%s-backup-policy-hscale", clusterName, componentName)
}
func generateBackupPolicyName(clusterName, componentName string) string {
return fmt.Sprintf("%s-%s-backup-policy", clusterName, componentName)
}

Expand Down
40 changes: 27 additions & 13 deletions controllers/apps/transformer_component_workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,9 @@ func (r *componentWorkloadOps) deletePVCs4ScaleIn(itsObj *workloads.InstanceSet)
}
pvc := corev1.PersistentVolumeClaim{}
if err := r.cli.Get(r.reqCtx.Ctx, pvcKey, &pvc, inDataContext4C()); err != nil {
if apierrors.IsNotFound(err) {
continue // the pvc is already deleted or not created
}
return err
}
// Since there are no order guarantee between updating ITS and deleting PVCs, if there is any error occurred
Expand All @@ -600,32 +603,43 @@ func (r *componentWorkloadOps) deletePVCs4ScaleIn(itsObj *workloads.InstanceSet)
}

func (r *componentWorkloadOps) scaleOut() error {
if r.synthesizeComp.LifecycleActions == nil {
return nil
}
dataDump := r.synthesizeComp.LifecycleActions.DataDump
dataLoad := r.synthesizeComp.LifecycleActions.DataLoad
if dataDump == nil || dataDump.Exec == nil || dataLoad == nil || dataLoad.Exec == nil {
return nil
}

// replicas in provisioning
provisioningReplicas, err := component.ReplicasInProvisioning(r.component)
if err != nil {
return err
}

// replicas to be created
newReplicas := r.desiredCompPodNameSet.Difference(r.runningItsPodNameSet).UnsortedList()

// update replicas of the workload
graphCli := model.NewGraphClient(r.cli)
v := graphCli.Do(r.dag, nil, r.protoITS, model.ActionUpdatePtr(), nil)

newReplicas := r.desiredCompPodNameSet.Difference(r.runningItsPodNameSet).UnsortedList()
hasDataActionDefined := func() bool {
if r.synthesizeComp.LifecycleActions == nil {
return false
}
dataDump := r.synthesizeComp.LifecycleActions.DataDump
dataLoad := r.synthesizeComp.LifecycleActions.DataLoad
if dataDump == nil || dataDump.Exec == nil || dataLoad == nil || dataLoad.Exec == nil {
return false
}
return true
}()

// build and assign data replication tasks
if err := func() error {
source, err := r.sourceReplica(dataDump)
if !hasDataActionDefined {
return nil
}

source, err := r.sourceReplica(r.synthesizeComp.LifecycleActions.DataDump)
if err != nil {
return err
}

replicas := append(newReplicas, provisioningReplicas...)
replicas := append(slices.Clone(newReplicas), provisioningReplicas...)
parameters, err := component.NewReplicaTask(r.synthesizeComp.FullCompName, r.synthesizeComp.Generation, source, replicas)
if err != nil {
return err
Expand All @@ -646,7 +660,7 @@ func (r *componentWorkloadOps) scaleOut() error {
return err
}

return component.NewReplicas(r.component, newReplicas)
return component.NewReplicas(r.component, newReplicas, hasDataActionDefined)
}

func (r *componentWorkloadOps) sourceReplica(dataDump *appsv1.Action) (*corev1.Pod, error) {
Expand Down
Loading

0 comments on commit 871a813

Please sign in to comment.