Skip to content

Commit

Permalink
Connect Runtime fails initialising the static ctor for InsertRollingF…
Browse files Browse the repository at this point in the history
…ieldTimestampHeaders (#13)

It's not clear why but the code `RecordFieldTimestamp.extendConfigDef(InsertRollingTimestampHeaders.CONFIG_DEF);` gets Connect plugin scanner throwing an exception. For reasons beyond the code implementation it thinks the code redefines the setting entry named: `field`.

The solution is to redefine the ConfigDef and avoid referencing the static values defined in th parent classes.

Co-authored-by: stheppi <[email protected]>
  • Loading branch information
stheppi and stheppi authored Apr 4, 2024
1 parent 3e96036 commit 2bd16e8
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 19 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@

<groupId>io.lenses</groupId>
<artifactId>kafka-connect-smt</artifactId>
<version>1.1.3-SNAPSHOT</version>
<version>1.2.2-SNAPSHOT</version>
<packaging>jar</packaging>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<kafka.version>3.5.0</kafka.version>
<kafka.version>3.7.0</kafka.version>
<headerFile>${project.basedir}/checkstyle/java.header</headerFile>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ public class InsertFieldTimestampHeaders<R extends ConnectRecord<R>>
RecordFieldTimestamp.extendConfigDef(InsertTimestampHeaders.CONFIG_DEF);
private RecordFieldTimestamp<R> fieldTimestamp;

public InsertFieldTimestampHeaders() {
super(InsertRecordTimestampHeaders.CONFIG_DEF);
}
public InsertFieldTimestampHeaders() {}

@Override
protected Instant getInstant(R r) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
public class InsertRecordTimestampHeaders<R extends ConnectRecord<R>>
extends InsertTimestampHeaders<R> {

public InsertRecordTimestampHeaders() {
super(InsertRecordTimestampHeaders.CONFIG_DEF);
}
public InsertRecordTimestampHeaders() {}

@Override
protected Instant getInstant(R r) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,110 @@
*/
public class InsertRollingFieldTimestampHeaders<R extends ConnectRecord<R>>
extends InsertRollingTimestampHeaders<R> {

private RecordFieldTimestamp<R> fieldTimestamp;

public static ConfigDef CONFIG_DEF =
RecordFieldTimestamp.extendConfigDef(InsertRollingTimestampHeaders.CONFIG_DEF);
public static ConfigDef CONFIG_DEF;

static {
// The code would be
// RecordFieldTimestamp.extendConfigDef(InsertRollingTimestampHeaders.CONFIG_DEF);
// However Connect runtime gets badly confused for reasons not understood.
// Connect runtime is thinking that the field setting is defined already which is not the case
// The workaround is to redefine all the ConfigDef settings here to avoid the Connect runtime
// nonsense
ConfigDef replicated =
new ConfigDef()
.define(
InsertTimestampHeaders.ConfigName.HEADER_PREFIX_NAME,
ConfigDef.Type.STRING,
InsertTimestampHeaders.ConfigName.DEFAULT_PREFIX_NAME,
ConfigDef.Importance.HIGH,
"The prefix to use for the headers inserted. For example, if the prefix is 'wallclock_', the headers inserted will be 'wallclock_year', 'wallclock_month', etc.")
.define(
InsertTimestampHeaders.ConfigName.YEAR_FORMAT_CONFIG,
ConfigDef.Type.STRING,
InsertTimestampHeaders.ConfigName.DEFAULT_YEAR_FORMAT,
ConfigDef.Importance.HIGH,
"The format to use for the year. The default is '"
+ InsertTimestampHeaders.ConfigName.DEFAULT_YEAR_FORMAT
+ "'.")
.define(
InsertTimestampHeaders.ConfigName.MONTH_FORMAT_CONFIG,
ConfigDef.Type.STRING,
InsertTimestampHeaders.ConfigName.DEFAULT_MONTH_FORMAT,
ConfigDef.Importance.HIGH,
"The format to use for the month. The default is '"
+ InsertTimestampHeaders.ConfigName.DEFAULT_MONTH_FORMAT
+ "'.")
.define(
InsertTimestampHeaders.ConfigName.DAY_FORMAT_CONFIG,
ConfigDef.Type.STRING,
InsertTimestampHeaders.ConfigName.DEFAULT_DAY_FORMAT,
ConfigDef.Importance.HIGH,
"The format to use for the day. The default is '"
+ InsertTimestampHeaders.ConfigName.DEFAULT_DAY_FORMAT
+ "'.")
.define(
InsertTimestampHeaders.ConfigName.HOUR_FORMAT_CONFIG,
ConfigDef.Type.STRING,
InsertTimestampHeaders.ConfigName.DEFAULT_HOUR_FORMAT,
ConfigDef.Importance.HIGH,
"The format to use for the hour. The default is '"
+ InsertTimestampHeaders.ConfigName.DEFAULT_HOUR_FORMAT
+ "'.")
.define(
InsertTimestampHeaders.ConfigName.MINUTE_FORMAT_CONFIG,
ConfigDef.Type.STRING,
InsertTimestampHeaders.ConfigName.DEFAULT_MINUTE_FORMAT,
ConfigDef.Importance.HIGH,
"The format to use for the minute. The default is '"
+ InsertTimestampHeaders.ConfigName.DEFAULT_MINUTE_FORMAT
+ "'.")
.define(
InsertTimestampHeaders.ConfigName.SECOND_FORMAT_CONFIG,
ConfigDef.Type.STRING,
InsertTimestampHeaders.ConfigName.DEFAULT_SECOND_FORMAT,
ConfigDef.Importance.HIGH,
"The format to use for the second. The default is '"
+ InsertTimestampHeaders.ConfigName.DEFAULT_SECOND_FORMAT
+ "'.")
.define(
InsertTimestampHeaders.ConfigName.DATE_FORMAT_CONFIG,
ConfigDef.Type.STRING,
InsertTimestampHeaders.ConfigName.DEFAULT_DATE_FORMAT,
ConfigDef.Importance.HIGH,
"The format to use for the date. The default is '"
+ InsertTimestampHeaders.ConfigName.DEFAULT_DATE_FORMAT
+ "'.")
.define(
InsertTimestampHeaders.ConfigName.TIMEZONE,
ConfigDef.Type.STRING,
"UTC",
ConfigDef.Importance.HIGH,
"The timezone to use.")
.define(
InsertTimestampHeaders.ConfigName.LOCALE,
ConfigDef.Type.STRING,
InsertTimestampHeaders.ConfigName.DEFAULT_LOCALE,
ConfigDef.Importance.HIGH,
"The locale to use.")
.define(
ConfigName.ROLLING_WINDOW_SIZE_CONFIG,
ConfigDef.Type.INT,
ConfigName.DEFAULT_ROLLING_WINDOW_VALUE,
ConfigDef.Importance.HIGH,
"The rolling window size. For example, if the rolling window is set to 'minutes' "
+ "and the rolling window value is set to 15, then the rolling window "
+ "is 15 minutes.")
.define(
ConfigName.ROLLING_WINDOW_TYPE_CONFIG,
ConfigDef.Type.STRING,
ConfigName.DEFAULT_ROLLING_WINDOW.name(),
ConfigDef.Importance.HIGH,
"The rolling window type. The allowed values are hours, minutes or seconds.");

CONFIG_DEF = RecordFieldTimestamp.extendConfigDef(replicated);
}

public InsertRollingFieldTimestampHeaders() {
super();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ abstract class InsertRollingTimestampHeaders<R extends ConnectRecord<R>>
"The rolling window type. The allowed values are hours, minutes or seconds.");
private RollingWindowDetails rollingWindowDetails;

public InsertRollingTimestampHeaders() {
super(CONFIG_DEF);
}
public InsertRollingTimestampHeaders() {}

interface ConfigName {
String ROLLING_WINDOW_TYPE_CONFIG = "rolling.window.type";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ interface ConfigName {
String DEFAULT_LOCALE = "en";
}

protected InsertTimestampHeaders(ConfigDef configDef) {}
protected InsertTimestampHeaders() {}

protected abstract Instant getInstant(R r);

Expand Down Expand Up @@ -192,7 +192,7 @@ protected void configureInternal(SimpleConfig config) {}

@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
final SimpleConfig config = new SimpleConfig(config(), props);
final String timeZoneStr = config.getString(ConfigName.TIMEZONE);
timeZone = TimeZone.getTimeZone(timeZoneStr).toZoneId();
String prefixName = config.getString(ConfigName.HEADER_PREFIX_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
public class InsertWallclockHeaders<R extends ConnectRecord<R>> extends InsertTimestampHeaders<R> {
private Supplier<Instant> instantSupplier = Instant::now;

public InsertWallclockHeaders() {
super(InsertTimestampHeaders.CONFIG_DEF);
}
public InsertWallclockHeaders() {}

// Used solely for testing purposes
void setInstantSupplier(Supplier<Instant> instantSupplier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

/** Unit tests for {@link InsertRollingRecordTimestampHeaders}. */
public class InsertRollingFieldTimestampHeadersTest {

@Test
public void testRollingWindowEvery15Minutes() {
ArrayList<Tuple5<String, Integer, String, String, String>> scenarios = new ArrayList<>();
Expand Down

0 comments on commit 2bd16e8

Please sign in to comment.