日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪(fǎng)問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) >

kafka 同步提交 异步_极限MQ (5) Kafka 消费者

發(fā)布時(shí)間:2024/9/19 57 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka 同步提交 异步_极限MQ (5) Kafka 消费者 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

要想知道如何從 Kafka 讀取消息,需要先了解消費(fèi)者和消費(fèi)者群組的概念。

假設(shè)我們有一個(gè)應(yīng)用程序需要從 Kafka 主題讀取消息井驗(yàn)證這些消息,然后再把它們保存起來(lái)。應(yīng)用程序需要?jiǎng)?chuàng)建一個(gè)消費(fèi)者對(duì)象,訂閱主題并開(kāi)始接收消息,然后驗(yàn)證消息井保存結(jié)果。

過(guò)了一陣子,生產(chǎn)者往主題寫(xiě)入消息的速度超過(guò)了應(yīng)用程序驗(yàn)證數(shù)據(jù)的速度,這個(gè)時(shí)候該怎么辦?如果只使用單個(gè)消費(fèi)者處理消息,應(yīng)用程序會(huì)永遠(yuǎn)跟不上消息生成的速度。顯然,此時(shí)很有必要對(duì)消費(fèi)者進(jìn)行橫向伸縮。就像多個(gè)生產(chǎn)者可以向相同的主題寫(xiě)入消息一樣,我們也可以使用多個(gè)消費(fèi)者從同一個(gè)主題讀取消息,對(duì)消息進(jìn)行分流。

Kafka 消費(fèi)者從屬于消費(fèi)者群組。一個(gè)群組里的消費(fèi)者訂閱的是同一個(gè)主題,每個(gè)消費(fèi)者只接收主題的部分分區(qū)的消息。

  • 假設(shè)主題 T1 有4 個(gè)分區(qū),我們創(chuàng)建了消費(fèi)者 C1 ,它是群組 G1 里唯一的消費(fèi)者,我們用它訂閱主題 T1 。那么消費(fèi)者 C1 將收到主題 T1 全部 4 個(gè)分區(qū)的消息。
  • 如果在群組 G1 里新增一個(gè)消費(fèi)者 C2 ,那么每個(gè)消費(fèi)者將分別從兩個(gè)分區(qū)接收消息。消費(fèi)者 C1 接收分區(qū) 1 和分區(qū) 2 的消息,消費(fèi)者 C2 接收分區(qū) 3 和分區(qū) 4 的消息。
  • 如果群組有 4 個(gè)消費(fèi)者,那么每個(gè)消費(fèi)者可以分配到一個(gè)分區(qū)。
  • 如果我們往群組里添加更多的消費(fèi)者,甚至超過(guò)主題的分區(qū)數(shù)量,那么多出來(lái)的消費(fèi)者就會(huì)被 閑置,不會(huì)接收到任何消息。

往群組里增加消費(fèi)者是橫向伸縮消費(fèi)能力的主要方式。

Kafka 消費(fèi)者經(jīng)常會(huì)做一些高延遲的操作,比如把數(shù)據(jù)寫(xiě)到數(shù)據(jù)庫(kù)或 HDFS ,或者使用數(shù)據(jù)進(jìn)行比較耗時(shí)的計(jì)算。在這些情況下,單個(gè)消費(fèi)者無(wú)法跟上數(shù)據(jù)生成的速度,所以可以增加更多的消費(fèi)者,讓它們分擔(dān) 載,每個(gè)消費(fèi)者只處理部分分區(qū)的消息,這就是橫向伸縮的主要手段。

我們有必要為主題創(chuàng)建大量的分區(qū),這樣在負(fù)載增長(zhǎng)時(shí)可以加入更多的消費(fèi)者。不過(guò)不要讓消費(fèi)者的數(shù)量超過(guò)主題分區(qū)的數(shù)量,多余的消費(fèi)者只會(huì)被閑置。

除了通過(guò)增加消費(fèi)者來(lái)橫向伸縮單個(gè)應(yīng)用程序外,還經(jīng)常出現(xiàn)多個(gè)應(yīng)用程序從同主題讀取數(shù)據(jù)的情況。

