Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Oracle network data encryption and integrity #273

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/sink-connector.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ For example, for PostgreSQL the connection URL might look like
jdbc:postgresql://localhost:5432/test?user=fred&password=secret&ssl=true
```

For Oracle databases it's possible to configure [network data
encryption and integrity](https://docs.oracle.com/cd/B19306_01/network.102/b14268/asojbdc.htm#g1007990)
by setting `oracle.net.encryption_client` and `oracle.net.crypto_checksum_client`.

### SQL Dialects

Different databases use different dialects of SQL. The connector
Expand Down
52 changes: 52 additions & 0 deletions src/main/java/io/aiven/connect/jdbc/config/JdbcConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,20 @@ public class JdbcConfig extends AbstractConfig {
+ "specific dialect. All properly-packaged dialects in the JDBC connector plugin "
+ "can be used.";

public static final String ORACLE_ENCRYPTION_CLIENT_CONFIG = "oracle.net.encryption_client";
private static final String ORACLE_ENCRYPTION_CLIENT_DISPLAY = "Oracle encryption";
private static final String ORACLE_ENCRYPTION_CLIENT_DOC =
"This parameter defines the level of security that the client wants to negotiate with "
+ "the server. The permitted values are \"REJECTED\", \"ACCEPTED\", \"REQUESTED\" and "
+ "\"REQUIRED\"";

public static final String ORACLE_CHECKSUM_CLIENT_CONFIG = "oracle.net.crypto_checksum_client";
private static final String ORACLE_CHECKSUM_CLIENT_DISPLAY = "Oracle crypto checksum";
private static final String ORACLE_CHECKSUM_CLIENT_DOC =
"This parameter defines the level of security that the client wants to negotiate with "
+ "the server for data integrity. The permitted values are \"REJECTED\", \"ACCEPTED\", "
+ "\"REQUESTED\" and \"REQUIRED\"";

public static final String SQL_QUOTE_IDENTIFIERS_CONFIG = "sql.quote.identifiers";
private static final Boolean SQL_QUOTE_IDENTIFIERS_DEFAULT = true;
private static final String SQL_QUOTE_IDENTIFIERS_DOC =
Expand All @@ -83,6 +97,14 @@ public final String getConnectionUser() {
return getString(CONNECTION_USER_CONFIG);
}

public final String getOracleEncryptionClient() {
return getString(JdbcConfig.ORACLE_ENCRYPTION_CLIENT_CONFIG);
}

public final String getOracleChecksumClient() {
return getString(JdbcConfig.ORACLE_CHECKSUM_CLIENT_CONFIG);
}

public final TimeZone getDBTimeZone() {
return TimeZone.getTimeZone(ZoneId.of(getString(DB_TIMEZONE_CONFIG)));
}
Expand Down Expand Up @@ -171,6 +193,36 @@ protected static void defineDialectName(final ConfigDef configDef, final int ord
);
}

protected static void defineOracleEncryptionClient(final ConfigDef configDef, final int orderInGroup) {
configDef.define(
JdbcConfig.ORACLE_ENCRYPTION_CLIENT_CONFIG,
ConfigDef.Type.STRING,
null,
null,
ConfigDef.Importance.LOW,
JdbcConfig.ORACLE_ENCRYPTION_CLIENT_DOC,
DATABASE_GROUP,
orderInGroup,
ConfigDef.Width.LONG,
JdbcConfig.ORACLE_ENCRYPTION_CLIENT_DISPLAY
);
}

protected static void defineOracleChecksumClient(final ConfigDef configDef, final int orderInGroup) {
configDef.define(
JdbcConfig.ORACLE_CHECKSUM_CLIENT_CONFIG,
ConfigDef.Type.STRING,
null,
null,
ConfigDef.Importance.LOW,
JdbcConfig.ORACLE_CHECKSUM_CLIENT_DOC,
DATABASE_GROUP,
orderInGroup,
ConfigDef.Width.LONG,
JdbcConfig.ORACLE_CHECKSUM_CLIENT_DISPLAY
);
}

protected static void defineSqlQuoteIdentifiers(final ConfigDef configDef, final int orderInGroup) {
configDef.define(
JdbcConfig.SQL_QUOTE_IDENTIFIERS_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;

import io.aiven.connect.jdbc.config.JdbcConfig;
import io.aiven.connect.jdbc.sink.JdbcSinkConfig;
import io.aiven.connect.jdbc.sink.metadata.FieldsMetadata;
import io.aiven.connect.jdbc.sink.metadata.SchemaPair;
Expand Down Expand Up @@ -134,6 +135,8 @@ public interface DatabaseDialect extends ConnectionProvider {
*/
String name();

public void setDialectSpecificProperties(final JdbcConfig config);

/**
* Create a new prepared statement using the specified database connection.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ protected List<String> getDefaultSinkTableTypes() {
return List.of(JdbcSourceTaskConfig.TABLE_TYPE_DEFAULT);
}

@Override
public void setDialectSpecificProperties(
final JdbcConfig config
) {
}

@Override
public String name() {
return getClass().getSimpleName().replace("DatabaseDialect", "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,4 +216,17 @@ protected String sanitizedUrl(final String url) {
.replaceAll("(:thin:[^/]*)/([^@]*)@", "$1/****@")
.replaceAll("(:oci[^:]*:[^/]*)/([^@]*)@", "$1/****@");
}

@Override
public void setDialectSpecificProperties(
final JdbcConfig config
) {
if (config.getOracleEncryptionClient() != null) {
System.setProperty(JdbcConfig.ORACLE_ENCRYPTION_CLIENT_CONFIG, config.getOracleEncryptionClient());
}

if (config.getOracleChecksumClient() != null) {
System.setProperty(JdbcConfig.ORACLE_CHECKSUM_CLIENT_CONFIG, config.getOracleChecksumClient());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ public enum PrimaryKeyMode {
defineDbTimezone(CONFIG_DEF, ++orderInGroup);
defineDialectName(CONFIG_DEF, ++orderInGroup);
defineSqlQuoteIdentifiers(CONFIG_DEF, ++orderInGroup);
defineOracleEncryptionClient(CONFIG_DEF, ++orderInGroup);
defineOracleChecksumClient(CONFIG_DEF, ++orderInGroup);

// Writes
CONFIG_DEF
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ void initWriter() {
final String connectionUrl = config.getConnectionUrl();
dialect = DatabaseDialects.findBestFor(connectionUrl, config);
}
dialect.setDialectSpecificProperties(config);
final DbStructure dbStructure = new DbStructure(dialect);
log.info("Initializing writer using SQL dialect: {}", dialect.getClass().getSimpleName());
writer = new JdbcDbWriter(config, dialect, dbStructure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@

import org.apache.kafka.common.config.ConfigException;

import io.aiven.connect.jdbc.config.JdbcConfig;

import org.junit.Test;

import static junit.framework.TestCase.assertEquals;
import static junit.framework.TestCase.assertNull;
import static junit.framework.TestCase.assertTrue;

public class JdbcSinkConfigTest {
Expand Down Expand Up @@ -71,4 +74,22 @@ public void shouldThrowExceptionForEmptyMappingFormat() {
new JdbcSinkConfig(props);
}

@Test
public void shouldParseOracleSpecificConfiguration() {
final Map<String, String> props = new HashMap<>();
props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost");

JdbcSinkConfig config = new JdbcSinkConfig(props);

assertNull(config.getOracleEncryptionClient());
assertNull(config.getOracleChecksumClient());

props.put(JdbcConfig.ORACLE_ENCRYPTION_CLIENT_CONFIG, "REQUIRED");
props.put(JdbcConfig.ORACLE_CHECKSUM_CLIENT_CONFIG, "ACCEPTED");

config = new JdbcSinkConfig(props);

assertEquals(config.getOracleEncryptionClient(), "REQUIRED");
assertEquals(config.getOracleChecksumClient(), "ACCEPTED");
}
}