From f264514ef22d0bd2c8b5171718bf6caa37a0b4b9 Mon Sep 17 00:00:00 2001 From: Raja Kolli Date: Sat, 7 Dec 2024 06:00:58 +0000 Subject: [PATCH 1/5] feat : upgrade to spring boot 3.4 and polish tc config --- .../springbootkafkaavro/model/Person.java | 1 - .../spring-boot-kafka-avro-producer/pom.xml | 13 +-- ...tKafkaAvroProducerApplicationIntTests.java | 3 +- ...pringBootKafkaAvroProducerApplication.java | 104 +----------------- .../containers/KafkaContainersConfig.java | 73 ++++++++++++ .../KafkaRaftWithExtraListenersContainer.java | 61 ---------- 6 files changed, 82 insertions(+), 173 deletions(-) create mode 100644 kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/containers/KafkaContainersConfig.java delete mode 100644 kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/containers/KafkaRaftWithExtraListenersContainer.java diff --git a/kafka-avro/spring-boot-kafka-avro-consumer/generated-avro/com/example/springbootkafkaavro/model/Person.java b/kafka-avro/spring-boot-kafka-avro-consumer/generated-avro/com/example/springbootkafkaavro/model/Person.java index f417ccc6..a449970b 100644 --- a/kafka-avro/spring-boot-kafka-avro-consumer/generated-avro/com/example/springbootkafkaavro/model/Person.java +++ b/kafka-avro/spring-boot-kafka-avro-consumer/generated-avro/com/example/springbootkafkaavro/model/Person.java @@ -5,7 +5,6 @@ */ package com.example.springbootkafkaavro.model; -import org.apache.avro.generic.GenericArray; import org.apache.avro.specific.SpecificData; import org.apache.avro.util.Utf8; import org.apache.avro.message.BinaryMessageEncoder; diff --git a/kafka-avro/spring-boot-kafka-avro-producer/pom.xml b/kafka-avro/spring-boot-kafka-avro-producer/pom.xml index 8203bbd6..35d9f9b2 100644 --- a/kafka-avro/spring-boot-kafka-avro-producer/pom.xml +++ b/kafka-avro/spring-boot-kafka-avro-producer/pom.xml @@ -6,7 +6,7 @@ org.springframework.boot spring-boot-starter-parent - 3.3.6 + 3.4.0 com.example @@ -18,8 +18,8 @@ 21 1.12.0 - 7.8.0 - 2.6.0 + 7.7.2 + 2.7.0 2.43.0 @@ -43,11 +43,6 @@ org.springframework.kafka spring-kafka - - org.glassfish.jaxb - jaxb-runtime - provided - org.springdoc @@ -154,7 +149,7 @@ - 1.19.2 + 1.25.0 diff --git a/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/SpringBootKafkaAvroProducerApplicationIntTests.java b/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/SpringBootKafkaAvroProducerApplicationIntTests.java index 46e3d99f..370a380a 100644 --- a/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/SpringBootKafkaAvroProducerApplicationIntTests.java +++ b/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/SpringBootKafkaAvroProducerApplicationIntTests.java @@ -6,6 +6,7 @@ import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; +import com.example.springbootkafkaavro.containers.KafkaContainersConfig; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.beans.factory.annotation.Autowired; @@ -23,7 +24,7 @@ "spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer", "spring.kafka.properties.specific.avro.reader=true" }, - classes = TestSpringBootKafkaAvroProducerApplication.class) + classes = KafkaContainersConfig.class) @AutoConfigureMockMvc @Import(AvroKafkaListener.class) @ExtendWith(OutputCaptureExtension.class) diff --git a/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/TestSpringBootKafkaAvroProducerApplication.java b/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/TestSpringBootKafkaAvroProducerApplication.java index 201a4bf4..254a5cf3 100644 --- a/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/TestSpringBootKafkaAvroProducerApplication.java +++ b/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/TestSpringBootKafkaAvroProducerApplication.java @@ -1,111 +1,13 @@ package com.example.springbootkafkaavro; -import com.example.springbootkafkaavro.containers.KafkaRaftWithExtraListenersContainer; -import java.time.Duration; -import java.time.temporal.ChronoUnit; -import java.util.List; -import org.junit.runner.Description; -import org.junit.runners.model.Statement; +import com.example.springbootkafkaavro.containers.KafkaContainersConfig; import org.springframework.boot.SpringApplication; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.boot.testcontainers.service.connection.ServiceConnection; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.DependsOn; -import org.springframework.test.context.DynamicPropertyRegistry; -import org.testcontainers.DockerClientFactory; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.wait.strategy.Wait; -@TestConfiguration(proxyBeanMethods = false) -public class TestSpringBootKafkaAvroProducerApplication { - - private static final String KAFKA_NETWORK = "kafka-network"; - - Network network = getNetwork(); - - static Network getNetwork() { - Network defaultDaprNetwork = - new Network() { - @Override - public String getId() { - return KAFKA_NETWORK; - } - - @Override - public void close() {} - - @Override - public Statement apply(Statement base, Description description) { - return null; - } - }; - - List networks = - DockerClientFactory.instance() - .client() - .listNetworksCmd() - .withNameFilter(KAFKA_NETWORK) - .exec(); - if (networks.isEmpty()) { - Network.builder() - .createNetworkCmdModifier(cmd -> cmd.withName(KAFKA_NETWORK)) - .build() - .getId(); - } - return defaultDaprNetwork; - } - - @Bean - @ServiceConnection - KafkaContainer kafkaContainer() { - return new KafkaRaftWithExtraListenersContainer("confluentinc/cp-kafka:7.7.1") - .withAdditionalListener(() -> "kafka:19092") - .withKraft() - .withNetwork(network) - .withNetworkAliases("kafka") - .withReuse(true); - } - - @Bean - @DependsOn("kafkaContainer") - GenericContainer schemaregistry(DynamicPropertyRegistry dynamicPropertyRegistry) { - GenericContainer schemaRegistry = - new GenericContainer<>("confluentinc/cp-schema-registry:7.7.1") - .withExposedPorts(8085) - .withNetworkAliases("schemaregistry") - .withNetwork(network) - .withEnv( - "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", - "PLAINTEXT://kafka:19092") - .withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8085") - .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schemaregistry") - .withEnv("SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "PLAINTEXT") - .waitingFor(Wait.forHttp("/subjects")) - .withStartupTimeout(Duration.of(120, ChronoUnit.SECONDS)) - .withLabel("com.testcontainers.desktop.service", "cp-schema-registry"); - dynamicPropertyRegistry.add( - "spring.kafka.producer.properties.schema.registry.url", - () -> - "http://%s:%d" - .formatted( - schemaRegistry.getHost(), - schemaRegistry.getMappedPort(8085))); - dynamicPropertyRegistry.add( - "spring.kafka.properties.schema.registry.url", - () -> - "http://%s:%d" - .formatted( - schemaRegistry.getHost(), - schemaRegistry.getMappedPort(8085))); - - return schemaRegistry; - } +class TestSpringBootKafkaAvroProducerApplication { public static void main(String[] args) { SpringApplication.from(SpringBootKafkaAvroProducerApplication::main) - .with(TestSpringBootKafkaAvroProducerApplication.class) + .with(KafkaContainersConfig.class) .run(args); } } diff --git a/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/containers/KafkaContainersConfig.java b/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/containers/KafkaContainersConfig.java new file mode 100644 index 00000000..8516cd9a --- /dev/null +++ b/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/containers/KafkaContainersConfig.java @@ -0,0 +1,73 @@ +package com.example.springbootkafkaavro.containers; + +import java.time.Duration; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.testcontainers.service.connection.ServiceConnection; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.DependsOn; +import org.springframework.test.context.DynamicPropertyRegistrar; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.kafka.ConfluentKafkaContainer; +import org.testcontainers.utility.DockerImageName; + +@TestConfiguration(proxyBeanMethods = false) +public class KafkaContainersConfig { + + private final Network network = Network.newNetwork(); + private final String CONFLUENT_VERSION = "7.7.2"; + + @Bean + @ServiceConnection + ConfluentKafkaContainer kafkaContainer() { + ConfluentKafkaContainer confluentKafkaContainer = + new ConfluentKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_VERSION)) + .withListener("tc-kafka:19092") // Internal alias and port + .withNetwork(network) // Shared network for communication + .withNetworkAliases("tc-kafka") // Alias to match Schema Registry + .withReuse(true); + confluentKafkaContainer.start(); + return confluentKafkaContainer; + } + + @Bean + @DependsOn("kafkaContainer") + GenericContainer schemaRegistry() { + GenericContainer schemaRegistry = + new GenericContainer<>(DockerImageName.parse("confluentinc/cp-schema-registry").withTag(CONFLUENT_VERSION)) + .withExposedPorts(8085) + .withNetworkAliases("schemaregistry") // Alias for Schema Registry + .withNetwork(network) // Use the same network as Kafka + .withEnv( + "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", + "PLAINTEXT://tc-kafka:19092") // Match Kafka alias and port + .withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8085") + .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schemaregistry") + .withEnv("SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "PLAINTEXT") + .waitingFor(Wait.forHttp("/subjects").forStatusCode(200)) + .withStartupTimeout(Duration.ofSeconds(60)); + schemaRegistry.start(); + return schemaRegistry; + } + + @Bean + DynamicPropertyRegistrar dynamicPropertyRegistrar(GenericContainer schemaRegistry) { + return dynamicProperty -> { + dynamicProperty.add( + "spring.kafka.producer.properties.schema.registry.url", + () -> + "http://%s:%d" + .formatted( + schemaRegistry.getHost(), + schemaRegistry.getMappedPort(8085))); + dynamicProperty.add( + "spring.kafka.properties.schema.registry.url", + () -> + "http://%s:%d" + .formatted( + schemaRegistry.getHost(), + schemaRegistry.getMappedPort(8085))); + }; + } +} diff --git a/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/containers/KafkaRaftWithExtraListenersContainer.java b/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/containers/KafkaRaftWithExtraListenersContainer.java deleted file mode 100644 index 742a66ca..00000000 --- a/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/containers/KafkaRaftWithExtraListenersContainer.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.example.springbootkafkaavro.containers; - -import com.github.dockerjava.api.command.InspectContainerResponse; -import java.util.ArrayList; -import java.util.List; -import java.util.function.Supplier; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.images.builder.Transferable; -import org.testcontainers.utility.DockerImageName; - -public class KafkaRaftWithExtraListenersContainer extends KafkaContainer { - - private final List> listeners = new ArrayList<>(); - - public KafkaRaftWithExtraListenersContainer(String image) { - super(DockerImageName.parse(image)); - } - - @Override - protected void configure() { - super.configure(); - withEnv( - "KAFKA_LISTENERS", - "%s,%s".formatted("INTERNAL://0.0.0.0:19092", getEnvMap().get("KAFKA_LISTENERS"))); - withEnv( - "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", - "%s,%s" - .formatted( - "INTERNAL:PLAINTEXT", - getEnvMap().get("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"))); - getEnvMap() - .put( - "KAFKA_CONTROLLER_QUORUM_VOTERS", - "%s@%s:9094" - .formatted( - getEnvMap().get("KAFKA_NODE_ID"), - getNetwork() != null - ? listeners.getFirst().get().split(":")[0] - : "localhost")); - } - - @Override - protected void containerIsStarting(InspectContainerResponse containerInfo) { - String command = "#!/bin/bash\n"; - // exporting KAFKA_ADVERTISED_LISTENERS with the container hostname - command += - "export KAFKA_ADVERTISED_LISTENERS=%s,%s,%s\n" - .formatted( - "INTERNAL://%s".formatted(listeners.getFirst().get()), - getBootstrapServers(), - brokerAdvertisedListener(containerInfo)); - - command += "/etc/confluent/docker/run \n"; - copyFileToContainer(Transferable.of(command, 0777), "/testcontainers_start.sh"); - } - - public KafkaRaftWithExtraListenersContainer withAdditionalListener(Supplier listener) { - this.listeners.add(listener); - return this; - } -} From 17ef214ec02d6cfe1f72b77d489c7b2551200fd6 Mon Sep 17 00:00:00 2001 From: Raja Kolli Date: Sat, 7 Dec 2024 06:44:49 +0000 Subject: [PATCH 2/5] feat : improve --- .../spring-boot-kafka-avro-consumer/pom.xml | 18 +-- .../entity/PersonEntity.java | 37 ++++-- .../listener/AvroKafkaListener.java | 13 +- .../util/ApplicationConstants.java | 8 +- .../ApplicationIntTests.java | 40 +++++++ .../springbootkafkaavro/KafkaProducer.java | 6 +- ...BootKafkaAvroConsumerApplicationTests.java | 112 ------------------ ...pringBootKafkaAvroConsumerApplication.java | 13 ++ .../common/KafkaContainersConfig.java | 70 +++++++++++ .../springbootkafkaavro/model/Person.java | 1 - .../containers/KafkaContainersConfig.java | 8 +- 11 files changed, 176 insertions(+), 150 deletions(-) create mode 100644 kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/ApplicationIntTests.java delete mode 100644 kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/SpringBootKafkaAvroConsumerApplicationTests.java create mode 100644 kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/TestSpringBootKafkaAvroConsumerApplication.java create mode 100644 kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/common/KafkaContainersConfig.java diff --git a/kafka-avro/spring-boot-kafka-avro-consumer/pom.xml b/kafka-avro/spring-boot-kafka-avro-consumer/pom.xml index b6299470..1d85c7ed 100644 --- a/kafka-avro/spring-boot-kafka-avro-consumer/pom.xml +++ b/kafka-avro/spring-boot-kafka-avro-consumer/pom.xml @@ -18,7 +18,7 @@ 21 1.12.0 - 7.8.0 + 7.7.2 2.7.0 2.43.0 @@ -66,11 +66,6 @@ h2 runtime - - org.projectlombok - lombok - true - org.apache.avro @@ -89,8 +84,8 @@ test - org.springframework.kafka - spring-kafka-test + org.springframework.boot + spring-boot-testcontainers test @@ -103,11 +98,6 @@ kafka test - - org.awaitility - awaitility - test - @@ -158,7 +148,7 @@ - 1.19.2 + 1.25.0 diff --git a/kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/entity/PersonEntity.java b/kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/entity/PersonEntity.java index 1f4c260f..5a133a23 100644 --- a/kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/entity/PersonEntity.java +++ b/kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/entity/PersonEntity.java @@ -6,17 +6,9 @@ import jakarta.persistence.GenerationType; import jakarta.persistence.Id; import jakarta.persistence.Table; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; @Entity @Table(name = "person_entity") -@Setter -@Getter -@AllArgsConstructor -@NoArgsConstructor public class PersonEntity { @Id @@ -27,4 +19,33 @@ public class PersonEntity { private String name; private Integer age; + + public PersonEntity() {} + + public Integer getAge() { + return age; + } + + public PersonEntity setAge(Integer age) { + this.age = age; + return this; + } + + public Long getId() { + return id; + } + + public PersonEntity setId(Long id) { + this.id = id; + return this; + } + + public String getName() { + return name; + } + + public PersonEntity setName(String name) { + this.name = name; + return this; + } } diff --git a/kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/listener/AvroKafkaListener.java b/kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/listener/AvroKafkaListener.java index d3a8b68d..fcc80355 100644 --- a/kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/listener/AvroKafkaListener.java +++ b/kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/listener/AvroKafkaListener.java @@ -4,25 +4,28 @@ import com.example.springbootkafkaavro.model.Person; import com.example.springbootkafkaavro.repository.PersonRepository; import com.example.springbootkafkaavro.util.ApplicationConstants; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; -@Slf4j @Component -@RequiredArgsConstructor public class AvroKafkaListener { + private static final Logger log = LoggerFactory.getLogger(AvroKafkaListener.class); private final PersonRepository personRepository; + public AvroKafkaListener(PersonRepository personRepository) { + this.personRepository = personRepository; + } + @KafkaListener(topics = ApplicationConstants.PERSONS_TOPIC, groupId = "group_id") public void handler(ConsumerRecord personConsumerRecord) { Person person = personConsumerRecord.value(); log.info("Person received : {} : {} ", person.getName(), person.getAge()); PersonEntity personEntity = - new PersonEntity(null, person.getName().toString(), person.getAge()); + new PersonEntity().setName(person.getName().toString()).setAge(person.getAge()); this.personRepository.save(personEntity); } } diff --git a/kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/util/ApplicationConstants.java b/kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/util/ApplicationConstants.java index 85ca4bc7..a72760a3 100644 --- a/kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/util/ApplicationConstants.java +++ b/kafka-avro/spring-boot-kafka-avro-consumer/src/main/java/com/example/springbootkafkaavro/util/ApplicationConstants.java @@ -1,9 +1,5 @@ package com.example.springbootkafkaavro.util; -import lombok.experimental.UtilityClass; - -@UtilityClass -public class ApplicationConstants { - - public static final String PERSONS_TOPIC = "persons"; +public interface ApplicationConstants { + String PERSONS_TOPIC = "persons"; } diff --git a/kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/ApplicationIntTests.java b/kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/ApplicationIntTests.java new file mode 100644 index 00000000..d4d43024 --- /dev/null +++ b/kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/ApplicationIntTests.java @@ -0,0 +1,40 @@ +package com.example.springbootkafkaavro; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import com.example.springbootkafkaavro.common.KafkaContainersConfig; +import com.example.springbootkafkaavro.model.Person; +import com.example.springbootkafkaavro.repository.PersonRepository; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Import; +import org.springframework.test.web.servlet.MockMvc; + +@SpringBootTest( + properties = { + "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer", + "spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer" + }, + classes = {KafkaContainersConfig.class}) +@AutoConfigureMockMvc +@Import(KafkaProducer.class) +class ApplicationIntTests { + + @Autowired MockMvc mockMvc; + @Autowired PersonRepository personRepository; + @Autowired KafkaProducer kafkaProducer; + + @Test + void contextLoads() { + Person person = new Person(); + person.setAge(33); + person.setName("junit"); + this.kafkaProducer.sendMessage(person); + await().atMost(10, SECONDS) + .untilAsserted(() -> assertThat(personRepository.count()).isEqualTo(1)); + } +} diff --git a/kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/KafkaProducer.java b/kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/KafkaProducer.java index 08884ad7..31b8fb87 100644 --- a/kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/KafkaProducer.java +++ b/kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/KafkaProducer.java @@ -2,16 +2,18 @@ import com.example.springbootkafkaavro.model.Person; import com.example.springbootkafkaavro.util.ApplicationConstants; -import lombok.RequiredArgsConstructor; import org.springframework.boot.test.context.TestComponent; import org.springframework.kafka.core.KafkaTemplate; @TestComponent -@RequiredArgsConstructor public class KafkaProducer { private final KafkaTemplate kafkaTemplate; + public KafkaProducer(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + public void sendMessage(Person person) { this.kafkaTemplate.send( ApplicationConstants.PERSONS_TOPIC, person.getName().toString(), person); diff --git a/kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/SpringBootKafkaAvroConsumerApplicationTests.java b/kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/SpringBootKafkaAvroConsumerApplicationTests.java deleted file mode 100644 index 627359f7..00000000 --- a/kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/SpringBootKafkaAvroConsumerApplicationTests.java +++ /dev/null @@ -1,112 +0,0 @@ -package com.example.springbootkafkaavro; - -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; - -import com.example.springbootkafkaavro.model.Person; -import com.example.springbootkafkaavro.repository.PersonRepository; -import java.time.Duration; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; -import org.springframework.test.context.DynamicPropertyRegistry; -import org.springframework.test.context.DynamicPropertySource; -import org.springframework.test.web.servlet.MockMvc; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.wait.strategy.Wait; -import org.testcontainers.utility.DockerImageName; - -@SpringBootTest( - properties = { - "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer", - "spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer" - }) -@AutoConfigureMockMvc -@Import(KafkaProducer.class) -class SpringBootKafkaAvroConsumerApplicationTests { - - @Autowired MockMvc mockMvc; - @Autowired PersonRepository personRepository; - @Autowired KafkaProducer kafkaProducer; - - private static final Network KAFKA_NETWORK = Network.newNetwork(); - private static final String CONFLUENT_PLATFORM_VERSION = "7.7.1"; - private static final DockerImageName KAFKA_IMAGE = - DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION); - private static final KafkaContainer KAFKA = - new KafkaContainer(KAFKA_IMAGE) - .withNetwork(KAFKA_NETWORK) - .withKraft() - .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1") - .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1"); - - private static final SchemaRegistryContainer SCHEMA_REGISTRY = - new SchemaRegistryContainer(CONFLUENT_PLATFORM_VERSION) - .withStartupTimeout(Duration.ofMinutes(2)); - - static { - KAFKA.start(); - SCHEMA_REGISTRY.withKafka(KAFKA).start(); - // Should be set after container is started - SCHEMA_REGISTRY.withEnv("SCHEMA_REGISTRY_LISTENERS", SCHEMA_REGISTRY.getSchemaUrl()); - } - - @DynamicPropertySource - static void setProperties(DynamicPropertyRegistry registry) { - // Connect our Spring application to our Testcontainers Kafka instance - registry.add("spring.kafka.consumer.bootstrap-servers", KAFKA::getBootstrapServers); - registry.add("spring.kafka.producer.bootstrap-servers", KAFKA::getBootstrapServers); - registry.add( - "spring.kafka.producer.properties.schema.registry.url", - SCHEMA_REGISTRY::getSchemaUrl); - registry.add("spring.kafka.properties.schema.registry.url", SCHEMA_REGISTRY::getSchemaUrl); - } - - private static class SchemaRegistryContainer extends GenericContainer { - public static final String SCHEMA_REGISTRY_IMAGE = "confluentinc/cp-schema-registry"; - public static final int SCHEMA_REGISTRY_PORT = 8081; - - public SchemaRegistryContainer() { - this(CONFLUENT_PLATFORM_VERSION); - } - - public SchemaRegistryContainer(String version) { - super(DockerImageName.parse(SCHEMA_REGISTRY_IMAGE).withTag(CONFLUENT_PLATFORM_VERSION)); - - waitingFor(Wait.forHttp("/subjects").forStatusCode(200)); - withExposedPorts(SCHEMA_REGISTRY_PORT); - } - - public SchemaRegistryContainer withKafka(KafkaContainer kafka) { - return withKafka(kafka.getNetwork(), kafka.getNetworkAliases().get(0) + ":9092"); - } - - public SchemaRegistryContainer withKafka(Network network, String bootstrapServers) { - withNetwork(network); - withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry"); - withEnv( - "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", - "PLAINTEXT://" + bootstrapServers); - return self(); - } - - public String getSchemaUrl() { - return "http://%s:%d".formatted(getHost(), getMappedPort(SCHEMA_REGISTRY_PORT)); - } - } - - @Test - void contextLoads() { - Person person = new Person(); - person.setAge(33); - person.setName("junit"); - this.kafkaProducer.sendMessage(person); - await().atMost(10, SECONDS) - .untilAsserted(() -> assertThat(personRepository.count()).isEqualTo(1)); - } -} diff --git a/kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/TestSpringBootKafkaAvroConsumerApplication.java b/kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/TestSpringBootKafkaAvroConsumerApplication.java new file mode 100644 index 00000000..6484f7d7 --- /dev/null +++ b/kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/TestSpringBootKafkaAvroConsumerApplication.java @@ -0,0 +1,13 @@ +package com.example.springbootkafkaavro; + +import com.example.springbootkafkaavro.common.KafkaContainersConfig; +import org.springframework.boot.SpringApplication; + +class TestSpringBootKafkaAvroConsumerApplication { + + public static void main(String[] args) { + SpringApplication.from(SpringBootKafkaAvroConsumerApplication::main) + .with(KafkaContainersConfig.class) + .run(args); + } +} diff --git a/kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/common/KafkaContainersConfig.java b/kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/common/KafkaContainersConfig.java new file mode 100644 index 00000000..300b06dd --- /dev/null +++ b/kafka-avro/spring-boot-kafka-avro-consumer/src/test/java/com/example/springbootkafkaavro/common/KafkaContainersConfig.java @@ -0,0 +1,70 @@ +package com.example.springbootkafkaavro.common; + +import java.time.Duration; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.boot.testcontainers.service.connection.ServiceConnection; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.DependsOn; +import org.springframework.test.context.DynamicPropertyRegistrar; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.kafka.ConfluentKafkaContainer; +import org.testcontainers.utility.DockerImageName; + +@TestConfiguration(proxyBeanMethods = false) +public class KafkaContainersConfig { + + private final Network network = Network.newNetwork(); + private final String CONFLUENT_VERSION = "7.7.2"; + + @Bean + @ServiceConnection + ConfluentKafkaContainer kafkaContainer() { + return new ConfluentKafkaContainer( + DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_VERSION)) + .withListener("tc-kafka:19092") // Internal alias and port + .withNetwork(network) // Shared network for communication + .withNetworkAliases("tc-kafka") // Alias to match Schema Registry + .withReuse(true); + } + + @Bean + @DependsOn("kafkaContainer") + GenericContainer schemaRegistry() { + return new GenericContainer<>( + DockerImageName.parse("confluentinc/cp-schema-registry") + .withTag(CONFLUENT_VERSION)) + .withExposedPorts(8085) + .withNetworkAliases("schemaregistry") // Alias for Schema Registry + .withNetwork(network) // Use the same network as Kafka + .withEnv( + "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", + "PLAINTEXT://tc-kafka:19092") // Match Kafka alias and port + .withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8085") + .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schemaregistry") + .withEnv("SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "PLAINTEXT") + .waitingFor(Wait.forHttp("/subjects").forStatusCode(200)) + .withStartupTimeout(Duration.ofSeconds(60)); + } + + @Bean + DynamicPropertyRegistrar dynamicPropertyRegistrar(GenericContainer schemaRegistry) { + return dynamicProperty -> { + dynamicProperty.add( + "spring.kafka.producer.properties.schema.registry.url", + () -> + "http://%s:%d" + .formatted( + schemaRegistry.getHost(), + schemaRegistry.getMappedPort(8085))); + dynamicProperty.add( + "spring.kafka.properties.schema.registry.url", + () -> + "http://%s:%d" + .formatted( + schemaRegistry.getHost(), + schemaRegistry.getMappedPort(8085))); + }; + } +} diff --git a/kafka-avro/spring-boot-kafka-avro-producer/generated-avro/com/example/springbootkafkaavro/model/Person.java b/kafka-avro/spring-boot-kafka-avro-producer/generated-avro/com/example/springbootkafkaavro/model/Person.java index a1f01974..23232c2f 100644 --- a/kafka-avro/spring-boot-kafka-avro-producer/generated-avro/com/example/springbootkafkaavro/model/Person.java +++ b/kafka-avro/spring-boot-kafka-avro-producer/generated-avro/com/example/springbootkafkaavro/model/Person.java @@ -5,7 +5,6 @@ */ package com.example.springbootkafkaavro.model; -import org.apache.avro.generic.GenericArray; import org.apache.avro.specific.SpecificData; import org.apache.avro.util.Utf8; import org.apache.avro.message.BinaryMessageEncoder; diff --git a/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/containers/KafkaContainersConfig.java b/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/containers/KafkaContainersConfig.java index 8516cd9a..e70630b3 100644 --- a/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/containers/KafkaContainersConfig.java +++ b/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/containers/KafkaContainersConfig.java @@ -22,7 +22,9 @@ public class KafkaContainersConfig { @ServiceConnection ConfluentKafkaContainer kafkaContainer() { ConfluentKafkaContainer confluentKafkaContainer = - new ConfluentKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_VERSION)) + new ConfluentKafkaContainer( + DockerImageName.parse("confluentinc/cp-kafka") + .withTag(CONFLUENT_VERSION)) .withListener("tc-kafka:19092") // Internal alias and port .withNetwork(network) // Shared network for communication .withNetworkAliases("tc-kafka") // Alias to match Schema Registry @@ -35,7 +37,9 @@ ConfluentKafkaContainer kafkaContainer() { @DependsOn("kafkaContainer") GenericContainer schemaRegistry() { GenericContainer schemaRegistry = - new GenericContainer<>(DockerImageName.parse("confluentinc/cp-schema-registry").withTag(CONFLUENT_VERSION)) + new GenericContainer<>( + DockerImageName.parse("confluentinc/cp-schema-registry") + .withTag(CONFLUENT_VERSION)) .withExposedPorts(8085) .withNetworkAliases("schemaregistry") // Alias for Schema Registry .withNetwork(network) // Use the same network as Kafka From ae74b46eef7d32dfb8b2dfb9d676301cb7e86153 Mon Sep 17 00:00:00 2001 From: Raja Kolli Date: Sat, 7 Dec 2024 12:15:47 +0530 Subject: [PATCH 3/5] Update confluent version --- kafka-avro/spring-boot-kafka-avro-consumer/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-avro/spring-boot-kafka-avro-consumer/pom.xml b/kafka-avro/spring-boot-kafka-avro-consumer/pom.xml index 1d85c7ed..7f040654 100644 --- a/kafka-avro/spring-boot-kafka-avro-consumer/pom.xml +++ b/kafka-avro/spring-boot-kafka-avro-consumer/pom.xml @@ -18,7 +18,7 @@ 21 1.12.0 - 7.7.2 + 7.8.0 2.7.0 2.43.0 From 88453d181a50f5a19d0b5ed58b235536651058b0 Mon Sep 17 00:00:00 2001 From: Raja Kolli Date: Sat, 7 Dec 2024 12:16:51 +0530 Subject: [PATCH 4/5] Update pom.xml --- kafka-avro/spring-boot-kafka-avro-producer/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-avro/spring-boot-kafka-avro-producer/pom.xml b/kafka-avro/spring-boot-kafka-avro-producer/pom.xml index 35d9f9b2..12bf440a 100644 --- a/kafka-avro/spring-boot-kafka-avro-producer/pom.xml +++ b/kafka-avro/spring-boot-kafka-avro-producer/pom.xml @@ -18,7 +18,7 @@ 21 1.12.0 - 7.7.2 + 7.8.0 2.7.0 2.43.0 From 74ab107737662b424733389049cd907678b4af93 Mon Sep 17 00:00:00 2001 From: Raja Kolli Date: Sat, 7 Dec 2024 06:51:52 +0000 Subject: [PATCH 5/5] let spring boot manage container lifecycle --- .../containers/KafkaContainersConfig.java | 47 ++++++++----------- 1 file changed, 20 insertions(+), 27 deletions(-) diff --git a/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/containers/KafkaContainersConfig.java b/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/containers/KafkaContainersConfig.java index e70630b3..b02f4f95 100644 --- a/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/containers/KafkaContainersConfig.java +++ b/kafka-avro/spring-boot-kafka-avro-producer/src/test/java/com/example/springbootkafkaavro/containers/KafkaContainersConfig.java @@ -21,38 +21,31 @@ public class KafkaContainersConfig { @Bean @ServiceConnection ConfluentKafkaContainer kafkaContainer() { - ConfluentKafkaContainer confluentKafkaContainer = - new ConfluentKafkaContainer( - DockerImageName.parse("confluentinc/cp-kafka") - .withTag(CONFLUENT_VERSION)) - .withListener("tc-kafka:19092") // Internal alias and port - .withNetwork(network) // Shared network for communication - .withNetworkAliases("tc-kafka") // Alias to match Schema Registry - .withReuse(true); - confluentKafkaContainer.start(); - return confluentKafkaContainer; + return new ConfluentKafkaContainer( + DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_VERSION)) + .withListener("tc-kafka:19092") // Internal alias and port + .withNetwork(network) // Shared network for communication + .withNetworkAliases("tc-kafka") // Alias to match Schema Registry + .withReuse(true); } @Bean @DependsOn("kafkaContainer") GenericContainer schemaRegistry() { - GenericContainer schemaRegistry = - new GenericContainer<>( - DockerImageName.parse("confluentinc/cp-schema-registry") - .withTag(CONFLUENT_VERSION)) - .withExposedPorts(8085) - .withNetworkAliases("schemaregistry") // Alias for Schema Registry - .withNetwork(network) // Use the same network as Kafka - .withEnv( - "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", - "PLAINTEXT://tc-kafka:19092") // Match Kafka alias and port - .withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8085") - .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schemaregistry") - .withEnv("SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "PLAINTEXT") - .waitingFor(Wait.forHttp("/subjects").forStatusCode(200)) - .withStartupTimeout(Duration.ofSeconds(60)); - schemaRegistry.start(); - return schemaRegistry; + return new GenericContainer<>( + DockerImageName.parse("confluentinc/cp-schema-registry") + .withTag(CONFLUENT_VERSION)) + .withExposedPorts(8085) + .withNetworkAliases("schemaregistry") // Alias for Schema Registry + .withNetwork(network) // Use the same network as Kafka + .withEnv( + "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", + "PLAINTEXT://tc-kafka:19092") // Match Kafka alias and port + .withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8085") + .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schemaregistry") + .withEnv("SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "PLAINTEXT") + .waitingFor(Wait.forHttp("/subjects").forStatusCode(200)) + .withStartupTimeout(Duration.ofSeconds(60)); } @Bean