日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

一套高可用、易伸缩、高并发的IM群聊架构方案设计实践

發(fā)布時(shí)間:2025/3/8 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 一套高可用、易伸缩、高并发的IM群聊架构方案设计实践 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一、引言


要實(shí)現(xiàn)一整套能用于大用戶量、高并發(fā)場景下的IM群聊,技術(shù)難度遠(yuǎn)超IM系統(tǒng)中的其它功能,原因在于:IM群聊消息的實(shí)時(shí)寫擴(kuò)散特性帶來了一系列技術(shù)難題。

舉個(gè)例子:如一個(gè)2000人群里,一條普通消息的發(fā)出問題,將瞬間寫擴(kuò)散為2000條消息的接收問題,如何保證這些消息的及時(shí)、有序、高效地送達(dá),涉及到的技術(shù)問題點(diǎn)實(shí)在太多,更別說個(gè)別場景下萬人大群里的炸群消息難題了更別說個(gè)別場景下萬人大群里的炸群消息難題了。

這也是為什么一般中大型IM系統(tǒng)中,都會將群聊單獨(dú)拎出來考慮架構(gòu)的設(shè)計(jì),單獨(dú)有針對性地進(jìn)行架構(gòu)優(yōu)化,從而降低整個(gè)系統(tǒng)的設(shè)計(jì)難度。

本文將分享的是一套生產(chǎn)環(huán)境下的IM群聊消息系統(tǒng)的高可用、易伸縮、高并發(fā)架構(gòu)設(shè)計(jì)實(shí)踐,屬于原創(chuàng)第一手資料,內(nèi)容較專業(yè),適合有一定IM架構(gòu)經(jīng)驗(yàn)的后端程序員閱讀。

二、群聊技術(shù)文章


《IM群聊消息究竟是存1份(即擴(kuò)散讀)還是存多份(即擴(kuò)散寫)?》
《IM群聊消息的已讀回執(zhí)功能該怎么實(shí)現(xiàn)?》
《關(guān)于IM即時(shí)通訊群聊消息的亂序問題討論》
《現(xiàn)代IM系統(tǒng)中聊天消息的同步和存儲方案探討》
《移動(dòng)端IM中大規(guī)模群消息的推送如何保證效率、實(shí)時(shí)性?》
《微信后臺團(tuán)隊(duì):微信后臺異步消息隊(duì)列的優(yōu)化升級實(shí)踐分享》
《IM群聊消息如此復(fù)雜,如何保證不丟不重?》
《IM單聊和群聊中的在線狀態(tài)同步應(yīng)該用“推”還是“拉”?》
《如何保證IM實(shí)時(shí)消息的“時(shí)序性”與“一致性”?》
《快速裂變:見證微信強(qiáng)大后臺架構(gòu)從0到1的演進(jìn)歷程(一)》

三、萬事開頭難:初始的極簡實(shí)現(xiàn)


所謂的群聊消息系統(tǒng),就是一種多對多群體聊天方式,譬如直播房間內(nèi)的聊天室對應(yīng)的服務(wù)器端就是一個(gè)群聊消息系統(tǒng)。

2017年9月初,我們初步實(shí)現(xiàn)了一套極簡的群聊消息系統(tǒng),其大致架構(gòu)如下:

系統(tǒng)名詞解釋:
?

  • 1)Client :?消息發(fā)布者【或者叫做服務(wù)端群聊消息系統(tǒng)調(diào)用者】,publisher;
  • 2)Proxy :?系統(tǒng)代理,對外統(tǒng)一接口,收集Client發(fā)來的消息轉(zhuǎn)發(fā)給Broker;
  • 3)Broker :系統(tǒng)消息轉(zhuǎn)發(fā)Server,Broker 會根據(jù) Gateway Message 組織一個(gè) RoomGatewayList【key為RoomID,value為 Gateway IP:Port 地址列表】,然后把 Proxy 發(fā)來的消息轉(zhuǎn)發(fā)到 Room 中所有成員登錄的所有 Gateway;
  • 4)Router :用戶登錄消息轉(zhuǎn)發(fā)者,把Gateway轉(zhuǎn)發(fā)來的用戶登入登出消息轉(zhuǎn)發(fā)給所有的Broker;
  • 5)Gateway :所有服務(wù)端的入口,接收合法客戶端的連接,并把客戶端的登錄登出消息通過Router轉(zhuǎn)發(fā)給所有的Broker;
  • 6)Room Message :?Room聊天消息;
  • 7)Gateway Message :?Room內(nèi)某成員 登錄 或者 登出 某Gateway消息,包含用戶UIN/RoomID/Gateway地址{IP:Port}等消息。


當(dāng)一個(gè) Room 中多個(gè) Client 連接一個(gè) Gateway 的時(shí)候,Broker只會根據(jù) RoomID 把房間內(nèi)的消息轉(zhuǎn)發(fā)一次給這個(gè)Gateway,由Gateway再把消息復(fù)制多份分別發(fā)送給連接這個(gè) Gateway 的 Room 中的所有用戶的客戶端。

這套系統(tǒng)有如下特點(diǎn):
?

  • 1)系統(tǒng)只轉(zhuǎn)發(fā)房間內(nèi)的聊天消息,每個(gè)節(jié)點(diǎn)收到后立即轉(zhuǎn)發(fā)出去,不存儲任何房間內(nèi)的聊天消息,不考慮消息丟失以及消息重復(fù)的問題;
  • 2)系統(tǒng)固定地由一個(gè)Proxy、三個(gè)Broker和一個(gè)Router構(gòu)成;
  • 3)Proxy接收后端發(fā)送來的房間消息,然后按照一定的負(fù)載均衡算法把消息發(fā)往某個(gè)Broker,Broker則把消息發(fā)送到所有與Room有關(guān)系的接口機(jī)Gateway;
  • 4)Router接收Gateway轉(zhuǎn)發(fā)來的某個(gè)Room內(nèi)某成員在這個(gè)Gateway的登出或者登錄消息,然后把消息發(fā)送到所有Broker;
  • 5)Broker收到Router轉(zhuǎn)發(fā)來的Gateway消息后,更新(添加或者刪除)與某Room相關(guān)的Gateway集合記錄;
  • 6)整個(gè)系統(tǒng)的通信鏈路采用UDP通信方式。


從以上特點(diǎn),整個(gè)消息系統(tǒng)足夠簡單,沒有考慮擴(kuò)縮容問題,當(dāng)系統(tǒng)負(fù)載到達(dá)極限的時(shí)候,就重新再部署一套系統(tǒng)以應(yīng)對后端client的消息壓力。

這種處理方式本質(zhì)是把系統(tǒng)的擴(kuò)容能力甩鍋給了后端Client以及前端Gateway:每次擴(kuò)容一個(gè)系統(tǒng),所有Client需要在本地配置文件中添加一個(gè)Proxy地址然后全部重啟,所有Gateway則需要再本地配置文件添加一個(gè)Router地址然后全部重啟

這種“幸福我一人,辛苦千萬家”的擴(kuò)容應(yīng)對方式,必然導(dǎo)致公司內(nèi)部這套系統(tǒng)的使用者怨聲載道,下一階段的升級就是必然的了。

四、進(jìn)一步重點(diǎn)設(shè)計(jì):“可擴(kuò)展性”

?

4.1、基本思路


大道之行也,天下為公,不同的系統(tǒng)有不同的構(gòu)架,相同的系統(tǒng)總有類似的實(shí)現(xiàn)。類似于數(shù)據(jù)庫的分庫分表【關(guān)于分庫分表,目前看到的最好的文章是《一種支持自由規(guī)劃無須數(shù)據(jù)遷移和修改路由代碼的Replicaing擴(kuò)容方案》】,其擴(kuò)展實(shí)現(xiàn)核心思想是分Partition分Replica,但各Replica之間還區(qū)分leader(leader-follower,只有l(wèi)eader可接受寫請求)和non-leader(所有replica均可接收寫請求)兩種機(jī)制。

從數(shù)據(jù)角度來看,這套系統(tǒng)接收兩種消息:Room Message(房間聊天消息)和Gateway Message(用戶登錄消息)。兩種消息的交匯之地就是Broker,所以應(yīng)對擴(kuò)展的緊要地方就是Broker,Broker的每個(gè)Partition采用non-leader機(jī)制,各replica均可接收Gateway Message消息寫請求和Room Message轉(zhuǎn)發(fā)請求。

首先,當(dāng)Room Message量加大時(shí)可以對Proxy進(jìn)行水平擴(kuò)展,多部署Proxy即可因應(yīng)Room Message的流量。

其次,當(dāng)Gateway Message量加大時(shí)可以對Router進(jìn)行水平擴(kuò)展,多部署Router即可因應(yīng)Gateway Message的流量。

