日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

在Junit上使用Kafka

發布時間:2023/12/3 编程问答 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 在Junit上使用Kafka 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

除了在原始Kafka Producer和Consumer上更易于使用的抽象之外,優秀的Spring Kafka項目提供的簡潔功能之一是在測試中使用Kafka的方法。 它通過提供可輕松設置和拆卸的Kafka嵌入式版本來實現此目的。

一個項目需要包括此支持的全部就是“ spring-kafka-test”模塊,以便按以下方式構建gradle:

testCompile "org.springframework.kafka:spring-kafka-test:1.1.2.BUILD-SNAPSHOT"

請注意,我正在使用該項目的快照版本,因為它支持Kafka 0.10+。

有了此依賴關系后,可以使用JUnit的@ClassRule在測試中啟動嵌入式Kafka:

@ClassRule public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(2, true, 2, "messages");

這將啟動具有2個代理的Kafka集群,其主題是使用2個分區的“消息”,并且類規則將確保在運行測試之前啟動Kafka集群,然后在測試結束時將其關閉。

這是使用該嵌入式Kafka群集的Raw Kafka Producer / Consumer的示例的樣子,嵌入式Kafka可用于檢索Kafka Producer / Consumer所需的屬性:

Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka); KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps); producer.send(new ProducerRecord<>("messages", 0, 0, "message0")).get(); producer.send(new ProducerRecord<>("messages", 0, 1, "message1")).get(); producer.send(new ProducerRecord<>("messages", 1, 2, "message2")).get(); producer.send(new ProducerRecord<>("messages", 1, 3, "message3")).get();Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sampleRawConsumer", "false", embeddedKafka); consumerProps.put("auto.offset.reset", "earliest");final CountDownLatch latch = new CountDownLatch(4); ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.execute(() -> {KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);kafkaConsumer.subscribe(Collections.singletonList("messages"));try {while (true) {ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100);for (ConsumerRecord<Integer, String> record : records) {LOGGER.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",record.topic(), record.partition(), record.offset(), record.key(), record.value());latch.countDown();}}} finally {kafkaConsumer.close();} });assertThat(latch.await(90, TimeUnit.SECONDS)).isTrue();

有一點更全面的測試,請點擊這里

翻譯自: https://www.javacodegeeks.com/2016/12/using-kafka-junit.html

總結

以上是生活随笔為你收集整理的在Junit上使用Kafka的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。