Series
This blog entry is part of a series called Stream Processing With Spring, Kafka, Spark and Cassandra.
- Part 1 - Overview
- Part 2 - Setting up Kafka
- Part 3 - Writing a Spring Boot Kafka Producer
- Part 4 - Consuming Kafka data with Spark Streaming and Output to Cassandra
- Part 5 - Displaying Cassandra Data With Spring Boot
Writing a Spring Boot Kafka Producer
We'll go over the steps necessary to write a simple producer for a Kafka topic using Spring Boot.
The application will essentially be a simple proxy: it receives a JSON payload and forwards the
value inside it to a Kafka topic. Pretty simple, but enough to get us going. We'll use
IntelliJ IDEA to set everything up. The easiest way to get started is with Spring Initializr.
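Concretely, a request body like the one below will end up as the message "Test" on the Kafka topic; it's the same payload we'll use to test the app at the end of this post.
{
    "name": "Test"
}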
Setting up a project
- Project SDK: Java 8
- Initializr Service URL: https://start.spring.io
- Next
- Name: spring-boot-kafka-example
- Type: Gradle Project
- Packaging: Jar
- Java Version: 1.8
- Language: Java
- Group: com.example
- Artifact: spring-boot-kafka-example
- Version: 0.0.1-SNAPSHOT
- Description: Spring Boot Kafka Example
- Package: com.example
- Next
- Spring Boot Version: 1.3
- Core - Web
- Next
- Project name: spring-boot-kafka-example
- The rest is just fine ...
- Finish
- After creating the project, check the SDK setting; it should be Java 8
build.gradle dependencies
compile('org.apache.kafka:kafka_2.11:0.9.0.0')
compile('org.apache.zookeeper:zookeeper:3.4.7')
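These two lines go inside the dependencies block that Initializr generated, next to the web starter. For orientation, the whole block should end up looking roughly like this (assuming the Boot 1.3 Gradle defaults):
dependencies {
    compile('org.springframework.boot:spring-boot-starter-web')
    compile('org.apache.kafka:kafka_2.11:0.9.0.0')
    compile('org.apache.zookeeper:zookeeper:3.4.7')
    testCompile('org.springframework.boot:spring-boot-starter-test')
}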
application.properties
brokerList=localhost:9092
sync=sync
topic=votes
SpringBootKafkaProducer
This is the class where all the important stuff is happening. Note the sync property from application.properties: when it's set to the literal string "sync", every send blocks until Kafka acknowledges it; any other value makes sends asynchronous.
package com.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Value;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

// No stereotype annotation here on purpose: the class is registered as a
// bean in AppBeans below, and annotating it as well would define it twice.
public class SpringBootKafkaProducer {
@Value("${brokerList}")
private String brokerList;
@Value("${sync}")
private String sync;
@Value("${topic}")
private String topic;
private Producer<String, String> producer;
public SpringBootKafkaProducer() {
}
    @PostConstruct
    public void initIt() {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", brokerList);
        kafkaProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // acks=1: the leader acknowledges the write without waiting for replicas.
        kafkaProps.put("acks", "1");
        // Retry a failed send once before propagating the error.
        kafkaProps.put("retries", "1");
        // Wait up to 5 ms so several records can be batched into one request.
        kafkaProps.put("linger.ms", 5);
        producer = new KafkaProducer<>(kafkaProps);
    }
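    @PreDestroy
    public void cleanUp() {
        // Closing the producer flushes buffered records and releases its
        // threads and connections when the application shuts down.
        producer.close();
    }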
    public void send(String value) throws ExecutionException,
            InterruptedException {
        if ("sync".equalsIgnoreCase(sync)) {
            sendSync(value);
        } else {
            sendAsync(value);
        }
    }
    private void sendSync(String value) throws ExecutionException,
            InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
        // send() returns a Future<RecordMetadata>; blocking on get() makes the
        // call synchronous and surfaces any send failure as an exception here.
        producer.send(record).get();
    }
    private void sendAsync(String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
        // The callback runs on the producer's I/O thread once the broker
        // responds; we just log failures and otherwise fire and forget.
        producer.send(record, (RecordMetadata recordMetadata, Exception e) -> {
            if (e != null) {
                e.printStackTrace();
            }
        });
    }
}
SpringBootKafkaExampleApplication
This one will be automatically generated.
package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootKafkaExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaExampleApplication.class, args);
    }
}
AppBeans
Set up beans for the controller. Since SpringBootKafkaProducer itself carries no stereotype annotation, this @Bean method is the single definition Spring uses to inject it.
package com.example;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppBeans {

    @Bean
    public SpringBootKafkaProducer initProducer() {
        return new SpringBootKafkaProducer();
    }
}
Helper beans
Status is returned to clients; we'll just send "ok" every time.
package com.example;

public class Status {

    private String status;

    public Status(String status) {
        this.status = status;
    }

    public Status() {
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }
}
Vote will be the input to our app; Spring maps the incoming JSON onto it.
package com.example;

public class Vote {

    private String name;

    public Vote(String name) {
        this.name = name;
    }

    public Vote() {
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
SpringBootKafkaController
This is the controller. After starting the app we should have an active endpoint available at http://localhost:8080/vote, mapped to POST since it expects a JSON body.
package com.example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

@RestController
public class SpringBootKafkaController {

    @Autowired
    SpringBootKafkaProducer springBootKafkaProducer;

    @RequestMapping(value = "/vote", method = RequestMethod.POST)
    public Status vote(@RequestBody Vote vote) throws ExecutionException, InterruptedException {
        springBootKafkaProducer.send(vote.getName());
        return new Status("ok");
    }
}
Checking everything
There should still be a console consumer running from the previous post, so we won't cover that
here. After running SpringBootKafkaExampleApplication, open a REST client like Postman and POST
the following JSON to http://localhost:8080/vote:
{
    "name": "Test"
}
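If you'd rather check from code than Postman, here's a minimal sketch using Spring's RestTemplate. The VoteClient class name is made up for illustration; the URL, Vote, and Status match the ones above.
package com.example;

import org.springframework.web.client.RestTemplate;

public class VoteClient {

    public static void main(String[] args) {
        RestTemplate restTemplate = new RestTemplate();
        // POST a Vote to the controller and read back the Status it returns.
        Status status = restTemplate.postForObject(
                "http://localhost:8080/vote", new Vote("Test"), Status.class);
        System.out.println(status.getStatus()); // prints "ok"
    }
}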
Either way, if everything went fine you should see the name you sent appear in the console
consumer. In Part 4 we'll go over how to pick up the data from Kafka with Spark Streaming,
combine it with data in Cassandra, and push the results back to Cassandra.