From f4fab6a2a8fb6004ca5b58014c1f406d4392ea43 Mon Sep 17 00:00:00 2001 From: LiZhenCheng9527 Date: Wed, 10 Jan 2024 19:40:38 +0800 Subject: [PATCH 1/2] add handling for getting rollout status Signed-off-by: LiZhenCheng9527 --- pkg/fleet-manager/application/controller.go | 33 +++++ .../application/rollout_helper.go | 56 +++++++++ .../application/rollout_helper_test.go | 115 +++++++++++++----- 3 files changed, 176 insertions(+), 28 deletions(-) diff --git a/pkg/fleet-manager/application/controller.go b/pkg/fleet-manager/application/controller.go index 331f4df72..38be19093 100644 --- a/pkg/fleet-manager/application/controller.go +++ b/pkg/fleet-manager/application/controller.go @@ -261,6 +261,7 @@ func (a *ApplicationManager) reconcileSourceStatus(ctx context.Context, app *app // reconcileSyncStatus reconciles the sync status of the given application by finding all Kustomizations and HelmReleases associated with it, // and updating the sync status of each resource in the application's SyncStatus field. func (a *ApplicationManager) reconcileSyncStatus(ctx context.Context, app *applicationapi.Application) error { + log := ctrl.LoggerFrom(ctx) var syncStatus []*applicationapi.ApplicationSyncStatus // find all kustomization @@ -291,6 +292,38 @@ func (a *ApplicationManager) reconcileSyncStatus(ctx context.Context, app *appli syncStatus = append(syncStatus, helmReleaseStatus) } + // there only one fleet, so pre-fetch it here. + fleetKey := generateFleetKey(app) + fleet := &fleetapi.Fleet{} + if err := a.Client.Get(ctx, fleetKey, fleet); err != nil { + if apierrors.IsNotFound(err) { + log.Info("fleet does not exist", "fleet", fleetKey) + return nil + } + log.Error(err, "failed to find fleet", "fleet", fleetKey) + return err + } + + rolloutStatus := make(map[string]*applicationapi.RolloutStatus) + // Get rollout status from member clusters + for index, syncPolicy := range app.Spec.SyncPolicies { + policyName := generatePolicyName(app, index) + if syncPolicy.Rollout != nil { + status, _, err := a.reconcileRolloutSyncStatus(ctx, app, fleet, syncPolicy, policyName) + if err != nil { + return errors.Wrapf(err, "failed to reconcil rollout status") + } + rolloutStatus = mergeMap(status, rolloutStatus) + } + } + + // update rollout status + for index, policyStatus := range syncStatus { + if _, exist := rolloutStatus[policyStatus.Name]; exist { + syncStatus[index].RolloutStatus = rolloutStatus[policyStatus.Name] + } + } + app.Status.SyncStatus = syncStatus return nil } diff --git a/pkg/fleet-manager/application/rollout_helper.go b/pkg/fleet-manager/application/rollout_helper.go index 0eea24355..fc85fef15 100644 --- a/pkg/fleet-manager/application/rollout_helper.go +++ b/pkg/fleet-manager/application/rollout_helper.go @@ -167,6 +167,53 @@ func (a *ApplicationManager) syncRolloutPolicyForCluster(ctx context.Context, return ctrl.Result{}, nil } +func (a *ApplicationManager) reconcileRolloutSyncStatus(ctx context.Context, + app *applicationapi.Application, + fleet *fleetapi.Fleet, + syncPolicy *applicationapi.ApplicationSyncPolicy, + policyName string, +) (map[string]*applicationapi.RolloutStatus, ctrl.Result, error) { + log := ctrl.LoggerFrom(ctx) + + // depend on fleet and cluster selector get destination clusters + destinationClusters, err := a.fetchRolloutClusters(ctx, app, a.Client, fleet, syncPolicy) + if err != nil { + log.Error(err, "failed to fetch destination clusters for syncPolicy") + return nil, ctrl.Result{}, err + } + + rolloutStatus := map[string]*applicationapi.RolloutStatus{} + // Loop all destination cluster to get canary status + for clusterKey, cluster := range destinationClusters { + fleetClusterClient := cluster.Client.CtrlRuntimeClient() + name := generatePolicyResourceName(policyName, clusterKey.Kind, clusterKey.Name) + canary := &flaggerv1b1.Canary{} + canaryNamespacedName := types.NamespacedName{ + Namespace: syncPolicy.Rollout.Workload.Namespace, + Name: syncPolicy.Rollout.Workload.Name, + } + // Use the client of the target cluster to get the status of Flagger canary resources + if err := fleetClusterClient.Get(ctx, canaryNamespacedName, canary); err != nil { + return nil, ctrl.Result{}, errors.Wrapf(err, "failed to get canary %s in %s", canaryNamespacedName.Name, clusterKey.Name) + } + + if status, exists := rolloutStatus[name]; exists { + // If a match is found, update the existing rolloutStatus with the new status. + status.RolloutStatusInCluster = &canary.Status + } else { + currentstatus := applicationapi.RolloutStatus{ + ClusterName: clusterKey.Name, + RolloutNameInCluster: canaryNamespacedName.Name, + RolloutStatusInCluster: &canary.Status, + } + rolloutStatus[name] = ¤tstatus + } + } + + log.Info("finish get rollout status") + return rolloutStatus, ctrl.Result{RequeueAfter: StatusSyncInterval}, nil +} + func enableIstioSidecarInjection(ctx context.Context, kubeClient client.Client, namespace string) error { log := ctrl.LoggerFrom(ctx) @@ -399,3 +446,12 @@ func addLabels(obj client.Object, key, value string) client.Object { obj.SetLabels(labels) return obj } + +func mergeMap(map1, map2 map[string]*applicationapi.RolloutStatus) map[string]*applicationapi.RolloutStatus { + for name, rolloutStatus := range map1 { + if _, exist := map2[name]; !exist { + map2[name] = rolloutStatus + } + } + return map2 +} diff --git a/pkg/fleet-manager/application/rollout_helper_test.go b/pkg/fleet-manager/application/rollout_helper_test.go index 4bf90e1f5..f9d4bbc03 100644 --- a/pkg/fleet-manager/application/rollout_helper_test.go +++ b/pkg/fleet-manager/application/rollout_helper_test.go @@ -396,34 +396,6 @@ func Test_renderCanaryAnalysis(t *testing.T) { } } -/* -func Test_generateDeployConfig(t *testing.T) { - filepath := manifests.BuiltinOrDir("") - //fmt.Printf("%s", filepath) - deployname := "plugins/testloader-deploy.yaml" - namespacedName := types.NamespacedName{ - Namespace: "test", - Name: "podinfo", - } - if _, err := generateDeployConfig(filepath, deployname, namespacedName.Name, namespacedName.Namespace); err != nil { - fmt.Printf("failed get testloader deployment configuration: %v", err) - } -} - -func Test_generateSvcConfig(t *testing.T) { - filepath := manifests.BuiltinOrDir("") - //fmt.Printf("%s", filepath) - svcname := "plugins/testloader-svc.yaml" - namespacedName := types.NamespacedName{ - Namespace: "test", - Name: "podinfo", - } - if _, err := generateSvcConfig(filepath, svcname, namespacedName.Name, namespacedName.Namespace); err != nil { - fmt.Printf("failed get testloader deployment configuration: %v", err) - } -} -*/ - func Test_addLables(t *testing.T) { type args struct { obj client.Object @@ -504,3 +476,90 @@ func Test_addLables(t *testing.T) { }) } } + +func TestMergeMap(t *testing.T) { + type args struct { + map1 map[string]*applicationapi.RolloutStatus + map2 map[string]*applicationapi.RolloutStatus + } + tests := []struct { + name string + args args + want map[string]*applicationapi.RolloutStatus + }{ + { + name: "function test", + args: args{ + map1: map[string]*applicationapi.RolloutStatus{ + "kurator": { + ClusterName: "kurator-member1", + RolloutNameInCluster: "podinfo", + RolloutStatusInCluster: &flaggerv1b1.CanaryStatus{ + Phase: "success", + }, + }, + "istio": { + ClusterName: "kurator-member2", + RolloutNameInCluster: "podinfo", + RolloutStatusInCluster: &flaggerv1b1.CanaryStatus{ + Phase: "Initializing", + }, + }, + }, + map2: map[string]*applicationapi.RolloutStatus{ + "kubeedge": { + ClusterName: "kurator-member1", + RolloutNameInCluster: "podinfo", + RolloutStatusInCluster: &flaggerv1b1.CanaryStatus{ + Phase: "success", + }, + }, + "karmada": { + ClusterName: "kurator-member1", + RolloutNameInCluster: "podinfo", + RolloutStatusInCluster: &flaggerv1b1.CanaryStatus{ + Phase: "Initializing", + }, + }, + }, + }, + want: map[string]*applicationapi.RolloutStatus{ + "kurator": { + ClusterName: "kurator-member1", + RolloutNameInCluster: "podinfo", + RolloutStatusInCluster: &flaggerv1b1.CanaryStatus{ + Phase: "success", + }, + }, + "istio": { + ClusterName: "kurator-member2", + RolloutNameInCluster: "podinfo", + RolloutStatusInCluster: &flaggerv1b1.CanaryStatus{ + Phase: "Initializing", + }, + }, + "kubeedge": { + ClusterName: "kurator-member1", + RolloutNameInCluster: "podinfo", + RolloutStatusInCluster: &flaggerv1b1.CanaryStatus{ + Phase: "success", + }, + }, + "karmada": { + ClusterName: "kurator-member1", + RolloutNameInCluster: "podinfo", + RolloutStatusInCluster: &flaggerv1b1.CanaryStatus{ + Phase: "Initializing", + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := mergeMap(tt.args.map1, tt.args.map2); !reflect.DeepEqual(got, tt.want) { + t.Errorf("mergeMap() = %v, want %v", got, tt.want) + } + }) + } +} From 46260c57347e829c9488e3cc9935bf6dec334a9c Mon Sep 17 00:00:00 2001 From: LiZhenCheng9527 Date: Wed, 10 Jan 2024 20:44:50 +0800 Subject: [PATCH 2/2] Removed duplicate fetch fleet operations Signed-off-by: LiZhenCheng9527 --- pkg/fleet-manager/application/controller.go | 21 ++++----------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/pkg/fleet-manager/application/controller.go b/pkg/fleet-manager/application/controller.go index 38be19093..a5f946bc0 100644 --- a/pkg/fleet-manager/application/controller.go +++ b/pkg/fleet-manager/application/controller.go @@ -168,7 +168,7 @@ func (a *ApplicationManager) reconcile(ctx context.Context, app *applicationapi. return result, err } - if err := a.reconcileStatus(ctx, app); err != nil { + if err := a.reconcileStatus(ctx, app, fleet); err != nil { log.Error(err, "failed to reconcile status") return ctrl.Result{}, err } @@ -201,12 +201,12 @@ func (a *ApplicationManager) reconcileApplicationResources(ctx context.Context, // reconcileStatus updates the status of resources associated with the current Application resource. // It does this by fetching the current status of the source (either GitRepoKind or HelmRepoKind) and the sync policy from the API server, // and updating the Application's status to reflect these current statuses. -func (a *ApplicationManager) reconcileStatus(ctx context.Context, app *applicationapi.Application) error { +func (a *ApplicationManager) reconcileStatus(ctx context.Context, app *applicationapi.Application, fleet *fleetapi.Fleet) error { if err := a.reconcileSourceStatus(ctx, app); err != nil { return err } - if err := a.reconcileSyncStatus(ctx, app); err != nil { + if err := a.reconcileSyncStatus(ctx, app, fleet); err != nil { return err } @@ -260,8 +260,7 @@ func (a *ApplicationManager) reconcileSourceStatus(ctx context.Context, app *app // reconcileSyncStatus reconciles the sync status of the given application by finding all Kustomizations and HelmReleases associated with it, // and updating the sync status of each resource in the application's SyncStatus field. -func (a *ApplicationManager) reconcileSyncStatus(ctx context.Context, app *applicationapi.Application) error { - log := ctrl.LoggerFrom(ctx) +func (a *ApplicationManager) reconcileSyncStatus(ctx context.Context, app *applicationapi.Application, fleet *fleetapi.Fleet) error { var syncStatus []*applicationapi.ApplicationSyncStatus // find all kustomization @@ -292,18 +291,6 @@ func (a *ApplicationManager) reconcileSyncStatus(ctx context.Context, app *appli syncStatus = append(syncStatus, helmReleaseStatus) } - // there only one fleet, so pre-fetch it here. - fleetKey := generateFleetKey(app) - fleet := &fleetapi.Fleet{} - if err := a.Client.Get(ctx, fleetKey, fleet); err != nil { - if apierrors.IsNotFound(err) { - log.Info("fleet does not exist", "fleet", fleetKey) - return nil - } - log.Error(err, "failed to find fleet", "fleet", fleetKey) - return err - } - rolloutStatus := make(map[string]*applicationapi.RolloutStatus) // Get rollout status from member clusters for index, syncPolicy := range app.Spec.SyncPolicies {