最后,兩種消息的交匯之地Broker如何擴(kuò)展呢?可以把若干Broker Replica組成一個(gè)Partition,因?yàn)镚ateway Message是在一個(gè)Partition內(nèi)廣播的,所有Broker Replica都會有相同的RoomGatewayList 數(shù)據(jù),因此當(dāng)Gateway Message增加時(shí)擴(kuò)容Partition即可。當(dāng)Room Message量增加時(shí),水平擴(kuò)容Partition內(nèi)的Broker Replica即可,因?yàn)镽oom Message只會發(fā)送到Partition內(nèi)某個(gè)Replica上。

從個(gè)人經(jīng)驗(yàn)來看,Room ID的增長以及Room內(nèi)成員的增加量在一段時(shí)間內(nèi)可以認(rèn)為是直線增加,而Room Message可能會以指數(shù)級增長,所以若設(shè)計(jì)得當(dāng)則Partition擴(kuò)容的概率很小,而Partition內(nèi)Replica水平增長的概率幾乎是100%。

不管是Partition級別的水平擴(kuò)容還是Partition Replica級別的水平擴(kuò)容,不可能像系統(tǒng)極簡版本那樣每次擴(kuò)容后都需要Client或者Gateway去更新配置文件然后重啟,因應(yīng)之道就是可用zookeeper充當(dāng)角色的Registriy。通過這個(gè)zookeeper注冊中心,相關(guān)角色擴(kuò)容的時(shí)候在Registry注冊后,與之相關(guān)的其他模塊得到通知即可獲取其地址等信息。采用zookeeper作為Registry的時(shí)候,所以程序?qū)崿F(xiàn)的時(shí)候采用實(shí)時(shí)watch和定時(shí)輪詢的策略保證數(shù)據(jù)可靠性,因?yàn)橐坏┚W(wǎng)絡(luò)有任何的抖動(dòng),zk就會認(rèn)為客戶端已經(jīng)宕機(jī)把鏈接關(guān)閉。

分析完畢,與之相對的架構(gòu)圖如下:

以下各分章節(jié)將描述各個(gè)模塊詳細(xì)流程。

4.2、Client


Client詳細(xì)流程如下:
?

  • 1)從配置文件加載Registry地址;
  • 2)從Registy上Proxy注冊路徑/pubsub/proxy下獲取所有的Proxy,依據(jù)各個(gè)Proxy ID大小順序遞增組成一個(gè)ProxyArray;
  • 3)啟動(dòng)一個(gè)線程實(shí)時(shí)關(guān)注Registry路徑/pubsub/proxy,以獲取Proxy的動(dòng)態(tài)變化,及時(shí)更新ProxyArray;
  • 4)啟動(dòng)一個(gè)線程定時(shí)輪詢獲取Registry路徑/pubsub/proxy下各個(gè)Proxy實(shí)例,作為關(guān)注策略的補(bǔ)充,以期本地ProxyArray內(nèi)各個(gè)Proxy成員與Registry上的各個(gè)Proxy保持一致;定時(shí)給各個(gè)Proxy發(fā)送心跳,異步獲取心跳回包;定時(shí)清除ProxyArray中心跳超時(shí)的Proxy成員;
  • 5)發(fā)送消息的時(shí)候采用snowflake算法給每個(gè)消息分配一個(gè)MessageID,然后采用相關(guān)負(fù)載均衡算法把消息轉(zhuǎn)發(fā)給某個(gè)Proxy。

?

4.3、Proxy


Proxy詳細(xì)流程如下:
?

  • 1)讀取配置文件,獲取Registry地址;
  • 2)把自身信息注冊到Registry路徑/pubsub/proxy下,把Registry返回的ReplicaID作為自身ID;
  • 3)從Registry路徑/pubsub/broker/partition(x)下獲取每個(gè)Broker Partition的各個(gè)replica;
  • 4)從Registry路徑/pubsub/broker/partition_num獲取當(dāng)前有效的Broker Partition Number;

?

  • 5)啟動(dòng)一個(gè)線程關(guān)注Registry上的Broker路徑/pubsub/broker,以實(shí)時(shí)獲取以下信息:
  • ? ? {Broker Partition Number}
  • ? ???- 新的Broker Partition(此時(shí)發(fā)生了擴(kuò)容);
  • ? ???- Broker Partition內(nèi)新的broker replica(Partition內(nèi)發(fā)生了replica擴(kuò)容);
  • ? ???- Broker Parition內(nèi)某replica掛掉的信息;

?

  • 6)定時(shí)向各個(gè)Broker Partition replica發(fā)送心跳,異步等待Broker返回的心跳響應(yīng)包,以探測其活性,以保證不向超時(shí)的replica轉(zhuǎn)發(fā)Room Message;
  • 7)啟動(dòng)一個(gè)線程定時(shí)讀取Registry上的Broker路徑/pubsub/broker下各個(gè)子節(jié)點(diǎn)的值,以定時(shí)輪詢的策略觀察Broker Partition Number變動(dòng),以及各Partition的變動(dòng)情況,作為實(shí)時(shí)策略的補(bǔ)充;同時(shí)定時(shí)檢查心跳包超時(shí)的Broker,從有效的BrokerList中刪除;
  • 8)依據(jù)規(guī)則【BrokerPartitionID = RoomID % BrokerPartitionNum, BrokerReplicaID = RoomID % BrokerPartitionReplicaNum】向某個(gè)Partition的replica轉(zhuǎn)發(fā)Room Message,收到Client的Heatbeat包時(shí)要及時(shí)給予響應(yīng)。


之所以把Room Message和Heartbeat Message放在一個(gè)線程處理,是為了防止進(jìn)程假死這種情況。

當(dāng)/pubsub/broker/partition_num的值發(fā)生改變的時(shí)候(譬如值改為4),意味著Router Partition進(jìn)行了擴(kuò)展,Proxy要及時(shí)獲取新Partition路徑(如/pubsub/broker/Partition2和/pubsub/broker/Partition3)下的實(shí)例,并關(guān)注這些路徑,獲取新Partition下的實(shí)例。

之所以Proxy在獲取Registry下所有當(dāng)前的Broker實(shí)例信息后再注冊自身信息,是因?yàn)榇藭r(shí)它才具有轉(zhuǎn)發(fā)消息的資格。

Proxy轉(zhuǎn)發(fā)某個(gè)Room消息時(shí)候,只發(fā)送給處于Running狀態(tài)的Broker。為Broker Partition內(nèi)所有replica依據(jù)Registry給其分配的replicaID進(jìn)行遞增排序,組成一個(gè)Broker Partition Replica Array,規(guī)則中BrokerPartitionReplicaNum為Array的size,而BrokerReplicaID為replica在Array中的下標(biāo)。

4.4、Pipeline


收到的 Room Message 需要做三部工作:收取 Room Message、消息協(xié)議轉(zhuǎn)換和向 Broker 發(fā)送消息。

初始系統(tǒng)這三步流程如果均放在一個(gè)線程內(nèi)處理,proxy 的整體吞吐率只有 50 000 Msg/s。

最后的實(shí)現(xiàn)方式是按照消息處理的三個(gè)步驟以 pipeline 方式做如下流程處理:
?

  • 1)啟動(dòng) 1 個(gè)消息接收線程和 N【N == Broker Parition 數(shù)目】個(gè)多寫一讀形式的無鎖隊(duì)列【稱之為消息協(xié)議轉(zhuǎn)換隊(duì)列】,消息接收線程分別啟動(dòng)一個(gè) epoll 循環(huán)流程收取消息,然后把消息以相應(yīng)的 hash 算法【隊(duì)列ID = UIN % N】寫入對應(yīng)的消息協(xié)議轉(zhuǎn)換隊(duì)列;
  • 2)啟動(dòng) N 個(gè)線程 和 N * 3 個(gè)一寫一讀的無鎖隊(duì)列【稱之為消息發(fā)送隊(duì)列】,每個(gè)消息協(xié)議專家線程從消息協(xié)議轉(zhuǎn)換隊(duì)列接收到消息并進(jìn)行協(xié)議轉(zhuǎn)換后,根據(jù)相應(yīng)的 hash 算法【隊(duì)列ID = UIN % 3N】寫入消息發(fā)送隊(duì)列;
  • 3)啟動(dòng) 3N 個(gè)消息發(fā)送線程,分別創(chuàng)建與之對應(yīng)的 Broker 的連接,每個(gè)線程單獨(dú)從對應(yīng)的某個(gè)消息發(fā)送隊(duì)列接收消息然后發(fā)送出去。


經(jīng)過以上流水線改造后,Proxy 的整體吞吐率可達(dá) 200 000 Msg/s。

關(guān)于 pipeline 自身的解釋,本文不做詳述,可以參考下圖:

4.5、大房間消息處理


每個(gè) Room 的人數(shù)不均,最簡便的解決方法就是給不同人數(shù)量級的 Room 各搭建一套消息系統(tǒng),不用修改任何代碼。

