Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add handling for getting rollout status #556

Merged
merged 2 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions pkg/fleet-manager/application/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (a *ApplicationManager) reconcile(ctx context.Context, app *applicationapi.
return result, err
}

if err := a.reconcileStatus(ctx, app); err != nil {
if err := a.reconcileStatus(ctx, app, fleet); err != nil {
log.Error(err, "failed to reconcile status")
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -201,12 +201,12 @@ func (a *ApplicationManager) reconcileApplicationResources(ctx context.Context,
// reconcileStatus updates the status of resources associated with the current Application resource.
// It does this by fetching the current status of the source (either GitRepoKind or HelmRepoKind) and the sync policy from the API server,
// and updating the Application's status to reflect these current statuses.
func (a *ApplicationManager) reconcileStatus(ctx context.Context, app *applicationapi.Application) error {
func (a *ApplicationManager) reconcileStatus(ctx context.Context, app *applicationapi.Application, fleet *fleetapi.Fleet) error {
if err := a.reconcileSourceStatus(ctx, app); err != nil {
return err
}

if err := a.reconcileSyncStatus(ctx, app); err != nil {
if err := a.reconcileSyncStatus(ctx, app, fleet); err != nil {
return err
}

Expand Down Expand Up @@ -260,7 +260,7 @@ func (a *ApplicationManager) reconcileSourceStatus(ctx context.Context, app *app

// reconcileSyncStatus reconciles the sync status of the given application by finding all Kustomizations and HelmReleases associated with it,
// and updating the sync status of each resource in the application's SyncStatus field.
func (a *ApplicationManager) reconcileSyncStatus(ctx context.Context, app *applicationapi.Application) error {
func (a *ApplicationManager) reconcileSyncStatus(ctx context.Context, app *applicationapi.Application, fleet *fleetapi.Fleet) error {
var syncStatus []*applicationapi.ApplicationSyncStatus

// find all kustomization
Expand Down Expand Up @@ -291,6 +291,26 @@ func (a *ApplicationManager) reconcileSyncStatus(ctx context.Context, app *appli
syncStatus = append(syncStatus, helmReleaseStatus)
}

rolloutStatus := make(map[string]*applicationapi.RolloutStatus)
// Get rollout status from member clusters
for index, syncPolicy := range app.Spec.SyncPolicies {
policyName := generatePolicyName(app, index)
if syncPolicy.Rollout != nil {
status, _, err := a.reconcileRolloutSyncStatus(ctx, app, fleet, syncPolicy, policyName)
if err != nil {
return errors.Wrapf(err, "failed to reconcil rollout status")
}
rolloutStatus = mergeMap(status, rolloutStatus)
}
}

// update rollout status
for index, policyStatus := range syncStatus {
if _, exist := rolloutStatus[policyStatus.Name]; exist {
syncStatus[index].RolloutStatus = rolloutStatus[policyStatus.Name]
}
}

app.Status.SyncStatus = syncStatus
return nil
}
Expand Down
56 changes: 56 additions & 0 deletions pkg/fleet-manager/application/rollout_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,53 @@ func (a *ApplicationManager) syncRolloutPolicyForCluster(ctx context.Context,
return ctrl.Result{}, nil
}

func (a *ApplicationManager) reconcileRolloutSyncStatus(ctx context.Context,
app *applicationapi.Application,
fleet *fleetapi.Fleet,
syncPolicy *applicationapi.ApplicationSyncPolicy,
policyName string,
) (map[string]*applicationapi.RolloutStatus, ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)

// depend on fleet and cluster selector get destination clusters
destinationClusters, err := a.fetchRolloutClusters(ctx, app, a.Client, fleet, syncPolicy)
if err != nil {
log.Error(err, "failed to fetch destination clusters for syncPolicy")
return nil, ctrl.Result{}, err
}

rolloutStatus := map[string]*applicationapi.RolloutStatus{}
// Loop all destination cluster to get canary status
for clusterKey, cluster := range destinationClusters {
fleetClusterClient := cluster.Client.CtrlRuntimeClient()
name := generatePolicyResourceName(policyName, clusterKey.Kind, clusterKey.Name)
canary := &flaggerv1b1.Canary{}
canaryNamespacedName := types.NamespacedName{
Namespace: syncPolicy.Rollout.Workload.Namespace,
Name: syncPolicy.Rollout.Workload.Name,
}
// Use the client of the target cluster to get the status of Flagger canary resources
if err := fleetClusterClient.Get(ctx, canaryNamespacedName, canary); err != nil {
return nil, ctrl.Result{}, errors.Wrapf(err, "failed to get canary %s in %s", canaryNamespacedName.Name, clusterKey.Name)
}

if status, exists := rolloutStatus[name]; exists {
// If a match is found, update the existing rolloutStatus with the new status.
status.RolloutStatusInCluster = &canary.Status
} else {
currentstatus := applicationapi.RolloutStatus{
ClusterName: clusterKey.Name,
RolloutNameInCluster: canaryNamespacedName.Name,
RolloutStatusInCluster: &canary.Status,
}
rolloutStatus[name] = &currentstatus
}
}

log.Info("finish get rollout status")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.Info("finish get rollout status")
log.Info("finished getting rollout status")

return rolloutStatus, ctrl.Result{RequeueAfter: StatusSyncInterval}, nil
}

func enableIstioSidecarInjection(ctx context.Context, kubeClient client.Client, namespace string) error {
log := ctrl.LoggerFrom(ctx)

Expand Down Expand Up @@ -399,3 +446,12 @@ func addLabels(obj client.Object, key, value string) client.Object {
obj.SetLabels(labels)
return obj
}

func mergeMap(map1, map2 map[string]*applicationapi.RolloutStatus) map[string]*applicationapi.RolloutStatus {
for name, rolloutStatus := range map1 {
if _, exist := map2[name]; !exist {
map2[name] = rolloutStatus
}
}
return map2
}
115 changes: 87 additions & 28 deletions pkg/fleet-manager/application/rollout_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,34 +396,6 @@ func Test_renderCanaryAnalysis(t *testing.T) {
}
}

/*
func Test_generateDeployConfig(t *testing.T) {
filepath := manifests.BuiltinOrDir("")
//fmt.Printf("%s", filepath)
deployname := "plugins/testloader-deploy.yaml"
namespacedName := types.NamespacedName{
Namespace: "test",
Name: "podinfo",
}
if _, err := generateDeployConfig(filepath, deployname, namespacedName.Name, namespacedName.Namespace); err != nil {
fmt.Printf("failed get testloader deployment configuration: %v", err)
}
}

func Test_generateSvcConfig(t *testing.T) {
filepath := manifests.BuiltinOrDir("")
//fmt.Printf("%s", filepath)
svcname := "plugins/testloader-svc.yaml"
namespacedName := types.NamespacedName{
Namespace: "test",
Name: "podinfo",
}
if _, err := generateSvcConfig(filepath, svcname, namespacedName.Name, namespacedName.Namespace); err != nil {
fmt.Printf("failed get testloader deployment configuration: %v", err)
}
}
*/

func Test_addLables(t *testing.T) {
type args struct {
obj client.Object
Expand Down Expand Up @@ -504,3 +476,90 @@ func Test_addLables(t *testing.T) {
})
}
}

func TestMergeMap(t *testing.T) {
type args struct {
map1 map[string]*applicationapi.RolloutStatus
map2 map[string]*applicationapi.RolloutStatus
}
tests := []struct {
name string
args args
want map[string]*applicationapi.RolloutStatus
}{
{
name: "function test",
args: args{
map1: map[string]*applicationapi.RolloutStatus{
"kurator": {
ClusterName: "kurator-member1",
RolloutNameInCluster: "podinfo",
RolloutStatusInCluster: &flaggerv1b1.CanaryStatus{
Phase: "success",
},
},
"istio": {
ClusterName: "kurator-member2",
RolloutNameInCluster: "podinfo",
RolloutStatusInCluster: &flaggerv1b1.CanaryStatus{
Phase: "Initializing",
},
},
},
map2: map[string]*applicationapi.RolloutStatus{
"kubeedge": {
ClusterName: "kurator-member1",
RolloutNameInCluster: "podinfo",
RolloutStatusInCluster: &flaggerv1b1.CanaryStatus{
Phase: "success",
},
},
"karmada": {
ClusterName: "kurator-member1",
RolloutNameInCluster: "podinfo",
RolloutStatusInCluster: &flaggerv1b1.CanaryStatus{
Phase: "Initializing",
},
},
},
},
want: map[string]*applicationapi.RolloutStatus{
"kurator": {
ClusterName: "kurator-member1",
RolloutNameInCluster: "podinfo",
RolloutStatusInCluster: &flaggerv1b1.CanaryStatus{
Phase: "success",
},
},
"istio": {
ClusterName: "kurator-member2",
RolloutNameInCluster: "podinfo",
RolloutStatusInCluster: &flaggerv1b1.CanaryStatus{
Phase: "Initializing",
},
},
"kubeedge": {
ClusterName: "kurator-member1",
RolloutNameInCluster: "podinfo",
RolloutStatusInCluster: &flaggerv1b1.CanaryStatus{
Phase: "success",
},
},
"karmada": {
ClusterName: "kurator-member1",
RolloutNameInCluster: "podinfo",
RolloutStatusInCluster: &flaggerv1b1.CanaryStatus{
Phase: "Initializing",
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := mergeMap(tt.args.map1, tt.args.map2); !reflect.DeepEqual(got, tt.want) {
t.Errorf("mergeMap() = %v, want %v", got, tt.want)
}
})
}
}
Loading