
Improve memory consumption on large Kubernetes clusters when NodePort listeners are used #11098

Open

scholzj wants to merge 1 commit into main
Conversation

@scholzj scholzj (Member) commented Jan 30, 2025

Type of change

  • Bugfix

Description

When using node port listeners, the cluster operator needs to figure out the bootstrap address for the node port listener in order to add it to the status section of the Kafka CR.

Right now, we do it the following way:

  1. We list all worker nodes
  2. We list all Kafka pods
  3. We match the pods to the nodes and collect the addresses of the nodes they are running on
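Conceptually, the current flow looks roughly like the following sketch, written here against the fabric8 Kubernetes client directly. The namespace and the broker-role label are illustrative stand-ins, not necessarily the operator's actual selectors.

```java
import io.fabric8.kubernetes.api.model.Node;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

import java.util.List;

public class ListBasedNodeAddresses {
    public static void main(String[] args) {
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            // 1. List ALL worker nodes -- every node object is held in memory at once
            List<Node> allNodes = client.nodes().list().getItems();

            // 2. List the Kafka broker pods (label selector is illustrative)
            List<Pod> brokerPods = client.pods()
                    .inNamespace("myproject")
                    .withLabel("strimzi.io/broker-role", "true")
                    .list()
                    .getItems();

            // 3. Match each pod to the node it runs on and collect that node's addresses
            for (Pod pod : brokerPods) {
                allNodes.stream()
                        .filter(node -> node.getMetadata().getName().equals(pod.getSpec().getNodeName()))
                        .findFirst()
                        .ifPresent(node -> node.getStatus().getAddresses()
                                .forEach(a -> System.out.println(a.getType() + " -> " + a.getAddress())));
            }
        }
    }
}
```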

But this approach can cause the operator to run out of memory on large Kubernetes clusters because:

  • We keep the list of the nodes in memory
  • A single node resource can be pretty big because its status contains lists of container images etc.

So this can easily lead to running out of memory, even with a small Kafka cluster. Reports from users suggest problems on a 250-worker-node OpenShift cluster running a single Kafka cluster. I was able to reproduce it on a cluster with ~160 worker nodes with the help of 3 small Kafka clusters.

This PR updates the way we collect the addresses for the status. It:

  • Gets the list of broker pods
  • From the pods, it gets a list of nodes they are running on
  • Gets the nodes from Kubernetes one-by-one with GET instead of LIST API
  • Extracts the addresses from the nodes

This has the trade-off of making more Kubernetes API calls and spending more CPU while being more careful with memory use.

It should help with the most common use case where the Kubernetes cluster is large with many nodes, but the Kafka cluster is much smaller. For situations where the Kafka cluster is large as well, we will still need the memory to store many nodes. So users might need to increase the operator memory, but that is not unexpected for a large Kafka cluster. For a large Kafka cluster, it might also make a large number of GET calls for the individual nodes instead of one LIST call. But I think overall this is a reasonable trade-off.
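For comparison, here is a minimal sketch of the new flow under the same assumptions as above (fabric8 client, illustrative namespace and label selector). The operator itself composes these calls asynchronously with Vert.x Futures rather than the blocking loop shown here.

```java
import io.fabric8.kubernetes.api.model.Node;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class GetBasedNodeAddresses {
    public static void main(String[] args) {
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            // 1. List only the broker pods (label selector is illustrative)
            List<Pod> brokerPods = client.pods()
                    .inNamespace("myproject")
                    .withLabel("strimzi.io/broker-role", "true")
                    .list()
                    .getItems();

            // 2. Collect the distinct worker node names the brokers are scheduled on
            Set<String> nodeNames = brokerPods.stream()
                    .map(pod -> pod.getSpec().getNodeName())
                    .collect(Collectors.toSet());

            // 3. GET each node one-by-one instead of LIST-ing every node in the cluster
            for (String nodeName : nodeNames) {
                Node node = client.nodes().withName(nodeName).get();

                // 4. Extract the addresses advertised by the node status
                node.getStatus().getAddresses()
                        .forEach(a -> System.out.println(nodeName + ": " + a.getType() + " -> " + a.getAddress()));
            }
        }
    }
}
```

With this ordering, only the node objects for nodes that actually run brokers are ever loaded, which is what keeps the memory footprint small on large Kubernetes clusters hosting small Kafka clusters.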

Checklist

  • Write tests
  • Make sure all tests pass
  • Try your changes from a Pod inside your Kubernetes and OpenShift cluster, not just locally

@scholzj scholzj added this to the 0.46.0 milestone Jan 30, 2025
@scholzj scholzj requested a review from ppatierno January 30, 2025 16:29
@scholzj scholzj (Member Author) commented Jan 30, 2025

/azp run regression


Azure Pipelines successfully started running 1 pipeline(s).

@ppatierno ppatierno (Member) left a comment


LGTM. Just a nit and a question for you.

@@ -1107,46 +1109,59 @@ protected Future<Void> sharedKafkaConfigurationCleanup() {
* types are done because it requires the Kafka brokers to be scheduled and running to collect their node addresses.
* Without that, we do not know on which node would they be running.
*
* Note: To avoid issues with bg clusters with many nodes, we first get the used nodes from the Pods and then get
Member

Suggested change
* Note: To avoid issues with bg clusters with many nodes, we first get the used nodes from the Pods and then get
* Note: To avoid issues with big clusters with many nodes, we first get the used nodes from the Pods and then get

List<Future<Void>> nodeFutures = new ArrayList<>();

// We get the full node resource for each node with a broker
for (String nodeName : brokerNodes.values().stream().distinct().toList()) {
Member
Why is there a distinct() here? Can you get duplicates from the previous code that you need to avoid here?

Member

Also curious about this 🤔 .

// First we collect all the broker pods we have so that we can find out on which worker nodes they run
return podOperator.listAsync(reconciliation.namespace(), kafka.getSelectorLabels().withStrimziBrokerRole(true))
.compose(pods -> {
// We collect the nodes used by the brokers upfront to avid asking for the same node multiple times later
Member

Suggested change
// We collect the nodes used by the brokers upfront to avid asking for the same node multiple times later
// We collect the nodes used by the brokers upfront to avoid asking for the same node multiple times later

List<Future<Void>> nodeFutures = new ArrayList<>();

// We get the full node resource for each node with a broker
for (String nodeName : brokerNodes.values().stream().distinct().toList()) {
Member

Also curious about this 🤔 .
