Skip to content

Commit

Permalink
[FLINK-35691][table] Fix repeated suspend and resume materialized tab…
Browse files Browse the repository at this point in the history
…le logic
  • Loading branch information
lsyldliu committed Jun 25, 2024
1 parent efdcd36 commit 6adc005
Show file tree
Hide file tree
Showing 2 changed files with 243 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,15 @@ private ResultFetcher callAlterMaterializedTableSuspend(
CatalogMaterializedTable materializedTable =
getCatalogMaterializedTable(operationExecutor, tableIdentifier);

// Initialization phase doesn't support resume operation.
if (CatalogMaterializedTable.RefreshStatus.INITIALIZING
== materializedTable.getRefreshStatus()) {
throw new SqlExecutionException(
String.format(
"Materialized table %s is being initialized and does not support suspend operation.",
tableIdentifier));
}

if (CatalogMaterializedTable.RefreshMode.CONTINUOUS == materializedTable.getRefreshMode()) {
suspendContinuousRefreshJob(
operationExecutor, handle, tableIdentifier, materializedTable);
Expand All @@ -313,6 +322,14 @@ private void suspendContinuousRefreshJob(
ContinuousRefreshHandler refreshHandler =
deserializeContinuousHandler(materializedTable.getSerializedRefreshHandler());

if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
== materializedTable.getRefreshStatus()) {
throw new SqlExecutionException(
String.format(
"Materialized table %s continuous refresh job has been suspended, jobId is %s.",
tableIdentifier, refreshHandler.getJobId()));
}

String savepointPath =
stopJobWithSavepoint(operationExecutor, handle, refreshHandler.getJobId());

Expand Down Expand Up @@ -344,6 +361,14 @@ private void suspendRefreshWorkflow(
OperationHandle handle,
ObjectIdentifier tableIdentifier,
CatalogMaterializedTable materializedTable) {
if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
== materializedTable.getRefreshStatus()) {
throw new SqlExecutionException(
String.format(
"Materialized table %s refresh workflow has been suspended.",
tableIdentifier));
}

if (workflowScheduler == null) {
throw new SqlExecutionException(
"The workflow scheduler must be configured when suspending materialized table in full refresh mode.");
Expand Down Expand Up @@ -384,6 +409,15 @@ private ResultFetcher callAlterMaterializedTableResume(
CatalogMaterializedTable catalogMaterializedTable =
getCatalogMaterializedTable(operationExecutor, tableIdentifier);

// Initialization phase doesn't support resume operation.
if (CatalogMaterializedTable.RefreshStatus.INITIALIZING
== catalogMaterializedTable.getRefreshStatus()) {
throw new SqlExecutionException(
String.format(
"Materialized table %s is being initialized and does not support resume operation.",
tableIdentifier));
}

if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
== catalogMaterializedTable.getRefreshMode()) {
resumeContinuousRefreshJob(
Expand Down Expand Up @@ -414,6 +448,18 @@ private void resumeContinuousRefreshJob(
deserializeContinuousHandler(
catalogMaterializedTable.getSerializedRefreshHandler());

// Repeated resume continuous refresh job is not supported
if (CatalogMaterializedTable.RefreshStatus.ACTIVATED
== catalogMaterializedTable.getRefreshStatus()) {
JobStatus jobStatus = getJobStatus(operationExecutor, handle, refreshHandler);
if (isJobActivated(jobStatus)) {
throw new SqlExecutionException(
String.format(
"Materialized table %s continuous refresh job has been resumed, jobId is %s.",
tableIdentifier, refreshHandler.getJobId()));
}
}

Optional<String> restorePath = refreshHandler.getRestorePath();
try {
executeContinuousRefreshJob(
Expand All @@ -438,6 +484,15 @@ private void resumeRefreshWorkflow(
ObjectIdentifier tableIdentifier,
CatalogMaterializedTable catalogMaterializedTable,
Map<String, String> dynamicOptions) {
// Repeated resume refresh workflow is not supported
if (CatalogMaterializedTable.RefreshStatus.ACTIVATED
== catalogMaterializedTable.getRefreshStatus()) {
throw new SqlExecutionException(
String.format(
"Materialized table %s refresh workflow has been resumed.",
tableIdentifier));
}

if (workflowScheduler == null) {
throw new SqlExecutionException(
"The workflow scheduler must be configured when resuming materialized table in full refresh mode.");
Expand Down Expand Up @@ -981,6 +1036,13 @@ protected static String getInsertStatement(
definitionQuery);
}

private static boolean isJobActivated(JobStatus jobStatus) {
return JobStatus.INITIALIZING == jobStatus
|| JobStatus.CREATED == jobStatus
|| JobStatus.RUNNING == jobStatus
|| JobStatus.RESTARTING == jobStatus;
}

private static String generateTableWithDynamicOptions(
ObjectIdentifier objectIdentifier, Map<String, String> dynamicOptions) {
StringBuilder builder = new StringBuilder(objectIdentifier.asSerializableString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,114 @@ void testAlterMaterializedTableWithoutSavepointDirConfiguredInContinuousMode()
"Savepoint directory is not configured, can't stop job with savepoint.");
}

@Test
void testAlterMaterializedTableWithRepeatedSuspendAndResumeInContinuousMode(
@TempDir Path temporaryPath) throws Exception {
String materializedTableDDL =
"CREATE MATERIALIZED TABLE users_shops"
+ " PARTITIONED BY (ds)\n"
+ " WITH(\n"
+ " 'format' = 'debezium-json'\n"
+ " )\n"
+ " FRESHNESS = INTERVAL '30' SECOND\n"
+ " AS SELECT \n"
+ " user_id,\n"
+ " shop_id,\n"
+ " ds,\n"
+ " SUM (payment_amount_cents) AS payed_buy_fee_sum,\n"
+ " SUM (1) AS pv\n"
+ " FROM (\n"
+ " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource"
+ " ) AS tmp\n"
+ " GROUP BY (user_id, shop_id, ds)";

OperationHandle materializedTableHandle =
service.executeStatement(
sessionHandle, materializedTableDDL, -1, new Configuration());
awaitOperationTermination(service, sessionHandle, materializedTableHandle);

ResolvedCatalogMaterializedTable activeMaterializedTable =
(ResolvedCatalogMaterializedTable)
service.getTable(
sessionHandle,
ObjectIdentifier.of(
fileSystemCatalogName,
TEST_DEFAULT_DATABASE,
"users_shops"));
waitUntilAllTasksAreRunning(
restClusterClient,
JobID.fromHexString(
ContinuousRefreshHandlerSerializer.INSTANCE
.deserialize(
activeMaterializedTable.getSerializedRefreshHandler(),
getClass().getClassLoader())
.getJobId()));

// suspend materialized table
String savepointDir = temporaryPath.toString();
String alterJobSavepointDDL =
String.format(
"SET 'execution.checkpointing.savepoint-dir' = 'file://%s'", savepointDir);
OperationHandle alterMaterializedTableSavepointHandle =
service.executeStatement(
sessionHandle, alterJobSavepointDDL, -1, new Configuration());
awaitOperationTermination(service, sessionHandle, alterMaterializedTableSavepointHandle);

// suspend materialized table
String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE users_shops SUSPEND";
OperationHandle alterMaterializedTableSuspendHandle =
service.executeStatement(
sessionHandle, alterMaterializedTableSuspendDDL, -1, new Configuration());
awaitOperationTermination(service, sessionHandle, alterMaterializedTableSuspendHandle);

// verify repeated suspend materialized table
OperationHandle repeatedAlterMaterializedTableSuspendHandle =
service.executeStatement(
sessionHandle, alterMaterializedTableSuspendDDL, -1, new Configuration());
assertThatThrownBy(
() ->
awaitOperationTermination(
service,
sessionHandle,
repeatedAlterMaterializedTableSuspendHandle))
.rootCause()
.isInstanceOf(SqlExecutionException.class)
.hasMessageContaining(
String.format(
"Materialized table %s continuous refresh job has been suspended",
ObjectIdentifier.of(
fileSystemCatalogName,
TEST_DEFAULT_DATABASE,
"users_shops")));

// resume materialized table
String alterMaterializedTableResumeDDL = "ALTER MATERIALIZED TABLE users_shops RESUME";
OperationHandle alterMaterializedTableResumeHandle =
service.executeStatement(
sessionHandle, alterMaterializedTableResumeDDL, -1, new Configuration());
awaitOperationTermination(service, sessionHandle, alterMaterializedTableResumeHandle);

// verify repeated resume materialized table
OperationHandle repeatedAlterMaterializedTableResumeHandle =
service.executeStatement(
sessionHandle, alterMaterializedTableResumeDDL, -1, new Configuration());
assertThatThrownBy(
() ->
awaitOperationTermination(
service,
sessionHandle,
repeatedAlterMaterializedTableResumeHandle))
.rootCause()
.isInstanceOf(SqlExecutionException.class)
.hasMessageContaining(
String.format(
"Materialized table %s continuous refresh job has been resumed",
ObjectIdentifier.of(
fileSystemCatalogName,
TEST_DEFAULT_DATABASE,
"users_shops")));
}

@Test
void testAlterMaterializedTableSuspendAndResumeInFullMode() throws Exception {
createAndVerifyCreateMaterializedTableWithData(
Expand Down Expand Up @@ -753,6 +861,79 @@ void testAlterMaterializedTableSuspendAndResumeInFullMode() throws Exception {
.containsEntry("debezium-json.ignore-parse-errors", "true");
}

@Test
void testAlterMaterializedTableWithRepeatedSuspendAndResumeInFullMode() throws Exception {
createAndVerifyCreateMaterializedTableWithData(
"users_shops", Collections.emptyList(), Collections.emptyMap(), RefreshMode.FULL);

ResolvedCatalogMaterializedTable activeMaterializedTable =
(ResolvedCatalogMaterializedTable)
service.getTable(
sessionHandle,
ObjectIdentifier.of(
fileSystemCatalogName,
TEST_DEFAULT_DATABASE,
"users_shops"));

assertThat(activeMaterializedTable.getRefreshStatus())
.isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);

// suspend materialized table
String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE users_shops SUSPEND";
OperationHandle alterMaterializedTableSuspendHandle =
service.executeStatement(
sessionHandle, alterMaterializedTableSuspendDDL, -1, new Configuration());
awaitOperationTermination(service, sessionHandle, alterMaterializedTableSuspendHandle);

// repeated suspend materialized table
OperationHandle repeatedAlterMaterializedTableSuspendHandle =
service.executeStatement(
sessionHandle, alterMaterializedTableSuspendDDL, -1, new Configuration());
assertThatThrownBy(
() ->
awaitOperationTermination(
service,
sessionHandle,
repeatedAlterMaterializedTableSuspendHandle))
.rootCause()
.isInstanceOf(SqlExecutionException.class)
.hasMessageContaining(
String.format(
"Materialized table %s refresh workflow has been suspended.",
ObjectIdentifier.of(
fileSystemCatalogName,
TEST_DEFAULT_DATABASE,
"users_shops")));

// resume materialized table
String alterMaterializedTableResumeDDL =
"ALTER MATERIALIZED TABLE users_shops RESUME WITH ('debezium-json.ignore-parse-errors' = 'true')";
OperationHandle alterMaterializedTableResumeHandle =
service.executeStatement(
sessionHandle, alterMaterializedTableResumeDDL, -1, new Configuration());
awaitOperationTermination(service, sessionHandle, alterMaterializedTableResumeHandle);

// verify repeated resume materialized table
OperationHandle repeatedAlterMaterializedTableResumeHandle =
service.executeStatement(
sessionHandle, alterMaterializedTableResumeDDL, -1, new Configuration());
assertThatThrownBy(
() ->
awaitOperationTermination(
service,
sessionHandle,
repeatedAlterMaterializedTableResumeHandle))
.rootCause()
.isInstanceOf(SqlExecutionException.class)
.hasMessageContaining(
String.format(
"Materialized table %s refresh workflow has been resumed.",
ObjectIdentifier.of(
fileSystemCatalogName,
TEST_DEFAULT_DATABASE,
"users_shops")));
}

@Test
void testDropMaterializedTableInContinuousMode() throws Exception {
String materializedTableDDL =
Expand Down

0 comments on commit 6adc005

Please sign in to comment.