平安保险基于 SPI 机制的 RocketMQ 定制化应用
作者:孫園園|平安人壽資深開發
為什么選用 RocketMQ
首先跟大家聊聊我們為什么會選用 RocketMQ,在做技術選型的過程中,應用場景應該是最先考慮清楚的,只有確定好了應用場景在做技術選型的過程中才有明確的目標和衡量的標準。像異步、解耦、削峰填谷這些消息中間件共有的特性就不一一介紹了,這些特性是決定你的場景需不需要使用消息中間件,這里主要講述下在確定使用消息中間件后,又是如何去選擇哪款消息中間件的。
同步雙寫,確保業務數據安全可靠不丟失
我們在搭建消息中間件平臺時的定位是給業務系統做業務數據的傳輸使用,對業務數據的很重要的一個要求就是不允許丟數據,所以選用 RocketMQ 的第一點就是他有同步雙寫機制,數據在主從服務器上都刷盤成功才算發送成功。同步雙寫條件下,MQ 的寫入性能與異步刷盤異步賦值相比肯定會有所下降,與異步條件下大約會有 20% 左右的下降,單主從架構下,1K 的消息寫入性能還是能達到 8W+ 的 TPS,對大部分業務場景而言性能是能完全滿足要求的,另外對下降的這部分性能可以通過 broker 的橫向擴招來彌補,所以在同步雙寫條件下,性能是能滿足業務需求的。
多 topic 應用場景下,性能依舊強悍
第二點,業務系統的使用場景會特別多,使用場景廣泛帶來的問題就是會創建大量的 topic,所以這時候就得去衡量消息中間件在多 topic 場景下性能是否能滿足需求。我自己在測試的時候呢,用 1K 的消息隨機往 1 萬個 topic 寫數據,單 broker 狀態下能達到2W左右的 TPS,這一點比 Kafka 要強很多。所以多 topic 應用場景下,性能依舊強悍是我們選用 topic 的第二個原因。這點也是由底層文件存儲結構決定的,像 Kafka、RocketMQ 這類消息中間件能做到接近內存的讀寫能力,主要取決于文件的順序讀寫和內存映射。RocketMQ 中的所有 topic 的消息都是寫在同一個 commitLog 文件中的,但是 Kafka 中的消息是以 topic 為基本單位組織的,不同的 topic 之間是相互獨立的。在多 topic 場景下就造成了大量的小文件,大量的小文件在讀寫時存在一個尋址的過程,就有點類似隨機讀寫了,影響整體的性能。
支持事務消息、順序消息、延遲消息、消息消費失敗重試等
RocketMQ 支持事務消息、順序消息、消息消費失敗重試、延遲消息等,功能比較豐富,比較適合復雜多變的業務場景使用
社區建設活躍,阿里開源系統
另外,在選用消息中間件時也要考慮下社區的活躍度和源碼所使用的開發語言,RocketMQ 使用 Java 開發,對 Java 開發人員就比較友好,不管是閱讀源碼排查問題還是在 MQ 的基礎上做二次開發都比較容易一點。社區里同學大都是國內的小伙伴,對大家參與 RocketMQ 開源貢獻也是比較親近的,這里呢也是希望更多的小伙伴能參與進來,為國內開源項目多做貢獻。
SPI 機制簡介及應用
介紹完為什么選用 RocketMQ 后,接下來給大家介紹下我們是如何基于 SPI 機制應用 RocketMQ 的。SPI 全稱為 (Service Provider Interface) ,是 JDK 內置的一種服務提供發現機制,我個人簡單理解就是面向接口編程,留給使用者一個擴展的點,像 springBoot 中的 spring.factories 也是 SPI 機制的一個應用。如圖給大家展示的是 RocketMQ 中 SPI 的一個應用。我們基于 SPI 機制的 RocketMQ 客戶端的應用的靈感也是來自于 MQ 中 SPI 機制的應用。RocketMQ 在實現 ACL 權限校驗的時候,是通過實現 AccessValidator 接口,PlainAccessValidator 是 MQ 中的默認實現。權限校驗這一塊,可能因為組織架構的不一樣會有不同的實現方式,通過 SPI 機制提供一個接口,為開發者定制化開發提供擴展點。在有定制化需求時只需要重新實現 AccessValidator 接口,不需要對源碼大動干戈。
接下來先給大家介紹下我們配置文件的一個簡單模型,在這個配置文件中除了 sendMsgService、consumeMsgConcurrently、consumeMsgOrderly 這三個配置項外其余的都是 RocketMQ 原生的配置文件,發送消息和消費消息這三個配置項呢就是 SPI 機制的應用,是為具體實現提供的接口。可能有的同學會有疑問,SPI 的配置文件不是應該放在 META-INF.service 路徑下么?這里呢我們是為了方便配置文件的管理,索性就跟 MQ 配置文件放在了一起。前面也提到了,META-INF.service 只是一個默認的路徑而已,為了方便管理做相應的修改也沒有違背SPI機制的思想。
我們再看下這個配置文件模型,這里的配置項呢囊括了使用 MQ 時所要配置的所有選項,proConfigs 支持所有的 MQ 原生配置,這樣呢也就實現了配置與應用實現的解耦,應用端只需呀關注的具體的業務邏輯即可,生產者消費者的實現和消費者消費的 topic 都可以通過配置文件來指定。另外該配置文件也支持多 nameserver 的多環境使用,在較復雜的應用中支持往多套 RocketMQ 環境發送消息和消費多套不同環境下的消息。消費者提供了兩個接口主要是為了支持 RocketMQ 的并發消費和順序消費。接下來呢給大家分享下如何根據這個配置文件來初始化生產者消費者。首先給大家先介紹下我們抽象出來的客戶端加載的一個核心流程。
客戶端核心流程詳情
圖中大家可以看到,客戶端的核心流程我們抽象成了三部分,分別是啟動期、運行期和終止期。首先加載配置文件呢就是加載剛剛介紹的那個配置文件模型,在配置與應用完全解耦的狀態下,必須先加載完配置文件才能初始化后續的流程。在初始化生產者和消費者之前應當先創建好應用實現的生產者和消費者的業務邏輯對象 供生產者和消費者使用。在運行期監聽配置文件的變化,根據變化動態的調整生產者和消費者實例。這里還是要再強調下配置與應用的解耦為動態調整提供了可能。終止期就比較簡單了,就是關閉生產者和消費者,并從容器中移除。這里的終止期指的生產者和消費者的終止,并不是整個應用的終止,生產者和消費者的終止可能出現在動態調整的過程中,所以終止了的實例一定要從容器中移除,方便初始化后續的生產者和消費者。介紹完基本流程后,接下來給大家介紹下配置文件的加載過程。
如何加載配置文件
配置文件加載這一塊的話,流程是比較簡單的。這里主要講的是如何去兼容比較老的項目。RocketMQ 客戶端支持的 JDK 最低版本是 1.6,所以在封裝客戶端時應該要考慮到新老項目兼容的問題。在這里呢我們客戶端的核心包是支持 JDK1.6 的,spring 早期的項目配置文件一般都是放在在 resources 路徑下,我們是自己實現了一套讀取配置文件的和監聽配置文件的方法,具體的大家可以參考 acl 中配置文件的讀取和監聽。在核心包的基礎上用 springBoot 又封裝了一套自動加載配置文件的包供微服務項目使用,配置文件的讀取和監聽都用的 spring 的那一套。配置文件加載完之后, 配置文件中應用實現的生產者和消費者是如何與 RocketMQ 的生產者和消費者相關聯的呢?接下來給大家分享下這方面的內容。
如何將生產消費者與業務實現關聯
首先先看下消費者是如何實現關聯的,上圖是 MQ 消費者的消息監聽器,需要我們去實現具體的業務邏輯處理。通過將配置文件中實現的消費邏輯關聯到這里就能實現配置文件中的消費者與 RocketMQ 消費者的關聯。消費者的接口定義也是很簡單,就是去消費消息。消費消息的類型可以通過泛型指定,在初始化消費者的時候獲取具體實現的參數類型,并將
MQ 接受到的消息轉換為具體的業務類型數據。由客戶端統一封裝好消息類型的轉換。對消費消息的返回值大家可以根據需要與 MQ 提供的 status 做一個映射,這里的 demo 只是簡單顯示了下。在獲取具體的應用消費者實例的時候,如果你的消費邏輯里使用了 spring 管理的對象,那么你實現的消費邏輯對象也要交給 spring 管理,通過 spring 上下文獲取初始化好的對象;如果你的消費邏輯里沒有使用 spring 進行管理,可以通過反射的方式自己創建具體的應用實例。
與消費者不一樣的是生產者需要將初始化好的 producer 對象傳遞到應用代碼中去,而消費者是去獲取應用中實現的邏輯對象,那如何將 producer 傳遞到業務應用中去呢?
業務代碼中實現的生產者需要繼承 SendMessage,這樣業務代碼就獲得了 RmqProducer 對象,這是一個被封裝后的生產者對象,該對象對發送消息的方法進行的規范化定義,使之符合公司的相應規范制度,該對象中的方法也會對 topic 的命名規范進行檢查,規范 topic 有一個統一的命名規范。
如何動態調整生產消費者
首先談到動態調整就需要談一下動態調整發生的場景,如果沒有合適的使用場景的話實現動態調整就有點華而不實了。這里我列舉了四個配置文件發生變化的場景:
nameserver發生變化的時候,需要重新初始化所有的生產者和消費者,這個一般是在 MQ 做遷移或者當前 MQ 集群不可用是需要緊急切換 MQ;
增減實例的場景只要啟動或關閉相應的實例即可,增加應用實例的場景一般是在需要增加一個消費者來消費新的 topic 的,減少消費者一般是在某個消費者發生異常時需要緊急關閉這個消費者,及時止損。
調整消費者線程的場景中我們對源碼進行了一點修改,讓應用端能獲取到消費者的線程池對象,以便對線程池的核心線程數進行動態調整。這個的應用場景一般是在當某個消費者消費的數據比較多,占用過多的 CPU 資源時,導致優先級更高的消息得不到及時處理,可以先將該消費者的線程調小一些。
應用的優點
發布云原生技術最新資訊、匯集云原生技術最全內容,定期舉辦云原生活動、直播,阿里產品及用戶最佳實踐發布。與你并肩探索云原生技術點滴,分享你需要的云原生內容。
關注【阿里巴巴云原生】公眾號,獲取更多云原生實時資訊!
總結
以上是生活随笔為你收集整理的平安保险基于 SPI 机制的 RocketMQ 定制化应用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 国内唯一,阿里云容器服务进入 Forre
- 下一篇: K8s Ingress Provider