rabbitmq——镜像队列
轉自:http://my.oschina.net/hncscwc/blog/186350?p=1
1. 鏡像隊列的設置
鏡像隊列的配置通過添加policy完成,policy添加的命令為:
rabbitmqctl ?set_policy ?[-p Vhost] ?Name ?Pattern ?Definition ?[Priority]
-p Vhost: ?可選參數,針對指定vhost下的queue進行設置
Name: ?policy的名稱
Pattern: ?queue的匹配模式(正則表達式)
Definition: ?鏡像定義,包括三個部分 ha-mode,ha-params,ha-sync-mode
? ? ? ? ? ? ? ? ?ha-mode: ?指明鏡像隊列的模式,有效值為 all/exactly/nodes
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? all表示在集群所有的節點上進行鏡像
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? exactly表示在指定個數的節點上進行鏡像,節點的個數由ha-params指定
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? nodes表示在指定的節點上進行鏡像,節點名稱通過ha-params指定
? ? ? ? ? ? ? ? ?ha-params: ha-mode模式需要用到的參數
? ? ? ? ? ? ? ? ?ha-sync-mode: ?鏡像隊列中消息的同步方式,有效值為automatic,manually
Priority: ?可選參數, policy的優先級
例如,對隊列名稱以hello開頭的所有隊列進行鏡像,并在集群的兩個節點上完成鏡像,policy的設置命令為:
rabbitmqctl ?set_policy ?hello-ha ?"^hello" ?'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
2. 鏡像隊列的大概實現
(1) 整體介紹
通常隊列由兩部分組成:一部分是amqqueue_process,負責協議相關的消息處理,即接收生產者發布的消息、向消費者投遞消息、處理消息confirm、acknowledge等等;另一部分是backing_queue,它提供了相關的接口供amqqueue_process調用,完成消息的存儲以及可能的持久化工作等。
鏡像隊列同樣由這兩部分組成,amqqueue_process仍舊進行協議相關的消息處理,backing_queue則是由master節點和slave節點組成的一個特殊的backing_queue。master節點和slave節點都由一組進程組成,一個負責消息廣播的gm,一個負責對gm收到的廣播消息進行回調處理。在master節點上回調處理是coordinator,在slave節點上則是mirror_queue_slave。mirror_queue_slave中包含了普通的backing_queue進行消息的存儲,master節點中backing_queue包含在mirror_queue_master中由amqqueue_process進行調用。
注意:消息的發布與消費都是通過master節點完成。master節點對消息進行處理的同時將消息的處理動作通過gm廣播給所有的slave節點,slave節點的gm收到消息后,通過回調交由mirror_queue_slave進行實際的處理。
?
(2) gm(Guaranteed Multicast)
傳統的主從復制方式:由master節點負責向所有slave節點發送需要復制的消息,在復制過程中,如果有slave節點出現異常,master節點需要作出相應的處理;如果是master節點本身出現問題,那么slave節點間可能會進行通信決定本次復制是否繼續。當然為了處理各種異常情況,整個過程中的日志記錄是免不了的。
然而rabbitmq中并沒有采用這種方式,而是將所有的節點形成一個循環鏈表,每個節點都會監控位于自己左右兩邊的節點,當有節點新增時,相鄰的節點保證當前廣播的消息會復制到新的節點上;當有節點失效時,相鄰的節點會接管保證本次廣播的消息會復制到所有節點。
在master節點和slave節點上的這些gm形成一個group,group的信息會記錄在mnesia中。不同的鏡像隊列形成不同的group。
消息從master節點對應的gm發出后,順著鏈表依次傳送到所有節點,由于所有節點組成一個循環鏈表,master節點對應的gm最終會收到自己發送的消息,這個時候master節點就知道消息已經復制到所有slave節點了。
(3) 重要的表結構
rabbit_queue表記錄隊列的相關信息:
?| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | -record(amqqueue, { name,???????????? %%隊列的名稱 durable,????????? %%標識隊列是否持久化 auto_delete,????? %%標識隊列是否自動刪除 exclusive_owner,? %%標識是否獨占模式 arguments,??????? %%隊列創建時的參數 pid,????????????? %%amqqueue_process進程PID slave_pids,?????? %%mirror_queue_slave進程PID集合 sync_slave_pids,? %%已同步的slave進程PID集合 policy,?????????? %%與隊列有關的policy ??????????????????%%通過set_policy設置,沒有則為undefined gm_pids,????????? %%{gm,mirror_queue_coordinator},{gm,mirror_queue_slave}進程PID集合 decorator???????? %% }). |
注意:slave_pids的存儲是按照slave加入的時間來排序的,以便master節點失效時,提升"資格最老"的slave節點為新的master。
gm_group表記錄gm形成的group的相關信息:
?| 1 2 3 4 5 6 | -record(gm_group, { name,???? %%group的名稱,與queue的名稱一致 version,? %%group的版本號, 新增節點/節點失效時會遞增 members,? %%group的成員列表, 按照節點組成的鏈表順序進行排序 }). |
3. 鏡像隊列的一些細節
(1) 新增節點
slave節點先從gm_group中獲取對應group的所有成員信息,然后隨機選擇一個節點并向這個節點發送請求,這個節點收到請求后,更新gm_group對應的信息,同時通知左右節點更新鄰居信息(調整對左右節點的監控)及當前正在廣播的消息,然后回復通知請求節點成功加入group。請求加入group的節點收到回復后再更新rabbit_queue中的相關信息,并根據需要進行消息的同步。
(2) 消息的廣播
消息從master節點發出,順著節點鏈表發送。在這期間,所有的slave節點都會對消息進行緩存,當master節點收到自己發送的消息后,會再次廣播ack消息,同樣ack消息會順著節點鏈表經過所有的slave節點,其作用是通知slave節點可以清除緩存的消息,當ack消息回到master節點時對應廣播消息的生命周期結束。
下圖為一個簡單的示意圖,A節點為master節點,廣播一條內容為"test"的消息。"1"表示消息為廣播的第一條消息;"id=A"表示消息的發送者為節點A。右邊是slave節點記錄的狀態信息。
為什么所有的節點都需要緩存一份發布的消息呢?
master發布的消息是依次經過所有slave節點,在這期間的任何時刻,有可能有節點失效,那么相鄰的節點可能需要重新發送給新的節點。例如,A->B->C->D->A形成的循環鏈表,A為master節點,廣播消息發送給節點B,B再發送給C,如果節點C收到B發送的消息還未發送給D時異常結束了,那么節點B感知后節點C失效后需要重新將消息發送給D。同樣,如果B節點將消息發送給C后,B,C節點中新增了E節點,那么B節點需要再將消息發送給新增的E節點。
gm的狀態記錄:
?| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | -record(state, { self,????????????? %%gm本身的ID left,????????????? %%該節點左邊的節點 right,???????????? %%該節點右邊的節點 group_name,??????? %%group名稱 與隊列名一致 module,??????????? %%回調模塊 rabbit_mirror_queue_slave或者 ???????????????????%%rabbit_mirror_queue_coordinator view,????????????? %%group成員列表視圖信息 ???????????????????%%記錄了成員的ID及每個成員的左右鄰居節點 pub_count,???????? %%當前已發布的消息計數 members_state,???? %%group成員狀態列表 記錄了廣播狀態:[#member{}] callback_args,???? %%回調函數的參數信息 ???????????????????%%rabbit_mirror_queue_slave/rabbit_mirror_queue_coordinator進程PID confirms,????????? %%confirm列表 broadcast_buffer,? %%緩存待廣播的消息 broadcast_timer,?? %%廣播消息定時器 txn_executor?????? }). -record(member, { pending_ack,? %%待確認的消息,也就是已發布的消息緩存的地方 last_pub,???? %%最后一次發布的消息計數 last_ack????? %%最后一次確認的消息計數 }). |
(3) 節點的失效
當slave節點失效時,僅僅是相鄰節點感知,然后重新調整鄰居節點信息、更新rabbit_queue、gm_group的記錄等。如果是master節點失效,"資格最老"的slave節點被提升為master節點,slave節點會創建出新的coordinator,并告知gm修改回調處理為coordinator,原來的mirror_queue_slave充當amqqueue_process處理生產者發布的消息,向消費者投遞消息等。
上面提到如果是slave節點失效,只有相鄰的節點能感知到,那么master節點失效是不是也是只有相鄰的節點能感知到?假如是這樣的話,如果相鄰的節點不是"資格最老"的節點,怎么通知"資格最老"的節點提升為新的master節點呢?
實際上,所有的slave節點在加入group時,mirror_queue_slave進程會對master節點的amqqueue_process進程(也可能是mirror_queue_slave進程)進行監控,如果master節點失效的話,mirror_queue_slave會感知,然后再通過gm進行廣播,這樣所有的節點最終都會知道master節點失效。當然,只有"資格最老"的節點會提升自己為新的master。
另外,在slave提升為master時,mirror_queue_slave內部來了一次"偷梁換柱",即原本需要回調mirror_queue_slave的handle_call/handle_info/handle_cast等接口進行處理的消息,全部改為調用amqqueue_process的handle_call/handle_info/handle_cast等接口,從而可以解釋上面說的,mirror_queue_slave進程充當了amqqueue_process完成協議相關的消息的處理。
?
?| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | rabbit_mirror_queue_slave.erl handle_call({gm_deaths,LiveGMPids},From, ????????????State = #state{q = Q = #amqqueue{name=QName,pid=MPid}})-> ????Self = self(), ????case rabbit_mirror_queue_misc:remove_from_queue(QName, ????????????????????????????????????????????????????Self, ????????????????????????????????????????????????????LiveGMPids) of ????????{ok,Pid,DeadPids} -> ????????????case Pid of ????????????????MPid -> ????????????????????%% master hasn't changed ????????????????????gen_server2:reply(From, ok), ????????????????????noreply(State); ????????????????Self -> ????????????????????%% we've become master ????????????????????QueueState = promote_me(From,State), ????????????????????{become, ?????????????????????%% 改由rabbit_amqqueue_process模塊處理消息 ?????????????????????rabbit_amqqueue_process, ?????????????????????QueueState, hibernate}; ????????????????... gen_server2.erl handle_common_reply(Reply,Msg,GS2State = #gs2_state{name=Name, ????????????????????????????????????????????????????debug=Debug})-> ????case Reply of ????????... ????????{become, Mod, NState, Time1} -> ????????????Debug1=common_become(Name,Mod,NState,Debug), ????????????loop(find_prioritisers( ????????????????GS2State#gs2_state{mod=Mod, ???????????????????????????????????state=NState, ???????????????????????????????????time=Time1, ???????????????????????????????????debug=Debug1})); ????????... handle_msg({'gen_call',From,Msg}, ???????????GS2State=#gs2_state{mod=Mod, ???????????????????????????????state=State, ???????????????????????????????name=Name, ???????????????????????????????debug=Debug}) -> ????case catch Mod:handle_call(Msg, From, State) of ????????... handle_msg(Msg,GS2State=#gs2_state{mod=Mod,state=State})-> ????Reply = (catch dispatch(Msg,Mod,State)), ????handle_common_reply(Reply, Msg, GS2State). dispatch({'$gen_cast',Msg},Mod,State)-> ????Mod:handle_cast(Msg, State); dispatch(Info, Mod, State)-> ????Mod:handle_info(Info,State). |
?
(4) 消息的同步
配置鏡像隊列的時候有個ha-sync-mode屬性,這個有什么用呢?
新節點加入到group后,最多能從左邊節點獲取到當前正在廣播的消息內容,加入group之前已經廣播的消息則無法獲取到。如果此時master節點不幸失效,而新節點有恰好成為了新的master,那么加入group之前已經廣播的消息則會全部丟失。
注意:這里的消息具體是指新節點加入前已經發布并復制到所有slave節點的消息,并且這些消息還未被消費者消費或者未被消費者確認。如果新節點加入前,所有廣播的消息被消費者消費并確認了,master節點刪除消息的同時會通知slave節點完成相應動作。這種情況等同于新節點加入前沒有發布任何消息。
避免這種問題的解決辦法就是對新的slave節點進行消息同步。當ha-sync-mode配置為自動同步(automatic)時,新節點加入group時會自動進行消息的同步;如果配置為manually則需要手動操作完成同步。
轉載于:https://www.cnblogs.com/yanwei-wang/p/4715429.html
總結
以上是生活随笔為你收集整理的rabbitmq——镜像队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: sphinx配置 + php
- 下一篇: iOS学习 NSString常用技巧