然所謂需求推動(dòng)架構(gòu)改進(jìn),在系統(tǒng)迭代升級過程中遇到了這樣一個(gè)需求:業(yè)務(wù)方有一個(gè)全國 Room,用于給所有在線用戶進(jìn)行消息推送。針對這個(gè)需求,不可能為了一個(gè)這樣的 Room 單獨(dú)搭建一套系統(tǒng),況且這個(gè) Room 的消息量很少。

如果把這個(gè) Room 的消息直接發(fā)送給現(xiàn)有系統(tǒng),它有可能影響其他 Room 的消息發(fā)送:消息系統(tǒng)是一個(gè)寫放大的系統(tǒng),全國 Room 內(nèi)有系統(tǒng)所有的在線用戶,每次發(fā)送都會卡頓其他 Room 的消息發(fā)送。

最終的解決方案是:使用類似于分區(qū)的方法,把這樣的大 Room 映射為 64 個(gè)虛擬 Room【稱之為 VRoom】。在 Room 號段分配業(yè)務(wù)線的配合下,給消息系統(tǒng)專門保留了一個(gè)號段,用于這種大 Room 的切分,在 Proxy 層依據(jù)一個(gè) hash 方法 【 VRoomID = UserID % 64】 把每個(gè) User 分配到相應(yīng)的 VRoom,其他模塊代碼不用修改即完成了大 Room 消息的路由。

4.6、Broker


Broker詳細(xì)流程如下:
?

  • 1)Broker加載配置,獲取自身所在Partition的ID(假設(shè)為3);
  • 2)向Registry路徑/pubsub/broker/partition3注冊,設(shè)置其狀態(tài)為Init,注冊中心返回的ID作為自身的ID(replicaID);
  • 3)接收Router轉(zhuǎn)發(fā)來的Gateway Message,放入GatewayMessageQueue;
  • 4)從Database加載數(shù)據(jù),把自身所在的Broker Partition所應(yīng)該負(fù)責(zé)的 RoomGatewayList 數(shù)據(jù)加載進(jìn)來;
  • 5)異步處理GatewayMessageQueue內(nèi)的Gateway Message,只處理滿足規(guī)則【PartitionID == RoomID % PartitionNum】的消息,把數(shù)據(jù)存入本地路由信息緩存;
  • 6)修改Registry路徑/pubsub/broker/partition3下自身節(jié)點(diǎn)的狀態(tài)為Running;
  • 7)啟動(dòng)線程實(shí)時(shí)關(guān)注Registry路徑/pubsub/broker/partition_num的值;
  • 8)啟動(dòng)線程定時(shí)查詢Registry路徑/pubsub/broker/partition_num的值;
  • 9)當(dāng)Registry路徑/pubsub/broker/partition_num的值發(fā)生改變的時(shí)候,依據(jù)規(guī)則【PartitionID == RoomID % PartitionNum】清洗本地路由信息緩存中每條數(shù)據(jù);
  • 10)接收Proxy發(fā)來的Room Message,依據(jù)RoomID從路由信息緩存中查找Room有成員登陸的所有Gateway,把消息轉(zhuǎn)發(fā)給這些Gateway。


注意Broker之所以先注冊然后再加載Database中的數(shù)據(jù),是為了在加載數(shù)據(jù)的時(shí)候同時(shí)接收Router轉(zhuǎn)發(fā)來的Gateway Message,但是在數(shù)據(jù)加載完前這些受到的數(shù)據(jù)先被緩存起來,待所有 RoomGatewayList 數(shù)據(jù)加載完后就把這些數(shù)據(jù)重放一遍;

Broker之所以區(qū)分狀態(tài),是為了在加載完畢 RoomGatewayList 數(shù)據(jù)前不對Proxy提供轉(zhuǎn)發(fā)消息的服務(wù),同時(shí)也方便Broker Partition應(yīng)對的消息量增大時(shí)進(jìn)行水平擴(kuò)展。

當(dāng)Broker發(fā)生Partition擴(kuò)展的時(shí)候,新的Partition個(gè)數(shù)必須是2的冪,只有新Partition內(nèi)所有Broker Replica都加載實(shí)例完畢,再更改/pubsub/broker/partition_num的值。

老的Broker也要watch路徑/pubsub/broker/partition_num的值,當(dāng)這個(gè)值增加的時(shí)候,它也需要清洗本地的路由信息緩存。

Broker的擴(kuò)容過程猶如細(xì)胞分裂,形成中的兩個(gè)細(xì)胞有著完全相同的數(shù)據(jù),分裂完成后【Registry路徑/pubsub/broker/partition_num的值翻倍】則需要清洗垃圾信息。這種方法稱為翻倍法。

4.7、Router


Router詳細(xì)流程如下:
?

  • 1)Router加載配置,Registry地址;
  • 2)把自身信息注冊到Registry路徑/pubsub/router下,把Registry返回的ReplicaID作為自身ID;
  • 3)從Registry路徑/pubsub/broker/partition(x)下獲取每個(gè)Broker Partition的各個(gè)replica;
  • 4)從Registry路徑/pubsub/broker/partition_num獲取當(dāng)前有效的Broker Partition Number;

?

  • 5)啟動(dòng)一個(gè)線程關(guān)注Registry上的Broker路徑/pubsub/broker,以實(shí)時(shí)獲取以下信息:
  • ? ? {Broker Partition Number}
  • ? ? - 新的Broker Partition(此時(shí)發(fā)生了擴(kuò)容);
  • ? ? - Broker Partition內(nèi)新的broker replica(Partition內(nèi)發(fā)生了replica擴(kuò)容);
  • ? ? - Broker Parition內(nèi)某replica掛掉的信息;

?

  • 6)定時(shí)向各個(gè)Broker Partition replica發(fā)送心跳,異步等待Broker返回的心跳響應(yīng)包,以探測其活性,以保證不向超時(shí)的replica轉(zhuǎn)發(fā)Gateway Message;
  • 7)啟動(dòng)一個(gè)線程定時(shí)讀取Registry上的Broker路徑/pubsub/broker下各個(gè)子節(jié)點(diǎn)的值,以定時(shí)輪詢的策略觀察Broker Partition Number變動(dòng),以及各Partition的變動(dòng)情況,作為實(shí)時(shí)策略的補(bǔ)充;同時(shí)定時(shí)檢查心跳包超時(shí)的Broker,從有效的BrokerList中刪除;
  • 8)從Database全量加載路由 RoomGatewayList 數(shù)據(jù)放入本地緩存;
  • 9)收取Gateway發(fā)來的心跳消息,及時(shí)返回ack包;
  • 10)收取Gateway轉(zhuǎn)發(fā)來的Gateway Message,按照一定規(guī)則【BrokerPartitionID % BrokerPartitionNum = RoomID % BrokerPartitionNum】轉(zhuǎn)發(fā)給某個(gè)Broker Partition下所有Broker Replica,保證Partition下所有replica擁有同樣的路由 RoomGatewayList 數(shù)據(jù),再把Message內(nèi)數(shù)據(jù)存入本地緩存,當(dāng)檢測到數(shù)據(jù)不重復(fù)的時(shí)候把數(shù)據(jù)異步寫入Database。

?

4.8、Gateway


Gateway詳細(xì)流程如下:
?

  • 1)讀取配置文件,加載Registry地址;
  • 2)從Registry路徑/pubsub/router/下獲取所有router replica,依據(jù)各Replica的ID遞增排序組成replica數(shù)組RouterArray;
  • 3)啟動(dòng)一個(gè)線程實(shí)時(shí)關(guān)注Registry路徑/pubsub/router,以獲取Router的動(dòng)態(tài)變化,及時(shí)更新RouterArray;
  • 4)啟動(dòng)一個(gè)線程定時(shí)輪詢獲取Registry路徑/pubsub/router下各個(gè)Router實(shí)例,作為關(guān)注策略的補(bǔ)充,以期本地RouterArray及時(shí)更新;定時(shí)給各個(gè)Router發(fā)送心跳,異步獲取心跳回包;定時(shí)清除RouterArray中心跳超時(shí)的Router成員;
  • 5)當(dāng)有Room內(nèi)某成員客戶端連接上來或者Room內(nèi)所有成員都不連接當(dāng)前Gateway節(jié)點(diǎn)時(shí),依據(jù)規(guī)則【RouterArrayIndex = RoomID % RouterNum】向某個(gè)Router發(fā)送Gateway Message;
  • 6)收到Broker轉(zhuǎn)發(fā)來的Room Message時(shí),根據(jù)MessageID進(jìn)行去重,如果不重復(fù)則把消息發(fā)送到連接到當(dāng)前Gateway的Room內(nèi)所有客戶端,同時(shí)把MessageID緩存起來以用于去重判斷。


Gateway本地有一個(gè)基于共享內(nèi)存的LRU Cache,存儲最近一段時(shí)間發(fā)送的消息的MessageID。

五、接下來迫切要解決的:系統(tǒng)穩(wěn)定性


