Skip to content

Commit

Permalink
feat(core): improve performance of ExecutorService.handleChildWorkerT…
Browse files Browse the repository at this point in the history
…askResult

Searching for a retry in all parents is a costly operation, doing it only wgen we are retrying or failing avoid it most of the time.
  • Loading branch information
loicmathieu committed Feb 28, 2025
1 parent 2c77a43 commit fa07cbd
Showing 1 changed file with 41 additions and 31 deletions.
72 changes: 41 additions & 31 deletions core/src/main/java/io/kestra/core/runners/ExecutorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -493,53 +493,43 @@ private Executor handleChildWorkerTaskResult(Executor executor) throws Exception
}

Task task = executor.getFlow().findTaskByTaskIdOrNull(taskRun.getTaskId());
String taskId = taskRun.getTaskId();
Task parentTask = null;
if (taskRun.getParentTaskRunId() != null) {
do {
parentTask = executor.getFlow().findParentTasksByTaskId(taskId);
if (parentTask != null) {
taskId = parentTask.getId();
}
} while (parentTask != null && parentTask.getRetry() == null);
}

/*
* Check if the task is failed and if it has a retry policy
*/
if (!executor.getExecution().getState().isRetrying() &&
taskRun.getState().isFailed() &&
(task instanceof RunnableTask<?> || task instanceof Subflow) &&
(task.getRetry() != null || executor.getFlow().getRetry() != null || (parentTask != null && parentTask.getRetry() != null))
(task instanceof RunnableTask<?> || task instanceof Subflow)
) {
Instant nextRetryDate;
AbstractRetry retry;
AbstractRetry.Behavior behavior;
Instant nextRetryDate = null;
AbstractRetry.Behavior behavior = null;

// Case task has a retry
if (task.getRetry() != null) {
retry = task.getRetry();
behavior = retry.getBehavior();
nextRetryDate = behavior.equals(AbstractRetry.Behavior.CREATE_NEW_EXECUTION) ?
taskRun.nextRetryDate(retry, executor.getExecution()) :
taskRun.nextRetryDate(retry);
}
// Case parent task has a retry
else if (parentTask != null && parentTask.getRetry() != null) {
retry = parentTask.getRetry();
AbstractRetry retry = task.getRetry();
behavior = retry.getBehavior();
nextRetryDate = behavior.equals(AbstractRetry.Behavior.CREATE_NEW_EXECUTION) ?
taskRun.nextRetryDate(retry, executor.getExecution()) :
taskRun.nextRetryDate(retry);
}
// Case flow has a retry
else {
retry = executor.getFlow().getRetry();
behavior = retry.getBehavior();
nextRetryDate = behavior.equals(AbstractRetry.Behavior.CREATE_NEW_EXECUTION) ?
executionService.nextRetryDate(retry, executor.getExecution()) :
taskRun.nextRetryDate(retry);
// Case parent task has a retry
AbstractRetry retry = searchForParentRetry(taskRun, executor);
if (retry != null) {
behavior = retry.getBehavior();
nextRetryDate = behavior.equals(AbstractRetry.Behavior.CREATE_NEW_EXECUTION) ?
taskRun.nextRetryDate(retry, executor.getExecution()) :
taskRun.nextRetryDate(retry);
}
// Case flow has a retry
else if (executor.getFlow().getRetry() != null) {
retry = executor.getFlow().getRetry();
behavior = retry.getBehavior();
nextRetryDate = behavior.equals(AbstractRetry.Behavior.CREATE_NEW_EXECUTION) ?
executionService.nextRetryDate(retry, executor.getExecution()) :
taskRun.nextRetryDate(retry);
}
}

if (nextRetryDate != null) {
ExecutionDelay.ExecutionDelayBuilder executionDelayBuilder = ExecutionDelay.builder()
.taskRunId(taskRun.getId())
Expand Down Expand Up @@ -607,6 +597,26 @@ else if (task instanceof LoopUntil waitFor && taskRun.getState().isRunning()) {
return executor;
}

private AbstractRetry searchForParentRetry(TaskRun taskRun, Executor executor) {
// search in all parents, recursively
if (taskRun.getParentTaskRunId() != null) {
String taskId = taskRun.getTaskId();
Task parentTask;
do {
parentTask = executor.getFlow().findParentTasksByTaskId(taskId);
if (parentTask != null) {
taskId = parentTask.getId();
}
} while (parentTask != null && parentTask.getRetry() == null);

if (parentTask != null) {
return parentTask.getRetry();
}
}

return null;
}

private Executor handlePausedDelay(Executor executor, List<WorkerTaskResult> workerTaskResults) throws InternalException {
if (workerTaskResults
.stream()
Expand Down

0 comments on commit fa07cbd

Please sign in to comment.