-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-3145 : Add test for next generation consumer-group rebalance protocol #3237
Changes from 14 commits
37f8eeb
09fc7b1
d540601
a9df402
8e8113c
e89c6d9
3e85fb3
489c422
2e65068
b4b28f8
b71624f
8fe0ce6
6b2a74f
edcb826
d21e4c3
bac2380
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
HELP.md | ||
.gradle | ||
build/ | ||
!gradle/wrapper/gradle-wrapper.jar | ||
!**/src/main/**/build/ | ||
!**/src/test/**/build/ | ||
|
||
### STS ### | ||
.apt_generated | ||
.classpath | ||
.factorypath | ||
.project | ||
.settings | ||
.springBeans | ||
.sts4-cache | ||
bin/ | ||
!**/src/main/**/bin/ | ||
!**/src/test/**/bin/ | ||
|
||
### IntelliJ IDEA ### | ||
.idea | ||
*.iws | ||
*.iml | ||
*.ipr | ||
out/ | ||
!**/src/main/**/out/ | ||
!**/src/test/**/out/ | ||
|
||
### NetBeans ### | ||
/nbproject/private/ | ||
/nbbuild/ | ||
/dist/ | ||
/nbdist/ | ||
/.nb-gradle/ | ||
|
||
### VS Code ### | ||
.vscode/ |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
== Sample 7 | ||
|
||
This sample demonstrated the application of the new consumer rebalance protocol in Spring Kafka. | ||
|
||
The new consumer rebalance protocol refers to the Server Side rebalance protocol proposed in KIP-848. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Link to KIP-848. |
||
|
||
`Spring Boot` starts the `Kafka Broker` container defined in the `compose.yaml` file upon startup. | ||
|
||
```yaml | ||
version: '3' | ||
services: | ||
broker: | ||
image: bitnami/kafka:3.7.0 | ||
... | ||
# KIP-848 | ||
KAFKA_CFG_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: "classic,consumer" | ||
KAFKA_CFG_TRANSACTION_PARTITION_VERIFICATION_ENABLE: "false" | ||
``` | ||
|
||
The config of `group.protocol = conumser` should be added to `Consumer` configuration to apply new consumer rebalance protocol. | ||
|
||
```java | ||
@Bean | ||
public ConsumerFactory<String, String> consumerFactory() { | ||
final Map<String, Object> props = new HashMap<>(); | ||
... | ||
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"); | ||
... | ||
} | ||
``` | ||
|
||
Next, the `Consumer` created by `@KafkaListener` will request a subscription to the `test-topic` from the `Broker`. | ||
|
||
The `Broker` will then send the Topic Partition Assign information to the `Consumer`. That means the `Consumer` Rebalancing has finished and started to poll messages. | ||
|
||
```java | ||
@Component | ||
public class Sample07KafkaListener { | ||
|
||
@KafkaListener(topics = "test-topic", groupId = "sample07-1") | ||
public void listenWithGroup1(String message) { | ||
System.out.println("Received message at group sample07-1: " + message); | ||
} | ||
|
||
@KafkaListener(topics = "test-topic", groupId = "sample07-2") | ||
public void listenWithGroup2(String message) { | ||
System.out.println("Received message at group sample07-2: " + message); | ||
} | ||
} | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
plugins { | ||
id 'java' | ||
id 'org.springframework.boot' version '3.3.0-SNAPSHOT' | ||
id 'io.spring.dependency-management' version '1.1.5' | ||
} | ||
|
||
group = 'com.example' | ||
version = '3.2.0-SNAPSHOT' | ||
|
||
java { | ||
sourceCompatibility = '17' | ||
} | ||
|
||
repositories { | ||
mavenCentral() | ||
maven { url 'https://repo.spring.io/milestone' } | ||
maven { url 'https://repo.spring.io/snapshot' } | ||
} | ||
|
||
dependencies { | ||
implementation 'org.springframework.boot:spring-boot-starter' | ||
implementation 'org.springframework.kafka:spring-kafka' | ||
developmentOnly 'org.springframework.boot:spring-boot-docker-compose' | ||
|
||
testImplementation 'org.springframework.boot:spring-boot-starter-test' | ||
testImplementation 'org.springframework.kafka:spring-kafka-test' | ||
testImplementation 'org.springframework.boot:spring-boot-testcontainers' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need all of these test dependencies if we don't have any tests in this project? |
||
} | ||
|
||
tasks.named('test') { | ||
useJUnitPlatform() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
version: '3' | ||
services: | ||
broker: | ||
image: bitnami/kafka:3.7.0 | ||
hostname: broker | ||
container_name: broker | ||
ports: | ||
- "9092:9092" | ||
- "10000:9094" | ||
environment: | ||
# Kraft Settings | ||
KAFKA_CFG_NODE_ID: 0 | ||
KAFKA_KRAFT_CLUSTER_ID: HsDBs9l6UUmQq7Y5E6bNlw | ||
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093 | ||
KAFKA_CFG_PROCESS_ROLES: controller,broker | ||
|
||
# Listeners | ||
KAFKA_CFG_LISTENERS: INTERNAL://broker:29092, PLAINTEXT://0.0.0.0:9092, EXTERNAL://:9094, CONTROLLER://:9093 | ||
KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://broker:29092, PLAINTEXT://broker:9092, EXTERNAL://127.0.0.1:10000 | ||
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT | ||
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER | ||
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true" | ||
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL | ||
KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 | ||
|
||
# Clustering | ||
KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 1 | ||
KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 | ||
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 | ||
KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: 1 | ||
|
||
# KIP-848 | ||
KAFKA_CFG_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: "classic,consumer" | ||
KAFKA_CFG_TRANSACTION_PARTITION_VERIFICATION_ENABLE: "false" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
distributionBase=GRADLE_USER_HOME | ||
distributionPath=wrapper/dists | ||
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip | ||
networkTimeout=10000 | ||
validateDistributionUrl=true | ||
zipStoreBase=GRADLE_USER_HOME | ||
zipStorePath=wrapper/dists |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
demonstrates.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spring for Apache Kafka.