Skip to content

Commit

Permalink
wait to have the expected number of consumers in the consumer group
Browse files Browse the repository at this point in the history
Issue: ZENKO-4286
  • Loading branch information
Kerkesni committed Jan 28, 2025
1 parent 6ecbaa5 commit ddfbc3f
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 8 deletions.
19 changes: 12 additions & 7 deletions .github/scripts/end2end/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,23 @@ wait_for_consumer_group() {
# Getting the name of the first kafka pod
kafka_pod=$(kubectl get pods -l brokerId=0,kafka_cr=end2end-base-queue,app=kafka -o jsonpath='{.items[0].metadata.name}')
consumer_group=$1
timeout_s=$2
interval_s=${3:-5}
# When a pod is restarted the previous consumer is kept in the group until the session timeout expires
expected_members=$2
timeout_s=$3
interval_s=${4:-5}
kubectl exec -it $kafka_pod -- bash -c '
export KAFKA_OPTS=
consumer_group=$1
timeout_s=$2
expected_members=$2
timeout_s=$3
interval_s=$4
start_time=$(date +%s)
while true; do
# The state becomes "Stable" when no rebalance is happening and at least one consumer is connected
state=$(kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group $consumer_group --state | awk '"'"'NF>1 && $(NF-1) != "STATE" {print (NF>1?$(NF-1):"None")} {next}'"'"')
if [ "$state" == "Stable" ]; then
members=$(kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group $consumer_group --state | awk '"'"'NF>1 && $NF != "#MEMBERS" {print (NF>1?$NF:"None")} {next}'"'"')
echo "Consumer group $consumer_group state: $state, members: $members"
if [ "$state" == "Stable" ] && [ "$members" -eq "$expected_members" ]; then
echo "Consumer group $consumer_group is now consuming."
exit 0
fi
Expand All @@ -88,8 +94,7 @@ while true; do
echo "Error: Timed out waiting for consumer group $consumer_group to start consuming."
exit 1
fi
# Sleep for 1 second before checking again
sleep 1
sleep $interval_s
done
' -- "$consumer_group" "$timeout_s" "$interval_s"
' -- "$consumer_group" "$expected_members" "$timeout_s" "$interval_s"
}
2 changes: 1 addition & 1 deletion .github/scripts/end2end/configure-e2e.sh
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,4 @@ kubectl wait --for condition=DeploymentFailure=false --timeout 25m -n ${NAMESPAC
kubectl wait --for condition=DeploymentInProgress=false --timeout 25m -n ${NAMESPACE} zenko/${ZENKO_NAME}

# wait for ingestion processor to start consuming from Kafka
wait_for_consumer_group $UUID.backbeat-ingestion-group 300
wait_for_consumer_group $UUID.backbeat-ingestion-group 1 300

0 comments on commit ddfbc3f

Please sign in to comment.