kafka 同步提交 异步_极限MQ (5) Kafka 消费者
要想知道如何從 Kafka 讀取消息,需要先了解消費者和消費者群組的概念。
假設我們有一個應用程序需要從 Kafka 主題讀取消息井驗證這些消息,然后再把它們保存起來。應用程序需要創(chuàng)建一個消費者對象,訂閱主題并開始接收消息,然后驗證消息井保存結果。
過了一陣子,生產者往主題寫入消息的速度超過了應用程序驗證數(shù)據(jù)的速度,這個時候該怎么辦?如果只使用單個消費者處理消息,應用程序會永遠跟不上消息生成的速度。顯然,此時很有必要對消費者進行橫向伸縮。就像多個生產者可以向相同的主題寫入消息一樣,我們也可以使用多個消費者從同一個主題讀取消息,對消息進行分流。
Kafka 消費者從屬于消費者群組。一個群組里的消費者訂閱的是同一個主題,每個消費者只接收主題的部分分區(qū)的消息。
- 假設主題 T1 有4 個分區(qū),我們創(chuàng)建了消費者 C1 ,它是群組 G1 里唯一的消費者,我們用它訂閱主題 T1 。那么消費者 C1 將收到主題 T1 全部 4 個分區(qū)的消息。
- 如果在群組 G1 里新增一個消費者 C2 ,那么每個消費者將分別從兩個分區(qū)接收消息。消費者 C1 接收分區(qū) 1 和分區(qū) 2 的消息,消費者 C2 接收分區(qū) 3 和分區(qū) 4 的消息。
- 如果群組有 4 個消費者,那么每個消費者可以分配到一個分區(qū)。
- 如果我們往群組里添加更多的消費者,甚至超過主題的分區(qū)數(shù)量,那么多出來的消費者就會被 閑置,不會接收到任何消息。
往群組里增加消費者是橫向伸縮消費能力的主要方式。
Kafka 消費者經常會做一些高延遲的操作,比如把數(shù)據(jù)寫到數(shù)據(jù)庫或 HDFS ,或者使用數(shù)據(jù)進行比較耗時的計算。在這些情況下,單個消費者無法跟上數(shù)據(jù)生成的速度,所以可以增加更多的消費者,讓它們分擔 載,每個消費者只處理部分分區(qū)的消息,這就是橫向伸縮的主要手段。
我們有必要為主題創(chuàng)建大量的分區(qū),這樣在負載增長時可以加入更多的消費者。不過不要讓消費者的數(shù)量超過主題分區(qū)的數(shù)量,多余的消費者只會被閑置。
除了通過增加消費者來橫向伸縮單個應用程序外,還經常出現(xiàn)多個應用程序從同主題讀取數(shù)據(jù)的情況。
實際上, Kafka 設計的主要目標之一 ,就是要讓 Kafka 主題里的數(shù)據(jù)能夠滿足企業(yè)各種應用場景的需求。在這些場景里,每個應用程序可以獲取到所有的消息, 而不只是其中的一部分。那么只要保證每個應用程序有自己的消費者群組,就可以讓它們獲取到主題所有的消息。不同于傳統(tǒng)的消息系統(tǒng),橫向伸縮 Kafka 消費者和消費者群組并不對性能造成負面影響。
在上面的例子里,如果新增一個只包含一個消費者的群組 G2 ,那么這個消費者將從主題 T1上接收所有的消息,與群組 G1之間互不影響。群組 G2 可以增加更多的消費者,每個消費者可以消費若干個分區(qū),就像群組 G1 那樣。
簡而言之,為每一個需要獲取一個或多個主題全部消息的應用程序創(chuàng)建一個消費者群組, 然后往群組里添加消費者來伸縮讀取能力和處理能力,群組里的每個消費者只處理部分消息。
然而,分區(qū)并非如此簡單。當一個群組的消費者增加和減少時,當這個主題的分區(qū)數(shù)量變化時,有很多問題要處理。
分區(qū)的所有權從一個消費者轉移到另一個消費者,這樣的行為被稱為再均衡。
再均衡非常重要, 它為群組帶來了高可用性和伸縮性(可以放心地添加或移除梢費者)。
不過在正常情況下,我們并不希望發(fā)生這樣的行為。在再均衡期間,消費者無法讀取消息,造成整個群組小段時間的不可用。
另外,當分區(qū)被重新分配給一個消費者時,消費者當前的讀取狀態(tài)會丟失,它有可能還需要去刷新緩存,在它重新恢復狀態(tài)之前會拖慢程序。
分配分區(qū)是怎樣進行的?
當消費者要加入群組時,它會向群組協(xié)調器發(fā)送 Join Group 請求。第一個加入群組的消費者將成為“群主”。群主從協(xié)調器那里獲得群組的成員列表(列表中包含了所有最近發(fā)送過心跳的消費者,它們被認為是活躍的), 并負責給每一個消費者分配分區(qū)。
消費者通過向被指派為群組協(xié)調器的 broker (不同的群組可以有不同的協(xié)調器)發(fā)送心跳來維持它們和群組的從屬關系以及它們對分區(qū)的所有權關系。只要消費者以正常的時間間隔發(fā)送心跳,就被認為是活躍的,說明它還在讀取分區(qū)里的消息。消費者會在輪詢消息 (為了獲取消息)或提交偏移量時發(fā)送心跳。如果消費者停止發(fā)送心跳的時間足夠長,會話就會過期,群組協(xié)調器認為它已經死亡,就會觸發(fā)一次再均衡。
創(chuàng)建消費者對象與創(chuàng)建生產者對象非常相似。
唯一不同的屬性是 group.i.d ,而且甚至不是必填的,不過我們現(xiàn)在姑且認為它是必需 。它指定了屬于哪 個消費者群組。創(chuàng)建不屬于任何一個群組的消費者也是可以的,只是這樣不太常見。訂閱主題非常簡單,甚至支持正則。
consumer .subscribe( "test.*" );輪詢
輪訓是消費者 API 的核心,通過一個簡單的輪詢向服務器請求數(shù)據(jù)。一旦消費者訂閱 了主題,輪詢就會處理所有的細節(jié),包括群組協(xié)調、分區(qū)再均衡、發(fā)送心跳和獲取數(shù)據(jù),使用者只需要使用一組簡單的 API 來處理從分區(qū)返回的數(shù)據(jù)即可。消費者代碼的主要部分如下所示:
try{while (true) {ConsumerRecords<String String> records = consumer.poll(100);for(ConsumerRecord<String String> record :records){int updatedCount = 1;if(custCountryMap.countainsValue(record.value()) {updatedCount = custCountryMap.get (record.value() + 1);}custCountryMap.put(record.value(), updatedCount)System.out.println(custCountryMap);} finally {consumer.close(); }- 一個無線循環(huán)。
- poll 方法非常重要。就像鯊魚停止移動就會死掉一樣,消費者必須持續(xù)對 Kafka 進行輪詢,否則會被認為己經死亡 ,其負責的分區(qū)會被移交給群組里的其他消費者。傳給 poll 方法 參數(shù)是一個超時時間,用于控制 poll 方法的阻塞時間。如果該參數(shù)被設為 0, poll 會立即返回。
- poll 返回的是記錄列表。每條記錄都包含了記錄所屬主題的信息、分區(qū)的信息、分區(qū)偏移量 ,以及記錄的鍵值對。一般會遍歷這個列表 ,逐條處理這些記錄。
- 一般落地的處理結果就是結果保存起來或者對已有的記錄進行更新,處理過程也隨之結束。
- 在退出應用程序之前會觸發(fā)一次再均衡,而不是等待群組協(xié)調器發(fā)現(xiàn)它不再發(fā)送心跳井認定它已死亡, 因為那樣需要更長的時間,導致整個群組在一段時間內無法讀取消息。
在第一次調用新消費者的 poll 方法時,它會負責查找 GroupCoordinator 然后加入群組,接受分配分區(qū)。 如果發(fā)生了再均衡,整個過程也會在輪詢期間進行 。當然 ,心跳也會在輪詢里發(fā)迭出去的。
注意,一個消費者活在一個獨立的線程里。
提交和偏移量
Kafka 不會像其它隊列那樣需要得到消費者的確認,這是 Kafka 的獨特之處。
我們把更新分區(qū)當前位置的操作叫作提交。
那么消費者是如何提交偏移量的呢?
消費者往一個叫作 _consumer_offset d 特殊主題發(fā)送消息,消息里包含每個分區(qū)的偏移量。
如果消費者一直處于運行狀態(tài),那么偏移量就沒有什么用處。不過,如果發(fā)生崩壞或者有新的消費者加入群組,就會觸發(fā)再均衡,完成再均衡之后,每個消費者可能分配到新的分區(qū),而不是之前處理的那個。
為了能夠繼續(xù)之前的工作,消費者需要讀取每個分區(qū)最后一次提交的偏移量,然后從偏移量指定的地方繼續(xù)處理。
但是,
如果提交的偏移量小于客戶端處理的最后一個消息的偏移量 ,那么處于兩個偏移量之間的消息就會被重復處理。
如果提交的偏移量大于客戶端處理的最后一個消息的偏移量,那么處于兩個偏移量之間的消息將會丟失。
處理偏移量的方式對客戶端會有很大的影響。
自動提交
最簡單的提交方式是讓悄費者自動提交偏移量。
如果 enable.auto.commit 被設為 true ,那么每過5s,消費者會自動把從 poll 方法接收到的最大偏移量提交上去。自動提交也是在輪詢里進行的。
可是,假設使用默認的 5s 提交時間間隔,在最近一次提交之后的 3s 發(fā)生了再均衡。再均衡之后,消費者從最后一次提交的偏移量位置開始讀取消息。這個時候偏移量已經落后 3s ,所以在這 3s 內到達的消息會被重復處理。
自動提交不能避免重復消息。
開發(fā)者同步提交當前偏移量
把 enable.auto.commit 設為 false ,讓應用程序決定時提交偏移量。使用 commit.Sync() 提交偏移量。
commit.Sync() 將會提交由 poll 返回的最新偏移量。
如果發(fā)生了再均衡,從最近一批消息到發(fā)生再均衡之間的所有消息都將被重復處理。
異步提交
手動提交有一個不足之處在對提交請求作出回應之前,應用程序會阻塞,這會限制應用程序的吞吐量。
可以通過降低提交頻率來提升吞吐,但如果發(fā)生了再均衡, 會增加重復消息的數(shù)量。
可以使用異步提交。只管發(fā)送提交請求,無需等待 broker 的響應。
commit.Async();
在成功提交或碰到無怯恢復的錯誤之前, 同步方法會一直重試,但是異步方法不會。
同步和異步組合提交
般情況下,針對偶爾出現(xiàn)的提交失敗,不進行重試不會有太大問題。
但如果這是發(fā)生在關閉消費者或均衡前的最后一次提交,就要確保能夠提交成功。 在消費者關閉前一般會組合使用兩種方式。
總結
以上是生活随笔為你收集整理的kafka 同步提交 异步_极限MQ (5) Kafka 消费者的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: powwr shell_Powershe
- 下一篇: 大疆口袋云台 最大存储卡_佳能云台相机专