實(shí)際上, Kafka 設(shè)計(jì)的主要目標(biāo)之一 ,就是要讓 Kafka 主題里的數(shù)據(jù)能夠滿(mǎn)足企業(yè)各種應(yīng)用場(chǎng)景的需求。在這些場(chǎng)景里,每個(gè)應(yīng)用程序可以獲取到所有的消息, 而不只是其中的一部分。那么只要保證每個(gè)應(yīng)用程序有自己的消費(fèi)者群組,就可以讓它們獲取到主題所有的消息。不同于傳統(tǒng)的消息系統(tǒng),橫向伸縮 Kafka 消費(fèi)者和消費(fèi)者群組并不對(duì)性能造成負(fù)面影響。

在上面的例子里,如果新增一個(gè)只包含一個(gè)消費(fèi)者的群組 G2 ,那么這個(gè)消費(fèi)者將從主題 T1上接收所有的消息,與群組 G1之間互不影響。群組 G2 可以增加更多的消費(fèi)者,每個(gè)消費(fèi)者可以消費(fèi)若干個(gè)分區(qū),就像群組 G1 那樣。

簡(jiǎn)而言之,為每一個(gè)需要獲取一個(gè)或多個(gè)主題全部消息的應(yīng)用程序創(chuàng)建一個(gè)消費(fèi)者群組, 然后往群組里添加消費(fèi)者來(lái)伸縮讀取能力和處理能力,群組里的每個(gè)消費(fèi)者只處理部分消息。


然而,分區(qū)并非如此簡(jiǎn)單。當(dāng)一個(gè)群組的消費(fèi)者增加和減少時(shí),當(dāng)這個(gè)主題的分區(qū)數(shù)量變化時(shí),有很多問(wèn)題要處理。

分區(qū)的所有權(quán)從一個(gè)消費(fèi)者轉(zhuǎn)移到另一個(gè)消費(fèi)者,這樣的行為被稱(chēng)為再均衡。

再均衡非常重要, 它為群組帶來(lái)了高可用性和伸縮性(可以放心地添加或移除梢費(fèi)者)。

不過(guò)在正常情況下,我們并不希望發(fā)生這樣的行為。在再均衡期間,消費(fèi)者無(wú)法讀取消息,造成整個(gè)群組小段時(shí)間的不可用。

另外,當(dāng)分區(qū)被重新分配給一個(gè)消費(fèi)者時(shí),消費(fèi)者當(dāng)前的讀取狀態(tài)會(huì)丟失,它有可能還需要去刷新緩存,在它重新恢復(fù)狀態(tài)之前會(huì)拖慢程序。


分配分區(qū)是怎樣進(jìn)行的?

當(dāng)消費(fèi)者要加入群組時(shí),它會(huì)向群組協(xié)調(diào)器發(fā)送 Join Group 請(qǐng)求。第一個(gè)加入群組的消費(fèi)者將成為“群主”。群主從協(xié)調(diào)器那里獲得群組的成員列表(列表中包含了所有最近發(fā)送過(guò)心跳的消費(fèi)者,它們被認(rèn)為是活躍的), 并負(fù)責(zé)給每一個(gè)消費(fèi)者分配分區(qū)。

消費(fèi)者通過(guò)向被指派為群組協(xié)調(diào)器的 broker (不同的群組可以有不同的協(xié)調(diào)器)發(fā)送心跳來(lái)維持它們和群組的從屬關(guān)系以及它們對(duì)分區(qū)的所有權(quán)關(guān)系。只要消費(fèi)者以正常的時(shí)間間隔發(fā)送心跳,就被認(rèn)為是活躍的,說(shuō)明它還在讀取分區(qū)里的消息。消費(fèi)者會(huì)在輪詢(xún)消息 (為了獲取消息)或提交偏移量時(shí)發(fā)送心跳。如果消費(fèi)者停止發(fā)送心跳的時(shí)間足夠長(zhǎng),會(huì)話(huà)就會(huì)過(guò)期,群組協(xié)調(diào)器認(rèn)為它已經(jīng)死亡,就會(huì)觸發(fā)一次再均衡。


