日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

KAFKA SpringBoot2 Nacos 消息异步发送和消费消息(进阶篇)

發布時間:2024/9/27 javascript 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 KAFKA SpringBoot2 Nacos 消息异步发送和消费消息(进阶篇) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

          • 一、基礎集成
            • 1. 技術選型
            • 2. 導入依賴
            • 3. kafka配置
            • 4. auto-offset-reset 簡述
            • 5. 新增一個訂單類
            • 6. 生產者(異步)
            • 7. 消費者
            • 8. kafka配置類
            • 9.單元測試
            • 9. 效果圖
            • 10. 源碼地址
            • 11.微服務專欄

一、基礎集成
1. 技術選型
軟件/框架版本
jdk1.8.0_202
springboot2.5.4
kafka serverkafka_2.12-2.8.0
kafka client2.7.1
zookeeper3.7.0
2. 導入依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
3. kafka配置

properties版本

spring.application.name=springboot-kafka server.port=8080 # kafka 配置 spring.kafka.bootstrap-servers=node1:9092# producer 配置 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer # 生產者每個批次最多方多少條記錄 spring.kafka.producer.batch-size=16384 # 生產者一端總的可用緩沖區大小,此處設置為32M * 1024 * 1024 spring.kafka.producer.buffer-memory=33544432# consumer 配置 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.group-id=springboot-consumer-02 # earliest - 如果找不到當前消費者的有效偏移量,則自動重置向到最開始 spring.kafka.consumer.auto-offset-reset=earliest # 消費者的偏移量是自動提交還是手動提交,此處自動提交偏移量 spring.kafka.consumer.enable-auto-commit=true # 消費者偏移量自動提交時間間隔 spring.kafka.consumer.auto-commit-interval=1000

yml版本項目內部配置

server:port: 8002 spring:application:# 應用名稱name: ly-kafkaprofiles:# 環境配置active: devcloud:nacos:discovery:# 服務注冊地址server-addr: nacos.server.com:8848config:# 配置中心地址server-addr: nacos.server.com:8848# 配置文件格式file-extension: yml# 共享配置shared-configs:- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}

nacos-config 服務端配置

在這里插入代碼片
4. auto-offset-reset 簡述

關于
auto.offset.reset 配置有3個值可以設置,分別如下:

earliest:當各分區下有已提交的 offset 時,從提交的 offset 開始消費;無提交的 offset時,從頭開始消費;
latest:當各分區下有已提交的 offset 時,從提交的 offset 開始消費;無提交的 offset 時,消費新產生的該分區下的數據;
none: topic 各分區都存在已提交的 offset 時,從 offset 后開始消費;只要有一個分區不存在已提交的 offset,則拋出異常;
默認建議用 earliest, 設置該參數后 kafka出錯后重啟,找到未消費的offset可以繼續消費。

而 latest 這個設置容易丟失消息,假如 kafka 出現問題,還有數據往topic中寫,這個時候重啟kafka,這個設置會從最新的offset開始消費, 中間出問題的哪些就不管了。

none 這個設置沒有用過,兼容性太差,經常出問題。

5. 新增一個訂單類

模擬業務系統中,用戶每下一筆訂單,就發送一個消息,供其他服務消費

package com.gblfy.kafka.entity;import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor;import java.time.LocalDateTime;@Data @Builder @AllArgsConstructor @NoArgsConstructor public class Order {/*** 訂單id*/private long orderId;/*** 訂單號*/private String orderNum;/*** 訂單創建時間*/private LocalDateTime createTime; }
6. 生產者(異步)
package com.gblfy.lykafka.provider;import com.alibaba.fastjson.JSONObject; import com.gblfy.common.constant.KafkaTopicConstants; import com.gblfy.common.entity.Order; import org.apache.kafka.clients.producer.RecordMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback;import java.time.LocalDateTime;/*** Kafka生產者** @author gblfy* @date 2021-09-28*/ @Service public class KafkaProvider {private final static Logger log = LoggerFactory.getLogger(KafkaProvider.class);@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {// 構建一個訂單類Order order = Order.builder().orderId(orderId).orderNum(orderNum).createTime(createTime).build();// 發送消息,訂單類的 json 作為消息體ListenableFuture<SendResult<String, String>> future =kafkaTemplate.send(KafkaTopicConstants.KAFKA_MSG_TOPIC, JSONObject.toJSONString(order));// 監聽回調future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable e) {log.info("發送消息失敗: {}", e.getMessage());}@Overridepublic void onSuccess(SendResult<String, String> result) {RecordMetadata metadata = result.getRecordMetadata();log.info("發送的主題:{} ,發送的分區:{} ,發送的偏移量:{} ",metadata.topic(), metadata.partition(), metadata.offset());}});} }
7. 消費者
package com.gblfy.lykafka.controller;import com.gblfy.lykafka.provider.KafkaProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;@RestController @RequestMapping("/kafka") public class KafkaProviderController {@Autowiredprivate KafkaProvider kafkaProvider;@GetMapping("/sendMQ")public String sendMQContent() {kafkaProvider.sendMessage(0001, "10", LocalDateTime.now());return "OK";} }

通過 @KafkaListener注解,我們可以指定需要監聽的 topic 以及 groupId, 注意,這里的 topics 是個數組,意味著我們可以指定多個 topic,如:@KafkaListener(topics = {“topic-springboot-01”, “topic-springboot-02”}, groupId = “group_id”)。

注意:消息發布者的 TOPIC 需要保持與消費者監聽的 TOPIC 一致,否者消費不到消息。

8. kafka配置類
package com.gblfy.common.constant;public class KafkaTopicConstants {//kafka發送消息主題public static final String KAFKA_MSG_TOPIC = "topic-springboot-01";// kafka消費者組需要和yml文件中的 kafka.consumer.group-id的值保持一致public static final String KAFKA_MSG_TOPIC_GROUP = "springboot-consumer-02"; }
9.單元測試

新建單元測試,功能測試消息發布,以及消費。

package com.gblfy.kafka;import com.gblfy.kafka.controller.KafkaProvider; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;import java.time.LocalDateTime; import java.util.UUID; import java.util.concurrent.TimeUnit;@SpringBootTest class KafkaSpringbootApplicationTests {@Autowiredprivate KafkaProvider kafkaProvider;@Testpublic void sendMessage() throws InterruptedException {// 發送 1000 個消息for (int i = 0; i < 1000; i++) {long orderId = i+1;String orderNum = UUID.randomUUID().toString();kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());}TimeUnit.MINUTES.sleep(1);} }
9. 效果圖


10. 源碼地址

https://gitee.com/gb_90/kafka-parent

11.微服務專欄

https://gitee.com/gb_90/micro-service-parent

總結

以上是生活随笔為你收集整理的KAFKA SpringBoot2 Nacos 消息异步发送和消费消息(进阶篇)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。