日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka设计解析(五): Kafka Consumer设计解析

發(fā)布時間:2024/4/11 编程问答 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka设计解析(五): Kafka Consumer设计解析 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Kafka設(shè)計解析(五)- Kafka Consumer設(shè)計解析

大數(shù)據(jù)架構(gòu)(郭俊_Jason) · 2015-09-18 08:24



點擊上方 大數(shù)據(jù)架構(gòu) ? 快速關(guān)注



Kafka Consumer設(shè)計解析

  本文主要介紹了Kafka High Level Consumer,Consumer Group,Consumer Rebalance,Low Level Consumer實現(xiàn)的語義,和適用場景。以及未來版本中對High Level Consumer的重新設(shè)計——使用Consumer Coordinator解決Split Brain和Herd等問題。



High Level Consumer

  很多時候,客戶程序只是希望從Kafka讀取數(shù)據(jù),不太關(guān)心消息offset的處理。同時也希望提供一些語義,例如同一條消息只被某一個Consumer消費(單播)或被所有Consumer消費(廣播)。因此,Kafka Hight Level Consumer提供了一個從Kafka消費數(shù)據(jù)的高層抽象,從而屏蔽掉其中的細節(jié)并提供豐富的語義。   


Consumer Group

  High Level Consumer將從某個Partition讀取的最后一條消息的offset存于Zookeeper中(Kafka從0.8.2版本開始同時支持將offset存于Zookeeper中與將offset存于專用的Kafka Topic中)。這個offset基于客戶程序提供給Kafka的名字來保存,這個名字被稱為Consumer Group。Consumer Group是整個Kafka集群全局的,而非某個Topic的。每一個High Level Consumer實例都屬于一個Consumer Group,若不指定則屬于默認的Group。Zookeeper中Consumer相關(guān)節(jié)點如下圖所示


  
  很多傳統(tǒng)的Message Queue都會在消息被消費完后將消息刪除,一方面避免重復消費,另一方面可以保證Queue的長度比較短,提高效率。而如上文所述,Kafka并不刪除已消費的消息,為了實現(xiàn)傳統(tǒng)Message Queue消息只被消費一次的語義,Kafka保證每條消息在同一個Consumer Group里只會被某一個Consumer消費。與傳統(tǒng)Message Queue不同的是,Kafka還允許不同Consumer Group同時消費同一條消息,這一特性可以為消息的多元化處理提供支持。

  
  實際上,Kafka的設(shè)計理念之一就是同時提供離線處理和實時處理。根據(jù)這一特性,可以使用Storm這種實時流處理系統(tǒng)對消息進行實時在線處理,同時使用Hadoop這種批處理系統(tǒng)進行離線處理,還可以同時將數(shù)據(jù)實時備份到另一個數(shù)據(jù)中心,只需要保證這三個操作所使用的Consumer在不同的Consumer Group即可。下圖展示了Kafka在LinkedIn的一種簡化部署模型。

  
  為了更清晰展示Kafka Consumer Group的特性,筆者進行了一項測試。創(chuàng)建一個Topic (名為topic1),再創(chuàng)建一個屬于group1的Consumer實例,并創(chuàng)建三個屬于group2的Consumer實例,然后通過Producer向topic1發(fā)送Key分別為1,2,3的消息。結(jié)果發(fā)現(xiàn)屬于group1的Consumer收到了所有的這三條消息,同時group2中的3個Consumer分別收到了Key為1,2,3的消息,如下圖所示。

  注:上圖中每個黑色區(qū)域代表一個Consumer實例,每個實例只創(chuàng)建一個MessageStream。實際上,本實驗將Consumer應(yīng)用程序打成jar包,并在4個不同的命令行終端中傳入不同的參數(shù)運行。


