Skip to content

Commit

Permalink
[FLINK-35347][table] Introduce embedded scheduler and WorkflowSchedul…
Browse files Browse the repository at this point in the history
…er plugin for materialized table
  • Loading branch information
lsyldliu committed May 22, 2024
1 parent f1ecb9e commit 3adc16c
Show file tree
Hide file tree
Showing 50 changed files with 3,497 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -319,4 +321,30 @@ FunctionDefinition getFunctionDefinition(
*/
List<String> completeStatement(SessionHandle sessionHandle, String statement, int position)
throws SqlGatewayException;

// -------------------------------------------------------------------------------------------
// Materialized Table API
// -------------------------------------------------------------------------------------------

/**
* Trigger a refresh operation of specific materialized table.
*
* @param sessionHandle handle to identify the session.
* @param materializedTableIdentifier A fully qualified materialized table identifier:
* 'catalogName.databaseName.objectName', used for locating the materialized table in
* catalog.
* @param isPeriodic Represents whether the workflow is refreshed periodically or one-time-only.
* @param scheduleTime The time point at which the scheduler triggers this refresh operation.
* @param staticPartitions The specific partitions for one-time-only refresh workflow.
* @param executionConfig The flink job config.
* @return handle to identify the operation.
*/
OperationHandle refreshMaterializedTable(
SessionHandle sessionHandle,
String materializedTableIdentifier,
boolean isPeriodic,
@Nullable String scheduleTime,
Map<String, String> dynamicOptions,
Map<String, String> staticPartitions,
Map<String, String> executionConfig);
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;

import org.jetbrains.annotations.Nullable;

import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -187,6 +189,18 @@ public List<String> completeStatement(
throw new UnsupportedOperationException();
}

@Override
public OperationHandle refreshMaterializedTable(
SessionHandle sessionHandle,
String materializedTableIdentifier,
boolean isPeriodic,
@Nullable String scheduleTime,
Map<String, String> dynamicOptions,
Map<String, String> staticPartitions,
Map<String, String> executionConfig) {
throw new UnsupportedOperationException();
}

@Override
public ResolvedCatalogBaseTable<?> getTable(
SessionHandle sessionHandle, ObjectIdentifier tableIdentifier)
Expand Down
30 changes: 30 additions & 0 deletions flink-table/flink-sql-gateway/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,24 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.cronutils</groupId>
<artifactId>cron-utils</artifactId>
<version>${cronutils.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>${quartz.version}</version>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
Expand Down Expand Up @@ -153,8 +171,20 @@
<artifactSet>
<includes combine.children="append">
<include>org.apache.flink:flink-sql-gateway-api</include>
<include>org.quartz-scheduler:quartz</include>
<include>com.cronutils:cron-utils</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>org.quartz</pattern>
<shadedPattern>org.apache.flink.shaded.org.quartz</shadedPattern>
</relocation>
<relocation>
<pattern>com.cronutils</pattern>
<shadedPattern>org.apache.flink.shaded.com.cronutils</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
import org.apache.flink.table.gateway.rest.handler.materializedtable.RefreshMaterializedTableHandler;
import org.apache.flink.table.gateway.rest.handler.materializedtable.scheduler.CreateEmbeddedSchedulerWorkflowHandler;
import org.apache.flink.table.gateway.rest.handler.materializedtable.scheduler.DeleteEmbeddedSchedulerWorkflowHandler;
import org.apache.flink.table.gateway.rest.handler.materializedtable.scheduler.ResumeEmbeddedSchedulerWorkflowHandler;
import org.apache.flink.table.gateway.rest.handler.materializedtable.scheduler.SuspendEmbeddedSchedulerWorkflowHandler;
import org.apache.flink.table.gateway.rest.handler.operation.CancelOperationHandler;
import org.apache.flink.table.gateway.rest.handler.operation.CloseOperationHandler;
import org.apache.flink.table.gateway.rest.handler.operation.GetOperationStatusHandler;
Expand All @@ -37,6 +42,11 @@
import org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler;
import org.apache.flink.table.gateway.rest.handler.util.GetApiVersionHandler;
import org.apache.flink.table.gateway.rest.handler.util.GetInfoHandler;
import org.apache.flink.table.gateway.rest.header.materializedtable.RefreshMaterializedTableHeaders;
import org.apache.flink.table.gateway.rest.header.materializedtable.scheduler.CreateEmbeddedSchedulerWorkflowHeaders;
import org.apache.flink.table.gateway.rest.header.materializedtable.scheduler.DeleteEmbeddedSchedulerWorkflowHeaders;
import org.apache.flink.table.gateway.rest.header.materializedtable.scheduler.ResumeEmbeddedSchedulerWorkflowHeaders;
import org.apache.flink.table.gateway.rest.header.materializedtable.scheduler.SuspendEmbeddedSchedulerWorkflowHeaders;
import org.apache.flink.table.gateway.rest.header.operation.CancelOperationHeaders;
import org.apache.flink.table.gateway.rest.header.operation.CloseOperationHeaders;
import org.apache.flink.table.gateway.rest.header.operation.GetOperationStatusHeaders;
Expand All @@ -50,6 +60,7 @@
import org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
import org.apache.flink.table.gateway.rest.header.util.GetApiVersionHeaders;
import org.apache.flink.table.gateway.rest.header.util.GetInfoHeaders;
import org.apache.flink.table.gateway.workflow.scheduler.EmbeddedQuartzScheduler;
import org.apache.flink.util.ConfigurationException;

import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
Expand All @@ -63,11 +74,13 @@
public class SqlGatewayRestEndpoint extends RestServerEndpoint implements SqlGatewayEndpoint {

public final SqlGatewayService service;
private final EmbeddedQuartzScheduler quartzScheduler;

public SqlGatewayRestEndpoint(Configuration configuration, SqlGatewayService sqlGatewayService)
throws IOException, ConfigurationException {
super(configuration);
service = sqlGatewayService;
quartzScheduler = new EmbeddedQuartzScheduler();
}

@Override
Expand All @@ -78,6 +91,8 @@ protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initiali
addOperationRelatedHandlers(handlers);
addUtilRelatedHandlers(handlers);
addStatementRelatedHandlers(handlers);
addMaterializedRelatedHandlers(handlers);
addInMemorySchedulerRelatedHandlers(handlers);
return handlers;
}

Expand Down Expand Up @@ -181,11 +196,69 @@ private void addStatementRelatedHandlers(
service, responseHeaders, FetchResultsHeaders.getInstanceV1())));
}

