diff --git a/hetu-sql-migration-tool/src/main/java/io/hetu/core/sql/migration/tool/Console.java b/hetu-sql-migration-tool/src/main/java/io/hetu/core/sql/migration/tool/Console.java index bd7423f3f..4fb423c34 100644 --- a/hetu-sql-migration-tool/src/main/java/io/hetu/core/sql/migration/tool/Console.java +++ b/hetu-sql-migration-tool/src/main/java/io/hetu/core/sql/migration/tool/Console.java @@ -87,6 +87,7 @@ public boolean run() boolean isFromFile = !isNullOrEmpty(cliOptions.sqlFile); String query = cliOptions.execute; + if (hasQuery) { query += ";"; } diff --git a/presto-cli/src/main/java/io/prestosql/cli/ClientOptions.java b/presto-cli/src/main/java/io/prestosql/cli/ClientOptions.java index d3297e327..4fadf765d 100644 --- a/presto-cli/src/main/java/io/prestosql/cli/ClientOptions.java +++ b/presto-cli/src/main/java/io/prestosql/cli/ClientOptions.java @@ -155,6 +155,9 @@ public class ClientOptions @Option(name = "--max-batch-process-size", title = "Maximum Batch Process Size (Rows)", description = "Maximum Batch Process Size as the number of Rows which can be processed") public String maxBatchProcessSize = "50000000"; + @Option(name = {"-b", "--batch"}, title = "batch query", description = "Execute batch query") + public String batchQuery; + public enum OutputFormat { ALIGNED, diff --git a/presto-cli/src/main/java/io/prestosql/cli/Console.java b/presto-cli/src/main/java/io/prestosql/cli/Console.java index a749d9454..5bd11e52b 100644 --- a/presto-cli/src/main/java/io/prestosql/cli/Console.java +++ b/presto-cli/src/main/java/io/prestosql/cli/Console.java @@ -93,6 +93,7 @@ public boolean run() boolean hasQuery = !isNullOrEmpty(clientOptions.execute); boolean isFromFile = !isNullOrEmpty(clientOptions.file); initializeLogging(clientOptions.logLevelsFile); + boolean hasBatchQuery = !isNullOrEmpty(clientOptions.batchQuery); String query = clientOptions.execute; if (hasQuery) { @@ -103,6 +104,9 @@ public boolean run() if (hasQuery) { throw new RuntimeException("both --execute and --file specified"); } + if (hasBatchQuery) { + throw new RuntimeException("both --batch and --file specified"); + } try { query = asCharSource(new File(clientOptions.file), UTF_8).read(); hasQuery = true; @@ -112,6 +116,14 @@ public boolean run() } } + if (hasBatchQuery) { + if (hasQuery) { + throw new RuntimeException("both --execute and --batch specified"); + } + query = clientOptions.batchQuery; + hasQuery = true; + } + // abort any running query if the CLI is terminated AtomicBoolean exiting = new AtomicBoolean(); ThreadInterruptor interruptor = new ThreadInterruptor(); @@ -140,7 +152,8 @@ public boolean run() Optional.ofNullable(clientOptions.krb5ConfigPath), Optional.ofNullable(clientOptions.krb5KeytabPath), Optional.ofNullable(clientOptions.krb5CredentialCachePath), - !clientOptions.krb5DisableRemoteServiceHostnameCanonicalization)) { + !clientOptions.krb5DisableRemoteServiceHostnameCanonicalization, + hasBatchQuery)) { if (hasQuery) { return executeCommand( queryRunner, diff --git a/presto-cli/src/main/java/io/prestosql/cli/QueryRunner.java b/presto-cli/src/main/java/io/prestosql/cli/QueryRunner.java index ebaef22b2..fd8952950 100644 --- a/presto-cli/src/main/java/io/prestosql/cli/QueryRunner.java +++ b/presto-cli/src/main/java/io/prestosql/cli/QueryRunner.java @@ -44,6 +44,7 @@ public class QueryRunner { private final AtomicReference session; private final boolean debug; + private final boolean isBatchQuery; private final OkHttpClient httpClient; private final Consumer sslSetup; private CubeConsole cubeConsole; @@ -67,10 +68,12 @@ public QueryRunner( Optional kerberosConfigPath, Optional kerberosKeytabPath, Optional kerberosCredentialCachePath, - boolean kerberosUseCanonicalHostname) + boolean kerberosUseCanonicalHostname, + boolean isBatchQuery) { this.session = new AtomicReference<>(requireNonNull(session, "session is null")); this.debug = debug; + this.isBatchQuery = isBatchQuery; this.sslSetup = builder -> setupSsl(builder, keystorePath, keystorePassword, truststorePath, truststorePassword); @@ -153,7 +156,7 @@ private StatementClient startInternalQuery(ClientSession session, String query) sslSetup.accept(builder); OkHttpClient client = builder.build(); - return newStatementClient(client, session, query); + return newStatementClient(client, session, query, isBatchQuery); } @Override diff --git a/presto-cli/src/test/java/io/prestosql/cli/TestQueryRunner.java b/presto-cli/src/test/java/io/prestosql/cli/TestQueryRunner.java index d9dd59edc..984087088 100644 --- a/presto-cli/src/test/java/io/prestosql/cli/TestQueryRunner.java +++ b/presto-cli/src/test/java/io/prestosql/cli/TestQueryRunner.java @@ -161,6 +161,7 @@ static QueryRunner createQueryRunner(ClientSession clientSession) Optional.empty(), Optional.empty(), Optional.empty(), + false, false); } diff --git a/presto-client/src/main/java/io/prestosql/client/PrestoHeaders.java b/presto-client/src/main/java/io/prestosql/client/PrestoHeaders.java index 32e5c815b..40d096473 100644 --- a/presto-client/src/main/java/io/prestosql/client/PrestoHeaders.java +++ b/presto-client/src/main/java/io/prestosql/client/PrestoHeaders.java @@ -50,6 +50,7 @@ public final class PrestoHeaders public static final String PRESTO_PAGE_TOKEN = "X-Presto-Page-Sequence-Id"; public static final String PRESTO_PAGE_NEXT_TOKEN = "X-Presto-Page-End-Sequence-Id"; public static final String PRESTO_BUFFER_COMPLETE = "X-Presto-Buffer-Complete"; + public static final String PRESTO_BATCH_QUERY = "X-Presto-Is_Batch_Query"; private PrestoHeaders() {} } diff --git a/presto-client/src/main/java/io/prestosql/client/StatementClientFactory.java b/presto-client/src/main/java/io/prestosql/client/StatementClientFactory.java index 532dd280d..a65b34db6 100644 --- a/presto-client/src/main/java/io/prestosql/client/StatementClientFactory.java +++ b/presto-client/src/main/java/io/prestosql/client/StatementClientFactory.java @@ -21,6 +21,11 @@ private StatementClientFactory() {} public static StatementClient newStatementClient(OkHttpClient httpClient, ClientSession session, String query) { - return new StatementClientV1(httpClient, session, query); + return new StatementClientV1(httpClient, session, query, false); + } + + public static StatementClient newStatementClient(OkHttpClient httpClient, ClientSession session, String query, boolean isBatchQuery) + { + return new StatementClientV1(httpClient, session, query, isBatchQuery); } } diff --git a/presto-client/src/main/java/io/prestosql/client/StatementClientV1.java b/presto-client/src/main/java/io/prestosql/client/StatementClientV1.java index f13c540cd..50cc3d76c 100644 --- a/presto-client/src/main/java/io/prestosql/client/StatementClientV1.java +++ b/presto-client/src/main/java/io/prestosql/client/StatementClientV1.java @@ -61,6 +61,7 @@ import static io.prestosql.client.HttpSecurityHeadersConstants.HTTP_SECURITY_XPCDP_VALUE; import static io.prestosql.client.HttpSecurityHeadersConstants.HTTP_SECURITY_XXP; import static io.prestosql.client.HttpSecurityHeadersConstants.HTTP_SECURITY_XXP_VALUE; +import static io.prestosql.client.PrestoHeaders.PRESTO_BATCH_QUERY; import static java.lang.String.format; import static java.net.HttpURLConnection.HTTP_OK; import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; @@ -97,10 +98,10 @@ class StatementClientV1 private final String user; private final String clientCapabilities; private final boolean timeInMilliseconds; - + private boolean isBatchQuery; private final AtomicReference state = new AtomicReference<>(State.RUNNING); - public StatementClientV1(OkHttpClient httpClient, ClientSession session, String query) + public StatementClientV1(OkHttpClient httpClient, ClientSession session, String query, boolean isBatchQuery) { requireNonNull(httpClient, "httpClient is null"); requireNonNull(session, "session is null"); @@ -113,6 +114,7 @@ public StatementClientV1(OkHttpClient httpClient, ClientSession session, String this.user = session.getUser(); this.clientCapabilities = Joiner.on(",").join(ClientCapabilities.values()); this.timeInMilliseconds = session.isTimeInMilliseconds(); + this.isBatchQuery = isBatchQuery; Request request = buildQueryRequest(session, query); @@ -233,6 +235,9 @@ private Request buildQueryRequest(ClientSession session, String query) else { builder.addHeader(HTTP_SECURITY_XXP, HTTP_SECURITY_XXP_VALUE); } + if (isBatchQuery) { + builder.addHeader(PRESTO_BATCH_QUERY, "1"); + } return builder.build(); } diff --git a/presto-main/src/main/java/io/prestosql/dispatcher/DispatchManager.java b/presto-main/src/main/java/io/prestosql/dispatcher/DispatchManager.java index b5985066d..3216d51d0 100644 --- a/presto-main/src/main/java/io/prestosql/dispatcher/DispatchManager.java +++ b/presto-main/src/main/java/io/prestosql/dispatcher/DispatchManager.java @@ -41,6 +41,7 @@ import io.prestosql.spi.resourcegroups.SelectionContext; import io.prestosql.spi.resourcegroups.SelectionCriteria; import io.prestosql.spi.service.PropertyService; +import io.prestosql.sql.tree.Statement; import io.prestosql.statestore.SharedQueryState; import io.prestosql.statestore.StateCacheStore; import io.prestosql.statestore.StateFetcher; @@ -57,10 +58,12 @@ import javax.inject.Inject; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.StringTokenizer; import java.util.concurrent.Executor; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -170,7 +173,7 @@ public QueryId createQueryId() return queryIdGenerator.createNextQueryId(); } - public ListenableFuture createQuery(QueryId queryId, String slug, SessionContext sessionContext, String query) + public ListenableFuture createQuery(QueryId queryId, String slug, SessionContext sessionContext, String query, boolean isBatchQuery) { requireNonNull(queryId, "queryId is null"); requireNonNull(sessionContext, "sessionFactory is null"); @@ -181,7 +184,12 @@ public ListenableFuture createQuery(QueryId queryId, String slug, SessionCont DispatchQueryCreationFuture queryCreationFuture = new DispatchQueryCreationFuture(); queryExecutor.execute(() -> { try { - createQueryInternal(queryId, slug, sessionContext, query, resourceGroupManager); + if (!isBatchQuery) { + createQueryInternal(queryId, slug, sessionContext, query, resourceGroupManager); + } + else { + createBatchQueryInternal(queryId, slug, sessionContext, query, resourceGroupManager); + } } finally { queryCreationFuture.set(null); @@ -279,6 +287,107 @@ private void createQueryInternal(QueryId queryId, String slug, SessionContex } } + private void createBatchQueryInternal(QueryId queryId, String slug, SessionContext sessionContext, String inputQuery, ResourceGroupManager resourceGroupManager) + { + String query = inputQuery; + Session session = null; + DispatchQuery dispatchQuery = null; + List queryList = new ArrayList<>(); + List preparedQueryList = new ArrayList<>(); + boolean isTransactionControlStatement = false; + + try { + if (query.length() > maxQueryLength) { + int queryLength = query.length(); + query = query.substring(0, maxQueryLength); + throw new PrestoException(QUERY_TEXT_TOO_LARGE, format("Query text length (%s) exceeds the maximum length (%s)", queryLength, maxQueryLength)); + } + + // decode session + session = sessionSupplier.createSession(queryId, sessionContext); + StringTokenizer tokenizer = new StringTokenizer(query, ";"); + while (tokenizer.hasMoreTokens()) { + String curQuery = tokenizer.nextToken(); + queryList.add(curQuery); + // prepare query + preparedQueryList.add(queryPreparer.prepareQuery(session, curQuery)); + } + + // select resource group + SelectionContext selectionContext = resourceGroupManager.selectGroup(new SelectionCriteria( + sessionContext.getIdentity().getPrincipal().isPresent(), + sessionContext.getIdentity().getUser(), + Optional.ofNullable(sessionContext.getSource()), + sessionContext.getClientTags(), + sessionContext.getResourceEstimates(), + Optional.empty())); + + // apply system default session properties (does not override user set properties) + session = sessionPropertyDefaults.newSessionWithDefaultProperties(session, Optional.empty(), selectionContext.getResourceGroupId()); + + // Check if any query is transaction control statement + for (PreparedQuery preparedQuery : preparedQueryList) { + Statement statement = preparedQuery.getStatement(); + isTransactionControlStatement = isTransactionControlStatement(statement); + if (isTransactionControlStatement) { + break; + } + } + // mark existing transaction as active + transactionManager.activateTransaction(session, isTransactionControlStatement, accessControl); + + dispatchQuery = dispatchQueryFactory.createDispatchQuery( + session, + queryList, + preparedQueryList, + slug, + selectionContext.getResourceGroupId(), + resourceGroupManager, + isTransactionControlStatement); + + boolean queryAdded = queryCreated(dispatchQuery); + if (queryAdded && !dispatchQuery.isDone()) { + try { + resourceGroupManager.submit(dispatchQuery, selectionContext, queryExecutor); + + if (PropertyService.getBooleanProperty(HetuConstant.MULTI_COORDINATOR_ENABLED) && stateUpdater != null) { + stateUpdater.registerQuery(StateStoreConstants.QUERY_STATE_COLLECTION_NAME, dispatchQuery); + } + + if (LOG.isDebugEnabled()) { + long now = System.currentTimeMillis(); + LOG.debug("query:%s submission started at %s, ended at %s, total time use: %sms", + dispatchQuery.getQueryId(), + new SimpleDateFormat("HH:mm:ss:SSS").format(dispatchQuery.getCreateTime().toDate()), + new SimpleDateFormat("HH:mm:ss:SSS").format(new Date(now)), + now - dispatchQuery.getCreateTime().getMillis()); + } + } + catch (Throwable e) { + // dispatch query has already been registered, so just fail it directly + dispatchQuery.fail(e); + } + } + } + catch (Throwable throwable) { + // creation must never fail, so register a failed query in this case + if (dispatchQuery == null) { + if (session == null) { + session = Session.builder(new SessionPropertyManager()) + .setQueryId(queryId) + .setIdentity(sessionContext.getIdentity()) + .setSource(sessionContext.getSource()) + .build(); + } + DispatchQuery failedDispatchQuery = failedDispatchQueryFactory.createFailedDispatchQuery(session, query, Optional.empty(), throwable); + queryCreated(failedDispatchQuery); + } + else { + dispatchQuery.fail(throwable); + } + } + } + private boolean queryCreated(DispatchQuery dispatchQuery) { boolean queryAdded = queryTracker.addQuery(dispatchQuery); diff --git a/presto-main/src/main/java/io/prestosql/dispatcher/DispatchQueryFactory.java b/presto-main/src/main/java/io/prestosql/dispatcher/DispatchQueryFactory.java index 15c7cbc03..185d1619a 100644 --- a/presto-main/src/main/java/io/prestosql/dispatcher/DispatchQueryFactory.java +++ b/presto-main/src/main/java/io/prestosql/dispatcher/DispatchQueryFactory.java @@ -18,6 +18,8 @@ import io.prestosql.execution.resourcegroups.ResourceGroupManager; import io.prestosql.spi.resourcegroups.ResourceGroupId; +import java.util.List; + public interface DispatchQueryFactory { DispatchQuery createDispatchQuery( @@ -27,4 +29,13 @@ DispatchQuery createDispatchQuery( String slug, ResourceGroupId resourceGroup, ResourceGroupManager resourceGroupManager); + + DispatchQuery createDispatchQuery( + Session session, + List queryList, + List preparedQueryList, + String slug, + ResourceGroupId resourceGroup, + ResourceGroupManager resourceGroupManager, + boolean isTransactionControlStatement); } diff --git a/presto-main/src/main/java/io/prestosql/dispatcher/LocalDispatchQueryFactory.java b/presto-main/src/main/java/io/prestosql/dispatcher/LocalDispatchQueryFactory.java index 2a69d52f4..e7adeb03b 100644 --- a/presto-main/src/main/java/io/prestosql/dispatcher/LocalDispatchQueryFactory.java +++ b/presto-main/src/main/java/io/prestosql/dispatcher/LocalDispatchQueryFactory.java @@ -31,11 +31,14 @@ import io.prestosql.security.AccessControl; import io.prestosql.spi.PrestoException; import io.prestosql.spi.resourcegroups.ResourceGroupId; +import io.prestosql.sql.tree.Query; import io.prestosql.sql.tree.Statement; import io.prestosql.transaction.TransactionManager; import javax.inject.Inject; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Optional; @@ -119,7 +122,58 @@ public DispatchQuery createDispatchQuery( throw new PrestoException(NOT_SUPPORTED, "Unsupported statement type: " + preparedQuery.getStatement().getClass().getSimpleName()); } - QueryExecution queryExecution = queryExecutionFactory.createQueryExecution(preparedQuery, stateMachine, slug, warningCollector); + QueryExecution queryExecution = queryExecutionFactory.createQueryExecution(preparedQuery, null, stateMachine, slug, warningCollector); + stateMachine.endSyntaxAnalysis(); + return queryExecution; + }); + + return new LocalDispatchQuery( + stateMachine, + queryExecutionFuture, + clusterSizeMonitor, + executor, + queryManager::createQuery); + } + + @Override + public DispatchQuery createDispatchQuery( + Session session, + List queryList, + List preparedQueryList, + String slug, ResourceGroupId resourceGroup, + ResourceGroupManager resourceGroupManager, + boolean isTransactionControlStatement) + { + WarningCollector warningCollector = warningCollectorFactory.create(); + List> prepareSqlList = new ArrayList<>(); + for (PreparedQuery preparedQuery : preparedQueryList) { + prepareSqlList.add(preparedQuery.getPrepareSql()); + } + QueryStateMachine stateMachine = QueryStateMachine.begin( + queryList, + prepareSqlList, + session, + locationFactory.createQueryLocation(session.getQueryId()), + resourceGroup, + resourceGroupManager, + isTransactionControlStatement, + transactionManager, + accessControl, + executor, + metadata, + warningCollector); + + queryMonitor.queryCreatedEvent(stateMachine.getBasicQueryInfo(Optional.empty())); + + ListenableFuture queryExecutionFuture = executor.submit(() -> { + stateMachine.beginSyntaxAnalysis(); + // Temporarily to get SqlQueryExecutionFactory + QueryExecutionFactory queryExecutionFactory = executionFactories.get(Query.class); + if (queryExecutionFactory == null) { + throw new PrestoException(NOT_SUPPORTED, "Unsupported statement type: " + Query.class.getSimpleName()); + } + + QueryExecution queryExecution = queryExecutionFactory.createQueryExecution(null, preparedQueryList, stateMachine, slug, warningCollector); stateMachine.endSyntaxAnalysis(); return queryExecution; }); diff --git a/presto-main/src/main/java/io/prestosql/dispatcher/QueuedStatementResource.java b/presto-main/src/main/java/io/prestosql/dispatcher/QueuedStatementResource.java index 48d773ea1..2838bad34 100644 --- a/presto-main/src/main/java/io/prestosql/dispatcher/QueuedStatementResource.java +++ b/presto-main/src/main/java/io/prestosql/dispatcher/QueuedStatementResource.java @@ -71,6 +71,7 @@ import static io.airlift.concurrent.Threads.threadsNamed; import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom; import static io.airlift.jaxrs.AsyncResponseHandler.bindAsyncResponse; +import static io.prestosql.client.PrestoHeaders.PRESTO_BATCH_QUERY; import static io.prestosql.execution.QueryState.FAILED; import static io.prestosql.execution.QueryState.QUEUED; import static io.prestosql.metadata.NodeState.ACTIVE; @@ -169,7 +170,7 @@ public Response postStatement( } SessionContext sessionContext = new HttpRequestSessionContext(servletRequest, groupProvider); - Query query = new Query(statement, sessionContext, dispatchManager); + Query query = new Query(statement, sessionContext, dispatchManager, "1".equals(servletRequest.getHeader(PRESTO_BATCH_QUERY))); queries.put(query.getQueryId(), query); return Response.ok(query.getQueryResults(query.getLastToken(), uriInfo, xForwardedProto)).build(); @@ -304,16 +305,18 @@ private static final class Query private final QueryId queryId; private final String slug = "x" + randomUUID().toString().toLowerCase(ENGLISH).replace("-", ""); private final AtomicLong lastToken = new AtomicLong(); + private final boolean isBatchQuery; @GuardedBy("this") private ListenableFuture querySubmissionFuture; - public Query(String query, SessionContext sessionContext, DispatchManager dispatchManager) + public Query(String query, SessionContext sessionContext, DispatchManager dispatchManager, boolean isBatchQuery) { this.query = requireNonNull(query, "query is null"); this.sessionContext = requireNonNull(sessionContext, "sessionContext is null"); this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null"); this.queryId = dispatchManager.createQueryId(); + this.isBatchQuery = isBatchQuery; } public QueryId getQueryId() @@ -341,7 +344,7 @@ private ListenableFuture waitForDispatched() // if query query submission has not finished, wait for it to finish synchronized (this) { if (querySubmissionFuture == null) { - querySubmissionFuture = dispatchManager.createQuery(queryId, slug, sessionContext, query); + querySubmissionFuture = dispatchManager.createQuery(queryId, slug, sessionContext, query, isBatchQuery); } if (!querySubmissionFuture.isDone()) { return querySubmissionFuture; diff --git a/presto-main/src/main/java/io/prestosql/execution/DataDefinitionExecution.java b/presto-main/src/main/java/io/prestosql/execution/DataDefinitionExecution.java index fb26f6dd3..03ba220b7 100644 --- a/presto-main/src/main/java/io/prestosql/execution/DataDefinitionExecution.java +++ b/presto-main/src/main/java/io/prestosql/execution/DataDefinitionExecution.java @@ -306,6 +306,7 @@ public DataDefinitionExecutionFactory( @Override public DataDefinitionExecution createQueryExecution( PreparedQuery preparedQuery, + List preparedQueryList, QueryStateMachine stateMachine, String slug, WarningCollector warningCollector) diff --git a/presto-main/src/main/java/io/prestosql/execution/QueryExecution.java b/presto-main/src/main/java/io/prestosql/execution/QueryExecution.java index 55d44447b..acc181950 100644 --- a/presto-main/src/main/java/io/prestosql/execution/QueryExecution.java +++ b/presto-main/src/main/java/io/prestosql/execution/QueryExecution.java @@ -96,7 +96,7 @@ default void resumeQuery() interface QueryExecutionFactory { - T createQueryExecution(PreparedQuery preparedQuery, QueryStateMachine stateMachine, String slug, WarningCollector warningCollector); + T createQueryExecution(PreparedQuery preparedQuery, List preparedQueryList, QueryStateMachine stateMachine, String slug, WarningCollector warningCollector); } /** diff --git a/presto-main/src/main/java/io/prestosql/execution/QueryStateMachine.java b/presto-main/src/main/java/io/prestosql/execution/QueryStateMachine.java index 159c21e2b..63321ea62 100644 --- a/presto-main/src/main/java/io/prestosql/execution/QueryStateMachine.java +++ b/presto-main/src/main/java/io/prestosql/execution/QueryStateMachine.java @@ -105,7 +105,9 @@ public class QueryStateMachine private final QueryId queryId; private final String query; + private final List queryList; private final Optional preparedQuery; + private final List> preparedQueryList; private final Session session; private final URI self; private final ResourceGroupId resourceGroup; @@ -153,6 +155,8 @@ public class QueryStateMachine private final AtomicReference updateType = new AtomicReference<>(); + private final List updateTypeList = new ArrayList<>(); + private final AtomicReference failureCause = new AtomicReference<>(); private final AtomicReference> inputs = new AtomicReference<>(ImmutableSet.of()); @@ -177,7 +181,9 @@ private QueryStateMachine( WarningCollector warningCollector) { this.query = requireNonNull(query, "query is null"); + this.queryList = null; this.preparedQuery = requireNonNull(preparedQuery, "preparedQuery is null"); + this.preparedQueryList = null; this.session = requireNonNull(session, "session is null"); this.queryId = session.getQueryId(); this.self = requireNonNull(self, "self is null"); @@ -195,6 +201,40 @@ private QueryStateMachine( this.warningCollector = requireNonNull(warningCollector, "warningCollector is null"); } + private QueryStateMachine( + List queryList, + List> preparedQueryList, + Session session, + URI self, + ResourceGroupId resourceGroup, + ResourceGroupManager resourceGroupManager, + TransactionManager transactionManager, + Executor executor, + Ticker ticker, + Metadata metadata, + WarningCollector warningCollector) + { + this.query = null; + this.queryList = requireNonNull(queryList, "queryList is null"); + this.preparedQuery = null; + this.preparedQueryList = requireNonNull(preparedQueryList, "preparedQuery is null"); + this.session = requireNonNull(session, "session is null"); + this.queryId = session.getQueryId(); + this.self = requireNonNull(self, "self is null"); + this.resourceGroup = requireNonNull(resourceGroup, "resourceGroup is null"); + this.resourceGroupManager = resourceGroupManager; + this.throttlingEnabled = resourceGroupManager.isGroupRegistered(resourceGroup) + && resourceGroupManager.getSoftReservedMemory(resourceGroup) != Long.MAX_VALUE; + this.transactionManager = requireNonNull(transactionManager, "transactionManager is null"); + this.queryStateTimer = new QueryStateTimer(ticker); + this.metadata = requireNonNull(metadata, "metadata is null"); + + this.queryState = new StateMachine<>("query " + queryId, executor, QUEUED, TERMINAL_QUERY_STATES); + this.finalQueryInfo = new StateMachine<>("finalQueryInfo-" + queryId, executor, Optional.empty()); + this.outputManager = new QueryOutputManager(executor); + this.warningCollector = requireNonNull(warningCollector, "warningCollector is null"); + } + /** * Created QueryStateMachines must be transitioned to terminal states to clean up resources. */ @@ -228,6 +268,39 @@ public static QueryStateMachine begin( warningCollector); } + /** + * Created QueryStateMachines must be transitioned to terminal states to clean up resources. + */ + public static QueryStateMachine begin( + List queryList, + List> preparedQueryList, + Session session, + URI self, + ResourceGroupId resourceGroup, + ResourceGroupManager resourceGroupManager, + boolean transactionControl, + TransactionManager transactionManager, + AccessControl accessControl, + Executor executor, + Metadata metadata, + WarningCollector warningCollector) + { + return beginWithTicker( + queryList, + preparedQueryList, + session, + self, + resourceGroup, + resourceGroupManager, + transactionControl, + transactionManager, + accessControl, + executor, + Ticker.systemTicker(), + metadata, + warningCollector); + } + static QueryStateMachine beginWithTicker( String query, Optional preparedQuery, @@ -268,6 +341,46 @@ static QueryStateMachine beginWithTicker( return queryStateMachine; } + static QueryStateMachine beginWithTicker( + List queryList, + List> preparedQueryList, + Session inputSession, + URI self, + ResourceGroupId resourceGroup, + ResourceGroupManager resourceGroupManager, + boolean transactionControl, + TransactionManager transactionManager, + AccessControl accessControl, + Executor executor, + Ticker ticker, + Metadata metadata, + WarningCollector warningCollector) + { + Session localSession = inputSession; + // If there is not an existing transaction, begin an auto commit transaction + if (!localSession.getTransactionId().isPresent() && !transactionControl) { + // TODO: make autocommit isolation level a session parameter + TransactionId transactionId = transactionManager.beginTransaction(true); + localSession = localSession.beginTransactionId(transactionId, transactionManager, accessControl); + } + + QueryStateMachine queryStateMachine = new QueryStateMachine( + queryList, + preparedQueryList, + localSession, + self, + resourceGroup, + resourceGroupManager, + transactionManager, + executor, + ticker, + metadata, + warningCollector); + queryStateMachine.addStateChangeListener(newState -> QUERY_STATE_LOG.debug("Query %s is %s", queryStateMachine.getQueryId(), newState)); + + return queryStateMachine; + } + public QueryId getQueryId() { return queryId; @@ -400,8 +513,8 @@ public BasicQueryInfo getBasicQueryInfo(Optional rootStage) memoryPool.get().getId(), stageStats.isScheduled(), self, - query, - preparedQuery, + query != null ? query : queryList.get(0), + preparedQuery != null ? preparedQuery : preparedQueryList.get(0), queryStats, errorCode == null ? null : errorCode.getType(), errorCode); @@ -436,8 +549,8 @@ QueryInfo getQueryInfo(Optional rootStage) isScheduled, self, outputManager.getQueryOutputInfo().map(QueryOutputInfo::getColumnNames).orElse(ImmutableList.of()), - query, - preparedQuery, + query != null ? query : queryList.get(0), + preparedQuery != null ? preparedQuery : preparedQueryList.get(0), getQueryStats(rootStage), Optional.ofNullable(setCatalog.get()), Optional.ofNullable(setSchema.get()), @@ -449,7 +562,7 @@ QueryInfo getQueryInfo(Optional rootStage) deallocatedPreparedStatements, Optional.ofNullable(startedTransactionId.get()), clearTransactionId.get(), - updateType.get(), + updateType.get() != null ? updateType.get() : updateTypeList.get(0), rootStage, localFailureCause, errorCode, @@ -754,6 +867,11 @@ public void setUpdateType(String updateType) this.updateType.set(updateType); } + public void addUpdateType(String updateType) + { + this.updateTypeList.add(updateType); + } + public QueryState getQueryState() { return queryState.get(); diff --git a/presto-main/src/main/java/io/prestosql/execution/SqlQueryExecution.java b/presto-main/src/main/java/io/prestosql/execution/SqlQueryExecution.java index 89203be39..02a28ec20 100644 --- a/presto-main/src/main/java/io/prestosql/execution/SqlQueryExecution.java +++ b/presto-main/src/main/java/io/prestosql/execution/SqlQueryExecution.java @@ -164,6 +164,7 @@ public class SqlQueryExecution private final ExecutionPolicy executionPolicy; private final SplitSchedulerStats schedulerStats; private final Analysis analysis; + private final List analysisList; private final StatsCalculator statsCalculator; private final CostCalculator costCalculator; private final DynamicFilterService dynamicFilterService; @@ -176,6 +177,7 @@ public class SqlQueryExecution public SqlQueryExecution( PreparedQuery preparedQuery, + List preparedQueryList, QueryStateMachine stateMachine, String slug, Metadata metadata, @@ -243,28 +245,64 @@ public SqlQueryExecution( } }); - // analyze query - requireNonNull(preparedQuery, "preparedQuery is null"); - Analyzer analyzer = new Analyzer( - stateMachine.getSession(), - metadata, - sqlParser, - accessControl, - Optional.of(queryExplainer), - preparedQuery.getParameters(), - warningCollector, - heuristicIndexerManager, - cubeManager); - this.analysis = analyzer.analyze(preparedQuery.getStatement()); - - stateMachine.setUpdateType(analysis.getUpdateType()); + if (preparedQueryList == null) { + // analyze query + requireNonNull(preparedQuery, "preparedQuery is null"); + Analyzer analyzer = new Analyzer( + stateMachine.getSession(), + metadata, + sqlParser, + accessControl, + Optional.of(queryExplainer), + preparedQuery.getParameters(), + warningCollector, + heuristicIndexerManager, + cubeManager); + this.analysis = analyzer.analyze(preparedQuery.getStatement()); + this.analysisList = null; + + stateMachine.setUpdateType(analysis.getUpdateType()); + } + else { // For batch query + this.analysisList = new ArrayList<>(); + for (PreparedQuery curPreparedQuery : preparedQueryList) { + requireNonNull(curPreparedQuery, "curPreparedQuery is null"); + Analyzer analyzer = new Analyzer( + stateMachine.getSession(), + metadata, + sqlParser, + accessControl, + Optional.of(queryExplainer), + curPreparedQuery.getParameters(), + warningCollector, + heuristicIndexerManager, + cubeManager); + Analysis curAnlaysis = analyzer.analyze(curPreparedQuery.getStatement()); + this.analysisList.add(curAnlaysis); + stateMachine.addUpdateType(curAnlaysis.getUpdateType()); + } + this.analysis = null; + } // when the query finishes cache the final query info, and clear the reference to the output stage AtomicReference localQueryScheduler = this.queryScheduler; stateMachine.addStateChangeListener(state -> { - //Set the AsyncRunning flag if query is capable of running async - if (analysis.isAsyncQuery() && state == QueryState.RUNNING) { - stateMachine.setRunningAsync(true); + if (state == QueryState.RUNNING) { + if (analysis != null) { + if (analysis.isAsyncQuery()) { + stateMachine.setRunningAsync(true); + } + } + else { // For batch query + boolean asyncQueries = false; + for (Analysis curAnalysis : analysisList) { + asyncQueries = curAnalysis.isAsyncQuery(); + if (!asyncQueries) { + break; + } + } + stateMachine.setRunningAsync(asyncQueries); + } } if (!state.isDone()) { @@ -648,9 +686,15 @@ private PlanRoot doAnalyzeQuery() stateMachine.beginAnalysis(); stateMachine.beginLogicalPlan(); + Analysis curAnalysis = analysis; + //TODO temporary till bqo is up + if (analysisList != null && analysisList.get(0) != null) { + curAnalysis = analysisList.get(0); + } + // plan query PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator(); - Plan plan = createPlan(analysis, stateMachine.getSession(), planOptimizers, idAllocator, metadata, new TypeAnalyzer(sqlParser, metadata), statsCalculator, costCalculator, stateMachine.getWarningCollector()); + Plan plan = createPlan(curAnalysis, stateMachine.getSession(), planOptimizers, idAllocator, metadata, new TypeAnalyzer(sqlParser, metadata), statsCalculator, costCalculator, stateMachine.getWarningCollector()); queryPlan.set(plan); // extract inputs @@ -658,7 +702,7 @@ private PlanRoot doAnalyzeQuery() stateMachine.setInputs(inputs); // extract output - stateMachine.setOutput(analysis.getTarget()); + stateMachine.setOutput(curAnalysis.getTarget()); stateMachine.endLogicalPlan(); // fragment the plan @@ -667,13 +711,13 @@ private PlanRoot doAnalyzeQuery() // record analysis time stateMachine.endAnalysis(); - boolean explainAnalyze = analysis.getStatement() instanceof Explain && ((Explain) analysis.getStatement()).isAnalyze(); + boolean explainAnalyze = curAnalysis.getStatement() instanceof Explain && ((Explain) curAnalysis.getStatement()).isAnalyze(); if (SystemSessionProperties.isRecoveryEnabled(getSession())) { checkRecoverySupport(getSession()); } - return new PlanRoot(fragmentedPlan, !explainAnalyze, extractConnectors(analysis)); + return new PlanRoot(fragmentedPlan, !explainAnalyze, extractConnectors(curAnalysis)); } // This method was introduced separate logical planning from query analyzing stage @@ -1160,6 +1204,7 @@ private void loadConfigToService(HetuConfig hetuConfig) @Override public QueryExecution createQueryExecution( PreparedQuery preparedQuery, + List preparedQueryList, QueryStateMachine stateMachine, String slug, WarningCollector warningCollector) @@ -1170,6 +1215,7 @@ public QueryExecution createQueryExecution( return new CachedSqlQueryExecution( preparedQuery, + preparedQueryList, stateMachine, slug, metadata, diff --git a/presto-main/src/main/java/io/prestosql/query/CachedSqlQueryExecution.java b/presto-main/src/main/java/io/prestosql/query/CachedSqlQueryExecution.java index 63b716cf7..488fae319 100644 --- a/presto-main/src/main/java/io/prestosql/query/CachedSqlQueryExecution.java +++ b/presto-main/src/main/java/io/prestosql/query/CachedSqlQueryExecution.java @@ -110,7 +110,7 @@ public class CachedSqlQueryExecution private final Optional> cache; // cache key is generated by SqlQueryExecutionCacheKeyGenerator private final BeginTableWrite beginTableWrite; - public CachedSqlQueryExecution(QueryPreparer.PreparedQuery preparedQuery, QueryStateMachine stateMachine, + public CachedSqlQueryExecution(QueryPreparer.PreparedQuery preparedQuery, List preparedQueryList, QueryStateMachine stateMachine, String slug, Metadata metadata, CubeManager cubeManager, AccessControl accessControl, SqlParser sqlParser, SplitManager splitManager, NodePartitioningManager nodePartitioningManager, NodeScheduler nodeScheduler, List planOptimizers, PlanFragmenter planFragmenter, RemoteTaskFactory remoteTaskFactory, @@ -121,7 +121,7 @@ public CachedSqlQueryExecution(QueryPreparer.PreparedQuery preparedQuery, QueryS DynamicFilterService dynamicFilterService, Optional> cache, HeuristicIndexerManager heuristicIndexerManager, StateStoreProvider stateStoreProvider, RecoveryUtils recoveryUtils) { - super(preparedQuery, stateMachine, slug, metadata, cubeManager, accessControl, sqlParser, splitManager, + super(preparedQuery, preparedQueryList, stateMachine, slug, metadata, cubeManager, accessControl, sqlParser, splitManager, nodePartitioningManager, nodeScheduler, planOptimizers, planFragmenter, remoteTaskFactory, locationFactory, scheduleSplitBatchSize, queryExecutor, schedulerExecutor, failureDetector, nodeTaskMap, queryExplainer, executionPolicy, schedulerStats, statsCalculator, costCalculator, warningCollector, dynamicFilterService, heuristicIndexerManager, stateStoreProvider, recoveryUtils); diff --git a/presto-main/src/main/java/io/prestosql/server/protocol/PagePublisherQueryRunner.java b/presto-main/src/main/java/io/prestosql/server/protocol/PagePublisherQueryRunner.java index 5e3cc2abe..394a0cbde 100644 --- a/presto-main/src/main/java/io/prestosql/server/protocol/PagePublisherQueryRunner.java +++ b/presto-main/src/main/java/io/prestosql/server/protocol/PagePublisherQueryRunner.java @@ -232,7 +232,7 @@ private static WebApplicationException badRequest(Response.Status status, String private void waitForDispatched(QueryId queryId, String slug, SessionContext sessionContext, String query) { - ListenableFuture future = this.dispatchManager.createQuery(queryId, slug, sessionContext, query); + ListenableFuture future = this.dispatchManager.createQuery(queryId, slug, sessionContext, query, false); try { future.get(); this.dispatchManager.waitForDispatched(queryId).get(); diff --git a/presto-main/src/main/java/io/prestosql/vacuum/AutoVacuumScanner.java b/presto-main/src/main/java/io/prestosql/vacuum/AutoVacuumScanner.java index b3b52f2cb..8931408a4 100644 --- a/presto-main/src/main/java/io/prestosql/vacuum/AutoVacuumScanner.java +++ b/presto-main/src/main/java/io/prestosql/vacuum/AutoVacuumScanner.java @@ -229,7 +229,7 @@ private ListenableFuture waitForDispatched(QueryId queryId, String slug, Auto ListenableFuture querySubmissionFuture; // if query query submission has not finished, wait for it to finish synchronized (this) { - querySubmissionFuture = dispatchManager.createQuery(queryId, slug, sessionContext, vacuumQuery); + querySubmissionFuture = dispatchManager.createQuery(queryId, slug, sessionContext, vacuumQuery, false); } if (!querySubmissionFuture.isDone()) { diff --git a/presto-tests/src/test/java/io/prestosql/execution/TestQueryRunnerUtil.java b/presto-tests/src/test/java/io/prestosql/execution/TestQueryRunnerUtil.java index e877e6bf9..9dd0d8773 100644 --- a/presto-tests/src/test/java/io/prestosql/execution/TestQueryRunnerUtil.java +++ b/presto-tests/src/test/java/io/prestosql/execution/TestQueryRunnerUtil.java @@ -37,7 +37,7 @@ private TestQueryRunnerUtil() {} public static QueryId createQuery(DistributedQueryRunner queryRunner, Session session, String sql) { DispatchManager dispatchManager = queryRunner.getCoordinator().getDispatchManager(); - getFutureValue(dispatchManager.createQuery(session.getQueryId(), "slug", new TestingSessionContext(session), sql)); + getFutureValue(dispatchManager.createQuery(session.getQueryId(), "slug", new TestingSessionContext(session), sql, false)); return session.getQueryId(); } diff --git a/presto-tests/src/test/java/io/prestosql/tests/TestMetadataManager.java b/presto-tests/src/test/java/io/prestosql/tests/TestMetadataManager.java index 0b7827e79..fd8c3ccdc 100644 --- a/presto-tests/src/test/java/io/prestosql/tests/TestMetadataManager.java +++ b/presto-tests/src/test/java/io/prestosql/tests/TestMetadataManager.java @@ -119,7 +119,7 @@ public void testMetadataIsClearedAfterQueryCanceled() queryId, "slug", new TestingSessionContext(TEST_SESSION), - "SELECT * FROM lineitem") + "SELECT * FROM lineitem", false) .get(); // wait until query starts running diff --git a/presto-tests/src/test/java/io/prestosql/tests/TestQueryManager.java b/presto-tests/src/test/java/io/prestosql/tests/TestQueryManager.java index bbdff40ee..fad486967 100644 --- a/presto-tests/src/test/java/io/prestosql/tests/TestQueryManager.java +++ b/presto-tests/src/test/java/io/prestosql/tests/TestQueryManager.java @@ -66,7 +66,7 @@ public void testFailQuery() queryId, "slug", new TestingSessionContext(TEST_SESSION), - "SELECT * FROM lineitem") + "SELECT * FROM lineitem", false) .get(); // wait until query starts running