Skip to content

Commit

Permalink
Merge pull request #17 from diegosneves/feat/kafka
Browse files Browse the repository at this point in the history
Feat/kafka
  • Loading branch information
diegosneves authored Aug 11, 2024
2 parents 1f53236 + 971f73f commit c5708f4
Show file tree
Hide file tree
Showing 13 changed files with 301 additions and 87 deletions.
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
KAFKA_BOOTSTRAP_SERVERS=host.docker.internal:9094
17 changes: 4 additions & 13 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,29 +1,20 @@
version: '3.9'

services:
email_server:
image: diegoneves/email-server:latest
restart: always
container_name: email_server
networks:
- pedido-bridge
ports:
- "8081:8081"

racha-pedido-app:
image: diegoneves/racha-pedido:latest
image: diegoneves/racha-pedido:beta
container_name: racha-pedido
ports:
- "8080:8080"
depends_on:
- email_server
networks:
- pedido-bridge
environment:
- EMAIL_HOST=email_server
- EMAIL_PORT=8081


networks:
pedido-bridge:
driver: bridge
- KAFKA_BOOTSTRAP_SERVERS=host.docker.internal:9094
extra_hosts:
- "host.docker.internal:172.17.0.1"
18 changes: 16 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,26 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.2.0</version>
<version>3.3.2</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
<version>3.2.0</version>
<version>3.2.8</version>
</dependency>

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.0.10</version>
</dependency>


<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10.1</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

