Using Kafka with Junit
One of the neat features that the excellent Spring Kafka project provides, apart from an easier-to-use abstraction over the raw Kafka Producer and Consumer, is a way to use Kafka in tests. It does this by providing an embedded version of Kafka that can be set up and torn down very easily.
To include this support, a project only needs the “spring-kafka-test” module. For a Gradle build, add it the following way:
testCompile "org.springframework.kafka:spring-kafka-test:1.1.2.BUILD-SNAPSHOT"
Note that I am using a snapshot version of the project as this has support for Kafka 0.10+.
With this dependency in place, an Embedded Kafka can be spun up in a test using the @ClassRule of JUnit:
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(2, true, 2, "messages");
This starts a Kafka cluster with 2 brokers and a topic called “messages” with 2 partitions. The class rule makes sure that the cluster is spun up before the tests run and shut down once they finish.
Here is what a sample using the raw Kafka Producer/Consumer with this embedded Kafka cluster looks like. The embedded Kafka can be used to retrieve the properties required by the Kafka Producer/Consumer:
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
// ProducerRecord arguments: topic, partition, key, value
producer.send(new ProducerRecord<>("messages", 0, 0, "message0")).get();
producer.send(new ProducerRecord<>("messages", 0, 1, "message1")).get();
producer.send(new ProducerRecord<>("messages", 1, 2, "message2")).get();
producer.send(new ProducerRecord<>("messages", 1, 3, "message3")).get();

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sampleRawConsumer", "false", embeddedKafka);
// Read from the beginning of the topic, since the messages were sent before the consumer subscribed
consumerProps.put("auto.offset.reset", "earliest");

// Counted down once per consumed record; the test expects all 4 messages
final CountDownLatch latch = new CountDownLatch(4);
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
    KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
    kafkaConsumer.subscribe(Collections.singletonList("messages"));
    try {
        while (true) {
            ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<Integer, String> record : records) {
                LOGGER.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
                latch.countDown();
            }
        }
    } finally {
        kafkaConsumer.close();
    }
});
assertThat(latch.await(90, TimeUnit.SECONDS)).isTrue();
A slightly more comprehensive test is available here
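The latch-and-executor pattern that the test above relies on can also be tried without a broker. What follows is only a minimal sketch under stated assumptions, not part of Spring Kafka: the class `LatchAwaitSketch`, the method `consumeAll` and the in-memory `BlockingQueue` standing in for the “messages” topic are all hypothetical names of my own. It also stops the polling loop once the wait is over, which the sample above leaves running.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class LatchAwaitSketch {

    /**
     * Hypothetical helper: consumes messages from an in-memory queue on a background
     * thread and reports whether `expected` messages arrived before the timeout.
     */
    public static boolean consumeAll(List<String> messages, int expected, long timeoutMillis) {
        // Assumption: the queue is a stand-in for the Kafka topic, pre-filled by the "producer".
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(messages);
        CountDownLatch latch = new CountDownLatch(expected);
        AtomicBoolean running = new AtomicBoolean(true);
        ExecutorService executor = Executors.newSingleThreadExecutor();

        executor.execute(() -> {
            try {
                while (running.get()) {
                    // Short poll timeout, similar in spirit to kafkaConsumer.poll(100).
                    String message = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (message != null) {
                        latch.countDown();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // shutdownNow() ends the loop this way
            }
        });

        try {
            // The test thread blocks here until every message is seen or the timeout elapses.
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            running.set(false);
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("message0", "message1", "message2", "message3");
        System.out.println(consumeAll(messages, 4, 2000)); // prints "true"
        System.out.println(consumeAll(messages, 5, 300));  // prints "false": a fifth message never arrives
    }
}
```

In a real test the same idea would apply to the Kafka consumer thread: a flag checked in the `while` condition, or a call to `executorService.shutdownNow()` after the assertion, keeps the consumer from polling after the test has finished.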
Reference: Using Kafka with Junit from our JCG partner Biju Kunjummen at the all and sundry blog.


testCompile group: 'org.springframework', name: 'spring-test', version: '5.0.6.RELEASE'
After adding this I am getting the following error. Can you please help?
java.lang.NoClassDefFoundError: org/I0Itec/zkclient/serialize/ZkSerializer