Skip to content

Commit

Permalink
KAFKA-5540: Deprecate internal converter configs (KIP-174)
Browse files Browse the repository at this point in the history
Implementation of [KIP-174](https://cwiki.apache.org/confluence/display/KAFKA/KIP-174+-+Deprecate+and+remove+internal+converter+configs+in+WorkerConfig)

Configuration properties 'internal.key.converter' and 'internal.value.converter'
are deprecated, and default to org.apache.kafka.connect.json.JsonConverter.

Warnings are logged if values are specified for either, or if properties that
appear to configure instances of internal converters (i.e., ones prefixed with
either 'internal.key.converter.' or 'internal.value.converter.') are given.

The property 'schemas.enable' is also defaulted to false for internal
JsonConverter instances (both for keys and values) if it isn't specified.

Documentation and code have also been updated with deprecation notices and
annotations, respectively.

Unit tests have been updated in `PluginsTest` to account for the new defaults for `schemas.enable` for internal key/value converters, and to ensure that (for the time being), internal key/value converters are still configurable despite being deprecated.

Author: Chris Egerton <[email protected]>
Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Randall Hauch <[email protected]>, Ewen Cheslack-Postava <[email protected]>

Closes apache#4693 from C0urante/kafka-5540
  • Loading branch information
C0urante authored and ewencp committed May 29, 2018
1 parent 3a8d3a7 commit a64ab91
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 47 deletions.
7 changes: 4 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1318,12 +1318,14 @@ project(':connect:runtime') {
archivesBaseName = "connect-runtime"

dependencies {

compile project(':connect:api')
compile project(":connect:transforms")
compile project(':clients')
compile project(':tools')
compile libs.slf4jApi
compile project(':connect:json')
compile project(':connect:transforms')

compile libs.slf4jApi
compile libs.jacksonJaxrsJsonProvider
compile libs.jerseyContainerServlet
compile libs.jerseyHk2
Expand All @@ -1342,7 +1344,6 @@ project(':connect:runtime') {
testCompile libs.powermockJunit4
testCompile libs.powermockEasymock

testCompile project(":connect:json")
testCompile project(':clients').sourceSets.test.output

testRuntime libs.slf4jlog4j
Expand Down
7 changes: 0 additions & 7 deletions config/connect-distributed.properties
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,6 @@ value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# The internal converter used for offsets, config, and status data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset, config, and status data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
Expand Down
7 changes: 0 additions & 7 deletions config/connect-standalone.properties
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,6 @@ value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.SimpleHeaderConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -37,6 +42,7 @@
* Common base class providing configuration for Kafka Connect workers, whether standalone or distributed.
*/
public class WorkerConfig extends AbstractConfig {
private static final Logger log = LoggerFactory.getLogger(WorkerConfig.class);

public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
public static final String BOOTSTRAP_SERVERS_DOC
Expand Down Expand Up @@ -73,23 +79,35 @@ public class WorkerConfig extends AbstractConfig {
" header values to strings and deserialize them by inferring the schemas.";
public static final String HEADER_CONVERTER_CLASS_DEFAULT = SimpleHeaderConverter.class.getName();

/**
* @deprecated As of 2.0.0
*/
@Deprecated
public static final String INTERNAL_KEY_CONVERTER_CLASS_CONFIG = "internal.key.converter";
public static final String INTERNAL_KEY_CONVERTER_CLASS_DOC =
"Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." +
" This controls the format of the keys in messages written to or read from Kafka, and since this is" +
" independent of connectors it allows any connector to work with any serialization format." +
" Examples of common formats include JSON and Avro." +
" This setting controls the format used for internal bookkeeping data used by the framework, such as" +
" configs and offsets, so users can typically use any functioning Converter implementation.";
" configs and offsets, so users can typically use any functioning Converter implementation." +
" Deprecated; will be removed in an upcoming version.";

/**
* @deprecated As of 2.0.0
*/
@Deprecated
public static final String INTERNAL_VALUE_CONVERTER_CLASS_CONFIG = "internal.value.converter";
public static final String INTERNAL_VALUE_CONVERTER_CLASS_DOC =
"Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." +
" This controls the format of the values in messages written to or read from Kafka, and since this is" +
" independent of connectors it allows any connector to work with any serialization format." +
" Examples of common formats include JSON and Avro." +
" This setting controls the format used for internal bookkeeping data used by the framework, such as" +
" configs and offsets, so users can typically use any functioning Converter implementation.";
" configs and offsets, so users can typically use any functioning Converter implementation." +
" Deprecated; will be removed in an upcoming version.";

private static final Class<? extends Converter> INTERNAL_CONVERTER_DEFAULT = JsonConverter.class;

public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG
= "task.shutdown.graceful.timeout.ms";
Expand Down Expand Up @@ -190,9 +208,9 @@ protected static ConfigDef baseConfigDef() {
Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, VALUE_CONVERTER_CLASS_DOC)
.define(INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
.define(INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, INTERNAL_CONVERTER_DEFAULT,
Importance.LOW, INTERNAL_KEY_CONVERTER_CLASS_DOC)
.define(INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
.define(INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, INTERNAL_CONVERTER_DEFAULT,
Importance.LOW, INTERNAL_VALUE_CONVERTER_CLASS_DOC)
.define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG,
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW,
Expand Down Expand Up @@ -239,6 +257,58 @@ protected static ConfigDef baseConfigDef() {
Importance.LOW, HEADER_CONVERTER_CLASS_DOC);
}

private void logInternalConverterDeprecationWarnings(Map<String, String> props) {
String[] deprecatedConfigs = new String[] {
INTERNAL_KEY_CONVERTER_CLASS_CONFIG,
INTERNAL_VALUE_CONVERTER_CLASS_CONFIG
};
for (String config : deprecatedConfigs) {
if (props.containsKey(config)) {
Class<?> internalConverterClass = getClass(config);
logDeprecatedProperty(config, internalConverterClass.getCanonicalName(), INTERNAL_CONVERTER_DEFAULT.getCanonicalName(), null);
if (internalConverterClass.equals(INTERNAL_CONVERTER_DEFAULT)) {
// log the properties for this converter ...
for (Map.Entry<String, Object> propEntry : originalsWithPrefix(config + ".").entrySet()) {
String prop = propEntry.getKey();
String propValue = propEntry.getValue().toString();
String defaultValue = JsonConverterConfig.SCHEMAS_ENABLE_CONFIG.equals(prop) ? "false" : null;
logDeprecatedProperty(config + "." + prop, propValue, defaultValue, config);
}
}
}
}
}

private void logDeprecatedProperty(String propName, String propValue, String defaultValue, String prefix) {
String prefixNotice = prefix != null
? " (along with all configuration for '" + prefix + "')"
: "";
if (defaultValue != null && defaultValue.equalsIgnoreCase(propValue)) {
log.info(
"Worker configuration property '{}'{} is deprecated and may be removed in an upcoming release. "
+ "The specified value matches the default, so this property can be safely removed from the worker configuration.",
propName,
prefixNotice,
propValue
);
} else if (defaultValue != null) {
log.warn(
"Worker configuration property '{}'{} is deprecated and may be removed in an upcoming release. "
+ "The specified value '{}' does NOT match the default and recommended value '{}'.",
propName,
prefixNotice,
propValue,
defaultValue
);
} else {
log.warn(
"Worker configuration property '{}'{} is deprecated and may be removed in an upcoming release.",
propName,
prefixNotice
);
}
}

@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
Expand All @@ -253,5 +323,6 @@ public static List<String> pluginLocations(Map<String, String> props) {

public WorkerConfig(ConfigDef definition, Map<String, String> props) {
super(definition, props);
logInternalConverterDeprecationWarnings(props);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.ConverterConfig;
Expand Down Expand Up @@ -97,6 +99,11 @@ protected static <U> Class<? extends U> pluginClass(
);
}

protected static boolean isInternalConverter(String classPropertyName) {
return classPropertyName.equals(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG)
|| classPropertyName.equals(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG);
}

public static ClassLoader compareAndSwapLoaders(ClassLoader loader) {
ClassLoader current = Thread.currentThread().getContextClassLoader();
if (!current.equals(loader)) {
Expand Down Expand Up @@ -195,8 +202,9 @@ public Task newTask(Class<? extends Task> taskClass) {
* @throws ConnectException if the {@link Converter} implementation class could not be found
*/
public Converter newConverter(AbstractConfig config, String classPropertyName, ClassLoaderUsage classLoaderUsage) {
if (!config.originals().containsKey(classPropertyName)) {
// This configuration does not define the converter via the specified property name
if (!config.originals().containsKey(classPropertyName) && !isInternalConverter(classPropertyName)) {
// This configuration does not define the converter via the specified property name, and
// it does not represent an internal converter (which has a default available)
return null;
}
Converter plugin = null;
Expand Down Expand Up @@ -236,6 +244,18 @@ public Converter newConverter(AbstractConfig config, String classPropertyName, C
Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix);
log.debug("Configuring the {} converter with configuration:{}{}",
isKeyConverter ? "key" : "value", System.lineSeparator(), converterConfig);

// Have to override schemas.enable from true to false for internal JSON converters
// Don't have to warn the user about anything since all deprecation warnings take place in the
// WorkerConfig class
if (plugin instanceof JsonConverter && isInternalConverter(classPropertyName)) {
// If they haven't explicitly specified values for internal.key.converter.schemas.enable
// or internal.value.converter.schemas.enable, we can safely default them to false
if (!converterConfig.containsKey(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG)) {
converterConfig.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false);
}
}

plugin.configure(converterConfig, isKeyConverter);
return plugin;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
Expand Down Expand Up @@ -51,6 +52,7 @@ public class PluginsTest {
private AbstractConfig config;
private TestConverter converter;
private TestHeaderConverter headerConverter;
private TestInternalConverter internalConverter;

@BeforeClass
public static void beforeAll() {
Expand All @@ -71,10 +73,8 @@ public void setup() {
props.put("value.converter." + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "true");
props.put("key.converter.extra.config", "foo1");
props.put("value.converter.extra.config", "foo2");
props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
props.put("internal.key.converter." + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false");
props.put("internal.value.converter." + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false");
props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, TestInternalConverter.class.getName());
props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, TestInternalConverter.class.getName());
props.put("internal.key.converter.extra.config", "bar1");
props.put("internal.value.converter.extra.config", "bar2");
props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, TestHeaderConverter.class.getName());
Expand Down Expand Up @@ -102,15 +102,17 @@ public void shouldInstantiateAndConfigureConverters() {

@Test
public void shouldInstantiateAndConfigureInternalConverters() {
instantiateAndConfigureConverter(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.CURRENT_CLASSLOADER);
// Validate extra configs got passed through to overridden converters
assertEquals("false", converter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG));
assertEquals("bar1", converter.configs.get("extra.config"));

instantiateAndConfigureConverter(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
// Validate extra configs got passed through to overridden converters
assertEquals("false", converter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG));
assertEquals("bar2", converter.configs.get("extra.config"));
instantiateAndConfigureInternalConverter(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.CURRENT_CLASSLOADER);
// Validate schemas.enable is defaulted to false for internal converter
assertEquals(false, internalConverter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG));
// Validate internal converter properties can still be set
assertEquals("bar1", internalConverter.configs.get("extra.config"));

instantiateAndConfigureInternalConverter(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
// Validate schemas.enable is defaulted to false for internal converter
assertEquals(false, internalConverter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG));
// Validate internal converter properties can still be set
assertEquals("bar2", internalConverter.configs.get("extra.config"));
}

@Test
Expand Down Expand Up @@ -163,6 +165,16 @@ protected void instantiateAndConfigureConverter(String configPropName, ClassLoad
assertNotNull(converter);
}

protected void instantiateAndConfigureHeaderConverter(String configPropName) {
headerConverter = (TestHeaderConverter) plugins.newHeaderConverter(config, configPropName, ClassLoaderUsage.CURRENT_CLASSLOADER);
assertNotNull(headerConverter);
}

protected void instantiateAndConfigureInternalConverter(String configPropName, ClassLoaderUsage classLoaderUsage) {
internalConverter = (TestInternalConverter) plugins.newConverter(config, configPropName, classLoaderUsage);
assertNotNull(internalConverter);
}

protected void assertConverterType(ConverterType type, Map<String, ?> props) {
assertEquals(type.getName(), props.get(ConverterConfig.TYPE_CONFIG));
}
Expand Down Expand Up @@ -230,4 +242,13 @@ public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] val
public void close() throws IOException {
}
}
}

public static class TestInternalConverter extends JsonConverter {
public Map<String, ?> configs;

public void configure(Map<String, ?> configs) {
this.configs = configs;
super.configure(configs);
}
}
}
9 changes: 9 additions & 0 deletions docs/upgrade.html
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ <h5><a id="upgrade_200_notable" href="#upgrade_200_notable">Notable changes in 2
<li>New Kafka Streams configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from older version. </li>
<li><a href="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a> changed the retention time for Kafka Streams repartition topics by setting its default value to <code>Long.MAX_VALUE</code>.</li>
<li>Updated <code>ProcessorStateManager</code> APIs in Kafka Streams for registering state stores to the processor topology. For more details please read the Streams <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_200">Upgrade Guide</a>.</li>
<li>
In earlier releases, Connect's worker configuration required the <code>internal.key.converter</code> and <code>internal.value.converter</code> properties.
In 2.0, these are <a href="https://cwiki.apache.org/confluence/x/AZQ7B">no longer required</a> and default to the JSON converter.
You may safely remove these properties from your Connect standalone and distributed worker configurations:<br />
<code>internal.key.converter=org.apache.kafka.connect.json.JsonConverter</code>
<code>internal.key.converter.schemas.enable=false</code>
<code>internal.value.converter=org.apache.kafka.connect.json.JsonConverter</code>
<code>internal.value.converter.schemas.enable=false</code>
</li>
</ul>

<h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New Protocol Versions</a></h5>
Expand Down
Loading

0 comments on commit a64ab91

Please sign in to comment.