High Level Consumer Rebalance

  (本節(jié)所講述Rebalance相關(guān)內(nèi)容均基于Kafka High Level Consumer)
  Kafka保證同一Consumer Group中只有一個Consumer會消費某條消息,實際上,Kafka保證的是穩(wěn)定狀態(tài)下每一個Consumer實例只會消費某一個或多個特定Partition的數(shù)據(jù),而某個Partition的數(shù)據(jù)只會被某一個特定的Consumer實例所消費。也就是說Kafka對消息的分配是以Partition為單位分配的,而非以每一條消息作為分配單元。這樣設(shè)計的劣勢是無法保證同一個Consumer Group里的Consumer均勻消費數(shù)據(jù),優(yōu)勢是每個Consumer不用都跟大量的Broker通信,減少通信開銷,同時也降低了分配難度,實現(xiàn)也更簡單。另外,因為同一個Partition里的數(shù)據(jù)是有序的,這種設(shè)計可以保證每個Partition里的數(shù)據(jù)可以被有序消費。
  如果某Consumer Group中Consumer(每個Consumer只創(chuàng)建1個MessageStream)數(shù)量少于Partition數(shù)量,則至少有一個Consumer會消費多個Partition的數(shù)據(jù),如果Consumer的數(shù)量與Partition數(shù)量相同,則正好一個Consumer消費一個Partition的數(shù)據(jù)。而如果Consumer的數(shù)量多于Partition的數(shù)量時,會有部分Consumer無法消費該Topic下任何一條消息。
  如下例所示,如果topic1有0,1,2共三個Partition,當group1只有一個Consumer(名為consumer1)時,該 Consumer可消費這3個Partition的所有數(shù)據(jù)。
  
  
  增加一個Consumer(consumer2)后,其中一個Consumer(consumer1)可消費2個Partition的數(shù)據(jù)(Partition 0和Partition 1),另外一個Consumer(consumer2)可消費另外一個Partition(Partition 2)的數(shù)據(jù)。
  
  
  再增加一個Consumer(consumer3)后,每個Consumer可消費一個Partition的數(shù)據(jù)。consumer1消費partition0,consumer2消費partition1,consumer3消費partition2。
  
  
  再增加一個Consumer(consumer4)后,其中3個Consumer可分別消費一個Partition的數(shù)據(jù),另外一個Consumer(consumer4)不能消費topic1的任何數(shù)據(jù)。
  
  
  此時關(guān)閉consumer1,其余3個Consumer可分別消費一個Partition的數(shù)據(jù)。
  
  
  接著關(guān)閉consumer2,consumer3可消費2個Partition,consumer4可消費1個Partition。
  
  
  再關(guān)閉consumer3,僅存的consumer4可同時消費topic1的3個Partition。
  

  


Consumer Rebalance算法

* 將目標Topic下的所有Partirtion排序,存于PT

* 對某Consumer Group下所有Consumer排序,存于CG,第i

個Consumer記為Ci

* N=size(PT)/size(CG),向上取整

* 解除Ci對原來分配的Partition的消費權(quán)(i從0開始)

* 將第i*N到(i+1)*N個Partition分配給Ci

  

Consumer Rebalance流程  

  目前,最新版(0.8.2.1)Kafka的Consumer Rebalance的控制策略是由每一個Consumer通過在Zookeeper上注冊Watch完成的。每個Consumer被創(chuàng)建時會觸發(fā)Consumer Group的Rebalance,具體啟動流程如下:

* High Level Consumer啟動時將其ID注冊到其Consumer Group下,在Zookeeper上的路徑為/consumers/[consumer group]/ids/[consumer id]

* 在/consumers/[consumer group]/ids上注冊Watch

* 在/brokers/ids上注冊Watch

* 如果Consumer通過Topic Filter創(chuàng)建消息流,則它會同時在/brokers/topics上也創(chuàng)建Watch

