Skip to content

Commit

Permalink
[FLINK-35691][table] Fix drop table statement can drop materialized t…
Browse files Browse the repository at this point in the history
…able
  • Loading branch information
lsyldliu committed Jun 26, 2024
1 parent d4d15fb commit e2f0665
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -991,6 +991,29 @@ void testDropMaterializedTableInContinuousMode() throws Exception {
List<RowData> jobResults = fetchAllResults(service, sessionHandle, describeJobHandle);
assertThat(jobResults.get(0).getString(2).toString()).isEqualTo("RUNNING");

// Drop materialized table using drop table statement
String dropTableUsingMaterializedTableDDL = "DROP TABLE users_shops";
OperationHandle dropTableUsingMaterializedTableHandle =
service.executeStatement(
sessionHandle, dropTableUsingMaterializedTableDDL, -1, new Configuration());

assertThatThrownBy(
() ->
awaitOperationTermination(
service,
sessionHandle,
dropTableUsingMaterializedTableHandle))
.rootCause()
.isInstanceOf(ValidationException.class)
.hasMessage(
String.format(
"Table with identifier '%s' does not exist.",
ObjectIdentifier.of(
fileSystemCatalogName,
TEST_DEFAULT_DATABASE,
"users_shops")
.asSummaryString()));

// drop materialized table
String dropMaterializedTableDDL = "DROP MATERIALIZED TABLE IF EXISTS users_shops";
OperationHandle dropMaterializedTableHandle =
Expand Down Expand Up @@ -1091,6 +1114,29 @@ void testDropMaterializedTableInFullMode() throws Exception {
// verify refresh workflow is created
assertThat(embeddedWorkflowScheduler.getQuartzScheduler().checkExists(jobKey)).isTrue();

// Drop materialized table using drop table statement
String dropTableUsingMaterializedTableDDL = "DROP TABLE users_shops";
OperationHandle dropTableUsingMaterializedTableHandle =
service.executeStatement(
sessionHandle, dropTableUsingMaterializedTableDDL, -1, new Configuration());

assertThatThrownBy(
() ->
awaitOperationTermination(
service,
sessionHandle,
dropTableUsingMaterializedTableHandle))
.rootCause()
.isInstanceOf(ValidationException.class)
.hasMessage(
String.format(
"Table with identifier '%s' does not exist.",
ObjectIdentifier.of(
fileSystemCatalogName,
TEST_DEFAULT_DATABASE,
"users_shops")
.asSummaryString()));

// drop materialized table
String dropMaterializedTableDDL = "DROP MATERIALIZED TABLE IF EXISTS users_shops";
OperationHandle dropMaterializedTableHandle =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1257,7 +1257,19 @@ public void alterTable(
* exist.
*/
public void dropTable(ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists) {
dropTableInternal(objectIdentifier, ignoreIfNotExists, true);
dropTableInternal(objectIdentifier, ignoreIfNotExists, true, false);
}

/**
* Drops a materialized table in a given fully qualified path.
*
* @param objectIdentifier The fully qualified path of the materialized table to drop.
* @param ignoreIfNotExists If false exception will be thrown if the table to drop does not
* exist.
*/
public void dropMaterializedTable(
ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists) {
dropTableInternal(objectIdentifier, ignoreIfNotExists, true, true);
}

/**
Expand All @@ -1268,16 +1280,19 @@ public void dropTable(ObjectIdentifier objectIdentifier, boolean ignoreIfNotExis
* exist.
*/
public void dropView(ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists) {
dropTableInternal(objectIdentifier, ignoreIfNotExists, false);
dropTableInternal(objectIdentifier, ignoreIfNotExists, false, false);
}

private void dropTableInternal(
ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists, boolean isDropTable) {
ObjectIdentifier objectIdentifier,
boolean ignoreIfNotExists,
boolean isDropTable,
boolean isDropMaterializedTable) {
Predicate<CatalogBaseTable> filter =
isDropTable
? table ->
table instanceof CatalogTable
|| table instanceof CatalogMaterializedTable
? isDropMaterializedTable
? table -> table instanceof CatalogMaterializedTable
: table -> table instanceof CatalogTable
: table -> table instanceof CatalogView;
// Same name temporary table or view exists.
if (filter.test(temporaryTables.get(objectIdentifier))) {
Expand Down Expand Up @@ -1317,7 +1332,8 @@ private void dropTableInternal(
ignoreIfNotExists,
"DropTable");
} else if (!ignoreIfNotExists) {
String tableOrView = isDropTable ? "Table" : "View";
String tableOrView =
isDropTable ? isDropMaterializedTable ? "Materialized Table" : "Table" : "View";
throw new ValidationException(
String.format(
"%s with identifier '%s' does not exist.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,35 @@
package org.apache.flink.table.operations.materializedtable;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.internal.TableResultImpl;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.OperationUtils;
import org.apache.flink.table.operations.ddl.DropTableOperation;
import org.apache.flink.table.operations.ddl.DropOperation;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Operation to describe a DROP MATERIALIZED TABLE statement. */
@Internal
public class DropMaterializedTableOperation extends DropTableOperation
implements MaterializedTableOperation {
public class DropMaterializedTableOperation implements DropOperation, MaterializedTableOperation {

private final ObjectIdentifier tableIdentifier;
private final boolean ifExists;

public DropMaterializedTableOperation(ObjectIdentifier tableIdentifier, boolean ifExists) {
super(tableIdentifier, ifExists, false);
this.tableIdentifier = tableIdentifier;
this.ifExists = ifExists;
}

public ObjectIdentifier getTableIdentifier() {
return tableIdentifier;
}

public boolean isIfExists() {
return ifExists;
}

@Override
Expand All @@ -49,4 +62,10 @@ public String asSummaryString() {
Collections.emptyList(),
Operation::asSummaryString);
}

@Override
public TableResultInternal execute(Context ctx) {
ctx.getCatalogManager().dropMaterializedTable(getTableIdentifier(), isIfExists());
return TableResultImpl.TABLE_RESULT_OK;
}
}

0 comments on commit e2f0665

Please sign in to comment.