Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Url mutable with cache profile #432

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion api/v1/model_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ type ModelSpec struct {
// "ollama://<model>"
//
// +kubebuilder:validation:Required
// +kubebuilder:validation:XValidation:rule="self == oldSelf", message="url is immutable."
// +kubebuilder:validation:XValidation:rule="self.startsWith(\"hf://\") || self.startsWith(\"pvc://\") || self.startsWith(\"ollama://\") || self.startsWith(\"s3://\") || self.startsWith(\"gs://\") || self.startsWith(\"oss://\")", message="url must start with \"hf://\", \"pvc://\", \"ollama://\", \"s3://\", \"gs://\", or \"oss://\" and not be empty."
URL string `json:"url"`

Expand Down
2 changes: 0 additions & 2 deletions charts/kubeai/templates/crds/kubeai.org_models.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,6 @@ spec:
"ollama://<model>"
type: string
x-kubernetes-validations:
- message: url is immutable.
rule: self == oldSelf
- message: url must start with "hf://", "pvc://", "ollama://", "s3://",
"gs://", or "oss://" and not be empty.
rule: self.startsWith("hf://") || self.startsWith("pvc://") || self.startsWith("ollama://")
Expand Down
80 changes: 71 additions & 9 deletions internal/modelcontroller/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,67 @@ func (r *ModelReconciler) reconcileCache(ctx context.Context, model *kubeaiv1.Mo
return ctrl.Result{}, fmt.Errorf("adding cache deletion finalizer: %w", err)
}
}
}

// Check if URL has changed by comparing with the URL stored in the PVC annotation
pvcModelAnn, err := parsePVCModelAnnotation(pvc, model.Name)
if err != nil {
return ctrl.Result{}, fmt.Errorf("parsing pvc model annotation: %w", err)
}

// Get the stored model URL from annotations if it exists
var storedURL string
if pvc.Annotations != nil {
storedURLKey := fmt.Sprintf("%s/url", kubeaiv1.PVCModelAnnotation(model.Name))
storedURL = pvc.Annotations[storedURLKey]
}

// If URL has changed and we have a previous URL, we need to evict the cache first
urlChanged := storedURL != "" && storedURL != model.Spec.URL

// If URL has changed, we need to evict the cache first
if urlChanged && pvcModelAnn.UID == string(model.UID) {
// Create eviction job if it doesn't exist
evictJob := &batchv1.Job{}
var evictJobExists bool
if err := r.Client.Get(ctx, types.NamespacedName{
Namespace: model.Namespace,
Name: evictCacheJobName(model),
}, evictJob); err != nil {
if apierrors.IsNotFound(err) {
evictJobExists = false
} else {
return ctrl.Result{}, fmt.Errorf("getting cache eviction job: %w", err)
}
} else {
evictJobExists = true
}

if !evictJobExists {
job := r.evictCacheJobForModel(model, cfg)
if err := ctrl.SetControllerReference(model, job, r.Scheme); err != nil {
return ctrl.Result{}, fmt.Errorf("setting controller reference on cache eviction job: %w", err)
}
if err := r.Create(ctx, job); err != nil {
return ctrl.Result{}, fmt.Errorf("creating cache eviction job: %w", err)
}
return ctrl.Result{}, errReturnEarly
}

// Wait for the eviction job to complete
if !k8sutils.IsJobCompleted(evictJob) {
return ctrl.Result{}, errReturnEarly
}

// Delete the eviction job
if err := r.Delete(ctx, evictJob, client.PropagationPolicy(metav1.DeletePropagationForeground)); err != nil {
return ctrl.Result{}, fmt.Errorf("deleting eviction job: %w", err)
}

// Reset the cache status
model.Status.Cache.Loaded = false
pvcModelAnn.UID = ""
}
// NOTE: .Spec.CacheProfile and .Spec.URL are immutable, so we don't need to check if they
// have changed in order to evict a stale cache.

loadJob := &batchv1.Job{}
var jobExists bool
Expand All @@ -90,13 +147,8 @@ func (r *ModelReconciler) reconcileCache(ctx context.Context, model *kubeaiv1.Mo
jobExists = true
}

pvcModelAnn, err := parsePVCModelAnnotation(pvc, model.Name)
if err != nil {
return ctrl.Result{}, fmt.Errorf("parsing pvc model annotation: %w", err)
}

