-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-3145 : Add test for next generation consumer-group rebalance protocol #3237
GH-3145 : Add test for next generation consumer-group rebalance protocol #3237
Conversation
Thanks, @chickenchickenlove, for this PR. We will take a look at this today. |
@chickenchickenlove Sorry about the delay in reviewing. I took a look. Here are some general comments.
I was able to follow your sample, but could you please clarify the connection of Thanks! |
@sobychacko thanks for your comments 🙇♂️
I make new commit to apply your review.
Sorry to make you confused.
At first, i made new test codes by referring to other test codes that use When you have free time, Please take another look 🙇♂️ |
samples/sample-07/build.gradle
Outdated
testImplementation 'org.springframework.boot:spring-boot-starter-test' | ||
testImplementation 'org.springframework.kafka:spring-kafka-test' | ||
|
||
testImplementation 'org.testcontainers:kafka:1.19.7' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think Testcontainers are managed dependecy by Spring Boot: https://github.com/spring-projects/spring-boot/blob/main/spring-boot-project/spring-boot-dependencies/build.gradle#L2119.
So, we don't need the version here.
samples/sample-07/build.gradle
Outdated
testImplementation 'org.springframework.kafka:spring-kafka-test' | ||
|
||
testImplementation 'org.testcontainers:kafka:1.19.7' | ||
implementation 'org.apache.kafka:kafka-clients:3.7.0' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this explicitly since we should rely on the transitive dependency from the org.springframework.kafka:spring-kafka
?
Well, I guess we have to make this sample fully based on Spring Boot 3.3
.
Even if it is going to be on SNAPSHOT
until GA release in a couple week, it is OK to have it in this sample.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@artembilan thanks for your comments.
I use the springframework.boot:3.3.0-SNAPSHOT
.
does it make sense to you? I worry about misunderstanding of your opinion.
samples/sample-07/build.gradle
Outdated
plugins { | ||
id 'java' | ||
id 'org.springframework.boot' version '3.2.5' | ||
id 'io.spring.dependency-management' version '1.1.4' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is already 1.1.15
samples/sample-07/build.gradle
Outdated
} | ||
|
||
group = 'com.example' | ||
version = '0.0.1-SNAPSHOT' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe the version has to be exactly the same what is Spring Kafka project, so 3.2.0-SNAPSHOT
samples/sample-07/src/main/java/com/example/sample07/Sample07Application.java
Show resolved
Hide resolved
String portFormat = String.format("%d:%d/%s", 12000, 9094, InternetProtocol.TCP.toDockerNotation()); | ||
final List<String> portsBinding = Collections.singletonList(portFormat); | ||
broker.setPortBindings(portsBinding); | ||
broker.start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, look into Spring Boot Testcontainers Support: https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#features.testcontainers.
I would expect that community would like to see some convenient high-level API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@artembilan , sorry to say that it is impossible to use high-level API like @ServiceConnection
currently.
apache/kafka
supportserver side consumer rebalancing protocol
fromapache/kafka:3.7.0
testContainers
supportcp/conflunetic-kafka
only.- There is no
cp/confluent-kafka
version compatible withapache/kafka:3.7.0
. (See more, please click link) - The low-level code in
KafkaContainer
is coupled with a lot of Confluent stuff, including the name of the vars (Kafka Testcontainer should use bitnami/kafka image, or at least allow it as compatible testcontainers/testcontainers-java#8107 (comment))
Considering the above four points, I am unable to use KafkaContainer
.
Thus i should use GenericContainer
instead of KafkaContainer
.
The @ServiceConnection
you recommended, depends on KafkaContainer
.
As you can see this,
There is no ConnectionDetails instance for GenericContainer
.
This is why i cannot use high-level API of testContainers
like @ServiceConnection
.
I used @TestContainers
to the best of your advice. If you have a better way, please let me know. 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you clarify? In the above comments, you say that - testContainers support cp/conflunetic-kafka only.
and then In the docs of test-containers:kafka, they support apache/kafka with org.testcontainers.kafka.KafkaContainer.
. Which one do they support? Incidentally, We recently switched to apache/kafka
from the old wurstmeister/kafka
for some AOT testing projects we have.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Docs says
org.testcontainers.containers.KafkaContainer
supportsconfluentinc/cp-kafka
.org.testcontainers.kafka.KafkaContainer
supportsapache/kafka
.
However, there is no org.testcontainers.kafka.KafkaContainer
. (Please, see image below, testContainers.kafka:1.19.7
)
So I looked it up. I was able to find this issue on testcontainers
' github.
It's kind of well-known bug. i think it means some testContainers
version cannot use org.testcontainers.kafka.KafkaContainer
. (testcontainers/testcontainers-java#8576 (comment))
By the way, it seems to be fixed (1.19.8
, deployed may 9).
When I wrote this PR, the latest version was 1.19.7
. sorry to make you confused. 🙇♂️
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the clarification!
|
||
GenericContainer setUpBroker() { | ||
final DockerImageName imageName = DockerImageName.parse(KAFKA_IMAGE_NAME); | ||
final GenericContainer broker = new GenericContainer(imageName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we use org.testcontainers:kafka
then if we don't rely on its specific API?
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol | ||
// https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes | ||
broker.addEnv("KAFKA_CFG_GROUP_COORDINATOR_REBALANCE_PROTOCOLS", "classic,consumer"); | ||
broker.addEnv("KAFKA_CFG_TRANSACTION_PARTITION_VERIFICATION_ENABLE", "false"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cannot all of these env vars be extracted into a properties files?
And then we can use broker.getEnvMap().putAll()
import org.springframework.boot.test.context.SpringBootTest; | ||
|
||
@SpringBootTest | ||
class Sample07ApplicationTests { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we still need this class?
Or why don't do the logic in your new class exactly in this one?
So, we won't need to fight for naming 😄
} | ||
|
||
@Bean | ||
public ThreadPoolTaskExecutor threadPoolTaskExecutor() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need beans from here and above since they have to be auto-configured by Spring Boot.
When you switch to the @ServiceConnection
on the container bean definition, everything will be picked up by auto-configuration.
I'm not sure in your statement.
|
I think what he means is that the new consumer (non-classic) does not support explicitly triggering |
Ah! I see now in the
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some README.adoc required for this new sample to explain what is going on and why.
samples/sample-07/build.gradle
Outdated
|
||
tasks.withType(Checkstyle) { | ||
checkstyle { | ||
configDirectory.set(rootProject.file("src/checkstyle")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need any Checkstyle validation in samples.
The user may take it as it is and reformat for their own needs.
} | ||
|
||
@Bean | ||
KafkaTemplate<Integer, String> template(ProducerFactory<Integer, String> producerFactory) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need this bean. The auto-configuration does exactly the same.
|
||
@Bean | ||
ProducerFactory<Integer, String> producerFactory() { | ||
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(BROKER)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that BROKER
can be set from application.properties
and auto-configuration will take care for us to build this ProducerFactory
bean.
Consumer<Object, Object> dummyConsumer = new KafkaConsumer<>(propsDummy); | ||
|
||
Thread.sleep(5000); | ||
dummyConsumer.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like a mix of concerns.
Why cannot we just rely on the auto-configuration for the ConsumerFactory
and then have such a dummy consumer created from the ApplicationRunner
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess that new consumer protocol has tiny concurrent issue. (I reported it via https://issues.apache.org/jira/browse/KAFKA-16670)
In case of 3, my work around is using dummy consumer to make broker load metadata from __consumer_offsets.
I don't think so. it is intended. 😄
As i said in PR description, this is work around to solve tiny issue.
- Right after the
broker
starts,broker
seems not load metadata from__consumer_offsets
. Once the metadata is loaded, thebroker
can function as a normal coordinator. - When a
consumer
is created,consumer
sends afindCoordinator
request to thebootstrap server
. - Upon receiving
findCoordinator
request, thebroker
schedules a metadata load from__consumer_offsets
. However, at this point, since thebroker
has not loaded the metadata yet.
and then, consumer.subscribe()
and consumer.poll()
will be executed.
If broker
does not load metadata yet, consumer
received response of first consumer.poll()
and Response
said this coordinator is invalid
.
And then, consumer
try to find valid coordinator
, but stuck in that status forever.
This is problem which what i said.
To solve this problem, I create dummyConsumer
to make broker schedule loading metadata. (and dummyConsumer
will be closed after sleeping 5 sec, because it's only needed to trigger schedule).
This is why i make other consumer
without ConsumerFactory
! 🤔
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And then, consumertry to find validcoordinator`, but stuck in that status forever.
If that is the problem of Apache Kafka by itself for this new algorithm, how about to postpone such a sample until it is solved?
I wonder how end-user are supposed to solve this problem in the applications?
If you still insist having this sample, how about to rework it the way end-user would implement it in their own applications?
No one is going to do test stuff in production.
The sample purpose is to get it is and have your own application up-and-running.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Um... I see!
I will try to find common way.
If i fail to find a good common way, it might be better to close this PR. 🤔
} | ||
|
||
@Bean | ||
public ThreadPoolTaskExecutor threadPoolTaskExecutor() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need this bean. Spring Boot auto-configures for us one: TaskExecutionAutoConfiguration
.
If we need some custom its properties, they can be set from the application.properties
: spring.task.execution
prefix.
} | ||
|
||
@Bean | ||
public GenericContainer<?> brokerContainer() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this suppose to be a bean.
See @Container
and read its Javadocs how Testcontainers manage their lifecycle.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to do @Container
However, there is a major reason why i cannot use it.
- As i commented on this,
broker
should load metadata beforeconsumer.subscribe()
is executed. - If using
@Container
,broker
container will be started successfully. however, there is no hooking point to executedummyConsumer
to makebroker
schedule load tometadata
beforeconsumer
which are managed byspring-kafka
executesubscribe()
.
We should make broker schedule to load metadata before consumer
which is managed by spring-kafka
are registered as spring bean. However, AFAIK, there is no annotation to define the execution order at the time of registering Spring beans
on @Configuration
.
I have this difficulty. Do you have good idea? 🙇♂️
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah... That's too much involved.
I would give up already making this working 😄 .
As I said in other my comment, it might be a time to postpone such a sample until Apache Kafka has that coordinator fixed. 😢
@sobychacko , WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As you said, it seems to be better to postpone 😂😂.
Let me check that there is common way as last.
Because i'm not professional to kafka and still learning about it.
Thus some mis-understanding can be existed. so, i will take another look to check mis-understanding made by me as last 😭
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey, @artembilan
I found root cause of kafka
, so tried to create a new PR on apache/kafka
.
However it seems to have already been fixed by this PR (apache/kafka#15698)
The problem was caused by not retrying the error response (as opposed to the log say).
However, that PR is not included in apache/kafka:3.7.0
. 😓
Thus, we should postpone this PR.
May I leave this PR in an open status and postpone it?
Thanks in advance!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. We can keep it opened until the next Apache Kafka 2.7.1
.
Thank you!
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER); | ||
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID); | ||
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"); | ||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These all could go to the application.properties
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not for consumer managed by spring-kafka
.
I make two consumers
which not are managed by spring-kafka
.
One is rawConsumer
for triggering consumer rebalance
.
The other one is dummyConsumer
for making broker scheduling to load metadata. (#3237 (comment))
Thus, we don't need to application.properties
.
I think application.properties
is for the consumer managed by spring-kafka
, right? (for example, spring.kafka.admin.client-id
)
If you want to migrate this to properties, how about making dummy-consumer.properties
and use it?
static Map<String, String> getPropertiesFromFile() {
final Resource resource = new ClassPathResource(BROKER_PROPERTIES_FILE_PATH);
try {
final Properties properties = PropertiesLoaderUtils.loadAllProperties(resource.getFilename());
return properties.stringPropertyNames().stream()
.collect(Collectors.toMap(
s -> s,
s -> (String) properties.get(s)));
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
What do you think? Please let me know your opinion 🙇♂️
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, i missed one.
I will move the settings for consumer which are managed by spring-kafka
to application.properties
. However, it seems necessary to leave the settings for dummyConsumer
and rawConsumer
as they are.
What do you think?
|
||
factory.setConsumerFactory(consumerFactory); | ||
|
||
factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ConsumerAwareRebalanceListener
could be just a bean and Spring Boot takes care about it to be injected into the auto-configured kafkaListenerContainerFactory
.
public class Sample07Application { | ||
|
||
public static void main(String[] args) { | ||
SpringApplication.run(Sample07Application.class, args); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking into this from high level, I wonder what we are trying to achieve with this sample.
Where are real consumers (@KafkaListener
method) for end-user purpose.
Having the test is OK, but it does not give end-users too much value since this sample really does not demonstrate (yet) how really develop an application with that new consumer group management algorithm.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PR in the test configuration currently has a KafkaListener where the underlying container factory uses a consumer factory where the new consumer group protocol is used. Since this feature is in its early access form (and not recommended for production usage yet), it might still be beneficial for any users who want to play around with this in a test format. In other words, we don't expect any users to use it in a real application yet until Apache Kafka makes that recommendation, but users might be interested in trying this out via Spring Kafka, and this sample could help with that — just my 2 cents.
Looks like there is a feature in Spring Boot called Let's see if that might work better for this our sample (instead of Testcontainers) since we indeed must not talk about fully concentrate on the test, but rather demonstration how to use new consumer group management. |
@artembilan My Understanding
Direction
If i misunderstood about your intention, please let me know and give more detail when you have free time 🙇♂️ |
Sounds like a plan. We need to have our consumer in a new consumer group algorithm and that's it. |
Hi, @artembilan, @sobychacko ! I created new commit to apply your comments.
I realized that there is new issue for When you have some free time, please take another look 🙇♂️. Thanks in advance! 🙇♂️ |
@chickenchickenlove Good point, created this issue: #3254. This is a groundwork for possibly including this property in Boot auto-config eventually. |
The sample clean up looks good. I think it might be beneficial to have a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See my comments about the README.
samples/sample-07/README.adoc
Outdated
@@ -0,0 +1,50 @@ | |||
== Sample 7 | |||
|
|||
This sample demonstrated the application of the new consumer rebalance protocol in Spring Kafka. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
demonstrates.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spring for Apache Kafka.
samples/sample-07/README.adoc
Outdated
|
||
This sample demonstrated the application of the new consumer rebalance protocol in Spring Kafka. | ||
|
||
The new consumer rebalance protocol refers to the Server Side rebalance protocol proposed in KIP-848. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Link to KIP-848.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See the inline comments in README.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chickenchickenlove We can use the property like below even today:
spring.kafka.consumer.properties[group.protocol]=CONSUMER
.
@sobychacko thanks for your quick comments 👍 Two points seem to work well as you recommended. (Even if new consumer rebalancing protocol)
When you have free time, please take another look! 🙇♂️ |
Sounds good. We will take a final look through the sample and merge it. |
* @since 3.3 | ||
*/ | ||
|
||
@EnableKafka |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need this annotation since we rely on Spring Boot.
kafka: | ||
consumer: | ||
bootstrap-servers: localhost:10000 | ||
group-id: sample07 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see you use some other groups in your @KafkaListener
methods.
Why do we still need it here?
|
||
testImplementation 'org.springframework.boot:spring-boot-starter-test' | ||
testImplementation 'org.springframework.kafka:spring-kafka-test' | ||
testImplementation 'org.springframework.boot:spring-boot-testcontainers' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need all of these test dependencies if we don't have any tests in this project?
And that raises the question: why we don't have any tests in this project?
I see PR has been merged. |
@chickenchickenlove Thanks for the PR with this sample. We went back and forth a lot, but thanks for being patient during the review process. We are glad that this sample is in. @artembilan Sorry, I noticed your latest review only after merging. I will follow up with the cleanup. |
Hi, @artembilan ! @sobychacko |
Motivation:
new Consumer Rebalancing Protocol
usingEmbeddedKafka
. because we cannot setcontroller.quorum.voters
. this is becausecontroller.quorum.voters
are hard-coded as0.0.0.0:0
andnew consumer rebalancing protocol
supportKRaft
only. so, we cannot testnew consumer Rebalancing protocol
by usingEmbeddedKafka
.TestContainers
to test it. however, it is not appropriate to add a new dependency tospring-kafka
just for that test. thus, i would like to add only a sample code for writing a Test Case usingTestContainer
.Modifications
sample-07
to/samples/
to testnew consumer rebalancing protocol
by usingtestContainers
.To Reviewer
new consumer protocol
does not supportconsumer.enforceRebalance()
.new consumer protocol
does not supportZookeeper mode
. (It's not explicitly mentioned. but you can check it from this issue : https://issues.apache.org/jira/browse/KAFKA-16657)new consumer protocol
has tiny concurrent issue. (I reported it via https://issues.apache.org/jira/browse/KAFKA-16670)In case of 3, my work around is using dummy
consumer
to make broker load metadata from__consumer_offsets
.Test Scenario
spring-kafka consumer
subscribehello-topic
. (1st rebalancing occurs)rawKafkaConsumer
subscribehello-topic
. (2nd rebalancing occurs. this is becausenew consumer rebalancing protocol
does not supportconsumer.enforceRebalance()
)rawKafkaConsumer
is closed. (3rd rebalancing occurs).Result:
Releated