日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

使用 kafka 提升你的订单接口吞吐量

發(fā)布時間:2025/3/20 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用 kafka 提升你的订单接口吞吐量 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

今日推薦

最適合晚上睡不著看的 8 個網(wǎng)站,建議收藏哦

23 種設(shè)計模式的通俗解釋,雖然有點污,但是秒懂請立即卸載這款 IDEA 插件!SQL自動檢查神器,再也不用擔(dān)心SQL出錯了,自動補全、回滾等功能大全最新 955 不加班的公司名單(2022版)

我們使用的是jdk自帶的隊列,實現(xiàn)了服務(wù)的吞吐量增加,但是我們知道的是,jdk的隊列時基于內(nèi)存的,即當(dāng)請求量很大的時候,大量的請求緩存在內(nèi)存當(dāng)中,對于內(nèi)存的要求還是很大的,不是很適合并發(fā)量很大的業(yè)務(wù)場景。

尤其是在電商的場景,都會通過消息隊列的削峰,解耦,從而提高系統(tǒng)的吞吐量,保證穩(wěn)定性。所以我們接下來,繼續(xù)對系統(tǒng)進行改進,引入kafka,進一步對于穩(wěn)定性進行完善。

關(guān)于kafka的安裝,介紹,集成,請參考文末鏈接。

一、引入Kafka

引入依賴:

<!--?https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka?--> <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.10.RELEASE</version> </dependency>

添加配置:

spring:kafka:bootstrap-servers:?172.16.3.29:9092producer:#?發(fā)生錯誤后,消息重發(fā)的次數(shù)。retries:?0#??????#當(dāng)有多個消息需要被發(fā)送到同一個分區(qū)時,生產(chǎn)者會把它們放在同一個批次里。該參數(shù)指定了一個批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計算。#??????batch-size:?16384#??????#?設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小。#??????buffer-memory:?33554432#??????#?鍵的序列化方式key-serializer:?org.apache.kafka.common.serialization.StringSerializer#??????#?值的序列化方式value-serializer:?org.apache.kafka.common.serialization.StringSerializer#??????# acks=0?:?生產(chǎn)者在成功寫入消息之前不會等待任何來自服務(wù)器的響應(yīng)。#??????# acks=1 :?只要集群的首領(lǐng)節(jié)點收到消息,生產(chǎn)者就會收到一個來自服務(wù)器成功響應(yīng)。#??????# acks=all :只有當(dāng)所有參與復(fù)制的節(jié)點全部收到消息時,生產(chǎn)者才會收到一個來自服務(wù)器的成功響應(yīng)。#??????acks:?1consumer:group-id:?test#??????#?自動提交的時間間隔?在spring?boot?2.X?版本中這里采用的是值的類型為Duration?需要符合特定的格式,如1S,1M,2H,5D#??????auto-commit-interval:?1S#??????#?該屬性指定了消費者在讀取一個沒有偏移量的分區(qū)或者偏移量無效的情況下該作何處理:#??????#?latest(默認(rèn)值)在偏移量無效的情況下,消費者將從最新的記錄開始讀取數(shù)據(jù)(在消費者啟動之后生成的記錄)#??????# earliest :在偏移量無效的情況下,消費者將從起始位置讀取分區(qū)的記錄#??????auto-offset-reset:?earliest#??????#?是否自動提交偏移量,默認(rèn)值是true,為了避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)置為false,然后手動提交偏移量#??????enable-auto-commit:?false#??????#?鍵的反序列化方式key-deserializer:?org.apache.kafka.common.serialization.StringDeserializer#??????#?值的反序列化方式value-deserializer:?org.apache.kafka.common.serialization.StringDeserializermax-poll-records:?150listener:#??????#?在偵聽器容器中運行的線程數(shù)。#??????concurrency:?5#??????#listner負責(zé)ack,每調(diào)用一次,就立即commit#??????ack-mode:?manual_immediatemissing-topics-fatal:?false

二、測試服務(wù)改造

2.1 消息生產(chǎn)者

提供一個簡單的生產(chǎn)者工具類,只有簡單的發(fā)送消息一個方法,參數(shù)是 topic 和 message。

import?org.springframework.beans.factory.annotation.Autowired; import?org.springframework.kafka.core.KafkaTemplate; import?org.springframework.stereotype.Component;/***?kafka生產(chǎn)者**?@author?weirx*?@date?2021/02/03?14:22**/ @Component public?class?KafkaProducer?{@Autowiredprivate?KafkaTemplate?kafkaTemplate;/***?kafka消息發(fā)送*?@param*?@author?weirx*?@return?void*?@date:?2021/2/3*/public?void?send(String?topic,String?message){kafkaTemplate.send(topic,message);} }

