Skip to content

Commit

Permalink
DBZ-3369 Add Schema Registry Lifecycle Manager to test Protobuf conve…
Browse files Browse the repository at this point in the history
…rters
  • Loading branch information
ani-sha authored and jpechane committed Apr 9, 2021
1 parent 8353558 commit 23fdb17
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 3 deletions.
12 changes: 12 additions & 0 deletions debezium-server/debezium-server-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,18 @@
</systemPropertyVariables>
</configuration>
</execution>
<execution>
<id>integration-test-protobuf</id>
<goals>
<goal>integration-test</goal>
</goals>
<configuration>
<systemPropertyVariables>
<debezium.format.key>protobuf</debezium.format.key>
<debezium.format.value>protobuf</debezium.format.value>
</systemPropertyVariables>
</configuration>
</execution>
<execution>
<id>verify</id>
<goals>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.awaitility.Awaitility;
import org.fest.assertions.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;

import io.debezium.server.events.ConnectorCompletedEvent;
Expand All @@ -30,6 +31,8 @@
@QuarkusTest
@QuarkusTestResource(PostgresTestResourceLifecycleManager.class)
@EnabledIfSystemProperty(named = "test.apicurio", matches = "false", disabledReason = "DebeziumServerIT doesn't run with apicurio profile.")
@DisabledIfSystemProperty(named = "debezium.format.key", matches = "protobuf")
@DisabledIfSystemProperty(named = "debezium.format.value", matches = "protobuf")
public class DebeziumServerIT {

private static final int MESSAGE_COUNT = 4;
Expand All @@ -54,7 +57,7 @@ void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exceptio
}

@Test
public void testPostgres() throws Exception {
public void testPostgresWithJson() throws Exception {
Testing.Print.enable();
final TestConsumer testConsumer = (TestConsumer) server.getConsumer();
Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds()))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server;

import java.util.Collections;
import java.util.List;

import io.debezium.testing.testcontainers.SchemaRegistryTestResourceLifecycleManager;
import io.quarkus.test.junit.QuarkusTestProfile;

public class DebeziumServerSchemaRegistryProfile implements QuarkusTestProfile {

@Override
public List<TestResourceEntry> testResources() {
return Collections.singletonList(new TestResourceEntry(SchemaRegistryTestResourceLifecycleManager.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public void testPostgresWithApicurioAvro() throws Exception {
assertThat(testConsumer.getValues().size()).isEqualTo(MESSAGE_COUNT);
assertThat(testConsumer.getValues().get(0)).isInstanceOf(byte[].class);
assertThat(testConsumer.getValues().get(0)).isNotNull();
assertThat(testConsumer.getValues().get(0).equals(0));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server;

import java.time.Duration;

import javax.enterprise.event.Observes;
import javax.inject.Inject;

import org.awaitility.Awaitility;
import org.fest.assertions.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;

import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;

@QuarkusTest
@QuarkusTestResource(PostgresTestResourceLifecycleManager.class)
@TestProfile(DebeziumServerSchemaRegistryProfile.class)
@EnabledIfSystemProperty(named = "debezium.format.key", matches = "protobuf")
@EnabledIfSystemProperty(named = "debezium.format.value", matches = "protobuf")
public class DebeziumServerWithSchemaRegistryIT {

private static final int MESSAGE_COUNT = 4;
@Inject
DebeziumServer server;

{
Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
}

void setupDependencies(@Observes ConnectorStartedEvent event) {
if (!TestConfigSource.isItTest()) {
return;
}

}

void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception {
if (!event.isSuccess()) {
throw (Exception) event.getError().get();
}
}

@Test
public void testPostgresWithProtobuf() throws Exception {
Testing.Print.enable();
final TestConsumer testConsumer = (TestConsumer) server.getConsumer();
Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds()))
.until(() -> (testConsumer.getValues().size() >= MESSAGE_COUNT));
Assertions.assertThat(testConsumer.getValues().size()).isEqualTo(MESSAGE_COUNT);
Assertions.assertThat(testConsumer.getValues().get(0)).isInstanceOf(byte[].class);
Assertions.assertThat(testConsumer.getValues().get(0)).isNotNull();
Assertions.assertThat(testConsumer.getValues().get(0).equals(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public TestConfigSource() {
formatValue = (formatValue != null) ? formatValue : Json.class.getSimpleName().toLowerCase();
integrationTest.put("debezium.format.key", formatKey);
integrationTest.put("debezium.format.value", formatValue);
integrationTest.put("debezium.format.schema.registry.url", "http://localhost:8081");
}

unitTest.put("debezium.sink.type", "test");
Expand All @@ -66,7 +65,6 @@ public TestConfigSource() {
unitTest.put("debezium.transforms", "hoist");
unitTest.put("debezium.transforms.hoist.type", "org.apache.kafka.connect.transforms.HoistField$Value");
unitTest.put("debezium.transforms.hoist.field", "line");
unitTest.put("debezium.format.schema.registry.url", "http://localhost:8081");

if (isItTest()) {
config = integrationTest;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.testcontainers;

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

public class SchemaRegistryContainer extends GenericContainer<SchemaRegistryContainer> {

private static final String SCHEMA_REGISTRY_DOCKER_IMAGE_NAME = "confluentinc/cp-schema-registry:6.0.2";
private static final DockerImageName SCHEMA_REGISTRY_DOCKER_IMAGE = DockerImageName.parse(SCHEMA_REGISTRY_DOCKER_IMAGE_NAME);
private static final Integer SCHEMA_REGISTRY_EXPOSED_PORT = 8081;

SchemaRegistryContainer() {
super(SCHEMA_REGISTRY_DOCKER_IMAGE);
addExposedPorts(SCHEMA_REGISTRY_EXPOSED_PORT);
}

public SchemaRegistryContainer withKafka(KafkaContainer kafkaContainer) {
return withKafka(kafkaContainer.getNetwork(), kafkaContainer.getNetworkAliases().get(0) + ":9092");
}

public SchemaRegistryContainer withKafka(Network network, String bootstrapServers) {
withNetwork(network);
withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry");
withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081");
withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://" + bootstrapServers);
return self();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.testcontainers;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;

import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;

public class SchemaRegistryTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager {

private static final Logger LOGGER = LoggerFactory.getLogger(SchemaRegistryTestResourceLifecycleManager.class);

private static final Network network = Network.newNetwork();
private static final Integer PORT = 8081;

public static KafkaContainer kafkaContainer = new KafkaContainer()
.withNetwork(network);

private static final SchemaRegistryContainer schemaRegistryContainer = new SchemaRegistryContainer()
.withNetwork(network)
.withKafka(kafkaContainer)
.withLogConsumer(new Slf4jLogConsumer(LOGGER))
.dependsOn(kafkaContainer)
.withStartupTimeout(Duration.ofSeconds(90));

@Override
public Map<String, String> start() {
Startables.deepStart(Stream.of(kafkaContainer, schemaRegistryContainer)).join();

Map<String, String> params = new ConcurrentHashMap<>();
params.put("debezium.format.schema.registry.url", getSchemaRegistryUrl());

return params;
}

@Override
public void stop() {
try {
if (schemaRegistryContainer != null) {
schemaRegistryContainer.stop();
}
}
catch (Exception e) {
// ignored
}
}

private static String getSchemaRegistryUrl() {
return "http://" + schemaRegistryContainer.getHost() + ":" + schemaRegistryContainer.getMappedPort(PORT);
}
}

0 comments on commit 23fdb17

Please sign in to comment.