diff --git a/src/main/java/com/aliyun/odps/jdbc/OdpsConnection.java b/src/main/java/com/aliyun/odps/jdbc/OdpsConnection.java index f99f384c..9e7291d8 100644 --- a/src/main/java/com/aliyun/odps/jdbc/OdpsConnection.java +++ b/src/main/java/com/aliyun/odps/jdbc/OdpsConnection.java @@ -53,6 +53,7 @@ import com.aliyun.odps.account.AliyunAccount; import com.aliyun.odps.jdbc.utils.ConnectionResource; import com.aliyun.odps.jdbc.utils.Utils; +import com.aliyun.odps.sqa.ExecuteMode; import com.aliyun.odps.sqa.FallbackPolicy; import com.aliyun.odps.sqa.SQLExecutor; import com.aliyun.odps.sqa.SQLExecutorBuilder; @@ -186,8 +187,11 @@ public class OdpsConnection extends WrapperAdapter implements Connection { if (interactiveMode) { long cost = System.currentTimeMillis() - startTime; log.info(String.format("load project meta infos time cost=%d", cost)); - initSQLExecutor(serviceName, connRes.getFallbackPolicy()); } + + // 不管是不是interactiveMode都初始化。具体的mode在executor中做区分 + initSQLExecutor(serviceName, connRes.getFallbackPolicy()); + String msg = "Connect to odps project %s successfully"; log.info(String.format(msg, odps.getDefaultProject())); @@ -216,6 +220,8 @@ public void initSQLExecutor(String serviceName, FallbackPolicy fallbackPolicy) t executeOdps.setDefaultProject(executeProject); } builder.odps(executeOdps) + .executeMode(interactiveMode ? ExecuteMode.INTERACTIVE : ExecuteMode.OFFLINE) + .enableCommandApi(true) .properties(hints) .serviceName(serviceName) .fallbackPolicy(fallbackPolicy) diff --git a/src/main/java/com/aliyun/odps/jdbc/OdpsStatement.java b/src/main/java/com/aliyun/odps/jdbc/OdpsStatement.java index d9ea3115..3eb5cde5 100644 --- a/src/main/java/com/aliyun/odps/jdbc/OdpsStatement.java +++ b/src/main/java/com/aliyun/odps/jdbc/OdpsStatement.java @@ -15,29 +15,32 @@ package com.aliyun.odps.jdbc; -import java.io.BufferedReader; import java.io.IOException; -import java.io.StringReader; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.sql.SQLWarning; import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; -import java.util.*; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; +import org.apache.commons.lang.StringEscapeUtils; -import com.aliyun.odps.*; +import com.aliyun.odps.Column; +import com.aliyun.odps.Instance; import com.aliyun.odps.Instance.Status; import com.aliyun.odps.Instance.TaskStatus; -import com.aliyun.odps.sqa.SQLExecutor; -import org.apache.commons.lang.StringEscapeUtils; - +import com.aliyun.odps.Odps; +import com.aliyun.odps.OdpsException; +import com.aliyun.odps.data.Record; import com.aliyun.odps.jdbc.utils.OdpsLogger; import com.aliyun.odps.jdbc.utils.Utils; -import com.aliyun.odps.task.SQLTask; +import com.aliyun.odps.sqa.SQLExecutor; +import com.aliyun.odps.sqa.utils.SqlParserUtil; import com.aliyun.odps.tunnel.InstanceTunnel; import com.aliyun.odps.tunnel.InstanceTunnel.DownloadSession; import com.aliyun.odps.tunnel.TunnelException; @@ -47,117 +50,6 @@ public class OdpsStatement extends WrapperAdapter implements Statement { - /** - * TODO: Hack, remove later - */ - private static Pattern DESC_TABLE_PATTERN = Pattern.compile( - "\\s*(DESCRIBE|DESC)\\s+([^;]+);?", Pattern.CASE_INSENSITIVE|Pattern.DOTALL); - private static Pattern SHOW_TABLES_PATTERN = Pattern.compile( - "\\s*SHOW\\s+TABLES\\s*;?", Pattern.CASE_INSENSITIVE|Pattern.DOTALL); - private static Pattern SHOW_PARTITIONS_PATTERN = Pattern.compile( - "\\s*SHOW\\s+PARTITIONS\\s+([^;]+);?", Pattern.CASE_INSENSITIVE|Pattern.DOTALL); - - private static final Pattern TABLE_PARTITION_PATTERN = Pattern.compile( - "\\s*([\\w\\.]+)(\\s*|(\\s+PARTITION\\s*\\((.*)\\)))\\s*", Pattern.CASE_INSENSITIVE); - - public static String[] parseTablePartition(String tablePartition) { - String[] ret = new String[2]; - - Matcher m = TABLE_PARTITION_PATTERN.matcher(tablePartition); - boolean match = m.matches(); - - if (match && m.groupCount() >= 1) { - ret[0] = m.group(1); - } - - if (match && m.groupCount() >= 4) { - ret[1] = m.group(4); - } - - return ret; - } - - private void descTablePartition(String tablePartition) throws SQLException { - String[] parsedTablePartition = parseTablePartition(tablePartition); - if (parsedTablePartition[0] == null) { - throw new SQLException("Invalid argument: " + tablePartition); - } - - OdpsResultSetMetaData meta = new OdpsResultSetMetaData( - Arrays.asList("col_name", "data_type", "comment"), - Arrays.asList(TypeInfoFactory.STRING, TypeInfoFactory.STRING, TypeInfoFactory.STRING)); - List rows = new LinkedList<>(); - - try { - Table t = connHandle.getOdps().tables().get(parsedTablePartition[0]); - addColumnDesc(t.getSchema().getColumns(), rows); - addColumnDesc(t.getSchema().getPartitionColumns(), rows); - - if (t.isPartitioned()) { - rows.add(new String[] {"", null, null}); - rows.add(new String[] {"# Partition Information", null, null}); - rows.add(new String[] {"# col_name", "data_type", "comment"}); - rows.add(new String[] {"", null, null}); - addColumnDesc(t.getSchema().getPartitionColumns(), rows); - - if (parsedTablePartition[1] != null) { - Partition partition = t.getPartition(new PartitionSpec(parsedTablePartition[1])); - PartitionSpec spec = partition.getPartitionSpec(); - rows.add(new String[] {"", null, null}); - rows.add(new String[] {"# Detailed Partition Information", null, null}); - List partitionValues = partition.getPartitionSpec().keys() - .stream() - .map(spec::get) - .collect(Collectors.toList()); - rows.add(new String[] {"Partition Value:", String.join(", ",partitionValues), null}); - rows.add(new String[] {"Database:", connHandle.getOdps().getDefaultProject(), null}); - rows.add(new String[] {"Table:", parsedTablePartition[0], null}); - rows.add(new String[] {"CreateTime:", partition.getCreatedTime().toString(), null}); - rows.add(new String[] {"LastDDLTime:", partition.getLastMetaModifiedTime().toString(), null}); - rows.add(new String[] {"LastModifiedTime:", partition.getLastDataModifiedTime().toString(), null}); - } - } - } catch (Exception e) { - throw new SQLException(e); - } - - resultSet = new OdpsStaticResultSet(connHandle, meta, rows.iterator()); - } - - private void addColumnDesc(List columns, List rows) { - for (Column c : columns) { - String[] row = new String[3]; - row[0] = c.getName(); - row[1] = c.getTypeInfo().getTypeName(); - row[2] = c.getComment(); - rows.add(row); - } - } - - private void showTables() throws SQLException { - OdpsResultSetMetaData meta = - new OdpsResultSetMetaData(Collections.singletonList("tab_name"), - Collections.singletonList(TypeInfoFactory.STRING)); - List rows = new LinkedList<>(); - for (Table table : connHandle.getOdps().tables()) { - rows.add(new String[] {table.getName()}); - } - resultSet = new OdpsStaticResultSet(connHandle, meta, rows.iterator()); - } - - private void showPartitions(String table) throws SQLException { - OdpsResultSetMetaData meta = - new OdpsResultSetMetaData(Collections.singletonList("partition"), - Collections.singletonList(TypeInfoFactory.STRING)); - List rows = new LinkedList<>(); - for (Partition partition : connHandle.getOdps().tables().get(table).getPartitions()) { - rows.add(new String[] {partition.getPartitionSpec().toString(false, true)}); - } - - resultSet = new OdpsStaticResultSet(connHandle, meta, rows.iterator()); - } - - private OdpsConnection connHandle; private Instance executeInstance = null; private ResultSet resultSet = null; @@ -304,40 +196,22 @@ public int[] executeBatch() throws SQLException { @Override public synchronized ResultSet executeQuery(String sql) throws SQLException { Properties properties = new Properties(); + String query = Utils.parseSetting(sql, properties); + if (StringUtils.isNullOrEmpty(query)) { + // only settings, just set properties + processSetClause(properties); + return EMPTY_RESULT_SET; + } + // otherwise those properties is just for this query - // TODO: hack, remove later - Matcher descTablePatternMatcher = DESC_TABLE_PATTERN.matcher(sql); - Matcher showTablesPatternMatcher = SHOW_TABLES_PATTERN.matcher(sql); - Matcher showPartitionsPatternMatcher = SHOW_PARTITIONS_PATTERN.matcher(sql); - - if (descTablePatternMatcher.matches()) { - descTablePartition(showPartitionsPatternMatcher.group(2)); - return getResultSet(); - } else if (showTablesPatternMatcher.matches()) { - showTables(); - return getResultSet(); - } else if (showPartitionsPatternMatcher.matches()) { - showPartitions(showPartitionsPatternMatcher.group(1)); - return getResultSet(); - } else { - String query = Utils.parseSetting(sql, properties); - - if (StringUtils.isNullOrEmpty(query)) { - // only settings, just set properties - processSetClause(properties); - return EMPTY_RESULT_SET; - } - // otherwise those properties is just for this query - - if (processUseClause(query)) { - return EMPTY_RESULT_SET; - } - checkClosed(); - beforeExecute(); - runSQL(query, properties); - - return hasResultSet(query) ? getResultSet() : EMPTY_RESULT_SET; + if (processUseClause(query)) { + return EMPTY_RESULT_SET; } + checkClosed(); + beforeExecute(); + runSQL(query, properties); + + return hasResultSet(query) ? getResultSet() : EMPTY_RESULT_SET; } @Override @@ -386,44 +260,26 @@ public boolean execute(String sql, int autoGeneratedKeys) throws SQLException { @Override public boolean execute(String sql) throws SQLException { - // short cut for SET clause Properties properties = new Properties(); - // TODO: hack, remove later - Matcher descTablePatternMatcher = DESC_TABLE_PATTERN.matcher(sql); - Matcher showTablesPatternMatcher = SHOW_TABLES_PATTERN.matcher(sql); - Matcher showPartitionsPatternMatcher = SHOW_PARTITIONS_PATTERN.matcher(sql); - - if (descTablePatternMatcher.matches()) { - descTablePartition(descTablePatternMatcher.group(2)); - return true; - } else if (showTablesPatternMatcher.matches()) { - showTables(); - return true; - } else if (showPartitionsPatternMatcher.matches()) { - showPartitions(showPartitionsPatternMatcher.group(1)); - return true; - } else { - String query = Utils.parseSetting(sql, properties); - - if (StringUtils.isNullOrEmpty(query)) { - // only settings, just set properties - processSetClause(properties); - return false; - } - // otherwise those properties is just for this query + String query = Utils.parseSetting(sql, properties); + if (StringUtils.isNullOrEmpty(query)) { + // only settings, just set properties + processSetClause(properties); + return false; + } + // otherwise those properties is just for this query - if (processUseClause(query)) { - return false; - } + if (processUseClause(query)) { + return false; + } - checkClosed(); - beforeExecute(); - runSQL(query, properties); + checkClosed(); + beforeExecute(); + runSQL(query, properties); - return hasResultSet(query); - } + return hasResultSet(query); } public boolean hasResultSet(String sql) throws SQLException { @@ -432,40 +288,20 @@ public boolean hasResultSet(String sql) throws SQLException { } if (updateCount == 0) { - return isQuery(sql); + return isQuery(sql) || connHandle.getExecutor().hasResultSet(); } else { return updateCount < 0; } } /** + * 采用的是odps-sql的语法文件,用来判断sql是否具有结果集。 * * @param sql sql statement * @return if the input sql statement is a query statement - * @throws SQLException */ - public static boolean isQuery(String sql) throws SQLException { - BufferedReader reader = new BufferedReader(new StringReader(sql)); - try { - String line; - while ((line = reader.readLine()) != null) { - if (line.matches("^\\s*(--|#).*")) { // skip the comment starting with '--' or '#' - continue; - } - if (line.matches("^\\s*$")) { // skip the whitespace line - continue; - } - // The first none-comment line start with "select" - if (line.matches("(?i)^(\\s*)(SELECT).*$")) { - return true; - } else { - break; - } - } - } catch (IOException e) { - throw new SQLException(e); - } - return false; + public static boolean isQuery(String sql) { + return SqlParserUtil.hasResultSet(sql); } private void processSetClause(Properties properties) { @@ -601,6 +437,24 @@ public void setQueryTimeout(int seconds) throws SQLException { public ResultSet getResultSet() throws SQLException { long startTime = System.currentTimeMillis(); if (resultSet == null || resultSet.isClosed()) { + + try { + SQLExecutor sqlExecutor = connHandle.getExecutor(); + com.aliyun.odps.data.ResultSet dataResultSet = sqlExecutor + .getResultSet(0L, resultCountLimit, resultSizeLimit, enableLimit); + // 只有command-API匹配的结果才可以进行一次转换 + if (!dataResultSet.isScrollable()) { + OdpsResultSetMetaData meta = getResultMeta(dataResultSet.getTableSchema().getColumns()); + List res = new ArrayList<>(); + for (Record record : dataResultSet) { + res.add(record.toArray()); + } + return new OdpsStaticResultSet(connHandle, meta, res.iterator()); + } + } catch (OdpsException | IOException ignored) { + + } + DownloadSession session; InstanceTunnel tunnel = new InstanceTunnel(connHandle.getOdps()); String te = connHandle.getTunnelEndpoint(); @@ -647,17 +501,20 @@ public ResultSet getResultSet() throws SQLException { } else { if (sessionResultSet != null) { try { - session = tunnel.createDirectDownloadSession( - connHandle.getOdps().getDefaultProject(), - connHandle.getExecutor().getInstance().getId(), - connHandle.getExecutor().getTaskName(), - connHandle.getExecutor().getSubqueryId(), - enableLimit); - OdpsResultSetMetaData meta = getResultMeta(sessionResultSet.getTableSchema().getColumns()); - resultSet = - isResultSetScrollable ? new OdpsScollResultSet(this, meta, session, - OdpsScollResultSet.ResultMode.INTERACTIVE) - : new OdpsSessionForwardResultSet(this, meta, sessionResultSet, startTime); + OdpsResultSetMetaData meta = getResultMeta( + sessionResultSet.getTableSchema().getColumns()); + if (isResultSetScrollable && sessionResultSet.isScrollable()) { + session = tunnel.createDirectDownloadSession( + connHandle.getOdps().getDefaultProject(), + connHandle.getExecutor().getInstance().getId(), + connHandle.getExecutor().getTaskName(), + connHandle.getExecutor().getSubqueryId(), + enableLimit); + resultSet = new OdpsScollResultSet(this, meta, session, + OdpsScollResultSet.ResultMode.INTERACTIVE); + } else { + resultSet = new OdpsSessionForwardResultSet(this, meta, sessionResultSet, startTime); + } sessionResultSet = null; } catch (TunnelException e) { connHandle.log.error("create download session for session failed: " + e.getMessage()); @@ -706,7 +563,7 @@ public int getResultSetType() throws SQLException { @Override public synchronized int getUpdateCount() throws SQLException { checkClosed(); - if (updateCountFetched){ + if (updateCountFetched) { return -1; } updateCountFetched = true; @@ -800,22 +657,23 @@ protected void checkClosed() throws SQLException { } } - private void runSQLOffline( - String sql, - Odps odps, Map settings) + private void runSQLOffline(String sql, Map settings) throws SQLException, OdpsException { long begin = System.currentTimeMillis(); - executeInstance = - SQLTask.run(odps, odps.getDefaultProject(), sql, JDBC_SQL_TASK_NAME, settings, null); - LogView logView = new LogView(odps); - if (connHandle.getLogviewHost() != null) { - logView.setLogViewHost(connHandle.getLogviewHost()); - } - String logViewUrl = logView.generateLogView(executeInstance, 7 * 24); + + SQLExecutor sqlExecutor = connHandle.getExecutor(); + sqlExecutor.run(sql, settings); + executeInstance = sqlExecutor.getInstance(); connHandle.log.info("Run SQL: " + sql); - connHandle.log.info(logViewUrl); - warningChain = new SQLWarning(logViewUrl); + + if (executeInstance == null) { + long end = System.currentTimeMillis(); + connHandle.log.info("It took me " + (end - begin) + " ms to run sql"); + } else { + String logViewUrl = sqlExecutor.getLogView(); + connHandle.log.info(logViewUrl); + warningChain = new SQLWarning(logViewUrl); // Poll the task status within the instance boolean complete = false; @@ -830,58 +688,63 @@ private void runSQLOffline( complete = Status.TERMINATED.equals(executeInstance.getStatus()); } - TaskStatus taskStatus = executeInstance.getTaskStatus().get(JDBC_SQL_TASK_NAME); - if (taskStatus == null) { - connHandle.log.warn("Failed to get task status. " - + "The instance may have been killed before its task was created."); - } else { - switch (taskStatus.getStatus()) { - case SUCCESS: - connHandle.log.debug("sql status: success"); - break; - case FAILED: - try { - String reason = executeInstance.getTaskResults().get(JDBC_SQL_TASK_NAME); - connHandle.log.error("execute sql [" + sql + "] failed: " + reason); - throw new SQLException("execute sql [" + sql + "] failed: " + reason, "FAILED"); - } catch (OdpsException e) { - connHandle.log.error("Fail to get task status:" + sql, e); - throw new SQLException("Fail to get task status", e); - } - case CANCELLED: - connHandle.log.info("execute instance cancelled"); - throw new SQLException("execute instance cancelled", "CANCELLED"); - case WAITING: - case RUNNING: - case SUSPENDED: - connHandle.log.debug("sql status: " + taskStatus.getStatus()); - break; - default: + String taskName = executeInstance.getTaskNames().iterator().next(); + TaskStatus taskStatus = executeInstance.getTaskStatus().get(taskName); + if (taskStatus == null) { + connHandle.log.warn("Failed to get task status. " + + "The instance may have been killed before its task was created."); + } else { + switch (taskStatus.getStatus()) { + case SUCCESS: + connHandle.log.debug("sql status: success"); + break; + case FAILED: + try { + String reason = executeInstance.getTaskResults().get(taskName); + connHandle.log.error("execute sql [" + sql + "] failed: " + reason); + throw new SQLException("execute sql [" + sql + "] failed: " + reason, "FAILED"); + } catch (OdpsException e) { + connHandle.log.error("Fail to get task status:" + sql, e); + throw new SQLException("Fail to get task status", e); + } + case CANCELLED: + connHandle.log.info("execute instance cancelled"); + throw new SQLException("execute instance cancelled", "CANCELLED"); + case WAITING: + case RUNNING: + case SUSPENDED: + connHandle.log.debug("sql status: " + taskStatus.getStatus()); + break; + default: + } } - } + long end = System.currentTimeMillis(); + connHandle.log.info("It took me " + (end - begin) + " ms to run sql"); - long end = System.currentTimeMillis(); - connHandle.log.info("It took me " + (end - begin) + " ms to run sql"); + // extract update count + Instance.TaskSummary taskSummary = null; + try { + taskSummary = executeInstance.getTaskSummary(taskName); + } catch (OdpsException e) { + // update count become uncertain here + connHandle.log.warn( + "Failed to get TaskSummary: instance_id=" + executeInstance.getId() + ", taskname=" + + taskName); + } - // extract update count - Instance.TaskSummary taskSummary = null; - try { - taskSummary = executeInstance.getTaskSummary(JDBC_SQL_TASK_NAME); - } catch (OdpsException e) { - // update count become uncertain here - connHandle.log.warn("Failed to get TaskSummary: instance_id=" + executeInstance.getId() + ", taskname=" + JDBC_SQL_TASK_NAME); + if (taskSummary != null) { + updateCount = Utils.getSinkCountFromTaskSummary( + StringEscapeUtils.unescapeJava(taskSummary.getJsonSummary())); + } else { + connHandle.log.warn("task summary is empty"); + } + connHandle.log.debug("successfully updated " + updateCount + " records"); } - if (taskSummary != null) { - updateCount = Utils.getSinkCountFromTaskSummary( - StringEscapeUtils.unescapeJava(taskSummary.getJsonSummary())); - } else { - connHandle.log.warn("task summary is empty"); - } - connHandle.log.debug("successfully updated " + updateCount + " records"); } - private void runSQLInSession(String sql, Map settings) throws SQLException, OdpsException { + private void runSQLInSession(String sql, Map settings) + throws SQLException, OdpsException { long begin = System.currentTimeMillis(); SQLExecutor executor = connHandle.getExecutor(); if (queryTimeout != -1 && !settings.containsKey("odps.sql.session.query.timeout")) { @@ -927,9 +790,7 @@ private void runSQL(String sql, Properties properties) throws SQLException { sql += ";"; } - Odps odps = connHandle.getOdps(); - - Map settings = new HashMap<>(); + Map settings = new HashMap<>(); for (String key : sqlTaskProperties.stringPropertyNames()) { settings.put(key, sqlTaskProperties.getProperty(key)); } @@ -943,7 +804,7 @@ private void runSQL(String sql, Properties properties) throws SQLException { } if (!connHandle.runningInInteractiveMode()) { - runSQLOffline(sql, odps, settings); + runSQLOffline(sql, settings); } else { runSQLInSession(sql, settings); } @@ -958,7 +819,9 @@ public Instance getExecuteInstance() { return executeInstance; } - public static String getDefaultTaskName() { return JDBC_SQL_TASK_NAME; } + public static String getDefaultTaskName() { + return JDBC_SQL_TASK_NAME; + } public Properties getSqlTaskProperties() { return sqlTaskProperties;