Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/kafka #17

Merged
merged 4 commits into from
Aug 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
KAFKA_BOOTSTRAP_SERVERS=host.docker.internal:9094
17 changes: 4 additions & 13 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,29 +1,20 @@
version: '3.9'

services:
email_server:
image: diegoneves/email-server:latest
restart: always
container_name: email_server
networks:
- pedido-bridge
ports:
- "8081:8081"

racha-pedido-app:
image: diegoneves/racha-pedido:latest
image: diegoneves/racha-pedido:beta
container_name: racha-pedido
ports:
- "8080:8080"
depends_on:
- email_server
networks:
- pedido-bridge
environment:
- EMAIL_HOST=email_server
- EMAIL_PORT=8081


networks:
pedido-bridge:
driver: bridge
- KAFKA_BOOTSTRAP_SERVERS=host.docker.internal:9094
extra_hosts:
- "host.docker.internal:172.17.0.1"
18 changes: 16 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,26 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.2.0</version>
<version>3.3.2</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
<version>3.2.0</version>
<version>3.2.8</version>
</dependency>

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.0.10</version>
</dependency>


<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10.1</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

@EnableKafka
@SpringBootApplication
public class RachaPedidoApplication {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package diegosneves.github.rachapedido.brokers;

import diegosneves.github.rachapedido.model.NotificationEmail;
import diegosneves.github.rachapedido.service.contract.EmailServiceContract;
import diegosneves.github.rachapedido.utils.JsonAdapter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@EnableKafka
@Component
public class EmailSendConsumer {


private final EmailServiceContract emailService;
private final JsonAdapter jsonAdapter;

@Autowired
public EmailSendConsumer(EmailServiceContract emailService, JsonAdapter jsonAdapter) {
this.emailService = emailService;
this.jsonAdapter = jsonAdapter;
}

@KafkaListener(topics = "${spring.kafka.topics.send_email}", groupId = "${spring.kafka.consumer.group-id}")
public void sendEmail(ConsumerRecord<String, String> emailMessage) {
NotificationEmail notification = jsonAdapter.fromJson(emailMessage.value(), NotificationEmail.class);
this.emailService.sendEmail(notification);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package diegosneves.github.rachapedido.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

private final String bootstrapServers;
private final String groupId;

@Autowired
public KafkaConfig(@Value("${spring.kafka.bootstrap-servers}") String bootstrapServers, @Value("${spring.kafka.consumer.group-id}") String groupId) {
this.bootstrapServers = bootstrapServers;
this.groupId = groupId;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setMissingTopicsFatal(false);
return factory;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package diegosneves.github.rachapedido.infrastructure;

import diegosneves.github.rachapedido.utils.JsonAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final JsonAdapter jsonAdapter;

@Autowired
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate, JsonAdapter jsonAdapter) {
this.kafkaTemplate = kafkaTemplate;
this.jsonAdapter = jsonAdapter;
}

public void send(String topic, String message) {
this.kafkaTemplate.send(topic, message);
}

public void convertObjectToJsonAndSend(String topic, Object object) {
this.kafkaTemplate.send(topic, jsonAdapter.toJson(object));
}

public void convertObjectToJsonAndSend(String topic, String key, Object object) {
this.kafkaTemplate.send(topic, key, jsonAdapter.toJson(object));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import diegosneves.github.rachapedido.dto.InvoiceDTO;
import diegosneves.github.rachapedido.enums.DiscountType;
import diegosneves.github.rachapedido.exceptions.CalculateInvoiceException;
import diegosneves.github.rachapedido.infrastructure.KafkaProducer;
import diegosneves.github.rachapedido.mapper.BuilderMapper;
import diegosneves.github.rachapedido.mapper.BuildingStrategy;
import diegosneves.github.rachapedido.mapper.NotificationEmailMapper;
import diegosneves.github.rachapedido.model.*;
import diegosneves.github.rachapedido.service.contract.EmailServiceContract;
import diegosneves.github.rachapedido.service.contract.InvoiceServiceContract;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.List;
Expand All @@ -35,11 +37,16 @@ public class InvoiceService implements InvoiceServiceContract {
private static final String CALCULATION_ERROR_MESSAGE = "Houve um problema ao calcular o valor total do pedido.";
private static final String NULL_PARAMETER_ERROR_MESSAGE = "Um dos parâmetros necessários para a operação de cálculo da fatura está ausente ou nulo.";
private static final String VOID = "";
private final EmailServiceContract emailService;

private final KafkaProducer kafkaProducer;


private final String emailNotificationTopic;

@Autowired
public InvoiceService(EmailServiceContract emailService) {
this.emailService = emailService;
public InvoiceService(KafkaProducer kafkaProducer, @Value("${spring.kafka.topics.send_email}") String emailNotificationTopic) {
this.kafkaProducer = kafkaProducer;
this.emailNotificationTopic = emailNotificationTopic;
}

@Override
Expand Down Expand Up @@ -85,7 +92,9 @@ private BillSplit statementForPayment(List<Invoice> unpaidInvoices, BankAccount
invoice.setPaymentLink(VOID);
}
});
notificationEmails.forEach(this.emailService::sendEmail);
for(NotificationEmail notificationEmail : notificationEmails) {
this.kafkaProducer.convertObjectToJsonAndSend(this.emailNotificationTopic, notificationEmail);
}
Double total = unpaidInvoices.stream().mapToDouble(Invoice::getTotalPayable).sum();
List<InvoiceDTO> invoiceDTOs = unpaidInvoices.stream().map(this::convertToInvoiceDTO).toList();
return BillSplit.builder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package diegosneves.github.rachapedido.utils;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.AllArgsConstructor;
import lombok.Builder;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

@Component
@Builder
@AllArgsConstructor
public class GsonAdapter implements JsonAdapter {

private Gson gson;

public GsonAdapter() {
GsonBuilder gsonBuilder = new GsonBuilder();
gsonBuilder.registerTypeAdapter(LocalDateTime.class, new GsonLocalDateTime());
this.gson = gsonBuilder.setPrettyPrinting().create();
}

@Override
public String toJson(Object obj) {
return this.gson.toJson(obj);
}

@Override
public <T> T fromJson(String json, Class<T> classOfT) {
return this.gson.fromJson(json, classOfT);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package diegosneves.github.rachapedido.utils;

import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import com.google.gson.JsonPrimitive;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;

import java.lang.reflect.Type;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class GsonLocalDateTime implements JsonSerializer<LocalDateTime>, JsonDeserializer<LocalDateTime> {

@Override
public LocalDateTime deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
String ldtString = jsonElement.getAsString();
return LocalDateTime.parse(ldtString, DateTimeFormatter.ISO_LOCAL_DATE_TIME);
}

@Override
public JsonElement serialize(LocalDateTime localDateTime, Type type, JsonSerializationContext jsonSerializationContext) {
return new JsonPrimitive(localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package diegosneves.github.rachapedido.utils;

public interface JsonAdapter {

String toJson(Object obj);

<T> T fromJson(String json, Class<T> classOfT);

}
16 changes: 16 additions & 0 deletions src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,22 @@ spring:
enable: true
ssl.trust: smtp.gmail.com

kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
listener:
missing-topics-fatal: false
consumer:
group-id: racha-pedido-consumer
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
topics:
send_email: racha-pedido.send_email


springdoc:
swagger-ui:
path: /swagger-ui.html
Expand Down
Loading
Loading