private void addMaterializedRelatedHandlers(
List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers) {
// Refresh materialized table
RefreshMaterializedTableHandler refreshMaterializedTableHandler =
new RefreshMaterializedTableHandler(
service, responseHeaders, RefreshMaterializedTableHeaders.getInstance());
handlers.add(
Tuple2.of(
RefreshMaterializedTableHeaders.getInstance(),
refreshMaterializedTableHandler));
}

private void addInMemorySchedulerRelatedHandlers(
List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers) {
// create workflow
CreateEmbeddedSchedulerWorkflowHandler createHandler =
new CreateEmbeddedSchedulerWorkflowHandler(
service,
quartzScheduler,
responseHeaders,
CreateEmbeddedSchedulerWorkflowHeaders.getInstance());
handlers.add(
Tuple2.of(CreateEmbeddedSchedulerWorkflowHeaders.getInstance(), createHandler));

// suspend workflow
SuspendEmbeddedSchedulerWorkflowHandler suspendHandler =
new SuspendEmbeddedSchedulerWorkflowHandler(
service,
quartzScheduler,
responseHeaders,
SuspendEmbeddedSchedulerWorkflowHeaders.getInstance());
handlers.add(
Tuple2.of(SuspendEmbeddedSchedulerWorkflowHeaders.getInstance(), suspendHandler));

// resume workflow
ResumeEmbeddedSchedulerWorkflowHandler resumeHandler =
new ResumeEmbeddedSchedulerWorkflowHandler(
service,
quartzScheduler,
responseHeaders,
ResumeEmbeddedSchedulerWorkflowHeaders.getInstance());
handlers.add(
Tuple2.of(ResumeEmbeddedSchedulerWorkflowHeaders.getInstance(), resumeHandler));

// delete workflow
DeleteEmbeddedSchedulerWorkflowHandler deleteHandler =
new DeleteEmbeddedSchedulerWorkflowHandler(
service,
quartzScheduler,
responseHeaders,
DeleteEmbeddedSchedulerWorkflowHeaders.getInstance());
handlers.add(
Tuple2.of(DeleteEmbeddedSchedulerWorkflowHeaders.getInstance(), deleteHandler));
}

@Override
protected void startInternal() {}
protected void startInternal() {
quartzScheduler.start();
}

@Override
public void stop() throws Exception {
super.close();
quartzScheduler.stop();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.gateway.rest.handler.materializedtable;

import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler;
import org.apache.flink.table.gateway.rest.message.materializedtable.MaterializedTableIdentifierParameter;
import org.apache.flink.table.gateway.rest.message.materializedtable.RefreshMaterializedTableParameters;
import org.apache.flink.table.gateway.rest.message.materializedtable.RefreshMaterializedTableRequestBody;
import org.apache.flink.table.gateway.rest.message.materializedtable.RefreshMaterializedTableResponseBody;
import org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Handler to execute materialized table refresh operation. */
public class RefreshMaterializedTableHandler
extends AbstractSqlGatewayRestHandler<
RefreshMaterializedTableRequestBody,
RefreshMaterializedTableResponseBody,
RefreshMaterializedTableParameters> {

public RefreshMaterializedTableHandler(
SqlGatewayService service,
Map<String, String> responseHeaders,
MessageHeaders<
RefreshMaterializedTableRequestBody,
RefreshMaterializedTableResponseBody,
RefreshMaterializedTableParameters>
messageHeaders) {
super(service, responseHeaders, messageHeaders);
}

@Override
protected CompletableFuture<RefreshMaterializedTableResponseBody> handleRequest(
@Nullable SqlGatewayRestAPIVersion version,
@Nonnull HandlerRequest<RefreshMaterializedTableRequestBody> request)
throws RestHandlerException {
SessionHandle sessionHandle = request.getPathParameter(SessionHandleIdPathParameter.class);
String materializedTableIdentifier =
request.getPathParameter(MaterializedTableIdentifierParameter.class);
boolean isPeriodic = request.getRequestBody().isPeriodic();
String scheduleTime = request.getRequestBody().getScheduleTime();
Map<String, String> dynamicOptions = request.getRequestBody().getDynamicOptions();
Map<String, String> staticPartitions = request.getRequestBody().getStaticPartitions();
Map<String, String> executionConfig = request.getRequestBody().getExecutionConfig();
OperationHandle operationHandle =
service.refreshMaterializedTable(
sessionHandle,
materializedTableIdentifier,
isPeriodic,
scheduleTime,
dynamicOptions == null ? Collections.emptyMap() : dynamicOptions,
staticPartitions == null ? Collections.emptyMap() : staticPartitions,
executionConfig == null ? Collections.emptyMap() : executionConfig);

return CompletableFuture.completedFuture(
new RefreshMaterializedTableResponseBody(
operationHandle.getIdentifier().toString()));
}
}
Loading

0 comments on commit 3adc16c

Please sign in to comment.