My objective here is to show how Spring Kafka provides an abstraction to raw Kafka Producer and Consumer API’s that is easy to use and is familiar to someone with a Spring background.
Sample scenario
The sample scenario is a simple one, I have a system which produces a message and another which processes it
Implementation using Raw Kafka Producer/Consumer API’s
To start with I have used raw Kafka Producer and Consumer API’s to implement this scenario. If you would rather look at the code, I have it available in my github repo here.
Producer
The following sets up a KafkaProducer instance which is used for sending a message to a Kafka topic:
KafkaProducer<String, WorkUnit> producer = new KafkaProducer<>(kafkaProps, stringKeySerializer(), workUnitJsonSerializer());
I have used a variation of the KafkaProducer constructor which takes in a custom Serializer to convert the domain object to a json representation.
Once an instance of KafkaProducer is available, it can be used for sending a message to the Kafka cluster, here I have used a synchronous version of the sender which waits for a response to be back.
ProducerRecord<String, WorkUnit> record = new ProducerRecord<>("workunits", workUnit.getId(), workUnit); RecordMetadata recordMetadata = this.workUnitProducer.send(record).get();
Consumer
On the Consumer side we create a KafkaConsumer with a variation of the constructor taking in a Deserializer which knows how to read a json message and translate that to the domain instance:
KafkaConsumer<String, WorkUnit> consumer
= new KafkaConsumer<>(props, stringKeyDeserializer()
, workUnitJsonValueDeserializer());
Once an instance of KafkaConsumer is available a listener loop can be put in place which reads a batch of records, processes them and waits for more records to come through:
consumer.subscribe("workunits); try { while (true) { ConsumerRecords<String, WorkUnit> records = this.consumer.poll(100); for (ConsumerRecord<String, WorkUnit> record : records) { log.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } } finally { this.consumer.close(); }
Implementation using Spring Kafka
I have the implementation using Spring-kafka available in my github repo.
Producer
Spring-Kafka provides a KafkaTemplate class as a wrapper over the KafkaProducer to send messages to a Kafka topic:
@Bean public ProducerFactory<String, WorkUnit> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs(), stringKeySerializer(), workUnitJsonSerializer()); } @Bean public KafkaTemplate<String, WorkUnit> workUnitsKafkaTemplate() { KafkaTemplate<String, WorkUnit> kafkaTemplate = new KafkaTemplate<>(producerFactory()); kafkaTemplate.setDefaultTopic("workunits"); return kafkaTemplate; }
One thing to note is that whereas earlier I had implemented a custom Serializer/Deserializer to send a domain type as json and then to convert it back, Spring-Kafka provides Seralizer/Deserializer for json out of the box.
And using KafkaTemplate to send a message:
SendResult<String, WorkUnit> sendResult = workUnitsKafkaTemplate.sendDefault(workUnit.getId(), workUnit).get(); RecordMetadata recordMetadata = sendResult.getRecordMetadata(); LOGGER.info("topic = {}, partition = {}, offset = {}, workUnit = {}", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), workUnit);
Consumer
The consumer part is implemented using a Listener pattern that should be familiar to anybody who has implemented listeners for RabbitMQ/ActiveMQ. Here is first the configuration to set-up a listener container:
@Bean public ConcurrentKafkaListenerContainerFactory<String, WorkUnit> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, WorkUnit> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConcurrency(1); factory.setConsumerFactory(consumerFactory()); return factory; } @Bean public ConsumerFactory<String, WorkUnit> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerProps(), stringKeyDeserializer(), workUnitJsonValueDeserializer()); }
and the service which responds to messages read by the container:
@Service public class WorkUnitsConsumer { private static final Logger log = LoggerFactory.getLogger(WorkUnitsConsumer.class); @KafkaListener(topics = "workunits") public void onReceiving(WorkUnit workUnit, @Header(KafkaHeaders.OFFSET) Integer offset, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { log.info("Processing topic = {}, partition = {}, offset = {}, workUnit = {}", topic, partition, offset, workUnit); } }
Here all the complexities of setting up a listener loop like with the raw consumer is avoided and is nicely hidden by the listener container.
Conclusion
I have brushed over a lot of the internals of setting up batch sizes, variations in acknowledgement, different API signatures. My intention is just to demonstrate a common use case using the raw Kafka API’s and show how Spring-Kafka wrapper simplifies it.
If you are interested in exploring further, the raw producer consumer sample is available here and the Spring Kafka one here
Reference: | Spring Kafka Producer/Consumer sample from our JCG partner Biju Kunjummen at the all and sundry blog. |