Skip to content

Commit

Permalink
wait to have the expected number of consumers in the consumer group
Browse files Browse the repository at this point in the history
When a pod restarts, the previous consumers are kept in the consumer group
until the session times out. This can lead to the new consumer skipping
some messages on the non-assigned partitions, as there is no offset stored
for those initially (consumers are configured to start consuming from the
latest offset).

Issue: ZENKO-4286
  • Loading branch information
Kerkesni committed Jan 28, 2025
1 parent f46740b commit 36c3027
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 8 deletions.
19 changes: 12 additions & 7 deletions .github/scripts/end2end/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,23 @@ wait_for_consumer_group() {
# Getting the name of the first kafka pod
kafka_pod=$(kubectl get pods -l brokerId=0,kafka_cr=end2end-base-queue,app=kafka -o jsonpath='{.items[0].metadata.name}')
consumer_group=$1
timeout_s=$2
interval_s=${3:-5}
# When a pod is restarted the previous consumer is kept in the group until the session timeout expires
expected_members=$2
timeout_s=$3
interval_s=${4:-5}
kubectl exec -it $kafka_pod -- bash -c '
export KAFKA_OPTS=
consumer_group=$1
timeout_s=$2
expected_members=$2
timeout_s=$3
interval_s=$4
start_time=$(date +%s)
while true; do
# The state becomes "Stable" when no rebalance is happening and at least one consumer is connected
state=$(kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group $consumer_group --state | awk '"'"'NF>1 && $(NF-1) != "STATE" {print (NF>1?$(NF-1):"None")} {next}'"'"')
if [ "$state" == "Stable" ]; then
members=$(kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group $consumer_group --state | awk '"'"'NF>1 && $NF != "#MEMBERS" {print (NF>1?$NF:"None")} {next}'"'"')
echo "Consumer group $consumer_group state: $state, members: $members"
if [ "$state" == "Stable" ] && [ "$members" -eq "$expected_members" ]; then
echo "Consumer group $consumer_group is now consuming."
exit 0
fi
Expand All @@ -88,8 +94,7 @@ while true; do
echo "Error: Timed out waiting for consumer group $consumer_group to start consuming."
exit 1
fi
# Sleep for 1 second before checking again
sleep 1
sleep $interval_s
done
' -- "$consumer_group" "$timeout_s" "$interval_s"
' -- "$consumer_group" "$expected_members" "$timeout_s" "$interval_s"
}
2 changes: 1 addition & 1 deletion .github/scripts/end2end/configure-e2e.sh
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,4 @@ kubectl wait --for condition=DeploymentFailure=false --timeout 25m -n ${NAMESPAC
kubectl wait --for condition=DeploymentInProgress=false --timeout 25m -n ${NAMESPACE} zenko/${ZENKO_NAME}

# wait for ingestion processor to start consuming from Kafka
wait_for_consumer_group $UUID.backbeat-ingestion-group 300
wait_for_consumer_group $UUID.backbeat-ingestion-group 1 300

0 comments on commit 36c3027

Please sign in to comment.