Skip to content

Commit

Permalink
fix: 统一异常处理并传递详细信息
Browse files Browse the repository at this point in the history
  • Loading branch information
dingxin-tech committed Aug 29, 2024
1 parent a2740a7 commit d32da13
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 97 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
</distributionManagement>

<properties>
<sdk.version>0.48.7-public</sdk.version>
<sdk.version>0.48.8-public</sdk.version>
<junit.version>4.11</junit.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jdk.version>1.8</jdk.version>
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/aliyun/odps/jdbc/DataUploader.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public int[] upload(List<Object[]> batchedRows) throws SQLException {
try {
upload(batchedRows, batchedSize, updateCounts);
} catch (Exception e) {
throw new SQLException(e);
throw new SQLException(e.getMessage(), e);
//TODO
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/aliyun/odps/jdbc/OdpsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ public boolean isOdpsNamespaceSchema() {

} catch (OdpsException e) {
log.error("Connect to odps failed:" + e.getMessage());
throw new SQLException(e);
throw new SQLException(e.getMessage(), e);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/aliyun/odps/jdbc/OdpsDatabaseMetaData.java
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ && schemaMatches(schemaPattern, schemas.getString(COL_NAME_TABLE_SCHEM))) {
schemas.close();
}
} catch (Exception e) {
throw new SQLException(e);
throw new SQLException(e.getMessage(), e);
}

long end = System.currentTimeMillis();
Expand Down Expand Up @@ -996,7 +996,7 @@ && schemaMatches(schemaPattern, PRJ_NAME_MAXCOMPUTE_PUBLIC_DATA)
}
}
} catch (OdpsException | RuntimeException e) {
throw new SQLException(e);
throw new SQLException(e.getMessage(), e);
}

sortRows(rows, new int[]{1, 0});
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/aliyun/odps/jdbc/OdpsForwardResultSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void close() throws SQLException {
reader.close();
}
} catch (IOException e) {
throw new SQLException(e);
throw new SQLException(e.getMessage(), e);
}
conn.log.info("the result set has been closed");
}
Expand Down Expand Up @@ -178,9 +178,9 @@ private void rebuildReader() throws SQLException {
reader = sessionHandle.openRecordReader(fetchedRows, count, true);
conn.log.warn(String.format("open read record, start=%d, cnt=%d", fetchedRows, count));
} catch (IOException e) {
throw new SQLException(e);
throw new SQLException(e.getMessage(), e);
} catch (TunnelException e) {
throw new SQLException(e);
throw new SQLException(e.getMessage(), e);
}
}

Expand Down
10 changes: 4 additions & 6 deletions src/main/java/com/aliyun/odps/jdbc/OdpsPreparedStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,8 @@ private void parse() throws SQLException {
try {
uploader = DataUploader.build(projectName, schemaName, tableName, partitionSpec,
specificColumns, getConnection());
} catch (OdpsException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new SQLException(e);
} catch (OdpsException | IOException e) {
throw new SQLException(e.getMessage());
}

parsed = true;
Expand All @@ -277,7 +275,7 @@ public void close() throws SQLException {
try {
uploader.commit();
} catch (TunnelException | IOException e) {
throw new SQLException(e);
throw new SQLException(e.getMessage(), e);
}
}
super.close();
Expand Down Expand Up @@ -543,7 +541,7 @@ private String convertJavaTypeToSqlString(Object x) throws SQLException {
throw new SQLException("charset is null");
}
} catch (UnsupportedEncodingException e) {
throw new SQLException(e);
throw new SQLException(e.getMessage(), e);
}
} else if (java.util.Date.class.isInstance(x)) {
Calendar.Builder calendarBuilder = new Calendar.Builder()
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/com/aliyun/odps/jdbc/OdpsScollResultSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -335,10 +335,8 @@ private void fetchRows() throws SQLException {
.info(String.format("fetch records, start=%d, cnt=%d, %d KB, %.2f KB/s", cachedUpperRow,
count, totalKBytes, (float) totalKBytes / duration * 1000));
reader.close();
} catch (TunnelException e) {
throw new SQLException(e);
} catch (IOException e) {
throw new SQLException(e);
} catch (TunnelException | IOException e) {
throw new SQLException(e.getMessage(), e);
}
}
}
152 changes: 74 additions & 78 deletions src/main/java/com/aliyun/odps/jdbc/OdpsStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ enum FetchDirection {

protected boolean enableLimit = false;

private SQLWarning warningChain = null;
private SQLWarning warningChain = new SQLWarning();

OdpsStatement(OdpsConnection conn) {
this(conn, false);
Expand Down Expand Up @@ -153,7 +153,7 @@ public void cancel() throws SQLException {
}
}
} catch (OdpsException e) {
throw new SQLException(e);
throw new SQLException(e.getMessage(), e);
}

