Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-3145 : Add test for next generation consumer-group rebalance protocol #3237

Merged
merged 16 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions samples/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
* sample-04 - Topic based (non-blocking) retry
* sample-05 - Global embedded Kafka testing
* sample-06 - Kafka Streams tests with TopologyTestDriver
* sample-07 - The New consumer rebalance protocol in spring-kafka
37 changes: 37 additions & 0 deletions samples/sample-07/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
HELP.md
.gradle
build/
!gradle/wrapper/gradle-wrapper.jar
!**/src/main/**/build/
!**/src/test/**/build/

### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
bin/
!**/src/main/**/bin/
!**/src/test/**/bin/

### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
out/
!**/src/main/**/out/
!**/src/test/**/out/

### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/

### VS Code ###
.vscode/
50 changes: 50 additions & 0 deletions samples/sample-07/README.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
== Sample 7

This sample demonstrated the application of the new consumer rebalance protocol in Spring Kafka.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

demonstrates.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spring for Apache Kafka.


The new consumer rebalance protocol refers to the Server Side rebalance protocol proposed in KIP-848.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Link to KIP-848.


`Spring Boot` starts the `Kafka Broker` container defined in the `compose.yaml` file upon startup.

```yaml
version: '3'
services:
broker:
image: bitnami/kafka:3.7.0
...
# KIP-848
KAFKA_CFG_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: "classic,consumer"
KAFKA_CFG_TRANSACTION_PARTITION_VERIFICATION_ENABLE: "false"
```

The config of `group.protocol = conumser` should be added to `Consumer` configuration to apply new consumer rebalance protocol.

```java
@Bean
public ConsumerFactory<String, String> consumerFactory() {
final Map<String, Object> props = new HashMap<>();
...
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
...
}
```

Next, the `Consumer` created by `@KafkaListener` will request a subscription to the `test-topic` from the `Broker`.

The `Broker` will then send the Topic Partition Assign information to the `Consumer`. That means the `Consumer` Rebalancing has finished and started to poll messages.

```java
@Component
public class Sample07KafkaListener {

@KafkaListener(topics = "test-topic", groupId = "sample07-1")
public void listenWithGroup1(String message) {
System.out.println("Received message at group sample07-1: " + message);
}

@KafkaListener(topics = "test-topic", groupId = "sample07-2")
public void listenWithGroup2(String message) {
System.out.println("Received message at group sample07-2: " + message);
}
}
```
32 changes: 32 additions & 0 deletions samples/sample-07/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
plugins {
id 'java'
id 'org.springframework.boot' version '3.3.0-SNAPSHOT'
id 'io.spring.dependency-management' version '1.1.5'
}

group = 'com.example'
version = '3.2.0-SNAPSHOT'

java {
sourceCompatibility = '17'
}

repositories {
mavenCentral()
maven { url 'https://repo.spring.io/milestone' }
maven { url 'https://repo.spring.io/snapshot' }
}

dependencies {
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.kafka:spring-kafka'
developmentOnly 'org.springframework.boot:spring-boot-docker-compose'

testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testImplementation 'org.springframework.boot:spring-boot-testcontainers'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need all of these test dependencies if we don't have any tests in this project?
And that raises the question: why we don't have any tests in this project?

}

tasks.named('test') {
useJUnitPlatform()
}
34 changes: 34 additions & 0 deletions samples/sample-07/compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
version: '3'
services:
broker:
image: bitnami/kafka:3.7.0
hostname: broker
container_name: broker
ports:
- "9092:9092"
- "10000:9094"
environment:
# Kraft Settings
KAFKA_CFG_NODE_ID: 0
KAFKA_KRAFT_CLUSTER_ID: HsDBs9l6UUmQq7Y5E6bNlw
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
KAFKA_CFG_PROCESS_ROLES: controller,broker

# Listeners
KAFKA_CFG_LISTENERS: INTERNAL://broker:29092, PLAINTEXT://0.0.0.0:9092, EXTERNAL://:9094, CONTROLLER://:9093
KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://broker:29092, PLAINTEXT://broker:9092, EXTERNAL://127.0.0.1:10000
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

# Clustering
KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: 1

# KIP-848
KAFKA_CFG_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: "classic,consumer"
KAFKA_CFG_TRANSACTION_PARTITION_VERIFICATION_ENABLE: "false"
Binary file not shown.
7 changes: 7 additions & 0 deletions samples/sample-07/gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
Loading