diff --git a/maestro-common/src/main/java/com/netflix/maestro/exceptions/MaestroDatabaseError.java b/maestro-common/src/main/java/com/netflix/maestro/exceptions/MaestroDatabaseError.java new file mode 100644 index 0000000..3cb05a8 --- /dev/null +++ b/maestro-common/src/main/java/com/netflix/maestro/exceptions/MaestroDatabaseError.java @@ -0,0 +1,21 @@ +package com.netflix.maestro.exceptions; + +import com.netflix.maestro.models.error.Details; +import lombok.Getter; + +@Getter +public class MaestroDatabaseError extends MaestroRuntimeException { + private static final long serialVersionUID = 7334668492533395123L; + + private final Details details; + + /** + * Constructor with error message and details. + * + * @param cause cause exception + */ + public MaestroDatabaseError(Throwable cause, String msg) { + super(Code.INTERNAL_ERROR, msg, cause); + this.details = Details.create(cause, false, msg); + } +} diff --git a/maestro-database/gradle.lockfile b/maestro-database/gradle.lockfile new file mode 100644 index 0000000..9165152 --- /dev/null +++ b/maestro-database/gradle.lockfile @@ -0,0 +1,12 @@ +# This is a Gradle generated file for dependency locking. +# Manual edits can break the build and are not advised. +# This file is expected to be part of source control. +com.fasterxml.jackson.core:jackson-annotations:2.15.4=compileClasspath +com.fasterxml.jackson.core:jackson-core:2.15.4=compileClasspath +com.fasterxml.jackson.core:jackson-databind:2.15.4=compileClasspath +com.fasterxml.jackson:jackson-bom:2.15.4=compileClasspath +com.zaxxer:HikariCP:4.0.3=compileClasspath +org.flywaydb:flyway-core:7.6.0=compileClasspath +org.projectlombok:lombok:1.18.36=annotationProcessor,compileClasspath +org.slf4j:slf4j-api:1.7.30=compileClasspath +empty= diff --git a/maestro-database/src/main/java/com/netflix/maestro/database/AbstractDatabaseDao.java b/maestro-database/src/main/java/com/netflix/maestro/database/AbstractDatabaseDao.java index 41605ae..8f480a6 100644 --- a/maestro-database/src/main/java/com/netflix/maestro/database/AbstractDatabaseDao.java +++ b/maestro-database/src/main/java/com/netflix/maestro/database/AbstractDatabaseDao.java @@ -20,6 +20,7 @@ import com.netflix.maestro.database.utils.ResultProcessor; import com.netflix.maestro.database.utils.StatementFunction; import com.netflix.maestro.database.utils.StatementPreparer; +import com.netflix.maestro.exceptions.MaestroDatabaseError; import com.netflix.maestro.exceptions.MaestroInternalError; import com.netflix.maestro.exceptions.MaestroUnprocessableEntityException; import com.netflix.maestro.metrics.MaestroMetrics; @@ -289,7 +290,7 @@ protected R withRetryableTransaction(final ConnectionFunction function) { ERROR_TYPE_TAG_NAME, "retryable_transaction_error_" + e.getSQLState()); } - throw new MaestroInternalError(e, e.getMessage()); + throw new MaestroDatabaseError(e, e.getMessage()); } catch (InterruptedException e) { LOG.warn("InterruptedException exception occurred: message = [{}]", e.getMessage()); Thread.currentThread().interrupt(); diff --git a/maestro-flow/gradle.lockfile b/maestro-flow/gradle.lockfile new file mode 100644 index 0000000..5b6dbf7 --- /dev/null +++ b/maestro-flow/gradle.lockfile @@ -0,0 +1,12 @@ +# This is a Gradle generated file for dependency locking. +# Manual edits can break the build and are not advised. +# This file is expected to be part of source control. +com.fasterxml.jackson.core:jackson-annotations:2.18.2=compileClasspath +com.fasterxml.jackson.core:jackson-core:2.18.2=compileClasspath +com.fasterxml.jackson.core:jackson-databind:2.15.4=compileClasspath +com.fasterxml.jackson:jackson-bom:2.18.2=compileClasspath +com.zaxxer:HikariCP:4.0.3=compileClasspath +org.flywaydb:flyway-core:7.6.0=compileClasspath +org.projectlombok:lombok:1.18.36=annotationProcessor,compileClasspath +org.slf4j:slf4j-api:1.7.30=compileClasspath +empty= diff --git a/maestro-flow/src/main/java/com/netflix/maestro/flow/dao/MaestroFlowDao.java b/maestro-flow/src/main/java/com/netflix/maestro/flow/dao/MaestroFlowDao.java index 67256cb..be0949f 100644 --- a/maestro-flow/src/main/java/com/netflix/maestro/flow/dao/MaestroFlowDao.java +++ b/maestro-flow/src/main/java/com/netflix/maestro/flow/dao/MaestroFlowDao.java @@ -7,6 +7,7 @@ import com.netflix.maestro.flow.properties.FlowEngineProperties; import com.netflix.maestro.metrics.MaestroMetrics; import com.netflix.maestro.utils.Checks; +import java.sql.ResultSet; import java.util.ArrayList; import java.util.List; import javax.sql.DataSource; @@ -36,6 +37,8 @@ public class MaestroFlowDao extends AbstractDatabaseDao { + "RETURNING group_id,generation"; private static final String ADD_FLOW_GROUP_QUERY = "INSERT INTO maestro_flow_group (group_id,generation,address) VALUES (?,?,?)"; + private static final String GET_FLOW_WITH_SAME_KEYS_QUERY = + "SELECT 1 FROM maestro_flow WHERE group_id=? AND flow_id=? LIMIT 1"; public MaestroFlowDao( DataSource dataSource, @@ -216,4 +219,21 @@ public void insertGroup(FlowGroup group) { Checks.checkTrue( res == 1, "Insert flow group row count for [%s] is not 1 but %s", group.groupId(), res); } + + /** Used to get if there is any flow instance with this flow keys. */ + public boolean existFlowWithSameKeys(long groupId, String flowId) { + return withMetricLogError( + () -> + withRetryableQuery( + GET_FLOW_WITH_SAME_KEYS_QUERY, + stmt -> { + stmt.setLong(1, groupId); + stmt.setString(2, flowId); + }, + ResultSet::next), + "existFlowWithSameKeys", + "Failed to check the existence of the flow instance [{}][{}]", + groupId, + flowId); + } } diff --git a/maestro-flow/src/main/java/com/netflix/maestro/flow/engine/ExecutionContext.java b/maestro-flow/src/main/java/com/netflix/maestro/flow/engine/ExecutionContext.java index c30e91d..f1ea28a 100644 --- a/maestro-flow/src/main/java/com/netflix/maestro/flow/engine/ExecutionContext.java +++ b/maestro-flow/src/main/java/com/netflix/maestro/flow/engine/ExecutionContext.java @@ -122,7 +122,7 @@ public void refresh(Flow flow) { /** Run the flow's final callback function. */ public void finalCall(Flow flow) { - if (flow.getFlowDef().isFlowFinalCallbackEnabled()) { + if (flow.getFlowDef().isFinalFlowStatusCallbackEnabled()) { if (flow.getStatus().isSuccessful()) { finalCallback.onFlowCompleted(flow); } else { diff --git a/maestro-flow/src/main/java/com/netflix/maestro/flow/models/FlowDef.java b/maestro-flow/src/main/java/com/netflix/maestro/flow/models/FlowDef.java index 614f041..c601d60 100644 --- a/maestro-flow/src/main/java/com/netflix/maestro/flow/models/FlowDef.java +++ b/maestro-flow/src/main/java/com/netflix/maestro/flow/models/FlowDef.java @@ -16,5 +16,5 @@ public class FlowDef { private List> tasks; // tasks in the flow graph private long timeoutInMillis; - private boolean flowFinalCallbackEnabled; + private boolean finalFlowStatusCallbackEnabled; }