It is easy to get started with Apache Kafka using Spring Kafka: a lot is abstracted away and left configurable for our convenience. In this article, we are going to write code to demonstrate just how easy it is.
In the previous article, we looked at what Apache Kafka is and what it can be used for. If you haven’t checked that out yet, I recommend doing that before proceeding with this article.
As the title suggests, this article will focus on Java and the Spring Framework. The examples use Spring Boot 2.0.0.M7 and Java 9, but it should be no problem to follow along with a release version of Spring Boot 2 once it exists.
Spring Kafka dependency
Only one dependency is required for this tutorial. I recommend heading to one of my favourite sites, Spring Initializr, to create a project. Actually, you might want to create two projects: one for producing events and one for consuming them. You could run both the producer and the consumer on the same server, but it is not as exciting.
If you want to add it to an existing project manually, add the following dependency to your pom.xml.

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>
Spring Kafka 2.1.0.RELEASE is built on Spring Framework 5, which means that you also have to use a Spring Boot 2.x release. As of today, that in turn means adding the Spring Milestone Repository. The Spring Boot 2 release is expected any week now, though, so that might no longer be necessary by the time you read this article.
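In Maven, declaring the Spring milestone repository in your pom.xml looks like this:

<repositories>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/milestone</url>
    </repository>
</repositories>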
Producer
Configuration
We are going to start by writing the configuration and creating the beans needed to produce events.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaConfiguration {

    // Broker address, overridable via application properties.
    @Value("${kafka.host:localhost}")
    private String kafkaHost;

    @Value("${kafka.port:9092}")
    private int port;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost + ":" + port);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
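The kafka.host and kafka.port values above default to localhost and 9092, so the configuration works out of the box against a local broker. If your broker runs elsewhere, you can override them in application.properties; the host name below is just a placeholder:

kafka.host=my-kafka-broker
kafka.port=9092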
The KafkaTemplate that we create is what we will use to write events. In this example we are only going to write strings, but the same approach works for any type of object; you just need a different template and serializer, which we will go through later on.
Sending
Since we have created a KafkaTemplate bean in our KafkaConfiguration, we can now autowire it and use it to send data to Kafka.
// The template and repository are autowired into the controller.
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Autowired
private PersonRepository personRepository;

@GetMapping("/person/{name}")
public ResponseEntity<Person> getPerson(@PathVariable String name) {
    Optional<Person> person = personRepository.findById(name);
    if (person.isPresent()) {
        kafkaTemplate.send("Topic1", "GET /person/name OK > " + name);
        return ResponseEntity.ok(person.get());
    } else {
        kafkaTemplate.send("Topic1", "GET /person/name BadRequest > " + name);
        return ResponseEntity.badRequest().body(null);
    }
}
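To try it out, you can hit the endpoint with curl. The port and name below are assumptions based on the log output shown later in the article:

curl http://localhost:8001/person/viktor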
We send to Topic1 because that is the topic we created in the previous Apache Kafka Introduction article, but you can, of course, send to any topic that you have created. Similarly, because we created a KafkaTemplate for Strings, we send strings, but you can create a template for any object that can be serialized, normally to JSON via, for example, Jackson.
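As a sketch of what that could look like: Spring Kafka ships a Jackson-based JsonSerializer in org.springframework.kafka.support.serializer. Assuming the Person class from the controller example, a second factory and template could be declared alongside the existing beans:

// A minimal sketch of a JSON-producing template, assuming the Person class used above.
@Bean
public ProducerFactory<String, Person> personProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost + ":" + port);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // JsonSerializer uses Jackson to turn the object into a JSON string.
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Person> personKafkaTemplate() {
    return new KafkaTemplate<>(personProducerFactory());
}

With that in place, personKafkaTemplate.send("Topic1", person) would publish the object as JSON instead of a plain string.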
Consumer
Now that we are able to send data to Apache Kafka via Spring Kafka, it is time to consume it. You could, of course, have the producer and the consumer in the same application, but that doesn’t really make much sense. So, instead, I recommend creating another Spring Boot application that you can use for consuming it.
Configuration
Similar to how we created a configuration class for the producer, we are going to need one for the consumer as well.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public KafkaConsumer kafkaConsumer() {
        return new KafkaConsumer();
    }
}
Pretty standard stuff: we create a config that tells the consumer where to read the data from and how it should be deserialized. The group id defines the consumer group that this application is consuming on behalf of.
What is interesting here is the KafkaConsumer bean, which is our own class. It looks like the following.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    // Invoked for every record published to Topic1.
    @KafkaListener(topics = "Topic1")
    public void receive(String payload) {
        LOGGER.info("Received payload='{}'", payload);
    }
}
In the KafkaConsumer we annotate a method with @KafkaListener and assign it the topic it should listen to.
And that’s all that’s required to start consuming. As soon as the other service produces a message, the consumer will almost instantaneously pick it up.
2018-01-26 20:43:41.717 INFO 4619 --- [nio-8001-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : aaa7af6d4a11b29d
2018-01-26 20:43:42.008 INFO 4536 --- [ntainer#0-0-C-1] o.t.demo.skeleton.service.KafkaConsumer : Received payload='GET /person/name OK > viktor'
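As promised earlier, strings are not the only option. If a producer publishes objects as JSON, the consumer side needs a matching deserializer; Spring Kafka ships a JsonDeserializer next to the JsonSerializer. A minimal sketch, assuming the same Person class is available on the consumer's classpath:

// Minimal sketch of a JSON-consuming factory; pairs with the JsonSerializer sketch earlier.
@Bean
public ConsumerFactory<String, Person> personConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");
    // The deserializers are passed in directly instead of via the property map.
    return new DefaultKafkaConsumerFactory<>(props,
            new StringDeserializer(),
            new JsonDeserializer<>(Person.class));
}

A listener container factory built on this consumer factory would then let a @KafkaListener method receive Person arguments instead of plain strings.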
Final words
As you probably agree, it is super easy to get started with Apache Kafka using Spring Kafka. With just three small classes we can already start producing and consuming data. As we went through in the Apache Kafka Introduction article, in a more realistic scenario you will probably use Apache Kafka to integrate multiple services where tons of data gets passed around, and that is where you will first start to notice the power of Apache Kafka. But at least now you have the knowledge and the tools to start taking advantage of it.