系統(tǒng)具有了可擴(kuò)展性僅僅是系統(tǒng)可用的初步,整個(gè)系統(tǒng)要保證最低粒度的SLA(0.99),就必須在兩個(gè)維度對系統(tǒng)的可靠性就行感知:消息延遲和系統(tǒng)內(nèi)部組件的高可用。

5.1、消息延遲


準(zhǔn)確的消息延遲的統(tǒng)計(jì),通用的做法可以基于日志系統(tǒng)對系統(tǒng)所有消息或者以一定概率抽樣后進(jìn)行統(tǒng)計(jì),但限于人力目前沒有這樣做。

目前使用了一個(gè)方法:通過一種構(gòu)造一組偽用戶ID,定時(shí)地把消息發(fā)送給proxy,每條消息經(jīng)過一層就把在這層的進(jìn)入時(shí)間和發(fā)出時(shí)間以及組件自身的一些信息填入消息,這組偽用戶的消息最終會被發(fā)送到一個(gè)偽Gateway端,偽Gateway對這些消息的信息進(jìn)行歸并統(tǒng)計(jì)后,即可計(jì)算出當(dāng)前系統(tǒng)的平均消息延遲時(shí)間。

通過所有消息的平均延遲可以評估系統(tǒng)的整體性能。同時(shí),因?yàn)橄到y(tǒng)消息路由的哈希方式已知,當(dāng)固定時(shí)間內(nèi)偽Gateway沒有收到消息時(shí),就把消息當(dāng)做發(fā)送失敗,當(dāng)某條鏈路失敗一定次數(shù)后就可以產(chǎn)生告警了。

5.2、高可用


上面的方法同時(shí)能夠檢測某個(gè)鏈路是否出問題,但是鏈路具體出問題的點(diǎn)無法判斷,且實(shí)時(shí)性無法保證。

為了保證各個(gè)組件的高可用,系統(tǒng)引入了另一種評估方法:每個(gè)層次都給后端組件發(fā)送心跳包,通過心跳包的延遲和成功率判斷其下一級組件的當(dāng)前的可用狀態(tài)。

譬如proxy定時(shí)給每個(gè)Partition內(nèi)每個(gè)broker發(fā)送心跳,可以依據(jù)心跳的成功率來快速判斷broker是否處于“假死”狀態(tài)(最近業(yè)務(wù)就遇到過broker進(jìn)程還活著,但是對任何收到的消息都不處理的情況)。

同時(shí)依靠心跳包的延遲還可以判斷broker的處理能力,基于此延遲值可在同一Partition內(nèi)多broker端進(jìn)行負(fù)載均衡。

六、進(jìn)一步優(yōu)化:消息可靠性


公司內(nèi)部內(nèi)部原有一個(gè)走tcp通道的群聊消息系統(tǒng),但是經(jīng)過元旦一次大事故(幾乎全線崩潰)后,相關(guān)業(yè)務(wù)的一些重要消息改走這套基于UDP的群聊消息系統(tǒng)了。這些消息如服務(wù)端下達(dá)給客戶端的游戲動(dòng)作指令,是不允許丟失的,但其特點(diǎn)是相對于聊天消息來說量非常小(單人1秒最多一個(gè)),所以需要在目前UDP鏈路傳遞消息的基礎(chǔ)之上再構(gòu)建一個(gè)可靠消息鏈路。

國內(nèi)某IM大廠的消息系統(tǒng)也是以UDP鏈路為基礎(chǔ)的(見《為什么QQ用的是UDP協(xié)議而不是TCP協(xié)議?》),他們的做法是消息重試加ack構(gòu)建了可靠消息穩(wěn)定傳輸鏈路。但是這種做法會降低系統(tǒng)的吞吐率,所以需要獨(dú)辟蹊徑。

UDP通信的本質(zhì)就是偽裝的IP通信,TCP自身的穩(wěn)定性無非是重傳、去重和ack,所以不考慮消息順序性的情況下可以通過重傳與去重來保證消息的可靠性。

基于目前系統(tǒng)的可靠消息傳輸流程如下:

  • 1)Client給每個(gè)命令消息依據(jù)snowflake算法配置一個(gè)ID,復(fù)制三份,立即發(fā)送給不同的Proxy;
  • 2)Proxy收到命令消息以后隨機(jī)發(fā)送給一個(gè)Broker;
  • 3)Broker收到后傳輸給Gateway;
  • 4)Gateway接收到命令消息后根據(jù)消息ID進(jìn)行重復(fù)判斷,如果重復(fù)則丟棄,否則就發(fā)送給APP,并緩存之。


正常的消息在群聊消息系統(tǒng)中傳輸時(shí),Proxy會根據(jù)消息的Room ID傳遞給固定的Broker,以保證消息的有序性。

七、Router需要進(jìn)一步強(qiáng)化

?

7.1、簡述


當(dāng)線上需要部署多套群聊消息系統(tǒng)的時(shí)候,Gateway需要把同樣的Room Message復(fù)制多份轉(zhuǎn)發(fā)給多套群聊消息系統(tǒng),會增大Gateway壓力,可以把Router單獨(dú)獨(dú)立部署,然后把Room Message向所有的群聊消息系統(tǒng)轉(zhuǎn)發(fā)。

Router系統(tǒng)原有流程是:Gateway按照Room ID把消息轉(zhuǎn)發(fā)給某個(gè)Router,然后Router把消息轉(zhuǎn)發(fā)給下游Broker實(shí)例。新部署一套群聊消息系統(tǒng)的時(shí)候,新系統(tǒng)Broker的schema需要通過一套約定機(jī)制通知Router,使得Router自身邏輯過于復(fù)雜。

重構(gòu)后的Router架構(gòu)參照上圖,也采用分Partition分Replica設(shè)計(jì),Partition內(nèi)部各Replica之間采用non-leader機(jī)制;各Router Replica不會主動(dòng)把Gateway Message內(nèi)容push給各Broker,而是各Broker主動(dòng)通過心跳包形式向Router Partition內(nèi)某個(gè)Replica注冊,而后此Replica才會把消息轉(zhuǎn)發(fā)到這個(gè)Broker上。

類似于Broker,Router Partition也以2倍擴(kuò)容方式進(jìn)行Partition水平擴(kuò)展,并通過一定機(jī)制保證擴(kuò)容或者Partition內(nèi)部各個(gè)實(shí)例停止運(yùn)行或者新啟動(dòng)時(shí),盡力保證數(shù)據(jù)的一致性。

Router Replica收到Gateway Message后,replica先把Gateway Message轉(zhuǎn)發(fā)給Partition內(nèi)各個(gè)peer replica,然后再轉(zhuǎn)發(fā)給各個(gè)訂閱者。Router轉(zhuǎn)發(fā)消息的同時(shí)異步把消息數(shù)據(jù)寫入Database。

獨(dú)立Router架構(gòu)下,下面小節(jié)將分別詳述Gateway、Router和Broker三個(gè)相關(guān)模塊的詳細(xì)流程。

7.2、Gateway


Gateway詳細(xì)流程如下:
?

  • 1)從Registry路徑/pubsub/router/partition(x)下獲取每個(gè)Partition的各個(gè)replica;
  • 2)從Registry路徑/pubsub/router/partition_num獲取當(dāng)前有效的Router Partition Number;
  • 3)啟動(dòng)一個(gè)線程關(guān)注Registry上的Router路徑/pubsub/router,以實(shí)時(shí)獲取以下信息:{Router Partition Number} -> 新的Router Partition(此時(shí)發(fā)生了擴(kuò)容);??Partition內(nèi)新的replica(Partition內(nèi)發(fā)生了replica擴(kuò)容);??Parition內(nèi)某replica掛掉的信息;
  • 4)定時(shí)向各個(gè)Partition replica發(fā)送心跳,異步等待Router返回的心跳響應(yīng)包,以探測其活性,以保證不向超時(shí)的replica轉(zhuǎn)發(fā)Gateway Message;
  • 5)啟動(dòng)一個(gè)線程定時(shí)讀取Registry上的Router路徑/pubsub/router下各個(gè)子節(jié)點(diǎn)的值,以定時(shí)輪詢的策略觀察Router Partition Number變動(dòng),以及各Partition的變動(dòng)情況,作為實(shí)時(shí)策略的補(bǔ)充;同時(shí)定時(shí)檢查心跳包超時(shí)的Router,從有效的BrokerList中刪除;
  • 6 依據(jù)規(guī)則向某個(gè)Partition的replica轉(zhuǎn)發(fā)Gateway Message。


第六步的規(guī)則決定了Gateway Message的目的Partition和replica,規(guī)則內(nèi)容有:

如果某Router Partition ID滿足condition(RoomID % RouterPartitionNumber == RouterPartitionID % RouterPartitionNumber),則把消息轉(zhuǎn)發(fā)到此Partition;

