使用 kafka 提升你的订单接口吞吐量
今日推薦
最適合晚上睡不著看的 8 個網(wǎng)站,建議收藏哦
我們使用的是jdk自帶的隊列,實現(xiàn)了服務(wù)的吞吐量增加,但是我們知道的是,jdk的隊列時基于內(nèi)存的,即當(dāng)請求量很大的時候,大量的請求緩存在內(nèi)存當(dāng)中,對于內(nèi)存的要求還是很大的,不是很適合并發(fā)量很大的業(yè)務(wù)場景。
尤其是在電商的場景,都會通過消息隊列的削峰,解耦,從而提高系統(tǒng)的吞吐量,保證穩(wěn)定性。所以我們接下來,繼續(xù)對系統(tǒng)進行改進,引入kafka,進一步對于穩(wěn)定性進行完善。
關(guān)于kafka的安裝,介紹,集成,請參考文末鏈接。
一、引入Kafka
引入依賴:
<!--?https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka?--> <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.10.RELEASE</version> </dependency>添加配置:
spring:kafka:bootstrap-servers:?172.16.3.29:9092producer:#?發(fā)生錯誤后,消息重發(fā)的次數(shù)。retries:?0#??????#當(dāng)有多個消息需要被發(fā)送到同一個分區(qū)時,生產(chǎn)者會把它們放在同一個批次里。該參數(shù)指定了一個批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計算。#??????batch-size:?16384#??????#?設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小。#??????buffer-memory:?33554432#??????#?鍵的序列化方式key-serializer:?org.apache.kafka.common.serialization.StringSerializer#??????#?值的序列化方式value-serializer:?org.apache.kafka.common.serialization.StringSerializer#??????# acks=0?:?生產(chǎn)者在成功寫入消息之前不會等待任何來自服務(wù)器的響應(yīng)。#??????# acks=1 :?只要集群的首領(lǐng)節(jié)點收到消息,生產(chǎn)者就會收到一個來自服務(wù)器成功響應(yīng)。#??????# acks=all :只有當(dāng)所有參與復(fù)制的節(jié)點全部收到消息時,生產(chǎn)者才會收到一個來自服務(wù)器的成功響應(yīng)。#??????acks:?1consumer:group-id:?test#??????#?自動提交的時間間隔?在spring?boot?2.X?版本中這里采用的是值的類型為Duration?需要符合特定的格式,如1S,1M,2H,5D#??????auto-commit-interval:?1S#??????#?該屬性指定了消費者在讀取一個沒有偏移量的分區(qū)或者偏移量無效的情況下該作何處理:#??????#?latest(默認(rèn)值)在偏移量無效的情況下,消費者將從最新的記錄開始讀取數(shù)據(jù)(在消費者啟動之后生成的記錄)#??????# earliest :在偏移量無效的情況下,消費者將從起始位置讀取分區(qū)的記錄#??????auto-offset-reset:?earliest#??????#?是否自動提交偏移量,默認(rèn)值是true,為了避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)置為false,然后手動提交偏移量#??????enable-auto-commit:?false#??????#?鍵的反序列化方式key-deserializer:?org.apache.kafka.common.serialization.StringDeserializer#??????#?值的反序列化方式value-deserializer:?org.apache.kafka.common.serialization.StringDeserializermax-poll-records:?150listener:#??????#?在偵聽器容器中運行的線程數(shù)。#??????concurrency:?5#??????#listner負責(zé)ack,每調(diào)用一次,就立即commit#??????ack-mode:?manual_immediatemissing-topics-fatal:?false二、測試服務(wù)改造
2.1 消息生產(chǎn)者
提供一個簡單的生產(chǎn)者工具類,只有簡單的發(fā)送消息一個方法,參數(shù)是 topic 和 message。
import?org.springframework.beans.factory.annotation.Autowired; import?org.springframework.kafka.core.KafkaTemplate; import?org.springframework.stereotype.Component;/***?kafka生產(chǎn)者**?@author?weirx*?@date?2021/02/03?14:22**/ @Component public?class?KafkaProducer?{@Autowiredprivate?KafkaTemplate?kafkaTemplate;/***?kafka消息發(fā)送*?@param*?@author?weirx*?@return?void*?@date:?2021/2/3*/public?void?send(String?topic,String?message){kafkaTemplate.send(topic,message);} }2.2 下單接口改造
引入KafkaProducer,下單時替換http請求成kafka推送,偽代碼如下:
@Autowired private?KafkaProducer?kafkaProducer; kafkaProducer.send("rob-necessities-order",JSONObject.toJSONString(map));2.3 支付接口改造
在前面一篇文章當(dāng)中,支付接口是作為下單接口的回調(diào)接口被調(diào)用的,其實是一個完全同步的接口,易出現(xiàn)問題,如阻塞,請求失敗、超時等。
所以此處我們也將其改造成kafka異步消費,消費者工具類如下所示:
@Slf4j @Component public?class?KafkaConsumer?{@Autowiredprivate?TradingServiceImpl?tradingService;@KafkaListener(topics?=?{"rob-necessities-trading"})public?void?consumer(ConsumerRecord<?,??>?record)?{Optional<?>?kafkaMessage?=?Optional.ofNullable(record.value());if?(kafkaMessage.isPresent())?{Object?message?=?kafkaMessage.get();String?orderId?=?message.toString();log.info("支付開始時間***********************:{},訂單id:?{}",?LocalDateTime.now(),?orderId);tradingService.pay(Long.valueOf(orderId));log.info("支付完成時間/:{},訂單id:?{}",?LocalDateTime.now(),?orderId);}} }三、訂單服務(wù)改造
3.1 支付回調(diào)改造
前面的支付方式,我們是在訂單完成后通過http接口形式,現(xiàn)在改用kafka,所以我們需要提供一個kafka消息生產(chǎn)者,將消息同送到測試服務(wù):
@Component public?class?KafkaProducer?{@Autowiredprivate?KafkaTemplate?kafkaTemplate;/***?kafka消息發(fā)送*?@param*?@author?weirx*?@return?void*?@date:?2021/2/3*/public?void?send(String?topic,String?message){kafkaTemplate.send(topic,message);} }3.2 下單接口改造
接口下單命令的方式不再是等待http請求調(diào)用了,此處變成監(jiān)聽kafka,提供消費者如下:
@Slf4j @Component public?class?KafkaConsumer?{@Autowiredprivate?OrderService?orderService;@KafkaListener(topics?=?{"rob-necessities-order"})public?void?consumer(ConsumerRecord<?,??>?record)?{Optional<?>?kafkaMessage?=?Optional.ofNullable(record.value());if?(kafkaMessage.isPresent())?{Object?message?=?kafkaMessage.get();log.info("-----------------?record?="?+?record);log.info("------------------?message?="?+?message);JSONObject?jsonObject?=?JSONObject.parseObject(message.toString());OrderDTO?orderDTO?=?jsonObject.toJavaObject(OrderDTO.class);orderService.saveOrder(orderDTO);}} }因為我們已經(jīng)使用了kafka作為并發(fā)時流量緩沖的組件,就不在需要我們前面自己添加進來的隊列了,所以改造后的下單接口如下所示:
@Autowired private?KafkaProducer?kafkaProducer;@Override public?Result?saveOrder(OrderDTO?orderDTO)?{//?下單實現(xiàn)Result?result?=?this.saveOrderImpl(orderDTO);String?orderId?=?JSONObject.parseObject(JSONObject.toJSONString(result.getData())).getString("id");kafkaProducer.send("rob-necessities-trading",?orderId);return?Result.success("下單成功"); }如上所示,具體的訂單業(yè)務(wù)邏輯沒有變化。
四、測試
kafka是單節(jié)點的,在其他服務(wù)器上,因為我本地沒有內(nèi)存了。
全部完成時間大概是23秒,時間上有些許增加,但是整體吞吐量跟以前絕對不是一個量級的了。
另外,我們在調(diào)用支付接口的時候也可以通過kafka的形式,但是本文不做修改了。
補充
kafka安裝教程:https://juejin.cn/post/7004625159298482206
springboot集成kafka:https://juejin.cn/post/7005438980455923726
kafka專欄:https://juejin.cn/column/6996836630841524238
本文項目代碼gitee地址:https://gitee.com/wei_rong_xin/rob-necessities.git
來源:juejin.cn/post/7068450775743070244
最后,再給大家推薦一個GitHub項目,該項目整理了上千本常用技術(shù)PDF,技術(shù)書籍都可以在這里找到。
GitHub地址:https://github.com/hello-go-maker/cs-books
電子書已經(jīng)更新好了,拿走不謝,記得點一個star,持續(xù)更新中...總結(jié)
以上是生活随笔為你收集整理的使用 kafka 提升你的订单接口吞吐量的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 满屏的if-else,看我怎么消灭你!
- 下一篇: 别再用 httpClient了,快试试这