Skip to content

Commit

Permalink
DBZ-8501 Support for JDBC offset and schema history stores
Browse files Browse the repository at this point in the history
  • Loading branch information
jcechace committed Dec 10, 2024
1 parent 248cdec commit 988d711
Show file tree
Hide file tree
Showing 10 changed files with 692 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.debezium.operator.api.model.source.storage.offset.ConfigMapOffsetStore;
import io.debezium.operator.api.model.source.storage.offset.FileOffsetStore;
import io.debezium.operator.api.model.source.storage.offset.InMemoryOffsetStore;
import io.debezium.operator.api.model.source.storage.offset.JdbcOffsetStore;
import io.debezium.operator.api.model.source.storage.offset.KafkaOffsetStore;
import io.debezium.operator.api.model.source.storage.offset.RedisOffsetStore;
import io.debezium.operator.docs.annotations.Documented;
Expand All @@ -37,6 +38,8 @@ public class Offset implements ConfigMappable<DebeziumServer> {
private RedisOffsetStore redis;
@JsonPropertyDescription("Kafka backing store configuration")
private KafkaOffsetStore kafka;
@JsonPropertyDescription("JDBC backing store configuration")
private JdbcOffsetStore jdbc;
@JsonPropertyDescription("Config map backed offset store configuration")
private ConfigMapOffsetStore configMap;
@JsonPropertyDescription("Arbitrary offset store configuration")
Expand Down Expand Up @@ -85,6 +88,14 @@ public void setKafka(KafkaOffsetStore kafka) {
this.kafka = kafka;
}

public JdbcOffsetStore getJdbc() {
return jdbc;
}

public void setJdbc(JdbcOffsetStore jdbc) {
this.jdbc = jdbc;
}

public ConfigMapOffsetStore getConfigMap() {
return configMap;
}
Expand All @@ -103,7 +114,7 @@ public void setStore(CustomStore store) {

@JsonIgnore
public Store getActiveStore() {
return Stream.of(file, memory, redis, kafka, configMap, store)
return Stream.of(file, memory, redis, kafka, jdbc, configMap, store)
.filter(Objects::nonNull)
.findFirst()
.orElseGet(InMemoryOffsetStore::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.debezium.operator.api.model.source.storage.Store;
import io.debezium.operator.api.model.source.storage.schema.FileSchemaHistoryStore;
import io.debezium.operator.api.model.source.storage.schema.InMemorySchemaHistoryStore;
import io.debezium.operator.api.model.source.storage.schema.JdbcSchemaHistoryStore;
import io.debezium.operator.api.model.source.storage.schema.KafkaSchemaHistoryStore;
import io.debezium.operator.api.model.source.storage.schema.RedisSchemaHistoryStore;
import io.debezium.operator.docs.annotations.Documented;
Expand All @@ -36,6 +37,8 @@ public class SchemaHistory implements ConfigMappable<DebeziumServer> {
private RedisSchemaHistoryStore redis;
@JsonPropertyDescription("Kafka backed schema history store configuration")
private KafkaSchemaHistoryStore kafka;
@JsonPropertyDescription("JDBC backed schema history store configuration")
private JdbcSchemaHistoryStore jdbc;
@JsonPropertyDescription("Arbitrary schema history store configuration")
private CustomStore store;
@JsonPropertyDescription("Additional common schema history store configuration properties.")
Expand Down Expand Up @@ -73,6 +76,14 @@ public void setKafka(KafkaSchemaHistoryStore kafka) {
this.kafka = kafka;
}

public JdbcSchemaHistoryStore getJdbc() {
return jdbc;
}

public void setJdbc(JdbcSchemaHistoryStore jdbc) {
this.jdbc = jdbc;
}

public CustomStore getStore() {
return store;
}
Expand All @@ -91,7 +102,7 @@ public void setConfig(ConfigProperties config) {

@JsonIgnore
public Store getActiveStore() {
return Stream.of(file, memory, redis, kafka, store)
return Stream.of(file, memory, redis, kafka, jdbc, store)
.filter(Objects::nonNull)
.findFirst()
.orElseGet(InMemorySchemaHistoryStore::new);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.operator.api.model.source.storage;

import com.fasterxml.jackson.annotation.JsonPropertyDescription;

import io.debezium.operator.api.config.ConfigMapping;
import io.debezium.operator.api.model.DebeziumServer;

public class JdbcStore extends AbstractStore {
public static final String CONFIG_PREFIX = "jdbc";

@JsonPropertyDescription("JDBC connection URL")
private String url;

@JsonPropertyDescription("Username used to connect to the storage database")
private String user;

@JsonPropertyDescription("Password used to connect to the storage database")
private String password;

@JsonPropertyDescription("Retry delay on connection failure (in milliseconds)")
private long retryDelay;

@JsonPropertyDescription("Maximum number of retries on connection failure")
private int maxRetries;

public JdbcStore(String type) {
super(CONFIG_PREFIX, type);
}

public String getUrl() {
return url;
}

public void setUrl(String url) {
this.url = url;
}

public String getUser() {
return user;
}

public void setUser(String user) {
this.user = user;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}

public long getRetryDelay() {
return retryDelay;
}

public void setRetryDelay(long retryDelay) {
this.retryDelay = retryDelay;
}

public int getMaxRetries() {
return maxRetries;
}

public void setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
}

@Override
protected ConfigMapping<DebeziumServer> typeConfiguration(DebeziumServer primary) {
return super.typeConfiguration(primary)
.put("url", url)
.put("user", user)
.put("password", password)
.put("wait.retry.delay.ms", retryDelay)
.put("retry.max.attempts", maxRetries);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.operator.api.model.source.storage.offset;

import com.fasterxml.jackson.annotation.JsonPropertyDescription;

import io.debezium.operator.api.config.ConfigMapping;
import io.debezium.operator.api.model.DebeziumServer;
import io.debezium.operator.api.model.source.storage.JdbcStore;
import io.debezium.operator.docs.annotations.Documented;
import io.sundr.builder.annotations.Buildable;

@Documented
@Buildable(editableEnabled = false, builderPackage = "io.fabric8.kubernetes.api.builder", lazyCollectionInitEnabled = false)
public class JdbcOffsetStore extends JdbcStore {

public static final String TYPE = "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore";

@JsonPropertyDescription("The configuration of the offset table")
private JdbcOffsetTableConfig table = new JdbcOffsetTableConfig();

public JdbcOffsetStore() {
super(TYPE);
}

public JdbcOffsetTableConfig getTable() {
return table;
}

public void setTable(JdbcOffsetTableConfig table) {
this.table = table;
}

@Override
protected ConfigMapping<DebeziumServer> typeConfiguration(DebeziumServer primary) {
return super.typeConfiguration(primary)
.putAll("table", table);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.operator.api.model.source.storage.offset;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;

import io.debezium.operator.api.config.ConfigMappable;
import io.debezium.operator.api.config.ConfigMapping;
import io.debezium.operator.api.model.DebeziumServer;
import io.debezium.operator.docs.annotations.Documented;
import io.sundr.builder.annotations.Buildable;

@Documented
@Buildable(editableEnabled = false, builderPackage = "io.fabric8.kubernetes.api.builder", lazyCollectionInitEnabled = false)
public class JdbcOffsetTableConfig implements ConfigMappable<DebeziumServer> {

@JsonPropertyDescription("The name of the offset table")
@JsonProperty(required = false)
private String name;

@JsonPropertyDescription("DDL statement to create the offset table")
@JsonProperty(required = false)
private String ddl;

@JsonPropertyDescription("Statement used to select from the offset table")
@JsonProperty(required = false)
private String select;

@JsonPropertyDescription("Statement used to insert into the offset table")
@JsonProperty(required = false)
private String insert;

@JsonPropertyDescription("Statement used to update the offset table")
@JsonProperty(required = false)
private String delete;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getDdl() {
return ddl;
}

public void setDdl(String ddl) {
this.ddl = ddl;
}

public String getSelect() {
return select;
}

public void setSelect(String select) {
this.select = select;
}

public String getInsert() {
return insert;
}

public void setInsert(String insert) {
this.insert = insert;
}

public String getDelete() {
return delete;
}

public void setDelete(String delete) {
this.delete = delete;
}

@Override
public ConfigMapping<DebeziumServer> asConfiguration(DebeziumServer primary) {
return ConfigMapping.empty(primary)
.put("name", name)
.put("ddl", ddl)
.put("select", select)
.put("insert", insert)
.put("delete", delete);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.operator.api.model.source.storage.schema;

import com.fasterxml.jackson.annotation.JsonPropertyDescription;

import io.debezium.operator.api.config.ConfigMapping;
import io.debezium.operator.api.model.DebeziumServer;
import io.debezium.operator.api.model.source.storage.JdbcStore;
import io.debezium.operator.docs.annotations.Documented;
import io.sundr.builder.annotations.Buildable;

@Documented
@Buildable(editableEnabled = false, builderPackage = "io.fabric8.kubernetes.api.builder", lazyCollectionInitEnabled = false)
public class JdbcSchemaHistoryStore extends JdbcStore {

public static final String TYPE = "io.debezium.storage.jdbc.history.JdbcSchemaHistory";

@JsonPropertyDescription("The configuration of the offset table")
private JdbcSchemaHistoryTableConfig table = new JdbcSchemaHistoryTableConfig();

public JdbcSchemaHistoryStore() {
super(TYPE);
}

public JdbcSchemaHistoryTableConfig getTable() {
return table;
}

public void setTable(JdbcSchemaHistoryTableConfig table) {
this.table = table;
}

@Override
protected ConfigMapping<DebeziumServer> typeConfiguration(DebeziumServer primary) {
return super.typeConfiguration(primary)
.putAll("table", table);
}
}
Loading

0 comments on commit 988d711

Please sign in to comment.