這里之所以不采用直接hash方式(RouterPartitionID = RoomID % RouterPartitionNumber)獲取Router Partition,是考慮到當(dāng)Router進(jìn)行2倍擴(kuò)容的時(shí)候當(dāng)所有新的Partition的所有Replica都啟動(dòng)完畢且數(shù)據(jù)一致時(shí)才會修改Registry路徑/pubsub/router/partitionnum的值,按照規(guī)則的計(jì)算公式才能保證新Partition的各個(gè)Replica在啟動(dòng)過程中就可以得到Gateway Message,也即此時(shí)每個(gè)Gateway Message會被發(fā)送到兩個(gè)Router Partition。 當(dāng)Router擴(kuò)容完畢,修改Registry路徑/pubsub/router/partitionnum的值后,此時(shí)新集群進(jìn)入穩(wěn)定期,每個(gè)Gateway Message只會被發(fā)送固定的一個(gè)Partition,condition(RoomID % RouterPartitionNumber == RouterPartitionID % RouterPartitionNumber)等效于condition(RouterPartitionID = RoomID % RouterPartitionNumber)。

如果Router Partition內(nèi)某replia滿足condition(replicaPartitionID = RoomID % RouterPartitionReplicaNumber),則把消息轉(zhuǎn)發(fā)到此replica。

replica向Registry注冊的時(shí)候得到的ID稱之為replicaID,Router Parition內(nèi)所有replica按照replicaID遞增排序組成replica數(shù)組RouterPartitionReplicaArray,replicaPartitionID即為replica在數(shù)組中的下標(biāo)。



Gateway Message數(shù)據(jù)一致性:

Gateway向Router發(fā)送的Router Message內(nèi)容有兩種:某user在當(dāng)前Gateway上進(jìn)入某Room 和 某user在當(dāng)前Gateway上退出某Room,數(shù)據(jù)項(xiàng)分別是UIN(用戶ID)、Room ID、Gateway Addr和User Action(Login or Logout。

由于所有消息都是走UDP鏈路進(jìn)行轉(zhuǎn)發(fā),則這些消息的順序就有可能亂序。Gateway可以統(tǒng)一給其發(fā)出的所有消息分配一個(gè)全局遞增的ID【下文稱為GatewayMsgID,Gateway Message ID】以保證消息的唯一性和全局有序性。

Gateway向Registry注冊臨時(shí)有序節(jié)點(diǎn)時(shí),Registry會給Gateway分配一個(gè)ID,Gateway可以用這個(gè)ID作為自身的Instance ID【假設(shè)這個(gè)ID上限是65535】。

GatewayMsgID字長是64bit,其格式如下:

1

2

// 63 -------------------------- 48 47 -------------- 38 37 ------------ 0

// |? 16bit Gateway Instance ID??? |?? 10bit Reserve??? |??? 38bit自增碼? |

?

7.3、Router


Router系統(tǒng)部署之前,先設(shè)置Registry路徑/pubsub/router/partition_num的值為1。

Router詳細(xì)流程如下:
?

  • 1)Router加載配置,獲取自身所在Partition的ID(假設(shè)為3);
  • 2)向Registry路徑/pubsub/router/partition3注冊,設(shè)置其狀態(tài)為Init,注冊中心返回的ID作為自身的ID(replicaID);
  • 3)注冊完畢會收到Gateway發(fā)來的Gateway Message以及Broker發(fā)來的心跳消息(HeartBeat Message),先緩存到消息隊(duì)列MessageQueue;
  • 4)從Registry路徑/pubsub/router/partition3下獲取自身所在的Partition內(nèi)的各個(gè)replica;
  • 5)從Registry路徑/pubsub/router/partition_num獲取當(dāng)前有效的Router Partition Number;
  • 6)啟動(dòng)一個(gè)線程關(guān)注Registry路徑/pubsub/router,以實(shí)時(shí)獲取以下信息:{Router Partition Number}??-> Partition內(nèi)新的replica(Partition內(nèi)發(fā)生了replica擴(kuò)容);??Parition內(nèi)某replica掛掉的信息;
  • 7)從Database加載數(shù)據(jù);
  • 8)啟動(dòng)一個(gè)線程異步處理MessageQueue內(nèi)的Gateway Message,把Gateway Message轉(zhuǎn)發(fā)給同Partition內(nèi)其他peer replica,然后依據(jù)規(guī)則【RoomID % BrokerPartitionNumber == BrokerReplicaPartitionID % BrokerPartitionNumber】轉(zhuǎn)發(fā)給BrokerList內(nèi)每個(gè)Broker;處理Broker發(fā)來的心跳包,把Broker的信息存入本地BrokerList,然后給Broker發(fā)送回包;
  • 9)修改Registry路徑/pubsub/router/partition3下節(jié)點(diǎn)的狀態(tài)為Running;
  • 10)啟動(dòng)一個(gè)線程定時(shí)讀取Registry路徑/pubsub/router下各個(gè)子路徑的值,以定時(shí)輪詢的策略觀察Router各Partition的變動(dòng)情況,作為實(shí)時(shí)策略的補(bǔ)充;檢查超時(shí)的Broker,把其從BrokerList中剔除;
  • 11)當(dāng)RouterPartitionNum倍增時(shí),Router依據(jù)規(guī)則【RoomID % BrokerPartitionNumber == BrokerReplicaPartitionID % BrokerPartitionNumber】清洗自身路由信息緩存中數(shù)據(jù);
  • 12)Router本地存儲每個(gè)Gateway的最大GatewayMsgID,收到小于GatewayMsgID的Gateway Message可以丟棄不處理,否則就更新GatewayMsgID并根據(jù)上面邏輯進(jìn)行處理。


之所以把Gateway Message和Heartbeat Message放在一個(gè)線程處理,是為了防止進(jìn)程假死這種情況。

Broker也采用了分Partition分Replica機(jī)制,所以向Broker轉(zhuǎn)發(fā)Gateway Message時(shí)候路由規(guī)則,與Gateway向Router轉(zhuǎn)發(fā)消息的路由規(guī)則相同。

另外啟動(dòng)一個(gè)工具,當(dāng)水平擴(kuò)展后新啟動(dòng)的Partition內(nèi)所有Replica的狀態(tài)都是Running的時(shí)候,修改Registry路徑/pubsub/router/partition_num的值為所有Partition的數(shù)目。

7.4、Broker


Broker詳細(xì)流程如下:
?

  • 1)Broker加載配置,獲取自身所在Partition的ID(假設(shè)為3);
  • 2)向Registry路徑/pubsub/broker/partition3注冊,設(shè)置其狀態(tài)為Init,注冊中心返回的ID作為自身的ID(replicaID);
  • 3)從Registry路徑/pubsub/router/partition_num獲取當(dāng)前有效的Router Partition Number;
  • 4)從Registry路徑/pubsub/router/partition(x)下獲取每個(gè)Router Partition的各個(gè)replica;
  • 5)啟動(dòng)一個(gè)線程關(guān)注Registry路徑/pubsub/router,以實(shí)時(shí)獲取以下信息:{Router Partition Number} -> 新的Router Partition(此時(shí)發(fā)生了擴(kuò)容);??Partition內(nèi)新的replica(Partition內(nèi)發(fā)生了replica擴(kuò)容);??Parition內(nèi)某replica掛掉的信息;
  • 6)依據(jù)規(guī)則【RouterPartitionID % BrokerPartitionNum == BrokerPartitionID % BrokerPartitionNum,RouterReplicaID = BrokerReplicaID % BrokerPartitionNum】選定目標(biāo)Router Partition下某個(gè)Router replica,向其發(fā)送心跳消息,包含BrokerPartitionNum、BrokerPartitionID、BrokerHostAddr和精確到秒級的Timestamp,并異步等待所有Router replica的回復(fù),所有Router轉(zhuǎn)發(fā)來的Gateway Message放入GatewayMessageQueue;
  • 7)依據(jù)規(guī)則【BrokerPartitionID == RoomID % BrokerParitionNum】從Database加載數(shù)據(jù);
  • 8)依據(jù)規(guī)則【BrokerPartitionID % BrokerParitionNum == RoomID % BrokerParitionNum】異步處理GatewayMessageQueue內(nèi)的Gateway Message,只留下合乎規(guī)則的消息的數(shù)據(jù);
  • 9)修改Registry路徑/pubsub/broker/partition3下自身節(jié)點(diǎn)的狀態(tài)為Running;
  • 10)啟動(dòng)一個(gè)線程定時(shí)讀取Registry路徑/pubsub/router下各個(gè)子路徑的值,以定時(shí)輪詢的策略觀察Router各Partition的變動(dòng)情況,作為實(shí)時(shí)策略的補(bǔ)充;定時(shí)檢查超時(shí)的Router,某Router超時(shí)后更換其所在的Partition內(nèi)其他Router替換之,定時(shí)發(fā)送心跳包;
  • 11)當(dāng)Registry路徑/pubsub/broker/partition_num的值BrokerPartitionNum發(fā)生改變的時(shí)候,依據(jù)規(guī)則【PartitionID == RoomID % PartitionNum】清洗本地路由信息緩存中每條數(shù)據(jù);
  • 12)接收Proxy發(fā)來的Room Message,依據(jù)RoomID從路由信息緩存中查找Room有成員登陸的所有Gateway,把消息轉(zhuǎn)發(fā)給這些Gateway;
  • 13)Broker本地存儲每個(gè)Gateway的最大GatewayMsgID,收到小于GatewayMsgID的Gateway Message可以丟棄不處理,否則更新GatewayMsgID并根據(jù)上面邏輯進(jìn)行處理。


