diff --git a/README.md b/README.md
index 067b194..58f19c0 100644
--- a/README.md
+++ b/README.md
@@ -78,4 +78,6 @@ $ ./gradlew gem # -t to watch change of files and rebuild continuously
 
 ## TEST
 
+```
 $ EMBULK_OUTPUT_DATABRICKS_TEST_CONFIG="example/test.yml" ./gradlew test # Create example/test.yml based on example/test.yml.example
+```
diff --git a/src/main/java/org/embulk/output/DatabricksOutputPlugin.java b/src/main/java/org/embulk/output/DatabricksOutputPlugin.java
index 4f24d50..3720edb 100644
--- a/src/main/java/org/embulk/output/DatabricksOutputPlugin.java
+++ b/src/main/java/org/embulk/output/DatabricksOutputPlugin.java
@@ -1,8 +1,6 @@
 package org.embulk.output;
 
 import java.io.IOException;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.*;
 import org.embulk.config.ConfigDiff;
@@ -171,86 +169,10 @@ protected void logConnectionProperties(String url, Properties props) {
     super.logConnectionProperties(url, maskedProps);
   }
 
-  // This is almost copy from AbstractJdbcOutputPlugin excepting validation of table exists in
-  // current schema
   public Optional<JdbcSchema> newJdbcSchemaFromTableIfExists(
       JdbcOutputConnection connection, TableIdentifier table) throws SQLException {
-    if (!connection.tableExists(table)) {
-      // DatabaseMetaData.getPrimaryKeys fails if table does not exist
-      return Optional.empty();
-    }
-
-    DatabricksOutputConnection conn = (DatabricksOutputConnection) connection;
-    DatabaseMetaData dbm = connection.getMetaData();
-    String escape = dbm.getSearchStringEscape();
-
-    ResultSet rs =
-        dbm.getPrimaryKeys(conn.getCatalogName(), table.getSchemaName(), table.getTableName());
-    final HashSet<String> primaryKeysBuilder = new HashSet<>();
-    try {
-      while (rs.next()) {
-        if (!((DatabricksOutputConnection) connection)
-            .isAvailableTableMetadataInConnection(rs, table)) {
-          continue;
-        }
-        primaryKeysBuilder.add(rs.getString("COLUMN_NAME"));
-      }
-    } finally {
-      rs.close();
-    }
-    final Set<String> primaryKeys = Collections.unmodifiableSet(primaryKeysBuilder);
-
-    final ArrayList<JdbcColumn> builder = new ArrayList<>();
-    // NOTE: Columns of TIMESTAMP_NTZ, INTERVAL are not included in getColumns result.
-    // This cause runtime sql exception when copy into.
-    // (probably because of unsupported in databricks jdbc)
-    // https://docs.databricks.com/en/sql/language-manual/data-types/interval-type.html
-    // https://docs.databricks.com/en/sql/language-manual/data-types/timestamp-ntz-type.html#notes
-    rs =
-        dbm.getColumns(
-            JdbcUtils.escapeSearchString(conn.getCatalogName(), escape),
-            JdbcUtils.escapeSearchString(table.getSchemaName(), escape),
-            JdbcUtils.escapeSearchString(table.getTableName(), escape),
-            null);
-    try {
-      while (rs.next()) {
-        if (!((DatabricksOutputConnection) connection)
-            .isAvailableTableMetadataInConnection(rs, table)) {
-          continue;
-        }
-        String columnName = rs.getString("COLUMN_NAME");
-        String simpleTypeName = rs.getString("TYPE_NAME").toUpperCase(Locale.ENGLISH);
-        boolean isUniqueKey = primaryKeys.contains(columnName);
-        int sqlType = rs.getInt("DATA_TYPE");
-        int colSize = rs.getInt("COLUMN_SIZE");
-        int decDigit = rs.getInt("DECIMAL_DIGITS");
-        if (rs.wasNull()) {
-          decDigit = -1;
-        }
-        int charOctetLength = rs.getInt("CHAR_OCTET_LENGTH");
-        boolean isNotNull = "NO".equals(rs.getString("IS_NULLABLE"));
-        // rs.getString("COLUMN_DEF") // or null // TODO
-        builder.add(
-            JdbcColumn.newGenericTypeColumn(
-                columnName,
-                sqlType,
-                simpleTypeName,
-                colSize,
-                decDigit,
-                charOctetLength,
-                isNotNull,
-                isUniqueKey));
-        // We can't get declared column name using JDBC API.
-        // Subclasses need to overwrite it.
-      }
-    } finally {
-      rs.close();
-    }
-    final List<JdbcColumn> columns = Collections.unmodifiableList(builder);
-    if (columns.isEmpty()) {
-      return Optional.empty();
-    } else {
-      return Optional.of(new JdbcSchema(columns));
-    }
+    return super.newJdbcSchemaFromTableIfExists(
+        connection,
+        ((DatabricksOutputConnection) connection).currentConnectionTableIdentifier(table));
   }
 }
diff --git a/src/main/java/org/embulk/output/databricks/DatabricksOutputConnection.java b/src/main/java/org/embulk/output/databricks/DatabricksOutputConnection.java
index 7355437..c9fef5a 100644
--- a/src/main/java/org/embulk/output/databricks/DatabricksOutputConnection.java
+++ b/src/main/java/org/embulk/output/databricks/DatabricksOutputConnection.java
@@ -42,58 +42,21 @@ protected void executeUseStatement(String sql) throws SQLException {
 
   // This is almost copy from JdbcOutputConnection excepting validation of table exists in current
   // schema
   public boolean tableExists(TableIdentifier table) throws SQLException {
-    try (ResultSet rs =
-        connection
-            .getMetaData()
-            .getTables(catalogName, table.getSchemaName(), table.getTableName(), null)) {
-      while (rs.next()) {
-        if (isAvailableTableMetadataInConnection(rs, table)) {
-          return true;
-        }
-      }
-    }
-    return false;
+    return super.tableExists(currentConnectionTableIdentifier(table));
   }
 
-  public boolean isAvailableTableMetadataInConnection(ResultSet rs, TableIdentifier tableIdentifier)
-      throws SQLException {
-    // If unchecked, tables in other catalogs may appear to exist.
-    // This is because the base embulk jdbc plugin's tableIdentifier.getDatabase() is often returns
-    // null
-    // and one Databricks connection has multiple available catalogs (databases).
-
-    // NOTE: maybe this logic is not necessary anymore after this PR:
-    // https://github.com/trocco-io/embulk-output-databricks/pull/11
-    // But I'm not sure, so I'll keep it for now.
-
-    if (tableIdentifier.getDatabase() == null) {
-      logger.trace("tableIdentifier.getDatabase() == null, check by instance variable");
-      if (!rs.getString("TABLE_CAT").equalsIgnoreCase(catalogName)) {
-        return false;
-      }
-    }
-    if (tableIdentifier.getSchemaName() == null) {
-      logger.trace("tableIdentifier.getSchemaName() == null, check by instance variable");
-      if (!rs.getString("TABLE_SCHEM").equalsIgnoreCase(schemaName)) {
-        return false;
-      }
-    }
-
-    if (tableIdentifier.getDatabase() != null
-        && !tableIdentifier.getDatabase().equalsIgnoreCase(catalogName)) {
-      logger.error(
-          String.format(
-              "tableIdentifier.getSchemaName() != instance variable. (%s, %s)",
-              tableIdentifier.getDatabase(), catalogName));
-    }
-    if (tableIdentifier.getSchemaName() != null
-        && !tableIdentifier.getSchemaName().equalsIgnoreCase(schemaName)) {
-      logger.error(
-          String.format(
-              "tableIdentifier.getSchemaName() != instance variable. (%s, %s)",
-              tableIdentifier.getSchemaName(), schemaName));
-    }
-    return true;
+  public TableIdentifier currentConnectionTableIdentifier(TableIdentifier tableIdentifier) {
+    // Caution:
+    // JdbcOutputPlugin sometimes passes a TableIdentifier whose database field is null,
+    // which causes unexpected DatabaseMetaData behavior in AbstractJdbcOutputPlugin.
+    // For example, getTables and getColumns then search all catalogs rather than only
+    // the connection's current one, and cannot search schemas with multibyte names.
+    // So if the database or schema field is null, fall back to the connection's current value.
+    return new TableIdentifier(
+        tableIdentifier.getDatabase() != null ? tableIdentifier.getDatabase() : catalogName,
+        tableIdentifier.getSchemaName() != null ? tableIdentifier.getSchemaName() : schemaName,
+        tableIdentifier.getTableName());
   }
 
   @Override
@@ -299,8 +262,4 @@ private String buildColumns(JdbcSchema schema, String prefix) {
     }
     return sb.toString();
   }
-
-  public String getCatalogName() {
-    return catalogName;
-  }
 }
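Note for reviewers: the patch replaces the hand-rolled `DatabaseMetaData` filtering (`isAvailableTableMetadataInConnection`) with a single normalization step, `currentConnectionTableIdentifier`, and then delegates to the stock `super` implementations. Below is a minimal, self-contained sketch of that fallback rule. The `Table` record is a stand-in for `org.embulk.output.jdbc.TableIdentifier`, and the `main`/`default` catalog and schema values are hypothetical connection defaults, not values taken from the plugin.

```java
// Minimal sketch of the null-fallback rule applied by currentConnectionTableIdentifier.
// "Table" is a stand-in for org.embulk.output.jdbc.TableIdentifier; the default
// catalog/schema values below are hypothetical, not the plugin's actual state.
public class FallbackSketch {
  record Table(String database, String schemaName, String tableName) {}

  // Hypothetical connection defaults (in the plugin these are the catalogName and
  // schemaName instance variables held by DatabricksOutputConnection).
  static final String CATALOG_NAME = "main";
  static final String SCHEMA_NAME = "default";

  // Same rule as the patch: a null catalog or schema falls back to the connection's
  // value, so later DatabaseMetaData.getTables/getColumns calls stay scoped to a
  // single catalog and schema.
  static Table currentConnectionTable(Table t) {
    return new Table(
        t.database() != null ? t.database() : CATALOG_NAME,
        t.schemaName() != null ? t.schemaName() : SCHEMA_NAME,
        t.tableName());
  }

  public static void main(String[] args) {
    // JdbcOutputPlugin often hands over an identifier whose database is null.
    Table fromEmbulk = new Table(null, null, "sales");
    System.out.println(currentConnectionTable(fromEmbulk));
    // -> Table[database=main, schemaName=default, tableName=sales]
  }
}
```

With the identifier normalized up front, `tableExists` and `newJdbcSchemaFromTableIfExists` can simply call `super`, which is why the per-row catalog/schema checks and the now-unused `getCatalogName()` accessor could be deleted.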