Stream Processing With Spring, Kafka, Spark and Cassandra - Part 3


This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.

  1. Part 1 - Overview
  2. Part 2 - Setting up Kafka
  3. Part 3 - Writing a Spring Boot Kafka Producer
  4. Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
  5. Part 5 - Displaying Cassandra Data With Spring Boot

Writing a Spring Boot Kafka Producer

We'll go over the steps necessary to write a simple producer for a Kafka topic using Spring Boot. The application is essentially a simple proxy: it receives a JSON document containing a name and forwards that name to a Kafka topic. That's pretty simple, but enough to get us going. We'll use IntelliJ IDEA to set everything up. The easiest way to get started is Spring Initializr.

Setting up a project

  1. Project SDK: Java 8
  2. Initializr Service URL: https://start.spring.io
  3. Next
  4. Name: spring-boot-kafka-example
  5. Type: Gradle Project
  6. Packaging: Jar
  7. Java Version: 1.8
  8. Language: Java
  9. Group: com.example
  10. Artifact: spring-boot-kafka-example
  11. Version: 0.0.1-SNAPSHOT
  12. Description: Spring Boot Kafka Example
  13. Package: com.example
  14. Next
  15. Spring Boot Version: 1.3
  16. Core - Web
  17. Next
  18. Project name: spring-boot-kafka-example
  19. The rest is just fine ...
  20. Finish
  21. After creating the project, check the SDK setting; it should be Java 8

build.gradle dependencies





This is the class where all the important stuff is happening

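package com.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Value;

import javax.annotation.PostConstruct;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

// Registered as a bean in AppBeans, so no stereotype annotation is needed here.
public class SpringBootKafkaProducer {

    // The property names and defaults in the three @Value placeholders are
    // assumptions; adjust them to your broker and to the topic from Part 2.
    @Value("${kafka.broker.list:localhost:9092}")
    private String brokerList;

    // "sync" selects blocking sends; any other value selects asynchronous sends.
    @Value("${kafka.sync:sync}")
    private String sync;

    @Value("${kafka.topic:votes}")
    private String topic;

    private Producer<String, String> producer;

    public SpringBootKafkaProducer() {
    }

    // Runs once the @Value fields have been injected.
    @PostConstruct
    public void initIt() {
        Properties kafkaProps = new Properties();

        kafkaProps.put("bootstrap.servers", brokerList);

        // KafkaProducer requires serializers for both key and value.
        kafkaProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        // Wait for the partition leader only, retry once, batch for up to 5 ms.
        kafkaProps.put("acks", "1");

        kafkaProps.put("retries", "1");
        kafkaProps.put("linger.ms", 5);

        producer = new KafkaProducer<>(kafkaProps);
    }

    public void send(String value) throws ExecutionException,
            InterruptedException {
        if ("sync".equalsIgnoreCase(sync)) {
            sendSync(value);
        } else {
            sendAsync(value);
        }
    }

    private void sendSync(String value) throws ExecutionException,
            InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);

        // Block until the broker acknowledges the record.
        producer.send(record).get();
    }

    private void sendAsync(String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);

        // Return immediately; the callback only reports failures.
        producer.send(record, (RecordMetadata recordMetadata, Exception e) -> {
            if (e != null) {
                e.printStackTrace();
            }
        });
    }
}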
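
The difference between the synchronous and the asynchronous send path can be tried out without a broker. The following is only a minimal sketch using the JDK: `fakeSend` is a hypothetical stand-in for a producer (it is not part of the Kafka API), and the class and method names are made up for illustration. It shows the two styles the producer class switches between: blocking on the returned future, or registering a callback and returning immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SyncAsyncSketch {

    // Hypothetical stand-in for a producer: "sends" the value on another
    // thread and completes the future with a fake acknowledgement string.
    static CompletableFuture<String> fakeSend(String value) {
        return CompletableFuture.supplyAsync(() -> "ack:" + value);
    }

    // Synchronous style: block the caller until the acknowledgement arrives.
    static String sendSync(String value) throws ExecutionException, InterruptedException {
        return fakeSend(value).get();
    }

    // Asynchronous style: return immediately and record the outcome in a callback.
    static CompletableFuture<String> sendAsync(String value, List<String> log) {
        return fakeSend(value).whenComplete((ack, error) -> {
            synchronized (log) {
                log.add(error != null ? "failed: " + error.getMessage() : ack);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sendSync("Test")); // prints "ack:Test"

        List<String> log = new ArrayList<>();
        // get() is called here only so the demo can print the callback's result.
        sendAsync("Test", log).get();
        System.out.println(log); // prints "[ack:Test]"
    }
}
```

The synchronous style surfaces failures to the caller as exceptions at the cost of waiting for every record, while the asynchronous style keeps the caller fast and leaves error handling to the callback.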


This one will be automatically generated.

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootKafkaExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaExampleApplication.class, args);
    }
}


Set up beans for the controller.

package com.example;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppBeans {

    // Exposes the producer as a Spring bean so it can be injected into the controller.
    @Bean
    public SpringBootKafkaProducer initProducer() {
        return new SpringBootKafkaProducer();
    }
}

Helper beans

This is the status returned to clients; we'll just send "ok" every time.

package com.example;

public class Status {
    private String status;

    public Status(String status) {
        this.status = status;
    }

    public Status() {
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }
}

This will be the input to our app.

package com.example;

public class Vote {
    private String name;

    public Vote(String name) {
        this.name = name;
    }

    // The no-argument constructor is needed for JSON deserialization.
    public Vote() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}


This is the controller. After starting the app, we should have an active endpoint available at http://localhost:8080/vote

package com.example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

// The original listing also imported @Configuration; a controller does not
// need it, so it is left out here.
@RestController
public class SpringBootKafkaController {

    @Autowired
    SpringBootKafkaProducer springBootKafkaProducer;

    @RequestMapping("/vote")
    public Status vote(@RequestBody Vote vote) throws ExecutionException, InterruptedException {

        // Forward the submitted name to the Kafka topic.
        springBootKafkaProducer.send(vote.getName());

        return new Status("ok");
    }
}


Checking everything

A console consumer should still be running from the previous post, so we won't cover that here. After starting SpringBootKafkaExampleApplication, open a REST client such as Postman and send the following JSON to http://localhost:8080/vote (as a POST request with the Content-Type header set to application/json):

    {
        "name": "Test"
    }

If everything went fine, you should see the name you sent in this JSON in the console consumer. In Part 4 we are going to go over how to pick up the data from Kafka with Spark Streaming, combine it with data in Cassandra and push it back to Cassandra.
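
If no REST client is at hand, the request described above can also be sent from plain Java. The following is only a sketch using the JDK's `HttpURLConnection`; the class name `VoteClientSketch` and its helper methods are made up for illustration, and it assumes the application from this post is running locally on port 8080.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class VoteClientSketch {

    // Builds the request body by hand. Only backslashes and double quotes are
    // escaped, which is enough for simple names; a JSON library would be the
    // robust choice.
    static String voteJson(String name) {
        String escaped = name.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"name\": \"" + escaped + "\"}";
    }

    // POSTs the vote to the given endpoint and returns the HTTP status code.
    static int postVote(String endpoint, String name) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(endpoint).openConnection();
        try {
            connection.setRequestMethod("POST");
            connection.setRequestProperty("Content-Type", "application/json");
            connection.setDoOutput(true);
            try (OutputStream body = connection.getOutputStream()) {
                body.write(voteJson(name).getBytes(StandardCharsets.UTF_8));
            }
            return connection.getResponseCode();
        } finally {
            connection.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the Spring Boot application is up on localhost:8080.
        System.out.println(postVote("http://localhost:8080/vote", "Test"));
    }
}
```

The Content-Type header matters here: without it, Spring cannot tell that the body should be read as JSON and mapped onto the Vote class.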


Lukasz said...

What's the purpose of @Configuration annotation on SpringBootKafkaController class?

Marko Švaljek said...

I probably missed it during refactoring ...

Teik Hooi Beh said...

By any chance you have a github repo on the java codes?

Marko Švaljek said...

I know it's kind of stupid from my side but I do want people to try it out themselves. If you are really interested send me an e-mail on msvaljek@gmail.com

Marko Švaljek said...

I guess there will be more people interested ... here are the sources:


Ayman Elgharabawy said...


Marko Švaljek said...

password: hello

Arash Taheri said...

thank you Marko, it works very well. Do you have a consumer too?

Siahmed Halim said...

Thank You Marko