From 03b3f7c8228f0408472780faa56f0349a0c159c2 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Thu, 11 Jan 2024 15:46:51 -0500 Subject: [PATCH] GH-2978: Propagate number of partitions to KRaft broker Fixes: #2978 If we don't create topics manually, that can be done automatically on the broker side according to its configuration. For that goal the `EmbeddedKafkaKraftBroker` is missing to populate `KafkaConfig.NumPartitionsProp(): "" + this.partitionsPerTopic` broker property from `@EmbeddedKafka` configuration * Propagate `partitionsPerTopic` option down to the embedded broker(s) in the `EmbeddedKafkaKraftBroker` * Some other simple refactoring in the `EmbeddedKafkaKraftBroker` * Verify the option propagated via new unit test in the `KafkaTestUtilsTests.topicAutomaticallyCreatedWithProperNumberOfPartitions()` --- .../kafka/test/EmbeddedKafkaKraftBroker.java | 13 +++--- .../kafka/test/utils/KafkaTestUtilsTests.java | 46 ++++++++++++++----- 2 files changed, 42 insertions(+), 17 deletions(-) diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java index 41d367943c..c414aa2e5f 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -199,7 +199,7 @@ public void setAdminTimeout(int adminTimeout) { public void afterPropertiesSet() { if (this.initialized.compareAndSet(false, true)) { overrideExitMethods(); - addDefaultBrokerPropsIfAbsent(this.brokerProperties, this.count); + addDefaultBrokerPropsIfAbsent(); start(); } } @@ -252,10 +252,11 @@ public void destroy() { this.cluster = null; } - private void addDefaultBrokerPropsIfAbsent(Properties brokerConfig, int numBrokers) { - brokerConfig.putIfAbsent(KafkaConfig.DeleteTopicEnableProp(), "true"); - brokerConfig.putIfAbsent(KafkaConfig.GroupInitialRebalanceDelayMsProp(), "0"); - brokerConfig.putIfAbsent(KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(numBrokers)); + private void addDefaultBrokerPropsIfAbsent() { + this.brokerProperties.putIfAbsent(KafkaConfig.DeleteTopicEnableProp(), "true"); + this.brokerProperties.putIfAbsent(KafkaConfig.GroupInitialRebalanceDelayMsProp(), "0"); + this.brokerProperties.putIfAbsent(KafkaConfig.OffsetsTopicReplicationFactorProp(), "" + this.count); + this.brokerProperties.putIfAbsent(KafkaConfig.NumPartitionsProp(), "" + this.partitionsPerTopic); } private void logDir(Properties brokerConfigProperties) { diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java index 0daf0bcf29..b8f12fde84 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import java.time.Duration; +import java.util.List; import java.util.Map; import org.apache.kafka.clients.admin.AdminClient; @@ -27,9 +28,11 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; import org.junit.jupiter.api.Test; import org.springframework.kafka.test.EmbeddedKafkaBroker; @@ -37,15 +40,16 @@ /** * @author Gary Russell + * @author Artem Bilan * @since 2.2.7 * */ -@EmbeddedKafka(topics = { "singleTopic1", "singleTopic2", "singleTopic3", "singleTopic4", "singleTopic5", - "multiTopic1" }) +@EmbeddedKafka(topics = {"singleTopic1", "singleTopic2", "singleTopic3", "singleTopic4", "singleTopic5", + "multiTopic1"}) public class KafkaTestUtilsTests { @Test - void testGetSingleWithMoreThatOneTopic(EmbeddedKafkaBroker broker) { + void testGetSingleWithMoreThanOneTopic(EmbeddedKafkaBroker broker) { Map producerProps = KafkaTestUtils.producerProps(broker); KafkaProducer producer = new KafkaProducer<>(producerProps); producer.send(new ProducerRecord<>("singleTopic1", 0, 1, "foo")); @@ -64,7 +68,7 @@ void testGetSingleWithMoreThatOneTopic(EmbeddedKafkaBroker broker) { } @Test - void testGetSingleWithMoreThatOneTopicRecordNotThereYet(EmbeddedKafkaBroker broker) { + void testGetSingleWithMoreThanOneTopicRecordNotThereYet(EmbeddedKafkaBroker broker) { Map producerProps = KafkaTestUtils.producerProps(broker); KafkaProducer producer = new KafkaProducer<>(producerProps); producer.send(new ProducerRecord<>("singleTopic4", 0, 1, "foo")); @@ -73,7 +77,7 @@ void testGetSingleWithMoreThatOneTopicRecordNotThereYet(EmbeddedKafkaBroker brok broker.consumeFromEmbeddedTopics(consumer, "singleTopic4", "singleTopic5"); long t1 = System.currentTimeMillis(); assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> - KafkaTestUtils.getSingleRecord(consumer, "singleTopic5", Duration.ofSeconds(2))); + KafkaTestUtils.getSingleRecord(consumer, "singleTopic5", Duration.ofSeconds(2))); assertThat(System.currentTimeMillis() - t1).isGreaterThanOrEqualTo(2000L); producer.send(new ProducerRecord<>("singleTopic5", 1, "foo")); producer.close(); @@ -97,19 +101,19 @@ public void testGetOneRecord(EmbeddedKafkaBroker broker) throws Exception { assertThat(oneRecord.value()).isEqualTo("foo"); assertThat(KafkaTestUtils.getCurrentOffset(broker.getBrokersAsString(), "getOne", "singleTopic3", 0)) .isNotNull() - .extracting(omd -> omd.offset()) + .extracting(OffsetAndMetadata::offset) .isEqualTo(1L); oneRecord = KafkaTestUtils.getOneRecord(broker.getBrokersAsString(), "getOne", "singleTopic3", 0, true, true, Duration.ofSeconds(10)); assertThat(oneRecord.value()).isEqualTo("foo"); assertThat(KafkaTestUtils.getCurrentOffset(broker.getBrokersAsString(), "getOne", "singleTopic3", 0)) .isNotNull() - .extracting(omd -> omd.offset()) + .extracting(OffsetAndMetadata::offset) .isEqualTo(1L); } @Test - public void testMultiMinRecords(EmbeddedKafkaBroker broker) throws Exception { + public void testMultiMinRecords(EmbeddedKafkaBroker broker) { Map producerProps = KafkaTestUtils.producerProps(broker); KafkaProducer producer = new KafkaProducer<>(producerProps); producer.send(new ProducerRecord<>("multiTopic1", 0, 1, "foo")); @@ -135,16 +139,36 @@ public void testMultiMinRecords(EmbeddedKafkaBroker broker) throws Exception { public void testGetCurrentOffsetWithAdminClient(EmbeddedKafkaBroker broker) throws Exception { Map adminClientProps = Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()); Map producerProps = KafkaTestUtils.producerProps(broker); - try (AdminClient adminClient = AdminClient.create(adminClientProps); KafkaProducer producer = new KafkaProducer<>(producerProps)) { + try (var adminClient = AdminClient.create(adminClientProps); var producer = new KafkaProducer<>(producerProps)) { producer.send(new ProducerRecord<>("singleTopic3", 0, 1, "foo")); KafkaTestUtils.getOneRecord(broker.getBrokersAsString(), "testGetCurrentOffsetWithAdminClient", "singleTopic3", 0, false, true, Duration.ofSeconds(10)); assertThat(KafkaTestUtils.getCurrentOffset(adminClient, "testGetCurrentOffsetWithAdminClient", "singleTopic3", 0)) .isNotNull() - .extracting(omd -> omd.offset()) + .extracting(OffsetAndMetadata::offset) .isEqualTo(1L); } + } + + @Test + public void topicAutomaticallyCreatedWithProperNumberOfPartitions(EmbeddedKafkaBroker broker) throws Exception { + Map producerProps = KafkaTestUtils.producerProps(broker); + + Map adminClientProps = + Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()); + try (var adminClient = AdminClient.create(adminClientProps); var producer = new KafkaProducer<>(producerProps)) { + producer.send(new ProducerRecord<>("auto-topic", "test data")).get(); + + List partitions = + adminClient.describeTopics(List.of("auto-topic")) + .allTopicNames() + .get() + .get("auto-topic") + .partitions(); + + assertThat(partitions).hasSize(2); + } }