@EnableKafka
@SpringBootApplication
public class RachaPedidoApplication {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package diegosneves.github.rachapedido.brokers;

import diegosneves.github.rachapedido.model.NotificationEmail;
import diegosneves.github.rachapedido.service.contract.EmailServiceContract;
import diegosneves.github.rachapedido.utils.JsonAdapter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@EnableKafka
@Component
public class EmailSendConsumer {


private final EmailServiceContract emailService;
private final JsonAdapter jsonAdapter;

@Autowired
public EmailSendConsumer(EmailServiceContract emailService, JsonAdapter jsonAdapter) {
this.emailService = emailService;
this.jsonAdapter = jsonAdapter;
}

@KafkaListener(topics = "${spring.kafka.topics.send_email}", groupId = "${spring.kafka.consumer.group-id}")
public void sendEmail(ConsumerRecord<String, String> emailMessage) {
NotificationEmail notification = jsonAdapter.fromJson(emailMessage.value(), NotificationEmail.class);
this.emailService.sendEmail(notification);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package diegosneves.github.rachapedido.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

private final String bootstrapServers;
private final String groupId;

@Autowired
public KafkaConfig(@Value("${spring.kafka.bootstrap-servers}") String bootstrapServers, @Value("${spring.kafka.consumer.group-id}") String groupId) {
this.bootstrapServers = bootstrapServers;
this.groupId = groupId;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setMissingTopicsFatal(false);
return factory;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package diegosneves.github.rachapedido.infrastructure;

import diegosneves.github.rachapedido.utils.JsonAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final JsonAdapter jsonAdapter;

@Autowired
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate, JsonAdapter jsonAdapter) {
this.kafkaTemplate = kafkaTemplate;
this.jsonAdapter = jsonAdapter;
}

public void send(String topic, String message) {
this.kafkaTemplate.send(topic, message);
}

public void convertObjectToJsonAndSend(String topic, Object object) {
this.kafkaTemplate.send(topic, jsonAdapter.toJson(object));
}

public void convertObjectToJsonAndSend(String topic, String key, Object object) {
this.kafkaTemplate.send(topic, key, jsonAdapter.toJson(object));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import diegosneves.github.rachapedido.dto.InvoiceDTO;
import diegosneves.github.rachapedido.enums.DiscountType;
import diegosneves.github.rachapedido.exceptions.CalculateInvoiceException;
import diegosneves.github.rachapedido.infrastructure.KafkaProducer;
import diegosneves.github.rachapedido.mapper.BuilderMapper;
import diegosneves.github.rachapedido.mapper.BuildingStrategy;
import diegosneves.github.rachapedido.mapper.NotificationEmailMapper;
import diegosneves.github.rachapedido.model.*;
import diegosneves.github.rachapedido.service.contract.EmailServiceContract;
import diegosneves.github.rachapedido.service.contract.InvoiceServiceContract;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.List;
Expand All @@ -35,11 +37,16 @@ public class InvoiceService implements InvoiceServiceContract {
private static final String CALCULATION_ERROR_MESSAGE = "Houve um problema ao calcular o valor total do pedido.";
private static final String NULL_PARAMETER_ERROR_MESSAGE = "Um dos parâmetros necessários para a operação de cálculo da fatura está ausente ou nulo.";
private static final String VOID = "";
private final EmailServiceContract emailService;

private final KafkaProducer kafkaProducer;


private final String emailNotificationTopic;

@Autowired
public InvoiceService(EmailServiceContract emailService) {
this.emailService = emailService;
public InvoiceService(KafkaProducer kafkaProducer, @Value("${spring.kafka.topics.send_email}") String emailNotificationTopic) {
this.kafkaProducer = kafkaProducer;
this.emailNotificationTopic = emailNotificationTopic;
}

@Override
Expand Down Expand Up @@ -85,7 +92,9 @@ private BillSplit statementForPayment(List<Invoice> unpaidInvoices, BankAccount
invoice.setPaymentLink(VOID);
}
});
notificationEmails.forEach(this.emailService::sendEmail);
for(NotificationEmail notificationEmail : notificationEmails) {
this.kafkaProducer.convertObjectToJsonAndSend(this.emailNotificationTopic, notificationEmail);
}
Double total = unpaidInvoices.stream().mapToDouble(Invoice::getTotalPayable).sum();
List<InvoiceDTO> invoiceDTOs = unpaidInvoices.stream().map(this::convertToInvoiceDTO).toList();
return BillSplit.builder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package diegosneves.github.rachapedido.utils;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.AllArgsConstructor;
import lombok.Builder;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

@Component
@Builder
@AllArgsConstructor
public class GsonAdapter implements JsonAdapter {

private Gson gson;

public GsonAdapter() {
GsonBuilder gsonBuilder = new GsonBuilder();
gsonBuilder.registerTypeAdapter(LocalDateTime.class, new GsonLocalDateTime());
this.gson = gsonBuilder.setPrettyPrinting().create();
}

@Override
public String toJson(Object obj) {
return this.gson.toJson(obj);
}

@Override
public <T> T fromJson(String json, Class<T> classOfT) {
return this.gson.fromJson(json, classOfT);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package diegosneves.github.rachapedido.utils;

import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import com.google.gson.JsonPrimitive;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;

import java.lang.reflect.Type;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class GsonLocalDateTime implements JsonSerializer<LocalDateTime>, JsonDeserializer<LocalDateTime> {

@Override
public LocalDateTime deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
String ldtString = jsonElement.getAsString();
return LocalDateTime.parse(ldtString, DateTimeFormatter.ISO_LOCAL_DATE_TIME);
}

@Override
public JsonElement serialize(LocalDateTime localDateTime, Type type, JsonSerializationContext jsonSerializationContext) {
return new JsonPrimitive(localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package diegosneves.github.rachapedido.utils;

public interface JsonAdapter {

String toJson(Object obj);

<T> T fromJson(String json, Class<T> classOfT);

}
16 changes: 16 additions & 0 deletions src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,22 @@ spring:
enable: true
ssl.trust: smtp.gmail.com

kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
listener:
missing-topics-fatal: false
consumer:
group-id: racha-pedido-consumer
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
topics:
send_email: racha-pedido.send_email


springdoc:
swagger-ui:
path: /swagger-ui.html
Expand Down
Loading

0 comments on commit c5708f4

Please sign in to comment.