Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-3145 : Add test for next generation consumer-group rebalance protocol #3237

Merged
merged 16 commits into from
May 20, 2024

Conversation

chickenchickenlove
Copy link
Contributor

@chickenchickenlove chickenchickenlove commented May 6, 2024

Motivation:

  • Add test for next generation consumer-group rebalance protocol #3145
  • Currently, i believe that we cannot adequately test the new Consumer Rebalancing Protocol using EmbeddedKafka. because we cannot set controller.quorum.voters. this is because controller.quorum.voters are hard-coded as 0.0.0.0:0 and new consumer rebalancing protocol support KRaft only. so, we cannot test new consumer Rebalancing protocol by using EmbeddedKafka.
  • We can use TestContainers to test it. however, it is not appropriate to add a new dependency to spring-kafka just for that test. thus, i would like to add only a sample code for writing a Test Case using TestContainer.

Modifications

  • Add sample-07 to /samples/ to test new consumer rebalancing protocol by using testContainers.

To Reviewer

  1. new consumer protocol does not support consumer.enforceRebalance().
  2. new consumer protocol does not support Zookeeper mode. (It's not explicitly mentioned. but you can check it from this issue : https://issues.apache.org/jira/browse/KAFKA-16657)
  3. I guess that new consumer protocol has tiny concurrent issue. (I reported it via https://issues.apache.org/jira/browse/KAFKA-16670)

In case of 3, my work around is using dummy consumer to make broker load metadata from __consumer_offsets.

			Consumer<Object, Object> dummyConsumer = new KafkaConsumer<>(props);
			Thread.sleep(5000);
			dummyConsumer.close();

Test Scenario

  1. spring-kafka consumer subscribe hello-topic. (1st rebalancing occurs)
  2. rawKafkaConsumer subscribe hello-topic. (2nd rebalancing occurs. this is because new consumer rebalancing protocol does not support consumer.enforceRebalance())
  3. rawKafkaConsumer is closed. (3rd rebalancing occurs).

Result:

Releated

@sobychacko
Copy link
Contributor

Thanks, @chickenchickenlove, for this PR. We will take a look at this today.

@sobychacko
Copy link
Contributor

@chickenchickenlove Sorry about the delay in reviewing. I took a look. Here are some general comments.

  1. Although it is a sample, adding the author name, copyrights, and checkstyle-related formatting is still recommended.
  2. Please rename the test with more descriptive names.

I was able to follow your sample, but could you please clarify the connection of Consumer#enforceRebalance in your comments above?

Thanks!

@chickenchickenlove
Copy link
Contributor Author

chickenchickenlove commented May 9, 2024

@sobychacko thanks for your comments 🙇‍♂️

Although it is a sample, adding the author name, copyrights, and checkstyle-related formatting is still recommended.
Please rename the test with more descriptive names.

I make new commit to apply your review.
There is no checkStyle rule for /samples/, so i did copy NewConsumerRebalancProtocol under spring-kafka/src/test/.../ and execute gradle :spring-kafka:checkStyleTest, and no error occurs.
I renamed test function as well!

I was able to follow your sample, but could you please clarify the connection of Consumer#enforceRebalance in your comments above?

Sorry to make you confused.
My intention is as follows:

  1. Since consumer.enforceRebalance() is not supported, I cannot trigger consumer rebalancing using the consumer API.
  2. I thought that adding a new Consumer would be the simplest way to trigger consumer rebalancing, so I added a rawConsumer.

At first, i made new test codes by referring to other test codes that use consumer.enfroceRebalance(). (link)
At that time, i think that it is good way to use consumer.enforceRebalance() to trigger consumer rebalancing without other consumer.

When you have free time, Please take another look 🙇‍♂️

testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'

testImplementation 'org.testcontainers:kafka:1.19.7'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Testcontainers are managed dependecy by Spring Boot: https://github.com/spring-projects/spring-boot/blob/main/spring-boot-project/spring-boot-dependencies/build.gradle#L2119.

So, we don't need the version here.

testImplementation 'org.springframework.kafka:spring-kafka-test'

testImplementation 'org.testcontainers:kafka:1.19.7'
implementation 'org.apache.kafka:kafka-clients:3.7.0'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this explicitly since we should rely on the transitive dependency from the org.springframework.kafka:spring-kafka?
Well, I guess we have to make this sample fully based on Spring Boot 3.3.
Even if it is going to be on SNAPSHOT until GA release in a couple week, it is OK to have it in this sample.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@artembilan thanks for your comments.
I use the springframework.boot:3.3.0-SNAPSHOT.
does it make sense to you? I worry about misunderstanding of your opinion.

plugins {
id 'java'
id 'org.springframework.boot' version '3.2.5'
id 'io.spring.dependency-management' version '1.1.4'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already 1.1.15

}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the version has to be exactly the same what is Spring Kafka project, so 3.2.0-SNAPSHOT

String portFormat = String.format("%d:%d/%s", 12000, 9094, InternetProtocol.TCP.toDockerNotation());
final List<String> portsBinding = Collections.singletonList(portFormat);
broker.setPortBindings(portsBinding);
broker.start();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, look into Spring Boot Testcontainers Support: https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#features.testcontainers.

I would expect that community would like to see some convenient high-level API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@artembilan , sorry to say that it is impossible to use high-level API like @ServiceConnection currently.

  1. apache/kafka support server side consumer rebalancing protocol from apache/kafka:3.7.0
  2. testContainers support cp/conflunetic-kafka only.
  3. There is no cp/confluent-kafka version compatible with apache/kafka:3.7.0. (See more, please click link)
  4. The low-level code in KafkaContainer is coupled with a lot of Confluent stuff, including the name of the vars (Kafka Testcontainer should use bitnami/kafka image, or at least allow it as compatible testcontainers/testcontainers-java#8107 (comment))

Considering the above four points, I am unable to use KafkaContainer.
Thus i should use GenericContainer instead of KafkaContainer.
The @ServiceConnection you recommended, depends on KafkaContainer.

As you can see this,
There is no ConnectionDetails instance for GenericContainer.
image

This is why i cannot use high-level API of testContainers like @ServiceConnection.
I used @TestContainers to the best of your advice. If you have a better way, please let me know. 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the docs of test-containers:kafka, they support apache/kafka with org.testcontainers.kafka.KafkaContainer. (link)

However, there is no org.testcontainers.kafka.KafkaContainer and maintainer said there is mismatch between the docs and the implementation (link).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you clarify? In the above comments, you say that - testContainers support cp/conflunetic-kafka only. and then In the docs of test-containers:kafka, they support apache/kafka with org.testcontainers.kafka.KafkaContainer. . Which one do they support? Incidentally, We recently switched to apache/kafka from the old wurstmeister/kafka for some AOT testing projects we have.

Copy link
Contributor Author

@chickenchickenlove chickenchickenlove May 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sobychacko

Docs says

  • org.testcontainers.containers.KafkaContainer supports confluentinc/cp-kafka.
  • org.testcontainers.kafka.KafkaContainer supports apache/kafka.

However, there is no org.testcontainers.kafka.KafkaContainer. (Please, see image below, testContainers.kafka:1.19.7)
image

So I looked it up. I was able to find this issue on testcontainers' github.
It's kind of well-known bug. i think it means some testContainers version cannot use org.testcontainers.kafka.KafkaContainer. (testcontainers/testcontainers-java#8576 (comment))

By the way, it seems to be fixed (1.19.8, deployed may 9).
image

When I wrote this PR, the latest version was 1.19.7. sorry to make you confused. 🙇‍♂️

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the clarification!


GenericContainer setUpBroker() {
final DockerImageName imageName = DockerImageName.parse(KAFKA_IMAGE_NAME);
final GenericContainer broker = new GenericContainer(imageName);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we use org.testcontainers:kafka then if we don't rely on its specific API?

// https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol
// https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes
broker.addEnv("KAFKA_CFG_GROUP_COORDINATOR_REBALANCE_PROTOCOLS", "classic,consumer");
broker.addEnv("KAFKA_CFG_TRANSACTION_PARTITION_VERIFICATION_ENABLE", "false");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cannot all of these env vars be extracted into a properties files?
And then we can use broker.getEnvMap().putAll()

import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class Sample07ApplicationTests {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we still need this class?
Or why don't do the logic in your new class exactly in this one?
So, we won't need to fight for naming 😄

}

@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need beans from here and above since they have to be auto-configured by Spring Boot.
When you switch to the @ServiceConnection on the container bean definition, everything will be picked up by auto-configuration.

@artembilan
Copy link
Member

@chickenchickenlove ,

Since consumer.enforceRebalance() is not supported, I cannot trigger consumer rebalancing using the consumer API.

I'm not sure in your statement.
See MessageListenerContainer:

	/**
	 * Alerting the consumer to trigger an enforced rebalance. The actual enforce will happen
	 * when the next poll() operation is invoked.
	 * @since 3.1.2
	 * @see org.apache.kafka.clients.consumer.KafkaConsumer#enforceRebalance()
	 */
	default void enforceRebalance() {

@sobychacko
Copy link
Contributor

I think what he means is that the new consumer (non-classic) does not support explicitly triggering enforceRebalance.

@artembilan
Copy link
Member

Ah! I see now in the AsyncKafkaConsumer:

    public void enforceRebalance(String reason) {
        log.warn("Operation not supported in new consumer group protocol");
    }

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some README.adoc required for this new sample to explain what is going on and why.


tasks.withType(Checkstyle) {
checkstyle {
configDirectory.set(rootProject.file("src/checkstyle"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need any Checkstyle validation in samples.
The user may take it as it is and reformat for their own needs.

}

@Bean
KafkaTemplate<Integer, String> template(ProducerFactory<Integer, String> producerFactory) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this bean. The auto-configuration does exactly the same.


@Bean
ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(BROKER));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that BROKER can be set from application.properties and auto-configuration will take care for us to build this ProducerFactory bean.

Consumer<Object, Object> dummyConsumer = new KafkaConsumer<>(propsDummy);

Thread.sleep(5000);
dummyConsumer.close();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a mix of concerns.
Why cannot we just rely on the auto-configuration for the ConsumerFactory and then have such a dummy consumer created from the ApplicationRunner?

Copy link
Contributor Author

@chickenchickenlove chickenchickenlove May 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that new consumer protocol has tiny concurrent issue. (I reported it via https://issues.apache.org/jira/browse/KAFKA-16670)
In case of 3, my work around is using dummy consumer to make broker load metadata from __consumer_offsets.

I don't think so. it is intended. 😄
As i said in PR description, this is work around to solve tiny issue.

  1. Right after the broker starts, broker seems not load metadata from __consumer_offsets. Once the metadata is loaded, the broker can function as a normal coordinator.
  2. When a consumer is created, consumer sends a findCoordinator request to the bootstrap server.
  3. Upon receiving findCoordinator request, the broker schedules a metadata load from __consumer_offsets. However, at this point, since the broker has not loaded the metadata yet.

and then, consumer.subscribe() and consumer.poll() will be executed.
If broker does not load metadata yet, consumer received response of first consumer.poll() and Response said this coordinator is invalid.
And then, consumer try to find valid coordinator, but stuck in that status forever.

This is problem which what i said.
To solve this problem, I create dummyConsumer to make broker schedule loading metadata. (and dummyConsumer will be closed after sleeping 5 sec, because it's only needed to trigger schedule).

This is why i make other consumer without ConsumerFactory! 🤔
What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And then, consumertry to find validcoordinator`, but stuck in that status forever.

If that is the problem of Apache Kafka by itself for this new algorithm, how about to postpone such a sample until it is solved?
I wonder how end-user are supposed to solve this problem in the applications?
If you still insist having this sample, how about to rework it the way end-user would implement it in their own applications?
No one is going to do test stuff in production.
The sample purpose is to get it is and have your own application up-and-running.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Um... I see!
I will try to find common way.
If i fail to find a good common way, it might be better to close this PR. 🤔

}

@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this bean. Spring Boot auto-configures for us one: TaskExecutionAutoConfiguration.
If we need some custom its properties, they can be set from the application.properties: spring.task.execution prefix.

}

@Bean
public GenericContainer<?> brokerContainer() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this suppose to be a bean.
See @Container and read its Javadocs how Testcontainers manage their lifecycle.

Copy link
Contributor Author

@chickenchickenlove chickenchickenlove May 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to do @Container
However, there is a major reason why i cannot use it.

  1. As i commented on this, broker should load metadata before consumer.subscribe() is executed.
  2. If using @Container, broker container will be started successfully. however, there is no hooking point to execute dummyConsumer to make broker schedule load to metadata before consumer which are managed by spring-kafka execute subscribe().

We should make broker schedule to load metadata before consumer which is managed by spring-kafka are registered as spring bean. However, AFAIK, there is no annotation to define the execution order at the time of registering Spring beans on @Configuration.

I have this difficulty. Do you have good idea? 🙇‍♂️

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah... That's too much involved.
I would give up already making this working 😄 .

As I said in other my comment, it might be a time to postpone such a sample until Apache Kafka has that coordinator fixed. 😢

@sobychacko , WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you said, it seems to be better to postpone 😂😂.
Let me check that there is common way as last.
Because i'm not professional to kafka and still learning about it.
Thus some mis-understanding can be existed. so, i will take another look to check mis-understanding made by me as last 😭

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, @artembilan
I found root cause of kafka, so tried to create a new PR on apache/kafka.
However it seems to have already been fixed by this PR (apache/kafka#15698)

The problem was caused by not retrying the error response (as opposed to the log say).
However, that PR is not included in apache/kafka:3.7.0. 😓

Thus, we should postpone this PR.
May I leave this PR in an open status and postpone it?

Thanks in advance!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We can keep it opened until the next Apache Kafka 2.7.1.
Thank you!

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These all could go to the application.properties

Copy link
Contributor Author

@chickenchickenlove chickenchickenlove May 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not for consumer managed by spring-kafka.

I make two consumers which not are managed by spring-kafka.
One is rawConsumer for triggering consumer rebalance.
The other one is dummyConsumer for making broker scheduling to load metadata. (#3237 (comment))

Thus, we don't need to application.properties.
I think application.properties is for the consumer managed by spring-kafka, right? (for example, spring.kafka.admin.client-id)
If you want to migrate this to properties, how about making dummy-consumer.properties and use it?

	static Map<String, String> getPropertiesFromFile() {
		final Resource resource = new ClassPathResource(BROKER_PROPERTIES_FILE_PATH);
		try {
			final Properties properties = PropertiesLoaderUtils.loadAllProperties(resource.getFilename());
			return properties.stringPropertyNames().stream()
							.collect(Collectors.toMap(
										s -> s,
										s -> (String) properties.get(s)));
		}
		catch (IOException e) {
			throw new RuntimeException(e);
		}
	}

What do you think? Please let me know your opinion 🙇‍♂️

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, i missed one.
I will move the settings for consumer which are managed by spring-kafka to application.properties. However, it seems necessary to leave the settings for dummyConsumer and rawConsumer as they are.

What do you think?


factory.setConsumerFactory(consumerFactory);

factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ConsumerAwareRebalanceListener could be just a bean and Spring Boot takes care about it to be injected into the auto-configured kafkaListenerContainerFactory.

public class Sample07Application {

public static void main(String[] args) {
SpringApplication.run(Sample07Application.class, args);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking into this from high level, I wonder what we are trying to achieve with this sample.
Where are real consumers (@KafkaListener method) for end-user purpose.
Having the test is OK, but it does not give end-users too much value since this sample really does not demonstrate (yet) how really develop an application with that new consumer group management algorithm.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR in the test configuration currently has a KafkaListener where the underlying container factory uses a consumer factory where the new consumer group protocol is used. Since this feature is in its early access form (and not recommended for production usage yet), it might still be beneficial for any users who want to play around with this in a test format. In other words, we don't expect any users to use it in a real application yet until Apache Kafka makes that recommendation, but users might be interested in trying this out via Spring Kafka, and this sample could help with that — just my 2 cents.

@artembilan
Copy link
Member

Looks like there is a feature in Spring Boot called Docker Compose: https://docs.spring.io/spring-boot/reference/features/dev-services.html#features.dev-services.docker-compose.

Let's see if that might work better for this our sample (instead of Testcontainers) since we indeed must not talk about fully concentrate on the test, but rather demonstration how to use new consumer group management.

@chickenchickenlove
Copy link
Contributor Author

@artembilan
Thanks for your comments and your times. 🙇‍♂️
I want to sync my understanding and your comments.

My Understanding

  1. Replace testContainers to docker-compose with spring-boot.
  2. No need to test New Consumer Rebalance in detail.

Direction

  • Just make docker-compose.yaml and use it with spring-boot docker-compose.
  • Simplify test codes. because we don't need to check each consumer rebalance.
    Thus, it seems sufficient to retain only the logic where the Producer supplies one message, and the Consumer checks if it can poll that message.

If i misunderstood about your intention, please let me know and give more detail when you have free time 🙇‍♂️

@artembilan
Copy link
Member

Thus, it seems sufficient to retain only the logic where the Producer supplies one message, and the Consumer checks if it can poll that message.

Sounds like a plan.

We need to have our consumer in a new consumer group algorithm and that's it.
Perhaps cause some rebalance adding a consumer on the fly.
Kinda cover all the functionality suggested by this new algorithm.

@chickenchickenlove
Copy link
Contributor Author

Hi, @artembilan, @sobychacko ! I created new commit to apply your comments.
I want to left some comments for new commit. 😃

  1. I cannot config Consumer via application.yaml. this is because spring-kafka does not support spring.kafka.consumer.group-protocol. thus, i have to config Consumer by manually.
  2. Anyway, this sample codes will not work properly now. this is because Consumer's problem which i said before. we should wait for kafka to be updated to > 3.7.0(KAFKA-16528: Client HB timing fix apache/kafka#15698). Also, spring-kafka should have it as well.

I realized that there is new issue for spring-kafka or spring-auto-configure to support spring.kafka.consumer.group-protocol.

When you have some free time, please take another look 🙇‍♂️.
If this sample is okay, I will wait until kafka and spring-kafka are updated as well.
If there's anything that needs to be fixed before kafka and spring-kafka being updated, please let me know, and I'll address it immediately.

Thanks in advance! 🙇‍♂️

@sobychacko
Copy link
Contributor

sobychacko commented May 17, 2024

@chickenchickenlove Good point, created this issue: #3254. This is a groundwork for possibly including this property in Boot auto-config eventually.

@sobychacko
Copy link
Contributor

The sample clean up looks good. I think it might be beneficial to have a README explaining what this sample is all about. I thought you had one before, but not seeing in the latest changes?

Copy link
Contributor

@sobychacko sobychacko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comments about the README.

@@ -0,0 +1,50 @@
== Sample 7

This sample demonstrated the application of the new consumer rebalance protocol in Spring Kafka.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

demonstrates.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spring for Apache Kafka.


This sample demonstrated the application of the new consumer rebalance protocol in Spring Kafka.

The new consumer rebalance protocol refers to the Server Side rebalance protocol proposed in KIP-848.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Link to KIP-848.

Copy link
Contributor

@sobychacko sobychacko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the inline comments in README.

Copy link
Contributor

@sobychacko sobychacko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chickenchickenlove We can use the property like below even today:
spring.kafka.consumer.properties[group.protocol]=CONSUMER.

@chickenchickenlove
Copy link
Contributor Author

@sobychacko thanks for your quick comments 👍
I make new commit to apply your comments.

Two points seem to work well as you recommended. (Even if new consumer rebalancing protocol)

image
  • spring-autoconfigure put group.protocol=consumer to ConsumerProperties as expected.
image
  • Consumer rebalancing have been completed successfully via new rebalancing protocol.
  • The first attempt fails. This is due to the reasons discussed earlier.
  • Every attempt after the second always succeeds.

When you have free time, please take another look! 🙇‍♂️

@sobychacko
Copy link
Contributor

Sounds good. We will take a final look through the sample and merge it.

@sobychacko sobychacko merged commit 00fd11b into spring-projects:main May 20, 2024
3 checks passed
* @since 3.3
*/

@EnableKafka
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this annotation since we rely on Spring Boot.

kafka:
consumer:
bootstrap-servers: localhost:10000
group-id: sample07
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you use some other groups in your @KafkaListener methods.
Why do we still need it here?


testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testImplementation 'org.springframework.boot:spring-boot-testcontainers'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need all of these test dependencies if we don't have any tests in this project?
And that raises the question: why we don't have any tests in this project?

@artembilan
Copy link
Member

I see PR has been merged.
Not a bit deal: we always can have follow up PRs to improve the stuff.
For example, we have merged it into the current 3.2, but I see @since 3.3 🤷

@sobychacko
Copy link
Contributor

@chickenchickenlove Thanks for the PR with this sample. We went back and forth a lot, but thanks for being patient during the review process. We are glad that this sample is in.

@artembilan Sorry, I noticed your latest review only after merging. I will follow up with the cleanup.

@chickenchickenlove
Copy link
Contributor Author

Hi, @artembilan !
I'm sorry for the inconvenience caused by the unclear code.
You have taught me a lot (But i am still lacking 😅 ). Would it be okay to write a few more improvement PR later?

@sobychacko
You're welcome. I'm actually glad to have the opportunity to write such PR!
Rather, I am grateful to @sobychacko and @artembilan for thoroughly checking my inadequate PRs and being patient with me as well. Thanks again!!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add test for next generation consumer-group rebalance protocol
3 participants