From bac238054ba206a605937e787ac7cc0432f76911 Mon Sep 17 00:00:00 2001 From: chickenchickenlove Date: Sat, 18 May 2024 08:37:16 +0900 Subject: [PATCH] apply comments --- samples/sample-07/README.adoc | 16 ++--- .../example/sample07/KafkaConsumerConfig.java | 66 ------------------- .../example/sample07/Sample07Application.java | 2 + .../sample07/Sample07KafkaListener.java | 4 +- .../src/main/resources/application.yaml | 8 +++ 5 files changed, 20 insertions(+), 76 deletions(-) delete mode 100644 samples/sample-07/src/main/java/com/example/sample07/KafkaConsumerConfig.java diff --git a/samples/sample-07/README.adoc b/samples/sample-07/README.adoc index 91f14c1f18..11bfecb6aa 100644 --- a/samples/sample-07/README.adoc +++ b/samples/sample-07/README.adoc @@ -19,14 +19,14 @@ services: The config of `group.protocol = conumser` should be added to `Consumer` configuration to apply new consumer rebalance protocol. -```java - @Bean - public ConsumerFactory consumerFactory() { - final Map props = new HashMap<>(); - ... - props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"); - ... - } +The `group.protocol` can be configured in the `resources/application.yaml` as follows: + +```yaml +spring: + kafka: + consumer: + properties: + group.protocol: consumer ``` Next, the `Consumer` created by `@KafkaListener` will request a subscription to the `test-topic` from the `Broker`. diff --git a/samples/sample-07/src/main/java/com/example/sample07/KafkaConsumerConfig.java b/samples/sample-07/src/main/java/com/example/sample07/KafkaConsumerConfig.java deleted file mode 100644 index 2e5c4097c6..0000000000 --- a/samples/sample-07/src/main/java/com/example/sample07/KafkaConsumerConfig.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2022-2024 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.example.sample07; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.annotation.EnableKafka; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; - -/** - * New consumer rebalance protocol sample which purpose is only to demonstrate the application - * of the New Consumer Rebalance Protocol in Spring Kafka. - * The consumer configuration should be created by manually instead of using application.properties or - * application.yaml. Because spring-kafka does not support group.protocol via application.properties, yet. - * - * @author Sanghyeok An. - * - * @since 3.3 - */ - -@EnableKafka -@Configuration -public class KafkaConsumerConfig { - - @Bean - public ConsumerFactory consumerFactory() { - final Map props = new HashMap<>(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:10000"); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample07"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"); - return new DefaultKafkaConsumerFactory<>(props); - } - - @Bean - public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { - final ConcurrentKafkaListenerContainerFactory factory = - new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory()); - return factory; - } - -} diff --git a/samples/sample-07/src/main/java/com/example/sample07/Sample07Application.java b/samples/sample-07/src/main/java/com/example/sample07/Sample07Application.java index 23d24aec74..d40aaf050c 100644 --- a/samples/sample-07/src/main/java/com/example/sample07/Sample07Application.java +++ b/samples/sample-07/src/main/java/com/example/sample07/Sample07Application.java @@ -18,6 +18,7 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.kafka.annotation.EnableKafka; /** * New consumer rebalance protocol sample which purpose is only to demonstrate the application @@ -28,6 +29,7 @@ * @since 3.3 */ +@EnableKafka @SpringBootApplication public class Sample07Application { diff --git a/samples/sample-07/src/main/java/com/example/sample07/Sample07KafkaListener.java b/samples/sample-07/src/main/java/com/example/sample07/Sample07KafkaListener.java index 4e2ed8a1ba..dbf4c95c29 100644 --- a/samples/sample-07/src/main/java/com/example/sample07/Sample07KafkaListener.java +++ b/samples/sample-07/src/main/java/com/example/sample07/Sample07KafkaListener.java @@ -22,8 +22,8 @@ /** * New consumer rebalance protocol sample which purpose is only to demonstrate the application * of the New Consumer Rebalance Protocol in Spring Kafka. - * Each consumer subscribe test-topic with different group id. then, new consumer rebalance protocol - * will be completed successfully. + * Each consumer will subscribe test-topic with different group id. + * Then, new consumer rebalance protocol will be completed successfully. * * @author Sanghyeok An. * diff --git a/samples/sample-07/src/main/resources/application.yaml b/samples/sample-07/src/main/resources/application.yaml index 570664d178..8e0f91b92a 100644 --- a/samples/sample-07/src/main/resources/application.yaml +++ b/samples/sample-07/src/main/resources/application.yaml @@ -7,3 +7,11 @@ spring: stop: command: down timeout: 10s + kafka: + consumer: + bootstrap-servers: localhost:10000 + group-id: sample07 + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + properties: + group.protocol: consumer