diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java index b3458ba1892..a0817f9f5af 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java @@ -275,7 +275,14 @@ public KafkaBrokerConfigurationBuilder withListeners( // Control Plane listener is set for pure KRaft controller or combined node, and broker in ZooKeeper mode or in migration state but not when full KRaft. if (node.controller() || (node.broker() && kafkaMetadataConfigState.isZooKeeperToMigration())) { - listeners.add(CONTROL_PLANE_LISTENER_NAME + "://0.0.0.0:9090"); + if (!node.broker()) { + // If it is a pure controller, the listener needs to be set with hostnames that Admin Client can bootstrap + listeners.add(String.format("%s://%s:9090", CONTROL_PLANE_LISTENER_NAME, + DnsNameGenerator.podDnsNameWithoutClusterDomain(namespace, KafkaResources.brokersServiceName(clusterName), node.podName()) + )); + } else { + listeners.add(CONTROL_PLANE_LISTENER_NAME + "://0.0.0.0:9090"); + } // Control Plane listener to be advertised only with broker in ZooKeeper-based or migration but NOT when full KRaft only or mixed if (node.broker() && kafkaMetadataConfigState.isZooKeeperToMigration()) { diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java index 44a9b0800dd..b225e9edf3e 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java @@ -120,7 +120,10 @@ public class KafkaCluster extends AbstractModel implements SupportsMetrics, Supp protected 
static final String REPLICATION_PORT_NAME = "tcp-replication"; protected static final int KAFKA_AGENT_PORT = 8443; protected static final String KAFKA_AGENT_PORT_NAME = "tcp-kafkaagent"; - protected static final int CONTROLPLANE_PORT = 9090; + /** + * Port number used for control plane + */ + public static final int CONTROLPLANE_PORT = 9090; protected static final String CONTROLPLANE_PORT_NAME = "tcp-ctrlplane"; // port name is up to 15 characters /** diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java index defa005c68b..2da0144683f 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java @@ -36,10 +36,8 @@ import org.apache.kafka.clients.admin.AlterConfigsResult; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.DescribeClusterResult; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.SslAuthenticationException; @@ -112,6 +110,7 @@ public class KafkaRoller { private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(KafkaRoller.class); private static final String CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_NAME = "controller.quorum.fetch.timeout.ms"; private static final String CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT = "2000"; + private static final KafkaVersion VERSION_SUPPORTS_TALKING_TO_CONTROLLERS = new KafkaVersion("3.7.0", "", "", "", "", false, false, ""); private final PodOperator podOperations; private final long pollingIntervalMs; @@ -207,7 +206,7 @@ protected Future 
pod(Integer podId) { private boolean maybeInitBrokerAdminClient() { if (this.brokerAdminClient == null) { try { - this.brokerAdminClient = adminClient(nodes.stream().filter(NodeRef::broker).collect(Collectors.toSet()), false); + this.brokerAdminClient = brokerAdminClient(nodes.stream().filter(NodeRef::broker).collect(Collectors.toSet())); } catch (ForceableProblem | FatalProblem e) { LOGGER.warnCr(reconciliation, "Failed to create brokerAdminClient.", e); return false; @@ -222,14 +221,16 @@ private boolean maybeInitBrokerAdminClient() { */ private boolean maybeInitControllerAdminClient() { if (this.controllerAdminClient == null) { + // Prior to KIP-919 implementation added to version 3.7.0, Kafka did not support + // using Kafka Admin API with controller nodes when running in KRaft mode. + // Therefore, use brokers to initialise adminClient for quorum health check + // when the version is older than 3.7.0. try { - // TODO: Currently, when running in KRaft mode Kafka does not support using Kafka Admin API with controller - // nodes. This is tracked in https://github.com/strimzi/strimzi-kafka-operator/issues/9692. - // Therefore use broker nodes of the cluster to initialise adminClient for quorum health check. - // Once Kafka Admin API is supported for controllers, nodes.stream().filter(NodeRef:controller) - // can be used here. Until then pass an empty set of nodes so the client is initialized with - // the brokers service. 
- this.controllerAdminClient = adminClient(Set.of(), false); + if (kafkaVersion.compareTo(VERSION_SUPPORTS_TALKING_TO_CONTROLLERS) >= 0) { + this.controllerAdminClient = controllerAdminClient(nodes.stream().filter(NodeRef::controller).collect(Collectors.toSet())); + } else { + this.controllerAdminClient = brokerAdminClient(Set.of()); + } } catch (ForceableProblem | FatalProblem e) { LOGGER.warnCr(reconciliation, "Failed to create controllerAdminClient.", e); return false; } } @@ -616,35 +617,13 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont KafkaBrokerLoggingConfigurationDiff brokerLoggingDiff = null; boolean needsReconfig = false; - if (isController) { - if (maybeInitControllerAdminClient()) { - String controllerQuorumFetchTimeout = CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT; - String desiredConfig = kafkaConfigProvider.apply(nodeRef.nodeId()); - - if (desiredConfig != null) { - OrderedProperties orderedProperties = new OrderedProperties(); - controllerQuorumFetchTimeout = orderedProperties.addStringPairs(desiredConfig).asMap().getOrDefault(CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_NAME, CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT); - } - - restartContext.quorumCheck = quorumCheck(controllerAdminClient, Long.parseLong(controllerQuorumFetchTimeout)); - } else { - //TODO When https://github.com/strimzi/strimzi-kafka-operator/issues/9692 is complete - // we should change this logic to immediately restart this pod because we cannot connect to it. - if (isBroker) { - // If it is a combined node (controller and broker) and the admin client cannot be initialised, - // restart this pod. There is no reason to continue as we won't be able to - // connect an admin client to this pod for other checks later. - LOGGER.infoCr(reconciliation, "KafkaQuorumCheck cannot be initialised for {} because none of the brokers do not seem to responding to connection attempts. 
" + - "Restarting pod because it is a combined node so it is one of the brokers that is not responding.", nodeRef); - reasonToRestartPod.add(RestartReason.POD_UNRESPONSIVE); - markRestartContextWithForceRestart(restartContext); - return; - } else { - // If it is a controller only node throw an UnforceableProblem, so we try again until the backOff - // is finished, then it will move on to the next controller and eventually the brokers. - throw new UnforceableProblem("KafkaQuorumCheck cannot be initialised for " + nodeRef + " because none of the brokers do not seem to responding to connection attempts"); - } + // if it is a pure controller, initialise the admin client specifically for controllers + if (isController && !isBroker) { + if (!maybeInitControllerAdminClient()) { + handleFailedAdminClientForController(nodeRef, restartContext, reasonToRestartPod); + return; } + restartContext.quorumCheck = quorumCheck(controllerAdminClient, nodeRef); } if (isBroker) { @@ -655,6 +634,11 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont return; } + // If it is a mixed node, initialise quorum check with the broker admin client + if (isController) { + restartContext.quorumCheck = quorumCheck(brokerAdminClient, nodeRef); + } + // Always get the broker config. This request gets sent to that specific broker, so it's a proof that we can // connect to the broker and that it's capable of responding. Config brokerConfig; @@ -700,6 +684,23 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont restartContext.brokerLoggingDiff = brokerLoggingDiff; } + private void handleFailedAdminClientForController(NodeRef nodeRef, RestartContext restartContext, RestartReasons reasonToRestartPod) throws UnforceableProblem { + if (kafkaVersion.compareTo(VERSION_SUPPORTS_TALKING_TO_CONTROLLERS) <= 0) { + // If the version supports talking to controllers, force restart this pod when the admin client cannot be initialised. 
+ // There is no reason to continue as we won't be able to connect an admin client to this pod for other checks later. + LOGGER.infoCr(reconciliation, "KafkaQuorumCheck cannot be initialised for {} because the controller does not seem to be responding to connection attempts. " + + "Restarting the pod because we cannot connect an admin client to it for the other checks later.", nodeRef); + reasonToRestartPod.add(RestartReason.POD_UNRESPONSIVE); + markRestartContextWithForceRestart(restartContext); + } else { + // If it is a pure controller node and does not support talking to controllers, + // it means that the admin client connection cannot be initialised against the brokers. + // Throw an UnforceableProblem so that broker nodes can be checked next + // and potentially resolve the connection issue. + throw new UnforceableProblem("KafkaQuorumCheck cannot be initialised for " + nodeRef + " because none of the brokers seem to be responding to connection attempts"); + } + } + /** * Returns a config of the given broker. * @param nodeRef The reference of the broker. @@ -914,10 +915,10 @@ protected Future restart(Pod pod, RestartContext restartContext) { * Returns an AdminClient instance bootstrapped from the given nodes. If nodes is an * empty set, use the brokers service to bootstrap the client. 
 */ - /* test */ Admin adminClient(Set nodes, boolean ceShouldBeFatal) throws ForceableProblem, FatalProblem { - // If no nodes are passed initialize the admin client using the brokers service - // TODO when https://github.com/strimzi/strimzi-kafka-operator/issues/9692 is completed review whether - // this function can be reverted to expect nodes to be non empty + /* test */ Admin brokerAdminClient(Set nodes) throws ForceableProblem, FatalProblem { + // If no nodes are passed, initialize the admin client using the bootstrap service + // This is still needed for versions older than 3.7.0, so that when only controller nodes are being rolled, + // the roller can use brokers to get quorum information via the AdminClient. String bootstrapHostnames; if (nodes.isEmpty()) { bootstrapHostnames = String.format("%s:%s", DnsNameGenerator.of(namespace, KafkaResources.bootstrapServiceName(cluster)).serviceDnsName(), KafkaCluster.REPLICATION_PORT); @@ -928,20 +929,34 @@ protected Future restart(Pod pod, RestartContext restartContext) { try { LOGGER.debugCr(reconciliation, "Creating AdminClient for {}", bootstrapHostnames); return adminClientProvider.createAdminClient(bootstrapHostnames, coTlsPemIdentity.pemTrustSet(), coTlsPemIdentity.pemAuthIdentity()); - } catch (KafkaException e) { - if (ceShouldBeFatal && (e instanceof ConfigException - || e.getCause() instanceof ConfigException)) { - throw new FatalProblem("An error while try to create an admin client with bootstrap brokers " + bootstrapHostnames, e); - } else { - throw new ForceableProblem("An error while try to create an admin client with bootstrap brokers " + bootstrapHostnames, e); - } } catch (RuntimeException e) { throw new ForceableProblem("An error while try to create an admin client with bootstrap brokers " + bootstrapHostnames, e); } } - /* test */ KafkaQuorumCheck quorumCheck(Admin ac, long controllerQuorumFetchTimeoutMs) { - return new KafkaQuorumCheck(reconciliation, ac, vertx, controllerQuorumFetchTimeoutMs); + /** + * Returns 
an AdminClient instance bootstrapped from the given controller nodes. + */ + /* test */ Admin controllerAdminClient(Set nodes) throws ForceableProblem, FatalProblem { + String bootstrapHostnames = nodes.stream().map(node -> DnsNameGenerator.podDnsName(namespace, KafkaResources.brokersServiceName(cluster), node.podName()) + ":" + KafkaCluster.CONTROLPLANE_PORT).collect(Collectors.joining(",")); + + try { + LOGGER.debugCr(reconciliation, "Creating AdminClient for {}", bootstrapHostnames); + return adminClientProvider.createControllerAdminClient(bootstrapHostnames, coTlsPemIdentity.pemTrustSet(), coTlsPemIdentity.pemAuthIdentity()); + } catch (RuntimeException e) { + throw new ForceableProblem("An error while try to create an admin client with bootstrap controllers " + bootstrapHostnames, e); + } + } + + /* test */ KafkaQuorumCheck quorumCheck(Admin ac, NodeRef nodeRef) { + String controllerQuorumFetchTimeout = CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT; + String desiredConfig = kafkaConfigProvider.apply(nodeRef.nodeId()); + + if (desiredConfig != null) { + OrderedProperties orderedProperties = new OrderedProperties(); + controllerQuorumFetchTimeout = orderedProperties.addStringPairs(desiredConfig).asMap().getOrDefault(CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_NAME, CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT); + } + return new KafkaQuorumCheck(reconciliation, ac, vertx, Long.parseLong(controllerQuorumFetchTimeout)); } /* test */ KafkaAvailability availability(Admin ac) { @@ -984,7 +999,7 @@ int controller(NodeRef nodeRef, long timeout, TimeUnit unit, RestartContext rest // This is tracked in https://github.com/strimzi/strimzi-kafka-operator/issues/9373. // Use admin client connected directly to this broker here, then any exception or timeout trying to connect to // the current node will be caught and handled from this method, rather than appearing elsewhere. 
- try (Admin ac = adminClient(Set.of(nodeRef), false)) { + try (Admin ac = brokerAdminClient(Set.of(nodeRef))) { Node controllerNode = null; try { diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/ResourceUtils.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/ResourceUtils.java index 7782184ad8c..c3644fbf097 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/ResourceUtils.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/ResourceUtils.java @@ -587,10 +587,20 @@ public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTru return createAdminClient(bootstrapHostnames, kafkaCaTrustSet, authIdentity, new Properties()); } + @Override + public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity) { + return createControllerAdminClient(controllerBootstrapHostnames, kafkaCaTrustSet, authIdentity, new Properties()); + } + @Override public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) { return mockAdminClient; } + + @Override + public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) { + return mockAdminClient; + } }; } diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java index 265929b7b00..fb571a06309 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java @@ -882,7 +882,7 @@ public void testKraftListenersBrokerAndControllerNodes() { 
"listener.name.controlplane-9090.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", "listener.name.controlplane-9090.ssl.truststore.password=${CERTS_STORE_PASSWORD}", "listener.name.controlplane-9090.ssl.truststore.type=PKCS12", - "listeners=CONTROLPLANE-9090://0.0.0.0:9090", + "listeners=CONTROLPLANE-9090://my-cluster-controllers-2.my-cluster-kafka-brokers.my-namespace.svc:9090", "listener.security.protocol.map=CONTROLPLANE-9090:SSL", "sasl.enabled.mechanisms=", "ssl.endpoint.identification.algorithm=HTTPS")); @@ -975,7 +975,7 @@ public void testKraftOauthBrokerControllerAndMixedNodes() { "listener.name.controlplane-9090.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12", "listener.name.controlplane-9090.ssl.truststore.password=${CERTS_STORE_PASSWORD}", "listener.name.controlplane-9090.ssl.truststore.type=PKCS12", - "listeners=CONTROLPLANE-9090://0.0.0.0:9090", + "listeners=CONTROLPLANE-9090://my-cluster-controllers-2.my-cluster-kafka-brokers.my-namespace.svc:9090", "listener.security.protocol.map=CONTROLPLANE-9090:SSL", "sasl.enabled.mechanisms=", "ssl.endpoint.identification.algorithm=HTTPS", @@ -2396,7 +2396,7 @@ public void testListenersOnMigration() { } assertThat(configuration, containsString("listener.name.controlplane-9090")); - assertThat(configuration, containsString("listeners=CONTROLPLANE-9090://0.0.0.0:9090")); + assertThat(configuration, containsString("listeners=CONTROLPLANE-9090://my-cluster-controllers-1.my-cluster-kafka-brokers.my-namespace.svc:9090")); // controllers never advertises listeners assertThat(configuration, not(containsString("advertised.listeners"))); } diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaClusterMigrationTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaClusterMigrationTest.java index 77f15b2852a..1bb2f0d7edb 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaClusterMigrationTest.java +++ 
b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaClusterMigrationTest.java @@ -229,7 +229,7 @@ public void testControllerNodeConfigurationOnMigration() { } assertThat(configuration, containsString("listener.name.controlplane-9090")); - assertThat(configuration, containsString("listeners=CONTROLPLANE-9090://0.0.0.0:9090")); + assertThat(configuration, containsString("listeners=CONTROLPLANE-9090://my-cluster-controllers-3.my-cluster-kafka-brokers.my-namespace.svc:9090")); // controllers never advertises listeners assertThat(configuration, not(containsString("advertised.listeners"))); diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java index 7ca5fddda66..6df46a1a02f 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java @@ -930,9 +930,33 @@ KafkaAgentClient initKafkaAgentClient() { } @Override - protected Admin adminClient(Set nodes, boolean b) throws ForceableProblem, FatalProblem { + protected Admin brokerAdminClient(Set nodes) throws ForceableProblem, FatalProblem { if (delegateAdminClientCall) { - return super.adminClient(nodes, b); + return super.brokerAdminClient(nodes); + } + RuntimeException exception = acOpenException.apply(nodes); + if (exception != null) { + throw new ForceableProblem("An error while try to create the admin client", exception); + } + Admin ac = mock(AdminClient.class, invocation -> { + if ("close".equals(invocation.getMethod().getName())) { + Admin mock = (Admin) invocation.getMock(); + unclosedAdminClients.remove(mock); + if (acCloseException != null) { + throw acCloseException; + } + return null; + } + throw new RuntimeException("Not mocked " + invocation.getMethod()); + }); + 
unclosedAdminClients.put(ac, new Throwable("Pod " + nodes)); + return ac; + } + + @Override + protected Admin controllerAdminClient(Set nodes) throws ForceableProblem, FatalProblem { + if (delegateAdminClientCall) { + return super.controllerAdminClient(nodes); } RuntimeException exception = acOpenException.apply(nodes); if (exception != null) { @@ -974,7 +998,7 @@ Future canRoll(int podId) { } @Override - protected KafkaQuorumCheck quorumCheck(Admin ac, long controllerQuorumFetchTimeoutMs) { + protected KafkaQuorumCheck quorumCheck(Admin ac, NodeRef nodeRef) { Admin admin = mock(Admin.class); DescribeMetadataQuorumResult qrmResult = mock(DescribeMetadataQuorumResult.class); when(admin.describeMetadataQuorum()).thenReturn(qrmResult); diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/events/KubernetesRestartEventsMockTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/events/KubernetesRestartEventsMockTest.java index 909885d9fc9..4cfddbf27a4 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/events/KubernetesRestartEventsMockTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/events/KubernetesRestartEventsMockTest.java @@ -601,10 +601,20 @@ public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTru return adminClientSupplier.get(); } + @Override + public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity) { + return adminClientSupplier.get(); + } + @Override public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) { return adminClientSupplier.get(); } + + @Override + public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) { + return 
adminClientSupplier.get(); + } }; return new ResourceOperatorSupplier(vertx, diff --git a/operator-common/src/main/java/io/strimzi/operator/common/AdminClientProvider.java b/operator-common/src/main/java/io/strimzi/operator/common/AdminClientProvider.java index 6765304f094..ebc879776bc 100644 --- a/operator-common/src/main/java/io/strimzi/operator/common/AdminClientProvider.java +++ b/operator-common/src/main/java/io/strimzi/operator/common/AdminClientProvider.java @@ -16,7 +16,7 @@ public interface AdminClientProvider { /** - * Create a Kafka Admin interface instance + * Create a Kafka Admin interface instance for brokers * * @param bootstrapHostnames Kafka hostname to connect to for administration operations * @param kafkaCaTrustSet Trust set for connecting to Kafka @@ -26,7 +26,17 @@ public interface AdminClientProvider { Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity); /** - * Create a Kafka Admin interface instance + * Create a Kafka Admin interface instance for controllers + * + * @param controllerBootstrapHostnames Kafka controller hostname to connect to for administration operations + * @param kafkaCaTrustSet Trust set for connecting to Kafka + * @param authIdentity Identity for TLS client authentication for connecting to Kafka + * @return Instance of Kafka Admin interface + */ + Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity); + + /** + * Create a Kafka Admin interface instance for brokers * * @param bootstrapHostnames Kafka hostname to connect to for administration operations * @param kafkaCaTrustSet Trust set for connecting to Kafka @@ -36,4 +46,16 @@ public interface AdminClientProvider { * @return Instance of Kafka Admin interface */ Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config); + + /** + * Create a Kafka Admin interface 
instance for controllers + * + * @param controllerBootstrapHostnames Kafka hostname to connect to for administration operations + * @param kafkaCaTrustSet Trust set for connecting to Kafka + * @param authIdentity Identity for TLS client authentication for connecting to Kafka + * @param config Additional configuration for the Kafka Admin Client + * + * @return Instance of Kafka Admin interface + */ + Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config); } diff --git a/operator-common/src/main/java/io/strimzi/operator/common/DefaultAdminClientProvider.java b/operator-common/src/main/java/io/strimzi/operator/common/DefaultAdminClientProvider.java index bf9fd43a8b9..9ece616fffc 100644 --- a/operator-common/src/main/java/io/strimzi/operator/common/DefaultAdminClientProvider.java +++ b/operator-common/src/main/java/io/strimzi/operator/common/DefaultAdminClientProvider.java @@ -21,6 +21,11 @@ public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTru return createAdminClient(bootstrapHostnames, kafkaCaTrustSet, authIdentity, new Properties()); } + @Override + public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity) { + return createControllerAdminClient(controllerBootstrapHostnames, kafkaCaTrustSet, authIdentity, new Properties()); + } + /** * Create a Kafka Admin interface instance handling the following different scenarios: * @@ -44,26 +49,30 @@ public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTru */ @Override public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) { - return Admin.create(adminClientConfiguration(bootstrapHostnames, kafkaCaTrustSet, authIdentity, config)); + config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapHostnames); + return 
Admin.create(adminClientConfiguration(kafkaCaTrustSet, authIdentity, config)); + } + + @Override + public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) { + config.setProperty(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, controllerBootstrapHostnames); + return Admin.create(adminClientConfiguration(kafkaCaTrustSet, authIdentity, config)); } /** * Utility method for preparing the Admin client configuration * - * @param bootstrapHostnames Kafka bootstrap address * @param kafkaCaTrustSet Trust set for connecting to Kafka * @param authIdentity Identity for TLS client authentication for connecting to Kafka * @param config Custom Admin client configuration or empty properties instance * * @return Admin client configuration */ - /* test */ static Properties adminClientConfiguration(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) { + /* test */ static Properties adminClientConfiguration(PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) { if (config == null) { throw new InvalidConfigurationException("The config parameter should not be null"); } - config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapHostnames); - // configuring TLS encryption if requested if (kafkaCaTrustSet != null) { config.putIfAbsent(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SSL"); diff --git a/operator-common/src/test/java/io/strimzi/operator/common/DefaultAdminClientProviderTest.java b/operator-common/src/test/java/io/strimzi/operator/common/DefaultAdminClientProviderTest.java index 8ce181d9d85..51e1606c0c9 100644 --- a/operator-common/src/test/java/io/strimzi/operator/common/DefaultAdminClientProviderTest.java +++ b/operator-common/src/test/java/io/strimzi/operator/common/DefaultAdminClientProviderTest.java @@ -26,7 +26,6 @@ public class DefaultAdminClientProviderTest { private static final 
String USER_KEY = "user-key"; private void assertDefaultConfigs(Properties config) { - assertThat(config.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), is("my-kafka:9092")); assertThat(config.get(AdminClientConfig.METADATA_MAX_AGE_CONFIG), is("30000")); assertThat(config.get(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG), is("10000")); assertThat(config.get(AdminClientConfig.RETRIES_CONFIG), is("3")); @@ -35,9 +34,9 @@ private void assertDefaultConfigs(Properties config) { @Test public void testPlainConnection() { - Properties config = DefaultAdminClientProvider.adminClientConfiguration("my-kafka:9092", null, null, new Properties()); + Properties config = DefaultAdminClientProvider.adminClientConfiguration(null, null, new Properties()); - assertThat(config.size(), is(5)); + assertThat(config.size(), is(4)); assertDefaultConfigs(config); } @@ -47,10 +46,9 @@ public void testCustomConfig() { customConfig.setProperty(AdminClientConfig.RETRIES_CONFIG, "5"); // Override a value we have default for customConfig.setProperty(AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, "13000"); // Override a value we do not use - Properties config = DefaultAdminClientProvider.adminClientConfiguration("my-kafka:9092", null, null, customConfig); + Properties config = DefaultAdminClientProvider.adminClientConfiguration(null, null, customConfig); - assertThat(config.size(), is(6)); - assertThat(config.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), is("my-kafka:9092")); + assertThat(config.size(), is(5)); assertThat(config.get(AdminClientConfig.METADATA_MAX_AGE_CONFIG), is("30000")); assertThat(config.get(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG), is("10000")); assertThat(config.get(AdminClientConfig.RETRIES_CONFIG), is("5")); @@ -60,9 +58,9 @@ public void testCustomConfig() { @Test public void testTlsConnection() { - Properties config = DefaultAdminClientProvider.adminClientConfiguration("my-kafka:9092", mockPemTrustSet(), null, new Properties()); + Properties config = 
DefaultAdminClientProvider.adminClientConfiguration(mockPemTrustSet(), null, new Properties()); - assertThat(config.size(), is(8)); + assertThat(config.size(), is(7)); assertDefaultConfigs(config); assertThat(config.get(AdminClientConfig.SECURITY_PROTOCOL_CONFIG), is("SSL")); assertThat(config.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG), is("PEM")); @@ -72,9 +70,9 @@ public void testTlsConnection() { @Test public void testMTlsConnection() { - Properties config = DefaultAdminClientProvider.adminClientConfiguration("my-kafka:9092", mockPemTrustSet(), mockPemAuthIdentity(), new Properties()); + Properties config = DefaultAdminClientProvider.adminClientConfiguration(mockPemTrustSet(), mockPemAuthIdentity(), new Properties()); - assertThat(config.size(), is(11)); + assertThat(config.size(), is(10)); assertDefaultConfigs(config); assertThat(config.get(AdminClientConfig.SECURITY_PROTOCOL_CONFIG), is("SSL")); assertThat(config.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG), is("PEM")); @@ -87,9 +85,9 @@ public void testMTlsConnection() { @Test public void testMTlsWithPublicCAConnection() { - Properties config = DefaultAdminClientProvider.adminClientConfiguration("my-kafka:9092", null, mockPemAuthIdentity(), new Properties()); + Properties config = DefaultAdminClientProvider.adminClientConfiguration(null, mockPemAuthIdentity(), new Properties()); - assertThat(config.size(), is(9)); + assertThat(config.size(), is(8)); assertDefaultConfigs(config); assertThat(config.get(AdminClientConfig.SECURITY_PROTOCOL_CONFIG), is("SSL")); assertThat(config.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG).toString(), is("PEM")); @@ -99,7 +97,7 @@ public void testMTlsWithPublicCAConnection() { @Test public void testNullConfig() { - InvalidConfigurationException ex = assertThrows(InvalidConfigurationException.class, () -> DefaultAdminClientProvider.adminClientConfiguration("my-kafka:9092", null, mockPemAuthIdentity(), null)); + InvalidConfigurationException ex = 
assertThrows(InvalidConfigurationException.class, () -> DefaultAdminClientProvider.adminClientConfiguration(null, mockPemAuthIdentity(), null)); assertThat(ex.getMessage(), is("The config parameter should not be null")); }