Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat 18 node selector #25

Merged
merged 2 commits into from
Oct 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/io/boomerang/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
@EnableAutoConfiguration
public class Application {

public static void main(String[] args) {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
11 changes: 4 additions & 7 deletions src/main/java/io/boomerang/kube/service/KubeServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ public class KubeServiceImpl implements KubeService {
@Value("${kube.image.pullSecret}")
protected String kubeImagePullSecret;

@Value("${kube.worker.job.backOffLimit}")
@Value("${kube.task.backOffLimit}")
protected Integer kubeJobBackOffLimit;

@Value("${kube.worker.job.restartPolicy}")
@Value("${kube.task.restartPolicy}")
protected String kubeJobRestartPolicy;

@Value("${kube.worker.job.ttlDays}")
@Value("${kube.task.ttlDays}")
protected Integer kubeJobTTLDays;

@Value("${kube.worker.serviceaccount}")
Expand All @@ -61,12 +61,9 @@ public class KubeServiceImpl implements KubeService {
@Value("${kube.resource.request.memory}")
private String kubeResourceRequestMemory;

@Value("${kube.worker.storage.data.memory}")
@Value("${kube.task.storage.data.memory}")
private Boolean kubeWorkerStorageDataMemory;

@Value("${kube.worker.node.dedicated}")
protected Boolean kubeJobDedicatedNodes;

@Value("${kube.worker.hostaliases}")
protected String kubeHostAliases;

Expand Down
59 changes: 34 additions & 25 deletions src/main/java/io/boomerang/kube/service/TektonServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,15 @@ public class TektonServiceImpl implements TektonService {
@Value("${kube.lifecycle.image}")
protected String kubeLifecycleImage;

@Value("${kube.worker.job.backOffLimit}")
@Value("${kube.task.backOffLimit}")
protected Integer kubeJobBackOffLimit;

@Value("${kube.worker.job.restartPolicy}")
@Value("${kube.task.restartPolicy}")
protected String kubeJobRestartPolicy;

@Value("${kube.worker.job.ttlDays}")
@Value("${kube.task.ttlDays}")
protected Integer kubeJobTTLDays;

@Value("${kube.worker.serviceaccount}")
protected String kubeJobServiceAccount;

@Value("${kube.resource.limit.ephemeral-storage}")
private String kubeResourceLimitEphemeralStorage;

Expand All @@ -95,14 +92,20 @@ public class TektonServiceImpl implements TektonService {
@Value("${kube.resource.request.memory}")
private String kubeResourceRequestMemory;

@Value("${kube.worker.storage.data.memory}")
private Boolean kubeWorkerStorageDataMemory;

@Value("${kube.worker.node.dedicated}")
protected Boolean kubeJobDedicatedNodes;

@Value("${kube.task.storage.data.memory}")
private Boolean kubeTaskStorageDataMemory;
@Value("${kube.worker.serviceaccount}")
private String kubeWorkerServiceAccount;
@Value("${kube.worker.hostaliases}")
protected String kubeHostAliases;
private String kubeWorkerHostAliases;

@Value("#{${kube.worker.nodeselector}}")
private Map<String, String> kubeWorkerNodeSelector;

@Value("${kube.worker.tolerations}")
private String kubeWorkerTolerations;

TektonClient client = null;

Expand Down Expand Up @@ -160,7 +163,7 @@ public TaskRun createTaskRun(String workflowName,
*/
List<WorkspaceDeclaration> taskSpecWorkspaces = new ArrayList<>();
List<WorkspaceBinding> taskWorkspaces = new ArrayList<>();
if (workspaces != null) {
if (workspaces != null && !workspaces.isEmpty()) {
workspaces.forEach(ws -> {
if ("workflow".equals(ws.getName()) && kubeService.checkWorkspacePVCExists(ws.getId(), false)) {
WorkspaceDeclaration wsWorkspaceDeclaration = new WorkspaceDeclaration();
Expand Down Expand Up @@ -247,7 +250,7 @@ public TaskRun createTaskRun(String workflowName,
Volume dataVolume = new Volume();
dataVolume.setName(helperKubeService.getPrefixVol() + "-data");
EmptyDirVolumeSource dataEmptyDirVolumeSource = new EmptyDirVolumeSource();
if (kubeWorkerStorageDataMemory
if (kubeTaskStorageDataMemory
&& Boolean.valueOf(parameters.get("worker.storage.data.memory"))) {
LOGGER.info("Setting /data to in memory storage...");
dataEmptyDirVolumeSource.setMedium("Memory");
Expand Down Expand Up @@ -281,23 +284,28 @@ public TaskRun createTaskRun(String workflowName,
*/
List<Toleration> tolerations = new ArrayList<>();
Map<String, String> nodeSelectors = new HashMap<>();
if (kubeJobDedicatedNodes) {
Toleration toleration = new Toleration();
toleration.setKey("dedicated");
toleration.setValue("bmrg-worker");
toleration.setEffect("NoSchedule");
toleration.setOperator("Equal");
tolerations.add(toleration);
nodeSelectors.put("node-role.kubernetes.io/bmrg-worker", "true");
if (kubeWorkerNodeSelector != null && !kubeWorkerNodeSelector.isEmpty()) {
LOGGER.info(kubeWorkerNodeSelector.toString());
kubeWorkerNodeSelector.forEach((k, v) -> {
LOGGER.info("Adding node selector: " + k + "=" + v);
nodeSelectors.put(k, v);
});
}
LOGGER.info("Finalized Node Selectors: " + nodeSelectors.toString());
if (kubeWorkerTolerations != null && !kubeWorkerTolerations.isEmpty()) {
LOGGER.info(kubeWorkerTolerations.toString());
Type listTolerationsType = new TypeToken<List<Toleration>>() {}.getType();
tolerations = new Gson().fromJson(kubeWorkerTolerations, listTolerationsType);
}
LOGGER.info("Finalized Tolerations: " + tolerations.toString());

/*
* Create Host Aliases if defined
*/
List<HostAlias> hostAliases = new ArrayList<>();
if (!kubeHostAliases.isEmpty()) {
if (!kubeWorkerHostAliases.isEmpty()) {
Type listHostAliasType = new TypeToken<List<HostAlias>>() {}.getType();
hostAliases = new Gson().fromJson(kubeHostAliases, listHostAliasType);
hostAliases = new Gson().fromJson(kubeWorkerHostAliases, listHostAliasType);
}

/*
Expand Down Expand Up @@ -422,6 +430,7 @@ public TaskRun createTaskRun(String workflowName,
.withParams(taskParams)
.withWorkspaces(taskWorkspaces)
.withTimeout(taskTimeout)
// .withServiceAccountName(workerProperties.getServiceaccount())
.withNewTaskSpec()
.withWorkspaces(taskSpecWorkspaces)
.withParams(taskSpecParams)
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/boomerang/service/TaskServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ public class TaskServiceImpl implements TaskService {
@Value("${kube.timeout.waitUntil}")
protected long waitUntilTimeout;

@Value("${kube.worker.job.deletion}")
@Value("${kube.task.deletion}")
private TaskDeletionEnum taskDeletion;

@Value("${kube.worker.timeout}")
@Value("${kube.task.timeout}")
private Integer taskTimeout;

@Autowired
Expand Down
5 changes: 3 additions & 2 deletions src/main/resources/application-local.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ logging.level.root=INFO

# Disable elastic integration locally
kube.worker.logging.type=default
kube.worker.node.dedicated=true
kube.worker.nodeselector={'node-role.kubernetes.io/bmrg-worker': 'true'}
kube.worker.tolerations=[{key: 'dedicated', value: 'bmrg-worker', operator: 'Equal', effect: 'NoSchedule'}]

proxy.enable=false

Expand All @@ -13,4 +14,4 @@ springfox.documentation.swagger.v2.path=/controller/api-docs
opentracing.jaeger.enabled=false

# Enable HTTP Trace Logging for Frabic8 kubernetes-client
org.slf4j.simpleLogger.defaultLogLevel=trace
org.slf4j.simpleLogger.defaultLogLevel=traces
17 changes: 10 additions & 7 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,18 @@ kube.workflow.storage.class=
kube.workflow.storage.accessMode=ReadWriteMany

# Task / Worker configuration
kube.worker.job.backOffLimit=0
kube.worker.job.restartPolicy=Never
kube.worker.job.ttlDays=7
kube.worker.job.deletion=Never
kube.worker.timeout=60
kube.worker.storage.data.memory=false
# We split on Task vs Worker so that we dont have to build or custom property model class for all of them
# Worker also comes from the values.yaml vs the Task properties come with the Task.java model
kube.task.backOffLimit=0
kube.task.restartPolicy=Never
kube.task.ttlDays=7
kube.task.deletion=Never
kube.task.timeout=60
kube.task.storage.data.memory=false
kube.worker.hostaliases=[]
kube.worker.node.dedicated=false
kube.worker.serviceaccount=
kube.worker.nodeselector={}
kube.worker.tolerations=[{}]
# typically we do 1/4 to 1/8 of the limit as the request
kube.resource.limit.ephemeral-storage=16Gi
kube.resource.request.ephemeral-storage=2Gi
Expand Down