Skip to content

Commit

Permalink
appwrapper multi-kueue adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
dgrove-oss committed Jan 28, 2025
1 parent 595cf00 commit cb8ac77
Show file tree
Hide file tree
Showing 11 changed files with 540 additions and 3 deletions.
1 change: 1 addition & 0 deletions Makefile-test.mk
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ run-test-multikueue-e2e-%: FORCE
@echo Running multikueue e2e for k8s ${K8S_VERSION}
E2E_KIND_VERSION="kindest/node:v$(K8S_VERSION)" KIND_CLUSTER_NAME=$(KIND_CLUSTER_NAME) CREATE_KIND_CLUSTER=$(CREATE_KIND_CLUSTER) \
ARTIFACTS="$(ARTIFACTS)/$@" IMAGE_TAG=$(IMAGE_TAG) GINKGO_ARGS="$(GINKGO_ARGS)" \
APPWRAPPER_VERSION=$(APPWRAPPER_VERSION) \
JOBSET_VERSION=$(JOBSET_VERSION) KUBEFLOW_VERSION=$(KUBEFLOW_VERSION) \
KUBEFLOW_MPI_VERSION=$(KUBEFLOW_MPI_VERSION) KUBERAY_VERSION=$(KUBERAY_VERSION) \
./hack/multikueue-e2e-test.sh
Expand Down
5 changes: 5 additions & 0 deletions hack/multikueue-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ function kind_load {
install_jobset "$WORKER1_KIND_CLUSTER_NAME"
install_jobset "$WORKER2_KIND_CLUSTER_NAME"

# APPWRAPPER SETUP
install_appwrapper "$MANAGER_KIND_CLUSTER_NAME"
install_appwrapper "$WORKER1_KIND_CLUSTER_NAME"
install_appwrapper "$WORKER2_KIND_CLUSTER_NAME"

# KUBEFLOW SETUP
# In order for MPI-operator and Training-operator to work on the same cluster it is required that:
# 1. 'kubeflow.org_mpijobs.yaml' is removed from base/crds/kustomization.yaml - https://github.com/kubeflow/training-operator/issues/1930
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/jobs/appwrapper/appwrapper_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func init() {
SetupIndexes: SetupIndexes,
AddToScheme: awv1beta2.AddToScheme,
IsManagingObjectsOwner: isAppWrapper,
MultiKueueAdapter: &multikueueAdapter{},
}))
}

Expand Down
140 changes: 140 additions & 0 deletions pkg/controller/jobs/appwrapper/appwrapper_multikueue_adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package appwrapper

import (
"context"
"errors"
"fmt"

awv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/util/api"
clientutil "sigs.k8s.io/kueue/pkg/util/client"
)

type multikueueAdapter struct{}

var _ jobframework.MultiKueueAdapter = (*multikueueAdapter)(nil)

func (b *multikueueAdapter) SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error {
log := ctrl.LoggerFrom(ctx)

localAppWrapper := awv1beta2.AppWrapper{}
err := localClient.Get(ctx, key, &localAppWrapper)
if err != nil {
return err
}

remoteAppWrapper := awv1beta2.AppWrapper{}
err = remoteClient.Get(ctx, key, &remoteAppWrapper)
if client.IgnoreNotFound(err) != nil {
return err
}

// if the remote exists, just copy the status
if err == nil {
if localAppWrapper.Spec.Suspend {
// Ensure the appwrapper is unsuspended before updating its status; otherwise, it will fail when patching the spec.
log.V(2).Info("Skipping the sync since the local appwrapper is still suspended")
return nil
}
return clientutil.PatchStatus(ctx, localClient, &localAppWrapper, func() (bool, error) {
localAppWrapper.Status = remoteAppWrapper.Status
return true, nil
})
}

// Make a copy of the local AppWrapper
remoteAppWrapper = awv1beta2.AppWrapper{
ObjectMeta: api.CloneObjectMetaForCreation(&localAppWrapper.ObjectMeta),
Spec: *localAppWrapper.Spec.DeepCopy(),
}

// add the prebuilt workload
if remoteAppWrapper.Labels == nil {
remoteAppWrapper.Labels = map[string]string{}
}
remoteAppWrapper.Labels[constants.PrebuiltWorkloadLabel] = workloadName
remoteAppWrapper.Labels[kueue.MultiKueueOriginLabel] = origin

// clear the managedBy to enable the remote AppWrapper controller to take over
remoteAppWrapper.Spec.ManagedBy = nil

return remoteClient.Create(ctx, &remoteAppWrapper)
}

func (b *multikueueAdapter) DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error {
aw := awv1beta2.AppWrapper{}
err := remoteClient.Get(ctx, key, &aw)
if err != nil {
return client.IgnoreNotFound(err)
}
return client.IgnoreNotFound(remoteClient.Delete(ctx, &aw))
}

func (b *multikueueAdapter) GVK() schema.GroupVersionKind {
return gvk
}

func (b *multikueueAdapter) KeepAdmissionCheckPending() bool {
return false
}

func (b *multikueueAdapter) IsJobManagedByKueue(ctx context.Context, c client.Client, key types.NamespacedName) (bool, string, error) {
aw := awv1beta2.AppWrapper{}
err := c.Get(ctx, key, &aw)
if err != nil {
return false, "", err
}
awControllerName := ptr.Deref(aw.Spec.ManagedBy, "")
if awControllerName != kueue.MultiKueueControllerName {
return false, fmt.Sprintf("Expecting spec.managedBy to be %q not %q", kueue.MultiKueueControllerName, awControllerName), nil
}
return true, "", nil

}

var _ jobframework.MultiKueueWatcher = (*multikueueAdapter)(nil)

func (*multikueueAdapter) GetEmptyList() client.ObjectList {
return &awv1beta2.AppWrapperList{}
}

func (*multikueueAdapter) WorkloadKeyFor(o runtime.Object) (types.NamespacedName, error) {
aw, ok := o.(*awv1beta2.AppWrapper)
if !ok {
return types.NamespacedName{}, errors.New("not an appwrapper")
}

prebuiltWl, hasPrebuiltWorkload := aw.Labels[constants.PrebuiltWorkloadLabel]
if !hasPrebuiltWorkload {
return types.NamespacedName{}, fmt.Errorf("no prebuilt workload found for appwrapper: %s", klog.KObj(aw))
}

return types.NamespacedName{Name: prebuiltWl, Namespace: aw.Namespace}, nil
}
Loading

0 comments on commit cb8ac77

Please sign in to comment.