Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to define toleration on k8s jobs #751

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Wave, containers provisioning service
* Copyright (c) 2024, Seqera Labs
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

package io.seqera.wave.configuration

import io.micronaut.context.annotation.ConfigurationProperties
import io.micronaut.core.annotation.Introspected
/**
* kubernetes toleration config
*
* @author Munish Chouhan <[email protected]>
*/
@Introspected
@ConfigurationProperties('wave.build.k8s.tolerations')
class K8sTolerationsConfig {
boolean enabled;
List<Toleration> arm64;
List<Toleration> amd64;

static class Toleration {
String key;
String value;
String operator;
String effect;
}
}
8 changes: 8 additions & 0 deletions src/main/groovy/io/seqera/wave/core/ContainerPlatform.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,12 @@ class ContainerPlatform {
}
return variant
}

boolean isARM64(){
return ARM64.contains(arch)
}

boolean isAMD64(){
return AMD64.contains(arch)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class KubeBuildStrategy extends BuildStrategy {
final buildCmd = launchCmd(req)
final timeout = req.maxDuration ?: buildConfig.defaultTimeout
final selector= getSelectorLabel(req.platform, nodeSelectorMap)
k8sService.launchBuildJob(jobName, buildImage, buildCmd, req.workDir, configFile, timeout, selector)
k8sService.launchBuildJob(jobName, buildImage, buildCmd, req.workDir, configFile, timeout, selector, req.platform)
}
catch (ApiException e) {
throw new BadRequestException("Unexpected build failure - ${e.responseBody}", e)
Expand Down
3 changes: 2 additions & 1 deletion src/main/groovy/io/seqera/wave/service/k8s/K8sService.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import io.kubernetes.client.openapi.models.V1Pod
import io.seqera.wave.configuration.BlobCacheConfig
import io.seqera.wave.configuration.MirrorConfig
import io.seqera.wave.configuration.ScanConfig
import io.seqera.wave.core.ContainerPlatform
/**
* Defines Kubernetes operations
*
Expand All @@ -47,7 +48,7 @@ interface K8sService {

V1Job launchTransferJob(String name, String containerImage, List<String> args, BlobCacheConfig blobConfig)

V1Job launchBuildJob(String name, String containerImage, List<String> args, Path workDir, Path creds, Duration timeout, Map<String,String> nodeSelector)
V1Job launchBuildJob(String name, String containerImage, List<String> args, Path workDir, Path creds, Duration timeout, Map<String,String> nodeSelector, ContainerPlatform platform)

V1Job launchScanJob(String name, String containerImage, List<String> args, Path workDir, Path creds, ScanConfig scanConfig)

Expand Down
39 changes: 36 additions & 3 deletions src/main/groovy/io/seqera/wave/service/k8s/K8sServiceImpl.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimVolumeSource
import io.kubernetes.client.openapi.models.V1Pod
import io.kubernetes.client.openapi.models.V1PodBuilder
import io.kubernetes.client.openapi.models.V1ResourceRequirements
import io.kubernetes.client.openapi.models.V1Toleration
import io.kubernetes.client.openapi.models.V1TolerationBuilder
import io.kubernetes.client.openapi.models.V1Volume
import io.kubernetes.client.openapi.models.V1VolumeMount
import io.micronaut.context.annotation.Property
Expand All @@ -43,6 +45,7 @@ import io.micronaut.context.annotation.Value
import io.micronaut.core.annotation.Nullable
import io.seqera.wave.configuration.BlobCacheConfig
import io.seqera.wave.configuration.BuildConfig
import io.seqera.wave.configuration.K8sTolerationsConfig
import io.seqera.wave.configuration.MirrorConfig
import io.seqera.wave.configuration.ScanConfig
import io.seqera.wave.core.ContainerPlatform
Expand Down Expand Up @@ -101,6 +104,9 @@ class K8sServiceImpl implements K8sService {
@Inject
private BuildConfig buildConfig

@Inject
private K8sTolerationsConfig k8sTolerationsConfig

// check this link to know more about these options https://github.com/moby/buildkit/tree/master/examples/kubernetes#kubernetes-manifests-for-buildkit
private final static Map<String,String> BUILDKIT_FLAGS = ['BUILDKITD_FLAGS': '--oci-worker-no-process-sandbox']

Expand Down Expand Up @@ -491,15 +497,15 @@ class K8sServiceImpl implements K8sService {
* The {@link V1Pod} description the submitted pod
*/
@Override
V1Job launchBuildJob(String name, String containerImage, List<String> args, Path workDir, Path creds, Duration timeout, Map<String,String> nodeSelector) {
final spec = buildJobSpec(name, containerImage, args, workDir, creds, timeout, nodeSelector)
V1Job launchBuildJob(String name, String containerImage, List<String> args, Path workDir, Path creds, Duration timeout, Map<String,String> nodeSelector, ContainerPlatform platform) {
final spec = buildJobSpec(name, containerImage, args, workDir, creds, timeout, nodeSelector, platform)
return k8sClient
.batchV1Api()
.createNamespacedJob(namespace, spec)
.execute()
}

V1Job buildJobSpec(String name, String containerImage, List<String> args, Path workDir, Path credsFile, Duration timeout, Map<String,String> nodeSelector) {
V1Job buildJobSpec(String name, String containerImage, List<String> args, Path workDir, Path credsFile, Duration timeout, Map<String,String> nodeSelector, ContainerPlatform platform) {

// dirty dependency to avoid introducing another parameter
final singularity = containerImage.contains('singularity')
Expand Down Expand Up @@ -546,6 +552,15 @@ class K8sServiceImpl implements K8sService {
.withRestartPolicy("Never")
.addAllToVolumes(volumes)

//set toleration for build pods
if( k8sTolerationsConfig.enabled && platform ) {
if ( platform.isARM64() ) {
spec.withTolerations(buildTolerations(k8sTolerationsConfig.arm64))
} else if ( platform.isAMD64() ) {
spec.withTolerations(buildTolerations(k8sTolerationsConfig.amd64))
}
}

final requests = new V1ResourceRequirements()
if( requestsCpu )
requests.putRequestsItem('cpu', new Quantity(requestsCpu))
Expand Down Expand Up @@ -752,4 +767,22 @@ class K8sServiceImpl implements K8sService {
}
return latest
}

/**
* Creates List of Tolerations
*/
protected List<V1Toleration> buildTolerations(List<K8sTolerationsConfig.Toleration> tolerations){
if( !tolerations )
return null;
V1TolerationBuilder builder = new V1TolerationBuilder();
List<V1Toleration> v1Tolerations = new ArrayList<>();
for(def toleration: tolerations) {
v1Tolerations.add(builder.withKey(toleration.key)
.withOperator(toleration.operator)
.withValue(toleration.value)
.withEffect(toleration.effect)
.build())
}
return v1Tolerations
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,15 @@ class KubeBuildStrategyTest extends Specification {
strategy.build('build-job-name', req)

then:
1 * k8sService.launchBuildJob( _, _, _, _, _, _, [service:'wave-build']) >> null
1 * k8sService.launchBuildJob( _, _, _, _, _, _, [service:'wave-build'],_) >> null

when:
def req2 = new BuildRequest(containerId, dockerfile, null, PATH, targetImage, USER, ContainerPlatform.of('arm64'), cache, "10.20.30.40", '{}', null,null , null, null, BuildFormat.DOCKER, Duration.ofMinutes(1))
Files.createDirectories(req2.workDir)
strategy.build('job-name', req2)

then:
1 * k8sService.launchBuildJob( _, _, _, _, _, _, [service:'wave-build-arm64']) >> null
1 * k8sService.launchBuildJob( _, _, _, _, _, _, [service:'wave-build-arm64'],_) >> null

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import spock.lang.Unroll

import java.nio.file.Path
import java.time.Duration
import java.time.Instant
import java.time.OffsetDateTime

import io.kubernetes.client.custom.Quantity
Expand All @@ -40,6 +41,8 @@ import io.micronaut.context.ApplicationContext
import io.seqera.wave.configuration.BlobCacheConfig
import io.seqera.wave.configuration.MirrorConfig
import io.seqera.wave.configuration.ScanConfig
import io.seqera.wave.core.ContainerPlatform

/**
*
* @author Paolo Di Tommaso <[email protected]>
Expand Down Expand Up @@ -593,7 +596,7 @@ class K8sServiceImplTest extends Specification {
def nodeSelector = [key: 'value']

when:
def job = k8sService.buildJobSpec(name, containerImage, args, workDir, credsFile, timeout, nodeSelector)
def job = k8sService.buildJobSpec(name, containerImage, args, workDir, credsFile, timeout, nodeSelector, ContainerPlatform.of('arm64'))

then:
job.spec.backoffLimit == 3
Expand Down Expand Up @@ -645,7 +648,7 @@ class K8sServiceImplTest extends Specification {
def nodeSelector = [key: 'value']

when:
def job = k8sService.buildJobSpec(name, containerImage, args, workDir, credsFile, timeout, nodeSelector)
def job = k8sService.buildJobSpec(name, containerImage, args, workDir, credsFile, timeout, nodeSelector, ContainerPlatform.of('amd64'))

then:
job.spec.template.spec.containers[0].image == containerImage
Expand Down Expand Up @@ -1005,4 +1008,82 @@ class K8sServiceImplTest extends Specification {
jobStarted() | K8sService.JobStatus.Pending
jobUnknown() | K8sService.JobStatus.Pending
}

def 'should add tolerations for arm64 in pod spec' () {
given:
def PROPS = [
'wave.build.workspace': '/build/work',
'wave.build.k8s.namespace': 'foo',
'wave.build.k8s.configPath': '/home/kube.config',
'wave.build.k8s.storage.claimName': 'bar',
'wave.build.k8s.storage.mountPath': '/build',
'wave.build.k8s.tolerations.enabled': true,
'wave.build.k8s.tolerations.arm64[0].key': 'arch',
'wave.build.k8s.tolerations.arm64[0].value': 'arm64',
'wave.build.k8s.tolerations.arm64[0].operator': 'Equal',
'wave.build.k8s.tolerations.arm64[0].effect': 'NoSchedule']
and:
def ctx = ApplicationContext.run(PROPS)
def k8sService = ctx.getBean(K8sServiceImpl)

when:
def result = k8sService.buildJobSpec('foo', 'my-image:latest', ['this','that'], Path.of('/build/work/xyz'), Path.of('/build/work/xyz/config.json'), Duration.ofMinutes(1), [:], ContainerPlatform.of('arm64'))

then: 'should set the tolerations for arm64'
result.spec.template.spec.tolerations.get(0).key == 'arch'
result.spec.template.spec.tolerations.get(0).value == 'arm64'
result.spec.template.spec.tolerations.get(0).operator == 'Equal'
result.spec.template.spec.tolerations.get(0).effect == 'NoSchedule'

when:
result = k8sService.buildJobSpec('foo', 'my-image:latest', ['this','that'], Path.of('/build/work/xyz'), Path.of('/build/work/xyz/config.json'), Duration.ofMinutes(1), [:], ContainerPlatform.of('amd64'))

then: 'should not set the toleration for amd64'
result.spec.template.spec.tolerations == null

when:
result = k8sService.buildJobSpec('foo', 'my-image:latest', ['this','that'], Path.of('/build/work/xyz'), Path.of('/build/work/xyz/config.json'), Duration.ofMinutes(1), [:], null)

then: 'should not throw NPE'
result.spec.template.spec.tolerations == null
}

def 'should add tolerations for amd64 in pod spec' () {
given:
def PROPS = [
'wave.build.workspace': '/build/work',
'wave.build.k8s.namespace': 'foo',
'wave.build.k8s.configPath': '/home/kube.config',
'wave.build.k8s.storage.claimName': 'bar',
'wave.build.k8s.storage.mountPath': '/build',
'wave.build.k8s.tolerations.enabled': true,
'wave.build.k8s.tolerations.amd64[0].key': 'arch',
'wave.build.k8s.tolerations.amd64[0].value': 'amd64',
'wave.build.k8s.tolerations.amd64[0].operator': 'Equal',
'wave.build.k8s.tolerations.amd64[0].effect': 'NoSchedule']
and:
def ctx = ApplicationContext.run(PROPS)
def k8sService = ctx.getBean(K8sServiceImpl)

when:
def result = k8sService.buildJobSpec('foo', 'my-image:latest', ['this','that'], Path.of('/build/work/xyz'), Path.of('/build/work/xyz/config.json'), Duration.ofMinutes(1), [:], ContainerPlatform.of('amd64'))

then: 'should set the tolerations for amd64'
result.spec.template.spec.tolerations.get(0).key == 'arch'
result.spec.template.spec.tolerations.get(0).value == 'amd64'
result.spec.template.spec.tolerations.get(0).operator == 'Equal'
result.spec.template.spec.tolerations.get(0).effect == 'NoSchedule'

when:
result = k8sService.buildJobSpec('foo', 'my-image:latest', ['this','that'], Path.of('/build/work/xyz'), Path.of('/build/work/xyz/config.json'), Duration.ofMinutes(1), [:], ContainerPlatform.of('arm64'))

then: 'should not set the toleration for arm64'
result.spec.template.spec.tolerations == null

when:
result = k8sService.buildJobSpec('foo', 'my-image:latest', ['this','that'], Path.of('/build/work/xyz'), Path.of('/build/work/xyz/config.json'), Duration.ofMinutes(1), [:], null)

then: 'should not throw NPE'
result.spec.template.spec.tolerations == null
}
}
Loading