除了在原始Kafka Producer和Consumer上更易于使用的抽象之外,优秀的Spring Kafka项目提供的简洁功能之一是在测试中使用Kafka的方法。 它通过提供可轻松设置和拆卸的Kafka嵌入式版本来实现此目的。
一个项目需要包括此支持的全部就是“ spring-kafka-test”模块,以便按以下方式构建gradle:
testCompile "org.springframework.kafka:spring-kafka-test:1.1.2.BUILD-SNAPSHOT"
请注意,我正在使用该项目的快照版本,因为它支持Kafka 0.10+。
有了此依赖关系后,可以使用JUnit的@ClassRule在测试中启动嵌入式Kafka:
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(2, true, 2, "messages");
这将启动具有2个代理的Kafka集群,其主题是使用2个分区的“消息”,并且类规则将确保在运行测试之前启动Kafka集群,然后在测试结束时将其关闭。
这是使用该嵌入式Kafka群集的Raw Kafka Producer / Consumer的示例的样子,嵌入式Kafka可用于检索Kafka Producer / Consumer所需的属性:
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
producer.send(new ProducerRecord<>("messages", 0, 0, "message0")).get();
producer.send(new ProducerRecord<>("messages", 0, 1, "message1")).get();
producer.send(new ProducerRecord<>("messages", 1, 2, "message2")).get();
producer.send(new ProducerRecord<>("messages", 1, 3, "message3")).get();Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sampleRawConsumer", "false", embeddedKafka);
consumerProps.put("auto.offset.reset", "earliest");final CountDownLatch latch = new CountDownLatch(4);
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);kafkaConsumer.subscribe(Collections.singletonList("messages"));try {while (true) {ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100);for (ConsumerRecord<Integer, String> record : records) {LOGGER.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",record.topic(), record.partition(), record.offset(), record.key(), record.value());latch.countDown();}}} finally {kafkaConsumer.close();}
});assertThat(latch.await(90, TimeUnit.SECONDS)).isTrue();
有一点更全面的测试,请点击这里
翻译自: https://www.javacodegeeks.com/2016/12/using-kafka-junit.html