日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

kafka协调者

發(fā)布時間:2025/4/5 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka协调者 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

2019獨角獸企業(yè)重金招聘Python工程師標準>>>

我們先假設初始時世界是混沌的還沒有盤古的開天辟地,協(xié)調(diào)者也是一片荒蕪人煙之地,沒有保存任何狀態(tài),因為消費組的初始狀態(tài)是Stable,在第一次的Rebalance時,正常的還沒有向消費組注冊過的消費者會執(zhí)行狀態(tài)為Stable而且memberId=UNKNOWN_MEMBER_ID條件分支。在第一次Rebalance之后,每個消費者都分配到了一個成員編號,系統(tǒng)又會進入Stable穩(wěn)定狀態(tài)(Stable穩(wěn)定狀態(tài)包括兩種:一種是沒有任何消費者的穩(wěn)定狀態(tài),一種是有消費者的穩(wěn)定狀態(tài))。因為所有消費者在執(zhí)行一次JoinGroup后并不是說系統(tǒng)就一直保持這種不變的狀態(tài),有可能因為這樣或那樣的事件導致消費者要重新進行JoinGroup,這個時候因為之前JoinGroup過了每個消費者都是有成員編號的,處理方式肯定是不一樣的。

所以定義一種事件驅(qū)動的狀態(tài)機就很有必要了,這世界看起來是雜亂無章的,不過只要遵循著狀態(tài)機的規(guī)則(萬物生長的理論),任何事件都是有跡可循有路可走有條不紊地進行著。

?

