Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore : add tests #637

Merged
merged 3 commits into from
Jan 25, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void publishPersonWithGender(CapturedOutput output) {
}

@Test
void concurrentPublishing(CapturedOutput output) throws Exception {
void concurrentPublishing(CapturedOutput output) {
int numberOfRequests = 10;
for (int i = 0; i < numberOfRequests; i++) {
this.mockMvcTester
Expand Down Expand Up @@ -138,7 +138,7 @@ void publishPersonWithEmptyName() {
.hasStatus(HttpStatus.BAD_REQUEST)
.bodyJson()
.convertTo(ProblemDetail.class)
.satisfies(problemDetail -> assertBadRequestProblem(problemDetail));
.satisfies(this::assertBadRequestProblem);
}

@Test
Expand All @@ -153,7 +153,7 @@ void publishPersonWithNegativeAge() {
.hasStatus(HttpStatus.BAD_REQUEST)
.bodyJson()
.convertTo(ProblemDetail.class)
.satisfies(problemDetail -> assertBadRequestProblem(problemDetail));
.satisfies(this::assertBadRequestProblem);
}

private void assertBadRequestProblem(ProblemDetail problemDetail, String expectedDetail) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway
interface KafkaGateway {
public interface KafkaGateway {

@Gateway(requestChannel = "toKafka.input")
void sendToKafka(String payload, @Header(KafkaHeaders.TOPIC) String topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ KafkaContainer kafkaContainer() {

@Bean
DynamicPropertyRegistrar kafkaProperties(KafkaContainer kafkaContainer) {
return (properties) -> {
properties.add("spring.kafka.bootstrapServers", kafkaContainer::getBootstrapServers);
};
return properties -> properties.add("spring.kafka.bootstrapServers", kafkaContainer::getBootstrapServers);
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,80 @@
package com.example.integration.kafkadsl;

import static org.assertj.core.api.Assertions.assertThat;

import com.example.integration.kafkadsl.config.KafkaAppProperties;
import com.example.integration.kafkadsl.config.KafkaGateway;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.Message;

@SpringBootTest(classes = ContainerConfiguration.class)
@SpringBootTest(classes = {ContainerConfiguration.class})
class KafkaDslApplicationTests {

@Autowired
KafkaGateway kafkaGateway;

@Autowired
KafkaAppProperties kafkaAppProperties;

@Autowired
KafkaTemplate<String, String> kafkaTemplate;

@Test
void sendMessageToKafka() {
String message = "test-message";
kafkaGateway.sendToKafka(message, kafkaAppProperties.topic());
Message<?> received = kafkaGateway.receiveFromKafka();
assertThat(received.getPayload()).isEqualTo(message);
}

@Test
void receiveMessageFromKafka() {
String message = "test-message";
kafkaTemplate.send(kafkaAppProperties.topic(), message);
Message<?> received = kafkaGateway.receiveFromKafka();
assertThat(received.getPayload()).isEqualTo(message);
}

@Test
void sendAndReceiveMultipleMessages() {
for (int i = 0; i < 10; i++) {
String message = "message" + i;
kafkaGateway.sendToKafka(message, kafkaAppProperties.topic());
}
for (int i = 0; i < 10; i++) {
Message<?> received = kafkaGateway.receiveFromKafka();
assertThat(received.getPayload()).isEqualTo("message" + i);
}
}

@Test
void sendMessageToNewTopic() {
String message = "new-topic-message";
kafkaGateway.sendToKafka(message, kafkaAppProperties.newTopic());
Message<?> received = kafkaGateway.receiveFromKafka();
assertThat(received.getPayload()).isEqualTo(message);
}

@Test
void receiveMessageFromNewTopic() {
String message = "new-topic-message";
kafkaTemplate.send(kafkaAppProperties.newTopic(), message);
Message<?> received = kafkaGateway.receiveFromKafka();
assertThat(received.getPayload()).isEqualTo(message);
}

@Test
void contextLoads() {}
void sendAndReceiveMultipleMessagesFromNewTopic() {
for (int i = 0; i < 10; i++) {
String message = "new-topic-message" + i;
kafkaGateway.sendToKafka(message, kafkaAppProperties.newTopic());
}
for (int i = 0; i < 10; i++) {
Message<?> received = kafkaGateway.receiveFromKafka();
assertThat(received.getPayload()).isEqualTo("new-topic-message" + i);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class KafkaSampleIntegrationTest {
private Receiver2 receiver2;

@Test
@Order(1)
@Order(101)
void sendAndReceiveMessage() throws Exception {
long initialCount = receiver2.getLatch().getCount();
this.mockMvcTester
Expand All @@ -57,7 +57,7 @@ void sendAndReceiveMessage() throws Exception {
}

@Test
@Order(2)
@Order(102)
void sendAndReceiveMessageInDeadLetter() throws Exception {
this.mockMvcTester
.post()
Expand All @@ -74,6 +74,7 @@ void sendAndReceiveMessageInDeadLetter() throws Exception {
}

@Test
@Order(51)
void topicsWithPartitionsCount() {
String expectedJson =
"""
Expand Down Expand Up @@ -122,6 +123,7 @@ void topicsWithPartitionsCount() {
}

@Test
@Order(1)
void getListOfContainers() {
String expectedJson =
"""
Expand All @@ -143,6 +145,7 @@ void getListOfContainers() {
}

@Test
@Order(2)
void stopAndStartContainers() throws Exception {
String expectedJson =
"""
Expand Down Expand Up @@ -178,6 +181,7 @@ void stopAndStartContainers() throws Exception {
}

@Test
@Order(3)
void invalidContainerOperation() throws Exception {
this.mockMvcTester
.post()
Expand Down Expand Up @@ -209,8 +213,15 @@ void invalidContainerOperation() throws Exception {
}

@Test
@Order(4)
void whenInvalidOperation_thenReturnsBadRequest() {
String invalidRequest = "{ \"containerId\": \"myListener\", \"operation\": \"INVALID\" }";
String invalidRequest =
"""
{
"containerId": "topic_2_Listener-dlt",
"operation": "INVALID"
}
""";

this.mockMvcTester
.post()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.retry.annotation.Backoff;

@TestConfiguration(proxyBeanMethods = false)
public class OrderListener {
Expand All @@ -18,7 +22,10 @@ public class OrderListener {
private final CountDownLatch latch = new CountDownLatch(1);
private final CountDownLatch dlqLatch = new CountDownLatch(1);

@RetryableTopic
@RetryableTopic(
attempts = "2",
backoff = @Backoff(delay = 1000, multiplier = 2.0),
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "order-created", groupId = "notification")
public void notify(OrderRecord event) {
log.info(
Expand All @@ -32,12 +39,13 @@ public void notify(OrderRecord event) {
}

@DltHandler
public void notifyDLT(OrderRecord event) {
public void notifyDLT(OrderRecord event, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.error(
"Order processing failed, received in DLT - OrderId: {}, Status: {}, Items: {}",
+event.id(),
"Order processing failed, received in DLT - OrderId: {}, Status: {}, Items: {} from topic: {}",
event.id(),
event.status(),
event.orderItems());
event.orderItems(),
topic);
dlqLatch.countDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,27 @@ void shouldTriggerOrderCreatedEvent(Scenario scenario) {
.toArriveAndVerify(event ->
assertThat(event.orderItems().getFirst().productCode()).isEqualTo("Coffee"));
}

@Test
void shouldCreateOrderWithMultipleItems(Scenario scenario) {
when(kafkaOperations.send(any(), any(), any())).then(invocation -> {
log.info(
"Sending message key {}, value {} to {}.",
invocation.getArguments()[1],
invocation.getArguments()[2],
invocation.getArguments()[0]);
return CompletableFuture.completedFuture(new SendResult<>(null, null));
});

scenario.stimulate(() -> orders.saveOrder(new OrderRequest(
null,
List.of(
new OrderItemRequest("Coffee", BigDecimal.TEN, 100),
new OrderItemRequest("Tea", BigDecimal.valueOf(5), 50)))))
.andWaitForEventOfType(OrderRecord.class)
.toArriveAndVerify(event -> {
assertThat(event.orderItems().get(0).productCode()).isEqualTo("Coffee");
assertThat(event.orderItems().get(1).productCode()).isEqualTo("Tea");
});
}
}
Loading