BrokerPartitionNumber可以小于或者等于或者大于RouterPartitionNumber,兩個(gè)數(shù)應(yīng)該均是2的冪,兩個(gè)集群可以分別進(jìn)行擴(kuò)展,互不影響。譬如BrokerPartitionNumber=4而RouterPartitionNumber=2,則Broker Partition 3只需要向Router Partition 1的某個(gè)follower發(fā)送心跳消息即可;若BrokerPartitionNumber=4而RouterPartitionNumber=8,則Broker Partition 3需要向Router Partition 3的某個(gè)follower發(fā)送心跳消息的同時(shí),還需要向Router Partition 7的某個(gè)follower發(fā)送心跳,以獲取全量的Gateway Message。

Broker需要關(guān)注/pubsub/router/partitionnum和/pubsub/broker/partitionnum的值的變化,當(dāng)router或者broker進(jìn)行parition水平擴(kuò)展的時(shí)候,Broker需要及時(shí)重新構(gòu)建與Router之間的對應(yīng)關(guān)系,及時(shí)變動(dòng)發(fā)送心跳的Router Replica對象【RouterPartitionID = BrokerReplicaID % RouterPartitionNum,RouterPartitionID為Router Replica在PartitionRouterReplicaArray數(shù)組的下標(biāo)】。

當(dāng)Router Partition內(nèi)replica死掉或者發(fā)送心跳包的replica對象死掉(無論是注冊中心通知還是心跳包超時(shí)),broker要及時(shí)變動(dòng)發(fā)送心跳的Router replica對象。

另外,Gateway使用UDP通信方式向Router發(fā)送Gateway Message,如若這個(gè)Message丟失則此Gateway上該Room內(nèi)所有成員一段時(shí)間內(nèi)(當(dāng)有新的成員在當(dāng)前Gateway上加入Room 時(shí)會產(chǎn)生新的Gateway Message)都無法再接收消息,為了保證消息的可靠性,可以使用這樣一個(gè)約束解決問題:在此Gateway上登錄的某Room內(nèi)的人數(shù)少于3時(shí),Gateway會把Gateway Message復(fù)制兩份非連續(xù)(如以10ms為時(shí)間間隔)重復(fù)發(fā)送給某個(gè)Partition leader。因Gateway Message消息處理的冪等性,重復(fù)Gateway Message并不會導(dǎo)致Room Message發(fā)送錯(cuò)誤,只在極少概率的情況下會導(dǎo)致Gateway收到消息的時(shí)候Room內(nèi)已經(jīng)沒有成員在此Gateway登錄,此時(shí)Gateway會把消息丟棄不作處理。

傳遞實(shí)時(shí)消息群聊消息系統(tǒng)的Broker向特定Gateway轉(zhuǎn)發(fā)Room Message的時(shí)候,會帶上Room內(nèi)在此Gateway上登錄的用戶列表,Gateway根據(jù)這個(gè)用戶列表下發(fā)消息時(shí)如果檢測到此用戶已經(jīng)下線,在放棄向此用戶轉(zhuǎn)發(fā)消息的同時(shí),還應(yīng)該把此用戶已經(jīng)下線的消息發(fā)送給Router,當(dāng)Router把這個(gè)消息轉(zhuǎn)發(fā)給Broker后,Broker把此用戶從用戶列表中剔除。通過這種負(fù)反饋機(jī)制保證用戶狀態(tài)更新的及時(shí)性

八、離線消息的處理

?

8.1、簡述


前期的系統(tǒng)只考慮了用戶在線情況下實(shí)時(shí)消息的傳遞,當(dāng)用戶離線時(shí)其消息便無法獲取。

若系統(tǒng)考慮用戶離線消息傳遞,需要考慮如下因素:
?

  • 1)消息固化:保證用戶上線時(shí)收到其離線期間的消息;
  • 2)消息有序:離線消息和在線消息都在一個(gè)消息系統(tǒng)傳遞,給每個(gè)消息分配一個(gè)ID以區(qū)分消息先后順序,消息順序越靠后則ID愈大。


離線消息的存儲和傳輸,需要考慮用戶的狀態(tài)以及每條消息的發(fā)送狀態(tài),整個(gè)消息核心鏈路流程會有大的重構(gòu)。

新消息架構(gòu)如下圖:

?

系統(tǒng)名詞解釋:
?

  • 1)Pi : 消息ID存儲模塊,存儲每個(gè)人未發(fā)送的消息ID有序遞增集合;
  • 2)Xiu : 消息存儲KV模塊,存儲每個(gè)人的消息,給每個(gè)消息分配ID,以ID為key,以消息內(nèi)為value;
  • 3)Gateway Message(HB) : 用戶登錄登出消息,包括APP保活定時(shí)心跳(Hearbeat)消息。


系統(tǒng)內(nèi)部代號貔貅(貔貅者,雄貔雌貅),源自上面兩個(gè)新模塊。

這個(gè)版本架構(gòu)流程的核心思想為“消息ID與消息內(nèi)容分離,消息與用戶狀態(tài)分離”。消息發(fā)送流程涉及到模塊 Client/Proxy/Pi/Xiu,消息推送流程則涉及到模塊 Pi/Xiu/Broker/Router/Gateway。

下面小節(jié)先細(xì)述Pi和Xiu的接口,然后再詳述發(fā)送和推送流程。

8.2、Xiu


Xiu模塊功能名稱是Message Storage,用戶緩存和固化消息,并給消息分配ID。Xiu 集群采用分 Partition 分 Replica 機(jī)制,Partition 初始數(shù)目須是2的倍數(shù),集群擴(kuò)容時(shí)采用翻倍法。
?

8.2.1存儲消息


存儲消息請求的參數(shù)列表為{SnowflakeID,UIN, Message},其流程如下:
?

  • 1)接收客戶端發(fā)來的消息,獲取消息接收人ID(UIN)和客戶端給消息分配的 SnowflakeID;
  • 2)檢查 UIN % Xiu_Partition_Num == Xiu_Partition_ID % Xiu_Partition_Num 添加是否成立【即接收人的消息是否應(yīng)當(dāng)由當(dāng)前Xiu負(fù)責(zé)】,不成立則返回錯(cuò)誤并退出;
  • 3)檢查 SnowflakeID 對應(yīng)的消息是否已經(jīng)被存儲過,若已經(jīng)存儲過則返回其對應(yīng)的消息ID然后退出;
  • 4)給消息分配一個(gè) MsgID:
    每個(gè)Xiu有自己唯一的 Xiu_Partition_ID,以及一個(gè)初始值為 0 的 Partition_Msg_ID。MsgID = 1B[ Xiu_Partition_ID ] + 1B[ Message Type ] + 6B[ ++ Partition_Msg_ID ]。每次分配的時(shí)候 Partition_Msg_ID 都自增加一。
  • 5)以 MsgID 為 key 把消息存入基于共享內(nèi)存的 Hashtable,并存入消息的 CRC32 hash值和插入時(shí)間,把 MsgID 存入一個(gè) LRU list 中:
    LRU List 自身并不存入共享內(nèi)存中,當(dāng)進(jìn)程重啟時(shí),可以根據(jù)Hashtable中的數(shù)據(jù)重構(gòu)出這個(gè)List。把消息存入 Hashtable 中時(shí),如果 Hashtable full,則依據(jù) LRU List 對Hashtable 中的消息進(jìn)行淘汰。
  • 6)把MsgID返回給客戶端;
  • 7)把MsgID異步通知給消息固化線程,消息固化線程根據(jù)MsgID從Hashtable中讀取消息并根據(jù)CRC32 hash值判斷消息內(nèi)容是否完整,完整則把消息存入本地RocksDB中。

?

8.2.2讀取消息


讀取消息請求的參數(shù)列表為{UIN, MsgIDList},其流程為:
?

  • 1)獲取請求的 MsgIDList,判斷每個(gè)MsgID MsgID{Xiu_Partition_ID} == Xiu_Partition_ID 條件是否成立,不成立則返回錯(cuò)誤并退出;
  • 2)從 Hashtable 中獲取每個(gè) MsgID 對應(yīng)的消息;
  • 3)如果 Hashtable 中不存在,則從 RocksDB 中讀取 MsgID 對應(yīng)的消息;
  • 4)讀取完畢則把所有獲取的消息返回給客戶端。

?

8.2.3主從數(shù)據(jù)同步


