diff --git a/README.md b/README.md index 155bbb5..f8ccf9a 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ provides a fully managed workflow-as-a-service (WAAS) to the data platform users It serves thousands of users, including data scientists, data engineers, machine learning engineers, software engineers, content producers, and business analysts, for various use cases. It schedules hundreds of thousands of workflows, millions of jobs every day -and operate with a strict SLO even when there are spikes in the traffic. +and operates with a strict SLO even when there are spikes in the traffic. Maestro is highly scalable and extensible to support existing and new use cases and offers enhanced usability to end users. You can read more details about it in our latest [blog post](https://netflixtechblog.com/maestro-netflixs-workflow-orchestrator-ee13a06f9c78). diff --git a/maestro-common/src/main/java/com/netflix/maestro/models/Constants.java b/maestro-common/src/main/java/com/netflix/maestro/models/Constants.java index 939394a..c443ece 100644 --- a/maestro-common/src/main/java/com/netflix/maestro/models/Constants.java +++ b/maestro-common/src/main/java/com/netflix/maestro/models/Constants.java @@ -148,7 +148,7 @@ private Constants() {} public static final int MAX_PLATFORM_RETRY_LIMIT_SECS = 24 * 3600; // 1 day /** maximum retry wait limit for timeout errors. */ - public static final int MAX_TIMEOUT_RETRY_LIMIT_SECS = 24 * 3600; // 1 days + public static final int MAX_TIMEOUT_RETRY_LIMIT_SECS = 24 * 3600; // 1 day /** Max timeout limit in milliseconds. */ public static final long MAX_TIME_OUT_LIMIT_IN_MILLIS = TimeUnit.DAYS.toMillis(120); // 120 days @@ -316,7 +316,7 @@ public static WorkflowVersion of(String version) { /** Workflow create request data size limit used for validation. */ public static final String WORKFLOW_CREATE_REQUEST_DATA_SIZE_LIMIT = "256KB"; - /** params' total size (in JSON format) limit for a workflow instance or a step instance. */ + /** param's total size (in JSON format) limit for a workflow instance or a step instance. */ public static final int JSONIFIED_PARAMS_STRING_SIZE_LIMIT = 750000; /** Defines limit for the query for step attempt state view. */ diff --git a/maestro-common/src/main/java/com/netflix/maestro/models/definition/Properties.java b/maestro-common/src/main/java/com/netflix/maestro/models/definition/Properties.java index 3fcb5ff..8a38167 100644 --- a/maestro-common/src/main/java/com/netflix/maestro/models/definition/Properties.java +++ b/maestro-common/src/main/java/com/netflix/maestro/models/definition/Properties.java @@ -31,7 +31,7 @@ * *
If unset (null value), means there is no change for this field. * - *
Properties changes are kept separately and can evolve independently from the workflow version + *
Properties changes are kept separately and can evolve independently of the workflow version
* changes.
*/
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy.class)
diff --git a/maestro-common/src/main/java/com/netflix/maestro/models/definition/Step.java b/maestro-common/src/main/java/com/netflix/maestro/models/definition/Step.java
index 6c80d19..853b358 100644
--- a/maestro-common/src/main/java/com/netflix/maestro/models/definition/Step.java
+++ b/maestro-common/src/main/java/com/netflix/maestro/models/definition/Step.java
@@ -50,7 +50,7 @@ public interface Step {
/** Get step type. */
StepType getType();
- /** Get step sub type. */
+ /** Get step subtype. */
default String getSubType() {
return null;
}
diff --git a/maestro-common/src/main/java/com/netflix/maestro/models/definition/StepDependenciesDefinition.java b/maestro-common/src/main/java/com/netflix/maestro/models/definition/StepDependenciesDefinition.java
index 71093da..c3e5f0f 100644
--- a/maestro-common/src/main/java/com/netflix/maestro/models/definition/StepDependenciesDefinition.java
+++ b/maestro-common/src/main/java/com/netflix/maestro/models/definition/StepDependenciesDefinition.java
@@ -34,7 +34,7 @@ public class StepDependenciesDefinition {
/** param name for step dependency name. */
public static final String STEP_DEPENDENCY_NAME = "name";
- /** param name for step dependency sub type, like input_table, input_s3. */
+ /** param name for step dependency subtype, like input_table, input_s3. */
public static final String STEP_DEPENDENCY_SUB_TYPE = "_step_dependency_sub_type";
private final List While the step status is RUNNING, the code in execute() will be called periodically with a
- * preset polling interval. Additionally, if the execution throws an exception, the execute will
+ * preset polling interval. Additionally, if the execution throws an exception, the execution will
* be retried as another step instance run.
*
* The input data are a copy of the original summary data. Any changes on them will be
diff --git a/maestro-flow/build.gradle b/maestro-flow/build.gradle
index 5124afd..423a520 100644
--- a/maestro-flow/build.gradle
+++ b/maestro-flow/build.gradle
@@ -4,4 +4,10 @@ dependencies {
implementation jacksonDatabindDep
implementation jacksonAnnotationsDep
api slf4jApiDep
+
+ testImplementation junitDep
+ testImplementation mockitoCoreDep
+ testImplementation testcontainerDep
+ testImplementation postgresqlDep
+ testImplementation(testFixtures(project(':maestro-common')))
}
diff --git a/maestro-flow/src/main/java/com/netflix/maestro/flow/actor/BaseActor.java b/maestro-flow/src/main/java/com/netflix/maestro/flow/actor/BaseActor.java
index 73123f9..76e8054 100644
--- a/maestro-flow/src/main/java/com/netflix/maestro/flow/actor/BaseActor.java
+++ b/maestro-flow/src/main/java/com/netflix/maestro/flow/actor/BaseActor.java
@@ -1,6 +1,7 @@
package com.netflix.maestro.flow.actor;
import com.netflix.maestro.annotations.Nullable;
+import com.netflix.maestro.annotations.VisibleForTesting;
import com.netflix.maestro.flow.engine.ExecutionContext;
import com.netflix.maestro.metrics.MaestroMetrics;
import java.util.HashMap;
@@ -9,7 +10,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
-import java.util.stream.Stream;
import lombok.Getter;
import org.slf4j.Logger;
@@ -120,21 +120,22 @@ boolean noChildActorsRunning() {
}
void startShutdown(Action action) {
- if (childActors.isEmpty()) {
- checkShutdown();
- } else {
+ if (!checkShutdown()) {
cancelPendingActions();
wakeUpChildActors(action);
}
}
- void checkShutdown() {
+ // return true if shutdown is finished, otherwise false
+ boolean checkShutdown() {
if (noChildActorsRunning()) {
terminateNow();
if (parent != null) {
parent.post(Action.FLOW_DOWN);
}
+ return true;
}
+ return false;
}
void terminateNow() {
@@ -142,27 +143,10 @@ void terminateNow() {
running = false;
}
- void cancelPendingActions() {
+ private void cancelPendingActions() {
scheduledActions.values().forEach(f -> f.cancel(true));
}
- Stream Basic rule: flow actor can only activate a task actor. A task actor can only deactivate itself
diff --git a/maestro-flow/src/test/java/com/netflix/maestro/flow/FlowBaseTest.java b/maestro-flow/src/test/java/com/netflix/maestro/flow/FlowBaseTest.java
new file mode 100644
index 0000000..4962f74
--- /dev/null
+++ b/maestro-flow/src/test/java/com/netflix/maestro/flow/FlowBaseTest.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2025 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.maestro.flow;
+
+import com.netflix.maestro.flow.models.Flow;
+import com.netflix.maestro.flow.models.FlowDef;
+import java.util.Map;
+import org.junit.After;
+import org.junit.Before;
+import org.mockito.MockitoAnnotations;
+
+public abstract class FlowBaseTest {
+
+ private AutoCloseable closeable;
+
+ @Before
+ public void openMocks() {
+ closeable = MockitoAnnotations.openMocks(this);
+ }
+
+ @After
+ public void releaseMocks() throws Exception {
+ closeable.close();
+ }
+
+ protected Flow createFlow() {
+ Flow flow =
+ new Flow(10, "test-flow-id", 1, System.currentTimeMillis() + 3600000, "test-flow-ref");
+ flow.setInput(Map.of());
+ flow.setFlowDef(new FlowDef());
+ flow.setStatus(Flow.Status.RUNNING);
+ flow.setUpdateTime(flow.getStartTime());
+ return flow;
+ }
+}
diff --git a/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/ActorBaseTest.java b/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/ActorBaseTest.java
new file mode 100644
index 0000000..1575351
--- /dev/null
+++ b/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/ActorBaseTest.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2025 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.maestro.flow.actor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import com.netflix.maestro.flow.FlowBaseTest;
+import com.netflix.maestro.flow.engine.ExecutionContext;
+import com.netflix.maestro.flow.models.FlowGroup;
+import com.netflix.maestro.flow.properties.FlowEngineProperties;
+import com.netflix.maestro.metrics.MaestroMetrics;
+import org.junit.Before;
+import org.mockito.Mock;
+
+public abstract class ActorBaseTest extends FlowBaseTest {
+ @Mock protected ExecutionContext context;
+ @Mock protected FlowEngineProperties properties;
+ @Mock protected MaestroMetrics metrics;
+
+ @Before
+ public void initialize() {
+ when(context.getProperties()).thenReturn(properties);
+ when(properties.getActorErrorRetryIntervalInMillis()).thenReturn(3000L);
+ when(properties.getHeartbeatIntervalInMillis()).thenReturn(10000L);
+ when(properties.getGroupFlowFetchLimit()).thenReturn(100L);
+ when(properties.getFlowReconciliationIntervalInMillis()).thenReturn(30000L);
+ when(context.getMetrics()).thenReturn(metrics);
+ when(context.cloneTask(any())).thenAnswer(i -> i.getArguments()[0]);
+ }
+
+ GroupActor createGroupActor() {
+ FlowGroup group = new FlowGroup(1, 2, "testAddress");
+ return new GroupActor(group, context);
+ }
+
+ void verifyEmptyAction(BaseActor actor) {
+ assertTrue(actor.getActions().isEmpty());
+ }
+
+ void verifyActions(BaseActor actor, Action... expectedActions) {
+ var actions = actor.getActions();
+ assertEquals(expectedActions.length, actions.size());
+ for (Action expectedAction : expectedActions) {
+ assertEquals(expectedAction, actions.poll());
+ }
+ }
+}
diff --git a/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/ActorTest.java b/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/ActorTest.java
new file mode 100644
index 0000000..9087a53
--- /dev/null
+++ b/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/ActorTest.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2025 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.maestro.flow.actor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.netflix.maestro.flow.models.FlowGroup;
+import org.junit.Test;
+
+public class ActorTest extends ActorBaseTest {
+
+ @Test
+ public void testStartGroupActor() {
+ FlowGroup group = new FlowGroup(1, 2, "testAddress");
+ GroupActor actor = (GroupActor) Actor.startGroupActor(group, context);
+ verify(context, times(1)).run(any());
+ // check if the group actor was started
+ assertTrue(actor.isRunning());
+ assertEquals(2, actor.generation());
+ verifyActions(actor, Action.GROUP_START);
+ }
+}
diff --git a/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/BaseActorTest.java b/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/BaseActorTest.java
new file mode 100644
index 0000000..3c612a0
--- /dev/null
+++ b/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/BaseActorTest.java
@@ -0,0 +1,355 @@
+/*
+ * Copyright 2025 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.maestro.flow.actor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.netflix.maestro.AssertHelper;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class BaseActorTest extends ActorBaseTest {
+ private GroupActor groupActor;
+ private FlowActor flowActor;
+
+ @Before
+ public void init() {
+ groupActor = createGroupActor();
+ flowActor = new FlowActor(createFlow(), groupActor, context);
+ }
+
+ @Test
+ public void testIsRunning() {
+ assertTrue(groupActor.isRunning());
+ assertTrue(flowActor.isRunning());
+
+ flowActor.terminateNow();
+ assertTrue(groupActor.isRunning());
+ assertFalse(flowActor.isRunning());
+
+ var actor3 = new FlowActor(createFlow(), groupActor, context);
+ assertTrue(groupActor.isRunning());
+ assertTrue(actor3.isRunning());
+
+ groupActor.terminateNow();
+ assertFalse(groupActor.isRunning());
+ assertFalse(actor3.isRunning());
+ }
+
+ @Test
+ public void testPost() {
+ verifyEmptyAction(groupActor);
+
+ groupActor.post(Action.GROUP_START);
+ verifyActions(groupActor, Action.GROUP_START);
+ }
+
+ @Test
+ public void testGeneration() {
+ assertEquals(2, groupActor.generation());
+ assertEquals(2, flowActor.generation());
+ }
+
+ @Test
+ public void testRunActionFor() {
+ groupActor.runActionFor(flowActor, Action.FLOW_START);
+
+ verify(context, times(1)).run(any());
+ verifyActions(flowActor, Action.FLOW_START);
+ assertEquals(flowActor, groupActor.getChild("test-flow-ref"));
+
+ doThrow(new RejectedExecutionException()).when(context).run(flowActor);
+ groupActor.runActionFor(flowActor, Action.FLOW_START);
+ verifyEmptyAction(flowActor);
+
+ doThrow(new RuntimeException("test unhandled error")).when(context).run(flowActor);
+ AssertHelper.assertThrows(
+ "should throw instead of ignore the error",
+ RuntimeException.class,
+ "test unhandled error",
+ () -> groupActor.runActionFor(flowActor, Action.FLOW_START));
+ }
+
+ @Test
+ public void testWakeUpChildActors() {
+ groupActor.wakeUpChildActors(Action.FLOW_TIMEOUT);
+ verifyEmptyAction(flowActor);
+
+ groupActor.runActionFor(flowActor, Action.FLOW_START);
+ groupActor.wakeUpChildActors(Action.FLOW_TIMEOUT);
+ verifyActions(flowActor, Action.FLOW_START, Action.FLOW_TIMEOUT);
+ }
+
+ @Test
+ public void testWakeUpChildActor() {
+ groupActor.wakeUpChildActor("test-flow-ref", Action.FLOW_TIMEOUT);
+ verifyEmptyAction(flowActor);
+
+ groupActor.runActionFor(flowActor, Action.FLOW_START);
+ groupActor.wakeUpChildActor("test-flow-ref", Action.FLOW_TIMEOUT);
+ verifyActions(flowActor, Action.FLOW_START, Action.FLOW_TIMEOUT);
+
+ groupActor.wakeUpChildActor("not-exist-flow-ref", Action.FLOW_TIMEOUT);
+ verifyEmptyAction(flowActor);
+
+ flowActor.terminateNow();
+ groupActor.wakeUpChildActor("test-flow-ref", Action.FLOW_TIMEOUT);
+ verifyEmptyAction(flowActor);
+ }
+
+ @Test
+ public void testContainChild() {
+ assertFalse(groupActor.containsChild("test-flow-ref"));
+
+ groupActor.runActionFor(flowActor, Action.FLOW_START);
+ assertTrue(groupActor.containsChild("test-flow-ref"));
+ }
+
+ @Test
+ public void testGetChild() {
+ assertNull(groupActor.getChild("test-flow-ref"));
+
+ groupActor.runActionFor(flowActor, Action.FLOW_START);
+ assertEquals(flowActor, groupActor.getChild("test-flow-ref"));
+ }
+
+ @Test
+ public void testRemoveChild() {
+ assertNull(groupActor.removeChild("test-flow-ref"));
+
+ groupActor.runActionFor(flowActor, Action.FLOW_START);
+ assertEquals(flowActor, groupActor.removeChild("test-flow-ref"));
+ assertNull(groupActor.getChild("test-flow-ref"));
+ }
+
+ @Test
+ public void testCleanupChildActors() {
+ groupActor.runActionFor(flowActor, Action.FLOW_START);
+ groupActor.cleanupChildActors();
+ assertTrue(groupActor.containsChild("test-flow-ref"));
+
+ flowActor.terminateNow();
+ groupActor.cleanupChildActors();
+ assertFalse(groupActor.containsChild("test-flow-ref"));
+ }
+
+ @Test
+ public void testNoChildActorsRunning() {
+ assertTrue(groupActor.noChildActorsRunning());
+
+ groupActor.runActionFor(flowActor, Action.FLOW_START);
+ assertFalse(groupActor.noChildActorsRunning());
+
+ flowActor.terminateNow();
+ assertTrue(groupActor.noChildActorsRunning());
+ }
+
+ @Test
+ public void testStartShutdownFromRootWithoutChildActors() {
+ var future = Mockito.mock(ScheduledFuture.class);
+ when(context.schedule(any(), anyLong())).thenReturn(future);
+
+ assertTrue(groupActor.isRunning());
+
+ groupActor.schedule(Action.GROUP_START, 10000);
+ assertEquals(Set.of(Action.GROUP_START), groupActor.getScheduledActions().keySet());
+
+ groupActor.startShutdown(Action.FLOW_SHUTDOWN);
+ verify(future, times(1)).cancel(true);
+ assertFalse(groupActor.isRunning());
+ }
+
+ @Test
+ public void testStartShutdownFromRootWithChildActors() {
+ var future = Mockito.mock(ScheduledFuture.class);
+ when(context.schedule(any(), anyLong())).thenReturn(future);
+
+ groupActor.runActionFor(flowActor, Action.FLOW_START);
+ assertTrue(groupActor.isRunning());
+
+ groupActor.schedule(Action.GROUP_START, 10000);
+ assertEquals(Set.of(Action.GROUP_START), groupActor.getScheduledActions().keySet());
+
+ groupActor.startShutdown(Action.FLOW_SHUTDOWN);
+ verify(future, times(1)).cancel(true);
+
+ assertTrue(groupActor.isRunning());
+ verifyActions(flowActor, Action.FLOW_START, Action.FLOW_SHUTDOWN);
+ }
+
+ @Test
+ public void testStartShutdownFromChildWithoutChildActors() {
+ groupActor.runActionFor(flowActor, Action.FLOW_START);
+ assertTrue(groupActor.isRunning());
+ assertTrue(flowActor.isRunning());
+
+ var future = Mockito.mock(ScheduledFuture.class);
+ when(context.schedule(any(), anyLong())).thenReturn(future);
+
+ flowActor.schedule(Action.FLOW_REFRESH, 10000);
+ assertEquals(Set.of(Action.FLOW_REFRESH), flowActor.getScheduledActions().keySet());
+
+ flowActor.startShutdown(Action.TASK_SHUTDOWN);
+ verify(future, times(1)).cancel(true);
+ assertFalse(flowActor.isRunning());
+ assertTrue(groupActor.isRunning());
+ verifyActions(groupActor, Action.FLOW_DOWN);
+
+ groupActor.startShutdown(Action.FLOW_SHUTDOWN);
+ assertFalse(groupActor.isRunning());
+ }
+
+ @Test
+ public void testCheckShutdownWithChildRunning() {
+ var future = Mockito.mock(ScheduledFuture.class);
+ when(context.schedule(any(), anyLong())).thenReturn(future);
+
+ groupActor.runActionFor(flowActor, Action.FLOW_START);
+ assertTrue(groupActor.isRunning());
+ assertTrue(flowActor.isRunning());
+
+ flowActor.schedule(Action.FLOW_REFRESH, 10000);
+ assertEquals(Set.of(Action.FLOW_REFRESH), flowActor.getScheduledActions().keySet());
+
+ assertFalse(groupActor.checkShutdown());
+ verify(future, times(0)).cancel(true);
+ assertTrue(groupActor.isRunning());
+ }
+
+ @Test
+ public void testCheckShutdownWithoutChildRunning() {
+ var future = Mockito.mock(ScheduledFuture.class);
+ when(context.schedule(any(), anyLong())).thenReturn(future);
+
+ groupActor.runActionFor(flowActor, Action.FLOW_START);
+ assertTrue(groupActor.isRunning());
+ assertTrue(flowActor.isRunning());
+
+ flowActor.schedule(Action.FLOW_REFRESH, 10000);
+ assertEquals(Set.of(Action.FLOW_REFRESH), flowActor.getScheduledActions().keySet());
+
+ assertTrue(flowActor.checkShutdown());
+ verifyActions(groupActor, Action.FLOW_DOWN);
+ assertFalse(flowActor.isRunning());
+
+ assertTrue(groupActor.checkShutdown());
+ verify(future, times(1)).cancel(true);
+ assertFalse(groupActor.isRunning());
+ }
+
+ @Test
+ public void testTerminateNow() {
+ var future = Mockito.mock(ScheduledFuture.class);
+ when(context.schedule(any(), anyLong())).thenReturn(future);
+
+ assertTrue(groupActor.isRunning());
+ groupActor.schedule(Action.GROUP_START, 10000);
+ assertEquals(Set.of(Action.GROUP_START), groupActor.getScheduledActions().keySet());
+
+ groupActor.terminateNow();
+ assertFalse(groupActor.isRunning());
+ verify(future, times(1)).cancel(true);
+ }
+
+ @Test
+ public void testScheduleWithoutDelay() {
+ var future = Mockito.mock(ScheduledFuture.class);
+ when(context.schedule(any(), anyLong())).thenReturn(future);
+
+ groupActor.schedule(Action.GROUP_START, 0);
+ verify(context, times(0)).schedule(any(), anyLong());
+ verifyActions(groupActor, Action.GROUP_START);
+ }
+
+ @Test
+ public void testScheduleWithDelay() {
+ var future = Mockito.mock(ScheduledFuture.class);
+ when(context.schedule(any(), anyLong())).thenReturn(future);
+
+ groupActor.schedule(Action.GROUP_START, 10000);
+ verify(context, times(1)).schedule(any(), anyLong());
+ assertEquals(Set.of(Action.GROUP_START), groupActor.getScheduledActions().keySet());
+ verifyEmptyAction(groupActor);
+ }
+
+ @Test
+ public void testScheduleForNotRunningActor() {
+ var future = Mockito.mock(ScheduledFuture.class);
+ when(context.schedule(any(), anyLong())).thenReturn(future);
+
+ groupActor.terminateNow();
+
+ groupActor.schedule(Action.GROUP_START, 10000);
+ verify(context, times(0)).schedule(any(), anyLong());
+ verifyEmptyAction(groupActor);
+ }
+
+ @Test
+ public void testScheduleForDuplicatePendingAction() {
+ var future = Mockito.mock(ScheduledFuture.class);
+ when(context.schedule(any(), anyLong())).thenReturn(future);
+
+ groupActor.schedule(Action.GROUP_START, 10000);
+ groupActor.schedule(Action.GROUP_START, 10000);
+ verify(context, times(1)).schedule(any(), anyLong());
+ assertEquals(Set.of(Action.GROUP_START), groupActor.getScheduledActions().keySet());
+ verifyEmptyAction(groupActor);
+ }
+
+ @Test
+ public void testScheduleForDuplicateDoneAction() {
+ var future1 = Mockito.mock(ScheduledFuture.class);
+ var future2 = Mockito.mock(ScheduledFuture.class);
+ when(context.schedule(any(), anyLong())).thenReturn(future1).thenReturn(future2);
+ when(future1.isDone()).thenReturn(true);
+
+ groupActor.runActionFor(flowActor, Action.FLOW_START);
+ groupActor.schedule(Action.GROUP_START, 10000);
+ groupActor.startShutdown(Action.FLOW_SHUTDOWN);
+ verify(future1, times(1)).cancel(true);
+ assertEquals(Map.of(Action.GROUP_START, future1), groupActor.getScheduledActions());
+ assertTrue(groupActor.isRunning());
+
+ groupActor.schedule(Action.GROUP_START, 10000);
+ verify(context, times(2)).schedule(any(), anyLong());
+ verify(future1, times(1)).cancel(true);
+ verify(future2, times(0)).cancel(true);
+ assertEquals(Map.of(Action.GROUP_START, future2), groupActor.getScheduledActions());
+ }
+
+ @Test
+ public void testRun() {
+ flowActor.post(Action.TASK_DOWN);
+ flowActor.run();
+ verify(context, times(2)).getMetrics();
+ verify(metrics, times(1)).counter("num_of_running_flows", FlowActor.class);
+ verify(metrics, times(1))
+ .counter("num_of_finished_flows", FlowActor.class, "finalized", "false");
+ verifyActions(groupActor, Action.FLOW_DOWN);
+ verifyEmptyAction(flowActor);
+ }
+}
diff --git a/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/FlowActorTest.java b/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/FlowActorTest.java
new file mode 100644
index 0000000..c7e672f
--- /dev/null
+++ b/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/FlowActorTest.java
@@ -0,0 +1,402 @@
+/*
+ * Copyright 2025 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.maestro.flow.actor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.netflix.maestro.AssertHelper;
+import com.netflix.maestro.exceptions.MaestroUnprocessableEntityException;
+import com.netflix.maestro.flow.models.Flow;
+import com.netflix.maestro.flow.models.Task;
+import com.netflix.maestro.flow.models.TaskDef;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class FlowActorTest extends ActorBaseTest {
+
+ private GroupActor groupActor;
+ private FlowActor flowActor;
+ private Flow flow;
+
+ @Before
+ public void init() {
+ groupActor = createGroupActor();
+ flow = createFlow();
+ flowActor = new FlowActor(flow, groupActor, context);
+ }
+
+ @Test
+ public void testBeforeRunning() {
+ flowActor.beforeRunning();
+ verify(context, times(1)).getMetrics();
+ verify(metrics, times(1)).counter("num_of_running_flows", FlowActor.class);
+ }
+
+ @Test
+ public void testRunForActionFlowStart() {
+ assertNull(flow.getPrepareTask());
+ assertNull(flow.getMonitorTask());
+
+ flowActor.runForAction(Action.FLOW_START);
+ assertNotNull(flow.getPrepareTask());
+ assertNotNull(flow.getMonitorTask());
+ verify(context, times(0)).resumeFlow(flow);
+ verify(context, times(1)).prepare(flow);
+ assertEquals(Set.of(Action.FLOW_TIMEOUT), flowActor.getScheduledActions().keySet());
+
+ var actions = flowActor.getActions();
+ assertEquals(1, actions.size());
+ assertEquals(Action.FlowReconcile.class, actions.poll().getClass());
+ }
+
+ @Test
+ public void testRunForActionFlowResumeWithRunningFlow() {
+ assertNull(flow.getPrepareTask());
+ assertNull(flow.getMonitorTask());
+
+ flowActor.runForAction(Action.FLOW_RESUME);
+ assertNull(flow.getPrepareTask());
+ assertNull(flow.getMonitorTask());
+ verify(context, times(1)).resumeFlow(flow);
+ verify(context, times(0)).prepare(flow);
+ assertEquals(Set.of(Action.FLOW_TIMEOUT), flowActor.getScheduledActions().keySet());
+
+ var actions = flowActor.getActions();
+ assertEquals(1, actions.size());
+ assertEquals(Action.FlowReconcile.class, actions.poll().getClass());
+ }
+
+ @Test
+ public void testRunForActionFlowResumeWithRunningTasks() {
+ Task task1 = flow.newTask(new TaskDef("task1", "noop", null, null), false);
+ Task task2 = flow.newTask(new TaskDef("task2", "noop", null, null), false);
+ task2.setStatus(Task.Status.FAILED);
+ flow.addFinishedTask(task2);
+
+ flowActor.runForAction(Action.FLOW_RESUME);
+ var actions = flowActor.getActions();
+ assertEquals(2, actions.size());
+ assertEquals(new Action.FlowTaskRetry("task2"), actions.poll());
+ assertEquals(Action.FlowReconcile.class, actions.poll().getClass());
+
+ verify(context, times(1)).resumeFlow(flow);
+ verify(context, times(1)).cloneTask(task1);
+ verify(context, times(1)).run(any());
+ verify(context, times(0)).prepare(flow);
+ assertEquals(Set.of(Action.FLOW_TIMEOUT), flowActor.getScheduledActions().keySet());
+
+ assertTrue(flowActor.containsChild("task1"));
+ var child = flowActor.getChild("task1");
+ verifyActions(child, Action.TASK_RESUME);
+ }
+
+ @Test
+ public void testRunForActionFlowResumeWithTerminatedFlow() {
+ flow.setStatus(Flow.Status.FAILED);
+ assertNull(flow.getPrepareTask());
+ assertNull(flow.getMonitorTask());
+
+ flowActor.runForAction(Action.FLOW_RESUME);
+ assertNull(flow.getPrepareTask());
+ assertNull(flow.getMonitorTask());
+ assertFalse(flowActor.isRunning());
+
+ flowActor.afterRunning();
+ verify(context, times(1)).deleteFlow(flow);
+ verify(context, times(1)).resumeFlow(flow);
+ verify(context, times(0)).prepare(flow);
+ verifyEmptyAction(flowActor);
+ }
+
+ @Test
+ public void testRunForActionFlowReconcile() {
+ flow.getFlowDef()
+ .setTasks(
+ List.of(
+ List.of(new TaskDef("task1", "noop", null, null)),
+ List.of(new TaskDef("task2", "noop", null, null))));
+ flow.setPrepareTask(flow.newTask(new TaskDef("prepare", "noop", null, null), true));
+ flow.setMonitorTask(flow.newTask(new TaskDef("monitor", "noop", null, null), true));
+
+ flowActor.runForAction(new Action.FlowReconcile(123));
+ assertEquals(2, flow.getRunningTasks().size());
+ assertTrue(flowActor.containsChild("task1"));
+ assertTrue(flowActor.containsChild("task2"));
+ var child1 = flowActor.getChild("task1");
+ var child2 = flowActor.getChild("task2");
+ verifyActions(child1, Action.TASK_START);
+ verifyActions(child2, Action.TASK_START);
+
+ verify(context, times(2)).cloneTask(any());
+ verify(context, times(1)).refresh(flow);
+ verify(context, times(0)).finalCall(flow);
+
+ var scheduledActions = flowActor.getScheduledActions();
+ assertEquals(1, scheduledActions.size());
+ assertEquals(
+ Action.FlowReconcile.class,
+ scheduledActions.keySet().stream().findFirst().get().getClass());
+ }
+
+ @Test
+ public void testRunForActionFlowReconcileWithFailedPreparedTask() {
+ var future = Mockito.mock(ScheduledFuture.class);
+ when(context.schedule(any(), anyLong())).thenReturn(future);
+
+ flow.getFlowDef()
+ .setTasks(
+ List.of(
+ List.of(new TaskDef("task1", "noop", null, null)),
+ List.of(new TaskDef("task2", "noop", null, null))));
+ flow.setPrepareTask(flow.newTask(new TaskDef("prepare", "noop", null, null), true));
+ flow.setMonitorTask(flow.newTask(new TaskDef("monitor", "noop", null, null), true));
+
+ flowActor.runForAction(new Action.FlowReconcile(123));
+ var child1 = flowActor.getChild("task1");
+ var child2 = flowActor.getChild("task2");
+ verifyActions(child1, Action.TASK_START);
+ verifyActions(child2, Action.TASK_START);
+ assertTrue(flowActor.isRunning());
+ verify(context, times(2)).cloneTask(any());
+ verify(context, times(1)).refresh(flow);
+
+ flow.getPrepareTask().setStatus(Task.Status.FAILED_WITH_TERMINAL_ERROR);
+ flowActor.runForAction(new Action.FlowReconcile(123));
+ assertEquals(Task.Status.CANCELED, flow.getMonitorTask().getStatus());
+ assertEquals(Flow.Status.FAILED, flow.getStatus());
+ verify(future, times(1)).cancel(true);
+ verifyActions(child1, Action.TASK_PING);
+ verifyActions(child2, Action.TASK_PING);
+ verify(context, times(1)).finalCall(flow);
+ assertFalse(flowActor.isRunning());
+ }
+
+ @Test
+ public void testFlowRefreshRunningFlow() {
+ flow.setMonitorTask(flow.newTask(new TaskDef("monitor", "noop", null, null), true));
+ flowActor.runForAction(Action.FLOW_REFRESH);
+ verify(context, times(1)).refresh(flow);
+ verify(context, times(0)).finalCall(flow);
+ }
+
+ @Test
+ public void testFlowRefreshTerminatedFlow() {
+ flow.setPrepareTask(flow.newTask(new TaskDef("prepare", "noop", null, null), true));
+ flow.setMonitorTask(flow.newTask(new TaskDef("monitor", "noop", null, null), true));
+ flow.getMonitorTask().setStatus(Task.Status.COMPLETED);
+
+ flowActor.runForAction(Action.FLOW_REFRESH);
+ verify(context, times(1)).refresh(flow);
+ verify(context, times(1)).finalCall(flow);
+ assertEquals(Flow.Status.COMPLETED, flow.getStatus());
+ assertFalse(flowActor.isRunning());
+ }
+
+ @Test
+ public void testFlowTaskRetry() {
+ Task task1 = flow.newTask(new TaskDef("task1", "noop", null, null), false);
+ task1.setStatus(Task.Status.FAILED);
+ flow.addFinishedTask(task1);
+
+ flowActor.runForAction(new Action.FlowTaskRetry("task1"));
+ assertTrue(flowActor.containsChild("task1"));
+ var child = flowActor.getChild("task1");
+ verifyActions(child, Action.TASK_START);
+ assertEquals(Set.of("task1"), flow.getRunningTasks().keySet());
+ verify(context, times(1)).cloneTask(any());
+ verify(context, times(1)).run(any());
+ }
+
+ @Test
+ public void testTaskUpdateForRunningTask() {
+ Task task1 = flow.newTask(new TaskDef("task1", "noop", null, null), false);
+ task1.setStatus(Task.Status.FAILED);
+ flow.addFinishedTask(task1);
+ flowActor.runForAction(new Action.FlowTaskRetry("task1"));
+
+ Task task2 = flow.newTask(new TaskDef("task1", "noop", Map.of(), null), false);
+ task2.setStatus(Task.Status.IN_PROGRESS);
+
+ flowActor.runForAction(new Action.TaskUpdate(task2));
+ assertEquals(Set.of(Action.FLOW_REFRESH), flowActor.getScheduledActions().keySet());
+ assertEquals(task2, flow.getRunningTasks().get("task1"));
+ verifyEmptyAction(flowActor);
+ }
+
+ @Test
+ public void testTaskUpdateForRunningInactiveTask() {
+ Task task1 = flow.newTask(new TaskDef("task1", "noop", null, null), false);
+ task1.setStatus(Task.Status.FAILED);
+ flow.addFinishedTask(task1);
+ flow.setMonitorTask(flow.newTask(new TaskDef("monitor", "noop", null, null), true));
+
+ flowActor.runForAction(new Action.FlowTaskRetry("task1"));
+ Task task2 = flow.newTask(new TaskDef("task1", "noop", Map.of(), null), false);
+ task2.setStatus(Task.Status.IN_PROGRESS);
+ task2.setActive(false);
+ task2.setStartDelayInSeconds(3000);
+
+ flowActor.runForAction(new Action.TaskUpdate(task2));
+ assertEquals(
+ Set.of(Action.FLOW_REFRESH, new Action.TaskWakeUp("task1")),
+ flowActor.getScheduledActions().keySet());
+ assertEquals(task2, flow.getRunningTasks().get("task1"));
+ }
+
+ @Test
+ public void testTaskUpdateForTerminatedTask() {
+ flow.getFlowDef()
+ .setTasks(
+ List.of(
+ List.of(new TaskDef("task1", "noop", null, null)),
+ List.of(new TaskDef("task2", "noop", null, null))));
+ flow.setPrepareTask(flow.newTask(new TaskDef("prepare", "noop", null, null), true));
+ flow.setMonitorTask(flow.newTask(new TaskDef("monitor", "noop", null, null), true));
+
+ Task task1 = flow.newTask(new TaskDef("task1", "noop", null, null), false);
+ task1.setStatus(Task.Status.FAILED);
+ flow.addFinishedTask(task1);
+ flowActor.runForAction(new Action.FlowTaskRetry("task1"));
+ verify(context, times(1)).cloneTask(any());
+
+ Task task2 = flow.newTask(new TaskDef("task1", "noop", Map.of(), null), false);
+ task2.setStatus(Task.Status.FAILED);
+ task2.setStartDelayInSeconds(3000);
+ flowActor.runForAction(new Action.TaskUpdate(task2));
+ assertEquals(
+ Set.of(new Action.FlowTaskRetry("task1")), flowActor.getScheduledActions().keySet());
+ assertFalse(flowActor.containsChild("task1"));
+ assertFalse(flow.getRunningTasks().containsKey("task1"));
+ assertTrue(flowActor.containsChild("task2"));
+ assertTrue(flow.getRunningTasks().containsKey("task2"));
+ var child2 = flowActor.getChild("task2");
+ verifyActions(child2, Action.TASK_START);
+
+ verify(context, times(2)).cloneTask(any());
+ verify(context, times(1)).refresh(flow);
+ verify(context, times(0)).finalCall(flow);
+ }
+
+ @Test
+ public void testTaskWakeUpWithRunningTask() {
+ Task task1 = flow.newTask(new TaskDef("task1", "noop", null, null), false);
+ task1.setStatus(Task.Status.FAILED);
+ flow.addFinishedTask(task1);
+ flowActor.runForAction(new Action.FlowTaskRetry("task1"));
+ verifyActions(flowActor.getChild("task1"), Action.TASK_START);
+
+ Task task2 = flow.getRunningTasks().get("task1");
+ task2.setActive(false);
+ flowActor.runForAction(new Action.TaskWakeUp("task1"));
+ assertTrue(task2.isActive());
+ verifyActions(flowActor.getChild("task1"), Action.TASK_ACTIVATE);
+ }
+
+ @Test
+ public void testTaskWakeUpWithQueuedTask() {
+ var future = Mockito.mock(ScheduledFuture.class);
+ when(context.schedule(any(), anyLong())).thenReturn(future);
+ when(future.cancel(false)).thenReturn(true);
+
+ Task task1 = flow.newTask(new TaskDef("task1", "noop", null, null), false);
+ task1.setStatus(Task.Status.FAILED);
+ task1.setStartDelayInSeconds(3000);
+ flow.addFinishedTask(task1);
+ flowActor.runForAction(Action.FLOW_RESUME);
+ assertFalse(flowActor.containsChild("task1"));
+
+ flowActor.runForAction(new Action.TaskWakeUp("task1"));
+ verify(future, times(1)).cancel(false);
+ verifyActions(flowActor.getChild("task1"), Action.TASK_START, Action.TASK_ACTIVATE);
+ assertTrue(flowActor.containsChild("task1"));
+ }
+
+ @Test
+ public void testFlowTimeoutWithoutTasks() {
+ flowActor.runForAction(Action.FLOW_TIMEOUT);
+ assertEquals(Flow.Status.TIMED_OUT, flow.getStatus());
+ assertEquals(Set.of(Action.FLOW_TIMEOUT), flowActor.getScheduledActions().keySet());
+ }
+
+ @Test
+ public void testFlowTimeoutWithTasks() {
+ var future = Mockito.mock(ScheduledFuture.class);
+ when(context.schedule(any(), anyLong())).thenReturn(future);
+ when(future.cancel(false)).thenReturn(true);
+
+ Task task1 = flow.newTask(new TaskDef("task1", "noop", null, null), false);
+ task1.setStatus(Task.Status.FAILED);
+ task1.setStartDelayInSeconds(3000);
+ flow.addFinishedTask(task1);
+ flowActor.runForAction(Action.FLOW_RESUME);
+ assertFalse(flowActor.containsChild("task1"));
+
+ flowActor.runForAction(Action.FLOW_TIMEOUT);
+ assertEquals(Flow.Status.TIMED_OUT, flow.getStatus());
+ assertEquals(
+ Set.of(new Action.FlowTaskRetry("task1"), Action.FLOW_TIMEOUT),
+ flowActor.getScheduledActions().keySet());
+ verifyActions(flowActor.getChild("task1"), Action.TASK_START, Action.TASK_STOP);
+ }
+
+ @Test
+ public void testFlowShutdown() {
+ flowActor.runForAction(Action.FLOW_SHUTDOWN);
+ verifyActions(groupActor, Action.FLOW_DOWN);
+ assertFalse(flowActor.isRunning());
+ }
+
+ @Test
+ public void testTaskDown() {
+ flowActor.runForAction(Action.TASK_DOWN);
+ verifyActions(groupActor, Action.FLOW_DOWN);
+ assertFalse(flowActor.isRunning());
+ }
+
+ @Test
+ public void testUnexpectedAction() {
+ AssertHelper.assertThrows(
+ "should throw for unexpected action",
+ MaestroUnprocessableEntityException.class,
+ "Unexpected action: [TaskPing[]] for flow ",
+ () -> flowActor.runForAction(Action.TASK_PING));
+ }
+
+ @Test
+ public void testAfterRunning() {
+ flowActor.afterRunning();
+ verify(context, times(1)).getMetrics();
+ verify(metrics, times(1))
+ .counter("num_of_finished_flows", FlowActor.class, "finalized", "false");
+ verify(context, times(0)).deleteFlow(any());
+ }
+
+ @Test
+ public void testReference() {
+ assertEquals("test-flow-ref", flowActor.reference());
+ }
+}
diff --git a/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/GroupActorTest.java b/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/GroupActorTest.java
new file mode 100644
index 0000000..4fb72b6
--- /dev/null
+++ b/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/GroupActorTest.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2025 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.maestro.flow.actor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.netflix.maestro.AssertHelper;
+import com.netflix.maestro.exceptions.MaestroUnprocessableEntityException;
+import com.netflix.maestro.flow.models.Flow;
+import java.util.List;
+import java.util.Set;
+import org.junit.Before;
+import org.junit.Test;
+
+public class GroupActorTest extends ActorBaseTest {
+ private GroupActor groupActor;
+ private Flow flow;
+
+ @Before
+ public void init() {
+ groupActor = createGroupActor();
+ flow = createFlow();
+ }
+
+ @Test
+ public void testGeneration() {
+ assertEquals(2, groupActor.generation());
+ }
+
+ @Test
+ public void testBeforeRunning() {
+ groupActor.beforeRunning();
+ verify(context, times(1)).schedule(any(), anyLong());
+ assertEquals(Set.of(Action.GROUP_HEARTBEAT), groupActor.getScheduledActions().keySet());
+ }
+
+ @Test
+ public void testRunForActionStartGroupWithAnException() {
+ when(context.getFlowsFrom(any(), anyLong(), any())).thenReturn(null);
+ groupActor.runForAction(Action.GROUP_START);
+
+ verify(context, times(1)).getFlowsFrom(any(), anyLong(), any());
+ verify(context, times(1)).schedule(any(), anyLong());
+ assertEquals(Set.of(Action.GROUP_START), groupActor.getScheduledActions().keySet());
+ }
+
+ @Test
+ public void testRunForActionStartGroupWithoutNewFlow() {
+ when(context.getFlowsFrom(any(), anyLong(), any())).thenReturn(List.of());
+ groupActor.runForAction(Action.GROUP_START);
+
+ verify(context, times(1)).getFlowsFrom(any(), anyLong(), any());
+ verify(context, times(0)).schedule(any(), anyLong());
+ assertTrue(groupActor.getScheduledActions().isEmpty());
+ }
+
+ @Test
+ public void testRunForActionStartGroupWithNewFlow() {
+ when(context.getFlowsFrom(any(), anyLong(), any()))
+ .thenReturn(List.of(flow))
+ .thenReturn(List.of());
+ groupActor.runForAction(Action.GROUP_START);
+
+ verifyActions(groupActor, new Action.FlowLaunch(flow, true));
+ verify(context, times(2)).getFlowsFrom(any(), anyLong(), any());
+ verify(context, times(0)).schedule(any(), anyLong());
+ assertTrue(groupActor.getScheduledActions().isEmpty());
+ }
+
+ @Test
+ public void testRunForActionFlowLaunchResume() {
+ groupActor.runForAction(new Action.FlowLaunch(flow, true));
+
+ verify(context, times(1)).run(any());
+ var child = groupActor.getChild(flow.getReference());
+ verifyActions(child, Action.FLOW_RESUME);
+
+ groupActor.runForAction(new Action.FlowLaunch(flow, true));
+ verify(context, times(1)).run(any());
+ assertEquals(child, groupActor.getChild(flow.getReference()));
+ }
+
+ @Test
+ public void testRunForActionFlowLaunchStart() {
+ groupActor.runForAction(new Action.FlowLaunch(flow, false));
+
+ verify(context, times(1)).run(any());
+ var child = groupActor.getChild(flow.getReference());
+ verifyActions(child, Action.FLOW_START);
+
+ groupActor.runForAction(new Action.FlowLaunch(flow, true));
+ verify(context, times(1)).run(any());
+ assertEquals(child, groupActor.getChild(flow.getReference()));
+ }
+
+ @Test
+ public void testRunForActionFlowWakeUp() {
+ groupActor.runForAction(new Action.FlowWakeUp(flow.getReference(), "taskRef"));
+ assertNull(groupActor.getChild(flow.getReference()));
+
+ groupActor.runForAction(new Action.FlowLaunch(flow, false));
+ verify(context, times(1)).run(any());
+ var child = groupActor.getChild(flow.getReference());
+ verifyActions(child, Action.FLOW_START);
+
+ groupActor.runForAction(new Action.FlowWakeUp(flow.getReference(), "taskRef"));
+ verifyActions(child, new Action.TaskWakeUp("taskRef"));
+ }
+
+ @Test
+ public void testGroupHeartbeat() {
+ groupActor.runForAction(Action.GROUP_HEARTBEAT);
+
+ verify(context, times(1)).heartbeatGroup(any());
+ verify(context, times(1)).schedule(any(), anyLong());
+ assertEquals(Set.of(Action.GROUP_HEARTBEAT), groupActor.getScheduledActions().keySet());
+ }
+
+ @Test
+ public void testGroupShutdown() {
+ assertTrue(groupActor.isRunning());
+
+ groupActor.runForAction(Action.GROUP_SHUTDOWN);
+ assertFalse(groupActor.isRunning());
+ verifyEmptyAction(groupActor);
+ assertTrue(groupActor.getScheduledActions().isEmpty());
+ }
+
+ @Test
+ public void testFlowDown() {
+ assertTrue(groupActor.isRunning());
+
+ groupActor.runForAction(Action.FLOW_DOWN);
+ assertFalse(groupActor.isRunning());
+ verifyEmptyAction(groupActor);
+ assertTrue(groupActor.getScheduledActions().isEmpty());
+ }
+
+ @Test
+ public void testUnexpectedAction() {
+ AssertHelper.assertThrows(
+ "should throw for unexpected action",
+ MaestroUnprocessableEntityException.class,
+ "Unexpected action: [TaskDown[]] for FlowGroup [group-1]",
+ () -> groupActor.runForAction(Action.TASK_DOWN));
+ }
+
+ @Test
+ public void testReference() {
+ assertEquals("group-1", groupActor.reference());
+ }
+}
diff --git a/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/TaskActorTest.java b/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/TaskActorTest.java
new file mode 100644
index 0000000..cf261ff
--- /dev/null
+++ b/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/TaskActorTest.java
@@ -0,0 +1,183 @@
+/*
+ * Copyright 2025 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.maestro.flow.actor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.netflix.maestro.flow.models.Flow;
+import com.netflix.maestro.flow.models.Task;
+import com.netflix.maestro.flow.models.TaskDef;
+import java.util.Set;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TaskActorTest extends ActorBaseTest {
+
+ private Flow flow;
+ private Task task;
+ private FlowActor flowActor;
+ private TaskActor taskActor;
+
+ @Before
+ public void init() {
+ flow = createFlow();
+ task = flow.newTask(new TaskDef("task1", "noop", null, null), false);
+ flowActor = new FlowActor(flow, createGroupActor(), context);
+ taskActor = new TaskActor(task, flow, flowActor, context);
+ }
+
+ @Test
+ public void testBeforeRunning() {
+ taskActor.beforeRunning();
+ verify(context, times(1)).getMetrics();
+ verify(metrics, times(1)).counter("num_of_running_tasks", TaskActor.class);
+ }
+
+ @Test
+ public void testRunForActionTaskStart() {
+ taskActor.runForAction(Action.TASK_START);
+ verify(context, times(1)).start(any(), any());
+ verifyActions(taskActor, Action.TASK_PING);
+ }
+
+ @Test
+ public void testRunForActionTaskResume() {
+ taskActor.runForAction(Action.TASK_RESUME);
+ verify(context, times(0)).start(any(), any());
+ verifyActions(taskActor, Action.TASK_PING);
+ }
+
+ @Test
+ public void testRunForActionStopForActiveRunningTask() {
+ taskActor.runForAction(Action.TASK_STOP);
+ verify(context, times(1)).cancel(any(), any());
+ assertEquals(Set.of(Action.TASK_STOP), taskActor.getScheduledActions().keySet());
+ verifyEmptyAction(taskActor);
+ assertTrue(taskActor.isRunning());
+ }
+
+ @Test
+ public void testRunForActionStopForInactiveTask() {
+ task.setActive(false);
+
+ taskActor.runForAction(Action.TASK_STOP);
+ verify(context, times(0)).cancel(any(), any());
+ }
+
+ @Test
+ public void testRunForActionStopForTerminatedTask() {
+ task.setStatus(Task.Status.FAILED);
+
+ taskActor.runForAction(Action.TASK_STOP);
+ verify(context, times(1)).cancel(any(), any());
+ assertTrue(taskActor.getScheduledActions().isEmpty());
+ verifyEmptyAction(taskActor);
+ assertFalse(taskActor.isRunning());
+ verify(context, times(1)).cloneTask(any());
+ verifyActions(flowActor, new Action.TaskUpdate(task));
+ }
+
+ @Test
+ public void testRunForActionTaskPingForActiveRunningTaskWithChange() {
+ task.setStartTime(System.currentTimeMillis());
+ task.setTimeoutInMillis(3600000L);
+ task.setStartDelayInSeconds(3000);
+ when(context.execute(flow, task)).thenReturn(true);
+
+ taskActor.runForAction(Action.TASK_PING);
+ verify(context, times(1)).execute(any(), any());
+ assertEquals(
+ Set.of(Action.TASK_TIMEOUT, Action.TASK_PING), taskActor.getScheduledActions().keySet());
+ assertNull(task.getTimeoutInMillis());
+ assertEquals(1, task.getPollCount());
+ verify(context, times(1)).cloneTask(any());
+ verifyActions(flowActor, new Action.TaskUpdate(task));
+ }
+
+ @Test
+ public void testRunForActionTaskPingForInactiveRunningTaskWithoutChange() {
+ task.setActive(false);
+
+ taskActor.runForAction(Action.TASK_PING);
+ verify(context, times(0)).execute(any(), any());
+ assertTrue(taskActor.getScheduledActions().isEmpty());
+ assertEquals(0, task.getPollCount());
+ verify(context, times(0)).cloneTask(any());
+ verifyEmptyAction(flowActor);
+ }
+
+ @Test
+ public void testRunForActionTaskPingForTerminatedTask() {
+ task.setStatus(Task.Status.FAILED);
+ task.setStartDelayInSeconds(3000);
+ when(context.execute(flow, task)).thenReturn(false);
+
+ taskActor.runForAction(Action.TASK_PING);
+ verify(context, times(1)).execute(any(), any());
+ assertFalse(taskActor.isRunning());
+ verify(context, times(1)).cloneTask(any());
+ verifyActions(flowActor, new Action.TaskUpdate(task));
+ verifyEmptyAction(taskActor);
+ assertTrue(taskActor.getScheduledActions().isEmpty());
+ }
+
+ @Test
+ public void testRunForActionTaskActivate() {
+ verifyExecute(Action.TASK_ACTIVATE, false);
+ }
+
+ private void verifyExecute(Action action, boolean activeFlag) {
+ task.setActive(activeFlag);
+ task.setStartDelayInSeconds(3000L);
+ when(context.execute(flow, task)).thenReturn(false);
+
+ taskActor.runForAction(action);
+ assertTrue(task.isActive());
+ verify(context, times(1)).execute(any(), any());
+ assertEquals(1, task.getPollCount());
+ assertEquals(Set.of(Action.TASK_PING), taskActor.getScheduledActions().keySet());
+ verify(context, times(0)).cloneTask(any());
+ verifyEmptyAction(flowActor);
+ }
+
+ @Test
+ public void testRunForActionTaskTimeout() {
+ verifyExecute(Action.TASK_TIMEOUT, true);
+ }
+
+ @Test
+ public void testRunForActionTaskShutdown() {
+ taskActor.runForAction(Action.TASK_SHUTDOWN);
+ assertFalse(taskActor.isRunning());
+ verifyActions(flowActor, Action.TASK_DOWN);
+ }
+
+ @Test
+ public void testAfterRunning() {
+ taskActor.afterRunning();
+ verify(context, times(1)).getMetrics();
+ verify(metrics, times(1)).counter("num_of_finished_tasks", TaskActor.class);
+ }
+
+ @Test
+ public void testReference() {
+ assertEquals("task1", taskActor.reference());
+ }
+}
diff --git a/maestro-flow/src/test/java/com/netflix/maestro/flow/dao/MaestroFlowDaoTest.java b/maestro-flow/src/test/java/com/netflix/maestro/flow/dao/MaestroFlowDaoTest.java
new file mode 100644
index 0000000..012f184
--- /dev/null
+++ b/maestro-flow/src/test/java/com/netflix/maestro/flow/dao/MaestroFlowDaoTest.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright 2025 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.maestro.flow.dao;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.netflix.maestro.AssertHelper;
+import com.netflix.maestro.database.DatabaseConfiguration;
+import com.netflix.maestro.database.DatabaseSourceProvider;
+import com.netflix.maestro.exceptions.MaestroRetryableError;
+import com.netflix.maestro.flow.FlowBaseTest;
+import com.netflix.maestro.flow.models.Flow;
+import com.netflix.maestro.flow.models.FlowGroup;
+import com.netflix.maestro.metrics.MaestroMetrics;
+import com.netflix.maestro.utils.JsonHelper;
+import java.io.IOException;
+import javax.sql.DataSource;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class MaestroFlowDaoTest extends FlowBaseTest {
+
+ private static DatabaseConfiguration config;
+ private static DataSource dataSource;
+ private static ObjectMapper mapper;
+ private static MaestroMetrics metrics;
+
+ private static class MaestroDBTestConfiguration implements DatabaseConfiguration {
+ @Override
+ public String getJdbcUrl() {
+ return "jdbc:tc:cockroach:v22.2.19:///maestro";
+ }
+
+ @Override
+ public int getConnectionPoolMaxSize() {
+ return 10;
+ }
+
+ @Override
+ public int getConnectionPoolMinIdle() {
+ return getConnectionPoolMaxSize();
+ }
+ }
+
+ @BeforeClass
+ public static void init() {
+ config = new MaestroDBTestConfiguration();
+ dataSource = new DatabaseSourceProvider(config).get();
+ mapper = JsonHelper.objectMapper();
+ metrics = Mockito.mock(MaestroMetrics.class);
+ }
+
+ private MaestroFlowDao dao;
+ private FlowGroup group;
+
+ @Before
+ public void setUp() throws IOException {
+ dao = new MaestroFlowDao(dataSource, mapper, config, metrics);
+ group = new FlowGroup(10, 1, "testAddress");
+ dao.insertGroup(group);
+ }
+
+ @After
+ public void tearDown() {
+ dao.deleteGroup(10);
+ }
+
+ @Test
+ public void testInsertFlow() {
+ Flow flow = createFlow();
+ dao.insertFlow(flow);
+ var flows = dao.getFlows(new FlowGroup(10, 2, "testAddress"), 10, "test");
+ assertEquals(1, flows.size());
+ assertEquals(10, flows.getFirst().getGroupId());
+ assertEquals("test-flow-id", flows.getFirst().getFlowId());
+ assertEquals(2, flows.getFirst().getGeneration());
+ assertEquals(flow.getStartTime(), flows.getFirst().getStartTime());
+ assertEquals("test-flow-ref", flows.getFirst().getReference());
+ dao.deleteFlow(flow);
+ }
+
+ @Test
+ public void testDeleteFlow() {
+ Flow flow = createFlow();
+ dao.insertFlow(flow);
+ dao.deleteFlow(flow);
+ var flows = dao.getFlows(new FlowGroup(10, 2, "testAddress"), 10, "test");
+ assertTrue(flows.isEmpty());
+ }
+
+ @Test
+ public void testGetFlows() {
+ Flow flow = createFlow();
+ dao.insertFlow(flow);
+ var flows = dao.getFlows(new FlowGroup(10, 1, "testAddress"), 10, "test");
+ assertTrue(flows.isEmpty());
+ flows = dao.getFlows(new FlowGroup(9, 2, "testAddress"), 10, "test");
+ assertTrue(flows.isEmpty());
+ flows = dao.getFlows(new FlowGroup(9, 2, "testAddress"), 10, "z");
+ assertTrue(flows.isEmpty());
+ dao.deleteFlow(flow);
+ }
+
+ @Test
+ public void testHeartbeatGroup() {
+ assertTrue(dao.heartbeatGroup(group));
+ assertFalse(dao.heartbeatGroup(new FlowGroup(10, 2, "testAddress")));
+ }
+
+ @Test
+ public void testClaimExpiredGroup() {
+ assertNull(dao.claimExpiredGroup("address2", 100000));
+ FlowGroup claimed = dao.claimExpiredGroup("address2", -100000);
+ assertEquals(10, claimed.groupId());
+ assertEquals(2, claimed.generation());
+ assertEquals("address2", claimed.address());
+ }
+
+ @Test
+ public void testInsertGroup() {
+ AssertHelper.assertThrows(
+ "should throw and retry",
+ MaestroRetryableError.class,
+ "insertGroup for group [10] is failed (res=[0])",
+ () -> dao.insertGroup(group));
+ FlowGroup claimed = dao.claimExpiredGroup("address2", -100000);
+ assertEquals(10, claimed.groupId());
+ assertEquals(2, claimed.generation());
+ assertEquals("address2", claimed.address());
+ }
+
+ @Test
+ public void testExistFlowWithSameKeys() {
+ Flow flow = createFlow();
+ dao.insertFlow(flow);
+ assertTrue(dao.existFlowWithSameKeys(10, "test-flow-id"));
+ assertFalse(dao.existFlowWithSameKeys(2, "test-flow-id"));
+ assertFalse(dao.existFlowWithSameKeys(10, "test-flow-id2"));
+ dao.deleteFlow(flow);
+ }
+}
diff --git a/maestro-flow/src/test/java/com/netflix/maestro/flow/engine/ExecutionContextTest.java b/maestro-flow/src/test/java/com/netflix/maestro/flow/engine/ExecutionContextTest.java
new file mode 100644
index 0000000..e811e31
--- /dev/null
+++ b/maestro-flow/src/test/java/com/netflix/maestro/flow/engine/ExecutionContextTest.java
@@ -0,0 +1,293 @@
+/*
+ * Copyright 2025 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.maestro.flow.engine;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.netflix.maestro.AssertHelper;
+import com.netflix.maestro.exceptions.MaestroInternalError;
+import com.netflix.maestro.exceptions.MaestroRetryableError;
+import com.netflix.maestro.exceptions.MaestroUnprocessableEntityException;
+import com.netflix.maestro.flow.FlowBaseTest;
+import com.netflix.maestro.flow.dao.MaestroFlowDao;
+import com.netflix.maestro.flow.models.Flow;
+import com.netflix.maestro.flow.models.FlowGroup;
+import com.netflix.maestro.flow.models.Task;
+import com.netflix.maestro.flow.models.TaskDef;
+import com.netflix.maestro.flow.properties.FlowEngineProperties;
+import com.netflix.maestro.flow.runtime.ExecutionPreparer;
+import com.netflix.maestro.flow.runtime.FinalFlowStatusCallback;
+import com.netflix.maestro.flow.runtime.FlowTask;
+import com.netflix.maestro.metrics.MaestroMetrics;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+
+public class ExecutionContextTest extends FlowBaseTest {
+ @Mock FlowTask flowTask;
+ @Mock private FinalFlowStatusCallback finalCallback;
+ @Mock private ExecutionPreparer executionPreparer;
+ @Mock private MaestroFlowDao flowDao;
+ @Mock private FlowEngineProperties properties;
+ @Mock private MaestroMetrics metrics;
+
+ private ExecutionContext context;
+ private Flow flow;
+ private FlowGroup group;
+
+ @Before
+ public void init() {
+ context =
+ new ExecutionContext(
+ Map.of("noop", flowTask),
+ finalCallback,
+ executionPreparer,
+ flowDao,
+ properties,
+ metrics);
+ flow = createFlow();
+ group = new FlowGroup(1, 1, "testAddress");
+ }
+
+ @Test
+ public void testRun() {
+ AtomicInteger counter = new AtomicInteger(0);
+ context.run(counter::incrementAndGet);
+ context.shutdown();
+ assertEquals(1, counter.get());
+ }
+
+ @Test
+ public void testSchedule() {
+ AtomicInteger counter = new AtomicInteger(0);
+ context.schedule(counter::incrementAndGet, 1);
+ context.shutdown();
+ assertEquals(1, counter.get());
+ }
+
+ @Test
+ public void testPrepareDone() {
+ Task prepare = flow.newTask(new TaskDef("prepare", "noop", null, null), true);
+ flow.setPrepareTask(prepare);
+ assertNull(flow.getPrepareTask().getStartTime());
+ prepare.setStatus(Task.Status.COMPLETED);
+ prepare.setReasonForIncompletion("hello");
+ context.prepare(flow);
+ assertNotNull(flow.getPrepareTask().getStartTime());
+ verify(flowTask, times(1)).execute(flow, prepare);
+ assertEquals("hello", flow.getReasonForIncompletion());
+ }
+
+ @Test
+ public void testPrepareRetry() {
+ Task prepare = flow.newTask(new TaskDef("prepare", "noop", null, null), true);
+ flow.setPrepareTask(prepare);
+ assertNull(flow.getPrepareTask().getStartTime());
+
+ AssertHelper.assertThrows(
+ "should throw and retry",
+ MaestroRetryableError.class,
+ "prepare task is not done yet",
+ () -> context.prepare(flow));
+ assertNotNull(flow.getPrepareTask().getStartTime());
+ verify(flowTask, times(1)).execute(flow, prepare);
+ }
+
+ @Test
+ public void testRefresh() {
+ Task monitor = flow.newTask(new TaskDef("monitor", "noop", null, null), true);
+ flow.setMonitorTask(monitor);
+
+ context.refresh(flow);
+ verify(flowTask, times(1)).execute(flow, monitor);
+ }
+
+ @Test
+ public void testFinalCall() {
+ flow.getFlowDef().setFinalFlowStatusCallbackEnabled(false);
+ context.finalCall(flow);
+ verify(finalCallback, times(0)).onFlowCompleted(flow);
+ verify(finalCallback, times(0)).onFlowTerminated(flow);
+ verify(finalCallback, times(0)).onFlowFinalized(flow);
+
+ flow.getFlowDef().setFinalFlowStatusCallbackEnabled(true);
+ flow.setStatus(Flow.Status.COMPLETED);
+ context.finalCall(flow);
+ verify(finalCallback, times(1)).onFlowCompleted(flow);
+ verify(finalCallback, times(0)).onFlowTerminated(flow);
+ verify(finalCallback, times(1)).onFlowFinalized(flow);
+
+ Mockito.reset(finalCallback);
+ flow.setStatus(Flow.Status.FAILED);
+ context.finalCall(flow);
+ verify(finalCallback, times(0)).onFlowCompleted(flow);
+ verify(finalCallback, times(1)).onFlowTerminated(flow);
+ verify(finalCallback, times(1)).onFlowFinalized(flow);
+ }
+
+ @Test
+ public void testStart() {
+ Task task = flow.newTask(new TaskDef("task", "noop", null, null), false);
+ context.start(flow, task);
+ verify(flowTask, times(1)).start(flow, task);
+ }
+
+ @Test
+ public void testExecute() {
+ Task task = flow.newTask(new TaskDef("task", "noop", null, null), false);
+ context.execute(flow, task);
+ verify(flowTask, times(1)).execute(flow, task);
+ }
+
+ @Test
+ public void testCancel() {
+ Task task = flow.newTask(new TaskDef("task", "noop", null, null), false);
+ context.cancel(flow, task);
+ verify(flowTask, times(1)).cancel(flow, task);
+ }
+
+ @Test
+ public void testCloneTask() {
+ Task task = flow.newTask(new TaskDef("task", "noop", null, null), false);
+ context.cloneTask(task);
+ verify(executionPreparer, times(1)).cloneTask(task);
+ }
+
+ @Test
+ public void testCloneTaskException() {
+ Task task = flow.newTask(new TaskDef("task", "noop", null, null), false);
+ Mockito.doThrow(new RuntimeException("test")).when(executionPreparer).cloneTask(task);
+ AssertHelper.assertThrows(
+ "should throw",
+ MaestroUnprocessableEntityException.class,
+ "cannot clone task: [task]",
+ () -> context.cloneTask(task));
+ }
+
+ @Test
+ public void testSaveFlow() {
+ context.saveFlow(flow);
+ verify(flowDao, times(1)).insertFlow(flow);
+ }
+
+ @Test
+ public void testSaveFlowRetry() {
+ Mockito.doThrow(new MaestroInternalError("test")).when(flowDao).insertFlow(flow);
+ AssertHelper.assertThrows(
+ "should throw and retry",
+ MaestroRetryableError.class,
+ "insertFlow is failed and please retry",
+ () -> context.saveFlow(flow));
+ }
+
+ @Test
+ public void testDeleteFlow() {
+ context.deleteFlow(flow);
+ verify(flowDao, times(1)).deleteFlow(flow);
+ }
+
+ @Test
+ public void testGetFlowsFrom() {
+ when(flowDao.getFlows(group, 10, "")).thenReturn(List.of());
+ assertEquals(List.of(), context.getFlowsFrom(group, 10, ""));
+ verify(flowDao, times(1)).getFlows(group, 10, "");
+ }
+
+ @Test
+ public void testGetFlowsFromWithError() {
+ Mockito.doThrow(new MaestroInternalError("test")).when(flowDao).getFlows(group, 10, "");
+ assertNull(context.getFlowsFrom(group, 10, ""));
+ verify(flowDao, times(1)).getFlows(group, 10, "");
+ }
+
+ @Test
+ public void testResumeFlowWithPrepare() {
+ Task prepare = flow.newTask(new TaskDef("prepare", "noop", null, null), true);
+ flow.setPrepareTask(prepare);
+ prepare.setStatus(Task.Status.COMPLETED);
+ when(executionPreparer.resume(flow)).thenReturn(true);
+
+ context.resumeFlow(flow);
+ verify(executionPreparer, times(1)).resume(flow);
+ verify(flowTask, times(1)).execute(flow, prepare);
+ }
+
+ @Test
+ public void testResumeFlowWithoutPrepare() {
+ when(executionPreparer.resume(flow)).thenReturn(false);
+
+ context.resumeFlow(flow);
+ verify(executionPreparer, times(1)).resume(flow);
+ verify(flowTask, times(0)).execute(any(), any());
+ }
+
+ @Test
+ public void testResumeFlowWithException() {
+ Mockito.doThrow(new RuntimeException("test")).when(executionPreparer).resume(flow);
+ AssertHelper.assertThrows(
+ "should throw and retry",
+ MaestroRetryableError.class,
+ "retry resuming flow due to an exception",
+ () -> context.resumeFlow(flow));
+ }
+
+ @Test
+ public void testTrySaveGroup() {
+ context.trySaveGroup(group);
+ verify(flowDao, times(1)).insertGroup(group);
+ }
+
+ @Test
+ public void testTrySaveGroupWithShutdown() {
+ context.shutdown();
+ AssertHelper.assertThrows(
+ "should throw during shutdown",
+ MaestroRetryableError.class,
+ "ExecutionContext is shutdown and cannot save a group and please retry.",
+ () -> context.trySaveGroup(group));
+ }
+
+ @Test
+ public void testTrySaveGroupWithException() {
+ Mockito.doThrow(new MaestroInternalError("test")).when(flowDao).insertGroup(group);
+ AssertHelper.assertThrows(
+ "should throw and retry",
+ MaestroRetryableError.class,
+ "insertGroup is failed and please retry",
+ () -> context.trySaveGroup(group));
+ }
+
+ @Test
+ public void testClaimGroup() {
+ context.claimGroup();
+ verify(flowDao, times(1)).claimExpiredGroup(any(), anyLong());
+ }
+
+ @Test
+ public void testHeartbeatGroup() {
+ when(flowDao.heartbeatGroup(group)).thenReturn(true);
+ context.heartbeatGroup(group);
+ verify(flowDao, times(1)).heartbeatGroup(group);
+ }
+}
diff --git a/maestro-flow/src/test/java/com/netflix/maestro/flow/engine/FlowExecutorTest.java b/maestro-flow/src/test/java/com/netflix/maestro/flow/engine/FlowExecutorTest.java
new file mode 100644
index 0000000..062a420
--- /dev/null
+++ b/maestro-flow/src/test/java/com/netflix/maestro/flow/engine/FlowExecutorTest.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2025 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.maestro.flow.engine;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.netflix.maestro.flow.actor.Actor;
+import com.netflix.maestro.flow.actor.ActorBaseTest;
+import com.netflix.maestro.flow.models.Flow;
+import com.netflix.maestro.flow.models.FlowDef;
+import com.netflix.maestro.flow.models.FlowGroup;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+public class FlowExecutorTest extends ActorBaseTest {
+
+ private FlowExecutor executor;
+
+ @Before
+ public void init() {
+ when(properties.getInitialMaintenanceDelayInMillis()).thenReturn(1L);
+ when(properties.getMaintenanceDelayInMillis()).thenReturn(100000L);
+ executor = new FlowExecutor(context);
+ }
+
+ @Test
+ public void testInitWithoutClaimingGroup() {
+ when(context.claimGroup()).thenReturn(null);
+ executor.init();
+ verify(context, timeout(10000)).claimGroup();
+ }
+
+ @Test
+ public void testInitWithClaimingGroup() {
+ FlowGroup group = new FlowGroup(1, 1, "testAddress");
+ when(context.claimGroup()).thenReturn(group);
+ executor.init();
+ verify(context, timeout(10000)).claimGroup();
+ verify(context, times(1)).run(any());
+ }
+
+ @Test
+ public void testShutdown() {
+ executor.shutdown();
+ verify(context, times(1)).shutdown();
+ }
+
+ @Test
+ public void testStartFlow() {
+ var id = executor.startFlow(1, "test-id", "wf-1", new FlowDef(), Map.of());
+ verify(context, times(1)).trySaveGroup(any());
+ var actorCaptor = ArgumentCaptor.forClass(Actor.class);
+ verify(context, times(1)).run(actorCaptor.capture());
+ var flowCaptor = ArgumentCaptor.forClass(Flow.class);
+ verify(context, times(1)).saveFlow(flowCaptor.capture());
+ assertEquals("test-id", id);
+ assertTrue(actorCaptor.getValue().isRunning());
+ assertEquals("wf-1", flowCaptor.getValue().getReference());
+ }
+
+ @Test
+ public void testWakeUp() {
+ assertFalse(executor.wakeUp(1L, "wf-1", "task1"));
+
+ executor.startFlow(1, "test-id", "wf-1", new FlowDef(), Map.of());
+ assertTrue(executor.wakeUp(1L, "wf-1", "task1"));
+ assertFalse(executor.wakeUp(2L, "wf-1", "task1"));
+ assertTrue(executor.wakeUp(1L, "wf-2", "task1"));
+ }
+}
diff --git a/maestro-flow/src/test/java/com/netflix/maestro/flow/models/FlowTest.java b/maestro-flow/src/test/java/com/netflix/maestro/flow/models/FlowTest.java
new file mode 100644
index 0000000..b433dd1
--- /dev/null
+++ b/maestro-flow/src/test/java/com/netflix/maestro/flow/models/FlowTest.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2025 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.maestro.flow.models;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.netflix.maestro.AssertHelper;
+import com.netflix.maestro.flow.FlowBaseTest;
+import java.util.Map;
+import java.util.Set;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FlowTest extends FlowBaseTest {
+
+ private Flow flow;
+ private Task task;
+
+ @Before
+ public void init() {
+ flow = createFlow();
+ task = flow.newTask(new TaskDef("task1", "noop", null, null), false);
+ }
+
+ @Test
+ public void testAddFinishedTask() {
+ assertEquals(1, flow.getRunningTasks().size());
+ AssertHelper.assertThrows(
+ "should throw if adding unfinished task",
+ IllegalArgumentException.class,
+ "must be in the terminal state",
+ () -> flow.addFinishedTask(task));
+ task.setStatus(Task.Status.FAILED);
+ flow.addFinishedTask(task);
+ flow.addFinishedTask(task);
+ flow.addFinishedTask(task);
+ assertEquals(1, flow.getFinishedTasks().size());
+ assertEquals(0, flow.getRunningTasks().size());
+ assertEquals(task, flow.getFinishedTasks().stream().findFirst().get());
+ }
+
+ @Test
+ public void testUpdateRunningTask() {
+ var updatedTask = flow.newTask(new TaskDef("task1", "noop1", null, null), true);
+ assertEquals(Set.of("task1"), flow.getRunningTasks().keySet());
+ flow.updateRunningTask(updatedTask);
+ assertEquals(Map.of("task1", updatedTask), flow.getRunningTasks());
+ }
+
+ @Test
+ public void testGetFinishedTasks() {
+ assertEquals(0, flow.getFinishedTasks().size());
+ var snapshot = flow.getFinishedTasks();
+ assertTrue(snapshot.isEmpty());
+
+ task.setStatus(Task.Status.FAILED);
+ flow.addFinishedTask(task);
+ assertTrue(snapshot.isEmpty());
+ assertEquals(1, flow.getFinishedTasks().size());
+ }
+
+ @Test
+ public void testGetStreamOfAllTasks() {
+ assertEquals(1, flow.getStreamOfAllTasks().count());
+ task.setStatus(Task.Status.FAILED);
+ flow.addFinishedTask(task);
+ assertEquals(1, flow.getStreamOfAllTasks().count());
+ flow.updateRunningTask(task);
+ assertEquals(2, flow.getStreamOfAllTasks().count());
+ }
+
+ @Test
+ public void testMarkTimedout() {
+ flow.markTimedout();
+ assertEquals(Flow.Status.TIMED_OUT, flow.getStatus());
+ }
+
+ @Test
+ public void testNewTask() {
+ assertEquals(1, flow.getRunningTasks().size());
+ assertEquals(Set.of("task1"), flow.getRunningTasks().keySet());
+
+ var taskDef = new TaskDef("task2", "noop", null, null);
+ var task2 = flow.newTask(taskDef, false);
+ assertEquals(taskDef, task2.getTaskDef());
+ assertTrue(task2.isActive());
+ assertFalse(task2.isTerminal());
+ assertEquals(2, flow.getRunningTasks().size());
+ assertEquals(Set.of("task1", "task2"), flow.getRunningTasks().keySet());
+ }
+
+ @Test
+ public void testNewTaskInline() {
+ assertEquals(1, flow.getRunningTasks().size());
+ assertEquals(Set.of("task1"), flow.getRunningTasks().keySet());
+
+ var taskDef = new TaskDef("task2", "noop", null, null);
+ var task2 = flow.newTask(taskDef, true);
+ assertEquals(taskDef, task2.getTaskDef());
+ assertTrue(task2.isActive());
+ assertFalse(task2.isTerminal());
+ assertEquals(1, flow.getRunningTasks().size());
+ assertEquals(Set.of("task1"), flow.getRunningTasks().keySet());
+ }
+}
diff --git a/maestro-server/src/test/resources/samples/sample-conditional-wf.json b/maestro-server/src/test/resources/samples/sample-conditional-wf.json
new file mode 100644
index 0000000..03b1303
--- /dev/null
+++ b/maestro-server/src/test/resources/samples/sample-conditional-wf.json
@@ -0,0 +1,30 @@
+{
+ "properties": {
+ "owner": "tester",
+ "run_strategy": "parallel",
+ "step_concurrency": 20
+ },
+ "workflow": {
+ "id": "sample-conditional-wf",
+ "name": "Test conditional workflow",
+ "description": "Long description about this workflow",
+ "params": {"param0": {"value": 15, "type": "LONG"}},
+ "steps": [
+ {"step": {"id": "job.1", "type": "NoOp", "transition": {"successors": {"job.2": "true"}}}},
+ {"step": {"id": "job.2", "type": "NoOp",
+ "transition": {"successors": {"job.3": "true"}}}},
+ {"step": {"id": "job.3", "type": "NoOp",
+ "params": {"param1": {"value": 15, "type": "LONG"}, "param2": {"value": 15, "type": "LONG"}},
+ "transition": {"successors": {"job.4": "param2 < 0", "job.6": "1 > 0"}}}},
+ {"step": {"id": "job.4", "type": "NoOp",
+ "params": {"param3": {"value": 15, "type": "LONG"}},
+ "transition": {"successors": {"job.5": "params.getFromStep('job.3', 'param2') > 0"}}}},
+ {"step": {"id": "job.5", "type": "NoOp",
+ "params": {"param4": {"value": 15, "type": "LONG"}},
+ "transition": {"successors": {"job.6": "params.getFromStep('job.4', 'param3') > 0"}}}},
+ {"step": {"id": "job.6", "type": "NoOp", "transition": {"successors": {"job.7": "true"}}}},
+ {"step": {"id": "job.7", "type": "NoOp",
+ "params": {"param5": {"expression": "params.get('param0')", "type": "LONG"}}}}
+ ]
+ }
+}
diff --git a/maestro-server/src/test/resources/samples/sample-dag-test-10.json b/maestro-server/src/test/resources/samples/sample-dag-test-10.json
new file mode 100644
index 0000000..f8c53df
--- /dev/null
+++ b/maestro-server/src/test/resources/samples/sample-dag-test-10.json
@@ -0,0 +1,180 @@
+{
+ "properties": {
+ "owner": "tester",
+ "run_strategy": "parallel",
+ "step_concurrency": 20,
+ "alerting": {
+ "emails": [
+ "test+alertconfig_default1@netflix.com",
+ "${foo}+alertconfig_default2@netflix.com"
+ ],
+ "tct": {
+ "completed_by_hour": 1,
+ "tz": "UTC"
+ }
+ }
+ },
+ "workflow": {
+ "id": "sample-dag-test-10",
+ "name": "Test workflow 10",
+ "description": "Long description about this workflow",
+ "timeout": 100,
+ "tags": [
+ {
+ "name": "tester",
+ "namespace": "system"
+ },
+ "test",
+ {
+ "name": "foo"
+ }
+ ],
+ "params": {
+ "foo": {
+ "expression": "'bar1';",
+ "type": "STRING"
+ }
+ },
+ "steps": [
+ {
+ "step": {
+ "id": "job1",
+ "type": "Sleep",
+ "transition": {
+ "successors": {
+ "job.2": "true",
+ "job3": "true"
+ }
+ },
+ "tags": [
+ {
+ "name": "Sleep",
+ "namespace": "system"
+ },
+ "job1"
+ ],
+ "params": {
+ "sleep_seconds": {
+ "value": 1,
+ "type": "LONG"
+ }
+ },
+ "failure_mode": "ignore_failure",
+ "retry_policy": {
+ "error_retry_limit": 10,
+ "platform_retry_limit": 10,
+ "backoff": {
+ "error_retry_backoff_in_secs": 1,
+ "platform_retry_backoff_in_secs": 1,
+ "type": "FIXED_BACKOFF"
+ }
+ },
+ "dependencies": {
+ "SIGNAL": {
+ "definitions": [
+ {
+ "value": {
+ "name": {
+ "value": "signal_a",
+ "type": "STRING"
+ },
+ "_step_dependency_sub_type": {
+ "value": "input_signal",
+ "type": "STRING"
+ },
+ "foo": {
+ "type": "SIGNAL",
+ "parameter": {
+ "value": "bar",
+ "type": "STRING",
+ "mode": "mutable"
+ },
+ "operator": "="
+ }
+ },
+ "type": "MAP"
+ }
+ ],
+ "type": "SIGNAL"
+ }
+ }
+ }
+ },
+ {
+ "step": {
+ "id": "job.2",
+ "type": "Sleep",
+ "transition": {
+ "successors": {
+ "job4": "true"
+ }
+ },
+ "params": {
+ "param1": {
+ "expression": "sleep_seconds + '1';",
+ "type": "STRING"
+ },
+ "param2": {
+ "value": "${job1__sleep_seconds} + 1",
+ "type": "STRING"
+ },
+ "sleep_seconds": {
+ "expression": "job3__sleep_seconds + 1;",
+ "type": "LONG"
+ }
+ },
+ "failure_mode": "fail_after_running",
+ "retry_policy": {
+ "error_retry_limit": 10,
+ "platform_retry_limit": 10,
+ "backoff": {
+ "error_retry_backoff_in_secs": 1,
+ "platform_retry_backoff_in_secs": 1,
+ "type": "FIXED_BACKOFF"
+ }
+ }
+ }
+ },
+ {
+ "step": {
+ "id": "job3",
+ "type": "Sleep",
+ "transition": {
+ "successors": {
+ "job4": "true",
+ "job.2": "true"
+ }
+ },
+ "params": {
+ "sleep_seconds": {
+ "value": 1,
+ "type": "LONG"
+ }
+ },
+ "failure_mode": "fail_immediately",
+ "retry_policy": {
+ "error_retry_limit": 10,
+ "platform_retry_limit": 10,
+ "backoff": {
+ "error_retry_backoff_in_secs": 1,
+ "platform_retry_backoff_in_secs": 1,
+ "type": "FIXED_BACKOFF"
+ }
+ }
+ }
+ },
+ {
+ "step": {
+ "id": "job4",
+ "type": "Sleep",
+ "params": {
+ "sleep_seconds": {
+ "value": 1,
+ "type": "LONG"
+ }
+ }
+ }
+ }
+ ]
+ }
+}
\ No newline at end of file
diff --git a/maestro-server/src/test/resources/samples/sample-dag-test-3.json b/maestro-server/src/test/resources/samples/sample-dag-test-3.json
new file mode 100644
index 0000000..9985e91
--- /dev/null
+++ b/maestro-server/src/test/resources/samples/sample-dag-test-3.json
@@ -0,0 +1,84 @@
+{
+ "properties": {
+ "owner": "tester",
+ "run_strategy": "parallel",
+ "step_concurrency": 20,
+ "alerting": {
+ "emails": [
+ "test+alertconfig_default1@netflix.com",
+ "${foo}+alertconfig_default2@netflix.com"
+ ],
+ "tct": {
+ "completed_by_hour": 1,
+ "tz": "UTC"
+ }
+ }
+ },
+ "workflow": {
+ "id": "sample-dag-test-3",
+ "name": "Test workflow 03",
+ "description": "Long description about this workflow",
+ "tags": [
+ {"name": "tester", "namespace": "system"},
+ "test",
+ {"name": "foo"}],
+ "params": {
+ "foo": {"expression": "'bar1';", "type": "STRING"}
+ },
+ "steps": [
+ {"step": {"id": "job1", "type": "Sleep",
+ "transition": {"successors": {"job.2": "true", "job3": "true"}},
+ "tags": [
+ {"name": "Sleep", "namespace": "system"},
+ "job1"
+ ],
+ "params": {"sleep_seconds": {"value": 15, "type": "LONG"}},
+ "failure_mode": "ignore_failure",
+ "retry_policy": {
+ "error_retry_limit": 10,
+ "platform_retry_limit": 10,
+ "backoff": {
+ "error_retry_backoff_in_secs": 1,
+ "platform_retry_backoff_in_secs": 1,
+ "type": "FIXED_BACKOFF"
+ }
+ }
+ }},
+ {"step": {"id": "job.2", "type": "Sleep",
+ "transition": {"successors": {"job4": "true"}},
+ "params": {
+ "param1": {"expression": "sleep_seconds + '1';", "type": "STRING"},
+ "param2": {"value": "${job1__sleep_seconds} + 1", "type": "STRING"},
+ "vtts_ts": {"expression": "Util.dateIntHourToTs(TARGET_RUN_DATE, TARGET_RUN_HOUR, 'UTC', 0, 0)/1000", "type": "LONG"},
+ "sleep_seconds": {"expression": "job3__sleep_seconds + 1;", "type": "LONG"}
+ },
+ "failure_mode": "fail_after_running",
+ "retry_policy": {
+ "error_retry_limit": 10,
+ "platform_retry_limit": 10,
+ "backoff": {
+ "error_retry_backoff_in_secs": 1,
+ "platform_retry_backoff_in_secs": 1,
+ "type": "FIXED_BACKOFF"
+ }
+ }
+ }},
+ {"step": {"id": "job3", "type": "Sleep",
+ "transition": {"successors": {"job4": "true", "job.2": "true"}},
+ "params": {"sleep_seconds": {"value": 15, "type": "LONG"}},
+ "failure_mode": "fail_immediately",
+ "retry_policy": {
+ "error_retry_limit": 10,
+ "platform_retry_limit": 10,
+ "backoff": {
+ "error_retry_backoff_in_secs": 1,
+ "platform_retry_backoff_in_secs": 1,
+ "type": "FIXED_BACKOFF"
+ }
+ }
+ }},
+ {"step": {"id": "job4", "type": "Sleep",
+ "params": {"sleep_seconds": {"value": 15, "type": "LONG"}}}}
+ ]
+ }
+}
\ No newline at end of file
diff --git a/maestro-server/src/test/resources/samples/sample-dag-test-4.json b/maestro-server/src/test/resources/samples/sample-dag-test-4.json
new file mode 100644
index 0000000..027714f
--- /dev/null
+++ b/maestro-server/src/test/resources/samples/sample-dag-test-4.json
@@ -0,0 +1,21 @@
+{
+ "properties": {
+ "owner": "tester",
+ "run_strategy": "strict_sequential",
+ "step_concurrency": 20
+ },
+ "workflow": {
+ "id": "sample-dag-test-4",
+ "name": "Test workflow 04",
+ "description": "Long description about this workflow",
+ "steps": [
+ {"step": {"id": "job.1", "type": "NoOp", "transition": {"successors": {"job.2": "true", "job.3": "true", "job.4": "true"}}}},
+ {"step": {"id": "job.2", "type": "NoOp", "transition": {"successors": {"job.5": "true"}}}},
+ {"step": {"id": "job.3", "type": "NoOp", "transition": {"successors": {"job.6": "true"}}}},
+ {"step": {"id": "job.4", "type": "NoOp", "transition": {"successors": {"job.7": "true"}}}},
+ {"step": {"id": "job.5", "type": "NoOp", "transition": {"successors": {"job.6": "true"}}}},
+ {"step": {"id": "job.6", "type": "NoOp", "transition": {"successors": {"job.7": "true"}}}},
+ {"step": {"id": "job.7", "type": "NoOp"}}
+ ]
+ }
+}
\ No newline at end of file
diff --git a/maestro-server/src/test/resources/samples/sample-dag-test-5.json b/maestro-server/src/test/resources/samples/sample-dag-test-5.json
new file mode 100644
index 0000000..a0d0a46
--- /dev/null
+++ b/maestro-server/src/test/resources/samples/sample-dag-test-5.json
@@ -0,0 +1,119 @@
+{
+ "properties": {
+ "owner": "tester",
+ "run_strategy": "first_only",
+ "step_concurrency": 20
+ },
+ "workflow": {
+ "id": "sample-dag-test-5",
+ "name": "Test workflow 05",
+ "description": "Long description about this workflow",
+ "params": {
+ "WF_RESTART_MODE": {
+ "expression": "WORKFLOW_RUN_POLICY = params.getFromInstance('RUN_POLICY'); if (WORKFLOW_RUN_POLICY == \"START_FRESH_NEW_RUN\") return \"run\"; else if (WORKFLOW_RUN_POLICY == \"RESTART_FROM_INCOMPLETE\") return \"resume\"; else if (WORKFLOW_RUN_POLICY == \"RESTART_FROM_START\") return \"run\"; else throw \"Unsupported restart mode: \" + WORKFLOW_RUN_POLICY;",
+ "type": "String",
+ "mode": "MUTABLE"
+ }
+ },
+ "steps": [
+ {
+ "step": {
+ "id": "job.1",
+ "type": "NoOp",
+ "dependencies": {
+ "SIGNAL": {
+ "definitions": [
+ {
+ "value": {
+ "name": {
+ "value": "prod_db/foo/bar",
+ "type": "STRING"
+ },
+ "vtts_utc_dateint": {
+ "operator": "=",
+ "parameter": {
+ "type": "STRING",
+ "value": "${DERIVED_RUN_DATE_UTC}"
+ },
+ "type": "SIGNAL"
+ }
+ },
+ "type": "MAP"
+ }
+ ],
+ "type": "SIGNAL"
+ }
+ },
+ "transition": {
+ "successors": {
+ "job.2": "true",
+ "job.3": "true"
+ }
+ }
+ }
+ },
+ {
+ "step": {
+ "id": "job.2",
+ "type": "NoOp",
+ "transition": {
+ "successors": {
+ "job.4": "true",
+ "job.5": "true"
+ }
+ }
+ }
+ },
+ {
+ "step": {
+ "id": "job.3",
+ "type": "NoOp",
+ "transition": {
+ "successors": {
+ "job.7": "true"
+ }
+ }
+ }
+ },
+ {
+ "step": {
+ "id": "job.4",
+ "type": "NoOp",
+ "transition": {
+ "successors": {
+ "job.6": "true"
+ }
+ }
+ }
+ },
+ {
+ "step": {
+ "id": "job.5",
+ "type": "NoOp",
+ "transition": {
+ "successors": {
+ "job.7": "true"
+ }
+ }
+ }
+ },
+ {
+ "step": {
+ "id": "job.6",
+ "type": "NoOp",
+ "transition": {
+ "successors": {
+ "job.7": "true"
+ }
+ }
+ }
+ },
+ {
+ "step": {
+ "id": "job.7",
+ "type": "NoOp"
+ }
+ }
+ ]
+ }
+}
\ No newline at end of file
diff --git a/maestro-server/src/test/resources/samples/sample-dag-test-6.json b/maestro-server/src/test/resources/samples/sample-dag-test-6.json
new file mode 100644
index 0000000..d53f9b0
--- /dev/null
+++ b/maestro-server/src/test/resources/samples/sample-dag-test-6.json
@@ -0,0 +1,22 @@
+{
+ "properties": {
+ "owner": "tester",
+ "run_strategy": "last_only",
+ "step_concurrency": 20
+ },
+ "workflow": {
+ "id": "sample-dag-test-6",
+ "name": "Test workflow 06",
+ "description": "Long description about this workflow",
+ "steps": [
+ {"step": {"id": "job.1", "type": "NoOp", "transition": {"successors": {"job.2": "true", "job.3": "true", "job.4": "true", "job.5": "true"}}}},
+ {"step": {"id": "job.2", "type": "NoOp", "transition": {"successors": {"job.6": "true"}}}},
+ {"step": {"id": "job.3", "type": "NoOp", "transition": {"successors": {"job.6": "true"}}}},
+ {"step": {"id": "job.4", "type": "NoOp", "transition": {"successors": {"job.7": "true"}}}},
+ {"step": {"id": "job.5", "type": "NoOp", "transition": {"successors": {"job.7": "true"}}}},
+ {"step": {"id": "job.6", "type": "NoOp", "transition": {"successors": {"job.8": "true"}}}},
+ {"step": {"id": "job.7", "type": "NoOp", "transition": {"successors": {"job.8": "true"}}}},
+ {"step": {"id": "job.8", "type": "NoOp"}}
+ ]
+ }
+}
\ No newline at end of file
diff --git a/maestro-server/src/test/resources/samples/sample-dag-test-7.json b/maestro-server/src/test/resources/samples/sample-dag-test-7.json
new file mode 100644
index 0000000..87ad5f7
--- /dev/null
+++ b/maestro-server/src/test/resources/samples/sample-dag-test-7.json
@@ -0,0 +1,29 @@
+{
+ "properties": {
+ "owner": "tester",
+ "run_strategy": {
+ "rule": "strict_sequential"
+ },
+ "step_concurrency": 20
+ },
+ "workflow": {
+ "id": "sample-dag-test-7",
+ "name": "Test workflow 07",
+ "description": "Long description about this workflow",
+ "steps": [
+ {"step": {"id": "job.1", "type": "NoOp", "params": {"sleep_seconds": {"value": 15, "type": "Long"}},
+ "transition": {"successors": {"job.2": "true", "job.3": "true"}}}},
+ {"step": {"id": "job.2", "type": "NoOp", "params": {"sleep_seconds": {"value": 15, "type": "long"}},
+ "transition": {"successors": {"job.4": "true"}}}},
+ {"step": {"id": "job.3", "type": "NoOp", "params": {"sleep_seconds": {"value": 15, "type": "LONG"}},
+ "transition": {"successors": {"job.5": "true"}}}},
+ {"step": {"id": "job.4", "type": "NoOp", "params": {"sleep_seconds": {"value": 15, "type": "lonG"}},
+ "transition": {"successors": {"job.6": "true"}}}},
+ {"step": {"id": "job.5", "type": "NoOp", "params": {"sleep_seconds": {"value": 15, "type": "loNg"}},
+ "transition": {"successors": {"job.6": "true"}}}},
+ {"step": {"id": "job.6", "type": "NoOp", "params": {"sleep_seconds": {"value": 15, "type": "lOng"}}}},
+ {"step": {"id": "job.7", "type": "NoOp", "transition": {"successors": {"job.8": "true"}}}},
+ {"step": {"id": "job.8", "type": "NoOp"}}
+ ]
+ }
+}
diff --git a/maestro-server/src/test/resources/samples/sample-dag-test-8.json b/maestro-server/src/test/resources/samples/sample-dag-test-8.json
new file mode 100644
index 0000000..d110c0e
--- /dev/null
+++ b/maestro-server/src/test/resources/samples/sample-dag-test-8.json
@@ -0,0 +1,29 @@
+{
+ "properties": {
+ "owner": "tester",
+ "run_strategy": {
+ "rule": "parallel",
+ "workflow_concurrency": 5
+ },
+ "step_concurrency": 20
+ },
+ "workflow": {
+ "id": "sample-dag-test-8",
+ "name": "Test workflow 08",
+ "description": "Long description about this workflow",
+ "params": {"x": {"value": 5, "type": "LONG"},
+ "base_workflow_timeout": {"value": "20 minutes", "type": "STRING"}},
+ "steps": [
+ {"step": {"id": "job.1", "type": "NoOp",
+ "timeout": "${base_workflow_timeout}",
+ "transition": {"successors": {"job.3": "x > 10", "job.4": "x > 10"}}}},
+ {"step": {"id": "job.2", "type": "NoOp",
+ "transition": {"successors": {"job.4": "x > 3"}}}},
+ {"step": {"id": "job.3", "type": "NoOp",
+ "transition": {"successors": {"job.4": "x > 10"}}}},
+ {"step": {"id": "job.4", "type": "NoOp",
+ "transition": {"successors": {"job.5": "true"}}}},
+ {"step": {"id": "job.5", "type": "NoOp", "transition": {}}}
+ ]
+ }
+}
\ No newline at end of file
diff --git a/maestro-server/src/test/resources/samples/sample-dag-test-9.json b/maestro-server/src/test/resources/samples/sample-dag-test-9.json
new file mode 100644
index 0000000..17e632f
--- /dev/null
+++ b/maestro-server/src/test/resources/samples/sample-dag-test-9.json
@@ -0,0 +1,29 @@
+{
+ "properties": {
+ "owner": "tester",
+ "run_strategy": {
+ "rule": "last_only",
+ "workflow_concurrency": 1
+ },
+ "step_concurrency": 20
+ },
+ "git_info": {"branch": "foo"},
+ "workflow": {
+ "id": "sample-dag-test-9",
+ "name": "Test workflow 09",
+ "description": "Long description about this workflow",
+ "steps": [
+ {"step": {"id": "job.1", "name": "a","description":"b", "type": "NoOp",
+ "transition": {"successors": {"job.2": "true", "job.3": "true"}}}},
+ {"step": {"id": "job.2", "type": "NoOp",
+ "transition": {"successors": {"job.4": "true", "job.5": "true", "job.6": "true"}}}},
+ {"step": {"id": "job.3", "type": "NoOp", "transition": {"successors": {"job.8": "true"}}}},
+ {"step": {"id": "job.4", "type": "NoOp", "transition": {"successors": {"job.7": "true"}}}},
+ {"step": {"id": "job.5", "type": "NoOp", "transition": {"successors": {"job.7": "true"}}}},
+ {"step": {"id": "job.6", "type": "NoOp", "transition": {"successors": {"job.7": "true"}}}},
+ {"step": {"id": "job.7", "type": "NoOp", "transition": {"successors": {"job.9": "true"}}}},
+ {"step": {"id": "job.8", "type": "NoOp", "transition": {"successors": {"job.9": "true"}}}},
+ {"step": {"id": "job.9", "type": "NoOp"}}
+ ]
+ }
+}
\ No newline at end of file
diff --git a/maestro-server/src/test/resources/samples/sample-large-foreach-wf.json b/maestro-server/src/test/resources/samples/sample-large-foreach-wf.json
new file mode 100644
index 0000000..2a730b9
--- /dev/null
+++ b/maestro-server/src/test/resources/samples/sample-large-foreach-wf.json
@@ -0,0 +1,61 @@
+{
+ "properties": {
+ "owner": "tester"
+ },
+ "is_active": true,
+ "workflow": {
+ "id": "sample-large-foreach-wf",
+ "steps": [{
+ "foreach": {
+ "id": "job1",
+ "concurrency": 100,
+ "params": {
+ "param1": {
+ "value": ["a", "b", "c"],
+ "type": "String_array"
+ },
+ "loop_params": {
+ "value": {
+ "i" : {
+ "expression": "Util.dateIntsBetween(20200101, 20210101, 1);",
+ "type": "long_array",
+ "validator": "param!=null && param.size() > 2", "mode": "mutable"
+ },
+ "j" : {
+ "expression": "param1;", "type": "STRING_ARRAY", "validator": "param!=null && param.size() > 2"
+ }
+ },
+ "type": "MAP"
+ }
+ },
+ "steps": [
+ {
+ "step": {
+ "id": "job2",
+ "type": "NoOp",
+ "tags": [
+ "NoOp"
+ ],
+ "params": {
+ "param2": {
+ "expression": "i + j;", "type": "STRING", "mode": "mutable"
+ }
+ },
+ "transition": {"successors": {"job.2": "true"}},
+ "retry_policy": {}
+ }
+ },
+ {
+ "step": {
+ "id": "job.2",
+ "type": "Sleep",
+ "params": {
+ "sleep_seconds": {"expression": "1 * 1;", "type": "LONG"}
+ }
+ }
+ }
+ ]
+ }
+ }]
+ }
+}
diff --git a/maestro-server/src/test/resources/samples/sample-mixed-wf.json b/maestro-server/src/test/resources/samples/sample-mixed-wf.json
new file mode 100644
index 0000000..5820eac
--- /dev/null
+++ b/maestro-server/src/test/resources/samples/sample-mixed-wf.json
@@ -0,0 +1,67 @@
+{
+ "properties": {
+ "owner": "tester"
+ },
+ "is_active": true,
+ "workflow": {
+ "id": "sample-mixed-wf",
+ "steps": [{
+ "subworkflow": {
+ "id": "sub-step1",
+ "sync": true,
+ "params": {
+ "subworkflow_id": {
+ "value": "sample-dag-test-3",
+ "type": "STRING"
+ },
+ "subworkflow_version": {
+ "value": "latest",
+ "type": "STRING"
+ },
+ "param1": {
+ "value": {"foo": "bar"},
+ "type": "STRING_MAP"
+ }
+ }
+ }},
+ {
+ "subworkflow": {
+ "id": "sub-step2",
+ "sync": true,
+ "params": {
+ "subworkflow_id": {
+ "value": "sample-nested-foreach-subworkflow-wf",
+ "type": "STRING"
+ },
+ "subworkflow_version": {
+ "value": "latest",
+ "type": "STRING"
+ },
+ "param2": {
+ "value": {"foo": "bar"},
+ "type": "STRING_MAP"
+ }
+ }
+ }},
+ {
+ "subworkflow": {
+ "id": "sub-step3",
+ "sync": true,
+ "params": {
+ "subworkflow_id": {
+ "value": "sample-nested-subworkflow-foreach-wf",
+ "type": "STRING"
+ },
+ "subworkflow_version": {
+ "value": "latest",
+ "type": "STRING"
+ },
+ "param3": {
+ "value": {"foo": "bar"},
+ "type": "STRING_MAP"
+ }
+ }
+ }}
+ ]
+ }
+}
diff --git a/maestro-server/src/test/resources/samples/sample-nested-foreach-subworkflow-wf.json b/maestro-server/src/test/resources/samples/sample-nested-foreach-subworkflow-wf.json
new file mode 100644
index 0000000..ca110cf
--- /dev/null
+++ b/maestro-server/src/test/resources/samples/sample-nested-foreach-subworkflow-wf.json
@@ -0,0 +1,72 @@
+{
+ "properties": {
+ "owner": "tester",
+ "run_strategy": "parallel"
+ },
+ "is_active": true,
+ "workflow": {
+ "id": "sample-nested-foreach-subworkflow-wf",
+ "steps": [{
+ "foreach": {
+ "id": "foreach-step1",
+ "concurrency": 2,
+ "params": {
+ "param1": {
+ "value": ["a", "b"],
+ "type": "String_array"
+ },
+ "loop_params": {
+ "value": {
+ "i" : {
+ "value": [1, 2, 3],
+ "type": "long_array",
+ "validator": "param!=null && param.size() > 2", "mode": "mutable"
+ },
+ "j" : {
+ "expression": "param1;", "type": "STRING_ARRAY", "validator": "param!=null && param.size() > 2"
+ }
+ },
+ "type": "MAP"
+ }
+ },
+ "steps": [
+ {
+ "step": {
+ "id": "job1",
+ "type": "NoOp",
+ "tags": [
+ "NoOp"
+ ],
+ "params": {
+ "param2": {
+ "expression": "i + j;", "type": "STRING", "mode": "mutable"
+ }
+ },
+ "transition": {"successors": {"sub-step2": "true"}},
+ "retry_policy": {}
+ }
+ },
+ {
+ "subworkflow": {
+ "id": "sub-step2",
+ "sync": true,
+ "params": {
+ "subworkflow_id": {
+ "value": "sample-dag-test-3",
+ "type": "STRING"
+ },
+ "subworkflow_version": {
+ "value": "latest",
+ "type": "STRING"
+ },
+ "param1": {
+ "value": {"foo": "bar"},
+ "type": "STRING_MAP"
+ }
+ }
+ }}
+ ]
+ }
+ }]
+ }
+}
diff --git a/maestro-server/src/test/resources/samples/sample-nested-foreach-wf.json b/maestro-server/src/test/resources/samples/sample-nested-foreach-wf.json
new file mode 100644
index 0000000..de78751
--- /dev/null
+++ b/maestro-server/src/test/resources/samples/sample-nested-foreach-wf.json
@@ -0,0 +1,77 @@
+{
+ "properties": {
+ "owner": "tester"
+ },
+ "is_active": true,
+ "workflow": {
+ "id": "sample-nested-foreach-wf",
+ "steps": [{
+ "foreach": {
+ "id": "foreach-job",
+ "concurrency": 2,
+ "params": {
+ "param1": {
+ "value": ["a", "b"],
+ "type": "String_array"
+ },
+ "loop_params": {
+ "value": {
+ "i" : {
+ "value": [1, 2, 3],
+ "type": "long_array",
+ "validator": "param!=null && param.size() > 2", "mode": "mutable"
+ },
+ "j" : {
+ "expression": "param1;", "type": "STRING_ARRAY", "validator": "param!=null && param.size() > 2"
+ }
+ },
+ "type": "MAP"
+ }
+ },
+ "steps": [
+ {
+ "foreach": {
+ "id": "job1",
+ "concurrency": 3,
+ "params": {
+ "param2": {
+ "expression": "i + 2;", "type": "LONG", "mode": "mutable"
+ },
+ "loop_params": {
+ "value": {
+ "x" : {
+ "expression": "Util.intsBetween(1, i, 1);",
+ "type": "long_array"
+ }
+ },
+ "type": "MAP"
+ }
+ },
+ "steps": [
+ {
+ "step": {
+ "id": "job.1",
+ "type": "Sleep",
+ "params": {
+ "sleep_seconds": {"expression": "x * 5;", "type": "LONG"}
+ }
+ }
+ }
+ ],
+ "transition": {"successors": {"job.2": "true"}}
+ }
+ },
+ {
+ "step": {
+ "id": "job.2",
+ "type": "Sleep",
+ "params": {
+ "sleep_seconds": {"expression": "i * 10;", "type": "LONG"}
+ }
+ }
+ }
+ ]
+ }
+ }]
+ }
+}
diff --git a/maestro-server/src/test/resources/samples/sample-nested-subworkflow-foreach-wf.json b/maestro-server/src/test/resources/samples/sample-nested-subworkflow-foreach-wf.json
new file mode 100644
index 0000000..0128f0d
--- /dev/null
+++ b/maestro-server/src/test/resources/samples/sample-nested-subworkflow-foreach-wf.json
@@ -0,0 +1,67 @@
+{
+ "properties": {
+ "owner": "tester"
+ },
+ "is_active": true,
+ "workflow": {
+ "id": "sample-nested-subworkflow-foreach-wf",
+ "steps": [{
+ "subworkflow": {
+ "id": "sub-step1",
+ "sync": true,
+ "params": {
+ "subworkflow_id": {
+ "value": "sample-foreach-wf",
+ "type": "STRING"
+ },
+ "subworkflow_version": {
+ "value": "latest",
+ "type": "STRING"
+ },
+ "param3": {
+ "value": {"foo": "bar"},
+ "type": "STRING_MAP"
+ }
+ }
+ }},
+ {
+ "subworkflow": {
+ "id": "sub-step2",
+ "sync": true,
+ "params": {
+ "subworkflow_id": {
+ "value": "sample-foreach-wf",
+ "type": "STRING"
+ },
+ "subworkflow_version": {
+ "value": "latest",
+ "type": "STRING"
+ },
+ "param2": {
+ "value": {"foo": "bar"},
+ "type": "STRING_MAP"
+ }
+ }
+ }},
+ {
+ "subworkflow": {
+ "id": "sub-step3",
+ "sync": true,
+ "params": {
+ "subworkflow_id": {
+ "value": "sample-foreach-wf",
+ "type": "STRING"
+ },
+ "subworkflow_version": {
+ "value": "latest",
+ "type": "STRING"
+ },
+ "param3": {
+ "value": {"foo": "bar"},
+ "type": "STRING_MAP"
+ }
+ }
+ }}
+ ]
+ }
+}
diff --git a/maestro-server/src/test/resources/samples/sample-nested-subworkflow-wf.json b/maestro-server/src/test/resources/samples/sample-nested-subworkflow-wf.json
new file mode 100644
index 0000000..d4944e9
--- /dev/null
+++ b/maestro-server/src/test/resources/samples/sample-nested-subworkflow-wf.json
@@ -0,0 +1,28 @@
+{
+ "properties": {
+ "owner": "tester"
+ },
+ "is_active": true,
+ "workflow": {
+ "id": "sample-nested-subworkflow-wf",
+ "steps": [{
+ "subworkflow": {
+ "id": "sub-step1",
+ "sync": true,
+ "params": {
+ "subworkflow_id": {
+ "value": "sample-subworkflow-wf",
+ "type": "STRING"
+ },
+ "subworkflow_version": {
+ "value": "latest",
+ "type": "STRING"
+ },
+ "param1": {
+ "value": {"foo": "bar"},
+ "type": "STRING_MAP"
+ }
+ }
+ }}]
+ }
+}
diff --git a/maestro-server/src/test/resources/samples/sample-subworkflow-wf.json b/maestro-server/src/test/resources/samples/sample-subworkflow-wf.json
new file mode 100644
index 0000000..784afbf
--- /dev/null
+++ b/maestro-server/src/test/resources/samples/sample-subworkflow-wf.json
@@ -0,0 +1,124 @@
+{
+ "properties": {
+ "owner": "tester"
+ },
+ "is_active": true,
+ "workflow": {
+ "id": "sample-subworkflow-wf",
+ "instance_step_concurrency": 3,
+ "steps": [{
+ "subworkflow": {
+ "id": "job1",
+ "params": {
+ "subworkflow_id": {
+ "value": "sample-dag-test-3",
+ "type": "STRING"
+ },
+ "subworkflow_version": {
+ "value": "latest",
+ "type": "STRING"
+ },
+ "param1": {
+ "value": {"foo": "bar"},
+ "type": "STRING_MAP"
+ }
+ }
+ }},
+ {
+ "subworkflow": {
+ "id": "job2",
+ "sync": true,
+ "params": {
+ "subworkflow_id": {
+ "value": "sample-dag-test-3",
+ "type": "STRING"
+ },
+ "subworkflow_version": {
+ "value": "latest",
+ "type": "STRING"
+ },
+ "param1": {
+ "value": {"foo": "bar"},
+ "type": "STRING_MAP"
+ }
+ }
+ }},
+ {
+ "subworkflow": {
+ "id": "job3",
+ "sync": false,
+ "params": {
+ "subworkflow_id": {
+ "value": "sample-dag-test-3",
+ "type": "STRING"
+ },
+ "subworkflow_version": {
+ "value": "latest",
+ "type": "STRING"
+ },
+ "param1": {
+ "value": {"foo": "bar"},
+ "type": "STRING_MAP"
+ }
+ }
+ }},
+ {
+ "subworkflow": {
+ "id": "job4",
+ "sync": true,
+ "explicit_params": false,
+ "params": {
+ "subworkflow_id": {
+ "value": "sample-dag-test-3",
+ "type": "STRING"
+ },
+ "subworkflow_version": {
+ "value": "latest",
+ "type": "STRING"
+ },
+ "param1": {
+ "value": {"foo": "bar"},
+ "type": "STRING_MAP"
+ }
+ }
+ }},
+ {
+ "subworkflow": {
+ "id": "job5",
+ "sync": true,
+ "explicit_params": true,
+ "params": {
+ "subworkflow_id": {
+ "value": "sample-dag-test-3",
+ "type": "STRING"
+ },
+ "subworkflow_version": {
+ "value": "latest",
+ "type": "STRING"
+ },
+ "param1": {
+ "value": {"foo": "bar"},
+ "type": "STRING_MAP"
+ }
+ }
+ }},{
+ "subworkflow": {
+ "id": "job6",
+ "sync": true,
+ "params": {
+ "subworkflow_id": {
+ "value": "sample-dag-test-3",
+ "type": "STRING"
+ },
+ "subworkflow_version": {
+ "value": "latest",
+ "type": "STRING"
+ },
+ "param1": {
+ "value": {"foo": "bar"},
+ "type": "STRING_MAP"
+ }
+ }
+ }}]
+ }
+}