// Run Job to populate PVC if not already downloaded.
if pvcModelAnn.UID != string(model.UID) {
// Run Job to populate PVC if not already downloaded or if URL has changed
if pvcModelAnn.UID != string(model.UID) || urlChanged {
// Ensure the download job exists.
if !jobExists {
loadJob = r.loadCacheJobForModel(model, cfg)
Expand All @@ -118,6 +170,16 @@ func (r *ModelReconciler) reconcileCache(ctx context.Context, model *kubeaiv1.Mo
}); err != nil {
return ctrl.Result{}, fmt.Errorf("setting pvc model annotation: %w", err)
}

// Store the current URL in the PVC annotation
if pvc.Annotations == nil {
pvc.Annotations = make(map[string]string)
}
urlKey := fmt.Sprintf("%s/url", kubeaiv1.PVCModelAnnotation(model.Name))
pvc.Annotations[urlKey] = model.Spec.URL
if err := r.Client.Update(ctx, pvc); err != nil {
return ctrl.Result{}, fmt.Errorf("updating pvc with url annotation: %w", err)
}
}
model.Status.Cache.Loaded = pvcModelAnn.UID == string(model.UID)

Expand Down
67 changes: 65 additions & 2 deletions test/e2e/quickstart/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ models_release="kubeai-models"

helm install $models_release $REPO_DIR/charts/models -f - <<EOF
catalog:
gemma2-2b-cpu:
deepseek-r1-1.5b-cpu:
enabled: true
features: [TextGeneration]
url: 'ollama://deepseek-r1:1.5b'
engine: OLlama
minReplicas: 1
resourceProfile: 'cpu:1'
qwen2-500m-cpu:
enabled: true
nomic-embed-text-cpu:
Expand All @@ -20,4 +24,63 @@ EOF
curl http://localhost:8000/openai/v1/completions \
--max-time 900 \
-H "Content-Type: application/json" \
-d '{"model": "gemma2-2b-cpu", "prompt": "Who was the first president of the United States?", "max_tokens": 40}'
-d '{"model": "deepseek-r1-1.5b-cpu", "prompt": "Who was the first president of the United States?", "max_tokens": 40}'


# Verify that the Model URL can be updated without requests failing.
DEEPSEEK_POD=$(kubectl get pod -l model=deepseek-r1-1.5b-cpu -o jsonpath='{.items[0].metadata.name}')
OLD_MODEL_URL="ollama://deepseek-r1:1.5b"
NEW_MODEL_URL="ollama://qwen2.5:0.5b"
OLD_MODEL_NAME=${OLD_MODEL_URL#ollama://}
NEW_MODEL_NAME=${NEW_MODEL_URL#ollama://}

kubectl patch model deepseek-r1-1.5b-cpu --type=merge -p "{\"spec\": {\"url\": \"$NEW_MODEL_URL\"}}"

check_pod_gone() {
! kubectl get pod $DEEPSEEK_POD | grep -q "Running"
}

# Make a request to the model
make_request() {
curl http://localhost:8000/openai/v1/completions \
--max-time 900 \
-H "Content-Type: application/json" \
-d '{"model": "deepseek-r1-1.5b-cpu", "prompt": "Who was the first president of the United States?", "max_tokens": 40}'

# Check if the old pod is gone
check_pod_gone
}

retry 120 make_request

# Verify that the rollout was successful
echo "Verifying successful rollout..."

# List the new pods for the model
echo "Current pods for the model:"
kubectl get pods -l model=deepseek-r1-1.5b-cpu

# For Ollama models, the model URL is in the startup probe command, not in container args
NEW_POD=$(kubectl get pod -l model=deepseek-r1-1.5b-cpu -o jsonpath='{.items[0].metadata.name}')
STARTUP_PROBE_CMD=$(kubectl get pod $NEW_POD -o jsonpath='{.spec.containers[0].startupProbe.exec.command[2]}')
echo "Startup probe command for the new pod:"
echo "$STARTUP_PROBE_CMD"

# Verify that the new model URL is in the startup probe command
if ! echo "$STARTUP_PROBE_CMD" | grep -q "$NEW_MODEL_NAME"; then
echo "❌ Rollout verification failed: New model name '$NEW_MODEL_NAME' not found in startup probe command"
exit 1
fi

# Check that the old URL is no longer in use
if echo "$STARTUP_PROBE_CMD" | grep -q "$OLD_MODEL_NAME"; then
echo "❌ Rollout verification failed: Old model name '$OLD_MODEL_NAME' still found in startup probe command"
exit 1
fi

# Also check that the model is actually available by making a request
echo "Making a request to verify the model is available..."
curl http://localhost:8000/openai/v1/completions \
--max-time 900 \
-H "Content-Type: application/json" \
-d '{"model": "deepseek-r1-1.5b-cpu", "prompt": "Who was the first president of the United States?", "max_tokens": 40}'
33 changes: 24 additions & 9 deletions test/integration/model_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,21 +292,22 @@ func TestModelValidation(t *testing.T) {
},
{
model: v1.Model{
ObjectMeta: metadata("mutate-url-invalid"),
ObjectMeta: metadata("mutate-cacheprofile-invalid"),
Spec: v1.ModelSpec{
URL: "hf://test-repo/test-model",
Engine: "VLLM",
Features: []v1.ModelFeature{},
URL: "hf://test-repo/test-model",
Engine: "VLLM",
Features: []v1.ModelFeature{},
CacheProfile: "some-cache-profile",
},
},
update: func(m *v1.Model) {
m.Spec.URL = "hf://update-test-repo/update-test-model"
m.Spec.CacheProfile = "some-updated-cache-profile"
},
expErrContain: "url is immutable",
expErrContain: "cacheProfile is immutable",
},
{
model: v1.Model{
ObjectMeta: metadata("mutate-cacheprofile-invalid"),
ObjectMeta: metadata("url-mutable-with-cache-profile-valid"),
Spec: v1.ModelSpec{
URL: "hf://test-repo/test-model",
Engine: "VLLM",
Expand All @@ -315,9 +316,23 @@ func TestModelValidation(t *testing.T) {
},
},
update: func(m *v1.Model) {
m.Spec.CacheProfile = "some-updated-cache-profile"
m.Spec.URL = "hf://test-repo/updated-model"
},
expErrContain: "cacheProfile is immutable",
expValid: true,
},
{
model: v1.Model{
ObjectMeta: metadata("url-mutable-without-cache-profile-valid"),
Spec: v1.ModelSpec{
URL: "hf://test-repo/test-model",
Engine: "VLLM",
Features: []v1.ModelFeature{},
},
},
update: func(m *v1.Model) {
m.Spec.URL = "hf://test-repo/updated-model"
},
expValid: true,
},
}
for _, c := range cases {
Expand Down