目前從簡,暫定Xiu的副本只有一個(gè)。

Xiu節(jié)點(diǎn)啟動(dòng)的時(shí)候根據(jù)自身配置文件中分配的 Xiu_Partition_ID 到Registry路徑 /pubsub/xiu/partition_id 下進(jìn)行注冊一個(gè)臨時(shí)有序節(jié)點(diǎn),注冊成功則Registry會返回Xiu的節(jié)點(diǎn) ID。

Xiu節(jié)點(diǎn)獲取 /pubsub/xiu/partition_id 下的所有節(jié)點(diǎn)的ID和地址信息,依據(jù) 節(jié)點(diǎn)ID最小者為leader 的原則,即可判定自己的角色。只有l(wèi)eader可接受讀寫數(shù)據(jù)請求。

數(shù)據(jù)同步流程如下:
?

  • 1)follower定時(shí)向leader發(fā)送心跳信息,心跳信息包含本地最新消息的ID;
  • 2)leader啟動(dòng)一個(gè)數(shù)據(jù)同步線程處理follower的心跳信息,leader的數(shù)據(jù)同步線程從LRU list中查找 follower_latest_msg_id 之后的N條消息的ID,若獲取到則讀取消息并同步給follower,獲取不到則回復(fù)其與leader之間消息差距太大;
  • 3)follower從leader獲取到最新一批消息,則存儲之;
  • 4)follower若獲取leader的消息差距太大響應(yīng),則請求leader的agent把RocksDB的固化數(shù)據(jù)全量同步過來,整理完畢后再次啟動(dòng)與leader之間的數(shù)據(jù)同步流程。


follower會關(guān)注Registry路徑 /pubsub/xiu/partition_id 下所有所有節(jié)點(diǎn)的變化情況,如果leader掛掉則及時(shí)轉(zhuǎn)換身份并接受客戶端請求。如果follower 與 leader 之間的心跳超時(shí),則follower刪掉 leader 的 Registry 路徑節(jié)點(diǎn),及時(shí)進(jìn)行身份轉(zhuǎn)換處理客戶端請求。

當(dāng)leader重啟或者follower轉(zhuǎn)換為leader的時(shí)候,需要把 Partition_Msg_ID 進(jìn)行一個(gè)大數(shù)值增值(譬如增加1000)以防止可能的消息ID亂序情況。
?

8.2.4集群擴(kuò)容


Xiu 集群擴(kuò)容采用翻倍法,擴(kuò)容時(shí)新 Partition 的節(jié)點(diǎn)啟動(dòng)后工作流程如下:
?

  • 1)向Registry的路徑 /pubsub/xiu/partition_id 下自己的 node 的 state 為 running,同時(shí)注冊自己的對外服務(wù)地址信息;
  • 2)另外啟動(dòng)一個(gè)工具,當(dāng)水平擴(kuò)展后所有新啟動(dòng)的 Partition 內(nèi)所有 Replica 的狀態(tài)都是 Running 的時(shí)候,修改 Registry 路徑 /pubsub/xiu/partition_num 的值為擴(kuò)容后 Partition 的數(shù)目。按照開頭的例子,即由2升級為4。


之所以 Xiu 不用像 Broker 和 Router 那樣啟動(dòng)的時(shí)候向老的 Partition 同步數(shù)據(jù),是因?yàn)槊總€(gè) Xiu 分配的 MsgID 中已經(jīng)帶有 Xiu 的 PartitionID 信息,即使集群擴(kuò)容這個(gè) ID 也不變,根據(jù)這個(gè)ID也可以定位到其所在的Partition,而不是借助 hash 方法。

8.3、Pi


Pi 模塊功能名稱是 Message ID Storage,存儲每個(gè)用戶的 MsgID List。Xiu 集群也采用分 Partition 分 Replica 機(jī)制,Partition 初始數(shù)目須是2的倍數(shù),集群擴(kuò)容時(shí)采用翻倍法。
?

8.3.1存儲消息ID


MsgID 存儲的請求參數(shù)列表為{UIN,MsgID},Pi 工作流程如下:
?

  • 1)判斷條件 UIN % Pi_Partition_Num == Pi_Partition_ID % Pi_Partition_Num 是否成立,若不成立則返回error退出;
  • 2)把 MsgID 插入U(xiǎn)IN的 MsgIDList 中,保持 MsgIDList 中所有 MsgID 不重復(fù)有序遞增,把請求內(nèi)容寫入本地log,給請求者返回成功響應(yīng)。


Pi有專門的日志記錄線程,給每個(gè)日志操作分配一個(gè) LogID,每個(gè) Log 文件記錄一定量的寫操作,當(dāng)文件 size 超過配置的上限后刪除之。
?

8.3.2讀取消息ID列表


讀取請求參數(shù)列表為{UIN, StartMsgID, MsgIDNum, ExpireFlag},其意義為獲取用戶 UIN 自起始ID為 StartMsgID 起(不包括 StartMsgID )的數(shù)目為 MsgIDNum 的消息ID列表,ExpireFlag意思是 所有小于等于 StartMsgID 的消息ID是否刪除。?

流程如下:
?

  • 1)判斷條件 UIN % Pi_Partition_Num == Pi_Partition_ID % Pi_Partition_Num 是否成立,若不成立則返回error退出;
  • 2)獲取 (StartID, StartMsgID + MsgIDNum] 范圍內(nèi)的所有 MsgID,把結(jié)果返回給客戶端;
  • 3)如果 ExpireFlag 有效,則刪除MsgIDList內(nèi)所有在 [0, StartMsgID] 范圍內(nèi)的MsgID,把請求內(nèi)容寫入本地log。

?

8.3.3主從數(shù)據(jù)同步


同 Xiu 模塊,暫定 Pi 的同 Parition 副本只有一個(gè)。

Pi 節(jié)點(diǎn)啟動(dòng)的時(shí)候根據(jù)自身配置文件中分配的 Pi_Partition_ID 到Registry路徑 /pubsub/pi/partition_id 下進(jìn)行注冊一個(gè)臨時(shí)有序節(jié)點(diǎn),注冊成功則 Registry 會返回 Pi 的節(jié)點(diǎn) ID。

Pi 節(jié)點(diǎn)獲取 /pubsub/pi/partition_id 下的所有節(jié)點(diǎn)的ID和地址信息,依據(jù) 節(jié)點(diǎn)ID最小者為leader 的原則,即可判定自己的角色。只有 leader 可接受讀寫數(shù)據(jù)請求。

數(shù)據(jù)同步流程如下:
?

  • 1)follower 定時(shí)向 leader 發(fā)送心跳信息,心跳信息包含本地最新 LogID;
  • 2)leader 啟動(dòng)一個(gè)數(shù)據(jù)同步線程處理 follower 的心跳信息,根據(jù) follower 匯報(bào)的 logID 把此 LogID;
  • 3)follower 從 leader 獲取到最新一批 Log,先存儲然后重放。


follower 會關(guān)注Registry路徑 /pubsub/pi/partition_id 下所有節(jié)點(diǎn)的變化情況,如果 leader 掛掉則及時(shí)轉(zhuǎn)換身份并接受客戶端請求。如果follower 與 leader 之間的心跳超時(shí),則follower刪掉 leader 的 Registry 路徑節(jié)點(diǎn),及時(shí)進(jìn)行身份轉(zhuǎn)換處理客戶端請求。
?

8.3.4集群擴(kuò)容


Pi 集群擴(kuò)容采用翻倍法。則節(jié)點(diǎn)啟動(dòng)后工作流程如下:
?

  • 1)向 Registry 注冊,獲取 Registry 路徑 /pubsub/xiu/partition_num 的值 PartitionNumber;
  • 2)如果發(fā)現(xiàn)自己 PartitionID 滿足條件 PartitionID >= PartitionNumber 時(shí),則意味著當(dāng)前 Partition 是擴(kuò)容后的新集群,更新 Registry 中自己狀態(tài)為start;
  • 3)讀取 Registry 路徑 /pubsub/xiu 下所有 Parition 的 leader,根據(jù)條件 自身PartitionID % PartitionNumber == PartitionID % PartitionNumber 尋找對應(yīng)的老 Partition 的 leader,稱之為 parent_leader;
  • 4)緩存收到 Proxy 轉(zhuǎn)發(fā)來的用戶請求;
  • 5)向 parent_leader 獲取log;
  • 6)向 parent_leader 同步內(nèi)存數(shù)據(jù);
  • 7)重放 parent_leader 的log;
  • 8)更新 Registry 中自己的狀態(tài)為 Running;
  • 9)重放用戶請求;
  • 10)當(dāng) Registry 路徑 /pubsub/xiu/partition_num 的值 PartitionNumber 滿足條件 PartitionID >= PartitionNumber 時(shí),意味著擴(kuò)容完成,處理用戶請求時(shí)要給用戶返回響應(yīng)。


