Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-7921 Kafka offset/history store support #54

Merged
merged 4 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,50 @@
*/
public final class ConfigMapping {

private final Map<String, String> config;
/**
* Represents a key in the configuration
*/
public enum KeyType {
/**
* The key is relative to the current prefix
*/
RELATIVE,

/**
* The key is absolute and will never be prefixed (unless specifically documented),
* including situations when the mapping is put into another mapping
*/
ABSOLUTE;
}

public record Key(String name, KeyType type) {
public static Key rel(String key) {
return new Key(key, KeyType.RELATIVE);
}

public static Key abs(String key) {
return new Key(key, KeyType.ABSOLUTE);
}

public static Key root() {
return new Key(null, null);
}

public Key asAbs() {
return abs(name);
}

public Key asRel() {
return rel(name);
}

@Override
public String toString() {
return name;
}
}

private final Map<Key, String> config;
private final String prefix;

public static ConfigMapping from(Map<String, ?> properties) {
Expand All @@ -44,7 +87,31 @@ public ConfigMapping(String prefix) {
this.prefix = prefix;
}

public Map<String, String> getAsMap() {
/**
* Creates new ConfigMapping, with the same prefix as this one, but sets all keys as absolute
*
* @return new ConfigMapping with all keys as absolute
*/
public ConfigMapping asAbsolute() {
return ConfigMapping.prefixed(prefix).putAllAbs(this);
}

/**
* Creates new ConfigMapping, with the same prefix as this one, but sets all keys as relative
*
* @return new ConfigMapping with all keys as relative
*/
public ConfigMapping asRelative() {
return ConfigMapping.prefixed(prefix).putAllRel(this);
}

public Map<String, String> getAsMapSimple() {
return config.entrySet()
.stream()
.collect(Collectors.toMap(e -> e.getKey().name(), Map.Entry::getValue));
}

public Map<Key, String> getAsMap() {
return config;
}

Expand All @@ -56,23 +123,33 @@ public String getAsString() {
}

public ConfigMapping rootValue(Object value) {
putInternal(value);
putInternal(value, Key.root());
return this;
}

public ConfigMapping put(String key, Object value) {
putInternal(value, key);
putInternal(value, Key.rel(key));
return this;
}

public ConfigMapping putAll(ConfigMappable resource) {
putAll(resource.asConfiguration());
/**
* Puts the value with the given key as absolute key (that is the key will never be prefixed)
*
* @param key the absolute key to put the value under
* @param value the value to put
* @return this mapping
*/
public ConfigMapping putAbs(String key, Object value) {
putInternal(value, Key.abs(key));
return this;
}

public ConfigMapping putAll(ConfigMappable resource) {
return putAll(resource.asConfiguration());
}

public ConfigMapping putAll(String key, ConfigMappable resource) {
putAll(key, resource.asConfiguration());
return this;
return putAll(key, resource.asConfiguration());
}

public ConfigMapping putAll(ConfigMapping config) {
Expand All @@ -86,7 +163,27 @@ public ConfigMapping putAll(String key, ConfigMapping config) {
}

public ConfigMapping putAll(Map<String, ?> props) {
props.forEach((key, value) -> putInternal(value, key));
props.forEach((key, value) -> putInternal(value, Key.rel(key)));
return this;
}

/**
* Puts all values from the given configuration, but sets all keys as absolute
* @param config the configuration to put
* @return this mapping
*/
public ConfigMapping putAllAbs(ConfigMapping config) {
config.getAsMap().forEach((key, value) -> putInternal(value, key.asAbs()));
return this;
}

/**
* Puts all values from the given configuration, but sets all keys as relative
* @param config the configuration to put
* @return this mapping
*/
public ConfigMapping putAllRel(ConfigMapping config) {
config.getAsMap().forEach((key, value) -> putInternal(value, key.asRel()));
return this;
}

Expand Down Expand Up @@ -121,19 +218,29 @@ public <T extends ConfigMappable> ConfigMapping putMap(String key, Map<String, T
return this;
}

private void putInternal(Object value, String... keys) {
private void putInternal(Object value, Key key) {
putInternal(value, null, key);
}

private void putInternal(Object value, String key, Key subKey) {
if (value == null) {
return;
}
var key = prefix(keys);
config.put(key, String.valueOf(value));
var combined = prefix(key, subKey);
config.put(combined, String.valueOf(value));
}

private String prefix(String... keys) {
return Stream.concat(Stream.of(prefix), Stream.of(keys))
private Key prefix(String key, Key subKey) {
if (subKey.type == KeyType.ABSOLUTE) {
return subKey;
}

var combined = Stream.concat(Stream.of(prefix), Stream.of(key, subKey.name))
.filter(Objects::nonNull)
.filter(not(String::isBlank))
.collect(Collectors.joining("."));

return Key.rel(combined);
}

public String md5Sum() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.debezium.operator.api.model.source.storage.Store;
import io.debezium.operator.api.model.source.storage.offset.FileOffsetStore;
import io.debezium.operator.api.model.source.storage.offset.InMemoryOffsetStore;
import io.debezium.operator.api.model.source.storage.offset.KafkaOffsetStore;
import io.debezium.operator.api.model.source.storage.offset.RedisOffsetStore;
import io.debezium.operator.docs.annotations.Documented;
import io.sundr.builder.annotations.Buildable;
Expand All @@ -32,6 +33,8 @@ public class Offset implements ConfigMappable {
private InMemoryOffsetStore memory;
@JsonPropertyDescription("Redis backed offset store configuration")
private RedisOffsetStore redis;
@JsonPropertyDescription("Kafka backing store configuration")
private KafkaOffsetStore kafka;
@JsonPropertyDescription("Arbitrary offset store configuration")
private CustomStore store;
@JsonPropertyDescription("Interval at which to try commiting offsets")
Expand Down Expand Up @@ -70,6 +73,14 @@ public void setRedis(RedisOffsetStore redis) {
this.redis = redis;
}

public KafkaOffsetStore getKafka() {
return kafka;
}

public void setKafka(KafkaOffsetStore kafka) {
this.kafka = kafka;
}

public CustomStore getStore() {
return store;
}
Expand All @@ -80,7 +91,7 @@ public void setStore(CustomStore store) {

@JsonIgnore
public Store getActiveStore() {
return Stream.of(file, memory, redis, store)
return Stream.of(file, memory, redis, kafka, store)
.filter(Objects::nonNull)
.findFirst()
.orElseGet(InMemoryOffsetStore::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@

import io.debezium.operator.api.config.ConfigMappable;
import io.debezium.operator.api.config.ConfigMapping;
import io.debezium.operator.api.model.ConfigProperties;
import io.debezium.operator.api.model.source.storage.CustomStore;
import io.debezium.operator.api.model.source.storage.Store;
import io.debezium.operator.api.model.source.storage.schema.FileSchemaHistoryStore;
import io.debezium.operator.api.model.source.storage.schema.InMemorySchemaHistoryStore;
import io.debezium.operator.api.model.source.storage.schema.KafkaSchemaHistoryStore;
import io.debezium.operator.api.model.source.storage.schema.RedisSchemaHistoryStore;
import io.debezium.operator.docs.annotations.Documented;
import io.sundr.builder.annotations.Buildable;
Expand All @@ -31,8 +33,12 @@ public class SchemaHistory implements ConfigMappable {
private InMemorySchemaHistoryStore memory;
@JsonPropertyDescription("Redis backed schema history store configuration")
private RedisSchemaHistoryStore redis;
@JsonPropertyDescription("Kafka backed schema history store configuration")
private KafkaSchemaHistoryStore kafka;
@JsonPropertyDescription("Arbitrary schema history store configuration")
private CustomStore store;
@JsonPropertyDescription("Additional common schema history store configuration properties.")
private ConfigProperties config = new ConfigProperties();;

public FileSchemaHistoryStore getFile() {
return file;
Expand All @@ -58,6 +64,14 @@ public void setRedis(RedisSchemaHistoryStore redis) {
this.redis = redis;
}

public KafkaSchemaHistoryStore getKafka() {
return kafka;
}

public void setKafka(KafkaSchemaHistoryStore kafka) {
this.kafka = kafka;
}

public CustomStore getStore() {
return store;
}
Expand All @@ -66,16 +80,26 @@ public void setStore(CustomStore store) {
this.store = store;
}

public ConfigProperties getConfig() {
return config;
}

public void setConfig(ConfigProperties config) {
this.config = config;
}

@JsonIgnore
public Store getActiveStore() {
return Stream.of(file, memory, redis, store)
return Stream.of(file, memory, redis, kafka, store)
.filter(Objects::nonNull)
.findFirst()
.orElseGet(InMemorySchemaHistoryStore::new);
}

@Override
public ConfigMapping asConfiguration() {
return getActiveStore().asConfiguration();
return ConfigMapping.empty()
.putAll(config)
.putAll(getActiveStore());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@

public abstract class AbstractStore implements Store {
@JsonIgnore
private final String type;
protected final String type;
@JsonIgnore
private final String configPrefix;
protected final String configPrefix;

@JsonPropertyDescription("Additional store configuration properties.")
private ConfigProperties config = new ConfigProperties();
Expand Down Expand Up @@ -43,14 +43,13 @@ public void setConfig(ConfigProperties config) {

@Override
public ConfigMapping asConfiguration() {
var typeConfig = ConfigMapping.prefixed(configPrefix)
.putAll(config)
.putAll(typeConfiguration());

return ConfigMapping.empty()
.rootValue(type)
.putAll(typeConfig);
.putAll(configPrefix, config)
.putAll(configPrefix, typeConfiguration());
}

public abstract ConfigMapping typeConfiguration();
protected ConfigMapping typeConfiguration() {
return ConfigMapping.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public void setFileName(String fileName) {
}

@Override
public ConfigMapping typeConfiguration() {
protected ConfigMapping typeConfiguration() {
return ConfigMapping.empty()
.put("filename", fileName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,10 @@
*/
package io.debezium.operator.api.model.source.storage;

import io.debezium.operator.api.config.ConfigMapping;

public class InMemoryStore extends AbstractStore {

public InMemoryStore(String type) {
super(type);
}

@Override
public ConfigMapping typeConfiguration() {
return ConfigMapping.empty();
}
}
Loading