Home » Java » Enterprise Java » Spring Kafka Producer/Consumer sample

About Biju Kunjummen

Spring Kafka Producer/Consumer sample

My objective here is to show how Spring Kafka provides an abstraction to raw Kafka Producer and Consumer API’s that is easy to use and is familiar to someone with a Spring background.

Sample scenario

The sample scenario is a simple one, I have a system which produces a message and another which processes it

Implementation using Raw Kafka Producer/Consumer API’s

To start with I have used raw Kafka Producer and Consumer API’s to implement this scenario. If you would rather look at the code, I have it available in my github repo here.


The following sets up a KafkaProducer instance which is used for sending a message to a Kafka topic:

KafkaProducer<String, WorkUnit> producer 
    = new KafkaProducer<>(kafkaProps, stringKeySerializer(), workUnitJsonSerializer());

I have used a variation of the KafkaProducer constructor which takes in a custom Serializer to convert the domain object to a json representation.

Once an instance of KafkaProducer is available, it can be used for sending a message to the Kafka cluster, here I have used a synchronous version of the sender which waits for a response to be back.

ProducerRecord<String, WorkUnit> record 
                = new ProducerRecord<>("workunits", workUnit.getId(), workUnit);

RecordMetadata recordMetadata = this.workUnitProducer.send(record).get();


On the Consumer side we create a KafkaConsumer with a variation of the constructor taking in a Deserializer which knows how to read a json message and translate that to the domain instance:

KafkaConsumer<String, WorkUnit> consumer
= new KafkaConsumer<>(props, stringKeyDeserializer()
, workUnitJsonValueDeserializer());

Once an instance of KafkaConsumer is available a listener loop can be put in place which reads a batch of records, processes them and waits for more records to come through:


try {
    while (true) {
        ConsumerRecords<String, WorkUnit> records = this.consumer.poll(100);
        for (ConsumerRecord<String, WorkUnit> record : records) {
            log.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());

} finally {

Implementation using Spring Kafka

I have the implementation using Spring-kafka available in my github repo.


Spring-Kafka provides a KafkaTemplate class as a wrapper over the KafkaProducer to send messages to a Kafka topic:

public ProducerFactory<String, WorkUnit> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), stringKeySerializer(), workUnitJsonSerializer());

public KafkaTemplate<String, WorkUnit> workUnitsKafkaTemplate() {
    KafkaTemplate<String, WorkUnit> kafkaTemplate =  new KafkaTemplate<>(producerFactory());
    return kafkaTemplate;

One thing to note is that whereas earlier I had implemented a custom Serializer/Deserializer to send a domain type as json and then to convert it back, Spring-Kafka provides Seralizer/Deserializer for json out of the box.

And using KafkaTemplate to send a message:

SendResult<String, WorkUnit> sendResult = 
    workUnitsKafkaTemplate.sendDefault(workUnit.getId(), workUnit).get();

RecordMetadata recordMetadata = sendResult.getRecordMetadata();

LOGGER.info("topic = {}, partition = {}, offset = {}, workUnit = {}",
        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), workUnit);


The consumer part is implemented using a Listener pattern that should be familiar to anybody who has implemented listeners for RabbitMQ/ActiveMQ. Here is first the configuration to set-up a listener container:

public ConcurrentKafkaListenerContainerFactory<String, WorkUnit> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, WorkUnit> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    return factory;

public ConsumerFactory<String, WorkUnit> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerProps(), stringKeyDeserializer(), workUnitJsonValueDeserializer());

and the service which responds to messages read by the container:

public class WorkUnitsConsumer {
    private static final Logger log = LoggerFactory.getLogger(WorkUnitsConsumer.class);

    @KafkaListener(topics = "workunits")
    public void onReceiving(WorkUnit workUnit, @Header(KafkaHeaders.OFFSET) Integer offset,
                            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info("Processing topic = {}, partition = {}, offset = {}, workUnit = {}",
                topic, partition, offset, workUnit);

Here all the complexities of setting up a listener loop like with the raw consumer is avoided and is nicely hidden by the listener container.


I have brushed over a lot of the internals of setting up batch sizes, variations in acknowledgement, different API signatures. My intention is just to demonstrate a common use case using the raw Kafka API’s and show how Spring-Kafka wrapper simplifies it.

If you are interested in exploring further, the raw producer consumer sample  is available here and the Spring Kafka one here

Reference: Spring Kafka Producer/Consumer sample from our JCG partner Biju Kunjummen at the all and sundry blog.

Do you want to know how to develop your skillset to become a Java Rockstar?

Subscribe to our newsletter to start Rocking right now!

To get you started we give you our best selling eBooks for FREE!


1. JPA Mini Book

2. JVM Troubleshooting Guide

3. JUnit Tutorial for Unit Testing

4. Java Annotations Tutorial

5. Java Interview Questions

6. Spring Interview Questions

7. Android UI Design


and many more ....


Receive Java & Developer job alerts in your Area

I have read and agree to the terms & conditions


Notify of

This site uses Akismet to reduce spam. Learn how your comment data is processed.

Inline Feedbacks
View all comments