
Allow KafkaRoller to talk to controllers directly
Configure the controller listener with the pod DNS name so that the Admin client can bootstrap against it.
Add a method for creating an admin client for controllers with BOOTSTRAP_CONTROLLERS_CONFIG set.
Make KafkaRoller create the admin client against pure controller nodes if they are running Kafka 3.7.0 or later.
Tidy up: remove the ceShouldBeFatal option as it is never set to true.

Signed-off-by: Gantigmaa Selenge <[email protected]>
tinaselenge committed May 14, 2024
1 parent 3ef754c commit 0f6d51f
Showing 11 changed files with 179 additions and 81 deletions.
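As background for the diffs below: KIP-919, which shipped in Kafka 3.7.0, lets an Admin client bootstrap directly against KRaft controllers through the bootstrap.controllers setting instead of bootstrap.servers. A minimal standalone sketch of that pattern follows; the hostname, port, and plaintext connection are illustrative assumptions only (Strimzi itself connects over TLS through its AdminClientProvider):

    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class ControllerAdminSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // KIP-919 (Kafka 3.7.0+): bootstrap against controller nodes rather than brokers
            props.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG,
                    "my-cluster-controllers-0.my-cluster-kafka-brokers.my-namespace.svc:9090");
            try (Admin admin = Admin.create(props)) {
                // Quorum metadata is served by the controllers themselves
                System.out.println("Quorum leader: " + admin.describeMetadataQuorum().quorumInfo().get().leaderId());
            }
        }
    }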
@@ -275,7 +275,14 @@ public KafkaBrokerConfigurationBuilder withListeners(

// Control Plane listener is set for pure KRaft controller or combined node, and broker in ZooKeeper mode or in migration state but not when full KRaft.
if (node.controller() || (node.broker() && kafkaMetadataConfigState.isZooKeeperToMigration())) {
-    listeners.add(CONTROL_PLANE_LISTENER_NAME + "://0.0.0.0:9090");
+    if (!node.broker()) {
+        // If it is a pure controller, the listener needs to be set with a hostname that the Admin client can bootstrap against
+        listeners.add(String.format("%s://%s:9090", CONTROL_PLANE_LISTENER_NAME,
+                DnsNameGenerator.podDnsNameWithoutClusterDomain(namespace, KafkaResources.brokersServiceName(clusterName), node.podName())
+        ));
+    } else {
+        listeners.add(CONTROL_PLANE_LISTENER_NAME + "://0.0.0.0:9090");
+    }

// Control Plane listener to be advertised only with broker in ZooKeeper-based or migration but NOT when full KRaft only or mixed
if (node.broker() && kafkaMetadataConfigState.isZooKeeperToMigration()) {
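With the change above, a pure controller's generated configuration gets a resolvable control-plane listener instead of the wildcard bind, for example (cluster, namespace, and pod names taken from the test expectations further down):

    listeners=CONTROLPLANE-9090://my-cluster-controllers-2.my-cluster-kafka-brokers.my-namespace.svc:9090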
@@ -120,7 +120,10 @@ public class KafkaCluster extends AbstractModel implements SupportsMetrics, Supp
protected static final String REPLICATION_PORT_NAME = "tcp-replication";
protected static final int KAFKA_AGENT_PORT = 8443;
protected static final String KAFKA_AGENT_PORT_NAME = "tcp-kafkaagent";
-protected static final int CONTROLPLANE_PORT = 9090;
+/**
+ * Port number used for control plane
+ */
+public static final int CONTROLPLANE_PORT = 9090;
protected static final String CONTROLPLANE_PORT_NAME = "tcp-ctrlplane"; // port name is up to 15 characters

/**
@@ -36,10 +36,8 @@
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeClusterResult;
-import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
-import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.SslAuthenticationException;

@@ -112,6 +110,7 @@ public class KafkaRoller {
private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(KafkaRoller.class);
private static final String CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_NAME = "controller.quorum.fetch.timeout.ms";
private static final String CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT = "2000";
+private static final KafkaVersion VERSION_SUPPORTS_TALKING_TO_CONTROLLERS = new KafkaVersion("3.7.0", "", "", "", "", false, false, "");

private final PodOperator podOperations;
private final long pollingIntervalMs;
@@ -207,7 +206,7 @@ protected Future<Pod> pod(Integer podId) {
private boolean maybeInitBrokerAdminClient() {
if (this.brokerAdminClient == null) {
try {
-this.brokerAdminClient = adminClient(nodes.stream().filter(NodeRef::broker).collect(Collectors.toSet()), false);
+this.brokerAdminClient = brokerAdminClient(nodes.stream().filter(NodeRef::broker).collect(Collectors.toSet()));
} catch (ForceableProblem | FatalProblem e) {
LOGGER.warnCr(reconciliation, "Failed to create brokerAdminClient.", e);
return false;
@@ -222,14 +221,16 @@ private boolean maybeInitBrokerAdminClient() {
*/
private boolean maybeInitControllerAdminClient() {
if (this.controllerAdminClient == null) {
+// Prior to the KIP-919 implementation added in Kafka 3.7.0, Kafka did not support
+// using the Kafka Admin API with controller nodes when running in KRaft mode.
+// Therefore, use brokers to initialise the adminClient for the quorum health check
+// when the version is older than 3.7.0.
try {
-// TODO: Currently, when running in KRaft mode Kafka does not support using Kafka Admin API with controller
-// nodes. This is tracked in https://github.com/strimzi/strimzi-kafka-operator/issues/9692.
-// Therefore use broker nodes of the cluster to initialise adminClient for quorum health check.
-// Once Kafka Admin API is supported for controllers, nodes.stream().filter(NodeRef:controller)
-// can be used here. Until then pass an empty set of nodes so the client is initialized with
-// the brokers service.
-this.controllerAdminClient = adminClient(Set.of(), false);
+if (kafkaVersion.compareTo(VERSION_SUPPORTS_TALKING_TO_CONTROLLERS) >= 0) {
+    this.controllerAdminClient = controllerAdminClient(nodes.stream().filter(NodeRef::controller).collect(Collectors.toSet()));
+} else {
+    this.controllerAdminClient = brokerAdminClient(Set.of());
+}
} catch (ForceableProblem | FatalProblem e) {
LOGGER.warnCr(reconciliation, "Failed to create controllerAdminClient.", e);
return false;
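To make the version gate concrete, assuming KafkaVersion.compareTo orders dotted versions numerically, the initialisation resolves as follows for a few example versions:

    // 3.6.2 -> brokerAdminClient(Set.of())          (pre-KIP-919: bootstrap via the brokers service)
    // 3.7.0 -> controllerAdminClient(controllers)   (bootstrap.controllers supported)
    // 3.8.0 -> controllerAdminClient(controllers)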
@@ -616,35 +617,13 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
KafkaBrokerLoggingConfigurationDiff brokerLoggingDiff = null;
boolean needsReconfig = false;

-if (isController) {
-    if (maybeInitControllerAdminClient()) {
-        String controllerQuorumFetchTimeout = CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT;
-        String desiredConfig = kafkaConfigProvider.apply(nodeRef.nodeId());
-
-        if (desiredConfig != null) {
-            OrderedProperties orderedProperties = new OrderedProperties();
-            controllerQuorumFetchTimeout = orderedProperties.addStringPairs(desiredConfig).asMap().getOrDefault(CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_NAME, CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT);
-        }
-
-        restartContext.quorumCheck = quorumCheck(controllerAdminClient, Long.parseLong(controllerQuorumFetchTimeout));
-    } else {
-        //TODO When https://github.com/strimzi/strimzi-kafka-operator/issues/9692 is complete
-        // we should change this logic to immediately restart this pod because we cannot connect to it.
-        if (isBroker) {
-            // If it is a combined node (controller and broker) and the admin client cannot be initialised,
-            // restart this pod. There is no reason to continue as we won't be able to
-            // connect an admin client to this pod for other checks later.
-            LOGGER.infoCr(reconciliation, "KafkaQuorumCheck cannot be initialised for {} because none of the brokers do not seem to responding to connection attempts. " +
-                    "Restarting pod because it is a combined node so it is one of the brokers that is not responding.", nodeRef);
-            reasonToRestartPod.add(RestartReason.POD_UNRESPONSIVE);
-            markRestartContextWithForceRestart(restartContext);
-            return;
-        } else {
-            // If it is a controller only node throw an UnforceableProblem, so we try again until the backOff
-            // is finished, then it will move on to the next controller and eventually the brokers.
-            throw new UnforceableProblem("KafkaQuorumCheck cannot be initialised for " + nodeRef + " because none of the brokers do not seem to responding to connection attempts");
-        }
-    }
-}
+// If it is a pure controller, initialise the admin client specifically for controllers
+if (isController && !isBroker) {
+    if (!maybeInitControllerAdminClient()) {
+        handleFailedAdminClientForController(nodeRef, restartContext, reasonToRestartPod);
+        return;
+    }
+    restartContext.quorumCheck = quorumCheck(controllerAdminClient, nodeRef);
+}

if (isBroker) {
@@ -655,6 +634,11 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
return;
}

+// If it is a mixed node, initialise quorum check with the broker admin client
+if (isController) {
+    restartContext.quorumCheck = quorumCheck(brokerAdminClient, nodeRef);
+}
+
// Always get the broker config. This request gets sent to that specific broker, so it's a proof that we can
// connect to the broker and that it's capable of responding.
Config brokerConfig;
@@ -700,6 +684,23 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
restartContext.brokerLoggingDiff = brokerLoggingDiff;
}

+private void handleFailedAdminClientForController(NodeRef nodeRef, RestartContext restartContext, RestartReasons reasonToRestartPod) throws UnforceableProblem {
+    if (kafkaVersion.compareTo(VERSION_SUPPORTS_TALKING_TO_CONTROLLERS) >= 0) {
+        // If the version supports talking to controllers, the admin client was initialised against this controller directly,
+        // so force a restart of this pod. There is no reason to continue as we won't be able to
+        // connect an admin client to this pod for other checks later.
+        LOGGER.infoCr(reconciliation, "KafkaQuorumCheck cannot be initialised for {} because the controller does not seem to be responding to connection attempts. " +
+                "Restarting the pod because we cannot connect to it.", nodeRef);
+        reasonToRestartPod.add(RestartReason.POD_UNRESPONSIVE);
+        markRestartContextWithForceRestart(restartContext);
+    } else {
+        // If the version does not support talking to controllers, the admin client was initialised against the brokers,
+        // which means none of the brokers could be reached.
+        // Throw an UnforceableProblem so that broker nodes can be checked next
+        // and potentially resolve the connection issue.
+        throw new UnforceableProblem("KafkaQuorumCheck cannot be initialised for " + nodeRef + " because none of the brokers seem to be responding to connection attempts");
+    }
+}
+
/**
* Returns a config of the given broker.
* @param nodeRef The reference of the broker.
@@ -914,10 +915,10 @@ protected Future<Void> restart(Pod pod, RestartContext restartContext) {
* Returns an AdminClient instance bootstrapped from the given nodes. If nodes is an
* empty set, use the brokers service to bootstrap the client.
*/
-/* test */ Admin adminClient(Set<NodeRef> nodes, boolean ceShouldBeFatal) throws ForceableProblem, FatalProblem {
-    // If no nodes are passed initialize the admin client using the brokers service
-    // TODO when https://github.com/strimzi/strimzi-kafka-operator/issues/9692 is completed review whether
-    // this function can be reverted to expect nodes to be non empty
+/* test */ Admin brokerAdminClient(Set<NodeRef> nodes) throws ForceableProblem, FatalProblem {
+    // If no nodes are passed, initialise the admin client using the bootstrap service.
+    // This is still needed for versions older than 3.7.0, so that when only controller nodes are being rolled,
+    // the roller can use the brokers to get quorum information via the Admin client.
String bootstrapHostnames;
if (nodes.isEmpty()) {
bootstrapHostnames = String.format("%s:%s", DnsNameGenerator.of(namespace, KafkaResources.bootstrapServiceName(cluster)).serviceDnsName(), KafkaCluster.REPLICATION_PORT);
@@ -928,20 +929,34 @@ protected Future<Void> restart(Pod pod, RestartContext restartContext) {
try {
LOGGER.debugCr(reconciliation, "Creating AdminClient for {}", bootstrapHostnames);
return adminClientProvider.createAdminClient(bootstrapHostnames, coTlsPemIdentity.pemTrustSet(), coTlsPemIdentity.pemAuthIdentity());
-} catch (KafkaException e) {
-    if (ceShouldBeFatal && (e instanceof ConfigException
-            || e.getCause() instanceof ConfigException)) {
-        throw new FatalProblem("An error while try to create an admin client with bootstrap brokers " + bootstrapHostnames, e);
-    } else {
-        throw new ForceableProblem("An error while try to create an admin client with bootstrap brokers " + bootstrapHostnames, e);
-    }
} catch (RuntimeException e) {
throw new ForceableProblem("An error while try to create an admin client with bootstrap brokers " + bootstrapHostnames, e);
}
}

-/* test */ KafkaQuorumCheck quorumCheck(Admin ac, long controllerQuorumFetchTimeoutMs) {
-    return new KafkaQuorumCheck(reconciliation, ac, vertx, controllerQuorumFetchTimeoutMs);
+/**
+ * Returns an AdminClient instance bootstrapped from the given controller nodes.
+ */
+/* test */ Admin controllerAdminClient(Set<NodeRef> nodes) throws ForceableProblem, FatalProblem {
+    String bootstrapHostnames = nodes.stream().map(node -> DnsNameGenerator.podDnsName(namespace, KafkaResources.brokersServiceName(cluster), node.podName()) + ":" + KafkaCluster.CONTROLPLANE_PORT).collect(Collectors.joining(","));
+
+    try {
+        LOGGER.debugCr(reconciliation, "Creating AdminClient for {}", bootstrapHostnames);
+        return adminClientProvider.createControllerAdminClient(bootstrapHostnames, coTlsPemIdentity.pemTrustSet(), coTlsPemIdentity.pemAuthIdentity());
+    } catch (RuntimeException e) {
+        throw new ForceableProblem("An error while trying to create an admin client with bootstrap controllers " + bootstrapHostnames, e);
+    }
+}
+
+/* test */ KafkaQuorumCheck quorumCheck(Admin ac, NodeRef nodeRef) {
+    String controllerQuorumFetchTimeout = CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT;
+    String desiredConfig = kafkaConfigProvider.apply(nodeRef.nodeId());
+
+    if (desiredConfig != null) {
+        OrderedProperties orderedProperties = new OrderedProperties();
+        controllerQuorumFetchTimeout = orderedProperties.addStringPairs(desiredConfig).asMap().getOrDefault(CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_NAME, CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT);
+    }
+    return new KafkaQuorumCheck(reconciliation, ac, vertx, Long.parseLong(controllerQuorumFetchTimeout));
}

/* test */ KafkaAvailability availability(Admin ac) {
@@ -984,7 +999,7 @@ int controller(NodeRef nodeRef, long timeout, TimeUnit unit, RestartContext rest
// This is tracked in https://github.com/strimzi/strimzi-kafka-operator/issues/9373.
// Use admin client connected directly to this broker here, then any exception or timeout trying to connect to
// the current node will be caught and handled from this method, rather than appearing elsewhere.
-try (Admin ac = adminClient(Set.of(nodeRef), false)) {
+try (Admin ac = brokerAdminClient(Set.of(nodeRef))) {
Node controllerNode = null;

try {
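For illustration, controllerAdminClient joins each controller pod's DNS name (podDnsName, which includes the cluster domain) with the control-plane port, so for a hypothetical three-node quorum the bootstrap string handed to createControllerAdminClient would be (line breaks added after each comma for readability; the actual value is a single comma-separated string):

    my-cluster-controllers-0.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,
    my-cluster-controllers-1.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,
    my-cluster-controllers-2.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090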
@@ -587,10 +587,20 @@ public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTru
return createAdminClient(bootstrapHostnames, kafkaCaTrustSet, authIdentity, new Properties());
}

+@Override
+public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity) {
+    return createControllerAdminClient(controllerBootstrapHostnames, kafkaCaTrustSet, authIdentity, new Properties());
+}
+
@Override
public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
    return mockAdminClient;
}
+
+@Override
+public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
+    return mockAdminClient;
+}
};
}

@@ -882,7 +882,7 @@ public void testKraftListenersBrokerAndControllerNodes() {
"listener.name.controlplane-9090.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12",
"listener.name.controlplane-9090.ssl.truststore.password=${CERTS_STORE_PASSWORD}",
"listener.name.controlplane-9090.ssl.truststore.type=PKCS12",
"listeners=CONTROLPLANE-9090://0.0.0.0:9090",
"listeners=CONTROLPLANE-9090://my-cluster-controllers-2.my-cluster-kafka-brokers.my-namespace.svc:9090",
"listener.security.protocol.map=CONTROLPLANE-9090:SSL",
"sasl.enabled.mechanisms=",
"ssl.endpoint.identification.algorithm=HTTPS"));
@@ -975,7 +975,7 @@ public void testKraftOauthBrokerControllerAndMixedNodes() {
"listener.name.controlplane-9090.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12",
"listener.name.controlplane-9090.ssl.truststore.password=${CERTS_STORE_PASSWORD}",
"listener.name.controlplane-9090.ssl.truststore.type=PKCS12",
"listeners=CONTROLPLANE-9090://0.0.0.0:9090",
"listeners=CONTROLPLANE-9090://my-cluster-controllers-2.my-cluster-kafka-brokers.my-namespace.svc:9090",
"listener.security.protocol.map=CONTROLPLANE-9090:SSL",
"sasl.enabled.mechanisms=",
"ssl.endpoint.identification.algorithm=HTTPS",
@@ -2396,7 +2396,7 @@ public void testListenersOnMigration() {
}

assertThat(configuration, containsString("listener.name.controlplane-9090"));
assertThat(configuration, containsString("listeners=CONTROLPLANE-9090://0.0.0.0:9090"));
assertThat(configuration, containsString("listeners=CONTROLPLANE-9090://my-cluster-controllers-1.my-cluster-kafka-brokers.my-namespace.svc:9090"));
// controllers never advertises listeners
assertThat(configuration, not(containsString("advertised.listeners")));
}
Expand Down
@@ -229,7 +229,7 @@ public void testControllerNodeConfigurationOnMigration() {
}

assertThat(configuration, containsString("listener.name.controlplane-9090"));
assertThat(configuration, containsString("listeners=CONTROLPLANE-9090://0.0.0.0:9090"));
assertThat(configuration, containsString("listeners=CONTROLPLANE-9090://my-cluster-controllers-3.my-cluster-kafka-brokers.my-namespace.svc:9090"));
// controllers never advertises listeners
assertThat(configuration, not(containsString("advertised.listeners")));
