Series
This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.
- Part 1 - Overview
- Part 2 - Setting up Kafka
- Part 3 - Writing a Spring Boot Kafka Producer
- Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
- Part 5 - Displaying Cassandra Data With Spring Boot
Writing a Spring Boot Kafka Producer
We'll go over the steps necessary to write a simple producer for a kafka topic by using spring boot. The application will essentially be a simple proxy application and will receive a JSON containing the key that's going to be sent to kafka topic. Pretty simple but enough to get us going. We'll use IntelliJ IDEA to set everything up. The easiest way to get started is by using Spring Initializr.
Setting up a project
- Project SDK: Java 8
- Initializr Service URL: https://start.spring.io
- Next
- Name: spring-boot-kafka-example
- Type: Gradle Project
- Packaging: Jar
- Java Version: 1.8
- Language: Java
- Group: com.example
- Artifact: spring-boot-kafka-example
- Vesion: 0.0.1-SNAPSHOT
- Description: Spring Boot Kafka Example
- Package: com.example
- Next
- Spring Boot Version: 1.3
- Core - Web
- Next
- Project name: spring-boot-kafka-example
- The rest is just fine ...
- Finish
- After creating project check sdk setting, it should be java 8
build.gradle dependencies
compile('org.apache.kafka:kafka_2.11:0.9.0.0') compile('org.apache.zookeeper:zookeeper:3.4.7')
application.properties
brokerList=localhost:9092 sync=sync topic=votes
SpringBootKafkaProducer
This is the class where all the important stuff is happening
package com.example; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; import java.util.Properties; import java.util.concurrent.ExecutionException; @Configuration public class SpringBootKafkaProducer { @Value("${brokerList}") private String brokerList; @Value("${sync}") private String sync; @Value("${topic}") private String topic; private Producer<String, String> producer; public SpringBootKafkaProducer() { } @PostConstruct public void initIt() { Properties kafkaProps = new Properties(); kafkaProps.put("bootstrap.servers", brokerList); kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kafkaProps.put("acks", "1"); kafkaProps.put("retries", "1"); kafkaProps.put("linger.ms", 5); producer = new KafkaProducer<>(kafkaProps); } public void send(String value) throws ExecutionException, InterruptedException { if ("sync".equalsIgnoreCase(sync)) { sendSync(value); } else { sendAsync(value); } } private void sendSync(String value) throws ExecutionException, InterruptedException { ProducerRecord<String, String> record = new ProducerRecord<>(topic, value); producer.send(record).get(); } private void sendAsync(String value) { ProducerRecord<String, String> record = new ProducerRecord<>(topic, value); producer.send(record, (RecordMetadata recordMetadata, Exception e) -> { if (e != null) { e.printStackTrace(); } }); } }
SpringBootKafkaExampleApplication
This one will be automatically generated.
package com.example; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class SpringBootKafkaExampleApplication { public static void main(String[] args) { SpringApplication.run(SpringBootKafkaExampleApplication.class, args); } }
AppBeans
Setup beans for the controller.
package com.example; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class AppBeans { @Bean public SpringBootKafkaProducer initProducer() { return new SpringBootKafkaProducer(); } }
Helper beans
Status to return to clients, we'll just send "ok" every time.
package com.example; public class Status { private String status; public Status(String status) { this.status = status; } public Status() { } public String getStatus() { return status; } public void setStatus(String status) { this.status = status; } }This will be the input to our app
package com.example; public class Vote { private String name; public Vote(String name) { this.name = name; } public Vote() { } public String getName() { return name; } public void setName(String name) { this.name = name; } }
SpringBootKafkaController
This is the controller, after starting the app we should have an active endpoint available under http://localhost:8080/vote
package com.example; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.ExecutionException; @RestController public class SpringBootKafkaController { @Autowired SpringBootKafkaProducer springBootKafkaProducer; @RequestMapping("/vote") public Status vote(@RequestBody Vote vote) throws ExecutionException, InterruptedException { springBootKafkaProducer.send(vote.getName()); return new Status("ok"); } }
Checking everything
There should be an active console reader from previous post so we won't cover this. After running the SpringBootKafkaExampleApplication simply open a rest client application like Postman and try to send the following JSON to http://localhost:8080/vote
{ "name": "Test" }If everything was fine you should see the name that you send in this json in the console consumer. In Part 4 we are going to go over how to pickup the data from kafka with spark streaming, combine them with data in cassandra and push them back to cassandra.
10 comments:
What's the purpose of @Configuration annotation on SpringBootKafkaController class?
I probably missed it during refactoring ...
By any chance you have a github repo on the java codes?
I know it's kind of stupid from my side but I do want people to try it out themselves. If you are really interested send me an e-mail on msvaljek@gmail.com
I guess there will be more people interested ... here are the sources:
https://drive.google.com/open?id=0Bz9kDTTW0oRgWXdoTGFtM1dLelE
password?
password: hello
thank you Marko, it works very well. Do you have a consumer too?
Thank You Marko
Post a Comment