isCancelled = true;
Expand Down Expand Up @@ -348,7 +348,7 @@ public static boolean isQuery(String sql) throws SQLException {
}
}
} catch (IOException e) {
throw new SQLException(e);
throw new SQLException(e.getMessage(), e);
}
return false;
}
Expand Down Expand Up @@ -485,14 +485,12 @@ public void setQueryTimeout(int seconds) throws SQLException {
@Override
public ResultSet getResultSet() throws SQLException {
long startTime = System.currentTimeMillis();
if (resultSet == null || resultSet.isClosed()) {
if (odpsResultSet != null) {
if (resultSet == null || resultSet.isClosed() && odpsResultSet != null) {
OdpsResultSetMetaData
meta =
getResultMeta(odpsResultSet.getTableSchema().getColumns());

try {
if (connHandle.getExecutor().getInstance() == null) {
if (!isResultSetScrollable || connHandle.getExecutor().getInstance() == null) {
resultSet = new OdpsSessionForwardResultSet(this, meta, odpsResultSet, startTime);
} else {
DownloadSession session;
Expand All @@ -515,15 +513,11 @@ public ResultSet getResultSet() throws SQLException {
connHandle.getExecutor().getSubqueryId(),
enableLimit);

resultSet =
isResultSetScrollable ? new OdpsScollResultSet(this, meta, session,
resultSet = new OdpsScollResultSet(this, meta, session,
connHandle.getExecutor()
.isRunningInInteractiveMode()
? OdpsScollResultSet.ResultMode.INTERACTIVE
: OdpsScollResultSet.ResultMode.OFFLINE)
: new OdpsSessionForwardResultSet(this, meta,
odpsResultSet,
startTime);
: OdpsScollResultSet.ResultMode.OFFLINE);
}
odpsResultSet = null;
} catch (TunnelException e) {
Expand All @@ -539,9 +533,7 @@ public ResultSet getResultSet() throws SQLException {
+ connHandle.getExecutor().getInstance().getId() + ", Error:" + e
.getMessage(), e);
}
}
}

return resultSet;
}

Expand Down Expand Up @@ -670,78 +662,82 @@ protected void checkClosed() throws SQLException {

private void runSQL(String sql, Map<String, String> settings, boolean isUpdate)
throws SQLException, OdpsException {
long begin = System.currentTimeMillis();
SQLExecutor executor = connHandle.getExecutor();
if (queryTimeout != -1 && !settings.containsKey("odps.sql.session.query.timeout")) {
settings.put("odps.sql.session.query.timeout", String.valueOf(queryTimeout));
}
Long autoSelectLimit = connHandle.getAutoSelectLimit();
if (autoSelectLimit != null && autoSelectLimit > 0) {
settings.put("odps.sql.select.auto.limit", autoSelectLimit.toString());
}
connHandle.log.info("Run SQL: " + sql + ", Begin time: " + begin);
executor.run(sql, settings);
executeInstance = executor.getInstance();
if (executeInstance != null) {
connHandle.log.info("InstanceId: " + executeInstance.getId());
}
if (isUpdate) {
try {
long begin = System.currentTimeMillis();
if (queryTimeout != -1 && !settings.containsKey("odps.sql.session.query.timeout")) {
settings.put("odps.sql.session.query.timeout", String.valueOf(queryTimeout));
}
Long autoSelectLimit = connHandle.getAutoSelectLimit();
if (autoSelectLimit != null && autoSelectLimit > 0) {
settings.put("odps.sql.select.auto.limit", autoSelectLimit.toString());
}
connHandle.log.info("Run SQL: " + sql + ", Begin time: " + begin);
executor.run(sql, settings);
executeInstance = executor.getInstance();
if (executeInstance != null) {
executeInstance.waitForSuccess();

Instance.TaskSummary taskSummary = null;
try {
taskSummary = executeInstance.getTaskSummary(JDBC_SQL_OFFLINE_TASK_NAME);
} catch (OdpsException e) {
// update count become uncertain here
connHandle.log.warn(
"Failed to get TaskSummary: instance_id=" + executeInstance.getId() + ", taskname="
+ JDBC_SQL_OFFLINE_TASK_NAME);
connHandle.log.info("InstanceId: " + executeInstance.getId());
}
if (isUpdate) {
if (executeInstance != null) {
executeInstance.waitForSuccess();
Instance.TaskSummary taskSummary = null;
try {
taskSummary = executeInstance.getTaskSummary(JDBC_SQL_OFFLINE_TASK_NAME);
} catch (OdpsException e) {
// update count become uncertain here
connHandle.log.warn(
"Failed to get TaskSummary: instance_id=" + executeInstance.getId() + ", taskname="
+ JDBC_SQL_OFFLINE_TASK_NAME);
}
if (taskSummary != null) {
updateCount = Utils.getSinkCountFromTaskSummary(
StringEscapeUtils.unescapeJava(taskSummary.getJsonSummary()));
connHandle.log.debug("successfully updated " + updateCount + " records");
} else {
connHandle.log.warn("task summary is empty");
}
}

if (taskSummary != null) {
updateCount = Utils.getSinkCountFromTaskSummary(
StringEscapeUtils.unescapeJava(taskSummary.getJsonSummary()));
connHandle.log.debug("successfully updated " + updateCount + " records");
} else {
connHandle.log.warn("task summary is empty");
// 如果是DML或者DDL,即使有结果也视为无结果
odpsResultSet = null;
} else {
setResultSetInternal();
}
long end = System.currentTimeMillis();
if (executeInstance != null) {
connHandle.log.info("It took me " + (end - begin) + " ms to run sql, instanceId: "
+ executeInstance.getId());
} else {
connHandle.log.info("It took me " + (end - begin) + " ms to run sql");
}
logviewUrl = executor.getLogView();
if (logviewUrl != null) {
connHandle.log.info("LogView: " + logviewUrl);
}
List<String> exeLog = executor.getExecutionLog();
if (!exeLog.isEmpty()) {
for (String log : exeLog) {
connHandle.log.info("Session execution log: " + log);
}
}
// 如果是DML或者DDL,即使有结果也视为无结果
odpsResultSet = null;
} else {
} catch (OdpsException | IOException e) {
String logview = null;
try {
setResultSetInternal();
List<String> exeLog = executor.getExecutionLog();
if (!exeLog.isEmpty()) {
for (String log : exeLog) {
connHandle.log.warn("Session execution log:" + log);
}
}
} catch (IOException e) {
connHandle.log.error("Run SQL failed", e);
throw new SQLException("execute sql [" + sql + "] instance:["
+ executor.getInstance().getId() + "] failed: " + e.getMessage(), e);
} catch (OdpsException e) {
connHandle.log.error("Run SQL failed", e);
throw new SQLException("execute sql [" + sql + "] instance:["
+ executor.getInstance().getId() + "] failed: " + e.getMessage(), e);
logview = executor.getLogView();
} catch (Exception ignored) {
}
throwSQLException(e, sql, executor.getInstance(), logview);
}
}

long end = System.currentTimeMillis();
if (executeInstance != null) {
connHandle.log.info("It took me " + (end - begin) + " ms to run sql, instanceId: "
+ executeInstance.getId());
warningChain = new SQLWarning("instance id: " + executeInstance.getId());
} else {
connHandle.log.info("It took me " + (end - begin) + " ms to run sql");
warningChain = new SQLWarning();
}
logviewUrl = executor.getLogView();
if (logviewUrl != null) {
connHandle.log.info("LogView: " + logviewUrl);
}
private void throwSQLException(Exception e, String sql, Instance instance, String logviewUrl) throws SQLException {
connHandle.log.error("LogView: " + logviewUrl);
connHandle.log.error("Run SQL failed", e);
throw new SQLException(
"execute sql [ " + sql + " ] + failed. " + (instance == null ? "" : "instanceId:["
+ instance.getId()
+ "]")
+ e.getMessage(), e);
}

private void runSQL(String sql, Properties properties) throws SQLException {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/aliyun/odps/jdbc/utils/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private static String getRawResult(Odps odps, String sql) throws SQLException {
i.waitForSuccess();
return i.getTaskResults().get("AnonymousSQLTask");
} catch (OdpsException e) {
throw new SQLException(e);
throw new SQLException(e.getMessage(), e);
}
}

Expand Down

0 comments on commit d32da13

Please sign in to comment.