

Spring Boot + Kafka integration demo, with a look at the source code

Published: 2023/12/10

Hi everyone, I'm 烤鸭:

Today I'll share how to integrate Kafka with Spring Boot.

1. Environment

Windows + kafka_2.11-2.3.0 + zookeeper-3.5.6 + Spring Boot 2.3.0

2. Download and install ZooKeeper and Kafka

ZooKeeper:

https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz

Copy zoo_sample.cfg, rename it to zoo.cfg, and add the data and log paths:

dataDir=D:\xxx\env\apache-zookeeper-3.5.6-bin\data
dataLogDir=D:\xxx\env\apache-zookeeper-3.5.6-bin\log

Start ZooKeeper with zkServer.cmd.

Kafka:

https://kafka.apache.org/downloads

Under Binary downloads, grab:
https://archive.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz

Edit config/server.properties. Since ZooKeeper is on its default port 2181, the connection setting can stay as-is; only the log directory needs to change:

log.dirs=D:\\xxx\\env\\kafka\\logs

Start Kafka:

D:\xxx\env\kafka\bin\windows\kafka-server-start.bat D:\xxx\env\kafka\config\server.properties

3. Spring Boot integration

pom.xml

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>2.3.0.RELEASE</version>
        <scope>compile</scope>
    </dependency>
</dependencies>

application.yml

spring:
  kafka:
    # Kafka broker addresses; for a cluster, separate multiple entries with commas
    bootstrap-servers: 127.0.0.1:9092
    # Producer
    producer:
      # Number of retries on a failed write. When the leader node fails, a replica
      # takes over as the new leader, and writes may fail during that window.
      # With retries set to 0 the producer does not resend; with retries enabled,
      # the resend happens once the replica has fully become leader, so no messages are lost.
      retries: 0
      # Batch size in bytes: the producer accumulates records and sends them in one batch
      batch-size: 16384
      # Total memory the producer may use to buffer records awaiting send;
      # data is flushed once the buffer fills up
      buffer-memory: 33554432
      # Serializers for the message key and value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        linger.ms: 1
    # Consumer
    consumer:
      enable-auto-commit: false
      auto-commit-interval: 100ms
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session.timeout.ms: 15000
      group-id: group
server:
  port: 8081

KafkaDemoController.java

package com.mys.mys.demo.kafka.web;

import com.mys.mys.demo.kafka.service.KafkaSendService;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

@RestController
public class KafkaDemoController {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    KafkaSendService kafkaSendService;

    @GetMapping("/message/send")
    public boolean send(@RequestParam String message) {
        // Topics are auto-created by default (consumer side: allow.auto.create.topics = true)
        //createTopic();
        kafkaTemplate.send("testTopic-xxx15", message);
        return true;
    }

    // Synchronous example
    @GetMapping("/message/sendSync")
    public boolean sendSync(@RequestParam String message) {
        kafkaSendService.sendSync("synctopic", message);
        return true;
    }

    // Asynchronous example
    @GetMapping("/message/sendAnsyc")
    public boolean sendAnsys(@RequestParam String message) {
        kafkaSendService.sendAnsyc("ansyctopic", message);
        return true;
    }

    /**
     * Create a topic manually.
     */
    private void createTopic() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        KafkaAdmin admin = new KafkaAdmin(configs);
        NewTopic newTopic = new NewTopic("testTopic-xxx15", 1, (short) 1);
        AdminClient adminClient = AdminClient.create(admin.getConfigurationProperties());
        adminClient.createTopics(Arrays.asList(newTopic));
    }
}

KafkaSendService.java

package com.mys.mys.demo.kafka.service;

import com.mys.mys.demo.kafka.handler.KafkaSendResultHandler;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

@Service
public class KafkaSendService {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    private KafkaSendResultHandler producerListener;

    /**
     * Asynchronous example.
     */
    public void sendAnsyc(final String topic, final String message) {
        // Shared listener for all sends
        kafkaTemplate.setProducerListener(producerListener);
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, message);
        // Business-specific handling goes in a per-call callback
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onSuccess(SendResult<String, Object> result) {
                System.out.println("Message sent successfully: " + result);
            }

            @Override
            public void onFailure(Throwable ex) {
                System.out.println("Message send failed: " + ex.getMessage());
            }
        });
    }

    /**
     * Synchronous example.
     */
    public void sendSync(final String topic, final String message) {
        ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, message);
        try {
            kafkaTemplate.send(producerRecord).get(10, TimeUnit.SECONDS);
            System.out.println("Message sent successfully");
        } catch (ExecutionException e) {
            System.out.println("Message send failed: " + e.getMessage());
        } catch (TimeoutException | InterruptedException e) {
            System.out.println("Message send failed: " + e.getMessage());
        }
    }
}

CustomerListener.java

package com.mys.mys.demo.kafka.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class CustomerListener {

    @KafkaListener(topics = "testTopic")
    public void onMessage(String message) {
        System.out.println("Consumed: " + message);
    }

    @KafkaListener(topics = "testTopic-xxx14")
    public void onMessage1(String message) {
        System.out.println("Consumed: " + message);
    }

    @KafkaListener(topics = "testTopic-xxx15")
    public void onMessage15(String message) {
        System.out.println("Consumed: " + message);
    }
}

KafkaSendResultHandler.java (receives the results of asynchronous sends)

package com.mys.mys.demo.kafka.handler;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaSendResultHandler implements ProducerListener {

    private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);

    @Override
    public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
        log.info("Message send success : " + producerRecord.toString());
    }

    @Override
    public void onError(ProducerRecord producerRecord, Exception exception) {
        log.info("Message send error : " + producerRecord.toString());
    }
}

4. Results and a look at the source

Have a look at the application startup log: it shows the partitions and topic names the consumer is listening on. Also, if a topic does not exist in Kafka yet, it is created automatically when send is called; the setting behind this is
auto.create.topics.enable, which defaults to true.

Request URL: http://localhost:8081/message/send?message=1234
Output:

Take a look at the ProducerRecord class. Skipping the methods for now, here are its fields.

public class ProducerRecord<K, V> {
    // Topic name
    private final String topic;
    // Partition number; if not specified, assigned by hashing the key.
    // If there is no key either, partitions are assigned round-robin.
    private final Integer partition;
    // Headers, for metadata beyond the key/value; read-only by default
    private final Headers headers;
    // Key and value
    private final K key;
    private final V value;
    // Timestamp; if not set, the server time is used
    private final Long timestamp;
}
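To make the "hash the key, otherwise round-robin" rule concrete, here is a minimal, broker-free sketch. Note the assumptions: the real DefaultPartitioner applies murmur2 to the serialized key bytes, and newer clients use sticky assignment for null keys; the plain polynomial hash and the pin-to-0 fallback below are simplifications for illustration only.

```java
public class PartitionSketch {

    // Simplified stand-in for the producer's partition choice.
    // Real Kafka uses murmur2 on the key bytes; this hash is illustrative only.
    static int partitionFor(String key, int numPartitions) {
        if (key == null) {
            // The real producer round-robins (or uses sticky partitioning);
            // pinning to 0 keeps this sketch deterministic.
            return 0;
        }
        int hash = 0;
        for (char c : key.toCharArray()) {
            hash = 31 * hash + c;
        }
        // Mask off the sign bit so the modulo result is non-negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always lands on the same partition.
        System.out.println(partitionFor("order-42", 3) == partitionFor("order-42", 3)); // true
        System.out.println(partitionFor(null, 3)); // 0
    }
}
```

This is why sending all messages for one business key (an order ID, say) with that key preserves their relative order: they all go through a single partition.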

Now look at Producer, in particular its send method. Kafka lets you receive the result of a send either synchronously or asynchronously; both are built on a Future. The asynchronous path simply runs a callback when the future completes, and interceptors are supported as well.

/**
 * The interface for the {@link KafkaProducer}
 * @see KafkaProducer
 * @see MockProducer
 */
public interface Producer<K, V> extends Closeable {

    /**
     * See {@link KafkaProducer#send(ProducerRecord)}
     */
    Future<RecordMetadata> send(ProducerRecord<K, V> record);

    /**
     * See {@link KafkaProducer#send(ProducerRecord, Callback)}
     */
    Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
}
```
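Since both overloads return a Future, the sync/async split is really just two ways of consuming the same object. A broker-free sketch of that pattern, using CompletableFuture as a stand-in (the send method here is a fake with an assumed name, not the Kafka API):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class SendPatternSketch {

    // Stand-in for KafkaTemplate.send(): completes with fake "metadata"
    // once the simulated send finishes.
    static CompletableFuture<String> send(String message) {
        return CompletableFuture.supplyAsync(() -> "offset-for:" + message);
    }

    public static void main(String[] args) throws Exception {
        // Synchronous style: block until the result, with a timeout,
        // which is what kafkaTemplate.send(record).get(10, SECONDS) does above.
        String syncResult = send("hello").get(10, TimeUnit.SECONDS);
        System.out.println(syncResult);

        // Asynchronous style: register callbacks instead of blocking,
        // mirroring ListenableFuture.addCallback(onSuccess/onFailure).
        send("world")
            .thenAccept(meta -> System.out.println("success: " + meta))
            .exceptionally(ex -> { System.out.println("failure: " + ex); return null; })
            .get(); // only so this demo JVM waits before exiting
    }
}
```

The trade-off is the usual one: the synchronous style gives you a definite outcome before the HTTP request returns, while the asynchronous style keeps the request thread free at the cost of handling failures in a callback.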

For more detail, this article explains it very well:

https://www.cnblogs.com/dingwpmz/p/12153036.html

A quick summary:
The producer's send method does not send data to the broker directly. It checks whether the message exceeds the size limit and whether a transaction is open, then appends the record to a buffer. When the current batch is full, or a new batch has to be created, it wakes the Sender thread (the message-sending thread), which ships the buffered messages to the broker. The buffer is organized as one double-ended queue per topic-partition (an ArrayDeque) whose elements are ProducerBatch objects, each representing one batch; in other words, Kafka sends messages batch by batch.

