-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-3145 : Add test for next generation consumer-group rebalance protocol #3237
Merged
sobychacko
merged 16 commits into
spring-projects:main
from
chickenchickenlove:kafka-test
May 20, 2024
Merged
Changes from all commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
37f8eeb
Add sample 06
chickenchickenlove 09fc7b1
clean up dependencies.
chickenchickenlove d540601
resolve conflict
chickenchickenlove a9df402
resolve conflict
chickenchickenlove 8e8113c
resolve conflict
chickenchickenlove e89c6d9
apply review
chickenchickenlove 3e85fb3
apply review
chickenchickenlove 489c422
remove useless dependencies
chickenchickenlove 2e65068
apply review
chickenchickenlove b4b28f8
modify miss typo
chickenchickenlove b71624f
remove check-styel
chickenchickenlove 8fe0ce6
Add README for sample 07
chickenchickenlove 6b2a74f
Merge branch 'main' into kafka-test
chickenchickenlove edcb826
use upper case
chickenchickenlove d21e4c3
Apply review
chickenchickenlove bac2380
apply comments
chickenchickenlove File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
HELP.md | ||
.gradle | ||
build/ | ||
!gradle/wrapper/gradle-wrapper.jar | ||
!**/src/main/**/build/ | ||
!**/src/test/**/build/ | ||
|
||
### STS ### | ||
.apt_generated | ||
.classpath | ||
.factorypath | ||
.project | ||
.settings | ||
.springBeans | ||
.sts4-cache | ||
bin/ | ||
!**/src/main/**/bin/ | ||
!**/src/test/**/bin/ | ||
|
||
### IntelliJ IDEA ### | ||
.idea | ||
*.iws | ||
*.iml | ||
*.ipr | ||
out/ | ||
!**/src/main/**/out/ | ||
!**/src/test/**/out/ | ||
|
||
### NetBeans ### | ||
/nbproject/private/ | ||
/nbbuild/ | ||
/dist/ | ||
/nbdist/ | ||
/.nb-gradle/ | ||
|
||
### VS Code ### | ||
.vscode/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
== Sample 7 | ||
|
||
This sample demonstrates the application of the new consumer rebalance protocol in Spring for Apache Kafka. | ||
|
||
The new consumer rebalance protocol refers to the Server Side rebalance protocol proposed in link:https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol[KIP-848]. | ||
|
||
`Spring Boot` starts the `Kafka Broker` container defined in the `compose.yaml` file upon startup. | ||
|
||
```yaml | ||
version: '3' | ||
services: | ||
broker: | ||
image: bitnami/kafka:3.7.0 | ||
... | ||
# KIP-848 | ||
KAFKA_CFG_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: "classic,consumer" | ||
KAFKA_CFG_TRANSACTION_PARTITION_VERIFICATION_ENABLE: "false" | ||
``` | ||
|
||
The config of `group.protocol = conumser` should be added to `Consumer` configuration to apply new consumer rebalance protocol. | ||
|
||
The `group.protocol` can be configured in the `resources/application.yaml` as follows: | ||
|
||
```yaml | ||
spring: | ||
kafka: | ||
consumer: | ||
properties: | ||
group.protocol: consumer | ||
``` | ||
|
||
Next, the `Consumer` created by `@KafkaListener` will request a subscription to the `test-topic` from the `Broker`. | ||
|
||
The `Broker` will then send the Topic Partition Assign information to the `Consumer`. This means that the `Consumer` rebalancing has finished, and the `Consumer` has started to poll messages. | ||
|
||
```java | ||
@Component | ||
public class Sample07KafkaListener { | ||
|
||
@KafkaListener(topics = "test-topic", groupId = "sample07-1") | ||
public void listenWithGroup1(String message) { | ||
System.out.println("Received message at group sample07-1: " + message); | ||
} | ||
|
||
@KafkaListener(topics = "test-topic", groupId = "sample07-2") | ||
public void listenWithGroup2(String message) { | ||
System.out.println("Received message at group sample07-2: " + message); | ||
} | ||
} | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
plugins { | ||
id 'java' | ||
id 'org.springframework.boot' version '3.3.0-SNAPSHOT' | ||
id 'io.spring.dependency-management' version '1.1.5' | ||
} | ||
|
||
group = 'com.example' | ||
version = '3.2.0-SNAPSHOT' | ||
|
||
java { | ||
sourceCompatibility = '17' | ||
} | ||
|
||
repositories { | ||
mavenCentral() | ||
maven { url 'https://repo.spring.io/milestone' } | ||
maven { url 'https://repo.spring.io/snapshot' } | ||
} | ||
|
||
dependencies { | ||
implementation 'org.springframework.boot:spring-boot-starter' | ||
implementation 'org.springframework.kafka:spring-kafka' | ||
developmentOnly 'org.springframework.boot:spring-boot-docker-compose' | ||
|
||
testImplementation 'org.springframework.boot:spring-boot-starter-test' | ||
testImplementation 'org.springframework.kafka:spring-kafka-test' | ||
testImplementation 'org.springframework.boot:spring-boot-testcontainers' | ||
} | ||
|
||
tasks.named('test') { | ||
useJUnitPlatform() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
version: '3' | ||
services: | ||
broker: | ||
image: bitnami/kafka:3.7.0 | ||
hostname: broker | ||
container_name: broker | ||
ports: | ||
- "9092:9092" | ||
- "10000:9094" | ||
environment: | ||
# Kraft Settings | ||
KAFKA_CFG_NODE_ID: 0 | ||
KAFKA_KRAFT_CLUSTER_ID: HsDBs9l6UUmQq7Y5E6bNlw | ||
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093 | ||
KAFKA_CFG_PROCESS_ROLES: controller,broker | ||
|
||
# Listeners | ||
KAFKA_CFG_LISTENERS: INTERNAL://broker:29092, PLAINTEXT://0.0.0.0:9092, EXTERNAL://:9094, CONTROLLER://:9093 | ||
KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://broker:29092, PLAINTEXT://broker:9092, EXTERNAL://127.0.0.1:10000 | ||
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT | ||
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER | ||
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true" | ||
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL | ||
KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 | ||
|
||
# Clustering | ||
KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 1 | ||
KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 | ||
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 | ||
KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: 1 | ||
|
||
# KIP-848 | ||
KAFKA_CFG_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: "classic,consumer" | ||
KAFKA_CFG_TRANSACTION_PARTITION_VERIFICATION_ENABLE: "false" |
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
distributionBase=GRADLE_USER_HOME | ||
distributionPath=wrapper/dists | ||
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip | ||
networkTimeout=10000 | ||
validateDistributionUrl=true | ||
zipStoreBase=GRADLE_USER_HOME | ||
zipStorePath=wrapper/dists |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need all of these test dependencies if we don't have any tests in this project?
And that raises the question: why we don't have any tests in this project?