創(chuàng)建消費(fèi)者對(duì)象與創(chuàng)建生產(chǎn)者對(duì)象非常相似。

唯一不同的屬性是 group.i.d ,而且甚至不是必填的,不過(guò)我們現(xiàn)在姑且認(rèn)為它是必需 。它指定了屬于哪 個(gè)消費(fèi)者群組。創(chuàng)建不屬于任何一個(gè)群組的消費(fèi)者也是可以的,只是這樣不太常見(jiàn)。訂閱主題非常簡(jiǎn)單,甚至支持正則。

consumer .subscribe( "test.*" );

輪詢(xún)

輪訓(xùn)是消費(fèi)者 API 的核心,通過(guò)一個(gè)簡(jiǎn)單的輪詢(xún)向服務(wù)器請(qǐng)求數(shù)據(jù)。一旦消費(fèi)者訂閱 了主題,輪詢(xún)就會(huì)處理所有的細(xì)節(jié),包括群組協(xié)調(diào)、分區(qū)再均衡、發(fā)送心跳和獲取數(shù)據(jù),使用者只需要使用一組簡(jiǎn)單的 API 來(lái)處理從分區(qū)返回的數(shù)據(jù)即可。消費(fèi)者代碼的主要部分如下所示:

try{while (true) {ConsumerRecords<String String> records = consumer.poll(100);for(ConsumerRecord<String String> record :records){int updatedCount = 1;if(custCountryMap.countainsValue(record.value()) {updatedCount = custCountryMap.get (record.value() + 1);}custCountryMap.put(record.value(), updatedCount)System.out.println(custCountryMap);} finally {consumer.close(); }
  • 一個(gè)無(wú)線(xiàn)循環(huán)。
  • poll 方法非常重要。就像鯊魚(yú)停止移動(dòng)就會(huì)死掉一樣,消費(fèi)者必須持續(xù)對(duì) Kafka 進(jìn)行輪詢(xún),否則會(huì)被認(rèn)為己經(jīng)死亡 ,其負(fù)責(zé)的分區(qū)會(huì)被移交給群組里的其他消費(fèi)者。傳給 poll 方法 參數(shù)是一個(gè)超時(shí)時(shí)間,用于控制 poll 方法的阻塞時(shí)間。如果該參數(shù)被設(shè)為 0, poll 會(huì)立即返回。
  • poll 返回的是記錄列表。每條記錄都包含了記錄所屬主題的信息、分區(qū)的信息、分區(qū)偏移量 ,以及記錄的鍵值對(duì)。一般會(huì)遍歷這個(gè)列表 ,逐條處理這些記錄。
  • 一般落地的處理結(jié)果就是結(jié)果保存起來(lái)或者對(duì)已有的記錄進(jìn)行更新,處理過(guò)程也隨之結(jié)束。
  • 在退出應(yīng)用程序之前會(huì)觸發(fā)一次再均衡,而不是等待群組協(xié)調(diào)器發(fā)現(xiàn)它不再發(fā)送心跳井認(rèn)定它已死亡, 因?yàn)槟菢有枰L(zhǎng)的時(shí)間,導(dǎo)致整個(gè)群組在一段時(shí)間內(nèi)無(wú)法讀取消息。

在第一次調(diào)用新消費(fèi)者的 poll 方法時(shí),它會(huì)負(fù)責(zé)查找 GroupCoordinator 然后加入群組,接受分配分區(qū)。 如果發(fā)生了再均衡,整個(gè)過(guò)程也會(huì)在輪詢(xún)期間進(jìn)行 。當(dāng)然 ,心跳也會(huì)在輪詢(xún)里發(fā)迭出去的。

注意,一個(gè)消費(fèi)者活在一個(gè)獨(dú)立的線(xiàn)程里。


提交和偏移量

Kafka 不會(huì)像其它隊(duì)列那樣需要得到消費(fèi)者的確認(rèn),這是 Kafka 的獨(dú)特之處。

我們把更新分區(qū)當(dāng)前位置的操作叫作提交。

那么消費(fèi)者是如何提交偏移量的呢?

消費(fèi)者往一個(gè)叫作 _consumer_offset d 特殊主題發(fā)送消息,消息里包含每個(gè)分區(qū)的偏移量。

如果消費(fèi)者一直處于運(yùn)行狀態(tài),那么偏移量就沒(méi)有什么用處。不過(guò),如果發(fā)生崩壞或者有新的消費(fèi)者加入群組,就會(huì)觸發(fā)再均衡,完成再均衡之后,每個(gè)消費(fèi)者可能分配到新的分區(qū),而不是之前處理的那個(gè)。

為了能夠繼續(xù)之前的工作,消費(fèi)者需要讀取每個(gè)分區(qū)最后一次提交的偏移量,然后從偏移量指定的地方繼續(xù)處理。

但是,

如果提交的偏移量小于客戶(hù)端處理的最后一個(gè)消息的偏移量 ,那么處于兩個(gè)偏移量之間的消息就會(huì)被重復(fù)處理。

如果提交的偏移量大于客戶(hù)端處理的最后一個(gè)消息的偏移量,那么處于兩個(gè)偏移量之間的消息將會(huì)丟失。

處理偏移量的方式對(duì)客戶(hù)端會(huì)有很大的影響。


自動(dòng)提交

最簡(jiǎn)單的提交方式是讓悄費(fèi)者自動(dòng)提交偏移量。

如果 enable.auto.commit 被設(shè)為 true ,那么每過(guò)5s,消費(fèi)者會(huì)自動(dòng)把從 poll 方法接收到的最大偏移量提交上去。自動(dòng)提交也是在輪詢(xún)里進(jìn)行的。

可是,假設(shè)使用默認(rèn)的 5s 提交時(shí)間間隔,在最近一次提交之后的 3s 發(fā)生了再均衡。再均衡之后,消費(fèi)者從最后一次提交的偏移量位置開(kāi)始讀取消息。這個(gè)時(shí)候偏移量已經(jīng)落后 3s ,所以在這 3s 內(nèi)到達(dá)的消息會(huì)被重復(fù)處理。

自動(dòng)提交不能避免重復(fù)消息。


開(kāi)發(fā)者同步提交當(dāng)前偏移量

把 enable.auto.commit 設(shè)為 false ,讓?xiě)?yīng)用程序決定時(shí)提交偏移量。使用 commit.Sync() 提交偏移量。

