Getting Started with Spring Kafka

It is easy to get started with Apache Kafka using Spring Kafka. A lot is abstracted away and left configurable for our convenience. In this article, we are going to write code that shows how little is needed to start producing and consuming events with Spring Kafka.

In the previous article, we looked at what Apache Kafka is and what it can be used for. If you haven’t checked that out yet, I recommend doing that before proceeding with this article.

As the title suggests, this article will focus on Java and the Spring Framework. The examples will use Spring Boot 2.0.0.M7 and Java 9. You should have no problems following along with a release version of Spring Boot 2 once it exists.

Spring Kafka dependency

Only one dependency is required for the tutorial. I recommend heading to one of my favourite sites, Spring Initializr, to create a project. Actually, you might want to create two projects: one for producing and one for consuming events. Even though you could run both the consumer and the producer on the same server, it is not as exciting.

If you want to add it to an existing project manually, add the following dependency.


<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.1.0.RELEASE</version>
</dependency>

Spring Kafka 2.1.0.RELEASE builds on Spring Framework 5, which means that you also have to use a Spring Boot 2.x release. As of today, Spring Boot 2 is only available as a milestone, so you also have to add the Spring Milestone Repository. The final release is expected any week now, so that might no longer be necessary by the time you read this article.

Producer

Configuration

We start by writing the configuration and creating the beans that make it possible to produce events.

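import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaConfiguration {

  // Broker host, defaults to localhost when kafka.host is not set
  @Value("${kafka.host:localhost}")
  private String kafkaHost;

  // Broker port, defaults to 9092 when kafka.port is not set
  @Value("${kafka.port:9092}")
  private int port;

  @Bean
  public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost + ":" + port);
    // Both keys and values are written as plain strings
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
  }

  @Bean
  public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }

}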

The KafkaTemplate that we create is what we use to write events. In this example, we only write strings, but the same approach works for any type of object. You just need a template and a serializer that match that type, as we will touch on later.

Sending

Since we have created a KafkaTemplate bean in our KafkaConfiguration, we can now use it to send data to Kafka.

We can do that by autowiring the template and using it to send.

// kafkaTemplate and personRepository are fields autowired into the controller.
// Person is assumed to be the entity type that the repository returns.
@GetMapping("/person/{name}")
public ResponseEntity<Person> getPerson(@PathVariable String name) {
    Optional<Person> person = personRepository.findById(name);

    if (person.isPresent()) {
        kafkaTemplate.send("Topic1", "GET /person/name OK > " + name);
        return ResponseEntity.ok(person.get());
    } else {
        kafkaTemplate.send("Topic1", "GET /person/name BadRequest > " + name);
        return ResponseEntity.badRequest().build();
    }
}

We send to Topic1 because that’s the topic we created in the previous Apache Kafka Introduction article, but you can, of course, send to any topic that you’ve created. We send plain strings because we created a KafkaTemplate for String. You can create a template for any object that can be serialized, typically to JSON via, for example, Jackson.
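To make the idea of "a different serializer" a bit more concrete, here is a minimal plain-Java sketch of what a value serializer for a custom type does. Everything in it is an assumption made for illustration: the Serializer interface is a local stand-in that mirrors only the core method of Kafka's org.apache.kafka.common.serialization.Serializer, the Person class with its name and age fields is hypothetical, and the JSON is written by hand where a real project would rely on Jackson.

```java
import java.nio.charset.StandardCharsets;

// Local stand-in for org.apache.kafka.common.serialization.Serializer,
// reduced to its core method so the sketch compiles without Kafka.
interface Serializer<T> {
    byte[] serialize(String topic, T data);
}

// Hypothetical payload type; the article does not show its Person class.
final class Person {
    final String name;
    final int age;

    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }
}

// Hand-written JSON encoding, only to show what a value serializer does.
final class PersonJsonSerializer implements Serializer<Person> {

    @Override
    public byte[] serialize(String topic, Person person) {
        if (person == null) {
            return null; // pass null values through unchanged
        }
        String json = "{\"name\":\"" + escape(person.name) + "\",\"age\":" + person.age + "}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Escapes only backslashes and double quotes; a JSON library covers far more.
    private static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}
```

In a real project you would most likely not write this class yourself. Spring Kafka ships a Jackson-backed JsonSerializer that can be set as the value serializer, with the producer factory and the KafkaTemplate typed for String keys and Person values.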

Consumer

Now that we are able to send data to Apache Kafka via Spring Kafka, it is time to consume it. You could, of course, have the producer and the consumer in the same application, but that doesn’t really make much sense. So, instead, I recommend creating another Spring Boot application that you can use for consuming it.

Configuration

Similar to how we created a configuration class for the producer, we are going to need one for the consumer as well.

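import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // Keys and values arrive as plain strings, matching the producer
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");

    return props;
  }

  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  }

  @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    return factory;
  }

  // Our own listener class (shown below), not the Kafka client's KafkaConsumer
  @Bean
  public KafkaConsumer kafkaConsumer() {
    return new KafkaConsumer();
  }
}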

Pretty standard stuff: we configure where to read the data from and tell the consumer, via the ConsumerConfig keys, how keys and values are deserialized. The group id defines the consumer group that this server is consuming on behalf of.
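To make the role of the group id a little more concrete: Kafka divides the partitions of a topic among the consumers that share a group id, so each message is handled by one member of the group. The following plain-Java sketch imitates that idea with a simple round-robin split. It is not Kafka's actual assignment strategy, and the class, method, and consumer names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a round-robin split, not Kafka's real partition assignor.
final class GroupAssignmentSketch {

    // Gives every partition to exactly one consumer of the group.
    static Map<String, List<Integer>> assign(int partitionCount, List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("a group needs at least one consumer");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two consumers in the same group share three partitions.
        System.out.println(assign(3, List.of("consumer-a", "consumer-b")));
        // prints {consumer-a=[0, 2], consumer-b=[1]}
    }
}
```

With a single consumer in the "helloworld" group, as in this article, that consumer is assigned every partition of Topic1. Starting a second instance with the same group id would split the partitions between the two.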

What is interesting here is the KafkaConsumer bean. This is our own class, not the Kafka client class of the same name, and it looks like the following.


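import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

public class KafkaConsumer {

  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

  // Invoked for every message that arrives on Topic1
  @KafkaListener(topics = "Topic1")
  public void receive(String payload) {
    LOGGER.info("Received payload='{}'", payload);
  }

}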

In the KafkaConsumer we annotate a method with @KafkaListener and assign it to a topic that it should listen to.

And that’s all that’s required to start consuming. As soon as the other service produces a message, the consumer will almost instantaneously pick it up.

2018-01-26 20:43:41.717  INFO 4619 --- [nio-8001-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : aaa7af6d4a11b29d
2018-01-26 20:43:42.008  INFO 4536 --- [ntainer#0-0-C-1] o.t.demo.skeleton.service.KafkaConsumer  : Received payload='GET /person/name OK > viktor'

Final words

As you probably agree, it is very easy to get started with Apache Kafka using Spring Kafka. With just three small classes, we can already produce and consume data. As discussed in the Apache Kafka Introduction article, in a more realistic scenario you will probably use Apache Kafka to integrate multiple services that pass large amounts of data around, and that is where you will start to notice its power. For now, you have the knowledge and the tools to start taking advantage of it.