Proxy 會把讀寫請求參照條件 UIN % Pi\_Partition\_Num == Pi\_Partition\_ID % Pi\_Partition\_Num 向相關(guān) partition 的 leader 轉(zhuǎn)發(fā)用戶請求。假設(shè)原來 PartitionNumber 值為2,擴(kuò)容后值為4,則原來轉(zhuǎn)發(fā)給 partition0 的寫請求現(xiàn)在需同時(shí)轉(zhuǎn)發(fā)給 partition0 和 partition2,原來轉(zhuǎn)發(fā)給 partition1 的寫請求現(xiàn)在需同時(shí)轉(zhuǎn)發(fā)給 partition1 和 partition3。

另外啟動(dòng)一個(gè)工具,當(dāng)水平擴(kuò)展后所有新啟動(dòng)的 Partition 內(nèi)所有 Replica 的狀態(tài)都是 Running 的時(shí)候,修改Registry路徑/pubsub/xiu/partition_num的值為擴(kuò)容后 Partition 的數(shù)目。

8.4、數(shù)據(jù)發(fā)送流程


消息自 PiXiu 的外部客戶端(Client,服務(wù)端所有使用 PiXiu 提供的服務(wù)者統(tǒng)稱為客戶端)按照一定負(fù)載均衡規(guī)則發(fā)送到 Proxy,然后存入 Xiu 中,把 MsgID 存入 Pi 中。

其詳細(xì)流程如下:
?

  • 1)Client 依據(jù) snowflake 算法給消息分配 SnowflakeID,依據(jù) ProxyID = UIN % ProxyNum 規(guī)則把消息發(fā)往某個(gè) Proxy;
  • 2)Proxy 收到消息后轉(zhuǎn)發(fā)到 Xiu;
  • 3)Proxy 收到 Xiu 返回的響應(yīng)后,把響應(yīng)轉(zhuǎn)發(fā)給 Client;
  • 4)如果 Proxy 收到 Xiu 返回的響應(yīng)帶有 MsgID,則發(fā)起 Pi 寫流程,把 MsgID 同步到 Pi 中;
  • 5)如果 Proxy 收到 Xiu 返回的響應(yīng)帶有 MsgID,則給 Broker 發(fā)送一個(gè) Notify,告知其某 UIN 的最新 MsgID。

?

8.5、數(shù)據(jù)轉(zhuǎn)發(fā)流程


轉(zhuǎn)發(fā)消息的主體是Broker,原來的在線消息轉(zhuǎn)發(fā)流程是它收到 Proxy 轉(zhuǎn)發(fā)來的 Message,然后根據(jù)用戶是否在線然后轉(zhuǎn)發(fā)給 Gateway。

PiXiu架構(gòu)下 Broker 會收到以下類型消息:
?

  • 1)用戶登錄消息;
  • 2)用戶心跳消息;
  • 3)用戶登出消息;
  • 4)Notify 消息;
  • 5)Ack 消息。


Broker流程受這五種消息驅(qū)動(dòng),下面分別詳述其收到這五種消息時(shí)的處理流程。

用戶登錄消息流程如下:
?

  • 1)檢查用戶的當(dāng)前狀態(tài),若為 OffLine 則把其狀態(tài)值為在線 OnLine;
  • 2)檢查用戶的待發(fā)送消息隊(duì)列是否為空,不為空則退出;
  • 3)向 Pi 模塊發(fā)送獲取 N 條消息 ID 的請求 {UIN: uin, StartMsgID: 0, MsgIDNum: N, ExpireFlag: false},設(shè)置用戶狀態(tài)為 GettingMsgIDList 并等待回應(yīng);
  • 4)根據(jù) Pi 返回的消息 ID 隊(duì)列,向 Xiu 發(fā)起獲取消息請求 {UIN: uin, MsgIDList: msg ID List},設(shè)置用戶狀態(tài)為 GettingMsgList 并等待回應(yīng);
  • 5)Xiu 返回消息列表后,設(shè)置狀態(tài)為 SendingMsg,并向 Gateway 轉(zhuǎn)發(fā)消息。


可以把用戶心跳消息當(dāng)做用戶登錄消息處理。

Gateway的用戶登出消息產(chǎn)生有三種情況:
?

  • 1)用戶主動(dòng)退出;
  • 2)用戶心跳超時(shí);
  • 3)給用戶轉(zhuǎn)發(fā)消息時(shí)發(fā)生網(wǎng)絡(luò)錯(cuò)誤。


用戶登出消息處理流程如下:
?

  • 1)檢查用戶狀態(tài),如果為 OffLine,則退出;
  • 2)用戶狀態(tài)不為 OffLine 且檢查用戶已經(jīng)發(fā)送出去的消息列表的最后一條消息的 ID(LastMsgID),向 Pi 發(fā)送獲取 MsgID 請求{UIN: uin, StartMsgID: LastMsgID, MsgIDNum: 0, ExpireFlag: True},待 Pi 返回響應(yīng)后退出。


處理 Proxy 發(fā)來的 Notify 消息處理流程如下:
?

  • 1)如果用戶狀態(tài)為 OffLine,則退出;
  • 2)更新用戶的最新消息 ID(LatestMsgID),如果用戶發(fā)送消息隊(duì)列不為空則退出;
  • 3)向 Pi 模塊發(fā)送獲取 N 條消息 ID 的請求 {UIN: uin, StartMsgID: 0, MsgIDNum: N, ExpireFlag: false},設(shè)置用戶狀態(tài)為 GettingMsgIDList 并等待回應(yīng);
  • 4)根據(jù) Pi 返回的消息 ID 隊(duì)列,向 Xiu 發(fā)起獲取消息請求 {UIN: uin, MsgIDList: msg ID List},設(shè)置用戶狀態(tài)為 GettingMsgList 并等待回應(yīng);
  • 5)Xiu 返回消息列表后,設(shè)置狀態(tài)為 SendingMsg,并向 Gateway 轉(zhuǎn)發(fā)消息。


所謂 Ack 消息,就是 Broker 經(jīng) Gateway 把消息轉(zhuǎn)發(fā)給 App 后,App 給Broker的消息回復(fù),告知Broker其最近成功收到消息的 MsgID。?

Ack 消息處理流程如下:
?

  • 1)如果用戶狀態(tài)為 OffLine,則退出;
  • 2)更新 LatestAckMsgID 的值;
  • 3)如果用戶發(fā)送消息隊(duì)列不為空,則發(fā)送下一個(gè)消息后退出;
  • 4)如果 LatestAckMsgID >= LatestMsgID,則退出;
  • 5)向 Pi 模塊發(fā)送獲取 N 條消息 ID 的請求 {UIN: uin, StartMsgID: 0, MsgIDNum: N, ExpireFlag: false},設(shè)置用戶狀態(tài)為 GettingMsgIDList 并等待回應(yīng);
  • 6)根據(jù) Pi 返回的消息 ID 隊(duì)列,向 Xiu 發(fā)起獲取消息請求 {UIN: uin, MsgIDList: msg ID List},設(shè)置用戶狀態(tài)為 GettingMsgList 并等待回應(yīng);
  • 7)Xiu 返回消息列表后,設(shè)置狀態(tài)為 SendingMsg,并向 Gateway 轉(zhuǎn)發(fā)消息。


總體上,PiXiu 轉(zhuǎn)發(fā)消息流程采用拉取(pull)轉(zhuǎn)發(fā)模型,以上面五種消息為驅(qū)動(dòng)進(jìn)行狀態(tài)轉(zhuǎn)換,并作出相應(yīng)的動(dòng)作行為。

九、本文總結(jié)


這套群聊消息系統(tǒng)尚有以下task list需完善:
?

  • 1)消息以UDP鏈路傳遞,不可靠【2018/01/29解決之】;
  • 2)目前的負(fù)載均衡算法采用了極簡的RoundRobin算法,可以根據(jù)成功率和延遲添加基于權(quán)重的負(fù)載均衡算法實(shí)現(xiàn);
  • 3)只考慮傳遞,沒有考慮消息的去重,可以根據(jù)消息ID實(shí)現(xiàn)這個(gè)功能【2018/01/29解決之】;
  • 4)各個(gè)模塊之間沒有考慮心跳方案,整個(gè)系統(tǒng)的穩(wěn)定性依賴于Registry【2018/01/17解決之】;
  • 5)離線消息處理【2018/03/03解決之】;
  • 6)區(qū)分消息優(yōu)先級。

網(wǎng)易云信,你身邊的即時(shí)通訊和音視頻技術(shù)專家,了解我們,請戳網(wǎng)易云信官網(wǎng)

想要行業(yè)洞察和技術(shù)干貨,請關(guān)注網(wǎng)易云信博客

本文轉(zhuǎn)載自52im,作者:JackJiang

與50位技術(shù)專家面對面20年技術(shù)見證,附贈技術(shù)全景圖

總結(jié)

以上是生活随笔為你收集整理的一套高可用、易伸缩、高并发的IM群聊架构方案设计实践的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。