
Using Apache Kafka With Spring Boot

In this post, I will show how we can integrate Apache Kafka with a Spring Boot application. I will also show how we can send and consume messages from our application.

What is Apache Kafka?

I previously wrote an introductory post about Kafka, but if you still don't know anything about Kafka, this will be a good summary.

Kafka is a stream-processing platform, currently available as open-source software from Apache. Kafka provides low-latency ingestion of large amounts of data.

Nevertheless, one key advantage of Kafka is that it allows us to move large amounts of data and process it in real time. Kafka supports horizontal scaling, which means it can be scaled out by adding more brokers to the Kafka cluster.

Kafka Terminology

So, Kafka has its own terminology, but it is easy to pick up if you are just starting out.

  1. Producer – A producer is a client that produces a message and sends it to the Kafka server on a specified topic.
  2. Consumer – A consumer is a client that consumes messages from a Kafka topic.
  3. Cluster – Kafka is a distributed system of brokers. Multiple brokers make up a cluster.
  4. Broker – Kafka brokers form a cluster by sharing information with each other directly, or indirectly through ZooKeeper. A broker receives messages from producers, and consumers fetch those messages from the broker by topic, partition, and offset.
  5. Topic – A topic is a category name to which producers send messages and from which consumers consume them.
  6. Partition – Messages sent to a topic are spread across the Kafka cluster into several partitions.
  7. Offset – An offset is the position of a message within a partition; a consumer's offset points to the last message it has consumed.

Setting Up Spring Boot Application

As part of this post, I will show how we can use Apache Kafka with a Spring Boot application.

We will run a Kafka server on the local machine, and our application will send a message to a topic through the producer. Another part of the application will then consume this message through the consumer.

To start with, we will need a Kafka dependency in our project.


implementation 'org.springframework.kafka:spring-kafka:2.5.2'

We will have a REST controller that takes a JSON message and sends it to a Kafka topic using a Kafka producer.


package com.betterjavacode.kafkademo.resource;

import com.betterjavacode.kafkademo.model.Company;
import com.betterjavacode.kafkademo.producers.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping(value = "/v1/kafka")
public class KafkaRestController
{
    private final KafkaProducer kafkaProducer;

    @Autowired
    public KafkaRestController(KafkaProducer kafkaProducer)
    {
        this.kafkaProducer = kafkaProducer;
    }

    @PostMapping(value = "/send", consumes={"application/json"}, produces = {"application/json"})
    public void sendMessageToKafkaTopic(@RequestBody Company company)
    {
        this.kafkaProducer.sendMessage(company);
    }
}

This KafkaProducer is a service we created for this application; it uses KafkaTemplate to send the message to a topic.


package com.betterjavacode.kafkademo.producers;

import com.betterjavacode.kafkademo.model.Company;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer
{
    private static final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);
    private static final String topic = "companies";

    @Autowired
    private KafkaTemplate<String, Company> kafkaTemplate;

    public void sendMessage(Company company)
    {
        logger.info(String.format("Outgoing Message - Producing -> %s", company));
        this.kafkaTemplate.send(topic, company);
    }
}
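
Since KafkaTemplate.send returns a future, we could also attach a callback to confirm delivery. The following is only a sketch of a possible extension of sendMessage, not part of the demo code, and it assumes imports of org.springframework.kafka.support.SendResult and org.springframework.util.concurrent.ListenableFuture:


    // Possible variation of sendMessage: log the partition and offset once the
    // broker acknowledges the record, or the error if the send fails.
    public void sendMessage(Company company)
    {
        logger.info(String.format("Outgoing Message - Producing -> %s", company));
        ListenableFuture<SendResult<String, Company>> future = this.kafkaTemplate.send(topic, company);
        future.addCallback(
                result -> logger.info(String.format("Delivered to partition %d at offset %d",
                        result.getRecordMetadata().partition(), result.getRecordMetadata().offset())),
                ex -> logger.error("Failed to deliver message", ex));
    }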

Similarly, we will have our consumer.


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer
{
    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = "companies", groupId = "group_id")
    public void consume(String company)
    {
        logger.info(String.format("Incoming Message - Consuming -> %s", company));
    }
}

Our consumer uses the @KafkaListener annotation, which allows us to listen to a topic. When a message arrives on this topic, the listener picks it up and the consumer processes it.

Kafka Configuration

Before showing how we send and consume these messages, we still need to configure the Kafka cluster in our application. There are also some other properties our producer and consumer will need.

Basically, there are two ways to configure these properties: programmatically or through a YAML configuration file. For our application, I have used the YAML configuration file.


server:
  port: 9000
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        spring.json.trusted.packages: "com.betterjavacode.kafkademo.model"
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

bootstrap-servers – Host and Port on which Kafka Server is running
key-deserializer – class to use to deserialize the key of the message that the consumer consumes
value-deserializer – class to use to deserialize the value of the message that the consumer consumes
key-serializer – Serializer class to serialize the key of the message that the producer produces
value-serializer – Serializer class to serialize the value of the message
group-id – This specifies the consumer group to which the consumer belongs

auto-offset-reset – Each message in a partition of a Kafka topic has a unique identifier, the offset. This setting tells the consumer to start reading from the earliest available offset when it has no committed offset.
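
As mentioned above, these properties can also be configured programmatically instead of through YAML. A minimal sketch of what that could look like for the producer side is below; the class and bean names are illustrative and not part of the demo project:


import java.util.HashMap;
import java.util.Map;

import com.betterjavacode.kafkademo.model.Company;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig
{
    // Illustrative programmatic equivalent of the producer section of the YAML above.
    @Bean
    public ProducerFactory<String, Company> producerFactory()
    {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Company> kafkaTemplate()
    {
        return new KafkaTemplate<>(producerFactory());
    }
}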

Before we can send messages, our topic must exist. I will create this topic manually and show how in the next section. In the Spring Boot application, we can also create a KafkaAdmin bean and use it to create topics, as shown in the sketch below.
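
A sketch of such a configuration could look like the following (the class name is illustrative; with Spring Boot's auto-configured KafkaAdmin, a NewTopic bean should be created on the broker at startup if it does not already exist):


import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig
{
    // Declares the "companies" topic so KafkaAdmin can create it on startup.
    @Bean
    public NewTopic companiesTopic()
    {
        return TopicBuilder.name("companies")
                .partitions(1)
                .replicas(1)
                .build();
    }
}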

Running Kafka Server

As part of this demo, I will also show how we can start our first instance of the Kafka server. Download Kafka from the Apache Kafka downloads page.

Once you download and extract it into a directory, we will have to edit two properties files: zookeeper.properties and server.properties.

Update zookeeper.properties with the data directory (dataDir) where you want ZooKeeper to store data.

Start ZooKeeper – zookeeper-server-start.bat ../../config/zookeeper.properties

Update server.properties with the log directory (log.dirs).

Start the Kafka server – kafka-server-start.bat ../../config/server.properties

Now, to create a topic, we can open another command prompt and run this command from the directory where we installed Kafka:
kafka-topics.bat --create --topic companies --bootstrap-server localhost:9092
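
If we want to verify that the topic exists, we can also list all topics with kafka-topics.bat --list --bootstrap-server localhost:9092 (an optional check, not part of the original walkthrough).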

Sending and Consuming Messages

So far, we have created a Spring Boot application with Kafka and configured our Kafka server. Now it is time to send messages to the Kafka topic and consume them.

We have a REST API to send a message about a company. The Kafka producer then sends this message to the Kafka topic companies. Since the Kafka consumer is listening to that topic, it will read the message.

[Screenshot: Use Apache Kafka With Spring Boot - REST API]

Once this message is sent, we can look at the application log, which will show the Kafka producer publishing the message.

At the same time, our consumer is listening, so it will pick up and consume this message.

Conclusion

In this post, we have shown how to use Apache Kafka with a Spring Boot application. As the amount of data that applications handle keeps growing, it becomes important to stream it with low latency. The code repository for this demo application is here.

If you want me to cover anything else with Kafka, feel free to comment on this post and I will cover more details about Kafka in upcoming posts.

All about Kafka Streaming

Lately, I have been hearing a lot about Kafka streaming. Even though I have worked on microservices, I haven't really tackled data-heavy applications. In my previous experience, where we did deal with heavy data for health insurance benefits, the approach was very different.

Now, with companies like Netflix and Amazon, data streaming has become a major focus. As technology and information grow, it becomes even more important to handle ever-larger volumes of data. In simple terms, web applications should be able to process large data sets with good performance; the size of a data set should not hinder an application's usability.

What is Kafka Data Streaming?

Traditionally, we processed large data in batches, but batch processing is not continuous, and it often doesn't work for real-time scenarios. For an application like Netflix, batch processing will never be enough. What's the alternative? Data streaming. Data streaming is the process of sending data continuously as it is produced. It is the backbone of applications like Netflix and Amazon, and for the growing social network platforms, data streaming is at the heart of handling large data.

The streamed data is often used for real-time aggregation and correlation, filtering, or sampling. One of the major benefits of data streaming is that it allows us to view and analyze data in real time.

A few challenges that data streaming often faces:

  1. Scalability
  2. Durability of data
  3. Fault Tolerance

Tools for Data Streaming

There are a number of tools available for data streaming. Amazon offers Kinesis, and Apache has open-source tools like Kafka, Storm, and Flink. In future posts, I will talk more about Apache Kafka and its usage; here I am just giving a brief idea of what it is.

Apache Kafka Streaming

Apache Kafka is a real-time distributed streaming platform. Basically, it allows us to publish and subscribe to streams of records.

There are two main use cases for Apache Kafka:

  1. Building data pipelines where records are streamed continuously.
  2. Building applications that consume those data pipelines and react accordingly.

Above all, the basic idea Kafka adopted comes from Hadoop: it runs as a cluster of one or more servers that can span multiple data centers. These clusters store streams of records, and each record consists of a key, a value, and a timestamp. Kafka provides four main APIs: Producer, Consumer, Streams, and Connector.
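
To give a small taste of one of these, here is a minimal, illustrative sketch of the Streams API that reads records from one topic, transforms the values, and writes them to another topic. The topic names and application id are placeholders, not from a real project:


import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class StreamsTaste
{
    public static void main(String[] args)
    {
        // Basic Streams configuration: application id, broker address, and default serdes.
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-taste");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Read from one topic, transform each value, and write to another topic.
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("input-topic");
        source.mapValues(value -> value.toUpperCase())
              .to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}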

In future posts, I will break down these APIs in detail and show their usage in a sample application.
