Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-8015 Test offset store configuration and reconcilation #70

Merged
merged 1 commit into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 10 additions & 20 deletions systemtests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
<fabric8.version>6.10.0</fabric8.version>
<junit.jupiter.version>5.10.2</junit.jupiter.version>
<junit.platform.launcher.version>1.10.2</junit.platform.launcher.version>
<log4j.version>2.17.2</log4j.version>
<slf4j.version>1.7.36</slf4j.version>
<log4j2.version>2.23.1</log4j2.version>
<assertj.version>3.25.3</assertj.version>
<testframe.version>0.1.1</testframe.version>
<testframe.version>0.2.1</testframe.version>
<dmt.version>0.0.1-alpha1</dmt.version>
<okhttp.version>5.0.0-alpha.12</okhttp.version>
<awaitility.version>4.2.1</awaitility.version>
<jackson.version>2.16.1</jackson.version>
<lombok.version>1.18.34</lombok.version>
<apache.commons.lang.version>3.14.0</apache.commons.lang.version>
<google.json.simple.version>1.1.1</google.json.simple.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -75,23 +75,8 @@
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<artifactId>log4j-slf4j2-impl</artifactId>
<version>${log4j2.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
Expand Down Expand Up @@ -139,6 +124,11 @@
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>${google.json.simple.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ public final class ConfigProperties {
public static final String BUNDLE_PATH = System.getProperty("test.bundle.path", System.getProperty("user.dir") + "/../k8/");
public static final Integer HTTP_POLL_TIMEOUT = Integer.valueOf(System.getProperty("test.http.poll.timeout", "20"));
public static final Integer HTTP_POLL_INTERVAL = Integer.valueOf(System.getProperty("test.http.poll.interval", "200"));
public static final Integer FABRIC8_POLL_INTERVAL = Integer.valueOf(System.getProperty("test.fabric8.poll.interval", "2"));
public static final Integer FABRIC8_POLL_TIMEOUT = Integer.valueOf(System.getProperty("test.fabric8.poll.timeout", "60"));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.operator.systemtests;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.operator.api.model.DebeziumServer;
import io.debezium.operator.api.model.source.Offset;
import io.debezium.operator.api.model.source.OffsetBuilder;
import io.debezium.operator.systemtests.resources.NamespaceHolder;
import io.debezium.operator.systemtests.resources.dmt.DmtClient;
import io.debezium.operator.systemtests.resources.operator.DebeziumOperatorBundleResource;
import io.debezium.operator.systemtests.resources.server.DebeziumServerGenerator;
import io.debezium.operator.systemtests.resources.sinks.RedisResource;
import io.fabric8.kubernetes.client.LocalPortForward;
import io.skodjob.testframe.resources.KubeResourceManager;

public class OffsetStorageTest extends TestBase {
private final Logger logger = LoggerFactory.getLogger(this.getClass());

@Test
void testRedisOffsetStorage() {
String namespace = NamespaceHolder.INSTANCE.getCurrentNamespace();
DebeziumOperatorBundleResource operatorBundleResource = new DebeziumOperatorBundleResource();
operatorBundleResource.configureAsDefault(namespace);
logger.info("Deploying Operator");
operatorBundleResource.deploy();
logger.info("Deploying Debezium Server");
DebeziumServer server = DebeziumServerGenerator.generateDefaultMysqlToRedis(namespace);

Offset offset = new OffsetBuilder()
.withNewRedis()
.withAddress(RedisResource.getDefaultRedisAddress())
.endRedis()
.withFlushMs(10)
.build();
server.getSpec().getSource().setOffset(offset);

KubeResourceManager.getInstance().createResourceWithWait(server);
assertStreamingWorks();

try (LocalPortForward lcp = dmtResource.portForward(portForwardPort, namespace)) {
String redis_offset = DmtClient.readRedisOffsets(portForwardHost, portForwardPort);
assertThat(redis_offset).contains("file");
assertThat(redis_offset).contains("pos");
}
catch (IOException e) {
throw new RuntimeException(e);
}

server.getSpec().getSource().getOffset().getRedis().setKey("metadata:debezium_n:offsets");
KubeResourceManager.getInstance().createOrUpdateResourceWithWait(server);
assertStreamingWorks(10, 20);

try (LocalPortForward lcp = dmtResource.portForward(portForwardPort, namespace)) {
String redis_offset = DmtClient.readRedisOffsets(portForwardHost, portForwardPort, "metadata:debezium_n:offsets");
assertThat(redis_offset).contains("file");
assertThat(redis_offset).contains("pos");
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.io.IOException;
import java.time.Duration;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInstance;
Expand All @@ -34,9 +35,9 @@
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class TestBase {
private static final Logger logger = LoggerFactory.getLogger(TestBase.class);
private final DmtResource dmtResource = new DmtResource();
private final String portForwardHost = "127.0.0.1";
private int portForwardPort = 8080;
protected final DmtResource dmtResource = new DmtResource();
protected final String portForwardHost = "127.0.0.1";
protected int portForwardPort = 8080;

@BeforeAll
void initDefault() {
Expand Down Expand Up @@ -70,18 +71,27 @@ void cleanUp() {
}

public void assertStreamingWorks() {
assertStreamingWorks(10, 10);
}

public void assertStreamingWorks(int messagesToDatabase, int expectedMessages) {
String namespace = NamespaceHolder.INSTANCE.getCurrentNamespace();
try (LocalPortForward lcp = dmtResource.portForward(8080, namespace)) {
DmtClient.waitForDmt(portForwardHost, portForwardPort, Duration.ofSeconds(HTTP_POLL_TIMEOUT));
DmtClient.insertTestDataToDatabase(portForwardHost, portForwardPort, 10);
DmtClient.waitForFilledRedis(portForwardHost, portForwardPort, Duration.ofSeconds(40), "inventory.inventory.operator_test");
DmtClient.insertTestDataToDatabase(portForwardHost, portForwardPort, messagesToDatabase);
DmtClient.waitForFilledRedis(portForwardHost, portForwardPort, Duration.ofSeconds(60), "inventory.inventory.operator_test");
await().atMost(Duration.ofMinutes(HTTP_POLL_TIMEOUT))
.pollInterval(Duration.ofMillis(HTTP_POLL_INTERVAL))
.until(() -> DmtClient.digStreamedData(portForwardHost, portForwardPort, 10) == 10);
.until(() -> DmtClient.digStreamedData(portForwardHost, portForwardPort, expectedMessages) == expectedMessages);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}

@AfterAll
void resetNamespace() {
NamespaceHolder.INSTANCE.resetNamespace();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public String getCurrentNamespace() {
return currentNamespace;
}

public void resetNamespace() {
this.currentNamespace = null;
}

public DmtResource getNamespacedDmt() {
return namespacedDmt;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -47,8 +51,30 @@ private static OkHttpClient defaultClient() {
.build();
}

public static String readRedisOffsets(String host, int port) {
return readRedisOffsets(host, port, "metadata:debezium:offsets");
}

public static String readRedisOffsets(String host, int port, String key) {
AtomicReference<String> offset = new AtomicReference<>();
Map<String, String> params = Map.of("hashKey", key);
await().atMost(Duration.ofSeconds(HTTP_POLL_TIMEOUT))
.pollInterval(Duration.ofMillis(HTTP_POLL_INTERVAL))
.until(() -> {
try (Response response = DmtClient.sendGetRequest(host, port, "/Redis/readHash", params)) {
offset.set(response.body().string());
return response.isSuccessful();
}
catch (Exception e) {
return false;
}
});
return offset.get();
}

public static void resetRedis(String host, int port) {
await().atMost(Duration.ofSeconds(HTTP_POLL_TIMEOUT)).pollInterval(Duration.ofMillis(200))
await().atMost(Duration.ofSeconds(HTTP_POLL_TIMEOUT))
.pollInterval(Duration.ofMillis(HTTP_POLL_INTERVAL))
.until(() -> {
try (Response response = DmtClient.sendGetRequest(host, port, "/Redis/reset")) {
return response.isSuccessful();
Expand All @@ -60,7 +86,8 @@ public static void resetRedis(String host, int port) {
}

public static void resetMysql(String host, int port) {
await().atMost(Duration.ofSeconds(HTTP_POLL_TIMEOUT)).pollInterval(Duration.ofMillis(200))
await().atMost(Duration.ofSeconds(HTTP_POLL_TIMEOUT))
.pollInterval(Duration.ofMillis(HTTP_POLL_INTERVAL))
.until(() -> {
try (Response response = DmtClient.sendGetRequest(host, port, "/Main/ResetDatabase")) {
return response.isSuccessful();
Expand Down Expand Up @@ -92,17 +119,24 @@ public static void waitForFilledRedis(String host, int port, Duration atMost, St
}

public static int digStreamedData(String host, int port, int number) {
String jsonRespo = readRedisChannel(host, port, "inventory.inventory.operator_test", number);
final String CHANNEL = "inventory.inventory.operator_test";
JSONParser parser = new JSONParser();
String jsonRespo = readRedisChannel(host, port, CHANNEL, number);

if (Objects.isNull(jsonRespo)) {
return 0;
}
int count = 0;
for (int i = 0; i < number; i++) {
if (jsonRespo.contains("name" + i)) {
count++;
}

try {
JSONArray response = (JSONArray) parser.parse(jsonRespo);
JSONObject topic = (JSONObject) response.get(0);
JSONArray responses = (JSONArray) topic.get(CHANNEL);
return responses.size();
}
catch (ParseException e) {
LOGGER.error("Cannot parse JSON response from DMT: {}", e.getMessage());
return 0;
}
return count;
}

public static String readRedisChannel(String host, int port, String channel, int limit) {
Expand Down Expand Up @@ -242,4 +276,24 @@ public static Response sendGetRequest(String host, int port, String command) thr
Call call = client.newCall(request);
return call.execute();
}

public static Response sendGetRequest(String host, int port, String command, Map<String, String> params) throws IOException {
OkHttpClient client = defaultClient();

HttpUrl.Builder builder = Objects.requireNonNull(HttpUrl.parse("http://" + host + ":" + port + command))
.newBuilder();

if (!Objects.isNull(params)) {
for (Map.Entry<String, String> entry : params.entrySet()) {
builder = builder.addQueryParameter(entry.getKey(), entry.getValue());
}
}

Request request = new Request.Builder()
.url(builder.build())
.build();

Call call = client.newCall(request);
return call.execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,29 @@
*/
package io.debezium.operator.systemtests.resources.server;

import static io.debezium.operator.systemtests.ConfigProperties.FABRIC8_POLL_INTERVAL;
import static io.debezium.operator.systemtests.ConfigProperties.FABRIC8_POLL_TIMEOUT;
import static org.awaitility.Awaitility.await;

import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import java.time.Duration;
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.operator.api.model.DebeziumServer;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.dsl.internal.HasMetadataOperationsImpl;
import io.skodjob.testframe.interfaces.ResourceType;
import io.skodjob.testframe.resources.DeploymentType;
import io.skodjob.testframe.resources.KubeResourceManager;

public class DebeziumServerResource implements ResourceType<DebeziumServer> {

private final MixedOperation<DebeziumServer, DebeziumServerList, Resource<DebeziumServer>> client;
private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());

public DebeziumServerResource() {
this.client = KubeResourceManager.getKubeClient().getClient().resources(DebeziumServer.class, DebeziumServerList.class);
Expand Down Expand Up @@ -65,11 +72,22 @@ public void replace(String s, Consumer<DebeziumServer> editor) {

@Override
public boolean waitForReadiness(DebeziumServer debeziumServer) {
new DeploymentType().getClient()
.inNamespace(debeziumServer.getMetadata().getNamespace())
.withName(debeziumServer.getMetadata().getName()).waitUntilReady(1, TimeUnit.MINUTES);

return client.resource(debeziumServer).isReady();
await().atMost(Duration.ofSeconds(FABRIC8_POLL_TIMEOUT)).pollInterval(Duration.ofSeconds(FABRIC8_POLL_INTERVAL))
.until(() -> {
DebeziumServer dbzServer = client.inNamespace(debeziumServer.getMetadata().getNamespace())
.withName(debeziumServer.getMetadata().getName()).get();

boolean ready = dbzServer.getStatus().getConditions().stream()
.anyMatch(condition -> condition.getType().equals("Ready") && condition.getStatus().equals("True"));
if (ready) {
return true;
}
else {
logger.info("Waiting for readiness of Debezium Server...");
return false;
}
});
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ public void configureAsDefault(String namespace) {
}
}

public static String getDefaultRedisAddress() {
return "redis-service:6379";
}

@Override
public void deploy() {
KubeResourceManager.getInstance().createResourceWithoutWait(configMap, service, persistentVolumeClaim);
Expand Down