From a808422b5b922e2a04bc67d50aa45f747dab73b7 Mon Sep 17 00:00:00 2001 From: Nacho Barrientos Date: Wed, 8 Nov 2023 10:12:04 +0100 Subject: [PATCH] Add support for Oracle network data encryption and integrity --- docs/sink-connector.md | 4 ++ .../aiven/connect/jdbc/config/JdbcConfig.java | 52 +++++++++++++++++++ .../connect/jdbc/dialect/DatabaseDialect.java | 3 ++ .../jdbc/dialect/GenericDatabaseDialect.java | 6 +++ .../jdbc/dialect/OracleDatabaseDialect.java | 13 +++++ .../connect/jdbc/sink/JdbcSinkConfig.java | 2 + .../aiven/connect/jdbc/sink/JdbcSinkTask.java | 1 + .../connect/jdbc/sink/JdbcSinkConfigTest.java | 21 ++++++++ 8 files changed, 102 insertions(+) diff --git a/docs/sink-connector.md b/docs/sink-connector.md index b2e55a19..bd5214cf 100644 --- a/docs/sink-connector.md +++ b/docs/sink-connector.md @@ -52,6 +52,10 @@ For example, for PostgreSQL the connection URL might look like jdbc:postgresql://localhost:5432/test?user=fred&password=secret&ssl=true ``` +For Oracle databases it's possible to configure [network data +encryption and integrity](https://docs.oracle.com/cd/B19306_01/network.102/b14268/asojbdc.htm#g1007990) +by setting `oracle.net.encryption_client` and `oracle.net.crypto_checksum_client`. + ### SQL Dialects Different databases use different dialects of SQL. The connector diff --git a/src/main/java/io/aiven/connect/jdbc/config/JdbcConfig.java b/src/main/java/io/aiven/connect/jdbc/config/JdbcConfig.java index fc5438c6..f27e8523 100644 --- a/src/main/java/io/aiven/connect/jdbc/config/JdbcConfig.java +++ b/src/main/java/io/aiven/connect/jdbc/config/JdbcConfig.java @@ -60,6 +60,20 @@ public class JdbcConfig extends AbstractConfig { + "specific dialect. All properly-packaged dialects in the JDBC connector plugin " + "can be used."; + public static final String ORACLE_ENCRYPTION_CLIENT_CONFIG = "oracle.net.encryption_client"; + private static final String ORACLE_ENCRYPTION_CLIENT_DISPLAY = "Oracle encryption"; + private static final String ORACLE_ENCRYPTION_CLIENT_DOC = + "This parameter defines the level of security that the client wants to negotiate with " + + "the server. The permitted values are \"REJECTED\", \"ACCEPTED\", \"REQUESTED\" and " + + "\"REQUIRED\""; + + public static final String ORACLE_CHECKSUM_CLIENT_CONFIG = "oracle.net.crypto_checksum_client"; + private static final String ORACLE_CHECKSUM_CLIENT_DISPLAY = "Oracle crypto checksum"; + private static final String ORACLE_CHECKSUM_CLIENT_DOC = + "This parameter defines the level of security that the client wants to negotiate with " + + "the server for data integrity. The permitted values are \"REJECTED\", \"ACCEPTED\", " + + "\"REQUESTED\" and \"REQUIRED\""; + public static final String SQL_QUOTE_IDENTIFIERS_CONFIG = "sql.quote.identifiers"; private static final Boolean SQL_QUOTE_IDENTIFIERS_DEFAULT = true; private static final String SQL_QUOTE_IDENTIFIERS_DOC = @@ -83,6 +97,14 @@ public final String getConnectionUser() { return getString(CONNECTION_USER_CONFIG); } + public final String getOracleEncryptionClient() { + return getString(JdbcConfig.ORACLE_ENCRYPTION_CLIENT_CONFIG); + } + + public final String getOracleChecksumClient() { + return getString(JdbcConfig.ORACLE_CHECKSUM_CLIENT_CONFIG); + } + public final TimeZone getDBTimeZone() { return TimeZone.getTimeZone(ZoneId.of(getString(DB_TIMEZONE_CONFIG))); } @@ -171,6 +193,36 @@ protected static void defineDialectName(final ConfigDef configDef, final int ord ); } + protected static void defineOracleEncryptionClient(final ConfigDef configDef, final int orderInGroup) { + configDef.define( + JdbcConfig.ORACLE_ENCRYPTION_CLIENT_CONFIG, + ConfigDef.Type.STRING, + null, + null, + ConfigDef.Importance.LOW, + JdbcConfig.ORACLE_ENCRYPTION_CLIENT_DOC, + DATABASE_GROUP, + orderInGroup, + ConfigDef.Width.LONG, + JdbcConfig.ORACLE_ENCRYPTION_CLIENT_DISPLAY + ); + } + + protected static void defineOracleChecksumClient(final ConfigDef configDef, final int orderInGroup) { + configDef.define( + JdbcConfig.ORACLE_CHECKSUM_CLIENT_CONFIG, + ConfigDef.Type.STRING, + null, + null, + ConfigDef.Importance.LOW, + JdbcConfig.ORACLE_CHECKSUM_CLIENT_DOC, + DATABASE_GROUP, + orderInGroup, + ConfigDef.Width.LONG, + JdbcConfig.ORACLE_CHECKSUM_CLIENT_DISPLAY + ); + } + protected static void defineSqlQuoteIdentifiers(final ConfigDef configDef, final int orderInGroup) { configDef.define( JdbcConfig.SQL_QUOTE_IDENTIFIERS_CONFIG, diff --git a/src/main/java/io/aiven/connect/jdbc/dialect/DatabaseDialect.java b/src/main/java/io/aiven/connect/jdbc/dialect/DatabaseDialect.java index 2a4befdc..9fed3a32 100644 --- a/src/main/java/io/aiven/connect/jdbc/dialect/DatabaseDialect.java +++ b/src/main/java/io/aiven/connect/jdbc/dialect/DatabaseDialect.java @@ -36,6 +36,7 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkRecord; +import io.aiven.connect.jdbc.config.JdbcConfig; import io.aiven.connect.jdbc.sink.JdbcSinkConfig; import io.aiven.connect.jdbc.sink.metadata.FieldsMetadata; import io.aiven.connect.jdbc.sink.metadata.SchemaPair; @@ -134,6 +135,8 @@ public interface DatabaseDialect extends ConnectionProvider { */ String name(); + public void setDialectSpecificProperties(final JdbcConfig config); + /** * Create a new prepared statement using the specified database connection. * diff --git a/src/main/java/io/aiven/connect/jdbc/dialect/GenericDatabaseDialect.java b/src/main/java/io/aiven/connect/jdbc/dialect/GenericDatabaseDialect.java index 1ccd2744..60cd0f12 100644 --- a/src/main/java/io/aiven/connect/jdbc/dialect/GenericDatabaseDialect.java +++ b/src/main/java/io/aiven/connect/jdbc/dialect/GenericDatabaseDialect.java @@ -185,6 +185,12 @@ protected List getDefaultSinkTableTypes() { return List.of(JdbcSourceTaskConfig.TABLE_TYPE_DEFAULT); } + @Override + public void setDialectSpecificProperties( + final JdbcConfig config + ) { + } + @Override public String name() { return getClass().getSimpleName().replace("DatabaseDialect", ""); diff --git a/src/main/java/io/aiven/connect/jdbc/dialect/OracleDatabaseDialect.java b/src/main/java/io/aiven/connect/jdbc/dialect/OracleDatabaseDialect.java index 2ea50dec..47a59825 100644 --- a/src/main/java/io/aiven/connect/jdbc/dialect/OracleDatabaseDialect.java +++ b/src/main/java/io/aiven/connect/jdbc/dialect/OracleDatabaseDialect.java @@ -216,4 +216,17 @@ protected String sanitizedUrl(final String url) { .replaceAll("(:thin:[^/]*)/([^@]*)@", "$1/****@") .replaceAll("(:oci[^:]*:[^/]*)/([^@]*)@", "$1/****@"); } + + @Override + public void setDialectSpecificProperties( + final JdbcConfig config + ) { + if (config.getOracleEncryptionClient() != null) { + System.setProperty(JdbcConfig.ORACLE_ENCRYPTION_CLIENT_CONFIG, config.getOracleEncryptionClient()); + } + + if (config.getOracleChecksumClient() != null) { + System.setProperty(JdbcConfig.ORACLE_CHECKSUM_CLIENT_CONFIG, config.getOracleChecksumClient()); + } + } } diff --git a/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java b/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java index 04be94c1..24384512 100644 --- a/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java +++ b/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java @@ -194,6 +194,8 @@ public enum PrimaryKeyMode { defineDbTimezone(CONFIG_DEF, ++orderInGroup); defineDialectName(CONFIG_DEF, ++orderInGroup); defineSqlQuoteIdentifiers(CONFIG_DEF, ++orderInGroup); + defineOracleEncryptionClient(CONFIG_DEF, ++orderInGroup); + defineOracleChecksumClient(CONFIG_DEF, ++orderInGroup); // Writes CONFIG_DEF diff --git a/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkTask.java b/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkTask.java index c83d5ec0..0a3b95d6 100644 --- a/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkTask.java +++ b/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkTask.java @@ -58,6 +58,7 @@ void initWriter() { final String connectionUrl = config.getConnectionUrl(); dialect = DatabaseDialects.findBestFor(connectionUrl, config); } + dialect.setDialectSpecificProperties(config); final DbStructure dbStructure = new DbStructure(dialect); log.info("Initializing writer using SQL dialect: {}", dialect.getClass().getSimpleName()); writer = new JdbcDbWriter(config, dialect, dbStructure); diff --git a/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java b/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java index e2034244..1513d16c 100644 --- a/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java +++ b/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java @@ -22,9 +22,12 @@ import org.apache.kafka.common.config.ConfigException; +import io.aiven.connect.jdbc.config.JdbcConfig; + import org.junit.Test; import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertNull; import static junit.framework.TestCase.assertTrue; public class JdbcSinkConfigTest { @@ -71,4 +74,22 @@ public void shouldThrowExceptionForEmptyMappingFormat() { new JdbcSinkConfig(props); } + @Test + public void shouldParseOracleSpecificConfiguration() { + final Map props = new HashMap<>(); + props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost"); + + JdbcSinkConfig config = new JdbcSinkConfig(props); + + assertNull(config.getOracleEncryptionClient()); + assertNull(config.getOracleChecksumClient()); + + props.put(JdbcConfig.ORACLE_ENCRYPTION_CLIENT_CONFIG, "REQUIRED"); + props.put(JdbcConfig.ORACLE_CHECKSUM_CLIENT_CONFIG, "ACCEPTED"); + + config = new JdbcSinkConfig(props); + + assertEquals(config.getOracleEncryptionClient(), "REQUIRED"); + assertEquals(config.getOracleChecksumClient(), "ACCEPTED"); + } }