diff --git a/README.md b/README.md index 155bbb5..f8ccf9a 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ provides a fully managed workflow-as-a-service (WAAS) to the data platform users It serves thousands of users, including data scientists, data engineers, machine learning engineers, software engineers, content producers, and business analysts, for various use cases. It schedules hundreds of thousands of workflows, millions of jobs every day -and operate with a strict SLO even when there are spikes in the traffic. +and operates with a strict SLO even when there are spikes in the traffic. Maestro is highly scalable and extensible to support existing and new use cases and offers enhanced usability to end users. You can read more details about it in our latest [blog post](https://netflixtechblog.com/maestro-netflixs-workflow-orchestrator-ee13a06f9c78). diff --git a/maestro-common/src/main/java/com/netflix/maestro/models/Constants.java b/maestro-common/src/main/java/com/netflix/maestro/models/Constants.java index 939394a..c443ece 100644 --- a/maestro-common/src/main/java/com/netflix/maestro/models/Constants.java +++ b/maestro-common/src/main/java/com/netflix/maestro/models/Constants.java @@ -148,7 +148,7 @@ private Constants() {} public static final int MAX_PLATFORM_RETRY_LIMIT_SECS = 24 * 3600; // 1 day /** maximum retry wait limit for timeout errors. */ - public static final int MAX_TIMEOUT_RETRY_LIMIT_SECS = 24 * 3600; // 1 days + public static final int MAX_TIMEOUT_RETRY_LIMIT_SECS = 24 * 3600; // 1 day /** Max timeout limit in milliseconds. */ public static final long MAX_TIME_OUT_LIMIT_IN_MILLIS = TimeUnit.DAYS.toMillis(120); // 120 days @@ -316,7 +316,7 @@ public static WorkflowVersion of(String version) { /** Workflow create request data size limit used for validation. */ public static final String WORKFLOW_CREATE_REQUEST_DATA_SIZE_LIMIT = "256KB"; - /** params' total size (in JSON format) limit for a workflow instance or a step instance. */ + /** param's total size (in JSON format) limit for a workflow instance or a step instance. */ public static final int JSONIFIED_PARAMS_STRING_SIZE_LIMIT = 750000; /** Defines limit for the query for step attempt state view. */ diff --git a/maestro-common/src/main/java/com/netflix/maestro/models/definition/Properties.java b/maestro-common/src/main/java/com/netflix/maestro/models/definition/Properties.java index 3fcb5ff..8a38167 100644 --- a/maestro-common/src/main/java/com/netflix/maestro/models/definition/Properties.java +++ b/maestro-common/src/main/java/com/netflix/maestro/models/definition/Properties.java @@ -31,7 +31,7 @@ * *

If unset (null value), means there is no change for this field. * - *

Properties changes are kept separately and can evolve independently from the workflow version + *

Properties changes are kept separately and can evolve independently of the workflow version * changes. */ @JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy.class) diff --git a/maestro-common/src/main/java/com/netflix/maestro/models/definition/Step.java b/maestro-common/src/main/java/com/netflix/maestro/models/definition/Step.java index 6c80d19..853b358 100644 --- a/maestro-common/src/main/java/com/netflix/maestro/models/definition/Step.java +++ b/maestro-common/src/main/java/com/netflix/maestro/models/definition/Step.java @@ -50,7 +50,7 @@ public interface Step { /** Get step type. */ StepType getType(); - /** Get step sub type. */ + /** Get step subtype. */ default String getSubType() { return null; } diff --git a/maestro-common/src/main/java/com/netflix/maestro/models/definition/StepDependenciesDefinition.java b/maestro-common/src/main/java/com/netflix/maestro/models/definition/StepDependenciesDefinition.java index 71093da..c3e5f0f 100644 --- a/maestro-common/src/main/java/com/netflix/maestro/models/definition/StepDependenciesDefinition.java +++ b/maestro-common/src/main/java/com/netflix/maestro/models/definition/StepDependenciesDefinition.java @@ -34,7 +34,7 @@ public class StepDependenciesDefinition { /** param name for step dependency name. */ public static final String STEP_DEPENDENCY_NAME = "name"; - /** param name for step dependency sub type, like input_table, input_s3. */ + /** param name for step dependency subtype, like input_table, input_s3. */ public static final String STEP_DEPENDENCY_SUB_TYPE = "_step_dependency_sub_type"; private final List definitions; diff --git a/maestro-common/src/main/java/com/netflix/maestro/models/instance/OutputSignalInstance.java b/maestro-common/src/main/java/com/netflix/maestro/models/instance/OutputSignalInstance.java index 581060d..6447dd7 100644 --- a/maestro-common/src/main/java/com/netflix/maestro/models/instance/OutputSignalInstance.java +++ b/maestro-common/src/main/java/com/netflix/maestro/models/instance/OutputSignalInstance.java @@ -29,6 +29,6 @@ @JsonInclude(JsonInclude.Include.NON_NULL) public class OutputSignalInstance { private String outputSignalInstanceId; - // announced time will be null if its a duplicate signal + // announced time will be null if it is a duplicate signal private Long announcedTime; } diff --git a/maestro-common/src/main/java/com/netflix/maestro/models/instance/StepInstance.java b/maestro-common/src/main/java/com/netflix/maestro/models/instance/StepInstance.java index 864908b..f96309a 100644 --- a/maestro-common/src/main/java/com/netflix/maestro/models/instance/StepInstance.java +++ b/maestro-common/src/main/java/com/netflix/maestro/models/instance/StepInstance.java @@ -275,9 +275,9 @@ public enum Status { /** Step is disabled at workflow instance start time, terminal state. */ DISABLED(true, true, false, false), /** - * Step should not run and user logic does not run. Maestro runs over this step when its if - * condition is false or the workflow is already failed when failure mode is FAIL_AFTER_RUNNING. - * Users can discard steps with this status. terminal state. + * Step should not run and user logic does not run. Maestro runs over this step when condition + * is false or the workflow is already failed when failure mode is FAIL_AFTER_RUNNING. Users can + * discard steps with this status. terminal state. */ UNSATISFIED(true, true, false, false), /** Step is skipped by users at runtime, terminal state. */ diff --git a/maestro-common/src/main/java/com/netflix/maestro/models/timeline/TimelineLogEvent.java b/maestro-common/src/main/java/com/netflix/maestro/models/timeline/TimelineLogEvent.java index 47ad270..add893a 100644 --- a/maestro-common/src/main/java/com/netflix/maestro/models/timeline/TimelineLogEvent.java +++ b/maestro-common/src/main/java/com/netflix/maestro/models/timeline/TimelineLogEvent.java @@ -100,7 +100,7 @@ public static TimelineLogEvent debug(String template, Object... args) { return TimelineLogEvent.builder().level(Level.DEBUG).message(template, args).build(); } - /** static method to generate a info level {@link TimelineLogEvent}. */ + /** static method to generate an info level {@link TimelineLogEvent}. */ @JsonIgnore public static TimelineLogEvent info(String template, Object... args) { return TimelineLogEvent.builder().level(Level.INFO).message(template, args).build(); @@ -112,7 +112,7 @@ public static TimelineLogEvent warn(String template, Object... args) { return TimelineLogEvent.builder().level(Level.WARN).message(template, args).build(); } - /** static method to generate a error level {@link TimelineLogEvent}. */ + /** static method to generate an error level {@link TimelineLogEvent}. */ @JsonIgnore public static TimelineLogEvent error(String template, Object... args) { return TimelineLogEvent.builder().level(Level.ERROR).message(template, args).build(); diff --git a/maestro-common/src/main/java/com/netflix/maestro/utils/IdHelper.java b/maestro-common/src/main/java/com/netflix/maestro/utils/IdHelper.java index 05de9a7..6c9907c 100644 --- a/maestro-common/src/main/java/com/netflix/maestro/utils/IdHelper.java +++ b/maestro-common/src/main/java/com/netflix/maestro/utils/IdHelper.java @@ -100,7 +100,7 @@ public static boolean isInlineWorkflowId(String workflowId) { * 11) vs 9 (base62: 9) -> 211 vs 19. * * @param value value to encode - * @param isOrdered should the output encoded string perserve the ordering. True for rangeKey case + * @param isOrdered should the output encoded string preserve the ordering. True for rangeKey case * and false for hashKey. * @return encoded base62 string */ diff --git a/maestro-engine/src/main/java/com/netflix/maestro/engine/concurrency/TagPermitManager.java b/maestro-engine/src/main/java/com/netflix/maestro/engine/concurrency/TagPermitManager.java index e8dba38..c7e68e7 100644 --- a/maestro-engine/src/main/java/com/netflix/maestro/engine/concurrency/TagPermitManager.java +++ b/maestro-engine/src/main/java/com/netflix/maestro/engine/concurrency/TagPermitManager.java @@ -46,7 +46,7 @@ public Status(boolean success, String message) { } } - /** acquire permits for every tag in tagList for a given a uuid (e.g. step uuid). */ + /** acquire permits for every tag in tagList for a given an uuid (e.g. step uuid). */ Status acquire(List tagsList, String uuid); /** diff --git a/maestro-engine/src/main/java/com/netflix/maestro/engine/steps/StepRuntime.java b/maestro-engine/src/main/java/com/netflix/maestro/engine/steps/StepRuntime.java index 78cd820..5d554a7 100644 --- a/maestro-engine/src/main/java/com/netflix/maestro/engine/steps/StepRuntime.java +++ b/maestro-engine/src/main/java/com/netflix/maestro/engine/steps/StepRuntime.java @@ -118,7 +118,7 @@ default Result start( * Customized step execution logic. * *

While the step status is RUNNING, the code in execute() will be called periodically with a - * preset polling interval. Additionally, if the execution throws an exception, the execute will + * preset polling interval. Additionally, if the execution throws an exception, the execution will * be retried as another step instance run. * *

The input data are a copy of the original summary data. Any changes on them will be diff --git a/maestro-flow/build.gradle b/maestro-flow/build.gradle index 5124afd..423a520 100644 --- a/maestro-flow/build.gradle +++ b/maestro-flow/build.gradle @@ -4,4 +4,10 @@ dependencies { implementation jacksonDatabindDep implementation jacksonAnnotationsDep api slf4jApiDep + + testImplementation junitDep + testImplementation mockitoCoreDep + testImplementation testcontainerDep + testImplementation postgresqlDep + testImplementation(testFixtures(project(':maestro-common'))) } diff --git a/maestro-flow/src/main/java/com/netflix/maestro/flow/actor/BaseActor.java b/maestro-flow/src/main/java/com/netflix/maestro/flow/actor/BaseActor.java index 73123f9..76e8054 100644 --- a/maestro-flow/src/main/java/com/netflix/maestro/flow/actor/BaseActor.java +++ b/maestro-flow/src/main/java/com/netflix/maestro/flow/actor/BaseActor.java @@ -1,6 +1,7 @@ package com.netflix.maestro.flow.actor; import com.netflix.maestro.annotations.Nullable; +import com.netflix.maestro.annotations.VisibleForTesting; import com.netflix.maestro.flow.engine.ExecutionContext; import com.netflix.maestro.metrics.MaestroMetrics; import java.util.HashMap; @@ -9,7 +10,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; -import java.util.stream.Stream; import lombok.Getter; import org.slf4j.Logger; @@ -120,21 +120,22 @@ boolean noChildActorsRunning() { } void startShutdown(Action action) { - if (childActors.isEmpty()) { - checkShutdown(); - } else { + if (!checkShutdown()) { cancelPendingActions(); wakeUpChildActors(action); } } - void checkShutdown() { + // return true if shutdown is finished, otherwise false + boolean checkShutdown() { if (noChildActorsRunning()) { terminateNow(); if (parent != null) { parent.post(Action.FLOW_DOWN); } + return true; } + return false; } void terminateNow() { @@ -142,27 +143,10 @@ void terminateNow() { running = false; } - void cancelPendingActions() { + private void cancelPendingActions() { scheduledActions.values().forEach(f -> f.cancel(true)); } - Stream dequeRetryActions() { - return scheduledActions.entrySet().stream() - .filter( - e -> - e.getKey() instanceof Action.FlowTaskRetry - && !e.getValue().isDone() - && e.getValue().cancel(false)) - .map(e -> ((Action.FlowTaskRetry) e.getKey()).taskRefName()); - } - - boolean dequeRetryAction(String taskRef) { - var action = new Action.FlowTaskRetry(taskRef); - return scheduledActions.containsKey(action) - && !scheduledActions.get(action).isDone() - && scheduledActions.get(action).cancel(false); - } - void schedule(Action action, long delayInMillis) { if (!isRunning() || (scheduledActions.containsKey(action) && !scheduledActions.get(action).isDone())) { @@ -230,4 +214,14 @@ private Action dequeueAction() { return null; } } + + @VisibleForTesting + Map> getScheduledActions() { + return scheduledActions; + } + + @VisibleForTesting + BlockingQueue getActions() { + return actions; + } } diff --git a/maestro-flow/src/main/java/com/netflix/maestro/flow/actor/FlowActor.java b/maestro-flow/src/main/java/com/netflix/maestro/flow/actor/FlowActor.java index 960c543..982ab03 100644 --- a/maestro-flow/src/main/java/com/netflix/maestro/flow/actor/FlowActor.java +++ b/maestro-flow/src/main/java/com/netflix/maestro/flow/actor/FlowActor.java @@ -14,6 +14,7 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; @@ -76,8 +77,9 @@ void runForAction(Action action) { @Override void afterRunning() { + getMetrics() + .counter("num_of_finished_flows", getClass(), "finalized", String.valueOf(finalized)); if (finalized) { - getMetrics().counter("num_of_finished_flows", getClass()); getContext().deleteFlow(flow); LOG.info("Flow for {} is deleted as it finishes.", reference()); } @@ -338,4 +340,22 @@ private void runTask(Task task, Action initAction) { var actor = new TaskActor(cloned, flow, this, getContext()); runActionFor(actor, initAction); } + + private Stream dequeRetryActions() { + return getScheduledActions().entrySet().stream() + .filter( + e -> + e.getKey() instanceof Action.FlowTaskRetry + && !e.getValue().isDone() + && e.getValue().cancel(false)) + .map(e -> ((Action.FlowTaskRetry) e.getKey()).taskRefName()); + } + + private boolean dequeRetryAction(String taskRef) { + var scheduledActions = getScheduledActions(); + var action = new Action.FlowTaskRetry(taskRef); + return scheduledActions.containsKey(action) + && !scheduledActions.get(action).isDone() + && scheduledActions.get(action).cancel(false); + } } diff --git a/maestro-flow/src/main/java/com/netflix/maestro/flow/dao/MaestroFlowDao.java b/maestro-flow/src/main/java/com/netflix/maestro/flow/dao/MaestroFlowDao.java index 67a5746..079f94a 100644 --- a/maestro-flow/src/main/java/com/netflix/maestro/flow/dao/MaestroFlowDao.java +++ b/maestro-flow/src/main/java/com/netflix/maestro/flow/dao/MaestroFlowDao.java @@ -1,11 +1,12 @@ package com.netflix.maestro.flow.dao; import com.fasterxml.jackson.databind.ObjectMapper; +import com.netflix.maestro.annotations.VisibleForTesting; import com.netflix.maestro.database.AbstractDatabaseDao; +import com.netflix.maestro.database.DatabaseConfiguration; import com.netflix.maestro.exceptions.MaestroRetryableError; import com.netflix.maestro.flow.models.Flow; import com.netflix.maestro.flow.models.FlowGroup; -import com.netflix.maestro.flow.properties.FlowEngineProperties; import com.netflix.maestro.metrics.MaestroMetrics; import com.netflix.maestro.utils.Checks; import java.sql.ResultSet; @@ -40,13 +41,15 @@ public class MaestroFlowDao extends AbstractDatabaseDao { "INSERT INTO maestro_flow_group (group_id,generation,address) VALUES (?,?,?) ON CONFLICT DO NOTHING"; private static final String GET_FLOW_WITH_SAME_KEYS_QUERY = "SELECT 1 FROM maestro_flow WHERE group_id=? AND flow_id=? LIMIT 1"; + private static final String REMOVE_GROUP_QUERY = + "DELETE FROM maestro_flow_group WHERE group_id=?"; public MaestroFlowDao( DataSource dataSource, ObjectMapper objectMapper, - FlowEngineProperties properties, + DatabaseConfiguration config, MaestroMetrics metrics) { - super(dataSource, objectMapper, properties, metrics); + super(dataSource, objectMapper, config, metrics); } /** @@ -239,4 +242,13 @@ public boolean existFlowWithSameKeys(long groupId, String flowId) { groupId, flowId); } + + @VisibleForTesting + void deleteGroup(long groupId) { + withMetricLogError( + () -> withRetryableUpdate(REMOVE_GROUP_QUERY, stmt -> stmt.setLong(1, groupId)), + "deleteGroup", + "Failed to delete the group for the groupId [{}]", + groupId); + } } diff --git a/maestro-flow/src/main/java/com/netflix/maestro/flow/engine/ExecutionContext.java b/maestro-flow/src/main/java/com/netflix/maestro/flow/engine/ExecutionContext.java index f1ea28a..b0ab881 100644 --- a/maestro-flow/src/main/java/com/netflix/maestro/flow/engine/ExecutionContext.java +++ b/maestro-flow/src/main/java/com/netflix/maestro/flow/engine/ExecutionContext.java @@ -101,11 +101,14 @@ public void prepare(Flow flow) { prepare.setStartTime(System.currentTimeMillis()); flowTaskMap.get(prepare.getTaskType()).execute(flow, prepare); if (!prepare.getStatus().isTerminal()) { + LOG.info("prepare task for flow [{}] is not done yet, will retry", flow.getReference()); throw new MaestroRetryableError("prepare task is not done yet, will retry"); } else { flow.setReasonForIncompletion(prepare.getReasonForIncompletion()); flow.markUpdate(); } + } catch (MaestroRetryableError mre) { + throw mre; } catch (RuntimeException e) { LOG.warn("prepare task in flow {} throws an error, will retry it", flow.getReference(), e); throw new MaestroRetryableError(e, "retry prepare task due to an exception"); @@ -213,7 +216,7 @@ public void resumeFlow(Flow flow) { } catch (MaestroNotFoundException nfe) { LOG.info("cannot find the reference flow: {}. Ignore it.", flow.getReference(), nfe); } catch (RuntimeException e) { - LOG.warn("got an exceptino for resuming flow for {} and will retry", flow.getReference(), e); + LOG.warn("got an exception for resuming flow for {} and will retry", flow.getReference(), e); throw new MaestroRetryableError(e, "retry resuming flow due to an exception"); } } diff --git a/maestro-flow/src/main/java/com/netflix/maestro/flow/engine/FlowExecutor.java b/maestro-flow/src/main/java/com/netflix/maestro/flow/engine/FlowExecutor.java index 42fea2c..bae0dcf 100644 --- a/maestro-flow/src/main/java/com/netflix/maestro/flow/engine/FlowExecutor.java +++ b/maestro-flow/src/main/java/com/netflix/maestro/flow/engine/FlowExecutor.java @@ -163,10 +163,12 @@ private Actor getOrCreateNewGroup(long groupId) { } /** Wake up a flow or a task. */ - public void wakeUp(Long groupId, String flowReference, String taskReference) { + public boolean wakeUp(Long groupId, String flowReference, String taskReference) { Actor groupActor = groupActors.get(groupId); if (groupActor != null && groupActor.isRunning()) { groupActor.post(new Action.FlowWakeUp(flowReference, taskReference)); + return true; } + return false; } } diff --git a/maestro-flow/src/main/java/com/netflix/maestro/flow/models/Task.java b/maestro-flow/src/main/java/com/netflix/maestro/flow/models/Task.java index 587524e..f72e3fa 100644 --- a/maestro-flow/src/main/java/com/netflix/maestro/flow/models/Task.java +++ b/maestro-flow/src/main/java/com/netflix/maestro/flow/models/Task.java @@ -15,7 +15,7 @@ * task actor can switch to execute. So those inactive tasks are not real maestro tasks. This is * required to avoid that the child actor runs the business logic but the parent flow is unaware and * decide to finish. Also, the active flag is a local state and not thread safe and can only be - * accessed within the actor (e.g. flow owns a list of copied tasks, and it can mutate active flag + * accessed within the actor, e.g. flow owns a list of copied tasks, and it can mutate active flag * for its own snapshots. * *

Basic rule: flow actor can only activate a task actor. A task actor can only deactivate itself diff --git a/maestro-flow/src/test/java/com/netflix/maestro/flow/FlowBaseTest.java b/maestro-flow/src/test/java/com/netflix/maestro/flow/FlowBaseTest.java new file mode 100644 index 0000000..4962f74 --- /dev/null +++ b/maestro-flow/src/test/java/com/netflix/maestro/flow/FlowBaseTest.java @@ -0,0 +1,45 @@ +/* + * Copyright 2025 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.maestro.flow; + +import com.netflix.maestro.flow.models.Flow; +import com.netflix.maestro.flow.models.FlowDef; +import java.util.Map; +import org.junit.After; +import org.junit.Before; +import org.mockito.MockitoAnnotations; + +public abstract class FlowBaseTest { + + private AutoCloseable closeable; + + @Before + public void openMocks() { + closeable = MockitoAnnotations.openMocks(this); + } + + @After + public void releaseMocks() throws Exception { + closeable.close(); + } + + protected Flow createFlow() { + Flow flow = + new Flow(10, "test-flow-id", 1, System.currentTimeMillis() + 3600000, "test-flow-ref"); + flow.setInput(Map.of()); + flow.setFlowDef(new FlowDef()); + flow.setStatus(Flow.Status.RUNNING); + flow.setUpdateTime(flow.getStartTime()); + return flow; + } +} diff --git a/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/ActorBaseTest.java b/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/ActorBaseTest.java new file mode 100644 index 0000000..1575351 --- /dev/null +++ b/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/ActorBaseTest.java @@ -0,0 +1,60 @@ +/* + * Copyright 2025 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.maestro.flow.actor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import com.netflix.maestro.flow.FlowBaseTest; +import com.netflix.maestro.flow.engine.ExecutionContext; +import com.netflix.maestro.flow.models.FlowGroup; +import com.netflix.maestro.flow.properties.FlowEngineProperties; +import com.netflix.maestro.metrics.MaestroMetrics; +import org.junit.Before; +import org.mockito.Mock; + +public abstract class ActorBaseTest extends FlowBaseTest { + @Mock protected ExecutionContext context; + @Mock protected FlowEngineProperties properties; + @Mock protected MaestroMetrics metrics; + + @Before + public void initialize() { + when(context.getProperties()).thenReturn(properties); + when(properties.getActorErrorRetryIntervalInMillis()).thenReturn(3000L); + when(properties.getHeartbeatIntervalInMillis()).thenReturn(10000L); + when(properties.getGroupFlowFetchLimit()).thenReturn(100L); + when(properties.getFlowReconciliationIntervalInMillis()).thenReturn(30000L); + when(context.getMetrics()).thenReturn(metrics); + when(context.cloneTask(any())).thenAnswer(i -> i.getArguments()[0]); + } + + GroupActor createGroupActor() { + FlowGroup group = new FlowGroup(1, 2, "testAddress"); + return new GroupActor(group, context); + } + + void verifyEmptyAction(BaseActor actor) { + assertTrue(actor.getActions().isEmpty()); + } + + void verifyActions(BaseActor actor, Action... expectedActions) { + var actions = actor.getActions(); + assertEquals(expectedActions.length, actions.size()); + for (Action expectedAction : expectedActions) { + assertEquals(expectedAction, actions.poll()); + } + } +} diff --git a/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/ActorTest.java b/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/ActorTest.java new file mode 100644 index 0000000..9087a53 --- /dev/null +++ b/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/ActorTest.java @@ -0,0 +1,36 @@ +/* + * Copyright 2025 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.maestro.flow.actor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.netflix.maestro.flow.models.FlowGroup; +import org.junit.Test; + +public class ActorTest extends ActorBaseTest { + + @Test + public void testStartGroupActor() { + FlowGroup group = new FlowGroup(1, 2, "testAddress"); + GroupActor actor = (GroupActor) Actor.startGroupActor(group, context); + verify(context, times(1)).run(any()); + // check if the group actor was started + assertTrue(actor.isRunning()); + assertEquals(2, actor.generation()); + verifyActions(actor, Action.GROUP_START); + } +} diff --git a/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/BaseActorTest.java b/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/BaseActorTest.java new file mode 100644 index 0000000..3c612a0 --- /dev/null +++ b/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/BaseActorTest.java @@ -0,0 +1,355 @@ +/* + * Copyright 2025 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.maestro.flow.actor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.netflix.maestro.AssertHelper; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledFuture; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class BaseActorTest extends ActorBaseTest { + private GroupActor groupActor; + private FlowActor flowActor; + + @Before + public void init() { + groupActor = createGroupActor(); + flowActor = new FlowActor(createFlow(), groupActor, context); + } + + @Test + public void testIsRunning() { + assertTrue(groupActor.isRunning()); + assertTrue(flowActor.isRunning()); + + flowActor.terminateNow(); + assertTrue(groupActor.isRunning()); + assertFalse(flowActor.isRunning()); + + var actor3 = new FlowActor(createFlow(), groupActor, context); + assertTrue(groupActor.isRunning()); + assertTrue(actor3.isRunning()); + + groupActor.terminateNow(); + assertFalse(groupActor.isRunning()); + assertFalse(actor3.isRunning()); + } + + @Test + public void testPost() { + verifyEmptyAction(groupActor); + + groupActor.post(Action.GROUP_START); + verifyActions(groupActor, Action.GROUP_START); + } + + @Test + public void testGeneration() { + assertEquals(2, groupActor.generation()); + assertEquals(2, flowActor.generation()); + } + + @Test + public void testRunActionFor() { + groupActor.runActionFor(flowActor, Action.FLOW_START); + + verify(context, times(1)).run(any()); + verifyActions(flowActor, Action.FLOW_START); + assertEquals(flowActor, groupActor.getChild("test-flow-ref")); + + doThrow(new RejectedExecutionException()).when(context).run(flowActor); + groupActor.runActionFor(flowActor, Action.FLOW_START); + verifyEmptyAction(flowActor); + + doThrow(new RuntimeException("test unhandled error")).when(context).run(flowActor); + AssertHelper.assertThrows( + "should throw instead of ignore the error", + RuntimeException.class, + "test unhandled error", + () -> groupActor.runActionFor(flowActor, Action.FLOW_START)); + } + + @Test + public void testWakeUpChildActors() { + groupActor.wakeUpChildActors(Action.FLOW_TIMEOUT); + verifyEmptyAction(flowActor); + + groupActor.runActionFor(flowActor, Action.FLOW_START); + groupActor.wakeUpChildActors(Action.FLOW_TIMEOUT); + verifyActions(flowActor, Action.FLOW_START, Action.FLOW_TIMEOUT); + } + + @Test + public void testWakeUpChildActor() { + groupActor.wakeUpChildActor("test-flow-ref", Action.FLOW_TIMEOUT); + verifyEmptyAction(flowActor); + + groupActor.runActionFor(flowActor, Action.FLOW_START); + groupActor.wakeUpChildActor("test-flow-ref", Action.FLOW_TIMEOUT); + verifyActions(flowActor, Action.FLOW_START, Action.FLOW_TIMEOUT); + + groupActor.wakeUpChildActor("not-exist-flow-ref", Action.FLOW_TIMEOUT); + verifyEmptyAction(flowActor); + + flowActor.terminateNow(); + groupActor.wakeUpChildActor("test-flow-ref", Action.FLOW_TIMEOUT); + verifyEmptyAction(flowActor); + } + + @Test + public void testContainChild() { + assertFalse(groupActor.containsChild("test-flow-ref")); + + groupActor.runActionFor(flowActor, Action.FLOW_START); + assertTrue(groupActor.containsChild("test-flow-ref")); + } + + @Test + public void testGetChild() { + assertNull(groupActor.getChild("test-flow-ref")); + + groupActor.runActionFor(flowActor, Action.FLOW_START); + assertEquals(flowActor, groupActor.getChild("test-flow-ref")); + } + + @Test + public void testRemoveChild() { + assertNull(groupActor.removeChild("test-flow-ref")); + + groupActor.runActionFor(flowActor, Action.FLOW_START); + assertEquals(flowActor, groupActor.removeChild("test-flow-ref")); + assertNull(groupActor.getChild("test-flow-ref")); + } + + @Test + public void testCleanupChildActors() { + groupActor.runActionFor(flowActor, Action.FLOW_START); + groupActor.cleanupChildActors(); + assertTrue(groupActor.containsChild("test-flow-ref")); + + flowActor.terminateNow(); + groupActor.cleanupChildActors(); + assertFalse(groupActor.containsChild("test-flow-ref")); + } + + @Test + public void testNoChildActorsRunning() { + assertTrue(groupActor.noChildActorsRunning()); + + groupActor.runActionFor(flowActor, Action.FLOW_START); + assertFalse(groupActor.noChildActorsRunning()); + + flowActor.terminateNow(); + assertTrue(groupActor.noChildActorsRunning()); + } + + @Test + public void testStartShutdownFromRootWithoutChildActors() { + var future = Mockito.mock(ScheduledFuture.class); + when(context.schedule(any(), anyLong())).thenReturn(future); + + assertTrue(groupActor.isRunning()); + + groupActor.schedule(Action.GROUP_START, 10000); + assertEquals(Set.of(Action.GROUP_START), groupActor.getScheduledActions().keySet()); + + groupActor.startShutdown(Action.FLOW_SHUTDOWN); + verify(future, times(1)).cancel(true); + assertFalse(groupActor.isRunning()); + } + + @Test + public void testStartShutdownFromRootWithChildActors() { + var future = Mockito.mock(ScheduledFuture.class); + when(context.schedule(any(), anyLong())).thenReturn(future); + + groupActor.runActionFor(flowActor, Action.FLOW_START); + assertTrue(groupActor.isRunning()); + + groupActor.schedule(Action.GROUP_START, 10000); + assertEquals(Set.of(Action.GROUP_START), groupActor.getScheduledActions().keySet()); + + groupActor.startShutdown(Action.FLOW_SHUTDOWN); + verify(future, times(1)).cancel(true); + + assertTrue(groupActor.isRunning()); + verifyActions(flowActor, Action.FLOW_START, Action.FLOW_SHUTDOWN); + } + + @Test + public void testStartShutdownFromChildWithoutChildActors() { + groupActor.runActionFor(flowActor, Action.FLOW_START); + assertTrue(groupActor.isRunning()); + assertTrue(flowActor.isRunning()); + + var future = Mockito.mock(ScheduledFuture.class); + when(context.schedule(any(), anyLong())).thenReturn(future); + + flowActor.schedule(Action.FLOW_REFRESH, 10000); + assertEquals(Set.of(Action.FLOW_REFRESH), flowActor.getScheduledActions().keySet()); + + flowActor.startShutdown(Action.TASK_SHUTDOWN); + verify(future, times(1)).cancel(true); + assertFalse(flowActor.isRunning()); + assertTrue(groupActor.isRunning()); + verifyActions(groupActor, Action.FLOW_DOWN); + + groupActor.startShutdown(Action.FLOW_SHUTDOWN); + assertFalse(groupActor.isRunning()); + } + + @Test + public void testCheckShutdownWithChildRunning() { + var future = Mockito.mock(ScheduledFuture.class); + when(context.schedule(any(), anyLong())).thenReturn(future); + + groupActor.runActionFor(flowActor, Action.FLOW_START); + assertTrue(groupActor.isRunning()); + assertTrue(flowActor.isRunning()); + + flowActor.schedule(Action.FLOW_REFRESH, 10000); + assertEquals(Set.of(Action.FLOW_REFRESH), flowActor.getScheduledActions().keySet()); + + assertFalse(groupActor.checkShutdown()); + verify(future, times(0)).cancel(true); + assertTrue(groupActor.isRunning()); + } + + @Test + public void testCheckShutdownWithoutChildRunning() { + var future = Mockito.mock(ScheduledFuture.class); + when(context.schedule(any(), anyLong())).thenReturn(future); + + groupActor.runActionFor(flowActor, Action.FLOW_START); + assertTrue(groupActor.isRunning()); + assertTrue(flowActor.isRunning()); + + flowActor.schedule(Action.FLOW_REFRESH, 10000); + assertEquals(Set.of(Action.FLOW_REFRESH), flowActor.getScheduledActions().keySet()); + + assertTrue(flowActor.checkShutdown()); + verifyActions(groupActor, Action.FLOW_DOWN); + assertFalse(flowActor.isRunning()); + + assertTrue(groupActor.checkShutdown()); + verify(future, times(1)).cancel(true); + assertFalse(groupActor.isRunning()); + } + + @Test + public void testTerminateNow() { + var future = Mockito.mock(ScheduledFuture.class); + when(context.schedule(any(), anyLong())).thenReturn(future); + + assertTrue(groupActor.isRunning()); + groupActor.schedule(Action.GROUP_START, 10000); + assertEquals(Set.of(Action.GROUP_START), groupActor.getScheduledActions().keySet()); + + groupActor.terminateNow(); + assertFalse(groupActor.isRunning()); + verify(future, times(1)).cancel(true); + } + + @Test + public void testScheduleWithoutDelay() { + var future = Mockito.mock(ScheduledFuture.class); + when(context.schedule(any(), anyLong())).thenReturn(future); + + groupActor.schedule(Action.GROUP_START, 0); + verify(context, times(0)).schedule(any(), anyLong()); + verifyActions(groupActor, Action.GROUP_START); + } + + @Test + public void testScheduleWithDelay() { + var future = Mockito.mock(ScheduledFuture.class); + when(context.schedule(any(), anyLong())).thenReturn(future); + + groupActor.schedule(Action.GROUP_START, 10000); + verify(context, times(1)).schedule(any(), anyLong()); + assertEquals(Set.of(Action.GROUP_START), groupActor.getScheduledActions().keySet()); + verifyEmptyAction(groupActor); + } + + @Test + public void testScheduleForNotRunningActor() { + var future = Mockito.mock(ScheduledFuture.class); + when(context.schedule(any(), anyLong())).thenReturn(future); + + groupActor.terminateNow(); + + groupActor.schedule(Action.GROUP_START, 10000); + verify(context, times(0)).schedule(any(), anyLong()); + verifyEmptyAction(groupActor); + } + + @Test + public void testScheduleForDuplicatePendingAction() { + var future = Mockito.mock(ScheduledFuture.class); + when(context.schedule(any(), anyLong())).thenReturn(future); + + groupActor.schedule(Action.GROUP_START, 10000); + groupActor.schedule(Action.GROUP_START, 10000); + verify(context, times(1)).schedule(any(), anyLong()); + assertEquals(Set.of(Action.GROUP_START), groupActor.getScheduledActions().keySet()); + verifyEmptyAction(groupActor); + } + + @Test + public void testScheduleForDuplicateDoneAction() { + var future1 = Mockito.mock(ScheduledFuture.class); + var future2 = Mockito.mock(ScheduledFuture.class); + when(context.schedule(any(), anyLong())).thenReturn(future1).thenReturn(future2); + when(future1.isDone()).thenReturn(true); + + groupActor.runActionFor(flowActor, Action.FLOW_START); + groupActor.schedule(Action.GROUP_START, 10000); + groupActor.startShutdown(Action.FLOW_SHUTDOWN); + verify(future1, times(1)).cancel(true); + assertEquals(Map.of(Action.GROUP_START, future1), groupActor.getScheduledActions()); + assertTrue(groupActor.isRunning()); + + groupActor.schedule(Action.GROUP_START, 10000); + verify(context, times(2)).schedule(any(), anyLong()); + verify(future1, times(1)).cancel(true); + verify(future2, times(0)).cancel(true); + assertEquals(Map.of(Action.GROUP_START, future2), groupActor.getScheduledActions()); + } + + @Test + public void testRun() { + flowActor.post(Action.TASK_DOWN); + flowActor.run(); + verify(context, times(2)).getMetrics(); + verify(metrics, times(1)).counter("num_of_running_flows", FlowActor.class); + verify(metrics, times(1)) + .counter("num_of_finished_flows", FlowActor.class, "finalized", "false"); + verifyActions(groupActor, Action.FLOW_DOWN); + verifyEmptyAction(flowActor); + } +} diff --git a/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/FlowActorTest.java b/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/FlowActorTest.java new file mode 100644 index 0000000..c7e672f --- /dev/null +++ b/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/FlowActorTest.java @@ -0,0 +1,402 @@ +/* + * Copyright 2025 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.maestro.flow.actor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.netflix.maestro.AssertHelper; +import com.netflix.maestro.exceptions.MaestroUnprocessableEntityException; +import com.netflix.maestro.flow.models.Flow; +import com.netflix.maestro.flow.models.Task; +import com.netflix.maestro.flow.models.TaskDef; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class FlowActorTest extends ActorBaseTest { + + private GroupActor groupActor; + private FlowActor flowActor; + private Flow flow; + + @Before + public void init() { + groupActor = createGroupActor(); + flow = createFlow(); + flowActor = new FlowActor(flow, groupActor, context); + } + + @Test + public void testBeforeRunning() { + flowActor.beforeRunning(); + verify(context, times(1)).getMetrics(); + verify(metrics, times(1)).counter("num_of_running_flows", FlowActor.class); + } + + @Test + public void testRunForActionFlowStart() { + assertNull(flow.getPrepareTask()); + assertNull(flow.getMonitorTask()); + + flowActor.runForAction(Action.FLOW_START); + assertNotNull(flow.getPrepareTask()); + assertNotNull(flow.getMonitorTask()); + verify(context, times(0)).resumeFlow(flow); + verify(context, times(1)).prepare(flow); + assertEquals(Set.of(Action.FLOW_TIMEOUT), flowActor.getScheduledActions().keySet()); + + var actions = flowActor.getActions(); + assertEquals(1, actions.size()); + assertEquals(Action.FlowReconcile.class, actions.poll().getClass()); + } + + @Test + public void testRunForActionFlowResumeWithRunningFlow() { + assertNull(flow.getPrepareTask()); + assertNull(flow.getMonitorTask()); + + flowActor.runForAction(Action.FLOW_RESUME); + assertNull(flow.getPrepareTask()); + assertNull(flow.getMonitorTask()); + verify(context, times(1)).resumeFlow(flow); + verify(context, times(0)).prepare(flow); + assertEquals(Set.of(Action.FLOW_TIMEOUT), flowActor.getScheduledActions().keySet()); + + var actions = flowActor.getActions(); + assertEquals(1, actions.size()); + assertEquals(Action.FlowReconcile.class, actions.poll().getClass()); + } + + @Test + public void testRunForActionFlowResumeWithRunningTasks() { + Task task1 = flow.newTask(new TaskDef("task1", "noop", null, null), false); + Task task2 = flow.newTask(new TaskDef("task2", "noop", null, null), false); + task2.setStatus(Task.Status.FAILED); + flow.addFinishedTask(task2); + + flowActor.runForAction(Action.FLOW_RESUME); + var actions = flowActor.getActions(); + assertEquals(2, actions.size()); + assertEquals(new Action.FlowTaskRetry("task2"), actions.poll()); + assertEquals(Action.FlowReconcile.class, actions.poll().getClass()); + + verify(context, times(1)).resumeFlow(flow); + verify(context, times(1)).cloneTask(task1); + verify(context, times(1)).run(any()); + verify(context, times(0)).prepare(flow); + assertEquals(Set.of(Action.FLOW_TIMEOUT), flowActor.getScheduledActions().keySet()); + + assertTrue(flowActor.containsChild("task1")); + var child = flowActor.getChild("task1"); + verifyActions(child, Action.TASK_RESUME); + } + + @Test + public void testRunForActionFlowResumeWithTerminatedFlow() { + flow.setStatus(Flow.Status.FAILED); + assertNull(flow.getPrepareTask()); + assertNull(flow.getMonitorTask()); + + flowActor.runForAction(Action.FLOW_RESUME); + assertNull(flow.getPrepareTask()); + assertNull(flow.getMonitorTask()); + assertFalse(flowActor.isRunning()); + + flowActor.afterRunning(); + verify(context, times(1)).deleteFlow(flow); + verify(context, times(1)).resumeFlow(flow); + verify(context, times(0)).prepare(flow); + verifyEmptyAction(flowActor); + } + + @Test + public void testRunForActionFlowReconcile() { + flow.getFlowDef() + .setTasks( + List.of( + List.of(new TaskDef("task1", "noop", null, null)), + List.of(new TaskDef("task2", "noop", null, null)))); + flow.setPrepareTask(flow.newTask(new TaskDef("prepare", "noop", null, null), true)); + flow.setMonitorTask(flow.newTask(new TaskDef("monitor", "noop", null, null), true)); + + flowActor.runForAction(new Action.FlowReconcile(123)); + assertEquals(2, flow.getRunningTasks().size()); + assertTrue(flowActor.containsChild("task1")); + assertTrue(flowActor.containsChild("task2")); + var child1 = flowActor.getChild("task1"); + var child2 = flowActor.getChild("task2"); + verifyActions(child1, Action.TASK_START); + verifyActions(child2, Action.TASK_START); + + verify(context, times(2)).cloneTask(any()); + verify(context, times(1)).refresh(flow); + verify(context, times(0)).finalCall(flow); + + var scheduledActions = flowActor.getScheduledActions(); + assertEquals(1, scheduledActions.size()); + assertEquals( + Action.FlowReconcile.class, + scheduledActions.keySet().stream().findFirst().get().getClass()); + } + + @Test + public void testRunForActionFlowReconcileWithFailedPreparedTask() { + var future = Mockito.mock(ScheduledFuture.class); + when(context.schedule(any(), anyLong())).thenReturn(future); + + flow.getFlowDef() + .setTasks( + List.of( + List.of(new TaskDef("task1", "noop", null, null)), + List.of(new TaskDef("task2", "noop", null, null)))); + flow.setPrepareTask(flow.newTask(new TaskDef("prepare", "noop", null, null), true)); + flow.setMonitorTask(flow.newTask(new TaskDef("monitor", "noop", null, null), true)); + + flowActor.runForAction(new Action.FlowReconcile(123)); + var child1 = flowActor.getChild("task1"); + var child2 = flowActor.getChild("task2"); + verifyActions(child1, Action.TASK_START); + verifyActions(child2, Action.TASK_START); + assertTrue(flowActor.isRunning()); + verify(context, times(2)).cloneTask(any()); + verify(context, times(1)).refresh(flow); + + flow.getPrepareTask().setStatus(Task.Status.FAILED_WITH_TERMINAL_ERROR); + flowActor.runForAction(new Action.FlowReconcile(123)); + assertEquals(Task.Status.CANCELED, flow.getMonitorTask().getStatus()); + assertEquals(Flow.Status.FAILED, flow.getStatus()); + verify(future, times(1)).cancel(true); + verifyActions(child1, Action.TASK_PING); + verifyActions(child2, Action.TASK_PING); + verify(context, times(1)).finalCall(flow); + assertFalse(flowActor.isRunning()); + } + + @Test + public void testFlowRefreshRunningFlow() { + flow.setMonitorTask(flow.newTask(new TaskDef("monitor", "noop", null, null), true)); + flowActor.runForAction(Action.FLOW_REFRESH); + verify(context, times(1)).refresh(flow); + verify(context, times(0)).finalCall(flow); + } + + @Test + public void testFlowRefreshTerminatedFlow() { + flow.setPrepareTask(flow.newTask(new TaskDef("prepare", "noop", null, null), true)); + flow.setMonitorTask(flow.newTask(new TaskDef("monitor", "noop", null, null), true)); + flow.getMonitorTask().setStatus(Task.Status.COMPLETED); + + flowActor.runForAction(Action.FLOW_REFRESH); + verify(context, times(1)).refresh(flow); + verify(context, times(1)).finalCall(flow); + assertEquals(Flow.Status.COMPLETED, flow.getStatus()); + assertFalse(flowActor.isRunning()); + } + + @Test + public void testFlowTaskRetry() { + Task task1 = flow.newTask(new TaskDef("task1", "noop", null, null), false); + task1.setStatus(Task.Status.FAILED); + flow.addFinishedTask(task1); + + flowActor.runForAction(new Action.FlowTaskRetry("task1")); + assertTrue(flowActor.containsChild("task1")); + var child = flowActor.getChild("task1"); + verifyActions(child, Action.TASK_START); + assertEquals(Set.of("task1"), flow.getRunningTasks().keySet()); + verify(context, times(1)).cloneTask(any()); + verify(context, times(1)).run(any()); + } + + @Test + public void testTaskUpdateForRunningTask() { + Task task1 = flow.newTask(new TaskDef("task1", "noop", null, null), false); + task1.setStatus(Task.Status.FAILED); + flow.addFinishedTask(task1); + flowActor.runForAction(new Action.FlowTaskRetry("task1")); + + Task task2 = flow.newTask(new TaskDef("task1", "noop", Map.of(), null), false); + task2.setStatus(Task.Status.IN_PROGRESS); + + flowActor.runForAction(new Action.TaskUpdate(task2)); + assertEquals(Set.of(Action.FLOW_REFRESH), flowActor.getScheduledActions().keySet()); + assertEquals(task2, flow.getRunningTasks().get("task1")); + verifyEmptyAction(flowActor); + } + + @Test + public void testTaskUpdateForRunningInactiveTask() { + Task task1 = flow.newTask(new TaskDef("task1", "noop", null, null), false); + task1.setStatus(Task.Status.FAILED); + flow.addFinishedTask(task1); + flow.setMonitorTask(flow.newTask(new TaskDef("monitor", "noop", null, null), true)); + + flowActor.runForAction(new Action.FlowTaskRetry("task1")); + Task task2 = flow.newTask(new TaskDef("task1", "noop", Map.of(), null), false); + task2.setStatus(Task.Status.IN_PROGRESS); + task2.setActive(false); + task2.setStartDelayInSeconds(3000); + + flowActor.runForAction(new Action.TaskUpdate(task2)); + assertEquals( + Set.of(Action.FLOW_REFRESH, new Action.TaskWakeUp("task1")), + flowActor.getScheduledActions().keySet()); + assertEquals(task2, flow.getRunningTasks().get("task1")); + } + + @Test + public void testTaskUpdateForTerminatedTask() { + flow.getFlowDef() + .setTasks( + List.of( + List.of(new TaskDef("task1", "noop", null, null)), + List.of(new TaskDef("task2", "noop", null, null)))); + flow.setPrepareTask(flow.newTask(new TaskDef("prepare", "noop", null, null), true)); + flow.setMonitorTask(flow.newTask(new TaskDef("monitor", "noop", null, null), true)); + + Task task1 = flow.newTask(new TaskDef("task1", "noop", null, null), false); + task1.setStatus(Task.Status.FAILED); + flow.addFinishedTask(task1); + flowActor.runForAction(new Action.FlowTaskRetry("task1")); + verify(context, times(1)).cloneTask(any()); + + Task task2 = flow.newTask(new TaskDef("task1", "noop", Map.of(), null), false); + task2.setStatus(Task.Status.FAILED); + task2.setStartDelayInSeconds(3000); + flowActor.runForAction(new Action.TaskUpdate(task2)); + assertEquals( + Set.of(new Action.FlowTaskRetry("task1")), flowActor.getScheduledActions().keySet()); + assertFalse(flowActor.containsChild("task1")); + assertFalse(flow.getRunningTasks().containsKey("task1")); + assertTrue(flowActor.containsChild("task2")); + assertTrue(flow.getRunningTasks().containsKey("task2")); + var child2 = flowActor.getChild("task2"); + verifyActions(child2, Action.TASK_START); + + verify(context, times(2)).cloneTask(any()); + verify(context, times(1)).refresh(flow); + verify(context, times(0)).finalCall(flow); + } + + @Test + public void testTaskWakeUpWithRunningTask() { + Task task1 = flow.newTask(new TaskDef("task1", "noop", null, null), false); + task1.setStatus(Task.Status.FAILED); + flow.addFinishedTask(task1); + flowActor.runForAction(new Action.FlowTaskRetry("task1")); + verifyActions(flowActor.getChild("task1"), Action.TASK_START); + + Task task2 = flow.getRunningTasks().get("task1"); + task2.setActive(false); + flowActor.runForAction(new Action.TaskWakeUp("task1")); + assertTrue(task2.isActive()); + verifyActions(flowActor.getChild("task1"), Action.TASK_ACTIVATE); + } + + @Test + public void testTaskWakeUpWithQueuedTask() { + var future = Mockito.mock(ScheduledFuture.class); + when(context.schedule(any(), anyLong())).thenReturn(future); + when(future.cancel(false)).thenReturn(true); + + Task task1 = flow.newTask(new TaskDef("task1", "noop", null, null), false); + task1.setStatus(Task.Status.FAILED); + task1.setStartDelayInSeconds(3000); + flow.addFinishedTask(task1); + flowActor.runForAction(Action.FLOW_RESUME); + assertFalse(flowActor.containsChild("task1")); + + flowActor.runForAction(new Action.TaskWakeUp("task1")); + verify(future, times(1)).cancel(false); + verifyActions(flowActor.getChild("task1"), Action.TASK_START, Action.TASK_ACTIVATE); + assertTrue(flowActor.containsChild("task1")); + } + + @Test + public void testFlowTimeoutWithoutTasks() { + flowActor.runForAction(Action.FLOW_TIMEOUT); + assertEquals(Flow.Status.TIMED_OUT, flow.getStatus()); + assertEquals(Set.of(Action.FLOW_TIMEOUT), flowActor.getScheduledActions().keySet()); + } + + @Test + public void testFlowTimeoutWithTasks() { + var future = Mockito.mock(ScheduledFuture.class); + when(context.schedule(any(), anyLong())).thenReturn(future); + when(future.cancel(false)).thenReturn(true); + + Task task1 = flow.newTask(new TaskDef("task1", "noop", null, null), false); + task1.setStatus(Task.Status.FAILED); + task1.setStartDelayInSeconds(3000); + flow.addFinishedTask(task1); + flowActor.runForAction(Action.FLOW_RESUME); + assertFalse(flowActor.containsChild("task1")); + + flowActor.runForAction(Action.FLOW_TIMEOUT); + assertEquals(Flow.Status.TIMED_OUT, flow.getStatus()); + assertEquals( + Set.of(new Action.FlowTaskRetry("task1"), Action.FLOW_TIMEOUT), + flowActor.getScheduledActions().keySet()); + verifyActions(flowActor.getChild("task1"), Action.TASK_START, Action.TASK_STOP); + } + + @Test + public void testFlowShutdown() { + flowActor.runForAction(Action.FLOW_SHUTDOWN); + verifyActions(groupActor, Action.FLOW_DOWN); + assertFalse(flowActor.isRunning()); + } + + @Test + public void testTaskDown() { + flowActor.runForAction(Action.TASK_DOWN); + verifyActions(groupActor, Action.FLOW_DOWN); + assertFalse(flowActor.isRunning()); + } + + @Test + public void testUnexpectedAction() { + AssertHelper.assertThrows( + "should throw for unexpected action", + MaestroUnprocessableEntityException.class, + "Unexpected action: [TaskPing[]] for flow ", + () -> flowActor.runForAction(Action.TASK_PING)); + } + + @Test + public void testAfterRunning() { + flowActor.afterRunning(); + verify(context, times(1)).getMetrics(); + verify(metrics, times(1)) + .counter("num_of_finished_flows", FlowActor.class, "finalized", "false"); + verify(context, times(0)).deleteFlow(any()); + } + + @Test + public void testReference() { + assertEquals("test-flow-ref", flowActor.reference()); + } +} diff --git a/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/GroupActorTest.java b/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/GroupActorTest.java new file mode 100644 index 0000000..4fb72b6 --- /dev/null +++ b/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/GroupActorTest.java @@ -0,0 +1,170 @@ +/* + * Copyright 2025 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.maestro.flow.actor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.netflix.maestro.AssertHelper; +import com.netflix.maestro.exceptions.MaestroUnprocessableEntityException; +import com.netflix.maestro.flow.models.Flow; +import java.util.List; +import java.util.Set; +import org.junit.Before; +import org.junit.Test; + +public class GroupActorTest extends ActorBaseTest { + private GroupActor groupActor; + private Flow flow; + + @Before + public void init() { + groupActor = createGroupActor(); + flow = createFlow(); + } + + @Test + public void testGeneration() { + assertEquals(2, groupActor.generation()); + } + + @Test + public void testBeforeRunning() { + groupActor.beforeRunning(); + verify(context, times(1)).schedule(any(), anyLong()); + assertEquals(Set.of(Action.GROUP_HEARTBEAT), groupActor.getScheduledActions().keySet()); + } + + @Test + public void testRunForActionStartGroupWithAnException() { + when(context.getFlowsFrom(any(), anyLong(), any())).thenReturn(null); + groupActor.runForAction(Action.GROUP_START); + + verify(context, times(1)).getFlowsFrom(any(), anyLong(), any()); + verify(context, times(1)).schedule(any(), anyLong()); + assertEquals(Set.of(Action.GROUP_START), groupActor.getScheduledActions().keySet()); + } + + @Test + public void testRunForActionStartGroupWithoutNewFlow() { + when(context.getFlowsFrom(any(), anyLong(), any())).thenReturn(List.of()); + groupActor.runForAction(Action.GROUP_START); + + verify(context, times(1)).getFlowsFrom(any(), anyLong(), any()); + verify(context, times(0)).schedule(any(), anyLong()); + assertTrue(groupActor.getScheduledActions().isEmpty()); + } + + @Test + public void testRunForActionStartGroupWithNewFlow() { + when(context.getFlowsFrom(any(), anyLong(), any())) + .thenReturn(List.of(flow)) + .thenReturn(List.of()); + groupActor.runForAction(Action.GROUP_START); + + verifyActions(groupActor, new Action.FlowLaunch(flow, true)); + verify(context, times(2)).getFlowsFrom(any(), anyLong(), any()); + verify(context, times(0)).schedule(any(), anyLong()); + assertTrue(groupActor.getScheduledActions().isEmpty()); + } + + @Test + public void testRunForActionFlowLaunchResume() { + groupActor.runForAction(new Action.FlowLaunch(flow, true)); + + verify(context, times(1)).run(any()); + var child = groupActor.getChild(flow.getReference()); + verifyActions(child, Action.FLOW_RESUME); + + groupActor.runForAction(new Action.FlowLaunch(flow, true)); + verify(context, times(1)).run(any()); + assertEquals(child, groupActor.getChild(flow.getReference())); + } + + @Test + public void testRunForActionFlowLaunchStart() { + groupActor.runForAction(new Action.FlowLaunch(flow, false)); + + verify(context, times(1)).run(any()); + var child = groupActor.getChild(flow.getReference()); + verifyActions(child, Action.FLOW_START); + + groupActor.runForAction(new Action.FlowLaunch(flow, true)); + verify(context, times(1)).run(any()); + assertEquals(child, groupActor.getChild(flow.getReference())); + } + + @Test + public void testRunForActionFlowWakeUp() { + groupActor.runForAction(new Action.FlowWakeUp(flow.getReference(), "taskRef")); + assertNull(groupActor.getChild(flow.getReference())); + + groupActor.runForAction(new Action.FlowLaunch(flow, false)); + verify(context, times(1)).run(any()); + var child = groupActor.getChild(flow.getReference()); + verifyActions(child, Action.FLOW_START); + + groupActor.runForAction(new Action.FlowWakeUp(flow.getReference(), "taskRef")); + verifyActions(child, new Action.TaskWakeUp("taskRef")); + } + + @Test + public void testGroupHeartbeat() { + groupActor.runForAction(Action.GROUP_HEARTBEAT); + + verify(context, times(1)).heartbeatGroup(any()); + verify(context, times(1)).schedule(any(), anyLong()); + assertEquals(Set.of(Action.GROUP_HEARTBEAT), groupActor.getScheduledActions().keySet()); + } + + @Test + public void testGroupShutdown() { + assertTrue(groupActor.isRunning()); + + groupActor.runForAction(Action.GROUP_SHUTDOWN); + assertFalse(groupActor.isRunning()); + verifyEmptyAction(groupActor); + assertTrue(groupActor.getScheduledActions().isEmpty()); + } + + @Test + public void testFlowDown() { + assertTrue(groupActor.isRunning()); + + groupActor.runForAction(Action.FLOW_DOWN); + assertFalse(groupActor.isRunning()); + verifyEmptyAction(groupActor); + assertTrue(groupActor.getScheduledActions().isEmpty()); + } + + @Test + public void testUnexpectedAction() { + AssertHelper.assertThrows( + "should throw for unexpected action", + MaestroUnprocessableEntityException.class, + "Unexpected action: [TaskDown[]] for FlowGroup [group-1]", + () -> groupActor.runForAction(Action.TASK_DOWN)); + } + + @Test + public void testReference() { + assertEquals("group-1", groupActor.reference()); + } +} diff --git a/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/TaskActorTest.java b/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/TaskActorTest.java new file mode 100644 index 0000000..cf261ff --- /dev/null +++ b/maestro-flow/src/test/java/com/netflix/maestro/flow/actor/TaskActorTest.java @@ -0,0 +1,183 @@ +/* + * Copyright 2025 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.maestro.flow.actor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.netflix.maestro.flow.models.Flow; +import com.netflix.maestro.flow.models.Task; +import com.netflix.maestro.flow.models.TaskDef; +import java.util.Set; +import org.junit.Before; +import org.junit.Test; + +public class TaskActorTest extends ActorBaseTest { + + private Flow flow; + private Task task; + private FlowActor flowActor; + private TaskActor taskActor; + + @Before + public void init() { + flow = createFlow(); + task = flow.newTask(new TaskDef("task1", "noop", null, null), false); + flowActor = new FlowActor(flow, createGroupActor(), context); + taskActor = new TaskActor(task, flow, flowActor, context); + } + + @Test + public void testBeforeRunning() { + taskActor.beforeRunning(); + verify(context, times(1)).getMetrics(); + verify(metrics, times(1)).counter("num_of_running_tasks", TaskActor.class); + } + + @Test + public void testRunForActionTaskStart() { + taskActor.runForAction(Action.TASK_START); + verify(context, times(1)).start(any(), any()); + verifyActions(taskActor, Action.TASK_PING); + } + + @Test + public void testRunForActionTaskResume() { + taskActor.runForAction(Action.TASK_RESUME); + verify(context, times(0)).start(any(), any()); + verifyActions(taskActor, Action.TASK_PING); + } + + @Test + public void testRunForActionStopForActiveRunningTask() { + taskActor.runForAction(Action.TASK_STOP); + verify(context, times(1)).cancel(any(), any()); + assertEquals(Set.of(Action.TASK_STOP), taskActor.getScheduledActions().keySet()); + verifyEmptyAction(taskActor); + assertTrue(taskActor.isRunning()); + } + + @Test + public void testRunForActionStopForInactiveTask() { + task.setActive(false); + + taskActor.runForAction(Action.TASK_STOP); + verify(context, times(0)).cancel(any(), any()); + } + + @Test + public void testRunForActionStopForTerminatedTask() { + task.setStatus(Task.Status.FAILED); + + taskActor.runForAction(Action.TASK_STOP); + verify(context, times(1)).cancel(any(), any()); + assertTrue(taskActor.getScheduledActions().isEmpty()); + verifyEmptyAction(taskActor); + assertFalse(taskActor.isRunning()); + verify(context, times(1)).cloneTask(any()); + verifyActions(flowActor, new Action.TaskUpdate(task)); + } + + @Test + public void testRunForActionTaskPingForActiveRunningTaskWithChange() { + task.setStartTime(System.currentTimeMillis()); + task.setTimeoutInMillis(3600000L); + task.setStartDelayInSeconds(3000); + when(context.execute(flow, task)).thenReturn(true); + + taskActor.runForAction(Action.TASK_PING); + verify(context, times(1)).execute(any(), any()); + assertEquals( + Set.of(Action.TASK_TIMEOUT, Action.TASK_PING), taskActor.getScheduledActions().keySet()); + assertNull(task.getTimeoutInMillis()); + assertEquals(1, task.getPollCount()); + verify(context, times(1)).cloneTask(any()); + verifyActions(flowActor, new Action.TaskUpdate(task)); + } + + @Test + public void testRunForActionTaskPingForInactiveRunningTaskWithoutChange() { + task.setActive(false); + + taskActor.runForAction(Action.TASK_PING); + verify(context, times(0)).execute(any(), any()); + assertTrue(taskActor.getScheduledActions().isEmpty()); + assertEquals(0, task.getPollCount()); + verify(context, times(0)).cloneTask(any()); + verifyEmptyAction(flowActor); + } + + @Test + public void testRunForActionTaskPingForTerminatedTask() { + task.setStatus(Task.Status.FAILED); + task.setStartDelayInSeconds(3000); + when(context.execute(flow, task)).thenReturn(false); + + taskActor.runForAction(Action.TASK_PING); + verify(context, times(1)).execute(any(), any()); + assertFalse(taskActor.isRunning()); + verify(context, times(1)).cloneTask(any()); + verifyActions(flowActor, new Action.TaskUpdate(task)); + verifyEmptyAction(taskActor); + assertTrue(taskActor.getScheduledActions().isEmpty()); + } + + @Test + public void testRunForActionTaskActivate() { + verifyExecute(Action.TASK_ACTIVATE, false); + } + + private void verifyExecute(Action action, boolean activeFlag) { + task.setActive(activeFlag); + task.setStartDelayInSeconds(3000L); + when(context.execute(flow, task)).thenReturn(false); + + taskActor.runForAction(action); + assertTrue(task.isActive()); + verify(context, times(1)).execute(any(), any()); + assertEquals(1, task.getPollCount()); + assertEquals(Set.of(Action.TASK_PING), taskActor.getScheduledActions().keySet()); + verify(context, times(0)).cloneTask(any()); + verifyEmptyAction(flowActor); + } + + @Test + public void testRunForActionTaskTimeout() { + verifyExecute(Action.TASK_TIMEOUT, true); + } + + @Test + public void testRunForActionTaskShutdown() { + taskActor.runForAction(Action.TASK_SHUTDOWN); + assertFalse(taskActor.isRunning()); + verifyActions(flowActor, Action.TASK_DOWN); + } + + @Test + public void testAfterRunning() { + taskActor.afterRunning(); + verify(context, times(1)).getMetrics(); + verify(metrics, times(1)).counter("num_of_finished_tasks", TaskActor.class); + } + + @Test + public void testReference() { + assertEquals("task1", taskActor.reference()); + } +} diff --git a/maestro-flow/src/test/java/com/netflix/maestro/flow/dao/MaestroFlowDaoTest.java b/maestro-flow/src/test/java/com/netflix/maestro/flow/dao/MaestroFlowDaoTest.java new file mode 100644 index 0000000..012f184 --- /dev/null +++ b/maestro-flow/src/test/java/com/netflix/maestro/flow/dao/MaestroFlowDaoTest.java @@ -0,0 +1,158 @@ +/* + * Copyright 2025 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.maestro.flow.dao; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.netflix.maestro.AssertHelper; +import com.netflix.maestro.database.DatabaseConfiguration; +import com.netflix.maestro.database.DatabaseSourceProvider; +import com.netflix.maestro.exceptions.MaestroRetryableError; +import com.netflix.maestro.flow.FlowBaseTest; +import com.netflix.maestro.flow.models.Flow; +import com.netflix.maestro.flow.models.FlowGroup; +import com.netflix.maestro.metrics.MaestroMetrics; +import com.netflix.maestro.utils.JsonHelper; +import java.io.IOException; +import javax.sql.DataSource; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +public class MaestroFlowDaoTest extends FlowBaseTest { + + private static DatabaseConfiguration config; + private static DataSource dataSource; + private static ObjectMapper mapper; + private static MaestroMetrics metrics; + + private static class MaestroDBTestConfiguration implements DatabaseConfiguration { + @Override + public String getJdbcUrl() { + return "jdbc:tc:cockroach:v22.2.19:///maestro"; + } + + @Override + public int getConnectionPoolMaxSize() { + return 10; + } + + @Override + public int getConnectionPoolMinIdle() { + return getConnectionPoolMaxSize(); + } + } + + @BeforeClass + public static void init() { + config = new MaestroDBTestConfiguration(); + dataSource = new DatabaseSourceProvider(config).get(); + mapper = JsonHelper.objectMapper(); + metrics = Mockito.mock(MaestroMetrics.class); + } + + private MaestroFlowDao dao; + private FlowGroup group; + + @Before + public void setUp() throws IOException { + dao = new MaestroFlowDao(dataSource, mapper, config, metrics); + group = new FlowGroup(10, 1, "testAddress"); + dao.insertGroup(group); + } + + @After + public void tearDown() { + dao.deleteGroup(10); + } + + @Test + public void testInsertFlow() { + Flow flow = createFlow(); + dao.insertFlow(flow); + var flows = dao.getFlows(new FlowGroup(10, 2, "testAddress"), 10, "test"); + assertEquals(1, flows.size()); + assertEquals(10, flows.getFirst().getGroupId()); + assertEquals("test-flow-id", flows.getFirst().getFlowId()); + assertEquals(2, flows.getFirst().getGeneration()); + assertEquals(flow.getStartTime(), flows.getFirst().getStartTime()); + assertEquals("test-flow-ref", flows.getFirst().getReference()); + dao.deleteFlow(flow); + } + + @Test + public void testDeleteFlow() { + Flow flow = createFlow(); + dao.insertFlow(flow); + dao.deleteFlow(flow); + var flows = dao.getFlows(new FlowGroup(10, 2, "testAddress"), 10, "test"); + assertTrue(flows.isEmpty()); + } + + @Test + public void testGetFlows() { + Flow flow = createFlow(); + dao.insertFlow(flow); + var flows = dao.getFlows(new FlowGroup(10, 1, "testAddress"), 10, "test"); + assertTrue(flows.isEmpty()); + flows = dao.getFlows(new FlowGroup(9, 2, "testAddress"), 10, "test"); + assertTrue(flows.isEmpty()); + flows = dao.getFlows(new FlowGroup(9, 2, "testAddress"), 10, "z"); + assertTrue(flows.isEmpty()); + dao.deleteFlow(flow); + } + + @Test + public void testHeartbeatGroup() { + assertTrue(dao.heartbeatGroup(group)); + assertFalse(dao.heartbeatGroup(new FlowGroup(10, 2, "testAddress"))); + } + + @Test + public void testClaimExpiredGroup() { + assertNull(dao.claimExpiredGroup("address2", 100000)); + FlowGroup claimed = dao.claimExpiredGroup("address2", -100000); + assertEquals(10, claimed.groupId()); + assertEquals(2, claimed.generation()); + assertEquals("address2", claimed.address()); + } + + @Test + public void testInsertGroup() { + AssertHelper.assertThrows( + "should throw and retry", + MaestroRetryableError.class, + "insertGroup for group [10] is failed (res=[0])", + () -> dao.insertGroup(group)); + FlowGroup claimed = dao.claimExpiredGroup("address2", -100000); + assertEquals(10, claimed.groupId()); + assertEquals(2, claimed.generation()); + assertEquals("address2", claimed.address()); + } + + @Test + public void testExistFlowWithSameKeys() { + Flow flow = createFlow(); + dao.insertFlow(flow); + assertTrue(dao.existFlowWithSameKeys(10, "test-flow-id")); + assertFalse(dao.existFlowWithSameKeys(2, "test-flow-id")); + assertFalse(dao.existFlowWithSameKeys(10, "test-flow-id2")); + dao.deleteFlow(flow); + } +} diff --git a/maestro-flow/src/test/java/com/netflix/maestro/flow/engine/ExecutionContextTest.java b/maestro-flow/src/test/java/com/netflix/maestro/flow/engine/ExecutionContextTest.java new file mode 100644 index 0000000..e811e31 --- /dev/null +++ b/maestro-flow/src/test/java/com/netflix/maestro/flow/engine/ExecutionContextTest.java @@ -0,0 +1,293 @@ +/* + * Copyright 2025 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.maestro.flow.engine; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.netflix.maestro.AssertHelper; +import com.netflix.maestro.exceptions.MaestroInternalError; +import com.netflix.maestro.exceptions.MaestroRetryableError; +import com.netflix.maestro.exceptions.MaestroUnprocessableEntityException; +import com.netflix.maestro.flow.FlowBaseTest; +import com.netflix.maestro.flow.dao.MaestroFlowDao; +import com.netflix.maestro.flow.models.Flow; +import com.netflix.maestro.flow.models.FlowGroup; +import com.netflix.maestro.flow.models.Task; +import com.netflix.maestro.flow.models.TaskDef; +import com.netflix.maestro.flow.properties.FlowEngineProperties; +import com.netflix.maestro.flow.runtime.ExecutionPreparer; +import com.netflix.maestro.flow.runtime.FinalFlowStatusCallback; +import com.netflix.maestro.flow.runtime.FlowTask; +import com.netflix.maestro.metrics.MaestroMetrics; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; + +public class ExecutionContextTest extends FlowBaseTest { + @Mock FlowTask flowTask; + @Mock private FinalFlowStatusCallback finalCallback; + @Mock private ExecutionPreparer executionPreparer; + @Mock private MaestroFlowDao flowDao; + @Mock private FlowEngineProperties properties; + @Mock private MaestroMetrics metrics; + + private ExecutionContext context; + private Flow flow; + private FlowGroup group; + + @Before + public void init() { + context = + new ExecutionContext( + Map.of("noop", flowTask), + finalCallback, + executionPreparer, + flowDao, + properties, + metrics); + flow = createFlow(); + group = new FlowGroup(1, 1, "testAddress"); + } + + @Test + public void testRun() { + AtomicInteger counter = new AtomicInteger(0); + context.run(counter::incrementAndGet); + context.shutdown(); + assertEquals(1, counter.get()); + } + + @Test + public void testSchedule() { + AtomicInteger counter = new AtomicInteger(0); + context.schedule(counter::incrementAndGet, 1); + context.shutdown(); + assertEquals(1, counter.get()); + } + + @Test + public void testPrepareDone() { + Task prepare = flow.newTask(new TaskDef("prepare", "noop", null, null), true); + flow.setPrepareTask(prepare); + assertNull(flow.getPrepareTask().getStartTime()); + prepare.setStatus(Task.Status.COMPLETED); + prepare.setReasonForIncompletion("hello"); + context.prepare(flow); + assertNotNull(flow.getPrepareTask().getStartTime()); + verify(flowTask, times(1)).execute(flow, prepare); + assertEquals("hello", flow.getReasonForIncompletion()); + } + + @Test + public void testPrepareRetry() { + Task prepare = flow.newTask(new TaskDef("prepare", "noop", null, null), true); + flow.setPrepareTask(prepare); + assertNull(flow.getPrepareTask().getStartTime()); + + AssertHelper.assertThrows( + "should throw and retry", + MaestroRetryableError.class, + "prepare task is not done yet", + () -> context.prepare(flow)); + assertNotNull(flow.getPrepareTask().getStartTime()); + verify(flowTask, times(1)).execute(flow, prepare); + } + + @Test + public void testRefresh() { + Task monitor = flow.newTask(new TaskDef("monitor", "noop", null, null), true); + flow.setMonitorTask(monitor); + + context.refresh(flow); + verify(flowTask, times(1)).execute(flow, monitor); + } + + @Test + public void testFinalCall() { + flow.getFlowDef().setFinalFlowStatusCallbackEnabled(false); + context.finalCall(flow); + verify(finalCallback, times(0)).onFlowCompleted(flow); + verify(finalCallback, times(0)).onFlowTerminated(flow); + verify(finalCallback, times(0)).onFlowFinalized(flow); + + flow.getFlowDef().setFinalFlowStatusCallbackEnabled(true); + flow.setStatus(Flow.Status.COMPLETED); + context.finalCall(flow); + verify(finalCallback, times(1)).onFlowCompleted(flow); + verify(finalCallback, times(0)).onFlowTerminated(flow); + verify(finalCallback, times(1)).onFlowFinalized(flow); + + Mockito.reset(finalCallback); + flow.setStatus(Flow.Status.FAILED); + context.finalCall(flow); + verify(finalCallback, times(0)).onFlowCompleted(flow); + verify(finalCallback, times(1)).onFlowTerminated(flow); + verify(finalCallback, times(1)).onFlowFinalized(flow); + } + + @Test + public void testStart() { + Task task = flow.newTask(new TaskDef("task", "noop", null, null), false); + context.start(flow, task); + verify(flowTask, times(1)).start(flow, task); + } + + @Test + public void testExecute() { + Task task = flow.newTask(new TaskDef("task", "noop", null, null), false); + context.execute(flow, task); + verify(flowTask, times(1)).execute(flow, task); + } + + @Test + public void testCancel() { + Task task = flow.newTask(new TaskDef("task", "noop", null, null), false); + context.cancel(flow, task); + verify(flowTask, times(1)).cancel(flow, task); + } + + @Test + public void testCloneTask() { + Task task = flow.newTask(new TaskDef("task", "noop", null, null), false); + context.cloneTask(task); + verify(executionPreparer, times(1)).cloneTask(task); + } + + @Test + public void testCloneTaskException() { + Task task = flow.newTask(new TaskDef("task", "noop", null, null), false); + Mockito.doThrow(new RuntimeException("test")).when(executionPreparer).cloneTask(task); + AssertHelper.assertThrows( + "should throw", + MaestroUnprocessableEntityException.class, + "cannot clone task: [task]", + () -> context.cloneTask(task)); + } + + @Test + public void testSaveFlow() { + context.saveFlow(flow); + verify(flowDao, times(1)).insertFlow(flow); + } + + @Test + public void testSaveFlowRetry() { + Mockito.doThrow(new MaestroInternalError("test")).when(flowDao).insertFlow(flow); + AssertHelper.assertThrows( + "should throw and retry", + MaestroRetryableError.class, + "insertFlow is failed and please retry", + () -> context.saveFlow(flow)); + } + + @Test + public void testDeleteFlow() { + context.deleteFlow(flow); + verify(flowDao, times(1)).deleteFlow(flow); + } + + @Test + public void testGetFlowsFrom() { + when(flowDao.getFlows(group, 10, "")).thenReturn(List.of()); + assertEquals(List.of(), context.getFlowsFrom(group, 10, "")); + verify(flowDao, times(1)).getFlows(group, 10, ""); + } + + @Test + public void testGetFlowsFromWithError() { + Mockito.doThrow(new MaestroInternalError("test")).when(flowDao).getFlows(group, 10, ""); + assertNull(context.getFlowsFrom(group, 10, "")); + verify(flowDao, times(1)).getFlows(group, 10, ""); + } + + @Test + public void testResumeFlowWithPrepare() { + Task prepare = flow.newTask(new TaskDef("prepare", "noop", null, null), true); + flow.setPrepareTask(prepare); + prepare.setStatus(Task.Status.COMPLETED); + when(executionPreparer.resume(flow)).thenReturn(true); + + context.resumeFlow(flow); + verify(executionPreparer, times(1)).resume(flow); + verify(flowTask, times(1)).execute(flow, prepare); + } + + @Test + public void testResumeFlowWithoutPrepare() { + when(executionPreparer.resume(flow)).thenReturn(false); + + context.resumeFlow(flow); + verify(executionPreparer, times(1)).resume(flow); + verify(flowTask, times(0)).execute(any(), any()); + } + + @Test + public void testResumeFlowWithException() { + Mockito.doThrow(new RuntimeException("test")).when(executionPreparer).resume(flow); + AssertHelper.assertThrows( + "should throw and retry", + MaestroRetryableError.class, + "retry resuming flow due to an exception", + () -> context.resumeFlow(flow)); + } + + @Test + public void testTrySaveGroup() { + context.trySaveGroup(group); + verify(flowDao, times(1)).insertGroup(group); + } + + @Test + public void testTrySaveGroupWithShutdown() { + context.shutdown(); + AssertHelper.assertThrows( + "should throw during shutdown", + MaestroRetryableError.class, + "ExecutionContext is shutdown and cannot save a group and please retry.", + () -> context.trySaveGroup(group)); + } + + @Test + public void testTrySaveGroupWithException() { + Mockito.doThrow(new MaestroInternalError("test")).when(flowDao).insertGroup(group); + AssertHelper.assertThrows( + "should throw and retry", + MaestroRetryableError.class, + "insertGroup is failed and please retry", + () -> context.trySaveGroup(group)); + } + + @Test + public void testClaimGroup() { + context.claimGroup(); + verify(flowDao, times(1)).claimExpiredGroup(any(), anyLong()); + } + + @Test + public void testHeartbeatGroup() { + when(flowDao.heartbeatGroup(group)).thenReturn(true); + context.heartbeatGroup(group); + verify(flowDao, times(1)).heartbeatGroup(group); + } +} diff --git a/maestro-flow/src/test/java/com/netflix/maestro/flow/engine/FlowExecutorTest.java b/maestro-flow/src/test/java/com/netflix/maestro/flow/engine/FlowExecutorTest.java new file mode 100644 index 0000000..062a420 --- /dev/null +++ b/maestro-flow/src/test/java/com/netflix/maestro/flow/engine/FlowExecutorTest.java @@ -0,0 +1,89 @@ +/* + * Copyright 2025 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.maestro.flow.engine; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.netflix.maestro.flow.actor.Actor; +import com.netflix.maestro.flow.actor.ActorBaseTest; +import com.netflix.maestro.flow.models.Flow; +import com.netflix.maestro.flow.models.FlowDef; +import com.netflix.maestro.flow.models.FlowGroup; +import java.util.Map; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +public class FlowExecutorTest extends ActorBaseTest { + + private FlowExecutor executor; + + @Before + public void init() { + when(properties.getInitialMaintenanceDelayInMillis()).thenReturn(1L); + when(properties.getMaintenanceDelayInMillis()).thenReturn(100000L); + executor = new FlowExecutor(context); + } + + @Test + public void testInitWithoutClaimingGroup() { + when(context.claimGroup()).thenReturn(null); + executor.init(); + verify(context, timeout(10000)).claimGroup(); + } + + @Test + public void testInitWithClaimingGroup() { + FlowGroup group = new FlowGroup(1, 1, "testAddress"); + when(context.claimGroup()).thenReturn(group); + executor.init(); + verify(context, timeout(10000)).claimGroup(); + verify(context, times(1)).run(any()); + } + + @Test + public void testShutdown() { + executor.shutdown(); + verify(context, times(1)).shutdown(); + } + + @Test + public void testStartFlow() { + var id = executor.startFlow(1, "test-id", "wf-1", new FlowDef(), Map.of()); + verify(context, times(1)).trySaveGroup(any()); + var actorCaptor = ArgumentCaptor.forClass(Actor.class); + verify(context, times(1)).run(actorCaptor.capture()); + var flowCaptor = ArgumentCaptor.forClass(Flow.class); + verify(context, times(1)).saveFlow(flowCaptor.capture()); + assertEquals("test-id", id); + assertTrue(actorCaptor.getValue().isRunning()); + assertEquals("wf-1", flowCaptor.getValue().getReference()); + } + + @Test + public void testWakeUp() { + assertFalse(executor.wakeUp(1L, "wf-1", "task1")); + + executor.startFlow(1, "test-id", "wf-1", new FlowDef(), Map.of()); + assertTrue(executor.wakeUp(1L, "wf-1", "task1")); + assertFalse(executor.wakeUp(2L, "wf-1", "task1")); + assertTrue(executor.wakeUp(1L, "wf-2", "task1")); + } +} diff --git a/maestro-flow/src/test/java/com/netflix/maestro/flow/models/FlowTest.java b/maestro-flow/src/test/java/com/netflix/maestro/flow/models/FlowTest.java new file mode 100644 index 0000000..b433dd1 --- /dev/null +++ b/maestro-flow/src/test/java/com/netflix/maestro/flow/models/FlowTest.java @@ -0,0 +1,117 @@ +/* + * Copyright 2025 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.maestro.flow.models; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import com.netflix.maestro.AssertHelper; +import com.netflix.maestro.flow.FlowBaseTest; +import java.util.Map; +import java.util.Set; +import org.junit.Before; +import org.junit.Test; + +public class FlowTest extends FlowBaseTest { + + private Flow flow; + private Task task; + + @Before + public void init() { + flow = createFlow(); + task = flow.newTask(new TaskDef("task1", "noop", null, null), false); + } + + @Test + public void testAddFinishedTask() { + assertEquals(1, flow.getRunningTasks().size()); + AssertHelper.assertThrows( + "should throw if adding unfinished task", + IllegalArgumentException.class, + "must be in the terminal state", + () -> flow.addFinishedTask(task)); + task.setStatus(Task.Status.FAILED); + flow.addFinishedTask(task); + flow.addFinishedTask(task); + flow.addFinishedTask(task); + assertEquals(1, flow.getFinishedTasks().size()); + assertEquals(0, flow.getRunningTasks().size()); + assertEquals(task, flow.getFinishedTasks().stream().findFirst().get()); + } + + @Test + public void testUpdateRunningTask() { + var updatedTask = flow.newTask(new TaskDef("task1", "noop1", null, null), true); + assertEquals(Set.of("task1"), flow.getRunningTasks().keySet()); + flow.updateRunningTask(updatedTask); + assertEquals(Map.of("task1", updatedTask), flow.getRunningTasks()); + } + + @Test + public void testGetFinishedTasks() { + assertEquals(0, flow.getFinishedTasks().size()); + var snapshot = flow.getFinishedTasks(); + assertTrue(snapshot.isEmpty()); + + task.setStatus(Task.Status.FAILED); + flow.addFinishedTask(task); + assertTrue(snapshot.isEmpty()); + assertEquals(1, flow.getFinishedTasks().size()); + } + + @Test + public void testGetStreamOfAllTasks() { + assertEquals(1, flow.getStreamOfAllTasks().count()); + task.setStatus(Task.Status.FAILED); + flow.addFinishedTask(task); + assertEquals(1, flow.getStreamOfAllTasks().count()); + flow.updateRunningTask(task); + assertEquals(2, flow.getStreamOfAllTasks().count()); + } + + @Test + public void testMarkTimedout() { + flow.markTimedout(); + assertEquals(Flow.Status.TIMED_OUT, flow.getStatus()); + } + + @Test + public void testNewTask() { + assertEquals(1, flow.getRunningTasks().size()); + assertEquals(Set.of("task1"), flow.getRunningTasks().keySet()); + + var taskDef = new TaskDef("task2", "noop", null, null); + var task2 = flow.newTask(taskDef, false); + assertEquals(taskDef, task2.getTaskDef()); + assertTrue(task2.isActive()); + assertFalse(task2.isTerminal()); + assertEquals(2, flow.getRunningTasks().size()); + assertEquals(Set.of("task1", "task2"), flow.getRunningTasks().keySet()); + } + + @Test + public void testNewTaskInline() { + assertEquals(1, flow.getRunningTasks().size()); + assertEquals(Set.of("task1"), flow.getRunningTasks().keySet()); + + var taskDef = new TaskDef("task2", "noop", null, null); + var task2 = flow.newTask(taskDef, true); + assertEquals(taskDef, task2.getTaskDef()); + assertTrue(task2.isActive()); + assertFalse(task2.isTerminal()); + assertEquals(1, flow.getRunningTasks().size()); + assertEquals(Set.of("task1"), flow.getRunningTasks().keySet()); + } +} diff --git a/maestro-server/src/test/resources/samples/sample-conditional-wf.json b/maestro-server/src/test/resources/samples/sample-conditional-wf.json new file mode 100644 index 0000000..03b1303 --- /dev/null +++ b/maestro-server/src/test/resources/samples/sample-conditional-wf.json @@ -0,0 +1,30 @@ +{ + "properties": { + "owner": "tester", + "run_strategy": "parallel", + "step_concurrency": 20 + }, + "workflow": { + "id": "sample-conditional-wf", + "name": "Test conditional workflow", + "description": "Long description about this workflow", + "params": {"param0": {"value": 15, "type": "LONG"}}, + "steps": [ + {"step": {"id": "job.1", "type": "NoOp", "transition": {"successors": {"job.2": "true"}}}}, + {"step": {"id": "job.2", "type": "NoOp", + "transition": {"successors": {"job.3": "true"}}}}, + {"step": {"id": "job.3", "type": "NoOp", + "params": {"param1": {"value": 15, "type": "LONG"}, "param2": {"value": 15, "type": "LONG"}}, + "transition": {"successors": {"job.4": "param2 < 0", "job.6": "1 > 0"}}}}, + {"step": {"id": "job.4", "type": "NoOp", + "params": {"param3": {"value": 15, "type": "LONG"}}, + "transition": {"successors": {"job.5": "params.getFromStep('job.3', 'param2') > 0"}}}}, + {"step": {"id": "job.5", "type": "NoOp", + "params": {"param4": {"value": 15, "type": "LONG"}}, + "transition": {"successors": {"job.6": "params.getFromStep('job.4', 'param3') > 0"}}}}, + {"step": {"id": "job.6", "type": "NoOp", "transition": {"successors": {"job.7": "true"}}}}, + {"step": {"id": "job.7", "type": "NoOp", + "params": {"param5": {"expression": "params.get('param0')", "type": "LONG"}}}} + ] + } +} diff --git a/maestro-server/src/test/resources/samples/sample-dag-test-10.json b/maestro-server/src/test/resources/samples/sample-dag-test-10.json new file mode 100644 index 0000000..f8c53df --- /dev/null +++ b/maestro-server/src/test/resources/samples/sample-dag-test-10.json @@ -0,0 +1,180 @@ +{ + "properties": { + "owner": "tester", + "run_strategy": "parallel", + "step_concurrency": 20, + "alerting": { + "emails": [ + "test+alertconfig_default1@netflix.com", + "${foo}+alertconfig_default2@netflix.com" + ], + "tct": { + "completed_by_hour": 1, + "tz": "UTC" + } + } + }, + "workflow": { + "id": "sample-dag-test-10", + "name": "Test workflow 10", + "description": "Long description about this workflow", + "timeout": 100, + "tags": [ + { + "name": "tester", + "namespace": "system" + }, + "test", + { + "name": "foo" + } + ], + "params": { + "foo": { + "expression": "'bar1';", + "type": "STRING" + } + }, + "steps": [ + { + "step": { + "id": "job1", + "type": "Sleep", + "transition": { + "successors": { + "job.2": "true", + "job3": "true" + } + }, + "tags": [ + { + "name": "Sleep", + "namespace": "system" + }, + "job1" + ], + "params": { + "sleep_seconds": { + "value": 1, + "type": "LONG" + } + }, + "failure_mode": "ignore_failure", + "retry_policy": { + "error_retry_limit": 10, + "platform_retry_limit": 10, + "backoff": { + "error_retry_backoff_in_secs": 1, + "platform_retry_backoff_in_secs": 1, + "type": "FIXED_BACKOFF" + } + }, + "dependencies": { + "SIGNAL": { + "definitions": [ + { + "value": { + "name": { + "value": "signal_a", + "type": "STRING" + }, + "_step_dependency_sub_type": { + "value": "input_signal", + "type": "STRING" + }, + "foo": { + "type": "SIGNAL", + "parameter": { + "value": "bar", + "type": "STRING", + "mode": "mutable" + }, + "operator": "=" + } + }, + "type": "MAP" + } + ], + "type": "SIGNAL" + } + } + } + }, + { + "step": { + "id": "job.2", + "type": "Sleep", + "transition": { + "successors": { + "job4": "true" + } + }, + "params": { + "param1": { + "expression": "sleep_seconds + '1';", + "type": "STRING" + }, + "param2": { + "value": "${job1__sleep_seconds} + 1", + "type": "STRING" + }, + "sleep_seconds": { + "expression": "job3__sleep_seconds + 1;", + "type": "LONG" + } + }, + "failure_mode": "fail_after_running", + "retry_policy": { + "error_retry_limit": 10, + "platform_retry_limit": 10, + "backoff": { + "error_retry_backoff_in_secs": 1, + "platform_retry_backoff_in_secs": 1, + "type": "FIXED_BACKOFF" + } + } + } + }, + { + "step": { + "id": "job3", + "type": "Sleep", + "transition": { + "successors": { + "job4": "true", + "job.2": "true" + } + }, + "params": { + "sleep_seconds": { + "value": 1, + "type": "LONG" + } + }, + "failure_mode": "fail_immediately", + "retry_policy": { + "error_retry_limit": 10, + "platform_retry_limit": 10, + "backoff": { + "error_retry_backoff_in_secs": 1, + "platform_retry_backoff_in_secs": 1, + "type": "FIXED_BACKOFF" + } + } + } + }, + { + "step": { + "id": "job4", + "type": "Sleep", + "params": { + "sleep_seconds": { + "value": 1, + "type": "LONG" + } + } + } + } + ] + } +} \ No newline at end of file diff --git a/maestro-server/src/test/resources/samples/sample-dag-test-3.json b/maestro-server/src/test/resources/samples/sample-dag-test-3.json new file mode 100644 index 0000000..9985e91 --- /dev/null +++ b/maestro-server/src/test/resources/samples/sample-dag-test-3.json @@ -0,0 +1,84 @@ +{ + "properties": { + "owner": "tester", + "run_strategy": "parallel", + "step_concurrency": 20, + "alerting": { + "emails": [ + "test+alertconfig_default1@netflix.com", + "${foo}+alertconfig_default2@netflix.com" + ], + "tct": { + "completed_by_hour": 1, + "tz": "UTC" + } + } + }, + "workflow": { + "id": "sample-dag-test-3", + "name": "Test workflow 03", + "description": "Long description about this workflow", + "tags": [ + {"name": "tester", "namespace": "system"}, + "test", + {"name": "foo"}], + "params": { + "foo": {"expression": "'bar1';", "type": "STRING"} + }, + "steps": [ + {"step": {"id": "job1", "type": "Sleep", + "transition": {"successors": {"job.2": "true", "job3": "true"}}, + "tags": [ + {"name": "Sleep", "namespace": "system"}, + "job1" + ], + "params": {"sleep_seconds": {"value": 15, "type": "LONG"}}, + "failure_mode": "ignore_failure", + "retry_policy": { + "error_retry_limit": 10, + "platform_retry_limit": 10, + "backoff": { + "error_retry_backoff_in_secs": 1, + "platform_retry_backoff_in_secs": 1, + "type": "FIXED_BACKOFF" + } + } + }}, + {"step": {"id": "job.2", "type": "Sleep", + "transition": {"successors": {"job4": "true"}}, + "params": { + "param1": {"expression": "sleep_seconds + '1';", "type": "STRING"}, + "param2": {"value": "${job1__sleep_seconds} + 1", "type": "STRING"}, + "vtts_ts": {"expression": "Util.dateIntHourToTs(TARGET_RUN_DATE, TARGET_RUN_HOUR, 'UTC', 0, 0)/1000", "type": "LONG"}, + "sleep_seconds": {"expression": "job3__sleep_seconds + 1;", "type": "LONG"} + }, + "failure_mode": "fail_after_running", + "retry_policy": { + "error_retry_limit": 10, + "platform_retry_limit": 10, + "backoff": { + "error_retry_backoff_in_secs": 1, + "platform_retry_backoff_in_secs": 1, + "type": "FIXED_BACKOFF" + } + } + }}, + {"step": {"id": "job3", "type": "Sleep", + "transition": {"successors": {"job4": "true", "job.2": "true"}}, + "params": {"sleep_seconds": {"value": 15, "type": "LONG"}}, + "failure_mode": "fail_immediately", + "retry_policy": { + "error_retry_limit": 10, + "platform_retry_limit": 10, + "backoff": { + "error_retry_backoff_in_secs": 1, + "platform_retry_backoff_in_secs": 1, + "type": "FIXED_BACKOFF" + } + } + }}, + {"step": {"id": "job4", "type": "Sleep", + "params": {"sleep_seconds": {"value": 15, "type": "LONG"}}}} + ] + } +} \ No newline at end of file diff --git a/maestro-server/src/test/resources/samples/sample-dag-test-4.json b/maestro-server/src/test/resources/samples/sample-dag-test-4.json new file mode 100644 index 0000000..027714f --- /dev/null +++ b/maestro-server/src/test/resources/samples/sample-dag-test-4.json @@ -0,0 +1,21 @@ +{ + "properties": { + "owner": "tester", + "run_strategy": "strict_sequential", + "step_concurrency": 20 + }, + "workflow": { + "id": "sample-dag-test-4", + "name": "Test workflow 04", + "description": "Long description about this workflow", + "steps": [ + {"step": {"id": "job.1", "type": "NoOp", "transition": {"successors": {"job.2": "true", "job.3": "true", "job.4": "true"}}}}, + {"step": {"id": "job.2", "type": "NoOp", "transition": {"successors": {"job.5": "true"}}}}, + {"step": {"id": "job.3", "type": "NoOp", "transition": {"successors": {"job.6": "true"}}}}, + {"step": {"id": "job.4", "type": "NoOp", "transition": {"successors": {"job.7": "true"}}}}, + {"step": {"id": "job.5", "type": "NoOp", "transition": {"successors": {"job.6": "true"}}}}, + {"step": {"id": "job.6", "type": "NoOp", "transition": {"successors": {"job.7": "true"}}}}, + {"step": {"id": "job.7", "type": "NoOp"}} + ] + } +} \ No newline at end of file diff --git a/maestro-server/src/test/resources/samples/sample-dag-test-5.json b/maestro-server/src/test/resources/samples/sample-dag-test-5.json new file mode 100644 index 0000000..a0d0a46 --- /dev/null +++ b/maestro-server/src/test/resources/samples/sample-dag-test-5.json @@ -0,0 +1,119 @@ +{ + "properties": { + "owner": "tester", + "run_strategy": "first_only", + "step_concurrency": 20 + }, + "workflow": { + "id": "sample-dag-test-5", + "name": "Test workflow 05", + "description": "Long description about this workflow", + "params": { + "WF_RESTART_MODE": { + "expression": "WORKFLOW_RUN_POLICY = params.getFromInstance('RUN_POLICY'); if (WORKFLOW_RUN_POLICY == \"START_FRESH_NEW_RUN\") return \"run\"; else if (WORKFLOW_RUN_POLICY == \"RESTART_FROM_INCOMPLETE\") return \"resume\"; else if (WORKFLOW_RUN_POLICY == \"RESTART_FROM_START\") return \"run\"; else throw \"Unsupported restart mode: \" + WORKFLOW_RUN_POLICY;", + "type": "String", + "mode": "MUTABLE" + } + }, + "steps": [ + { + "step": { + "id": "job.1", + "type": "NoOp", + "dependencies": { + "SIGNAL": { + "definitions": [ + { + "value": { + "name": { + "value": "prod_db/foo/bar", + "type": "STRING" + }, + "vtts_utc_dateint": { + "operator": "=", + "parameter": { + "type": "STRING", + "value": "${DERIVED_RUN_DATE_UTC}" + }, + "type": "SIGNAL" + } + }, + "type": "MAP" + } + ], + "type": "SIGNAL" + } + }, + "transition": { + "successors": { + "job.2": "true", + "job.3": "true" + } + } + } + }, + { + "step": { + "id": "job.2", + "type": "NoOp", + "transition": { + "successors": { + "job.4": "true", + "job.5": "true" + } + } + } + }, + { + "step": { + "id": "job.3", + "type": "NoOp", + "transition": { + "successors": { + "job.7": "true" + } + } + } + }, + { + "step": { + "id": "job.4", + "type": "NoOp", + "transition": { + "successors": { + "job.6": "true" + } + } + } + }, + { + "step": { + "id": "job.5", + "type": "NoOp", + "transition": { + "successors": { + "job.7": "true" + } + } + } + }, + { + "step": { + "id": "job.6", + "type": "NoOp", + "transition": { + "successors": { + "job.7": "true" + } + } + } + }, + { + "step": { + "id": "job.7", + "type": "NoOp" + } + } + ] + } +} \ No newline at end of file diff --git a/maestro-server/src/test/resources/samples/sample-dag-test-6.json b/maestro-server/src/test/resources/samples/sample-dag-test-6.json new file mode 100644 index 0000000..d53f9b0 --- /dev/null +++ b/maestro-server/src/test/resources/samples/sample-dag-test-6.json @@ -0,0 +1,22 @@ +{ + "properties": { + "owner": "tester", + "run_strategy": "last_only", + "step_concurrency": 20 + }, + "workflow": { + "id": "sample-dag-test-6", + "name": "Test workflow 06", + "description": "Long description about this workflow", + "steps": [ + {"step": {"id": "job.1", "type": "NoOp", "transition": {"successors": {"job.2": "true", "job.3": "true", "job.4": "true", "job.5": "true"}}}}, + {"step": {"id": "job.2", "type": "NoOp", "transition": {"successors": {"job.6": "true"}}}}, + {"step": {"id": "job.3", "type": "NoOp", "transition": {"successors": {"job.6": "true"}}}}, + {"step": {"id": "job.4", "type": "NoOp", "transition": {"successors": {"job.7": "true"}}}}, + {"step": {"id": "job.5", "type": "NoOp", "transition": {"successors": {"job.7": "true"}}}}, + {"step": {"id": "job.6", "type": "NoOp", "transition": {"successors": {"job.8": "true"}}}}, + {"step": {"id": "job.7", "type": "NoOp", "transition": {"successors": {"job.8": "true"}}}}, + {"step": {"id": "job.8", "type": "NoOp"}} + ] + } +} \ No newline at end of file diff --git a/maestro-server/src/test/resources/samples/sample-dag-test-7.json b/maestro-server/src/test/resources/samples/sample-dag-test-7.json new file mode 100644 index 0000000..87ad5f7 --- /dev/null +++ b/maestro-server/src/test/resources/samples/sample-dag-test-7.json @@ -0,0 +1,29 @@ +{ + "properties": { + "owner": "tester", + "run_strategy": { + "rule": "strict_sequential" + }, + "step_concurrency": 20 + }, + "workflow": { + "id": "sample-dag-test-7", + "name": "Test workflow 07", + "description": "Long description about this workflow", + "steps": [ + {"step": {"id": "job.1", "type": "NoOp", "params": {"sleep_seconds": {"value": 15, "type": "Long"}}, + "transition": {"successors": {"job.2": "true", "job.3": "true"}}}}, + {"step": {"id": "job.2", "type": "NoOp", "params": {"sleep_seconds": {"value": 15, "type": "long"}}, + "transition": {"successors": {"job.4": "true"}}}}, + {"step": {"id": "job.3", "type": "NoOp", "params": {"sleep_seconds": {"value": 15, "type": "LONG"}}, + "transition": {"successors": {"job.5": "true"}}}}, + {"step": {"id": "job.4", "type": "NoOp", "params": {"sleep_seconds": {"value": 15, "type": "lonG"}}, + "transition": {"successors": {"job.6": "true"}}}}, + {"step": {"id": "job.5", "type": "NoOp", "params": {"sleep_seconds": {"value": 15, "type": "loNg"}}, + "transition": {"successors": {"job.6": "true"}}}}, + {"step": {"id": "job.6", "type": "NoOp", "params": {"sleep_seconds": {"value": 15, "type": "lOng"}}}}, + {"step": {"id": "job.7", "type": "NoOp", "transition": {"successors": {"job.8": "true"}}}}, + {"step": {"id": "job.8", "type": "NoOp"}} + ] + } +} diff --git a/maestro-server/src/test/resources/samples/sample-dag-test-8.json b/maestro-server/src/test/resources/samples/sample-dag-test-8.json new file mode 100644 index 0000000..d110c0e --- /dev/null +++ b/maestro-server/src/test/resources/samples/sample-dag-test-8.json @@ -0,0 +1,29 @@ +{ + "properties": { + "owner": "tester", + "run_strategy": { + "rule": "parallel", + "workflow_concurrency": 5 + }, + "step_concurrency": 20 + }, + "workflow": { + "id": "sample-dag-test-8", + "name": "Test workflow 08", + "description": "Long description about this workflow", + "params": {"x": {"value": 5, "type": "LONG"}, + "base_workflow_timeout": {"value": "20 minutes", "type": "STRING"}}, + "steps": [ + {"step": {"id": "job.1", "type": "NoOp", + "timeout": "${base_workflow_timeout}", + "transition": {"successors": {"job.3": "x > 10", "job.4": "x > 10"}}}}, + {"step": {"id": "job.2", "type": "NoOp", + "transition": {"successors": {"job.4": "x > 3"}}}}, + {"step": {"id": "job.3", "type": "NoOp", + "transition": {"successors": {"job.4": "x > 10"}}}}, + {"step": {"id": "job.4", "type": "NoOp", + "transition": {"successors": {"job.5": "true"}}}}, + {"step": {"id": "job.5", "type": "NoOp", "transition": {}}} + ] + } +} \ No newline at end of file diff --git a/maestro-server/src/test/resources/samples/sample-dag-test-9.json b/maestro-server/src/test/resources/samples/sample-dag-test-9.json new file mode 100644 index 0000000..17e632f --- /dev/null +++ b/maestro-server/src/test/resources/samples/sample-dag-test-9.json @@ -0,0 +1,29 @@ +{ + "properties": { + "owner": "tester", + "run_strategy": { + "rule": "last_only", + "workflow_concurrency": 1 + }, + "step_concurrency": 20 + }, + "git_info": {"branch": "foo"}, + "workflow": { + "id": "sample-dag-test-9", + "name": "Test workflow 09", + "description": "Long description about this workflow", + "steps": [ + {"step": {"id": "job.1", "name": "a","description":"b", "type": "NoOp", + "transition": {"successors": {"job.2": "true", "job.3": "true"}}}}, + {"step": {"id": "job.2", "type": "NoOp", + "transition": {"successors": {"job.4": "true", "job.5": "true", "job.6": "true"}}}}, + {"step": {"id": "job.3", "type": "NoOp", "transition": {"successors": {"job.8": "true"}}}}, + {"step": {"id": "job.4", "type": "NoOp", "transition": {"successors": {"job.7": "true"}}}}, + {"step": {"id": "job.5", "type": "NoOp", "transition": {"successors": {"job.7": "true"}}}}, + {"step": {"id": "job.6", "type": "NoOp", "transition": {"successors": {"job.7": "true"}}}}, + {"step": {"id": "job.7", "type": "NoOp", "transition": {"successors": {"job.9": "true"}}}}, + {"step": {"id": "job.8", "type": "NoOp", "transition": {"successors": {"job.9": "true"}}}}, + {"step": {"id": "job.9", "type": "NoOp"}} + ] + } +} \ No newline at end of file diff --git a/maestro-server/src/test/resources/samples/sample-large-foreach-wf.json b/maestro-server/src/test/resources/samples/sample-large-foreach-wf.json new file mode 100644 index 0000000..2a730b9 --- /dev/null +++ b/maestro-server/src/test/resources/samples/sample-large-foreach-wf.json @@ -0,0 +1,61 @@ +{ + "properties": { + "owner": "tester" + }, + "is_active": true, + "workflow": { + "id": "sample-large-foreach-wf", + "steps": [{ + "foreach": { + "id": "job1", + "concurrency": 100, + "params": { + "param1": { + "value": ["a", "b", "c"], + "type": "String_array" + }, + "loop_params": { + "value": { + "i" : { + "expression": "Util.dateIntsBetween(20200101, 20210101, 1);", + "type": "long_array", + "validator": "param!=null && param.size() > 2", "mode": "mutable" + }, + "j" : { + "expression": "param1;", "type": "STRING_ARRAY", "validator": "param!=null && param.size() > 2" + } + }, + "type": "MAP" + } + }, + "steps": [ + { + "step": { + "id": "job2", + "type": "NoOp", + "tags": [ + "NoOp" + ], + "params": { + "param2": { + "expression": "i + j;", "type": "STRING", "mode": "mutable" + } + }, + "transition": {"successors": {"job.2": "true"}}, + "retry_policy": {} + } + }, + { + "step": { + "id": "job.2", + "type": "Sleep", + "params": { + "sleep_seconds": {"expression": "1 * 1;", "type": "LONG"} + } + } + } + ] + } + }] + } +} diff --git a/maestro-server/src/test/resources/samples/sample-mixed-wf.json b/maestro-server/src/test/resources/samples/sample-mixed-wf.json new file mode 100644 index 0000000..5820eac --- /dev/null +++ b/maestro-server/src/test/resources/samples/sample-mixed-wf.json @@ -0,0 +1,67 @@ +{ + "properties": { + "owner": "tester" + }, + "is_active": true, + "workflow": { + "id": "sample-mixed-wf", + "steps": [{ + "subworkflow": { + "id": "sub-step1", + "sync": true, + "params": { + "subworkflow_id": { + "value": "sample-dag-test-3", + "type": "STRING" + }, + "subworkflow_version": { + "value": "latest", + "type": "STRING" + }, + "param1": { + "value": {"foo": "bar"}, + "type": "STRING_MAP" + } + } + }}, + { + "subworkflow": { + "id": "sub-step2", + "sync": true, + "params": { + "subworkflow_id": { + "value": "sample-nested-foreach-subworkflow-wf", + "type": "STRING" + }, + "subworkflow_version": { + "value": "latest", + "type": "STRING" + }, + "param2": { + "value": {"foo": "bar"}, + "type": "STRING_MAP" + } + } + }}, + { + "subworkflow": { + "id": "sub-step3", + "sync": true, + "params": { + "subworkflow_id": { + "value": "sample-nested-subworkflow-foreach-wf", + "type": "STRING" + }, + "subworkflow_version": { + "value": "latest", + "type": "STRING" + }, + "param3": { + "value": {"foo": "bar"}, + "type": "STRING_MAP" + } + } + }} + ] + } +} diff --git a/maestro-server/src/test/resources/samples/sample-nested-foreach-subworkflow-wf.json b/maestro-server/src/test/resources/samples/sample-nested-foreach-subworkflow-wf.json new file mode 100644 index 0000000..ca110cf --- /dev/null +++ b/maestro-server/src/test/resources/samples/sample-nested-foreach-subworkflow-wf.json @@ -0,0 +1,72 @@ +{ + "properties": { + "owner": "tester", + "run_strategy": "parallel" + }, + "is_active": true, + "workflow": { + "id": "sample-nested-foreach-subworkflow-wf", + "steps": [{ + "foreach": { + "id": "foreach-step1", + "concurrency": 2, + "params": { + "param1": { + "value": ["a", "b"], + "type": "String_array" + }, + "loop_params": { + "value": { + "i" : { + "value": [1, 2, 3], + "type": "long_array", + "validator": "param!=null && param.size() > 2", "mode": "mutable" + }, + "j" : { + "expression": "param1;", "type": "STRING_ARRAY", "validator": "param!=null && param.size() > 2" + } + }, + "type": "MAP" + } + }, + "steps": [ + { + "step": { + "id": "job1", + "type": "NoOp", + "tags": [ + "NoOp" + ], + "params": { + "param2": { + "expression": "i + j;", "type": "STRING", "mode": "mutable" + } + }, + "transition": {"successors": {"sub-step2": "true"}}, + "retry_policy": {} + } + }, + { + "subworkflow": { + "id": "sub-step2", + "sync": true, + "params": { + "subworkflow_id": { + "value": "sample-dag-test-3", + "type": "STRING" + }, + "subworkflow_version": { + "value": "latest", + "type": "STRING" + }, + "param1": { + "value": {"foo": "bar"}, + "type": "STRING_MAP" + } + } + }} + ] + } + }] + } +} diff --git a/maestro-server/src/test/resources/samples/sample-nested-foreach-wf.json b/maestro-server/src/test/resources/samples/sample-nested-foreach-wf.json new file mode 100644 index 0000000..de78751 --- /dev/null +++ b/maestro-server/src/test/resources/samples/sample-nested-foreach-wf.json @@ -0,0 +1,77 @@ +{ + "properties": { + "owner": "tester" + }, + "is_active": true, + "workflow": { + "id": "sample-nested-foreach-wf", + "steps": [{ + "foreach": { + "id": "foreach-job", + "concurrency": 2, + "params": { + "param1": { + "value": ["a", "b"], + "type": "String_array" + }, + "loop_params": { + "value": { + "i" : { + "value": [1, 2, 3], + "type": "long_array", + "validator": "param!=null && param.size() > 2", "mode": "mutable" + }, + "j" : { + "expression": "param1;", "type": "STRING_ARRAY", "validator": "param!=null && param.size() > 2" + } + }, + "type": "MAP" + } + }, + "steps": [ + { + "foreach": { + "id": "job1", + "concurrency": 3, + "params": { + "param2": { + "expression": "i + 2;", "type": "LONG", "mode": "mutable" + }, + "loop_params": { + "value": { + "x" : { + "expression": "Util.intsBetween(1, i, 1);", + "type": "long_array" + } + }, + "type": "MAP" + } + }, + "steps": [ + { + "step": { + "id": "job.1", + "type": "Sleep", + "params": { + "sleep_seconds": {"expression": "x * 5;", "type": "LONG"} + } + } + } + ], + "transition": {"successors": {"job.2": "true"}} + } + }, + { + "step": { + "id": "job.2", + "type": "Sleep", + "params": { + "sleep_seconds": {"expression": "i * 10;", "type": "LONG"} + } + } + } + ] + } + }] + } +} diff --git a/maestro-server/src/test/resources/samples/sample-nested-subworkflow-foreach-wf.json b/maestro-server/src/test/resources/samples/sample-nested-subworkflow-foreach-wf.json new file mode 100644 index 0000000..0128f0d --- /dev/null +++ b/maestro-server/src/test/resources/samples/sample-nested-subworkflow-foreach-wf.json @@ -0,0 +1,67 @@ +{ + "properties": { + "owner": "tester" + }, + "is_active": true, + "workflow": { + "id": "sample-nested-subworkflow-foreach-wf", + "steps": [{ + "subworkflow": { + "id": "sub-step1", + "sync": true, + "params": { + "subworkflow_id": { + "value": "sample-foreach-wf", + "type": "STRING" + }, + "subworkflow_version": { + "value": "latest", + "type": "STRING" + }, + "param3": { + "value": {"foo": "bar"}, + "type": "STRING_MAP" + } + } + }}, + { + "subworkflow": { + "id": "sub-step2", + "sync": true, + "params": { + "subworkflow_id": { + "value": "sample-foreach-wf", + "type": "STRING" + }, + "subworkflow_version": { + "value": "latest", + "type": "STRING" + }, + "param2": { + "value": {"foo": "bar"}, + "type": "STRING_MAP" + } + } + }}, + { + "subworkflow": { + "id": "sub-step3", + "sync": true, + "params": { + "subworkflow_id": { + "value": "sample-foreach-wf", + "type": "STRING" + }, + "subworkflow_version": { + "value": "latest", + "type": "STRING" + }, + "param3": { + "value": {"foo": "bar"}, + "type": "STRING_MAP" + } + } + }} + ] + } +} diff --git a/maestro-server/src/test/resources/samples/sample-nested-subworkflow-wf.json b/maestro-server/src/test/resources/samples/sample-nested-subworkflow-wf.json new file mode 100644 index 0000000..d4944e9 --- /dev/null +++ b/maestro-server/src/test/resources/samples/sample-nested-subworkflow-wf.json @@ -0,0 +1,28 @@ +{ + "properties": { + "owner": "tester" + }, + "is_active": true, + "workflow": { + "id": "sample-nested-subworkflow-wf", + "steps": [{ + "subworkflow": { + "id": "sub-step1", + "sync": true, + "params": { + "subworkflow_id": { + "value": "sample-subworkflow-wf", + "type": "STRING" + }, + "subworkflow_version": { + "value": "latest", + "type": "STRING" + }, + "param1": { + "value": {"foo": "bar"}, + "type": "STRING_MAP" + } + } + }}] + } +} diff --git a/maestro-server/src/test/resources/samples/sample-subworkflow-wf.json b/maestro-server/src/test/resources/samples/sample-subworkflow-wf.json new file mode 100644 index 0000000..784afbf --- /dev/null +++ b/maestro-server/src/test/resources/samples/sample-subworkflow-wf.json @@ -0,0 +1,124 @@ +{ + "properties": { + "owner": "tester" + }, + "is_active": true, + "workflow": { + "id": "sample-subworkflow-wf", + "instance_step_concurrency": 3, + "steps": [{ + "subworkflow": { + "id": "job1", + "params": { + "subworkflow_id": { + "value": "sample-dag-test-3", + "type": "STRING" + }, + "subworkflow_version": { + "value": "latest", + "type": "STRING" + }, + "param1": { + "value": {"foo": "bar"}, + "type": "STRING_MAP" + } + } + }}, + { + "subworkflow": { + "id": "job2", + "sync": true, + "params": { + "subworkflow_id": { + "value": "sample-dag-test-3", + "type": "STRING" + }, + "subworkflow_version": { + "value": "latest", + "type": "STRING" + }, + "param1": { + "value": {"foo": "bar"}, + "type": "STRING_MAP" + } + } + }}, + { + "subworkflow": { + "id": "job3", + "sync": false, + "params": { + "subworkflow_id": { + "value": "sample-dag-test-3", + "type": "STRING" + }, + "subworkflow_version": { + "value": "latest", + "type": "STRING" + }, + "param1": { + "value": {"foo": "bar"}, + "type": "STRING_MAP" + } + } + }}, + { + "subworkflow": { + "id": "job4", + "sync": true, + "explicit_params": false, + "params": { + "subworkflow_id": { + "value": "sample-dag-test-3", + "type": "STRING" + }, + "subworkflow_version": { + "value": "latest", + "type": "STRING" + }, + "param1": { + "value": {"foo": "bar"}, + "type": "STRING_MAP" + } + } + }}, + { + "subworkflow": { + "id": "job5", + "sync": true, + "explicit_params": true, + "params": { + "subworkflow_id": { + "value": "sample-dag-test-3", + "type": "STRING" + }, + "subworkflow_version": { + "value": "latest", + "type": "STRING" + }, + "param1": { + "value": {"foo": "bar"}, + "type": "STRING_MAP" + } + } + }},{ + "subworkflow": { + "id": "job6", + "sync": true, + "params": { + "subworkflow_id": { + "value": "sample-dag-test-3", + "type": "STRING" + }, + "subworkflow_version": { + "value": "latest", + "type": "STRING" + }, + "param1": { + "value": {"foo": "bar"}, + "type": "STRING_MAP" + } + } + }}] + } +}