From d32da134ca8760bd0531ea85ea3f03ef81e01bc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BC=8E=E6=98=95?= Date: Thu, 29 Aug 2024 14:19:06 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E7=BB=9F=E4=B8=80=E5=BC=82=E5=B8=B8?= =?UTF-8?q?=E5=A4=84=E7=90=86=E5=B9=B6=E4=BC=A0=E9=80=92=E8=AF=A6=E7=BB=86?= =?UTF-8?q?=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- .../com/aliyun/odps/jdbc/DataUploader.java | 2 +- .../com/aliyun/odps/jdbc/OdpsConnection.java | 2 +- .../odps/jdbc/OdpsDatabaseMetaData.java | 4 +- .../odps/jdbc/OdpsForwardResultSet.java | 6 +- .../odps/jdbc/OdpsPreparedStatement.java | 10 +- .../aliyun/odps/jdbc/OdpsScollResultSet.java | 6 +- .../com/aliyun/odps/jdbc/OdpsStatement.java | 152 +++++++++--------- .../com/aliyun/odps/jdbc/utils/Utils.java | 2 +- 9 files changed, 89 insertions(+), 97 deletions(-) diff --git a/pom.xml b/pom.xml index 65d28e33..2f8970a1 100644 --- a/pom.xml +++ b/pom.xml @@ -49,7 +49,7 @@ - 0.48.7-public + 0.48.8-public 4.11 UTF-8 1.8 diff --git a/src/main/java/com/aliyun/odps/jdbc/DataUploader.java b/src/main/java/com/aliyun/odps/jdbc/DataUploader.java index 599ed4df..441457a8 100644 --- a/src/main/java/com/aliyun/odps/jdbc/DataUploader.java +++ b/src/main/java/com/aliyun/odps/jdbc/DataUploader.java @@ -93,7 +93,7 @@ public int[] upload(List batchedRows) throws SQLException { try { upload(batchedRows, batchedSize, updateCounts); } catch (Exception e) { - throw new SQLException(e); + throw new SQLException(e.getMessage(), e); //TODO } diff --git a/src/main/java/com/aliyun/odps/jdbc/OdpsConnection.java b/src/main/java/com/aliyun/odps/jdbc/OdpsConnection.java index df044962..864b48ae 100644 --- a/src/main/java/com/aliyun/odps/jdbc/OdpsConnection.java +++ b/src/main/java/com/aliyun/odps/jdbc/OdpsConnection.java @@ -319,7 +319,7 @@ public boolean isOdpsNamespaceSchema() { } catch (OdpsException e) { log.error("Connect to odps failed:" + e.getMessage()); - throw new SQLException(e); + throw new SQLException(e.getMessage(), e); } } diff --git a/src/main/java/com/aliyun/odps/jdbc/OdpsDatabaseMetaData.java b/src/main/java/com/aliyun/odps/jdbc/OdpsDatabaseMetaData.java index 7e7e7933..51e27445 100644 --- a/src/main/java/com/aliyun/odps/jdbc/OdpsDatabaseMetaData.java +++ b/src/main/java/com/aliyun/odps/jdbc/OdpsDatabaseMetaData.java @@ -842,7 +842,7 @@ && schemaMatches(schemaPattern, schemas.getString(COL_NAME_TABLE_SCHEM))) { schemas.close(); } } catch (Exception e) { - throw new SQLException(e); + throw new SQLException(e.getMessage(), e); } long end = System.currentTimeMillis(); @@ -996,7 +996,7 @@ && schemaMatches(schemaPattern, PRJ_NAME_MAXCOMPUTE_PUBLIC_DATA) } } } catch (OdpsException | RuntimeException e) { - throw new SQLException(e); + throw new SQLException(e.getMessage(), e); } sortRows(rows, new int[]{1, 0}); diff --git a/src/main/java/com/aliyun/odps/jdbc/OdpsForwardResultSet.java b/src/main/java/com/aliyun/odps/jdbc/OdpsForwardResultSet.java index 4e8e250a..f238c0e7 100644 --- a/src/main/java/com/aliyun/odps/jdbc/OdpsForwardResultSet.java +++ b/src/main/java/com/aliyun/odps/jdbc/OdpsForwardResultSet.java @@ -111,7 +111,7 @@ public void close() throws SQLException { reader.close(); } } catch (IOException e) { - throw new SQLException(e); + throw new SQLException(e.getMessage(), e); } conn.log.info("the result set has been closed"); } @@ -178,9 +178,9 @@ private void rebuildReader() throws SQLException { reader = sessionHandle.openRecordReader(fetchedRows, count, true); conn.log.warn(String.format("open read record, start=%d, cnt=%d", fetchedRows, count)); } catch (IOException e) { - throw new SQLException(e); + throw new SQLException(e.getMessage(), e); } catch (TunnelException e) { - throw new SQLException(e); + throw new SQLException(e.getMessage(), e); } } diff --git a/src/main/java/com/aliyun/odps/jdbc/OdpsPreparedStatement.java b/src/main/java/com/aliyun/odps/jdbc/OdpsPreparedStatement.java index c724a972..76902b33 100644 --- a/src/main/java/com/aliyun/odps/jdbc/OdpsPreparedStatement.java +++ b/src/main/java/com/aliyun/odps/jdbc/OdpsPreparedStatement.java @@ -257,10 +257,8 @@ private void parse() throws SQLException { try { uploader = DataUploader.build(projectName, schemaName, tableName, partitionSpec, specificColumns, getConnection()); - } catch (OdpsException e) { - throw new RuntimeException(e); - } catch (IOException e) { - throw new SQLException(e); + } catch (OdpsException | IOException e) { + throw new SQLException(e.getMessage()); } parsed = true; @@ -277,7 +275,7 @@ public void close() throws SQLException { try { uploader.commit(); } catch (TunnelException | IOException e) { - throw new SQLException(e); + throw new SQLException(e.getMessage(), e); } } super.close(); @@ -543,7 +541,7 @@ private String convertJavaTypeToSqlString(Object x) throws SQLException { throw new SQLException("charset is null"); } } catch (UnsupportedEncodingException e) { - throw new SQLException(e); + throw new SQLException(e.getMessage(), e); } } else if (java.util.Date.class.isInstance(x)) { Calendar.Builder calendarBuilder = new Calendar.Builder() diff --git a/src/main/java/com/aliyun/odps/jdbc/OdpsScollResultSet.java b/src/main/java/com/aliyun/odps/jdbc/OdpsScollResultSet.java index 9827b22a..1f49b041 100644 --- a/src/main/java/com/aliyun/odps/jdbc/OdpsScollResultSet.java +++ b/src/main/java/com/aliyun/odps/jdbc/OdpsScollResultSet.java @@ -335,10 +335,8 @@ private void fetchRows() throws SQLException { .info(String.format("fetch records, start=%d, cnt=%d, %d KB, %.2f KB/s", cachedUpperRow, count, totalKBytes, (float) totalKBytes / duration * 1000)); reader.close(); - } catch (TunnelException e) { - throw new SQLException(e); - } catch (IOException e) { - throw new SQLException(e); + } catch (TunnelException | IOException e) { + throw new SQLException(e.getMessage(), e); } } } diff --git a/src/main/java/com/aliyun/odps/jdbc/OdpsStatement.java b/src/main/java/com/aliyun/odps/jdbc/OdpsStatement.java index 009677d3..9908c307 100644 --- a/src/main/java/com/aliyun/odps/jdbc/OdpsStatement.java +++ b/src/main/java/com/aliyun/odps/jdbc/OdpsStatement.java @@ -112,7 +112,7 @@ enum FetchDirection { protected boolean enableLimit = false; - private SQLWarning warningChain = null; + private SQLWarning warningChain = new SQLWarning(); OdpsStatement(OdpsConnection conn) { this(conn, false); @@ -153,7 +153,7 @@ public void cancel() throws SQLException { } } } catch (OdpsException e) { - throw new SQLException(e); + throw new SQLException(e.getMessage(), e); } isCancelled = true; @@ -348,7 +348,7 @@ public static boolean isQuery(String sql) throws SQLException { } } } catch (IOException e) { - throw new SQLException(e); + throw new SQLException(e.getMessage(), e); } return false; } @@ -485,14 +485,12 @@ public void setQueryTimeout(int seconds) throws SQLException { @Override public ResultSet getResultSet() throws SQLException { long startTime = System.currentTimeMillis(); - if (resultSet == null || resultSet.isClosed()) { - if (odpsResultSet != null) { + if (resultSet == null || resultSet.isClosed() && odpsResultSet != null) { OdpsResultSetMetaData meta = getResultMeta(odpsResultSet.getTableSchema().getColumns()); - try { - if (connHandle.getExecutor().getInstance() == null) { + if (!isResultSetScrollable || connHandle.getExecutor().getInstance() == null) { resultSet = new OdpsSessionForwardResultSet(this, meta, odpsResultSet, startTime); } else { DownloadSession session; @@ -515,15 +513,11 @@ public ResultSet getResultSet() throws SQLException { connHandle.getExecutor().getSubqueryId(), enableLimit); - resultSet = - isResultSetScrollable ? new OdpsScollResultSet(this, meta, session, + resultSet = new OdpsScollResultSet(this, meta, session, connHandle.getExecutor() .isRunningInInteractiveMode() ? OdpsScollResultSet.ResultMode.INTERACTIVE - : OdpsScollResultSet.ResultMode.OFFLINE) - : new OdpsSessionForwardResultSet(this, meta, - odpsResultSet, - startTime); + : OdpsScollResultSet.ResultMode.OFFLINE); } odpsResultSet = null; } catch (TunnelException e) { @@ -539,9 +533,7 @@ public ResultSet getResultSet() throws SQLException { + connHandle.getExecutor().getInstance().getId() + ", Error:" + e .getMessage(), e); } - } } - return resultSet; } @@ -670,78 +662,82 @@ protected void checkClosed() throws SQLException { private void runSQL(String sql, Map settings, boolean isUpdate) throws SQLException, OdpsException { - long begin = System.currentTimeMillis(); SQLExecutor executor = connHandle.getExecutor(); - if (queryTimeout != -1 && !settings.containsKey("odps.sql.session.query.timeout")) { - settings.put("odps.sql.session.query.timeout", String.valueOf(queryTimeout)); - } - Long autoSelectLimit = connHandle.getAutoSelectLimit(); - if (autoSelectLimit != null && autoSelectLimit > 0) { - settings.put("odps.sql.select.auto.limit", autoSelectLimit.toString()); - } - connHandle.log.info("Run SQL: " + sql + ", Begin time: " + begin); - executor.run(sql, settings); - executeInstance = executor.getInstance(); - if (executeInstance != null) { - connHandle.log.info("InstanceId: " + executeInstance.getId()); - } - if (isUpdate) { + try { + long begin = System.currentTimeMillis(); + if (queryTimeout != -1 && !settings.containsKey("odps.sql.session.query.timeout")) { + settings.put("odps.sql.session.query.timeout", String.valueOf(queryTimeout)); + } + Long autoSelectLimit = connHandle.getAutoSelectLimit(); + if (autoSelectLimit != null && autoSelectLimit > 0) { + settings.put("odps.sql.select.auto.limit", autoSelectLimit.toString()); + } + connHandle.log.info("Run SQL: " + sql + ", Begin time: " + begin); + executor.run(sql, settings); + executeInstance = executor.getInstance(); if (executeInstance != null) { - executeInstance.waitForSuccess(); - - Instance.TaskSummary taskSummary = null; - try { - taskSummary = executeInstance.getTaskSummary(JDBC_SQL_OFFLINE_TASK_NAME); - } catch (OdpsException e) { - // update count become uncertain here - connHandle.log.warn( - "Failed to get TaskSummary: instance_id=" + executeInstance.getId() + ", taskname=" - + JDBC_SQL_OFFLINE_TASK_NAME); + connHandle.log.info("InstanceId: " + executeInstance.getId()); + } + if (isUpdate) { + if (executeInstance != null) { + executeInstance.waitForSuccess(); + Instance.TaskSummary taskSummary = null; + try { + taskSummary = executeInstance.getTaskSummary(JDBC_SQL_OFFLINE_TASK_NAME); + } catch (OdpsException e) { + // update count become uncertain here + connHandle.log.warn( + "Failed to get TaskSummary: instance_id=" + executeInstance.getId() + ", taskname=" + + JDBC_SQL_OFFLINE_TASK_NAME); + } + if (taskSummary != null) { + updateCount = Utils.getSinkCountFromTaskSummary( + StringEscapeUtils.unescapeJava(taskSummary.getJsonSummary())); + connHandle.log.debug("successfully updated " + updateCount + " records"); + } else { + connHandle.log.warn("task summary is empty"); + } } - - if (taskSummary != null) { - updateCount = Utils.getSinkCountFromTaskSummary( - StringEscapeUtils.unescapeJava(taskSummary.getJsonSummary())); - connHandle.log.debug("successfully updated " + updateCount + " records"); - } else { - connHandle.log.warn("task summary is empty"); + // 如果是DML或者DDL,即使有结果也视为无结果 + odpsResultSet = null; + } else { + setResultSetInternal(); + } + long end = System.currentTimeMillis(); + if (executeInstance != null) { + connHandle.log.info("It took me " + (end - begin) + " ms to run sql, instanceId: " + + executeInstance.getId()); + } else { + connHandle.log.info("It took me " + (end - begin) + " ms to run sql"); + } + logviewUrl = executor.getLogView(); + if (logviewUrl != null) { + connHandle.log.info("LogView: " + logviewUrl); + } + List exeLog = executor.getExecutionLog(); + if (!exeLog.isEmpty()) { + for (String log : exeLog) { + connHandle.log.info("Session execution log: " + log); } } - // 如果是DML或者DDL,即使有结果也视为无结果 - odpsResultSet = null; - } else { + } catch (OdpsException | IOException e) { + String logview = null; try { - setResultSetInternal(); - List exeLog = executor.getExecutionLog(); - if (!exeLog.isEmpty()) { - for (String log : exeLog) { - connHandle.log.warn("Session execution log:" + log); - } - } - } catch (IOException e) { - connHandle.log.error("Run SQL failed", e); - throw new SQLException("execute sql [" + sql + "] instance:[" - + executor.getInstance().getId() + "] failed: " + e.getMessage(), e); - } catch (OdpsException e) { - connHandle.log.error("Run SQL failed", e); - throw new SQLException("execute sql [" + sql + "] instance:[" - + executor.getInstance().getId() + "] failed: " + e.getMessage(), e); + logview = executor.getLogView(); + } catch (Exception ignored) { } + throwSQLException(e, sql, executor.getInstance(), logview); } + } - long end = System.currentTimeMillis(); - if (executeInstance != null) { - connHandle.log.info("It took me " + (end - begin) + " ms to run sql, instanceId: " - + executeInstance.getId()); - warningChain = new SQLWarning("instance id: " + executeInstance.getId()); - } else { - connHandle.log.info("It took me " + (end - begin) + " ms to run sql"); - warningChain = new SQLWarning(); - } - logviewUrl = executor.getLogView(); - if (logviewUrl != null) { - connHandle.log.info("LogView: " + logviewUrl); - } + private void throwSQLException(Exception e, String sql, Instance instance, String logviewUrl) throws SQLException { + connHandle.log.error("LogView: " + logviewUrl); + connHandle.log.error("Run SQL failed", e); + throw new SQLException( + "execute sql [ " + sql + " ] + failed. " + (instance == null ? "" : "instanceId:[" + + instance.getId() + + "]") + + e.getMessage(), e); } private void runSQL(String sql, Properties properties) throws SQLException { diff --git a/src/main/java/com/aliyun/odps/jdbc/utils/Utils.java b/src/main/java/com/aliyun/odps/jdbc/utils/Utils.java index 09582882..3d4d832b 100644 --- a/src/main/java/com/aliyun/odps/jdbc/utils/Utils.java +++ b/src/main/java/com/aliyun/odps/jdbc/utils/Utils.java @@ -61,7 +61,7 @@ private static String getRawResult(Odps odps, String sql) throws SQLException { i.waitForSuccess(); return i.getTaskResults().get("AnonymousSQLTask"); } catch (OdpsException e) { - throw new SQLException(e); + throw new SQLException(e.getMessage(), e); } }