* 強制自己在其Consumer Group內(nèi)啟動Rebalance流程


  在這種策略下,每一個Consumer或者Broker的增加或者減少都會觸發(fā)Consumer Rebalance。因為每個Consumer只負責調(diào)整自己所消費的Partition,為了保證整個Consumer Group的一致性,當一個Consumer觸發(fā)了Rebalance時,該Consumer Group內(nèi)的其它所有其它Consumer也應(yīng)該同時觸發(fā)Rebalance。該方式有如下缺陷:

Herd effect
  任何Broker或者Consumer的增減都會觸發(fā)所有的Consumer的Rebalance


Split Brain
  每個Consumer分別單獨通過Zookeeper判斷哪些Broker和Consumer 宕機了,那么不同Consumer在同一時刻從Zookeeper“看”到的View就可能不一樣,這是由Zookeeper的特性決定的,這就會造成不正確的Reblance嘗試。


調(diào)整結(jié)果不可控
  所有的Consumer都并不知道其它Consumer的Rebalance是否成功,這可能會導致Kafka工作在一個不正確的狀態(tài)。


  根據(jù)Kafka社區(qū)wiki,Kafka作者正在考慮在還未發(fā)布的0.9.x版本中使用中心協(xié)調(diào)器(Coordinator)。大體思想是為所有Consumer Group的子集選舉出一個Broker作為Coordinator,由它Watch Zookeeper,從而判斷是否有Partition或者Consumer的增減,然后生成Rebalance命令,并檢查是否這些Rebalance在所有相關(guān)的Consumer中被執(zhí)行成功,如果不成功則重試,若成功則認為此次Rebalance成功(這個過程跟Replication Controller非常類似)。具體方案將在后文中詳細闡述。

   

Low Level Consumer

  使用Low Level Consumer(Simple Consumer)的主要原因是,用戶希望比Consumer Group更好的控制數(shù)據(jù)的消費。例如:

* 同一條消息讀多次

* 只讀取某個Topic的部分Partition

* 管理事務(wù),從而確保每條消息被處理一次,且僅被處理一次


  與Consumer Group相比,Low Level Consumer要求用戶做大量的額外工作。

* 必須在應(yīng)用程序中跟蹤offset,從而確定下一條應(yīng)該消費哪條消息

* 應(yīng)用程序需要通過程序獲知每個Partition的Leader是誰

* 必須處理Leader的變化


  使用Low Level Consumer的一般流程如下

* 查找到一個“活著”的Broker,并且找出每個Partition的Leader

* 找出每個Partition的Follower

* 定義好請求,該請求應(yīng)該能描述應(yīng)用程序需要哪些數(shù)據(jù)

* Fetch數(shù)據(jù)

* 識別Leader的變化,并對之作出必要的響應(yīng)


Consumer 重新設(shè)計

  根據(jù)社區(qū)社區(qū)wiki,Kafka在0.9.*版本中,重新設(shè)計Consumer可能是最重要的Feature之一。本節(jié)會根據(jù)社區(qū)wiki介紹Kafka 0.9.*中對Consumer可能的設(shè)計方向及思路。


設(shè)計方向

簡化消費者客戶端
  部分用戶希望開發(fā)和使用non-java的客戶端。現(xiàn)階段使用non-java發(fā)SimpleConsumer比較方便,但想開發(fā)High Level Consumer并不容易。因為High Level Consumer需要實現(xiàn)一些復雜但必不可少的失敗探測和Rebalance。如果能將消費者客戶端更精簡,使依賴最小化,將會極大的方便non-java用戶實現(xiàn)自己的Consumer。
  
中心Coordinator
  如上文所述,當前版本的High Level Consumer存在Herd Effect和Split Brain的問題。如果將失敗探測和Rebalance的邏輯放到一個高可用的中心Coordinator,那么這兩個問題即可解決。同時還可大大減少Zookeeper的負載,有利于Kafka Broker的Scale Out。
  
