Skip to content

Commit

Permalink
Merge pull request #12 from trocco-io/feature/remove-duplicate-code-with-superclass
Browse files Browse the repository at this point in the history

Refactoring: remove duplicate code with superclass
  • Loading branch information
NamedPython authored Nov 6, 2024
2 parents 881c4f1 + 437f8f2 commit 54844dc
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 135 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,6 @@ $ ./gradlew gem # -t to watch change of files and rebuild continuously

## TEST

```
$ EMBULK_OUTPUT_DATABRICKS_TEST_CONFIG="example/test.yml" ./gradlew test # Create example/test.yml based on example/test.yml.example
```
84 changes: 3 additions & 81 deletions src/main/java/org/embulk/output/DatabricksOutputPlugin.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package org.embulk.output;

import java.io.IOException;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import org.embulk.config.ConfigDiff;
Expand Down Expand Up @@ -171,86 +169,10 @@ protected void logConnectionProperties(String url, Properties props) {
super.logConnectionProperties(url, maskedProps);
}

// This is almost copy from AbstractJdbcOutputPlugin excepting validation of table exists in
// current schema
public Optional<JdbcSchema> newJdbcSchemaFromTableIfExists(
JdbcOutputConnection connection, TableIdentifier table) throws SQLException {
if (!connection.tableExists(table)) {
// DatabaseMetaData.getPrimaryKeys fails if table does not exist
return Optional.empty();
}

DatabricksOutputConnection conn = (DatabricksOutputConnection) connection;
DatabaseMetaData dbm = connection.getMetaData();
String escape = dbm.getSearchStringEscape();

ResultSet rs =
dbm.getPrimaryKeys(conn.getCatalogName(), table.getSchemaName(), table.getTableName());
final HashSet<String> primaryKeysBuilder = new HashSet<>();
try {
while (rs.next()) {
if (!((DatabricksOutputConnection) connection)
.isAvailableTableMetadataInConnection(rs, table)) {
continue;
}
primaryKeysBuilder.add(rs.getString("COLUMN_NAME"));
}
} finally {
rs.close();
}
final Set<String> primaryKeys = Collections.unmodifiableSet(primaryKeysBuilder);

final ArrayList<JdbcColumn> builder = new ArrayList<>();
// NOTE: Columns of TIMESTAMP_NTZ, INTERVAL are not included in getColumns result.
// This cause runtime sql exception when copy into.
// (probably because of unsupported in databricks jdbc)
// https://docs.databricks.com/en/sql/language-manual/data-types/interval-type.html
// https://docs.databricks.com/en/sql/language-manual/data-types/timestamp-ntz-type.html#notes
rs =
dbm.getColumns(
JdbcUtils.escapeSearchString(conn.getCatalogName(), escape),
JdbcUtils.escapeSearchString(table.getSchemaName(), escape),
JdbcUtils.escapeSearchString(table.getTableName(), escape),
null);
try {
while (rs.next()) {
if (!((DatabricksOutputConnection) connection)
.isAvailableTableMetadataInConnection(rs, table)) {
continue;
}
String columnName = rs.getString("COLUMN_NAME");
String simpleTypeName = rs.getString("TYPE_NAME").toUpperCase(Locale.ENGLISH);
boolean isUniqueKey = primaryKeys.contains(columnName);
int sqlType = rs.getInt("DATA_TYPE");
int colSize = rs.getInt("COLUMN_SIZE");
int decDigit = rs.getInt("DECIMAL_DIGITS");
if (rs.wasNull()) {
decDigit = -1;
}
int charOctetLength = rs.getInt("CHAR_OCTET_LENGTH");
boolean isNotNull = "NO".equals(rs.getString("IS_NULLABLE"));
// rs.getString("COLUMN_DEF") // or null // TODO
builder.add(
JdbcColumn.newGenericTypeColumn(
columnName,
sqlType,
simpleTypeName,
colSize,
decDigit,
charOctetLength,
isNotNull,
isUniqueKey));
// We can't get declared column name using JDBC API.
// Subclasses need to overwrite it.
}
} finally {
rs.close();
}
final List<JdbcColumn> columns = Collections.unmodifiableList(builder);
if (columns.isEmpty()) {
return Optional.empty();
} else {
return Optional.of(new JdbcSchema(columns));
}
return super.newJdbcSchemaFromTableIfExists(
connection,
((DatabricksOutputConnection) connection).currentConnectionTableIdentifier(table));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,58 +42,21 @@ protected void executeUseStatement(String sql) throws SQLException {
// This is almost copy from JdbcOutputConnection excepting validation of table exists in current
// schema
public boolean tableExists(TableIdentifier table) throws SQLException {
try (ResultSet rs =
connection
.getMetaData()
.getTables(catalogName, table.getSchemaName(), table.getTableName(), null)) {
while (rs.next()) {
if (isAvailableTableMetadataInConnection(rs, table)) {
return true;
}
}
}
return false;
return super.tableExists(currentConnectionTableIdentifier(table));
}

public boolean isAvailableTableMetadataInConnection(ResultSet rs, TableIdentifier tableIdentifier)
throws SQLException {
// If unchecked, tables in other catalogs may appear to exist.
// This is because the base embulk jdbc plugin's tableIdentifier.getDatabase() is often returns
// null
// and one Databricks connection has multiple available catalogs (databases).

// NOTE: maybe this logic is not necessary anymore after this PR:
// https://github.com/trocco-io/embulk-output-databricks/pull/11
// But I'm not sure, so I'll keep it for now.

if (tableIdentifier.getDatabase() == null) {
logger.trace("tableIdentifier.getDatabase() == null, check by instance variable");
if (!rs.getString("TABLE_CAT").equalsIgnoreCase(catalogName)) {
return false;
}
}
if (tableIdentifier.getSchemaName() == null) {
logger.trace("tableIdentifier.getSchemaName() == null, check by instance variable");
if (!rs.getString("TABLE_SCHEM").equalsIgnoreCase(schemaName)) {
return false;
}
}

if (tableIdentifier.getDatabase() != null
&& !tableIdentifier.getDatabase().equalsIgnoreCase(catalogName)) {
logger.error(
String.format(
"tableIdentifier.getSchemaName() != instance variable. (%s, %s)",
tableIdentifier.getDatabase(), catalogName));
}
if (tableIdentifier.getSchemaName() != null
&& !tableIdentifier.getSchemaName().equalsIgnoreCase(schemaName)) {
logger.error(
String.format(
"tableIdentifier.getSchemaName() != instance variable. (%s, %s)",
tableIdentifier.getSchemaName(), schemaName));
}
return true;
/**
 * Returns a {@link TableIdentifier} whose catalog (database) and schema are guaranteed to be
 * non-null, falling back to this connection's defaults when the caller left them unset.
 *
 * <p>Why: JdbcOutputPlugin sometimes passes a tableIdentifier with a null database, which makes
 * DatabaseMetaData lookups in AbstractJdbcOutputPlugin misbehave — getTables/getColumns then
 * search across every catalog instead of only the connection's default one, and cannot search
 * schemas with multibyte names. Filling in the defaults avoids that.
 *
 * @param tableIdentifier identifier that may have a null database and/or schema name
 * @return an identifier with database and schema resolved to the connection defaults if needed
 */
public TableIdentifier currentConnectionTableIdentifier(TableIdentifier tableIdentifier) {
  String database = tableIdentifier.getDatabase();
  if (database == null) {
    database = catalogName; // fall back to the connection's default catalog
  }
  String schema = tableIdentifier.getSchemaName();
  if (schema == null) {
    schema = schemaName; // fall back to the connection's default schema
  }
  return new TableIdentifier(database, schema, tableIdentifier.getTableName());
}

@Override
Expand Down Expand Up @@ -299,8 +262,4 @@ private String buildColumns(JdbcSchema schema, String prefix) {
}
return sb.toString();
}

public String getCatalogName() {
return catalogName;
}
}

0 comments on commit 54844dc

Please sign in to comment.