From a8d1225af654cf36cc0e2b5db5b866ffbf1de59e Mon Sep 17 00:00:00 2001
From: "wuyue.lyd"
Date: Tue, 7 May 2024 14:48:50 +0800
Subject: [PATCH] [to #56334367] support upload to acid2 table
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Overview

This change reworks the `OdpsPreparedStatement` class: it adds support for ACID tables and improves how batched inserts are uploaded. The statement parser now handles insert statements that carry a project name, schema name, and table name, and a new `DataUploader` abstraction (with `BasicTableUploader` and `AcidTableUploader` implementations) encapsulates uploading data to ODPS. The unsupported JDBC method stubs have also been moved into a shared `AbstractOdpsPreparedStatement` base class to simplify the code.

Link: https://code.alibaba-inc.com/odps/aliyun-odps-jdbc/codereview/16572699
---
 pom.xml                                       |   2 +-
 .../jdbc/AbstractOdpsPreparedStatement.java   | 234 +++++++++++
 .../aliyun/odps/jdbc/AcidTableUploader.java   |  60 +++
 .../aliyun/odps/jdbc/BasicTableUploader.java  |  73 ++++
 .../com/aliyun/odps/jdbc/DataUploader.java    | 116 ++++++
 .../odps/jdbc/OdpsPreparedStatement.java      | 362 +++---------------
 .../odps/jdbc/OdpsPreparedStatementTest.java  |  53 +++
 7 files changed, 584 insertions(+), 316 deletions(-)
 create mode 100644 src/main/java/com/aliyun/odps/jdbc/AbstractOdpsPreparedStatement.java
 create mode 100644 src/main/java/com/aliyun/odps/jdbc/AcidTableUploader.java
 create mode 100644 src/main/java/com/aliyun/odps/jdbc/BasicTableUploader.java
 create mode 100644 src/main/java/com/aliyun/odps/jdbc/DataUploader.java

diff --git a/pom.xml b/pom.xml
index bd6e69d7..09eb365a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -49,7 +49,7 @@
 
-    0.45.2-public
+    0.48.1-public-SNAPSHOT
     4.11
     UTF-8
     1.8
diff --git a/src/main/java/com/aliyun/odps/jdbc/AbstractOdpsPreparedStatement.java b/src/main/java/com/aliyun/odps/jdbc/AbstractOdpsPreparedStatement.java
new file mode 100644
index 00000000..e2cca785
--- /dev/null
+++ b/src/main/java/com/aliyun/odps/jdbc/AbstractOdpsPreparedStatement.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package com.aliyun.odps.jdbc;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.net.URL;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.Ref;
+import java.sql.RowId;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLXML;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Calendar;
+
+public abstract class AbstractOdpsPreparedStatement extends OdpsStatement implements PreparedStatement {
+
+
+  AbstractOdpsPreparedStatement(OdpsConnection conn) {
+    super(conn);
+  }
+
+  AbstractOdpsPreparedStatement(OdpsConnection conn, boolean isResultSetScrollable) {
+    super(conn, isResultSetScrollable);
+  }
+
+  @Override
+  public ParameterMetaData getParameterMetaData() throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setArray(int parameterIndex, Array x) throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setAsciiStream(int parameterIndex, InputStream x, int length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setAsciiStream(int parameterIndex, InputStream x, long length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setBinaryStream(int parameterIndex, InputStream x, int length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setBinaryStream(int parameterIndex, InputStream x, long length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setBlob(int parameterIndex, Blob x) throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setBlob(int parameterIndex, InputStream inputStream, long length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setCharacterStream(int parameterIndex, Reader reader)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setCharacterStream(int parameterIndex, Reader reader, int length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setCharacterStream(int parameterIndex, Reader reader, long length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setClob(int parameterIndex, Clob x) throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setClob(int parameterIndex, Reader reader) throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setClob(int parameterIndex, Reader reader, long length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setNCharacterStream(int parameterIndex, Reader value)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setNCharacterStream(int parameterIndex, Reader value, long length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setNClob(int parameterIndex, NClob value) throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setNClob(int parameterIndex, Reader reader) throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setNClob(int parameterIndex, Reader reader, long length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setNString(int parameterIndex, String value) throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setObject(int parameterIndex, Object x, int targetSqlType)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setRef(int parameterIndex, Ref x) throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setRowId(int parameterIndex, RowId x) throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setURL(int parameterIndex, URL x) throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+  @Override
+  public void setUnicodeStream(int parameterIndex, InputStream x, int length)
+      throws SQLException {
+    throw new SQLFeatureNotSupportedException();
+  }
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/aliyun/odps/jdbc/AcidTableUploader.java b/src/main/java/com/aliyun/odps/jdbc/AcidTableUploader.java
new file mode 100644
index 00000000..809010f7
--- /dev/null
+++ b/src/main/java/com/aliyun/odps/jdbc/AcidTableUploader.java
@@ -0,0 +1,60 @@
+package com.aliyun.odps.jdbc;
+
+import static java.lang.String.format;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.tunnel.TableTunnel;
+import com.aliyun.odps.tunnel.TunnelException;
+import com.aliyun.odps.tunnel.impl.UpsertRecord;
+import com.aliyun.odps.tunnel.streams.UpsertStream;
+
+public class AcidTableUploader extends DataUploader {
+
+  private TableTunnel.UpsertSession upsertSession;
+
+  public AcidTableUploader(String projectName,
+                           String schemaName,
+                           String tableName,
+                           String partitionSpec,
+                           List<String> specificColumns,
+                           OdpsConnection conn) throws OdpsException, IOException {
+    super(projectName, schemaName, tableName, partitionSpec, specificColumns, conn);
+  }
+
+  protected void setUpSession() throws OdpsException, IOException {
+    TableTunnel.UpsertSession.Builder builder = tunnel.buildUpsertSession(projectName, tableName);
+
+    if (null != partitionSpec) {
+      builder.setPartitionSpec(partitionSpec);
+    }
+
+    upsertSession = builder.build();
+    conn.log.info("create upsert session id=" + upsertSession.getId());
+    reuseRecord = (UpsertRecord) upsertSession.newRecord();
+    tableSchema = upsertSession.getSchema();
+  }
+
+  protected void upload(List<Object[]> data, int batchSize, int[] updateCounts)
+      throws OdpsException, IOException, SQLException {
+    UpsertStream stream = upsertSession.buildUpsertStream().build();
+
+    for (int i = 0; i < data.size(); i++) {
+      Object[] row = data.get(i);
+      setReusedRecord(row, tableSchema);
+      stream.upsert(reuseRecord);
+      updateCounts[i] = 1;
+    }
+
+    stream.close();
+  }
+
+  public void commit() throws TunnelException, IOException {
+    if (upsertSession != null) {
+      upsertSession.commit(false);
+    }
+  }
+}
diff --git a/src/main/java/com/aliyun/odps/jdbc/BasicTableUploader.java b/src/main/java/com/aliyun/odps/jdbc/BasicTableUploader.java
new file mode 100644
index 00000000..6d2cc332
--- /dev/null
+++ b/src/main/java/com/aliyun/odps/jdbc/BasicTableUploader.java
@@ -0,0 +1,73 @@
+package com.aliyun.odps.jdbc;
+
+import static java.lang.String.format;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.data.ArrayRecord;
+import com.aliyun.odps.tunnel.TableTunnel;
+import com.aliyun.odps.tunnel.TunnelException;
+import com.aliyun.odps.tunnel.io.TunnelRecordWriter;
+
+public class BasicTableUploader extends DataUploader {
+
+  private TableTunnel.UploadSession uploadSession;
+  private int blocks = 0;
+
+  public BasicTableUploader(String projectName, String schemaName, String tableName,
+                            String partitionSpec, List<String> specificColumns, OdpsConnection conn)
+      throws OdpsException, IOException {
+    super(projectName, schemaName, tableName, partitionSpec, specificColumns, conn);
+  }
+
+
+  public void setUpSession() throws OdpsException {
+    if (null != partitionSpec) {
+      uploadSession = tunnel.createUploadSession(projectName, tableName, partitionSpec);
+    } else {
+      uploadSession = tunnel.createUploadSession(projectName, tableName);
+    }
+
+    conn.log.info("create upload session id=" + uploadSession.getId());
+    reuseRecord = (ArrayRecord) uploadSession.newRecord();
+    tableSchema = uploadSession.getSchema();
+  }
+
+
+  protected void upload(List<Object[]> batchedRows, int batchedSize, int[] updateCounts)
+      throws OdpsException, IOException, SQLException {
+
+    long startTime = System.currentTimeMillis();
+
+    try (TunnelRecordWriter recordWriter = (TunnelRecordWriter) uploadSession.openRecordWriter(blocks, true)) {
+      for (int i = 0; i < batchedSize; i++) {
+        Object[] row = batchedRows.get(i);
+        setReusedRecord(row, tableSchema);
+        recordWriter.write(reuseRecord);
+        updateCounts[i] = 1;
+      }
+
+      long duration = System.currentTimeMillis() - startTime;
+      float megaBytesPerSec = (float) recordWriter.getTotalBytes() / 1024 / 1024 / duration * 1000;
+      conn.log.info(format("It took me %d ms to insert %d records [%d], %.2f MiB/s", duration,
+                           batchedSize,
+                           blocks, megaBytesPerSec));
+      blocks += 1;
+    }
+  }
+
+
+  public void commit() throws TunnelException, IOException {
+    if (uploadSession != null && blocks > 0) {
+      Long[] blockList = new Long[blocks];
+      conn.log.info("commit session: " + blocks + " blocks");
+      for (int i = 0; i < blocks; i++) {
+        blockList[i] = Long.valueOf(i);
+      }
+      uploadSession.commit(blockList);
+    }
+  }
+}
diff --git a/src/main/java/com/aliyun/odps/jdbc/DataUploader.java b/src/main/java/com/aliyun/odps/jdbc/DataUploader.java
new file mode 100644
index 00000000..599ed4df
--- /dev/null
+++ b/src/main/java/com/aliyun/odps/jdbc/DataUploader.java
@@ -0,0 +1,116 @@
+package com.aliyun.odps.jdbc;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.aliyun.odps.Column;
+import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.PartitionSpec;
+import com.aliyun.odps.Table;
+import com.aliyun.odps.TableSchema;
+import com.aliyun.odps.data.ArrayRecord;
+import com.aliyun.odps.jdbc.utils.transformer.to.odps.AbstractToOdpsTransformer;
+import com.aliyun.odps.jdbc.utils.transformer.to.odps.ToOdpsTransformerFactory;
+import com.aliyun.odps.tunnel.TableTunnel;
+import com.aliyun.odps.tunnel.TunnelException;
+import com.aliyun.odps.utils.StringUtils;
+
+public abstract class DataUploader {
+
+  protected String projectName;
+  protected String schemaName;
+  protected String tableName;
+  protected PartitionSpec partitionSpec;
+  protected List<String> specificColumns;
+  protected OdpsConnection conn;
+
+  protected TableTunnel tunnel;
+  protected Table table;
+  protected TableSchema tableSchema;
+  protected ArrayRecord reuseRecord;
+
+  public DataUploader(String projectName,
+                      String schemaName,
+                      String tableName,
+                      String partitionSpec,
+                      List<String> specificColumns,
+                      OdpsConnection conn)
+      throws OdpsException, IOException {
+    this.projectName = projectName;
+    this.schemaName = schemaName;
+    this.tableName = tableName;
+    if (!StringUtils.isNullOrEmpty(partitionSpec)) {
+      this.partitionSpec = new PartitionSpec(partitionSpec);
+    }
+    this.specificColumns = specificColumns;
+    this.conn = conn;
+
+    tunnel = new TableTunnel(conn.getOdps());
+    table = conn.getOdps().tables().get(projectName, tableName);
+    if (this.partitionSpec != null && !table.hasPartition(this.partitionSpec)) {
+      table.createPartition(this.partitionSpec);
+    }
+    setUpSession();
+
+    if (specificColumns == null) {
+      this.specificColumns = tableSchema.getColumns().stream().map(Column::getName).collect(Collectors.toList());
+    }
+  }
+
+  public static DataUploader build(String projectName,
+                                   String schemaName,
+                                   String tableName,
+                                   String partitionSpec,
+                                   List<String> specificColumns,
+                                   OdpsConnection conn) throws OdpsException, IOException {
+    Table table = conn.getOdps().tables().get(projectName, tableName);
+    if (table.isTransactional() && table.getPrimaryKey() != null && !table.getPrimaryKey().isEmpty()) {
+      return new AcidTableUploader(projectName, schemaName, tableName, partitionSpec, specificColumns, conn);
+    } else {
+      return new BasicTableUploader(projectName, schemaName, tableName, partitionSpec, specificColumns, conn);
+    }
+  }
+
+  protected abstract void setUpSession() throws OdpsException, IOException;
+
+
+  public int[] upload(List<Object[]> batchedRows) throws SQLException {
+
+    int batchedSize = batchedRows.size();
+    if (batchedSize == 0) {
+      return new int[0];
+    }
+
+    conn.log.info(batchedSize + " records are going to be uploaded to table "
+                  + projectName + "." + tableName + " in batch");
+
+    int[] updateCounts = new int[batchedSize];
+    Arrays.fill(updateCounts, -1);
+
+    try {
+      upload(batchedRows, batchedSize, updateCounts);
+    } catch (Exception e) {
+      throw new SQLException(e);
+      // TODO: refine error handling
+    }
+
+    return updateCounts;
+  }
+
+  protected abstract void upload(List<Object[]> batchedRows, int batchedSize, int[] updateCounts)
+      throws OdpsException, IOException, SQLException;
+
+  protected void setReusedRecord(Object[] row, TableSchema schema) throws SQLException {
+    for (int i = 0; i < specificColumns.size(); i++) {
+      String columnName = specificColumns.get(i);
+      AbstractToOdpsTransformer transformer = ToOdpsTransformerFactory.getTransformer(
+          schema.getColumn(columnName).getTypeInfo().getOdpsType());
+      reuseRecord.set(columnName, transformer.transform(row[i], conn.getCharset()));
+    }
+  }
+
+  public abstract void commit() throws TunnelException, IOException;
+}
diff --git a/src/main/java/com/aliyun/odps/jdbc/OdpsPreparedStatement.java b/src/main/java/com/aliyun/odps/jdbc/OdpsPreparedStatement.java
index e16a3e86..5d37f816 100644
--- a/src/main/java/com/aliyun/odps/jdbc/OdpsPreparedStatement.java
+++ b/src/main/java/com/aliyun/odps/jdbc/OdpsPreparedStatement.java
@@ -21,25 +21,12 @@
 package com.aliyun.odps.jdbc;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.Reader;
 import java.io.UnsupportedEncodingException;
 import java.math.BigDecimal;
-import java.net.URL;
-import java.sql.Array;
-import java.sql.Blob;
-import java.sql.Clob;
 import java.sql.Date;
-import java.sql.NClob;
-import java.sql.ParameterMetaData;
-import java.sql.PreparedStatement;
-import java.sql.Ref;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
-import java.sql.RowId;
 import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
-import java.sql.SQLXML;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.sql.Types;
@@ -53,32 +40,20 @@
 import java.util.Calendar;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-import com.aliyun.odps.Column;
 import com.aliyun.odps.OdpsException;
-import com.aliyun.odps.PartitionSpec;
-import com.aliyun.odps.Table;
-import com.aliyun.odps.TableSchema;
-import com.aliyun.odps.data.ArrayRecord;
 import com.aliyun.odps.data.Binary;
 import com.aliyun.odps.data.Char;
 import com.aliyun.odps.data.Varchar;
 import com.aliyun.odps.jdbc.utils.JdbcColumn;
-import com.aliyun.odps.jdbc.utils.transformer.to.odps.AbstractToOdpsTransformer;
-import com.aliyun.odps.jdbc.utils.transformer.to.odps.ToOdpsTransformerFactory;
 import com.aliyun.odps.sqa.commandapi.utils.SqlParserUtil;
-import com.aliyun.odps.tunnel.TableTunnel;
 import com.aliyun.odps.tunnel.TunnelException;
-import com.aliyun.odps.tunnel.io.TunnelRecordWriter;
-import com.aliyun.odps.type.TypeInfo;
-import com.aliyun.odps.utils.StringUtils;
 
-public class OdpsPreparedStatement extends OdpsStatement implements PreparedStatement {
+public class OdpsPreparedStatement extends AbstractOdpsPreparedStatement {
 
   private final String TABLE_NAME = "((\\w+\\.)?\\w+)"; // "proj.name" or "name"
   private final String PREP_VALUES = "\\((\\s*\\?\\s*)(,\\s*\\?\\s*)*\\)"; // "(?)" or "(?,?,...)"
@@ -130,17 +105,18 @@
    */
   private final String sql;
 
-  private boolean verified = false;
+  private boolean parsed = false;
 
   private String tableBatchInsertTo;
+  private String projectName;
+  private String schemaName;
+  private String tableName;
 
   private String partitionSpec;
   private List<String> specificColumns;
   private int parametersNum;
 
-  TableTunnel.UploadSession session;
-  ArrayRecord reuseRecord;
-  int blocks;
+  private DataUploader uploader;
 
   /**
    * The parameters for the prepared sql (index=>parameter). The parameter is stored as Java objects
@@ -207,73 +183,51 @@ public void clearBatch() throws SQLException {
    */
   @Override
   public int[] executeBatch() throws SQLException {
-    if (!verified) {
-      boolean withSpecPartition = sql.matches(PREP_INSERT_WITH_SPEC_PARTITION);
-      boolean withoutSpecPartition = sql.matches(PREP_INSERT_WITHOUT_SPEC_PARTITION);
+    if (!parsed) {
+      parse();
+    }
 
-      if (!withoutSpecPartition && !withSpecPartition) {
-        throw new SQLException("batched statement only support following syntax: " + EXAMPLE);
-      }
+    int[] updateCounts = uploader.upload(batchedRows);
+    clearBatch();
+    return updateCounts;
+  }
 
-      if (withoutSpecPartition) {
-        setSession(Pattern.compile(PREP_INSERT_WITHOUT_SPEC_PARTITION).matcher(sql), false);
-      }
+  private void parse() throws SQLException {
 
-      if (withSpecPartition) {
-        setSession(Pattern.compile(PREP_INSERT_WITH_SPEC_PARTITION).matcher(sql), true);
-      }
-    }
+    boolean withSpecPartition = sql.matches(PREP_INSERT_WITH_SPEC_PARTITION);
+    boolean withoutSpecPartition = sql.matches(PREP_INSERT_WITHOUT_SPEC_PARTITION);
 
-    int batchedSize = batchedRows.size();
-    // if no sql is batched, just return
-    if (batchedSize == 0) {
-      return new int[0];
+    if (!withoutSpecPartition && !withSpecPartition) {
+      throw new SQLException("batched statement only support following syntax: " + EXAMPLE);
     }
 
-    getConnection().log
-        .info(batchedSize + " records are going to be uploaded to table " + tableBatchInsertTo
-            + " in batch");
-
-    int[] updateCounts = new int[batchedSize];
-    Arrays.fill(updateCounts, -1);
-
-    long startTime = System.currentTimeMillis();
-    try(TunnelRecordWriter recordWriter = (TunnelRecordWriter) session.openRecordWriter(blocks, true);) {
-      Map<String, TypeInfo> columnTypeMap =
-          session.getSchema().getColumns().stream()
-              .collect(Collectors.toMap(Column::getName, Column::getTypeInfo));
-      for (int i = 0; i < batchedSize; i++) {
-        Object[] row = batchedRows.get(i);
-        for (int j = 0; j < specificColumns.size(); j++) {
-          String columnName = specificColumns.get(j);
-          AbstractToOdpsTransformer transformer =
-              ToOdpsTransformerFactory.getTransformer(columnTypeMap.get(columnName).getOdpsType());
-          reuseRecord.set(columnName, transformer.transform(row[j], getConnection().getCharset()));
-        }
-        recordWriter.write(reuseRecord);
-        updateCounts[i] = 1;
-      }
+    Matcher matcher = null;
+    boolean hasPartition = false;
 
-      long duration = System.currentTimeMillis() - startTime;
-      float megaBytesPerSec = (float) recordWriter.getTotalBytes() / 1024 / 1024 / duration * 1000;
-      getConnection().log.info(
-          String.format("It took me %d ms to insert %d records [%d], %.2f MiB/s", duration,
-              batchedSize,
-              blocks, megaBytesPerSec));
-      blocks += 1;
-    } catch (TunnelException | IOException e) {
-      throw new SQLException(e);
+    if (withoutSpecPartition) {
+      matcher = Pattern.compile(PREP_INSERT_WITHOUT_SPEC_PARTITION).matcher(sql);
+      hasPartition = false;
    }
-    clearBatch();
-    return updateCounts;
-  }
 
-  private void setSession(Matcher matcher, boolean hasPartition) throws SQLException {
+    if (withSpecPartition) {
+      matcher = Pattern.compile(PREP_INSERT_WITH_SPEC_PARTITION).matcher(sql);
+      hasPartition = true;
+    }
+
+    if (matcher.find()) {
      tableBatchInsertTo = matcher.group(1);
      if (hasPartition) {
        partitionSpec = matcher.group(4);
      }
+      if (tableBatchInsertTo.contains(".")) {
+        String[] splited = tableBatchInsertTo.split("\\.");
+        projectName = splited[0];
+        tableName = splited[1];
+      } else {
+        projectName = getConnection().getOdps().getDefaultProject();
+        tableName = tableBatchInsertTo;
+      }
    } else {
      throw new SQLException("cannot extract table name or partition name in SQL: " + sql);
    }
@@ -290,51 +244,16 @@ private void setSession(Matcher matcher, boolean hasPartition) throws SQLExcepti
      this.specificColumns = specificColumns;
    }
 
-    TableTunnel tunnel = new TableTunnel(getConnection().getOdps());
-    // TODO three-tier model (project.schema.table)
     try {
-      if (tableBatchInsertTo.contains(".")) {
-        String[] splited = tableBatchInsertTo.split("\\.");
-        String projectName = splited[0];
-        String tableName = splited[1];
-
-        if (hasPartition && !StringUtils.isNullOrEmpty(partitionSpec)) {
-          Table table = getConnection().getOdps().tables().get(projectName, tableName);
-          PartitionSpec partition = new PartitionSpec(partitionSpec);
-          if (!table.hasPartition(partition)) {
-            table.createPartition(partition);
-          }
-          session = tunnel.createUploadSession(projectName, tableName, partition);
-        } else {
-          session = tunnel.createUploadSession(projectName, tableName);
-        }
-      } else {
-        String defaultProject = getConnection().getOdps().getDefaultProject();
-        if (hasPartition && !StringUtils.isNullOrEmpty(partitionSpec)) {
-          Table table = getConnection().getOdps().tables().get(defaultProject, tableBatchInsertTo);
-          PartitionSpec partition = new PartitionSpec(partitionSpec);
-          if (!table.hasPartition(partition)) {
-            table.createPartition(partition);
-          }
-          session = tunnel.createUploadSession(defaultProject, tableBatchInsertTo, partition);
-        } else {
-          session = tunnel.createUploadSession(defaultProject, tableBatchInsertTo);
-        }
-      }
-    } catch (TunnelException e) {
-      throw new SQLException(e);
+      uploader = DataUploader.build(projectName, schemaName, tableName, partitionSpec,
+                                    specificColumns, getConnection());
     } catch (OdpsException e) {
       throw new RuntimeException(e);
+    } catch (IOException e) {
+      throw new SQLException(e);
     }
-    getConnection().log.info("create upload session id=" + session.getId());
-    reuseRecord = (ArrayRecord) session.newRecord();
-    if (specificColumns == null) {
-      TableSchema schema = session.getSchema();
-      this.specificColumns =
-          schema.getColumns().stream().map(Column::getName).collect(Collectors.toList());
-    }
-    blocks = 0;
-    verified = true;
+
+    parsed = true;
   }
 
   // Commit on close
@@ -343,17 +262,11 @@ public void close() throws SQLException {
     if (isClosed()) {
       return;
     }
-    if (session != null && blocks > 0) {
-      Long[] blockList = new Long[blocks];
-      getConnection().log.info("commit session: " + blocks + " blocks");
-      for (int i = 0; i < blocks; i++) {
-        blockList[i] = Long.valueOf(i);
-      }
+
+    if (uploader != null) {
       try {
-        session.commit(blockList);
-      } catch (TunnelException e) {
-        throw new SQLException(e);
-      } catch (IOException e) {
+        uploader.commit();
+      } catch (TunnelException | IOException e) {
         throw new SQLException(e);
       }
     }
@@ -405,143 +318,11 @@ public ResultSetMetaData getMetaData() throws SQLException {
     return getResultSet().getMetaData();
   }
 
-  @Override
-  public ParameterMetaData getParameterMetaData() throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setArray(int parameterIndex, Array x) throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setAsciiStream(int parameterIndex, InputStream x, int length)
-      throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setAsciiStream(int parameterIndex, InputStream x, long length)
-      throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setBinaryStream(int parameterIndex, InputStream x, int length)
-      throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setBinaryStream(int parameterIndex, InputStream x, long length)
-      throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setBlob(int parameterIndex, Blob x) throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setBlob(int parameterIndex, InputStream inputStream, long length)
-      throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
   @Override
   public void setBytes(int parameterIndex, byte[] x) throws SQLException {
     parameters.put(parameterIndex, x);
   }
 
-  @Override
-  public void setCharacterStream(int parameterIndex, Reader reader)
-      throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setCharacterStream(int parameterIndex, Reader reader, int length)
-      throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setCharacterStream(int parameterIndex, Reader reader, long length)
-      throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setClob(int parameterIndex, Clob x) throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setClob(int parameterIndex, Reader reader) throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setClob(int parameterIndex, Reader reader, long length)
-      throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setNCharacterStream(int parameterIndex, Reader value)
-      throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setNCharacterStream(int parameterIndex, Reader value, long length)
-      throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setNClob(int parameterIndex, NClob value) throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setNClob(int parameterIndex, Reader reader) throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setNClob(int parameterIndex, Reader reader, long length)
-      throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setNString(int parameterIndex, String value) throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
   @Override
   public void setNull(int parameterIndex, int sqlType, String typeName)
      throws SQLException {
@@ -673,55 +454,6 @@ public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
     parameters.put(parameterIndex, x);
   }
 
-  @Override
-  public void setObject(int parameterIndex, Object x, int targetSqlType)
-      throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength)
-      throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setRef(int parameterIndex, Ref x) throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setRowId(int parameterIndex, RowId x) throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal)
-      throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setURL(int parameterIndex, URL x) throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
-  @Override
-  public void setUnicodeStream(int parameterIndex, InputStream x, int length)
-      throws SQLException {
-    throw new SQLFeatureNotSupportedException();
-  }
-
   /**
    * Returns a new sql replacing the '?'s in the prepared sql to parameters.
    */
diff --git a/src/test/java/com/aliyun/odps/jdbc/OdpsPreparedStatementTest.java b/src/test/java/com/aliyun/odps/jdbc/OdpsPreparedStatementTest.java
index 2718542e..dfa530f4 100644
--- a/src/test/java/com/aliyun/odps/jdbc/OdpsPreparedStatementTest.java
+++ b/src/test/java/com/aliyun/odps/jdbc/OdpsPreparedStatementTest.java
@@ -601,4 +601,57 @@ public void testSqlWithConstantMark() throws SQLException {
       Assert.assertEquals("test", resultSet.getString(1));
     }
   }
+
+  @Test
+  public void testAcid2Table() throws SQLException {
+    Connection connection = TestManager.getInstance().conn;
+    Statement ddl = connection.createStatement();
+    ddl.executeUpdate("drop table if exists acid_table;");
+    ddl.executeUpdate(
+        "create table acid_table(c1 string not null primary key) tblproperties (\"transactional\"=\"true\");");
+    ddl.close();
+
+    PreparedStatement ps = connection.prepareStatement(
+        "insert into acid_table values (?);");
+    ps.setString(1, "str");
+    ps.addBatch();
+    ps.executeBatch();
+
+    ps.setString(1, "str2");
+    ps.addBatch();
+    ps.executeBatch();
+
+    ps.close();
+
+    ResultSet resultSet = runQuery("select count(*) from acid_table;");
+    resultSet.next();
+    Assert.assertEquals(2, resultSet.getInt(1));
+
+  }
+
+  @Test
+  public void testAcidTable() throws SQLException {
+    Connection connection = TestManager.getInstance().conn;
+    Statement ddl = connection.createStatement();
+    ddl.executeUpdate("drop table if exists acid1_table;");
+    ddl.executeUpdate(
+        "create table acid1_table(c1 string) tblproperties (\"transactional\"=\"true\");");
+    ddl.close();
+    PreparedStatement ps = connection.prepareStatement("insert into acid1_table values (?);");
+    ps.setString(1, "str");
+    ps.addBatch();
+    ps.executeBatch();
+    ps.setString(1, "str2");
+    ps.addBatch();
+    ps.executeBatch();
+    ps.close();
+    ResultSet resultSet = runQuery("select count(*) from acid1_table;");
+    resultSet.next();
+    Assert.assertEquals(2, resultSet.getInt(1));
+  }
+
+  ResultSet runQuery(String sql) throws SQLException {
+    Statement statement = TestManager.getInstance().conn.createStatement();
+    return statement.executeQuery(sql);
+  }
 }
\ No newline at end of file
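
Usage sketch (not part of the diff above): with this change, a batched insert into a transactional table that has a primary key is routed through the tunnel upsert path (AcidTableUploader), while plain tables keep the block upload path (BasicTableUploader), and the session is committed when the statement is closed. A minimal client could look like the following; the JDBC URL, endpoint, and credentials are placeholders, and the table is assumed to be created as in testAcid2Table.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class Acid2BatchInsertExample {

  public static void main(String[] args) throws Exception {
    Class.forName("com.aliyun.odps.jdbc.OdpsDriver");

    // Placeholder connection string; substitute a real endpoint, project, and keys.
    String url = "jdbc:odps:<endpoint>?project=<project>&accessId=<access_id>&accessKey=<access_key>";

    try (Connection conn = DriverManager.getConnection(url)) {
      // Assumed DDL (from testAcid2Table):
      //   create table acid_table(c1 string not null primary key)
      //     tblproperties ("transactional"="true");
      PreparedStatement ps = conn.prepareStatement("insert into acid_table values (?);");
      ps.setString(1, "str");
      ps.addBatch();
      ps.setString(1, "str2");
      ps.addBatch();
      // executeBatch() parses the insert and uploads through DataUploader.build(),
      // which picks AcidTableUploader for primary-key transactional tables.
      ps.executeBatch();
      // close() commits the underlying upload/upsert session.
      ps.close();
    }
  }
}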