Improve memory consumption on large Kubernetes clusters when NodePort listeners are used #11098
base: main
Conversation
… listeners are used Signed-off-by: Jakub Scholz <[email protected]>
/azp run regression
Azure Pipelines successfully started running 1 pipeline(s).
LGTM. Just a nit and a question for you.
@@ -1107,46 +1109,59 @@ protected Future<Void> sharedKafkaConfigurationCleanup() {
* types are done because it requires the Kafka brokers to be scheduled and running to collect their node addresses.
* Without that, we do not know on which node would they be running.
*
* Note: To avoid issues with bg clusters with many nodes, we first get the used nodes from the Pods and then get
- * Note: To avoid issues with bg clusters with many nodes, we first get the used nodes from the Pods and then get
+ * Note: To avoid issues with big clusters with many nodes, we first get the used nodes from the Pods and then get
List<Future<Void>> nodeFutures = new ArrayList<>();

// We get the full node resource for each node with a broker
for (String nodeName : brokerNodes.values().stream().distinct().toList()) {
Why is there a distinct() here? Can you get duplicates from the previous code that you need to avoid here?
Also curious about this 🤔 .
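For context, distinct() matters here because several broker Pods can be scheduled onto the same worker node, so brokerNodes.values() can contain repeated node names. A minimal, hypothetical illustration (the pod and node names below are made up, not from the PR):

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical example: brokers 0 and 1 happen to be scheduled on the same worker node
Map<String, String> brokerNodes = new LinkedHashMap<>();
brokerNodes.put("my-cluster-kafka-0", "worker-node-a");
brokerNodes.put("my-cluster-kafka-1", "worker-node-a");
brokerNodes.put("my-cluster-kafka-2", "worker-node-b");

// values() contains worker-node-a twice and worker-node-b once;
// distinct() reduces it to [worker-node-a, worker-node-b], so each node would be requested only once
List<String> nodesToQuery = brokerNodes.values().stream().distinct().toList();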
// First we collect all the broker pods we have so that we can find out on which worker nodes they run
return podOperator.listAsync(reconciliation.namespace(), kafka.getSelectorLabels().withStrimziBrokerRole(true))
.compose(pods -> {
// We collect the nodes used by the brokers upfront to avid asking for the same node multiple times later
- // We collect the nodes used by the brokers upfront to avid asking for the same node multiple times later
+ // We collect the nodes used by the brokers upfront to avoid asking for the same node multiple times later
Type of change
Description
When using node port listeners, the cluster operator needs to figure out the bootstrap address for the node port listener in order to add it to the status section of the Kafka CR.
Right now, we do it the following way:
But this approach can cause the operator to run out of memory on large Kubernetes clusters because:
So this can easily lead to running out of memory, even with a small Kafka cluster. Reports from users suggest problems on a 250-worker-node OpenShift cluster with a single Kafka cluster. I was able to reproduce it on a cluster with ~160 worker nodes with the help of 3 small Kafka clusters.
This PR updates the way we collect the addresses for the status. It:
This has the trade-off of making more Kubernetes API calls and spending more CPU while being more careful with memory use.
It should help with the most common use case, where the Kubernetes cluster is large with many nodes but the Kafka cluster is much smaller. For situations where the Kafka cluster is large as well, we will still need the memory to store many nodes. So users might need to increase the operator memory, but that is not unexpected for a large Kafka cluster. For a large Kafka cluster, it might also make a large number of GET calls for the individual nodes instead of one LIST command. But I think overall this is a reasonable trade-off.
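To make the new flow concrete, here is a rough sketch assembled from the diff excerpts quoted in the review above. It is not the exact code from this PR: podOperator.listAsync(...) and the broker Pod selector appear in the quoted diff, but the method wrapper, nodeOperator.getAsync(...), the Pod/Node handling, and the Future.join(...) composition (Vert.x 4) are assumptions for illustration only.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.fabric8.kubernetes.api.model.Pod;
import io.vertx.core.Future;

// Sketch only: LIST just the broker Pods, remember the worker node of each broker,
// then GET each distinct node instead of listing every node in the Kubernetes cluster.
Future<Void> nodePortNodeAddresses() {
    return podOperator.listAsync(reconciliation.namespace(), kafka.getSelectorLabels().withStrimziBrokerRole(true))
            .compose(pods -> {
                // Collect the nodes used by the brokers upfront to avoid asking for the same node multiple times later
                Map<String, String> brokerNodes = new HashMap<>();
                for (Pod pod : pods) {
                    brokerNodes.put(pod.getMetadata().getName(), pod.getSpec().getNodeName());
                }

                List<Future<Void>> nodeFutures = new ArrayList<>();

                // One GET per distinct worker node, so memory scales with the Kafka cluster,
                // not with the size of the Kubernetes cluster
                for (String nodeName : brokerNodes.values().stream().distinct().toList()) {
                    nodeFutures.add(nodeOperator.getAsync(nodeName)   // assumed helper returning Future<Node>
                            .compose(node -> {
                                // extract the node address for the NodePort bootstrap / broker status here
                                return Future.<Void>succeededFuture();
                            }));
                }

                return Future.join(nodeFutures).mapEmpty();
            });
}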
Checklist