diff --git a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/CommonRdbmsReader.java b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/CommonRdbmsReader.java index 8dcc20c74..89b2129d9 100644 --- a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/CommonRdbmsReader.java +++ b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/CommonRdbmsReader.java @@ -166,7 +166,7 @@ public void init(Configuration readerSliceConfig) } public void startRead(Configuration readerSliceConfig, RecordSender recordSender, - TaskPluginCollector taskPluginCollector, int fetchSize) + TaskPluginCollector taskPluginCollector, int fetchSize) { String querySql = readerSliceConfig.getString(Key.QUERY_SQL); @@ -223,7 +223,7 @@ public void destroy(Configuration originalConfig) } protected void transportOneRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, - int columnNumber, TaskPluginCollector taskPluginCollector) + int columnNumber, TaskPluginCollector taskPluginCollector) { Record record = buildRecord(recordSender, rs, metaData, columnNumber, taskPluginCollector); recordSender.sendToWriter(record); @@ -272,7 +272,12 @@ protected Column createColumn(ResultSet rs, ResultSetMetaData metaData, int i) return new DateColumn(rs.getDate(i)); case Types.TIMESTAMP: - return new TimestampColumn(rs.getTimestamp(i, Calendar.getInstance())); + if(!"org.apache.hive.jdbc.HiveDriver".equals(this.dataBaseType.getDriverClassName())){ + return new TimestampColumn(rs.getTimestamp(i, Calendar.getInstance())); + }else{ + //hive not support method(Timestamp getTimestamp(int columnIndex, Calendar cal)) + return new TimestampColumn(rs.getTimestamp(i)); + } case Types.BINARY: case Types.VARBINARY: @@ -311,7 +316,7 @@ protected Column createColumn(ResultSet rs, ResultSetMetaData metaData, int i) } protected Record buildRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnNumber, - TaskPluginCollector taskPluginCollector) + TaskPluginCollector taskPluginCollector) { Record record = recordSender.createRecord(); diff --git a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/OriginalConfPretreatmentUtil.java b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/OriginalConfPretreatmentUtil.java index b087d228b..9180a57cb 100644 --- a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/OriginalConfPretreatmentUtil.java +++ b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/OriginalConfPretreatmentUtil.java @@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import static com.wgzhao.addax.common.spi.ErrorCode.CONFIG_ERROR; @@ -118,7 +119,11 @@ private static void dealJdbcAndTable(Configuration originalConfig) String driverClass = connConf.getString(Key.JDBC_DRIVER, null); if (driverClass != null && !driverClass.isEmpty()) { LOG.warn("use specified driver class: {}", driverClass); - dataBaseType.setDriverClassName(driverClass); + + + Arrays.stream(DataBaseType.values()).filter( + d -> d.getDriverClassName().equals(driverClass)).findFirst().ifPresent(d -> + dataBaseType=d); } connConf.getNecessaryValue(Key.JDBC_URL, REQUIRED_VALUE); diff --git a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/util/DataBaseType.java b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/util/DataBaseType.java index caac6b37c..55e4bd727 100644 --- a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/util/DataBaseType.java +++ b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/util/DataBaseType.java @@ -50,7 +50,15 @@ public enum DataBaseType Sybase("sybase", "com.sybase.jdbc4.jdbc.SybDriver"), Databend("databend", "com.databend.jdbc.DatabendDriver"), Access("access","net.ucanaccess.jdbc.UcanaccessDriver"), - HANA("hana", "com.sap.db.jdbc.Driver"); + HANA("hana", "com.sap.db.jdbc.Driver"), + VERTICA("vertica", "com.vertica.jdbc.Driver"), + DM("dm","dm.jdbc.driver.DmDriver"), + OSCAR("oscar","com.oscar.Driver"), + KINGBASE8("kingbase8","com.kingbase8.Driver"), + HIGHGO("highgo","com.highgo.jdbc.Driver"), + OCEANBASE("oceanbase","com.alipay.oceanbase.jdbc.Driver"), + GOLDENDB("goldendb","com.goldendb.jdbc.Driver"), + GBASEDBT("gbasedbt-sqli","com.gbasedbt.jdbc.Driver"); private static final Pattern jdbcUrlPattern = Pattern.compile("jdbc:\\w+:(?:thin:url=|//|thin:@|)([\\w\\d.,]+).*"); @@ -162,8 +170,4 @@ public String getTypeName() return typeName; } - public void setDriverClassName(String driverClassName) - { - this.driverClassName = driverClassName; - } } diff --git a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/writer/util/OriginalConfPretreatmentUtil.java b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/writer/util/OriginalConfPretreatmentUtil.java index 0bc49d743..cc953df08 100644 --- a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/writer/util/OriginalConfPretreatmentUtil.java +++ b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/writer/util/OriginalConfPretreatmentUtil.java @@ -38,6 +38,7 @@ import java.sql.Connection; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import static com.wgzhao.addax.common.spi.ErrorCode.CONFIG_ERROR; @@ -92,7 +93,9 @@ public static void simplifyConf(Configuration originalConfig) String driverClass = connConf.getString(Key.JDBC_DRIVER, null); if (driverClass != null && !driverClass.isEmpty()) { LOG.warn("Use specified driver class [{}]", driverClass); - dataBaseType.setDriverClassName(driverClass); + Arrays.stream(DataBaseType.values()).filter( + d -> d.getDriverClassName().equals(driverClass)).findFirst().ifPresent(d -> + dataBaseType=d); } String jdbcUrl = connConf.getString(Key.JDBC_URL); if (StringUtils.isBlank(jdbcUrl)) { diff --git a/plugin/reader/rdbmsreader/src/main/java/com/wgzhao/addax/plugin/reader/rdbmsreader/RdbmsReader.java b/plugin/reader/rdbmsreader/src/main/java/com/wgzhao/addax/plugin/reader/rdbmsreader/RdbmsReader.java index 3e3d30fd7..c3e59fe0c 100644 --- a/plugin/reader/rdbmsreader/src/main/java/com/wgzhao/addax/plugin/reader/rdbmsreader/RdbmsReader.java +++ b/plugin/reader/rdbmsreader/src/main/java/com/wgzhao/addax/plugin/reader/rdbmsreader/RdbmsReader.java @@ -40,7 +40,7 @@ public class RdbmsReader extends Reader { - private static final DataBaseType DATABASE_TYPE = DataBaseType.RDBMS; + private static DataBaseType DATABASE_TYPE = DataBaseType.RDBMS; public static class Job extends Reader.Job @@ -71,11 +71,13 @@ public void init() final String jdbcType = connection.getString(Key.JDBC_URL).split(":")[1]; Arrays.stream(DataBaseType.values()).filter( dataBaseType -> dataBaseType.getTypeName().equals(jdbcType)).findFirst().ifPresent(dataBaseType -> - DATABASE_TYPE.setDriverClassName(dataBaseType.getDriverClassName())); + DATABASE_TYPE=dataBaseType); } else { // use custom jdbc driver - DATABASE_TYPE.setDriverClassName(jdbcDriver); + Arrays.stream(DataBaseType.values()).filter( + dataBaseType -> dataBaseType.getDriverClassName().equals(jdbcDriver)).findFirst().ifPresent(dataBaseType -> + DATABASE_TYPE=dataBaseType); } this.commonRdbmsReaderMaster = new SubCommonRdbmsReader.Job(DATABASE_TYPE); this.originalConfig = this.commonRdbmsReaderMaster.init(this.originalConfig); diff --git a/plugin/writer/rdbmswriter/src/main/java/com/wgzhao/addax/plugin/writer/rdbmswriter/RdbmsWriter.java b/plugin/writer/rdbmswriter/src/main/java/com/wgzhao/addax/plugin/writer/rdbmswriter/RdbmsWriter.java index 48112ac60..9c93ce99e 100644 --- a/plugin/writer/rdbmswriter/src/main/java/com/wgzhao/addax/plugin/writer/rdbmswriter/RdbmsWriter.java +++ b/plugin/writer/rdbmswriter/src/main/java/com/wgzhao/addax/plugin/writer/rdbmswriter/RdbmsWriter.java @@ -29,6 +29,7 @@ import com.wgzhao.addax.rdbms.writer.CommonRdbmsWriter; import org.apache.commons.lang3.StringUtils; +import java.util.Arrays; import java.util.List; import static com.wgzhao.addax.common.base.Key.JDBC_DRIVER; @@ -38,7 +39,7 @@ public class RdbmsWriter extends Writer { - private static final DataBaseType DATABASE_TYPE = DataBaseType.RDBMS; + private static DataBaseType DATABASE_TYPE = DataBaseType.RDBMS; public static class Job extends Writer.Job @@ -63,7 +64,9 @@ public void init() throw AddaxException.asAddaxException(REQUIRED_VALUE, "config 'driver' is required and must not be empty"); } // use special jdbc driver class - DATABASE_TYPE.setDriverClassName(jdbcDriver); + Arrays.stream(DataBaseType.values()).filter( + d -> d.getDriverClassName().equals(jdbcDriver)).findFirst().ifPresent(d -> + DATABASE_TYPE=d); this.commonRdbmsWriterJob = new CommonRdbmsWriter.Job(DATABASE_TYPE); commonRdbmsWriterJob.init(originalConfig); }