DBZ-2699 implement skipped.operations in core
Hossein Torabi authored and jpechane committed Apr 8, 2021
1 parent bf01380 commit 5d6705c
Showing 10 changed files with 129 additions and 3 deletions.
@@ -12,6 +12,7 @@
import static junit.framework.TestCase.assertEquals;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@@ -2491,6 +2492,45 @@ public void shouldThrowWhenTableFiltersIsEmpty() throws Exception {
assertTrue(logInterceptor.containsStacktraceElement("No table filters found for filtered publication cdc"));
}

@Test
@FixFor("DBZ-2699")
public void shouldEmitNoEventsForSkippedUpdateOperations() throws Exception {
Testing.Print.enable();
TestHelper.dropDefaultReplicationSlot();
TestHelper.execute(SETUP_TABLES_STMT);
Configuration config = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.with(PostgresConnectorConfig.SKIPPED_OPERATIONS, Envelope.Operation.UPDATE.code())
.build();
start(PostgresConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
assertNoRecordsToConsume();

// insert record and update it
TestHelper.execute("INSERT into s1.a VALUES(201, 1);");
TestHelper.execute("UPDATE s1.a SET aa=201 WHERE pk=201");
TestHelper.execute("INSERT into s1.a VALUES(202, 2)");
TestHelper.execute("UPDATE s1.a SET aa=202 WHERE pk=202");
TestHelper.execute("INSERT into s1.a VALUES(203, 3)");
TestHelper.execute("UPDATE s1.a SET aa=203 WHERE pk=203");

SourceRecords records = consumeRecordsByTopic(3);
List<SourceRecord> recordsForTopic = records.recordsForTopic(topicName("s1.a"));

assertThat(recordsForTopic.size()).isEqualTo(3);
assertInsert(recordsForTopic.get(0), PK_FIELD, 201);
assertInsert(recordsForTopic.get(1), PK_FIELD, 202);
assertInsert(recordsForTopic.get(2), PK_FIELD, 203);

recordsForTopic.forEach(record -> {
Struct value = (Struct) record.value();
String op = value.getString("op");
assertNotEquals(Envelope.Operation.UPDATE.code(), op);
});
}

private CompletableFuture<Void> batchInsertRecords(long recordsCount, int batchSize) {
String insertStmt = "INSERT INTO text_table(j, jb, x, u) " +
"VALUES ('{\"bar\": \"baz\"}'::json, '{\"bar\": \"baz\"}'::jsonb, " +
@@ -2452,6 +2452,42 @@ public void testMaxLsnSelectStatementWithFalse() throws Exception {
stopConnector();
}

@Test
@FixFor("DBZ-2699")
public void shouldEmitNoEventsForSkippedUpdateAndDeleteOperations() throws Exception {
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(SqlServerConnectorConfig.SKIPPED_OPERATIONS, "u,d")
.build();

start(SqlServerConnector.class, config);
assertConnectorIsRunning();
// Wait for snapshot completion
TestHelper.waitForSnapshotToBeCompleted();
consumeRecordsByTopic(1);

connection.execute("INSERT INTO tablea VALUES(201, 'insert201')");
connection.execute("UPDATE tablea SET cola='insert201-update' WHERE id=201");
connection.execute("INSERT INTO tablea VALUES(202, 'insert202')");
connection.execute("DELETE FROM tablea WHERE id=202");
connection.execute("INSERT INTO tablea VALUES(203, 'insert203')");

final SourceRecords records = consumeRecordsByTopic(3);
final List<SourceRecord> tableA = records.recordsForTopic("server1.dbo.tablea");
Assertions.assertThat(tableA).hasSize(3);
tableA.forEach((SourceRecord record) -> {
Struct value = (Struct) record.value();
assertThat(value.get("op")).isEqualTo(Envelope.Operation.CREATE.code());
assertThat(value.get("op")).isNotEqualTo(Envelope.Operation.UPDATE.code());
assertThat(value.get("op")).isNotEqualTo(Envelope.Operation.DELETE.code());
});

assertInsert(tableA.get(0), "id", 201);
assertInsert(tableA.get(1), "id", 202);
assertInsert(tableA.get(2), "id", 203);
}

private void assertRecord(Struct record, List<SchemaAndValueField> expected) {
expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record));
}
@@ -10,6 +10,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

import org.apache.kafka.connect.data.Schema;
@@ -74,6 +75,8 @@ public class EventDispatcher<T extends DataCollectionId> {
private final InconsistentSchemaHandler<T> inconsistentSchemaHandler;
private final TransactionMonitor transactionMonitor;
private final CommonConnectorConfig connectorConfig;
private final Set<Operation> skippedOperations;
private final boolean neverSkip;

private final Schema schemaChangeKeySchema;
private final Schema schemaChangeValueSchema;
@@ -114,6 +117,8 @@ public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> t
this.streamingReceiver = new StreamingChangeRecordReceiver();
this.emitTombstonesOnDelete = connectorConfig.isEmitTombstoneOnDelete();
this.inconsistentSchemaHandler = inconsistentSchemaHandler != null ? inconsistentSchemaHandler : this::errorOnMissingSchema;
this.skippedOperations = connectorConfig.getSkippedOps();
this.neverSkip = this.skippedOperations.isEmpty();

this.transactionMonitor = new TransactionMonitor(connectorConfig, metadataProvider, this::enqueueTransactionMessage);
this.signal = new Signal(connectorConfig, this);
@@ -208,12 +213,15 @@ public void changeRecord(DataCollectionSchema schema,
OffsetContext offset,
ConnectHeaders headers)
throws InterruptedException {
if (operation == Operation.CREATE && signal.isSignal(dataCollectionId)) {
signal.process(value, offset);
}

if (neverSkip || !skippedOperations.contains(operation)) {
transactionMonitor.dataEvent(dataCollectionId, offset, key, value);
eventListener.onEvent(dataCollectionId, offset, key, value);
streamingReceiver.changeRecord(schema, operation, key, value, offset, headers);
}
}
});
handled = true;
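
The heart of the change is this dispatch guard: an event whose operation is in the configured skip set never reaches the transaction monitor, the event listener, or the change record receiver. Below is a minimal, self-contained sketch of the guard's semantics (simplified names, not the actual EventDispatcher, which obtains the set from CommonConnectorConfig.getSkippedOps()):

import java.util.EnumSet;
import java.util.Set;

// Illustrative sketch only; not Debezium code. The real EventDispatcher
// applies the same check before dispatching each change record.
public class SkipGuardSketch {

    enum Operation {
        CREATE, UPDATE, DELETE
    }

    private final Set<Operation> skippedOperations;
    // Cached so the common case (nothing skipped) avoids a set lookup
    // on every change event.
    private final boolean neverSkip;

    SkipGuardSketch(Set<Operation> skippedOperations) {
        this.skippedOperations = skippedOperations;
        this.neverSkip = skippedOperations.isEmpty();
    }

    boolean shouldDispatch(Operation operation) {
        return neverSkip || !skippedOperations.contains(operation);
    }

    public static void main(String[] args) {
        SkipGuardSketch guard = new SkipGuardSketch(EnumSet.of(Operation.UPDATE));
        System.out.println(guard.shouldDispatch(Operation.CREATE)); // true
        System.out.println(guard.shouldDispatch(Operation.UPDATE)); // false
    }
}

Note that the signal check runs before the guard, so signal processing still happens even when `c` events are configured to be skipped.
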
6 changes: 6 additions & 0 deletions documentation/modules/ROOT/pages/connectors/cassandra.adoc
@@ -1013,6 +1013,12 @@ refreshing the cached Cassandra table schemas.
|Whether field names will be sanitized to adhere to Avro naming requirements.
See {link-prefix}:{link-avro-serialization}#avro-naming[Avro naming] for more details.

|[[cassandra-property-skipped-operations]]<<cassandra-property-skipped-operations, `+skipped.operations+`>>
|
|A comma-separated list of operation types that will be skipped during streaming.
The operations include: `c` for inserts/create, `u` for updates, and `d` for deletes.
By default, no operations are skipped.

|===

If the Cassandra agent uses SSL to connect to the Cassandra node, an SSL config file is required.
6 changes: 6 additions & 0 deletions documentation/modules/ROOT/pages/connectors/db2.adoc
@@ -2027,6 +2027,12 @@ A possible use case for setting these properties is large, append-only tables. Y
|`false`
|Determines whether the connector generates events with transaction boundaries and enriches change event envelopes with transaction metadata. Specify `true` if you want the connector to do this. See {link-prefix}:{link-db2-connector}#db2-transaction-metadata[Transaction metadata] for details.

|[[db2-property-skipped-operations]]<<db2-property-skipped-operations, `+skipped.operations+`>>
|
|A comma-separated list of operation types that will be skipped during streaming.
The operations include: `c` for inserts/create, `u` for updates, and `d` for deletes.
By default, no operations are skipped.

|===

[id="db2-pass-through-properties"]
6 changes: 6 additions & 0 deletions documentation/modules/ROOT/pages/connectors/mysql.adoc
@@ -2492,6 +2492,12 @@ endif::community[]
|`false`
|Determines whether the connector generates events with transaction boundaries and enriches change event envelopes with transaction metadata. Specify `true` if you want the connector to do this. See {link-prefix}:{link-mysql-connector}#mysql-transaction-metadata[Transaction metadata] for details.

|[[mysql-property-skipped-operations]]<<mysql-property-skipped-operations, `+skipped.operations+`>>
|
|A comma-separated list of operation types that will be skipped during streaming.
The operations include: `c` for inserts/create, `u` for updates, and `d` for deletes.
By default, no operations are skipped.

|===

[id="mysql-pass-through-configuration-properties"]
6 changes: 6 additions & 0 deletions documentation/modules/ROOT/pages/connectors/oracle.adoc
@@ -1499,6 +1499,12 @@ When set to `0`, log mining history is disabled.
|A comma-separated list of RAC node host names or addresses.
This field is required to enable Oracle RAC support.

|[[oracle-property-skipped-operations]]<<oracle-property-skipped-operations, `+skipped.operations+`>>
|
|A comma-separated list of operation types that will be skipped during streaming.
The operations include: `c` for inserts/create, `u` for updates, and `d` for deletes.
By default, no operations are skipped.

|===

[[oracle-monitoring]]
6 changes: 6 additions & 0 deletions documentation/modules/ROOT/pages/connectors/postgresql.adoc
@@ -2867,6 +2867,12 @@ If the setting of `toasted.value.placeholder` starts with the `hex:` prefix it i
|10000 (10 seconds)
|The number of milliseconds to wait before restarting a connector after a retriable error occurs.

|[[postgresql-property-skipped-operations]]<<postgresql-property-skipped-operations, `+skipped.operations+`>>
|
|A comma-separated list of operation types that will be skipped during streaming.
The operations include: `c` for inserts/create, `u` for updates, and `d` for deletes.
By default, no operations are skipped.

|===

[id="postgresql-pass-through-properties"]
6 changes: 6 additions & 0 deletions documentation/modules/ROOT/pages/connectors/sqlserver.adoc
@@ -2239,6 +2239,12 @@ See {link-prefix}:{link-sqlserver-connector}#sqlserver-transaction-metadata[Tran
|10000 (10 seconds)
|The number of milliseconds to wait before restarting a connector after a retriable error occurs.

|[[sqlserver-property-skipped-operations]]<<sqlserver-property-skipped-operations, `+skipped.operations+`>>
|
|A comma-separated list of operation types that will be skipped during streaming.
The operations include: `c` for inserts/create, `u` for updates, and `d` for deletes.
By default, no operations are skipped.

|===
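
As a usage illustration, the SQL Server integration test above enables this behavior by setting `skipped.operations` to `u,d`. Below is a hedged sketch of the same setting applied through Debezium's `Configuration` builder; the other properties are placeholders, not a complete connector configuration:

import io.debezium.config.Configuration;

// Illustrative sketch: a partial configuration that skips update (u) and
// delete (d) change events. Connection settings are placeholders.
public class SkippedOperationsConfigExample {
    public static void main(String[] args) {
        Configuration config = Configuration.create()
                .with("connector.class", "io.debezium.connector.sqlserver.SqlServerConnector")
                .with("database.server.name", "server1") // placeholder
                .with("skipped.operations", "u,d")
                .build();
        System.out.println(config.getString("skipped.operations")); // prints u,d
    }
}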

The connector also supports _pass-through_ configuration properties that are used when creating the Kafka producer and consumer. Specifically, all connector configuration properties that begin with the `database.history.producer.` prefix are used (without the prefix) when creating the Kafka producer that writes to the database history, and all those that begin with the prefix `database.history.consumer.` are used (without the prefix) when creating the Kafka consumer that reads the database history upon connector startup.
6 changes: 6 additions & 0 deletions documentation/modules/ROOT/pages/connectors/vitess.adoc
@@ -1174,6 +1174,12 @@ The following _advanced_ configuration properties have defaults that work in mos
`false` if not.
|Indicates whether field names are sanitized to adhere to {link-prefix}:{link-avro-serialization}#avro-naming[Avro naming requirements].

|[[vitess-property-skipped-operations]]<<vitess-property-skipped-operations, `+skipped.operations+`>>
|
|A comma-separated list of operation types that will be skipped during streaming.
The operations include: `c` for inserts/create, `u` for updates, and `d` for deletes.
By default, no operations are skipped.

|===

[id="vitess-pass-through-properties"]
