

KAFKA Integration with SpringBoot2: Sending and Consuming Messages (Basics)

Published: 2024/9/27 by 豆豆

Table of Contents

            • 1. Technology Stack
            • 2. Dependencies
            • 3. Kafka Configuration
            • 4. Producer (Synchronous)
            • 5. Producer (Asynchronous)
            • 6. Consumer

1. Technology Stack

| Software/Framework | Version          |
| ------------------ | ---------------- |
| JDK                | 1.8.0_202        |
| Spring Boot        | 2.5.4            |
| Kafka server       | kafka_2.12-2.8.0 |
| Kafka client       | 2.7.1            |
| ZooKeeper          | 3.7.0            |
2. Dependencies

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```
3. Kafka Configuration

properties version

```properties
spring.application.name=springboot-kafka
server.port=8080
# kafka configuration
spring.kafka.bootstrap-servers=node1:9092

# producer configuration
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# maximum size in bytes of one producer batch
spring.kafka.producer.batch-size=16384
# total buffer memory available to the producer, here 32M (32 * 1024 * 1024)
spring.kafka.producer.buffer-memory=33554432

# consumer configuration
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=springboot-consumer-02
# earliest - if no valid offset exists for this consumer group, reset to the earliest offset
spring.kafka.consumer.auto-offset-reset=earliest
# whether consumer offsets are committed automatically or manually; auto-commit here
spring.kafka.consumer.enable-auto-commit=true
# interval (ms) between automatic offset commits
spring.kafka.consumer.auto-commit-interval=1000
```

yml version

```yaml
server:
  port: 8080
spring:
  application:
    name: springboot-kafka
  kafka:
    bootstrap-servers: 192.168.92.104:9092
    consumer:
      auto-commit-interval: 1000
      auto-offset-reset: earliest
      enable-auto-commit: true
      group-id: springboot-consumer-02
      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      batch-size: 16384
      buffer-memory: 33554432   # 32M
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
```
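The producer controllers below write to partition 0 and partition 1 of `topic-springboot-01`, so the topic needs at least two partitions. If the topic is not created on the broker ahead of time, spring-kafka can declare it through a `NewTopic` bean, which the auto-configured `KafkaAdmin` applies on startup. A minimal sketch (the class and package names are illustrative, not part of the original project):

```java
package com.gblfy.demo.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

// Hypothetical configuration class: Spring Boot's auto-configured KafkaAdmin
// picks up NewTopic beans and creates the topics on the broker if missing.
@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic springbootTopic() {
        // two partitions, because the sample producers write to partitions 0 and 1
        return TopicBuilder.name("topic-springboot-01")
                .partitions(2)
                .replicas(1)
                .build();
    }
}
```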
4. Producer (Synchronous)

```java
package com.gblfy.demo.controller;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

@RestController
public class KafkaSyncController {

    private final static Logger log = LoggerFactory.getLogger(KafkaSyncController.class);

    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;

    @RequestMapping("/send/sync/{message}")
    public String send(@PathVariable String message) {
        // send the message, then block on the future so the send is synchronous
        ListenableFuture<SendResult<Integer, String>> future =
                kafkaTemplate.send("topic-springboot-01", 0, 0, message);
        try {
            SendResult<Integer, String> sendResult = future.get();
            RecordMetadata metadata = sendResult.getRecordMetadata();
            log.info("sent to topic: {}, partition: {}, offset: {}",
                    metadata.topic(), metadata.partition(), metadata.offset());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        return "success";
    }
}
```
5. Producer (Asynchronous)

```java
package com.gblfy.demo.controller;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaAsyncController {

    private final static Logger log = LoggerFactory.getLogger(KafkaAsyncController.class);

    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;

    // register a callback and let the broker acknowledgement arrive asynchronously
    @RequestMapping("/send/async/{message}")
    public String sendAsync(@PathVariable String message) {
        // send the message asynchronously
        ListenableFuture<SendResult<Integer, String>> future =
                kafkaTemplate.send("topic-springboot-01", 0, 1, message);
        future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable e) {
                log.error("failed to send message: {}", e.getMessage());
            }

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                RecordMetadata metadata = result.getRecordMetadata();
                log.info("sent to topic: {}, partition: {}, offset: {}",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }
        });
        return "success";
    }
}
```
6. Consumer

```java
package com.gblfy.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    private final static Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = {"topic-springboot-01"})
    public void onMessage(ConsumerRecord<Integer, String> record) {
        log.info("received message - topic: {}, partition: {}, offset: {}, key: {}, value: {}",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
}
```
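With `enable-auto-commit=true`, the listener above commits offsets on a timer, so a record can be marked consumed before processing finishes. A common variant for at-least-once processing is to commit manually after the record is handled. A sketch of that variant, assuming the configuration is changed to `spring.kafka.consumer.enable-auto-commit=false` and `spring.kafka.listener.ack-mode=manual_immediate` (the class name is illustrative):

```java
package com.gblfy.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

// Hypothetical manual-commit listener; requires enable-auto-commit=false
// and listener ack-mode=manual_immediate in the application configuration.
@Component
public class KafkaManualAckConsumer {

    private final static Logger log = LoggerFactory.getLogger(KafkaManualAckConsumer.class);

    @KafkaListener(topics = {"topic-springboot-01"})
    public void onMessage(ConsumerRecord<Integer, String> record, Acknowledgment ack) {
        log.info("received message - topic: {}, partition: {}, offset: {}, value: {}",
                record.topic(), record.partition(), record.offset(), record.value());
        // commit the offset only after the record has been processed successfully
        ack.acknowledge();
    }
}
```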