允許手工管理offset
  一些系統(tǒng)希望以特定的時間間隔在自定義的數(shù)據(jù)庫中管理Offset。這就要求Consumer能獲取到每條消息的metadata,例如Topic,Partition,Offset,同時還需要在Consumer啟動時得到每個Partition的Offset。實現(xiàn)這些,需要提供新的Consumer API。同時有個問題不得不考慮,即是否允許Consumer手工管理部分Topic的Offset,而讓Kafka自動通過Zookeeper管理其它Topic的Offset。一個可能的選項是讓每個Consumer只能選取1種Offset管理機制,這可極大的簡化Consumer API的設(shè)計和實現(xiàn)。
  
Rebalance后觸發(fā)用戶指定的回調(diào)
  一些應(yīng)用可能會在內(nèi)存中為每個Partition維護一些狀態(tài),Rebalance時,它們可能需要將該狀態(tài)持久化。因此該需求希望支持用戶實現(xiàn)并指定一些可插拔的并在Rebalance時觸發(fā)的回調(diào)。如果用戶使用手動的Offset管理,那該需求可方便得由用戶實現(xiàn),而如果用戶希望使用Kafka提供的自動Offset管理,則需要Kafka提供該回調(diào)機制。


非阻塞式Consumer API
  該需求源于那些實現(xiàn)高層流處理操作,如filter by, group by, join等,的系統(tǒng)。現(xiàn)階段的阻塞式Consumer幾乎不可能實現(xiàn)Join操作。


如何通過中心Coordinator實現(xiàn)Rebalance

  成功Rebalance的結(jié)果是,被訂閱的所有Topic的每一個Partition將會被Consumer Group內(nèi)的一個(有且僅有一個)Consumer擁有。每一個Broker將被選舉為某些Consumer Group的Coordinator。某個Cosnumer Group的Coordinator負責在該Consumer Group的成員變化或者所訂閱的Topic的Partititon變化時協(xié)調(diào)Rebalance操作。


Consumer
  1) Consumer啟動時,先向Broker列表中的任意一個Broker發(fā)送ConsumerMetadataRequest,并通過ConsumerMetadataResponse獲取它所在Group的Coordinator信息。ConsumerMetadataRequest和ConsumerMetadataResponse的結(jié)構(gòu)如下

ConsumerMetadataRequest
{
GroupId => String}

ConsumerMetadataResponse
{
ErrorCode => int16
Coordinator => Broker
}

  2)Consumer連接到Coordinator并發(fā)送HeartbeatRequest,如果返回的HeartbeatResponse沒有任何錯誤碼,Consumer繼續(xù)fetch數(shù)據(jù)。若其中包含IllegalGeneration錯誤碼,即說明Coordinator已經(jīng)發(fā)起了Rebalance操作,此時Consumer停止fetch數(shù)據(jù),commit offset,并發(fā)送JoinGroupRequest給它的Coordinator,并在JoinGroupResponse中獲得它應(yīng)該擁有的所有Partition列表和它所屬的Group的新的Generation ID。此時Rebalance完成,Consumer開始fetch數(shù)據(jù)。相應(yīng)Request和Response結(jié)構(gòu)如下

HeartbeatRequest
{
GroupId => String
GroupGenerationId => int32
ConsumerId => String}

HeartbeatResponse
{
ErrorCode => int16
}

JoinGroupRequest
{
GroupId => String
SessionTimeout => int32
Topics => [String]
ConsumerId => String
PartitionAssignmentStrategy => String}

JoinGroupResponse
{
ErrorCode => int16
GroupGenerationId => int32
ConsumerId => String
PartitionsToOwn => [TopicName [Partition]]
}
TopicName => StringPartition => int32


Consumer狀態(tài)機
  
