diff --git a/pom.xml b/pom.xml index d91017c..8b994ce 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ io.lenses kafka-connect-smt - 1.1.3-SNAPSHOT + 1.2.2-SNAPSHOT jar @@ -34,7 +34,7 @@ 8 UTF-8 1.8 - 3.5.0 + 3.7.0 ${project.basedir}/checkstyle/java.header diff --git a/src/main/java/io/lenses/connect/smt/header/InsertFieldTimestampHeaders.java b/src/main/java/io/lenses/connect/smt/header/InsertFieldTimestampHeaders.java index 843f387..74bc590 100644 --- a/src/main/java/io/lenses/connect/smt/header/InsertFieldTimestampHeaders.java +++ b/src/main/java/io/lenses/connect/smt/header/InsertFieldTimestampHeaders.java @@ -28,9 +28,7 @@ public class InsertFieldTimestampHeaders> RecordFieldTimestamp.extendConfigDef(InsertTimestampHeaders.CONFIG_DEF); private RecordFieldTimestamp fieldTimestamp; - public InsertFieldTimestampHeaders() { - super(InsertRecordTimestampHeaders.CONFIG_DEF); - } + public InsertFieldTimestampHeaders() {} @Override protected Instant getInstant(R r) { diff --git a/src/main/java/io/lenses/connect/smt/header/InsertRecordTimestampHeaders.java b/src/main/java/io/lenses/connect/smt/header/InsertRecordTimestampHeaders.java index 5b289ae..f85daf6 100644 --- a/src/main/java/io/lenses/connect/smt/header/InsertRecordTimestampHeaders.java +++ b/src/main/java/io/lenses/connect/smt/header/InsertRecordTimestampHeaders.java @@ -23,9 +23,7 @@ public class InsertRecordTimestampHeaders> extends InsertTimestampHeaders { - public InsertRecordTimestampHeaders() { - super(InsertRecordTimestampHeaders.CONFIG_DEF); - } + public InsertRecordTimestampHeaders() {} @Override protected Instant getInstant(R r) { diff --git a/src/main/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeaders.java b/src/main/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeaders.java index d4a44ea..5c5f0df 100644 --- a/src/main/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeaders.java +++ b/src/main/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeaders.java @@ -23,11 +23,110 @@ */ public class InsertRollingFieldTimestampHeaders> extends InsertRollingTimestampHeaders { - private RecordFieldTimestamp fieldTimestamp; - public static ConfigDef CONFIG_DEF = - RecordFieldTimestamp.extendConfigDef(InsertRollingTimestampHeaders.CONFIG_DEF); + public static ConfigDef CONFIG_DEF; + + static { + // The code would be + // RecordFieldTimestamp.extendConfigDef(InsertRollingTimestampHeaders.CONFIG_DEF); + // However Connect runtime gets badly confused for reasons not understood. + // Connect runtime is thinking that the field setting is defined already which is not the case + // The workaround is to redefine all the ConfigDef settings here to avoid the Connect runtime + // nonsense + ConfigDef replicated = + new ConfigDef() + .define( + InsertTimestampHeaders.ConfigName.HEADER_PREFIX_NAME, + ConfigDef.Type.STRING, + InsertTimestampHeaders.ConfigName.DEFAULT_PREFIX_NAME, + ConfigDef.Importance.HIGH, + "The prefix to use for the headers inserted. For example, if the prefix is 'wallclock_', the headers inserted will be 'wallclock_year', 'wallclock_month', etc.") + .define( + InsertTimestampHeaders.ConfigName.YEAR_FORMAT_CONFIG, + ConfigDef.Type.STRING, + InsertTimestampHeaders.ConfigName.DEFAULT_YEAR_FORMAT, + ConfigDef.Importance.HIGH, + "The format to use for the year. The default is '" + + InsertTimestampHeaders.ConfigName.DEFAULT_YEAR_FORMAT + + "'.") + .define( + InsertTimestampHeaders.ConfigName.MONTH_FORMAT_CONFIG, + ConfigDef.Type.STRING, + InsertTimestampHeaders.ConfigName.DEFAULT_MONTH_FORMAT, + ConfigDef.Importance.HIGH, + "The format to use for the month. The default is '" + + InsertTimestampHeaders.ConfigName.DEFAULT_MONTH_FORMAT + + "'.") + .define( + InsertTimestampHeaders.ConfigName.DAY_FORMAT_CONFIG, + ConfigDef.Type.STRING, + InsertTimestampHeaders.ConfigName.DEFAULT_DAY_FORMAT, + ConfigDef.Importance.HIGH, + "The format to use for the day. The default is '" + + InsertTimestampHeaders.ConfigName.DEFAULT_DAY_FORMAT + + "'.") + .define( + InsertTimestampHeaders.ConfigName.HOUR_FORMAT_CONFIG, + ConfigDef.Type.STRING, + InsertTimestampHeaders.ConfigName.DEFAULT_HOUR_FORMAT, + ConfigDef.Importance.HIGH, + "The format to use for the hour. The default is '" + + InsertTimestampHeaders.ConfigName.DEFAULT_HOUR_FORMAT + + "'.") + .define( + InsertTimestampHeaders.ConfigName.MINUTE_FORMAT_CONFIG, + ConfigDef.Type.STRING, + InsertTimestampHeaders.ConfigName.DEFAULT_MINUTE_FORMAT, + ConfigDef.Importance.HIGH, + "The format to use for the minute. The default is '" + + InsertTimestampHeaders.ConfigName.DEFAULT_MINUTE_FORMAT + + "'.") + .define( + InsertTimestampHeaders.ConfigName.SECOND_FORMAT_CONFIG, + ConfigDef.Type.STRING, + InsertTimestampHeaders.ConfigName.DEFAULT_SECOND_FORMAT, + ConfigDef.Importance.HIGH, + "The format to use for the second. The default is '" + + InsertTimestampHeaders.ConfigName.DEFAULT_SECOND_FORMAT + + "'.") + .define( + InsertTimestampHeaders.ConfigName.DATE_FORMAT_CONFIG, + ConfigDef.Type.STRING, + InsertTimestampHeaders.ConfigName.DEFAULT_DATE_FORMAT, + ConfigDef.Importance.HIGH, + "The format to use for the date. The default is '" + + InsertTimestampHeaders.ConfigName.DEFAULT_DATE_FORMAT + + "'.") + .define( + InsertTimestampHeaders.ConfigName.TIMEZONE, + ConfigDef.Type.STRING, + "UTC", + ConfigDef.Importance.HIGH, + "The timezone to use.") + .define( + InsertTimestampHeaders.ConfigName.LOCALE, + ConfigDef.Type.STRING, + InsertTimestampHeaders.ConfigName.DEFAULT_LOCALE, + ConfigDef.Importance.HIGH, + "The locale to use.") + .define( + ConfigName.ROLLING_WINDOW_SIZE_CONFIG, + ConfigDef.Type.INT, + ConfigName.DEFAULT_ROLLING_WINDOW_VALUE, + ConfigDef.Importance.HIGH, + "The rolling window size. For example, if the rolling window is set to 'minutes' " + + "and the rolling window value is set to 15, then the rolling window " + + "is 15 minutes.") + .define( + ConfigName.ROLLING_WINDOW_TYPE_CONFIG, + ConfigDef.Type.STRING, + ConfigName.DEFAULT_ROLLING_WINDOW.name(), + ConfigDef.Importance.HIGH, + "The rolling window type. The allowed values are hours, minutes or seconds."); + + CONFIG_DEF = RecordFieldTimestamp.extendConfigDef(replicated); + } public InsertRollingFieldTimestampHeaders() { super(); diff --git a/src/main/java/io/lenses/connect/smt/header/InsertRollingTimestampHeaders.java b/src/main/java/io/lenses/connect/smt/header/InsertRollingTimestampHeaders.java index 6fdd5d4..baa2ba0 100644 --- a/src/main/java/io/lenses/connect/smt/header/InsertRollingTimestampHeaders.java +++ b/src/main/java/io/lenses/connect/smt/header/InsertRollingTimestampHeaders.java @@ -45,9 +45,7 @@ abstract class InsertRollingTimestampHeaders> "The rolling window type. The allowed values are hours, minutes or seconds."); private RollingWindowDetails rollingWindowDetails; - public InsertRollingTimestampHeaders() { - super(CONFIG_DEF); - } + public InsertRollingTimestampHeaders() {} interface ConfigName { String ROLLING_WINDOW_TYPE_CONFIG = "rolling.window.type"; diff --git a/src/main/java/io/lenses/connect/smt/header/InsertTimestampHeaders.java b/src/main/java/io/lenses/connect/smt/header/InsertTimestampHeaders.java index 55a0593..e2579d4 100644 --- a/src/main/java/io/lenses/connect/smt/header/InsertTimestampHeaders.java +++ b/src/main/java/io/lenses/connect/smt/header/InsertTimestampHeaders.java @@ -153,7 +153,7 @@ interface ConfigName { String DEFAULT_LOCALE = "en"; } - protected InsertTimestampHeaders(ConfigDef configDef) {} + protected InsertTimestampHeaders() {} protected abstract Instant getInstant(R r); @@ -192,7 +192,7 @@ protected void configureInternal(SimpleConfig config) {} @Override public void configure(Map props) { - final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); + final SimpleConfig config = new SimpleConfig(config(), props); final String timeZoneStr = config.getString(ConfigName.TIMEZONE); timeZone = TimeZone.getTimeZone(timeZoneStr).toZoneId(); String prefixName = config.getString(ConfigName.HEADER_PREFIX_NAME); diff --git a/src/main/java/io/lenses/connect/smt/header/InsertWallclockHeaders.java b/src/main/java/io/lenses/connect/smt/header/InsertWallclockHeaders.java index 859d23c..00d55f0 100644 --- a/src/main/java/io/lenses/connect/smt/header/InsertWallclockHeaders.java +++ b/src/main/java/io/lenses/connect/smt/header/InsertWallclockHeaders.java @@ -24,9 +24,7 @@ public class InsertWallclockHeaders> extends InsertTimestampHeaders { private Supplier instantSupplier = Instant::now; - public InsertWallclockHeaders() { - super(InsertTimestampHeaders.CONFIG_DEF); - } + public InsertWallclockHeaders() {} // Used solely for testing purposes void setInstantSupplier(Supplier instantSupplier) { diff --git a/src/test/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeadersTest.java b/src/test/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeadersTest.java index e6f2161..b05062c 100644 --- a/src/test/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeadersTest.java +++ b/src/test/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeadersTest.java @@ -24,6 +24,7 @@ /** Unit tests for {@link InsertRollingRecordTimestampHeaders}. */ public class InsertRollingFieldTimestampHeadersTest { + @Test public void testRollingWindowEvery15Minutes() { ArrayList> scenarios = new ArrayList<>();