private def doJoinGroup(group: GroupMetadata,memberId: String,clientId: String,clientHost: String,sessionTimeoutMs: Int,protocolType: String,protocols: List[(String, Array[Byte])],responseCallback: JoinCallback) {if (group.protocolType!=protocolType||!group.supportsProtocols(protocols.map(_._1).toSet)) {//protocolType對于消費者是consumer,注意這里的協(xié)議類型和PartitionAssignor協(xié)議不同哦//協(xié)議類型目前總共就兩種消費者和Worker,而協(xié)議是PartitionAssignor分配算法responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code))} else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {//如果當前組沒有記錄該消費者,而該消費者卻被分配了成員編號,則重置為未知成員,并讓消費者重試responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))} else { group.currentState match {case Dead =>responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))case PreparingRebalance =>if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { //2.第二個消費者在這里了!addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)} else {val member = group.get(memberId)updateMemberAndRebalance(group, member, protocols, responseCallback)}case Stable =>if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { //1.初始時第一個消費者在這里!//如果消費者成員編號是未知的,則向GroupMetadata注冊并被記錄下來addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)} else { //3.第二次Rebalance時第一個消費者在這里,此時要分Leader還是普通的消費者了val member = group.get(memberId)if (memberId == group.leaderId || !member.matches(protocols)) {updateMemberAndRebalance(group, member, protocols, responseCallback)} else {responseCallback(JoinGroupResult(members = Map.empty,memberId = memberId,generationId = group.generationId,subProtocol = group.protocol,leaderId = group.leaderId,errorCode = Errors.NONE.code))}}}if (group.is(PreparingRebalance))joinPurgatory.checkAndComplete(GroupKey(group.groupId))} }

addMemberAndRebalance和updateMemberAndRebalance會創(chuàng)建或更新MemberMetadata,并且會嘗試調(diào)用prepareRebalance,消費組中只有一個消費者有機會調(diào)用prepareRebalance,并且一旦調(diào)用該方法,會將消費組狀態(tài)更改為PreparingRebalance,就會使得下一個消費者只能從case PreparingRebalance入口進去了,假設第一個消費者是從Stable進入的,它更改了狀態(tài)為PreparingRebalance,下一個消費者就不會從Stable進來的。不過進入Stable狀態(tài)還要判斷消費者是不是已經(jīng)有了成員編號,通常是之前已經(jīng)發(fā)生了Rebalance,這種影響也是比較巨大的,每個消費者走的路徑跟第一次的Rebalance是完全不同的迷宮地圖了。

1)第一次Rebalance如圖6-18的上半部分:

  • 第一個消費者,狀態(tài)為Stable,沒有編號,addMemberAndRebalance,成為Leader,執(zhí)行prepareRebalance,更改狀態(tài)為PreparingRebalance,創(chuàng)建DelayedJoin
  • 第二個消費者,狀態(tài)為PreparingRebalance,沒有編號,addMemberAndRebalance(不執(zhí)行prepareRebalance,因為在狀態(tài)改變成PreparingRebalance后就不會被執(zhí)行了);后面的消費者同第二個
  • 所有消費者都要等協(xié)調(diào)者收集完所有成員編號在DelayedJoin完成時才會收到JoinGroup響應
  • ?

    圖6-18 第一次和第二次Rebalance

    2)第二次Rebalance,對于之前加入過的消費者都要成員編號如圖6-18的下半部分:

  • 第一個消費者是Leader,狀態(tài)為Stable,有編號,updateMemberAndRebalance,更改狀態(tài)為PreparingRebalance,創(chuàng)建DelayedJoin
  • 第二個消費者,狀態(tài)為PreparingRebalance,有編號,updateMemberAndRebalance;后面的消費者同第二個
  • 所有消費者也要等待,因為其他消費者發(fā)送Join請求在Leader消費者之后。
  • 3)不過如果有消費者在Leader之前發(fā)送又有點不一樣了如圖6-19:

  • 第一個消費者不是Leader,狀態(tài)為Stable,有編號,responseCallback,立即收到JoinGroup響應,好幸運啊!
  • 第二個消費者如果也不是Leader,恭喜你,協(xié)調(diào)者也放過他,直接返回JoinGroup響應
  • 第三個消費者是Leader(領導來了),狀態(tài)為Stable(什么,你們之前的消費者竟然都沒更新狀態(tài)!,因為他們都沒有add或update),有編號,updateMemberAndRebalance(還是我第一個調(diào)用add或update,看來還是只能我來更新狀態(tài)),更改狀態(tài)為PreparingRebalance,創(chuàng)建DelayedJoin
  • 第四個消費者不是Leader,狀態(tài)為PreparingRebalance,有編號,updateMemberAndRebalance(前面有領導,不好意思了,不能立即返回JoinGroup給你了,你們這些剩下的消費者都只能和領導一起返回了,算你們倒霉)
  • ?

    圖6-19 Leader非第一個發(fā)送JoinGroup請求

    4)如果第一個消費者不是Leader,也沒有編號,說明這是一個新增的消費者,流程又不同了如圖6-20:

  • 第一個消費者不是Leader,狀態(tài)為Stable,沒有編號,addMemberAndRebalance,執(zhí)行prepareRebalance(我是第一個調(diào)用add或update的哦,你們都別想跟我搶這個頭彩了),更改狀態(tài)為PreparingRebalance(我不是Leader但我驕傲啊),創(chuàng)建DelayedJoin(我搶到頭彩,當然創(chuàng)建DelayedJoin的工作只能由我來完成了)
  • 第二個消費者也不是Leader,恭喜你,協(xié)調(diào)者也放過他,直接返回JoinGroup響應
  • 第三個消費者是Leader(領導來了),狀態(tài)為PreparingRebalance(有個新來的不懂規(guī)矩,他已經(jīng)把狀態(tài)改了),有編號,updateMemberAndRebalance(有人已經(jīng)改了,你老就不用費心思了),凡是沒有立即返回響應的,都需要等待,領導也不例外
  • 第四個消費者不是Leader(廢話,只有一個領導,而且領導已經(jīng)在前面了),不會立即返回響應(你看領導都排隊呢)
  • 雖然DelayedJoin是由沒有編號的消費者創(chuàng)建,不過由于DelayedJoin是以消費組為級別的,所以不用擔心,上一次選舉出來的領導還是領導,協(xié)調(diào)者最終還是會把members交給領導,不會是給那個沒有編號的消費者的,雖然說在他注冊的時候已經(jīng)有編號了,但是大家不認啊。不過領導其實不在意是誰開始觸發(fā)prepareRebalance的,那個人要負責生成DelayedJoin,而不管是領導自己還是其他人一旦更改狀態(tài)為PreparingRebalance,后面的消費者都要等待DelayedJoin完成了,而領導者總是要等待的,所以他當然無所謂了,因為他知道最后協(xié)調(diào)者總是會把members交給他的。
  • ?

    圖6-20 新增消費組第一個發(fā)送JoinGroup請求

    根據(jù)上面的幾種場景總結(jié)下來狀態(tài)機的規(guī)則和一些結(jié)論如下:

  • 第一個調(diào)用addMemberAndRebalance或者updateMemberAndRebalance的會將狀態(tài)改為PreparingRebalance,并且負責生成DelayedJoin
  • 一旦狀態(tài)進入PreparingRebalance,其他消費者就只能從PreparingRebalance狀態(tài)入口進入,這里只有兩種選擇addMemberAndRebalance或者updateMemberAndRebalance,不過他們不會更改狀態(tài),也不會生成DelayedJoin
  • 發(fā)生DelayedJoin之后,其他消費者的JoinGroup響應都會被延遲,因為如規(guī)則2中,他們只能調(diào)用add或update,無法立即調(diào)用responseCallback,所以就要和DelayedJoin的那個消費者一起等待
  • 正常流程時,發(fā)生responseCallback的是存在成員編號的消費者在Leader之前發(fā)送了JoinGroup,或者新增加的消費者發(fā)送了JoinGroup請求之前
  • 第一次Rebalance時,第一個消費者會創(chuàng)建DelayedJoin,之后的Rebalance,只有新增的消費者才有機會創(chuàng)建(如果他在Leader之前發(fā)送的話,如果在Leader之后就沒有機會了),而普通消費者總是沒有機會創(chuàng)建DelayedJoin的,因為狀態(tài)為Stable時,他會直接開溜,有人(Leader或者新增加的消費者)創(chuàng)建了DelayedJoin之后,他又在那邊怨天尤人只能等待
  • 轉(zhuǎn)載于:https://my.oschina.net/u/2371517/blog/1142949

    總結(jié)

    以上是生活随笔為你收集整理的kafka协调者的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。