In this post, I will show how we can integrate Apache Kafka with a Spring Boot application. I will also show how we can send and consume messages from our application.
What is Apache Kafka?
I previously wrote an introductory post about Kafka. If you are still new to Kafka, the following is a short summary.
Kafka is a stream processing platform, currently available as open-source software from Apache. Kafka provides low-latency ingestion of large amounts of data.
One key advantage of Kafka is that it lets us move large amounts of data and process it in real time. Kafka is designed for horizontal scaling, which means we can scale it by adding more brokers to the Kafka cluster.
Kafka Terminology
Kafka has its own terminology, but it is easy to pick up if you are just starting out.
- Producer – A producer is a client that produces a message and sends it to the Kafka server on a specified topic.
- Consumer – A consumer is a client that consumes messages from a Kafka topic.
- Cluster – Kafka is a distributed system of brokers. Multiple brokers make a cluster.
- Broker – A Kafka broker is a server in the cluster. Brokers form a cluster by sharing information with each other, either directly or indirectly through ZooKeeper. A broker receives messages from the producer, and the consumer fetches them from the broker by topic, partition and offset.
- Topic – A topic is a category name to which the producer sends messages and from which the consumer consumes them.
- Partition – The messages of a topic are split into several partitions, which are spread across the brokers of the Kafka cluster.
- Offset – Each message in a partition has an offset, which is its position in that partition. A consumer's offset tracks how far it has read, so it can resume from there.
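To make the relationship between a partition and its offsets more concrete, here is a minimal sketch in plain Java. It is only a toy in-memory model, not how Kafka is implemented, and the class name PartitionLogSketch and its methods are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of a single partition. This is NOT Kafka's implementation,
// only an illustration of how offsets relate to messages.
public class PartitionLogSketch {
    private final List<String> records = new ArrayList<>();

    // Appends a message to the end of the log and returns the offset it was stored at.
    public long append(String message) {
        records.add(message);
        return records.size() - 1;
    }

    // Returns every message from the given offset (inclusive) to the end of the log.
    public List<String> readFrom(long offset) {
        return new ArrayList<>(records.subList((int) offset, records.size()));
    }

    public static void main(String[] args) {
        PartitionLogSketch partition = new PartitionLogSketch();
        partition.append("company-a");
        partition.append("company-b");
        long offset = partition.append("company-c");
        System.out.println("last offset = " + offset);
        System.out.println("from offset 1 = " + partition.readFrom(1));
    }
}
```

A consumer that remembers the offset it stopped at can later call readFrom with that offset and continue where it left off, which is roughly the role offsets play in Kafka.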
Setting Up Spring Boot Application
As part of this post, I will show how we can use Apache Kafka with a Spring Boot application.
We will run a Kafka server on the local machine, and our application will send a message to a topic through the producer. Another part of the application will consume this message through the consumer.
To start with, we will need a Kafka dependency in our project.
implementation 'org.springframework.kafka:spring-kafka:2.5.2'
We will have a REST controller that takes a JSON message and sends it to a Kafka topic using the Kafka producer.
package com.betterjavacode.kafkademo.resource;

import com.betterjavacode.kafkademo.model.Company;
import com.betterjavacode.kafkademo.producers.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping(value = "/v1/kafka")
public class KafkaRestController
{
    private final KafkaProducer kafkaProducer;

    @Autowired
    public KafkaRestController(KafkaProducer kafkaProducer)
    {
        this.kafkaProducer = kafkaProducer;
    }

    // Accepts a Company as JSON and hands it to the producer
    @PostMapping(value = "/send", consumes={"application/json"}, produces = {"application/json"})
    public void sendMessageToKafkaTopic(@RequestBody Company company)
    {
        this.kafkaProducer.sendMessage(company);
    }
}
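The controller binds the request body to a Company object, which is not listed in this post. As a rough sketch, such a model could look like the class below. The fields id and name are assumptions made only for illustration, and the package declaration (com.betterjavacode.kafkademo.model in the demo) is left out to keep the snippet self-contained.

```java
// Hypothetical model class: the fields id and name are assumptions for illustration only.
public class Company {
    private int id;
    private String name;

    // Jackson uses the no-argument constructor and the setters to bind the JSON request body.
    public Company() {
    }

    public Company(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    // Makes log statements that print a Company readable.
    @Override
    public String toString() {
        return "Company{id=" + id + ", name='" + name + "'}";
    }
}
```

With a model like this, a request body such as {"id": 1, "name": "Acme"} would bind to a Company instance.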
KafkaProducer is a service that we created for this application. It uses KafkaTemplate to send the message to a topic.
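package com.betterjavacode.kafkademo.producers;

import com.betterjavacode.kafkademo.model.Company;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer
{
    private static final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);
    private static final String TOPIC = "companies";

    @Autowired
    private KafkaTemplate<String, Company> kafkaTemplate;

    // Logs the outgoing message and publishes it to the companies topic
    public void sendMessage(Company company)
    {
        logger.info("Outgoing Message - Producing -> {}", company);
        this.kafkaTemplate.send(TOPIC, company);
    }
}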
Similarly, we will have our consumer.
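import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer
{
    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    // The value arrives as a String because the consumer uses StringDeserializer
    @KafkaListener(topics = "companies", groupId = "group_id")
    public void consume(String company)
    {
        logger.info("Incoming Message - Consuming -> {}", company);
    }
}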
Our consumer uses KafkaListener, which allows us to listen to a topic. When a message arrives on this topic, the listener invokes the consume method with that message.
Kafka Configuration
Before showing how we will send and consume these messages, we still need to configure the Kafka cluster in our application. Our producer and consumer will also need some other properties.
There are two ways to configure these properties: programmatically or through YAML configuration. For our application, I have configured them through a YAML configuration file.
server:
  port: 9000
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        spring.json.trusted.packages: "com.betterjavacode.kafkademo.model"
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
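For comparison, the programmatic route would assemble the same producer settings in code. The sketch below is plain Java and only builds the property map; it uses Kafka's own property names (bootstrap.servers, key.serializer, value.serializer) rather than the Spring Boot YAML keys. The class name ProducerPropertiesSketch is made up for this example. In a Spring application, I would expect a map like this to be passed to a DefaultKafkaProducerFactory that backs the KafkaTemplate, but that wiring is not part of this demo.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: builds the producer properties that the YAML file expresses declaratively.
public class ProducerPropertiesSketch {

    public static Map<String, Object> producerProperties() {
        Map<String, Object> props = new HashMap<>();
        // Host and port of the Kafka broker, same value as in the YAML file
        props.put("bootstrap.servers", "localhost:9092");
        // Serializer classes are given by their fully qualified class names
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.springframework.kafka.support.serializer.JsonSerializer");
        return props;
    }

    public static void main(String[] args) {
        producerProperties().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```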
- bootstrap-servers – Host and port on which the Kafka server is running
- key-deserializer – Class used to deserialize the key of the message that the consumer consumes
- value-deserializer – Class used to deserialize the value of the message that the consumer consumes
- key-serializer – Class used to serialize the key of the message that the producer produces
- value-serializer – Class used to serialize the value of the message that the producer produces
- group-id – The group to which the consumer belongs
- auto-offset-reset – Each message in a partition of a Kafka topic has a unique identifier, which is the offset. This setting tells the consumer where to start reading when its group has no committed offset. With earliest, it starts from the earliest available message.
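The effect of auto-offset-reset can be sketched as a small decision function. This is a simplified illustration in plain Java, not Kafka's code: the class OffsetResetSketch and its parameters are invented for this example, and it ignores the case where a committed offset is out of range.

```java
import java.util.OptionalLong;

// Simplified sketch of how a consumer picks its starting position in a partition.
public class OffsetResetSketch {

    public static long startingOffset(OptionalLong committedOffset, String autoOffsetReset,
                                      long logStartOffset, long logEndOffset) {
        // A committed offset for the group always wins; the reset policy is not consulted.
        if (committedOffset.isPresent()) {
            return committedOffset.getAsLong();
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset;   // read everything still available in the partition
            case "latest":
                return logEndOffset;     // read only messages that arrive from now on
            default:
                // Any other value (for example "none") is treated as an error in this sketch
                throw new IllegalStateException(
                        "No committed offset and auto-offset-reset=" + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(OptionalLong.empty(), "earliest", 0L, 42L));
        System.out.println(startingOffset(OptionalLong.empty(), "latest", 0L, 42L));
        System.out.println(startingOffset(OptionalLong.of(17L), "earliest", 0L, 42L));
    }
}
```

So with earliest, a consumer group that starts for the first time also sees messages that were sent before it started.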
Before we can send messages, our topic must exist. I will create this topic manually and show how in the next section. In the Spring Boot application, we can also create a KafkaAdmin bean and use it to create topics.
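As a rough sketch of the in-application option, the configuration class below declares the companies topic as a NewTopic bean. It assumes that Spring Boot's auto-configured KafkaAdmin is in place, which creates declared topics on startup if they do not exist yet. The class name KafkaTopicConfig and the partition and replica counts are my own choices for a single local broker, not something taken from the demo repository.

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

// Sketch only: declares the topic so that KafkaAdmin can create it at startup.
@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic companiesTopic() {
        return TopicBuilder.name("companies")
                .partitions(1)   // assumption: one partition is enough for a local demo
                .replicas(1)     // assumption: a single broker, so only one replica is possible
                .build();
    }
}
```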
Running Kafka Server
As part of this demo, I will also show how we can start our first instance of the Kafka server. Download Kafka from here.
Once you have downloaded and extracted it into a directory, we will have to edit two properties files, zookeeper.properties and server.properties.
Update zookeeper.properties with the data directory (dataDir) where you want ZooKeeper to store data.
Start ZooKeeper – zookeeper-server-start.bat ../../config/zookeeper.properties
Update server.properties with the logs directory (log.dirs).
Start the server – kafka-server-start.bat ../../config/server.properties
Now to create a topic, we can open another command prompt and run this command from the directory where we installed Kafka:
kafka-topics.bat --create --topic companies --bootstrap-server localhost:9092
Sending and Consuming Messages
So far, we have created a Spring Boot application with Kafka and configured our Kafka server. Now it is time to send messages to the Kafka topic and consume them.
We have a REST API to send a message about a company. The Kafka producer will then send this message to the Kafka topic companies. A Kafka consumer listening to that topic will read the message.
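One way to call the REST API is with a small Java client like the sketch below, which uses the HttpClient that ships with Java 11 and later. The URL follows from the port and the mappings shown earlier. The JSON fields id and name are assumptions, since the actual fields of Company are not listed in this post, and the class name SendCompanyRequest is made up for this example.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch only: posts a JSON message to the /v1/kafka/send endpoint of the demo application.
public class SendCompanyRequest {

    // Builds the POST request; the endpoint consumes application/json.
    public static HttpRequest buildRequest(String json) {
        return HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9000/v1/kafka/send"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        // Hypothetical payload: the field names are assumptions
        HttpRequest request = buildRequest("{\"id\": 1, \"name\": \"Acme\"}");
        try {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println("HTTP status: " + response.statusCode());
        } catch (IOException e) {
            // Happens, for example, when the Spring Boot application is not running
            System.out.println("Could not reach the application: " + e);
        }
    }
}
```

Any other HTTP client, such as Postman or curl, works just as well, as long as it sends a JSON body with the Content-Type header set to application/json.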
Once this message is sent, we can take a look at our application server log, which will show the Kafka producer producing this message.
At the same time, our consumer is listening, so it will pick up this message and consume it.
Conclusion
So far, we have shown how we can use Apache Kafka with a Spring Boot application. As the amount of data in applications increases, it becomes important to stream it with low latency. The code repository for this demo application is here.
If you want me to cover anything else with Kafka, feel free to comment on this post and I will cover more details about Kafka in upcoming posts.