It is super simple to implement a basic Kafka producer and consumer with the help of Spring.
- Download and install Apache Kafka
- Start ZooKeeper using the command:
./zookeeper-server-start.sh ../config/zookeeper.properties
- Start a broker using the command:
./kafka-server-start.sh ../config/server.properties
- Create a topic using the command:
./kafka-topics.sh --zookeeper localhost:2181 --create --topic MyTopic --partitions 2 --replication-factor 1
That's it, your single-node Kafka cluster is ready to use. Apache Kafka ships with two useful scripts, a console producer and a console consumer, which are handy for quickly validating the setup.
- Run the command
./kafka-console-producer.sh --broker-list localhost:9092 --topic MyTopic
and start sending messages
- Run the command
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic MyTopic
and start consuming messages
Spring has made it super easy to configure and use Apache Kafka. The following is a snippet of the dependencies specified in pom.xml:
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
- The spring-kafka dependency brings in Spring's Kafka support, including KafkaTemplate and the @KafkaListener annotation.
- The lombok dependency is not mandatory. However, I am using it for the @Slf4j annotation, which generates an SLF4J logger field.
Following is the application.yml file. Notice Spring Boot's Kafka support, where you specify the producer and consumer properties under spring.kafka.
- bootstrap-servers: The list of brokers in the cluster.
- key-serializer/value-serializer: Every message over Kafka is sent as an array of bytes, so the producer must serialize the actual message to bytes and the consumer must deserialize it back. We specify the serializer class details here.
- key-deserializer/value-deserializer: The corresponding deserializer class details.
- group-id: We can configure multiple consumers consuming records from the same topic. group-id logically groups the consumers reading from one topic; Kafka divides the topic's partitions among the consumers in a group, so the number of active consumers in a group cannot exceed the number of partitions (with 2 partitions, a third consumer in the same group would sit idle).
topicName: "MyTopic"
serializerClass: "org.apache.kafka.common.serialization.StringSerializer"
deserializerClass: "org.apache.kafka.common.serialization.StringDeserializer"
spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    producer:
      key-serializer: ${serializerClass}
      value-serializer: ${serializerClass}
    consumer:
      key-deserializer: ${deserializerClass}
      value-deserializer: ${deserializerClass}
      group-id: "MyGroup"
The Kafka client library already ships with serialization/deserialization support for a few basic data types such as String, Integer, and Double. To keep it simple I am using String messages, hence the configuration references the corresponding String serializer and deserializer classes.
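As an aside, the same producer settings can also be expressed in Java configuration instead of application.yml. Below is a minimal sketch of that alternative, not part of the original setup, with the property values hard-coded for brevity; with the application.yml approach above, Spring Boot auto-configures an equivalent KafkaTemplate for you, so a class like this is optional.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    // Equivalent of spring.kafka.bootstrap-servers and the producer
    // serializer properties from application.yml, hard-coded for brevity.
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}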
Use KafkaTemplate to send messages:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class SimpleKafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${topicName}")
    private String topicName;

    // Publish the message to the configured topic. No key is supplied,
    // so Kafka spreads the records across the topic's partitions.
    public void send(String message) {
        kafkaTemplate.send(topicName, message);
    }
}
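To try the producer out, one option (my own illustration, not part of the original code) is a CommandLineRunner that sends a message when the application starts; you can watch it arrive in the console consumer started earlier.

import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ProducerDemo {

    // Sends a single message on startup.
    @Bean
    CommandLineRunner sendOnStartup(SimpleKafkaProducer producer) {
        return args -> producer.send("Hello from Spring Kafka");
    }
}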
Consuming messages is even easier. Just annotate a method with @KafkaListener(topics = "${topicName}") and you are done.
import java.util.Date;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class SimpleKafkaConsumer {

    // Called for every record on the topic; the payload is deserialized
    // to a String by the configured deserializer.
    @KafkaListener(topics = "${topicName}")
    void receive(String message) {
        log.info("Received message: {}. It is received at {}", message, new Date());
    }
}
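If you also need record metadata such as the partition and offset, spring-kafka lets the listener method accept the full ConsumerRecord instead of just the payload. Here is a minimal sketch; the class name and group id are my own illustration, not from the original code.

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MetadataAwareConsumer {

    // Accepting the full ConsumerRecord exposes the key, partition and
    // offset alongside the payload.
    @KafkaListener(topics = "${topicName}", groupId = "MyMetadataGroup")
    void receive(ConsumerRecord<String, String> record) {
        log.info("partition={} offset={} value={}",
                record.partition(), record.offset(), record.value());
    }
}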