Down:Consumer停止工作
Start up & discover coordinator:Consumer檢測其所在Group的Coordinator。一旦它檢測到Coordinator,即向其發(fā)送JoinGroupRequest。
Part of a group:該狀態(tài)下,Consumer已經(jīng)是該Group的成員,并周期性發(fā)送HeartbeatRequest。如HeartbeatResponse包含IllegalGeneration錯誤碼,則轉(zhuǎn)換到Stopped Consumption狀態(tài)。若連接丟失,HeartbeatResponse包含NotCoordinatorForGroup錯誤碼,則轉(zhuǎn)換到Rediscover coordinator狀態(tài)。
Rediscover coordinator:該狀態(tài)下,Consumer不停止消費而是嘗試通過發(fā)送ConsumerMetadataRequest來探測新的Coordinator,并且等待直到獲得無錯誤碼的響應(yīng)。
Stopped consumption:該狀態(tài)下,Consumer停止消費并提交offset,直到它再次加入Group。
  
    
故障檢測機制
  Consumer成功加入Group后,Consumer和相應(yīng)的Coordinator同時開始故障探測程序。Consumer向Coordinator發(fā)起周期性的Heartbeat(HeartbeatRequest)并等待響應(yīng),該周期為 session.timeout.ms/heartbeat.frequency。若Consumer在session.timeout.ms內(nèi)未收到HeartbeatResponse,或者發(fā)現(xiàn)相應(yīng)的Socket channel斷開,它即認為Coordinator已宕機并啟動Coordinator探測程序。若Coordinator在session.timeout.ms內(nèi)沒有收到一次HeartbeatRequest,則它將該Consumer標記為宕機狀態(tài)并為其所在Group觸發(fā)一次Rebalance操作。
  Coordinator Failover過程中,Consumer可能會在新的Coordinator完成Failover過程之前或之后發(fā)現(xiàn)新的Coordinator并向其發(fā)送HeatbeatRequest。對于后者,新的Cooodinator可能拒絕該請求,致使該Consumer重新探測Coordinator并發(fā)起新的連接請求。如果該Consumer向新的Coordinator發(fā)送連接請求太晚,新的Coordinator可能已經(jīng)在此之前將其標記為宕機狀態(tài)而將之視為新加入的Consumer并觸發(fā)一次Rebalance操作。


Coordinator
  1)穩(wěn)定狀態(tài)下,Coordinator通過上述故障探測機制跟蹤其所管理的每個Group下的每個Consumer的健康狀態(tài)。
  2)剛啟動時或選舉完成后,Coordinator從Zookeeper讀取它所管理的Group列表及這些Group的成員列表。如果沒有獲取到Group成員信息,它不會做任何事情直到某個Group中有成員注冊進來。
  3)在Coordinator完成加載其管理的Group列表及其相應(yīng)的成員信息之前,它將為HeartbeatRequest,OffsetCommitRequest和JoinGroupRequests返回CoordinatorStartupNotComplete錯誤碼。此時,Consumer會重新發(fā)送請求。
  4)Coordinator會跟蹤被其所管理的任何Consumer Group注冊的Topic的Partition的變化,并為該變化觸發(fā)Rebalance操作。創(chuàng)建新的Topic也可能觸發(fā)Rebalance,因為Consumer可以在Topic被創(chuàng)建之前就已經(jīng)訂閱它了。
  Coordinator發(fā)起Rebalance操作流程如下所示。

Coordinator狀態(tài)機
  
Down:Coordinator不再擔任之前負責的Consumer Group的Coordinator
Catch up:該狀態(tài)下,Coordinator競選成功,但還未能做好服務(wù)相應(yīng)請求的準備。
Ready:該狀態(tài)下,新競選出來的Coordinator已經(jīng)完成從Zookeeper中加載它所負責管理的所有Group的metadata,并可開始接收相應(yīng)的請求。
Prepare for rebalance:該狀態(tài)下,Coordinator在所有HeartbeatResponse中返回IllegalGeneration錯誤碼,并等待所有Consumer向其發(fā)送JoinGroupRequest后轉(zhuǎn)到Rebalancing狀態(tài)。
Rebalancing:該狀態(tài)下,Coordinator已經(jīng)收到了JoinGroupRequest請求,并增加其Group Generation ID,分配Consumer ID,分配Partition。Rebalance成功后,它會等待接收包含新的Consumer Generation ID的HeartbeatRequest,并轉(zhuǎn)至Ready狀態(tài)。