commit.Sync() 將會(huì)提交由 poll 返回的最新偏移量。

如果發(fā)生了再均衡,從最近一批消息到發(fā)生再均衡之間的所有消息都將被重復(fù)處理。


異步提交

手動(dòng)提交有一個(gè)不足之處在對(duì)提交請(qǐng)求作出回應(yīng)之前,應(yīng)用程序會(huì)阻塞,這會(huì)限制應(yīng)用程序的吞吐量。

可以通過(guò)降低提交頻率來(lái)提升吞吐,但如果發(fā)生了再均衡, 會(huì)增加重復(fù)消息的數(shù)量。

可以使用異步提交。只管發(fā)送提交請(qǐng)求,無(wú)需等待 broker 的響應(yīng)。

commit.Async();

在成功提交或碰到無(wú)怯恢復(fù)的錯(cuò)誤之前, 同步方法會(huì)一直重試,但是異步方法不會(huì)。


同步和異步組合提交

般情況下,針對(duì)偶爾出現(xiàn)的提交失敗,不進(jìn)行重試不會(huì)有太大問(wèn)題。

但如果這是發(fā)生在關(guān)閉消費(fèi)者或均衡前的最后一次提交,就要確保能夠提交成功。 在消費(fèi)者關(guān)閉前一般會(huì)組合使用兩種方式。

總結(jié)

以上是生活随笔為你收集整理的kafka 同步提交 异步_极限MQ (5) Kafka 消费者的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。