2.2 下單接口改造

引入KafkaProducer,下單時替換http請求成kafka推送,偽代碼如下:

@Autowired private?KafkaProducer?kafkaProducer; kafkaProducer.send("rob-necessities-order",JSONObject.toJSONString(map));

2.3 支付接口改造

在前面一篇文章當(dāng)中,支付接口是作為下單接口的回調(diào)接口被調(diào)用的,其實是一個完全同步的接口,易出現(xiàn)問題,如阻塞,請求失敗、超時等。

所以此處我們也將其改造成kafka異步消費,消費者工具類如下所示:

@Slf4j @Component public?class?KafkaConsumer?{@Autowiredprivate?TradingServiceImpl?tradingService;@KafkaListener(topics?=?{"rob-necessities-trading"})public?void?consumer(ConsumerRecord<?,??>?record)?{Optional<?>?kafkaMessage?=?Optional.ofNullable(record.value());if?(kafkaMessage.isPresent())?{Object?message?=?kafkaMessage.get();String?orderId?=?message.toString();log.info("支付開始時間***********************:{},訂單id:?{}",?LocalDateTime.now(),?orderId);tradingService.pay(Long.valueOf(orderId));log.info("支付完成時間/:{},訂單id:?{}",?LocalDateTime.now(),?orderId);}} }

三、訂單服務(wù)改造

3.1 支付回調(diào)改造

前面的支付方式,我們是在訂單完成后通過http接口形式,現(xiàn)在改用kafka,所以我們需要提供一個kafka消息生產(chǎn)者,將消息同送到測試服務(wù):

@Component public?class?KafkaProducer?{@Autowiredprivate?KafkaTemplate?kafkaTemplate;/***?kafka消息發(fā)送*?@param*?@author?weirx*?@return?void*?@date:?2021/2/3*/public?void?send(String?topic,String?message){kafkaTemplate.send(topic,message);} }

3.2 下單接口改造

接口下單命令的方式不再是等待http請求調(diào)用了,此處變成監(jiān)聽kafka,提供消費者如下:

@Slf4j @Component public?class?KafkaConsumer?{@Autowiredprivate?OrderService?orderService;@KafkaListener(topics?=?{"rob-necessities-order"})public?void?consumer(ConsumerRecord<?,??>?record)?{Optional<?>?kafkaMessage?=?Optional.ofNullable(record.value());if?(kafkaMessage.isPresent())?{Object?message?=?kafkaMessage.get();log.info("-----------------?record?="?+?record);log.info("------------------?message?="?+?message);JSONObject?jsonObject?=?JSONObject.parseObject(message.toString());OrderDTO?orderDTO?=?jsonObject.toJavaObject(OrderDTO.class);orderService.saveOrder(orderDTO);}} }

因為我們已經(jīng)使用了kafka作為并發(fā)時流量緩沖的組件,就不在需要我們前面自己添加進來的隊列了,所以改造后的下單接口如下所示:

@Autowired private?KafkaProducer?kafkaProducer;@Override public?Result?saveOrder(OrderDTO?orderDTO)?{//?下單實現(xiàn)Result?result?=?this.saveOrderImpl(orderDTO);String?orderId?=?JSONObject.parseObject(JSONObject.toJSONString(result.getData())).getString("id");kafkaProducer.send("rob-necessities-trading",?orderId);return?Result.success("下單成功"); }

如上所示,具體的訂單業(yè)務(wù)邏輯沒有變化。

四、測試

kafka是單節(jié)點的,在其他服務(wù)器上,因為我本地沒有內(nèi)存了。

全部完成時間大概是23秒,時間上有些許增加,但是整體吞吐量跟以前絕對不是一個量級的了。

另外,我們在調(diào)用支付接口的時候也可以通過kafka的形式,但是本文不做修改了。

補充

  • kafka安裝教程:https://juejin.cn/post/7004625159298482206

  • springboot集成kafka:https://juejin.cn/post/7005438980455923726

  • kafka專欄:https://juejin.cn/column/6996836630841524238

  • 本文項目代碼gitee地址:https://gitee.com/wei_rong_xin/rob-necessities.git

來源:juejin.cn/post/7068450775743070244

最后,再給大家推薦一個GitHub項目,該項目整理了上千本常用技術(shù)PDF,技術(shù)書籍都可以在這里找到。

GitHub地址:https://github.com/hello-go-maker/cs-books

電子書已經(jīng)更新好了,拿走不謝,記得點一個star,持續(xù)更新中...

總結(jié)

以上是生活随笔為你收集整理的使用 kafka 提升你的订单接口吞吐量的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。