springboot系列八、springboot整合kafka
生活随笔
收集整理的這篇文章主要介紹了
springboot系列八、springboot整合kafka
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
背景:
當(dāng)業(yè)務(wù)在同一時間出現(xiàn)高并發(fā)的時候,這個時候我們不想無限的增加服務(wù)器,但是又想提高吞吐量。這時可以考慮使用消息異步處理,進(jìn)行消峰填谷;同時還可以降低耦合度。常見的消息中間件有kafka,rabbitMQ,activeMQ,rocketMQ。其中性能最好的,吞吐量最高的是以kafka為代表,下面介紹kafka用法。kafka詳細(xì)原理介紹,參考kafka系列:https://www.cnblogs.com/wangzhuxing/category/1351802.html。
一、引入依賴
<!--kafka支持--> <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency>二、配置yml
spring:kafka: # 指定kafka 代理地址,可以多個bootstrap-servers: 47.52.199.52:9092template: # 指定默認(rèn)topic iddefault-topic: producerlistener: # 指定listener 容器中的線程數(shù),用于提高并發(fā)量concurrency: 5consumer:group-id: myGroup # 指定默認(rèn)消費者group idclient-id: 200max-poll-records: 200auto-offset-reset: earliest # 最早未被消費的offsetproducer:batch-size: 1000 # 每次批量發(fā)送消息的數(shù)量retries: 3client-id: 200三、生成者使用示例
package com.example.demo.kafka;import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture;import java.util.concurrent.ExecutionException;@Component public class Producer {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;/*** 發(fā)送消息到kafka*/public RecordMetadata sendChannelMess(String topic, String message) {ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic,message);RecordMetadata recordMetadata = null;try {recordMetadata = future.get().getRecordMetadata();} catch (InterruptedException|ExecutionException e) {e.printStackTrace();System.out.println("發(fā)送失敗");}System.out.println("發(fā)送成功");System.out.println("partition:"+recordMetadata.partition());System.out.println("offset:"+recordMetadata.offset());System.out.println("topic:"+recordMetadata.topic());return recordMetadata;} }四、消費者使用示例
package com.example.demo.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component;import java.util.List;@Component public class Consumer {/*** 有消息就讀取,只讀取消息value*/@KafkaListener(topics = {"test13"})public void receiveMessage(String message){//收到通道的消息之后執(zhí)行秒殺操作 System.out.println(message);}/*** 有消息就讀取,批量讀取消息value*/@KafkaListener(topics = "test12")public void onMessage(List<String> crs) {for(String str : crs){System.out.println("test12:" + str);}}/*** 有消息就讀取,讀取消息topic,offset,key,value等信息*/@KafkaListener(topics = "test14")public void listenT1(ConsumerRecord<?, ?> cr){System.out.println("listenT1收到消息,topic:>>>" + cr.topic() + " offset:>>" + cr.offset()+ " key:>>" + cr.key() + " value:>>" + cr.value());} }?
轉(zhuǎn)載于:https://www.cnblogs.com/wangzhuxing/p/10186666.html
總結(jié)
以上是生活随笔為你收集整理的springboot系列八、springboot整合kafka的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Oracle发布了Java SE支持路线
- 下一篇: 一次Maven依赖冲突采坑,把依赖调解、