-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[system test] Fix downgrade procedure #11154
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -110,66 +110,73 @@ public class AbstractKRaftUpgradeST extends AbstractST { | |
protected final LabelSelector coSelector = new LabelSelectorBuilder().withMatchLabels(Map.of(Labels.STRIMZI_KIND_LABEL, "cluster-operator")).build(); | ||
protected final LabelSelector connectLabelSelector = KafkaConnectResource.getLabelSelector(CLUSTER_NAME, KafkaConnectResources.componentName(CLUSTER_NAME)); | ||
|
||
|
||
protected void makeComponentsSnapshots(String componentsNamespaceName) { | ||
eoPods = DeploymentUtils.depSnapshot(componentsNamespaceName, KafkaResources.entityOperatorDeploymentName(CLUSTER_NAME)); | ||
controllerPods = PodUtils.podSnapshot(componentsNamespaceName, controllerSelector); | ||
brokerPods = PodUtils.podSnapshot(componentsNamespaceName, brokerSelector); | ||
connectPods = PodUtils.podSnapshot(componentsNamespaceName, connectLabelSelector); | ||
} | ||
|
||
/** | ||
* Performs the Kafka Connect and Kafka Connector upgrade/downgrade procedure. | ||
* It upgrades the Cluster Operator, Kafka Connect, and Kafka Connector while verifying each step. | ||
* | ||
* @param clusterOperatorNamespaceName Namespace of the Cluster Operator | ||
* @param testStorage Test-related configuration and storage | ||
* @param upgradeDowngradeData Bundle version modification data | ||
* @param upgradeKafkaVersion Kafka version details | ||
* @throws IOException if any I/O error occurs during the procedure | ||
*/ | ||
protected void doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure( | ||
final String clusterOperatorNamespaceName, | ||
final TestStorage testStorage, | ||
final BundleVersionModificationData upgradeDowngradeData, | ||
final UpgradeKafkaVersion upgradeKafkaVersion | ||
) throws IOException { | ||
final UpgradeKafkaVersion upgradeKafkaVersion) throws IOException { | ||
// 1. Setup Cluster Operator with KafkaConnect and KafkaConnector | ||
setupEnvAndUpgradeClusterOperator(clusterOperatorNamespaceName, testStorage, upgradeDowngradeData, upgradeKafkaVersion); | ||
deployKafkaConnectAndKafkaConnectorWithWaitForReadiness(testStorage, upgradeDowngradeData, upgradeKafkaVersion); | ||
|
||
final KafkaClients clients = ClientUtils.getInstantTlsClientBuilder(testStorage, KafkaResources.tlsBootstrapAddress(CLUSTER_NAME)) | ||
.withNamespaceName(testStorage.getNamespaceName()) | ||
.withUsername(USER_NAME) | ||
.build(); | ||
|
||
resourceManager.createResourceWithWait(clients.producerTlsStrimzi(CLUSTER_NAME)); | ||
// Verify that Producer finish successfully | ||
ClientUtils.waitForInstantProducerClientSuccess(testStorage); | ||
// 2. Send messages | ||
produceMessagesAndVerify(testStorage); | ||
|
||
// 3. Make snapshots | ||
makeComponentsSnapshots(testStorage.getNamespaceName()); | ||
logComponentsPodImagesWithConnect(testStorage.getNamespaceName()); | ||
|
||
// Verify FileSink KafkaConnector before upgrade | ||
String connectorPodName = kubeClient().listPods(testStorage.getNamespaceName(), Collections.singletonMap(Labels.STRIMZI_KIND_LABEL, KafkaConnect.RESOURCE_KIND)).get(0).getMetadata().getName(); | ||
KafkaConnectUtils.waitForMessagesInKafkaConnectFileSink(testStorage.getNamespaceName(), connectorPodName, DEFAULT_SINK_FILE_PATH, testStorage.getMessageCount()); | ||
// 4. Verify KafkaConnector FileSink | ||
verifyKafkaConnectorFileSink(testStorage); | ||
|
||
// Upgrade CO to HEAD and wait for readiness of ClusterOperator | ||
// 5. Upgrade CO to HEAD and wait for readiness of ClusterOperator | ||
changeClusterOperator(clusterOperatorNamespaceName, testStorage.getNamespaceName(), upgradeDowngradeData); | ||
|
||
if (TestKafkaVersion.supportedVersionsContainsVersion(upgradeKafkaVersion.getVersion())) { | ||
// Verify that Kafka and Connect Pods Rolled | ||
waitForKafkaClusterRollingUpdate(testStorage.getNamespaceName()); | ||
connectPods = RollingUpdateUtils.waitTillComponentHasRolled(testStorage.getNamespaceName(), connectLabelSelector, 1, connectPods); | ||
KafkaConnectorUtils.waitForConnectorReady(testStorage.getNamespaceName(), CLUSTER_NAME); | ||
} | ||
|
||
// 6. Wait for components to roll | ||
maybeWaitForRollingUpdate(testStorage, upgradeKafkaVersion); | ||
logComponentsPodImagesWithConnect(testStorage.getNamespaceName()); | ||
|
||
// Upgrade/Downgrade kafka | ||
changeKafkaVersion(testStorage.getNamespaceName(), upgradeDowngradeData); | ||
changeKafkaVersionInKafkaConnect(testStorage.getNamespaceName(), upgradeDowngradeData); | ||
|
||
logComponentsPodImagesWithConnect(testStorage.getNamespaceName()); | ||
checkAllComponentsImages(testStorage.getNamespaceName(), upgradeDowngradeData); | ||
verifyPostUpgradeOrDowngradeProcedure(testStorage, upgradeDowngradeData); | ||
} | ||
|
||
/** | ||
* Verifies the environment after an upgrade or downgrade procedure | ||
* by sending new messages, checking connector output, stability, and final state. | ||
* | ||
* @param testStorage Test-related configuration and storage | ||
* @param upgradeDowngradeData Bundle version modification data | ||
*/ | ||
private void verifyPostUpgradeOrDowngradeProcedure(final TestStorage testStorage, | ||
final BundleVersionModificationData upgradeDowngradeData) { | ||
final KafkaClients clients = ClientUtils.getInstantTlsClientBuilder(testStorage, KafkaResources.tlsBootstrapAddress(CLUSTER_NAME)) | ||
.withNamespaceName(testStorage.getNamespaceName()) | ||
.withUsername(USER_NAME) | ||
.build(); | ||
// send again new messages | ||
resourceManager.createResourceWithWait(clients.producerTlsStrimzi(CLUSTER_NAME)); | ||
|
||
// Verify that Producer finish successfully | ||
ClientUtils.waitForInstantProducerClientSuccess(testStorage.getNamespaceName(), testStorage); | ||
|
||
// Verify FileSink KafkaConnector | ||
connectorPodName = kubeClient().listPods(testStorage.getNamespaceName(), Collections.singletonMap(Labels.STRIMZI_KIND_LABEL, KafkaConnect.RESOURCE_KIND)).get(0).getMetadata().getName(); | ||
String connectorPodName = kubeClient().listPods(testStorage.getNamespaceName(), Collections.singletonMap(Labels.STRIMZI_KIND_LABEL, KafkaConnect.RESOURCE_KIND)).get(0).getMetadata().getName(); | ||
KafkaConnectUtils.waitForMessagesInKafkaConnectFileSink(testStorage.getNamespaceName(), connectorPodName, DEFAULT_SINK_FILE_PATH, testStorage.getMessageCount()); | ||
Comment on lines
+179
to
180
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When you created it, I think we can use |
||
|
||
// Verify that pods are stable | ||
|
@@ -179,6 +186,60 @@ protected void doKafkaConnectAndKafkaConnectorUpgradeOrDowngradeProcedure( | |
verifyProcedure(testStorage.getNamespaceName(), upgradeDowngradeData, testStorage.getContinuousProducerName(), testStorage.getContinuousConsumerName()); | ||
} | ||
|
||
/** | ||
* Verifies that the Kafka Connector FileSink is receiving messages as expected. | ||
* | ||
* @param testStorage Test-related configuration and storage | ||
*/ | ||
private void verifyKafkaConnectorFileSink(final TestStorage testStorage) { | ||
String connectorPodName = kubeClient().listPods( | ||
testStorage.getNamespaceName(), | ||
Collections.singletonMap(Labels.STRIMZI_KIND_LABEL, KafkaConnect.RESOURCE_KIND) | ||
).get(0).getMetadata().getName(); | ||
|
||
KafkaConnectUtils.waitForMessagesInKafkaConnectFileSink( | ||
testStorage.getNamespaceName(), | ||
connectorPodName, | ||
DEFAULT_SINK_FILE_PATH, | ||
testStorage.getMessageCount() | ||
); | ||
} | ||
|
||
/** | ||
* Waits for the Kafka cluster and Kafka Connect to roll if the target version is supported. | ||
* | ||
* @param testStorage Test-related configuration and storage | ||
* @param upgradeKafkaVersion Kafka version details | ||
*/ | ||
private void maybeWaitForRollingUpdate(final TestStorage testStorage, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are using just Namespace name here, so you don't need to pass whole TestStorage |
||
final UpgradeKafkaVersion upgradeKafkaVersion) { | ||
if (TestKafkaVersion.supportedVersionsContainsVersion(upgradeKafkaVersion.getVersion())) { | ||
waitForKafkaClusterRollingUpdate(testStorage.getNamespaceName()); | ||
connectPods = RollingUpdateUtils.waitTillComponentHasRolled( | ||
testStorage.getNamespaceName(), | ||
connectLabelSelector, | ||
1, | ||
connectPods | ||
); | ||
KafkaConnectorUtils.waitForConnectorReady(testStorage.getNamespaceName(), CLUSTER_NAME); | ||
} | ||
} | ||
|
||
/** | ||
* Produces messages and verifies they were successfully sent, using a TLS client. | ||
* | ||
* @param testStorage Test-related configuration and storage | ||
*/ | ||
private void produceMessagesAndVerify(TestStorage testStorage) { | ||
final KafkaClients clients = ClientUtils.getInstantTlsClientBuilder(testStorage, KafkaResources.tlsBootstrapAddress(CLUSTER_NAME)) | ||
.withNamespaceName(testStorage.getNamespaceName()) | ||
.withUsername(USER_NAME) | ||
.build(); | ||
|
||
resourceManager.createResourceWithWait(clients.producerTlsStrimzi(CLUSTER_NAME)); | ||
ClientUtils.waitForInstantProducerClientSuccess(testStorage); | ||
} | ||
|
||
protected void setupEnvAndUpgradeClusterOperator(String clusterOperatorNamespaceName, TestStorage testStorage, BundleVersionModificationData upgradeData, UpgradeKafkaVersion upgradeKafkaVersion) throws IOException { | ||
LOGGER.info("Test upgrade of Cluster Operator from version: {} to version: {}", upgradeData.getFromVersion(), upgradeData.getToVersion()); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know it was there before, but I think that we don't need to specify the Namespace here, as it's anyway specified in
getInstantTlsClientBuilder