Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KEP-672: Implement the DependsOn API #740

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/jobset/v1alpha2/jobset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ type ReplicatedJob struct {
// The Order of ReplicatedJobs is defined by their enumeration in the slice.
// Note, that the first ReplicatedJob in the slice cannot use the DependsOn API.
// Currently, only a single item is supported in the DependsOn list.
// If the JobSet is suspended, all active ReplicatedJobs will be suspended. When the JobSet is
// resumed, the Job sequence starts again.
// This API is mutually exclusive with the StartupPolicy API.
// +kubebuilder:validation:XValidation:rule="self == oldSelf",message="Value is immutable"
// +kubebuilder:validation:MaxItems=1
Expand Down
2 changes: 1 addition & 1 deletion api/jobset/v1alpha2/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions config/components/crd/bases/jobset.x-k8s.io_jobsets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ spec:
The Order of ReplicatedJobs is defined by their enumeration in the slice.
Note, that the first ReplicatedJob in the slice cannot use the DependsOn API.
Currently, only a single item is supported in the DependsOn list.
If the JobSet is suspended, all active ReplicatedJobs will be suspended. When the JobSet is
resumed, the Job sequence starts again.
This API is mutually exclusive with the StartupPolicy API.
items:
description: DependsOn defines the dependency on the previous
Expand Down
2 changes: 1 addition & 1 deletion hack/python-sdk/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@
],
"properties": {
"dependsOn": {
"description": "DependsOn is an optional list that specifies the preceding ReplicatedJobs upon which the current ReplicatedJob depends. If specified, the ReplicatedJob will be created only after the referenced ReplicatedJobs reach their desired state. The Order of ReplicatedJobs is defined by their enumeration in the slice. Note, that the first ReplicatedJob in the slice cannot use the DependsOn API. Currently, only a single item is supported in the DependsOn list. This API is mutually exclusive with the StartupPolicy API.",
"description": "DependsOn is an optional list that specifies the preceding ReplicatedJobs upon which the current ReplicatedJob depends. If specified, the ReplicatedJob will be created only after the referenced ReplicatedJobs reach their desired state. The Order of ReplicatedJobs is defined by their enumeration in the slice. Note, that the first ReplicatedJob in the slice cannot use the DependsOn API. Currently, only a single item is supported in the DependsOn list. If JobSet is suspended the all active ReplicatedJobs will be suspended. When JobSet is resumed the Job sequence starts again. This API is mutually exclusive with the StartupPolicy API.",
"type": "array",
"items": {
"default": {},
Expand Down
17 changes: 13 additions & 4 deletions pkg/controllers/depends_on.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,29 @@ import (
)

// dependencyReachedStatus checks whether the ReplicatedJob that the given ReplicatedJob depends on has reached the Ready or Complete status.
func dependencyReachedStatus(dependsOnJob jobset.DependsOn, dependsOnJobReplicas int32, rJobsStatuses []jobset.ReplicatedJobStatus) bool {
// If the actual status is empty, return false.
// func dependencyReachedStatus(dependsOnJob jobset.DependsOn, dependsOnJobReplicas int32, rJobsStatuses []jobset.ReplicatedJobStatus) bool {
func dependencyReachedStatus(rJob jobset.ReplicatedJob, rJobReplicas map[string]int32, rJobsStatuses []jobset.ReplicatedJobStatus) bool {
// Check if the ReplicatedJob has any dependencies.
if rJob.DependsOn == nil {
return true
}

// Get the ReplicatedJob that this one depends on. Currently, a ReplicatedJob supports only a single dependency.
dependsOnJob := rJob.DependsOn[0]

// If the actual status of dependant ReplicatedJob is empty, return false.
actualStatus := findReplicatedJobStatus(rJobsStatuses, dependsOnJob.Name)
if actualStatus == nil {
return false
}

// For Complete status, number of replicas must be equal to number of succeeded Jobs.
if dependsOnJob.Status == jobset.DependencyComplete && dependsOnJobReplicas == actualStatus.Succeeded {
if dependsOnJob.Status == jobset.DependencyComplete && rJobReplicas[dependsOnJob.Name] == actualStatus.Succeeded {
return true
}

// For Ready status, number of replicas must be equal to sum of ready, failed, and succeeded Jobs.
if dependsOnJob.Status == jobset.DependencyReady && dependsOnJobReplicas == actualStatus.Failed+actualStatus.Ready+actualStatus.Succeeded {
if dependsOnJob.Status == jobset.DependencyReady && rJobReplicas[dependsOnJob.Name] == actualStatus.Failed+actualStatus.Ready+actualStatus.Succeeded {
return true
}

Expand Down
114 changes: 89 additions & 25 deletions pkg/controllers/depends_on_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,46 @@ import (
"github.com/google/go-cmp/cmp"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
testutils "sigs.k8s.io/jobset/pkg/util/testing"
)

func TestDependencyReachedStatus(t *testing.T) {
rJobInitializer := "initializer"
rJobTrainer := "trainer-node"

tests := []struct {
name string
dependsOnJob jobset.DependsOn
dependsOnJobReplicas int32
rJobsStatuses []jobset.ReplicatedJobStatus
expected bool
name string
rJob jobset.ReplicatedJob
rJobReplicas map[string]int32
rJobsStatuses []jobset.ReplicatedJobStatus
expected bool
}{
{
name: "ReplicatedJob doesn't have any dependencies",
rJob: testutils.MakeReplicatedJob(rJobInitializer).
Obj(),
rJobReplicas: map[string]int32{
rJobInitializer: 1,
},
rJobsStatuses: []jobset.ReplicatedJobStatus{},
expected: true,
},
{
name: "status for ReplicatedJob is nil",
dependsOnJob: jobset.DependsOn{
Name: "initializer", Status: jobset.DependencyComplete,
rJob: testutils.MakeReplicatedJob(rJobTrainer).
DependsOn(
[]jobset.DependsOn{
{
Name: rJobInitializer,
Status: jobset.DependencyComplete,
},
},
).
Obj(),
rJobReplicas: map[string]int32{
rJobInitializer: 1,
rJobTrainer: 1,
},
dependsOnJobReplicas: 1,
rJobsStatuses: []jobset.ReplicatedJobStatus{
{
Name: "invalid",
Expand All @@ -36,13 +60,23 @@ func TestDependencyReachedStatus(t *testing.T) {
},
{
name: "depends on ReplicatedJob reaches complete status",
dependsOnJob: jobset.DependsOn{
Name: "initializer", Status: jobset.DependencyComplete,
rJob: testutils.MakeReplicatedJob(rJobTrainer).
DependsOn(
[]jobset.DependsOn{
{
Name: rJobInitializer,
Status: jobset.DependencyComplete,
},
},
).
Obj(),
rJobReplicas: map[string]int32{
rJobInitializer: 2,
rJobTrainer: 1,
},
dependsOnJobReplicas: 2,
rJobsStatuses: []jobset.ReplicatedJobStatus{
{
Name: "initializer",
Name: rJobInitializer,
Ready: 0,
Succeeded: 2,
Failed: 0,
Expand All @@ -54,13 +88,23 @@ func TestDependencyReachedStatus(t *testing.T) {
},
{
name: "depends on ReplicatedJob doesn't reach complete status",
dependsOnJob: jobset.DependsOn{
Name: "initializer", Status: jobset.DependencyComplete,
rJob: testutils.MakeReplicatedJob(rJobTrainer).
DependsOn(
[]jobset.DependsOn{
{
Name: rJobInitializer,
Status: jobset.DependencyComplete,
},
},
).
Obj(),
rJobReplicas: map[string]int32{
rJobInitializer: 2,
rJobTrainer: 1,
},
dependsOnJobReplicas: 2,
rJobsStatuses: []jobset.ReplicatedJobStatus{
{
Name: "initializer",
Name: rJobInitializer,
Ready: 1,
Succeeded: 1,
Failed: 0,
Expand All @@ -72,13 +116,23 @@ func TestDependencyReachedStatus(t *testing.T) {
},
{
name: "depends on ReplicatedJob reaches ready status",
dependsOnJob: jobset.DependsOn{
Name: "initializer", Status: jobset.DependencyReady,
rJob: testutils.MakeReplicatedJob(rJobTrainer).
DependsOn(
[]jobset.DependsOn{
{
Name: rJobInitializer,
Status: jobset.DependencyReady,
},
},
).
Obj(),
rJobReplicas: map[string]int32{
rJobInitializer: 3,
rJobTrainer: 1,
},
dependsOnJobReplicas: 3,
rJobsStatuses: []jobset.ReplicatedJobStatus{
{
Name: "initializer",
Name: rJobInitializer,
Ready: 1,
Succeeded: 1,
Failed: 1,
Expand All @@ -90,13 +144,23 @@ func TestDependencyReachedStatus(t *testing.T) {
},
{
name: "depends on ReplicatedJob doesn't reach ready status",
dependsOnJob: jobset.DependsOn{
Name: "initializer", Status: jobset.DependencyReady,
rJob: testutils.MakeReplicatedJob(rJobTrainer).
DependsOn(
[]jobset.DependsOn{
{
Name: rJobInitializer,
Status: jobset.DependencyReady,
},
},
).
Obj(),
rJobReplicas: map[string]int32{
rJobInitializer: 3,
rJobTrainer: 1,
},
dependsOnJobReplicas: 3,
rJobsStatuses: []jobset.ReplicatedJobStatus{
{
Name: "initializer",
Name: rJobInitializer,
Ready: 2,
Succeeded: 0,
Failed: 0,
Expand All @@ -109,7 +173,7 @@ func TestDependencyReachedStatus(t *testing.T) {
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
actual := dependencyReachedStatus(tc.dependsOnJob, tc.dependsOnJobReplicas, tc.rJobsStatuses)
actual := dependencyReachedStatus(tc.rJob, tc.rJobReplicas, tc.rJobsStatuses)
if diff := cmp.Diff(tc.expected, actual); diff != "" {
t.Errorf("unexpected finished value (+got/-want): %s", diff)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,8 @@ func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset

rJobsReplicas[replicatedJob.Name] = replicatedJob.Replicas

// For depends on, the Job is created only after the dependent ReplicatedJob reaches the status.
if replicatedJob.DependsOn != nil && !dependencyReachedStatus(replicatedJob.DependsOn[0], rJobsReplicas[replicatedJob.DependsOn[0].Name], replicatedJobStatuses) {
// For depends on, the ReplicatedJob is created only after the previous ReplicatedJob reached the status.
if !dependencyReachedStatus(replicatedJob, rJobsReplicas, replicatedJobStatuses) {
continue
}

Expand Down Expand Up @@ -523,8 +523,8 @@ func (r *JobSetReconciler) reconcileReplicatedJobs(ctx context.Context, js *jobs

rJobsReplicas[replicatedJob.Name] = replicatedJob.Replicas

// For depends on, the Job is created only after the previous replicatedJob reached the status.
if replicatedJob.DependsOn != nil && !dependencyReachedStatus(replicatedJob.DependsOn[0], rJobsReplicas[replicatedJob.DependsOn[0].Name], replicatedJobStatuses) {
// For depends on, the ReplicatedJob is created only after the previous ReplicatedJob reached the status.
if !dependencyReachedStatus(replicatedJob, rJobsReplicas, replicatedJobStatuses) {
continue
}

Expand Down
14 changes: 7 additions & 7 deletions pkg/webhooks/jobset_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,11 @@ func (j *jobSetWebhook) ValidateCreate(ctx context.Context, obj runtime.Object)
}
}

// Map where key is ReplicatedJob name and value is ReplicatedJob index.
replicatedJobNames := map[string]int{}
// Map where key is ReplicatedJob name.
replicatedJobNames := map[string]bool{}

// Validate each replicatedJob.
for rIdx, rJob := range js.Spec.ReplicatedJobs {
for _, rJob := range js.Spec.ReplicatedJobs {
var parallelism int32 = 1
if rJob.Template.Spec.Parallelism != nil {
parallelism = *rJob.Template.Spec.Parallelism
Expand Down Expand Up @@ -227,11 +227,11 @@ func (j *jobSetWebhook) ValidateCreate(ctx context.Context, obj runtime.Object)
allErrs = append(allErrs, errors.New(errMessage))
}
}
replicatedJobNames[rJob.Name] = rIdx
replicatedJobNames[rJob.Name] = true
// Check that DependsOn references the previous ReplicatedJob.
if rIdx > 0 && rJob.DependsOn != nil {
dependsOnIdx, ok := replicatedJobNames[rJob.DependsOn[0].Name]
if !ok || rIdx <= dependsOnIdx {
if rJob.DependsOn != nil {
_, ok := replicatedJobNames[rJob.DependsOn[0].Name]
if !ok {
allErrs = append(allErrs, fmt.Errorf("replicatedJob: %s cannot depend on replicatedJob: %s", rJob.Name, rJob.DependsOn[0].Name))
}
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/docs/JobsetV1alpha2ReplicatedJob.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

Name | Type | Description | Notes
------------ | ------------- | ------------- | -------------
**depends_on** | [**List[JobsetV1alpha2DependsOn]**](JobsetV1alpha2DependsOn.md) | DependsOn is an optional list that specifies the preceding ReplicatedJobs upon which the current ReplicatedJob depends. If specified, the ReplicatedJob will be created only after the referenced ReplicatedJobs reach their desired state. The Order of ReplicatedJobs is defined by their enumeration in the slice. Note, that the first ReplicatedJob in the slice cannot use the DependsOn API. This API is mutually exclusive with the StartupPolicy API. | [optional]
**depends_on** | [**List[JobsetV1alpha2DependsOn]**](JobsetV1alpha2DependsOn.md) | DependsOn is an optional list that specifies the preceding ReplicatedJobs upon which the current ReplicatedJob depends. If specified, the ReplicatedJob will be created only after the referenced ReplicatedJobs reach their desired state. The Order of ReplicatedJobs is defined by their enumeration in the slice. Note, that the first ReplicatedJob in the slice cannot use the DependsOn API. Currently, only a single item is supported in the DependsOn list. If JobSet is suspended the all active ReplicatedJobs will be suspended. When JobSet is resumed the Job sequence starts again. This API is mutually exclusive with the StartupPolicy API. | [optional]
**name** | **str** | Name is the name of the entry and will be used as a suffix for the Job name. | [default to '']
**replicas** | **int** | Replicas is the number of jobs that will be created from this ReplicatedJob&#39;s template. Jobs names will be in the format: &lt;jobSet.name&gt;-&lt;spec.replicatedJob.name&gt;-&lt;job-index&gt; | [optional]
**template** | [**V1JobTemplateSpec**](V1JobTemplateSpec.md) | |
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/jobset/models/jobset_v1alpha2_replicated_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class JobsetV1alpha2ReplicatedJob(BaseModel):
"""
JobsetV1alpha2ReplicatedJob
""" # noqa: E501
depends_on: Optional[List[JobsetV1alpha2DependsOn]] = Field(default=None, description="DependsOn is an optional list that specifies the preceding ReplicatedJobs upon which the current ReplicatedJob depends. If specified, the ReplicatedJob will be created only after the referenced ReplicatedJobs reach their desired state. The Order of ReplicatedJobs is defined by their enumeration in the slice. Note, that the first ReplicatedJob in the slice cannot use the DependsOn API. This API is mutually exclusive with the StartupPolicy API.", alias="dependsOn")
depends_on: Optional[List[JobsetV1alpha2DependsOn]] = Field(default=None, description="DependsOn is an optional list that specifies the preceding ReplicatedJobs upon which the current ReplicatedJob depends. If specified, the ReplicatedJob will be created only after the referenced ReplicatedJobs reach their desired state. The Order of ReplicatedJobs is defined by their enumeration in the slice. Note, that the first ReplicatedJob in the slice cannot use the DependsOn API. Currently, only a single item is supported in the DependsOn list. If JobSet is suspended the all active ReplicatedJobs will be suspended. When JobSet is resumed the Job sequence starts again. This API is mutually exclusive with the StartupPolicy API.", alias="dependsOn")
name: StrictStr = Field(description="Name is the name of the entry and will be used as a suffix for the Job name.")
replicas: Optional[StrictInt] = Field(default=None, description="Replicas is the number of jobs that will be created from this ReplicatedJob's template. Jobs names will be in the format: <jobSet.name>-<spec.replicatedJob.name>-<job-index>")
template: V1JobTemplateSpec
Expand Down
Loading