[system test] Fix downgrade procedure #11154

Open · wants to merge 2 commits into base: main

@@ -110,66 +110,73 @@ public class AbstractKRaftUpgradeST extends AbstractST {
protected final LabelSelector coSelector = new LabelSelectorBuilder().withMatchLabels(Map.of(Labels.STRIMZI_KIND_LABEL, "cluster-operator")).build();
protected final LabelSelector connectLabelSelector = KafkaConnectResource.getLabelSelector(CLUSTER_NAME, KafkaConnectResources.componentName(CLUSTER_NAME));


protected void makeComponentsSnapshots(String componentsNamespaceName) {
eoPods = DeploymentUtils.depSnapshot(componentsNamespaceName, KafkaResources.entityOperatorDeploymentName(CLUSTER_NAME));
controllerPods = PodUtils.podSnapshot(componentsNamespaceName, controllerSelector);
brokerPods = PodUtils.podSnapshot(componentsNamespaceName, brokerSelector);
connectPods = PodUtils.podSnapshot(componentsNamespaceName, connectLabelSelector);
}

/**
* Performs the Kafka Connect and Kafka Connector upgrade/downgrade procedure.
* It upgrades the Cluster Operator, Kafka Connect, and Kafka Connector while verifying each step.
*
* @param clusterOperatorNamespaceName Namespace of the Cluster Operator
* @param testStorage Test-related configuration and storage
* @param upgradeDowngradeData Bundle version modification data
* @param upgradeKafkaVersion Kafka version details
* @throws IOException if any I/O error occurs during the procedure
*/
protected void doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure(
final String clusterOperatorNamespaceName,
final TestStorage testStorage,
final BundleVersionModificationData upgradeDowngradeData,
final UpgradeKafkaVersion upgradeKafkaVersion
) throws IOException {
final UpgradeKafkaVersion upgradeKafkaVersion) throws IOException {
// 1. Setup Cluster Operator with KafkaConnect and KafkaConnector
setupEnvAndUpgradeClusterOperator(clusterOperatorNamespaceName, testStorage, upgradeDowngradeData, upgradeKafkaVersion);
deployKafkaConnectAndKafkaConnectorWithWaitForReadiness(testStorage, upgradeDowngradeData, upgradeKafkaVersion);

final KafkaClients clients = ClientUtils.getInstantTlsClientBuilder(testStorage, KafkaResources.tlsBootstrapAddress(CLUSTER_NAME))
.withNamespaceName(testStorage.getNamespaceName())
.withUsername(USER_NAME)
.build();

resourceManager.createResourceWithWait(clients.producerTlsStrimzi(CLUSTER_NAME));
// Verify that Producer finish successfully
ClientUtils.waitForInstantProducerClientSuccess(testStorage);
// 2. Send messages
produceMessagesAndVerify(testStorage);

// 3. Make snapshots
makeComponentsSnapshots(testStorage.getNamespaceName());
logComponentsPodImagesWithConnect(testStorage.getNamespaceName());

// Verify FileSink KafkaConnector before upgrade
String connectorPodName = kubeClient().listPods(testStorage.getNamespaceName(), Collections.singletonMap(Labels.STRIMZI_KIND_LABEL, KafkaConnect.RESOURCE_KIND)).get(0).getMetadata().getName();
KafkaConnectUtils.waitForMessagesInKafkaConnectFileSink(testStorage.getNamespaceName(), connectorPodName, DEFAULT_SINK_FILE_PATH, testStorage.getMessageCount());
// 4. Verify KafkaConnector FileSink
verifyKafkaConnectorFileSink(testStorage);

// Upgrade CO to HEAD and wait for readiness of ClusterOperator
// 5. Upgrade CO to HEAD and wait for readiness of ClusterOperator
changeClusterOperator(clusterOperatorNamespaceName, testStorage.getNamespaceName(), upgradeDowngradeData);

if (TestKafkaVersion.supportedVersionsContainsVersion(upgradeKafkaVersion.getVersion())) {
// Verify that Kafka and Connect Pods Rolled
waitForKafkaClusterRollingUpdate(testStorage.getNamespaceName());
connectPods = RollingUpdateUtils.waitTillComponentHasRolled(testStorage.getNamespaceName(), connectLabelSelector, 1, connectPods);
KafkaConnectorUtils.waitForConnectorReady(testStorage.getNamespaceName(), CLUSTER_NAME);
}

// 6. Wait for components to roll
maybeWaitForRollingUpdate(testStorage, upgradeKafkaVersion);
logComponentsPodImagesWithConnect(testStorage.getNamespaceName());

// Upgrade/Downgrade kafka
changeKafkaVersion(testStorage.getNamespaceName(), upgradeDowngradeData);
changeKafkaVersionInKafkaConnect(testStorage.getNamespaceName(), upgradeDowngradeData);

logComponentsPodImagesWithConnect(testStorage.getNamespaceName());
checkAllComponentsImages(testStorage.getNamespaceName(), upgradeDowngradeData);
verifyPostUpgradeOrDowngradeProcedure(testStorage, upgradeDowngradeData);
}

/**
* Verifies the environment after an upgrade or downgrade procedure
* by sending new messages, checking connector output, stability, and final state.
*
* @param testStorage Test-related configuration and storage
* @param upgradeDowngradeData Bundle version modification data
*/
private void verifyPostUpgradeOrDowngradeProcedure(final TestStorage testStorage,
final BundleVersionModificationData upgradeDowngradeData) {
final KafkaClients clients = ClientUtils.getInstantTlsClientBuilder(testStorage, KafkaResources.tlsBootstrapAddress(CLUSTER_NAME))
.withNamespaceName(testStorage.getNamespaceName())
Review comment (Member):
I know it was there before, but I think that we don't need to specify the Namespace here, as it's anyway specified in getInstantTlsClientBuilder
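
For illustration, the change suggested here might look like the following sketch, assuming getInstantTlsClientBuilder already applies the namespace from testStorage:

    // Hypothetical sketch of the suggestion: drop the redundant withNamespaceName call,
    // relying on getInstantTlsClientBuilder to take the namespace from testStorage.
    final KafkaClients clients = ClientUtils.getInstantTlsClientBuilder(testStorage, KafkaResources.tlsBootstrapAddress(CLUSTER_NAME))
            .withUsername(USER_NAME)
            .build();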

.withUsername(USER_NAME)
.build();
// send again new messages
resourceManager.createResourceWithWait(clients.producerTlsStrimzi(CLUSTER_NAME));

// Verify that Producer finish successfully
ClientUtils.waitForInstantProducerClientSuccess(testStorage.getNamespaceName(), testStorage);

// Verify FileSink KafkaConnector
connectorPodName = kubeClient().listPods(testStorage.getNamespaceName(), Collections.singletonMap(Labels.STRIMZI_KIND_LABEL, KafkaConnect.RESOURCE_KIND)).get(0).getMetadata().getName();
String connectorPodName = kubeClient().listPods(testStorage.getNamespaceName(), Collections.singletonMap(Labels.STRIMZI_KIND_LABEL, KafkaConnect.RESOURCE_KIND)).get(0).getMetadata().getName();
KafkaConnectUtils.waitForMessagesInKafkaConnectFileSink(testStorage.getNamespaceName(), connectorPodName, DEFAULT_SINK_FILE_PATH, testStorage.getMessageCount());
Review comment (Member) on lines +179 to 180:
When you created it, I think we can use verifyKafkaConnectorFileSink here
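
If applied, the suggestion would roughly replace the inline pod lookup and file-sink wait with the helper introduced in this PR (a sketch of the suggested change, not the merged code):

    // Hypothetical sketch of the suggestion: reuse the new helper instead of
    // repeating the pod lookup and the file-sink wait inline.
    verifyKafkaConnectorFileSink(testStorage);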


// Verify that pods are stable
@@ -179,6 +186,60 @@ protected void doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure(
verifyProcedure(testStorage.getNamespaceName(), upgradeDowngradeData, testStorage.getContinuousProducerName(), testStorage.getContinuousConsumerName());
}

/**
* Verifies that the Kafka Connector FileSink is receiving messages as expected.
*
* @param testStorage Test-related configuration and storage
*/
private void verifyKafkaConnectorFileSink(final TestStorage testStorage) {
String connectorPodName = kubeClient().listPods(
testStorage.getNamespaceName(),
Collections.singletonMap(Labels.STRIMZI_KIND_LABEL, KafkaConnect.RESOURCE_KIND)
).get(0).getMetadata().getName();

KafkaConnectUtils.waitForMessagesInKafkaConnectFileSink(
testStorage.getNamespaceName(),
connectorPodName,
DEFAULT_SINK_FILE_PATH,
testStorage.getMessageCount()
);
}

/**
* Waits for the Kafka cluster and Kafka Connect to roll if the target version is supported.
*
* @param testStorage Test-related configuration and storage
* @param upgradeKafkaVersion Kafka version details
*/
private void maybeWaitForRollingUpdate(final TestStorage testStorage,
Review comment (Member):
You are using just Namespace name here, so you don't need to pass whole TestStorage
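
One way the narrowed signature could look (a hypothetical sketch of the suggestion; call sites would then pass testStorage.getNamespaceName() instead of the whole TestStorage):

    // Hypothetical sketch: take only the namespace name, since nothing else
    // from TestStorage is used inside this method.
    private void maybeWaitForRollingUpdate(final String namespaceName, final UpgradeKafkaVersion upgradeKafkaVersion) {
        if (TestKafkaVersion.supportedVersionsContainsVersion(upgradeKafkaVersion.getVersion())) {
            waitForKafkaClusterRollingUpdate(namespaceName);
            connectPods = RollingUpdateUtils.waitTillComponentHasRolled(namespaceName, connectLabelSelector, 1, connectPods);
            KafkaConnectorUtils.waitForConnectorReady(namespaceName, CLUSTER_NAME);
        }
    }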

final UpgradeKafkaVersion upgradeKafkaVersion) {
if (TestKafkaVersion.supportedVersionsContainsVersion(upgradeKafkaVersion.getVersion())) {
waitForKafkaClusterRollingUpdate(testStorage.getNamespaceName());
connectPods = RollingUpdateUtils.waitTillComponentHasRolled(
testStorage.getNamespaceName(),
connectLabelSelector,
1,
connectPods
);
KafkaConnectorUtils.waitForConnectorReady(testStorage.getNamespaceName(), CLUSTER_NAME);
}
}

/**
* Produces messages and verifies they were successfully sent, using a TLS client.
*
* @param testStorage Test-related configuration and storage
*/
private void produceMessagesAndVerify(TestStorage testStorage) {
final KafkaClients clients = ClientUtils.getInstantTlsClientBuilder(testStorage, KafkaResources.tlsBootstrapAddress(CLUSTER_NAME))
.withNamespaceName(testStorage.getNamespaceName())
.withUsername(USER_NAME)
.build();

resourceManager.createResourceWithWait(clients.producerTlsStrimzi(CLUSTER_NAME));
ClientUtils.waitForInstantProducerClientSuccess(testStorage);
}

protected void setupEnvAndUpgradeClusterOperator(String clusterOperatorNamespaceName, TestStorage testStorage, BundleVersionModificationData upgradeData, UpgradeKafkaVersion upgradeKafkaVersion) throws IOException {
LOGGER.info("Test upgrade of Cluster Operator from version: {} to version: {}", upgradeData.getFromVersion(), upgradeData.getToVersion());