Coordinator Failover
  如前文所述,Rebalance操作需要經(jīng)歷如下幾個階段
  1)Topic/Partition的改變或者新Consumer的加入或者已有Consumer停止,觸發(fā)Coordinator注冊在Zookeeper上的watch,Coordinator收到通知準備發(fā)起Rebalance操作。
  2)Coordinator通過在HeartbeatResponse中返回IllegalGeneration錯誤碼發(fā)起Rebalance操作。
  3)Consumer發(fā)送JoinGroupRequest
  4)Coordinator在Zookeeper中增加Group的Generation ID并將新的Partition分配情況寫入Zookeeper
  5)Coordinator發(fā)送JoinGroupResponse
  
  在這個過程中的每個階段,Coordinator都可能出現(xiàn)故障。下面給出Rebalance不同階段中Coordinator的Failover處理方式。
  1)如果Coordinator的故障發(fā)生在第一階段,即它收到Notification并未來得及作出響應(yīng),則新的Coordinator將從Zookeeper讀取Group的metadata,包含這些Group訂閱的Topic列表和之前的Partition分配。如果某個Group所訂閱的Topic數(shù)或者某個Topic的Partition數(shù)與之前的Partition分配不一致,亦或者某個Group連接到新的Coordinator的Consumer數(shù)與之前Partition分配中的不一致,新的Coordinator會發(fā)起Rebalance操作。
  2)如果失敗發(fā)生在階段2,它可能對部分而非全部Consumer發(fā)出帶錯誤碼的HeartbeatResponse。與第上面第一種情況一樣,新的Coordinator會檢測到Rebalance的必要性并發(fā)起一次Rebalance操作。如果Rebalance是由Consumer的失敗所觸發(fā)并且Cosnumer在Coordinator的Failover完成前恢復,新的Coordinator不會為此發(fā)起新的Rebalance操作。
  3)如果Failure發(fā)生在階段3,新的Coordinator可能只收到部分而非全部Consumer的JoinGroupRequest。Failover完成后,它可能收到部分Consumer的HeartRequest及另外部分Consumer的JoinGroupRequest。與第1種情況類似,它將發(fā)起新一輪的Rebalance操作。
  4)如果Failure發(fā)生在階段4,即它將新的Group Generation ID和Group成員信息寫入Zookeeper后。新的Generation ID和Group成員信息以一個原子操作一次性寫入Zookeeper。Failover完成后,Consumer會發(fā)送HeartbeatRequest給新的Coordinator,并包含舊的Generation ID。此時新的Coordinator通過在HeartbeatResponse中返回IllegalGeneration錯誤碼發(fā)起新的一輪Rebalance。這也解釋了為什么每次HeartbeatRequest中都需要包含Generation ID和Consumer ID。
  5)如果Failure發(fā)生在階段5,舊的Coordinator可能只向Group中的部分Consumer發(fā)送了JoinGroupResponse。收到JoinGroupResponse的Consumer在下次向已經(jīng)失效的Coordinator發(fā)送HeartbeatRequest或者提交Offset時會檢測到它已經(jīng)失敗。此時,它將檢測新的Coordinator并向其發(fā)送帶有新的Generation ID 的HeartbeatRequest。而未收到JoinGroupResponse的Consumer將檢測新的Coordinator并向其發(fā)送JoinGroupRequest,這將促使新的Coordinator發(fā)起新一輪的Rebalance。

總結(jié)

以上是生活随笔為你收集整理的Kafka设计解析(五): Kafka Consumer设计解析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。