平安保险基于 SPI 机制的 RocketMQ 定制化应用
作者:孫園園|平安人壽資深開發(fā)
為什么選用 RocketMQ
首先跟大家聊聊我們?yōu)槭裁磿x用 RocketMQ,在做技術(shù)選型的過程中,應(yīng)用場景應(yīng)該是最先考慮清楚的,只有確定好了應(yīng)用場景在做技術(shù)選型的過程中才有明確的目標和衡量的標準。像異步、解耦、削峰填谷這些消息中間件共有的特性就不一一介紹了,這些特性是決定你的場景需不需要使用消息中間件,這里主要講述下在確定使用消息中間件后,又是如何去選擇哪款消息中間件的。
同步雙寫,確保業(yè)務(wù)數(shù)據(jù)安全可靠不丟失
我們在搭建消息中間件平臺時的定位是給業(yè)務(wù)系統(tǒng)做業(yè)務(wù)數(shù)據(jù)的傳輸使用,對業(yè)務(wù)數(shù)據(jù)的很重要的一個要求就是不允許丟數(shù)據(jù),所以選用 RocketMQ 的第一點就是他有同步雙寫機制,數(shù)據(jù)在主從服務(wù)器上都刷盤成功才算發(fā)送成功。同步雙寫條件下,MQ 的寫入性能與異步刷盤異步賦值相比肯定會有所下降,與異步條件下大約會有 20% 左右的下降,單主從架構(gòu)下,1K 的消息寫入性能還是能達到 8W+ 的 TPS,對大部分業(yè)務(wù)場景而言性能是能完全滿足要求的,另外對下降的這部分性能可以通過 broker 的橫向擴招來彌補,所以在同步雙寫條件下,性能是能滿足業(yè)務(wù)需求的。
多 topic 應(yīng)用場景下,性能依舊強悍
第二點,業(yè)務(wù)系統(tǒng)的使用場景會特別多,使用場景廣泛帶來的問題就是會創(chuàng)建大量的 topic,所以這時候就得去衡量消息中間件在多 topic 場景下性能是否能滿足需求。我自己在測試的時候呢,用 1K 的消息隨機往 1 萬個 topic 寫數(shù)據(jù),單 broker 狀態(tài)下能達到2W左右的 TPS,這一點比 Kafka 要強很多。所以多 topic 應(yīng)用場景下,性能依舊強悍是我們選用 topic 的第二個原因。這點也是由底層文件存儲結(jié)構(gòu)決定的,像 Kafka、RocketMQ 這類消息中間件能做到接近內(nèi)存的讀寫能力,主要取決于文件的順序讀寫和內(nèi)存映射。RocketMQ 中的所有 topic 的消息都是寫在同一個 commitLog 文件中的,但是 Kafka 中的消息是以 topic 為基本單位組織的,不同的 topic 之間是相互獨立的。在多 topic 場景下就造成了大量的小文件,大量的小文件在讀寫時存在一個尋址的過程,就有點類似隨機讀寫了,影響整體的性能。
支持事務(wù)消息、順序消息、延遲消息、消息消費失敗重試等
RocketMQ 支持事務(wù)消息、順序消息、消息消費失敗重試、延遲消息等,功能比較豐富,比較適合復(fù)雜多變的業(yè)務(wù)場景使用
社區(qū)建設(shè)活躍,阿里開源系統(tǒng)
另外,在選用消息中間件時也要考慮下社區(qū)的活躍度和源碼所使用的開發(fā)語言,RocketMQ 使用 Java 開發(fā),對 Java 開發(fā)人員就比較友好,不管是閱讀源碼排查問題還是在 MQ 的基礎(chǔ)上做二次開發(fā)都比較容易一點。社區(qū)里同學(xué)大都是國內(nèi)的小伙伴,對大家參與 RocketMQ 開源貢獻也是比較親近的,這里呢也是希望更多的小伙伴能參與進來,為國內(nèi)開源項目多做貢獻。
SPI 機制簡介及應(yīng)用
介紹完為什么選用 RocketMQ 后,接下來給大家介紹下我們是如何基于 SPI 機制應(yīng)用 RocketMQ 的。SPI 全稱為 (Service Provider Interface) ,是 JDK 內(nèi)置的一種服務(wù)提供發(fā)現(xiàn)機制,我個人簡單理解就是面向接口編程,留給使用者一個擴展的點,像 springBoot 中的 spring.factories 也是 SPI 機制的一個應(yīng)用。如圖給大家展示的是 RocketMQ 中 SPI 的一個應(yīng)用。我們基于 SPI 機制的 RocketMQ 客戶端的應(yīng)用的靈感也是來自于 MQ 中 SPI 機制的應(yīng)用。RocketMQ 在實現(xiàn) ACL 權(quán)限校驗的時候,是通過實現(xiàn) AccessValidator 接口,PlainAccessValidator 是 MQ 中的默認實現(xiàn)。權(quán)限校驗這一塊,可能因為組織架構(gòu)的不一樣會有不同的實現(xiàn)方式,通過 SPI 機制提供一個接口,為開發(fā)者定制化開發(fā)提供擴展點。在有定制化需求時只需要重新實現(xiàn) AccessValidator 接口,不需要對源碼大動干戈。
接下來先給大家介紹下我們配置文件的一個簡單模型,在這個配置文件中除了 sendMsgService、consumeMsgConcurrently、consumeMsgOrderly 這三個配置項外其余的都是 RocketMQ 原生的配置文件,發(fā)送消息和消費消息這三個配置項呢就是 SPI 機制的應(yīng)用,是為具體實現(xiàn)提供的接口??赡苡械耐瑢W(xué)會有疑問,SPI 的配置文件不是應(yīng)該放在 META-INF.service 路徑下么?這里呢我們是為了方便配置文件的管理,索性就跟 MQ 配置文件放在了一起。前面也提到了,META-INF.service 只是一個默認的路徑而已,為了方便管理做相應(yīng)的修改也沒有違背SPI機制的思想。
我們再看下這個配置文件模型,這里的配置項呢囊括了使用 MQ 時所要配置的所有選項,proConfigs 支持所有的 MQ 原生配置,這樣呢也就實現(xiàn)了配置與應(yīng)用實現(xiàn)的解耦,應(yīng)用端只需呀關(guān)注的具體的業(yè)務(wù)邏輯即可,生產(chǎn)者消費者的實現(xiàn)和消費者消費的 topic 都可以通過配置文件來指定。另外該配置文件也支持多 nameserver 的多環(huán)境使用,在較復(fù)雜的應(yīng)用中支持往多套 RocketMQ 環(huán)境發(fā)送消息和消費多套不同環(huán)境下的消息。消費者提供了兩個接口主要是為了支持 RocketMQ 的并發(fā)消費和順序消費。接下來呢給大家分享下如何根據(jù)這個配置文件來初始化生產(chǎn)者消費者。首先給大家先介紹下我們抽象出來的客戶端加載的一個核心流程。
客戶端核心流程詳情
圖中大家可以看到,客戶端的核心流程我們抽象成了三部分,分別是啟動期、運行期和終止期。首先加載配置文件呢就是加載剛剛介紹的那個配置文件模型,在配置與應(yīng)用完全解耦的狀態(tài)下,必須先加載完配置文件才能初始化后續(xù)的流程。在初始化生產(chǎn)者和消費者之前應(yīng)當先創(chuàng)建好應(yīng)用實現(xiàn)的生產(chǎn)者和消費者的業(yè)務(wù)邏輯對象 供生產(chǎn)者和消費者使用。在運行期監(jiān)聽配置文件的變化,根據(jù)變化動態(tài)的調(diào)整生產(chǎn)者和消費者實例。這里還是要再強調(diào)下配置與應(yīng)用的解耦為動態(tài)調(diào)整提供了可能。終止期就比較簡單了,就是關(guān)閉生產(chǎn)者和消費者,并從容器中移除。這里的終止期指的生產(chǎn)者和消費者的終止,并不是整個應(yīng)用的終止,生產(chǎn)者和消費者的終止可能出現(xiàn)在動態(tài)調(diào)整的過程中,所以終止了的實例一定要從容器中移除,方便初始化后續(xù)的生產(chǎn)者和消費者。介紹完基本流程后,接下來給大家介紹下配置文件的加載過程。
如何加載配置文件
配置文件加載這一塊的話,流程是比較簡單的。這里主要講的是如何去兼容比較老的項目。RocketMQ 客戶端支持的 JDK 最低版本是 1.6,所以在封裝客戶端時應(yīng)該要考慮到新老項目兼容的問題。在這里呢我們客戶端的核心包是支持 JDK1.6 的,spring 早期的項目配置文件一般都是放在在 resources 路徑下,我們是自己實現(xiàn)了一套讀取配置文件的和監(jiān)聽配置文件的方法,具體的大家可以參考 acl 中配置文件的讀取和監(jiān)聽。在核心包的基礎(chǔ)上用 springBoot 又封裝了一套自動加載配置文件的包供微服務(wù)項目使用,配置文件的讀取和監(jiān)聽都用的 spring 的那一套。配置文件加載完之后, 配置文件中應(yīng)用實現(xiàn)的生產(chǎn)者和消費者是如何與 RocketMQ 的生產(chǎn)者和消費者相關(guān)聯(lián)的呢?接下來給大家分享下這方面的內(nèi)容。
如何將生產(chǎn)消費者與業(yè)務(wù)實現(xiàn)關(guān)聯(lián)
首先先看下消費者是如何實現(xiàn)關(guān)聯(lián)的,上圖是 MQ 消費者的消息監(jiān)聽器,需要我們?nèi)崿F(xiàn)具體的業(yè)務(wù)邏輯處理。通過將配置文件中實現(xiàn)的消費邏輯關(guān)聯(lián)到這里就能實現(xiàn)配置文件中的消費者與 RocketMQ 消費者的關(guān)聯(lián)。消費者的接口定義也是很簡單,就是去消費消息。消費消息的類型可以通過泛型指定,在初始化消費者的時候獲取具體實現(xiàn)的參數(shù)類型,并將
MQ 接受到的消息轉(zhuǎn)換為具體的業(yè)務(wù)類型數(shù)據(jù)。由客戶端統(tǒng)一封裝好消息類型的轉(zhuǎn)換。對消費消息的返回值大家可以根據(jù)需要與 MQ 提供的 status 做一個映射,這里的 demo 只是簡單顯示了下。在獲取具體的應(yīng)用消費者實例的時候,如果你的消費邏輯里使用了 spring 管理的對象,那么你實現(xiàn)的消費邏輯對象也要交給 spring 管理,通過 spring 上下文獲取初始化好的對象;如果你的消費邏輯里沒有使用 spring 進行管理,可以通過反射的方式自己創(chuàng)建具體的應(yīng)用實例。
與消費者不一樣的是生產(chǎn)者需要將初始化好的 producer 對象傳遞到應(yīng)用代碼中去,而消費者是去獲取應(yīng)用中實現(xiàn)的邏輯對象,那如何將 producer 傳遞到業(yè)務(wù)應(yīng)用中去呢?
業(yè)務(wù)代碼中實現(xiàn)的生產(chǎn)者需要繼承 SendMessage,這樣業(yè)務(wù)代碼就獲得了 RmqProducer 對象,這是一個被封裝后的生產(chǎn)者對象,該對象對發(fā)送消息的方法進行的規(guī)范化定義,使之符合公司的相應(yīng)規(guī)范制度,該對象中的方法也會對 topic 的命名規(guī)范進行檢查,規(guī)范 topic 有一個統(tǒng)一的命名規(guī)范。
如何動態(tài)調(diào)整生產(chǎn)消費者
首先談到動態(tài)調(diào)整就需要談一下動態(tài)調(diào)整發(fā)生的場景,如果沒有合適的使用場景的話實現(xiàn)動態(tài)調(diào)整就有點華而不實了。這里我列舉了四個配置文件發(fā)生變化的場景:
nameserver發(fā)生變化的時候,需要重新初始化所有的生產(chǎn)者和消費者,這個一般是在 MQ 做遷移或者當前 MQ 集群不可用是需要緊急切換 MQ;
增減實例的場景只要啟動或關(guān)閉相應(yīng)的實例即可,增加應(yīng)用實例的場景一般是在需要增加一個消費者來消費新的 topic 的,減少消費者一般是在某個消費者發(fā)生異常時需要緊急關(guān)閉這個消費者,及時止損。
調(diào)整消費者線程的場景中我們對源碼進行了一點修改,讓應(yīng)用端能獲取到消費者的線程池對象,以便對線程池的核心線程數(shù)進行動態(tài)調(diào)整。這個的應(yīng)用場景一般是在當某個消費者消費的數(shù)據(jù)比較多,占用過多的 CPU 資源時,導(dǎo)致優(yōu)先級更高的消息得不到及時處理,可以先將該消費者的線程調(diào)小一些。
應(yīng)用的優(yōu)點
發(fā)布云原生技術(shù)最新資訊、匯集云原生技術(shù)最全內(nèi)容,定期舉辦云原生活動、直播,阿里產(chǎn)品及用戶最佳實踐發(fā)布。與你并肩探索云原生技術(shù)點滴,分享你需要的云原生內(nèi)容。
關(guān)注【阿里巴巴云原生】公眾號,獲取更多云原生實時資訊!
總結(jié)
以上是生活随笔為你收集整理的平安保险基于 SPI 机制的 RocketMQ 定制化应用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 国内唯一,阿里云容器服务进入 Forre
- 下一篇: K8s Ingress Provider