Skip to content

Commit

Permalink
[to #56334367] support upload to acid2 table
Browse files Browse the repository at this point in the history
## 概述
这次代码变更主要优化了`OdpsPreparedStatement`类,增加了对ACID表的支持,以及改进了批量插入数据的方式。新版本能够处理包含项目名、模式名和表名的插入语句,并引入了一个新的`DataUploader`接口来处理数据上传到ODPS的过程。此外,还移除了不支持的方法以简化代码并提高效率。

Link: https://code.alibaba-inc.com/odps/aliyun-odps-jdbc/codereview/16572699
  • Loading branch information
idleyui authored and dingxin-tech committed May 7, 2024
1 parent 2757415 commit a8d1225
Show file tree
Hide file tree
Showing 7 changed files with 584 additions and 316 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
</distributionManagement>

<properties>
- <sdk.version>0.45.2-public</sdk.version>
+ <sdk.version>0.48.1-public-SNAPSHOT</sdk.version>
<junit.version>4.11</junit.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jdk.version>1.8</jdk.version>
Expand Down
234 changes: 234 additions & 0 deletions src/main/java/com/aliyun/odps/jdbc/AbstractOdpsPreparedStatement.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package com.aliyun.odps.jdbc;

import java.io.InputStream;
import java.io.Reader;
import java.net.URL;
import java.sql.Array;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Date;
import java.sql.NClob;
import java.sql.ParameterMetaData;
import java.sql.PreparedStatement;
import java.sql.Ref;
import java.sql.RowId;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLXML;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Calendar;

/**
 * Base class collecting the {@link PreparedStatement} operations that are rejected outright.
 *
 * <p>Every method declared here throws {@link SQLFeatureNotSupportedException} without
 * inspecting its arguments. The remaining {@link PreparedStatement} methods are left for
 * concrete subclasses to implement.
 */
public abstract class AbstractOdpsPreparedStatement extends OdpsStatement
    implements PreparedStatement {

  /**
   * Creates a statement bound to the given connection.
   *
   * @param conn the connection handed to the {@code OdpsStatement} constructor
   */
  AbstractOdpsPreparedStatement(OdpsConnection conn) {
    super(conn);
  }

  /**
   * Creates a statement bound to the given connection.
   *
   * @param conn the connection handed to the {@code OdpsStatement} constructor
   * @param isResultSetScrollable forwarded unchanged to the {@code OdpsStatement} constructor
   */
  AbstractOdpsPreparedStatement(OdpsConnection conn, boolean isResultSetScrollable) {
    super(conn, isResultSetScrollable);
  }

  // ---------------------------------------------------------------------------------------
  // Metadata
  // ---------------------------------------------------------------------------------------

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public ParameterMetaData getParameterMetaData() throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  // ---------------------------------------------------------------------------------------
  // Array and stream based setters
  // ---------------------------------------------------------------------------------------

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setArray(int parameterIndex, Array x) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  // ---------------------------------------------------------------------------------------
  // BLOB / CLOB / character stream setters
  // ---------------------------------------------------------------------------------------

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setBlob(int parameterIndex, Blob x) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setCharacterStream(int parameterIndex, Reader reader, int length) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setClob(int parameterIndex, Clob x) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setClob(int parameterIndex, Reader reader) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setClob(int parameterIndex, Reader reader, long length) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  // ---------------------------------------------------------------------------------------
  // Calendar-aware date setter
  // ---------------------------------------------------------------------------------------

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  // ---------------------------------------------------------------------------------------
  // National character set setters
  // ---------------------------------------------------------------------------------------

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setNClob(int parameterIndex, NClob value) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setNClob(int parameterIndex, Reader reader) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setNString(int parameterIndex, String value) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  // ---------------------------------------------------------------------------------------
  // Typed setObject overloads
  // ---------------------------------------------------------------------------------------

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setObject(
      int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  // ---------------------------------------------------------------------------------------
  // Remaining SQL types
  // ---------------------------------------------------------------------------------------

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setRef(int parameterIndex, Ref x) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setRowId(int parameterIndex, RowId x) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setURL(int parameterIndex, URL x) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }

  /** Not supported; always throws {@link SQLFeatureNotSupportedException}. */
  @Override
  public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException {
    throw new SQLFeatureNotSupportedException();
  }
}
60 changes: 60 additions & 0 deletions src/main/java/com/aliyun/odps/jdbc/AcidTableUploader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.aliyun.odps.jdbc;

import static java.lang.String.format;

import java.io.IOException;
import java.sql.SQLException;
import java.util.List;

import com.aliyun.odps.OdpsException;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.tunnel.impl.UpsertRecord;
import com.aliyun.odps.tunnel.streams.UpsertStream;

/**
 * {@link DataUploader} that writes rows through a tunnel upsert session.
 *
 * <p>{@link #setUpSession()} builds the session, each {@code upload} call pushes one batch
 * through its own {@link UpsertStream}, and {@link #commit()} commits the session.
 */
public class AcidTableUploader extends DataUploader {

  /** Upsert session assigned by {@link #setUpSession()}; {@code null} until then. */
  private TableTunnel.UpsertSession upsertSession;

  /**
   * Forwards all arguments unchanged to the {@link DataUploader} constructor.
   *
   * @throws OdpsException propagated from the superclass constructor
   * @throws IOException propagated from the superclass constructor
   */
  public AcidTableUploader(String projectName,
                           String schemaName,
                           String tableName,
                           String partitionSpec,
                           List<String> specificColumns,
                           OdpsConnection conn) throws OdpsException, IOException {
    super(projectName, schemaName, tableName, partitionSpec, specificColumns, conn);
  }

  /**
   * Builds the upsert session and initialises the reusable record and table schema from it.
   *
   * @throws OdpsException if the session cannot be built
   * @throws IOException if the session cannot be built
   */
  protected void setUpSession() throws OdpsException, IOException {
    // NOTE(review): only the project and table name are handed to the builder; the schema
    // name given to the constructor is not applied here - confirm whether that is intended.
    TableTunnel.UpsertSession.Builder builder = tunnel.buildUpsertSession(projectName, tableName);

    if (null != partitionSpec) {
      builder.setPartitionSpec(partitionSpec);
    }

    upsertSession = builder.build();
    conn.log.info("create upsert session id=" + upsertSession.getId());
    reuseRecord = (UpsertRecord) upsertSession.newRecord();
    tableSchema = upsertSession.getSchema();
  }

  /**
   * Upserts every row of {@code data} through a freshly built {@link UpsertStream} and marks
   * each written row with an update count of 1.
   *
   * <p>The stream is closed on both the success and the failure path. When writing fails, an
   * error raised while closing is attached to the original exception as a suppressed
   * exception, so the root cause is what the caller sees.
   *
   * @param data rows to write, one {@code Object[]} per row
   * @param batchSize not read by this implementation; the loop is bounded by
   *     {@code data.size()} (NOTE(review): presumably both are equal - verify against caller)
   * @param updateCounts receives 1 at index {@code i} once row {@code i} has been passed to
   *     the stream; must hold at least {@code data.size()} entries
   */
  protected void upload(List<Object[]> data, int batchSize, int[] updateCounts)
      throws OdpsException, IOException, SQLException {
    UpsertStream stream = upsertSession.buildUpsertStream().build();

    try {
      for (int i = 0; i < data.size(); i++) {
        Object[] row = data.get(i);
        setReusedRecord(row, tableSchema);
        stream.upsert(reuseRecord);
        updateCounts[i] = 1;
      }
    } catch (Throwable failure) {
      // Release the stream without letting a secondary close error hide the real cause.
      try {
        stream.close();
      } catch (Exception closeFailure) {
        failure.addSuppressed(closeFailure);
      }
      throw failure;
    }

    stream.close();
  }

  /**
   * Commits the upsert session; does nothing when no session has been set up.
   *
   * @throws TunnelException if the commit fails
   * @throws IOException if the commit fails
   */
  public void commit() throws TunnelException, IOException {
    if (upsertSession != null) {
      upsertSession.commit(false);
    }
  }
}
73 changes: 73 additions & 0 deletions src/main/java/com/aliyun/odps/jdbc/BasicTableUploader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.aliyun.odps.jdbc;

import static java.lang.String.format;

import java.io.IOException;
import java.sql.SQLException;
import java.util.List;

import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.tunnel.io.TunnelRecordWriter;

/**
 * {@link DataUploader} that writes rows through a tunnel upload session, one block per
 * {@code upload} call, and commits all written blocks together in {@link #commit()}.
 */
public class BasicTableUploader extends DataUploader {

  /** Upload session assigned by {@link #setUpSession()}; {@code null} until then. */
  private TableTunnel.UploadSession uploadSession;

  /** Number of blocks written and closed so far; also the id used for the next block. */
  private int blocks = 0;

  /**
   * Forwards all arguments unchanged to the {@link DataUploader} constructor.
   *
   * @throws OdpsException propagated from the superclass constructor
   * @throws IOException propagated from the superclass constructor
   */
  public BasicTableUploader(String projectName, String schemaName, String tableName,
                            String partitionSpec, List<String> specificColumns,
                            OdpsConnection conn)
      throws OdpsException, IOException {
    super(projectName, schemaName, tableName, partitionSpec, specificColumns, conn);
  }

  /**
   * Creates the upload session (partition-scoped when a partition spec is present) and
   * initialises the reusable record and table schema from it.
   *
   * @throws OdpsException if the session cannot be created
   */
  public void setUpSession() throws OdpsException {
    if (null != partitionSpec) {
      uploadSession = tunnel.createUploadSession(projectName, tableName, partitionSpec);
    } else {
      uploadSession = tunnel.createUploadSession(projectName, tableName);
    }

    conn.log.info("create upload session id=" + uploadSession.getId());
    reuseRecord = (ArrayRecord) uploadSession.newRecord();
    tableSchema = uploadSession.getSchema();
  }

  /**
   * Writes the first {@code batchedSize} rows of {@code batchedRows} as one new block.
   *
   * <p>The block counter is advanced only after the writer has been closed without error, so
   * a block whose upload failed is never listed by {@link #commit()} and its id is used again
   * by the next call. The throughput log line is likewise produced after the close, so the
   * measured duration covers the close step as well.
   *
   * @param batchedRows rows to write, one {@code Object[]} per row
   * @param batchedSize number of rows to take from {@code batchedRows}
   * @param updateCounts receives 1 at index {@code i} once row {@code i} has been handed to
   *     the writer; must hold at least {@code batchedSize} entries
   */
  protected void upload(List<Object[]> batchedRows, int batchedSize, int[] updateCounts)
      throws OdpsException, IOException, SQLException {

    long startTime = System.currentTimeMillis();

    // Kept in an outer variable so the byte count can still be read once the writer is closed.
    TunnelRecordWriter recordWriter =
        (TunnelRecordWriter) uploadSession.openRecordWriter(blocks, true);
    try (TunnelRecordWriter writer = recordWriter) {
      for (int i = 0; i < batchedSize; i++) {
        Object[] row = batchedRows.get(i);
        setReusedRecord(row, tableSchema);
        writer.write(reuseRecord);
        updateCounts[i] = 1;
      }
    }

    long duration = System.currentTimeMillis() - startTime;
    // Treat a sub-millisecond upload as 1 ms so the log never shows Infinity or NaN.
    float megaBytesPerSec =
        (float) recordWriter.getTotalBytes() / 1024 / 1024 / Math.max(duration, 1L) * 1000;
    conn.log.info(format("It took me %d ms to insert %d records [%d], %.2f MiB/s", duration,
                         batchedSize,
                         blocks, megaBytesPerSec));
    blocks += 1;
  }

  /**
   * Commits block ids {@code 0 .. blocks-1}; does nothing when there is no session or no
   * block has been written.
   *
   * @throws TunnelException if the commit fails
   * @throws IOException if the commit fails
   */
  public void commit() throws TunnelException, IOException {
    if (uploadSession != null && blocks > 0) {
      Long[] blockList = new Long[blocks];
      conn.log.info("commit session: " + blocks + " blocks");
      for (int i = 0; i < blocks; i++) {
        blockList[i] = Long.valueOf(i);
      }
      uploadSession.commit(blockList);
    }
  }
}
Loading

0 comments on commit a8d1225

Please sign in to comment.